Vulnerability Scanning Integration

Vulnerability Scanning Integration

Registry-integrated vulnerability scanning provides the last line of defense before image deployment. Scanning at push time prevents vulnerable images from entering the registry. Continuous scanning identifies newly discovered vulnerabilities in stored images. Integration with admission controllers prevents deployment of vulnerable images. However, scanning adds latency and computational overhead requiring careful implementation.

Scanner selection impacts both security effectiveness and operational efficiency. Native registry scanners like Harbor's Trivy integration provide seamless workflows. External scanners offer advanced features but require API integration. Multiple scanners can provide defense in depth through different vulnerability databases. Organizations should evaluate scanners based on accuracy, performance, and integration capabilities.

# Example: Registry webhook for vulnerability scanning
import json
import hashlib
import hmac
from flask import Flask, request, jsonify
import requests
from datetime import datetime

app = Flask(__name__)

class RegistrySecurityWebhook:
    def __init__(self, config):
        self.config = config
        self.scanner_api = config['scanner_api_endpoint']
        self.webhook_secret = config['webhook_secret'].encode()
        self.severity_threshold = config['severity_threshold']
        
    def verify_webhook_signature(self, payload, signature):
        """Verify webhook authenticity"""
        expected_signature = hmac.new(
            self.webhook_secret,
            payload,
            hashlib.sha256
        ).hexdigest()
        return hmac.compare_digest(signature, expected_signature)
    
    @app.route('/webhook/registry', methods=['POST'])
    def handle_registry_event(self):
        # Verify webhook signature
        signature = request.headers.get('X-Registry-Signature')
        if not self.verify_webhook_signature(request.data, signature):
            return jsonify({'error': 'Invalid signature'}), 403
        
        event = request.json
        
        if event['type'] == 'PUSH_ARTIFACT':
            return self.handle_push_event(event)
        elif event['type'] == 'SCANNING_COMPLETED':
            return self.handle_scan_complete(event)
        
        return jsonify({'status': 'ignored'}), 200
    
    def handle_push_event(self, event):
        """Trigger scanning for pushed images"""
        artifact = event['data']['artifact']
        repository = event['data']['repository']
        
        # Initiate vulnerability scan
        scan_request = {
            'registry': repository['registry'],
            'repository': repository['name'],
            'tag': artifact['tag'],
            'digest': artifact['digest'],
            'scan_type': 'full',
            'requested_by': event['operator'],
            'requested_at': datetime.utcnow().isoformat()
        }
        
        # Call scanner API
        response = requests.post(
            f"{self.scanner_api}/scan",
            json=scan_request,
            headers={'Authorization': f"Bearer {self.config['scanner_token']}"}
        )
        
        if response.status_code == 202:
            return jsonify({
                'status': 'scan_initiated',
                'scan_id': response.json()['scan_id']
            }), 202
        else:
            return jsonify({
                'error': 'Failed to initiate scan',
                'details': response.text
            }), 500
    
    def handle_scan_complete(self, event):
        """Process scan results and enforce policies"""
        scan_result = event['data']['scan_result']
        
        # Check vulnerability severity
        critical_vulns = scan_result.get('critical', 0)
        high_vulns = scan_result.get('high', 0)
        
        if critical_vulns > 0 or high_vulns > self.severity_threshold['high']:
            # Mark image as non-deployable
            self.quarantine_image(
                scan_result['repository'],
                scan_result['tag'],
                scan_result['vulnerabilities']
            )
            
            # Notify security team
            self.notify_security_team(scan_result)
            
            return jsonify({
                'status': 'quarantined',
                'reason': 'Exceeded vulnerability threshold',
                'critical': critical_vulns,
                'high': high_vulns
            }), 200
        
        # Add deployment approval
        self.approve_for_deployment(
            scan_result['repository'],
            scan_result['tag']
        )
        
        return jsonify({'status': 'approved'}), 200
    
    def quarantine_image(self, repository, tag, vulnerabilities):
        """Prevent image deployment"""
        # Add security labels
        labels = {
            'security.scan.status': 'failed',
            'security.scan.date': datetime.utcnow().isoformat(),
            'security.vulnerabilities.critical': str(vulnerabilities.get('critical', 0)),
            'security.vulnerabilities.high': str(vulnerabilities.get('high', 0)),
            'deployment.allowed': 'false'
        }
        
        # Update image metadata
        self.update_image_labels(repository, tag, labels)
        
        # Configure admission controller rules
        self.update_admission_policy(repository, tag, 'deny')

Scan result management requires balancing security with operational needs. Blocking all vulnerable images may prevent critical deployments. Risk-based policies can allow deployment with compensating controls. Temporary exemptions enable emergency deployments with time-limited approvals. Clear communication of scan results helps developers understand and remediate issues.