Managing User Access Through Bastion

Managing User Access Through Bastion

Effective user access management on bastion hosts requires granular controls that map users to specific internal resources they're authorized to access. This involves integrating with identity management systems and implementing dynamic access policies.

Implement comprehensive access management:

#!/usr/bin/env python3
# bastion-access-manager.py
# Dynamic access control for bastion hosts

import ldap3
import mysql.connector
import json
import logging
from datetime import datetime, timedelta
import redis
import hashlib

class BastionAccessManager:
    def __init__(self, config_file):
        with open(config_file, 'r') as f:
            self.config = json.load(f)
            
        self.ldap_conn = self._init_ldap()
        self.db_conn = self._init_database()
        self.redis_conn = redis.Redis(host='localhost', port=6379, db=0)
        
    def _init_ldap(self):
        """Initialize LDAP connection"""
        server = ldap3.Server(
            self.config['ldap']['server'],
            use_ssl=True
        )
        
        conn = ldap3.Connection(
            server,
            user=self.config['ldap']['bind_dn'],
            password=self.config['ldap']['bind_password'],
            auto_bind=True
        )
        
        return conn
        
    def _init_database(self):
        """Initialize database connection"""
        return mysql.connector.connect(
            host=self.config['db']['host'],
            user=self.config['db']['user'],
            password=self.config['db']['password'],
            database=self.config['db']['database']
        )
        
    def authenticate_user(self, username, public_key_fingerprint):
        """Authenticate user and verify authorization"""
        # Check if user exists in LDAP
        self.ldap_conn.search(
            search_base=self.config['ldap']['user_base'],
            search_filter=f'(uid={username})',
            attributes=['uid', 'memberOf', 'sshPublicKey', 'accountStatus']
        )
        
        if not self.ldap_conn.entries:
            return False, "User not found"
            
        user_entry = self.ldap_conn.entries[0]
        
        # Check account status
        if user_entry.accountStatus != 'active':
            return False, "Account disabled"
            
        # Verify SSH key
        authorized_keys = user_entry.sshPublicKey
        if not any(public_key_fingerprint in key for key in authorized_keys):
            return False, "SSH key not authorized"
            
        # Get user groups
        groups = [dn.split(',')[0].split('=')[1] for dn in user_entry.memberOf]
        
        # Check if user has bastion access
        if not any(g in self.config['allowed_groups'] for g in groups):
            return False, "No bastion access permission"
            
        # Create session
        session_token = self.create_session(username, groups)
        
        return True, session_token
        
    def create_session(self, username, groups):
        """Create authenticated session"""
        session_id = hashlib.sha256(
            f"{username}{datetime.now().isoformat()}".encode()
        ).hexdigest()[:16]
        
        session_data = {
            'username': username,
            'groups': groups,
            'login_time': datetime.now().isoformat(),
            'source_ip': self.get_client_ip(),
            'allowed_targets': self.get_allowed_targets(username, groups)
        }
        
        # Store in Redis with expiration
        self.redis_conn.setex(
            f"bastion_session:{session_id}",
            timedelta(hours=8),
            json.dumps(session_data)
        )
        
        # Log session creation
        self.log_access_event('session_created', username, session_data)
        
        return session_id
        
    def get_allowed_targets(self, username, groups):
        """Get list of allowed target hosts for user"""
        cursor = self.db_conn.cursor()
        
        # Get group-based permissions
        group_targets = []
        for group in groups:
            cursor.execute("""
                SELECT target_pattern, access_level
                FROM group_permissions
                WHERE group_name = %s AND active = 1
            """, (group,))
            
            group_targets.extend(cursor.fetchall())
            
        # Get user-specific permissions
        cursor.execute("""
            SELECT target_pattern, access_level, expiry_date
            FROM user_permissions
            WHERE username = %s AND active = 1
            AND (expiry_date IS NULL OR expiry_date > NOW())
        """, (username,))
        
        user_targets = cursor.fetchall()
        
        # Combine and deduplicate
        all_targets = {}
        for target, access_level, *expiry in group_targets + user_targets:
            if target not in all_targets or access_level == 'admin':
                all_targets[target] = {
                    'pattern': target,
                    'access_level': access_level,
                    'expiry': expiry[0] if expiry else None
                }
                
        return list(all_targets.values())
        
    def validate_target_access(self, session_id, target_host):
        """Validate if session can access target host"""
        # Get session data
        session_data = self.redis_conn.get(f"bastion_session:{session_id}")
        
        if not session_data:
            return False, "Invalid or expired session"
            
        session = json.loads(session_data)
        
        # Check against allowed targets
        for allowed in session['allowed_targets']:
            if self.match_host_pattern(target_host, allowed['pattern']):
                # Check if temporary access has expired
                if allowed.get('expiry'):
                    expiry = datetime.fromisoformat(allowed['expiry'])
                    if datetime.now() > expiry:
                        continue
                        
                # Log successful validation
                self.log_access_event(
                    'target_access_granted',
                    session['username'],
                    {'target': target_host, 'pattern': allowed['pattern']}
                )
                
                return True, allowed['access_level']
                
        # Log denied access
        self.log_access_event(
            'target_access_denied',
            session['username'],
            {'target': target_host}
        )
        
        return False, "Access to target denied"
        
    def request_temporary_access(self, username, target_pattern, reason, duration_hours):
        """Request temporary access to additional targets"""
        request_id = hashlib.sha256(
            f"{username}{target_pattern}{datetime.now()}".encode()
        ).hexdigest()[:8]
        
        cursor = self.db_conn.cursor()
        cursor.execute("""
            INSERT INTO access_requests 
            (request_id, username, target_pattern, reason, duration_hours, 
             requested_at, status)
            VALUES (%s, %s, %s, %s, %s, NOW(), 'pending')
        """, (request_id, username, target_pattern, reason, duration_hours))
        
        self.db_conn.commit()
        
        # Notify approvers
        self.notify_approvers(request_id, username, target_pattern, reason)
        
        return request_id
        
    def approve_access_request(self, request_id, approver):
        """Approve temporary access request"""
        cursor = self.db_conn.cursor()
        
        # Get request details
        cursor.execute("""
            SELECT username, target_pattern, duration_hours
            FROM access_requests
            WHERE request_id = %s AND status = 'pending'
        """, (request_id,))
        
        result = cursor.fetchone()
        if not result:
            return False, "Request not found or already processed"
            
        username, target_pattern, duration_hours = result
        
        # Create temporary permission
        cursor.execute("""
            INSERT INTO user_permissions
            (username, target_pattern, access_level, expiry_date, 
             granted_by, active)
            VALUES (%s, %s, 'temporary', 
                   DATE_ADD(NOW(), INTERVAL %s HOUR), %s, 1)
        """, (username, target_pattern, duration_hours, approver))
        
        # Update request status
        cursor.execute("""
            UPDATE access_requests
            SET status = 'approved', approved_by = %s, approved_at = NOW()
            WHERE request_id = %s
        """, (approver, request_id))
        
        self.db_conn.commit()
        
        # Clear user's permission cache
        self.clear_user_cache(username)
        
        return True, "Access approved"
        
    def log_access_event(self, event_type, username, details):
        """Log access events for audit"""
        cursor = self.db_conn.cursor()
        
        cursor.execute("""
            INSERT INTO access_audit
            (timestamp, event_type, username, details, source_ip)
            VALUES (NOW(), %s, %s, %s, %s)
        """, (event_type, username, json.dumps(details), self.get_client_ip()))
        
        self.db_conn.commit()
        
        # Also log to syslog
        logging.info(f"BASTION_ACCESS: {event_type} user={username} details={details}")

# CLI tool for access management
if __name__ == '__main__':
    import argparse
    
    parser = argparse.ArgumentParser(description='Bastion Access Manager')
    parser.add_argument('action', choices=['validate', 'request', 'approve'])
    parser.add_argument('--session', help='Session ID')
    parser.add_argument('--target', help='Target host')
    parser.add_argument('--user', help='Username')
    parser.add_argument('--pattern', help='Target pattern for access request')
    parser.add_argument('--reason', help='Reason for access request')
    parser.add_argument('--duration', type=int, help='Duration in hours')
    parser.add_argument('--request-id', help='Request ID to approve')
    
    args = parser.parse_args()
    
    manager = BastionAccessManager('/etc/bastion/config.json')
    
    if args.action == 'validate':
        valid, msg = manager.validate_target_access(args.session, args.target)
        print(f"Access {'granted' if valid else 'denied'}: {msg}")
        
    elif args.action == 'request':
        request_id = manager.request_temporary_access(
            args.user, args.pattern, args.reason, args.duration
        )
        print(f"Access request created: {request_id}")
        
    elif args.action == 'approve':
        success, msg = manager.approve_access_request(args.request_id, args.user)
        print(f"{'Success' if success else 'Failed'}: {msg}")