Python for Cross-Platform Security Automation

Python for Cross-Platform Security Automation

Python provides excellent cross-platform capabilities for security automation, with extensive libraries supporting both Windows and Linux operations. Its readable syntax and powerful features make it ideal for complex security orchestration tasks. Understanding Python security libraries and best practices enables development of sophisticated automation solutions.

Implement unified security monitoring across platforms:

#!/usr/bin/env python3
"""
Cross-platform Security Monitoring and Response System
"""

import os
import sys
import json
import logging
import asyncio
import platform
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import subprocess
import psutil
import requests
from cryptography.fernet import Fernet

class SecurityMonitor:
    """Unified security monitoring for Windows and Linux"""
    
    def __init__(self, config_file: str = "security_config.json"):
        self.platform = platform.system().lower()
        self.config = self._load_config(config_file)
        self.logger = self._setup_logging()
        self.alerts = []
        self.encryption_key = self._get_encryption_key()
        
    def _load_config(self, config_file: str) -> Dict:
        """Load configuration from encrypted file"""
        with open(config_file, 'r') as f:
            return json.load(f)
    
    def _setup_logging(self) -> logging.Logger:
        """Configure logging with rotation"""
        logger = logging.getLogger('SecurityMonitor')
        logger.setLevel(logging.INFO)
        
        # File handler with rotation
        from logging.handlers import RotatingFileHandler
        file_handler = RotatingFileHandler(
            'security_monitor.log',
            maxBytes=10485760,  # 10MB
            backupCount=5
        )
        file_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
        logger.addHandler(file_handler)
        
        return logger
    
    def _get_encryption_key(self) -> bytes:
        """Get or generate encryption key"""
        key_file = '.security_key'
        if os.path.exists(key_file):
            with open(key_file, 'rb') as f:
                return f.read()
        else:
            key = Fernet.generate_key()
            with open(key_file, 'wb') as f:
                f.write(key)
            os.chmod(key_file, 0o600)
            return key
    
    async def monitor_system_integrity(self) -> Dict[str, Any]:
        """Monitor system file integrity"""
        integrity_issues = []
        
        if self.platform == 'linux':
            # Check critical system files
            critical_files = [
                '/etc/passwd', '/etc/shadow', '/etc/sudoers',
                '/etc/ssh/sshd_config', '/etc/pam.d/common-auth'
            ]
            
            for file_path in critical_files:
                if os.path.exists(file_path):
                    # Calculate checksum
                    import hashlib
                    hasher = hashlib.sha256()
                    with open(file_path, 'rb') as f:
                        hasher.update(f.read())
                    current_hash = hasher.hexdigest()
                    
                    # Compare with baseline
                    baseline_file = f"{file_path}.baseline"
                    if os.path.exists(baseline_file):
                        with open(baseline_file, 'r') as f:
                            baseline_hash = f.read().strip()
                        
                        if current_hash != baseline_hash:
                            integrity_issues.append({
                                'file': file_path,
                                'status': 'modified',
                                'current_hash': current_hash,
                                'baseline_hash': baseline_hash
                            })
                    else:
                        # Create baseline
                        with open(baseline_file, 'w') as f:
                            f.write(current_hash)
                        os.chmod(baseline_file, 0o600)
        
        elif self.platform == 'windows':
            # Windows integrity monitoring
            import winreg
            
            # Check critical registry keys
            critical_keys = [
                (winreg.HKEY_LOCAL_MACHINE, r'SOFTWARE\Microsoft\Windows\CurrentVersion\Run'),
                (winreg.HKEY_LOCAL_MACHINE, r'SYSTEM\CurrentControlSet\Services'),
            ]
            
            for hive, subkey in critical_keys:
                try:
                    with winreg.OpenKey(hive, subkey) as key:
                        # Enumerate values
                        i = 0
                        while True:
                            try:
                                name, value, type_ = winreg.EnumValue(key, i)
                                # Check against whitelist
                                if name not in self.config.get('registry_whitelist', []):
                                    integrity_issues.append({
                                        'type': 'registry',
                                        'key': f"{hive}\\{subkey}",
                                        'value': name,
                                        'data': str(value)
                                    })
                                i += 1
                            except WindowsError:
                                break
                except Exception as e:
                    self.logger.error(f"Registry check failed: {e}")
        
        return {
            'timestamp': datetime.now().isoformat(),
            'issues': integrity_issues
        }
    
    async def detect_anomalies(self) -> List[Dict[str, Any]]:
        """Detect security anomalies using behavioral analysis"""
        anomalies = []
        
        # Process analysis
        for proc in psutil.process_iter(['pid', 'name', 'username', 'cmdline']):
            try:
                proc_info = proc.info
                
                # Detect suspicious process names
                suspicious_names = ['nc', 'ncat', 'powercat', 'mimikatz']
                if any(susp in proc_info['name'].lower() for susp in suspicious_names):
                    anomalies.append({
                        'type': 'suspicious_process',
                        'process': proc_info['name'],
                        'pid': proc_info['pid'],
                        'user': proc_info['username'],
                        'cmdline': ' '.join(proc_info['cmdline'] or [])
                    })
                
                # Detect hidden processes (Linux)
                if self.platform == 'linux':
                    proc_path = f"/proc/{proc_info['pid']}"
                    if os.path.exists(proc_path):
                        # Check for process hiding techniques
                        stat_file = f"{proc_path}/stat"
                        if os.path.exists(stat_file):
                            with open(stat_file, 'r') as f:
                                stat_content = f.read()
                            if '(hidden)' in stat_content:
                                anomalies.append({
                                    'type': 'hidden_process',
                                    'pid': proc_info['pid'],
                                    'details': 'Process attempting to hide'
                                })
                
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
        
        # Network anomaly detection
        connections = psutil.net_connections(kind='inet')
        for conn in connections:
            # Detect unusual ports
            if conn.status == 'ESTABLISHED':
                unusual_ports = [4444, 5555, 6666, 7777, 8888, 9999]
                if conn.raddr and conn.raddr.port in unusual_ports:
                    anomalies.append({
                        'type': 'unusual_connection',
                        'local': f"{conn.laddr.ip}:{conn.laddr.port}",
                        'remote': f"{conn.raddr.ip}:{conn.raddr.port}",
                        'pid': conn.pid,
                        'status': conn.status
                    })
        
        return anomalies
    
    async def automated_response(self, threat: Dict[str, Any]) -> bool:
        """Automated threat response"""
        self.logger.warning(f"Responding to threat: {threat}")
        
        response_taken = False
        
        if threat['type'] == 'suspicious_process':
            # Kill suspicious process
            try:
                if self.platform == 'linux':
                    subprocess.run(['kill', '-9', str(threat['pid'])], check=True)
                elif self.platform == 'windows':
                    subprocess.run(['taskkill', '/F', '/PID', str(threat['pid'])], check=True)
                
                self.logger.info(f"Terminated suspicious process: {threat['process']} (PID: {threat['pid']})")
                response_taken = True
            except Exception as e:
                self.logger.error(f"Failed to terminate process: {e}")
        
        elif threat['type'] == 'unusual_connection':
            # Block IP using firewall
            remote_ip = threat['remote'].split(':')[0]
            
            try:
                if self.platform == 'linux':
                    subprocess.run(['iptables', '-A', 'INPUT', '-s', remote_ip, '-j', 'DROP'], check=True)
                elif self.platform == 'windows':
                    subprocess.run([
                        'netsh', 'advfirewall', 'firewall', 'add', 'rule',
                        f'name="Block {remote_ip}"', 'dir=in', 'action=block',
                        f'remoteip={remote_ip}'
                    ], check=True)
                
                self.logger.info(f"Blocked IP address: {remote_ip}")
                response_taken = True
            except Exception as e:
                self.logger.error(f"Failed to block IP: {e}")
        
        # Send alert
        self._send_alert(threat)
        
        return response_taken
    
    def _send_alert(self, threat: Dict[str, Any]):
        """Send security alert"""
        alert = {
            'timestamp': datetime.now().isoformat(),
            'hostname': platform.node(),
            'threat': threat,
            'platform': self.platform
        }
        
        # Multiple alert channels
        # Email alert
        if 'email' in self.config.get('alert_channels', []):
            # Implementation depends on email configuration
            pass
        
        # Webhook alert
        if 'webhook' in self.config.get('alert_channels', []):
            webhook_url = self.config.get('webhook_url')
            if webhook_url:
                try:
                    requests.post(webhook_url, json=alert, timeout=5)
                except Exception as e:
                    self.logger.error(f"Webhook alert failed: {e}")
        
        # Log alert
        self.alerts.append(alert)
        self.logger.warning(f"Security Alert: {json.dumps(alert, indent=2)}")
    
    async def run_continuous_monitoring(self):
        """Main monitoring loop"""
        self.logger.info(f"Starting security monitoring on {self.platform}")
        
        while True:
            try:
                # Run monitoring tasks
                integrity_check = await self.monitor_system_integrity()
                anomalies = await self.detect_anomalies()
                
                # Process findings
                if integrity_check['issues']:
                    for issue in integrity_check['issues']:
                        await self.automated_response({'type': 'integrity_violation', **issue})
                
                for anomaly in anomalies:
                    await self.automated_response(anomaly)
                
                # Sleep before next iteration
                await asyncio.sleep(self.config.get('scan_interval', 60))
                
            except KeyboardInterrupt:
                self.logger.info("Monitoring stopped by user")
                break
            except Exception as e:
                self.logger.error(f"Monitoring error: {e}")
                await asyncio.sleep(10)  # Brief pause before retry

# Main execution
if __name__ == "__main__":
    monitor = SecurityMonitor()
    asyncio.run(monitor.run_continuous_monitoring())