Comprehensive documentation for the code analysis modules in src/analysis/.
Last updated: 2026-03-07
Table of Contents¶
- Overview
- Module Architecture
- Module-to-Scenario Map
- Control Flow Analysis
- CFGAnalyzer
- UnreachableCodeDetector
- PatchControlFlowAnalyzer
- Data Flow Analysis
- DataFlowTracer
- TypePropagator
- PointerAliasAnalyzer
- PathConstraintTracker
- Taint Analysis
- TaintPropagator
- FieldSensitiveTracer
- FieldSensitiveTracker
- ContextSensitiveTracker
- InterProcTracker
- Security-Specific Analyzers
- MemoryLifetimeAnalyzer
- NullCheckAnalyzer
- CodeStringTracer
- InfoDisclosureAnalyzer
- ComplianceMapper
- Call Graph Analysis
- CallGraphAnalyzer
- Concurrency Analysis
- ConcurrencyAnalyzer
- LockAnalyzerMixin
- RaceDetectorMixin
- SharedAccessAnalyzerMixin
- AtomicOperationsAnalyzerMixin
- Clone Detection
- ASTCloneDetector
- Pattern Engine
- LLMPatternGenerator
- PatternTaintBridge
- Autofix Engine
- AutofixEngine
- AutofixGenerator
- DiffValidator
- PromptBuilder
- SSRAutofixBridge
- Patch Review Analyzers
- PatchCallGraphAnalyzer
- PatchDataFlowAnalyzer
- PatchDependencyAnalyzer
- Additional Data Flow Modules
- RaceConditionAnalyzer (dataflow)
- Data Model Modules
- Database Schema Reference
- Key Tables for Analysis
- Edge Types for Dataflow
- Troubleshooting
- “Method not found”
- “No CFG data found”
- “No field accesses found”
- “No taint paths found”
- Performance Considerations
- Method Explanation Engine
- ExplainResult
- ExplainAnalyzer
- Config Orphan Analysis
- ConfigOrphanAnalyzer
- OrphanFinding
- AnalyzerConfig
- YAMLExtractor
- ASTExtractor
- CodeScanner
- Scoped Analysis Configuration
- AnalysisScope
- ScopeConfig
- See Also
Overview¶
The analysis modules provide advanced static analysis capabilities on top of the CPG (Code Property Graph) stored in DuckDB. They are organized into subpackages:
src/analysis/
├── cfg_analyzer.py # Control flow graph
├── cfg_unreachable.py # Unreachable code detection
├── clone_detector.py # Code clone detection
├── compliance.py # Security compliance mapping
├── explain.py # Method analysis engine
├── concurrency_core.py # Concurrency analysis (4 mixins)
├── field_sensitive_tracer.py # Field-sensitive taint analysis
├── callgraph/ # Call graph analysis (8 modules)
├── dataflow/ # Data flow analysis (12 modules)
│ └── taint/ # Taint propagation engine (7 modules)
├── autofix/ # Automated fix generation (5 modules)
└── patterns/ # Pattern engine (2 modules)
Module Architecture¶
graph TD
subgraph "Control Flow"
CFG[CFGAnalyzer] --> PatchCF[PatchControlFlowAnalyzer]
CFG --> Unreachable[UnreachableCodeDetector]
end
subgraph "Data Flow"
DFBase[BaseTracer] --> DFTracer[DataFlowTracer]
DFBase --> TypeProp[TypePropagator]
DFBase --> PtrAlias[PointerAliasAnalyzer]
end
subgraph "Taint Analysis"
TaintProp[TaintPropagator] --> FieldSensTracker[FieldSensitiveTracker]
TaintProp --> ContextSens[ContextSensitiveTracker]
TaintProp --> InterProc[InterProcTracker]
TaintProp --> DFTracker[DataflowTracker]
TaintProp --> SymExec[PathConstraintTracker]
DFTracer --> FieldTracer[FieldSensitiveTracer]
end
subgraph "Call Graph"
CGA[CallGraphAnalyzer] --> PathFind[PathFinder]
CGA --> Impact[ImpactAnalyzer]
CGA --> Centrality[CentralityAnalyzer]
CGA --> Components[ComponentAnalyzer]
CGA --> CrossLang[CrossLanguageAnalyzer]
CGA --> Complexity[ComplexityAnalyzer]
end
subgraph "Pattern + Autofix"
Patterns[LLMPatternGenerator] --> Bridge[PatternTaintBridge]
Bridge --> TaintProp
AutofixEng[AutofixEngine] --> AutofixGen[AutofixGenerator]
AutofixEng --> DiffVal[DiffValidator]
AutofixEng --> SSR[SSRAutofixBridge]
end
Module-to-Scenario Map¶
| Module | Subpackage | Type | Key Scenarios | DuckPGQ |
|---|---|---|---|---|
| CFGAnalyzer | root | analyzer | S05, S06, S13 (refactoring, performance) | No |
| UnreachableCodeDetector | root | analyzer | S05, S13 (dead code) | No |
| ASTCloneDetector | root | analyzer | S07, S13 (refactoring) | No |
| ComplianceMapper | root | mapper | S02, S08 (security, compliance) | No |
| ConcurrencyAnalyzer | root | analyzer | S16 (concurrency) | No |
| FieldSensitiveTracer | root | analyzer | S02, S08, S14 (security) | No |
| DataFlowTracer | dataflow | facade | S02, S14 (security, incident) | No |
| TypePropagator | dataflow | analyzer | S02 (type confusion) | No |
| PointerAliasAnalyzer | dataflow | analyzer | S02 (use-after-free) | No |
| MemoryLifetimeAnalyzer | dataflow | analyzer | S02, S14 (memory safety) | No |
| NullCheckAnalyzer | dataflow | analyzer | S02, S05 (null deref) | No |
| CodeStringTracer | dataflow | analyzer | S02 (code injection) | No |
| InfoDisclosureAnalyzer | dataflow | analyzer | S02 (info leak) | No |
| TaintPropagator | dataflow/taint | engine | S02, S14 (security) | No |
| CallGraphAnalyzer | callgraph | facade | S01, S12, S14 (onboarding, cross-repo) | Yes |
| PathFinder | callgraph | analyzer | S01, S14 (call chains) | Yes |
| CentralityAnalyzer | callgraph | analyzer | S05, S12 (hotspots) | Yes |
| ComponentAnalyzer | callgraph | analyzer | S12, S13 (components) | Yes |
| ImpactAnalyzer | callgraph | analyzer | S09, S14 (change impact) | No |
| CrossLanguageAnalyzer | callgraph | analyzer | S12 (cross-repo) | No |
| LLMPatternGenerator | patterns | generator | S21 (pattern search) | No |
| PatternTaintBridge | patterns | bridge | S02, S21 (security) | No |
| AutofixEngine | autofix | engine | S02 (security fix) | No |
| PatchControlFlowAnalyzer | patch_review | analyzer | S09 (patch review) | No |
Control Flow Analysis¶
CFGAnalyzer¶
File: src/analysis/cfg_analyzer.py
Scenarios: S05, S06, S13 (refactoring, performance, mass-refactoring)
CFG-based analysis using the edges_cfg table for accurate control flow analysis.
Key Classes¶
@dataclass
class CFGStructure:
"""Represents the CFG structure of a method."""
method_name: str
method_full_name: str
nodes: List[int]
edges: List[Tuple[int, int]] # (src, dst) pairs
entry_nodes: List[int]
exit_nodes: List[int]
node_count: int
edge_count: int
@dataclass
class CFGPath:
"""Represents an execution path through the CFG."""
path_id: str
nodes: List[int]
length: int
has_loop: bool = False
API Reference¶
CFGAnalyzer(cpg_service)¶
Initialize with CPGQueryService or DuckDB connection.
get_method_cfg(method_name: str) -> Optional[CFGStructure]¶
Get the CFG structure for a method.
from src.analysis.cfg_analyzer import CFGAnalyzer
analyzer = CFGAnalyzer(cpg_service)
cfg = analyzer.get_method_cfg("heap_insert")
print(f"Nodes: {cfg.node_count}, Edges: {cfg.edge_count}")
compute_cyclomatic_complexity(method_name: str) -> int¶
Calculate McCabe cyclomatic complexity: M = E - N + 2
complexity = analyzer.compute_cyclomatic_complexity("heap_insert")
print(f"Complexity: {complexity}") # e.g., 15
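As a standalone illustration of the formula (independent of the real analyzer; the counts below are invented):

```python
# Sketch of the McCabe formula M = E - N + 2P behind
# compute_cyclomatic_complexity. Counts are made up for illustration.
def mccabe(edge_count: int, node_count: int, components: int = 1) -> int:
    return edge_count - node_count + 2 * components

# A CFG with 16 edges and 13 nodes in one connected component:
print(mccabe(16, 13))  # 5
```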
enumerate_paths(method_name: str, max_paths: int = 100, max_depth: int = 50) -> List[CFGPath]¶
Find execution paths through the CFG with cycle detection.
paths = analyzer.enumerate_paths("process_query", max_paths=50)
for path in paths:
print(f"Path {path.path_id}: {path.length} nodes, loop={path.has_loop}")
find_dominators(method_name: str) -> Dict[int, Set[int]]¶
Compute dominator tree using edges_dominate table.
find_post_dominators(method_name: str) -> Dict[int, Set[int]]¶
Compute post-dominator tree using edges_post_dominate table.
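The relation these methods return can be sketched with the classic iterative dataflow algorithm on a toy diamond CFG (the real methods read precomputed edges_dominate / edges_post_dominate rows rather than computing this):

```python
# Minimal sketch of dominator computation on a toy CFG. Node ids and
# edges below are invented for illustration.
def dominators(nodes, edges, entry):
    preds = {n: [s for s, d in edges if d == n] for n in nodes}
    dom = {n: set(nodes) for n in nodes}
    dom[entry] = {entry}
    changed = True
    while changed:
        changed = False
        for n in nodes:
            if n == entry:
                continue
            new = set(nodes)
            for p in preds[n]:
                new &= dom[p]  # intersect dominators of all predecessors
            new |= {n}
            if new != dom[n]:
                dom[n], changed = new, True
    return dom

# Diamond CFG: 1 -> 2, 1 -> 3, 2 -> 4, 3 -> 4
d = dominators([1, 2, 3, 4], [(1, 2), (1, 3), (2, 4), (3, 4)], entry=1)
print(d[4])  # {1, 4}: the join node is dominated only by entry and itself
```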
get_cfg_successors(node_id: int) -> List[int]¶
Get CFG successor nodes.
get_cfg_predecessors(node_id: int) -> List[int]¶
Get CFG predecessor nodes.
get_control_flow_paths(source_node: int, sink_node: int, max_depth: int = 20) -> List[List[int]]¶
Find all paths between two CFG nodes.
analyze_complexity_distribution(threshold: int = 10) -> Dict[str, Any]¶
Analyze complexity across all methods in the codebase.
dist = analyzer.analyze_complexity_distribution()
print(f"Average complexity: {dist['average']}")
print(f"High complexity methods: {dist['high_complexity_methods']}")
Database Tables Used¶
- nodes_method — Method metadata
- edges_contains — Method-to-node containment
- edges_cfg — CFG edges between nodes
- edges_dominate — Dominator relationships
- edges_post_dominate — Post-dominator relationships
Performance Notes¶
- Path enumeration is bounded by max_paths (default 100) and max_depth (default 50)
- Complexity distribution scans all methods — may be slow on large codebases (>50K methods)
UnreachableCodeDetector¶
File: src/analysis/cfg_unreachable.py
Scenarios: S05, S13 (dead code detection)
Detects unreachable code using CFG analysis: code after return statements, exit() calls, and calls to noreturn functions.
Key Classes¶
@dataclass
class UnreachableCodeFinding:
"""Represents a finding of unreachable code."""
method_name: str
method_id: int
filename: str
line_number: int
terminating_type: str # 'return', 'exit_call', 'error_call', 'noreturn'
terminating_node_id: int
terminating_line: int
unreachable_node_id: int
unreachable_code: Optional[str] = None
confidence: float = 0.9
API Reference¶
UnreachableCodeDetector(cpg_service)¶
Initialize with CPGQueryService instance.
detect_unreachable_code() -> List[UnreachableCodeFinding]¶
Find all unreachable code patterns in the codebase.
from src.analysis.cfg_unreachable import UnreachableCodeDetector
detector = UnreachableCodeDetector(cpg_service)
findings = detector.detect_unreachable_code()
for f in findings:
print(f"{f.filename}:{f.line_number} — unreachable after {f.terminating_type}")
Database Tables Used¶
- edges_cfg — CFG edges to check successor reachability
- nodes_method — Method metadata
- nodes_return — Return statement nodes
PatchControlFlowAnalyzer¶
File: src/patch_review/analyzers/control_flow_analyzer.py
Scenarios: S09 (patch review)
Analyzes control flow impact of patches using CFGAnalyzer for accurate metrics.
Key Classes¶
@dataclass
class NewLoopFinding:
method_name: str
loop_type: str
line_number: int
is_nested: bool
has_io: bool
is_unbounded: bool
severity: Severity # HIGH, MEDIUM, LOW
details: str
@dataclass
class ErrorHandlingChange:
method_name: str
change_type: str
error_type: str
line_number: int
details: str
@dataclass
class BranchCoverageImpact:
new_branches: int
removed_branches: int
net_change: int
methods_with_new_branches: List[str]
uncovered_paths: List[Dict[str, Any]]
API Reference¶
PatchControlFlowAnalyzer(conn, delta_cpg=None)¶
Initialize with DuckDB connection and optional DeltaCPG.
analyze_control_flow_changes(patch, delta_cpg) -> ControlFlowAnalysisResult¶
Full control flow analysis of a patch.
from src.patch_review.analyzers.control_flow_analyzer import PatchControlFlowAnalyzer
analyzer = PatchControlFlowAnalyzer(conn)
result = analyzer.analyze_control_flow_changes(patch, delta_cpg)
print(f"Complexity delta: {result.complexity_delta}")
print(f"New loops: {len(result.new_loops)}")
analyze_complexity_change(changed_methods) -> List[ComplexityDelta]¶
Calculate complexity before/after for changed methods.
detect_new_loops(changed_methods) -> List[NewLoopFinding]¶
Detect newly introduced loops with risk classification.
analyze_error_handling_changes(changed_methods) -> List[ErrorHandlingChange]¶
Track changes to error handling code.
analyze_branch_coverage_impact(changed_methods) -> BranchCoverageImpact¶
Estimate branch coverage impact of changes.
Loop Severity Classification¶
- HIGH: Nested loops, loops with I/O, unbounded loops
- MEDIUM: Loops with external calls
- LOW: Simple bounded loops
Data Flow Analysis¶
DataFlowTracer¶
File: src/analysis/dataflow/tracer.py
Scenarios: S02, S14 (security, incident response)
Main data flow analysis using REACHING_DEF edges. Extends BaseTracer (src/analysis/dataflow/base.py).
Key Classes¶
@dataclass
class DataFlowPath:
"""A data flow path from definition to use."""
path_id: str
variable_name: str
source_location: Dict[str, Any]
sink_location: Dict[str, Any]
path_length: int
intermediate_nodes: List[Dict[str, Any]] = field(default_factory=list)
is_inter_procedural: bool = False
sanitization_points: List[Dict[str, Any]] = field(default_factory=list)
@dataclass
class VariableFlow:
"""All flows of a variable across the codebase."""
variable_name: str
definition_points: List[Dict[str, Any]] = field(default_factory=list)
use_points: List[Dict[str, Any]] = field(default_factory=list)
flows: List[DataFlowPath] = field(default_factory=list)
API Reference¶
DataFlowTracer(cpg_service)¶
Initialize with CPGQueryService.
trace_variable(variable_name: str, method_name: Optional[str] = None, max_depth: Optional[int] = None) -> VariableFlow¶
Trace all flows of a variable using REACHING_DEF edges.
from src.analysis.dataflow.tracer import DataFlowTracer
tracer = DataFlowTracer(cpg_service)
flow = tracer.trace_variable("user_input", method_name="process_request")
print(f"Definitions: {len(flow.definition_points)}, Uses: {len(flow.use_points)}")
find_reaching_definitions(node_id: int) -> List[Dict]¶
Find all definitions reaching a use site.
find_variable_uses(node_id: int) -> List[Dict]¶
Find all uses of a definition.
trace_inter_procedural(variable_name: str, max_depth: int = 5) -> List[DataFlowPath]¶
Trace data flow across function boundaries.
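The traversal behind this method can be sketched as a depth-bounded BFS over def-use edges that also crosses call boundaries; the variable names and edges below are invented for illustration, not taken from the real CPG:

```python
from collections import deque

# Toy sketch of a bounded inter-procedural reachability walk over
# reaching-definition and argument edges.
def reach(start, edges, max_depth=5):
    seen, frontier, order = {start}, deque([(start, 0)]), []
    while frontier:
        node, depth = frontier.popleft()
        order.append(node)
        if depth == max_depth:
            continue  # depth bound reached; stop expanding this branch
        for src, dst in edges:
            if src == node and dst not in seen:
                seen.add(dst)
                frontier.append((dst, depth + 1))
    return order

# "user_input" flows into a callee parameter, then toward a sink:
edges = [("user_input", "param_buf"), ("param_buf", "query"), ("query", "exec")]
print(reach("user_input", edges, max_depth=2))
# visits user_input, param_buf, query (exec lies beyond depth 2)
```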
Database Tables Used¶
- nodes_identifier — Variable references
- edges_reaching_def — Definition-use chains
- edges_argument — Function argument edges
TypePropagator¶
File: src/analysis/dataflow/type_propagator.py
Scenarios: S02 (type confusion vulnerabilities)
Tracks type transformations along data flow paths (casts, promotions, truncations).
Key Classes¶
@dataclass
class TypeTransformation:
"""A type transformation at a specific point."""
node_id: int
from_type: str
to_type: str
transformation_kind: str # 'cast', 'promotion', 'truncation', 'reinterpret'
is_safe: bool
line_number: int
@dataclass
class TypeFlow:
"""Type propagation along a data flow path."""
variable_name: str
initial_type: str
final_type: str
transformations: List[TypeTransformation]
has_unsafe_cast: bool
has_truncation: bool
API Reference¶
TypePropagator(cpg_service)¶
trace_type_flow(variable_name: str, method_name: Optional[str] = None) -> TypeFlow¶
Track type changes along a variable’s data flow.
find_unsafe_casts(limit: int = 100) -> List[TypeTransformation]¶
Find potentially unsafe type casts.
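The truncation case can be sketched as a width comparison; the widths below are assumed (typical LP64), while the real propagator reads type information from the CPG:

```python
# Minimal sketch of the truncation classification: a cast narrows if the
# destination type is smaller than the source. Widths are assumptions.
WIDTH = {"char": 8, "short": 16, "int": 32, "long": 64}

def is_truncation(from_type: str, to_type: str) -> bool:
    return WIDTH.get(from_type, 0) > WIDTH.get(to_type, 0)

print(is_truncation("long", "int"))  # True: 64 -> 32 bits loses high bits
print(is_truncation("int", "long"))  # False: widening promotion
```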
PointerAliasAnalyzer¶
File: src/analysis/dataflow/pointer_alias.py
Scenarios: S02 (use-after-free, double-free)
Pointer alias analysis for tracking which pointers may refer to the same memory.
Key Classes¶
@dataclass
class AllocationSite:
"""Represents a memory allocation site."""
node_id: int
function_name: str # malloc, calloc, etc.
variable_name: str
line_number: int
API Reference¶
PointerAliasAnalyzer(cpg_service)¶
find_aliases(variable_name: str) -> List[str]¶
Find all variables that may alias the given pointer.
trace_allocation_lifetime(alloc_site: AllocationSite) -> Dict¶
Track an allocation from creation to deallocation.
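The grouping behind find_aliases can be sketched with union-find over simple pointer assignments (p = q places p and q in one may-alias set); the variable names below are invented, and the real analyzer works on CPG nodes rather than strings:

```python
# Rough sketch of may-alias grouping via union-find over assignments.
def alias_sets(assignments):
    parent = {}

    def find(x):
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path compression
            x = parent[x]
        return x

    for a, b in assignments:
        parent[find(a)] = find(b)  # union the two sets
    groups = {}
    for v in parent:
        groups.setdefault(find(v), set()).add(v)
    return list(groups.values())

sets = alias_sets([("p", "q"), ("r", "q"), ("s", "t")])
print(sets)  # {p, q, r} may share one allocation; {s, t} another
```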
PathConstraintTracker¶
File: src/analysis/dataflow/symbolic_execution.py
Scenarios: S02 (path feasibility in taint analysis)
Lightweight symbolic execution for checking path feasibility using Z3 solver.
Key Classes¶
class SymbolicExecutionConfig:
"""Configuration for symbolic execution engine."""
enabled: bool = True
max_constraints: int = 20
solver_timeout_ms: int = 500
solver_timeout_uf_ms: int = 2000
max_parse_depth: int = 10
enable_function_models: bool = True
enable_arithmetic: bool = True
@dataclass
class PathConstraint:
"""A constraint on an execution path."""
node_id: int
condition: str
is_true_branch: bool
variables: List[str]
class PathConstraintTracker:
"""Tracks path constraints for feasibility analysis."""
API Reference¶
PathConstraintTracker(cpg_service)¶
check_path_feasibility(path_nodes: List[int]) -> bool¶
Check whether a data flow path is feasible given branch conditions.
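The real tracker hands branch conditions to Z3, but the kind of contradiction it prunes can be sketched without a solver using interval reasoning over a single variable (the constraint tuples below are an invented toy syntax):

```python
# Simplified stand-in for check_path_feasibility: a path is infeasible
# when its branch conditions contradict, e.g. x > 0 followed by x <= 0.
def feasible(constraints):
    lo, hi = float("-inf"), float("inf")
    for op, v in constraints:
        if op == ">":
            lo = max(lo, v)   # x must exceed lo
        elif op == "<=":
            hi = min(hi, v)   # x must not exceed hi
    return lo < hi  # some real x satisfies lo < x <= hi

print(feasible([(">", 0), ("<=", 10)]))  # True
print(feasible([(">", 0), ("<=", 0)]))   # False: contradictory branches
```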
Taint Analysis¶
TaintPropagator¶
File: src/analysis/dataflow/taint/propagator.py
Scenarios: S02, S14 (security analysis, incident response)
Main engine for taint analysis. Composes all taint sub-modules into a unified pipeline.
Key Classes¶
@dataclass
class TaintNode:
"""A node in a taint path."""
node_id: int
name: str
code: str
line_number: int
filename: str
node_type: str
@dataclass
class TaintPath:
"""A complete taint path from source to sink."""
source: TaintNode
sink: TaintNode
intermediate: List[TaintNode]
path_length: int
confidence: float
sink_category: str
is_sanitized: bool = False
sanitization_point: Optional[TaintNode] = None
API Reference¶
TaintPropagator(cpg_service, enable_inter_proc=True, enable_control_flow=True, enable_field_sensitive=True, enable_context_sensitive=True, enable_symbolic_execution=True)¶
Initialize with CPG service and optional feature toggles.
from src.analysis.dataflow.taint.propagator import TaintPropagator
propagator = TaintPropagator(cpg_service)
paths = propagator.find_taint_paths(
sources=["getenv", "fgets", "read"],
sinks=["system", "exec", "popen"]
)
for path in paths:
print(f"Source: {path.source.name} -> Sink: {path.sink.name}")
print(f"Confidence: {path.confidence}, Sanitized: {path.is_sanitized}")
find_taint_paths(sources, sinks, max_depth=None) -> List[TaintPath]¶
Find taint paths from source functions to sink functions.
analyze_sql_injections() -> List[TaintPath]¶
Convenience method for SQL injection detection.
Sub-modules¶
| Sub-module | File | Purpose |
|---|---|---|
| DataflowTracker | tracker.py | Core BFS-based taint propagation |
| InterProcTracker | interprocedural.py | Cross-function call tracking |
| FieldSensitiveTracker | field_sensitive.py | Per-field taint tracking |
| ContextSensitiveTracker | context_sensitive.py | Call-site context awareness |
| ControlFlowAnalyzer | control_flow.py | Control-dependency analysis |
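The BFS propagation that DataflowTracker performs can be sketched on a toy edge list: every path from a source to a sink is reported, flagged as sanitized if it crossed a known sanitizer. Function names and edges below are invented for illustration:

```python
from collections import deque

# Toy version of the propagation loop: BFS from a source call site,
# reporting paths that reach a sink and whether they passed a sanitizer.
def taint_paths(edges, source, sinks, sanitizers):
    results = []
    queue = deque([(source, [source], False)])
    while queue:
        node, path, sanitized = queue.popleft()
        if node in sinks:
            results.append((path, sanitized))
            continue
        for s, d in edges:
            if s == node and d not in path:  # avoid cycles
                queue.append((d, path + [d], sanitized or d in sanitizers))
    return results

edges = [("getenv", "buf"), ("buf", "escape"), ("escape", "system"),
         ("buf", "system")]
for path, sanitized in taint_paths(edges, "getenv", {"system"}, {"escape"}):
    print(" -> ".join(path), "| sanitized:", sanitized)
```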
FieldSensitiveTracer¶
File: src/analysis/field_sensitive_tracer.py
Scenarios: S02, S08, S14 (security, compliance, incident response)
Top-level field-path tracking for precise taint analysis. Distinguishes between different fields of the same object (e.g., user.password vs user.name).
Note: Not to be confused with FieldSensitiveTracker (dataflow/taint/field_sensitive.py), which is a lower-level taint tracker used internally by TaintPropagator.
Key Classes¶
@dataclass
class FieldPath:
"""Represents a field access path like obj.field1.field2."""
base_variable: str
field_chain: List[str]
full_path: str
node_ids: List[int] = field(default_factory=list)
type_full_name: Optional[str] = None
@classmethod
def from_code(cls, code: str) -> "FieldPath": ...
def matches(self, other: "FieldPath") -> Tuple[bool, str]: ...
@dataclass
class FieldAccess:
"""Represents a single field access in code."""
node_id: int
base_variable: str
field_name: str
access_code: str
line_number: int
filename: str
access_type: str = "read" # 'read', 'write', 'call'
containing_method: Optional[str] = None
@dataclass
class FieldSensitiveFlow:
"""A dataflow path with field sensitivity."""
source_path: FieldPath
sink_path: FieldPath
intermediate_fields: List[FieldPath]
is_tainted: bool
relationship: str # 'exact', 'prefix', 'suffix', 'propagated'
confidence: float = 1.0
API Reference¶
FieldSensitiveTracer(cpg_service)¶
Initialize with CPGQueryService or DuckDB connection.
parse_field_path(code: str) -> FieldPath¶
Parse field access string into structured FieldPath.
from src.analysis.field_sensitive_tracer import FieldSensitiveTracer
tracer = FieldSensitiveTracer(cpg_service)
path = tracer.parse_field_path("user->password")
print(path.base_variable) # "user"
print(path.field_chain) # ["password"]
path = tracer.parse_field_path("request.data.buffer")
print(path.full_path) # "request.data.buffer"
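The parsing itself can be sketched in a few lines: split on the C accessor tokens "->" and "." to recover the base variable and field chain. The helper name below is hypothetical, not the module's API:

```python
import re

# Standalone sketch of the splitting parse_field_path performs.
def split_field_path(code: str):
    parts = re.split(r"->|\.", code)
    return parts[0], parts[1:]

print(split_field_path("user->password"))       # ('user', ['password'])
print(split_field_path("request.data.buffer"))  # ('request', ['data', 'buffer'])
```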
get_struct_fields(type_name: str) -> List[Dict[str, Any]]¶
Get fields defined in a struct with type information.
fields = tracer.get_struct_fields("UserData")
for field in fields:
print(f"{field['name']}: {field['type']}")
find_field_accesses(base_variable: str, field_name: Optional[str] = None) -> List[FieldAccess]¶
Find all accesses to a specific field.
accesses = tracer.find_field_accesses("user", "password")
for access in accesses:
print(f"{access.filename}:{access.line_number} - {access.access_type}")
find_all_field_identifiers(field_name: Optional[str] = None, limit: int = 100) -> List[Dict]¶
Find all field identifier nodes, optionally filtered by name.
trace_field_taint(source_variable: str, source_field: Optional[str] = None, sink_patterns: Optional[List[str]] = None, max_depth: int = 10) -> List[FieldSensitiveFlow]¶
Trace taint from a source field to sink functions.
flows = tracer.trace_field_taint(
source_variable="credentials",
source_field="password",
sink_patterns=["printf", "log", "send"]
)
for flow in flows:
print(f"Tainted flow: {flow.source_path.full_path} -> {flow.sink_path.full_path}")
find_sensitive_field_flows(sensitive_fields: List[str] = None, sink_functions: List[str] = None) -> List[Dict[str, Any]]¶
Find flows from sensitive fields to dangerous sinks.
# Default sensitive fields: password, token, secret, private_key, credential, auth
flows = tracer.find_sensitive_field_flows()
print(f"Found {len(flows)} potential sensitive data exposures")
Sensitive Field Categories¶
Default sensitive fields tracked:
- password, passwd, pwd
- token, auth_token, access_token
- secret, api_secret, client_secret
- private_key, secret_key
- credential, credentials
- auth, authorization
Database Tables Used¶
- nodes_field_identifier — Field access nodes
- nodes_identifier — Variable identifiers
- nodes_member — Struct member definitions
- edges_reaching_def — Reaching definition edges
- edges_argument — Function argument edges
FieldSensitiveTracker¶
File: src/analysis/dataflow/taint/field_sensitive.py
Scenarios: S02, S14 (internal use by TaintPropagator)
Low-level per-field taint tracking used internally by TaintPropagator. Tracks which specific fields of an object are tainted vs clean.
Key Classes¶
@dataclass
class FieldTaint:
"""Taint status of a single field."""
field_name: str
is_tainted: bool
source_node: Optional[int] = None
class FieldTaintMap:
"""Maps object fields to their taint status."""
def set_taint(self, base_var: str, field: str, tainted: bool): ...
def is_tainted(self, base_var: str, field: str) -> bool: ...
def get_tainted_fields(self, base_var: str) -> List[str]: ...
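One plausible implementation of the documented interface, written only from the signatures above (the real class may differ internally):

```python
# Sketch of FieldTaintMap: per-(object, field) taint flags, so that
# user.password can be tainted while user.name stays clean.
class FieldTaintMap:
    def __init__(self):
        self._taint = {}

    def set_taint(self, base_var: str, field: str, tainted: bool):
        self._taint[(base_var, field)] = tainted

    def is_tainted(self, base_var: str, field: str) -> bool:
        return self._taint.get((base_var, field), False)

    def get_tainted_fields(self, base_var: str):
        return [f for (b, f), t in self._taint.items() if b == base_var and t]

m = FieldTaintMap()
m.set_taint("user", "password", True)
m.set_taint("user", "name", False)
print(m.get_tainted_fields("user"))  # ['password']
```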
ContextSensitiveTracker¶
File: src/analysis/dataflow/taint/context_sensitive.py
Scenarios: S02 (precision improvement in taint analysis)
Tracks call-site context to avoid false positives when the same function is called with different taint states.
API Reference¶
ContextSensitiveTracker(cpg_service)¶
track_call_context(caller_id: int, callee_id: int) -> CallContext¶
Create a call context for taint propagation.
InterProcTracker¶
File: src/analysis/dataflow/taint/interprocedural.py
Scenarios: S02, S14 (cross-function taint tracking)
Tracks taint propagation across function call boundaries via parameter passing and return values.
API Reference¶
InterProcTracker(cpg_service)¶
find_callers(method_name: str) -> List[Dict]¶
Find all call sites for a method.
map_arguments(call_site_id: int) -> Dict[int, int]¶
Map actual arguments to formal parameters.
Security-Specific Analyzers¶
MemoryLifetimeAnalyzer¶
File: src/analysis/dataflow/memory_lifetime.py
Scenarios: S02 (use-after-free, double-free, memory leaks)
Tracks memory allocation/deallocation patterns to detect lifetime violations.
Key Classes¶
class MemoryState(Enum):
ALLOCATED = "allocated"
FREED = "freed"
UNKNOWN = "unknown"
@dataclass
class MemoryOperation:
node_id: int
operation: str # 'alloc', 'free', 'realloc'
function_name: str
variable_name: str
line_number: int
filename: str
@dataclass
class UseAfterFreePath:
alloc: MemoryOperation
free: MemoryOperation
use: MemoryOperation
path_length: int
@dataclass
class DoubleFreeePath:
alloc: MemoryOperation
first_free: MemoryOperation
second_free: MemoryOperation
API Reference¶
MemoryLifetimeAnalyzer(cpg_service)¶
find_use_after_free(limit: int = 50) -> List[UseAfterFreePath]¶
Detect use-after-free patterns.
find_double_free(limit: int = 50) -> List[DoubleFreeePath]¶
Detect double-free patterns.
find_memory_leaks(limit: int = 100) -> List[Dict]¶
Find allocations without corresponding frees.
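The lifetime checks can be sketched as a state machine walking operations on one pointer in program order, matching the MemoryState enum above. The operation sequences below are invented for illustration:

```python
# Toy state machine behind the lifetime checks: flag a use or a second
# free that occurs while the pointer is in the "freed" state.
def lifetime_findings(ops):
    state, findings = "unknown", []
    for op in ops:
        if op == "alloc":
            state = "allocated"
        elif op == "free":
            if state == "freed":
                findings.append("double_free")
            state = "freed"
        elif op == "use":
            if state == "freed":
                findings.append("use_after_free")
    return findings

print(lifetime_findings(["alloc", "free", "use"]))   # ['use_after_free']
print(lifetime_findings(["alloc", "free", "free"]))  # ['double_free']
print(lifetime_findings(["alloc", "use", "free"]))   # []
```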
NullCheckAnalyzer¶
File: src/analysis/dataflow/null_check.py
Scenarios: S02, S05 (null dereference)
Detects missing null checks after functions that can return NULL.
Key Classes¶
@dataclass
class NullCheckPath:
function_name: str
return_variable: str
dereference_line: int
dereference_file: str
has_null_check: bool
check_line: Optional[int] = None
API Reference¶
NullCheckAnalyzer(cpg_service)¶
find_missing_null_checks(limit: int = 100) -> List[NullCheckPath]¶
Find dereferences of values that may be NULL without prior checks.
CodeStringTracer¶
File: src/analysis/dataflow/code_string_tracer.py
Scenarios: S02 (code injection)
Traces string construction patterns to detect code injection (SQL, command, eval).
Key Classes¶
@dataclass
class CodeInjectionPoint:
function_name: str
string_variable: str
injection_type: str # 'sql', 'command', 'eval'
line_number: int
filename: str
user_input_source: Optional[str] = None
API Reference¶
CodeStringTracer(cpg_service)¶
find_string_concat_injections(sink_functions: List[str], limit: int = 50) -> List[CodeInjectionPoint]¶
Find string concatenation patterns that feed into sensitive sinks.
InfoDisclosureAnalyzer¶
File: src/analysis/dataflow/info_disclosure.py
Scenarios: S02 (information leakage)
Detects flows of sensitive data to output/logging functions.
API Reference¶
InfoDisclosureAnalyzer(cpg_service)¶
find_info_leaks(sensitive_patterns: List[str] = None, limit: int = 50) -> List[InfoDisclosurePath]¶
Find sensitive data flowing to output functions.
ComplianceMapper¶
File: src/analysis/compliance.py
Scenarios: S02, S08 (security compliance reporting)
Maps detected vulnerabilities to security standards (CWE, OWASP, CERT C, MISRA C).
Key Classes¶
@dataclass
class ComplianceFinding:
vulnerability_type: str
severity: str # 'critical', 'high', 'medium', 'low'
cwe_id: str
cwe_name: str
owasp_category: Optional[str]
cert_rule: Optional[str]
misra_rule: Optional[str]
file_path: str
line_number: int
function_name: str
description: str
recommendation: str
risk_score: float
@dataclass
class ComplianceReport:
scan_date: str
codebase: str
total_findings: int
findings_by_severity: Dict[str, int]
findings_by_cwe: Dict[str, int]
findings_by_owasp: Dict[str, int]
compliance_score: float # 0.0-1.0
findings: List[ComplianceFinding]
standards_used: List[str]
API Reference¶
ComplianceMapper()¶
Initialize (no external dependencies).
map_vulnerability(vuln_type, severity, file_path, line_number, function_name, description, risk_score) -> ComplianceFinding¶
Map a vulnerability to compliance standards.
from src.analysis.compliance import ComplianceMapper
mapper = ComplianceMapper()
finding = mapper.map_vulnerability(
vuln_type="sql_injection",
severity="high",
file_path="src/api.c",
line_number=42,
function_name="process_query",
description="User input concatenated into SQL query",
risk_score=8.5
)
print(f"{finding.cwe_id}: {finding.cwe_name}")
# CWE-89: Improper Neutralization of Special Elements used in an SQL Command
Call Graph Analysis¶
CallGraphAnalyzer¶
File: src/analysis/callgraph/analyzer.py
Scenarios: S01, S09, S12, S14 (onboarding, patch review, cross-repo, incident response)
Main facade composing all call graph analysis modules. Delegates to specialized sub-analyzers.
Architecture¶
CallGraphAnalyzer
├── PathFinder — shortest path, callers/callees
├── CentralityAnalyzer — PageRank, betweenness
├── ComponentAnalyzer — SCC, WCC components
├── ComplexityAnalyzer — CFG-based complexity
├── ImpactAnalyzer — change impact, entry points, attack paths
└── CrossLanguageAnalyzer — FFI boundary detection
API Reference¶
CallGraphAnalyzer(cpg_service)¶
find_shortest_path(source_method: str, target_method: str, max_depth: Optional[int] = None) -> Optional[CallPath]¶
Find shortest call chain between two methods.
from src.analysis.callgraph.analyzer import CallGraphAnalyzer
cga = CallGraphAnalyzer(cpg_service)
path = cga.find_shortest_path("handle_request", "exec_query")
if path:
print(f"Path length: {path.length}")
print(f"Methods: {' -> '.join(path.methods)}")
find_all_callers(method_name: str, max_depth: Optional[int] = None, direct_only: bool = False) -> List[str]¶
Find all methods that call the given method.
find_all_callees(method_name: str, max_depth: Optional[int] = None, direct_only: bool = False) -> List[str]¶
Find all methods called by the given method.
find_cross_language_calls(method_name=None, source_language=None, target_language=None) -> List[CrossLanguageCall]¶
Find cross-language call edges (FFI boundaries).
detect_cycles(max_cycle_length: int = 10) -> List[CallCycle]¶
Detect recursion cycles using Tarjan’s SCC algorithm.
analyze_impact(method_name: str, max_depth: Optional[int] = None) -> ImpactAnalysis¶
Analyze change impact (transitive callers, affected components).
get_call_statistics() -> Dict[str, Any]¶
Overall call graph statistics (total methods, calls, fan-out).
find_entry_points(method_name: str, max_depth: int = 10) -> List[Dict]¶
Find public API entry points that can reach a method.
trace_attack_paths(entry_points, vuln_method, vuln_file="", vuln_line=0, max_paths=5) -> List[AttackPath]¶
Trace attack paths from entry points to a vulnerability.
compute_pagerank(top_n: Optional[int] = None) -> List[Dict]¶
Compute PageRank scores for method importance ranking.
compute_betweenness_centrality(sample_size=None, top_n=None) -> List[Dict]¶
Compute betweenness centrality for bridge method identification.
find_hotspots(min_in_degree: int = 3, limit: int = 25) -> List[Dict]¶
Find performance hotspots (methods with high caller count).
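The ranking compute_pagerank produces can be sketched with a tiny power iteration over call edges (the real module queries the call graph in DuckDB; method names below are invented):

```python
# Tiny power-iteration PageRank over a toy call graph: heavily called
# methods accumulate rank from their callers.
def pagerank(edges, d=0.85, iters=50):
    nodes = sorted({n for e in edges for n in e})
    out = {n: [dst for src, dst in edges if src == n] for n in nodes}
    rank = {n: 1.0 / len(nodes) for n in nodes}
    for _ in range(iters):
        nxt = {n: (1 - d) / len(nodes) for n in nodes}
        for n in nodes:
            targets = out[n] or nodes  # dangling nodes spread rank evenly
            share = d * rank[n] / len(targets)
            for t in targets:
                nxt[t] += share
        rank = nxt
    return rank

# "exec_query" is called by both handlers, so it ranks highest:
r = pagerank([("handle_get", "exec_query"), ("handle_post", "exec_query")])
print(max(r, key=r.get))  # exec_query
```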
Key Data Models¶
@dataclass
class CallPath:
source: str
target: str
methods: List[str]
length: int
@dataclass
class CallCycle:
methods: List[str]
length: int
is_direct: bool
@dataclass
class ImpactAnalysis:
method_name: str
direct_callers: List[str]
transitive_callers: List[str]
affected_files: List[str]
impact_score: float
@dataclass
class AttackPath:
entry_point: str
vulnerability: str
path: List[str]
risk_amplification: float
@dataclass
class CrossLanguageCall:
caller: str
callee: str
caller_language: str
callee_language: str
call_type: str # 'ffi', 'cgo', 'jni', etc.
Sub-analyzers¶
| Module | File | Key Methods | DuckPGQ |
|---|---|---|---|
| PathFinder | callgraph/pathfinding.py | find_shortest_path, find_all_callers, find_all_callees | No |
| PGQPathFinder | callgraph/pathfinding.py | Same API, uses DuckPGQ MATCH syntax | Yes |
| CentralityAnalyzer | callgraph/centrality.py | compute_pagerank, compute_betweenness_centrality | No |
| PGQCentralityAnalyzer | callgraph/centrality.py | Same API, uses DuckPGQ graph queries | Yes |
| ComponentAnalyzer | callgraph/components.py | compute_scc, compute_wcc | No |
| PGQComponentAnalyzer | callgraph/components.py | Same API, DuckPGQ-accelerated | Yes |
| ComplexityAnalyzer | callgraph/complexity.py | compute_cyclomatic_complexity (CFG-based) | No |
| ImpactAnalyzer | callgraph/impact.py | analyze_impact, find_entry_points, trace_attack_paths | No |
| CrossLanguageAnalyzer | callgraph/cross_language.py | find_cross_language_calls | No |
Concurrency Analysis¶
ConcurrencyAnalyzer¶
File: src/analysis/concurrency_core.py
Scenarios: S16 (concurrency analysis)
Composed from four specialized mixins. Lock and shared-memory patterns are loaded from the domain plugin.
class ConcurrencyAnalyzer(
LockAnalyzerMixin,
RaceDetectorMixin,
SharedAccessAnalyzerMixin,
AtomicOperationsAnalyzerMixin,
): ...
API Reference¶
ConcurrencyAnalyzer(cpg_service)¶
Access all methods from the 4 mixins below.
from src.analysis.concurrency_core import ConcurrencyAnalyzer
analyzer = ConcurrencyAnalyzer(cpg_service)
races = analyzer.detect_race_conditions()
locks = analyzer.find_lock_usage(lock_type="lwlock")
stats = analyzer.get_concurrency_statistics()
Data Models¶
@dataclass
class LockUsage:
function_name: str
lock_type: str
lock_name: Optional[str]
operation: str
file_name: str
line_number: int
@dataclass
class RaceConditionPattern:
pattern_id: str
pattern_type: str # 'toctou', 'unprotected_access', 'signal_handler', 'double_check'
affected_functions: List[str]
shared_resource: str
severity: str
description: str
@dataclass
class SharedAccess:
variable_name: str
accessor_functions: List[str]
access_type: str
is_protected: bool
protecting_lock: Optional[str]
@dataclass
class LockOrderViolation:
violation_id: str
lock_a: str
lock_b: str
function_acquiring_a_then_b: str
function_acquiring_b_then_a: str
risk_level: str
LockAnalyzerMixin¶
File: src/analysis/lock_analyzer.py
- find_lock_usage(lock_type=None, function_name=None, limit=None) -> List[LockUsage]
- detect_lock_ordering_issues(limit=None) -> List[LockOrderViolation]
- detect_potential_deadlocks(limit=None) -> List[Dict]
- analyze_lock_graph() -> Dict[str, Any]
- get_lock_statistics() -> Dict[str, Any]
RaceDetectorMixin¶
File: src/analysis/race_detector.py
detect_race_conditions(pattern_types=None, limit=None) -> List[RaceConditionPattern]
Pattern types: toctou, signal_handler, unprotected_access, double_check.
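Findings can be grouped by pattern type for triage (e.g. reviewing TOCTOU races first). A minimal sketch, re-declaring the RaceConditionPattern dataclass from above locally so it runs standalone:

```python
from collections import Counter
from dataclasses import dataclass
from typing import List

# Local copy of the RaceConditionPattern dataclass documented above.
@dataclass
class RaceConditionPattern:
    pattern_id: str
    pattern_type: str  # 'toctou', 'unprotected_access', 'signal_handler', 'double_check'
    affected_functions: List[str]
    shared_resource: str
    severity: str
    description: str

def summarize_by_type(patterns: List[RaceConditionPattern]) -> Counter:
    """Count findings per pattern type for triage."""
    return Counter(p.pattern_type for p in patterns)

# Illustrative findings, not real analyzer output.
races = [
    RaceConditionPattern("r1", "toctou", ["open_file"], "path", "high", ""),
    RaceConditionPattern("r2", "unprotected_access", ["inc"], "counter", "medium", ""),
    RaceConditionPattern("r3", "toctou", ["stat_then_open"], "path", "high", ""),
]
counts = summarize_by_type(races)  # Counter({'toctou': 2, 'unprotected_access': 1})
```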
SharedAccessAnalyzerMixin¶
File: src/analysis/shared_access_analyzer.py
analyze_shared_access(variable_pattern=None, limit=None) -> List[SharedAccess]
AtomicOperationsAnalyzerMixin¶
File: src/analysis/atomic_operations_analyzer.py
- find_atomic_operations(limit: int = 100) -> List[Dict]
- find_condition_variables(limit: int = 50) -> List[Dict]
- analyze_function_concurrency(function_name: str) -> Dict
- get_concurrency_statistics() -> Dict[str, Any]
Clone Detection¶
ASTCloneDetector¶
File: src/analysis/clone_detector.py
Scenarios: S07, S13 (refactoring, duplicate detection)
Multi-level AST-based clone detection:
- Type-1: Exact clones (identical code)
- Type-2: Renamed clones (identifier changes only)
- Type-3: Structural clones (similar structure with modifications)
- Type-4: Semantic clones (different code, same behavior)
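The Type-2 idea can be illustrated with identifier normalization: rename every identifier to a canonical token so that clones differing only in names compare equal. This is a sketch of the concept, not the detector's actual normalization:

```python
import re

def normalize_identifiers(code: str) -> str:
    """Replace every non-keyword identifier with a canonical token so
    that renamed (Type-2) clones produce the same normalized form."""
    keywords = {"if", "else", "for", "while", "return", "int", "char"}
    def repl(m: "re.Match") -> str:
        tok = m.group(0)
        return tok if tok in keywords else "ID"
    return re.sub(r"[A-Za-z_][A-Za-z_0-9]*", repl, code)

a = "int total = count + offset;"
b = "int sum = items + base;"
# Both normalize to "int ID = ID + ID;", so they are Type-2 clones.
assert normalize_identifiers(a) == normalize_identifiers(b)
```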
Key Classes¶
@dataclass
class CloneResult:
method1_id: int
method1_name: str
method1_file: str
method2_id: int
method2_name: str
method2_file: str
similarity: float
clone_type: str # 'exact', 'renamed', 'structural', 'semantic'
shared_patterns: List[str] = field(default_factory=list)
line_count1: int = 0
line_count2: int = 0
API Reference¶
ASTCloneDetector(cpg_service)¶
detect_clones(min_similarity=None, category=None, max_methods=None, min_lines=None) -> List[CloneResult]¶
Detect code clones across the codebase.
from src.analysis.clone_detector import ASTCloneDetector
detector = ASTCloneDetector(cpg_service)
clones = detector.detect_clones(min_similarity=0.8, min_lines=10)
for clone in clones:
print(f"{clone.method1_name} ≈ {clone.method2_name} ({clone.clone_type}, {clone.similarity:.0%})")
detect_clones_for_category(category: str, min_similarity: Optional[float] = None) -> List[CloneResult]¶
Detect clones within a specific category (e.g., “null_check”, “string_operations”).
Performance Notes¶
- Pairwise comparison: O(n²) where n = number of methods
- Use max_methods to limit scope on large codebases
- Use min_lines to skip trivial methods
Pattern Engine¶
LLMPatternGenerator¶
File: src/analysis/patterns/llm_pattern_generator.py
Scenarios: S21 (pattern search)
LLM-assisted generation of YAML pattern rules from natural language descriptions.
Key Classes¶
@dataclass
class GeneratedRule:
"""Result of LLM rule generation."""
yaml_text: str
rule_id: str
language: str
has_fix: bool
validated: bool
validation_errors: List[str] = field(default_factory=list)
generation_attempts: int = 1
API Reference¶
LLMPatternGenerator()¶
Initialize (loads prompt language from global registry).
async generate_rule(description: str, language: str, domain: Optional[str] = None, with_fix: bool = True, max_retries: int = 3, examples: Optional[List[str]] = None) -> GeneratedRule¶
Generate a YAML rule from a natural language description. Async method.
from src.analysis.patterns.llm_pattern_generator import LLMPatternGenerator
generator = LLMPatternGenerator()
rule = await generator.generate_rule(
description="Find malloc calls without matching free",
language="c",
domain="linux_kernel"
)
print(rule.rule_id) # e.g., "malloc-without-free"
print(rule.validated) # True if passed gocpg validate-rule
print(rule.yaml_text) # Full YAML rule content
PatternTaintBridge¶
File: src/analysis/patterns/taint_bridge.py
Scenarios: S02, S21 (security, pattern search)
Bridges structural pattern matches with taint analysis. Enriches pattern findings with data flow information.
API Reference¶
PatternTaintBridge(cpg_service, taint_propagator=None)¶
Initialize with CPG service and optional TaintPropagator (falls back to lightweight SQL BFS).
async enrich_findings_with_taint(findings: List[Dict]) -> List[Dict]¶
Add taint path information to pattern findings. Async method.
from src.analysis.patterns.taint_bridge import PatternTaintBridge
bridge = PatternTaintBridge(cpg_service, taint_propagator)
enriched = await bridge.enrich_findings_with_taint(pattern_findings)
for finding in enriched:
if finding.get("taint_enriched"):
print(f"Finding {finding['rule_id']} has {len(finding['taint_paths'])} taint paths")
Autofix Engine¶
AutofixEngine¶
File: src/analysis/autofix/engine.py
Scenarios: S02 (automated security fix generation)
Orchestrates autofix generation: template-based fixes first, LLM fallback, then validation.
Pipeline¶
TaintPath → Parse Location → Read Source → Infer Vuln Type
→ Try Template Fix → (if None) LLM Fallback → Validate Diff → AutofixResult
Key Classes¶
@dataclass
class AutofixResult:
fix: FixSuggestion
strategy: str # "template" or "llm"
validated: bool
validation: Optional[ValidationResult] = None
taint_path: Optional[TaintPath] = None
cwe_id: str = ""
API Reference¶
AutofixEngine(source_root: str = "", dry_run: bool = True)¶
Initialize with source root directory. dry_run=True generates diffs without applying.
generate_fixes(taint_paths: List[TaintPath], vulnerability_type: str = "") -> List[AutofixResult]¶
Generate fixes for a list of taint paths.
from src.analysis.autofix.engine import AutofixEngine
engine = AutofixEngine(source_root="/path/to/project", dry_run=True)
results = engine.generate_fixes(taint_paths, vulnerability_type="sql_injection")
for result in results:
print(f"Strategy: {result.strategy}, Validated: {result.validated}")
print(f"Diff:\n{result.fix.diff_patch}")
AutofixGenerator¶
File: src/analysis/autofix/generator.py
Template-based fix generation using regex patterns for known vulnerability types.
@dataclass
class FixSuggestion:
vulnerability_type: str
severity: str
file_path: str
line_number: int
original_code: str
fixed_code: str
explanation: str
confidence: float # 0.0-1.0
diff_patch: str # Unified diff format
@dataclass
class FixTemplate:
vuln_type: str
name: str
description: str
pattern: str # Regex pattern
replacement: str # Replacement template
explanation: str
- AutofixGenerator() — loads templates from domain plugin
- generate_fix(code, vuln_type, context) -> Optional[FixSuggestion]
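The template mechanism can be sketched with regex substitution over the FixTemplate fields above. SPRINTF_TEMPLATE here is a hypothetical template for illustration, not one shipped with the engine:

```python
import re
from dataclasses import dataclass
from typing import Optional

# Local copy of the FixTemplate dataclass documented above.
@dataclass
class FixTemplate:
    vuln_type: str
    name: str
    description: str
    pattern: str      # Regex pattern
    replacement: str  # Replacement template
    explanation: str

# Hypothetical template: bound unbounded sprintf writes.
SPRINTF_TEMPLATE = FixTemplate(
    vuln_type="buffer_overflow",
    name="sprintf-to-snprintf",
    description="Replace unbounded sprintf with snprintf",
    pattern=r"\bsprintf\s*\(\s*(\w+)\s*,",
    replacement=r"snprintf(\1, sizeof(\1),",
    explanation="snprintf bounds the write to the destination buffer",
)

def apply_template(code: str, tpl: FixTemplate) -> Optional[str]:
    """Return fixed code if the template pattern matches, else None."""
    fixed, n = re.subn(tpl.pattern, tpl.replacement, code)
    return fixed if n else None

fixed = apply_template('sprintf(buf, "%s", user);', SPRINTF_TEMPLATE)
# fixed == 'snprintf(buf, sizeof(buf), "%s", user);'
```

When no template matches, the engine falls back to LLM-generated fixes via PromptBuilder.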
DiffValidator¶
File: src/analysis/autofix/diff_validator.py
Validates that generated fixes are safe to apply.
@dataclass
class ValidationResult:
valid: bool
error: Optional[str] = None
patched_content: Optional[str] = None
- DiffValidator() — MAX_CHANGE_RATIO = 0.5
- validate(original_code, fixed_code, file_path, source_root) -> ValidationResult
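The MAX_CHANGE_RATIO guard can be sketched with difflib: reject fixes that rewrite more than half of the original code. This illustrates the check only; the real validator also verifies the patch applies cleanly:

```python
import difflib
from dataclasses import dataclass
from typing import Optional

# Local copy of the ValidationResult dataclass documented above.
@dataclass
class ValidationResult:
    valid: bool
    error: Optional[str] = None
    patched_content: Optional[str] = None

MAX_CHANGE_RATIO = 0.5

def validate_change_size(original: str, fixed: str) -> ValidationResult:
    """Reject fixes that change more than MAX_CHANGE_RATIO of the code."""
    similarity = difflib.SequenceMatcher(None, original, fixed).ratio()
    change_ratio = 1.0 - similarity
    if change_ratio > MAX_CHANGE_RATIO:
        return ValidationResult(
            valid=False,
            error=f"change ratio {change_ratio:.2f} exceeds {MAX_CHANGE_RATIO}",
        )
    return ValidationResult(valid=True, patched_content=fixed)

ok = validate_change_size("x = read();\nuse(x);", "x = read();\nuse(sanitize(x));")
too_big = validate_change_size("a\n", "completely different content\n" * 4)
```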
PromptBuilder¶
File: src/analysis/autofix/prompt_builder.py
Builds LLM prompts for autofix generation when templates don’t match.
@dataclass
class AutofixPromptContext:
vulnerable_code: str
vulnerability_type: str
cwe_id: str
taint_path_summary: str
language: str
- PromptBuilder() — uses prompts from config/prompts/autofix/
- build_prompt(context: AutofixPromptContext) -> str
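A minimal sketch of prompt assembly from the context dataclass above. The template text is illustrative and does not reproduce the actual contents of config/prompts/autofix/:

```python
from dataclasses import dataclass

# Local copy of the AutofixPromptContext dataclass documented above.
@dataclass
class AutofixPromptContext:
    vulnerable_code: str
    vulnerability_type: str
    cwe_id: str
    taint_path_summary: str
    language: str

# Illustrative template, not the shipped prompt.
PROMPT_TEMPLATE = """Fix the following {vulnerability_type} ({cwe_id}) in {language}:

Taint path:
{taint_path_summary}

Code:
{vulnerable_code}

Return only a unified diff."""

def build_prompt(ctx: AutofixPromptContext) -> str:
    """Fill the template from the dataclass fields."""
    return PROMPT_TEMPLATE.format(**ctx.__dict__)

prompt = build_prompt(AutofixPromptContext(
    vulnerable_code='query = "SELECT * FROM t WHERE id=" + user_id',
    vulnerability_type="sql_injection",
    cwe_id="CWE-89",
    taint_path_summary="request.args -> user_id -> query",
    language="python",
))
```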
SSRAutofixBridge¶
File: src/analysis/autofix/ssr_bridge.py
Bridge between autofix engine and GoCPG Structural Search & Replace (SSR).
- SSRAutofixBridge(gocpg_client)
- apply_ssr_rule(rule_id: str, file_path: str) -> Optional[FixSuggestion]
Patch Review Analyzers¶
Additional analyzers in src/patch_review/analyzers/ that extend base analysis modules for patch-specific impact assessment.
PatchCallGraphAnalyzer¶
File: src/patch_review/analyzers/call_graph_analyzer.py
Scenarios: S09 (patch review)
Analyzes call graph impact of patch changes — blast radius, breaking changes, ripple effect.
@dataclass
class CallGraphNode:
method_name: str
full_name: str
filename: str
callers: List[str] = field(default_factory=list)
callees: List[str] = field(default_factory=list)
is_changed: bool = False
change_type: Optional[ChangeType] = None
- PatchCallGraphAnalyzer(conn, delta_cpg=None)
- analyze_call_graph_changes(patch, delta_cpg) -> CallGraphAnalysisResult
- compute_blast_radius(changed_methods) -> BlastRadius
- detect_breaking_changes(changed_methods) -> List[BreakingChange]
- compute_ripple_effect(changed_methods) -> RippleEffect
PatchDataFlowAnalyzer¶
File: src/patch_review/analyzers/dataflow_analyzer.py
Scenarios: S09 (patch review)
Analyzes data flow impact of patches — new taint paths, sanitization bypasses, sensitive data flows.
@dataclass
class DataFlowChange:
change_type: str
source_method: str
affected_variable: str
severity: Severity
- PatchDataFlowAnalyzer(conn, delta_cpg=None)
- analyze_dataflow_changes(patch, delta_cpg) -> DataFlowAnalysisResult
PatchDependencyAnalyzer¶
File: src/patch_review/analyzers/dependency_analyzer.py
Scenarios: S09 (patch review)
Analyzes how patches affect module dependencies, import relationships, and architectural coupling.
class DependencyChangeType(Enum):
ADDED = "added"
REMOVED = "removed"
MODIFIED = "modified"
CIRCULAR_INTRODUCED = "circular_introduced"
LAYER_VIOLATION = "layer_violation"
@dataclass
class DependencyChange:
change_type: DependencyChangeType
source_module: str
target_module: str
source_file: str
@dataclass
class CircularDependency:
cycle_path: List[str]
introduced_edge: Tuple[str, str]
severity: Severity
@dataclass
class LayerViolation:
source_layer: str
target_layer: str
source_module: str
target_module: str
@dataclass
class CouplingMetrics:
afferent_coupling: int
efferent_coupling: int
instability: float
@dataclass
class DependencyAnalysisResult:
dependency_changes: List[DependencyChange]
circular_dependencies: List[CircularDependency]
layer_violations: List[LayerViolation]
coupling_metrics: Dict[str, CouplingMetrics]
- PatchDependencyAnalyzer(conn, delta_cpg=None)
- analyze_dependency_changes(patch, delta_cpg) -> DependencyAnalysisResult
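The CouplingMetrics fields follow the standard Martin metrics: afferent coupling Ca (incoming dependents), efferent coupling Ce (outgoing dependencies), and instability Ce / (Ca + Ce). A self-contained sketch, assuming the analyzer uses this standard definition:

```python
from dataclasses import dataclass
from typing import Dict, Set

# Local copy of the CouplingMetrics dataclass documented above.
@dataclass
class CouplingMetrics:
    afferent_coupling: int  # Ca: modules that depend on this one
    efferent_coupling: int  # Ce: modules this one depends on
    instability: float      # Ce / (Ca + Ce); 0 = stable, 1 = unstable

def compute_coupling(deps: Dict[str, Set[str]]) -> Dict[str, CouplingMetrics]:
    """Compute Ca, Ce, and instability from a module dependency map."""
    modules = set(deps) | {t for targets in deps.values() for t in targets}
    metrics = {}
    for m in modules:
        ce = len(deps.get(m, set()))
        ca = sum(1 for _, targets in deps.items() if m in targets)
        total = ca + ce
        metrics[m] = CouplingMetrics(ca, ce, ce / total if total else 0.0)
    return metrics

# core is depended on by api and cli and depends on nothing: maximally stable.
m = compute_coupling({"api": {"core"}, "cli": {"core"}, "core": set()})
```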
Additional Data Flow Modules¶
RaceConditionAnalyzer (dataflow)¶
File: src/analysis/dataflow/race_condition.py
Scenarios: S02, S16 (security, concurrency)
Detects TOCTOU vulnerabilities and data races using data flow analysis with control dependency checking.
@dataclass
class RaceConditionPath:
race_type: str # 'toctou', 'data_race', 'missing_lock'
check_location: str
check_function: str
use_location: str
use_function: str
shared_resource: str
has_lock: bool
lock_function: Optional[str]
path_nodes: List[TaintNode]
confidence: float
risk_score: float
- RaceConditionAnalyzer(cpg_service)
- analyze(max_paths=100, max_hops=None, min_confidence=0.7) -> List[RaceConditionPath]
Data Model Modules¶
The following modules define shared data models used across analysis subpackages:
| Module | File | Key Classes |
|---|---|---|
| Dataflow Models | src/analysis/dataflow/models.py | DataFlowPath, VariableFlow |
| Taint Models | src/analysis/dataflow/taint/models.py | TaintNode, TaintPath, ControlDependency, CallContext |
| Concurrency Models | src/analysis/concurrency_dataclasses.py | LockUsage, RaceConditionPattern, SharedAccess, LockOrderViolation |
| Call Graph Models | src/analysis/callgraph/models.py | CallPath, CallCycle, ImpactAnalysis, AttackPath, CrossLanguageCall |
| Call Graph Base | src/analysis/callgraph/base.py | BaseAnalyzer (abstract base for all callgraph sub-analyzers) |
| Complexity Config | src/analysis/callgraph/complexity.py | ComplexityAnalyzer, DefaultThresholds |
Database Schema Reference¶
Key Tables for Analysis¶
| Table | Purpose |
|---|---|
| nodes_method | Method definitions |
| nodes_control_structure | Control flow structures (if, for, while) |
| nodes_field_identifier | Field access expressions |
| nodes_identifier | Variable references |
| nodes_call | Function call sites |
| nodes_return | Return statements |
| nodes_member | Struct member definitions |
| edges_cfg | Control flow graph edges |
| edges_contains | Containment relationships |
| edges_dominate | Dominator relationships |
| edges_post_dominate | Post-dominator relationships |
| edges_reaching_def | Reaching definition edges |
| edges_argument | Function argument edges |
| edges_call | Call graph edges |
Edge Types for Dataflow¶
| Edge Type | Table | Purpose |
|---|---|---|
| CFG | edges_cfg | Control flow between statements |
| REACHING_DEF | edges_reaching_def | Definition-use chains |
| ARGUMENT | edges_argument | Function call arguments |
| CONTAINS | edges_contains | Scope containment |
| CALL | edges_call | Function call relationships |
| DOMINATE | edges_dominate | Dominance for control dependency |
Troubleshooting¶
“Method not found”¶
The method name must match exactly (case-sensitive). Use the simple name, not the full qualified name.
# Correct
cfg = analyzer.get_method_cfg("heap_insert")
# Incorrect
cfg = analyzer.get_method_cfg("heap_insert(Relation, HeapTuple)")
“No CFG data found”¶
Ensure the CPG export included CFG edges. Check:
SELECT COUNT(*) FROM edges_cfg;
“No field accesses found”¶
Field access tracking requires nodes_field_identifier data:
SELECT COUNT(*) FROM nodes_field_identifier;
“No taint paths found”¶
Check that REACHING_DEF edges exist and source/sink functions are present:
SELECT COUNT(*) FROM edges_reaching_def;
SELECT DISTINCT name FROM nodes_call WHERE name IN ('system', 'exec', 'popen');
Performance Considerations¶
- Path enumeration bounded by max_paths and max_depth to prevent explosion
- Use max_depth to limit taint tracking depth (default from config)
- Clone detection is O(n²) — use max_methods on large codebases
- DuckPGQ variants (PGQ* classes) are faster for graph traversals
- Large methods may have many paths; consider sampling
Method Explanation Engine¶
Module: src/analysis/explain.py
Comprehensive method analysis engine that aggregates CPG metrics, call graph data, taint information, and domain context into a single result. Used by MCP tool codegraph_explain and CLI.
ExplainResult¶
Aggregated method analysis result (dataclass).
| Field | Type | Description |
|---|---|---|
| method_name | str | Short method name |
| full_name | str | Fully qualified name |
| file_path | str | Source file path |
| line_start | int | Start line number |
| line_end | int | End line number |
| line_count | int | Number of lines |
| signature | str | Method signature (first line of code) |
| cyclomatic_complexity | int | Cyclomatic complexity score |
| risk_level | str | Risk level: low (<10), moderate (10–19), high (20–49), critical (≥50) |
| fan_in | int | Number of callers |
| fan_out | int | Number of callees |
| direct_callers | List[str] | Direct caller names |
| transitive_caller_count | int | Total transitive callers up to depth |
| direct_callees | List[str] | Direct callee names |
| is_taint_source | bool | Method is a taint source (from domain plugin) |
| is_taint_sink | bool | Method is a taint sink (from domain plugin) |
| taint_paths_through | int | Taint paths passing through |
| subsystem | str | Domain subsystem name |
| pattern_flags | Dict[str, bool] | Flags: has_todo_fixme, has_deprecated, has_debug_code |
| docstring | str | Method docstring |
ExplainAnalyzer¶
from src.analysis.explain import ExplainAnalyzer, ExplainResult
analyzer = ExplainAnalyzer(db_path="project.duckdb")
result = analyzer.collect("process_data", depth=2)
Constructor: ExplainAnalyzer(db_path: Optional[str] = None)
Methods:
| Method | Returns | Description |
|---|---|---|
| collect(method_name, depth=2) | Optional[ExplainResult] | Collect comprehensive analysis data for a method |
| to_dict(result) | Dict[str, Any] | Convert ExplainResult to dict for JSON output |
| fuzzy_search(method_name, limit=5) | List[str] | Fuzzy search for method names when exact match fails |
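The fuzzy_search fallback can be sketched with stdlib difflib. The real analyzer matches against names queried from nodes_method; the candidate list here is illustrative:

```python
import difflib
from typing import List

def fuzzy_search(method_name: str, known_methods: List[str], limit: int = 5) -> List[str]:
    """Suggest close method names when an exact lookup fails,
    best match first."""
    return difflib.get_close_matches(method_name, known_methods, n=limit, cutoff=0.6)

# Illustrative method names.
methods = ["heap_insert", "heap_update", "heap_delete", "index_insert"]
suggestions = fuzzy_search("heap_insrt", methods)  # "heap_insert" ranks first
```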
Config Orphan Analysis¶
Module: src/analysis/config_analyzer.py
Detects unused, missing, and mismatched configuration parameters by cross-referencing YAML config, Pydantic/dataclass schema, and source code usage. Performs a 4-stage pipeline: extraction → cross-reference → FP filtering → reporting.
Used by: S12 (tech debt) via OrphanConfigHandler, CLI dogfood config-check.
ConfigOrphanAnalyzer¶
from src.analysis.config_analyzer import ConfigOrphanAnalyzer, AnalyzerConfig
analyzer = ConfigOrphanAnalyzer(AnalyzerConfig())
orphans = analyzer.scan()
summary = analyzer.get_summary(orphans)
Constructor: ConfigOrphanAnalyzer(config: Optional[AnalyzerConfig] = None)
Methods:
| Method | Returns | Description |
|---|---|---|
| scan() | List[OrphanFinding] | Run full orphan detection pipeline (4 stages) |
| get_summary(orphans) | Dict[str, Any] | Generate summary statistics by type and severity |
Detection stages:
1. Extraction — parse YAML keys (YAMLExtractor), schema dataclass fields (ASTExtractor), code references (CodeScanner)
2. Cross-reference — compare defined vs used parameters
3. FP filtering — remove false positives (env vars, dynamic access, known patterns)
4. Reporting — produce OrphanFinding list
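The cross-reference stage (stage 2) reduces to set arithmetic over dotted parameter paths. A minimal sketch with illustrative key names:

```python
from typing import Set, Tuple

def cross_reference(defined: Set[str], used: Set[str]) -> Tuple[Set[str], Set[str]]:
    """Compare YAML-defined keys against keys referenced in code.
    Returns (yaml_unused, yaml_missing) candidate sets, before FP filtering."""
    yaml_unused = defined - used   # defined in YAML, never read in code
    yaml_missing = used - defined  # read in code, absent from YAML
    return yaml_unused, yaml_missing

# Illustrative parameter paths.
defined = {"timeouts.http_client", "timeouts.db", "logging.level"}
used = {"timeouts.http_client", "logging.level", "logging.format"}
unused, missing = cross_reference(defined, used)
# unused == {"timeouts.db"}, missing == {"logging.format"}
```

Stage 3 then removes false positives from these candidate sets (env vars, dynamic access, known patterns) before findings are reported.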
OrphanFinding¶
Dataclass representing a single orphan config finding.
| Field | Type | Description |
|---|---|---|
| orphan_type | str | One of: yaml_unused, yaml_missing, code_orphan, path_mismatch, orphaned_dataclass, unused_default |
| severity | str | error, warning, or info |
| param_name | str | Dotted parameter path (e.g. timeouts.http_client) |
| description | str | Human-readable description |
| file_path | str | Source file where the issue was found |
| line_number | int | Line number in source file |
| suggestion | str | Fix suggestion (when available) |
| metadata | Dict[str, Any] | Additional context |
AnalyzerConfig¶
Dataclass for domain-agnostic analyzer configuration.
| Field | Type | Default | Description |
|---|---|---|---|
| config_files | List[str] | ["config.yaml"] | YAML config files to analyze |
| schema_files | List[str] | ["src/config/unified_config.py"] | Pydantic/dataclass schema files |
| source_dirs | List[str] | ["src/"] | Directories to scan for config usage |
| exclude_dirs | List[str] | ["tests/", "scripts/", "docs/"] | Directories to exclude from scanning |
| test_dirs | List[str] | ["tests/"] | Test directories (scanned separately) |
| root_dir | Optional[str] | None | Project root (auto-detected if not set) |
| access_patterns | List[str] | regex list | Patterns for typed config access (cfg.section.field) |
| raw_access_patterns | List[str] | regex list | Patterns for raw dict access (_raw.get("key")) |
| dynamic_patterns | List[str] | regex list | Patterns indicating dynamic access (excluded from detection) |
YAMLExtractor¶
Extracts all leaf keys and environment variable references from YAML config files.
| Method | Returns | Description |
|---|---|---|
| extract(config_path) | Tuple[Dict[str, Any], Set[str]] | Returns (flat_keys dict, env_var_keys set) |
ASTExtractor¶
Parses Python schema files via AST to extract dataclass fields and @property definitions.
| Method | Returns | Description |
|---|---|---|
| extract(schema_path) | Tuple[Dict, Dict, Set] | Returns (dataclass_fields, property_map, property_names) |
CodeScanner¶
Scans Python source files for config access patterns with context-aware cfg variable tracking. Distinguishes between variables holding the full UnifiedConfig object vs sub-properties.
| Method | Returns | Description |
|---|---|---|
| scan(source_dirs, exclude_dirs) | Tuple[Dict, Set, bool] | Returns (references, raw_keys, has_dynamic_access) |
Scoped Analysis Configuration¶
Module: src/analysis/scope_config.py
Provides path-aware analysis policy selection so one project can apply different rules to main, tests, vendor, or other file groups.
AnalysisScope¶
Dataclass representing one named analysis scope.
| Field | Type | Description |
|---|---|---|
| name | str | Scope name |
| paths | list[str] | Glob patterns matched against normalized file paths |
| rules | str \| list[str] | Rule mode such as all, critical_only, taint_only, or explicit rule IDs |
| severity_threshold | str | Minimum severity accepted in this scope |
| suppress_policy | str | Suppression policy identifier |
| enabled | bool | Whether the scope is active |
| Method | Returns | Description |
|---|---|---|
| matches(file_path) | bool | Check whether a file belongs to this scope |
| to_dict() | dict | Serialize the scope |
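Glob matching against normalized paths can be sketched with stdlib fnmatch. This is an illustration of the matches semantics, assuming fnmatch-style globs; the real implementation may normalize paths differently:

```python
from dataclasses import dataclass, field
from fnmatch import fnmatch
from typing import List

# Sketch of AnalysisScope, reduced to the fields needed for matching.
@dataclass
class AnalysisScope:
    name: str
    paths: List[str] = field(default_factory=list)
    enabled: bool = True

    def matches(self, file_path: str) -> bool:
        """Check whether the file falls under any of this scope's globs."""
        normalized = file_path.replace("\\", "/").lstrip("./")
        return self.enabled and any(fnmatch(normalized, p) for p in self.paths)

tests_scope = AnalysisScope("tests", paths=["tests/*", "*/test_*.py"])
# tests_scope.matches("tests/unit/test_foo.py") -> True
# tests_scope.matches("src/analysis/explain.py") -> False
```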
ScopeConfig¶
Dataclass storing all configured scopes for a project.
| Field | Type | Description |
|---|---|---|
| scopes | list[AnalysisScope] | Ordered list of configured scopes |
| default_scope | str | Fallback scope name |
| Method | Returns | Description |
|---|---|---|
| from_yaml(path) | ScopeConfig | Load scope rules from YAML |
| from_config_dict(config_dict) | ScopeConfig | Build config from a nested dict |
| resolve_scope(file_path) | AnalysisScope \| None | Resolve the effective scope for a file |
| should_report(file_path, severity, rule_id) | bool | Decide whether a finding should be reported |
| to_dict() | dict | Serialize the full config |
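Because scopes is ordered, resolution is first-match-wins with a fallback to default_scope. A simplified sketch (it omits rule-ID filtering and assumes an illustrative severity ordering):

```python
from dataclasses import dataclass
from fnmatch import fnmatch
from typing import List, Optional

# Illustrative severity ordering, lowest to highest.
_SEVERITY_ORDER = ["info", "warning", "error", "critical"]

@dataclass
class AnalysisScope:
    name: str
    paths: List[str]
    severity_threshold: str = "info"

@dataclass
class ScopeConfig:
    scopes: List[AnalysisScope]
    default_scope: str = "main"

    def resolve_scope(self, file_path: str) -> Optional[AnalysisScope]:
        """Return the first scope whose globs match, else the default scope."""
        for scope in self.scopes:
            if any(fnmatch(file_path, p) for p in scope.paths):
                return scope
        return next((s for s in self.scopes if s.name == self.default_scope), None)

    def should_report(self, file_path: str, severity: str) -> bool:
        """Report only findings at or above the scope's severity threshold."""
        scope = self.resolve_scope(file_path)
        if scope is None:
            return True
        return _SEVERITY_ORDER.index(severity) >= _SEVERITY_ORDER.index(scope.severity_threshold)

cfg = ScopeConfig(scopes=[
    AnalysisScope("vendor", ["vendor/*"], severity_threshold="critical"),
    AnalysisScope("tests", ["tests/*"], severity_threshold="error"),
    AnalysisScope("main", ["src/*"], severity_threshold="info"),
])
# Vendored code only surfaces critical findings; main code surfaces everything.
```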
See Also¶
- Schema Reference — Complete database schema
- SQL Query Cookbook — Query examples
- Agents Reference — Agent pipeline
- Scenarios Guide — Usage scenarios