Vulnerability Scanning Integration
Registry-integrated vulnerability scanning is the last checkpoint before an image is deployed. Scanning at push time catches vulnerable images as they enter the registry, so they can be blocked or quarantined before anything pulls them. Continuous scanning identifies newly disclosed vulnerabilities in images that are already stored. Integration with admission controllers prevents vulnerable images from being deployed. However, scanning adds latency and computational overhead, so it needs careful implementation.
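As a rough illustration of continuous scanning, the sketch below picks stored images that are due for a re-scan: those never scanned, those last scanned before the vulnerability database was updated, and those whose last scan is older than a maximum age. The `StoredImage` record, the `images_due_for_rescan` function, and the one-day default are assumptions made for this example, not part of any particular registry's API.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class StoredImage:
    """Hypothetical record of an image held in the registry."""
    repository: str
    digest: str
    last_scanned: Optional[datetime]  # None means the image was never scanned


def images_due_for_rescan(images, db_updated_at, max_age=timedelta(days=1), now=None):
    """Return the images that should be scanned again, in input order."""
    now = now or datetime.now(timezone.utc)
    due = []
    for image in images:
        if image.last_scanned is None:
            due.append(image)  # never scanned
        elif image.last_scanned < db_updated_at:
            due.append(image)  # scanned against an older vulnerability database
        elif now - image.last_scanned > max_age:
            due.append(image)  # scan result is too old to trust
    return due
```

Running a selection like this on a schedule keeps the scan queue limited to images whose results could actually have changed, which helps contain the overhead mentioned above.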
Scanner selection affects both security effectiveness and operational efficiency. Native registry scanners, such as Harbor's Trivy integration, run inside the registry's own push and pull workflows with no extra tooling. External scanners offer advanced features but require API integration. Running multiple scanners can provide defense in depth, because each draws on a different vulnerability database. Organizations should evaluate scanners on accuracy, performance, and integration capabilities.
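One way to combine several scanners is to take the union of their findings and keep the highest severity any scanner reports for the same package and vulnerability ID. The sketch below assumes each scanner's report has already been normalized into a list of dictionaries with `package`, `id`, and `severity` keys; that shape, and the function names, are assumptions for illustration only.

```python
SEVERITY_ORDER = {"unknown": 0, "low": 1, "medium": 2, "high": 3, "critical": 4}


def merge_findings(*scanner_reports):
    """Union findings keyed by (package, id), keeping the highest severity seen."""
    merged = {}
    for report in scanner_reports:
        for finding in report:
            key = (finding["package"], finding["id"])
            severity = finding["severity"].lower()
            if severity not in SEVERITY_ORDER:
                severity = "unknown"  # unrecognized labels are not silently dropped
            current = merged.get(key)
            if current is None or SEVERITY_ORDER[severity] > SEVERITY_ORDER[current]:
                merged[key] = severity
    return merged


def count_by_severity(merged):
    """Summarize merged findings as a severity -> count mapping."""
    counts = {name: 0 for name in SEVERITY_ORDER}
    for severity in merged.values():
        counts[severity] += 1
    return counts
```

Taking the maximum severity is the conservative choice; a team that sees too many false positives might instead require agreement between scanners before blocking.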
# Example: Registry webhook for vulnerability scanning
# Payload fields and the X-Registry-Signature header follow this example's own
# conventions; adapt them to the events your registry actually sends.
import hashlib
import hmac
from datetime import datetime, timezone

import requests
from flask import Flask, jsonify, request


class RegistrySecurityWebhook:
    def __init__(self, config):
        self.config = config
        self.scanner_api = config['scanner_api_endpoint']
        self.webhook_secret = config['webhook_secret'].encode()
        self.severity_threshold = config['severity_threshold']

    def verify_webhook_signature(self, payload, signature):
        """Verify webhook authenticity"""
        if not signature:
            # Reject a missing header; compare_digest would raise on None
            return False
        expected_signature = hmac.new(
            self.webhook_secret,
            payload,
            hashlib.sha256
        ).hexdigest()
        # Compare as bytes so a non-ASCII header value cannot raise TypeError
        return hmac.compare_digest(signature.encode(), expected_signature.encode())

    def handle_registry_event(self):
        # Verify webhook signature against the raw request body
        signature = request.headers.get('X-Registry-Signature')
        if not self.verify_webhook_signature(request.get_data(), signature):
            return jsonify({'error': 'Invalid signature'}), 403
        event = request.get_json(silent=True) or {}
        event_type = event.get('type')
        if event_type == 'PUSH_ARTIFACT':
            return self.handle_push_event(event)
        elif event_type == 'SCANNING_COMPLETED':
            return self.handle_scan_complete(event)
        return jsonify({'status': 'ignored'}), 200

    def handle_push_event(self, event):
        """Trigger scanning for pushed images"""
        artifact = event['data']['artifact']
        repository = event['data']['repository']
        # Initiate vulnerability scan
        scan_request = {
            'registry': repository['registry'],
            'repository': repository['name'],
            'tag': artifact['tag'],
            'digest': artifact['digest'],
            'scan_type': 'full',
            'requested_by': event['operator'],
            'requested_at': datetime.now(timezone.utc).isoformat()
        }
        # Call scanner API; the timeout keeps a slow scanner from hanging the webhook
        response = requests.post(
            f"{self.scanner_api}/scan",
            json=scan_request,
            headers={'Authorization': f"Bearer {self.config['scanner_token']}"},
            timeout=10
        )
        if response.status_code == 202:
            return jsonify({
                'status': 'scan_initiated',
                'scan_id': response.json()['scan_id']
            }), 202
        else:
            return jsonify({
                'error': 'Failed to initiate scan',
                'details': response.text
            }), 500

    def handle_scan_complete(self, event):
        """Process scan results and enforce policies"""
        scan_result = event['data']['scan_result']
        # Check vulnerability severity
        critical_vulns = scan_result.get('critical', 0)
        high_vulns = scan_result.get('high', 0)
        if critical_vulns > 0 or high_vulns > self.severity_threshold['high']:
            # Mark image as non-deployable, labeling it with the same counts
            # that triggered the decision
            self.quarantine_image(
                scan_result['repository'],
                scan_result['tag'],
                {'critical': critical_vulns, 'high': high_vulns}
            )
            # Notify security team
            self.notify_security_team(scan_result)
            return jsonify({
                'status': 'quarantined',
                'reason': 'Exceeded vulnerability threshold',
                'critical': critical_vulns,
                'high': high_vulns
            }), 200
        # Add deployment approval
        self.approve_for_deployment(
            scan_result['repository'],
            scan_result['tag']
        )
        return jsonify({'status': 'approved'}), 200

    def quarantine_image(self, repository, tag, vulnerabilities):
        """Prevent image deployment"""
        # Add security labels
        labels = {
            'security.scan.status': 'failed',
            'security.scan.date': datetime.now(timezone.utc).isoformat(),
            'security.vulnerabilities.critical': str(vulnerabilities.get('critical', 0)),
            'security.vulnerabilities.high': str(vulnerabilities.get('high', 0)),
            'deployment.allowed': 'false'
        }
        # Update image metadata
        self.update_image_labels(repository, tag, labels)
        # Configure admission controller rules
        self.update_admission_policy(repository, tag, 'deny')

    # The helpers below depend on the registry and cluster in use; they are
    # placeholders to implement per deployment.
    def update_image_labels(self, repository, tag, labels):
        raise NotImplementedError

    def update_admission_policy(self, repository, tag, action):
        raise NotImplementedError

    def approve_for_deployment(self, repository, tag):
        raise NotImplementedError

    def notify_security_team(self, scan_result):
        raise NotImplementedError


def create_app(config):
    """Build the Flask app and bind the webhook route to a handler instance"""
    app = Flask(__name__)
    webhook = RegistrySecurityWebhook(config)
    # Register the bound method; decorating an instance method with
    # @app.route would leave Flask calling it without self
    app.add_url_rule(
        '/webhook/registry',
        view_func=webhook.handle_registry_event,
        methods=['POST']
    )
    return app
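To exercise the signature check in the webhook above, the sender has to compute the same hex-encoded HMAC-SHA256 over the exact bytes of the request body. The sketch below shows that computation alongside a matching verifier. The secret value and payload are placeholders, and the `X-Registry-Signature` header name is this example's own convention; real registries may authenticate webhooks differently.

```python
import hashlib
import hmac
import json


def sign_payload(secret: bytes, body: bytes) -> str:
    """Hex-encoded HMAC-SHA256 of the raw request body."""
    return hmac.new(secret, body, hashlib.sha256).hexdigest()


def verify_payload(secret: bytes, body: bytes, signature) -> bool:
    """Constant-time check that the signature matches the body."""
    if not signature:
        return False
    expected = sign_payload(secret, body)
    return hmac.compare_digest(signature.encode(), expected.encode())


# Placeholder secret and event used only for this demonstration
secret = b"example-shared-secret"
body = json.dumps({"type": "PUSH_ARTIFACT"}).encode()
headers = {
    "Content-Type": "application/json",
    "X-Registry-Signature": sign_payload(secret, body),
}
```

The signature covers the serialized bytes, not the parsed JSON, so the receiver must verify the raw body before decoding it; re-serializing the payload on either side would change the bytes and break the match.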
Scan result management requires balancing security with operational needs. Blocking all vulnerable images may prevent critical deployments. Risk-based policies can allow deployment with compensating controls. Temporary exemptions enable emergency deployments with time-limited approvals. Clear communication of scan results helps developers understand and remediate issues.
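A risk-based policy with time-limited exemptions could be sketched as follows. The decision first compares severity counts against per-severity limits, then falls back to any unexpired exemption recorded for that exact image digest. The `Exemption` record, the `deployment_decision` function, and the decision labels are hypothetical names chosen for this example.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class Exemption:
    """Hypothetical record of a time-limited approval for one image digest."""
    digest: str
    approved_by: str
    expires_at: datetime


def deployment_decision(digest, counts, thresholds, exemptions, now=None):
    """Return (decision, reason) for an image given its severity counts."""
    now = now or datetime.now(timezone.utc)
    # A severity is exceeded when its count is above the configured limit
    exceeded = sorted(
        severity for severity, limit in thresholds.items()
        if counts.get(severity, 0) > limit
    )
    if not exceeded:
        return "allow", "within thresholds"
    for exemption in exemptions:
        if exemption.digest == digest and now < exemption.expires_at:
            reason = (
                f"approved by {exemption.approved_by} "
                f"until {exemption.expires_at.isoformat()}"
            )
            return "allow_with_exemption", reason
    return "deny", "exceeded thresholds: " + ", ".join(exceeded)
```

Keying exemptions to the digest rather than the tag means an approval cannot silently carry over to a different image pushed under the same tag, and returning the reason alongside the decision gives developers something concrete to act on.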