RBAC и авторизация

Техническая документация для ИТ- и служб безопасности


Содержание

Обзор

CodeGraph реализует комплексную систему управления доступом на основе ролей (RBAC) с поддержкой нескольких методов аутентификации. Система обеспечивает гранулярный контроль доступа к функциям платформы.

Ключевые возможности

  • 4 уровня ролей с иерархическим наследованием
  • 21 гранулярное разрешение по функциональным категориям
  • Режимы аутентификации: JWT, API-ключи, OAuth2, LDAP, IAM (Yandex Cloud) и неаутентифицированный доступ там, где он явно разрешён
  • Lifecycle service accounts для нечеловеческих machine identities
  • Чёрный список токенов для мгновенного отзыва доступа
  • Проверка подписи вебхуков с защитой от атак повторного воспроизведения
  • Проверка scope для детализированного контроля доступа по API-ключам
  • Интеграция с SIEM для аудита событий авторизации

Архитектура RBAC

Иерархия ролей

                           +---------------------+
                           |       ADMIN         |
                           |   (ADMIN_ALL        |
                           |    meta-permission) |
                           +----------+----------+
                                      |
                           +----------v----------+
                           |      REVIEWER       |
                           |  Code review +      |
                           |  GitHub + GitLab    |
                           +----------+----------+
                                      |
                           +----------v----------+
                           |      ANALYST        |
                           |  Query execution    |
                           |  + API keys         |
                           +----------+----------+
                                      |
                           +----------v----------+
                           |       VIEWER        |
                           |    Read-only        |
                           +---------------------+

Наследование разрешений

Каждая роль наследует все разрешения от ролей нижнего уровня:

  • ADMIN – использует мета-разрешение ADMIN_ALL. Функция get_role_permissions() разворачивает его в полный набор из 21 разрешения во время выполнения. В исходном словаре ROLE_PERMISSIONS хранится только {Permission.ADMIN_ALL}.
  • REVIEWER – разрешения ANALYST + проверка кода (review:execute, review:github, review:gitlab)
  • ANALYST – разрешения VIEWER + выполнение/экспорт
  • VIEWER – только чтение (базовый уровень)
from src.api.auth.permissions import Role, ROLE_PERMISSIONS, Permission

# ADMIN хранит только мета-разрешение
ROLE_PERMISSIONS[Role.ADMIN]  # => {Permission.ADMIN_ALL}

# get_role_permissions() разворачивает ADMIN_ALL во все 21 разрешение
from src.api.auth.permissions import get_role_permissions
perms = get_role_permissions(Role.ADMIN)  # => all Permission values

Каталог разрешений

Разрешения по категориям

Все 21 разрешение определены в классе Permission(str, Enum) в файле src/api/auth/permissions.py.

Сценарии (scenarios:*)

Разрешение Имя в Enum Описание VIEWER ANALYST REVIEWER ADMIN
scenarios:read SCENARIOS_READ Просмотр списка сценариев x x x x
scenarios:execute SCENARIOS_EXECUTE Запуск сценариев анализа x x x

Запросы (query:*)

Разрешение Имя в Enum Описание VIEWER ANALYST REVIEWER ADMIN
query:execute QUERY_EXECUTE Выполнение SQL-запросов к CPG x x x
query:validate QUERY_VALIDATE Проверка синтаксиса запроса x x x

Проверка кода (review:*)

Разрешение Имя в Enum Описание VIEWER ANALYST REVIEWER ADMIN
review:execute REVIEW_EXECUTE Запуск автоматической проверки x x
review:github REVIEW_GITHUB Интеграция с GitHub PR x x
review:gitlab REVIEW_GITLAB Интеграция с GitLab MR x x

Сессии (sessions:*)

Разрешение Имя в Enum Описание VIEWER ANALYST REVIEWER ADMIN
sessions:read SESSIONS_READ Просмотр сессий x x x x
sessions:write SESSIONS_WRITE Создание/изменение сессий x x x
sessions:delete SESSIONS_DELETE Удаление сессий x x x

История (history:*)

Разрешение Имя в Enum Описание VIEWER ANALYST REVIEWER ADMIN
history:read HISTORY_READ Просмотр истории запросов x x x x
history:export HISTORY_EXPORT Экспорт истории x x x

Пользователи (users:*)

Разрешение Имя в Enum Описание VIEWER ANALYST REVIEWER ADMIN
users:read USERS_READ Просмотр списка пользователей x
users:write USERS_WRITE Создание/редактирование пользователей x
users:delete USERS_DELETE Удаление пользователей x

API-ключи (api_keys:*)

Разрешение Имя в Enum Описание VIEWER ANALYST REVIEWER ADMIN
api_keys:read API_KEYS_READ Просмотр собственных ключей x x x
api_keys:write API_KEYS_WRITE Создание ключей x x x
api_keys:delete API_KEYS_DELETE Удаление любых ключей x

Метрики (stats:*, metrics:*)

Разрешение Имя в Enum Описание VIEWER ANALYST REVIEWER ADMIN
stats:read STATS_READ Просмотр статистики x x x x
metrics:read METRICS_READ Метрики Prometheus x

Администрирование

Разрешение Имя в Enum Описание VIEWER ANALYST REVIEWER ADMIN
admin:all ADMIN_ALL Полный доступ (мета-разрешение) x

Методы аутентификации

CodeGraph поддерживает несколько режимов аутентификации. Поле auth_method в AuthContext указывает, какой режим был использован:

Значение Метод Описание
jwt JWT Bearer Token Основной метод для веб-приложений
api_key API-ключ Для интеграций, CI/CD и автоматизации
oauth2 OAuth2/OIDC Корпоративные поставщики идентификации
ldap LDAP/AD Интеграция с корпоративным каталогом
iam Yandex Cloud IAM Облачная аутентификация
none Без аутентификации По умолчанию, если учётные данные не предоставлены

1. JWT Bearer Token

Основной метод для веб-приложений и интерактивных сессий.

Структура токена (TokenPayload)

class TokenPayload(BaseModel):
    sub: str                          # Subject (user_id)
    jti: str                          # JWT ID (unique identifier)
    exp: datetime                     # Expiration time
    iat: datetime                     # Issued at
    type: str                         # "access" or "refresh"
    scopes: list[str] = []            # Permission scopes
    role: Optional[str] = None        # User role
    group_id: Optional[str] = None    # Optional group scope (for CI/CD service accounts)

Исходный код: src/api/auth/jwt_handler.py

Параметры токенов

Тип токена Время жизни Назначение
Access Token 30 минут Авторизация запросов к API
Refresh Token 7 дней Обновление access-токена

Функции для работы с токенами

from src.api.auth.jwt_handler import (
    create_access_token,   # Create JWT access token
    create_refresh_token,  # Create JWT refresh token
    decode_token,          # Decode and validate JWT
    verify_token,          # Verify JWT and check type
    get_token_jti,         # Extract JTI without full verification
    blacklist_token,       # Add token to blacklist
    is_token_blacklisted,  # Check if token is blacklisted
    load_blacklist_cache,  # Load blacklist from DB into memory
)

create_access_token() – создаёт новый JWT access-токен:

def create_access_token(
    user_id: str,
    scopes: Optional[list[str]] = None,
    role: Optional[str] = None,
    expires_delta: Optional[timedelta] = None,
    group_id: Optional[str] = None,
) -> str:

get_token_jti() – извлекает JTI (JWT ID) из токена без полной проверки подписи. Используется для операций с чёрным списком, когда токен уже может быть просрочен:

def get_token_jti(token: str) -> str:

Возвращает пустую строку в случае ошибки извлечения.

Пример использования

# Get token
curl -X POST /api/v1/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username": "analyst", "password": "***"}'

# Response:
{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "bearer",
  "expires_in": 1800
}

# Use token
curl -X GET /api/v1/scenarios \
  -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."

Отзыв токена (чёрный список)

# Logout - add token to blacklist
curl -X POST /api/v1/auth/logout \
  -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."

После добавления в чёрный список токен становится недействительным немедленно. Подробности реализации см. в разделе API чёрного списка токенов.


2. API-ключи

Для интеграций, CI/CD и автоматизации.

Формат API-ключа

Prefix:  rag_<8 hex chars>     (e.g., rag_a1b2c3d4)
Secret:  <48 hex chars>        (secrets.token_hex(24))
Full:    rag_<8hex>_<48hex>

Example: rag_a1b2c3d4_e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8

Ключ генерируется функцией generate_api_key() в src/api/auth/api_keys.py:

def generate_api_key() -> tuple[str, str, str]:
    """Returns (full_key, prefix, key_hash)."""
    prefix = f"rag_{secrets.token_hex(4)}"   # e.g., rag_a1b2c3d4
    secret = secrets.token_hex(24)            # 48 hex chars
    full_key = f"{prefix}_{secret}"
    key_hash = hash_api_key(full_key)         # SHA-256
    return full_key, prefix, key_hash

Безопасность хранения

  • Ключи хешируются SHA-256 перед сохранением (hash_api_key())
  • Полный ключ отображается только при создании (возвращается как ApiKeyWithSecret)
  • Поддержка истечения срока действия (is_key_expired()) и отзыва (флаг is_revoked)
  • Сравнение за константное время через secrets.compare_digest() в verify_api_key()

Scope API-ключей по умолчанию

Функция get_default_scopes_for_api_key() возвращает scope по умолчанию для новых API-ключей:

def get_default_scopes_for_api_key() -> List[str]:
    return [
        "scenarios:read",
        "scenarios:execute",
        "query:execute",
        "sessions:read",
        "sessions:write",
        "history:read",
    ]

Пример использования

# Create API key
curl -X POST /api/v1/auth/api-keys \
  -H "Authorization: Bearer <admin_token>" \
  -d '{"name": "CI Pipeline", "scopes": ["query:execute", "scenarios:execute"]}'

# Response:
{
  "id": "key_123",
  "key": "rag_a1b2c3d4_e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8",
  "name": "CI Pipeline",
  "prefix": "rag_a1b2c3d4",
  "scopes": ["query:execute", "scenarios:execute"],
  "created_at": "2026-03-07T10:00:00Z",
  "expires_at": "2026-06-07T10:00:00Z",
  "is_revoked": false
}

# Use API key
curl -X GET /api/v1/scenarios \
  -H "X-API-Key: rag_a1b2c3d4_e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8"

Service accounts и machine access

Service accounts используют тот же транспорт X-API-Key, но вводят отдельный lifecycle для machine identity.

Реализованные возможности:

  • отдельные сущности ServiceAccount и ServiceAccountCredential
  • project/group scoping
  • interface allowlist (api, mcp)
  • rotate/revoke/deactivate flows
  • unified audit metadata для API и MCP
  • per-service-account rate limiting
  • проверка версии machine contract через X-CodeGraph-Machine-Contract

Admin API:

  • GET /api/v1/auth/service-accounts/action-catalog
  • POST /api/v1/auth/service-accounts
  • GET /api/v1/auth/service-accounts
  • GET /api/v1/auth/service-accounts/{id}
  • POST /api/v1/auth/service-accounts/{id}/rotate
  • POST /api/v1/auth/service-accounts/{id}/credentials/{credential_id}/revoke
  • POST /api/v1/auth/service-accounts/{id}/deactivate

CLI:

codegraph auth service-account create --owner admin --name ci-bot --template ci --interface api
codegraph auth service-account list
codegraph auth service-account inspect --id <service-account-id>
codegraph auth service-account rotate --id <service-account-id>
codegraph auth service-account revoke --id <credential-id>
codegraph auth service-account deactivate --id <service-account-id>

Эксплуатационные замечания:

  • используйте service accounts для CI, ботов, порталов и MCP/API automation
  • при rotation сначала раскатывайте новый credential, затем revoke старого
  • API и MCP используют одну и ту же логику проверки scope и interface
  • для high-assurance deployments можно включить proxy-terminated mTLS bindings на интерфейсах grpc и acp, сохраняя API-key-based service-account auth
  • trusted certificate bindings настраиваются через security.service_account_mtls_*, а fingerprints привязываются к UUID service account
  • OAuth2/OIDC client-credentials для machine callers остаётся deferred и не входит в текущий service-account runtime baseline

3. OAuth2/OIDC

Поддержка корпоративных поставщиков идентификации. Полное руководство по настройке см. в документе Интеграция OAuth2/OIDC и LDAP/AD.

Поддерживаемые провайдеры

Провайдер Статус Область применения
GitHub Готов Разработчики, open-source
GitLab Готов Корпоративный GitLab
Google Готов Google Workspace
Keycloak Готов Корпоративный IdP
Azure AD Готов Microsoft 365

Конфигурация (пример Keycloak)

oauth2:
  providers:
    keycloak:
      enabled: true
      client_id: "codegraph"
      client_secret: "${KEYCLOAK_SECRET}"
      authorize_url: "https://keycloak.company.com/realms/main/protocol/openid-connect/auth"
      token_url: "https://keycloak.company.com/realms/main/protocol/openid-connect/token"
      userinfo_url: "https://keycloak.company.com/realms/main/protocol/openid-connect/userinfo"
      scopes: ["openid", "profile", "email", "groups"]
      role_claim: "realm_access.roles"
      role_mapping:
        codegraph-admin: admin
        codegraph-reviewer: reviewer
        codegraph-analyst: analyst
        codegraph-viewer: viewer

4. LDAP/Active Directory

Интеграция с корпоративным каталогом. Полное руководство по настройке см. в документе Интеграция OAuth2/OIDC и LDAP/AD.

Конфигурация

ldap:
  enabled: true
  server: "ldaps://ldap.company.com:636"
  base_dn: "dc=company,dc=com"
  bind_dn: "cn=codegraph,ou=services,dc=company,dc=com"
  bind_password: "${LDAP_PASSWORD}"
  user_search_base: "ou=users,dc=company,dc=com"
  user_search_filter: "(sAMAccountName={username})"
  group_search_base: "ou=groups,dc=company,dc=com"
  group_membership_attr: "memberOf"
  group_mapping:
    "CN=CodeGraph-Admins,OU=Groups,DC=company,DC=com": admin
    "CN=CodeGraph-Reviewers,OU=Groups,DC=company,DC=com": reviewer
    "CN=CodeGraph-Analysts,OU=Groups,DC=company,DC=com": analyst
    "CN=CodeGraph-Viewers,OU=Groups,DC=company,DC=com": viewer
  sync_interval_minutes: 15

Синхронизация групп

  • Автоматическая синхронизация ролей с группами AD
  • Поддержка вложенных групп
  • Кеширование с настраиваемым TTL

5. IAM (Yandex Cloud)

Облачная аутентификация для сервисов, работающих в Yandex Cloud. Пользователи, прошедшие аутентификацию через IAM, получают роль ANALYST по умолчанию с ограниченным набором scope.

Принцип работы

Middleware проверяет заголовок X-YC-IAM-Token до аутентификации по JWT и API-ключам. Если заголовок присутствует, токен валидируется через IAM-валидатор (src/api/auth/iam).

# IAM authentication flow in get_auth_context():
iam_token = request.headers.get("X-YC-IAM-Token")
if iam_token:
    validator = get_iam_validator()
    user_info = await validator.validate_token(iam_token)
    # Returns AuthContext with:
    #   role=Role.ANALYST
    #   scopes=["scenarios:read", "query:execute", "review:execute"]
    #   auth_method="iam"

Использование

# Authenticate with IAM token
curl -X GET /api/v1/scenarios \
  -H "X-YC-IAM-Token: t1.9euelZqMkJKVjJqTnZyRm5GUkZqRke..."

Разрешения IAM по умолчанию

Поле Значение
role ANALYST
scopes scenarios:read, query:execute, review:execute
auth_method iam

Окружение

Требуется переменная окружения IAM_ENABLED и настроенный IAM-валидатор. Если валидатор недоступен, аутентификация переходит к методам JWT/API-ключ.


Справочник API

Зависимости middleware

Все функции middleware находятся в src/api/auth/middleware.py:

from src.api.auth.middleware import (
    get_auth_context,        # Get context (returns unauthenticated context if no credentials)
    get_current_user,        # FastAPI dependency, raises 401 if unauthenticated
    get_optional_user,       # FastAPI dependency, returns unauthenticated context if no credentials
    require_auth,            # Require any authentication (raises 401)
    require_permission,      # Factory: require specific permission (raises 403)
    require_any_permission,  # Factory: require any of multiple permissions (raises 403)
    require_role,            # Factory: require specific role (raises 403)
    require_admin,           # Shortcut for require_role(Role.ADMIN)
    require_analyst,         # Shortcut for require_role(Role.ANALYST, Role.REVIEWER, Role.ADMIN)
    require_reviewer,        # Shortcut for require_role(Role.REVIEWER, Role.ADMIN)
)

Сигнатуры функций:

async def get_auth_context(request, bearer_token, api_key) -> AuthContext
async def get_current_user(auth: AuthContext = Depends(require_auth)) -> AuthContext
async def get_optional_user(auth: AuthContext = Depends(get_auth_context)) -> AuthContext
async def require_auth(auth: AuthContext = Depends(get_auth_context)) -> AuthContext
def require_permission(permission: Permission) -> Callable  # dependency factory
def require_any_permission(*permissions: Permission) -> Callable  # dependency factory
def require_role(*roles: Role) -> Callable  # dependency factory

Примеры использования в FastAPI

from fastapi import Depends, APIRouter
from src.api.auth.middleware import (
    require_permission,
    require_any_permission,
    require_admin,
    get_current_user,
    get_optional_user,
)
from src.api.auth.permissions import Permission

router = APIRouter()

# Require specific permission
@router.post("/query/execute")
async def execute_query(
    query: str,
    auth = Depends(require_permission(Permission.QUERY_EXECUTE))
):
    # auth.user_id, auth.role, auth.scopes, auth.group_id available
    return {"result": "..."}

# Require one of multiple permissions
@router.get("/reviews")
async def list_reviews(
    auth = Depends(require_any_permission(
        Permission.REVIEW_EXECUTE,
        Permission.REVIEW_GITHUB,
        Permission.REVIEW_GITLAB
    ))
):
    return {"reviews": [...]}

# Require Admin role
@router.delete("/users/{user_id}")
async def delete_user(
    user_id: str,
    auth = Depends(require_admin)
):
    return {"deleted": user_id}

# Get authenticated user (raises 401 if not authenticated)
@router.get("/me")
async def get_profile(auth = Depends(get_current_user)):
    return {"user_id": auth.user_id, "role": auth.role}

# Optional authentication (no error if unauthenticated)
@router.get("/public")
async def public_endpoint(auth = Depends(get_optional_user)):
    if auth.is_authenticated:
        return {"greeting": f"Hello, {auth.username}"}
    return {"greeting": "Hello, anonymous"}

AuthContext

Класс AuthContext хранит состояние аутентификации для текущего запроса. Исходный код: src/api/auth/middleware.py.

class AuthContext:
    def __init__(
        self,
        user_id: Optional[str] = None,
        username: Optional[str] = None,
        role: Optional[Role] = None,
        scopes: Optional[List[str]] = None,
        auth_method: str = "none",       # "jwt", "api_key", "oauth2", "ldap", "iam", "none"
        group_id: Optional[str] = None,  # optional group scope (from JWT or API key)
    ):
        ...

    @property
    def is_authenticated(self) -> bool:
        """True if user_id is not None."""

    def has_permission(self, permission: Permission) -> bool:
        """Check if user has a specific permission."""
Поле Тип Описание
user_id Optional[str] Идентификатор пользователя (None, если не аутентифицирован)
username Optional[str] Имя пользователя
role Optional[Role] Роль: VIEWER, ANALYST, REVIEWER, ADMIN
scopes List[str] Список scope разрешений
auth_method str Использованный метод аутентификации
group_id Optional[str] Scope группы для мультитенантной изоляции

Вспомогательные функции разрешений

Все функции находятся в src/api/auth/permissions.py:

from src.api.auth.permissions import (
    get_role_permissions,
    has_permission,
    has_any_permission,
    has_all_permissions,
    validate_scopes,
    get_default_scopes_for_api_key,
)

get_role_permissions(role: Role) -> Set[Permission]

Возвращает набор разрешений для роли. Для ADMIN исходный словарь содержит только {Permission.ADMIN_ALL}.

has_permission(role, required_permission, user_scopes=None) -> bool

Проверяет, имеет ли роль или пользователь указанное разрешение. Возвращает True, если роль включает ADMIN_ALL, разрешение входит в набор разрешений роли или разрешение указано в явных scope пользователя.

def has_permission(
    role: Optional[Role],
    required_permission: Permission,
    user_scopes: Optional[List[str]] = None,
) -> bool:

has_any_permission(role, required_permissions, user_scopes=None) -> bool

Возвращает True, если хотя бы одно из указанных разрешений предоставлено.

def has_any_permission(
    role: Optional[Role],
    required_permissions: List[Permission],
    user_scopes: Optional[List[str]] = None,
) -> bool:

has_all_permissions(role, required_permissions, user_scopes=None) -> bool

Возвращает True, только если все указанные разрешения предоставлены.

def has_all_permissions(
    role: Optional[Role],
    required_permissions: List[Permission],
    user_scopes: Optional[List[str]] = None,
) -> bool:

validate_scopes(scopes: List[str]) -> List[str]

Валидирует и фильтрует строки scope по перечислению Permission. Возвращает только допустимые строки scope, молча отбрасывая неизвестные значения.

def validate_scopes(scopes: List[str]) -> List[str]:
    valid_permissions = {p.value for p in Permission}
    return [s for s in scopes if s in valid_permissions]

get_default_scopes_for_api_key() -> List[str]

Возвращает scope по умолчанию для новых API-ключей. Используется при управлении API-ключами, когда явные scope не указаны.

Проверка scope

Исходный файл: src/api/auth/scope_enforcement.py

Проверка scope API-ключей работает на двух уровнях:

1. Автоматическая проверка (middleware)ScopeEnforcementMiddleware (src/api/middleware/scope_enforcement.py) автоматически проверяет scope API-ключей по таблице ROUTE_SCOPE_MAP на основе префикса маршрута запроса. Выполняется для каждого запроса, аутентифицированного через API-ключ, без изменений в коде маршрутов.

Соответствие префиксов маршрутов и scope (выдержка):

Префикс маршрута Требуемый scope
/query query:execute
/review review:execute
/scenarios scenarios:read
/security scenarios:execute
/gocpg query:execute
/import scenarios:execute
/stats stats:read
/groups admin:all
/projects admin:all
/health None (публичный)
/auth None (публичный)

Ключи со scope admin:all обходят все проверки. Пользователи, аутентифицированные через JWT и IAM, не затрагиваются (разрешения проверяются через ролевой контроль доступа).

Конфигурация:

# config.yaml
security:
  api_key_scope_enforcement: true   # По умолчанию: включено

2. Явная проверка на эндпоинте – Фабрика зависимостей require_scope() проверяет, содержит ли список scopes аутентифицированного вызывающего определённую строку scope. Это отдельный механизм от проверки разрешений на основе ролей – он работает с необработанными строками scope.

from src.api.auth.scope_enforcement import require_scope

@router.get("/admin/stats")
async def admin_stats(auth: AuthContext = Depends(require_scope("admin:read"))):
    ...

Сигнатура:

def require_scope(scope: str):
    """FastAPI dependency factory that checks if the authenticated caller has a scope.

    Raises:
        HTTPException(401) if not authenticated
        HTTPException(403) if scope is missing
    """

Вспомогательная функция:

def check_api_key_scope(auth: AuthContext, request_path: str) -> Optional[str]:
    """Проверяет, имеет ли API-ключ требуемый scope для маршрута.

    Возвращает None при успехе, строку с ошибкой при отсутствии scope.
    """

Аутентификация вебхуков

Функция verify_webhook_signature() в src/api/auth/webhook_auth.py обеспечивает проверку подписи HMAC-SHA256 для событий вебхуков платформ с защитой от атак повторного воспроизведения.

Сигнатура:

async def verify_webhook_signature(
    request: Request,
    secret: Optional[str] = None,
    platform: str = "sourcecraft",
    max_age_seconds: int = 300,
) -> bytes:

Параметры:

Параметр Тип По умолчанию Описание
request Request обязательный Объект запроса FastAPI
secret Optional[str] None Секрет вебхука для HMAC-проверки. None пропускает проверку
platform str "sourcecraft" Имя платформы
max_age_seconds int 300 Максимальный возраст вебхука в секундах (защита от повторов)

Поддерживаемые платформы и заголовки:

Платформа Заголовок подписи Заголовок метки времени
sourcecraft X-SourceCraft-Signature X-SourceCraft-Timestamp
gitverse X-GitVerse-Signature X-GitVerse-Timestamp
github X-Hub-Signature-256 X-Hub-Timestamp

Защита от повторов: Функция проверяет заголовок метки времени относительно текущего времени. Если разница превышает max_age_seconds (по умолчанию 300 секунд / 5 минут), запрос отклоняется с HTTP 400, и отклонение фиксируется в журнале аудита.

Резервный вариант GitVerse: Если заголовок X-GitVerse-Signature отсутствует, функция также проверяет X-Hub-Signature-256 для обратной совместимости.

Пример использования:

from src.api.auth.webhook_auth import verify_webhook_signature

@router.post("/webhooks/sourcecraft")
async def handle_sourcecraft_webhook(request: Request):
    body = await verify_webhook_signature(
        request,
        secret="your-webhook-secret",
        platform="sourcecraft",
        max_age_seconds=300,
    )
    payload = json.loads(body)
    ...

API чёрного списка токенов

Чёрный список токенов обеспечивает мгновенный отзыв JWT-токенов. Используется PostgreSQL для постоянного хранения с кешем в оперативной памяти для быстрого поиска. Исходный код: src/api/auth/jwt_handler.py.

Функции

blacklist_token(jti, expires_at=None) -> None

Добавляет токен в чёрный список. Сохраняет JTI как в кеше в памяти (для немедленного действия), так и в PostgreSQL (для сохранности между перезапусками).

async def blacklist_token(jti: str, expires_at: Optional[datetime] = None) -> None:

is_token_blacklisted(jti) -> bool

Проверяет, находится ли токен в чёрном списке. Сначала использует кеш в памяти (быстрый путь), затем обращается к базе данных (медленный путь). Токены, найденные в базе данных, добавляются в кеш для последующих проверок.

async def is_token_blacklisted(jti: str) -> bool:

load_blacklist_cache() -> int

Загружает действующие (не просроченные) токены из чёрного списка PostgreSQL в кеш в памяти. Должна вызываться при запуске приложения. Возвращает количество загруженных токенов.

async def load_blacklist_cache() -> int:

_blacklist_sync_task(interval_seconds=60) -> None

Фоновая задача asyncio, которая периодически обновляет кеш чёрного списка из базы данных. Перехватывает все исключения для предотвращения остановки задачи при временных ошибках.

async def _blacklist_sync_task(interval_seconds: int = 60) -> None:

Архитектура

Request                              blacklist_token()
   |                                       |
   v                                       v
is_token_blacklisted()              _blacklisted_tokens (set)
   |                                  +    |
   |  1. check in-memory set          |    v
   |  2. fallback to PostgreSQL        | PostgreSQL: TokenBlacklist table
   |     (adds to cache if found)      |
   v                                   |
 True/False                     _blacklist_sync_task()
                                (periodic refresh)

Модели данных API-ключей

Исходный код: src/api/auth/api_keys.py

ApiKeyInfo

Информационная модель API-ключей (без секретного ключа). Используется для отображения списка и просмотра ключей.

class ApiKeyInfo(BaseModel):
    id: str                            # Unique key identifier
    name: str                          # Human-readable name
    prefix: str                        # Key prefix (e.g., "rag_a1b2c3d4")
    scopes: List[str]                  # Permission scopes
    created_at: datetime               # Creation timestamp
    expires_at: Optional[datetime]     # Expiration (None = never)
    last_used_at: Optional[datetime]   # Last usage timestamp
    is_revoked: bool                   # Revocation status

ApiKeyWithSecret

Расширенная модель, включающая полный ключ. Возвращается только при создании.

class ApiKeyWithSecret(ApiKeyInfo):
    key: str    # Full API key (e.g., "rag_a1b2c3d4_<48hex>")

Вспомогательные функции

from src.api.auth.api_keys import (
    generate_api_key,      # -> (full_key, prefix, key_hash)
    hash_api_key,          # key -> SHA-256 hex digest
    verify_api_key,        # (key, stored_hash) -> bool (constant-time)
    extract_prefix,        # key -> prefix string
    is_key_expired,        # expires_at -> bool
    calculate_expiration,  # days -> Optional[datetime]
    validate_api_key,      # Full validation pipeline (hash + revoked + expired)
)

Аудит и журналирование

События авторизации

Все события авторизации журналируются и отправляются в SIEM:

Событие Серьёзность Описание
AUTH_SUCCESS INFO Успешная аутентификация
AUTH_FAILURE WARNING Неудачная попытка
TOKEN_ISSUED INFO Выдан новый токен
TOKEN_REVOKED INFO Токен отозван (добавлен в чёрный список)
PERMISSION_DENIED WARNING Отказ в доступе
API_KEY_CREATED INFO Создан API-ключ
API_KEY_REVOKED INFO API-ключ отозван
WEBHOOK_REPLAY_REJECTED WARNING Обнаружена атака повторного воспроизведения вебхука

Формат журнала

{
  "timestamp": "2026-03-07T10:30:00.000Z",
  "event_type": "AUTH_SUCCESS",
  "user_id": "user_123",
  "username": "analyst@company.com",
  "role": "analyst",
  "auth_method": "jwt",
  "group_id": null,
  "ip_address": "10.0.0.50",
  "user_agent": "Mozilla/5.0...",
  "request_path": "/api/v1/scenarios",
  "request_method": "GET"
}

Групповой RBAC (мультитенантность)

Когда multi_tenant.enabled: true в config.yaml, CodeGraph обеспечивает контроль доступа на уровне групп для всех конечных точек данных. Проекты принадлежат группам, а пользователи имеют роли для каждой группы.

Групповые роли

Исходный код: src/api/database/models.py (перечисление GroupRole)

Роль Разрешения
VIEWER Только чтение: запросы, статистика, списки
EDITOR Чтение + запись: импорт, редактирование, активация
ADMIN Полный доступ: удаление проектов, управление участниками группы

Системные администраторы (Role.ADMIN) обходят все групповые проверки.

Принцип работы

Каждый API-запрос разрешает ProjectContext, содержащий project_id, group_id и db_path. Зависимость require_project_access(min_group_role) проверяет:

  1. Если multi_tenant.enabled равно false -> пропуск (без проверок)
  2. Если пользователь – системный администратор -> обход групповых проверок
  3. Иначе -> проверка таблицы user_group_access на минимальную роль
from src.api.auth.project_auth import require_project_access

# Require at least editor role within the project group
@router.post("/projects/{project_id}/import")
async def import_project(
    project_id: str,
    auth = Depends(require_project_access("editor"))
):
    ...

Идентификатор группы определяется из: заголовок X-Group-Id (приоритет) -> auth.group_id (из JWT/API-ключа).

Привязка API-ключей к группам

API-ключи могут быть дополнительно привязаны к определённой группе через столбец group_id: - NULL – доступ ко всем группам пользователя (обратная совместимость) - Указано значение – ограничивает разрешение проектов только этой группой

Контекст аудита

Когда мультитенантность включена, записи журнала аудита содержат поля project_id и group_id для аудита с учётом арендаторов.

Статус реализации

Компонент Файл Статус
GroupRole enum src/api/database/models.py Реализован
ProjectGroup model src/api/database/models.py Реализован
UserGroupAccess model src/api/database/models.py Реализован
require_project_access() src/api/auth/project_auth.py Реализован
_is_multi_tenant_enabled() src/api/auth/project_auth.py Реализован
_resolve_group_id() src/api/auth/project_auth.py Реализован

Конфигурация группового RBAC

Когда multi_tenant.enabled: false (по умолчанию), все проверки на уровне групп являются заглушками.

# config.yaml
multi_tenant:
  enabled: false    # Default: disabled

Миграция

Для существующих однотенантных установок выполните скрипт миграции однократно:

python scripts/migrate_default_group.py          # Creates "default" group, assigns all users as ADMIN
python scripts/migrate_default_group.py --dry-run # Preview changes without applying

Middleware валидации путей

Исходный файл: src/api/middleware/path_validation.py

PathValidationMiddleware предотвращает атаки обхода каталогов, проверяя параметры db_path и source_path в JSON-телах запросов по белому списку разрешённых директорий.

Принцип работы

  1. Перехватывает запросы POST/PUT/PATCH с JSON-телом
  2. Извлекает поля db_path и source_path, если они присутствуют
  3. Нормализует путь (os.path.realpath(), разрешение символических ссылок)
  4. Проверяет, что путь входит в белый список
  5. Отклоняет недопустимые пути с ответом 403 Forbidden

Правила валидации

  • Путь должен быть абсолютным после нормализации
  • Компоненты .. в пути отклоняются
  • Символические ссылки, разрешающиеся за пределы белого списка, отклоняются (настраивается)
  • Путь должен начинаться с одной из разрешённых базовых директорий

Источники белого списка

Белый список формируется автоматически из:

  • security.path_validation_allowed_base_dirs в config.yaml (например, data/projects/)
  • projects.registry.*.db_path – директории всех зарегистрированных баз данных проектов
  • projects.registry.*.source_path – все зарегистрированные каталоги исходного кода

Конфигурация валидации путей

# config.yaml
security:
  path_validation_enabled: true       # По умолчанию: включено
  path_validation_deny_symlinks: true  # Отклонять символические ссылки за пределами белого списка
  path_validation_deny_relative: true  # Отклонять относительные пути
  path_validation_allowed_base_dirs:
    - "data/projects/"

При path_validation_enabled: false middleware отключён.

Блокируемые векторы атак

Атака Пример Результат
Обход каталога {"db_path": "../../etc/passwd"} 403
Выход за пределы через абсолютный путь {"db_path": "/etc/shadow"} 403
Выход через символическую ссылку {"db_path": "/data/projects/link-to-root"} 403
Подмена типа {"db_path": 12345} 403
Относительный путь {"db_path": "relative/path.db"} 403

Лучшие практики

Для администраторов

  1. Принцип наименьших привилегий – назначайте минимально необходимые роли
  2. Используйте API-ключи с ограниченными scope для автоматизации; используйте get_default_scopes_for_api_key() как базовый набор
  3. Настройте синхронизацию LDAP/AD для централизованного управления
  4. Включите интеграцию с SIEM для мониторинга событий безопасности
  5. Регулярно проводите аудит активных сессий и API-ключей
  6. Используйте секреты вебхуков с защитой от повторов для всех интеграций с платформами
  7. Вызывайте load_blacklist_cache() при запуске приложения для восстановления чёрного списка токенов

Для разработчиков

  1. Используйте JWT для веб-приложений, API-ключи для CI/CD, IAM для сервисов Yandex Cloud
  2. Храните refresh-токены безопасно (httpOnly cookies)
  3. Корректно обрабатывайте 401/403 в пользовательском интерфейсе
  4. Никогда не журналируйте токены и ключи в открытом виде
  5. Используйте require_scope() для детализированных проверок scope на конечных точках с API-ключами
  6. Используйте validate_scopes() для фильтрации пользовательских списков scope перед сохранением
  7. Предпочитайте require_permission() вместо ручных проверок has_permission() в обработчиках маршрутов

Связанные документы


Версия: 2.0 | Март 2026