Centralized Logging Architecture for Security
Centralized Logging Architecture for Security
Centralized logging forms the foundation of effective security monitoring in distributed systems. Log aggregation from hundreds of microservices, containers, and infrastructure components requires robust collection, processing, and storage systems. The sheer volume of logs in modern environments demands efficient architectures that can scale while maintaining query performance.
# Kubernetes centralized logging with security focus
apiVersion: v1
kind: ConfigMap
metadata:
name: fluent-bit-config
namespace: logging
data:
fluent-bit.conf: |
[SERVICE]
Flush 5
Log_Level info
Daemon off
Parsers_File parsers.conf
HTTP_Server On
HTTP_Listen 0.0.0.0
HTTP_Port 2020
[INPUT]
Name tail
Tag kube.*
Path /var/log/containers/*.log
Parser docker
DB /var/log/flb-kube.db
Mem_Buf_Limit 50MB
Skip_Long_Lines On
Refresh_Interval 10
[FILTER]
Name kubernetes
Match kube.*
Kube_URL https://kubernetes.default.svc:443
Kube_CA_File /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
Kube_Token_File /var/run/secrets/kubernetes.io/serviceaccount/token
Kube_Tag_Prefix kube.var.log.containers.
Merge_Log On
Keep_Log Off
K8S-Logging.Parser On
K8S-Logging.Exclude On
[FILTER]
Name parser
Match kube.*
Key_Name log
Parser json
Reserve_Data On
[FILTER]
Name record_modifier
Match *
Record cluster_name ${CLUSTER_NAME}
Record environment ${ENVIRONMENT}
Record region ${AWS_REGION}
# Security event detection patterns
[FILTER]
Name grep
Match kube.*
Logical_Op or
Regex log (unauthorized|forbidden|denied|failed|error|attack|suspicious|malicious)
[FILTER]
Name lua
Match kube.*
script security_enrichment.lua
call enrich_security_context
[OUTPUT]
Name es
Match kube.*
Host ${ELASTICSEARCH_HOST}
Port 443
HTTP_User ${ELASTICSEARCH_USER}
HTTP_Passwd ${ELASTICSEARCH_PASSWORD}
Index security-logs
Type _doc
Logstash_Format On
Logstash_Prefix k8s-security
Retry_Limit False
tls On
tls.verify On
Suppress_Type_Name On
[OUTPUT]
Name s3
Match kube.*
bucket security-logs-archive
region ${AWS_REGION}
use_put_object On
total_file_size 50M
upload_timeout 10m
store_dir /tmp/fluent-bit-buffer
security_enrichment.lua: |
function enrich_security_context(tag, timestamp, record)
-- Extract security-relevant fields
local severity = "info"
local security_category = "general"
-- Check for authentication events
if string.match(record["log"], "authentication") or
string.match(record["log"], "login") then
security_category = "authentication"
if string.match(record["log"], "failed") then
severity = "warning"
end
end
-- Check for authorization events
if string.match(record["log"], "unauthorized") or
string.match(record["log"], "forbidden") then
security_category = "authorization"
severity = "warning"
end
-- Check for potential attacks
if string.match(record["log"], "attack") or
string.match(record["log"], "exploit") or
string.match(record["log"], "injection") then
security_category = "attack"
severity = "critical"
end
-- Add security context
record["security_severity"] = severity
record["security_category"] = security_category
record["security_timestamp"] = os.time()
-- Extract IP addresses
local ip_pattern = "(%d+%.%d+%.%d+%.%d+)"
local ip = string.match(record["log"], ip_pattern)
if ip then
record["source_ip"] = ip
end
-- Extract user information
local user_pattern = "user[%s:=]+([%w%-_.]+)"
local user = string.match(record["log"], user_pattern)
if user then
record["user"] = user
end
return 2, timestamp, record
end
---
apiVersion: v1
kind: ConfigMap
metadata:
name: logstash-config
namespace: logging
data:
logstash.conf: |
input {
beats {
port => 5044
ssl => true
ssl_certificate => "/etc/logstash/certs/logstash.crt"
ssl_key => "/etc/logstash/certs/logstash.key"
}
}
filter {
# Parse structured logs
if [kubernetes][container][name] =~ "nginx" {
grok {
match => {
"message" => '%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] "%{WORD:method} %{URIPATH:path}(?:%{URIPARAM:params})? %{DATA:protocol}" %{INT:status} %{INT:body_bytes_sent} "%{DATA:http_referer}" "%{DATA:http_user_agent}"'
}
}
}
# Security event correlation
if [security_category] == "authentication" {
throttle {
before_count => 3
after_count => 5
period => 300
key => "%{source_ip}-%{user}"
add_tag => ["brute_force_attempt"]
}
}
# GeoIP enrichment for security analysis
if [source_ip] {
geoip {
source => "source_ip"
target => "geoip"
add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
# Tag suspicious geographic locations
if [geoip][country_code2] not in ["US", "CA", "GB", "DE", "FR"] {
mutate {
add_tag => ["foreign_access"]
}
}
}
# Add threat intelligence enrichment
translate {
field => "source_ip"
destination => "threat_intel"
dictionary_path => "/etc/logstash/threat_intel.yml"
fallback => "clean"
}
# Calculate risk score
ruby {
code => '
risk_score = 0
# Authentication failures
if event.get("[security_category]") == "authentication" and
event.get("message") =~ /failed/
risk_score += 20
end
# Foreign access
if event.get("tags") and event.get("tags").include?("foreign_access")
risk_score += 15
end
# Known threat
if event.get("[threat_intel]") != "clean"
risk_score += 50
end
# Brute force pattern
if event.get("tags") and event.get("tags").include?("brute_force_attempt")
risk_score += 30
end
event.set("risk_score", risk_score)
if risk_score >= 70
event.set("alert_severity", "high")
elsif risk_score >= 40
event.set("alert_severity", "medium")
elsif risk_score >= 20
event.set("alert_severity", "low")
end
'
}
}
output {
elasticsearch {
hosts => ["${ELASTICSEARCH_HOST}:443"]
index => "security-events-%{+YYYY.MM.dd}"
user => "${ELASTICSEARCH_USER}"
password => "${ELASTICSEARCH_PASSWORD}"
ssl => true
ssl_certificate_verification => true
}
# High severity alerts to security team
if [alert_severity] == "high" {
email {
to => "[email protected]"
subject => "High Security Alert: %{security_category}"
body => "Security event detected:\n\nSeverity: %{alert_severity}\nCategory: %{security_category}\nSource IP: %{source_ip}\nUser: %{user}\nMessage: %{message}\nRisk Score: %{risk_score}"
}
}
# Send to SIEM
syslog {
host => "${SIEM_HOST}"
port => 514
protocol => "tcp"
facility => "local0"
severity => "informational"
sourcehost => "%{[kubernetes][node][name]}"
tag => "k8s-security"
}
}