
# 🚀 Advanced Usage & Patterns

![Advanced Usage](https://img.shields.io/badge/Advanced%20Usage-Power%20User%20Guide-8b5cf6?style=for-the-badge&logo=rocket&logoColor=white)

**Master advanced patterns, optimization techniques, and enterprise-grade implementations**

*Unlock the full potential of the LLM Evaluation Framework*

## 🎯 What You'll Learn

| **🔥 Performance** | **🔧 Customization** | **🏗️ Architecture** | **🛡️ Production** |
|-------------------|---------------------|---------------------|------------------|
| Async processing | Custom components | Advanced patterns | Error handling |
| Batch operations | Plugin development | Scaling strategies | Monitoring |
| Memory optimization | Extension points | Load balancing | Security |
| Caching strategies | Custom metrics | Distributed evaluation | Deployment |

## ⚡ High-Performance Async Operations

### 🚀 Concurrent Model Evaluation

#### **Basic Async Pattern**
```python
import asyncio

from llm_evaluation_framework import ModelRegistry, TestDatasetGenerator
from llm_evaluation_framework.engines.async_inference_engine import AsyncInferenceEngine


async def high_performance_evaluation():
    """Execute multiple model evaluations concurrently for maximum throughput."""
    # Initialize components
    registry = ModelRegistry()
    generator = TestDatasetGenerator()
    async_engine = AsyncInferenceEngine(registry)

    # Register multiple models
    models = {
        "gpt-3.5-turbo": {"provider": "openai", "capabilities": ["reasoning"]},
        "claude-3-sonnet": {"provider": "anthropic", "capabilities": ["creativity"]},
        "gpt-4": {"provider": "openai", "capabilities": ["coding"]}
    }

    for model_name, config in models.items():
        registry.register_model(model_name, config)

    # Generate test cases
    test_cases = generator.generate_test_cases(
        use_case={"domain": "general", "required_capabilities": ["reasoning"]},
        count=100
    )

    # Execute concurrent evaluations
    tasks = [
        async_engine.evaluate_model_async(
            model_name=model,
            test_cases=test_cases,
            max_concurrent=10,  # Concurrent requests per model
            timeout=30.0
        )
        for model in models
    ]

    # Wait for all evaluations to complete
    results = await asyncio.gather(*tasks, return_exceptions=True)

    return dict(zip(models, results))


# Execute the evaluation
if __name__ == "__main__":
    results = asyncio.run(high_performance_evaluation())

    for model, result in results.items():
        if isinstance(result, Exception):
            print(f"❌ {model}: {result}")
        else:
            accuracy = result['aggregate_metrics']['accuracy']
            cost = result['aggregate_metrics']['total_cost']
            print(f"✅ {model}: {accuracy:.1%} accuracy, ${cost:.4f} cost")
```

#### **Advanced Async Configuration**
```python
import logging
import time

from llm_evaluation_framework.engines.async_inference_engine import AsyncInferenceEngine
from llm_evaluation_framework.utils.rate_limiter import RateLimiter
from llm_evaluation_framework.utils.retry_handler import RetryHandler

logger = logging.getLogger(__name__)


class ProductionAsyncEngine:
    def __init__(self, registry, config=None):
        self.registry = registry
        self.config = config or self._default_config()

        # Configure rate limiting
        self.rate_limiter = RateLimiter(
            requests_per_minute=self.config['rate_limit'],
            burst_size=self.config['burst_size']
        )

        # Configure retry handling
        self.retry_handler = RetryHandler(
            max_retries=self.config['max_retries'],
            backoff_factor=self.config['backoff_factor'],
            retry_on_errors=['timeout', 'rate_limit', 'server_error']
        )

        self.engine = AsyncInferenceEngine(
            registry=registry,
            rate_limiter=self.rate_limiter,
            retry_handler=self.retry_handler
        )

    def _default_config(self):
        return {
            'rate_limit': 60,  # requests per minute
            'burst_size': 10,  # burst capacity
            'max_retries': 3,
            'backoff_factor': 2.0,
            'timeout': 30.0,
            'max_concurrent': 20
        }

    async def evaluate_with_monitoring(self, model_name, test_cases):
        """
        Execute evaluation with comprehensive monitoring and error handling
        """
        start_time = time.time()

        try:
            # Execute evaluation with monitoring
            result = await self.engine.evaluate_model_async(
                model_name=model_name,
                test_cases=test_cases,
                max_concurrent=self.config['max_concurrent'],
                timeout=self.config['timeout'],
                progress_callback=self._progress_callback,
                error_callback=self._error_callback
            )

            # Add performance metrics
            elapsed = time.time() - start_time
            result['performance_metrics'] = {
                'total_time': elapsed,
                'throughput': len(test_cases) / elapsed,
                'error_rate': result.get('error_count', 0) / len(test_cases)
            }

            return result

        except Exception as e:
            logger.error(f"Evaluation failed for {model_name}: {e}")
            return {'error': str(e), 'model': model_name}

    async def _progress_callback(self, completed, total, current_test):
        """Progress monitoring callback"""
        progress = (completed / total) * 100
        print(f"Progress: {progress:.1f}% ({completed}/{total}) - Current: {current_test['id']}")

    async def _error_callback(self, error, test_case):
        """Error handling callback"""
        logger.warning(f"Test {test_case['id']} failed: {error}")
```
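
The `RateLimiter` above is the framework's utility class, and its internals aren't shown here. As a rough, standalone illustration of the token-bucket semantics that `requests_per_minute` and `burst_size` imply (the class name and API below are hypothetical, not the framework's):

```python
import time


class TokenBucketLimiter:
    """Minimal token bucket: refills at requests_per_minute, caps at burst_size."""

    def __init__(self, requests_per_minute: int, burst_size: int):
        self.rate = requests_per_minute / 60.0  # tokens added per second
        self.capacity = burst_size
        self.tokens = float(burst_size)        # start with a full bucket
        self.last_refill = time.monotonic()

    def _refill(self, now: float) -> None:
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now

    def try_acquire(self) -> bool:
        """Consume one token if available; return False when rate-limited."""
        self._refill(time.monotonic())
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


# A burst of 15 immediate requests: only the burst capacity (10) gets through.
limiter = TokenBucketLimiter(requests_per_minute=60, burst_size=10)
allowed = sum(limiter.try_acquire() for _ in range(15))
```

After the burst drains, one token per second trickles back in at the configured 60 requests per minute.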

## 🔧 Custom Component Development

πŸ—οΈ Custom Scoring Strategies

#### **Domain-Specific Scorer**
```python
import re
from typing import List, Dict, Any

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from llm_evaluation_framework.evaluation.scoring_strategies import ScoringStrategy


class MedicalAccuracyScorer(ScoringStrategy):
    """
    Specialized scorer for medical domain evaluations.
    Considers medical terminology, accuracy, and safety.
    """

    def __init__(self, medical_terms_file: str = None):
        self.medical_terms = self._load_medical_terms(medical_terms_file)
        self.vectorizer = TfidfVectorizer(
            ngram_range=(1, 3),
            max_features=10000,
            stop_words='english'
        )

    def calculate_score(self, predictions: List[str], references: List[str]) -> float:
        """
        Calculate medical domain-specific accuracy score
        """
        scores = []

        for pred, ref in zip(predictions, references):
            # Individual scoring components
            terminology_score = self._medical_terminology_score(pred, ref)
            semantic_score = self._semantic_similarity_score(pred, ref)
            safety_score = self._safety_compliance_score(pred)
            factual_score = self._factual_accuracy_score(pred, ref)

            # Weighted combination
            final_score = (
                0.3 * terminology_score +
                0.25 * semantic_score +
                0.25 * safety_score +
                0.2 * factual_score
            )

            scores.append(final_score)

        return sum(scores) / len(scores)

    def _medical_terminology_score(self, prediction: str, reference: str) -> float:
        """Score based on correct medical terminology usage"""
        pred_terms = self._extract_medical_terms(prediction)
        ref_terms = self._extract_medical_terms(reference)

        if not ref_terms:
            return 1.0

        # Calculate precision and recall for medical terms
        correct_terms = pred_terms.intersection(ref_terms)
        precision = len(correct_terms) / len(pred_terms) if pred_terms else 0
        recall = len(correct_terms) / len(ref_terms)

        # F1 score
        if precision + recall == 0:
            return 0.0
        return 2 * (precision * recall) / (precision + recall)

    def _semantic_similarity_score(self, prediction: str, reference: str) -> float:
        """Calculate semantic similarity using TF-IDF vectors"""
        try:
            corpus = [prediction, reference]
            tfidf_matrix = self.vectorizer.fit_transform(corpus)
            similarity = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])[0][0]
            return similarity
        except ValueError:  # e.g. empty vocabulary after stop-word removal
            return 0.0

    def _safety_compliance_score(self, prediction: str) -> float:
        """Check for medical safety compliance"""
        # Check for dangerous advice patterns
        danger_patterns = [
            r'ignore.{0,10}doctor',
            r'stop.{0,10}medication',
            r'self.{0,10}diagnose',
            r'definitely.{0,10}cancer'
        ]

        safety_violations = sum(
            1 for pattern in danger_patterns
            if re.search(pattern, prediction.lower())
        )

        # Return inverted score (fewer violations = higher score)
        return max(0, 1.0 - (safety_violations * 0.5))

    def _factual_accuracy_score(self, prediction: str, reference: str) -> float:
        """Score factual accuracy using key medical facts"""
        # Extract medical facts (symptoms, treatments, conditions)
        pred_facts = self._extract_medical_facts(prediction)
        ref_facts = self._extract_medical_facts(reference)

        if not ref_facts:
            return 1.0

        # Calculate fact overlap
        correct_facts = pred_facts.intersection(ref_facts)
        return len(correct_facts) / len(ref_facts)


# Usage example (ScoringContext is provided by the framework)
medical_scorer = MedicalAccuracyScorer('medical_terms.txt')
scoring_context = ScoringContext(medical_scorer)
score = scoring_context.evaluate(predictions, references)
```

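`MedicalAccuracyScorer` delegates to private helpers (`_load_medical_terms`, `_extract_medical_terms`, `_extract_medical_facts`) that are not shown above. One plausible sketch of the extraction logic, written as standalone functions with a tiny hypothetical vocabulary in place of the terms file:

```python
import re
from typing import Set

# Hypothetical vocabulary; _load_medical_terms would read this from a file.
MEDICAL_TERMS = {"hypertension", "myocardial infarction", "metformin", "tachycardia"}


def extract_medical_terms(text: str, vocabulary: Set[str] = MEDICAL_TERMS) -> Set[str]:
    """Return the known medical terms that appear in the text (case-insensitive)."""
    lowered = text.lower()
    return {term for term in vocabulary if term in lowered}


def extract_medical_facts(text: str) -> Set[str]:
    """Very rough fact extraction: one normalized clause per sentence."""
    sentences = re.split(r"[.!?]+", text.lower())
    return {" ".join(s.split()) for s in sentences if s.strip()}


terms = extract_medical_terms("Patient presents with tachycardia and hypertension.")
```

A production scorer would likely swap these substring and sentence heuristics for a clinical NER model, but the set-intersection scoring above works the same either way.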
#### **Multi-Metric Custom Scorer**
```python
class ComprehensiveScorer(ScoringStrategy):
    """
    Advanced scorer combining multiple evaluation dimensions
    """

    def __init__(self, weights: Dict[str, float] = None):
        self.weights = weights or {
            'accuracy': 0.3,
            'fluency': 0.2,
            'relevance': 0.2,
            'creativity': 0.15,
            'safety': 0.15
        }

        # Initialize sub-scorers
        self.accuracy_scorer = AccuracyScoringStrategy()
        self.fluency_scorer = FluencyScorer()
        self.relevance_scorer = RelevanceScorer()
        self.creativity_scorer = CreativityScorer()
        self.safety_scorer = SafetyScorer()

    def calculate_score(self, predictions: List[str], references: List[str]) -> Dict[str, Any]:
        """
        Return comprehensive scoring breakdown
        """
        # Calculate individual scores
        scores = {
            'accuracy': self.accuracy_scorer.calculate_score(predictions, references),
            'fluency': self.fluency_scorer.calculate_score(predictions),
            'relevance': self.relevance_scorer.calculate_score(predictions, references),
            'creativity': self.creativity_scorer.calculate_score(predictions),
            'safety': self.safety_scorer.calculate_score(predictions)
        }

        # Calculate weighted overall score
        overall_score = sum(
            scores[metric] * self.weights[metric]
            for metric in scores
        )

        return {
            'overall_score': overall_score,
            'component_scores': scores,
            'weights_used': self.weights
        }
```
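
The weighted combination above only behaves sensibly when custom weights cover the same metrics as the scores and sum to 1.0. A small standalone helper (not part of the framework) that enforces both before combining:

```python
import math
from typing import Dict


def combine_scores(scores: Dict[str, float], weights: Dict[str, float]) -> float:
    """Validate the weight vector, then return the weighted overall score."""
    if set(scores) != set(weights):
        raise ValueError("scores and weights must cover the same metrics")
    if not math.isclose(sum(weights.values()), 1.0, abs_tol=1e-9):
        raise ValueError("weights must sum to 1.0")
    return sum(scores[m] * weights[m] for m in scores)


overall = combine_scores(
    {"accuracy": 0.9, "fluency": 0.8, "relevance": 0.7, "creativity": 0.6, "safety": 1.0},
    {"accuracy": 0.3, "fluency": 0.2, "relevance": 0.2, "creativity": 0.15, "safety": 0.15},
)
```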

πŸ—„οΈ Custom Persistence Backends

#### **Redis Backend Implementation**
```python
import json
import logging
from typing import Any, Dict, List

import redis

from llm_evaluation_framework.persistence.base_store import BaseStore

logger = logging.getLogger(__name__)


class RedisStore(BaseStore):
    """
    High-performance Redis backend for distributed evaluation results
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.client = redis.Redis(
            host=config.get('host', 'localhost'),
            port=config.get('port', 6379),
            db=config.get('db', 0),
            password=config.get('password'),
            decode_responses=True
        )
        self.key_prefix = config.get('key_prefix', 'llm_eval:')
        self.ttl = config.get('ttl', 86400)  # 24 hours default

    def save(self, key: str, data: Dict[str, Any]) -> bool:
        """Save evaluation results to Redis with TTL"""
        try:
            full_key = f"{self.key_prefix}{key}"
            serialized_data = json.dumps(data, default=str)

            # Save with TTL
            result = self.client.setex(full_key, self.ttl, serialized_data)

            # Add to index for querying
            self._update_index(key, data)

            return result
        except Exception as e:
            logger.error(f"Redis save failed: {e}")
            return False

    def load(self, key: str) -> Dict[str, Any]:
        """Load evaluation results from Redis"""
        try:
            full_key = f"{self.key_prefix}{key}"
            data = self.client.get(full_key)

            if data is None:
                raise KeyError(f"Key not found: {key}")

            return json.loads(data)
        except Exception as e:
            logger.error(f"Redis load failed: {e}")
            raise

    def query(self, filters: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Query results with filters using Redis indexes"""
        try:
            # Build candidate keys from the relevant indexes
            query_keys = self._build_query_keys(filters)

            results = []
            for key in query_keys:
                try:
                    data = self.load(key.split(':')[-1])  # strip prefix (assumes keys contain no ':')
                    if self._matches_filters(data, filters):
                        results.append(data)
                except KeyError:
                    continue

            return results
        except Exception as e:
            logger.error(f"Redis query failed: {e}")
            return []

    def _update_index(self, key: str, data: Dict[str, Any]):
        """Update indexes for efficient querying"""
        # Model name index
        if 'model_name' in data:
            model_key = f"{self.key_prefix}index:model:{data['model_name']}"
            self.client.sadd(model_key, key)
            self.client.expire(model_key, self.ttl)

        # Timestamp index
        if 'timestamp' in data:
            timestamp = data['timestamp'][:10]  # Date only
            date_key = f"{self.key_prefix}index:date:{timestamp}"
            self.client.sadd(date_key, key)
            self.client.expire(date_key, self.ttl)

        # Accuracy range index (buckets 0-10)
        if 'aggregate_metrics' in data and 'accuracy' in data['aggregate_metrics']:
            accuracy = data['aggregate_metrics']['accuracy']
            accuracy_range = f"{int(accuracy * 10)}"
            acc_key = f"{self.key_prefix}index:accuracy:{accuracy_range}"
            self.client.sadd(acc_key, key)
            self.client.expire(acc_key, self.ttl)


# Configuration and usage
redis_config = {
    'host': 'localhost',
    'port': 6379,
    'db': 0,
    'key_prefix': 'llm_eval:',
    'ttl': 86400
}

redis_store = RedisStore(redis_config)
persistence_manager = PersistenceManager({'redis': redis_store})
```

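`RedisStore.query` calls `_build_query_keys` and `_matches_filters`, which are not defined above. A rough sketch of their logic as pure functions, with the index lookups (which would come from `client.smembers` on the sets built in `_update_index`) injected as plain sets so the sketch runs without Redis:

```python
from typing import Any, Dict, Iterable, Set


def build_query_keys(index_sets: Iterable[Set[str]]) -> Set[str]:
    """Intersect the candidate keys from each relevant index (AND semantics)."""
    sets = list(index_sets)
    if not sets:
        return set()
    result = sets[0].copy()
    for s in sets[1:]:
        result &= s
    return result


def matches_filters(data: Dict[str, Any], filters: Dict[str, Any]) -> bool:
    """A record matches when every filter key is present with an equal value."""
    return all(data.get(key) == value for key, value in filters.items())


# Candidate keys from a model index and a date index, then a final exact check.
keys = build_query_keys([{"run1", "run2", "run3"}, {"run2", "run3"}])
ok = matches_filters({"model_name": "gpt-4", "status": "done"}, {"model_name": "gpt-4"})
```

The second-pass `matches_filters` check matters because the indexes are coarse (date buckets, accuracy deciles); the exact comparison happens on the loaded record.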
#### **Cloud Storage Backend (AWS S3)**
```python
import json
import logging
from typing import Any, Dict

import boto3
from botocore.exceptions import ClientError

from llm_evaluation_framework.persistence.base_store import BaseStore

logger = logging.getLogger(__name__)


class S3Store(BaseStore):
    """
    AWS S3 backend for scalable evaluation result storage
    """

    def __init__(self, config: Dict[str, Any]):
        self.bucket_name = config['bucket_name']
        self.key_prefix = config.get('key_prefix', 'llm-evaluations/')

        self.s3_client = boto3.client(
            's3',
            aws_access_key_id=config.get('access_key_id'),
            aws_secret_access_key=config.get('secret_access_key'),
            region_name=config.get('region', 'us-east-1')
        )

    def save(self, key: str, data: Dict[str, Any]) -> bool:
        """Save to S3 with metadata indexing"""
        try:
            full_key = f"{self.key_prefix}{key}.json"

            # Add metadata for indexing
            metadata = self._extract_metadata(data)

            # Upload to S3
            self.s3_client.put_object(
                Bucket=self.bucket_name,
                Key=full_key,
                Body=json.dumps(data, default=str),
                ContentType='application/json',
                Metadata=metadata
            )

            return True
        except ClientError as e:
            logger.error(f"S3 save failed: {e}")
            return False

    def load(self, key: str) -> Dict[str, Any]:
        """Load from S3"""
        try:
            full_key = f"{self.key_prefix}{key}.json"

            response = self.s3_client.get_object(
                Bucket=self.bucket_name,
                Key=full_key
            )

            return json.loads(response['Body'].read())
        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchKey':
                raise KeyError(f"Key not found: {key}")
            raise

    def _extract_metadata(self, data: Dict[str, Any]) -> Dict[str, str]:
        """Extract metadata for S3 object tagging"""
        metadata = {}

        if 'model_name' in data:
            metadata['model-name'] = data['model_name']

        if 'aggregate_metrics' in data:
            metrics = data['aggregate_metrics']
            if 'accuracy' in metrics:
                metadata['accuracy'] = str(round(metrics['accuracy'], 3))
            if 'total_cost' in metrics:
                metadata['total-cost'] = str(round(metrics['total_cost'], 4))

        return metadata
```

πŸ—οΈ Advanced Architecture Patterns

### 🔄 Pipeline Pattern Implementation

```python
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class PipelineStage(ABC):
    """Base class for pipeline stages"""

    def __init__(self, name: str):
        self.name = name
        self.logger = logging.getLogger(f"pipeline.{name}")

    @abstractmethod
    async def process(self, data: Any) -> Any:
        """Process data through this stage"""

    async def __call__(self, data: Any) -> Any:
        """Execute the stage with logging"""
        self.logger.info(f"Starting stage: {self.name}")
        try:
            result = await self.process(data)
            self.logger.info(f"Completed stage: {self.name}")
            return result
        except Exception as e:
            self.logger.error(f"Stage {self.name} failed: {e}")
            raise


class EvaluationPipeline:
    """
    Advanced evaluation pipeline with configurable stages
    """

    def __init__(self, stages: List[PipelineStage]):
        self.stages = stages
        self.logger = logging.getLogger("evaluation_pipeline")

    async def execute(self, initial_data: Any) -> Any:
        """Execute the complete pipeline"""
        data = initial_data

        for i, stage in enumerate(self.stages):
            self.logger.info(f"Executing stage {i + 1}/{len(self.stages)}: {stage.name}")
            data = await stage(data)

        return data


# Example pipeline stages
class DataPreprocessingStage(PipelineStage):
    async def process(self, data: Dict) -> Dict:
        """Preprocess and validate input data"""
        # Data cleaning and validation (helper methods omitted for brevity)
        return {
            'test_cases': self._clean_test_cases(data['test_cases']),
            'model_config': self._validate_model_config(data['model_config']),
            'evaluation_config': data.get('evaluation_config', {})
        }


class ModelExecutionStage(PipelineStage):
    def __init__(self, engine):
        super().__init__("model_execution")
        self.engine = engine

    async def process(self, data: Dict) -> Dict:
        """Execute model inference"""
        results = await self.engine.evaluate_model_async(
            model_name=data['model_config']['name'],
            test_cases=data['test_cases']
        )
        data['raw_results'] = results
        return data


class ScoringStage(PipelineStage):
    def __init__(self, scoring_strategies: List):
        super().__init__("scoring")
        self.scoring_strategies = scoring_strategies

    async def process(self, data: Dict) -> Dict:
        """Apply multiple scoring strategies"""
        scores = {}
        for strategy in self.scoring_strategies:
            strategy_name = strategy.__class__.__name__
            scores[strategy_name] = strategy.calculate_score(
                data['raw_results']['predictions'],
                data['raw_results']['references']
            )

        data['scores'] = scores
        return data


# Pipeline usage (run inside an async function)
pipeline = EvaluationPipeline([
    DataPreprocessingStage("preprocessing"),
    ModelExecutionStage(async_engine),
    ScoringStage([AccuracyScorer(), F1Scorer(), CustomScorer()]),
    ResultsAggregationStage("aggregation"),
    PersistenceStage("persistence")
])

results = await pipeline.execute(initial_evaluation_data)
```
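
The usage above also wires in `ResultsAggregationStage` and `PersistenceStage`, which are not defined earlier. A minimal sketch of what they might look like; the averaging rule and the dict-backed store are assumptions for illustration, and the base class is restated compactly so the sketch is self-contained:

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict


class PipelineStage(ABC):
    """Compact restatement of the base class above (logging omitted)."""

    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    async def process(self, data: Any) -> Any: ...

    async def __call__(self, data: Any) -> Any:
        return await self.process(data)


class ResultsAggregationStage(PipelineStage):
    """Collapse per-scorer results into a single summary figure."""

    async def process(self, data: Dict) -> Dict:
        numeric = [v for v in data.get("scores", {}).values() if isinstance(v, (int, float))]
        data["aggregate"] = sum(numeric) / len(numeric) if numeric else None
        return data


class PersistenceStage(PipelineStage):
    """Write the final payload to a store; a dict stands in for a real backend."""

    def __init__(self, name: str, store: Dict = None):
        super().__init__(name)
        self.store = store if store is not None else {}

    async def process(self, data: Dict) -> Dict:
        self.store[data.get("run_id", "latest")] = data
        return data


async def demo():
    data = {"run_id": "demo", "scores": {"AccuracyScorer": 0.9, "F1Scorer": 0.7}}
    data = await ResultsAggregationStage("aggregation")(data)
    store = {}
    data = await PersistenceStage("persistence", store)(data)
    return data, store


result, store = asyncio.run(demo())
```

In a real pipeline the `PersistenceStage` would hold a `RedisStore` or `S3Store` from the previous section instead of a dict.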

πŸ›‘οΈ Production-Ready Error Handling

### 🔧 Comprehensive Error Management

```python
import asyncio
import logging
import time
from enum import Enum
from functools import wraps
from typing import Any, Dict, List, Optional


class CircuitBreakerOpenError(RuntimeError):
    """Raised when a call is rejected because its circuit breaker is open."""


class ErrorSeverity(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


class ProductionErrorHandler:
    """
    Enterprise-grade error handling with recovery strategies
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = logging.getLogger("error_handler")
        self.error_counts = {}
        self.circuit_breakers = {}

    def handle_with_retry(
        self,
        max_retries: int = 3,
        backoff_factor: float = 2.0,
        retry_on: Optional[List[type]] = None
    ):
        """Decorator for automatic retry with exponential backoff"""
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                retry_on_exceptions = retry_on or [Exception]

                for attempt in range(max_retries + 1):
                    try:
                        return await func(*args, **kwargs)
                    except Exception as e:
                        if not any(isinstance(e, exc_type) for exc_type in retry_on_exceptions):
                            raise

                        if attempt == max_retries:
                            self._log_final_failure(func.__name__, e, attempt + 1)
                            raise

                        wait_time = backoff_factor ** attempt
                        self.logger.warning(
                            f"Attempt {attempt + 1}/{max_retries + 1} failed for {func.__name__}: {e}. "
                            f"Retrying in {wait_time}s"
                        )
                        await asyncio.sleep(wait_time)

            return wrapper
        return decorator

    def _log_final_failure(self, func_name: str, error: Exception, attempts: int):
        """Record and log an exhausted retry budget"""
        self.error_counts[func_name] = self.error_counts.get(func_name, 0) + 1
        self.logger.error(f"{func_name} failed after {attempts} attempts: {error}")

    def circuit_breaker(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60
    ):
        """Circuit breaker pattern for failing services"""
        def decorator(func):
            breaker_key = func.__name__

            @wraps(func)
            async def wrapper(*args, **kwargs):
                breaker = self.circuit_breakers.get(breaker_key, {
                    'failures': 0,
                    'last_failure': 0,
                    'state': 'closed'  # closed, open, half-open
                })

                current_time = time.time()

                # Check circuit breaker state
                if breaker['state'] == 'open':
                    if current_time - breaker['last_failure'] > recovery_timeout:
                        breaker['state'] = 'half-open'
                        self.logger.info(f"Circuit breaker {breaker_key} entering half-open state")
                    else:
                        raise CircuitBreakerOpenError(f"Circuit breaker {breaker_key} is open")

                try:
                    result = await func(*args, **kwargs)

                    # Reset on success
                    if breaker['state'] == 'half-open':
                        breaker['state'] = 'closed'
                        breaker['failures'] = 0
                        self.logger.info(f"Circuit breaker {breaker_key} closed")

                    self.circuit_breakers[breaker_key] = breaker
                    return result

                except Exception:
                    breaker['failures'] += 1
                    breaker['last_failure'] = current_time

                    if breaker['failures'] >= failure_threshold:
                        breaker['state'] = 'open'
                        self.logger.error(f"Circuit breaker {breaker_key} opened after {breaker['failures']} failures")

                    self.circuit_breakers[breaker_key] = breaker
                    raise

            return wrapper
        return decorator


# Usage examples
error_handler = ProductionErrorHandler(config={})

@error_handler.handle_with_retry(max_retries=3, backoff_factor=2.0)
@error_handler.circuit_breaker(failure_threshold=5, recovery_timeout=60)
async def reliable_model_call(model_name: str, prompt: str):
    """
    Model call with comprehensive error handling
    """
    # model_api_call is your provider-specific client call
    return await model_api_call(model_name, prompt)
```
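
The retry decorator can be exercised end to end. The sketch below is a trimmed standalone copy of the same retry loop (no logging, zero backoff so it runs instantly), applied to a coroutine that fails twice before succeeding:

```python
import asyncio
from functools import wraps


def retry_async(max_retries: int = 3, backoff_factor: float = 0.0):
    """Trimmed copy of the retry pattern above; backoff_factor=0 skips the wait."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries:
                        raise
                    await asyncio.sleep(backoff_factor ** attempt if backoff_factor else 0)
        return wrapper
    return decorator


calls = {"count": 0}


@retry_async(max_retries=3)
async def flaky():
    """Simulated transient failure: errors on the first two calls."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise TimeoutError("transient")
    return "ok"


result = asyncio.run(flaky())
```

With two transient failures and a budget of three retries, the third attempt succeeds and the caller never sees the `TimeoutError`.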

## 📊 Advanced Monitoring & Analytics

### 📈 Performance Monitoring

```python
import logging
import time
from contextlib import asynccontextmanager
from typing import Any, Dict

import psutil

logger = logging.getLogger(__name__)


class PerformanceMonitor:
    """
    Comprehensive performance monitoring for evaluation workflows
    """

    def __init__(self):
        self.metrics = {}
        self.active_operations = {}

    @asynccontextmanager
    async def monitor_operation(self, operation_name: str, metadata: Dict = None):
        """Context manager for monitoring operation performance"""
        start_time = time.time()
        start_memory = psutil.virtual_memory().used
        start_cpu = psutil.cpu_percent()

        operation_id = f"{operation_name}_{int(start_time)}"

        self.active_operations[operation_id] = {
            'name': operation_name,
            'start_time': start_time,
            'metadata': metadata or {}
        }

        try:
            yield operation_id
        finally:
            end_time = time.time()
            end_memory = psutil.virtual_memory().used
            end_cpu = psutil.cpu_percent()

            metrics = {
                'operation': operation_name,
                'duration': end_time - start_time,
                'memory_delta': end_memory - start_memory,
                'cpu_usage': (start_cpu + end_cpu) / 2,
                'start_time': start_time,
                'end_time': end_time,
                'metadata': metadata or {}
            }

            self._record_metrics(operation_id, metrics)
            del self.active_operations[operation_id]

    def _record_metrics(self, operation_id: str, metrics: Dict[str, Any]):
        """Record performance metrics"""
        if metrics['operation'] not in self.metrics:
            self.metrics[metrics['operation']] = []

        self.metrics[metrics['operation']].append(metrics)

        # Log performance alerts
        self._check_performance_alerts(metrics)

    def _check_performance_alerts(self, metrics: Dict[str, Any]):
        """Check for performance issues and alert"""
        # Memory usage alert
        if metrics['memory_delta'] > 100 * 1024 * 1024:  # 100MB
            logger.warning(
                f"High memory usage in {metrics['operation']}: "
                f"{metrics['memory_delta'] / 1024 / 1024:.1f}MB"
            )

        # Duration alert
        if metrics['duration'] > 30:  # 30 seconds
            logger.warning(
                f"Slow operation {metrics['operation']}: "
                f"{metrics['duration']:.1f}s"
            )

    def get_performance_report(self, operation_name: str = None) -> Dict[str, Any]:
        """Generate comprehensive performance report"""
        if operation_name:
            data = self.metrics.get(operation_name, [])
        else:
            data = []
            for op_metrics in self.metrics.values():
                data.extend(op_metrics)

        if not data:
            return {"error": "No metrics available"}

        # Calculate statistics
        durations = [m['duration'] for m in data]
        memory_deltas = [m['memory_delta'] for m in data]

        return {
            'total_operations': len(data),
            'avg_duration': sum(durations) / len(durations),
            'max_duration': max(durations),
            'min_duration': min(durations),
            'avg_memory_delta': sum(memory_deltas) / len(memory_deltas),
            'max_memory_delta': max(memory_deltas),
            'operations_by_type': {
                op: len(metrics) for op, metrics in self.metrics.items()
            }
        }


# Usage in evaluation workflow
monitor = PerformanceMonitor()


async def monitored_evaluation():
    async with monitor.monitor_operation("full_evaluation", {"model": "gpt-3.5-turbo"}):

        async with monitor.monitor_operation("data_generation"):
            test_cases = generator.generate_test_cases(use_case, count=100)

        async with monitor.monitor_operation("model_inference"):
            results = await engine.evaluate_model_async("gpt-3.5-turbo", test_cases)

        async with monitor.monitor_operation("scoring"):
            scores = scoring_context.evaluate(results['predictions'], results['references'])

    # Generate performance report
    report = monitor.get_performance_report()
    print(f"Average operation duration: {report['avg_duration']:.2f}s")
```

## 🚀 Deployment & Scaling Strategies

### 🐳 Docker Containerization

```dockerfile
# Production Dockerfile
FROM python:3.11-slim

# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV PYTHONPATH=/app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Create app directory
WORKDIR /app

# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create non-root user
RUN useradd --create-home --shell /bin/bash llmeval
RUN chown -R llmeval:llmeval /app
USER llmeval

# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD python -c "from llm_evaluation_framework import ModelRegistry; registry = ModelRegistry(); print('OK')"

# Default command
CMD ["python", "-m", "llm_evaluation_framework.cli"]
```
```yaml
# Docker Compose for distributed evaluation
version: '3.8'

services:
  llm-evaluator:
    build: .
    environment:
      - REDIS_HOST=redis
      - DATABASE_URL=postgresql://user:pass@postgres:5432/llmeval
    depends_on:
      - redis
      - postgres
    volumes:
      - ./config:/app/config
      - ./results:/app/results
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 2G
        reservations:
          memory: 1G

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

  postgres:
    image: postgres:15-alpine
    environment:
      - POSTGRES_DB=llmeval
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  redis_data:
  postgres_data:
```

### ☸️ Kubernetes Deployment

```yaml
# Kubernetes deployment configuration
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-evaluation-framework
  labels:
    app: llm-evaluator
spec:
  replicas: 5
  selector:
    matchLabels:
      app: llm-evaluator
  template:
    metadata:
      labels:
        app: llm-evaluator
    spec:
      containers:
      - name: llm-evaluator
        image: llm-evaluation-framework:latest
        ports:
        - containerPort: 8000
        env:
        - name: REDIS_HOST
          value: "redis-service"
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: db-credentials
              key: url
        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5

---
apiVersion: v1
kind: Service
metadata:
  name: llm-evaluator-service
spec:
  selector:
    app: llm-evaluator
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: LoadBalancer

---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: llm-evaluator-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: llm-evaluation-framework
  minReplicas: 3
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
```

## 🎓 Master Advanced Patterns

**You've learned the most powerful techniques for production LLM evaluation!**

**Ready to implement these patterns?**

[![🚀 Production Guide](https://img.shields.io/badge/πŸš€_Production_Guide-Deploy%20Now-6366f1?style=for-the-badge)](developer-guide.md)
[![🔧 Custom Components](https://img.shields.io/badge/πŸ”§_Custom_Components-Build%20Extensions-22c55e?style=for-the-badge)](developer-guide.md#custom-components)
[![📊 Monitoring Setup](https://img.shields.io/badge/πŸ“Š_Monitoring_Setup-Track%20Performance-f59e0b?style=for-the-badge)](logging-and-error-handling.md)

---

*Advanced patterns for enterprise-scale LLM evaluation systems! 🚀*