Runtime Threat Detection
Runtime threat detection identifies malicious activities during container execution. Unlike static scanning that finds vulnerabilities, runtime detection catches actual exploitation attempts. Behavioral analysis establishes normal container behavior baselines then alerts on deviations. Rule-based detection identifies known attack patterns. Machine learning augments both approaches by identifying novel threats.
Behavioral baselines must accommodate legitimate container variability. Containers scale dynamically, changing network patterns. Deployments introduce new containers with different behaviors. Legitimate administrative actions might appear suspicious. Baseline learning periods must be sufficient to capture normal variations while not so long that they miss early attacks.
# Example: Container runtime anomaly detection system
import asyncio
import numpy as np
from sklearn.ensemble import IsolationForest
from collections import defaultdict
import time
import json
from dataclasses import dataclass
from typing import Dict, List, Set, Optional
import aioredis
@dataclass
class ContainerBehavior:
    """Aggregated runtime-behavior observations for a single container."""
    container_id: str                  # container runtime identifier
    syscalls: Dict[str, int]           # syscall name -> invocation count
    network_connections: Set[str]      # "dst_ip:dst_port" destination strings
    file_operations: Dict[str, int]    # "operation:filename" -> count
    process_spawns: List[str]          # names (comm) of spawned processes
    cpu_usage: List[float]             # sampled CPU usage values — units not shown here; presumably percent
    memory_usage: List[float]          # sampled memory usage values — presumably percent
    timestamp: float                   # epoch time of this snapshot
class ContainerAnomalyDetector:
    """Scores container runtime events against learned behavior models.

    Per-container behavioral profiles are kept in Redis (1 h TTL); anomaly
    scoring uses one IsolationForest per container *type*, trained from
    baseline observations collected while no model exists yet.

    NOTE(review): several helpers referenced below (get_container_metadata,
    get_model_key, collect_baseline, syscall_to_id, is_external_ip,
    is_sensitive_file, get_container_resources, kill_container,
    isolate_container, enable_detailed_monitoring, notify_security_team,
    load_models, save_model, group_containers_by_type, get_baseline,
    baseline_to_features) are not defined in this excerpt — presumably
    provided elsewhere; confirm before deploying.
    """

    def __init__(self, redis_url: str, sensitivity: float = 0.1,
                 config: Optional[Dict] = None):
        """
        Args:
            redis_url: URL of the Redis instance backing profiles/alerts.
            sensitivity: Contamination fraction passed to IsolationForest
                (expected proportion of anomalous training samples).
            config: Optional settings dict; ``{'auto_response': True}``
                enables automated_response() for very high scores.
        """
        self.redis_url = redis_url
        self.redis = None  # created lazily in initialize()
        self.sensitivity = sensitivity
        self.models = {}  # model_key -> fitted IsolationForest
        self.baselines = defaultdict(lambda: defaultdict(list))
        self.alert_threshold = 0.7  # minimum normalized score that alerts
        # Fix: handle_anomaly() reads self.config, which the original
        # __init__ never assigned (AttributeError on every alert).
        # Accept it as a backward-compatible keyword argument.
        self.config = config if config is not None else {}

    async def initialize(self):
        """Initialize the Redis connection and load persisted models."""
        self.redis = await aioredis.create_redis_pool(self.redis_url)
        await self.load_models()

    async def process_event(self, event: Dict):
        """Process one runtime event: update the container's profile,
        score it, and raise an alert if the score crosses the threshold."""
        container_id = event.get('container_id')
        if not container_id:
            # Events without a container id cannot be attributed; drop them.
            return
        # Update behavioral profile
        await self.update_profile(container_id, event)
        # Check for anomalies
        anomaly_score = await self.detect_anomaly(container_id, event)
        if anomaly_score > self.alert_threshold:
            await self.handle_anomaly(container_id, event, anomaly_score)

    async def update_profile(self, container_id: str, event: Dict):
        """Fold one event into the container's Redis-stored profile."""
        profile_key = f"profile:{container_id}"
        # Get current profile, or start a fresh one on first sight.
        profile_data = await self.redis.get(profile_key)
        if profile_data:
            profile = json.loads(profile_data)
        else:
            profile = {
                'syscalls': {},
                'network': [],
                'files': {},
                'processes': [],
                'start_time': time.time()
            }
        # Update the relevant section based on event type.
        event_type = event.get('type')
        if event_type == 'syscall':
            syscall = event.get('syscall')
            profile['syscalls'][syscall] = profile['syscalls'].get(syscall, 0) + 1
        elif event_type == 'network':
            connection = f"{event.get('dst_ip')}:{event.get('dst_port')}"
            if connection not in profile['network']:
                profile['network'].append(connection)
        elif event_type == 'file':
            operation = event.get('operation')
            filename = event.get('filename')
            # Fix: the key previously embedded the literal "(unknown)"
            # instead of the filename (which was assigned but unused),
            # collapsing every file into one counter per operation.
            key = f"{operation}:{filename}"
            profile['files'][key] = profile['files'].get(key, 0) + 1
        elif event_type == 'process':
            process = event.get('comm')
            if process not in profile['processes']:
                profile['processes'].append(process)
        # Store updated profile with a 1 hour TTL.
        await self.redis.setex(
            profile_key,
            3600,  # 1 hour TTL
            json.dumps(profile)
        )

    async def detect_anomaly(self, container_id: str, event: Dict) -> float:
        """Return a normalized anomaly score in (0, 1) for the event,
        where higher means more anomalous. Returns 0.0 when metadata is
        missing or no model exists yet (baseline still being collected)."""
        # Get container metadata
        metadata = await self.get_container_metadata(container_id)
        if not metadata:
            return 0.0
        # Select appropriate model based on container type
        model_key = self.get_model_key(metadata)
        model = self.models.get(model_key)
        if not model:
            # No model yet — keep accumulating baseline observations.
            await self.collect_baseline(container_id, event, metadata)
            return 0.0
        # Extract features from event
        features = await self.extract_features(event, metadata)
        # IsolationForest.decision_function is POSITIVE for normal samples
        # and NEGATIVE for anomalies. Fix: negate before the sigmoid so
        # anomalies map above 0.5 — the original sigmoid(score) mapped
        # anomalies below 0.5, so they could never reach the 0.7 alert
        # threshold and the detector never fired.
        raw_score = model.decision_function([features])[0]
        normalized_score = 1 / (1 + np.exp(raw_score))
        return normalized_score

    async def extract_features(self, event: Dict, metadata: Dict) -> List[float]:
        """Build the fixed-length numeric feature vector for one event.

        Layout: [hour, weekday, 4x one-hot event type, syscall id,
        dst port, is_external, file op code, is_sensitive, cpu %, mem %,
        net rx MB, net tx MB]. Fields that don't apply to the event type
        are zero-filled so the vector length is constant.
        """
        features = []
        # Time-based features (local time — NOTE(review): confirm the
        # training data used the same timezone).
        hour = time.localtime().tm_hour
        day_of_week = time.localtime().tm_wday
        features.extend([hour, day_of_week])
        # One-hot encoding of the event type.
        event_types = ['syscall', 'network', 'file', 'process']
        event_type = event.get('type')
        type_encoding = [1 if t == event_type else 0 for t in event_types]
        features.extend(type_encoding)
        # Syscall features
        if event_type == 'syscall':
            syscall_id = self.syscall_to_id(event.get('syscall'))
            features.append(syscall_id)
        else:
            features.append(0)
        # Network features
        if event_type == 'network':
            port = event.get('dst_port', 0)
            is_external = 1 if self.is_external_ip(event.get('dst_ip')) else 0
            features.extend([port, is_external])
        else:
            features.extend([0, 0])
        # File operation features
        if event_type == 'file':
            operation_encoding = {
                'open': 1, 'read': 2, 'write': 3,
                'delete': 4, 'rename': 5, 'chmod': 6
            }
            op_code = operation_encoding.get(event.get('operation'), 0)
            is_sensitive = 1 if self.is_sensitive_file(event.get('filename')) else 0
            features.extend([op_code, is_sensitive])
        else:
            features.extend([0, 0])
        # Container resource usage (assumes metadata carries
        # 'container_id' — TODO confirm against get_container_metadata).
        resources = await self.get_container_resources(metadata['container_id'])
        features.extend([
            resources.get('cpu_percent', 0),
            resources.get('memory_percent', 0),
            resources.get('network_rx_bytes', 0) / 1e6,  # MB
            resources.get('network_tx_bytes', 0) / 1e6   # MB
        ])
        return features

    async def handle_anomaly(self, container_id: str, event: Dict, score: float):
        """Record an alert, publish it, and optionally auto-respond."""
        alert = {
            'timestamp': time.time(),
            'container_id': container_id,
            'event': event,
            'anomaly_score': score,
            'severity': self.calculate_severity(score),
            'description': self.describe_anomaly(event, score)
        }
        # Store alert for later retrieval.
        await self.redis.lpush('security:alerts', json.dumps(alert))
        # Publish to the live alert channel.
        await self.redis.publish('security:alerts:channel', json.dumps(alert))
        # Take automated action only when explicitly enabled in config.
        if score > 0.9 and self.config.get('auto_response'):
            await self.automated_response(container_id, alert)

    async def automated_response(self, container_id: str, alert: Dict):
        """Dispatch an automated response scaled to alert severity."""
        severity = alert['severity']
        if severity == 'critical':
            # Kill container immediately
            await self.kill_container(container_id)
            await self.notify_security_team(alert, action='container_killed')
        elif severity == 'high':
            # Isolate container from the network
            await self.isolate_container(container_id)
            await self.notify_security_team(alert, action='container_isolated')
        elif severity == 'medium':
            # Increase monitoring granularity
            await self.enable_detailed_monitoring(container_id)
            await self.notify_security_team(alert, action='monitoring_increased')

    def calculate_severity(self, score: float) -> str:
        """Map a normalized anomaly score to a severity label.

        Thresholds are exclusive: >0.9 critical, >0.8 high, >0.7 medium,
        otherwise low.
        """
        if score > 0.9:
            return 'critical'
        elif score > 0.8:
            return 'high'
        elif score > 0.7:
            return 'medium'
        else:
            return 'low'

    def describe_anomaly(self, event: Dict, score: float) -> str:
        """Generate a human-readable one-line anomaly description."""
        event_type = event.get('type')
        descriptions = {
            'syscall': f"Unusual system call '{event.get('syscall')}' detected",
            'network': f"Suspicious network connection to {event.get('dst_ip')}:{event.get('dst_port')}",
            'file': f"Abnormal file operation '{event.get('operation')}' on {event.get('filename')}",
            'process': f"Unexpected process execution '{event.get('comm')}'"
        }
        base_description = descriptions.get(event_type, "Unknown anomaly type")
        return f"{base_description} (confidence: {score:.2%})"

    async def train_models(self):
        """Train one IsolationForest per container type from baselines.

        Groups with fewer than 100 feature vectors are skipped — too few
        samples to fit a meaningful model.
        """
        # Group containers by type so each type gets its own model.
        container_groups = await self.group_containers_by_type()
        for group_key, container_ids in container_groups.items():
            # Collect training data from every container in the group.
            training_data = []
            for container_id in container_ids:
                baseline = await self.get_baseline(container_id)
                if baseline:
                    features = await self.baseline_to_features(baseline)
                    training_data.extend(features)
            if len(training_data) < 100:  # minimum samples for training
                continue
            # Train Isolation Forest model (fixed seed for reproducibility).
            model = IsolationForest(
                contamination=self.sensitivity,
                random_state=42,
                n_estimators=100
            )
            model.fit(training_data)
            self.models[group_key] = model
            # Persist the model for reuse after restarts.
            await self.save_model(group_key, model)