
Caching Strategy Guide

This document describes the caching architecture and strategies for the DSTA trading platform.

Overview

The DSTA caching layer uses Redis to improve performance by reducing:

  • Database queries
  • External API calls
  • Expensive computations (indicators, analytics)
  • Response times

Architecture

Application Layer
         │
         ▼
┌─────────────────┐
│ Cache Manager   │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  Redis Server   │
│   (Port 6379)   │
└─────────────────┘

Quick Start

Basic Usage

from infrastructure.cache import cache_manager, cached

# Manual caching
cache_manager.set('my_key', 'my_value', ttl=300)
value = cache_manager.get('my_key')

# Decorator-based caching
@cached(ttl=300, key_prefix='api')
def get_market_data(symbol):
    # This result will be cached for 5 minutes
    return fetch_from_exchange(symbol)

# Call the function
data = get_market_data('BTCUSDT')  # Fetches from exchange
data = get_market_data('BTCUSDT')  # Returns from cache
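
To make the decorator's behaviour concrete, here is a minimal sketch of how a TTL-based caching decorator could work. It is not the DSTA implementation: cached_sketch, the in-memory _store dict (standing in for Redis), and get_price are hypothetical names used only for illustration.

```python
import functools
import time

# Hypothetical in-memory stand-in for Redis: key -> (expires_at, value)
_store = {}


def cached_sketch(ttl, key_prefix="", clock=time.monotonic):
    """Cache a function's return value for `ttl` seconds (illustrative only)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Build a key from the prefix, function name, and call arguments.
            key = f"{key_prefix}:{func.__name__}:{args!r}:{sorted(kwargs.items())!r}"
            entry = _store.get(key)
            if entry is not None and entry[0] > clock():
                return entry[1]  # cache hit: entry has not expired yet
            value = func(*args, **kwargs)  # cache miss: compute, then store
            _store[key] = (clock() + ttl, value)
            return value
        return wrapper
    return decorator


calls = []


@cached_sketch(ttl=300, key_prefix="api")
def get_price(symbol):
    calls.append(symbol)  # record each real invocation
    return {"symbol": symbol, "price": 100.0}


first = get_price("BTCUSDT")   # miss: runs the function body
second = get_price("BTCUSDT")  # hit: served from _store
```

The second call never reaches the function body, which is the effect the two get_market_data calls above rely on.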

Pre-defined Strategies

from infrastructure.cache import CacheStrategies

# API response caching (60 seconds)
@CacheStrategies.api_response_cache(ttl=60)
def get_ticker(symbol):
    return exchange.get_ticker(symbol)

# Database query caching (5 minutes)
@CacheStrategies.database_query_cache(ttl=300)
def get_candlesticks(symbol, interval):
    return Candlestick.objects.filter(
        symbol=symbol,
        interval=interval
    ).order_by('-timestamp')[:100]

# Indicator caching (1 hour)
@CacheStrategies.indicator_cache(ttl=3600)
def calculate_rsi(symbol, period=14):
    return compute_rsi(symbol, period)

# Session caching (30 minutes)
@CacheStrategies.session_cache(ttl=1800)
def get_user_preferences(user_id):
    return UserPreferences.objects.get(user_id=user_id)

Cache Strategies

1. API Response Caching

Cache external API responses to stay within provider rate limits and improve response times.

Use Cases:

  • Market data (ticker, orderbook)
  • Exchange metadata (symbols, trading rules)
  • Third-party APIs (news, sentiment)

Configuration:

@cached(ttl=30, key_prefix='api')
def get_binance_ticker(symbol):
    """Cache ticker data for 30 seconds (upper end of the ticker TTL range)."""
    client = get_binance_client()
    return client.get_ticker(symbol=symbol)

TTL Recommendations:

  • Ticker data: 5-30 seconds
  • Orderbook depth: 10-60 seconds
  • Exchange info: 1 hour to 1 day
  • News/sentiment: 5-15 minutes

2. Database Query Caching

Cache frequently accessed database queries.

Use Cases:

  • Historical candlestick data
  • User settings and preferences
  • Trading strategies
  • Backtesting results

Configuration:

@cached(ttl=300, key_prefix='db')
def get_historical_data(symbol, interval, limit=100):
    """Cache historical data for 5 minutes."""
    return Candlestick.objects.filter(
        symbol=symbol,
        interval=interval
    ).order_by('-timestamp')[:limit]

TTL Recommendations:

  • Historical data: 5-30 minutes
  • User preferences: 10-60 minutes
  • System settings: 1-24 hours
  • Static data: 1 day

3. Indicator Calculation Caching

Cache expensive indicator calculations.

Use Cases:

  • Technical indicators (RSI, MACD, Bollinger Bands)
  • Custom analytics
  • ML model predictions
  • Risk calculations

Configuration:

@cached(ttl=3600, key_prefix='indicator')
def calculate_macd(symbol, interval, fast=12, slow=26, signal=9):
    """Cache MACD calculation for 1 hour."""
    data = get_candlestick_data(symbol, interval)
    return compute_macd(data, fast, slow, signal)

TTL Recommendations:

  • Intraday indicators (1m, 5m): 5-15 minutes
  • Hourly indicators: 30-60 minutes
  • Daily indicators: 4-24 hours
  • Weekly/Monthly: 1-7 days
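
The recommendations above can be turned into a small lookup so that the TTL follows the candle interval instead of being hard-coded per function. This is only a sketch: indicator_ttl and TTL_BY_UNIT are hypothetical names, the interval strings are assumed to end in a unit letter ('m', 'h', 'd', 'w', 'M'), and the values are the lower bounds of the recommended ranges.

```python
MINUTE, HOUR, DAY = 60, 3600, 86400

# Lower bounds of the recommended ranges, in seconds (a conservative choice).
TTL_BY_UNIT = {
    "m": 5 * MINUTE,   # intraday candles (1m, 5m, ...)
    "h": 30 * MINUTE,  # hourly candles
    "d": 4 * HOUR,     # daily candles
    "w": 1 * DAY,      # weekly candles
    "M": 1 * DAY,      # monthly candles (capital M, distinct from minutes)
}


def indicator_ttl(interval):
    """Return a TTL in seconds for an interval string such as '5m' or '1d'."""
    unit = interval[-1:]  # last character is assumed to be the unit
    if unit not in TTL_BY_UNIT:
        raise ValueError(f"Unsupported interval: {interval!r}")
    return TTL_BY_UNIT[unit]


ttl_5m = indicator_ttl("5m")
ttl_1d = indicator_ttl("1d")
```

Picking the lower bound trades some hit rate for fresher indicators; the upper bounds are equally valid for less latency-sensitive consumers.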

4. Session Caching

Cache user session data and temporary states.

Use Cases:

  • User authentication tokens
  • Active positions
  • Temporary calculations
  • Wizard/form state

Configuration:

@cached(ttl=1800, key_prefix='session')
def get_active_positions(user_id):
    """Cache active positions for 30 minutes."""
    return Position.objects.filter(
        user_id=user_id,
        status='active'
    )

TTL Recommendations:

  • Auth tokens: 30-60 minutes
  • Active positions: 10-30 minutes
  • Form state: 5-15 minutes

Advanced Features

Custom Key Builder

def build_cache_key(symbol, interval, limit=100, **kwargs):
    """Custom cache key builder.

    Includes every argument that changes the result, so calls with
    different limits do not share a cache entry.
    """
    return f"{symbol}:{interval}:{limit}"

@cached(
    ttl=300,
    key_prefix='candles',
    key_builder=build_cache_key
)
def get_candlesticks(symbol, interval, limit=100):
    return fetch_candlesticks(symbol, interval, limit)
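
When no key builder is supplied, a decorator still has to derive a key from the call arguments. One possible approach, sketched below, is to hash a canonical JSON form of the arguments so that keyword order does not matter and keys stay short. make_cache_key is a hypothetical helper, and nothing here is claimed to match how DSTA's cached decorator builds its default keys.

```python
import hashlib
import json


def make_cache_key(prefix, func_name, args, kwargs):
    """Derive a deterministic cache key from a function call (illustrative only)."""
    # Sort keyword arguments so f(a=1, b=2) and f(b=2, a=1) map to the same key.
    payload = json.dumps(
        [list(args), sorted(kwargs.items())],
        default=str,      # fall back to str() for values JSON cannot encode
        sort_keys=True,   # canonical ordering for any nested dicts
    )
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]
    return f"{prefix}:{func_name}:{digest}"


k1 = make_cache_key("candles", "get_candlesticks", ("BTCUSDT",), {"interval": "1h", "limit": 100})
k2 = make_cache_key("candles", "get_candlesticks", ("BTCUSDT",), {"limit": 100, "interval": "1h"})
k3 = make_cache_key("candles", "get_candlesticks", ("BTCUSDT",), {"interval": "1h", "limit": 500})
```

Hashed keys cannot be matched by symbol with a glob pattern, which is one reason to prefer a readable custom key builder for data that needs pattern-based invalidation.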

Conditional Caching

@cached(
    ttl=300,
    condition=lambda result: result is not None and len(result) > 0
)
def get_trades(symbol):
    """Only cache non-empty results."""
    return Trade.objects.filter(symbol=symbol)

Manual Cache Management

from infrastructure.cache import cache_manager

# Set with custom TTL
cache_manager.set('key', 'value', ttl=600, prefix='custom')

# Get with default value
value = cache_manager.get('key', default='default', prefix='custom')

# Check existence
if cache_manager.exists('key', prefix='custom'):
    print("Key exists")

# Get TTL
remaining = cache_manager.get_ttl('key', prefix='custom')
print(f"Expires in {remaining} seconds")

# Delete specific key
cache_manager.delete('key', prefix='custom')

# Delete by pattern
cache_manager.delete_pattern('user:*', prefix='session')

# Increment counter
cache_manager.increment('api_calls', amount=1, prefix='stats')

Cache Invalidation

Symbol-based Invalidation

from infrastructure.cache import CacheInvalidator

# Invalidate all data for a symbol
CacheInvalidator.invalidate_symbol_data('BTCUSDT')
# Deletes: api:*BTCUSDT*, db:*BTCUSDT*, indicator:*BTCUSDT*
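
The glob patterns in the comment above can be illustrated with a small in-memory sketch. delete_matching and invalidate_symbol are hypothetical helpers operating on a plain dict; against a real Redis server the same idea is usually implemented by iterating keys incrementally (for example with redis-py's scan_iter) rather than with the blocking KEYS command.

```python
from fnmatch import fnmatchcase


def delete_matching(store, pattern):
    """Delete every key in `store` matching a glob pattern; return the count."""
    doomed = [key for key in store if fnmatchcase(key, pattern)]
    for key in doomed:
        del store[key]
    return len(doomed)


def invalidate_symbol(store, symbol, prefixes=("api", "db", "indicator")):
    """Remove all cached entries for one symbol across the given prefixes."""
    return sum(delete_matching(store, f"{prefix}:*{symbol}*") for prefix in prefixes)


store = {
    "api:ticker:BTCUSDT": 1,
    "db:candles:BTCUSDT:1h": 2,
    "indicator:rsi:BTCUSDT:14": 3,
    "api:ticker:ETHUSDT": 4,
    "session:user:1": 5,
}
removed = invalidate_symbol(store, "BTCUSDT")
```

Only the three BTCUSDT entries under the listed prefixes are removed; the ETHUSDT ticker and the session entry are left in place.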

Exchange-based Invalidation

# Invalidate all data for an exchange
CacheInvalidator.invalidate_exchange_data('binance')

User Session Invalidation

# Invalidate user session on logout
CacheInvalidator.invalidate_user_session(user_id=123)

Custom Invalidation

# Invalidate specific patterns
cache_manager.delete_pattern('api:market:*')
cache_manager.delete_pattern('indicator:rsi:*')

Cache Warming

Pre-populate cache with frequently accessed data during startup or scheduled tasks.

from infrastructure.cache import CacheWarmer

# Warm market data cache
symbols = ['BTCUSDT', 'ETHUSDT', 'BNBUSDT']
CacheWarmer.warm_market_data(symbols)

# Warm indicator cache
indicators = ['rsi', 'macd', 'bollinger']
CacheWarmer.warm_indicators(symbols, indicators)

Celery Task Example:

from celery import shared_task
from infrastructure.cache import CacheWarmer

@shared_task
def warm_cache():
    """Scheduled cache warming task."""
    top_symbols = get_top_trading_symbols(limit=50)
    CacheWarmer.warm_market_data(top_symbols)
    CacheWarmer.warm_indicators(top_symbols, ['rsi', 'macd'])

Monitoring

Cache Statistics

# Get cache statistics
stats = cache_manager.get_stats()
print(stats)
# Output:
# {
#     'hits': 1000,
#     'misses': 100,
#     'sets': 150,
#     'deletes': 10,
#     'total_requests': 1100,
#     'hit_rate': '90.91%'
# }

# Reset statistics
cache_manager.clear_stats()
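
For reference, the derived fields in the output above follow directly from the raw counters: total_requests is hits plus misses, and hit_rate is hits divided by that total. A minimal sketch, where summarize_stats is a hypothetical helper and not part of the cache manager:

```python
def summarize_stats(hits, misses, sets=0, deletes=0):
    """Compute total_requests and a formatted hit_rate from raw counters."""
    total = hits + misses
    # Guard against division by zero before any request has been served.
    rate = (hits / total * 100) if total else 0.0
    return {
        "hits": hits,
        "misses": misses,
        "sets": sets,
        "deletes": deletes,
        "total_requests": total,
        "hit_rate": f"{rate:.2f}%",
    }


stats_example = summarize_stats(hits=1000, misses=100, sets=150, deletes=10)
```

With the counters from the sample output, this reproduces total_requests of 1100 and a hit_rate of '90.91%'.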

Function-level Statistics

@cached(ttl=300, key_prefix='api')
def get_data(symbol):
    return fetch_data(symbol)

# Get cache info for specific function
info = get_data.cache_info()

# Clear cache for specific function
get_data.cache_clear()

Health Checks

def check_cache_health():
    """Health check for Redis cache."""
    try:
        # Test set and get
        test_key = 'health:check'
        cache_manager.set(test_key, 'ok', ttl=10)
        value = cache_manager.get(test_key)
        cache_manager.delete(test_key)

        if value == 'ok':
            return {'status': 'healthy', 'backend': 'redis'}
        else:
            return {'status': 'unhealthy', 'error': 'Data mismatch'}
    except Exception as e:
        return {'status': 'unhealthy', 'error': str(e)}

Performance Tuning

Redis Configuration

In docker-compose.prod.yml:

redis:
  command: >
    redis-server
    --maxmemory 2gb
    --maxmemory-policy allkeys-lru
    --appendonly yes
    --appendfsync everysec
    --save 900 1
    --save 300 10
    --save 60 10000

Eviction Policies:

  • allkeys-lru: Evict the least recently used keys, regardless of TTL (recommended)
  • volatile-lru: Evict the least recently used keys among those with a TTL set
  • allkeys-random: Evict random keys
  • volatile-ttl: Evict keys with the shortest remaining TTL first
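
To illustrate what least-recently-used eviction means in practice, the sketch below implements an exact LRU cache bounded by item count. LRUSketch is a hypothetical class for illustration only; Redis itself bounds memory in bytes (maxmemory) and uses an approximated LRU based on key sampling rather than an exact ordering.

```python
from collections import OrderedDict


class LRUSketch:
    """Exact LRU cache bounded by number of items (illustrative only)."""

    def __init__(self, max_items):
        self.max_items = max_items
        self._data = OrderedDict()  # oldest access first, newest last

    def get(self, key, default=None):
        if key not in self._data:
            return default
        self._data.move_to_end(key)  # reading a key marks it as recently used
        return self._data[key]

    def set(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)  # writing also counts as a recent use
        while len(self._data) > self.max_items:
            self._data.popitem(last=False)  # evict the least recently used key

    def keys(self):
        return list(self._data)


lru = LRUSketch(max_items=2)
lru.set("a", 1)
lru.set("b", 2)
lru.get("a")     # "a" is now more recently used than "b"
lru.set("c", 3)  # exceeds capacity, so "b" is evicted
```

Because "a" was read after "b" was written, "b" is the entry that gets evicted when "c" arrives.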

Connection Pooling

# settings.py
CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': 'redis://redis:6379/0',
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
            'CONNECTION_POOL_KWARGS': {
                'max_connections': 50,
                'retry_on_timeout': True,
            },
            'SOCKET_CONNECT_TIMEOUT': 5,
            'SOCKET_TIMEOUT': 5,
        }
    }
}

Serialization

# With serialize=True, simple JSON-compatible data (dicts, lists, strings) is stored as JSON (faster)
cache_manager.set('key', {'a': 1}, serialize=True)

# With serialize=True, complex Python objects are stored with pickle
cache_manager.set('key', complex_object, serialize=True)
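
Since both calls above pass the same serialize=True flag, the format has to be chosen from the value itself. One way this could work is a JSON-first serializer that falls back to pickle, with a one-byte tag recording which format was used. The functions dumps_sketch and loads_sketch are hypothetical and are not taken from the cache manager's source.

```python
import json
import pickle


def dumps_sketch(value):
    """Serialize with JSON when possible, otherwise fall back to pickle."""
    try:
        return b"j" + json.dumps(value).encode("utf-8")
    except (TypeError, ValueError):
        # json.dumps raises TypeError for unsupported types (sets, custom objects)
        # and ValueError for circular references.
        return b"p" + pickle.dumps(value)


def loads_sketch(blob):
    """Inverse of dumps_sketch: dispatch on the one-byte format tag."""
    tag, body = blob[:1], blob[1:]
    if tag == b"j":
        return json.loads(body.decode("utf-8"))
    if tag == b"p":
        return pickle.loads(body)  # only unpickle data from a trusted cache
    raise ValueError(f"Unknown serialization tag: {tag!r}")


simple_blob = dumps_sketch({"a": 1})
complex_blob = dumps_sketch({1, 2, 3})  # sets are not JSON-serializable
```

Two caveats apply to any scheme like this: a JSON round trip turns tuples into lists and non-string dict keys into strings, and pickle must never be used on data an untrusted party could have written.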

Best Practices

1. Choose Appropriate TTLs

  • Too short: Frequent cache misses, higher load
  • Too long: Stale data, memory waste

Guidelines:

  • Real-time data: 5-60 seconds
  • Frequently updated: 1-15 minutes
  • Moderately updated: 15-60 minutes
  • Rarely updated: 1-24 hours
  • Static data: 1+ days

2. Use Meaningful Prefixes

# Good
@cached(ttl=60, key_prefix='api:binance')
@cached(ttl=300, key_prefix='db:candlesticks')
@cached(ttl=3600, key_prefix='indicator:rsi')

# Bad
@cached(ttl=60, key_prefix='data')
@cached(ttl=300, key_prefix='stuff')

3. Handle Cache Failures Gracefully

import logging

logger = logging.getLogger(__name__)

def get_data(symbol):
    """Always return data, even if cache fails."""
    try:
        cached_data = cache_manager.get(f'data:{symbol}')
        # Compare against None so falsy values (0, [], '') still count as hits
        if cached_data is not None:
            return cached_data
    except Exception as e:
        logger.warning(f"Cache error: {e}")

    # Fetch fresh data if cache miss or error
    data = fetch_from_source(symbol)

    try:
        cache_manager.set(f'data:{symbol}', data, ttl=300)
    except Exception as e:
        logger.warning(f"Cache set error: {e}")

    return data

4. Monitor Cache Hit Rate

Target: > 80% hit rate for frequently accessed data

# Add to monitoring dashboard
stats = cache_manager.get_stats()
hit_rate = float(stats['hit_rate'].rstrip('%'))

if hit_rate < 80:
    logger.warning(f"Low cache hit rate: {hit_rate}%")

5. Invalidate on Updates

def update_candlestick(symbol, interval, data):
    """Update candlestick and invalidate cache."""
    # Update database; update_or_create returns an (object, created) tuple
    candlestick, _created = Candlestick.objects.update_or_create(...)

    # Invalidate related caches
    cache_manager.delete_pattern(f'db:candles:{symbol}:{interval}*')
    cache_manager.delete_pattern(f'indicator:*:{symbol}:{interval}*')

    return candlestick

6. Use Cache Warming for Predictable Traffic

from celery import shared_task

# Warm cache before market open
@shared_task
def pre_market_cache_warming():
    """Warm cache 30 minutes before market opens."""
    symbols = get_active_symbols()
    for symbol in symbols:
        get_ticker(symbol)  # Calling the cached function populates the cache
        get_candlesticks(symbol, '1h')
        calculate_rsi(symbol)

Testing

Unit Tests

# tests/test_cache.py
import pytest
from infrastructure.cache import cache_manager, cached

@pytest.fixture
def clean_cache():
    """Clean cache before each test."""
    cache_manager.flush_all()
    yield
    cache_manager.flush_all()

def test_cache_set_get(clean_cache):
    """Test basic set and get."""
    cache_manager.set('test_key', 'test_value', ttl=60)
    assert cache_manager.get('test_key') == 'test_value'

def test_cache_expiration(clean_cache):
    """Test TTL expiration."""
    cache_manager.set('test_key', 'test_value', ttl=1)
    assert cache_manager.get('test_key') == 'test_value'

    import time
    time.sleep(2)
    assert cache_manager.get('test_key') is None

def test_cached_decorator(clean_cache):
    """Test @cached decorator."""
    call_count = 0

    @cached(ttl=60, key_prefix='test')
    def expensive_function(x):
        nonlocal call_count
        call_count += 1
        return x * 2

    # First call - cache miss
    result1 = expensive_function(5)
    assert result1 == 10
    assert call_count == 1

    # Second call - cache hit
    result2 = expensive_function(5)
    assert result2 == 10
    assert call_count == 1  # Function not called again
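
The expiration test above waits two real seconds. For logic that can be exercised without a Redis server, an injectable clock keeps such tests fast and deterministic. The sketch below assumes a hypothetical in-memory TTLStore and FakeClock; Redis expires keys on the server side, so this technique applies only to an in-memory stand-in, not to the real cache_manager.

```python
class FakeClock:
    """Manually advanced clock for deterministic TTL tests."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now

    def advance(self, seconds):
        self.now += seconds


class TTLStore:
    """In-memory key-value store with per-key expiry (illustrative only)."""

    def __init__(self, clock):
        self._clock = clock
        self._data = {}  # key -> (expires_at, value)

    def set(self, key, value, ttl):
        self._data[key] = (self._clock() + ttl, value)

    def get(self, key, default=None):
        entry = self._data.get(key)
        if entry is None:
            return default
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._data[key]  # lazily drop the expired entry
            return default
        return value


clock = FakeClock()
ttl_store = TTLStore(clock)
ttl_store.set("test_key", "test_value", ttl=1)
before_expiry = ttl_store.get("test_key")
clock.advance(2)  # simulate two seconds passing without sleeping
after_expiry = ttl_store.get("test_key")
```

The same set, read, advance, read sequence as test_cache_expiration runs instantly here, because time only moves when the test advances the clock.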

Troubleshooting

High Memory Usage

# Check Redis memory
docker exec dsta-redis-prod redis-cli INFO memory

# Check keys
docker exec dsta-redis-prod redis-cli DBSIZE

# Find large keys
docker exec dsta-redis-prod redis-cli --bigkeys

Solutions:

  • Reduce TTLs
  • Implement key expiration
  • Increase maxmemory limit
  • Change eviction policy

Low Hit Rate

Causes:

  • TTLs too short
  • Cache warming not implemented
  • Unoptimized cache keys
  • High data volatility

Solutions:

  • Increase TTLs where appropriate
  • Implement cache warming
  • Review key generation logic
  • Add monitoring alerts

Connection Issues

# Test Redis connection
import redis
r = redis.from_url('redis://localhost:6379/0')
r.ping()  # Should return True

Version History

  • 1.0.0 (2025-01-27): Initial caching layer implementation