Centralized Logging Architecture for Security

Centralized logging forms the foundation of effective security monitoring in distributed systems. Log aggregation from hundreds of microservices, containers, and infrastructure components requires robust collection, processing, and storage systems. The sheer volume of logs in modern environments demands efficient architectures that can scale while maintaining query performance.

# Kubernetes centralized logging with security focus.
#
# ConfigMap holding the Fluent Bit pipeline: tail container logs, enrich them
# with Kubernetes metadata and static cluster labels, keep only lines that
# contain security keywords, classify them with a Lua script, then ship the
# result to Elasticsearch and to an S3 archive bucket.
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-config
  namespace: logging
data:
  fluent-bit.conf: |
    [SERVICE]
        Flush         5
        Log_Level     info
        Daemon        off
        Parsers_File  parsers.conf
        HTTP_Server   On
        HTTP_Listen   0.0.0.0
        HTTP_Port     2020

    [INPUT]
        Name              tail
        Tag               kube.*
        Path              /var/log/containers/*.log
        # The built-in multiline parsers handle both the Docker JSON and the
        # CRI (containerd / CRI-O) on-disk formats; a single `Parser docker`
        # leaves CRI-formatted lines unparsed.
        multiline.parser  docker, cri
        DB                /var/log/flb-kube.db
        Mem_Buf_Limit     50MB
        Skip_Long_Lines   On
        Refresh_Interval  10

    [FILTER]
        Name                kubernetes
        Match               kube.*
        Kube_URL            https://kubernetes.default.svc:443
        Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
        Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
        Kube_Tag_Prefix     kube.var.log.containers.
        Merge_Log           On
        # Keep the raw `log` field after a successful JSON merge: the grep and
        # Lua filters below both read it. With Keep_Log Off every JSON-formatted
        # line lost its `log` key and was silently dropped by the grep filter.
        Keep_Log            On
        K8S-Logging.Parser  On
        K8S-Logging.Exclude On

    # No separate `parser` filter for JSON: Merge_Log above already lifts JSON
    # payloads into the record, and a second parse of `log` would remove the
    # key again and duplicate the merged fields.

    [FILTER]
        Name          record_modifier
        Match         *
        Record        cluster_name ${CLUSTER_NAME}
        Record        environment ${ENVIRONMENT}
        Record        region ${AWS_REGION}

    # Security event detection patterns.
    # Only records whose `log` matches are kept; everything else is dropped
    # from this pipeline. The keyword list covers every term that
    # security_enrichment.lua classifies.
    [FILTER]
        Name          grep
        Match         kube.*
        Logical_Op    or
        Regex         log (unauthorized|forbidden|denied|failed|error|attack|exploit|injection|suspicious|malicious|authentication|login)

    [FILTER]
        Name          lua
        Match         kube.*
        script        security_enrichment.lua
        call          enrich_security_context

    # Index name is derived from Logstash_Prefix plus the date because
    # Logstash_Format is On, and no mapping type is sent because
    # Suppress_Type_Name is On, so `Index` / `Type` are intentionally omitted.
    [OUTPUT]
        Name              es
        Match             kube.*
        Host              ${ELASTICSEARCH_HOST}
        Port              443
        HTTP_User         ${ELASTICSEARCH_USER}
        HTTP_Passwd       ${ELASTICSEARCH_PASSWORD}
        Logstash_Format   On
        Logstash_Prefix   k8s-security
        Retry_Limit       False
        tls               On
        tls.verify        On
        Suppress_Type_Name On

    [OUTPUT]
        Name              s3
        Match             kube.*
        bucket            security-logs-archive
        region            ${AWS_REGION}
        use_put_object    On
        total_file_size   50M
        upload_timeout    10m
        store_dir         /tmp/fluent-bit-buffer

  security_enrichment.lua: |
    -- Literal keywords (lower-case) that drive the classification below.
    local AUTHENTICATION_KEYWORDS = { "authentication", "login" }
    local AUTHORIZATION_KEYWORDS = { "unauthorized", "forbidden" }
    local ATTACK_KEYWORDS = { "attack", "exploit", "injection" }

    -- Return true when `text` contains at least one of the literal `keywords`.
    local function contains_any(text, keywords)
        for _, keyword in ipairs(keywords) do
            -- plain find: keywords are literals, not Lua patterns
            if string.find(text, keyword, 1, true) then
                return true
            end
        end
        return false
    end

    -- Return the first dotted quad in `text` whose four octets are all <= 255,
    -- or nil when there is none (skips things like "999.1.1.1").
    local function first_valid_ipv4(text)
        for a, b, c, d in string.gmatch(text, "(%d+)%.(%d+)%.(%d+)%.(%d+)") do
            if tonumber(a) <= 255 and tonumber(b) <= 255 and
               tonumber(c) <= 255 and tonumber(d) <= 255 then
                return a .. "." .. b .. "." .. c .. "." .. d
            end
        end
        return nil
    end

    -- Fluent Bit Lua filter callback: classify a record by the keywords found
    -- in record["log"] and attach security context fields.
    --
    -- Parameters:
    --   tag       - tag of the record (unused)
    --   timestamp - record timestamp, returned unchanged
    --   record    - table of record fields; modified in place
    --
    -- Fields written: security_severity, security_category,
    -- security_timestamp (os.time() at processing), and, when found in the
    -- log text, source_ip and user.
    --
    -- Returns 2, timestamp, record (code 2: record modified, original
    -- timestamp kept).
    function enrich_security_context(tag, timestamp, record)
        -- A missing or non-string `log` must not raise inside string
        -- functions; treat it as empty text so defaults are still applied.
        local raw = record["log"]
        if type(raw) ~= "string" then
            raw = ""
        end
        -- Keyword checks are case-insensitive ("Failed", "UNAUTHORIZED", ...).
        local text = string.lower(raw)

        local severity = "info"
        local security_category = "general"

        -- Authentication events; failures are raised to warning.
        if contains_any(text, AUTHENTICATION_KEYWORDS) then
            security_category = "authentication"
            if string.find(text, "failed", 1, true) then
                severity = "warning"
            end
        end

        -- Authorization events override authentication.
        if contains_any(text, AUTHORIZATION_KEYWORDS) then
            security_category = "authorization"
            severity = "warning"
        end

        -- Potential attacks override everything above.
        if contains_any(text, ATTACK_KEYWORDS) then
            security_category = "attack"
            severity = "critical"
        end

        record["security_severity"] = severity
        record["security_category"] = security_category
        record["security_timestamp"] = os.time()

        -- First plausible IPv4 address in the original text.
        local ip = first_valid_ipv4(raw)
        if ip then
            record["source_ip"] = ip
        end

        -- User name following "user", separated by whitespace, ':' or '='.
        local user = string.match(raw, "user[%s:=]+([%w%-_.]+)")
        if user then
            record["user"] = user
        end

        return 2, timestamp, record
    end

---
# ConfigMap carrying the Logstash pipeline definition (logstash.conf) for the
# logging namespace.
apiVersion: v1
kind: ConfigMap
metadata:
  namespace: logging
  name: logstash-config
data:
  logstash.conf: |
    # Receive events over the Beats protocol on port 5044 with TLS enabled,
    # using the certificate/key pair under /etc/logstash/certs.
    input {
      beats {
        ssl_key => "/etc/logstash/certs/logstash.key"
        ssl_certificate => "/etc/logstash/certs/logstash.crt"
        ssl => true
        port => 5044
      }
    }

    # Filter stage: parse nginx access lines, flag repeated authentication
    # events, enrich source IPs with GeoIP and threat-intel data, then derive
    # a numeric risk_score and an alert_severity from the collected signals.
    filter {
      # Parse structured logs
      if [kubernetes][container][name] =~ "nginx" {
        grok {
          match => {
            "message" => '%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] "%{WORD:method} %{URIPATH:path}(?:%{URIPARAM:params})? %{DATA:protocol}" %{INT:status} %{INT:body_bytes_sent} "%{DATA:http_referer}" "%{DATA:http_user_agent}"'
          }
        }
      }

      # Security event correlation.
      # Requires source_ip so that events lacking it are not all counted under
      # one literal "%{source_ip}-..." key.
      if [security_category] == "authentication" and [source_ip] {
        throttle {
          # -1 disables the lower bound. With before_count => 3 the first two
          # events of every key were treated as throttled and therefore tagged
          # as brute force; only events beyond after_count should be tagged.
          before_count => -1
          after_count => 5
          period => "300"
          key => "%{source_ip}-%{user}"
          add_tag => ["brute_force_attempt"]
        }
      }

      # GeoIP enrichment for security analysis
      if [source_ip] {
        # NOTE(review): the repeated add_field and the [geoip][longitude] /
        # [geoip][latitude] / [geoip][country_code2] field names follow the
        # legacy (non-ECS) geoip layout - confirm against the deployed plugin
        # version and its ecs_compatibility setting.
        geoip {
          source => "source_ip"
          target => "geoip"
          add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
          add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
        }

        # Tag suspicious geographic locations. The country code must exist:
        # otherwise private or unresolvable addresses (no lookup result)
        # satisfy "not in" and are tagged as foreign.
        if [geoip][country_code2] and [geoip][country_code2] not in ["US", "CA", "GB", "DE", "FR"] {
          mutate {
            add_tag => ["foreign_access"]
          }
        }
      }

      # Add threat intelligence enrichment (only when there is an IP to look up)
      if [source_ip] {
        translate {
          field => "source_ip"
          destination => "threat_intel"
          dictionary_path => "/etc/logstash/threat_intel.yml"
          fallback => "clean"
        }
      }

      # Calculate risk score.
      # Weights: failed authentication 20, foreign access 15, threat-intel
      # hit 50, brute-force pattern 30. alert_severity is set to high (>= 70),
      # medium (>= 40) or low (>= 20) and left unset below 20.
      # The Ruby snippet is single-quoted, so it must not contain single quotes.
      ruby {
        code => '
          risk_score = 0
          tags = Array(event.get("tags"))
          # NOTE(review): falls back to the "log" field when "message" is
          # absent, assuming shippers may deliver the raw line there - confirm.
          text = (event.get("message") || event.get("log")).to_s
          threat = event.get("[threat_intel]")

          # Authentication failures
          if event.get("[security_category]") == "authentication" && text =~ /failed/
            risk_score += 20
          end

          # Foreign access
          risk_score += 15 if tags.include?("foreign_access")

          # Known threat: only when a lookup produced a value other than the
          # "clean" fallback (a missing field is not a threat hit)
          risk_score += 50 if threat && threat != "clean"

          # Brute force pattern
          risk_score += 30 if tags.include?("brute_force_attempt")

          event.set("risk_score", risk_score)

          if risk_score >= 70
            event.set("alert_severity", "high")
          elsif risk_score >= 40
            event.set("alert_severity", "medium")
          elsif risk_score >= 20
            event.set("alert_severity", "low")
          end
        '
      }
    }

    # Output stage: index every event into Elasticsearch, e-mail high-severity
    # alerts to the security team, and forward every event to the SIEM via
    # syslog.
    output {
      elasticsearch {
        hosts => ["${ELASTICSEARCH_HOST}:443"]
        index => "security-events-%{+YYYY.MM.dd}"
        user => "${ELASTICSEARCH_USER}"
        password => "${ELASTICSEARCH_PASSWORD}"
        ssl => true
        ssl_certificate_verification => true
      }

      # High severity alerts to security team
      if [alert_severity] == "high" {
        email {
          # Recipient comes from the environment, like the other
          # deployment-specific settings in this pipeline; the previous
          # literal was a scraped placeholder and not a deliverable address.
          to => "${SECURITY_ALERT_EMAIL}"
          subject => "High Security Alert: %{security_category}"
          # NOTE(review): the \n sequences are only turned into newlines when
          # Logstash runs with config.support_escapes enabled - confirm.
          body => "Security event detected:\n\nSeverity: %{alert_severity}\nCategory: %{security_category}\nSource IP: %{source_ip}\nUser: %{user}\nMessage: %{message}\nRisk Score: %{risk_score}"
        }
      }

      # Send to SIEM
      # NOTE(review): plain TCP on 514 carries security events unencrypted,
      # unlike the TLS-protected outputs above - consider a TLS transport if
      # the SIEM receiver supports it.
      syslog {
        host => "${SIEM_HOST}"
        port => 514
        protocol => "tcp"
        facility => "local0"
        severity => "informational"
        sourcehost => "%{[kubernetes][node][name]}"
        tag => "k8s-security"
      }
    }