Analysis Modules Reference

Comprehensive documentation for the code analysis modules in src/analysis/.

Last updated: 2026-03-07

Overview

The analysis modules provide advanced static analysis capabilities on top of the CPG (Code Property Graph) stored in DuckDB. They are organized into subpackages:

src/analysis/
├── cfg_analyzer.py              # Control flow graph
├── cfg_unreachable.py           # Unreachable code detection
├── clone_detector.py            # Code clone detection
├── compliance.py                # Security compliance mapping
├── explain.py                   # Method analysis engine
├── concurrency_core.py          # Concurrency analysis (4 mixins)
├── field_sensitive_tracer.py    # Field-sensitive taint analysis
├── callgraph/                   # Call graph analysis (8 modules)
├── dataflow/                    # Data flow analysis (12 modules)
│   └── taint/                   # Taint propagation engine (7 modules)
├── autofix/                     # Automated fix generation (5 modules)
└── patterns/                    # Pattern engine (2 modules)

Module Architecture

graph TD
    subgraph "Control Flow"
        CFG[CFGAnalyzer] --> PatchCF[PatchControlFlowAnalyzer]
        CFG --> Unreachable[UnreachableCodeDetector]
    end

    subgraph "Data Flow"
        DFBase[BaseTracer] --> DFTracer[DataFlowTracer]
        DFBase --> TypeProp[TypePropagator]
        DFBase --> PtrAlias[PointerAliasAnalyzer]
    end

    subgraph "Taint Analysis"
        TaintProp[TaintPropagator] --> FieldSensTracker[FieldSensitiveTracker]
        TaintProp --> ContextSens[ContextSensitiveTracker]
        TaintProp --> InterProc[InterProcTracker]
        TaintProp --> DFTracker[DataflowTracker]
        TaintProp --> SymExec[PathConstraintTracker]
        DFTracer --> FieldTracer[FieldSensitiveTracer]
    end

    subgraph "Call Graph"
        CGA[CallGraphAnalyzer] --> PathFind[PathFinder]
        CGA --> Impact[ImpactAnalyzer]
        CGA --> Centrality[CentralityAnalyzer]
        CGA --> Components[ComponentAnalyzer]
        CGA --> CrossLang[CrossLanguageAnalyzer]
        CGA --> Complexity[ComplexityAnalyzer]
    end

    subgraph "Pattern + Autofix"
        Patterns[LLMPatternGenerator] --> Bridge[PatternTaintBridge]
        Bridge --> TaintProp
        AutofixEng[AutofixEngine] --> AutofixGen[AutofixGenerator]
        AutofixEng --> DiffVal[DiffValidator]
        AutofixEng --> SSR[SSRAutofixBridge]
    end

Module-to-Scenario Map

Module Subpackage Type Key Scenarios DuckPGQ
CFGAnalyzer root analyzer S05, S06, S13 (refactoring, performance) No
UnreachableCodeDetector root analyzer S05, S13 (dead code) No
ASTCloneDetector root analyzer S07, S13 (refactoring) No
ComplianceMapper root mapper S02, S08 (security, compliance) No
ConcurrencyAnalyzer root analyzer S16 (concurrency) No
FieldSensitiveTracer root analyzer S02, S08, S14 (security) No
DataFlowTracer dataflow facade S02, S14 (security, incident) No
TypePropagator dataflow analyzer S02 (type confusion) No
PointerAliasAnalyzer dataflow analyzer S02 (use-after-free) No
MemoryLifetimeAnalyzer dataflow analyzer S02, S14 (memory safety) No
NullCheckAnalyzer dataflow analyzer S02, S05 (null deref) No
CodeStringTracer dataflow analyzer S02 (code injection) No
InfoDisclosureAnalyzer dataflow analyzer S02 (info leak) No
TaintPropagator dataflow/taint engine S02, S14 (security) No
CallGraphAnalyzer callgraph facade S01, S09, S12, S14 (onboarding, patch review, cross-repo) Yes
PathFinder callgraph analyzer S01, S14 (call chains) Yes
CentralityAnalyzer callgraph analyzer S05, S12 (hotspots) Yes
ComponentAnalyzer callgraph analyzer S12, S13 (components) Yes
ImpactAnalyzer callgraph analyzer S09, S14 (change impact) No
CrossLanguageAnalyzer callgraph analyzer S12 (cross-repo) No
LLMPatternGenerator patterns generator S21 (pattern search) No
PatternTaintBridge patterns bridge S02, S21 (security) No
AutofixEngine autofix engine S02 (security fix) No
PatchControlFlowAnalyzer patch_review analyzer S09 (patch review) No

Control Flow Analysis

CFGAnalyzer

File: src/analysis/cfg_analyzer.py Scenarios: S05, S06, S13 (refactoring, performance, mass-refactoring)

Control flow analysis built on the edges_cfg table, providing accurate per-method CFG structure, complexity, and path queries.

Key Classes

@dataclass
class CFGStructure:
    """Represents the CFG structure of a method."""
    method_name: str
    method_full_name: str
    nodes: List[int]
    edges: List[Tuple[int, int]]  # (src, dst) pairs
    entry_nodes: List[int]
    exit_nodes: List[int]
    node_count: int
    edge_count: int

@dataclass
class CFGPath:
    """Represents an execution path through the CFG."""
    path_id: str
    nodes: List[int]
    length: int
    has_loop: bool = False

API Reference

CFGAnalyzer(cpg_service)

Initialize with CPGQueryService or DuckDB connection.

get_method_cfg(method_name: str) -> Optional[CFGStructure]

Get the CFG structure for a method.

from src.analysis.cfg_analyzer import CFGAnalyzer

analyzer = CFGAnalyzer(cpg_service)
cfg = analyzer.get_method_cfg("heap_insert")
if cfg:  # returns None if the method is not found
    print(f"Nodes: {cfg.node_count}, Edges: {cfg.edge_count}")
compute_cyclomatic_complexity(method_name: str) -> int

Calculate McCabe cyclomatic complexity M = E - N + 2, where E is the CFG edge count and N the node count.

complexity = analyzer.compute_cyclomatic_complexity("heap_insert")
print(f"Complexity: {complexity}")  # e.g., 15
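As a sanity check, the formula can be applied directly to the counts a CFGStructure reports (the numbers below are illustrative, not taken from a real codebase):

```python
# McCabe complexity for a single connected CFG: M = E - N + 2.
def mccabe(edge_count: int, node_count: int) -> int:
    return edge_count - node_count + 2

print(mccabe(18, 12))  # a CFG with 18 edges and 12 nodes -> 8
```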
enumerate_paths(method_name: str, max_paths: int = 100, max_depth: int = 50) -> List[CFGPath]

Find execution paths through the CFG with cycle detection.

paths = analyzer.enumerate_paths("process_query", max_paths=50)
for path in paths:
    print(f"Path {path.path_id}: {path.length} nodes, loop={path.has_loop}")
find_dominators(method_name: str) -> Dict[int, Set[int]]

Compute dominator tree using edges_dominate table.

find_post_dominators(method_name: str) -> Dict[int, Set[int]]

Compute post-dominator tree using edges_post_dominate table.
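For intuition, the dominator relation can be computed with the classic iterative algorithm. This is only a sketch over a toy predecessor map; the analyzer itself reads the precomputed edges_dominate and edges_post_dominate tables rather than recomputing anything:

```python
# Iterative dominator computation over a CFG given as predecessor lists.
# Node ids and the example graph are illustrative.
def compute_dominators(nodes, preds, entry):
    dom = {n: set(nodes) for n in nodes}   # start: everything dominates everything
    dom[entry] = {entry}
    changed = True
    while changed:
        changed = False
        for n in nodes:
            if n == entry:
                continue
            incoming = [dom[p] for p in preds.get(n, [])]
            new = set.intersection(*incoming) if incoming else set()
            new = new | {n}                # a node always dominates itself
            if new != dom[n]:
                dom[n] = new
                changed = True
    return dom

# Diamond CFG: 1 -> {2, 3} -> 4
doms = compute_dominators([1, 2, 3, 4], {2: [1], 3: [1], 4: [2, 3]}, 1)
print(sorted(doms[4]))  # [1, 4]: only the entry and node 4 itself dominate node 4
```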

get_cfg_successors(node_id: int) -> List[int]

Get CFG successor nodes.

get_cfg_predecessors(node_id: int) -> List[int]

Get CFG predecessor nodes.

get_control_flow_paths(source_node: int, sink_node: int, max_depth: int = 20) -> List[List[int]]

Find all paths between two CFG nodes.

analyze_complexity_distribution(threshold: int = 10) -> Dict[str, Any]

Analyze complexity across all methods in the codebase.

dist = analyzer.analyze_complexity_distribution()
print(f"Average complexity: {dist['average']}")
print(f"High complexity methods: {dist['high_complexity_methods']}")

Database Tables Used

  • nodes_method — Method metadata
  • edges_contains — Method-to-node containment
  • edges_cfg — CFG edges between nodes
  • edges_dominate — Dominator relationships
  • edges_post_dominate — Post-dominator relationships

Performance Notes

  • Path enumeration bounded by max_paths (default 100) and max_depth (default 50)
  • Complexity distribution scans all methods — may be slow on large codebases (>50K methods)

UnreachableCodeDetector

File: src/analysis/cfg_unreachable.py Scenarios: S05, S13 (dead code detection)

Detects unreachable code using CFG analysis: code after return statements, exit() calls, and calls to noreturn functions.

Key Classes

@dataclass
class UnreachableCodeFinding:
    """Represents a finding of unreachable code."""
    method_name: str
    method_id: int
    filename: str
    line_number: int
    terminating_type: str  # 'return', 'exit_call', 'error_call', 'noreturn'
    terminating_node_id: int
    terminating_line: int
    unreachable_node_id: int
    unreachable_code: Optional[str] = None
    confidence: float = 0.9

API Reference

UnreachableCodeDetector(cpg_service)

Initialize with CPGQueryService instance.

detect_unreachable_code() -> List[UnreachableCodeFinding]

Find all unreachable code patterns in the codebase.

from src.analysis.cfg_unreachable import UnreachableCodeDetector

detector = UnreachableCodeDetector(cpg_service)
findings = detector.detect_unreachable_code()
for f in findings:
    print(f"{f.filename}:{f.line_number} — unreachable after {f.terminating_type}")

Database Tables Used

  • edges_cfg — CFG edges to check successor reachability
  • nodes_method — Method metadata
  • nodes_return — Return statement nodes

PatchControlFlowAnalyzer

File: src/patch_review/analyzers/control_flow_analyzer.py Scenarios: S09 (patch review)

Analyzes control flow impact of patches using CFGAnalyzer for accurate metrics.

Key Classes

@dataclass
class NewLoopFinding:
    method_name: str
    loop_type: str
    line_number: int
    is_nested: bool
    has_io: bool
    is_unbounded: bool
    severity: Severity  # HIGH, MEDIUM, LOW
    details: str

@dataclass
class ErrorHandlingChange:
    method_name: str
    change_type: str
    error_type: str
    line_number: int
    details: str

@dataclass
class BranchCoverageImpact:
    new_branches: int
    removed_branches: int
    net_change: int
    methods_with_new_branches: List[str]
    uncovered_paths: List[Dict[str, Any]]

API Reference

PatchControlFlowAnalyzer(conn, delta_cpg=None)

Initialize with DuckDB connection and optional DeltaCPG.

analyze_control_flow_changes(patch, delta_cpg) -> ControlFlowAnalysisResult

Full control flow analysis of a patch.

from src.patch_review.analyzers.control_flow_analyzer import PatchControlFlowAnalyzer

analyzer = PatchControlFlowAnalyzer(conn)
result = analyzer.analyze_control_flow_changes(patch, delta_cpg)

print(f"Complexity delta: {result.complexity_delta}")
print(f"New loops: {len(result.new_loops)}")
analyze_complexity_change(changed_methods) -> List[ComplexityDelta]

Calculate complexity before/after for changed methods.

detect_new_loops(changed_methods) -> List[NewLoopFinding]

Detect newly introduced loops with risk classification.

analyze_error_handling_changes(changed_methods) -> List[ErrorHandlingChange]

Track changes to error handling code.

analyze_branch_coverage_impact(changed_methods) -> BranchCoverageImpact

Estimate branch coverage impact of changes.

Loop Severity Classification

  • HIGH: Nested loops, loops with I/O, unbounded loops
  • MEDIUM: Loops with external calls
  • LOW: Simple bounded loops

Data Flow Analysis

DataFlowTracer

File: src/analysis/dataflow/tracer.py Scenarios: S02, S14 (security, incident response)

Main data flow analysis using REACHING_DEF edges. Extends BaseTracer (src/analysis/dataflow/base.py).

Key Classes

@dataclass
class DataFlowPath:
    """A data flow path from definition to use."""
    path_id: str
    variable_name: str
    source_location: Dict[str, Any]
    sink_location: Dict[str, Any]
    path_length: int
    intermediate_nodes: List[Dict[str, Any]] = field(default_factory=list)
    is_inter_procedural: bool = False
    sanitization_points: List[Dict[str, Any]] = field(default_factory=list)

@dataclass
class VariableFlow:
    """All flows of a variable across the codebase."""
    variable_name: str
    definition_points: List[Dict[str, Any]] = field(default_factory=list)
    use_points: List[Dict[str, Any]] = field(default_factory=list)
    flows: List[DataFlowPath] = field(default_factory=list)

API Reference

DataFlowTracer(cpg_service)

Initialize with CPGQueryService.

trace_variable(variable_name: str, method_name: Optional[str] = None, max_depth: Optional[int] = None) -> VariableFlow

Trace all flows of a variable using REACHING_DEF edges.

from src.analysis.dataflow.tracer import DataFlowTracer

tracer = DataFlowTracer(cpg_service)
flow = tracer.trace_variable("user_input", method_name="process_request")
print(f"Definitions: {len(flow.definition_points)}, Uses: {len(flow.use_points)}")
find_reaching_definitions(node_id: int) -> List[Dict]

Find all definitions reaching a use site.

find_variable_uses(node_id: int) -> List[Dict]

Find all uses of a definition.

trace_inter_procedural(variable_name: str, max_depth: int = 5) -> List[DataFlowPath]

Trace data flow across function boundaries.

Database Tables Used

  • nodes_identifier — Variable references
  • edges_reaching_def — Definition-use chains
  • edges_argument — Function argument edges

TypePropagator

File: src/analysis/dataflow/type_propagator.py Scenarios: S02 (type confusion vulnerabilities)

Tracks type transformations along data flow paths (casts, promotions, truncations).

Key Classes

@dataclass
class TypeTransformation:
    """A type transformation at a specific point."""
    node_id: int
    from_type: str
    to_type: str
    transformation_kind: str  # 'cast', 'promotion', 'truncation', 'reinterpret'
    is_safe: bool
    line_number: int

@dataclass
class TypeFlow:
    """Type propagation along a data flow path."""
    variable_name: str
    initial_type: str
    final_type: str
    transformations: List[TypeTransformation]
    has_unsafe_cast: bool
    has_truncation: bool

API Reference

TypePropagator(cpg_service)
trace_type_flow(variable_name: str, method_name: Optional[str] = None) -> TypeFlow

Track type changes along a variable’s data flow.

find_unsafe_casts(limit: int = 100) -> List[TypeTransformation]

Find potentially unsafe type casts.
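A toy sketch of how a transformation_kind such as 'truncation' or 'promotion' can be derived from integer widths. The width table and function below are illustrative, not the module's actual API:

```python
# Classify an integer conversion by comparing bit widths.
# Width table is a simplified assumption (LP64-style widths).
WIDTHS = {"char": 8, "short": 16, "int": 32, "long": 64}

def classify_conversion(from_type: str, to_type: str) -> str:
    fw, tw = WIDTHS.get(from_type), WIDTHS.get(to_type)
    if fw is None or tw is None:
        return "cast"          # unknown or non-integer type: plain cast
    if tw < fw:
        return "truncation"    # e.g. long -> int loses high bits
    if tw > fw:
        return "promotion"
    return "cast"

print(classify_conversion("long", "int"))  # truncation
```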


PointerAliasAnalyzer

File: src/analysis/dataflow/pointer_alias.py Scenarios: S02 (use-after-free, double-free)

Pointer alias analysis for tracking which pointers may refer to the same memory.

Key Classes

@dataclass
class AllocationSite:
    """Represents a memory allocation site."""
    node_id: int
    function_name: str  # malloc, calloc, etc.
    variable_name: str
    line_number: int

API Reference

PointerAliasAnalyzer(cpg_service)
find_aliases(variable_name: str) -> List[str]

Find all variables that may alias the given pointer.

trace_allocation_lifetime(alloc_site: AllocationSite) -> Dict

Track an allocation from creation to deallocation.


PathConstraintTracker

File: src/analysis/dataflow/symbolic_execution.py Scenarios: S02 (path feasibility in taint analysis)

Lightweight symbolic execution for checking path feasibility using Z3 solver.

Key Classes

class SymbolicExecutionConfig:
    """Configuration for symbolic execution engine."""
    enabled: bool = True
    max_constraints: int = 20
    solver_timeout_ms: int = 500
    solver_timeout_uf_ms: int = 2000
    max_parse_depth: int = 10
    enable_function_models: bool = True
    enable_arithmetic: bool = True

@dataclass
class PathConstraint:
    """A constraint on an execution path."""
    node_id: int
    condition: str
    is_true_branch: bool
    variables: List[str]

class PathConstraintTracker:
    """Tracks path constraints for feasibility analysis."""

API Reference

PathConstraintTracker(cpg_service)
check_path_feasibility(path_nodes: List[int]) -> bool

Check whether a data flow path is feasible given branch conditions.
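The idea can be illustrated with a toy interval-based check; the real tracker hands constraints to Z3 instead, and the constraint tuple format below is invented for the example:

```python
# Feasibility of a conjunction of integer constraints (var, op, constant):
# a path is infeasible when some variable's interval becomes empty.
def feasible(constraints):
    bounds = {}  # var -> (lo, hi)
    for var, op, c in constraints:
        lo, hi = bounds.get(var, (float("-inf"), float("inf")))
        if op == ">":
            lo = max(lo, c + 1)
        elif op == "<":
            hi = min(hi, c - 1)
        elif op == "==":
            lo, hi = max(lo, c), min(hi, c)
        bounds[var] = (lo, hi)
    return all(lo <= hi for lo, hi in bounds.values())

# Branch conditions x > 5 and x < 3 cannot both hold:
print(feasible([("x", ">", 5), ("x", "<", 3)]))  # False
```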


Taint Analysis

TaintPropagator

File: src/analysis/dataflow/taint/propagator.py Scenarios: S02, S14 (security analysis, incident response)

Main engine for taint analysis. Composes all taint sub-modules into a unified pipeline.

Key Classes

@dataclass
class TaintNode:
    """A node in a taint path."""
    node_id: int
    name: str
    code: str
    line_number: int
    filename: str
    node_type: str

@dataclass
class TaintPath:
    """A complete taint path from source to sink."""
    source: TaintNode
    sink: TaintNode
    intermediate: List[TaintNode]
    path_length: int
    confidence: float
    sink_category: str
    is_sanitized: bool = False
    sanitization_point: Optional[TaintNode] = None

API Reference

TaintPropagator(cpg_service, enable_inter_proc=True, enable_control_flow=True, enable_field_sensitive=True, enable_context_sensitive=True, enable_symbolic_execution=True)

Initialize with CPG service and optional feature toggles.

from src.analysis.dataflow.taint.propagator import TaintPropagator

propagator = TaintPropagator(cpg_service)
paths = propagator.find_taint_paths(
    sources=["getenv", "fgets", "read"],
    sinks=["system", "exec", "popen"]
)
for path in paths:
    print(f"Source: {path.source.name} -> Sink: {path.sink.name}")
    print(f"Confidence: {path.confidence}, Sanitized: {path.is_sanitized}")
find_taint_paths(sources, sinks, max_depth=None) -> List[TaintPath]

Find taint paths from source functions to sink functions.

analyze_sql_injections() -> List[TaintPath]

Convenience method for SQL injection detection.

Sub-modules

Sub-module File Purpose
DataflowTracker tracker.py Core BFS-based taint propagation
InterProcTracker interprocedural.py Cross-function call tracking
FieldSensitiveTracker field_sensitive.py Per-field taint tracking
ContextSensitiveTracker context_sensitive.py Call-site context awareness
ControlFlowAnalyzer control_flow.py Control-dependency analysis
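The BFS strategy at the core of DataflowTracker can be sketched over a toy def-use adjacency map (illustrative only, not the module's internals):

```python
from collections import deque

# Breadth-first taint propagation: walk forward along def-use edges
# from each source node until a sink node or the depth bound is hit.
def propagate_taint(edges, sources, sinks, max_depth=10):
    hits = []
    for src in sources:
        queue = deque([(src, [src])])
        seen = {src}
        while queue:
            node, path = queue.popleft()
            if node in sinks and node != src:
                hits.append(path)          # record source -> ... -> sink path
                continue
            if len(path) > max_depth:
                continue
            for nxt in edges.get(node, []):
                if nxt not in seen:
                    seen.add(nxt)
                    queue.append((nxt, path + [nxt]))
    return hits

print(propagate_taint({1: [2], 2: [3]}, [1], {3}))  # [[1, 2, 3]]
```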

FieldSensitiveTracer

File: src/analysis/field_sensitive_tracer.py Scenarios: S02, S08, S14 (security, compliance, incident response)

Top-level field-path tracking for precise taint analysis. Distinguishes between different fields of the same object (e.g., user.password vs user.name).

Note: Not to be confused with FieldSensitiveTracker (dataflow/taint/field_sensitive.py), which is a lower-level taint tracker used internally by TaintPropagator.

Key Classes

@dataclass
class FieldPath:
    """Represents a field access path like obj.field1.field2."""
    base_variable: str
    field_chain: List[str]
    full_path: str
    node_ids: List[int] = field(default_factory=list)
    type_full_name: Optional[str] = None

    @classmethod
    def from_code(cls, code: str) -> "FieldPath": ...
    def matches(self, other: "FieldPath") -> Tuple[bool, str]: ...

@dataclass
class FieldAccess:
    """Represents a single field access in code."""
    node_id: int
    base_variable: str
    field_name: str
    access_code: str
    line_number: int
    filename: str
    access_type: str = "read"  # 'read', 'write', 'call'
    containing_method: Optional[str] = None

@dataclass
class FieldSensitiveFlow:
    """A dataflow path with field sensitivity."""
    source_path: FieldPath
    sink_path: FieldPath
    intermediate_fields: List[FieldPath]
    is_tainted: bool
    relationship: str  # 'exact', 'prefix', 'suffix', 'propagated'
    confidence: float = 1.0

API Reference

FieldSensitiveTracer(cpg_service)

Initialize with CPGQueryService or DuckDB connection.

parse_field_path(code: str) -> FieldPath

Parse field access string into structured FieldPath.

from src.analysis.field_sensitive_tracer import FieldSensitiveTracer

tracer = FieldSensitiveTracer(cpg_service)

path = tracer.parse_field_path("user->password")
print(path.base_variable)  # "user"
print(path.field_chain)    # ["password"]

path = tracer.parse_field_path("request.data.buffer")
print(path.full_path)  # "request.data.buffer"
get_struct_fields(type_name: str) -> List[Dict[str, Any]]

Get fields defined in a struct with type information.

fields = tracer.get_struct_fields("UserData")
for field in fields:
    print(f"{field['name']}: {field['type']}")
find_field_accesses(base_variable: str, field_name: Optional[str] = None) -> List[FieldAccess]

Find all accesses to a specific field.

accesses = tracer.find_field_accesses("user", "password")
for access in accesses:
    print(f"{access.filename}:{access.line_number} - {access.access_type}")
find_all_field_identifiers(field_name: Optional[str] = None, limit: int = 100) -> List[Dict]

Find all field identifier nodes, optionally filtered by name.

trace_field_taint(source_variable: str, source_field: Optional[str] = None, sink_patterns: Optional[List[str]] = None, max_depth: int = 10) -> List[FieldSensitiveFlow]

Trace taint from a source field to sink functions.

flows = tracer.trace_field_taint(
    source_variable="credentials",
    source_field="password",
    sink_patterns=["printf", "log", "send"]
)
for flow in flows:
    print(f"Tainted flow: {flow.source_path.full_path} -> {flow.sink_path.full_path}")
find_sensitive_field_flows(sensitive_fields: List[str] = None, sink_functions: List[str] = None) -> List[Dict[str, Any]]

Find flows from sensitive fields to dangerous sinks.

# Default sensitive fields: password, token, secret, private_key, credential, auth
flows = tracer.find_sensitive_field_flows()
print(f"Found {len(flows)} potential sensitive data exposures")

Sensitive Field Categories

Default sensitive fields tracked:

  • password, passwd, pwd
  • token, auth_token, access_token
  • secret, api_secret, client_secret
  • private_key, secret_key
  • credential, credentials
  • auth, authorization

Database Tables Used

  • nodes_field_identifier — Field access nodes
  • nodes_identifier — Variable identifiers
  • nodes_member — Struct member definitions
  • edges_reaching_def — Reaching definition edges
  • edges_argument — Function argument edges

FieldSensitiveTracker

File: src/analysis/dataflow/taint/field_sensitive.py Scenarios: S02, S14 (internal use by TaintPropagator)

Low-level per-field taint tracking used internally by TaintPropagator. Tracks which specific fields of an object are tainted vs clean.

Key Classes

@dataclass
class FieldTaint:
    """Taint status of a single field."""
    field_name: str
    is_tainted: bool
    source_node: Optional[int] = None

class FieldTaintMap:
    """Maps object fields to their taint status."""
    def set_taint(self, base_var: str, field: str, tainted: bool): ...
    def is_tainted(self, base_var: str, field: str) -> bool: ...
    def get_tainted_fields(self, base_var: str) -> List[str]: ...
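A minimal in-memory implementation of this interface, for intuition only (the real tracker also records source nodes and integrates with the propagator):

```python
class FieldTaintMap:
    """Toy implementation of the documented interface."""

    def __init__(self):
        self._taint = {}  # (base_var, field) -> bool

    def set_taint(self, base_var, field, tainted):
        self._taint[(base_var, field)] = tainted

    def is_tainted(self, base_var, field):
        return self._taint.get((base_var, field), False)

    def get_tainted_fields(self, base_var):
        return [f for (b, f), t in self._taint.items() if b == base_var and t]

m = FieldTaintMap()
m.set_taint("user", "password", True)   # user.password is tainted
m.set_taint("user", "name", False)      # user.name stays clean
print(m.get_tainted_fields("user"))     # ['password']
```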

ContextSensitiveTracker

File: src/analysis/dataflow/taint/context_sensitive.py Scenarios: S02 (precision improvement in taint analysis)

Tracks call-site context to avoid false positives when the same function is called with different taint states.

API Reference

ContextSensitiveTracker(cpg_service)
track_call_context(caller_id: int, callee_id: int) -> CallContext

Create a call context for taint propagation.


InterProcTracker

File: src/analysis/dataflow/taint/interprocedural.py Scenarios: S02, S14 (cross-function taint tracking)

Tracks taint propagation across function call boundaries via parameter passing and return values.

API Reference

InterProcTracker(cpg_service)
find_callers(method_name: str) -> List[Dict]

Find all call sites for a method.

map_arguments(call_site_id: int) -> Dict[int, int]

Map actual arguments to formal parameters.


Security-Specific Analyzers

MemoryLifetimeAnalyzer

File: src/analysis/dataflow/memory_lifetime.py Scenarios: S02 (use-after-free, double-free, memory leaks)

Tracks memory allocation/deallocation patterns to detect lifetime violations.

Key Classes

class MemoryState(Enum):
    ALLOCATED = "allocated"
    FREED = "freed"
    UNKNOWN = "unknown"

@dataclass
class MemoryOperation:
    node_id: int
    operation: str  # 'alloc', 'free', 'realloc'
    function_name: str
    variable_name: str
    line_number: int
    filename: str

@dataclass
class UseAfterFreePath:
    alloc: MemoryOperation
    free: MemoryOperation
    use: MemoryOperation
    path_length: int

@dataclass
class DoubleFreePath:
    alloc: MemoryOperation
    first_free: MemoryOperation
    second_free: MemoryOperation

API Reference

MemoryLifetimeAnalyzer(cpg_service)
find_use_after_free(limit: int = 50) -> List[UseAfterFreePath]

Detect use-after-free patterns.

find_double_free(limit: int = 50) -> List[DoubleFreePath]

Detect double-free patterns.

find_memory_leaks(limit: int = 100) -> List[Dict]

Find allocations without corresponding frees.
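On a single already-enumerated path, the state machine behind these checks reduces to a few lines. The event-tuple format here is invented for illustration; the analyzer derives the operation sequence from the CPG:

```python
# Track each variable's MemoryState along one path of
# (operation, variable) events and flag lifetime violations.
def find_lifetime_violations(ops):
    state = {}
    findings = []
    for op, var in ops:
        if op == "alloc":
            state[var] = "allocated"
        elif op == "free":
            if state.get(var) == "freed":
                findings.append(("double_free", var))
            state[var] = "freed"
        elif op == "use" and state.get(var) == "freed":
            findings.append(("use_after_free", var))
    return findings

print(find_lifetime_violations([("alloc", "p"), ("free", "p"), ("use", "p")]))
# [('use_after_free', 'p')]
```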


NullCheckAnalyzer

File: src/analysis/dataflow/null_check.py Scenarios: S02, S05 (null dereference)

Detects missing null checks after functions that can return NULL.

Key Classes

@dataclass
class NullCheckPath:
    function_name: str
    return_variable: str
    dereference_line: int
    dereference_file: str
    has_null_check: bool
    check_line: Optional[int] = None

API Reference

NullCheckAnalyzer(cpg_service)
find_missing_null_checks(limit: int = 100) -> List[NullCheckPath]

Find dereferences of values that may be NULL without prior checks.


CodeStringTracer

File: src/analysis/dataflow/code_string_tracer.py Scenarios: S02 (code injection)

Traces string construction patterns to detect code injection (SQL, command, eval).

Key Classes

@dataclass
class CodeInjectionPoint:
    function_name: str
    string_variable: str
    injection_type: str  # 'sql', 'command', 'eval'
    line_number: int
    filename: str
    user_input_source: Optional[str] = None

API Reference

CodeStringTracer(cpg_service)
find_string_concat_injections(sink_functions: List[str], limit: int = 50) -> List[CodeInjectionPoint]

Find string concatenation patterns that feed into sensitive sinks.


InfoDisclosureAnalyzer

File: src/analysis/dataflow/info_disclosure.py Scenarios: S02 (information leakage)

Detects flows of sensitive data to output/logging functions.

API Reference

InfoDisclosureAnalyzer(cpg_service)
find_info_leaks(sensitive_patterns: List[str] = None, limit: int = 50) -> List[InfoDisclosurePath]

Find sensitive data flowing to output functions.


ComplianceMapper

File: src/analysis/compliance.py Scenarios: S02, S08 (security compliance reporting)

Maps detected vulnerabilities to security standards (CWE, OWASP, CERT C, MISRA C).

Key Classes

@dataclass
class ComplianceFinding:
    vulnerability_type: str
    severity: str  # 'critical', 'high', 'medium', 'low'
    cwe_id: str
    cwe_name: str
    owasp_category: Optional[str]
    cert_rule: Optional[str]
    misra_rule: Optional[str]
    file_path: str
    line_number: int
    function_name: str
    description: str
    recommendation: str
    risk_score: float

@dataclass
class ComplianceReport:
    scan_date: str
    codebase: str
    total_findings: int
    findings_by_severity: Dict[str, int]
    findings_by_cwe: Dict[str, int]
    findings_by_owasp: Dict[str, int]
    compliance_score: float  # 0.0-1.0
    findings: List[ComplianceFinding]
    standards_used: List[str]

API Reference

ComplianceMapper()

Initialize (no external dependencies).

map_vulnerability(vuln_type, severity, file_path, line_number, function_name, description, risk_score) -> ComplianceFinding

Map a vulnerability to compliance standards.

from src.analysis.compliance import ComplianceMapper

mapper = ComplianceMapper()
finding = mapper.map_vulnerability(
    vuln_type="sql_injection",
    severity="high",
    file_path="src/api.c",
    line_number=42,
    function_name="process_query",
    description="User input concatenated into SQL query",
    risk_score=8.5
)
print(f"{finding.cwe_id}: {finding.cwe_name}")
# CWE-89: Improper Neutralization of Special Elements used in an SQL Command

Call Graph Analysis

CallGraphAnalyzer

File: src/analysis/callgraph/analyzer.py Scenarios: S01, S09, S12, S14 (onboarding, patch review, cross-repo, incident response)

Main facade composing all call graph analysis modules. Delegates to specialized sub-analyzers.

Architecture

CallGraphAnalyzer
├── PathFinder          — shortest path, callers/callees
├── CentralityAnalyzer  — PageRank, betweenness
├── ComponentAnalyzer   — SCC, WCC components
├── ComplexityAnalyzer  — CFG-based complexity
├── ImpactAnalyzer      — change impact, entry points, attack paths
└── CrossLanguageAnalyzer — FFI boundary detection

API Reference

CallGraphAnalyzer(cpg_service)
find_shortest_path(source_method: str, target_method: str, max_depth: Optional[int] = None) -> Optional[CallPath]

Find shortest call chain between two methods.

from src.analysis.callgraph.analyzer import CallGraphAnalyzer

cga = CallGraphAnalyzer(cpg_service)
path = cga.find_shortest_path("handle_request", "exec_query")
if path:
    print(f"Path length: {path.length}")
    print(f"Methods: {' -> '.join(path.methods)}")
find_all_callers(method_name: str, max_depth: Optional[int] = None, direct_only: bool = False) -> List[str]

Find all methods that call the given method.

find_all_callees(method_name: str, max_depth: Optional[int] = None, direct_only: bool = False) -> List[str]

Find all methods called by the given method.
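Conceptually, find_all_callers is a bounded reverse-BFS over call edges. A toy sketch over a hand-written adjacency map (the analyzer queries the call edge tables instead):

```python
# call_edges maps callee -> [direct callers]; walk it breadth-first
# up to max_depth levels (None = unbounded).
def transitive_callers(call_edges, method, max_depth=None):
    result, frontier, depth = set(), {method}, 0
    while frontier and (max_depth is None or depth < max_depth):
        frontier = {c for m in frontier for c in call_edges.get(m, [])} - result - {method}
        result |= frontier
        depth += 1
    return result

# a calls b, b calls c: callers of c are {a, b}
print(sorted(transitive_callers({"c": ["b"], "b": ["a"]}, "c")))  # ['a', 'b']
```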

find_cross_language_calls(method_name=None, source_language=None, target_language=None) -> List[CrossLanguageCall]

Find cross-language call edges (FFI boundaries).

detect_cycles(max_cycle_length: int = 10) -> List[CallCycle]

Detect recursion cycles using Tarjan’s SCC algorithm.
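For intuition, a recursion cycle corresponds to a strongly connected component with more than one method (or a self-call). A compact Tarjan sketch over a toy call graph, purely illustrative:

```python
# Tarjan's SCC algorithm over an adjacency map {method: [callees]}.
def tarjan_scc(graph):
    index, low = {}, {}
    stack, on_stack, sccs = [], set(), []
    counter = [0]

    def strongconnect(v):
        index[v] = low[v] = counter[0]
        counter[0] += 1
        stack.append(v)
        on_stack.add(v)
        for w in graph.get(v, []):
            if w not in index:
                strongconnect(w)
                low[v] = min(low[v], low[w])
            elif w in on_stack:
                low[v] = min(low[v], index[w])
        if low[v] == index[v]:          # v is the root of an SCC
            comp = []
            while True:
                w = stack.pop()
                on_stack.discard(w)
                comp.append(w)
                if w == v:
                    break
            sccs.append(comp)

    for v in graph:
        if v not in index:
            strongconnect(v)
    return sccs

# a and b call each other; c only calls a -> one non-trivial SCC
cycles = [c for c in tarjan_scc({"a": ["b"], "b": ["a"], "c": ["a"]}) if len(c) > 1]
print(cycles)  # [['b', 'a']]
```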

analyze_impact(method_name: str, max_depth: Optional[int] = None) -> ImpactAnalysis

Analyze change impact (transitive callers, affected components).

get_call_statistics() -> Dict[str, Any]

Overall call graph statistics (total methods, calls, fan-out).

find_entry_points(method_name: str, max_depth: int = 10) -> List[Dict]

Find public API entry points that can reach a method.

trace_attack_paths(entry_points, vuln_method, vuln_file="", vuln_line=0, max_paths=5) -> List[AttackPath]

Trace attack paths from entry points to a vulnerability.

compute_pagerank(top_n: Optional[int] = None) -> List[Dict]

Compute PageRank scores for method importance ranking.
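A minimal power-iteration sketch of PageRank over a toy call graph. The analyzer's actual implementation and damping defaults may differ; this simplified version also ignores dangling-node mass:

```python
# Power iteration: each method distributes a damped share of its rank
# to its callees; heavily-called methods accumulate rank.
def pagerank(graph, damping=0.85, iters=50):
    n = len(graph)
    rank = {v: 1.0 / n for v in graph}
    for _ in range(iters):
        new = {v: (1 - damping) / n for v in graph}
        for v, outs in graph.items():
            if not outs:
                continue  # dangling node: mass dropped in this sketch
            share = damping * rank[v] / len(outs)
            for w in outs:
                new[w] += share
        rank = new
    return rank

# util is called by both main and helper -> highest rank
r = pagerank({"main": ["helper", "util"], "helper": ["util"], "util": []})
```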

compute_betweenness_centrality(sample_size=None, top_n=None) -> List[Dict]

Compute betweenness centrality for bridge method identification.

find_hotspots(min_in_degree: int = 3, limit: int = 25) -> List[Dict]

Find performance hotspots (methods with high caller count).

Key Data Models

@dataclass
class CallPath:
    source: str
    target: str
    methods: List[str]
    length: int

@dataclass
class CallCycle:
    methods: List[str]
    length: int
    is_direct: bool

@dataclass
class ImpactAnalysis:
    method_name: str
    direct_callers: List[str]
    transitive_callers: List[str]
    affected_files: List[str]
    impact_score: float

@dataclass
class AttackPath:
    entry_point: str
    vulnerability: str
    path: List[str]
    risk_amplification: float

@dataclass
class CrossLanguageCall:
    caller: str
    callee: str
    caller_language: str
    callee_language: str
    call_type: str  # 'ffi', 'cgo', 'jni', etc.

Sub-analyzers

Module File Key Methods DuckPGQ
PathFinder callgraph/pathfinding.py find_shortest_path, find_all_callers, find_all_callees No
PGQPathFinder callgraph/pathfinding.py Same API, uses DuckPGQ MATCH syntax Yes
CentralityAnalyzer callgraph/centrality.py compute_pagerank, compute_betweenness_centrality No
PGQCentralityAnalyzer callgraph/centrality.py Same API, uses DuckPGQ graph queries Yes
ComponentAnalyzer callgraph/components.py compute_scc, compute_wcc No
PGQComponentAnalyzer callgraph/components.py Same API, DuckPGQ-accelerated Yes
ComplexityAnalyzer callgraph/complexity.py compute_cyclomatic_complexity (CFG-based) No
ImpactAnalyzer callgraph/impact.py analyze_impact, find_entry_points, trace_attack_paths No
CrossLanguageAnalyzer callgraph/cross_language.py find_cross_language_calls No

Concurrency Analysis

ConcurrencyAnalyzer

File: src/analysis/concurrency_core.py Scenarios: S16 (concurrency analysis)

Composed from 4 specialized mixins. Lock/shared-memory patterns loaded from domain plugin.

class ConcurrencyAnalyzer(
    LockAnalyzerMixin,
    RaceDetectorMixin,
    SharedAccessAnalyzerMixin,
    AtomicOperationsAnalyzerMixin,
): ...

API Reference

ConcurrencyAnalyzer(cpg_service)

Access all methods from the 4 mixins below.

from src.analysis.concurrency_core import ConcurrencyAnalyzer

analyzer = ConcurrencyAnalyzer(cpg_service)
races = analyzer.detect_race_conditions()
locks = analyzer.find_lock_usage(lock_type="lwlock")
stats = analyzer.get_concurrency_statistics()

Data Models

@dataclass
class LockUsage:
    function_name: str
    lock_type: str
    lock_name: Optional[str]
    operation: str
    file_name: str
    line_number: int

@dataclass
class RaceConditionPattern:
    pattern_id: str
    pattern_type: str  # 'toctou', 'unprotected_access', 'signal_handler', 'double_check'
    affected_functions: List[str]
    shared_resource: str
    severity: str
    description: str

@dataclass
class SharedAccess:
    variable_name: str
    accessor_functions: List[str]
    access_type: str
    is_protected: bool
    protecting_lock: Optional[str]

@dataclass
class LockOrderViolation:
    violation_id: str
    lock_a: str
    lock_b: str
    function_acquiring_a_then_b: str
    function_acquiring_b_then_a: str
    risk_level: str

LockAnalyzerMixin

File: src/analysis/lock_analyzer.py

  • find_lock_usage(lock_type=None, function_name=None, limit=None) -> List[LockUsage]
  • detect_lock_ordering_issues(limit=None) -> List[LockOrderViolation]
  • detect_potential_deadlocks(limit=None) -> List[Dict]
  • analyze_lock_graph() -> Dict[str, Any]
  • get_lock_statistics() -> Dict[str, Any]
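detect_lock_ordering_issues looks for AB-BA inversions: two functions that acquire the same pair of locks in opposite orders. A minimal, self-contained sketch of that check (function and lock names are illustrative, not the analyzer's internals):

```python
from itertools import combinations

def find_ab_ba_inversions(acquisitions):
    """Find lock pairs acquired in opposite orders by different functions.

    acquisitions maps function name -> ordered list of locks it takes.
    Returns (lock_a, lock_b, func_a_then_b, func_b_then_a) tuples,
    the shape of an AB-BA deadlock candidate.
    """
    # Record which function establishes each ordered lock pair.
    order = {}  # (lock_a, lock_b) -> function acquiring a before b
    for func, locks in acquisitions.items():
        for a, b in combinations(locks, 2):
            order.setdefault((a, b), func)
    violations = []
    for (a, b), func_ab in order.items():
        # Report each conflicting pair once, in canonical order.
        if (a, b) < (b, a) and (b, a) in order:
            violations.append((a, b, func_ab, order[(b, a)]))
    return violations

acqs = {
    "flush_buffer": ["BufferLock", "WALLock"],
    "write_wal": ["WALLock", "BufferLock"],
}
print(find_ab_ba_inversions(acqs))
# [('BufferLock', 'WALLock', 'flush_buffer', 'write_wal')]
```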

RaceDetectorMixin

File: src/analysis/race_detector.py

  • detect_race_conditions(pattern_types=None, limit=None) -> List[RaceConditionPattern]

Pattern types: toctou, signal_handler, unprotected_access, double_check.
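The toctou pattern pairs a check call with a later use of the same resource. A self-contained sketch of the shape the detector looks for (the check/use call pairs here are hypothetical, not the detector's actual pattern set):

```python
# Illustrative check->use pairs; the real detector's pattern set differs.
CHECK_USE_PAIRS = {"access": "open", "stat": "open"}

def find_toctou_candidates(call_sequence):
    """call_sequence: ordered (callee, argument) tuples within one function.

    Flags a check call followed by a use call on the same argument,
    the window in which the resource can change underneath the check.
    """
    candidates = []
    for i, (callee, arg) in enumerate(call_sequence):
        use = CHECK_USE_PAIRS.get(callee)
        if use is None:
            continue
        for later_callee, later_arg in call_sequence[i + 1:]:
            if later_callee == use and later_arg == arg:
                candidates.append((callee, use, arg))
    return candidates

calls = [("access", "/tmp/f"), ("log", "msg"), ("open", "/tmp/f")]
print(find_toctou_candidates(calls))  # [('access', 'open', '/tmp/f')]
```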

SharedAccessAnalyzerMixin

File: src/analysis/shared_access_analyzer.py

  • analyze_shared_access(variable_pattern=None, limit=None) -> List[SharedAccess]

AtomicOperationsAnalyzerMixin

File: src/analysis/atomic_operations_analyzer.py

  • find_atomic_operations(limit: int = 100) -> List[Dict]
  • find_condition_variables(limit: int = 50) -> List[Dict]
  • analyze_function_concurrency(function_name: str) -> Dict
  • get_concurrency_statistics() -> Dict[str, Any]

Clone Detection

ASTCloneDetector

File: src/analysis/clone_detector.py Scenarios: S07, S13 (refactoring, duplicate detection)

Multi-level AST-based clone detection:

  • Type-1: Exact clones (identical code)
  • Type-2: Renamed clones (identifier changes only)
  • Type-3: Structural clones (similar structure with modifications)
  • Type-4: Semantic clones (different code, same behavior)

Key Classes

@dataclass
class CloneResult:
    method1_id: int
    method1_name: str
    method1_file: str
    method2_id: int
    method2_name: str
    method2_file: str
    similarity: float
    clone_type: str  # 'exact', 'renamed', 'structural', 'semantic'
    shared_patterns: List[str] = field(default_factory=list)
    line_count1: int = 0
    line_count2: int = 0

API Reference

ASTCloneDetector(cpg_service)
detect_clones(min_similarity=None, category=None, max_methods=None, min_lines=None) -> List[CloneResult]

Detect code clones across the codebase.

from src.analysis.clone_detector import ASTCloneDetector

detector = ASTCloneDetector(cpg_service)
clones = detector.detect_clones(min_similarity=0.8, min_lines=10)
for clone in clones:
    print(f"{clone.method1_name} ↔ {clone.method2_name} ({clone.clone_type}, {clone.similarity:.0%})")

detect_clones_for_category(category: str, min_similarity: Optional[float] = None) -> List[CloneResult]

Detect clones within a specific category (e.g., “null_check”, “string_operations”).

Performance Notes

  • Pairwise comparison: O(n²) where n = number of methods
  • Use max_methods to limit scope on large codebases
  • Use min_lines to skip trivial methods
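The quadratic cost is easy to see in a stripped-down pair scan; the Jaccard token similarity below is a stand-in for the detector's actual AST comparison:

```python
from itertools import combinations

def jaccard(tokens_a, tokens_b):
    """Set-overlap similarity in [0, 1] (illustrative metric only)."""
    a, b = set(tokens_a), set(tokens_b)
    return len(a & b) / len(a | b) if a | b else 1.0

def pairwise_clones(methods, min_similarity=0.8, max_methods=None):
    """methods: dict name -> token list. O(n^2) pair scan, like detect_clones."""
    names = sorted(methods)[:max_methods]  # max_methods caps the quadratic cost
    return [
        (m1, m2, round(jaccard(methods[m1], methods[m2]), 2))
        for m1, m2 in combinations(names, 2)
        if jaccard(methods[m1], methods[m2]) >= min_similarity
    ]

methods = {
    "copy_a": ["for", "i", "in", "range", "dst", "src"],
    "copy_b": ["for", "j", "in", "range", "dst", "src"],
    "parse":  ["while", "tok", "next", "emit"],
}
print(pairwise_clones(methods, min_similarity=0.7))
# [('copy_a', 'copy_b', 0.71)]
```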

Pattern Engine

LLMPatternGenerator

File: src/analysis/patterns/llm_pattern_generator.py Scenarios: S21 (pattern search)

LLM-assisted generation of YAML pattern rules from natural language descriptions.

Key Classes

@dataclass
class GeneratedRule:
    """Result of LLM rule generation."""
    yaml_text: str
    rule_id: str
    language: str
    has_fix: bool
    validated: bool
    validation_errors: List[str] = field(default_factory=list)
    generation_attempts: int = 1

API Reference

LLMPatternGenerator()

Initialize (loads prompt language from global registry).

async generate_rule(description: str, language: str, domain: Optional[str] = None, with_fix: bool = True, max_retries: int = 3, examples: Optional[List[str]] = None) -> GeneratedRule

Generate a YAML rule from a natural language description. Async method.

from src.analysis.patterns.llm_pattern_generator import LLMPatternGenerator

generator = LLMPatternGenerator()
rule = await generator.generate_rule(
    description="Find malloc calls without matching free",
    language="c",
    domain="linux_kernel"
)
print(rule.rule_id)        # e.g., "malloc-without-free"
print(rule.validated)      # True if passed gocpg validate-rule
print(rule.yaml_text)      # Full YAML rule content

PatternTaintBridge

File: src/analysis/patterns/taint_bridge.py Scenarios: S02, S21 (security, pattern search)

Bridges structural pattern matches with taint analysis. Enriches pattern findings with data flow information.

API Reference

PatternTaintBridge(cpg_service, taint_propagator=None)

Initialize with CPG service and optional TaintPropagator (falls back to lightweight SQL BFS).

async enrich_findings_with_taint(findings: List[Dict]) -> List[Dict]

Add taint path information to pattern findings. Async method.

from src.analysis.patterns.taint_bridge import PatternTaintBridge

bridge = PatternTaintBridge(cpg_service, taint_propagator)
enriched = await bridge.enrich_findings_with_taint(pattern_findings)

for finding in enriched:
    if finding.get("taint_enriched"):
        print(f"Finding {finding['rule_id']} has {len(finding['taint_paths'])} taint paths")

Autofix Engine

AutofixEngine

File: src/analysis/autofix/engine.py Scenarios: S02 (automated security fix generation)

Orchestrates autofix generation: template-based fixes first, LLM fallback, then validation.

Pipeline

TaintPath → Parse Location → Read Source → Infer Vuln Type
  → Try Template Fix → (if None) LLM Fallback → Validate Diff → AutofixResult

Key Classes

@dataclass
class AutofixResult:
    fix: FixSuggestion
    strategy: str  # "template" or "llm"
    validated: bool
    validation: Optional[ValidationResult] = None
    taint_path: Optional[TaintPath] = None
    cwe_id: str = ""

API Reference

AutofixEngine(source_root: str = "", dry_run: bool = True)

Initialize with source root directory. dry_run=True generates diffs without applying.

generate_fixes(taint_paths: List[TaintPath], vulnerability_type: str = "") -> List[AutofixResult]

Generate fixes for a list of taint paths.

from src.analysis.autofix.engine import AutofixEngine

engine = AutofixEngine(source_root="/path/to/project", dry_run=True)
results = engine.generate_fixes(taint_paths, vulnerability_type="sql_injection")
for result in results:
    print(f"Strategy: {result.strategy}, Validated: {result.validated}")
    print(f"Diff:\n{result.fix.diff_patch}")

AutofixGenerator

File: src/analysis/autofix/generator.py

Template-based fix generation using regex patterns for known vulnerability types.

@dataclass
class FixSuggestion:
    vulnerability_type: str
    severity: str
    file_path: str
    line_number: int
    original_code: str
    fixed_code: str
    explanation: str
    confidence: float  # 0.0-1.0
    diff_patch: str     # Unified diff format

@dataclass
class FixTemplate:
    vuln_type: str
    name: str
    description: str
    pattern: str      # Regex pattern
    replacement: str  # Replacement template
    explanation: str
  • AutofixGenerator() — loads templates from domain plugin
  • generate_fix(code, vuln_type, context) -> Optional[FixSuggestion]
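A template is applied as a regex substitution over the vulnerable code. A sketch with a hypothetical sprintf → snprintf template (the real templates come from the domain plugin):

```python
import re
from dataclasses import dataclass

@dataclass
class FixTemplate:  # mirrors the dataclass above
    vuln_type: str
    name: str
    description: str
    pattern: str      # Regex pattern
    replacement: str  # Replacement template
    explanation: str

# Hypothetical template for illustration only.
SPRINTF_TEMPLATE = FixTemplate(
    vuln_type="buffer_overflow",
    name="sprintf-to-snprintf",
    description="Bound sprintf writes",
    pattern=r"\bsprintf\((\w+),",
    replacement=r"snprintf(\1, sizeof(\1),",
    explanation="snprintf bounds the write to the destination buffer",
)

def apply_template(code, template):
    """Return fixed code, or None when the pattern does not match."""
    fixed, n = re.subn(template.pattern, template.replacement, code)
    return fixed if n else None

print(apply_template('sprintf(buf, "%s", user);', SPRINTF_TEMPLATE))
# snprintf(buf, sizeof(buf), "%s", user);
```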

DiffValidator

File: src/analysis/autofix/diff_validator.py

Validates that generated fixes are safe to apply.

@dataclass
class ValidationResult:
    valid: bool
    error: Optional[str] = None
    patched_content: Optional[str] = None
  • DiffValidator() — MAX_CHANGE_RATIO = 0.5
  • validate(original_code, fixed_code, file_path, source_root) -> ValidationResult
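A minimal sketch of the check that MAX_CHANGE_RATIO implies: reject fixes that rewrite more than half of the affected lines. Using difflib for the ratio is an assumption about the mechanism, and the real validator also applies the patch to produce patched_content:

```python
import difflib

MAX_CHANGE_RATIO = 0.5  # same bound as the class constant above

def change_ratio(original, fixed):
    """Fraction of lines that differ, via difflib's similarity ratio."""
    sm = difflib.SequenceMatcher(None, original.splitlines(), fixed.splitlines())
    return 1.0 - sm.ratio()

def validate(original, fixed):
    """Shape of a ValidationResult, as a plain dict for illustration."""
    ratio = change_ratio(original, fixed)
    if ratio > MAX_CHANGE_RATIO:
        return {"valid": False, "error": f"change ratio {ratio:.2f} exceeds {MAX_CHANGE_RATIO}"}
    return {"valid": True, "error": None}

original = "a = read()\nuse(a)\nlog(a)\n"
print(validate(original, "a = read()\nuse(sanitize(a))\nlog(a)\n"))
```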

PromptBuilder

File: src/analysis/autofix/prompt_builder.py

Builds LLM prompts for autofix generation when templates don’t match.

@dataclass
class AutofixPromptContext:
    vulnerable_code: str
    vulnerability_type: str
    cwe_id: str
    taint_path_summary: str
    language: str
  • PromptBuilder() — uses config/prompts/autofix/
  • build_prompt(context: AutofixPromptContext) -> str

SSRAutofixBridge

File: src/analysis/autofix/ssr_bridge.py

Bridge between autofix engine and GoCPG Structural Search & Replace (SSR).

  • SSRAutofixBridge(gocpg_client)
  • apply_ssr_rule(rule_id: str, file_path: str) -> Optional[FixSuggestion]

Patch Review Analyzers

Additional analyzers in src/patch_review/analyzers/ that extend base analysis modules for patch-specific impact assessment.

PatchCallGraphAnalyzer

File: src/patch_review/analyzers/call_graph_analyzer.py Scenarios: S09 (patch review)

Analyzes call graph impact of patch changes — blast radius, breaking changes, ripple effect.

@dataclass
class CallGraphNode:
    method_name: str
    full_name: str
    filename: str
    callers: List[str] = field(default_factory=list)
    callees: List[str] = field(default_factory=list)
    is_changed: bool = False
    change_type: Optional[ChangeType] = None
  • PatchCallGraphAnalyzer(conn, delta_cpg=None)
  • analyze_call_graph_changes(patch, delta_cpg) -> CallGraphAnalysisResult
  • compute_blast_radius(changed_methods) -> BlastRadius
  • detect_breaking_changes(changed_methods) -> List[BreakingChange]
  • compute_ripple_effect(changed_methods) -> RippleEffect

PatchDataFlowAnalyzer

File: src/patch_review/analyzers/dataflow_analyzer.py Scenarios: S09 (patch review)

Analyzes data flow impact of patches — new taint paths, sanitization bypasses, sensitive data flows.

@dataclass
class DataFlowChange:
    change_type: str
    source_method: str
    affected_variable: str
    severity: Severity
  • PatchDataFlowAnalyzer(conn, delta_cpg=None)
  • analyze_dataflow_changes(patch, delta_cpg) -> DataFlowAnalysisResult

PatchDependencyAnalyzer

File: src/patch_review/analyzers/dependency_analyzer.py Scenarios: S09 (patch review)

Analyzes how patches affect module dependencies, import relationships, and architectural coupling.

class DependencyChangeType(Enum):
    ADDED = "added"
    REMOVED = "removed"
    MODIFIED = "modified"
    CIRCULAR_INTRODUCED = "circular_introduced"
    LAYER_VIOLATION = "layer_violation"

@dataclass
class DependencyChange:
    change_type: DependencyChangeType
    source_module: str
    target_module: str
    source_file: str

@dataclass
class CircularDependency:
    cycle_path: List[str]
    introduced_edge: Tuple[str, str]
    severity: Severity

@dataclass
class LayerViolation:
    source_layer: str
    target_layer: str
    source_module: str
    target_module: str

@dataclass
class CouplingMetrics:
    afferent_coupling: int
    efferent_coupling: int
    instability: float

@dataclass
class DependencyAnalysisResult:
    dependency_changes: List[DependencyChange]
    circular_dependencies: List[CircularDependency]
    layer_violations: List[LayerViolation]
    coupling_metrics: Dict[str, CouplingMetrics]
  • PatchDependencyAnalyzer(conn, delta_cpg=None)
  • analyze_dependency_changes(patch, delta_cpg) -> DependencyAnalysisResult
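The instability field in CouplingMetrics follows the standard Martin definition I = Ce / (Ca + Ce); that the analyzer uses exactly this convention is an assumption:

```python
from dataclasses import dataclass

@dataclass
class CouplingMetrics:  # mirrors the dataclass above
    afferent_coupling: int   # Ca: modules that depend on this one
    efferent_coupling: int   # Ce: modules this one depends on
    instability: float       # I = Ce / (Ca + Ce)

def make_metrics(ca, ce):
    """Instability per Martin's metric: 0 = maximally stable, 1 = unstable."""
    instability = ce / (ca + ce) if (ca + ce) else 0.0
    return CouplingMetrics(ca, ce, instability)

m = make_metrics(ca=6, ce=2)
print(m.instability)  # 0.25
```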

Additional Data Flow Modules

RaceConditionAnalyzer (dataflow)

File: src/analysis/dataflow/race_condition.py Scenarios: S02, S16 (security, concurrency)

Detects TOCTOU vulnerabilities and data races using data flow analysis with control dependency checking.

@dataclass
class RaceConditionPath:
    race_type: str  # 'toctou', 'data_race', 'missing_lock'
    check_location: str
    check_function: str
    use_location: str
    use_function: str
    shared_resource: str
    has_lock: bool
    lock_function: Optional[str]
    path_nodes: List[TaintNode]
    confidence: float
    risk_score: float
  • RaceConditionAnalyzer(cpg_service)
  • analyze(max_paths=100, max_hops=None, min_confidence=0.7) -> List[RaceConditionPath]

Data Model Modules

The following modules define shared data models used across analysis subpackages:

Module File Key Classes
Dataflow Models src/analysis/dataflow/models.py DataFlowPath, VariableFlow
Taint Models src/analysis/dataflow/taint/models.py TaintNode, TaintPath, ControlDependency, CallContext
Concurrency Models src/analysis/concurrency_dataclasses.py LockUsage, RaceConditionPattern, SharedAccess, LockOrderViolation
Call Graph Models src/analysis/callgraph/models.py CallPath, CallCycle, ImpactAnalysis, AttackPath, CrossLanguageCall
Call Graph Base src/analysis/callgraph/base.py BaseAnalyzer (abstract base for all callgraph sub-analyzers)
Complexity Config src/analysis/callgraph/complexity.py ComplexityAnalyzer, DefaultThresholds

Database Schema Reference

Key Tables for Analysis

Table Purpose
nodes_method Method definitions
nodes_control_structure Control flow structures (if, for, while)
nodes_field_identifier Field access expressions
nodes_identifier Variable references
nodes_call Function call sites
nodes_return Return statements
nodes_member Struct member definitions
edges_cfg Control flow graph edges
edges_contains Containment relationships
edges_dominate Dominator relationships
edges_post_dominate Post-dominator relationships
edges_reaching_def Reaching definition edges
edges_argument Function argument edges
edges_call Call graph edges

Edge Types for Dataflow

Edge Type Table Purpose
CFG edges_cfg Control flow between statements
REACHING_DEF edges_reaching_def Definition-use chains
ARGUMENT edges_argument Function call arguments
CONTAINS edges_contains Scope containment
CALL edges_call Function call relationships
DOMINATE edges_dominate Dominance for control dependency

Troubleshooting

“Method not found”

The method name must match exactly (case-sensitive). Use the simple name, not the full qualified name.

# Correct
cfg = analyzer.get_method_cfg("heap_insert")

# Incorrect
cfg = analyzer.get_method_cfg("heap_insert(Relation, HeapTuple)")

“No CFG data found”

Ensure the CPG export included CFG edges. Check:

SELECT COUNT(*) FROM edges_cfg;

“No field accesses found”

Field access tracking requires nodes_field_identifier data:

SELECT COUNT(*) FROM nodes_field_identifier;

“No taint paths found”

Check that REACHING_DEF edges exist and source/sink functions are present:

SELECT COUNT(*) FROM edges_reaching_def;
SELECT DISTINCT name FROM nodes_call WHERE name IN ('system', 'exec', 'popen');

Performance Considerations

  • Path enumeration bounded by max_paths and max_depth to prevent explosion
  • Use max_depth to limit taint tracking depth (default from config)
  • Clone detection is O(n²) — use max_methods on large codebases
  • DuckPGQ variants (PGQ* classes) are faster for graph traversals
  • Large methods may have many paths; consider sampling

Method Explanation Engine

Module: src/analysis/explain.py

Comprehensive method analysis engine that aggregates CPG metrics, call graph data, taint information, and domain context into a single result. Used by MCP tool codegraph_explain and CLI.

ExplainResult

Aggregated method analysis result (dataclass).

Field Type Description
method_name str Short method name
full_name str Fully qualified name
file_path str Source file path
line_start int Start line number
line_end int End line number
line_count int Number of lines
signature str Method signature (first line of code)
cyclomatic_complexity int Cyclomatic complexity score
risk_level str Risk level: low (<10), moderate (10–19), high (20–49), critical (≥50)
fan_in int Number of callers
fan_out int Number of callees
direct_callers List[str] Direct caller names
transitive_caller_count int Total transitive callers up to depth
direct_callees List[str] Direct callee names
is_taint_source bool Method is a taint source (from domain plugin)
is_taint_sink bool Method is a taint sink (from domain plugin)
taint_paths_through int Taint paths passing through
subsystem str Domain subsystem name
pattern_flags Dict[str, bool] Flags: has_todo_fixme, has_deprecated, has_debug_code
docstring str Method docstring
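The risk_level thresholds above translate directly to:

```python
def risk_level(cyclomatic_complexity):
    """Thresholds from the risk_level row in the table above."""
    if cyclomatic_complexity >= 50:
        return "critical"
    if cyclomatic_complexity >= 20:
        return "high"
    if cyclomatic_complexity >= 10:
        return "moderate"
    return "low"

print([risk_level(c) for c in (5, 12, 35, 80)])
# ['low', 'moderate', 'high', 'critical']
```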

ExplainAnalyzer

from src.analysis.explain import ExplainAnalyzer, ExplainResult

analyzer = ExplainAnalyzer(db_path="project.duckdb")
result = analyzer.collect("process_data", depth=2)

Constructor: ExplainAnalyzer(db_path: Optional[str] = None)

Methods:

Method Returns Description
collect(method_name, depth=2) Optional[ExplainResult] Collect comprehensive analysis data for a method
to_dict(result) Dict[str, Any] Convert ExplainResult to dict for JSON output
fuzzy_search(method_name, limit=5) List[str] Fuzzy search for method names when exact match fails
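When an exact lookup misses, fuzzy_search returns close candidates. A self-contained sketch of that behavior (difflib is an assumption about the underlying mechanism):

```python
import difflib

def fuzzy_search(method_name, known_names, limit=5):
    """Return up to `limit` method names close to the query.

    Mirrors the fuzzy_search fallback described above; the cutoff
    value here is illustrative.
    """
    return difflib.get_close_matches(method_name, known_names, n=limit, cutoff=0.6)

names = ["heap_insert", "heap_update", "index_insert", "parse_query"]
print(fuzzy_search("heap_inserts", names, limit=1))  # ['heap_insert']
```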

Config Orphan Analysis

Module: src/analysis/config_analyzer.py

Detects unused, missing, and mismatched configuration parameters by cross-referencing YAML config, Pydantic/dataclass schema, and source code usage. Performs a 4-stage pipeline: extraction → cross-reference → FP filtering → reporting.

Used by: S12 (tech debt) via OrphanConfigHandler, CLI dogfood config-check.

ConfigOrphanAnalyzer

from src.analysis.config_analyzer import ConfigOrphanAnalyzer, AnalyzerConfig

analyzer = ConfigOrphanAnalyzer(AnalyzerConfig())
orphans = analyzer.scan()
summary = analyzer.get_summary(orphans)

Constructor: ConfigOrphanAnalyzer(config: Optional[AnalyzerConfig] = None)

Methods:

Method Returns Description
scan() List[OrphanFinding] Run full orphan detection pipeline (4 stages)
get_summary(orphans) Dict[str, Any] Generate summary statistics by type and severity

Detection stages:

  1. Extraction — parse YAML keys (YAMLExtractor), schema dataclass fields (ASTExtractor), code references (CodeScanner)
  2. Cross-reference — compare defined vs used parameters
  3. FP filtering — remove false positives (env vars, dynamic access, known patterns)
  4. Reporting — produce OrphanFinding list
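The cross-reference stage reduces to set comparison between defined and used parameter paths. A minimal sketch (FP filtering omitted; parameter names are illustrative):

```python
def cross_reference(yaml_keys, code_refs):
    """Stage 2 of the pipeline: compare defined vs used parameters.

    Returns (orphan_type, param_name) pairs using the yaml_unused /
    yaml_missing vocabulary from the OrphanFinding model.
    """
    defined, used = set(yaml_keys), set(code_refs)
    findings = []
    findings += [("yaml_unused", k) for k in sorted(defined - used)]   # defined, never read
    findings += [("yaml_missing", k) for k in sorted(used - defined)]  # read, never defined
    return findings

yaml_keys = {"timeouts.http_client", "cache.size", "cache.ttl"}
code_refs = {"timeouts.http_client", "cache.size", "retries.max"}
print(cross_reference(yaml_keys, code_refs))
# [('yaml_unused', 'cache.ttl'), ('yaml_missing', 'retries.max')]
```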

OrphanFinding

Dataclass representing a single orphan config finding.

Field Type Description
orphan_type str One of: yaml_unused, yaml_missing, code_orphan, path_mismatch, orphaned_dataclass, unused_default
severity str error, warning, or info
param_name str Dotted parameter path (e.g. timeouts.http_client)
description str Human-readable description
file_path str Source file where the issue was found
line_number int Line number in source file
suggestion str Fix suggestion (when available)
metadata Dict[str, Any] Additional context

AnalyzerConfig

Dataclass for domain-agnostic analyzer configuration.

Field Type Default Description
config_files List[str] ["config.yaml"] YAML config files to analyze
schema_files List[str] ["src/config/unified_config.py"] Pydantic/dataclass schema files
source_dirs List[str] ["src/"] Directories to scan for config usage
exclude_dirs List[str] ["tests/", "scripts/", "docs/"] Directories to exclude from scanning
test_dirs List[str] ["tests/"] Test directories (scanned separately)
root_dir Optional[str] None Project root (auto-detected if not set)
access_patterns List[str] regex list Patterns for typed config access (cfg.section.field)
raw_access_patterns List[str] regex list Patterns for raw dict access (_raw.get("key"))
dynamic_patterns List[str] regex list Patterns indicating dynamic access (excluded from detection)

YAMLExtractor

Extracts all leaf keys and environment variable references from YAML config files.

Method Returns Description
extract(config_path) Tuple[Dict[str, Any], Set[str]] Returns (flat_keys dict, env_var_keys set)
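flat_keys maps dotted leaf paths to values. A sketch of the flattening over a plain dict (the real extractor parses YAML files first; env-var handling is omitted):

```python
def flatten_keys(mapping, prefix=""):
    """Flatten nested config into dotted leaf keys, the shape of
    YAMLExtractor's flat_keys dict (illustrative)."""
    flat = {}
    for key, value in mapping.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten_keys(value, path))  # recurse into sections
        else:
            flat[path] = value
    return flat

config = {"timeouts": {"http_client": 30}, "cache": {"size": 128}}
print(flatten_keys(config))
# {'timeouts.http_client': 30, 'cache.size': 128}
```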

ASTExtractor

Parses Python schema files via AST to extract dataclass fields and @property definitions.

Method Returns Description
extract(schema_path) Tuple[Dict, Dict, Set] Returns (dataclass_fields, property_map, property_names)

CodeScanner

Scans Python source files for config access patterns with context-aware cfg variable tracking. Distinguishes between variables holding the full UnifiedConfig object vs sub-properties.

Method Returns Description
scan(source_dirs, exclude_dirs) Tuple[Dict, Set, bool] Returns (references, raw_keys, has_dynamic_access)

Scoped Analysis Configuration

Module: src/analysis/scope_config.py

Provides path-aware analysis policy selection so one project can apply different rules to main, tests, vendor, or other file groups.

AnalysisScope

Dataclass representing one named analysis scope.

Field Type Description
name str Scope name
paths list[str] Glob patterns matched against normalized file paths
rules str | list[str] Rule mode such as all, critical_only, taint_only, or explicit rule IDs
severity_threshold str Minimum severity accepted in this scope
suppress_policy str Suppression policy identifier
enabled bool Whether the scope is active

Method Returns Description
matches(file_path) bool Check whether a file belongs to this scope
to_dict() dict Serialize the scope

ScopeConfig

Dataclass storing all configured scopes for a project.

Field Type Description
scopes list[AnalysisScope] Ordered list of configured scopes
default_scope str Fallback scope name

Method Returns Description
from_yaml(path) ScopeConfig Load scope rules from YAML
from_config_dict(config_dict) ScopeConfig Build config from a nested dict
resolve_scope(file_path) AnalysisScope | None Resolve the effective scope for a file
should_report(file_path, severity, rule_id) bool Decide whether a finding should be reported
to_dict() dict Serialize the full config
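resolve_scope walks the ordered scope list and returns the first enabled scope whose patterns match. A minimal sketch of those semantics (fnmatch-based glob matching is an assumption, and the scope data is illustrative):

```python
from fnmatch import fnmatch

# Illustrative scopes in the AnalysisScope shape (name, paths, enabled).
SCOPES = [
    {"name": "tests", "paths": ["tests/*", "*/test_*.py"], "enabled": True},
    {"name": "vendor", "paths": ["vendor/*"], "enabled": True},
    {"name": "main", "paths": ["src/*"], "enabled": True},
]

def resolve_scope(file_path, scopes=SCOPES, default="main"):
    """First enabled scope whose glob patterns match wins."""
    normalized = file_path.replace("\\", "/")  # normalize path separators
    for scope in scopes:
        if scope["enabled"] and any(fnmatch(normalized, p) for p in scope["paths"]):
            return scope["name"]
    return default

print(resolve_scope("tests/test_taint.py"))      # tests
print(resolve_scope("src/analysis/explain.py"))  # main
```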

See Also