Справочная документация по WebSocket API

В этом документе описывается WebSocket API для работы с CodeGraph в режиме реального времени.

Содержание

Обзор

WebSocket API предоставляет: - Потоковую передачу ответов чата – потоковая передача ответов ИИ в реальном времени - Обновления хода выполнения задач – уведомления о состоянии фоновых задач - Серверные уведомления – системные оповещения и уведомления

Базовый URL: ws://localhost:8000/api/v1/ws

WebSocket-маршрутизатор подключён по пути /api/v1/ws в src/api/main.py.

Отдельные обновления дашборда публикуются по адресу ws://localhost:8000/api/v2/dashboard/ws.


Аутентификация

Все WebSocket-соединения требуют JWT-токен, передаваемый как параметр запроса.

ws://localhost:8000/api/v1/ws/chat?token=<JWT_TOKEN>

Ошибки соединения

Код Причина Описание
4001 Invalid token Токен отсутствует, просрочен или недействителен
4004 Job not found Запрошенный идентификатор задачи не существует

Жизненный цикл соединения

При подключении клиента к любой конечной точке WebSocket сервер проходит следующие этапы:

  1. Аутентификация – сервер проверяет JWT-токен из параметра запроса token. Если токен недействителен, соединение закрывается с кодом 4001.

  2. CONNECTED – при успешном подключении сервер автоматически отправляет сообщение connected:

{
  "type": "connected",
  "payload": {
    "connection_id": "conn-abc123",
    "user_id": "user-456"
  },
  "timestamp": "2026-03-07T10:30:00Z"
}
  1. AUTH_REQUIRED – если серверу требуется повторная аутентификация (например, истечение токена во время сессии):
{
  "type": "auth_required",
  "payload": {},
  "timestamp": "2026-03-07T10:30:00Z"
}
  1. AUTHENTICATED – отправляется после успешной повторной аутентификации:
{
  "type": "authenticated",
  "payload": {},
  "timestamp": "2026-03-07T10:30:00Z"
}
  1. DISCONNECTED – отправляется перед закрытием соединения:
{
  "type": "disconnected",
  "payload": {},
  "timestamp": "2026-03-07T10:35:00Z"
}

Конечные точки

Потоковый чат

Конечная точка: ws://localhost:8000/api/v1/ws/chat

Общение в реальном времени с потоковой передачей ответов.

Подключение

const ws = new WebSocket('ws://localhost:8000/api/v1/ws/chat?token=' + token);

ws.onopen = () => {
  console.log('Подключено к чату');
};

ws.onmessage = (event) => {
  const message = JSON.parse(event.data);
  console.log('Получено:', message.type, message.payload);
};

ws.onerror = (error) => {
  console.error('Ошибка WebSocket:', error);
};

ws.onclose = (event) => {
  console.log('Отключено:', event.code, event.reason);
};

Отправка запроса в чат

ws.send(JSON.stringify({
  type: 'chat.query',
  payload: {
    query: 'Найти все уязвимости к SQL-инъекциям',
    session_id: 'необязательный-id-сессии',
    scenario_id: 'необязательный-id-сценария',
    language: 'ru'
  },
  request_id: 'уникальный-id-запроса'
}));

Поля ChatQueryPayload:

Поле Тип Обязательное По умолчанию Описание
query string Да Текст запроса
session_id string Нет null Идентификатор сессии для сохранения контекста
scenario_id string Нет null Принудительный выбор сценария
language string Нет "en" Язык ответа (en или ru)

Получение потокового ответа

Сообщения приходят последовательно:

  1. Выбор сценария (необязательно)
{
  "type": "chat.scenario",
  "payload": { "scenario_id": "security_audit" },
  "timestamp": "2026-03-07T10:30:00Z",
  "request_id": "unique-request-id"
}
  1. Фрагменты ответа (несколько)
{
  "type": "chat.chunk",
  "payload": { "content": "Найдено 3 потенциальные ", "is_final": false },
  "timestamp": "2026-03-07T10:30:01Z",
  "request_id": "unique-request-id"
}
  1. Полный ответ – собранный ответ целиком:
{
  "type": "chat.response",
  "payload": {
    "answer": "Найдено 3 потенциальных уязвимости SQL-инъекций в...",
    "scenario_id": "security_audit",
    "confidence": 0.92,
    "session_id": "sess-abc123"
  },
  "timestamp": "2026-03-07T10:30:04Z",
  "request_id": "unique-request-id"
}
  1. Завершение
{
  "type": "chat.done",
  "payload": {},
  "timestamp": "2026-03-07T10:30:05Z",
  "request_id": "unique-request-id"
}

Статус задачи

Конечная точка: ws://localhost:8000/api/v1/ws/jobs/{job_id}

Подписка на обновления хода выполнения фоновой задачи. Если идентификатор задачи не существует, соединение закрывается с кодом 4004.

Подключение

const jobId = 'abc123';
const ws = new WebSocket(`ws://localhost:8000/api/v1/ws/jobs/${jobId}?token=${token}`);

Сообщения

Задача запущена

{
  "type": "job.started",
  "payload": {
    "job_id": "abc123",
    "status": "started",
    "progress": 0,
    "job_type": "import",
    "started_at": "2026-03-07T10:30:00Z",
    "created_at": "2026-03-07T10:29:55Z"
  },
  "timestamp": "2026-03-07T10:30:00Z"
}

Ход выполнения задачи

{
  "type": "job.progress",
  "payload": {
    "job_id": "abc123",
    "progress": 45,
    "message": "Обработка файлов..."
  },
  "timestamp": "2026-03-07T10:31:00Z"
}

Задача завершена

{
  "type": "job.completed",
  "payload": {
    "job_id": "abc123",
    "status": "completed",
    "result": {
      "files_processed": 150,
      "vulnerabilities_found": 3
    },
    "error": null,
    "completed_at": "2026-03-07T10:35:00Z"
  },
  "timestamp": "2026-03-07T10:35:00Z"
}

Задача завершилась с ошибкой

{
  "type": "job.failed",
  "payload": {
    "job_id": "abc123",
    "status": "failed",
    "result": null,
    "error": "Время ожидания соединения",
    "completed_at": "2026-03-07T10:35:00Z"
  },
  "timestamp": "2026-03-07T10:35:00Z"
}

Уведомления

Конечная точка: ws://localhost:8000/api/v1/ws/notifications

Получение серверных уведомлений о системных событиях.

Подключение

const ws = new WebSocket('ws://localhost:8000/api/v1/ws/notifications?token=' + token);

Сообщения

Уведомление

{
  "type": "notification",
  "payload": {
    "title": "Импорт завершён",
    "message": "Проект 'myapp' успешно импортирован",
    "type": "success",
    "data": {
      "project_id": "abc123",
      "files_imported": 150
    }
  },
  "timestamp": "2026-03-07T10:35:00Z"
}

Поле type внутри полезной нагрузки указывает категорию уведомления. Значения: info, success, warning, error. Поле data необязательно и может содержать произвольные дополнительные данные.

Примечание: Pydantic-модель NotificationPayload определяет поле level, но метод NotificationHandler.send_notification() отправляет значение под ключом type в фактической полезной нагрузке WebSocket-сообщения.


Обновления дашборда

Конечная точка: ws://localhost:8000/api/v2/dashboard/ws

Получение лёгких событий обновления дашборда и heartbeat-сообщений для frontend CISO/CTO.

Подключение

const ws = new WebSocket('ws://localhost:8000/api/v2/dashboard/ws?token=' + token);

Сообщения

Подключение установлено

{
  "event": "connected",
  "client_id": "ws-1",
  "heartbeat_seconds": 15
}

Проект обновлён

{
  "event": "project_updated",
  "project_name": "payments-api",
  "trigger": "release_gate",
  "timestamp": 1773326400.0
}

Heartbeat

{
  "event": "heartbeat",
  "timestamp": 1773326415.0,
  "clients": 4
}

Клиент может отправить простой ping и получить в ответ {\"event\":\"pong\"}.


Типы сообщений

Клиент к серверу

Тип Описание Полезная нагрузка
chat.query Отправить запрос в чат query, session_id?, scenario_id?, language?
ping Поддержание соединения Нет

Сервер к клиенту

Тип Значение перечисления Описание Полезная нагрузка
connected CONNECTED Соединение установлено connection_id, user_id
disconnected DISCONNECTED Закрытие соединения Нет
authenticated AUTHENTICATED Аутентификация подтверждена Нет
auth_required AUTH_REQUIRED Требуется повторная аутентификация Нет
chat.scenario CHAT_SCENARIO Выбор сценария scenario_id
chat.chunk CHAT_CHUNK Фрагмент ответа (ChatChunkPayload) content, is_final
chat.response CHAT_RESPONSE Полный ответ (ChatResponsePayload) answer, scenario_id, confidence, session_id
chat.done CHAT_DONE Ответ завершён Нет
chat.error CHAT_ERROR Ошибка чата error, details?
job.started JOB_STARTED Подписка на задачу job_id, status, progress, job_type, started_at, created_at
job.progress JOB_PROGRESS Ход выполнения задачи (JobProgressPayload) job_id, progress, message?
job.completed JOB_COMPLETED Задача завершена (JobCompletedPayload) job_id, status, result, error, completed_at
job.failed JOB_FAILED Задача завершилась с ошибкой (JobFailedPayload) job_id, status, result, error, completed_at
notification NOTIFICATION Уведомление title, message, type, data?
error ERROR Общая ошибка error, details?
pong PONG Ответ на ping Нет
cpg.update.complete CPG_UPDATE_COMPLETE База данных CPG обновлена Нет

Все 19 значений определены в перечислении WSMessageType в src/api/websocket/models.py.


Формат сообщений

Модель WSMessage

Все сообщения являются экземплярами Pydantic-модели WSMessage (src/api/websocket/models.py):

Поле Тип Обязательное По умолчанию Описание
type WSMessageType Да Значение перечисления (см. Типы сообщений)
payload Dict[str, Any] Нет {} Данные, специфичные для сообщения
timestamp datetime Нет datetime.now(UTC) Всегда присутствует в сериализованном выводе
request_id Optional[str] Нет null Связывает запросы с ответами

Поле timestamp всегда заполнено (по умолчанию текущее время UTC). Оно не опционально – каждое сообщение содержит временную метку.

Сериализация

WSMessage предоставляет два метода для JSON-сериализации:

from src.api.websocket.models import WSMessage, WSMessageType

# Создание сообщения
msg = WSMessage(
    type=WSMessageType.CHAT_QUERY,
    payload={"query": "Найти уязвимости"},
    request_id="req-001"
)

# Сериализация в JSON-строку
json_str = msg.to_json()  # -> '{"type": "chat.query", "payload": {...}, "timestamp": "...", "request_id": "req-001"}'

# Десериализация из JSON-строки
msg = WSMessage.from_json(json_str)  # classmethod

Поддержание соединения

Отправляйте периодические ping-сообщения для предотвращения разрыва соединения:

setInterval(() => {
  if (ws.readyState === WebSocket.OPEN) {
    ws.send(JSON.stringify({ type: 'ping', payload: {} }));
  }
}, 30000); // Каждые 30 секунд

Сервер отвечает сообщением pong.


Обработка ошибок

Формат сообщения об ошибке

{
  "type": "error",
  "payload": {
    "error": "Описание ошибки",
    "details": "Дополнительные сведения"
  },
  "timestamp": "2026-03-07T10:30:00Z",
  "request_id": "original-request-id"
}

Распространённые ошибки

Ошибка Причина Решение
Invalid message format Некорректный JSON Проверьте структуру сообщения
Query is required Пустой запрос Укажите текст запроса
Unknown message type Неподдерживаемый тип Используйте задокументированные типы
Job not found Неверный идентификатор задачи (код закрытия 4004) Проверьте существование задачи через REST API
Invalid token Отсутствующий или просроченный JWT (код закрытия 4001) Предоставьте действительный токен

Серверный API

В этом разделе описаны серверные Python-классы для построения WebSocket-интеграций.

WebSocketManager

WebSocketManager (src/api/websocket/manager.py) управляет всеми активными соединениями, пользовательскими сессиями и подписками на задачи. Синглтон доступен через get_ws_manager().

Управление соединениями:

Метод Сигнатура Возвращает Описание
connect (user_id, websocket) str (идентификатор соединения) Регистрация нового соединения
disconnect (user_id, conn_id) Удаление соединения
send_to_connection (user_id, conn_id, message) bool Отправка конкретному соединению
send_to_user (user_id, message) int Отправка всем соединениям пользователя; возвращает количество
broadcast (message, exclude_users?) int Отправка всем подключённым пользователям; возвращает количество

Подписки на задачи:

Метод Сигнатура Возвращает Описание
subscribe_to_job (user_id, job_id) bool Подписка пользователя на обновления задачи
unsubscribe_from_job (user_id, job_id) Отмена подписки на задачу
send_job_update (job_id, message) int Отправка обновления всем подписчикам; возвращает количество
cleanup_job_subscriptions (job_id) Удаление всех подписок на задачу

Запросы состояния:

Метод Сигнатура Возвращает Описание
get_connection_count (user_id?) int Количество соединений (все или конкретный пользователь)
get_user_count () int Количество подключённых пользователей
is_user_connected (user_id) bool Проверка наличия активных соединений пользователя

Классы обработчиков

Три класса обработчиков (src/api/websocket/handlers.py) реализуют предметно-ориентированную логику. Каждый инициализируется экземпляром WebSocketManager и имеет функцию доступа к синглтону.

ChatHandlerget_chat_handler()

Метод Сигнатура Описание
handle_query (user_id, conn_id, message) Обработка запроса чата и потоковая передача ответа

JobHandlerget_job_handler()

Метод Сигнатура Описание
handle_subscribe (user_id, conn_id, job_id) Подписка соединения на обновления задачи
handle_unsubscribe (user_id, job_id) Отмена подписки на обновления задачи
send_job_update (job_id, status, progress, result?, error?) Отправка обновления статуса задачи подписчикам

NotificationHandlerget_notification_handler()

Метод Сигнатура Возвращает Описание
send_notification (user_id, title, message, notification_type?, data?) bool Отправка уведомления конкретному пользователю
broadcast_notification (title, message, notification_type?) int Рассылка уведомления всем пользователям

Вспомогательные функции

Модуль models.py предоставляет фабричные функции для создания предварительно настроенных экземпляров WSMessage:

Функция Сигнатура Создаёт
create_chat_chunk (content, request_id?, is_final?) Сообщение CHAT_CHUNK
create_chat_response (answer, scenario_id, confidence, session_id, request_id?) Сообщение CHAT_RESPONSE
create_job_progress (job_id, progress, message?) Сообщение JOB_PROGRESS
create_job_completed (job_id, result) Сообщение JOB_COMPLETED
create_job_failed (job_id, error, details?) Сообщение JOB_FAILED
create_notification (title, message, level?, action_url?) Сообщение NOTIFICATION
create_error (error, details?, request_id?) Сообщение ERROR

Пример использования:

from src.api.websocket.models import create_chat_chunk, create_job_progress

# Создание фрагмента потоковой передачи
chunk = create_chat_chunk("Обработка запроса...", request_id="req-001")

# Создание обновления хода выполнения задачи
progress = create_job_progress("job-abc", progress=45, message="Анализ файлов...")

Пример клиента на Python

import asyncio
import json
import websockets

async def chat_client(token: str, query: str):
    uri = f"ws://localhost:8000/api/v1/ws/chat?token={token}"

    async with websockets.connect(uri) as ws:
        # Ожидание сообщения о подключении
        connected = json.loads(await ws.recv())
        assert connected["type"] == "connected"
        print(f"Подключено: {connected['payload']['connection_id']}")

        # Отправка запроса
        await ws.send(json.dumps({
            "type": "chat.query",
            "payload": {"query": query},
            "request_id": "req-001"
        }))

        # Получение потокового ответа
        full_response = ""
        async for message in ws:
            data = json.loads(message)

            if data["type"] == "chat.chunk":
                content = data["payload"]["content"]
                full_response += content
                print(content, end="", flush=True)

            elif data["type"] == "chat.response":
                confidence = data["payload"]["confidence"]
                scenario = data["payload"]["scenario_id"]
                print(f"\n[scenario={scenario}, confidence={confidence}]")

            elif data["type"] == "chat.done":
                print("\n--- Ответ завершён ---")
                break

            elif data["type"] == "error":
                print(f"Ошибка: {data['payload']['error']}")
                break

        return full_response

# Использование
asyncio.run(chat_client("your-jwt-token", "Найди переполнения буфера"))

Пример клиента на JavaScript

class ChatClient {
  constructor(token) {
    this.token = token;
    this.ws = null;
    this.handlers = new Map();
    this.connectionId = null;
  }

  connect() {
    return new Promise((resolve, reject) => {
      this.ws = new WebSocket(
        `ws://localhost:8000/api/v1/ws/chat?token=${this.token}`
      );

      this.ws.onerror = (error) => reject(error);

      this.ws.onmessage = (event) => {
        const message = JSON.parse(event.data);

        // Обработка жизненного цикла соединения
        if (message.type === 'connected') {
          this.connectionId = message.payload.connection_id;
          resolve(this);
          return;
        }

        const handler = this.handlers.get(message.request_id);
        if (handler) {
          handler(message);
        }
      };
    });
  }

  async query(text, onChunk) {
    const requestId = `req-${Date.now()}`;

    return new Promise((resolve, reject) => {
      let fullResponse = '';

      this.handlers.set(requestId, (message) => {
        switch (message.type) {
          case 'chat.chunk':
            fullResponse += message.payload.content;
            onChunk?.(message.payload.content);
            break;
          case 'chat.response':
            // Полный ответ с метаданными
            break;
          case 'chat.done':
            this.handlers.delete(requestId);
            resolve(fullResponse);
            break;
          case 'chat.error':
          case 'error':
            this.handlers.delete(requestId);
            reject(new Error(message.payload.error));
            break;
        }
      });

      this.ws.send(JSON.stringify({
        type: 'chat.query',
        payload: { query: text },
        request_id: requestId
      }));
    });
  }

  close() {
    this.ws?.close();
  }
}

// Использование
const client = await new ChatClient('jwt-token').connect();
const response = await client.query(
  'Найти SQL-инъекции',
  (chunk) => process.stdout.write(chunk)
);
console.log('\nПолный ответ:', response);
client.close();

Ограничения

WebSocket API в настоящее время не применяет ограничения на количество соединений на пользователя, частоту сообщений или время ожидания неактивного соединения.

Параметр Значение Статус
Максимальный размер сообщения 64 КБ Применяется (по умолчанию FastAPI/Starlette)
Максимум соединений на пользователя Не применяется
Сообщений в секунду Не применяется
Время ожидания неактивного соединения Не применяется
Рекомендуемый интервал ping 30 сек Рекомендация (на стороне клиента)

Максимальный размер сообщения 64 КБ (websocket_max_message: 65536) – единственное применяемое ограничение. Сообщения, превышающие этот размер, приведут к закрытию соединения.


См. также