Implementing Secure Bastion Configuration

Implementing Secure Bastion Configuration

Configuring a bastion host requires careful attention to security hardening beyond standard SSH configurations. The bastion host must be more secure than any system it protects, as compromise would grant attackers access to the entire internal network.

Create comprehensive bastion configuration:

# /etc/ssh/sshd_config.bastion
# Hardened SSH configuration for bastion hosts

# Network and Protocol
Port 22
AddressFamily inet
Protocol 2
ListenAddress 203.0.113.10  # External interface only

# Host Keys
HostKey /etc/ssh/ssh_host_ed25519_key
HostKey /etc/ssh/ssh_host_rsa_key

# Ciphers and Algorithms (Ultra-secure)
Ciphers [email protected],[email protected]
MACs [email protected],[email protected]
KexAlgorithms curve25519-sha256,[email protected]

# Authentication
PermitRootLogin no
PubkeyAuthentication yes
PasswordAuthentication no
ChallengeResponseAuthentication no
AuthenticationMethods publickey,keyboard-interactive
UsePAM yes

# Access Control
AllowGroups bastion-users
MaxAuthTries 2
MaxSessions 10
MaxStartups 10:30:60

# Security Features
StrictModes yes
IgnoreRhosts yes
HostbasedAuthentication no
PermitUserEnvironment no
PermitTunnel no

# Forwarding Controls
AllowAgentForwarding yes  # Required for ProxyJump
AllowTcpForwarding yes    # Required for tunneling
X11Forwarding no
GatewayPorts no

# Session Controls
ClientAliveInterval 300
ClientAliveCountMax 0
LoginGraceTime 30
TCPKeepAlive yes

# Logging
SyslogFacility AUTH
LogLevel VERBOSE

# Banner
Banner /etc/ssh/bastion_banner.txt

# Force command for audit logging
ForceCommand /usr/local/bin/bastion-shell

# SFTP
Subsystem sftp /usr/lib/openssh/sftp-server -f AUTHPRIV -l INFO

# Match block for internal forwarding only
Match Group internal-access
    PermitOpen 10.0.0.0/8:22
    ForceCommand echo 'Internal access only'
    
# Match block for administrators
Match Group bastion-admins
    PermitOpen any
    AllowTcpForwarding yes

Implement bastion shell wrapper for enhanced security:

#!/usr/bin/env python3
# /usr/local/bin/bastion-shell
# Secure shell wrapper for bastion hosts

import os
import sys
import subprocess
import logging
import json
import time
import pwd
from datetime import datetime

class BastionShell:
    def __init__(self):
        self.user = pwd.getpwuid(os.getuid()).pw_name
        self.session_id = f"{self.user}-{int(time.time())}"
        self.client_ip = os.environ.get('SSH_CLIENT', '').split()[0]
        self.original_command = os.environ.get('SSH_ORIGINAL_COMMAND', '')
        
        # Setup logging
        self.setup_logging()
        
        # Setup session recording
        self.session_dir = f"/var/log/bastion-sessions/{self.session_id}"
        os.makedirs(self.session_dir, exist_ok=True)
        
    def setup_logging(self):
        """Configure audit logging"""
        logging.basicConfig(
            filename='/var/log/bastion-audit.log',
            level=logging.INFO,
            format='%(asctime)s - %(message)s'
        )
        
    def log_session_start(self):
        """Log session initiation"""
        session_info = {
            'event': 'session_start',
            'session_id': self.session_id,
            'user': self.user,
            'client_ip': self.client_ip,
            'timestamp': datetime.now().isoformat(),
            'command': self.original_command or 'interactive'
        }
        logging.info(json.dumps(session_info))
        
    def log_session_end(self, exit_code):
        """Log session termination"""
        session_info = {
            'event': 'session_end',
            'session_id': self.session_id,
            'user': self.user,
            'exit_code': exit_code,
            'timestamp': datetime.now().isoformat()
        }
        logging.info(json.dumps(session_info))
        
    def validate_command(self, command):
        """Validate commands against security policy"""
        # Forbidden commands
        forbidden_patterns = [
            'rm -rf /',
            'dd if=/dev/zero',
            'mkfs',
            ':(){ :|:& };:',  # Fork bomb
            'nc -l',  # Netcat listener
        ]
        
        for pattern in forbidden_patterns:
            if pattern in command:
                logging.warning(f"Blocked command: {command} by {self.user}")
                return False
                
        return True
        
    def get_allowed_targets(self):
        """Get list of allowed target hosts for user"""
        # In production, this would query LDAP/database
        allowed_hosts = {
            'developers': ['dev-*.internal', '10.0.1.*'],
            'admins': ['*.internal'],
            'contractors': ['contractor-workspace.internal']
        }
        
        # Get user's groups
        groups = [g.gr_name for g in os.getgroups()]
        
        targets = []
        for group in groups:
            if group in allowed_hosts:
                targets.extend(allowed_hosts[group])
                
        return targets
        
    def validate_ssh_target(self, command):
        """Validate SSH ProxyJump targets"""
        if command.startswith('ssh '):
            # Extract target host
            parts = command.split()
            if len(parts) >= 2:
                target = parts[1].split('@')[-1]
                
                allowed = self.get_allowed_targets()
                
                # Check if target matches allowed patterns
                for pattern in allowed:
                    if self.match_pattern(target, pattern):
                        return True
                        
                logging.warning(f"Unauthorized SSH target: {target} by {self.user}")
                return False
                
        return True
        
    def match_pattern(self, target, pattern):
        """Match target against pattern with wildcards"""
        import fnmatch
        return fnmatch.fnmatch(target, pattern)
        
    def execute_command(self):
        """Execute user command with recording"""
        if self.original_command:
            # Non-interactive command
            if not self.validate_command(self.original_command):
                print("Command not allowed by security policy")
                return 1
                
            if not self.validate_ssh_target(self.original_command):
                print("Access to target host denied")
                return 1
                
            # Execute with recording
            proc = subprocess.Popen(
                self.original_command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            
            stdout, stderr = proc.communicate()
            
            # Log command output
            with open(f"{self.session_dir}/command_output.txt", 'wb') as f:
                f.write(stdout)
                f.write(stderr)
                
            return proc.returncode
            
        else:
            # Interactive session with recording
            recording_file = f"{self.session_dir}/session.rec"
            timing_file = f"{self.session_dir}/session.time"
            
            # Use script for session recording
            cmd = [
                '/usr/bin/script',
                '-f',
                '-q',
                '-t', timing_file,
                recording_file,
                '-c', os.environ.get('SHELL', '/bin/bash')
            ]
            
            return subprocess.call(cmd)
            
    def run(self):
        """Main execution"""
        self.log_session_start()
        
        try:
            exit_code = self.execute_command()
        except Exception as e:
            logging.error(f"Session error: {str(e)}")
            exit_code = 1
            
        self.log_session_end(exit_code)
        return exit_code

if __name__ == '__main__':
    shell = BastionShell()
    sys.exit(shell.run())