This document describes the data prefetch feature that ensures indicators have sufficient historical data for accurate calculations from the moment live trading begins.
Technical indicators require historical data for calculation.
Without historical data, indicators produce incorrect or no values at the start of live trading, leading to poor trading decisions.
The prefetch feature:
engine:
  symbols:
    - BTCUSDT
  interval: "1m"
  data_output_path: "./data/live-trading"
  prefetch:
    enabled: true
    start_time_type: days  # "date" or "days"
    days: 30               # Prefetch 30 days of history
Or with absolute date:
engine:
  prefetch:
    enabled: true
    start_time_type: date
    start_time: "2025-01-01T00:00:00Z"
config := engine.LiveTradingEngineConfig{
	Symbols:        []string{"BTCUSDT"},
	Interval:       "1m",
	DataOutputPath: "./data/live-trading",
	Prefetch: engine.PrefetchConfig{
		Enabled:       true,
		StartTimeType: "days",
		Days:          30,
	},
}
| Field | Type | Description |
|---|---|---|
| enabled | bool | Enable/disable prefetch |
| start_time_type | string | "date" or "days" |
| start_time | time.Time | Absolute start time (when type is "date") |
| days | int | Number of days to prefetch (when type is "days") |
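The table implies that some field combinations are meaningless, for example a "days" type with no day count. A minimal validation sketch follows; the `PrefetchConfig` struct here is a local stand-in that mirrors the table, and `Validate` is a hypothetical helper, not a confirmed part of the engine API.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// PrefetchConfig mirrors the fields in the table above. It is a local
// stand-in; the real engine.PrefetchConfig may differ.
type PrefetchConfig struct {
	Enabled       bool
	StartTimeType string
	StartTime     time.Time
	Days          int
}

// Validate is a hypothetical helper that rejects settings which cannot
// produce a usable prefetch start time.
func (c PrefetchConfig) Validate() error {
	if !c.Enabled {
		return nil // nothing to check when prefetch is off
	}
	switch c.StartTimeType {
	case "days":
		if c.Days <= 0 {
			return errors.New("prefetch: days must be positive when start_time_type is \"days\"")
		}
	case "date":
		if c.StartTime.IsZero() {
			return errors.New("prefetch: start_time is required when start_time_type is \"date\"")
		}
	default:
		return fmt.Errorf("prefetch: unknown start_time_type %q", c.StartTimeType)
	}
	return nil
}

func main() {
	good := PrefetchConfig{Enabled: true, StartTimeType: "days", Days: 30}
	bad := PrefetchConfig{Enabled: true, StartTimeType: "date"}
	fmt.Println("good:", good.Validate())
	fmt.Println("bad: ", bad.Validate())
}
```

Running a check like this before the engine starts turns a silent misconfiguration into an immediate startup error.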
The prefetch process consists of four phases:
┌─────────────────────────────────────────────────────────────────┐
│ Phase 1: Prefetch │
│ │
│ ┌─────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ Calculate │───▶│ Download │───▶│ Store to Parquet │ │
│ │ Start Time │ │ via REST API │ │ │ │
│ └─────────────┘ └──────────────┘ └──────────────────┘ │
│ │
│ - If type="days": start = now - N days │
│ - If type="date": start = configured date │
│ - Download from start to now │
│ - Store in market_data.parquet │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Phase 2: Gap Detection │
│ │
│ ┌──────────────────┐ ┌─────────────────┐ ┌───────────┐ │
│ │ Get last stored │───▶│ Connect to │───▶│ Calculate │ │
│ │ timestamp │ │ live stream │ │ gap range │ │
│ └──────────────────┘ └─────────────────┘ └───────────┘ │
│ │
│ - Query: SELECT MAX(time) FROM market_data.parquet │
│ - Note first stream timestamp │
│ - Gap = stream_start - last_stored │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Phase 3: Gap Fill │
│ │
│ ┌────────────────┐ ┌─────────────────┐ │
│ │ Fetch gap data │───▶│ Store gap data │ │
│ │ via REST API │ │ to parquet │ │
│ └────────────────┘ └─────────────────┘ │
│ │
│ - Pause live stream consumption during gap fill │
│ - REST API fetches missing candles │
│ - Store to parquet │
│ - Some live data points may be missed (acceptable) │
│ - Resume live stream after gap fill completes │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Phase 4: Live Trading │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ Receive live │───▶│ Store to │───▶│ Feed to │ │
│ │ stream data │ │ parquet │ │ strategy │ │
│ └──────────────┘ └──────────────┘ └──────────────────┘ │
│ │
│ - Normal live trading mode │
│ - Indicators have full history │
│ - All data persisted for recovery │
└─────────────────────────────────────────────────────────────────┘
Status: EngineStatusPrefetching
The engine downloads historical data using the market data provider’s Download() method:
// Calculate start time (capture "now" once so every symbol uses the same range)
now := time.Now()
var startTime time.Time
if config.Prefetch.StartTimeType == "days" {
	startTime = now.AddDate(0, 0, -config.Prefetch.Days)
} else {
	startTime = config.Prefetch.StartTime
}

// Download historical data for each symbol
for _, symbol := range config.Symbols {
	err := provider.Download(ctx, DownloadParams{
		Ticker:    symbol,
		StartDate: startTime,
		EndDate:   now,
		Timespan:  intervalToTimespan(config.Interval),
	})
	if err != nil {
		log.Warn("Prefetch failed", "symbol", symbol, "error", err)
	}
}
Data is stored to market_data.parquet in the session folder.
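The start-time rule is easier to test when the current time is passed in rather than read from the clock. The sketch below shows that variant; `resolveStart`, `mustParse`, and the local `PrefetchConfig` struct are illustrative names, not part of the engine.

```go
package main

import (
	"fmt"
	"time"
)

// PrefetchConfig is a local stand-in with only the fields used here.
type PrefetchConfig struct {
	StartTimeType string
	StartTime     time.Time
	Days          int
}

// resolveStart applies the rule described above: "days" counts back from
// now, anything else uses the configured absolute start time.
func resolveStart(cfg PrefetchConfig, now time.Time) time.Time {
	if cfg.StartTimeType == "days" {
		return now.AddDate(0, 0, -cfg.Days)
	}
	return cfg.StartTime
}

// mustParse parses an RFC 3339 timestamp and panics on bad input.
// It exists only to keep the example short.
func mustParse(s string) time.Time {
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		panic(err)
	}
	return t
}

func main() {
	now := mustParse("2025-01-31T00:00:00Z")
	start := resolveStart(PrefetchConfig{StartTimeType: "days", Days: 30}, now)
	fmt.Println(start.Format(time.RFC3339)) // 2025-01-01T00:00:00Z
}
```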
After prefetch completes, the engine:
SELECT MAX(time) as last_time FROM read_parquet('market_data.parquet')
Connects to the live stream and notes the first received timestamp
gap := firstStreamTime.Sub(lastStoredTime)
if gap > tolerance {
	// Need to fill gap
}
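The gap check can also report how many candles are missing, which is useful for logging and for sizing the gap-fill request. This is a sketch under two assumptions that the text does not confirm: candle timestamps are aligned to the interval, and the tolerance is one interval. `detectGap` and `gapInfo` are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// gapInfo describes the hole between stored history and the live stream.
type gapInfo struct {
	Gap            time.Duration
	MissingCandles int
	NeedsFill      bool
}

// detectGap compares the last stored candle time with the first candle
// time seen on the stream. Timestamps are assumed to be aligned to
// interval, so consecutive candles differ by exactly one interval.
func detectGap(lastStored, firstStream time.Time, interval, tolerance time.Duration) gapInfo {
	info := gapInfo{Gap: firstStream.Sub(lastStored)}
	if info.Gap <= tolerance || interval <= 0 {
		return info // within tolerance (or no usable interval): nothing to fill
	}
	info.NeedsFill = true
	// Count the candles strictly between the two timestamps.
	if n := int(info.Gap/interval) - 1; n > 0 {
		info.MissingCandles = n
	}
	return info
}

// mustParse parses an RFC 3339 timestamp and panics on bad input.
func mustParse(s string) time.Time {
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		panic(err)
	}
	return t
}

func main() {
	last := mustParse("2025-01-01T12:00:00Z")
	first := mustParse("2025-01-01T12:05:00Z")
	info := detectGap(last, first, time.Minute, time.Minute)
	fmt.Printf("gap=%s missing=%d fill=%t\n", info.Gap, info.MissingCandles, info.NeedsFill)
}
```

With a last stored candle at 12:00 and the first stream candle at 12:05, the candles at 12:01 through 12:04 are missing, so the sketch reports four.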
Gap Tolerance:
If a significant gap is detected:
// Pseudocode for gap fill
// Fetch and store gap data (blocks until complete)
gapData := provider.Download(ctx, DownloadParams{
	StartDate: lastStoredTime,
	EndDate:   time.Now(),
})
storeToParquet(gapData)

// Resume live stream - some candles during gap fill may be missed
for data := range stream {
	storeToParquet(data)
	strategy.ProcessData(data)
}
Status: EngineStatusRunning
Normal live trading proceeds:
GetPreviousNumberOfDataPoints() which queries parquet
If historical data download fails:
// Retry with exponential backoff
var err error
for attempt := 0; attempt < maxRetries; attempt++ {
	err = provider.Download(ctx, params)
	if err == nil {
		break
	}
	if attempt == maxRetries-1 {
		// Out of retries: log a warning and continue without full history
		log.Warn("Prefetch failed, starting with limited history")
		break
	}
	// Wait 1s, 2s, 4s, ... before the next attempt
	backoff := time.Second * time.Duration(math.Pow(2, float64(attempt)))
	time.Sleep(backoff)
}
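The retry loop above can be factored into a reusable helper whose sleep function is injected, so the backoff schedule can be checked without waiting. This is a sketch, not the engine's implementation; `retryWithBackoff`, `sleepRecorder`, `baseDelay`, and `errTransient` are illustrative names.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

const baseDelay = time.Second

var errTransient = errors.New("transient download failure")

// retryWithBackoff calls op up to maxRetries times. Between failed
// attempts it sleeps base, 2*base, 4*base, ... and it does not sleep
// after the final attempt.
func retryWithBackoff(maxRetries int, base time.Duration, sleep func(time.Duration), op func() error) error {
	if maxRetries <= 0 {
		return errors.New("maxRetries must be positive")
	}
	var err error
	for attempt := 0; attempt < maxRetries; attempt++ {
		if err = op(); err == nil {
			return nil
		}
		if attempt < maxRetries-1 {
			sleep(base << attempt) // base * 2^attempt
		}
	}
	return fmt.Errorf("gave up after %d attempts: %w", maxRetries, err)
}

// sleepRecorder stands in for time.Sleep and records the requested delays.
type sleepRecorder struct{ delays []time.Duration }

func (r *sleepRecorder) Sleep(d time.Duration) { r.delays = append(r.delays, d) }

func main() {
	rec := &sleepRecorder{}
	calls := 0
	err := retryWithBackoff(4, baseDelay, rec.Sleep, func() error {
		calls++
		if calls < 3 {
			return errTransient // fail twice, then succeed
		}
		return nil
	})
	fmt.Println("calls:", calls, "delays:", rec.delays, "err:", err)
}
```

In production the `sleep` argument would be `time.Sleep`; a context-aware wait would be a reasonable refinement so that shutdown is not blocked by a long backoff.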
Behavior on failure:
If gap fill fails:
err := fillGap(lastStored, firstStream)
if err != nil {
	log.Warn("Gap fill failed, some indicator values may be inaccurate",
		"gap", firstStream.Sub(lastStored))
	// Continue with live data only
}
Behavior:
When providers rate limit requests:
if isRateLimitError(err) {
	wait := parseRetryAfter(err)
	log.Info("Rate limited, waiting", "duration", wait)
	time.Sleep(wait)
	// Retry
}
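How the wait duration is obtained depends on the provider. Assuming the provider signals rate limiting over HTTP with a standard Retry-After header (an assumption, not something this document states), the value is either a number of seconds or an HTTP date. The sketch below handles both; `retryAfter` is a hypothetical helper and is not the `parseRetryAfter` shown above.

```go
package main

import (
	"fmt"
	"net/http"
	"strconv"
	"time"
)

// retryAfter interprets a Retry-After header value relative to now.
// ok is false when the value is neither delay-seconds nor an HTTP date.
func retryAfter(value string, now time.Time) (wait time.Duration, ok bool) {
	// Form 1: a non-negative integer number of seconds.
	if secs, err := strconv.Atoi(value); err == nil {
		if secs < 0 {
			return 0, false
		}
		return time.Duration(secs) * time.Second, true
	}
	// Form 2: an HTTP date such as "Wed, 21 Oct 2015 07:28:00 GMT".
	if when, err := http.ParseTime(value); err == nil {
		if d := when.Sub(now); d > 0 {
			return d, true
		}
		return 0, true // the date has already passed: retry immediately
	}
	return 0, false
}

// mustParse parses an RFC 3339 timestamp and panics on bad input.
func mustParse(s string) time.Time {
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		panic(err)
	}
	return t
}

func main() {
	now := mustParse("2015-10-21T07:27:30Z")
	for _, v := range []string{"120", "Wed, 21 Oct 2015 07:28:00 GMT", "soon"} {
		wait, ok := retryAfter(v, now)
		fmt.Printf("%q -> wait=%s ok=%t\n", v, wait, ok)
	}
}
```

When the value cannot be parsed, falling back to the exponential backoff schedule is a reasonable default.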
During gap fill, live stream data is not consumed. This means some candles may be missed:
Timeline:
T+0s: Gap fill starts
T+5s: Gap fill completes
T+5s: Resume live stream
Missed: Any candles that closed between T+0s and T+5s
This is acceptable because:
When local time differs from provider time:
// Use provider timestamps, not local time
// (interval is assumed to be a time.Duration; Duration.Abs requires Go 1.19+)
tolerance := 2 * interval
if skew := localTime.Sub(providerTime).Abs(); skew > tolerance {
	log.Warn("Clock skew detected", "local", localTime, "provider", providerTime)
}
// Gap calculation uses provider timestamps only
gap := firstStreamProviderTime.Sub(lastStoredProviderTime)
When engine restarts with existing data:
// Check for existing data
lastStored, err := getLastStoredTime(parquetPath)
if err == nil {
	// Have existing data, adjust prefetch range
	prefetchStart = lastStored
	log.Info("Resuming from existing data", "lastStored", lastStored)
}
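One edge case the snippet above leaves open is stored data that is older than the configured start. The sketch below picks the later of the two times, which is a design assumption rather than documented engine behavior; `prefetchStart` as a function and `mustParse` are illustrative names.

```go
package main

import (
	"fmt"
	"time"
)

// prefetchStart picks where the download should begin after a restart.
// hasStored is false when no usable parquet data exists. When stored data
// ends before the configured start, the configured start wins (assumption).
func prefetchStart(configured, lastStored time.Time, hasStored bool) time.Time {
	if hasStored && lastStored.After(configured) {
		return lastStored
	}
	return configured
}

// mustParse parses an RFC 3339 timestamp and panics on bad input.
func mustParse(s string) time.Time {
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		panic(err)
	}
	return t
}

func main() {
	configured := mustParse("2025-01-01T00:00:00Z")
	lastStored := mustParse("2025-01-20T09:15:00Z")
	fmt.Println(prefetchStart(configured, lastStored, true).Format(time.RFC3339))
	fmt.Println(prefetchStart(configured, time.Time{}, false).Format(time.RFC3339))
}
```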
If the parquet file is empty or corrupt:
_, err := getLastStoredTime(parquetPath)
if err != nil {
	// Start fresh prefetch
	log.Warn("Could not read existing data, starting fresh prefetch")
	os.Remove(parquetPath)
	// Full prefetch from configured start time
}
The PersistentStreamingDataSource queries the parquet file:
func (d *PersistentStreamingDataSource) GetPreviousNumberOfDataPoints(
	symbol string,
	timestamp time.Time,
	count int,
) ([]types.MarketData, error) {
	query := fmt.Sprintf(`
		SELECT * FROM read_parquet('%s')
		WHERE symbol = '%s' AND time < '%s'
		ORDER BY time DESC
		LIMIT %d
	`, d.parquetPath, symbol, timestamp.Format(time.RFC3339), count)
	return executeQuery(query)
}
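The semantics of that query (one symbol, strictly before the timestamp, newest first, at most `count` rows) can be illustrated without a database. The sketch below applies the same rules to an in-memory slice; `candle`, `previousN`, `minuteCandles`, and `mustParse` are illustrative stand-ins, not engine types.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// candle is a stand-in for types.MarketData with only the fields used here.
type candle struct {
	Symbol string
	Time   time.Time
	Close  float64
}

// previousN mirrors the query: rows for symbol strictly before ts,
// ordered newest first, limited to count rows.
func previousN(all []candle, symbol string, ts time.Time, count int) []candle {
	if count <= 0 {
		return nil
	}
	var out []candle
	for _, c := range all {
		if c.Symbol == symbol && c.Time.Before(ts) {
			out = append(out, c)
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Time.After(out[j].Time) })
	if len(out) > count {
		out = out[:count]
	}
	return out
}

// minuteCandles builds n one-minute candles starting at start (test data).
func minuteCandles(symbol string, start time.Time, n int) []candle {
	out := make([]candle, n)
	for i := range out {
		out[i] = candle{Symbol: symbol, Time: start.Add(time.Duration(i) * time.Minute), Close: float64(i)}
	}
	return out
}

// mustParse parses an RFC 3339 timestamp and panics on bad input.
func mustParse(s string) time.Time {
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		panic(err)
	}
	return t
}

func main() {
	start := mustParse("2025-01-01T00:00:00Z")
	data := minuteCandles("BTCUSDT", start, 20)
	got := previousN(data, "BTCUSDT", mustParse("2025-01-01T00:20:00Z"), 14)
	fmt.Println(len(got), got[0].Time.Format("15:04"), got[len(got)-1].Time.Format("15:04"))
}
```

Because the rows come back newest first, an indicator that expects chronological input needs to reverse the slice before use.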
For a 14-period RSI on the first live candle:
Prefetched data: 30 days = ~43,200 candles (1m interval)
RSI needs: 14 candles minimum
First live candle at T:
- Strategy calls GetPreviousNumberOfDataPoints(symbol, T, 14)
- Returns candles T-1, T-2, ..., T-14 from parquet
- RSI calculated correctly
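To make the warm-up requirement concrete, here is a minimal simple-average RSI over closing prices. It is a sketch, not the engine's indicator: this variant needs period+1 closes (one extra to form the first price change), so a strategy using it would request 15 points rather than 14, and Wilder-smoothed implementations generally need more history still.

```go
package main

import "fmt"

// rsi computes a simple-average RSI from closes given in chronological
// order. It uses the last period+1 closes (period price changes) and
// reports false when there is not enough history.
func rsi(closes []float64, period int) (float64, bool) {
	if period <= 0 || len(closes) < period+1 {
		return 0, false
	}
	window := closes[len(closes)-period-1:]
	var gain, loss float64
	for i := 1; i < len(window); i++ {
		change := window[i] - window[i-1]
		if change > 0 {
			gain += change
		} else {
			loss -= change // accumulate losses as positive numbers
		}
	}
	if loss == 0 {
		return 100, true // no losing periods in the window
	}
	rs := gain / loss // the 1/period factors of the averages cancel
	return 100 - 100/(1+rs), true
}

func main() {
	// Without prefetch: only three closes are available, so no value.
	_, ok := rsi([]float64{100, 101, 102}, 14)
	fmt.Println("enough history:", ok)

	// With prefetch: 15 closes alternating up and down by the same amount.
	closes := []float64{1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1}
	value, ok := rsi(closes, 14)
	fmt.Println("rsi:", value, "enough history:", ok)
}
```

The first call shows the failure mode that prefetch avoids: with too little history the indicator has no value to report.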
To verify indicators are accurate after prefetch:
// Test helper
func TestIndicatorAccuracyAfterPrefetch(t *testing.T) {
	// Run backtest
	backtestRSI := runBacktest(startDate, endDate)
	// Run live with prefetch from same startDate
	liveRSI := runLiveWithPrefetch(startDate)
	// Compare at first live candle
	assert.InDelta(t, backtestRSI[0], liveRSI[0], 0.001)
}
Approximate prefetch times (depends on provider and network):
| Data Range | Candles (1m) | Time Estimate |
|---|---|---|
| 1 day | 1,440 | 2-5 seconds |
| 7 days | 10,080 | 10-20 seconds |
| 30 days | 43,200 | 30-60 seconds |
| 90 days | 129,600 | 1-3 minutes |
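The candle counts in the table follow directly from the interval: days × 24 hours divided by the interval length. A small sketch of that arithmetic follows; `candlesFor` is a hypothetical helper, and it relies on `time.ParseDuration`, which accepts values such as "1m" or "1h" but not day or week suffixes like "1d".

```go
package main

import (
	"fmt"
	"time"
)

// candlesFor returns how many whole candles of the given interval fit
// into the given number of days.
func candlesFor(days int, interval string) (int, error) {
	step, err := time.ParseDuration(interval)
	if err != nil {
		return 0, fmt.Errorf("unsupported interval %q: %w", interval, err)
	}
	if step <= 0 || days < 0 {
		return 0, fmt.Errorf("interval must be positive and days must not be negative")
	}
	return int(time.Duration(days) * 24 * time.Hour / step), nil
}

func main() {
	for _, days := range []int{1, 7, 30, 90} {
		n, err := candlesFor(days, "1m")
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%d days -> %d candles\n", days, n)
	}
}
```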
During gap fill, stream data is buffered in memory:
Parquet files are compressed: