Implementing Attribute-Based Access Control
Implementing Attribute-Based Access Control
Attribute-Based Access Control (ABAC) provides more flexible permission models than traditional RBAC. ABAC makes authorization decisions based on attributes of users, resources, and environmental context. This flexibility proves particularly valuable for IaC scenarios with complex permission requirements.
Tags and metadata enable ABAC implementations for IaC resources. Resources tagged with specific project identifiers can be managed only by team members associated with those projects. Environmental tags restrict modifications based on deployment stages. Compliance tags might require additional approvals for resources handling sensitive data.
# ABAC policy engine for IaC
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
class Action(Enum):
CREATE = "create"
UPDATE = "update"
DELETE = "delete"
READ = "read"
@dataclass
class Principal:
user_id: str
groups: List[str]
attributes: Dict[str, Any]
@dataclass
class Resource:
type: str
id: str
tags: Dict[str, str]
attributes: Dict[str, Any]
@dataclass
class Context:
timestamp: str
source_ip: str
mfa_authenticated: bool
environment: str
class ABACPolicyEngine:
def __init__(self, policies_path: str):
self.policies = self._load_policies(policies_path)
def evaluate(self, principal: Principal, action: Action,
resource: Resource, context: Context) -> bool:
"""Evaluate ABAC policy for IaC action."""
for policy in self.policies:
if self._matches_policy(policy, principal, action, resource, context):
if policy['effect'] == 'allow':
return True
elif policy['effect'] == 'deny':
return False
# Default deny
return False
def _matches_policy(self, policy: Dict, principal: Principal,
action: Action, resource: Resource,
context: Context) -> bool:
"""Check if policy matches current request."""
# Check principal matches
if not self._matches_principal(policy.get('principal', {}), principal):
return False
# Check action matches
if action.value not in policy.get('actions', []):
return False
# Check resource matches
if not self._matches_resource(policy.get('resource', {}), resource):
return False
# Check conditions
if not self._evaluate_conditions(policy.get('conditions', {}),
principal, resource, context):
return False
return True
def _evaluate_conditions(self, conditions: Dict, principal: Principal,
resource: Resource, context: Context) -> bool:
"""Evaluate policy conditions."""
for condition_type, condition_checks in conditions.items():
if condition_type == 'StringEquals':
for attribute, expected_value in condition_checks.items():
actual_value = self._get_attribute_value(
attribute, principal, resource, context
)
if actual_value != expected_value:
return False
elif condition_type == 'StringLike':
for attribute, pattern in condition_checks.items():
actual_value = self._get_attribute_value(
attribute, principal, resource, context
)
if not self._matches_pattern(actual_value, pattern):
return False
elif condition_type == 'IpAddress':
if context.source_ip not in condition_checks.get('aws:SourceIp', []):
return False
elif condition_type == 'Bool':
for attribute, expected in condition_checks.items():
if attribute == 'aws:MultiFactorAuthPresent':
if context.mfa_authenticated != expected:
return False
return True
def _get_attribute_value(self, attribute_path: str, principal: Principal,
resource: Resource, context: Context) -> Optional[str]:
"""Extract attribute value from objects."""
parts = attribute_path.split(':')
if parts[0] == 'principal':
return self._extract_from_object(principal, parts[1:])
elif parts[0] == 'resource':
return self._extract_from_object(resource, parts[1:])
elif parts[0] == 'context':
return self._extract_from_object(context, parts[1:])
return None
# Example ABAC policies for IaC
abac_policies = [
{
"id": "dev-team-dev-env",
"effect": "allow",
"principal": {
"groups": ["developers"]
},
"actions": ["create", "update", "delete"],
"resource": {
"type": "*"
},
"conditions": {
"StringEquals": {
"context:environment": "development",
"resource:tags:Team": "${principal:attributes:team}"
}
}
},
{
"id": "prod-deployment-restrictions",
"effect": "allow",
"principal": {
"groups": ["senior-engineers", "sre-team"]
},
"actions": ["update"],
"resource": {
"type": "*"
},
"conditions": {
"StringEquals": {
"context:environment": "production"
},
"Bool": {
"aws:MultiFactorAuthPresent": true
},
"IpAddress": {
"aws:SourceIp": ["10.0.0.0/8", "172.16.0.0/12"]
}
}
},
{
"id": "security-critical-resources",
"effect": "deny",
"principal": {
"groups": ["*"]
},
"actions": ["delete"],
"resource": {
"type": "*"
},
"conditions": {
"StringEquals": {
"resource:tags:SecurityCritical": "true"
},
"StringNotEquals": {
"principal:groups": ["security-team"]
}
}
}
]
Dynamic policy evaluation enables context-aware access decisions. Time-based restrictions might allow infrastructure changes only during maintenance windows. Geographic restrictions could limit production changes to specific office locations. Risk scores calculated from various factors can automatically adjust permission levels.