Python for Cross-Platform Security Automation
Python provides excellent cross-platform capabilities for security automation, with extensive libraries supporting both Windows and Linux operations. Its readable syntax and powerful features make it ideal for complex security orchestration tasks. Understanding Python security libraries and best practices enables the development of sophisticated automation solutions.
Implement unified security monitoring across platforms:
#!/usr/bin/env python3
"""
Cross-platform Security Monitoring and Response System
"""
import os
import sys
import json
import logging
import asyncio
import platform
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import subprocess
import psutil
import requests
from cryptography.fernet import Fernet
class SecurityMonitor:
    """Unified security monitoring for Windows and Linux.

    An instance loads a JSON configuration, sets up a rotating log file,
    and exposes coroutines that check file/registry integrity, look for
    suspicious processes and connections, and react to what they find.
    """

    # Process names treated as suspicious. Names are compared token-wise
    # (see _is_suspicious_name), never as raw substrings.
    SUSPICIOUS_PROCESS_NAMES = frozenset({'nc', 'ncat', 'powercat', 'mimikatz'})

    # Remote ports that cause an ESTABLISHED connection to be reported.
    UNUSUAL_PORTS = frozenset({4444, 5555, 6666, 7777, 8888, 9999})

    # Upper bound on the in-memory alert history kept in self.alerts, so a
    # long-running monitor does not grow without limit.
    MAX_STORED_ALERTS = 1000

    def __init__(self, config_file: str = "security_config.json"):
        """Create a monitor configured from *config_file*.

        Args:
            config_file: Path to the JSON configuration file.

        Raises:
            OSError: If the configuration, log, or key file cannot be opened.
            ValueError: If the configuration is not a valid JSON object.
        """
        self.platform = platform.system().lower()
        self.config = self._load_config(config_file)
        self.logger = self._setup_logging()
        self.alerts = []
        self.encryption_key = self._get_encryption_key()

    def _load_config(self, config_file: str) -> Dict:
        """Load the configuration from a JSON file.

        The file is parsed as plain JSON; no decryption happens here.

        Raises:
            OSError: If the file cannot be opened.
            ValueError: If the content is not valid JSON or its top level
                is not an object (``json.JSONDecodeError`` is a
                ``ValueError`` subclass).
        """
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(config_file, 'r', encoding='utf-8') as f:
            config = json.load(f)
        if not isinstance(config, dict):
            # Every consumer calls self.config.get(...), so fail early.
            raise ValueError(f"Configuration in {config_file} must be a JSON object")
        return config

    def _setup_logging(self) -> logging.Logger:
        """Configure the 'SecurityMonitor' logger with a rotating file handler."""
        from logging.handlers import RotatingFileHandler

        logger = logging.getLogger('SecurityMonitor')
        logger.setLevel(logging.INFO)

        # logging.getLogger returns the same object for the same name, so
        # attaching a handler unconditionally would duplicate every log line
        # once a second SecurityMonitor is created in the same process.
        if not any(isinstance(h, RotatingFileHandler) for h in logger.handlers):
            file_handler = RotatingFileHandler(
                'security_monitor.log',
                maxBytes=10485760,  # 10MB
                backupCount=5
            )
            file_handler.setFormatter(
                logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            )
            logger.addHandler(file_handler)
        return logger

    def _get_encryption_key(self) -> bytes:
        """Return the key stored in '.security_key', generating it if absent."""
        key_file = '.security_key'
        try:
            with open(key_file, 'rb') as f:
                return f.read()
        except FileNotFoundError:
            pass

        key = Fernet.generate_key()
        try:
            # Create the file with owner-only permissions from the start, so
            # the key is never readable by others between creation and a
            # later chmod. O_EXCL avoids overwriting a key written meanwhile.
            fd = os.open(key_file, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
        except FileExistsError:
            # Another process created the key first; use that one.
            with open(key_file, 'rb') as f:
                return f.read()
        with os.fdopen(fd, 'wb') as f:
            f.write(key)
        return key

    async def monitor_system_integrity(self) -> Dict[str, Any]:
        """Check critical files (Linux) or registry keys (Windows).

        Returns:
            A dict with an ISO 'timestamp' and a list of 'issues'. On other
            platforms the issue list is empty.
        """
        if self.platform == 'linux':
            integrity_issues = self._check_linux_files()
        elif self.platform == 'windows':
            integrity_issues = self._check_windows_registry()
        else:
            integrity_issues = []
        return {
            'timestamp': datetime.now().isoformat(),
            'issues': integrity_issues
        }

    def _check_linux_files(self) -> List[Dict[str, Any]]:
        """Compare SHA-256 hashes of critical files with their baselines."""
        import hashlib

        critical_files = [
            '/etc/passwd', '/etc/shadow', '/etc/sudoers',
            '/etc/ssh/sshd_config', '/etc/pam.d/common-auth'
        ]
        issues = []
        for file_path in critical_files:
            baseline_file = f"{file_path}.baseline"
            try:
                if not os.path.exists(file_path):
                    continue
                hasher = hashlib.sha256()
                with open(file_path, 'rb') as f:
                    hasher.update(f.read())
                current_hash = hasher.hexdigest()

                if os.path.exists(baseline_file):
                    with open(baseline_file, 'r') as f:
                        baseline_hash = f.read().strip()
                    if current_hash != baseline_hash:
                        issues.append({
                            'file': file_path,
                            'status': 'modified',
                            'current_hash': current_hash,
                            'baseline_hash': baseline_hash
                        })
                else:
                    # First sighting: record the current hash as the baseline.
                    with open(baseline_file, 'w') as f:
                        f.write(current_hash)
                    os.chmod(baseline_file, 0o600)
            except OSError as e:
                # One unreadable file (e.g. /etc/shadow without root) must
                # not abort the checks for the remaining files.
                self.logger.error(f"Integrity check failed for {file_path}: {e}")
        return issues

    def _check_windows_registry(self) -> List[Dict[str, Any]]:
        """Report values under critical registry keys that are not whitelisted."""
        import winreg

        # The hive name is carried alongside the handle because the handle
        # itself is an integer and is meaningless in a report.
        critical_keys = [
            ('HKEY_LOCAL_MACHINE', winreg.HKEY_LOCAL_MACHINE,
             r'SOFTWARE\Microsoft\Windows\CurrentVersion\Run'),
            ('HKEY_LOCAL_MACHINE', winreg.HKEY_LOCAL_MACHINE,
             r'SYSTEM\CurrentControlSet\Services'),
        ]
        whitelist = self.config.get('registry_whitelist', [])
        issues = []
        for hive_name, hive, subkey in critical_keys:
            try:
                with winreg.OpenKey(hive, subkey) as key:
                    index = 0
                    while True:
                        try:
                            name, value, _value_type = winreg.EnumValue(key, index)
                        except OSError:
                            # EnumValue signals "no more values" with OSError.
                            break
                        if name not in whitelist:
                            issues.append({
                                'type': 'registry',
                                'key': f"{hive_name}\\{subkey}",
                                'value': name,
                                'data': str(value)
                            })
                        index += 1
            except Exception as e:
                self.logger.error(f"Registry check failed: {e}")
        return issues

    @classmethod
    def _is_suspicious_name(cls, name: Optional[str]) -> bool:
        """Return True if a whole token of *name* is a known suspicious name.

        The name is lower-cased and split on non-alphanumeric characters, so
        'nc', 'nc.openbsd' and 'Mimikatz.exe' match while 'rsync' or
        'irqbalance' (which merely contain 'nc') do not.
        """
        import re

        if not name:
            return False
        tokens = re.split(r'[^a-z0-9]+', name.lower())
        return any(token in cls.SUSPICIOUS_PROCESS_NAMES for token in tokens)

    async def detect_anomalies(self) -> List[Dict[str, Any]]:
        """Scan running processes and network connections for anomalies.

        Returns:
            A list of anomaly dicts, each with a 'type' of
            'suspicious_process', 'hidden_process' or 'unusual_connection'.
        """
        anomalies = []

        # Process analysis
        for proc in psutil.process_iter(['pid', 'name', 'username', 'cmdline']):
            try:
                proc_info = proc.info
                if self._is_suspicious_name(proc_info['name']):
                    anomalies.append({
                        'type': 'suspicious_process',
                        'process': proc_info['name'],
                        'pid': proc_info['pid'],
                        'user': proc_info['username'],
                        'cmdline': ' '.join(proc_info['cmdline'] or [])
                    })

                # Detect hidden processes (Linux)
                if self.platform == 'linux':
                    stat_file = f"/proc/{proc_info['pid']}/stat"
                    if os.path.exists(stat_file):
                        # The process name in stat is arbitrary bytes; do not
                        # let a decoding error abort the scan.
                        with open(stat_file, 'r', errors='replace') as f:
                            stat_content = f.read()
                        if '(hidden)' in stat_content:
                            anomalies.append({
                                'type': 'hidden_process',
                                'pid': proc_info['pid'],
                                'details': 'Process attempting to hide'
                            })
            except (psutil.NoSuchProcess, psutil.AccessDenied, OSError):
                # The process may exit between listing and inspection.
                continue

        # Network anomaly detection
        try:
            connections = psutil.net_connections(kind='inet')
        except psutil.AccessDenied as e:
            # Some platforms require elevated privileges for this call.
            self.logger.error(f"Connection scan skipped: {e}")
            connections = []

        for conn in connections:
            if conn.status != 'ESTABLISHED':
                continue
            if conn.raddr and conn.raddr.port in self.UNUSUAL_PORTS:
                anomalies.append({
                    'type': 'unusual_connection',
                    'local': f"{conn.laddr.ip}:{conn.laddr.port}",
                    'remote': f"{conn.raddr.ip}:{conn.raddr.port}",
                    'pid': conn.pid,
                    'status': conn.status
                })
        return anomalies

    @staticmethod
    def _parse_remote_ip(endpoint: str):
        """Extract and validate the address from an 'ip:port' string.

        The split is done on the last colon so IPv6 addresses, which contain
        colons themselves, are kept intact.

        Returns:
            An ``ipaddress.IPv4Address`` or ``ipaddress.IPv6Address``.

        Raises:
            ValueError: If the host part is not a valid IP address.
        """
        import ipaddress

        host = endpoint.rsplit(':', 1)[0]
        return ipaddress.ip_address(host)

    async def automated_response(self, threat: Dict[str, Any]) -> bool:
        """Respond to *threat* and send an alert.

        Suspicious processes are terminated and unusual connections have
        their remote address blocked in the host firewall. Any other threat
        type only produces an alert.

        Args:
            threat: Dict with at least a 'type' key; 'suspicious_process'
                needs 'pid' and 'process', 'unusual_connection' needs
                'remote' in 'ip:port' form.

        Returns:
            True if a containment action was actually carried out.
        """
        self.logger.warning(f"Responding to threat: {threat}")
        response_taken = False

        if threat['type'] == 'suspicious_process':
            # Kill suspicious process
            try:
                if self.platform == 'linux':
                    subprocess.run(['kill', '-9', str(threat['pid'])], check=True)
                    response_taken = True
                elif self.platform == 'windows':
                    subprocess.run(['taskkill', '/F', '/PID', str(threat['pid'])], check=True)
                    response_taken = True
                else:
                    # Do not report success when nothing was executed.
                    self.logger.warning(
                        f"Process termination is not supported on {self.platform}"
                    )
                if response_taken:
                    self.logger.info(
                        f"Terminated suspicious process: {threat['process']} (PID: {threat['pid']})"
                    )
            except Exception as e:
                self.logger.error(f"Failed to terminate process: {e}")

        elif threat['type'] == 'unusual_connection':
            # Block IP using firewall
            try:
                # Validating the address also guarantees that nothing but a
                # literal IP reaches the firewall command line.
                address = self._parse_remote_ip(threat['remote'])
                remote_ip = str(address)
                if self.platform == 'linux':
                    tool = 'ip6tables' if address.version == 6 else 'iptables'
                    rule = ['INPUT', '-s', remote_ip, '-j', 'DROP']
                    # -C exits 0 when the rule already exists; skip the append
                    # so repeated scans do not pile up duplicate rules.
                    already_blocked = subprocess.run(
                        [tool, '-C', *rule],
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL
                    ).returncode == 0
                    if not already_blocked:
                        subprocess.run([tool, '-A', *rule], check=True)
                    response_taken = True
                elif self.platform == 'windows':
                    # Arguments are passed as a list, so no shell-style quotes
                    # are needed; literal quotes would end up in the rule name.
                    subprocess.run([
                        'netsh', 'advfirewall', 'firewall', 'add', 'rule',
                        f'name=Block {remote_ip}', 'dir=in', 'action=block',
                        f'remoteip={remote_ip}'
                    ], check=True)
                    response_taken = True
                else:
                    self.logger.warning(
                        f"IP blocking is not supported on {self.platform}"
                    )
                if response_taken:
                    self.logger.info(f"Blocked IP address: {remote_ip}")
            except (OSError, subprocess.SubprocessError, ValueError) as e:
                self.logger.error(f"Failed to block IP: {e}")

        # Send alert
        self._send_alert(threat)
        return response_taken

    def _send_alert(self, threat: Dict[str, Any]):
        """Record *threat* as an alert and forward it to configured channels.

        The alert is appended to self.alerts (bounded by MAX_STORED_ALERTS),
        written to the log, and posted to the webhook when that channel is
        enabled in the configuration.
        """
        alert = {
            'timestamp': datetime.now().isoformat(),
            'hostname': platform.node(),
            'threat': threat,
            'platform': self.platform
        }
        channels = self.config.get('alert_channels', [])

        # Email alert
        if 'email' in channels:
            # Implementation depends on email configuration
            pass

        # Webhook alert
        if 'webhook' in channels:
            webhook_url = self.config.get('webhook_url')
            if webhook_url:
                try:
                    response = requests.post(webhook_url, json=alert, timeout=5)
                    # Surface HTTP error responses instead of ignoring them.
                    response.raise_for_status()
                except Exception as e:
                    # Alert delivery is best-effort and must not break the
                    # response path.
                    self.logger.error(f"Webhook alert failed: {e}")

        # Log alert
        self.alerts.append(alert)
        if len(self.alerts) > self.MAX_STORED_ALERTS:
            # Drop the oldest entries, keeping the list object itself.
            del self.alerts[:-self.MAX_STORED_ALERTS]
        self.logger.warning(f"Security Alert: {json.dumps(alert, indent=2)}")

    async def run_continuous_monitoring(self):
        """Run integrity and anomaly checks in a loop until cancelled.

        Raises:
            asyncio.CancelledError: Re-raised after logging when the task is
                cancelled, which is how asyncio.run() delivers Ctrl+C on
                recent Python versions.
        """
        self.logger.info(f"Starting security monitoring on {self.platform}")
        while True:
            try:
                # Run monitoring tasks
                integrity_check = await self.monitor_system_integrity()
                anomalies = await self.detect_anomalies()

                # Process findings
                for issue in integrity_check['issues']:
                    await self.automated_response({'type': 'integrity_violation', **issue})
                for anomaly in anomalies:
                    await self.automated_response(anomaly)

                # Sleep before next iteration
                await asyncio.sleep(self.config.get('scan_interval', 60))
            except asyncio.CancelledError:
                # Listed before the generic handler so cancellation is never
                # swallowed, whatever base class CancelledError has.
                self.logger.info("Monitoring cancelled")
                raise
            except KeyboardInterrupt:
                self.logger.info("Monitoring stopped by user")
                break
            except Exception as e:
                self.logger.error(f"Monitoring error: {e}")
                await asyncio.sleep(10)  # Brief pause before retry
# Main execution
if __name__ == "__main__":
    monitor = SecurityMonitor()
    try:
        asyncio.run(monitor.run_continuous_monitoring())
    except KeyboardInterrupt:
        # asyncio.run() re-raises Ctrl+C to its caller once the event loop
        # has been torn down; exit quietly instead of printing a traceback.
        monitor.logger.info("Security monitor interrupted; exiting")