Справочник по рабочим процессам

Этот документ описывает, как в CodeGraph устроены рабочие процессы: оркестрация на базе LangGraph, 21 сценарий анализа, составные сценарии, exec-конвейер и потоковая выдача результатов.

Содержание

Архитектура рабочего процесса

CodeGraph использует LangGraph для оркестрации рабочих процессов. Все запросы проходят через единую точку входа — MultiScenarioCopilot, который определяет тип задачи, при необходимости подгружает контекст и направляет запрос в подходящий сценарий.

Цепочка узлов графа:

classify_intent -> pre_retrieval -> route_by_intent -> scenarios -> END

Общая схема:

graph TD
    A[Запрос пользователя] --> B[classify_intent]
    B --> C[pre_retrieval]
    C --> D[route_by_intent]
    D --> S01[onboarding_workflow]
    D --> S02[security_workflow]
    D --> S03[documentation_workflow]
    D --> S04[feature_dev_workflow]
    D --> S05[refactoring_workflow]
    D --> S06[performance_workflow]
    D --> S07[test_coverage_workflow]
    D --> S08[compliance_workflow]
    D --> S09[code_review_workflow]
    D --> S10[cross_repo_workflow]
    D --> S11[architecture_workflow]
    D --> S12[tech_debt_workflow]
    D --> S13[mass_refactoring_workflow]
    D --> S14[security_incident_workflow]
    D --> S15[debugging_workflow]
    D --> S16[entry_points_workflow]
    D --> S17[file_editing_workflow]
    D --> S18[optimization_workflow]
    D --> S19[standards_check_workflow]
    D --> S20[dependencies_workflow]
    D --> S21[interface_docs_sync_workflow]
    S01 --> E[END]
    S02 --> E
    S03 --> E
    S04 --> E
    S05 --> E
    S06 --> E
    S07 --> E
    S08 --> E
    S09 --> E
    S10 --> E
    S11 --> E
    S12 --> E
    S13 --> E
    S14 --> E
    S15 --> E
    S16 --> E
    S17 --> E
    S18 --> E
    S19 --> E
    S20 --> E
    S21 --> E

MultiScenarioCopilot

Главная точка входа для выполнения всех рабочих процессов.

Расположение: src/workflow/orchestration/copilot.py (реэкспорт из src/workflow/)

class MultiScenarioCopilot:
    def __init__(self):
        self.graph = build_multi_scenario_graph()

    def run(self, query: str, context: Optional[Dict] = None) -> Dict[str, Any]:
        ...

Метод run() синхронный. Потоковая передача симулируется через thread pool executor на уровне API.

Пример использования:

from src.workflow import MultiScenarioCopilot

copilot = MultiScenarioCopilot()

# Автоматическое определение сценария по запросу
result = copilot.run("Найти SQL-инъекции")

# Принудительный выбор сценария
result = copilot.run(
    "Проанализировать модуль",
    context={"scenario_id": "scenario_2"}
)

# Структура результата
{
    "query": "Найти SQL-инъекции",
    "intent": "security_audit",
    "scenario_id": "scenario_2",
    "confidence": 0.92,
    "answer": "Обнаружено 3 потенциальных SQL-инъекции...",
    "evidence": ["Функция exec_simple_query в строке 142..."],
    "metadata": {...}
}

Внутренне копилот строит граф LangGraph через build_multi_scenario_graph(), который связывает классификацию намерений, предварительный поиск, маршрутизацию и выполнение обработчиков.

Состояние рабочего процесса

MultiScenarioState

Все рабочие процессы используют MultiScenarioStateTypedDict из 21 поля, который проходит через узлы LangGraph.

Расположение: src/workflow/state.py

class MultiScenarioState(TypedDict):
    # Входные данные
    query: str
    context: Optional[Dict[str, Any]]
    language: Optional[str]               # "en" или "ru"

    # Классификация намерений
    intent: Optional[str]
    scenario_id: Optional[str]
    confidence: Optional[float]
    classification_method: Optional[str]

    # Данные CPG
    cpg_results: Optional[List[Dict]]
    subsystems: Optional[List[str]]
    methods: Optional[List[Dict]]
    call_graph: Optional[Any]

    # Выходные данные
    answer: Optional[str]
    evidence: Optional[List[str]]
    metadata: Optional[Dict[str, Any]]
    retrieved_functions: Optional[List[str]]

    # Обработка ошибок
    error: Optional[str]
    retry_count: int

    # Конфигурация
    enrichment_config: Optional[Dict[str, Any]]
    vector_store: Optional[Any]

    # Мультитенантная изоляция
    db_path: Optional[str]
    collection_prefix: Optional[str]

    # Результаты предварительного поиска (Phase E)
    pre_retrieval_results: Optional[List[Dict[str, Any]]]

Создание начального состояния – функция create_initial_state() принимает только query и context, без параметра language:

from src.workflow.state import create_initial_state

state = create_initial_state(
    query="Найти утечки памяти",
    context={"subsystem": "executor"}
)

Специализированные состояния

Некоторые сценарии расширяют базовое состояние дополнительными полями:

Класс состояния Дополнительные поля
SecurityWorkflowState vulnerabilities, taint_paths, security_findings, risk_score
PerformanceWorkflowState hotspots, complexity_metrics, bottlenecks, optimization_suggestions
ArchitectureWorkflowState dependencies, layer_violations, circular_deps, subsystem_info

Узлы рабочего процесса

Классификация намерений

Первый узел графа классифицирует запрос пользователя в один из 21 сценария, используя двуязычное (EN/RU) сопоставление ключевых слов и необязательный резервный вызов LLM.

Расположение: src/workflow/orchestration/intent_classifier.py

from src.workflow.orchestration.intent_classifier import classify_intent_node

# Вызывается внутренне графом
state = classify_intent_node(state)
# Заполняет: state["intent"], state["scenario_id"], state["confidence"],
#             state["classification_method"]

Результат classification_method"keyword" или "llm".

Предварительный поиск

Второй узел выполняет предварительный гибридный поиск (Phase E) между классификацией намерений и маршрутизацией. Результаты сохраняются в state["pre_retrieval_results"] и доступны обработчикам сценариев.

Расположение: src/workflow/orchestration/pre_retrieval.py

Включён по умолчанию. Конфигурация: config.yaml -> workflows.pre_retrieval.

Маппинг intent -> query_type для адаптивного взвешивания HybridRetriever:

Тип запроса Намерения Веса (семантика/структура)
semantic onboarding, documentation, feature_development, debugging, test_coverage 75/25
structural architecture_violations, cross_repo_impact, dependencies, mass_refactoring, tech_debt, refactoring, performance 25/75
security security_audit, security_incident, entry_points, compliance специализированные
default code_review, file_editing, code_optimization, standards_check 60/40

Маршрутизация сценариев

Третий узел направляет запрос к конкретному сценарному рабочему процессу на основе классифицированного намерения.

Расположение: src/workflow/orchestration/router.py

from src.workflow.orchestration.router import route_by_intent

# Возвращает имя узла сценария
next_node = route_by_intent(state)
# Например: "security_workflow", "onboarding_workflow"

Выполнение сценария

Каждый сценарный рабочий процесс – это подграф LangGraph, который:

  1. Выполняет запросы к базе CPG через CPGQueryService
  2. Обрабатывает результаты через обработчики, специфичные для сценария
  3. Форматирует ответ с помощью локализованных форматтеров

Сценарные рабочие процессы

Структура каталогов

src/workflow/scenarios/
+-- _base/                      # Базовый класс обработчика
|   +-- handler.py              # BaseHandler, HandlerResult
+-- _intent/                    # Классификация намерений
+-- onboarding/                 # S01 Онбординг
+-- security/                   # S02 Безопасность
+-- documentation_handlers/     # S03 Документация
+-- feature_dev_handlers/       # S04 Разработка функций
+-- refactoring_handlers/       # S05 Рефакторинг
+-- performance_handlers/       # S06 Производительность
+-- coverage_handlers/          # S07 Покрытие тестами
+-- compliance_handlers/        # S08 Соответствие
+-- code_review_handlers/       # S09 Проверка кода
+-- cross_repo_handlers/        # S10 Кросс-репозиторный анализ
+-- architecture_handlers/      # S11 Архитектура
+-- tech_debt_handlers/         # S12 Техдолг
+-- mass_refactoring.py         # S13 Рефакторинг корпоративного масштаба
+-- security_incident.py        # S14 Реагирование на инциденты
+-- debugging_handlers/         # S15 Отладка
+-- entry_points.py             # S16 Точки входа
+-- file_editing.py             # S17 Редактирование файлов
+-- code_optimization.py        # S18 Оптимизация кода
+-- standards_check.py          # S19 Проверка стандартов
+-- dependencies_analysis.py    # S20 Анализ зависимостей
+-- interface_docs_sync_composite.py  # S21 Синхронизация документации
+-- audit_composite.py          # Аудит (композитный)
+-- story_validation_composite.py  # Валидация историй (композитный)

Таблица сценариев

21 пронумерованный сценарий (нумерация по graph_builder.py и intent_classifier.py):

ID Название Ключ намерения Точка входа Назначение
01 onboarding onboarding onboarding_workflow Онбординг и навигация по кодовой базе
02 security security_audit security_workflow Обнаружение уязвимостей
03 documentation documentation documentation_workflow Генерация документации
04 feature_dev feature_development feature_dev_workflow Разработка функций
05 refactoring refactoring refactoring_workflow Помощь в рефакторинге
06 performance performance performance_workflow Производительность и сложность
07 test_coverage test_coverage test_coverage_workflow Анализ покрытия тестами
08 compliance compliance compliance_workflow Проверка соответствия
09 code_review code_review code_review_workflow Автоматизация проверки кода
10 cross_repo cross_repo_impact cross_repo_workflow Кросс-репозиторный анализ
11 architecture architecture_violations architecture_workflow Архитектурный анализ
12 tech_debt tech_debt tech_debt_workflow Оценка техдолга
13 mass_refactoring mass_refactoring mass_refactoring_workflow Рефакторинг корпоративного масштаба
14 security_incident security_incident security_incident_workflow Реагирование на инциденты
15 debugging debugging debugging_workflow Поддержка отладки
16 entry_points entry_points entry_points_workflow Анализ точек входа
17 file_editing file_editing file_editing_workflow Редактирование файлов на основе AST
18 code_optimization code_optimization optimization_workflow Оптимизация кода
19 standards_check standards_check standards_check_workflow Оптимизация по стандартам
20 dependencies dependencies dependencies_workflow Анализ зависимостей
21 interface_docs_sync interface_docs_sync interface_docs_sync_workflow Синхронизация документации по интерфейсам

BaseHandler и HandlerResult

Все обработчики сценариев наследуют от BaseHandler.

Расположение: src/workflow/scenarios/_base/handler.py

BaseHandler

class BaseHandler(ABC, Generic[T]):
    def __init__(self, cpg: Any, state: MultiScenarioState):
        ...

Атрибуты, доступные после инициализации:

Атрибут Тип Описание
self.cpg CPGQueryService Экземпляр CPG-сервиса (проектно-изолированный)
self.state MultiScenarioState Текущее состояние рабочего процесса
self.query str Исходный запрос пользователя
self.context Dict[str, Any] Контекст запроса
self.language str Язык ("en" или "ru")
self.cfg UnifiedConfig Унифицированная конфигурация

Основные методы:

Метод Описание
can_handle(query_info) Проверяет, может ли обработчик обработать запрос (по умолчанию True)
handle(query_info) Выполняет логику обработчика (абстрактный метод)
apply_result(result) Применяет результат к состоянию
log_info(message) Логирование с контекстом обработчика
log_debug(message) Отладочное логирование
log_warning(message) Логирование предупреждений

Предупреждение: НЕ используйте AnalysisHandler из src/workflow/handlers/analysis.py в качестве базового класса – его сигнатура конструктора несовместима с реестром сценариев (self.cpg и self.state не будут установлены).

HandlerResult

HandlerResult – обобщённый (generic) датакласс с 8 полями:

@dataclass
class HandlerResult(Generic[T]):
    data: Optional[T] = None
    cpg_results: List[Dict[str, Any]] = field(default_factory=list)
    retrieved_functions: List[str] = field(default_factory=list)
    answer: str = ""
    evidence: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
    should_return: bool = True
    llm_context: Dict[str, Any] = field(default_factory=dict)
Поле Тип По умолчанию Описание
data Optional[T] None Структурированные данные (отчёт, результат анализа)
cpg_results List[Dict] [] Необработанные результаты CPG-запросов
retrieved_functions List[str] [] Имена найденных функций (для метрик IR)
answer str "" Форматированный ответ
evidence List[str] [] Доказательная база
metadata Dict[str, Any] {} Дополнительные метаданные
should_return bool True Если True – возврат немедленно; если False – передача в LLM
llm_context Dict[str, Any] {} Контекст для LLM (при should_return=False)

Пример пользовательского обработчика:

from src.workflow.scenarios._base.handler import BaseHandler, HandlerResult

class MyHandler(BaseHandler):
    def handle(self, query_info: Dict[str, Any]) -> HandlerResult:
        results = self.cpg.get_methods_by_subsystem("executor")
        return HandlerResult(
            answer="Найдены методы...",
            evidence=["Метод heap_insert в строке 142"],
            metadata={"handler": "my_handler"}
        )

Композитные рабочие процессы

Композитные рабочие процессы запускают несколько подсценариев параллельно или последовательно. Они не входят в нумерацию 21 сценария.

Композитный Описание Подсценарии / фазы Режим Таймаут
AuditRunner Аудит качества по 12 измерениям 9 подсценариев Параллельно 600 сек
InterfaceDocsSyncRunner Синхронизация документации по интерфейсам 5 фаз, 7 интерфейсов Пайплайн 120 сек
StoryValidationRunner Валидация пользовательских историй 5 интерфейсов

Сценарии S18 и S19 также являются композитными по внутренней реализации:

Сценарий ID подсценариев Режим Таймаут
S18 code_optimization 02, 05, 06, 11, 12 Параллельно 60 сек
S19 standards_check 08, 17, 18 Последовательно 45 сек

Аудит

Запускает 9 подсценариев параллельно, покрывая 12 измерений качества кода: безопасность, сложность, дубликаты, зависимости, именование, обработка ошибок, тестирование, документация, производительность, переносимость, стиль, архитектура.

python -m src.cli audit --db PATH [--language ru] [--format json]

Разрешение конфликтов использует приоритетный режим с повышением для безопасности (1.5x) и соответствия требованиям (1.3x). Конфигурация в config.yaml -> composition.

Синхронизация документации по интерфейсам

5-фазный пайплайн: обнаружение -> парсинг документации -> генерация -> детекция дрифта -> отчёт. Сканирует 6 интерфейсов (REST API, CLI, MCP, ACP, gRPC, WebSocket) на предмет расхождений документации.

python -m src.cli docs-sync --db PATH [--check] [--format json] [--interfaces rest_api,cli]

Также доступен как MCP-инструмент codegraph_docs_sync и REST-эндпоинт POST /api/v1/documentation/sync. CI-интеграция через .github/workflows/docs-sync.yml. Конфигурация в config.yaml -> composition.orchestrators.interface_docs_sync.

Валидация пользовательских историй

StoryValidationRunner валидирует пользовательские истории со статусом Done против 5 интерфейсов. Типы свидетельств: dedicated (порог 0.8 и выше), passthrough (порог 0.5 и ниже), scenario_map (0.7). Отслеживает все совпавшие функции через all_matched_names. Поддерживает Go CPG через параметр --go-db.

python -m src.cli.import_commands dogfood validate-stories

Пайплайн Exec

Отдельный инструмент для CI/CD-пайплайнов, не связанный с LangGraph-графом.

Расположение: src/cli/exec_command.py

python -m src.cli exec --prompt "Review security" --base-ref origin/main \
    --sarif-file out.sarif --comment-file comment.md --sandbox read-only

Поддерживает экспорт результатов в формат SARIF через SARIFExporter (src/security/sarif_exporter.py) и генерацию комментариев для PR.

Обработка ошибок

Логика повторных попыток

Рабочие процессы поддерживают автоматический повтор с уточнением запроса. Количество попыток контролируется полем state["retry_count"] (по умолчанию до 2 повторов с адаптивным уточнением):

# Встроено в граф -- настраивается через state["retry_count"]
# По умолчанию: до 2 повторных попыток с адаптивным уточнением запроса

Резервные стратегии

При сбое генерации на базе LLM рабочие процессы переключаются на сопоставление шаблонов:

# Автоматическая цепочка запасных вариантов:
# 1. SQL-запрос, сгенерированный LLM
# 2. Запрос по шаблону из примеров
# 3. Прямой вызов метода CPG

Пользовательские рабочие процессы

Создание пользовательского рабочего процесса

from langgraph.graph import StateGraph
from src.workflow.state import MultiScenarioState

def create_custom_workflow():
    workflow = StateGraph(MultiScenarioState)

    workflow.add_node("analyze", my_analyze_node)
    workflow.add_node("process", my_process_node)
    workflow.add_node("interpret", my_interpret_node)

    workflow.add_edge("analyze", "process")
    workflow.add_edge("process", "interpret")

    workflow.set_entry_point("analyze")
    workflow.set_finish_point("interpret")

    return workflow.compile()

result = create_custom_workflow().invoke({"query": "..."})

Условная маршрутизация

def my_route(state: MultiScenarioState) -> str:
    if state["intent"] == "security_audit":
        return "security_node"
    elif state["intent"] == "performance":
        return "performance_node"
    else:
        return "general_node"

workflow.add_conditional_edges(
    "analyze",
    my_route,
    {
        "security_node": "security",
        "performance_node": "performance",
        "general_node": "general"
    }
)

Потоковая передача данных

Потоковая передача поддерживается на нескольких уровнях:

Компонент Механизм Расположение
ChatService.process_query_stream() AsyncGenerator с SSE src/services/chat_service.py
POST /api/v1/chat/stream SSE-эндпоинт src/api/routers/
WebSocket Полнодуплексное соединение src/api/websocket/handlers.py

MultiScenarioCopilot.run() – синхронный метод. Потоковая передача на уровне API симулируется через thread pool executor, оборачивающий синхронный вызов в асинхронный генератор SSE-событий.

Связанная документация

Расположение ключевых файлов

Файл Назначение
src/workflow/orchestration/copilot.py MultiScenarioCopilot
src/workflow/orchestration/graph_builder.py build_multi_scenario_graph()
src/workflow/orchestration/intent_classifier.py classify_intent_node()
src/workflow/orchestration/router.py route_by_intent()
src/workflow/orchestration/pre_retrieval.py pre_retrieval_node()
src/workflow/state.py MultiScenarioState, create_initial_state()
src/workflow/scenarios/_base/handler.py BaseHandler, HandlerResult
src/workflow/scenarios/audit_composite.py AuditRunner
src/workflow/scenarios/interface_docs_sync_composite.py InterfaceDocsSyncRunner
src/workflow/scenarios/story_validation_composite.py StoryValidationRunner