Data Collection Guide

This document describes how to use the historical and real-time data collection features in DSTA.

Historical Data Collection

The download_historical command allows you to download OHLCV (Open, High, Low, Close, Volume) candlestick data from cryptocurrency exchanges.

Basic Usage

Download data for a single symbol:

python manage.py download_historical --symbol BTCUSDT --interval 1h --days 30

Download data for multiple symbols:

python manage.py download_historical --symbols BTCUSDT,ETHUSDT,BNBUSDT --interval 1h --days 30

Date Range Options

Specify exact date range:

python manage.py download_historical --symbol BTCUSDT --interval 1h \
  --start 2024-01-01 --end 2024-12-31

Specify number of days back:

python manage.py download_historical --symbol BTCUSDT --interval 1h --days 90

Advanced Options

Resume from last downloaded timestamp:

python manage.py download_historical --symbol BTCUSDT --interval 1h \
  --days 30 --resume
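
This guide does not say how --resume picks its restart point. As a minimal sketch of one plausible approach, a resumed run could continue from the candle after the newest stored one, never earlier than the requested start. The helper name resume_start is hypothetical and not part of DSTA.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def resume_start(
    requested_start: datetime,
    last_stored: Optional[datetime],
    interval: timedelta,
) -> datetime:
    """Return the timestamp from which a resumed download would continue.

    Hypothetical helper: assumes candles are keyed by their opening time.
    """
    if last_stored is None:
        # Nothing stored yet, so start where the user asked.
        return requested_start
    # Continue with the candle that follows the newest stored one.
    return max(requested_start, last_stored + interval)


requested = datetime(2024, 1, 1, tzinfo=timezone.utc)
last = datetime(2024, 1, 10, 5, tzinfo=timezone.utc)
print(resume_start(requested, last, timedelta(hours=1)))
```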

Overwrite existing data:

python manage.py download_historical --symbol BTCUSDT --interval 1h \
  --days 30 --overwrite

Custom batch size:

python manage.py download_historical --symbol BTCUSDT --interval 1h \
  --days 30 --batch-size 500
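
To see what a batch size means for a download, the sketch below splits a date range into request windows of at most batch-size candles each. It is an illustration only: batch_windows is a hypothetical helper, and the command's actual batching logic may differ.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Tuple


def batch_windows(
    start: datetime,
    end: datetime,
    interval: timedelta,
    batch_size: int,
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield half-open [window_start, window_end) ranges.

    Each window spans at most batch_size candles of the given interval.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    step = interval * batch_size
    cursor = start
    while cursor < end:
        window_end = min(cursor + step, end)
        yield cursor, window_end
        cursor = window_end


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = start + timedelta(days=30)
windows = list(batch_windows(start, end, timedelta(hours=1), 500))
# 30 days of 1h candles is 720 candles, so 500 per batch needs 2 windows.
print(len(windows))
```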

Quiet mode (suppress progress output):

python manage.py download_historical --symbol BTCUSDT --interval 1h \
  --days 30 --quiet

Configuration File

For complex configurations, use a JSON config file:

{
  "symbols": "BTCUSDT,ETHUSDT,BNBUSDT",
  "interval": "1h",
  "days": 90,
  "batch_size": 1000,
  "resume": true
}

Run with config file:

python manage.py download_historical --config config.json
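
If config files are generated by a script, it can help to check the values before writing them. The sketch below builds the same JSON structure shown above and rejects intervals outside the supported list. build_config is a hypothetical helper; whether the command itself validates these fields is not stated in this guide.

```python
import json
import tempfile
from pathlib import Path

# Intervals taken from the "Supported Intervals" list in this guide.
SUPPORTED_INTERVALS = {
    "1m", "3m", "5m", "15m", "30m",
    "1h", "2h", "4h", "6h", "8h", "12h",
    "1d", "3d", "1w", "1M",
}


def build_config(symbols, interval, days, batch_size=1000, resume=True):
    """Return a dict with the same keys as the example config file."""
    if not symbols:
        raise ValueError("at least one symbol is required")
    if interval not in SUPPORTED_INTERVALS:
        raise ValueError(f"unsupported interval: {interval}")
    return {
        "symbols": ",".join(symbols),
        "interval": interval,
        "days": days,
        "batch_size": batch_size,
        "resume": resume,
    }


config = build_config(["BTCUSDT", "ETHUSDT", "BNBUSDT"], "1h", 90)

# Write the file and read it back to confirm it round-trips as JSON.
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "config.json"
    path.write_text(json.dumps(config, indent=2), encoding="utf-8")
    loaded = json.loads(path.read_text(encoding="utf-8"))

print(json.dumps(loaded, indent=2))
```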

Supported Intervals

  • 1m, 3m, 5m, 15m, 30m - Minutes
  • 1h, 2h, 4h, 6h, 8h, 12h - Hours
  • 1d, 3d - Days
  • 1w - Week
  • 1M - Month
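
The interval choice largely determines how much data a download produces. As a rough sizing aid, the sketch below estimates the number of candles per symbol for a given interval and number of days. expected_candles is a hypothetical helper; it leaves out 1M because calendar months have no fixed length.

```python
import math

# Fixed-length intervals from the list above, in seconds.
INTERVAL_SECONDS = {
    "1m": 60, "3m": 180, "5m": 300, "15m": 900, "30m": 1800,
    "1h": 3600, "2h": 7200, "4h": 14400, "6h": 21600,
    "8h": 28800, "12h": 43200,
    "1d": 86400, "3d": 259200, "1w": 604800,
}


def expected_candles(interval: str, days: int) -> int:
    """Estimate how many candles one symbol yields over `days` days."""
    if interval not in INTERVAL_SECONDS:
        raise ValueError(f"no fixed length known for interval: {interval}")
    return math.ceil(days * 86400 / INTERVAL_SECONDS[interval])


for interval in ("1m", "1h", "1d"):
    print(interval, expected_candles(interval, 30))
```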

Exchange Support

Currently supported exchanges:

  • Binance (default)

Specify exchange:

python manage.py download_historical --symbol BTCUSDT --interval 1h \
  --days 30 --exchange binance


Real-Time Data Collection

The stream_realtime command allows you to stream live market data from cryptocurrency exchanges.

Basic Usage

Stream candlestick (kline) data:

python manage.py stream_realtime --symbols BTCUSDT --klines --intervals 1m,5m

Stream trade data:

python manage.py stream_realtime --symbols BTCUSDT,ETHUSDT --trades

Stream order book depth:

python manage.py stream_realtime --symbols BTCUSDT --depth

Multiple Stream Types

Stream multiple data types simultaneously:

python manage.py stream_realtime --symbols BTCUSDT,ETHUSDT \
  --klines --intervals 1m,5m,1h \
  --trades \
  --depth

Advanced Options

Custom depth update speed:

python manage.py stream_realtime --symbols BTCUSDT --depth \
  --depth-speed 100ms

Options: 100ms (default) or 1000ms
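
For orientation, the sketch below shows how these options could translate into Binance stream names, following Binance's public spot WebSocket naming (symbol@kline_interval, symbol@trade, symbol@depth and symbol@depth@100ms). binance_stream_names is a hypothetical helper; how stream_realtime builds its subscriptions internally is an assumption, not something this guide documents.

```python
from typing import Iterable, List


def binance_stream_names(
    symbols: Iterable[str],
    klines: bool = False,
    intervals: Iterable[str] = (),
    trades: bool = False,
    depth: bool = False,
    depth_speed: str = "100ms",
) -> List[str]:
    """Build Binance spot stream names for the selected data types."""
    if depth_speed not in ("100ms", "1000ms"):
        raise ValueError("depth_speed must be '100ms' or '1000ms'")
    intervals = list(intervals)
    names: List[str] = []
    for symbol in symbols:
        s = symbol.lower()  # Binance stream names use lowercase symbols
        if klines:
            names.extend(f"{s}@kline_{i}" for i in intervals)
        if trades:
            names.append(f"{s}@trade")
        if depth:
            # On Binance, plain "@depth" is the 1000ms diff-depth stream.
            suffix = "@depth@100ms" if depth_speed == "100ms" else "@depth"
            names.append(f"{s}{suffix}")
    return names


print(binance_stream_names(
    ["BTCUSDT"], klines=True, intervals=["1m", "5m"], depth=True
))
```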

Stream without saving to database:

python manage.py stream_realtime --symbols BTCUSDT --klines \
  --intervals 1m --no-save

Configuration file:

{
  "symbols": "BTCUSDT,ETHUSDT",
  "klines": true,
  "intervals": "1m,5m,1h",
  "trades": true,
  "depth": true,
  "depth_speed": "100ms"
}

Run with config file:

python manage.py stream_realtime --config config.json

Graceful Shutdown

To stop streaming, press Ctrl+C. The application will gracefully shut down all WebSocket connections and save any pending data.
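
The same behaviour can be reproduced in your own asyncio scripts. The sketch below shows one common pattern, not necessarily the one stream_realtime uses: wait on an event that SIGINT or SIGTERM sets, and run cleanup in a finally block. run_until_signal and its start/stop arguments are hypothetical names; the demo sets the event from a timer instead of a real signal.

```python
import asyncio
import signal


async def run_until_signal(start, stop, stop_event=None):
    """Call start(), wait until stop_event is set, then always call stop()."""
    if stop_event is None:
        stop_event = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, stop_event.set)
        except (NotImplementedError, RuntimeError):
            # Not available on Windows or outside the main thread.
            pass
    await start()
    try:
        await stop_event.wait()
    finally:
        # Runs on normal stop and on cancellation alike.
        await stop()


async def demo():
    events = []

    async def start():
        events.append("started")

    async def stop():
        events.append("stopped")

    stop_event = asyncio.Event()
    # Simulate Ctrl+C shortly after startup.
    asyncio.get_running_loop().call_later(0.05, stop_event.set)
    await run_until_signal(start, stop, stop_event)
    return events


print(asyncio.run(demo()))
```

With the WebSocketManager described later in this guide, start and stop would wrap manager.start() plus manager.start_streaming(), and manager.stop().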

Exchange Support

Currently supported exchanges:

  • Binance (default)

Specify exchange:

python manage.py stream_realtime --symbols BTCUSDT --klines \
  --intervals 1m --exchange binance


Data Storage

All downloaded data is stored automatically in the PostgreSQL database. Streamed data is stored as well, unless --no-save is passed.

Database Models

Candlestick: OHLCV data

  • exchange: Exchange name
  • symbol: Trading pair (e.g., 'BTCUSDT')
  • interval: Timeframe (e.g., '1h', '1d')
  • timestamp: Opening time
  • open_price, high_price, low_price, close_price: Price data
  • volume: Trading volume
  • quote_volume: Quote asset volume
  • trades_count: Number of trades

Trade: Individual trade executions

  • exchange: Exchange name
  • symbol: Trading pair
  • trade_id: Unique trade ID
  • timestamp: Execution time
  • price: Trade price
  • quantity: Trade quantity
  • is_buyer_maker: Whether buyer was maker

OrderBook: Order book snapshots

  • exchange: Exchange name
  • symbol: Trading pair
  • timestamp: Snapshot time
  • bids: Bid levels (price, quantity)
  • asks: Ask levels (price, quantity)
  • last_update_id: Update ID for tracking
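
To relate the Candlestick fields to what the exchange sends, the sketch below maps one row of Binance's REST kline response (a positional array) onto those field names. kline_to_candlestick_fields is a hypothetical helper, the sample row uses made-up illustrative values rather than real market data, and the project's own conversion code may differ.

```python
from datetime import datetime, timezone
from decimal import Decimal


def kline_to_candlestick_fields(row, exchange, symbol, interval):
    """Map a Binance REST kline array onto the Candlestick field names.

    Binance row layout: [open time (ms), open, high, low, close, volume,
    close time (ms), quote asset volume, number of trades, ...].
    """
    return {
        "exchange": exchange,
        "symbol": symbol,
        "interval": interval,
        "timestamp": datetime.fromtimestamp(row[0] / 1000, tz=timezone.utc),
        "open_price": Decimal(row[1]),
        "high_price": Decimal(row[2]),
        "low_price": Decimal(row[3]),
        "close_price": Decimal(row[4]),
        "volume": Decimal(row[5]),
        "quote_volume": Decimal(row[7]),
        "trades_count": int(row[8]),
    }


# Illustrative values only, not real market data.
sample_row = [
    1704067200000, "100.0", "110.0", "95.0", "105.0", "12.5",
    1704070799999, "1300.0", 42, "6.0", "620.0", "0",
]
fields = kline_to_candlestick_fields(sample_row, "binance", "BTCUSDT", "1h")
print(fields["timestamp"], fields["close_price"], fields["trades_count"])
```

Prices are kept as Decimal here to avoid float rounding; the column types used by the actual models are not specified in this guide.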

Querying Data

Using Django ORM:

from api.models import Candlestick
from datetime import datetime, timezone

# Get recent candlesticks
candles = Candlestick.objects.filter(
    exchange='binance',
    symbol='BTCUSDT',
    interval='1h'
).order_by('-timestamp')[:100]

# Get data for a specific date range (all of 2024).
# The end bound is exclusive: midnight on 2024-12-31 with __lte would
# drop every candle after 00:00 on the last day.
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2025, 1, 1, tzinfo=timezone.utc)

candles = Candlestick.objects.filter(
    exchange='binance',
    symbol='BTCUSDT',
    interval='1h',
    timestamp__gte=start,
    timestamp__lt=end
).order_by('timestamp')


Programmatic Usage

Using BinanceClient Directly

from api.exchanges.binance_client import BinanceClient
from datetime import datetime, timezone, timedelta

# Initialize client
client = BinanceClient()

# Get candlesticks
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7)

candlesticks = client.get_candlesticks(
    symbol='BTCUSDT',
    interval='1h',
    start_time=start_time,
    end_time=end_time,
    limit=500
)

for candle in candlesticks:
    print(f"{candle['timestamp']}: O:{candle['open']} C:{candle['close']}")

Using WebSocketManager

import asyncio
from api.websockets.manager import WebSocketManager

async def main():
    # Initialize manager
    manager = WebSocketManager(exchange='binance')

    # Add subscriptions
    manager.add_kline_subscription('BTCUSDT', '1m', save_to_db=True)
    manager.add_trade_subscription('ETHUSDT', save_to_db=True)

    # Start manager
    await manager.start()

    # Start streaming
    await manager.start_streaming()

    try:
        # Run for some time
        await asyncio.sleep(3600)  # 1 hour
    finally:
        # Stop, even if the sleep is interrupted
        await manager.stop()

# Run
asyncio.run(main())

Custom Callbacks

import asyncio
from api.websockets.manager import WebSocketManager

async def my_kline_callback(data):
    """Custom callback for kline data."""
    if data.get('is_closed'):
        print(f"Closed candle: {data['symbol']} @ {data['close']}")

async def my_trade_callback(data):
    """Custom callback for trade data."""
    print(f"Trade: {data['symbol']} {data['price']} x {data['quantity']}")

async def main():
    manager = WebSocketManager(exchange='binance')

    # Add subscriptions with custom callbacks
    manager.add_kline_subscription(
        'BTCUSDT',
        '1m',
        save_to_db=True,
        callback=my_kline_callback
    )

    manager.add_trade_subscription(
        'ETHUSDT',
        save_to_db=True,
        callback=my_trade_callback
    )

    await manager.start()
    await manager.start_streaming()

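    # Keep running until interrupted. Under asyncio.run(), Ctrl+C reaches
    # this coroutine as a cancellation rather than a KeyboardInterrupt,
    # so clean up in `finally` instead of `except KeyboardInterrupt`.
    try:
        while True:
            await asyncio.sleep(1)
    finally:
        await manager.stop()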

asyncio.run(main())
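
Callbacks can also keep state between messages. The sketch below tracks a running volume-weighted average price (VWAP) per symbol from trade payloads. VwapTracker is a hypothetical class; it assumes the 'symbol', 'price' and 'quantity' keys used in the trade callback above, and converts values through str because the payload's numeric types are not specified here.

```python
import asyncio
from collections import defaultdict
from decimal import Decimal


class VwapTracker:
    """Accumulate price * quantity and quantity per symbol."""

    def __init__(self):
        self._notional = defaultdict(Decimal)  # Decimal() is 0
        self._quantity = defaultdict(Decimal)

    async def on_trade(self, data):
        """Trade callback: update running totals for the trade's symbol."""
        price = Decimal(str(data["price"]))
        quantity = Decimal(str(data["quantity"]))
        self._notional[data["symbol"]] += price * quantity
        self._quantity[data["symbol"]] += quantity

    def vwap(self, symbol):
        """Return the VWAP for a symbol, or None if no volume was seen."""
        total = self._quantity.get(symbol, Decimal(0))
        if total == 0:
            return None
        return self._notional[symbol] / total


async def demo():
    tracker = VwapTracker()
    # Illustrative trades, not real market data.
    await tracker.on_trade({"symbol": "ETHUSDT", "price": "100", "quantity": "1"})
    await tracker.on_trade({"symbol": "ETHUSDT", "price": "200", "quantity": "3"})
    return tracker.vwap("ETHUSDT")


print(asyncio.run(demo()))
```

To use it with the manager, create a tracker and pass callback=tracker.on_trade to add_trade_subscription.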

Best Practices

Historical Data Collection

  1. Start with small date ranges to test before downloading large amounts of data
  2. Use resume capability for large downloads to recover from failures
  3. Monitor rate limits - the tool automatically handles rate limiting, but be aware of exchange limits
  4. Use appropriate intervals - smaller intervals generate more data
  5. Regular backfills - periodically backfill data to fill any gaps

Real-Time Data Collection

  1. Monitor connection health - the manager automatically restarts failed connections
  2. Use appropriate update speeds - 100ms for high-frequency, 1000ms for less critical
  3. Be selective - only stream data you actually need
  4. Database management - consider data retention policies for high-frequency data
  5. Resource monitoring - WebSocket connections consume resources, monitor your system

Database Management

  1. Regular cleanup - implement data retention policies
  2. Index optimization - ensure proper indexes for your queries
  3. Archival strategy - move old data to cold storage
  4. Monitoring - track database growth and query performance
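
A retention policy can start as a simple table of how long each interval is kept. The sketch below computes the deletion cutoff per interval. The policy values and the retention_cutoff helper are hypothetical examples, not DSTA defaults.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional

# Hypothetical policy: keep fine-grained data for less time.
RETENTION: Dict[str, timedelta] = {
    "1m": timedelta(days=30),
    "1h": timedelta(days=365),
}


def retention_cutoff(
    interval: str,
    now: datetime,
    policy: Dict[str, timedelta] = RETENTION,
) -> Optional[datetime]:
    """Return the oldest timestamp to keep, or None to keep everything."""
    keep_for = policy.get(interval)
    if keep_for is None:
        return None
    return now - keep_for


now = datetime(2024, 6, 1, tzinfo=timezone.utc)
for interval in ("1m", "1h", "1d"):
    print(interval, retention_cutoff(interval, now))
```

With the Django ORM, rows older than a cutoff could then be removed with Candlestick.objects.filter(interval=interval, timestamp__lt=cutoff).delete(), ideally after archiving them.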

Troubleshooting

Historical Download Issues

Problem: Download is very slow
Solution: Increase batch size (up to 1000), check network connection, verify exchange API status

Problem: Data gaps in downloaded data
Solution: Use resume mode, check for exchange outages during that time period

Problem: Rate limit errors
Solution: Tool automatically handles rate limiting, but reduce parallel downloads if needed
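
To confirm whether downloaded data actually has gaps, stored candle timestamps can be compared against the expected spacing. The sketch below is a minimal, hypothetical helper (find_gaps) that assumes timestamps are candle opening times.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Tuple


def find_gaps(
    timestamps: Iterable[datetime],
    interval: timedelta,
) -> List[Tuple[datetime, datetime]]:
    """Return (first_missing, last_missing) opening times for each gap."""
    ordered = sorted(timestamps)
    gaps = []
    for previous, current in zip(ordered, ordered[1:]):
        if current - previous > interval:
            gaps.append((previous + interval, current - interval))
    return gaps


base = datetime(2024, 1, 1, tzinfo=timezone.utc)
# Hours 3 and 4 are missing from this illustrative series.
stored = [base + timedelta(hours=h) for h in (0, 1, 2, 5, 6)]
print(find_gaps(stored, timedelta(hours=1)))
```

The timestamps could come from a query such as Candlestick.objects.filter(...).values_list('timestamp', flat=True); each reported range can then be re-downloaded with --start and --end.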

Real-Time Streaming Issues

Problem: Frequent disconnections
Solution: Check network stability, verify exchange WebSocket status, increase ping timeout

Problem: Missing data
Solution: Check logs for errors, verify subscriptions, ensure database connection is stable

Problem: High memory usage
Solution: Reduce number of symbols/streams, enable no-save mode if persistence not needed

Database Issues

Problem: Duplicate key errors
Solution: Data already exists, use overwrite mode or skip duplicates

Problem: Slow queries
Solution: Ensure proper indexes, optimize query patterns, consider partitioning


Examples

Example 1: Daily Data Backfill

#!/bin/bash
# Backfill last 90 days of hourly data for top coins

python manage.py download_historical \
  --symbols BTCUSDT,ETHUSDT,BNBUSDT,SOLUSDT,ADAUSDT \
  --interval 1h \
  --days 90 \
  --resume

Example 2: Multi-Interval Real-Time Monitoring

#!/bin/bash
# Monitor BTC and ETH with multiple intervals

python manage.py stream_realtime \
  --symbols BTCUSDT,ETHUSDT \
  --klines --intervals 1m,5m,15m,1h \
  --trades \
  --depth --depth-speed 1000ms

Example 3: Development Testing (No Database)

#!/bin/bash
# Test streaming without saving to database

python manage.py stream_realtime \
  --symbols BTCUSDT \
  --klines --intervals 1m \
  --no-save

For more information, see:

  • TASKS.md - Full task list and status
  • CHANGELOG.md - Detailed change history
  • PRD.md - Product requirements document