WebSocket API Reference

This document describes the WebSocket API for real-time communication with CodeGraph.

Table of Contents

Overview

The WebSocket API provides: - Streaming chat responses – real-time AI response streaming - Job progress updates – background job status notifications - Push notifications – system alerts and notifications

Base URL: ws://localhost:8000/api/v1/ws

The WebSocket router is mounted at /api/v1/ws in src/api/main.py.

Dashboard-specific real-time updates are exposed separately at ws://localhost:8000/api/v2/dashboard/ws.


Authentication

All WebSocket connections require a JWT token passed as a query parameter.

ws://localhost:8000/api/v1/ws/chat?token=<JWT_TOKEN>

Connection Errors

Code Reason Description
4001 Invalid token Token is missing, expired, or invalid
4004 Job not found The requested job ID does not exist

Connection Lifecycle

When a client connects to any WebSocket endpoint, the server follows this lifecycle:

  1. Authentication – the server validates the JWT token from the token query parameter. If invalid, the connection is closed with code 4001.

  2. CONNECTED – on successful connection, the server automatically sends a connected message:

{
  "type": "connected",
  "payload": {
    "connection_id": "conn-abc123",
    "user_id": "user-456"
  },
  "timestamp": "2026-03-07T10:30:00Z"
}
  1. AUTH_REQUIRED – if the server requires re-authentication (e.g., token expiry during session), it sends:
{
  "type": "auth_required",
  "payload": {},
  "timestamp": "2026-03-07T10:30:00Z"
}
  1. AUTHENTICATED – sent after successful re-authentication:
{
  "type": "authenticated",
  "payload": {},
  "timestamp": "2026-03-07T10:30:00Z"
}
  1. DISCONNECTED – sent before the server closes the connection:
{
  "type": "disconnected",
  "payload": {},
  "timestamp": "2026-03-07T10:35:00Z"
}

Endpoints

Chat Streaming

Endpoint: ws://localhost:8000/api/v1/ws/chat

Real-time chat with streaming responses.

Connection

const ws = new WebSocket('ws://localhost:8000/api/v1/ws/chat?token=' + token);

ws.onopen = () => {
  console.log('Connected to chat');
};

ws.onmessage = (event) => {
  const message = JSON.parse(event.data);
  console.log('Received:', message.type, message.payload);
};

ws.onerror = (error) => {
  console.error('WebSocket error:', error);
};

ws.onclose = (event) => {
  console.log('Disconnected:', event.code, event.reason);
};

Send Chat Query

ws.send(JSON.stringify({
  type: 'chat.query',
  payload: {
    query: 'Find all SQL injection vulnerabilities',
    session_id: 'optional-session-id',
    scenario_id: 'optional-scenario-id',
    language: 'en'
  },
  request_id: 'unique-request-id'
}));

ChatQueryPayload fields:

Field Type Required Default Description
query string Yes The query text
session_id string No null Session identifier for context continuity
scenario_id string No null Force a specific scenario
language string No "en" Response language (en or ru)

Receive Streaming Response

Messages arrive in sequence:

  1. Scenario Selection (optional)
{
  "type": "chat.scenario",
  "payload": { "scenario_id": "security_audit" },
  "timestamp": "2026-03-07T10:30:00Z",
  "request_id": "unique-request-id"
}
  1. Response Chunks (multiple)
{
  "type": "chat.chunk",
  "payload": { "content": "Found 3 potential ", "is_final": false },
  "timestamp": "2026-03-07T10:30:01Z",
  "request_id": "unique-request-id"
}
  1. Chat Response – the full assembled answer:
{
  "type": "chat.response",
  "payload": {
    "answer": "Found 3 potential SQL injection vulnerabilities in...",
    "scenario_id": "security_audit",
    "confidence": 0.92,
    "session_id": "sess-abc123"
  },
  "timestamp": "2026-03-07T10:30:04Z",
  "request_id": "unique-request-id"
}
  1. Completion
{
  "type": "chat.done",
  "payload": {},
  "timestamp": "2026-03-07T10:30:05Z",
  "request_id": "unique-request-id"
}

Job Status

Endpoint: ws://localhost:8000/api/v1/ws/jobs/{job_id}

Subscribe to background job progress updates. If the job ID does not exist, the connection is closed with code 4004.

Connection

const jobId = 'abc123';
const ws = new WebSocket(`ws://localhost:8000/api/v1/ws/jobs/${jobId}?token=${token}`);

Messages

Job Started

{
  "type": "job.started",
  "payload": {
    "job_id": "abc123",
    "status": "started",
    "progress": 0,
    "job_type": "import",
    "started_at": "2026-03-07T10:30:00Z",
    "created_at": "2026-03-07T10:29:55Z"
  },
  "timestamp": "2026-03-07T10:30:00Z"
}

Job Progress

{
  "type": "job.progress",
  "payload": {
    "job_id": "abc123",
    "progress": 45,
    "message": "Processing files..."
  },
  "timestamp": "2026-03-07T10:31:00Z"
}

Job Completed

{
  "type": "job.completed",
  "payload": {
    "job_id": "abc123",
    "status": "completed",
    "result": {
      "files_processed": 150,
      "vulnerabilities_found": 3
    },
    "error": null,
    "completed_at": "2026-03-07T10:35:00Z"
  },
  "timestamp": "2026-03-07T10:35:00Z"
}

Job Failed

{
  "type": "job.failed",
  "payload": {
    "job_id": "abc123",
    "status": "failed",
    "result": null,
    "error": "Connection timeout",
    "completed_at": "2026-03-07T10:35:00Z"
  },
  "timestamp": "2026-03-07T10:35:00Z"
}

Notifications

Endpoint: ws://localhost:8000/api/v1/ws/notifications

Receive push notifications for system events.

Connection

const ws = new WebSocket('ws://localhost:8000/api/v1/ws/notifications?token=' + token);

Messages

Notification

{
  "type": "notification",
  "payload": {
    "title": "Import Complete",
    "message": "Project 'myapp' has been imported successfully",
    "type": "success",
    "data": {
      "project_id": "abc123",
      "files_imported": 150
    }
  },
  "timestamp": "2026-03-07T10:35:00Z"
}

The type field inside the payload indicates the notification category. Values: info, success, warning, error. The data field is optional and can contain arbitrary additional context.

Note: The NotificationPayload Pydantic model defines a level field, but the NotificationHandler.send_notification() method sends the value under the key type in the actual WebSocket message payload.


Dashboard Updates

Endpoint: ws://localhost:8000/api/v2/dashboard/ws

Receive lightweight dashboard update events and heartbeats for the CISO/CTO frontend.

Connection

const ws = new WebSocket('ws://localhost:8000/api/v2/dashboard/ws?token=' + token);

Messages

Connected

{
  "event": "connected",
  "client_id": "ws-1",
  "heartbeat_seconds": 15
}

Project Updated

{
  "event": "project_updated",
  "project_name": "payments-api",
  "trigger": "release_gate",
  "timestamp": 1773326400.0
}

Heartbeat

{
  "event": "heartbeat",
  "timestamp": 1773326415.0,
  "clients": 4
}

Clients may send a plain ping message and receive {"event":"pong"} in response.


Message Types

Client to Server

Type Description Payload
chat.query Send chat query query, session_id?, scenario_id?, language?
ping Keep connection alive None

Server to Client

Type Enum Value Description Payload
connected CONNECTED Connection established connection_id, user_id
disconnected DISCONNECTED Connection closing None
authenticated AUTHENTICATED Authentication confirmed None
auth_required AUTH_REQUIRED Re-authentication needed None
chat.scenario CHAT_SCENARIO Scenario selection scenario_id
chat.chunk CHAT_CHUNK Response chunk (ChatChunkPayload) content, is_final
chat.response CHAT_RESPONSE Full response (ChatResponsePayload) answer, scenario_id, confidence, session_id
chat.done CHAT_DONE Response complete None
chat.error CHAT_ERROR Chat error error, details?
job.started JOB_STARTED Job subscribed job_id, status, progress, job_type, started_at, created_at
job.progress JOB_PROGRESS Job progress (JobProgressPayload) job_id, progress, message?
job.completed JOB_COMPLETED Job finished (JobCompletedPayload) job_id, status, result, error, completed_at
job.failed JOB_FAILED Job failed (JobFailedPayload) job_id, status, result, error, completed_at
notification NOTIFICATION Push notification title, message, type, data?
error ERROR General error error, details?
pong PONG Ping response None
cpg.update.complete CPG_UPDATE_COMPLETE CPG database updated None

All 19 values are defined in the WSMessageType enum in src/api/websocket/models.py.


Message Format

WSMessage Model

All messages are instances of the WSMessage Pydantic model (src/api/websocket/models.py):

Field Type Required Default Description
type WSMessageType Yes Enum value (see Message Types)
payload Dict[str, Any] No {} Message-specific data
timestamp datetime No datetime.now(UTC) Always present in serialized output
request_id Optional[str] No null Correlates requests with responses

The timestamp field is always populated (defaults to current UTC time). It is not optional – every message includes a timestamp.

Serialization

WSMessage provides two methods for JSON serialization:

from src.api.websocket.models import WSMessage, WSMessageType

# Create a message
msg = WSMessage(
    type=WSMessageType.CHAT_QUERY,
    payload={"query": "Find vulnerabilities"},
    request_id="req-001"
)

# Serialize to JSON string
json_str = msg.to_json()  # -> '{"type": "chat.query", "payload": {...}, "timestamp": "...", "request_id": "req-001"}'

# Deserialize from JSON string
msg = WSMessage.from_json(json_str)  # classmethod

Keep-Alive

Send periodic ping messages to prevent connection timeout:

setInterval(() => {
  if (ws.readyState === WebSocket.OPEN) {
    ws.send(JSON.stringify({ type: 'ping', payload: {} }));
  }
}, 30000); // Every 30 seconds

The server responds with a pong message.


Error Handling

Error Message Format

{
  "type": "error",
  "payload": {
    "error": "Error description",
    "details": "Additional details"
  },
  "timestamp": "2026-03-07T10:30:00Z",
  "request_id": "original-request-id"
}

Common Errors

Error Cause Solution
Invalid message format Malformed JSON Check message structure
Query is required Empty query Provide query text
Unknown message type Unsupported type Use documented types
Job not found Invalid job ID (close code 4004) Verify job exists via REST API
Invalid token Missing or expired JWT (close code 4001) Provide a valid token

Server-Side API

This section documents the Python server-side classes for building WebSocket integrations.

WebSocketManager

WebSocketManager (src/api/websocket/manager.py) manages all active connections, user sessions, and job subscriptions. Obtain the singleton via get_ws_manager().

Connection management:

Method Signature Returns Description
connect (user_id, websocket) str (connection ID) Register a new connection
disconnect (user_id, conn_id) Remove a connection
send_to_connection (user_id, conn_id, message) bool Send to a specific connection
send_to_user (user_id, message) int Send to all connections for a user; returns count
broadcast (message, exclude_users?) int Send to all connected users; returns count

Job subscriptions:

Method Signature Returns Description
subscribe_to_job (user_id, job_id) bool Subscribe a user to job updates
unsubscribe_from_job (user_id, job_id) Remove job subscription
send_job_update (job_id, message) int Send update to all job subscribers; returns count
cleanup_job_subscriptions (job_id) Remove all subscriptions for a job

Status queries:

Method Signature Returns Description
get_connection_count (user_id?) int Connection count (all users or specific user)
get_user_count () int Number of connected users
is_user_connected (user_id) bool Check if user has active connections

Handler Classes

Three handler classes (src/api/websocket/handlers.py) provide domain-specific logic. Each is initialized with a WebSocketManager instance and has a singleton accessor.

ChatHandlerget_chat_handler()

Method Signature Description
handle_query (user_id, conn_id, message) Process a chat query and stream response

JobHandlerget_job_handler()

Method Signature Description
handle_subscribe (user_id, conn_id, job_id) Subscribe connection to job updates
handle_unsubscribe (user_id, job_id) Unsubscribe from job updates
send_job_update (job_id, status, progress, result?, error?) Push a job status update to subscribers

NotificationHandlerget_notification_handler()

Method Signature Returns Description
send_notification (user_id, title, message, notification_type?, data?) bool Send notification to a specific user
broadcast_notification (title, message, notification_type?) int Broadcast notification to all users

Helper Functions

The models.py module provides factory functions that create pre-configured WSMessage instances:

Function Signature Creates
create_chat_chunk (content, request_id?, is_final?) CHAT_CHUNK message
create_chat_response (answer, scenario_id, confidence, session_id, request_id?) CHAT_RESPONSE message
create_job_progress (job_id, progress, message?) JOB_PROGRESS message
create_job_completed (job_id, result) JOB_COMPLETED message
create_job_failed (job_id, error, details?) JOB_FAILED message
create_notification (title, message, level?, action_url?) NOTIFICATION message
create_error (error, details?, request_id?) ERROR message

Example usage:

from src.api.websocket.models import create_chat_chunk, create_job_progress

# Create a streaming chunk
chunk = create_chat_chunk("Processing your query...", request_id="req-001")

# Create a job progress update
progress = create_job_progress("job-abc", progress=45, message="Analyzing files...")

Python Client Example

import asyncio
import json
import websockets

async def chat_client(token: str, query: str):
    uri = f"ws://localhost:8000/api/v1/ws/chat?token={token}"

    async with websockets.connect(uri) as ws:
        # Wait for connected message
        connected = json.loads(await ws.recv())
        assert connected["type"] == "connected"
        print(f"Connected: {connected['payload']['connection_id']}")

        # Send query
        await ws.send(json.dumps({
            "type": "chat.query",
            "payload": {"query": query},
            "request_id": "req-001"
        }))

        # Receive streaming response
        full_response = ""
        async for message in ws:
            data = json.loads(message)

            if data["type"] == "chat.chunk":
                content = data["payload"]["content"]
                full_response += content
                print(content, end="", flush=True)

            elif data["type"] == "chat.response":
                confidence = data["payload"]["confidence"]
                scenario = data["payload"]["scenario_id"]
                print(f"\n[scenario={scenario}, confidence={confidence}]")

            elif data["type"] == "chat.done":
                print("\n--- Response complete ---")
                break

            elif data["type"] == "error":
                print(f"Error: {data['payload']['error']}")
                break

        return full_response

# Usage
asyncio.run(chat_client("your-jwt-token", "Find buffer overflows"))

JavaScript Client Example

class ChatClient {
  constructor(token) {
    this.token = token;
    this.ws = null;
    this.handlers = new Map();
    this.connectionId = null;
  }

  connect() {
    return new Promise((resolve, reject) => {
      this.ws = new WebSocket(
        `ws://localhost:8000/api/v1/ws/chat?token=${this.token}`
      );

      this.ws.onerror = (error) => reject(error);

      this.ws.onmessage = (event) => {
        const message = JSON.parse(event.data);

        // Handle connection lifecycle
        if (message.type === 'connected') {
          this.connectionId = message.payload.connection_id;
          resolve(this);
          return;
        }

        const handler = this.handlers.get(message.request_id);
        if (handler) {
          handler(message);
        }
      };
    });
  }

  async query(text, onChunk) {
    const requestId = `req-${Date.now()}`;

    return new Promise((resolve, reject) => {
      let fullResponse = '';

      this.handlers.set(requestId, (message) => {
        switch (message.type) {
          case 'chat.chunk':
            fullResponse += message.payload.content;
            onChunk?.(message.payload.content);
            break;
          case 'chat.response':
            // Full response with metadata
            break;
          case 'chat.done':
            this.handlers.delete(requestId);
            resolve(fullResponse);
            break;
          case 'chat.error':
          case 'error':
            this.handlers.delete(requestId);
            reject(new Error(message.payload.error));
            break;
        }
      });

      this.ws.send(JSON.stringify({
        type: 'chat.query',
        payload: { query: text },
        request_id: requestId
      }));
    });
  }

  close() {
    this.ws?.close();
  }
}

// Usage
const client = await new ChatClient('jwt-token').connect();
const response = await client.query(
  'Find SQL injections',
  (chunk) => process.stdout.write(chunk)
);
console.log('\nFull response:', response);
client.close();

Rate Limits

The WebSocket API currently does not enforce per-user connection limits, per-connection message rate limits, or idle connection timeouts.

Parameter Value Status
Max message size 64 KB Enforced (FastAPI/Starlette default)
Max connections per user Not enforced
Messages per second Not enforced
Idle connection timeout Not enforced
Recommended ping interval 30 sec Advisory (client-side)

The 64 KB maximum message size (websocket_max_message: 65536) is the only enforced limit. Messages exceeding this size will cause the connection to close.


See Also