Полный справочник по программному интерфейсу CodeGraph для Python.
Ищете документацию по REST API? См. Документация по REST API для HTTP-эндпоинтов, аутентификации и примеров запросов.
Ищете документацию по агентам? См. Справочник агентов для подробного описания AnalyzerAgent, RetrieverAgent, EnrichmentAgent, GeneratorAgent и других агентов.
Содержание¶
- Обзор
- Схема модулей
- Карта модулей
- Основные сервисы
- CPGQueryService
- VectorStoreReal
- HybridRetriever
- Модели извлечения
- RetrievalResult
- HybridRetrievalConfig
- DomainBoostContext
- Ранжирование
- RelevanceScore
- Рабочие процессы
- MultiScenarioCopilot
- MultiScenarioState
- Конфигурация
- CPGConfig
- DomainRegistry
- get_unified_config()
- Инфраструктура API
- ProjectContext
- ProjectScopedServices
- Зависимости FastAPI
- Стек промежуточного ПО
- Аутентификация и авторизация
- Обработка ошибок
- LocalizedHTTPException
- LLMProviderError
- AgentExecutionError
- Прочие исключения
- Классы усиления безопасности
- HardeningScanner
- HardeningCategory
- HardeningSeverity
- HardeningCheck
- HardeningFinding
- Вспомогательные функции
- Константы D3FEND
- Дальнейшие шаги
Обзор¶
Python SDK CodeGraph даёт доступ к анализу графов свойств кода (CPG) и связанным сервисам. Документация построена по трём уровням: сервисы данных, рабочие процессы и API-инфраструктура.
Схема модулей¶
graph TB
subgraph "Точки входа"
API["FastAPI App<br>src/api/main.py"]
CLI["CLI<br>src/cli/"]
MCP["MCP Server<br>src/mcp/"]
end
subgraph "Оркестрация"
Copilot["MultiScenarioCopilot<br>src/workflow/orchestration/copilot.py"]
Scenarios["21 сценарий<br>src/workflow/scenarios/"]
end
subgraph "Извлечение данных"
Hybrid["HybridRetriever<br>src/retrieval/hybrid/"]
Vector["VectorStoreReal<br>src/retrieval/vector_store_real.py"]
CPG["CPGQueryService<br>src/services/cpg/"]
end
subgraph "Хранение данных"
DuckDB["DuckDB<br>CPG-граф"]
ChromaDB["ChromaDB<br>Векторные коллекции"]
end
API --> Copilot
CLI --> Copilot
MCP --> Copilot
Copilot --> Scenarios
Scenarios --> Hybrid
Hybrid --> Vector
Hybrid --> CPG
Vector --> ChromaDB
CPG --> DuckDB
Карта модулей¶
| Модуль | Пакет | Назначение |
|---|---|---|
| CPGQueryService | src.services.cpg |
Запросы к графу свойств кода (DuckDB) |
| VectorStoreReal | src.retrieval.vector_store_real |
Семантический поиск (ChromaDB) |
| HybridRetriever | src.retrieval.hybrid.retriever |
Гибридное извлечение (вектор + граф) |
| MultiScenarioCopilot | src.workflow.orchestration.copilot |
LangGraph-оркестратор сценариев |
| ProjectContext | src.api.context |
Контекст проекта для API-запросов |
| ProjectScopedServices | src.api.services.project_services |
LRU-кэш сервисов по проектам |
| CPGConfig | src.config.cpg_config |
Конфигурация домена и LLM |
| DomainRegistry | src.domains.registry |
Управление доменными плагинами |
| get_unified_config | src.config.unified_config |
Единая конфигурация (Pydantic) |
| HardeningScanner | src.security |
Сканер соответствия D3FEND |
| LocalizedHTTPException | src.api.errors |
Локализованные HTTP-ошибки |
| Агенты | src.agents |
См. Справочник агентов |
Основные сервисы¶
CPGQueryService¶
Файл: src/services/cpg/__init__.py
Унифицированный сервис запросов к графу свойств кода. Построен на базе CPGQueryBase и 11 миксинов, каждый из которых добавляет предметно-ориентированные методы запросов, разделяя единое подключение к DuckDB.
Миксины:
| Миксин | Назначение |
|---|---|
SubsystemQueriesMixin |
Запросы по подсистемам |
CallGraphQueriesMixin |
Граф вызовов |
SecurityQueriesMixin |
Уязвимости и безопасность |
PerformanceQueriesMixin |
Узкие места производительности |
QualityQueriesMixin |
Метрики качества кода |
SemanticQueriesMixin |
Семантические запросы |
StatisticsQueriesMixin |
Статистика базы данных |
CommentQueriesMixin |
Поиск по комментариям |
ExternalQueriesMixin |
Внешние зависимости |
TypeQueriesMixin |
Типы и структуры |
PatternQueriesMixin |
Шаблоны кода |
CollectionQueriesMixin |
Операции с коллекциями |
Конструктор¶
from src.services.cpg import CPGQueryService
# Путь к БД определяется через ProjectManager.get_active_db_path()
service = CPGQueryService()
# Явное указание пути
service = CPGQueryService(db_path="data/projects/myproject.duckdb")
# С ограничением допустимых путей (API-слой)
service = CPGQueryService(
db_path="data/projects/myproject.duckdb",
allowed_db_paths={"/data/projects/myproject.duckdb"}
)
Сигнатура:
def __init__(
self,
db_path: Optional[str] = None,
allowed_db_paths: Optional[set] = None,
)
Параметры:
| Параметр | Тип | По умолчанию | Описание |
|---|---|---|---|
db_path |
Optional[str] |
None |
Путь к DuckDB-файлу CPG. Если None, берётся из ProjectManager |
allowed_db_paths |
Optional[set] |
None |
Множество разрешённых путей. None = без ограничений (CLI) |
Основные методы¶
Выполнение запросов¶
# Произвольный SQL-запрос
results = service.execute_query("SELECT * FROM nodes_method LIMIT 10")
# SQL-запрос с параметрами
results = service.execute_query(
"SELECT * FROM nodes_method WHERE name = ?",
parameters=["CommitTransaction"]
)
# Запрос, возвращающий список словарей
rows = service.execute_sql_dict("SELECT name, file FROM nodes_method LIMIT 5")
Управление базой данных¶
# Переключить базу данных
service.set_database("data/projects/another.duckdb")
# Переключить проект (автоматически меняет домен)
service.switch_project("postgresql")
Граф вызовов¶
# Получить вызывающие функции
callers = service.get_callers("LWLockAcquire", limit=20)
# Возвращает: [{'name': 'heap_insert', 'file': 'heapam.c', ...}]
# Получить вызываемые функции
callees = service.get_callees("CommitTransaction", limit=20)
# Возвращает: [{'name': 'MarkBufferDirty', 'file': 'bufmgr.c', ...}]
# Построить граф вызовов
graph = service.get_call_graph(method_id=123, depth=2, direction="both")
Подсистемы¶
# Получить список подсистем
subsystems = service.get_subsystems()
# Возвращает: [{'name': 'executor', 'method_count': 1234}, ...]
# Методы подсистемы
methods = service.get_methods_by_subsystem("executor", limit=100)
Безопасность¶
# Потенциальные уязвимости
hotspots = service.get_security_hotspots(limit=100)
# Поиск SQL-инъекций
injections = service.find_sql_injections()
Производительность¶
# Узкие места производительности
perf_hotspots = service.get_performance_hotspots(limit=100)
# Рекурсивные методы
recursive = service.get_recursive_methods(limit=100)
# Сложные методы
complex_methods = service.get_complex_methods()
# Длинные методы
long_methods = service.get_long_methods(min_lines=100, limit=100)
Статистика¶
# Общая статистика базы данных
stats = service.get_database_stats()
# Возвращает: {'total_methods': 52303, 'total_files': 1200, ...}
# Количество узлов по типам
counts = service.get_node_type_counts()
# Возвращает: {'method': 52303, 'struct': 1200, 'macro': 5600, ...}
Использование как контекстный менеджер¶
with CPGQueryService("data/projects/myproject.duckdb") as service:
stats = service.get_database_stats()
methods = service.get_methods_by_subsystem("executor")
VectorStoreReal¶
Файл: src/retrieval/vector_store_real.py
Семантическое векторное хранилище на базе ChromaDB. Управляет 6 коллекциями документов с поддержкой кэширования и проектной изоляции.
Конструктор¶
from src.retrieval.vector_store_real import VectorStoreReal
store = VectorStoreReal(
persist_directory="chromadb_storage",
cache_size=100,
cache_ttl=3600,
collection_prefix="myproject_"
)
Сигнатура:
def __init__(
self,
persist_directory: Optional[str] = None,
cache_size: int = 100,
cache_ttl: int = 3600,
collection_prefix: str = "",
)
Параметры:
| Параметр | Тип | По умолчанию | Описание |
|---|---|---|---|
persist_directory |
Optional[str] |
None |
Путь к хранилищу ChromaDB |
cache_size |
int |
100 |
Размер LRU-кэша результатов |
cache_ttl |
int |
3600 |
TTL кэша в секундах |
collection_prefix |
str |
"" |
Префикс для изоляции коллекций по проектам |
Методы¶
retrieve_qa¶
Поиск по коллекции вопросов и ответов.
results = store.retrieve_qa(
query="Как работает фиксация транзакции?",
top_k=3,
filter_dict=None
)
# Возвращает: [{'question': '...', 'answer': '...', 'score': 0.85}]
retrieve_sql¶
Поиск примеров SQL-запросов.
examples = store.retrieve_sql(
query="найти вызывающие методы",
keywords=["callers", "call_graph"],
query_type="structural",
top_k=5
)
retrieve_generated_docs¶
Поиск по автоматически сгенерированной документации.
docs = store.retrieve_generated_docs(
query="менеджер буферов",
top_k=5,
language="ru"
)
retrieve_documentation¶
Поиск по документации методов.
docs = store.retrieve_documentation(
query="управление транзакциями",
top_k=5
)
retrieve_comments¶
Поиск по комментариям в коде.
comments = store.retrieve_comments(
query="инициализация блокировки",
top_k=5
)
retrieve_domain_patterns¶
Поиск доменных шаблонов.
patterns = store.retrieve_domain_patterns(
query="обработка ошибок",
top_k=5
)
HybridRetriever¶
Файл: src/retrieval/hybrid/retriever.py
Параллельный гибридный движок извлечения, объединяющий результаты векторного и графового поиска через RRF (Reciprocal Rank Fusion) с доменным бустингом.
Конструктор¶
from src.retrieval.hybrid.retriever import HybridRetriever
from src.retrieval.hybrid.models import HybridRetrievalConfig
config = HybridRetrievalConfig(
vector_weight=0.6,
graph_weight=0.4,
final_top_k=10
)
retriever = HybridRetriever(
vector_store=vector_store,
cpg_service=cpg_service,
config=config
)
Сигнатура:
def __init__(
self,
vector_store, # Экземпляр VectorStoreReal
cpg_service, # Экземпляр CPGQueryService
config: Optional[HybridRetrievalConfig] = None,
)
Методы¶
async retrieve¶
Выполняет гибридное извлечение. Параллельно запускает векторный и графовый поиск, объединяя результаты через RRF.
import asyncio
results = asyncio.run(retriever.retrieve(
query="обработка фиксации транзакции",
mode="hybrid", # "hybrid", "vector_only", "graph_only"
query_type="semantic" # "semantic", "structural", "security"
))
# Возвращает: List[RetrievalResult]
Параметры:
| Параметр | Тип | По умолчанию | Описание |
|---|---|---|---|
query |
str |
— | Поисковый запрос |
mode |
str |
"hybrid" |
Режим: "hybrid", "vector_only", "graph_only" |
query_type |
Optional[str] |
None |
Тип запроса: "semantic", "structural", "security" |
**kwargs |
Дополнительные параметры |
Веса по типу запроса:
| Тип запроса | Вектор | Граф |
|---|---|---|
semantic |
75% | 25% |
structural |
25% | 75% |
| по умолчанию | 60% | 40% |
Модели извлечения¶
RetrievalResult¶
Файл: src/retrieval/hybrid/models.py
Унифицированный результат извлечения из любого источника.
from src.retrieval.hybrid.models import RetrievalResult
@dataclass
class RetrievalResult:
id: str # Уникальный идентификатор результата
content: str # Извлечённое содержимое
score: float # Оценка релевантности
source: str # "vector", "graph" или "hybrid"
entity_type: str = "method" # "method", "struct", "macro", "type", "caller", "callee"
metadata: Dict[str, Any] = field(default_factory=dict)
node_id: Optional[int] = None # Идентификатор узла CPG (для дедупликации)
Пример использования:
for result in results:
print(f"[{result.source}] {result.content[:80]}... (оценка: {result.score:.2f})")
if result.node_id:
print(f" Узел CPG: {result.node_id}")
HybridRetrievalConfig¶
Файл: src/retrieval/hybrid/models.py
Конфигурация параметров гибридного извлечения.
from src.retrieval.hybrid.models import HybridRetrievalConfig
@dataclass
class HybridRetrievalConfig:
vector_weight: float = 0.6 # Вес результатов векторного поиска
graph_weight: float = 0.4 # Вес результатов графового поиска
vector_top_k: int = 20 # Top-K из векторного поиска
graph_top_k: int = 20 # Top-K из графового поиска
final_top_k: int = 10 # Итоговое количество результатов после объединения
min_score_threshold: float = 0.1 # Минимальная оценка для включения
enable_reranking: bool = False # Переранжирование через LLM (дорогостоящая операция)
Пример:
# Конфигурация для структурных запросов (приоритет графа)
structural_config = HybridRetrievalConfig(
vector_weight=0.25,
graph_weight=0.75,
final_top_k=15
)
# Конфигурация для семантических запросов (приоритет вектора)
semantic_config = HybridRetrievalConfig(
vector_weight=0.75,
graph_weight=0.25,
final_top_k=10,
enable_reranking=True
)
DomainBoostContext¶
Файл: src/retrieval/hybrid/models.py
Контекст доменного бустинга для RRF-объединения. Позволяет мультипликативно повышать оценки результатов, связанных с доменом.
from src.retrieval.hybrid.models import DomainBoostContext
@dataclass
class DomainBoostContext:
taint_sources: Set[str] = field(default_factory=set) # Источники заражения
taint_sinks: Set[str] = field(default_factory=set) # Стоки заражения
target_subsystem: Optional[str] = None # Целевая подсистема
query_type: Optional[str] = None # Тип запроса
entry_point_boost: float = 1.2 # Множитель для точек входа
security_boost: float = 1.5 # Множитель для результатов безопасности
subsystem_boost: float = 1.3 # Множитель для целевой подсистемы
Пример использования:
boost = DomainBoostContext(
taint_sources={"user_input", "network_recv"},
taint_sinks={"sql_exec", "system_call"},
target_subsystem="executor",
query_type="security",
security_boost=2.0
)
Ранжирование¶
RelevanceScore¶
Файл: src/ranking/result_ranker.py
Детализированная оценка релевантности результата с разбивкой по компонентам.
from src.ranking.result_ranker import RelevanceScore
@dataclass
class RelevanceScore:
total: float # Итоговая оценка
keyword_match: float = 0.0 # Совпадение ключевых слов
tag_coverage: float = 0.0 # Покрытие тегов
name_match: float = 0.0 # Совпадение имени
length_bonus: float = 0.0 # Бонус за длину
semantic_similarity: float = 0.0 # Семантическое сходство
source_confidence: float = 0.0 # Уверенность в источнике
retrieval_score: float = 0.0 # Исходная оценка RRF/сходства
Метод get_breakdown¶
Возвращает разбивку оценки в виде словаря.
score = RelevanceScore(total=0.87, keyword_match=0.3, name_match=0.5)
breakdown = score.get_breakdown()
# Возвращает: {
# 'keyword_match': 0.3,
# 'tag_coverage': 0.0,
# 'name_match': 0.5,
# 'length_bonus': 0.0,
# 'semantic_similarity': 0.0,
# 'source_confidence': 0.0,
# 'retrieval_score': 0.0
# }
Рабочие процессы¶
MultiScenarioCopilot¶
Файл: src/workflow/orchestration/copilot.py
Главный оркестратор анализа на основе сценариев. Использует LangGraph для маршрутизации запросов через 21 сценарий с автоматическим определением намерения.
Конструктор¶
from src.workflow.orchestration.copilot import MultiScenarioCopilot
copilot = MultiScenarioCopilot()
Метод run¶
Выполняет запрос через систему рабочих процессов.
# Автоматическое определение сценария
result = copilot.run("Найти риски переполнения буфера")
# Принудительный выбор сценария
result = copilot.run(
"Проанализировать модуль",
context={"scenario_id": "scenario_2"}
)
Сигнатура:
def run(self, query: str, context: Optional[Dict] = None) -> Dict
Параметры:
| Параметр | Тип | По умолчанию | Описание |
|---|---|---|---|
query |
str |
— | Запрос на естественном языке |
context |
Optional[Dict] |
None |
Контекст (файл, подсистема, scenario_id) |
Возвращает словарь состояния MultiScenarioState с полями: query, intent, scenario_id, confidence, answer, evidence, metadata.
Конвейер обработки:
START -> classify_intent -> pre_retrieval -> route_by_intent -> [сценарий] -> END
См. Справочник рабочих процессов для полного списка 21 доступного сценария.
MultiScenarioState¶
Файл: src/workflow/state.py
Состояние выполнения рабочего процесса. Определено как TypedDict и передаётся через все узлы графа LangGraph.
from src.workflow.state import MultiScenarioState
class MultiScenarioState(TypedDict):
# Входные данные
query: str # Исходный запрос пользователя
context: Optional[Dict[str, Any]] # Контекст (файл, подсистема и т.д.)
language: Optional[str] # Язык ответа ("en", "ru")
# Классификация намерения
intent: Optional[str] # Определённое намерение
scenario_id: Optional[str] # ID сценария ("scenario_2")
confidence: Optional[float] # Уверенность (0.0-1.0)
classification_method: Optional[str] # Метод: "keyword" или "llm"
# Данные CPG (заполняются сценариями)
cpg_results: Optional[List[Dict]] # Результаты CPG-запросов
subsystems: Optional[List[str]] # Релевантные подсистемы
methods: Optional[List[Dict]] # Метаданные методов
call_graph: Optional[Any] # Граф NetworkX
# Итоговый вывод
answer: Optional[str] # Ответ на естественном языке
evidence: Optional[List[str]] # Доказательная база (факты CPG)
metadata: Optional[Dict[str, Any]] # Метаданные сценария
retrieved_functions: Optional[List[str]] # Извлечённые имена функций
# Обработка ошибок
error: Optional[str] # Сообщение об ошибке
retry_count: int # Счётчик повторных попыток
# Конфигурация
enrichment_config: Optional[Dict[str, Any]] # Настройки обогащения
vector_store: Optional[Any] # Векторное хранилище
# Мультитенантная изоляция
db_path: Optional[str] # Путь к базе данных проекта
collection_prefix: Optional[str] # Префикс коллекций
# Предварительное извлечение (Фаза E)
pre_retrieval_results: Optional[List[Dict[str, Any]]] # Результаты предвыборки
Конфигурация¶
CPGConfig¶
Файл: src/config/cpg_config.py
Управление конфигурацией домена и языковой модели (LLM). Автоматически определяет тип проекта по расширениям файлов.
from src.config.cpg_config import CPGConfig
config = CPGConfig()
Методы¶
set_cpg_type¶
Установить активный домен.
config.set_cpg_type("postgresql_v2")
# Доступные домены: "generic_cpp", "go", "java", "python_django",
# "python_generic", "c_generic", "csharp", "javascript", "typescript",
# "php", "ruby", "kotlin", "swift"
get_code_analyst_title¶
Получить название аналитика, специфичное для домена.
title = config.get_code_analyst_title()
# Возвращает: "PostgreSQL 17.6 expert"
DomainRegistry¶
Файл: src/domains/registry.py
Реестр доменных плагинов. Управляет жизненным циклом плагинов: регистрация, активация, деактивация.
from src.domains import DomainRegistry, get_active_domain
Методы¶
activate¶
Активировать доменный плагин.
DomainRegistry.activate("postgresql")
get_active_or_none¶
Получить текущий активный домен.
domain = DomainRegistry.get_active_or_none()
if domain:
print(f"Активный домен: {domain.name}")
sources = domain.get_taint_sources()
sinks = domain.get_taint_sinks()
Каждый доменный плагин (DomainPluginV3) предоставляет:
- get_module_names() – имена модулей
- get_subsystem_names() – имена подсистем
- get_taint_sources() – источники заражения
- get_taint_sinks() – стоки заражения
- 10 YAML-файлов конфигурации на домен
get_unified_config()¶
Файл: src/config/unified_config.py
Главная точка доступа к единой конфигурации приложения. Возвращает экземпляр UnifiedConfig на основе Pydantic.
from src.config import get_unified_config
cfg = get_unified_config()
# Веса извлечения
cfg.retrieval_weights
# Настройки переранжирования
cfg.reranking.boost_domain_match
# Таймауты (НИКОГДА не захардкоживать!)
cfg.timeouts.external_api
# Размеры батчей (НИКОГДА не захардкоживать!)
cfg.batch_processing.extraction_default_limit
# Рабочие процессы
cfg.workflows.pre_retrieval
# Безопасность
cfg.security.rate_limit_requests_per_minute
Важно: Всегда используйте атрибутный доступ (
cfg.retrieval_weights), а НЕcfg.get("retrieval_weights"). Конфигурация построена на Pydantic, не на словарях.
Инфраструктура API¶
ProjectContext¶
Файл: src/api/context.py
Неизменяемый (frozen) dataclass, несущий контекст проекта для каждого HTTP-запроса. Заменяет прямые вызовы ProjectManager.get_active_db_path() в API-слое.
from src.api.context import ProjectContext
@dataclass(frozen=True)
class ProjectContext:
project_id: UUID # Идентификатор проекта
group_id: UUID # Идентификатор группы
project_name: str # Имя проекта
db_path: str # Путь к базе данных
domain: Optional[str] = None # Активный домен
language: Optional[str] = None # Язык проекта
collection_prefix: str = "" # Префикс коллекций ChromaDB
Приоритет разрешения¶
Контекст проекта определяется в следующем порядке:
- Заголовок
X-Project-Idв HTTP-запросе - Активный проект из
ProjectManager - Резервный контекст (
from_global_fallback())
# Классовый метод для создания из глобального состояния
ctx = ProjectContext.from_global_fallback()
Использование в обработчиках¶
from src.api.context import ProjectContext
from fastapi import Depends
@router.get("/stats")
async def get_stats(ctx: ProjectContext = Depends(get_project_context)):
service = CPGQueryService(db_path=ctx.db_path)
return service.get_database_stats()
ProjectScopedServices¶
Файл: src/api/services/project_services.py
LRU-кэш сервисных экземпляров, привязанных к проектам. Подключения DuckDB хранятся открытыми в режиме только для чтения, что избавляет от накладных расходов на создание подключения при каждом запросе.
from src.api.services.project_services import ProjectScopedServices
Классовые методы¶
get_cpg¶
Получить экземпляр CPGQueryService для контекста проекта.
cpg = ProjectScopedServices.get_cpg(ctx)
stats = cpg.get_database_stats()
get_vector_store¶
Получить экземпляр VectorStoreReal для контекста проекта.
vs = ProjectScopedServices.get_vector_store(ctx)
results = vs.retrieve_qa("запрос", top_k=5)
init_from_config¶
Инициализировать кэш из конфигурации проектов.
ProjectScopedServices.init_from_config()
reset¶
Сбросить кэш (освободить все подключения).
ProjectScopedServices.reset()
Зависимости FastAPI¶
Стандартные зависимости, инжектируемые в обработчики:
from src.api.context import get_project_context
from src.api.auth.jwt_handler import get_current_user
from src.api.dependencies import require_role
@router.post("/analyze")
async def analyze(
ctx: ProjectContext = Depends(get_project_context), # Контекст проекта
user: UserClaims = Depends(get_current_user), # Текущий пользователь
_: None = Depends(require_role("analyst")), # Проверка роли
):
...
| Зависимость | Назначение |
|---|---|
get_project_context |
Разрешение ProjectContext из запроса |
get_current_user |
Извлечение пользователя из JWT-токена |
require_role(role) |
Проверка наличия роли (RBAC) |
get_language |
Определение языка (Accept-Language или ?lang=) |
Стек промежуточного ПО¶
Приложение FastAPI использует многоуровневый стек промежуточного ПО (middleware):
| Промежуточное ПО | Назначение |
|---|---|
CORSMiddleware |
Управление междоменными запросами |
RateLimiterMiddleware |
Ограничение частоты запросов (3 уровня: IP, JWT, API-ключ) |
SecurityHeadersMiddleware |
Заголовки безопасности (CSP, HSTS, X-Frame-Options) |
MetricsMiddleware |
Метрики Prometheus |
DLPMiddleware |
Предотвращение утечки данных (условное включение) |
RequestIdMiddleware |
Уникальный ID для каждого запроса |
Порядок выполнения (снизу вверх): запрос проходит через RequestId -> DLP -> Metrics -> SecurityHeaders -> RateLimiter -> CORS -> обработчик.
Аутентификация и авторизация¶
CodeGraph поддерживает несколько механизмов аутентификации:
| Механизм | Модуль | Описание |
|---|---|---|
| JWT | src.api.auth.jwt_handler |
Локальные JWT-токены (секрет в API_JWT_SECRET) |
| OAuth 2.0 | src.api.auth.oauth |
SourceCraft (Yandex ID) и GitVerse (Sber ID) |
| LDAP | src.api.auth.ldap_auth |
Корпоративная LDAP-аутентификация |
| API-ключи | src.api.auth.api_keys |
Программный доступ через ключи |
RBAC (управление доступом на основе ролей):
from src.api.dependencies import require_role
# В обработчике
@router.delete("/projects/{project_id}")
async def delete_project(
project_id: str,
_: None = Depends(require_role("admin")),
):
...
Примечание: Когда
multi_tenant.enabled: false(по умолчанию), RBAC-проверки пропускаются (no-op).
Обработка ошибок¶
LocalizedHTTPException¶
Файл: src/api/errors.py
HTTP-исключение с автоматической локализацией сообщений. Извлекает текст ошибки из YAML-файлов локализации.
from src.api.errors import LocalizedHTTPException
raise LocalizedHTTPException(
status_code=401,
detail_key="invalid_credentials",
lang="ru"
)
# С параметрами форматирования
raise LocalizedHTTPException(
status_code=404,
detail_key="user_not_found",
lang="ru",
username="john"
)
Сигнатура конструктора:
def __init__(
self,
status_code: int,
detail_key: str,
lang: str = "en",
headers: Optional[Dict[str, str]] = None,
**kwargs: Any,
)
Фабричные функции¶
12 предопределённых фабричных функций для типичных ошибок:
| Функция | HTTP-код | Описание |
|---|---|---|
raise_invalid_credentials |
401 | Неверные учётные данные |
raise_user_disabled |
401 | Учётная запись отключена |
raise_token_expired |
401 | Токен истёк |
raise_token_invalid |
401 | Недействительный токен |
raise_insufficient_permissions |
403 | Недостаточно прав |
raise_user_not_found |
404 | Пользователь не найден |
raise_api_key_not_found |
404 | API-ключ не найден |
raise_session_not_found |
404 | Сессия не найдена |
raise_duplicate_username |
409 | Дублирование имени пользователя |
raise_duplicate_email |
409 | Дублирование электронной почты |
raise_rate_limit_exceeded |
429 | Превышен лимит запросов |
raise_internal_error |
500 | Внутренняя ошибка сервера |
Пример использования:
from src.api.errors import raise_token_expired, raise_rate_limit_exceeded
# В обработчике
if token_is_expired:
raise_token_expired(lang="ru")
LLMProviderError¶
Файл: src/llm/base_provider.py
Исключение, выбрасываемое при ошибке провайдера языковой модели (Yandex, GigaChat, OpenAI, локальная модель).
from src.llm.base_provider import LLMProviderError
try:
response = await provider.generate(prompt)
except LLMProviderError as e:
print(f"Ошибка LLM-провайдера: {e}")
AgentExecutionError¶
Файл: src/workflow/error_handling.py
Исключение, выбрасываемое при ошибке выполнения агента в рамках рабочего процесса.
from src.workflow.error_handling import AgentExecutionError
try:
result = await agent.execute(task)
except AgentExecutionError as e:
print(f"Ошибка агента: {e}")
Прочие исключения¶
| Исключение | Файл | Описание |
|---|---|---|
TokenError |
src/api/auth/jwt_handler.py |
Ошибка работы с JWT-токеном |
OAuthError |
src/api/auth/oauth.py |
Ошибка OAuth 2.0-авторизации |
LDAPError |
src/api/auth/ldap_auth.py |
Ошибка LDAP-аутентификации |
PlatformAPIError |
src/api/services/platform_client.py |
Ошибка API платформы (SourceCraft/GitVerse) |
GoCPGProcessError |
src/services/gocpg/subprocess_runner.py |
Ошибка процесса GoCPG |
GoCPGTimeoutError |
src/services/gocpg/subprocess_runner.py |
Таймаут GoCPG-процесса |
VaultError |
src/security/vault/client.py |
Ошибка хранилища секретов |
DatabaseNotConfiguredError |
src/project_manager.py |
База данных не настроена |
Шаблон обработки ошибок¶
from src.llm.base_provider import LLMProviderError
from src.workflow.error_handling import AgentExecutionError
from src.project_manager import DatabaseNotConfiguredError
try:
copilot = MultiScenarioCopilot()
result = copilot.run("Анализ безопасности модуля executor")
except DatabaseNotConfiguredError:
print("Ошибка: база данных не настроена. Используйте 'python -m src.cli import'")
except LLMProviderError as e:
print(f"Ошибка LLM: {e}")
except AgentExecutionError as e:
print(f"Ошибка выполнения агента: {e}")
except Exception as e:
print(f"Непредвиденная ошибка: {e}")
Классы усиления безопасности¶
HardeningScanner¶
Файл: src/security/hardening/hardening_scanner.py
Сканер соответствия требованиям усиления безопасности исходного кода по стандарту D3FEND (MITRE).
Конструктор¶
from src.security import HardeningScanner, HardeningCategory, HardeningSeverity
scanner = HardeningScanner(cpg_service=cpg_service, language="c")
Сигнатура:
def __init__(self, cpg_service: CPGQueryService, language: str = "c")
Методы¶
scan_all¶
Запускает все применимые проверки усиления безопасности.
findings = scanner.scan_all(limit_per_check=50)
# Возвращает: [HardeningFinding(d3fend_id='D3-VI', severity='high', ...)]
scan_by_d3fend_id¶
Запускает проверки для конкретных техник D3FEND.
findings = scanner.scan_by_d3fend_id(["D3-VI", "D3-NPC", "D3-TL"])
# D3-VI — инициализация переменных
# D3-NPC — проверка нулевых указателей
# D3-TL — доверенные библиотеки
scan_by_category¶
Запускает проверки для конкретной категории.
findings = scanner.scan_by_category(HardeningCategory.MEMORY_SAFETY)
scan_by_severity¶
Запускает проверки с минимальным уровнем серьёзности.
findings = scanner.scan_by_severity(HardeningSeverity.HIGH)
# Возвращает результаты с серьёзностью CRITICAL или HIGH
get_compliance_score¶
Рассчитывает показатели соответствия.
scores = scanner.get_compliance_score(findings)
# Возвращает: {
# 'overall_score': 85.3,
# 'total_findings': 12,
# 'by_category': {'initialization': 3, 'pointer_safety': 5, ...},
# 'by_d3fend': {'D3-VI': 3, 'D3-NPC': 5, ...},
# 'by_severity': {'high': 2, 'medium': 6, 'low': 4},
# 'category_scores': {'initialization': 70, 'pointer_safety': 50, ...},
# 'd3fend_scores': {'D3-VI': 70, 'D3-NPC': 50, ...}
# }
get_remediation_report¶
Генерирует отчёт по устранению проблем в формате Markdown.
report = scanner.get_remediation_report(findings)
print(report)
# # Отчёт по усилению безопасности D3FEND
# ## Сводка
# - **Общий показатель соответствия**: 85.3%
# - **Всего проблем**: 12
# ...
get_checks_summary¶
Получает сводку по доступным проверкам.
summary = scanner.get_checks_summary()
# Возвращает: {
# 'total_checks': 22,
# 'language': 'c',
# 'by_category': {...},
# 'by_d3fend': {...},
# 'domain_checks': 10
# }
HardeningCategory¶
Файл: src/security/hardening/base.py
Перечисление категорий усиления безопасности, соответствующих техникам D3FEND.
from src.security import HardeningCategory
class HardeningCategory(Enum):
INITIALIZATION = "initialization" # D3-VI — Инициализация переменных
CREDENTIAL_MANAGEMENT = "credential_mgmt" # D3-CS — Очистка учётных данных
INTEGER_SAFETY = "integer_safety" # D3-IRV — Проверка диапазона целых чисел
POINTER_SAFETY = "pointer_safety" # D3-PV, D3-NPC, D3-MBSV — Безопасность указателей
MEMORY_SAFETY = "memory_safety" # D3-RN — Обнуление ссылок
LIBRARY_SAFETY = "library_safety" # D3-TL — Доверенные библиотеки
TYPE_SAFETY = "type_safety" # D3-VTV — Проверка типа переменных
DOMAIN_VALIDATION = "domain_validation" # D3-DLV — Проверка доменной логики
OPERATIONAL_VALIDATION = "operational" # D3-OLV — Проверка операционной логики
HardeningSeverity¶
Файл: src/security/hardening/base.py
Перечисление уровней серьёзности обнаруженных проблем.
from src.security import HardeningSeverity
class HardeningSeverity(Enum):
CRITICAL = "critical" # Подвержено прямой эксплуатации
HIGH = "high" # Значительный риск безопасности
MEDIUM = "medium" # Умеренный риск безопасности
LOW = "low" # Незначительная проблема безопасности
INFO = "info" # Рекомендация по лучшим практикам
HardeningCheck¶
Файл: src/security/hardening/base.py
Определение проверки усиления безопасности.
from src.security import HardeningCheck
@dataclass
class HardeningCheck:
id: str # "D3-VI-001"
d3fend_id: str # "D3-VI"
d3fend_name: str # "Variable Initialization"
category: HardeningCategory
severity: HardeningSeverity
description: str # Описание проверки
cpgql_query: str # SQL-запрос к базе CPG
cwe_ids: List[str] # ["CWE-457"]
language_scope: List[str] # ["c", "cpp"] или ["*"] для всех языков
indicators: List[str] # Индикаторы проблемы
good_patterns: List[str] # Шаблоны правильного кода
remediation: str # Рекомендации по исправлению
example_code: str # Пример уязвимого/исправленного кода
confidence_weight: float # 0.0-1.0 — вес уверенности
Метод applies_to_language¶
Проверяет применимость проверки к указанному языку программирования.
check = get_check_by_id("D3-VI-001")
if check.applies_to_language("c"):
print("Применимо к коду на C")
HardeningFinding¶
Файл: src/security/hardening/base.py
Результат выполнения проверки усиления безопасности.
from src.security import HardeningFinding
@dataclass
class HardeningFinding:
finding_id: str # Уникальный идентификатор
check_id: str # "D3-VI-001"
d3fend_id: str # "D3-VI"
category: str # "initialization"
severity: str # "high"
method_name: str # "process_input"
filename: str # "src/input.c"
line_number: int # 142
code_snippet: str # "int x; use(x);"
description: str # Описание найденной проблемы
cwe_ids: List[str] # Связанные CWE
remediation: str # Рекомендация по исправлению
confidence: float # 0.0-1.0 — уверенность
metadata: Dict # Дополнительные метаданные
Методы¶
to_dict¶
Преобразует результат в словарь для сериализации.
finding_dict = finding.to_dict()
# Возвращает: {'finding_id': 'a1b2c3', 'd3fend_id': 'D3-VI', ...}
from_check_and_row (классовый метод)¶
Создаёт результат на основе определения проверки и строки результата запроса.
finding = HardeningFinding.from_check_and_row(
check=check,
row=row,
confidence=0.9
)
Вспомогательные функции¶
Набор утилитарных функций для работы с проверками усиления безопасности.
from src.security import (
get_check_by_id,
get_checks_by_category,
get_checks_by_d3fend_id,
get_all_checks,
get_checks_for_language,
D3FEND_TECHNIQUES,
D3FEND_TECHNIQUE_IDS,
)
get_check_by_id¶
Получает проверку по её идентификатору.
check = get_check_by_id("D3-VI-001")
get_checks_by_category¶
Получает все проверки в указанной категории.
memory_checks = get_checks_by_category(HardeningCategory.MEMORY_SAFETY)
get_checks_by_d3fend_id¶
Получает все проверки для техники D3FEND.
null_checks = get_checks_by_d3fend_id("D3-NPC")
get_all_checks¶
Получает все зарегистрированные проверки.
all_checks = get_all_checks()
print(f"Общее количество проверок: {len(all_checks)}")
get_checks_for_language¶
Получает проверки, применимые к конкретному языку.
c_checks = get_checks_for_language("c")
go_checks = get_checks_for_language("go")
Константы D3FEND¶
Справочные данные по техникам D3FEND.
from src.security import D3FEND_TECHNIQUE_IDS, D3FEND_TECHNIQUES
# Идентификаторы техник D3FEND
D3FEND_TECHNIQUE_IDS = [
"D3-VI", # Инициализация переменных
"D3-CS", # Очистка учётных данных
"D3-IRV", # Проверка диапазона целых чисел
"D3-PV", # Проверка указателей
"D3-RN", # Обнуление ссылок
"D3-TL", # Доверенная библиотека
"D3-VTV", # Проверка типа переменных
"D3-MBSV", # Проверка начала блока памяти
"D3-NPC", # Проверка нулевых указателей
"D3-DLV", # Проверка доменной логики
"D3-OLV", # Проверка операционной логики
]
# Метаданные техник
D3FEND_TECHNIQUES = {
"D3-VI": {
"name": "Variable Initialization",
"description": "Присвоение переменным известного значения до использования",
"url": "https://next.d3fend.mitre.org/technique/d3f:VariableInitialization",
},
# ... остальные техники
}
Дальнейшие шаги¶
- Справочник агентов – подробная документация по AnalyzerAgent, RetrieverAgent, EnrichmentAgent, GeneratorAgent и другим агентам конвейера
- Справочник рабочих процессов – система рабочих процессов и 21 сценарий
- Документация по REST API – HTTP-эндпоинты, аутентификация и примеры
- Интеграция с OpenCode – интеграция с OpenCode