Analysis Modules Reference

Comprehensive documentation for the code analysis modules in src/analysis/.

Overview

The analysis modules provide advanced static analysis capabilities on top of the CPG (Code Property Graph) stored in DuckDB:

| Module | Purpose | Key Scenarios |
|---|---|---|
| CFGAnalyzer | Control flow graph analysis, complexity metrics | 5, 6, 13 (refactoring, performance) |
| FieldSensitiveTracer | Field-path taint tracking | 2, 8, 14 (security, compliance, incident) |
| DataFlowTracer | General dataflow analysis facade | 2, 14 (security, incident response) |
| CallGraphAnalyzer | Call graph traversal, entry point discovery, attack path tracing | 1, 12, 14 (onboarding, cross-repo, incident) |
| ConcurrencyAnalyzer | Lock/mutex pattern detection | 16 (concurrency) |
| CloneDetector | Code duplicate detection | 7, 13 (refactoring) |
| LLMPatternGenerator | LLM-assisted YAML rule generation | 21 (pattern search) |
| PatternTaintBridge | Pattern-to-taint analysis bridge | 2, 21 (security, pattern search) |

CFGAnalyzer

File: src/analysis/cfg_analyzer.py

CFG-based analysis using the edges_cfg table for accurate control flow analysis.

Features

  • Cyclomatic Complexity: McCabe complexity via M = E - N + 2
  • Path Enumeration: DFS-based execution path discovery with cycle detection
  • Dominator Analysis: Compute dominator and post-dominator trees
  • CFG Structure Extraction: Get nodes, edges, entry/exit points
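The McCabe formula can be illustrated on a toy diamond-shaped CFG (a minimal sketch; the analyzer itself computes E and N from the edges_cfg table rather than from in-memory lists):

```python
# Toy diamond CFG: entry node 1 branches to 2 and 3, which rejoin at 4.
nodes = [1, 2, 3, 4]
edges = [(1, 2), (1, 3), (2, 4), (3, 4)]

# McCabe cyclomatic complexity for a single connected CFG: M = E - N + 2
complexity = len(edges) - len(nodes) + 2
print(complexity)  # 2: one decision point yields two independent paths
```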

Key Classes

@dataclass
class CFGStructure:
    """Represents the CFG structure of a method"""
    method_name: str
    method_full_name: str
    nodes: List[int]
    edges: List[Tuple[int, int]]  # (src, dst) pairs
    entry_nodes: List[int]
    exit_nodes: List[int]
    node_count: int
    edge_count: int

@dataclass
class CFGPath:
    """Represents an execution path through the CFG"""
    path_id: str
    nodes: List[int]
    length: int
    has_loop: bool = False

API Reference

CFGAnalyzer(cpg_service)

Initialize with CPGQueryService or DuckDB connection.

get_method_cfg(method_name: str) -> Optional[CFGStructure]

Get the CFG structure for a method.

from src.analysis.cfg_analyzer import CFGAnalyzer

analyzer = CFGAnalyzer(cpg_service)
cfg = analyzer.get_method_cfg("heap_insert")
if cfg:  # returns None if the method is not found
    print(f"Nodes: {cfg.node_count}, Edges: {cfg.edge_count}")

compute_cyclomatic_complexity(method_name: str) -> int

Calculate McCabe cyclomatic complexity M = E - N + 2, where E is the number of CFG edges and N the number of CFG nodes.

complexity = analyzer.compute_cyclomatic_complexity("heap_insert")
print(f"Complexity: {complexity}")  # e.g., 15

enumerate_paths(method_name: str, max_paths: int = 100) -> List[CFGPath]

Find execution paths through the CFG with cycle detection.

paths = analyzer.enumerate_paths("process_query", max_paths=50)
for path in paths:
    print(f"Path {path.path_id}: {path.length} nodes, loop={path.has_loop}")
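Conceptually, bounded path enumeration is a depth-first search that skips back-edges and stops at max_paths. The sketch below is illustrative only (the real method reads its graph from edges_cfg):

```python
# Sketch of bounded DFS path enumeration with cycle detection.
def enumerate_paths(edges, entry, exits, max_paths=100):
    succ = {}
    for src, dst in edges:
        succ.setdefault(src, []).append(dst)
    paths = []

    def dfs(node, path, visited):
        if len(paths) >= max_paths:   # bound total paths to avoid explosion
            return
        if node in exits:
            paths.append(path + [node])
            return
        for nxt in succ.get(node, []):
            if nxt in visited:        # back-edge: would form a loop, skip it
                continue
            dfs(nxt, path + [node], visited | {nxt})

    dfs(entry, [], {entry})
    return paths

# Diamond CFG: two acyclic paths from entry 1 to exit 4
print(enumerate_paths([(1, 2), (1, 3), (2, 4), (3, 4)], 1, {4}))
# [[1, 2, 4], [1, 3, 4]]
```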

find_dominators(method_name: str) -> Dict[int, Set[int]]

Compute dominator tree using edges_dominate table.

find_post_dominators(method_name: str) -> Dict[int, Set[int]]

Compute post-dominator tree using edges_post_dominate table.
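For intuition, the classic iterative dataflow algorithm for dominators is sketched below. This is illustrative: the actual methods read precomputed relationships from edges_dominate and edges_post_dominate instead of recomputing them.

```python
# Iterative dominator computation: Dom(entry) = {entry};
# Dom(n) = {n} ∪ intersection of Dom(p) over all predecessors p.
def compute_dominators(nodes, edges, entry):
    preds = {n: set() for n in nodes}
    for src, dst in edges:
        preds[dst].add(src)
    dom = {n: set(nodes) for n in nodes}  # start from "everything dominates"
    dom[entry] = {entry}
    changed = True
    while changed:
        changed = False
        for n in nodes:
            if n == entry:
                continue
            meet = set.intersection(*(dom[p] for p in preds[n])) if preds[n] else set()
            new = {n} | meet
            if new != dom[n]:
                dom[n] = new
                changed = True
    return dom

# Diamond CFG: 1 -> {2, 3} -> 4; only the entry dominates the join node
doms = compute_dominators([1, 2, 3, 4], [(1, 2), (1, 3), (2, 4), (3, 4)], entry=1)
print(doms[4])  # {1, 4}
```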

analyze_complexity_distribution() -> Dict[str, Any]

Analyze complexity across all methods in the codebase.

dist = analyzer.analyze_complexity_distribution()
print(f"Average complexity: {dist['average']}")
print(f"High complexity methods: {dist['high_complexity_methods']}")

Database Tables Used

  • nodes_method - Method metadata
  • edges_contains - Method-to-node containment
  • edges_cfg - CFG edges between nodes
  • edges_dominate - Dominator relationships
  • edges_post_dominate - Post-dominator relationships

FieldSensitiveTracer

File: src/analysis/field_sensitive_tracer.py

Field-path tracking for precise taint analysis, distinguishing between different fields of the same object.

Features

  • Field Path Parsing: Parse obj.field1.field2 and obj->field->data access chains
  • Field Access Tracking: Find all reads/writes to specific fields
  • Taint Propagation: Track taint through field accesses
  • Sensitive Field Detection: Identify flows from sensitive fields (password, token, etc.)

Key Classes

@dataclass
class FieldPath:
    """Represents a field access path like obj.field1.field2"""
    base_variable: str
    field_chain: List[str]
    full_path: str
    node_ids: List[int]
    type_full_name: Optional[str]

@dataclass
class FieldAccess:
    """Represents a single field access in code"""
    node_id: int
    base_variable: str
    field_name: str
    access_code: str
    line_number: int
    filename: str
    access_type: str  # 'read', 'write', 'call'
    containing_method: Optional[str]

@dataclass
class FieldSensitiveFlow:
    """A dataflow path with field sensitivity"""
    source_path: FieldPath
    sink_path: FieldPath
    intermediate_fields: List[FieldPath]
    is_tainted: bool
    relationship: str  # 'exact', 'prefix', 'suffix', 'propagated'
    confidence: float
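The relationship field distinguishes how a source path relates to a sink path. A hypothetical sketch of that classification (splitting on dots only; the tracer's actual logic also handles pointer notation and may differ):

```python
# Illustrative path-relationship classifier for the 'relationship' field.
def path_relationship(source: str, sink: str) -> str:
    src, snk = source.split("."), sink.split(".")
    if src == snk:
        return "exact"
    if snk[:len(src)] == src:
        return "prefix"    # sink accesses a sub-field of the source
    if src[-len(snk):] == snk:
        return "suffix"    # sink path is the tail of the source path
    return "propagated"    # related only via dataflow, not path structure

print(path_relationship("user.password", "user.password.hash"))  # prefix
```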

API Reference

FieldSensitiveTracer(cpg_service)

Initialize with CPGQueryService or DuckDB connection.

parse_field_path(code: str) -> FieldPath

Parse field access string into structured FieldPath.

from src.analysis.field_sensitive_tracer import FieldSensitiveTracer

tracer = FieldSensitiveTracer(cpg_service)

# Parse pointer notation
path = tracer.parse_field_path("user->password")
print(path.base_variable)  # "user"
print(path.field_chain)    # ["password"]

# Parse dot notation
path = tracer.parse_field_path("request.data.buffer")
print(path.full_path)  # "request.data.buffer"
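The essence of the parsing step can be sketched as a split on both access operators. This is a simplification (the real parser also records node_ids and type information):

```python
import re

# Minimal field-path parsing sketch: split on '->' and '.' alike.
def split_field_path(code: str):
    parts = re.split(r"->|\.", code)
    return parts[0], parts[1:]  # (base_variable, field_chain)

base, chain = split_field_path("request->data.buffer")
print(base, chain)  # request ['data', 'buffer']
```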

get_struct_fields(struct_name: str) -> List[Dict]

Get fields defined in a struct with type information.

fields = tracer.get_struct_fields("UserData")
for field in fields:
    print(f"{field['name']}: {field['type']}")

find_field_accesses(base_var: str, field_name: str) -> List[FieldAccess]

Find all accesses to a specific field.

accesses = tracer.find_field_accesses("user", "password")
for access in accesses:
    print(f"{access.filename}:{access.line_number} - {access.access_type}")

trace_field_taint(source_field: str, sink_functions: List[str]) -> List[FieldSensitiveFlow]

Trace taint from a source field to sink functions.

flows = tracer.trace_field_taint(
    source_field="credentials->password",
    sink_functions=["printf", "log", "send"]
)
for flow in flows:
    print(f"Tainted flow: {flow.source_path} -> {flow.sink_path}")

find_sensitive_field_flows(sensitive_fields: Optional[List[str]] = None, sink_functions: Optional[List[str]] = None) -> List[FieldSensitiveFlow]

Find flows from sensitive fields to dangerous sinks.

# Default sensitive fields: password, token, secret, private_key, credential, auth
flows = tracer.find_sensitive_field_flows()
print(f"Found {len(flows)} potential sensitive data exposures")

Sensitive Field Categories

Default sensitive fields tracked:

  • password, passwd, pwd
  • token, auth_token, access_token
  • secret, api_secret, client_secret
  • private_key, secret_key
  • credential, credentials
  • auth, authorization
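A plausible matching rule is a case-insensitive substring check against this list. The sketch below is an assumption about how matching works, not the tracer's actual code:

```python
# Hypothetical sensitive-field matcher: a field is flagged if its
# lowercased name contains any default sensitive substring.
SENSITIVE = ["password", "passwd", "pwd", "token", "secret",
             "private_key", "credential", "auth"]

def is_sensitive(field_name: str) -> bool:
    name = field_name.lower()
    return any(marker in name for marker in SENSITIVE)

print(is_sensitive("AccessToken"))  # True
print(is_sensitive("buffer_len"))   # False
```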

Database Tables Used

  • nodes_field_identifier - Field access nodes
  • nodes_identifier - Variable identifiers
  • nodes_member - Struct member definitions
  • edges_reaching_def - Reaching definition edges
  • edges_argument - Function argument edges

DataFlowTracer

File: src/analysis/dataflow_tracer.py

Facade module that provides unified access to dataflow analysis capabilities.

Integration Methods

The DataFlowTracer provides integration between base dataflow analysis and field-sensitive analysis:

find_taint_paths_field_sensitive(source_functions, sink_functions, track_fields=True, max_depth=10) -> List[DataFlowPath]

Enhanced taint analysis with optional field tracking.

from src.analysis.dataflow_tracer import DataFlowTracer

tracer = DataFlowTracer(cpg_service)

# Find taint paths with field sensitivity
paths = tracer.find_taint_paths_field_sensitive(
    source_functions=["getenv", "fgets", "read"],
    sink_functions=["system", "exec", "popen"],
    track_fields=True,
    max_depth=10
)

for path in paths:
    print(f"Source: {path.source} -> Sink: {path.sink}")
    print(f"Fields accessed: {path.field_accesses}")

find_sensitive_data_flows(sensitive_fields=None, sink_functions=None) -> List[Dict]

Wrapper around FieldSensitiveTracer for common security analysis.

# Use defaults for common patterns
flows = tracer.find_sensitive_data_flows()

# Or customize
flows = tracer.find_sensitive_data_flows(
    sensitive_fields=["api_key", "bearer_token"],
    sink_functions=["http_request", "socket_send"]
)

ControlFlowAnalyzer (Patch Review)

File: src/patch_review/analyzers/control_flow_analyzer.py

Analyzes the control-flow impact of patches, using CFGAnalyzer for accurate metrics.

Features

  • Complexity delta calculation (before/after patch)
  • New loop detection with risk classification
  • Error handling change tracking
  • Branch coverage impact estimation

Integration with CFGAnalyzer

from src.patch_review.analyzers.control_flow_analyzer import PatchControlFlowAnalyzer

analyzer = PatchControlFlowAnalyzer(cpg_service)

# Analyze patch impact
result = analyzer.analyze_control_flow_changes(patch_data)

print(f"Complexity delta: {result.complexity_delta}")
print(f"New loops: {len(result.new_loops)}")
print(f"Error handling changes: {len(result.error_handling_changes)}")

Loop Severity Classification

New loops are classified by risk:

  • HIGH: Nested loops, loops with I/O, unbounded loops
  • MEDIUM: Loops with external calls
  • LOW: Simple bounded loops


Usage Examples

Security Audit (Scenario 2)

from src.analysis.field_sensitive_tracer import FieldSensitiveTracer

tracer = FieldSensitiveTracer(cpg_service)

# Find password leaks
flows = tracer.find_sensitive_field_flows(
    sensitive_fields=["password", "credentials"],
    sink_functions=["printf", "fprintf", "syslog", "elog"]
)

for flow in flows:
    print(f"ALERT: {flow.source_path} flows to {flow.sink_path}")

Complexity Analysis (Scenario 5/6)

from src.analysis.cfg_analyzer import CFGAnalyzer

analyzer = CFGAnalyzer(cpg_service)

# Analyze all methods
dist = analyzer.analyze_complexity_distribution()

# Find refactoring candidates
for method in dist['high_complexity_methods']:
    print(f"Refactor candidate: {method['name']} (complexity={method['complexity']})")

Patch Review (Scenario 9)

from src.patch_review.analyzers.control_flow_analyzer import PatchControlFlowAnalyzer

analyzer = PatchControlFlowAnalyzer(cpg_service)
result = analyzer.analyze_control_flow_changes(patch)

if result.complexity_delta > 10:
    print("WARNING: Significant complexity increase")

for loop in result.new_loops:
    if loop.severity == 'HIGH':
        print(f"ALERT: High-risk loop in {loop.method_name}")

Database Schema Reference

Key Tables for Analysis

| Table | Purpose |
|---|---|
| nodes_method | Method definitions |
| nodes_control_structure | Control flow structures (if, for, while) |
| nodes_field_identifier | Field access expressions |
| nodes_identifier | Variable references |
| edges_cfg | Control flow graph edges |
| edges_contains | Containment relationships |
| edges_dominate | Dominator relationships |
| edges_reaching_def | Reaching definition edges |
| edges_argument | Function argument edges |

Edge Types for Dataflow

| Edge Type | Table | Purpose |
|---|---|---|
| CFG | edges_cfg | Control flow between statements |
| REACHING_DEF | edges_reaching_def | Definition-use chains |
| ARGUMENT | edges_argument | Function call arguments |
| CONTAINS | edges_contains | Scope containment |

Troubleshooting

“Method not found”

The method name must match exactly (case-sensitive). Use the simple name, not the full qualified name.

# Correct
cfg = analyzer.get_method_cfg("heap_insert")

# Incorrect
cfg = analyzer.get_method_cfg("heap_insert(Relation, HeapTuple)")

“No CFG data found”

Ensure the CPG export included CFG edges. Check if edges_cfg table has data:

SELECT COUNT(*) FROM edges_cfg;

“No field accesses found”

Field access tracking requires nodes_field_identifier data:

SELECT COUNT(*) FROM nodes_field_identifier;

Performance Considerations

  • Path enumeration is bounded by max_paths to prevent explosion
  • Use max_depth to limit taint tracking depth
  • Large methods may have many paths; consider sampling

LLMPatternGenerator

File: src/analysis/patterns/llm_pattern_generator.py

LLM-assisted generation of YAML pattern rules from natural language descriptions. Used by the patterns generate CLI command and the /api/v1/patterns/generate REST endpoint.

Features

  • Natural language to YAML rule: Describe a pattern in English and get a valid YAML rule
  • Language-aware: Generates patterns appropriate for the target language (C, Python, Go, etc.)
  • CPG constraint generation: Adds data flow, type, and call graph constraints
  • Validation: Validates generated rules against the pattern schema

API Reference

LLMPatternGenerator(llm_provider)

Initialize with an LLM provider instance.

generate_rule(description: str, language: str, examples: List[str] = None) -> Dict

Generate a YAML rule from a natural language description.

from src.analysis.patterns.llm_pattern_generator import LLMPatternGenerator

generator = LLMPatternGenerator(llm_provider)
rule = generator.generate_rule(
    description="Find malloc calls without matching free",
    language="c"
)
print(rule['id'])       # e.g., "malloc-without-free"
print(rule['pattern'])  # e.g., "malloc($size)"
print(rule['constraints'])  # CPG constraints

PatternTaintBridge

File: src/analysis/patterns/taint_bridge.py

Bridges structural pattern matches with taint analysis, enabling patterns to leverage interprocedural data flow tracking.

Features

  • Pattern-to-taint: Convert pattern matches into taint analysis entry points
  • Taint-enriched findings: Augment pattern findings with taint path information
  • Cross-function tracking: Follow data flow from pattern matches across function boundaries

API Reference

PatternTaintBridge(cpg_service, taint_analyzer)

Initialize with CPG service and taint analyzer instances.

enrich_findings_with_taint(findings: List[Dict]) -> List[Dict]

Add taint path information to pattern findings.

from src.analysis.patterns.taint_bridge import PatternTaintBridge

bridge = PatternTaintBridge(cpg_service, taint_analyzer)
enriched = bridge.enrich_findings_with_taint(pattern_findings)

for finding in enriched:
    if finding.get('taint_paths'):
        print(f"Finding {finding['rule_id']} has {len(finding['taint_paths'])} taint paths")

find_tainted_patterns(rule_id: str) -> List[Dict]

Find pattern matches that involve tainted data flows.

tainted = bridge.find_tainted_patterns("sql-injection-concat")
for match in tainted:
    print(f"Tainted match at {match['file']}:{match['line']}")
    for path in match['taint_paths']:
        print(f"  Source: {path['source']} -> Sink: {path['sink']}")

See Also