Multi-Cloud Security Architecture
Multi-Cloud Security Architecture
Organizations increasingly adopt multi-cloud strategies to avoid vendor lock-in, leverage best-of-breed services, and meet regulatory requirements. However, managing security across multiple cloud providers multiplies complexity. Each provider offers different security features, APIs, and configuration options. Inconsistent security implementations across clouds create vulnerabilities and complicate incident response.
Unified security architecture across multiple clouds requires abstraction layers that standardize security controls while accommodating provider-specific features. Cloud Security Posture Management (CSPM) tools provide centralized visibility and policy enforcement across multiple cloud environments. These tools continuously monitor configurations, detect drift from security baselines, and automatically remediate common misconfigurations.
# Example: Multi-cloud storage security abstraction layer
from abc import ABC, abstractmethod
import boto3
from azure.storage.blob import BlobServiceClient
from google.cloud import storage as gcs
import hashlib
import json
from cryptography.fernet import Fernet
class CloudStorageSecurityAdapter(ABC):
"""Abstract base class for cloud storage security operations"""
@abstractmethod
def upload_encrypted(self, data, key_id, metadata):
pass
@abstractmethod
def download_decrypted(self, object_id, key_id):
pass
@abstractmethod
def set_access_policy(self, object_id, policy):
pass
@abstractmethod
def audit_access(self, object_id, time_range):
pass
@abstractmethod
def enable_versioning(self, bucket_name):
pass
class AWSSecureStorage(CloudStorageSecurityAdapter):
def __init__(self, config):
self.s3_client = boto3.client('s3',
region_name=config['region'],
aws_access_key_id=config['access_key'],
aws_secret_access_key=config['secret_key']
)
self.kms_client = boto3.client('kms', region_name=config['region'])
self.bucket_name = config['bucket_name']
def upload_encrypted(self, data, key_id, metadata):
"""Upload data with client-side encryption and S3 SSE-KMS"""
# Client-side encryption first
data_key = self.kms_client.generate_data_key(
KeyId=key_id,
KeySpec='AES_256'
)
# Encrypt data
cipher = Fernet(data_key['Plaintext'][:32])
encrypted_data = cipher.encrypt(data)
# Create object key with content hash for integrity
content_hash = hashlib.sha256(data).hexdigest()
object_key = f"{metadata['category']}/{content_hash}"
# Upload with server-side encryption
response = self.s3_client.put_object(
Bucket=self.bucket_name,
Key=object_key,
Body=encrypted_data,
ServerSideEncryption='aws:kms',
SSEKMSKeyId=key_id,
Metadata={
**metadata,
'content_hash': content_hash,
'encryption_version': '1.0',
'client_encrypted': 'true'
},
StorageClass='INTELLIGENT_TIERING',
# Enable object lock for compliance
ObjectLockMode='COMPLIANCE',
ObjectLockRetainUntilDate=self._calculate_retention_date(metadata)
)
# Set bucket policy for access control
self._update_bucket_policy(object_key, metadata['data_classification'])
return {
'object_id': object_key,
'version_id': response.get('VersionId'),
'etag': response['ETag'],
'encryption_key_id': key_id
}
def _update_bucket_policy(self, object_key, classification):
"""Update bucket policy based on data classification"""
if classification == 'highly_sensitive':
policy = {
"Version": "2012-10-17",
"Statement": [{
"Sid": "DenyUnencryptedAccess",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:GetObject",
"Resource": f"arn:aws:s3:::{self.bucket_name}/{object_key}",
"Condition": {
"StringNotEquals": {
"s3:x-amz-server-side-encryption": "aws:kms"
}
}
}]
}
self.s3_client.put_bucket_policy(
Bucket=self.bucket_name,
Policy=json.dumps(policy)
)
def enable_security_features(self):
"""Enable comprehensive S3 bucket security features"""
# Enable versioning
self.s3_client.put_bucket_versioning(
Bucket=self.bucket_name,
VersioningConfiguration={'Status': 'Enabled'}
)
# Enable default encryption
self.s3_client.put_bucket_encryption(
Bucket=self.bucket_name,
ServerSideEncryptionConfiguration={
'Rules': [{
'ApplyServerSideEncryptionByDefault': {
'SSEAlgorithm': 'aws:kms',
'KMSMasterKeyID': self.default_kms_key
},
'BucketKeyEnabled': True
}]
}
)
# Block public access
self.s3_client.put_public_access_block(
Bucket=self.bucket_name,
PublicAccessBlockConfiguration={
'BlockPublicAcls': True,
'IgnorePublicAcls': True,
'BlockPublicPolicy': True,
'RestrictPublicBuckets': True
}
)
# Enable access logging
self.s3_client.put_bucket_logging(
Bucket=self.bucket_name,
BucketLoggingStatus={
'LoggingEnabled': {
'TargetBucket': f'{self.bucket_name}-logs',
'TargetPrefix': 'access-logs/'
}
}
)
# Enable CloudTrail for API logging
self._enable_cloudtrail_logging()
class AzureSecureStorage(CloudStorageSecurityAdapter):
def __init__(self, config):
self.blob_service = BlobServiceClient(
account_url=f"https://{config['account_name']}.blob.core.windows.net",
credential=config['credential']
)
self.container_name = config['container_name']
self.key_vault_url = config['key_vault_url']
def upload_encrypted(self, data, key_id, metadata):
"""Upload data with Azure Storage encryption"""
# Create blob client with customer-managed key
blob_name = self._generate_blob_name(data, metadata)
blob_client = self.blob_service.get_blob_client(
container=self.container_name,
blob=blob_name
)
# Set encryption scope for customer-managed keys
encryption_scope = f"encryption-scope-{metadata['data_classification']}"
# Upload with encryption and metadata
response = blob_client.upload_blob(
data=data,
overwrite=False,
encryption_scope=encryption_scope,
metadata={
**metadata,
'content_hash': hashlib.sha256(data).hexdigest(),
'key_vault_key': key_id
},
standard_blob_tier='Hot' if metadata.get('frequently_accessed') else 'Cool'
)
# Set immutability policy for compliance
if metadata.get('regulatory_hold'):
blob_client.set_immutability_policy(
immutability_policy=ImmutabilityPolicy(
expiry_time=datetime.utcnow() + timedelta(days=2555),
policy_mode="Locked"
)
)
return {
'blob_name': blob_name,
'etag': response['etag'],
'version_id': response.get('version_id')
}
def configure_advanced_threat_protection(self):
"""Enable Azure Defender and threat protection"""
# This would typically use Azure Management APIs
# Simplified example of security configurations
# Enable soft delete for blob recovery
self.blob_service.set_service_properties(
delete_retention_policy=DeleteRetentionPolicy(
enabled=True,
days=30
)
)
# Configure network restrictions
self._configure_network_rules()
# Enable diagnostic logging
self._enable_diagnostic_logging()
class MultiCloudSecurityOrchestrator:
"""Orchestrate security operations across multiple cloud providers"""
def __init__(self):
self.providers = {
'aws': AWSSecureStorage(self._load_aws_config()),
'azure': AzureSecureStorage(self._load_azure_config()),
'gcp': GCPSecureStorage(self._load_gcp_config())
}
self.policy_engine = CloudPolicyEngine()
def store_user_data(self, data, classification, compliance_requirements):
"""Store data with appropriate provider based on requirements"""
# Select provider based on compliance requirements
provider = self._select_provider(compliance_requirements)
# Apply consistent encryption key management
key_id = self._get_or_create_encryption_key(classification)
# Prepare metadata with standardized schema
metadata = {
'classification': classification,
'compliance': compliance_requirements,
'timestamp': datetime.utcnow().isoformat(),
'retention_period': self._calculate_retention(classification),
'access_tier': self._determine_access_tier(classification)
}
# Store with provider-specific implementation
result = self.providers[provider].upload_encrypted(data, key_id, metadata)
# Record in centralized inventory
self._update_data_inventory(result, provider, metadata)
# Apply cross-cloud policies
self.policy_engine.apply_policies(result['object_id'], classification)
return result
def _select_provider(self, requirements):
"""Select cloud provider based on compliance and performance needs"""
if 'data_residency' in requirements:
# Select based on geographic requirements
region = requirements['data_residency']
return self._get_provider_for_region(region)
if 'hipaa' in requirements:
# Prefer providers with specific compliance certifications
return 'aws' # Example: AWS has comprehensive HIPAA compliance
# Default to multi-cloud distribution for resilience
return self._get_least_utilized_provider()