Secure Password Storage Architecture
Secure Password Storage Architecture
Database design significantly impacts password security. Storing authentication data requires careful consideration of access patterns, encryption requirements, and breach mitigation. The principle of least privilege should guide all architectural decisions, limiting access to password hashes to only essential systems and personnel.
Separating authentication data from general application data provides crucial isolation. Dedicated authentication databases or schemas with restricted access prevent SQL injection in the main application from exposing password hashes. This separation also enables specialized security measures like database-level encryption, enhanced audit logging, and stricter access controls.
import os
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2
import psycopg2
from psycopg2.extras import RealDictCursor
class SecurePasswordStorage:
"""Secure password storage with encryption at rest"""
def __init__(self, db_config, encryption_key=None):
self.db_config = db_config
# Initialize encryption for sensitive data
if encryption_key:
self.cipher = Fernet(encryption_key)
else:
# Derive key from environment variable
master_key = os.environ.get('MASTER_KEY', '').encode()
kdf = PBKDF2(
algorithm=hashes.SHA256(),
length=32,
salt=b'app-specific-salt',
iterations=100000,
)
key = base64.urlsafe_b64encode(kdf.derive(master_key))
self.cipher = Fernet(key)
def create_secure_schema(self):
"""Create secure schema for authentication data"""
with psycopg2.connect(**self.db_config) as conn:
with conn.cursor() as cur:
# Create separate schema for auth data
cur.execute("""
CREATE SCHEMA IF NOT EXISTS auth_secure;
-- Restrict access to auth schema
REVOKE ALL ON SCHEMA auth_secure FROM PUBLIC;
GRANT USAGE ON SCHEMA auth_secure TO auth_service;
-- Main authentication table
CREATE TABLE IF NOT EXISTS auth_secure.user_credentials (
user_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
username VARCHAR(255) UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
hash_algorithm VARCHAR(50) NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
last_login TIMESTAMP WITH TIME ZONE,
failed_attempts INTEGER DEFAULT 0,
locked_until TIMESTAMP WITH TIME ZONE,
requires_reset BOOLEAN DEFAULT FALSE
);
-- Password history for preventing reuse
CREATE TABLE IF NOT EXISTS auth_secure.password_history (
history_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID REFERENCES auth_secure.user_credentials(user_id),
password_hash_encrypted TEXT NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Audit log for security events
CREATE TABLE IF NOT EXISTS auth_secure.auth_audit_log (
log_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID,
event_type VARCHAR(50) NOT NULL,
event_data JSONB,
ip_address INET,
user_agent TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Indexes for performance
CREATE INDEX idx_auth_username ON auth_secure.user_credentials(username);
CREATE INDEX idx_auth_audit_user ON auth_secure.auth_audit_log(user_id);
CREATE INDEX idx_auth_audit_created ON auth_secure.auth_audit_log(created_at);
-- Row-level security
ALTER TABLE auth_secure.user_credentials ENABLE ROW LEVEL SECURITY;
ALTER TABLE auth_secure.password_history ENABLE ROW LEVEL SECURITY;
ALTER TABLE auth_secure.auth_audit_log ENABLE ROW LEVEL SECURITY;
""")
def store_password_hash(self, username, password_hash, algorithm='argon2id'):
"""Store password hash with audit logging"""
with psycopg2.connect(**self.db_config) as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
# Store new hash
cur.execute("""
INSERT INTO auth_secure.user_credentials
(username, password_hash, hash_algorithm)
VALUES (%s, %s, %s)
RETURNING user_id
""", (username, password_hash, algorithm))
user_id = cur.fetchone()['user_id']
# Store encrypted copy in history
encrypted_hash = self.cipher.encrypt(password_hash.encode())
cur.execute("""
INSERT INTO auth_secure.password_history
(user_id, password_hash_encrypted)
VALUES (%s, %s)
""", (user_id, encrypted_hash.decode()))
# Audit log
self._log_event(cur, user_id, 'password_set', {
'algorithm': algorithm
})
return user_id
def _log_event(self, cursor, user_id, event_type, event_data,
ip_address=None, user_agent=None):
"""Log security event"""
cursor.execute("""
INSERT INTO auth_secure.auth_audit_log
(user_id, event_type, event_data, ip_address, user_agent)
VALUES (%s, %s, %s, %s, %s)
""", (user_id, event_type, json.dumps(event_data), ip_address, user_agent))