Implementing Vault-Based Secrets Management

Implementing Vault-Based Secrets Management

HashiCorp Vault has emerged as a leading solution for enterprise secrets management. Vault provides centralized secret storage, dynamic secret generation, and comprehensive audit logging. Its plugin architecture supports various authentication methods and secret engines, making it adaptable to different organizational needs.

#!/usr/bin/env python3
# vault_integration.py - CI/CD Vault integration

import os
import sys
import json
import logging
from typing import Dict, Any, Optional
import hvac
from functools import wraps
import time

class VaultSecretsManager:
    """Wrapper around an ``hvac`` client for CI/CD secret handling.

    Covers AppRole login, KV v2 static secrets (mount point ``secret``),
    dynamic credentials for the ``database``/``aws``/``pki`` mounts, static
    secret rotation, and one-time setup of the AWS secrets engine.
    """

    def __init__(self, vault_addr: str, namespace: Optional[str] = None):
        """Store connection settings; no request is made until login.

        Args:
            vault_addr: Base URL of the Vault server.
            namespace: Optional Vault namespace passed to the client.
        """
        self.vault_addr = vault_addr
        self.namespace = namespace
        # Created by authenticate_ci(); the other methods require it.
        self.client = None
        self.logger = logging.getLogger(__name__)
        # Filled in by a successful authenticate_ci() call.
        self.lease_id = None
        self.token_expires_at = None

    def authenticate_ci(self, role_id: str, secret_id: str) -> bool:
        """Authenticate using AppRole for CI/CD systems.

        On success the client token is set and ``token_expires_at`` holds the
        local wall-clock time at which the token's lease duration elapses.

        Returns:
            True on success; False (after logging the error) on any failure.
        """
        try:
            self.client = hvac.Client(
                url=self.vault_addr,
                namespace=self.namespace
            )

            # AppRole authentication
            response = self.client.auth.approle.login(
                role_id=role_id,
                secret_id=secret_id
            )

            auth = response['auth']
            self.client.token = auth['client_token']
            # 'lease_id' is a top-level response field, not part of 'auth';
            # indexing auth['lease_id'] raised KeyError and turned every
            # successful login into a reported failure.
            self.lease_id = response.get('lease_id', '')
            self.token_expires_at = time.time() + auth['lease_duration']

            self.logger.info("Successfully authenticated with Vault")
            return True

        except Exception as e:
            self.logger.error(f"Vault authentication failed: {e}")
            return False

    def get_static_secret(self, path: str) -> Dict[str, Any]:
        """Retrieve static secret from KV v2 secret engine.

        Args:
            path: Secret path below the ``secret`` mount point.

        Returns:
            The key/value payload (``response['data']['data']``).

        Raises:
            Exception: Any error from the lookup is logged and re-raised.
        """
        try:
            response = self.client.secrets.kv.v2.read_secret_version(
                path=path,
                mount_point='secret'
            )
            return response['data']['data']
        except Exception as e:
            self.logger.error(f"Failed to retrieve secret from {path}: {e}")
            raise

    def get_dynamic_credentials(self, backend: str, role: str) -> Dict[str, Any]:
        """Generate dynamic credentials.

        Args:
            backend: One of ``'database'``, ``'aws'`` or ``'pki'``.
            role: Role name configured on that backend.

        Returns:
            Dict with ``credentials`` (the response data), ``lease_id`` and
            ``lease_duration``.

        Raises:
            ValueError: Unsupported backend, or Vault returned no data for
                the requested role.
            Exception: Any other error is logged and re-raised.
        """
        try:
            if backend == 'database':
                response = self.client.read(f'database/creds/{role}')
            elif backend == 'aws':
                response = self.client.read(f'aws/creds/{role}')
            elif backend == 'pki':
                response = self.client.write(
                    f'pki/issue/{role}',
                    common_name=f"{role}.example.com",
                    ttl="24h"
                )
            else:
                raise ValueError(f"Unsupported backend: {backend}")

            # A missing path yields None from the client rather than an
            # exception; fail with a clear message instead of a TypeError.
            if response is None:
                raise ValueError(
                    f"No credentials returned for backend '{backend}' role '{role}'"
                )

            return {
                'credentials': response['data'],
                'lease_id': response['lease_id'],
                'lease_duration': response['lease_duration']
            }

        except Exception as e:
            self.logger.error(f"Failed to generate dynamic credentials: {e}")
            raise

    def rotate_secret(self, path: str, new_value: Dict[str, Any],
                      keep_versions: int = 5) -> bool:
        """Rotate a static secret.

        Writes ``new_value`` as a new version, then deletes every version
        older than the newest ``keep_versions`` so rollback stays possible.

        Args:
            path: Secret path below the ``secret`` mount point.
            new_value: Key/value payload for the new version.
            keep_versions: Number of most recent versions to retain (>= 1).

        Returns:
            True on success; False (after logging the error) on any failure.
        """
        try:
            if keep_versions < 1:
                raise ValueError("keep_versions must be at least 1")

            # Create new version
            self.client.secrets.kv.v2.create_or_update_secret(
                path=path,
                secret=new_value,
                mount_point='secret'
            )

            metadata = self.client.secrets.kv.v2.read_secret_metadata(
                path=path,
                mount_point='secret'
            )

            # Version keys are numeric strings; order them as integers so
            # that "10" ranks after "9" rather than between "1" and "2".
            ordered = sorted(int(v) for v in metadata['data']['versions'])
            excess = len(ordered) - keep_versions
            if excess > 0:
                self.client.secrets.kv.v2.delete_secret_versions(
                    path=path,
                    versions=ordered[:excess],
                    mount_point='secret'
                )

            return True

        except Exception as e:
            self.logger.error(f"Failed to rotate secret: {e}")
            return False

    def setup_aws_dynamic_secrets(self, role_name: str, policy_arns: list) -> bool:
        """Configure AWS dynamic secret generation.

        Mounts the AWS engine at ``aws``, stores root credentials read from
        ``AWS_ACCESS_KEY_ID`` / ``AWS_SECRET_ACCESS_KEY`` in the environment,
        and creates an ``iam_user`` role bound to ``policy_arns``.

        Returns:
            True on success; False (after logging the error) on any failure,
            including a missing environment variable.
        """
        try:
            # NOTE(review): presumably fails when 'aws' is already mounted,
            # so this is a one-time setup step - confirm before re-running.
            self.client.sys.enable_secrets_engine(
                backend_type='aws',
                path='aws',
                config={
                    'default_lease_ttl': '1h',
                    'max_lease_ttl': '24h'
                }
            )

            # Configure root credentials
            self.client.write(
                'aws/config/root',
                access_key=os.environ['AWS_ACCESS_KEY_ID'],
                secret_key=os.environ['AWS_SECRET_ACCESS_KEY'],
                region='us-east-1'
            )

            # Create role
            self.client.write(
                f'aws/roles/{role_name}',
                credential_type='iam_user',
                policy_arns=','.join(policy_arns),
                default_ttl='1h',
                max_ttl='24h'
            )

            return True

        except Exception as e:
            self.logger.error(f"Failed to setup AWS dynamic secrets: {e}")
            return False

# CI/CD pipeline integration
class PipelineSecretsInjector:
    """Resolve a pipeline stage's secret requirements into env-style pairs.

    Leases of dynamic credentials handed out by ``inject_secrets`` are
    remembered in ``secrets_cache`` so ``cleanup_secrets`` can revoke them.
    """

    def __init__(self, vault_manager: 'VaultSecretsManager'):
        """Bind the injector to an already-authenticated Vault manager."""
        self.vault = vault_manager
        # secret name -> {'lease_id': ..., 'type': 'dynamic'}
        self.secrets_cache = {}
        # Needed by the error handlers below; without it every failure
        # surfaced as AttributeError and masked the real cause.
        self.logger = logging.getLogger(__name__)

    def inject_secrets(self, stage: str, required_secrets: list) -> Dict[str, str]:
        """Inject secrets for pipeline stage.

        Args:
            stage: Pipeline stage name (currently informational only).
            required_secrets: List of dicts. Every entry needs ``type``
                (``'static'`` or ``'dynamic'``) and ``name``. Static entries
                need ``path`` and may give ``key`` to select a single field
                (otherwise the whole secret is JSON-encoded). Dynamic
                entries need ``backend`` and ``role``.

        Returns:
            Mapping of variable name to secret value.

        Raises:
            Exception: The first failure is logged and re-raised unchanged.
        """
        injected_secrets = {}

        for secret_config in required_secrets:
            secret_type = secret_config['type']
            secret_name = secret_config['name']

            try:
                if secret_type == 'static':
                    secret_data = self.vault.get_static_secret(secret_config['path'])

                    # Handle different secret formats
                    if secret_config.get('key'):
                        value = secret_data[secret_config['key']]
                    else:
                        value = json.dumps(secret_data)

                    injected_secrets[secret_name] = value

                elif secret_type == 'dynamic':
                    backend = secret_config['backend']
                    creds = self.vault.get_dynamic_credentials(
                        backend=backend,
                        role=secret_config['role']
                    )
                    payload = creds['credentials']

                    # Map credentials to environment variables
                    if backend == 'database':
                        injected_secrets[f"{secret_name}_USERNAME"] = payload['username']
                        injected_secrets[f"{secret_name}_PASSWORD"] = payload['password']
                    elif backend == 'aws':
                        injected_secrets[f"{secret_name}_ACCESS_KEY_ID"] = payload['access_key']
                        injected_secrets[f"{secret_name}_SECRET_ACCESS_KEY"] = payload['secret_key']
                        # Long-lived IAM-user credentials carry no session
                        # token; do not inject a None value for it.
                        session_token = payload.get('security_token')
                        if session_token:
                            injected_secrets[f"{secret_name}_SESSION_TOKEN"] = session_token

                    # Store lease for cleanup; an empty lease ID cannot be
                    # revoked, so there is nothing to remember for it.
                    if creds['lease_id']:
                        self.secrets_cache[secret_name] = {
                            'lease_id': creds['lease_id'],
                            'type': 'dynamic'
                        }

                else:
                    # Unknown types were silently skipped before; keep
                    # skipping, but leave a trace for whoever debugs it.
                    self.logger.warning(
                        f"Skipping secret {secret_name}: unknown type {secret_type}"
                    )

            except Exception as e:
                self.logger.error(f"Failed to inject secret {secret_name}: {e}")
                raise

        return injected_secrets

    def cleanup_secrets(self):
        """Revoke dynamic secrets after use.

        Best effort: a failed revocation is logged and its lease stays in
        the cache for a later retry; revoked leases are dropped so a second
        call does not revoke them again.
        """
        # Iterate over a snapshot because entries are removed on success.
        for secret_name, secret_info in list(self.secrets_cache.items()):
            if secret_info['type'] != 'dynamic':
                continue
            try:
                self.vault.client.sys.revoke_lease(
                    lease_id=secret_info['lease_id']
                )
            except Exception as e:
                self.logger.error(f"Failed to revoke lease for {secret_name}: {e}")
            else:
                del self.secrets_cache[secret_name]

# Jenkins pipeline script
def generate_jenkins_pipeline():
    """Return the declarative Jenkinsfile that drives vault_integration.py.

    The pipeline authenticates with AppRole, injects secrets per stage,
    and always revokes dynamic credentials and the Vault token at the end.

    The text is a raw, double-quoted triple string on purpose: the embedded
    Groovy uses ``'''`` for its shell steps (which would terminate a
    ``'''``-delimited Python string), and the shell line continuations
    (trailing backslashes) must reach Jenkins unchanged.

    NOTE(review): the Build stage passes tokens through ``--build-arg``;
    confirm that is acceptable for the resulting image.
    """
    return r"""
pipeline {
    agent any
    
    environment {
        VAULT_ADDR = credentials('vault-address')
        VAULT_NAMESPACE = 'cicd'
    }
    
    stages {
        stage('Setup') {
            steps {
                script {
                    // Authenticate with Vault
                    withCredentials([
                        string(credentialsId: 'vault-role-id', variable: 'ROLE_ID'),
                        string(credentialsId: 'vault-secret-id', variable: 'SECRET_ID')
                    ]) {
                        sh '''
                            python3 vault_integration.py auth \
                                --role-id $ROLE_ID \
                                --secret-id $SECRET_ID
                        '''
                    }
                }
            }
        }
        
        stage('Build') {
            steps {
                script {
                    // Inject build secrets
                    def buildSecrets = sh(
                        script: 'python3 vault_integration.py inject --stage build',
                        returnStdout: true
                    ).trim()
                    
                    def secrets = readJSON text: buildSecrets
                    
                    withEnv(secrets.collect { k, v -> "${k}=${v}" }) {
                        sh '''
                            # Use injected secrets
                            docker build \
                                --build-arg NPM_TOKEN=$NPM_TOKEN \
                                --build-arg GITHUB_TOKEN=$GITHUB_TOKEN \
                                -t myapp:$BUILD_NUMBER .
                        '''
                    }
                }
            }
        }
        
        stage('Test') {
            steps {
                script {
                    // Get dynamic database credentials for testing
                    def testSecrets = sh(
                        script: 'python3 vault_integration.py inject --stage test --dynamic',
                        returnStdout: true
                    ).trim()
                    
                    def secrets = readJSON text: testSecrets
                    
                    withEnv(secrets.collect { k, v -> "${k}=${v}" }) {
                        sh '''
                            # Run tests with dynamic credentials
                            export DATABASE_URL="postgresql://$DB_USERNAME:$DB_PASSWORD@testdb:5432/test"
                            npm test
                        '''
                    }
                }
            }
            post {
                always {
                    // Revoke dynamic credentials
                    sh 'python3 vault_integration.py cleanup --stage test'
                }
            }
        }
        
        stage('Deploy') {
            when {
                branch 'main'
            }
            steps {
                script {
                    // Get cloud provider credentials
                    def deploySecrets = sh(
                        script: 'python3 vault_integration.py inject --stage deploy --dynamic',
                        returnStdout: true
                    ).trim()
                    
                    def secrets = readJSON text: deploySecrets
                    
                    withEnv(secrets.collect { k, v -> "${k}=${v}" }) {
                        sh '''
                            # Deploy with temporary AWS credentials
                            aws configure set aws_access_key_id $AWS_ACCESS_KEY_ID
                            aws configure set aws_secret_access_key $AWS_SECRET_ACCESS_KEY
                            aws configure set aws_session_token $AWS_SESSION_TOKEN
                            
                            terraform apply -auto-approve
                        '''
                    }
                }
            }
        }
    }
    
    post {
        always {
            // Cleanup all dynamic secrets
            sh 'python3 vault_integration.py cleanup --all'
            
            // Revoke Vault token
            sh 'python3 vault_integration.py logout'
        }
    }
}
"""