Monitoring Update Status and Compliance

Monitoring Update Status and Compliance

Create monitoring scripts for update compliance:

#!/usr/bin/env python3
# /usr/local/bin/update-monitor.py

import subprocess
import json
import smtplib
from email.mime.text import MIMEText
from datetime import datetime, timedelta
import os

class UpdateMonitor:
    def __init__(self):
        self.config = {
            'admin_email': '[email protected]',
            'smtp_server': 'localhost',
            'max_days_without_update': 30,
            'critical_packages': ['apache2', 'nginx', 'openssl', 'openssh-server']
        }
        
    def check_pending_updates(self):
        """Check for pending security updates"""
        try:
            # For Ubuntu/Debian
            result = subprocess.run(
                ['apt-get', '-s', 'upgrade'],
                capture_output=True,
                text=True
            )
            
            pending_updates = []
            for line in result.stdout.split('\n'):
                if line.startswith('Inst'):
                    package = line.split()[1]
                    if any(crit in package for crit in self.config['critical_packages']):
                        pending_updates.append(package)
            
            return pending_updates
            
        except Exception as e:
            print(f"Error checking updates: {e}")
            return []
    
    def check_update_history(self):
        """Check when system was last updated"""
        try:
            # Check apt history
            history_file = '/var/log/apt/history.log'
            if os.path.exists(history_file):
                with open(history_file, 'r') as f:
                    content = f.read()
                    
                # Find last update date
                last_update = None
                for line in content.split('\n'):
                    if line.startswith('Start-Date:'):
                        date_str = line.split(':', 1)[1].strip()
                        last_update = datetime.strptime(
                            date_str.split()[0], 
                            '%Y-%m-%d'
                        )
                
                if last_update:
                    days_since_update = (datetime.now() - last_update).days
                    return days_since_update
                    
        except Exception as e:
            print(f"Error checking update history: {e}")
            
        return None
    
    def check_service_versions(self):
        """Check versions of critical services"""
        versions = {}
        
        # Check Apache version
        try:
            result = subprocess.run(
                ['apache2', '-v'],
                capture_output=True,
                text=True
            )
            versions['apache2'] = result.stdout.split('\n')[0]
        except:
            pass
            
        # Check Nginx version
        try:
            result = subprocess.run(
                ['nginx', '-v'],
                capture_output=True,
                text=True,
                stderr=subprocess.STDOUT
            )
            versions['nginx'] = result.stdout.strip()
        except:
            pass
            
        # Check OpenSSL version
        try:
            result = subprocess.run(
                ['openssl', 'version'],
                capture_output=True,
                text=True
            )
            versions['openssl'] = result.stdout.strip()
        except:
            pass
            
        return versions
    
    def generate_report(self):
        """Generate update compliance report"""
        report = {
            'timestamp': datetime.now().isoformat(),
            'hostname': subprocess.gethostname(),
            'pending_updates': self.check_pending_updates(),
            'days_since_update': self.check_update_history(),
            'service_versions': self.check_service_versions()
        }
        
        # Check compliance
        report['compliance'] = True
        report['issues'] = []
        
        if report['pending_updates']:
            report['compliance'] = False
            report['issues'].append(f"Critical updates pending: {', '.join(report['pending_updates'])}")
            
        if report['days_since_update'] and report['days_since_update'] > self.config['max_days_without_update']:
            report['compliance'] = False
            report['issues'].append(f"No updates applied in {report['days_since_update']} days")
            
        return report
    
    def send_alert(self, report):
        """Send email alert for non-compliance"""
        if not report['compliance']:
            subject = f"Security Update Alert: {report['hostname']}"
            body = f"""
Security Update Compliance Report
Generated: {report['timestamp']}
Server: {report['hostname']}

COMPLIANCE STATUS: FAILED

Issues:
{chr(10).join('- ' + issue for issue in report['issues'])}

Pending Critical Updates:
{chr(10).join('- ' + pkg for pkg in report['pending_updates'])}

Days Since Last Update: {report['days_since_update']}

Service Versions:
{chr(10).join(f'- {svc}: {ver}' for svc, ver in report['service_versions'].items())}

Please address these issues immediately.
            """
            
            msg = MIMEText(body)
            msg['Subject'] = subject
            msg['From'] = f"update-monitor@{report['hostname']}"
            msg['To'] = self.config['admin_email']
            
            try:
                smtp = smtplib.SMTP(self.config['smtp_server'])
                smtp.send_message(msg)
                smtp.quit()
            except Exception as e:
                print(f"Failed to send email: {e}")

if __name__ == '__main__':
    monitor = UpdateMonitor()
    report = monitor.generate_report()
    
    # Save report
    with open('/var/log/update-compliance.json', 'w') as f:
        json.dump(report, f, indent=2)
    
    # Send alert if needed
    if not report['compliance']:
        monitor.send_alert(report)
    
    # Print summary
    print(f"Update Compliance: {'PASSED' if report['compliance'] else 'FAILED'}")
    if report['issues']:
        print("Issues found:")
        for issue in report['issues']:
            print(f"  - {issue}")