Enterprise Security Module

Overview

The CodeGraph Enterprise Security Module provides a comprehensive security layer for LLM-powered code analysis. It protects against data leakage, logs all LLM interactions for audit compliance, integrates with enterprise SIEM systems, and manages secrets through HashiCorp Vault.

The module operates as an interceptor layer around the LLM provider, scanning all prompts and responses for sensitive data before they leave or enter the system.

graph LR
    A[User Query] --> B[SecureLLMProvider]
    B --> C{DLP Pre-Request Scan}
    C -->|BLOCK| D[DLPBlockedException]
    C -->|MASK| E[Masked Prompt]
    C -->|PASS| F[Original Prompt]
    E --> G[LLM Provider]
    F --> G
    G --> H{DLP Post-Response Scan}
    H -->|MASK| I[Masked Response]
    H -->|PASS| J[Original Response]
    I --> K[User]
    J --> K
    B --> L[LLMSecurityLogger]
    L --> M[(llm_audit_log DB)]
    L --> N[SIEMDispatcher]
    N --> O[SysLog]
    N --> P[CEF / ArcSight]
    N --> Q[LEEF / QRadar]
    C -->|BLOCK/WARN| R[DLPWebhookClient]

The module supports:

  • Pre-request DLP scanning of prompts before they are sent to the LLM
  • Post-response DLP scanning of LLM responses before they are returned
  • Audit logging of every LLM interaction to a database and/or SIEM
  • Secret management via HashiCorp Vault with automatic credential rotation
  • D3FEND hardening checks for source code analysis
  • Taint-verified vulnerability scanning to reduce false positives
  • Multi-format report generation (JSON, Markdown, SARIF)
  • Automated fix suggestions via template and LLM-based generation

Features

1. LLM Request/Response Logging

Every LLM interaction is logged with full metadata:

  • System prompt hash and length (prompts can be redacted before logging)
  • User prompt preview with configurable length limits
  • Response preview and length
  • Token usage (prompt, completion, total)
  • Latency in milliseconds
  • DLP action taken and match count
  • User, session, and IP context

Logged to the llm_audit_log database table and optionally forwarded to SIEM.
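The hash-plus-preview approach above can be sketched as follows. This is an illustrative sketch only; `build_audit_record` and its parameters are hypothetical names, not the module's actual API:

```python
import hashlib

def build_audit_record(system_prompt: str, user_prompt: str,
                       max_prompt_length: int = 2000) -> dict:
    """Illustrative sketch: hash the system prompt, truncate the user prompt."""
    return {
        # The system prompt itself is never stored, only its hash and length
        "system_prompt_hash": hashlib.sha256(system_prompt.encode()).hexdigest(),
        "system_prompt_length": len(system_prompt),
        # The user prompt is stored as a bounded preview plus its full length
        "user_prompt_preview": user_prompt[:max_prompt_length],
        "user_prompt_length": len(user_prompt),
    }

record = build_audit_record("You are a code reviewer.", "Review this function" * 500)
```

Storing a hash instead of the raw system prompt lets auditors detect prompt changes without the log itself becoming a source of leakage.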

2. SIEM Integration

Real-time security event dispatch to enterprise SIEM systems via three formats:

| Format | Target System | Standard |
|--------|---------------|----------|
| SysLog | Splunk, Graylog, rsyslog | RFC 5424 |
| CEF | ArcSight, Micro Focus | Common Event Format |
| LEEF | IBM QRadar | Log Event Extended Format 2.0 |

All formats support UDP, TCP, and TLS transport. Events are buffered with configurable retry and backoff.
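For illustration, an RFC 5424 syslog line of the kind the SysLog handler would emit can be assembled as below. This is a minimal sketch of the wire format (PRI is facility × 8 + severity), not the handler's actual implementation:

```python
from datetime import datetime, timezone

def rfc5424_line(facility: int, severity: int, app_name: str,
                 hostname: str, message: str) -> str:
    """Minimal RFC 5424 sketch: <PRI>VERSION TIMESTAMP HOST APP PROCID MSGID SD MSG."""
    pri = facility * 8 + severity  # e.g. LOCAL0 (16) at INFO (6) -> 134
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    # "-" stands for the unused PROCID, MSGID, and structured-data fields
    return f"<{pri}>1 {timestamp} {hostname} {app_name} - - - {message}"

line = rfc5424_line(16, 6, "codegraph", "analysis-host", "dlp.warn: email masked")
```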

3. DLP (Data Loss Prevention)

Content scanning with four action levels:

| Action | Behavior |
|--------|----------|
| BLOCK | Reject the request entirely and raise DLPBlockedException |
| MASK | Replace sensitive data with placeholders ([REDACTED], [EMAIL], etc.) |
| WARN | Allow the request but log a warning to SIEM |
| LOG_ONLY | Log the match silently |

Built-in pattern categories: credentials, PII, source code artifacts. Custom patterns are supported via regex and keyword lists.
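The MASK action boils down to regex substitution. The pattern and placeholder below mirror the built-in email rule, but the function itself is an illustrative sketch, not the scanner's code:

```python
import re

# Same pattern as the built-in "email" rule in the pii category
EMAIL_RE = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")

def mask_emails(content: str, mask_with: str = "[EMAIL]") -> str:
    # MASK action: every match is replaced with its placeholder
    return EMAIL_RE.sub(mask_with, content)

masked = mask_emails("Contact alice@example.com or bob@example.org")
```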

4. HashiCorp Vault Integration

Secure secret management with:

  • Token, AppRole, and Kubernetes authentication methods
  • KV v2 secret engine support
  • Automatic token renewal
  • Configurable cache TTL (default 300s)
  • Access audit logging to SIEM
  • Graceful fallback to environment variables when Vault is unavailable
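The environment-variable fallback can be sketched as below; `read_with_fallback` and the `LLM_API_KEY` variable name are hypothetical, chosen only to show the shape of the degradation path:

```python
import os
from typing import Callable, Optional

def read_with_fallback(vault_read: Callable[[], str], env_var: str) -> Optional[str]:
    """Try Vault first; on any failure, fall back to the environment."""
    try:
        return vault_read()
    except Exception:
        # Graceful degradation: Vault unreachable, sealed, or unauthenticated
        return os.environ.get(env_var)

# Simulate an unavailable Vault
def failing_vault_read() -> str:
    raise ConnectionError("vault sealed")

os.environ["LLM_API_KEY"] = "env-key"
key = read_with_fallback(failing_vault_read, "LLM_API_KEY")
```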

Architecture

graph TB
    subgraph "Security Layer"
        SLP[SecureLLMProvider]
        CS[ContentScanner]
        SIEM[SIEMDispatcher]
        VC[VaultClient]
        LSL[LLMSecurityLogger]
    end

    subgraph "SIEM Handlers"
        SH[SysLogHandler]
        SJH[SysLogJSONHandler]
        CH[CEFHandler]
        LH[LEEFHandler]
    end

    subgraph "Advanced Security"
        FSS[FileSecurityScanner]
        TVS[TaintVerifiedScanner]
        HS[HardeningScanner]
        RG[ReportGenerator]
        AE[AutofixEngine]
    end

    subgraph "Storage"
        DB[(llm_audit_log)]
        DLPDB[(dlp_events)]
        SEDB[(security_events)]
    end

    SLP --> CS
    SLP --> LSL
    LSL --> DB
    LSL --> SIEM
    CS --> SIEM
    SIEM --> SH
    SIEM --> SJH
    SIEM --> CH
    SIEM --> LH
    SLP --> VC

    FSS --> RG
    TVS --> RG
    HS --> RG
    AE --> TVS

    SIEM --> SEDB
    CS --> DLPDB

The security module is configured through a single SecurityConfig Pydantic model, loaded from the security: section of config.yaml via get_security_config().


Configuration

SecurityConfig

Module: src/security/config.py

The root configuration model for the entire security module:

class SecurityConfig(BaseModel):
    enabled: bool = False
    llm_logging: LLMLoggingConfig
    siem: SIEMConfig
    dlp: DLPConfig
    vault: VaultConfig

Access the singleton instance:

from src.security.config import get_security_config

config = get_security_config()

The configuration is loaded from config.yaml and supports environment variable interpolation using ${VAR_NAME:-default} syntax.
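The `${VAR_NAME:-default}` interpolation can be illustrated with a small resolver. This regex-based sketch is not the loader's actual code, only a demonstration of the syntax's semantics:

```python
import os
import re

# ${VAR} or ${VAR:-default}; group 2 is the optional default
_VAR_RE = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")

def interpolate(value: str) -> str:
    """Replace ${VAR}/${VAR:-default} with the env value or the default."""
    def _sub(m: re.Match) -> str:
        name, default = m.group(1), m.group(2)
        return os.environ.get(name, default if default is not None else "")
    return _VAR_RE.sub(_sub, value)

os.environ.pop("VAULT_ROLE_ID", None)
resolved = interpolate("${VAULT_ROLE_ID:-dev-role}")
```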

LLMLoggingConfig

class LLMLoggingConfig(BaseModel):
    enabled: bool = True
    log_prompts: bool = True
    redact_prompts: bool = True
    max_prompt_length: int = 2000
    log_responses: bool = True
    max_response_length: int = 5000
    log_token_usage: bool = True
    log_latency: bool = True
    log_to_database: bool = True
    log_to_siem: bool = True

SIEMConfig

class SIEMConfig(BaseModel):
    enabled: bool = True
    syslog: SysLogConfig       # RFC 5424, UDP/TCP/TLS
    cef: CEFConfig             # ArcSight
    leef: LEEFConfig           # QRadar
    buffer: SIEMBufferConfig   # Retry and buffering

Sub-configurations:

class SysLogConfig(BaseModel):
    enabled: bool = True
    protocol: SIEMProtocol = SIEMProtocol.UDP  # udp, tcp, tls
    host: str = "localhost"
    port: int = 514
    facility: int = 16  # LOCAL0
    app_name: str = "codegraph"
    hostname: Optional[str] = None  # Auto-detected
    tls: Optional[TLSConfig] = None

class CEFConfig(BaseModel):
    enabled: bool = False
    host: str = ""
    port: int = 514
    protocol: SIEMProtocol = SIEMProtocol.UDP
    device_vendor: str = "CodeGraph"
    device_product: str = "CodeAnalysis"
    device_version: str = "1.0"

class LEEFConfig(BaseModel):
    enabled: bool = False
    host: str = ""
    port: int = 514
    protocol: SIEMProtocol = SIEMProtocol.UDP
    product_vendor: str = "CodeGraph"
    product_name: str = "CodeAnalysis"
    product_version: str = "1.0"
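For reference, a CEF line as consumed by ArcSight has the shape `CEF:Version|Vendor|Product|DeviceVersion|SignatureID|Name|Severity|Extension`. The formatter below is a minimal sketch using the CEFConfig fields above (real handlers must also escape `|` and `\` in header fields, which is omitted here):

```python
def cef_line(vendor: str, product: str, version: str,
             signature_id: str, name: str, severity: int,
             extensions: dict) -> str:
    """Minimal CEF 0 sketch; header escaping is deliberately omitted."""
    ext = " ".join(f"{k}={v}" for k, v in extensions.items())
    return f"CEF:0|{vendor}|{product}|{version}|{signature_id}|{name}|{severity}|{ext}"

line = cef_line("CodeGraph", "CodeAnalysis", "1.0",
                "dlp.block", "Credential detected", 9,
                {"suser": "user@company.com", "act": "BLOCK"})
```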

class SIEMBufferConfig(BaseModel):
    max_size: int = 10000
    flush_interval_seconds: int = 5
    retry_attempts: int = 3
    retry_backoff_seconds: float = 2.0
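The retry policy implied by `retry_attempts` and `retry_backoff_seconds` can be sketched as exponential backoff. `send_with_retry` is illustrative, not the SIEMBuffer implementation; the injectable `sleep` exists only to keep the example testable:

```python
import time

def send_with_retry(send, attempts: int = 3, backoff: float = 2.0,
                    sleep=time.sleep) -> bool:
    """Try `send` up to `attempts` times, doubling the wait after each failure."""
    for attempt in range(attempts):
        try:
            send()
            return True
        except OSError:
            if attempt < attempts - 1:
                sleep(backoff * (2 ** attempt))  # 2s, 4s, 8s, ...
    return False  # event stays buffered after exhausting retries

calls = []
def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise OSError("connection refused")

ok = send_with_retry(flaky, attempts=3, backoff=2.0, sleep=lambda s: None)
```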

DLPConfig

class DLPConfig(BaseModel):
    enabled: bool = True
    pre_request: DLPPreRequestConfig   # default_action = WARN
    post_response: DLPPostResponseConfig  # default_action = MASK
    categories: Dict[str, DLPCategoryConfig]
    keywords: Dict[str, DLPKeywordListConfig]
    keywords_action: DLPAction = DLPAction.LOG_ONLY
    webhook: DLPWebhookConfig

VaultConfig

class VaultConfig(BaseModel):
    enabled: bool = False
    url: str = "http://localhost:8200"
    auth_method: str = "token"  # token, approle, kubernetes
    namespace: Optional[str] = None
    token: VaultTokenAuthConfig
    approle: VaultAppRoleAuthConfig
    kubernetes: VaultKubernetesAuthConfig
    secrets: Dict[str, VaultSecretPathConfig]
    secrets_mount_point: str = "secret"
    llm_secrets_path: str = "codegraph/llm"
    cache_ttl_seconds: int = 300
    timeout_seconds: int = 30
    tls_verify: bool = True
    rotation_enabled: bool = False
    rotation_check_interval: int = 300
    audit_access: bool = True

Full config.yaml example

security:
  enabled: true

  llm_logging:
    enabled: true
    log_prompts: true
    redact_prompts: true
    max_prompt_length: 2000
    log_responses: true
    max_response_length: 5000
    log_token_usage: true
    log_latency: true
    log_to_database: true
    log_to_siem: true

  siem:
    enabled: true
    syslog:
      enabled: true
      protocol: udp
      host: siem.company.local
      port: 514
      facility: 16
      app_name: codegraph
    cef:
      enabled: false
      host: arcsight.company.local
      port: 514
    leef:
      enabled: false
      host: qradar.company.local
      port: 514
    buffer:
      max_size: 10000
      flush_interval_seconds: 5
      retry_attempts: 3

  dlp:
    enabled: true
    pre_request:
      enabled: true
      default_action: WARN
    post_response:
      enabled: true
      default_action: MASK
    categories:
      credentials:
        enabled: true
        action: BLOCK
        patterns:
          - name: api_key_generic
            regex: '(?i)(api[_-]?key|apikey)["\s:=]+["\']?([a-zA-Z0-9_\-]{20,})["\']?'
          - name: aws_access_key
            regex: 'AKIA[0-9A-Z]{16}'
          - name: private_key
            regex: '-----BEGIN (RSA |EC |OPENSSH |DSA )?PRIVATE KEY-----'
      pii:
        enabled: true
        action: MASK
        patterns:
          - name: email
            regex: '[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
            mask_with: '[EMAIL]'
          - name: credit_card
            regex: '\b(?:\d{4}[\s-]?){3}\d{4}\b'
            mask_with: '[CARD]'
    keywords:
      sensitive_terms:
        words: ["classified", "top secret"]
        case_sensitive: false
    keywords_action: LOG_ONLY
    webhook:
      enabled: false
      endpoint: https://dlp.company.local/alerts
      auth_header: "Bearer ${DLP_WEBHOOK_TOKEN}"
      timeout_seconds: 10
      retry_attempts: 3
      notify_on: [BLOCK, WARN]

  vault:
    enabled: false
    url: https://vault.company.local:8200
    auth_method: approle
    namespace: codegraph
    approle:
      role_id: "${VAULT_ROLE_ID}"
      secret_id: "${VAULT_SECRET_ID}"
    secrets_mount_point: secret
    llm_secrets_path: codegraph/llm
    cache_ttl_seconds: 300
    tls_verify: true
    rotation_enabled: true
    rotation_check_interval: 300
    audit_access: true

Environment Variables

Only one dedicated environment variable controls the security module at runtime:

| Variable | Default | Description |
|----------|---------|-------------|
| DLP_ENABLED | not set | Checked in src/api/main.py to enable/disable DLP at the API level |

All other security settings are configured through the security: section of config.yaml and loaded via get_security_config(). The configuration supports ${VAR_NAME:-default} syntax for referencing environment variables within YAML values (e.g., Vault credentials, webhook tokens).


Core Components

SecureLLMProvider

Module: src/security/llm/secure_provider.py

Wraps any LLM provider to add DLP scanning, audit logging, and SIEM dispatch.

class SecureLLMProvider:
    def __init__(self, wrapped_provider, config: Optional[SecurityConfig] = None): ...
    def generate(self, system_prompt, user_prompt, **kwargs): ...
    def stream(self, system_prompt, user_prompt, **kwargs): ...

Usage:

from src.security.llm.secure_provider import SecureLLMProvider
from src.security.config import get_security_config

# Wrap an existing LLM provider
secure = SecureLLMProvider(base_provider, get_security_config())

# generate() performs: DLP pre-scan -> LLM call -> DLP post-scan -> log
response = secure.generate(
    system_prompt="You are a code reviewer.",
    user_prompt="Review this function..."
)

The provider intercepts both generate() and stream() calls. For each request:

  1. Scans the user prompt with ContentScanner.scan_request()
  2. If the action is BLOCK, raises DLPBlockedException
  3. If the action is MASK, sends the masked content to the LLM
  4. Scans the LLM response with ContentScanner.scan_response()
  5. Logs the full interaction via LLMSecurityLogger
  6. Dispatches a SecurityEvent to SIEM

ContentScanner (DLP)

Module: src/security/dlp/scanner.py

Scans text content for sensitive data using regex patterns and keyword lists.

class ContentScanner:
    def __init__(self, config: DLPConfig): ...
    def scan(self, content: str) -> List[DLPMatch]: ...
    def scan_request(self, content: str) -> ScanResult: ...
    def scan_response(self, content: str) -> ScanResult: ...
    def get_action(self, matches: List[DLPMatch]) -> DLPAction: ...
    def mask(self, content: str, matches: List[DLPMatch]) -> str: ...

Methods:

| Method | Description |
|--------|-------------|
| scan(content) | Returns a raw list of DLPMatch objects |
| scan_request(content) | Full pre-request scan with action resolution |
| scan_response(content) | Full post-response scan with masking |
| get_action(matches) | Determines the highest-priority action from matches |
| mask(content, matches) | Replaces matched text with mask placeholders |

Usage:

from src.security.dlp.scanner import ContentScanner
from src.security.config import get_security_config

scanner = ContentScanner(get_security_config().dlp)

# Pre-request scan
result = scanner.scan_request(user_prompt)
if result.blocked:
    raise DLPBlockedException(result.matches)
elif result.action == DLPAction.MASK:
    user_prompt = result.modified_content

# Post-response scan
result = scanner.scan_response(llm_response)
if result.has_matches:
    llm_response = result.modified_content

Action priority (highest wins when multiple patterns match):

BLOCK (4) > MASK (3) > WARN (2) > LOG_ONLY (1)
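Resolution over multiple matches follows that ordering: the highest-priority action wins. The sketch below mirrors the documented priorities, but the `Priority` enum and `resolve_action` helper are illustrative, not the module's `DLPAction`/`get_action` implementation:

```python
from enum import IntEnum

class Priority(IntEnum):
    # Numeric values match the documented priority ordering
    LOG_ONLY = 1
    WARN = 2
    MASK = 3
    BLOCK = 4

def resolve_action(match_actions):
    """Highest-priority action wins when multiple patterns match."""
    return max(match_actions)

action = resolve_action([Priority.WARN, Priority.MASK, Priority.LOG_ONLY])
```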

ScanResult

Module: src/security/dlp/scanner.py

@dataclass
class ScanResult:
    has_matches: bool
    matches: List[DLPMatch]
    action: DLPAction
    modified_content: Optional[str]
    blocked: bool

DLPMatch

Module: src/security/dlp/patterns.py

@dataclass
class DLPMatch:
    category: str
    pattern_name: str
    match_type: MatchType  # REGEX or KEYWORD
    matched_text: str
    start: int
    end: int
    action: DLPAction
    mask_with: str = "[REDACTED]"
    severity: str = "medium"

The MatchType enum has two values: REGEX and KEYWORD.

DLPBlockedException

Module: src/security/dlp/scanner.py (re-exported from src/security/dlp/__init__.py)

class DLPBlockedException(Exception):
    def __init__(self, matches: List[DLPMatch], message: Optional[str] = None): ...

Raised when a DLP scan returns a BLOCK action. The matches attribute contains the list of patterns that triggered the block.

SIEMDispatcher

Module: src/security/siem/dispatcher.py

Routes security events to multiple SIEM handlers simultaneously. Thread-safe with buffered delivery.

class SIEMDispatcher:
    def __init__(self, config: SIEMConfig): ...
    def dispatch(self, event: SecurityEvent) -> bool: ...
    def add_handler(self, handler: BaseSIEMHandler) -> None: ...
    def close(self) -> None: ...

Factory functions:

def init_siem_dispatcher(config: SIEMConfig) -> SIEMDispatcher
def get_siem_dispatcher() -> SIEMDispatcher

Usage:

from src.security.siem.dispatcher import get_siem_dispatcher
from src.security.siem.base_handler import SecurityEvent, SecurityEventType

dispatcher = get_siem_dispatcher()

event = SecurityEvent.create(
    event_type=SecurityEventType.DLP_BLOCK,
    message="Credential detected in prompt",
    request_id="req-abc-123",
    severity=3,
    user_id="user@company.com",
    dlp_category="credentials",
    dlp_pattern="aws_access_key",
)
dispatcher.dispatch(event)

The dispatcher automatically initializes handlers based on SIEMConfig:

  • SysLogHandler if syslog.enabled and syslog.host are set
  • CEFHandler if cef.enabled and cef.host are set
  • LEEFHandler if leef.enabled and leef.host are set

Events are buffered via SIEMBuffer with configurable retry and exponential backoff.

SecurityEvent

Module: src/security/siem/base_handler.py

@dataclass
class SecurityEvent:
    event_type: SecurityEventType
    timestamp: str
    request_id: str
    message: str
    severity: int = 6  # INFO (RFC 5424)
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    ip_address: Optional[str] = None
    user_agent: Optional[str] = None
    provider: Optional[str] = None
    model: Optional[str] = None
    action: Optional[str] = None
    dlp_category: Optional[str] = None
    dlp_pattern: Optional[str] = None
    tokens_used: Optional[int] = None
    latency_ms: Optional[float] = None
    project_id: Optional[str] = None
    group_id: Optional[str] = None
    details: Dict[str, Any] = field(default_factory=dict)

Class methods:

| Method | Description |
|--------|-------------|
| SecurityEvent.create(event_type, message, request_id, severity=6, **kwargs) | Creates an event with an auto-generated UTC timestamp |
| to_dict() | Converts the event to a dictionary for serialization |

Severity levels follow RFC 5424 (0 = Emergency, 7 = Debug).

SecurityEventType

Module: src/security/siem/base_handler.py

class SecurityEventType(str, Enum):
    LLM_REQUEST = "llm.request"
    LLM_RESPONSE = "llm.response"
    LLM_ERROR = "llm.error"
    DLP_BLOCK = "dlp.block"
    DLP_MASK = "dlp.mask"
    DLP_WARN = "dlp.warn"
    DLP_LOG = "dlp.log"
    VAULT_ACCESS = "vault.access"
    VAULT_ROTATE = "vault.rotate"
    AUTH_SUCCESS = "auth.success"
    AUTH_FAILURE = "auth.failure"
    RATE_LIMIT = "rate.limit"
    SECURITY_ALERT = "security.alert"
    PATH_VIOLATION = "security.path_violation"
    IDOR_ATTEMPT = "security.idor_attempt"
    WEBHOOK_REPLAY = "security.webhook.replay"
    MCP_AUTH_FAILURE = "security.mcp.auth_failure"

VaultClient

Module: src/security/vault/client.py

Provides access to secrets stored in HashiCorp Vault. Falls back to environment variables when Vault is unavailable.

class VaultClient:
    def __init__(self, config: VaultConfig): ...
    def read_secret(self, path, version=None, mount_point=None) -> Dict: ...
    def write_secret(self, path, data, mount_point=None) -> Dict: ...
    def delete_secret(self, path, versions=None, mount_point=None): ...
    def get_llm_credentials(self) -> Dict[str, str]: ...
    def renew_token(self) -> bool: ...
    def is_authenticated(self) -> bool: ...
    def close(self): ...

Usage:

from src.security.vault.client import VaultClient
from src.security.config import get_security_config

vault = VaultClient(get_security_config().vault)

# Read LLM provider credentials
creds = vault.get_llm_credentials()
api_key = creds.get("api_key")

# Read arbitrary secret
db_config = vault.read_secret("codegraph/database")

# Token management
if vault.is_authenticated():
    vault.renew_token()

vault.close()

Requires the hvac library (pip install hvac). Raises VaultError on failures.

Authentication methods:

| Method | Config Key | Description |
|--------|------------|-------------|
| Token | vault.token.value | Direct token authentication |
| AppRole | vault.approle.role_id / vault.approle.secret_id | Machine-to-machine auth |
| Kubernetes | vault.kubernetes.role / vault.kubernetes.jwt_path | In-cluster auth via service account |

Advanced Security Features

FileSecurityScanner

Module: src/security/file_scanner.py

Direct file-based scanning for security vulnerabilities without requiring CPG generation. Uses regex patterns to detect issues in source files.

class FileSecurityScanner:
    def __init__(self, patterns=None, exclude_dirs=None, exclude_files=None): ...
    def scan_project(self, project_path: str) -> ScanResult: ...
    def scan_django_settings(self, settings_path: str) -> List[FileFinding]: ...
    def scan_for_secrets(self, project_path: str) -> List[FileFinding]: ...

Note: The main scan method is scan_project(), not scan().

Usage:

from src.security.file_scanner import FileSecurityScanner

scanner = FileSecurityScanner(
    exclude_dirs=["node_modules", "venv", ".git"]
)
result = scanner.scan_project("/path/to/project")

print(f"Files scanned: {result.files_scanned}")
print(f"Critical issues: {result.critical_count}")
print(f"High issues: {result.high_count}")

for finding in result.findings:
    print(f"  {finding.severity.value}: {finding.pattern_name} in {finding.file_path}:{finding.line_number}")

The ScanResult returned by scan_project() is defined in src/security/file_scanner.py (different from the DLP ScanResult):

@dataclass
class ScanResult:
    project_path: str
    scan_time: datetime
    duration_seconds: float
    files_scanned: int
    findings: List[FileFinding]
    errors: List[str]

    @property
    def critical_count(self) -> int: ...
    @property
    def high_count(self) -> int: ...

The FileFinding dataclass:

@dataclass
class FileFinding:
    pattern_id: str
    pattern_name: str
    severity: VulnerabilitySeverity
    category: VulnerabilityCategory
    file_path: str
    line_number: int
    line_content: str
    match_text: str
    description: str
    cwe_ids: List[str]
    remediation: str
    confidence: float = 0.8

TaintVerifiedScanner

Module: src/security/taint_verified_scanner.py

Verifies potential vulnerabilities through data-flow analysis. Uses DataFlowTracer to confirm whether user input actually reaches dangerous sinks, reducing false positives.

class TaintVerifiedScanner:
    def __init__(self, cpg_service): ...
    def verify_sql_injection(
        self,
        findings,
        source_functions=None,
        sink_functions=None,
        max_depth=None,
    ) -> List[VerifiedFinding]: ...
    def scan_sql_injection_verified(self, limit=50) -> List[VerifiedFinding]: ...

Note: The constructor accepts cpg_service (a CPG query service instance), not a raw database path.

Usage:

from src.security.taint_verified_scanner import TaintVerifiedScanner

scanner = TaintVerifiedScanner(cpg_service)

# Scan with taint verification
verified = scanner.scan_sql_injection_verified(limit=50)

for finding in verified:
    if finding.is_verified:
        print(f"CONFIRMED: {finding.original_finding}")
        print(f"  Taint path: {finding.taint_path}")
    else:
        print(f"  Sanitized (confidence: {finding.sanitization_confidence})")

The VerifiedFinding dataclass:

@dataclass
class VerifiedFinding:
    original_finding: Dict[str, Any]
    is_verified: bool
    taint_path: Optional[DataFlowPath] = None
    sanitization_confidence: float = 0.0
    verification_notes: List[str] = field(default_factory=list)
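Conceptually, verification asks whether any path in the data-flow graph connects a user-controlled source to a dangerous sink. The toy breadth-first search below illustrates the idea; the adjacency map and node names are hypothetical and do not reflect the DataFlowTracer API:

```python
from collections import deque

def taint_path(flows: dict, source: str, sink: str):
    """Return a source-to-sink path in the data-flow graph, or None if none exists."""
    queue = deque([[source]])
    seen = {source}
    while queue:
        path = queue.popleft()
        if path[-1] == sink:
            return path  # tainted data reaches the sink: finding is verified
        for nxt in flows.get(path[-1], []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(path + [nxt])
    return None  # no data flow: likely a false positive

# Hypothetical flow: request.args feeds a query string that reaches cursor.execute
flows = {
    "request.args": ["user_id"],
    "user_id": ["query"],
    "query": ["cursor.execute"],
}
path = taint_path(flows, "request.args", "cursor.execute")
```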

HardeningScanner (D3FEND)

Module: src/security/hardening/hardening_scanner.py

Implements MITRE D3FEND Source Code Hardening checks. Unlike vulnerability scanners, which look for exploitable flaws, it verifies that defensive coding practices are actually followed.

class HardeningScanner:
    def __init__(self, cpg_service: Any, language: str = "c"): ...
    def scan_all(self, limit_per_check: int = 50) -> List[HardeningFinding]: ...
    def scan_by_d3fend_id(self, d3fend_ids: List[str], limit: int = 50) -> List[HardeningFinding]: ...
    def scan_by_category(self, category: HardeningCategory, limit: int = 50) -> List[HardeningFinding]: ...
    def scan_by_severity(self, min_severity: HardeningSeverity, limit: int = 50) -> List[HardeningFinding]: ...
    def get_compliance_score(self, findings: List[HardeningFinding]) -> Dict[str, Any]: ...
    def get_checks_summary(self) -> Dict[str, Any]: ...
    def get_remediation_report(self, findings: List[HardeningFinding]) -> str: ...

Usage:

from src.security.hardening.hardening_scanner import HardeningScanner
from src.security.hardening.base import HardeningCategory, HardeningSeverity

scanner = HardeningScanner(cpg_service, language="c")

# Run all checks
findings = scanner.scan_all(limit_per_check=50)

# Filter by D3FEND technique
init_findings = scanner.scan_by_d3fend_id(["D3-VI", "D3-RN"])

# Filter by category
pointer_findings = scanner.scan_by_category(HardeningCategory.POINTER_SAFETY)

# Filter by minimum severity
critical_findings = scanner.scan_by_severity(HardeningSeverity.HIGH)

# Get compliance score
score = scanner.get_compliance_score(findings)
print(f"Compliance: {score}")

# Get remediation report (Markdown)
report = scanner.get_remediation_report(findings)

Supported D3FEND techniques:

| ID | Technique | Category |
|----|-----------|----------|
| D3-VI | Variable Initialization | INITIALIZATION |
| D3-CS | Credential Scrubbing | CREDENTIAL_MANAGEMENT |
| D3-IRV | Integer Range Validation | INTEGER_SAFETY |
| D3-PV | Pointer Validation | POINTER_SAFETY |
| D3-RN | Reference Nullification | MEMORY_SAFETY |
| D3-TL | Trusted Library | LIBRARY_SAFETY |
| D3-VTV | Variable Type Validation | TYPE_SAFETY |
| D3-MBSV | Memory Block Start Validation | POINTER_SAFETY |
| D3-NPC | Null Pointer Checking | POINTER_SAFETY |
| D3-DLV | Domain Logic Validation | DOMAIN_VALIDATION |
| D3-OLV | Operational Logic Validation | OPERATIONAL_VALIDATION |

HardeningCheck

Module: src/security/hardening/base.py

Definition of a D3FEND hardening check:

@dataclass
class HardeningCheck:
    id: str
    d3fend_id: str
    d3fend_name: str
    category: HardeningCategory
    severity: HardeningSeverity
    description: str
    cpgql_query: str
    cwe_ids: List[str]
    language_scope: List[str]
    indicators: List[str]
    good_patterns: List[str]
    remediation: str
    example_code: str
    confidence_weight: float

Note: The query field is named cpgql_query, not sql_query.

HardeningFinding

Module: src/security/hardening/base.py

Result from running a hardening check:

@dataclass
class HardeningFinding:
    finding_id: str
    check_id: str
    d3fend_id: str
    category: str
    severity: str
    method_name: str
    filename: str
    line_number: int
    code_snippet: str
    description: str
    cwe_ids: List[str] = field(default_factory=list)
    remediation: str = ""
    confidence: float = 0.0
    metadata: Dict[str, Any] = field(default_factory=dict)

HardeningCategory

Module: src/security/hardening/base.py

class HardeningCategory(Enum):
    INITIALIZATION = "initialization"
    CREDENTIAL_MANAGEMENT = "credential_mgmt"
    INTEGER_SAFETY = "integer_safety"
    POINTER_SAFETY = "pointer_safety"
    MEMORY_SAFETY = "memory_safety"
    LIBRARY_SAFETY = "library_safety"
    TYPE_SAFETY = "type_safety"
    DOMAIN_VALIDATION = "domain_validation"
    OPERATIONAL_VALIDATION = "operational"

HardeningSeverity

Module: src/security/hardening/base.py

class HardeningSeverity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"

ReportGenerator

Module: src/security/report_generator.py

Generates consolidated security audit reports in multiple formats. The class name is ReportGenerator (not SecurityReportGenerator).

class ReportGenerator:
    def __init__(self): ...
    def create_report(self, project_name, project_path, scan_result=None) -> SecurityAuditReport: ...
    def add_cpg_findings(self, findings): ...
    def add_dlp_findings(self, findings): ...
    def add_hardening_findings(self, findings): ...
    def add_taint_paths(self, taint_paths): ...
    def save_report(
        self,
        output_dir,
        formats=None,
        base_filename="security_audit",
        language="en",
    ) -> Dict[str, str]: ...

Usage:

from src.security.report_generator import ReportGenerator

generator = ReportGenerator()

# Create base report from file scan
report = generator.create_report(
    project_name="my-app",
    project_path="/path/to/project",
    scan_result=file_scan_result,
)

# Add findings from other sources
generator.add_cpg_findings(cpg_findings)
generator.add_dlp_findings(dlp_findings)
generator.add_hardening_findings(hardening_findings)
generator.add_taint_paths(taint_paths)

# Save in multiple formats
files = generator.save_report(
    output_dir="./reports",
    formats=["json", "markdown", "sarif"],
    language="en",
)
# Returns: {"json": "/path/to/report.json", "markdown": "/path/to/report.md", ...}

SecurityAuditReport

Module: src/security/report_generator.py

The consolidated report dataclass:

@dataclass
class SecurityAuditReport:
    project_name: str
    project_path: str
    audit_time: datetime
    duration_seconds: float
    file_findings: List[FileFinding]
    cpg_findings: List[Dict[str, Any]]
    dlp_findings: List[Dict[str, Any]]
    hardening_findings: List[Dict[str, Any]]
    taint_paths: List[Any]
    files_scanned: int = 0
    patterns_checked: int = 0
    errors: List[str] = field(default_factory=list)

Output methods:

| Method | Description |
|--------|-------------|
| to_json() | JSON report with metadata, summary, and all findings |
| to_markdown(language="en") | Localized Markdown report with severity tables and a D3FEND section |
| to_sarif() | SARIF 2.1.0 format for GitHub Security Alerts |

Properties:

| Property | Description |
|----------|-------------|
| all_findings | All findings except hardening, enriched with OWASP categories |
| all_findings_including_hardening | All findings including D3FEND hardening |
| severity_counts | Dict of severity to count |
| critical_count | Number of critical findings |
| high_count | Number of high findings |
| total_findings | Total count (excluding hardening) |

AutofixEngine

Module: src/analysis/autofix/engine.py

Generates automated fix suggestions for security vulnerabilities found through taint analysis. Tries template-based fixes first, falls back to LLM generation.

class AutofixEngine:
    def __init__(self, source_root: str = "", dry_run: bool = True): ...
    def generate_fixes(self, taint_paths) -> List[AutofixResult]: ...

Usage:

from src.analysis.autofix.engine import AutofixEngine

engine = AutofixEngine(source_root="/path/to/project", dry_run=True)
results = engine.generate_fixes(taint_paths)

for result in results:
    print(f"Strategy: {result.strategy}")  # "template" or "llm"
    print(f"CWE: {result.cwe_id}")
    print(f"Validated: {result.validated}")
    print(f"Diff:\n{result.fix.diff}")

The AutofixResult dataclass:

@dataclass
class AutofixResult:
    fix: FixSuggestion
    strategy: str  # "template" or "llm"
    validated: bool
    validation: Optional[ValidationResult] = None
    taint_path: Optional[TaintPath] = None
    cwe_id: str = ""

The engine is read-only by default (dry_run=True): it generates diffs but never applies them automatically.
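The template-first strategy can be sketched as a simple dispatch: look up a fix template for the finding's CWE, and fall back to LLM generation only when none exists. Everything below (the template table, function names, and suggestion strings) is illustrative, not the engine's actual internals:

```python
# Hypothetical template table keyed by CWE; real templates would produce diffs.
FIX_TEMPLATES = {
    "CWE-89": "Use a parameterized query instead of string concatenation.",
}

def choose_strategy(cwe_id: str, llm_generate):
    """Return (strategy, suggestion): template fixes first, LLM as fallback."""
    template = FIX_TEMPLATES.get(cwe_id)
    if template is not None:
        return ("template", template)
    return ("llm", llm_generate(cwe_id))

strategy, _ = choose_strategy("CWE-89", llm_generate=lambda c: f"LLM fix for {c}")
fallback, suggestion = choose_strategy("CWE-79", llm_generate=lambda c: f"LLM fix for {c}")
```

Preferring templates keeps fixes deterministic and cheap; the LLM is consulted only for vulnerability classes with no curated template.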

Security Hypothesis System

Module: src/security/hypothesis/

The Security Hypothesis System is an advanced subsystem for proactive vulnerability detection. It generates security hypotheses based on CWE/CAPEC knowledge bases and validates them against the CPG.

Key features:

  • CWE database (58 entries) and CAPEC database (27 entries)
  • 13 SQL templates and 12 taint templates (inter-procedural)
  • 6 framework-specific providers: PostgreSQL, Django, Spring, Express, Gin, Next.js
  • Multi-criteria scoring with configurable presets (embedded, web, enterprise)
  • Chain detection (14 escalation patterns)
  • Feedback store and trend tracking
  • Incremental analysis via git diff

For full documentation, see Hypothesis System Reference.

CLI entry points:

python -m src.cli hypothesis run --language C [--max 50] [--min-priority 0.3] [--format json]
python -m src.cli hypothesis list-cwes [--category buffer_overflow]
python -m src.cli hypothesis providers

DLP Patterns

Built-in Pattern Categories

The module ships with three default categories, defined in src/security/config.py via get_default_dlp_categories():

credentials (action: BLOCK):

| Pattern | Description | Example Match |
|---------|-------------|---------------|
| api_key_generic | Generic API key | api_key="sk_live_abc..." |
| aws_access_key | AWS Access Key ID | AKIAIOSFODNN7EXAMPLE |
| aws_secret_key | AWS Secret Access Key | aws_secret_access_key="..." |
| private_key | Private key header | -----BEGIN RSA PRIVATE KEY----- |
| password_pattern | Password in config/code | password="hunter2" |
| bearer_token | JWT Bearer token | Bearer eyJhbG... |
| github_token | GitHub PAT | ghp_xxxxxxxxxxxx... |

pii (action: MASK):

| Pattern | Mask | Description |
|---------|------|-------------|
| email | [EMAIL] | Email addresses |
| phone_ru | [PHONE] | Russian phone numbers |
| phone_us | [PHONE] | US phone numbers |
| ssn | [SSN] | US Social Security Numbers |
| credit_card | [CARD] | Credit card numbers |
| ip_address | [IP] | IPv4 addresses |
| passport_ru | [PASSPORT] | Russian passport numbers |

source_code (action: WARN):

| Pattern | Mask | Description |
|---------|------|-------------|
| connection_string | [CONN_STRING] | Database connection strings (JDBC, MySQL, PostgreSQL, etc.) |
| internal_path_unix | [PATH] | Unix internal paths (/home/, /var/, /etc/) |
| internal_path_windows | [PATH] | Windows internal paths (C:\Users\, etc.) |

Custom Patterns

Add custom patterns via config.yaml:

security:
  dlp:
    categories:
      internal_projects:
        enabled: true
        action: WARN
        patterns:
          - name: project_codename
            regex: '(?i)(project[\s_-]?(phoenix|atlas|nova))'
            mask_with: '[PROJECT]'
            description: "Internal project codenames"
    keywords:
      restricted_terms:
        words: ["confidential", "internal only", "do not distribute"]
        case_sensitive: false
    keywords_action: WARN

Each DLPCategoryConfig supports:

class DLPCategoryConfig(BaseModel):
    enabled: bool = True
    action: DLPAction = DLPAction.WARN
    patterns: List[DLPPatternConfig]

class DLPPatternConfig(BaseModel):
    name: str
    regex: str
    mask_with: str = "[REDACTED]"
    description: Optional[str] = None
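A rough sketch of how a scanner might evaluate one such category: compile each pattern's regex, record matches, and apply the mask. The real logic lives in src/security/dlp/scanner.py (ContentScanner); the names and shapes below are illustrative only:

```python
import re
from typing import Dict, List, Tuple

# Hypothetical category shaped like DLPCategoryConfig above.
CATEGORY = {
    "enabled": True,
    "action": "WARN",
    "patterns": [
        {"name": "project_codename",
         "regex": r"(?i)project[\s_-]?(phoenix|atlas|nova)",
         "mask_with": "[PROJECT]"},
    ],
}

def scan(text: str, category: Dict) -> Tuple[str, List[str]]:
    """Return (masked_text, names_of_matched_patterns) for one category."""
    matched: List[str] = []
    if not category["enabled"]:
        return text, matched
    for pat in category["patterns"]:
        rx = re.compile(pat["regex"])
        if rx.search(text):
            matched.append(pat["name"])
            text = rx.sub(pat["mask_with"], text)
    return text, matched
```

Scanning "status of Project Phoenix" with this category yields the masked text "status of [PROJECT]" and one matched pattern name.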

Database Tables

Migration: 002_llm_audit_log.py

llm_audit_log

Stores every LLM interaction with full metadata.

| Column | Type | Description |
|---|---|---|
| id | INTEGER PK | Auto-increment primary key |
| request_id | VARCHAR | Unique request identifier |
| user_id | VARCHAR | User who made the request |
| session_id | VARCHAR | Session identifier |
| ip_address | VARCHAR | Client IP address |
| provider | VARCHAR | LLM provider name |
| model | VARCHAR | Model identifier |
| system_prompt_hash | VARCHAR | SHA-256 hash of system prompt |
| system_prompt_length | INTEGER | Length of system prompt |
| user_prompt_preview | TEXT | Truncated user prompt |
| user_prompt_length | INTEGER | Full length of user prompt |
| response_preview | TEXT | Truncated LLM response |
| response_length | INTEGER | Full length of response |
| status | VARCHAR | Request status (success/error) |
| prompt_tokens | INTEGER | Tokens in prompt |
| completion_tokens | INTEGER | Tokens in completion |
| total_tokens | INTEGER | Total tokens used |
| latency_ms | FLOAT | Request latency in milliseconds |
| dlp_action | VARCHAR | DLP action taken (BLOCK/MASK/WARN/LOG_ONLY) |
| dlp_match_count | INTEGER | Number of DLP matches |
| dlp_categories | VARCHAR | Comma-separated DLP categories |
| error_type | VARCHAR | Error type if failed |
| error_message | TEXT | Error message if failed |
| timestamp | TIMESTAMP | Event timestamp |
| metadata | JSON | Additional metadata |
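The system_prompt_hash column stores a digest rather than the prompt itself, so identical prompts can be correlated across requests without persisting their content. A plausible way to produce such a value (the authoritative hashing code is in LLMSecurityLogger; this is a sketch):

```python
import hashlib

def hash_prompt(prompt: str) -> str:
    """SHA-256 hex digest of a prompt, as stored in system_prompt_hash."""
    return hashlib.sha256(prompt.encode("utf-8")).hexdigest()
```

The digest is always 64 hex characters, regardless of prompt length, and leaks nothing about the prompt text.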

dlp_events

Stores individual DLP match events, linked to llm_audit_log.

| Column | Type | Description |
|---|---|---|
| id | INTEGER PK | Auto-increment primary key |
| audit_log_id | INTEGER FK | Reference to llm_audit_log.id |
| request_id | VARCHAR | Request identifier |
| event_type | VARCHAR | DLP event type (pre_request/post_response) |
| action | VARCHAR | DLP action taken |
| category | VARCHAR | DLP category (credentials, pii, etc.) |
| pattern_name | VARCHAR | Pattern that matched |
| severity | VARCHAR | Match severity |
| match_preview | VARCHAR | Truncated matched text |
| position | VARCHAR | Match position (start-end) |
| user_id | VARCHAR | User identifier |
| ip_address | VARCHAR | Client IP |
| timestamp | TIMESTAMP | Event timestamp |

security_events

Stores SIEM events dispatched by the security module.

| Column | Type | Description |
|---|---|---|
| id | INTEGER PK | Auto-increment primary key |
| event_id | VARCHAR | Unique event identifier |
| event_type | VARCHAR | Event type from SecurityEventType |
| severity | INTEGER | RFC 5424 severity (0-7) |
| request_id | VARCHAR | Associated request identifier |
| user_id | VARCHAR | User identifier |
| session_id | VARCHAR | Session identifier |
| ip_address | VARCHAR | Client IP |
| message | TEXT | Human-readable message |
| details | JSON | Additional event details |
| dispatched | BOOLEAN | Whether event was sent to SIEM |
| dispatch_error | TEXT | Error message if dispatch failed |
| timestamp | TIMESTAMP | Event timestamp |

SIEM Event Formats

All handlers extend BaseSIEMHandler and implement format_event() and send().

SysLog (RFC 5424)

Module: src/security/siem/syslog_handler.py (class SysLogHandler)

Message format:

<PRI>VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID [SD-ID SD-PARAMS] MSG

Example:

<134>1 2024-12-09T10:30:00.000000Z server01 codegraph 1234 LLM001 [meta@47450 request_id="abc123" event_type="llm.request" user_id="user@corp.com" provider="gigachat"] LLM request logged

PRI is calculated as facility * 8 + severity. Structured data includes request_id, event_type, and all non-null optional fields from SecurityEvent.

Supports UDP, TCP, and TLS transport protocols.
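The PRI calculation can be sketched directly. For example, facility 16 (local0) with severity 6 (Informational) yields 16 * 8 + 6 = 134, which matches the <134> prefix in the sample message above:

```python
def syslog_pri(facility: int, severity: int) -> int:
    """RFC 5424 priority value: facility * 8 + severity."""
    if not (0 <= facility <= 23 and 0 <= severity <= 7):
        raise ValueError("facility must be 0-23, severity 0-7")
    return facility * 8 + severity
```

The range check mirrors RFC 5424's valid facility (0-23) and severity (0-7) codes; how the shipped handler validates inputs is an implementation detail of SysLogHandler.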

SysLog JSON

Module: src/security/siem/syslog_handler.py (class SysLogJSONHandler)

Extends SysLogHandler with JSON-formatted message bodies. Useful for SIEM systems that parse JSON payloads from syslog messages.

CEF (ArcSight)

Module: src/security/siem/cef_handler.py (class CEFHandler)

Common Event Format message:

CEF:Version|Device Vendor|Device Product|Device Version|Signature ID|Name|Severity|Extension

Example:

CEF:0|CodeGraph|CodeAnalysis|1.0|DLP001|DLP Block|7|src=192.168.1.1 suser=user123 msg=Credential detected

Severity mapping (RFC 5424 to CEF 0-10 scale):

| RFC 5424 | CEF |
|---|---|
| 0 (Emergency) | 10 (highest) |
| 1 (Alert) | 9 |
| 2 (Critical) | 8 |
| 3 (Error) | 7 |
| 4 (Warning) | 6 |
| 5 (Notice) | 5 |
| 6 (Info) | 3 |
| 7 (Debug) | 1 (lowest) |
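Because the mapping is a fixed eight-entry translation, it reduces to a lookup table. A sketch with values taken from the table above (the dict and function names are illustrative, not the CEFHandler internals):

```python
# RFC 5424 severity (0-7) -> CEF severity (0-10), per the table above.
RFC5424_TO_CEF = {0: 10, 1: 9, 2: 8, 3: 7, 4: 6, 5: 5, 6: 3, 7: 1}

def cef_severity(rfc5424_severity: int) -> int:
    """Translate an RFC 5424 severity code to the CEF 0-10 scale."""
    return RFC5424_TO_CEF[rfc5424_severity]
```

Note the mapping is not linear at the low end: Info drops to 3 and Debug to 1, keeping routine events well below alert thresholds.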

LEEF (QRadar)

Module: src/security/siem/leef_handler.py (class LEEFHandler)

Log Event Extended Format 2.0 message:

LEEF:2.0|Vendor|Product|Version|EventID|key1=value1\tkey2=value2

Example:

LEEF:2.0|CodeGraph|CodeAnalysis|1.0|DLP001|src=192.168.1.1\tusrName=user123\tmsg=Credential detected

LEEF 2.0 uses tab characters as field delimiters in the extension block.
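Assembling such a line is a matter of joining the extension's key=value pairs with tabs and prefixing the pipe-delimited header. A minimal sketch (illustrative; the shipped LEEFHandler also escapes delimiter characters that appear inside values):

```python
from typing import Dict

def leef_line(vendor: str, product: str, version: str,
              event_id: str, fields: Dict[str, str]) -> str:
    """Build a LEEF 2.0 message; extension pairs are tab-separated."""
    extension = "\t".join(f"{k}={v}" for k, v in fields.items())
    return f"LEEF:2.0|{vendor}|{product}|{version}|{event_id}|{extension}"
```

For example, leef_line("CodeGraph", "CodeAnalysis", "1.0", "DLP001", {"src": "192.168.1.1", "usrName": "user123"}) produces the header shown above with a literal tab between the two extension fields.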

Event ID mapping (shared across CEF and LEEF):

| Event Type | ID | Description |
|---|---|---|
| llm.request | LLM001 | LLM request logged |
| llm.response | LLM002 | LLM response logged |
| llm.error | LLM003 | LLM error occurred |
| dlp.block | DLP001 | DLP blocked content |
| dlp.mask | DLP002 | DLP masked content |
| dlp.warn | DLP003 | DLP warning issued |
| dlp.log | DLP004 | DLP logged match |
| vault.access | VLT001 | Vault secret accessed |
| vault.rotate | VLT002 | Vault secret rotated |
| auth.success | AUTH01 | Authentication successful |
| auth.failure | AUTH02 | Authentication failed |
| rate.limit | RATE01 | Rate limit exceeded |
| security.alert | SEC001 | General security alert |
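Since CEF and LEEF share these signature IDs, both handlers can draw from a single table. A sketch of such a lookup (the fallback to SEC001 for unknown event types is an assumption, not confirmed behavior):

```python
# Shared event-type -> signature ID table, per the mapping above.
EVENT_IDS = {
    "llm.request": "LLM001", "llm.response": "LLM002", "llm.error": "LLM003",
    "dlp.block": "DLP001", "dlp.mask": "DLP002", "dlp.warn": "DLP003",
    "dlp.log": "DLP004", "vault.access": "VLT001", "vault.rotate": "VLT002",
    "auth.success": "AUTH01", "auth.failure": "AUTH02",
    "rate.limit": "RATE01", "security.alert": "SEC001",
}

def signature_id(event_type: str) -> str:
    """Look up the SIEM signature ID; fall back to the generic alert (assumed)."""
    return EVENT_IDS.get(event_type, "SEC001")
```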

Webhook Integration

DLPWebhookClient

Module: src/security/dlp/webhook.py

Sends DLP alerts to external systems via HTTP webhooks with async delivery, retry, and exponential backoff.

class DLPWebhookClient:
    def __init__(self, config: DLPWebhookConfig): ...
    def send_alert(self, matches, action, request_id=None, user_id=None, ip_address=None) -> bool: ...
    def send_alert_sync(self, matches, action, **kwargs) -> bool: ...
    def stop(self) -> None: ...

    @property
    def is_enabled(self) -> bool: ...
    @property
    def queue_size(self) -> int: ...

Configuration (DLPWebhookConfig):

class DLPWebhookConfig(BaseModel):
    enabled: bool = False
    endpoint: Optional[str] = None
    auth_header: Optional[str] = None
    timeout_seconds: int = 10
    retry_attempts: int = 3
    notify_on: List[DLPAction] = [DLPAction.BLOCK, DLPAction.WARN]

Usage:

from src.security.dlp.webhook import DLPWebhookClient, create_webhook_alert_callback
from src.security.config import get_security_config

# Direct usage
client = DLPWebhookClient(get_security_config().dlp.webhook)
client.send_alert(
    matches=dlp_matches,
    action=DLPAction.BLOCK,
    request_id="req-123",
    user_id="user@corp.com",
)
client.stop()

# Or use the convenience callback factory
callback, client = create_webhook_alert_callback(config.dlp.webhook)
# callback(matches, action) can be passed to DLP action handlers

The client supports context manager usage:

with DLPWebhookClient(config) as client:
    client.send_alert(matches, action)

DLPAlert

Module: src/security/dlp/webhook.py

Alert payload sent to external DLP systems:

@dataclass
class DLPAlert:
    alert_id: str
    timestamp: str
    action: str
    match_count: int
    categories: List[str]
    patterns: List[str]
    request_id: Optional[str] = None
    user_id: Optional[str] = None
    ip_address: Optional[str] = None
    severity: str = "medium"
    context: Optional[Dict[str, Any]] = None

Alerts are created from DLP matches via DLPAlert.from_matches(). The severity is determined by the highest severity among the matched patterns.
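The "highest severity wins" rule can be sketched with an ordered ranking. The labels and the "medium" default below are assumptions taken from the dataclass default above; check DLPAlert.from_matches for the authoritative logic:

```python
from typing import List

# Assumed severity labels, ordered lowest to highest.
SEVERITY_ORDER = ["low", "medium", "high", "critical"]

def alert_severity(match_severities: List[str]) -> str:
    """Pick the highest severity among matched patterns; default to 'medium'."""
    if not match_severities:
        return "medium"
    return max(match_severities, key=SEVERITY_ORDER.index)
```

So an alert covering one low-severity and one critical-severity match is reported as critical.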

JSON payload example:

{
  "alert_id": "a1b2c3d4e5f6",
  "timestamp": "2024-12-09T10:30:00.000000Z",
  "action": "BLOCK",
  "match_count": 2,
  "categories": ["credentials"],
  "patterns": ["aws_access_key", "private_key"],
  "request_id": "req-abc-123",
  "user_id": "user@company.com",
  "ip_address": "192.168.1.100",
  "severity": "critical",
  "context": {}
}
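On the receiving side, a webhook endpoint would parse this JSON and verify the fields it depends on. A minimal validation sketch (field names come from the payload above; treating exactly this set as required is an assumption):

```python
import json

# Assumed-required fields, drawn from the JSON payload example above.
REQUIRED_FIELDS = {"alert_id", "timestamp", "action", "match_count",
                   "categories", "patterns", "severity"}

def parse_alert(body: str) -> dict:
    """Parse a DLP alert payload and check the assumed required fields."""
    alert = json.loads(body)
    missing = REQUIRED_FIELDS - alert.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    return alert
```

A real endpoint would also verify the auth_header value before trusting the payload.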

CLI Usage

Security Audit CLI

Module: src/cli/security_audit.py

Four commands: full, quick, settings, secrets.

# Full security audit with all report formats
python -m src.cli.security_audit full --path /path/to/project

# With options
python -m src.cli.security_audit full \
    --path /path/to/project \
    --output ./reports \
    --format json markdown sarif \
    --exclude-dirs vendor build \
    --no-cpg \
    --language python \
    --verbose

# Quick file-based scan only
python -m src.cli.security_audit quick --path /path/to/project

# Scan Django settings
python -m src.cli.security_audit settings --path /path/to/settings.py

# Scan for hardcoded secrets
python -m src.cli.security_audit secrets --path /path/to/project

Flags for full command:

| Flag | Description |
|---|---|
| --path, -p | Path to project (required) |
| --output, -o | Output directory for reports (default: ./security_reports) |
| --format, -f | Output format(s): json, markdown/md, sarif, all |
| --exclude-dirs | Additional directories to exclude |
| --no-cpg | Skip CPG-based analysis (faster, file-based only) |
| --language, -l | Target language: auto, c, cpp, python, javascript, typescript, go, csharp, kotlin, java, php |
| --verbose, -v | Verbose output |

Note: The --autofix flag is NOT available on the security audit CLI.

Audit CLI (with Autofix)

The --autofix flag is available on the main audit command:

python -m src.cli audit --db /path/to/cpg.duckdb --autofix

This runs the audit composite scenario with automated fix generation enabled.


MCP Tools

Module: src/mcp/tools/security.py

Two security-related MCP tools are registered:

codegraph_autofix

Generates automated fix suggestions for security vulnerabilities in a method.

codegraph_autofix(method_name: str, cwe: str = "")

| Parameter | Type | Required | Description |
|---|---|---|---|
| method_name | str | Yes | Method to analyze and generate fixes for |
| cwe | str | No | CWE filter (e.g., "CWE-89" for SQL injection only) |

The tool runs a security scan on the specified method, builds taint paths from findings, and generates template-based or LLM-powered fix patches. Returns diffs only (read-only, never applies).

Note: There is no vulnerability_type parameter.

codegraph_taint_analysis

Runs taint analysis on a specified method.


Security Module Structure

src/security/
    __init__.py
    _base.py                          # VulnerabilitySeverity, VulnerabilityCategory
    config.py                         # SecurityConfig, DLPConfig, SIEMConfig, VaultConfig
    file_scanner.py                   # FileSecurityScanner, FileFinding, ScanResult
    taint_verified_scanner.py         # TaintVerifiedScanner, VerifiedFinding
    report_generator.py               # ReportGenerator, SecurityAuditReport
    report_localizer.py               # ReportLocalizer (EN/RU)
    sarif_exporter.py                 # SARIF 2.1.0 export
    owasp_mapping.py                  # OWASP Top 10 enrichment
    taint_visualizer.py               # Taint path visualization
    security_patterns.py              # SecurityPattern definitions
    security_agents.py                # SecurityScanner (CPG-based)
    dlp/
        __init__.py
        patterns.py                   # DLPMatch, MatchType, PatternRegistry
        scanner.py                    # ContentScanner, ScanResult, DLPBlockedException
        actions.py                    # DLP action handlers
        webhook.py                    # DLPWebhookClient, DLPAlert
    siem/
        __init__.py
        base_handler.py               # BaseSIEMHandler, SecurityEvent, SecurityEventType
        dispatcher.py                 # SIEMDispatcher, init_siem_dispatcher
        syslog_handler.py             # SysLogHandler, SysLogJSONHandler
        cef_handler.py                # CEFHandler
        leef_handler.py               # LEEFHandler
        buffer.py                     # SIEMBuffer (retry + backoff)
    vault/
        __init__.py
        client.py                     # VaultClient, VaultError
        secret_manager.py             # Higher-level secret management
    llm/
        __init__.py
        secure_provider.py            # SecureLLMProvider
        request_logger.py             # LLMSecurityLogger
    hardening/
        __init__.py
        base.py                       # HardeningCheck, HardeningFinding, enums
        d3fend_checks.py              # D3FEND check definitions
        hardening_scanner.py          # HardeningScanner
    hypothesis/                       # Security Hypothesis System (21+ files)
        __init__.py
        hypothesis_generator.py
        knowledge_base.py
        query_synthesizer.py
        query_templates.py
        models.py
        executor.py
        validator.py
        chain_detector.py
        multi_criteria_scorer.py
        feedback.py
        trend_store.py
        incremental.py
        providers/
            __init__.py
            registry.py
            yaml_provider.py
        postgresql/
        django/
        spring/
        express/
        gin/
        nextjs/
    patterns/                         # Language-specific security patterns
        __init__.py
        python_django.py
        injection.py
        auth.py
        crypto.py
        memory.py
        concurrency.py
        input_validation.py
        java.py
        javascript.py
        go.py
        csharp.py
        kotlin.py
        php.py

Quick Start Guide

1. Enable the security module

# config.yaml
security:
  enabled: true

2. Wrap your LLM provider

from src.security.llm.secure_provider import SecureLLMProvider
from src.security.config import get_security_config

secure_provider = SecureLLMProvider(your_llm_provider, get_security_config())
response = secure_provider.generate(system_prompt, user_prompt)

3. Run a file-based security scan

python -m src.cli.security_audit full --path /path/to/project --output ./reports

4. Run D3FEND hardening checks

from src.security.hardening.hardening_scanner import HardeningScanner

scanner = HardeningScanner(cpg_service, language="c")
findings = scanner.scan_all()
score = scanner.get_compliance_score(findings)

5. Generate a consolidated report

from src.security.report_generator import ReportGenerator

gen = ReportGenerator()
report = gen.create_report("my-app", "/path/to/project", scan_result)
gen.add_hardening_findings(hardening_results)
gen.save_report("./reports", formats=["json", "markdown", "sarif"])

6. Configure SIEM forwarding

# config.yaml
security:
  siem:
    enabled: true
    syslog:
      enabled: true
      host: siem.company.local
      port: 514
      protocol: tls
      tls:
        ca_cert: /etc/ssl/certs/siem-ca.pem
        verify: true

7. Enable DLP with webhook alerts

# config.yaml
security:
  dlp:
    enabled: true
    webhook:
      enabled: true
      endpoint: https://dlp-alerts.company.local/api/v1/alerts
      auth_header: "Bearer ${DLP_WEBHOOK_TOKEN}"
      notify_on: [BLOCK, WARN]

See Also