Implementing Encryption for Backups
Encryption must be mandatory for all backups, regardless of storage location or media type. However, backup encryption introduces complexity beyond standard data-at-rest encryption. Long-term key management becomes critical when backups might need restoration years later. Key rotation strategies must balance security improvements with the ability to decrypt historical backups.
# Example: Comprehensive backup encryption system
import os
import hashlib
import json
from datetime import datetime, timedelta
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.backends import default_backend
import boto3
from azure.keyvault.secrets import SecretClient
from google.cloud import secretmanager

class SecureBackupEncryption:
    def __init__(self, config):
        self.config = config
        self.key_manager = MultiCloudKeyManager(config)
        self.audit_logger = BackupAuditLogger()

    def create_backup_encryption_key(self, backup_id, classification='standard'):
        """Generate hierarchical encryption keys for backup"""
        # Master key from key management service
        master_key = self.key_manager.get_or_create_master_key(
            f"backup-master-{classification}"
        )

        # Derive a backup-specific key; using backup_id as the salt gives
        # every backup a unique key under the same master key
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=backup_id.encode(),
            iterations=100000,
            backend=default_backend()
        )
        backup_key = kdf.derive(master_key)

        # Create key metadata
        key_metadata = {
            'key_id': self.generate_key_id(),
            'backup_id': backup_id,
            'created_at': datetime.utcnow().isoformat(),
            'algorithm': 'AES-256-GCM',
            'master_key_version': self.key_manager.get_current_version(),
            'classification': classification,
            'rotation_schedule': self.determine_rotation_schedule(classification)
        }

        # Store key metadata (not the key itself)
        self.store_key_metadata(key_metadata)

        return backup_key, key_metadata

    def encrypt_backup_stream(self, input_stream, backup_id, classification='standard'):
        """Encrypt backup data stream with chunking for large files"""
        backup_key, key_metadata = self.create_backup_encryption_key(
            backup_id, classification
        )

        # Initialize encryption
        aesgcm = AESGCM(backup_key)
        chunk_size = 64 * 1024 * 1024  # 64MB chunks

        encrypted_chunks = []
        chunk_metadata = []
        chunk_index = 0

        while True:
            chunk = input_stream.read(chunk_size)
            if not chunk:
                break

            # Generate a fresh nonce for this chunk; GCM nonces must never
            # repeat under the same key
            nonce = os.urandom(12)

            # Bind chunk identity to the ciphertext as authenticated data
            aad = json.dumps({
                'backup_id': backup_id,
                'chunk_index': chunk_index,
                'timestamp': datetime.utcnow().isoformat()
            }).encode()

            # Encrypt chunk
            encrypted_chunk = aesgcm.encrypt(nonce, chunk, aad)

            # Calculate chunk hash for integrity
            chunk_hash = hashlib.sha256(encrypted_chunk).hexdigest()

            encrypted_chunks.append(encrypted_chunk)
            chunk_metadata.append({
                'index': chunk_index,
                'nonce': nonce.hex(),
                'size': len(chunk),
                'encrypted_size': len(encrypted_chunk),
                'hash': chunk_hash,
                'aad': aad.decode()
            })

            chunk_index += 1

        # Create backup manifest
        manifest = {
            'backup_id': backup_id,
            'key_metadata': key_metadata,
            'chunks': chunk_metadata,
            'total_chunks': len(chunk_metadata),
            'encryption_completed': datetime.utcnow().isoformat(),
            'integrity_hash': self.calculate_overall_hash(encrypted_chunks)
        }

        return encrypted_chunks, manifest
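
    # Restore path, sketched under the same assumptions: re-derive the
    # per-backup key from the manifest's key metadata, verify each chunk,
    # and let GCM authenticate the nonce/AAD pair before releasing data.
    # get_master_key_version is a hypothetical key-manager helper that
    # returns the historical master key version recorded at encryption
    # time, so rotating the alias forward never strands old backups.
    def decrypt_backup_stream(self, encrypted_chunks, manifest):
        """Decrypt backup chunks using the stored manifest (sketch)"""
        key_metadata = manifest['key_metadata']
        master_key = self.key_manager.get_master_key_version(
            f"backup-master-{key_metadata['classification']}",
            key_metadata['master_key_version']
        )

        # Same derivation parameters as create_backup_encryption_key
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=manifest['backup_id'].encode(),
            iterations=100000,
            backend=default_backend()
        )
        aesgcm = AESGCM(kdf.derive(master_key))

        plaintext_chunks = []
        for chunk, meta in zip(encrypted_chunks, manifest['chunks']):
            # Cheap integrity check against the manifest before decrypting
            if hashlib.sha256(chunk).hexdigest() != meta['hash']:
                raise ValueError(f"Chunk {meta['index']} failed integrity check")
            # AESGCM.decrypt verifies the authentication tag and AAD
            plaintext_chunks.append(
                aesgcm.decrypt(bytes.fromhex(meta['nonce']), chunk, meta['aad'].encode())
            )

        return plaintext_chunks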
    def implement_key_escrow(self, backup_key, key_metadata):
        """Implement secure key escrow for emergency recovery"""
        # Split the backup key itself (hex-encoded, as SecretSharer
        # expects) using Shamir's Secret Sharing; escrowing only the
        # key_id would leave nothing recoverable in an emergency
        from secretsharing import SecretSharer

        # Require 3 of 5 shares for reconstruction
        shares = SecretSharer.split_secret(backup_key.hex(), 3, 5)

        # Distribute shares to different systems/people
        distribution = {
            'ceo_safe': shares[0],
            'cto_safe': shares[1],
            'security_officer': shares[2],
            'offsite_vault': shares[3],
            'lawyer_escrow': shares[4]
        }

        # Store distribution records
        for location, share in distribution.items():
            self.store_key_share(
                location,
                share,
                key_metadata['backup_id']
            )

        # Audit log
        self.audit_logger.log_key_escrow(
            key_metadata['backup_id'],
            list(distribution.keys())
        )

        return distribution
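
    # Recovery counterpart, as a minimal sketch: any quorum of 3 of the
    # 5 distributed shares reconstructs the hex-encoded backup key.
    def recover_escrowed_key(self, shares):
        """Rebuild a backup key from a quorum of escrow shares (sketch)"""
        from secretsharing import SecretSharer
        # recover_secret needs at least the 3-share threshold used above
        return bytes.fromhex(SecretSharer.recover_secret(shares))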
class MultiCloudKeyManager:
    """Manage encryption keys across multiple cloud providers"""

    def __init__(self, config):
        self.providers = {
            'aws': self._init_aws_kms(config.get('aws')),
            'azure': self._init_azure_keyvault(config.get('azure')),
            'gcp': self._init_gcp_kms(config.get('gcp'))
        }
        self.primary_provider = config.get('primary_provider', 'aws')

    def _init_aws_kms(self, config):
        if not config:
            return None
        return boto3.client(
            'kms',
            region_name=config['region'],
            aws_access_key_id=config['access_key'],
            aws_secret_access_key=config['secret_key']
        )

    def _init_azure_keyvault(self, config):
        if not config:
            return None
        from azure.identity import ClientSecretCredential
        credential = ClientSecretCredential(
            tenant_id=config['tenant_id'],
            client_id=config['client_id'],
            client_secret=config['client_secret']
        )
        return SecretClient(
            vault_url=config['vault_url'],
            credential=credential
        )

    def get_or_create_master_key(self, key_alias):
        """Get or create master key with multi-cloud redundancy"""
        primary_key = self._get_primary_key(key_alias)

        # Replicate to secondary providers for redundancy
        for provider_name, provider in self.providers.items():
            if provider_name != self.primary_provider and provider:
                self._replicate_key(primary_key, key_alias, provider_name)

        return primary_key

    def _get_primary_key(self, key_alias):
        provider = self.providers[self.primary_provider]

        if self.primary_provider == 'aws':
            try:
                # Generate a data key under the master key. Note that KMS
                # returns a new random data key on every call, so the
                # CiphertextBlob in this response must also be persisted
                # if the same key is ever needed again for decryption.
                response = provider.generate_data_key(
                    KeyId=f'alias/{key_alias}',
                    KeySpec='AES_256'
                )
                return response['Plaintext']
            except provider.exceptions.NotFoundException:
                # Create new key
                response = provider.create_key(
                    Description=f'Backup encryption key: {key_alias}',
                    KeyUsage='ENCRYPT_DECRYPT',
                    Origin='AWS_KMS'
                )

                # Create alias
                provider.create_alias(
                    AliasName=f'alias/{key_alias}',
                    TargetKeyId=response['KeyMetadata']['KeyId']
                )

                # Generate data key
                response = provider.generate_data_key(
                    KeyId=response['KeyMetadata']['KeyId'],
                    KeySpec='AES_256'
                )
                return response['Plaintext']
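
To tie the pieces together, here is an illustrative end-to-end flow (the config values, file path, and backup ID are assumptions, not part of the system above). It shows how the rotation concern from the opening paragraph plays out in practice: the manifest pins the master key version used at encryption time, so a restore years later can request that exact version even after the alias has rotated forward.

# Illustrative usage; all paths and credentials are placeholders
config = {
    'aws': {'region': 'us-east-1', 'access_key': '...', 'secret_key': '...'},
    'primary_provider': 'aws'
}
encryptor = SecureBackupEncryption(config)

# Encrypt a backup stream; the manifest records key metadata, including
# master_key_version, alongside per-chunk nonces and integrity hashes
with open('/var/backups/db-2024-01-15.dump', 'rb') as stream:
    chunks, manifest = encryptor.encrypt_backup_stream(
        stream, backup_id='db-2024-01-15', classification='sensitive'
    )

# Years later: the restore reads the manifest first, then asks the key
# manager for the recorded master key version before re-deriving the
# per-backup key, so historical backups stay decryptable after rotation
plaintext_chunks = encryptor.decrypt_backup_stream(chunks, manifest)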