Table of Contents¶
- Overview
- Features
- 1. LLM Request/Response Logging
- 2. SIEM Integration
- 3. DLP (Data Loss Prevention)
- 4. HashiCorp Vault Integration
- Architecture
- Configuration
- SecurityConfig
- Environment Variables
- Core Components
- SecureLLMProvider
- ContentScanner (DLP)
- SIEMDispatcher
- VaultClient
- Advanced Security Features
- FileSecurityScanner
- TaintVerifiedScanner
- HardeningScanner (D3FEND)
- ReportGenerator
- AutofixEngine
- Security Hypothesis System
- DLP Patterns
- Built-in Pattern Categories
- Custom Patterns
- Database Tables
- llm_audit_log
- dlp_events
- security_events
- SIEM Event Formats
- SysLog (RFC 5424)
- SysLog JSON
- CEF (ArcSight)
- LEEF (QRadar)
- Webhook Integration
- DLPWebhookClient
- DLPAlert
- CLI Usage
- Security Audit CLI
- Audit CLI (with Autofix)
- MCP Tools
- codegraph_autofix
- codegraph_taint_analysis
- Security Module Structure
- Quick Start Guide
- 1. Enable the security module
- 2. Wrap your LLM provider
- 3. Run a file-based security scan
- 4. Run D3FEND hardening checks
- 5. Generate a consolidated report
- 6. Configure SIEM forwarding
- 7. Enable DLP with webhook alerts
- See Also
Overview¶
The CodeGraph Enterprise Security Module provides a comprehensive security layer for LLM-powered code analysis. It protects against data leakage, logs all LLM interactions for audit compliance, integrates with enterprise SIEM systems, and manages secrets through HashiCorp Vault.
The module operates as an interceptor layer around the LLM provider, scanning all prompts and responses for sensitive data before they leave or enter the system.
graph LR
A[User Query] --> B[SecureLLMProvider]
B --> C{DLP Pre-Request Scan}
C -->|BLOCK| D[DLPBlockedException]
C -->|MASK| E[Masked Prompt]
C -->|PASS| F[Original Prompt]
E --> G[LLM Provider]
F --> G
G --> H{DLP Post-Response Scan}
H -->|MASK| I[Masked Response]
H -->|PASS| J[Original Response]
I --> K[User]
J --> K
B --> L[LLMSecurityLogger]
L --> M[(llm_audit_log DB)]
L --> N[SIEMDispatcher]
N --> O[SysLog]
N --> P[CEF / ArcSight]
N --> Q[LEEF / QRadar]
C -->|BLOCK/WARN| R[DLPWebhookClient]
The module supports:
- Pre-request DLP scanning of prompts before they are sent to the LLM
- Post-response DLP scanning of LLM responses before they are returned
- Audit logging of every LLM interaction to a database and/or SIEM
- Secret management via HashiCorp Vault with automatic credential rotation
- D3FEND hardening checks for source code analysis
- Taint-verified vulnerability scanning to reduce false positives
- Multi-format report generation (JSON, Markdown, SARIF)
- Automated fix suggestions via template and LLM-based generation
Features¶
1. LLM Request/Response Logging¶
Every LLM interaction is logged with full metadata:
- System prompt hash and length (prompts can be redacted before logging)
- User prompt preview with configurable length limits
- Response preview and length
- Token usage (prompt, completion, total)
- Latency in milliseconds
- DLP action taken and match count
- User, session, and IP context
Logged to the llm_audit_log database table and optionally forwarded to SIEM.
2. SIEM Integration¶
Real-time security event dispatch to enterprise SIEM systems via three formats:
| Format | Target System | Standard |
|---|---|---|
| SysLog | Splunk, Graylog, rsyslog | RFC 5424 |
| CEF | ArcSight, Micro Focus | Common Event Format |
| LEEF | IBM QRadar | Log Event Extended Format 2.0 |
All formats support UDP, TCP, and TLS transport. Events are buffered with configurable retry and backoff.
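To give a feel for what these wire formats look like, here is a minimal sketch of how a CEF line is assembled. This is an illustration only, not the actual `CEFHandler` implementation; the field values are hypothetical.

```python
# Sketch of a CEF (Common Event Format) line, as consumed by ArcSight.
# Header layout: CEF:Version|Vendor|Product|DeviceVersion|SignatureID|Name|Severity|Extension
def to_cef(vendor: str, product: str, version: str,
           signature_id: str, name: str, severity: int,
           extensions: dict) -> str:
    def esc(value: str) -> str:
        # Backslashes and pipes must be escaped inside CEF header fields
        return value.replace("\\", "\\\\").replace("|", "\\|")

    header = "|".join([
        "CEF:0", esc(vendor), esc(product), esc(version),
        esc(signature_id), esc(name), str(severity),
    ])
    ext = " ".join(f"{k}={v}" for k, v in extensions.items())
    return f"{header}|{ext}"

line = to_cef("CodeGraph", "CodeAnalysis", "1.0",
              "dlp.block", "Credential detected in prompt", 8,
              {"suser": "user@company.com", "cat": "credentials"})
```

The device vendor/product/version values correspond to the `CEFConfig` defaults shown later in this page.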
3. DLP (Data Loss Prevention)¶
Content scanning with four action levels:
| Action | Behavior |
|---|---|
| `BLOCK` | Reject the request entirely and raise `DLPBlockedException` |
| `MASK` | Replace sensitive data with placeholders (`[REDACTED]`, `[EMAIL]`, etc.) |
| `WARN` | Allow the request, but log a warning to SIEM |
| `LOG_ONLY` | Log the match silently |
Built-in pattern categories: credentials, PII, source code artifacts. Custom patterns are supported via regex and keyword lists.
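To illustrate how regex categories map onto actions, the sketch below pairs two of the built-in patterns documented later in this page with their category actions. It is a simplification of the real `ContentScanner`, not its implementation.

```python
import re

# Hypothetical simplification: each category pairs one regex with an action.
# The real module supports many patterns per category plus keyword lists.
PATTERNS = {
    "credentials": ("BLOCK", re.compile(r"AKIA[0-9A-Z]{16}")),  # aws_access_key
    "pii": ("MASK", re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")),  # email
}

def detect(content: str):
    """Return (category, action) for every category whose pattern matches."""
    hits = []
    for category, (action, regex) in PATTERNS.items():
        if regex.search(content):
            hits.append((category, action))
    return hits

hits = detect("contact bob@example.com, key AKIAIOSFODNN7EXAMPLE")
```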
4. HashiCorp Vault Integration¶
Secure secret management with:
- Token, AppRole, and Kubernetes authentication methods
- KV v2 secret engine support
- Automatic token renewal
- Configurable cache TTL (default 300s)
- Access audit logging to SIEM
- Graceful fallback to environment variables when Vault is unavailable
Architecture¶
graph TB
subgraph "Security Layer"
SLP[SecureLLMProvider]
CS[ContentScanner]
SIEM[SIEMDispatcher]
VC[VaultClient]
LSL[LLMSecurityLogger]
end
subgraph "SIEM Handlers"
SH[SysLogHandler]
SJH[SysLogJSONHandler]
CH[CEFHandler]
LH[LEEFHandler]
end
subgraph "Advanced Security"
FSS[FileSecurityScanner]
TVS[TaintVerifiedScanner]
HS[HardeningScanner]
RG[ReportGenerator]
AE[AutofixEngine]
end
subgraph "Storage"
DB[(llm_audit_log)]
DLPDB[(dlp_events)]
SEDB[(security_events)]
end
SLP --> CS
SLP --> LSL
LSL --> DB
LSL --> SIEM
CS --> SIEM
SIEM --> SH
SIEM --> SJH
SIEM --> CH
SIEM --> LH
SLP --> VC
FSS --> RG
TVS --> RG
HS --> RG
AE --> TVS
SIEM --> SEDB
CS --> DLPDB
The security module is configured through a single SecurityConfig Pydantic model, loaded from the security: section of config.yaml via get_security_config().
Configuration¶
SecurityConfig¶
Module: src/security/config.py
The root configuration model for the entire security module:
class SecurityConfig(BaseModel):
enabled: bool = False
llm_logging: LLMLoggingConfig
siem: SIEMConfig
dlp: DLPConfig
vault: VaultConfig
Access the singleton instance:
from src.security.config import get_security_config
config = get_security_config()
The configuration is loaded from config.yaml and supports environment variable interpolation using ${VAR_NAME:-default} syntax.
LLMLoggingConfig¶
class LLMLoggingConfig(BaseModel):
enabled: bool = True
log_prompts: bool = True
redact_prompts: bool = True
max_prompt_length: int = 2000
log_responses: bool = True
max_response_length: int = 5000
log_token_usage: bool = True
log_latency: bool = True
log_to_database: bool = True
log_to_siem: bool = True
SIEMConfig¶
class SIEMConfig(BaseModel):
enabled: bool = True
syslog: SysLogConfig # RFC 5424, UDP/TCP/TLS
cef: CEFConfig # ArcSight
leef: LEEFConfig # QRadar
buffer: SIEMBufferConfig # Retry and buffering
Sub-configurations:
class SysLogConfig(BaseModel):
enabled: bool = True
protocol: SIEMProtocol = SIEMProtocol.UDP # udp, tcp, tls
host: str = "localhost"
port: int = 514
facility: int = 16 # LOCAL0
app_name: str = "codegraph"
hostname: Optional[str] = None # Auto-detected
tls: Optional[TLSConfig] = None
class CEFConfig(BaseModel):
enabled: bool = False
host: str = ""
port: int = 514
protocol: SIEMProtocol = SIEMProtocol.UDP
device_vendor: str = "CodeGraph"
device_product: str = "CodeAnalysis"
device_version: str = "1.0"
class LEEFConfig(BaseModel):
enabled: bool = False
host: str = ""
port: int = 514
protocol: SIEMProtocol = SIEMProtocol.UDP
product_vendor: str = "CodeGraph"
product_name: str = "CodeAnalysis"
product_version: str = "1.0"
class SIEMBufferConfig(BaseModel):
max_size: int = 10000
flush_interval_seconds: int = 5
retry_attempts: int = 3
retry_backoff_seconds: float = 2.0
DLPConfig¶
class DLPConfig(BaseModel):
enabled: bool = True
pre_request: DLPPreRequestConfig # default_action = WARN
post_response: DLPPostResponseConfig # default_action = MASK
categories: Dict[str, DLPCategoryConfig]
keywords: Dict[str, DLPKeywordListConfig]
keywords_action: DLPAction = DLPAction.LOG_ONLY
webhook: DLPWebhookConfig
VaultConfig¶
class VaultConfig(BaseModel):
enabled: bool = False
url: str = "http://localhost:8200"
auth_method: str = "token" # token, approle, kubernetes
namespace: Optional[str] = None
token: VaultTokenAuthConfig
approle: VaultAppRoleAuthConfig
kubernetes: VaultKubernetesAuthConfig
secrets: Dict[str, VaultSecretPathConfig]
secrets_mount_point: str = "secret"
llm_secrets_path: str = "codegraph/llm"
cache_ttl_seconds: int = 300
timeout_seconds: int = 30
tls_verify: bool = True
rotation_enabled: bool = False
rotation_check_interval: int = 300
audit_access: bool = True
Full config.yaml example¶
security:
enabled: true
llm_logging:
enabled: true
log_prompts: true
redact_prompts: true
max_prompt_length: 2000
log_responses: true
max_response_length: 5000
log_token_usage: true
log_latency: true
log_to_database: true
log_to_siem: true
siem:
enabled: true
syslog:
enabled: true
protocol: udp
host: siem.company.local
port: 514
facility: 16
app_name: codegraph
cef:
enabled: false
host: arcsight.company.local
port: 514
leef:
enabled: false
host: qradar.company.local
port: 514
buffer:
max_size: 10000
flush_interval_seconds: 5
retry_attempts: 3
dlp:
enabled: true
pre_request:
enabled: true
default_action: WARN
post_response:
enabled: true
default_action: MASK
categories:
credentials:
enabled: true
action: BLOCK
patterns:
- name: api_key_generic
regex: '(?i)(api[_-]?key|apikey)["\s:=]+["\']?([a-zA-Z0-9_\-]{20,})["\']?'
- name: aws_access_key
regex: 'AKIA[0-9A-Z]{16}'
- name: private_key
regex: '-----BEGIN (RSA |EC |OPENSSH |DSA )?PRIVATE KEY-----'
pii:
enabled: true
action: MASK
patterns:
- name: email
regex: '[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
mask_with: '[EMAIL]'
- name: credit_card
regex: '\b(?:\d{4}[\s-]?){3}\d{4}\b'
mask_with: '[CARD]'
keywords:
sensitive_terms:
words: ["classified", "top secret"]
case_sensitive: false
keywords_action: LOG_ONLY
webhook:
enabled: false
endpoint: https://dlp.company.local/alerts
auth_header: "Bearer ${DLP_WEBHOOK_TOKEN}"
timeout_seconds: 10
retry_attempts: 3
notify_on: [BLOCK, WARN]
vault:
enabled: false
url: https://vault.company.local:8200
auth_method: approle
namespace: codegraph
approle:
role_id: "${VAULT_ROLE_ID}"
secret_id: "${VAULT_SECRET_ID}"
secrets_mount_point: secret
llm_secrets_path: codegraph/llm
cache_ttl_seconds: 300
tls_verify: true
rotation_enabled: true
rotation_check_interval: 300
audit_access: true
Environment Variables¶
Only one dedicated environment variable controls the security module at runtime:
| Variable | Default | Description |
|---|---|---|
| `DLP_ENABLED` | not set | Set in `src/api/main.py` to enable/disable DLP at the API level |
All other security settings are configured through the security: section of config.yaml and loaded via get_security_config(). The configuration supports ${VAR_NAME:-default} syntax for referencing environment variables within YAML values (e.g., Vault credentials, webhook tokens).
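The `${VAR_NAME:-default}` expansion can be pictured with a small sketch. This is not the actual loader code, just the general technique; the real implementation may differ in details such as nesting or escaping.

```python
import os
import re

# Sketch of ${VAR_NAME:-default} interpolation in YAML string values.
_VAR = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")

def interpolate(value: str, env=os.environ) -> str:
    def repl(m: re.Match) -> str:
        name, default = m.group(1), m.group(2)
        # Use the environment value if present, else the inline default (or "")
        return env.get(name, default if default is not None else "")
    return _VAR.sub(repl, value)

resolved = interpolate("Bearer ${DLP_WEBHOOK_TOKEN:-missing}", env={})
```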
Core Components¶
SecureLLMProvider¶
Module: src/security/llm/secure_provider.py
Wraps any LLM provider to add DLP scanning, audit logging, and SIEM dispatch.
class SecureLLMProvider:
def __init__(self, wrapped_provider, config: Optional[SecurityConfig] = None): ...
def generate(self, system_prompt, user_prompt, **kwargs): ...
def stream(self, system_prompt, user_prompt, **kwargs): ...
Usage:
from src.security.llm.secure_provider import SecureLLMProvider
from src.security.config import get_security_config
# Wrap an existing LLM provider
secure = SecureLLMProvider(base_provider, get_security_config())
# generate() performs: DLP pre-scan -> LLM call -> DLP post-scan -> log
response = secure.generate(
system_prompt="You are a code reviewer.",
user_prompt="Review this function..."
)
The provider intercepts both generate() and stream() calls. For each request:
1. Scans the user prompt with `ContentScanner.scan_request()`
2. If the action is `BLOCK`, raises `DLPBlockedException`
3. If the action is `MASK`, sends the masked content to the LLM
4. Scans the LLM response with `ContentScanner.scan_response()`
5. Logs the full interaction via `LLMSecurityLogger`
6. Dispatches a `SecurityEvent` to SIEM
ContentScanner (DLP)¶
Module: src/security/dlp/scanner.py
Scans text content for sensitive data using regex patterns and keyword lists.
class ContentScanner:
def __init__(self, config: DLPConfig): ...
def scan(self, content: str) -> List[DLPMatch]: ...
def scan_request(self, content: str) -> ScanResult: ...
def scan_response(self, content: str) -> ScanResult: ...
def get_action(self, matches: List[DLPMatch]) -> DLPAction: ...
def mask(self, content: str, matches: List[DLPMatch]) -> str: ...
Methods:
| Method | Description |
|---|---|
| `scan(content)` | Returns raw list of `DLPMatch` objects |
| `scan_request(content)` | Full pre-request scan with action resolution |
| `scan_response(content)` | Full post-response scan with masking |
| `get_action(matches)` | Determines highest-priority action from matches |
| `mask(content, matches)` | Replaces matched text with mask placeholders |
Usage:
from src.security.dlp.scanner import ContentScanner
from src.security.config import get_security_config
scanner = ContentScanner(get_security_config().dlp)
# Pre-request scan
result = scanner.scan_request(user_prompt)
if result.blocked:
raise DLPBlockedException(result.matches)
elif result.action == DLPAction.MASK:
user_prompt = result.modified_content
# Post-response scan
result = scanner.scan_response(llm_response)
if result.has_matches:
llm_response = result.modified_content
Action priority (highest wins when multiple patterns match):
BLOCK (4) > MASK (3) > WARN (2) > LOG_ONLY (1)
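The priority resolution above can be sketched in a few lines. This mirrors the documented ordering; `resolve_action` is an illustrative helper, not the actual `get_action()` implementation.

```python
# Sketch of highest-priority action resolution, per the documented ordering:
# BLOCK (4) > MASK (3) > WARN (2) > LOG_ONLY (1)
PRIORITY = {"BLOCK": 4, "MASK": 3, "WARN": 2, "LOG_ONLY": 1}

def resolve_action(actions):
    """Return the highest-priority action, or None when nothing matched."""
    if not actions:
        return None
    return max(actions, key=PRIORITY.__getitem__)

winner = resolve_action(["LOG_ONLY", "MASK", "WARN"])
```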
ScanResult¶
Module: src/security/dlp/scanner.py
@dataclass
class ScanResult:
has_matches: bool
matches: List[DLPMatch]
action: DLPAction
modified_content: Optional[str]
blocked: bool
DLPMatch¶
Module: src/security/dlp/patterns.py
@dataclass
class DLPMatch:
category: str
pattern_name: str
match_type: MatchType # REGEX or KEYWORD
matched_text: str
start: int
end: int
action: DLPAction
mask_with: str = "[REDACTED]"
severity: str = "medium"
The MatchType enum has two values: REGEX and KEYWORD.
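Because each `DLPMatch` carries `start`/`end` offsets, masking can be applied safely by replacing matches from the end of the string backwards, so earlier offsets stay valid. A sketch of that technique (the real `mask()` implementation may differ):

```python
# Sketch: apply masks right-to-left so earlier (start, end) offsets stay valid.
def mask(content: str, matches):
    """matches: iterable of (start, end, mask_with) tuples."""
    for start, end, mask_with in sorted(matches, key=lambda m: m[0], reverse=True):
        content = content[:start] + mask_with + content[end:]
    return content

text = "mail bob@example.com card 4111 1111 1111 1111"
masked = mask(text, [(5, 20, "[EMAIL]"), (26, 45, "[CARD]")])
```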
DLPBlockedException¶
Module: src/security/dlp/scanner.py (re-exported from src/security/dlp/__init__.py)
class DLPBlockedException(Exception):
def __init__(self, matches: List[DLPMatch], message: Optional[str] = None): ...
Raised when a DLP scan returns a BLOCK action. The matches attribute contains the list of patterns that triggered the block.
SIEMDispatcher¶
Module: src/security/siem/dispatcher.py
Routes security events to multiple SIEM handlers simultaneously. Thread-safe with buffered delivery.
class SIEMDispatcher:
def __init__(self, config: SIEMConfig): ...
def dispatch(self, event: SecurityEvent) -> bool: ...
def add_handler(self, handler: BaseSIEMHandler) -> None: ...
def close(self) -> None: ...
Factory functions:
def init_siem_dispatcher(config: SIEMConfig) -> SIEMDispatcher
def get_siem_dispatcher() -> SIEMDispatcher
Usage:
from src.security.siem.dispatcher import get_siem_dispatcher
from src.security.siem.base_handler import SecurityEvent, SecurityEventType
dispatcher = get_siem_dispatcher()
event = SecurityEvent.create(
event_type=SecurityEventType.DLP_BLOCK,
message="Credential detected in prompt",
request_id="req-abc-123",
severity=3,
user_id="user@company.com",
dlp_category="credentials",
dlp_pattern="aws_access_key",
)
dispatcher.dispatch(event)
The dispatcher automatically initializes handlers based on SIEMConfig:
- SysLogHandler if syslog.enabled and syslog.host are set
- CEFHandler if cef.enabled and cef.host are set
- LEEFHandler if leef.enabled and leef.host are set
Events are buffered via SIEMBuffer with configurable retry and exponential backoff.
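The retry-with-backoff behavior can be sketched as follows. This is a simplified stand-in for `SIEMBuffer`, using the `SIEMBufferConfig` defaults (`retry_attempts=3`, `retry_backoff_seconds=2.0`); the waits are collected rather than slept to keep the sketch testable.

```python
# Sketch of delivery with exponential backoff (waits collected, not slept).
def send_with_retry(send, event, retry_attempts=3, backoff_seconds=2.0):
    waits = []
    for attempt in range(retry_attempts):
        try:
            send(event)
            return True, waits
        except ConnectionError:
            # Back off exponentially: 2s, 4s, 8s with the defaults
            waits.append(backoff_seconds * (2 ** attempt))
    return False, waits

def always_down(event):
    raise ConnectionError("SIEM unreachable")

ok, waits = send_with_retry(always_down, {"type": "dlp.block"})
```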
SecurityEvent¶
Module: src/security/siem/base_handler.py
@dataclass
class SecurityEvent:
event_type: SecurityEventType
timestamp: str
request_id: str
message: str
severity: int = 6 # INFO (RFC 5424)
user_id: Optional[str] = None
session_id: Optional[str] = None
ip_address: Optional[str] = None
user_agent: Optional[str] = None
provider: Optional[str] = None
model: Optional[str] = None
action: Optional[str] = None
dlp_category: Optional[str] = None
dlp_pattern: Optional[str] = None
tokens_used: Optional[int] = None
latency_ms: Optional[float] = None
project_id: Optional[str] = None
group_id: Optional[str] = None
details: Dict[str, Any] = field(default_factory=dict)
Class methods:
| Method | Description |
|---|---|
| `SecurityEvent.create(event_type, message, request_id, severity=6, **kwargs)` | Creates event with auto-generated UTC timestamp |
| `to_dict()` | Converts event to dictionary for serialization |
Severity levels follow RFC 5424 (0 = Emergency, 7 = Debug).
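RFC 5424 encodes facility and severity together in the syslog PRI field (`PRI = facility * 8 + severity`); with the default facility 16 (LOCAL0), an INFO event (severity 6) yields PRI 134. A quick sketch:

```python
# RFC 5424: PRI = facility * 8 + severity; the header begins "<PRI>1 TIMESTAMP ..."
def syslog_pri(facility: int, severity: int) -> int:
    assert 0 <= severity <= 7, "RFC 5424 severity is 0 (Emergency) .. 7 (Debug)"
    return facility * 8 + severity

pri = syslog_pri(16, 6)  # LOCAL0 + INFO
```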
SecurityEventType¶
Module: src/security/siem/base_handler.py
class SecurityEventType(str, Enum):
LLM_REQUEST = "llm.request"
LLM_RESPONSE = "llm.response"
LLM_ERROR = "llm.error"
DLP_BLOCK = "dlp.block"
DLP_MASK = "dlp.mask"
DLP_WARN = "dlp.warn"
DLP_LOG = "dlp.log"
VAULT_ACCESS = "vault.access"
VAULT_ROTATE = "vault.rotate"
AUTH_SUCCESS = "auth.success"
AUTH_FAILURE = "auth.failure"
RATE_LIMIT = "rate.limit"
SECURITY_ALERT = "security.alert"
PATH_VIOLATION = "security.path_violation"
IDOR_ATTEMPT = "security.idor_attempt"
WEBHOOK_REPLAY = "security.webhook.replay"
MCP_AUTH_FAILURE = "security.mcp.auth_failure"
VaultClient¶
Module: src/security/vault/client.py
Provides access to secrets stored in HashiCorp Vault. Falls back to environment variables when Vault is unavailable.
class VaultClient:
def __init__(self, config: VaultConfig): ...
def read_secret(self, path, version=None, mount_point=None) -> Dict: ...
def write_secret(self, path, data, mount_point=None) -> Dict: ...
def delete_secret(self, path, versions=None, mount_point=None): ...
def get_llm_credentials(self) -> Dict[str, str]: ...
def renew_token(self) -> bool: ...
def is_authenticated(self) -> bool: ...
def close(self): ...
Usage:
from src.security.vault.client import VaultClient
from src.security.config import get_security_config
vault = VaultClient(get_security_config().vault)
# Read LLM provider credentials
creds = vault.get_llm_credentials()
api_key = creds.get("api_key")
# Read arbitrary secret
db_config = vault.read_secret("codegraph/database")
# Token management
if vault.is_authenticated():
vault.renew_token()
vault.close()
Requires the hvac library (pip install hvac). Raises VaultError on failures.
Authentication methods:
| Method | Config Key | Description |
|---|---|---|
| Token | `vault.token.value` | Direct token authentication |
| AppRole | `vault.approle.role_id` / `vault.approle.secret_id` | Machine-to-machine auth |
| Kubernetes | `vault.kubernetes.role` / `vault.kubernetes.jwt_path` | In-cluster auth via service account |
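The "graceful fallback to environment variables" behavior can be pictured with the sketch below. It is not the actual `VaultClient` internals: `read_from_vault` stands in for `get_llm_credentials()`, and the `CODEGRAPH_LLM_API_KEY` variable name is hypothetical.

```python
import os

def get_llm_api_key(read_from_vault, env=os.environ):
    """Try Vault first; on any failure, fall back to an environment variable."""
    try:
        creds = read_from_vault()
        if creds.get("api_key"):
            return creds["api_key"], "vault"
    except Exception:
        pass  # Vault down, sealed, or unauthenticated: degrade gracefully
    # Hypothetical variable name, for illustration only
    return env.get("CODEGRAPH_LLM_API_KEY"), "env"

def vault_down():
    raise ConnectionError("vault sealed")

key, source = get_llm_api_key(vault_down, env={"CODEGRAPH_LLM_API_KEY": "sk-test"})
```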
Advanced Security Features¶
FileSecurityScanner¶
Module: src/security/file_scanner.py
Direct file-based scanning for security vulnerabilities without requiring CPG generation. Uses regex patterns to detect issues in source files.
class FileSecurityScanner:
def __init__(self, patterns=None, exclude_dirs=None, exclude_files=None): ...
def scan_project(self, project_path: str) -> ScanResult: ...
def scan_django_settings(self, settings_path: str) -> List[FileFinding]: ...
def scan_for_secrets(self, project_path: str) -> List[FileFinding]: ...
Note: The main scan method is scan_project(), not scan().
Usage:
from src.security.file_scanner import FileSecurityScanner
scanner = FileSecurityScanner(
exclude_dirs=["node_modules", "venv", ".git"]
)
result = scanner.scan_project("/path/to/project")
print(f"Files scanned: {result.files_scanned}")
print(f"Critical issues: {result.critical_count}")
print(f"High issues: {result.high_count}")
for finding in result.findings:
print(f" {finding.severity.value}: {finding.pattern_name} in {finding.file_path}:{finding.line_number}")
The ScanResult returned by scan_project() is defined in src/security/file_scanner.py (different from the DLP ScanResult):
@dataclass
class ScanResult:
project_path: str
scan_time: datetime
duration_seconds: float
files_scanned: int
findings: List[FileFinding]
errors: List[str]
@property
def critical_count(self) -> int: ...
@property
def high_count(self) -> int: ...
The FileFinding dataclass:
@dataclass
class FileFinding:
pattern_id: str
pattern_name: str
severity: VulnerabilitySeverity
category: VulnerabilityCategory
file_path: str
line_number: int
line_content: str
match_text: str
description: str
cwe_ids: List[str]
remediation: str
confidence: float = 0.8
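The core scanning loop amounts to walking files and applying line-based regexes. The sketch below conveys the idea; the real `FileSecurityScanner` additionally carries severities, CWE IDs, exclusion lists, and error handling.

```python
import re
import tempfile
from pathlib import Path

# One illustrative pattern; the real scanner ships a full pattern set.
PATTERNS = {"aws_access_key": re.compile(r"AKIA[0-9A-Z]{16}")}

def scan_file(path: Path):
    """Apply every pattern to every line; collect simplified findings."""
    findings = []
    for lineno, line in enumerate(path.read_text().splitlines(), start=1):
        for name, regex in PATTERNS.items():
            m = regex.search(line)
            if m:
                findings.append({"pattern_name": name, "line_number": lineno,
                                 "match_text": m.group(0)})
    return findings

with tempfile.TemporaryDirectory() as d:
    f = Path(d) / "settings.py"
    f.write_text("DEBUG = True\nAWS_KEY = 'AKIAIOSFODNN7EXAMPLE'\n")
    findings = scan_file(f)
```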
TaintVerifiedScanner¶
Module: src/security/taint_verified_scanner.py
Verifies potential vulnerabilities through data-flow analysis. Uses DataFlowTracer to confirm whether user input actually reaches dangerous sinks, reducing false positives.
class TaintVerifiedScanner:
def __init__(self, cpg_service): ...
def verify_sql_injection(
self,
findings,
source_functions=None,
sink_functions=None,
max_depth=None,
) -> List[VerifiedFinding]: ...
def scan_sql_injection_verified(self, limit=50) -> List[VerifiedFinding]: ...
Note: The constructor accepts cpg_service (a CPG query service instance), not a raw database path.
Usage:
from src.security.taint_verified_scanner import TaintVerifiedScanner
scanner = TaintVerifiedScanner(cpg_service)
# Scan with taint verification
verified = scanner.scan_sql_injection_verified(limit=50)
for finding in verified:
if finding.is_verified:
print(f"CONFIRMED: {finding.original_finding}")
print(f" Taint path: {finding.taint_path}")
else:
print(f" Sanitized (confidence: {finding.sanitization_confidence})")
The VerifiedFinding dataclass:
@dataclass
class VerifiedFinding:
original_finding: Dict[str, Any]
is_verified: bool
taint_path: Optional[DataFlowPath] = None
sanitization_confidence: float = 0.0
verification_notes: List[str] = field(default_factory=list)
HardeningScanner (D3FEND)¶
Module: src/security/hardening/hardening_scanner.py
Implements MITRE D3FEND Source Code Hardening checks. Verifies that defensive coding practices are followed, unlike vulnerability scanners that look for exploitable flaws.
class HardeningScanner:
def __init__(self, cpg_service: Any, language: str = "c"): ...
def scan_all(self, limit_per_check: int = 50) -> List[HardeningFinding]: ...
def scan_by_d3fend_id(self, d3fend_ids: List[str], limit: int = 50) -> List[HardeningFinding]: ...
def scan_by_category(self, category: HardeningCategory, limit: int = 50) -> List[HardeningFinding]: ...
def scan_by_severity(self, min_severity: HardeningSeverity, limit: int = 50) -> List[HardeningFinding]: ...
def get_compliance_score(self, findings: List[HardeningFinding]) -> Dict[str, Any]: ...
def get_checks_summary(self) -> Dict[str, Any]: ...
def get_remediation_report(self, findings: List[HardeningFinding]) -> str: ...
Usage:
from src.security.hardening.hardening_scanner import HardeningScanner
from src.security.hardening.base import HardeningCategory, HardeningSeverity
scanner = HardeningScanner(cpg_service, language="c")
# Run all checks
findings = scanner.scan_all(limit_per_check=50)
# Filter by D3FEND technique
init_findings = scanner.scan_by_d3fend_id(["D3-VI", "D3-RN"])
# Filter by category
pointer_findings = scanner.scan_by_category(HardeningCategory.POINTER_SAFETY)
# Filter by minimum severity
critical_findings = scanner.scan_by_severity(HardeningSeverity.HIGH)
# Get compliance score
score = scanner.get_compliance_score(findings)
print(f"Compliance: {score}")
# Get remediation report (Markdown)
report = scanner.get_remediation_report(findings)
Supported D3FEND techniques:
| ID | Technique | Category |
|---|---|---|
| D3-VI | Variable Initialization | INITIALIZATION |
| D3-CS | Credential Scrubbing | CREDENTIAL_MANAGEMENT |
| D3-IRV | Integer Range Validation | INTEGER_SAFETY |
| D3-PV | Pointer Validation | POINTER_SAFETY |
| D3-RN | Reference Nullification | MEMORY_SAFETY |
| D3-TL | Trusted Library | LIBRARY_SAFETY |
| D3-VTV | Variable Type Validation | TYPE_SAFETY |
| D3-MBSV | Memory Block Start Validation | POINTER_SAFETY |
| D3-NPC | Null Pointer Checking | POINTER_SAFETY |
| D3-DLV | Domain Logic Validation | DOMAIN_VALIDATION |
| D3-OLV | Operational Logic Validation | OPERATIONAL_VALIDATION |
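`get_compliance_score()` condenses findings into a single number. The exact formula is internal to the scanner; a severity-weighted scheme like the following conveys the idea (the weights and 0-100 scale are assumptions, not the documented formula):

```python
# Hypothetical severity weights; the real scoring formula may differ.
WEIGHTS = {"critical": 10, "high": 5, "medium": 2, "low": 1, "info": 0}

def compliance_score(findings, checks_run: int, max_penalty: float = 100.0):
    """Map weighted findings onto a 0-100 scale (100 = no issues found)."""
    penalty = sum(WEIGHTS.get(f["severity"], 0) for f in findings)
    score = max(0.0, 100.0 - min(penalty, max_penalty))
    return {"score": score, "findings": len(findings), "checks_run": checks_run}

result = compliance_score(
    [{"severity": "critical"}, {"severity": "medium"}], checks_run=11)
```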
HardeningCheck¶
Module: src/security/hardening/base.py
Definition of a D3FEND hardening check:
@dataclass
class HardeningCheck:
id: str
d3fend_id: str
d3fend_name: str
category: HardeningCategory
severity: HardeningSeverity
description: str
cpgql_query: str
cwe_ids: List[str]
language_scope: List[str]
indicators: List[str]
good_patterns: List[str]
remediation: str
example_code: str
confidence_weight: float
Note: The SQL query field is named cpgql_query, not sql_query.
HardeningFinding¶
Module: src/security/hardening/base.py
Result from running a hardening check:
@dataclass
class HardeningFinding:
finding_id: str
check_id: str
d3fend_id: str
category: str
severity: str
method_name: str
filename: str
line_number: int
code_snippet: str
description: str
cwe_ids: List[str] = field(default_factory=list)
remediation: str = ""
confidence: float = 0.0
metadata: Dict[str, Any] = field(default_factory=dict)
HardeningCategory¶
Module: src/security/hardening/base.py
class HardeningCategory(Enum):
INITIALIZATION = "initialization"
CREDENTIAL_MANAGEMENT = "credential_mgmt"
INTEGER_SAFETY = "integer_safety"
POINTER_SAFETY = "pointer_safety"
MEMORY_SAFETY = "memory_safety"
LIBRARY_SAFETY = "library_safety"
TYPE_SAFETY = "type_safety"
DOMAIN_VALIDATION = "domain_validation"
OPERATIONAL_VALIDATION = "operational"
HardeningSeverity¶
Module: src/security/hardening/base.py
class HardeningSeverity(Enum):
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
INFO = "info"
ReportGenerator¶
Module: src/security/report_generator.py
Generates consolidated security audit reports in multiple formats. The class name is ReportGenerator (not SecurityReportGenerator).
class ReportGenerator:
def __init__(self): ...
def create_report(self, project_name, project_path, scan_result=None) -> SecurityAuditReport: ...
def add_cpg_findings(self, findings): ...
def add_dlp_findings(self, findings): ...
def add_hardening_findings(self, findings): ...
def add_taint_paths(self, taint_paths): ...
def save_report(
self,
output_dir,
formats=None,
base_filename="security_audit",
language="en",
) -> Dict[str, str]: ...
Usage:
from src.security.report_generator import ReportGenerator
generator = ReportGenerator()
# Create base report from file scan
report = generator.create_report(
project_name="my-app",
project_path="/path/to/project",
scan_result=file_scan_result,
)
# Add findings from other sources
generator.add_cpg_findings(cpg_findings)
generator.add_dlp_findings(dlp_findings)
generator.add_hardening_findings(hardening_findings)
generator.add_taint_paths(taint_paths)
# Save in multiple formats
files = generator.save_report(
output_dir="./reports",
formats=["json", "markdown", "sarif"],
language="en",
)
# Returns: {"json": "/path/to/report.json", "markdown": "/path/to/report.md", ...}
SecurityAuditReport¶
Module: src/security/report_generator.py
The consolidated report dataclass:
@dataclass
class SecurityAuditReport:
project_name: str
project_path: str
audit_time: datetime
duration_seconds: float
file_findings: List[FileFinding]
cpg_findings: List[Dict[str, Any]]
dlp_findings: List[Dict[str, Any]]
hardening_findings: List[Dict[str, Any]]
taint_paths: List[Any]
files_scanned: int = 0
patterns_checked: int = 0
errors: List[str] = field(default_factory=list)
Output methods:
| Method | Description |
|---|---|
| `to_json()` | JSON report with metadata, summary, and all findings |
| `to_markdown(language="en")` | Localized Markdown report with severity tables and D3FEND section |
| `to_sarif()` | SARIF 2.1.0 format for GitHub Security Alerts |
Properties:
| Property | Description |
|---|---|
| `all_findings` | All findings except hardening, enriched with OWASP categories |
| `all_findings_including_hardening` | All findings including D3FEND hardening |
| `severity_counts` | Dict of severity to count |
| `critical_count` | Number of critical findings |
| `high_count` | Number of high findings |
| `total_findings` | Total count (excluding hardening) |
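For reference, the shape of a minimal SARIF 2.1.0 document like the one `to_sarif()` produces looks roughly like this. The rule and result fields shown are illustrative; the actual output carries richer rule metadata.

```python
import json

# Minimal SARIF 2.1.0 skeleton: one run, one tool driver, a list of results.
def to_sarif(findings):
    return {
        "$schema": "https://json.schemastore.org/sarif-2.1.0.json",
        "version": "2.1.0",
        "runs": [{
            "tool": {"driver": {"name": "CodeGraph", "rules": []}},
            "results": [{
                "ruleId": f["pattern_id"],
                "level": "error" if f["severity"] in ("critical", "high") else "warning",
                "message": {"text": f["description"]},
                "locations": [{"physicalLocation": {
                    "artifactLocation": {"uri": f["file_path"]},
                    "region": {"startLine": f["line_number"]},
                }}],
            } for f in findings],
        }],
    }

doc = to_sarif([{"pattern_id": "sql_injection", "severity": "high",
                 "description": "Unsanitized query", "file_path": "app.py",
                 "line_number": 42}])
serialized = json.dumps(doc)
```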
AutofixEngine¶
Module: src/analysis/autofix/engine.py
Generates automated fix suggestions for security vulnerabilities found through taint analysis. Tries template-based fixes first, falls back to LLM generation.
class AutofixEngine:
def __init__(self, source_root: str = "", dry_run: bool = True): ...
def generate_fixes(self, taint_paths) -> List[AutofixResult]: ...
Usage:
from src.analysis.autofix.engine import AutofixEngine
engine = AutofixEngine(source_root="/path/to/project", dry_run=True)
results = engine.generate_fixes(taint_paths)
for result in results:
print(f"Strategy: {result.strategy}") # "template" or "llm"
print(f"CWE: {result.cwe_id}")
print(f"Validated: {result.validated}")
print(f"Diff:\n{result.fix.diff}")
The AutofixResult dataclass:
@dataclass
class AutofixResult:
fix: FixSuggestion
strategy: str # "template" or "llm"
validated: bool
validation: Optional[ValidationResult] = None
taint_path: Optional[TaintPath] = None
cwe_id: str = ""
The engine is read-only by default (`dry_run=True`): it generates diffs but never applies them automatically.
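Because dry-run mode produces diffs without touching files, the output resembles a standard unified diff. A sketch using `difflib` (how `FixSuggestion.diff` is actually generated may differ; the file path and fix shown are illustrative):

```python
import difflib

# Sketch: produce a unified diff for a suggested fix without applying it.
def make_diff(original: str, fixed: str, path: str) -> str:
    return "".join(difflib.unified_diff(
        original.splitlines(keepends=True),
        fixed.splitlines(keepends=True),
        fromfile=f"a/{path}", tofile=f"b/{path}",
    ))

before = 'cursor.execute("SELECT * FROM users WHERE id=" + user_id)\n'
after = 'cursor.execute("SELECT * FROM users WHERE id=%s", (user_id,))\n'
diff = make_diff(before, after, "app/db.py")
```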
Security Hypothesis System¶
Module: src/security/hypothesis/
The Security Hypothesis System is an advanced subsystem for proactive vulnerability detection. It generates security hypotheses based on CWE/CAPEC knowledge bases and validates them against the CPG.
Key features:
- CWE database (58 entries) and CAPEC database (27 entries)
- 13 SQL templates and 12 taint templates (inter-procedural)
- 6 framework-specific providers: PostgreSQL, Django, Spring, Express, Gin, Next.js
- Multi-criteria scoring with configurable presets (embedded, web, enterprise)
- Chain detection (14 escalation patterns)
- Feedback store and trend tracking
- Incremental analysis via git diff
For full documentation, see Hypothesis System Reference.
CLI entry points:
python -m src.cli hypothesis run --language C [--max 50] [--min-priority 0.3] [--format json]
python -m src.cli hypothesis list-cwes [--category buffer_overflow]
python -m src.cli hypothesis providers
DLP Patterns¶
Built-in Pattern Categories¶
The module ships with three default categories, defined in src/security/config.py via get_default_dlp_categories():
credentials (action: BLOCK):
| Pattern | Description | Example Match |
|---|---|---|
| `api_key_generic` | Generic API key | `api_key="sk_live_abc..."` |
| `aws_access_key` | AWS Access Key ID | `AKIAIOSFODNN7EXAMPLE` |
| `aws_secret_key` | AWS Secret Access Key | `aws_secret_access_key="..."` |
| `private_key` | Private key header | `-----BEGIN RSA PRIVATE KEY-----` |
| `password_pattern` | Password in config/code | `password="hunter2"` |
| `bearer_token` | JWT Bearer token | `Bearer eyJhbG...` |
| `github_token` | GitHub PAT | `ghp_xxxxxxxxxxxx...` |
pii (action: MASK):
| Pattern | Mask | Description |
|---|---|---|
| `email` | `[EMAIL]` | Email addresses |
| `phone_ru` | `[PHONE]` | Russian phone numbers |
| `phone_us` | `[PHONE]` | US phone numbers |
| `ssn` | `[SSN]` | US Social Security Numbers |
| `credit_card` | `[CARD]` | Credit card numbers |
| `ip_address` | `[IP]` | IPv4 addresses |
| `passport_ru` | `[PASSPORT]` | Russian passport numbers |
source_code (action: WARN):
| Pattern | Mask | Description |
|---|---|---|
| `connection_string` | `[CONN_STRING]` | Database connection strings (JDBC, MySQL, PostgreSQL, etc.) |
| `internal_path_unix` | `[PATH]` | Unix internal paths (`/home/`, `/var/`, `/etc/`) |
| `internal_path_windows` | `[PATH]` | Windows internal paths (`C:\Users\`, etc.) |
Custom Patterns¶
Add custom patterns via config.yaml:
security:
dlp:
categories:
internal_projects:
enabled: true
action: WARN
patterns:
- name: project_codename
regex: '(?i)(project[\s_-]?(phoenix|atlas|nova))'
mask_with: '[PROJECT]'
description: "Internal project codenames"
keywords:
restricted_terms:
words: ["confidential", "internal only", "do not distribute"]
case_sensitive: false
keywords_action: WARN
Each DLPCategoryConfig supports:
class DLPCategoryConfig(BaseModel):
enabled: bool = True
action: DLPAction = DLPAction.WARN
patterns: List[DLPPatternConfig]
class DLPPatternConfig(BaseModel):
name: str
regex: str
mask_with: str = "[REDACTED]"
description: Optional[str] = None
Database Tables¶
Migration: 002_llm_audit_log.py
llm_audit_log¶
Stores every LLM interaction with full metadata.
| Column | Type | Description |
|---|---|---|
| id | INTEGER PK | Auto-increment primary key |
| request_id | VARCHAR | Unique request identifier |
| user_id | VARCHAR | User who made the request |
| session_id | VARCHAR | Session identifier |
| ip_address | VARCHAR | Client IP address |
| provider | VARCHAR | LLM provider name |
| model | VARCHAR | Model identifier |
| system_prompt_hash | VARCHAR | SHA-256 hash of system prompt |
| system_prompt_length | INTEGER | Length of system prompt |
| user_prompt_preview | TEXT | Truncated user prompt |
| user_prompt_length | INTEGER | Full length of user prompt |
| response_preview | TEXT | Truncated LLM response |
| response_length | INTEGER | Full length of response |
| status | VARCHAR | Request status (success/error) |
| prompt_tokens | INTEGER | Tokens in prompt |
| completion_tokens | INTEGER | Tokens in completion |
| total_tokens | INTEGER | Total tokens used |
| latency_ms | FLOAT | Request latency in milliseconds |
| dlp_action | VARCHAR | DLP action taken (BLOCK/MASK/WARN/LOG_ONLY) |
| dlp_match_count | INTEGER | Number of DLP matches |
| dlp_categories | VARCHAR | Comma-separated DLP categories |
| error_type | VARCHAR | Error type if failed |
| error_message | TEXT | Error message if failed |
| timestamp | TIMESTAMP | Event timestamp |
| metadata | JSON | Additional metadata |
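The hash/length/preview columns imply that full prompts never reach the database. A minimal sketch of how such fields might be derived (the `PREVIEW_LEN` cutoff and `audit_fields` helper are assumptions, not the module's code):

```python
import hashlib

PREVIEW_LEN = 200  # assumed truncation length, not taken from the module

def audit_fields(system_prompt: str, user_prompt: str) -> dict:
    """Derive hash/length/preview columns like those in llm_audit_log."""
    return {
        # Store a digest of the system prompt, never the prompt itself
        "system_prompt_hash": hashlib.sha256(system_prompt.encode("utf-8")).hexdigest(),
        "system_prompt_length": len(system_prompt),
        # Store only a truncated preview of the user prompt plus its full length
        "user_prompt_preview": user_prompt[:PREVIEW_LEN],
        "user_prompt_length": len(user_prompt),
    }

row = audit_fields("You are a code assistant.", "Explain this function")
# row["system_prompt_hash"] is a 64-char hex digest; the prompt text is not stored
```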
dlp_events¶
Stores individual DLP match events, linked to llm_audit_log.
| Column | Type | Description |
|---|---|---|
| id | INTEGER PK | Auto-increment primary key |
| audit_log_id | INTEGER FK | Reference to llm_audit_log.id |
| request_id | VARCHAR | Request identifier |
| event_type | VARCHAR | DLP event type (pre_request/post_response) |
| action | VARCHAR | DLP action taken |
| category | VARCHAR | DLP category (credentials, pii, etc.) |
| pattern_name | VARCHAR | Pattern that matched |
| severity | VARCHAR | Match severity |
| match_preview | VARCHAR | Truncated matched text |
| position | VARCHAR | Match position (start-end) |
| user_id | VARCHAR | User identifier |
| ip_address | VARCHAR | Client IP |
| timestamp | TIMESTAMP | Event timestamp |
security_events¶
Stores SIEM events dispatched by the security module.
| Column | Type | Description |
|---|---|---|
| id | INTEGER PK | Auto-increment primary key |
| event_id | VARCHAR | Unique event identifier |
| event_type | VARCHAR | Event type from SecurityEventType |
| severity | INTEGER | RFC 5424 severity (0-7) |
| request_id | VARCHAR | Associated request identifier |
| user_id | VARCHAR | User identifier |
| session_id | VARCHAR | Session identifier |
| ip_address | VARCHAR | Client IP |
| message | TEXT | Human-readable message |
| details | JSON | Additional event details |
| dispatched | BOOLEAN | Whether event was sent to SIEM |
| dispatch_error | TEXT | Error message if dispatch failed |
| timestamp | TIMESTAMP | Event timestamp |
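The `dispatched` flag makes the table usable as a dead-letter queue for SIEM delivery. A minimal SQLite sketch of that pattern (the real migration targets the project's database and may differ in types and constraints):

```python
import sqlite3

# In-memory sketch of the security_events table described above.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE security_events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        event_id TEXT, event_type TEXT, severity INTEGER,
        request_id TEXT, user_id TEXT, session_id TEXT, ip_address TEXT,
        message TEXT, details TEXT,
        dispatched INTEGER DEFAULT 0, dispatch_error TEXT,
        timestamp TEXT DEFAULT CURRENT_TIMESTAMP
    )""")
conn.execute(
    "INSERT INTO security_events (event_id, event_type, severity, message, dispatched)"
    " VALUES (?, ?, ?, ?, ?)",
    ("evt-1", "dlp.block", 2, "Credential detected in prompt", 1),
)
# Rows with dispatched = 0 are candidates for SIEM re-delivery
pending = conn.execute(
    "SELECT COUNT(*) FROM security_events WHERE dispatched = 0").fetchone()[0]
```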
SIEM Event Formats¶
All handlers extend BaseSIEMHandler and implement format_event() and send().
SysLog (RFC 5424)¶
Module: src/security/siem/syslog_handler.py – SysLogHandler
Message format:
```text
<PRI>VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID [SD-ID SD-PARAMS] MSG
```
Example:
```text
<134>1 2024-12-09T10:30:00.000000Z server01 codegraph 1234 LLM001 [meta@47450 request_id="abc123" event_type="llm.request" user_id="user@corp.com" provider="gigachat"] LLM request logged
```
PRI is calculated as facility * 8 + severity. Structured data includes request_id, event_type, and all non-null optional fields from SecurityEvent.
Supports UDP, TCP, and TLS transport protocols.
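The PRI calculation and message layout fit in a few lines. In this sketch, facility 16 (local0) is an assumption chosen because it reproduces the `<134>` value in the example above; `format_rfc5424` is an illustrative helper, not the handler's API:

```python
def pri(facility: int, severity: int) -> int:
    """RFC 5424 priority value: facility * 8 + severity."""
    return facility * 8 + severity

def format_rfc5424(facility, severity, ts, host, app, procid, msgid, sd, msg):
    # <PRI>VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID [SD] MSG
    return f"<{pri(facility, severity)}>1 {ts} {host} {app} {procid} {msgid} {sd} {msg}"

line = format_rfc5424(
    16, 6,  # local0 facility, Info severity -> PRI 134
    "2024-12-09T10:30:00.000000Z", "server01", "codegraph", "1234", "LLM001",
    '[meta@47450 request_id="abc123" event_type="llm.request"]',
    "LLM request logged",
)
# line begins "<134>1 2024-12-09T10:30:00.000000Z server01 codegraph ..."
```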
SysLog JSON¶
Module: src/security/siem/syslog_handler.py – SysLogJSONHandler
Extends SysLogHandler with JSON-formatted message bodies. Useful for SIEM systems that parse JSON payloads from syslog messages.
CEF (ArcSight)¶
Module: src/security/siem/cef_handler.py – CEFHandler
Common Event Format message:
```text
CEF:Version|Device Vendor|Device Product|Device Version|Signature ID|Name|Severity|Extension
```
Example:
```text
CEF:0|CodeGraph|CodeAnalysis|1.0|DLP001|DLP Block|7|src=192.168.1.1 suser=user123 msg=Credential detected
```
Severity mapping (RFC 5424 to CEF 0-10 scale):
| RFC 5424 | CEF | Level |
|---|---|---|
| 0 (Emergency) | 10 | Highest |
| 1 (Alert) | 9 | |
| 2 (Critical) | 8 | |
| 3 (Error) | 7 | |
| 4 (Warning) | 6 | |
| 5 (Notice) | 5 | |
| 6 (Info) | 3 | |
| 7 (Debug) | 1 | Lowest |
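The severity mapping and header layout can be combined into a small builder. This is a sketch around the documented format, not the `CEFHandler` implementation; note that RFC 5424 Error (3) maps to CEF 7, matching the sample message:

```python
# Severity mapping from the table above: RFC 5424 (0-7) -> CEF (0-10)
RFC5424_TO_CEF = {0: 10, 1: 9, 2: 8, 3: 7, 4: 6, 5: 5, 6: 3, 7: 1}

def cef_line(sig_id: str, name: str, rfc_severity: int, extension: str) -> str:
    """Build a CEF:0 line with fixed vendor/product/version fields."""
    # Header: Version|Vendor|Product|DeviceVersion|SignatureID|Name|Severity|Extension
    return (f"CEF:0|CodeGraph|CodeAnalysis|1.0|{sig_id}|{name}|"
            f"{RFC5424_TO_CEF[rfc_severity]}|{extension}")

line = cef_line("DLP001", "DLP Block", 3,
                "src=192.168.1.1 suser=user123 msg=Credential detected")
# line == "CEF:0|CodeGraph|CodeAnalysis|1.0|DLP001|DLP Block|7|src=192.168.1.1 suser=user123 msg=Credential detected"
```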
LEEF (QRadar)¶
Module: src/security/siem/leef_handler.py – LEEFHandler
Log Event Extended Format 2.0 message:
```text
LEEF:2.0|Vendor|Product|Version|EventID|key1=value1\tkey2=value2
```
Example:
```text
LEEF:2.0|CodeGraph|CodeAnalysis|1.0|DLP001|src=192.168.1.1 usrName=user123 msg=Credential detected
```
LEEF 2.0 uses tab characters as field delimiters in the extension block.
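A minimal builder makes the tab delimiting concrete; `leef_line` is an illustrative helper, not the `LEEFHandler` API. The rendered example above shows spaces, but the wire format joins extension fields with tabs:

```python
def leef_line(event_id: str, **fields) -> str:
    """Build a LEEF 2.0 line; extension key=value pairs are tab-delimited."""
    ext = "\t".join(f"{k}={v}" for k, v in fields.items())
    return f"LEEF:2.0|CodeGraph|CodeAnalysis|1.0|{event_id}|{ext}"

line = leef_line("DLP001", src="192.168.1.1", usrName="user123",
                 msg="Credential detected")
# Extension block: "src=192.168.1.1<TAB>usrName=user123<TAB>msg=Credential detected"
```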
Event ID mapping (shared across CEF and LEEF):
| Event Type | ID | Description |
|---|---|---|
| llm.request | LLM001 | LLM request logged |
| llm.response | LLM002 | LLM response logged |
| llm.error | LLM003 | LLM error occurred |
| dlp.block | DLP001 | DLP blocked content |
| dlp.mask | DLP002 | DLP masked content |
| dlp.warn | DLP003 | DLP warning issued |
| dlp.log | DLP004 | DLP logged match |
| vault.access | VLT001 | Vault secret accessed |
| vault.rotate | VLT002 | Vault secret rotated |
| auth.success | AUTH01 | Authentication successful |
| auth.failure | AUTH02 | Authentication failed |
| rate.limit | RATE01 | Rate limit exceeded |
| security.alert | SEC001 | General security alert |
Webhook Integration¶
DLPWebhookClient¶
Module: src/security/dlp/webhook.py
Sends DLP alerts to external systems via HTTP webhooks with async delivery, retry, and exponential backoff.
```python
class DLPWebhookClient:
    def __init__(self, config: DLPWebhookConfig): ...
    def send_alert(self, matches, action, request_id=None, user_id=None, ip_address=None) -> bool: ...
    def send_alert_sync(self, matches, action, **kwargs) -> bool: ...
    def stop(self) -> None: ...

    @property
    def is_enabled(self) -> bool: ...

    @property
    def queue_size(self) -> int: ...
```
Configuration (DLPWebhookConfig):
```python
class DLPWebhookConfig(BaseModel):
    enabled: bool = False
    endpoint: Optional[str] = None
    auth_header: Optional[str] = None
    timeout_seconds: int = 10
    retry_attempts: int = 3
    notify_on: List[DLPAction] = [DLPAction.BLOCK, DLPAction.WARN]
```
Usage:
```python
from src.security.dlp.webhook import DLPWebhookClient, create_webhook_alert_callback
from src.security.config import get_security_config

# Direct usage
client = DLPWebhookClient(get_security_config().dlp.webhook)
client.send_alert(
    matches=dlp_matches,
    action=DLPAction.BLOCK,
    request_id="req-123",
    user_id="user@corp.com",
)
client.stop()

# Or use the convenience callback factory
callback, client = create_webhook_alert_callback(config.dlp.webhook)
# callback(matches, action) can be passed to DLP action handlers
```
The client supports context manager usage:
```python
with DLPWebhookClient(config) as client:
    client.send_alert(matches, action)
```
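The retry-with-exponential-backoff behavior can be sketched without real HTTP. Everything here (`deliver_with_retry`, `flaky_send`, the delay schedule) is illustrative, not the client's implementation:

```python
import time

def deliver_with_retry(send, payload, attempts=3, base_delay=0.0):
    """Call send(payload), retrying with exponential backoff; True on success."""
    for attempt in range(attempts):
        try:
            send(payload)
            return True
        except Exception:
            if attempt < attempts - 1:
                # Backoff doubles each retry: base, 2*base, 4*base, ...
                time.sleep(base_delay * (2 ** attempt))
    return False

calls = []
def flaky_send(payload):
    """Simulated endpoint that fails twice, then accepts the alert."""
    calls.append(payload)
    if len(calls) < 3:
        raise ConnectionError("temporarily unavailable")

ok = deliver_with_retry(flaky_send, {"alert_id": "a1"}, attempts=3)
# ok is True: the third attempt succeeds
```

In real use a nonzero `base_delay` (e.g. 1.0 seconds) would space out the retries; it is zero here only so the sketch runs instantly.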
DLPAlert¶
Module: src/security/dlp/webhook.py
Alert payload sent to external DLP systems:
```python
@dataclass
class DLPAlert:
    alert_id: str
    timestamp: str
    action: str
    match_count: int
    categories: List[str]
    patterns: List[str]
    request_id: Optional[str] = None
    user_id: Optional[str] = None
    ip_address: Optional[str] = None
    severity: str = "medium"
    context: Dict[str, Any] = None
```
Alerts are created from DLP matches via DLPAlert.from_matches(). The severity is determined by the highest severity among the matched patterns.
JSON payload example:
```json
{
  "alert_id": "a1b2c3d4e5f6",
  "timestamp": "2024-12-09T10:30:00.000000Z",
  "action": "BLOCK",
  "match_count": 2,
  "categories": ["credentials"],
  "patterns": ["aws_access_key", "private_key"],
  "request_id": "req-abc-123",
  "user_id": "user@company.com",
  "ip_address": "192.168.1.100",
  "severity": "critical",
  "context": {}
}
```
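The "highest severity among matched patterns" rule used by DLPAlert.from_matches() can be sketched as follows; the `SEVERITY_ORDER` ladder and `highest_severity` helper are assumptions, not the module's code:

```python
# Assumed severity ladder, lowest to highest
SEVERITY_ORDER = ["low", "medium", "high", "critical"]

def highest_severity(matches, default="medium"):
    """Pick the highest severity among matched patterns."""
    severities = [m["severity"] for m in matches]
    if not severities:
        return default
    # max by position in the ladder, so "critical" beats "high"
    return max(severities, key=SEVERITY_ORDER.index)

matches = [
    {"pattern": "aws_access_key", "severity": "critical"},
    {"pattern": "private_key", "severity": "high"},
]
# highest_severity(matches) == "critical", matching the payload example above
```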
CLI Usage¶
Security Audit CLI¶
Module: src/cli/security_audit.py
Four commands: full, quick, settings, secrets.
```shell
# Full security audit with all report formats
python -m src.cli.security_audit full --path /path/to/project

# With options
python -m src.cli.security_audit full \
    --path /path/to/project \
    --output ./reports \
    --format json markdown sarif \
    --exclude-dirs vendor build \
    --no-cpg \
    --language python \
    --verbose

# Quick file-based scan only
python -m src.cli.security_audit quick --path /path/to/project

# Scan Django settings
python -m src.cli.security_audit settings --path /path/to/settings.py

# Scan for hardcoded secrets
python -m src.cli.security_audit secrets --path /path/to/project
```
Flags for full command:
| Flag | Description |
|---|---|
| --path, -p | Path to project (required) |
| --output, -o | Output directory for reports (default: ./security_reports) |
| --format, -f | Output format(s): json, markdown/md, sarif, all |
| --exclude-dirs | Additional directories to exclude |
| --no-cpg | Skip CPG-based analysis (faster, file-based only) |
| --language, -l | Target language: auto, c, cpp, python, javascript, typescript, go, csharp, kotlin, java, php |
| --verbose, -v | Verbose output |
Note: The --autofix flag is NOT available on the security audit CLI.
Audit CLI (with Autofix)¶
The --autofix flag is available on the main audit command:
```shell
python -m src.cli audit --db /path/to/cpg.duckdb --autofix
```
This runs the audit composite scenario with automated fix generation enabled.
MCP Tools¶
Module: src/mcp/tools/security.py
Two security-related MCP tools are registered:
codegraph_autofix¶
Generates automated fix suggestions for security vulnerabilities in a method.
```python
codegraph_autofix(method_name: str, cwe: str = "")
```
| Parameter | Type | Required | Description |
|---|---|---|---|
| method_name | str | Yes | Method to analyze and generate fixes for |
| cwe | str | No | CWE filter (e.g., "CWE-89" for SQL injection only) |
The tool runs a security scan on the specified method, builds taint paths from findings, and generates template-based or LLM-powered fix patches. Returns diffs only (read-only, never applies).
Note: There is no vulnerability_type parameter.
codegraph_taint_analysis¶
Runs taint analysis on a specified method.
Security Module Structure¶
```text
src/security/
    __init__.py
    _base.py                     # VulnerabilitySeverity, VulnerabilityCategory
    config.py                    # SecurityConfig, DLPConfig, SIEMConfig, VaultConfig
    file_scanner.py              # FileSecurityScanner, FileFinding, ScanResult
    taint_verified_scanner.py    # TaintVerifiedScanner, VerifiedFinding
    report_generator.py          # ReportGenerator, SecurityAuditReport
    report_localizer.py          # ReportLocalizer (EN/RU)
    sarif_exporter.py            # SARIF 2.1.0 export
    owasp_mapping.py             # OWASP Top 10 enrichment
    taint_visualizer.py          # Taint path visualization
    security_patterns.py         # SecurityPattern definitions
    security_agents.py           # SecurityScanner (CPG-based)
    dlp/
        __init__.py
        patterns.py              # DLPMatch, MatchType, PatternRegistry
        scanner.py               # ContentScanner, ScanResult, DLPBlockedException
        actions.py               # DLP action handlers
        webhook.py               # DLPWebhookClient, DLPAlert
    siem/
        __init__.py
        base_handler.py          # BaseSIEMHandler, SecurityEvent, SecurityEventType
        dispatcher.py            # SIEMDispatcher, init_siem_dispatcher
        syslog_handler.py        # SysLogHandler, SysLogJSONHandler
        cef_handler.py           # CEFHandler
        leef_handler.py          # LEEFHandler
        buffer.py                # SIEMBuffer (retry + backoff)
    vault/
        __init__.py
        client.py                # VaultClient, VaultError
        secret_manager.py        # Higher-level secret management
    llm/
        __init__.py
        secure_provider.py       # SecureLLMProvider
        request_logger.py        # LLMSecurityLogger
    hardening/
        __init__.py
        base.py                  # HardeningCheck, HardeningFinding, enums
        d3fend_checks.py         # D3FEND check definitions
        hardening_scanner.py     # HardeningScanner
    hypothesis/                  # Security Hypothesis System (21+ files)
        __init__.py
        hypothesis_generator.py
        knowledge_base.py
        query_synthesizer.py
        query_templates.py
        models.py
        executor.py
        validator.py
        chain_detector.py
        multi_criteria_scorer.py
        feedback.py
        trend_store.py
        incremental.py
        providers/
            __init__.py
            registry.py
            yaml_provider.py
            postgresql/
            django/
            spring/
            express/
            gin/
            nextjs/
    patterns/                    # Language-specific security patterns
        __init__.py
        python_django.py
        injection.py
        auth.py
        crypto.py
        memory.py
        concurrency.py
        input_validation.py
        java.py
        javascript.py
        go.py
        csharp.py
        kotlin.py
        php.py
```
Quick Start Guide¶
1. Enable the security module¶
```yaml
# config.yaml
security:
  enabled: true
```
2. Wrap your LLM provider¶
```python
from src.security.llm.secure_provider import SecureLLMProvider
from src.security.config import get_security_config

secure_provider = SecureLLMProvider(your_llm_provider, get_security_config())
response = secure_provider.generate(system_prompt, user_prompt)
```
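The interceptor pattern behind this wrapper can be demonstrated with a toy version; `SecureWrapper`, `DLPBlocked`, and `naive_scan` below are illustrative stand-ins, not the module's classes:

```python
class DLPBlocked(Exception):
    """Stand-in for the module's DLPBlockedException."""

class SecureWrapper:
    """Toy interceptor in the spirit of SecureLLMProvider: scan, mask, delegate."""
    def __init__(self, provider, scan):
        self.provider = provider  # callable(system_prompt, user_prompt) -> str
        self.scan = scan          # callable(text) -> (action, possibly masked text)

    def generate(self, system_prompt: str, user_prompt: str) -> str:
        action, prompt = self.scan(user_prompt)   # pre-request DLP scan
        if action == "BLOCK":
            raise DLPBlocked("sensitive content in prompt")
        response = self.provider(system_prompt, prompt)
        _, response = self.scan(response)         # post-response masking
        return response

def naive_scan(text):
    """Trivial scanner: block private keys, mask one known email."""
    if "BEGIN RSA PRIVATE KEY" in text:
        return "BLOCK", text
    return "MASK", text.replace("secret@corp.com", "[EMAIL]")

llm = lambda sys_p, usr_p: f"echo: {usr_p}"  # fake provider
wrapped = SecureWrapper(llm, naive_scan)
out = wrapped.generate("You are helpful.", "mail secret@corp.com")
# out == "echo: mail [EMAIL]" -- the address never reaches the provider
```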
3. Run a file-based security scan¶
```shell
python -m src.cli.security_audit full --path /path/to/project --output ./reports
```
4. Run D3FEND hardening checks¶
```python
from src.security.hardening.hardening_scanner import HardeningScanner

scanner = HardeningScanner(cpg_service, language="c")
findings = scanner.scan_all()
score = scanner.get_compliance_score(findings)
```
5. Generate a consolidated report¶
```python
from src.security.report_generator import ReportGenerator

gen = ReportGenerator()
report = gen.create_report("my-app", "/path/to/project", scan_result)
gen.add_hardening_findings(hardening_results)
gen.save_report("./reports", formats=["json", "markdown", "sarif"])
```
6. Configure SIEM forwarding¶
```yaml
# config.yaml
security:
  siem:
    enabled: true
    syslog:
      enabled: true
      host: siem.company.local
      port: 514
      protocol: tls
      tls:
        ca_cert: /etc/ssl/certs/siem-ca.pem
        verify: true
```
7. Enable DLP with webhook alerts¶
```yaml
# config.yaml
security:
  dlp:
    enabled: true
    webhook:
      enabled: true
      endpoint: https://dlp-alerts.company.local/api/v1/alerts
      auth_header: "Bearer ${DLP_WEBHOOK_TOKEN}"
      notify_on: [BLOCK, WARN]
```
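The `${DLP_WEBHOOK_TOKEN}` placeholder suggests environment-variable interpolation at config load time; whether the module performs this substitution itself is an assumption, but the mechanism is simple to sketch:

```python
import os
import re

def expand_env(value: str) -> str:
    """Replace ${VAR} placeholders with environment values (empty if unset)."""
    return re.sub(r"\$\{(\w+)\}", lambda m: os.environ.get(m.group(1), ""), value)

os.environ["DLP_WEBHOOK_TOKEN"] = "tok-123"  # normally set outside the process
header = expand_env("Bearer ${DLP_WEBHOOK_TOKEN}")
# header == "Bearer tok-123"
```

Keeping the token out of config.yaml means the secret lives only in the environment (or in Vault), never in version control.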
See Also¶
- REST API Reference – security-related API endpoints
- MCP Tools Reference – codegraph_autofix and codegraph_taint_analysis
- GRPC API Reference – gRPC security context
- Hypothesis System Reference – hypothesis system data model and APIs
- Analysis Modules Reference – AutofixEngine, taint analysis, data flow
- Agents Reference – SecurityScanner, DataFlowAnalyzer, VulnerabilityReporter