Руководство пользователя по анализу потоков данных

Версия: 3.0 Язык: Русский


Содержание

  1. Обзор
  2. Ключевые возможности
  3. Архитектура
  4. Начало работы
  5. Taint-анализ
  6. Анализ безопасности памяти
  7. Обнаружение NULL-указателей
  8. Обнаружение утечек информации
  9. Обнаружение состояний гонки
  10. Продвинутые функции
  11. Конфигурация
  12. Решение проблем
  13. Тесты производительности
  14. Справочник API
  15. Дополнительные ресурсы

Обзор

Движок анализа потоков данных CodeGraph обеспечивает точное обнаружение уязвимостей через: - Taint-анализ — отслеживание ненадёжных данных от источников до критичных операций (94-96% точности) - Безопасность памяти — обнаружение use-after-free и double-free ошибок (85% с анализом алиасов) - Обнаружение NULL-указателей — поиск потенциальных разыменований NULL (85-90% точности) - Утечка информации — обнаружение утечек конфиденциальных данных (80-85% точности) - Состояния гонки — поиск TOCTOU и проблем гонки данных (80-85% точности)

Все числовые параметры по умолчанию (max_paths, max_depth, пороги) загружаются из config.yaml через get_unified_config().


Ключевые возможности

Основные фазы анализа

  • Межпроцедурный анализ — отслеживание заражённых данных через вызовы функций (Phase 4.2)
  • Анализ потока управления — анализ условного выполнения (Phase 4.3)
  • Field-sensitive анализ — отслеживание заражённых данных на уровне полей структур (Phase 4.4)
  • Контекстно-зависимый анализ — различение вызовов от разных вызывающих (Phase 5)
  • Обнаружение санитизации — определение правильной санитизации входных данных

Расширенные возможности

  • Кросс-языковая поддержка — 11 языков через GoCPG
  • Анализ алиасов указателей — улучшенное обнаружение UAF (+15% точности)
  • Символьное выполнение V2 — проверка выполнимости путей через Z3 (-10-15% ложных срабатываний)

Архитектура

Основные компоненты

TaintPropagator (Главный движок)
+-- DataflowTracker          - Прямое распространение dataflow
+-- InterProcTracker         - Межпроцедурное отслеживание
+-- ControlFlowAnalyzer      - Анализ зависимостей по управлению
+-- FieldSensitiveTracker    - Отслеживание полей структур
+-- ContextSensitiveTracker  - Отслеживание контекста вызовов (Phase 5)
+-- PathConstraintTracker    - Символьное выполнение через Z3

MemoryLifetimeAnalyzer (Безопасность памяти)
+-- PointerAliasAnalyzer     - Анализ алиасов по Андерсену

Процесс анализа

1. Загрузка domain-специфичных sources/sinks из YAML плагина
2. Поиск узлов-источников (пользовательский ввод, сеть и т.д.)
3. Поиск узлов-приёмников (SQL, команды ОС, файловые операции и т.д.)
4. Распространение taint через рёбра потоков данных
5. Отслеживание контекста вызовов для точности (если включено)
6. Построение графа алиасов указателей (если включено, только для анализа памяти)
7. Фильтрация невыполнимых путей с помощью Z3 (если включено)
8. Расчёт оценок рисков и уверенности
9. Возврат отсортированных путей уязвимостей

Начало работы

Зависимости

Обязательные:

pip install -r requirements.txt

Опциональные (для символьного выполнения):

pip install z3-solver

Базовое использование

from src.analysis.dataflow import analyze_sql_injections
from src.services.cpg import CPGQueryService

with CPGQueryService() as cpg:
    # Анализ уязвимостей SQL-инъекций
    # Все значения по умолчанию берутся из config.yaml -> taint_analysis.*
    paths = analyze_sql_injections(cpg)

    # Отображение результатов
    for path in paths:
        print(f"SQL-инъекция: {path.source_function} -> {path.sink_function}")
        print(f"  Риск: {path.risk_level} ({path.risk_score:.2f})")
        print(f"  Уверенность: {path.confidence:.2f}")
        print(f"  Местоположение: {path.source_location} -> {path.sink_location}")

Taint-анализ

Обнаружение SQL-инъекций

from src.analysis.dataflow import analyze_sql_injections

paths = analyze_sql_injections(cpg)

for path in paths:
    # Source: откуда приходят ненадёжные данные
    print(f"Источник: {path.source_function} в {path.source_location}")

    # Sink: где они используются опасным образом
    print(f"Приёмник: {path.sink_function} в {path.sink_location}")

    # Sanitization: была ли правильная санитизация?
    if path.sanitizers:
        print(f"Санитайзеры: {', '.join(path.sanitizers)}")
        print(f"Оценка санитизации: {path.sanitization_score:.2f}")
    else:
        print("Санитизация не обнаружена!")

    # Детали пути
    print(f"Длина пути: {path.path_length} переходов")
    if path.inter_procedural:
        print(f"Пройденные функции: {', '.join(path.functions_crossed)}")

Обнаружение инъекций команд

from src.analysis.dataflow import analyze_command_injections

# Обнаружение потоков заражённых данных к приёмникам команд ОС (system, popen, exec, subprocess.run)
# Приёмники определены в annotations.yaml -> sink_type: command_injection
paths = analyze_command_injections(cpg)

for path in paths:
    print(f"Инъекция команд (CWE-78): {path.source_function} -> {path.sink_function}")
    print(f"  Риск: {path.risk_level} ({path.risk_score:.2f})")
    print(f"  Рекомендация: {path.recommendation}")

Обнаружение обхода пути

from src.analysis.dataflow import analyze_path_traversal

# Обнаружение потоков заражённых данных к приёмникам файловой системы (fopen, open, access, stat)
# Приёмники определены в annotations.yaml -> sink_type: path_traversal
paths = analyze_path_traversal(cpg)

for path in paths:
    print(f"Обход пути (CWE-22): {path.source_function} -> {path.sink_function}")
    print(f"  Риск: {path.risk_level} ({path.risk_score:.2f})")
    print(f"  Рекомендация: {path.recommendation}")

Пользовательский Taint-анализ

from src.analysis.dataflow.taint_analysis import TaintPropagator

with CPGQueryService() as cpg:
    # Создание propagator с пользовательскими настройками
    propagator = TaintPropagator(
        cpg,
        enable_inter_proc=True,         # Межпроцедурное отслеживание (Phase 4.2)
        enable_control_flow=True,       # Анализ потока управления (Phase 4.3)
        enable_field_sensitive=True,    # Field-sensitive отслеживание (Phase 4.4)
        enable_context_sensitive=True,  # Контекстно-зависимый анализ (Phase 5)
        enable_symbolic_execution=True  # Проверка выполнимости путей через Z3
    )

    # Поиск taint-путей (значения по умолчанию из config.yaml -> taint_analysis.*)
    paths = propagator.find_taint_paths(
        source_category='sql',
        timeout_seconds=60
    )

Анализ безопасности памяти

Обнаружение Use-After-Free

from src.analysis.dataflow import analyze_use_after_free
from src.services.cpg import CPGQueryService

with CPGQueryService() as cpg:
    from src.analysis.dataflow.memory_lifetime import MemoryLifetimeAnalyzer

    analyzer = MemoryLifetimeAnalyzer(
        cpg,
        enable_alias_analysis=True  # Отслеживание алиасов по алгоритму Андерсена
    )

    paths = analyzer.analyze_use_after_free(
        max_paths=100,
        max_hops=15,
        min_confidence=0.6
    )

    for path in paths:
        print(f"Use-After-Free:")
        print(f"  Аллокация: {path.allocation_function} в {path.allocation_location}")
        print(f"  Освобождение: {path.free_function} в {path.free_location}")
        print(f"  Использование: {path.use_type} в {path.use_location}")
        print(f"  Указатель: {path.pointer_name}")
        print(f"  Риск: {path.risk_score:.2f}")
        print(f"  Уверенность: {path.confidence:.2f}")

Обнаружение Double-Free

from src.analysis.dataflow import analyze_double_free

with CPGQueryService() as cpg:
    analyzer = MemoryLifetimeAnalyzer(cpg)

    paths = analyzer.analyze_double_free(
        max_paths=50,
        min_confidence=0.6
    )

    for path in paths:
        print(f"Double-Free:")
        print(f"  Аллокация: {path.allocation_function} в {path.allocation_location}")
        print(f"  Первое освобождение: {path.first_free_location}")
        print(f"  Второе освобождение: {path.second_free_location}")
        print(f"  Риск: {path.risk_score:.2f}")

Анализ алиасов указателей

from src.analysis.dataflow.pointer_alias import PointerAliasAnalyzer

with CPGQueryService() as cpg:
    analyzer = PointerAliasAnalyzer(cpg)

    # Анализ функции на наличие алиасов указателей
    points_to = analyzer.analyze(function_id=123)

    # Проверка, могут ли два указателя быть алиасами
    if analyzer.may_alias(ptr1_id=456, ptr2_id=457):
        print("Указатели могут быть алиасами!")

        # Получение всех алиасов
        aliases = analyzer.get_aliases(ptr1_id=456)
        print(f"Алиасы: {aliases}")

    # Получение статистики
    stats = analyzer.get_statistics()
    print(f"Проанализировано {stats['total_pointers']} указателей")
    print(f"Найдено {stats['total_allocations']} аллокаций")

Обнаружение NULL-указателей

from src.analysis.dataflow import analyze_null_dereferences

with CPGQueryService() as cpg:
    paths = analyze_null_dereferences(
        cpg,
        max_paths=100,
        max_hops=10,
        min_confidence=0.7
    )

    for path in paths:
        print(f"NULL-разыменование:")
        print(f"  Источник: {path.source_function}() в {path.source_location}")
        print(f"  Разыменование: {path.dereference_type} в {path.dereference_location}")
        print(f"  Есть проверка на NULL: {path.has_null_check}")
        print(f"  Риск: {path.risk_score:.2f}")
        print(f"  Переменная: {path.variable_name}")

Обнаружение утечек информации

from src.analysis.dataflow import analyze_info_disclosure

with CPGQueryService() as cpg:
    paths = analyze_info_disclosure(
        cpg,
        max_paths=100,
        max_hops=10,
        min_risk_score=0.6
    )

    for path in paths:
        print(f"Утечка информации:")
        print(f"  Категория данных: {path.data_category}")  # credentials, pii, secrets
        print(f"  Источник: {path.source_function} в {path.source_location}")
        print(f"  Приёмник: {path.sink_function} в {path.sink_location}")
        print(f"  Тип раскрытия: {path.disclosure_type}")  # error_message, debug_log
        print(f"  Серьёзность: {path.severity}")
        print(f"  Риск: {path.risk_score:.2f}")

Обнаружение состояний гонки

from src.analysis.dataflow import analyze_race_conditions

with CPGQueryService() as cpg:
    paths = analyze_race_conditions(
        cpg,
        max_paths=100,
        max_hops=15,
        min_confidence=0.7
    )

    for path in paths:
        print(f"Состояние гонки:")
        print(f"  Тип: {path.race_type}")  # toctou, data_race, missing_lock
        print(f"  Проверка: {path.check_function} в {path.check_location}")
        print(f"  Использование: {path.use_function} в {path.use_location}")
        print(f"  Ресурс: {path.shared_resource}")
        print(f"  Есть блокировка: {path.has_lock}")
        print(f"  Риск: {path.risk_score:.2f}")

Продвинутые функции

Символьное выполнение

from src.analysis.dataflow.symbolic_execution import PathConstraintTracker

with CPGQueryService() as cpg:
    tracker = PathConstraintTracker(cpg, max_constraints=20)

    # Проверка, выполним ли конкретный путь
    is_feasible, confidence = tracker.is_path_feasible(
        start_node=100,
        end_node=200
    )

    if not is_feasible and confidence > 0.9:
        print("Путь определённо невыполним (ложное срабатывание)")

    # Фильтрация невыполнимых путей из списка
    all_paths = analyze_sql_injections(cpg)
    feasible_paths = tracker.filter_infeasible_paths(
        all_paths,
        min_confidence=0.8
    )

    print(f"Отфильтровано {len(all_paths) - len(feasible_paths)} невыполнимых путей")

TaintPropagator автоматически загружает SymbolicExecutionConfig из config.yaml -> symbolic_execution.* с настройками: solver_timeout_ms (500), solver_timeout_uf_ms (2000), max_parse_depth (10), enable_function_models (True), enable_arithmetic (True).

Кросс-языковой анализ

from src.domains import DomainRegistry

# Активация домена для вашего языка
DomainRegistry.activate("python_django")  # или: javascript, go, postgresql

# Анализ автоматически адаптируется к домену
paths = analyze_info_disclosure(cpg)
# Использует Python-специфичные sources/sinks (os.getenv, logger.error и т.д.)

Field-Sensitive анализ

paths = propagator.find_taint_paths(source_category='sql')

for path in paths:
    if path.field_sensitive:
        print(f"Заражённые поля: {', '.join(path.tainted_fields)}")
        # Пример: ['user.email', 'user.password']

Анализ потока управления

for path in paths:
    if path.is_conditional:
        print(f"Зависимости по управлению: {len(path.control_dependencies)}")
        print(f"Вероятность выполнения: {path.execution_probability:.2f}")

        for dep in path.control_dependencies:
            print(f"  Зависит от: {dep.control_code} в {dep.control_location}")

Конфигурация

Выбор домена

Отредактируйте config.yaml:

domain:
  name: python_django  # или: javascript, go, postgresql

Опции анализатора

# Отключение конкретных функций для производительности
propagator = TaintPropagator(
    cpg,
    enable_inter_proc=False,         # Отключить межпроцедурный (быстрее)
    enable_control_flow=False,       # Отключить поток управления (быстрее)
    enable_field_sensitive=False,    # Отключить отслеживание полей (быстрее)
    enable_context_sensitive=False,  # Отключить контекстный анализ (быстрее)
    enable_symbolic_execution=False  # Отключить проверку выполнимости (быстрее)
)

# Настройка глубины анализа (значения по умолчанию из config.yaml -> taint_analysis.*)
paths = propagator.find_taint_paths(
    max_paths=50,    # Меньше путей (быстрее)
    max_depth=10     # Меньшая глубина поиска (быстрее)
)

Сравнение с базовым вариантом

# Сравнение точности с и без улучшений
baseline_analyzer = MemoryLifetimeAnalyzer(cpg, enable_alias_analysis=False)
enhanced_analyzer = MemoryLifetimeAnalyzer(cpg, enable_alias_analysis=True)

baseline_paths = baseline_analyzer.analyze_use_after_free()
enhanced_paths = enhanced_analyzer.analyze_use_after_free()

print(f"Базовый: {len(baseline_paths)} ошибок найдено")
print(f"Улучшенный: {len(enhanced_paths)} ошибок найдено")

Решение проблем

Z3 Solver недоступен

Ошибка: Symbolic execution disabled: No module named 'z3'

Решение:

pip install z3-solver

Если символьное выполнение не нужно:

propagator = TaintPropagator(cpg, enable_symbolic_execution=False)

Высокое потребление памяти

Симптом: Анализ падает с ошибкой OOM

Решения:

# 1. Уменьшить max_paths
paths = analyzer.analyze_use_after_free(max_paths=50)

# 2. Уменьшить max_hops
paths = analyzer.analyze_use_after_free(max_hops=10)

# 3. Отключить дорогие функции
analyzer = MemoryLifetimeAnalyzer(cpg, enable_alias_analysis=False)

Медленный анализ

Симптом: Анализ занимает >10 секунд

Решения:

# 1. Отключить символьное выполнение (экономия 20% времени)
propagator = TaintPropagator(cpg, enable_symbolic_execution=False)

# 2. Отключить анализ алиасов указателей (экономия 50% времени)
analyzer = MemoryLifetimeAnalyzer(cpg, enable_alias_analysis=False)

# 3. Уменьшить глубину анализа
paths = propagator.find_taint_paths(max_depth=10)

Не найдено путей

Симптом: [] возвращается из анализа

Возможные причины: 1. Неправильный домен выбран (проверьте config.yaml) 2. Нет sources/sinks в кодовой базе 3. Порог min_risk_score слишком высокий

Решение:

# Понизить порог риска (значение по умолчанию из config.yaml)
paths = analyze_sql_injections(cpg, min_risk_score=0.1)

# Проверить домен
from src.domains import get_active_domain
print(f"Активный домен: {get_active_domain().name}")

# Проверить sources/sinks через публичный API
sources = propagator.find_source_nodes()
sinks = propagator.find_sink_nodes()
print(f"Источники: {len(sources)}, Приёмники: {len(sinks)}")

Тесты производительности

Окружение: CPG с 10K методами, Intel i7, 16GB RAM

Тип анализа Время Память Найдено путей
SQL Injection 2-4с 80-100МБ 50-75
Инъекция команд 2-4с 80-100МБ 30-60
Обход пути 2-4с 80-100МБ 20-50
UAF (базовый) 2-4с 80-100МБ 50-75
UAF (с алиасами) 3-6с 120-150МБ 85-100
NULL Dereference 2-3с 50-80МБ 40-60
Утечка информации 3-5с 80-120МБ 60-90
Состояния гонки 2-4с 60-100МБ 30-50

С символьным выполнением: +20% времени, +25% памяти


Справочник API

Краткий справочник

# Taint-анализ (значения по умолчанию из config.yaml -> taint_analysis.*)
analyze_sql_injections(cpg, max_paths=None, min_risk_score=None, max_depth=None)
analyze_command_injections(cpg, max_paths=None, min_risk_score=None, max_depth=None)
analyze_path_traversal(cpg, max_paths=None, min_risk_score=None, max_depth=None)

# Безопасность памяти
analyze_use_after_free(cpg, max_paths=100, max_hops=None, min_confidence=0.6)
analyze_double_free(cpg, max_paths=100, max_hops=None, min_confidence=0.6)

# NULL-указатели
analyze_null_dereferences(cpg, max_paths=100, max_hops=None, min_confidence=0.7)

# Утечка информации
analyze_info_disclosure(cpg, max_paths=100, max_hops=None, min_risk_score=0.6)

# Состояния гонки
analyze_race_conditions(cpg, max_paths=100, max_hops=None, min_confidence=0.7)

Подробную документацию API см. в: - src/analysis/dataflow/__init__.py - src/analysis/dataflow/taint_analysis.py - src/analysis/dataflow/memory_lifetime.py - src/analysis/dataflow/pointer_alias.py - src/analysis/dataflow/symbolic_execution.py


Визуализация Taint-потоков

Результаты анализа заражения данных могут быть отрисованы в виде Mermaid-диаграмм через src/security/taint_visualizer.py, предоставляя визуальные диаграммы потоков данных от источника к приёмнику. Экспорт SARIF 2.1.0 (--sarif-file) включает полные codeFlows с пошаговым распространением заражённых данных.

Кросс-языковая поддержка

GoCPG поддерживает 11 языков: C, C++, Go, Python, JavaScript, TypeScript, Java, Kotlin, C#, PHP и 1С:Предприятие. Межъязыковые FFI-рёбра (CGO, ctypes, cffi) связывают пути заражения через границы языков.

Примечание: Всегда используйте ProjectManager.get_active_db_path() для получения пути к базе активного проекта – см. src/project_manager.py.

Дополнительные ресурсы

  • Руководство по CLI: docs/guides/ru/CLI_GUIDE.md
  • Книга рецептов SQL-запросов: docs/guides/ru/SQL_QUERY_COOKBOOK.md
  • Устранение неполадок: docs/guides/ru/TROUBLESHOOTING.md

Вопросы? См. docs/guides/ru/TROUBLESHOOTING.md или свяжитесь с командой разработки.


Последнее обновление: март 2026