Справочник модулей анализа

Полная документация по модулям анализа кода в src/analysis/.

Последнее обновление: 2026-03-07

Содержание


Обзор

Модули анализа обеспечивают расширенные возможности статического анализа на основе CPG (графа свойств кода), хранящегося в DuckDB. Организация по подпакетам:

src/analysis/
├── cfg_analyzer.py              # Граф потока управления
├── cfg_unreachable.py           # Обнаружение недостижимого кода
├── clone_detector.py            # Обнаружение дублей кода
├── compliance.py                # Маппинг стандартов безопасности
├── explain.py                   # Движок анализа методов
├── concurrency_core.py          # Анализ параллелизма (4 примеси)
├── field_sensitive_tracer.py    # Анализ заражения с учётом полей
├── callgraph/                   # Анализ графа вызовов (8 модулей)
├── dataflow/                    # Анализ потоков данных (12 модулей)
│   └── taint/                   # Механизм распространения заражения (7 модулей)
├── autofix/                     # Автоматическая генерация исправлений (5 модулей)
└── patterns/                    # Подсистема шаблонов (2 модуля)

Архитектура модулей

graph TD
    subgraph "Поток управления"
        CFG[CFGAnalyzer] --> PatchCF[PatchControlFlowAnalyzer]
        CFG --> Unreachable[UnreachableCodeDetector]
    end

    subgraph "Потоки данных"
        DFBase[BaseTracer] --> DFTracer[DataFlowTracer]
        DFBase --> TypeProp[TypePropagator]
        DFBase --> PtrAlias[PointerAliasAnalyzer]
    end

    subgraph "Анализ заражения"
        TaintProp[TaintPropagator] --> FieldSensTracker[FieldSensitiveTracker]
        TaintProp --> ContextSens[ContextSensitiveTracker]
        TaintProp --> InterProc[InterProcTracker]
        TaintProp --> DFTracker[DataflowTracker]
        TaintProp --> SymExec[PathConstraintTracker]
        DFTracer --> FieldTracer[FieldSensitiveTracer]
    end

    subgraph "Граф вызовов"
        CGA[CallGraphAnalyzer] --> PathFind[PathFinder]
        CGA --> Impact[ImpactAnalyzer]
        CGA --> Centrality[CentralityAnalyzer]
        CGA --> Components[ComponentAnalyzer]
        CGA --> CrossLang[CrossLanguageAnalyzer]
        CGA --> Complexity[ComplexityAnalyzer]
    end

    subgraph "Шаблоны + Автоисправления"
        Patterns[LLMPatternGenerator] --> Bridge[PatternTaintBridge]
        Bridge --> TaintProp
        AutofixEng[AutofixEngine] --> AutofixGen[AutofixGenerator]
        AutofixEng --> DiffVal[DiffValidator]
        AutofixEng --> SSR[SSRAutofixBridge]
    end

Карта модулей и сценариев

Модуль Подпакет Тип Ключевые сценарии DuckPGQ
CFGAnalyzer корень анализатор С05, С06, С13 (рефакторинг, производительность) Нет
UnreachableCodeDetector корень анализатор С05, С13 (мёртвый код) Нет
ASTCloneDetector корень анализатор С07, С13 (рефакторинг) Нет
ComplianceMapper корень маппер С02, С08 (безопасность, соответствие) Нет
ConcurrencyAnalyzer корень анализатор С16 (параллелизм) Нет
FieldSensitiveTracer корень анализатор С02, С08, С14 (безопасность) Нет
DataFlowTracer dataflow фасад С02, С14 (безопасность, инциденты) Нет
TypePropagator dataflow анализатор С02 (путаница типов) Нет
PointerAliasAnalyzer dataflow анализатор С02 (use-after-free) Нет
MemoryLifetimeAnalyzer dataflow анализатор С02, С14 (безопасность памяти) Нет
NullCheckAnalyzer dataflow анализатор С02, С05 (разыменование null) Нет
CodeStringTracer dataflow анализатор С02 (инъекция кода) Нет
InfoDisclosureAnalyzer dataflow анализатор С02 (утечка информации) Нет
TaintPropagator dataflow/taint движок С02, С14 (безопасность) Нет
CallGraphAnalyzer callgraph фасад С01, С12, С14 (ввод в систему, кросс-репо) Да
PathFinder callgraph анализатор С01, С14 (цепочки вызовов) Да
CentralityAnalyzer callgraph анализатор С05, С12 (точки нагрузки) Да
ComponentAnalyzer callgraph анализатор С12, С13 (компоненты) Да
ImpactAnalyzer callgraph анализатор С09, С14 (оценка влияния) Нет
CrossLanguageAnalyzer callgraph анализатор С12 (кросс-репо) Нет
LLMPatternGenerator patterns генератор С21 (поиск шаблонов) Нет
PatternTaintBridge patterns мост С02, С21 (безопасность) Нет
AutofixEngine autofix движок С02 (автоисправления) Нет
PatchControlFlowAnalyzer patch_review анализатор С09 (проверка изменений) Нет

Анализ потока управления

CFGAnalyzer

Файл: src/analysis/cfg_analyzer.py Сценарии: С05, С06, С13 (рефакторинг, производительность, массовый рефакторинг)

Анализ на основе графа потока управления (CFG) с использованием таблицы edges_cfg.

Ключевые классы

@dataclass
class CFGStructure:
    """Описывает структуру CFG метода."""
    method_name: str
    method_full_name: str
    nodes: List[int]
    edges: List[Tuple[int, int]]  # пары (источник, назначение)
    entry_nodes: List[int]
    exit_nodes: List[int]
    node_count: int
    edge_count: int

@dataclass
class CFGPath:
    """Описывает путь выполнения в CFG."""
    path_id: str
    nodes: List[int]
    length: int
    has_loop: bool = False

Справочник API

CFGAnalyzer(cpg_service)

Инициализация с экземпляром CPGQueryService или подключением DuckDB.

get_method_cfg(method_name: str) -> Optional[CFGStructure]

Получить структуру CFG для метода.

from src.analysis.cfg_analyzer import CFGAnalyzer

analyzer = CFGAnalyzer(cpg_service)
cfg = analyzer.get_method_cfg("heap_insert")
print(f"Узлы: {cfg.node_count}, Рёбра: {cfg.edge_count}")
compute_cyclomatic_complexity(method_name: str) -> int

Вычислить цикломатическую сложность Маккейба: M = E - N + 2

complexity = analyzer.compute_cyclomatic_complexity("heap_insert")
print(f"Сложность: {complexity}")  # например, 15
enumerate_paths(method_name: str, max_paths: int = 100, max_depth: int = 50) -> List[CFGPath]

Найти пути выполнения в CFG с обнаружением циклов.

paths = analyzer.enumerate_paths("process_query", max_paths=50)
for path in paths:
    print(f"Путь {path.path_id}: {path.length} узлов, цикл={path.has_loop}")
find_dominators(method_name: str) -> Dict[int, Set[int]]

Построить дерево доминаторов с использованием таблицы edges_dominate.

find_post_dominators(method_name: str) -> Dict[int, Set[int]]

Построить дерево пост-доминаторов с использованием таблицы edges_post_dominate.

get_cfg_successors(node_id: int) -> List[int]

Получить узлы-наследники в CFG.

get_cfg_predecessors(node_id: int) -> List[int]

Получить узлы-предшественники в CFG.

get_control_flow_paths(source_node: int, sink_node: int, max_depth: int = 20) -> List[List[int]]

Найти все пути между двумя узлами CFG.

analyze_complexity_distribution(threshold: int = 10) -> Dict[str, Any]

Проанализировать распределение сложности по всем методам.

dist = analyzer.analyze_complexity_distribution()
print(f"Средняя сложность: {dist['average']}")
print(f"Методы с высокой сложностью: {dist['high_complexity_methods']}")

Используемые таблицы БД

  • nodes_method — Метаданные методов
  • edges_contains — Связи методов с узлами
  • edges_cfg — Рёбра графа потока управления
  • edges_dominate — Отношения доминирования
  • edges_post_dominate — Отношения пост-доминирования

Рекомендации по производительности

  • Перечисление путей ограничено max_paths (по умолчанию 100) и max_depth (по умолчанию 50)
  • Распределение сложности сканирует все методы — может быть медленным на больших кодовых базах (>50K методов)

UnreachableCodeDetector

Файл: src/analysis/cfg_unreachable.py Сценарии: С05, С13 (обнаружение мёртвого кода)

Обнаружение недостижимого кода через анализ CFG — код после return, exit(), функций noreturn.

Ключевые классы

@dataclass
class UnreachableCodeFinding:
    """Описывает находку недостижимого кода."""
    method_name: str
    method_id: int
    filename: str
    line_number: int
    terminating_type: str  # 'return', 'exit_call', 'error_call', 'noreturn'
    terminating_node_id: int
    terminating_line: int
    unreachable_node_id: int
    unreachable_code: Optional[str] = None
    confidence: float = 0.9

Справочник API

UnreachableCodeDetector(cpg_service)

Инициализация с экземпляром CPGQueryService.

detect_unreachable_code() -> List[UnreachableCodeFinding]

Найти все паттерны недостижимого кода.

from src.analysis.cfg_unreachable import UnreachableCodeDetector

detector = UnreachableCodeDetector(cpg_service)
findings = detector.detect_unreachable_code()
for f in findings:
    print(f"{f.filename}:{f.line_number} — недостижимый код после {f.terminating_type}")

Используемые таблицы БД

  • edges_cfg — Рёбра CFG для проверки достижимости потомков
  • nodes_method — Метаданные методов
  • nodes_return — Узлы операторов возврата

PatchControlFlowAnalyzer

Файл: src/patch_review/analyzers/control_flow_analyzer.py Сценарии: С09 (проверка изменений)

Анализирует влияние изменений на поток управления с использованием CFGAnalyzer.

Ключевые классы

@dataclass
class NewLoopFinding:
    method_name: str
    loop_type: str
    line_number: int
    is_nested: bool
    has_io: bool
    is_unbounded: bool
    severity: Severity  # HIGH, MEDIUM, LOW
    details: str

@dataclass
class ErrorHandlingChange:
    method_name: str
    change_type: str
    error_type: str
    line_number: int
    details: str

@dataclass
class BranchCoverageImpact:
    new_branches: int
    removed_branches: int
    net_change: int
    methods_with_new_branches: List[str]
    uncovered_paths: List[Dict[str, Any]]

Справочник API

PatchControlFlowAnalyzer(conn, delta_cpg=None)

Инициализация с подключением DuckDB и необязательным DeltaCPG.

analyze_control_flow_changes(patch, delta_cpg) -> ControlFlowAnalysisResult

Полный анализ потока управления для изменений.

from src.patch_review.analyzers.control_flow_analyzer import PatchControlFlowAnalyzer

analyzer = PatchControlFlowAnalyzer(conn)
result = analyzer.analyze_control_flow_changes(patch, delta_cpg)

print(f"Дельта сложности: {result.complexity_delta}")
print(f"Новые циклы: {len(result.new_loops)}")
analyze_complexity_change(changed_methods) -> List[ComplexityDelta]

Расчёт сложности до/после для изменённых методов.

detect_new_loops(changed_methods) -> List[NewLoopFinding]

Обнаружение вновь введённых циклов с классификацией рисков.

analyze_error_handling_changes(changed_methods) -> List[ErrorHandlingChange]

Отслеживание изменений в обработке ошибок.

analyze_branch_coverage_impact(changed_methods) -> BranchCoverageImpact

Оценка влияния на покрытие ветвлений.

Классификация критичности циклов

  • HIGH (Высокий): Вложенные циклы, циклы с операциями ввода-вывода, неограниченные циклы
  • MEDIUM (Средний): Циклы с внешними вызовами
  • LOW (Низкий): Простые ограниченные циклы

Анализ потоков данных

DataFlowTracer

Файл: src/analysis/dataflow/tracer.py Сценарии: С02, С14 (безопасность, реагирование на инциденты)

Основной анализ потоков данных через рёбра REACHING_DEF. Наследует BaseTracer (src/analysis/dataflow/base.py).

Ключевые классы

@dataclass
class DataFlowPath:
    """Путь потока данных от определения к использованию."""
    path_id: str
    variable_name: str
    source_location: Dict[str, Any]
    sink_location: Dict[str, Any]
    path_length: int
    intermediate_nodes: List[Dict[str, Any]] = field(default_factory=list)
    is_inter_procedural: bool = False
    sanitization_points: List[Dict[str, Any]] = field(default_factory=list)

@dataclass
class VariableFlow:
    """Все потоки переменной по кодовой базе."""
    variable_name: str
    definition_points: List[Dict[str, Any]] = field(default_factory=list)
    use_points: List[Dict[str, Any]] = field(default_factory=list)
    flows: List[DataFlowPath] = field(default_factory=list)

Справочник API

DataFlowTracer(cpg_service)

Инициализация с CPGQueryService.

trace_variable(variable_name: str, method_name: Optional[str] = None, max_depth: Optional[int] = None) -> VariableFlow

Отследить все потоки переменной через рёбра REACHING_DEF.

from src.analysis.dataflow.tracer import DataFlowTracer

tracer = DataFlowTracer(cpg_service)
flow = tracer.trace_variable("user_input", method_name="process_request")
print(f"Определения: {len(flow.definition_points)}, Использования: {len(flow.use_points)}")
find_reaching_definitions(node_id: int) -> List[Dict]

Найти все определения, достигающие точки использования.

find_variable_uses(node_id: int) -> List[Dict]

Найти все использования определения.

trace_inter_procedural(variable_name: str, max_depth: int = 5) -> List[DataFlowPath]

Отследить поток данных через границы функций.

Используемые таблицы БД

  • nodes_identifier — Ссылки на переменные
  • edges_reaching_def — Цепочки «определение — использование»
  • edges_argument — Рёбра аргументов функций

TypePropagator

Файл: src/analysis/dataflow/type_propagator.py Сценарии: С02 (уязвимости путаницы типов)

Отслеживает преобразования типов вдоль путей потока данных (приведения, расширения, усечения).

Ключевые классы

@dataclass
class TypeTransformation:
    """Преобразование типа в конкретной точке."""
    node_id: int
    from_type: str
    to_type: str
    transformation_kind: str  # 'cast', 'promotion', 'truncation', 'reinterpret'
    is_safe: bool
    line_number: int

@dataclass
class TypeFlow:
    """Распространение типа вдоль пути данных."""
    variable_name: str
    initial_type: str
    final_type: str
    transformations: List[TypeTransformation]
    has_unsafe_cast: bool
    has_truncation: bool

Справочник API

TypePropagator(cpg_service)
trace_type_flow(variable_name: str, method_name: Optional[str] = None) -> TypeFlow

Отследить изменения типов вдоль потока данных переменной.

find_unsafe_casts(limit: int = 100) -> List[TypeTransformation]

Найти потенциально небезопасные приведения типов.


PointerAliasAnalyzer

Файл: src/analysis/dataflow/pointer_alias.py Сценарии: С02 (use-after-free, double-free)

Анализ псевдонимов указателей для определения, какие указатели могут ссылаться на одну область памяти.

Ключевые классы

@dataclass
class AllocationSite:
    """Описывает точку выделения памяти."""
    node_id: int
    function_name: str  # malloc, calloc и т.д.
    variable_name: str
    line_number: int

Справочник API

PointerAliasAnalyzer(cpg_service)
find_aliases(variable_name: str) -> List[str]

Найти все переменные, которые могут быть псевдонимами указателя.

trace_allocation_lifetime(alloc_site: AllocationSite) -> Dict

Отследить время жизни аллокации от создания до освобождения.


PathConstraintTracker

Файл: src/analysis/dataflow/symbolic_execution.py Сценарии: С02 (проверка достижимости путей в анализе заражения)

Легковесное символьное выполнение для проверки достижимости путей через решатель Z3.

Ключевые классы

class SymbolicExecutionConfig:
    """Конфигурация движка символьного выполнения."""
    enabled: bool = True
    max_constraints: int = 20
    solver_timeout_ms: int = 500
    solver_timeout_uf_ms: int = 2000
    max_parse_depth: int = 10
    enable_function_models: bool = True
    enable_arithmetic: bool = True

@dataclass
class PathConstraint:
    """Ограничение на пути выполнения."""
    node_id: int
    condition: str
    is_true_branch: bool
    variables: List[str]

class PathConstraintTracker:
    """Отслеживание ограничений пути для анализа достижимости."""

Справочник API

PathConstraintTracker(cpg_service)
check_path_feasibility(path_nodes: List[int]) -> bool

Проверить достижимость пути данных с учётом условий ветвления.


Анализ заражения данных

TaintPropagator

Файл: src/analysis/dataflow/taint/propagator.py Сценарии: С02, С14 (анализ безопасности, реагирование на инциденты)

Основной движок анализа заражения данных. Объединяет все подмодули в единый конвейер.

Ключевые классы

@dataclass
class TaintNode:
    """Узел в пути заражения."""
    node_id: int
    name: str
    code: str
    line_number: int
    filename: str
    node_type: str

@dataclass
class TaintPath:
    """Полный путь заражения от источника к приёмнику."""
    source: TaintNode
    sink: TaintNode
    intermediate: List[TaintNode]
    path_length: int
    confidence: float
    sink_category: str
    is_sanitized: bool = False
    sanitization_point: Optional[TaintNode] = None

Справочник API

TaintPropagator(cpg_service, enable_inter_proc=True, enable_control_flow=True, enable_field_sensitive=True, enable_context_sensitive=True, enable_symbolic_execution=True)

Инициализация с CPG-сервисом и переключателями возможностей.

from src.analysis.dataflow.taint.propagator import TaintPropagator

propagator = TaintPropagator(cpg_service)
paths = propagator.find_taint_paths(
    sources=["getenv", "fgets", "read"],
    sinks=["system", "exec", "popen"]
)
for path in paths:
    print(f"Источник: {path.source.name} -> Приёмник: {path.sink.name}")
    print(f"Уверенность: {path.confidence}, Очищен: {path.is_sanitized}")
find_taint_paths(sources, sinks, max_depth=None) -> List[TaintPath]

Найти пути заражения от функций-источников к функциям-приёмникам.

analyze_sql_injections() -> List[TaintPath]

Удобный метод для обнаружения SQL-инъекций.

Подмодули

Подмодуль Файл Назначение
DataflowTracker tracker.py Основное распространение заражения (BFS)
InterProcTracker interprocedural.py Отслеживание через границы функций
FieldSensitiveTracker field_sensitive.py Заражение с учётом полей объектов
ContextSensitiveTracker context_sensitive.py Учёт контекста точки вызова
ControlFlowAnalyzer control_flow.py Анализ зависимостей по управлению

FieldSensitiveTracer

Файл: src/analysis/field_sensitive_tracer.py Сценарии: С02, С08, С14 (безопасность, соответствие, реагирование на инциденты)

Отслеживание путей полей верхнего уровня для точного анализа заражения. Различает разные поля одного объекта (например, user.password и user.name).

Примечание: Не путать с FieldSensitiveTracker (dataflow/taint/field_sensitive.py) — низкоуровневым трекером заражения, используемым внутри TaintPropagator.

Ключевые классы

@dataclass
class FieldPath:
    """Путь доступа к полю, например obj.field1.field2."""
    base_variable: str
    field_chain: List[str]
    full_path: str
    node_ids: List[int] = field(default_factory=list)
    type_full_name: Optional[str] = None

    @classmethod
    def from_code(cls, code: str) -> "FieldPath": ...
    def matches(self, other: "FieldPath") -> Tuple[bool, str]: ...

@dataclass
class FieldAccess:
    """Один доступ к полю в коде."""
    node_id: int
    base_variable: str
    field_name: str
    access_code: str
    line_number: int
    filename: str
    access_type: str = "read"  # 'read', 'write', 'call'
    containing_method: Optional[str] = None

@dataclass
class FieldSensitiveFlow:
    """Путь передачи данных с учётом полей."""
    source_path: FieldPath
    sink_path: FieldPath
    intermediate_fields: List[FieldPath]
    is_tainted: bool
    relationship: str  # 'exact', 'prefix', 'suffix', 'propagated'
    confidence: float = 1.0

Справочник API

FieldSensitiveTracer(cpg_service)

Инициализация с CPGQueryService или подключением DuckDB.

parse_field_path(code: str) -> FieldPath

Разобрать строку доступа к полю в структурированный объект FieldPath.

from src.analysis.field_sensitive_tracer import FieldSensitiveTracer

tracer = FieldSensitiveTracer(cpg_service)

# Разбор нотации указателей
path = tracer.parse_field_path("user->password")
print(path.base_variable)  # "user"
print(path.field_chain)    # ["password"]

# Разбор нотации через точку
path = tracer.parse_field_path("request.data.buffer")
print(path.full_path)  # "request.data.buffer"
get_struct_fields(type_name: str) -> List[Dict[str, Any]]

Получить поля структуры с информацией о типах.

fields = tracer.get_struct_fields("UserData")
for field in fields:
    print(f"{field['name']}: {field['type']}")
find_field_accesses(base_variable: str, field_name: Optional[str] = None) -> List[FieldAccess]

Найти все обращения к конкретному полю.

accesses = tracer.find_field_accesses("user", "password")
for access in accesses:
    print(f"{access.filename}:{access.line_number} - {access.access_type}")
find_all_field_identifiers(field_name: Optional[str] = None, limit: int = 100) -> List[Dict]

Найти все узлы идентификаторов полей, с необязательной фильтрацией по имени.

trace_field_taint(source_variable: str, source_field: Optional[str] = None, sink_patterns: Optional[List[str]] = None, max_depth: int = 10) -> List[FieldSensitiveFlow]

Отследить заражение от исходного поля к функциям-приёмникам.

flows = tracer.trace_field_taint(
    source_variable="credentials",
    source_field="password",
    sink_patterns=["printf", "log", "send"]
)
for flow in flows:
    print(f"Заражённый поток: {flow.source_path.full_path} -> {flow.sink_path.full_path}")
find_sensitive_field_flows(sensitive_fields: List[str] = None, sink_functions: List[str] = None) -> List[Dict[str, Any]]

Найти потоки данных из чувствительных полей в опасные приёмники.

# Чувствительные поля по умолчанию: password, token, secret, private_key, credential, auth
flows = tracer.find_sensitive_field_flows()
print(f"Найдено {len(flows)} потенциальных утечек чувствительных данных")

Категории чувствительных полей

Чувствительные поля, отслеживаемые по умолчанию: - password, passwd, pwd - token, auth_token, access_token - secret, api_secret, client_secret - private_key, secret_key - credential, credentials - auth, authorization

Используемые таблицы БД

  • nodes_field_identifier — Узлы доступа к полям
  • nodes_identifier — Идентификаторы переменных
  • nodes_member — Определения полей структур
  • edges_reaching_def — Рёбра достижимых определений
  • edges_argument — Рёбра аргументов функций

FieldSensitiveTracker

Файл: src/analysis/dataflow/taint/field_sensitive.py Сценарии: С02, С14 (внутреннее использование в TaintPropagator)

Низкоуровневое отслеживание заражения по полям, используемое внутри TaintPropagator. Определяет, какие конкретные поля объекта заражены, а какие чисты.

Ключевые классы

@dataclass
class FieldTaint:
    """Статус заражения одного поля."""
    field_name: str
    is_tainted: bool
    source_node: Optional[int] = None

class FieldTaintMap:
    """Карта полей объекта и их статуса заражения."""
    def set_taint(self, base_var: str, field: str, tainted: bool): ...
    def is_tainted(self, base_var: str, field: str) -> bool: ...
    def get_tainted_fields(self, base_var: str) -> List[str]: ...

ContextSensitiveTracker

Файл: src/analysis/dataflow/taint/context_sensitive.py Сценарии: С02 (повышение точности анализа заражения)

Отслеживание контекста точки вызова для предотвращения ложных срабатываний, когда одна и та же функция вызывается с разным состоянием заражения.

Справочник API

ContextSensitiveTracker(cpg_service)
track_call_context(caller_id: int, callee_id: int) -> CallContext

Создать контекст вызова для распространения заражения.


InterProcTracker

Файл: src/analysis/dataflow/taint/interprocedural.py Сценарии: С02, С14 (межпроцедурное отслеживание заражения)

Отслеживание распространения заражения через границы функций — передача параметров и возвращаемые значения.

Справочник API

InterProcTracker(cpg_service)
find_callers(method_name: str) -> List[Dict]

Найти все точки вызова метода.

map_arguments(call_site_id: int) -> Dict[int, int]

Сопоставить фактические аргументы с формальными параметрами.


Анализаторы безопасности

MemoryLifetimeAnalyzer

Файл: src/analysis/dataflow/memory_lifetime.py Сценарии: С02 (use-after-free, double-free, утечки памяти)

Отслеживание паттернов выделения/освобождения памяти для обнаружения нарушений времени жизни.

Ключевые классы

class MemoryState(Enum):
    ALLOCATED = "allocated"
    FREED = "freed"
    UNKNOWN = "unknown"

@dataclass
class MemoryOperation:
    node_id: int
    operation: str  # 'alloc', 'free', 'realloc'
    function_name: str
    variable_name: str
    line_number: int
    filename: str

@dataclass
class UseAfterFreePath:
    alloc: MemoryOperation
    free: MemoryOperation
    use: MemoryOperation
    path_length: int

@dataclass
class DoubleFreeePath:
    alloc: MemoryOperation
    first_free: MemoryOperation
    second_free: MemoryOperation

Справочник API

MemoryLifetimeAnalyzer(cpg_service)
find_use_after_free(limit: int = 50) -> List[UseAfterFreePath]

Обнаружить паттерны использования после освобождения.

find_double_free(limit: int = 50) -> List[DoubleFreeePath]

Обнаружить паттерны двойного освобождения.

find_memory_leaks(limit: int = 100) -> List[Dict]

Найти аллокации без соответствующего освобождения.


NullCheckAnalyzer

Файл: src/analysis/dataflow/null_check.py Сценарии: С02, С05 (разыменование null)

Обнаружение отсутствующих проверок на null после функций, которые могут вернуть NULL.

Ключевые классы

@dataclass
class NullCheckPath:
    function_name: str
    return_variable: str
    dereference_line: int
    dereference_file: str
    has_null_check: bool
    check_line: Optional[int] = None

Справочник API

NullCheckAnalyzer(cpg_service)
find_missing_null_checks(limit: int = 100) -> List[NullCheckPath]

Найти разыменования значений без предварительной проверки на NULL.


CodeStringTracer

Файл: src/analysis/dataflow/code_string_tracer.py Сценарии: С02 (инъекция кода)

Отслеживание паттернов конструирования строк для обнаружения инъекций (SQL, команды, eval).

Ключевые классы

@dataclass
class CodeInjectionPoint:
    function_name: str
    string_variable: str
    injection_type: str  # 'sql', 'command', 'eval'
    line_number: int
    filename: str
    user_input_source: Optional[str] = None

Справочник API

CodeStringTracer(cpg_service)
find_string_concat_injections(sink_functions: List[str], limit: int = 50) -> List[CodeInjectionPoint]

Найти паттерны конкатенации строк, попадающих в опасные приёмники.


InfoDisclosureAnalyzer

Файл: src/analysis/dataflow/info_disclosure.py Сценарии: С02 (утечка информации)

Обнаружение потоков чувствительных данных в функции вывода и логирования.

Справочник API

InfoDisclosureAnalyzer(cpg_service)
find_info_leaks(sensitive_patterns: List[str] = None, limit: int = 50) -> List[InfoDisclosurePath]

Найти потоки чувствительных данных в функции вывода.


ComplianceMapper

Файл: src/analysis/compliance.py Сценарии: С02, С08 (отчёты о соответствии)

Маппинг обнаруженных уязвимостей на стандарты безопасности (CWE, OWASP, CERT C, MISRA C).

Ключевые классы

@dataclass
class ComplianceFinding:
    vulnerability_type: str
    severity: str  # 'critical', 'high', 'medium', 'low'
    cwe_id: str
    cwe_name: str
    owasp_category: Optional[str]
    cert_rule: Optional[str]
    misra_rule: Optional[str]
    file_path: str
    line_number: int
    function_name: str
    description: str
    recommendation: str
    risk_score: float

@dataclass
class ComplianceReport:
    scan_date: str
    codebase: str
    total_findings: int
    findings_by_severity: Dict[str, int]
    findings_by_cwe: Dict[str, int]
    findings_by_owasp: Dict[str, int]
    compliance_score: float  # 0.0-1.0
    findings: List[ComplianceFinding]
    standards_used: List[str]

Справочник API

ComplianceMapper()

Инициализация (без внешних зависимостей).

map_vulnerability(vuln_type, severity, file_path, line_number, function_name, description, risk_score) -> ComplianceFinding

Сопоставить уязвимость со стандартами безопасности.

from src.analysis.compliance import ComplianceMapper

mapper = ComplianceMapper()
finding = mapper.map_vulnerability(
    vuln_type="sql_injection",
    severity="high",
    file_path="src/api.c",
    line_number=42,
    function_name="process_query",
    description="Пользовательский ввод конкатенирован в SQL-запрос",
    risk_score=8.5
)
print(f"{finding.cwe_id}: {finding.cwe_name}")
# CWE-89: Improper Neutralization of Special Elements used in an SQL Command

Анализ графа вызовов

CallGraphAnalyzer

Файл: src/analysis/callgraph/analyzer.py Сценарии: С01, С09, С12, С14 (ввод в систему, проверка изменений, кросс-репо, реагирование на инциденты)

Главный фасад, объединяющий все модули анализа графа вызовов. Делегирует специализированным подмодулям.

Архитектура

CallGraphAnalyzer
├── PathFinder          — кратчайший путь, вызывающие/вызываемые
├── CentralityAnalyzer  — PageRank, промежуточная центральность
├── ComponentAnalyzer   — компоненты SCC, WCC
├── ComplexityAnalyzer  — сложность на основе CFG
├── ImpactAnalyzer      — оценка влияния, точки входа, пути атаки
└── CrossLanguageAnalyzer — обнаружение границ FFI

Справочник API

CallGraphAnalyzer(cpg_service)
find_shortest_path(source_method: str, target_method: str, max_depth: Optional[int] = None) -> Optional[CallPath]

Найти кратчайшую цепочку вызовов между двумя методами.

from src.analysis.callgraph.analyzer import CallGraphAnalyzer

cga = CallGraphAnalyzer(cpg_service)
path = cga.find_shortest_path("handle_request", "exec_query")
if path:
    print(f"Длина пути: {path.length}")
    print(f"Методы: {' -> '.join(path.methods)}")
find_all_callers(method_name: str, max_depth: Optional[int] = None, direct_only: bool = False) -> List[str]

Найти все методы, вызывающие данный метод.

find_all_callees(method_name: str, max_depth: Optional[int] = None, direct_only: bool = False) -> List[str]

Найти все методы, вызываемые данным методом.

find_cross_language_calls(method_name=None, source_language=None, target_language=None) -> List[CrossLanguageCall]

Найти межъязыковые рёбра вызовов (границы FFI).

detect_cycles(max_cycle_length: int = 10) -> List[CallCycle]

Обнаружить циклы рекурсии через алгоритм Тарьяна (SCC).

analyze_impact(method_name: str, max_depth: Optional[int] = None) -> ImpactAnalysis

Оценить влияние изменений (транзитивные вызывающие, затронутые компоненты).

get_call_statistics() -> Dict[str, Any]

Общая статистика графа вызовов (всего методов, вызовов, разветвлённость).

find_entry_points(method_name: str, max_depth: int = 10) -> List[Dict]

Найти точки входа публичного API, способные достичь метода.

trace_attack_paths(entry_points, vuln_method, vuln_file="", vuln_line=0, max_paths=5) -> List[AttackPath]

Трассировка путей атаки от точек входа к уязвимости.

compute_pagerank(top_n: Optional[int] = None) -> List[Dict]

Вычислить оценки PageRank для ранжирования важности методов.

compute_betweenness_centrality(sample_size=None, top_n=None) -> List[Dict]

Вычислить промежуточную центральность для выявления мостовых методов.

find_hotspots(min_in_degree: int = 3, limit: int = 25) -> List[Dict]

Найти точки нагрузки (методы с большим числом вызывающих).

Ключевые модели данных

@dataclass
class CallPath:
    source: str
    target: str
    methods: List[str]
    length: int

@dataclass
class CallCycle:
    methods: List[str]
    length: int
    is_direct: bool

@dataclass
class ImpactAnalysis:
    method_name: str
    direct_callers: List[str]
    transitive_callers: List[str]
    affected_files: List[str]
    impact_score: float

@dataclass
class AttackPath:
    entry_point: str
    vulnerability: str
    path: List[str]
    risk_amplification: float

@dataclass
class CrossLanguageCall:
    caller: str
    callee: str
    caller_language: str
    callee_language: str
    call_type: str  # 'ffi', 'cgo', 'jni' и т.д.

Подмодули

Модуль Файл Ключевые методы DuckPGQ
PathFinder callgraph/pathfinding.py find_shortest_path, find_all_callers, find_all_callees Нет
PGQPathFinder callgraph/pathfinding.py Тот же API, использует синтаксис DuckPGQ MATCH Да
CentralityAnalyzer callgraph/centrality.py compute_pagerank, compute_betweenness_centrality Нет
PGQCentralityAnalyzer callgraph/centrality.py Тот же API, использует графовые запросы DuckPGQ Да
ComponentAnalyzer callgraph/components.py compute_scc, compute_wcc Нет
PGQComponentAnalyzer callgraph/components.py Тот же API, ускорение DuckPGQ Да
ComplexityAnalyzer callgraph/complexity.py compute_cyclomatic_complexity (на основе CFG) Нет
ImpactAnalyzer callgraph/impact.py analyze_impact, find_entry_points, trace_attack_paths Нет
CrossLanguageAnalyzer callgraph/cross_language.py find_cross_language_calls Нет

Анализ параллелизма

ConcurrencyAnalyzer

Файл: src/analysis/concurrency_core.py Сценарии: С16 (анализ параллелизма)

Составлен из 4 специализированных примесей. Шаблоны блокировок и разделяемой памяти загружаются из доменного плагина.

class ConcurrencyAnalyzer(
    LockAnalyzerMixin,
    RaceDetectorMixin,
    SharedAccessAnalyzerMixin,
    AtomicOperationsAnalyzerMixin,
): ...

Справочник API

ConcurrencyAnalyzer(cpg_service)

Доступ ко всем методам 4 примесей, описанных ниже.

from src.analysis.concurrency_core import ConcurrencyAnalyzer

analyzer = ConcurrencyAnalyzer(cpg_service)
races = analyzer.detect_race_conditions()
locks = analyzer.find_lock_usage(lock_type="lwlock")
stats = analyzer.get_concurrency_statistics()

Модели данных

@dataclass
class LockUsage:
    function_name: str
    lock_type: str
    lock_name: Optional[str]
    operation: str
    file_name: str
    line_number: int

@dataclass
class RaceConditionPattern:
    pattern_id: str
    pattern_type: str  # 'toctou', 'unprotected_access', 'signal_handler', 'double_check'
    affected_functions: List[str]
    shared_resource: str
    severity: str
    description: str

@dataclass
class SharedAccess:
    variable_name: str
    accessor_functions: List[str]
    access_type: str
    is_protected: bool
    protecting_lock: Optional[str]

@dataclass
class LockOrderViolation:
    violation_id: str
    lock_a: str
    lock_b: str
    function_acquiring_a_then_b: str
    function_acquiring_b_then_a: str
    risk_level: str

LockAnalyzerMixin

Файл: src/analysis/lock_analyzer.py

  • find_lock_usage(lock_type=None, function_name=None, limit=None) -> List[LockUsage]
  • detect_lock_ordering_issues(limit=None) -> List[LockOrderViolation]
  • detect_potential_deadlocks(limit=None) -> List[Dict]
  • analyze_lock_graph() -> Dict[str, Any]
  • get_lock_statistics() -> Dict[str, Any]

RaceDetectorMixin

Файл: src/analysis/race_detector.py

  • detect_race_conditions(pattern_types=None, limit=None) -> List[RaceConditionPattern]

Типы паттернов: toctou, signal_handler, unprotected_access, double_check.

SharedAccessAnalyzerMixin

Файл: src/analysis/shared_access_analyzer.py

  • analyze_shared_access(variable_pattern=None, limit=None) -> List[SharedAccess]

AtomicOperationsAnalyzerMixin

Файл: src/analysis/atomic_operations_analyzer.py

  • find_atomic_operations(limit: int = 100) -> List[Dict]
  • find_condition_variables(limit: int = 50) -> List[Dict]
  • analyze_function_concurrency(function_name: str) -> Dict
  • get_concurrency_statistics() -> Dict[str, Any]

Обнаружение дублей

ASTCloneDetector

Файл: src/analysis/clone_detector.py Сценарии: С07, С13 (рефакторинг, обнаружение дублей)

Многоуровневое обнаружение дублей на основе AST: - Type-1: Точные дубли (идентичный код) - Type-2: Переименованные дубли (изменены только идентификаторы) - Type-3: Структурные дубли (схожая структура с модификациями) - Type-4: Семантические дубли (разный код, одинаковое поведение)

Ключевые классы

@dataclass
class CloneResult:
    method1_id: int
    method1_name: str
    method1_file: str
    method2_id: int
    method2_name: str
    method2_file: str
    similarity: float
    clone_type: str  # 'exact', 'renamed', 'structural', 'semantic'
    shared_patterns: List[str] = field(default_factory=list)
    line_count1: int = 0
    line_count2: int = 0

Справочник API

ASTCloneDetector(cpg_service)
detect_clones(min_similarity=None, category=None, max_methods=None, min_lines=None) -> List[CloneResult]

Обнаружить дубли кода по кодовой базе.

from src.analysis.clone_detector import ASTCloneDetector

detector = ASTCloneDetector(cpg_service)
clones = detector.detect_clones(min_similarity=0.8, min_lines=10)
for clone in clones:
    print(f"{clone.method1_name}{clone.method2_name} ({clone.clone_type}, {clone.similarity:.0%})")
detect_clones_for_category(category: str, min_similarity: Optional[float] = None) -> List[CloneResult]

Обнаружить дубли в рамках категории (например, “null_check”, “string_operations”).

Рекомендации по производительности

  • Попарное сравнение: O(n²), где n — количество методов
  • Используйте max_methods для ограничения на больших кодовых базах
  • Используйте min_lines для пропуска тривиальных методов

Подсистема шаблонов

LLMPatternGenerator

Файл: src/analysis/patterns/llm_pattern_generator.py Сценарии: С21 (поиск шаблонов)

Генерация YAML-правил для структурного поиска шаблонов с помощью LLM из описания на естественном языке.

Ключевые классы

@dataclass
class GeneratedRule:
    """Результат генерации правила через LLM."""
    yaml_text: str
    rule_id: str
    language: str
    has_fix: bool
    validated: bool
    validation_errors: List[str] = field(default_factory=list)
    generation_attempts: int = 1

Справочник API

LLMPatternGenerator()

Инициализация (загружает язык промптов из глобального реестра).

async generate_rule(description: str, language: str, domain: Optional[str] = None, with_fix: bool = True, max_retries: int = 3, examples: Optional[List[str]] = None) -> GeneratedRule

Сгенерировать YAML-правило из описания на естественном языке. Асинхронный метод.

from src.analysis.patterns.llm_pattern_generator import LLMPatternGenerator

generator = LLMPatternGenerator()
rule = await generator.generate_rule(
    description="Найти вызовы malloc без соответствующего free",
    language="c",
    domain="linux_kernel"
)
print(rule.rule_id)        # напр., "malloc-without-free"
print(rule.validated)      # True, если прошёл gocpg validate-rule
print(rule.yaml_text)      # Полное содержимое YAML-правила

PatternTaintBridge

Файл: src/analysis/patterns/taint_bridge.py Сценарии: С02, С21 (безопасность, поиск шаблонов)

Мост между результатами структурного поиска шаблонов и анализом заражения. Обогащает находки информацией о потоках данных.

Справочник API

PatternTaintBridge(cpg_service, taint_propagator=None)

Инициализация с CPG-сервисом и необязательным TaintPropagator (откат к легковесному SQL BFS).

async enrich_findings_with_taint(findings: List[Dict]) -> List[Dict]

Добавить информацию о путях заражения к находкам шаблонов. Асинхронный метод.

from src.analysis.patterns.taint_bridge import PatternTaintBridge

bridge = PatternTaintBridge(cpg_service, taint_propagator)
enriched = await bridge.enrich_findings_with_taint(pattern_findings)

for finding in enriched:
    if finding.get("taint_enriched"):
        print(f"Находка {finding['rule_id']} имеет {len(finding['taint_paths'])} путей заражения")

Подсистема автоисправлений

AutofixEngine

Файл: src/analysis/autofix/engine.py Сценарии: С02 (автоматическая генерация исправлений безопасности)

Оркестрация генерации исправлений: сначала шаблоны, затем откат к LLM, затем валидация.

Конвейер

TaintPath → Разбор расположения → Чтение исходников → Определение типа уязвимости
  → Попытка шаблонного исправления → (если нет) Откат к LLM → Валидация diff → AutofixResult

Ключевые классы

@dataclass
class AutofixResult:
    fix: FixSuggestion
    strategy: str  # "template" или "llm"
    validated: bool
    validation: Optional[ValidationResult] = None
    taint_path: Optional[TaintPath] = None
    cwe_id: str = ""

Справочник API

AutofixEngine(source_root: str = "", dry_run: bool = True)

Инициализация с корневой директорией исходников. dry_run=True генерирует diff без применения.

generate_fixes(taint_paths: List[TaintPath], vulnerability_type: str = "") -> List[AutofixResult]

Сгенерировать исправления для списка путей заражения.

from src.analysis.autofix.engine import AutofixEngine

engine = AutofixEngine(source_root="/path/to/project", dry_run=True)
results = engine.generate_fixes(taint_paths, vulnerability_type="sql_injection")
for result in results:
    print(f"Стратегия: {result.strategy}, Валидирован: {result.validated}")
    print(f"Diff:\n{result.fix.diff_patch}")

AutofixGenerator

Файл: src/analysis/autofix/generator.py

Генерация исправлений на основе шаблонов с использованием регулярных выражений для известных типов уязвимостей.

@dataclass
class FixSuggestion:
    vulnerability_type: str
    severity: str
    file_path: str
    line_number: int
    original_code: str
    fixed_code: str
    explanation: str
    confidence: float  # 0.0-1.0
    diff_patch: str     # Формат unified diff

@dataclass
class FixTemplate:
    vuln_type: str
    name: str
    description: str
    pattern: str      # Регулярное выражение
    replacement: str  # Шаблон замены
    explanation: str
  • AutofixGenerator() — загружает шаблоны из доменного плагина
  • generate_fix(code, vuln_type, context) -> Optional[FixSuggestion]

DiffValidator

Файл: src/analysis/autofix/diff_validator.py

Валидация безопасности применения сгенерированных исправлений.

@dataclass
class ValidationResult:
    valid: bool
    error: Optional[str] = None
    patched_content: Optional[str] = None
  • DiffValidator()MAX_CHANGE_RATIO = 0.5
  • validate(original_code, fixed_code, file_path, source_root) -> ValidationResult

PromptBuilder

Файл: src/analysis/autofix/prompt_builder.py

Построение промптов для LLM при генерации исправлений, когда шаблоны не подходят.

@dataclass
class AutofixPromptContext:
    vulnerable_code: str
    vulnerability_type: str
    cwe_id: str
    taint_path_summary: str
    language: str
  • PromptBuilder() — использует config/prompts/autofix/
  • build_prompt(context: AutofixPromptContext) -> str

SSRAutofixBridge

Файл: src/analysis/autofix/ssr_bridge.py

Мост между движком автоисправлений и структурным поиском и заменой (SSR) GoCPG.

  • SSRAutofixBridge(gocpg_client)
  • apply_ssr_rule(rule_id: str, file_path: str) -> Optional[FixSuggestion]

Анализаторы проверки изменений

Дополнительные анализаторы в src/patch_review/analyzers/, расширяющие базовые модули для оценки влияния изменений.

PatchCallGraphAnalyzer

Файл: src/patch_review/analyzers/call_graph_analyzer.py Сценарии: С09 (проверка изменений)

Анализ влияния изменений на граф вызовов — радиус поражения, ломающие изменения, эффект волны.

@dataclass
class CallGraphNode:
    method_name: str
    full_name: str
    filename: str
    callers: List[str] = field(default_factory=list)
    callees: List[str] = field(default_factory=list)
    is_changed: bool = False
    change_type: Optional[ChangeType] = None
  • PatchCallGraphAnalyzer(conn, delta_cpg=None)
  • analyze_call_graph_changes(patch, delta_cpg) -> CallGraphAnalysisResult
  • compute_blast_radius(changed_methods) -> BlastRadius
  • detect_breaking_changes(changed_methods) -> List[BreakingChange]
  • compute_ripple_effect(changed_methods) -> RippleEffect

PatchDataFlowAnalyzer

Файл: src/patch_review/analyzers/dataflow_analyzer.py Сценарии: С09 (проверка изменений)

Анализ влияния изменений на потоки данных — новые пути заражения, обход санитизации, потоки чувствительных данных.

@dataclass
class DataFlowChange:
    change_type: str
    source_method: str
    affected_variable: str
    severity: Severity
  • PatchDataFlowAnalyzer(conn, delta_cpg=None)
  • analyze_dataflow_changes(patch, delta_cpg) -> DataFlowAnalysisResult

PatchDependencyAnalyzer

Файл: src/patch_review/analyzers/dependency_analyzer.py Сценарии: С09 (проверка изменений)

Анализ влияния изменений на зависимости модулей, импорты и архитектурную связность.

class DependencyChangeType(Enum):
    ADDED = "added"
    REMOVED = "removed"
    MODIFIED = "modified"
    CIRCULAR_INTRODUCED = "circular_introduced"
    LAYER_VIOLATION = "layer_violation"

@dataclass
class DependencyChange:
    change_type: DependencyChangeType
    source_module: str
    target_module: str
    source_file: str

@dataclass
class CircularDependency:
    cycle_path: List[str]
    introduced_edge: Tuple[str, str]
    severity: Severity

@dataclass
class LayerViolation:
    source_layer: str
    target_layer: str
    source_module: str
    target_module: str

@dataclass
class CouplingMetrics:
    afferent_coupling: int
    efferent_coupling: int
    instability: float

@dataclass
class DependencyAnalysisResult:
    dependency_changes: List[DependencyChange]
    circular_dependencies: List[CircularDependency]
    layer_violations: List[LayerViolation]
    coupling_metrics: Dict[str, CouplingMetrics]
  • PatchDependencyAnalyzer(conn, delta_cpg=None)
  • analyze_dependency_changes(patch, delta_cpg) -> DependencyAnalysisResult

Дополнительные модули потоков данных

RaceConditionAnalyzer (dataflow)

Файл: src/analysis/dataflow/race_condition.py Сценарии: С02, С16 (безопасность, параллелизм)

Обнаружение уязвимостей TOCTOU и гонок данных через анализ потоков данных с проверкой зависимостей по управлению.

@dataclass
class RaceConditionPath:
    race_type: str  # 'toctou', 'data_race', 'missing_lock'
    check_location: str
    check_function: str
    use_location: str
    use_function: str
    shared_resource: str
    has_lock: bool
    lock_function: Optional[str]
    path_nodes: List[TaintNode]
    confidence: float
    risk_score: float
  • RaceConditionAnalyzer(cpg_service)
  • analyze(max_paths=100, max_hops=None, min_confidence=0.7) -> List[RaceConditionPath]

Модули моделей данных

Следующие модули определяют общие модели данных, используемые подпакетами анализа:

Модуль Файл Ключевые классы
Модели потоков данных src/analysis/dataflow/models.py DataFlowPath, VariableFlow
Модели заражения src/analysis/dataflow/taint/models.py TaintNode, TaintPath, ControlDependency, CallContext
Модели параллелизма src/analysis/concurrency_dataclasses.py LockUsage, RaceConditionPattern, SharedAccess, LockOrderViolation
Модели графа вызовов src/analysis/callgraph/models.py CallPath, CallCycle, ImpactAnalysis, AttackPath, CrossLanguageCall
Базовый класс графа src/analysis/callgraph/base.py BaseAnalyzer (абстрактная база для всех подмодулей callgraph)
Конфигурация сложности src/analysis/callgraph/complexity.py ComplexityAnalyzer, DefaultThresholds

Справочник схемы базы данных

Основные таблицы для анализа

Таблица Назначение
nodes_method Определения методов
nodes_control_structure Структуры управления (if, for, while)
nodes_field_identifier Выражения доступа к полям
nodes_identifier Ссылки на переменные
nodes_call Точки вызова функций
nodes_return Операторы возврата
nodes_member Определения полей структур
edges_cfg Рёбра графа потока управления
edges_contains Отношения вложенности
edges_dominate Отношения доминирования
edges_post_dominate Отношения пост-доминирования
edges_reaching_def Рёбра достижимых определений
edges_argument Рёбра аргументов функций
edges_call Рёбра графа вызовов

Типы рёбер для анализа потока данных

Тип ребра Таблица Назначение
CFG edges_cfg Поток управления между операторами
REACHING_DEF edges_reaching_def Цепочки «определение — использование»
ARGUMENT edges_argument Аргументы вызова функций
CONTAINS edges_contains Вложенность областей видимости
CALL edges_call Связи вызовов функций
DOMINATE edges_dominate Доминирование для зависимостей по управлению

Устранение неполадок

«Метод не найден»

Имя метода должно совпадать точно (с учётом регистра). Используйте простое имя, а не полное квалифицированное.

# Правильно
cfg = analyzer.get_method_cfg("heap_insert")

# Неправильно
cfg = analyzer.get_method_cfg("heap_insert(Relation, HeapTuple)")

«Данные CFG не найдены»

Убедитесь, что экспорт CPG включал рёбра CFG. Проверьте:

SELECT COUNT(*) FROM edges_cfg;

«Обращения к полям не найдены»

Для отслеживания обращений к полям необходимы данные nodes_field_identifier:

SELECT COUNT(*) FROM nodes_field_identifier;

«Пути заражения не найдены»

Проверьте наличие рёбер REACHING_DEF и функций-источников/приёмников:

SELECT COUNT(*) FROM edges_reaching_def;
SELECT DISTINCT name FROM nodes_call WHERE name IN ('system', 'exec', 'popen');

Рекомендации по производительности

  • Перечисление путей ограничено max_paths и max_depth для предотвращения экспоненциального роста
  • Используйте max_depth для ограничения глубины отслеживания заражения (по умолчанию из конфигурации)
  • Обнаружение дублей имеет сложность O(n²) — используйте max_methods на больших кодовых базах
  • Варианты с DuckPGQ (PGQ* классы) быстрее для обхода графов
  • В больших методах может быть много путей; рассмотрите выборочный анализ

Движок анализа методов

Модуль: src/analysis/explain.py

Комплексный движок анализа методов, объединяющий метрики CPG, данные графа вызовов, информацию о заражении и доменный контекст в единый результат. Используется MCP-инструментом codegraph_explain и CLI.

ExplainResult

Агрегированный результат анализа метода (dataclass).

Поле Тип Описание
method_name str Краткое имя метода
full_name str Полное квалифицированное имя
file_path str Путь к исходному файлу
line_start int Начальная строка
line_end int Конечная строка
line_count int Количество строк
signature str Сигнатура метода (первая строка кода)
cyclomatic_complexity int Оценка цикломатической сложности
risk_level str Уровень риска: low (<10), moderate (10–19), high (20–49), critical (≥50)
fan_in int Количество вызывающих методов
fan_out int Количество вызываемых методов
direct_callers List[str] Имена прямых вызывающих методов
transitive_caller_count int Общее количество транзитивных вызывающих до глубины depth
direct_callees List[str] Имена прямых вызываемых методов
is_taint_source bool Метод является источником заражения (из доменного плагина)
is_taint_sink bool Метод является приёмником заражения (из доменного плагина)
taint_paths_through int Пути заражения, проходящие через метод
subsystem str Имя подсистемы домена
pattern_flags Dict[str, bool] Флаги: has_todo_fixme, has_deprecated, has_debug_code
docstring str Строка документации метода

ExplainAnalyzer

from src.analysis.explain import ExplainAnalyzer, ExplainResult

analyzer = ExplainAnalyzer(db_path="project.duckdb")
result = analyzer.collect("process_data", depth=2)

Конструктор: ExplainAnalyzer(db_path: Optional[str] = None)

Методы:

Метод Возвращает Описание
collect(method_name, depth=2) Optional[ExplainResult] Собрать комплексные данные анализа для метода
to_dict(result) Dict[str, Any] Преобразовать ExplainResult в словарь для JSON-вывода
fuzzy_search(method_name, limit=5) List[str] Нечёткий поиск имён методов при неточном совпадении

Анализ конфигурационных параметров

Модуль: src/analysis/config_analyzer.py

Обнаруживает неиспользуемые, отсутствующие и несогласованные конфигурационные параметры путём перекрёстной проверки YAML-конфигурации, схемы Pydantic/dataclass и использования в исходном коде. Выполняет 4-этапный конвейер: извлечение → перекрёстная проверка → фильтрация ложных срабатываний → отчёт.

Используется: S12 (технический долг) через OrphanConfigHandler, CLI dogfood config-check.

ConfigOrphanAnalyzer

from src.analysis.config_analyzer import ConfigOrphanAnalyzer, AnalyzerConfig

analyzer = ConfigOrphanAnalyzer(AnalyzerConfig())
orphans = analyzer.scan()
summary = analyzer.get_summary(orphans)

Конструктор: ConfigOrphanAnalyzer(config: Optional[AnalyzerConfig] = None)

Методы:

Метод Возвращает Описание
scan() List[OrphanFinding] Запуск полного конвейера обнаружения (4 этапа)
get_summary(orphans) Dict[str, Any] Генерация сводной статистики по типу и серьёзности

Этапы обнаружения: 1. Извлечение — разбор ключей YAML (YAMLExtractor), полей dataclass из схемы (ASTExtractor), ссылок в коде (CodeScanner) 2. Перекрёстная проверка — сравнение определённых и используемых параметров 3. Фильтрация ЛС — удаление ложных срабатываний (переменные окружения, динамический доступ, известные шаблоны) 4. Отчёт — формирование списка OrphanFinding

OrphanFinding

Датакласс, представляющий одно обнаружение конфигурационного параметра-сироты.

Поле Тип Описание
orphan_type str Один из: yaml_unused, yaml_missing, code_orphan, path_mismatch, orphaned_dataclass, unused_default
severity str error, warning или info
param_name str Путь параметра через точку (например, timeouts.http_client)
description str Описание на естественном языке
file_path str Файл, в котором обнаружена проблема
line_number int Номер строки в файле
suggestion str Предложение по исправлению (если есть)
metadata Dict[str, Any] Дополнительный контекст

AnalyzerConfig

Датакласс конфигурации анализатора (не привязан к домену).

Поле Тип По умолчанию Описание
config_files List[str] ["config.yaml"] YAML-файлы конфигурации для анализа
schema_files List[str] ["src/config/unified_config.py"] Файлы схемы Pydantic/dataclass
source_dirs List[str] ["src/"] Каталоги для сканирования использования конфигурации
exclude_dirs List[str] ["tests/", "scripts/", "docs/"] Каталоги, исключённые из сканирования
test_dirs List[str] ["tests/"] Каталоги тестов (сканируются отдельно)
root_dir Optional[str] None Корень проекта (определяется автоматически)
access_patterns List[str] список регулярных выражений Шаблоны типизированного доступа (cfg.section.field)
raw_access_patterns List[str] список регулярных выражений Шаблоны прямого доступа к словарю (_raw.get("key"))
dynamic_patterns List[str] список регулярных выражений Шаблоны динамического доступа (исключаются из обнаружения)

YAMLExtractor

Извлекает все конечные ключи и ссылки на переменные окружения из YAML-файлов конфигурации.

Метод Возвращает Описание
extract(config_path) Tuple[Dict[str, Any], Set[str]] Возвращает (словарь ключей, множество переменных окружения)

ASTExtractor

Разбирает файлы схемы Python через AST для извлечения полей dataclass и определений @property.

Метод Возвращает Описание
extract(schema_path) Tuple[Dict, Dict, Set] Возвращает (dataclass_fields, property_map, property_names)

CodeScanner

Сканирует исходные файлы Python на предмет шаблонов доступа к конфигурации с контекстно-зависимым отслеживанием переменных cfg. Различает переменные, содержащие полный объект UnifiedConfig и подсвойства.

Метод Возвращает Описание
scan(source_dirs, exclude_dirs) Tuple[Dict, Set, bool] Возвращает (ссылки, raw_keys, has_dynamic_access)

Покомпонентная конфигурация анализа

Модуль: src/analysis/scope_config.py

Позволяет выбирать разные правила анализа для разных частей проекта, например для main, tests, vendor и других групп файлов.

AnalysisScope

Датакласс, описывающий один именованный скоуп анализа.

Поле Тип Описание
name str Имя скоупа
paths list[str] Глоб-шаблоны путей файлов
rules str \| list[str] Режим правил: all, critical_only, taint_only или явный список правил
severity_threshold str Минимальный уровень серьёзности
suppress_policy str Идентификатор политики подавления
enabled bool Включён ли скоуп
Метод Возвращает Описание
matches(file_path) bool Проверяет, относится ли файл к этому скоупу
to_dict() dict Сериализует скоуп

ScopeConfig

Датакласс, который хранит все скоупы анализа проекта.

Поле Тип Описание
scopes list[AnalysisScope] Упорядоченный список доступных скоупов
default_scope str Имя скоупа по умолчанию
Метод Возвращает Описание
from_yaml(path) ScopeConfig Загружает правила из YAML
from_config_dict(config_dict) ScopeConfig Строит конфигурацию из словаря
resolve_scope(file_path) AnalysisScope \| None Определяет эффективный скоуп для файла
should_report(file_path, severity, rule_id) bool Решает, нужно ли показывать finding для этого файла
to_dict() dict Сериализует всю конфигурацию

Смотрите также