Implementing Vault-Based Secrets Management
Implementing Vault-Based Secrets Management
HashiCorp Vault has emerged as a leading solution for enterprise secrets management. Vault provides centralized secret storage, dynamic secret generation, and comprehensive audit logging. Its plugin architecture supports various authentication methods and secret engines, making it adaptable to different organizational needs.
#!/usr/bin/env python3
# vault_integration.py - CI/CD Vault integration
import os
import sys
import json
import logging
from typing import Dict, Any, Optional
import hvac
from functools import wraps
import time
class VaultSecretsManager:
    """Wrapper around an ``hvac`` client for CI/CD secret workflows.

    The manager authenticates with AppRole, reads and rotates KV v2 secrets,
    requests dynamic credentials (database, AWS, PKI) and can configure the
    AWS secrets engine.

    Attributes:
        vault_addr: URL passed to ``hvac.Client``.
        namespace: Optional Vault namespace passed to ``hvac.Client``.
        client: The ``hvac`` client; ``None`` until ``authenticate_ci``
            succeeds.
        lease_id: Lease id reported by the login response (``None`` before
            authentication).
        token_expires_at: Epoch seconds at which the token lease ends
            (``None`` before authentication).
    """

    def __init__(self, vault_addr: str, namespace: Optional[str] = None):
        self.vault_addr = vault_addr
        self.namespace = namespace
        self.client = None
        # Defined up front so the attributes exist even before (or after a
        # failed) authentication.
        self.lease_id = None
        self.token_expires_at = None
        self.logger = logging.getLogger(__name__)

    def _require_client(self):
        """Return the client, or raise ``RuntimeError`` if none is set yet."""
        if self.client is None:
            raise RuntimeError(
                "Vault client is not initialised; call authenticate_ci() first"
            )
        return self.client

    def authenticate_ci(self, role_id: str, secret_id: str) -> bool:
        """Authenticate using AppRole for CI/CD systems.

        Args:
            role_id: AppRole role id.
            secret_id: AppRole secret id.

        Returns:
            True when login succeeded and ``self.client`` now holds a token;
            False on any failure (the error is logged, not raised).
        """
        try:
            client = hvac.Client(
                url=self.vault_addr,
                namespace=self.namespace
            )
            # AppRole authentication
            response = client.auth.approle.login(
                role_id=role_id,
                secret_id=secret_id
            )
            auth = response['auth']
            token = auth['client_token']
            expires_at = time.time() + auth['lease_duration']
            # Vault's documented login response has no "lease_id" inside
            # "auth"; indexing it directly raised KeyError and turned every
            # successful login into a reported failure. Fall back to the
            # top-level field (usually an empty string for logins).
            lease_id = auth.get('lease_id') or response.get('lease_id', '')
        except Exception as e:
            self.logger.error(f"Vault authentication failed: {e}")
            return False

        # Publish state only after everything was read successfully, so a
        # failed login never leaves a half-initialised client behind.
        client.token = token
        self.client = client
        self.lease_id = lease_id
        self.token_expires_at = expires_at
        self.logger.info("Successfully authenticated with Vault")
        return True

    def get_static_secret(self, path: str,
                          mount_point: str = 'secret') -> Dict[str, Any]:
        """Retrieve static secret from KV v2 secret engine.

        Args:
            path: Secret path below the mount.
            mount_point: KV v2 mount to read from.

        Returns:
            The secret's key/value payload (``response['data']['data']``).

        Raises:
            Exception: Any error from the lookup is logged and re-raised.
        """
        try:
            response = self._require_client().secrets.kv.v2.read_secret_version(
                path=path,
                mount_point=mount_point
            )
            return response['data']['data']
        except Exception as e:
            self.logger.error(f"Failed to retrieve secret from {path}: {e}")
            raise

    def get_dynamic_credentials(self, backend: str, role: str) -> Dict[str, Any]:
        """Generate dynamic credentials.

        Args:
            backend: One of ``'database'``, ``'aws'`` or ``'pki'``.
            role: Role name on that backend. For ``'pki'`` a certificate is
                issued for ``"{role}.example.com"`` with a 24h TTL.

        Returns:
            Dict with ``'credentials'``, ``'lease_id'`` and
            ``'lease_duration'`` taken from the Vault response.

        Raises:
            ValueError: Unsupported backend, or Vault returned no data.
            Exception: Any other error is logged and re-raised.
        """
        try:
            client = self._require_client()
            if backend == 'database':
                response = client.read(f'database/creds/{role}')
            elif backend == 'aws':
                response = client.read(f'aws/creds/{role}')
            elif backend == 'pki':
                response = client.write(
                    f'pki/issue/{role}',
                    common_name=f"{role}.example.com",
                    ttl="24h"
                )
            else:
                raise ValueError(f"Unsupported backend: {backend}")
            # An empty response would otherwise surface as an opaque
            # "'NoneType' object is not subscriptable".
            if not response:
                raise ValueError(
                    f"No credentials returned for {backend} role {role}"
                )
            return {
                'credentials': response['data'],
                'lease_id': response['lease_id'],
                'lease_duration': response['lease_duration']
            }
        except Exception as e:
            self.logger.error(f"Failed to generate dynamic credentials: {e}")
            raise

    def rotate_secret(self, path: str, new_value: Dict[str, Any],
                      keep_versions: int = 5,
                      mount_point: str = 'secret') -> bool:
        """Rotate a static secret.

        Writes ``new_value`` as a new version, then deletes all but the
        newest ``keep_versions`` versions so a rollback window remains.

        Args:
            path: Secret path below the mount.
            new_value: New key/value payload.
            keep_versions: Number of most recent versions to keep (>= 1).
            mount_point: KV v2 mount to write to.

        Returns:
            True on success, False if any Vault call failed (logged).

        Raises:
            ValueError: If ``keep_versions`` is smaller than 1.
        """
        if keep_versions < 1:
            raise ValueError("keep_versions must be at least 1")
        try:
            kv2 = self._require_client().secrets.kv.v2
            # Create new version
            kv2.create_or_update_secret(
                path=path,
                secret=new_value,
                mount_point=mount_point
            )
            metadata = kv2.read_secret_metadata(
                path=path,
                mount_point=mount_point
            )
            # Version ids arrive as string keys; sort them numerically.
            # A plain string sort puts "10" before "2" and would delete the
            # wrong versions once a secret has ten or more of them.
            ordered = sorted(int(v) for v in metadata['data']['versions'])
            old_versions = ordered[:-keep_versions]
            if old_versions:
                kv2.delete_secret_versions(
                    path=path,
                    versions=old_versions,
                    mount_point=mount_point
                )
            return True
        except Exception as e:
            self.logger.error(f"Failed to rotate secret: {e}")
            return False

    def setup_aws_dynamic_secrets(self, role_name: str, policy_arns: list,
                                  region: str = 'us-east-1') -> bool:
        """Configure AWS dynamic secret generation.

        Enables the ``aws`` secrets engine, stores root credentials taken
        from ``AWS_ACCESS_KEY_ID`` / ``AWS_SECRET_ACCESS_KEY`` and creates an
        ``iam_user`` role.

        Args:
            role_name: Name of the Vault AWS role to create.
            policy_arns: IAM policy ARNs attached to generated users.
            region: AWS region written to the engine's root config.

        Returns:
            True on success, False on any failure (logged).
        """
        # NOTE(review): enabling a mount that already exists presumably
        # fails, which would make a second call return False - confirm
        # whether this needs to be idempotent.
        try:
            client = self._require_client()
            # Read the environment before touching Vault so a missing
            # variable cannot leave a mounted but unconfigured engine.
            access_key = os.environ['AWS_ACCESS_KEY_ID']
            secret_key = os.environ['AWS_SECRET_ACCESS_KEY']
            # Configure AWS secret engine
            client.sys.enable_secrets_engine(
                backend_type='aws',
                path='aws',
                config={
                    'default_lease_ttl': '1h',
                    'max_lease_ttl': '24h'
                }
            )
            # Configure root credentials
            client.write(
                'aws/config/root',
                access_key=access_key,
                secret_key=secret_key,
                region=region
            )
            # Create role
            client.write(
                f'aws/roles/{role_name}',
                credential_type='iam_user',
                policy_arns=','.join(policy_arns),
                default_ttl='1h',
                max_ttl='24h'
            )
            return True
        except Exception as e:
            self.logger.error(f"Failed to setup AWS dynamic secrets: {e}")
            return False
# CI/CD pipeline integration
class PipelineSecretsInjector:
    """Resolve a pipeline stage's secret requirements into env-style pairs.

    Attributes:
        vault: Manager used to fetch static and dynamic secrets.
        secrets_cache: Maps secret name to ``{'lease_id', 'type'}`` for every
            dynamic secret issued, so ``cleanup_secrets`` can revoke it.
        logger: Module logger used for failure reporting.
    """

    def __init__(self, vault_manager: "VaultSecretsManager"):
        self.vault = vault_manager
        self.secrets_cache = {}
        # The error handlers below log through self.logger; without this
        # attribute they raised AttributeError and hid the real failure.
        self.logger = logging.getLogger(__name__)

    def inject_secrets(self, stage: str, required_secrets: list) -> Dict[str, str]:
        """Inject secrets for pipeline stage.

        Args:
            stage: Pipeline stage name (currently not used for the lookup).
            required_secrets: Secret descriptors. Each needs ``'type'``
                (``'static'`` or ``'dynamic'``) and ``'name'``. Static
                entries need ``'path'`` and may give ``'key'``; dynamic
                entries need ``'backend'`` and ``'role'``.

        Returns:
            Mapping of variable name to secret value. A static secret
            without ``'key'`` is injected as a JSON document. Database
            credentials become ``<name>_USERNAME`` / ``<name>_PASSWORD``;
            AWS credentials become ``<name>_ACCESS_KEY_ID`` /
            ``<name>_SECRET_ACCESS_KEY`` plus ``<name>_SESSION_TOKEN`` when
            a token is present. Other dynamic backends add no variables,
            but their lease is still tracked for cleanup.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        injected_secrets = {}
        for secret_config in required_secrets:
            secret_type = secret_config['type']
            secret_name = secret_config['name']
            try:
                if secret_type == 'static':
                    injected_secrets[secret_name] = self._resolve_static(secret_config)
                elif secret_type == 'dynamic':
                    injected_secrets.update(
                        self._resolve_dynamic(secret_name, secret_config)
                    )
                else:
                    # Unknown types were silently skipped; keep skipping but
                    # leave a trace so a typo in the config is discoverable.
                    self.logger.warning(
                        f"Ignoring secret {secret_name}: unknown type {secret_type}"
                    )
            except Exception as e:
                self.logger.error(f"Failed to inject secret {secret_name}: {e}")
                raise
        return injected_secrets

    def _resolve_static(self, secret_config):
        """Fetch a static secret and return one key or the whole JSON blob."""
        secret_data = self.vault.get_static_secret(secret_config['path'])
        # Handle different secret formats
        if secret_config.get('key'):
            return secret_data[secret_config['key']]
        return json.dumps(secret_data)

    def _resolve_dynamic(self, secret_name, secret_config):
        """Issue dynamic credentials and map them to variable names."""
        backend = secret_config['backend']
        creds = self.vault.get_dynamic_credentials(
            backend=backend,
            role=secret_config['role']
        )
        # Store lease for cleanup *before* mapping, so a malformed response
        # cannot leave an issued credential that is never revoked.
        self.secrets_cache[secret_name] = {
            'lease_id': creds['lease_id'],
            'type': 'dynamic'
        }
        credentials = creds['credentials']
        # Map credentials to environment variables
        mapped = {}
        if backend == 'database':
            mapped[f"{secret_name}_USERNAME"] = credentials['username']
            mapped[f"{secret_name}_PASSWORD"] = credentials['password']
        elif backend == 'aws':
            mapped[f"{secret_name}_ACCESS_KEY_ID"] = credentials['access_key']
            mapped[f"{secret_name}_SECRET_ACCESS_KEY"] = credentials['secret_key']
            # iam_user roles carry no session token; exporting None would
            # end up as a literal "null"/"None" in the environment.
            session_token = credentials.get('security_token')
            if session_token is not None:
                mapped[f"{secret_name}_SESSION_TOKEN"] = session_token
        return mapped

    def cleanup_secrets(self):
        """Revoke dynamic secrets after use.

        Successfully revoked entries (and entries without a lease id) are
        removed from ``secrets_cache``; entries whose revocation failed are
        logged and kept so a later call can retry them.
        """
        # Iterate over a snapshot because entries are removed in the loop.
        for secret_name, secret_info in list(self.secrets_cache.items()):
            if secret_info['type'] != 'dynamic':
                continue
            lease_id = secret_info['lease_id']
            if lease_id:
                try:
                    self.vault.client.sys.revoke_lease(
                        lease_id=lease_id
                    )
                except Exception as e:
                    self.logger.error(f"Failed to revoke lease for {secret_name}: {e}")
                    continue
            del self.secrets_cache[secret_name]
# Jenkins pipeline script
def generate_jenkins_pipeline():
    """Return the text of a declarative Jenkins pipeline that uses Vault.

    The pipeline authenticates with AppRole in ``Setup``, injects secrets
    for ``Build``, ``Test`` and ``Deploy`` by calling
    ``vault_integration.py`` subcommands (``auth``, ``inject``, ``cleanup``,
    ``logout``), and revokes dynamic credentials in ``post`` sections.

    Returns:
        The Jenkinsfile content as a string, starting with a newline.
    """
    # The literal must be raw and delimited with triple *double* quotes:
    # the Groovy text contains its own ''' shell blocks, which terminated a
    # '''-delimited Python string early, and in a non-raw string each
    # backslash-newline would be dropped from the shell commands.
    # NOTE(review): the Build stage passes tokens via --build-arg, which
    # Docker may record in image history - consider build secrets instead.
    return r"""
pipeline {
agent any
environment {
VAULT_ADDR = credentials('vault-address')
VAULT_NAMESPACE = 'cicd'
}
stages {
stage('Setup') {
steps {
script {
// Authenticate with Vault
withCredentials([
string(credentialsId: 'vault-role-id', variable: 'ROLE_ID'),
string(credentialsId: 'vault-secret-id', variable: 'SECRET_ID')
]) {
sh '''
python3 vault_integration.py auth \
--role-id $ROLE_ID \
--secret-id $SECRET_ID
'''
}
}
}
}
stage('Build') {
steps {
script {
// Inject build secrets
def buildSecrets = sh(
script: 'python3 vault_integration.py inject --stage build',
returnStdout: true
).trim()
def secrets = readJSON text: buildSecrets
withEnv(secrets.collect { k, v -> "${k}=${v}" }) {
sh '''
# Use injected secrets
docker build \
--build-arg NPM_TOKEN=$NPM_TOKEN \
--build-arg GITHUB_TOKEN=$GITHUB_TOKEN \
-t myapp:$BUILD_NUMBER .
'''
}
}
}
}
stage('Test') {
steps {
script {
// Get dynamic database credentials for testing
def testSecrets = sh(
script: 'python3 vault_integration.py inject --stage test --dynamic',
returnStdout: true
).trim()
def secrets = readJSON text: testSecrets
withEnv(secrets.collect { k, v -> "${k}=${v}" }) {
sh '''
# Run tests with dynamic credentials
export DATABASE_URL="postgresql://$DB_USERNAME:$DB_PASSWORD@testdb:5432/test"
npm test
'''
}
}
}
post {
always {
// Revoke dynamic credentials
sh 'python3 vault_integration.py cleanup --stage test'
}
}
}
stage('Deploy') {
when {
branch 'main'
}
steps {
script {
// Get cloud provider credentials
def deploySecrets = sh(
script: 'python3 vault_integration.py inject --stage deploy --dynamic',
returnStdout: true
).trim()
def secrets = readJSON text: deploySecrets
withEnv(secrets.collect { k, v -> "${k}=${v}" }) {
sh '''
# Deploy with temporary AWS credentials
aws configure set aws_access_key_id $AWS_ACCESS_KEY_ID
aws configure set aws_secret_access_key $AWS_SECRET_ACCESS_KEY
aws configure set aws_session_token $AWS_SESSION_TOKEN
terraform apply -auto-approve
'''
}
}
}
}
}
post {
always {
// Cleanup all dynamic secrets
sh 'python3 vault_integration.py cleanup --all'
// Revoke Vault token
sh 'python3 vault_integration.py logout'
}
}
}
"""