Documentation for CodeGraph’s workflow system – LangGraph-based orchestration of 21 scenario workflows, composite pipelines, and the exec CI/CD tool.
Table of Contents¶
- Workflow Architecture
- MultiScenarioCopilot
- Workflow State
- MultiScenarioState
- Specialized States
- Workflow Nodes
- Intent Classification
- Pre-Retrieval
- Scenario Routing
- Scenario Execution
- Scenario Workflows
- Structure
- Available Scenarios
- Handler Base Class
- Composite Workflows
- Exec Pipeline
- Error Handling
- Retry Logic
- Fallback Strategies
- Custom Workflows
- Creating a Custom Workflow
- Conditional Routing
- Streaming
- Related Documentation
Workflow Architecture¶
CodeGraph uses LangGraph for workflow orchestration. All queries flow through a single entry point – MultiScenarioCopilot – which classifies intent, runs pre-retrieval, routes to the appropriate scenario workflow, and returns structured results.
graph TD
START([User Query]) --> classify_intent[classify_intent]
classify_intent --> pre_retrieval[pre_retrieval]
pre_retrieval --> route_by_intent[route_by_intent]
route_by_intent --> S01[onboarding_workflow]
route_by_intent --> S02[security_workflow]
route_by_intent --> S03[documentation_workflow]
route_by_intent --> S_N[... 18 more scenarios]
route_by_intent --> S21[interface_docs_sync_workflow]
S01 --> END_NODE([END])
S02 --> END_NODE
S03 --> END_NODE
S_N --> END_NODE
S21 --> END_NODE
The graph chain is: classify_intent -> pre_retrieval -> route_by_intent -> scenario node -> END.
MultiScenarioCopilot¶
The main entry point for all workflow execution.
Location: src/workflow/orchestration/copilot.py
Re-exported from src/workflow/ for convenience.
from src.workflow import MultiScenarioCopilot
copilot = MultiScenarioCopilot()
# Auto-detect scenario from query
result = copilot.run("Find SQL injection vulnerabilities")
# Force a specific scenario
result = copilot.run(
"Analyze this module",
context={"scenario_id": "scenario_2"}
)
# Set language via context
result = copilot.run(
"Find memory leaks",
context={"language": "ru"}
)
Constructor and method signatures:
class MultiScenarioCopilot:
def __init__(self):
self.graph = build_multi_scenario_graph()
def run(self, query: str, context: Optional[Dict] = None) -> Dict[str, Any]:
...
The graph is built internally via build_multi_scenario_graph() from src/workflow/orchestration/graph_builder.py.
Result structure:
{
"query": "Find SQL injection vulnerabilities",
"intent": "security_audit",
"scenario_id": "scenario_2",
"confidence": 0.92,
"answer": "Found 3 potential SQL injection...",
"evidence": ["Function exec_simple_query at line 142..."],
"metadata": {...}
}
Workflow State¶
MultiScenarioState¶
All workflows share MultiScenarioState, a TypedDict with 22 fields that flows through the LangGraph nodes.
Location: src/workflow/state.py
class MultiScenarioState(TypedDict):
# Input
query: str
context: Optional[Dict[str, Any]]
language: Optional[str] # "en" or "ru"
# Intent Classification
intent: Optional[str] # e.g., "security_audit"
scenario_id: Optional[str] # e.g., "scenario_2"
confidence: Optional[float] # 0.0-1.0
classification_method: Optional[str] # "keyword" or "llm"
# CPG Data
cpg_results: Optional[List[Dict]]
subsystems: Optional[List[str]]
methods: Optional[List[Dict]]
call_graph: Optional[Any]
# Final Output
answer: Optional[str]
evidence: Optional[List[str]]
metadata: Optional[Dict[str, Any]]
retrieved_functions: Optional[List[str]]
# Error Handling
error: Optional[str]
retry_count: int
# Workflow Configuration
enrichment_config: Optional[Dict[str, Any]]
vector_store: Optional[Any]
# Multi-tenant project scoping
db_path: Optional[str]
collection_prefix: Optional[str]
# Pre-retrieval results (Phase E)
pre_retrieval_results: Optional[List[Dict[str, Any]]]
Create initial state with the helper function. Note that language is NOT a parameter – pass it via context:
from src.workflow.state import create_initial_state
# Signature: create_initial_state(query, context=None)
state = create_initial_state(
query="Find memory leaks",
context={"subsystem": "executor", "language": "en"}
)
Specialized States¶
Some scenarios extend the base state with domain-specific fields:
| State Class | Additional Fields |
|---|---|
| SecurityWorkflowState | vulnerabilities, taint_paths, security_findings, risk_score |
| PerformanceWorkflowState | hotspots, complexity_metrics, bottlenecks, optimization_suggestions |
| ArchitectureWorkflowState | dependencies, layer_violations, circular_deps, subsystem_info |
All specialized states also include the base fields: query, context, intent, answer, evidence, error.
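A specialized state can be sketched as a TypedDict that carries the shared base fields plus the domain-specific ones. The field names below come from the table; the exact types are illustrative assumptions, not the definitions in src/workflow/state.py:

```python
from typing import Any, Dict, List, Optional, TypedDict

class SecurityWorkflowState(TypedDict, total=False):
    # Base fields shared with MultiScenarioState
    query: str
    context: Optional[Dict[str, Any]]
    intent: Optional[str]
    answer: Optional[str]
    evidence: Optional[List[str]]
    error: Optional[str]
    # Security-specific fields (types are illustrative)
    vulnerabilities: List[Dict[str, Any]]
    taint_paths: List[List[str]]
    security_findings: List[Dict[str, Any]]
    risk_score: float

state: SecurityWorkflowState = {
    "query": "Find SQL injection vulnerabilities",
    "vulnerabilities": [],
    "risk_score": 0.0,
}
```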
Workflow Nodes¶
Intent Classification¶
The first node in the graph classifies the user query into a scenario using bilingual (EN/RU) keyword matching with optional LLM fallback.
Location: src/workflow/orchestration/intent_classifier.py
from src.workflow.orchestration.intent_classifier import classify_intent_node
# Called internally by the graph
state = classify_intent_node(state)
# Populates: state['intent'], state['scenario_id'], state['confidence'],
# state['classification_method']
Classification methods:
- "keyword" – fast bilingual keyword matching (default)
- "llm" – LLM-based fallback when keyword confidence is low
Pre-Retrieval¶
After intent classification and before routing, the pre-retrieval node runs HybridRetriever to gather initial context. This is Phase E of the retrieval pipeline.
Location: src/workflow/orchestration/pre_retrieval.py
The node maps each intent to a query_type that controls retrieval weighting:
| Query Type | Intent Keys |
|---|---|
| semantic | onboarding, documentation, feature_development, debugging, test_coverage |
| structural | architecture_violations, cross_repo_impact, dependencies, mass_refactoring, tech_debt, refactoring, performance |
| security | security_audit, security_incident, entry_points, compliance |
| default | code_review, file_editing, code_optimization, standards_check |
Results are stored in state["pre_retrieval_results"].
Configuration: config.yaml -> workflows.pre_retrieval.enable (enabled by default).
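The intent-to-query_type dispatch from the table above can be sketched as a simple lookup. The mapping contents are taken from the table; the helper function itself is illustrative, not the actual pre_retrieval.py code:

```python
from typing import Dict, Set

# Intent keys per query type, as documented in the pre-retrieval table
INTENT_QUERY_TYPES: Dict[str, Set[str]] = {
    "semantic": {"onboarding", "documentation", "feature_development",
                 "debugging", "test_coverage"},
    "structural": {"architecture_violations", "cross_repo_impact", "dependencies",
                   "mass_refactoring", "tech_debt", "refactoring", "performance"},
    "security": {"security_audit", "security_incident", "entry_points", "compliance"},
}

def query_type_for_intent(intent: str) -> str:
    for query_type, intents in INTENT_QUERY_TYPES.items():
        if intent in intents:
            return query_type
    # code_review, file_editing, code_optimization, standards_check fall through
    return "default"
```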
Scenario Routing¶
After pre-retrieval, the router dispatches to the matched scenario workflow function.
Location: src/workflow/orchestration/router.py
from src.workflow.orchestration.router import route_by_intent
# Returns the scenario node name
next_node = route_by_intent(state)
# e.g., "security_workflow", "onboarding_workflow"
Scenario Execution¶
Each scenario workflow is a LangGraph subgraph that:
- Queries the CPG database via CPGQueryService
- Processes results through scenario-specific handlers inheriting from BaseHandler
- Formats the answer using localized formatters
- Returns results back to the main graph state
Scenario Workflows¶
Structure¶
Each scenario follows a standard directory layout under src/workflow/scenarios/:
src/workflow/scenarios/
├── _base/ # Base handler class
│ └── handler.py # BaseHandler, HandlerResult
├── _intent/ # Intent classification
├── onboarding/ # S01: Onboarding
│ └── handlers/
├── security/ # S02: Security
│ ├── handlers/
│ └── formatters/
├── documentation_handlers/ # S03: Documentation
├── feature_dev_handlers/ # S04: Feature development
├── refactoring_handlers/ # S05: Refactoring
├── performance_handlers/ # S06: Performance
├── coverage_handlers/ # S07: Test coverage
├── compliance_handlers/ # S08: Compliance
├── code_review_handlers/ # S09: Code review
├── cross_repo_handlers/ # S10: Cross-repo
├── architecture_handlers/ # S11: Architecture
│ ├── handlers/
│ └── formatters/
├── tech_debt_handlers/ # S12: Tech debt
├── debugging_handlers/ # S15: Debugging
├── code_optimization.py # S18: Code optimization
├── code_optimization_composite.py # S18 composite variant
├── file_editing.py # S17: File editing
├── standards_check.py # S19: Standards check
├── standards_check_composite.py # S19 composite variant
├── dependencies_analysis.py # S20: Dependencies
├── interface_docs_sync_composite.py # S21 + composite: Docs sync
├── audit_composite.py # Composite: Audit (AuditRunner)
└── story_validation_composite.py # Composite: Story validation (StoryValidationRunner)
Available Scenarios¶
| ID | Name | Entry Point | Purpose |
|---|---|---|---|
| 01 | onboarding | onboarding_workflow | Codebase onboarding and navigation |
| 02 | security | security_workflow | Vulnerability detection |
| 03 | documentation | documentation_workflow | Documentation generation |
| 04 | feature_dev | feature_dev_workflow | Feature development |
| 05 | refactoring | refactoring_workflow | Refactoring assistance |
| 06 | performance | performance_workflow | Performance and complexity |
| 07 | test_coverage | test_coverage_workflow | Test coverage analysis |
| 08 | compliance | compliance_workflow | Compliance checking |
| 09 | code_review | code_review_workflow | Code review automation |
| 10 | cross_repo | cross_repo_workflow | Cross-repo impact analysis |
| 11 | architecture | architecture_workflow | Architectural analysis |
| 12 | tech_debt | tech_debt_workflow | Tech debt quantification |
| 13 | mass_refactoring | mass_refactoring_workflow | Enterprise-scale refactoring |
| 14 | security_incident | security_incident_workflow | Incident response |
| 15 | debugging | debugging_workflow | Debugging support |
| 16 | entry_points | entry_points_workflow | Entry point analysis |
| 17 | file_editing | file_editing_workflow | AST-based file editing |
| 18 | code_optimization | optimization_workflow | Code optimization |
| 19 | standards_check | standards_check_workflow | Standards-guided optimization |
| 20 | dependencies | dependencies_workflow | Dependency analysis |
| 21 | interface_docs_sync | interface_docs_sync_workflow | Interface documentation sync |
Composites (no numeric ID):
| ID | Name | Entry Point | Purpose |
|---|---|---|---|
| – | audit | AuditRunner | 12-dimension quality audit (9 sub-scenarios parallel, 600s) |
| – | interface_docs_sync | InterfaceDocsSyncRunner | 5-phase pipeline (7 interfaces, 120s) |
| – | story_validation | StoryValidationRunner | User story validation (5 interfaces) |
Handler Base Class¶
All scenario handlers inherit from BaseHandler.
Location: src/workflow/scenarios/_base/handler.py
@dataclass
class HandlerResult(Generic[T]):
data: Optional[T] = None
cpg_results: List[Dict[str, Any]] = field(default_factory=list)
retrieved_functions: List[str] = field(default_factory=list)
answer: str = ""
evidence: List[str] = field(default_factory=list)
metadata: Dict[str, Any] = field(default_factory=dict)
should_return: bool = True
llm_context: Dict[str, Any] = field(default_factory=dict)
BaseHandler constructor and key attributes:
class BaseHandler:
def __init__(self, cpg: Any, state: MultiScenarioState):
self.cpg = cpg # CPGQueryService instance
self.state = state # MultiScenarioState dict
self.query = state["query"]
self.context = state.get("context", {})
self.language = state.get("language", "en")
self.cfg = get_unified_config()
Public methods:
- can_handle(query_info: Dict) -> bool – returns True by default; override for custom routing logic
- handle(query_info: Dict) -> HandlerResult[T] – abstract; implement scenario-specific logic
- apply_result(result: HandlerResult) -> MultiScenarioState – applies HandlerResult fields to state
- log_info(message: str) / log_debug(message: str) / log_warning(message: str) – logging with class context
Usage example:
from src.workflow.scenarios._base.handler import BaseHandler, HandlerResult
class MyHandler(BaseHandler):
async def handle(self, query_info: Dict) -> HandlerResult:
results = self.cpg.get_methods_by_subsystem("executor")
return HandlerResult(
answer="Found methods...",
evidence=["method_a at line 42"],
metadata={"handler": "my_handler"}
)
Warning: Do NOT use AnalysisHandler from src/workflow/handlers/analysis.py as a base class – its __init__ signature is incompatible with the scenario registry (self.cpg and self.state will not be set).
Composite Workflows¶
Composite workflows orchestrate multiple sub-scenarios with conflict resolution and timeout management.
S18 code_optimization (composite variant):
- Sub-scenarios: 02 (security), 05 (refactoring), 06 (performance), 11 (architecture), 12 (tech_debt)
- Mode: parallel
- Timeout: 60s
- Triggered by composite_mode=True via optimization_composite_workflow
S19 standards_check (composite variant):
- Sub-scenarios: 08 (compliance), 17 (file_editing), 18 (code_optimization)
- Mode: sequential
- Timeout: 45s
- Triggered by composite_mode=True via standards_check_composite_workflow
Audit (AuditRunner):
- 9 sub-scenarios run in parallel
- 12 code quality dimensions: security, complexity, duplication, dependencies, naming, error handling, testing, documentation, performance, portability, style, architecture
- Timeout: 600s
- FP-reduction pipeline (V25-V32)
python -m src.cli audit --db PATH [--language ru] [--format json]
Interface Docs Sync (InterfaceDocsSyncRunner):
- 5-phase pipeline: Discovery -> Doc Parsing -> Generation -> Drift Detection -> Report
- Scans 6 interfaces: REST API, CLI, MCP, ACP, gRPC, WebSocket
- Timeout: 120s
- DriftType: UNDOCUMENTED, STALE, OUTDATED, COVERED
python -m src.cli docs-sync --db PATH [--check] [--format json]
Also available as MCP tool codegraph_docs_sync and REST endpoint POST /api/v1/documentation/sync.
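The four drift states can be sketched as an enum plus a classification helper. The member names come from the docs above; the string values and the decision logic are illustrative assumptions (in particular, the real pipeline distinguishes STALE from OUTDATED with information not modeled here):

```python
from enum import Enum

class DriftType(Enum):
    UNDOCUMENTED = "undocumented"  # interface exists, no docs
    STALE = "stale"
    OUTDATED = "outdated"
    COVERED = "covered"            # docs and interface agree

def classify_drift(documented: bool, doc_matches_code: bool) -> DriftType:
    # Illustrative decision logic only
    if not documented:
        return DriftType.UNDOCUMENTED
    return DriftType.COVERED if doc_matches_code else DriftType.OUTDATED
```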
Story Validation (StoryValidationRunner):
- Validates user stories marked Done against 5 interfaces
- Evidence types: dedicated (threshold 0.8), passthrough (threshold 0.5), scenario_map (0.7)
- Tracks all matched function names via all_matched_names
- Supports Go CPG via the --go-db parameter
python -m src.cli.import_commands dogfood validate-stories
Conflict resolution across all composites uses priority mode: security findings get a 1.5x boost, compliance findings get 1.3x. Configuration: config.yaml -> composition.
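The priority-mode re-ranking can be sketched as a score boost per source scenario. The 1.5x and 1.3x multipliers are from the docs; the finding structure and helper names are illustrative:

```python
from typing import Dict, List

# Boost factors per the conflict-resolution policy above
PRIORITY_BOOSTS = {"security": 1.5, "compliance": 1.3}

def resolve_conflicts(findings: List[Dict]) -> List[Dict]:
    """Re-rank overlapping findings by boosted score, highest first."""
    for f in findings:
        f["boosted_score"] = f["score"] * PRIORITY_BOOSTS.get(f["scenario"], 1.0)
    return sorted(findings, key=lambda f: f["boosted_score"], reverse=True)
```

With this policy a security finding at score 0.7 (boosted to 1.05) outranks a performance finding at 0.9.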
Exec Pipeline¶
Separate CI/CD tool – not a composite workflow. Designed for non-interactive PR security review in CI pipelines.
Location: src/cli/exec_command.py
python -m src.cli exec --prompt "Review security" --base-ref origin/main \
--sarif-file out.sarif --comment-file comment.md --sandbox read-only
The exec pipeline:
- Gets changed files from --base-ref
- Scans changed methods via CPG
- Computes “New vs Fixed” delta via fingerprinting
- Generates SARIF 2.1.0 output via SARIFExporter (src/security/sarif_exporter.py)
- Generates PR comment markdown
Configuration: config.yaml -> reporting.
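The “New vs Fixed” delta can be sketched with set arithmetic over finding fingerprints. The fingerprint inputs below (rule, file, method – deliberately excluding line numbers, which shift between commits) are illustrative assumptions, not the actual exec pipeline implementation:

```python
import hashlib
from typing import Dict, List, Set, Tuple

def fingerprint(finding: Dict) -> str:
    # Stable identity for a finding across commits (line numbers excluded)
    key = f"{finding['rule_id']}|{finding['file']}|{finding['method']}"
    return hashlib.sha256(key.encode()).hexdigest()[:16]

def delta(base: List[Dict], head: List[Dict]) -> Tuple[List[Dict], List[Dict]]:
    """Return (new, fixed): findings only in head, and findings only in base."""
    base_fps: Set[str] = {fingerprint(f) for f in base}
    head_fps: Set[str] = {fingerprint(f) for f in head}
    new = [f for f in head if fingerprint(f) not in base_fps]
    fixed = [f for f in base if fingerprint(f) not in head_fps]
    return new, fixed
```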
Error Handling¶
Retry Logic¶
Workflows support automatic retry with query refinement:
# Built into the graph -- configurable via state['retry_count']
# Default: up to 2 retries with adaptive query refinement
The retry_count field in MultiScenarioState tracks the current retry attempt. When a scenario handler raises an exception, the graph increments retry_count and re-invokes the handler with a refined query.
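The retry loop described above can be sketched as follows. The 2-retry cap matches the docs; the refinement callback and error handling details are illustrative assumptions:

```python
from typing import Callable, Dict

MAX_RETRIES = 2  # default per the docs

def run_with_retry(handler: Callable[[Dict], Dict], state: Dict,
                   refine: Callable[..., str]) -> Dict:
    state.setdefault("retry_count", 0)
    while True:
        try:
            return handler(state)
        except Exception as exc:
            if state["retry_count"] >= MAX_RETRIES:
                state["error"] = str(exc)  # give up, surface the error
                return state
            state["retry_count"] += 1
            # Re-invoke with an adaptively refined query
            state["query"] = refine(state["query"], attempt=state["retry_count"])
```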
Fallback Strategies¶
When LLM-based generation fails, workflows fall back through a chain:
1. LLM-generated SQL query
2. Template-matched query from query examples
3. Direct CPG method call
If all fallbacks fail, the error field in MultiScenarioState is populated with a descriptive message and answer contains a user-friendly explanation.
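The three-step chain can be sketched as trying each strategy in order until one succeeds. The step order is from the docs; the callable interface and the failure messages are illustrative:

```python
from typing import Callable, Dict

def answer_with_fallbacks(state: Dict, llm_sql: Callable, template_query: Callable,
                          direct_cpg: Callable) -> Dict:
    # Try each strategy in priority order; a raised exception means "try the next"
    for step in (llm_sql, template_query, direct_cpg):
        try:
            state["answer"] = step(state["query"])
            return state
        except Exception:
            continue
    # All strategies failed: populate error plus a user-friendly answer
    state["error"] = "All retrieval strategies failed"
    state["answer"] = "Sorry, I could not answer this query. Try rephrasing it."
    return state
```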
Custom Workflows¶
Creating a Custom Workflow¶
from langgraph.graph import StateGraph
from src.workflow.state import MultiScenarioState
def create_custom_workflow():
workflow = StateGraph(MultiScenarioState)
workflow.add_node("analyze", my_analyze_node)
workflow.add_node("process", my_process_node)
workflow.add_node("interpret", my_interpret_node)
workflow.add_edge("analyze", "process")
workflow.add_edge("process", "interpret")
workflow.set_entry_point("analyze")
workflow.set_finish_point("interpret")
return workflow.compile()
result = create_custom_workflow().invoke({"query": "..."})
Conditional Routing¶
def my_router(state: MultiScenarioState) -> str:
if state["intent"] == "security_audit":
return "security_node"
elif state["intent"] == "performance":
return "performance_node"
else:
return "general_node"
workflow.add_conditional_edges(
"classify",
my_router,
{
"security_node": "security",
"performance_node": "performance",
"general_node": "general",
}
)
Streaming¶
MultiScenarioCopilot.run() is synchronous. Streaming is implemented at the API/WebSocket layer via thread pool executor.
- ChatService.process_query_stream() in src/api/services/chat_service.py – AsyncGenerator producing SSE events
- POST /api/v1/chat/stream in src/api/routers/chat.py – SSE endpoint
- WebSocket support in src/api/websocket/handlers.py
from src.api.services.chat_service import ChatService
# Streaming is handled at the API layer
# MultiScenarioCopilot.run() runs in a thread pool executor
# Results are streamed as SSE events to the client
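The thread-pool pattern can be sketched like this: offload the blocking run() call with run_in_executor and yield SSE-formatted events. The stand-in run_copilot function and the event names are illustrative assumptions, not the actual ChatService code:

```python
import asyncio
from typing import AsyncGenerator, Dict

def run_copilot(query: str) -> Dict:
    # Stand-in for the synchronous MultiScenarioCopilot.run()
    return {"answer": f"Answer for: {query}"}

async def process_query_stream(query: str) -> AsyncGenerator[str, None]:
    loop = asyncio.get_running_loop()
    # Offload the blocking call so the event loop stays responsive
    result = await loop.run_in_executor(None, run_copilot, query)
    yield f"event: answer\ndata: {result['answer']}\n\n"
    yield "event: done\ndata: {}\n\n"
```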
Related Documentation¶
- API Reference – REST API endpoints
- Agents Reference – Agent system details
- Analysis Modules – CPG analysis classes
- Scenarios Guide – Scenario usage examples