Data Collection Guide
This document describes how to use the historical and real-time data collection features in DSTA.
Historical Data Collection
The download_historical command allows you to download OHLCV (Open, High, Low, Close, Volume) candlestick data from cryptocurrency exchanges.
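Basic Usage
Download data for a single symbol:
python manage.py download_historical --symbol BTCUSDT --interval 1h
Download data for multiple symbols:
python manage.py download_historical --symbols BTCUSDT,ETHUSDT,BNBUSDT --interval 1h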
Date Range Options
Specify exact date range:
python manage.py download_historical --symbol BTCUSDT --interval 1h \
--start 2024-01-01 --end 2024-12-31
Specify number of days back:
python manage.py download_historical --symbol BTCUSDT --interval 1h --days 90
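Both the explicit range and the days-back option end up as a UTC time window. The sketch below shows one plausible way to resolve them. resolve_range is a hypothetical helper, and the choices that explicit dates take precedence over a day count and that --end covers the whole end day are assumptions, not documented behavior of download_historical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def _parse_day(value: str) -> datetime:
    """Parse YYYY-MM-DD as midnight UTC."""
    return datetime.strptime(value, "%Y-%m-%d").replace(tzinfo=timezone.utc)


def resolve_range(start: Optional[str] = None, end: Optional[str] = None,
                  days: Optional[int] = None,
                  now: Optional[datetime] = None) -> Tuple[datetime, datetime]:
    """Hypothetical helper: return a half-open UTC window [start, end)."""
    now = now or datetime.now(timezone.utc)
    if start:
        # Assumption: --end is inclusive, so the window runs to the next midnight.
        end_dt = _parse_day(end) + timedelta(days=1) if end else now
        return _parse_day(start), end_dt
    if days is not None:
        # Days back: the window ends now and starts `days` earlier.
        return now - timedelta(days=days), now
    raise ValueError("give --start (and optionally --end) or --days")
```

Under these assumptions, --start 2024-01-01 --end 2024-12-31 becomes the window from 2024-01-01 00:00 UTC up to (but excluding) 2025-01-01 00:00 UTC.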
Advanced Options
Resume from last downloaded timestamp:
python manage.py download_historical --symbol BTCUSDT --interval 1h --days 90 --resume
Overwrite existing data:
Custom batch size:
Quiet mode (suppress progress output):
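Resuming and batching both reduce to arithmetic on candle opening times. This is a minimal sketch under two assumptions: resume continues one interval after the newest stored candle, and a batch is one request window holding at most batch_size candles. resume_start and batch_windows are hypothetical names, not the command's internals.

```python
from datetime import datetime, timedelta
from typing import Iterator, Optional, Tuple


def resume_start(requested: datetime, last_stored: Optional[datetime],
                 step: timedelta) -> datetime:
    """Hypothetical: skip candles that are already in the database."""
    if last_stored is None:
        return requested  # nothing stored yet, start where asked
    # The next candle opens one interval after the last stored opening time.
    return max(requested, last_stored + step)


def batch_windows(start: datetime, end: datetime, step: timedelta,
                  batch_size: int) -> Iterator[Tuple[datetime, datetime]]:
    """Hypothetical: split [start, end) into windows of at most batch_size candles."""
    span = step * batch_size
    cursor = start
    while cursor < end:
        window_end = min(cursor + span, end)
        yield cursor, window_end
        cursor = window_end
```

With a batch size of 1000, a 2,500-hour range of 1h candles splits into three requests of 1,000, 1,000, and 500 candles.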
Configuration File
For complex configurations, use a JSON config file:
{
"symbols": "BTCUSDT,ETHUSDT,BNBUSDT",
"interval": "1h",
"days": 90,
"batch_size": 1000,
"resume": true
}
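Note that symbols is a single comma-separated string, not a JSON array. The sketch below reads such a file, assuming the keys map directly onto the command-line options. load_download_config is a hypothetical helper, not part of the project, and the resume default of false is an assumption.

```python
import json
from typing import Any, Dict


def load_download_config(text: str) -> Dict[str, Any]:
    """Hypothetical helper: parse a download_historical-style JSON config."""
    config = json.loads(text)
    # "BTCUSDT,ETHUSDT,BNBUSDT" -> ["BTCUSDT", "ETHUSDT", "BNBUSDT"]
    raw = config.get("symbols", "")
    config["symbols"] = [s.strip() for s in raw.split(",") if s.strip()]
    # Assumed default: start fresh unless resume is requested.
    config.setdefault("resume", False)
    return config
```

To read from disk, pass the file contents, e.g. open a placeholder download.json with a with-statement and call load_download_config(f.read()).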
Run with config file:
Supported Intervals
- 1m, 3m, 5m, 15m, 30m - Minutes
- 1h, 2h, 4h, 6h, 8h, 12h - Hours
- 1d, 3d - Days
- 1w - Week
- 1M - Month
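Smaller intervals produce proportionally more rows. The sketch maps each fixed-length interval above to seconds and estimates how many candles a window contains; 1M is left out because calendar months vary in length. expected_candles is a hypothetical helper that assumes the window start is aligned to an interval boundary.

```python
import math
from datetime import datetime

# Seconds per interval, for the fixed-length intervals listed above.
INTERVAL_SECONDS = {
    "1m": 60, "3m": 180, "5m": 300, "15m": 900, "30m": 1800,
    "1h": 3600, "2h": 7200, "4h": 14400, "6h": 21600, "8h": 28800,
    "12h": 43200, "1d": 86400, "3d": 259200, "1w": 604800,
}


def expected_candles(start: datetime, end: datetime, interval: str) -> int:
    """Hypothetical: approximate candle count in [start, end)."""
    if interval not in INTERVAL_SECONDS:
        raise ValueError(f"unsupported or variable-length interval: {interval}")
    return math.ceil((end - start).total_seconds() / INTERVAL_SECONDS[interval])
```

For example, 90 days of 1h data is 2,160 candles per symbol, while 90 days of 1m data is 129,600.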
Exchange Support
Currently supported exchanges:
- Binance (default)
Specify exchange:
Real-Time Data Collection
The stream_realtime command allows you to stream live market data from cryptocurrency exchanges.
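Basic Usage
Stream candlestick (kline) data:
python manage.py stream_realtime --symbols BTCUSDT --klines --intervals 1m
Stream trade data:
python manage.py stream_realtime --symbols BTCUSDT --trades
Stream order book depth:
python manage.py stream_realtime --symbols BTCUSDT --depth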
Multiple Stream Types
Stream multiple data types simultaneously:
python manage.py stream_realtime --symbols BTCUSDT,ETHUSDT \
--klines --intervals 1m,5m,1h \
--trades \
--depth
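On Binance, each of these data types is a named WebSocket stream: <symbol>@kline_<interval>, <symbol>@trade, and <symbol>@depth (1000ms updates) or <symbol>@depth@100ms for diff depth. Several streams can share one connection through the combined-stream endpoint. The sketch shows how the options above could translate into stream names; that stream_realtime builds its subscriptions this way is an assumption, and both function names are hypothetical.

```python
from typing import List

VALID_DEPTH_SPEEDS = {"100ms", "1000ms"}


def binance_stream_names(symbols: List[str], klines: bool, intervals: List[str],
                         trades: bool, depth: bool,
                         depth_speed: str = "100ms") -> List[str]:
    """Hypothetical: translate stream_realtime-style options into stream names."""
    if depth_speed not in VALID_DEPTH_SPEEDS:
        raise ValueError(f"depth_speed must be one of {sorted(VALID_DEPTH_SPEEDS)}")
    names: List[str] = []
    for symbol in symbols:
        s = symbol.lower()  # Binance stream names use lowercase symbols
        if klines:
            names.extend(f"{s}@kline_{interval}" for interval in intervals)
        if trades:
            names.append(f"{s}@trade")
        if depth:
            # Diff depth updates every 1000ms unless @100ms is appended.
            names.append(f"{s}@depth@100ms" if depth_speed == "100ms" else f"{s}@depth")
    return names


def combined_stream_url(names: List[str]) -> str:
    """Hypothetical: one connection carrying several streams."""
    return "wss://stream.binance.com:9443/stream?streams=" + "/".join(names)
```

For the command above, BTCUSDT alone would map to btcusdt@kline_1m, btcusdt@kline_5m, btcusdt@kline_1h, btcusdt@trade, and btcusdt@depth@100ms.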
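Advanced Options
Custom depth update speed:
python manage.py stream_realtime --symbols BTCUSDT --depth --depth-speed 1000ms
Options: 100ms (default) or 1000ms
Stream without saving to database:
python manage.py stream_realtime --symbols BTCUSDT --klines --intervals 1m --no-save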
Configuration file:
{
"symbols": "BTCUSDT,ETHUSDT",
"klines": true,
"intervals": "1m,5m,1h",
"trades": true,
"depth": true,
"depth_speed": "100ms"
}
Run with config file:
Graceful Shutdown
To stop streaming, press Ctrl+C. The application will gracefully shut down all WebSocket connections and save any pending data.
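A common asyncio pattern for this kind of shutdown is to route SIGINT to an event, cancel the streaming tasks, and let each task flush its buffer while handling the cancellation. This is a minimal sketch of that pattern, not the project's code: stream_worker, run, and the in-memory lists are hypothetical stand-ins for WebSocket readers and database writes.

```python
import asyncio
import signal
from typing import List


async def stream_worker(name: str, saved: List[str]) -> None:
    """Hypothetical stand-in for a WebSocket reader that buffers messages."""
    pending: List[str] = []
    count = 0
    try:
        while True:
            pending.append(f"{name}:{count}")  # pretend a message arrived
            count += 1
            await asyncio.sleep(0.01)  # a real reader would also flush periodically
    except asyncio.CancelledError:
        saved.extend(pending)  # flush buffered data before exiting
        raise


async def run(stop: asyncio.Event, names: List[str]) -> List[str]:
    """Start one worker per stream, wait for the stop signal, then cancel them."""
    saved: List[str] = []
    tasks = [asyncio.create_task(stream_worker(n, saved)) for n in names]
    await stop.wait()
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects the CancelledErrors instead of raising.
    await asyncio.gather(*tasks, return_exceptions=True)
    return saved


async def main() -> None:
    stop = asyncio.Event()
    # Unix only: route Ctrl+C (SIGINT) to the stop event.
    asyncio.get_running_loop().add_signal_handler(signal.SIGINT, stop.set)
    await run(stop, ["btcusdt@kline_1m", "ethusdt@trade"])

# Entry point: asyncio.run(main())
```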
Exchange Support
Currently supported exchanges:
- Binance (default)
Specify exchange:
Data Storage
All downloaded and streamed data is stored automatically in the PostgreSQL database, unless streaming is run with --no-save.
Database Models
Candlestick: OHLCV data
- exchange: Exchange name
- symbol: Trading pair (e.g., 'BTCUSDT')
- interval: Timeframe (e.g., '1h', '1d')
- timestamp: Opening time
- open_price, high_price, low_price, close_price: Price data
- volume: Trading volume
- quote_volume: Quote asset volume
- trades_count: Number of trades
Trade: Individual trade executions
- exchange: Exchange name
- symbol: Trading pair
- trade_id: Unique trade ID
- timestamp: Execution time
- price: Trade price
- quantity: Trade quantity
- is_buyer_maker: Whether the buyer was the maker
OrderBook: Order book snapshots
- exchange: Exchange name
- symbol: Trading pair
- timestamp: Snapshot time
- bids: Bid levels (price, quantity)
- asks: Ask levels (price, quantity)
- last_update_id: Update ID for tracking
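Binance's REST kline endpoint returns each candle as an array whose leading elements are open time (milliseconds), open, high, low, close, volume, close time, quote asset volume, and number of trades. The sketch maps such a row onto the Candlestick fields listed above. CandlestickRow only mirrors those field names and is not the project's Django model; using Decimal for prices is a choice made here, not something the source confirms.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from typing import List, Union


@dataclass
class CandlestickRow:
    """Hypothetical mirror of the Candlestick fields."""
    exchange: str
    symbol: str
    interval: str
    timestamp: datetime
    open_price: Decimal
    high_price: Decimal
    low_price: Decimal
    close_price: Decimal
    volume: Decimal
    quote_volume: Decimal
    trades_count: int


def from_binance_kline(row: List[Union[int, str]], symbol: str,
                       interval: str) -> CandlestickRow:
    """Hypothetical: convert one Binance kline array into a CandlestickRow."""
    return CandlestickRow(
        exchange="binance",
        symbol=symbol,
        interval=interval,
        # Index 0 is the opening time in milliseconds since the epoch.
        timestamp=datetime.fromtimestamp(row[0] / 1000, tz=timezone.utc),
        open_price=Decimal(row[1]),
        high_price=Decimal(row[2]),
        low_price=Decimal(row[3]),
        close_price=Decimal(row[4]),
        volume=Decimal(row[5]),
        quote_volume=Decimal(row[7]),  # index 6 is the close time
        trades_count=int(row[8]),
    )
```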
Querying Data
Using Django ORM:
from api.models import Candlestick
from datetime import datetime, timezone
# Get the 100 most recent 1h candlesticks (newest first)
candles = Candlestick.objects.filter(
    exchange='binance',
    symbol='BTCUSDT',
    interval='1h',
).order_by('-timestamp')[:100]
# Get all of 2024: start is inclusive and end is exclusive,
# so every candle opening on 2024-12-31 is included
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2025, 1, 1, tzinfo=timezone.utc)
candles = Candlestick.objects.filter(
    exchange='binance',
    symbol='BTCUSDT',
    interval='1h',
    timestamp__gte=start,
    timestamp__lt=end,
).order_by('timestamp')
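A date-range query pairs naturally with a gap check before backfilling. This is a minimal sketch assuming fixed-length intervals and ascending timestamps; find_gaps is a hypothetical helper. In a Django shell it could be fed the ordered queryset's opening times, for example candles.values_list('timestamp', flat=True).

```python
from datetime import datetime, timedelta
from typing import Iterable, List, Optional, Tuple


def find_gaps(timestamps: Iterable[datetime],
              step: timedelta) -> List[Tuple[datetime, datetime]]:
    """Hypothetical: return (first_missing, last_missing) opening times per gap."""
    gaps: List[Tuple[datetime, datetime]] = []
    previous: Optional[datetime] = None
    for ts in timestamps:
        # Consecutive candles should be exactly one interval apart.
        if previous is not None and ts - previous > step:
            gaps.append((previous + step, ts - step))
        previous = ts
    return gaps
```

Each returned pair is a range that a backfill run could request.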
Programmatic Usage
Using BinanceClient Directly
from api.exchanges.binance_client import BinanceClient
from datetime import datetime, timezone, timedelta
# Initialize client
client = BinanceClient()
# Get the last 7 days of hourly candlesticks
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7)
candlesticks = client.get_candlesticks(
    symbol='BTCUSDT',
    interval='1h',
    start_time=start_time,
    end_time=end_time,
    limit=500
)
for candle in candlesticks:
    print(f"{candle['timestamp']}: O:{candle['open']} C:{candle['close']}")
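A single call is capped by limit, so longer ranges need several requests. The sketch pages forward by moving start_time past the last candle returned. fetch_all is hypothetical and takes the fetch function as a parameter so it runs without network access; passing client.get_candlesticks assumes that method accepts the keyword arguments shown above and returns dicts with a datetime 'timestamp' key, as the print loop suggests.

```python
from datetime import datetime, timedelta
from typing import Callable, Dict, List


def fetch_all(fetch: Callable[..., List[Dict]], symbol: str, interval: str,
              start_time: datetime, end_time: datetime, step: timedelta,
              limit: int = 500) -> List[Dict]:
    """Hypothetical: page through a candlestick fetcher until end_time."""
    candles: List[Dict] = []
    cursor = start_time
    while cursor < end_time:
        page = fetch(symbol=symbol, interval=interval,
                     start_time=cursor, end_time=end_time, limit=limit)
        if not page:
            break  # no more data in the range
        candles.extend(page)
        # Continue one interval after the last opening time in this page.
        cursor = page[-1]["timestamp"] + step
    return candles
```

Usage, under the same assumptions: fetch_all(client.get_candlesticks, 'BTCUSDT', '1h', start_time, end_time, timedelta(hours=1)).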
Using WebSocketManager
import asyncio
from api.websockets.manager import WebSocketManager
async def main():
    # Initialize manager
    manager = WebSocketManager(exchange='binance')
    # Add subscriptions
    manager.add_kline_subscription('BTCUSDT', '1m', save_to_db=True)
    manager.add_trade_subscription('ETHUSDT', save_to_db=True)
    # Start manager
    await manager.start()
    # Start streaming
    await manager.start_streaming()
    # Run for some time
    await asyncio.sleep(3600)  # 1 hour
    # Stop
    await manager.stop()
# Run
asyncio.run(main())
Custom Callbacks
import asyncio
from api.websockets.manager import WebSocketManager
async def my_kline_callback(data):
    """Custom callback for kline data."""
    # Only act on candles that have closed
    if data.get('is_closed'):
        print(f"Closed candle: {data['symbol']} @ {data['close']}")
async def my_trade_callback(data):
    """Custom callback for trade data."""
    print(f"Trade: {data['symbol']} {data['price']} x {data['quantity']}")
async def main():
    manager = WebSocketManager(exchange='binance')
    # Add subscriptions with custom callbacks
    manager.add_kline_subscription(
        'BTCUSDT',
        '1m',
        save_to_db=True,
        callback=my_kline_callback
    )
    manager.add_trade_subscription(
        'ETHUSDT',
        save_to_db=True,
        callback=my_trade_callback
    )
    await manager.start()
    await manager.start_streaming()
    # Keep running until interrupted. Ctrl+C may surface as task cancellation
    # rather than KeyboardInterrupt inside a coroutine, so clean up in finally.
    try:
        while True:
            await asyncio.sleep(1)
    finally:
        await manager.stop()
asyncio.run(main())
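Callbacks receive one message at a time, so any state that spans messages has to live outside the function. One option is a callable object. The sketch keeps a per-symbol simple moving average of closing prices over the last N closed candles, using only the is_closed, symbol, and close keys from the callbacks above. MovingAverageCallback is hypothetical, and it assumes the manager just awaits callback(data) and that close is a number or numeric string.

```python
from collections import defaultdict, deque
from typing import Any, Deque, Dict


class MovingAverageCallback:
    """Hypothetical async callable tracking a moving average of closes."""

    def __init__(self, window: int = 20) -> None:
        self.window = window
        # One bounded deque of recent closes per symbol.
        self.closes: Dict[str, Deque[float]] = defaultdict(lambda: deque(maxlen=window))
        self.latest: Dict[str, float] = {}

    async def __call__(self, data: Dict[str, Any]) -> None:
        if not data.get("is_closed"):
            return  # ignore in-progress candle updates
        closes = self.closes[data["symbol"]]
        closes.append(float(data["close"]))
        self.latest[data["symbol"]] = sum(closes) / len(closes)
```

It would be registered like the functions above, e.g. callback=MovingAverageCallback(window=20) in add_kline_subscription.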
Best Practices
Historical Data Collection
- Start with small date ranges to test before downloading large amounts of data
- Use resume capability for large downloads to recover from failures
- Monitor rate limits - the tool automatically handles rate limiting, but be aware of exchange limits
- Use appropriate intervals - smaller intervals generate more data
- Regular backfills - periodically backfill data to fill any gaps
Real-Time Data Collection
- Monitor connection health - the manager automatically restarts failed connections
- Use appropriate update speeds - 100ms for high-frequency use, 1000ms for less critical data
- Be selective - only stream data you actually need
- Database management - consider data retention policies for high-frequency data
- Resource monitoring - WebSocket connections consume resources, so monitor your system
Database Management
- Regular cleanup - implement data retention policies
- Index optimization - ensure proper indexes for your queries
- Archival strategy - move old data to cold storage
- Monitoring - track database growth and query performance
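A retention policy usually keeps fine-grained data for a shorter period than coarse data. The sketch computes a per-interval cutoff; the retention periods are placeholders rather than recommendations from this guide, retention_cutoffs is hypothetical, and the ORM call in the comment assumes the Candlestick fields listed earlier.

```python
from datetime import datetime, timedelta
from typing import Dict

# Placeholder policy: how long to keep candles of each interval.
# Intervals not listed are kept indefinitely.
RETENTION = {
    "1m": timedelta(days=30),
    "5m": timedelta(days=180),
    "1h": timedelta(days=730),
}


def retention_cutoffs(now: datetime,
                      policy: Dict[str, timedelta]) -> Dict[str, datetime]:
    """Hypothetical: rows older than the cutoff may be deleted or archived."""
    return {interval: now - keep for interval, keep in policy.items()}

# With Django, each cutoff could drive a delete such as:
# Candlestick.objects.filter(interval=interval, timestamp__lt=cutoff).delete()
```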
Troubleshooting
Historical Download Issues
Problem: Download is very slow
- Solution: Increase the batch size (up to 1000), check the network connection, and verify the exchange API status.
Problem: Gaps in downloaded data
- Solution: Use resume mode, and check for exchange outages during the affected period.
Problem: Rate limit errors
- Solution: The tool handles rate limiting automatically; if errors persist, reduce the number of parallel downloads.
Real-Time Streaming Issues
Problem: Frequent disconnections
- Solution: Check network stability, verify the exchange WebSocket status, and increase the ping timeout.
Problem: Missing data
- Solution: Check the logs for errors, verify subscriptions, and make sure the database connection is stable.
Problem: High memory usage
- Solution: Reduce the number of symbols and streams, or use --no-save if persistence is not needed.
Database Issues
Problem: Duplicate key errors
- Solution: The data already exists; use overwrite mode or skip duplicates.
Problem: Slow queries
- Solution: Add indexes that match your query patterns, optimize the queries, and consider table partitioning.
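Skipping duplicates amounts to dropping rows whose natural key already exists. The sketch assumes, without confirmation from the source, that a candlestick is identified by (exchange, symbol, interval, timestamp); skip_duplicates and candle_key are hypothetical. In Django, bulk_create(..., ignore_conflicts=True) has a similar effect at the database level when a matching unique constraint exists.

```python
from typing import Any, Dict, Iterable, List, Set, Tuple

Key = Tuple[Any, ...]


def candle_key(row: Dict[str, Any]) -> Key:
    """Hypothetical natural key for a candlestick row."""
    return (row["exchange"], row["symbol"], row["interval"], row["timestamp"])


def skip_duplicates(rows: Iterable[Dict[str, Any]], existing: Set[Key]) -> List[Dict[str, Any]]:
    """Keep rows whose key is neither stored already nor repeated in this batch."""
    seen = set(existing)
    fresh: List[Dict[str, Any]] = []
    for row in rows:
        key = candle_key(row)
        if key not in seen:
            seen.add(key)
            fresh.append(row)
    return fresh
```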
Examples
Example 1: Daily Data Backfill
#!/bin/bash
# Backfill last 90 days of hourly data for top coins
python manage.py download_historical \
--symbols BTCUSDT,ETHUSDT,BNBUSDT,SOLUSDT,ADAUSDT \
--interval 1h \
--days 90 \
--resume
Example 2: Multi-Interval Real-Time Monitoring
#!/bin/bash
# Monitor BTC and ETH with multiple intervals
python manage.py stream_realtime \
--symbols BTCUSDT,ETHUSDT \
--klines --intervals 1m,5m,15m,1h \
--trades \
--depth --depth-speed 1000ms
Example 3: Development Testing (No Database)
#!/bin/bash
# Test streaming without saving to database
python manage.py stream_realtime \
--symbols BTCUSDT \
--klines --intervals 1m \
--no-save
For more information, see:
- TASKS.md - Full task list and status
- CHANGELOG.md - Detailed change history
- PRD.md - Product requirements document