Implementing Admission Control for Security Scanning

Deploy admission controllers to enforce security scanning before pods run:

# Namespace for the Trivy-Operator installation (continuous scanning).
# The Helm command below installs the operator into this same namespace.
apiVersion: v1
kind: Namespace
metadata:
  name: trivy-system
---
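# Install Trivy-Operator using Helm (add the Aqua chart repository first):
# helm repo add aqua https://aquasecurity.github.io/helm-charts/
# helm repo update
# helm install trivy-operator aqua/trivy-operator \
#   --namespace trivy-system \
#   --create-namespace \
#   --set="trivy.ignoreUnfixed=true" \
#   --set="operator.scanJobTimeout=5m"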

# ValidatingWebhookConfiguration for image scanning.
# CREATE/UPDATE requests for the listed workload kinds, in namespaces labelled
# `security-scanning: enabled`, are sent to the image-scanner Service at
# /validate and are rejected if the webhook denies them or cannot be reached.
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
  name: image-scanner-webhook
webhooks:
- name: scan.images.security
  clientConfig:
    service:
      name: image-scanner
      namespace: security-system
      path: "/validate"
    # Base64-encoded PEM bundle of the CA that signed the webhook's serving
    # certificate (truncated placeholder - substitute the real value).
    caBundle: LS0tLS1CRUdJTi...
  rules:
  # Only these workload kinds are intercepted; bare Pods and CronJobs are not
  # matched by this rule and therefore bypass the scan.
  - operations: ["CREATE", "UPDATE"]
    apiGroups: ["apps", "batch"]
    apiVersions: ["v1"]
    resources: ["deployments", "statefulsets", "daemonsets", "jobs"]
  # The webhook answers with admission.k8s.io/v1 only, so advertise only v1.
  admissionReviewVersions: ["v1"]
  sideEffects: None
  # Image scans are slow; use the maximum allowed timeout instead of the 10 s
  # default so a scan in progress is not turned into a rejection.
  timeoutSeconds: 30
  # Fail closed: if the scanner is unavailable or times out, the request is
  # rejected. Do not label the scanner's own namespace for scanning, or it
  # cannot be redeployed while it is down.
  failurePolicy: Fail
  namespaceSelector:
    matchLabels:
      security-scanning: enabled

Implement a custom admission webhook for policy enforcement:

# admission-webhook.py
from flask import Flask, request, jsonify
import base64
import json
import subprocess

app = Flask(__name__)

# Wall-clock budget (seconds) for scanning ONE image. Kubernetes allows at most
# 30 s for a whole admission webhook call, so objects with several uncached
# images can still exceed the overall budget.
_SCAN_TIMEOUT_SECONDS = 25


def _images_in(obj):
    """Return the distinct container images referenced by an admitted object.

    Understands three shapes: objects with a pod template (``spec.template.spec``,
    e.g. Deployment/StatefulSet/DaemonSet/Job), CronJob-style objects
    (``spec.jobTemplate.spec.template.spec``) and objects whose ``spec`` is
    itself a pod spec. Both ``initContainers`` and ``containers`` are included.
    Order of first appearance is preserved; duplicates are dropped so the same
    image is not scanned twice.
    """
    spec = obj.get('spec') or {}
    if 'jobTemplate' in spec:
        spec = (spec.get('jobTemplate') or {}).get('spec') or {}
    if 'template' in spec:
        spec = (spec.get('template') or {}).get('spec') or {}

    images = []
    for field in ('initContainers', 'containers'):
        for container in spec.get(field) or []:
            image = container.get('image')
            if image and image not in images:
                images.append(image)
    return images


def _count_vulnerabilities(image):
    """Scan ``image`` with Trivy and return its HIGH/CRITICAL finding count.

    Raises:
        ValueError: the image reference looks like a CLI option, or Trivy's
            output is not valid JSON.
        RuntimeError: Trivy exited with a non-zero status (scan failed).
        subprocess.TimeoutExpired: the scan exceeded ``_SCAN_TIMEOUT_SECONDS``.
        OSError: the ``trivy`` executable could not be started.
    """
    # The image string comes from the admitted object (untrusted); never let it
    # be interpreted as a Trivy command-line flag.
    if image.startswith('-'):
        raise ValueError(f'invalid image reference {image!r}')

    result = subprocess.run(
        ['trivy', 'image', '--format', 'json', '--severity', 'CRITICAL,HIGH', image],
        capture_output=True,
        text=True,
        timeout=_SCAN_TIMEOUT_SECONDS,
    )

    # Without --exit-code, Trivy exits 0 whenever the scan itself succeeded,
    # whether or not it found anything. Non-zero means the scan failed and
    # stdout is not a usable report.
    if result.returncode != 0:
        raise RuntimeError(f'trivy exited with code {result.returncode}')

    scan_data = json.loads(result.stdout) or {}
    # "Results" / "Vulnerabilities" may be missing or null when nothing is found.
    return sum(
        len(r.get('Vulnerabilities') or [])
        for r in scan_data.get('Results') or []
    )


@app.route('/validate', methods=['POST'])
def validate():
    """Admission webhook endpoint: deny workloads whose images have findings.

    Expects an AdmissionReview in the JSON request body and returns an
    AdmissionReview response carrying the same ``uid``. The object is denied
    at the first image that has HIGH/CRITICAL vulnerabilities or that cannot
    be scanned (fail closed); otherwise it is allowed. A body without
    ``request.uid`` gets an HTTP 400.
    """
    admission_review = request.get_json(silent=True) or {}
    admission_request = admission_review.get('request') or {}
    if 'uid' not in admission_request:
        return jsonify({'error': 'request body is not an AdmissionReview'}), 400

    response = {
        # Echo the version the API server used; it rejects mismatched replies.
        "apiVersion": admission_review.get('apiVersion', "admission.k8s.io/v1"),
        "kind": "AdmissionReview",
        "response": {
            "uid": admission_request['uid'],
            "allowed": True,
            "warnings": []
        }
    }

    # "object" is null for some operations (e.g. DELETE); treat as no images.
    for image in _images_in(admission_request.get('object') or {}):
        try:
            vuln_count = _count_vulnerabilities(image)
        except (OSError, ValueError, RuntimeError, subprocess.SubprocessError) as exc:
            # An image that cannot be scanned must not be admitted unscanned.
            response['response']['allowed'] = False
            response['response']['status'] = {
                'message': f'Image {image} could not be scanned: {exc}'
            }
            break

        if vuln_count > 0:
            response['response']['allowed'] = False
            response['response']['status'] = {
                'message': f'Image {image} has {vuln_count} HIGH/CRITICAL vulnerabilities'
            }
            break

    return jsonify(response)

if __name__ == '__main__':
    import os

    # The API server verifies this server's certificate against the caBundle
    # in the webhook configuration, so serve a CA-signed cert/key pair from
    # disk. Paths can be overridden through the environment.
    cert_file = os.environ.get('TLS_CERT_FILE', '/etc/webhook/certs/tls.crt')
    key_file = os.environ.get('TLS_KEY_FILE', '/etc/webhook/certs/tls.key')

    if os.path.isfile(cert_file) and os.path.isfile(key_file):
        ssl_context = (cert_file, key_file)
    else:
        # Development fallback only: an ad-hoc self-signed certificate is
        # regenerated on every start and will not be trusted by the API server.
        ssl_context = 'adhoc'

    app.run(host='0.0.0.0', port=443, ssl_context=ssl_context)