Dataflow Analysis User Guide

Version: 3.0 Language: English


Table of Contents

  1. Overview
  2. Key Features
  3. Architecture
  4. Getting Started
  5. Taint Analysis
  6. Memory Safety Analysis
  7. NULL Pointer Detection
  8. Information Disclosure Detection
  9. Race Condition Detection
  10. Advanced Features
  11. Configuration
  12. Troubleshooting
  13. Performance Benchmarks
  14. API Reference
  15. Additional Resources

Overview

CodeGraph’s dataflow analysis engine provides precise vulnerability detection through:

  • Taint Analysis - Track untrusted data from sources to sinks (94-96% accuracy)
  • Memory Safety - Detect use-after-free and double-free bugs (85% with alias analysis)
  • NULL Pointer Detection - Find potential NULL dereferences (85-90% accuracy)
  • Information Disclosure - Detect sensitive data leaks (80-85% accuracy)
  • Race Condition Detection - Find TOCTOU and data races (80-85% accuracy)

All numeric defaults (max_paths, max_depth, thresholds) are loaded from config.yaml via get_unified_config().
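
This is why most API functions accept None for their limits: an explicit argument wins, otherwise the configured default applies. A minimal sketch of that resolution pattern, with get_unified_config replaced by a stand-in returning a plain dict (the real keys and values live in config.yaml):

```python
# Sketch: None means "use the configured default"; explicit values win.
# This stand-in get_unified_config and its keys are illustrative only.
def get_unified_config():
    return {"taint_analysis": {"max_paths": 100, "max_depth": 15}}

def resolve_limits(max_paths=None, max_depth=None):
    defaults = get_unified_config()["taint_analysis"]
    if max_paths is None:
        max_paths = defaults["max_paths"]
    if max_depth is None:
        max_depth = defaults["max_depth"]
    return max_paths, max_depth

print(resolve_limits())              # configured defaults: (100, 15)
print(resolve_limits(max_paths=50))  # explicit value overrides: (50, 15)
```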


Key Features

Core Analysis Phases

  • Inter-Procedural Analysis - Tracks taint across function calls (Phase 4.2)
  • Control-Flow Awareness - Analyzes conditional execution (Phase 4.3)
  • Field-Sensitive Analysis - Tracks struct field taint (Phase 4.4)
  • Context-Sensitive Analysis - Distinguishes calls from different callers (Phase 5)
  • Sanitization Detection - Identifies proper input sanitization

Advanced Enhancements

  • Cross-Language Support - 11 languages via GoCPG
  • Pointer Alias Analysis - Improved UAF detection (+15% accuracy)
  • Symbolic Execution V2 - Path feasibility checking via Z3 (10-15% fewer false positives)

Architecture

Core Components

TaintPropagator (Main Engine)
+-- DataflowTracker          - Forward dataflow propagation
+-- InterProcTracker         - Inter-procedural tracking
+-- ControlFlowAnalyzer      - Control dependency analysis
+-- FieldSensitiveTracker    - Struct field tracking
+-- ContextSensitiveTracker  - Call context tracking (Phase 5)
+-- PathConstraintTracker    - Symbolic execution via Z3

MemoryLifetimeAnalyzer (Memory Safety)
+-- PointerAliasAnalyzer     - Andersen's alias analysis

Analysis Flow

1. Load domain-specific sources/sinks from plugin YAML
2. Find source nodes (user input, env, network, etc.)
3. Find sink nodes (SQL, shell, file ops, etc.)
4. Propagate taint through dataflow edges
5. Track call contexts for precision (if enabled)
6. Build pointer alias graph (if enabled, memory analysis only)
7. Filter infeasible paths with Z3 (if enabled)
8. Calculate risk scores and confidence
9. Return sorted vulnerability paths
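
Steps 2-4 and 9 above amount to a forward search over dataflow edges. A toy sketch (function names, the edge list, and the length-based ranking are illustrative, not the engine's actual data or scoring):

```python
# Toy sketch: seed taint at sources, propagate forward along dataflow
# edges with a bounded BFS, and report source-to-sink paths, shortest first.
from collections import deque

def find_paths(edges, sources, sinks, max_depth=10):
    adj = {}
    for src, dst in edges:
        adj.setdefault(src, []).append(dst)
    found = []
    for source in sources:
        queue = deque([(source, [source])])
        while queue:
            node, path = queue.popleft()
            if node in sinks and len(path) > 1:
                found.append(path)      # taint reached a sink
                continue
            if len(path) >= max_depth:
                continue                # depth bound, mirrors max_depth
            for nxt in adj.get(node, []):
                if nxt not in path:     # avoid cycles
                    queue.append((nxt, path + [nxt]))
    return sorted(found, key=len)       # stand-in for risk-score sorting

edges = [("read_input", "build_query"), ("build_query", "execute_sql"),
         ("read_input", "log_value")]
paths = find_paths(edges, sources={"read_input"}, sinks={"execute_sql"})
print(paths)  # [['read_input', 'build_query', 'execute_sql']]
```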

Getting Started

Prerequisites

Required:

pip install -r requirements.txt

Optional (for symbolic execution):

pip install z3-solver

Basic Usage

from src.analysis.dataflow import analyze_sql_injections
from src.services.cpg import CPGQueryService

with CPGQueryService() as cpg:
    # Analyze SQL injection vulnerabilities
    # All defaults come from config.yaml -> taint_analysis.*
    paths = analyze_sql_injections(cpg)

    # Display results
    for path in paths:
        print(f"SQL Injection: {path.source_function} -> {path.sink_function}")
        print(f"  Risk: {path.risk_level} ({path.risk_score:.2f})")
        print(f"  Confidence: {path.confidence:.2f}")
        print(f"  Location: {path.source_location} -> {path.sink_location}")

Taint Analysis

SQL Injection Detection

from src.analysis.dataflow import analyze_sql_injections

paths = analyze_sql_injections(cpg)

for path in paths:
    # Source: where untrusted data comes from
    print(f"Source: {path.source_function} at {path.source_location}")

    # Sink: where it's used dangerously
    print(f"Sink: {path.sink_function} at {path.sink_location}")

    # Sanitization: was it properly sanitized?
    if path.sanitizers:
        print(f"Sanitizers: {', '.join(path.sanitizers)}")
        print(f"Sanitization score: {path.sanitization_score:.2f}")
    else:
        print("No sanitization detected!")

    # Path details
    print(f"Path length: {path.path_length} hops")
    if path.inter_procedural:
        print(f"Functions crossed: {', '.join(path.functions_crossed)}")
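
For reference, this is the kind of code the analysis distinguishes: a tainted flow into the SQL sink versus a parameterized query treated as sanitized. The sqlite3 snippet below is purely illustrative and not part of the analysis API:

```python
# Illustrative target code, not analyzer code.
import sqlite3

def unsafe_lookup(conn, user_id):
    # Tainted: user_id reaches the execute() sink via string interpolation
    query = f"SELECT name FROM users WHERE id = {user_id}"
    return conn.execute(query).fetchall()

def safe_lookup(conn, user_id):
    # Sanitized: the driver binds the value, so taint never enters the SQL text
    return conn.execute("SELECT name FROM users WHERE id = ?", (user_id,)).fetchall()
```

The first function would surface as a taint path with no sanitizers; the second should either not appear or carry a high sanitization score.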

Command Injection Detection

from src.analysis.dataflow import analyze_command_injections

# Detects taint flows to OS command sinks (system, popen, exec, subprocess.run)
# Sinks defined per-domain in annotations.yaml -> sink_type: command_injection
paths = analyze_command_injections(cpg)

for path in paths:
    print(f"Command Injection (CWE-78): {path.source_function} -> {path.sink_function}")
    print(f"  Risk: {path.risk_level} ({path.risk_score:.2f})")
    print(f"  Recommendation: {path.recommendation}")

Path Traversal Detection

from src.analysis.dataflow import analyze_path_traversal

# Detects taint flows to filesystem sinks (fopen, open, access, stat, os.path.join)
# Sinks defined per-domain in annotations.yaml -> sink_type: path_traversal
paths = analyze_path_traversal(cpg)

for path in paths:
    print(f"Path Traversal (CWE-22): {path.source_function} -> {path.sink_function}")
    print(f"  Risk: {path.risk_level} ({path.risk_score:.2f})")
    print(f"  Recommendation: {path.recommendation}")

Custom Taint Analysis

from src.analysis.dataflow.taint_analysis import TaintPropagator

with CPGQueryService() as cpg:
    # Create propagator with custom settings
    propagator = TaintPropagator(
        cpg,
        enable_inter_proc=True,         # Inter-procedural tracking (Phase 4.2)
        enable_control_flow=True,       # Control-flow analysis (Phase 4.3)
        enable_field_sensitive=True,    # Field-sensitive tracking (Phase 4.4)
        enable_context_sensitive=True,  # Call context tracking (Phase 5)
        enable_symbolic_execution=True  # Path feasibility checking via Z3
    )

    # Find taint paths (defaults from config.yaml -> taint_analysis.*)
    paths = propagator.find_taint_paths(
        source_category='sql',
        timeout_seconds=60
    )

Memory Safety Analysis

Use-After-Free Detection

from src.analysis.dataflow import analyze_use_after_free
from src.services.cpg import CPGQueryService

with CPGQueryService() as cpg:
    from src.analysis.dataflow.memory_lifetime import MemoryLifetimeAnalyzer

    analyzer = MemoryLifetimeAnalyzer(
        cpg,
        enable_alias_analysis=True  # Track aliased pointers via Andersen's algorithm
    )

    paths = analyzer.analyze_use_after_free(
        max_paths=100,
        max_hops=15,
        min_confidence=0.6
    )

    for path in paths:
        print(f"Use-After-Free:")
        print(f"  Allocation: {path.allocation_function} at {path.allocation_location}")
        print(f"  Free: {path.free_function} at {path.free_location}")
        print(f"  Use: {path.use_type} at {path.use_location}")
        print(f"  Pointer: {path.pointer_name}")
        print(f"  Risk: {path.risk_score:.2f}")
        print(f"  Confidence: {path.confidence:.2f}")

Double-Free Detection

from src.analysis.dataflow import analyze_double_free

with CPGQueryService() as cpg:
    analyzer = MemoryLifetimeAnalyzer(cpg)

    paths = analyzer.analyze_double_free(
        max_paths=50,
        min_confidence=0.6
    )

    for path in paths:
        print(f"Double-Free:")
        print(f"  Allocation: {path.allocation_function} at {path.allocation_location}")
        print(f"  First free: {path.first_free_location}")
        print(f"  Second free: {path.second_free_location}")
        print(f"  Risk: {path.risk_score:.2f}")

Pointer Alias Analysis

from src.analysis.dataflow.pointer_alias import PointerAliasAnalyzer

with CPGQueryService() as cpg:
    analyzer = PointerAliasAnalyzer(cpg)

    # Analyze function for pointer aliases
    points_to = analyzer.analyze(function_id=123)

    # Check if two pointers may alias
    if analyzer.may_alias(ptr1_id=456, ptr2_id=457):
        print("Pointers may alias!")

        # Get all aliases
        aliases = analyzer.get_aliases(ptr1_id=456)
        print(f"Aliases: {aliases}")

    # Get statistics
    stats = analyzer.get_statistics()
    print(f"Analyzed {stats['total_pointers']} pointers")
    print(f"Found {stats['total_allocations']} allocations")
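
Conceptually, Andersen's analysis turns address-of and copy statements into set constraints, solves them to a fixpoint, and reports that two pointers may alias when their points-to sets intersect. A toy sketch (the input shape and variable names are illustrative only):

```python
# Toy Andersen-style may-alias: "p = &x" seeds points-to sets, "q = p"
# copy constraints propagate them until nothing changes.
def may_alias(addr_of, copies, p, q):
    pts = {var: set(targets) for var, targets in addr_of.items()}
    changed = True
    while changed:                       # iterate copy edges to a fixpoint
        changed = False
        for dst, src in copies:          # statement: dst = src
            before = len(pts.setdefault(dst, set()))
            pts[dst] |= pts.get(src, set())
            changed = changed or len(pts[dst]) > before
    return bool(pts.get(p, set()) & pts.get(q, set()))

print(may_alias({"p": ["x"]}, [("q", "p")], "p", "q"))  # True: both reach x
```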

NULL Pointer Detection

from src.analysis.dataflow import analyze_null_dereferences

with CPGQueryService() as cpg:
    paths = analyze_null_dereferences(
        cpg,
        max_paths=100,
        max_hops=10,
        min_confidence=0.7
    )

    for path in paths:
        print(f"NULL Dereference:")
        print(f"  Source: {path.source_function}() at {path.source_location}")
        print(f"  Dereference: {path.dereference_type} at {path.dereference_location}")
        print(f"  Has NULL check: {path.has_null_check}")
        print(f"  Risk: {path.risk_score:.2f}")
        print(f"  Variable: {path.variable_name}")

Information Disclosure Detection

from src.analysis.dataflow import analyze_info_disclosure

with CPGQueryService() as cpg:
    paths = analyze_info_disclosure(
        cpg,
        max_paths=100,
        max_hops=10,
        min_risk_score=0.6
    )

    for path in paths:
        print(f"Information Disclosure:")
        print(f"  Data category: {path.data_category}")  # credentials, pii, secrets
        print(f"  Source: {path.source_function} at {path.source_location}")
        print(f"  Sink: {path.sink_function} at {path.sink_location}")
        print(f"  Disclosure type: {path.disclosure_type}")  # error_message, debug_log
        print(f"  Severity: {path.severity}")
        print(f"  Risk: {path.risk_score:.2f}")

Race Condition Detection

from src.analysis.dataflow import analyze_race_conditions

with CPGQueryService() as cpg:
    paths = analyze_race_conditions(
        cpg,
        max_paths=100,
        max_hops=15,
        min_confidence=0.7
    )

    for path in paths:
        print(f"Race Condition:")
        print(f"  Type: {path.race_type}")  # toctou, data_race, missing_lock
        print(f"  Check: {path.check_function} at {path.check_location}")
        print(f"  Use: {path.use_function} at {path.use_location}")
        print(f"  Resource: {path.shared_resource}")
        print(f"  Has lock: {path.has_lock}")
        print(f"  Risk: {path.risk_score:.2f}")
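
For reference, the classic shape behind the toctou race type is a permission check followed by a use, with a window in between where the path can be swapped. An illustrative (deliberately vulnerable) snippet:

```python
# TOCTOU pattern: the file can change between check and use. Illustrative
# target code, not analyzer code - do not copy into production.
import os

def read_if_allowed(path):
    if os.access(path, os.R_OK):    # time of check
        with open(path) as f:       # time of use
            return f.read()
    return None
```

The usual fix is to drop the check and handle the failure at the open() call, or to operate on an already-open file descriptor rather than re-resolving the path.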

Advanced Features

Symbolic Execution

from src.analysis.dataflow.symbolic_execution import PathConstraintTracker

with CPGQueryService() as cpg:
    tracker = PathConstraintTracker(cpg, max_constraints=20)

    # Check if specific path is feasible
    is_feasible, confidence = tracker.is_path_feasible(
        start_node=100,
        end_node=200
    )

    if not is_feasible and confidence > 0.9:
        print("Path is definitely infeasible (false positive)")

    # Filter infeasible paths from list
    all_paths = analyze_sql_injections(cpg)
    feasible_paths = tracker.filter_infeasible_paths(
        all_paths,
        min_confidence=0.8
    )

    print(f"Filtered {len(all_paths) - len(feasible_paths)} infeasible paths")

TaintPropagator automatically loads SymbolicExecutionConfig from config.yaml -> symbolic_execution.* with settings: solver_timeout_ms (500), solver_timeout_uf_ms (2000), max_parse_depth (10), enable_function_models (True), enable_arithmetic (True).
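
Those settings correspond to a config.yaml block along these lines (the setting names and defaults are the ones listed above; the exact nesting is an assumption):

```yaml
symbolic_execution:
  solver_timeout_ms: 500
  solver_timeout_uf_ms: 2000
  max_parse_depth: 10
  enable_function_models: true
  enable_arithmetic: true
```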

Cross-Language Analysis

from src.domains import DomainRegistry

# Activate domain for your language
DomainRegistry.activate("python_django")  # or: javascript, go, postgresql

# Analysis automatically adapts to domain
paths = analyze_info_disclosure(cpg)
# Uses Python-specific sources/sinks (os.getenv, logger.error, etc.)

Field-Sensitive Analysis

paths = propagator.find_taint_paths(source_category='sql')

for path in paths:
    if path.field_sensitive:
        print(f"Tainted fields: {', '.join(path.tainted_fields)}")
        # Example: ['user.email', 'user.password']

Control-Flow Analysis

for path in paths:
    if path.is_conditional:
        print(f"Control dependencies: {len(path.control_dependencies)}")
        print(f"Execution probability: {path.execution_probability:.2f}")

        for dep in path.control_dependencies:
            print(f"  Depends on: {dep.control_code} at {dep.control_location}")

Configuration

Domain Selection

Edit config.yaml:

domain:
  name: python_django  # or: javascript, go, postgresql

Analyzer Options

# Disable specific features for performance
propagator = TaintPropagator(
    cpg,
    enable_inter_proc=False,         # Disable inter-procedural (faster)
    enable_control_flow=False,       # Disable control-flow (faster)
    enable_field_sensitive=False,    # Disable field tracking (faster)
    enable_context_sensitive=False,  # Disable context tracking (faster)
    enable_symbolic_execution=False  # Disable path feasibility (faster)
)

# Adjust analysis depth (defaults from config.yaml -> taint_analysis.*)
paths = propagator.find_taint_paths(
    max_paths=50,    # Fewer paths (faster)
    max_depth=10     # Shallower search (faster)
)

Baseline Comparison

# Compare accuracy with and without enhancements
baseline_analyzer = MemoryLifetimeAnalyzer(cpg, enable_alias_analysis=False)
enhanced_analyzer = MemoryLifetimeAnalyzer(cpg, enable_alias_analysis=True)

baseline_paths = baseline_analyzer.analyze_use_after_free()
enhanced_paths = enhanced_analyzer.analyze_use_after_free()

print(f"Baseline: {len(baseline_paths)} bugs found")
print(f"Enhanced: {len(enhanced_paths)} bugs found")

Troubleshooting

Z3 Solver Not Available

Error: Symbolic execution disabled: No module named 'z3'

Solution:

pip install z3-solver

If you don’t need symbolic execution:

propagator = TaintPropagator(cpg, enable_symbolic_execution=False)

High Memory Usage

Symptom: Analysis crashes with OOM error

Solutions:

# 1. Reduce max paths
paths = analyzer.analyze_use_after_free(max_paths=50)

# 2. Reduce max depth
paths = analyzer.analyze_use_after_free(max_hops=10)

# 3. Disable expensive features
analyzer = MemoryLifetimeAnalyzer(cpg, enable_alias_analysis=False)

Slow Analysis

Symptom: Analysis takes >10 seconds

Solutions:

# 1. Disable symbolic execution (saves 20% time)
propagator = TaintPropagator(cpg, enable_symbolic_execution=False)

# 2. Disable pointer alias analysis (saves 50% time)
analyzer = MemoryLifetimeAnalyzer(cpg, enable_alias_analysis=False)

# 3. Reduce analysis depth
paths = propagator.find_taint_paths(max_depth=10)

No Paths Found

Symptom: [] returned from analysis

Possible causes:

  1. Wrong domain selected (check config.yaml)
  2. No sources/sinks in codebase
  3. min_risk_score threshold too high

Solution:

# Lower risk threshold (default from config.yaml)
paths = analyze_sql_injections(cpg, min_risk_score=0.1)

# Check domain
from src.domains import get_active_domain
print(f"Active domain: {get_active_domain().name}")

# Verify sources/sinks via public API
from src.analysis.dataflow.taint_analysis import TaintPropagator
propagator = TaintPropagator(cpg)
sources = propagator.find_source_nodes()
sinks = propagator.find_sink_nodes()
print(f"Sources: {len(sources)}, Sinks: {len(sinks)}")

Performance Benchmarks

Environment: 10K methods CPG, Intel i7, 16GB RAM

Analysis Type       Time   Memory     Paths Found
SQL Injection       2-4s   80-100MB   50-75
Command Injection   2-4s   80-100MB   30-60
Path Traversal      2-4s   80-100MB   20-50
UAF (baseline)      2-4s   80-100MB   50-75
UAF (with alias)    3-6s   120-150MB  85-100
NULL Dereference    2-3s   50-80MB    40-60
Info Disclosure     3-5s   80-120MB   60-90
Race Conditions     2-4s   60-100MB   30-50

With Symbolic Execution: +20% time, +25% memory


API Reference

Quick Reference

# Taint analysis (defaults from config.yaml -> taint_analysis.*)
analyze_sql_injections(cpg, max_paths=None, min_risk_score=None, max_depth=None)
analyze_command_injections(cpg, max_paths=None, min_risk_score=None, max_depth=None)
analyze_path_traversal(cpg, max_paths=None, min_risk_score=None, max_depth=None)

# Memory safety
analyze_use_after_free(cpg, max_paths=100, max_hops=None, min_confidence=0.6)
analyze_double_free(cpg, max_paths=100, max_hops=None, min_confidence=0.6)

# NULL pointer
analyze_null_dereferences(cpg, max_paths=100, max_hops=None, min_confidence=0.7)

# Information disclosure
analyze_info_disclosure(cpg, max_paths=100, max_hops=None, min_risk_score=0.6)

# Race conditions
analyze_race_conditions(cpg, max_paths=100, max_hops=None, min_confidence=0.7)

For detailed API documentation, see:

  • src/analysis/dataflow/__init__.py
  • src/analysis/dataflow/taint_analysis.py
  • src/analysis/dataflow/memory_lifetime.py
  • src/analysis/dataflow/pointer_alias.py
  • src/analysis/dataflow/symbolic_execution.py


Taint Visualization

Taint analysis results can be rendered as Mermaid flowcharts via src/security/taint_visualizer.py, providing visual source-to-sink data flow diagrams. SARIF 2.1.0 export (--sarif-file) includes full codeFlows with step-by-step taint propagation.
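
A taint path reduces naturally to a Mermaid flowchart: one node per step, one edge per hop. The helper below is an illustrative stand-in, not the actual API of src/security/taint_visualizer.py:

```python
# Hypothetical sketch: turn an ordered list of (function, location) steps
# into a Mermaid flowchart definition. path_to_mermaid and the input
# shape are assumptions for illustration only.
def path_to_mermaid(steps):
    lines = ["flowchart TD"]
    for i, (name, loc) in enumerate(steps):
        lines.append(f'    n{i}["{name}<br/>{loc}"]')
    for i in range(len(steps) - 1):
        lines.append(f"    n{i} --> n{i + 1}")
    return "\n".join(lines)

diagram = path_to_mermaid([
    ("read_input", "app.py:10"),
    ("build_query", "app.py:22"),
    ("execute_sql", "db.py:5"),
])
print(diagram)
```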

Cross-Language Support

GoCPG supports 11 languages: C, C++, Go, Python, JavaScript, TypeScript, Java, Kotlin, C#, PHP, and 1C:Enterprise. Cross-language FFI edges (CGO, ctypes, cffi) connect taint paths across language boundaries.

Note: Always use ProjectManager.get_active_db_path() to get the active project’s DB path (see src/project_manager.py).

Additional Resources

  • CLI Guide: docs/guides/en/CLI_GUIDE.md
  • SQL Query Cookbook: docs/guides/en/SQL_QUERY_COOKBOOK.md
  • Troubleshooting: docs/guides/en/TROUBLESHOOTING.md

Questions? See docs/guides/en/TROUBLESHOOTING.md or contact the development team.


Last updated: March 2026