Google Container Registry (GCR) Automation
Google Container Registry (GCR) Automation
Implement GCR scanning with the Container Analysis API:
#!/usr/bin/env python3
# gcr-scanning-automation.py
from google.cloud import containeranalysis_v1
from google.cloud import artifactregistry_v1
import google.auth
from datetime import datetime, timedelta
import time
class GCRScanningAutomation:
    """Query and report vulnerability occurrences for container images.

    Occurrences are read and written through the Grafeas client obtained
    from the Container Analysis client; images are enumerated through the
    Artifact Registry client.
    """

    # Severity names tallied in reports; any other severity is still counted
    # in ``vulnerability_count`` but not in the per-severity breakdown.
    _REPORTED_SEVERITIES = ('CRITICAL', 'HIGH', 'MEDIUM', 'LOW')

    def __init__(self, project_id):
        """Create the API clients used by the other methods.

        Args:
            project_id: Google Cloud project ID used as the parent for
                occurrence and repository lookups.
        """
        self.project_id = project_id
        # Resolve Application Default Credentials once and hand them to both
        # clients instead of discarding them and resolving again per client.
        credentials, _ = google.auth.default()
        self.container_analysis = containeranalysis_v1.ContainerAnalysisClient(
            credentials=credentials
        )
        # Occurrence create/list calls live on the Grafeas client, not on
        # ContainerAnalysisClient itself.
        self.grafeas = self.container_analysis.get_grafeas_client()
        self.artifact_registry = artifactregistry_v1.ArtifactRegistryClient(
            credentials=credentials
        )

    def scan_image(self, image_url):
        """Record a DISCOVERY occurrence marking an image as being scanned.

        Args:
            image_url: Image reference, e.g.
                ``gcr.io/PROJECT_ID/IMAGE_NAME:TAG``.

        Returns:
            The resource name of the created occurrence, or ``None`` when
            the API call fails (the error is printed, not raised).
        """
        # The message is passed as a plain dict with enum values given by
        # name, so no Grafeas message classes need to be imported here.
        # NOTE(review): no ``note_name`` is set; the service may require one
        # and reject this request -- confirm against the Grafeas API.
        occurrence = {
            'resource_uri': image_url,
            'kind': 'DISCOVERY',
            'discovery': {'analysis_status': 'SCANNING'},
        }
        parent = f"projects/{self.project_id}"
        try:
            response = self.grafeas.create_occurrence(
                parent=parent,
                occurrence=occurrence,
            )
        except Exception as e:
            # Deliberately best-effort: callers treat ``None`` as "no scan
            # occurrence was recorded".
            print(f"Error scanning image: {e}")
            return None
        return response.name

    def get_vulnerabilities(self, image_url):
        """Get vulnerability occurrences for an image.

        Args:
            image_url: Resource URL to match exactly in the occurrence
                filter.
                NOTE(review): stored resource URLs may be digest-qualified
                ``https://`` URLs -- confirm what callers should pass.

        Returns:
            A list with one dict per occurrence, with keys ``cve``,
            ``severity``, ``package``, ``fix_available`` and
            ``fixed_version``. Package details come from the first package
            issue only; without any issue they default to ``'Unknown'``,
            ``False`` and ``None``.
        """
        filter_str = f'resourceUrl="{image_url}" AND kind="VULNERABILITY"'
        parent = f"projects/{self.project_id}"
        vulnerabilities = []
        for occurrence in self.grafeas.list_occurrences(
            parent=parent,
            filter=filter_str,
        ):
            vuln = occurrence.vulnerability
            issues = vuln.package_issue
            first_issue = issues[0] if issues else None
            fixed_version = (
                first_issue.fixed_version if first_issue is not None else None
            )
            # NOTE(review): a populated ``fixed_version`` is treated as "fix
            # available"; confirm whether the API can populate it for
            # unfixed issues.
            vulnerabilities.append({
                # The vulnerability occurrence has no ``cve`` field; the
                # identifier is the last segment of the note name.
                'cve': occurrence.note_name.rsplit('/', 1)[-1],
                'severity': vuln.severity.name,
                'package': (
                    first_issue.affected_package
                    if first_issue is not None else 'Unknown'
                ),
                'fix_available': bool(fixed_version),
                'fixed_version': (
                    fixed_version.full_name if fixed_version else None
                ),
            })
        return vulnerabilities

    def create_vulnerability_report(self, repository_name, location='us'):
        """Create a vulnerability report for all images in a repository.

        Args:
            repository_name: Artifact Registry repository ID to enumerate.
            location: Repository location. Defaults to ``'us'``, the value
                that was previously hard-coded.

        Returns:
            A dict with ``repository``, ``scan_date`` (ISO-8601, local
            time), a ``summary`` of image and per-severity counts, and an
            ``images`` list holding one entry per image that has at least
            one vulnerability (with at most ten vulnerabilities each).
        """
        from urllib.parse import unquote

        report = {
            'repository': repository_name,
            'scan_date': datetime.now().isoformat(),
            'summary': {
                'total_images': 0,
                'vulnerable_images': 0,
                'critical': 0,
                'high': 0,
                'medium': 0,
                'low': 0,
            },
            'images': [],
        }
        summary = report['summary']

        parent = (
            f"projects/{self.project_id}/locations/{location}"
            f"/repositories/{repository_name}"
        )
        for package in self.artifact_registry.list_packages(parent=parent):
            summary['total_images'] += 1

            # ``package.name`` is a full resource name
            # (``.../packages/<id>``) whose id has slashes percent-escaped;
            # only the decoded id belongs in the image URL. A bare name
            # without the ``/packages/`` marker passes through unchanged.
            image_name = unquote(package.name.rpartition('/packages/')[2])
            image_url = f"gcr.io/{self.project_id}/{image_name}"
            vulnerabilities = self.get_vulnerabilities(image_url)
            if not vulnerabilities:
                continue

            summary['vulnerable_images'] += 1
            severity_counts = dict.fromkeys(self._REPORTED_SEVERITIES, 0)
            for vuln in vulnerabilities:
                severity = vuln['severity']
                if severity in severity_counts:
                    severity_counts[severity] += 1
                    summary[severity.lower()] += 1

            report['images'].append({
                'image': package.name,
                'vulnerability_count': len(vulnerabilities),
                'severity_breakdown': severity_counts,
                'top_vulnerabilities': vulnerabilities[:10],
            })
        return report
# Cloud Function for automated scanning
def scan_on_push(request):
    """Cloud Function triggered by Container Registry events.

    Decodes the Pub/Sub push envelope in the HTTP request, requests a scan
    of the referenced image, then polls for vulnerability occurrences for
    up to five minutes. As soon as any are reported, critical ones (if
    present) trigger an alert and a vulnerability tag, and polling stops.

    Args:
        request: HTTP request whose ``data`` attribute holds the raw JSON
            push envelope as bytes. The base64 ``message.data`` field must
            decode to a JSON object carrying ``tag`` and/or ``digest``.

    Returns:
        ``{'status': 'completed', 'image': <image reference>}``, whether or
        not any vulnerabilities were reported before the deadline.

    Raises:
        KeyError: If the envelope lacks ``message``/``data``, the message
            has neither ``tag`` nor ``digest``, or no project can be
            determined.
    """
    import base64
    import json

    envelope = json.loads(request.data.decode('utf-8'))
    payload = base64.b64decode(envelope['message']['data']).decode('utf-8')
    message = json.loads(payload)

    # A push by digest carries no ``tag``; fall back to ``digest`` rather
    # than failing on such events.
    image_url = message.get('tag') or message['digest']

    # Registry notifications do not normally include ``project_id``. Honour
    # it when present, otherwise take the project from the image path
    # (``HOST/PROJECT_ID/IMAGE...``).
    project_id = message.get('project_id')
    if not project_id:
        path_parts = image_url.split('/')
        if len(path_parts) < 3 or not path_parts[1]:
            raise KeyError('project_id')
        project_id = path_parts[1]

    scanner = GCRScanningAutomation(project_id)

    # The returned occurrence name is not needed: polling below is keyed on
    # the image reference, and continues even if this call reports failure.
    scanner.scan_image(image_url)

    max_wait = 300  # 5 minutes
    poll_interval = 10  # seconds between polls
    # Monotonic clock: the timeout must not be affected by wall-clock
    # adjustments.
    deadline = time.monotonic() + max_wait
    while time.monotonic() < deadline:
        vulnerabilities = scanner.get_vulnerabilities(image_url)
        if vulnerabilities:
            critical_vulns = [
                v for v in vulnerabilities if v['severity'] == 'CRITICAL'
            ]
            if critical_vulns:
                send_alert(image_url, critical_vulns)
                # Mark the image so it can be identified as vulnerable.
                apply_vulnerability_tag(image_url)
            break
        time.sleep(poll_interval)

    return {'status': 'completed', 'image': image_url}