## Encryption at Rest Implementation
Encryption at rest protects data stored in databases, file systems, and backups from unauthorized access. Modern encryption standards use authenticated encryption modes that provide both confidentiality and integrity protection. Implementing encryption at rest requires careful key management, algorithm selection, and performance optimization to ensure security without significantly impacting application functionality.
# Python - Secure Data Storage and Encryption
import base64
import hashlib
import hmac
import json
import os
import re
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers.aead import AESGCM, ChaCha20Poly1305
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
class SecureDataStorage:
    """AES-256-GCM authenticated encryption for values stored at rest.

    Two independent 32-byte keys are derived from a single master key: one
    drives AES-GCM, the other ("mac") is the root from which the blind-index
    key is derived.
    """

    def __init__(self, master_key: bytes = None):
        """Derive the working keys and build the AES-GCM cipher.

        Args:
            master_key: Secret key material. A ``str`` is accepted and UTF-8
                encoded. When ``None``, the ``MASTER_ENCRYPTION_KEY``
                environment variable is used instead.

        Raises:
            ValueError: If no non-empty master key is available.
        """
        # Use environment variable or provided key
        if master_key is None:
            master_key = os.environ.get('MASTER_ENCRYPTION_KEY', '')
        if isinstance(master_key, str):
            master_key = master_key.encode()
        if not master_key:
            raise ValueError("Master encryption key not provided")
        # Derive encryption and MAC keys from master key
        self.encryption_key = self._derive_key(master_key, b'encryption', 32)
        self.mac_key = self._derive_key(master_key, b'mac', 32)
        # Initialize cipher
        self.cipher = AESGCM(self.encryption_key)
        # Default blind-index key, derived lazily on first use and then reused.
        self._blind_index_key = None

    def _derive_key(self, master_key: bytes, purpose: bytes, length: int) -> bytes:
        """Derive a purpose-specific key from ``master_key``.

        PBKDF2-HMAC-SHA256 is run with the purpose label as salt, so the same
        master key always yields the same key for a given purpose. These
        parameters must not change: doing so would make previously stored
        ciphertexts and blind indexes unusable.
        """
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=length,
            salt=purpose,
            iterations=100000,
            backend=default_backend()
        )
        return kdf.derive(master_key)

    @staticmethod
    def _serialize(data: Any) -> Tuple[bytes, str]:
        """Turn ``data`` into plaintext bytes plus a tag naming its encoding.

        Returns:
            ``(plaintext, tag)`` where tag is ``'bytes'``, ``'text'`` or
            ``'json'``. Anything that is not bytes-like, ``str``, ``None``,
            ``dict``, ``list``, ``bool``, ``int`` or ``float`` is stored as
            its ``str()`` form and tagged ``'text'``.
        """
        if isinstance(data, (bytes, bytearray, memoryview)):
            return bytes(data), 'bytes'
        if isinstance(data, str):
            return data.encode(), 'text'
        if data is None or isinstance(data, (dict, list, bool, int, float)):
            return json.dumps(data).encode(), 'json'
        return str(data).encode(), 'text'

    @staticmethod
    def _deserialize(plaintext: bytes, type_tag: Optional[str]) -> Any:
        """Decode decrypted bytes according to ``type_tag``.

        Envelopes written before the tag existed (or carrying an unknown tag)
        fall back to the original guess order: JSON, then text, then raw
        bytes. That guess cannot distinguish the string ``"123"`` from the
        number ``123``, which is why new envelopes carry an explicit tag.
        """
        if type_tag == 'bytes':
            return plaintext
        if type_tag == 'text':
            return plaintext.decode()
        if type_tag == 'json':
            return json.loads(plaintext.decode())
        # Legacy envelope: best-effort type guess.
        try:
            return json.loads(plaintext.decode())
        except (json.JSONDecodeError, UnicodeDecodeError):
            try:
                return plaintext.decode()
            except UnicodeDecodeError:
                return plaintext

    def encrypt_data(self, data: Any, associated_data: bytes = None) -> Dict[str, str]:
        """Encrypt ``data`` with AES-256-GCM.

        Args:
            data: Value to protect; see ``_serialize`` for how it is encoded.
            associated_data: Optional context bytes that are authenticated
                but not encrypted.

        Returns:
            A JSON-serialisable envelope with base64 ``nonce`` and
            ``ciphertext``, plus ``algorithm``, ``version``, ``type`` and,
            when associated data was supplied, base64 ``aad``.
        """
        plaintext, type_tag = self._serialize(data)
        # Generate nonce
        nonce = os.urandom(12)  # 96-bit nonce for AES-GCM
        # Encrypt with optional associated data
        ciphertext = self.cipher.encrypt(nonce, plaintext, associated_data)
        # Create storage format. NOTE: 'type' is a decoding hint only; it is
        # not covered by the GCM tag.
        encrypted_data = {
            'nonce': base64.b64encode(nonce).decode(),
            'ciphertext': base64.b64encode(ciphertext).decode(),
            'algorithm': 'AES-256-GCM',
            'version': '1.0',
            'type': type_tag,
        }
        if associated_data:
            encrypted_data['aad'] = base64.b64encode(associated_data).decode()
        return encrypted_data

    def decrypt_data(self, encrypted_data: Dict[str, str],
                     expected_aad: bytes = None) -> Any:
        """Decrypt an envelope produced by ``encrypt_data``.

        Args:
            encrypted_data: The stored envelope.
            expected_aad: When given, the envelope must carry exactly this
                associated data; an envelope without ``aad`` is rejected.

        Returns:
            The decoded value (see ``_deserialize``).

        Raises:
            ValueError: On a malformed envelope, unsupported algorithm,
                associated-data mismatch or failed authentication.
        """
        # Validate format
        required_fields = ('nonce', 'ciphertext', 'algorithm', 'version')
        if not all(field in encrypted_data for field in required_fields):
            raise ValueError("Invalid encrypted data format")
        # Check algorithm
        if encrypted_data['algorithm'] != 'AES-256-GCM':
            raise ValueError(f"Unsupported algorithm: {encrypted_data['algorithm']}")
        # Decode components
        nonce = base64.b64decode(encrypted_data['nonce'])
        ciphertext = base64.b64decode(encrypted_data['ciphertext'])
        # Handle associated data
        aad = None
        if 'aad' in encrypted_data:
            aad = base64.b64decode(encrypted_data['aad'])
        if expected_aad:
            # A missing stored AAD must fail here: comparing None would raise
            # TypeError, and skipping the check would drop the context binding.
            if aad is None or not hmac.compare_digest(aad, expected_aad):
                raise ValueError("Associated data mismatch")
        # Decrypt
        try:
            plaintext = self.cipher.decrypt(nonce, ciphertext, aad)
        except Exception as e:
            raise ValueError(f"Decryption failed: {str(e)}") from e
        return self._deserialize(plaintext, encrypted_data.get('type'))

    def encrypt_field(self, value: Any, field_name: str) -> str:
        """Encrypt one field value, bound to its field name.

        The field name is used as associated data, so a ciphertext copied
        into a different field fails to decrypt.

        Returns:
            The envelope as a single base64 string for database storage.
        """
        aad = f"field:{field_name}".encode()
        encrypted = self.encrypt_data(value, aad)
        return base64.b64encode(json.dumps(encrypted).encode()).decode()

    def decrypt_field(self, encrypted_value: str, field_name: str) -> Any:
        """Decrypt a value produced by ``encrypt_field`` for ``field_name``.

        Raises:
            ValueError: If the value is malformed or was encrypted for a
                different field.
        """
        # Decode from storage format
        encrypted_data = json.loads(base64.b64decode(encrypted_value))
        # Verify field context
        expected_aad = f"field:{field_name}".encode()
        return self.decrypt_data(encrypted_data, expected_aad)

    def create_encrypted_search_index(self, value: str,
                                      blind_index_key: bytes = None) -> str:
        """Create a blind index (keyed hash) for equality search.

        Args:
            value: Plaintext to index; lower-cased and stripped first so that
                lookups ignore case and surrounding whitespace.
            blind_index_key: Optional explicit HMAC key. By default a key
                derived from ``mac_key`` is used.

        Returns:
            Base64 of the first 16 bytes of HMAC-SHA256 over the normalised
            value.
        """
        if blind_index_key is None:
            # The derivation costs 100000 PBKDF2 iterations, so do it once
            # per instance rather than on every insert and search.
            blind_index_key = getattr(self, '_blind_index_key', None)
            if blind_index_key is None:
                blind_index_key = self._derive_key(self.mac_key, b'blind_index', 32)
                self._blind_index_key = blind_index_key
        # Normalize value for consistent indexing
        normalized = value.lower().strip()
        # Create keyed hash (blind index)
        h = hmac.new(blind_index_key, normalized.encode(), hashlib.sha256)
        # Return truncated hash as index
        return base64.b64encode(h.digest()[:16]).decode()
class EncryptedDatabase:
    """SQLite wrapper that encrypts selected columns transparently.

    Each encrypted column is stored as TEXT next to a ``<column>_blind_index``
    column used for equality search. The set of encrypted columns per table is
    recorded in the ``encryption_metadata`` table.
    """

    # Table and column names are interpolated into SQL text (identifiers
    # cannot be bound as parameters), so only plain identifiers are accepted.
    _IDENTIFIER_RE = re.compile(r'[A-Za-z_][A-Za-z0-9_]*')

    def __init__(self, db_path: str, encryption_key: bytes):
        """Remember the database path and build the field encryptor."""
        self.db_path = db_path
        self.storage = SecureDataStorage(encryption_key)
        self.encrypted_fields = set()

    @classmethod
    def _check_identifier(cls, name: str) -> str:
        """Return ``name`` if it is a plain SQL identifier, else raise ValueError."""
        if not isinstance(name, str) or not cls._IDENTIFIER_RE.fullmatch(name):
            raise ValueError(f"Invalid SQL identifier: {name!r}")
        return name

    @staticmethod
    def _encrypted_fields_for(conn, table_name: str) -> set:
        """Read the encrypted column names recorded for ``table_name``."""
        cursor = conn.execute(
            "SELECT field_name FROM encryption_metadata WHERE table_name = ?",
            (table_name,)
        )
        return {row['field_name'] for row in cursor}

    @contextmanager
    def get_connection(self):
        """Yield a connection with name-addressable rows; always close it."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
        finally:
            conn.close()

    def create_secure_table(self, table_name: str,
                            schema: Dict[str, str],
                            encrypted_fields: List[str]):
        """Create ``table_name`` and record which columns are encrypted.

        Args:
            table_name: Table to create (if it does not exist yet).
            schema: Column name -> SQL type. The type text is inserted into
                the DDL verbatim and must come from trusted code.
            encrypted_fields: Columns to store encrypted. Each is created as
                TEXT with an additional ``<name>_blind_index`` TEXT column.

        Raises:
            ValueError: If the table or any column name is not a plain SQL
                identifier.
        """
        self._check_identifier(table_name)
        for field in encrypted_fields:
            self._check_identifier(field)
        encrypted = set(encrypted_fields)
        # Build CREATE TABLE statement
        columns = []
        for field, field_type in schema.items():
            self._check_identifier(field)
            if field in encrypted:
                # Encrypted fields stored as TEXT
                columns.append(f"{field} TEXT")
                # Add blind index column
                columns.append(f"{field}_blind_index TEXT")
            else:
                columns.append(f"{field} {field_type}")
        create_sql = f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(columns)})"
        with self.get_connection() as conn:
            conn.execute(create_sql)
            # Store encryption metadata
            conn.execute("""
                CREATE TABLE IF NOT EXISTS encryption_metadata (
                    table_name TEXT,
                    field_name TEXT,
                    encryption_version TEXT,
                    PRIMARY KEY (table_name, field_name)
                )
            """)
            for field in encrypted_fields:
                conn.execute(
                    "INSERT OR REPLACE INTO encryption_metadata VALUES (?, ?, ?)",
                    (table_name, field, '1.0')
                )
            conn.commit()

    def insert_encrypted(self, table_name: str, data: Dict[str, Any]) -> int:
        """Insert a row, encrypting the columns registered as encrypted.

        Non-``None`` values of encrypted columns are replaced by their
        ciphertext and a blind index of ``str(value)`` is written alongside.

        Returns:
            The rowid of the inserted row.

        Raises:
            ValueError: If the table or any column name is not a plain SQL
                identifier.
        """
        self._check_identifier(table_name)
        with self.get_connection() as conn:
            # Get encrypted fields for this table
            encrypted_fields = self._encrypted_fields_for(conn, table_name)
            # Prepare data for insertion
            processed_data = {}
            for field, value in data.items():
                self._check_identifier(field)
                if field in encrypted_fields and value is not None:
                    # Encrypt field
                    processed_data[field] = self.storage.encrypt_field(value, field)
                    # Create blind index
                    processed_data[f"{field}_blind_index"] = \
                        self.storage.create_encrypted_search_index(str(value))
                else:
                    processed_data[field] = value
            # Build INSERT statement
            if processed_data:
                columns = ', '.join(processed_data)
                placeholders = ', '.join('?' for _ in processed_data)
                insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
            else:
                # "INSERT INTO t () VALUES ()" is not valid SQLite syntax.
                insert_sql = f"INSERT INTO {table_name} DEFAULT VALUES"
            cursor = conn.execute(insert_sql, list(processed_data.values()))
            conn.commit()
            return cursor.lastrowid

    def search_encrypted(self, table_name: str, field_name: str,
                         search_value: str) -> List[Dict[str, Any]]:
        """Find rows whose encrypted ``field_name`` equals ``search_value``.

        The match is made on the blind index, so it is an exact match after
        the index normalisation (lower-casing and stripping).

        Returns:
            Matching rows as dicts with encrypted columns decrypted and the
            blind-index columns removed.

        Raises:
            ValueError: If a name is not a plain SQL identifier, or
                ``field_name`` is not registered as encrypted for the table.
        """
        self._check_identifier(table_name)
        self._check_identifier(field_name)
        with self.get_connection() as conn:
            # Read the metadata once instead of once per result row.
            encrypted_fields = self._encrypted_fields_for(conn, table_name)
            if field_name not in encrypted_fields:
                raise ValueError(
                    f"Field {field_name!r} is not an encrypted field of {table_name!r}"
                )
            # Create blind index for search value
            blind_index = self.storage.create_encrypted_search_index(search_value)
            # Search using blind index
            cursor = conn.execute(
                f"SELECT * FROM {table_name} WHERE {field_name}_blind_index = ?",
                (blind_index,)
            )
            # Decrypt results
            results = []
            for row in cursor:
                decrypted_row = dict(row)
                for field in encrypted_fields:
                    if decrypted_row.get(field):
                        decrypted_row[field] = self.storage.decrypt_field(
                            decrypted_row[field], field
                        )
                    # Remove blind index from results
                    decrypted_row.pop(f"{field}_blind_index", None)
                results.append(decrypted_row)
            return results
class SecureFileStorage:
    """Encrypted file storage with chunking for large files.

    Every file gets its own directory under ``storage_path`` holding one
    encrypted JSON envelope per chunk (``chunk_<offset>.enc``) plus an
    encrypted ``metadata.enc``.
    """

    def __init__(self, storage_path: str, encryption_key: bytes):
        """Create the storage directory and the encryptor."""
        self.storage_path = storage_path
        self.storage = SecureDataStorage(encryption_key)
        self.chunk_size = 1024 * 1024  # 1MB chunks
        # Create storage directory
        os.makedirs(storage_path, exist_ok=True)

    def _file_dir(self, file_id: str) -> str:
        """Return the directory for ``file_id``.

        Raises:
            ValueError: If ``file_id`` is empty or resolves to a location
                outside (or equal to) the storage root, e.g. ``'../x'``.
        """
        if not isinstance(file_id, str) or not file_id:
            raise ValueError(f"Invalid file id: {file_id!r}")
        root = os.path.realpath(self.storage_path)
        candidate = os.path.realpath(os.path.join(root, file_id))
        if candidate == root or os.path.commonpath([root, candidate]) != root:
            raise ValueError(f"Invalid file id: {file_id!r}")
        return os.path.join(self.storage_path, file_id)

    def _decrypt_chunk(self, encrypted_chunk: Dict[str, str],
                       expected_aad: bytes) -> bytes:
        """Decrypt one chunk envelope and return the raw bytes.

        The cipher is called directly rather than through ``decrypt_data``
        because that method may hand text-like content back as ``str`` or as
        parsed JSON, which cannot be turned back into the original bytes
        reliably. ``expected_aad`` is what gets authenticated, so a chunk
        moved to another file or position fails verification.

        Raises:
            ValueError: If the envelope is malformed or authentication fails.
        """
        try:
            nonce = base64.b64decode(encrypted_chunk['nonce'])
            ciphertext = base64.b64decode(encrypted_chunk['ciphertext'])
        except (KeyError, TypeError, ValueError) as e:
            raise ValueError("Invalid encrypted data format") from e
        try:
            return self.storage.cipher.decrypt(nonce, ciphertext, expected_aad)
        except Exception as e:
            raise ValueError(f"Decryption failed: {str(e)}") from e

    def store_file(self, file_path: str, file_data: bytes,
                   metadata: Dict[str, Any] = None) -> str:
        """Encrypt ``file_data`` in chunks and store it with its metadata.

        Args:
            file_path: Original path of the file; only recorded in metadata.
            file_data: File content (bytes-like).
            metadata: Optional JSON-serialisable caller metadata.

        Returns:
            The generated file ID (URL- and filename-safe).

        Raises:
            TypeError: If ``file_data`` is not bytes-like.
        """
        if not isinstance(file_data, (bytes, bytearray, memoryview)):
            raise TypeError("file_data must be bytes-like")
        # Generate file ID. The URL-safe alphabet is used because standard
        # base64 can emit '/', which is a path separator.
        file_id = base64.urlsafe_b64encode(os.urandom(16)).decode()
        # Create file directory
        file_dir = os.path.join(self.storage_path, file_id)
        os.makedirs(file_dir, exist_ok=True)
        # Encrypt and store file in chunks
        view = memoryview(file_data)
        chunks = []
        for offset in range(0, len(view), self.chunk_size):
            chunk = bytes(view[offset:offset + self.chunk_size])
            # Binding file ID and offset into the AAD prevents chunks from
            # being swapped between files or reordered.
            encrypted_chunk = self.storage.encrypt_data(
                chunk,
                f"chunk:{file_id}:{offset}".encode()
            )
            # Store chunk
            chunk_path = os.path.join(file_dir, f"chunk_{offset}.enc")
            with open(chunk_path, 'w', encoding='utf-8') as f:
                json.dump(encrypted_chunk, f)
            chunks.append({
                'sequence': offset,
                'size': len(chunk),
                'path': chunk_path
            })
        # Encrypt and store metadata
        file_metadata = {
            'original_path': file_path,
            'size': len(view),
            'chunks': chunks,
            'created_at': datetime.now().isoformat(),
            'custom_metadata': metadata or {}
        }
        encrypted_metadata = self.storage.encrypt_data(
            file_metadata,
            f"metadata:{file_id}".encode()
        )
        metadata_path = os.path.join(file_dir, 'metadata.enc')
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(encrypted_metadata, f)
        return file_id

    def retrieve_file(self, file_id: str) -> Tuple[bytes, Dict[str, Any]]:
        """Load, verify and decrypt a stored file.

        Returns:
            ``(file_bytes, metadata)``.

        Raises:
            ValueError: If ``file_id`` is invalid, a chunk or the metadata
                fails authentication, or the reassembled size differs from
                the recorded size.
            FileNotFoundError: If the metadata or a chunk file is missing.
        """
        file_dir = self._file_dir(file_id)
        # Load and decrypt metadata
        metadata_path = os.path.join(file_dir, 'metadata.enc')
        with open(metadata_path, 'r', encoding='utf-8') as f:
            encrypted_metadata = json.load(f)
        metadata = self.storage.decrypt_data(
            encrypted_metadata,
            f"metadata:{file_id}".encode()
        )
        # Reconstruct file from chunks
        file_data = bytearray()
        for chunk_info in sorted(metadata['chunks'], key=lambda x: x['sequence']):
            sequence = int(chunk_info['sequence'])
            # Rebuild the path from the file directory instead of trusting
            # the absolute path recorded at store time; this also keeps files
            # readable after the storage root has been moved.
            chunk_path = os.path.join(file_dir, f"chunk_{sequence}.enc")
            with open(chunk_path, 'r', encoding='utf-8') as f:
                encrypted_chunk = json.load(f)
            file_data.extend(self._decrypt_chunk(
                encrypted_chunk,
                f"chunk:{file_id}:{sequence}".encode()
            ))
        expected_size = metadata.get('size')
        if expected_size is not None and len(file_data) != expected_size:
            raise ValueError("Stored file is incomplete or corrupted")
        return bytes(file_data), metadata
// JavaScript - Secure Data Storage and Encryption
const crypto = require('crypto');
const fs = require('fs').promises;
const path = require('path');
/**
 * AES-256-GCM authenticated encryption for values stored at rest.
 *
 * Two independent 32-byte keys are derived from one master key: one for
 * AES-GCM, one ("mac") as the root of the blind-index key.
 */
class SecureDataStorage {
  /**
   * @param {string|Buffer} [masterKey] Secret key material. Falls back to the
   *   MASTER_ENCRYPTION_KEY environment variable when omitted.
   * @throws {Error} If no master key is available.
   */
  constructor(masterKey) {
    if (!masterKey) {
      masterKey = process.env.MASTER_ENCRYPTION_KEY;
      if (!masterKey) {
        throw new Error('Master encryption key not provided');
      }
    }
    // Derive purpose-specific keys
    this.encryptionKey = this.deriveKey(masterKey, 'encryption', 32);
    this.macKey = this.deriveKey(masterKey, 'mac', 32);
    // Encryption configuration
    this.algorithm = 'aes-256-gcm';
    // 96-bit IV: the length GCM is specified for without extra hashing.
    // Decryption reads the IV from the envelope, so older envelopes written
    // with a 16-byte IV still decrypt.
    this.ivLength = 12;
    this.tagLength = 16;
    this.saltLength = 32;
    // Default blind-index key, derived lazily on first use and then reused.
    this.blindIndexKey = null;
  }

  /**
   * Derive a purpose-specific key with PBKDF2-HMAC-SHA256.
   *
   * The salt is the fixed string `${purpose}_salt`, so the same master key
   * always yields the same key per purpose. These parameters must not change,
   * or existing ciphertexts and blind indexes become unusable.
   *
   * @param {string|Buffer} masterKey
   * @param {string} purpose
   * @param {number} length Key length in bytes.
   * @returns {Buffer}
   */
  deriveKey(masterKey, purpose, length) {
    return crypto.pbkdf2Sync(
      masterKey,
      `${purpose}_salt`,
      100000,
      length,
      'sha256'
    );
  }

  /**
   * Encrypt a value.
   *
   * @param {*} data Buffers are encrypted as raw bytes, strings as UTF-8,
   *   objects/arrays/null/booleans/finite numbers as JSON, anything else as
   *   its String() form.
   * @param {string|Buffer|null} [associatedData] Context that is
   *   authenticated but not encrypted.
   * @returns {{iv: string, authTag: string, ciphertext: string,
   *   algorithm: string, version: string, type: string, aad: (string|undefined)}}
   */
  encryptData(data, associatedData = null) {
    // Buffers must be tested before the generic object case: typeof a Buffer
    // is 'object', and JSON-encoding it would inflate the payload and lose
    // the fact that it was binary.
    let plaintext;
    let type;
    if (Buffer.isBuffer(data)) {
      plaintext = data;
      type = 'bytes';
    } else if (typeof data === 'string') {
      plaintext = Buffer.from(data);
      type = 'text';
    } else if (
      typeof data === 'object' ||
      typeof data === 'boolean' ||
      (typeof data === 'number' && Number.isFinite(data))
    ) {
      plaintext = Buffer.from(JSON.stringify(data));
      type = 'json';
    } else {
      plaintext = Buffer.from(String(data));
      type = 'text';
    }
    // Generate IV
    const iv = crypto.randomBytes(this.ivLength);
    // Create cipher
    const cipher = crypto.createCipheriv(this.algorithm, this.encryptionKey, iv, {
      authTagLength: this.tagLength
    });
    // Set associated data if provided
    if (associatedData) {
      cipher.setAAD(Buffer.from(associatedData));
    }
    // Encrypt
    const encrypted = Buffer.concat([cipher.update(plaintext), cipher.final()]);
    // Get auth tag
    const authTag = cipher.getAuthTag();
    // Create storage format. NOTE: `type` is a decoding hint only; it is not
    // covered by the GCM tag.
    return {
      iv: iv.toString('base64'),
      authTag: authTag.toString('base64'),
      ciphertext: encrypted.toString('base64'),
      algorithm: this.algorithm,
      version: '1.0',
      type,
      aad: associatedData ? Buffer.from(associatedData).toString('base64') : undefined
    };
  }

  /**
   * Decrypt an envelope produced by encryptData.
   *
   * @param {object} encryptedData The stored envelope.
   * @param {string|Buffer|null} [expectedAAD] When given, the envelope must
   *   carry exactly this associated data; an envelope without `aad` is
   *   rejected.
   * @returns {*} The decoded value. Envelopes without a `type` tag fall back
   *   to the legacy guess (JSON, then string).
   * @throws {Error} On a malformed envelope, unsupported algorithm,
   *   associated-data mismatch or failed authentication.
   */
  decryptData(encryptedData, expectedAAD = null) {
    // Validate format. A type check is used instead of truthiness because
    // encrypting empty input yields an empty (falsy) ciphertext string.
    const required = ['iv', 'authTag', 'ciphertext', 'algorithm', 'version'];
    for (const field of required) {
      if (typeof encryptedData[field] !== 'string') {
        throw new Error(`Missing required field: ${field}`);
      }
    }
    // Check algorithm
    if (encryptedData.algorithm !== this.algorithm) {
      throw new Error(`Unsupported algorithm: ${encryptedData.algorithm}`);
    }
    // Decode components
    const iv = Buffer.from(encryptedData.iv, 'base64');
    const authTag = Buffer.from(encryptedData.authTag, 'base64');
    const ciphertext = Buffer.from(encryptedData.ciphertext, 'base64');
    // Only full-length tags are accepted; shorter tags are easier to forge.
    if (authTag.length !== this.tagLength) {
      throw new Error('Invalid authentication tag length');
    }
    // Handle associated data
    const aad = encryptedData.aad ? Buffer.from(encryptedData.aad, 'base64') : null;
    if (expectedAAD) {
      // The caller's context is authoritative: an envelope that carries no
      // AAD must not be accepted for a context-bound read.
      if (!aad || !aad.equals(Buffer.from(expectedAAD))) {
        throw new Error('Associated data mismatch');
      }
    }
    // Decrypt
    let decrypted;
    try {
      const decipher = crypto.createDecipheriv(this.algorithm, this.encryptionKey, iv, {
        authTagLength: this.tagLength
      });
      decipher.setAuthTag(authTag);
      if (aad) {
        decipher.setAAD(aad);
      }
      decrypted = Buffer.concat([decipher.update(ciphertext), decipher.final()]);
    } catch (error) {
      throw new Error(`Decryption failed: ${error.message}`);
    }
    switch (encryptedData.type) {
      case 'bytes':
        return decrypted;
      case 'text':
        return decrypted.toString();
      case 'json':
        return JSON.parse(decrypted.toString());
      default:
        // Legacy envelope without a type tag: try JSON, then string.
        try {
          return JSON.parse(decrypted.toString());
        } catch {
          const str = decrypted.toString();
          return str.length > 0 ? str : decrypted;
        }
    }
  }

  /**
   * Encrypt one field value, bound to its field name via the AAD, so a
   * ciphertext copied into another field fails to decrypt.
   *
   * @param {*} value
   * @param {string} fieldName
   * @returns {string} The envelope as one base64 string.
   */
  encryptField(value, fieldName) {
    const aad = `field:${fieldName}`;
    const encrypted = this.encryptData(value, aad);
    return Buffer.from(JSON.stringify(encrypted)).toString('base64');
  }

  /**
   * Decrypt a value produced by encryptField for `fieldName`.
   *
   * @param {string} encryptedValue
   * @param {string} fieldName
   * @returns {*}
   * @throws {Error} If the value is malformed or belongs to another field.
   */
  decryptField(encryptedValue, fieldName) {
    const encryptedData = JSON.parse(
      Buffer.from(encryptedValue, 'base64').toString()
    );
    const expectedAAD = `field:${fieldName}`;
    return this.decryptData(encryptedData, expectedAAD);
  }

  /**
   * Create a blind index (keyed hash) for equality search.
   *
   * @param {string} value Lower-cased and trimmed before hashing.
   * @param {Buffer|null} [blindIndexKey] Optional explicit HMAC key; by
   *   default a key derived from macKey is used.
   * @returns {string} Base64 of the first 16 bytes of HMAC-SHA256.
   */
  createBlindIndex(value, blindIndexKey = null) {
    if (!blindIndexKey) {
      // The derivation costs 100000 PBKDF2 iterations; do it once.
      if (!this.blindIndexKey) {
        this.blindIndexKey = this.deriveKey(this.macKey, 'blind_index', 32);
      }
      blindIndexKey = this.blindIndexKey;
    }
    // Normalize value
    const normalized = value.toLowerCase().trim();
    // Create HMAC
    const hmac = crypto.createHmac('sha256', blindIndexKey);
    hmac.update(normalized);
    // Return truncated hash
    return hmac.digest().subarray(0, 16).toString('base64');
  }
}
/**
 * Collection wrapper that encrypts selected document fields transparently.
 *
 * Each encrypted field is stored as ciphertext next to a `<field>_blind`
 * value used for equality search. The encrypted field list per collection is
 * persisted in the `_encryption_metadata` collection.
 */
class EncryptedDatabase {
  /**
   * @param {object} db Database handle exposing `collection(name)` with
   *   `updateOne`, `insertOne` and `find(...).toArray()`.
   * @param {string|Buffer} encryptionKey Master key for SecureDataStorage.
   */
  constructor(db, encryptionKey) {
    this.db = db; // Database connection (e.g., MongoDB, PostgreSQL)
    this.storage = new SecureDataStorage(encryptionKey);
    this.encryptedFields = new Map(); // table -> Set of field names
  }

  /**
   * Declare which fields of `collection` are encrypted and persist that.
   *
   * @param {string} collection
   * @param {string[]} fields
   * @returns {Promise<void>}
   */
  async defineEncryptedSchema(collection, fields) {
    // Define which fields should be encrypted
    this.encryptedFields.set(collection, new Set(fields));
    // Store metadata in database
    await this.db.collection('_encryption_metadata').updateOne(
      { collection },
      {
        $set: {
          collection,
          encryptedFields: fields,
          version: '1.0',
          updatedAt: new Date()
        }
      },
      { upsert: true }
    );
  }

  /**
   * Return the encrypted field set for `collection`.
   *
   * When this instance has no schema cached, the persisted metadata is
   * loaded. Without this, a fresh instance (e.g. after a restart) would
   * treat every field as plaintext and silently store and query unencrypted
   * values. Only non-empty results are cached, so a schema defined later by
   * another process is still picked up.
   *
   * @param {string} collection
   * @returns {Promise<Set<string>>}
   */
  async getEncryptedFields(collection) {
    const cached = this.encryptedFields.get(collection);
    if (cached) {
      return cached;
    }
    const [stored] = await this.db
      .collection('_encryption_metadata')
      .find({ collection })
      .toArray();
    const fields = new Set(
      stored && Array.isArray(stored.encryptedFields) ? stored.encryptedFields : []
    );
    if (fields.size > 0) {
      this.encryptedFields.set(collection, fields);
    }
    return fields;
  }

  /**
   * Insert a document, encrypting its registered fields.
   *
   * Non-null values of encrypted fields are replaced by ciphertext and a
   * blind index of `String(value)` is stored as `<field>_blind`.
   *
   * @param {string} collection
   * @param {object} document
   * @returns {Promise<*>} The inserted document's id.
   */
  async insertEncrypted(collection, document) {
    const encryptedFields = await this.getEncryptedFields(collection);
    const processedDoc = { ...document };
    // Encrypt specified fields
    for (const field of encryptedFields) {
      const value = document[field];
      if (value !== undefined && value !== null) {
        processedDoc[field] = this.storage.encryptField(value, field);
        // Create blind index for searching
        processedDoc[`${field}_blind`] = this.storage.createBlindIndex(String(value));
      }
    }
    // Insert document
    const result = await this.db.collection(collection).insertOne(processedDoc);
    return result.insertedId;
  }

  /**
   * Query a collection; equality conditions on encrypted fields are
   * rewritten to blind-index lookups and results are decrypted.
   *
   * @param {string} collection
   * @param {object} query
   * @param {object} [options] Passed through to `find`.
   * @returns {Promise<object[]>}
   * @throws {Error} If an encrypted field is queried with an operator
   *   object such as `{ $in: [...] }`; only equality is supported.
   */
  async findEncrypted(collection, query, options = {}) {
    const encryptedFields = await this.getEncryptedFields(collection);
    const processedQuery = { ...query };
    // Convert encrypted field queries to blind index queries
    for (const field of Object.keys(query)) {
      if (!encryptedFields.has(field)) {
        continue;
      }
      const searchValue = query[field];
      // An operator object would be stringified to "[object Object]" and
      // silently match nothing; fail loudly instead.
      if (
        searchValue !== null &&
        typeof searchValue === 'object' &&
        Object.keys(searchValue).some((key) => key.startsWith('$'))
      ) {
        throw new Error(`Encrypted field "${field}" only supports equality search`);
      }
      delete processedQuery[field];
      processedQuery[`${field}_blind`] = this.storage.createBlindIndex(
        String(searchValue)
      );
    }
    // Execute query
    const cursor = await this.db.collection(collection).find(processedQuery, options);
    const results = await cursor.toArray();
    // Decrypt results
    return results.map((doc) => this.decryptDocument(collection, doc));
  }

  /**
   * Return a copy of `document` with encrypted fields decrypted and
   * blind-index fields removed.
   *
   * Best effort by design: a field that fails to decrypt is logged and set
   * to null rather than failing the whole document.
   *
   * @param {string} collection
   * @param {object} document
   * @returns {object}
   */
  decryptDocument(collection, document) {
    const encryptedFields = this.encryptedFields.get(collection) || new Set();
    const decryptedDoc = { ...document };
    // Decrypt fields
    for (const field of encryptedFields) {
      if (decryptedDoc[field]) {
        try {
          decryptedDoc[field] = this.storage.decryptField(decryptedDoc[field], field);
        } catch (error) {
          console.error(`Failed to decrypt field ${field}:`, error);
          decryptedDoc[field] = null;
        }
      }
      // Remove blind index from results
      delete decryptedDoc[`${field}_blind`];
    }
    return decryptedDoc;
  }
}
/**
 * Encrypted file storage with chunking for large files.
 *
 * Every file gets its own directory under `storagePath` holding one
 * encrypted JSON envelope per chunk (`chunk_<offset>.enc`) plus an encrypted
 * `metadata.enc`.
 */
class SecureFileStorage {
  /**
   * @param {string} storagePath Root directory for stored files.
   * @param {string|Buffer} encryptionKey Master key for SecureDataStorage.
   */
  constructor(storagePath, encryptionKey) {
    this.storagePath = storagePath;
    this.storage = new SecureDataStorage(encryptionKey);
    this.chunkSize = 1024 * 1024; // 1MB chunks
  }

  /**
   * Return the directory for `fileId`.
   *
   * @param {string} fileId
   * @returns {string}
   * @throws {Error} If `fileId` is empty or resolves outside (or to) the
   *   storage root, e.g. `'../x'`.
   */
  resolveFileDir(fileId) {
    const root = path.resolve(this.storagePath);
    const resolved = typeof fileId === 'string' && fileId ? path.resolve(root, fileId) : root;
    if (resolved === root || !resolved.startsWith(root + path.sep)) {
      throw new Error(`Invalid file id: ${fileId}`);
    }
    return path.join(this.storagePath, fileId);
  }

  /**
   * Encrypt `fileData` in chunks and store it with its metadata.
   *
   * @param {string} filePath Original path; only recorded in metadata.
   * @param {Buffer|string|Uint8Array} fileData File content; non-Buffer
   *   input is converted with Buffer.from.
   * @param {object} [metadata] JSON-serialisable caller metadata.
   * @returns {Promise<string>} The generated file id.
   */
  async storeFile(filePath, fileData, metadata = {}) {
    // Work on bytes throughout so sizes and offsets are byte counts.
    const content = Buffer.isBuffer(fileData) ? fileData : Buffer.from(fileData);
    // Generate file ID
    const fileId = crypto.randomBytes(16).toString('base64url');
    const fileDir = path.join(this.storagePath, fileId);
    // Create directory
    await fs.mkdir(fileDir, { recursive: true });
    // Process file in chunks
    const chunks = [];
    for (let offset = 0; offset < content.length; offset += this.chunkSize) {
      const chunk = content.subarray(offset, offset + this.chunkSize);
      // Binding file id and offset into the AAD prevents chunks from being
      // swapped between files or reordered.
      const encryptedChunk = this.storage.encryptData(
        chunk,
        `chunk:${fileId}:${offset}`
      );
      // Store chunk
      const chunkPath = path.join(fileDir, `chunk_${offset}.enc`);
      await fs.writeFile(chunkPath, JSON.stringify(encryptedChunk), 'utf8');
      chunks.push({
        sequence: offset,
        size: chunk.length,
        path: chunkPath
      });
    }
    // Store encrypted metadata
    const fileMetadata = {
      originalPath: filePath,
      size: content.length,
      chunks,
      createdAt: new Date().toISOString(),
      customMetadata: metadata
    };
    const encryptedMetadata = this.storage.encryptData(
      fileMetadata,
      `metadata:${fileId}`
    );
    await fs.writeFile(
      path.join(fileDir, 'metadata.enc'),
      JSON.stringify(encryptedMetadata),
      'utf8'
    );
    return fileId;
  }

  /**
   * Load, verify and decrypt a stored file.
   *
   * @param {string} fileId
   * @returns {Promise<{data: Buffer, metadata: object}>}
   * @throws {Error} If `fileId` is invalid, a file is missing, or the
   *   metadata or a chunk fails authentication.
   */
  async retrieveFile(fileId) {
    const fileDir = this.resolveFileDir(fileId);
    // Load metadata
    const encryptedMetadata = JSON.parse(
      await fs.readFile(path.join(fileDir, 'metadata.enc'), 'utf8')
    );
    const metadata = this.storage.decryptData(
      encryptedMetadata,
      `metadata:${fileId}`
    );
    // Reconstruct file
    const ordered = [...metadata.chunks].sort((a, b) => a.sequence - b.sequence);
    const parts = [];
    for (const chunkInfo of ordered) {
      // Rebuild the path from the file directory instead of trusting the
      // absolute path recorded at store time; this also keeps files
      // readable after the storage root has been moved.
      const chunkPath = path.join(fileDir, `chunk_${Number(chunkInfo.sequence)}.enc`);
      const encryptedChunk = JSON.parse(await fs.readFile(chunkPath, 'utf8'));
      const chunk = this.storage.decryptData(
        encryptedChunk,
        `chunk:${fileId}:${chunkInfo.sequence}`
      );
      // decryptData may return a Buffer or, for envelopes that stored the
      // chunk as JSON, a `{type: 'Buffer', data: [...]}` object that
      // Buffer.from understands.
      parts.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
    }
    return {
      data: Buffer.concat(parts),
      metadata
    };
  }

  /**
   * Overwrite every file of `fileId` with random bytes, then delete it.
   *
   * NOTE(review): overwriting in place is best effort; whether the old
   * blocks are really replaced depends on the file system and storage
   * device, which this code cannot verify.
   *
   * @param {string} fileId
   * @returns {Promise<void>}
   * @throws {Error} If `fileId` is invalid.
   */
  async secureDelete(fileId) {
    const fileDir = this.resolveFileDir(fileId);
    // Overwrite files before deletion
    const entries = await fs.readdir(fileDir);
    for (const entry of entries) {
      const target = path.join(fileDir, entry);
      const stats = await fs.stat(target);
      // Overwrite with random data
      await fs.writeFile(target, crypto.randomBytes(stats.size));
      // Delete file
      await fs.unlink(target);
    }
    // Remove directory
    await fs.rmdir(fileDir);
  }
}
Secure data storage and encryption are fundamental to protecting sensitive information in modern applications. By implementing proper encryption at rest, secure key management, and encrypted search capabilities, developers can ensure data confidentiality while maintaining application functionality. These patterns provide a foundation for building compliant, secure data storage solutions that protect against unauthorized access and data breaches.

## Secure API Development
APIs form the backbone of modern web applications, enabling communication between services, mobile apps, and third-party integrations. However, APIs also present unique security challenges, as they expose application functionality to the outside world. This chapter explores comprehensive strategies for building secure APIs in Python and JavaScript, covering authentication, authorization, rate limiting, input validation, and protection against common API vulnerabilities.