Immutable Backups and Ransomware Protection
Immutable backups provide critical protection against ransomware attacks that attempt to encrypt or delete backup data. Write-Once-Read-Many (WORM) storage, whether hardware-based or software-enforced, prevents modification of completed backups. Cloud providers offer object lock features that prevent deletion for specified retention periods, even by administrators.
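For example, AWS S3 exposes this through Object Lock, which must be enabled when the bucket is created; a default retention rule can then be applied so every new object is locked automatically. A minimal sketch with boto3 (the bucket name and retention period are illustrative assumptions):

# Example: default Object Lock retention via boto3 (the bucket must have
# been created with object lock enabled; names and periods are placeholders)
import boto3

s3 = boto3.client('s3')
s3.put_object_lock_configuration(
    Bucket='backup-vault',  # hypothetical bucket name
    ObjectLockConfiguration={
        'ObjectLockEnabled': 'Enabled',
        'Rule': {
            'DefaultRetention': {
                'Mode': 'COMPLIANCE',  # cannot be bypassed, even by the root account
                'Days': 90
            }
        }
    }
)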
Air-gapped backups add another protection layer by physically or logically isolating backup copies from network access. While true air gaps require manual media handling, logical air gaps using network isolation and time-based access windows provide similar protection with better operational efficiency. These isolated backups serve as last-resort recovery options when other systems are compromised.
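One way to realize a logical air gap is a replication target whose network path is opened only during scheduled transfer windows. A minimal sketch of the window check, assuming a hypothetical daily 02:00-04:00 UTC window:

# Example: time-based access window for a logically air-gapped vault
# (window boundaries are illustrative assumptions)
from datetime import datetime, time

AIRGAP_WINDOW_START = time(2, 0)   # 02:00 UTC
AIRGAP_WINDOW_END = time(4, 0)     # 04:00 UTC

def airgap_window_open(now=None):
    """Return True only while the vault's network path is scheduled to be open."""
    now = now or datetime.utcnow()
    return AIRGAP_WINDOW_START <= now.time() < AIRGAP_WINDOW_END

def transfer_to_vault(backup_id, transport):
    # Outside the window the vault is unreachable by design, so fail fast
    # instead of retrying against a closed path
    if not airgap_window_open():
        raise RuntimeError(f"Air gap window closed; deferring backup {backup_id}")
    transport.send(backup_id)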
# Example: Implementing an immutable backup system
import hashlib
import json
from datetime import datetime, timedelta
from enum import Enum

class ImmutabilityLevel(Enum):
    COMPLIANCE = "compliance"    # Cannot be overridden
    GOVERNANCE = "governance"    # Can be overridden with special permissions
    LEGAL_HOLD = "legal_hold"    # Indefinite retention

class ImmutableBackupManager:
    def __init__(self, storage_backend, config):
        self.storage = storage_backend
        self.config = config
        self.audit_logger = AuditLogger()  # assumes an AuditLogger helper defined elsewhere
    async def create_immutable_backup(self, backup_data, metadata):
        """Create immutable backup with comprehensive protection"""
        backup_id = self.generate_backup_id()

        # Calculate integrity hash
        data_hash = hashlib.sha256(backup_data).hexdigest()

        # Determine retention period based on data classification
        retention_days = self.calculate_retention_period(metadata)
        retention_until = datetime.utcnow() + timedelta(days=retention_days)

        # Create backup package (the retention level is kept as an enum here
        # and converted to its string value when the package is serialized)
        backup_package = {
            'id': backup_id,
            'created_at': datetime.utcnow().isoformat(),
            'data_hash': data_hash,
            'metadata': metadata,
            'retention': {
                'until': retention_until.isoformat(),
                'level': self.determine_immutability_level(metadata),
                'legal_hold': False
            },
            'integrity_proof': self.generate_integrity_proof(backup_data, metadata)
        }

        # Store with immutability settings
        if self.storage.type == 'aws_s3':
            await self.store_s3_immutable(backup_data, backup_package)
        elif self.storage.type == 'azure_blob':
            await self.store_azure_immutable(backup_data, backup_package)
        else:
            await self.store_generic_immutable(backup_data, backup_package)

        # Create tamper-evident log
        await self.create_tamper_evident_log(backup_package)

        # Set up integrity monitoring
        await self.schedule_integrity_checks(backup_id)

        return backup_id
    async def store_s3_immutable(self, data, package):
        """Store in S3 with object lock"""
        s3 = self.storage.client
        retention = package['retention']
        # boto3 expects a datetime for the retain-until date, so parse the
        # ISO string stored in the package
        retain_until = datetime.fromisoformat(retention['until'])

        # Upload with object lock. The bucket must have been created with
        # object lock enabled; note that S3 lock modes are only GOVERNANCE
        # and COMPLIANCE, and legal holds use a separate
        # ObjectLockLegalHoldStatus flag rather than a retention mode.
        response = s3.put_object(
            Bucket=self.config['bucket'],
            Key=f"immutable-backups/{package['id']}",
            Body=data,
            ServerSideEncryption='aws:kms',
            SSEKMSKeyId=self.config['kms_key'],
            ObjectLockMode=retention['level'].value.upper(),
            ObjectLockRetainUntilDate=retain_until,
            Metadata={
                'backup-id': package['id'],
                'data-hash': package['data_hash'],
                'created-at': package['created_at']
            }
        )

        # Store metadata separately with versioning (the default=... hook
        # serializes the ImmutabilityLevel enum as its string value)
        s3.put_object(
            Bucket=self.config['bucket'],
            Key=f"immutable-metadata/{package['id']}.json",
            Body=json.dumps(package, default=lambda o: o.value),
            ServerSideEncryption='aws:kms',
            SSEKMSKeyId=self.config['kms_key']
        )

        # Pin compliance-mode retention on the stored version so it cannot
        # be shortened or bypassed, even with governance permissions
        if retention['level'] == ImmutabilityLevel.COMPLIANCE:
            s3.put_object_retention(
                Bucket=self.config['bucket'],
                Key=f"immutable-backups/{package['id']}",
                Retention={
                    'Mode': 'COMPLIANCE',
                    'RetainUntilDate': retain_until
                },
                VersionId=response['VersionId'],
                BypassGovernanceRetention=False
            )
    def generate_integrity_proof(self, data, metadata):
        """Generate cryptographic proof of backup integrity"""
        # Create Merkle tree for large backups
        # (uses the third-party 'merkletools' package)
        from merkletools import MerkleTools
        mt = MerkleTools(hash_type='sha256')

        # Split data into chunks for tree
        chunk_size = 1024 * 1024  # 1MB chunks
        chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

        # Add pre-hashed chunks as leaves, then build the tree
        for chunk in chunks:
            mt.add_leaf(hashlib.sha256(chunk).hexdigest())
        mt.make_tree()

        # Generate proof package
        proof = {
            'merkle_root': mt.get_merkle_root(),
            'chunk_count': len(chunks),
            'metadata_hash': hashlib.sha256(
                json.dumps(metadata, sort_keys=True).encode()
            ).hexdigest(),
            'timestamp': datetime.utcnow().isoformat(),
            'algorithm': 'sha256-merkle'
        }

        # Sign proof for non-repudiation
        proof['signature'] = self.sign_proof(proof)
        return proof
    async def create_tamper_evident_log(self, backup_package):
        """Create append-only log for backup operations"""
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'operation': 'backup_created',
            'backup_id': backup_package['id'],
            'data_hash': backup_package['data_hash'],
            'operator': self.get_current_operator(),
            'integrity_proof': backup_package['integrity_proof']
        }

        # Chain to previous log entry
        previous_hash = await self.get_latest_log_hash()
        log_entry['previous_hash'] = previous_hash
        log_entry['entry_hash'] = hashlib.sha256(
            json.dumps(log_entry, sort_keys=True).encode()
        ).hexdigest()

        # Store in append-only log
        await self.storage.append_to_log('backup-integrity-log', log_entry)

        # Replicate to multiple locations
        await self.replicate_log_entry(log_entry)
    async def verify_backup_integrity(self, backup_id):
        """Comprehensive integrity verification"""
        verification_result = {
            'backup_id': backup_id,
            'verified_at': datetime.utcnow().isoformat(),
            'checks': {}
        }

        try:
            # Retrieve backup and metadata (metadata is stored as JSON)
            backup_data = await self.storage.get(f"immutable-backups/{backup_id}")
            metadata = json.loads(
                await self.storage.get(f"immutable-metadata/{backup_id}.json")
            )

            # Verify data hash
            calculated_hash = hashlib.sha256(backup_data).hexdigest()
            verification_result['checks']['data_hash'] = {
                'expected': metadata['data_hash'],
                'calculated': calculated_hash,
                'valid': calculated_hash == metadata['data_hash']
            }

            # Verify Merkle proof
            proof_valid = self.verify_merkle_proof(
                backup_data,
                metadata['integrity_proof']
            )
            verification_result['checks']['merkle_proof'] = {
                'valid': proof_valid
            }

            # Verify immutability status
            immutability_status = await self.verify_immutability_status(backup_id)
            verification_result['checks']['immutability'] = immutability_status

            # Verify log chain integrity
            log_integrity = await self.verify_log_chain(backup_id)
            verification_result['checks']['log_integrity'] = log_integrity

            # Overall result
            verification_result['valid'] = all(
                check.get('valid', False)
                for check in verification_result['checks'].values()
            )
        except Exception as e:
            verification_result['valid'] = False
            verification_result['error'] = str(e)

        # Log verification result
        await self.audit_logger.log_verification(verification_result)
        return verification_result
    async def initiate_air_gap_backup(self, backup_id):
        """Create air-gapped copy for ransomware protection"""
        # Verify backup integrity first
        verification = await self.verify_backup_integrity(backup_id)
        if not verification['valid']:
            raise ValueError(f"Cannot air-gap invalid backup: {backup_id}")

        # Create air gap job
        job = {
            'job_id': self.generate_job_id(),
            'backup_id': backup_id,
            'created_at': datetime.utcnow().isoformat(),
            'status': 'pending',
            'window': self.get_next_airgap_window()
        }

        # Queue for air gap transfer during next window
        await self.queue_airgap_job(job)

        # Notification
        await self.notify_airgap_operator(job)

        return job
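A minimal usage sketch, assuming a storage backend object that exposes the interface used above (a type attribute, an S3 client, get, and append_to_log); the backend, bucket, and key names here are illustrative, not part of a real library:

# Example usage: back up, verify, then air-gap (all inputs are assumptions)
import asyncio

async def run_backup(storage_backend, data: bytes):
    manager = ImmutableBackupManager(storage_backend, {
        'bucket': 'backup-vault',        # hypothetical bucket
        'kms_key': 'alias/backup-key'    # hypothetical KMS key alias
    })
    metadata = {'source': 'orders-db', 'classification': 'financial'}
    backup_id = await manager.create_immutable_backup(data, metadata)
    result = await manager.verify_backup_integrity(backup_id)
    if result['valid']:
        await manager.initiate_air_gap_backup(backup_id)
    return backup_id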