Runtime Security Scanning
Runtime Security Scanning
Runtime scanning detects vulnerabilities in running containers, complementing build-time scanning. Runtime scanners can identify vulnerabilities introduced through dynamic downloads, configuration changes, or runtime package installations. They also verify that deployed containers match scanned images, detecting unauthorized modifications.
Runtime scanning faces unique challenges including performance impact and container ephemerality. Scanning must not significantly impact application performance. Short-lived containers may terminate before scanning completes. Agent-based and agentless approaches each have trade-offs in coverage and resource usage. Organizations must choose approaches matching their operational requirements.
# Example: Runtime vulnerability scanner integration
import asyncio
import json
import time
from datetime import datetime, timedelta
from kubernetes import client, config
from kubernetes.client.rest import ApiException
class RuntimeSecurityScanner:
def __init__(self, scanner_config):
self.config = scanner_config
self.k8s_client = self._init_k8s_client()
self.scan_cache = {}
self.vulnerability_db = VulnerabilityDatabase(scanner_config['db_path'])
def _init_k8s_client(self):
"""Initialize Kubernetes client"""
try:
config.load_incluster_config()
except:
config.load_kube_config()
return {
'core': client.CoreV1Api(),
'apps': client.AppsV1Api(),
'batch': client.BatchV1Api()
}
async def scan_running_containers(self, namespace='default'):
"""Scan all running containers in namespace"""
pods = self.k8s_client['core'].list_namespaced_pod(namespace)
scan_results = []
for pod in pods.items:
if pod.status.phase != 'Running':
continue
for container in pod.spec.containers:
# Check if recently scanned
cache_key = f"{container.image}:{pod.metadata.uid}"
if self._is_recently_scanned(cache_key):
scan_results.append(self.scan_cache[cache_key])
continue
# Perform runtime scan
result = await self._scan_container(
pod.metadata.name,
pod.metadata.namespace,
container.name,
container.image
)
# Cache results
self.scan_cache[cache_key] = result
scan_results.append(result)
# Check for critical vulnerabilities
if self._has_critical_vulnerabilities(result):
await self._handle_critical_vulnerability(pod, container, result)
return scan_results
async def _scan_container(self, pod_name, namespace, container_name, image):
"""Perform runtime vulnerability scan"""
scan_job = {
'apiVersion': 'batch/v1',
'kind': 'Job',
'metadata': {
'name': f'security-scan-{pod_name}-{int(time.time())}',
'namespace': 'security-scanning',
'labels': {
'app': 'runtime-scanner',
'target-pod': pod_name,
'target-namespace': namespace
}
},
'spec': {
'ttlSecondsAfterFinished': 300,
'template': {
'metadata': {
'labels': {
'app': 'runtime-scanner'
}
},
'spec': {
'restartPolicy': 'Never',
'serviceAccountName': 'scanner-sa',
'containers': [{
'name': 'scanner',
'image': 'aquasec/trivy:latest',
'command': [
'trivy',
'image',
'--format', 'json',
'--output', '/tmp/scan-result.json',
image
],
'volumeMounts': [{
'name': 'docker-sock',
'mountPath': '/var/run/docker.sock'
}, {
'name': 'scan-results',
'mountPath': '/tmp'
}],
'resources': {
'limits': {
'memory': '512Mi',
'cpu': '500m'
},
'requests': {
'memory': '256Mi',
'cpu': '200m'
}
}
}],
'volumes': [{
'name': 'docker-sock',
'hostPath': {
'path': '/var/run/docker.sock',
'type': 'Socket'
}
}, {
'name': 'scan-results',
'emptyDir': {}
}]
}
}
}
}
# Create and wait for scan job
try:
job = self.k8s_client['batch'].create_namespaced_job(
namespace='security-scanning',
body=scan_job
)
# Wait for completion
result = await self._wait_for_job_completion(
job.metadata.name,
'security-scanning'
)
# Parse and enrich results
scan_data = self._parse_scan_results(result)
scan_data['runtime_context'] = {
'pod': pod_name,
'namespace': namespace,
'container': container_name,
'scan_time': datetime.utcnow().isoformat(),
'node': self._get_pod_node(pod_name, namespace)
}
return scan_data
except ApiException as e:
print(f"Error creating scan job: {e}")
return None
async def continuous_runtime_monitoring(self):
"""Continuous monitoring loop"""
while True:
try:
# Scan all namespaces
namespaces = self.k8s_client['core'].list_namespace()
for ns in namespaces.items:
if ns.metadata.name in self.config['excluded_namespaces']:
continue
results = await self.scan_running_containers(
ns.metadata.name
)
# Process results
await self._process_scan_results(results)
# Wait before next scan cycle
await asyncio.sleep(self.config['scan_interval_seconds'])
except Exception as e:
print(f"Error in monitoring loop: {e}")
await asyncio.sleep(60)
async def _handle_critical_vulnerability(self, pod, container, scan_result):
"""Handle critical vulnerabilities based on policy"""
policy = self.config['critical_vulnerability_policy']
if policy == 'alert':
await self._send_security_alert(pod, container, scan_result)
elif policy == 'isolate':
# Add network policy to isolate pod
await self._isolate_pod(pod)
await self._send_security_alert(pod, container, scan_result)
elif policy == 'terminate':
# Delete pod to force redeployment
await self._terminate_pod(pod)
await self._send_security_alert(pod, container, scan_result)
# Log security event
self._log_security_event({
'event_type': 'critical_vulnerability_detected',
'pod': pod.metadata.name,
'namespace': pod.metadata.namespace,
'container': container.name,
'image': container.image,
'vulnerabilities': scan_result['critical_vulnerabilities'],
'action_taken': policy,
'timestamp': datetime.utcnow().isoformat()
})
def _is_recently_scanned(self, cache_key):
"""Check if container was recently scanned"""
if cache_key not in self.scan_cache:
return False
scan_time = self.scan_cache[cache_key].get('runtime_context', {}).get('scan_time')
if not scan_time:
return False
scan_datetime = datetime.fromisoformat(scan_time)
age = datetime.utcnow() - scan_datetime
return age < timedelta(hours=self.config['cache_duration_hours'])