Справочник агентов

Этот документ описывает систему агентов CodeGraph: основные конвейерные агенты, ACP-интеграцию и специализированные доменные анализаторы.

Содержание


Обзор

Архитектура системы агентов

Система агентов CodeGraph разделена на три уровня:

  1. Конвейерные агенты (src/agents/) — 5 основных + 15 вспомогательных модулей для RAG-конвейера
  2. ACP — Agent Client Protocol (src/acp/) — интеграция с IDE через JSON-RPC 2.0
  3. Доменные агенты — анализ безопасности, производительности, архитектуры
graph TB
    subgraph "Pipeline Agents (src/agents/)"
        AA[AnalyzerAgent] --> RA[RetrieverAgent]
        RA --> EA[EnrichmentAgent]
        EA --> GA[GeneratorAgent]
        GA --> IA[InterpreterAgent]
        RA -.-> CCA[CallChainAnalyzer]
        RA -.-> CFG[ControlFlowGenerator]
        GA -.-> PB[PromptBuilder]
        GA -.-> QV[QueryVariantGenerator]
        IA -.-> AG[AnswerGenerator]
        IA -.-> RP[ResultParser]
    end
    subgraph "ACP Protocol (src/acp/)"
        IDE[IDE Client] -->|JSON-RPC| ACPA[CodeGraphACPAgent]
        ACPA --> SM[SessionManager]
        ACPA --> TM[ThreadManager]
        ACPA --> TO[TurnOrchestrator]
        ACPA --> DE[DiagnosticsEngine]
        ACPA --> HP[HoverProvider]
        ACPA -->|adapters| WA[WorkflowAdapter]
        WA --> MSC[MultiScenarioCopilot]
    end
    subgraph "Domain Agents"
        SS[SecurityScanner]
        PP[PerformanceProfiler]
        DA[DependencyAnalyzer]
    end

Карта модулей

Пакет Файлов Ключевые классы Назначение
src/agents/ 20 AnalyzerAgent, RetrieverAgent, EnrichmentAgent, GeneratorAgent, InterpreterAgent + 15 вспомогательных RAG-конвейер запросов
src/agents/enrichment/ 6 EnrichmentAgent + вспомогательные Обогащение на основе тегов
src/acp/server/ 8 CodeGraphACPAgent, ACPSessionManager и др. Сервер протокола ACP
src/acp/transport/ 4 BaseTransport, StdioTransport, HTTPTransport, WebSocketTransport Транспортный уровень
src/acp/integration/ 4 ChatServiceACPAdapter, MCPBridge и др. Адаптеры интеграции
src/security/ 1 SecurityScanner + 3 агента Анализ безопасности
src/performance/agents/ 4 PerformanceProfiler + 2 агента Анализ производительности
src/architecture/agents/ 4 DependencyAnalyzer + 2 агента Анализ архитектуры

Основные конвейерные агенты

AnalyzerAgent

Файл: src/agents/analyzer_agent.py

Назначение: Понимание вопроса и извлечение намерений

from src.agents import AnalyzerAgent

analyzer = AnalyzerAgent(
    llm=None,                          # Опциональный LLM для расширенного анализа
    cpg_config: Optional[CPGConfig] = None  # Доменная конфигурация
)

Обязанности: - Извлечение намерения пользователя из естественного языка - Определение целевого домена/подсистемы - Извлечение релевантных ключевых слов - Классификация типа запроса (семантический/структурный/безопасность)

Ключевые методы:

# Анализ вопроса — возвращает намерение, домен, ключевые слова, уверенность
analysis = analyzer.analyze("Какие методы обрабатывают транзакции?")
# Возвращает: {'intent': 'find_methods', 'domain': 'transaction-manager',
#              'keywords': ['transaction'], 'query_type': 'semantic', 'confidence': 0.85}

# Классификация режима запроса (семантический vs структурный)
mode, confidence = analyzer.classify_query_mode("Найти всех вызывающих foo")

# Анализ с помощью LLM
analysis = analyzer.analyze_with_llm("Объясните процесс фиксации транзакции")

# Пакетный анализ
results = analyzer.batch_analyze(["Q1...", "Q2...", "Q3..."])

# Получить фильтр ChromaDB для домена
domain_filter = analyzer.get_domain_filter("transaction-manager")

RetrieverAgent

Файл: src/agents/retriever_agent.py

Назначение: Гибридный поиск, объединяющий семантический (ChromaDB) и структурный (DuckDB) поиск

from src.agents import RetrieverAgent

retriever = RetrieverAgent(
    vector_store=vector_store,           # Экземпляр VectorStoreReal
    analyzer_agent=analyzer,             # Экземпляр AnalyzerAgent
    cpg_service=None,                    # Опциональный CPGQueryService для графовых запросов
    cache_size: int = 128,               # Максимум кешированных записей
    cache_ttl_seconds: Optional[float] = None,  # TTL кеша (None = без ограничения)
    enable_cache_metrics: bool = True,   # Сбор метрик кеша
    enable_hybrid: bool = True           # Включить гибридный поиск
)

Обязанности: - Параллельный асинхронный поиск по векторам и графу - Объединение результатов на основе RRF (Reciprocal Rank Fusion) - Кросс-источниковое ранжирование - Кеширование результатов

Ключевые методы:

# Базовый векторный поиск
result = retriever.retrieve(question, analysis, top_k_qa=3, top_k_sql=5)

# Гибридный поиск (вектор + граф с RRF)
result = retriever.retrieve_hybrid(
    question="Найти шаблоны выделения памяти",
    analysis=analysis,
    mode="hybrid",           # hybrid | vector_only | graph_only
    query_type="structural", # Влияет на распределение весов
    top_k=10,
    use_ranker=True
)

# С переранжированием
result = retriever.retrieve_with_reranking(question, analysis,
    top_k_qa=5, top_k_sql=10, final_k_qa=3, final_k_sql=5)

# С переранжированием с учётом обогащения
result = retriever.retrieve_with_enrichment(question, analysis, enrichment_hints)

# Поиск по ключевым словам
result = retriever.retrieve_by_keywords(["lock", "acquire"], top_k=5)

# Управление кешем
stats = retriever.get_stats()
metrics = retriever.get_cache_metrics()
cleared = retriever.invalidate_cache(pattern="lock*")
warmed = retriever.warm_cache(["Q1", "Q2"])

Адаптация весов:

Тип запроса Вес вектора Вес графа
semantic 0.75 0.25
structural 0.25 0.75
security 0.50 0.50
default 0.60 0.40

EnrichmentAgent

Файл: src/agents/enrichment/agent.py (фасад: src/agents/enrichment_agent.py)

Назначение: Сопоставление вопросов с тегами обогащения CPG для улучшения поиска

from src.agents import EnrichmentAgent

enrichment = EnrichmentAgent(
    enable_fallback: bool = True  # Включить резервные стратегии фазы 4
)

Обязанности: - 12-уровневое семантическое обогащение - Маркировка доменных концепций - Обнаружение свойств ACID - Аннотация атрибутов безопасности

Слои обогащения:

  1. Архитектурный уровень (access-method, storage-engine и т.д.)
  2. Свойства ACID
  3. Шаблоны многопоточности
  4. Показатели производительности
  5. Атрибуты безопасности
  6. Шаблоны обработки ошибок
  7. Управление памятью
  8. Примитивы блокировок
  9. Границы транзакций
  10. Категории потоков данных
  11. Шаблоны потоков управления
  12. Доменно-специфичные концепции

Ключевые методы:

# Получить подсказки обогащения для вопроса
hints = enrichment.get_enrichment_hints(
    question="Как работает менеджер блокировок?",
    analysis={'intent': 'explain-concept', 'domain': 'locking', 'keywords': ['lock']}
)

Подпакет обогащения (src/agents/enrichment/):

Файл Функция Назначение
coverage.py calculate_coverage(hints) Расчёт покрытия тегами (0.0–1.0)
fallback.py general_domain_fallback(hints) Применение резервных стратегий при низком покрытии
keyword_matchers.py enhance_with_keywords(hints, keywords) Добавление тегов по ключевым словам
prompt_formatter.py format_for_prompt(hints) Форматирование подсказок для промпта LLM
tag_filters.py generate_tag_filters(hints) Генерация фильтров тегов ChromaDB

GeneratorAgent

Файл: src/agents/generator_agent.py

Назначение: Генерация SQL-запросов из естественного языка с использованием обогащённого контекста

from src.agents import GeneratorAgent

generator = GeneratorAgent(
    sql_generator: SQLQueryGenerator,    # Бэкенд генерации SQL
    enable_feedback: bool = True,        # Включить отслеживание обратной связи по тегам
    use_semantic: bool = False           # Использовать семантический режим
)

Обязанности: - Генерация SQL-запросов к базе данных CPG - Сопоставление с шаблонами - Оптимизация запросов

Ключевые методы:

# Генерация SQL-запроса
query, is_valid, error = generator.generate(question, context)

# С повторными попытками
query, is_valid, error, attempts = generator.generate_with_retries(
    question, context, max_retries=2
)

# Пакетная генерация
results = generator.generate_batch(questions, contexts)

# Воронка запросов — генерация нескольких вариантов
variants = generator.generate_query_variants(question, context, num_variants=3)

# Объяснение запроса
explanation = generator.explain_query(query, context)

Делегирует: PromptBuilder (форматирование промптов), QueryVariantGenerator (генерация вариантов), TagEffectivenessTracker (обратная связь).


InterpreterAgent

Файл: src/agents/interpreter_agent.py

Назначение: Преобразование результатов выполнения запросов в ответы на естественном языке

from src.agents.interpreter_agent import InterpreterAgent

interpreter = InterpreterAgent(
    llm_interface=None,                  # Опциональный LLM (резерв — шаблоны)
    use_semantic: bool = False,          # Семантический режим (извлечение комментариев)
    cpg_config: Optional[CPGConfig] = None  # Доменная конфигурация
)

Обязанности: - Синтез результатов - Оценка уверенности - Указание источников - Форматирование ответа

Ключевые методы:

# Интерпретация результатов
answer = interpreter.interpret(
    question="Какие методы вызывают LWLockAcquire?",
    results=[...],
    query="SELECT..."
)

# Результат
{
    'answer': 'LWLockAcquire вызывается 15 методами, включая...',
    'confidence': 0.85,
    'sources': [
        {'method': 'heap_insert', 'file': 'heapam.c', 'line': 234},
        ...
    ]
}

Делегирует трём вспомогательным классам: - ResultParser — разбор различных форматов результатов - AnswerGenerator — генерация ответов (LLM или шаблоны) - SemanticInterpreter — интерпретация семантических запросов


Вспомогательные модули конвейера

CallChainAnalyzer

Файл: src/agents/call_chain_analyzer.py

Назначение: Построение цепочек вызовов и графов вызовов из результатов запросов

from src.agents.call_chain_analyzer import CallChainAnalyzer

analyzer = CallChainAnalyzer()

# Анализ результатов CPGQL
result = analyzer.analyze(
    entry_point_result=entry_point,
    keyword_methods_result=methods,
    call_graph_result=graph,
    question_keywords=["lock", "acquire"]
)
# Возвращает: {
#   'entry_point': 'LWLockAcquire',
#   'call_graph': {'LWLockAcquire': ['LWLockWaitForVar', ...]},
#   'call_chains': [{'path': [...], 'length': 4}],
#   'key_functions': [{'method': '...', 'score': 0.9, 'file': '...'}],
#   'metadata': {...}
# }

ControlFlowGenerator

Файл: src/agents/control_flow_generator.py

Назначение: Генерация SQL-запросов для анализа потока управления и трассировки цепочек вызовов

from src.agents.control_flow_generator import ControlFlowGenerator

cfg_gen = ControlFlowGenerator(llm=None)

# Генерация SQL для обнаружения точки входа
entry_sql = cfg_gen.generate_entry_point_query("shutdown")

# Генерация SQL для поиска методов по ключевым словам
keyword_sql = cfg_gen.generate_keyword_methods_query(["lock", "acquire"])

# Генерация SQL для графа вызовов
call_graph_sql = cfg_gen.generate_call_graph_query("LWLockAcquire", depth=5)

Стратегии: 1. Обнаружение точки входа — поиск основного метода, обрабатывающего механизм 2. Поиск по ключевым словам — поиск связанных методов 3. Построение графа вызовов — построение отношений вызовов через edges_call


LogicSynthesizer

Файл: src/agents/logic_synthesizer.py

Назначение: Синтез объяснений на естественном языке из анализа цепочек вызовов

from src.agents.logic_synthesizer import LogicSynthesizer, format_call_chain_for_prompt

synthesizer = LogicSynthesizer(llm=llm_interface)

# Синтез объяснения
explanation = synthesizer.synthesize(
    question="Как происходит корректное завершение?",
    analysis_result=call_chain_result
)

# Форматирование цепочки вызовов для промпта LLM
formatted = format_call_chain_for_prompt(analysis_result)
# Возвращает: {
#   'entry_point': 'ShutdownXLOG',
#   'call_graph': '- ShutdownXLOG -> CreateCheckPoint, ...',
#   'call_chains': '1. ShutdownXLOG -> ... (length: 4)',
#   'key_functions': '1. CreateCheckPoint (score: 0.9) - xlog.c:1234'
# }

AdaptiveQueryRefiner

Файл: src/agents/adaptive_refiner.py

Назначение: Обучение на результатах выполнения запросов и предложение улучшений

from src.agents.adaptive_refiner import AdaptiveQueryRefiner

refiner = AdaptiveQueryRefiner()

# Записать результат выполнения
refiner.record_outcome(
    question_type="find_callers",
    query_pattern="SELECT ... FROM edges_call ...",
    success=True,
    result_count=5
)

# Получить наилучший шаблон для типа вопроса
best = refiner.get_best_pattern("find_callers")

# Уточнить неудачный запрос
refined = refiner.refine_query(
    original_query="...",
    error="Результаты не найдены",
    context=analysis
)

Вспомогательный класс: QueryPattern — изученный шаблон запроса со свойствами success_rate и confidence.


AnswerGenerator

Файл: src/agents/answer_generator.py

Назначение: Генерация ответов на естественном языке (LLM или шаблонный резерв)

from src.agents.answer_generator import AnswerGenerator

gen = AnswerGenerator(
    llm_interface=llm,           # Опциональный LLM
    code_analyst_title="analyst",  # Заголовок для ИИ-аналитика
    language="ru"                  # Язык промптов
)

# Группировка по тегам
grouped = gen.group_by_tags(functions, enrichment_hints)

# Генерация ответа
answer = gen.generate(
    question="...",
    query="SELECT ...",
    grouped_results=grouped,
    raw_results=[...]
)

Фабрика: create_answer_generator(...) -> AnswerGenerator


ResultReranker

Файл: src/agents/result_reranker.py

Назначение: Переранжирование результатов на основе стратегий

Иерархия стратегий:

  • RerankerStrategy (ABC) — абстрактная база
  • QARerankerStrategy — переранжирование пар вопрос-ответ
  • SQLRerankerStrategy — переранжирование SQL-примеров
  • EnrichmentRerankerStrategy — переранжирование с учётом обогащения

Структура данных: ScoringBoost(name, multiplier, condition) — условное повышение оценки.

from src.agents.result_reranker import (
    create_qa_reranker,
    create_sql_reranker,
    create_enrichment_reranker,
    ScoringBoost,
)

# Создание переранжирователя для пар вопрос-ответ
qa_reranker = create_qa_reranker()

# Переранжирование результатов
reranked = qa_reranker.rerank(
    results=raw_results,
    analysis=analysis,
    query_text="...",
    top_k=5
)

# Пользовательское повышение оценки
boost = ScoringBoost(
    name="domain_match",
    multiplier=1.3,
    condition=lambda item, qt, desc: "lock" in qt.lower()
)

ConfidenceCalculator

Файл: src/agents/confidence_calculator.py

Назначение: Настраиваемый расчёт уверенности через паттерн «Стратегия»

Иерархия стратегий:

  • ConfidenceStrategy (ABC)
  • ResultConfidenceStrategy — на основе количества результатов и статуса выполнения
  • AnalysisConfidenceStrategy — на основе анализа намерения/домена/ключевых слов
  • PatternConfidenceStrategy — на основе успешности сопоставления с шаблонами
  • LLMConfidenceStrategy — на основе сигналов уверенности LLM
from src.agents.confidence_calculator import (
    create_result_calculator,
    create_analysis_calculator,
    create_pattern_calculator,
    ConfidenceFactors,
)

# Расчёт уверенности на основе результатов
calc = create_result_calculator()
score = calc.calculate(
    result_count=5,
    execution_success=True,
    used_fallback=False,
    has_enrichment=True
)

# Расчёт уверенности на основе анализа
analysis_calc = create_analysis_calculator()
score = analysis_calc.calculate(
    intent="find-function",
    domain="locking",
    keywords=["lock", "acquire"],
    domain_confidence=0.8
)

# Контейнер факторов для развёрнутого расчёта
factors = ConfidenceFactors(
    result_count=10,
    execution_success=True,
    intent="find-function",
    domain="locking"
)

PromptBuilder

Файл: src/agents/prompt_builder.py

Назначение: Форматирование промптов для GeneratorAgent

from src.agents.prompt_builder import PromptBuilder

builder = PromptBuilder(semantic_prompts=None)

# Построение обогащённого промпта
prompt = builder.build_enriched_prompt(
    question="Найти методы, работающие с блокировками",
    context={
        'enrichment_hints': hints,
        'similar_qa': qa_examples,
        'sql_examples': sql_examples
    }
)

# Построение простого промпта (без обогащения)
prompt = builder.build_simple_prompt(question, context)

Фабрика: create_prompt_builder(...) -> PromptBuilder


QueryVariantGenerator

Файл: src/agents/query_variants.py

Назначение: Генерация вариантов запросов для подхода «воронка запросов»

from src.agents.query_variants import QueryVariantGenerator

gen = QueryVariantGenerator()

# Генерация вариантов с разной специфичностью
variants = gen.generate_variants(
    question="Найти методы блокировки",
    context={'enrichment_hints': hints},
    num_variants=3
)
# Возвращает: [
#   {'specificity': 'PRECISE', 'query': 'SELECT ...'},   # Высокая точность
#   {'specificity': 'BALANCED', 'query': 'SELECT ...'},   # Баланс
#   {'specificity': 'BROAD', 'query': 'SELECT ...'}       # Высокая полнота
# ]

Фабрика: create_variant_generator() -> QueryVariantGenerator


Резервные стратегии

Файл: src/agents/fallback_strategies.py

Назначение: Интеллектуальные резервные стратегии для низкого покрытия обогащения (фаза 4)

Три класса:

  1. KeywordToTagMapper(similarity_threshold=None) — нечёткое сопоставление ключевых слов с тегами
  2. HybridQueryBuilder() — построение гибридных запросов имя+тег
  3. FallbackStrategySelector() — оркестрация резервных стратегий
from src.agents.fallback_strategies import (
    KeywordToTagMapper,
    HybridQueryBuilder,
    FallbackStrategySelector,
    get_fallback_selector,
)

# Сопоставление ключевых слов с тегами
mapper = KeywordToTagMapper(similarity_threshold=0.6)
new_tags = mapper.map_keywords_to_tags(
    keywords=["allocat", "palloc"],
    existing_tags=[{"category": "function-purpose", "value": "memory-management"}]
)

# Построение гибридного запроса
builder = HybridQueryBuilder()
query = builder.build_hybrid_query(
    name_pattern="lock%",
    tag_filters=[{"category": "function-purpose", "value": "locking"}]
)

# Оркестрация
selector = get_fallback_selector()
result = selector.apply_fallback(
    question="...",
    analysis=analysis,
    hints=current_hints,
    coverage=0.15
)

Прочие вспомогательные модули

Файл Класс/Функция Назначение
semantic_interpreter.py SemanticInterpreter(...) Интерпретация семантических запросов
result_parser.py ResultParser() Разбор списков функций, семантических результатов
tag_effectiveness_tracker.py TagEffectivenessTracker(...), TagUsageRecord Отслеживание эффективности тегов; get_global_tracker()
utils.py sanitize_sql_value(), sanitize_like_pattern() Утилиты очистки SQL

ACP — Agent Client Protocol

Обзор ACP

ACP обеспечивает интеграцию с IDE через JSON-RPC 2.0. Поддерживаемые IDE: Zed, JetBrains, VS Code.

IDE <-> Транспорт (stdio | HTTP | WebSocket) <-> CodeGraphACPAgent <-> Адаптеры <-> Основные сервисы

Точка входа:

python -m src.acp                    # По умолчанию: транспорт stdio
python -m src.acp --transport http   # Транспорт HTTP
python -m src.acp --transport ws     # Транспорт WebSocket

CodeGraphACPAgent

Файл: src/acp/server/agent.py

Назначение: Главный агент ACP — диспетчеризация 25 методов JSON-RPC

from src.acp.server.agent import CodeGraphACPAgent

agent = CodeGraphACPAgent()

# Обработка JSON-RPC запроса
response = await agent.handle_request(request)

Таблица методов JSON-RPC:

Категория Метод Обработчик Потоковый
Инициализация initialize _handle_initialize Нет
Аутентификация authenticate _handle_authenticate Нет
Сессии session/new _handle_session_new Нет
Сессии session/load _handle_session_load Нет
Сессии session/prompt _handle_session_prompt Да
Сессии session/cancel _handle_session_cancel Нет
Файловая система fs/read_text_file _handle_fs_read Нет
Файловая система fs/write_text_file _handle_fs_write Нет
Терминал terminal/create _handle_terminal_create Нет
Терминал terminal/output _handle_terminal_output Нет
Терминал terminal/wait_for_exit _handle_terminal_wait Нет
Терминал terminal/kill _handle_terminal_kill Нет
Терминал terminal/release _handle_terminal_release Нет
Потоки thread/start _handle_thread_start Нет
Потоки thread/resume _handle_thread_resume Нет
Потоки thread/fork _handle_thread_fork Нет
Потоки thread/list _handle_thread_list Нет
Потоки thread/archive _handle_thread_archive Нет
Потоки thread/compact _handle_thread_compact Нет
Ходы turn/start _handle_turn_start Да
Ходы turn/interrupt _handle_turn_interrupt Нет
Диагностика diagnostics/subscribe _handle_diagnostics_subscribe Нет
Диагностика diagnostics/unsubscribe _handle_diagnostics_unsubscribe Нет
Всплывающие подсказки hover/request _handle_hover_request Нет
Одобрение item/fileChange/respondApproval _handle_respond_approval Нет

Отложенно загружаемые свойства:

  • session_managerACPSessionManager
  • chat_adapterChatServiceACPAdapter
  • capability_negotiatorCapabilityNegotiator
  • thread_managerACPThreadManager
  • turn_orchestratorTurnOrchestrator
  • diagnostics_engineDiagnosticsEngine
  • hover_providerHoverProvider
  • approval_bridgeACPApprovalBridge

Типы протокола (модели Pydantic):

Тип Назначение
AgentCapabilities Возможности агента (loadSession, promptCapabilities, mcp, authMethods)
PromptCapabilities Поддерживаемые типы содержимого промптов (image, audio, embeddedContext)
MCPCapabilities Флаги поддержки транспорта MCP (stdio, http, sse)
AgentInfo Информация об агенте (name, version, title)
ClientCapabilities Возможности клиента (fs, terminal)
ClientInfo Информация о клиенте (name, version)
InitializeParams Параметры инициализации (protocolVersion, clientCapabilities, clientInfo)
InitializeResult Результат инициализации (protocolVersion, agentCapabilities, agentInfo)
SessionNewParams Параметры создания сессии (cwd, mcpServers)
SessionNewResult Результат создания сессии (sessionId)
ContentBlock Блок содержимого (type, text, uri, mimeType, data)
SessionPromptParams Параметры промпта (sessionId, prompt)
StopReason Причина остановки (end_turn, max_tokens, cancelled)
JSONRPCRequest Запрос JSON-RPC 2.0
JSONRPCResponse Ответ JSON-RPC 2.0
JSONRPCNotification Уведомление JSON-RPC 2.0 (без id)
PlanEntry Запись плана выполнения (id, title, status)
PlanUpdate Обновление плана
AgentMessageChunk Фрагмент потоковой передачи сообщения агента
ToolCallUpdate Обновление вызова инструмента (toolCallId, title, kind, status, content)

Управление сессиями

Файл: src/acp/server/session_manager.py

Назначение: Управление жизненным циклом сессий ACP с привязкой к сессиям CodeGraph

from src.acp.server.session_manager import ACPSessionManager, ACPSession

manager = ACPSessionManager(
    max_sessions=100,              # Максимум одновременных сессий
    session_timeout_minutes=60     # Таймаут сессии в минутах
)

# Создание сессии
session = await manager.create_session(
    cwd="/path/to/project",
    mcp_servers=[...],
    user_id="user-123"
)

# Загрузка существующей сессии
session = manager.get_session(session_id)

# Отмена сессии
await manager.cancel_session(session_id)

# Очистка устаревших сессий
removed = manager.cleanup_expired()

Модель данных ACPSession:

@dataclass
class ACPSession:
    session_id: str
    cwd: str
    user_id: Optional[str] = None
    project_id: Optional[str] = None      # Контекст мультитенантности
    codegraph_session_id: Optional[str] = None
    mcp_servers: List[Dict[str, Any]] = field(default_factory=list)
    created_at: datetime = field(default_factory=_utc_now)
    last_activity: datetime = field(default_factory=_utc_now)
    cancelled: bool = False
    context: List[Dict[str, str]] = field(default_factory=list)

Согласование возможностей

Файл: src/acp/server/capabilities.py

Назначение: Обратно совместимое согласование возможностей между старыми и новыми клиентами ACP

from src.acp.server.capabilities import CapabilityNegotiator, ClientCapabilitiesV2

negotiator = CapabilityNegotiator()

# Согласование при инициализации
server_caps = negotiator.negotiate(initialize_params)

# Проверка возможностей клиента
if negotiator.client.diagnostics:
    # Клиент поддерживает диагностику
    pass

# Проверка необходимости отправки уведомления
should_send = negotiator.should_send_notification("diagnostics/publish")

Расширенные возможности клиента (ClientCapabilitiesV2):

@dataclass
class ClientCapabilitiesV2:
    fs: Optional[Dict[str, bool]] = None
    terminal: bool = False
    # Флаги возможностей F8 (по умолчанию False для обратной совместимости)
    diagnostics: bool = False
    hover: bool = False
    approval_flow: bool = False
    opt_out_notification_methods: List[str] = field(default_factory=list)

Серверные возможности:

_SERVER_CAPABILITIES = {
    "threadManagement": True,
    "diagnostics": True,
    "hover": True,
    "approvalFlow": True,
}

Жизненный цикл потоков и ходов

Файл (потоки): src/acp/server/thread_manager.py Файл (ходы): src/acp/server/turn_orchestrator.py

ACPThreadManager — управление жизненным циклом потоков через прослойку Harness:

from src.acp.server.thread_manager import ACPThreadManager

tm = ACPThreadManager()

# Создание потока
thread = await tm.start_thread(cwd="/project", name="Analysis")
# Возвращает: {'threadId': '...', 'name': 'Analysis', 'status': 'active', ...}

# Возобновление потока
thread = await tm.resume_thread(thread_id="...")

# Ответвление потока
forked = await tm.fork_thread(thread_id="...", name="Branch")

# Список потоков
threads = await tm.list_threads()

# Архивация потока
await tm.archive_thread(thread_id="...")

# Компактизация потока (удаление промежуточных ходов)
await tm.compact_thread(thread_id="...")

TurnOrchestrator — оркестрация выполнения хода с потоковой передачей:

from src.acp.server.turn_orchestrator import TurnOrchestrator

to = TurnOrchestrator()

# Начало хода (выполнение в фоне, немедленный ответ)
result = await to.start_turn(
    thread_id="...",
    prompt="Проанализировать безопасность",
    context={"scenario_id": "security_workflow"},
    notification_router=router
)
# Возвращает: {'threadId': '...', 'turnId': '...'}

# Прерывание хода
await to.interrupt_turn(thread_id="...", turn_id="...")

Потоковая передача элементов следует паттерну started -> delta -> completed и доставляется через NotificationRouter.


Диагностика и всплывающие подсказки

Файл (диагностика): src/acp/server/diagnostics.py Файл (подсказки): src/acp/server/hover.py

DiagnosticsEngine — генерация диагностик из предвычисленных флагов CPG:

from src.acp.server.diagnostics import DiagnosticsEngine

diag = DiagnosticsEngine()

# Подписка на диагностику файла
diag.subscribe(uri="file:///src/main.py", connection_id="conn-1")

# Генерация диагностик
diagnostics = await diag.generate(
    uri="file:///src/main.py",
    db_path="/path/to/cpg.duckdb",
    notification_router=router
)

# Отписка
diag.unsubscribe(uri="file:///src/main.py", connection_id="conn-1")

# Проверка подписки
is_sub = diag.is_subscribed("file:///src/main.py")

Источники диагностик: - Цикломатическая сложность (has_high_complexity) - Устаревшие методы (has_deprecated) - Комментарии TODO/FIXME (has_todo_fixme) - Высокий fan-in (fan_in выше порога) - Анализ заражения (taint analysis)

HoverProvider — разрешение всплывающих подсказок для позиций в исходном коде:

from src.acp.server.hover import HoverProvider

hover = HoverProvider()

# Получить информацию при наведении
info = await hover.get_hover_info(
    uri="file:///src/main.py",
    line=42,
    character=10
)
# Возвращает: {
#   'contents': {
#     'method': 'process_request',
#     'signature': 'def process_request(self, data: bytes) -> Response',
#     'metrics': {'complexity': 12, 'fan_in': 5, 'fan_out': 8},
#     'flags': {'has_deprecated': False, 'has_todo': True},
#     'callers': ['handle_connection', ...],
#     'callees': ['validate_input', 'execute_query', ...]
#   }
# }

Мост одобрения

Файл: src/acp/server/approval_bridge.py

Назначение: Мост между механизмом одобрения Harness и уведомлениями ACP для утверждения изменений файлов клиентом IDE

from src.acp.server.approval_bridge import ACPApprovalBridge

bridge = ACPApprovalBridge()

# Запрос одобрения изменения файла (блокирует до ответа клиента)
decision = await bridge.request_file_change_approval(
    thread_id="...",
    turn_id="...",
    item_id="...",
    diff="--- a/file.py\n+++ b/file.py\n...",
    files=["src/file.py"],
    reason="Рефакторинг метода process_request",
    confidence=0.85,
    notification_router=router,
    timeout=300  # секунд
)
# Возвращает: {'decision': 'accept'} или {'decision': 'decline', 'reason': '...'}

# Обработка ответа клиента
await bridge.resolve(request_id="...", decision={"decision": "accept"})

Если клиент не отвечает в течение таймаута, запрос автоматически отклоняется.


Маршрутизатор уведомлений

Файл: src/acp/server/notification_router.py

Назначение: Маршрутизация уведомлений JSON-RPC с учётом отказов клиента от определённых типов уведомлений

from src.acp.server.notification_router import NotificationRouter

router = NotificationRouter(
    callback=notification_callback,    # Функция обратного вызова
    negotiator=capability_negotiator   # Согласователь возможностей
)

# Отправка уведомления (с учётом фильтрации отказов)
sent = await router.send(
    method="diagnostics/publish",
    params={"uri": "file:///src/main.py", "diagnostics": [...]}
)
# Возвращает: True если отправлено, False если подавлено

Маршрутизатор автоматически проверяет opt_out_notification_methods клиента через CapabilityNegotiator.


Обработчики файлов и терминала

Файл: src/acp/server/handlers.py

Назначение: Реализация операций файловой системы и терминала

Файловая система:

# Чтение файла (сначала проверяет кеш CPG)
result = await handle_fs_read({"path": "/abs/path/to/file.py"})
# Возвращает: {'content': '...', 'encoding': 'utf-8'}

# Запись файла
result = await handle_fs_write({
    "path": "/abs/path/to/file.py",
    "content": "...",
    "encoding": "utf-8"
})

Терминал:

# Создание терминальной сессии
result = await handle_terminal_create({
    "command": "python",
    "args": ["-m", "pytest", "tests/"],
    "cwd": "/project"
})
# Возвращает: {'terminalId': '...'}

# Получение вывода
output = await handle_terminal_output({"terminalId": "..."})

# Ожидание завершения
exit_info = await handle_terminal_wait({"terminalId": "..."})

# Принудительное завершение
await handle_terminal_kill({"terminalId": "..."})

# Освобождение ресурсов
await handle_terminal_release({"terminalId": "..."})

Транспортный уровень

Пакет: src/acp/transport/

Базовый класс:

from src.acp.transport.base import BaseTransport

class BaseTransport(abc.ABC):
    """Абстрактный базовый класс для транспортов ACP."""

    def __init__(self, agent: CodeGraphACPAgent):
        self.agent = agent
        self._running = False

    @property
    def is_running(self) -> bool: ...

    @abc.abstractmethod
    async def start(self) -> None: ...

    @abc.abstractmethod
    async def stop(self) -> None: ...

    @abc.abstractmethod
    async def send(self, message: Any) -> None: ...

Реализации:

Транспорт Файл Назначение
StdioTransport transport/stdio.py Стандартный ввод/вывод для локального подпроцесса
HTTPTransport transport/http.py REST API для удалённого доступа
WebSocketTransport transport/websocket.py Потоковая передача для обновлений в реальном времени

Использование:

from src.acp.transport.stdio import StdioTransport
from src.acp.transport.http import HTTPTransport
from src.acp.transport.websocket import WebSocketTransport

# Stdio (по умолчанию для IDE)
transport = StdioTransport(agent)
await transport.start()

# HTTP
transport = HTTPTransport(agent, host="0.0.0.0", port=8080)
await transport.start()

# WebSocket
transport = WebSocketTransport(agent, host="0.0.0.0", port=8081)
await transport.start()

Адаптеры интеграции

Пакет: src/acp/integration/

ChatServiceACPAdapter

Файл: src/acp/integration/chat_adapter.py

Назначение: Мост между промптами ACP и ChatService CodeGraph

from src.acp.integration.chat_adapter import ChatServiceACPAdapter

adapter = ChatServiceACPAdapter()

# Преобразование ContentBlock[] -> строка запроса
query = adapter.extract_query(content_blocks)

# Обработка промпта через ChatService
result = await adapter.process_prompt(
    session_id="...",
    content_blocks=[...],
    notification_callback=callback
)

Преобразования: - ContentBlock[] -> строка запроса - ChatResponse -> SessionUpdate[] - Evidence -> содержимое ToolCall

SessionACPAdapter

Файл: src/acp/integration/session_adapter.py

Назначение: Мост между сессиями ACP и SessionService CodeGraph

from src.acp.integration.session_adapter import SessionACPAdapter

adapter = SessionACPAdapter()

# Создание сессии CodeGraph
session_id = await adapter.create_session(
    user_id="user-123",
    metadata={"source": "acp"}
)

# Воспроизведение истории диалога для session/load
history = await adapter.replay_history(session_id)

WorkflowACPAdapter

Файл: src/acp/integration/workflow_adapter.py

Назначение: Сопоставление сценариев CodeGraph с вызовами инструментов ACP

from src.acp.integration.workflow_adapter import WorkflowACPAdapter

adapter = WorkflowACPAdapter()

# Получить тип инструмента ACP для сценария
kind = adapter.get_tool_kind("security_workflow")  # "search"
title = adapter.get_scenario_title("security_workflow")  # "Security Analysis"

# Выполнить сценарий с потоковой передачей обновлений
result = await adapter.execute_scenario(
    scenario_id="security_workflow",
    query="Проверить безопасность модуля аутентификации",
    context={},
    notification_router=router
)

Сопоставление сценариев с типами инструментов:

Сценарий Тип инструмента Заголовок
onboarding_workflow search Codebase Exploration
security_workflow search Security Analysis
performance_workflow search Performance Analysis
refactoring_workflow edit Refactoring Suggestions
documentation_workflow read Documentation Generation
feature_dev_workflow think Feature Development Guide
code_review_workflow read Code Review
architecture_workflow think Architecture Analysis
debugging_workflow search Debugging Session

MCPBridge

Файл: src/acp/integration/mcp_bridge.py

Назначение: Интеграция внешних MCP-серверов, предоставленных клиентом

from src.acp.integration.mcp_bridge import MCPBridge, MCPServer, MCPTool, MCPResource

bridge = MCPBridge()

# Подключение к MCP-серверу
server = MCPServer(
    id="external-db",
    name="Database Tools",
    transport="stdio",
    command="npx",
    args=["-y", "@modelcontextprotocol/server-postgres"]
)
await bridge.connect(server)

# Список инструментов
tools = await bridge.list_tools(server_id="external-db")

# Вызов инструмента
result = await bridge.call_tool(
    server_id="external-db",
    tool_name="query",
    arguments={"sql": "SELECT * FROM users LIMIT 5"}
)

# Список ресурсов
resources = await bridge.list_resources(server_id="external-db")

# Отключение
await bridge.disconnect(server_id="external-db")

Модели данных:

@dataclass
class MCPServer:
    id: str
    name: str
    transport: str          # stdio, http, sse
    command: Optional[str] = None    # Для stdio
    args: Optional[List[str]] = None
    url: Optional[str] = None        # Для http/sse
    headers: Optional[Dict[str, str]] = None
    env: Optional[Dict[str, str]] = None

@dataclass
class MCPTool:
    name: str
    description: str
    input_schema: Dict[str, Any]
    server_id: str

@dataclass
class MCPResource:
    uri: str
    name: str
    description: Optional[str] = None
    mime_type: Optional[str] = None
    server_id: str = ""

CLI ACP

Файл: src/acp/cli.py

Назначение: Точка входа для запуска CodeGraph как ACP-агента в режиме подпроцесса

# Запуск ACP-агента (stdio транспорт)
python -m src.acp.cli

# Или через установленную команду
codegraph-acp
# Программное использование
from src.acp.cli import main

await main()

Логирование направляется в stderr (так как stdout используется для протокола). Автоматически снижается уровень логирования для httpx и httpcore.


Доменные агенты

Агенты безопасности

Файл: src/security/security_agents.py

Назначение: Обнаружение уязвимостей безопасности с использованием шаблонов CPG

Четыре специализированных агента:

  1. SecurityScanner — поиск уязвимостей в CPG с использованием шаблонов безопасности
  2. DataFlowAnalyzer — трассировка потоков данных от источников заражения к приёмникам
  3. VulnerabilityReporter — генерация структурированных отчётов об уязвимостях
  4. RemediationAdvisor — рекомендации по исправлению на основе шаблонов уязвимостей
from src.security.security_agents import (
    SecurityScanner,
    DataFlowAnalyzer,
    VulnerabilityReporter,
    RemediationAdvisor,
    SecurityFinding,
    DataFlowPath,
    VulnerabilityReport,
)

# Инициализация сканера
scanner = SecurityScanner(cpg_service=cpg_service)

# Сканирование на уязвимости
findings: List[SecurityFinding] = await scanner.scan(
    patterns=get_critical_patterns(),
    scope="full"
)

# Анализ потоков данных
flow_analyzer = DataFlowAnalyzer(cpg_service=cpg_service)
paths: List[DataFlowPath] = await flow_analyzer.trace_taint_flows(
    source_pattern="user_input",
    sink_pattern="sql_execute"
)

# Генерация отчёта
reporter = VulnerabilityReporter()
report: VulnerabilityReport = reporter.generate(
    findings=findings,
    data_flows=paths
)

# Рекомендации по исправлению
advisor = RemediationAdvisor()
fixes = advisor.suggest_fixes(findings)

Обнаруживаемые шаблоны: - SQL-инъекции (CWE-89) - Переполнение буфера (CWE-120) - Инъекции команд (CWE-78) - Строки форматирования (CWE-134)

Структуры данных:

@dataclass
class SecurityFinding:
    finding_id: str
    pattern_id: str
    pattern_name: str
    category: str
    severity: str
    method_id: int
    method_name: str
    filename: str
    line_number: int
    code_snippet: str
    description: str
    cwe_ids: list[str]
    confidence: float

@dataclass
class DataFlowPath:
    path_id: str
    source_method: str
    source_file: str
    source_line: int
    sink_method: str
    sink_file: str
    sink_line: int
    path_length: int
    intermediate_nodes: list[dict[str, Any]]
    taint_type: str       # "user_input", "file_read" и т.д.
    sanitized: bool       # Включает ли путь санитизацию

Агенты производительности

Файл: src/performance/agents/ (фасад: src/performance/performance_agents.py)

Назначение: Обнаружение узких мест производительности

Три специализированных агента:

  1. PerformanceProfiler (src/performance/agents/profiler.py) — профилирование методов по шаблонам узких мест
  2. ResourceAnalyzer (src/performance/agents/resource_analyzer.py) — анализ использования ресурсов отдельных методов
  3. OptimizationAdvisor (src/performance/agents/optimizer.py) — рекомендации по оптимизации
from src.performance.agents import (
    PerformanceProfiler,
    ResourceAnalyzer,
    OptimizationAdvisor,
    run_complete_performance_analysis,
)

# Профилирование (поддержка контекстного менеджера)
with PerformanceProfiler(cpg_service) as profiler:
    bottlenecks = profiler.profile_all_bottlenecks(limit_per_pattern=30)

# Анализ ресурсов
with ResourceAnalyzer(cpg_service) as analyzer:
    usage = analyzer.analyze_method_resources("method_name", filename="file.py")

# Рекомендации
advisor = OptimizationAdvisor()
plan = advisor.create_optimization_plan(bottlenecks, [usage])
report = advisor.generate_report(bottlenecks, [usage])

# Полный анализ (все три агента)
report = run_complete_performance_analysis(cpg_service, limit=30)

Модели данных: BottleneckFinding, ResourceUsage, OptimizationRecommendation, PerformanceReport, ProfilingResult, MemoryProfilingResult, PerformanceBaseline, PerformanceTrend


Агенты архитектуры

Файл: src/architecture/agents/ (фасад: src/architecture/architecture_agents.py)

Назначение: Анализ архитектурных шаблонов

Три специализированных агента:

  1. DependencyAnalyzer — анализ зависимостей между модулями
  2. LayerValidator — проверка соблюдения слоёв архитектуры
  3. ArchitectureReporter — генерация отчётов об архитектуре
from src.architecture.agents import (
    DependencyAnalyzer,
    LayerValidator,
    ArchitectureReporter,
    DependencyAnalysis,
    DependencyMetrics,
    ViolationFinding,
    LayerRule,
    RemediationAction,
    ArchitectureReport,
)

# Анализ зависимостей
dep_analyzer = DependencyAnalyzer(cpg_service=cpg_service)
analysis: DependencyAnalysis = await dep_analyzer.analyze()
metrics: DependencyMetrics = analysis.metrics

# Валидация слоёв
validator = LayerValidator(rules=[
    LayerRule(name="presentation", allowed_deps=["service"]),
    LayerRule(name="service", allowed_deps=["repository"]),
    LayerRule(name="repository", allowed_deps=[]),
])
violations: List[ViolationFinding] = await validator.validate(cpg_service)

# Генерация отчёта
reporter = ArchitectureReporter()
report: ArchitectureReport = reporter.generate(analysis, violations)

Анализ: - Границы слоёв - Зависимости между модулями - Обнаружение шаблонов проектирования - Сопоставление подсистем


Справочник конфигурации

AgentConfidenceConfig

Конфигурация расчёта уверенности агентов в config.yaml:

# config.yaml -> confidence
confidence:
  result_base: 0.5              # Базовая уверенность при наличии результатов
  result_sweet_spot_min: 1      # Минимум результатов для полной уверенности
  result_sweet_spot_max: 10     # Максимум результатов до штрафа
  result_too_many_penalty: 0.1  # Штраф за слишком много результатов
  fallback_penalty: 0.15        # Штраф за использование резервной стратегии
  enrichment_bonus: 0.1         # Бонус за наличие обогащения
  no_results_confidence: 0.1    # Уверенность при пустых результатах
  execution_failure: 0.0        # Уверенность при ошибке выполнения

Доступ из кода:

from src.config import get_unified_config

cfg = get_unified_config()
cfg.confidence.result_base          # 0.5
cfg.confidence.fallback_penalty     # 0.15
cfg.confidence.enrichment_bonus     # 0.1

ACPDiagnosticsConfig

Конфигурация диагностик ACP в config.yaml:

# config.yaml -> acp
acp:
  diagnostics:
    complexity_threshold: 15     # Порог цикломатической сложности
    fan_in_threshold: 10         # Порог fan-in для предупреждений
    enable_taint: true           # Включить анализ заражения
    max_diagnostics_per_file: 50 # Максимум диагностик на файл
  hover:
    caller_callee_limit: 10      # Максимум вызывающих/вызываемых во всплывающей подсказке
  session:
    max_sessions: 100            # Максимум одновременных сессий
    timeout_minutes: 60          # Таймаут сессии

Доступ из кода:

from src.config import get_unified_config

cfg = get_unified_config()
cfg.acp.diagnostics.complexity_threshold  # 15
cfg.acp.hover.caller_callee_limit         # 10
cfg.acp.session.max_sessions              # 100

Расширение системы агентов

Добавление нового конвейерного агента

Создайте нового агента, следуя базовому шаблону:

from typing import Dict, List, Optional
from dataclasses import dataclass, field


@dataclass
class AgentState:
    """Общее состояние, разделяемое между агентами."""
    question: str
    analysis: Optional[Dict] = None
    retrieval_results: List = field(default_factory=list)
    enrichments: Dict = field(default_factory=dict)
    generated_query: Optional[str] = None
    execution_results: List = field(default_factory=list)
    answer: Optional[str] = None
    confidence: float = 0.0
    errors: List[str] = field(default_factory=list)


class CustomAgent:
    """Пример пользовательского агента."""

    def __init__(self, cpg_config: Optional[CPGConfig] = None):
        from src.config import get_global_cpg_config
        self.cpg_config = cpg_config or get_global_cpg_config()

    def process(self, state: AgentState) -> AgentState:
        """Обрабатывает состояние и возвращает обновлённое состояние."""
        # Ваша логика здесь
        state.custom_results = self._analyze(state.question)
        return state

    def _analyze(self, question: str) -> Dict:
        """Внутренний метод анализа."""
        pass

Зарегистрируйте в рабочем процессе:

from src.workflow import register_agent

register_agent("custom", CustomAgent)

Добавление обработчиков ACP

Для добавления нового метода JSON-RPC:

  1. Определите обработчик в CodeGraphACPAgent:
# В src/acp/server/agent.py
class CodeGraphACPAgent:
    def __init__(self):
        # ... существующие обработчики ...
        self._handlers["custom/analyze"] = self._handle_custom_analyze

    async def _handle_custom_analyze(
        self, params: Dict[str, Any], **kwargs
    ) -> Dict[str, Any]:
        """Обработчик пользовательского метода."""
        uri = params.get("uri")
        if not uri:
            raise ValueError("uri is required")

        # Ваша логика
        result = await self._do_analysis(uri)
        return {"status": "ok", "result": result}
  1. При необходимости добавьте согласование возможностей:
# В src/acp/server/capabilities.py
@dataclass
class ClientCapabilitiesV2:
    # ... существующие поля ...
    custom_analyze: bool = False  # Новая возможность
  1. Добавьте тесты:
# В tests/acp/test_custom.py
async def test_custom_analyze():
    agent = CodeGraphACPAgent()
    result = await agent.handle_request(JSONRPCRequest(
        method="custom/analyze",
        params={"uri": "file:///src/main.py"}
    ))
    assert result.result["status"] == "ok"

Дальнейшие шаги