Implementing Harbor with Trivy Integration
Implementing Harbor with Trivy Integration
Harbor provides an enterprise-grade registry with built-in Trivy integration, offering seamless vulnerability scanning:
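# Harbor installation with Trivy scanner
# Environment file for the harbor-scanner-trivy adapter. The block scalar is
# mounted verbatim, so it must contain only KEY=VALUE lines (no comments).
# - SCANNER_TRIVY_GITHUB_TOKEN: Kubernetes does not expand ${GITHUB_TOKEN}
#   inside ConfigMap data, so the literal text would be used as the token.
#   Keep the token in a Secret and inject it on the scanner Deployment with
#   env.valueFrom.secretKeyRef instead of listing it here.
# - SCANNER_TRIVY_INSECURE=false keeps certificate verification for the
#   registry connection on; only relax it for a lab registry.
# - The Redis URLs carry no credentials; restrict access to harbor-redis to
#   the Harbor components (NetworkPolicy) or use an authenticated URL.
apiVersion: v1
kind: ConfigMap
metadata:
  name: harbor-scanner-trivy
  namespace: harbor
data:
  env: |
    SCANNER_LOG_LEVEL=info
    SCANNER_TRIVY_CACHE_DIR=/home/scanner/.cache/trivy
    SCANNER_TRIVY_REPORTS_DIR=/home/scanner/.cache/reports
    SCANNER_TRIVY_DEBUG_MODE=false
    SCANNER_TRIVY_VULN_TYPE=os,library
    SCANNER_TRIVY_SEVERITY=UNKNOWN,LOW,MEDIUM,HIGH,CRITICAL
    SCANNER_TRIVY_IGNORE_UNFIXED=false
    SCANNER_TRIVY_SKIP_UPDATE=false
    SCANNER_TRIVY_GITHUB_TOKEN=${GITHUB_TOKEN}
    SCANNER_TRIVY_INSECURE=false
    SCANNER_STORE_REDIS_URL=redis://harbor-redis:6379
    SCANNER_JOB_QUEUE_REDIS_URL=redis://harbor-redis:6379
---
# Harbor project policy configuration
# NOTE(review): Harbor does not read a vulnerability-policy.json itself; this
# document describes the intended per-project policy and is applied through
# the REST API (project metadata auto_scan / severity / prevent_vul) by the
# automation script. Confirm which component consumes this ConfigMap.
apiVersion: v1
kind: ConfigMap
metadata:
  name: harbor-policies
  namespace: harbor
data:
  vulnerability-policy.json: |
    {
      "version": "1.0",
      "policies": [
        {
          "name": "production-images",
          "description": "Security policy for production images",
          "project_selector": {
            "decoration": "matches",
            "pattern": "prod-*"
          },
          "rules": [
            {
              "action": "prevent",
              "severity": "critical",
              "cve_whitelist": []
            },
            {
              "action": "warn",
              "severity": "high",
              "count": 5
            }
          ],
          "scan_on_push": true,
          "prevent_vulnerable": true,
          "auto_scan": true
        }
      ]
    }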
Configure Harbor scanning policies via API:
#!/usr/bin/env python3
# harbor-policy-automation.py
import json
import os
import sys
from datetime import datetime, timedelta
from urllib.parse import quote

import requests
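class HarborAutomation:
    """Small client for the Harbor v2.0 REST API.

    Covers the three operations this script needs: setting a project's
    scanning metadata, triggering a scan for every artifact in a project and
    aggregating the resulting scan overviews.

    Args:
        harbor_url: Base URL of the Harbor instance, e.g. "https://harbor.example".
        username: Harbor account used for HTTP basic auth.
        password: Password for ``username``.
        timeout: Per-request timeout in seconds applied to every call, so a
            stalled Harbor cannot hang the automation forever.
        verify: TLS verification for the Harbor endpoint (passed to requests);
            leave True outside a lab.
    """

    # Key under which Harbor stores the Trivy scan overview of an artifact
    # (taken from the original script); newer Harbor releases may use a
    # different report media type, so a fallback to whatever entry is present
    # is applied in get_vulnerability_summary.
    SCAN_REPORT_MIME = 'application/vnd.scanner.adapter.vuln.report.harbor+json; version=1.0'
    # Harbor paginates list endpoints (default page_size is 10), so without
    # paging "all repositories" silently meant "the first ten". This size is
    # requested per page and every page is walked. NOTE(review): the server
    # may reject larger page sizes — confirm against your Harbor version.
    PAGE_SIZE = 100

    def __init__(self, harbor_url, username, password, timeout=30, verify=True):
        # Strip a trailing slash so f"{harbor_url}/api/..." never yields "//api".
        self.harbor_url = harbor_url.rstrip('/')
        self.timeout = timeout
        self.session = requests.Session()
        self.session.auth = (username, password)
        self.session.verify = verify
        self.session.headers.update({'Content-Type': 'application/json'})

    @staticmethod
    def _repo_path_segment(project_name, repo_name):
        """Return ``repo_name`` as it must appear in a repository API path.

        Harbor lists repositories as "project/repo"; the path parameter takes
        the name without the project prefix, and any remaining "/" has to be
        URL-encoded twice ("a/b" -> "a%252Fb") per the Harbor API reference so
        the router does not split it. A plain name comes back unchanged.
        """
        prefix = f"{project_name}/"
        if repo_name.startswith(prefix):
            repo_name = repo_name[len(prefix):]
        return quote(quote(repo_name, safe=''), safe='')

    @staticmethod
    def _first_tag(artifact):
        """Return the first tag name of ``artifact``, or "untagged".

        Untagged artifacts come back with ``"tags": null`` (or an empty list),
        which a bare ``tags[0]`` access would crash on.
        """
        tags = artifact.get('tags') or []
        if not tags:
            return 'untagged'
        return tags[0].get('name', 'untagged')

    def _get_all(self, path, params=None):
        """GET every item of a paginated list endpoint under ``/api/v2.0``.

        Args:
            path: Path relative to the API root, e.g. "/projects/x/repositories".
            params: Extra query parameters merged with the paging ones.

        Returns:
            All items from every page, in server order.

        Raises:
            requests.HTTPError: on any non-2xx response. An unauthenticated or
                forbidden call must surface rather than read as "no images".
        """
        items = []
        page = 1
        while True:
            query = dict(params or {}, page=page, page_size=self.PAGE_SIZE)
            response = self.session.get(
                f"{self.harbor_url}/api/v2.0{path}",
                params=query,
                timeout=self.timeout,
            )
            response.raise_for_status()
            batch = response.json()
            items.extend(batch)
            # A short (or empty) page is the last one.
            if len(batch) < self.PAGE_SIZE:
                return items
            page += 1

    def create_scanning_policy(self, project_name):
        """Create automated scanning policy for a project.

        Sets the project metadata so every pushed image is scanned and pulls
        are blocked once a vulnerability of the configured severity or worse
        is found. Harbor's ProjectMetadata fields are strings ("true"/"false"),
        not JSON booleans, so they are sent as strings.

        Args:
            project_name: Name of an existing Harbor project.

        Returns:
            True when Harbor answered 200, False for any other status.
        """
        policy = {
            "metadata": {
                "auto_scan": "true",
                "severity": "high",
                "reuse_sys_cve_allowlist": "true",
                # NOTE(review): retention_id names an existing tag-retention
                # policy id; "1" is kept from the original — verify it is the
                # policy intended for this project.
                "retention_id": "1",
                "prevent_vul": "true",
                "public": "false",
            }
        }
        response = self.session.put(
            f"{self.harbor_url}/api/v2.0/projects/{project_name}",
            json=policy,
            timeout=self.timeout,
        )
        return response.status_code == 200

    def trigger_scan_all_images(self, project_name):
        """Trigger scanning for all images in a project.

        Walks every repository and every artifact (all pages) and POSTs one
        scan request per artifact digest. Scans run asynchronously; Harbor
        acknowledges with 202 and results appear later in the scan overview.

        Args:
            project_name: Name of the Harbor project to scan.

        Returns:
            One dict per artifact with keys ``repository``, ``tag``,
            ``digest`` and ``scan_triggered`` (True when Harbor answered 202).

        Raises:
            requests.HTTPError: when listing repositories or artifacts fails.
        """
        scan_results = []
        for repo in self._get_all(f"/projects/{project_name}/repositories"):
            repo_name = repo['name']
            repo_segment = self._repo_path_segment(project_name, repo_name)
            artifacts = self._get_all(
                f"/projects/{project_name}/repositories/{repo_segment}/artifacts"
            )
            for artifact in artifacts:
                digest = artifact['digest']
                response = self.session.post(
                    f"{self.harbor_url}/api/v2.0/projects/{project_name}"
                    f"/repositories/{repo_segment}/artifacts/{digest}/scan",
                    timeout=self.timeout,
                )
                scan_results.append({
                    'repository': repo_name,
                    'tag': self._first_tag(artifact),
                    'digest': digest,
                    'scan_triggered': response.status_code == 202,
                })
        return scan_results

    def get_vulnerability_summary(self, project_name):
        """Get vulnerability summary for all images in a project.

        Args:
            project_name: Name of the Harbor project to summarise.

        Returns:
            dict with per-severity totals (``critical``, ``high``, ``medium``,
            ``low``) summed over every scanned artifact, plus
            ``vulnerable_images``: the artifacts whose overall scan severity
            is Critical or High, each with ``repository``, ``tag``,
            ``severity`` and ``vulnerability_count``.

        Artifacts without a scan overview (never scanned, or a scan still in
        progress) contribute nothing. The totals cover every scanned
        artifact; the original only added them for Critical/High images and
        so undercounted medium/low findings.

        Raises:
            requests.HTTPError: when listing repositories or artifacts fails.
        """
        summary = {
            'critical': 0,
            'high': 0,
            'medium': 0,
            'low': 0,
            'vulnerable_images': [],
        }
        for repo in self._get_all(f"/projects/{project_name}/repositories"):
            repo_name = repo['name']
            repo_segment = self._repo_path_segment(project_name, repo_name)
            artifacts = self._get_all(
                f"/projects/{project_name}/repositories/{repo_segment}/artifacts",
                params={'with_scan_overview': True},
            )
            for artifact in artifacts:
                scan_overview = artifact.get('scan_overview') or {}
                if not scan_overview:
                    continue
                # Prefer the known report key; otherwise take whichever report
                # Harbor attached (the media type differs between releases).
                report = scan_overview.get(self.SCAN_REPORT_MIME)
                if report is None:
                    report = next(iter(scan_overview.values()), None) or {}
                # Harbor nests the per-severity map under summary.summary and
                # the grand total under summary.total; a flat map (as the
                # original script assumed) is still accepted.
                outer = report.get('summary') or {}
                counts = outer.get('summary', outer)
                summary['critical'] += counts.get('Critical', 0)
                summary['high'] += counts.get('High', 0)
                summary['medium'] += counts.get('Medium', 0)
                summary['low'] += counts.get('Low', 0)
                if report.get('severity') in ('Critical', 'High'):
                    summary['vulnerable_images'].append({
                        'repository': repo_name,
                        'tag': self._first_tag(artifact),
                        'severity': report.get('severity'),
                        'vulnerability_count': report.get(
                            'total_count', outer.get('total', 0)
                        ),
                    })
        return summary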
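# Automation script: configure each project, kick off scans and report.
if __name__ == "__main__":
    # Credentials come from the environment rather than the source: the
    # original literal is the admin password Harbor ships with.
    password = os.environ.get("HARBOR_PASSWORD")
    if not password:
        sys.exit("HARBOR_PASSWORD is not set")
    harbor = HarborAutomation(
        os.environ.get("HARBOR_URL", "https://harbor.company.com"),
        os.environ.get("HARBOR_USERNAME", "admin"),
        password,
    )
    # Configure all projects
    projects = ["dev", "staging", "production"]
    critical_found = False
    for project in projects:
        print(f"Configuring project: {project}")
        # A rejected policy update used to pass silently; say so.
        if not harbor.create_scanning_policy(project):
            print(f"WARNING: policy update for {project} was not accepted")
        # Trigger initial scan
        results = harbor.trigger_scan_all_images(project)
        print(f"Triggered {len(results)} scans")
        # Get vulnerability summary. Scans are asynchronous (202 Accepted),
        # so this reflects the last completed scan of each artifact, not
        # the ones just triggered.
        summary = harbor.get_vulnerability_summary(project)
        print(f"Vulnerabilities - Critical: {summary['critical']}, High: {summary['high']}")
        # Alert on critical vulnerabilities
        if summary['critical'] > 0:
            critical_found = True
            print("ALERT: Critical vulnerabilities found!")
            for img in summary['vulnerable_images']:
                if img['severity'] == 'Critical':
                    print(f" - {img['repository']}:{img['tag']}")
    # Non-zero exit so a pipeline running this script fails on criticals
    # instead of only printing an alert nobody gates on.
    if critical_found:
        sys.exit(1)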