This document describes the realtime market data streaming capability in the Argo Trading framework using Go 1.23+ iterators.
The realtime market data system extends the existing Provider interface to support WebSocket-based streaming. Key design principles:
- Use iter.Seq2[types.MarketData, error] for streaming
- Add Stream() to Provider
- Reuse types.MarketData and writer.MarketDataWriter as-is

The Provider interface in pkg/marketdata/provider/provider.go includes:
type Provider interface {
	// Existing methods
	ConfigWriter(writer writer.MarketDataWriter)
	Download(ctx context.Context, ticker string, startDate time.Time, endDate time.Time,
		multiplier int, timespan models.Timespan, onProgress OnDownloadProgress) (path string, err error)

	// Stream returns an iterator that yields realtime market data via WebSocket.
	// Uses Go 1.23+ iter.Seq2 pattern for streaming data.
	Stream(ctx context.Context, symbols []string, interval string) iter.Seq2[types.MarketData, error]
}
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/rxtech-lab/argo-trading/pkg/marketdata/provider"
)

func main() {
	// Create provider
	client, err := provider.NewBinanceClient()
	if err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Stream realtime data using Go iterator
	for data, err := range client.Stream(ctx, []string{"BTCUSDT", "ETHUSDT"}, "1m") {
		if err != nil {
			log.Printf("stream error: %v", err)
			break
		}

		fmt.Printf("%s: O=%.2f H=%.2f L=%.2f C=%.2f V=%.2f\n",
			data.Symbol, data.Open, data.High, data.Low, data.Close, data.Volume)

		// Optionally write to storage
		// writer.Write(data)
	}
}
Binance:
- Symbols (e.g., BTCUSDT, ETHUSDT)
- Intervals: 1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 6h, 8h, 12h, 1d, 3d, 1w, 1M
- Endpoint: wss://stream.binance.com:9443/ws

Polygon:
- Symbols (e.g., AAPL, GOOGL)
- Intervals: 1s (second aggregates), 1m (minute aggregates)
- Endpoints: wss://socket.polygon.io/stocks (real-time), wss://delayed.polygon.io/stocks (15-min delayed)

| Provider | Endpoint |
|---|---|
| Binance Production | wss://stream.binance.com:9443/ws |
| Binance Testnet | wss://testnet.binance.vision/ws |
| Polygon Stocks | wss://socket.polygon.io/stocks |
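
As a sketch of how a symbol and interval might map onto the Binance endpoint above: Binance names its raw kline streams `<symbol>@kline_<interval>` with the symbol in lowercase, appended to the /ws base URL. The helper binanceKlineURL is hypothetical; the source does not say how the framework builds its URLs. Interval matching is case-sensitive because 1m (one minute) and 1M (one month) are different intervals.

```go
package main

import (
	"fmt"
	"slices"
	"strings"
)

// Base endpoint taken from the table above.
const binanceBaseURL = "wss://stream.binance.com:9443/ws"

// Intervals listed as supported for Binance in this document.
var binanceIntervals = []string{
	"1m", "3m", "5m", "15m", "30m", "1h", "2h", "4h",
	"6h", "8h", "12h", "1d", "3d", "1w", "1M",
}

// binanceKlineURL builds the raw kline stream URL for one symbol.
// Hypothetical helper for illustration; not part of the framework.
func binanceKlineURL(symbol, interval string) (string, error) {
	if !slices.Contains(binanceIntervals, interval) {
		return "", fmt.Errorf("unsupported Binance interval %q", interval)
	}
	// Binance stream names use the lowercase symbol.
	stream := strings.ToLower(symbol) + "@kline_" + interval
	return binanceBaseURL + "/" + stream, nil
}

func main() {
	url, err := binanceKlineURL("BTCUSDT", "1m")
	fmt.Println(url, err)
}
```

Validating the interval before connecting turns a typo into an immediate error instead of a silent stream that never delivers data.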
Errors are yielded through the iterator. Handle them in your range loop:
for data, err := range provider.Stream(ctx, symbols, interval) {
	if err != nil {
		// Handle error (connection lost, invalid symbol, etc.)
		log.Printf("error: %v", err)
		// Decide whether to break or continue
		break
	}
	// Process data
}
Cancel the context to stop streaming:
ctx, cancel := context.WithCancel(context.Background())

// Start streaming in a goroutine
go func() {
	for data, err := range provider.Stream(ctx, symbols, interval) {
		if err != nil {
			log.Printf("stream error: %v", err)
			return
		}
		_ = data // process data here
	}
}()

// Later, stop the stream
cancel()