AWS ECR Scanning Automation

AWS ECR Scanning Automation

Implement comprehensive ECR scanning with AWS native tools and enhanced reporting:

#!/usr/bin/env python3
# ecr-scanning-automation.py

import boto3
import json
from datetime import datetime, timedelta
import time

class ECRScanningAutomation:
    def __init__(self, region='us-east-1'):
        self.ecr = boto3.client('ecr', region_name=region)
        self.sns = boto3.client('sns', region_name=region)
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)
        
    def enable_scan_on_push(self, repository_name):
        """Enable scan on push for a repository"""
        try:
            response = self.ecr.put_image_scanning_configuration(
                repositoryName=repository_name,
                imageScanningConfiguration={
                    'scanOnPush': True
                }
            )
            return response['ResponseMetadata']['HTTPStatusCode'] == 200
        except Exception as e:
            print(f"Error enabling scan on push: {e}")
            return False
    
    def scan_all_images(self, repository_name):
        """Scan all images in a repository"""
        scan_results = []
        
        # List all images
        paginator = self.ecr.get_paginator('list_images')
        
        for page in paginator.paginate(repositoryName=repository_name):
            for image in page['imageIds']:
                # Start scan
                try:
                    self.ecr.start_image_scan(
                        repositoryName=repository_name,
                        imageId=image
                    )
                    
                    scan_results.append({
                        'repository': repository_name,
                        'imageDigest': image.get('imageDigest'),
                        'imageTag': image.get('imageTag', 'untagged'),
                        'scanStarted': True
                    })
                except self.ecr.exceptions.ImageScanningDisabledException:
                    print(f"Scanning disabled for {repository_name}")
                except Exception as e:
                    print(f"Error scanning image: {e}")
                
                # Rate limiting
                time.sleep(0.5)
        
        return scan_results
    
    def get_scan_findings(self, repository_name, image_digest):
        """Get scan findings for an image"""
        try:
            response = self.ecr.describe_image_scan_findings(
                repositoryName=repository_name,
                imageId={'imageDigest': image_digest}
            )
            
            findings = response['imageScanFindings']
            
            # Categorize by severity
            severity_counts = {
                'CRITICAL': 0,
                'HIGH': 0,
                'MEDIUM': 0,
                'LOW': 0,
                'INFORMATIONAL': 0
            }
            
            for finding in findings.get('findings', []):
                severity = finding['severity']
                severity_counts[severity] = severity_counts.get(severity, 0) + 1
            
            return {
                'repository': repository_name,
                'imageDigest': image_digest,
                'findingSeverityCounts': findings.get('findingSeverityCounts', {}),
                'findings': findings.get('findings', []),
                'severityCounts': severity_counts,
                'scanCompletedAt': findings.get('imageScanCompletedAt')
            }
            
        except self.ecr.exceptions.ScanNotFoundException:
            return None
        except Exception as e:
            print(f"Error getting scan findings: {e}")
            return None
    
    def create_vulnerability_report(self, repository_name):
        """Create comprehensive vulnerability report"""
        report = {
            'reportDate': datetime.now().isoformat(),
            'repository': repository_name,
            'summary': {
                'totalImages': 0,
                'scannedImages': 0,
                'vulnerableImages': 0,
                'criticalFindings': 0,
                'highFindings': 0
            },
            'vulnerableImages': []
        }
        
        # Get all images
        paginator = self.ecr.get_paginator('list_images')
        
        for page in paginator.paginate(repositoryName=repository_name):
            for image in page['imageIds']:
                report['summary']['totalImages'] += 1
                
                # Get scan findings
                findings = self.get_scan_findings(
                    repository_name,
                    image.get('imageDigest')
                )
                
                if findings:
                    report['summary']['scannedImages'] += 1
                    
                    critical = findings['severityCounts'].get('CRITICAL', 0)
                    high = findings['severityCounts'].get('HIGH', 0)
                    
                    if critical > 0 or high > 0:
                        report['summary']['vulnerableImages'] += 1
                        report['summary']['criticalFindings'] += critical
                        report['summary']['highFindings'] += high
                        
                        report['vulnerableImages'].append({
                            'imageTag': image.get('imageTag', 'untagged'),
                            'imageDigest': image.get('imageDigest'),
                            'criticalCount': critical,
                            'highCount': high,
                            'findings': findings['findings'][:5]  # Top 5 findings
                        })
        
        return report
    
    def publish_metrics(self, repository_name, report):
        """Publish metrics to CloudWatch"""
        namespace = 'ContainerSecurity'
        
        metrics = [
            {
                'MetricName': 'VulnerableImages',
                'Value': report['summary']['vulnerableImages'],
                'Unit': 'Count',
                'Dimensions': [
                    {'Name': 'Repository', 'Value': repository_name}
                ]
            },
            {
                'MetricName': 'CriticalFindings',
                'Value': report['summary']['criticalFindings'],
                'Unit': 'Count',
                'Dimensions': [
                    {'Name': 'Repository', 'Value': repository_name}
                ]
            },
            {
                'MetricName': 'HighFindings',
                'Value': report['summary']['highFindings'],
                'Unit': 'Count',
                'Dimensions': [
                    {'Name': 'Repository', 'Value': repository_name}
                ]
            }
        ]
        
        self.cloudwatch.put_metric_data(
            Namespace=namespace,
            MetricData=metrics
        )
    
    def setup_automated_scanning(self):
        """Setup EventBridge rule for automated scanning"""
        events = boto3.client('events')
        
        # Create rule for new image pushes
        rule_name = 'ecr-image-push-scan'
        
        events.put_rule(
            Name=rule_name,
            EventPattern=json.dumps({
                "source": ["aws.ecr"],
                "detail-type": ["ECR Image Action"],
                "detail": {
                    "action-type": ["PUSH"],
                    "result": ["SUCCESS"]
                }
            }),
            State='ENABLED',
            Description='Trigger additional scanning on ECR image push'
        )
        
        # Add Lambda target (assumes Lambda function exists)
        events.put_targets(
            Rule=rule_name,
            Targets=[{
                'Id': '1',
                'Arn': 'arn:aws:lambda:us-east-1:123456789012:function:ecr-scan-handler',
                'RoleArn': 'arn:aws:iam::123456789012:role/EventBridgeRole'
            }]
        )

# Lambda handler for automated scanning
def lambda_handler(event, context):
    """Lambda function to handle ECR push events"""
    
    # Extract repository and image details
    detail = event['detail']
    repository_name = detail['repository-name']
    image_digest = detail['image-digest']
    
    scanner = ECRScanningAutomation()
    
    # Wait for scan to complete
    max_attempts = 30
    for i in range(max_attempts):
        findings = scanner.get_scan_findings(repository_name, image_digest)
        
        if findings:
            # Check severity
            critical = findings['severityCounts'].get('CRITICAL', 0)
            high = findings['severityCounts'].get('HIGH', 0)
            
            if critical > 0:
                # Send alert
                sns = boto3.client('sns')
                sns.publish(
                    TopicArn='arn:aws:sns:us-east-1:123456789012:security-alerts',
                    Subject=f'Critical vulnerabilities in {repository_name}',
                    Message=json.dumps({
                        'repository': repository_name,
                        'imageDigest': image_digest,
                        'criticalCount': critical,
                        'highCount': high,
                        'findings': findings['findings'][:5]
                    }, indent=2)
                )
                
                # Tag image as vulnerable
                scanner.ecr.put_image_tag_mutability(
                    repositoryName=repository_name,
                    imageTagMutability='IMMUTABLE'
                )
                
            break
        
        time.sleep(10)
    
    return {
        'statusCode': 200,
        'body': json.dumps({
            'repository': repository_name,
            'imageDigest': image_digest,
            'scanned': findings is not None
        })
    }