Implementing Encryption for Backups

Encryption must be mandatory for all backups, regardless of storage location or media type. However, backup encryption introduces complexity beyond standard data-at-rest encryption. Long-term key management becomes critical when backups might need restoration years later. Key rotation strategies must balance security improvements with the ability to decrypt historical backups.
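
One way to reconcile rotation with long-lived restores is a versioned key ring: new backups always encrypt under the newest master-key version, while retired versions are kept in a decrypt-only state until every backup encrypted under them has aged out of retention. A minimal sketch of that policy (this KeyRing class is an illustration of the constraint, not part of the system below):

# Example (sketch): versioned key ring for rotation without orphaning old backups
class KeyRing:
    def __init__(self):
        self.versions = {}   # version number -> key bytes
        self.current = None  # only this version is used for new encryption

    def rotate(self, new_key):
        """Promote a new key version; older versions stay decrypt-only."""
        version = (self.current or 0) + 1
        self.versions[version] = new_key
        self.current = version
        return version

    def encryption_key(self):
        """Always encrypt new backups under the newest version."""
        return self.current, self.versions[self.current]

    def decryption_key(self, version):
        # Retired versions must remain available until every backup
        # encrypted under them has passed its retention period.
        return self.versions[version]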

# Example: Comprehensive backup encryption system
import os
import hashlib
import json
from datetime import datetime, timedelta, timezone
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.backends import default_backend
import boto3
from azure.keyvault.secrets import SecretClient
from google.cloud import secretmanager

class SecureBackupEncryption:
    def __init__(self, config):
        self.config = config
        self.key_manager = MultiCloudKeyManager(config)
        self.audit_logger = BackupAuditLogger()
        
    def create_backup_encryption_key(self, backup_id, classification='standard'):
        """Generate hierarchical encryption keys for backup"""
        # Master key from key management service
        master_key = self.key_manager.get_or_create_master_key(
            f"backup-master-{classification}"
        )
        
        # Derive a backup-specific key from the master key, using the
        # backup_id as a deterministic per-backup salt so the same key
        # can be re-derived at restore time. (PBKDF2 works here, but for
        # a high-entropy master key HKDF is the more conventional choice.)
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=backup_id.encode(),
            iterations=100000,
            backend=default_backend()
        )
        backup_key = kdf.derive(master_key)
        
        # Create key metadata
        key_metadata = {
            'key_id': self.generate_key_id(),
            'backup_id': backup_id,
            'created_at': datetime.now(timezone.utc).isoformat(),
            'algorithm': 'AES-256-GCM',
            'master_key_version': self.key_manager.get_current_version(),
            'classification': classification,
            'rotation_schedule': self.determine_rotation_schedule(classification)
        }
        
        # Store key metadata (not the key itself)
        self.store_key_metadata(key_metadata)
        
        return backup_key, key_metadata
    
    def encrypt_backup_stream(self, input_stream, backup_id, classification='standard'):
        """Encrypt backup data stream with chunking for large files"""
        backup_key, key_metadata = self.create_backup_encryption_key(
            backup_id, classification
        )
        
        # Initialize encryption
        aesgcm = AESGCM(backup_key)
        chunk_size = 64 * 1024 * 1024  # 64MB chunks
        
        # Chunks are collected in memory here for clarity; a production
        # implementation would stream each encrypted chunk to its
        # destination as it is produced.
        encrypted_chunks = []
        chunk_metadata = []
        
        chunk_index = 0
        while True:
            chunk = input_stream.read(chunk_size)
            if not chunk:
                break
            
            # Generate nonce for this chunk
            nonce = os.urandom(12)
            
            # Add chunk authentication data
            aad = json.dumps({
                'backup_id': backup_id,
                'chunk_index': chunk_index,
                'timestamp': datetime.now(timezone.utc).isoformat()
            }).encode()
            
            # Encrypt chunk
            encrypted_chunk = aesgcm.encrypt(nonce, chunk, aad)
            
            # Calculate chunk hash for integrity
            chunk_hash = hashlib.sha256(encrypted_chunk).hexdigest()
            
            encrypted_chunks.append(encrypted_chunk)
            chunk_metadata.append({
                'index': chunk_index,
                'nonce': nonce.hex(),
                'size': len(chunk),
                'encrypted_size': len(encrypted_chunk),
                'hash': chunk_hash,
                'aad': aad.decode()
            })
            
            chunk_index += 1
        
        # Create backup manifest
        manifest = {
            'backup_id': backup_id,
            'key_metadata': key_metadata,
            'chunks': chunk_metadata,
            'total_chunks': len(chunk_metadata),
            'encryption_completed': datetime.now(timezone.utc).isoformat(),
            'integrity_hash': self.calculate_overall_hash(encrypted_chunks)
        }
        
        return encrypted_chunks, manifest
    
    def implement_key_escrow(self, backup_key, key_metadata):
        """Implement secure key escrow for emergency recovery"""
        # Split the key material itself using Shamir's Secret Sharing
        # (third-party 'secretsharing' package). The key, not the key_id,
        # must be escrowed; the key_id is only a metadata label, and
        # recovering it would not allow decryption.
        from secretsharing import SecretSharer
        
        # Require any 3 of 5 shares for reconstruction
        shares = SecretSharer.split_secret(
            backup_key.hex(), 3, 5
        )
        
        # Distribute shares to different systems/people
        distribution = {
            'ceo_safe': shares[0],
            'cto_safe': shares[1],
            'security_officer': shares[2],
            'offsite_vault': shares[3],
            'lawyer_escrow': shares[4]
        }
        
        # Store distribution records
        for location, share in distribution.items():
            self.store_key_share(
                location,
                share,
                key_metadata['backup_id']
            )
        
        # Audit log
        self.audit_logger.log_key_escrow(
            key_metadata['backup_id'],
            list(distribution.keys())
        )
        
        return distribution
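
Recovery of an escrowed key mirrors the split: any three of the five shares reconstruct it, while any two reveal nothing on their own. A minimal sketch using the same secretsharing library (the recover_escrowed_key helper is hypothetical; the 3-share threshold simply mirrors the split above):

# Example (sketch): reconstructing an escrowed backup key from any 3 of 5 shares
from secretsharing import SecretSharer

def recover_escrowed_key(shares):
    """Rebuild the backup key from at least 3 collected escrow shares."""
    if len(shares) < 3:
        raise ValueError("Need at least 3 of the 5 escrow shares")
    # recover_secret returns the original hex string passed to split_secret
    key_hex = SecretSharer.recover_secret(shares[:3])
    return bytes.fromhex(key_hex)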
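
The restore path is not shown above, but it follows directly from the manifest: each chunk records its nonce and AAD, and AES-GCM refuses to decrypt anything that was tampered with. A minimal sketch of that counterpart, assuming the chunk list and manifest produced by encrypt_backup_stream and a backup key recovered either from escrow or by re-deriving it from the master key:

# Example (sketch): decrypting a backup using its manifest
def decrypt_backup_stream(encrypted_chunks, manifest, backup_key):
    """Decrypt chunks in manifest order, verifying nonce and AAD per chunk."""
    aesgcm = AESGCM(backup_key)
    for chunk, meta in zip(encrypted_chunks, manifest['chunks']):
        nonce = bytes.fromhex(meta['nonce'])
        aad = meta['aad'].encode()
        # Raises cryptography.exceptions.InvalidTag if the chunk
        # or its associated data was modified
        yield aesgcm.decrypt(nonce, chunk, aad)

At restore time the flow thus mirrors encryption: recover or re-derive the backup key, then decrypt and verify chunk by chunk before reassembling the stream.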

class MultiCloudKeyManager:
    """Manage encryption keys across multiple cloud providers"""
    
    def __init__(self, config):
        self.providers = {
            'aws': self._init_aws_kms(config.get('aws')),
            'azure': self._init_azure_keyvault(config.get('azure')),
            'gcp': self._init_gcp_kms(config.get('gcp'))
        }
        self.primary_provider = config.get('primary_provider', 'aws')
        
    def _init_aws_kms(self, config):
        if not config:
            return None
        
        # Static credentials shown for illustration; in production prefer
        # IAM roles or the default credential chain.
        return boto3.client(
            'kms',
            region_name=config['region'],
            aws_access_key_id=config['access_key'],
            aws_secret_access_key=config['secret_key']
        )
    
    def _init_azure_keyvault(self, config):
        if not config:
            return None
        
        from azure.identity import ClientSecretCredential
        
        credential = ClientSecretCredential(
            tenant_id=config['tenant_id'],
            client_id=config['client_id'],
            client_secret=config['client_secret']
        )
        
        return SecretClient(
            vault_url=config['vault_url'],
            credential=credential
        )
    
    def get_or_create_master_key(self, key_alias):
        """Get or create master key with multi-cloud redundancy"""
        primary_key = self._get_primary_key(key_alias)
        
        # Replicate to secondary providers for redundancy
        for provider_name, provider in self.providers.items():
            if provider_name != self.primary_provider and provider:
                self._replicate_key(primary_key, key_alias, provider_name)
        
        return primary_key
    
    def _get_primary_key(self, key_alias):
        provider = self.providers[self.primary_provider]
        
        if self.primary_provider == 'aws':
            try:
                # Generate a data key under the master key. KMS returns a
                # fresh data key on every call, so the CiphertextBlob in the
                # response must be persisted alongside the backup; the
                # plaintext key is later recovered with kms.decrypt().
                response = provider.generate_data_key(
                    KeyId=f'alias/{key_alias}',
                    KeySpec='AES_256'
                )
                return response['Plaintext']
            except provider.exceptions.NotFoundException:
                # Create new key
                response = provider.create_key(
                    Description=f'Backup encryption key: {key_alias}',
                    KeyUsage='ENCRYPT_DECRYPT',
                    Origin='AWS_KMS'
                )
                
                # Create alias
                provider.create_alias(
                    AliasName=f'alias/{key_alias}',
                    TargetKeyId=response['KeyMetadata']['KeyId']
                )
                
                # Generate a data key under the new master key
                # (same CiphertextBlob caveat as above)
                response = provider.generate_data_key(
                    KeyId=response['KeyMetadata']['KeyId'],
                    KeySpec='AES_256'
                )
                return response['Plaintext']