Dataflow Analysis User Guide¶
Version: 2.0 (Priority 3 Enhanced)
Last Updated: January 17, 2026
Language: English
Table of Contents¶
- Overview
- Key Features
- Architecture
- Getting Started
- Taint Analysis
- Memory Safety Analysis
- NULL Pointer Detection
- Information Disclosure Detection
- Race Condition Detection
- Advanced Features
- Configuration
- Troubleshooting
Overview¶
CodeGraph’s dataflow analysis engine provides precise vulnerability detection through:
- Taint Analysis - Track untrusted data from sources to sinks
- Memory Safety - Detect use-after-free and double-free bugs
- NULL Pointer Detection - Find potential NULL dereferences
- Information Disclosure - Detect sensitive data leaks
- Race Condition Detection - Find TOCTOU and data races
Accuracy: 85-90% (up from 70% with the earlier pattern-based approach)
Key Features¶
Phase 4 Features (Core)¶
✅ Inter-Procedural Analysis - Tracks taint across function calls
✅ Control-Flow Awareness - Analyzes conditional execution
✅ Field-Sensitive Analysis - Tracks struct field taint
✅ Sanitization Detection - Identifies proper input sanitization
Priority 3 Enhancements (New!)¶
✅ Cross-Language Support - Python, JavaScript, Go, PostgreSQL
✅ Pointer Alias Analysis - Improved UAF detection (+15% accuracy)
✅ Symbolic Execution - Path feasibility checking (5-7% fewer false positives)
Architecture¶
Core Components¶
TaintPropagator (Main Engine)
├── DataflowTracker - Forward dataflow propagation
├── InterProcTracker - Inter-procedural tracking
├── ControlFlowAnalyzer - Control dependency analysis
├── FieldSensitiveTracker - Struct field tracking
├── PointerAliasAnalyzer - Alias detection (Priority 3)
└── PathConstraintTracker - Symbolic execution (Priority 3)
Analysis Flow¶
1. Load domain-specific sources/sinks
2. Find source nodes (user input, etc.)
3. Find sink nodes (SQL, file ops, etc.)
4. Propagate taint through dataflow edges
5. [NEW] Build pointer alias graph (if enabled)
6. [NEW] Filter infeasible paths with Z3 (if enabled)
7. Calculate risk scores and confidence
8. Return sorted vulnerability paths
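The flow above is essentially a forward reachability search: steps 2-4 seed taint at the sources and walk dataflow edges toward the sinks. A minimal, self-contained sketch of that propagation step (the toy graph, node names, and `propagate_taint` helper are illustrative only, not the engine's actual internals):

```python
from collections import deque

def propagate_taint(edges, sources, sinks, max_hops=15):
    """Forward taint propagation: BFS from each source along dataflow
    edges, recording any path that reaches a sink within max_hops."""
    paths = []
    for src in sources:
        queue = deque([(src, [src])])
        seen = {src}
        while queue:
            node, path = queue.popleft()
            if node in sinks and node != src:
                paths.append(path)
                continue
            if len(path) > max_hops:
                continue
            for nxt in edges.get(node, []):
                if nxt not in seen:
                    seen.add(nxt)
                    queue.append((nxt, path + [nxt]))
    return paths

# Toy dataflow graph: request.GET -> user_id -> query -> cursor.execute
edges = {
    "request.GET": ["user_id"],
    "user_id": ["query"],
    "query": ["cursor.execute"],
}
print(propagate_taint(edges, {"request.GET"}, {"cursor.execute"}))
# → [['request.GET', 'user_id', 'query', 'cursor.execute']]
```

The `max_hops` cutoff is why the real analyzers expose a `max_hops`/`max_depth` knob: deeper searches find longer inter-procedural paths at the cost of time and memory.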
Getting Started¶
Prerequisites¶
Required:
pip install -r requirements.txt
Optional (for symbolic execution):
pip install z3-solver
Basic Usage¶
from src.analysis.dataflow import analyze_sql_injections
from src.services.cpg import CPGQueryService
with CPGQueryService("cpg.duckdb") as cpg:
    # Analyze SQL injection vulnerabilities
    paths = analyze_sql_injections(
        cpg,
        max_paths=100,       # Maximum paths to return
        max_hops=15,         # Maximum dataflow depth
        min_confidence=0.7,  # Minimum confidence threshold
    )

    # Display results
    for path in paths:
        print(f"SQL Injection: {path.source_function} → {path.sink_function}")
        print(f"  Risk: {path.risk_level} ({path.risk_score:.2f})")
        print(f"  Confidence: {path.confidence:.2f}")
        print(f"  Location: {path.source_location} → {path.sink_location}")
Taint Analysis¶
SQL Injection Detection¶
from src.analysis.dataflow import analyze_sql_injections
paths = analyze_sql_injections(cpg)
for path in paths:
    # Source: where untrusted data comes from
    print(f"Source: {path.source_function} at {path.source_location}")

    # Sink: where it's used dangerously
    print(f"Sink: {path.sink_function} at {path.sink_location}")

    # Sanitization: was it properly sanitized?
    if path.sanitizers:
        print(f"Sanitizers: {', '.join(path.sanitizers)}")
        print(f"Sanitization score: {path.sanitization_score:.2f}")
    else:
        print("⚠️ No sanitization detected!")

    # Path details
    print(f"Path length: {path.path_length} hops")
    if path.inter_procedural:
        print(f"Functions crossed: {', '.join(path.functions_crossed)}")
Command Injection Detection¶
from src.analysis.dataflow import analyze_command_injections
paths = analyze_command_injections(cpg)
for path in paths:
    print(f"Command Injection: {path.source_function} → {path.sink_function}")
    print(f"  Risk: {path.risk_level}")
    print(f"  Recommendation: {path.recommendation}")
Custom Taint Analysis¶
from src.analysis.dataflow.taint_analysis import TaintPropagator
with CPGQueryService("cpg.duckdb") as cpg:
    # Create propagator with custom settings
    propagator = TaintPropagator(
        cpg,
        enable_inter_proc=True,          # Inter-procedural tracking
        enable_control_flow=True,        # Control-flow analysis
        enable_field_sensitive=True,     # Field-sensitive tracking
        enable_symbolic_execution=True,  # Path feasibility checking (Priority 3)
    )

    # Find taint paths
    paths = propagator.find_taint_paths(
        source_category='sql',
        max_paths=100,
        max_depth=15,
    )
Memory Safety Analysis¶
Use-After-Free Detection¶
from src.analysis.dataflow.memory_lifetime import MemoryLifetimeAnalyzer
from src.services.cpg import CPGQueryService

with CPGQueryService("cpg.duckdb") as cpg:
    # Analyze with pointer alias analysis enabled (Priority 3)
    analyzer = MemoryLifetimeAnalyzer(
        cpg,
        enable_alias_analysis=True,  # NEW: track aliased pointers
    )

    paths = analyzer.analyze_use_after_free(
        max_paths=100,
        max_hops=15,
        min_confidence=0.6,
    )

    for path in paths:
        print("Use-After-Free:")
        print(f"  Allocation: {path.allocation_function} at {path.allocation_location}")
        print(f"  Free: {path.free_function} at {path.free_location}")
        print(f"  Use: {path.use_type} at {path.use_location}")
        print(f"  Pointer: {path.pointer_name}")
        print(f"  Risk: {path.risk_score:.2f}")
        print(f"  Confidence: {path.confidence:.2f}")
Double-Free Detection¶
from src.analysis.dataflow.memory_lifetime import MemoryLifetimeAnalyzer

with CPGQueryService("cpg.duckdb") as cpg:
    analyzer = MemoryLifetimeAnalyzer(cpg)
    paths = analyzer.analyze_double_free(
        max_paths=50,
        min_confidence=0.6,
    )
    for path in paths:
        print("Double-Free:")
        print(f"  Allocation: {path.allocation_function} at {path.allocation_location}")
        print(f"  First free: {path.first_free_location}")
        print(f"  Second free: {path.second_free_location}")
        print(f"  Risk: {path.risk_score:.2f}")
Pointer Alias Analysis (Priority 3)¶
from src.analysis.dataflow.pointer_alias import PointerAliasAnalyzer
with CPGQueryService("cpg.duckdb") as cpg:
    analyzer = PointerAliasAnalyzer(cpg)

    # Analyze function for pointer aliases
    points_to = analyzer.analyze(function_id=123)

    # Check if two pointers may alias
    if analyzer.may_alias(ptr1_id=456, ptr2_id=457):
        print("Pointers may alias!")

    # Get all aliases
    aliases = analyzer.get_aliases(ptr1_id=456)
    print(f"Aliases: {aliases}")

    # Get allocation sites
    sites = analyzer.points_to_allocation(ptr1_id=456)
    for site in sites:
        print(f"Points to: {site.alloc_type} at {site.location}")

    # Get statistics
    stats = analyzer.get_statistics()
    print(f"Analyzed {stats['total_pointers']} pointers")
    print(f"Found {stats['total_allocations']} allocations")
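May-alias queries like the one above are commonly answered with a unification-based (Steensgaard-style) analysis: every assignment merges the points-to classes of its two sides, and two pointers may alias if they end up in the same class. A hypothetical, self-contained sketch of that idea (not the actual `PointerAliasAnalyzer` implementation):

```python
class UnionFind:
    def __init__(self):
        self.parent = {}

    def find(self, x):
        self.parent.setdefault(x, x)
        while self.parent[x] != x:
            self.parent[x] = self.parent[self.parent[x]]  # path halving
            x = self.parent[x]
        return x

    def union(self, a, b):
        self.parent[self.find(a)] = self.find(b)

def may_alias(assignments, p, q):
    """Steensgaard-style: each 'lhs = rhs' merges the points-to
    classes of lhs and rhs; same class => may alias."""
    uf = UnionFind()
    for lhs, rhs in assignments:
        uf.union(lhs, rhs)
    return uf.find(p) == uf.find(q)

# p = malloc(); q = p; r = malloc()  -> only (p, q) may alias
assignments = [("q", "p")]
print(may_alias(assignments, "p", "q"))  # → True
print(may_alias(assignments, "p", "r"))  # → False
```

Unification is conservative (it over-approximates aliasing) but fast, which is one reason enabling alias analysis costs time rather than soundness.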
NULL Pointer Detection¶
from src.analysis.dataflow import analyze_null_dereferences
with CPGQueryService("cpg.duckdb") as cpg:
    paths = analyze_null_dereferences(
        cpg,
        max_paths=100,
        max_hops=10,
        min_confidence=0.7,
    )
    for path in paths:
        print("NULL Dereference:")
        print(f"  Source: {path.source_function}() at {path.source_location}")
        print(f"  Dereference: {path.dereference_type} at {path.dereference_location}")
        print(f"  Has NULL check: {'✅' if path.has_null_check else '❌'}")
        print(f"  Risk: {path.risk_score:.2f}")
        print(f"  Variable: {path.variable_name}")
Information Disclosure Detection¶
from src.analysis.dataflow import analyze_info_disclosure
with CPGQueryService("cpg.duckdb") as cpg:
    paths = analyze_info_disclosure(
        cpg,
        max_paths=100,
        max_hops=10,
        min_risk_score=0.6,
    )
    for path in paths:
        print("Information Disclosure:")
        print(f"  Data category: {path.data_category}")      # credentials, pii, secrets
        print(f"  Source: {path.source_function} at {path.source_location}")
        print(f"  Sink: {path.sink_function} at {path.sink_location}")
        print(f"  Disclosure type: {path.disclosure_type}")  # error_message, debug_log
        print(f"  Severity: {path.severity}")
        print(f"  Risk: {path.risk_score:.2f}")
Race Condition Detection¶
from src.analysis.dataflow import analyze_race_conditions
with CPGQueryService("cpg.duckdb") as cpg:
    paths = analyze_race_conditions(
        cpg,
        max_paths=100,
        max_hops=15,
        min_confidence=0.7,
    )
    for path in paths:
        print("Race Condition:")
        print(f"  Type: {path.race_type}")  # toctou, data_race, missing_lock
        print(f"  Check: {path.check_function} at {path.check_location}")
        print(f"  Use: {path.use_function} at {path.use_location}")
        print(f"  Resource: {path.resource_name}")
        print(f"  Has lock: {'✅' if path.has_lock else '❌'}")
        print(f"  Risk: {path.risk_score:.2f}")
Advanced Features¶
Symbolic Execution (Priority 3)¶
from src.analysis.dataflow.symbolic_execution import PathConstraintTracker
with CPGQueryService("cpg.duckdb") as cpg:
    tracker = PathConstraintTracker(cpg, max_constraints=20)

    # Check if a specific path is feasible
    is_feasible, confidence = tracker.is_path_feasible(
        start_node=100,
        end_node=200,
    )
    if not is_feasible and confidence > 0.9:
        print("Path is definitely infeasible (false positive)")

    # Filter infeasible paths from a list
    all_paths = analyze_sql_injections(cpg)
    feasible_paths = tracker.filter_infeasible_paths(
        all_paths,
        min_confidence=0.8,
    )
    print(f"Filtered {len(all_paths) - len(feasible_paths)} infeasible paths")
Cross-Language Analysis (Priority 3)¶
from src.domains import DomainRegistry
# Activate domain for your language
DomainRegistry.activate("python_django") # or: javascript, go, postgresql
# Analysis automatically adapts to domain
paths = analyze_info_disclosure(cpg)
# Uses Python-specific sources/sinks (os.getenv, logger.error, etc.)
Field-Sensitive Analysis¶
paths = propagator.find_taint_paths(source_category='sql')
for path in paths:
    if path.field_sensitive:
        print(f"Tainted fields: {', '.join(path.tainted_fields)}")
        # Example: ['user.email', 'user.password']
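Field sensitivity matters because tainting one struct field should not taint its siblings. A toy illustration of the distinction (the `tainted_fields` helper and the assignment facts are hypothetical, not the engine's tracker):

```python
def tainted_fields(assignments, taint_source):
    """Field-sensitive tracking: only names assigned (directly or
    transitively) from the taint source become tainted."""
    tainted = {taint_source}
    changed = True
    while changed:
        changed = False
        for lhs, rhs in assignments:
            if rhs in tainted and lhs not in tainted:
                tainted.add(lhs)
                changed = True
    return tainted - {taint_source}

# user.email comes from request input; user.name from a constant
assignments = [
    ("user.email", "request.GET['email']"),
    ("user.name", "'anonymous'"),
    ("query", "user.email"),
]
print(sorted(tainted_fields(assignments, "request.GET['email']")))
# → ['query', 'user.email']
```

A field-insensitive analysis would taint the whole `user` object, flagging `user.name` too and inflating false positives.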
Control-Flow Analysis¶
for path in paths:
    if path.is_conditional:
        print(f"Control dependencies: {len(path.control_dependencies)}")
        print(f"Execution probability: {path.execution_probability:.2f}")
        for dep in path.control_dependencies:
            print(f"  Depends on: {dep.condition} at {dep.location}")
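One simple way to think about an execution probability is to assume each independent branch condition on the path is taken with probability 0.5 and multiply along the path. This is a crude illustrative model, not necessarily the engine's actual heuristic:

```python
def execution_probability(num_independent_branches, p_taken=0.5):
    # Crude model: each branch on the path is taken independently,
    # so the path probability is the product of branch probabilities.
    return p_taken ** num_independent_branches

# A path guarded by three independent if-statements
print(f"{execution_probability(3):.3f}")  # → 0.125
```

Under this model, deeply conditional paths get low probabilities, which is why heavily guarded taint paths tend to rank lower.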
Configuration¶
Domain Selection¶
Edit config.yaml:
domain:
  name: python_django  # or: javascript, go, postgresql
Analyzer Options¶
# Disable specific features for performance
propagator = TaintPropagator(
    cpg,
    enable_inter_proc=False,          # Disable inter-procedural (faster)
    enable_control_flow=False,        # Disable control-flow (faster)
    enable_field_sensitive=False,     # Disable field tracking (faster)
    enable_symbolic_execution=False,  # Disable path feasibility (faster)
)

# Adjust analysis depth
paths = propagator.find_taint_paths(
    max_paths=50,  # Fewer paths (faster)
    max_depth=10,  # Shallower search (faster)
)
Baseline Comparison¶
# Compare accuracy with and without enhancements
baseline_analyzer = MemoryLifetimeAnalyzer(cpg, enable_alias_analysis=False)
enhanced_analyzer = MemoryLifetimeAnalyzer(cpg, enable_alias_analysis=True)
baseline_paths = baseline_analyzer.analyze_use_after_free()
enhanced_paths = enhanced_analyzer.analyze_use_after_free()
print(f"Baseline: {len(baseline_paths)} bugs found")
print(f"Enhanced: {len(enhanced_paths)} bugs found")
delta = len(enhanced_paths) - len(baseline_paths)
print(f"Additional: {delta} (+{delta / len(baseline_paths) * 100:.1f}%)")
Troubleshooting¶
Z3 Solver Not Available¶
Error: Symbolic execution disabled: No module named 'z3'
Solution:
pip install z3-solver
If you don’t need symbolic execution:
propagator = TaintPropagator(cpg, enable_symbolic_execution=False)
High Memory Usage¶
Symptom: Analysis crashes with OOM error
Solutions:
# 1. Reduce max paths
paths = analyzer.analyze_use_after_free(max_paths=50) # default: 100
# 2. Reduce max depth
paths = analyzer.analyze_use_after_free(max_hops=10) # default: 15
# 3. Disable expensive features
analyzer = MemoryLifetimeAnalyzer(cpg, enable_alias_analysis=False)
Slow Analysis¶
Symptom: Analysis takes >10 seconds
Solutions:
# 1. Disable symbolic execution (saves 20% time)
propagator = TaintPropagator(cpg, enable_symbolic_execution=False)
# 2. Disable pointer alias analysis (saves 50% time)
analyzer = MemoryLifetimeAnalyzer(cpg, enable_alias_analysis=False)
# 3. Reduce analysis depth
paths = propagator.find_taint_paths(max_depth=10) # default: 15
No Paths Found¶
Symptom: [] returned from analysis
Possible causes:
1. Wrong domain selected (check config.yaml)
2. No sources/sinks in codebase
3. min_confidence threshold too high
Solution:
# Lower confidence threshold
paths = analyze_sql_injections(cpg, min_confidence=0.5) # default: 0.7
# Check domain
from src.domains import get_active_domain
print(f"Active domain: {get_active_domain().name}")
# Verify sources/sinks
propagator._load_domain_data()
print(f"Sources: {len(propagator._taint_sources)}")
print(f"Sinks: {len(propagator._taint_sinks)}")
Performance Benchmarks¶
Environment: 10K methods CPG, Intel i7, 16GB RAM
| Analysis Type | Time | Memory | Paths Found |
|---|---|---|---|
| SQL Injection | 2-4s | 80-100MB | 50-75 |
| UAF (baseline) | 2-4s | 80-100MB | 50-75 |
| UAF (with alias) | 3-6s | 120-150MB | 85-100 |
| NULL Dereference | 2-3s | 50-80MB | 40-60 |
| Info Disclosure | 3-5s | 80-120MB | 60-90 |
| Race Conditions | 2-4s | 60-100MB | 30-50 |
With Symbolic Execution: +20% time, +25% memory
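Assuming those overheads apply multiplicatively to the table's figures, the expected cost with symbolic execution can be projected directly (`with_symbolic_execution` below is an illustrative helper, not part of the API):

```python
def with_symbolic_execution(time_s, memory_mb):
    # Apply the stated +20% time / +25% memory overheads.
    return time_s * 1.20, memory_mb * 1.25

# Upper-bound SQL injection figures from the table: 4s, 100MB
t, m = with_symbolic_execution(4.0, 100.0)
print(f"{t:.1f}s, {m:.0f}MB")  # → 4.8s, 125MB
```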
API Reference¶
Quick Reference¶
# Taint analysis
analyze_sql_injections(cpg, max_paths=100, max_hops=15, min_confidence=0.7)
analyze_command_injections(cpg, ...)
analyze_path_traversal(cpg, ...)
# Memory safety
analyze_use_after_free(cpg, max_paths=100, max_hops=15, min_confidence=0.6)
analyze_double_free(cpg, ...)
# NULL pointer
analyze_null_dereferences(cpg, max_paths=100, max_hops=10, min_confidence=0.7)
# Information disclosure
analyze_info_disclosure(cpg, max_paths=100, max_hops=10, min_risk_score=0.6)
# Race conditions
analyze_race_conditions(cpg, max_paths=100, max_hops=15, min_confidence=0.7)
For detailed API documentation, see:
- src/analysis/dataflow/__init__.py
- src/analysis/dataflow/taint_analysis.py
- src/analysis/dataflow/memory_lifetime.py
- src/analysis/dataflow/pointer_alias.py (Priority 3)
- src/analysis/dataflow/symbolic_execution.py (Priority 3)
Additional Resources¶
- Integration Guide: PART1_INTEGRATION_COMPLETE.md
- Cross-Language Support: PRIORITY3_CROSS_LANGUAGE_SUPPORT.md
- Pointer Alias Analysis: PRIORITY3_POINTER_ALIAS_ANALYSIS.md
- Complete Priority 3 Docs: PRIORITY3_COMPLETE.md
Questions? See docs/FAQ.md or contact the development team.