🚀 Advanced Usage & Patterns¶
🎯 What You'll Learn¶
| **🔥 Performance** | **🔧 Customization** | **🏗️ Architecture** | **🛡️ Production** |
|-------------------|---------------------|---------------------|------------------|
| Async processing | Custom components | Advanced patterns | Error handling |
| Batch operations | Plugin development | Scaling strategies | Monitoring |
| Memory optimization | Extension points | Load balancing | Security |
| Caching strategies | Custom metrics | Distributed evaluation | Deployment |
⚡ High-Performance Async Operations¶
🚀 Concurrent Model Evaluation¶
#### **Basic Async Pattern** #### **Advanced Async Configuration**
import asyncio
from llm_evaluation_framework.engines.async_inference_engine import AsyncInferenceEngine
from llm_evaluation_framework import ModelRegistry, TestDatasetGenerator
async def high_performance_evaluation():
    """
    Evaluate several registered models concurrently and return a dict
    mapping each model name to its result (or to the raised exception).
    """
    # Wire up the framework components.
    model_registry = ModelRegistry()
    dataset_generator = TestDatasetGenerator()
    engine = AsyncInferenceEngine(model_registry)

    # Describe and register the models under evaluation.
    model_configs = {
        "gpt-3.5-turbo": {"provider": "openai", "capabilities": ["reasoning"]},
        "claude-3-sonnet": {"provider": "anthropic", "capabilities": ["creativity"]},
        "gpt-4": {"provider": "openai", "capabilities": ["coding"]},
    }
    for name, cfg in model_configs.items():
        model_registry.register_model(name, cfg)

    # Build one shared batch of test cases for all models.
    cases = dataset_generator.generate_test_cases(
        use_case={"domain": "general", "required_capabilities": ["reasoning"]},
        count=100,
    )

    # Launch one evaluation task per model; they run concurrently.
    eval_tasks = [
        engine.evaluate_model_async(
            model_name=name,
            test_cases=cases,
            max_concurrent=10,  # Concurrent requests per model
            timeout=30.0,
        )
        for name in model_configs
    ]

    # return_exceptions=True keeps one failing model from cancelling the rest.
    outcomes = await asyncio.gather(*eval_tasks, return_exceptions=True)
    return dict(zip(model_configs, outcomes))
# Execute the evaluation and print a one-line summary per model.
if __name__ == "__main__":
    results = asyncio.run(high_performance_evaluation())
    for model, result in results.items():
        if isinstance(result, Exception):
            # gather(return_exceptions=True) hands back the exception object.
            print(f"❌ {model}: {result}")
        else:
            accuracy = result['aggregate_metrics']['accuracy']
            cost = result['aggregate_metrics']['total_cost']
            print(f"✅ {model}: {accuracy:.1%} accuracy, ${cost:.4f} cost")
from llm_evaluation_framework.engines.async_inference_engine import AsyncInferenceEngine
from llm_evaluation_framework.utils.rate_limiter import RateLimiter
from llm_evaluation_framework.utils.retry_handler import RetryHandler
class ProductionAsyncEngine:
    """
    Production wrapper around AsyncInferenceEngine that adds rate
    limiting, automatic retries, and per-run performance metrics.

    NOTE(review): this snippet references `time` and `logger` without
    importing/defining them — confirm both exist at module level.
    """

    def __init__(self, registry, config=None):
        self.registry = registry
        # Fall back to conservative production defaults when no config is given.
        self.config = config or self._default_config()
        # Throttle outbound requests to stay inside provider quotas.
        self.rate_limiter = RateLimiter(
            requests_per_minute=self.config['rate_limit'],
            burst_size=self.config['burst_size']
        )
        # Retry transient failures with exponential backoff.
        self.retry_handler = RetryHandler(
            max_retries=self.config['max_retries'],
            backoff_factor=self.config['backoff_factor'],
            retry_on_errors=['timeout', 'rate_limit', 'server_error']
        )
        self.engine = AsyncInferenceEngine(
            registry=registry,
            rate_limiter=self.rate_limiter,
            retry_handler=self.retry_handler
        )

    def _default_config(self):
        """Return conservative defaults suitable for most provider rate limits."""
        return {
            'rate_limit': 60,  # requests per minute
            'burst_size': 10,  # burst capacity
            'max_retries': 3,
            'backoff_factor': 2.0,
            'timeout': 30.0,
            'max_concurrent': 20
        }

    async def evaluate_with_monitoring(self, model_name, test_cases):
        """
        Execute evaluation with comprehensive monitoring and error handling.

        Returns the engine result augmented with a `performance_metrics`
        entry, or an {'error', 'model'} dict when the evaluation fails.
        """
        start_time = time.time()
        try:
            # Execute evaluation with progress/error callbacks wired in.
            result = await self.engine.evaluate_model_async(
                model_name=model_name,
                test_cases=test_cases,
                max_concurrent=self.config['max_concurrent'],
                timeout=self.config['timeout'],
                progress_callback=self._progress_callback,
                error_callback=self._error_callback
            )
            # Attach wall-clock throughput and error-rate figures.
            # NOTE(review): time.time() is read twice, so `throughput` uses a
            # slightly later timestamp than `total_time`.
            result['performance_metrics'] = {
                'total_time': time.time() - start_time,
                'throughput': len(test_cases) / (time.time() - start_time),
                'error_rate': result.get('error_count', 0) / len(test_cases)
            }
            return result
        except Exception as e:
            logger.error(f"Evaluation failed for {model_name}: {e}")
            return {'error': str(e), 'model': model_name}

    async def _progress_callback(self, completed, total, current_test):
        """Progress monitoring callback invoked by the engine per completed test."""
        progress = (completed / total) * 100
        print(f"Progress: {progress:.1f}% ({completed}/{total}) - Current: {current_test['id']}")

    async def _error_callback(self, error, test_case):
        """Error handling callback invoked per failed test case."""
        logger.warning(f"Test {test_case['id']} failed: {error}")
🔧 Custom Component Development¶
🏗️ Custom Scoring Strategies¶
#### **Domain-Specific Scorer** #### **Multi-Metric Custom Scorer**
from llm_evaluation_framework.evaluation.scoring_strategies import ScoringStrategy
from typing import List, Dict, Any
import re
import nltk
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
class MedicalAccuracyScorer(ScoringStrategy):
    """
    Specialized scorer for medical domain evaluations.

    Combines four weighted components per prediction/reference pair:
    medical terminology F1, TF-IDF semantic similarity, safety
    compliance, and factual overlap.

    NOTE(review): `_load_medical_terms`, `_extract_medical_terms` and
    `_extract_medical_facts` are referenced but not defined in this
    snippet — assumed implemented elsewhere (e.g. on the base class).
    """

    def __init__(self, medical_terms_file: str | None = None):
        self.medical_terms = self._load_medical_terms(medical_terms_file)
        self.vectorizer = TfidfVectorizer(
            ngram_range=(1, 3),
            max_features=10000,
            stop_words='english'
        )

    def calculate_score(self, predictions: List[str], references: List[str]) -> float:
        """
        Return the mean weighted medical-domain score over all pairs.

        Returns 0.0 for empty input instead of raising ZeroDivisionError.
        """
        scores = []
        for pred, ref in zip(predictions, references):
            # Individual scoring components
            terminology_score = self._medical_terminology_score(pred, ref)
            semantic_score = self._semantic_similarity_score(pred, ref)
            safety_score = self._safety_compliance_score(pred)
            factual_score = self._factual_accuracy_score(pred, ref)
            # Weighted combination; weights sum to 1.0
            final_score = (
                0.3 * terminology_score +
                0.25 * semantic_score +
                0.25 * safety_score +
                0.2 * factual_score
            )
            scores.append(final_score)
        # Guard: empty input previously raised ZeroDivisionError.
        if not scores:
            return 0.0
        return sum(scores) / len(scores)

    def _medical_terminology_score(self, prediction: str, reference: str) -> float:
        """F1 over medical terms shared between prediction and reference."""
        pred_terms = self._extract_medical_terms(prediction)
        ref_terms = self._extract_medical_terms(reference)
        # Nothing to match against: treat as fully correct.
        if not ref_terms:
            return 1.0
        # Precision/recall for medical terms
        correct_terms = pred_terms.intersection(ref_terms)
        precision = len(correct_terms) / len(pred_terms) if pred_terms else 0
        recall = len(correct_terms) / len(ref_terms)
        # F1 score
        if precision + recall == 0:
            return 0.0
        return 2 * (precision * recall) / (precision + recall)

    def _semantic_similarity_score(self, prediction: str, reference: str) -> float:
        """Cosine similarity of TF-IDF vectors; 0.0 when vectorization fails."""
        try:
            corpus = [prediction, reference]
            tfidf_matrix = self.vectorizer.fit_transform(corpus)
            similarity = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])[0][0]
            return similarity
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # are no longer swallowed.
            return 0.0

    def _safety_compliance_score(self, prediction: str) -> float:
        """Return 1.0 minus 0.5 per dangerous-advice pattern matched, floored at 0."""
        # Dangerous-advice regex patterns (matched case-insensitively via lower()).
        danger_patterns = [
            r'ignore.{0,10}doctor',
            r'stop.{0,10}medication',
            r'self.{0,10}diagnose',
            r'definitely.{0,10}cancer'
        ]
        safety_violations = sum(
            1 for pattern in danger_patterns
            if re.search(pattern, prediction.lower())
        )
        # Inverted score: fewer violations = higher score.
        return max(0, 1.0 - (safety_violations * 0.5))

    def _factual_accuracy_score(self, prediction: str, reference: str) -> float:
        """Fraction of reference medical facts present in the prediction."""
        pred_facts = self._extract_medical_facts(prediction)
        ref_facts = self._extract_medical_facts(reference)
        if not ref_facts:
            return 1.0
        correct_facts = pred_facts.intersection(ref_facts)
        return len(correct_facts) / len(ref_facts)
# Usage example
# NOTE(review): ScoringContext, `predictions` and `references` are assumed
# to be defined/imported elsewhere in the application.
medical_scorer = MedicalAccuracyScorer('medical_terms.txt')
scoring_context = ScoringContext(medical_scorer)
score = scoring_context.evaluate(predictions, references)
class ComprehensiveScorer(ScoringStrategy):
    """
    Multi-dimensional scorer that blends accuracy, fluency, relevance,
    creativity and safety into one weighted overall score.
    """

    def __init__(self, weights: Dict[str, float] = None):
        # Default weighting favours accuracy; callers may override.
        self.weights = weights or {
            'accuracy': 0.3,
            'fluency': 0.2,
            'relevance': 0.2,
            'creativity': 0.15,
            'safety': 0.15
        }
        # One sub-scorer per evaluation dimension.
        self.accuracy_scorer = AccuracyScoringStrategy()
        self.fluency_scorer = FluencyScorer()
        self.relevance_scorer = RelevanceScorer()
        self.creativity_scorer = CreativityScorer()
        self.safety_scorer = SafetyScorer()

    def calculate_score(self, predictions: List[str], references: List[str]) -> Dict[str, Any]:
        """Return the weighted overall score plus the per-dimension breakdown."""
        # Reference-based dimensions take both lists; the rest score predictions alone.
        scores = {
            'accuracy': self.accuracy_scorer.calculate_score(predictions, references),
            'fluency': self.fluency_scorer.calculate_score(predictions),
            'relevance': self.relevance_scorer.calculate_score(predictions, references),
            'creativity': self.creativity_scorer.calculate_score(predictions),
            'safety': self.safety_scorer.calculate_score(predictions)
        }
        # Blend the components using the configured weights.
        overall_score = 0
        for metric in scores:
            overall_score += scores[metric] * self.weights[metric]
        return {
            'overall_score': overall_score,
            'component_scores': scores,
            'weights_used': self.weights
        }
🗄️ Custom Persistence Backends¶
#### **Redis Backend Implementation** #### **Cloud Storage Backend (AWS S3)**
import redis
import json
from typing import Any, Dict, List
from llm_evaluation_framework.persistence.base_store import BaseStore
class RedisStore(BaseStore):
    """
    High-performance Redis backend for distributed evaluation results.

    Results are stored as JSON strings under a configurable key prefix
    with a TTL, plus secondary index sets (model name, date, accuracy
    bucket) to support querying.

    NOTE(review): `logger` is referenced but not defined in this snippet;
    `_build_query_keys` and `_matches_filters` are assumed to be
    implemented elsewhere (e.g. on BaseStore).
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.client = redis.Redis(
            host=config.get('host', 'localhost'),
            port=config.get('port', 6379),
            db=config.get('db', 0),
            password=config.get('password'),
            decode_responses=True
        )
        self.key_prefix = config.get('key_prefix', 'llm_eval:')
        self.ttl = config.get('ttl', 86400)  # 24 hours default

    def save(self, key: str, data: Dict[str, Any]) -> bool:
        """Save evaluation results to Redis with TTL; returns False on failure."""
        try:
            full_key = f"{self.key_prefix}{key}"
            # default=str makes non-JSON types (e.g. datetimes) serializable.
            serialized_data = json.dumps(data, default=str)
            # Save with TTL so stale results expire automatically.
            result = self.client.setex(full_key, self.ttl, serialized_data)
            # Maintain secondary indexes for querying.
            self._update_index(key, data)
            return result
        except Exception as e:
            logger.error(f"Redis save failed: {e}")
            return False

    def load(self, key: str) -> Dict[str, Any]:
        """Load evaluation results from Redis; raises KeyError when missing."""
        try:
            full_key = f"{self.key_prefix}{key}"
            data = self.client.get(full_key)
            if data is None:
                raise KeyError(f"Key not found: {key}")
            return json.loads(data)
        except Exception as e:
            logger.error(f"Redis load failed: {e}")
            raise

    def query(self, filters: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Query results with filters using the secondary index sets."""
        try:
            # Build candidate keys from the filters via the index sets.
            query_keys = self._build_query_keys(filters)
            results = []
            for key in query_keys:
                try:
                    data = self.load(key.split(':')[-1])  # Remove prefix
                    if self._matches_filters(data, filters):
                        results.append(data)
                except KeyError:
                    # Index may reference entries that have already expired.
                    continue
            return results
        except Exception as e:
            logger.error(f"Redis query failed: {e}")
            return []

    def _update_index(self, key: str, data: Dict[str, Any]):
        """Update index sets (model, date, accuracy bucket) for efficient querying."""
        # Model name index
        if 'model_name' in data:
            model_key = f"{self.key_prefix}index:model:{data['model_name']}"
            self.client.sadd(model_key, key)
            self.client.expire(model_key, self.ttl)
        # Timestamp index, bucketed by date
        if 'timestamp' in data:
            timestamp = data['timestamp'][:10]  # Date only
            date_key = f"{self.key_prefix}index:date:{timestamp}"
            self.client.sadd(date_key, key)
            self.client.expire(date_key, self.ttl)
        # Accuracy range index (decile buckets)
        if 'aggregate_metrics' in data and 'accuracy' in data['aggregate_metrics']:
            accuracy = data['aggregate_metrics']['accuracy']
            accuracy_range = f"{int(accuracy * 10)}"  # 0-9 range
            acc_key = f"{self.key_prefix}index:accuracy:{accuracy_range}"
            self.client.sadd(acc_key, key)
            self.client.expire(acc_key, self.ttl)
# Configuration and usage
# NOTE(review): PersistenceManager is assumed to be imported elsewhere.
redis_config = {
    'host': 'localhost',
    'port': 6379,
    'db': 0,
    'key_prefix': 'llm_eval:',
    'ttl': 86400  # seconds (24 hours)
}
redis_store = RedisStore(redis_config)
persistence_manager = PersistenceManager({'redis': redis_store})
import boto3
import json
from botocore.exceptions import ClientError
from llm_evaluation_framework.persistence.base_store import BaseStore
class S3Store(BaseStore):
    """
    AWS S3 backend for scalable evaluation result storage.

    Objects are stored as JSON under `<key_prefix><key>.json`, with
    selected fields copied into S3 object metadata for discovery.

    NOTE(review): `logger` is referenced but not defined in this snippet.
    """

    def __init__(self, config: Dict[str, Any]):
        self.bucket_name = config['bucket_name']
        self.key_prefix = config.get('key_prefix', 'llm-evaluations/')
        # Credentials may be omitted (None) to fall back to the default
        # boto3 credential chain (env vars, profile, instance role, ...).
        self.s3_client = boto3.client(
            's3',
            aws_access_key_id=config.get('access_key_id'),
            aws_secret_access_key=config.get('secret_access_key'),
            region_name=config.get('region', 'us-east-1')
        )

    def save(self, key: str, data: Dict[str, Any]) -> bool:
        """Save to S3 with metadata indexing; returns False on client errors."""
        try:
            full_key = f"{self.key_prefix}{key}.json"
            # Copy key fields into object metadata for cheap filtering.
            metadata = self._extract_metadata(data)
            # Upload to S3
            self.s3_client.put_object(
                Bucket=self.bucket_name,
                Key=full_key,
                Body=json.dumps(data, default=str),
                ContentType='application/json',
                Metadata=metadata
            )
            return True
        except ClientError as e:
            logger.error(f"S3 save failed: {e}")
            return False

    def load(self, key: str) -> Dict[str, Any]:
        """Load from S3; raises KeyError when the object does not exist."""
        try:
            full_key = f"{self.key_prefix}{key}.json"
            response = self.s3_client.get_object(
                Bucket=self.bucket_name,
                Key=full_key
            )
            return json.loads(response['Body'].read())
        except ClientError as e:
            # Translate the S3-specific error into the store's KeyError contract.
            if e.response['Error']['Code'] == 'NoSuchKey':
                raise KeyError(f"Key not found: {key}")
            raise

    def _extract_metadata(self, data: Dict[str, Any]) -> Dict[str, str]:
        """Extract string-valued metadata for the S3 object (S3 metadata must be str)."""
        metadata = {}
        if 'model_name' in data:
            metadata['model-name'] = data['model_name']
        if 'aggregate_metrics' in data:
            metrics = data['aggregate_metrics']
            if 'accuracy' in metrics:
                metadata['accuracy'] = str(round(metrics['accuracy'], 3))
            if 'total_cost' in metrics:
                metadata['total-cost'] = str(round(metrics['total_cost'], 4))
        return metadata
🏗️ Advanced Architecture Patterns¶
🔄 Pipeline Pattern Implementation¶
from abc import ABC, abstractmethod
from typing import Any, List, Dict
import logging
class PipelineStage(ABC):
    """Abstract base for a single step in an evaluation pipeline."""

    def __init__(self, name: str):
        self.name = name
        self.logger = logging.getLogger(f"pipeline.{name}")

    @abstractmethod
    async def process(self, data: Any) -> Any:
        """Transform `data` and return the result for the next stage."""
        pass

    async def __call__(self, data: Any) -> Any:
        """Run `process` with start/completion/error logging."""
        self.logger.info(f"Starting stage: {self.name}")
        try:
            outcome = await self.process(data)
            self.logger.info(f"Completed stage: {self.name}")
            return outcome
        except Exception as e:
            self.logger.error(f"Stage {self.name} failed: {e}")
            raise
class EvaluationPipeline:
    """
    Runs a fixed sequence of pipeline stages, feeding each stage's
    output into the next and returning the final result.
    """

    def __init__(self, stages: List[PipelineStage]):
        self.stages = stages
        self.logger = logging.getLogger("evaluation_pipeline")

    async def execute(self, initial_data: Any) -> Any:
        """Thread `initial_data` through every stage in order."""
        current = initial_data
        total = len(self.stages)
        for index, stage in enumerate(self.stages, start=1):
            self.logger.info(f"Executing stage {index}/{total}: {stage.name}")
            current = await stage(current)
        return current
# Example pipeline stages
class DataPreprocessingStage(PipelineStage):
    # NOTE(review): `_clean_test_cases` and `_validate_model_config` are
    # referenced but not defined in this snippet — assumed implemented
    # elsewhere (or by a subclass).
    async def process(self, data: Dict) -> Dict:
        """Preprocess and validate input data before model execution."""
        # Data cleaning and validation; unrecognized keys are dropped
        # except for the optional 'evaluation_config'.
        processed_data = {
            'test_cases': self._clean_test_cases(data['test_cases']),
            'model_config': self._validate_model_config(data['model_config']),
            'evaluation_config': data.get('evaluation_config', {})
        }
        return processed_data
class ModelExecutionStage(PipelineStage):
    """Pipeline stage that runs async model inference over the test cases."""

    def __init__(self, engine):
        # Stage name is fixed; the inference engine is injected.
        super().__init__("model_execution")
        self.engine = engine

    async def process(self, data: Dict) -> Dict:
        """Attach raw inference results to the pipeline payload."""
        inference_results = await self.engine.evaluate_model_async(
            model_name=data['model_config']['name'],
            test_cases=data['test_cases']
        )
        data['raw_results'] = inference_results
        return data
class ScoringStage(PipelineStage):
    """Pipeline stage that applies every configured scoring strategy."""

    def __init__(self, scoring_strategies: List):
        super().__init__("scoring")
        self.scoring_strategies = scoring_strategies

    async def process(self, data: Dict) -> Dict:
        """Score predictions against references with each strategy, keyed by class name."""
        predictions = data['raw_results']['predictions']
        references = data['raw_results']['references']
        data['scores'] = {
            strategy.__class__.__name__: strategy.calculate_score(predictions, references)
            for strategy in self.scoring_strategies
        }
        return data
# Pipeline usage
# NOTE(review): AccuracyScorer, F1Scorer, CustomScorer,
# ResultsAggregationStage, PersistenceStage, async_engine and
# initial_evaluation_data are assumed to be defined elsewhere.
pipeline = EvaluationPipeline([
    DataPreprocessingStage("preprocessing"),
    ModelExecutionStage(async_engine),
    ScoringStage([AccuracyScorer(), F1Scorer(), CustomScorer()]),
    ResultsAggregationStage("aggregation"),
    PersistenceStage("persistence")
])

# `await` is only valid inside a coroutine; at module level use asyncio.run().
results = asyncio.run(pipeline.execute(initial_evaluation_data))
🛡️ Production-Ready Error Handling¶
🔧 Comprehensive Error Management¶
import asyncio
import logging
from typing import Optional, Callable, Any
from functools import wraps
from enum import Enum
class ErrorSeverity(Enum):
    """Severity levels used to classify handled errors."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
class ProductionErrorHandler:
    """
    Enterprise-grade error handling with recovery strategies.

    Provides two decorator factories for async callables:
      - handle_with_retry: retries with exponential backoff
      - circuit_breaker: trips open after repeated failures, then probes
        recovery via a half-open state

    NOTE(review): CircuitBreakerOpenError is referenced but not defined in
    this snippet — it must be importable at module level.
    """

    def __init__(self, config: dict):
        self.config = config
        self.logger = logging.getLogger("error_handler")
        self.error_counts = {}       # per-function permanent-failure tallies
        self.circuit_breakers = {}   # breaker state keyed by function name

    def handle_with_retry(
        self,
        max_retries: int = 3,
        backoff_factor: float = 2.0,
        retry_on: list = None
    ):
        """
        Decorator factory: retry an async callable with exponential backoff.

        Args:
            max_retries: additional attempts after the first call.
            backoff_factor: wait backoff_factor ** attempt seconds between tries.
            retry_on: exception types that trigger a retry (default: any Exception).
        """
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                retry_on_exceptions = tuple(retry_on) if retry_on else (Exception,)
                for attempt in range(max_retries + 1):
                    try:
                        return await func(*args, **kwargs)
                    except Exception as e:
                        if attempt == max_retries:
                            self._log_final_failure(func.__name__, e, attempt + 1)
                            raise
                        # Non-retryable errors propagate immediately.
                        if not isinstance(e, retry_on_exceptions):
                            raise
                        wait_time = backoff_factor ** attempt
                        self.logger.warning(
                            f"Attempt {attempt + 1}/{max_retries + 1} failed for {func.__name__}: {e}. "
                            f"Retrying in {wait_time}s"
                        )
                        await asyncio.sleep(wait_time)
            return wrapper
        return decorator

    def _log_final_failure(self, func_name: str, error: Exception, attempts: int):
        """Record that a callable exhausted all retry attempts.

        This was referenced but never defined in the original snippet.
        """
        self.error_counts[func_name] = self.error_counts.get(func_name, 0) + 1
        self.logger.error(
            f"{func_name} failed permanently after {attempts} attempt(s): {error}"
        )

    def circuit_breaker(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60
    ):
        """
        Decorator factory implementing the circuit breaker pattern.

        After `failure_threshold` consecutive failures the breaker opens and
        calls raise CircuitBreakerOpenError until `recovery_timeout` seconds
        have elapsed; then one half-open probe call decides whether to close.
        """
        def decorator(func):
            breaker_key = func.__name__

            @wraps(func)
            async def wrapper(*args, **kwargs):
                import time  # local import: `time` is not imported at module level here

                breaker = self.circuit_breakers.get(breaker_key, {
                    'failures': 0,
                    'last_failure': 0,
                    'state': 'closed'  # closed, open, half-open
                })
                current_time = time.time()
                # Check circuit breaker state before calling through.
                if breaker['state'] == 'open':
                    if current_time - breaker['last_failure'] > recovery_timeout:
                        breaker['state'] = 'half-open'
                        self.logger.info(f"Circuit breaker {breaker_key} entering half-open state")
                    else:
                        raise CircuitBreakerOpenError(f"Circuit breaker {breaker_key} is open")
                try:
                    result = await func(*args, **kwargs)
                    # A successful half-open probe closes the breaker.
                    if breaker['state'] == 'half-open':
                        breaker['state'] = 'closed'
                        breaker['failures'] = 0
                        self.logger.info(f"Circuit breaker {breaker_key} closed")
                    self.circuit_breakers[breaker_key] = breaker
                    return result
                except Exception:
                    breaker['failures'] += 1
                    breaker['last_failure'] = current_time
                    if breaker['failures'] >= failure_threshold:
                        breaker['state'] = 'open'
                        self.logger.error(f"Circuit breaker {breaker_key} opened after {breaker['failures']} failures")
                    self.circuit_breakers[breaker_key] = breaker
                    raise
            return wrapper
        return decorator
# Usage examples
# NOTE(review): `config` and `model_api_call` are assumed to be defined
# elsewhere in the application.
error_handler = ProductionErrorHandler(config)

# Decorators compose: retry wraps the circuit breaker, so each retry
# attempt is individually subject to the breaker state.
@error_handler.handle_with_retry(max_retries=3, backoff_factor=2.0)
@error_handler.circuit_breaker(failure_threshold=5, recovery_timeout=60)
async def reliable_model_call(model_name: str, prompt: str):
    """
    Model call with comprehensive error handling
    """
    # Implementation with automatic retry and circuit breaker
    return await model_api_call(model_name, prompt)
📊 Advanced Monitoring & Analytics¶
📈 Performance Monitoring¶
import time
import psutil
import asyncio
from contextlib import asynccontextmanager
from typing import Dict, Any
class PerformanceMonitor:
    """
    Comprehensive performance monitoring for evaluation workflows.

    Wrap units of work in `monitor_operation(...)` to record duration,
    memory delta and average CPU usage, then call
    `get_performance_report()` for aggregate statistics.

    NOTE(review): `psutil` must be importable at module level for
    monitor_operation to work.
    """

    def __init__(self):
        import logging  # local import: logging is not imported in this section
        # Fix: the original referenced an undefined module-level `logger`.
        self.logger = logging.getLogger("performance_monitor")
        self.metrics = {}            # operation name -> list of metric dicts
        self.active_operations = {}  # operation id -> in-flight bookkeeping

    @asynccontextmanager
    async def monitor_operation(self, operation_name: str, metadata: Dict = None):
        """Async context manager that records performance metrics for the wrapped work."""
        start_time = time.time()
        start_memory = psutil.virtual_memory().used
        start_cpu = psutil.cpu_percent()
        # NOTE(review): second-resolution ids can collide for operations
        # started within the same second.
        operation_id = f"{operation_name}_{int(start_time)}"
        self.active_operations[operation_id] = {
            'name': operation_name,
            'start_time': start_time,
            'metadata': metadata or {}
        }
        try:
            yield operation_id
        finally:
            # Metrics are recorded even when the wrapped block raises.
            end_time = time.time()
            end_memory = psutil.virtual_memory().used
            end_cpu = psutil.cpu_percent()
            metrics = {
                'operation': operation_name,
                'duration': end_time - start_time,
                'memory_delta': end_memory - start_memory,
                'cpu_usage': (start_cpu + end_cpu) / 2,
                'start_time': start_time,
                'end_time': end_time,
                'metadata': metadata or {}
            }
            self._record_metrics(operation_id, metrics)
            del self.active_operations[operation_id]

    def _record_metrics(self, operation_id: str, metrics: Dict[str, Any]):
        """Append metrics under their operation name and check alert thresholds."""
        if metrics['operation'] not in self.metrics:
            self.metrics[metrics['operation']] = []
        self.metrics[metrics['operation']].append(metrics)
        # Log performance alerts
        self._check_performance_alerts(metrics)

    def _check_performance_alerts(self, metrics: Dict[str, Any]):
        """Log warnings for operations that exceed memory or duration thresholds."""
        # Memory usage alert
        if metrics['memory_delta'] > 100 * 1024 * 1024:  # 100MB
            self.logger.warning(
                f"High memory usage in {metrics['operation']}: "
                f"{metrics['memory_delta'] / 1024 / 1024:.1f}MB"
            )
        # Duration alert
        if metrics['duration'] > 30:  # 30 seconds
            self.logger.warning(
                f"Slow operation {metrics['operation']}: "
                f"{metrics['duration']:.1f}s"
            )

    def get_performance_report(self, operation_name: str = None) -> Dict[str, Any]:
        """
        Generate aggregate statistics for one operation (or all operations).

        Returns {"error": ...} when no metrics have been recorded yet.
        """
        if operation_name:
            data = self.metrics.get(operation_name, [])
        else:
            data = []
            for op_metrics in self.metrics.values():
                data.extend(op_metrics)
        if not data:
            return {"error": "No metrics available"}
        # Calculate statistics
        durations = [m['duration'] for m in data]
        memory_deltas = [m['memory_delta'] for m in data]
        return {
            'total_operations': len(data),
            'avg_duration': sum(durations) / len(durations),
            'max_duration': max(durations),
            'min_duration': min(durations),
            'avg_memory_delta': sum(memory_deltas) / len(memory_deltas),
            'max_memory_delta': max(memory_deltas),
            'operations_by_type': {
                op: len(metrics) for op, metrics in self.metrics.items()
            }
        }
# Usage in evaluation workflow
monitor = PerformanceMonitor()

# NOTE(review): `generator`, `engine`, `scoring_context` and `use_case`
# are assumed to be defined elsewhere.
async def monitored_evaluation():
    # Nested monitors: the outer span covers the whole evaluation, the
    # inner spans break it down by phase.
    async with monitor.monitor_operation("full_evaluation", {"model": "gpt-3.5-turbo"}):
        async with monitor.monitor_operation("data_generation"):
            test_cases = generator.generate_test_cases(use_case, count=100)
        async with monitor.monitor_operation("model_inference"):
            results = await engine.evaluate_model_async("gpt-3.5-turbo", test_cases)
        async with monitor.monitor_operation("scoring"):
            scores = scoring_context.evaluate(results['predictions'], results['references'])

# Generate performance report
# NOTE(review): `avg_duration` is an average across recorded operations,
# not the total evaluation wall-clock time.
report = monitor.get_performance_report()
print(f"Evaluation completed in {report['avg_duration']:.2f}s")
🚀 Deployment & Scaling Strategies¶
🐳 Docker Containerization¶
# Production Dockerfile
FROM python:3.11-slim

# Set environment variables: no .pyc files, unbuffered logs, app on import path
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV PYTHONPATH=/app

# Install system dependencies (compilers for packages that build native wheels)
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Create app directory
WORKDIR /app

# Copy requirements first so the dependency layer caches independently of code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create non-root user so the service does not run as root
RUN useradd --create-home --shell /bin/bash llmeval
RUN chown -R llmeval:llmeval /app
USER llmeval

# Health check: verify the framework can be imported and initialized
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD python -c "from llm_evaluation_framework import ModelRegistry; registry = ModelRegistry(); print('OK')"

# Default command
CMD ["python", "-m", "llm_evaluation_framework.cli"]
# Docker Compose for distributed evaluation
version: '3.8'

services:
  llm-evaluator:
    build: .
    environment:
      - REDIS_HOST=redis
      # NOTE(review): move real credentials to an env file or secrets store.
      - DATABASE_URL=postgresql://user:pass@postgres:5432/llmeval
    depends_on:
      - redis
      - postgres
    volumes:
      - ./config:/app/config
      - ./results:/app/results
    deploy:
      # Three evaluator replicas share the Redis/Postgres backends.
      replicas: 3
      resources:
        limits:
          memory: 2G
        reservations:
          memory: 1G

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

  postgres:
    image: postgres:15-alpine
    environment:
      - POSTGRES_DB=llmeval
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres_data:/var/lib/postgresql/data

# Named volumes persist data across container restarts
volumes:
  redis_data:
  postgres_data:
☸️ Kubernetes Deployment¶
# Kubernetes deployment configuration
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-evaluation-framework
  labels:
    app: llm-evaluator
spec:
  # Baseline replica count; the HPA below scales between 3 and 20.
  replicas: 5
  selector:
    matchLabels:
      app: llm-evaluator
  template:
    metadata:
      labels:
        app: llm-evaluator
    spec:
      containers:
      - name: llm-evaluator
        image: llm-evaluation-framework:latest
        ports:
        - containerPort: 8000
        env:
        - name: REDIS_HOST
          value: "redis-service"
        # Database URL is injected from a Secret, not hard-coded.
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: db-credentials
              key: url
        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        # Liveness: restart the pod if /health stops responding.
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        # Readiness: remove the pod from the Service until /ready succeeds.
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: llm-evaluator-service
spec:
  selector:
    app: llm-evaluator
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: LoadBalancer
---
# Autoscaler: scales the deployment on CPU (70%) and memory (80%) utilization.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: llm-evaluator-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: llm-evaluation-framework
  minReplicas: 3
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
## 🎓 Master Advanced Patterns **You've learned the most powerful techniques for production LLM evaluation!** **Ready to implement these patterns?** [Developer Guide](developer-guide.md) · [Custom Components](developer-guide.md#custom-components) · [Logging & Error Handling](logging-and-error-handling.md) --- *Advanced patterns for enterprise-scale LLM evaluation systems! 🚀*