Implementing Principle of Least Privilege
Implementing Principle of Least Privilege
The principle of least privilege requires granting users and applications only the minimum permissions necessary for their functions. Database systems typically default to overly permissive configurations where users can create objects, view system catalogs, or access unrelated schemas. Proper privilege management requires understanding the granular permission models each database system provides and systematically restricting access.
Role-based access control (RBAC) provides a manageable approach to database permissions. Instead of granting permissions directly to users, create roles representing different access levels and assign users to the appropriate roles. This abstraction simplifies permission management and auditing while ensuring consistent access controls. Application-specific roles might include read-only reporting, transaction processing, and administrative maintenance, each with precisely defined permissions.
# Example: Implementing database access control in the application layer
import psycopg2
from psycopg2 import sql
from contextlib import contextmanager
from enum import Enum
import logging
class DatabaseRole(Enum):
    """Access levels the application can request for a database connection.

    Each member's value is the name of the PostgreSQL role used for that
    access level. The value is also used as a prefix when looking up
    credentials in the configuration and when building the session
    ``search_path``.
    """

    # SELECT-only access.
    READ_ONLY = "app_reader"

    # Read and modify access for transaction processing.
    READ_WRITE = "app_writer"

    # Administrative maintenance.
    ADMIN = "app_admin"

    # Reporting queries.
    REPORTING = "app_reporter"
class SecureDatabaseAccess:
    """Route database work through per-role connection pools.

    One pool is created for every ``DatabaseRole`` member, and each pool logs
    in with that role's own credentials. A code path can therefore only do
    what the PostgreSQL role behind its pool is permitted to do.

    ``config`` is a mapping that must provide ``host``, ``port``,
    ``database``, ``ssl_cert``, ``ssl_key`` and ``ssl_ca``, plus
    ``<role value>_user`` and ``<role value>_password`` for every role
    (for example ``app_reader_user`` / ``app_reader_password``).
    """

    def __init__(self, config):
        """Store ``config`` and open one connection pool per role."""
        self.config = config
        self.connection_pools = {}
        self._initialize_pools()

    def _initialize_pools(self):
        """Create separate connection pools for each role."""
        for role in DatabaseRole:
            self.connection_pools[role] = self._create_pool(role)

    def _create_pool(self, role: "DatabaseRole"):
        """Create a connection pool that logs in with ``role``'s credentials.

        Raises:
            KeyError: if a required key is missing from ``self.config``.
        """
        # ``import psycopg2`` alone does not load the ``pool`` submodule, so
        # ``psycopg2.pool`` can fail with AttributeError; import it explicitly.
        from psycopg2 import pool as pg_pool

        return pg_pool.ThreadedConnectionPool(
            minconn=1,
            maxconn=10,
            host=self.config['host'],
            port=self.config['port'],
            database=self.config['database'],
            user=self.config[f'{role.value}_user'],
            password=self.config[f'{role.value}_password'],
            sslmode='require',
            sslcert=self.config['ssl_cert'],
            sslkey=self.config['ssl_key'],
            sslrootcert=self.config['ssl_ca'],
            connect_timeout=10,
            # Server-side limits in milliseconds: 30 s per statement,
            # 10 s waiting for a lock.
            options='-c statement_timeout=30000 -c lock_timeout=10000',
        )

    @contextmanager
    def get_connection(self, role: "DatabaseRole", *, tenant_id=None):
        """Yield a pooled connection for ``role`` inside a managed transaction.

        The transaction is committed when the ``with`` body finishes normally.
        On any ``Exception`` it is rolled back, the error is logged, and the
        exception is re-raised. The connection always goes back to its pool.

        Args:
            role: Selects the pool and the ``<role value>_schema`` search path.
            tenant_id: Optional tenant identifier. When given, it is stored in
                ``app.current_tenant`` for the current transaction only; the
                row-level security policies in ``create_database_roles`` read
                that setting. Because it is transaction-local, it does not
                leak to the next borrower of the pooled connection, and it is
                cleared if the caller commits early.

        Raises:
            KeyError: if no pool exists for ``role``.
        """
        pool = self.connection_pools[role]
        conn = pool.getconn()
        try:
            # Set session parameters for additional security
            with conn.cursor() as cursor:
                cursor.execute("SET row_security = on")
                cursor.execute("SET search_path = %s", (f'{role.value}_schema',))
                if tenant_id is not None:
                    cursor.execute(
                        "SELECT set_config('app.current_tenant', %s, true)",
                        (str(tenant_id),),
                    )
            yield conn
            conn.commit()
        except Exception as exc:
            # A rollback on a dead connection raises too; do not let that
            # replace the error the caller actually needs to see.
            try:
                conn.rollback()
            except Exception as rollback_exc:
                logging.warning(
                    "Rollback failed for role %s: %s", role.value, rollback_exc
                )
            logging.error("Database error with role %s: %s", role.value, exc)
            raise
        finally:
            pool.putconn(conn)

    def execute_read_query(self, query, params=None):
        """Run ``query`` with the read-only role and return all fetched rows."""
        with self.get_connection(DatabaseRole.READ_ONLY) as conn:
            with conn.cursor() as cursor:
                cursor.execute(query, params)
                return cursor.fetchall()

    def execute_write_operation(self, operation_func, *, tenant_id=None):
        """Run ``operation_func(cursor)`` with the read-write role and audit it.

        The operation and its audit-log row share one transaction, so both are
        committed or both are rolled back.

        Args:
            operation_func: Callable that receives an open cursor.
            tenant_id: Optional tenant identifier passed on to
                ``get_connection``.

        Returns:
            Whatever ``operation_func`` returns.
        """
        with self.get_connection(DatabaseRole.READ_WRITE, tenant_id=tenant_id) as conn:
            with conn.cursor() as cursor:
                result = operation_func(cursor)
                # Audit log write operations
                self._audit_log_operation(
                    cursor, self._operation_name(operation_func)
                )
                return result

    @staticmethod
    def _operation_name(operation_func):
        """Return a printable name for any callable.

        ``functools.partial`` objects and instances with ``__call__`` have no
        ``__name__``. Reading it directly would raise after the write has
        already run, which would roll the write back.
        """
        name = getattr(operation_func, "__name__", None)
        if name is None:
            # functools.partial exposes the wrapped callable as ``.func``.
            wrapped = getattr(operation_func, "func", None)
            name = getattr(wrapped, "__name__", None)
        return name or type(operation_func).__name__

    def _audit_log_operation(self, cursor, operation_name):
        """Insert an ``audit_log`` row for ``operation_name``.

        The database fills in the current user, the current timestamp, and the
        backend process id (stored in the ``session_id`` column).
        """
        cursor.execute("""
            INSERT INTO audit_log (operation, user_name, timestamp, session_id)
            VALUES (%s, current_user, current_timestamp, pg_backend_pid())
        """, (operation_name,))
# Database role creation script
def create_database_roles():
    """Return the SQL script that provisions the application's database roles.

    Nothing is executed here; the caller receives the script as one string.
    The script revokes the default PUBLIC privileges, creates the four
    application login roles and a schema for each, grants per-role table
    privileges, and enables tenant-scoped row-level security on
    ``app_data.users`` and ``app_data.transactions``.

    The script does not create the ``userdata`` database, the ``app_data``
    schema, or its tables, so it presumably has to run against a database
    where they already exist. The passwords in the script are placeholders
    and must be replaced before it is used.
    """
    role_definitions = """
    -- Revoke default permissions
    REVOKE ALL ON SCHEMA public FROM PUBLIC;
    REVOKE ALL ON DATABASE userdata FROM PUBLIC;
    -- Create application roles
    -- NOTE: the passwords below are placeholders; never deploy them as-is.
    CREATE ROLE app_reader WITH LOGIN PASSWORD 'reader_password' CONNECTION LIMIT 20;
    CREATE ROLE app_writer WITH LOGIN PASSWORD 'writer_password' CONNECTION LIMIT 10;
    CREATE ROLE app_reporter WITH LOGIN PASSWORD 'reporter_password' CONNECTION LIMIT 5;
    CREATE ROLE app_admin WITH LOGIN PASSWORD 'admin_password' CONNECTION LIMIT 2;
    -- Revoking ALL from PUBLIC above also removed the default CONNECT
    -- privilege, so the application roles must be granted it explicitly.
    GRANT CONNECT ON DATABASE userdata TO app_reader, app_writer, app_reporter, app_admin;
    -- Create schemas for role isolation
    CREATE SCHEMA app_reader_schema AUTHORIZATION app_reader;
    CREATE SCHEMA app_writer_schema AUTHORIZATION app_writer;
    CREATE SCHEMA app_reporter_schema AUTHORIZATION app_reporter;
    CREATE SCHEMA app_admin_schema AUTHORIZATION app_admin;
    -- Grant specific permissions
    -- Reader: SELECT only on specific tables
    GRANT USAGE ON SCHEMA app_data TO app_reader;
    GRANT SELECT ON ALL TABLES IN SCHEMA app_data TO app_reader;
    ALTER DEFAULT PRIVILEGES IN SCHEMA app_data GRANT SELECT ON TABLES TO app_reader;
    -- Writer: INSERT, UPDATE, DELETE on transaction tables only
    GRANT USAGE ON SCHEMA app_data TO app_writer;
    GRANT SELECT, INSERT, UPDATE, DELETE ON app_data.users TO app_writer;
    GRANT SELECT, INSERT, UPDATE, DELETE ON app_data.transactions TO app_writer;
    GRANT USAGE ON ALL SEQUENCES IN SCHEMA app_data TO app_writer;
    -- Reporter: SELECT with specific row-level security
    GRANT USAGE ON SCHEMA app_data TO app_reporter;
    GRANT SELECT ON app_data.aggregated_metrics TO app_reporter;
    -- Enable row-level security
    ALTER TABLE app_data.users ENABLE ROW LEVEL SECURITY;
    ALTER TABLE app_data.transactions ENABLE ROW LEVEL SECURITY;
    -- Create row-level security policies
    -- NOTE(review): policies exist only for app_writer. With row-level
    -- security enabled and no policy for app_reader, its SELECTs on these
    -- two tables return no rows; add a SELECT policy if readers need them.
    CREATE POLICY user_isolation ON app_data.users
    FOR ALL TO app_writer
    USING (tenant_id = current_setting('app.current_tenant')::int);
    CREATE POLICY transaction_isolation ON app_data.transactions
    FOR ALL TO app_writer
    USING (tenant_id = current_setting('app.current_tenant')::int);
    """
    return role_definitions