В этом документе описывается WebSocket API для работы с CodeGraph в режиме реального времени.
Содержание¶
- Обзор
- Аутентификация
- Ошибки соединения
- Жизненный цикл соединения
- Конечные точки
- Потоковый чат
- Статус задачи
- Уведомления
- Обновления дашборда
- Типы сообщений
- Клиент к серверу
- Сервер к клиенту
- Формат сообщений
- Модель WSMessage
- Сериализация
- Поддержание соединения
- Обработка ошибок
- Формат сообщения об ошибке
- Распространённые ошибки
- Серверный API
- WebSocketManager
- Классы обработчиков
- Вспомогательные функции
- Пример клиента на Python
- Пример клиента на JavaScript
- Ограничения
- См. также
Обзор¶
WebSocket API предоставляет: - Потоковую передачу ответов чата – потоковая передача ответов ИИ в реальном времени - Обновления хода выполнения задач – уведомления о состоянии фоновых задач - Серверные уведомления – системные оповещения и уведомления
Базовый URL: ws://localhost:8000/api/v1/ws
WebSocket-маршрутизатор подключён по пути /api/v1/ws в src/api/main.py.
Отдельные обновления дашборда публикуются по адресу ws://localhost:8000/api/v2/dashboard/ws.
Аутентификация¶
Все WebSocket-соединения требуют JWT-токен, передаваемый как параметр запроса.
ws://localhost:8000/api/v1/ws/chat?token=<JWT_TOKEN>
Ошибки соединения¶
| Код | Причина | Описание |
|---|---|---|
| 4001 | Invalid token | Токен отсутствует, просрочен или недействителен |
| 4004 | Job not found | Запрошенный идентификатор задачи не существует |
Жизненный цикл соединения¶
При подключении клиента к любой конечной точке WebSocket сервер проходит следующие этапы:
-
Аутентификация – сервер проверяет JWT-токен из параметра запроса
token. Если токен недействителен, соединение закрывается с кодом4001. -
CONNECTED – при успешном подключении сервер автоматически отправляет сообщение
connected:
{
"type": "connected",
"payload": {
"connection_id": "conn-abc123",
"user_id": "user-456"
},
"timestamp": "2026-03-07T10:30:00Z"
}
- AUTH_REQUIRED – если серверу требуется повторная аутентификация (например, истечение токена во время сессии):
{
"type": "auth_required",
"payload": {},
"timestamp": "2026-03-07T10:30:00Z"
}
- AUTHENTICATED – отправляется после успешной повторной аутентификации:
{
"type": "authenticated",
"payload": {},
"timestamp": "2026-03-07T10:30:00Z"
}
- DISCONNECTED – отправляется перед закрытием соединения:
{
"type": "disconnected",
"payload": {},
"timestamp": "2026-03-07T10:35:00Z"
}
Конечные точки¶
Потоковый чат¶
Конечная точка: ws://localhost:8000/api/v1/ws/chat
Общение в реальном времени с потоковой передачей ответов.
Подключение¶
const ws = new WebSocket('ws://localhost:8000/api/v1/ws/chat?token=' + token);
ws.onopen = () => {
console.log('Подключено к чату');
};
ws.onmessage = (event) => {
const message = JSON.parse(event.data);
console.log('Получено:', message.type, message.payload);
};
ws.onerror = (error) => {
console.error('Ошибка WebSocket:', error);
};
ws.onclose = (event) => {
console.log('Отключено:', event.code, event.reason);
};
Отправка запроса в чат¶
ws.send(JSON.stringify({
type: 'chat.query',
payload: {
query: 'Найти все уязвимости к SQL-инъекциям',
session_id: 'необязательный-id-сессии',
scenario_id: 'необязательный-id-сценария',
language: 'ru'
},
request_id: 'уникальный-id-запроса'
}));
Поля ChatQueryPayload:
| Поле | Тип | Обязательное | По умолчанию | Описание |
|---|---|---|---|---|
query |
string | Да | – | Текст запроса |
session_id |
string | Нет | null | Идентификатор сессии для сохранения контекста |
scenario_id |
string | Нет | null | Принудительный выбор сценария |
language |
string | Нет | "en" |
Язык ответа (en или ru) |
Получение потокового ответа¶
Сообщения приходят последовательно:
- Выбор сценария (необязательно)
{
"type": "chat.scenario",
"payload": { "scenario_id": "security_audit" },
"timestamp": "2026-03-07T10:30:00Z",
"request_id": "unique-request-id"
}
- Фрагменты ответа (несколько)
{
"type": "chat.chunk",
"payload": { "content": "Найдено 3 потенциальные ", "is_final": false },
"timestamp": "2026-03-07T10:30:01Z",
"request_id": "unique-request-id"
}
- Полный ответ – собранный ответ целиком:
{
"type": "chat.response",
"payload": {
"answer": "Найдено 3 потенциальных уязвимости SQL-инъекций в...",
"scenario_id": "security_audit",
"confidence": 0.92,
"session_id": "sess-abc123"
},
"timestamp": "2026-03-07T10:30:04Z",
"request_id": "unique-request-id"
}
- Завершение
{
"type": "chat.done",
"payload": {},
"timestamp": "2026-03-07T10:30:05Z",
"request_id": "unique-request-id"
}
Статус задачи¶
Конечная точка: ws://localhost:8000/api/v1/ws/jobs/{job_id}
Подписка на обновления хода выполнения фоновой задачи. Если идентификатор задачи не существует, соединение закрывается с кодом 4004.
Подключение¶
const jobId = 'abc123';
const ws = new WebSocket(`ws://localhost:8000/api/v1/ws/jobs/${jobId}?token=${token}`);
Сообщения¶
Задача запущена
{
"type": "job.started",
"payload": {
"job_id": "abc123",
"status": "started",
"progress": 0,
"job_type": "import",
"started_at": "2026-03-07T10:30:00Z",
"created_at": "2026-03-07T10:29:55Z"
},
"timestamp": "2026-03-07T10:30:00Z"
}
Ход выполнения задачи
{
"type": "job.progress",
"payload": {
"job_id": "abc123",
"progress": 45,
"message": "Обработка файлов..."
},
"timestamp": "2026-03-07T10:31:00Z"
}
Задача завершена
{
"type": "job.completed",
"payload": {
"job_id": "abc123",
"status": "completed",
"result": {
"files_processed": 150,
"vulnerabilities_found": 3
},
"error": null,
"completed_at": "2026-03-07T10:35:00Z"
},
"timestamp": "2026-03-07T10:35:00Z"
}
Задача завершилась с ошибкой
{
"type": "job.failed",
"payload": {
"job_id": "abc123",
"status": "failed",
"result": null,
"error": "Время ожидания соединения",
"completed_at": "2026-03-07T10:35:00Z"
},
"timestamp": "2026-03-07T10:35:00Z"
}
Уведомления¶
Конечная точка: ws://localhost:8000/api/v1/ws/notifications
Получение серверных уведомлений о системных событиях.
Подключение¶
const ws = new WebSocket('ws://localhost:8000/api/v1/ws/notifications?token=' + token);
Сообщения¶
Уведомление
{
"type": "notification",
"payload": {
"title": "Импорт завершён",
"message": "Проект 'myapp' успешно импортирован",
"type": "success",
"data": {
"project_id": "abc123",
"files_imported": 150
}
},
"timestamp": "2026-03-07T10:35:00Z"
}
Поле type внутри полезной нагрузки указывает категорию уведомления. Значения: info, success, warning, error. Поле data необязательно и может содержать произвольные дополнительные данные.
Примечание: Pydantic-модель
NotificationPayloadопределяет полеlevel, но методNotificationHandler.send_notification()отправляет значение под ключомtypeв фактической полезной нагрузке WebSocket-сообщения.
Обновления дашборда¶
Конечная точка: ws://localhost:8000/api/v2/dashboard/ws
Получение лёгких событий обновления дашборда и heartbeat-сообщений для frontend CISO/CTO.
Подключение¶
const ws = new WebSocket('ws://localhost:8000/api/v2/dashboard/ws?token=' + token);
Сообщения¶
Подключение установлено
{
"event": "connected",
"client_id": "ws-1",
"heartbeat_seconds": 15
}
Проект обновлён
{
"event": "project_updated",
"project_name": "payments-api",
"trigger": "release_gate",
"timestamp": 1773326400.0
}
Heartbeat
{
"event": "heartbeat",
"timestamp": 1773326415.0,
"clients": 4
}
Клиент может отправить простой ping и получить в ответ {\"event\":\"pong\"}.
Типы сообщений¶
Клиент к серверу¶
| Тип | Описание | Полезная нагрузка |
|---|---|---|
chat.query |
Отправить запрос в чат | query, session_id?, scenario_id?, language? |
ping |
Поддержание соединения | Нет |
Сервер к клиенту¶
| Тип | Значение перечисления | Описание | Полезная нагрузка |
|---|---|---|---|
connected |
CONNECTED | Соединение установлено | connection_id, user_id |
disconnected |
DISCONNECTED | Закрытие соединения | Нет |
authenticated |
AUTHENTICATED | Аутентификация подтверждена | Нет |
auth_required |
AUTH_REQUIRED | Требуется повторная аутентификация | Нет |
chat.scenario |
CHAT_SCENARIO | Выбор сценария | scenario_id |
chat.chunk |
CHAT_CHUNK | Фрагмент ответа (ChatChunkPayload) |
content, is_final |
chat.response |
CHAT_RESPONSE | Полный ответ (ChatResponsePayload) |
answer, scenario_id, confidence, session_id |
chat.done |
CHAT_DONE | Ответ завершён | Нет |
chat.error |
CHAT_ERROR | Ошибка чата | error, details? |
job.started |
JOB_STARTED | Подписка на задачу | job_id, status, progress, job_type, started_at, created_at |
job.progress |
JOB_PROGRESS | Ход выполнения задачи (JobProgressPayload) |
job_id, progress, message? |
job.completed |
JOB_COMPLETED | Задача завершена (JobCompletedPayload) |
job_id, status, result, error, completed_at |
job.failed |
JOB_FAILED | Задача завершилась с ошибкой (JobFailedPayload) |
job_id, status, result, error, completed_at |
notification |
NOTIFICATION | Уведомление | title, message, type, data? |
error |
ERROR | Общая ошибка | error, details? |
pong |
PONG | Ответ на ping | Нет |
cpg.update.complete |
CPG_UPDATE_COMPLETE | База данных CPG обновлена | Нет |
Все 19 значений определены в перечислении WSMessageType в src/api/websocket/models.py.
Формат сообщений¶
Модель WSMessage¶
Все сообщения являются экземплярами Pydantic-модели WSMessage (src/api/websocket/models.py):
| Поле | Тип | Обязательное | По умолчанию | Описание |
|---|---|---|---|---|
type |
WSMessageType |
Да | – | Значение перечисления (см. Типы сообщений) |
payload |
Dict[str, Any] |
Нет | {} |
Данные, специфичные для сообщения |
timestamp |
datetime |
Нет | datetime.now(UTC) |
Всегда присутствует в сериализованном выводе |
request_id |
Optional[str] |
Нет | null |
Связывает запросы с ответами |
Поле timestamp всегда заполнено (по умолчанию текущее время UTC). Оно не опционально – каждое сообщение содержит временную метку.
Сериализация¶
WSMessage предоставляет два метода для JSON-сериализации:
from src.api.websocket.models import WSMessage, WSMessageType
# Создание сообщения
msg = WSMessage(
type=WSMessageType.CHAT_QUERY,
payload={"query": "Найти уязвимости"},
request_id="req-001"
)
# Сериализация в JSON-строку
json_str = msg.to_json() # -> '{"type": "chat.query", "payload": {...}, "timestamp": "...", "request_id": "req-001"}'
# Десериализация из JSON-строки
msg = WSMessage.from_json(json_str) # classmethod
Поддержание соединения¶
Отправляйте периодические ping-сообщения для предотвращения разрыва соединения:
setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'ping', payload: {} }));
}
}, 30000); // Каждые 30 секунд
Сервер отвечает сообщением pong.
Обработка ошибок¶
Формат сообщения об ошибке¶
{
"type": "error",
"payload": {
"error": "Описание ошибки",
"details": "Дополнительные сведения"
},
"timestamp": "2026-03-07T10:30:00Z",
"request_id": "original-request-id"
}
Распространённые ошибки¶
| Ошибка | Причина | Решение |
|---|---|---|
| Invalid message format | Некорректный JSON | Проверьте структуру сообщения |
| Query is required | Пустой запрос | Укажите текст запроса |
| Unknown message type | Неподдерживаемый тип | Используйте задокументированные типы |
| Job not found | Неверный идентификатор задачи (код закрытия 4004) | Проверьте существование задачи через REST API |
| Invalid token | Отсутствующий или просроченный JWT (код закрытия 4001) | Предоставьте действительный токен |
Серверный API¶
В этом разделе описаны серверные Python-классы для построения WebSocket-интеграций.
WebSocketManager¶
WebSocketManager (src/api/websocket/manager.py) управляет всеми активными соединениями, пользовательскими сессиями и подписками на задачи. Синглтон доступен через get_ws_manager().
Управление соединениями:
| Метод | Сигнатура | Возвращает | Описание |
|---|---|---|---|
connect |
(user_id, websocket) |
str (идентификатор соединения) |
Регистрация нового соединения |
disconnect |
(user_id, conn_id) |
– | Удаление соединения |
send_to_connection |
(user_id, conn_id, message) |
bool |
Отправка конкретному соединению |
send_to_user |
(user_id, message) |
int |
Отправка всем соединениям пользователя; возвращает количество |
broadcast |
(message, exclude_users?) |
int |
Отправка всем подключённым пользователям; возвращает количество |
Подписки на задачи:
| Метод | Сигнатура | Возвращает | Описание |
|---|---|---|---|
subscribe_to_job |
(user_id, job_id) |
bool |
Подписка пользователя на обновления задачи |
unsubscribe_from_job |
(user_id, job_id) |
– | Отмена подписки на задачу |
send_job_update |
(job_id, message) |
int |
Отправка обновления всем подписчикам; возвращает количество |
cleanup_job_subscriptions |
(job_id) |
– | Удаление всех подписок на задачу |
Запросы состояния:
| Метод | Сигнатура | Возвращает | Описание |
|---|---|---|---|
get_connection_count |
(user_id?) |
int |
Количество соединений (все или конкретный пользователь) |
get_user_count |
() |
int |
Количество подключённых пользователей |
is_user_connected |
(user_id) |
bool |
Проверка наличия активных соединений пользователя |
Классы обработчиков¶
Три класса обработчиков (src/api/websocket/handlers.py) реализуют предметно-ориентированную логику. Каждый инициализируется экземпляром WebSocketManager и имеет функцию доступа к синглтону.
ChatHandler – get_chat_handler()
| Метод | Сигнатура | Описание |
|---|---|---|
handle_query |
(user_id, conn_id, message) |
Обработка запроса чата и потоковая передача ответа |
JobHandler – get_job_handler()
| Метод | Сигнатура | Описание |
|---|---|---|
handle_subscribe |
(user_id, conn_id, job_id) |
Подписка соединения на обновления задачи |
handle_unsubscribe |
(user_id, job_id) |
Отмена подписки на обновления задачи |
send_job_update |
(job_id, status, progress, result?, error?) |
Отправка обновления статуса задачи подписчикам |
NotificationHandler – get_notification_handler()
| Метод | Сигнатура | Возвращает | Описание |
|---|---|---|---|
send_notification |
(user_id, title, message, notification_type?, data?) |
bool |
Отправка уведомления конкретному пользователю |
broadcast_notification |
(title, message, notification_type?) |
int |
Рассылка уведомления всем пользователям |
Вспомогательные функции¶
Модуль models.py предоставляет фабричные функции для создания предварительно настроенных экземпляров WSMessage:
| Функция | Сигнатура | Создаёт |
|---|---|---|
create_chat_chunk |
(content, request_id?, is_final?) |
Сообщение CHAT_CHUNK |
create_chat_response |
(answer, scenario_id, confidence, session_id, request_id?) |
Сообщение CHAT_RESPONSE |
create_job_progress |
(job_id, progress, message?) |
Сообщение JOB_PROGRESS |
create_job_completed |
(job_id, result) |
Сообщение JOB_COMPLETED |
create_job_failed |
(job_id, error, details?) |
Сообщение JOB_FAILED |
create_notification |
(title, message, level?, action_url?) |
Сообщение NOTIFICATION |
create_error |
(error, details?, request_id?) |
Сообщение ERROR |
Пример использования:
from src.api.websocket.models import create_chat_chunk, create_job_progress
# Создание фрагмента потоковой передачи
chunk = create_chat_chunk("Обработка запроса...", request_id="req-001")
# Создание обновления хода выполнения задачи
progress = create_job_progress("job-abc", progress=45, message="Анализ файлов...")
Пример клиента на Python¶
import asyncio
import json
import websockets
async def chat_client(token: str, query: str):
uri = f"ws://localhost:8000/api/v1/ws/chat?token={token}"
async with websockets.connect(uri) as ws:
# Ожидание сообщения о подключении
connected = json.loads(await ws.recv())
assert connected["type"] == "connected"
print(f"Подключено: {connected['payload']['connection_id']}")
# Отправка запроса
await ws.send(json.dumps({
"type": "chat.query",
"payload": {"query": query},
"request_id": "req-001"
}))
# Получение потокового ответа
full_response = ""
async for message in ws:
data = json.loads(message)
if data["type"] == "chat.chunk":
content = data["payload"]["content"]
full_response += content
print(content, end="", flush=True)
elif data["type"] == "chat.response":
confidence = data["payload"]["confidence"]
scenario = data["payload"]["scenario_id"]
print(f"\n[scenario={scenario}, confidence={confidence}]")
elif data["type"] == "chat.done":
print("\n--- Ответ завершён ---")
break
elif data["type"] == "error":
print(f"Ошибка: {data['payload']['error']}")
break
return full_response
# Использование
asyncio.run(chat_client("your-jwt-token", "Найди переполнения буфера"))
Пример клиента на JavaScript¶
class ChatClient {
constructor(token) {
this.token = token;
this.ws = null;
this.handlers = new Map();
this.connectionId = null;
}
connect() {
return new Promise((resolve, reject) => {
this.ws = new WebSocket(
`ws://localhost:8000/api/v1/ws/chat?token=${this.token}`
);
this.ws.onerror = (error) => reject(error);
this.ws.onmessage = (event) => {
const message = JSON.parse(event.data);
// Обработка жизненного цикла соединения
if (message.type === 'connected') {
this.connectionId = message.payload.connection_id;
resolve(this);
return;
}
const handler = this.handlers.get(message.request_id);
if (handler) {
handler(message);
}
};
});
}
async query(text, onChunk) {
const requestId = `req-${Date.now()}`;
return new Promise((resolve, reject) => {
let fullResponse = '';
this.handlers.set(requestId, (message) => {
switch (message.type) {
case 'chat.chunk':
fullResponse += message.payload.content;
onChunk?.(message.payload.content);
break;
case 'chat.response':
// Полный ответ с метаданными
break;
case 'chat.done':
this.handlers.delete(requestId);
resolve(fullResponse);
break;
case 'chat.error':
case 'error':
this.handlers.delete(requestId);
reject(new Error(message.payload.error));
break;
}
});
this.ws.send(JSON.stringify({
type: 'chat.query',
payload: { query: text },
request_id: requestId
}));
});
}
close() {
this.ws?.close();
}
}
// Использование
const client = await new ChatClient('jwt-token').connect();
const response = await client.query(
'Найти SQL-инъекции',
(chunk) => process.stdout.write(chunk)
);
console.log('\nПолный ответ:', response);
client.close();
Ограничения¶
WebSocket API в настоящее время не применяет ограничения на количество соединений на пользователя, частоту сообщений или время ожидания неактивного соединения.
| Параметр | Значение | Статус |
|---|---|---|
| Максимальный размер сообщения | 64 КБ | Применяется (по умолчанию FastAPI/Starlette) |
| Максимум соединений на пользователя | – | Не применяется |
| Сообщений в секунду | – | Не применяется |
| Время ожидания неактивного соединения | – | Не применяется |
| Рекомендуемый интервал ping | 30 сек | Рекомендация (на стороне клиента) |
Максимальный размер сообщения 64 КБ (websocket_max_message: 65536) – единственное применяемое ограничение. Сообщения, превышающие этот размер, приведут к закрытию соединения.
См. также¶
- Справочник по REST API – документация по HTTP API
- Интеграция ACP – протокол JSON-RPC 2.0 с WebSocket-транспортом (
/api/v1/acp/ws)