Этот документ описывает, как в CodeGraph устроены рабочие процессы: оркестрация на базе LangGraph, 21 сценарий анализа, составные сценарии, exec-конвейер и потоковая выдача результатов.
Содержание¶
- Архитектура рабочего процесса
- MultiScenarioCopilot
- Состояние рабочего процесса
- MultiScenarioState
- Специализированные состояния
- Узлы рабочего процесса
- Классификация намерений
- Предварительный поиск
- Маршрутизация сценариев
- Выполнение сценария
- Сценарные рабочие процессы
- Структура каталогов
- Таблица сценариев
- BaseHandler и HandlerResult
- Композитные рабочие процессы
- Аудит
- Синхронизация документации по интерфейсам
- Валидация пользовательских историй
- Пайплайн Exec
- Обработка ошибок
- Логика повторных попыток
- Резервные стратегии
- Пользовательские рабочие процессы
- Создание пользовательского рабочего процесса
- Условная маршрутизация
- Потоковая передача данных
- Связанная документация
- Расположение ключевых файлов
Архитектура рабочего процесса¶
CodeGraph использует LangGraph для оркестрации рабочих процессов. Все запросы проходят через единую точку входа — MultiScenarioCopilot, который определяет тип задачи, при необходимости подгружает контекст и направляет запрос в подходящий сценарий.
Цепочка узлов графа:
classify_intent -> pre_retrieval -> route_by_intent -> scenarios -> END
Общая схема:
graph TD
A[Запрос пользователя] --> B[classify_intent]
B --> C[pre_retrieval]
C --> D[route_by_intent]
D --> S01[onboarding_workflow]
D --> S02[security_workflow]
D --> S03[documentation_workflow]
D --> S04[feature_dev_workflow]
D --> S05[refactoring_workflow]
D --> S06[performance_workflow]
D --> S07[test_coverage_workflow]
D --> S08[compliance_workflow]
D --> S09[code_review_workflow]
D --> S10[cross_repo_workflow]
D --> S11[architecture_workflow]
D --> S12[tech_debt_workflow]
D --> S13[mass_refactoring_workflow]
D --> S14[security_incident_workflow]
D --> S15[debugging_workflow]
D --> S16[entry_points_workflow]
D --> S17[file_editing_workflow]
D --> S18[optimization_workflow]
D --> S19[standards_check_workflow]
D --> S20[dependencies_workflow]
D --> S21[interface_docs_sync_workflow]
S01 --> E[END]
S02 --> E
S03 --> E
S04 --> E
S05 --> E
S06 --> E
S07 --> E
S08 --> E
S09 --> E
S10 --> E
S11 --> E
S12 --> E
S13 --> E
S14 --> E
S15 --> E
S16 --> E
S17 --> E
S18 --> E
S19 --> E
S20 --> E
S21 --> E
MultiScenarioCopilot¶
Главная точка входа для выполнения всех рабочих процессов.
Расположение: src/workflow/orchestration/copilot.py (реэкспорт из src/workflow/)
class MultiScenarioCopilot:
def __init__(self):
self.graph = build_multi_scenario_graph()
def run(self, query: str, context: Optional[Dict] = None) -> Dict[str, Any]:
...
Метод run() синхронный. Потоковая передача симулируется через thread pool executor на уровне API.
Пример использования:
from src.workflow import MultiScenarioCopilot
copilot = MultiScenarioCopilot()
# Автоматическое определение сценария по запросу
result = copilot.run("Найти SQL-инъекции")
# Принудительный выбор сценария
result = copilot.run(
"Проанализировать модуль",
context={"scenario_id": "scenario_2"}
)
# Структура результата
{
"query": "Найти SQL-инъекции",
"intent": "security_audit",
"scenario_id": "scenario_2",
"confidence": 0.92,
"answer": "Обнаружено 3 потенциальных SQL-инъекции...",
"evidence": ["Функция exec_simple_query в строке 142..."],
"metadata": {...}
}
Внутренне копилот строит граф LangGraph через build_multi_scenario_graph(), который связывает классификацию намерений, предварительный поиск, маршрутизацию и выполнение обработчиков.
Состояние рабочего процесса¶
MultiScenarioState¶
Все рабочие процессы используют MultiScenarioState – TypedDict из 21 поля, который проходит через узлы LangGraph.
Расположение: src/workflow/state.py
class MultiScenarioState(TypedDict):
# Входные данные
query: str
context: Optional[Dict[str, Any]]
language: Optional[str] # "en" или "ru"
# Классификация намерений
intent: Optional[str]
scenario_id: Optional[str]
confidence: Optional[float]
classification_method: Optional[str]
# Данные CPG
cpg_results: Optional[List[Dict]]
subsystems: Optional[List[str]]
methods: Optional[List[Dict]]
call_graph: Optional[Any]
# Выходные данные
answer: Optional[str]
evidence: Optional[List[str]]
metadata: Optional[Dict[str, Any]]
retrieved_functions: Optional[List[str]]
# Обработка ошибок
error: Optional[str]
retry_count: int
# Конфигурация
enrichment_config: Optional[Dict[str, Any]]
vector_store: Optional[Any]
# Мультитенантная изоляция
db_path: Optional[str]
collection_prefix: Optional[str]
# Результаты предварительного поиска (Phase E)
pre_retrieval_results: Optional[List[Dict[str, Any]]]
Создание начального состояния – функция create_initial_state() принимает только query и context, без параметра language:
from src.workflow.state import create_initial_state
state = create_initial_state(
query="Найти утечки памяти",
context={"subsystem": "executor"}
)
Специализированные состояния¶
Некоторые сценарии расширяют базовое состояние дополнительными полями:
| Класс состояния | Дополнительные поля |
|---|---|
SecurityWorkflowState |
vulnerabilities, taint_paths, security_findings, risk_score |
PerformanceWorkflowState |
hotspots, complexity_metrics, bottlenecks, optimization_suggestions |
ArchitectureWorkflowState |
dependencies, layer_violations, circular_deps, subsystem_info |
Узлы рабочего процесса¶
Классификация намерений¶
Первый узел графа классифицирует запрос пользователя в один из 21 сценария, используя двуязычное (EN/RU) сопоставление ключевых слов и необязательный резервный вызов LLM.
Расположение: src/workflow/orchestration/intent_classifier.py
from src.workflow.orchestration.intent_classifier import classify_intent_node
# Вызывается внутренне графом
state = classify_intent_node(state)
# Заполняет: state["intent"], state["scenario_id"], state["confidence"],
# state["classification_method"]
Результат classification_method – "keyword" или "llm".
Предварительный поиск¶
Второй узел выполняет предварительный гибридный поиск (Phase E) между классификацией намерений и маршрутизацией. Результаты сохраняются в state["pre_retrieval_results"] и доступны обработчикам сценариев.
Расположение: src/workflow/orchestration/pre_retrieval.py
Включён по умолчанию. Конфигурация: config.yaml -> workflows.pre_retrieval.
Маппинг intent -> query_type для адаптивного взвешивания HybridRetriever:
| Тип запроса | Намерения | Веса (семантика/структура) |
|---|---|---|
semantic |
onboarding, documentation, feature_development, debugging, test_coverage |
75/25 |
structural |
architecture_violations, cross_repo_impact, dependencies, mass_refactoring, tech_debt, refactoring, performance |
25/75 |
security |
security_audit, security_incident, entry_points, compliance |
специализированные |
default |
code_review, file_editing, code_optimization, standards_check |
60/40 |
Маршрутизация сценариев¶
Третий узел направляет запрос к конкретному сценарному рабочему процессу на основе классифицированного намерения.
Расположение: src/workflow/orchestration/router.py
from src.workflow.orchestration.router import route_by_intent
# Возвращает имя узла сценария
next_node = route_by_intent(state)
# Например: "security_workflow", "onboarding_workflow"
Выполнение сценария¶
Каждый сценарный рабочий процесс – это подграф LangGraph, который:
- Выполняет запросы к базе CPG через
CPGQueryService - Обрабатывает результаты через обработчики, специфичные для сценария
- Форматирует ответ с помощью локализованных форматтеров
Сценарные рабочие процессы¶
Структура каталогов¶
src/workflow/scenarios/
+-- _base/ # Базовый класс обработчика
| +-- handler.py # BaseHandler, HandlerResult
+-- _intent/ # Классификация намерений
+-- onboarding/ # S01 Онбординг
+-- security/ # S02 Безопасность
+-- documentation_handlers/ # S03 Документация
+-- feature_dev_handlers/ # S04 Разработка функций
+-- refactoring_handlers/ # S05 Рефакторинг
+-- performance_handlers/ # S06 Производительность
+-- coverage_handlers/ # S07 Покрытие тестами
+-- compliance_handlers/ # S08 Соответствие
+-- code_review_handlers/ # S09 Проверка кода
+-- cross_repo_handlers/ # S10 Кросс-репозиторный анализ
+-- architecture_handlers/ # S11 Архитектура
+-- tech_debt_handlers/ # S12 Техдолг
+-- mass_refactoring.py # S13 Рефакторинг корпоративного масштаба
+-- security_incident.py # S14 Реагирование на инциденты
+-- debugging_handlers/ # S15 Отладка
+-- entry_points.py # S16 Точки входа
+-- file_editing.py # S17 Редактирование файлов
+-- code_optimization.py # S18 Оптимизация кода
+-- standards_check.py # S19 Проверка стандартов
+-- dependencies_analysis.py # S20 Анализ зависимостей
+-- interface_docs_sync_composite.py # S21 Синхронизация документации
+-- audit_composite.py # Аудит (композитный)
+-- story_validation_composite.py # Валидация историй (композитный)
Таблица сценариев¶
21 пронумерованный сценарий (нумерация по graph_builder.py и intent_classifier.py):
| ID | Название | Ключ намерения | Точка входа | Назначение |
|---|---|---|---|---|
| 01 | onboarding | onboarding |
onboarding_workflow |
Онбординг и навигация по кодовой базе |
| 02 | security | security_audit |
security_workflow |
Обнаружение уязвимостей |
| 03 | documentation | documentation |
documentation_workflow |
Генерация документации |
| 04 | feature_dev | feature_development |
feature_dev_workflow |
Разработка функций |
| 05 | refactoring | refactoring |
refactoring_workflow |
Помощь в рефакторинге |
| 06 | performance | performance |
performance_workflow |
Производительность и сложность |
| 07 | test_coverage | test_coverage |
test_coverage_workflow |
Анализ покрытия тестами |
| 08 | compliance | compliance |
compliance_workflow |
Проверка соответствия |
| 09 | code_review | code_review |
code_review_workflow |
Автоматизация проверки кода |
| 10 | cross_repo | cross_repo_impact |
cross_repo_workflow |
Кросс-репозиторный анализ |
| 11 | architecture | architecture_violations |
architecture_workflow |
Архитектурный анализ |
| 12 | tech_debt | tech_debt |
tech_debt_workflow |
Оценка техдолга |
| 13 | mass_refactoring | mass_refactoring |
mass_refactoring_workflow |
Рефакторинг корпоративного масштаба |
| 14 | security_incident | security_incident |
security_incident_workflow |
Реагирование на инциденты |
| 15 | debugging | debugging |
debugging_workflow |
Поддержка отладки |
| 16 | entry_points | entry_points |
entry_points_workflow |
Анализ точек входа |
| 17 | file_editing | file_editing |
file_editing_workflow |
Редактирование файлов на основе AST |
| 18 | code_optimization | code_optimization |
optimization_workflow |
Оптимизация кода |
| 19 | standards_check | standards_check |
standards_check_workflow |
Оптимизация по стандартам |
| 20 | dependencies | dependencies |
dependencies_workflow |
Анализ зависимостей |
| 21 | interface_docs_sync | interface_docs_sync |
interface_docs_sync_workflow |
Синхронизация документации по интерфейсам |
BaseHandler и HandlerResult¶
Все обработчики сценариев наследуют от BaseHandler.
Расположение: src/workflow/scenarios/_base/handler.py
BaseHandler¶
class BaseHandler(ABC, Generic[T]):
def __init__(self, cpg: Any, state: MultiScenarioState):
...
Атрибуты, доступные после инициализации:
| Атрибут | Тип | Описание |
|---|---|---|
self.cpg |
CPGQueryService |
Экземпляр CPG-сервиса (проектно-изолированный) |
self.state |
MultiScenarioState |
Текущее состояние рабочего процесса |
self.query |
str |
Исходный запрос пользователя |
self.context |
Dict[str, Any] |
Контекст запроса |
self.language |
str |
Язык ("en" или "ru") |
self.cfg |
UnifiedConfig |
Унифицированная конфигурация |
Основные методы:
| Метод | Описание |
|---|---|
can_handle(query_info) |
Проверяет, может ли обработчик обработать запрос (по умолчанию True) |
handle(query_info) |
Выполняет логику обработчика (абстрактный метод) |
apply_result(result) |
Применяет результат к состоянию |
log_info(message) |
Логирование с контекстом обработчика |
log_debug(message) |
Отладочное логирование |
log_warning(message) |
Логирование предупреждений |
Предупреждение: НЕ используйте
AnalysisHandlerизsrc/workflow/handlers/analysis.pyв качестве базового класса – его сигнатура конструктора несовместима с реестром сценариев (self.cpgиself.stateне будут установлены).
HandlerResult¶
HandlerResult – обобщённый (generic) датакласс с 8 полями:
@dataclass
class HandlerResult(Generic[T]):
data: Optional[T] = None
cpg_results: List[Dict[str, Any]] = field(default_factory=list)
retrieved_functions: List[str] = field(default_factory=list)
answer: str = ""
evidence: List[str] = field(default_factory=list)
metadata: Dict[str, Any] = field(default_factory=dict)
should_return: bool = True
llm_context: Dict[str, Any] = field(default_factory=dict)
| Поле | Тип | По умолчанию | Описание |
|---|---|---|---|
data |
Optional[T] |
None |
Структурированные данные (отчёт, результат анализа) |
cpg_results |
List[Dict] |
[] |
Необработанные результаты CPG-запросов |
retrieved_functions |
List[str] |
[] |
Имена найденных функций (для метрик IR) |
answer |
str |
"" |
Форматированный ответ |
evidence |
List[str] |
[] |
Доказательная база |
metadata |
Dict[str, Any] |
{} |
Дополнительные метаданные |
should_return |
bool |
True |
Если True – возврат немедленно; если False – передача в LLM |
llm_context |
Dict[str, Any] |
{} |
Контекст для LLM (при should_return=False) |
Пример пользовательского обработчика:
from src.workflow.scenarios._base.handler import BaseHandler, HandlerResult
class MyHandler(BaseHandler):
def handle(self, query_info: Dict[str, Any]) -> HandlerResult:
results = self.cpg.get_methods_by_subsystem("executor")
return HandlerResult(
answer="Найдены методы...",
evidence=["Метод heap_insert в строке 142"],
metadata={"handler": "my_handler"}
)
Композитные рабочие процессы¶
Композитные рабочие процессы запускают несколько подсценариев параллельно или последовательно. Они не входят в нумерацию 21 сценария.
| Композитный | Описание | Подсценарии / фазы | Режим | Таймаут |
|---|---|---|---|---|
AuditRunner |
Аудит качества по 12 измерениям | 9 подсценариев | Параллельно | 600 сек |
InterfaceDocsSyncRunner |
Синхронизация документации по интерфейсам | 5 фаз, 7 интерфейсов | Пайплайн | 120 сек |
StoryValidationRunner |
Валидация пользовательских историй | 5 интерфейсов | — | — |
Сценарии S18 и S19 также являются композитными по внутренней реализации:
| Сценарий | ID подсценариев | Режим | Таймаут |
|---|---|---|---|
| S18 code_optimization | 02, 05, 06, 11, 12 | Параллельно | 60 сек |
| S19 standards_check | 08, 17, 18 | Последовательно | 45 сек |
Аудит¶
Запускает 9 подсценариев параллельно, покрывая 12 измерений качества кода: безопасность, сложность, дубликаты, зависимости, именование, обработка ошибок, тестирование, документация, производительность, переносимость, стиль, архитектура.
python -m src.cli audit --db PATH [--language ru] [--format json]
Разрешение конфликтов использует приоритетный режим с повышением для безопасности (1.5x) и соответствия требованиям (1.3x). Конфигурация в config.yaml -> composition.
Синхронизация документации по интерфейсам¶
5-фазный пайплайн: обнаружение -> парсинг документации -> генерация -> детекция дрифта -> отчёт. Сканирует 6 интерфейсов (REST API, CLI, MCP, ACP, gRPC, WebSocket) на предмет расхождений документации.
python -m src.cli docs-sync --db PATH [--check] [--format json] [--interfaces rest_api,cli]
Также доступен как MCP-инструмент codegraph_docs_sync и REST-эндпоинт POST /api/v1/documentation/sync. CI-интеграция через .github/workflows/docs-sync.yml. Конфигурация в config.yaml -> composition.orchestrators.interface_docs_sync.
Валидация пользовательских историй¶
StoryValidationRunner валидирует пользовательские истории со статусом Done против 5 интерфейсов. Типы свидетельств: dedicated (порог 0.8 и выше), passthrough (порог 0.5 и ниже), scenario_map (0.7). Отслеживает все совпавшие функции через all_matched_names. Поддерживает Go CPG через параметр --go-db.
python -m src.cli.import_commands dogfood validate-stories
Пайплайн Exec¶
Отдельный инструмент для CI/CD-пайплайнов, не связанный с LangGraph-графом.
Расположение: src/cli/exec_command.py
python -m src.cli exec --prompt "Review security" --base-ref origin/main \
--sarif-file out.sarif --comment-file comment.md --sandbox read-only
Поддерживает экспорт результатов в формат SARIF через SARIFExporter (src/security/sarif_exporter.py) и генерацию комментариев для PR.
Обработка ошибок¶
Логика повторных попыток¶
Рабочие процессы поддерживают автоматический повтор с уточнением запроса. Количество попыток контролируется полем state["retry_count"] (по умолчанию до 2 повторов с адаптивным уточнением):
# Встроено в граф -- настраивается через state["retry_count"]
# По умолчанию: до 2 повторных попыток с адаптивным уточнением запроса
Резервные стратегии¶
При сбое генерации на базе LLM рабочие процессы переключаются на сопоставление шаблонов:
# Автоматическая цепочка запасных вариантов:
# 1. SQL-запрос, сгенерированный LLM
# 2. Запрос по шаблону из примеров
# 3. Прямой вызов метода CPG
Пользовательские рабочие процессы¶
Создание пользовательского рабочего процесса¶
from langgraph.graph import StateGraph
from src.workflow.state import MultiScenarioState
def create_custom_workflow():
workflow = StateGraph(MultiScenarioState)
workflow.add_node("analyze", my_analyze_node)
workflow.add_node("process", my_process_node)
workflow.add_node("interpret", my_interpret_node)
workflow.add_edge("analyze", "process")
workflow.add_edge("process", "interpret")
workflow.set_entry_point("analyze")
workflow.set_finish_point("interpret")
return workflow.compile()
result = create_custom_workflow().invoke({"query": "..."})
Условная маршрутизация¶
def my_route(state: MultiScenarioState) -> str:
if state["intent"] == "security_audit":
return "security_node"
elif state["intent"] == "performance":
return "performance_node"
else:
return "general_node"
workflow.add_conditional_edges(
"analyze",
my_route,
{
"security_node": "security",
"performance_node": "performance",
"general_node": "general"
}
)
Потоковая передача данных¶
Потоковая передача поддерживается на нескольких уровнях:
| Компонент | Механизм | Расположение |
|---|---|---|
ChatService.process_query_stream() |
AsyncGenerator с SSE |
src/services/chat_service.py |
POST /api/v1/chat/stream |
SSE-эндпоинт | src/api/routers/ |
| WebSocket | Полнодуплексное соединение | src/api/websocket/handlers.py |
MultiScenarioCopilot.run() – синхронный метод. Потоковая передача на уровне API симулируется через thread pool executor, оборачивающий синхронный вызов в асинхронный генератор SSE-событий.
Связанная документация¶
- Справочник API – полный справочник REST API
- Справочник агентов – агенты и ACP-протокол
- Руководство по сценариям – примеры использования сценариев
Расположение ключевых файлов¶
| Файл | Назначение |
|---|---|
src/workflow/orchestration/copilot.py |
MultiScenarioCopilot |
src/workflow/orchestration/graph_builder.py |
build_multi_scenario_graph() |
src/workflow/orchestration/intent_classifier.py |
classify_intent_node() |
src/workflow/orchestration/router.py |
route_by_intent() |
src/workflow/orchestration/pre_retrieval.py |
pre_retrieval_node() |
src/workflow/state.py |
MultiScenarioState, create_initial_state() |
src/workflow/scenarios/_base/handler.py |
BaseHandler, HandlerResult |
src/workflow/scenarios/audit_composite.py |
AuditRunner |
src/workflow/scenarios/interface_docs_sync_composite.py |
InterfaceDocsSyncRunner |
src/workflow/scenarios/story_validation_composite.py |
StoryValidationRunner |