Centralized Key Repository

Centralized Key Repository

A centralized key repository provides a single source of truth for all SSH keys in the organization. This repository must be secure, auditable, and integrated with identity management systems to maintain consistency across the infrastructure.

Implement a secure centralized key store:

#!/usr/bin/env python3
# ssh-key-repository.py
# Centralized SSH key repository with API

import base64
import hashlib
import json
import os
import sqlite3
from datetime import datetime, timedelta

import flask
import ldap3
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
from flask import Flask, request, jsonify

class SSHKeyRepository:
    """SQLite-backed store for SSH keys, per-user access grants and an audit log.

    Three tables are maintained: ``ssh_keys`` (key material and bookkeeping),
    ``key_permissions`` (grants of access to other users) and ``audit_log``
    (one row per recorded action). Private keys, when supplied, are stored
    Fernet-encrypted.
    """

    def __init__(self, db_path, encryption_key=None):
        """Open the database at *db_path* and create missing tables.

        Args:
            db_path: Path passed to ``sqlite3.connect``.
            encryption_key: Fernet key used to encrypt private keys. When
                omitted (or falsy) a random key is generated for this
                instance only.
        """
        self.db_path = db_path
        # check_same_thread=False allows use of this connection from threads
        # other than the creating one; this class adds no locking of its own.
        self.conn = sqlite3.connect(db_path, check_same_thread=False)

        # NOTE(review): the generated fallback key is not persisted by this
        # class, so private keys encrypted with it cannot be decrypted once
        # this instance is gone. Supply a persistent key for real use.
        if encryption_key:
            self.cipher = Fernet(encryption_key)
        else:
            self.cipher = Fernet(Fernet.generate_key())

        self._init_database()

    def _init_database(self):
        """Create the repository tables if they do not exist yet."""
        cursor = self.conn.cursor()

        cursor.execute('''
        CREATE TABLE IF NOT EXISTS ssh_keys (
            key_id TEXT PRIMARY KEY,
            public_key TEXT NOT NULL,
            private_key_encrypted BLOB,
            fingerprint TEXT UNIQUE,
            owner_id TEXT NOT NULL,
            owner_email TEXT,
            created_at TIMESTAMP,
            expires_at TIMESTAMP,
            last_accessed TIMESTAMP,
            access_count INTEGER DEFAULT 0,
            key_type TEXT,
            key_purpose TEXT,
            metadata JSON,
            status TEXT DEFAULT 'active'
        )''')

        cursor.execute('''
        CREATE TABLE IF NOT EXISTS key_permissions (
            permission_id INTEGER PRIMARY KEY AUTOINCREMENT,
            key_id TEXT,
            user_id TEXT,
            permission_type TEXT,
            granted_at TIMESTAMP,
            granted_by TEXT,
            expires_at TIMESTAMP,
            FOREIGN KEY(key_id) REFERENCES ssh_keys(key_id)
        )''')

        cursor.execute('''
        CREATE TABLE IF NOT EXISTS audit_log (
            log_id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP,
            user_id TEXT,
            action TEXT,
            key_id TEXT,
            details JSON,
            ip_address TEXT,
            success BOOLEAN
        )''')

        self.conn.commit()

    @staticmethod
    def _timestamp(moment=None):
        """Return *moment* (default: now) as ``YYYY-MM-DD HH:MM:SS[.ffffff]``.

        Binding an explicit string avoids sqlite3's implicit datetime
        adapter (deprecated since Python 3.12) while producing the same
        space-separated text, so values stay comparable with rows written
        through that adapter.
        """
        if moment is None:
            moment = datetime.now()
        return moment.isoformat(sep=' ')

    @staticmethod
    def _calculate_fingerprint(public_key):
        """Return the SHA-256 fingerprint of an OpenSSH public key line.

        The result is ``SHA256:`` followed by the unpadded base64 encoding of
        the SHA-256 digest of the decoded key blob.

        Args:
            public_key: Text of the form ``<type> <base64-blob> [comment]``.

        Raises:
            ValueError: If the line has fewer than two fields or the blob is
                not valid base64.
        """
        fields = public_key.split()
        if len(fields) < 2:
            raise ValueError(
                'public key must look like "<type> <base64-blob> [comment]"'
            )
        try:
            blob = base64.b64decode(fields[1], validate=True)
        except ValueError as exc:  # binascii.Error is a ValueError subclass
            raise ValueError('public key blob is not valid base64') from exc

        digest = hashlib.sha256(blob).digest()
        return 'SHA256:' + base64.b64encode(digest).decode('ascii').rstrip('=')

    @staticmethod
    def _detect_key_type(public_key):
        """Return the key type named by the first field of *public_key*.

        A leading ``ssh-`` is dropped (``ssh-ed25519`` -> ``ed25519``); any
        other algorithm name is returned unchanged.
        """
        fields = public_key.split(None, 1)
        algorithm = fields[0] if fields else ''
        return algorithm[4:] if algorithm.startswith('ssh-') else algorithm

    def store_key(self, owner_id, public_key, private_key=None, metadata=None):
        """Store an SSH key and return its newly assigned key ID.

        Args:
            owner_id: Identifier of the user who owns the key.
            public_key: OpenSSH public key line.
            private_key: Optional private key text; encrypted before storage.
            metadata: Optional JSON-serialisable mapping stored with the key.

        Returns:
            The 16-character hexadecimal key ID.

        Raises:
            ValueError: If *public_key* cannot be fingerprinted.
            sqlite3.IntegrityError: If a key with the same fingerprint is
                already stored (the column is UNIQUE).
        """
        # Fingerprinting first validates the key before anything is written.
        fingerprint = self._calculate_fingerprint(public_key)
        key_type = self._detect_key_type(public_key)

        # Random ID: not guessable from owner/time and not dependent on
        # clock resolution for uniqueness.
        key_id = os.urandom(8).hex()

        private_key_encrypted = None
        if private_key:
            private_key_encrypted = self.cipher.encrypt(private_key.encode())

        # Default expiry (1 year)
        created_at = datetime.now()
        expires_at = created_at + timedelta(days=365)

        cursor = self.conn.cursor()
        cursor.execute('''
        INSERT INTO ssh_keys (
            key_id, public_key, private_key_encrypted, fingerprint,
            owner_id, created_at, expires_at, key_type, metadata
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            key_id, public_key, private_key_encrypted, fingerprint,
            owner_id, self._timestamp(created_at),
            self._timestamp(expires_at), key_type,
            json.dumps(metadata or {})
        ))
        self.conn.commit()

        self._audit_log(owner_id, 'store_key', key_id, {'fingerprint': fingerprint})

        return key_id

    def retrieve_key(self, key_id, user_id):
        """Return the public key for *key_id* if *user_id* may read it.

        Returns:
            The public key text, or ``None`` when access is denied, the key
            does not exist, or its status is not ``'active'``.
        """
        if not self._check_permission(key_id, user_id, 'read'):
            # Denials are recorded as unsuccessful so the audit trail can
            # distinguish them from granted reads.
            self._audit_log(user_id, 'retrieve_key_denied', key_id, {},
                            success=False)
            return None

        cursor = self.conn.cursor()
        cursor.execute('''
        SELECT public_key, status FROM ssh_keys WHERE key_id = ?
        ''', (key_id,))

        result = cursor.fetchone()
        if result and result[1] == 'active':
            public_key, _ = result

            # Update access tracking
            cursor.execute('''
            UPDATE ssh_keys 
            SET last_accessed = ?, access_count = access_count + 1
            WHERE key_id = ?
            ''', (self._timestamp(), key_id))
            self.conn.commit()

            self._audit_log(user_id, 'retrieve_key', key_id, {})
            return public_key

        return None

    def list_user_keys(self, user_id):
        """List active keys owned by *user_id* plus those granted to them.

        Returns:
            A list of dicts; owned keys come first (``'ownership': 'owned'``)
            followed by keys reachable through an unexpired grant
            (``'ownership': 'permitted'``).
        """
        cursor = self.conn.cursor()

        cursor.execute('''
        SELECT key_id, fingerprint, created_at, expires_at, status, key_purpose
        FROM ssh_keys
        WHERE owner_id = ? AND status = 'active'
        ''', (user_id,))
        owned_keys = [
            {
                'key_id': row[0],
                'fingerprint': row[1],
                'created_at': row[2],
                'expires_at': row[3],
                'status': row[4],
                'purpose': row[5],
                'ownership': 'owned',
            }
            for row in cursor.fetchall()
        ]

        cursor.execute('''
        SELECT k.key_id, k.fingerprint, p.permission_type, p.expires_at
        FROM ssh_keys k
        JOIN key_permissions p ON k.key_id = p.key_id
        WHERE p.user_id = ? AND k.status = 'active'
        AND (p.expires_at IS NULL OR p.expires_at > ?)
        ''', (user_id, self._timestamp()))
        permitted_keys = [
            {
                'key_id': row[0],
                'fingerprint': row[1],
                'permission': row[2],
                'permission_expires': row[3],
                'ownership': 'permitted',
            }
            for row in cursor.fetchall()
        ]

        return owned_keys + permitted_keys

    def grant_permission(self, key_id, user_id, permission_type, grantor_id, expires_in_days=None):
        """Record a grant giving *user_id* access to *key_id*.

        Args:
            key_id: Key the grant applies to.
            user_id: User receiving the permission.
            permission_type: Permission name; ``'read'``, ``'write'`` and
                ``'admin'`` are the values interpreted by the permission check.
            grantor_id: User recorded as having issued the grant.
            expires_in_days: Lifetime of the grant in days; ``None`` means
                the grant never expires. ``0`` yields a grant that is
                already expired rather than a permanent one.

        NOTE(review): this method does not verify that *grantor_id* is
        allowed to grant access to the key; callers must enforce that.
        """
        granted_at = datetime.now()
        expires_at = None
        # "is not None" so that 0 is honoured instead of meaning "forever".
        if expires_in_days is not None:
            expires_at = self._timestamp(
                granted_at + timedelta(days=expires_in_days)
            )

        cursor = self.conn.cursor()
        cursor.execute('''
        INSERT INTO key_permissions (
            key_id, user_id, permission_type, granted_at, granted_by, expires_at
        ) VALUES (?, ?, ?, ?, ?, ?)
        ''', (key_id, user_id, permission_type, self._timestamp(granted_at),
              grantor_id, expires_at))
        self.conn.commit()

        self._audit_log(grantor_id, 'grant_permission', key_id, {
            'user_id': user_id,
            'permission': permission_type
        })

    def _check_permission(self, key_id, user_id, required_permission):
        """Return True if *user_id* holds *required_permission* on *key_id*.

        The owner always passes. Otherwise unexpired grants are consulted:
        ``'admin'`` satisfies any requirement, ``'write'`` also satisfies
        ``'read'``, and any other requirement needs an exact match.
        """
        cursor = self.conn.cursor()

        cursor.execute('SELECT owner_id FROM ssh_keys WHERE key_id = ?', (key_id,))
        result = cursor.fetchone()
        if result and result[0] == user_id:
            return True

        cursor.execute('''
        SELECT permission_type FROM key_permissions
        WHERE key_id = ? AND user_id = ?
        AND (expires_at IS NULL OR expires_at > ?)
        ''', (key_id, user_id, self._timestamp()))
        granted = {row[0] for row in cursor.fetchall()}

        if 'admin' in granted:
            return True
        if required_permission == 'read' and granted & {'read', 'write'}:
            return True
        return required_permission in granted

    def _audit_log(self, user_id, action, key_id, details, success=True):
        """Append one row to ``audit_log`` and commit it.

        Args:
            user_id: User the action is attributed to.
            action: Short action name, e.g. ``'store_key'``.
            key_id: Key the action concerned.
            details: JSON-serialisable mapping with extra context.
            success: Whether the action succeeded; defaults to ``True``.
        """
        cursor = self.conn.cursor()
        cursor.execute('''
        INSERT INTO audit_log (timestamp, user_id, action, key_id, details, success)
        VALUES (?, ?, ?, ?, ?, ?)
        ''', (self._timestamp(), user_id, action, key_id,
              json.dumps(details), bool(success)))
        self.conn.commit()

# REST API for key repository
#
# Configuration is read from the environment so deployments do not have to
# edit this file:
#   SSH_KEY_REPO_DB              - database path (default: /opt/ssh-keys/repository.db)
#   SSH_KEY_REPO_ENCRYPTION_KEY  - Fernet key for stored private keys
# When no encryption key is configured the repository falls back to a key
# generated at start-up, exactly as before.
app = Flask(__name__)
key_repo = SSHKeyRepository(
    os.environ.get('SSH_KEY_REPO_DB', '/opt/ssh-keys/repository.db'),
    encryption_key=os.environ.get('SSH_KEY_REPO_ENCRYPTION_KEY'),
)

@app.route('/api/keys', methods=['POST'])
def create_key():
    """Generate a new Ed25519 key pair for the caller and store it.

    The caller is identified from the ``Authorization`` header. An optional
    JSON object body may carry a ``metadata`` mapping that is stored with
    the key; a missing or unparsable body is treated as "no metadata".

    Returns:
        JSON with ``key_id``, ``public_key`` and ``fingerprint`` on success,
        a 401 response when authentication fails, or a 400 response when
        the body is valid JSON but not an object.
    """
    # Authenticate before touching the request body.
    user_id = authenticate_user(request.headers.get('Authorization'))
    if not user_id:
        return jsonify({'error': 'Unauthorized'}), 401

    # silent=True yields None instead of an error response for an absent or
    # malformed body, so an empty POST no longer fails on data.get(...).
    payload = request.get_json(silent=True)
    if payload is None:
        payload = {}
    if not isinstance(payload, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    # Generate new key pair
    private_key = ed25519.Ed25519PrivateKey.generate()

    # Serialize keys
    private_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.OpenSSH,
        encryption_algorithm=serialization.NoEncryption()
    ).decode()
    public_ssh = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.OpenSSH,
        format=serialization.PublicFormat.OpenSSH
    ).decode()

    # Store in repository
    key_id = key_repo.store_key(
        user_id,
        public_ssh,
        private_pem,
        metadata=payload.get('metadata', {})
    )

    # NOTE(review): only public material is returned; the private key stays
    # in the repository and is not part of this response.
    return jsonify({
        'key_id': key_id,
        'public_key': public_ssh,
        'fingerprint': key_repo._calculate_fingerprint(public_ssh)
    })

@app.route('/api/keys/<key_id>', methods=['GET'])
def get_key(key_id):
    """Return the public key stored under *key_id* to an authorised caller.

    Responds with 401 when the ``Authorization`` header does not identify a
    user, and with 404 when the repository yields no key for that user
    (unknown key and denied access are deliberately indistinguishable).
    """
    caller = authenticate_user(request.headers.get('Authorization'))
    if not caller:
        return jsonify({'error': 'Unauthorized'}), 401

    stored_key = key_repo.retrieve_key(key_id, caller)
    if not stored_key:
        return jsonify({'error': 'Key not found or access denied'}), 404

    return jsonify({'key_id': key_id, 'public_key': stored_key})