Agents Reference

Detailed documentation for CodeGraph’s agent system: pipeline agents, ACP protocol, and domain-specific analyzers. Covers all 48 files and 70+ classes across three architectural tiers.

Table of Contents


Overview

Agent System Architecture

CodeGraph’s agent system is organized into three tiers:

  1. Pipeline Agents (src/agents/) — 5 core agents plus 15 support modules that form the RAG query pipeline. These agents handle question analysis, hybrid retrieval, enrichment, query generation, and answer interpretation.

  2. ACP — Agent Client Protocol (src/acp/) — IDE integration layer using JSON-RPC 2.0. Supports Zed, JetBrains, and VS Code through stdio, HTTP, and WebSocket transports.

  3. Domain-Specific Agents — Specialized analyzers for security vulnerability detection, performance bottleneck analysis, and architecture validation.

graph TB
    subgraph "Pipeline Agents (src/agents/)"
        AA[AnalyzerAgent] --> RA[RetrieverAgent]
        RA --> EA[EnrichmentAgent]
        EA --> GA[GeneratorAgent]
        GA --> IA[InterpreterAgent]
        RA -.-> CCA[CallChainAnalyzer]
        RA -.-> CFG[ControlFlowGenerator]
        GA -.-> PB[PromptBuilder]
        GA -.-> QV[QueryVariantGenerator]
        IA -.-> AG[AnswerGenerator]
        IA -.-> RP[ResultParser]
    end
    subgraph "ACP Protocol (src/acp/)"
        IDE[IDE Client] -->|JSON-RPC| ACPA[CodeGraphACPAgent]
        ACPA --> SM[SessionManager]
        ACPA --> TM[ThreadManager]
        ACPA --> TO[TurnOrchestrator]
        ACPA --> DE[DiagnosticsEngine]
        ACPA --> HP[HoverProvider]
        ACPA -->|adapters| WA[WorkflowAdapter]
        WA --> MSC[MultiScenarioCopilot]
    end
    subgraph "Domain Agents"
        SS[SecurityScanner]
        PP[PerformanceProfiler]
        DA[DependencyAnalyzer]
    end

The pipeline agents process queries sequentially: AnalyzerAgent extracts intent and keywords, RetrieverAgent performs hybrid search across ChromaDB and DuckDB, EnrichmentAgent adds semantic tags, GeneratorAgent produces SQL queries, and InterpreterAgent converts results into natural language answers. Support modules provide specialized capabilities like call chain analysis, adaptive query refinement, and confidence scoring.
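Taken together, the stages above can be wired as in this minimal sketch. The orchestration function, the context dictionary keys, and the caller-supplied execute callable are illustrative assumptions, not part of the documented API; only the per-agent method calls follow the reference below.

```python
def answer_question(question, analyzer, retriever, enrichment, generator,
                    interpreter, execute):
    """Run one question through the five-stage pipeline (sketch).

    `execute` is a caller-supplied callable that runs the generated SQL
    against DuckDB and returns raw rows (hypothetical wiring).
    """
    analysis = analyzer.analyze(question)
    hints = enrichment.get_enrichment_hints(question, analysis)
    retrieved = retriever.retrieve_with_enrichment(question, analysis, hints)
    # Context shape is an assumption; the real pipeline may pass richer state.
    context = {"analysis": analysis, "enrichment_hints": hints,
               "retrieval": retrieved}
    query, is_valid, error = generator.generate(question, context)
    if not is_valid:
        raise RuntimeError(f"query generation failed: {error}")
    results = execute(query)
    return interpreter.interpret(question, results, query,
                                 enrichment_hints=hints)
```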

The ACP layer bridges IDE clients to the core pipeline through a JSON-RPC 2.0 protocol. It manages sessions, threads, turns, diagnostics, hover information, and file change approvals. Integration adapters connect the protocol to internal services like ChatService and the MultiScenarioCopilot workflow engine.

Domain-specific agents provide targeted analysis for security, performance, and architecture concerns, each with its own data models and reporting infrastructure.


Module Map

Package Files Key Classes Purpose
src/agents/ 20 AnalyzerAgent, RetrieverAgent, EnrichmentAgent, GeneratorAgent, InterpreterAgent + 15 support RAG query pipeline
src/agents/enrichment/ 6 EnrichmentAgent + helpers Tag-based enrichment
src/acp/server/ 8 CodeGraphACPAgent, ACPSessionManager, ACPThreadManager, TurnOrchestrator, DiagnosticsEngine, HoverProvider, ACPApprovalBridge, NotificationRouter ACP protocol server
src/acp/transport/ 4 BaseTransport, StdioTransport, HTTPTransport, WebSocketTransport Transport layer
src/acp/integration/ 4 ChatServiceACPAdapter, SessionACPAdapter, WorkflowACPAdapter, MCPBridge Integration adapters
src/security/ 1 SecurityScanner, DataFlowAnalyzer, VulnerabilityReporter, RemediationAdvisor Security analysis
src/performance/agents/ 4 PerformanceProfiler, ResourceAnalyzer, OptimizationAdvisor Performance analysis
src/architecture/agents/ 4 DependencyAnalyzer, LayerValidator, ArchitectureReporter Architecture analysis

Core Pipeline Agents

AnalyzerAgent

File: src/agents/analyzer_agent.py

Purpose: Question understanding and intent extraction. The AnalyzerAgent is the first stage in the pipeline, responsible for parsing natural language questions into structured analysis objects that downstream agents consume. It identifies the user’s intent, target domain, relevant keywords, query classification, and a confidence score.

from src.agents import AnalyzerAgent

analyzer = AnalyzerAgent(
    llm=None,        # Optional LLM for advanced analysis
    cpg_config=None  # Optional[CPGConfig]; domain-specific config
)

Constructor Parameters:

Parameter Type Default Description
llm Optional[Any] None LLM interface for advanced analysis. When None, rule-based analysis is used
cpg_config Optional[CPGConfig] None Domain-specific CPG configuration. Falls back to get_global_cpg_config()

Key Methods:

# Analyze a question — returns intent, domain, keywords, confidence
analysis = analyzer.analyze("What methods handle transactions?")
# Returns: {'intent': 'find_methods', 'domain': 'transaction-manager',
#           'keywords': ['transaction'], 'query_type': 'semantic', 'confidence': 0.85}

# Classify query mode (semantic vs structural)
mode, confidence = analyzer.classify_query_mode("Find all callers of foo")
# Returns: ('structural', 0.9)

# LLM-enhanced analysis (requires llm parameter to be set)
analysis = analyzer.analyze_with_llm("Explain the transaction commit flow")
# Returns: enriched analysis dict with LLM-derived intent and domain

# Batch analysis — process multiple questions at once
results = analyzer.batch_analyze(["Q1...", "Q2...", "Q3..."])
# Returns: list of analysis dicts

# Get ChromaDB filter for domain
domain_filter = analyzer.get_domain_filter("transaction-manager")
# Returns: dict suitable for ChromaDB where clause

Method Reference:

Method Parameters Returns Async Description
analyze question: str Dict No Full question analysis
classify_query_mode question: str Tuple[str, float] No Classify as semantic or structural
analyze_with_llm question: str Dict No LLM-enhanced analysis
batch_analyze questions: List[str] List[Dict] No Analyze multiple questions
get_domain_filter domain: str Dict No ChromaDB filter for domain

Analysis Result Structure:

The analysis dictionary returned by analyze() contains the following fields:

Field Type Description
intent str Detected intent (e.g., find_methods, find_callers, explain-concept, find_definition)
domain str Target subsystem/domain (e.g., transaction-manager, locking, storage-engine)
keywords List[str] Extracted keywords relevant to the query
query_type str Query classification: semantic, structural, or security
confidence float Confidence score between 0.0 and 1.0

RetrieverAgent

File: src/agents/retriever_agent.py

Purpose: Hybrid retrieval combining semantic search (ChromaDB) and structural search (DuckDB). The RetrieverAgent is the second stage in the pipeline, responsible for finding relevant code snippets, Q&A pairs, and graph data based on the analysis from AnalyzerAgent. It supports multiple retrieval modes, result caching, and reranking.

from src.agents import RetrieverAgent

retriever = RetrieverAgent(
    vector_store=vector_store,   # VectorStoreReal instance
    analyzer_agent=analyzer,     # AnalyzerAgent instance
    cpg_service=None,            # Optional CPGQueryService for graph queries
    cache_size=128,              # Max cached entries
    cache_ttl_seconds=None,      # Cache TTL in seconds (None = no expiration)
    enable_cache_metrics=True,   # Collect cache metrics
    enable_hybrid=True           # Enable hybrid retrieval
)

Constructor Parameters:

Parameter Type Default Description
vector_store VectorStoreReal Required ChromaDB vector store instance
analyzer_agent AnalyzerAgent Required Analyzer for question processing
cpg_service Optional[CPGQueryService] None DuckDB CPG service for graph queries
cache_size int 128 Maximum number of cached retrieval results
cache_ttl_seconds Optional[float] None Cache time-to-live in seconds. None means no expiration
enable_cache_metrics bool True Whether to collect cache hit/miss metrics
enable_hybrid bool True Whether to enable hybrid (vector + graph) retrieval

Key Methods:

# Basic vector retrieval
result = retriever.retrieve(question, analysis, top_k_qa=3, top_k_sql=5)

# Hybrid retrieval (vector + graph with RRF)
result = retriever.retrieve_hybrid(
    question="Find memory allocation patterns",
    analysis=analysis,
    mode="hybrid",           # hybrid | vector_only | graph_only
    query_type="structural", # Affects weight distribution
    top_k=10,
    use_ranker=True
)

# With reranking
result = retriever.retrieve_with_reranking(question, analysis,
    top_k_qa=5, top_k_sql=10, final_k_qa=3, final_k_sql=5)

# With enrichment-aware reranking
result = retriever.retrieve_with_enrichment(question, analysis, enrichment_hints)

# Keyword-based retrieval
result = retriever.retrieve_by_keywords(["lock", "acquire"], top_k=5)

# Cache management
stats = retriever.get_stats()
metrics = retriever.get_cache_metrics()
cleared = retriever.invalidate_cache(pattern="lock*")
warmed = retriever.warm_cache(["Q1", "Q2"])

Method Reference:

Method Parameters Returns Async Description
retrieve question, analysis, top_k_qa=3, top_k_sql=5 Dict No Basic vector retrieval
retrieve_hybrid question, analysis, mode, query_type, top_k, use_ranker Dict No Hybrid vector + graph retrieval with RRF merging
retrieve_with_reranking question, analysis, top_k_qa, top_k_sql, final_k_qa, final_k_sql Dict No Retrieval with post-retrieval reranking
retrieve_with_enrichment question, analysis, enrichment_hints Dict No Enrichment-aware retrieval and reranking
retrieve_by_keywords keywords: List[str], top_k: int Dict No Keyword-based retrieval
get_stats Dict No Retrieval statistics
get_cache_metrics Dict No Cache hit/miss metrics
invalidate_cache pattern: Optional[str] int No Invalidate cache entries matching pattern
warm_cache questions: List[str] int No Pre-populate cache for common questions

Weight Adaptation:

The RetrieverAgent dynamically adjusts the balance between vector (semantic) and graph (structural) search based on the query type:

Query Type Vector Weight Graph Weight Rationale
semantic 0.75 0.25 Favor semantic similarity for conceptual questions
structural 0.25 0.75 Favor graph traversal for call/data flow questions
security 0.50 0.50 Equal weight for security analysis
default 0.60 0.40 Slight semantic bias for general questions
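A hedged sketch of how weighted Reciprocal Rank Fusion could combine the vector and graph rankings under these weights. The smoothing constant k=60 and the exact per-item formula (weight divided by k plus rank, summed across sources) are assumptions; the reference only states that RRF merging is used.

```python
def rrf_merge(vector_ids, graph_ids, vector_weight, graph_weight, k=60):
    """Merge two ranked ID lists with weighted Reciprocal Rank Fusion (sketch)."""
    scores = {}
    for weight, ranking in ((vector_weight, vector_ids),
                            (graph_weight, graph_ids)):
        for rank, item_id in enumerate(ranking, start=1):
            # Items ranked highly in either source accumulate more score.
            scores[item_id] = scores.get(item_id, 0.0) + weight / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)
```

With semantic weights (0.75 / 0.25), an item appearing in both rankings generally outranks one appearing only in the vector ranking.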

Retrieval Result Structure:

{
    'results': [...],             # Raw retrieval results
    'ranked_results': [...],      # Ranked results with scores
    'retrieval_stats': {
        'total_retrieved': 15,
        'source_distribution': {'vector': 8, 'graph': 7},
        'cache_hit': False,
        'retrieval_time_ms': 42.5
    }
}

EnrichmentAgent

File: src/agents/enrichment/agent.py (facade: src/agents/enrichment_agent.py)

Purpose: Map questions to CPG enrichment tags for improved retrieval. The EnrichmentAgent analyzes questions and their analysis results to produce enrichment hints — structured metadata that helps downstream agents find more relevant results. It supports 12 enrichment layers and includes fallback strategies for low-coverage scenarios.

from src.agents import EnrichmentAgent

enrichment = EnrichmentAgent(
    enable_fallback=True  # Enable Phase 4 fallback strategies
)

Constructor Parameters:

Parameter Type Default Description
enable_fallback bool True Enable Phase 4 fallback strategies for low-coverage enrichment

Key Methods:

# Get enrichment hints for a question
hints = enrichment.get_enrichment_hints(
    question="How does the lock manager work?",
    analysis={'intent': 'explain-concept', 'domain': 'locking', 'keywords': ['lock']}
)
# Returns dict with: function_purposes, data_structures, concurrency_patterns,
#                     memory_management, error_handling, security_attributes, etc.

Method Reference:

Method Parameters Returns Async Description
get_enrichment_hints question: str, analysis: Dict Dict No Generate enrichment hints from question and analysis

Enrichment Hints Structure:

The hints dictionary returned by get_enrichment_hints() may contain any of the following keys, each mapping to a list of tag strings:

Key Description Example Tags
function_purposes What the function does ["lock-acquire", "resource-management"]
data_structures Relevant data structures ["hash-table", "linked-list"]
concurrency_patterns Threading and synchronization ["mutex", "spinlock", "atomic-operation"]
memory_management Memory allocation patterns ["allocation", "deallocation", "pool"]
error_handling Error handling strategies ["exception-handler", "error-code"]
security_attributes Security-relevant properties ["input-validation", "taint-source"]
transaction_boundaries Transaction demarcation ["begin-transaction", "commit", "rollback"]
performance_indicators Performance characteristics ["hot-path", "blocking-operation"]
acid_properties ACID compliance markers ["atomicity", "isolation"]
lock_primitives Lock types and operations ["shared-lock", "exclusive-lock"]
data_flow_categories Data flow classification ["source", "sink", "transform"]
control_flow_patterns Control flow characteristics ["loop", "branch", "recursion"]

Enrichment Subpackage (src/agents/enrichment/):

The enrichment subpackage contains six modules that support the main EnrichmentAgent:

File Function Purpose
agent.py EnrichmentAgent class Main enrichment agent implementation
coverage.py calculate_coverage(hints) Calculate tag coverage score (0.0 to 1.0) indicating how many enrichment layers have tags
fallback.py general_domain_fallback(hints) Apply fallback enrichment for low-coverage scenarios using general domain knowledge
keyword_matchers.py enhance_with_keywords(hints, keywords) Add enrichment tags based on keyword matching against known patterns
prompt_formatter.py format_for_prompt(hints) Format enrichment hints as text suitable for inclusion in LLM prompts
tag_filters.py generate_tag_filters(hints) Generate ChromaDB tag filters from enrichment hints for filtered retrieval

Coverage Calculation:

The calculate_coverage() function returns a score between 0.0 and 1.0 representing the proportion of enrichment layers that have at least one tag. When coverage falls below a threshold, the fallback system activates to ensure adequate enrichment.
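A minimal sketch consistent with that description, assuming the 12 layer names from the hints table above; the real calculate_coverage() may weight layers differently.

```python
# Layer names follow the enrichment hints table in this reference.
ENRICHMENT_LAYERS = [
    "function_purposes", "data_structures", "concurrency_patterns",
    "memory_management", "error_handling", "security_attributes",
    "transaction_boundaries", "performance_indicators", "acid_properties",
    "lock_primitives", "data_flow_categories", "control_flow_patterns",
]

def coverage_score(hints):
    """Fraction of enrichment layers carrying at least one tag (sketch)."""
    filled = sum(1 for layer in ENRICHMENT_LAYERS if hints.get(layer))
    return filled / len(ENRICHMENT_LAYERS)
```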


GeneratorAgent

File: src/agents/generator_agent.py

Purpose: Generate SQL queries from natural language using enriched context. The GeneratorAgent takes the question, analysis, and enrichment context to produce DuckDB SQL queries against the CPG database. It supports retry logic, batch generation, query variant generation (Query Funnel), and tag effectiveness feedback.

from src.agents import GeneratorAgent

generator = GeneratorAgent(
    sql_generator=sql_generator,  # SQLQueryGenerator backend
    enable_feedback=True,         # Enable tag feedback tracking
    use_semantic=False            # Use semantic mode
)

Constructor Parameters:

Parameter Type Default Description
sql_generator SQLQueryGenerator Required SQL generation backend that produces actual queries
enable_feedback bool True Enable tag effectiveness tracking via TagEffectivenessTracker
use_semantic bool False Use semantic mode for query generation

Key Methods:

# Generate SQL query
query, is_valid, error = generator.generate(question, context)
# Returns: (query_string, validation_bool, error_message_or_none)

# With retries on failure
query, is_valid, error, attempts = generator.generate_with_retries(
    question, context, max_retries=2
)
# Returns: (query_string, validation_bool, error_message_or_none, attempt_count)

# Batch generation for multiple questions
results = generator.generate_batch(questions, contexts)
# Returns: list of (query, is_valid, error) tuples

# Query Funnel — generate multiple query variants
variants = generator.generate_query_variants(question, context, num_variants=3)
# Returns: [{'query': '...', 'strategy': 'precise'}, ...]

# Explain a generated query in natural language
explanation = generator.explain_query(query, context)
# Returns: string explanation of what the query does

Method Reference:

Method Parameters Returns Async Description
generate question: str, context: Dict Tuple[str, bool, Optional[str]] No Generate a single SQL query
generate_with_retries question, context, max_retries=2 Tuple[str, bool, Optional[str], int] No Generate with automatic retries on failure
generate_batch questions: List[str], contexts: List[Dict] List[Tuple] No Batch generation for multiple questions
generate_query_variants question, context, num_variants=3 List[Dict] No Generate multiple query variants (Query Funnel)
explain_query query: str, context: Dict str No Explain a query in natural language

Delegates to:

- PromptBuilder — formats prompts for the SQL generation backend
- QueryVariantGenerator — generates multiple query variants with different strategies
- TagEffectivenessTracker — records which enrichment tags lead to successful queries
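The retry behaviour of generate_with_retries() can be sketched as below; threading the previous error back into the context is an assumption about how feedback reaches the next attempt, and the function is shown standalone rather than as the agent method.

```python
def generate_with_retries(generate, question, context, max_retries=2):
    """Retry query generation until it validates or retries run out (sketch).

    `generate` has the documented shape: (question, context) ->
    (query, is_valid, error).
    """
    attempts = 0
    query, is_valid, error = "", False, None
    while attempts <= max_retries:
        attempts += 1
        query, is_valid, error = generate(question, context)
        if is_valid:
            break
        # Assumption: surface the failure to the next attempt via context.
        context = {**context, "previous_error": error}
    return query, is_valid, error, attempts
```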


InterpreterAgent

File: src/agents/interpreter_agent.py

Purpose: Convert query execution results to natural language answers. The InterpreterAgent is the final stage in the core pipeline, responsible for transforming raw query results into human-readable responses. It supports LLM-based generation and template-based fallback.

from src.agents.interpreter_agent import InterpreterAgent

interpreter = InterpreterAgent(
    llm_interface=None,  # Optional LLM (falls back to templates)
    use_semantic=False,  # Semantic mode (extract comments)
    cpg_config=None      # Optional[CPGConfig]; domain-specific config
)

Constructor Parameters:

Parameter Type Default Description
llm_interface Optional[Any] None LLM interface for generating answers. When None, template-based generation is used
use_semantic bool False Enable semantic mode for comment extraction and interpretation
cpg_config Optional[CPGConfig] None Domain-specific CPG configuration

Delegates to three helper classes:

Class File Purpose
ResultParser src/agents/result_parser.py Parse various result formats (function lists, semantic results, raw data)
AnswerGenerator src/agents/answer_generator.py Generate natural language summaries using LLM or templates
SemanticInterpreter src/agents/semantic_interpreter.py Handle semantic query interpretation with comment extraction

Key Methods:

# Interpret query results into a natural language answer
answer = interpreter.interpret(
    question="What methods call LWLockAcquire?",
    results=[...],         # Raw query results
    query="SELECT...",     # The executed query
    enrichment_hints=None  # Optional enrichment context
)
# Returns: {
#     'answer': 'LWLockAcquire is called by 15 methods including...',
#     'confidence': 0.85,
#     'sources': [{'method': 'heap_insert', 'file': 'heapam.c', 'line': 234}, ...]
# }

Method Reference:

Method Parameters Returns Async Description
interpret question, results, query, enrichment_hints=None Dict No Full interpretation pipeline

Pipeline Support Modules

CallChainAnalyzer

File: src/agents/call_chain_analyzer.py

Purpose: Build call chains and call graphs from query results. Takes entry point results, keyword method results, and call graph edges to construct adjacency lists, call chains, and identify key functions.

from src.agents.call_chain_analyzer import CallChainAnalyzer

analyzer = CallChainAnalyzer()  # No parameters

result = analyzer.analyze(
    entry_point_result={"method": "main", ...},
    keyword_methods_result=[{"method": "foo", ...}, ...],
    call_graph_result=[{"caller": "a", "callee": "b"}, ...],
    question_keywords=["transaction", "commit"]
)

Constructor Parameters: None.

Key Methods:

Method Parameters Returns Async Description
analyze entry_point_result, keyword_methods_result, call_graph_result, question_keywords Dict No Full call chain analysis

Analysis Result Structure:

{
    'entry_point': 'main',                    # Detected entry point function
    'call_graph': {                           # Adjacency list representation
        'main': ['init', 'process'],
        'process': ['validate', 'commit'],
        ...
    },
    'call_chains': [                          # Paths through the call graph
        ['main', 'process', 'commit'],
        ['main', 'process', 'validate'],
        ...
    ],
    'key_functions': ['commit', 'validate'],  # Functions matching keywords
    'metadata': {
        'total_functions': 12,
        'max_chain_depth': 5,
        'keyword_matches': 3
    }
}
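One way the call_chains list could be produced from the adjacency list is a bounded depth-first search. This is an illustrative sketch, not the actual CallChainAnalyzer implementation; max_depth mirrors the max_chain_depth metadata field.

```python
def enumerate_chains(call_graph, entry_point, max_depth=5):
    """Enumerate root-to-leaf paths through an adjacency list (sketch)."""
    chains = []

    def dfs(node, path):
        children = call_graph.get(node, [])
        if not children or len(path) >= max_depth:
            chains.append(path)  # leaf reached or depth bound hit
            return
        for child in children:
            if child not in path:  # avoid cycles in the call graph
                dfs(child, path + [child])

    dfs(entry_point, [entry_point])
    return chains
```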

ControlFlowGenerator

File: src/agents/control_flow_generator.py

Purpose: Generate SQL queries for control flow analysis and call chain tracing. Produces specialized DuckDB queries for entry point detection, keyword method search, and call graph construction.

from src.agents.control_flow_generator import ControlFlowGenerator

generator = ControlFlowGenerator(llm=None)

Constructor Parameters:

Parameter Type Default Description
llm Optional[Any] None Optional LLM for advanced query generation

Key Methods:

Method Parameters Returns Async Description
generate_entry_point_query question: str, analysis: Dict str No Generate SQL to find entry point functions
generate_keyword_methods_query keywords: List[str] str No Generate SQL to find methods matching keywords
generate_call_graph_query method_name: str, max_depth: int str No Generate SQL for call graph traversal

LogicSynthesizer

File: src/agents/logic_synthesizer.py

Purpose: Synthesize natural language explanations from call chain analysis. Takes the output of CallChainAnalyzer and produces readable explanations of code execution flows, optionally using an LLM for enhanced generation.

from src.agents.logic_synthesizer import LogicSynthesizer

synthesizer = LogicSynthesizer(
    llm=None,      # Optional LLM for generating explanations
    language="en"  # Language for prompts ("en" or "ru")
)

Constructor Parameters:

Parameter Type Default Description
llm Optional[Any] None LLM interface for generating explanations. Falls back to templates when None
language str "en" Language for prompt templates ("en" or "ru")

Key Methods:

result = synthesizer.synthesize(
    question="How does the shutdown sequence work?",
    call_chain_analysis=call_chain_result,  # From CallChainAnalyzer.analyze()
    max_tokens=1000
)
# Returns: {'explanation': '...', 'metadata': {...}}

Method Reference:

Method Parameters Returns Async Description
synthesize question: str, call_chain_analysis: Dict, max_tokens: int = 1000 Dict No Synthesize explanation from call chain analysis

Helper Function:

from src.agents.logic_synthesizer import format_call_chain_for_prompt

formatted = format_call_chain_for_prompt(analysis_result)
# Returns dict with: entry_point, call_graph, call_chains, key_functions

The format_call_chain_for_prompt() function takes a call chain analysis result and formats it as a structured dictionary suitable for inclusion in LLM prompts. It extracts the entry point, formats the call graph as readable text, lists call chains, and highlights key functions.
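A hedged sketch of that formatting logic, assuming the analysis result structure shown for CallChainAnalyzer; the exact output strings of the real helper are not documented here and are illustrative.

```python
def format_call_chain_for_prompt(analysis_result):
    """Format a call chain analysis dict for LLM prompt inclusion (sketch)."""
    graph_lines = [
        f"{caller} -> {', '.join(callees)}"
        for caller, callees in analysis_result.get("call_graph", {}).items()
    ]
    return {
        "entry_point": analysis_result.get("entry_point", ""),
        "call_graph": "\n".join(graph_lines),
        "call_chains": [" -> ".join(chain)
                        for chain in analysis_result.get("call_chains", [])],
        "key_functions": analysis_result.get("key_functions", []),
    }
```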


AdaptiveQueryRefiner

File: src/agents/adaptive_refiner.py

Purpose: Learn from query outcomes and suggest better query patterns. The AdaptiveQueryRefiner maintains a knowledge base of query patterns, tracking success rates and execution characteristics. Over time, it learns which query strategies work best for different question types and can suggest refinements for failed queries.

from src.agents.adaptive_refiner import AdaptiveQueryRefiner

refiner = AdaptiveQueryRefiner(
    persistence_path=None  # Optional[str]; path to persist learned patterns
)

Constructor Parameters:

Parameter Type Default Description
persistence_path Optional[str] None File path for persisting learned patterns. When None, patterns are kept in memory only

Key Methods:

# Record query outcome for learning
refiner.record_query_outcome(
    question="...", question_type="find_callers", query="SELECT...",
    success=True, result_count=15, execution_time=0.5
)

# Suggest refinements for a failed query
suggestions = refiner.suggest_refinements(
    question="...", question_type="find_callers",
    failed_query="SELECT...", max_suggestions=3
)
# Returns: list of suggested query strings

# Get best pattern for a question type
pattern = refiner.get_best_pattern_for_type("find_callers")
# Returns: QueryPattern or None

# Statistics
stats = refiner.get_statistics()
# Returns: {'total_patterns': 25, 'avg_success_rate': 0.78, ...}

# Persistence
refiner.save_patterns()
refiner.load_patterns()

Method Reference:

Method Parameters Returns Async Description
record_query_outcome question, question_type, query, success, result_count, execution_time None No Record outcome for pattern learning
suggest_refinements question, question_type, failed_query, max_suggestions=3 List[str] No Suggest alternative queries
get_best_pattern_for_type question_type: str Optional[QueryPattern] No Get highest-confidence pattern
get_statistics Dict No Get learning statistics
save_patterns None No Persist patterns to disk
load_patterns None No Load patterns from disk

Supporting Class: QueryPattern

The QueryPattern dataclass represents a learned query pattern:

Field Type Description
question_type str The type of question this pattern applies to
query_template str SQL query template
total_attempts int Number of times this pattern was used
successful_attempts int Number of successful executions
avg_result_count float Average number of results returned
avg_execution_time float Average execution time in seconds

Properties:

- success_rate — Returns successful_attempts / total_attempts (0.0 to 1.0)
- confidence — Combined confidence score factoring in success rate and sample size
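The dataclass and its two properties can be sketched as follows. The success_rate formula matches the description; the confidence formula (success rate damped by sample size) is an assumption, since the source does not give the exact weighting.

```python
from dataclasses import dataclass

@dataclass
class QueryPattern:
    """Learned query pattern (sketch of the documented fields)."""
    question_type: str
    query_template: str
    total_attempts: int = 0
    successful_attempts: int = 0
    avg_result_count: float = 0.0
    avg_execution_time: float = 0.0

    @property
    def success_rate(self) -> float:
        if self.total_attempts == 0:
            return 0.0
        return self.successful_attempts / self.total_attempts

    @property
    def confidence(self) -> float:
        # Assumed damping: small samples count less (10 attempts ~ 91% weight).
        sample_weight = self.total_attempts / (self.total_attempts + 1)
        return self.success_rate * sample_weight
```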


AnswerGenerator

File: src/agents/answer_generator.py

Purpose: Generate natural language answers from query results. Supports both LLM-based generation for rich, contextual answers and template-based fallback for deterministic output when no LLM is available.

from src.agents.answer_generator import AnswerGenerator

generator = AnswerGenerator(
    llm_interface=None,                 # Optional LLM
    code_analyst_title="code analyst",  # Domain-specific title
    language="en"                       # Prompt language
)

Constructor Parameters:

Parameter Type Default Description
llm_interface Optional[Any] None LLM interface for generating rich answers. Falls back to templates when None
code_analyst_title str "code analyst" Title used in prompts to frame the assistant’s role
language str "en" Language for generated answers ("en" or "ru")

Key Methods:

# Group functions by enrichment tags
grouped = generator.group_by_tags(functions, enrichment_hints)
# Returns: dict mapping tag categories to lists of functions

# Generate answer from grouped results
answer = generator.generate(question, query, grouped, enrichment_hints,
                            used_fallback=False, fallback_query=None)
# Returns: string answer

Method Reference:

Method Parameters Returns Async Description
group_by_tags functions: List[Dict], enrichment_hints: Dict Dict[str, List] No Group functions by enrichment tag categories
generate question, query, grouped, enrichment_hints, used_fallback=False, fallback_query=None str No Generate natural language answer

Factory Function:

from src.agents.answer_generator import create_answer_generator

generator = create_answer_generator(
    llm_interface=llm,
    code_analyst_title="code analyst",
    language="en"
)
# Returns: AnswerGenerator instance

ResultReranker

File: src/agents/result_reranker.py

Purpose: Strategy-based result reranking. Applies configurable scoring strategies to reorder retrieval results, boosting the most relevant items to the top. Supports Q&A reranking, SQL example reranking, and enrichment-aware reranking.

from src.agents.result_reranker import (
    ResultReranker, QARerankerStrategy,
    create_qa_reranker, create_sql_reranker, create_enrichment_reranker,
)

# Using factory functions
reranker = create_qa_reranker()
reranker = create_sql_reranker()
reranker = create_enrichment_reranker()

# Or direct construction with a strategy
reranker = ResultReranker(strategy=QARerankerStrategy())

# Rerank items
ranked = reranker.rerank(items, analysis, top_k=5)
# Returns: list of items sorted by relevance score, trimmed to top_k

Constructor Parameters:

Parameter Type Default Description
strategy RerankerStrategy Required Reranking strategy to apply

Method Reference:

Method Parameters Returns Async Description
rerank items: List, analysis: Dict, top_k: int = 5 List No Rerank items using the configured strategy

Strategy Hierarchy:

RerankerStrategy (ABC)
  +-- QARerankerStrategy      # Q&A pair reranking
  +-- SQLRerankerStrategy     # SQL example reranking
  +-- EnrichmentRerankerStrategy  # Enrichment-aware reranking

Strategy Description Key Scoring Factors
QARerankerStrategy Reranks Q&A pairs based on relevance to the analysis Intent match, domain match, keyword overlap
SQLRerankerStrategy Reranks SQL examples based on structural similarity Query pattern match, table overlap, join complexity
EnrichmentRerankerStrategy Reranks based on enrichment tag alignment Tag coverage, tag relevance, domain alignment

Factory Functions:

Function Returns Description
create_qa_reranker() ResultReranker Reranker with QARerankerStrategy
create_sql_reranker() ResultReranker Reranker with SQLRerankerStrategy
create_enrichment_reranker() ResultReranker Reranker with EnrichmentRerankerStrategy

Dataclass: ScoringBoost

from src.agents.result_reranker import ScoringBoost

boost = ScoringBoost(
    name="domain_match",       # Human-readable boost name
    multiplier=1.5,            # Score multiplier when condition is true
    condition=lambda item, analysis: item.get("domain") == analysis.get("domain")
)

Field Type Description
name str Human-readable name for the boost
multiplier float Score multiplier applied when condition evaluates to True
condition Callable Function taking (item, analysis) and returning bool
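How a strategy might fold ScoringBoost multipliers into an item's score, as a sketch; the actual composition inside the reranker strategies is not documented in this section, and multiplicative stacking is an assumption.

```python
def apply_boosts(base_score, item, analysis, boosts):
    """Scale a base relevance score by every boost whose condition holds (sketch)."""
    score = base_score
    for boost in boosts:
        if boost.condition(item, analysis):
            score *= boost.multiplier
    return score
```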

ConfidenceCalculator

File: src/agents/confidence_calculator.py

Purpose: Configurable confidence scoring via the Strategy pattern. Provides multiple calculation strategies for computing confidence scores based on different signal types: result counts, analysis quality, pattern matching, and LLM signals.

from src.agents.confidence_calculator import (
    ConfidenceCalculator, create_result_calculator,
    create_analysis_calculator, create_pattern_calculator, create_llm_calculator
)

# Using factory functions
calc = create_result_calculator()
score = calc.calculate(result_count=15, execution_success=True, used_fallback=False)
# Returns: float between 0.0 and 1.0

# Or from factors dataclass
from src.agents.confidence_calculator import ConfidenceFactors
factors = ConfidenceFactors(result_count=15, execution_success=True)
score = calc.calculate_from_factors(factors)

Constructor Parameters:

Parameter Type Default Description
strategy ConfidenceStrategy Required Confidence calculation strategy

Method Reference:

Method Parameters Returns Async Description
calculate **kwargs float No Calculate confidence from keyword arguments
calculate_from_factors factors: ConfidenceFactors float No Calculate confidence from a ConfidenceFactors dataclass

Strategy Hierarchy:

ConfidenceStrategy (ABC)
  +-- ResultConfidenceStrategy    # Based on result count and execution status
  +-- AnalysisConfidenceStrategy  # Based on intent/domain/keyword analysis
  +-- PatternConfidenceStrategy   # Based on pattern matching success
  +-- LLMConfidenceStrategy       # Based on LLM confidence signals

Strategy Key Inputs Description
ResultConfidenceStrategy result_count, execution_success, used_fallback Higher confidence with more results and successful execution; penalizes fallback usage
AnalysisConfidenceStrategy intent_confidence, domain_confidence, keyword_count Higher confidence when intent and domain are clearly identified
PatternConfidenceStrategy pattern_match_score, pattern_count Higher confidence when known patterns are matched
LLMConfidenceStrategy llm_confidence, coherence_score Uses LLM self-reported confidence and output coherence
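For illustration only, a toy scoring function in the spirit of ResultConfidenceStrategy: more results and a clean execution raise the score, fallback usage applies a penalty. Every constant here is an assumption, not the library's actual weighting.

```python
def result_confidence(result_count, execution_success, used_fallback):
    """Toy result-based confidence score in [0.0, 1.0] (sketch)."""
    if not execution_success:
        return 0.1  # failed execution: near-zero confidence
    score = min(1.0, 0.5 + 0.03 * result_count)  # more results, more confidence
    if used_fallback:
        score *= 0.8  # penalize fallback usage
    return round(score, 2)
```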

Factory Functions:

Function Returns Description
create_result_calculator() ConfidenceCalculator Calculator with ResultConfidenceStrategy
create_analysis_calculator() ConfidenceCalculator Calculator with AnalysisConfidenceStrategy
create_pattern_calculator() ConfidenceCalculator Calculator with PatternConfidenceStrategy
create_llm_calculator() ConfidenceCalculator Calculator with LLMConfidenceStrategy

Dataclass: ConfidenceFactors

from src.agents.confidence_calculator import ConfidenceFactors

factors = ConfidenceFactors(
    result_count=15,
    execution_success=True,
    used_fallback=False,
    intent_confidence=0.9,
    domain_confidence=0.85,
    keyword_count=3,
    pattern_match_score=0.75,
    pattern_count=2,
    llm_confidence=0.8,
    coherence_score=0.9
)

All fields are optional with sensible defaults, allowing each strategy to consume only the fields it needs.


PromptBuilder

File: src/agents/prompt_builder.py

Purpose: Prompt formatting for the GeneratorAgent. Constructs well-structured prompts that combine the user’s question, retrieval context, enrichment hints, Q&A examples, and SQL examples into a format suitable for SQL generation.

from src.agents.prompt_builder import PromptBuilder

builder = PromptBuilder(semantic_prompts=None)  # semantic_prompts: Optional[Dict[str, str]]

Constructor Parameters:

Parameter Type Default Description
semantic_prompts Optional[Dict[str, str]] None Custom prompt templates for semantic mode. Keys are template names, values are template strings

Key Methods:

# Build enriched prompt with full context
prompt = builder.build_enriched_prompt(question, context)

# Build simple prompt without enrichment
prompt = builder.build_simple_prompt(question, context)

# Build semantic prompt for semantic mode
prompt = builder.build_semantic_prompt(question, context)

# Formatting helpers
enrichment_ctx = builder.format_enrichment_context(hints)
qa_text = builder.format_qa_examples(qa_pairs)
sql_text = builder.format_sql_examples(examples)
guidance = builder.get_domain_guidance(analysis)

Method Reference:

Method Parameters Returns Async Description
build_enriched_prompt question: str, context: Dict str No Full prompt with enrichment context
build_simple_prompt question: str, context: Dict str No Simple prompt without enrichment
build_semantic_prompt question: str, context: Dict str No Prompt for semantic mode
format_enrichment_context hints: Dict str No Format enrichment hints as text
format_qa_examples qa_pairs: List[Dict] str No Format Q&A examples for prompt
format_sql_examples examples: List[Dict] str No Format SQL examples for prompt
get_domain_guidance analysis: Dict str No Get domain-specific guidance text

Factory Function:

from src.agents.prompt_builder import create_prompt_builder

builder = create_prompt_builder(semantic_prompts=None)
# Returns: PromptBuilder instance

QueryVariantGenerator

File: src/agents/query_variants.py

Purpose: Generate query variants for the Query Funnel approach. Instead of generating a single query, this module produces multiple variants with different strategies (precise, broad, alternative) to maximize the chance of finding relevant results.

from src.agents.query_variants import QueryVariantGenerator

gen = QueryVariantGenerator()

variants = gen.generate_variants(question, context, num_variants=3)
# Returns: [{'query': '...', 'strategy': 'precise'}, {'query': '...', 'strategy': 'broad'}, ...]

Constructor Parameters: None.

Key Methods:

# Generate multiple query variants
variants = gen.generate_variants(question, context, num_variants=3)

# Detection helpers for specialized query types
gen.is_data_flow_question("How does data flow from A to B?")   # True
gen.is_control_flow_question("What is the shutdown sequence?")  # True

Method Reference:

Method Parameters Returns Async Description
generate_variants question: str, context: Dict, num_variants: int = 3 List[Dict] No Generate query variants with different strategies
is_data_flow_question question: str bool No Check if question is about data flow
is_control_flow_question question: str bool No Check if question is about control flow

Variant Strategies:

Strategy Description
precise Exact-match query targeting specific function/method names
broad Wider search using LIKE patterns and partial matches
alternative Alternative query structure using different tables or joins
data_flow Specialized query for data flow tracing (when detected)
control_flow Specialized query for control flow analysis (when detected)
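A caller typically consumes these variants as a funnel: try the most precise first and widen until something matches. The loop below is a sketch under that assumption; `run_query` is a hypothetical executor, and the variant dicts follow the shape shown in the example above.

```python
# Sketch of the Query Funnel: try variants from most precise to broadest
# and stop at the first one that yields results.
def run_funnel(variants, run_query):
    order = {"precise": 0, "broad": 1, "alternative": 2}
    for variant in sorted(variants, key=lambda v: order.get(v["strategy"], 99)):
        rows = run_query(variant["query"])
        if rows:                              # first non-empty result wins
            return variant["strategy"], rows
    return None, []

variants = [
    {"query": "SELECT name FROM functions WHERE name = 'parse'", "strategy": "precise"},
    {"query": "SELECT name FROM functions WHERE name LIKE '%parse%'", "strategy": "broad"},
]
# Fake executor: only the broad query matches anything.
fake_results = {variants[1]["query"]: [("parse_config",), ("parse_args",)]}
strategy, rows = run_funnel(variants, lambda q: fake_results.get(q, []))
```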

Factory Function:

from src.agents.query_variants import create_variant_generator

gen = create_variant_generator()
# Returns: QueryVariantGenerator instance

Fallback Strategies

File: src/agents/fallback_strategies.py

Purpose: Intelligent fallback for low-coverage enrichment (Phase 4). When the EnrichmentAgent produces hints with low tag coverage, these fallback strategies activate to ensure adequate context for query generation. Three classes work together to map keywords to tags, build hybrid queries, and select the appropriate fallback strategy.

Three classes:

KeywordToTagMapper

from src.agents.fallback_strategies import KeywordToTagMapper

mapper = KeywordToTagMapper(similarity_threshold=None)
Parameter Type Default Description
similarity_threshold Optional[float] None Minimum similarity score for fuzzy keyword-to-tag matching. When None, uses a default threshold

Maps keywords to CPG enrichment tags using fuzzy string matching. When a keyword does not exactly match a known tag, the mapper finds the closest matching tags above the similarity threshold.
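The fuzzy matching step might look like the following sketch. The real mapper's algorithm, tag vocabulary, and default threshold are not shown in this reference; this version uses stdlib `difflib` similarity ratios and an assumed 0.5 threshold purely for illustration.

```python
import difflib

# Sketch of fuzzy keyword-to-tag matching. The tag list and
# threshold are illustrative, not CodeGraph's actual vocabulary.
KNOWN_TAGS = ["concurrency", "lock-acquire", "io-read", "crypto", "auth-check"]

def map_keyword(keyword, threshold=0.5):
    if keyword in KNOWN_TAGS:                       # exact match wins
        return [keyword]
    scored = [(difflib.SequenceMatcher(None, keyword, tag).ratio(), tag)
              for tag in KNOWN_TAGS]
    return [tag for score, tag in sorted(scored, reverse=True)
            if score >= threshold]

print(map_keyword("locking"))
```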

HybridQueryBuilder

from src.agents.fallback_strategies import HybridQueryBuilder

builder = HybridQueryBuilder()

No constructor parameters. Builds hybrid queries that combine name-based matching with tag-based filtering. Used when enrichment tags alone are insufficient for precise retrieval.
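A hybrid query of this kind can be sketched as below. The table and column names (`functions`, `function_tags`) are assumptions for illustration, not CodeGraph's actual schema; in real use the inputs would first pass through the sanitization helpers listed under Other Support Modules.

```python
# Sketch of a hybrid query combining name matching with tag filtering.
# Schema names are illustrative; inputs are assumed pre-sanitized.
def build_hybrid_query(name_pattern: str, tags: list[str]) -> str:
    tag_list = ", ".join(f"'{t}'" for t in tags)
    return (
        "SELECT f.name, f.file FROM functions f "
        "JOIN function_tags ft ON ft.function_id = f.id "
        f"WHERE f.name LIKE '%{name_pattern}%' "
        f"AND ft.tag IN ({tag_list})"
    )

sql = build_hybrid_query("auth", ["auth-check", "crypto"])
```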

FallbackStrategySelector

from src.agents.fallback_strategies import FallbackStrategySelector

selector = FallbackStrategySelector()

No constructor parameters. Orchestrates the fallback process by evaluating coverage, selecting the appropriate strategy, and applying it.

Convenience Function:

from src.agents.fallback_strategies import get_fallback_selector

selector = get_fallback_selector()
enhanced_hints = selector.apply_fallback(hints, question, analysis)
# Returns: enhanced hints dict with additional tags from fallback strategies

Method Reference for FallbackStrategySelector:

Method Parameters Returns Async Description
apply_fallback hints: Dict, question: str, analysis: Dict Dict No Apply appropriate fallback strategy to enhance hints
evaluate_coverage hints: Dict float No Calculate current coverage score
select_strategy coverage: float, analysis: Dict str No Select fallback strategy based on coverage and analysis

Other Support Modules

File Class/Function Constructor/Parameters Purpose
semantic_interpreter.py SemanticInterpreter (llm_interface, prompt_template: Optional[str] = None, code_analyst_title: str = "code analyst") Semantic query interpretation with comment extraction from code
result_parser.py ResultParser () — no parameters Parse function lists, semantic results, and raw query output into structured format
tag_effectiveness_tracker.py TagEffectivenessTracker (persistence_path: Optional[str] = None, min_samples: int = 3) Track which enrichment tags lead to successful queries; singleton via get_global_tracker()
utils.py sanitize_sql_value() (value: str) -> str Sanitize a string for safe use in SQL queries
utils.py sanitize_like_pattern() (pattern: str) -> str Sanitize a LIKE pattern, escaping special characters
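A common way to implement LIKE-pattern sanitization is to escape the SQL wildcard characters; the sketch below shows that approach. The shipped sanitize_like_pattern() may differ in escape character and coverage.

```python
# Illustrative escaping of SQL LIKE wildcards (% and _) plus the
# escape character itself; not necessarily the shipped implementation.
def sanitize_like_pattern(pattern: str, escape: str = "\\") -> str:
    out = []
    for ch in pattern:
        if ch in ("%", "_", escape):
            out.append(escape)
        out.append(ch)
    return "".join(out)

print(sanitize_like_pattern("100%_done"))  # 100\%\_done
```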

SemanticInterpreter details:

from src.agents.semantic_interpreter import SemanticInterpreter

interpreter = SemanticInterpreter(
    llm_interface=llm,
    prompt_template=None,               # Custom prompt template
    code_analyst_title="code analyst"   # Role title in prompts
)

result = interpreter.interpret(question, results, query)
# Returns: dict with 'answer', 'confidence', 'sources'

TagEffectivenessTracker details:

Supporting dataclass: TagUsageRecord — fields: tag_name, tag_value, question_domain, question_intent, query_valid, query_executed, execution_successful, timestamp, coverage_score.

from src.agents.tag_effectiveness_tracker import TagEffectivenessTracker, TagUsageRecord, get_global_tracker

# Singleton access
tracker = get_global_tracker()

# Record tag usage and outcome
tracker.record(tags=["concurrency", "lock-acquire"], success=True, result_count=15)

# Get effectiveness report
report = tracker.get_report()
# Returns: dict mapping tag names to effectiveness scores

# Persistence
tracker.save()
tracker.load()

ResultParser details:

from src.agents.result_parser import ResultParser

parser = ResultParser()

# Parse function list results
functions = parser.parse_function_list(raw_results)

# Parse semantic results
semantic = parser.parse_semantic_result(raw_results)

# Check for empty results
parser.is_empty(raw_results)

ACP — Agent Client Protocol

ACP Overview

ACP provides IDE integration via JSON-RPC 2.0. It allows IDEs to communicate with CodeGraph’s analysis pipeline through a structured protocol supporting sessions, threads, turns, diagnostics, hover information, and file change approvals. Supported IDEs: Zed, JetBrains, VS Code.

IDE <--> Transport (stdio | HTTP | WebSocket) <--> CodeGraphACPAgent <--> Adapters <--> Core Services

Entry point:

python -m src.acp                    # Default: stdio transport
python -m src.acp --transport http   # HTTP transport on default port
python -m src.acp --transport ws     # WebSocket transport

The ACP system follows a layered architecture:

  1. Transport Layer — Handles raw communication (stdio, HTTP, WebSocket)
  2. Protocol Layer — JSON-RPC 2.0 request/response handling
  3. Agent Layer — Method dispatch and business logic
  4. Integration Layer — Adapters connecting to core CodeGraph services
graph LR
    subgraph "Transport"
        STDIO[StdioTransport]
        HTTP[HTTPTransport]
        WS[WebSocketTransport]
    end
    subgraph "Protocol"
        AGENT[CodeGraphACPAgent]
    end
    subgraph "Server Components"
        SM[SessionManager]
        TM[ThreadManager]
        TO[TurnOrchestrator]
        DE[DiagnosticsEngine]
        HP[HoverProvider]
        AB[ApprovalBridge]
        NR[NotificationRouter]
    end
    subgraph "Integration"
        CA[ChatAdapter]
        SA[SessionAdapter]
        WA[WorkflowAdapter]
        MB[MCPBridge]
    end
    STDIO --> AGENT
    HTTP --> AGENT
    WS --> AGENT
    AGENT --> SM
    AGENT --> TM
    AGENT --> TO
    AGENT --> DE
    AGENT --> HP
    AGENT --> AB
    AGENT --> NR
    AGENT --> CA
    AGENT --> SA
    AGENT --> WA
    AGENT --> MB
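Regardless of transport, the same JSON-RPC 2.0 envelope crosses the wire. The sketch below shows a representative initialize exchange; field names follow the Protocol Types listed later in this section, while the concrete values are illustrative.

```python
import json

# Representative JSON-RPC 2.0 initialize exchange (values illustrative).
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "initialize",
    "params": {
        "clientInfo": {"name": "zed", "version": "0.150.0"},
        "capabilities": {"fs": True, "terminal": True,
                         "diagnostics": True, "hover": True, "approval": False},
        "protocolVersion": "1.0",
    },
}
response = {
    "jsonrpc": "2.0",
    "id": 1,                      # responses echo the request id
    "result": {
        "serverInfo": {"name": "codegraph", "version": "1.0.0", "title": "CodeGraph"},
        "capabilities": {"loadSession": True, "authMethods": []},
        "protocolVersion": "1.0",
    },
}
wire = json.dumps(request)        # what actually crosses the transport
```

Notifications use the same envelope minus the `id` field, which is why no response is expected for them.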

CodeGraphACPAgent

File: src/acp/server/agent.py

Purpose: Main ACP agent that dispatches 25 JSON-RPC methods to their respective handlers. This is the central coordinator of the ACP system, lazily loading server components and integration adapters as needed.

from src.acp import CodeGraphACPAgent

agent = CodeGraphACPAgent()  # No parameters

# Main entry point (async)
response = await agent.handle(
    request,                      # JSONRPCRequest
    notification_callback=None,   # Optional[Callable]
    user_id=None                  # Optional[str]
)

Constructor Parameters: None. All components are lazy-loaded on first access.

Main Method:

Method Parameters Returns Async Description
handle request: JSONRPCRequest, notification_callback: Optional[Callable] = None, user_id: Optional[str] = None JSONRPCResponse Yes Dispatch a JSON-RPC request to the appropriate handler

JSON-RPC Methods (25 total):

Category Method Handler Streaming Description
Init initialize _handle_initialize No Initialize protocol, negotiate capabilities
Auth authenticate _handle_authenticate No Authenticate client with credentials
Session session/new _handle_session_new No Create a new session with working directory and MCP servers
Session session/load _handle_session_load No Load an existing session by ID
Session session/prompt _handle_session_prompt Yes Send a prompt and receive streaming response
Session session/cancel _handle_session_cancel No Cancel a running session prompt
File System fs/read_text_file _handle_fs_read No Read a text file from the filesystem
File System fs/write_text_file _handle_fs_write No Write content to a text file
Terminal terminal/create _handle_terminal_create No Create a terminal subprocess
Terminal terminal/output _handle_terminal_output No Read terminal output
Terminal terminal/wait_for_exit _handle_terminal_wait No Wait for terminal process to exit
Terminal terminal/kill _handle_terminal_kill No Kill a terminal process
Terminal terminal/release _handle_terminal_release No Release terminal resources
Thread thread/start _handle_thread_start No Start a new conversation thread
Thread thread/resume _handle_thread_resume No Resume an existing thread
Thread thread/fork _handle_thread_fork No Fork a thread at a specific turn
Thread thread/list _handle_thread_list No List threads with optional filters
Thread thread/archive _handle_thread_archive No Archive a thread
Thread thread/compact _handle_thread_compact No Compact thread history to save memory
Turn turn/start _handle_turn_start Yes Start a new turn within a thread (streaming)
Turn turn/interrupt _handle_turn_interrupt No Interrupt a running turn
Diagnostics diagnostics/subscribe _handle_diagnostics_subscribe No Subscribe to diagnostics for a file URI
Diagnostics diagnostics/unsubscribe _handle_diagnostics_unsubscribe No Unsubscribe from diagnostics for a file URI
Hover hover/request _handle_hover_request No Request hover information for a position
Approval item/fileChange/respondApproval _handle_respond_approval No Respond to a file change approval request

Lazy-Loaded Properties:

The following properties are initialized on first access to avoid unnecessary resource allocation:

Property Type Description
session_manager ACPSessionManager Manages session lifecycle and state
chat_adapter ChatServiceACPAdapter Bridges prompts to ChatService
capability_negotiator CapabilityNegotiator Negotiates client/server capabilities
thread_manager ACPThreadManager Manages conversation threads
turn_orchestrator TurnOrchestrator Orchestrates turn execution and streaming
diagnostics_engine DiagnosticsEngine Generates and publishes diagnostics
hover_provider HoverProvider Provides hover information from CPG
approval_bridge ACPApprovalBridge Manages file change approval flow

Protocol Types (Pydantic models defined in the same file):

Model Fields Purpose
JSONRPCRequest jsonrpc: str, method: str, params: Optional[Dict], id: Optional[Union[str, int]] JSON-RPC 2.0 request envelope
JSONRPCResponse jsonrpc: str, result: Optional[Any], error: Optional[Dict], id: Optional[Union[str, int]] JSON-RPC 2.0 response envelope
JSONRPCNotification jsonrpc: str, method: str, params: Optional[Dict] JSON-RPC 2.0 notification (no id field, no response expected)
InitializeParams clientInfo: ClientInfo, capabilities: ClientCapabilities, protocolVersion: str Parameters for the initialize method
InitializeResult serverInfo: AgentInfo, capabilities: AgentCapabilities, protocolVersion: str Result of the initialize method
SessionNewParams cwd: str, mcpServers: Optional[List[Dict]], userId: Optional[str] Parameters for session/new
SessionNewResult sessionId: str, projectId: Optional[str] Result of session/new
SessionPromptParams sessionId: str, contentBlocks: List[ContentBlock] Parameters for session/prompt
ContentBlock type: str, text: Optional[str], data: Optional[str], mimeType: Optional[str] Content block in a prompt (text, image, file)
StopReason Enum: end_turn, max_tokens, cancelled Reason why generation stopped
AgentCapabilities loadSession: bool, setMode: bool, promptCapabilities: PromptCapabilities, mcp: MCPCapabilities, authMethods: List[str] Server capability advertisement
PromptCapabilities image: bool, audio: bool, embeddedContext: bool Supported prompt content types
MCPCapabilities stdio: bool, http: bool, sse: bool MCP transport support flags
AgentInfo name: str, version: str, title: str Server identity information
ClientCapabilities fs: bool, terminal: bool, diagnostics: bool, hover: bool, approval: bool Client capability advertisement
ClientInfo name: str, version: str Client identity information
PlanEntry id: str, title: str, status: str, toolCallId: Optional[str] Entry in an execution plan
PlanUpdate entries: List[PlanEntry] Update to the execution plan
AgentMessageChunk text: str, stopReason: Optional[StopReason] Streaming text chunk
ToolCallUpdate toolCallId: str, toolName: str, status: str, result: Optional[Any] Tool call status update

Session Management

File: src/acp/server/session_manager.py

Purpose: Manages ACP session lifecycle including creation, loading, timeout, and cleanup. Each session represents an IDE connection with its own working directory, project context, and MCP server configuration.

from src.acp import ACPSessionManager

manager = ACPSessionManager(
    max_sessions=100,             # int
    session_timeout_minutes=60    # int
)

Constructor Parameters:

Parameter Type Default Description
max_sessions int 100 Maximum number of concurrent sessions
session_timeout_minutes int 60 Session timeout in minutes after last activity

Key Methods:

# Create a new session
session = await manager.create_session(
    cwd="/path/to/project",
    mcp_servers=[{"id": "...", "transport": "stdio", ...}],
    user_id="user-123"
)

# Load existing session
session = await manager.load_session(session_id="...")

# Get session by ID
session = manager.get_session(session_id="...")

# Cancel a session
await manager.cancel_session(session_id="...")

# Cleanup expired sessions
cleaned = await manager.cleanup_expired()

Method Reference:

Method Parameters Returns Async Description
create_session cwd: str, mcp_servers: Optional[List[Dict]] = None, user_id: Optional[str] = None ACPSession Yes Create a new session
load_session session_id: str ACPSession Yes Load an existing session
get_session session_id: str Optional[ACPSession] No Get session by ID (returns None if not found)
cancel_session session_id: str None Yes Cancel a running session
cleanup_expired (none) int Yes Remove expired sessions, returns count cleaned

Dataclass: ACPSession

Field Type Description
session_id str Unique session identifier (UUID)
cwd str Working directory for this session
user_id Optional[str] Authenticated user ID
project_id Optional[str] Associated CodeGraph project ID
codegraph_session_id Optional[str] Internal session ID for core services
mcp_servers List[Dict] List of MCP server configurations
created_at datetime Session creation timestamp
last_activity datetime Last activity timestamp (updated on each request)
cancelled bool Whether the session has been cancelled
context Dict Arbitrary session context data

Capability Negotiation

File: src/acp/server/capabilities.py

Purpose: Handles protocol capability negotiation between client and server during the initialize handshake. Determines which features both sides support and configures the protocol accordingly.

from src.acp.server.capabilities import CapabilityNegotiator

negotiator = CapabilityNegotiator()

Constructor Parameters: None.

Key Methods:

# Negotiate capabilities from initialize params
server_caps = negotiator.negotiate(init_params)
# Returns: AgentCapabilities with negotiated feature flags

# Check if a capability is supported
negotiator.supports("diagnostics")  # True/False
negotiator.supports("hover")        # True/False

# Check if a notification should be sent
negotiator.should_send_notification("diagnostics/publish")  # True/False

Method Reference:

Method Parameters Returns Async Description
negotiate init_params: InitializeParams AgentCapabilities No Negotiate capabilities and return server capabilities
supports capability: str bool No Check if a specific capability is supported
should_send_notification method: str bool No Check if a notification method should be sent based on negotiated capabilities and opt-out list

Dataclass: ClientCapabilitiesV2

Extended client capabilities with granular control:

Field Type Default Description
fs bool False Client supports filesystem operations
terminal bool False Client supports terminal operations
diagnostics bool False Client supports diagnostics display
hover bool False Client supports hover information
approval_flow bool False Client supports file change approval flow
opt_out_notification_methods List[str] [] List of notification methods the client does not want to receive

Thread and Turn Lifecycle

File: src/acp/server/thread_manager.py

Purpose: Manages conversation threads. Threads represent ongoing conversations that persist across multiple turns. They can be started, resumed, forked, listed, archived, and compacted.

from src.acp.server.thread_manager import ACPThreadManager

tm = ACPThreadManager()

Constructor Parameters: None.

Key Methods:

# Start a new thread
thread = await tm.start_thread(cwd="/path", name="analysis", config={})

# Resume an existing thread
thread = await tm.resume_thread(thread_id="...")

# Fork a thread at a specific turn
thread = await tm.fork_thread(thread_id="...", at_turn=5)

# List threads with filters
threads = await tm.list_threads(status="active", limit=20, offset=0)

# Archive a thread (preserves but marks inactive)
await tm.archive_thread(thread_id="...")

# Compact thread history to reduce memory
await tm.compact_thread(thread_id="...")

Method Reference:

Method Parameters Returns Async Description
start_thread cwd: str, name: Optional[str] = None, config: Optional[Dict] = None Dict Yes Start a new conversation thread
resume_thread thread_id: str Dict Yes Resume an existing thread
fork_thread thread_id: str, at_turn: Optional[int] = None Dict Yes Fork a thread, optionally at a specific turn number
list_threads status: Optional[str] = None, limit: int = 20, offset: int = 0 List[Dict] Yes List threads with optional status filter and pagination
archive_thread thread_id: str None Yes Archive a thread
compact_thread thread_id: str None Yes Compact thread history

File: src/acp/server/turn_orchestrator.py

Purpose: Orchestrates turn execution within threads. A turn represents a single user-assistant interaction. The orchestrator manages the lifecycle of a turn including background execution, streaming, and interruption.

from src.acp.server.turn_orchestrator import TurnOrchestrator

orchestrator = TurnOrchestrator()

Constructor Parameters: None.

Key Methods:

# Start a turn (launches background task for streaming)
result = await orchestrator.start_turn(
    thread_id="...", prompt="Analyze this code",
    context={"files": [...]},
    notification_router=router
)

# Interrupt a running turn
result = await orchestrator.interrupt_turn(thread_id="...", turn_id="...")

Method Reference:

Method Parameters Returns Async Description
start_turn thread_id: str, prompt: str, context: Optional[Dict] = None, notification_router: Optional[NotificationRouter] = None Dict Yes Start a new turn with optional streaming notifications
interrupt_turn thread_id: str, turn_id: str Dict Yes Interrupt a running turn

The start_turn method launches a background task that processes the prompt and sends streaming notifications (plan updates, message chunks, tool call updates) through the notification router. The method returns immediately with a turn ID, and progress is communicated through notifications.
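That fire-and-stream pattern can be sketched with plain asyncio. Everything here is illustrative: the notification method name, payload shape, and helper names are assumptions, not the orchestrator's actual internals.

```python
import asyncio
import uuid

# Sketch of the start_turn pattern: return a turn ID immediately and
# stream progress through a notification callback from a background task.
async def start_turn(prompt, notify):
    turn_id = str(uuid.uuid4())

    async def run():
        await notify("turn/update", {"turnId": turn_id, "chunk": f"Analyzing: {prompt}"})
        await notify("turn/update", {"turnId": turn_id, "stopReason": "end_turn"})

    asyncio.get_running_loop().create_task(run())  # do not await: fire and stream
    return {"turnId": turn_id}

async def main():
    events = []

    async def notify(method, params):
        events.append((method, params))

    result = await start_turn("explain main()", notify)
    await asyncio.sleep(0)        # let the background task run
    return result, events

result, events = asyncio.run(main())
```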


Diagnostics and Hover

File: src/acp/server/diagnostics.py

Purpose: Generates and publishes diagnostics for subscribed file URIs. Uses CPG data to identify issues like taint flows, high complexity, deprecated API usage, and dead code. Publishes diagnostics to subscribed clients via the notification router.

from src.acp.server.diagnostics import DiagnosticsEngine

engine = DiagnosticsEngine()

Constructor Parameters: None.

Key Methods:

# Subscribe to diagnostics for a file
engine.subscribe(uri="file:///path/to/file.py")

# Generate diagnostics for a file
diagnostics = await engine.generate_diagnostics(uri="file:///path/to/file.py")

# Publish diagnostics through notification router
await engine.publish_diagnostics(uri, notification_router=router)

# Unsubscribe from diagnostics
engine.unsubscribe(uri="file:///path/to/file.py")

Method Reference:

Method Parameters Returns Async Description
subscribe uri: str None No Subscribe to diagnostics for a file URI
generate_diagnostics uri: str List[Dict] Yes Generate diagnostics for a file
publish_diagnostics uri: str, notification_router: NotificationRouter None Yes Publish diagnostics to subscribed clients
unsubscribe uri: str None No Unsubscribe from diagnostics for a file URI

Diagnostic Categories:

Category Description Severity
taint Taint flow from source to sink Error
complexity Cyclomatic complexity above threshold Warning/Error
deprecated Usage of deprecated APIs Warning
todo TODO/FIXME comments Information
dead_code Unreachable or unused code Hint
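A published diagnostic might carry a payload like the one below. The field layout is an LSP-style assumption for illustration; only the category names and severities come from the table above.

```python
# Illustrative shape of a published diagnostic (LSP-style fields assumed).
diagnostic = {
    "uri": "file:///path/to/handler.py",
    "range": {"start": {"line": 42, "character": 0},
              "end": {"line": 42, "character": 15}},
    "severity": 1,                # 1=Error, 2=Warning, 3=Information, 4=Hint
    "category": "taint",
    "message": "Taint flow: request parameter reaches SQL execution "
               "without sanitization",
}
```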

File: src/acp/server/hover.py

Purpose: Provides hover information for code positions. When the user hovers over a symbol in their IDE, this provider queries the CPG to return callers, callees, complexity metrics, and taint status for the symbol at that position.

from src.acp.server.hover import HoverProvider

hover = HoverProvider()

Constructor Parameters: None.

Key Methods:

# Get hover information for a position
info = await hover.get_hover_info(
    uri="file:///path/to/file.py",
    line=42,
    character=10
)
# Returns: dict with callers, callees, complexity, taint status from CPG

Method Reference:

Method Parameters Returns Async Description
get_hover_info uri: str, line: int, character: int Dict Yes Get hover information from CPG for a symbol at the given position

Hover Result Structure:

{
    'symbol': 'process_request',
    'kind': 'method',
    'file': 'handler.py',
    'line': 42,
    'callers': ['main', 'dispatch'],         # Functions that call this symbol
    'callees': ['validate', 'execute'],      # Functions called by this symbol
    'complexity': 8,                          # Cyclomatic complexity
    'taint_status': 'clean',                 # clean | tainted | sanitized
    'documentation': '...'                   # Extracted doc comment if available
}

Approval Bridge

File: src/acp/server/approval_bridge.py

Purpose: Manages the file change approval flow. When an agent wants to modify files, it requests approval from the IDE client through this bridge. The client can approve or reject the change, with optional timeout handling.

from src.acp.server.approval_bridge import ACPApprovalBridge

bridge = ACPApprovalBridge()

Constructor Parameters: None.

Key Methods:

# Request approval for a file change
result = await bridge.request_file_change_approval(
    thread_id="...", turn_id="...", item_id="...",
    diff="...", files=["file.py"], reason="Refactoring",
    confidence=0.9, notification_router=router, timeout=30.0
)
# Returns: dict with approval decision

# Resolve a pending approval (called when client responds)
bridge.resolve_approval(request_id="...", decision={"approved": True})

# Check pending approvals
bridge.pending_count  # Number of pending approvals

Method Reference:

Method Parameters Returns Async Description
request_file_change_approval thread_id, turn_id, item_id, diff, files, reason, confidence, notification_router, timeout=30.0 Dict Yes Request approval from IDE client
resolve_approval request_id: str, decision: Dict None No Resolve a pending approval with client’s decision

Properties:

Property Type Description
pending_count int Number of pending approval requests
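The request/resolve pairing with a timeout is a classic asyncio Future pattern, sketched below. Method names are shortened for illustration and the timeout default mirrors the reference; the real bridge's internals may differ.

```python
import asyncio

# Sketch of the approval flow: the request parks a Future, the client's
# response resolves it, and a timeout rejects the change by default.
class ApprovalBridge:
    def __init__(self):
        self._pending = {}

    async def request(self, request_id, timeout=30.0):
        fut = asyncio.get_running_loop().create_future()
        self._pending[request_id] = fut
        try:
            return await asyncio.wait_for(fut, timeout)
        except asyncio.TimeoutError:
            return {"approved": False, "reason": "timeout"}
        finally:
            self._pending.pop(request_id, None)

    def resolve(self, request_id, decision):
        fut = self._pending.get(request_id)
        if fut and not fut.done():
            fut.set_result(decision)

async def demo():
    bridge = ApprovalBridge()
    task = asyncio.create_task(bridge.request("req-1", timeout=1.0))
    await asyncio.sleep(0)                 # let the request register
    bridge.resolve("req-1", {"approved": True})
    return await task

decision = asyncio.run(demo())
```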

Notification Router

File: src/acp/server/notification_router.py

Purpose: Routes notifications from server components to the IDE client. Respects capability negotiation to avoid sending notifications the client does not support or has opted out of.

from src.acp.server.notification_router import NotificationRouter

router = NotificationRouter(
    callback=notification_callback,
    negotiator=capability_negotiator
)

Constructor Parameters:

Parameter Type Default Description
callback Callable Required Async callback function that sends notifications to the client
negotiator CapabilityNegotiator Required Capability negotiator for checking what notifications the client supports

Key Methods:

# Send a notification
sent = await router.send("session/update", {"sessionUpdate": "plan", ...})
# Returns: bool indicating whether the notification was sent

Method Reference:

Method Parameters Returns Async Description
send method: str, params: Dict bool Yes Send a notification if the client supports it. Returns True if sent, False if suppressed

File and Terminal Handlers

File: src/acp/server/handlers.py

Purpose: Module-level async functions for filesystem and terminal operations. These are not class methods but standalone async functions called by the CodeGraphACPAgent.

Filesystem Handlers:

from src.acp.server.handlers import handle_fs_read, handle_fs_write

# Read a text file
result = await handle_fs_read(
    {"path": "/path/to/file.py"},
    client_capabilities
)
# Returns: {"content": "file contents...", "encoding": "utf-8"}

# Write to a text file
result = await handle_fs_write(
    {"path": "/path/to/file.py", "content": "new content..."},
    client_capabilities
)
# Returns: {"success": True}

Terminal Handlers:

from src.acp.server.handlers import (
    handle_terminal_create,
    handle_terminal_output,
    handle_terminal_wait,
    handle_terminal_kill,
    handle_terminal_release
)

# Create a terminal subprocess
result = await handle_terminal_create(
    {"command": "python", "args": ["-m", "pytest"], "cwd": "/project"},
    client_capabilities
)
# Returns: {"terminalId": "..."}

# Read terminal output
result = await handle_terminal_output(
    {"terminalId": "..."},
    client_capabilities
)
# Returns: {"output": "...", "isComplete": False}

# Wait for terminal to exit
result = await handle_terminal_wait(
    {"terminalId": "...", "timeout": 30},
    client_capabilities
)
# Returns: {"exitCode": 0}

# Kill terminal process
result = await handle_terminal_kill(
    {"terminalId": "..."},
    client_capabilities
)
# Returns: {"success": True}

# Release terminal resources
result = await handle_terminal_release(
    {"terminalId": "..."},
    client_capabilities
)
# Returns: {"success": True}

Function Reference:

Function Parameters Returns Async Description
handle_fs_read params: Dict, capabilities Dict Yes Read a text file
handle_fs_write params: Dict, capabilities Dict Yes Write content to a text file
handle_terminal_create params: Dict, capabilities Dict Yes Create a terminal subprocess
handle_terminal_output params: Dict, capabilities Dict Yes Read terminal output
handle_terminal_wait params: Dict, capabilities Dict Yes Wait for terminal exit
handle_terminal_kill params: Dict, capabilities Dict Yes Kill terminal process
handle_terminal_release params: Dict, capabilities Dict Yes Release terminal resources

Transport Layer

File: src/acp/transport/base.py

Purpose: Abstract base class for ACP transports. Defines the interface that all transport implementations must follow.

from src.acp.transport.base import BaseTransport  # ABC

class BaseTransport(ABC):
    def __init__(self, agent: Any): ...

    @property
    def is_running(self) -> bool: ...

    async def start(self) -> None: ...      # abstract
    async def stop(self) -> None: ...       # abstract
    async def send(self, message: Any) -> None: ...  # abstract
    async def handle_message(self, message, notification_callback=None, user_id=None) -> Optional[Any]: ...

Constructor Parameters:

Parameter Type Default Description
agent Any Required The CodeGraphACPAgent instance to dispatch messages to

Abstract Methods:

Method Parameters Returns Async Description
start (none) None Yes Start the transport (begin listening)
stop (none) None Yes Stop the transport (cleanup)
send message: Any None Yes Send a message to the client

Concrete Method:

Method Parameters Returns Async Description
handle_message message, notification_callback=None, user_id=None Optional[Any] Yes Parse message and dispatch to agent

Properties:

Property Type Description
is_running bool Whether the transport is currently running

Implementations:

Transport File Purpose Entry Point
StdioTransport src/acp/transport/stdio.py stdin/stdout JSON-RPC communication run_stdio()
HTTPTransport src/acp/transport/http.py HTTP REST endpoint for JSON-RPC get_http_transport()
WebSocketTransport src/acp/transport/websocket.py WebSocket bidirectional communication Instantiated directly

StdioTransport (src/acp/transport/stdio.py):

Reads JSON-RPC messages from stdin and writes responses to stdout. Used for local IDE integration where the IDE spawns the ACP process.

from src.acp.transport.stdio import StdioTransport, run_stdio

transport = StdioTransport(agent=agent)
await transport.start()  # Blocks, reading from stdin

# Or use the convenience function
await run_stdio()  # Creates agent and transport, runs until EOF
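Stdio transports of this kind typically frame each JSON-RPC message as one line of UTF-8 JSON. The exact framing StdioTransport uses is an assumption; this sketch only illustrates the common newline-delimited pattern:

```python
import json

# Illustrative newline-delimited JSON-RPC framing, as commonly used by
# stdio transports. The actual StdioTransport framing is an assumption.
def encode_message(message: dict) -> bytes:
    """Serialize a JSON-RPC message as one line of UTF-8 JSON."""
    return (json.dumps(message) + "\n").encode("utf-8")

def decode_line(line: bytes) -> dict:
    """Parse one line read from stdin back into a JSON-RPC message."""
    return json.loads(line.decode("utf-8"))

request = {"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {}}
wire = encode_message(request)
assert decode_line(wire) == request  # lossless round trip
```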

HTTPTransport (src/acp/transport/http.py):

Exposes a JSON-RPC endpoint over HTTP. Used for remote IDE connections or web-based clients.

from src.acp.transport.http import HTTPTransport, get_http_transport

transport = HTTPTransport(agent=agent, host="0.0.0.0", port=8080)
await transport.start()

# Or use the factory function
transport = get_http_transport(agent=agent)
await transport.start()

WebSocketTransport (src/acp/transport/websocket.py):

Provides bidirectional WebSocket communication. Supports server-initiated notifications and concurrent message handling.

from src.acp.transport.websocket import WebSocketTransport

transport = WebSocketTransport(agent=agent)

WebSocketManager (same file):

Manages multiple WebSocket connections and supports broadcasting to specific users.

from src.acp.transport.websocket import get_websocket_manager

manager = get_websocket_manager()  # Singleton

# Add a connection
await manager.add_connection(user_id="user-123", transport=ws_transport)

# Broadcast to all connections of a user
count = await manager.broadcast_to_user(user_id="user-123", message={"method": "update", ...})

# Get connection count
total = manager.get_connection_count()              # All connections
user_count = manager.get_connection_count(user_id="user-123")  # Specific user

WebSocketManager Method Reference:

Method Parameters Returns Async Description
add_connection user_id: str, transport: WebSocketTransport None Yes Register a WebSocket connection for a user
remove_connection user_id: str, transport: WebSocketTransport None Yes Remove a WebSocket connection
broadcast_to_user user_id: str, message: Any int Yes Send message to all connections of a user, returns count sent
get_connection_count user_id: Optional[str] = None int No Get total or per-user connection count

Integration Adapters

ChatServiceACPAdapter

File: src/acp/integration/chat_adapter.py

Purpose: Bridges ACP prompts to the ChatService and WorkflowACPAdapter. Receives content blocks from ACP sessions, processes them through the chat pipeline, and sends streaming notifications back to the client (plan updates, tool call updates, message chunks).

from src.acp.integration.chat_adapter import ChatServiceACPAdapter

adapter = ChatServiceACPAdapter()

Constructor Parameters: None.

Key Methods:

# Process a prompt from an ACP session
answer = await adapter.process_prompt(
    content_blocks=[{"type": "text", "text": "Analyze code..."}],
    session=acp_session,
    notification_callback=callback,
    user_id="user-123"
)
# Returns: string answer

Method Reference:

Method Parameters Returns Async Description
process_prompt content_blocks: List[Dict], session: ACPSession, notification_callback: Optional[Callable] = None, user_id: Optional[str] = None str Yes Process ACP prompt and return answer with streaming notifications

The adapter sends the following notification types during processing:

- Plan updates: session/update with sessionUpdate: "plan" containing execution plan entries
- Tool call updates: session/update with sessionUpdate: "tool_call" containing tool execution status
- Message chunks: session/update with sessionUpdate: "message" containing streaming text
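For illustration, a plan notification might look like this on the wire. The sessionUpdate value follows the list above; the sessionId and entry fields are hypothetical, since the exact payload schema is not shown here:

```python
import json

# Hypothetical session/update notification carrying a plan. Only the
# method name and sessionUpdate value come from the documentation above;
# the entry fields are illustrative.
notification = {
    "jsonrpc": "2.0",
    "method": "session/update",
    "params": {
        "sessionId": "sess-1",
        "update": {
            "sessionUpdate": "plan",
            "entries": [
                {"content": "Search the codebase", "status": "in_progress"},
                {"content": "Summarize findings", "status": "pending"},
            ],
        },
    },
}
wire = json.dumps(notification)  # serialized form sent to the client
```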


SessionACPAdapter

File: src/acp/integration/session_adapter.py

Purpose: Bridges ACP session operations to CodeGraph’s internal session management. Handles session creation, loading, context retrieval, and dialogue history.

from src.acp.integration.session_adapter import SessionACPAdapter

adapter = SessionACPAdapter()

Constructor Parameters: None.

Key Methods:

# Create a session
session_id = await adapter.create_session(user_id="user-123", metadata={"ide": "vscode"})

# Get a session
session = await adapter.get_session(session_id="...", user_id="user-123")

# Get session context (recent interactions)
context = await adapter.get_context(session_id="...", user_id="user-123", limit=10)

# Add a dialogue turn
await adapter.add_dialogue_turn(
    session_id="...", user_id="user-123",
    role="user", content="Hello"
)

# Get dialogue history
history = await adapter.get_dialogue_history(session_id="...", user_id="user-123")

Method Reference:

Method Parameters Returns Async Description
create_session user_id: str, metadata: Optional[Dict] = None str Yes Create a new session, returns session ID
get_session session_id: str, user_id: str Dict Yes Get session details
get_context session_id: str, user_id: str, limit: int = 10 Dict Yes Get recent session context
add_dialogue_turn session_id: str, user_id: str, role: str, content: str None Yes Add a dialogue turn to session history
get_dialogue_history session_id: str, user_id: str List[Dict] Yes Get full dialogue history

WorkflowACPAdapter

File: src/acp/integration/workflow_adapter.py

Purpose: Bridges ACP to the MultiScenarioCopilot workflow engine. Maps ACP tool calls to scenario executions, manages execution plans, and sends progress notifications.

from src.acp.integration.workflow_adapter import WorkflowACPAdapter

adapter = WorkflowACPAdapter()

Constructor Parameters: None.

Key Methods:

# Get tool kind for a scenario
kind = adapter.get_tool_kind("scenario_1")   # "search", "execute", "analyze", etc.

# Get human-readable tool title
title = adapter.get_tool_title("scenario_1") # e.g., "Code Search"

# Create plan entries for a scenario execution
plan = adapter.create_plan_entries("scenario_1")
# Returns: list of PlanEntry dicts

# Execute workflow with streaming updates
result = await adapter.execute_with_updates(
    query="...", context={}, callback=notification_callback, tool_call_id="tc-1"
)
# Returns: execution result dict

Method Reference:

Method Parameters Returns Async Description
get_tool_kind scenario_id: str str No Get tool kind classification for a scenario
get_tool_title scenario_id: str str No Get human-readable title for a scenario
create_plan_entries scenario_id: str List[Dict] No Create execution plan entries
execute_with_updates query: str, context: Dict, callback: Optional[Callable], tool_call_id: str Dict Yes Execute workflow with streaming progress updates

MCPBridge

File: src/acp/integration/mcp_bridge.py

Purpose: Bridges ACP to external MCP (Model Context Protocol) servers. Manages connections to multiple MCP servers, aggregates their tools and resources, and proxies tool calls and resource reads.

from src.acp.integration.mcp_bridge import MCPBridge

bridge = MCPBridge()

Constructor Parameters: None.

Key Methods:

# Connect to MCP servers
connected = await bridge.connect_servers([
    {"id": "server1", "name": "My MCP", "transport": "stdio",
     "command": "node", "args": ["server.js"]},
    {"id": "server2", "name": "Remote MCP", "transport": "http",
     "url": "http://localhost:3000"}
])
# Returns: number of successfully connected servers

# Get all available tools across all servers
tools = bridge.get_all_tools()        # List[MCPTool]

# Get all available resources across all servers
resources = bridge.get_all_resources() # List[MCPResource]

# Call a tool on a specific server
result = await bridge.call_tool("server1", "tool_name", {"arg": "value"})

# Read a resource from a specific server
content = await bridge.read_resource("server1", "resource://uri")

# Disconnect all servers
await bridge.disconnect_all()

Method Reference:

Method Parameters Returns Async Description
connect_servers servers: List[Dict] int Yes Connect to MCP servers, returns count of successful connections
get_all_tools List[MCPTool] No Get all tools from all connected servers
get_all_resources List[MCPResource] No Get all resources from all connected servers
call_tool server_id: str, tool_name: str, arguments: Dict Any Yes Call a tool on a specific server
read_resource server_id: str, uri: str Any Yes Read a resource from a specific server
disconnect_all None Yes Disconnect all MCP servers

Dataclasses:

MCPServer:

Field Type Description
id str Unique server identifier
name str Human-readable server name
transport str Transport type: "stdio", "http", or "ws"
command Optional[str] Command to spawn (for stdio transport)
args Optional[List[str]] Command arguments (for stdio transport)
url Optional[str] Server URL (for http/ws transport)
headers Optional[Dict[str, str]] HTTP headers (for http transport)
env Optional[Dict[str, str]] Environment variables (for stdio transport)

MCPTool:

Field Type Description
name str Tool name
description str Tool description
input_schema Dict JSON Schema for tool input parameters
server_id str ID of the server providing this tool

MCPResource:

Field Type Description
uri str Resource URI
name str Resource name
description str Resource description
mime_type Optional[str] MIME type of the resource content
server_id str ID of the server providing this resource

MCPConnection:

Field Type Description
server MCPServer Server configuration

Manages the connection lifecycle (connect, disconnect, health check) for a single MCP server.


ACP CLI

File: src/acp/cli.py

Purpose: Command-line entry point for the ACP server. Parses arguments and starts the appropriate transport.

from src.acp.cli import setup_logging, main, run

# Functions
setup_logging()     # Configure logging for ACP server
await main()        # Async entry point — parses args and starts transport
run()               # Sync wrapper — calls asyncio.run(main())

Function Reference:

Function Parameters Returns Async Description
setup_logging None No Configure logging for ACP server with appropriate formatters and handlers
main None Yes Async entry point: parse CLI arguments, create agent, start transport
run None No Sync wrapper that calls asyncio.run(main())

CLI Arguments:

Argument Type Default Description
--transport str "stdio" Transport type: stdio, http, or ws
--host str "0.0.0.0" Host to bind (HTTP/WS only)
--port int 8080 Port to bind (HTTP/WS only)
--log-level str "info" Logging level: debug, info, warning, error

Domain-Specific Agents

Security Agents

File: src/security/security_agents.py

Purpose: Four specialized agents for security vulnerability detection, data flow analysis, reporting, and remediation advice. These agents use CPG data to identify security issues and produce structured findings.

SecurityScanner

The primary entry point for security analysis. Scans CPG data for known vulnerability patterns.

from src.security.security_agents import SecurityScanner

scanner = SecurityScanner(cpg_service)  # cpg_service: Optional[CPGQueryService] = None

# Context manager support for automatic resource cleanup
with SecurityScanner(cpg_service) as scanner:
    findings = scanner.scan_all_patterns(limit_per_pattern=50)
# Returns: List[SecurityFinding]

Constructor Parameters:

Parameter Type Default Description
cpg_service Optional[CPGQueryService] None CPG query service for graph access

Key Methods:

Method Parameters Returns Async Description
scan_all_patterns limit_per_pattern: int = 50 List[SecurityFinding] No Scan all known security patterns
scan_pattern pattern: str, limit: int = 50 List[SecurityFinding] No Scan a specific pattern

Patterns detected:

- SQL Injection (CWE-89)
- Buffer Overflow (CWE-120)
- Command Injection (CWE-78)
- Format String (CWE-134)
- Path Traversal (CWE-22)
- Use After Free (CWE-416)
- Integer Overflow (CWE-190)
- Cross-Site Scripting (CWE-79)
- Additional patterns from SECURITY_PATTERNS

DataFlowAnalyzer

Traces data flows from taint sources to sinks through the CPG.

from src.security.security_agents import DataFlowAnalyzer

analyzer = DataFlowAnalyzer(cpg_service)  # cpg_service: Optional[CPGQueryService] = None

paths = analyzer.trace_taint_flows(limit=100)
# Returns: List[DataFlowPath]

Constructor Parameters:

Parameter Type Default Description
cpg_service Optional[CPGQueryService] None CPG query service for graph access

Key Methods:

Method Parameters Returns Async Description
trace_taint_flows limit: int = 100 List[DataFlowPath] No Trace all taint flows from sources to sinks
trace_flow source: str, sink: str Optional[DataFlowPath] No Trace a specific source-to-sink flow

VulnerabilityReporter

Generates structured vulnerability reports from findings and data flow paths.

from src.security.security_agents import VulnerabilityReporter

reporter = VulnerabilityReporter()

report = reporter.generate_report(findings, data_flow_paths)
# Returns: VulnerabilityReport

Constructor Parameters: None.

Key Methods:

Method Parameters Returns Async Description
generate_report findings: List[SecurityFinding], data_flows: List[DataFlowPath] VulnerabilityReport No Generate a structured vulnerability report

RemediationAdvisor

Suggests fixes for identified vulnerabilities.

from src.security.security_agents import RemediationAdvisor

advisor = RemediationAdvisor()

advice = advisor.get_remediation_advice(finding)
# Returns: RemediationAdvice

plan = advisor.get_bulk_remediation_plan(findings)
# Returns: List[RemediationAdvice]

Constructor Parameters: None.

Key Methods:

Method Parameters Returns Async Description
get_remediation_advice finding: SecurityFinding RemediationAdvice No Get remediation advice for a specific finding
get_bulk_remediation_plan findings: List[SecurityFinding] List[RemediationAdvice] No Get advice for multiple findings

Data Models:

SecurityFinding:

Field Type Description
finding_id str Unique identifier for the finding
pattern_name str Name of the security pattern matched
category str Category (e.g., injection, buffer, crypto)
severity str Severity level: critical, high, medium, low
method_name str Name of the affected method
filename str Source file containing the vulnerability
cwe_ids List[str] Associated CWE identifiers
confidence float Detection confidence (0.0 to 1.0)

DataFlowPath:

Field Type Description
source_method str Taint source method
sink_method str Taint sink method
path_length int Number of hops in the flow
taint_type str Type of taint (e.g., user_input, file_read)
sanitized bool Whether the flow passes through a sanitizer

VulnerabilityReport:

Field Type Description
total_findings int Total number of findings
critical_count int Number of critical findings
high_count int Number of high findings
medium_count int Number of medium findings
low_count int Number of low findings
findings List[SecurityFinding] All findings
data_flows List[DataFlowPath] All data flow paths
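The per-severity counts in VulnerabilityReport can be derived directly from the findings list. A minimal sketch, using a stand-in type rather than the real SecurityFinding class:

```python
from collections import Counter
from dataclasses import dataclass

# Stand-in for SecurityFinding with only the field this sketch needs.
@dataclass
class Finding:
    severity: str  # "critical" | "high" | "medium" | "low"

def severity_counts(findings: list[Finding]) -> dict[str, int]:
    """Count findings per severity level, as the report fields suggest."""
    counts = Counter(f.severity for f in findings)
    return {level: counts.get(level, 0)
            for level in ("critical", "high", "medium", "low")}

findings = [Finding("high"), Finding("critical"), Finding("high")]
# severity_counts(findings) -> {"critical": 1, "high": 2, "medium": 0, "low": 0}
```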

RemediationAdvice:

Field Type Description
remediation_steps List[str] Ordered steps to fix the vulnerability
code_example str Example code showing the fix
references List[str] External references (CVEs, documentation)
estimated_effort str Effort estimate: low, medium, high
priority int Remediation priority (1 = highest)

Performance Agents

Package: src/performance/agents/ (facade: src/performance/performance_agents.py)

Purpose: Three agents for performance bottleneck detection, resource analysis, and optimization recommendations.

PerformanceProfiler

File: src/performance/agents/profiler.py

The primary entry point for performance analysis. Scans CPG data for performance bottleneck patterns.

from src.performance.agents import PerformanceProfiler

profiler = PerformanceProfiler(cpg_service)  # cpg_service: Optional[CPGQueryService] = None

# Context manager support
with PerformanceProfiler(cpg_service) as profiler:
    bottlenecks = profiler.profile_all_bottlenecks(limit_per_pattern=30)
    # Returns: List[BottleneckFinding]

    pattern_findings = profiler.profile_pattern(pattern, limit=30)
    # Returns: List[BottleneckFinding]

Constructor Parameters:

Parameter Type Default Description
cpg_service Optional[CPGQueryService] None CPG query service for graph access

Key Methods:

Method Parameters Returns Async Description
profile_all_bottlenecks limit_per_pattern: int = 30 List[BottleneckFinding] No Profile all known bottleneck patterns
profile_pattern pattern: str, limit: int = 30 List[BottleneckFinding] No Profile a specific bottleneck pattern

ResourceAnalyzer

File: src/performance/agents/resource_analyzer.py

Analyzes resource usage of individual methods.

from src.performance.agents import ResourceAnalyzer

analyzer = ResourceAnalyzer(cpg_service)  # cpg_service: Optional[CPGQueryService] = None

# Context manager support
with ResourceAnalyzer(cpg_service) as analyzer:
    usage = analyzer.analyze_method_resources("method_name", filename="file.py")
# Returns: ResourceUsage

Constructor Parameters:

Parameter Type Default Description
cpg_service Optional[CPGQueryService] None CPG query service for graph access

Key Methods:

Method Parameters Returns Async Description
analyze_method_resources method_name: str, filename: Optional[str] = None ResourceUsage No Analyze resource usage of a specific method
analyze_all_resources limit: int = 50 List[ResourceUsage] No Analyze resource usage for top methods

OptimizationAdvisor

File: src/performance/agents/optimizer.py

Creates optimization plans and generates performance reports from profiling results.

from src.performance.agents import OptimizationAdvisor

advisor = OptimizationAdvisor()

plan = advisor.create_optimization_plan(findings, resource_analyses)
# Returns: List[OptimizationRecommendation]

report = advisor.generate_report(findings, resource_analyses)
# Returns: PerformanceReport

Constructor Parameters: None.

Key Methods:

Method Parameters Returns Async Description
create_optimization_plan findings: List[BottleneckFinding], resource_analyses: List[ResourceUsage] List[OptimizationRecommendation] No Create an optimization plan
generate_report findings: List[BottleneckFinding], resource_analyses: List[ResourceUsage] PerformanceReport No Generate a performance report

Data Models:

BottleneckFinding:

Field Type Description
pattern_name str Name of the bottleneck pattern
category str Category (e.g., cpu, memory, io, concurrency)
severity str Severity: critical, high, medium, low
method_name str Affected method name
optimization_technique str Suggested optimization technique
potential_speedup str Estimated speedup (e.g., "2x", "10%")

ResourceUsage:

Field Type Description
method_name str Method being analyzed
complexity_score float Computed complexity score
call_count int Number of calls to this method
estimated_memory_impact str Memory impact estimate: low, medium, high
resource_intensity str Overall resource intensity: light, moderate, heavy

OptimizationRecommendation:

Field Type Description
optimization_steps List[str] Ordered optimization steps
code_example str Example code showing the optimization
estimated_speedup str Estimated speedup from this optimization
priority int Recommendation priority (1 = highest)
risk_level str Risk of regression: low, medium, high

PerformanceReport:

Field Type Description
total_bottlenecks int Total number of bottlenecks found
by_severity Dict[str, int] Counts by severity level
findings List[BottleneckFinding] All findings
recommendations List[OptimizationRecommendation] All recommendations
summary str Human-readable summary

ProfilingResult:

Field Type Description
function_name str Name of the profiled function
total_calls int Total number of calls observed
total_time float Total execution time in seconds
bottleneck_score float Computed bottleneck score (0.0 to 1.0)

MemoryProfilingResult:

Field Type Description
function_name str Name of the profiled function
memory_usage_mb float Memory usage in megabytes
allocations int Number of memory allocations
allocation_rate float Allocations per second

PerformanceBaseline:

Field Type Description
method_name str Method name
execution_time_ms float Baseline execution time in milliseconds
memory_usage_mb float Baseline memory usage in megabytes

PerformanceTrend:

Field Type Description
baseline PerformanceBaseline Reference baseline
current PerformanceBaseline Current measurement
time_delta_percent float Percentage change in execution time
regression_detected bool Whether a regression was detected
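The trend fields above suggest a simple computation: compare current against baseline and flag a regression past some threshold. A sketch with stand-in types, where the 10% threshold is an illustrative assumption rather than CodeGraph's actual value:

```python
from dataclasses import dataclass

# Minimal stand-in for PerformanceBaseline with only the timing field.
@dataclass
class Baseline:
    method_name: str
    execution_time_ms: float

def time_delta_percent(baseline: Baseline, current: Baseline) -> float:
    """Percentage change in execution time relative to the baseline."""
    return ((current.execution_time_ms - baseline.execution_time_ms)
            / baseline.execution_time_ms * 100.0)

def regression_detected(baseline: Baseline, current: Baseline,
                        threshold_percent: float = 10.0) -> bool:
    """Flag a regression when the slowdown exceeds the threshold."""
    return time_delta_percent(baseline, current) > threshold_percent

base = Baseline("parse", 100.0)
cur = Baseline("parse", 125.0)
# time_delta_percent(base, cur) -> 25.0, so regression_detected -> True
```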

Utility Function:

from src.performance.agents import run_complete_performance_analysis

report = run_complete_performance_analysis(cpg_service, limit=30)
# Returns: PerformanceReport with findings and recommendations

This convenience function runs the full performance analysis pipeline: profiling, resource analysis, optimization planning, and report generation.


Architecture Agents

Package: src/architecture/agents/ (facade: src/architecture/architecture_agents.py)

Purpose: Three agents for dependency analysis, layer validation, and architecture reporting. These agents detect architectural violations like circular dependencies, god modules, and layer boundary violations.

DependencyAnalyzer

Analyzes module dependencies and detects architectural violations.

from src.architecture.agents import DependencyAnalyzer

analyzer = DependencyAnalyzer(cpg_service)

violations = analyzer.detect_all_violations(limit_per_pattern=20)
# Returns: List[ViolationFinding]

analysis = analyzer.calculate_dependency_metrics(violations)
# Returns: DependencyAnalysis

Constructor Parameters:

Parameter Type Default Description
cpg_service CPGQueryService Required CPG query service for graph access

Key Methods:

Method Parameters Returns Async Description
detect_all_violations limit_per_pattern: int = 20 List[ViolationFinding] No Detect all architectural violations
detect_circular_dependencies limit: int = 20 List[ViolationFinding] No Detect circular dependency violations
calculate_dependency_metrics violations: List[ViolationFinding] DependencyAnalysis No Calculate dependency metrics from violations

LayerValidator

Validates that module dependencies respect layer boundaries.

from src.architecture.agents import LayerValidator

validator = LayerValidator(
    cpg_service,
    layer_hierarchy=None,  # Optional[Dict[str, int]]: custom layer ordering
)

violations = validator.validate_all_layers(limit=20)
# Returns: List[ViolationFinding]

Constructor Parameters:

Parameter Type Default Description
cpg_service CPGQueryService Required CPG query service for graph access
layer_hierarchy Optional[Dict[str, int]] None Custom layer ordering. Keys are layer names, values are numeric levels (higher = more abstract). When None, uses a default hierarchy
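To make the hierarchy semantics concrete, here is one way a boundary check over such a mapping could work. Both the layer names and the dependency direction (higher layers may depend on lower ones, not vice versa) are assumptions for illustration, not CodeGraph's actual rule set:

```python
# Maps layer name to a numeric level (higher = more abstract, as in
# LayerValidator's layer_hierarchy). Names here are hypothetical.
DEFAULT_HIERARCHY = {"api": 3, "service": 2, "data": 1}

def is_allowed(from_layer: str, to_layer: str,
               hierarchy: dict[str, int]) -> bool:
    """Allow dependencies that point downward (or sideways) in the
    hierarchy; flag upward dependencies as violations. The direction of
    this rule is an illustrative assumption."""
    return hierarchy[from_layer] >= hierarchy[to_layer]

# is_allowed("api", "data", DEFAULT_HIERARCHY) -> True
# is_allowed("data", "api", DEFAULT_HIERARCHY) -> False (upward dependency)
```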

Key Methods:

Method Parameters Returns Async Description
validate_all_layers limit: int = 20 List[ViolationFinding] No Validate all layer boundary rules
validate_layer layer_name: str, limit: int = 20 List[ViolationFinding] No Validate a specific layer

ArchitectureReporter

Generates architecture reports and remediation plans from findings and metrics.

from src.architecture.agents import ArchitectureReporter

reporter = ArchitectureReporter()

report = reporter.generate_report(findings, dependency_analysis, layer_metrics)
# Returns: ArchitectureReport

plan = reporter.create_remediation_plan(findings)
# Returns: List[RemediationAction]

Constructor Parameters: None.

Key Methods:

Method Parameters Returns Async Description
generate_report findings: List[ViolationFinding], dependency_analysis: DependencyAnalysis, layer_metrics: Optional[Dict] = None ArchitectureReport No Generate an architecture report
create_remediation_plan findings: List[ViolationFinding] List[RemediationAction] No Create a remediation plan for violations

Data Models:

ViolationFinding:

Field Type Description
pattern_name str Name of the violation pattern
category str Category (e.g., circular_dependency, layer_violation, god_module)
severity str Severity: critical, high, medium, low
module_a str First module involved
module_b str Second module involved
violation_details str Human-readable description of the violation

DependencyMetrics:

Field Type Description
module_name str Module being analyzed
fan_in int Number of modules that depend on this module
fan_out int Number of modules this module depends on
instability float Instability metric: fan_out / (fan_in + fan_out) (0.0 = stable, 1.0 = unstable)
coupling_score float Overall coupling score
is_god_module bool Whether this module has excessive responsibilities
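The instability formula given for DependencyMetrics can be computed directly from fan-in and fan-out; only the zero-dependency edge case below is an added assumption:

```python
def instability(fan_in: int, fan_out: int) -> float:
    """Instability = fan_out / (fan_in + fan_out); 0.0 = stable, 1.0 = unstable."""
    total = fan_in + fan_out
    if total == 0:
        return 0.0  # isolated module: treating it as stable is an assumption
    return fan_out / total

# instability(fan_in=8, fan_out=2) -> 0.2 (mostly depended upon, stable)
# instability(fan_in=1, fan_out=9) -> 0.9 (mostly depends on others, unstable)
```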

DependencyAnalysis:

Field Type Description
total_modules int Total number of modules analyzed
circular_dependency_count int Number of circular dependencies detected
god_module_count int Number of god modules detected
module_metrics List[DependencyMetrics] Per-module metrics

LayerRule:

Field Type Description
from_layer str Source layer name
to_layer str Target layer name
allowed bool Whether this dependency direction is allowed
description str Human-readable rule description

RemediationAction:

Field Type Description
title str Action title
steps List[str] Ordered remediation steps
priority int Action priority (1 = highest)
estimated_effort str Effort estimate: low, medium, high
risk_level str Risk of the remediation: low, medium, high

ArchitectureReport:

Field Type Description
total_violations int Total number of violations found
findings List[ViolationFinding] All violation findings
dependency_analysis DependencyAnalysis Dependency metrics and analysis
remediation_actions List[RemediationAction] Suggested remediation actions
summary str Human-readable summary

Configuration Reference

AgentConfidenceConfig

File: src/config/unified_config.py

The AgentConfidenceConfig section of the unified configuration controls confidence calculation parameters used by the AnalyzerAgent and ConfidenceCalculator.

from src.config import get_unified_config

cfg = get_unified_config()

cfg.agent_confidence.query_mode_base           # 0.5 — Base confidence for query mode classification
cfg.agent_confidence.query_mode_diff_factor    # 0.1 — Factor for confidence differential
cfg.agent_confidence.query_mode_score_factor   # 0.05 — Factor for raw score contribution
cfg.agent_confidence.pattern_match_confidence  # 0.75 — Confidence when a known pattern is matched
cfg.agent_confidence.traversal_confidence      # 0.65 — Confidence for graph traversal results

Configuration Fields:

Field Type Default Description
query_mode_base float 0.5 Base confidence score for query mode classification
query_mode_diff_factor float 0.1 Multiplier for the difference between semantic and structural scores
query_mode_score_factor float 0.05 Multiplier for the raw classification score
pattern_match_confidence float 0.75 Default confidence when a known pattern is matched by the PatternConfidenceStrategy
traversal_confidence float 0.65 Default confidence for results obtained via graph traversal
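Based on the field descriptions alone, these parameters might combine along the following lines. The actual formula in the ConfidenceCalculator is not shown in this document, so this sketch is an assumption:

```python
def query_mode_confidence(
    semantic_score: float,
    structural_score: float,
    base: float = 0.5,
    diff_factor: float = 0.1,
    score_factor: float = 0.05,
) -> float:
    """Illustrative combination of the config fields above: start from the
    base, reward a clear gap between the two mode scores, and add a small
    contribution from the winning raw score. Assumed, not the real formula."""
    winning = max(semantic_score, structural_score)
    diff = abs(semantic_score - structural_score)
    confidence = base + diff_factor * diff + score_factor * winning
    return min(confidence, 1.0)  # clamp to a valid confidence

# A decisive classification saturates: query_mode_confidence(4.0, 1.0) -> 1.0
```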

YAML Configuration:

agent_confidence:
  query_mode_base: 0.5
  query_mode_diff_factor: 0.1
  query_mode_score_factor: 0.05
  pattern_match_confidence: 0.75
  traversal_confidence: 0.65

ACPDiagnosticsConfig

File: src/config/unified_config.py

The ACPDiagnosticsConfig section controls which diagnostic categories are enabled, debounce timing, and threshold values for the DiagnosticsEngine.

from src.config import get_unified_config

cfg = get_unified_config()

# General settings
cfg.acp_diagnostics.enabled          # True — Master switch for diagnostics
cfg.acp_diagnostics.debounce_ms      # 2000 — Debounce interval in milliseconds

# Diagnostic categories (True = enabled, False = disabled)
cfg.acp_diagnostics.categories["taint"]       # True
cfg.acp_diagnostics.categories["complexity"]  # True
cfg.acp_diagnostics.categories["deprecated"]  # True
cfg.acp_diagnostics.categories["todo"]        # False
cfg.acp_diagnostics.categories["dead_code"]   # False

# Thresholds
cfg.acp_diagnostics.thresholds["complexity_warning"]  # 15
cfg.acp_diagnostics.thresholds["complexity_error"]    # 25
cfg.acp_diagnostics.thresholds["fan_in_warning"]      # 20

General Settings:

Field Type Default Description
enabled bool True Master switch to enable or disable all diagnostics
debounce_ms int 2000 Minimum interval between diagnostic generations for the same file, in milliseconds

Diagnostic Categories:

Category Default Description
taint True Taint flow detection (source-to-sink data flows)
complexity True Cyclomatic complexity analysis
deprecated True Deprecated API usage detection
todo False TODO/FIXME comment detection
dead_code False Unreachable or unused code detection

Thresholds:

Threshold Default Description
complexity_warning 15 Cyclomatic complexity level that triggers a warning diagnostic
complexity_error 25 Cyclomatic complexity level that triggers an error diagnostic
fan_in_warning 20 Fan-in count that triggers a warning about high coupling

YAML Configuration:

acp_diagnostics:
  enabled: true
  debounce_ms: 2000
  categories:
    taint: true
    complexity: true
    deprecated: true
    todo: false
    dead_code: false
  thresholds:
    complexity_warning: 15
    complexity_error: 25
    fan_in_warning: 20

Extending the Agent System

Adding a New Pipeline Agent

To add a new pipeline agent, create a new module in src/agents/:

from typing import Dict, List, Optional
from src.config import CPGConfig, get_global_cpg_config


class CustomAgent:
    """Custom pipeline agent for specialized analysis."""

    def __init__(self, cpg_config: Optional[CPGConfig] = None):
        self.cpg_config = cpg_config or get_global_cpg_config()

    def process(self, question: str, context: Dict) -> Dict:
        """Process question with context and return results.

        Args:
            question: The user's natural language question.
            context: Dictionary containing analysis, enrichment hints,
                     and other pipeline state.

        Returns:
            Dictionary with 'results' list and 'metadata' dict.
        """
        results = self._analyze(question, context)
        return {
            "results": results,
            "metadata": {
                "agent": "custom",
                "result_count": len(results),
            },
        }

    def _analyze(self, question: str, context: Dict) -> List[Dict]:
        """Internal analysis method.

        Implement your custom analysis logic here.
        """
        return []

Integrate into the workflow by passing the agent to MultiScenarioCopilot or calling it from a scenario handler in src/workflow/scenarios/.

Adding ACP Handlers

To add a new JSON-RPC handler to the ACP agent, add a new entry to the _handlers dict in CodeGraphACPAgent:

# In src/acp/server/agent.py, inside CodeGraphACPAgent.__init__()

self._handlers["custom/method"] = self._handle_custom

Then implement the handler method:

async def _handle_custom(self, params: Dict, user_id: Optional[str] = None) -> Dict:
    """Handle the custom/method JSON-RPC call.

    Args:
        params: JSON-RPC parameters from the client.
        user_id: Authenticated user ID (if available).

    Returns:
        Result dictionary to send back to the client.
    """
    # Validate params
    required_field = params.get("required_field")
    if not required_field:
        return {"error": "required_field is required"}

    # Process the request
    result = await self._process_custom(required_field)

    return {"status": "ok", "result": result}

The handler will be automatically dispatched when the client sends a custom/method JSON-RPC request.
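For reference, the client and agent would exchange standard JSON-RPC 2.0 envelopes for this hypothetical method; the id and params values below are illustrative:

```python
import json

# JSON-RPC 2.0 request/response pair for the hypothetical custom/method
# handler sketched above. The id and field values are illustrative.
request = {
    "jsonrpc": "2.0",
    "id": 42,
    "method": "custom/method",
    "params": {"required_field": "value"},
}
response = {
    "jsonrpc": "2.0",
    "id": 42,  # must echo the request id so the client can correlate
    "result": {"status": "ok", "result": "..."},
}
wire = json.dumps(request)  # what actually travels over the transport
```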


Next Steps