Technical Requirements for GDPR Compliance

GDPR mandates specific technical measures that directly impact data storage design. Privacy by Design requires that privacy implications be considered from the earliest architectural decisions. Data minimization means storing only the data necessary for specified purposes. Purpose limitation prevents using data beyond its original collection purpose. These principles fundamentally challenge the traditional "collect everything" approach to data storage.
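
Purpose limitation in particular lends itself to enforcement in code: a storage layer can refuse to release data for any purpose other than the one recorded when the data was collected. The short sketch below illustrates the idea; the record shape and the PurposeMismatchError exception are illustrative assumptions rather than part of any particular framework.

# Example: enforcing purpose limitation at read time (illustrative sketch)
from typing import Dict

class PurposeMismatchError(Exception):
    """Raised when data is requested for a purpose it was not collected for."""

def read_for_purpose(record: Dict, requested_purpose: str) -> Dict:
    """Release the record's data only if the requested purpose matches the
    purpose declared at collection time."""
    if record["purpose"] != requested_purpose:
        raise PurposeMismatchError(
            f"Data collected for '{record['purpose']}' cannot be reused for "
            f"'{requested_purpose}' without a new lawful basis"
        )
    return record["data"]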

The right to erasure (the "Right to be Forgotten") requires systems capable of completely removing an individual's data upon request. This obligation extends beyond simple database deletions to encompass backups, logs, derived data, and analytical systems. Implementing true erasure while preserving system integrity and honoring regulatory data-retention requirements creates complex technical challenges. The example below sketches one approach to building these capabilities into a storage layer.

# Example: GDPR-compliant data storage system with erasure capabilities
import asyncio
import hashlib
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set
from dataclasses import dataclass
from enum import Enum

class LawfulBasis(Enum):
    CONSENT = "consent"
    CONTRACT = "contract"
    LEGAL_OBLIGATION = "legal_obligation"
    VITAL_INTERESTS = "vital_interests"
    PUBLIC_TASK = "public_task"
    LEGITIMATE_INTERESTS = "legitimate_interests"

class DataCategory(Enum):
    PERSONAL = "personal"
    SENSITIVE = "sensitive"
    CHILDREN = "children"
    CRIMINAL = "criminal"

@dataclass
class DataRecord:
    record_id: str
    data_subject_id: str
    category: DataCategory
    purpose: str
    lawful_basis: LawfulBasis
    collected_at: datetime
    retention_until: datetime
    data: Dict
    derived_from: Optional[List[str]] = None

class GDPRCompliantStorage:
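    # AuditLogger, ConsentManager, RetentionScheduler, the storage backend, and
    # the private helper methods referenced but not defined below are assumed
    # supporting components; only their call sites are shown here.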
    def __init__(self, config):
        self.config = config
        self.storage_backend = self._initialize_storage()
        self.audit_logger = AuditLogger()
        self.consent_manager = ConsentManager()
        self.retention_scheduler = RetentionScheduler()
        
    async def store_personal_data(
        self,
        data_subject_id: str,
        data: Dict,
        category: DataCategory,
        purpose: str,
        lawful_basis: LawfulBasis,
        retention_period_days: Optional[int] = None
    ) -> str:
        """Store personal data with GDPR compliance checks"""
        
        # Verify lawful basis
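        # (Special category data would additionally need an Article 9 condition,
        # such as explicit consent; that check is assumed to be handled by the
        # consent manager.)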
        if lawful_basis == LawfulBasis.CONSENT:
            consent = await self.consent_manager.verify_consent(
                data_subject_id, purpose
            )
            if not consent or not consent.is_valid:
                raise ValueError("Valid consent required for data storage")
        
        # Data minimization check
        minimized_data = self._minimize_data(data, purpose)
        
        # Determine retention period
        if retention_period_days is None:
            retention_period_days = self._get_default_retention(purpose, category)
        
        retention_until = datetime.utcnow() + timedelta(days=retention_period_days)
        
        # Create record with metadata
        record = DataRecord(
            record_id=self._generate_record_id(),
            data_subject_id=data_subject_id,
            category=category,
            purpose=purpose,
            lawful_basis=lawful_basis,
            collected_at=datetime.utcnow(),
            retention_until=retention_until,
            data=minimized_data
        )
        
        # Encrypt sensitive data
        if category in [DataCategory.SENSITIVE, DataCategory.CHILDREN]:
            record.data = await self._encrypt_sensitive_data(record.data)
        
        # Store with transaction
        async with self.storage_backend.transaction() as txn:
            # Store main record
            await txn.store(f"records:{record.record_id}", record)
            
            # Update indices for data subject access
            await txn.add_to_set(
                f"subject_records:{data_subject_id}",
                record.record_id
            )
            
            # Index by purpose for auditing
            await txn.add_to_set(
                f"purpose_records:{purpose}",
                record.record_id
            )
            
            # Schedule retention
            await self.retention_scheduler.schedule_deletion(
                record.record_id, retention_until
            )
            
            # Audit log
            await self.audit_logger.log_data_collection(
                record, self._get_request_context()
            )
            
            await txn.commit()
        
        return record.record_id
    
    async def handle_erasure_request(
        self, 
        data_subject_id: str, 
        verification_token: str
    ) -> Dict:
        """Implement Right to be Forgotten"""
        
        # Verify request authenticity
        if not await self._verify_erasure_request(data_subject_id, verification_token):
            raise ValueError("Invalid erasure request")
        
        erasure_report = {
            "data_subject_id": data_subject_id,
            "requested_at": datetime.utcnow().isoformat(),
            "records_erased": [],
            "records_retained": [],
            "derived_data_erased": []
        }
        
        # Get all records for data subject
        record_ids = await self.storage_backend.get_set(
            f"subject_records:{data_subject_id}"
        )
        
        for record_id in record_ids:
            record = await self.storage_backend.get(f"records:{record_id}")
            
            # Check if erasure is permitted
            if self._can_erase_record(record):
                # Erase record and derived data
                await self._cascade_erasure(record, erasure_report)
                erasure_report["records_erased"].append({
                    "record_id": record_id,
                    "category": record.category.value,
                    "purpose": record.purpose
                })
            else:
                # Document why record cannot be erased
                reason = self._get_retention_reason(record)
                erasure_report["records_retained"].append({
                    "record_id": record_id,
                    "category": record.category.value,
                    "purpose": record.purpose,
                    "retention_reason": reason,
                    "retention_until": record.retention_until.isoformat()
                })
        
        # Erase from backups (schedule async job)
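        # Backed-up copies usually cannot be rewritten in place; a common approach
        # is to record the subject's identifiers for exclusion on restore, or to
        # destroy per-subject encryption keys (crypto-shredding) so the backed-up
        # data becomes unreadable.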
        await self._schedule_backup_erasure(data_subject_id, erasure_report)
        
        # Notify data subject
        await self._send_erasure_confirmation(data_subject_id, erasure_report)
        
        # Audit log
        await self.audit_logger.log_erasure(erasure_report)
        
        return erasure_report
    
    async def _cascade_erasure(self, record: DataRecord, report: Dict):
        """Erase record and all derived data"""
        
        # Erase main record
        await self.storage_backend.delete(f"records:{record.record_id}")
        
        # Remove from indices
        await self.storage_backend.remove_from_set(
            f"subject_records:{record.data_subject_id}",
            record.record_id
        )
        await self.storage_backend.remove_from_set(
            f"purpose_records:{record.purpose}",
            record.record_id
        )
        
        # Find and erase derived data
        derived_records = await self._find_derived_records(record.record_id)
        for derived_id in derived_records:
            derived_record = await self.storage_backend.get(f"records:{derived_id}")
            if derived_record:
                await self._cascade_erasure(derived_record, report)
                report["derived_data_erased"].append(derived_id)
        
        # Erase from analytics systems
        await self._erase_from_analytics(record)
        
        # Erase from search indices
        await self._erase_from_search(record)
    
    def _can_erase_record(self, record: DataRecord) -> bool:
        """Determine if record can be legally erased"""
        
        # Cannot erase if required for legal obligations
        if record.lawful_basis == LawfulBasis.LEGAL_OBLIGATION:
            return False
        
        # Cannot erase if within mandatory retention period
        if hasattr(self.config, 'mandatory_retention'):
            mandatory_period = self.config.mandatory_retention.get(
                record.purpose, {}
            ).get(record.category.value)
            
            if mandatory_period:
                mandatory_until = record.collected_at + timedelta(
                    days=mandatory_period
                )
                if datetime.utcnow() < mandatory_until:
                    return False
        
        # Check for ongoing legal proceedings
        if self._has_legal_hold(record):
            return False
        
        return True
    
    async def handle_access_request(
        self,
        data_subject_id: str,
        verification_token: str
    ) -> Dict:
        """Implement Right of Access"""
        
        # Verify request
        if not await self._verify_access_request(data_subject_id, verification_token):
            raise ValueError("Invalid access request")
        
        # Gather all data
        access_report = {
            "data_subject_id": data_subject_id,
            "generated_at": datetime.utcnow().isoformat(),
            "personal_data": [],
            "processing_purposes": set(),
            "data_categories": set(),
            "recipients": [],
            "retention_periods": {},
            "data_sources": set()
        }
        
        # Get all records
        record_ids = await self.storage_backend.get_set(
            f"subject_records:{data_subject_id}"
        )
        
        for record_id in record_ids:
            record = await self.storage_backend.get(f"records:{record_id}")
            
            # Decrypt if necessary
            decrypted_data = record.data
            if record.category in [DataCategory.SENSITIVE, DataCategory.CHILDREN]:
                decrypted_data = await self._decrypt_sensitive_data(record.data)
            
            access_report["personal_data"].append({
                "category": record.category.value,
                "purpose": record.purpose,
                "collected_at": record.collected_at.isoformat(),
                "lawful_basis": record.lawful_basis.value,
                "data": decrypted_data
            })
            
            access_report["processing_purposes"].add(record.purpose)
            access_report["data_categories"].add(record.category.value)
            access_report["retention_periods"][record.purpose] = (
                record.retention_until.isoformat()
            )
        
        # Add processing information
        access_report["recipients"] = await self._get_data_recipients(data_subject_id)
        access_report["data_sources"] = list(access_report["data_sources"])
        access_report["processing_purposes"] = list(access_report["processing_purposes"])
        access_report["data_categories"] = list(access_report["data_categories"])
        
        # Generate machine-readable format
        formatted_report = self._format_access_report(access_report)
        
        # Audit log
        await self.audit_logger.log_access_request(data_subject_id)
        
        return formatted_report
    
    async def handle_portability_request(
        self,
        data_subject_id: str,
        verification_token: str,
        output_format: str = "json"
    ) -> bytes:
        """Implement Right to Data Portability"""
        
        # Get access report first
        access_report = await self.handle_access_request(
            data_subject_id, verification_token
        )
        
        # Filter for portable data (provided by data subject or observed)
        portable_data = self._filter_portable_data(access_report)
        
        # Format according to request
        if output_format == "json":
            return json.dumps(portable_data, indent=2).encode('utf-8')
        elif output_format == "csv":
            return self._convert_to_csv(portable_data)
        elif output_format == "xml":
            return self._convert_to_xml(portable_data)
        else:
            raise ValueError(f"Unsupported format: {output_format}")
    
    def _minimize_data(self, data: Dict, purpose: str) -> Dict:
        """Implement data minimization principle"""
        
        # Define minimum required fields per purpose
        required_fields = {
            "authentication": ["email", "password_hash"],
            "shipping": ["name", "address", "phone"],
            "marketing": ["email", "preferences"],
            "analytics": ["session_id", "timestamp", "action"]
        }
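        # In practice this mapping would come from the organisation's record of
        # processing activities rather than being hard-coded; note that an
        # unknown purpose falls through to an empty field list, so nothing is
        # stored for it.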
        
        fields = required_fields.get(purpose, [])
        minimized = {k: v for k, v in data.items() if k in fields}
        
        return minimized
    
    async def update_consent(
        self,
        data_subject_id: str,
        purpose: str,
        granted: bool
    ):
        """Handle consent updates and propagate to stored data"""
        
        if not granted:
            # Withdrawn consent - check data deletion requirements
            records = await self._get_records_by_purpose(data_subject_id, purpose)
            
            for record in records:
                if record.lawful_basis == LawfulBasis.CONSENT:
                    # Consent was the only lawful basis, so the data must be erased;
                    # pass a minimal report so _cascade_erasure can record derived data
                    await self._cascade_erasure(
                        record, {"derived_data_erased": []}
                    )