Implementing Privacy by Design in APIs
Privacy by Design principles fundamentally shape how compliant APIs handle personal data. Proactive prevention of privacy invasions requires APIs to implement strong authentication, encryption, and access controls by default. Privacy must be the default setting, with users explicitly opting into data sharing rather than opting out. This approach influences API design from authentication flows to data retention policies.
# Python implementation of Privacy by Design principles
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set
import hashlib
import json
from cryptography.fernet import Fernet
from dataclasses import dataclass, field
@dataclass
class PrivacyPolicy:
    """Privacy rules registered for one type of personal data.

    Instances are stored per data type through
    ``PrivacyCompliantAPI.register_data_policy``. The two optional flags
    default to the restrictive setting: no third-party sharing, and
    explicit consent required.
    """

    # Mandatory descriptors; no defaults, so they must always be supplied.
    purpose: str
    legal_basis: str
    retention_period: timedelta

    # Restrictive by default; a policy has to opt in to anything looser.
    third_party_sharing: bool = False
    requires_explicit_consent: bool = True
class PrivacyCompliantAPI:
    """API front end that applies Privacy by Design controls to requests.

    Holds per-data-type privacy policies, a consent store and per-purpose
    data minimization rules, and exposes entry points for processing API
    requests and for serving GDPR data subject requests.

    NOTE(review): several private helpers called below (for example
    ``_validate_purpose``, ``_check_consent``, ``_process_with_audit``,
    ``_decrypt_user_data``, ``_handle_erasure_request``) are not defined in
    this class body; presumably a subclass or mixin provides them -- confirm.
    """

    def __init__(self, encryption_key: bytes):
        """Create the API with empty policy, consent and rule registries.

        Args:
            encryption_key: Key handed to ``Fernet`` to build the cipher suite.
        """
        self.cipher_suite = Fernet(encryption_key)
        self.policies = {}
        self.consent_store = {}
        self.data_minimization_rules = {}
        # Read by _handle_access_request; initialised here so an access
        # request on a fresh instance does not fail with AttributeError.
        # Each entry is expected to offer ``get_user_data(user_id)`` and
        # a ``name`` attribute.
        self.data_sources = []

    def register_data_policy(self, data_type: str, policy: "PrivacyPolicy"):
        """Register privacy policy for specific data types.

        A later registration for the same ``data_type`` replaces the
        earlier one.
        """
        self.policies[data_type] = policy

    def process_api_request(self, request_data: Dict, user_id: str) -> Dict:
        """Process API request with privacy controls.

        The steps run in order: purpose validation, consent check, data
        minimization, audited processing, and scheduling of deletion.

        Args:
            request_data: Incoming request payload; its ``'purpose'`` entry
                names the processing purpose.
            user_id: Identifier of the user the request concerns.

        Returns:
            The result of ``_process_with_audit``, or, when consent is
            missing, a dict with ``'error'``, ``'required_consents'`` and
            ``'consent_url'`` describing what the user must grant.

        Raises:
            PrivacyError: If ``_validate_purpose`` rejects the purpose.
                NOTE(review): ``PrivacyError`` is not defined in the visible
                part of this file -- confirm it is importable here.
        """
        # Check purpose limitation
        purpose = request_data.get('purpose')
        if not self._validate_purpose(purpose):
            raise PrivacyError("Invalid purpose for data processing")

        # Check consent
        required_data_types = self._identify_data_types(request_data)
        if not self._check_consent(user_id, required_data_types, purpose):
            return {
                'error': 'consent_required',
                'required_consents': self._get_required_consents(required_data_types),
                'consent_url': f'/api/privacy/consent/{user_id}'
            }

        # Apply data minimization
        minimized_data = self._minimize_data(request_data, purpose)

        # Process with audit trail
        result = self._process_with_audit(minimized_data, user_id, purpose)

        # Apply retention policies
        self._schedule_data_deletion(result['data_id'], purpose)
        return result

    def handle_data_request(self, user_id: str, request_type: str) -> Dict:
        """Handle GDPR data subject requests.

        Args:
            user_id: Identifier of the data subject.
            request_type: One of ``'access'``, ``'portability'``,
                ``'erasure'`` or ``'rectification'``.

        Returns:
            Whatever the matching ``_handle_*_request`` method returns.

        Raises:
            ValueError: If ``request_type`` is not one of the values above.
        """
        if request_type == 'access':
            return self._handle_access_request(user_id)
        elif request_type == 'portability':
            return self._handle_portability_request(user_id)
        elif request_type == 'erasure':
            return self._handle_erasure_request(user_id)
        elif request_type == 'rectification':
            return self._handle_rectification_request(user_id)
        else:
            raise ValueError(f"Unknown request type: {request_type}")

    def _handle_access_request(self, user_id: str) -> Dict:
        """GDPR Article 15 - Right of access.

        Collects the user's data from every entry in ``self.data_sources``
        (sources returning a falsy value are skipped), passes it through
        ``_decrypt_user_data``, and attaches processing information.

        Returns:
            Dict with ``'user_data'`` (keyed by data source name),
            ``'processing_info'`` and ``'generated_at'`` (ISO timestamp from
            ``datetime.utcnow()``).
        """
        user_data = {}

        # Collect all data about the user
        for data_source in self.data_sources:
            data = data_source.get_user_data(user_id)
            if data:
                # Decrypt sensitive data
                user_data[data_source.name] = self._decrypt_user_data(data)

        # Include processing information
        processing_info = {
            'purposes': self._get_processing_purposes(user_id),
            'categories': self._get_data_categories(user_id),
            'recipients': self._get_data_recipients(user_id),
            'retention_periods': self._get_retention_periods(user_id),
            'rights': [
                'access', 'rectification', 'erasure', 'restriction',
                'portability', 'object', 'withdraw_consent'
            ]
        }

        return {
            'user_data': user_data,
            'processing_info': processing_info,
            'generated_at': datetime.utcnow().isoformat()
        }

    def _handle_portability_request(self, user_id: str) -> Dict:
        """GDPR Article 20 - Right to data portability.

        Wraps each entry returned by ``_get_portable_data`` with format
        metadata and packages the result with a pseudonymized user id and a
        checksum of the payload.

        Returns:
            The export package dict (``'export_version'``, ``'export_date'``,
            ``'user_id'``, ``'data'``, ``'metadata'``).
        """
        # Only include data provided by the user or generated through their use
        portable_data = {
            data_type: {
                'data': data,
                'format': 'json',
                'schema_version': '1.0'
            }
            for data_type, data in self._get_portable_data(user_id).items()
        }

        # Create machine-readable format
        export_package = {
            'export_version': '2.0',
            'export_date': datetime.utcnow().isoformat(),
            'user_id': self._pseudonymize_id(user_id),
            'data': portable_data,
            'metadata': {
                'encoding': 'utf-8',
                'compression': 'none',
                'checksum': self._calculate_checksum(portable_data)
            }
        }
        return export_package

    def _minimize_data(self, data: Dict, purpose: str) -> Dict:
        """Apply data minimization principle.

        Keeps only the fields that the rules registered for ``purpose`` in
        ``self.data_minimization_rules`` allow:

        * fields listed under ``'required_fields'`` are always kept;
        * fields listed under ``'optional_fields'`` are kept only when they
          also appear in the request's own ``'_include_optional'`` entry;
        * everything else is dropped.

        A purpose with no registered rules, or rules lacking either key,
        keeps nothing rather than raising ``KeyError``.

        Args:
            data: Request payload to filter.
            purpose: Key into ``self.data_minimization_rules``.

        Returns:
            A new dict containing only the permitted fields.
        """
        minimization_rules = self.data_minimization_rules.get(purpose, {})
        required_fields = minimization_rules.get('required_fields', ())
        optional_fields = minimization_rules.get('optional_fields', ())
        # Loop-invariant: the caller's explicit opt-in list, if any.
        requested_optional = data.get('_include_optional') or ()

        minimized = {}
        for field_name, value in data.items():
            if field_name in required_fields:
                minimized[field_name] = value
            elif field_name in optional_fields:
                # Include only if explicitly requested
                if field_name in requested_optional:
                    minimized[field_name] = value
        return minimized

    def implement_consent_management(self):
        """Implement granular consent management.

        Defines a ``ConsentRecord`` dataclass and a ``ConsentManager`` class
        local to this call.

        NOTE(review): nothing is returned or stored, so callers cannot reach
        either class; and ``ConsentManager`` relies on ``consent_store`` and
        several ``_``-prefixed helpers that it does not define itself --
        confirm how this is meant to be wired up.
        """

        @dataclass
        class ConsentRecord:
            # One consent decision for a (user, data type, purpose) triple.
            user_id: str
            data_type: str
            purpose: str
            granted: bool
            timestamp: datetime
            expires_at: Optional[datetime] = None
            withdrawal_timestamp: Optional[datetime] = None
            version: str = "1.0"

        class ConsentManager:
            def grant_consent(self, user_id: str, consent_request: Dict) -> str:
                """Record a granted consent and return its new consent id.

                Raises:
                    ValueError: If ``_validate_consent_request`` rejects the
                        request.
                """
                # Validate consent request
                if not self._validate_consent_request(consent_request):
                    raise ValueError("Invalid consent request")

                consent_id = self._generate_consent_id()

                # Store consent with cryptographic proof
                consent_record = ConsentRecord(
                    user_id=user_id,
                    data_type=consent_request['data_type'],
                    purpose=consent_request['purpose'],
                    granted=True,
                    timestamp=datetime.utcnow(),
                    expires_at=self._calculate_expiry(consent_request)
                )

                # Create tamper-proof record
                consent_hash = self._hash_consent_record(consent_record)
                self.consent_store[consent_id] = {
                    'record': consent_record,
                    'hash': consent_hash,
                    'signature': self._sign_consent(consent_hash)
                }
                return consent_id

            def withdraw_consent(self, user_id: str, consent_id: str) -> bool:
                """Mark a stored consent as withdrawn.

                Returns:
                    ``False`` when the consent id is unknown or belongs to a
                    different user, ``True`` once the record is updated.
                """
                consent_data = self.consent_store.get(consent_id)
                if not consent_data or consent_data['record'].user_id != user_id:
                    return False

                # Mark as withdrawn, don't delete (audit trail)
                consent_data['record'].withdrawal_timestamp = datetime.utcnow()
                consent_data['record'].granted = False

                # Trigger data deletion workflows
                self._trigger_consent_withdrawal_actions(consent_data['record'])
                return True
# Middleware for privacy compliance
class PrivacyComplianceMiddleware:
    """Request middleware that gates personal-data processing.

    Called with ``(request, call_next)``. Requests that involve personal
    data must have a lawful basis: without one the middleware answers 403
    itself, otherwise the processing activity is logged first. Every
    response that comes back from ``call_next`` gets two privacy headers.

    NOTE(review): ``JSONResponse`` and the ``_involves_personal_data``,
    ``_determine_lawful_basis`` and ``_log_processing_activity`` helpers are
    not defined in the visible part of this file -- confirm where they come
    from.
    """

    def __init__(self, privacy_api: PrivacyCompliantAPI):
        """Keep a reference to the privacy API this middleware fronts."""
        self.privacy_api = privacy_api

    async def __call__(self, request, call_next):
        """Run the lawful-basis gate, then forward and tag the response."""
        # Check if request involves personal data
        if self._involves_personal_data(request):
            # Verify lawful basis
            basis = self._determine_lawful_basis(request)
            if basis:
                # Log for accountability
                await self._log_processing_activity(request, basis)
            else:
                return JSONResponse(
                    status_code=403,
                    content={'error': 'No lawful basis for processing'}
                )

        resp = await call_next(request)

        # Add privacy headers
        privacy_headers = (
            ('X-Privacy-Policy', '/api/privacy/policy'),
            ('X-Data-Controller', 'Example Corp'),
        )
        for header_name, header_value in privacy_headers:
            resp.headers[header_name] = header_value
        return resp