RBAC and Authorization

Technical Documentation for IT and Security Teams


Overview

CodeGraph implements Role-Based Access Control (RBAC) with four hierarchical roles and 21 permissions, and supports multiple authentication modes. Access to platform features is granted per permission.

Key Capabilities

  • 4 role levels with hierarchical inheritance
  • 21 granular permissions across functional categories
  • Authentication modes: JWT, API keys, OAuth2, LDAP, IAM (Yandex Cloud), and unauthenticated access where explicitly allowed
  • Service-account lifecycle for non-human machine identities
  • Token blacklisting for instant access revocation
  • Webhook signature verification with replay attack protection
  • Scope enforcement for fine-grained API key access control
  • SIEM integration for authorization event auditing

RBAC Architecture

Role Hierarchy

                           +---------------------+
                           |       ADMIN         |
                           |   (ADMIN_ALL        |
                           |    meta-permission) |
                           +----------+----------+
                                      |
                           +----------v----------+
                           |      REVIEWER       |
                           |  Code review +      |
                           |  GitHub + GitLab    |
                           +----------+----------+
                                      |
                           +----------v----------+
                           |      ANALYST        |
                           |  Query execution    |
                           |  + API keys         |
                           +----------+----------+
                                      |
                           +----------v----------+
                           |       VIEWER        |
                           |    Read-only        |
                           +---------------------+

Permission Inheritance

Each role inherits all permissions from lower-level roles:

  • ADMIN – uses ADMIN_ALL meta-permission. The get_role_permissions() function expands it to the full set of 21 permissions at runtime. The raw ROLE_PERMISSIONS mapping stores only {Permission.ADMIN_ALL}.
  • REVIEWER – ANALYST permissions + code review (review:execute, review:github, review:gitlab)
  • ANALYST – VIEWER permissions + scenario and query execution, session write/delete, history export, and management of own API keys
  • VIEWER – read-only (base level)

from src.api.auth.permissions import Role, ROLE_PERMISSIONS, Permission

# ADMIN stores only the meta-permission
ROLE_PERMISSIONS[Role.ADMIN]  # => {Permission.ADMIN_ALL}

# get_role_permissions() expands ADMIN_ALL to all 21 permissions
from src.api.auth.permissions import get_role_permissions
perms = get_role_permissions(Role.ADMIN)  # => all Permission values
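
The inheritance and the ADMIN_ALL expansion described above can be sketched as follows. This is a minimal, self-contained illustration and not the actual contents of src/api/auth/permissions.py: the Permission enum is trimmed to five of the 21 values, the lowercase role values are an assumption, and the way the sets are built from each other is only one possible way to express the hierarchy.

```python
from enum import Enum
from typing import Dict, Set


class Permission(str, Enum):
    # Trimmed stand-in: five of the 21 permissions from the catalog.
    SCENARIOS_READ = "scenarios:read"
    QUERY_EXECUTE = "query:execute"
    REVIEW_EXECUTE = "review:execute"
    USERS_WRITE = "users:write"
    ADMIN_ALL = "admin:all"


class Role(str, Enum):
    # Lowercase values are an assumption for this sketch.
    VIEWER = "viewer"
    ANALYST = "analyst"
    REVIEWER = "reviewer"
    ADMIN = "admin"


# Each non-admin role is the set of the role below it plus its own additions.
_VIEWER = {Permission.SCENARIOS_READ}
_ANALYST = _VIEWER | {Permission.QUERY_EXECUTE}
_REVIEWER = _ANALYST | {Permission.REVIEW_EXECUTE}

ROLE_PERMISSIONS: Dict[Role, Set[Permission]] = {
    Role.VIEWER: _VIEWER,
    Role.ANALYST: _ANALYST,
    Role.REVIEWER: _REVIEWER,
    Role.ADMIN: {Permission.ADMIN_ALL},  # only the meta-permission is stored
}


def get_role_permissions(role: Role) -> Set[Permission]:
    """Return the effective permissions, expanding ADMIN_ALL to every value."""
    perms = ROLE_PERMISSIONS.get(role, set())
    if Permission.ADMIN_ALL in perms:
        return set(Permission)
    return set(perms)
```

Storing only the meta-permission for ADMIN means a newly added Permission value reaches administrators without touching the mapping.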

Permission Catalog

Permissions by Category

All 21 permissions are defined in the Permission(str, Enum) class in src/api/auth/permissions.py.

Scenarios (scenarios:*)

Permission         Enum Name          Description             VIEWER  ANALYST  REVIEWER  ADMIN
scenarios:read     SCENARIOS_READ     View scenario list      x       x        x         x
scenarios:execute  SCENARIOS_EXECUTE  Run analysis scenarios  -       x        x         x

Queries (query:*)

Permission      Enum Name       Description                      VIEWER  ANALYST  REVIEWER  ADMIN
query:execute   QUERY_EXECUTE   Execute SQL queries against CPG  -       x        x         x
query:validate  QUERY_VALIDATE  Validate query syntax            -       x        x         x

Code Review (review:*)

Permission      Enum Name       Description            VIEWER  ANALYST  REVIEWER  ADMIN
review:execute  REVIEW_EXECUTE  Run automated review   -       -        x         x
review:github   REVIEW_GITHUB   GitHub PR integration  -       -        x         x
review:gitlab   REVIEW_GITLAB   GitLab MR integration  -       -        x         x

Sessions (sessions:*)

Permission       Enum Name        Description             VIEWER  ANALYST  REVIEWER  ADMIN
sessions:read    SESSIONS_READ    View sessions           x       x        x         x
sessions:write   SESSIONS_WRITE   Create/modify sessions  -       x        x         x
sessions:delete  SESSIONS_DELETE  Delete sessions         -       x        x         x

History (history:*)

Permission      Enum Name       Description         VIEWER  ANALYST  REVIEWER  ADMIN
history:read    HISTORY_READ    View query history  x       x        x         x
history:export  HISTORY_EXPORT  Export history      -       x        x         x

Users (users:*)

Permission    Enum Name     Description        VIEWER  ANALYST  REVIEWER  ADMIN
users:read    USERS_READ    View user list     -       -        -         x
users:write   USERS_WRITE   Create/edit users  -       -        -         x
users:delete  USERS_DELETE  Delete users       -       -        -         x

API Keys (api_keys:*)

Permission       Enum Name        Description      VIEWER  ANALYST  REVIEWER  ADMIN
api_keys:read    API_KEYS_READ    View own keys    -       x        x         x
api_keys:write   API_KEYS_WRITE   Create keys      -       x        x         x
api_keys:delete  API_KEYS_DELETE  Delete any keys  -       -        -         x

Metrics (stats:*, metrics:*)

Permission    Enum Name     Description         VIEWER  ANALYST  REVIEWER  ADMIN
stats:read    STATS_READ    View statistics     x       x        x         x
metrics:read  METRICS_READ  Prometheus metrics  -       -        -         x

Administration

Permission  Enum Name  Description                    VIEWER  ANALYST  REVIEWER  ADMIN
admin:all   ADMIN_ALL  Full access (meta-permission)  -       -        -         x

Authentication Methods

CodeGraph supports several authentication modes. The auth_method field in AuthContext indicates which mode was used:

Value    Method            Description
jwt      JWT Bearer Token  Primary method for web applications
api_key  API Key           For integrations, CI/CD, and automation
oauth2   OAuth2/OIDC       Enterprise identity providers
ldap     LDAP/AD           Corporate directory integration
iam      Yandex Cloud IAM  Cloud-native authentication
none     Unauthenticated   Default when no credentials are provided

1. JWT Bearer Token

Primary method for web applications and interactive sessions.

Token Structure (TokenPayload)

class TokenPayload(BaseModel):
    sub: str                          # Subject (user_id)
    jti: str                          # JWT ID (unique identifier)
    exp: datetime                     # Expiration time
    iat: datetime                     # Issued at
    type: str                         # "access" or "refresh"
    scopes: list[str] = []            # Permission scopes
    role: Optional[str] = None        # User role
    group_id: Optional[str] = None    # Optional group scope (for CI/CD service accounts)

Source: src/api/auth/jwt_handler.py

Token Parameters

Token Type     TTL         Purpose
Access Token   30 minutes  API request authorization
Refresh Token  7 days      Access token renewal

Token Functions

from src.api.auth.jwt_handler import (
    create_access_token,   # Create JWT access token
    create_refresh_token,  # Create JWT refresh token
    decode_token,          # Decode and validate JWT
    verify_token,          # Verify JWT and check type
    get_token_jti,         # Extract JTI without full verification
    blacklist_token,       # Add token to blacklist
    is_token_blacklisted,  # Check if token is blacklisted
    load_blacklist_cache,  # Load blacklist from DB into memory
)

create_access_token() – creates a new JWT access token:

def create_access_token(
    user_id: str,
    scopes: Optional[list[str]] = None,
    role: Optional[str] = None,
    expires_delta: Optional[timedelta] = None,
    group_id: Optional[str] = None,
) -> str:

get_token_jti() – extracts the JTI (JWT ID) from a token without full signature verification. Used for blacklist operations when the token may already be expired:

def get_token_jti(token: str) -> str:

Returns an empty string if extraction fails.
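
Reading the JTI without verifying the signature comes down to decoding the middle segment of the token. The sketch below uses only the standard library and is an illustration, not the code in jwt_handler.py; extract_jti_unverified, _b64url, and demo_token are hypothetical names. The result must never be trusted for authorization, only for lookups such as blacklisting.

```python
import base64
import json


def extract_jti_unverified(token: str) -> str:
    """Return the jti claim without checking the signature, or "" on failure."""
    try:
        parts = token.split(".")
        if len(parts) != 3:
            return ""
        payload_b64 = parts[1]
        # JWT segments are base64url without padding; restore it before decoding.
        padded = payload_b64 + "=" * (-len(payload_b64) % 4)
        payload = json.loads(base64.urlsafe_b64decode(padded))
        jti = payload.get("jti", "")
        return jti if isinstance(jti, str) else ""
    except (ValueError, AttributeError):
        # Covers bad base64, bad JSON, bad UTF-8, and non-dict payloads.
        return ""


def _b64url(data: dict) -> str:
    """Encode a dict as an unpadded base64url JSON segment (demo helper)."""
    raw = json.dumps(data).encode("utf-8")
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


# A structurally valid token with a dummy signature segment.
demo_token = ".".join([
    _b64url({"alg": "HS256", "typ": "JWT"}),
    _b64url({"sub": "user_123", "jti": "abc-123", "type": "access"}),
    "signature-not-checked",
])
```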

Usage Example

# Get token
curl -X POST /api/v1/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username": "analyst", "password": "***"}'

# Response:
{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "bearer",
  "expires_in": 1800
}

# Use token
curl -X GET /api/v1/scenarios \
  -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."

Token Revocation (Blacklisting)

# Logout - add token to blacklist
curl -X POST /api/v1/auth/logout \
  -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."

After blacklisting, the token becomes invalid immediately. See Token Blacklist API for implementation details.


2. API Keys

For integrations, CI/CD, and automation.

API Key Format

Prefix:  rag_<8 hex chars>     (e.g., rag_a1b2c3d4)
Secret:  <48 hex chars>        (secrets.token_hex(24))
Full:    rag_<8hex>_<48hex>

Example: rag_a1b2c3d4_e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8

The key is generated by generate_api_key() in src/api/auth/api_keys.py:

def generate_api_key() -> tuple[str, str, str]:
    """Returns (full_key, prefix, key_hash)."""
    prefix = f"rag_{secrets.token_hex(4)}"   # e.g., rag_a1b2c3d4
    secret = secrets.token_hex(24)            # 48 hex chars
    full_key = f"{prefix}_{secret}"
    key_hash = hash_api_key(full_key)         # SHA-256
    return full_key, prefix, key_hash

Storage Security

  • Keys are SHA-256 hashed before storage (hash_api_key())
  • Full key shown only at creation time (returned as ApiKeyWithSecret)
  • Support for expiration (is_key_expired()) and revocation (is_revoked flag)
  • Constant-time comparison via secrets.compare_digest() in verify_api_key()
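
A minimal sketch of the hashing and comparison steps listed above, using only hashlib and secrets. The function names mirror those in the document, but the bodies are an assumption about how such helpers are typically written, not a copy of src/api/auth/api_keys.py.

```python
import hashlib
import secrets


def hash_api_key(key: str) -> str:
    """Return the SHA-256 hex digest that would be stored instead of the key."""
    return hashlib.sha256(key.encode("utf-8")).hexdigest()


def verify_api_key(key: str, stored_hash: str) -> bool:
    """Compare digests in constant time to avoid leaking match position."""
    return secrets.compare_digest(hash_api_key(key), stored_hash)


# Build a key in the documented format: rag_<8 hex>_<48 hex>.
full_key = f"rag_{secrets.token_hex(4)}_{secrets.token_hex(24)}"
stored_hash = hash_api_key(full_key)
```

Because only the digest is stored, a lost key cannot be recovered and has to be replaced with a new one.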

Default API Key Scopes

The get_default_scopes_for_api_key() function returns default scopes for new API keys:

def get_default_scopes_for_api_key() -> List[str]:
    return [
        "scenarios:read",
        "scenarios:execute",
        "query:execute",
        "sessions:read",
        "sessions:write",
        "history:read",
    ]

Usage Example

# Create API key
curl -X POST /api/v1/auth/api-keys \
  -H "Authorization: Bearer <admin_token>" \
  -H "Content-Type: application/json" \
  -d '{"name": "CI Pipeline", "scopes": ["query:execute", "scenarios:execute"]}'

# Response:
{
  "id": "key_123",
  "key": "rag_a1b2c3d4_e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8",
  "name": "CI Pipeline",
  "prefix": "rag_a1b2c3d4",
  "scopes": ["query:execute", "scenarios:execute"],
  "created_at": "2026-03-07T10:00:00Z",
  "expires_at": "2026-06-07T10:00:00Z",
  "is_revoked": false
}

# Use API key
curl -X GET /api/v1/scenarios \
  -H "X-API-Key: rag_a1b2c3d4_e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8"

Service Accounts and Machine Access

Service accounts build on the X-API-Key transport but introduce a separate machine-identity lifecycle.

Implemented capabilities:

  • dedicated ServiceAccount and ServiceAccountCredential records
  • project/group scoping
  • interface allowlists (api, mcp)
  • rotate/revoke/deactivate flows
  • unified audit metadata across API and MCP
  • per-service-account rate limiting
  • machine contract version validation through X-CodeGraph-Machine-Contract

Admin API:

  • GET /api/v1/auth/service-accounts/action-catalog
  • POST /api/v1/auth/service-accounts
  • GET /api/v1/auth/service-accounts
  • GET /api/v1/auth/service-accounts/{id}
  • POST /api/v1/auth/service-accounts/{id}/rotate
  • POST /api/v1/auth/service-accounts/{id}/credentials/{credential_id}/revoke
  • POST /api/v1/auth/service-accounts/{id}/deactivate

CLI:

codegraph auth service-account create --owner admin --name ci-bot --template ci --interface api
codegraph auth service-account list
codegraph auth service-account inspect --id <service-account-id>
codegraph auth service-account rotate --id <service-account-id>
codegraph auth service-account revoke --id <credential-id>
codegraph auth service-account deactivate --id <service-account-id>

Operational notes:

  • use service accounts for CI, bots, portals, and MCP/API automation
  • roll out rotated credentials before revoking the old credential
  • API and MCP use the same scope and interface enforcement logic
  • high-assurance deployments can enable proxy-terminated mTLS bindings for grpc and acp while keeping API-key-based service-account auth in place
  • configure trusted certificate bindings with security.service_account_mtls_* settings and map fingerprints to service-account UUIDs
  • OAuth2/OIDC client-credentials for machine callers remains deferred and is not part of the current service-account runtime baseline

3. OAuth2/OIDC

Support for enterprise identity providers. See OAuth2/OIDC & LDAP/AD Integration for the full configuration guide.

Supported Providers

Provider  Status  Use Case
GitHub    Ready   Developers, open-source
GitLab    Ready   Corporate GitLab
Google    Ready   Google Workspace
Keycloak  Ready   Enterprise IdP
Azure AD  Ready   Microsoft 365

Configuration (Keycloak Example)

oauth2:
  providers:
    keycloak:
      enabled: true
      client_id: "codegraph"
      client_secret: "${KEYCLOAK_SECRET}"
      authorize_url: "https://keycloak.company.com/realms/main/protocol/openid-connect/auth"
      token_url: "https://keycloak.company.com/realms/main/protocol/openid-connect/token"
      userinfo_url: "https://keycloak.company.com/realms/main/protocol/openid-connect/userinfo"
      scopes: ["openid", "profile", "email", "groups"]
      role_claim: "realm_access.roles"
      role_mapping:
        codegraph-admin: admin
        codegraph-reviewer: reviewer
        codegraph-analyst: analyst
        codegraph-viewer: viewer
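
To make the role_claim and role_mapping settings concrete, the sketch below resolves a dotted claim path in decoded token claims and maps the provider roles to a CodeGraph role. It is an illustration under stated assumptions: resolve_claim and map_role are hypothetical helpers, and picking the highest-ranked role when several match is an assumed policy that the configuration above does not specify.

```python
from typing import Any, Dict, Optional

# Mirrors the role_mapping block of the Keycloak example.
ROLE_MAPPING: Dict[str, str] = {
    "codegraph-admin": "admin",
    "codegraph-reviewer": "reviewer",
    "codegraph-analyst": "analyst",
    "codegraph-viewer": "viewer",
}

# Lowest to highest, following the role hierarchy.
ROLE_ORDER = ["viewer", "analyst", "reviewer", "admin"]


def resolve_claim(claims: Dict[str, Any], dotted_path: str) -> Any:
    """Walk a dotted path such as "realm_access.roles" through nested dicts."""
    current: Any = claims
    for part in dotted_path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def map_role(claims: Dict[str, Any], role_claim: str = "realm_access.roles") -> Optional[str]:
    """Return the highest mapped role, or None when no provider role matches."""
    raw = resolve_claim(claims, role_claim)
    if isinstance(raw, str):
        raw = [raw]
    if not isinstance(raw, list):
        return None
    mapped = [ROLE_MAPPING[r] for r in raw if isinstance(r, str) and r in ROLE_MAPPING]
    if not mapped:
        return None
    return max(mapped, key=ROLE_ORDER.index)
```

Returning None rather than a default role leaves the decision about unmapped users to the caller.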

4. LDAP/Active Directory

Integration with the corporate directory. See OAuth2/OIDC & LDAP/AD Integration for the full configuration guide.

Configuration

ldap:
  enabled: true
  server: "ldaps://ldap.company.com:636"
  base_dn: "dc=company,dc=com"
  bind_dn: "cn=codegraph,ou=services,dc=company,dc=com"
  bind_password: "${LDAP_PASSWORD}"
  user_search_base: "ou=users,dc=company,dc=com"
  user_search_filter: "(sAMAccountName={username})"
  group_search_base: "ou=groups,dc=company,dc=com"
  group_membership_attr: "memberOf"
  group_mapping:
    "CN=CodeGraph-Admins,OU=Groups,DC=company,DC=com": admin
    "CN=CodeGraph-Reviewers,OU=Groups,DC=company,DC=com": reviewer
    "CN=CodeGraph-Analysts,OU=Groups,DC=company,DC=com": analyst
    "CN=CodeGraph-Viewers,OU=Groups,DC=company,DC=com": viewer
  sync_interval_minutes: 15

Group Synchronization

  • Automatic role sync with AD groups
  • Nested group support
  • Caching with configurable TTL

5. IAM (Yandex Cloud)

Cloud-native authentication for services running in Yandex Cloud. IAM-authenticated users receive the ANALYST role by default with a limited scope set.

How It Works

The middleware checks the X-YC-IAM-Token header before JWT and API key authentication. If the header is present, the token is validated via the IAM validator (src/api/auth/iam).

# IAM authentication flow in get_auth_context():
iam_token = request.headers.get("X-YC-IAM-Token")
if iam_token:
    validator = get_iam_validator()
    user_info = await validator.validate_token(iam_token)
    # Returns AuthContext with:
    #   role=Role.ANALYST
    #   scopes=["scenarios:read", "query:execute", "review:execute"]
    #   auth_method="iam"

Usage

# Authenticate with IAM token
curl -X GET /api/v1/scenarios \
  -H "X-YC-IAM-Token: t1.9euelZqMkJKVjJqTnZyRm5GUkZqRke..."

Default IAM Permissions

Field        Value
role         ANALYST
scopes       scenarios:read, query:execute, review:execute
auth_method  iam

Environment

Requires the IAM_ENABLED environment variable to be set and a configured IAM validator. When the validator is unavailable, authentication falls through to the JWT and API key methods.
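
The credential precedence can be pictured with a small header-inspection function. This is a sketch, not the logic of get_auth_context(): pick_auth_method is a hypothetical name, the iam_enabled flag stands in for both the IAM_ENABLED variable and validator availability, and checking the bearer token before the API key is an assumption (the document only states that IAM is checked first). A plain dict is case-sensitive, unlike real HTTP header objects.

```python
from typing import Mapping


def pick_auth_method(headers: Mapping[str, str], iam_enabled: bool) -> str:
    """Return the auth_method value that would apply to these request headers."""
    # IAM is consulted first, but only when it is enabled and usable.
    if iam_enabled and headers.get("X-YC-IAM-Token"):
        return "iam"
    # Assumed order for the remaining methods: bearer token, then API key.
    authorization = headers.get("Authorization", "")
    if authorization.lower().startswith("bearer "):
        return "jwt"
    if headers.get("X-API-Key"):
        return "api_key"
    return "none"
```

With IAM disabled, a request carrying only X-YC-IAM-Token ends up unauthenticated, which matches the fall-through behavior described above.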


API Reference

Middleware Dependencies

All middleware functions are in src/api/auth/middleware.py:

from src.api.auth.middleware import (
    get_auth_context,        # Get context (returns unauthenticated context if no credentials)
    get_current_user,        # FastAPI dependency, raises 401 if unauthenticated
    get_optional_user,       # FastAPI dependency, returns unauthenticated context if no credentials
    require_auth,            # Require any authentication (raises 401)
    require_permission,      # Factory: require specific permission (raises 403)
    require_any_permission,  # Factory: require any of multiple permissions (raises 403)
    require_role,            # Factory: require specific role (raises 403)
    require_admin,           # Shortcut for require_role(Role.ADMIN)
    require_analyst,         # Shortcut for require_role(Role.ANALYST, Role.REVIEWER, Role.ADMIN)
    require_reviewer,        # Shortcut for require_role(Role.REVIEWER, Role.ADMIN)
)

Function signatures:

async def get_auth_context(request, bearer_token, api_key) -> AuthContext
async def get_current_user(auth: AuthContext = Depends(require_auth)) -> AuthContext
async def get_optional_user(auth: AuthContext = Depends(get_auth_context)) -> AuthContext
async def require_auth(auth: AuthContext = Depends(get_auth_context)) -> AuthContext
def require_permission(permission: Permission) -> Callable  # dependency factory
def require_any_permission(*permissions: Permission) -> Callable  # dependency factory
def require_role(*roles: Role) -> Callable  # dependency factory

FastAPI Usage Examples

from fastapi import Depends, APIRouter
from src.api.auth.middleware import (
    require_permission,
    require_any_permission,
    require_admin,
    get_current_user,
    get_optional_user,
)
from src.api.auth.permissions import Permission

router = APIRouter()

# Require specific permission
@router.post("/query/execute")
async def execute_query(
    query: str,
    auth = Depends(require_permission(Permission.QUERY_EXECUTE))
):
    # auth.user_id, auth.role, auth.scopes, auth.group_id available
    return {"result": "..."}

# Require one of multiple permissions
@router.get("/reviews")
async def list_reviews(
    auth = Depends(require_any_permission(
        Permission.REVIEW_EXECUTE,
        Permission.REVIEW_GITHUB,
        Permission.REVIEW_GITLAB
    ))
):
    return {"reviews": [...]}

# Require Admin role
@router.delete("/users/{user_id}")
async def delete_user(
    user_id: str,
    auth = Depends(require_admin)
):
    return {"deleted": user_id}

# Get authenticated user (raises 401 if not authenticated)
@router.get("/me")
async def get_profile(auth = Depends(get_current_user)):
    return {"user_id": auth.user_id, "role": auth.role}

# Optional authentication (no error if unauthenticated)
@router.get("/public")
async def public_endpoint(auth = Depends(get_optional_user)):
    if auth.is_authenticated:
        return {"greeting": f"Hello, {auth.username}"}
    return {"greeting": "Hello, anonymous"}

AuthContext

The AuthContext class holds the authentication state for the current request. Source: src/api/auth/middleware.py.

class AuthContext:
    def __init__(
        self,
        user_id: Optional[str] = None,
        username: Optional[str] = None,
        role: Optional[Role] = None,
        scopes: Optional[List[str]] = None,
        auth_method: str = "none",       # "jwt", "api_key", "oauth2", "ldap", "iam", "none"
        group_id: Optional[str] = None,  # optional group scope (from JWT or API key)
    ):
        ...

    @property
    def is_authenticated(self) -> bool:
        """True if user_id is not None."""

    def has_permission(self, permission: Permission) -> bool:
        """Check if user has a specific permission."""

Field        Type            Description
user_id      Optional[str]   User ID (None if unauthenticated)
username     Optional[str]   Username
role         Optional[Role]  Role: VIEWER, ANALYST, REVIEWER, ADMIN
scopes       List[str]       Permission scope list
auth_method  str             Authentication method used
group_id     Optional[str]   Group scope for multi-tenant isolation

Permission Helper Functions

All functions are in src/api/auth/permissions.py:

from src.api.auth.permissions import (
    get_role_permissions,
    has_permission,
    has_any_permission,
    has_all_permissions,
    validate_scopes,
    get_default_scopes_for_api_key,
)

get_role_permissions(role: Role) -> Set[Permission]

Returns the set of permissions for a role. For ADMIN, the function expands ADMIN_ALL to the full set of 21 permissions; the raw ROLE_PERMISSIONS mapping contains only {Permission.ADMIN_ALL}.

has_permission(role, required_permission, user_scopes=None) -> bool

Checks if a role or user has a specific permission. Returns True if the role includes ADMIN_ALL, the permission is in role permissions, or the permission is in explicit user scopes.

def has_permission(
    role: Optional[Role],
    required_permission: Permission,
    user_scopes: Optional[List[str]] = None,
) -> bool:
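
The three conditions in the description (ADMIN_ALL, role permissions, explicit scopes) can be sketched as below. The enums and mapping are trimmed stand-ins and the body is an assumption about the check order, not the code in permissions.py. The demo reuses the IAM defaults from this document: role ANALYST with an explicit review:execute scope.

```python
from enum import Enum
from typing import Dict, List, Optional, Set


class Permission(str, Enum):
    # Trimmed stand-in for the 21-value enum.
    SCENARIOS_READ = "scenarios:read"
    QUERY_EXECUTE = "query:execute"
    REVIEW_EXECUTE = "review:execute"
    ADMIN_ALL = "admin:all"


class Role(str, Enum):
    VIEWER = "viewer"
    ANALYST = "analyst"
    ADMIN = "admin"


ROLE_PERMISSIONS: Dict[Role, Set[Permission]] = {
    Role.VIEWER: {Permission.SCENARIOS_READ},
    Role.ANALYST: {Permission.SCENARIOS_READ, Permission.QUERY_EXECUTE},
    Role.ADMIN: {Permission.ADMIN_ALL},
}


def has_permission(
    role: Optional[Role],
    required_permission: Permission,
    user_scopes: Optional[List[str]] = None,
) -> bool:
    """True if the role grants the permission or an explicit scope does."""
    role_perms = ROLE_PERMISSIONS.get(role, set()) if role is not None else set()
    if Permission.ADMIN_ALL in role_perms:
        return True
    if required_permission in role_perms:
        return True
    # Explicit scopes can grant a permission the role alone does not carry.
    return required_permission.value in (user_scopes or [])


iam_scopes = ["scenarios:read", "query:execute", "review:execute"]
```

Under this reading, scopes only ever add to what the role grants; restricting API keys by scope is handled separately by scope enforcement.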

has_any_permission(role, required_permissions, user_scopes=None) -> bool

Returns True if any of the specified permissions is granted.

def has_any_permission(
    role: Optional[Role],
    required_permissions: List[Permission],
    user_scopes: Optional[List[str]] = None,
) -> bool:

has_all_permissions(role, required_permissions, user_scopes=None) -> bool

Returns True only if all specified permissions are granted.

def has_all_permissions(
    role: Optional[Role],
    required_permissions: List[Permission],
    user_scopes: Optional[List[str]] = None,
) -> bool:

validate_scopes(scopes: List[str]) -> List[str]

Validates and filters scope strings against the Permission enum. Returns only valid scope strings, silently dropping unknown values.

def validate_scopes(scopes: List[str]) -> List[str]:
    valid_permissions = {p.value for p in Permission}
    return [s for s in scopes if s in valid_permissions]
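
Because unknown values are dropped silently, a typo in a requested scope produces a key with fewer scopes than intended and no error. The sketch below repeats validate_scopes against a trimmed stand-in enum and adds split_scopes, a hypothetical helper (not part of permissions.py) that also reports what was rejected so a caller can warn the user.

```python
from enum import Enum
from typing import List, Tuple


class Permission(str, Enum):
    # Trimmed stand-in for the 21-value enum.
    SCENARIOS_READ = "scenarios:read"
    QUERY_EXECUTE = "query:execute"


def validate_scopes(scopes: List[str]) -> List[str]:
    """Keep only strings that match a Permission value."""
    valid_permissions = {p.value for p in Permission}
    return [s for s in scopes if s in valid_permissions]


def split_scopes(scopes: List[str]) -> Tuple[List[str], List[str]]:
    """Return (accepted, rejected) so dropped scopes can be surfaced."""
    accepted = validate_scopes(scopes)
    rejected = [s for s in scopes if s not in accepted]
    return accepted, rejected
```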

get_default_scopes_for_api_key() -> List[str]

Returns default scopes for new API keys. Used in API key management when no explicit scopes are specified.

Scope Enforcement

Source: src/api/auth/scope_enforcement.py

API key scope enforcement operates at two levels:

1. Automatic enforcement (middleware) – The ScopeEnforcementMiddleware (src/api/middleware/scope_enforcement.py) automatically checks API key scopes against the ROUTE_SCOPE_MAP based on the request path prefix. This runs for every API-key-authenticated request without requiring any changes to router code.

Route prefix → scope mapping (excerpt):

Route Prefix  Required Scope
/query        query:execute
/review       review:execute
/scenarios    scenarios:read
/security     scenarios:execute
/gocpg        query:execute
/import       scenarios:execute
/stats        stats:read
/groups       admin:all
/projects     admin:all
/health       None (public)
/auth         None (public)

Keys with admin:all scope bypass all enforcement. JWT and IAM-authenticated users are not affected (permissions checked via role-based access control).
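
A prefix-based check of this kind might look like the following sketch. It is not the code of ScopeEnforcementMiddleware: the map is an excerpt of the table above, required_scope_for and check_scope are hypothetical names, stripping an /api/v1 prefix is an assumption about how paths are normalized, and unmapped routes are treated as unrestricted only for the sake of the example.

```python
from typing import Dict, List, Optional

# Excerpt of the route prefix table; None marks public prefixes.
ROUTE_SCOPE_MAP: Dict[str, Optional[str]] = {
    "/query": "query:execute",
    "/review": "review:execute",
    "/scenarios": "scenarios:read",
    "/stats": "stats:read",
    "/groups": "admin:all",
    "/health": None,
    "/auth": None,
}

API_PREFIX = "/api/v1"  # assumption: routes are matched after this prefix


def required_scope_for(path: str) -> Optional[str]:
    """Return the scope required for a path, or None if nothing is required."""
    if path.startswith(API_PREFIX):
        path = path[len(API_PREFIX):]
    for prefix, scope in ROUTE_SCOPE_MAP.items():
        # Match whole path segments so "/queryfoo" does not match "/query".
        if path == prefix or path.startswith(prefix + "/"):
            return scope
    return None  # unmapped routes are unrestricted in this sketch


def check_scope(scopes: List[str], path: str) -> Optional[str]:
    """Return None if the scopes satisfy the route, else an error message."""
    if "admin:all" in scopes:
        return None
    required = required_scope_for(path)
    if required is None or required in scopes:
        return None
    return f"API key is missing required scope: {required}"
```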

Configuration:

# config.yaml
security:
  api_key_scope_enforcement: true   # Default: enabled

2. Explicit per-endpoint enforcement – The require_scope() dependency factory checks if the authenticated caller has a specific scope string in their scopes list. This is separate from role-based permission checks – it operates on raw scope strings.

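from fastapi import APIRouter, Depends
from src.api.auth.middleware import AuthContext
from src.api.auth.scope_enforcement import require_scope

router = APIRouter()

# Passes only if the raw string "admin:read" is in the caller's scopes list
@router.get("/admin/stats")
async def admin_stats(auth: AuthContext = Depends(require_scope("admin:read"))):
    ...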

Signature:

def require_scope(scope: str):
    """FastAPI dependency factory that checks if the authenticated caller has a scope.

    Raises:
        HTTPException(401) if not authenticated
        HTTPException(403) if scope is missing
    """

Helper function:

def check_api_key_scope(auth: AuthContext, request_path: str) -> Optional[str]:
    """Check if an API-key-authenticated request has the required scope.

    Returns None if scope is satisfied, or an error message string if missing.
    """

Webhook Authentication

The verify_webhook_signature() function in src/api/auth/webhook_auth.py provides HMAC-SHA256 signature verification for platform webhook events with replay attack prevention.

Signature:

async def verify_webhook_signature(
    request: Request,
    secret: Optional[str] = None,
    platform: str = "sourcecraft",
    max_age_seconds: int = 300,
) -> bytes:

Parameters:

Parameter        Type           Default        Description
request          Request        required       FastAPI request object
secret           Optional[str]  None           Webhook secret for HMAC verification; None skips verification
platform         str            "sourcecraft"  Platform name
max_age_seconds  int            300            Maximum webhook age in seconds (replay protection)

Supported platforms and headers:

Platform     Signature Header         Timestamp Header
sourcecraft  X-SourceCraft-Signature  X-SourceCraft-Timestamp
gitverse     X-GitVerse-Signature     X-GitVerse-Timestamp
github       X-Hub-Signature-256      X-Hub-Timestamp

Replay protection: The function validates the timestamp header against the current time. If the difference exceeds max_age_seconds (default 300 seconds / 5 minutes), the request is rejected with HTTP 400 and the rejection is recorded in the audit log.

GitVerse fallback: If X-GitVerse-Signature is missing, the function also checks X-Hub-Signature-256 for backward compatibility.
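
The two checks, HMAC-SHA256 over the body and a timestamp window, can be sketched with the standard library. This is an illustration rather than the code in webhook_auth.py, and it rests on assumptions: the signature header is taken to be "sha256=" followed by the hex digest (the GitHub convention for X-Hub-Signature-256), the timestamp is taken to be Unix epoch seconds, and only the request body is signed. sign_body, is_fresh, and verify are hypothetical names.

```python
import hashlib
import hmac
import time
from typing import Optional


def sign_body(body: bytes, secret: str) -> str:
    """Compute the expected signature header value for a request body."""
    digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


def is_fresh(timestamp_header: str, max_age_seconds: int = 300,
             now: Optional[float] = None) -> bool:
    """Reject timestamps that are unparseable or outside the allowed window."""
    try:
        sent_at = float(timestamp_header)
    except (TypeError, ValueError):
        return False
    current = time.time() if now is None else now
    return abs(current - sent_at) <= max_age_seconds


def verify(body: bytes, signature_header: str, timestamp_header: str,
           secret: str, now: Optional[float] = None) -> bool:
    """Return True only for a fresh request with a matching signature."""
    if not is_fresh(timestamp_header, now=now):
        return False
    expected = sign_body(body, secret).encode("utf-8")
    # Constant-time comparison on bytes avoids timing side channels.
    return hmac.compare_digest(expected, signature_header.encode("utf-8"))
```

If the timestamp is not covered by the signature, an attacker who captures a request can resend it with a fresh timestamp, so replay protection is strongest when the platform signs both values.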

Usage example:

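import json

from fastapi import APIRouter, Request
from src.api.auth.webhook_auth import verify_webhook_signature

router = APIRouter()

@router.post("/webhooks/sourcecraft")
async def handle_sourcecraft_webhook(request: Request):
    # The verified request body is returned as bytes
    body = await verify_webhook_signature(
        request,
        secret="your-webhook-secret",  # placeholder; load the real secret from configuration
        platform="sourcecraft",
        max_age_seconds=300,
    )
    payload = json.loads(body)
    ...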

Token Blacklist API

The token blacklist provides instant revocation of JWT tokens. It uses PostgreSQL for persistence with an in-memory cache for fast lookups. Source: src/api/auth/jwt_handler.py.

Functions

blacklist_token(jti, expires_at=None) -> None

Adds a token to the blacklist. Stores the JTI in both the in-memory cache (for immediate effect) and PostgreSQL (for persistence across restarts).

async def blacklist_token(jti: str, expires_at: Optional[datetime] = None) -> None:

is_token_blacklisted(jti) -> bool

Checks if a token is blacklisted. Uses the in-memory cache first (fast path), then falls back to the database (slow path). Tokens found in the database are added to the cache for subsequent lookups.

async def is_token_blacklisted(jti: str) -> bool:

load_blacklist_cache() -> int

Loads non-expired blacklisted tokens from PostgreSQL into the in-memory cache. Should be called during application startup. Returns the number of tokens loaded.

async def load_blacklist_cache() -> int:

_blacklist_sync_task(interval_seconds=60) -> None

Background asyncio task that periodically refreshes the blacklist cache from the database. Catches all exceptions to prevent the task from dying on transient errors.

async def _blacklist_sync_task(interval_seconds: int = 60) -> None:

Architecture

Request                              blacklist_token()
   |                                       |
   v                                       v
is_token_blacklisted()              _blacklisted_tokens (set)
   |                                  +    |
   |  1. check in-memory set          |    v
   |  2. fallback to PostgreSQL        | PostgreSQL: TokenBlacklist table
   |     (adds to cache if found)      |
   v                                   |
 True/False                     _blacklist_sync_task()
                                (periodic refresh)

API Key Data Models

Source: src/api/auth/api_keys.py

ApiKeyInfo

Information model for API keys (without the secret key). Used for listing and inspecting keys.

class ApiKeyInfo(BaseModel):
    id: str                            # Unique key identifier
    name: str                          # Human-readable name
    prefix: str                        # Key prefix (e.g., "rag_a1b2c3d4")
    scopes: List[str]                  # Permission scopes
    created_at: datetime               # Creation timestamp
    expires_at: Optional[datetime]     # Expiration (None = never)
    last_used_at: Optional[datetime]   # Last usage timestamp
    is_revoked: bool                   # Revocation status

ApiKeyWithSecret

Extended model that includes the full key. Only returned at creation time.

class ApiKeyWithSecret(ApiKeyInfo):
    key: str    # Full API key (e.g., "rag_a1b2c3d4_<48hex>")

Utility Functions

from src.api.auth.api_keys import (
    generate_api_key,      # -> (full_key, prefix, key_hash)
    hash_api_key,          # key -> SHA-256 hex digest
    verify_api_key,        # (key, stored_hash) -> bool (constant-time)
    extract_prefix,        # key -> prefix string
    is_key_expired,        # expires_at -> bool
    calculate_expiration,  # days -> Optional[datetime]
    validate_api_key,      # Full validation pipeline (hash + revoked + expired)
)

Auditing and Logging

Authorization Events

All authorization events are logged and sent to SIEM:

Event                    Severity  Description
AUTH_SUCCESS             INFO      Successful authentication
AUTH_FAILURE             WARNING   Failed attempt
TOKEN_ISSUED             INFO      New token issued
TOKEN_REVOKED            INFO      Token revoked (blacklisted)
PERMISSION_DENIED        WARNING   Access denied
API_KEY_CREATED          INFO      API key created
API_KEY_REVOKED          INFO      API key revoked
WEBHOOK_REPLAY_REJECTED  WARNING   Webhook replay attack detected

Log Format

{
  "timestamp": "2026-03-07T10:30:00.000Z",
  "event_type": "AUTH_SUCCESS",
  "user_id": "user_123",
  "username": "analyst@company.com",
  "role": "analyst",
  "auth_method": "jwt",
  "group_id": null,
  "ip_address": "10.0.0.50",
  "user_agent": "Mozilla/5.0...",
  "request_path": "/api/v1/scenarios",
  "request_method": "GET"
}

Group-Level RBAC (Multi-Tenant)

When multi_tenant.enabled is set to true in config.yaml, CodeGraph enforces group-level access control on all data endpoints. Projects belong to groups, and users have per-group roles.

Group Roles

Source: src/api/database/models.py (GroupRole enum)

Role    Permissions
VIEWER  Read-only: query, stats, list
EDITOR  Read + write: import, edit, activate
ADMIN   Full access: delete projects, manage group members

System administrators (Role.ADMIN) bypass all group checks.

How It Works

Every API request resolves a ProjectContext containing project_id, group_id, and db_path. The require_project_access(min_group_role) dependency checks:

  1. If multi_tenant.enabled is false -> pass-through (no enforcement)
  2. If user is system admin -> bypass group checks
  3. Otherwise -> verify user_group_access table for minimum role

from src.api.auth.project_auth import require_project_access

# Require at least editor role within the project group
@router.post("/projects/{project_id}/import")
async def import_project(
    project_id: str,
    auth = Depends(require_project_access("editor"))
):
    ...

The group ID is resolved from the X-Group-Id header first; if the header is absent, auth.group_id (from the JWT or API key) is used.
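
The three-step decision and the group ID fallback can be sketched as pure functions. This is an illustration and not the code in project_auth.py: can_access and resolve_group_id are hypothetical names, and ranking the group roles as viewer < editor < admin is an assumption drawn from the Group Roles table.

```python
from typing import Optional

# Assumed ordering of group roles, lowest to highest.
GROUP_ROLE_RANK = {"viewer": 0, "editor": 1, "admin": 2}


def resolve_group_id(header_value: Optional[str], auth_group_id: Optional[str]) -> Optional[str]:
    """Prefer the X-Group-Id header, then the group bound to the credential."""
    return header_value or auth_group_id


def can_access(multi_tenant_enabled: bool, is_system_admin: bool,
               user_group_role: Optional[str], min_group_role: str) -> bool:
    """Apply the pass-through, admin bypass, and minimum-role checks in order."""
    if not multi_tenant_enabled:
        return True   # step 1: no enforcement
    if is_system_admin:
        return True   # step 2: system admins bypass group checks
    if user_group_role is None:
        return False  # no membership row for this group
    return GROUP_ROLE_RANK[user_group_role] >= GROUP_ROLE_RANK[min_group_role]
```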

API Key Group Scoping

API keys can optionally be scoped to a specific group via the group_id column:

  • NULL – access all groups the user belongs to (backward compatible)
  • Set – restricts project resolution to that group only

Audit Context

When multi-tenant is enabled, audit log entries include project_id and group_id fields for tenant-aware audit trails.

Implementation Status

Component                   File                          Status
GroupRole enum              src/api/database/models.py    Implemented
ProjectGroup model          src/api/database/models.py    Implemented
UserGroupAccess model       src/api/database/models.py    Implemented
require_project_access()    src/api/auth/project_auth.py  Implemented
_is_multi_tenant_enabled()  src/api/auth/project_auth.py  Implemented
_resolve_group_id()         src/api/auth/project_auth.py  Implemented

Group RBAC Configuration

When multi_tenant.enabled: false (default), all group-level checks are no-ops.

# config.yaml
multi_tenant:
  enabled: false    # Default: disabled

Migration

For existing single-tenant installations, run the migration script once:

python scripts/migrate_default_group.py          # Creates "default" group, assigns all users as ADMIN
python scripts/migrate_default_group.py --dry-run # Preview changes without applying

Path Validation Middleware

Source: src/api/middleware/path_validation.py

The PathValidationMiddleware prevents path traversal attacks by validating db_path and source_path parameters in JSON request bodies against a whitelist of allowed directories.

How It Works

  1. Intercepts POST/PUT/PATCH requests with JSON bodies
  2. Extracts db_path and source_path fields if present
  3. Normalizes the path (os.path.realpath(), symlink resolution)
  4. Verifies the path falls within the whitelist
  5. Rejects invalid paths with 403 Forbidden

Validation Rules

  • Path must be absolute after normalization
  • .. path components are rejected
  • Symlinks that resolve outside the whitelist are rejected (configurable)
  • Path must start with one of the allowed base directories

Whitelist Sources

The whitelist is built automatically from:

  • security.path_validation_allowed_base_dirs in config.yaml (e.g. data/projects/)
  • projects.registry.*.db_path – directories of all registered project databases
  • projects.registry.*.source_path – all registered source directories

Path Validation Configuration

# config.yaml
security:
  path_validation_enabled: true       # Default: enabled
  path_validation_deny_symlinks: true  # Reject symlinks outside whitelist
  path_validation_deny_relative: true  # Reject relative paths
  path_validation_allowed_base_dirs:
    - "data/projects/"

When path_validation_enabled: false, the middleware is a no-op.

Attack Vectors Blocked

Attack                Example                                     Result
Directory traversal   {"db_path": "../../etc/passwd"}             403
Absolute path escape  {"db_path": "/etc/shadow"}                  403
Symlink escape        {"db_path": "/data/projects/link-to-root"}  403
Type confusion        {"db_path": 12345}                          403
Relative path         {"db_path": "relative/path.db"}             403

Best Practices

For Administrators

  1. Least privilege principle – assign minimum necessary roles
  2. Use API keys with limited scopes for automation; use get_default_scopes_for_api_key() as a baseline
  3. Configure LDAP/AD sync for centralized management
  4. Enable SIEM integration for security event monitoring
  5. Regularly audit active sessions and API keys
  6. Use webhook secrets with replay protection for all platform integrations
  7. Call load_blacklist_cache() during application startup to restore blacklisted tokens

For Developers

  1. Use JWT for web apps, API keys for CI/CD, IAM for Yandex Cloud services
  2. Store refresh tokens securely (httpOnly cookies)
  3. Handle 401/403 correctly in UI
  4. Never log tokens or keys in plain text
  5. Use require_scope() for fine-grained scope checks on API key endpoints
  6. Use validate_scopes() to filter user-provided scope lists before storing
  7. Prefer require_permission() over manual has_permission() checks in route handlers


Version: 2.0 | March 2026