Runtime Threat Detection

Runtime Threat Detection

Runtime threat detection identifies malicious activity during container execution. Unlike static scanning, which finds vulnerabilities, runtime detection catches actual exploitation attempts. Behavioral analysis establishes baselines of normal container behavior and then alerts on deviations. Rule-based detection identifies known attack patterns. Machine learning augments both approaches by identifying novel threats.

Behavioral baselines must accommodate legitimate container variability. Containers scale dynamically, changing network patterns. Deployments introduce new containers with different behaviors. Legitimate administrative actions might appear suspicious. Baseline learning periods must be long enough to capture normal variation, but not so long that attacks occurring early in a container's life go undetected.

# Example: Container runtime anomaly detection system
import asyncio
import numpy as np
from sklearn.ensemble import IsolationForest
from collections import defaultdict
import time
import json
from dataclasses import dataclass
from typing import Dict, List, Set, Optional
import aioredis

@dataclass
class ContainerBehavior:
    """Snapshot of one container's observed runtime behavior.

    NOTE(review): this record is not referenced by the detector code visible
    in this file section; the per-field notes below are inferred from the
    field names and types and should be confirmed against the producer.
    """
    container_id: str
    syscalls: Dict[str, int]        # presumably syscall name -> count; confirm
    network_connections: Set[str]   # presumably "ip:port" strings; confirm
    file_operations: Dict[str, int]  # presumably "operation:filename" -> count; confirm
    process_spawns: List[str]       # presumably spawned process names; confirm
    cpu_usage: List[float]          # units/sampling interval not shown here
    memory_usage: List[float]       # units/sampling interval not shown here
    timestamp: float                # presumably epoch seconds; confirm

class ContainerAnomalyDetector:
    def __init__(self, redis_url: str, sensitivity: float = 0.1):
        self.redis_url = redis_url
        self.redis = None
        self.sensitivity = sensitivity
        self.models = {}
        self.baselines = defaultdict(lambda: defaultdict(list))
        self.alert_threshold = 0.7
        
    async def initialize(self):
        """Open the Redis connection pool, then load stored models.

        Must be awaited before any method that touches ``self.redis``.
        """
        # NOTE(review): create_redis_pool is the aioredis 1.x entry point;
        # confirm the pinned aioredis version still provides it.
        pool = await aioredis.create_redis_pool(self.redis_url)
        self.redis = pool
        await self.load_models()
        
    async def process_event(self, event: Dict):
        """Process container runtime event"""
        container_id = event.get('container_id')
        event_type = event.get('type')
        
        if not container_id:
            return
            
        # Update behavioral profile
        await self.update_profile(container_id, event)
        
        # Check for anomalies
        anomaly_score = await self.detect_anomaly(container_id, event)
        
        if anomaly_score > self.alert_threshold:
            await self.handle_anomaly(container_id, event, anomaly_score)
    
    async def update_profile(self, container_id: str, event: Dict):
        """Update container behavioral profile"""
        profile_key = f"profile:{container_id}"
        
        # Get current profile
        profile_data = await self.redis.get(profile_key)
        if profile_data:
            profile = json.loads(profile_data)
        else:
            profile = {
                'syscalls': {},
                'network': [],
                'files': {},
                'processes': [],
                'start_time': time.time()
            }
        
        # Update based on event type
        event_type = event.get('type')
        
        if event_type == 'syscall':
            syscall = event.get('syscall')
            profile['syscalls'][syscall] = profile['syscalls'].get(syscall, 0) + 1
            
        elif event_type == 'network':
            connection = f"{event.get('dst_ip')}:{event.get('dst_port')}"
            if connection not in profile['network']:
                profile['network'].append(connection)
                
        elif event_type == 'file':
            operation = event.get('operation')
            filename = event.get('filename')
            key = f"{operation}:{filename}"
            profile['files'][key] = profile['files'].get(key, 0) + 1
            
        elif event_type == 'process':
            process = event.get('comm')
            if process not in profile['processes']:
                profile['processes'].append(process)
        
        # Store updated profile
        await self.redis.setex(
            profile_key,
            3600,  # 1 hour TTL
            json.dumps(profile)
        )
    
    async def detect_anomaly(self, container_id: str, event: Dict) -> float:
        """Detect anomalous behavior"""
        # Get container metadata
        metadata = await self.get_container_metadata(container_id)
        if not metadata:
            return 0.0
            
        # Select appropriate model based on container type
        model_key = self.get_model_key(metadata)
        model = self.models.get(model_key)
        
        if not model:
            # No model yet, collect baseline
            await self.collect_baseline(container_id, event, metadata)
            return 0.0
        
        # Extract features from event
        features = await self.extract_features(event, metadata)
        
        # Predict anomaly score
        anomaly_score = model.decision_function([features])[0]
        
        # Normalize score to 0-1 range
        normalized_score = 1 / (1 + np.exp(-anomaly_score))
        
        return normalized_score
    
    async def extract_features(self, event: Dict, metadata: Dict) -> List[float]:
        """Extract numerical features from event"""
        features = []
        
        # Time-based features
        hour = time.localtime().tm_hour
        day_of_week = time.localtime().tm_wday
        features.extend([hour, day_of_week])
        
        # Event type encoding
        event_types = ['syscall', 'network', 'file', 'process']
        event_type = event.get('type')
        type_encoding = [1 if t == event_type else 0 for t in event_types]
        features.extend(type_encoding)
        
        # Syscall features
        if event_type == 'syscall':
            syscall_id = self.syscall_to_id(event.get('syscall'))
            features.append(syscall_id)
        else:
            features.append(0)
        
        # Network features
        if event_type == 'network':
            port = event.get('dst_port', 0)
            is_external = 1 if self.is_external_ip(event.get('dst_ip')) else 0
            features.extend([port, is_external])
        else:
            features.extend([0, 0])
        
        # File operation features
        if event_type == 'file':
            operation_encoding = {
                'open': 1, 'read': 2, 'write': 3,
                'delete': 4, 'rename': 5, 'chmod': 6
            }
            op_code = operation_encoding.get(event.get('operation'), 0)
            is_sensitive = 1 if self.is_sensitive_file(event.get('filename')) else 0
            features.extend([op_code, is_sensitive])
        else:
            features.extend([0, 0])
        
        # Container resource usage (if available)
        resources = await self.get_container_resources(metadata['container_id'])
        features.extend([
            resources.get('cpu_percent', 0),
            resources.get('memory_percent', 0),
            resources.get('network_rx_bytes', 0) / 1e6,  # MB
            resources.get('network_tx_bytes', 0) / 1e6   # MB
        ])
        
        return features
    
    async def handle_anomaly(self, container_id: str, event: Dict, score: float):
        """Handle detected anomaly"""
        alert = {
            'timestamp': time.time(),
            'container_id': container_id,
            'event': event,
            'anomaly_score': score,
            'severity': self.calculate_severity(score),
            'description': self.describe_anomaly(event, score)
        }
        
        # Store alert
        await self.redis.lpush('security:alerts', json.dumps(alert))
        
        # Publish to alert channel
        await self.redis.publish('security:alerts:channel', json.dumps(alert))
        
        # Take automated action if configured
        if score > 0.9 and self.config.get('auto_response'):
            await self.automated_response(container_id, alert)
    
    async def automated_response(self, container_id: str, alert: Dict):
        """Automated response to high-severity anomalies"""
        severity = alert['severity']
        
        if severity == 'critical':
            # Kill container immediately
            await self.kill_container(container_id)
            await self.notify_security_team(alert, action='container_killed')
            
        elif severity == 'high':
            # Isolate container
            await self.isolate_container(container_id)
            await self.notify_security_team(alert, action='container_isolated')
            
        elif severity == 'medium':
            # Increase monitoring
            await self.enable_detailed_monitoring(container_id)
            await self.notify_security_team(alert, action='monitoring_increased')
    
    def calculate_severity(self, score: float) -> str:
        """Calculate alert severity based on anomaly score"""
        if score > 0.9:
            return 'critical'
        elif score > 0.8:
            return 'high'
        elif score > 0.7:
            return 'medium'
        else:
            return 'low'
    
    def describe_anomaly(self, event: Dict, score: float) -> str:
        """Generate human-readable anomaly description"""
        event_type = event.get('type')
        
        descriptions = {
            'syscall': f"Unusual system call '{event.get('syscall')}' detected",
            'network': f"Suspicious network connection to {event.get('dst_ip')}:{event.get('dst_port')}",
            'file': f"Abnormal file operation '{event.get('operation')}' on {event.get('filename')}",
            'process': f"Unexpected process execution '{event.get('comm')}'"
        }
        
        base_description = descriptions.get(event_type, "Unknown anomaly type")
        return f"{base_description} (confidence: {score:.2%})"
    
    async def train_models(self):
        """Train anomaly detection models from baseline data"""
        # Group containers by type
        container_groups = await self.group_containers_by_type()
        
        for group_key, container_ids in container_groups.items():
            # Collect training data
            training_data = []
            
            for container_id in container_ids:
                baseline = await self.get_baseline(container_id)
                if baseline:
                    features = await self.baseline_to_features(baseline)
                    training_data.extend(features)
            
            if len(training_data) < 100:  # Minimum samples for training
                continue
            
            # Train Isolation Forest model
            model = IsolationForest(
                contamination=self.sensitivity,
                random_state=42,
                n_estimators=100
            )
            
            model.fit(training_data)
            self.models[group_key] = model
            
            # Save model
            await self.save_model(group_key, model)