Security Event Correlation

Security Event Correlation

Security event correlation transforms individual security events into actionable intelligence. Container environments generate massive event volumes, so intelligent correlation is required to separate actual threats from noise. Time-based correlation groups events that occur within the same time window. Entity-based correlation tracks events across container lifecycles. Pattern-based correlation identifies known attack sequences.

Event enrichment adds context necessary for accurate correlation. Container metadata provides application context. Orchestrator information shows deployment relationships. Threat intelligence identifies known malicious indicators. User identity links container actions to human actors. This enriched context enables accurate threat identification and reduces false positives.

// Example: Security event correlation engine for containers
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "sync"
    "time"
)

// SecurityEvent is a single security observation attributed to a container.
// The struct tags fix the JSON field names used when the event is serialized.
type SecurityEvent struct {
    // ID identifies the event.
    ID string `json:"id"`

    // Timestamp is the instant the event refers to; rule evaluation uses it
    // as the end of the correlation window.
    Timestamp time.Time `json:"timestamp"`

    // ContainerID names the container the event belongs to and is the
    // default key when events are grouped for correlation.
    ContainerID string `json:"container_id"`

    // Type is the event category.
    // NOTE(review): presumably values such as "privilege_escalation" that the
    // built-in rule conditions match against; confirm with the producers.
    Type string `json:"type"`

    // Severity is the reported severity label.
    Severity string `json:"severity"`

    // Source names the origin of the event.
    Source string `json:"source"`

    // Details carries free-form, event-specific data.
    Details map[string]interface{} `json:"details"`

    // Enrichment carries added context. Grouping reads the "namespace",
    // "node" and "user" keys from it as strings.
    Enrichment map[string]interface{} `json:"enrichment"`
}

// CorrelationRule describes one correlation: which events to look at, how to
// group them, which conditions a group must satisfy, and what to do on a match.
type CorrelationRule struct {
    // ID identifies the rule; it is passed along when a correlation is
    // created or updated for a matching group.
    ID string
    // Name is a short human-readable title.
    Name string
    // Description explains what the rule detects.
    Description string
    // Window is how far back from the triggering event's timestamp events
    // are considered during evaluation.
    Window time.Duration
    // Conditions are checked against each event group.
    Conditions []RuleCondition
    // Actions are associated with the rule for when it matches.
    Actions []RuleAction
    // GroupBy selects the grouping key used during evaluation. The values
    // "namespace", "node" and "user" group by the enrichment entry of the
    // same name; any other value, including the empty string, groups by
    // container ID. The field is declared last so existing keyed literals
    // are unaffected.
    GroupBy string
}

// CorrelationEngine holds the registered correlation rules together with the
// collaborators used while processing events.
type CorrelationEngine struct {
    // rules is the set of registered rules. ProcessEvent copies it out while
    // holding mu for reading.
    // NOTE(review): presumably keyed by rule ID; confirm against AddRule.
    rules map[string]*CorrelationRule

    // eventBuffer stores enriched events and serves time-window queries
    // during rule evaluation.
    eventBuffer *EventBuffer

    // enricher adds context to each incoming event before it is buffered.
    enricher *EventEnricher

    // alertManager is created with the engine.
    // NOTE(review): presumably used when a rule match is handled; confirm.
    alertManager *AlertManager

    // mu guards rules when ProcessEvent reads them.
    // NOTE(review): whether it also guards correlations is not visible here.
    mu sync.RWMutex

    // correlations tracks active correlations.
    // NOTE(review): presumably keyed by rule ID; confirm in updateCorrelation.
    correlations map[string]*ActiveCorrelation
}

// NewCorrelationEngine returns an engine with empty rule and correlation
// maps, an event buffer built by NewEventBuffer(10000, 5*time.Minute), and a
// newly constructed enricher and alert manager. No rules are registered.
func NewCorrelationEngine() *CorrelationEngine {
    engine := new(CorrelationEngine)

    // Collaborators are created in the same order as the struct's fields.
    engine.rules = make(map[string]*CorrelationRule)
    engine.eventBuffer = NewEventBuffer(10000, 5*time.Minute)
    engine.enricher = NewEventEnricher()
    engine.alertManager = NewAlertManager()
    engine.correlations = make(map[string]*ActiveCorrelation)

    return engine
}

// ProcessEvent enriches event, stores the enriched event in the buffer, and
// evaluates every registered rule against it, handling each rule that matches.
//
// It returns a wrapped error only when enrichment fails; in that case nothing
// is buffered and no rule is evaluated. Whatever handleRuleMatch produces is
// not reported to the caller.
func (ce *CorrelationEngine) ProcessEvent(ctx context.Context, event *SecurityEvent) error {
    enriched, err := ce.enricher.Enrich(ctx, event)
    if err != nil {
        return fmt.Errorf("enrichment failed: %w", err)
    }

    ce.eventBuffer.Add(enriched)

    // Take a snapshot of the rule set so the read lock is released before
    // any rule is evaluated.
    ce.mu.RLock()
    snapshot := make([]*CorrelationRule, 0, len(ce.rules))
    for id := range ce.rules {
        snapshot = append(snapshot, ce.rules[id])
    }
    ce.mu.RUnlock()

    for i := range snapshot {
        if !ce.evaluateRule(snapshot[i], enriched) {
            continue
        }
        ce.handleRuleMatch(ctx, snapshot[i], enriched)
    }

    return nil
}

// evaluateRule reports whether rule matches, given event as the trigger.
//
// It fetches the buffered events between event.Timestamp minus rule.Window
// and event.Timestamp, groups them with groupEvents, and checks rule.Conditions
// against each group. For the first group that satisfies the conditions it
// calls updateCorrelation with the rule ID and that group, then returns true
// without looking at the remaining groups. Groups are visited in map
// iteration order.
func (ce *CorrelationEngine) evaluateRule(rule *CorrelationRule, event *SecurityEvent) bool {
    from := event.Timestamp.Add(-rule.Window)
    candidates := ce.eventBuffer.GetEventsInWindow(from, event.Timestamp)

    for _, members := range ce.groupEvents(candidates, rule) {
        if !ce.checkConditions(members, rule.Conditions) {
            continue
        }
        // Record the match before reporting it.
        ce.updateCorrelation(rule.ID, members)
        return true
    }

    return false
}

// groupEvents partitions events into groups keyed by the rule's grouping key.
//
// When rule.GroupBy is "namespace", "node" or "user", the key is the string
// stored under that name in the event's Enrichment map. For any other GroupBy
// value the key is the event's ContainerID. An event whose enrichment entry is
// missing or is not a string also falls back to its ContainerID, so incomplete
// enrichment never panics and never drops the event.
//
// The returned map is always non-nil; events keep their input order within
// each group.
func (ce *CorrelationEngine) groupEvents(events []*SecurityEvent, rule *CorrelationRule) map[string][]*SecurityEvent {
    groups := make(map[string][]*SecurityEvent)

    // The grouping mode is the same for every event, so resolve the
    // enrichment field once instead of on each iteration.
    var enrichmentField string
    switch rule.GroupBy {
    case "namespace":
        enrichmentField = "namespace"
    case "node":
        enrichmentField = "node"
    case "user":
        enrichmentField = "user"
    }

    for _, event := range events {
        // Group by container ID by default.
        key := event.ContainerID

        if enrichmentField != "" {
            // The comma-ok assertion tolerates a nil Enrichment map, an
            // absent key, and a non-string value.
            if value, ok := event.Enrichment[enrichmentField].(string); ok {
                key = value
            }
        }

        groups[key] = append(groups[key], event)
    }

    return groups
}

// InitializeBuiltinRules registers the engine's built-in correlation rules
// through AddRule, in this order: container escape attempt, cryptomining,
// lateral movement, and data exfiltration.
func (ce *CorrelationEngine) InitializeBuiltinRules() {
    builtinRules := []*CorrelationRule{
        // Container escape detection
        {
            ID:          "container_escape_attempt",
            Name:        "Container Escape Attempt",
            Description: "Detects potential container escape through privilege escalation",
            Window:      5 * time.Minute,
            Conditions: []RuleCondition{
                {Type: "event_count", Field: "type", Value: "privilege_escalation", Operator: ">=", Threshold: 1},
                {Type: "event_count", Field: "type", Value: "sensitive_file_access", Operator: ">=", Threshold: 1},
                {Type: "event_sequence", Sequence: []string{"capability_added", "namespace_change"}},
            },
            Actions: []RuleAction{
                {Type: "alert", Severity: "critical"},
                {Type: "isolate_container"},
                {Type: "capture_forensics"},
            },
        },

        // Cryptomining detection
        {
            ID:          "cryptomining_detection",
            Name:        "Cryptocurrency Mining Detection",
            Description: "Detects cryptocurrency mining in containers",
            Window:      10 * time.Minute,
            Conditions: []RuleCondition{
                {Type: "threshold", Field: "cpu_usage", Operator: ">", Threshold: 80},
                {Type: "event_count", Field: "type", Value: "suspicious_domain", Operator: ">=", Threshold: 1},
                {Type: "pattern_match", Field: "process_name", Pattern: "(minerd|xmrig|cgminer)"},
            },
            Actions: []RuleAction{
                {Type: "alert", Severity: "high"},
                {Type: "kill_container"},
                {Type: "block_image"},
            },
        },

        // Lateral movement detection
        {
            ID:          "lateral_movement",
            Name:        "Lateral Movement Detection",
            Description: "Detects potential lateral movement between containers",
            Window:      15 * time.Minute,
            Conditions: []RuleCondition{
                {Type: "unique_count", Field: "dst_container", Operator: ">", Threshold: 3},
                {Type: "event_count", Field: "type", Value: "network_scan", Operator: ">=", Threshold: 1},
                {Type: "event_count", Field: "type", Value: "authentication_failure", Operator: ">=", Threshold: 5},
            },
            Actions: []RuleAction{
                {Type: "alert", Severity: "high"},
                {Type: "network_isolation"},
                {Type: "increase_monitoring"},
            },
        },

        // Data exfiltration detection
        {
            ID:          "data_exfiltration",
            Name:        "Data Exfiltration Detection",
            Description: "Detects potential data exfiltration from containers",
            Window:      30 * time.Minute,
            Conditions: []RuleCondition{
                {Type: "threshold", Field: "network_egress_bytes", Operator: ">", Threshold: 1073741824}, // 1GB
                {Type: "unique_count", Field: "dst_ip", Operator: ">", Threshold: 10},
                {Type: "time_pattern", Pattern: "outside_business_hours"},
            },
            Actions: []RuleAction{
                {Type: "alert", Severity: "critical"},
                {Type: "throttle_network"},
                {Type: "notify_dpo"},
            },
        },
    }

    // Register in declaration order; each element is its own rule value.
    for _, rule := range builtinRules {
        ce.AddRule(rule)
    }
}