Dataflow Analysis User Guide

Version: 2.0 (Priority 3 Enhanced)
Last Updated: January 17, 2026
Language: English


Table of Contents

  1. Overview
  2. Key Features
  3. Architecture
  4. Getting Started
  5. Taint Analysis
  6. Memory Safety Analysis
  7. NULL Pointer Detection
  8. Information Disclosure Detection
  9. Race Condition Detection
  10. Advanced Features
  11. Configuration
  12. Troubleshooting

Overview

CodeGraph’s dataflow analysis engine provides precise vulnerability detection through:

  • Taint Analysis - Track untrusted data from sources to sinks
  • Memory Safety - Detect use-after-free and double-free bugs
  • NULL Pointer Detection - Find potential NULL dereferences
  • Information Disclosure - Detect sensitive data leaks
  • Race Condition Detection - Find TOCTOU and data races

Accuracy: 85-90% (up from 70% with the earlier pattern-based approach)


Key Features

Phase 4 Features (Core)

✅ Inter-Procedural Analysis - Tracks taint across function calls
✅ Control-Flow Awareness - Analyzes conditional execution
✅ Field-Sensitive Analysis - Tracks struct field taint
✅ Sanitization Detection - Identifies proper input sanitization

Priority 3 Enhancements (New!)

✅ Cross-Language Support - Python, JavaScript, Go, PostgreSQL
✅ Pointer Alias Analysis - Improved UAF detection (+15% accuracy)
✅ Symbolic Execution - Path feasibility checking (5-7% fewer false positives)


Architecture

Core Components

TaintPropagator (Main Engine)
├── DataflowTracker       - Forward dataflow propagation
├── InterProcTracker      - Inter-procedural tracking
├── ControlFlowAnalyzer   - Control dependency analysis
├── FieldSensitiveTracker - Struct field tracking
├── PointerAliasAnalyzer  - Alias detection (Priority 3)
└── PathConstraintTracker - Symbolic execution (Priority 3)

Analysis Flow

1. Load domain-specific sources/sinks
2. Find source nodes (user input, etc.)
3. Find sink nodes (SQL, file ops, etc.)
4. Propagate taint through dataflow edges
5. [NEW] Build pointer alias graph (if enabled)
6. [NEW] Filter infeasible paths with Z3 (if enabled)
7. Calculate risk scores and confidence
8. Return sorted vulnerability paths
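The flow above amounts to a forward worklist propagation over dataflow edges. The sketch below is an illustrative, self-contained toy of steps 2-4, 7, and 8 (the edge list, node names, and linear confidence decay are invented for the example; they are not the engine's real data structures or scoring formula):

```python
from collections import deque

def find_taint_paths(edges, sources, sinks, max_depth=15):
    """Toy forward propagation: BFS from each source along dataflow edges."""
    graph = {}
    for src, dst in edges:
        graph.setdefault(src, []).append(dst)

    paths = []
    for source in sources:
        queue = deque([(source, [source])])
        while queue:
            node, path = queue.popleft()
            if len(path) > max_depth:
                continue  # step 4: bound the search depth
            if node in sinks and len(path) > 1:
                # step 7: confidence decays with hop count (illustrative heuristic)
                confidence = max(0.0, 1.0 - 0.05 * (len(path) - 1))
                paths.append({"path": path, "confidence": confidence})
            for succ in graph.get(node, []):
                if succ not in path:  # avoid revisiting nodes (cycles)
                    queue.append((succ, path + [succ]))

    # step 8: highest-confidence paths first
    return sorted(paths, key=lambda p: p["confidence"], reverse=True)

# Tiny example: user_input -> parse -> build_query -> execute_sql
edges = [("user_input", "parse"), ("parse", "build_query"),
         ("build_query", "execute_sql")]
result = find_taint_paths(edges, sources={"user_input"}, sinks={"execute_sql"})
print(result[0]["path"])  # ['user_input', 'parse', 'build_query', 'execute_sql']
```

The real engine additionally applies the alias graph and Z3 feasibility filtering (steps 5-6) before scoring.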

Getting Started

Prerequisites

Required:

pip install -r requirements.txt

Optional (for symbolic execution):

pip install z3-solver

Basic Usage

from src.analysis.dataflow import analyze_sql_injections
from src.services.cpg import CPGQueryService

with CPGQueryService("cpg.duckdb") as cpg:
    # Analyze SQL injection vulnerabilities
    paths = analyze_sql_injections(
        cpg,
        max_paths=100,      # Maximum paths to return
        max_hops=15,        # Maximum dataflow depth
        min_confidence=0.7  # Minimum confidence threshold
    )

    # Display results
    for path in paths:
        print(f"SQL Injection: {path.source_function} → {path.sink_function}")
        print(f"  Risk: {path.risk_level} ({path.risk_score:.2f})")
        print(f"  Confidence: {path.confidence:.2f}")
        print(f"  Location: {path.source_location} → {path.sink_location}")

Taint Analysis

SQL Injection Detection

from src.analysis.dataflow import analyze_sql_injections

paths = analyze_sql_injections(cpg)

for path in paths:
    # Source: where untrusted data comes from
    print(f"Source: {path.source_function} at {path.source_location}")

    # Sink: where it's used dangerously
    print(f"Sink: {path.sink_function} at {path.sink_location}")

    # Sanitization: was it properly sanitized?
    if path.sanitizers:
        print(f"Sanitizers: {', '.join(path.sanitizers)}")
        print(f"Sanitization score: {path.sanitization_score:.2f}")
    else:
        print("⚠️ No sanitization detected!")

    # Path details
    print(f"Path length: {path.path_length} hops")
    if path.inter_procedural:
        print(f"Functions crossed: {', '.join(path.functions_crossed)}")

Command Injection Detection

from src.analysis.dataflow import analyze_command_injections

paths = analyze_command_injections(cpg)

for path in paths:
    print(f"Command Injection: {path.source_function} → {path.sink_function}")
    print(f"  Risk: {path.risk_level}")
    print(f"  Recommendation: {path.recommendation}")

Custom Taint Analysis

from src.analysis.dataflow.taint_analysis import TaintPropagator

with CPGQueryService("cpg.duckdb") as cpg:
    # Create propagator with custom settings
    propagator = TaintPropagator(
        cpg,
        enable_inter_proc=True,        # Inter-procedural tracking
        enable_control_flow=True,      # Control-flow analysis
        enable_field_sensitive=True,   # Field-sensitive tracking
        enable_symbolic_execution=True # Path feasibility checking (Priority 3)
    )

    # Find taint paths
    paths = propagator.find_taint_paths(
        source_category='sql',
        max_paths=100,
        max_depth=15
    )

Memory Safety Analysis

Use-After-Free Detection

from src.analysis.dataflow import analyze_use_after_free
from src.services.cpg import CPGQueryService

with CPGQueryService("cpg.duckdb") as cpg:
    # Analyze with pointer alias analysis enabled (Priority 3)
    from src.analysis.dataflow.memory_lifetime import MemoryLifetimeAnalyzer

    analyzer = MemoryLifetimeAnalyzer(
        cpg,
        enable_alias_analysis=True  # NEW: Track aliased pointers
    )

    paths = analyzer.analyze_use_after_free(
        max_paths=100,
        max_hops=15,
        min_confidence=0.6
    )

    for path in paths:
        print(f"Use-After-Free:")
        print(f"  Allocation: {path.allocation_function} at {path.allocation_location}")
        print(f"  Free: {path.free_function} at {path.free_location}")
        print(f"  Use: {path.use_type} at {path.use_location}")
        print(f"  Pointer: {path.pointer_name}")
        print(f"  Risk: {path.risk_score:.2f}")
        print(f"  Confidence: {path.confidence:.2f}")

Double-Free Detection

from src.analysis.dataflow.memory_lifetime import MemoryLifetimeAnalyzer

with CPGQueryService("cpg.duckdb") as cpg:
    analyzer = MemoryLifetimeAnalyzer(cpg)

    paths = analyzer.analyze_double_free(
        max_paths=50,
        min_confidence=0.6
    )

    for path in paths:
        print(f"Double-Free:")
        print(f"  Allocation: {path.allocation_function} at {path.allocation_location}")
        print(f"  First free: {path.first_free_location}")
        print(f"  Second free: {path.second_free_location}")
        print(f"  Risk: {path.risk_score:.2f}")

Pointer Alias Analysis (Priority 3)

from src.analysis.dataflow.pointer_alias import PointerAliasAnalyzer

with CPGQueryService("cpg.duckdb") as cpg:
    analyzer = PointerAliasAnalyzer(cpg)

    # Analyze function for pointer aliases
    points_to = analyzer.analyze(function_id=123)

    # Check if two pointers may alias
    if analyzer.may_alias(ptr1_id=456, ptr2_id=457):
        print("Pointers may alias!")

        # Get all aliases
        aliases = analyzer.get_aliases(ptr1_id=456)
        print(f"Aliases: {aliases}")

        # Get allocation sites
        sites = analyzer.points_to_allocation(ptr1_id=456)
        for site in sites:
            print(f"Points to: {site.alloc_type} at {site.location}")

    # Get statistics
    stats = analyzer.get_statistics()
    print(f"Analyzed {stats['total_pointers']} pointers")
    print(f"Found {stats['total_allocations']} allocations")

NULL Pointer Detection

from src.analysis.dataflow import analyze_null_dereferences

with CPGQueryService("cpg.duckdb") as cpg:
    paths = analyze_null_dereferences(
        cpg,
        max_paths=100,
        max_hops=10,
        min_confidence=0.7
    )

    for path in paths:
        print(f"NULL Dereference:")
        print(f"  Source: {path.source_function}() at {path.source_location}")
        print(f"  Dereference: {path.dereference_type} at {path.dereference_location}")
        print(f"  Has NULL check: {'✅' if path.has_null_check else '❌'}")
        print(f"  Risk: {path.risk_score:.2f}")
        print(f"  Variable: {path.variable_name}")

Information Disclosure Detection

from src.analysis.dataflow import analyze_info_disclosure

with CPGQueryService("cpg.duckdb") as cpg:
    paths = analyze_info_disclosure(
        cpg,
        max_paths=100,
        max_hops=10,
        min_risk_score=0.6
    )

    for path in paths:
        print(f"Information Disclosure:")
        print(f"  Data category: {path.data_category}")  # credentials, pii, secrets
        print(f"  Source: {path.source_function} at {path.source_location}")
        print(f"  Sink: {path.sink_function} at {path.sink_location}")
        print(f"  Disclosure type: {path.disclosure_type}")  # error_message, debug_log
        print(f"  Severity: {path.severity}")
        print(f"  Risk: {path.risk_score:.2f}")

Race Condition Detection

from src.analysis.dataflow import analyze_race_conditions

with CPGQueryService("cpg.duckdb") as cpg:
    paths = analyze_race_conditions(
        cpg,
        max_paths=100,
        max_hops=15,
        min_confidence=0.7
    )

    for path in paths:
        print(f"Race Condition:")
        print(f"  Type: {path.race_type}")  # toctou, data_race, missing_lock
        print(f"  Check: {path.check_function} at {path.check_location}")
        print(f"  Use: {path.use_function} at {path.use_location}")
        print(f"  Resource: {path.resource_name}")
        print(f"  Has lock: {'✅' if path.has_lock else '❌'}")
        print(f"  Risk: {path.risk_score:.2f}")

Advanced Features

Symbolic Execution (Priority 3)

from src.analysis.dataflow.symbolic_execution import PathConstraintTracker

with CPGQueryService("cpg.duckdb") as cpg:
    tracker = PathConstraintTracker(cpg, max_constraints=20)

    # Check if specific path is feasible
    is_feasible, confidence = tracker.is_path_feasible(
        start_node=100,
        end_node=200
    )

    if not is_feasible and confidence > 0.9:
        print("Path is definitely infeasible (false positive)")

    # Filter infeasible paths from list
    all_paths = analyze_sql_injections(cpg)
    feasible_paths = tracker.filter_infeasible_paths(
        all_paths,
        min_confidence=0.8
    )

    print(f"Filtered {len(all_paths) - len(feasible_paths)} infeasible paths")

Cross-Language Analysis (Priority 3)

from src.domains import DomainRegistry

# Activate domain for your language
DomainRegistry.activate("python_django")  # or: javascript, go, postgresql

# Analysis automatically adapts to domain
paths = analyze_info_disclosure(cpg)
# Uses Python-specific sources/sinks (os.getenv, logger.error, etc.)

Field-Sensitive Analysis

paths = propagator.find_taint_paths(source_category='sql')

for path in paths:
    if path.field_sensitive:
        print(f"Tainted fields: {', '.join(path.tainted_fields)}")
        # Example: ['user.email', 'user.password']

Control-Flow Analysis

for path in paths:
    if path.is_conditional:
        print(f"Control dependencies: {len(path.control_dependencies)}")
        print(f"Execution probability: {path.execution_probability:.2f}")

        for dep in path.control_dependencies:
            print(f"  Depends on: {dep.condition} at {dep.location}")

Configuration

Domain Selection

Edit config.yaml:

domain:
  name: python_django  # or: javascript, go, postgresql

Analyzer Options

# Disable specific features for performance
propagator = TaintPropagator(
    cpg,
    enable_inter_proc=False,        # Disable inter-procedural (faster)
    enable_control_flow=False,      # Disable control-flow (faster)
    enable_field_sensitive=False,   # Disable field tracking (faster)
    enable_symbolic_execution=False # Disable path feasibility (faster)
)

# Adjust analysis depth
paths = propagator.find_taint_paths(
    max_paths=50,    # Fewer paths (faster)
    max_depth=10     # Shallower search (faster)
)

Baseline Comparison

# Compare accuracy with and without enhancements
baseline_analyzer = MemoryLifetimeAnalyzer(cpg, enable_alias_analysis=False)
enhanced_analyzer = MemoryLifetimeAnalyzer(cpg, enable_alias_analysis=True)

baseline_paths = baseline_analyzer.analyze_use_after_free()
enhanced_paths = enhanced_analyzer.analyze_use_after_free()

print(f"Baseline: {len(baseline_paths)} bugs found")
print(f"Enhanced: {len(enhanced_paths)} bugs found")
extra = len(enhanced_paths) - len(baseline_paths)
if baseline_paths:  # guard against division by zero on an empty baseline
    print(f"Additional: {extra} (+{extra / len(baseline_paths) * 100:.1f}%)")

Troubleshooting

Z3 Solver Not Available

Error: Symbolic execution disabled: No module named 'z3'

Solution:

pip install z3-solver

If you don’t need symbolic execution:

propagator = TaintPropagator(cpg, enable_symbolic_execution=False)

High Memory Usage

Symptom: Analysis crashes with OOM error

Solutions:

# 1. Reduce max paths
paths = analyzer.analyze_use_after_free(max_paths=50)  # default: 100

# 2. Reduce max depth
paths = analyzer.analyze_use_after_free(max_hops=10)  # default: 15

# 3. Disable expensive features
analyzer = MemoryLifetimeAnalyzer(cpg, enable_alias_analysis=False)

Slow Analysis

Symptom: Analysis takes >10 seconds

Solutions:

# 1. Disable symbolic execution (saves 20% time)
propagator = TaintPropagator(cpg, enable_symbolic_execution=False)

# 2. Disable pointer alias analysis (saves 50% time)
analyzer = MemoryLifetimeAnalyzer(cpg, enable_alias_analysis=False)

# 3. Reduce analysis depth
paths = propagator.find_taint_paths(max_depth=10)  # default: 15

No Paths Found

Symptom: [] returned from analysis

Possible causes:

1. Wrong domain selected (check config.yaml)
2. No sources/sinks in codebase
3. min_confidence threshold too high

Solution:

# Lower confidence threshold
paths = analyze_sql_injections(cpg, min_confidence=0.5)  # default: 0.7

# Check domain
from src.domains import get_active_domain
print(f"Active domain: {get_active_domain().name}")

# Verify sources/sinks
propagator._load_domain_data()
print(f"Sources: {len(propagator._taint_sources)}")
print(f"Sinks: {len(propagator._taint_sinks)}")

Performance Benchmarks

Environment: 10K methods CPG, Intel i7, 16GB RAM

Analysis Type      Time   Memory     Paths Found
SQL Injection      2-4s   80-100MB   50-75
UAF (baseline)     2-4s   80-100MB   50-75
UAF (with alias)   3-6s   120-150MB  85-100
NULL Dereference   2-3s   50-80MB    40-60
Info Disclosure    3-5s   80-120MB   60-90
Race Conditions    2-4s   60-100MB   30-50

With Symbolic Execution: +20% time, +25% memory
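These figures vary with hardware and CPG size, so it is worth measuring on your own data. A minimal timing helper (the function name and dummy workload are invented for this example; substitute any of the analyzer calls above):

```python
import time

def time_analysis(fn, *args, **kwargs):
    """Run an analysis callable and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed

# With a real analyzer this would look like:
#   paths, seconds = time_analysis(analyze_sql_injections, cpg, max_paths=100)
# Dummy workload so this snippet runs stand-alone:
paths, seconds = time_analysis(lambda: list(range(1000)))
print(f"found {len(paths)} items in {seconds:.3f}s")
```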


API Reference

Quick Reference

# Taint analysis
analyze_sql_injections(cpg, max_paths=100, max_hops=15, min_confidence=0.7)
analyze_command_injections(cpg, ...)
analyze_path_traversal(cpg, ...)

# Memory safety
analyze_use_after_free(cpg, max_paths=100, max_hops=15, min_confidence=0.6)
analyze_double_free(cpg, ...)

# NULL pointer
analyze_null_dereferences(cpg, max_paths=100, max_hops=10, min_confidence=0.7)

# Information disclosure
analyze_info_disclosure(cpg, max_paths=100, max_hops=10, min_risk_score=0.6)

# Race conditions
analyze_race_conditions(cpg, max_paths=100, max_hops=15, min_confidence=0.7)

For detailed API documentation, see:

  • src/analysis/dataflow/__init__.py
  • src/analysis/dataflow/taint_analysis.py
  • src/analysis/dataflow/memory_lifetime.py
  • src/analysis/dataflow/pointer_alias.py (Priority 3)
  • src/analysis/dataflow/symbolic_execution.py (Priority 3)


Additional Resources

  • Integration Guide: PART1_INTEGRATION_COMPLETE.md
  • Cross-Language Support: PRIORITY3_CROSS_LANGUAGE_SUPPORT.md
  • Pointer Alias Analysis: PRIORITY3_POINTER_ALIAS_ANALYSIS.md
  • Complete Priority 3 Docs: PRIORITY3_COMPLETE.md

Questions? See docs/FAQ.md or contact the development team.