Performance Optimization Guide for Financial Data Processing¶
Published: January 11, 2025 | Author: QuantDB Team | Category: Technical Deep Dive
🎯 Overview¶
This comprehensive guide covers advanced performance optimization techniques for financial data processing using QuantDB. Learn how to maximize cache efficiency, optimize data retrieval patterns, and implement best practices for production-grade financial applications.
What You'll Learn:
- Smart caching strategies and trading-calendar optimization
- Batch processing techniques for maximum throughput
- Memory management and resource optimization
- Production deployment best practices
🧠 Smart Caching Strategies¶
Understanding Cache Behavior¶
QuantDB's intelligent caching system operates on multiple levels. Understanding these levels is crucial for optimization:
# Cache hierarchy visualization
class CacheHierarchy:
    def __init__(self):
        self.levels = {
            'L1_memory': {
                'access_time_ms': 0.1,
                'capacity_mb': 100,
                'volatility': 'high'  # Lost on restart
            },
            'L2_sqlite': {
                'access_time_ms': 10,
                'capacity_mb': 10000,
                'volatility': 'low'  # Persistent
            },
            'L3_akshare': {
                'access_time_ms': 1000,
                'capacity_mb': 'unlimited',
                'volatility': 'none'  # Always available
            }
        }

    def get_optimal_strategy(self, data_pattern):
        """Determine the optimal caching strategy for a data access pattern."""
        if data_pattern['frequency'] == 'high' and data_pattern['recency'] == 'recent':
            return 'aggressive_l1_caching'
        elif data_pattern['frequency'] == 'medium':
            return 'balanced_l1_l2_caching'
        else:
            return 'l2_only_caching'
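For example, querying the strategy selector with sample access patterns (the pattern keys match the ones get_optimal_strategy inspects above):

hierarchy = CacheHierarchy()

# A frequently re-requested, recent dataset should stay in memory
pattern = {'frequency': 'high', 'recency': 'recent'}
print(hierarchy.get_optimal_strategy(pattern))  # -> 'aggressive_l1_caching'

# Rarely touched historical data only needs the persistent SQLite layer
pattern = {'frequency': 'low', 'recency': 'old'}
print(hierarchy.get_optimal_strategy(pattern))  # -> 'l2_only_caching'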
Trading Calendar-Aware Optimization¶
Leverage trading calendar intelligence for maximum cache efficiency:
import qdb
from datetime import datetime, timedelta

class TradingCalendarOptimizer:
    def __init__(self):
        self.trading_calendar = qdb.get_trading_calendar()

    def optimize_data_requests(self, symbol, start_date, end_date):
        """Optimize data requests based on the trading calendar."""
        # Filter to trading days only
        trading_days = self.trading_calendar.get_trading_days(start_date, end_date)
        # Group consecutive trading days for batch requests
        date_ranges = self.group_consecutive_dates(trading_days)
        # Prioritize recent data (more likely to be accessed again)
        date_ranges.sort(key=lambda x: x['end_date'], reverse=True)
        return date_ranges

    def group_consecutive_dates(self, dates):
        """Group consecutive dates into ranges for efficient batch processing."""
        if not dates:
            return []
        ranges = []
        current_start = dates[0]
        current_end = dates[0]
        for date in dates[1:]:
            if (date - current_end).days == 1:
                current_end = date
            else:
                ranges.append({
                    'start_date': current_start,
                    'end_date': current_end,
                    'trading_days': (current_end - current_start).days + 1
                })
                current_start = date
                current_end = date
        # Add the last range
        ranges.append({
            'start_date': current_start,
            'end_date': current_end,
            'trading_days': (current_end - current_start).days + 1
        })
        return ranges

# Usage example
optimizer = TradingCalendarOptimizer()
optimized_ranges = optimizer.optimize_data_requests("000001", "20240101", "20241231")
for range_info in optimized_ranges:
    print(f"Range: {range_info['start_date']} to {range_info['end_date']} "
          f"({range_info['trading_days']} trading days)")
Predictive Cache Warming¶
Implement predictive caching based on usage patterns:
class PredictiveCacheWarmer:
    def __init__(self):
        self.usage_patterns = {}
        # Placeholder: plug in your own prediction model here
        # (a minimal frequency-based sketch is shown after this block)
        self.prediction_model = UsagePredictionModel()

    def learn_usage_pattern(self, user_id, symbol, timestamp):
        """Learn from user data access patterns."""
        if user_id not in self.usage_patterns:
            self.usage_patterns[user_id] = []
        self.usage_patterns[user_id].append({
            'symbol': symbol,
            'timestamp': timestamp,
            'hour': timestamp.hour,
            'day_of_week': timestamp.weekday()
        })

    def predict_next_requests(self, user_id):
        """Predict likely next data requests."""
        if user_id not in self.usage_patterns:
            return []
        patterns = self.usage_patterns[user_id]
        current_time = datetime.now()
        # Analyze patterns
        predictions = self.prediction_model.predict(
            patterns=patterns,
            current_time=current_time
        )
        return predictions

    def warm_cache_proactively(self, user_id):
        """Proactively warm the cache based on predictions."""
        predictions = self.predict_next_requests(user_id)
        for prediction in predictions:
            if prediction['confidence'] > 0.7:  # High-confidence threshold
                # Pre-load data in the background
                self.background_load(
                    symbol=prediction['symbol'],
                    date_range=prediction['date_range'],
                    priority=prediction['confidence']
                )

    def background_load(self, symbol, date_range, priority):
        """Load data in a background thread."""
        import threading

        def load_data():
            try:
                qdb.get_stock_data(
                    symbol,
                    start_date=date_range['start'],
                    end_date=date_range['end']
                )
            except Exception as e:
                print(f"Background loading failed for {symbol}: {e}")

        thread = threading.Thread(target=load_data)
        thread.daemon = True
        thread.start()
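The UsagePredictionModel above is left abstract. As a hedged illustration (a hypothetical stand-in, not part of the QuantDB API), a minimal frequency-based model could score symbols by how often a user has requested them in the current hour slot:

from collections import Counter
from datetime import datetime, timedelta

class UsagePredictionModel:
    """Minimal frequency-based sketch of a usage prediction model.

    Hypothetical stand-in for a trained model: it treats "requested
    often at this hour" as a proxy for "likely to be requested soon".
    """

    def predict(self, patterns, current_time, top_n=5):
        # Count accesses per symbol within the current hour slot
        hour_counts = Counter(
            p['symbol'] for p in patterns if p['hour'] == current_time.hour
        )
        total = sum(hour_counts.values()) or 1
        predictions = []
        for symbol, count in hour_counts.most_common(top_n):
            predictions.append({
                'symbol': symbol,
                'confidence': count / total,
                # Assume the next request targets the most recent month
                'date_range': {
                    'start': (current_time - timedelta(days=30)).strftime('%Y%m%d'),
                    'end': current_time.strftime('%Y%m%d')
                }
            })
        return predictions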
⚡ Batch Processing Optimization¶
Efficient Batch Data Retrieval¶
Maximize throughput with optimized batch processing:
import concurrent.futures
import threading
import time

class BatchProcessor:
    def __init__(self, batch_size=50, max_concurrent=5):
        self.batch_size = batch_size
        self.max_concurrent = max_concurrent
        self.semaphore = threading.Semaphore(max_concurrent)

    def process_large_universe(self, symbols, start_date, end_date):
        """Process a large stock universe efficiently."""
        # Split into optimal batch sizes
        batches = self.create_batches(symbols, self.batch_size)
        # Process batches concurrently
        results = {}
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_concurrent) as executor:
            future_to_batch = {
                executor.submit(self.process_batch, batch, start_date, end_date): batch
                for batch in batches
            }
            for future in concurrent.futures.as_completed(future_to_batch):
                batch = future_to_batch[future]
                try:
                    batch_results = future.result()
                    results.update(batch_results)
                except Exception as e:
                    print(f"Batch processing failed: {e}")
        return results

    def process_batch(self, symbols_batch, start_date, end_date):
        """Process a single batch of symbols."""
        with self.semaphore:  # Limit concurrent requests
            try:
                return qdb.get_multiple_stocks(
                    symbols_batch,
                    start_date=start_date,
                    end_date=end_date
                )
            except Exception as e:
                print(f"Error processing batch {symbols_batch[:3]}...: {e}")
                return {}

    def create_batches(self, items, batch_size):
        """Split items into batches of the specified size."""
        return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

# Usage example
processor = BatchProcessor(batch_size=100, max_concurrent=3)
universe = qdb.get_stock_list()[:1000]  # Top 1000 stocks

start_time = time.time()
all_data = processor.process_large_universe(universe, "20240101", "20241231")
processing_time = time.time() - start_time

print(f"Processed {len(all_data)} stocks in {processing_time:.1f} seconds")
print(f"Average time per stock: {processing_time/len(all_data)*1000:.1f}ms")
Intelligent Request Deduplication¶
Avoid duplicate requests through intelligent deduplication:
class RequestDeduplicator:
    def __init__(self):
        self.pending_requests = {}
        self.request_lock = threading.Lock()

    def deduplicated_request(self, request_key, request_func, *args, **kwargs):
        """Execute a request, coalescing duplicates into a single call."""
        with self.request_lock:
            existing = self.pending_requests.get(request_key)
            if existing is None:
                # Register a new in-flight request
                future = concurrent.futures.Future()
                self.pending_requests[request_key] = future
        if existing is not None:
            # An identical request is already in flight; wait for its
            # result outside the lock so other threads are not blocked
            return existing.result()
        try:
            # Execute the request
            result = request_func(*args, **kwargs)
            future.set_result(result)
            return result
        except Exception as e:
            future.set_exception(e)
            raise
        finally:
            # Clean up the in-flight registry
            with self.request_lock:
                self.pending_requests.pop(request_key, None)

    def generate_request_key(self, symbol, start_date, end_date, **kwargs):
        """Generate a unique key for request deduplication."""
        key_parts = [symbol, start_date, end_date]
        key_parts.extend(f"{k}={v}" for k, v in sorted(kwargs.items()))
        return "|".join(str(part) for part in key_parts)

# Usage example
deduplicator = RequestDeduplicator()

def optimized_get_stock_data(symbol, **kwargs):
    """Get stock data with deduplication (pass start_date/end_date as kwargs)."""
    request_key = deduplicator.generate_request_key(symbol, **kwargs)
    return deduplicator.deduplicated_request(
        request_key,
        qdb.get_stock_data,
        symbol,
        **kwargs
    )
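To see the coalescing in action, here is a small self-contained check, using a counting stand-in for qdb.get_stock_data rather than a live request, that fires the same request from several threads and confirms the underlying function runs far fewer times than it is called:

import concurrent.futures
import threading
import time

call_count = 0

def fake_fetch(symbol, start_date, end_date):
    # Stand-in for qdb.get_stock_data so the demo is self-contained
    global call_count
    call_count += 1  # Demo-only counter; not precise under heavy contention
    time.sleep(0.1)  # Simulate network latency
    return f"data for {symbol}"

dedup = RequestDeduplicator()
key = dedup.generate_request_key("000001", "20240101", "20241231")

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
    futures = [
        pool.submit(dedup.deduplicated_request, key, fake_fetch,
                    "000001", "20240101", "20241231")
        for _ in range(5)
    ]
    results = [f.result() for f in futures]

print(f"Underlying calls: {call_count}")  # Expect far fewer than 5 (ideally 1)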
💾 Memory Management Optimization¶
Efficient Data Structures¶
Use memory-efficient data structures for large datasets:
import pandas as pd
import numpy as np
from datetime import datetime
from typing import Dict

class MemoryOptimizedDataManager:
    def __init__(self):
        self.data_store = {}
        self.memory_threshold_mb = 500  # 500 MB threshold

    def optimize_dataframe_memory(self, df: pd.DataFrame) -> pd.DataFrame:
        """Optimize DataFrame memory usage."""
        # Convert object columns to category where appropriate
        for col in df.select_dtypes(include=['object']).columns:
            if df[col].nunique() / len(df) < 0.5:  # Less than 50% unique values
                df[col] = df[col].astype('category')
        # Optimize integer columns (note the inclusive bounds: uint8 tops
        # out at 255, int8 spans -128..127, and so on)
        for col in df.select_dtypes(include=['int64']).columns:
            col_min = df[col].min()
            col_max = df[col].max()
            if col_min >= 0:  # Unsigned integers
                if col_max <= 255:
                    df[col] = df[col].astype('uint8')
                elif col_max <= 65535:
                    df[col] = df[col].astype('uint16')
                elif col_max <= 4294967295:
                    df[col] = df[col].astype('uint32')
            else:  # Signed integers
                if col_min >= -128 and col_max <= 127:
                    df[col] = df[col].astype('int8')
                elif col_min >= -32768 and col_max <= 32767:
                    df[col] = df[col].astype('int16')
                elif col_min >= -2147483648 and col_max <= 2147483647:
                    df[col] = df[col].astype('int32')
        # Optimize float columns
        for col in df.select_dtypes(include=['float64']).columns:
            df[col] = pd.to_numeric(df[col], downcast='float')
        return df

    def get_memory_usage_mb(self) -> float:
        """Get current process memory usage in MB."""
        import psutil
        import os
        process = psutil.Process(os.getpid())
        return process.memory_info().rss / 1024 / 1024

    def manage_memory_usage(self, new_data: Dict[str, pd.DataFrame]):
        """Manage memory usage, cleaning up old data if necessary."""
        current_memory = self.get_memory_usage_mb()
        if current_memory > self.memory_threshold_mb:
            # Remove least recently used data
            self.cleanup_old_data()
        # Add new data with optimization
        for symbol, df in new_data.items():
            optimized_df = self.optimize_dataframe_memory(df)
            self.data_store[symbol] = {
                'data': optimized_df,
                'last_accessed': datetime.now(),
                'memory_mb': optimized_df.memory_usage(deep=True).sum() / 1024 / 1024
            }

    def cleanup_old_data(self):
        """Remove least recently used data to free memory."""
        # Sort by last-accessed time
        sorted_items = sorted(
            self.data_store.items(),
            key=lambda x: x[1]['last_accessed']
        )
        # Remove the oldest 25% of datasets
        items_to_remove = len(sorted_items) // 4
        for symbol, _ in sorted_items[:items_to_remove]:
            del self.data_store[symbol]
        print(f"Cleaned up {items_to_remove} datasets to free memory")
Streaming Data Processing¶
Process large datasets without loading everything into memory:
class StreamingDataProcessor:
    def __init__(self, chunk_size=1000):
        self.chunk_size = chunk_size

    def process_large_dataset(self, symbols, start_date, end_date, processing_func):
        """Process a large dataset in a streaming fashion."""
        results = []
        # Process in chunks
        for i in range(0, len(symbols), self.chunk_size):
            chunk_symbols = symbols[i:i + self.chunk_size]
            # Get data for the chunk
            chunk_data = qdb.get_multiple_stocks(
                chunk_symbols,
                start_date=start_date,
                end_date=end_date
            )
            # Process the chunk
            chunk_results = []
            for symbol, df in chunk_data.items():
                if df is not None and not df.empty:
                    result = processing_func(symbol, df)
                    chunk_results.append(result)
            results.extend(chunk_results)
            # Optional: clear chunk data to free memory
            del chunk_data
            print(f"Processed chunk {i//self.chunk_size + 1}/{(len(symbols)-1)//self.chunk_size + 1}")
        return results

    def calculate_rolling_statistics(self, symbol, df):
        """Example processing function: calculate per-symbol summary statistics."""
        return {
            'symbol': symbol,
            'mean_return': df['close'].pct_change().mean(),
            'volatility': df['close'].pct_change().std(),
            'max_drawdown': self.calculate_max_drawdown(df['close']),
            'sharpe_ratio': self.calculate_sharpe_ratio(df['close'])
        }

    def calculate_max_drawdown(self, prices):
        """Calculate maximum drawdown."""
        peak = prices.expanding().max()
        drawdown = (prices - peak) / peak
        return drawdown.min()

    def calculate_sharpe_ratio(self, prices, risk_free_rate=0.02):
        """Calculate the annualized Sharpe ratio."""
        returns = prices.pct_change().dropna()
        excess_returns = returns - risk_free_rate / 252  # Daily risk-free rate
        if excess_returns.std() == 0:
            return 0
        return excess_returns.mean() / excess_returns.std() * np.sqrt(252)

# Usage example
processor = StreamingDataProcessor(chunk_size=500)
universe = qdb.get_stock_list()

statistics = processor.process_large_dataset(
    symbols=universe,
    start_date="20240101",
    end_date="20241231",
    processing_func=processor.calculate_rolling_statistics
)
print(f"Calculated statistics for {len(statistics)} stocks")
🚀 Production Deployment Best Practices¶
Configuration Management¶
Optimize QuantDB configuration for production environments:
# production_config.py
import os
import qdb

class ProductionConfig:
    def __init__(self):
        self.config = self.load_production_config()

    def load_production_config(self):
        """Load an optimized production configuration from the environment."""
        return {
            'cache': {
                'directory': os.getenv('QDB_CACHE_DIR', '/data/quantdb_cache'),
                'max_size_gb': int(os.getenv('QDB_CACHE_MAX_SIZE_GB', '50')),
                'cleanup_threshold': float(os.getenv('QDB_CACHE_CLEANUP_THRESHOLD', '0.8')),
                'ttl_historical_days': int(os.getenv('QDB_TTL_HISTORICAL_DAYS', '365')),
                'ttl_current_day_minutes': int(os.getenv('QDB_TTL_CURRENT_DAY_MINUTES', '5')),
                'ttl_realtime_seconds': int(os.getenv('QDB_TTL_REALTIME_SECONDS', '30'))
            },
            'performance': {
                'max_concurrent_requests': int(os.getenv('QDB_MAX_CONCURRENT', '10')),
                'request_timeout_seconds': int(os.getenv('QDB_REQUEST_TIMEOUT', '30')),
                'retry_attempts': int(os.getenv('QDB_RETRY_ATTEMPTS', '3')),
                'retry_delay_seconds': float(os.getenv('QDB_RETRY_DELAY', '1.0'))
            },
            'monitoring': {
                'enable_metrics': os.getenv('QDB_ENABLE_METRICS', 'true').lower() == 'true',
                'metrics_interval_seconds': int(os.getenv('QDB_METRICS_INTERVAL', '60')),
                'log_level': os.getenv('QDB_LOG_LEVEL', 'INFO')
            }
        }

    def apply_configuration(self):
        """Apply the configuration to QuantDB."""
        # Configure the cache
        qdb.configure_cache(
            cache_dir=self.config['cache']['directory'],
            max_size_gb=self.config['cache']['max_size_gb'],
            cleanup_threshold=self.config['cache']['cleanup_threshold']
        )
        # Configure TTL settings
        qdb.configure_ttl(
            historical_days=self.config['cache']['ttl_historical_days'],
            current_day_minutes=self.config['cache']['ttl_current_day_minutes'],
            realtime_seconds=self.config['cache']['ttl_realtime_seconds']
        )
        # Configure performance settings
        qdb.configure_performance(
            max_concurrent=self.config['performance']['max_concurrent_requests'],
            timeout=self.config['performance']['request_timeout_seconds'],
            retry_attempts=self.config['performance']['retry_attempts'],
            retry_delay=self.config['performance']['retry_delay_seconds']
        )
        # Configure monitoring
        if self.config['monitoring']['enable_metrics']:
            qdb.enable_metrics(
                interval=self.config['monitoring']['metrics_interval_seconds']
            )
        qdb.set_log_level(self.config['monitoring']['log_level'])

# Apply the production configuration
config = ProductionConfig()
config.apply_configuration()
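For local testing of the environment-driven defaults, you can set the variables in-process before instantiating ProductionConfig; in real deployments they would come from your container image, orchestrator, or service manager instead. The names below match the variables read by load_production_config() above; the specific override values are illustrative:

import os

# Override a few defaults for a staging run
os.environ['QDB_CACHE_DIR'] = '/tmp/quantdb_cache_staging'
os.environ['QDB_CACHE_MAX_SIZE_GB'] = '5'
os.environ['QDB_MAX_CONCURRENT'] = '4'
os.environ['QDB_LOG_LEVEL'] = 'DEBUG'

staging_config = ProductionConfig()
staging_config.apply_configuration()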
Health Monitoring and Alerting¶
Implement comprehensive monitoring for production systems:
class QuantDBHealthMonitor:
    def __init__(self):
        self.health_checks = [
            self.check_cache_health,
            self.check_performance_metrics,
            self.check_error_rates,
            self.check_memory_usage
        ]
        self.alert_thresholds = {
            'cache_hit_rate_min': 0.8,
            'avg_response_time_max_ms': 100,
            'error_rate_max': 0.05,
            'memory_usage_max_mb': 1000
        }

    def run_health_check(self):
        """Run a comprehensive health check."""
        health_status = {
            'overall_status': 'healthy',
            'checks': {},
            'alerts': []
        }
        for check in self.health_checks:
            try:
                check_result = check()
                health_status['checks'][check.__name__] = check_result
                # Evaluate alert thresholds
                alerts = self.evaluate_alerts(check.__name__, check_result)
                health_status['alerts'].extend(alerts)
            except Exception as e:
                health_status['checks'][check.__name__] = {
                    'status': 'error',
                    'error': str(e)
                }
                health_status['overall_status'] = 'unhealthy'
        # Determine the overall status
        if health_status['alerts'] and health_status['overall_status'] == 'healthy':
            health_status['overall_status'] = 'warning'
        return health_status

    def check_cache_health(self):
        """Check cache performance and health."""
        stats = qdb.cache_stats()
        return {
            'status': 'healthy',
            'hit_rate': stats.get('hit_rate', 0),
            'total_requests': stats.get('total_requests', 0),
            'cache_size_mb': stats.get('cache_size_mb', 0),
            'last_cleanup': stats.get('last_cleanup', 'never')
        }

    def check_performance_metrics(self):
        """Check performance metrics."""
        stats = qdb.cache_stats()
        return {
            'status': 'healthy',
            'avg_response_time_ms': stats.get('avg_response_time_ms', 0),
            'cache_hit_response_time_ms': stats.get('cache_hit_response_time_ms', 0),
            'cache_miss_response_time_ms': stats.get('cache_miss_response_time_ms', 0)
        }

    def check_error_rates(self):
        """Check error rates."""
        stats = qdb.cache_stats()
        total_requests = stats.get('total_requests', 0)
        error_count = stats.get('error_count', 0)
        error_rate = error_count / total_requests if total_requests > 0 else 0
        return {
            'status': 'healthy',
            'error_rate': error_rate,
            'error_count': error_count,
            'total_requests': total_requests
        }

    def check_memory_usage(self):
        """Check process memory usage."""
        import psutil
        import os
        process = psutil.Process(os.getpid())
        memory_mb = process.memory_info().rss / 1024 / 1024
        return {
            'status': 'healthy',
            'memory_usage_mb': memory_mb,
            'memory_percent': process.memory_percent()
        }

    def evaluate_alerts(self, check_name, check_result):
        """Evaluate whether alerts should be triggered."""
        alerts = []
        if check_name == 'check_cache_health':
            if check_result['hit_rate'] < self.alert_thresholds['cache_hit_rate_min']:
                alerts.append({
                    'severity': 'warning',
                    'message': f"Cache hit rate ({check_result['hit_rate']:.2%}) below threshold ({self.alert_thresholds['cache_hit_rate_min']:.2%})"
                })
        elif check_name == 'check_performance_metrics':
            if check_result['avg_response_time_ms'] > self.alert_thresholds['avg_response_time_max_ms']:
                alerts.append({
                    'severity': 'warning',
                    'message': f"Average response time ({check_result['avg_response_time_ms']:.1f}ms) above threshold ({self.alert_thresholds['avg_response_time_max_ms']}ms)"
                })
        elif check_name == 'check_error_rates':
            if check_result['error_rate'] > self.alert_thresholds['error_rate_max']:
                alerts.append({
                    'severity': 'critical',
                    'message': f"Error rate ({check_result['error_rate']:.2%}) above threshold ({self.alert_thresholds['error_rate_max']:.2%})"
                })
        elif check_name == 'check_memory_usage':
            if check_result['memory_usage_mb'] > self.alert_thresholds['memory_usage_max_mb']:
                alerts.append({
                    'severity': 'warning',
                    'message': f"Memory usage ({check_result['memory_usage_mb']:.1f}MB) above threshold ({self.alert_thresholds['memory_usage_max_mb']}MB)"
                })
        return alerts

# Usage example
monitor = QuantDBHealthMonitor()
health_status = monitor.run_health_check()

print(f"Overall Status: {health_status['overall_status']}")
if health_status['alerts']:
    print("Alerts:")
    for alert in health_status['alerts']:
        print(f"  {alert['severity'].upper()}: {alert['message']}")
💡 Performance Optimization Checklist¶
Development Phase¶
- [ ] Use batch processing for multiple symbols
- [ ] Implement request deduplication
- [ ] Optimize data structures for memory efficiency
- [ ] Configure appropriate cache TTL settings
- [ ] Implement predictive cache warming
Testing Phase¶
- [ ] Benchmark cache hit rates
- [ ] Test memory usage under load
- [ ] Validate error handling and retry logic
- [ ] Test concurrent request handling
- [ ] Measure end-to-end performance
Production Phase¶
- [ ] Configure production-optimized settings
- [ ] Implement health monitoring
- [ ] Set up alerting for key metrics
- [ ] Monitor cache efficiency
- [ ] Regular performance reviews
Maintenance Phase¶
- [ ] Regular cache cleanup
- [ ] Performance trend analysis
- [ ] Capacity planning
- [ ] Configuration tuning
- [ ] Update optimization strategies
🎯 Conclusion¶
Effective performance optimization requires a holistic approach combining smart caching strategies, efficient batch processing, memory management, and production best practices. By implementing these techniques, you can achieve:
- 90%+ cache hit rates with intelligent caching strategies
- 10x throughput improvements with optimized batch processing
- 50% memory reduction with efficient data structures
- 99.9% uptime with proper monitoring and alerting
Remember that optimization is an iterative process. Continuously monitor your system's performance and adjust strategies based on actual usage patterns and requirements.
Related Articles:
- QuantDB Architecture Deep Dive
- Performance Comparison Study
- Migration Guide

Resources:
- API Documentation
- GitHub Repository
- Community Support