Implementing Admission Control for Security Scanning
Install Trivy-Operator for continuous scanning, then register a validating admission webhook so that workloads are scanned before their pods run:
# Trivy-Operator installation for continuous scanning
apiVersion: v1
kind: Namespace
metadata:
  name: trivy-system
---
# Install Trivy-Operator using Helm
# helm repo add aqua https://aquasecurity.github.io/helm-charts/
# helm install trivy-operator aqua/trivy-operator \
#   --namespace trivy-system \
#   --create-namespace \
#   --set="trivy.ignoreUnfixed=true" \
#   --set="operator.scanJobTimeout=5m"
# ValidatingWebhookConfiguration for image scanning
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
  name: image-scanner-webhook
webhooks:
  - name: scan.images.security
    clientConfig:
      service:
        name: image-scanner
        namespace: security-system
        path: "/validate"
      caBundle: LS0tLS1CRUdJTi...
    rules:
      - operations: ["CREATE", "UPDATE"]
        apiGroups: ["apps", "batch"]
        apiVersions: ["v1"]
        resources: ["deployments", "statefulsets", "daemonsets", "jobs"]
    admissionReviewVersions: ["v1", "v1beta1"]
    sideEffects: None
    failurePolicy: Fail
    # Image scans are slow: 30 seconds is the maximum the API server allows (default 10)
    timeoutSeconds: 30
    namespaceSelector:
      matchLabels:
        security-scanning: enabled
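To make the scope of this configuration concrete, the following is a simplified model of how the `rules` and `namespaceSelector` above decide which requests reach the webhook. It is a sketch for reasoning about coverage, not the API server's actual matching code, and the function name `webhook_applies` is hypothetical.

```python
# Simplified model of request selection for the webhook configuration above.
# Illustration only: the real API server supports wildcards, scopes, and
# objectSelector, none of which are modelled here.

WEBHOOK_RULE = {
    "operations": {"CREATE", "UPDATE"},
    "apiGroups": {"apps", "batch"},
    "apiVersions": {"v1"},
    "resources": {"deployments", "statefulsets", "daemonsets", "jobs"},
}
NAMESPACE_SELECTOR = {"security-scanning": "enabled"}


def webhook_applies(operation, api_group, api_version, resource, namespace_labels):
    """Return True if, under this model, the request is sent to the webhook."""
    rule_matches = (
        operation in WEBHOOK_RULE["operations"]
        and api_group in WEBHOOK_RULE["apiGroups"]
        and api_version in WEBHOOK_RULE["apiVersions"]
        and resource in WEBHOOK_RULE["resources"]
    )
    # Every matchLabels entry must be present on the request's namespace.
    selector_matches = all(
        namespace_labels.get(key) == value
        for key, value in NAMESPACE_SELECTOR.items()
    )
    return rule_matches and selector_matches


labeled = {"security-scanning": "enabled"}
print(webhook_applies("CREATE", "apps", "v1", "deployments", labeled))  # True
print(webhook_applies("CREATE", "apps", "v1", "deployments", {}))       # False
print(webhook_applies("CREATE", "", "v1", "pods", labeled))             # False
```

The last call shows a gap worth noting: a bare Pod belongs to the core API group (`""`) and the `pods` resource, so it does not match these rules even in a labeled namespace. Namespaces without the `security-scanning: enabled` label are skipped entirely.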
Implement a custom admission webhook for policy enforcement:
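# admission-webhook.py
import json
import subprocess

from flask import Flask, request, jsonify

app = Flask(__name__)


def scan_image(image):
    """Return the number of HIGH/CRITICAL vulnerabilities Trivy reports for an image."""
    # Trivy exits with 0 even when it finds vulnerabilities (unless --exit-code
    # is set), so a non-zero exit status means the scan itself failed.
    # The API server waits at most 30 seconds for a webhook (10 by default),
    # so the scan is cut off before that limit.
    result = subprocess.run(
        ['trivy', 'image', '--format', 'json', '--severity', 'CRITICAL,HIGH', image],
        capture_output=True,
        text=True,
        check=True,
        timeout=25,
    )
    scan_data = json.loads(result.stdout)
    return sum(len(r.get('Vulnerabilities') or []) for r in scan_data.get('Results') or [])


@app.route('/validate', methods=['POST'])
def validate():
    admission_review = request.get_json()

    # The webhook rules match workload objects (Deployments, Jobs, ...),
    # so the pod spec is nested under spec.template.spec
    workload_spec = admission_review['request']['object']['spec']
    pod_spec = workload_spec.get('template', {}).get('spec', {})

    response = {
        "apiVersion": "admission.k8s.io/v1",
        "kind": "AdmissionReview",
        "response": {
            "uid": admission_review['request']['uid'],
            "allowed": True,
            "warnings": []
        }
    }

    # Scan each container image, init containers included
    containers = (pod_spec.get('initContainers') or []) + (pod_spec.get('containers') or [])
    for container in containers:
        image = container['image']
        try:
            vuln_count = scan_image(image)
        except (subprocess.SubprocessError, OSError, ValueError):
            # Fail closed: an image that cannot be scanned is rejected
            response['response']['allowed'] = False
            response['response']['status'] = {
                'message': f'Trivy scan of image {image} failed'
            }
            break
        if vuln_count > 0:
            response['response']['allowed'] = False
            response['response']['status'] = {
                'message': f'Image {image} has {vuln_count} HIGH/CRITICAL vulnerabilities'
            }
            break

    return jsonify(response)


if __name__ == '__main__':
    # The API server verifies the serving certificate against the webhook's
    # caBundle, so an ad hoc self-signed certificate would be rejected.
    # The certificate paths below are assumptions; point them at the mounted
    # certificate and key issued for the image-scanner service.
    app.run(host='0.0.0.0', port=443, ssl_context=('/certs/tls.crt', '/certs/tls.key'))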
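The parsing steps in the webhook can be checked without a cluster or a Trivy binary. The following is a minimal sketch under stated assumptions: the helper names (`workload_images`, `count_findings`, `build_review_response`) are hypothetical, the sample Deployment and its `registry.example.com` images are made up, and the sample report is a hand-written fragment that only mimics the `Results` and `Vulnerabilities` keys of Trivy's JSON output. Unlike a scan of `containers` alone, this sketch also collects `initContainers`.

```python
import json


def workload_images(workload):
    """Collect image references from a workload's pod template, init containers first."""
    pod_spec = workload.get("spec", {}).get("template", {}).get("spec", {})
    containers = (pod_spec.get("initContainers") or []) + (pod_spec.get("containers") or [])
    return [container["image"] for container in containers]


def count_findings(trivy_json):
    """Count vulnerabilities in JSON text shaped like `trivy image --format json` output."""
    report = json.loads(trivy_json)
    # A result without findings may omit "Vulnerabilities" or set it to null.
    return sum(len(r.get("Vulnerabilities") or []) for r in report.get("Results") or [])


def build_review_response(uid, allowed, message=None):
    """Build an admission.k8s.io/v1 AdmissionReview response body."""
    response = {"uid": uid, "allowed": allowed}
    if message is not None:
        response["status"] = {"message": message}
    return {"apiVersion": "admission.k8s.io/v1", "kind": "AdmissionReview", "response": response}


# Hypothetical sample data, not taken from a real cluster or a real scan.
SAMPLE_DEPLOYMENT = {
    "spec": {
        "template": {
            "spec": {
                "initContainers": [{"name": "migrate", "image": "registry.example.com/migrate:1.0"}],
                "containers": [{"name": "app", "image": "registry.example.com/app:1.0"}],
            }
        }
    }
}
SAMPLE_REPORT = json.dumps({
    "Results": [
        {"Target": "os-packages", "Vulnerabilities": [{"Severity": "HIGH"}, {"Severity": "CRITICAL"}]},
        {"Target": "app-dependencies"},
    ]
})

images = workload_images(SAMPLE_DEPLOYMENT)
findings = count_findings(SAMPLE_REPORT)
review = build_review_response(
    "example-uid",
    allowed=(findings == 0),
    message=f"Image {images[0]} has {findings} HIGH/CRITICAL vulnerabilities",
)
print(images)                         # ['registry.example.com/migrate:1.0', 'registry.example.com/app:1.0']
print(findings)                       # 2
print(review["response"]["allowed"])  # False
```

Keeping these steps in small functions separates the policy decision from the `subprocess` call, which makes the decision logic straightforward to unit test.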