Полная документация по модулям анализа кода в src/analysis/.
Последнее обновление: 2026-03-07
Содержание¶
- Обзор
- Архитектура модулей
- Карта модулей и сценариев
- Анализ потока управления
- CFGAnalyzer
- UnreachableCodeDetector
- PatchControlFlowAnalyzer
- Анализ потоков данных
- DataFlowTracer
- TypePropagator
- PointerAliasAnalyzer
- PathConstraintTracker
- Анализ заражения данных
- TaintPropagator
- FieldSensitiveTracer
- FieldSensitiveTracker
- ContextSensitiveTracker
- InterProcTracker
- Анализаторы безопасности
- MemoryLifetimeAnalyzer
- NullCheckAnalyzer
- CodeStringTracer
- InfoDisclosureAnalyzer
- ComplianceMapper
- Анализ графа вызовов
- CallGraphAnalyzer
- Анализ параллелизма
- ConcurrencyAnalyzer
- LockAnalyzerMixin
- RaceDetectorMixin
- SharedAccessAnalyzerMixin
- AtomicOperationsAnalyzerMixin
- Обнаружение дублей
- ASTCloneDetector
- Подсистема шаблонов
- LLMPatternGenerator
- PatternTaintBridge
- Подсистема автоисправлений
- AutofixEngine
- AutofixGenerator
- DiffValidator
- PromptBuilder
- SSRAutofixBridge
- Анализаторы проверки изменений
- PatchCallGraphAnalyzer
- PatchDataFlowAnalyzer
- PatchDependencyAnalyzer
- Дополнительные модули потоков данных
- RaceConditionAnalyzer (dataflow)
- Модули моделей данных
- Справочник схемы базы данных
- Устранение неполадок
- Движок анализа методов
- ExplainResult
- ExplainAnalyzer
- Анализ конфигурационных параметров
- ConfigOrphanAnalyzer
- OrphanFinding
- AnalyzerConfig
- YAMLExtractor
- ASTExtractor
- CodeScanner
- Покомпонентная конфигурация анализа
- AnalysisScope
- ScopeConfig
- Смотрите также
Обзор¶
Модули анализа обеспечивают расширенные возможности статического анализа на основе CPG (графа свойств кода), хранящегося в DuckDB. Организация по подпакетам:
src/analysis/
├── cfg_analyzer.py # Граф потока управления
├── cfg_unreachable.py # Обнаружение недостижимого кода
├── clone_detector.py # Обнаружение дублей кода
├── compliance.py # Маппинг стандартов безопасности
├── explain.py # Движок анализа методов
├── concurrency_core.py # Анализ параллелизма (4 примеси)
├── field_sensitive_tracer.py # Анализ заражения с учётом полей
├── callgraph/ # Анализ графа вызовов (8 модулей)
├── dataflow/ # Анализ потоков данных (12 модулей)
│ └── taint/ # Механизм распространения заражения (7 модулей)
├── autofix/ # Автоматическая генерация исправлений (5 модулей)
└── patterns/ # Подсистема шаблонов (2 модуля)
Архитектура модулей¶
graph TD
subgraph "Поток управления"
CFG[CFGAnalyzer] --> PatchCF[PatchControlFlowAnalyzer]
CFG --> Unreachable[UnreachableCodeDetector]
end
subgraph "Потоки данных"
DFBase[BaseTracer] --> DFTracer[DataFlowTracer]
DFBase --> TypeProp[TypePropagator]
DFBase --> PtrAlias[PointerAliasAnalyzer]
end
subgraph "Анализ заражения"
TaintProp[TaintPropagator] --> FieldSensTracker[FieldSensitiveTracker]
TaintProp --> ContextSens[ContextSensitiveTracker]
TaintProp --> InterProc[InterProcTracker]
TaintProp --> DFTracker[DataflowTracker]
TaintProp --> SymExec[PathConstraintTracker]
DFTracer --> FieldTracer[FieldSensitiveTracer]
end
subgraph "Граф вызовов"
CGA[CallGraphAnalyzer] --> PathFind[PathFinder]
CGA --> Impact[ImpactAnalyzer]
CGA --> Centrality[CentralityAnalyzer]
CGA --> Components[ComponentAnalyzer]
CGA --> CrossLang[CrossLanguageAnalyzer]
CGA --> Complexity[ComplexityAnalyzer]
end
subgraph "Шаблоны + Автоисправления"
Patterns[LLMPatternGenerator] --> Bridge[PatternTaintBridge]
Bridge --> TaintProp
AutofixEng[AutofixEngine] --> AutofixGen[AutofixGenerator]
AutofixEng --> DiffVal[DiffValidator]
AutofixEng --> SSR[SSRAutofixBridge]
end
Карта модулей и сценариев¶
| Модуль | Подпакет | Тип | Ключевые сценарии | DuckPGQ |
|---|---|---|---|---|
| CFGAnalyzer | корень | анализатор | С05, С06, С13 (рефакторинг, производительность) | Нет |
| UnreachableCodeDetector | корень | анализатор | С05, С13 (мёртвый код) | Нет |
| ASTCloneDetector | корень | анализатор | С07, С13 (рефакторинг) | Нет |
| ComplianceMapper | корень | маппер | С02, С08 (безопасность, соответствие) | Нет |
| ConcurrencyAnalyzer | корень | анализатор | С16 (параллелизм) | Нет |
| FieldSensitiveTracer | корень | анализатор | С02, С08, С14 (безопасность) | Нет |
| DataFlowTracer | dataflow | фасад | С02, С14 (безопасность, инциденты) | Нет |
| TypePropagator | dataflow | анализатор | С02 (путаница типов) | Нет |
| PointerAliasAnalyzer | dataflow | анализатор | С02 (use-after-free) | Нет |
| MemoryLifetimeAnalyzer | dataflow | анализатор | С02, С14 (безопасность памяти) | Нет |
| NullCheckAnalyzer | dataflow | анализатор | С02, С05 (разыменование null) | Нет |
| CodeStringTracer | dataflow | анализатор | С02 (инъекция кода) | Нет |
| InfoDisclosureAnalyzer | dataflow | анализатор | С02 (утечка информации) | Нет |
| TaintPropagator | dataflow/taint | движок | С02, С14 (безопасность) | Нет |
| CallGraphAnalyzer | callgraph | фасад | С01, С12, С14 (ввод в систему, кросс-репо) | Да |
| PathFinder | callgraph | анализатор | С01, С14 (цепочки вызовов) | Да |
| CentralityAnalyzer | callgraph | анализатор | С05, С12 (точки нагрузки) | Да |
| ComponentAnalyzer | callgraph | анализатор | С12, С13 (компоненты) | Да |
| ImpactAnalyzer | callgraph | анализатор | С09, С14 (оценка влияния) | Нет |
| CrossLanguageAnalyzer | callgraph | анализатор | С12 (кросс-репо) | Нет |
| LLMPatternGenerator | patterns | генератор | С21 (поиск шаблонов) | Нет |
| PatternTaintBridge | patterns | мост | С02, С21 (безопасность) | Нет |
| AutofixEngine | autofix | движок | С02 (автоисправления) | Нет |
| PatchControlFlowAnalyzer | patch_review | анализатор | С09 (проверка изменений) | Нет |
Анализ потока управления¶
CFGAnalyzer¶
Файл: src/analysis/cfg_analyzer.py
Сценарии: С05, С06, С13 (рефакторинг, производительность, массовый рефакторинг)
Анализ на основе графа потока управления (CFG) с использованием таблицы edges_cfg.
Ключевые классы¶
@dataclass
class CFGStructure:
"""Описывает структуру CFG метода."""
method_name: str
method_full_name: str
nodes: List[int]
edges: List[Tuple[int, int]] # пары (источник, назначение)
entry_nodes: List[int]
exit_nodes: List[int]
node_count: int
edge_count: int
@dataclass
class CFGPath:
"""Описывает путь выполнения в CFG."""
path_id: str
nodes: List[int]
length: int
has_loop: bool = False
Справочник API¶
CFGAnalyzer(cpg_service)¶
Инициализация с экземпляром CPGQueryService или подключением DuckDB.
get_method_cfg(method_name: str) -> Optional[CFGStructure]¶
Получить структуру CFG для метода.
from src.analysis.cfg_analyzer import CFGAnalyzer
analyzer = CFGAnalyzer(cpg_service)
cfg = analyzer.get_method_cfg("heap_insert")
print(f"Узлы: {cfg.node_count}, Рёбра: {cfg.edge_count}")
compute_cyclomatic_complexity(method_name: str) -> int¶
Вычислить цикломатическую сложность Маккейба: M = E - N + 2
complexity = analyzer.compute_cyclomatic_complexity("heap_insert")
print(f"Сложность: {complexity}") # например, 15
enumerate_paths(method_name: str, max_paths: int = 100, max_depth: int = 50) -> List[CFGPath]¶
Найти пути выполнения в CFG с обнаружением циклов.
paths = analyzer.enumerate_paths("process_query", max_paths=50)
for path in paths:
print(f"Путь {path.path_id}: {path.length} узлов, цикл={path.has_loop}")
find_dominators(method_name: str) -> Dict[int, Set[int]]¶
Построить дерево доминаторов с использованием таблицы edges_dominate.
find_post_dominators(method_name: str) -> Dict[int, Set[int]]¶
Построить дерево пост-доминаторов с использованием таблицы edges_post_dominate.
get_cfg_successors(node_id: int) -> List[int]¶
Получить узлы-наследники в CFG.
get_cfg_predecessors(node_id: int) -> List[int]¶
Получить узлы-предшественники в CFG.
get_control_flow_paths(source_node: int, sink_node: int, max_depth: int = 20) -> List[List[int]]¶
Найти все пути между двумя узлами CFG.
analyze_complexity_distribution(threshold: int = 10) -> Dict[str, Any]¶
Проанализировать распределение сложности по всем методам.
dist = analyzer.analyze_complexity_distribution()
print(f"Средняя сложность: {dist['average']}")
print(f"Методы с высокой сложностью: {dist['high_complexity_methods']}")
Используемые таблицы БД¶
nodes_method— Метаданные методовedges_contains— Связи методов с узламиedges_cfg— Рёбра графа потока управленияedges_dominate— Отношения доминированияedges_post_dominate— Отношения пост-доминирования
Рекомендации по производительности¶
- Перечисление путей ограничено
max_paths(по умолчанию 100) иmax_depth(по умолчанию 50) - Распределение сложности сканирует все методы — может быть медленным на больших кодовых базах (>50K методов)
UnreachableCodeDetector¶
Файл: src/analysis/cfg_unreachable.py
Сценарии: С05, С13 (обнаружение мёртвого кода)
Обнаружение недостижимого кода через анализ CFG — код после return, exit(), функций noreturn.
Ключевые классы¶
@dataclass
class UnreachableCodeFinding:
"""Описывает находку недостижимого кода."""
method_name: str
method_id: int
filename: str
line_number: int
terminating_type: str # 'return', 'exit_call', 'error_call', 'noreturn'
terminating_node_id: int
terminating_line: int
unreachable_node_id: int
unreachable_code: Optional[str] = None
confidence: float = 0.9
Справочник API¶
UnreachableCodeDetector(cpg_service)¶
Инициализация с экземпляром CPGQueryService.
detect_unreachable_code() -> List[UnreachableCodeFinding]¶
Найти все паттерны недостижимого кода.
from src.analysis.cfg_unreachable import UnreachableCodeDetector
detector = UnreachableCodeDetector(cpg_service)
findings = detector.detect_unreachable_code()
for f in findings:
print(f"{f.filename}:{f.line_number} — недостижимый код после {f.terminating_type}")
Используемые таблицы БД¶
edges_cfg— Рёбра CFG для проверки достижимости потомковnodes_method— Метаданные методовnodes_return— Узлы операторов возврата
PatchControlFlowAnalyzer¶
Файл: src/patch_review/analyzers/control_flow_analyzer.py
Сценарии: С09 (проверка изменений)
Анализирует влияние изменений на поток управления с использованием CFGAnalyzer.
Ключевые классы¶
@dataclass
class NewLoopFinding:
method_name: str
loop_type: str
line_number: int
is_nested: bool
has_io: bool
is_unbounded: bool
severity: Severity # HIGH, MEDIUM, LOW
details: str
@dataclass
class ErrorHandlingChange:
method_name: str
change_type: str
error_type: str
line_number: int
details: str
@dataclass
class BranchCoverageImpact:
new_branches: int
removed_branches: int
net_change: int
methods_with_new_branches: List[str]
uncovered_paths: List[Dict[str, Any]]
Справочник API¶
PatchControlFlowAnalyzer(conn, delta_cpg=None)¶
Инициализация с подключением DuckDB и необязательным DeltaCPG.
analyze_control_flow_changes(patch, delta_cpg) -> ControlFlowAnalysisResult¶
Полный анализ потока управления для изменений.
from src.patch_review.analyzers.control_flow_analyzer import PatchControlFlowAnalyzer
analyzer = PatchControlFlowAnalyzer(conn)
result = analyzer.analyze_control_flow_changes(patch, delta_cpg)
print(f"Дельта сложности: {result.complexity_delta}")
print(f"Новые циклы: {len(result.new_loops)}")
analyze_complexity_change(changed_methods) -> List[ComplexityDelta]¶
Расчёт сложности до/после для изменённых методов.
detect_new_loops(changed_methods) -> List[NewLoopFinding]¶
Обнаружение вновь введённых циклов с классификацией рисков.
analyze_error_handling_changes(changed_methods) -> List[ErrorHandlingChange]¶
Отслеживание изменений в обработке ошибок.
analyze_branch_coverage_impact(changed_methods) -> BranchCoverageImpact¶
Оценка влияния на покрытие ветвлений.
Классификация критичности циклов¶
- HIGH (Высокий): Вложенные циклы, циклы с операциями ввода-вывода, неограниченные циклы
- MEDIUM (Средний): Циклы с внешними вызовами
- LOW (Низкий): Простые ограниченные циклы
Анализ потоков данных¶
DataFlowTracer¶
Файл: src/analysis/dataflow/tracer.py
Сценарии: С02, С14 (безопасность, реагирование на инциденты)
Основной анализ потоков данных через рёбра REACHING_DEF. Наследует BaseTracer (src/analysis/dataflow/base.py).
Ключевые классы¶
@dataclass
class DataFlowPath:
"""Путь потока данных от определения к использованию."""
path_id: str
variable_name: str
source_location: Dict[str, Any]
sink_location: Dict[str, Any]
path_length: int
intermediate_nodes: List[Dict[str, Any]] = field(default_factory=list)
is_inter_procedural: bool = False
sanitization_points: List[Dict[str, Any]] = field(default_factory=list)
@dataclass
class VariableFlow:
"""Все потоки переменной по кодовой базе."""
variable_name: str
definition_points: List[Dict[str, Any]] = field(default_factory=list)
use_points: List[Dict[str, Any]] = field(default_factory=list)
flows: List[DataFlowPath] = field(default_factory=list)
Справочник API¶
DataFlowTracer(cpg_service)¶
Инициализация с CPGQueryService.
trace_variable(variable_name: str, method_name: Optional[str] = None, max_depth: Optional[int] = None) -> VariableFlow¶
Отследить все потоки переменной через рёбра REACHING_DEF.
from src.analysis.dataflow.tracer import DataFlowTracer
tracer = DataFlowTracer(cpg_service)
flow = tracer.trace_variable("user_input", method_name="process_request")
print(f"Определения: {len(flow.definition_points)}, Использования: {len(flow.use_points)}")
find_reaching_definitions(node_id: int) -> List[Dict]¶
Найти все определения, достигающие точки использования.
find_variable_uses(node_id: int) -> List[Dict]¶
Найти все использования определения.
trace_inter_procedural(variable_name: str, max_depth: int = 5) -> List[DataFlowPath]¶
Отследить поток данных через границы функций.
Используемые таблицы БД¶
nodes_identifier— Ссылки на переменныеedges_reaching_def— Цепочки «определение — использование»edges_argument— Рёбра аргументов функций
TypePropagator¶
Файл: src/analysis/dataflow/type_propagator.py
Сценарии: С02 (уязвимости путаницы типов)
Отслеживает преобразования типов вдоль путей потока данных (приведения, расширения, усечения).
Ключевые классы¶
@dataclass
class TypeTransformation:
"""Преобразование типа в конкретной точке."""
node_id: int
from_type: str
to_type: str
transformation_kind: str # 'cast', 'promotion', 'truncation', 'reinterpret'
is_safe: bool
line_number: int
@dataclass
class TypeFlow:
"""Распространение типа вдоль пути данных."""
variable_name: str
initial_type: str
final_type: str
transformations: List[TypeTransformation]
has_unsafe_cast: bool
has_truncation: bool
Справочник API¶
TypePropagator(cpg_service)¶
trace_type_flow(variable_name: str, method_name: Optional[str] = None) -> TypeFlow¶
Отследить изменения типов вдоль потока данных переменной.
find_unsafe_casts(limit: int = 100) -> List[TypeTransformation]¶
Найти потенциально небезопасные приведения типов.
PointerAliasAnalyzer¶
Файл: src/analysis/dataflow/pointer_alias.py
Сценарии: С02 (use-after-free, double-free)
Анализ псевдонимов указателей для определения, какие указатели могут ссылаться на одну область памяти.
Ключевые классы¶
@dataclass
class AllocationSite:
"""Описывает точку выделения памяти."""
node_id: int
function_name: str # malloc, calloc и т.д.
variable_name: str
line_number: int
Справочник API¶
PointerAliasAnalyzer(cpg_service)¶
find_aliases(variable_name: str) -> List[str]¶
Найти все переменные, которые могут быть псевдонимами указателя.
trace_allocation_lifetime(alloc_site: AllocationSite) -> Dict¶
Отследить время жизни аллокации от создания до освобождения.
PathConstraintTracker¶
Файл: src/analysis/dataflow/symbolic_execution.py
Сценарии: С02 (проверка достижимости путей в анализе заражения)
Легковесное символьное выполнение для проверки достижимости путей через решатель Z3.
Ключевые классы¶
class SymbolicExecutionConfig:
"""Конфигурация движка символьного выполнения."""
enabled: bool = True
max_constraints: int = 20
solver_timeout_ms: int = 500
solver_timeout_uf_ms: int = 2000
max_parse_depth: int = 10
enable_function_models: bool = True
enable_arithmetic: bool = True
@dataclass
class PathConstraint:
"""Ограничение на пути выполнения."""
node_id: int
condition: str
is_true_branch: bool
variables: List[str]
class PathConstraintTracker:
"""Отслеживание ограничений пути для анализа достижимости."""
Справочник API¶
PathConstraintTracker(cpg_service)¶
check_path_feasibility(path_nodes: List[int]) -> bool¶
Проверить достижимость пути данных с учётом условий ветвления.
Анализ заражения данных¶
TaintPropagator¶
Файл: src/analysis/dataflow/taint/propagator.py
Сценарии: С02, С14 (анализ безопасности, реагирование на инциденты)
Основной движок анализа заражения данных. Объединяет все подмодули в единый конвейер.
Ключевые классы¶
@dataclass
class TaintNode:
"""Узел в пути заражения."""
node_id: int
name: str
code: str
line_number: int
filename: str
node_type: str
@dataclass
class TaintPath:
"""Полный путь заражения от источника к приёмнику."""
source: TaintNode
sink: TaintNode
intermediate: List[TaintNode]
path_length: int
confidence: float
sink_category: str
is_sanitized: bool = False
sanitization_point: Optional[TaintNode] = None
Справочник API¶
TaintPropagator(cpg_service, enable_inter_proc=True, enable_control_flow=True, enable_field_sensitive=True, enable_context_sensitive=True, enable_symbolic_execution=True)¶
Инициализация с CPG-сервисом и переключателями возможностей.
from src.analysis.dataflow.taint.propagator import TaintPropagator
propagator = TaintPropagator(cpg_service)
paths = propagator.find_taint_paths(
sources=["getenv", "fgets", "read"],
sinks=["system", "exec", "popen"]
)
for path in paths:
print(f"Источник: {path.source.name} -> Приёмник: {path.sink.name}")
print(f"Уверенность: {path.confidence}, Очищен: {path.is_sanitized}")
find_taint_paths(sources, sinks, max_depth=None) -> List[TaintPath]¶
Найти пути заражения от функций-источников к функциям-приёмникам.
analyze_sql_injections() -> List[TaintPath]¶
Удобный метод для обнаружения SQL-инъекций.
Подмодули¶
| Подмодуль | Файл | Назначение |
|---|---|---|
DataflowTracker |
tracker.py |
Основное распространение заражения (BFS) |
InterProcTracker |
interprocedural.py |
Отслеживание через границы функций |
FieldSensitiveTracker |
field_sensitive.py |
Заражение с учётом полей объектов |
ContextSensitiveTracker |
context_sensitive.py |
Учёт контекста точки вызова |
ControlFlowAnalyzer |
control_flow.py |
Анализ зависимостей по управлению |
FieldSensitiveTracer¶
Файл: src/analysis/field_sensitive_tracer.py
Сценарии: С02, С08, С14 (безопасность, соответствие, реагирование на инциденты)
Отслеживание путей полей верхнего уровня для точного анализа заражения. Различает разные поля одного объекта (например, user.password и user.name).
Примечание: Не путать с
FieldSensitiveTracker(dataflow/taint/field_sensitive.py) — низкоуровневым трекером заражения, используемым внутриTaintPropagator.
Ключевые классы¶
@dataclass
class FieldPath:
"""Путь доступа к полю, например obj.field1.field2."""
base_variable: str
field_chain: List[str]
full_path: str
node_ids: List[int] = field(default_factory=list)
type_full_name: Optional[str] = None
@classmethod
def from_code(cls, code: str) -> "FieldPath": ...
def matches(self, other: "FieldPath") -> Tuple[bool, str]: ...
@dataclass
class FieldAccess:
"""Один доступ к полю в коде."""
node_id: int
base_variable: str
field_name: str
access_code: str
line_number: int
filename: str
access_type: str = "read" # 'read', 'write', 'call'
containing_method: Optional[str] = None
@dataclass
class FieldSensitiveFlow:
"""Путь передачи данных с учётом полей."""
source_path: FieldPath
sink_path: FieldPath
intermediate_fields: List[FieldPath]
is_tainted: bool
relationship: str # 'exact', 'prefix', 'suffix', 'propagated'
confidence: float = 1.0
Справочник API¶
FieldSensitiveTracer(cpg_service)¶
Инициализация с CPGQueryService или подключением DuckDB.
parse_field_path(code: str) -> FieldPath¶
Разобрать строку доступа к полю в структурированный объект FieldPath.
from src.analysis.field_sensitive_tracer import FieldSensitiveTracer
tracer = FieldSensitiveTracer(cpg_service)
# Разбор нотации указателей
path = tracer.parse_field_path("user->password")
print(path.base_variable) # "user"
print(path.field_chain) # ["password"]
# Разбор нотации через точку
path = tracer.parse_field_path("request.data.buffer")
print(path.full_path) # "request.data.buffer"
get_struct_fields(type_name: str) -> List[Dict[str, Any]]¶
Получить поля структуры с информацией о типах.
fields = tracer.get_struct_fields("UserData")
for field in fields:
print(f"{field['name']}: {field['type']}")
find_field_accesses(base_variable: str, field_name: Optional[str] = None) -> List[FieldAccess]¶
Найти все обращения к конкретному полю.
accesses = tracer.find_field_accesses("user", "password")
for access in accesses:
print(f"{access.filename}:{access.line_number} - {access.access_type}")
find_all_field_identifiers(field_name: Optional[str] = None, limit: int = 100) -> List[Dict]¶
Найти все узлы идентификаторов полей, с необязательной фильтрацией по имени.
trace_field_taint(source_variable: str, source_field: Optional[str] = None, sink_patterns: Optional[List[str]] = None, max_depth: int = 10) -> List[FieldSensitiveFlow]¶
Отследить заражение от исходного поля к функциям-приёмникам.
flows = tracer.trace_field_taint(
source_variable="credentials",
source_field="password",
sink_patterns=["printf", "log", "send"]
)
for flow in flows:
print(f"Заражённый поток: {flow.source_path.full_path} -> {flow.sink_path.full_path}")
find_sensitive_field_flows(sensitive_fields: List[str] = None, sink_functions: List[str] = None) -> List[Dict[str, Any]]¶
Найти потоки данных из чувствительных полей в опасные приёмники.
# Чувствительные поля по умолчанию: password, token, secret, private_key, credential, auth
flows = tracer.find_sensitive_field_flows()
print(f"Найдено {len(flows)} потенциальных утечек чувствительных данных")
Категории чувствительных полей¶
Чувствительные поля, отслеживаемые по умолчанию:
- password, passwd, pwd
- token, auth_token, access_token
- secret, api_secret, client_secret
- private_key, secret_key
- credential, credentials
- auth, authorization
Используемые таблицы БД¶
nodes_field_identifier— Узлы доступа к полямnodes_identifier— Идентификаторы переменныхnodes_member— Определения полей структурedges_reaching_def— Рёбра достижимых определенийedges_argument— Рёбра аргументов функций
FieldSensitiveTracker¶
Файл: src/analysis/dataflow/taint/field_sensitive.py
Сценарии: С02, С14 (внутреннее использование в TaintPropagator)
Низкоуровневое отслеживание заражения по полям, используемое внутри TaintPropagator. Определяет, какие конкретные поля объекта заражены, а какие чисты.
Ключевые классы¶
@dataclass
class FieldTaint:
"""Статус заражения одного поля."""
field_name: str
is_tainted: bool
source_node: Optional[int] = None
class FieldTaintMap:
"""Карта полей объекта и их статуса заражения."""
def set_taint(self, base_var: str, field: str, tainted: bool): ...
def is_tainted(self, base_var: str, field: str) -> bool: ...
def get_tainted_fields(self, base_var: str) -> List[str]: ...
ContextSensitiveTracker¶
Файл: src/analysis/dataflow/taint/context_sensitive.py
Сценарии: С02 (повышение точности анализа заражения)
Отслеживание контекста точки вызова для предотвращения ложных срабатываний, когда одна и та же функция вызывается с разным состоянием заражения.
Справочник API¶
ContextSensitiveTracker(cpg_service)¶
track_call_context(caller_id: int, callee_id: int) -> CallContext¶
Создать контекст вызова для распространения заражения.
InterProcTracker¶
Файл: src/analysis/dataflow/taint/interprocedural.py
Сценарии: С02, С14 (межпроцедурное отслеживание заражения)
Отслеживание распространения заражения через границы функций — передача параметров и возвращаемые значения.
Справочник API¶
InterProcTracker(cpg_service)¶
find_callers(method_name: str) -> List[Dict]¶
Найти все точки вызова метода.
map_arguments(call_site_id: int) -> Dict[int, int]¶
Сопоставить фактические аргументы с формальными параметрами.
Анализаторы безопасности¶
MemoryLifetimeAnalyzer¶
Файл: src/analysis/dataflow/memory_lifetime.py
Сценарии: С02 (use-after-free, double-free, утечки памяти)
Отслеживание паттернов выделения/освобождения памяти для обнаружения нарушений времени жизни.
Ключевые классы¶
class MemoryState(Enum):
ALLOCATED = "allocated"
FREED = "freed"
UNKNOWN = "unknown"
@dataclass
class MemoryOperation:
node_id: int
operation: str # 'alloc', 'free', 'realloc'
function_name: str
variable_name: str
line_number: int
filename: str
@dataclass
class UseAfterFreePath:
alloc: MemoryOperation
free: MemoryOperation
use: MemoryOperation
path_length: int
@dataclass
class DoubleFreeePath:
alloc: MemoryOperation
first_free: MemoryOperation
second_free: MemoryOperation
Справочник API¶
MemoryLifetimeAnalyzer(cpg_service)¶
find_use_after_free(limit: int = 50) -> List[UseAfterFreePath]¶
Обнаружить паттерны использования после освобождения.
find_double_free(limit: int = 50) -> List[DoubleFreeePath]¶
Обнаружить паттерны двойного освобождения.
find_memory_leaks(limit: int = 100) -> List[Dict]¶
Найти аллокации без соответствующего освобождения.
NullCheckAnalyzer¶
Файл: src/analysis/dataflow/null_check.py
Сценарии: С02, С05 (разыменование null)
Обнаружение отсутствующих проверок на null после функций, которые могут вернуть NULL.
Ключевые классы¶
@dataclass
class NullCheckPath:
function_name: str
return_variable: str
dereference_line: int
dereference_file: str
has_null_check: bool
check_line: Optional[int] = None
Справочник API¶
NullCheckAnalyzer(cpg_service)¶
find_missing_null_checks(limit: int = 100) -> List[NullCheckPath]¶
Найти разыменования значений без предварительной проверки на NULL.
CodeStringTracer¶
Файл: src/analysis/dataflow/code_string_tracer.py
Сценарии: С02 (инъекция кода)
Отслеживание паттернов конструирования строк для обнаружения инъекций (SQL, команды, eval).
Ключевые классы¶
@dataclass
class CodeInjectionPoint:
function_name: str
string_variable: str
injection_type: str # 'sql', 'command', 'eval'
line_number: int
filename: str
user_input_source: Optional[str] = None
Справочник API¶
CodeStringTracer(cpg_service)¶
find_string_concat_injections(sink_functions: List[str], limit: int = 50) -> List[CodeInjectionPoint]¶
Найти паттерны конкатенации строк, попадающих в опасные приёмники.
InfoDisclosureAnalyzer¶
Файл: src/analysis/dataflow/info_disclosure.py
Сценарии: С02 (утечка информации)
Обнаружение потоков чувствительных данных в функции вывода и логирования.
Справочник API¶
InfoDisclosureAnalyzer(cpg_service)¶
find_info_leaks(sensitive_patterns: List[str] = None, limit: int = 50) -> List[InfoDisclosurePath]¶
Найти потоки чувствительных данных в функции вывода.
ComplianceMapper¶
Файл: src/analysis/compliance.py
Сценарии: С02, С08 (отчёты о соответствии)
Маппинг обнаруженных уязвимостей на стандарты безопасности (CWE, OWASP, CERT C, MISRA C).
Ключевые классы¶
@dataclass
class ComplianceFinding:
vulnerability_type: str
severity: str # 'critical', 'high', 'medium', 'low'
cwe_id: str
cwe_name: str
owasp_category: Optional[str]
cert_rule: Optional[str]
misra_rule: Optional[str]
file_path: str
line_number: int
function_name: str
description: str
recommendation: str
risk_score: float
@dataclass
class ComplianceReport:
scan_date: str
codebase: str
total_findings: int
findings_by_severity: Dict[str, int]
findings_by_cwe: Dict[str, int]
findings_by_owasp: Dict[str, int]
compliance_score: float # 0.0-1.0
findings: List[ComplianceFinding]
standards_used: List[str]
Справочник API¶
ComplianceMapper()¶
Инициализация (без внешних зависимостей).
map_vulnerability(vuln_type, severity, file_path, line_number, function_name, description, risk_score) -> ComplianceFinding¶
Сопоставить уязвимость со стандартами безопасности.
from src.analysis.compliance import ComplianceMapper
mapper = ComplianceMapper()
finding = mapper.map_vulnerability(
vuln_type="sql_injection",
severity="high",
file_path="src/api.c",
line_number=42,
function_name="process_query",
description="Пользовательский ввод конкатенирован в SQL-запрос",
risk_score=8.5
)
print(f"{finding.cwe_id}: {finding.cwe_name}")
# CWE-89: Improper Neutralization of Special Elements used in an SQL Command
Анализ графа вызовов¶
CallGraphAnalyzer¶
Файл: src/analysis/callgraph/analyzer.py
Сценарии: С01, С09, С12, С14 (ввод в систему, проверка изменений, кросс-репо, реагирование на инциденты)
Главный фасад, объединяющий все модули анализа графа вызовов. Делегирует специализированным подмодулям.
Архитектура¶
CallGraphAnalyzer
├── PathFinder — кратчайший путь, вызывающие/вызываемые
├── CentralityAnalyzer — PageRank, промежуточная центральность
├── ComponentAnalyzer — компоненты SCC, WCC
├── ComplexityAnalyzer — сложность на основе CFG
├── ImpactAnalyzer — оценка влияния, точки входа, пути атаки
└── CrossLanguageAnalyzer — обнаружение границ FFI
Справочник API¶
CallGraphAnalyzer(cpg_service)¶
find_shortest_path(source_method: str, target_method: str, max_depth: Optional[int] = None) -> Optional[CallPath]¶
Найти кратчайшую цепочку вызовов между двумя методами.
from src.analysis.callgraph.analyzer import CallGraphAnalyzer
cga = CallGraphAnalyzer(cpg_service)
path = cga.find_shortest_path("handle_request", "exec_query")
if path:
print(f"Длина пути: {path.length}")
print(f"Методы: {' -> '.join(path.methods)}")
find_all_callers(method_name: str, max_depth: Optional[int] = None, direct_only: bool = False) -> List[str]¶
Найти все методы, вызывающие данный метод.
find_all_callees(method_name: str, max_depth: Optional[int] = None, direct_only: bool = False) -> List[str]¶
Найти все методы, вызываемые данным методом.
find_cross_language_calls(method_name=None, source_language=None, target_language=None) -> List[CrossLanguageCall]¶
Найти межъязыковые рёбра вызовов (границы FFI).
detect_cycles(max_cycle_length: int = 10) -> List[CallCycle]¶
Обнаружить циклы рекурсии через алгоритм Тарьяна (SCC).
analyze_impact(method_name: str, max_depth: Optional[int] = None) -> ImpactAnalysis¶
Оценить влияние изменений (транзитивные вызывающие, затронутые компоненты).
get_call_statistics() -> Dict[str, Any]¶
Общая статистика графа вызовов (всего методов, вызовов, разветвлённость).
find_entry_points(method_name: str, max_depth: int = 10) -> List[Dict]¶
Найти точки входа публичного API, способные достичь метода.
trace_attack_paths(entry_points, vuln_method, vuln_file="", vuln_line=0, max_paths=5) -> List[AttackPath]¶
Трассировка путей атаки от точек входа к уязвимости.
compute_pagerank(top_n: Optional[int] = None) -> List[Dict]¶
Вычислить оценки PageRank для ранжирования важности методов.
compute_betweenness_centrality(sample_size=None, top_n=None) -> List[Dict]¶
Вычислить промежуточную центральность для выявления мостовых методов.
find_hotspots(min_in_degree: int = 3, limit: int = 25) -> List[Dict]¶
Найти точки нагрузки (методы с большим числом вызывающих).
Ключевые модели данных¶
@dataclass
class CallPath:
source: str
target: str
methods: List[str]
length: int
@dataclass
class CallCycle:
methods: List[str]
length: int
is_direct: bool
@dataclass
class ImpactAnalysis:
method_name: str
direct_callers: List[str]
transitive_callers: List[str]
affected_files: List[str]
impact_score: float
@dataclass
class AttackPath:
entry_point: str
vulnerability: str
path: List[str]
risk_amplification: float
@dataclass
class CrossLanguageCall:
caller: str
callee: str
caller_language: str
callee_language: str
call_type: str # 'ffi', 'cgo', 'jni' и т.д.
Подмодули¶
| Модуль | Файл | Ключевые методы | DuckPGQ |
|---|---|---|---|
| PathFinder | callgraph/pathfinding.py |
find_shortest_path, find_all_callers, find_all_callees | Нет |
| PGQPathFinder | callgraph/pathfinding.py |
Тот же API, использует синтаксис DuckPGQ MATCH |
Да |
| CentralityAnalyzer | callgraph/centrality.py |
compute_pagerank, compute_betweenness_centrality | Нет |
| PGQCentralityAnalyzer | callgraph/centrality.py |
Тот же API, использует графовые запросы DuckPGQ | Да |
| ComponentAnalyzer | callgraph/components.py |
compute_scc, compute_wcc | Нет |
| PGQComponentAnalyzer | callgraph/components.py |
Тот же API, ускорение DuckPGQ | Да |
| ComplexityAnalyzer | callgraph/complexity.py |
compute_cyclomatic_complexity (на основе CFG) | Нет |
| ImpactAnalyzer | callgraph/impact.py |
analyze_impact, find_entry_points, trace_attack_paths | Нет |
| CrossLanguageAnalyzer | callgraph/cross_language.py |
find_cross_language_calls | Нет |
Анализ параллелизма¶
ConcurrencyAnalyzer¶
Файл: src/analysis/concurrency_core.py
Сценарии: С16 (анализ параллелизма)
Составлен из 4 специализированных примесей. Шаблоны блокировок и разделяемой памяти загружаются из доменного плагина.
class ConcurrencyAnalyzer(
LockAnalyzerMixin,
RaceDetectorMixin,
SharedAccessAnalyzerMixin,
AtomicOperationsAnalyzerMixin,
): ...
Справочник API¶
ConcurrencyAnalyzer(cpg_service)¶
Доступ ко всем методам 4 примесей, описанных ниже.
from src.analysis.concurrency_core import ConcurrencyAnalyzer
analyzer = ConcurrencyAnalyzer(cpg_service)
races = analyzer.detect_race_conditions()
locks = analyzer.find_lock_usage(lock_type="lwlock")
stats = analyzer.get_concurrency_statistics()
Модели данных¶
@dataclass
class LockUsage:
function_name: str
lock_type: str
lock_name: Optional[str]
operation: str
file_name: str
line_number: int
@dataclass
class RaceConditionPattern:
pattern_id: str
pattern_type: str # 'toctou', 'unprotected_access', 'signal_handler', 'double_check'
affected_functions: List[str]
shared_resource: str
severity: str
description: str
@dataclass
class SharedAccess:
variable_name: str
accessor_functions: List[str]
access_type: str
is_protected: bool
protecting_lock: Optional[str]
@dataclass
class LockOrderViolation:
violation_id: str
lock_a: str
lock_b: str
function_acquiring_a_then_b: str
function_acquiring_b_then_a: str
risk_level: str
LockAnalyzerMixin¶
Файл: src/analysis/lock_analyzer.py
find_lock_usage(lock_type=None, function_name=None, limit=None) -> List[LockUsage]detect_lock_ordering_issues(limit=None) -> List[LockOrderViolation]detect_potential_deadlocks(limit=None) -> List[Dict]analyze_lock_graph() -> Dict[str, Any]get_lock_statistics() -> Dict[str, Any]
RaceDetectorMixin¶
Файл: src/analysis/race_detector.py
detect_race_conditions(pattern_types=None, limit=None) -> List[RaceConditionPattern]
Типы паттернов: toctou, signal_handler, unprotected_access, double_check.
SharedAccessAnalyzerMixin¶
Файл: src/analysis/shared_access_analyzer.py
analyze_shared_access(variable_pattern=None, limit=None) -> List[SharedAccess]
AtomicOperationsAnalyzerMixin¶
Файл: src/analysis/atomic_operations_analyzer.py
find_atomic_operations(limit: int = 100) -> List[Dict]find_condition_variables(limit: int = 50) -> List[Dict]analyze_function_concurrency(function_name: str) -> Dictget_concurrency_statistics() -> Dict[str, Any]
Обнаружение дублей¶
ASTCloneDetector¶
Файл: src/analysis/clone_detector.py
Сценарии: С07, С13 (рефакторинг, обнаружение дублей)
Многоуровневое обнаружение дублей на основе AST: - Type-1: Точные дубли (идентичный код) - Type-2: Переименованные дубли (изменены только идентификаторы) - Type-3: Структурные дубли (схожая структура с модификациями) - Type-4: Семантические дубли (разный код, одинаковое поведение)
Ключевые классы¶
@dataclass
class CloneResult:
method1_id: int
method1_name: str
method1_file: str
method2_id: int
method2_name: str
method2_file: str
similarity: float
clone_type: str # 'exact', 'renamed', 'structural', 'semantic'
shared_patterns: List[str] = field(default_factory=list)
line_count1: int = 0
line_count2: int = 0
Справочник API¶
ASTCloneDetector(cpg_service)¶
detect_clones(min_similarity=None, category=None, max_methods=None, min_lines=None) -> List[CloneResult]¶
Обнаружить дубли кода по кодовой базе.
from src.analysis.clone_detector import ASTCloneDetector
detector = ASTCloneDetector(cpg_service)
clones = detector.detect_clones(min_similarity=0.8, min_lines=10)
for clone in clones:
print(f"{clone.method1_name} ≈ {clone.method2_name} ({clone.clone_type}, {clone.similarity:.0%})")
detect_clones_for_category(category: str, min_similarity: Optional[float] = None) -> List[CloneResult]¶
Обнаружить дубли в рамках категории (например, “null_check”, “string_operations”).
Рекомендации по производительности¶
- Попарное сравнение: O(n²), где n — количество методов
- Используйте
max_methodsдля ограничения на больших кодовых базах - Используйте
min_linesдля пропуска тривиальных методов
Подсистема шаблонов¶
LLMPatternGenerator¶
Файл: src/analysis/patterns/llm_pattern_generator.py
Сценарии: С21 (поиск шаблонов)
Генерация YAML-правил для структурного поиска шаблонов с помощью LLM из описания на естественном языке.
Ключевые классы¶
@dataclass
class GeneratedRule:
"""Результат генерации правила через LLM."""
yaml_text: str
rule_id: str
language: str
has_fix: bool
validated: bool
validation_errors: List[str] = field(default_factory=list)
generation_attempts: int = 1
Справочник API¶
LLMPatternGenerator()¶
Инициализация (загружает язык промптов из глобального реестра).
async generate_rule(description: str, language: str, domain: Optional[str] = None, with_fix: bool = True, max_retries: int = 3, examples: Optional[List[str]] = None) -> GeneratedRule¶
Сгенерировать YAML-правило из описания на естественном языке. Асинхронный метод.
from src.analysis.patterns.llm_pattern_generator import LLMPatternGenerator
generator = LLMPatternGenerator()
rule = await generator.generate_rule(
description="Найти вызовы malloc без соответствующего free",
language="c",
domain="linux_kernel"
)
print(rule.rule_id) # напр., "malloc-without-free"
print(rule.validated) # True, если прошёл gocpg validate-rule
print(rule.yaml_text) # Полное содержимое YAML-правила
PatternTaintBridge¶
Файл: src/analysis/patterns/taint_bridge.py
Сценарии: С02, С21 (безопасность, поиск шаблонов)
Мост между результатами структурного поиска шаблонов и анализом заражения. Обогащает находки информацией о потоках данных.
Справочник API¶
PatternTaintBridge(cpg_service, taint_propagator=None)¶
Инициализация с CPG-сервисом и необязательным TaintPropagator (откат к легковесному SQL BFS).
async enrich_findings_with_taint(findings: List[Dict]) -> List[Dict]¶
Добавить информацию о путях заражения к находкам шаблонов. Асинхронный метод.
from src.analysis.patterns.taint_bridge import PatternTaintBridge
bridge = PatternTaintBridge(cpg_service, taint_propagator)
enriched = await bridge.enrich_findings_with_taint(pattern_findings)
for finding in enriched:
if finding.get("taint_enriched"):
print(f"Находка {finding['rule_id']} имеет {len(finding['taint_paths'])} путей заражения")
Подсистема автоисправлений¶
AutofixEngine¶
Файл: src/analysis/autofix/engine.py
Сценарии: С02 (автоматическая генерация исправлений безопасности)
Оркестрация генерации исправлений: сначала шаблоны, затем откат к LLM, затем валидация.
Конвейер¶
TaintPath → Разбор расположения → Чтение исходников → Определение типа уязвимости
→ Попытка шаблонного исправления → (если нет) Откат к LLM → Валидация diff → AutofixResult
Ключевые классы¶
@dataclass
class AutofixResult:
fix: FixSuggestion
strategy: str # "template" или "llm"
validated: bool
validation: Optional[ValidationResult] = None
taint_path: Optional[TaintPath] = None
cwe_id: str = ""
Справочник API¶
AutofixEngine(source_root: str = "", dry_run: bool = True)¶
Инициализация с корневой директорией исходников. dry_run=True генерирует diff без применения.
generate_fixes(taint_paths: List[TaintPath], vulnerability_type: str = "") -> List[AutofixResult]¶
Сгенерировать исправления для списка путей заражения.
from src.analysis.autofix.engine import AutofixEngine
engine = AutofixEngine(source_root="/path/to/project", dry_run=True)
results = engine.generate_fixes(taint_paths, vulnerability_type="sql_injection")
for result in results:
print(f"Стратегия: {result.strategy}, Валидирован: {result.validated}")
print(f"Diff:\n{result.fix.diff_patch}")
AutofixGenerator¶
Файл: src/analysis/autofix/generator.py
Генерация исправлений на основе шаблонов с использованием регулярных выражений для известных типов уязвимостей.
@dataclass
class FixSuggestion:
vulnerability_type: str
severity: str
file_path: str
line_number: int
original_code: str
fixed_code: str
explanation: str
confidence: float # 0.0-1.0
diff_patch: str # Формат unified diff
@dataclass
class FixTemplate:
vuln_type: str
name: str
description: str
pattern: str # Регулярное выражение
replacement: str # Шаблон замены
explanation: str
AutofixGenerator()— загружает шаблоны из доменного плагинаgenerate_fix(code, vuln_type, context) -> Optional[FixSuggestion]
DiffValidator¶
Файл: src/analysis/autofix/diff_validator.py
Валидация безопасности применения сгенерированных исправлений.
@dataclass
class ValidationResult:
valid: bool
error: Optional[str] = None
patched_content: Optional[str] = None
DiffValidator()—MAX_CHANGE_RATIO = 0.5validate(original_code, fixed_code, file_path, source_root) -> ValidationResult
PromptBuilder¶
Файл: src/analysis/autofix/prompt_builder.py
Построение промптов для LLM при генерации исправлений, когда шаблоны не подходят.
@dataclass
class AutofixPromptContext:
vulnerable_code: str
vulnerability_type: str
cwe_id: str
taint_path_summary: str
language: str
PromptBuilder()— используетconfig/prompts/autofix/build_prompt(context: AutofixPromptContext) -> str
SSRAutofixBridge¶
Файл: src/analysis/autofix/ssr_bridge.py
Мост между движком автоисправлений и структурным поиском и заменой (SSR) GoCPG.
SSRAutofixBridge(gocpg_client)apply_ssr_rule(rule_id: str, file_path: str) -> Optional[FixSuggestion]
Анализаторы проверки изменений¶
Дополнительные анализаторы в src/patch_review/analyzers/, расширяющие базовые модули для оценки влияния изменений.
PatchCallGraphAnalyzer¶
Файл: src/patch_review/analyzers/call_graph_analyzer.py
Сценарии: С09 (проверка изменений)
Анализ влияния изменений на граф вызовов — радиус поражения, ломающие изменения, эффект волны.
@dataclass
class CallGraphNode:
method_name: str
full_name: str
filename: str
callers: List[str] = field(default_factory=list)
callees: List[str] = field(default_factory=list)
is_changed: bool = False
change_type: Optional[ChangeType] = None
PatchCallGraphAnalyzer(conn, delta_cpg=None)analyze_call_graph_changes(patch, delta_cpg) -> CallGraphAnalysisResultcompute_blast_radius(changed_methods) -> BlastRadiusdetect_breaking_changes(changed_methods) -> List[BreakingChange]compute_ripple_effect(changed_methods) -> RippleEffect
PatchDataFlowAnalyzer¶
Файл: src/patch_review/analyzers/dataflow_analyzer.py
Сценарии: С09 (проверка изменений)
Анализ влияния изменений на потоки данных — новые пути заражения, обход санитизации, потоки чувствительных данных.
@dataclass
class DataFlowChange:
change_type: str
source_method: str
affected_variable: str
severity: Severity
PatchDataFlowAnalyzer(conn, delta_cpg=None)analyze_dataflow_changes(patch, delta_cpg) -> DataFlowAnalysisResult
PatchDependencyAnalyzer¶
Файл: src/patch_review/analyzers/dependency_analyzer.py
Сценарии: С09 (проверка изменений)
Анализ влияния изменений на зависимости модулей, импорты и архитектурную связность.
class DependencyChangeType(Enum):
ADDED = "added"
REMOVED = "removed"
MODIFIED = "modified"
CIRCULAR_INTRODUCED = "circular_introduced"
LAYER_VIOLATION = "layer_violation"
@dataclass
class DependencyChange:
change_type: DependencyChangeType
source_module: str
target_module: str
source_file: str
@dataclass
class CircularDependency:
cycle_path: List[str]
introduced_edge: Tuple[str, str]
severity: Severity
@dataclass
class LayerViolation:
source_layer: str
target_layer: str
source_module: str
target_module: str
@dataclass
class CouplingMetrics:
afferent_coupling: int
efferent_coupling: int
instability: float
@dataclass
class DependencyAnalysisResult:
dependency_changes: List[DependencyChange]
circular_dependencies: List[CircularDependency]
layer_violations: List[LayerViolation]
coupling_metrics: Dict[str, CouplingMetrics]
PatchDependencyAnalyzer(conn, delta_cpg=None)analyze_dependency_changes(patch, delta_cpg) -> DependencyAnalysisResult
Дополнительные модули потоков данных¶
RaceConditionAnalyzer (dataflow)¶
Файл: src/analysis/dataflow/race_condition.py
Сценарии: С02, С16 (безопасность, параллелизм)
Обнаружение уязвимостей TOCTOU и гонок данных через анализ потоков данных с проверкой зависимостей по управлению.
@dataclass
class RaceConditionPath:
race_type: str # 'toctou', 'data_race', 'missing_lock'
check_location: str
check_function: str
use_location: str
use_function: str
shared_resource: str
has_lock: bool
lock_function: Optional[str]
path_nodes: List[TaintNode]
confidence: float
risk_score: float
RaceConditionAnalyzer(cpg_service)analyze(max_paths=100, max_hops=None, min_confidence=0.7) -> List[RaceConditionPath]
Модули моделей данных¶
Следующие модули определяют общие модели данных, используемые подпакетами анализа:
| Модуль | Файл | Ключевые классы |
|---|---|---|
| Модели потоков данных | src/analysis/dataflow/models.py |
DataFlowPath, VariableFlow |
| Модели заражения | src/analysis/dataflow/taint/models.py |
TaintNode, TaintPath, ControlDependency, CallContext |
| Модели параллелизма | src/analysis/concurrency_dataclasses.py |
LockUsage, RaceConditionPattern, SharedAccess, LockOrderViolation |
| Модели графа вызовов | src/analysis/callgraph/models.py |
CallPath, CallCycle, ImpactAnalysis, AttackPath, CrossLanguageCall |
| Базовый класс графа | src/analysis/callgraph/base.py |
BaseAnalyzer (абстрактная база для всех подмодулей callgraph) |
| Конфигурация сложности | src/analysis/callgraph/complexity.py |
ComplexityAnalyzer, DefaultThresholds |
Справочник схемы базы данных¶
Основные таблицы для анализа¶
| Таблица | Назначение |
|---|---|
nodes_method |
Определения методов |
nodes_control_structure |
Структуры управления (if, for, while) |
nodes_field_identifier |
Выражения доступа к полям |
nodes_identifier |
Ссылки на переменные |
nodes_call |
Точки вызова функций |
nodes_return |
Операторы возврата |
nodes_member |
Определения полей структур |
edges_cfg |
Рёбра графа потока управления |
edges_contains |
Отношения вложенности |
edges_dominate |
Отношения доминирования |
edges_post_dominate |
Отношения пост-доминирования |
edges_reaching_def |
Рёбра достижимых определений |
edges_argument |
Рёбра аргументов функций |
edges_call |
Рёбра графа вызовов |
Типы рёбер для анализа потока данных¶
| Тип ребра | Таблица | Назначение |
|---|---|---|
| CFG | edges_cfg |
Поток управления между операторами |
| REACHING_DEF | edges_reaching_def |
Цепочки «определение — использование» |
| ARGUMENT | edges_argument |
Аргументы вызова функций |
| CONTAINS | edges_contains |
Вложенность областей видимости |
| CALL | edges_call |
Связи вызовов функций |
| DOMINATE | edges_dominate |
Доминирование для зависимостей по управлению |
Устранение неполадок¶
«Метод не найден»¶
Имя метода должно совпадать точно (с учётом регистра). Используйте простое имя, а не полное квалифицированное.
# Правильно
cfg = analyzer.get_method_cfg("heap_insert")
# Неправильно
cfg = analyzer.get_method_cfg("heap_insert(Relation, HeapTuple)")
«Данные CFG не найдены»¶
Убедитесь, что экспорт CPG включал рёбра CFG. Проверьте:
SELECT COUNT(*) FROM edges_cfg;
«Обращения к полям не найдены»¶
Для отслеживания обращений к полям необходимы данные nodes_field_identifier:
SELECT COUNT(*) FROM nodes_field_identifier;
«Пути заражения не найдены»¶
Проверьте наличие рёбер REACHING_DEF и функций-источников/приёмников:
SELECT COUNT(*) FROM edges_reaching_def;
SELECT DISTINCT name FROM nodes_call WHERE name IN ('system', 'exec', 'popen');
Рекомендации по производительности¶
- Перечисление путей ограничено
max_pathsиmax_depthдля предотвращения экспоненциального роста - Используйте
max_depthдля ограничения глубины отслеживания заражения (по умолчанию из конфигурации) - Обнаружение дублей имеет сложность O(n²) — используйте
max_methodsна больших кодовых базах - Варианты с DuckPGQ (
PGQ*классы) быстрее для обхода графов - В больших методах может быть много путей; рассмотрите выборочный анализ
Движок анализа методов¶
Модуль: src/analysis/explain.py
Комплексный движок анализа методов, объединяющий метрики CPG, данные графа вызовов, информацию о заражении и доменный контекст в единый результат. Используется MCP-инструментом codegraph_explain и CLI.
ExplainResult¶
Агрегированный результат анализа метода (dataclass).
| Поле | Тип | Описание |
|---|---|---|
method_name |
str |
Краткое имя метода |
full_name |
str |
Полное квалифицированное имя |
file_path |
str |
Путь к исходному файлу |
line_start |
int |
Начальная строка |
line_end |
int |
Конечная строка |
line_count |
int |
Количество строк |
signature |
str |
Сигнатура метода (первая строка кода) |
cyclomatic_complexity |
int |
Оценка цикломатической сложности |
risk_level |
str |
Уровень риска: low (<10), moderate (10–19), high (20–49), critical (≥50) |
fan_in |
int |
Количество вызывающих методов |
fan_out |
int |
Количество вызываемых методов |
direct_callers |
List[str] |
Имена прямых вызывающих методов |
transitive_caller_count |
int |
Общее количество транзитивных вызывающих до глубины depth |
direct_callees |
List[str] |
Имена прямых вызываемых методов |
is_taint_source |
bool |
Метод является источником заражения (из доменного плагина) |
is_taint_sink |
bool |
Метод является приёмником заражения (из доменного плагина) |
taint_paths_through |
int |
Пути заражения, проходящие через метод |
subsystem |
str |
Имя подсистемы домена |
pattern_flags |
Dict[str, bool] |
Флаги: has_todo_fixme, has_deprecated, has_debug_code |
docstring |
str |
Строка документации метода |
ExplainAnalyzer¶
from src.analysis.explain import ExplainAnalyzer, ExplainResult
analyzer = ExplainAnalyzer(db_path="project.duckdb")
result = analyzer.collect("process_data", depth=2)
Конструктор: ExplainAnalyzer(db_path: Optional[str] = None)
Методы:
| Метод | Возвращает | Описание |
|---|---|---|
collect(method_name, depth=2) |
Optional[ExplainResult] |
Собрать комплексные данные анализа для метода |
to_dict(result) |
Dict[str, Any] |
Преобразовать ExplainResult в словарь для JSON-вывода |
fuzzy_search(method_name, limit=5) |
List[str] |
Нечёткий поиск имён методов при неточном совпадении |
Анализ конфигурационных параметров¶
Модуль: src/analysis/config_analyzer.py
Обнаруживает неиспользуемые, отсутствующие и несогласованные конфигурационные параметры путём перекрёстной проверки YAML-конфигурации, схемы Pydantic/dataclass и использования в исходном коде. Выполняет 4-этапный конвейер: извлечение → перекрёстная проверка → фильтрация ложных срабатываний → отчёт.
Используется: S12 (технический долг) через OrphanConfigHandler, CLI dogfood config-check.
ConfigOrphanAnalyzer¶
from src.analysis.config_analyzer import ConfigOrphanAnalyzer, AnalyzerConfig
analyzer = ConfigOrphanAnalyzer(AnalyzerConfig())
orphans = analyzer.scan()
summary = analyzer.get_summary(orphans)
Конструктор: ConfigOrphanAnalyzer(config: Optional[AnalyzerConfig] = None)
Методы:
| Метод | Возвращает | Описание |
|---|---|---|
scan() |
List[OrphanFinding] |
Запуск полного конвейера обнаружения (4 этапа) |
get_summary(orphans) |
Dict[str, Any] |
Генерация сводной статистики по типу и серьёзности |
Этапы обнаружения:
1. Извлечение — разбор ключей YAML (YAMLExtractor), полей dataclass из схемы (ASTExtractor), ссылок в коде (CodeScanner)
2. Перекрёстная проверка — сравнение определённых и используемых параметров
3. Фильтрация ЛС — удаление ложных срабатываний (переменные окружения, динамический доступ, известные шаблоны)
4. Отчёт — формирование списка OrphanFinding
OrphanFinding¶
Датакласс, представляющий одно обнаружение конфигурационного параметра-сироты.
| Поле | Тип | Описание |
|---|---|---|
orphan_type |
str |
Один из: yaml_unused, yaml_missing, code_orphan, path_mismatch, orphaned_dataclass, unused_default |
severity |
str |
error, warning или info |
param_name |
str |
Путь параметра через точку (например, timeouts.http_client) |
description |
str |
Описание на естественном языке |
file_path |
str |
Файл, в котором обнаружена проблема |
line_number |
int |
Номер строки в файле |
suggestion |
str |
Предложение по исправлению (если есть) |
metadata |
Dict[str, Any] |
Дополнительный контекст |
AnalyzerConfig¶
Датакласс конфигурации анализатора (не привязан к домену).
| Поле | Тип | По умолчанию | Описание |
|---|---|---|---|
config_files |
List[str] |
["config.yaml"] |
YAML-файлы конфигурации для анализа |
schema_files |
List[str] |
["src/config/unified_config.py"] |
Файлы схемы Pydantic/dataclass |
source_dirs |
List[str] |
["src/"] |
Каталоги для сканирования использования конфигурации |
exclude_dirs |
List[str] |
["tests/", "scripts/", "docs/"] |
Каталоги, исключённые из сканирования |
test_dirs |
List[str] |
["tests/"] |
Каталоги тестов (сканируются отдельно) |
root_dir |
Optional[str] |
None |
Корень проекта (определяется автоматически) |
access_patterns |
List[str] |
список регулярных выражений | Шаблоны типизированного доступа (cfg.section.field) |
raw_access_patterns |
List[str] |
список регулярных выражений | Шаблоны прямого доступа к словарю (_raw.get("key")) |
dynamic_patterns |
List[str] |
список регулярных выражений | Шаблоны динамического доступа (исключаются из обнаружения) |
YAMLExtractor¶
Извлекает все конечные ключи и ссылки на переменные окружения из YAML-файлов конфигурации.
| Метод | Возвращает | Описание |
|---|---|---|
extract(config_path) |
Tuple[Dict[str, Any], Set[str]] |
Возвращает (словарь ключей, множество переменных окружения) |
ASTExtractor¶
Разбирает файлы схемы Python через AST для извлечения полей dataclass и определений @property.
| Метод | Возвращает | Описание |
|---|---|---|
extract(schema_path) |
Tuple[Dict, Dict, Set] |
Возвращает (dataclass_fields, property_map, property_names) |
CodeScanner¶
Сканирует исходные файлы Python на предмет шаблонов доступа к конфигурации с контекстно-зависимым отслеживанием переменных cfg. Различает переменные, содержащие полный объект UnifiedConfig и подсвойства.
| Метод | Возвращает | Описание |
|---|---|---|
scan(source_dirs, exclude_dirs) |
Tuple[Dict, Set, bool] |
Возвращает (ссылки, raw_keys, has_dynamic_access) |
Покомпонентная конфигурация анализа¶
Модуль: src/analysis/scope_config.py
Позволяет выбирать разные правила анализа для разных частей проекта, например для main, tests, vendor и других групп файлов.
AnalysisScope¶
Датакласс, описывающий один именованный скоуп анализа.
| Поле | Тип | Описание |
|---|---|---|
name |
str |
Имя скоупа |
paths |
list[str] |
Глоб-шаблоны путей файлов |
rules |
str \| list[str] |
Режим правил: all, critical_only, taint_only или явный список правил |
severity_threshold |
str |
Минимальный уровень серьёзности |
suppress_policy |
str |
Идентификатор политики подавления |
enabled |
bool |
Включён ли скоуп |
| Метод | Возвращает | Описание |
|---|---|---|
matches(file_path) |
bool |
Проверяет, относится ли файл к этому скоупу |
to_dict() |
dict |
Сериализует скоуп |
ScopeConfig¶
Датакласс, который хранит все скоупы анализа проекта.
| Поле | Тип | Описание |
|---|---|---|
scopes |
list[AnalysisScope] |
Упорядоченный список доступных скоупов |
default_scope |
str |
Имя скоупа по умолчанию |
| Метод | Возвращает | Описание |
|---|---|---|
from_yaml(path) |
ScopeConfig |
Загружает правила из YAML |
from_config_dict(config_dict) |
ScopeConfig |
Строит конфигурацию из словаря |
resolve_scope(file_path) |
AnalysisScope \| None |
Определяет эффективный скоуп для файла |
should_report(file_path, severity, rule_id) |
bool |
Решает, нужно ли показывать finding для этого файла |
to_dict() |
dict |
Сериализует всю конфигурацию |
Смотрите также¶
- Справочник по схеме — Полная схема базы данных
- Сборник SQL-запросов — Примеры запросов
- Справочник по агентам — Конвейер агентов
- Руководство по сценариям — Сценарии использования