Secure Password Storage Architecture

Database design significantly impacts password security. Storing authentication data requires careful consideration of access patterns, encryption requirements, and breach mitigation. The principle of least privilege should guide all architectural decisions, limiting access to password hashes to only essential systems and personnel.

Separating authentication data from general application data provides crucial isolation. A dedicated authentication database or schema keeps a SQL injection flaw in the main application from exposing password hashes, provided the main application's database role has no privileges on it. This separation also enables specialized security measures such as database-level encryption, enhanced audit logging, and stricter access controls.

import base64
import json
import os

import psycopg2
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
# The library only exports PBKDF2HMAC; the old PBKDF2 name is kept as an alias.
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC as PBKDF2
from psycopg2.extras import RealDictCursor

class SecurePasswordStorage:
    """Persist password hashes in a dedicated ``auth_secure`` PostgreSQL schema.

    Password hashes are written to ``auth_secure.user_credentials``. A
    Fernet-encrypted copy of each hash is appended to
    ``auth_secure.password_history`` and every write is recorded in
    ``auth_secure.auth_audit_log``.
    """

    # Parameters for deriving the Fernet key from MASTER_KEY. Changing either
    # value yields a different key, so rows already encrypted in
    # password_history could no longer be decrypted.
    _KDF_SALT = b'app-specific-salt'
    _KDF_ITERATIONS = 100000

    def __init__(self, db_config, encryption_key=None):
        """Configure the database settings and the Fernet cipher.

        Args:
            db_config: Keyword arguments passed to ``psycopg2.connect``.
            encryption_key: Optional Fernet key. When omitted (or falsy), the
                key is derived with PBKDF2-HMAC-SHA256 from the ``MASTER_KEY``
                environment variable.

        Raises:
            ValueError: If no ``encryption_key`` is given and ``MASTER_KEY``
                is unset or empty. Deriving a key from an empty secret would
                produce a key that anyone can reproduce.
        """
        self.db_config = db_config

        if encryption_key:
            self.cipher = Fernet(encryption_key)
            return

        master_key = os.environ.get('MASTER_KEY', '')
        if not master_key:
            raise ValueError(
                "MASTER_KEY environment variable must be set when no "
                "encryption_key is provided"
            )

        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=self._KDF_SALT,
            iterations=self._KDF_ITERATIONS,
        )
        # Fernet expects a url-safe base64 encoding of a 32-byte key.
        key = base64.urlsafe_b64encode(kdf.derive(master_key.encode()))
        self.cipher = Fernet(key)

    def create_secure_schema(self):
        """Create the ``auth_secure`` schema, its tables and its indexes.

        Every CREATE statement uses ``IF NOT EXISTS`` so the method can be run
        repeatedly. All statements run in a single transaction.

        NOTE(review): the script grants USAGE to a role named ``auth_service``
        and calls ``gen_random_uuid()``; both are assumed to exist in the
        target database - confirm for your deployment.
        """
        conn = psycopg2.connect(**self.db_config)
        try:
            # `with conn` commits on success and rolls back on error, but it
            # does not close the connection; the `finally` below does that.
            with conn:
                with conn.cursor() as cur:
                    cur.execute("""
                        CREATE SCHEMA IF NOT EXISTS auth_secure;

                        -- Restrict access to auth schema
                        REVOKE ALL ON SCHEMA auth_secure FROM PUBLIC;
                        GRANT USAGE ON SCHEMA auth_secure TO auth_service;

                        -- Main authentication table
                        CREATE TABLE IF NOT EXISTS auth_secure.user_credentials (
                            user_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                            username VARCHAR(255) UNIQUE NOT NULL,
                            password_hash TEXT NOT NULL,
                            hash_algorithm VARCHAR(50) NOT NULL,
                            created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                            updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                            last_login TIMESTAMP WITH TIME ZONE,
                            failed_attempts INTEGER DEFAULT 0,
                            locked_until TIMESTAMP WITH TIME ZONE,
                            requires_reset BOOLEAN DEFAULT FALSE
                        );

                        -- Password history for preventing reuse
                        CREATE TABLE IF NOT EXISTS auth_secure.password_history (
                            history_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                            user_id UUID REFERENCES auth_secure.user_credentials(user_id),
                            password_hash_encrypted TEXT NOT NULL,
                            created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
                        );

                        -- Audit log for security events
                        CREATE TABLE IF NOT EXISTS auth_secure.auth_audit_log (
                            log_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                            user_id UUID,
                            event_type VARCHAR(50) NOT NULL,
                            event_data JSONB,
                            ip_address INET,
                            user_agent TEXT,
                            created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
                        );

                        -- Indexes for performance (IF NOT EXISTS keeps reruns safe)
                        CREATE INDEX IF NOT EXISTS idx_auth_username
                            ON auth_secure.user_credentials(username);
                        CREATE INDEX IF NOT EXISTS idx_auth_audit_user
                            ON auth_secure.auth_audit_log(user_id);
                        CREATE INDEX IF NOT EXISTS idx_auth_audit_created
                            ON auth_secure.auth_audit_log(created_at);

                        -- Row-level security
                        -- NOTE(review): no policies are created here, so roles
                        -- other than the table owner see no rows until
                        -- policies are added.
                        ALTER TABLE auth_secure.user_credentials ENABLE ROW LEVEL SECURITY;
                        ALTER TABLE auth_secure.password_history ENABLE ROW LEVEL SECURITY;
                        ALTER TABLE auth_secure.auth_audit_log ENABLE ROW LEVEL SECURITY;
                    """)
        finally:
            conn.close()

    def store_password_hash(self, username, password_hash, algorithm='argon2id'):
        """Insert a new credential row, its history entry and an audit record.

        The three inserts share one transaction: either all are committed or
        none are.

        Args:
            username: Login name; the column is declared UNIQUE, so inserting
                an existing username makes the database reject the statement.
            password_hash: Already-computed password hash as ``str``. This
                method does not hash anything itself.
            algorithm: Name of the algorithm that produced ``password_hash``,
                stored alongside it.

        Returns:
            The ``user_id`` generated by the database for the new row.
        """
        conn = psycopg2.connect(**self.db_config)
        try:
            # `with conn` only ends the transaction; the connection itself is
            # closed in the `finally` below.
            with conn:
                with conn.cursor(cursor_factory=RealDictCursor) as cur:
                    # Store new hash
                    cur.execute("""
                        INSERT INTO auth_secure.user_credentials
                        (username, password_hash, hash_algorithm)
                        VALUES (%s, %s, %s)
                        RETURNING user_id
                    """, (username, password_hash, algorithm))
                    user_id = cur.fetchone()['user_id']

                    # Store encrypted copy in history. The Fernet token is
                    # decoded to text to fit the TEXT column.
                    encrypted_hash = self.cipher.encrypt(password_hash.encode())
                    cur.execute("""
                        INSERT INTO auth_secure.password_history
                        (user_id, password_hash_encrypted)
                        VALUES (%s, %s)
                    """, (user_id, encrypted_hash.decode()))

                    # Audit log
                    self._log_event(cur, user_id, 'password_set', {
                        'algorithm': algorithm
                    })
        finally:
            conn.close()

        return user_id

    def _log_event(self, cursor, user_id, event_type, event_data,
                   ip_address=None, user_agent=None):
        """Insert one row into ``auth_secure.auth_audit_log``.

        The insert runs on the caller's cursor, so it belongs to the caller's
        transaction.

        Args:
            cursor: Open database cursor to execute the INSERT on.
            user_id: Identifier of the user the event concerns.
            event_type: Short event name, e.g. ``'password_set'``.
            event_data: JSON-serialisable payload, stored in a JSONB column.
            ip_address: Optional client address.
            user_agent: Optional client user-agent string.
        """
        cursor.execute("""
            INSERT INTO auth_secure.auth_audit_log
            (user_id, event_type, event_data, ip_address, user_agent)
            VALUES (%s, %s, %s, %s, %s)
        """, (user_id, event_type, json.dumps(event_data), ip_address, user_agent))