Google Container Registry (GCR) Automation

Google Container Registry (GCR) Automation

Implement GCR vulnerability scanning with the Container Analysis API:

#!/usr/bin/env python3
# gcr-scanning-automation.py

from google.cloud import containeranalysis_v1
from google.cloud import artifactregistry_v1
import google.auth
from datetime import datetime, timedelta
import time

class GCRScanningAutomation:
    """Vulnerability-scanning helpers for container images in one GCP project.

    Holds a Container Analysis client (plus the Grafeas client it exposes
    for occurrence reads/writes) and an Artifact Registry client used to
    enumerate the packages of a repository.
    """

    # Severity names tallied in report summaries; any other value is ignored.
    _REPORTED_SEVERITIES = ('CRITICAL', 'HIGH', 'MEDIUM', 'LOW')

    def __init__(self, project_id):
        """Create the API clients used by the other methods.

        Args:
            project_id: ID of the Google Cloud project that owns the images.
        """
        self.project_id = project_id
        # Resolve Application Default Credentials once and hand the same
        # object to every client instead of letting each resolve its own.
        credentials, _ = google.auth.default()

        self.container_analysis = containeranalysis_v1.ContainerAnalysisClient(
            credentials=credentials
        )
        # create_occurrence / list_occurrences are Grafeas RPCs; the
        # Container Analysis client hands out a Grafeas client for them.
        self.grafeas_client = self.container_analysis.get_grafeas_client()
        self.artifact_registry = artifactregistry_v1.ArtifactRegistryClient(
            credentials=credentials
        )

    @staticmethod
    def _resource_url(image_url):
        """Return ``image_url`` with an ``https://`` scheme if it has none."""
        if image_url.startswith(('https://', 'http://')):
            return image_url
        return f"https://{image_url}"

    def scan_image(self, image_url):
        """Record a DISCOVERY occurrence marking ``image_url`` as SCANNING.

        Args:
            image_url: Image reference, e.g. ``gcr.io/PROJECT_ID/IMAGE_NAME:TAG``.

        Returns:
            The resource name of the created occurrence, or ``None`` when the
            request failed (the error is printed, not raised).
        """
        # NOTE(review): this only writes an occurrence. Whether that starts
        # an actual scan is not visible from this file -- presumably scanning
        # is triggered by the registry on push; confirm.
        # NOTE(review): the Grafeas API may also require a ``note_name`` on
        # created occurrences -- confirm against the API reference.
        occurrence = {
            'resource_uri': self._resource_url(image_url),
            'kind': 'DISCOVERY',
            'discovery': {'analysis_status': 'SCANNING'},
        }

        try:
            response = self.grafeas_client.create_occurrence(
                parent=f"projects/{self.project_id}",
                occurrence=occurrence,
            )
        except Exception as e:
            # Deliberately best-effort: callers treat None as "not started".
            print(f"Error scanning image: {e}")
            return None
        return response.name

    def get_vulnerabilities(self, image_url):
        """List VULNERABILITY occurrences recorded for ``image_url``.

        Args:
            image_url: Image reference, with or without the ``https://``
                scheme.
                NOTE(review): occurrences are presumably keyed by digest
                (``...@sha256:...``), so a tag reference may match nothing --
                confirm.

        Returns:
            A list of dicts with the keys ``cve``, ``severity``, ``package``,
            ``fix_available`` and ``fixed_version``. Package details come
            from the first package issue of each occurrence only.
        """
        filter_str = (
            f'resourceUrl="{self._resource_url(image_url)}" AND kind="VULNERABILITY"'
        )
        parent = f"projects/{self.project_id}"

        vulnerabilities = []
        for occurrence in self.grafeas_client.list_occurrences(
            parent=parent,
            filter=filter_str,
        ):
            vuln = occurrence.vulnerability
            issues = vuln.package_issue
            first_issue = issues[0] if issues else None
            fixed = first_issue.fixed_version if first_issue is not None else None
            has_fix = bool(fixed)

            # The v1 vulnerability message does not appear to define ``cve``;
            # use it when present, otherwise take the id that ends the note
            # name (e.g. ``projects/<provider>/notes/CVE-2021-3449``).
            cve = getattr(vuln, 'cve', None) or occurrence.note_name.rsplit('/', 1)[-1]

            vulnerabilities.append({
                'cve': cve,
                'severity': vuln.severity.name,
                'package': first_issue.affected_package if first_issue is not None else 'Unknown',
                'fix_available': has_fix,
                'fixed_version': fixed.full_name if has_fix else None,
            })

        return vulnerabilities

    def create_vulnerability_report(self, repository_name, location='us'):
        """Build a vulnerability report covering every package of a repository.

        Args:
            repository_name: Artifact Registry repository ID.
            location: Repository location; defaults to ``'us'``, the value
                that used to be hard-coded.

        Returns:
            A dict with ``repository``, ``scan_date`` (ISO-8601 local time),
            a ``summary`` of image and per-severity counts, and an ``images``
            list holding one entry per image that has at least one
            vulnerability (with at most its first 10 findings).
        """
        from urllib.parse import unquote

        report = {
            'repository': repository_name,
            'scan_date': datetime.now().isoformat(),
            'summary': {
                'total_images': 0,
                'vulnerable_images': 0,
                'critical': 0,
                'high': 0,
                'medium': 0,
                'low': 0,
            },
            'images': [],
        }
        summary = report['summary']

        parent = (
            f"projects/{self.project_id}/locations/{location}"
            f"/repositories/{repository_name}"
        )

        for package in self.artifact_registry.list_packages(parent=parent):
            summary['total_images'] += 1

            # package.name is the full resource name
            # (projects/.../repositories/REPO/packages/ID, slashes in ID
            # percent-escaped); only the decoded ID belongs in the image URL.
            image_name = unquote(package.name.rsplit('/packages/', 1)[-1])
            image_url = f"gcr.io/{self.project_id}/{image_name}"
            vulnerabilities = self.get_vulnerabilities(image_url)
            if not vulnerabilities:
                continue

            summary['vulnerable_images'] += 1

            severity_counts = dict.fromkeys(self._REPORTED_SEVERITIES, 0)
            for vuln in vulnerabilities:
                severity = vuln['severity']
                if severity in severity_counts:
                    severity_counts[severity] += 1
                    summary[severity.lower()] += 1

            report['images'].append({
                'image': package.name,
                'vulnerability_count': len(vulnerabilities),
                'severity_breakdown': severity_counts,
                'top_vulnerabilities': vulnerabilities[:10],
            })

        return report

# Cloud Function for automated scanning
def _project_id_from_image(image_url):
    """Return the project segment of ``HOST/PROJECT/IMAGE[:TAG|@DIGEST]``."""
    parts = image_url.split('/')
    if len(parts) < 3 or not parts[1]:
        raise ValueError(f"Cannot derive project id from image URL: {image_url!r}")
    # NOTE(review): domain-scoped projects (HOST/example.com/PROJECT/IMAGE)
    # would need two segments here -- confirm whether they are in use.
    return parts[1]


def scan_on_push(request):
    """Cloud Function triggered by Container Registry events.

    Decodes the Pub/Sub push envelope in ``request.data``, records a scan for
    the pushed image, polls for vulnerability occurrences for up to five
    minutes and, when CRITICAL findings exist, sends an alert and tags the
    image as vulnerable.

    Args:
        request: HTTP request whose ``data`` attribute holds the UTF-8 JSON
            envelope; the base64 payload under ``message.data`` is the
            registry notification (``action``, ``tag`` and/or ``digest``).

    Returns:
        ``{'status': 'skipped', 'image': ...}`` for DELETE notifications,
        otherwise ``{'status': 'completed', 'image': ...}`` extended with
        ``vulnerability_count`` and ``critical_count`` (both 0 when no
        occurrences appeared before the deadline).

    Raises:
        KeyError: the notification carries neither ``tag`` nor ``digest``.
        ValueError: no ``project_id`` was supplied and none can be derived
            from the image URL.
    """
    import base64
    import json

    envelope = json.loads(request.data.decode('utf-8'))
    payload = base64.b64decode(envelope['message']['data']).decode('utf-8')
    message = json.loads(payload)

    # Digest-only pushes carry no tag, so fall back to the digest reference.
    image_url = message.get('tag') or message.get('digest')
    if not image_url:
        raise KeyError("notification contains neither 'tag' nor 'digest'")

    # Nothing to scan once an image or tag has been removed.
    if message.get('action') == 'DELETE':
        return {'status': 'skipped', 'image': image_url}

    # Registry notifications do not include a project id; honour one if a
    # custom publisher supplies it, otherwise read it from the image path.
    project_id = message.get('project_id') or _project_id_from_image(image_url)
    # Prefer the immutable digest reference for scan bookkeeping and lookup.
    resource_url = message.get('digest') or image_url

    scanner = GCRScanningAutomation(project_id)
    scanner.scan_image(resource_url)

    # NOTE(review): the function's configured timeout must exceed this
    # polling window -- confirm deployment settings.
    max_wait = 300  # 5 minutes
    poll_interval = 10
    deadline = time.monotonic() + max_wait

    while True:
        vulnerabilities = scanner.get_vulnerabilities(resource_url)
        remaining = deadline - time.monotonic()
        if vulnerabilities or remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))

    critical_vulns = [v for v in vulnerabilities if v['severity'] == 'CRITICAL']
    if critical_vulns:
        send_alert(image_url, critical_vulns)
        # Mark the image so downstream tooling can recognise it as vulnerable.
        apply_vulnerability_tag(image_url)

    return {
        'status': 'completed',
        'image': image_url,
        'vulnerability_count': len(vulnerabilities),
        'critical_count': len(critical_vulns),
    }