Centralized Key Repository
Centralized Key Repository
A centralized key repository provides single source of truth for all SSH keys in the organization. This repository must be secure, auditable, and integrated with identity management systems to maintain consistency across the infrastructure.
Implement a secure centralized key store:
#!/usr/bin/env python3
# ssh-key-repository.py
# Centralized SSH key repository with API
import os
import json
import hashlib
import sqlite3
from datetime import datetime, timedelta
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
import flask
from flask import Flask, request, jsonify
import ldap3
class SSHKeyRepository:
def __init__(self, db_path, encryption_key=None):
self.db_path = db_path
self.conn = sqlite3.connect(db_path, check_same_thread=False)
# Initialize encryption
if encryption_key:
self.cipher = Fernet(encryption_key)
else:
self.cipher = Fernet(Fernet.generate_key())
self._init_database()
def _init_database(self):
"""Initialize repository database"""
cursor = self.conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS ssh_keys (
key_id TEXT PRIMARY KEY,
public_key TEXT NOT NULL,
private_key_encrypted BLOB,
fingerprint TEXT UNIQUE,
owner_id TEXT NOT NULL,
owner_email TEXT,
created_at TIMESTAMP,
expires_at TIMESTAMP,
last_accessed TIMESTAMP,
access_count INTEGER DEFAULT 0,
key_type TEXT,
key_purpose TEXT,
metadata JSON,
status TEXT DEFAULT 'active'
)''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS key_permissions (
permission_id INTEGER PRIMARY KEY AUTOINCREMENT,
key_id TEXT,
user_id TEXT,
permission_type TEXT,
granted_at TIMESTAMP,
granted_by TEXT,
expires_at TIMESTAMP,
FOREIGN KEY(key_id) REFERENCES ssh_keys(key_id)
)''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS audit_log (
log_id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TIMESTAMP,
user_id TEXT,
action TEXT,
key_id TEXT,
details JSON,
ip_address TEXT,
success BOOLEAN
)''')
self.conn.commit()
def store_key(self, owner_id, public_key, private_key=None, metadata=None):
"""Store SSH key in repository"""
# Generate key ID
key_id = hashlib.sha256(f"{owner_id}{datetime.now()}".encode()).hexdigest()[:16]
# Calculate fingerprint
fingerprint = self._calculate_fingerprint(public_key)
# Encrypt private key if provided
private_key_encrypted = None
if private_key:
private_key_encrypted = self.cipher.encrypt(private_key.encode())
# Default expiry (1 year)
expires_at = datetime.now() + timedelta(days=365)
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO ssh_keys (
key_id, public_key, private_key_encrypted, fingerprint,
owner_id, created_at, expires_at, key_type, metadata
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
key_id, public_key, private_key_encrypted, fingerprint,
owner_id, datetime.now(), expires_at, 'ed25519',
json.dumps(metadata or {})
))
self.conn.commit()
# Audit log
self._audit_log(owner_id, 'store_key', key_id, {'fingerprint': fingerprint})
return key_id
def retrieve_key(self, key_id, user_id):
"""Retrieve public key from repository"""
# Check permissions
if not self._check_permission(key_id, user_id, 'read'):
self._audit_log(user_id, 'retrieve_key_denied', key_id, {})
return None
cursor = self.conn.cursor()
cursor.execute('''
SELECT public_key, status FROM ssh_keys WHERE key_id = ?
''', (key_id,))
result = cursor.fetchone()
if result and result[1] == 'active':
public_key, _ = result
# Update access tracking
cursor.execute('''
UPDATE ssh_keys
SET last_accessed = ?, access_count = access_count + 1
WHERE key_id = ?
''', (datetime.now(), key_id))
self.conn.commit()
self._audit_log(user_id, 'retrieve_key', key_id, {})
return public_key
return None
def list_user_keys(self, user_id):
"""List all keys accessible by user"""
cursor = self.conn.cursor()
# Get owned keys
cursor.execute('''
SELECT key_id, fingerprint, created_at, expires_at, status, key_purpose
FROM ssh_keys
WHERE owner_id = ? AND status = 'active'
''', (user_id,))
owned_keys = []
for row in cursor.fetchall():
owned_keys.append({
'key_id': row[0],
'fingerprint': row[1],
'created_at': row[2],
'expires_at': row[3],
'status': row[4],
'purpose': row[5],
'ownership': 'owned'
})
# Get keys with permissions
cursor.execute('''
SELECT k.key_id, k.fingerprint, p.permission_type, p.expires_at
FROM ssh_keys k
JOIN key_permissions p ON k.key_id = p.key_id
WHERE p.user_id = ? AND k.status = 'active'
AND (p.expires_at IS NULL OR p.expires_at > ?)
''', (user_id, datetime.now()))
permitted_keys = []
for row in cursor.fetchall():
permitted_keys.append({
'key_id': row[0],
'fingerprint': row[1],
'permission': row[2],
'permission_expires': row[3],
'ownership': 'permitted'
})
return owned_keys + permitted_keys
def grant_permission(self, key_id, user_id, permission_type, grantor_id, expires_in_days=None):
"""Grant permission to access key"""
expires_at = None
if expires_in_days:
expires_at = datetime.now() + timedelta(days=expires_in_days)
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO key_permissions (
key_id, user_id, permission_type, granted_at, granted_by, expires_at
) VALUES (?, ?, ?, ?, ?, ?)
''', (key_id, user_id, permission_type, datetime.now(), grantor_id, expires_at))
self.conn.commit()
self._audit_log(grantor_id, 'grant_permission', key_id, {
'user_id': user_id,
'permission': permission_type
})
def _check_permission(self, key_id, user_id, required_permission):
"""Check if user has permission for key"""
cursor = self.conn.cursor()
# Check if owner
cursor.execute('SELECT owner_id FROM ssh_keys WHERE key_id = ?', (key_id,))
result = cursor.fetchone()
if result and result[0] == user_id:
return True
# Check permissions
cursor.execute('''
SELECT permission_type FROM key_permissions
WHERE key_id = ? AND user_id = ?
AND (expires_at IS NULL OR expires_at > ?)
''', (key_id, user_id, datetime.now()))
permissions = [row[0] for row in cursor.fetchall()]
# Permission hierarchy
if 'admin' in permissions:
return True
if required_permission == 'read' and any(p in ['read', 'write'] for p in permissions):
return True
if required_permission in permissions:
return True
return False
def _audit_log(self, user_id, action, key_id, details):
"""Log audit event"""
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO audit_log (timestamp, user_id, action, key_id, details, success)
VALUES (?, ?, ?, ?, ?, ?)
''', (datetime.now(), user_id, action, key_id, json.dumps(details), True))
self.conn.commit()
# REST API for key repository
app = Flask(__name__)
key_repo = SSHKeyRepository('/opt/ssh-keys/repository.db')
@app.route('/api/keys', methods=['POST'])
def create_key():
"""Create new SSH key"""
data = request.json
user_id = authenticate_user(request.headers.get('Authorization'))
if not user_id:
return jsonify({'error': 'Unauthorized'}), 401
# Generate new key pair
private_key = ed25519.Ed25519PrivateKey.generate()
public_key = private_key.public_key()
# Serialize keys
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.OpenSSH,
encryption_algorithm=serialization.NoEncryption()
)
public_ssh = public_key.public_bytes(
encoding=serialization.Encoding.OpenSSH,
format=serialization.PublicFormat.OpenSSH
)
# Store in repository
key_id = key_repo.store_key(
user_id,
public_ssh.decode(),
private_pem.decode(),
metadata=data.get('metadata', {})
)
return jsonify({
'key_id': key_id,
'public_key': public_ssh.decode(),
'fingerprint': key_repo._calculate_fingerprint(public_ssh.decode())
})
@app.route('/api/keys/<key_id>', methods=['GET'])
def get_key(key_id):
"""Retrieve public key"""
user_id = authenticate_user(request.headers.get('Authorization'))
if not user_id:
return jsonify({'error': 'Unauthorized'}), 401
public_key = key_repo.retrieve_key(key_id, user_id)
if public_key:
return jsonify({
'key_id': key_id,
'public_key': public_key
})
else:
return jsonify({'error': 'Key not found or access denied'}), 404