Encryption at Rest Implementation

Encryption at Rest Implementation

Encryption at rest protects data stored in databases, file systems, and backups from unauthorized access. Modern encryption standards use authenticated encryption modes that provide both confidentiality and integrity protection. Implementing encryption at rest requires careful key management, algorithm selection, and performance optimization to ensure security without significantly impacting application functionality.

# Python - Secure Data Storage and Encryption
from cryptography.hazmat.primitives.ciphers.aead import AESGCM, ChaCha20Poly1305
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
import os
import json
import base64
from typing import Dict, Any, Optional, Tuple
import sqlite3
from contextlib import contextmanager
import hmac
import hashlib

class SecureDataStorage:
    def __init__(self, master_key: bytes = None):
        # Use environment variable or provided key
        if master_key is None:
            master_key = os.environ.get('MASTER_ENCRYPTION_KEY', '').encode()
            if not master_key:
                raise ValueError("Master encryption key not provided")
        
        # Derive encryption and MAC keys from master key
        self.encryption_key = self._derive_key(master_key, b'encryption', 32)
        self.mac_key = self._derive_key(master_key, b'mac', 32)
        
        # Initialize cipher
        self.cipher = AESGCM(self.encryption_key)
        
    def _derive_key(self, master_key: bytes, purpose: bytes, length: int) -> bytes:
        """Derive purpose-specific keys from master key"""
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=length,
            salt=purpose,
            iterations=100000,
            backend=default_backend()
        )
        return kdf.derive(master_key)
    
    def encrypt_data(self, data: Any, associated_data: bytes = None) -> Dict[str, str]:
        """Encrypt data with AEAD"""
        # Serialize data
        if isinstance(data, (dict, list)):
            plaintext = json.dumps(data).encode()
        elif isinstance(data, str):
            plaintext = data.encode()
        elif isinstance(data, bytes):
            plaintext = data
        else:
            plaintext = str(data).encode()
        
        # Generate nonce
        nonce = os.urandom(12)  # 96-bit nonce for AES-GCM
        
        # Encrypt with optional associated data
        ciphertext = self.cipher.encrypt(nonce, plaintext, associated_data)
        
        # Create storage format
        encrypted_data = {
            'nonce': base64.b64encode(nonce).decode(),
            'ciphertext': base64.b64encode(ciphertext).decode(),
            'algorithm': 'AES-256-GCM',
            'version': '1.0'
        }
        
        if associated_data:
            encrypted_data['aad'] = base64.b64encode(associated_data).decode()
        
        return encrypted_data
    
    def decrypt_data(self, encrypted_data: Dict[str, str], 
                     expected_aad: bytes = None) -> Any:
        """Decrypt AEAD encrypted data"""
        # Validate format
        required_fields = ['nonce', 'ciphertext', 'algorithm', 'version']
        if not all(field in encrypted_data for field in required_fields):
            raise ValueError("Invalid encrypted data format")
        
        # Check algorithm
        if encrypted_data['algorithm'] != 'AES-256-GCM':
            raise ValueError(f"Unsupported algorithm: {encrypted_data['algorithm']}")
        
        # Decode components
        nonce = base64.b64decode(encrypted_data['nonce'])
        ciphertext = base64.b64decode(encrypted_data['ciphertext'])
        
        # Handle associated data
        aad = None
        if 'aad' in encrypted_data:
            aad = base64.b64decode(encrypted_data['aad'])
            if expected_aad and not hmac.compare_digest(aad, expected_aad):
                raise ValueError("Associated data mismatch")
        
        # Decrypt
        try:
            plaintext = self.cipher.decrypt(nonce, ciphertext, aad)
            
            # Try to deserialize JSON
            try:
                return json.loads(plaintext.decode())
            except (json.JSONDecodeError, UnicodeDecodeError):
                # Return as string or bytes
                try:
                    return plaintext.decode()
                except UnicodeDecodeError:
                    return plaintext
                    
        except Exception as e:
            raise ValueError(f"Decryption failed: {str(e)}")
    
    def encrypt_field(self, value: Any, field_name: str) -> str:
        """Encrypt individual field with field-specific context"""
        # Use field name as associated data for context binding
        aad = f"field:{field_name}".encode()
        encrypted = self.encrypt_data(value, aad)
        
        # Return as single string for database storage
        return base64.b64encode(
            json.dumps(encrypted).encode()
        ).decode()
    
    def decrypt_field(self, encrypted_value: str, field_name: str) -> Any:
        """Decrypt individual field"""
        # Decode from storage format
        encrypted_data = json.loads(
            base64.b64decode(encrypted_value)
        )
        
        # Verify field context
        expected_aad = f"field:{field_name}".encode()
        return self.decrypt_data(encrypted_data, expected_aad)
    
    def create_encrypted_search_index(self, value: str, 
                                     blind_index_key: bytes = None) -> str:
        """Create blind index for encrypted field searching"""
        if blind_index_key is None:
            blind_index_key = self._derive_key(self.mac_key, b'blind_index', 32)
        
        # Normalize value for consistent indexing
        normalized = value.lower().strip()
        
        # Create keyed hash (blind index)
        h = hmac.new(blind_index_key, normalized.encode(), hashlib.sha256)
        
        # Return truncated hash as index
        return base64.b64encode(h.digest()[:16]).decode()

class EncryptedDatabase:
    """Database wrapper with transparent encryption"""
    
    def __init__(self, db_path: str, encryption_key: bytes):
        self.db_path = db_path
        self.storage = SecureDataStorage(encryption_key)
        self.encrypted_fields = set()
        
    @contextmanager
    def get_connection(self):
        """Get database connection with encryption support"""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
        finally:
            conn.close()
    
    def create_secure_table(self, table_name: str, 
                           schema: Dict[str, str],
                           encrypted_fields: List[str]):
        """Create table with encryption metadata"""
        with self.get_connection() as conn:
            # Build CREATE TABLE statement
            columns = []
            for field, field_type in schema.items():
                if field in encrypted_fields:
                    # Encrypted fields stored as TEXT
                    columns.append(f"{field} TEXT")
                    # Add blind index column
                    columns.append(f"{field}_blind_index TEXT")
                else:
                    columns.append(f"{field} {field_type}")
            
            create_sql = f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(columns)})"
            conn.execute(create_sql)
            
            # Store encryption metadata
            conn.execute("""
                CREATE TABLE IF NOT EXISTS encryption_metadata (
                    table_name TEXT,
                    field_name TEXT,
                    encryption_version TEXT,
                    PRIMARY KEY (table_name, field_name)
                )
            """)
            
            for field in encrypted_fields:
                conn.execute(
                    "INSERT OR REPLACE INTO encryption_metadata VALUES (?, ?, ?)",
                    (table_name, field, '1.0')
                )
            
            conn.commit()
    
    def insert_encrypted(self, table_name: str, data: Dict[str, Any]) -> int:
        """Insert data with automatic encryption"""
        with self.get_connection() as conn:
            # Get encrypted fields for this table
            cursor = conn.execute(
                "SELECT field_name FROM encryption_metadata WHERE table_name = ?",
                (table_name,)
            )
            encrypted_fields = {row['field_name'] for row in cursor}
            
            # Prepare data for insertion
            processed_data = {}
            for field, value in data.items():
                if field in encrypted_fields and value is not None:
                    # Encrypt field
                    processed_data[field] = self.storage.encrypt_field(value, field)
                    # Create blind index
                    processed_data[f"{field}_blind_index"] = \
                        self.storage.create_encrypted_search_index(str(value))
                else:
                    processed_data[field] = value
            
            # Build INSERT statement
            fields = list(processed_data.keys())
            placeholders = ['?' for _ in fields]
            insert_sql = f"""
                INSERT INTO {table_name} ({', '.join(fields)})
                VALUES ({', '.join(placeholders)})
            """
            
            cursor = conn.execute(insert_sql, list(processed_data.values()))
            conn.commit()
            
            return cursor.lastrowid
    
    def search_encrypted(self, table_name: str, field_name: str, 
                        search_value: str) -> List[Dict[str, Any]]:
        """Search encrypted fields using blind index"""
        with self.get_connection() as conn:
            # Create blind index for search value
            blind_index = self.storage.create_encrypted_search_index(search_value)
            
            # Search using blind index
            cursor = conn.execute(
                f"SELECT * FROM {table_name} WHERE {field_name}_blind_index = ?",
                (blind_index,)
            )
            
            # Decrypt results
            results = []
            for row in cursor:
                decrypted_row = dict(row)
                
                # Get encrypted fields
                metadata_cursor = conn.execute(
                    "SELECT field_name FROM encryption_metadata WHERE table_name = ?",
                    (table_name,)
                )
                encrypted_fields = {row['field_name'] for row in metadata_cursor}
                
                # Decrypt fields
                for field in encrypted_fields:
                    if field in decrypted_row and decrypted_row[field]:
                        decrypted_row[field] = self.storage.decrypt_field(
                            decrypted_row[field], field
                        )
                    # Remove blind index from results
                    blind_index_field = f"{field}_blind_index"
                    if blind_index_field in decrypted_row:
                        del decrypted_row[blind_index_field]
                
                results.append(decrypted_row)
            
            return results

class SecureFileStorage:
    """Encrypted file storage with chunking for large files"""
    
    def __init__(self, storage_path: str, encryption_key: bytes):
        self.storage_path = storage_path
        self.storage = SecureDataStorage(encryption_key)
        self.chunk_size = 1024 * 1024  # 1MB chunks
        
        # Create storage directory
        os.makedirs(storage_path, exist_ok=True)
    
    def store_file(self, file_path: str, file_data: bytes, 
                   metadata: Dict[str, Any] = None) -> str:
        """Store encrypted file with metadata"""
        # Generate file ID
        file_id = base64.b64encode(os.urandom(16)).decode()
        
        # Create file directory
        file_dir = os.path.join(self.storage_path, file_id)
        os.makedirs(file_dir, exist_ok=True)
        
        # Encrypt and store file in chunks
        chunks = []
        for i in range(0, len(file_data), self.chunk_size):
            chunk = file_data[i:i + self.chunk_size]
            
            # Encrypt chunk with sequence number
            encrypted_chunk = self.storage.encrypt_data(
                chunk,
                f"chunk:{file_id}:{i}".encode()
            )
            
            # Store chunk
            chunk_path = os.path.join(file_dir, f"chunk_{i}.enc")
            with open(chunk_path, 'w') as f:
                json.dump(encrypted_chunk, f)
            
            chunks.append({
                'sequence': i,
                'size': len(chunk),
                'path': chunk_path
            })
        
        # Encrypt and store metadata
        file_metadata = {
            'original_path': file_path,
            'size': len(file_data),
            'chunks': chunks,
            'created_at': datetime.now().isoformat(),
            'custom_metadata': metadata or {}
        }
        
        encrypted_metadata = self.storage.encrypt_data(
            file_metadata,
            f"metadata:{file_id}".encode()
        )
        
        metadata_path = os.path.join(file_dir, 'metadata.enc')
        with open(metadata_path, 'w') as f:
            json.dump(encrypted_metadata, f)
        
        return file_id
    
    def retrieve_file(self, file_id: str) -> Tuple[bytes, Dict[str, Any]]:
        """Retrieve and decrypt file"""
        file_dir = os.path.join(self.storage_path, file_id)
        
        # Load and decrypt metadata
        metadata_path = os.path.join(file_dir, 'metadata.enc')
        with open(metadata_path, 'r') as f:
            encrypted_metadata = json.load(f)
        
        metadata = self.storage.decrypt_data(
            encrypted_metadata,
            f"metadata:{file_id}".encode()
        )
        
        # Reconstruct file from chunks
        file_data = bytearray()
        for chunk_info in sorted(metadata['chunks'], key=lambda x: x['sequence']):
            with open(chunk_info['path'], 'r') as f:
                encrypted_chunk = json.load(f)
            
            chunk = self.storage.decrypt_data(
                encrypted_chunk,
                f"chunk:{file_id}:{chunk_info['sequence']}".encode()
            )
            
            file_data.extend(chunk)
        
        return bytes(file_data), metadata
// JavaScript - Secure Data Storage and Encryption
const crypto = require('crypto');
const fs = require('fs').promises;
const path = require('path');

class SecureDataStorage {
    constructor(masterKey) {
        if (!masterKey) {
            masterKey = process.env.MASTER_ENCRYPTION_KEY;
            if (!masterKey) {
                throw new Error('Master encryption key not provided');
            }
        }
        
        // Derive purpose-specific keys
        this.encryptionKey = this.deriveKey(masterKey, 'encryption', 32);
        this.macKey = this.deriveKey(masterKey, 'mac', 32);
        
        // Encryption configuration
        this.algorithm = 'aes-256-gcm';
        this.ivLength = 16;
        this.tagLength = 16;
        this.saltLength = 32;
    }
    
    deriveKey(masterKey, purpose, length) {
        // Use PBKDF2 for key derivation
        return crypto.pbkdf2Sync(
            masterKey,
            `${purpose}_salt`,
            100000,
            length,
            'sha256'
        );
    }
    
    encryptData(data, associatedData = null) {
        // Serialize data if needed
        let plaintext;
        if (typeof data === 'object') {
            plaintext = Buffer.from(JSON.stringify(data));
        } else if (typeof data === 'string') {
            plaintext = Buffer.from(data);
        } else if (Buffer.isBuffer(data)) {
            plaintext = data;
        } else {
            plaintext = Buffer.from(String(data));
        }
        
        // Generate IV
        const iv = crypto.randomBytes(this.ivLength);
        
        // Create cipher
        const cipher = crypto.createCipheriv(this.algorithm, this.encryptionKey, iv);
        
        // Set associated data if provided
        if (associatedData) {
            cipher.setAAD(Buffer.from(associatedData));
        }
        
        // Encrypt
        const encrypted = Buffer.concat([
            cipher.update(plaintext),
            cipher.final()
        ]);
        
        // Get auth tag
        const authTag = cipher.getAuthTag();
        
        // Create storage format
        return {
            iv: iv.toString('base64'),
            authTag: authTag.toString('base64'),
            ciphertext: encrypted.toString('base64'),
            algorithm: this.algorithm,
            version: '1.0',
            aad: associatedData ? Buffer.from(associatedData).toString('base64') : undefined
        };
    }
    
    decryptData(encryptedData, expectedAAD = null) {
        // Validate format
        const required = ['iv', 'authTag', 'ciphertext', 'algorithm', 'version'];
        for (const field of required) {
            if (!encryptedData[field]) {
                throw new Error(`Missing required field: ${field}`);
            }
        }
        
        // Check algorithm
        if (encryptedData.algorithm !== this.algorithm) {
            throw new Error(`Unsupported algorithm: ${encryptedData.algorithm}`);
        }
        
        // Decode components
        const iv = Buffer.from(encryptedData.iv, 'base64');
        const authTag = Buffer.from(encryptedData.authTag, 'base64');
        const ciphertext = Buffer.from(encryptedData.ciphertext, 'base64');
        
        // Create decipher
        const decipher = crypto.createDecipheriv(this.algorithm, this.encryptionKey, iv);
        decipher.setAuthTag(authTag);
        
        // Handle associated data
        if (encryptedData.aad) {
            const aad = Buffer.from(encryptedData.aad, 'base64');
            if (expectedAAD && !aad.equals(Buffer.from(expectedAAD))) {
                throw new Error('Associated data mismatch');
            }
            decipher.setAAD(aad);
        }
        
        // Decrypt
        try {
            const decrypted = Buffer.concat([
                decipher.update(ciphertext),
                decipher.final()
            ]);
            
            // Try to parse as JSON
            try {
                return JSON.parse(decrypted.toString());
            } catch {
                // Return as string or buffer
                const str = decrypted.toString();
                return str.length > 0 ? str : decrypted;
            }
        } catch (error) {
            throw new Error(`Decryption failed: ${error.message}`);
        }
    }
    
    encryptField(value, fieldName) {
        // Use field name as AAD for context binding
        const aad = `field:${fieldName}`;
        const encrypted = this.encryptData(value, aad);
        
        // Return as base64 string for storage
        return Buffer.from(JSON.stringify(encrypted)).toString('base64');
    }
    
    decryptField(encryptedValue, fieldName) {
        // Decode from storage format
        const encryptedData = JSON.parse(
            Buffer.from(encryptedValue, 'base64').toString()
        );
        
        // Verify field context
        const expectedAAD = `field:${fieldName}`;
        return this.decryptData(encryptedData, expectedAAD);
    }
    
    createBlindIndex(value, blindIndexKey = null) {
        // Create searchable index for encrypted data
        if (!blindIndexKey) {
            blindIndexKey = this.deriveKey(this.macKey, 'blind_index', 32);
        }
        
        // Normalize value
        const normalized = value.toLowerCase().trim();
        
        // Create HMAC
        const hmac = crypto.createHmac('sha256', blindIndexKey);
        hmac.update(normalized);
        
        // Return truncated hash
        return hmac.digest().slice(0, 16).toString('base64');
    }
}

class EncryptedDatabase {
    constructor(db, encryptionKey) {
        this.db = db; // Database connection (e.g., MongoDB, PostgreSQL)
        this.storage = new SecureDataStorage(encryptionKey);
        this.encryptedFields = new Map(); // table -> Set of field names
    }
    
    async defineEncryptedSchema(collection, fields) {
        // Define which fields should be encrypted
        this.encryptedFields.set(collection, new Set(fields));
        
        // Store metadata in database
        await this.db.collection('_encryption_metadata').updateOne(
            { collection },
            {
                $set: {
                    collection,
                    encryptedFields: fields,
                    version: '1.0',
                    updatedAt: new Date()
                }
            },
            { upsert: true }
        );
    }
    
    async insertEncrypted(collection, document) {
        const encryptedFields = this.encryptedFields.get(collection) || new Set();
        const processedDoc = { ...document };
        
        // Encrypt specified fields
        for (const field of encryptedFields) {
            if (processedDoc[field] !== undefined && processedDoc[field] !== null) {
                // Encrypt field
                processedDoc[field] = this.storage.encryptField(
                    processedDoc[field],
                    field
                );
                
                // Create blind index for searching
                processedDoc[`${field}_blind`] = this.storage.createBlindIndex(
                    String(document[field])
                );
            }
        }
        
        // Insert document
        const result = await this.db.collection(collection).insertOne(processedDoc);
        return result.insertedId;
    }
    
    async findEncrypted(collection, query, options = {}) {
        const encryptedFields = this.encryptedFields.get(collection) || new Set();
        const processedQuery = { ...query };
        
        // Convert encrypted field queries to blind index queries
        for (const field of Object.keys(query)) {
            if (encryptedFields.has(field)) {
                const searchValue = query[field];
                delete processedQuery[field];
                processedQuery[`${field}_blind`] = this.storage.createBlindIndex(
                    String(searchValue)
                );
            }
        }
        
        // Execute query
        const cursor = await this.db.collection(collection).find(processedQuery, options);
        const results = await cursor.toArray();
        
        // Decrypt results
        return results.map(doc => this.decryptDocument(collection, doc));
    }
    
    decryptDocument(collection, document) {
        const encryptedFields = this.encryptedFields.get(collection) || new Set();
        const decryptedDoc = { ...document };
        
        // Decrypt fields
        for (const field of encryptedFields) {
            if (decryptedDoc[field]) {
                try {
                    decryptedDoc[field] = this.storage.decryptField(
                        decryptedDoc[field],
                        field
                    );
                } catch (error) {
                    console.error(`Failed to decrypt field ${field}:`, error);
                    decryptedDoc[field] = null;
                }
            }
            
            // Remove blind index from results
            delete decryptedDoc[`${field}_blind`];
        }
        
        return decryptedDoc;
    }
}

class SecureFileStorage {
    constructor(storagePath, encryptionKey) {
        this.storagePath = storagePath;
        this.storage = new SecureDataStorage(encryptionKey);
        this.chunkSize = 1024 * 1024; // 1MB chunks
    }
    
    async storeFile(filePath, fileData, metadata = {}) {
        // Generate file ID
        const fileId = crypto.randomBytes(16).toString('base64url');
        const fileDir = path.join(this.storagePath, fileId);
        
        // Create directory
        await fs.mkdir(fileDir, { recursive: true });
        
        // Process file in chunks
        const chunks = [];
        for (let i = 0; i < fileData.length; i += this.chunkSize) {
            const chunk = fileData.slice(i, i + this.chunkSize);
            
            // Encrypt chunk
            const encryptedChunk = this.storage.encryptData(
                chunk,
                `chunk:${fileId}:${i}`
            );
            
            // Store chunk
            const chunkPath = path.join(fileDir, `chunk_${i}.enc`);
            await fs.writeFile(
                chunkPath,
                JSON.stringify(encryptedChunk),
                'utf8'
            );
            
            chunks.push({
                sequence: i,
                size: chunk.length,
                path: chunkPath
            });
        }
        
        // Store encrypted metadata
        const fileMetadata = {
            originalPath: filePath,
            size: fileData.length,
            chunks,
            createdAt: new Date().toISOString(),
            customMetadata: metadata
        };
        
        const encryptedMetadata = this.storage.encryptData(
            fileMetadata,
            `metadata:${fileId}`
        );
        
        await fs.writeFile(
            path.join(fileDir, 'metadata.enc'),
            JSON.stringify(encryptedMetadata),
            'utf8'
        );
        
        return fileId;
    }
    
    async retrieveFile(fileId) {
        const fileDir = path.join(this.storagePath, fileId);
        
        // Load metadata
        const encryptedMetadata = JSON.parse(
            await fs.readFile(path.join(fileDir, 'metadata.enc'), 'utf8')
        );
        
        const metadata = this.storage.decryptData(
            encryptedMetadata,
            `metadata:${fileId}`
        );
        
        // Reconstruct file
        const chunks = [];
        for (const chunkInfo of metadata.chunks.sort((a, b) => a.sequence - b.sequence)) {
            const encryptedChunk = JSON.parse(
                await fs.readFile(chunkInfo.path, 'utf8')
            );
            
            const chunk = this.storage.decryptData(
                encryptedChunk,
                `chunk:${fileId}:${chunkInfo.sequence}`
            );
            
            chunks.push(Buffer.from(chunk));
        }
        
        return {
            data: Buffer.concat(chunks),
            metadata
        };
    }
    
    async secureDelete(fileId) {
        const fileDir = path.join(this.storagePath, fileId);
        
        // Overwrite files before deletion
        const files = await fs.readdir(fileDir);
        for (const file of files) {
            const filePath = path.join(fileDir, file);
            const stats = await fs.stat(filePath);
            
            // Overwrite with random data
            const randomData = crypto.randomBytes(stats.size);
            await fs.writeFile(filePath, randomData);
            
            // Delete file
            await fs.unlink(filePath);
        }
        
        // Remove directory
        await fs.rmdir(fileDir);
    }
}

Secure data storage and encryption are fundamental to protecting sensitive information in modern applications. By implementing proper encryption at rest, secure key management, and encrypted search capabilities, developers can ensure data confidentiality while maintaining application functionality. These patterns provide a foundation for building compliant, secure data storage solutions that protect against unauthorized access and data breaches.## Secure API Development

APIs form the backbone of modern web applications, enabling communication between services, mobile apps, and third-party integrations. However, APIs also present unique security challenges, as they expose application functionality to the outside world. This chapter explores comprehensive strategies for building secure APIs in Python and JavaScript, covering authentication, authorization, rate limiting, input validation, and protection against common API vulnerabilities.