Multi-Cloud Security Architecture

Organizations increasingly adopt multi-cloud strategies to avoid vendor lock-in, leverage best-of-breed services, and meet regulatory requirements. However, managing security across multiple cloud providers multiplies complexity. Each provider offers different security features, APIs, and configuration options. Inconsistent security implementations across clouds create vulnerabilities and complicate incident response.

Unified security architecture across multiple clouds requires abstraction layers that standardize security controls while accommodating provider-specific features. Cloud Security Posture Management (CSPM) tools provide centralized visibility and policy enforcement across multiple cloud environments. These tools continuously monitor configurations, detect drift from security baselines, and automatically remediate common misconfigurations.

# Example: Multi-cloud storage security abstraction layer
from abc import ABC, abstractmethod
import base64
import hashlib
import json
from datetime import datetime, timedelta, timezone

import boto3
from azure.storage.blob import BlobServiceClient, DeleteRetentionPolicy, ImmutabilityPolicy
from cryptography.fernet import Fernet
from google.cloud import storage as gcs

class CloudStorageSecurityAdapter(ABC):
    """Common contract for provider-specific secure-storage backends.

    Concrete adapters (AWS, Azure, GCP, ...) implement every method below
    so callers can treat all cloud providers uniformly.
    """

    @abstractmethod
    def upload_encrypted(self, data, key_id, metadata):
        """Encrypt and store *data*; return provider-specific object info."""

    @abstractmethod
    def download_decrypted(self, object_id, key_id):
        """Retrieve *object_id* and return the decrypted payload."""

    @abstractmethod
    def set_access_policy(self, object_id, policy):
        """Attach an access-control policy to *object_id*."""

    @abstractmethod
    def audit_access(self, object_id, time_range):
        """Report access events for *object_id* within *time_range*."""

    @abstractmethod
    def enable_versioning(self, bucket_name):
        """Turn on object/blob versioning for *bucket_name*."""

class AWSSecureStorage(CloudStorageSecurityAdapter):
    """S3 implementation of CloudStorageSecurityAdapter.

    Layers client-side Fernet encryption (with a KMS-generated data key)
    on top of S3 server-side SSE-KMS, and provides bucket-hardening helpers.
    """

    # Retention applied when metadata carries no explicit 'retention_days'.
    DEFAULT_RETENTION_DAYS = 365

    def __init__(self, config):
        # NOTE(review): static keys in config work, but IAM roles / the
        # default credential chain are preferable in production.
        self.region = config['region']
        self.s3_client = boto3.client(
            's3',
            region_name=self.region,
            aws_access_key_id=config['access_key'],
            aws_secret_access_key=config['secret_key']
        )
        self.kms_client = boto3.client('kms', region_name=self.region)
        self.bucket_name = config['bucket_name']
        # Fix: enable_security_features() previously read self.default_kms_key,
        # which was never assigned (AttributeError at runtime).
        self.default_kms_key = config.get('kms_key_id')

    def upload_encrypted(self, data, key_id, metadata):
        """Upload *data* (bytes) with client-side encryption and S3 SSE-KMS.

        Returns a dict with object_id, version_id, etag and the KMS key id.
        """
        # Fresh data key from KMS: Plaintext encrypts locally; the
        # CiphertextBlob is stored with the object so the payload can be
        # decrypted later (the original code discarded it, making
        # download_decrypted impossible).
        data_key = self.kms_client.generate_data_key(
            KeyId=key_id,
            KeySpec='AES_256'
        )

        # Fix: Fernet requires a 32-byte key in URL-safe base64 form;
        # passing raw truncated key bytes raises ValueError.
        cipher = Fernet(base64.urlsafe_b64encode(data_key['Plaintext'][:32]))
        encrypted_data = cipher.encrypt(data)

        # Content-addressed object key provides integrity and dedup.
        content_hash = hashlib.sha256(data).hexdigest()
        object_key = f"{metadata['category']}/{content_hash}"

        # Upload with server-side encryption on top of the client-side layer.
        response = self.s3_client.put_object(
            Bucket=self.bucket_name,
            Key=object_key,
            Body=encrypted_data,
            ServerSideEncryption='aws:kms',
            SSEKMSKeyId=key_id,
            Metadata={
                **metadata,
                'content_hash': content_hash,
                'encryption_version': '1.0',
                'client_encrypted': 'true',
                # KMS-wrapped data key; recoverable via kms.decrypt().
                'wrapped_data_key': base64.b64encode(
                    data_key['CiphertextBlob']).decode('ascii')
            },
            StorageClass='INTELLIGENT_TIERING',
            # Compliance-mode object lock blocks deletion until retention expiry.
            ObjectLockMode='COMPLIANCE',
            ObjectLockRetainUntilDate=self._calculate_retention_date(metadata)
        )

        # Tighten the bucket policy based on data classification.
        self._update_bucket_policy(object_key, metadata['data_classification'])

        return {
            'object_id': object_key,
            'version_id': response.get('VersionId'),
            'etag': response['ETag'],
            'encryption_key_id': key_id
        }

    def download_decrypted(self, object_id, key_id):
        """Fetch *object_id*, unwrap its data key via KMS, and decrypt.

        Raises ValueError when the stored content hash does not match.
        """
        obj = self.s3_client.get_object(Bucket=self.bucket_name, Key=object_id)
        meta = obj['Metadata']
        plaintext_key = self.kms_client.decrypt(
            CiphertextBlob=base64.b64decode(meta['wrapped_data_key'])
        )['Plaintext']
        cipher = Fernet(base64.urlsafe_b64encode(plaintext_key[:32]))
        data = cipher.decrypt(obj['Body'].read())

        # Integrity check against the hash recorded at upload time.
        expected = meta.get('content_hash')
        if expected and hashlib.sha256(data).hexdigest() != expected:
            raise ValueError(f"Integrity check failed for {object_id}")
        return data

    def set_access_policy(self, object_id, policy):
        """Apply *policy* (a single statement dict) scoped to *object_id*.

        NOTE(review): previously unimplemented despite being abstract, which
        made this class impossible to instantiate.
        """
        statement = dict(policy)
        statement['Resource'] = f"arn:aws:s3:::{self.bucket_name}/{object_id}"
        self.s3_client.put_bucket_policy(
            Bucket=self.bucket_name,
            Policy=json.dumps({"Version": "2012-10-17",
                               "Statement": [statement]})
        )

    def audit_access(self, object_id, time_range):
        """Return CloudTrail events touching *object_id*.

        *time_range* is a (start, end) pair of datetimes.
        """
        start_time, end_time = time_range
        cloudtrail = boto3.client('cloudtrail', region_name=self.region)
        result = cloudtrail.lookup_events(
            LookupAttributes=[{'AttributeKey': 'ResourceName',
                               'AttributeValue': object_id}],
            StartTime=start_time,
            EndTime=end_time
        )
        return result.get('Events', [])

    def enable_versioning(self, bucket_name):
        """Turn on object versioning for *bucket_name*."""
        self.s3_client.put_bucket_versioning(
            Bucket=bucket_name,
            VersioningConfiguration={'Status': 'Enabled'}
        )

    def _calculate_retention_date(self, metadata):
        """Retain-until timestamp for ObjectLock, from metadata or default.

        NOTE(review): previously called but never defined (AttributeError).
        """
        days = int(metadata.get('retention_days', self.DEFAULT_RETENTION_DAYS))
        return datetime.now(timezone.utc) + timedelta(days=days)

    def _update_bucket_policy(self, object_key, classification):
        """Deny unencrypted GetObject on highly sensitive objects."""
        if classification != 'highly_sensitive':
            return
        policy = {
            "Version": "2012-10-17",
            "Statement": [{
                "Sid": "DenyUnencryptedAccess",
                "Effect": "Deny",
                "Principal": "*",
                "Action": "s3:GetObject",
                "Resource": f"arn:aws:s3:::{self.bucket_name}/{object_key}",
                "Condition": {
                    "StringNotEquals": {
                        "s3:x-amz-server-side-encryption": "aws:kms"
                    }
                }
            }]
        }
        self.s3_client.put_bucket_policy(
            Bucket=self.bucket_name,
            Policy=json.dumps(policy)
        )

    def enable_security_features(self):
        """Enable versioning, default SSE-KMS, public-access block and logging."""
        self.enable_versioning(self.bucket_name)

        # Default encryption with a bucket key to reduce KMS call volume.
        self.s3_client.put_bucket_encryption(
            Bucket=self.bucket_name,
            ServerSideEncryptionConfiguration={
                'Rules': [{
                    'ApplyServerSideEncryptionByDefault': {
                        'SSEAlgorithm': 'aws:kms',
                        'KMSMasterKeyID': self.default_kms_key
                    },
                    'BucketKeyEnabled': True
                }]
            }
        )

        # Block all forms of public access.
        self.s3_client.put_public_access_block(
            Bucket=self.bucket_name,
            PublicAccessBlockConfiguration={
                'BlockPublicAcls': True,
                'IgnorePublicAcls': True,
                'BlockPublicPolicy': True,
                'RestrictPublicBuckets': True
            }
        )

        # Server access logs go to a sibling "-logs" bucket.
        self.s3_client.put_bucket_logging(
            Bucket=self.bucket_name,
            BucketLoggingStatus={
                'LoggingEnabled': {
                    'TargetBucket': f'{self.bucket_name}-logs',
                    'TargetPrefix': 'access-logs/'
                }
            }
        )

        self._enable_cloudtrail_logging()

    def _enable_cloudtrail_logging(self):
        """Placeholder for wiring S3 data events into CloudTrail.

        NOTE(review): trail creation is account-level and deployment
        specific; this was previously called but never defined
        (AttributeError). Implement per environment.
        """

class AzureSecureStorage(CloudStorageSecurityAdapter):
    """Azure Blob Storage implementation of CloudStorageSecurityAdapter.

    Relies on server-side encryption scopes (customer-managed keys) rather
    than client-side encryption.
    """

    def __init__(self, config):
        self.blob_service = BlobServiceClient(
            account_url=f"https://{config['account_name']}.blob.core.windows.net",
            credential=config['credential']
        )
        self.container_name = config['container_name']
        self.key_vault_url = config['key_vault_url']

    def upload_encrypted(self, data, key_id, metadata):
        """Upload *data* under a classification-specific encryption scope.

        Returns a dict with blob_name, etag and version_id.
        """
        blob_name = self._generate_blob_name(data, metadata)
        blob_client = self.blob_service.get_blob_client(
            container=self.container_name,
            blob=blob_name
        )

        # Customer-managed keys are applied via per-classification scopes.
        encryption_scope = f"encryption-scope-{metadata['data_classification']}"

        response = blob_client.upload_blob(
            data=data,
            overwrite=False,
            encryption_scope=encryption_scope,
            metadata={
                **metadata,
                'content_hash': hashlib.sha256(data).hexdigest(),
                'key_vault_key': key_id
            },
            standard_blob_tier='Hot' if metadata.get('frequently_accessed') else 'Cool'
        )

        # Locked immutability (~7 years / 2555 days) under regulatory hold.
        if metadata.get('regulatory_hold'):
            blob_client.set_immutability_policy(
                immutability_policy=ImmutabilityPolicy(
                    # Fix: timezone-aware timestamp; utcnow() is naive and deprecated.
                    expiry_time=datetime.now(timezone.utc) + timedelta(days=2555),
                    policy_mode="Locked"
                )
            )

        return {
            'blob_name': blob_name,
            'etag': response['etag'],
            'version_id': response.get('version_id')
        }

    def _generate_blob_name(self, data, metadata):
        """Content-addressed name (<category>/<sha256>), mirroring the AWS
        adapter's object-key scheme.

        NOTE(review): previously called but never defined (AttributeError).
        """
        return f"{metadata['category']}/{hashlib.sha256(data).hexdigest()}"

    def download_decrypted(self, object_id, key_id):
        """Read the blob back (decryption is server-side via the encryption
        scope) and verify the content hash recorded at upload time.

        *key_id* is accepted for interface symmetry; Azure resolves the key
        from the encryption scope.
        """
        blob_client = self.blob_service.get_blob_client(
            container=self.container_name, blob=object_id)
        data = blob_client.download_blob().readall()
        props = blob_client.get_blob_properties()
        expected = (props.metadata or {}).get('content_hash')
        if expected and hashlib.sha256(data).hexdigest() != expected:
            raise ValueError(f"Integrity check failed for {object_id}")
        return data

    def set_access_policy(self, object_id, policy):
        """Record *policy* entries as blob index tags.

        NOTE(review): Azure Blob Storage has no native per-blob ACLs; tags
        feed RBAC conditions / downstream enforcement — confirm this matches
        the intended access model. Previously unimplemented despite being
        abstract.
        """
        blob_client = self.blob_service.get_blob_client(
            container=self.container_name, blob=object_id)
        blob_client.set_blob_tags({str(k): str(v) for k, v in policy.items()})

    def audit_access(self, object_id, time_range):
        """Placeholder: access auditing comes from Azure Monitor diagnostic
        logs queried out-of-band.

        NOTE(review): previously unimplemented despite being abstract;
        returns an empty list until log querying is wired up.
        """
        return []

    def enable_versioning(self, bucket_name):
        """Placeholder: blob versioning is enabled at the storage-account
        level via the management plane, not the data-plane SDK.

        NOTE(review): previously unimplemented despite being abstract.
        """

    def configure_advanced_threat_protection(self):
        """Enable Defender-adjacent protections: soft delete, network rules
        and diagnostic logging (the latter two via management-plane APIs)."""
        # 30-day soft-delete window for accidental-deletion recovery.
        self.blob_service.set_service_properties(
            delete_retention_policy=DeleteRetentionPolicy(
                enabled=True,
                days=30
            )
        )

        self._configure_network_rules()
        self._enable_diagnostic_logging()

    def _configure_network_rules(self):
        """Placeholder for firewall/VNet restrictions (management plane).

        NOTE(review): previously called but never defined (AttributeError).
        """

    def _enable_diagnostic_logging(self):
        """Placeholder for diagnostic-settings configuration (Azure Monitor).

        NOTE(review): previously called but never defined (AttributeError).
        """
class MultiCloudSecurityOrchestrator:
    """Orchestrate security operations across multiple cloud providers"""
    
    def __init__(self):
        """Build one security adapter per provider plus the shared policy engine.

        NOTE(review): GCPSecureStorage, CloudPolicyEngine and the _load_*
        config helpers are defined elsewhere in the project.
        """
        adapters = {}
        adapters['aws'] = AWSSecureStorage(self._load_aws_config())
        adapters['azure'] = AzureSecureStorage(self._load_azure_config())
        adapters['gcp'] = GCPSecureStorage(self._load_gcp_config())
        self.providers = adapters
        self.policy_engine = CloudPolicyEngine()
        
    def store_user_data(self, data, classification, compliance_requirements):
        """Store *data* on the best-fit provider and register it centrally.

        Returns the provider-specific upload result (object id, etag, ...).
        """
        # Route to a provider that satisfies residency/compliance needs.
        provider = self._select_provider(compliance_requirements)

        # One key-management path regardless of provider.
        key_id = self._get_or_create_encryption_key(classification)

        # Standardized metadata schema. Fix: the provider adapters read
        # metadata['category'] (object/blob naming) and
        # metadata['data_classification'] (policy selection); the previous
        # schema carried neither, causing KeyErrors in upload_encrypted().
        metadata = {
            'category': 'user-data',
            'classification': classification,
            'data_classification': classification,
            'compliance': compliance_requirements,
            # Timezone-aware UTC; naive utcnow() is deprecated.
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'retention_period': self._calculate_retention(classification),
            'access_tier': self._determine_access_tier(classification)
        }

        # Provider-specific upload behind the common adapter interface.
        result = self.providers[provider].upload_encrypted(data, key_id, metadata)

        # Central inventory enables cross-cloud discovery and audits.
        self._update_data_inventory(result, provider, metadata)

        # Uniform policy layer on top of provider-native controls.
        self.policy_engine.apply_policies(result['object_id'], classification)

        return result
    
    def _select_provider(self, requirements):
        """Select cloud provider based on compliance and performance needs"""
        if 'data_residency' in requirements:
            # Select based on geographic requirements
            region = requirements['data_residency']
            return self._get_provider_for_region(region)
        
        if 'hipaa' in requirements:
            # Prefer providers with specific compliance certifications
            return 'aws'  # Example: AWS has comprehensive HIPAA compliance
        
        # Default to multi-cloud distribution for resilience
        return self._get_least_utilized_provider()