Immutable Backups and Ransomware Protection

Immutable backups provide critical protection against ransomware attacks that attempt to encrypt or delete backup data. Write-Once-Read-Many (WORM) storage, whether hardware-based or software-enforced, prevents modification of completed backups. Cloud providers offer object lock features that prevent deletion for specified retention periods, even by administrators.

Air-gapped backups add another protection layer by physically or logically isolating backup copies from network access. While true air gaps require manual media handling, logical air gaps using network isolation and time-based access windows provide similar protection with better operational efficiency. These isolated backups serve as last-resort recovery options when other systems are compromised.

# Example: Implementing immutable backup system
import hashlib
import json
from datetime import datetime, timedelta
from enum import Enum

class ImmutabilityLevel(Enum):
    """Retention enforcement levels applied to immutable backups."""

    # Retention cannot be shortened or removed, even by administrators.
    COMPLIANCE = "compliance"
    # Retention may be lifted by principals holding special permissions.
    GOVERNANCE = "governance"
    # Held indefinitely until the hold is explicitly released.
    LEGAL_HOLD = "legal_hold"

class ImmutableBackupManager:
    """Creates, protects, and verifies WORM-style immutable backups."""

    def __init__(self, storage_backend, config):
        """Bind the storage backend, configuration, and an audit logger.

        storage_backend: object exposing ``type``, ``client``, ``get`` and
            ``append_to_log`` — the operations this manager relies on.
        config: mapping of backend settings (e.g. 'bucket', 'kms_key').
        """
        # NOTE(review): AuditLogger is assumed to be defined elsewhere in
        # this module/package — confirm it is in scope here.
        self.audit_logger = AuditLogger()
        self.config = config
        self.storage = storage_backend
        
    async def create_immutable_backup(self, backup_data, metadata):
        """Create an immutable backup of *backup_data* and return its id.

        Pipeline: hash the payload, derive a retention window from the
        metadata classification, assemble a backup package, store it with
        backend-appropriate immutability settings, then record a
        tamper-evident log entry and schedule recurring integrity checks.
        """
        new_id = self.generate_backup_id()

        # Retention window is classification-driven, not fixed.
        days = self.calculate_retention_period(metadata)
        expires = datetime.utcnow() + timedelta(days=days)

        package = {
            'id': new_id,
            'created_at': datetime.utcnow().isoformat(),
            'data_hash': hashlib.sha256(backup_data).hexdigest(),
            'metadata': metadata,
            'retention': {
                'until': expires.isoformat(),
                'level': self.determine_immutability_level(metadata),
                'legal_hold': False,
            },
            'integrity_proof': self.generate_integrity_proof(backup_data, metadata),
        }

        # Dispatch to the backend-specific immutable store.
        backend = self.storage.type
        if backend == 'aws_s3':
            await self.store_s3_immutable(backup_data, package)
        elif backend == 'azure_blob':
            await self.store_azure_immutable(backup_data, package)
        else:
            await self.store_generic_immutable(backup_data, package)

        # Record the operation in the append-only log, then arrange for
        # periodic verification of the stored copy.
        await self.create_tamper_evident_log(package)
        await self.schedule_integrity_checks(new_id)

        return new_id
    
    async def store_s3_immutable(self, data, package):
        """Store a backup in S3 under Object Lock with KMS encryption.

        data: raw backup bytes.
        package: backup descriptor produced by create_immutable_backup;
            ``package['retention']['level']`` is an ImmutabilityLevel enum.
        """
        s3 = self.storage.client
        retention = package['retention']
        level = retention['level']
        object_key = f"immutable-backups/{package['id']}"

        # S3 Object Lock accepts only GOVERNANCE or COMPLIANCE as a
        # retention mode; legal hold is a separate flag. Map LEGAL_HOLD to
        # COMPLIANCE retention and attach the hold flag after upload.
        if level == ImmutabilityLevel.LEGAL_HOLD:
            lock_mode = 'COMPLIANCE'
        else:
            lock_mode = level.value.upper()

        response = s3.put_object(
            Bucket=self.config['bucket'],
            Key=object_key,
            Body=data,
            ServerSideEncryption='aws:kms',
            SSEKMSKeyId=self.config['kms_key'],
            ObjectLockMode=lock_mode,
            ObjectLockRetainUntilDate=retention['until'],
            Metadata={
                'backup-id': package['id'],
                'data-hash': package['data_hash'],
                'created-at': package['created_at']
            }
        )

        if level == ImmutabilityLevel.LEGAL_HOLD:
            # Indefinite retention via S3 legal hold on the stored version.
            s3.put_object_legal_hold(
                Bucket=self.config['bucket'],
                Key=object_key,
                LegalHold={'Status': 'ON'},
                VersionId=response['VersionId']
            )

        # Store metadata separately. The Enum member is not JSON
        # serializable, so replace it with its string value before dumping.
        serializable = dict(package)
        serializable['retention'] = {**retention, 'level': level.value}
        s3.put_object(
            Bucket=self.config['bucket'],
            Key=f"immutable-metadata/{package['id']}.json",
            Body=json.dumps(serializable),
            ServerSideEncryption='aws:kms',
            SSEKMSKeyId=self.config['kms_key']
        )

        # Explicitly (re)assert COMPLIANCE retention on the uploaded version
        # so it cannot be bypassed. (The original comment claimed this
        # enabled MFA delete — it sets object retention.)
        if level == ImmutabilityLevel.COMPLIANCE:
            s3.put_object_retention(
                Bucket=self.config['bucket'],
                Key=object_key,
                Retention={
                    'Mode': 'COMPLIANCE',
                    'RetainUntilDate': retention['until']
                },
                VersionId=response['VersionId'],
                BypassGovernanceRetention=False
            )
    
    def generate_integrity_proof(self, data, metadata):
        """Build a signed Merkle-tree proof over *data* plus a metadata hash.

        Returns a dict with the Merkle root, chunk count, a hash of the
        canonicalized metadata, a timestamp, and a signature over all of it.
        """
        # Third-party dependency, imported lazily where it is used.
        from merkletools import MerkleTools

        tree = MerkleTools(hash_type='sha256')

        # Hash the payload in 1 MB chunks and feed each digest as a leaf.
        step = 1024 * 1024
        leaf_count = 0
        for start in range(0, len(data), step):
            tree.add_leaf(hashlib.sha256(data[start:start + step]).hexdigest())
            leaf_count += 1
        tree.make_tree()

        proof = {
            'merkle_root': tree.get_merkle_root(),
            'chunk_count': leaf_count,
            'metadata_hash': hashlib.sha256(
                json.dumps(metadata, sort_keys=True).encode()
            ).hexdigest(),
            'timestamp': datetime.utcnow().isoformat(),
            'algorithm': 'sha256-merkle',
        }

        # Sign everything above for non-repudiation; the signature itself is
        # attached after signing so it is not part of the signed payload.
        proof['signature'] = self.sign_proof(proof)
        return proof
    
    async def create_tamper_evident_log(self, backup_package):
        """Append a hash-chained entry for this backup to the integrity log."""
        entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'operation': 'backup_created',
            'backup_id': backup_package['id'],
            'data_hash': backup_package['data_hash'],
            'operator': self.get_current_operator(),
            'integrity_proof': backup_package['integrity_proof'],
        }

        # Hash chain: fold the previous entry's hash into this one before
        # hashing, so any retroactive edit breaks every later entry. The
        # entry's own hash is computed before it is inserted into the dict.
        entry['previous_hash'] = await self.get_latest_log_hash()
        entry['entry_hash'] = hashlib.sha256(
            json.dumps(entry, sort_keys=True).encode()
        ).hexdigest()

        await self.storage.append_to_log('backup-integrity-log', entry)
        # Replicated copies make tampering with a single log location futile.
        await self.replicate_log_entry(entry)
    
    async def verify_backup_integrity(self, backup_id):
        """Run all integrity checks for one backup and return a report dict.

        The report holds per-check results under 'checks', an overall
        'valid' flag, and — if the checks could not even be run — an
        'error' message. Every verification attempt is audit-logged.
        """
        report = {
            'backup_id': backup_id,
            'verified_at': datetime.utcnow().isoformat(),
            'checks': {},
        }
        checks = report['checks']

        try:
            data = await self.storage.get(f"immutable-backups/{backup_id}")
            # NOTE(review): assumes storage.get returns the metadata as a
            # dict-like object — confirm against the storage backend.
            meta = await self.storage.get(f"immutable-metadata/{backup_id}.json")

            # 1. Recompute the payload hash and compare to the recorded one.
            actual = hashlib.sha256(data).hexdigest()
            checks['data_hash'] = {
                'expected': meta['data_hash'],
                'calculated': actual,
                'valid': actual == meta['data_hash'],
            }

            # 2. Match the payload against its stored Merkle proof.
            checks['merkle_proof'] = {
                'valid': self.verify_merkle_proof(data, meta['integrity_proof']),
            }

            # 3. Confirm the storage-level immutability lock is still active.
            checks['immutability'] = await self.verify_immutability_status(backup_id)

            # 4. Walk the hash chain of the tamper-evident log.
            checks['log_integrity'] = await self.verify_log_chain(backup_id)

            # Overall: valid only if every check reports valid=True.
            report['valid'] = all(
                result.get('valid', False) for result in checks.values()
            )

        except Exception as exc:  # boundary: report failure rather than raise
            report['valid'] = False
            report['error'] = str(exc)

        await self.audit_logger.log_verification(report)
        return report
    
    async def initiate_air_gap_backup(self, backup_id):
        """Queue an air-gapped copy of a verified backup; return the job.

        Raises ValueError when the backup fails integrity verification —
        a possibly-corrupted backup must never be air-gapped.
        """
        outcome = await self.verify_backup_integrity(backup_id)
        if not outcome['valid']:
            raise ValueError(f"Cannot air-gap invalid backup: {backup_id}")

        job = {
            'job_id': self.generate_job_id(),
            'backup_id': backup_id,
            'created_at': datetime.utcnow().isoformat(),
            'status': 'pending',
            # Transfers occur only during scheduled isolation windows.
            'window': self.get_next_airgap_window(),
        }

        await self.queue_airgap_job(job)
        # A human operator handles the actual air-gap transfer.
        await self.notify_airgap_operator(job)
        return job