Runtime Security Scanning

Runtime Security Scanning

Runtime scanning detects vulnerabilities in running containers, complementing build-time scanning. Runtime scanners can identify vulnerabilities introduced through dynamic downloads, configuration changes, or runtime package installations. They also verify that deployed containers match scanned images, detecting unauthorized modifications.

Runtime scanning faces unique challenges including performance impact and container ephemerality. Scanning must not significantly impact application performance. Short-lived containers may terminate before scanning completes. Agent-based and agentless approaches each have trade-offs in coverage and resource usage. Organizations must choose approaches matching their operational requirements.

# Example: Runtime vulnerability scanner integration
import asyncio
import json
import time
from datetime import datetime, timedelta, timezone

from kubernetes import client, config
from kubernetes.client.rest import ApiException

class RuntimeSecurityScanner:
    def __init__(self, scanner_config):
        """Create a scanner bound to the given configuration.

        Args:
            scanner_config: Mapping of scanner settings. ``'db_path'`` is
                read here; other methods of this class read
                ``'excluded_namespaces'``, ``'scan_interval_seconds'``,
                ``'critical_vulnerability_policy'`` and
                ``'cache_duration_hours'``.
        """
        self.config = scanner_config
        # Kubernetes API handles, keyed 'core' / 'apps' / 'batch'.
        self.k8s_client = self._init_k8s_client()
        # Most recent scan result per "<image>:<pod uid>" key.
        self.scan_cache = dict()
        db_path = scanner_config['db_path']
        self.vulnerability_db = VulnerabilityDatabase(db_path)
        
    def _init_k8s_client(self):
        """Load Kubernetes credentials and build the API clients.

        In-cluster (service account) configuration is tried first; if the
        process is not running inside a cluster, the local kubeconfig is
        used instead.

        Returns:
            dict: API clients keyed ``'core'`` (``CoreV1Api``), ``'apps'``
            (``AppsV1Api``) and ``'batch'`` (``BatchV1Api``).

        Raises:
            kubernetes.config.ConfigException: If neither configuration
                source can be loaded.
        """
        try:
            config.load_incluster_config()
        except config.ConfigException:
            # Only a configuration failure should trigger the fallback; a
            # bare ``except`` would also swallow KeyboardInterrupt,
            # SystemExit and genuine programming errors.
            config.load_kube_config()

        return {
            'core': client.CoreV1Api(),
            'apps': client.AppsV1Api(),
            'batch': client.BatchV1Api(),
        }
    
    async def scan_running_containers(self, namespace='default'):
        """Scan all running containers in namespace"""
        pods = self.k8s_client['core'].list_namespaced_pod(namespace)
        scan_results = []
        
        for pod in pods.items:
            if pod.status.phase != 'Running':
                continue
                
            for container in pod.spec.containers:
                # Check if recently scanned
                cache_key = f"{container.image}:{pod.metadata.uid}"
                if self._is_recently_scanned(cache_key):
                    scan_results.append(self.scan_cache[cache_key])
                    continue
                
                # Perform runtime scan
                result = await self._scan_container(
                    pod.metadata.name,
                    pod.metadata.namespace,
                    container.name,
                    container.image
                )
                
                # Cache results
                self.scan_cache[cache_key] = result
                scan_results.append(result)
                
                # Check for critical vulnerabilities
                if self._has_critical_vulnerabilities(result):
                    await self._handle_critical_vulnerability(pod, container, result)
        
        return scan_results
    
    async def _scan_container(self, pod_name, namespace, container_name, image):
        """Perform runtime vulnerability scan"""
        scan_job = {
            'apiVersion': 'batch/v1',
            'kind': 'Job',
            'metadata': {
                'name': f'security-scan-{pod_name}-{int(time.time())}',
                'namespace': 'security-scanning',
                'labels': {
                    'app': 'runtime-scanner',
                    'target-pod': pod_name,
                    'target-namespace': namespace
                }
            },
            'spec': {
                'ttlSecondsAfterFinished': 300,
                'template': {
                    'metadata': {
                        'labels': {
                            'app': 'runtime-scanner'
                        }
                    },
                    'spec': {
                        'restartPolicy': 'Never',
                        'serviceAccountName': 'scanner-sa',
                        'containers': [{
                            'name': 'scanner',
                            'image': 'aquasec/trivy:latest',
                            'command': [
                                'trivy',
                                'image',
                                '--format', 'json',
                                '--output', '/tmp/scan-result.json',
                                image
                            ],
                            'volumeMounts': [{
                                'name': 'docker-sock',
                                'mountPath': '/var/run/docker.sock'
                            }, {
                                'name': 'scan-results',
                                'mountPath': '/tmp'
                            }],
                            'resources': {
                                'limits': {
                                    'memory': '512Mi',
                                    'cpu': '500m'
                                },
                                'requests': {
                                    'memory': '256Mi',
                                    'cpu': '200m'
                                }
                            }
                        }],
                        'volumes': [{
                            'name': 'docker-sock',
                            'hostPath': {
                                'path': '/var/run/docker.sock',
                                'type': 'Socket'
                            }
                        }, {
                            'name': 'scan-results',
                            'emptyDir': {}
                        }]
                    }
                }
            }
        }
        
        # Create and wait for scan job
        try:
            job = self.k8s_client['batch'].create_namespaced_job(
                namespace='security-scanning',
                body=scan_job
            )
            
            # Wait for completion
            result = await self._wait_for_job_completion(
                job.metadata.name,
                'security-scanning'
            )
            
            # Parse and enrich results
            scan_data = self._parse_scan_results(result)
            scan_data['runtime_context'] = {
                'pod': pod_name,
                'namespace': namespace,
                'container': container_name,
                'scan_time': datetime.utcnow().isoformat(),
                'node': self._get_pod_node(pod_name, namespace)
            }
            
            return scan_data
            
        except ApiException as e:
            print(f"Error creating scan job: {e}")
            return None
    
    async def continuous_runtime_monitoring(self):
        """Continuous monitoring loop"""
        while True:
            try:
                # Scan all namespaces
                namespaces = self.k8s_client['core'].list_namespace()
                
                for ns in namespaces.items:
                    if ns.metadata.name in self.config['excluded_namespaces']:
                        continue
                    
                    results = await self.scan_running_containers(
                        ns.metadata.name
                    )
                    
                    # Process results
                    await self._process_scan_results(results)
                
                # Wait before next scan cycle
                await asyncio.sleep(self.config['scan_interval_seconds'])
                
            except Exception as e:
                print(f"Error in monitoring loop: {e}")
                await asyncio.sleep(60)
    
    async def _handle_critical_vulnerability(self, pod, container, scan_result):
        """Handle critical vulnerabilities based on policy"""
        policy = self.config['critical_vulnerability_policy']
        
        if policy == 'alert':
            await self._send_security_alert(pod, container, scan_result)
            
        elif policy == 'isolate':
            # Add network policy to isolate pod
            await self._isolate_pod(pod)
            await self._send_security_alert(pod, container, scan_result)
            
        elif policy == 'terminate':
            # Delete pod to force redeployment
            await self._terminate_pod(pod)
            await self._send_security_alert(pod, container, scan_result)
            
        # Log security event
        self._log_security_event({
            'event_type': 'critical_vulnerability_detected',
            'pod': pod.metadata.name,
            'namespace': pod.metadata.namespace,
            'container': container.name,
            'image': container.image,
            'vulnerabilities': scan_result['critical_vulnerabilities'],
            'action_taken': policy,
            'timestamp': datetime.utcnow().isoformat()
        })
    
    def _is_recently_scanned(self, cache_key):
        """Check if container was recently scanned"""
        if cache_key not in self.scan_cache:
            return False
        
        scan_time = self.scan_cache[cache_key].get('runtime_context', {}).get('scan_time')
        if not scan_time:
            return False
        
        scan_datetime = datetime.fromisoformat(scan_time)
        age = datetime.utcnow() - scan_datetime
        
        return age < timedelta(hours=self.config['cache_duration_hours'])