Security Event Correlation
Security Event Correlation
Security event correlation transforms individual security events into actionable intelligence. Container environments generate massive event volumes requiring intelligent correlation to identify actual threats. Time-based correlation groups related events. Entity-based correlation tracks events across container lifecycles. Pattern-based correlation identifies known attack sequences.
Event enrichment adds context necessary for accurate correlation. Container metadata provides application context. Orchestrator information shows deployment relationships. Threat intelligence identifies known malicious indicators. User identity links container actions to human actors. This enriched context enables accurate threat identification and reduces false positives.
// Example: Security event correlation engine for containers
package main
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
)
type SecurityEvent struct {
ID string `json:"id"`
Timestamp time.Time `json:"timestamp"`
ContainerID string `json:"container_id"`
Type string `json:"type"`
Severity string `json:"severity"`
Source string `json:"source"`
Details map[string]interface{} `json:"details"`
Enrichment map[string]interface{} `json:"enrichment"`
}
type CorrelationRule struct {
ID string
Name string
Description string
Window time.Duration
Conditions []RuleCondition
Actions []RuleAction
}
type CorrelationEngine struct {
rules map[string]*CorrelationRule
eventBuffer *EventBuffer
enricher *EventEnricher
alertManager *AlertManager
mu sync.RWMutex
correlations map[string]*ActiveCorrelation
}
func NewCorrelationEngine() *CorrelationEngine {
return &CorrelationEngine{
rules: make(map[string]*CorrelationRule),
eventBuffer: NewEventBuffer(10000, 5*time.Minute),
enricher: NewEventEnricher(),
alertManager: NewAlertManager(),
correlations: make(map[string]*ActiveCorrelation),
}
}
func (ce *CorrelationEngine) ProcessEvent(ctx context.Context, event *SecurityEvent) error {
// Enrich event with additional context
enrichedEvent, err := ce.enricher.Enrich(ctx, event)
if err != nil {
return fmt.Errorf("enrichment failed: %w", err)
}
// Add to event buffer
ce.eventBuffer.Add(enrichedEvent)
// Check correlation rules
ce.mu.RLock()
rules := make([]*CorrelationRule, 0, len(ce.rules))
for _, rule := range ce.rules {
rules = append(rules, rule)
}
ce.mu.RUnlock()
for _, rule := range rules {
if ce.evaluateRule(rule, enrichedEvent) {
ce.handleRuleMatch(ctx, rule, enrichedEvent)
}
}
return nil
}
func (ce *CorrelationEngine) evaluateRule(rule *CorrelationRule, event *SecurityEvent) bool {
// Get relevant events within time window
windowStart := event.Timestamp.Add(-rule.Window)
relevantEvents := ce.eventBuffer.GetEventsInWindow(windowStart, event.Timestamp)
// Group events by correlation key
eventGroups := ce.groupEvents(relevantEvents, rule)
// Check if conditions are met
for _, group := range eventGroups {
if ce.checkConditions(group, rule.Conditions) {
// Create or update correlation
ce.updateCorrelation(rule.ID, group)
return true
}
}
return false
}
func (ce *CorrelationEngine) groupEvents(events []*SecurityEvent, rule *CorrelationRule) map[string][]*SecurityEvent {
groups := make(map[string][]*SecurityEvent)
for _, event := range events {
// Group by container ID by default
key := event.ContainerID
// Override grouping based on rule configuration
if rule.GroupBy == "namespace" {
key = event.Enrichment["namespace"].(string)
} else if rule.GroupBy == "node" {
key = event.Enrichment["node"].(string)
} else if rule.GroupBy == "user" {
key = event.Enrichment["user"].(string)
}
groups[key] = append(groups[key], event)
}
return groups
}
func (ce *CorrelationEngine) InitializeBuiltinRules() {
// Container escape detection
ce.AddRule(&CorrelationRule{
ID: "container_escape_attempt",
Name: "Container Escape Attempt",
Description: "Detects potential container escape through privilege escalation",
Window: 5 * time.Minute,
Conditions: []RuleCondition{
{Type: "event_count", Field: "type", Value: "privilege_escalation", Operator: ">=", Threshold: 1},
{Type: "event_count", Field: "type", Value: "sensitive_file_access", Operator: ">=", Threshold: 1},
{Type: "event_sequence", Sequence: []string{"capability_added", "namespace_change"}},
},
Actions: []RuleAction{
{Type: "alert", Severity: "critical"},
{Type: "isolate_container"},
{Type: "capture_forensics"},
},
})
// Cryptomining detection
ce.AddRule(&CorrelationRule{
ID: "cryptomining_detection",
Name: "Cryptocurrency Mining Detection",
Description: "Detects cryptocurrency mining in containers",
Window: 10 * time.Minute,
Conditions: []RuleCondition{
{Type: "threshold", Field: "cpu_usage", Operator: ">", Threshold: 80},
{Type: "event_count", Field: "type", Value: "suspicious_domain", Operator: ">=", Threshold: 1},
{Type: "pattern_match", Field: "process_name", Pattern: "(minerd|xmrig|cgminer)"},
},
Actions: []RuleAction{
{Type: "alert", Severity: "high"},
{Type: "kill_container"},
{Type: "block_image"},
},
})
// Lateral movement detection
ce.AddRule(&CorrelationRule{
ID: "lateral_movement",
Name: "Lateral Movement Detection",
Description: "Detects potential lateral movement between containers",
Window: 15 * time.Minute,
Conditions: []RuleCondition{
{Type: "unique_count", Field: "dst_container", Operator: ">", Threshold: 3},
{Type: "event_count", Field: "type", Value: "network_scan", Operator: ">=", Threshold: 1},
{Type: "event_count", Field: "type", Value: "authentication_failure", Operator: ">=", Threshold: 5},
},
Actions: []RuleAction{
{Type: "alert", Severity: "high"},
{Type: "network_isolation"},
{Type: "increase_monitoring"},
},
})
// Data exfiltration detection
ce.AddRule(&CorrelationRule{
ID: "data_exfiltration",
Name: "Data Exfiltration Detection",
Description: "Detects potential data exfiltration from containers",
Window: 30 * time.Minute,
Conditions: []RuleCondition{
{Type: "threshold", Field: "network_egress_bytes", Operator: ">", Threshold: 1073741824}, // 1GB
{Type: "unique_count", Field: "dst_ip", Operator: ">", Threshold: 10},
{Type: "time_pattern", Pattern: "outside_business_hours"},
},
Actions: []RuleAction{
{Type: "alert", Severity: "critical"},
{Type: "throttle_network"},
{Type: "notify_dpo"},
},
})
}