Implementing the Principle of Least Privilege

The principle of least privilege requires granting users and applications only the minimum permissions necessary for their functions. Database systems typically default to overly permissive configurations where users can create objects, view system catalogs, or access unrelated schemas. Proper privilege management requires understanding the granular permission models each database system provides and systematically restricting access.

Role-based access control (RBAC) provides a manageable approach to database permissions. Instead of granting permissions directly to users, create roles that represent different access levels and assign users to the appropriate roles. This abstraction simplifies permission management and auditing while keeping access controls consistent. Application-specific roles might include read-only reporting, transaction processing, and administrative maintenance, each with precisely defined permissions.

# Example: Implementing database access control in the application layer
import psycopg2
from psycopg2 import sql
from contextlib import contextmanager
from enum import Enum
import logging

class DatabaseRole(Enum):
    """Privilege levels the application can connect to the database with.

    Each member's value is the name of a PostgreSQL role.  In this file that
    value is also used as a prefix: the pool credentials are read from the
    config keys ``<value>_user`` / ``<value>_password`` and the session
    ``search_path`` is set to ``<value>_schema``.

    Member order matters: iterating the enum decides the order in which the
    per-role connection pools are created.
    """

    # Used by execute_read_query for plain read access.
    READ_ONLY = "app_reader"
    # Used by execute_write_operation for audited data modifications.
    READ_WRITE = "app_writer"
    # Administrative role; not used by any helper method in this file.
    ADMIN = "app_admin"
    # Reporting role; not used by any helper method in this file.
    REPORTING = "app_reporter"

class SecureDatabaseAccess:
    """Hand out PostgreSQL connections bound to a least-privilege role.

    One ``ThreadedConnectionPool`` is created per ``DatabaseRole`` member,
    each logging in with that role's own credentials.  Callers choose the
    privilege level of a unit of work by choosing which pool they borrow a
    connection from.

    ``config`` is subscripted with the keys ``host``, ``port``, ``database``,
    ``ssl_cert``, ``ssl_key``, ``ssl_ca`` and, for every role value ``r``,
    ``{r}_user`` and ``{r}_password``.
    """

    def __init__(self, config):
        """Store ``config`` and eagerly open one pool per role.

        Raises:
            KeyError: if a required key is missing from ``config``.
            Exception: whatever the driver raises when a pool cannot connect;
                pools opened before the failure are closed again first.
        """
        self.config = config
        self.connection_pools = {}
        self._initialize_pools()

    def close(self):
        """Close every pool and forget it; calling this twice is harmless."""
        pools, self.connection_pools = self.connection_pools, {}
        for pool in pools.values():
            # closeall() raises on a pool that is already closed.
            if not pool.closed:
                pool.closeall()

    def _initialize_pools(self):
        """Create separate connection pools for each role."""
        try:
            for role in DatabaseRole:
                self.connection_pools[role] = self._create_pool(role)
        except Exception:
            # Do not leak the connections of pools that were already opened
            # when a later role fails (bad credentials, missing config key).
            self.close()
            raise

    def _create_pool(self, role: "DatabaseRole"):
        """Create a connection pool that logs in with ``role``'s credentials.

        Every connection requires TLS with a client certificate and starts
        with a 30 s statement timeout and a 10 s lock timeout (both values
        are given to the server in milliseconds).
        """
        # ``import psycopg2`` does not load the ``pool`` submodule, so it has
        # to be imported explicitly before ThreadedConnectionPool is usable.
        from psycopg2 import pool as pg_pool

        return pg_pool.ThreadedConnectionPool(
            minconn=1,
            maxconn=10,
            host=self.config['host'],
            port=self.config['port'],
            database=self.config['database'],
            user=self.config[f'{role.value}_user'],
            password=self.config[f'{role.value}_password'],
            sslmode='require',
            sslcert=self.config['ssl_cert'],
            sslkey=self.config['ssl_key'],
            sslrootcert=self.config['ssl_ca'],
            connect_timeout=10,
            options='-c statement_timeout=30000 -c lock_timeout=10000'
        )

    @contextmanager
    def get_connection(self, role: "DatabaseRole"):
        """Borrow a connection from ``role``'s pool for one unit of work.

        The session is prepared with ``row_security = on`` and a
        ``search_path`` of ``<role value>_schema``.  Leaving the ``with``
        block normally commits; leaving it through an exception logs the
        error, rolls back and re-raises that same exception.  The connection
        always goes back to its pool.

        Raises:
            KeyError: if no pool exists for ``role``.
        """
        pool = self.connection_pools[role]
        conn = pool.getconn()

        try:
            # Set session parameters for additional security
            with conn.cursor() as cursor:
                cursor.execute("SET row_security = on")
                cursor.execute("SET search_path = %s", (f'{role.value}_schema',))

            yield conn
            conn.commit()

        except Exception as e:
            logging.error("Database error with role %s: %s", role.value, e)
            try:
                conn.rollback()
            except Exception:
                # A failed rollback (e.g. the connection is already gone)
                # must not replace the error the caller needs to see.
                logging.exception("Rollback failed for role %s", role.value)
            raise

        finally:
            pool.putconn(conn)

    def execute_read_query(self, query, params=None):
        """Run ``query`` as the read-only role and return all fetched rows.

        ``params`` is passed to the driver for parameter binding; never
        format untrusted values into ``query`` itself.
        """
        with self.get_connection(DatabaseRole.READ_ONLY) as conn:
            with conn.cursor() as cursor:
                cursor.execute(query, params)
                return cursor.fetchall()

    def execute_write_operation(self, operation_func):
        """Run ``operation_func(cursor)`` as the read-write role, with audit.

        The operation and its audit row share one transaction, so either
        both are committed or neither is.  Returns whatever
        ``operation_func`` returns.
        """
        with self.get_connection(DatabaseRole.READ_WRITE) as conn:
            with conn.cursor() as cursor:
                result = operation_func(cursor)
                # Audit log write operations
                self._audit_log_operation(
                    cursor, self._operation_name(operation_func)
                )
                return result

    @staticmethod
    def _operation_name(operation_func):
        """Return a stable audit label for ``operation_func``.

        Callables without ``__name__`` (``functools.partial`` objects,
        instances defining ``__call__``) are labelled with their type name
        instead of raising after the write has already been executed.
        """
        name = getattr(operation_func, '__name__', None)
        return name if name else type(operation_func).__name__

    def _audit_log_operation(self, cursor, operation_name):
        """Insert one ``audit_log`` row for ``operation_name`` via ``cursor``.

        The database fills in the acting role, the timestamp and the backend
        process id itself, so the application cannot spoof them.
        """
        cursor.execute("""
            INSERT INTO audit_log (operation, user_name, timestamp, session_id)
            VALUES (%s, current_user, current_timestamp, pg_backend_pid())
        """, (operation_name,))

# Database role creation script
def create_database_roles() -> str:
    """Return the PostgreSQL script that provisions the application roles.

    The function only builds the SQL text; it does not connect to or modify
    any database.  The script:

    * strips PUBLIC's default privileges on schema ``public`` and on the
      ``userdata`` database, then grants ``CONNECT`` back to the four
      application roles (without that grant they could not log in);
    * creates the login roles ``app_reader``, ``app_writer``,
      ``app_reporter`` and ``app_admin`` with per-role connection limits;
    * creates one ``<role>_schema`` per role;
    * grants table privileges in ``app_data`` and enables row-level
      security with per-tenant policies for ``app_writer``.

    The script references the database ``userdata``, the schema ``app_data``
    and the tables ``users``, ``transactions`` and ``aggregated_metrics``
    without creating them, so they must already exist.

    NOTE(security): the role passwords in the script are placeholders.
    Replace them with real secrets at deployment time and never commit real
    credentials to source control.
    """
    role_definitions = """
    -- Revoke default permissions
    REVOKE ALL ON SCHEMA public FROM PUBLIC;
    REVOKE ALL ON DATABASE userdata FROM PUBLIC;
    
    -- Create application roles
    -- NOTE: the passwords below are placeholders; substitute real secrets
    CREATE ROLE app_reader WITH LOGIN PASSWORD 'reader_password' CONNECTION LIMIT 20;
    CREATE ROLE app_writer WITH LOGIN PASSWORD 'writer_password' CONNECTION LIMIT 10;
    CREATE ROLE app_reporter WITH LOGIN PASSWORD 'reporter_password' CONNECTION LIMIT 5;
    CREATE ROLE app_admin WITH LOGIN PASSWORD 'admin_password' CONNECTION LIMIT 2;
    
    -- The REVOKE above also removed PUBLIC's default CONNECT privilege, so
    -- it has to be granted back explicitly or these roles cannot log in
    GRANT CONNECT ON DATABASE userdata TO app_reader, app_writer, app_reporter, app_admin;
    
    -- Create schemas for role isolation
    CREATE SCHEMA app_reader_schema AUTHORIZATION app_reader;
    CREATE SCHEMA app_writer_schema AUTHORIZATION app_writer;
    CREATE SCHEMA app_reporter_schema AUTHORIZATION app_reporter;
    CREATE SCHEMA app_admin_schema AUTHORIZATION app_admin;
    
    -- Grant specific permissions
    -- Reader: SELECT only on specific tables
    GRANT USAGE ON SCHEMA app_data TO app_reader;
    GRANT SELECT ON ALL TABLES IN SCHEMA app_data TO app_reader;
    ALTER DEFAULT PRIVILEGES IN SCHEMA app_data GRANT SELECT ON TABLES TO app_reader;
    
    -- Writer: INSERT, UPDATE, DELETE on transaction tables only
    GRANT USAGE ON SCHEMA app_data TO app_writer;
    GRANT SELECT, INSERT, UPDATE, DELETE ON app_data.users TO app_writer;
    GRANT SELECT, INSERT, UPDATE, DELETE ON app_data.transactions TO app_writer;
    GRANT USAGE ON ALL SEQUENCES IN SCHEMA app_data TO app_writer;
    
    -- Reporter: SELECT with specific row-level security
    GRANT USAGE ON SCHEMA app_data TO app_reporter;
    GRANT SELECT ON app_data.aggregated_metrics TO app_reporter;
    
    -- Enable row-level security
    ALTER TABLE app_data.users ENABLE ROW LEVEL SECURITY;
    ALTER TABLE app_data.transactions ENABLE ROW LEVEL SECURITY;
    
    -- Create row-level security policies
    CREATE POLICY user_isolation ON app_data.users
        FOR ALL TO app_writer
        USING (tenant_id = current_setting('app.current_tenant')::int);
    
    CREATE POLICY transaction_isolation ON app_data.transactions
        FOR ALL TO app_writer
        USING (tenant_id = current_setting('app.current_tenant')::int);
    """
    
    return role_definitions