Этот документ описывает систему агентов CodeGraph: основные конвейерные агенты, ACP-интеграцию и специализированные доменные анализаторы.
Содержание¶
- Обзор
- Архитектура системы агентов
- Карта модулей
- Основные конвейерные агенты
- AnalyzerAgent
- RetrieverAgent
- EnrichmentAgent
- GeneratorAgent
- InterpreterAgent
- Вспомогательные модули конвейера
- CallChainAnalyzer
- ControlFlowGenerator
- LogicSynthesizer
- AdaptiveQueryRefiner
- AnswerGenerator
- ResultReranker
- ConfidenceCalculator
- PromptBuilder
- QueryVariantGenerator
- Резервные стратегии
- Прочие вспомогательные модули
- ACP — Agent Client Protocol
- Обзор ACP
- CodeGraphACPAgent
- Управление сессиями
- Согласование возможностей
- Жизненный цикл потоков и ходов
- Диагностика и всплывающие подсказки
- Мост одобрения
- Маршрутизатор уведомлений
- Обработчики файлов и терминала
- Транспортный уровень
- Адаптеры интеграции
- CLI ACP
- Доменные агенты
- Агенты безопасности
- Агенты производительности
- Агенты архитектуры
- Справочник конфигурации
- AgentConfidenceConfig
- ACPDiagnosticsConfig
- Расширение системы агентов
- Добавление нового конвейерного агента
- Добавление обработчиков ACP
- Дальнейшие шаги
Обзор¶
Архитектура системы агентов¶
Система агентов CodeGraph разделена на три уровня:
- Конвейерные агенты (
src/agents/) — 5 основных + 15 вспомогательных модулей для RAG-конвейера - ACP — Agent Client Protocol (
src/acp/) — интеграция с IDE через JSON-RPC 2.0 - Доменные агенты — анализ безопасности, производительности, архитектуры
graph TB
subgraph "Pipeline Agents (src/agents/)"
AA[AnalyzerAgent] --> RA[RetrieverAgent]
RA --> EA[EnrichmentAgent]
EA --> GA[GeneratorAgent]
GA --> IA[InterpreterAgent]
RA -.-> CCA[CallChainAnalyzer]
RA -.-> CFG[ControlFlowGenerator]
GA -.-> PB[PromptBuilder]
GA -.-> QV[QueryVariantGenerator]
IA -.-> AG[AnswerGenerator]
IA -.-> RP[ResultParser]
end
subgraph "ACP Protocol (src/acp/)"
IDE[IDE Client] -->|JSON-RPC| ACPA[CodeGraphACPAgent]
ACPA --> SM[SessionManager]
ACPA --> TM[ThreadManager]
ACPA --> TO[TurnOrchestrator]
ACPA --> DE[DiagnosticsEngine]
ACPA --> HP[HoverProvider]
ACPA -->|adapters| WA[WorkflowAdapter]
WA --> MSC[MultiScenarioCopilot]
end
subgraph "Domain Agents"
SS[SecurityScanner]
PP[PerformanceProfiler]
DA[DependencyAnalyzer]
end
Карта модулей¶
| Пакет | Файлов | Ключевые классы | Назначение |
|---|---|---|---|
src/agents/ |
20 | AnalyzerAgent, RetrieverAgent, EnrichmentAgent, GeneratorAgent, InterpreterAgent + 15 вспомогательных | RAG-конвейер запросов |
src/agents/enrichment/ |
6 | EnrichmentAgent + вспомогательные | Обогащение на основе тегов |
src/acp/server/ |
8 | CodeGraphACPAgent, ACPSessionManager и др. | Сервер протокола ACP |
src/acp/transport/ |
4 | BaseTransport, StdioTransport, HTTPTransport, WebSocketTransport | Транспортный уровень |
src/acp/integration/ |
4 | ChatServiceACPAdapter, MCPBridge и др. | Адаптеры интеграции |
src/security/ |
1 | SecurityScanner + 3 агента | Анализ безопасности |
src/performance/agents/ |
4 | PerformanceProfiler + 2 агента | Анализ производительности |
src/architecture/agents/ |
4 | DependencyAnalyzer + 2 агента | Анализ архитектуры |
Основные конвейерные агенты¶
AnalyzerAgent¶
Файл: src/agents/analyzer_agent.py
Назначение: Понимание вопроса и извлечение намерений
from src.agents import AnalyzerAgent
analyzer = AnalyzerAgent(
llm=None, # Опциональный LLM для расширенного анализа
cpg_config: Optional[CPGConfig] = None # Доменная конфигурация
)
Обязанности: - Извлечение намерения пользователя из естественного языка - Определение целевого домена/подсистемы - Извлечение релевантных ключевых слов - Классификация типа запроса (семантический/структурный/безопасность)
Ключевые методы:
# Анализ вопроса — возвращает намерение, домен, ключевые слова, уверенность
analysis = analyzer.analyze("Какие методы обрабатывают транзакции?")
# Возвращает: {'intent': 'find_methods', 'domain': 'transaction-manager',
# 'keywords': ['transaction'], 'query_type': 'semantic', 'confidence': 0.85}
# Классификация режима запроса (семантический vs структурный)
mode, confidence = analyzer.classify_query_mode("Найти всех вызывающих foo")
# Анализ с помощью LLM
analysis = analyzer.analyze_with_llm("Объясните процесс фиксации транзакции")
# Пакетный анализ
results = analyzer.batch_analyze(["Q1...", "Q2...", "Q3..."])
# Получить фильтр ChromaDB для домена
domain_filter = analyzer.get_domain_filter("transaction-manager")
RetrieverAgent¶
Файл: src/agents/retriever_agent.py
Назначение: Гибридный поиск, объединяющий семантический (ChromaDB) и структурный (DuckDB) поиск
from src.agents import RetrieverAgent
retriever = RetrieverAgent(
vector_store=vector_store, # Экземпляр VectorStoreReal
analyzer_agent=analyzer, # Экземпляр AnalyzerAgent
cpg_service=None, # Опциональный CPGQueryService для графовых запросов
cache_size: int = 128, # Максимум кешированных записей
cache_ttl_seconds: Optional[float] = None, # TTL кеша (None = без ограничения)
enable_cache_metrics: bool = True, # Сбор метрик кеша
enable_hybrid: bool = True # Включить гибридный поиск
)
Обязанности: - Параллельный асинхронный поиск по векторам и графу - Объединение результатов на основе RRF (Reciprocal Rank Fusion) - Кросс-источниковое ранжирование - Кеширование результатов
Ключевые методы:
# Базовый векторный поиск
result = retriever.retrieve(question, analysis, top_k_qa=3, top_k_sql=5)
# Гибридный поиск (вектор + граф с RRF)
result = retriever.retrieve_hybrid(
question="Найти шаблоны выделения памяти",
analysis=analysis,
mode="hybrid", # hybrid | vector_only | graph_only
query_type="structural", # Влияет на распределение весов
top_k=10,
use_ranker=True
)
# С переранжированием
result = retriever.retrieve_with_reranking(question, analysis,
top_k_qa=5, top_k_sql=10, final_k_qa=3, final_k_sql=5)
# С переранжированием с учётом обогащения
result = retriever.retrieve_with_enrichment(question, analysis, enrichment_hints)
# Поиск по ключевым словам
result = retriever.retrieve_by_keywords(["lock", "acquire"], top_k=5)
# Управление кешем
stats = retriever.get_stats()
metrics = retriever.get_cache_metrics()
cleared = retriever.invalidate_cache(pattern="lock*")
warmed = retriever.warm_cache(["Q1", "Q2"])
Адаптация весов:
| Тип запроса | Вес вектора | Вес графа |
|---|---|---|
| semantic | 0.75 | 0.25 |
| structural | 0.25 | 0.75 |
| security | 0.50 | 0.50 |
| default | 0.60 | 0.40 |
EnrichmentAgent¶
Файл: src/agents/enrichment/agent.py (фасад: src/agents/enrichment_agent.py)
Назначение: Сопоставление вопросов с тегами обогащения CPG для улучшения поиска
from src.agents import EnrichmentAgent
enrichment = EnrichmentAgent(
enable_fallback: bool = True # Включить резервные стратегии фазы 4
)
Обязанности: - 12-уровневое семантическое обогащение - Маркировка доменных концепций - Обнаружение свойств ACID - Аннотация атрибутов безопасности
Слои обогащения:
- Архитектурный уровень (access-method, storage-engine и т.д.)
- Свойства ACID
- Шаблоны многопоточности
- Показатели производительности
- Атрибуты безопасности
- Шаблоны обработки ошибок
- Управление памятью
- Примитивы блокировок
- Границы транзакций
- Категории потоков данных
- Шаблоны потоков управления
- Доменно-специфичные концепции
Ключевые методы:
# Получить подсказки обогащения для вопроса
hints = enrichment.get_enrichment_hints(
question="Как работает менеджер блокировок?",
analysis={'intent': 'explain-concept', 'domain': 'locking', 'keywords': ['lock']}
)
Подпакет обогащения (src/agents/enrichment/):
| Файл | Функция | Назначение |
|---|---|---|
coverage.py |
calculate_coverage(hints) |
Расчёт покрытия тегами (0.0–1.0) |
fallback.py |
general_domain_fallback(hints) |
Применение резервных стратегий при низком покрытии |
keyword_matchers.py |
enhance_with_keywords(hints, keywords) |
Добавление тегов по ключевым словам |
prompt_formatter.py |
format_for_prompt(hints) |
Форматирование подсказок для промпта LLM |
tag_filters.py |
generate_tag_filters(hints) |
Генерация фильтров тегов ChromaDB |
GeneratorAgent¶
Файл: src/agents/generator_agent.py
Назначение: Генерация SQL-запросов из естественного языка с использованием обогащённого контекста
from src.agents import GeneratorAgent
generator = GeneratorAgent(
sql_generator: SQLQueryGenerator, # Бэкенд генерации SQL
enable_feedback: bool = True, # Включить отслеживание обратной связи по тегам
use_semantic: bool = False # Использовать семантический режим
)
Обязанности: - Генерация SQL-запросов к базе данных CPG - Сопоставление с шаблонами - Оптимизация запросов
Ключевые методы:
# Генерация SQL-запроса
query, is_valid, error = generator.generate(question, context)
# С повторными попытками
query, is_valid, error, attempts = generator.generate_with_retries(
question, context, max_retries=2
)
# Пакетная генерация
results = generator.generate_batch(questions, contexts)
# Воронка запросов — генерация нескольких вариантов
variants = generator.generate_query_variants(question, context, num_variants=3)
# Объяснение запроса
explanation = generator.explain_query(query, context)
Делегирует: PromptBuilder (форматирование промптов), QueryVariantGenerator (генерация вариантов), TagEffectivenessTracker (обратная связь).
InterpreterAgent¶
Файл: src/agents/interpreter_agent.py
Назначение: Преобразование результатов выполнения запросов в ответы на естественном языке
from src.agents.interpreter_agent import InterpreterAgent
interpreter = InterpreterAgent(
llm_interface=None, # Опциональный LLM (резерв — шаблоны)
use_semantic: bool = False, # Семантический режим (извлечение комментариев)
cpg_config: Optional[CPGConfig] = None # Доменная конфигурация
)
Обязанности: - Синтез результатов - Оценка уверенности - Указание источников - Форматирование ответа
Ключевые методы:
# Интерпретация результатов
answer = interpreter.interpret(
question="Какие методы вызывают LWLockAcquire?",
results=[...],
query="SELECT..."
)
# Результат
{
'answer': 'LWLockAcquire вызывается 15 методами, включая...',
'confidence': 0.85,
'sources': [
{'method': 'heap_insert', 'file': 'heapam.c', 'line': 234},
...
]
}
Делегирует трём вспомогательным классам:
- ResultParser — разбор различных форматов результатов
- AnswerGenerator — генерация ответов (LLM или шаблоны)
- SemanticInterpreter — интерпретация семантических запросов
Вспомогательные модули конвейера¶
CallChainAnalyzer¶
Файл: src/agents/call_chain_analyzer.py
Назначение: Построение цепочек вызовов и графов вызовов из результатов запросов
from src.agents.call_chain_analyzer import CallChainAnalyzer
analyzer = CallChainAnalyzer()
# Анализ результатов CPGQL
result = analyzer.analyze(
entry_point_result=entry_point,
keyword_methods_result=methods,
call_graph_result=graph,
question_keywords=["lock", "acquire"]
)
# Возвращает: {
# 'entry_point': 'LWLockAcquire',
# 'call_graph': {'LWLockAcquire': ['LWLockWaitForVar', ...]},
# 'call_chains': [{'path': [...], 'length': 4}],
# 'key_functions': [{'method': '...', 'score': 0.9, 'file': '...'}],
# 'metadata': {...}
# }
ControlFlowGenerator¶
Файл: src/agents/control_flow_generator.py
Назначение: Генерация SQL-запросов для анализа потока управления и трассировки цепочек вызовов
from src.agents.control_flow_generator import ControlFlowGenerator
cfg_gen = ControlFlowGenerator(llm=None)
# Генерация SQL для обнаружения точки входа
entry_sql = cfg_gen.generate_entry_point_query("shutdown")
# Генерация SQL для поиска методов по ключевым словам
keyword_sql = cfg_gen.generate_keyword_methods_query(["lock", "acquire"])
# Генерация SQL для графа вызовов
call_graph_sql = cfg_gen.generate_call_graph_query("LWLockAcquire", depth=5)
Стратегии:
1. Обнаружение точки входа — поиск основного метода, обрабатывающего механизм
2. Поиск по ключевым словам — поиск связанных методов
3. Построение графа вызовов — построение отношений вызовов через edges_call
LogicSynthesizer¶
Файл: src/agents/logic_synthesizer.py
Назначение: Синтез объяснений на естественном языке из анализа цепочек вызовов
from src.agents.logic_synthesizer import LogicSynthesizer, format_call_chain_for_prompt
synthesizer = LogicSynthesizer(llm=llm_interface)
# Синтез объяснения
explanation = synthesizer.synthesize(
question="Как происходит корректное завершение?",
analysis_result=call_chain_result
)
# Форматирование цепочки вызовов для промпта LLM
formatted = format_call_chain_for_prompt(analysis_result)
# Возвращает: {
# 'entry_point': 'ShutdownXLOG',
# 'call_graph': '- ShutdownXLOG -> CreateCheckPoint, ...',
# 'call_chains': '1. ShutdownXLOG -> ... (length: 4)',
# 'key_functions': '1. CreateCheckPoint (score: 0.9) - xlog.c:1234'
# }
AdaptiveQueryRefiner¶
Файл: src/agents/adaptive_refiner.py
Назначение: Обучение на результатах выполнения запросов и предложение улучшений
from src.agents.adaptive_refiner import AdaptiveQueryRefiner
refiner = AdaptiveQueryRefiner()
# Записать результат выполнения
refiner.record_outcome(
question_type="find_callers",
query_pattern="SELECT ... FROM edges_call ...",
success=True,
result_count=5
)
# Получить наилучший шаблон для типа вопроса
best = refiner.get_best_pattern("find_callers")
# Уточнить неудачный запрос
refined = refiner.refine_query(
original_query="...",
error="Результаты не найдены",
context=analysis
)
Вспомогательный класс: QueryPattern — изученный шаблон запроса со свойствами success_rate и confidence.
AnswerGenerator¶
Файл: src/agents/answer_generator.py
Назначение: Генерация ответов на естественном языке (LLM или шаблонный резерв)
from src.agents.answer_generator import AnswerGenerator
gen = AnswerGenerator(
llm_interface=llm, # Опциональный LLM
code_analyst_title="analyst", # Заголовок для ИИ-аналитика
language="ru" # Язык промптов
)
# Группировка по тегам
grouped = gen.group_by_tags(functions, enrichment_hints)
# Генерация ответа
answer = gen.generate(
question="...",
query="SELECT ...",
grouped_results=grouped,
raw_results=[...]
)
Фабрика: create_answer_generator(...) -> AnswerGenerator
ResultReranker¶
Файл: src/agents/result_reranker.py
Назначение: Переранжирование результатов на основе стратегий
Иерархия стратегий:
RerankerStrategy(ABC) — абстрактная базаQARerankerStrategy— переранжирование пар вопрос-ответSQLRerankerStrategy— переранжирование SQL-примеровEnrichmentRerankerStrategy— переранжирование с учётом обогащения
Структура данных: ScoringBoost(name, multiplier, condition) — условное повышение оценки.
from src.agents.result_reranker import (
create_qa_reranker,
create_sql_reranker,
create_enrichment_reranker,
ScoringBoost,
)
# Создание переранжирователя для пар вопрос-ответ
qa_reranker = create_qa_reranker()
# Переранжирование результатов
reranked = qa_reranker.rerank(
results=raw_results,
analysis=analysis,
query_text="...",
top_k=5
)
# Пользовательское повышение оценки
boost = ScoringBoost(
name="domain_match",
multiplier=1.3,
condition=lambda item, qt, desc: "lock" in qt.lower()
)
ConfidenceCalculator¶
Файл: src/agents/confidence_calculator.py
Назначение: Настраиваемый расчёт уверенности через паттерн «Стратегия»
Иерархия стратегий:
ConfidenceStrategy(ABC)ResultConfidenceStrategy— на основе количества результатов и статуса выполненияAnalysisConfidenceStrategy— на основе анализа намерения/домена/ключевых словPatternConfidenceStrategy— на основе успешности сопоставления с шаблонамиLLMConfidenceStrategy— на основе сигналов уверенности LLM
from src.agents.confidence_calculator import (
create_result_calculator,
create_analysis_calculator,
create_pattern_calculator,
ConfidenceFactors,
)
# Расчёт уверенности на основе результатов
calc = create_result_calculator()
score = calc.calculate(
result_count=5,
execution_success=True,
used_fallback=False,
has_enrichment=True
)
# Расчёт уверенности на основе анализа
analysis_calc = create_analysis_calculator()
score = analysis_calc.calculate(
intent="find-function",
domain="locking",
keywords=["lock", "acquire"],
domain_confidence=0.8
)
# Контейнер факторов для развёрнутого расчёта
factors = ConfidenceFactors(
result_count=10,
execution_success=True,
intent="find-function",
domain="locking"
)
PromptBuilder¶
Файл: src/agents/prompt_builder.py
Назначение: Форматирование промптов для GeneratorAgent
from src.agents.prompt_builder import PromptBuilder
builder = PromptBuilder(semantic_prompts=None)
# Построение обогащённого промпта
prompt = builder.build_enriched_prompt(
question="Найти методы, работающие с блокировками",
context={
'enrichment_hints': hints,
'similar_qa': qa_examples,
'sql_examples': sql_examples
}
)
# Построение простого промпта (без обогащения)
prompt = builder.build_simple_prompt(question, context)
Фабрика: create_prompt_builder(...) -> PromptBuilder
QueryVariantGenerator¶
Файл: src/agents/query_variants.py
Назначение: Генерация вариантов запросов для подхода «воронка запросов»
from src.agents.query_variants import QueryVariantGenerator
gen = QueryVariantGenerator()
# Генерация вариантов с разной специфичностью
variants = gen.generate_variants(
question="Найти методы блокировки",
context={'enrichment_hints': hints},
num_variants=3
)
# Возвращает: [
# {'specificity': 'PRECISE', 'query': 'SELECT ...'}, # Высокая точность
# {'specificity': 'BALANCED', 'query': 'SELECT ...'}, # Баланс
# {'specificity': 'BROAD', 'query': 'SELECT ...'} # Высокая полнота
# ]
Фабрика: create_variant_generator() -> QueryVariantGenerator
Резервные стратегии¶
Файл: src/agents/fallback_strategies.py
Назначение: Интеллектуальные резервные стратегии для низкого покрытия обогащения (фаза 4)
Три класса:
KeywordToTagMapper(similarity_threshold=None)— нечёткое сопоставление ключевых слов с тегамиHybridQueryBuilder()— построение гибридных запросов имя+тегFallbackStrategySelector()— оркестрация резервных стратегий
from src.agents.fallback_strategies import (
KeywordToTagMapper,
HybridQueryBuilder,
FallbackStrategySelector,
get_fallback_selector,
)
# Сопоставление ключевых слов с тегами
mapper = KeywordToTagMapper(similarity_threshold=0.6)
new_tags = mapper.map_keywords_to_tags(
keywords=["allocat", "palloc"],
existing_tags=[{"category": "function-purpose", "value": "memory-management"}]
)
# Построение гибридного запроса
builder = HybridQueryBuilder()
query = builder.build_hybrid_query(
name_pattern="lock%",
tag_filters=[{"category": "function-purpose", "value": "locking"}]
)
# Оркестрация
selector = get_fallback_selector()
result = selector.apply_fallback(
question="...",
analysis=analysis,
hints=current_hints,
coverage=0.15
)
Прочие вспомогательные модули¶
| Файл | Класс/Функция | Назначение |
|---|---|---|
semantic_interpreter.py |
SemanticInterpreter(...) |
Интерпретация семантических запросов |
result_parser.py |
ResultParser() |
Разбор списков функций, семантических результатов |
tag_effectiveness_tracker.py |
TagEffectivenessTracker(...), TagUsageRecord |
Отслеживание эффективности тегов; get_global_tracker() |
utils.py |
sanitize_sql_value(), sanitize_like_pattern() |
Утилиты очистки SQL |
ACP — Agent Client Protocol¶
Обзор ACP¶
ACP обеспечивает интеграцию с IDE через JSON-RPC 2.0. Поддерживаемые IDE: Zed, JetBrains, VS Code.
IDE <-> Транспорт (stdio | HTTP | WebSocket) <-> CodeGraphACPAgent <-> Адаптеры <-> Основные сервисы
Точка входа:
python -m src.acp # По умолчанию: транспорт stdio
python -m src.acp --transport http # Транспорт HTTP
python -m src.acp --transport ws # Транспорт WebSocket
CodeGraphACPAgent¶
Файл: src/acp/server/agent.py
Назначение: Главный агент ACP — диспетчеризация 25 методов JSON-RPC
from src.acp.server.agent import CodeGraphACPAgent
agent = CodeGraphACPAgent()
# Обработка JSON-RPC запроса
response = await agent.handle_request(request)
Таблица методов JSON-RPC:
| Категория | Метод | Обработчик | Потоковый |
|---|---|---|---|
| Инициализация | initialize |
_handle_initialize |
Нет |
| Аутентификация | authenticate |
_handle_authenticate |
Нет |
| Сессии | session/new |
_handle_session_new |
Нет |
| Сессии | session/load |
_handle_session_load |
Нет |
| Сессии | session/prompt |
_handle_session_prompt |
Да |
| Сессии | session/cancel |
_handle_session_cancel |
Нет |
| Файловая система | fs/read_text_file |
_handle_fs_read |
Нет |
| Файловая система | fs/write_text_file |
_handle_fs_write |
Нет |
| Терминал | terminal/create |
_handle_terminal_create |
Нет |
| Терминал | terminal/output |
_handle_terminal_output |
Нет |
| Терминал | terminal/wait_for_exit |
_handle_terminal_wait |
Нет |
| Терминал | terminal/kill |
_handle_terminal_kill |
Нет |
| Терминал | terminal/release |
_handle_terminal_release |
Нет |
| Потоки | thread/start |
_handle_thread_start |
Нет |
| Потоки | thread/resume |
_handle_thread_resume |
Нет |
| Потоки | thread/fork |
_handle_thread_fork |
Нет |
| Потоки | thread/list |
_handle_thread_list |
Нет |
| Потоки | thread/archive |
_handle_thread_archive |
Нет |
| Потоки | thread/compact |
_handle_thread_compact |
Нет |
| Ходы | turn/start |
_handle_turn_start |
Да |
| Ходы | turn/interrupt |
_handle_turn_interrupt |
Нет |
| Диагностика | diagnostics/subscribe |
_handle_diagnostics_subscribe |
Нет |
| Диагностика | diagnostics/unsubscribe |
_handle_diagnostics_unsubscribe |
Нет |
| Всплывающие подсказки | hover/request |
_handle_hover_request |
Нет |
| Одобрение | item/fileChange/respondApproval |
_handle_respond_approval |
Нет |
Отложенно загружаемые свойства:
session_manager—ACPSessionManagerchat_adapter—ChatServiceACPAdaptercapability_negotiator—CapabilityNegotiatorthread_manager—ACPThreadManagerturn_orchestrator—TurnOrchestratordiagnostics_engine—DiagnosticsEnginehover_provider—HoverProviderapproval_bridge—ACPApprovalBridge
Типы протокола (модели Pydantic):
| Тип | Назначение |
|---|---|
AgentCapabilities |
Возможности агента (loadSession, promptCapabilities, mcp, authMethods) |
PromptCapabilities |
Поддерживаемые типы содержимого промптов (image, audio, embeddedContext) |
MCPCapabilities |
Флаги поддержки транспорта MCP (stdio, http, sse) |
AgentInfo |
Информация об агенте (name, version, title) |
ClientCapabilities |
Возможности клиента (fs, terminal) |
ClientInfo |
Информация о клиенте (name, version) |
InitializeParams |
Параметры инициализации (protocolVersion, clientCapabilities, clientInfo) |
InitializeResult |
Результат инициализации (protocolVersion, agentCapabilities, agentInfo) |
SessionNewParams |
Параметры создания сессии (cwd, mcpServers) |
SessionNewResult |
Результат создания сессии (sessionId) |
ContentBlock |
Блок содержимого (type, text, uri, mimeType, data) |
SessionPromptParams |
Параметры промпта (sessionId, prompt) |
StopReason |
Причина остановки (end_turn, max_tokens, cancelled) |
JSONRPCRequest |
Запрос JSON-RPC 2.0 |
JSONRPCResponse |
Ответ JSON-RPC 2.0 |
JSONRPCNotification |
Уведомление JSON-RPC 2.0 (без id) |
PlanEntry |
Запись плана выполнения (id, title, status) |
PlanUpdate |
Обновление плана |
AgentMessageChunk |
Фрагмент потоковой передачи сообщения агента |
ToolCallUpdate |
Обновление вызова инструмента (toolCallId, title, kind, status, content) |
Управление сессиями¶
Файл: src/acp/server/session_manager.py
Назначение: Управление жизненным циклом сессий ACP с привязкой к сессиям CodeGraph
from src.acp.server.session_manager import ACPSessionManager, ACPSession
manager = ACPSessionManager(
max_sessions=100, # Максимум одновременных сессий
session_timeout_minutes=60 # Таймаут сессии в минутах
)
# Создание сессии
session = await manager.create_session(
cwd="/path/to/project",
mcp_servers=[...],
user_id="user-123"
)
# Загрузка существующей сессии
session = manager.get_session(session_id)
# Отмена сессии
await manager.cancel_session(session_id)
# Очистка устаревших сессий
removed = manager.cleanup_expired()
Модель данных ACPSession:
@dataclass
class ACPSession:
session_id: str
cwd: str
user_id: Optional[str] = None
project_id: Optional[str] = None # Контекст мультитенантности
codegraph_session_id: Optional[str] = None
mcp_servers: List[Dict[str, Any]] = field(default_factory=list)
created_at: datetime = field(default_factory=_utc_now)
last_activity: datetime = field(default_factory=_utc_now)
cancelled: bool = False
context: List[Dict[str, str]] = field(default_factory=list)
Согласование возможностей¶
Файл: src/acp/server/capabilities.py
Назначение: Обратно совместимое согласование возможностей между старыми и новыми клиентами ACP
from src.acp.server.capabilities import CapabilityNegotiator, ClientCapabilitiesV2
negotiator = CapabilityNegotiator()
# Согласование при инициализации
server_caps = negotiator.negotiate(initialize_params)
# Проверка возможностей клиента
if negotiator.client.diagnostics:
# Клиент поддерживает диагностику
pass
# Проверка необходимости отправки уведомления
should_send = negotiator.should_send_notification("diagnostics/publish")
Расширенные возможности клиента (ClientCapabilitiesV2):
@dataclass
class ClientCapabilitiesV2:
fs: Optional[Dict[str, bool]] = None
terminal: bool = False
# Флаги возможностей F8 (по умолчанию False для обратной совместимости)
diagnostics: bool = False
hover: bool = False
approval_flow: bool = False
opt_out_notification_methods: List[str] = field(default_factory=list)
Серверные возможности:
_SERVER_CAPABILITIES = {
"threadManagement": True,
"diagnostics": True,
"hover": True,
"approvalFlow": True,
}
Жизненный цикл потоков и ходов¶
Файл (потоки): src/acp/server/thread_manager.py
Файл (ходы): src/acp/server/turn_orchestrator.py
ACPThreadManager — управление жизненным циклом потоков через прослойку Harness:
from src.acp.server.thread_manager import ACPThreadManager
tm = ACPThreadManager()
# Создание потока
thread = await tm.start_thread(cwd="/project", name="Analysis")
# Возвращает: {'threadId': '...', 'name': 'Analysis', 'status': 'active', ...}
# Возобновление потока
thread = await tm.resume_thread(thread_id="...")
# Ответвление потока
forked = await tm.fork_thread(thread_id="...", name="Branch")
# Список потоков
threads = await tm.list_threads()
# Архивация потока
await tm.archive_thread(thread_id="...")
# Компактизация потока (удаление промежуточных ходов)
await tm.compact_thread(thread_id="...")
TurnOrchestrator — оркестрация выполнения хода с потоковой передачей:
from src.acp.server.turn_orchestrator import TurnOrchestrator
to = TurnOrchestrator()
# Начало хода (выполнение в фоне, немедленный ответ)
result = await to.start_turn(
thread_id="...",
prompt="Проанализировать безопасность",
context={"scenario_id": "security_workflow"},
notification_router=router
)
# Возвращает: {'threadId': '...', 'turnId': '...'}
# Прерывание хода
await to.interrupt_turn(thread_id="...", turn_id="...")
Потоковая передача элементов следует паттерну started -> delta -> completed и доставляется через NotificationRouter.
Диагностика и всплывающие подсказки¶
Файл (диагностика): src/acp/server/diagnostics.py
Файл (подсказки): src/acp/server/hover.py
DiagnosticsEngine — генерация диагностик из предвычисленных флагов CPG:
from src.acp.server.diagnostics import DiagnosticsEngine
diag = DiagnosticsEngine()
# Подписка на диагностику файла
diag.subscribe(uri="file:///src/main.py", connection_id="conn-1")
# Генерация диагностик
diagnostics = await diag.generate(
uri="file:///src/main.py",
db_path="/path/to/cpg.duckdb",
notification_router=router
)
# Отписка
diag.unsubscribe(uri="file:///src/main.py", connection_id="conn-1")
# Проверка подписки
is_sub = diag.is_subscribed("file:///src/main.py")
Источники диагностик:
- Цикломатическая сложность (has_high_complexity)
- Устаревшие методы (has_deprecated)
- Комментарии TODO/FIXME (has_todo_fixme)
- Высокий fan-in (fan_in выше порога)
- Анализ заражения (taint analysis)
HoverProvider — разрешение всплывающих подсказок для позиций в исходном коде:
from src.acp.server.hover import HoverProvider
hover = HoverProvider()
# Получить информацию при наведении
info = await hover.get_hover_info(
uri="file:///src/main.py",
line=42,
character=10
)
# Возвращает: {
# 'contents': {
# 'method': 'process_request',
# 'signature': 'def process_request(self, data: bytes) -> Response',
# 'metrics': {'complexity': 12, 'fan_in': 5, 'fan_out': 8},
# 'flags': {'has_deprecated': False, 'has_todo': True},
# 'callers': ['handle_connection', ...],
# 'callees': ['validate_input', 'execute_query', ...]
# }
# }
Мост одобрения¶
Файл: src/acp/server/approval_bridge.py
Назначение: Мост между механизмом одобрения Harness и уведомлениями ACP для утверждения изменений файлов клиентом IDE
from src.acp.server.approval_bridge import ACPApprovalBridge
bridge = ACPApprovalBridge()
# Запрос одобрения изменения файла (блокирует до ответа клиента)
decision = await bridge.request_file_change_approval(
thread_id="...",
turn_id="...",
item_id="...",
diff="--- a/file.py\n+++ b/file.py\n...",
files=["src/file.py"],
reason="Рефакторинг метода process_request",
confidence=0.85,
notification_router=router,
timeout=300 # секунд
)
# Возвращает: {'decision': 'accept'} или {'decision': 'decline', 'reason': '...'}
# Обработка ответа клиента
await bridge.resolve(request_id="...", decision={"decision": "accept"})
Если клиент не отвечает в течение таймаута, запрос автоматически отклоняется.
Маршрутизатор уведомлений¶
Файл: src/acp/server/notification_router.py
Назначение: Маршрутизация уведомлений JSON-RPC с учётом отказов клиента от определённых типов уведомлений
from src.acp.server.notification_router import NotificationRouter
router = NotificationRouter(
callback=notification_callback, # Функция обратного вызова
negotiator=capability_negotiator # Согласователь возможностей
)
# Отправка уведомления (с учётом фильтрации отказов)
sent = await router.send(
method="diagnostics/publish",
params={"uri": "file:///src/main.py", "diagnostics": [...]}
)
# Возвращает: True если отправлено, False если подавлено
Маршрутизатор автоматически проверяет opt_out_notification_methods клиента через CapabilityNegotiator.
Обработчики файлов и терминала¶
Файл: src/acp/server/handlers.py
Назначение: Реализация операций файловой системы и терминала
Файловая система:
# Чтение файла (сначала проверяет кеш CPG)
result = await handle_fs_read({"path": "/abs/path/to/file.py"})
# Возвращает: {'content': '...', 'encoding': 'utf-8'}
# Запись файла
result = await handle_fs_write({
"path": "/abs/path/to/file.py",
"content": "...",
"encoding": "utf-8"
})
Терминал:
# Создание терминальной сессии
result = await handle_terminal_create({
"command": "python",
"args": ["-m", "pytest", "tests/"],
"cwd": "/project"
})
# Возвращает: {'terminalId': '...'}
# Получение вывода
output = await handle_terminal_output({"terminalId": "..."})
# Ожидание завершения
exit_info = await handle_terminal_wait({"terminalId": "..."})
# Принудительное завершение
await handle_terminal_kill({"terminalId": "..."})
# Освобождение ресурсов
await handle_terminal_release({"terminalId": "..."})
Транспортный уровень¶
Пакет: src/acp/transport/
Базовый класс:
from src.acp.transport.base import BaseTransport
class BaseTransport(abc.ABC):
"""Абстрактный базовый класс для транспортов ACP."""
def __init__(self, agent: CodeGraphACPAgent):
self.agent = agent
self._running = False
@property
def is_running(self) -> bool: ...
@abc.abstractmethod
async def start(self) -> None: ...
@abc.abstractmethod
async def stop(self) -> None: ...
@abc.abstractmethod
async def send(self, message: Any) -> None: ...
Реализации:
| Транспорт | Файл | Назначение |
|---|---|---|
StdioTransport |
transport/stdio.py |
Стандартный ввод/вывод для локального подпроцесса |
HTTPTransport |
transport/http.py |
REST API для удалённого доступа |
WebSocketTransport |
transport/websocket.py |
Потоковая передача для обновлений в реальном времени |
Использование:
from src.acp.transport.stdio import StdioTransport
from src.acp.transport.http import HTTPTransport
from src.acp.transport.websocket import WebSocketTransport
# Stdio (по умолчанию для IDE)
transport = StdioTransport(agent)
await transport.start()
# HTTP
transport = HTTPTransport(agent, host="0.0.0.0", port=8080)
await transport.start()
# WebSocket
transport = WebSocketTransport(agent, host="0.0.0.0", port=8081)
await transport.start()
Адаптеры интеграции¶
Пакет: src/acp/integration/
ChatServiceACPAdapter¶
Файл: src/acp/integration/chat_adapter.py
Назначение: Мост между промптами ACP и ChatService CodeGraph
from src.acp.integration.chat_adapter import ChatServiceACPAdapter
adapter = ChatServiceACPAdapter()
# Преобразование ContentBlock[] -> строка запроса
query = adapter.extract_query(content_blocks)
# Обработка промпта через ChatService
result = await adapter.process_prompt(
session_id="...",
content_blocks=[...],
notification_callback=callback
)
Преобразования:
- ContentBlock[] -> строка запроса
- ChatResponse -> SessionUpdate[]
- Evidence -> содержимое ToolCall
SessionACPAdapter¶
Файл: src/acp/integration/session_adapter.py
Назначение: Мост между сессиями ACP и SessionService CodeGraph
from src.acp.integration.session_adapter import SessionACPAdapter
adapter = SessionACPAdapter()
# Создание сессии CodeGraph
session_id = await adapter.create_session(
user_id="user-123",
metadata={"source": "acp"}
)
# Воспроизведение истории диалога для session/load
history = await adapter.replay_history(session_id)
WorkflowACPAdapter¶
Файл: src/acp/integration/workflow_adapter.py
Назначение: Сопоставление сценариев CodeGraph с вызовами инструментов ACP
from src.acp.integration.workflow_adapter import WorkflowACPAdapter
adapter = WorkflowACPAdapter()
# Получить тип инструмента ACP для сценария
kind = adapter.get_tool_kind("security_workflow") # "search"
title = adapter.get_scenario_title("security_workflow") # "Security Analysis"
# Выполнить сценарий с потоковой передачей обновлений
result = await adapter.execute_scenario(
scenario_id="security_workflow",
query="Проверить безопасность модуля аутентификации",
context={},
notification_router=router
)
Сопоставление сценариев с типами инструментов:
| Сценарий | Тип инструмента | Заголовок |
|---|---|---|
onboarding_workflow |
search |
Codebase Exploration |
security_workflow |
search |
Security Analysis |
performance_workflow |
search |
Performance Analysis |
refactoring_workflow |
edit |
Refactoring Suggestions |
documentation_workflow |
read |
Documentation Generation |
feature_dev_workflow |
think |
Feature Development Guide |
code_review_workflow |
read |
Code Review |
architecture_workflow |
think |
Architecture Analysis |
debugging_workflow |
search |
Debugging Session |
MCPBridge¶
Файл: src/acp/integration/mcp_bridge.py
Назначение: Интеграция внешних MCP-серверов, предоставленных клиентом
from src.acp.integration.mcp_bridge import MCPBridge, MCPServer, MCPTool, MCPResource
bridge = MCPBridge()
# Подключение к MCP-серверу
server = MCPServer(
id="external-db",
name="Database Tools",
transport="stdio",
command="npx",
args=["-y", "@modelcontextprotocol/server-postgres"]
)
await bridge.connect(server)
# Список инструментов
tools = await bridge.list_tools(server_id="external-db")
# Вызов инструмента
result = await bridge.call_tool(
server_id="external-db",
tool_name="query",
arguments={"sql": "SELECT * FROM users LIMIT 5"}
)
# Список ресурсов
resources = await bridge.list_resources(server_id="external-db")
# Отключение
await bridge.disconnect(server_id="external-db")
Модели данных:
@dataclass
class MCPServer:
id: str
name: str
transport: str # stdio, http, sse
command: Optional[str] = None # Для stdio
args: Optional[List[str]] = None
url: Optional[str] = None # Для http/sse
headers: Optional[Dict[str, str]] = None
env: Optional[Dict[str, str]] = None
@dataclass
class MCPTool:
name: str
description: str
input_schema: Dict[str, Any]
server_id: str
@dataclass
class MCPResource:
uri: str
name: str
description: Optional[str] = None
mime_type: Optional[str] = None
server_id: str = ""
CLI ACP¶
Файл: src/acp/cli.py
Назначение: Точка входа для запуска CodeGraph как ACP-агента в режиме подпроцесса
# Запуск ACP-агента (stdio транспорт)
python -m src.acp.cli
# Или через установленную команду
codegraph-acp
# Программное использование
from src.acp.cli import main
await main()
Логирование направляется в stderr (так как stdout используется для протокола). Автоматически снижается уровень логирования для httpx и httpcore.
Доменные агенты¶
Агенты безопасности¶
Файл: src/security/security_agents.py
Назначение: Обнаружение уязвимостей безопасности с использованием шаблонов CPG
Четыре специализированных агента:
- SecurityScanner — поиск уязвимостей в CPG с использованием шаблонов безопасности
- DataFlowAnalyzer — трассировка потоков данных от источников заражения к приёмникам
- VulnerabilityReporter — генерация структурированных отчётов об уязвимостях
- RemediationAdvisor — рекомендации по исправлению на основе шаблонов уязвимостей
from src.security.security_agents import (
SecurityScanner,
DataFlowAnalyzer,
VulnerabilityReporter,
RemediationAdvisor,
SecurityFinding,
DataFlowPath,
VulnerabilityReport,
)
# Инициализация сканера
scanner = SecurityScanner(cpg_service=cpg_service)
# Сканирование на уязвимости
findings: List[SecurityFinding] = await scanner.scan(
patterns=get_critical_patterns(),
scope="full"
)
# Анализ потоков данных
flow_analyzer = DataFlowAnalyzer(cpg_service=cpg_service)
paths: List[DataFlowPath] = await flow_analyzer.trace_taint_flows(
source_pattern="user_input",
sink_pattern="sql_execute"
)
# Генерация отчёта
reporter = VulnerabilityReporter()
report: VulnerabilityReport = reporter.generate(
findings=findings,
data_flows=paths
)
# Рекомендации по исправлению
advisor = RemediationAdvisor()
fixes = advisor.suggest_fixes(findings)
Обнаруживаемые шаблоны: - SQL-инъекции (CWE-89) - Переполнение буфера (CWE-120) - Инъекции команд (CWE-78) - Строки форматирования (CWE-134)
Структуры данных:
@dataclass
class SecurityFinding:
finding_id: str
pattern_id: str
pattern_name: str
category: str
severity: str
method_id: int
method_name: str
filename: str
line_number: int
code_snippet: str
description: str
cwe_ids: list[str]
confidence: float
@dataclass
class DataFlowPath:
path_id: str
source_method: str
source_file: str
source_line: int
sink_method: str
sink_file: str
sink_line: int
path_length: int
intermediate_nodes: list[dict[str, Any]]
taint_type: str # "user_input", "file_read" и т.д.
sanitized: bool # Включает ли путь санитизацию
Агенты производительности¶
Файл: src/performance/agents/ (фасад: src/performance/performance_agents.py)
Назначение: Обнаружение узких мест производительности
Три специализированных агента:
- PerformanceProfiler (
src/performance/agents/profiler.py) — профилирование методов по шаблонам узких мест - ResourceAnalyzer (
src/performance/agents/resource_analyzer.py) — анализ использования ресурсов отдельных методов - OptimizationAdvisor (
src/performance/agents/optimizer.py) — рекомендации по оптимизации
from src.performance.agents import (
PerformanceProfiler,
ResourceAnalyzer,
OptimizationAdvisor,
run_complete_performance_analysis,
)
# Профилирование (поддержка контекстного менеджера)
with PerformanceProfiler(cpg_service) as profiler:
bottlenecks = profiler.profile_all_bottlenecks(limit_per_pattern=30)
# Анализ ресурсов
with ResourceAnalyzer(cpg_service) as analyzer:
usage = analyzer.analyze_method_resources("method_name", filename="file.py")
# Рекомендации
advisor = OptimizationAdvisor()
plan = advisor.create_optimization_plan(bottlenecks, [usage])
report = advisor.generate_report(bottlenecks, [usage])
# Полный анализ (все три агента)
report = run_complete_performance_analysis(cpg_service, limit=30)
Модели данных: BottleneckFinding, ResourceUsage, OptimizationRecommendation, PerformanceReport, ProfilingResult, MemoryProfilingResult, PerformanceBaseline, PerformanceTrend
Агенты архитектуры¶
Файл: src/architecture/agents/ (фасад: src/architecture/architecture_agents.py)
Назначение: Анализ архитектурных шаблонов
Три специализированных агента:
- DependencyAnalyzer — анализ зависимостей между модулями
- LayerValidator — проверка соблюдения слоёв архитектуры
- ArchitectureReporter — генерация отчётов об архитектуре
from src.architecture.agents import (
DependencyAnalyzer,
LayerValidator,
ArchitectureReporter,
DependencyAnalysis,
DependencyMetrics,
ViolationFinding,
LayerRule,
RemediationAction,
ArchitectureReport,
)
# Анализ зависимостей
dep_analyzer = DependencyAnalyzer(cpg_service=cpg_service)
analysis: DependencyAnalysis = await dep_analyzer.analyze()
metrics: DependencyMetrics = analysis.metrics
# Валидация слоёв
validator = LayerValidator(rules=[
LayerRule(name="presentation", allowed_deps=["service"]),
LayerRule(name="service", allowed_deps=["repository"]),
LayerRule(name="repository", allowed_deps=[]),
])
violations: List[ViolationFinding] = await validator.validate(cpg_service)
# Генерация отчёта
reporter = ArchitectureReporter()
report: ArchitectureReport = reporter.generate(analysis, violations)
Анализ: - Границы слоёв - Зависимости между модулями - Обнаружение шаблонов проектирования - Сопоставление подсистем
Справочник конфигурации¶
AgentConfidenceConfig¶
Конфигурация расчёта уверенности агентов в config.yaml:
# config.yaml -> confidence
confidence:
result_base: 0.5 # Базовая уверенность при наличии результатов
result_sweet_spot_min: 1 # Минимум результатов для полной уверенности
result_sweet_spot_max: 10 # Максимум результатов до штрафа
result_too_many_penalty: 0.1 # Штраф за слишком много результатов
fallback_penalty: 0.15 # Штраф за использование резервной стратегии
enrichment_bonus: 0.1 # Бонус за наличие обогащения
no_results_confidence: 0.1 # Уверенность при пустых результатах
execution_failure: 0.0 # Уверенность при ошибке выполнения
Доступ из кода:
from src.config import get_unified_config
cfg = get_unified_config()
cfg.confidence.result_base # 0.5
cfg.confidence.fallback_penalty # 0.15
cfg.confidence.enrichment_bonus # 0.1
ACPDiagnosticsConfig¶
Конфигурация диагностик ACP в config.yaml:
# config.yaml -> acp
acp:
diagnostics:
complexity_threshold: 15 # Порог цикломатической сложности
fan_in_threshold: 10 # Порог fan-in для предупреждений
enable_taint: true # Включить анализ заражения
max_diagnostics_per_file: 50 # Максимум диагностик на файл
hover:
caller_callee_limit: 10 # Максимум вызывающих/вызываемых во всплывающей подсказке
session:
max_sessions: 100 # Максимум одновременных сессий
timeout_minutes: 60 # Таймаут сессии
Доступ из кода:
from src.config import get_unified_config
cfg = get_unified_config()
cfg.acp.diagnostics.complexity_threshold # 15
cfg.acp.hover.caller_callee_limit # 10
cfg.acp.session.max_sessions # 100
Расширение системы агентов¶
Добавление нового конвейерного агента¶
Создайте нового агента, следуя базовому шаблону:
from typing import Dict, List, Optional
from dataclasses import dataclass, field
@dataclass
class AgentState:
"""Общее состояние, разделяемое между агентами."""
question: str
analysis: Optional[Dict] = None
retrieval_results: List = field(default_factory=list)
enrichments: Dict = field(default_factory=dict)
generated_query: Optional[str] = None
execution_results: List = field(default_factory=list)
answer: Optional[str] = None
confidence: float = 0.0
errors: List[str] = field(default_factory=list)
class CustomAgent:
"""Пример пользовательского агента."""
def __init__(self, cpg_config: Optional[CPGConfig] = None):
from src.config import get_global_cpg_config
self.cpg_config = cpg_config or get_global_cpg_config()
def process(self, state: AgentState) -> AgentState:
"""Обрабатывает состояние и возвращает обновлённое состояние."""
# Ваша логика здесь
state.custom_results = self._analyze(state.question)
return state
def _analyze(self, question: str) -> Dict:
"""Внутренний метод анализа."""
pass
Зарегистрируйте в рабочем процессе:
from src.workflow import register_agent
register_agent("custom", CustomAgent)
Добавление обработчиков ACP¶
Для добавления нового метода JSON-RPC:
- Определите обработчик в
CodeGraphACPAgent:
# В src/acp/server/agent.py
class CodeGraphACPAgent:
def __init__(self):
# ... существующие обработчики ...
self._handlers["custom/analyze"] = self._handle_custom_analyze
async def _handle_custom_analyze(
self, params: Dict[str, Any], **kwargs
) -> Dict[str, Any]:
"""Обработчик пользовательского метода."""
uri = params.get("uri")
if not uri:
raise ValueError("uri is required")
# Ваша логика
result = await self._do_analysis(uri)
return {"status": "ok", "result": result}
- При необходимости добавьте согласование возможностей:
# В src/acp/server/capabilities.py
@dataclass
class ClientCapabilitiesV2:
# ... существующие поля ...
custom_analyze: bool = False # Новая возможность
- Добавьте тесты:
# В tests/acp/test_custom.py
async def test_custom_analyze():
agent = CodeGraphACPAgent()
result = await agent.handle_request(JSONRPCRequest(
method="custom/analyze",
params={"uri": "file:///src/main.py"}
))
assert result.result["status"] == "ok"
Дальнейшие шаги¶
- Справочник рабочих процессов — Оркестрация рабочих процессов и сценарии
- Справочник API — Документация REST API
- Инструменты MCP — Справочник инструментов MCP-сервера