This document describes the WebSocket API for real-time communication with CodeGraph.
Table of Contents
- Overview
- Authentication
- Connection Errors
- Connection Lifecycle
- Endpoints
- Chat Streaming
- Job Status
- Notifications
- Dashboard Updates
- Message Types
- Client to Server
- Server to Client
- Message Format
- WSMessage Model
- Serialization
- Keep-Alive
- Error Handling
- Error Message Format
- Common Errors
- Server-Side API
- WebSocketManager
- Handler Classes
- Helper Functions
- Python Client Example
- JavaScript Client Example
- Rate Limits
- See Also
Overview
The WebSocket API provides:
- Streaming chat responses – real-time AI response streaming
- Job progress updates – background job status notifications
- Push notifications – system alerts and notifications
Base URL: ws://localhost:8000/api/v1/ws
The WebSocket router is mounted at /api/v1/ws in src/api/main.py.
Dashboard-specific real-time updates are exposed separately at ws://localhost:8000/api/v2/dashboard/ws.
Authentication
All WebSocket connections require a JWT token passed as a query parameter.
ws://localhost:8000/api/v1/ws/chat?token=<JWT_TOKEN>
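Because the token travels in the query string, it is safer to URL-encode it than to concatenate it directly. The following is a minimal sketch; `build_ws_url` is a hypothetical helper (not part of CodeGraph), and the base URL is the local default shown in the Overview.

```python
from urllib.parse import urlencode

# Base URL taken from the Overview section; adjust for your deployment.
BASE_URL = "ws://localhost:8000/api/v1/ws"


def build_ws_url(path: str, token: str, base_url: str = BASE_URL) -> str:
    """Return a WebSocket URL with the JWT passed as the `token` query parameter.

    Illustrative helper only; the name and signature are assumptions.
    """
    query = urlencode({"token": token})  # percent-encodes unsafe characters
    return f"{base_url}/{path.lstrip('/')}?{query}"


if __name__ == "__main__":
    print(build_ws_url("chat", "abc.def.ghi"))
```

The same helper can build the job and notification URLs by passing `jobs/<job_id>` or `notifications` as the path.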
Connection Errors
| Code | Reason | Description |
|---|---|---|
| 4001 | Invalid token | Token is missing, expired, or invalid |
| 4004 | Job not found | The requested job ID does not exist |
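A client can translate these close codes into readable reasons when the connection drops. This is a small sketch built only from the two codes in the table above; the function names are illustrative assumptions.

```python
# Application-level close codes documented in the table above.
CLOSE_CODES = {
    4001: "Invalid token: the token is missing, expired, or invalid",
    4004: "Job not found: the requested job ID does not exist",
}


def describe_close(code: int) -> str:
    """Map a WebSocket close code to the documented reason, if any."""
    return CLOSE_CODES.get(code, f"Unrecognized close code {code}")


def needs_new_token(code: int) -> bool:
    """Only 4001 indicates that reconnecting requires a fresh JWT."""
    return code == 4001
```

Any code outside the table (for example the standard 1000 for a normal closure) falls through to the generic message.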
Connection Lifecycle
When a client connects to any WebSocket endpoint, the server follows this lifecycle:
- Authentication – the server validates the JWT token from the token query parameter. If invalid, the connection is closed with code 4001.
- CONNECTED – on successful connection, the server automatically sends a connected message:
{
"type": "connected",
"payload": {
"connection_id": "conn-abc123",
"user_id": "user-456"
},
"timestamp": "2026-03-07T10:30:00Z"
}
- AUTH_REQUIRED – if the server requires re-authentication (e.g., token expiry during session), it sends:
{
"type": "auth_required",
"payload": {},
"timestamp": "2026-03-07T10:30:00Z"
}
- AUTHENTICATED – sent after successful re-authentication:
{
"type": "authenticated",
"payload": {},
"timestamp": "2026-03-07T10:30:00Z"
}
- DISCONNECTED – sent before the server closes the connection:
{
"type": "disconnected",
"payload": {},
"timestamp": "2026-03-07T10:35:00Z"
}
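The lifecycle messages above can be tracked on the client with a small state holder. This is a sketch under stated assumptions: the class name and the status strings (`connecting`, `connected`, `auth_required`, `disconnected`) are illustrative, not part of the API.

```python
import json
from typing import Optional


class ConnectionState:
    """Tracks lifecycle messages received from the server (illustrative only)."""

    def __init__(self) -> None:
        self.connection_id: Optional[str] = None
        self.user_id: Optional[str] = None
        self.status = "connecting"

    def handle(self, raw: str) -> str:
        """Update the state from one raw JSON message and return the new status."""
        message = json.loads(raw)
        msg_type = message["type"]
        payload = message.get("payload", {})
        if msg_type == "connected":
            self.connection_id = payload.get("connection_id")
            self.user_id = payload.get("user_id")
            self.status = "connected"
        elif msg_type == "auth_required":
            self.status = "auth_required"
        elif msg_type == "authenticated":
            self.status = "connected"
        elif msg_type == "disconnected":
            self.status = "disconnected"
        # Any other message type leaves the lifecycle status unchanged.
        return self.status
```

Non-lifecycle messages such as `chat.chunk` pass through without changing the status, so the same object can sit in front of a regular message dispatcher.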
Endpoints
Chat Streaming
Endpoint: ws://localhost:8000/api/v1/ws/chat
Real-time chat with streaming responses.
Connection
const ws = new WebSocket('ws://localhost:8000/api/v1/ws/chat?token=' + token);
ws.onopen = () => {
console.log('Connected to chat');
};
ws.onmessage = (event) => {
const message = JSON.parse(event.data);
console.log('Received:', message.type, message.payload);
};
ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
ws.onclose = (event) => {
console.log('Disconnected:', event.code, event.reason);
};
Send Chat Query
ws.send(JSON.stringify({
type: 'chat.query',
payload: {
query: 'Find all SQL injection vulnerabilities',
session_id: 'optional-session-id',
scenario_id: 'optional-scenario-id',
language: 'en'
},
request_id: 'unique-request-id'
}));
ChatQueryPayload fields:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| query | string | Yes | – | The query text |
| session_id | string | No | null | Session identifier for context continuity |
| scenario_id | string | No | null | Force a specific scenario |
| language | string | No | "en" | Response language (en or ru) |
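The field rules in the table can be checked on the client before a message is sent. The sketch below is an assumption-laden illustration: `build_chat_query` is a hypothetical helper, and generating a UUID when no `request_id` is supplied is a choice made here, not documented server behavior.

```python
import json
import uuid
from typing import Optional


def build_chat_query(
    query: str,
    session_id: Optional[str] = None,
    scenario_id: Optional[str] = None,
    language: str = "en",
    request_id: Optional[str] = None,
) -> str:
    """Serialize a chat.query message, applying the ChatQueryPayload rules."""
    if not query or not query.strip():
        raise ValueError("query is required")
    if language not in ("en", "ru"):
        raise ValueError("language must be 'en' or 'ru'")

    payload = {"query": query, "language": language}
    # Optional fields are omitted rather than sent as null.
    if session_id is not None:
        payload["session_id"] = session_id
    if scenario_id is not None:
        payload["scenario_id"] = scenario_id

    return json.dumps({
        "type": "chat.query",
        "payload": payload,
        "request_id": request_id or str(uuid.uuid4()),
    })
```

Rejecting an empty query locally avoids a round trip that would otherwise end in the "Query is required" error.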
Receive Streaming Response
Messages arrive in sequence:
- Scenario Selection (optional)
{
"type": "chat.scenario",
"payload": { "scenario_id": "security_audit" },
"timestamp": "2026-03-07T10:30:00Z",
"request_id": "unique-request-id"
}
- Response Chunks (multiple)
{
"type": "chat.chunk",
"payload": { "content": "Found 3 potential ", "is_final": false },
"timestamp": "2026-03-07T10:30:01Z",
"request_id": "unique-request-id"
}
- Chat Response – the full assembled answer:
{
"type": "chat.response",
"payload": {
"answer": "Found 3 potential SQL injection vulnerabilities in...",
"scenario_id": "security_audit",
"confidence": 0.92,
"session_id": "sess-abc123"
},
"timestamp": "2026-03-07T10:30:04Z",
"request_id": "unique-request-id"
}
- Completion
{
"type": "chat.done",
"payload": {},
"timestamp": "2026-03-07T10:30:05Z",
"request_id": "unique-request-id"
}
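The message sequence above can be reduced to a final answer with a simple loop. This sketch works on already-parsed messages (plain dicts); `collect_stream` is an illustrative name, and filtering by `request_id` assumes several queries may share one connection.

```python
from typing import Iterable, Optional, Tuple


def collect_stream(messages: Iterable[dict], request_id: str) -> Tuple[str, Optional[dict]]:
    """Join chat.chunk contents for one request and capture the chat.response payload.

    Stops at chat.done. Messages belonging to other requests are skipped.
    """
    chunks = []
    final_payload = None
    for message in messages:
        if message.get("request_id") != request_id:
            continue
        msg_type = message["type"]
        if msg_type == "chat.chunk":
            chunks.append(message["payload"]["content"])
        elif msg_type == "chat.response":
            final_payload = message["payload"]
        elif msg_type == "chat.done":
            break
        # chat.scenario is optional and carries no answer text, so it is ignored here.
    return "".join(chunks), final_payload
```

The returned text is the concatenation of the chunks; the `chat.response` payload, when present, carries the assembled answer together with `confidence` and `session_id`.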
Job Status
Endpoint: ws://localhost:8000/api/v1/ws/jobs/{job_id}
Subscribe to background job progress updates. If the job ID does not exist, the connection is closed with code 4004.
Connection
const jobId = 'abc123';
const ws = new WebSocket(`ws://localhost:8000/api/v1/ws/jobs/${jobId}?token=${token}`);
Messages
Job Started
{
"type": "job.started",
"payload": {
"job_id": "abc123",
"status": "started",
"progress": 0,
"job_type": "import",
"started_at": "2026-03-07T10:30:00Z",
"created_at": "2026-03-07T10:29:55Z"
},
"timestamp": "2026-03-07T10:30:00Z"
}
Job Progress
{
"type": "job.progress",
"payload": {
"job_id": "abc123",
"progress": 45,
"message": "Processing files..."
},
"timestamp": "2026-03-07T10:31:00Z"
}
Job Completed
{
"type": "job.completed",
"payload": {
"job_id": "abc123",
"status": "completed",
"result": {
"files_processed": 150,
"vulnerabilities_found": 3
},
"error": null,
"completed_at": "2026-03-07T10:35:00Z"
},
"timestamp": "2026-03-07T10:35:00Z"
}
Job Failed
{
"type": "job.failed",
"payload": {
"job_id": "abc123",
"status": "failed",
"result": null,
"error": "Connection timeout",
"completed_at": "2026-03-07T10:35:00Z"
},
"timestamp": "2026-03-07T10:35:00Z"
}
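A subscriber usually needs to know two things from each job message: what to display and whether the job is finished. The following sketch derives both from the four message types above; the function name and the summary wording are illustrative assumptions.

```python
from typing import Tuple

# Message types after which no further updates are expected for the job.
TERMINAL_TYPES = {"job.completed", "job.failed"}


def summarize_job_message(message: dict) -> Tuple[str, bool]:
    """Return a one-line summary and a flag indicating whether the job has ended."""
    msg_type = message["type"]
    payload = message.get("payload", {})
    job_id = payload.get("job_id", "?")
    if msg_type == "job.started":
        summary = f"{job_id}: started ({payload.get('job_type', 'unknown')})"
    elif msg_type == "job.progress":
        note = payload.get("message") or ""  # message is optional
        summary = f"{job_id}: {payload['progress']}% {note}".rstrip()
    elif msg_type == "job.completed":
        summary = f"{job_id}: completed"
    elif msg_type == "job.failed":
        summary = f"{job_id}: failed ({payload.get('error')})"
    else:
        summary = f"{job_id}: ignored message type {msg_type}"
    return summary, msg_type in TERMINAL_TYPES
```

A receive loop can close the socket as soon as the returned flag is true.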
Notifications
Endpoint: ws://localhost:8000/api/v1/ws/notifications
Receive push notifications for system events.
Connection
const ws = new WebSocket('ws://localhost:8000/api/v1/ws/notifications?token=' + token);
Messages
Notification
{
"type": "notification",
"payload": {
"title": "Import Complete",
"message": "Project 'myapp' has been imported successfully",
"type": "success",
"data": {
"project_id": "abc123",
"files_imported": 150
}
},
"timestamp": "2026-03-07T10:35:00Z"
}
The type field inside the payload indicates the notification category. Values: info, success, warning, error. The data field is optional and can contain arbitrary additional context.
Note: The NotificationPayload Pydantic model defines a level field, but the NotificationHandler.send_notification() method sends the value under the key type in the actual WebSocket message payload.
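Given the `type` versus `level` naming difference noted above, a client may want to read the category defensively. This is a sketch, not documented behavior: falling back to `level` and defaulting to `info` are assumptions made here, and both function names are hypothetical.

```python
KNOWN_CATEGORIES = {"info", "success", "warning", "error"}


def notification_category(payload: dict) -> str:
    """Read the category from `type`, falling back to `level` (assumed fallback)."""
    category = payload.get("type") or payload.get("level") or "info"
    # Unknown values are treated as "info" rather than rejected (assumption).
    return category if category in KNOWN_CATEGORIES else "info"


def format_notification(message: dict) -> str:
    """Render a notification message as a single display line."""
    payload = message["payload"]
    label = notification_category(payload).upper()
    return f"[{label}] {payload['title']}: {payload['message']}"
```

Note that the outer message `type` is always `notification`; only the `type` inside the payload carries the category.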
Dashboard Updates
Endpoint: ws://localhost:8000/api/v2/dashboard/ws
Receive lightweight dashboard update events and heartbeats for the CISO/CTO frontend.
Connection
const ws = new WebSocket('ws://localhost:8000/api/v2/dashboard/ws?token=' + token);
Messages
Connected
{
"event": "connected",
"client_id": "ws-1",
"heartbeat_seconds": 15
}
Project Updated
{
"event": "project_updated",
"project_name": "payments-api",
"trigger": "release_gate",
"timestamp": 1773326400.0
}
Heartbeat
{
"event": "heartbeat",
"timestamp": 1773326415.0,
"clients": 4
}
Clients may send a plain ping message and receive {"event":"pong"} in response.
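Dashboard events differ from the v1 messages in two visible ways: they use an `event` key instead of `type`, and the timestamps in the examples are Unix epoch seconds rather than ISO 8601 strings. A minimal parsing sketch, with `parse_dashboard_event` as a hypothetical helper name:

```python
import json
from datetime import datetime, timezone
from typing import Optional, Tuple


def parse_dashboard_event(raw: str) -> Tuple[str, Optional[datetime], dict]:
    """Return (event name, UTC timestamp or None, full event dict)."""
    event = json.loads(raw)
    ts = event.get("timestamp")  # absent on the "connected" event
    when = datetime.fromtimestamp(ts, tz=timezone.utc) if ts is not None else None
    return event["event"], when, event
```

Converting to a timezone-aware `datetime` keeps dashboard timestamps comparable with the ISO timestamps used by the v1 endpoints.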
Message Types
Client to Server
| Type | Description | Payload |
|---|---|---|
| chat.query | Send chat query | query, session_id?, scenario_id?, language? |
| ping | Keep connection alive | None |
Server to Client
| Type | Enum Value | Description | Payload |
|---|---|---|---|
| connected | CONNECTED | Connection established | connection_id, user_id |
| disconnected | DISCONNECTED | Connection closing | None |
| authenticated | AUTHENTICATED | Authentication confirmed | None |
| auth_required | AUTH_REQUIRED | Re-authentication needed | None |
| chat.scenario | CHAT_SCENARIO | Scenario selection | scenario_id |
| chat.chunk | CHAT_CHUNK | Response chunk (ChatChunkPayload) | content, is_final |
| chat.response | CHAT_RESPONSE | Full response (ChatResponsePayload) | answer, scenario_id, confidence, session_id |
| chat.done | CHAT_DONE | Response complete | None |
| chat.error | CHAT_ERROR | Chat error | error, details? |
| job.started | JOB_STARTED | Job subscribed | job_id, status, progress, job_type, started_at, created_at |
| job.progress | JOB_PROGRESS | Job progress (JobProgressPayload) | job_id, progress, message? |
| job.completed | JOB_COMPLETED | Job finished (JobCompletedPayload) | job_id, status, result, error, completed_at |
| job.failed | JOB_FAILED | Job failed (JobFailedPayload) | job_id, status, result, error, completed_at |
| notification | NOTIFICATION | Push notification | title, message, type, data? |
| error | ERROR | General error | error, details? |
| pong | PONG | Ping response | None |
| cpg.update.complete | CPG_UPDATE_COMPLETE | CPG database updated | None |
All 19 values (the 2 client-to-server types and the 17 server-to-client types listed above) are defined in the WSMessageType enum in src/api/websocket/models.py.
Message Format
WSMessage Model
All messages are instances of the WSMessage Pydantic model (src/api/websocket/models.py):
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | WSMessageType | Yes | – | Enum value (see Message Types) |
| payload | Dict[str, Any] | No | {} | Message-specific data |
| timestamp | datetime | No | datetime.now(UTC) | Always present in serialized output |
| request_id | Optional[str] | No | null | Correlates requests with responses |
The timestamp field is always populated (defaults to current UTC time). It is not optional – every message includes a timestamp.
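A client that does not import the server package can still mirror this envelope when parsing. The sketch below is a client-side stand-in, not the actual WSMessage model: the `Envelope` dataclass and `parse_envelope` function are hypothetical names, and the timestamp handling assumes the ISO 8601 format with a trailing `Z` shown in the examples.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Optional


@dataclass
class Envelope:
    """Client-side mirror of the four WSMessage fields (illustrative only)."""
    type: str
    timestamp: datetime
    payload: Dict[str, Any] = field(default_factory=dict)
    request_id: Optional[str] = None


def parse_envelope(raw: str) -> Envelope:
    """Parse one JSON message into an Envelope with a timezone-aware timestamp."""
    data = json.loads(raw)
    # fromisoformat() accepts a trailing "Z" only on newer Python versions,
    # so it is rewritten to an explicit UTC offset first.
    timestamp = datetime.fromisoformat(data["timestamp"].replace("Z", "+00:00"))
    return Envelope(
        type=data["type"],
        timestamp=timestamp,
        payload=data.get("payload", {}),
        request_id=data.get("request_id"),
    )
```

Because every message includes a timestamp, the parser treats a missing `timestamp` key as an error rather than supplying a default.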
Serialization
WSMessage provides two methods for JSON serialization:
from src.api.websocket.models import WSMessage, WSMessageType
# Create a message
msg = WSMessage(
type=WSMessageType.CHAT_QUERY,
payload={"query": "Find vulnerabilities"},
request_id="req-001"
)
# Serialize to JSON string
json_str = msg.to_json() # -> '{"type": "chat.query", "payload": {...}, "timestamp": "...", "request_id": "req-001"}'
# Deserialize from JSON string
msg = WSMessage.from_json(json_str) # classmethod
Keep-Alive
Send periodic ping messages to prevent connection timeout:
setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'ping', payload: {} }));
}
}, 30000); // Every 30 seconds
The server responds with a pong message.
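The same keep-alive loop can be written for a Python client with asyncio. This is a sketch under assumptions: `keep_alive` is a hypothetical helper, `send` stands for any coroutine function that transmits a text frame (for example the `send` method of a `websockets` connection), and `max_pings` exists only so the loop can be bounded in tests.

```python
import asyncio
import json
from typing import Awaitable, Callable, Optional


async def keep_alive(
    send: Callable[[str], Awaitable[None]],
    interval: float = 30.0,
    max_pings: Optional[int] = None,
) -> int:
    """Send a ping message every `interval` seconds; return the number sent.

    Runs until cancelled when `max_pings` is None.
    """
    sent = 0
    while max_pings is None or sent < max_pings:
        await asyncio.sleep(interval)
        await send(json.dumps({"type": "ping", "payload": {}}))
        sent += 1
    return sent
```

In a real client this would typically run as a background task created with `asyncio.create_task(keep_alive(ws.send))` and be cancelled when the connection closes.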
Error Handling
Error Message Format
{
"type": "error",
"payload": {
"error": "Error description",
"details": "Additional details"
},
"timestamp": "2026-03-07T10:30:00Z",
"request_id": "original-request-id"
}
Common Errors
| Error | Cause | Solution |
|---|---|---|
| Invalid message format | Malformed JSON | Check message structure |
| Query is required | Empty query | Provide query text |
| Unknown message type | Unsupported type | Use documented types |
| Job not found | Invalid job ID (close code 4004) | Verify job exists via REST API |
| Invalid token | Missing or expired JWT (close code 4001) | Provide a valid token |
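On the client, error messages can be converted into exceptions so that callers do not need to inspect every message type. A minimal sketch, assuming a hypothetical `WebSocketAPIError` class and treating both `error` and `chat.error` as failures, since both carry `error` and `details?` in their payloads:

```python
from typing import Optional


class WebSocketAPIError(Exception):
    """Raised when the server sends an error or chat.error message (illustrative)."""

    def __init__(self, error: str, details: Optional[str] = None,
                 request_id: Optional[str] = None) -> None:
        super().__init__(error)
        self.details = details
        self.request_id = request_id


def raise_for_error(message: dict) -> None:
    """Raise WebSocketAPIError for error messages; do nothing for any other type."""
    if message.get("type") in ("error", "chat.error"):
        payload = message.get("payload", {})
        raise WebSocketAPIError(
            payload.get("error", "Unknown error"),
            payload.get("details"),
            message.get("request_id"),
        )
```

Errors signalled through close codes (4001, 4004) never arrive as messages, so they still need to be handled in the connection's close handler.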
Server-Side API
This section documents the Python server-side classes for building WebSocket integrations.
WebSocketManager
WebSocketManager (src/api/websocket/manager.py) manages all active connections, user sessions, and job subscriptions. Obtain the singleton via get_ws_manager().
Connection management:
| Method | Signature | Returns | Description |
|---|---|---|---|
| connect | (user_id, websocket) | str (connection ID) | Register a new connection |
| disconnect | (user_id, conn_id) | – | Remove a connection |
| send_to_connection | (user_id, conn_id, message) | bool | Send to a specific connection |
| send_to_user | (user_id, message) | int | Send to all connections for a user; returns count |
| broadcast | (message, exclude_users?) | int | Send to all connected users; returns count |
Job subscriptions:
| Method | Signature | Returns | Description |
|---|---|---|---|
| subscribe_to_job | (user_id, job_id) | bool | Subscribe a user to job updates |
| unsubscribe_from_job | (user_id, job_id) | – | Remove job subscription |
| send_job_update | (job_id, message) | int | Send update to all job subscribers; returns count |
| cleanup_job_subscriptions | (job_id) | – | Remove all subscriptions for a job |
Status queries:
| Method | Signature | Returns | Description |
|---|---|---|---|
| get_connection_count | (user_id?) | int | Connection count (all users or specific user) |
| get_user_count | () | int | Number of connected users |
| is_user_connected | (user_id) | bool | Check if user has active connections |
Handler Classes
Three handler classes (src/api/websocket/handlers.py) provide domain-specific logic. Each is initialized with a WebSocketManager instance and has a singleton accessor.
ChatHandler – get_chat_handler()
| Method | Signature | Description |
|---|---|---|
| handle_query | (user_id, conn_id, message) | Process a chat query and stream response |
JobHandler – get_job_handler()
| Method | Signature | Description |
|---|---|---|
| handle_subscribe | (user_id, conn_id, job_id) | Subscribe connection to job updates |
| handle_unsubscribe | (user_id, job_id) | Unsubscribe from job updates |
| send_job_update | (job_id, status, progress, result?, error?) | Push a job status update to subscribers |
NotificationHandler – get_notification_handler()
| Method | Signature | Returns | Description |
|---|---|---|---|
| send_notification | (user_id, title, message, notification_type?, data?) | bool | Send notification to a specific user |
| broadcast_notification | (title, message, notification_type?) | int | Broadcast notification to all users |
Helper Functions
The models.py module provides factory functions that create pre-configured WSMessage instances:
| Function | Signature | Creates |
|---|---|---|
| create_chat_chunk | (content, request_id?, is_final?) | CHAT_CHUNK message |
| create_chat_response | (answer, scenario_id, confidence, session_id, request_id?) | CHAT_RESPONSE message |
| create_job_progress | (job_id, progress, message?) | JOB_PROGRESS message |
| create_job_completed | (job_id, result) | JOB_COMPLETED message |
| create_job_failed | (job_id, error, details?) | JOB_FAILED message |
| create_notification | (title, message, level?, action_url?) | NOTIFICATION message |
| create_error | (error, details?, request_id?) | ERROR message |
Example usage:
from src.api.websocket.models import create_chat_chunk, create_job_progress
# Create a streaming chunk
chunk = create_chat_chunk("Processing your query...", request_id="req-001")
# Create a job progress update
progress = create_job_progress("job-abc", progress=45, message="Analyzing files...")
Python Client Example
import asyncio
import json

import websockets


async def chat_client(token: str, query: str) -> str:
    uri = f"ws://localhost:8000/api/v1/ws/chat?token={token}"
    async with websockets.connect(uri) as ws:
        # Wait for connected message
        connected = json.loads(await ws.recv())
        assert connected["type"] == "connected"
        print(f"Connected: {connected['payload']['connection_id']}")

        # Send query
        await ws.send(json.dumps({
            "type": "chat.query",
            "payload": {"query": query},
            "request_id": "req-001"
        }))

        # Receive streaming response
        full_response = ""
        async for message in ws:
            data = json.loads(message)
            if data["type"] == "chat.chunk":
                content = data["payload"]["content"]
                full_response += content
                print(content, end="", flush=True)
            elif data["type"] == "chat.response":
                confidence = data["payload"]["confidence"]
                scenario = data["payload"]["scenario_id"]
                print(f"\n[scenario={scenario}, confidence={confidence}]")
            elif data["type"] == "chat.done":
                print("\n--- Response complete ---")
                break
            elif data["type"] in ("chat.error", "error"):
                # Both error types end the stream; without this the loop would wait forever
                print(f"Error: {data['payload']['error']}")
                break
        return full_response


# Usage
asyncio.run(chat_client("your-jwt-token", "Find buffer overflows"))
JavaScript Client Example
class ChatClient {
constructor(token) {
this.token = token;
this.ws = null;
this.handlers = new Map();
this.connectionId = null;
}
connect() {
return new Promise((resolve, reject) => {
this.ws = new WebSocket(
`ws://localhost:8000/api/v1/ws/chat?token=${this.token}`
);
this.ws.onerror = (error) => reject(error);
this.ws.onmessage = (event) => {
const message = JSON.parse(event.data);
// Handle connection lifecycle
if (message.type === 'connected') {
this.connectionId = message.payload.connection_id;
resolve(this);
return;
}
const handler = this.handlers.get(message.request_id);
if (handler) {
handler(message);
}
};
});
}
async query(text, onChunk) {
const requestId = `req-${Date.now()}`;
return new Promise((resolve, reject) => {
let fullResponse = '';
this.handlers.set(requestId, (message) => {
switch (message.type) {
case 'chat.chunk':
fullResponse += message.payload.content;
onChunk?.(message.payload.content);
break;
case 'chat.response':
// Full response with metadata
break;
case 'chat.done':
this.handlers.delete(requestId);
resolve(fullResponse);
break;
case 'chat.error':
case 'error':
this.handlers.delete(requestId);
reject(new Error(message.payload.error));
break;
}
});
this.ws.send(JSON.stringify({
type: 'chat.query',
payload: { query: text },
request_id: requestId
}));
});
}
close() {
this.ws?.close();
}
}
// Usage
const client = await new ChatClient('jwt-token').connect();
const response = await client.query(
'Find SQL injections',
(chunk) => process.stdout.write(chunk)
);
console.log('\nFull response:', response);
client.close();
Rate Limits
The WebSocket API currently does not enforce per-user connection limits, per-connection message rate limits, or idle connection timeouts.
| Parameter | Value | Status |
|---|---|---|
| Max message size | 64 KB | Enforced (FastAPI/Starlette default) |
| Max connections per user | – | Not enforced |
| Messages per second | – | Not enforced |
| Idle connection timeout | – | Not enforced |
| Recommended ping interval | 30 sec | Advisory (client-side) |
The 64 KB maximum message size (websocket_max_message: 65536) is the only enforced limit. Messages exceeding this size will cause the connection to close.
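Since an oversized message closes the connection, it can help to check the encoded size before sending. This is a sketch that assumes the limit applies to the UTF-8 encoded JSON text; `encode_within_limit` is a hypothetical helper name.

```python
import json

MAX_MESSAGE_BYTES = 65536  # 64 KB limit documented above


def encode_within_limit(message: dict, limit: int = MAX_MESSAGE_BYTES) -> str:
    """Serialize a message to JSON, raising ValueError if it exceeds the size limit."""
    text = json.dumps(message)
    size = len(text.encode("utf-8"))  # count bytes, not characters
    if size > limit:
        raise ValueError(f"message is {size} bytes; limit is {limit}")
    return text
```

Counting bytes rather than characters matters for non-ASCII queries, for example those sent with `language` set to `ru`.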
See Also
- REST API Reference – HTTP API documentation
- ACP Integration – JSON-RPC 2.0 protocol with WebSocket transport (/api/v1/acp/ws)