# Managing User Access Through Bastion
Effective user access management on bastion hosts requires granular controls that map users to specific internal resources they're authorized to access. This involves integrating with identity management systems and implementing dynamic access policies.
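The following script implements this kind of access management: it authenticates users against LDAP, stores sessions in Redis, resolves group- and user-level target permissions from MySQL, and records every decision in an audit table: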
#!/usr/bin/env python3
# bastion-access-manager.py
# Dynamic access control for bastion hosts
import fnmatch
import hashlib
import json
import logging
import os
import secrets
from contextlib import closing
from datetime import datetime, timedelta

import ldap3
import mysql.connector
import redis
from ldap3.utils.conv import escape_filter_chars
class BastionAccessManager:
    """Authenticate bastion users and authorize access to internal targets.

    Users are looked up in LDAP, sessions are stored in Redis, and
    permissions, access requests and the audit trail live in a MySQL database.

    The JSON config file must provide ``ldap`` (``server``, ``bind_dn``,
    ``bind_password``, ``user_base``), ``db`` (``host``, ``user``,
    ``password``, ``database``) and ``allowed_groups``. An optional ``redis``
    section (``host``, ``port``, ``db``) overrides the localhost defaults.
    """

    # Redis key namespace and lifetime of an authenticated session.
    SESSION_KEY_PREFIX = "bastion_session:"
    SESSION_TTL = timedelta(hours=8)

    def __init__(self, config_file):
        """Load *config_file* (JSON) and open the LDAP, MySQL and Redis connections."""
        with open(config_file, 'r', encoding='utf-8') as f:
            self.config = json.load(f)
        self.ldap_conn = self._init_ldap()
        self.db_conn = self._init_database()
        # Previously hard-coded to localhost:6379/0; those remain the defaults.
        redis_cfg = self.config.get('redis', {})
        self.redis_conn = redis.Redis(
            host=redis_cfg.get('host', 'localhost'),
            port=redis_cfg.get('port', 6379),
            db=redis_cfg.get('db', 0),
        )

    def _init_ldap(self):
        """Initialize LDAP connection (SSL, bound with the configured DN)."""
        server = ldap3.Server(
            self.config['ldap']['server'],
            use_ssl=True
        )
        conn = ldap3.Connection(
            server,
            user=self.config['ldap']['bind_dn'],
            password=self.config['ldap']['bind_password'],
            auto_bind=True
        )
        return conn

    def _init_database(self):
        """Initialize database connection"""
        return mysql.connector.connect(
            host=self.config['db']['host'],
            user=self.config['db']['user'],
            password=self.config['db']['password'],
            database=self.config['db']['database']
        )

    def authenticate_user(self, username, public_key_fingerprint):
        """Authenticate user and verify authorization.

        Returns ``(True, session_id)`` on success, otherwise
        ``(False, reason)``.
        """
        if not username:
            return False, "User not found"
        # An empty fingerprint is a substring of every key and would
        # otherwise pass the key check below.
        if not public_key_fingerprint:
            return False, "SSH key not authorized"

        # Escape the username so it cannot alter the LDAP filter
        # (e.g. "*" or ")(uid=*" would otherwise match arbitrary entries).
        self.ldap_conn.search(
            search_base=self.config['ldap']['user_base'],
            search_filter=f'(uid={escape_filter_chars(username)})',
            attributes=['uid', 'memberOf', 'sshPublicKey', 'accountStatus']
        )
        if not self.ldap_conn.entries:
            return False, "User not found"
        user_entry = self.ldap_conn.entries[0]

        # Check account status
        if user_entry.accountStatus != 'active':
            return False, "Account disabled"

        # Verify SSH key
        # NOTE(review): this is a substring test of the supplied value against
        # each stored sshPublicKey value; confirm what callers pass as the
        # "fingerprint" and whether the directory returns str or bytes here.
        authorized_keys = user_entry.sshPublicKey
        if not any(public_key_fingerprint in key for key in authorized_keys):
            return False, "SSH key not authorized"

        # Group name = value of the first RDN of each memberOf DN
        # ("cn=ops,ou=groups,..." -> "ops"); malformed DNs are skipped.
        groups = []
        for dn in user_entry.memberOf:
            _, sep, name = dn.split(',', 1)[0].partition('=')
            if sep and name:
                groups.append(name)

        # Check if user has bastion access
        if set(self.config['allowed_groups']).isdisjoint(groups):
            return False, "No bastion access permission"

        session_token = self.create_session(username, groups)
        return True, session_token

    def create_session(self, username, groups):
        """Create an authenticated session in Redis and return its id."""
        # The session id is a bearer token: it must be unguessable, so it is
        # drawn from the OS CSPRNG instead of hashing username + timestamp.
        session_id = secrets.token_hex(32)
        session_data = {
            'username': username,
            'groups': groups,
            'login_time': datetime.now().isoformat(),
            'source_ip': self.get_client_ip(),
            'allowed_targets': self.get_allowed_targets(username, groups)
        }
        # Store in Redis with expiration
        self.redis_conn.setex(
            f"{self.SESSION_KEY_PREFIX}{session_id}",
            self.SESSION_TTL,
            json.dumps(session_data)
        )
        self.log_access_event('session_created', username, session_data)
        return session_id

    def get_client_ip(self):
        """Return the SSH client's IP address, or None if it is unknown.

        Reads the first field of ``SSH_CONNECTION`` / ``SSH_CLIENT``, which
        sshd exports into the session environment.
        NOTE(review): assumes this runs inside the user's SSH session -
        confirm for other deployment modes.
        """
        for var in ('SSH_CONNECTION', 'SSH_CLIENT'):
            fields = os.environ.get(var, '').split()
            if fields:
                return fields[0]
        return None

    def match_host_pattern(self, target_host, pattern):
        """Return True if *target_host* matches *pattern*.

        NOTE(review): assumes patterns are shell-style globs
        (``db-*.prod.internal``) compared case-insensitively - confirm against
        the data stored in ``group_permissions`` / ``user_permissions``.
        """
        if not target_host or not pattern:
            return False
        return fnmatch.fnmatchcase(target_host.lower(), pattern.lower())

    def get_allowed_targets(self, username, groups):
        """Get list of allowed target hosts for user.

        Combines active group permissions with active, unexpired user
        permissions. For a duplicated pattern the first row wins unless a
        later row has access level ``'admin'``. Each result is a dict with
        ``pattern``, ``access_level`` and ``expiry`` (ISO-8601 string or None).
        """
        rows = []
        with closing(self.db_conn.cursor()) as cursor:
            # Get group-based permissions
            for group in groups:
                cursor.execute("""
                    SELECT target_pattern, access_level
                    FROM group_permissions
                    WHERE group_name = %s AND active = 1
                """, (group,))
                rows.extend(cursor.fetchall())
            # Get user-specific permissions
            cursor.execute("""
                SELECT target_pattern, access_level, expiry_date
                FROM user_permissions
                WHERE username = %s AND active = 1
                AND (expiry_date IS NULL OR expiry_date > NOW())
            """, (username,))
            rows.extend(cursor.fetchall())

        # Combine and deduplicate (group rows carry no expiry column).
        all_targets = {}
        for target, access_level, *rest in rows:
            if target in all_targets and access_level != 'admin':
                continue
            expiry = rest[0] if rest else None
            # The result is JSON-serialised into the session and parsed back
            # with datetime.fromisoformat(), so datetimes must become strings.
            if hasattr(expiry, 'isoformat'):
                expiry = expiry.isoformat()
            all_targets[target] = {
                'pattern': target,
                'access_level': access_level,
                'expiry': expiry
            }
        return list(all_targets.values())

    def validate_target_access(self, session_id, target_host):
        """Validate if session can access target host.

        Returns ``(True, access_level)`` when a non-expired allowed pattern
        matches, otherwise ``(False, reason)``. Both outcomes are audited,
        except for an unknown or expired session.
        """
        raw = self.redis_conn.get(f"{self.SESSION_KEY_PREFIX}{session_id}")
        if not raw:
            return False, "Invalid or expired session"
        session = json.loads(raw)

        # NOTE(review): expiry comes from the database clock (NOW()) while
        # this is naive local time - confirm both use the same timezone.
        now = datetime.now()
        for allowed in session['allowed_targets']:
            if not self.match_host_pattern(target_host, allowed['pattern']):
                continue
            # Temporary grants may lapse while the session is still alive.
            expiry = allowed.get('expiry')
            if expiry and now > datetime.fromisoformat(expiry):
                continue
            self.log_access_event(
                'target_access_granted',
                session['username'],
                {'target': target_host, 'pattern': allowed['pattern']}
            )
            return True, allowed['access_level']

        self.log_access_event(
            'target_access_denied',
            session['username'],
            {'target': target_host}
        )
        return False, "Access to target denied"

    def request_temporary_access(self, username, target_pattern, reason, duration_hours):
        """Request temporary access to additional targets.

        Inserts a ``pending`` row into ``access_requests``, notifies
        approvers and returns the 8-character request id.

        Raises:
            ValueError: if *username* or *target_pattern* is empty, or
                *duration_hours* is not a positive integer.
        """
        if not username or not target_pattern:
            raise ValueError("username and target_pattern are required")
        # A NULL duration would turn into a NULL expiry_date on approval,
        # which the permission query treats as "never expires".
        if (isinstance(duration_hours, bool)
                or not isinstance(duration_hours, int)
                or duration_hours <= 0):
            raise ValueError("duration_hours must be a positive integer")

        # Random id: same 8-hex-character shape as before, but not derivable
        # from the request contents and not prone to same-timestamp clashes.
        request_id = secrets.token_hex(4)
        with closing(self.db_conn.cursor()) as cursor:
            try:
                cursor.execute("""
                    INSERT INTO access_requests
                    (request_id, username, target_pattern, reason, duration_hours,
                    requested_at, status)
                    VALUES (%s, %s, %s, %s, %s, NOW(), 'pending')
                """, (request_id, username, target_pattern, reason, duration_hours))
                self.db_conn.commit()
            except Exception:
                self.db_conn.rollback()
                raise
        self.notify_approvers(request_id, username, target_pattern, reason)
        return request_id

    def notify_approvers(self, request_id, username, target_pattern, reason):
        """Announce a pending access request.

        Minimal hook: writes one log record. The configuration read by this
        class defines no notification channel; extend here for mail/chat.
        """
        logging.info(
            "BASTION_ACCESS: approval_requested request_id=%s user=%s "
            "target=%s reason=%s",
            request_id, username, target_pattern, reason,
        )

    def approve_access_request(self, request_id, approver):
        """Approve temporary access request.

        Returns ``(True, "Access approved")`` or ``(False, reason)``.
        NOTE(review): nothing here verifies that *approver* is allowed to
        approve (or differs from the requester) - confirm where that is
        enforced.
        """
        if not approver:
            return False, "Approver is required"

        with closing(self.db_conn.cursor()) as cursor:
            try:
                # Get request details
                cursor.execute("""
                    SELECT username, target_pattern, duration_hours
                    FROM access_requests
                    WHERE request_id = %s AND status = 'pending'
                """, (request_id,))
                result = cursor.fetchone()
                if not result:
                    self.db_conn.rollback()
                    return False, "Request not found or already processed"
                username, target_pattern, duration_hours = result
                # Refuse to mint a permission with no expiry (see
                # request_temporary_access).
                if duration_hours is None or duration_hours <= 0:
                    self.db_conn.rollback()
                    return False, "Request has no valid duration"

                # Claim the request first; the status guard ensures that of
                # two concurrent approvals only one creates a permission.
                cursor.execute("""
                    UPDATE access_requests
                    SET status = 'approved', approved_by = %s, approved_at = NOW()
                    WHERE request_id = %s AND status = 'pending'
                """, (approver, request_id))
                if cursor.rowcount != 1:
                    self.db_conn.rollback()
                    return False, "Request not found or already processed"

                # Create temporary permission
                cursor.execute("""
                    INSERT INTO user_permissions
                    (username, target_pattern, access_level, expiry_date,
                    granted_by, active)
                    VALUES (%s, %s, 'temporary',
                    DATE_ADD(NOW(), INTERVAL %s HOUR), %s, 1)
                """, (username, target_pattern, duration_hours, approver))
                self.db_conn.commit()
            except Exception:
                self.db_conn.rollback()
                raise

        # Make the new grant visible to the user's live sessions.
        self.clear_user_cache(username)
        return True, "Access approved"

    def clear_user_cache(self, username):
        """Refresh the cached ``allowed_targets`` of *username*'s live sessions.

        Sessions snapshot permissions at login; without this a newly
        approved grant would only apply after the next login. The remaining
        TTL of each session is preserved.
        """
        for key in self.redis_conn.scan_iter(match=f"{self.SESSION_KEY_PREFIX}*"):
            raw = self.redis_conn.get(key)
            if not raw:
                continue
            session = json.loads(raw)
            if session.get('username') != username:
                continue
            ttl = self.redis_conn.ttl(key)
            # Redis reports -2 for a vanished key and -1 for one without
            # expiry; never resurrect or un-expire a session.
            if ttl is None or ttl <= 0:
                continue
            session['allowed_targets'] = self.get_allowed_targets(
                username, session.get('groups', [])
            )
            self.redis_conn.setex(key, ttl, json.dumps(session))

    def log_access_event(self, event_type, username, details):
        """Log access events for audit.

        Writes one committed row to ``access_audit`` (*details* stored as
        JSON) and mirrors the event to the root logger.
        """
        with closing(self.db_conn.cursor()) as cursor:
            cursor.execute("""
                INSERT INTO access_audit
                (timestamp, event_type, username, details, source_ip)
                VALUES (NOW(), %s, %s, %s, %s)
            """, (event_type, username, json.dumps(details), self.get_client_ip()))
            self.db_conn.commit()
        # Mirrored via the logging module; whether it reaches syslog depends
        # on the handlers configured by the host application.
        logging.info(
            "BASTION_ACCESS: %s user=%s details=%s", event_type, username, details
        )
# CLI tool for access management
def main(argv=None):
    """Run the bastion access CLI and return a process exit status.

    Actions and the options each one needs:
      validate: --session, --target
      request:  --user, --pattern, --duration (positive), optional --reason
      approve:  --request-id, --user (the approver)

    Returns 0 on success, 1 when access is denied or approval fails, and 2
    when options required by the action are missing or invalid. *argv*
    defaults to ``sys.argv[1:]``.
    """
    import argparse
    import sys

    parser = argparse.ArgumentParser(description='Bastion Access Manager')
    parser.add_argument('action', choices=['validate', 'request', 'approve'])
    parser.add_argument('--session', help='Session ID')
    parser.add_argument('--target', help='Target host')
    parser.add_argument('--user', help='Username')
    parser.add_argument('--pattern', help='Target pattern for access request')
    parser.add_argument('--reason', help='Reason for access request')
    parser.add_argument('--duration', type=int, help='Duration in hours')
    parser.add_argument('--request-id', help='Request ID to approve')
    args = parser.parse_args(argv)

    # Options are declared optional because they vary per action; check the
    # ones this action needs before connecting to any backend, so None never
    # reaches the session lookup or the database writes.
    required = {
        'validate': ('session', 'target'),
        'request': ('user', 'pattern', 'duration'),
        'approve': ('request_id', 'user'),
    }
    missing = [
        f"--{name.replace('_', '-')}"
        for name in required[args.action]
        if getattr(args, name) is None
    ]
    problem = None
    if missing:
        problem = f"action '{args.action}' requires {', '.join(missing)}"
    elif args.action == 'request' and args.duration <= 0:
        problem = "--duration must be a positive number of hours"
    if problem:
        parser.print_usage(sys.stderr)
        print(f"{parser.prog}: error: {problem}", file=sys.stderr)
        return 2

    manager = BastionAccessManager('/etc/bastion/config.json')
    if args.action == 'validate':
        valid, msg = manager.validate_target_access(args.session, args.target)
        print(f"Access {'granted' if valid else 'denied'}: {msg}")
        return 0 if valid else 1
    if args.action == 'request':
        request_id = manager.request_temporary_access(
            args.user, args.pattern, args.reason, args.duration
        )
        print(f"Access request created: {request_id}")
        return 0
    # Only 'approve' remains (argparse restricts the choices).
    success, msg = manager.approve_access_request(args.request_id, args.user)
    print(f"{'Success' if success else 'Failed'}: {msg}")
    return 0 if success else 1


if __name__ == '__main__':
    raise SystemExit(main())