Workflows Reference

Documentation for CodeGraph’s workflow system – LangGraph-based orchestration of 21 scenario workflows, composite pipelines, and the exec CI/CD tool.

Workflow Architecture

CodeGraph uses LangGraph for workflow orchestration. All queries flow through a single entry point – MultiScenarioCopilot – which classifies intent, runs pre-retrieval, routes to the appropriate scenario workflow, and returns structured results.

graph TD
    START([User Query]) --> classify_intent[classify_intent]
    classify_intent --> pre_retrieval[pre_retrieval]
    pre_retrieval --> route_by_intent[route_by_intent]
    route_by_intent --> S01[onboarding_workflow]
    route_by_intent --> S02[security_workflow]
    route_by_intent --> S03[documentation_workflow]
    route_by_intent --> S_N[... 18 more scenarios]
    route_by_intent --> S21[interface_docs_sync_workflow]
    S01 --> END_NODE([END])
    S02 --> END_NODE
    S03 --> END_NODE
    S_N --> END_NODE
    S21 --> END_NODE

The graph chain is: classify_intent -> pre_retrieval -> route_by_intent -> scenario node -> END.
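The chain can be simulated in plain Python to make the data flow concrete. Every node body below is a toy stand-in for illustration; the real implementations live in src/workflow/orchestration/:

```python
# Toy simulation of the main graph chain:
# classify_intent -> pre_retrieval -> route_by_intent -> scenario node -> END.

def classify_intent(state):
    # Stand-in for the bilingual keyword classifier.
    q = state["query"].lower()
    if "vulnerab" in q or "security" in q:
        state.update(intent="security_audit", scenario_id="scenario_2")
    else:
        state.update(intent="onboarding", scenario_id="scenario_1")
    return state

def pre_retrieval(state):
    # The real node runs HybridRetriever; here we just stub the results.
    state["pre_retrieval_results"] = []
    return state

def route_by_intent(state):
    # Maps intent to the scenario node name.
    return {"security_audit": "security_workflow"}.get(state["intent"], "onboarding_workflow")

SCENARIOS = {
    "security_workflow": lambda s: {**s, "answer": "security findings..."},
    "onboarding_workflow": lambda s: {**s, "answer": "onboarding overview..."},
}

def run_chain(query):
    state = {"query": query}
    for node in (classify_intent, pre_retrieval):
        state = node(state)
    return SCENARIOS[route_by_intent(state)](state)
```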

MultiScenarioCopilot

The main entry point for all workflow execution.

Location: src/workflow/orchestration/copilot.py

Re-exported from src/workflow/ for convenience.

from src.workflow import MultiScenarioCopilot

copilot = MultiScenarioCopilot()

# Auto-detect scenario from query
result = copilot.run("Find SQL injection vulnerabilities")

# Force a specific scenario
result = copilot.run(
    "Analyze this module",
    context={"scenario_id": "scenario_2"}
)

# Set language via context
result = copilot.run(
    "Find memory leaks",
    context={"language": "ru"}
)

Constructor and method signatures:

class MultiScenarioCopilot:
    def __init__(self):
        self.graph = build_multi_scenario_graph()

    def run(self, query: str, context: Optional[Dict] = None) -> Dict[str, Any]:
        ...

The graph is built internally via build_multi_scenario_graph() from src/workflow/orchestration/graph_builder.py.

Result structure:

{
    "query": "Find SQL injection vulnerabilities",
    "intent": "security_audit",
    "scenario_id": "scenario_2",
    "confidence": 0.92,
    "answer": "Found 3 potential SQL injection...",
    "evidence": ["Function exec_simple_query at line 142..."],
    "metadata": {...}
}
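A caller might consume this structure as follows. The helper name and confidence threshold are this example's choices, not part of the API:

```python
# Illustrative consumer of the result dict returned by copilot.run().
def summarize_result(result, min_confidence=0.5):
    if result.get("error"):
        return f"Error: {result['error']}"
    confidence = result.get("confidence", 0.0)
    if confidence < min_confidence:
        return f"Low-confidence match ({confidence:.2f}); answer may be off-topic."
    # Lead with the scenario id so users can see which workflow answered.
    lines = [f"[{result['scenario_id']}] {result['answer']}"]
    lines += [f"  evidence: {e}" for e in result.get("evidence", [])]
    return "\n".join(lines)
```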

Workflow State

MultiScenarioState

All workflows share MultiScenarioState, a TypedDict with 22 fields that flows through the LangGraph nodes.

Location: src/workflow/state.py

class MultiScenarioState(TypedDict):
    # Input
    query: str
    context: Optional[Dict[str, Any]]
    language: Optional[str]               # "en" or "ru"

    # Intent Classification
    intent: Optional[str]                 # e.g., "security_audit"
    scenario_id: Optional[str]            # e.g., "scenario_2"
    confidence: Optional[float]           # 0.0-1.0
    classification_method: Optional[str]  # "keyword" or "llm"

    # CPG Data
    cpg_results: Optional[List[Dict]]
    subsystems: Optional[List[str]]
    methods: Optional[List[Dict]]
    call_graph: Optional[Any]

    # Final Output
    answer: Optional[str]
    evidence: Optional[List[str]]
    metadata: Optional[Dict[str, Any]]
    retrieved_functions: Optional[List[str]]

    # Error Handling
    error: Optional[str]
    retry_count: int

    # Workflow Configuration
    enrichment_config: Optional[Dict[str, Any]]
    vector_store: Optional[Any]

    # Multi-tenant project scoping
    db_path: Optional[str]
    collection_prefix: Optional[str]

    # Pre-retrieval results (Phase E)
    pre_retrieval_results: Optional[List[Dict[str, Any]]]

Create initial state with the helper function. Note that language is NOT a parameter – pass it via context:

from src.workflow.state import create_initial_state

# Signature: create_initial_state(query, context=None)
state = create_initial_state(
    query="Find memory leaks",
    context={"subsystem": "executor", "language": "en"}
)
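Based on the field definitions above, the helper plausibly seeds the TypedDict defaults and lifts language out of context. The sketch below is a guess at that behavior (only a subset of fields shown), not the actual source:

```python
from typing import Any, Dict, Optional

# Illustrative sketch of a create_initial_state helper, inferred from the
# MultiScenarioState fields above -- not the real implementation.
def create_initial_state(query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    context = context or {}
    return {
        "query": query,
        "context": context,
        "language": context.get("language", "en"),  # language travels inside context
        "intent": None,
        "scenario_id": None,
        "confidence": None,
        "answer": None,
        "evidence": None,
        "error": None,
        "retry_count": 0,
        "pre_retrieval_results": None,
    }
```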

Specialized States

Some scenarios extend the base state with domain-specific fields:

State Class                 Additional Fields
SecurityWorkflowState       vulnerabilities, taint_paths, security_findings, risk_score
PerformanceWorkflowState    hotspots, complexity_metrics, bottlenecks, optimization_suggestions
ArchitectureWorkflowState   dependencies, layer_violations, circular_deps, subsystem_info

All specialized states also include the base fields: query, context, intent, answer, evidence, error.

Workflow Nodes

Intent Classification

The first node in the graph classifies the user query into a scenario using bilingual (EN/RU) keyword matching with optional LLM fallback.

Location: src/workflow/orchestration/intent_classifier.py

from src.workflow.orchestration.intent_classifier import classify_intent_node

# Called internally by the graph
state = classify_intent_node(state)
# Populates: state['intent'], state['scenario_id'], state['confidence'],
#            state['classification_method']

Classification methods:

  • "keyword" – fast bilingual keyword matching (default)
  • "llm" – LLM-based fallback when keyword confidence is low
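The keyword path can be sketched as follows. The keyword table, scoring, and threshold are invented for illustration; the real classifier is in src/workflow/orchestration/intent_classifier.py:

```python
# Minimal sketch of bilingual (EN/RU) keyword classification with an
# LLM-fallback signal when confidence is low.
KEYWORDS = {
    "security_audit": ["vulnerability", "injection", "уязвимость", "инъекция"],
    "performance": ["slow", "bottleneck", "медленно", "узкое место"],
}

def classify_by_keywords(query, threshold=0.25):
    q = query.lower()
    # Fraction of an intent's keywords present in the query.
    scores = {
        intent: sum(kw in q for kw in kws) / len(kws)
        for intent, kws in KEYWORDS.items()
    }
    intent, confidence = max(scores.items(), key=lambda kv: kv[1])
    if confidence < threshold:
        return None, confidence  # caller falls back to the LLM classifier
    return intent, confidence
```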

Pre-Retrieval

After intent classification and before routing, the pre-retrieval node runs HybridRetriever to gather initial context. This is Phase E of the retrieval pipeline.

Location: src/workflow/orchestration/pre_retrieval.py

The node maps each intent to a query_type that controls retrieval weighting:

Query Type   Intent Keys
semantic     onboarding, documentation, feature_development, debugging, test_coverage
structural   architecture_violations, cross_repo_impact, dependencies, mass_refactoring, tech_debt, refactoring, performance
security     security_audit, security_incident, entry_points, compliance
default      code_review, file_editing, code_optimization, standards_check

Results are stored in state["pre_retrieval_results"].

Configuration: config.yaml -> workflows.pre_retrieval.enable (enabled by default).
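The intent-to-query_type mapping from the table above can be expressed as a lookup with "default" as the fallback. The table contents come from this document; the function shape is illustrative:

```python
# intent -> query_type lookup; intents not listed fall back to "default".
QUERY_TYPE_BY_INTENT = {
    **dict.fromkeys(
        ["onboarding", "documentation", "feature_development", "debugging",
         "test_coverage"], "semantic"),
    **dict.fromkeys(
        ["architecture_violations", "cross_repo_impact", "dependencies",
         "mass_refactoring", "tech_debt", "refactoring", "performance"],
        "structural"),
    **dict.fromkeys(
        ["security_audit", "security_incident", "entry_points", "compliance"],
        "security"),
}

def resolve_query_type(intent):
    return QUERY_TYPE_BY_INTENT.get(intent, "default")
```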

Scenario Routing

After pre-retrieval, the router dispatches to the matched scenario workflow function.

Location: src/workflow/orchestration/router.py

from src.workflow.orchestration.router import route_by_intent

# Returns the scenario node name
next_node = route_by_intent(state)
# e.g., "security_workflow", "onboarding_workflow"

Scenario Execution

Each scenario workflow is a LangGraph subgraph that:

  1. Queries the CPG database via CPGQueryService
  2. Processes results through scenario-specific handlers inheriting from BaseHandler
  3. Formats the answer using localized formatters
  4. Returns results back to the main graph state

Scenario Workflows

Structure

Each scenario follows a standard directory layout under src/workflow/scenarios/:

src/workflow/scenarios/
├── _base/                              # Base handler class
│   └── handler.py                      # BaseHandler, HandlerResult
├── _intent/                            # Intent classification
├── onboarding/                         # S01: Onboarding
│   └── handlers/
├── security/                           # S02: Security
│   ├── handlers/
│   └── formatters/
├── documentation_handlers/             # S03: Documentation
├── feature_dev_handlers/               # S04: Feature development
├── refactoring_handlers/               # S05: Refactoring
├── performance_handlers/               # S06: Performance
├── coverage_handlers/                  # S07: Test coverage
├── compliance_handlers/                # S08: Compliance
├── code_review_handlers/               # S09: Code review
├── cross_repo_handlers/                # S10: Cross-repo
├── architecture_handlers/              # S11: Architecture
│   ├── handlers/
│   └── formatters/
├── tech_debt_handlers/                 # S12: Tech debt
├── debugging_handlers/                 # S15: Debugging
├── code_optimization.py                # S18: Code optimization
├── code_optimization_composite.py      # S18 composite variant
├── file_editing.py                     # S17: File editing
├── standards_check.py                  # S19: Standards check
├── standards_check_composite.py        # S19 composite variant
├── dependencies_analysis.py            # S20: Dependencies
├── interface_docs_sync_composite.py    # S21 + composite: Docs sync
├── audit_composite.py                  # Composite: Audit (AuditRunner)
└── story_validation_composite.py       # Composite: Story validation (StoryValidationRunner)

Available Scenarios

ID  Name                 Entry Point                    Purpose
01  onboarding           onboarding_workflow            Codebase onboarding and navigation
02  security             security_workflow              Vulnerability detection
03  documentation        documentation_workflow         Documentation generation
04  feature_dev          feature_dev_workflow           Feature development
05  refactoring          refactoring_workflow           Refactoring assistance
06  performance          performance_workflow           Performance and complexity
07  test_coverage        test_coverage_workflow         Test coverage analysis
08  compliance           compliance_workflow            Compliance checking
09  code_review          code_review_workflow           Code review automation
10  cross_repo           cross_repo_workflow            Cross-repo impact analysis
11  architecture         architecture_workflow          Architectural analysis
12  tech_debt            tech_debt_workflow             Tech debt quantification
13  mass_refactoring     mass_refactoring_workflow      Enterprise-scale refactoring
14  security_incident    security_incident_workflow     Incident response
15  debugging            debugging_workflow             Debugging support
16  entry_points         entry_points_workflow          Entry point analysis
17  file_editing         file_editing_workflow          AST-based file editing
18  code_optimization    optimization_workflow          Code optimization
19  standards_check      standards_check_workflow       Standards-guided optimization
20  dependencies         dependencies_workflow          Dependency analysis
21  interface_docs_sync  interface_docs_sync_workflow   Interface documentation sync

Composites (no numeric ID):

Name                 Entry Point               Purpose
audit                AuditRunner               12-dimension quality audit (9 sub-scenarios in parallel, 600s)
interface_docs_sync  InterfaceDocsSyncRunner   5-phase pipeline (7 interfaces, 120s)
story_validation     StoryValidationRunner     User story validation (5 interfaces)

Handler Base Class

All scenario handlers inherit from BaseHandler.

Location: src/workflow/scenarios/_base/handler.py

@dataclass
class HandlerResult(Generic[T]):
    data: Optional[T] = None
    cpg_results: List[Dict[str, Any]] = field(default_factory=list)
    retrieved_functions: List[str] = field(default_factory=list)
    answer: str = ""
    evidence: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
    should_return: bool = True
    llm_context: Dict[str, Any] = field(default_factory=dict)

BaseHandler constructor and key attributes:

class BaseHandler:
    def __init__(self, cpg: Any, state: MultiScenarioState):
        self.cpg = cpg          # CPGQueryService instance
        self.state = state      # MultiScenarioState dict
        self.query = state["query"]
        self.context = state.get("context", {})
        self.language = state.get("language", "en")
        self.cfg = get_unified_config()

Public methods:

  • can_handle(query_info: Dict) -> bool – returns True by default, override for custom routing logic
  • handle(query_info: Dict) -> HandlerResult[T] – abstract, implement scenario-specific logic
  • apply_result(result: HandlerResult) -> MultiScenarioState – applies HandlerResult fields to state
  • log_info(message: str) / log_debug(message: str) / log_warning(message: str) – logging with class context

Usage example:

from src.workflow.scenarios._base.handler import BaseHandler, HandlerResult

class MyHandler(BaseHandler):
    async def handle(self, query_info: Dict) -> HandlerResult:
        results = self.cpg.get_methods_by_subsystem("executor")
        return HandlerResult(
            answer="Found methods...",
            evidence=["method_a at line 42"],
            metadata={"handler": "my_handler"}
        )

Warning: Do NOT use AnalysisHandler from src/workflow/handlers/analysis.py as a base class – its __init__ signature is incompatible with the scenario registry (self.cpg and self.state will not be set).

Composite Workflows

Composite workflows orchestrate multiple sub-scenarios with conflict resolution and timeout management.

S18 code_optimization (composite variant):

  • Sub-scenarios: 02 (security), 05 (refactoring), 06 (performance), 11 (architecture), 12 (tech_debt)
  • Mode: parallel
  • Timeout: 60s
  • Triggered by composite_mode=True via optimization_composite_workflow

S19 standards_check (composite variant):

  • Sub-scenarios: 08 (compliance), 17 (file_editing), 18 (code_optimization)
  • Mode: sequential
  • Timeout: 45s
  • Triggered by composite_mode=True via standards_check_composite_workflow

Audit (AuditRunner):

  • 9 sub-scenarios run in parallel
  • 12 code quality dimensions: security, complexity, duplication, dependencies, naming, error handling, testing, documentation, performance, portability, style, architecture
  • Timeout: 600s
  • FP-reduction pipeline (V25-V32)
python -m src.cli audit --db PATH [--language ru] [--format json]

Interface Docs Sync (InterfaceDocsSyncRunner):

  • 5-phase pipeline: Discovery -> Doc Parsing -> Generation -> Drift Detection -> Report
  • Scans 6 interfaces: REST API, CLI, MCP, ACP, gRPC, WebSocket
  • Timeout: 120s
  • DriftType: UNDOCUMENTED, STALE, OUTDATED, COVERED
python -m src.cli docs-sync --db PATH [--check] [--format json]

Also available as MCP tool codegraph_docs_sync and REST endpoint POST /api/v1/documentation/sync.
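Drift detection can be sketched as a set comparison between interfaces found in code and interfaces found in docs. The DriftType names come from this document, but the STALE vs OUTDATED semantics below (removed endpoint vs changed signature) are an assumption, not taken from the runner:

```python
from enum import Enum

class DriftType(Enum):
    UNDOCUMENTED = "undocumented"  # in code, missing from docs
    STALE = "stale"                # in docs, gone from code (assumed meaning)
    OUTDATED = "outdated"          # documented signature differs (assumed meaning)
    COVERED = "covered"            # code and docs agree

def classify_drift(code_endpoints, doc_endpoints):
    """Both arguments: {endpoint_name: signature} dicts."""
    drift = {}
    for name, signature in code_endpoints.items():
        if name not in doc_endpoints:
            drift[name] = DriftType.UNDOCUMENTED
        elif doc_endpoints[name] != signature:
            drift[name] = DriftType.OUTDATED
        else:
            drift[name] = DriftType.COVERED
    for name in doc_endpoints:
        if name not in code_endpoints:
            drift[name] = DriftType.STALE
    return drift
```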

Story Validation (StoryValidationRunner):

  • Validates Done user stories against 5 interfaces
  • Evidence types: dedicated (threshold 0.8), passthrough (threshold 0.5), scenario_map (threshold 0.7)
  • Tracks all matched function names via all_matched_names
  • Supports Go CPG via --go-db parameter
python -m src.cli.import_commands dogfood validate-stories

Conflict resolution across all composites uses priority mode: security findings get a 1.5x boost, compliance findings get 1.3x. Configuration: config.yaml -> composition.
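Priority-mode ranking can be sketched as a score boost before sorting. The boost factors are the ones stated above (security 1.5x, compliance 1.3x); the finding shape and function name are illustrative:

```python
# Priority-mode conflict resolution: boost scores by finding source,
# then rank so boosted security/compliance findings surface first.
BOOSTS = {"security": 1.5, "compliance": 1.3}

def rank_findings(findings):
    """findings: list of {"source": str, "score": float, ...} dicts."""
    def boosted(f):
        return f["score"] * BOOSTS.get(f["source"], 1.0)
    return sorted(findings, key=boosted, reverse=True)
```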

Exec Pipeline

Separate CI/CD tool – not a composite workflow. Designed for non-interactive PR security review in CI pipelines.

Location: src/cli/exec_command.py

python -m src.cli exec --prompt "Review security" --base-ref origin/main \
    --sarif-file out.sarif --comment-file comment.md --sandbox read-only

The exec pipeline:

  1. Gets changed files from --base-ref
  2. Scans changed methods via CPG
  3. Computes “New vs Fixed” delta via fingerprinting
  4. Generates SARIF 2.1.0 output via SARIFExporter (src/security/sarif_exporter.py)
  5. Generates PR comment markdown

Configuration: config.yaml -> reporting.
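The "New vs Fixed" delta in step 3 can be sketched as set arithmetic over fingerprints. Fingerprinting on (rule, file, snippet) is an assumption for illustration; the real exporter may normalize findings differently:

```python
import hashlib

def fingerprint(finding):
    # Assumed fingerprint: stable hash over rule id, file, and code snippet.
    raw = f'{finding["rule"]}|{finding["file"]}|{finding["snippet"]}'
    return hashlib.sha256(raw.encode()).hexdigest()

def delta(base_findings, head_findings):
    """New = in head but not base; Fixed = in base but not head."""
    base = {fingerprint(f) for f in base_findings}
    head = {fingerprint(f) for f in head_findings}
    new = [f for f in head_findings if fingerprint(f) not in base]
    fixed = [f for f in base_findings if fingerprint(f) not in head]
    return new, fixed
```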

Error Handling

Retry Logic

Workflows support automatic retry with query refinement:

# Built into the graph -- configurable via state['retry_count']
# Default: up to 2 retries with adaptive query refinement

The retry_count field in MultiScenarioState tracks the current retry attempt. When a scenario handler raises an exception, the graph increments retry_count and re-invokes the handler with a refined query.
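The retry loop described above can be sketched as follows. The refine() stand-in and the wrapper name are illustrative; only the retry limit (2) and the retry_count/error fields come from this document:

```python
# Illustrative retry loop: on handler failure, bump retry_count, refine the
# query, and re-invoke; after MAX_RETRIES, record the error and stop.
MAX_RETRIES = 2

def refine(query, attempt):
    # Stand-in for adaptive query refinement.
    return f"{query} (refined, attempt {attempt})"

def run_with_retry(handler, state):
    while True:
        try:
            return handler(state)
        except Exception as exc:
            state["retry_count"] = state.get("retry_count", 0) + 1
            if state["retry_count"] > MAX_RETRIES:
                state["error"] = str(exc)
                return state
            state["query"] = refine(state["query"], state["retry_count"])
```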

Fallback Strategies

When LLM-based generation fails, workflows fall back through a chain:

1. LLM-generated SQL query
2. Template-matched query from query examples
3. Direct CPG method call

If all fallbacks fail, the error field in MultiScenarioState is populated with a descriptive message and answer contains a user-friendly explanation.
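The chain can be sketched as trying strategies in order until one yields a result. The strategy functions here are placeholders standing in for the three steps above:

```python
# Fallback chain: try each strategy in order; a strategy signals failure by
# raising or returning None. If all fail, populate error and a friendly answer.
def generate_answer(query, strategies):
    errors = []
    for name, strategy in strategies:
        try:
            result = strategy(query)
            if result is not None:
                return {"answer": result, "metadata": {"strategy": name}}
        except Exception as exc:
            errors.append(f"{name}: {exc}")
    return {"answer": "Could not answer the query.", "error": "; ".join(errors)}
```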

Custom Workflows

Creating a Custom Workflow

from langgraph.graph import StateGraph
from src.workflow.state import MultiScenarioState

def create_custom_workflow():
    workflow = StateGraph(MultiScenarioState)

    workflow.add_node("analyze", my_analyze_node)
    workflow.add_node("process", my_process_node)
    workflow.add_node("interpret", my_interpret_node)

    workflow.add_edge("analyze", "process")
    workflow.add_edge("process", "interpret")

    workflow.set_entry_point("analyze")
    workflow.set_finish_point("interpret")

    return workflow.compile()

result = create_custom_workflow().invoke({"query": "..."})

Conditional Routing

def my_router(state: MultiScenarioState) -> str:
    if state["intent"] == "security_audit":
        return "security_node"
    elif state["intent"] == "performance":
        return "performance_node"
    else:
        return "general_node"

workflow.add_conditional_edges(
    "classify",
    my_router,
    {
        "security_node": "security",
        "performance_node": "performance",
        "general_node": "general",
    }
)

Streaming

MultiScenarioCopilot.run() is synchronous. Streaming is implemented at the API/WebSocket layer via thread pool executor.

  • ChatService.process_query_stream() in src/api/services/chat_service.py – AsyncGenerator producing SSE events
  • POST /api/v1/chat/stream in src/api/routers/chat.py – SSE endpoint
  • WebSocket support in src/api/websocket/handlers.py
from src.api.services.chat_service import ChatService

# Streaming is handled at the API layer
# MultiScenarioCopilot.run() runs in a thread pool executor
# Results are streamed as SSE events to the client
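The executor pattern can be sketched without the API stack. The event names, chunking, and function names below are invented for illustration; only the idea of running the synchronous call in a thread pool comes from this document:

```python
import asyncio

def blocking_run(query):
    # Stand-in for the synchronous MultiScenarioCopilot.run().
    return {"answer": "chunk one chunk two", "scenario_id": "scenario_2"}

async def stream_events(query):
    # Run the blocking call in the default thread pool executor, then
    # emit SSE-style event dicts as results become available.
    loop = asyncio.get_running_loop()
    yield {"event": "start", "data": query}
    result = await loop.run_in_executor(None, blocking_run, query)
    for word in result["answer"].split():
        yield {"event": "token", "data": word}
    yield {"event": "done", "data": result["scenario_id"]}

async def collect(query):
    return [e async for e in stream_events(query)]
```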