Detailed documentation for CodeGraph’s agent system: pipeline agents, ACP protocol, and domain-specific analyzers. Covers all 48 files and 70+ classes across three architectural tiers.
Table of Contents¶
- Overview
- Agent System Architecture
- Module Map
- Core Pipeline Agents
- AnalyzerAgent
- RetrieverAgent
- EnrichmentAgent
- GeneratorAgent
- InterpreterAgent
- Pipeline Support Modules
- CallChainAnalyzer
- ControlFlowGenerator
- LogicSynthesizer
- AdaptiveQueryRefiner
- AnswerGenerator
- ResultReranker
- ConfidenceCalculator
- PromptBuilder
- QueryVariantGenerator
- Fallback Strategies
- Other Support Modules
- ACP — Agent Client Protocol
- ACP Overview
- CodeGraphACPAgent
- Session Management
- Capability Negotiation
- Thread and Turn Lifecycle
- Diagnostics and Hover
- Approval Bridge
- Notification Router
- File and Terminal Handlers
- Transport Layer
- Integration Adapters
- ACP CLI
- Domain-Specific Agents
- Security Agents
- Performance Agents
- Architecture Agents
- Configuration Reference
- AgentConfidenceConfig
- ACPDiagnosticsConfig
- Extending the Agent System
- Adding a New Pipeline Agent
- Adding ACP Handlers
- Next Steps
Overview¶
Agent System Architecture¶
CodeGraph’s agent system is organized into three tiers:
- Pipeline Agents (src/agents/) — 5 core agents plus 15 support modules that form the RAG query pipeline. These agents handle question analysis, hybrid retrieval, enrichment, query generation, and answer interpretation.
- ACP — Agent Client Protocol (src/acp/) — IDE integration layer using JSON-RPC 2.0. Supports Zed, JetBrains, and VS Code through stdio, HTTP, and WebSocket transports.
- Domain-Specific Agents — Specialized analyzers for security vulnerability detection, performance bottleneck analysis, and architecture validation.
graph TB
subgraph "Pipeline Agents (src/agents/)"
AA[AnalyzerAgent] --> RA[RetrieverAgent]
RA --> EA[EnrichmentAgent]
EA --> GA[GeneratorAgent]
GA --> IA[InterpreterAgent]
RA -.-> CCA[CallChainAnalyzer]
RA -.-> CFG[ControlFlowGenerator]
GA -.-> PB[PromptBuilder]
GA -.-> QV[QueryVariantGenerator]
IA -.-> AG[AnswerGenerator]
IA -.-> RP[ResultParser]
end
subgraph "ACP Protocol (src/acp/)"
IDE[IDE Client] -->|JSON-RPC| ACPA[CodeGraphACPAgent]
ACPA --> SM[SessionManager]
ACPA --> TM[ThreadManager]
ACPA --> TO[TurnOrchestrator]
ACPA --> DE[DiagnosticsEngine]
ACPA --> HP[HoverProvider]
ACPA -->|adapters| WA[WorkflowAdapter]
WA --> MSC[MultiScenarioCopilot]
end
subgraph "Domain Agents"
SS[SecurityScanner]
PP[PerformanceProfiler]
DA[DependencyAnalyzer]
end
The pipeline agents process queries sequentially: AnalyzerAgent extracts intent and keywords, RetrieverAgent performs hybrid search across ChromaDB and DuckDB, EnrichmentAgent adds semantic tags, GeneratorAgent produces SQL queries, and InterpreterAgent converts results into natural language answers. Support modules provide specialized capabilities like call chain analysis, adaptive query refinement, and confidence scoring.
The ACP layer bridges IDE clients to the core pipeline through a JSON-RPC 2.0 protocol. It manages sessions, threads, turns, diagnostics, hover information, and file change approvals. Integration adapters connect the protocol to internal services like ChatService and the MultiScenarioCopilot workflow engine.
Domain-specific agents provide targeted analysis for security, performance, and architecture concerns, each with its own data models and reporting infrastructure.
Module Map¶
| Package | Files | Key Classes | Purpose |
|---|---|---|---|
| src/agents/ | 20 | AnalyzerAgent, RetrieverAgent, EnrichmentAgent, GeneratorAgent, InterpreterAgent + 15 support | RAG query pipeline |
| src/agents/enrichment/ | 6 | EnrichmentAgent + helpers | Tag-based enrichment |
| src/acp/server/ | 8 | CodeGraphACPAgent, ACPSessionManager, ACPThreadManager, TurnOrchestrator, DiagnosticsEngine, HoverProvider, ACPApprovalBridge, NotificationRouter | ACP protocol server |
| src/acp/transport/ | 4 | BaseTransport, StdioTransport, HTTPTransport, WebSocketTransport | Transport layer |
| src/acp/integration/ | 4 | ChatServiceACPAdapter, SessionACPAdapter, WorkflowACPAdapter, MCPBridge | Integration adapters |
| src/security/ | 1 | SecurityScanner, DataFlowAnalyzer, VulnerabilityReporter, RemediationAdvisor | Security analysis |
| src/performance/agents/ | 4 | PerformanceProfiler, ResourceAnalyzer, OptimizationAdvisor | Performance analysis |
| src/architecture/agents/ | 4 | DependencyAnalyzer, LayerValidator, ArchitectureReporter | Architecture analysis |
Core Pipeline Agents¶
AnalyzerAgent¶
File: src/agents/analyzer_agent.py
Purpose: Question understanding and intent extraction. The AnalyzerAgent is the first stage in the pipeline, responsible for parsing natural language questions into structured analysis objects that downstream agents consume. It identifies the user’s intent, target domain, relevant keywords, query classification, and a confidence score.
from src.agents import AnalyzerAgent
analyzer = AnalyzerAgent(
    llm=None,        # Optional LLM for advanced analysis
    cpg_config=None  # Optional[CPGConfig]: domain-specific config
)
Constructor Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| llm | Optional[Any] | None | LLM interface for advanced analysis. When None, rule-based analysis is used |
| cpg_config | Optional[CPGConfig] | None | Domain-specific CPG configuration. Falls back to get_global_cpg_config() |
Key Methods:
# Analyze a question — returns intent, domain, keywords, confidence
analysis = analyzer.analyze("What methods handle transactions?")
# Returns: {'intent': 'find_methods', 'domain': 'transaction-manager',
# 'keywords': ['transaction'], 'query_type': 'semantic', 'confidence': 0.85}
# Classify query mode (semantic vs structural)
mode, confidence = analyzer.classify_query_mode("Find all callers of foo")
# Returns: ('structural', 0.9)
# LLM-enhanced analysis (requires llm parameter to be set)
analysis = analyzer.analyze_with_llm("Explain the transaction commit flow")
# Returns: enriched analysis dict with LLM-derived intent and domain
# Batch analysis — process multiple questions at once
results = analyzer.batch_analyze(["Q1...", "Q2...", "Q3..."])
# Returns: list of analysis dicts
# Get ChromaDB filter for domain
domain_filter = analyzer.get_domain_filter("transaction-manager")
# Returns: dict suitable for ChromaDB where clause
Method Reference:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| analyze | question: str | Dict | No | Full question analysis |
| classify_query_mode | question: str | Tuple[str, float] | No | Classify as semantic or structural |
| analyze_with_llm | question: str | Dict | No | LLM-enhanced analysis |
| batch_analyze | questions: List[str] | List[Dict] | No | Analyze multiple questions |
| get_domain_filter | domain: str | Dict | No | ChromaDB filter for domain |
Analysis Result Structure:
The analysis dictionary returned by analyze() contains the following fields:
| Field | Type | Description |
|---|---|---|
| intent | str | Detected intent (e.g., find_methods, find_callers, explain-concept, find_definition) |
| domain | str | Target subsystem/domain (e.g., transaction-manager, locking, storage-engine) |
| keywords | List[str] | Extracted keywords relevant to the query |
| query_type | str | Query classification: semantic, structural, or security |
| confidence | float | Confidence score between 0.0 and 1.0 |
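When no llm is supplied, analysis falls back to rules. A minimal sketch of what rule-based query-mode classification could look like; the marker lists and scoring constants are illustrative assumptions, not CodeGraph's actual heuristics:

```python
# Illustrative rule-based query-mode classifier; the real AnalyzerAgent
# heuristics may differ.
STRUCTURAL_MARKERS = ("caller", "callee", "call graph", "depends on", "references")
SEMANTIC_MARKERS = ("explain", "how does", "what is", "purpose", "why")

def classify_query_mode(question: str) -> tuple[str, float]:
    """Classify a question as 'structural' or 'semantic' with a confidence."""
    q = question.lower()
    structural_hits = sum(m in q for m in STRUCTURAL_MARKERS)
    semantic_hits = sum(m in q for m in SEMANTIC_MARKERS)
    if structural_hits > semantic_hits:
        return "structural", min(0.6 + 0.15 * structural_hits, 0.95)
    if semantic_hits > structural_hits:
        return "semantic", min(0.6 + 0.15 * semantic_hits, 0.95)
    return "semantic", 0.5  # default: slight bias toward semantic search
```

The returned tuple mirrors the `(mode, confidence)` shape of classify_query_mode shown above.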
RetrieverAgent¶
File: src/agents/retriever_agent.py
Purpose: Hybrid retrieval combining semantic search (ChromaDB) and structural search (DuckDB). The RetrieverAgent is the second stage in the pipeline, responsible for finding relevant code snippets, Q&A pairs, and graph data based on the analysis from AnalyzerAgent. It supports multiple retrieval modes, result caching, and reranking.
from src.agents import RetrieverAgent
retriever = RetrieverAgent(
    vector_store=vector_store,  # VectorStoreReal instance
    analyzer_agent=analyzer,    # AnalyzerAgent instance
    cpg_service=None,           # Optional CPGQueryService for graph queries
    cache_size=128,             # Max cached entries
    cache_ttl_seconds=None,     # Cache TTL in seconds (None = no expiration)
    enable_cache_metrics=True,  # Collect cache metrics
    enable_hybrid=True          # Enable hybrid retrieval
)
Constructor Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| vector_store | VectorStoreReal | Required | ChromaDB vector store instance |
| analyzer_agent | AnalyzerAgent | Required | Analyzer for question processing |
| cpg_service | Optional[CPGQueryService] | None | DuckDB CPG service for graph queries |
| cache_size | int | 128 | Maximum number of cached retrieval results |
| cache_ttl_seconds | Optional[float] | None | Cache time-to-live in seconds. None means no expiration |
| enable_cache_metrics | bool | True | Whether to collect cache hit/miss metrics |
| enable_hybrid | bool | True | Whether to enable hybrid (vector + graph) retrieval |
Key Methods:
# Basic vector retrieval
result = retriever.retrieve(question, analysis, top_k_qa=3, top_k_sql=5)
# Hybrid retrieval (vector + graph with RRF)
result = retriever.retrieve_hybrid(
question="Find memory allocation patterns",
analysis=analysis,
mode="hybrid", # hybrid | vector_only | graph_only
query_type="structural", # Affects weight distribution
top_k=10,
use_ranker=True
)
# With reranking
result = retriever.retrieve_with_reranking(question, analysis,
top_k_qa=5, top_k_sql=10, final_k_qa=3, final_k_sql=5)
# With enrichment-aware reranking
result = retriever.retrieve_with_enrichment(question, analysis, enrichment_hints)
# Keyword-based retrieval
result = retriever.retrieve_by_keywords(["lock", "acquire"], top_k=5)
# Cache management
stats = retriever.get_stats()
metrics = retriever.get_cache_metrics()
cleared = retriever.invalidate_cache(pattern="lock*")
warmed = retriever.warm_cache(["Q1", "Q2"])
Method Reference:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| retrieve | question, analysis, top_k_qa=3, top_k_sql=5 | Dict | No | Basic vector retrieval |
| retrieve_hybrid | question, analysis, mode, query_type, top_k, use_ranker | Dict | No | Hybrid vector + graph retrieval with RRF merging |
| retrieve_with_reranking | question, analysis, top_k_qa, top_k_sql, final_k_qa, final_k_sql | Dict | No | Retrieval with post-retrieval reranking |
| retrieve_with_enrichment | question, analysis, enrichment_hints | Dict | No | Enrichment-aware retrieval and reranking |
| retrieve_by_keywords | keywords: List[str], top_k: int | Dict | No | Keyword-based retrieval |
| get_stats | — | Dict | No | Retrieval statistics |
| get_cache_metrics | — | Dict | No | Cache hit/miss metrics |
| invalidate_cache | pattern: Optional[str] | int | No | Invalidate cache entries matching pattern |
| warm_cache | questions: List[str] | int | No | Pre-populate cache for common questions |
Weight Adaptation:
The RetrieverAgent dynamically adjusts the balance between vector (semantic) and graph (structural) search based on the query type:
| Query Type | Vector Weight | Graph Weight | Rationale |
|---|---|---|---|
| semantic | 0.75 | 0.25 | Favor semantic similarity for conceptual questions |
| structural | 0.25 | 0.75 | Favor graph traversal for call/data flow questions |
| security | 0.50 | 0.50 | Equal weight for security analysis |
| default | 0.60 | 0.40 | Slight semantic bias for general questions |
Retrieval Result Structure:
{
'results': [...], # Raw retrieval results
'ranked_results': [...], # Ranked results with scores
'retrieval_stats': {
'total_retrieved': 15,
'source_distribution': {'vector': 8, 'graph': 7},
'cache_hit': False,
'retrieval_time_ms': 42.5
}
}
EnrichmentAgent¶
File: src/agents/enrichment/agent.py (facade: src/agents/enrichment_agent.py)
Purpose: Map questions to CPG enrichment tags for improved retrieval. The EnrichmentAgent analyzes questions and their analysis results to produce enrichment hints — structured metadata that helps downstream agents find more relevant results. It supports 12 enrichment layers and includes fallback strategies for low-coverage scenarios.
from src.agents import EnrichmentAgent
enrichment = EnrichmentAgent(
    enable_fallback=True  # Enable Phase 4 fallback strategies
)
Constructor Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| enable_fallback | bool | True | Enable Phase 4 fallback strategies for low-coverage enrichment |
Key Methods:
# Get enrichment hints for a question
hints = enrichment.get_enrichment_hints(
question="How does the lock manager work?",
analysis={'intent': 'explain-concept', 'domain': 'locking', 'keywords': ['lock']}
)
# Returns dict with: function_purposes, data_structures, concurrency_patterns,
# memory_management, error_handling, security_attributes, etc.
Method Reference:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| get_enrichment_hints | question: str, analysis: Dict | Dict | No | Generate enrichment hints from question and analysis |
Enrichment Hints Structure:
The hints dictionary returned by get_enrichment_hints() may contain any of the following keys, each mapping to a list of tag strings:
| Key | Description | Example Tags |
|---|---|---|
| function_purposes | What the function does | ["lock-acquire", "resource-management"] |
| data_structures | Relevant data structures | ["hash-table", "linked-list"] |
| concurrency_patterns | Threading and synchronization | ["mutex", "spinlock", "atomic-operation"] |
| memory_management | Memory allocation patterns | ["allocation", "deallocation", "pool"] |
| error_handling | Error handling strategies | ["exception-handler", "error-code"] |
| security_attributes | Security-relevant properties | ["input-validation", "taint-source"] |
| transaction_boundaries | Transaction demarcation | ["begin-transaction", "commit", "rollback"] |
| performance_indicators | Performance characteristics | ["hot-path", "blocking-operation"] |
| acid_properties | ACID compliance markers | ["atomicity", "isolation"] |
| lock_primitives | Lock types and operations | ["shared-lock", "exclusive-lock"] |
| data_flow_categories | Data flow classification | ["source", "sink", "transform"] |
| control_flow_patterns | Control flow characteristics | ["loop", "branch", "recursion"] |
Enrichment Subpackage (src/agents/enrichment/):
The enrichment subpackage contains six modules that support the main EnrichmentAgent:
| File | Function | Purpose |
|---|---|---|
| agent.py | EnrichmentAgent class | Main enrichment agent implementation |
| coverage.py | calculate_coverage(hints) | Calculate tag coverage score (0.0 to 1.0) indicating how many enrichment layers have tags |
| fallback.py | general_domain_fallback(hints) | Apply fallback enrichment for low-coverage scenarios using general domain knowledge |
| keyword_matchers.py | enhance_with_keywords(hints, keywords) | Add enrichment tags based on keyword matching against known patterns |
| prompt_formatter.py | format_for_prompt(hints) | Format enrichment hints as text suitable for inclusion in LLM prompts |
| tag_filters.py | generate_tag_filters(hints) | Generate ChromaDB tag filters from enrichment hints for filtered retrieval |
Coverage Calculation:
The calculate_coverage() function returns a score between 0.0 and 1.0 representing the proportion of enrichment layers that have at least one tag. When coverage falls below a threshold, the fallback system activates to ensure adequate enrichment.
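Under the 12-layer model above, coverage reduces to the fraction of layers carrying at least one tag. A sketch of that calculation (the layer list comes from the hints table; the computation itself is a plausible reading, not the verbatim implementation):

```python
# The 12 enrichment layers from the hints structure table.
ENRICHMENT_LAYERS = [
    "function_purposes", "data_structures", "concurrency_patterns",
    "memory_management", "error_handling", "security_attributes",
    "transaction_boundaries", "performance_indicators", "acid_properties",
    "lock_primitives", "data_flow_categories", "control_flow_patterns",
]

def calculate_coverage(hints):
    """Return the fraction of enrichment layers that have at least one tag."""
    populated = sum(1 for layer in ENRICHMENT_LAYERS if hints.get(layer))
    return populated / len(ENRICHMENT_LAYERS)
```

A question tagged in 2 of 12 layers thus scores about 0.17, which a low-coverage threshold would flag for fallback enrichment.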
GeneratorAgent¶
File: src/agents/generator_agent.py
Purpose: Generate SQL queries from natural language using enriched context. The GeneratorAgent takes the question, analysis, and enrichment context to produce DuckDB SQL queries against the CPG database. It supports retry logic, batch generation, query variant generation (Query Funnel), and tag effectiveness feedback.
from src.agents import GeneratorAgent
generator = GeneratorAgent(
    sql_generator=sql_generator,  # SQLQueryGenerator: SQL generation backend
    enable_feedback=True,         # Enable tag feedback tracking
    use_semantic=False            # Use semantic mode
)
Constructor Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| sql_generator | SQLQueryGenerator | Required | SQL generation backend that produces actual queries |
| enable_feedback | bool | True | Enable tag effectiveness tracking via TagEffectivenessTracker |
| use_semantic | bool | False | Use semantic mode for query generation |
Key Methods:
# Generate SQL query
query, is_valid, error = generator.generate(question, context)
# Returns: (query_string, validation_bool, error_message_or_none)
# With retries on failure
query, is_valid, error, attempts = generator.generate_with_retries(
question, context, max_retries=2
)
# Returns: (query_string, validation_bool, error_message_or_none, attempt_count)
# Batch generation for multiple questions
results = generator.generate_batch(questions, contexts)
# Returns: list of (query, is_valid, error) tuples
# Query Funnel — generate multiple query variants
variants = generator.generate_query_variants(question, context, num_variants=3)
# Returns: [{'query': '...', 'strategy': 'precise'}, ...]
# Explain a generated query in natural language
explanation = generator.explain_query(query, context)
# Returns: string explanation of what the query does
Method Reference:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| generate | question: str, context: Dict | Tuple[str, bool, Optional[str]] | No | Generate a single SQL query |
| generate_with_retries | question, context, max_retries=2 | Tuple[str, bool, Optional[str], int] | No | Generate with automatic retries on failure |
| generate_batch | questions: List[str], contexts: List[Dict] | List[Tuple] | No | Batch generation for multiple questions |
| generate_query_variants | question, context, num_variants=3 | List[Dict] | No | Generate multiple query variants (Query Funnel) |
| explain_query | query: str, context: Dict | str | No | Explain a query in natural language |
Delegates to:
- PromptBuilder — formats prompts for the SQL generation backend
- QueryVariantGenerator — generates multiple query variants with different strategies
- TagEffectivenessTracker — records which enrichment tags lead to successful queries
InterpreterAgent¶
File: src/agents/interpreter_agent.py
Purpose: Convert query execution results to natural language answers. The InterpreterAgent is the final stage in the core pipeline, responsible for transforming raw query results into human-readable responses. It supports LLM-based generation and template-based fallback.
from src.agents.interpreter_agent import InterpreterAgent
interpreter = InterpreterAgent(
    llm_interface=None,  # Optional LLM (falls back to templates)
    use_semantic=False,  # Semantic mode (extract comments)
    cpg_config=None      # Optional[CPGConfig]: domain-specific config
)
Constructor Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| llm_interface | Optional[Any] | None | LLM interface for generating answers. When None, template-based generation is used |
| use_semantic | bool | False | Enable semantic mode for comment extraction and interpretation |
| cpg_config | Optional[CPGConfig] | None | Domain-specific CPG configuration |
Delegates to three helper classes:
| Class | File | Purpose |
|---|---|---|
| ResultParser | src/agents/result_parser.py | Parse various result formats (function lists, semantic results, raw data) |
| AnswerGenerator | src/agents/answer_generator.py | Generate natural language summaries using LLM or templates |
| SemanticInterpreter | src/agents/semantic_interpreter.py | Handle semantic query interpretation with comment extraction |
Key Methods:
# Interpret query results into a natural language answer
answer = interpreter.interpret(
question="What methods call LWLockAcquire?",
results=[...], # Raw query results
query="SELECT...", # The executed query
enrichment_hints=None # Optional enrichment context
)
# Returns: {
# 'answer': 'LWLockAcquire is called by 15 methods including...',
# 'confidence': 0.85,
# 'sources': [{'method': 'heap_insert', 'file': 'heapam.c', 'line': 234}, ...]
# }
Method Reference:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| interpret | question, results, query, enrichment_hints=None | Dict | No | Full interpretation pipeline |
Pipeline Support Modules¶
CallChainAnalyzer¶
File: src/agents/call_chain_analyzer.py
Purpose: Build call chains and call graphs from query results. Takes entry point results, keyword method results, and call graph edges to construct adjacency lists, call chains, and identify key functions.
from src.agents.call_chain_analyzer import CallChainAnalyzer
analyzer = CallChainAnalyzer() # No parameters
result = analyzer.analyze(
entry_point_result={"method": "main", ...},
keyword_methods_result=[{"method": "foo", ...}, ...],
call_graph_result=[{"caller": "a", "callee": "b"}, ...],
question_keywords=["transaction", "commit"]
)
Constructor Parameters: None.
Key Methods:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| analyze | entry_point_result, keyword_methods_result, call_graph_result, question_keywords | Dict | No | Full call chain analysis |
Analysis Result Structure:
{
'entry_point': 'main', # Detected entry point function
'call_graph': { # Adjacency list representation
'main': ['init', 'process'],
'process': ['validate', 'commit'],
...
},
'call_chains': [ # Paths through the call graph
['main', 'process', 'commit'],
['main', 'process', 'validate'],
...
],
'key_functions': ['commit', 'validate'], # Functions matching keywords
'metadata': {
'total_functions': 12,
'max_chain_depth': 5,
'keyword_matches': 3
}
}
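The adjacency list and call chains above can be derived from raw caller/callee edges with a depth-bounded walk. A simplified sketch of that construction (not the analyzer's actual code):

```python
def build_call_chains(edges, entry_point, max_depth=5):
    """Build an adjacency list from caller/callee edges and enumerate
    chains starting at the entry point, bounded by max_depth."""
    graph = {}
    for edge in edges:
        graph.setdefault(edge["caller"], []).append(edge["callee"])

    chains = []

    def walk(fn, path):
        callees = graph.get(fn, [])
        if not callees or len(path) >= max_depth:
            chains.append(path)  # leaf reached or depth limit hit
            return
        for callee in callees:
            if callee not in path:  # skip back-edges to avoid cycles
                walk(callee, path + [callee])

    walk(entry_point, [entry_point])
    return graph, chains
```

Keyword matching against the functions in `graph` would then yield the `key_functions` field shown in the result structure.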
ControlFlowGenerator¶
File: src/agents/control_flow_generator.py
Purpose: Generate SQL queries for control flow analysis and call chain tracing. Produces specialized DuckDB queries for entry point detection, keyword method search, and call graph construction.
from src.agents.control_flow_generator import ControlFlowGenerator
generator = ControlFlowGenerator(llm=None)
Constructor Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| llm | Optional[Any] | None | Optional LLM for advanced query generation |
Key Methods:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| generate_entry_point_query | question: str, analysis: Dict | str | No | Generate SQL to find entry point functions |
| generate_keyword_methods_query | keywords: List[str] | str | No | Generate SQL to find methods matching keywords |
| generate_call_graph_query | method_name: str, max_depth: int | str | No | Generate SQL for call graph traversal |
LogicSynthesizer¶
File: src/agents/logic_synthesizer.py
Purpose: Synthesize natural language explanations from call chain analysis. Takes the output of CallChainAnalyzer and produces readable explanations of code execution flows, optionally using an LLM for enhanced generation.
from src.agents.logic_synthesizer import LogicSynthesizer
synthesizer = LogicSynthesizer(
    llm=None,      # Optional LLM for generating explanations
    language="en"  # Language for prompts
)
Constructor Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| llm | Optional[Any] | None | LLM interface for generating explanations. Falls back to templates when None |
| language | str | "en" | Language for prompt templates ("en" or "ru") |
Key Methods:
result = synthesizer.synthesize(
question="How does the shutdown sequence work?",
call_chain_analysis=call_chain_result, # From CallChainAnalyzer.analyze()
max_tokens=1000
)
# Returns: {'explanation': '...', 'metadata': {...}}
Method Reference:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| synthesize | question: str, call_chain_analysis: Dict, max_tokens: int = 1000 | Dict | No | Synthesize explanation from call chain analysis |
Helper Function:
from src.agents.logic_synthesizer import format_call_chain_for_prompt
formatted = format_call_chain_for_prompt(analysis_result)
# Returns dict with: entry_point, call_graph, call_chains, key_functions
The format_call_chain_for_prompt() function takes a call chain analysis result and formats it as a structured dictionary suitable for inclusion in LLM prompts. It extracts the entry point, formats the call graph as readable text, lists call chains, and highlights key functions.
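A simplified sketch of that formatting step (field names follow the call chain result structure above; the exact rendering is an assumption):

```python
def format_call_chain_for_prompt(analysis):
    """Render a call chain analysis result as prompt-friendly text fields."""
    graph_lines = [
        f"{caller} -> {', '.join(callees)}"
        for caller, callees in analysis.get("call_graph", {}).items()
    ]
    return {
        "entry_point": analysis.get("entry_point", "unknown"),
        "call_graph": "\n".join(graph_lines),       # adjacency list as text
        "call_chains": [" -> ".join(chain) for chain in analysis.get("call_chains", [])],
        "key_functions": ", ".join(analysis.get("key_functions", [])),
    }
```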
AdaptiveQueryRefiner¶
File: src/agents/adaptive_refiner.py
Purpose: Learn from query outcomes and suggest better query patterns. The AdaptiveQueryRefiner maintains a knowledge base of query patterns, tracking success rates and execution characteristics. Over time, it learns which query strategies work best for different question types and can suggest refinements for failed queries.
from src.agents.adaptive_refiner import AdaptiveQueryRefiner
refiner = AdaptiveQueryRefiner(
    persistence_path=None  # Optional[str]: path to persist patterns
)
Constructor Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| persistence_path | Optional[str] | None | File path for persisting learned patterns. When None, patterns are kept in memory only |
Key Methods:
# Record query outcome for learning
refiner.record_query_outcome(
question="...", question_type="find_callers", query="SELECT...",
success=True, result_count=15, execution_time=0.5
)
# Suggest refinements for a failed query
suggestions = refiner.suggest_refinements(
question="...", question_type="find_callers",
failed_query="SELECT...", max_suggestions=3
)
# Returns: list of suggested query strings
# Get best pattern for a question type
pattern = refiner.get_best_pattern_for_type("find_callers")
# Returns: QueryPattern or None
# Statistics
stats = refiner.get_statistics()
# Returns: {'total_patterns': 25, 'avg_success_rate': 0.78, ...}
# Persistence
refiner.save_patterns()
refiner.load_patterns()
Method Reference:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| record_query_outcome | question, question_type, query, success, result_count, execution_time | None | No | Record outcome for pattern learning |
| suggest_refinements | question, question_type, failed_query, max_suggestions=3 | List[str] | No | Suggest alternative queries |
| get_best_pattern_for_type | question_type: str | Optional[QueryPattern] | No | Get highest-confidence pattern |
| get_statistics | — | Dict | No | Get learning statistics |
| save_patterns | — | None | No | Persist patterns to disk |
| load_patterns | — | None | No | Load patterns from disk |
Supporting Class: QueryPattern
The QueryPattern dataclass represents a learned query pattern:
| Field | Type | Description |
|---|---|---|
| question_type | str | The type of question this pattern applies to |
| query_template | str | SQL query template |
| total_attempts | int | Number of times this pattern was used |
| successful_attempts | int | Number of successful executions |
| avg_result_count | float | Average number of results returned |
| avg_execution_time | float | Average execution time in seconds |
Properties:
- success_rate — Returns successful_attempts / total_attempts (0.0 to 1.0)
- confidence — Combined confidence score factoring in success rate and sample size
AnswerGenerator¶
File: src/agents/answer_generator.py
Purpose: Generate natural language answers from query results. Supports both LLM-based generation for rich, contextual answers and template-based fallback for deterministic output when no LLM is available.
from src.agents.answer_generator import AnswerGenerator
generator = AnswerGenerator(
    llm_interface=None,                 # Optional LLM
    code_analyst_title="code analyst",  # Domain-specific title
    language="en"                       # Prompt language
)
Constructor Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| llm_interface | Optional[Any] | None | LLM interface for generating rich answers. Falls back to templates when None |
| code_analyst_title | str | "code analyst" | Title used in prompts to frame the assistant’s role |
| language | str | "en" | Language for generated answers ("en" or "ru") |
Key Methods:
# Group functions by enrichment tags
grouped = generator.group_by_tags(functions, enrichment_hints)
# Returns: dict mapping tag categories to lists of functions
# Generate answer from grouped results
answer = generator.generate(question, query, grouped, enrichment_hints,
used_fallback=False, fallback_query=None)
# Returns: string answer
Method Reference:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| group_by_tags | functions: List[Dict], enrichment_hints: Dict | Dict[str, List] | No | Group functions by enrichment tag categories |
| generate | question, query, grouped, enrichment_hints, used_fallback=False, fallback_query=None | str | No | Generate natural language answer |
Factory Function:
from src.agents.answer_generator import create_answer_generator
generator = create_answer_generator(
llm_interface=llm,
code_analyst_title="code analyst",
language="en"
)
# Returns: AnswerGenerator instance
ResultReranker¶
File: src/agents/result_reranker.py
Purpose: Strategy-based result reranking. Applies configurable scoring strategies to reorder retrieval results, boosting the most relevant items to the top. Supports Q&A reranking, SQL example reranking, and enrichment-aware reranking.
from src.agents.result_reranker import (
    ResultReranker, QARerankerStrategy,
    create_qa_reranker, create_sql_reranker, create_enrichment_reranker
)
# Using factory functions
reranker = create_qa_reranker()
reranker = create_sql_reranker()
reranker = create_enrichment_reranker()
# Or direct construction with a strategy
reranker = ResultReranker(strategy=QARerankerStrategy())
# Rerank items
ranked = reranker.rerank(items, analysis, top_k=5)
# Returns: list of items sorted by relevance score, trimmed to top_k
Constructor Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| strategy | RerankerStrategy | Required | Reranking strategy to apply |
Method Reference:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| rerank | items: List, analysis: Dict, top_k: int = 5 | List | No | Rerank items using the configured strategy |
Strategy Hierarchy:
RerankerStrategy (ABC)
+-- QARerankerStrategy # Q&A pair reranking
+-- SQLRerankerStrategy # SQL example reranking
+-- EnrichmentRerankerStrategy # Enrichment-aware reranking
| Strategy | Description | Key Scoring Factors |
|---|---|---|
| QARerankerStrategy | Reranks Q&A pairs based on relevance to the analysis | Intent match, domain match, keyword overlap |
| SQLRerankerStrategy | Reranks SQL examples based on structural similarity | Query pattern match, table overlap, join complexity |
| EnrichmentRerankerStrategy | Reranks based on enrichment tag alignment | Tag coverage, tag relevance, domain alignment |
Factory Functions:
| Function | Returns | Description |
|---|---|---|
| create_qa_reranker() | ResultReranker | Reranker with QARerankerStrategy |
| create_sql_reranker() | ResultReranker | Reranker with SQLRerankerStrategy |
| create_enrichment_reranker() | ResultReranker | Reranker with EnrichmentRerankerStrategy |
Dataclass: ScoringBoost
from src.agents.result_reranker import ScoringBoost
boost = ScoringBoost(
name="domain_match", # Human-readable boost name
multiplier=1.5, # Score multiplier when condition is true
condition=lambda item, analysis: item.get("domain") == analysis.get("domain")
)
| Field | Type | Description |
|---|---|---|
| name | str | Human-readable name for the boost |
| multiplier | float | Score multiplier applied when condition evaluates to True |
| condition | Callable | Function taking (item, analysis) and returning bool |
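Applying a list of boosts inside a rerank pass might look like the following sketch. It uses a minimal stand-in ScoringBoost so the example is self-contained; the real strategies combine more signals than a single multiplicative pass:

```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class ScoringBoost:
    name: str          # human-readable boost name
    multiplier: float  # score multiplier when condition is true
    condition: Callable  # (item, analysis) -> bool

def apply_boosts(items, analysis, boosts, score_key="score"):
    """Multiply each item's base score by every boost whose condition
    holds, then return items sorted by boosted score, descending."""
    scored = []
    for item in items:
        score = item.get(score_key, 1.0)
        for boost in boosts:
            if boost.condition(item, analysis):
                score *= boost.multiplier
        scored.append((score, item))
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [item for _, item in scored]
```

With the `domain_match` boost from the example above, an item matching the analysis domain can overtake a higher-scored item from another domain.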
ConfidenceCalculator¶
File: src/agents/confidence_calculator.py
Purpose: Configurable confidence scoring via the Strategy pattern. Provides multiple calculation strategies for computing confidence scores based on different signal types: result counts, analysis quality, pattern matching, and LLM signals.
from src.agents.confidence_calculator import (
ConfidenceCalculator, create_result_calculator,
create_analysis_calculator, create_pattern_calculator, create_llm_calculator
)
# Using factory functions
calc = create_result_calculator()
score = calc.calculate(result_count=15, execution_success=True, used_fallback=False)
# Returns: float between 0.0 and 1.0
# Or from factors dataclass
from src.agents.confidence_calculator import ConfidenceFactors
factors = ConfidenceFactors(result_count=15, execution_success=True)
score = calc.calculate_from_factors(factors)
Constructor Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `strategy` | `ConfidenceStrategy` | Required | Confidence calculation strategy |
Method Reference:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `calculate` | `**kwargs` | `float` | No | Calculate confidence from keyword arguments |
| `calculate_from_factors` | `factors: ConfidenceFactors` | `float` | No | Calculate confidence from a `ConfidenceFactors` dataclass |
Strategy Hierarchy:
ConfidenceStrategy (ABC)
+-- ResultConfidenceStrategy # Based on result count and execution status
+-- AnalysisConfidenceStrategy # Based on intent/domain/keyword analysis
+-- PatternConfidenceStrategy # Based on pattern matching success
+-- LLMConfidenceStrategy # Based on LLM confidence signals
| Strategy | Key Inputs | Description |
|---|---|---|
| `ResultConfidenceStrategy` | `result_count`, `execution_success`, `used_fallback` | Higher confidence with more results and successful execution; penalizes fallback usage |
| `AnalysisConfidenceStrategy` | `intent_confidence`, `domain_confidence`, `keyword_count` | Higher confidence when intent and domain are clearly identified |
| `PatternConfidenceStrategy` | `pattern_match_score`, `pattern_count` | Higher confidence when known patterns are matched |
| `LLMConfidenceStrategy` | `llm_confidence`, `coherence_score` | Uses LLM self-reported confidence and output coherence |
Factory Functions:
| Function | Returns | Description |
|---|---|---|
| `create_result_calculator()` | `ConfidenceCalculator` | Calculator with `ResultConfidenceStrategy` |
| `create_analysis_calculator()` | `ConfidenceCalculator` | Calculator with `AnalysisConfidenceStrategy` |
| `create_pattern_calculator()` | `ConfidenceCalculator` | Calculator with `PatternConfidenceStrategy` |
| `create_llm_calculator()` | `ConfidenceCalculator` | Calculator with `LLMConfidenceStrategy` |
Dataclass: ConfidenceFactors
from src.agents.confidence_calculator import ConfidenceFactors
factors = ConfidenceFactors(
result_count=15,
execution_success=True,
used_fallback=False,
intent_confidence=0.9,
domain_confidence=0.85,
keyword_count=3,
pattern_match_score=0.75,
pattern_count=2,
llm_confidence=0.8,
coherence_score=0.9
)
All fields are optional with sensible defaults, allowing each strategy to consume only the fields it needs.
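The Strategy pattern described above can be sketched in a few lines. This is an illustrative, self-contained reimplementation with invented weights, not the actual `confidence_calculator` code:

```python
from abc import ABC, abstractmethod

class ConfidenceStrategy(ABC):
    """Base class: each strategy turns a bag of factors into a 0.0-1.0 score."""
    @abstractmethod
    def compute(self, **factors) -> float: ...

class ResultCountStrategy(ConfidenceStrategy):
    """Confidence grows with result count (capped at 1.0); fallback use is penalized.

    The cap of 20 results, the +0.2 success bonus, and the 0.8 fallback
    penalty are invented for illustration.
    """
    def compute(self, result_count=0, execution_success=False, used_fallback=False, **_):
        score = min(result_count / 20.0, 1.0)
        if execution_success:
            score = min(score + 0.2, 1.0)
        if used_fallback:
            score *= 0.8
        return score

class Calculator:
    """Delegates to whichever strategy it was constructed with."""
    def __init__(self, strategy: ConfidenceStrategy):
        self.strategy = strategy
    def calculate(self, **factors) -> float:
        return self.strategy.compute(**factors)

calc = Calculator(ResultCountStrategy())
# Roughly 0.95 for 15 results with successful execution and no fallback.
score = calc.calculate(result_count=15, execution_success=True, used_fallback=False)
```

Swapping in a different `ConfidenceStrategy` subclass changes the scoring behavior without touching the calculator, which is the point of the factory functions above.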
PromptBuilder¶
File: src/agents/prompt_builder.py
Purpose: Prompt formatting for the GeneratorAgent. Constructs well-structured prompts that combine the user’s question, retrieval context, enrichment hints, Q&A examples, and SQL examples into a format suitable for SQL generation.
from src.agents.prompt_builder import PromptBuilder
builder = PromptBuilder(semantic_prompts=None)  # semantic_prompts: Optional[Dict[str, str]]
Constructor Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `semantic_prompts` | `Optional[Dict[str, str]]` | `None` | Custom prompt templates for semantic mode. Keys are template names, values are template strings |
Key Methods:
# Build enriched prompt with full context
prompt = builder.build_enriched_prompt(question, context)
# Build simple prompt without enrichment
prompt = builder.build_simple_prompt(question, context)
# Build semantic prompt for semantic mode
prompt = builder.build_semantic_prompt(question, context)
# Formatting helpers
enrichment_ctx = builder.format_enrichment_context(hints)
qa_text = builder.format_qa_examples(qa_pairs)
sql_text = builder.format_sql_examples(examples)
guidance = builder.get_domain_guidance(analysis)
Method Reference:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `build_enriched_prompt` | `question: str, context: Dict` | `str` | No | Full prompt with enrichment context |
| `build_simple_prompt` | `question: str, context: Dict` | `str` | No | Simple prompt without enrichment |
| `build_semantic_prompt` | `question: str, context: Dict` | `str` | No | Prompt for semantic mode |
| `format_enrichment_context` | `hints: Dict` | `str` | No | Format enrichment hints as text |
| `format_qa_examples` | `qa_pairs: List[Dict]` | `str` | No | Format Q&A examples for the prompt |
| `format_sql_examples` | `examples: List[Dict]` | `str` | No | Format SQL examples for the prompt |
| `get_domain_guidance` | `analysis: Dict` | `str` | No | Get domain-specific guidance text |
Factory Function:
from src.agents.prompt_builder import create_prompt_builder
builder = create_prompt_builder(semantic_prompts=None)
# Returns: PromptBuilder instance
QueryVariantGenerator¶
File: src/agents/query_variants.py
Purpose: Generate query variants for the Query Funnel approach. Instead of generating a single query, this module produces multiple variants with different strategies (precise, broad, alternative) to maximize the chance of finding relevant results.
from src.agents.query_variants import QueryVariantGenerator
gen = QueryVariantGenerator()
variants = gen.generate_variants(question, context, num_variants=3)
# Returns: [{'query': '...', 'strategy': 'precise'}, {'query': '...', 'strategy': 'broad'}, ...]
Constructor Parameters: None.
Key Methods:
# Generate multiple query variants
variants = gen.generate_variants(question, context, num_variants=3)
# Detection helpers for specialized query types
gen.is_data_flow_question("How does data flow from A to B?") # True
gen.is_control_flow_question("What is the shutdown sequence?") # True
Method Reference:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `generate_variants` | `question: str, context: Dict, num_variants: int = 3` | `List[Dict]` | No | Generate query variants with different strategies |
| `is_data_flow_question` | `question: str` | `bool` | No | Check if the question is about data flow |
| `is_control_flow_question` | `question: str` | `bool` | No | Check if the question is about control flow |
Variant Strategies:
| Strategy | Description |
|---|---|
| `precise` | Exact-match query targeting specific function/method names |
| `broad` | Wider search using LIKE patterns and partial matches |
| `alternative` | Alternative query structure using different tables or joins |
| `data_flow` | Specialized query for data flow tracing (when detected) |
| `control_flow` | Specialized query for control flow analysis (when detected) |
Factory Function:
from src.agents.query_variants import create_variant_generator
gen = create_variant_generator()
# Returns: QueryVariantGenerator instance
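The funnel idea behind these variants is to try them from most precise to broadest until one returns rows. A minimal sketch with a stubbed query executor; `run_funnel` and the `fake_db` stub are hypothetical, not part of `query_variants`:

```python
from typing import Callable, Dict, List, Optional

def run_funnel(variants: List[Dict[str, str]],
               run_query: Callable[[str], List[dict]]) -> Optional[Dict]:
    """Execute variants in order; return the first one that produces rows."""
    for variant in variants:
        rows = run_query(variant["query"])
        if rows:
            return {"strategy": variant["strategy"], "rows": rows}
    return None  # every variant came back empty

variants = [
    {"query": "SELECT * FROM functions WHERE name = 'init'", "strategy": "precise"},
    {"query": "SELECT * FROM functions WHERE name LIKE '%init%'", "strategy": "broad"},
]
# Stub executor: only the broad LIKE query matches anything.
fake_db = lambda q: [{"name": "reinit"}] if "LIKE" in q else []
result = run_funnel(variants, fake_db)
```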
Fallback Strategies¶
File: src/agents/fallback_strategies.py
Purpose: Intelligent fallback for low-coverage enrichment (Phase 4). When the EnrichmentAgent produces hints with low tag coverage, these fallback strategies activate to ensure adequate context for query generation. Three classes work together to map keywords to tags, build hybrid queries, and select the appropriate fallback strategy.
Three classes:
KeywordToTagMapper¶
from src.agents.fallback_strategies import KeywordToTagMapper
mapper = KeywordToTagMapper(similarity_threshold=None)
| Parameter | Type | Default | Description |
|---|---|---|---|
| `similarity_threshold` | `Optional[float]` | `None` | Minimum similarity score for fuzzy keyword-to-tag matching. When `None`, a default threshold is used |
Maps keywords to CPG enrichment tags using fuzzy string matching. When a keyword does not exactly match a known tag, the mapper finds the closest matching tags above the similarity threshold.
HybridQueryBuilder¶
from src.agents.fallback_strategies import HybridQueryBuilder
builder = HybridQueryBuilder()
No constructor parameters. Builds hybrid queries that combine name-based matching with tag-based filtering. Used when enrichment tags alone are insufficient for precise retrieval.
FallbackStrategySelector¶
from src.agents.fallback_strategies import FallbackStrategySelector
selector = FallbackStrategySelector()
No constructor parameters. Orchestrates the fallback process by evaluating coverage, selecting the appropriate strategy, and applying it.
Convenience Function:
from src.agents.fallback_strategies import get_fallback_selector
selector = get_fallback_selector()
enhanced_hints = selector.apply_fallback(hints, question, analysis)
# Returns: enhanced hints dict with additional tags from fallback strategies
Method Reference for FallbackStrategySelector:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `apply_fallback` | `hints: Dict, question: str, analysis: Dict` | `Dict` | No | Apply the appropriate fallback strategy to enhance hints |
| `evaluate_coverage` | `hints: Dict` | `float` | No | Calculate the current coverage score |
| `select_strategy` | `coverage: float, analysis: Dict` | `str` | No | Select a fallback strategy based on coverage and analysis |
Other Support Modules¶
| File | Class/Function | Constructor/Parameters | Purpose |
|---|---|---|---|
| `semantic_interpreter.py` | `SemanticInterpreter` | `(llm_interface, prompt_template: Optional[str] = None, code_analyst_title: str = "code analyst")` | Semantic query interpretation with comment extraction from code |
| `result_parser.py` | `ResultParser` | `()` (no parameters) | Parse function lists, semantic results, and raw query output into a structured format |
| `tag_effectiveness_tracker.py` | `TagEffectivenessTracker` | `(persistence_path: Optional[str] = None, min_samples: int = 3)` | Track which enrichment tags lead to successful queries; singleton via `get_global_tracker()` |
| `utils.py` | `sanitize_sql_value()` | `(value: str) -> str` | Sanitize a string for safe use in SQL queries |
| `utils.py` | `sanitize_like_pattern()` | `(pattern: str) -> str` | Sanitize a LIKE pattern, escaping special characters |
SemanticInterpreter details:
from src.agents.semantic_interpreter import SemanticInterpreter
interpreter = SemanticInterpreter(
llm_interface=llm,
prompt_template=None, # Custom prompt template
code_analyst_title="code analyst" # Role title in prompts
)
result = interpreter.interpret(question, results, query)
# Returns: dict with 'answer', 'confidence', 'sources'
TagEffectivenessTracker details:
Supporting dataclass: TagUsageRecord — fields: tag_name, tag_value, question_domain, question_intent, query_valid, query_executed, execution_successful, timestamp, coverage_score.
from src.agents.tag_effectiveness_tracker import TagEffectivenessTracker, TagUsageRecord, get_global_tracker
# Singleton access
tracker = get_global_tracker()
# Record tag usage and outcome
tracker.record(tags=["concurrency", "lock-acquire"], success=True, result_count=15)
# Get effectiveness report
report = tracker.get_report()
# Returns: dict mapping tag names to effectiveness scores
# Persistence
tracker.save()
tracker.load()
ResultParser details:
from src.agents.result_parser import ResultParser
parser = ResultParser()
# Parse function list results
functions = parser.parse_function_list(raw_results)
# Parse semantic results
semantic = parser.parse_semantic_result(raw_results)
# Check for empty results
parser.is_empty(raw_results)
ACP — Agent Client Protocol¶
ACP Overview¶
ACP provides IDE integration via JSON-RPC 2.0. It allows IDEs to communicate with CodeGraph’s analysis pipeline through a structured protocol supporting sessions, threads, turns, diagnostics, hover information, and file change approvals. Supported IDEs: Zed, JetBrains, VS Code.
IDE <--> Transport (stdio | HTTP | WebSocket) <--> CodeGraphACPAgent <--> Adapters <--> Core Services
Entry point:
python -m src.acp # Default: stdio transport
python -m src.acp --transport http # HTTP transport on default port
python -m src.acp --transport ws # WebSocket transport
The ACP system follows a layered architecture:
- Transport Layer — Handles raw communication (stdio, HTTP, WebSocket)
- Protocol Layer — JSON-RPC 2.0 request/response handling
- Agent Layer — Method dispatch and business logic
- Integration Layer — Adapters connecting to core CodeGraph services
graph LR
subgraph "Transport"
STDIO[StdioTransport]
HTTP[HTTPTransport]
WS[WebSocketTransport]
end
subgraph "Protocol"
AGENT[CodeGraphACPAgent]
end
subgraph "Server Components"
SM[SessionManager]
TM[ThreadManager]
TO[TurnOrchestrator]
DE[DiagnosticsEngine]
HP[HoverProvider]
AB[ApprovalBridge]
NR[NotificationRouter]
end
subgraph "Integration"
CA[ChatAdapter]
SA[SessionAdapter]
WA[WorkflowAdapter]
MB[MCPBridge]
end
STDIO --> AGENT
HTTP --> AGENT
WS --> AGENT
AGENT --> SM
AGENT --> TM
AGENT --> TO
AGENT --> DE
AGENT --> HP
AGENT --> AB
AGENT --> NR
AGENT --> CA
AGENT --> SA
AGENT --> WA
AGENT --> MB
CodeGraphACPAgent¶
File: src/acp/server/agent.py
Purpose: Main ACP agent that dispatches 25 JSON-RPC methods to their respective handlers. This is the central coordinator of the ACP system, lazily loading server components and integration adapters as needed.
from src.acp import CodeGraphACPAgent
agent = CodeGraphACPAgent() # No parameters
# Main entry point (async)
response = await agent.handle(
    request,                      # JSONRPCRequest
    notification_callback=None,   # Optional[Callable]
    user_id=None                  # Optional[str]
)
Constructor Parameters: None. All components are lazy-loaded on first access.
Main Method:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `handle` | `request: JSONRPCRequest, notification_callback: Optional[Callable] = None, user_id: Optional[str] = None` | `JSONRPCResponse` | Yes | Dispatch a JSON-RPC request to the appropriate handler |
JSON-RPC Methods (25 total):
| Category | Method | Handler | Streaming | Description |
|---|---|---|---|---|
| Init | `initialize` | `_handle_initialize` | No | Initialize protocol, negotiate capabilities |
| Auth | `authenticate` | `_handle_authenticate` | No | Authenticate client with credentials |
| Session | `session/new` | `_handle_session_new` | No | Create a new session with working directory and MCP servers |
| Session | `session/load` | `_handle_session_load` | No | Load an existing session by ID |
| Session | `session/prompt` | `_handle_session_prompt` | Yes | Send a prompt and receive a streaming response |
| Session | `session/cancel` | `_handle_session_cancel` | No | Cancel a running session prompt |
| File System | `fs/read_text_file` | `_handle_fs_read` | No | Read a text file from the filesystem |
| File System | `fs/write_text_file` | `_handle_fs_write` | No | Write content to a text file |
| Terminal | `terminal/create` | `_handle_terminal_create` | No | Create a terminal subprocess |
| Terminal | `terminal/output` | `_handle_terminal_output` | No | Read terminal output |
| Terminal | `terminal/wait_for_exit` | `_handle_terminal_wait` | No | Wait for the terminal process to exit |
| Terminal | `terminal/kill` | `_handle_terminal_kill` | No | Kill a terminal process |
| Terminal | `terminal/release` | `_handle_terminal_release` | No | Release terminal resources |
| Thread | `thread/start` | `_handle_thread_start` | No | Start a new conversation thread |
| Thread | `thread/resume` | `_handle_thread_resume` | No | Resume an existing thread |
| Thread | `thread/fork` | `_handle_thread_fork` | No | Fork a thread at a specific turn |
| Thread | `thread/list` | `_handle_thread_list` | No | List threads with optional filters |
| Thread | `thread/archive` | `_handle_thread_archive` | No | Archive a thread |
| Thread | `thread/compact` | `_handle_thread_compact` | No | Compact thread history to save memory |
| Turn | `turn/start` | `_handle_turn_start` | Yes | Start a new turn within a thread (streaming) |
| Turn | `turn/interrupt` | `_handle_turn_interrupt` | No | Interrupt a running turn |
| Diagnostics | `diagnostics/subscribe` | `_handle_diagnostics_subscribe` | No | Subscribe to diagnostics for a file URI |
| Diagnostics | `diagnostics/unsubscribe` | `_handle_diagnostics_unsubscribe` | No | Unsubscribe from diagnostics for a file URI |
| Hover | `hover/request` | `_handle_hover_request` | No | Request hover information for a position |
| Approval | `item/fileChange/respondApproval` | `_handle_respond_approval` | No | Respond to a file change approval request |
Lazy-Loaded Properties:
The following properties are initialized on first access to avoid unnecessary resource allocation:
| Property | Type | Description |
|---|---|---|
| `session_manager` | `ACPSessionManager` | Manages session lifecycle and state |
| `chat_adapter` | `ChatServiceACPAdapter` | Bridges prompts to ChatService |
| `capability_negotiator` | `CapabilityNegotiator` | Negotiates client/server capabilities |
| `thread_manager` | `ACPThreadManager` | Manages conversation threads |
| `turn_orchestrator` | `TurnOrchestrator` | Orchestrates turn execution and streaming |
| `diagnostics_engine` | `DiagnosticsEngine` | Generates and publishes diagnostics |
| `hover_provider` | `HoverProvider` | Provides hover information from the CPG |
| `approval_bridge` | `ACPApprovalBridge` | Manages the file change approval flow |
Protocol Types (Pydantic models defined in the same file):
| Model | Fields | Purpose |
|---|---|---|
| `JSONRPCRequest` | `jsonrpc: str, method: str, params: Optional[Dict], id: Optional[Union[str, int]]` | JSON-RPC 2.0 request envelope |
| `JSONRPCResponse` | `jsonrpc: str, result: Optional[Any], error: Optional[Dict], id: Optional[Union[str, int]]` | JSON-RPC 2.0 response envelope |
| `JSONRPCNotification` | `jsonrpc: str, method: str, params: Optional[Dict]` | JSON-RPC 2.0 notification (no `id` field, no response expected) |
| `InitializeParams` | `clientInfo: ClientInfo, capabilities: ClientCapabilities, protocolVersion: str` | Parameters for the `initialize` method |
| `InitializeResult` | `serverInfo: AgentInfo, capabilities: AgentCapabilities, protocolVersion: str` | Result of the `initialize` method |
| `SessionNewParams` | `cwd: str, mcpServers: Optional[List[Dict]], userId: Optional[str]` | Parameters for `session/new` |
| `SessionNewResult` | `sessionId: str, projectId: Optional[str]` | Result of `session/new` |
| `SessionPromptParams` | `sessionId: str, contentBlocks: List[ContentBlock]` | Parameters for `session/prompt` |
| `ContentBlock` | `type: str, text: Optional[str], data: Optional[str], mimeType: Optional[str]` | Content block in a prompt (text, image, file) |
| `StopReason` | Enum: `end_turn`, `max_tokens`, `cancelled` | Reason why generation stopped |
| `AgentCapabilities` | `loadSession: bool, setMode: bool, promptCapabilities: PromptCapabilities, mcp: MCPCapabilities, authMethods: List[str]` | Server capability advertisement |
| `PromptCapabilities` | `image: bool, audio: bool, embeddedContext: bool` | Supported prompt content types |
| `MCPCapabilities` | `stdio: bool, http: bool, sse: bool` | MCP transport support flags |
| `AgentInfo` | `name: str, version: str, title: str` | Server identity information |
| `ClientCapabilities` | `fs: bool, terminal: bool, diagnostics: bool, hover: bool, approval: bool` | Client capability advertisement |
| `ClientInfo` | `name: str, version: str` | Client identity information |
| `PlanEntry` | `id: str, title: str, status: str, toolCallId: Optional[str]` | Entry in an execution plan |
| `PlanUpdate` | `entries: List[PlanEntry]` | Update to the execution plan |
| `AgentMessageChunk` | `text: str, stopReason: Optional[StopReason]` | Streaming text chunk |
| `ToolCallUpdate` | `toolCallId: str, toolName: str, status: str, result: Optional[Any]` | Tool call status update |
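Putting the envelope and initialize types together, a handshake request might look like the following on the wire. The field names follow the tables above; the concrete values (client name, version, protocol version `"1"`) are illustrative:

```python
import json

# A JSON-RPC 2.0 `initialize` request as the client would send it.
initialize_request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "initialize",
    "params": {
        "protocolVersion": "1",                            # illustrative value
        "clientInfo": {"name": "zed", "version": "0.120.0"},
        "capabilities": {"fs": True, "terminal": True, "diagnostics": True,
                         "hover": True, "approval": False},
    },
}

# The envelope must survive a round trip through the transport unchanged.
wire = json.dumps(initialize_request)
decoded = json.loads(wire)
```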
Session Management¶
File: src/acp/server/session_manager.py
Purpose: Manages ACP session lifecycle including creation, loading, timeout, and cleanup. Each session represents an IDE connection with its own working directory, project context, and MCP server configuration.
from src.acp import ACPSessionManager
manager = ACPSessionManager(
    max_sessions=100,             # int, default 100
    session_timeout_minutes=60    # int, default 60
)
Constructor Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `max_sessions` | `int` | `100` | Maximum number of concurrent sessions |
| `session_timeout_minutes` | `int` | `60` | Session timeout in minutes after last activity |
Key Methods:
# Create a new session
session = await manager.create_session(
cwd="/path/to/project",
mcp_servers=[{"id": "...", "transport": "stdio", ...}],
user_id="user-123"
)
# Load existing session
session = await manager.load_session(session_id="...")
# Get session by ID
session = manager.get_session(session_id="...")
# Cancel a session
await manager.cancel_session(session_id="...")
# Cleanup expired sessions
cleaned = await manager.cleanup_expired()
Method Reference:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `create_session` | `cwd: str, mcp_servers: Optional[List[Dict]] = None, user_id: Optional[str] = None` | `ACPSession` | Yes | Create a new session |
| `load_session` | `session_id: str` | `ACPSession` | Yes | Load an existing session |
| `get_session` | `session_id: str` | `Optional[ACPSession]` | No | Get a session by ID (returns `None` if not found) |
| `cancel_session` | `session_id: str` | `None` | Yes | Cancel a running session |
| `cleanup_expired` | — | `int` | Yes | Remove expired sessions; returns the count cleaned |
Dataclass: ACPSession
| Field | Type | Description |
|---|---|---|
| `session_id` | `str` | Unique session identifier (UUID) |
| `cwd` | `str` | Working directory for this session |
| `user_id` | `Optional[str]` | Authenticated user ID |
| `project_id` | `Optional[str]` | Associated CodeGraph project ID |
| `codegraph_session_id` | `Optional[str]` | Internal session ID for core services |
| `mcp_servers` | `List[Dict]` | List of MCP server configurations |
| `created_at` | `datetime` | Session creation timestamp |
| `last_activity` | `datetime` | Last activity timestamp (updated on each request) |
| `cancelled` | `bool` | Whether the session has been cancelled |
| `context` | `Dict` | Arbitrary session context data |
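Expiry in `cleanup_expired` presumably compares `last_activity` against the configured timeout. A minimal sketch of that check; the `is_expired` helper is hypothetical, not part of `session_manager`:

```python
from datetime import datetime, timedelta

def is_expired(last_activity: datetime, timeout_minutes: int,
               now: datetime) -> bool:
    """A session expires once the timeout has elapsed since its last activity."""
    return now - last_activity > timedelta(minutes=timeout_minutes)

now = datetime(2024, 1, 1, 12, 0)
stale = is_expired(now - timedelta(minutes=90), 60, now)   # past the 60-minute timeout
fresh = is_expired(now - timedelta(minutes=10), 60, now)   # still active
```

Because `last_activity` is refreshed on each request, any session the IDE is actively using stays on the fresh side of this check.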
Capability Negotiation¶
File: src/acp/server/capabilities.py
Purpose: Handles protocol capability negotiation between client and server during the initialize handshake. Determines which features both sides support and configures the protocol accordingly.
from src.acp.server.capabilities import CapabilityNegotiator
negotiator = CapabilityNegotiator()
Constructor Parameters: None.
Key Methods:
# Negotiate capabilities from initialize params
server_caps = negotiator.negotiate(init_params)
# Returns: AgentCapabilities with negotiated feature flags
# Check if a capability is supported
negotiator.supports("diagnostics") # True/False
negotiator.supports("hover") # True/False
# Check if a notification should be sent
negotiator.should_send_notification("diagnostics/publish") # True/False
Method Reference:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `negotiate` | `init_params: InitializeParams` | `AgentCapabilities` | No | Negotiate capabilities and return the server capabilities |
| `supports` | `capability: str` | `bool` | No | Check if a specific capability is supported |
| `should_send_notification` | `method: str` | `bool` | No | Check if a notification method should be sent, based on negotiated capabilities and the opt-out list |
Dataclass: ClientCapabilitiesV2
Extended client capabilities with granular control:
| Field | Type | Default | Description |
|---|---|---|---|
| `fs` | `bool` | `False` | Client supports filesystem operations |
| `terminal` | `bool` | `False` | Client supports terminal operations |
| `diagnostics` | `bool` | `False` | Client supports diagnostics display |
| `hover` | `bool` | `False` | Client supports hover information |
| `approval_flow` | `bool` | `False` | Client supports the file change approval flow |
| `opt_out_notification_methods` | `List[str]` | `[]` | Notification methods the client does not want to receive |
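Negotiation of flags like these typically reduces to intersecting what the client advertises with what the server implements, then filtering notifications by the opt-out list. A self-contained sketch of that logic; it is not the actual `capabilities.py` code, and the server feature set below is invented:

```python
from typing import Dict, List, Set

# Features this server hypothetically implements.
SERVER_FEATURES = {"fs", "terminal", "diagnostics", "hover", "approval_flow"}

def negotiate(client_caps: Dict[str, bool]) -> Set[str]:
    """Keep only features both sides support."""
    return {name for name, enabled in client_caps.items()
            if enabled and name in SERVER_FEATURES}

def should_send(method: str, negotiated: Set[str], opt_out: List[str],
                required_feature: str) -> bool:
    """Send a notification only if its feature was negotiated and not opted out."""
    return required_feature in negotiated and method not in opt_out

caps = negotiate({"fs": True, "diagnostics": True, "hover": False})
ok = should_send("diagnostics/publish", caps, [], "diagnostics")
blocked = should_send("diagnostics/publish", caps,
                      ["diagnostics/publish"], "diagnostics")
```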
Thread and Turn Lifecycle¶
File: src/acp/server/thread_manager.py
Purpose: Manages conversation threads. Threads represent ongoing conversations that persist across multiple turns. They can be started, resumed, forked, listed, archived, and compacted.
from src.acp.server.thread_manager import ACPThreadManager
tm = ACPThreadManager()
Constructor Parameters: None.
Key Methods:
# Start a new thread
thread = await tm.start_thread(cwd="/path", name="analysis", config={})
# Resume an existing thread
thread = await tm.resume_thread(thread_id="...")
# Fork a thread at a specific turn
thread = await tm.fork_thread(thread_id="...", at_turn=5)
# List threads with filters
threads = await tm.list_threads(status="active", limit=20, offset=0)
# Archive a thread (preserves but marks inactive)
await tm.archive_thread(thread_id="...")
# Compact thread history to reduce memory
await tm.compact_thread(thread_id="...")
Method Reference:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `start_thread` | `cwd: str, name: Optional[str] = None, config: Optional[Dict] = None` | `Dict` | Yes | Start a new conversation thread |
| `resume_thread` | `thread_id: str` | `Dict` | Yes | Resume an existing thread |
| `fork_thread` | `thread_id: str, at_turn: Optional[int] = None` | `Dict` | Yes | Fork a thread, optionally at a specific turn number |
| `list_threads` | `status: Optional[str] = None, limit: int = 20, offset: int = 0` | `List[Dict]` | Yes | List threads with an optional status filter and pagination |
| `archive_thread` | `thread_id: str` | `None` | Yes | Archive a thread |
| `compact_thread` | `thread_id: str` | `None` | Yes | Compact thread history |
File: src/acp/server/turn_orchestrator.py
Purpose: Orchestrates turn execution within threads. A turn represents a single user-assistant interaction. The orchestrator manages the lifecycle of a turn including background execution, streaming, and interruption.
from src.acp.server.turn_orchestrator import TurnOrchestrator
orchestrator = TurnOrchestrator()
Constructor Parameters: None.
Key Methods:
# Start a turn (launches background task for streaming)
result = await orchestrator.start_turn(
thread_id="...", prompt="Analyze this code",
context={"files": [...]},
notification_router=router
)
# Interrupt a running turn
result = await orchestrator.interrupt_turn(thread_id="...", turn_id="...")
Method Reference:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `start_turn` | `thread_id: str, prompt: str, context: Optional[Dict] = None, notification_router: Optional[NotificationRouter] = None` | `Dict` | Yes | Start a new turn with optional streaming notifications |
| `interrupt_turn` | `thread_id: str, turn_id: str` | `Dict` | Yes | Interrupt a running turn |
The start_turn method launches a background task that processes the prompt and sends streaming notifications (plan updates, message chunks, tool call updates) through the notification router. The method returns immediately with a turn ID, and progress is communicated through notifications.
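This return-immediately-then-stream pattern can be sketched with `asyncio`. The sketch below is illustrative (the real orchestrator streams through the `NotificationRouter`, and the word-splitting "generation" is a stand-in):

```python
import asyncio
import uuid

async def start_turn(prompt: str, notify) -> dict:
    """Return a turn ID at once; stream chunks from a background task."""
    turn_id = str(uuid.uuid4())

    async def worker():
        # Stand-in for real generation: emit one chunk per word, then a stop.
        for chunk in prompt.split():
            await notify("session/update", {"turnId": turn_id, "text": chunk})
        await notify("session/update", {"turnId": turn_id, "stopReason": "end_turn"})

    asyncio.create_task(worker())            # fire and forget
    return {"turnId": turn_id, "status": "running"}

async def main():
    received = []
    async def notify(method, params):        # plays the notification router's role
        received.append((method, params))
    result = await start_turn("hello streaming world", notify)
    await asyncio.sleep(0)                   # yield so the worker can run
    await asyncio.sleep(0)
    return result, received

result, received = asyncio.run(main())
```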
Diagnostics and Hover¶
File: src/acp/server/diagnostics.py
Purpose: Generates and publishes diagnostics for subscribed file URIs. Uses CPG data to identify issues like taint flows, high complexity, deprecated API usage, and dead code. Publishes diagnostics to subscribed clients via the notification router.
from src.acp.server.diagnostics import DiagnosticsEngine
engine = DiagnosticsEngine()
Constructor Parameters: None.
Key Methods:
# Subscribe to diagnostics for a file
engine.subscribe(uri="file:///path/to/file.py")
# Generate diagnostics for a file
diagnostics = await engine.generate_diagnostics(uri="file:///path/to/file.py")
# Publish diagnostics through notification router
await engine.publish_diagnostics(uri, notification_router=router)
# Unsubscribe from diagnostics
engine.unsubscribe(uri="file:///path/to/file.py")
Method Reference:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `subscribe` | `uri: str` | `None` | No | Subscribe to diagnostics for a file URI |
| `generate_diagnostics` | `uri: str` | `List[Dict]` | Yes | Generate diagnostics for a file |
| `publish_diagnostics` | `uri: str, notification_router: NotificationRouter` | `None` | Yes | Publish diagnostics to subscribed clients |
| `unsubscribe` | `uri: str` | `None` | No | Unsubscribe from diagnostics for a file URI |
Diagnostic Categories:
| Category | Description | Severity |
|---|---|---|
| `taint` | Taint flow from source to sink | Error |
| `complexity` | Cyclomatic complexity above threshold | Warning/Error |
| `deprecated` | Usage of deprecated APIs | Warning |
| `todo` | TODO/FIXME comments | Information |
| `dead_code` | Unreachable or unused code | Hint |
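A published diagnostic for one of these categories might look like the LSP-style payload below. The exact shape CodeGraph emits is not specified here, so this is an assumption; the severity numbers follow the LSP convention (1=Error, 2=Warning, 3=Information, 4=Hint), and `complexity` is shown at Warning even though the table allows Error as well:

```python
# Map the documented categories onto LSP-style severity codes.
SEVERITY = {"taint": 1, "complexity": 2, "deprecated": 2, "todo": 3, "dead_code": 4}

def make_diagnostic(category: str, message: str, line: int) -> dict:
    """Build an illustrative LSP-shaped diagnostic for a single line."""
    return {
        "range": {"start": {"line": line, "character": 0},
                  "end": {"line": line, "character": 0}},
        "severity": SEVERITY[category],
        "source": "codegraph",
        "message": message,
    }

diag = make_diagnostic("taint", "User input reaches SQL sink", 42)
```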
File: src/acp/server/hover.py
Purpose: Provides hover information for code positions. When the user hovers over a symbol in their IDE, this provider queries the CPG to return callers, callees, complexity metrics, and taint status for the symbol at that position.
from src.acp.server.hover import HoverProvider
hover = HoverProvider()
Constructor Parameters: None.
Key Methods:
# Get hover information for a position
info = await hover.get_hover_info(
uri="file:///path/to/file.py",
line=42,
character=10
)
# Returns: dict with callers, callees, complexity, taint status from CPG
Method Reference:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `get_hover_info` | `uri: str, line: int, character: int` | `Dict` | Yes | Get hover information from the CPG for the symbol at the given position |
Hover Result Structure:
{
'symbol': 'process_request',
'kind': 'method',
'file': 'handler.py',
'line': 42,
'callers': ['main', 'dispatch'], # Functions that call this symbol
'callees': ['validate', 'execute'], # Functions called by this symbol
'complexity': 8, # Cyclomatic complexity
'taint_status': 'clean', # clean | tainted | sanitized
'documentation': '...' # Extracted doc comment if available
}
Approval Bridge¶
File: src/acp/server/approval_bridge.py
Purpose: Manages the file change approval flow. When an agent wants to modify files, it requests approval from the IDE client through this bridge. The client can approve or reject the change, with optional timeout handling.
from src.acp.server.approval_bridge import ACPApprovalBridge
bridge = ACPApprovalBridge()
Constructor Parameters: None.
Key Methods:
# Request approval for a file change
result = await bridge.request_file_change_approval(
thread_id="...", turn_id="...", item_id="...",
diff="...", files=["file.py"], reason="Refactoring",
confidence=0.9, notification_router=router, timeout=30.0
)
# Returns: dict with approval decision
# Resolve a pending approval (called when client responds)
bridge.resolve_approval(request_id="...", decision={"approved": True})
# Check pending approvals
bridge.pending_count # Number of pending approvals
Method Reference:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `request_file_change_approval` | `thread_id, turn_id, item_id, diff, files, reason, confidence, notification_router, timeout=30.0` | `Dict` | Yes | Request approval from the IDE client |
| `resolve_approval` | `request_id: str, decision: Dict` | `None` | No | Resolve a pending approval with the client's decision |
Properties:
| Property | Type | Description |
|---|---|---|
pending_count |
int |
Number of pending approval requests |
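The request/resolve pair maps naturally onto an `asyncio.Future` keyed by request ID: the request side awaits the future with a timeout, and the resolve side completes it when the client answers. A minimal sketch of that pattern (not the actual bridge implementation):

```python
import asyncio

class ApprovalBridge:
    """Illustrative future-based approval flow with timeout handling."""
    def __init__(self):
        self._pending: dict = {}          # request_id -> asyncio.Future

    async def request_approval(self, request_id: str, timeout: float) -> dict:
        fut = asyncio.get_running_loop().create_future()
        self._pending[request_id] = fut
        try:
            # Wait until the client responds, or fall back on timeout.
            return await asyncio.wait_for(fut, timeout)
        except asyncio.TimeoutError:
            return {"approved": False, "reason": "timeout"}
        finally:
            self._pending.pop(request_id, None)

    def resolve(self, request_id: str, decision: dict) -> None:
        fut = self._pending.get(request_id)
        if fut and not fut.done():
            fut.set_result(decision)

    @property
    def pending_count(self) -> int:
        return len(self._pending)

bridge = ApprovalBridge()

async def main():
    task = asyncio.create_task(bridge.request_approval("req-1", timeout=1.0))
    await asyncio.sleep(0)                 # let the request register its future
    bridge.resolve("req-1", {"approved": True})
    return await task

decision = asyncio.run(main())
```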
Notification Router¶
File: src/acp/server/notification_router.py
Purpose: Routes notifications from server components to the IDE client. Respects capability negotiation to avoid sending notifications the client does not support or has opted out of.
from src.acp.server.notification_router import NotificationRouter
router = NotificationRouter(
callback=notification_callback,
negotiator=capability_negotiator
)
Constructor Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `callback` | `Callable` | Required | Async callback function that sends notifications to the client |
| `negotiator` | `CapabilityNegotiator` | Required | Capability negotiator used to check which notifications the client supports |
Key Methods:
# Send a notification
sent = await router.send("session/update", {"sessionUpdate": "plan", ...})
# Returns: bool indicating whether the notification was sent
Method Reference:
| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `send` | `method: str, params: Dict` | `bool` | Yes | Send a notification if the client supports it. Returns `True` if sent, `False` if suppressed |
File and Terminal Handlers¶
File: src/acp/server/handlers.py
Purpose: Module-level async functions for filesystem and terminal operations. These are not class methods but standalone async functions called by the CodeGraphACPAgent.
Filesystem Handlers:
```python
from src.acp.server.handlers import handle_fs_read, handle_fs_write

# Read a text file
result = await handle_fs_read(
    {"path": "/path/to/file.py"},
    client_capabilities
)
# Returns: {"content": "file contents...", "encoding": "utf-8"}

# Write to a text file
result = await handle_fs_write(
    {"path": "/path/to/file.py", "content": "new content..."},
    client_capabilities
)
# Returns: {"success": True}
```

Terminal Handlers:

```python
from src.acp.server.handlers import (
    handle_terminal_create,
    handle_terminal_output,
    handle_terminal_wait,
    handle_terminal_kill,
    handle_terminal_release
)

# Create a terminal subprocess
result = await handle_terminal_create(
    {"command": "python", "args": ["-m", "pytest"], "cwd": "/project"},
    client_capabilities
)
# Returns: {"terminalId": "..."}

# Read terminal output
result = await handle_terminal_output(
    {"terminalId": "..."},
    client_capabilities
)
# Returns: {"output": "...", "isComplete": False}

# Wait for terminal to exit
result = await handle_terminal_wait(
    {"terminalId": "...", "timeout": 30},
    client_capabilities
)
# Returns: {"exitCode": 0}

# Kill terminal process
result = await handle_terminal_kill(
    {"terminalId": "..."},
    client_capabilities
)
# Returns: {"success": True}

# Release terminal resources
result = await handle_terminal_release(
    {"terminalId": "..."},
    client_capabilities
)
# Returns: {"success": True}
```
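A common caller pattern is to chain create, wait, output, and release into a single "run command" helper. The sketch below is hedged: the handler functions are passed in as parameters so it works equally with the real `src.acp.server.handlers` functions or, as here, with stubs; the stubs and their return values are illustrative, not CodeGraph behavior.

```python
import asyncio

# Chain the documented terminal handlers: create -> wait -> output -> release.
# The try/finally ensures the terminal is released even if waiting fails.
async def run_command(create, wait, output, release, params, capabilities):
    term = await create(params, capabilities)
    tid = term["terminalId"]
    try:
        exit_info = await wait({"terminalId": tid, "timeout": 30}, capabilities)
        out = await output({"terminalId": tid}, capabilities)
        return {"exitCode": exit_info["exitCode"], "output": out["output"]}
    finally:
        await release({"terminalId": tid}, capabilities)

# Stub handlers demonstrating the flow without spawning a real process.
async def _create(p, c): return {"terminalId": "t1"}
async def _wait(p, c): return {"exitCode": 0}
async def _output(p, c): return {"output": "3 passed", "isComplete": True}
async def _release(p, c): return {"success": True}

result = asyncio.run(run_command(_create, _wait, _output, _release,
                                 {"command": "python", "args": ["-m", "pytest"]}, {}))
print(result)  # {'exitCode': 0, 'output': '3 passed'}
```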
Function Reference:

| Function | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `handle_fs_read` | `params: Dict, capabilities` | `Dict` | Yes | Read a text file |
| `handle_fs_write` | `params: Dict, capabilities` | `Dict` | Yes | Write content to a text file |
| `handle_terminal_create` | `params: Dict, capabilities` | `Dict` | Yes | Create a terminal subprocess |
| `handle_terminal_output` | `params: Dict, capabilities` | `Dict` | Yes | Read terminal output |
| `handle_terminal_wait` | `params: Dict, capabilities` | `Dict` | Yes | Wait for terminal exit |
| `handle_terminal_kill` | `params: Dict, capabilities` | `Dict` | Yes | Kill terminal process |
| `handle_terminal_release` | `params: Dict, capabilities` | `Dict` | Yes | Release terminal resources |
Transport Layer¶
File: src/acp/transport/base.py
Purpose: Abstract base class for ACP transports. Defines the interface that all transport implementations must follow.
```python
from src.acp.transport.base import BaseTransport  # ABC

class BaseTransport(ABC):
    def __init__(self, agent: Any): ...

    @property
    def is_running(self) -> bool: ...

    async def start(self) -> None: ...   # abstract
    async def stop(self) -> None: ...    # abstract
    async def send(self, message: Any) -> None: ...  # abstract

    async def handle_message(self, message, notification_callback=None, user_id=None) -> Optional[Any]: ...
```
Constructor Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `agent` | `Any` | Required | The CodeGraphACPAgent instance to dispatch messages to |

Abstract Methods:

| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `start` | — | `None` | Yes | Start the transport (begin listening) |
| `stop` | — | `None` | Yes | Stop the transport (cleanup) |
| `send` | `message: Any` | `None` | Yes | Send a message to the client |

Concrete Method:

| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `handle_message` | `message, notification_callback=None, user_id=None` | `Optional[Any]` | Yes | Parse message and dispatch to agent |

Properties:

| Property | Type | Description |
|---|---|---|
| `is_running` | `bool` | Whether the transport is currently running |

Implementations:

| Transport | File | Purpose | Entry Point |
|---|---|---|---|
| `StdioTransport` | `src/acp/transport/stdio.py` | stdin/stdout JSON-RPC communication | `run_stdio()` |
| `HTTPTransport` | `src/acp/transport/http.py` | HTTP REST endpoint for JSON-RPC | `get_http_transport()` |
| `WebSocketTransport` | `src/acp/transport/websocket.py` | WebSocket bidirectional communication | — |
StdioTransport (src/acp/transport/stdio.py):
Reads JSON-RPC messages from stdin and writes responses to stdout. Used for local IDE integration where the IDE spawns the ACP process.
```python
from src.acp.transport.stdio import StdioTransport, run_stdio

transport = StdioTransport(agent=agent)
await transport.start()  # Blocks, reading from stdin

# Or use the convenience function
await run_stdio()  # Creates agent and transport, runs until EOF
```
HTTPTransport (src/acp/transport/http.py):
Exposes a JSON-RPC endpoint over HTTP. Used for remote IDE connections or web-based clients.
```python
from src.acp.transport.http import HTTPTransport, get_http_transport

transport = HTTPTransport(agent=agent, host="0.0.0.0", port=8080)
await transport.start()

# Or use the factory function
transport = get_http_transport(agent=agent)
await transport.start()
```
WebSocketTransport (src/acp/transport/websocket.py):
Provides bidirectional WebSocket communication. Supports server-initiated notifications and concurrent message handling.
```python
from src.acp.transport.websocket import WebSocketTransport

transport = WebSocketTransport(agent=agent)
```

WebSocketManager (same file):

Manages multiple WebSocket connections and supports broadcasting to specific users.

```python
from src.acp.transport.websocket import get_websocket_manager

manager = get_websocket_manager()  # Singleton

# Add a connection
await manager.add_connection(user_id="user-123", transport=ws_transport)

# Broadcast to all connections of a user
count = await manager.broadcast_to_user(user_id="user-123", message={"method": "update", ...})

# Get connection count
total = manager.get_connection_count()  # All connections
user_count = manager.get_connection_count(user_id="user-123")  # Specific user
```
WebSocketManager Method Reference:

| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `add_connection` | `user_id: str, transport: WebSocketTransport` | `None` | Yes | Register a WebSocket connection for a user |
| `remove_connection` | `user_id: str, transport: WebSocketTransport` | `None` | Yes | Remove a WebSocket connection |
| `broadcast_to_user` | `user_id: str, message: Any` | `int` | Yes | Send message to all connections of a user, returns count sent |
| `get_connection_count` | `user_id: Optional[str] = None` | `int` | No | Get total or per-user connection count |
Integration Adapters¶
ChatServiceACPAdapter¶
File: src/acp/integration/chat_adapter.py
Purpose: Bridges ACP prompts to the ChatService and WorkflowACPAdapter. Receives content blocks from ACP sessions, processes them through the chat pipeline, and sends streaming notifications back to the client (plan updates, tool call updates, message chunks).
```python
from src.acp.integration.chat_adapter import ChatServiceACPAdapter

adapter = ChatServiceACPAdapter()
```

Constructor Parameters: None.

Key Methods:

```python
# Process a prompt from an ACP session
answer = await adapter.process_prompt(
    content_blocks=[{"type": "text", "text": "Analyze code..."}],
    session=acp_session,
    notification_callback=callback,
    user_id="user-123"
)
# Returns: string answer
```

Method Reference:

| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `process_prompt` | `content_blocks: List[Dict], session: ACPSession, notification_callback: Optional[Callable] = None, user_id: Optional[str] = None` | `str` | Yes | Process ACP prompt and return answer with streaming notifications |

The adapter sends the following notification types during processing:

- Plan updates — `session/update` with `sessionUpdate: "plan"` containing execution plan entries
- Tool call updates — `session/update` with `sessionUpdate: "tool_call"` containing tool execution status
- Message chunks — `session/update` with `sessionUpdate: "message"` containing streaming text
SessionACPAdapter¶
File: src/acp/integration/session_adapter.py
Purpose: Bridges ACP session operations to CodeGraph’s internal session management. Handles session creation, loading, context retrieval, and dialogue history.
```python
from src.acp.integration.session_adapter import SessionACPAdapter

adapter = SessionACPAdapter()
```

Constructor Parameters: None.

Key Methods:

```python
# Create a session
session_id = await adapter.create_session(user_id="user-123", metadata={"ide": "vscode"})

# Get a session
session = await adapter.get_session(session_id="...", user_id="user-123")

# Get session context (recent interactions)
context = await adapter.get_context(session_id="...", user_id="user-123", limit=10)

# Add a dialogue turn
await adapter.add_dialogue_turn(
    session_id="...", user_id="user-123",
    role="user", content="Hello"
)

# Get dialogue history
history = await adapter.get_dialogue_history(session_id="...", user_id="user-123")
```

Method Reference:

| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `create_session` | `user_id: str, metadata: Optional[Dict] = None` | `str` | Yes | Create a new session, returns session ID |
| `get_session` | `session_id: str, user_id: str` | `Dict` | Yes | Get session details |
| `get_context` | `session_id: str, user_id: str, limit: int = 10` | `Dict` | Yes | Get recent session context |
| `add_dialogue_turn` | `session_id: str, user_id: str, role: str, content: str` | `None` | Yes | Add a dialogue turn to session history |
| `get_dialogue_history` | `session_id: str, user_id: str` | `List[Dict]` | Yes | Get full dialogue history |
WorkflowACPAdapter¶
File: src/acp/integration/workflow_adapter.py
Purpose: Bridges ACP to the MultiScenarioCopilot workflow engine. Maps ACP tool calls to scenario executions, manages execution plans, and sends progress notifications.
```python
from src.acp.integration.workflow_adapter import WorkflowACPAdapter

adapter = WorkflowACPAdapter()
```

Constructor Parameters: None.

Key Methods:

```python
# Get tool kind for a scenario
kind = adapter.get_tool_kind("scenario_1")  # "search", "execute", "analyze", etc.

# Get human-readable tool title
title = adapter.get_tool_title("scenario_1")  # e.g., "Code Search"

# Create plan entries for a scenario execution
plan = adapter.create_plan_entries("scenario_1")
# Returns: list of PlanEntry dicts

# Execute workflow with streaming updates
result = await adapter.execute_with_updates(
    query="...", context={}, callback=notification_callback, tool_call_id="tc-1"
)
# Returns: execution result dict
```

Method Reference:

| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `get_tool_kind` | `scenario_id: str` | `str` | No | Get tool kind classification for a scenario |
| `get_tool_title` | `scenario_id: str` | `str` | No | Get human-readable title for a scenario |
| `create_plan_entries` | `scenario_id: str` | `List[Dict]` | No | Create execution plan entries |
| `execute_with_updates` | `query: str, context: Dict, callback: Optional[Callable], tool_call_id: str` | `Dict` | Yes | Execute workflow with streaming progress updates |
MCPBridge¶
File: src/acp/integration/mcp_bridge.py
Purpose: Bridges ACP to external MCP (Model Context Protocol) servers. Manages connections to multiple MCP servers, aggregates their tools and resources, and proxies tool calls and resource reads.
```python
from src.acp.integration.mcp_bridge import MCPBridge

bridge = MCPBridge()
```

Constructor Parameters: None.

Key Methods:

```python
# Connect to MCP servers
connected = await bridge.connect_servers([
    {"id": "server1", "name": "My MCP", "transport": "stdio",
     "command": "node", "args": ["server.js"]},
    {"id": "server2", "name": "Remote MCP", "transport": "http",
     "url": "http://localhost:3000"}
])
# Returns: number of successfully connected servers

# Get all available tools across all servers
tools = bridge.get_all_tools()  # List[MCPTool]

# Get all available resources across all servers
resources = bridge.get_all_resources()  # List[MCPResource]

# Call a tool on a specific server
result = await bridge.call_tool("server1", "tool_name", {"arg": "value"})

# Read a resource from a specific server
content = await bridge.read_resource("server1", "resource://uri")

# Disconnect all servers
await bridge.disconnect_all()
```

Method Reference:

| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `connect_servers` | `servers: List[Dict]` | `int` | Yes | Connect to MCP servers, returns count of successful connections |
| `get_all_tools` | — | `List[MCPTool]` | No | Get all tools from all connected servers |
| `get_all_resources` | — | `List[MCPResource]` | No | Get all resources from all connected servers |
| `call_tool` | `server_id: str, tool_name: str, arguments: Dict` | `Any` | Yes | Call a tool on a specific server |
| `read_resource` | `server_id: str, uri: str` | `Any` | Yes | Read a resource from a specific server |
| `disconnect_all` | — | `None` | Yes | Disconnect all MCP servers |
Dataclasses:

MCPServer:

| Field | Type | Description |
|---|---|---|
| `id` | `str` | Unique server identifier |
| `name` | `str` | Human-readable server name |
| `transport` | `str` | Transport type: `"stdio"`, `"http"`, or `"ws"` |
| `command` | `Optional[str]` | Command to spawn (for stdio transport) |
| `args` | `Optional[List[str]]` | Command arguments (for stdio transport) |
| `url` | `Optional[str]` | Server URL (for http/ws transport) |
| `headers` | `Optional[Dict[str, str]]` | HTTP headers (for http transport) |
| `env` | `Optional[Dict[str, str]]` | Environment variables (for stdio transport) |

MCPTool:

| Field | Type | Description |
|---|---|---|
| `name` | `str` | Tool name |
| `description` | `str` | Tool description |
| `input_schema` | `Dict` | JSON Schema for tool input parameters |
| `server_id` | `str` | ID of the server providing this tool |

MCPResource:

| Field | Type | Description |
|---|---|---|
| `uri` | `str` | Resource URI |
| `name` | `str` | Resource name |
| `description` | `str` | Resource description |
| `mime_type` | `Optional[str]` | MIME type of the resource content |
| `server_id` | `str` | ID of the server providing this resource |

MCPConnection:

| Field | Type | Description |
|---|---|---|
| `server` | `MCPServer` | Server configuration |
Manages the connection lifecycle (connect, disconnect, health check) for a single MCP server.
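The stdio and http configurations differ only in which optional fields are populated. The sketch below mirrors the documented `MCPServer` fields with a local dataclass (the real one lives in `src/acp/integration/mcp_bridge.py`) to make the two shapes concrete; the field values are illustrative.

```python
from dataclasses import dataclass
from typing import Optional, List, Dict

# Local mirror of the documented MCPServer fields, for illustration only.
@dataclass
class MCPServer:
    id: str
    name: str
    transport: str                          # "stdio", "http", or "ws"
    command: Optional[str] = None           # stdio only
    args: Optional[List[str]] = None        # stdio only
    url: Optional[str] = None               # http/ws only
    headers: Optional[Dict[str, str]] = None
    env: Optional[Dict[str, str]] = None

# A stdio server is spawned as a subprocess...
local = MCPServer(id="server1", name="My MCP", transport="stdio",
                  command="node", args=["server.js"])

# ...while an http server is reached over the network.
remote = MCPServer(id="server2", name="Remote MCP", transport="http",
                   url="http://localhost:3000")

print(local.transport, remote.url)  # stdio http://localhost:3000
```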
ACP CLI¶
File: src/acp/cli.py
Purpose: Command-line entry point for the ACP server. Parses arguments and starts the appropriate transport.
```python
from src.acp.cli import setup_logging, main, run

# Functions
setup_logging()  # Configure logging for ACP server
await main()     # Async entry point — parses args and starts transport
run()            # Sync wrapper — calls asyncio.run(main())
```

Function Reference:

| Function | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `setup_logging` | — | `None` | No | Configure logging for ACP server with appropriate formatters and handlers |
| `main` | — | `None` | Yes | Async entry point: parse CLI arguments, create agent, start transport |
| `run` | — | `None` | No | Sync wrapper that calls `asyncio.run(main())` |

CLI Arguments:

| Argument | Type | Default | Description |
|---|---|---|---|
| `--transport` | `str` | `"stdio"` | Transport type: stdio, http, or ws |
| `--host` | `str` | `"0.0.0.0"` | Host to bind (HTTP/WS only) |
| `--port` | `int` | `8080` | Port to bind (HTTP/WS only) |
| `--log-level` | `str` | `"info"` | Logging level: debug, info, warning, error |
Domain-Specific Agents¶
Security Agents¶
File: src/security/security_agents.py
Purpose: Four specialized agents for security vulnerability detection, data flow analysis, reporting, and remediation advice. These agents use CPG data to identify security issues and produce structured findings.
SecurityScanner¶
The primary entry point for security analysis. Scans CPG data for known vulnerability patterns.
```python
from src.security.security_agents import SecurityScanner

# Signature: SecurityScanner(cpg_service: Optional[CPGQueryService] = None)
scanner = SecurityScanner(cpg_service)

# Context manager support for automatic resource cleanup
with SecurityScanner(cpg_service) as scanner:
    findings = scanner.scan_all_patterns(limit_per_pattern=50)
    # Returns: List[SecurityFinding]
```

Constructor Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `cpg_service` | `Optional[CPGQueryService]` | `None` | CPG query service for graph access |

Key Methods:

| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `scan_all_patterns` | `limit_per_pattern: int = 50` | `List[SecurityFinding]` | No | Scan all known security patterns |
| `scan_pattern` | `pattern: str, limit: int = 50` | `List[SecurityFinding]` | No | Scan a specific pattern |
Patterns detected:
- SQL Injection (CWE-89)
- Buffer Overflow (CWE-120)
- Command Injection (CWE-78)
- Format String (CWE-134)
- Path Traversal (CWE-22)
- Use After Free (CWE-416)
- Integer Overflow (CWE-190)
- Cross-Site Scripting (CWE-79)
- And additional patterns from SECURITY_PATTERNS
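Scan results are typically triaged by severity and confidence before any remediation work starts. The sketch below uses a local stand-in with a subset of the documented `SecurityFinding` fields (the real dataclass lives in `src/security/security_agents.py`); the findings themselves are made up for illustration.

```python
from collections import Counter
from dataclasses import dataclass

# Minimal stand-in with a subset of the documented SecurityFinding fields.
@dataclass
class Finding:
    pattern_name: str
    severity: str       # critical / high / medium / low
    confidence: float   # 0.0 to 1.0

findings = [
    Finding("sql_injection", "critical", 0.9),
    Finding("format_string", "medium", 0.6),
    Finding("path_traversal", "high", 0.8),
    Finding("sql_injection", "critical", 0.4),
]

# Count findings per severity, then keep only high-confidence urgent items.
by_severity = Counter(f.severity for f in findings)
urgent = [f for f in findings
          if f.severity in ("critical", "high") and f.confidence >= 0.7]

print(by_severity["critical"], len(urgent))  # 2 2
```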
DataFlowAnalyzer¶
Traces data flows from taint sources to sinks through the CPG.
```python
from src.security.security_agents import DataFlowAnalyzer

# Signature: DataFlowAnalyzer(cpg_service: Optional[CPGQueryService] = None)
analyzer = DataFlowAnalyzer(cpg_service)

paths = analyzer.trace_taint_flows(limit=100)
# Returns: List[DataFlowPath]
```

Constructor Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `cpg_service` | `Optional[CPGQueryService]` | `None` | CPG query service for graph access |

Key Methods:

| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `trace_taint_flows` | `limit: int = 100` | `List[DataFlowPath]` | No | Trace all taint flows from sources to sinks |
| `trace_flow` | `source: str, sink: str` | `Optional[DataFlowPath]` | No | Trace a specific source-to-sink flow |
VulnerabilityReporter¶
Generates structured vulnerability reports from findings and data flow paths.
```python
from src.security.security_agents import VulnerabilityReporter

reporter = VulnerabilityReporter()

report = reporter.generate_report(findings, data_flow_paths)
# Returns: VulnerabilityReport
```

Constructor Parameters: None.

Key Methods:

| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `generate_report` | `findings: List[SecurityFinding], data_flows: List[DataFlowPath]` | `VulnerabilityReport` | No | Generate a structured vulnerability report |
RemediationAdvisor¶
Suggests fixes for identified vulnerabilities.
```python
from src.security.security_agents import RemediationAdvisor

advisor = RemediationAdvisor()

advice = advisor.get_remediation_advice(finding)
# Returns: RemediationAdvice

plan = advisor.get_bulk_remediation_plan(findings)
# Returns: List[RemediationAdvice]
```

Constructor Parameters: None.

Key Methods:

| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `get_remediation_advice` | `finding: SecurityFinding` | `RemediationAdvice` | No | Get remediation advice for a specific finding |
| `get_bulk_remediation_plan` | `findings: List[SecurityFinding]` | `List[RemediationAdvice]` | No | Get advice for multiple findings |
Data Models:

SecurityFinding:

| Field | Type | Description |
|---|---|---|
| `finding_id` | `str` | Unique identifier for the finding |
| `pattern_name` | `str` | Name of the security pattern matched |
| `category` | `str` | Category (e.g., injection, buffer, crypto) |
| `severity` | `str` | Severity level: critical, high, medium, low |
| `method_name` | `str` | Name of the affected method |
| `filename` | `str` | Source file containing the vulnerability |
| `cwe_ids` | `List[str]` | Associated CWE identifiers |
| `confidence` | `float` | Detection confidence (0.0 to 1.0) |

DataFlowPath:

| Field | Type | Description |
|---|---|---|
| `source_method` | `str` | Taint source method |
| `sink_method` | `str` | Taint sink method |
| `path_length` | `int` | Number of hops in the flow |
| `taint_type` | `str` | Type of taint (e.g., user_input, file_read) |
| `sanitized` | `bool` | Whether the flow passes through a sanitizer |
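Unsanitized flows are the ones worth reviewing first. The sketch below mirrors the documented `DataFlowPath` fields locally (the real dataclass lives in `src/security/security_agents.py`); the example paths are invented for illustration.

```python
from dataclasses import dataclass

# Local mirror of the documented DataFlowPath fields, for illustration.
@dataclass
class DataFlowPath:
    source_method: str
    sink_method: str
    path_length: int
    taint_type: str
    sanitized: bool

paths = [
    DataFlowPath("read_input", "exec_query", 3, "user_input", False),
    DataFlowPath("read_file", "log_write", 2, "file_read", True),
    DataFlowPath("parse_form", "render_html", 5, "user_input", False),
]

# Keep unsanitized flows, shortest path first: fewer hops usually means
# the flow is easier to confirm (and to exploit).
risky = sorted((p for p in paths if not p.sanitized),
               key=lambda p: p.path_length)
print([p.sink_method for p in risky])  # ['exec_query', 'render_html']
```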
VulnerabilityReport:

| Field | Type | Description |
|---|---|---|
| `total_findings` | `int` | Total number of findings |
| `critical_count` | `int` | Number of critical findings |
| `high_count` | `int` | Number of high findings |
| `medium_count` | `int` | Number of medium findings |
| `low_count` | `int` | Number of low findings |
| `findings` | `List[SecurityFinding]` | All findings |
| `data_flows` | `List[DataFlowPath]` | All data flow paths |

RemediationAdvice:

| Field | Type | Description |
|---|---|---|
| `remediation_steps` | `List[str]` | Ordered steps to fix the vulnerability |
| `code_example` | `str` | Example code showing the fix |
| `references` | `List[str]` | External references (CVEs, documentation) |
| `estimated_effort` | `str` | Effort estimate: low, medium, high |
| `priority` | `int` | Remediation priority (1 = highest) |
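The four security agents are designed to compose into one scan-trace-report-advise flow. The sketch below keeps the orchestration generic by taking the agents as parameters, so it works with the real classes or, as in the demo, with simple test doubles; the stub classes and their return values are illustrative only.

```python
# Hedged sketch of the end-to-end flow using the documented method
# signatures: scan -> trace -> report -> advise.
def run_security_analysis(scanner, analyzer, reporter, advisor, limit=50):
    findings = scanner.scan_all_patterns(limit_per_pattern=limit)
    flows = analyzer.trace_taint_flows(limit=limit * 2)
    report = reporter.generate_report(findings, flows)
    plan = advisor.get_bulk_remediation_plan(findings)
    return report, plan

# Minimal test doubles exercising those signatures.
class StubScanner:
    def scan_all_patterns(self, limit_per_pattern): return ["finding"]

class StubAnalyzer:
    def trace_taint_flows(self, limit): return ["flow"]

class StubReporter:
    def generate_report(self, findings, flows): return {"total": len(findings)}

class StubAdvisor:
    def get_bulk_remediation_plan(self, findings): return ["advice"] * len(findings)

report, plan = run_security_analysis(StubScanner(), StubAnalyzer(),
                                     StubReporter(), StubAdvisor())
print(report, plan)  # {'total': 1} ['advice']
```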
Performance Agents¶
Package: src/performance/agents/ (facade: src/performance/performance_agents.py)
Purpose: Three agents for performance bottleneck detection, resource analysis, and optimization recommendations.
PerformanceProfiler¶
File: src/performance/agents/profiler.py
The primary entry point for performance analysis. Scans CPG data for performance bottleneck patterns.
```python
from src.performance.agents import PerformanceProfiler

# Signature: PerformanceProfiler(cpg_service: Optional[CPGQueryService] = None)
profiler = PerformanceProfiler(cpg_service)

# Context manager support
with PerformanceProfiler(cpg_service) as profiler:
    bottlenecks = profiler.profile_all_bottlenecks(limit_per_pattern=30)
    # Returns: List[BottleneckFinding]
    pattern_findings = profiler.profile_pattern(pattern, limit=30)
    # Returns: List[BottleneckFinding]
```

Constructor Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `cpg_service` | `Optional[CPGQueryService]` | `None` | CPG query service for graph access |

Key Methods:

| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `profile_all_bottlenecks` | `limit_per_pattern: int = 30` | `List[BottleneckFinding]` | No | Profile all known bottleneck patterns |
| `profile_pattern` | `pattern: str, limit: int = 30` | `List[BottleneckFinding]` | No | Profile a specific bottleneck pattern |
ResourceAnalyzer¶
File: src/performance/agents/resource_analyzer.py
Analyzes resource usage of individual methods.
```python
from src.performance.agents import ResourceAnalyzer

# Signature: ResourceAnalyzer(cpg_service: Optional[CPGQueryService] = None)
analyzer = ResourceAnalyzer(cpg_service)

# Context manager support
with ResourceAnalyzer(cpg_service) as analyzer:
    usage = analyzer.analyze_method_resources("method_name", filename="file.py")
    # Returns: ResourceUsage
```

Constructor Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `cpg_service` | `Optional[CPGQueryService]` | `None` | CPG query service for graph access |

Key Methods:

| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `analyze_method_resources` | `method_name: str, filename: Optional[str] = None` | `ResourceUsage` | No | Analyze resource usage of a specific method |
| `analyze_all_resources` | `limit: int = 50` | `List[ResourceUsage]` | No | Analyze resource usage for top methods |
OptimizationAdvisor¶
File: src/performance/agents/optimizer.py
Creates optimization plans and generates performance reports from profiling results.
```python
from src.performance.agents import OptimizationAdvisor

advisor = OptimizationAdvisor()

plan = advisor.create_optimization_plan(findings, resource_analyses)
# Returns: List[OptimizationRecommendation]

report = advisor.generate_report(findings, resource_analyses)
# Returns: PerformanceReport
```

Constructor Parameters: None.

Key Methods:

| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `create_optimization_plan` | `findings: List[BottleneckFinding], resource_analyses: List[ResourceUsage]` | `List[OptimizationRecommendation]` | No | Create an optimization plan |
| `generate_report` | `findings: List[BottleneckFinding], resource_analyses: List[ResourceUsage]` | `PerformanceReport` | No | Generate a performance report |
Data Models:

BottleneckFinding:

| Field | Type | Description |
|---|---|---|
| `pattern_name` | `str` | Name of the bottleneck pattern |
| `category` | `str` | Category (e.g., cpu, memory, io, concurrency) |
| `severity` | `str` | Severity: critical, high, medium, low |
| `method_name` | `str` | Affected method name |
| `optimization_technique` | `str` | Suggested optimization technique |
| `potential_speedup` | `str` | Estimated speedup (e.g., "2x", "10%") |

ResourceUsage:

| Field | Type | Description |
|---|---|---|
| `method_name` | `str` | Method being analyzed |
| `complexity_score` | `float` | Computed complexity score |
| `call_count` | `int` | Number of calls to this method |
| `estimated_memory_impact` | `str` | Memory impact estimate: low, medium, high |
| `resource_intensity` | `str` | Overall resource intensity: light, moderate, heavy |

OptimizationRecommendation:

| Field | Type | Description |
|---|---|---|
| `optimization_steps` | `List[str]` | Ordered optimization steps |
| `code_example` | `str` | Example code showing the optimization |
| `estimated_speedup` | `str` | Estimated speedup from this optimization |
| `priority` | `int` | Recommendation priority (1 = highest) |
| `risk_level` | `str` | Risk of regression: low, medium, high |

PerformanceReport:

| Field | Type | Description |
|---|---|---|
| `total_bottlenecks` | `int` | Total number of bottlenecks found |
| `by_severity` | `Dict[str, int]` | Counts by severity level |
| `findings` | `List[BottleneckFinding]` | All findings |
| `recommendations` | `List[OptimizationRecommendation]` | All recommendations |
| `summary` | `str` | Human-readable summary |

ProfilingResult:

| Field | Type | Description |
|---|---|---|
| `function_name` | `str` | Name of the profiled function |
| `total_calls` | `int` | Total number of calls observed |
| `total_time` | `float` | Total execution time in seconds |
| `bottleneck_score` | `float` | Computed bottleneck score (0.0 to 1.0) |

MemoryProfilingResult:

| Field | Type | Description |
|---|---|---|
| `function_name` | `str` | Name of the profiled function |
| `memory_usage_mb` | `float` | Memory usage in megabytes |
| `allocations` | `int` | Number of memory allocations |
| `allocation_rate` | `float` | Allocations per second |

PerformanceBaseline:

| Field | Type | Description |
|---|---|---|
| `method_name` | `str` | Method name |
| `execution_time_ms` | `float` | Baseline execution time in milliseconds |
| `memory_usage_mb` | `float` | Baseline memory usage in megabytes |

PerformanceTrend:

| Field | Type | Description |
|---|---|---|
| `baseline` | `PerformanceBaseline` | Reference baseline |
| `current` | `PerformanceBaseline` | Current measurement |
| `time_delta_percent` | `float` | Percentage change in execution time |
| `regression_detected` | `bool` | Whether a regression was detected |
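The trend fields above imply a straightforward computation: compare two baselines and flag a regression when execution time grows past some threshold. The sketch below mirrors `PerformanceBaseline` locally; the 10% threshold is an assumption for illustration, not a CodeGraph default.

```python
from dataclasses import dataclass

# Local mirror of the documented PerformanceBaseline fields.
@dataclass
class PerformanceBaseline:
    method_name: str
    execution_time_ms: float
    memory_usage_mb: float

def compute_trend(baseline, current, threshold_percent=10.0):
    # Percentage change in execution time relative to the baseline.
    delta = ((current.execution_time_ms - baseline.execution_time_ms)
             / baseline.execution_time_ms) * 100.0
    return {
        "time_delta_percent": round(delta, 1),
        "regression_detected": delta > threshold_percent,
    }

old = PerformanceBaseline("parse_file", execution_time_ms=40.0, memory_usage_mb=12.0)
new = PerformanceBaseline("parse_file", execution_time_ms=50.0, memory_usage_mb=12.5)
print(compute_trend(old, new))  # {'time_delta_percent': 25.0, 'regression_detected': True}
```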
Utility Function:
```python
from src.performance.agents import run_complete_performance_analysis

report = run_complete_performance_analysis(cpg_service, limit=30)
# Returns: PerformanceReport with findings and recommendations
```
This convenience function runs the full performance analysis pipeline: profiling, resource analysis, optimization planning, and report generation.
Architecture Agents¶
Package: src/architecture/agents/ (facade: src/architecture/architecture_agents.py)
Purpose: Three agents for dependency analysis, layer validation, and architecture reporting. These agents detect architectural violations like circular dependencies, god modules, and layer boundary violations.
DependencyAnalyzer¶
Analyzes module dependencies and detects architectural violations.
```python
from src.architecture.agents import DependencyAnalyzer

analyzer = DependencyAnalyzer(cpg_service)

violations = analyzer.detect_all_violations(limit_per_pattern=20)
# Returns: List[ViolationFinding]

analysis = analyzer.calculate_dependency_metrics(violations)
# Returns: DependencyAnalysis
```

Constructor Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `cpg_service` | `CPGQueryService` | Required | CPG query service for graph access |

Key Methods:

| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `detect_all_violations` | `limit_per_pattern: int = 20` | `List[ViolationFinding]` | No | Detect all architectural violations |
| `detect_circular_dependencies` | `limit: int = 20` | `List[ViolationFinding]` | No | Detect circular dependency violations |
| `calculate_dependency_metrics` | `violations: List[ViolationFinding]` | `DependencyAnalysis` | No | Calculate dependency metrics from violations |
LayerValidator¶
Validates that module dependencies respect layer boundaries.
```python
from src.architecture.agents import LayerValidator

# Signature: LayerValidator(cpg_service, layer_hierarchy: Optional[Dict[str, int]] = None)
validator = LayerValidator(cpg_service, layer_hierarchy=None)  # None -> default hierarchy

violations = validator.validate_all_layers(limit=20)
# Returns: List[ViolationFinding]
```

Constructor Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `cpg_service` | `CPGQueryService` | Required | CPG query service for graph access |
| `layer_hierarchy` | `Optional[Dict[str, int]]` | `None` | Custom layer ordering. Keys are layer names, values are numeric levels (higher = more abstract). When `None`, uses a default hierarchy |

Key Methods:

| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `validate_all_layers` | `limit: int = 20` | `List[ViolationFinding]` | No | Validate all layer boundary rules |
| `validate_layer` | `layer_name: str, limit: int = 20` | `List[ViolationFinding]` | No | Validate a specific layer |
ArchitectureReporter¶
Generates architecture reports and remediation plans from findings and metrics.
```python
from src.architecture.agents import ArchitectureReporter

reporter = ArchitectureReporter()

report = reporter.generate_report(findings, dependency_analysis, layer_metrics)
# Returns: ArchitectureReport

plan = reporter.create_remediation_plan(findings)
# Returns: List[RemediationAction]
```

Constructor Parameters: None.

Key Methods:

| Method | Parameters | Returns | Async | Description |
|---|---|---|---|---|
| `generate_report` | `findings: List[ViolationFinding], dependency_analysis: DependencyAnalysis, layer_metrics: Optional[Dict] = None` | `ArchitectureReport` | No | Generate an architecture report |
| `create_remediation_plan` | `findings: List[ViolationFinding]` | `List[RemediationAction]` | No | Create a remediation plan for violations |
Data Models:

ViolationFinding:

| Field | Type | Description |
|---|---|---|
| `pattern_name` | `str` | Name of the violation pattern |
| `category` | `str` | Category (e.g., circular_dependency, layer_violation, god_module) |
| `severity` | `str` | Severity: critical, high, medium, low |
| `module_a` | `str` | First module involved |
| `module_b` | `str` | Second module involved |
| `violation_details` | `str` | Human-readable description of the violation |

DependencyMetrics:

| Field | Type | Description |
|---|---|---|
| `module_name` | `str` | Module being analyzed |
| `fan_in` | `int` | Number of modules that depend on this module |
| `fan_out` | `int` | Number of modules this module depends on |
| `instability` | `float` | Instability metric: `fan_out / (fan_in + fan_out)` (0.0 = stable, 1.0 = unstable) |
| `coupling_score` | `float` | Overall coupling score |
| `is_god_module` | `bool` | Whether this module has excessive responsibilities |
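The instability metric documented above follows directly from the fan counts: a module that is widely depended upon but depends on little (high fan-in, low fan-out) is stable, while a module that mostly depends on others is unstable. A direct computation, with a guard for isolated modules as an assumption:

```python
# Instability = fan_out / (fan_in + fan_out), per the DependencyMetrics
# field above. The 0.0 fallback for isolated modules is an assumption.
def instability(fan_in: int, fan_out: int) -> float:
    total = fan_in + fan_out
    return fan_out / total if total else 0.0

print(instability(fan_in=6, fan_out=2))  # stable module: 0.25
print(instability(fan_in=2, fan_out=6))  # unstable module: 0.75
```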
DependencyAnalysis:

| Field | Type | Description |
|---|---|---|
| `total_modules` | `int` | Total number of modules analyzed |
| `circular_dependency_count` | `int` | Number of circular dependencies detected |
| `god_module_count` | `int` | Number of god modules detected |
| `module_metrics` | `List[DependencyMetrics]` | Per-module metrics |

LayerRule:

| Field | Type | Description |
|---|---|---|
| `from_layer` | `str` | Source layer name |
| `to_layer` | `str` | Target layer name |
| `allowed` | `bool` | Whether this dependency direction is allowed |
| `description` | `str` | Human-readable rule description |

RemediationAction:

| Field | Type | Description |
|---|---|---|
| `title` | `str` | Action title |
| `steps` | `List[str]` | Ordered remediation steps |
| `priority` | `int` | Action priority (1 = highest) |
| `estimated_effort` | `str` | Effort estimate: low, medium, high |
| `risk_level` | `str` | Risk of the remediation: low, medium, high |

ArchitectureReport:

| Field | Type | Description |
|---|---|---|
| `total_violations` | `int` | Total number of violations found |
| `findings` | `List[ViolationFinding]` | All violation findings |
| `dependency_analysis` | `DependencyAnalysis` | Dependency metrics and analysis |
| `remediation_actions` | `List[RemediationAction]` | Suggested remediation actions |
| `summary` | `str` | Human-readable summary |
Configuration Reference¶
AgentConfidenceConfig¶
File: src/config/unified_config.py
The AgentConfidenceConfig section of the unified configuration controls confidence calculation parameters used by the AnalyzerAgent and ConfidenceCalculator.
```python
from src.config import get_unified_config

cfg = get_unified_config()
cfg.agent_confidence.query_mode_base           # 0.5 — Base confidence for query mode classification
cfg.agent_confidence.query_mode_diff_factor    # 0.1 — Factor for confidence differential
cfg.agent_confidence.query_mode_score_factor   # 0.05 — Factor for raw score contribution
cfg.agent_confidence.pattern_match_confidence  # 0.75 — Confidence when a known pattern is matched
cfg.agent_confidence.traversal_confidence      # 0.65 — Confidence for graph traversal results
```
Configuration Fields:

| Field | Type | Default | Description |
|---|---|---|---|
| `query_mode_base` | `float` | `0.5` | Base confidence score for query mode classification |
| `query_mode_diff_factor` | `float` | `0.1` | Multiplier for the difference between semantic and structural scores |
| `query_mode_score_factor` | `float` | `0.05` | Multiplier for the raw classification score |
| `pattern_match_confidence` | `float` | `0.75` | Default confidence when a known pattern is matched by the PatternConfidenceStrategy |
| `traversal_confidence` | `float` | `0.65` | Default confidence for results obtained via graph traversal |
YAML Configuration:
```yaml
agent_confidence:
  query_mode_base: 0.5
  query_mode_diff_factor: 0.1
  query_mode_score_factor: 0.05
  pattern_match_confidence: 0.75
  traversal_confidence: 0.65
```
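One plausible reading of how these parameters combine (an assumption for illustration; check ConfidenceCalculator for the exact formula) is that query-mode confidence starts from the base, rewards a clear winner between the semantic and structural scores, and adds a small contribution from the raw score:

```python
def query_mode_confidence(
    semantic_score: float,
    structural_score: float,
    base: float = 0.5,           # query_mode_base
    diff_factor: float = 0.1,    # query_mode_diff_factor
    score_factor: float = 0.05,  # query_mode_score_factor
) -> float:
    """Illustrative combination of the documented parameters; the real
    implementation may weigh or clamp terms differently."""
    diff = abs(semantic_score - structural_score)
    raw = max(semantic_score, structural_score)
    return min(1.0, base + diff_factor * diff + score_factor * raw)
```

Under this reading, raising `query_mode_diff_factor` makes the classifier more decisive when one mode clearly dominates.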
ACPDiagnosticsConfig¶
File: src/config/unified_config.py
The ACPDiagnosticsConfig section controls which diagnostic categories are enabled, debounce timing, and threshold values for the DiagnosticsEngine.
```python
from src.config import get_unified_config

cfg = get_unified_config()

# General settings
cfg.acp_diagnostics.enabled      # True — Master switch for diagnostics
cfg.acp_diagnostics.debounce_ms  # 2000 — Debounce interval in milliseconds

# Diagnostic categories (True = enabled, False = disabled)
cfg.acp_diagnostics.categories["taint"]       # True
cfg.acp_diagnostics.categories["complexity"]  # True
cfg.acp_diagnostics.categories["deprecated"]  # True
cfg.acp_diagnostics.categories["todo"]        # False
cfg.acp_diagnostics.categories["dead_code"]   # False

# Thresholds
cfg.acp_diagnostics.thresholds["complexity_warning"]  # 15
cfg.acp_diagnostics.thresholds["complexity_error"]    # 25
cfg.acp_diagnostics.thresholds["fan_in_warning"]      # 20
```
General Settings:
| Field | Type | Default | Description |
|---|---|---|---|
| `enabled` | `bool` | `True` | Master switch to enable or disable all diagnostics |
| `debounce_ms` | `int` | 2000 | Minimum interval between diagnostic generations for the same file, in milliseconds |
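The debounce behaviour can be pictured with a small sketch (a hypothetical helper, not the actual DiagnosticsEngine code): diagnostics for the same file are regenerated at most once per `debounce_ms` interval, while other files are unaffected.

```python
import time
from typing import Dict, Optional

class DiagnosticsDebouncer:
    """Illustrative debounce: skip a run if the previous run for the
    same file happened less than debounce_ms ago."""

    def __init__(self, debounce_ms: int = 2000):
        self.debounce_s = debounce_ms / 1000.0
        self._last_run: Dict[str, float] = {}

    def should_run(self, path: str, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        last = self._last_run.get(path)
        if last is not None and now - last < self.debounce_s:
            return False  # still inside the debounce window
        self._last_run[path] = now
        return True
```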
Diagnostic Categories:
| Category | Default | Description |
|---|---|---|
| `taint` | `True` | Taint flow detection (source-to-sink data flows) |
| `complexity` | `True` | Cyclomatic complexity analysis |
| `deprecated` | `True` | Deprecated API usage detection |
| `todo` | `False` | TODO/FIXME comment detection |
| `dead_code` | `False` | Unreachable or unused code detection |
Thresholds:
| Threshold | Default | Description |
|---|---|---|
| `complexity_warning` | 15 | Cyclomatic complexity level that triggers a warning diagnostic |
| `complexity_error` | 25 | Cyclomatic complexity level that triggers an error diagnostic |
| `fan_in_warning` | 20 | Fan-in count that triggers a warning about high coupling |
YAML Configuration:
```yaml
acp_diagnostics:
  enabled: true
  debounce_ms: 2000
  categories:
    taint: true
    complexity: true
    deprecated: true
    todo: false
    dead_code: false
  thresholds:
    complexity_warning: 15
    complexity_error: 25
    fan_in_warning: 20
```
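The complexity thresholds map a metric value to a diagnostic severity; a minimal sketch of that mapping (assuming the threshold value itself already triggers the diagnostic, which the engine may define differently):

```python
from typing import Optional

def complexity_severity(cc: int, warning: int = 15, error: int = 25) -> Optional[str]:
    """Map a cyclomatic-complexity value to a severity using the
    documented thresholds; returns None below the warning level."""
    if cc >= error:
        return "error"
    if cc >= warning:
        return "warning"
    return None
```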
Extending the Agent System¶
Adding a New Pipeline Agent¶
To add a new pipeline agent, create a new module in src/agents/:
```python
from typing import Dict, List, Optional

from src.config import CPGConfig, get_global_cpg_config


class CustomAgent:
    """Custom pipeline agent for specialized analysis."""

    def __init__(self, cpg_config: Optional[CPGConfig] = None):
        self.cpg_config = cpg_config or get_global_cpg_config()

    def process(self, question: str, context: Dict) -> Dict:
        """Process question with context and return results.

        Args:
            question: The user's natural language question.
            context: Dictionary containing analysis, enrichment hints,
                and other pipeline state.

        Returns:
            Dictionary with 'results' list and 'metadata' dict.
        """
        results = self._analyze(question, context)
        return {
            "results": results,
            "metadata": {
                "agent": "custom",
                "result_count": len(results),
            },
        }

    def _analyze(self, question: str, context: Dict) -> List[Dict]:
        """Internal analysis method.

        Implement your custom analysis logic here.
        """
        return []
```
Integrate into the workflow by passing the agent to MultiScenarioCopilot or calling it from a scenario handler in src/workflow/scenarios/.
Adding ACP Handlers¶
To add a new JSON-RPC handler to the ACP agent, add a new entry to the _handlers dict in CodeGraphACPAgent:
```python
# In src/acp/server/agent.py, inside CodeGraphACPAgent.__init__()
self._handlers["custom/method"] = self._handle_custom
```
Then implement the handler method:
```python
async def _handle_custom(self, params: Dict, user_id: Optional[str] = None) -> Dict:
    """Handle the custom/method JSON-RPC call.

    Args:
        params: JSON-RPC parameters from the client.
        user_id: Authenticated user ID (if available).

    Returns:
        Result dictionary to send back to the client.
    """
    # Validate params
    required_field = params.get("required_field")
    if not required_field:
        return {"error": "required_field is required"}

    # Process the request
    result = await self._process_custom(required_field)
    return {"status": "ok", "result": result}
```
The handler will be automatically dispatched when the client sends a custom/method JSON-RPC request.
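For reference, the client side of the exchange: a JSON-RPC 2.0 request targeting the hypothetical custom/method handler above would be shaped roughly like this (the method and parameter names follow the sketch, not a shipped API):

```python
import json

# Hypothetical JSON-RPC 2.0 request for the custom/method handler.
request = {
    "jsonrpc": "2.0",
    "id": 7,
    "method": "custom/method",
    "params": {"required_field": "value"},
}

# What actually travels over the stdio/HTTP/WebSocket transport.
wire = json.dumps(request)
```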
Next Steps¶
- Workflows Reference — Workflow orchestration and scenarios
- API Reference — REST API documentation
- MCP Tools — MCP server tools reference