Technical Requirements for GDPR Compliance
GDPR mandates specific technical measures that directly impact data storage design. Privacy by Design requires considering privacy implications from the earliest architectural decisions. Data minimization means storing only necessary data for specified purposes. Purpose limitation prevents using data beyond its original collection purpose. These principles fundamentally challenge traditional "collect everything" approaches to data storage.
The right to erasure (the "Right to be Forgotten") requires systems capable of completely removing an individual's data upon request. This requirement extends beyond simple database deletions to encompass backups, logs, derived data, and analytical systems. Implementing true erasure while preserving system integrity and honoring regulatory data-retention requirements creates complex technical challenges.
# Example: GDPR-compliant data storage system with erasure capabilities
import asyncio
import hashlib
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set
from dataclasses import dataclass
from enum import Enum
class LawfulBasis(Enum):
    """Legal ground under which a record's personal data is processed.

    The six members mirror the lawful bases enumerated in GDPR
    Article 6(1). The string values are emitted verbatim in access
    reports (``lawful_basis.value``), so they should be treated as stable.
    """

    # Processing rests on the data subject's consent. This is the only
    # basis that is verified against the consent manager before storing,
    # and the only one erased when consent is withdrawn.
    CONSENT = "consent"

    # Processing is needed to perform a contract with the data subject.
    CONTRACT = "contract"

    # Processing is required by law; records stored under this basis are
    # never erased by an erasure request.
    LEGAL_OBLIGATION = "legal_obligation"

    # Processing protects someone's vital interests.
    VITAL_INTERESTS = "vital_interests"

    # Processing is part of a task carried out in the public interest.
    PUBLIC_TASK = "public_task"

    # Processing serves the controller's legitimate interests.
    LEGITIMATE_INTERESTS = "legitimate_interests"
class DataCategory(Enum):
    """Classification of the personal data held in a record.

    Within this file, SENSITIVE and CHILDREN records are encrypted before
    they are stored and decrypted again when an access report is built;
    PERSONAL and CRIMINAL records are stored as given. The member value is
    also used as a lookup key into the optional ``mandatory_retention``
    configuration and is emitted in erasure and access reports.
    """

    PERSONAL = "personal"

    # Encrypted at rest by the storage class.
    SENSITIVE = "sensitive"

    # Encrypted at rest by the storage class.
    CHILDREN = "children"

    # NOTE(review): not part of the encrypted set — confirm that storing
    # this category unencrypted is intentional.
    CRIMINAL = "criminal"
@dataclass
class DataRecord:
    """One stored item of personal data plus the metadata that governs it.

    Instances are created by the storage class when data is collected and
    are written to the backend under the key ``records:<record_id>``.
    """

    # Identifier of this record; part of the backend key and of every
    # index entry that points at the record.
    record_id: str

    # Identifier of the person the data is about; used to build the
    # ``subject_records:<id>`` index.
    data_subject_id: str

    # Classification of the payload; decides whether it is encrypted.
    category: DataCategory

    # Why the data was collected; used for minimization, retention
    # lookups and the ``purpose_records:<purpose>`` index.
    purpose: str

    # Legal ground for processing; consulted when deciding on erasure.
    lawful_basis: LawfulBasis

    # Collection time, produced with datetime.utcnow() (naive UTC).
    collected_at: datetime

    # Point in time for which deletion of the record is scheduled.
    retention_until: datetime

    # Payload after data minimization (encrypted form for categories
    # that the storage class encrypts).
    data: Dict

    # Presumably the ids of the records this one was derived from; no
    # code in view populates it — verify against the producers.
    derived_from: Optional[List[str]] = None
class GDPRCompliantStorage:
def __init__(self, config):
    """Wire up the storage backend and the compliance collaborators.

    Args:
        config: Deployment configuration object, kept as ``self.config``.
            Elsewhere in this class an optional ``mandatory_retention``
            attribute is read from it.
    """
    self.config = config

    # Created after self.config is assigned — presumably
    # _initialize_storage() reads it; confirm against that helper.
    self.storage_backend = self._initialize_storage()

    # Collaborators: audit trail, consent verification, and scheduled
    # deletion at the end of a record's retention period.
    self.audit_logger = AuditLogger()
    self.consent_manager = ConsentManager()
    self.retention_scheduler = RetentionScheduler()
async def store_personal_data(
    self,
    data_subject_id: str,
    data: Dict,
    category: DataCategory,
    purpose: str,
    lawful_basis: LawfulBasis,
    retention_period_days: Optional[int] = None
) -> str:
    """Store personal data after running the GDPR admission checks.

    The payload is reduced to the fields permitted for ``purpose``,
    stamped with collection and retention metadata, encrypted when the
    category calls for it, and written together with its index entries,
    deletion schedule and audit entry inside one backend transaction.

    Args:
        data_subject_id: Identifier of the person the data is about.
        data: Raw payload; only fields allowed for ``purpose`` are kept.
        category: Classification of the payload.
        purpose: Reason the data is collected.
        lawful_basis: Legal ground for processing. Only ``CONSENT`` is
            actively verified here.
        retention_period_days: Days to keep the record. When ``None``
            the default for ``purpose``/``category`` is used.

    Returns:
        The id of the newly stored record.

    Raises:
        ValueError: If the lawful basis is consent and no valid consent
            exists for this data subject and purpose.
    """
    # Consent is the only lawful basis that can be checked mechanically
    # at this point; the others are taken on the caller's word.
    if lawful_basis == LawfulBasis.CONSENT:
        consent = await self.consent_manager.verify_consent(
            data_subject_id, purpose
        )
        if not consent or not consent.is_valid:
            raise ValueError("Valid consent required for data storage")

    # Data minimization: drop every field the purpose does not need.
    minimized_data = self._minimize_data(data, purpose)

    if retention_period_days is None:
        retention_period_days = self._get_default_retention(purpose, category)

    # Read the clock once so that retention_until - collected_at is
    # exactly the retention period. Two separate utcnow() calls would let
    # the two timestamps drift apart by however long the code in between
    # takes to run.
    collected_at = datetime.utcnow()
    retention_until = collected_at + timedelta(days=retention_period_days)

    record = DataRecord(
        record_id=self._generate_record_id(),
        data_subject_id=data_subject_id,
        category=category,
        purpose=purpose,
        lawful_basis=lawful_basis,
        collected_at=collected_at,
        retention_until=retention_until,
        data=minimized_data
    )

    # Must stay in sync with the category list used when decrypting for
    # access requests.
    if category in [DataCategory.SENSITIVE, DataCategory.CHILDREN]:
        record.data = await self._encrypt_sensitive_data(record.data)

    async with self.storage_backend.transaction() as txn:
        # Main record.
        await txn.store(f"records:{record.record_id}", record)

        # Index used by access and erasure requests.
        await txn.add_to_set(
            f"subject_records:{data_subject_id}",
            record.record_id
        )

        # Index used for per-purpose auditing.
        await txn.add_to_set(
            f"purpose_records:{purpose}",
            record.record_id
        )

        # Scheduling and audit logging run before the commit, so an
        # exception in either one leaves the `async with` block before
        # commit() is reached.
        await self.retention_scheduler.schedule_deletion(
            record.record_id, retention_until
        )
        await self.audit_logger.log_data_collection(
            record, self._get_request_context()
        )

        await txn.commit()

    return record.record_id
async def handle_erasure_request(
    self,
    data_subject_id: str,
    verification_token: str
) -> Dict:
    """Process a Right to be Forgotten request for one data subject.

    Every record indexed for the data subject is either erased (together
    with its derived data) or listed as retained with the reason. After
    that, backup erasure is scheduled, the data subject is notified and
    the outcome is written to the audit log.

    Args:
        data_subject_id: Identifier of the person requesting erasure.
        verification_token: Token proving the request is authentic.

    Returns:
        The erasure report with the keys ``data_subject_id``,
        ``requested_at``, ``records_erased``, ``records_retained`` and
        ``derived_data_erased``.

    Raises:
        ValueError: If the request cannot be verified.
    """
    if not await self._verify_erasure_request(data_subject_id, verification_token):
        raise ValueError("Invalid erasure request")

    erasure_report = {
        "data_subject_id": data_subject_id,
        "requested_at": datetime.utcnow().isoformat(),
        "records_erased": [],
        "records_retained": [],
        "derived_data_erased": []
    }

    indexed_ids = await self.storage_backend.get_set(
        f"subject_records:{data_subject_id}"
    )
    # Iterate over a snapshot: _cascade_erasure() removes ids from this
    # very index, and a backend that hands out a live set would otherwise
    # be mutated mid-iteration.
    record_ids = list(indexed_ids or ())

    for record_id in record_ids:
        record = await self.storage_backend.get(f"records:{record_id}")

        if not record:
            # The record no longer exists: either an earlier cascade in
            # this request already erased it as derived data (it is then
            # listed under "derived_data_erased"), or the index entry is
            # stale. Nothing is left to erase, and the legality check
            # cannot run without a record.
            continue

        if self._can_erase_record(record):
            await self._cascade_erasure(record, erasure_report)
            erasure_report["records_erased"].append({
                "record_id": record_id,
                "category": record.category.value,
                "purpose": record.purpose
            })
        else:
            # Tell the data subject why this record is kept.
            reason = self._get_retention_reason(record)
            erasure_report["records_retained"].append({
                "record_id": record_id,
                "category": record.category.value,
                "purpose": record.purpose,
                "retention_reason": reason,
                "retention_until": record.retention_until.isoformat()
            })

    # Backups are handled by a separately scheduled job.
    await self._schedule_backup_erasure(data_subject_id, erasure_report)

    await self._send_erasure_confirmation(data_subject_id, erasure_report)

    await self.audit_logger.log_erasure(erasure_report)

    return erasure_report
async def _cascade_erasure(self, record: DataRecord, report: Dict):
    """Erase a record, its index entries and everything derived from it.

    The record itself is deleted first, then its entries in the
    per-subject and per-purpose indices, then (recursively) every record
    derived from it, and finally its traces in the analytics and search
    systems.

    Args:
        record: The record to erase.
        report: Mutable report dict. The id of every derived record that
            gets erased is appended to ``report["derived_data_erased"]``;
            the key is created on demand, so a bare ``{}`` is accepted.
    """
    await self.storage_backend.delete(f"records:{record.record_id}")

    await self.storage_backend.remove_from_set(
        f"subject_records:{record.data_subject_id}",
        record.record_id
    )
    await self.storage_backend.remove_from_set(
        f"purpose_records:{record.purpose}",
        record.record_id
    )

    # NOTE(review): derived records are erased without consulting
    # _can_erase_record(), so a derived record under a legal hold or a
    # mandatory retention period would be removed as well — confirm this
    # is the intended policy.
    derived_ids = await self._find_derived_records(record.record_id)
    for derived_id in derived_ids or ():
        derived_record = await self.storage_backend.get(f"records:{derived_id}")
        # Derived records that are already gone are skipped. Because each
        # record is deleted before its descendants are visited, a cyclic
        # derivation chain ends here as long as the backend's delete takes
        # effect immediately.
        if derived_record:
            await self._cascade_erasure(derived_record, report)
            # setdefault instead of plain indexing: not every caller
            # pre-populates the report, and a missing key must not abort
            # the erasure half-way with a KeyError.
            report.setdefault("derived_data_erased", []).append(derived_id)

    await self._erase_from_analytics(record)

    await self._erase_from_search(record)
def _can_erase_record(self, record: DataRecord) -> bool:
    """Decide whether an erasure request may remove ``record``.

    A record is kept when it is processed under a legal obligation, when
    the configured mandatory retention period for its purpose and
    category has not yet elapsed, or when it is under a legal hold.

    Args:
        record: The record under consideration.

    Returns:
        ``True`` if the record may be erased, ``False`` if it must stay.
    """
    # Data held because the law demands it is exempt from erasure.
    if record.lawful_basis == LawfulBasis.LEGAL_OBLIGATION:
        return False

    # Optional config: {purpose: {category value: days}}.
    if hasattr(self.config, 'mandatory_retention'):
        per_purpose = self.config.mandatory_retention.get(record.purpose, {})
        retention_days = per_purpose.get(record.category.value)
        if retention_days:
            earliest_erasure = record.collected_at + timedelta(
                days=retention_days
            )
            if earliest_erasure > datetime.utcnow():
                return False

    # Last gate: ongoing legal proceedings freeze the record.
    return not self._has_legal_hold(record)
async def handle_access_request(
    self,
    data_subject_id: str,
    verification_token: str
) -> Dict:
    """Build the Right of Access report for one data subject.

    Collects every record indexed for the data subject (decrypting the
    categories that are stored encrypted), summarizes purposes,
    categories, recipients and retention dates, formats the result and
    records the request in the audit log.

    Args:
        data_subject_id: Identifier of the person requesting access.
        verification_token: Token proving the request is authentic.

    Returns:
        Whatever ``_format_access_report`` produces from the assembled
        report dict.

    Raises:
        ValueError: If the request cannot be verified.
    """
    if not await self._verify_access_request(data_subject_id, verification_token):
        raise ValueError("Invalid access request")

    generated_at = datetime.utcnow().isoformat()
    personal_data = []
    purposes = set()
    categories = set()
    # purpose -> latest retention_until seen for that purpose.
    latest_retention = {}

    record_ids = await self.storage_backend.get_set(
        f"subject_records:{data_subject_id}"
    )
    for record_id in record_ids or ():
        record = await self.storage_backend.get(f"records:{record_id}")
        if not record:
            # Stale index entry (the record was erased or has expired):
            # there is nothing to report, and dereferencing it would
            # abort the whole request.
            continue

        # Must stay in sync with the category list used when encrypting
        # at storage time.
        payload = record.data
        if record.category in [DataCategory.SENSITIVE, DataCategory.CHILDREN]:
            payload = await self._decrypt_sensitive_data(record.data)

        personal_data.append({
            "category": record.category.value,
            "purpose": record.purpose,
            "collected_at": record.collected_at.isoformat(),
            "lawful_basis": record.lawful_basis.value,
            "data": payload
        })
        purposes.add(record.purpose)
        categories.add(record.category.value)

        # Several records can share a purpose. Report the date until
        # which data for that purpose is actually held (the latest one)
        # rather than whichever record happened to be iterated last.
        known = latest_retention.get(record.purpose)
        if known is None or record.retention_until > known:
            latest_retention[record.purpose] = record.retention_until

    recipients = await self._get_data_recipients(data_subject_id)

    access_report = {
        "data_subject_id": data_subject_id,
        "generated_at": generated_at,
        "personal_data": personal_data,
        # Sorted so the report is deterministic; set iteration order is
        # arbitrary.
        "processing_purposes": sorted(purposes),
        "data_categories": sorted(categories),
        "recipients": recipients,
        "retention_periods": {
            purpose: until.isoformat()
            for purpose, until in latest_retention.items()
        },
        # NOTE(review): nothing in this method records data sources, so
        # this list is always empty — confirm where sources should come
        # from.
        "data_sources": []
    }

    # Machine-readable representation of the report.
    formatted_report = self._format_access_report(access_report)

    await self.audit_logger.log_access_request(data_subject_id)

    return formatted_report
async def handle_portability_request(
    self,
    data_subject_id: str,
    verification_token: str,
    format: str = "json"
) -> bytes:
    """Export a data subject's portable data (Right to Data Portability).

    The export is built on top of the access report: the report is
    generated, reduced to the portable subset and serialized in the
    requested format.

    Args:
        data_subject_id: Identifier of the person requesting the export.
        verification_token: Token proving the request is authentic.
        format: One of ``"json"``, ``"csv"`` or ``"xml"``.

    Returns:
        The serialized export. JSON is UTF-8 encoded with a two-space
        indent; CSV and XML are whatever the respective converter
        helpers return.

    Raises:
        ValueError: If ``format`` is not supported, or if the underlying
            access request cannot be verified.
    """
    # Reject an unknown format before any personal data is touched.
    # Otherwise every record would be fetched and decrypted, and an
    # access-request audit entry written, for an export that can never
    # be produced.
    if format not in ("json", "csv", "xml"):
        raise ValueError(f"Unsupported format: {format}")

    access_report = await self.handle_access_request(
        data_subject_id, verification_token
    )

    # Keep only data provided by the data subject or observed about them.
    portable_data = self._filter_portable_data(access_report)

    if format == "json":
        return json.dumps(portable_data, indent=2).encode('utf-8')
    if format == "csv":
        return self._convert_to_csv(portable_data)
    return self._convert_to_xml(portable_data)
def _minimize_data(self, data: Dict, purpose: str) -> Dict:
    """Return only the entries of ``data`` that ``purpose`` may keep.

    Each known purpose has a fixed list of permitted field names; every
    other key is dropped. A purpose that is not listed permits no fields
    at all, so the result is an empty dict.

    Args:
        data: Raw payload supplied by the caller.
        purpose: Processing purpose used to select the permitted fields.

    Returns:
        A new dict holding the permitted entries in their original order.
    """
    allowed_by_purpose = {
        "authentication": ["email", "password_hash"],
        "shipping": ["name", "address", "phone"],
        "marketing": ["email", "preferences"],
        "analytics": ["session_id", "timestamp", "action"]
    }
    allowed = allowed_by_purpose.get(purpose, [])

    kept = {}
    for field_name, value in data.items():
        if field_name in allowed:
            kept[field_name] = value
    return kept
async def update_consent(
    self,
    data_subject_id: str,
    purpose: str,
    granted: bool
):
    """Propagate a consent change to the data stored for one purpose.

    When consent is withdrawn, every record of this data subject and
    purpose whose lawful basis is consent is erased together with its
    derived data. Records held under any other lawful basis are left
    untouched. In the code shown here, granting consent changes no
    stored data.

    Args:
        data_subject_id: Identifier of the person whose consent changed.
        purpose: Processing purpose the consent refers to.
        granted: ``True`` if consent was given, ``False`` if withdrawn.
    """
    if not granted:
        # _cascade_erasure() appends to report["derived_data_erased"];
        # handing it a bare {} raises KeyError as soon as a withdrawn
        # record has derived data, leaving the erasure half done.
        erasure_report = {"derived_data_erased": []}

        records = await self._get_records_by_purpose(data_subject_id, purpose)
        for record in records:
            # Only consent-based processing loses its legal ground when
            # consent is withdrawn.
            if record.lawful_basis == LawfulBasis.CONSENT:
                await self._cascade_erasure(record, erasure_report)