This document describes the realtime market data streaming capability in the Argo Trading framework using Go 1.23+ iterators.
The realtime market data system extends the existing Provider interface to support WebSocket-based streaming. Key design principles:
iter.Seq2[types.MarketData, error] for streamingStream() to Providertypes.MarketData and writer.MarketDataWriter as-isThe Provider interface in pkg/marketdata/provider/provider.go includes:
type Provider interface {
// Existing methods
ConfigWriter(writer writer.MarketDataWriter)
Download(ctx context.Context, ticker string, startDate time.Time, endDate time.Time,
multiplier int, timespan models.Timespan, onProgress OnDownloadProgress) (path string, err error)
// Stream returns an iterator that yields realtime market data via WebSocket.
// Symbols and interval are configured on the provider at construction time.
// Uses Go 1.23+ iter.Seq2 pattern for streaming data.
Stream(ctx context.Context) iter.Seq2[types.MarketData, error]
// GetSymbols returns the symbols configured on this provider.
GetSymbols() []string
// GetInterval returns the interval configured on this provider.
GetInterval() string
}
package main
import (
"context"
"fmt"
"log"
"github.com/rxtech-lab/argo-trading/pkg/marketdata/provider"
)
func main() {
// Create provider with symbols and interval configured at construction
client, err := provider.NewBinanceClient(&provider.BinanceStreamConfig{
BaseStreamConfig: provider.BaseStreamConfig{
Symbols: []string{"BTCUSDT", "ETHUSDT"},
Interval: "1m",
},
})
if err != nil {
log.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Stream realtime data using Go iterator
for data, err := range client.Stream(ctx) {
if err != nil {
log.Printf("stream error: %v", err)
break
}
fmt.Printf("%s: O=%.2f H=%.2f L=%.2f C=%.2f V=%.2f\n",
data.Symbol, data.Open, data.High, data.Low, data.Close, data.Volume)
// Optionally write to storage
// writer.Write(data)
}
}
BTCUSDT, ETHUSDT)1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 6h, 8h, 12h, 1d, 3d, 1w, 1Mwss://stream.binance.com:9443/wsAAPL, GOOGL)1s (second aggregates), 1m (minute aggregates)wss://socket.polygon.io/stocks (real-time), wss://delayed.polygon.io/stocks (15-min delayed)| Provider | Endpoint |
|---|---|
| Binance Production | wss://stream.binance.com:9443/ws |
| Binance Testnet | wss://testnet.binance.vision/ws |
| Polygon Stocks | wss://socket.polygon.io/stocks |
Errors are yielded through the iterator. Handle them in your range loop:
for data, err := range provider.Stream(ctx) {
if err != nil {
// Handle error (connection lost, invalid symbol, etc.)
log.Printf("error: %v", err)
// Decide whether to break or continue
break
}
// Process data
}
Cancel the context to stop streaming:
ctx, cancel := context.WithCancel(context.Background())
// Start streaming in a goroutine
go func() {
for data, err := range provider.Stream(ctx) {
// ...
}
}()
// Later, stop the stream
cancel()
iter package