Integration Guide for SOC and Monitoring Teams
Table of Contents¶
- Overview
- Key Capabilities
- Architecture
- System Components
- Event Types
- SecurityEventType
- Severity Levels (RFC 5424)
- SecurityEvent Dataclass
- Fields
- Factory Method
- Syslog Format (RFC 5424)
- Message Structure
- Example Message
- Configuration
- Splunk Integration
- CEF Format (ArcSight)
- CEF Message Structure
- Signature ID Mapping
- CEF Example Message
- Extension Fields
- CEF Configuration
- ArcSight FlexConnector
- LEEF Format (QRadar)
- LEEF Message Structure
- LEEF Example Message
- LEEF Configuration
- QRadar Log Source
- Buffering and Reliability
- Buffer Configuration
- Failure Behavior
- Buffer Statistics
- API Reference
- Creating Events
- SIEMDispatcher API
- Event Examples
- DLP Block Event
- LLM Request Event
- Auth Failure Event
- Security Alert Event
- Monitoring
- Troubleshooting
- Connection Testing
- Diagnostics
- Logging
- Related Documents
Overview¶
CodeGraph supports real-time security event dispatch to SIEM systems. Three formats are supported: Syslog (RFC 5424), CEF (ArcSight), and LEEF (QRadar).
Key Capabilities¶
- 3 output formats: Syslog RFC 5424, CEF, LEEF
- 17 event types: LLM, DLP, Auth, Vault, Rate Limiting, Security (path violation, IDOR, webhook replay, MCP auth)
- 3 transport protocols: UDP, TCP, TLS
- Buffering: up to 10,000 events in queue
- Retry with re-queue: automatic retry on failures
- Graceful degradation: continued operation when SIEM unavailable
Architecture¶
System Components¶
┌─────────────────────────────────────────────────────────────────────┐
│ APPLICATION │
│ │
│ [LLM Provider] ──┐ │
│ │ │
│ [DLP Scanner] ───┼──► [SecurityEvent] ──► [SIEMDispatcher] │
│ │ │ │
│ [Auth Module] ───┘ │ │
│ │ │
│ ┌──────▼──────┐ │
│ │ SIEMBuffer │ │
│ │ (10K queue) │ │
│ └──────┬──────┘ │
│ │ │
│ ┌─────────────────────────────┼─────────────────┐│
│ │ │ ││
│ ▼ ▼ ▼│
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐│
│ │SysLogHandler │ │ CEFHandler │ │ LEEFHandler ││
│ │ (RFC 5424) │ │ (ArcSight) │ │ (QRadar) ││
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘│
└──────────────────┼───────────────────────────┼───────────────────┼────────┘
│ │ │
▼ ▼ ▼
┌────────────┐ ┌────────────┐ ┌────────────┐
│ Splunk │ │ ArcSight │ │ QRadar │
│ Graylog │ │ Splunk │ │ │
│ rsyslog │ │ │ │ │
└────────────┘ └────────────┘ └────────────┘
Event Types¶
SecurityEventType¶
All 17 event types defined in src/security/siem/base_handler.py. String values use lowercase dotted format:
| Event Type | String Value | Description | Severity |
|---|---|---|---|
LLM_REQUEST |
llm.request |
Request to LLM provider | INFO (6) |
LLM_RESPONSE |
llm.response |
Response from LLM | INFO (6) |
LLM_ERROR |
llm.error |
LLM interaction error | ERROR (3) |
DLP_BLOCK |
dlp.block |
Request blocked by DLP | CRITICAL (2) |
DLP_MASK |
dlp.mask |
Data masked | WARNING (4) |
DLP_WARN |
dlp.warn |
DLP warning | WARNING (4) |
DLP_LOG |
dlp.log |
DLP logging | INFO (6) |
AUTH_SUCCESS |
auth.success |
Successful authentication | INFO (6) |
AUTH_FAILURE |
auth.failure |
Failed authentication | WARNING (4) |
VAULT_ACCESS |
vault.access |
Vault secrets access | INFO (6) |
VAULT_ROTATE |
vault.rotate |
Secrets rotation | NOTICE (5) |
RATE_LIMIT |
rate.limit |
Rate limit exceeded | WARNING (4) |
SECURITY_ALERT |
security.alert |
Critical security event | ALERT (1) |
PATH_VIOLATION |
security.path_violation |
Path traversal attempt | WARNING (4) |
IDOR_ATTEMPT |
security.idor_attempt |
IDOR access attempt | WARNING (4) |
WEBHOOK_REPLAY |
security.webhook.replay |
Webhook replay attack | WARNING (4) |
MCP_AUTH_FAILURE |
security.mcp.auth_failure |
MCP authentication failure | WARNING (4) |
Severity Levels (RFC 5424)¶
| Code | Level | Description |
|---|---|---|
| 0 | EMERGENCY | System unusable |
| 1 | ALERT | Immediate action required |
| 2 | CRITICAL | Critical condition |
| 3 | ERROR | Error condition |
| 4 | WARNING | Warning condition |
| 5 | NOTICE | Normal but significant |
| 6 | INFO | Informational message |
| 7 | DEBUG | Debug message |
SecurityEvent Dataclass¶
Fields¶
All 19 fields of the SecurityEvent dataclass (base_handler.py):
| Field | Type | Default | Description |
|---|---|---|---|
event_type |
SecurityEventType |
required | Type of security event |
timestamp |
str |
required | ISO 8601 timestamp |
request_id |
str |
required | Unique request identifier |
message |
str |
required | Human-readable message |
severity |
int |
6 (INFO) |
RFC 5424 severity (0–7) |
user_id |
Optional[str] |
None |
User identifier |
session_id |
Optional[str] |
None |
Session identifier |
ip_address |
Optional[str] |
None |
Client IP address |
user_agent |
Optional[str] |
None |
User-Agent header |
provider |
Optional[str] |
None |
LLM provider name |
model |
Optional[str] |
None |
LLM model name |
action |
Optional[str] |
None |
Action performed |
dlp_category |
Optional[str] |
None |
DLP category |
dlp_pattern |
Optional[str] |
None |
DLP pattern name |
tokens_used |
Optional[int] |
None |
Token count |
latency_ms |
Optional[float] |
None |
Latency in milliseconds |
project_id |
Optional[str] |
None |
Multi-tenant project ID |
group_id |
Optional[str] |
None |
Multi-tenant group ID |
details |
Dict[str, Any] |
{} |
Additional event details |
Factory Method¶
# SecurityEvent.create() auto-generates ISO timestamp
event = SecurityEvent.create(
event_type=SecurityEventType.DLP_BLOCK,
message="AWS access key detected",
request_id="req-12345",
severity=2, # CRITICAL
user_id="user_123",
ip_address="10.0.0.50",
dlp_category="credentials",
dlp_pattern="aws_access_key",
action="BLOCK"
)
# Convert to dict (None fields omitted)
event_dict = event.to_dict()
Syslog Format (RFC 5424)¶
Message Structure¶
<PRI>VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID [STRUCTURED-DATA] MSG
Example Message¶
<134>1 2026-02-26T10:30:00.000Z codegraph-server codegraph 12345 DLP001
[meta@47450 category="credentials" pattern="aws_access_key" action="BLOCK"]
DLP blocked request: AWS access key detected in user prompt
Configuration¶
security:
siem:
enabled: true
syslog:
enabled: true
protocol: udp # udp, tcp, tls
host: "siem.company.com"
port: 514
facility: 16 # LOCAL0 (16-23)
app_name: "codegraph"
hostname: null # Auto-detect
tls: # Only for protocol: tls
ca_cert: "/path/to/ca.crt"
client_cert: "/path/to/client.crt"
client_key: "/path/to/client.key"
verify: true
Splunk Integration¶
# inputs.conf
[udp://514]
sourcetype = syslog
index = security
# props.conf
[syslog]
TIME_FORMAT = %Y-%m-%dT%H:%M:%S.%3N%Z
SHOULD_LINEMERGE = false
CEF Format (ArcSight)¶
CEF Message Structure¶
CEF:Version|Device Vendor|Device Product|Device Version|Signature ID|Name|Severity|Extension
Signature ID Mapping¶
Full mapping from SIGNATURE_MAP (cef_handler.py) and EVENT_ID_MAP (leef_handler.py):
| Event Type | Signature ID | Name |
|---|---|---|
llm.request |
LLM001 | LLM Request |
llm.response |
LLM002 | LLM Response |
llm.error |
LLM003 | LLM Error |
dlp.block |
DLP001 | DLP Block |
dlp.mask |
DLP002 | DLP Mask |
dlp.warn |
DLP003 | DLP Warning |
dlp.log |
DLP004 | DLP Log |
vault.access |
VLT001 | Vault Access |
vault.rotate |
VLT002 | Vault Rotate |
auth.success |
AUTH01 | Auth Success |
auth.failure |
AUTH02 | Auth Failure |
rate.limit |
RATE01 | Rate Limit |
security.alert |
SEC001 | Security Alert |
Event types not in the map (security.path_violation, security.idor_attempt, security.webhook.replay, security.mcp.auth_failure) use fallback ID GEN001.
CEF Example Message¶
CEF:0|CodeGraph|CodeAnalysis|1.0|DLP001|DLP Block|8|
src=10.0.0.50 suser=analyst@company.com externalId=req-12345
msg=AWS access key detected rt=2026-02-26T10:30:00.000Z
cs1=GigaChat cs1Label=LLM Provider
cs4=credentials cs4Label=DLP Category
cs5=aws_access_key cs5Label=DLP Pattern
cn2=125 cn2Label=Latency MS
Extension Fields¶
| Field | CEF Key | Description |
|---|---|---|
| IP Address | src |
Source IP |
| User ID | suser |
User identifier |
| Request ID | externalId |
Request ID |
| Message | msg |
Message text |
| Timestamp | rt |
Event time |
| LLM Provider | cs1 |
LLM provider |
| LLM Model | cs2 |
LLM model |
| Action | cs3 |
Action taken |
| DLP Category | cs4 |
DLP category |
| DLP Pattern | cs5 |
DLP pattern |
| Tokens Used | cn1 |
Tokens used |
| Latency MS | cn2 |
Latency in ms |
CEF Configuration¶
security:
siem:
cef:
enabled: true
host: "arcsight.company.com"
port: 514
protocol: tcp
device_vendor: "CodeGraph"
device_product: "CodeAnalysis"
device_version: "1.0"
ArcSight FlexConnector¶
<!-- connector.parser.xml -->
<parser>
<name>CodeGraph CEF Parser</name>
<pattern>CEF:0|CodeGraph|CodeAnalysis|.*</pattern>
</parser>
LEEF Format (QRadar)¶
LEEF Message Structure¶
LEEF:Version|Vendor|Product|Version|EventID|Extension
LEEF Example Message¶
LEEF:2.0|CodeGraph|CodeAnalysis|1.0|DLP001|
cat=DLP sev=8 src=10.0.0.50 usrName=analyst
msg=AWS access key detected devTime=2026-02-26T10:30:00.000Z
LEEF Configuration¶
security:
siem:
leef:
enabled: true
host: "qradar.company.com"
port: 514
protocol: udp
product_vendor: "CodeGraph"
product_name: "CodeAnalysis"
product_version: "1.0"
QRadar Log Source¶
- Admin → Log Sources → Add
- Vendor:
CodeGraph - Log Source Type:
Universal LEEF - Protocol: Syslog
- Port: 514
Buffering and Reliability¶
Buffer Configuration¶
security:
siem:
buffer:
max_size: 10000 # Maximum events in queue
flush_interval_seconds: 5 # Flush interval
retry_attempts: 3 # Retry count
retry_backoff_seconds: 2.0 # Delay between retries
Failure Behavior¶
- First attempt — immediate send via buffer flush
- Failure — event moved to retry buffer, re-tried on next flush cycle
- Repeated failures — re-queued up to
retry_attemptstimes - After max retries exhausted — event discarded, counted as
failed - Buffer overflow — oldest events auto-dropped (circular buffer), counted as
dropped
Buffer Statistics¶
dispatcher = get_siem_dispatcher()
stats = dispatcher.stats
print(f"Enqueued: {stats['enqueued']}")
print(f"Sent: {stats['sent']}")
print(f"Failed: {stats['failed']}")
print(f"Dropped: {stats['dropped']}")
print(f"Retried: {stats['retried']}")
API Reference¶
Creating Events¶
from src.security.siem.base_handler import SecurityEvent, SecurityEventType
from src.security.siem.dispatcher import dispatch_security_event
# Create event
event = SecurityEvent.create(
event_type=SecurityEventType.DLP_BLOCK,
message="AWS access key detected in user prompt",
request_id="req-12345",
severity=2, # CRITICAL
user_id="user_123",
ip_address="10.0.0.50",
dlp_category="credentials",
dlp_pattern="aws_access_key",
action="BLOCK"
)
# Send to SIEM
success = dispatch_security_event(event)
SIEMDispatcher API¶
from src.security.siem.dispatcher import SIEMDispatcher, init_siem_dispatcher
from src.security.config import get_security_config
# Initialize
config = get_security_config()
dispatcher = init_siem_dispatcher(config.siem)
# Async dispatch (via buffer)
dispatcher.dispatch(event)
# Sync dispatch (bypass buffer, for critical events)
dispatcher.dispatch_sync(event)
# Flush buffer
sent_count = dispatcher.flush()
# Statistics
print(dispatcher.stats)
print(f"Handlers: {dispatcher.handler_count}")
print(f"Enabled: {dispatcher.is_enabled}")
# Close
dispatcher.close()
Event Examples¶
DLP Block Event¶
{
"event_type": "dlp.block",
"timestamp": "2026-02-26T10:30:00.000Z",
"request_id": "req-12345",
"severity": 2,
"message": "DLP blocked request: AWS access key detected",
"user_id": "analyst@company.com",
"ip_address": "10.0.0.50",
"dlp_category": "credentials",
"dlp_pattern": "aws_access_key",
"action": "BLOCK"
}
LLM Request Event¶
{
"event_type": "llm.request",
"timestamp": "2026-02-26T10:30:00.000Z",
"request_id": "req-67890",
"severity": 6,
"message": "LLM request to GigaChat",
"user_id": "analyst@company.com",
"provider": "GigaChat",
"model": "GigaChat-2-Pro",
"tokens_used": 150,
"latency_ms": 1250.5
}
Auth Failure Event¶
{
"event_type": "auth.failure",
"timestamp": "2026-02-26T10:30:00.000Z",
"request_id": "req-11111",
"severity": 4,
"message": "Authentication failed: invalid credentials",
"user_id": "unknown",
"ip_address": "10.0.0.99",
"details": {
"reason": "invalid_password",
"attempts": 3
}
}
Security Alert Event¶
{
"event_type": "security.path_violation",
"timestamp": "2026-02-26T10:31:00.000Z",
"request_id": "req-44444",
"severity": 4,
"message": "Path traversal attempt: ../../etc/passwd",
"user_id": "unknown",
"ip_address": "10.0.0.90",
"details": {
"attempted_path": "../../etc/passwd",
"normalized_path": "/etc/passwd"
}
}
Monitoring¶
SIEM delivery is monitored through the dispatcher’s stats API. There are no dedicated SIEM Prometheus counters — use the buffer statistics and application logs.
To monitor SIEM activity:
- Buffer stats —
dispatcher.statsreturnsenqueued,sent,failed,dropped,retriedcounters - Handler status —
dispatcher.is_enabledanddispatcher.handler_count - Application logs — enable DEBUG logging for
src.security.siemnamespace - CLI —
python -m src.cli siem statusfor dispatcher status
Troubleshooting¶
Connection Testing¶
# Test UDP
echo "<134>1 test message" | nc -u siem.company.com 514
# Test TCP
echo "<134>1 test message" | nc siem.company.com 514
# Test TLS
openssl s_client -connect siem.company.com:6514
Diagnostics¶
# Check dispatcher status
from src.security.siem.dispatcher import get_siem_dispatcher
dispatcher = get_siem_dispatcher()
if dispatcher:
print(f"Enabled: {dispatcher.is_enabled}")
print(f"Handlers: {dispatcher.handler_count}")
print(f"Stats: {dispatcher.stats}")
else:
print("SIEM dispatcher not initialized")
Logging¶
# logging.yaml
loggers:
src.security.siem:
level: DEBUG
handlers: [console, file]
Related Documents¶
- Enterprise Security Brief — Security overview
- DLP Security — DLP integration
- LLM Security — LLM security
Version: 1.2 | March 2026