AWS ECR Scanning Automation
AWS ECR Scanning Automation
Implement comprehensive ECR scanning with AWS native tools and enhanced reporting:
#!/usr/bin/env python3
# ecr-scanning-automation.py
import boto3
import json
from datetime import datetime, timedelta
import time
class ECRScanningAutomation:
    """Drive Amazon ECR image scanning and report on the results.

    Holds ECR, SNS and CloudWatch boto3 clients created for one region and
    offers helpers to enable scan-on-push, start scans, collect findings,
    build a per-repository report, publish summary metrics and register an
    EventBridge rule for image pushes.
    """

    # Most severe first; used to pick the findings shown in reports.
    _SEVERITY_ORDER = ('CRITICAL', 'HIGH', 'MEDIUM', 'LOW', 'INFORMATIONAL')

    # Scan states in which the results are not final yet.
    _UNFINISHED_SCAN_STATES = ('IN_PROGRESS', 'PENDING')

    def __init__(self, region='us-east-1'):
        """Create the AWS clients used by the helpers.

        Args:
            region: AWS region every client of this instance talks to.
        """
        # Remembered so clients created later use the same region.
        self.region = region
        self.ecr = boto3.client('ecr', region_name=region)
        self.sns = boto3.client('sns', region_name=region)
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)

    def enable_scan_on_push(self, repository_name):
        """Enable scan on push for a repository.

        Args:
            repository_name: Name of the ECR repository to configure.

        Returns:
            True when the API answered with HTTP 200, False when the call
            raised (the error is printed, not propagated).
        """
        try:
            response = self.ecr.put_image_scanning_configuration(
                repositoryName=repository_name,
                imageScanningConfiguration={'scanOnPush': True},
            )
            return response['ResponseMetadata']['HTTPStatusCode'] == 200
        except Exception as e:
            # Best effort by design: callers only look at the boolean.
            print(f"Error enabling scan on push: {e}")
            return False

    def _list_unique_images(self, repository_name):
        """Return ``(digest, [tags])`` pairs, one per distinct image digest.

        ``list_images`` yields one entry per tag, so an image carrying
        several tags would otherwise be processed several times.
        """
        tags_by_digest = {}
        paginator = self.ecr.get_paginator('list_images')
        for page in paginator.paginate(repositoryName=repository_name):
            for image in page.get('imageIds', []):
                tags = tags_by_digest.setdefault(image.get('imageDigest'), [])
                if 'imageTag' in image:
                    tags.append(image['imageTag'])
        # dicts preserve insertion order, so images keep their listing order.
        return list(tags_by_digest.items())

    @classmethod
    def _top_findings(cls, findings, limit=5):
        """Return the ``limit`` most severe findings, most severe first."""
        rank = {name: index for index, name in enumerate(cls._SEVERITY_ORDER)}
        # Unknown severities sort last; sorted() is stable, so findings of
        # equal severity keep the order in which they were returned.
        ordered = sorted(
            findings,
            key=lambda finding: rank.get(finding.get('severity'), len(rank)),
        )
        return ordered[:limit]

    def scan_all_images(self, repository_name, throttle_seconds=0.5):
        """Start a scan for every distinct image in a repository.

        Args:
            repository_name: Name of the ECR repository.
            throttle_seconds: Pause after each request, as simple client-side
                rate limiting. Pass 0 to disable.

        Returns:
            A list with one dict per image whose scan was started
            (``repository``, ``imageDigest``, ``imageTag``, ``scanStarted``).
            Images whose scan request failed are reported on stdout and
            left out.
        """
        scan_results = []
        for digest, tags in self._list_unique_images(repository_name):
            try:
                self.ecr.start_image_scan(
                    repositoryName=repository_name,
                    imageId={'imageDigest': digest},
                )
            except self.ecr.exceptions.ClientError as e:
                # Dispatch on the service error code instead of naming a
                # modelled exception class: looking up a class the client
                # does not define raises AttributeError and would abort the
                # whole loop.
                code = e.response.get('Error', {}).get('Code')
                # NOTE(review): confirm that the service really emits this
                # error code for StartImageScan.
                if code == 'ImageScanningDisabledException':
                    print(f"Scanning disabled for {repository_name}")
                else:
                    print(f"Error scanning image: {e}")
            except Exception as e:
                print(f"Error scanning image: {e}")
            else:
                scan_results.append({
                    'repository': repository_name,
                    'imageDigest': digest,
                    'imageTag': tags[0] if tags else 'untagged',
                    'scanStarted': True,
                })
            # Rate limiting
            if throttle_seconds:
                time.sleep(throttle_seconds)
        return scan_results

    def get_scan_findings(self, repository_name, image_digest):
        """Get the complete scan findings for an image.

        Follows ``nextToken`` so that findings spread over several response
        pages are all collected and counted.

        Args:
            repository_name: Name of the ECR repository.
            image_digest: Digest of the image to look up.

        Returns:
            A dict with ``repository``, ``imageDigest``,
            ``findingSeverityCounts`` (as reported by ECR), ``findings``,
            ``severityCounts`` (tallied from ``findings``) and
            ``scanCompletedAt``; or None when there is no digest, no scan,
            the scan has not finished, or the request failed.
        """
        if not image_digest:
            return None

        request = {
            'repositoryName': repository_name,
            'imageId': {'imageDigest': image_digest},
        }
        first_page = None
        all_findings = []
        try:
            while True:
                response = self.ecr.describe_image_scan_findings(**request)
                status = response.get('imageScanStatus', {}).get('status')
                page = response.get('imageScanFindings')
                # No findings payload, or a scan that is still running,
                # means there is nothing final to report yet.
                if page is None or status in self._UNFINISHED_SCAN_STATES:
                    return None
                if first_page is None:
                    first_page = page
                all_findings.extend(page.get('findings', []))
                next_token = response.get('nextToken')
                if not next_token:
                    break
                request['nextToken'] = next_token
        except self.ecr.exceptions.ScanNotFoundException:
            return None
        except Exception as e:
            print(f"Error getting scan findings: {e}")
            return None

        # Categorize by severity
        severity_counts = {name: 0 for name in self._SEVERITY_ORDER}
        for finding in all_findings:
            severity = finding.get('severity', 'UNDEFINED')
            severity_counts[severity] = severity_counts.get(severity, 0) + 1

        return {
            'repository': repository_name,
            'imageDigest': image_digest,
            'findingSeverityCounts': first_page.get('findingSeverityCounts', {}),
            'findings': all_findings,
            'severityCounts': severity_counts,
            'scanCompletedAt': first_page.get('imageScanCompletedAt'),
        }

    def create_vulnerability_report(self, repository_name):
        """Create a vulnerability report for a repository.

        Every distinct image digest is counted once, however many tags
        point at it, so the totals are not inflated by multi-tagged images.

        Args:
            repository_name: Name of the ECR repository.

        Returns:
            A dict with ``reportDate``, ``repository``, a ``summary`` of
            counters and ``vulnerableImages``: one entry per image with at
            least one CRITICAL or HIGH finding, each holding its first tag
            (``imageTag``), all tags (``imageTags``), digest, counts and its
            five most severe findings.
        """
        report = {
            'reportDate': datetime.now().isoformat(),
            'repository': repository_name,
            'summary': {
                'totalImages': 0,
                'scannedImages': 0,
                'vulnerableImages': 0,
                'criticalFindings': 0,
                'highFindings': 0,
            },
            'vulnerableImages': [],
        }
        summary = report['summary']

        for digest, tags in self._list_unique_images(repository_name):
            summary['totalImages'] += 1

            findings = self.get_scan_findings(repository_name, digest)
            if not findings:
                continue
            summary['scannedImages'] += 1

            critical = findings['severityCounts'].get('CRITICAL', 0)
            high = findings['severityCounts'].get('HIGH', 0)
            if critical == 0 and high == 0:
                continue

            summary['vulnerableImages'] += 1
            summary['criticalFindings'] += critical
            summary['highFindings'] += high
            report['vulnerableImages'].append({
                'imageTag': tags[0] if tags else 'untagged',
                'imageTags': list(tags),
                'imageDigest': digest,
                'criticalCount': critical,
                'highCount': high,
                # Top 5 findings, most severe first
                'findings': self._top_findings(findings['findings']),
            })
        return report

    def publish_metrics(self, repository_name, report):
        """Publish the report's summary counters to CloudWatch.

        Args:
            repository_name: Used as the ``Repository`` dimension value.
            report: A report as built by ``create_vulnerability_report``;
                only its ``summary`` is read.
        """
        namespace = 'ContainerSecurity'
        summary = report['summary']
        metric_sources = (
            ('VulnerableImages', 'vulnerableImages'),
            ('CriticalFindings', 'criticalFindings'),
            ('HighFindings', 'highFindings'),
        )
        metrics = [
            {
                'MetricName': metric_name,
                'Value': summary[summary_key],
                'Unit': 'Count',
                'Dimensions': [
                    {'Name': 'Repository', 'Value': repository_name},
                ],
            }
            for metric_name, summary_key in metric_sources
        ]
        self.cloudwatch.put_metric_data(
            Namespace=namespace,
            MetricData=metrics,
        )

    def setup_automated_scanning(
        self,
        lambda_arn='arn:aws:lambda:us-east-1:123456789012:function:ecr-scan-handler',
        role_arn='arn:aws:iam::123456789012:role/EventBridgeRole',
        rule_name='ecr-image-push-scan',
    ):
        """Set up an EventBridge rule that fires on successful image pushes.

        Args:
            lambda_arn: ARN of the (already existing) Lambda function to
                invoke. The default is the placeholder from the original
                script.
            role_arn: Role passed as the target's ``RoleArn``; pass None to
                omit it.
            rule_name: Name of the EventBridge rule to create or update.
        """
        # Create the rule in this instance's region so it sees the same
        # repositories as the ECR client.
        events = boto3.client('events', region_name=self.region)

        # Create rule for new image pushes
        events.put_rule(
            Name=rule_name,
            EventPattern=json.dumps({
                "source": ["aws.ecr"],
                "detail-type": ["ECR Image Action"],
                "detail": {
                    "action-type": ["PUSH"],
                    "result": ["SUCCESS"],
                },
            }),
            State='ENABLED',
            Description='Trigger additional scanning on ECR image push',
        )

        # Add Lambda target (assumes Lambda function exists)
        target = {'Id': '1', 'Arn': lambda_arn}
        # NOTE(review): EventBridge normally invokes Lambda targets through
        # the function's resource-based policy; confirm that RoleArn is
        # accepted for this target type.
        if role_arn:
            target['RoleArn'] = role_arn
        response = events.put_targets(Rule=rule_name, Targets=[target])

        # put_targets reports per-target failures in the response body.
        if response.get('FailedEntryCount'):
            print(
                f"Failed to attach target to {rule_name}: "
                f"{response.get('FailedEntries')}"
            )
# Lambda handler for automated scanning
def lambda_handler(event, context):
    """Lambda function to handle ECR push events.

    Polls for the scan result of the pushed image and, when it contains
    CRITICAL findings, publishes an alert to SNS and switches the
    repository to immutable tags.

    Args:
        event: EventBridge event; ``detail['repository-name']`` and
            ``detail['image-digest']`` are required, and the top-level
            ``region`` is used when present.
        context: Lambda context object. When it offers
            ``get_remaining_time_in_millis`` polling stops before the
            invocation would time out.

    Returns:
        A dict with ``statusCode`` 200 and a JSON ``body`` holding the
        repository, the image digest and whether findings were obtained.
    """
    import os  # function-scope import; the file's top-level imports are untouched

    # The alert topic can be overridden per deployment; the default is the
    # placeholder ARN the script has always used.
    topic_arn = os.environ.get(
        'SECURITY_ALERTS_TOPIC_ARN',
        'arn:aws:sns:us-east-1:123456789012:security-alerts',
    )

    # Extract repository and image details
    detail = event['detail']
    repository_name = detail['repository-name']
    image_digest = detail['image-digest']

    # Query ECR in the region that emitted the event rather than always
    # us-east-1; fall back to the previous default when it is absent.
    scanner = ECRScanningAutomation(region=event.get('region') or 'us-east-1')

    # Wait for scan to complete
    max_attempts = 30
    poll_seconds = 10
    remaining_ms = getattr(context, 'get_remaining_time_in_millis', None)

    findings = None
    for attempt in range(max_attempts):
        findings = scanner.get_scan_findings(repository_name, image_digest)
        if findings is not None:
            break
        if attempt == max_attempts - 1:
            break  # no point sleeping after the final attempt
        # Stop early when another poll would not fit into the remaining
        # time, so the handler still returns a response.
        if callable(remaining_ms) and remaining_ms() < (poll_seconds + 5) * 1000:
            break
        time.sleep(poll_seconds)

    if findings is not None:
        # Check severity
        critical = findings['severityCounts'].get('CRITICAL', 0)
        high = findings['severityCounts'].get('HIGH', 0)

        if critical > 0:
            # Put the most severe findings first so the five included in
            # the alert are the ones that triggered it.
            severity_rank = {
                'CRITICAL': 0, 'HIGH': 1, 'MEDIUM': 2, 'LOW': 3,
                'INFORMATIONAL': 4,
            }
            top_findings = sorted(
                findings['findings'],
                key=lambda finding: severity_rank.get(
                    finding.get('severity'), len(severity_rank)
                ),
            )[:5]

            # Publish through a client in the topic's own region, which is
            # the fourth field of the ARN; None keeps the default region.
            arn_parts = topic_arn.split(':')
            topic_region = arn_parts[3] if len(arn_parts) > 3 and arn_parts[3] else None

            # Send alert
            sns = boto3.client('sns', region_name=topic_region)
            sns.publish(
                TopicArn=topic_arn,
                # SNS rejects subjects longer than 100 characters.
                Subject=f'Critical vulnerabilities in {repository_name}'[:100],
                Message=json.dumps({
                    'repository': repository_name,
                    'imageDigest': image_digest,
                    'criticalCount': critical,
                    'highCount': high,
                    'findings': top_findings,
                }, indent=2),
            )

            # NOTE(review): this was commented as "Tag image as vulnerable",
            # but the call makes tags immutable for the WHOLE repository and
            # does not mark the image itself. Behavior kept; confirm intent.
            scanner.ecr.put_image_tag_mutability(
                repositoryName=repository_name,
                imageTagMutability='IMMUTABLE',
            )

    return {
        'statusCode': 200,
        'body': json.dumps({
            'repository': repository_name,
            'imageDigest': image_digest,
            'scanned': findings is not None,
        }),
    }