Implementing Harbor with Trivy Integration

Implementing Harbor with Trivy Integration

Harbor provides an enterprise-grade registry with built-in Trivy integration, offering seamless vulnerability scanning:

# Harbor installation with Trivy scanner
# Runtime settings for the Trivy scanner adapter, stored as an env-file style
# block that the adapter reads at start-up.
apiVersion: v1
kind: ConfigMap
metadata:
  name: harbor-scanner-trivy
  namespace: harbor
data:
  # NOTE(review): a ConfigMap holds this block verbatim and is readable by any
  # principal that can read ConfigMaps in the namespace; nothing in this
  # manifest expands `${GITHUB_TOKEN}`. Keep SCANNER_TRIVY_GITHUB_TOKEN in a
  # Secret injected through the scanner's env/envFrom rather than in this file.
  # SKIP_UPDATE=false keeps the vulnerability DB refreshed and INSECURE=false
  # keeps TLS verification on; both should stay as they are for a registry
  # that gates production images. The Redis URLs carry no credentials, so the
  # Redis service must not be reachable from outside the cluster network.
  env: |
    SCANNER_LOG_LEVEL=info
    SCANNER_TRIVY_CACHE_DIR=/home/scanner/.cache/trivy
    SCANNER_TRIVY_REPORTS_DIR=/home/scanner/.cache/reports
    SCANNER_TRIVY_DEBUG_MODE=false
    SCANNER_TRIVY_VULN_TYPE=os,library
    SCANNER_TRIVY_SEVERITY=UNKNOWN,LOW,MEDIUM,HIGH,CRITICAL
    SCANNER_TRIVY_IGNORE_UNFIXED=false
    SCANNER_TRIVY_SKIP_UPDATE=false
    SCANNER_TRIVY_GITHUB_TOKEN=${GITHUB_TOKEN}
    SCANNER_TRIVY_INSECURE=false
    SCANNER_STORE_REDIS_URL=redis://harbor-redis:6379
    SCANNER_JOB_QUEUE_REDIS_URL=redis://harbor-redis:6379

---
# Harbor project policy configuration
# Declarative policy document for projects whose name matches `prod-*`.
apiVersion: v1
kind: ConfigMap
metadata:
  name: harbor-policies
  namespace: harbor
data:
  # NOTE(review): this JSON appears to be a policy file for tooling outside
  # Harbor's own API — nothing on this page shows what consumes it. The Python
  # client below applies the equivalent settings (auto_scan, prevent_vul,
  # severity) directly as project metadata, so keep the two in step.
  # `cve_whitelist: []` exempts nothing from the `prevent` rule; any entry added
  # later should carry an owner and an expiry so exceptions do not outlive the
  # reason they were granted. The `warn` rule tolerates up to 5 High findings
  # before warning — it never blocks.
  vulnerability-policy.json: |
    {
      "version": "1.0",
      "policies": [
        {
          "name": "production-images",
          "description": "Security policy for production images",
          "project_selector": {
            "decoration": "matches",
            "pattern": "prod-*"
          },
          "rules": [
            {
              "action": "prevent",
              "severity": "critical",
              "cve_whitelist": []
            },
            {
              "action": "warn",
              "severity": "high",
              "count": 5
            }
          ],
          "scan_on_push": true,
          "prevent_vulnerable": true,
          "auto_scan": true
        }
      ]
    }

Configure Harbor scanning policies through the REST API:

#!/usr/bin/env python3
# harbor-policy-automation.py

import json
import os
from datetime import datetime, timedelta
from urllib.parse import quote

import requests

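class HarborAutomation:
    """Small client for the Harbor v2.0 REST API used to enforce scan policy.

    Every call goes through one ``requests.Session`` carrying HTTP basic auth
    and a JSON content type. TLS verification is not disabled anywhere here;
    keep it that way for a registry that gates production images.
    """

    # Media type under which the Trivy adapter's report appears in an
    # artifact's ``scan_overview``.
    VULN_REPORT_MIME = 'application/vnd.scanner.adapter.vuln.report.harbor+json; version=1.0'
    # Page size requested from Harbor's paginated list endpoints.
    PAGE_SIZE = 100

    def __init__(self, harbor_url, username, password, *, timeout=30):
        """
        Args:
            harbor_url: Base URL of the Harbor instance (trailing slash optional).
            username: Harbor user allowed to update project metadata and
                trigger scans.
            password: Password for ``username``; read it from a secret store
                or the environment rather than hard-coding it.
            timeout: Per-request timeout in seconds. ``requests.Session`` has
                no default, so without it an unreachable registry hangs the
                script indefinitely.
        """
        self.harbor_url = harbor_url.rstrip('/')
        self.timeout = timeout
        self.session = requests.Session()
        self.session.auth = (username, password)
        self.session.headers.update({'Content-Type': 'application/json'})

    def _url(self, path):
        """Absolute URL for an API path relative to ``/api/v2.0/``."""
        return f"{self.harbor_url}/api/v2.0/{path}"

    def _list_all(self, path, params=None):
        """Return every item from a paginated Harbor list endpoint.

        Harbor list endpoints are paginated via ``page``/``page_size``; a
        single GET only yields the first page, so repositories or artifacts
        beyond it would silently go unscanned. Pages are fetched until a
        short or empty page comes back.

        Raises:
            requests.HTTPError: on any non-2xx reply (bad credentials, unknown
                project), instead of parsing the error body as a list.
        """
        items = []
        page = 1
        while True:
            query = dict(params or {}, page=page, page_size=self.PAGE_SIZE)
            response = self.session.get(
                self._url(path), params=query, timeout=self.timeout
            )
            response.raise_for_status()
            batch = response.json()
            if not batch:
                return items
            items.extend(batch)
            if len(batch) < self.PAGE_SIZE:
                return items
            page += 1

    def _repo_path(self, project_name, repo_name):
        """API path of a repository, e.g. ``projects/prod/repositories/app``.

        The repository list returns names prefixed with the project
        (``prod/app``) while the path parameter wants only the part after the
        project; Harbor's API docs ask for any remaining slash to be
        URL-encoded twice (``a/b`` -> ``a%252Fb``) — confirm against the
        deployed Harbor version.
        """
        prefix = f"{project_name}/"
        if repo_name.startswith(prefix):
            repo_name = repo_name[len(prefix):]
        encoded = quote(quote(repo_name, safe=''), safe='')
        return f"projects/{project_name}/repositories/{encoded}"

    @staticmethod
    def _tag_name(artifact):
        """First tag name of an artifact, or ``'untagged'``.

        ``tags`` may be missing, ``null`` or ``[]`` for an untagged artifact;
        indexing ``[0]`` blindly fails on the latter two.
        """
        tags = artifact.get('tags') or []
        if not tags:
            return 'untagged'
        return tags[0].get('name', 'untagged')

    @classmethod
    def _vuln_report(cls, scan_overview):
        """Pick the vulnerability report out of ``scan_overview``.

        ``scan_overview`` is keyed by report media type. The Trivy adapter
        type this class was written against is preferred; otherwise whatever
        single report is present is used so a Harbor that reports under a
        different media type still yields counts rather than zeros.
        """
        report = scan_overview.get(cls.VULN_REPORT_MIME)
        if report is None:
            report = next(iter(scan_overview.values()), {})
        return report or {}

    def create_scanning_policy(self, project_name):
        """Turn on automatic scanning and vulnerability blocking for a project.

        Sends the project metadata via ``PUT /projects/{project_name}``.
        ``prevent_vul`` together with ``severity`` is Harbor's "prevent
        vulnerable images" setting; confirm the exact threshold semantics for
        the deployed Harbor version. ``retention_id`` is hard-coded to ``"1"``
        — TODO confirm that id refers to an existing retention policy.

        Returns:
            True when Harbor answered 200, False otherwise.
        """
        policy = {
            "metadata": {
                "auto_scan": True,
                "severity": "high",
                "reuse_sys_cve_allowlist": True,
                "retention_id": "1",
                "prevent_vul": True,
                "public": False
            }
        }

        response = self.session.put(
            self._url(f"projects/{project_name}"),
            json=policy,
            timeout=self.timeout,
        )
        return response.status_code == 200

    def trigger_scan_all_images(self, project_name):
        """Queue a scan for every artifact in every repository of a project.

        Returns:
            One dict per artifact with ``repository`` (full Harbor name),
            ``tag``, ``digest`` and ``scan_triggered`` (True when Harbor
            accepted the job with 202).
        """
        scan_results = []
        for repo in self._list_all(f"projects/{project_name}/repositories"):
            repo_name = repo['name']
            repo_path = self._repo_path(project_name, repo_name)

            for artifact in self._list_all(f"{repo_path}/artifacts"):
                digest = artifact['digest']
                response = self.session.post(
                    self._url(f"{repo_path}/artifacts/{digest}/scan"),
                    timeout=self.timeout,
                )
                scan_results.append({
                    'repository': repo_name,
                    'tag': self._tag_name(artifact),
                    'digest': digest,
                    'scan_triggered': response.status_code == 202
                })

        return scan_results

    def get_vulnerability_summary(self, project_name):
        """Aggregate scan findings across every artifact in a project.

        Returns:
            dict with per-severity totals (``critical``, ``high``, ``medium``,
            ``low``) and ``vulnerable_images``: one entry per artifact whose
            overall severity is Critical or High, carrying repository, tag,
            severity and ``vulnerability_count``.

        Artifacts without a ``scan_overview`` (never scanned, or a scan still
        running) contribute nothing, so an all-zero summary is not proof of a
        clean project — make sure scans have completed first.
        """
        summary = {
            'critical': 0,
            'high': 0,
            'medium': 0,
            'low': 0,
            'vulnerable_images': []
        }

        for repo in self._list_all(f"projects/{project_name}/repositories"):
            repo_name = repo['name']
            artifacts = self._list_all(
                f"{self._repo_path(project_name, repo_name)}/artifacts",
                params={'with_scan_overview': True},
            )

            for artifact in artifacts:
                scan_overview = artifact.get('scan_overview') or {}
                if not scan_overview:
                    continue
                report = self._vuln_report(scan_overview)

                severity = report.get('severity')
                if severity in ('Critical', 'High'):
                    summary['vulnerable_images'].append({
                        'repository': repo_name,
                        'tag': self._tag_name(artifact),
                        'severity': severity,
                        'vulnerability_count': report.get('total_count', 0)
                    })

                # Per-severity counts live under the report's 'summary' key.
                counts = report.get('summary') or {}
                summary['critical'] += counts.get('Critical', 0)
                summary['high'] += counts.get('High', 0)
                summary['medium'] += counts.get('Medium', 0)
                summary['low'] += counts.get('Low', 0)

        return summary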

# Automation script
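if __name__ == "__main__":
    # Credentials come from the environment so this file can be committed and
    # shared without carrying a registry admin password; HARBOR_PASSWORD is
    # mandatory, URL and user fall back to the values used in this example.
    harbor = HarborAutomation(
        os.environ.get("HARBOR_URL", "https://harbor.company.com"),
        os.environ.get("HARBOR_USERNAME", "admin"),
        os.environ["HARBOR_PASSWORD"],
    )

    # Configure all projects
    projects = ["dev", "staging", "production"]
    critical_found = False
    for project in projects:
        print(f"Configuring project: {project}")
        if not harbor.create_scanning_policy(project):
            # A silently failed policy update would leave the project without
            # scan-on-push / pull blocking, so say so instead of moving on.
            print(f"WARNING: policy update for {project} was not accepted")

        # Trigger initial scan
        results = harbor.trigger_scan_all_images(project)
        triggered = sum(1 for r in results if r['scan_triggered'])
        print(f"Triggered {triggered} of {len(results)} scans")

        # Get vulnerability summary
        summary = harbor.get_vulnerability_summary(project)
        print(f"Vulnerabilities - Critical: {summary['critical']}, High: {summary['high']}")

        # Alert on critical vulnerabilities
        if summary['critical'] > 0:
            critical_found = True
            print("ALERT: Critical vulnerabilities found!")
            for img in summary['vulnerable_images']:
                if img['severity'] == 'Critical':
                    print(f"  - {img['repository']}:{img['tag']}")

    # A non-zero exit lets a pipeline or cron job notice the alert instead of
    # relying on someone reading the output.
    if critical_found:
        sys.exit(1)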