The Live Trading Engine enables real-time strategy execution using streaming market data and live/paper trading providers. It follows patterns established by the backtest engine, allowing the same WASM strategies to work in both backtest and live modes.
The Live Trading Engine is designed to:
- Consume real-time market data through the `Provider.Stream()` interface
- Execute orders through the `TradingSystemProvider` interface
┌─────────────────────────────────────────────────────────────────────────────┐
│ Live Trading Engine │
│ ┌────────────────────────────────────────────────────────────────────────┐ │
│ │ Engine Configuration │ │
│ │ - Market Data Provider Type + Config │ │
│ │ - Trading Provider Type + Config │ │
│ │ - Strategy WASM Path + Config │ │
│ │ - Symbols to trade │ │
│ │ - Prefetch settings │ │
│ └────────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Strategy (WASM Plugin) │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ ProcessData(MarketData) → Analyze → PlaceOrder / GetPositions / etc. ││
│ └─────────────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────────────┘
│
┌───────────────────────┴───────────────────────┐
│ │
▼ ▼
┌──────────────────────────────┐ ┌──────────────────────────────────┐
│ Market Data Provider │ │ Trading Provider │
│ (with Stream support) │ │ (TradingSystemProvider) │
│ │ │ │
│ ┌────────────────────────┐ │ │ ┌────────────────────────────┐ │
│ │ Stream(ctx, symbols, │ │ │ │ PlaceOrder(order) │ │
│ │ interval) │ │ │ │ GetPositions() │ │
│ │ → iter.Seq2[MarketData]│ │ │ │ CancelOrder(id) │ │
│ └────────────────────────┘ │ │ │ GetAccountInfo() │ │
│ │ │ └────────────────────────────┘ │
│ Providers: │ │ │
│ - Binance (WebSocket) │ │ Providers: │
│ - Polygon (WebSocket) │ │ - binance-paper │
│ │ │ - binance-live │
│ │ │ - ibkr-paper │
│ │ │ - ibkr-live │
└──────────────────────────────┘ └──────────────────────────────────┘
package engine
// LiveTradingEngine is the contract of the live trading engine: it is
// configured, given a strategy and its providers, and then run until the
// supplied context is cancelled or a fatal error occurs.
type LiveTradingEngine interface {
	// Initialize applies the given engine configuration.
	Initialize(config LiveTradingEngineConfig) error

	// LoadStrategyFromFile reads a WASM strategy from strategyPath.
	LoadStrategyFromFile(strategyPath string) error

	// LoadStrategyFromBytes loads a WASM strategy from an in-memory byte slice.
	LoadStrategyFromBytes(strategyBytes []byte) error

	// LoadStrategy installs a strategy runtime that was created beforehand.
	LoadStrategy(strategy runtime.StrategyRuntime) error

	// SetStrategyConfig supplies the strategy configuration as a YAML/JSON string.
	SetStrategyConfig(config string) error

	// SetMarketDataProvider selects the provider used for market data.
	SetMarketDataProvider(provider provider.Provider) error

	// SetTradingProvider selects the provider used for trading.
	SetTradingProvider(provider tradingprovider.TradingSystemProvider) error

	// SetDataOutputPath chooses the base directory where session data
	// (orders, trades, marks, logs, stats) is written.
	// Call it before Run() when persistence is wanted.
	SetDataOutputPath(path string) error

	// Run starts the engine and blocks until ctx is cancelled or a fatal
	// error occurs.
	Run(ctx context.Context, callbacks LiveTradingCallbacks) error

	// GetConfigSchema returns the JSON schema describing the engine configuration.
	GetConfigSchema() (string, error)
}
// LiveTradingEngineConfig is the engine-level configuration handed to Initialize.
//
// Symbols and interval are configured via the market data provider, and the
// data output path is set via SetDataOutputPath(); none of them belong here.
type LiveTradingEngineConfig struct {
	// MarketDataCacheSize: number of historical data points cached per symbol.
	MarketDataCacheSize int `json:"market_data_cache_size" yaml:"market_data_cache_size"`
	// EnableLogging: turns on storage of strategy logs.
	EnableLogging bool `json:"enable_logging" yaml:"enable_logging"`
	// Prefetch: settings for prefetching historical data.
	Prefetch PrefetchConfig `json:"prefetch" yaml:"prefetch"`
}
// PrefetchConfig controls prefetching of historical data.
//
// StartTimeType selects which of the two start fields is used: StartTime when
// it is "date", Days when it is "days".
type PrefetchConfig struct {
	// Enabled enables historical data prefetching.
	Enabled bool `json:"enabled" yaml:"enabled"`
	// StartTimeType is either "date" or "days".
	StartTimeType string `json:"start_time_type" yaml:"start_time_type"`
	// StartTime is the absolute start time (used when StartTimeType is "date").
	StartTime time.Time `json:"start_time" yaml:"start_time"`
	// Days is the number of days to prefetch (used when StartTimeType is "days").
	Days int `json:"days" yaml:"days"`
}
// LiveTradingCallbacks bundles the callback hooks handed to Run. Every field
// is a pointer to one of the On*Callback function types.
//
// NOTE(review): a nil field presumably means "no hook registered" — confirm
// against the engine implementation.
type LiveTradingCallbacks struct {
	// OnEngineStart fires once the engine has started successfully.
	OnEngineStart *OnEngineStartCallback
	// OnEngineStop fires when the engine stops; it is always called via defer.
	OnEngineStop *OnEngineStopCallback
	// OnMarketData fires for every market data point received.
	OnMarketData *OnMarketDataCallback
	// OnOrderPlaced fires when the strategy places an order.
	OnOrderPlaced *OnOrderPlacedCallback
	// OnOrderFilled fires when an order is filled.
	OnOrderFilled *OnOrderFilledCallback
	// OnError fires when a non-fatal error occurs.
	OnError *OnErrorCallback
	// OnStrategyError fires when the strategy returns an error.
	OnStrategyError *OnStrategyErrorCallback
	// OnStatsUpdate fires periodically with real-time statistics.
	OnStatsUpdate *OnStatsUpdateCallback
	// OnStatusUpdate fires when the engine status changes.
	OnStatusUpdate *OnStatusUpdateCallback
}
// EngineStatus is the string-valued status reported by the live trading engine.
type EngineStatus string

// Known engine statuses.
const (
	// EngineStatusPrefetching: historical data is being downloaded.
	EngineStatusPrefetching EngineStatus = "prefetching"
	// EngineStatusRunning: live market data is being processed.
	EngineStatusRunning EngineStatus = "running"
	// EngineStatusStopping: the engine is shutting down (cleanup in progress).
	// Not implemented yet; reserved for future use.
	EngineStatusStopping EngineStatus = "stopping"
)
// Callback function types referenced by the fields of LiveTradingCallbacks.
type (
	// OnEngineStartCallback is called when the engine starts successfully; it
	// receives the symbols, the interval and the data path.
	OnEngineStartCallback func(symbols []string, interval string, dataPath string) error

	// OnEngineStopCallback is called when the engine stops and receives an error value.
	OnEngineStopCallback func(err error)

	// OnMarketDataCallback is called for each market data point received.
	OnMarketDataCallback func(data types.MarketData) error

	// OnOrderPlacedCallback is called when the strategy places an order.
	OnOrderPlacedCallback func(order types.ExecuteOrder) error

	// OnOrderFilledCallback is called when an order is filled.
	OnOrderFilledCallback func(order types.Order) error

	// OnErrorCallback is called when a non-fatal error occurs.
	OnErrorCallback func(err error)

	// OnStrategyErrorCallback is called when the strategy returns an error; it
	// receives a market data value together with that error.
	OnStrategyErrorCallback func(data types.MarketData, err error)

	// OnStatsUpdateCallback is called periodically with real-time statistics.
	OnStatsUpdateCallback func(stats LiveTradeStats) error

	// OnStatusUpdateCallback is called when the engine status changes.
	OnStatusUpdateCallback func(status EngineStatus) error
)
Market data providers implement the Provider interface with real-time streaming support:
// Provider is the interface market data providers implement; it covers
// real-time streaming as well as historical downloads.
type Provider interface {
	// Stream yields real-time market data for the given symbols and interval
	// as an iterator of (MarketData, error) pairs.
	Stream(ctx context.Context, symbols []string, interval string) iter.Seq2[types.MarketData, error]

	// Download fetches historical data as described by params.
	// NOTE(review): the returned string is presumably the location of the
	// downloaded data — confirm against an implementation.
	Download(ctx context.Context, params DownloadParams) (string, error)
}
| Provider | Type | Description |
|---|---|---|
| `binance` | Crypto | Binance WebSocket streaming for cryptocurrency |
| `polygon` | Stocks | Polygon.io WebSocket streaming for US equities |
market_data:
provider_type: binance
config:
use_testnet: true
Trading providers implement the TradingSystemProvider interface:
// TradingSystemProvider is the interface trading providers implement.
type TradingSystemProvider interface {
	// Placing orders.
	PlaceOrder(order types.ExecuteOrder) error
	PlaceMultipleOrders(orders []types.ExecuteOrder) error

	// Positions: all of them, or the one for a single symbol.
	GetPositions() ([]types.Position, error)
	GetPosition(symbol string) (types.Position, error)

	// Cancelling orders and querying an order's status by ID.
	CancelOrder(orderID string) error
	CancelAllOrders() error
	GetOrderStatus(orderID string) (types.OrderStatus, error)

	// Account information, open orders and filtered trades.
	GetAccountInfo() (types.AccountInfo, error)
	GetOpenOrders() ([]types.ExecuteOrder, error)
	GetTrades(filter types.TradeFilter) ([]types.Trade, error)

	// Maximum quantities for a symbol (the buy side also takes a price).
	GetMaxBuyQuantity(symbol string, price float64) (float64, error)
	GetMaxSellQuantity(symbol string) (float64, error)
}
| Provider | Type | Description |
|---|---|---|
| `binance-paper` | Crypto | Binance testnet for paper trading |
| `binance-live` | Crypto | Binance mainnet for live trading |
| `ibkr-paper` | Stocks | Interactive Brokers paper trading |
| `ibkr-live` | Stocks | Interactive Brokers live trading |
trading:
provider_type: binance-paper
config:
api_key: ${BINANCE_TESTNET_API_KEY}
secret_key: ${BINANCE_TESTNET_SECRET_KEY}
engine:
symbols:
- BTCUSDT
- ETHUSDT
interval: "1m"
market_data_cache_size: 1000
enable_logging: true
prefetch:
enabled: true
start_time_type: days
days: 30
market_data:
provider_type: binance
config:
use_testnet: true
trading:
provider_type: binance-paper
config:
api_key: ${BINANCE_TESTNET_API_KEY}
secret_key: ${BINANCE_TESTNET_SECRET_KEY}
strategy:
wasm_path: ./examples/strategy/strategy.wasm
config_path: ./config/strategy/my-strategy.yaml
package main
import (
"context"
"os"
"os/signal"
"syscall"
"github.com/rxtech-lab/argo-trading/internal/trading/engine"
"github.com/rxtech-lab/argo-trading/pkg/marketdata/provider"
)
func main() {
// Create engine
eng, err := engine.NewLiveTradingEngineV1()
if err != nil {
log.Fatal(err)
}
// Configure engine
config := engine.LiveTradingEngineConfig{
MarketDataCacheSize: 1000,
EnableLogging: true,
Prefetch: engine.PrefetchConfig{
Enabled: true,
StartTimeType: "days",
Days: 30,
},
}
// Note: symbols and interval are configured via the market data provider
if err := eng.Initialize(config); err != nil {
log.Fatal(err)
}
// Set data output path for persistence
if err := eng.SetDataOutputPath("./data/live-trading"); err != nil {
log.Fatal(err)
}
// Set providers and load strategy...
// See full example in CLI usage below
}
go run cmd/trading/main.go \
--strategy-wasm ./strategy.wasm \
--market-data-provider binance \
--trading-provider binance-paper \
--trading-config ./config/trading.json \
--symbols BTCUSDT,ETHUSDT \
--interval 1m \
--data-output ./data/live-trading
| Flag | Required | Description |
|---|---|---|
| `--strategy-wasm` | Yes | Path to strategy WASM file |
| `--strategy-config` | No | Path to strategy config |
| `--market-data-provider` | Yes | `binance` or `polygon` |
| `--polygon-api-key` | Polygon only | Polygon API key |
| `--trading-provider` | Yes | `binance-paper`, `binance-live`, etc. |
| `--trading-config` | Yes | Provider config file |
| `--symbols` | Yes | Comma-separated symbols |
| `--interval` | No | Default: `1m` |
| `--cache-size` | No | Default: 1000 |
| `--data-output` | Yes | Data persistence directory |
| `--prefetch-type` | No | `date` or `days` |
| `--prefetch-start` | No | Start time (if type is `date`) |
| `--prefetch-days` | No | Days to prefetch (if type is `days`) |
Paper Trading with Binance:
go run cmd/trading/main.go \
--strategy-wasm ./examples/strategy/strategy.wasm \
--market-data-provider binance \
--trading-provider binance-paper \
--trading-config ./config/binance-testnet.json \
--symbols BTCUSDT \
--interval 5m \
--data-output ./data/live \
--prefetch-type days \
--prefetch-days 30
With Polygon Data and IBKR Trading:
go run cmd/trading/main.go \
--strategy-wasm ./examples/strategy/strategy.wasm \
--market-data-provider polygon \
--polygon-api-key $POLYGON_API_KEY \
--trading-provider ibkr-paper \
--trading-config ./config/ibkr-paper.json \
--symbols AAPL,GOOGL \
--interval 1m \
--data-output ./data/live
The engine handles graceful shutdown on SIGINT/SIGTERM:
// Create a cancellable context; cancelling it is what stops eng.Run.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// The channel is buffered because the signal package does not block when
// delivering: an unbuffered channel could miss a signal that arrives before
// the goroutine below is receiving.
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

// Translate the first SIGINT/SIGTERM into a context cancellation.
go func() {
	<-sigChan
	log.Println("Shutting down...")
	cancel()
}()

// Run blocks until ctx is cancelled or a fatal error occurs. Report the
// returned error rather than discarding it; log.Printf (not log.Fatal) is used
// so the deferred cancel still runs.
if err := eng.Run(ctx, callbacks); err != nil {
	log.Printf("live trading engine stopped: %v", err)
}
On shutdown:
OnEngineStop callback is invoked