Caching Strategy Guide¶
This document describes the caching architecture and strategies for the DSTA trading platform.
Overview¶
The DSTA caching layer uses Redis to dramatically improve performance by reducing: - Database queries - External API calls - Expensive computations (indicators, analytics) - Response times
Architecture¶
Application Layer
│
▼
┌─────────────────┐
│ Cache Manager │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Redis Server │
│ (Port 6379) │
└─────────────────┘
Quick Start¶
Basic Usage¶
from infrastructure.cache import cache_manager, cached
# Manual caching
cache_manager.set('my_key', 'my_value', ttl=300)
value = cache_manager.get('my_key')
# Decorator-based caching
@cached(ttl=300, key_prefix='api')
def get_market_data(symbol):
# This result will be cached for 5 minutes
return fetch_from_exchange(symbol)
# Call the function
data = get_market_data('BTCUSDT') # Fetches from exchange
data = get_market_data('BTCUSDT') # Returns from cache
Pre-defined Strategies¶
from infrastructure.cache import CacheStrategies
# API response caching (60 seconds)
@CacheStrategies.api_response_cache(ttl=60)
def get_ticker(symbol):
return exchange.get_ticker(symbol)
# Database query caching (5 minutes)
@CacheStrategies.database_query_cache(ttl=300)
def get_candlesticks(symbol, interval):
return Candlestick.objects.filter(
symbol=symbol,
interval=interval
).order_by('-timestamp')[:100]
# Indicator caching (1 hour)
@CacheStrategies.indicator_cache(ttl=3600)
def calculate_rsi(symbol, period=14):
return compute_rsi(symbol, period)
# Session caching (30 minutes)
@CacheStrategies.session_cache(ttl=1800)
def get_user_preferences(user_id):
return UserPreferences.objects.get(user_id=user_id)
Cache Strategies¶
1. API Response Caching¶
Cache external API responses to reduce rate limiting and improve response times.
Use Cases: - Market data (ticker, orderbook) - Exchange metadata (symbols, trading rules) - Third-party APIs (news, sentiment)
Configuration:
@cached(ttl=60, key_prefix='api')
def get_binance_ticker(symbol):
"""Cache ticker data for 60 seconds."""
client = get_binance_client()
return client.get_ticker(symbol=symbol)
TTL Recommendations: - Ticker data: 5-30 seconds - Orderbook depth: 10-60 seconds - Exchange info: 1 hour - 1 day - News/sentiment: 5-15 minutes
2. Database Query Caching¶
Cache frequently accessed database queries.
Use Cases: - Historical candlestick data - User settings and preferences - Trading strategies - Backtesting results
Configuration:
@cached(ttl=300, key_prefix='db')
def get_historical_data(symbol, interval, limit=100):
"""Cache historical data for 5 minutes."""
return Candlestick.objects.filter(
symbol=symbol,
interval=interval
).order_by('-timestamp')[:limit]
TTL Recommendations: - Historical data: 5-30 minutes - User preferences: 10-60 minutes - System settings: 1-24 hours - Static data: 1 day
3. Indicator Calculation Caching¶
Cache expensive indicator calculations.
Use Cases: - Technical indicators (RSI, MACD, Bollinger Bands) - Custom analytics - ML model predictions - Risk calculations
Configuration:
@cached(ttl=3600, key_prefix='indicator')
def calculate_macd(symbol, interval, fast=12, slow=26, signal=9):
"""Cache MACD calculation for 1 hour."""
data = get_candlestick_data(symbol, interval)
return compute_macd(data, fast, slow, signal)
TTL Recommendations: - Intraday indicators (1m, 5m): 5-15 minutes - Hourly indicators: 30-60 minutes - Daily indicators: 4-24 hours - Weekly/Monthly: 1-7 days
4. Session Caching¶
Cache user session data and temporary states.
Use Cases: - User authentication tokens - Active positions - Temporary calculations - Wizard/form state
Configuration:
@cached(ttl=1800, key_prefix='session')
def get_active_positions(user_id):
"""Cache active positions for 30 minutes."""
return Position.objects.filter(
user_id=user_id,
status='active'
)
TTL Recommendations: - Auth tokens: 30-60 minutes - Active positions: 10-30 minutes - Form state: 5-15 minutes
Advanced Features¶
Custom Key Builder¶
def build_cache_key(symbol, interval, *args, **kwargs):
"""Custom cache key builder."""
return f"{symbol}:{interval}"
@cached(
ttl=300,
key_prefix='candles',
key_builder=build_cache_key
)
def get_candlesticks(symbol, interval, limit=100):
return fetch_candlesticks(symbol, interval, limit)
Conditional Caching¶
@cached(
ttl=300,
condition=lambda result: result is not None and len(result) > 0
)
def get_trades(symbol):
"""Only cache non-empty results."""
return Trade.objects.filter(symbol=symbol)
Manual Cache Management¶
from infrastructure.cache import cache_manager
# Set with custom TTL
cache_manager.set('key', 'value', ttl=600, prefix='custom')
# Get with default value
value = cache_manager.get('key', default='default', prefix='custom')
# Check existence
if cache_manager.exists('key', prefix='custom'):
print("Key exists")
# Get TTL
remaining = cache_manager.get_ttl('key', prefix='custom')
print(f"Expires in {remaining} seconds")
# Delete specific key
cache_manager.delete('key', prefix='custom')
# Delete by pattern
cache_manager.delete_pattern('user:*', prefix='session')
# Increment counter
cache_manager.increment('api_calls', amount=1, prefix='stats')
Cache Invalidation¶
Symbol-based Invalidation¶
from infrastructure.cache import CacheInvalidator
# Invalidate all data for a symbol
CacheInvalidator.invalidate_symbol_data('BTCUSDT')
# Deletes: api:*BTCUSDT*, db:*BTCUSDT*, indicator:*BTCUSDT*
Exchange-based Invalidation¶
User Session Invalidation¶
Custom Invalidation¶
# Invalidate specific patterns
cache_manager.delete_pattern('api:market:*')
cache_manager.delete_pattern('indicator:rsi:*')
Cache Warming¶
Pre-populate cache with frequently accessed data during startup or scheduled tasks.
from infrastructure.cache import CacheWarmer
# Warm market data cache
symbols = ['BTCUSDT', 'ETHUSDT', 'BNBUSDT']
CacheWarmer.warm_market_data(symbols)
# Warm indicator cache
indicators = ['rsi', 'macd', 'bollinger']
CacheWarmer.warm_indicators(symbols, indicators)
Celery Task Example:
from celery import shared_task
from infrastructure.cache import CacheWarmer
@shared_task
def warm_cache():
"""Scheduled cache warming task."""
top_symbols = get_top_trading_symbols(limit=50)
CacheWarmer.warm_market_data(top_symbols)
CacheWarmer.warm_indicators(top_symbols, ['rsi', 'macd'])
Monitoring¶
Cache Statistics¶
# Get cache statistics
stats = cache_manager.get_stats()
print(stats)
# Output:
# {
# 'hits': 1000,
# 'misses': 100,
# 'sets': 150,
# 'deletes': 10,
# 'total_requests': 1100,
# 'hit_rate': '90.91%'
# }
# Reset statistics
cache_manager.clear_stats()
Function-level Statistics¶
@cached(ttl=300, key_prefix='api')
def get_data(symbol):
return fetch_data(symbol)
# Get cache info for specific function
info = get_data.cache_info()
# Clear cache for specific function
get_data.cache_clear()
Health Checks¶
def check_cache_health():
"""Health check for Redis cache."""
try:
# Test set and get
test_key = 'health:check'
cache_manager.set(test_key, 'ok', ttl=10)
value = cache_manager.get(test_key)
cache_manager.delete(test_key)
if value == 'ok':
return {'status': 'healthy', 'backend': 'redis'}
else:
return {'status': 'unhealthy', 'error': 'Data mismatch'}
except Exception as e:
return {'status': 'unhealthy', 'error': str(e)}
Performance Tuning¶
Redis Configuration¶
In docker-compose.prod.yml:
redis:
command: >
redis-server
--maxmemory 2gb
--maxmemory-policy allkeys-lru
--appendonly yes
--appendfsync everysec
--save 900 1
--save 300 10
--save 60 10000
Eviction Policies: - allkeys-lru: Evict least recently used keys (recommended) - volatile-lru: Evict LRU keys with TTL - allkeys-random: Evict random keys - volatile-ttl: Evict keys with shortest TTL
Connection Pooling¶
# settings.py
CACHES = {
'default': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': 'redis://redis:6379/0',
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
'CONNECTION_POOL_KWARGS': {
'max_connections': 50,
'retry_on_timeout': True,
},
'SOCKET_CONNECT_TIMEOUT': 5,
'SOCKET_TIMEOUT': 5,
}
}
}
Serialization¶
# Use JSON for simple data (faster)
cache_manager.set('key', {'a': 1}, serialize=True) # Uses JSON
# Use pickle for complex objects
cache_manager.set('key', complex_object, serialize=True) # Uses pickle
Best Practices¶
1. Choose Appropriate TTLs¶
- Too short: Frequent cache misses, higher load
- Too long: Stale data, memory waste
Guidelines: - Real-time data: 5-60 seconds - Frequently updated: 1-15 minutes - Moderately updated: 15-60 minutes - Rarely updated: 1-24 hours - Static data: 1+ days
2. Use Meaningful Prefixes¶
# Good
@cached(ttl=60, key_prefix='api:binance')
@cached(ttl=300, key_prefix='db:candlesticks')
@cached(ttl=3600, key_prefix='indicator:rsi')
# Bad
@cached(ttl=60, key_prefix='data')
@cached(ttl=300, key_prefix='stuff')
3. Handle Cache Failures Gracefully¶
def get_data(symbol):
"""Always return data, even if cache fails."""
try:
cached_data = cache_manager.get(f'data:{symbol}')
if cached_data:
return cached_data
except Exception as e:
logger.warning(f"Cache error: {e}")
# Fetch fresh data if cache miss or error
data = fetch_from_source(symbol)
try:
cache_manager.set(f'data:{symbol}', data, ttl=300)
except Exception as e:
logger.warning(f"Cache set error: {e}")
return data
4. Monitor Cache Hit Rate¶
Target: > 80% hit rate for frequently accessed data
# Add to monitoring dashboard
stats = cache_manager.get_stats()
hit_rate = float(stats['hit_rate'].rstrip('%'))
if hit_rate < 80:
logger.warning(f"Low cache hit rate: {hit_rate}%")
5. Invalidate on Updates¶
def update_candlestick(symbol, interval, data):
"""Update candlestick and invalidate cache."""
# Update database
candlestick = Candlestick.objects.update_or_create(...)
# Invalidate related caches
cache_manager.delete_pattern(f'db:candles:{symbol}:{interval}*')
cache_manager.delete_pattern(f'indicator:*:{symbol}:{interval}*')
return candlestick
6. Use Cache Warming for Predictable Traffic¶
# Warm cache before market open
@shared_task
def pre_market_cache_warming():
"""Warm cache 30 minutes before market opens."""
symbols = get_active_symbols()
for symbol in symbols:
get_ticker(symbol) # Populates cache
get_candlesticks(symbol, '1h')
calculate_rsi(symbol)
Testing¶
Unit Tests¶
# tests/test_cache.py
import pytest
from infrastructure.cache import cache_manager, cached
@pytest.fixture
def clean_cache():
"""Clean cache before each test."""
cache_manager.flush_all()
yield
cache_manager.flush_all()
def test_cache_set_get(clean_cache):
"""Test basic set and get."""
cache_manager.set('test_key', 'test_value', ttl=60)
assert cache_manager.get('test_key') == 'test_value'
def test_cache_expiration(clean_cache):
"""Test TTL expiration."""
cache_manager.set('test_key', 'test_value', ttl=1)
assert cache_manager.get('test_key') == 'test_value'
import time
time.sleep(2)
assert cache_manager.get('test_key') is None
def test_cached_decorator(clean_cache):
"""Test @cached decorator."""
call_count = 0
@cached(ttl=60, key_prefix='test')
def expensive_function(x):
nonlocal call_count
call_count += 1
return x * 2
# First call - cache miss
result1 = expensive_function(5)
assert result1 == 10
assert call_count == 1
# Second call - cache hit
result2 = expensive_function(5)
assert result2 == 10
assert call_count == 1 # Function not called again
Troubleshooting¶
High Memory Usage¶
# Check Redis memory
docker exec dsta-redis-prod redis-cli INFO memory
# Check keys
docker exec dsta-redis-prod redis-cli DBSIZE
# Find large keys
docker exec dsta-redis-prod redis-cli --bigkeys
Solutions: - Reduce TTLs - Implement key expiration - Increase maxmemory limit - Change eviction policy
Low Hit Rate¶
Causes: - TTLs too short - Cache warming not implemented - Unoptimized cache keys - High data volatility
Solutions: - Increase TTLs where appropriate - Implement cache warming - Review key generation logic - Add monitoring alerts
Connection Issues¶
# Test Redis connection
import redis
r = redis.from_url('redis://localhost:6379/0')
r.ping() # Should return True
Version History¶
- 1.0.0 (2025-01-27): Initial caching layer implementation