Technical Documentation for IT and Security Teams
Table of Contents¶
- Overview
- Key Capabilities
- RBAC Architecture
- Role Hierarchy
- Permission Inheritance
- Permission Catalog
- Permissions by Category
- Authentication Methods
- 1. JWT Bearer Token
- 2. API Keys
- Service Accounts and Machine Access
- 3. OAuth2/OIDC
- 4. LDAP/Active Directory
- 5. IAM (Yandex Cloud)
- API Reference
- Middleware Dependencies
- FastAPI Usage Examples
- AuthContext
- Permission Helper Functions
- Scope Enforcement
- Webhook Authentication
- Token Blacklist API
- API Key Data Models
- Auditing and Logging
- Authorization Events
- Log Format
- Group-Level RBAC (Multi-Tenant)
- Implementation Status
- Group RBAC Configuration
- Path Validation Middleware
- Path Validation Configuration
- Attack Vectors Blocked
- Best Practices
- For Administrators
- For Developers
- Related Documents
Overview¶
CodeGraph implements a comprehensive Role-Based Access Control (RBAC) system with support for multiple authentication modes. The system provides granular access control to platform features.
Key Capabilities¶
- 4 role levels with hierarchical inheritance
- 21 granular permissions across functional categories
- Authentication modes: JWT, API keys, OAuth2, LDAP, IAM (Yandex Cloud), and unauthenticated access where explicitly allowed
- Service-account lifecycle for non-human machine identities
- Token blacklisting for instant access revocation
- Webhook signature verification with replay attack protection
- Scope enforcement for fine-grained API key access control
- SIEM integration for authorization event auditing
RBAC Architecture¶
Role Hierarchy¶
+---------------------+
| ADMIN |
| (ADMIN_ALL |
| meta-permission) |
+----------+----------+
|
+----------v----------+
| REVIEWER |
| Code review + |
| GitHub + GitLab |
+----------+----------+
|
+----------v----------+
| ANALYST |
| Query execution |
| + API keys |
+----------+----------+
|
+----------v----------+
| VIEWER |
| Read-only |
+---------------------+
Permission Inheritance¶
Each role inherits all permissions from lower-level roles:
ADMIN– usesADMIN_ALLmeta-permission. Theget_role_permissions()function expands it to the full set of 21 permissions at runtime. The rawROLE_PERMISSIONSmapping stores only{Permission.ADMIN_ALL}.REVIEWER–ANALYSTpermissions + code review (review:execute,review:github,review:gitlab)ANALYST–VIEWERpermissions + execution/exportVIEWER– read-only (base level)
from src.api.auth.permissions import Role, ROLE_PERMISSIONS, Permission
# ADMIN stores only the meta-permission
ROLE_PERMISSIONS[Role.ADMIN] # => {Permission.ADMIN_ALL}
# get_role_permissions() expands ADMIN_ALL to all 21 permissions
from src.api.auth.permissions import get_role_permissions
perms = get_role_permissions(Role.ADMIN) # => all Permission values
Permission Catalog¶
Permissions by Category¶
All 21 permissions are defined in the Permission(str, Enum) class in src/api/auth/permissions.py.
Scenarios (scenarios:*)¶
| Permission | Enum Name | Description | VIEWER | ANALYST | REVIEWER | ADMIN |
|---|---|---|---|---|---|---|
scenarios:read |
SCENARIOS_READ |
View scenario list | x | x | x | x |
scenarios:execute |
SCENARIOS_EXECUTE |
Run analysis scenarios | x | x | x |
Queries (query:*)¶
| Permission | Enum Name | Description | VIEWER | ANALYST | REVIEWER | ADMIN |
|---|---|---|---|---|---|---|
query:execute |
QUERY_EXECUTE |
Execute SQL queries against CPG | x | x | x | |
query:validate |
QUERY_VALIDATE |
Validate query syntax | x | x | x |
Code Review (review:*)¶
| Permission | Enum Name | Description | VIEWER | ANALYST | REVIEWER | ADMIN |
|---|---|---|---|---|---|---|
review:execute |
REVIEW_EXECUTE |
Run automated review | x | x | ||
review:github |
REVIEW_GITHUB |
GitHub PR integration | x | x | ||
review:gitlab |
REVIEW_GITLAB |
GitLab MR integration | x | x |
Sessions (sessions:*)¶
| Permission | Enum Name | Description | VIEWER | ANALYST | REVIEWER | ADMIN |
|---|---|---|---|---|---|---|
sessions:read |
SESSIONS_READ |
View sessions | x | x | x | x |
sessions:write |
SESSIONS_WRITE |
Create/modify sessions | x | x | x | |
sessions:delete |
SESSIONS_DELETE |
Delete sessions | x | x | x |
History (history:*)¶
| Permission | Enum Name | Description | VIEWER | ANALYST | REVIEWER | ADMIN |
|---|---|---|---|---|---|---|
history:read |
HISTORY_READ |
View query history | x | x | x | x |
history:export |
HISTORY_EXPORT |
Export history | x | x | x |
Users (users:*)¶
| Permission | Enum Name | Description | VIEWER | ANALYST | REVIEWER | ADMIN |
|---|---|---|---|---|---|---|
users:read |
USERS_READ |
View user list | x | |||
users:write |
USERS_WRITE |
Create/edit users | x | |||
users:delete |
USERS_DELETE |
Delete users | x |
API Keys (api_keys:*)¶
| Permission | Enum Name | Description | VIEWER | ANALYST | REVIEWER | ADMIN |
|---|---|---|---|---|---|---|
api_keys:read |
API_KEYS_READ |
View own keys | x | x | x | |
api_keys:write |
API_KEYS_WRITE |
Create keys | x | x | x | |
api_keys:delete |
API_KEYS_DELETE |
Delete any keys | x |
Metrics (stats:*, metrics:*)¶
| Permission | Enum Name | Description | VIEWER | ANALYST | REVIEWER | ADMIN |
|---|---|---|---|---|---|---|
stats:read |
STATS_READ |
View statistics | x | x | x | x |
metrics:read |
METRICS_READ |
Prometheus metrics | x |
Administration¶
| Permission | Enum Name | Description | VIEWER | ANALYST | REVIEWER | ADMIN |
|---|---|---|---|---|---|---|
admin:all |
ADMIN_ALL |
Full access (meta-permission) | x |
Authentication Methods¶
CodeGraph supports several authentication modes. The auth_method field in AuthContext indicates which mode was used:
| Value | Method | Description |
|---|---|---|
jwt |
JWT Bearer Token | Primary method for web applications |
api_key |
API Key | For integrations, CI/CD, and automation |
oauth2 |
OAuth2/OIDC | Enterprise identity providers |
ldap |
LDAP/AD | Corporate directory integration |
iam |
Yandex Cloud IAM | Cloud-native authentication |
none |
Unauthenticated | Default when no credentials are provided |
1. JWT Bearer Token¶
Primary method for web applications and interactive sessions.
Token Structure (TokenPayload)¶
class TokenPayload(BaseModel):
sub: str # Subject (user_id)
jti: str # JWT ID (unique identifier)
exp: datetime # Expiration time
iat: datetime # Issued at
type: str # "access" or "refresh"
scopes: list[str] = [] # Permission scopes
role: Optional[str] = None # User role
group_id: Optional[str] = None # Optional group scope (for CI/CD service accounts)
Source: src/api/auth/jwt_handler.py
Token Parameters¶
| Token Type | TTL | Purpose |
|---|---|---|
| Access Token | 30 minutes | API request authorization |
| Refresh Token | 7 days | Access token renewal |
Token Functions¶
from src.api.auth.jwt_handler import (
create_access_token, # Create JWT access token
create_refresh_token, # Create JWT refresh token
decode_token, # Decode and validate JWT
verify_token, # Verify JWT and check type
get_token_jti, # Extract JTI without full verification
blacklist_token, # Add token to blacklist
is_token_blacklisted, # Check if token is blacklisted
load_blacklist_cache, # Load blacklist from DB into memory
)
create_access_token() – creates a new JWT access token:
def create_access_token(
user_id: str,
scopes: Optional[list[str]] = None,
role: Optional[str] = None,
expires_delta: Optional[timedelta] = None,
group_id: Optional[str] = None,
) -> str:
get_token_jti() – extracts the JTI (JWT ID) from a token without full signature verification. Used for blacklist operations when the token may already be expired:
def get_token_jti(token: str) -> str:
Returns an empty string if extraction fails.
Usage Example¶
# Get token
curl -X POST /api/v1/auth/login \
-H "Content-Type: application/json" \
-d '{"username": "analyst", "password": "***"}'
# Response:
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "bearer",
"expires_in": 1800
}
# Use token
curl -X GET /api/v1/scenarios \
-H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
Token Revocation (Blacklisting)¶
# Logout - add token to blacklist
curl -X POST /api/v1/auth/logout \
-H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
After blacklisting, the token becomes invalid immediately. See Token Blacklist API for implementation details.
2. API Keys¶
For integrations, CI/CD, and automation.
API Key Format¶
Prefix: rag_<8 hex chars> (e.g., rag_a1b2c3d4)
Secret: <48 hex chars> (secrets.token_hex(24))
Full: rag_<8hex>_<48hex>
Example: rag_a1b2c3d4_e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8
The key is generated by generate_api_key() in src/api/auth/api_keys.py:
def generate_api_key() -> tuple[str, str, str]:
"""Returns (full_key, prefix, key_hash)."""
prefix = f"rag_{secrets.token_hex(4)}" # e.g., rag_a1b2c3d4
secret = secrets.token_hex(24) # 48 hex chars
full_key = f"{prefix}_{secret}"
key_hash = hash_api_key(full_key) # SHA-256
return full_key, prefix, key_hash
Storage Security¶
- Keys are SHA-256 hashed before storage (
hash_api_key()) - Full key shown only at creation time (returned as
ApiKeyWithSecret) - Support for expiration (
is_key_expired()) and revocation (is_revokedflag) - Constant-time comparison via
secrets.compare_digest()inverify_api_key()
Default API Key Scopes¶
The get_default_scopes_for_api_key() function returns default scopes for new API keys:
def get_default_scopes_for_api_key() -> List[str]:
return [
"scenarios:read",
"scenarios:execute",
"query:execute",
"sessions:read",
"sessions:write",
"history:read",
]
Usage Example¶
# Create API key
curl -X POST /api/v1/auth/api-keys \
-H "Authorization: Bearer <admin_token>" \
-d '{"name": "CI Pipeline", "scopes": ["query:execute", "scenarios:execute"]}'
# Response:
{
"id": "key_123",
"key": "rag_a1b2c3d4_e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8",
"name": "CI Pipeline",
"prefix": "rag_a1b2c3d4",
"scopes": ["query:execute", "scenarios:execute"],
"created_at": "2026-03-07T10:00:00Z",
"expires_at": "2026-06-07T10:00:00Z",
"is_revoked": false
}
# Use API key
curl -X GET /api/v1/scenarios \
-H "X-API-Key: rag_a1b2c3d4_e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8"
Service Accounts and Machine Access¶
Service accounts build on the X-API-Key transport but introduce a separate machine-identity lifecycle.
Implemented capabilities:
- dedicated
ServiceAccountandServiceAccountCredentialrecords - project/group scoping
- interface allowlists (
api,mcp) - rotate/revoke/deactivate flows
- unified audit metadata across API and MCP
- per-service-account rate limiting
- machine contract version validation through
X-CodeGraph-Machine-Contract
Admin API:
GET /api/v1/auth/service-accounts/action-catalogPOST /api/v1/auth/service-accountsGET /api/v1/auth/service-accountsGET /api/v1/auth/service-accounts/{id}POST /api/v1/auth/service-accounts/{id}/rotatePOST /api/v1/auth/service-accounts/{id}/credentials/{credential_id}/revokePOST /api/v1/auth/service-accounts/{id}/deactivate
CLI:
codegraph auth service-account create --owner admin --name ci-bot --template ci --interface api
codegraph auth service-account list
codegraph auth service-account inspect --id <service-account-id>
codegraph auth service-account rotate --id <service-account-id>
codegraph auth service-account revoke --id <credential-id>
codegraph auth service-account deactivate --id <service-account-id>
Operational notes:
- use service accounts for CI, bots, portals, and MCP/API automation
- roll out rotated credentials before revoking the old credential
- API and MCP use the same scope and interface enforcement logic
- high-assurance deployments can enable proxy-terminated mTLS bindings for
grpcandacpwhile keeping API-key-based service-account auth in place - configure trusted certificate bindings with
security.service_account_mtls_*settings and map fingerprints to service-account UUIDs - OAuth2/OIDC client-credentials for machine callers remains deferred and is not part of the current service-account runtime baseline
3. OAuth2/OIDC¶
Support for enterprise identity providers. See OAuth2/OIDC & LDAP/AD Integration for full configuration guide.
Supported Providers¶
| Provider | Status | Use Case |
|---|---|---|
| GitHub | Ready | Developers, open-source |
| GitLab | Ready | Corporate GitLab |
| Ready | Google Workspace | |
| Keycloak | Ready | Enterprise IdP |
| Azure AD | Ready | Microsoft 365 |
Configuration (Keycloak Example)¶
oauth2:
providers:
keycloak:
enabled: true
client_id: "codegraph"
client_secret: "${KEYCLOAK_SECRET}"
authorize_url: "https://keycloak.company.com/realms/main/protocol/openid-connect/auth"
token_url: "https://keycloak.company.com/realms/main/protocol/openid-connect/token"
userinfo_url: "https://keycloak.company.com/realms/main/protocol/openid-connect/userinfo"
scopes: ["openid", "profile", "email", "groups"]
role_claim: "realm_access.roles"
role_mapping:
codegraph-admin: admin
codegraph-reviewer: reviewer
codegraph-analyst: analyst
codegraph-viewer: viewer
4. LDAP/Active Directory¶
Integration with corporate directory. See OAuth2/OIDC & LDAP/AD Integration for full configuration guide.
Configuration¶
ldap:
enabled: true
server: "ldaps://ldap.company.com:636"
base_dn: "dc=company,dc=com"
bind_dn: "cn=codegraph,ou=services,dc=company,dc=com"
bind_password: "${LDAP_PASSWORD}"
user_search_base: "ou=users,dc=company,dc=com"
user_search_filter: "(sAMAccountName={username})"
group_search_base: "ou=groups,dc=company,dc=com"
group_membership_attr: "memberOf"
group_mapping:
"CN=CodeGraph-Admins,OU=Groups,DC=company,DC=com": admin
"CN=CodeGraph-Reviewers,OU=Groups,DC=company,DC=com": reviewer
"CN=CodeGraph-Analysts,OU=Groups,DC=company,DC=com": analyst
"CN=CodeGraph-Viewers,OU=Groups,DC=company,DC=com": viewer
sync_interval_minutes: 15
Group Synchronization¶
- Automatic role sync with AD groups
- Nested group support
- Caching with configurable TTL
5. IAM (Yandex Cloud)¶
Cloud-native authentication for services running in Yandex Cloud. IAM-authenticated users receive the ANALYST role by default with a limited scope set.
How It Works¶
The middleware checks the X-YC-IAM-Token header before JWT and API key authentication. If the header is present, the token is validated via the IAM validator (src/api/auth/iam).
# IAM authentication flow in get_auth_context():
iam_token = request.headers.get("X-YC-IAM-Token")
if iam_token:
validator = get_iam_validator()
user_info = await validator.validate_token(iam_token)
# Returns AuthContext with:
# role=Role.ANALYST
# scopes=["scenarios:read", "query:execute", "review:execute"]
# auth_method="iam"
Usage¶
# Authenticate with IAM token
curl -X GET /api/v1/scenarios \
-H "X-YC-IAM-Token: t1.9euelZqMkJKVjJqTnZyRm5GUkZqRke..."
Default IAM Permissions¶
| Field | Value |
|---|---|
role |
ANALYST |
scopes |
scenarios:read, query:execute, review:execute |
auth_method |
iam |
Environment¶
Requires IAM_ENABLED environment variable and a configured IAM validator. When the validator is unavailable, authentication falls through to JWT/API key methods.
API Reference¶
Middleware Dependencies¶
All middleware functions are in src/api/auth/middleware.py:
from src.api.auth.middleware import (
get_auth_context, # Get context (returns unauthenticated context if no credentials)
get_current_user, # FastAPI dependency, raises 401 if unauthenticated
get_optional_user, # FastAPI dependency, returns unauthenticated context if no credentials
require_auth, # Require any authentication (raises 401)
require_permission, # Factory: require specific permission (raises 403)
require_any_permission, # Factory: require any of multiple permissions (raises 403)
require_role, # Factory: require specific role (raises 403)
require_admin, # Shortcut for require_role(Role.ADMIN)
require_analyst, # Shortcut for require_role(Role.ANALYST, Role.REVIEWER, Role.ADMIN)
require_reviewer, # Shortcut for require_role(Role.REVIEWER, Role.ADMIN)
)
Function signatures:
async def get_auth_context(request, bearer_token, api_key) -> AuthContext
async def get_current_user(auth: AuthContext = Depends(require_auth)) -> AuthContext
async def get_optional_user(auth: AuthContext = Depends(get_auth_context)) -> AuthContext
async def require_auth(auth: AuthContext = Depends(get_auth_context)) -> AuthContext
def require_permission(permission: Permission) -> Callable # dependency factory
def require_any_permission(*permissions: Permission) -> Callable # dependency factory
def require_role(*roles: Role) -> Callable # dependency factory
FastAPI Usage Examples¶
from fastapi import Depends, APIRouter
from src.api.auth.middleware import (
require_permission,
require_any_permission,
require_admin,
get_current_user,
get_optional_user,
)
from src.api.auth.permissions import Permission
router = APIRouter()
# Require specific permission
@router.post("/query/execute")
async def execute_query(
query: str,
auth = Depends(require_permission(Permission.QUERY_EXECUTE))
):
# auth.user_id, auth.role, auth.scopes, auth.group_id available
return {"result": "..."}
# Require one of multiple permissions
@router.get("/reviews")
async def list_reviews(
auth = Depends(require_any_permission(
Permission.REVIEW_EXECUTE,
Permission.REVIEW_GITHUB,
Permission.REVIEW_GITLAB
))
):
return {"reviews": [...]}
# Require Admin role
@router.delete("/users/{user_id}")
async def delete_user(
user_id: str,
auth = Depends(require_admin)
):
return {"deleted": user_id}
# Get authenticated user (raises 401 if not authenticated)
@router.get("/me")
async def get_profile(auth = Depends(get_current_user)):
return {"user_id": auth.user_id, "role": auth.role}
# Optional authentication (no error if unauthenticated)
@router.get("/public")
async def public_endpoint(auth = Depends(get_optional_user)):
if auth.is_authenticated:
return {"greeting": f"Hello, {auth.username}"}
return {"greeting": "Hello, anonymous"}
AuthContext¶
The AuthContext class holds the authentication state for the current request. Source: src/api/auth/middleware.py.
class AuthContext:
def __init__(
self,
user_id: Optional[str] = None,
username: Optional[str] = None,
role: Optional[Role] = None,
scopes: Optional[List[str]] = None,
auth_method: str = "none", # "jwt", "api_key", "oauth2", "ldap", "iam", "none"
group_id: Optional[str] = None, # optional group scope (from JWT or API key)
):
...
@property
def is_authenticated(self) -> bool:
"""True if user_id is not None."""
def has_permission(self, permission: Permission) -> bool:
"""Check if user has a specific permission."""
| Field | Type | Description |
|---|---|---|
user_id |
Optional[str] |
User ID (None if unauthenticated) |
username |
Optional[str] |
Username |
role |
Optional[Role] |
Role: VIEWER, ANALYST, REVIEWER, ADMIN |
scopes |
List[str] |
Permission scope list |
auth_method |
str |
Authentication method used |
group_id |
Optional[str] |
Group scope for multi-tenant isolation |
Permission Helper Functions¶
All functions are in src/api/auth/permissions.py:
from src.api.auth.permissions import (
get_role_permissions,
has_permission,
has_any_permission,
has_all_permissions,
validate_scopes,
get_default_scopes_for_api_key,
)
get_role_permissions(role: Role) -> Set[Permission]
Returns the set of permissions for a role. For ADMIN, the raw mapping contains only {Permission.ADMIN_ALL}.
has_permission(role, required_permission, user_scopes=None) -> bool
Checks if a role or user has a specific permission. Returns True if the role includes ADMIN_ALL, the permission is in role permissions, or the permission is in explicit user scopes.
def has_permission(
role: Optional[Role],
required_permission: Permission,
user_scopes: Optional[List[str]] = None,
) -> bool:
has_any_permission(role, required_permissions, user_scopes=None) -> bool
Returns True if any of the specified permissions is granted.
def has_any_permission(
role: Optional[Role],
required_permissions: List[Permission],
user_scopes: Optional[List[str]] = None,
) -> bool:
has_all_permissions(role, required_permissions, user_scopes=None) -> bool
Returns True only if all specified permissions are granted.
def has_all_permissions(
role: Optional[Role],
required_permissions: List[Permission],
user_scopes: Optional[List[str]] = None,
) -> bool:
validate_scopes(scopes: List[str]) -> List[str]
Validates and filters scope strings against the Permission enum. Returns only valid scope strings, silently dropping unknown values.
def validate_scopes(scopes: List[str]) -> List[str]:
valid_permissions = {p.value for p in Permission}
return [s for s in scopes if s in valid_permissions]
get_default_scopes_for_api_key() -> List[str]
Returns default scopes for new API keys. Used in API key management when no explicit scopes are specified.
Scope Enforcement¶
Source: src/api/auth/scope_enforcement.py
API key scope enforcement operates at two levels:
1. Automatic enforcement (middleware) – The ScopeEnforcementMiddleware (src/api/middleware/scope_enforcement.py) automatically checks API key scopes against the ROUTE_SCOPE_MAP based on the request path prefix. This runs for every API-key-authenticated request without requiring any changes to router code.
Route prefix → scope mapping (excerpt):
| Route Prefix | Required Scope |
|---|---|
/query |
query:execute |
/review |
review:execute |
/scenarios |
scenarios:read |
/security |
scenarios:execute |
/gocpg |
query:execute |
/import |
scenarios:execute |
/stats |
stats:read |
/groups |
admin:all |
/projects |
admin:all |
/health |
None (public) |
/auth |
None (public) |
Keys with admin:all scope bypass all enforcement. JWT and IAM-authenticated users are not affected (permissions checked via role-based access control).
Configuration:
# config.yaml
security:
api_key_scope_enforcement: true # Default: enabled
2. Explicit per-endpoint enforcement – The require_scope() dependency factory checks if the authenticated caller has a specific scope string in their scopes list. This is separate from role-based permission checks – it operates on raw scope strings.
from src.api.auth.scope_enforcement import require_scope
@router.get("/admin/stats")
async def admin_stats(auth: AuthContext = Depends(require_scope("admin:read"))):
...
Signature:
def require_scope(scope: str):
"""FastAPI dependency factory that checks if the authenticated caller has a scope.
Raises:
HTTPException(401) if not authenticated
HTTPException(403) if scope is missing
"""
Helper function:
def check_api_key_scope(auth: AuthContext, request_path: str) -> Optional[str]:
"""Check if an API-key-authenticated request has the required scope.
Returns None if scope is satisfied, or an error message string if missing.
"""
Webhook Authentication¶
The verify_webhook_signature() function in src/api/auth/webhook_auth.py provides HMAC-SHA256 signature verification for platform webhook events with replay attack prevention.
Signature:
async def verify_webhook_signature(
request: Request,
secret: Optional[str] = None,
platform: str = "sourcecraft",
max_age_seconds: int = 300,
) -> bytes:
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
request |
Request |
required | FastAPI request object |
secret |
Optional[str] |
None |
Webhook secret for HMAC verification. None skips verification |
platform |
str |
"sourcecraft" |
Platform name |
max_age_seconds |
int |
300 |
Maximum webhook age in seconds (replay protection) |
Supported platforms and headers:
| Platform | Signature Header | Timestamp Header |
|---|---|---|
sourcecraft |
X-SourceCraft-Signature |
X-SourceCraft-Timestamp |
gitverse |
X-GitVerse-Signature |
X-GitVerse-Timestamp |
github |
X-Hub-Signature-256 |
X-Hub-Timestamp |
Replay protection: The function validates the timestamp header against the current time. If the difference exceeds max_age_seconds (default 300 seconds / 5 minutes), the request is rejected with HTTP 400 and the rejection is recorded in the audit log.
GitVerse fallback: If X-GitVerse-Signature is missing, the function also checks X-Hub-Signature-256 for backward compatibility.
Usage example:
from src.api.auth.webhook_auth import verify_webhook_signature
@router.post("/webhooks/sourcecraft")
async def handle_sourcecraft_webhook(request: Request):
body = await verify_webhook_signature(
request,
secret="your-webhook-secret",
platform="sourcecraft",
max_age_seconds=300,
)
payload = json.loads(body)
...
Token Blacklist API¶
The token blacklist provides instant revocation of JWT tokens. It uses PostgreSQL for persistence with an in-memory cache for fast lookups. Source: src/api/auth/jwt_handler.py.
Functions¶
blacklist_token(jti, expires_at=None) -> None
Adds a token to the blacklist. Stores the JTI in both the in-memory cache (for immediate effect) and PostgreSQL (for persistence across restarts).
async def blacklist_token(jti: str, expires_at: Optional[datetime] = None) -> None:
is_token_blacklisted(jti) -> bool
Checks if a token is blacklisted. Uses the in-memory cache first (fast path), then falls back to the database (slow path). Tokens found in the database are added to the cache for subsequent lookups.
async def is_token_blacklisted(jti: str) -> bool:
load_blacklist_cache() -> int
Loads non-expired blacklisted tokens from PostgreSQL into the in-memory cache. Should be called during application startup. Returns the number of tokens loaded.
async def load_blacklist_cache() -> int:
_blacklist_sync_task(interval_seconds=60) -> None
Background asyncio task that periodically refreshes the blacklist cache from the database. Catches all exceptions to prevent the task from dying on transient errors.
async def _blacklist_sync_task(interval_seconds: int = 60) -> None:
Architecture¶
Request blacklist_token()
| |
v v
is_token_blacklisted() _blacklisted_tokens (set)
| + |
| 1. check in-memory set | v
| 2. fallback to PostgreSQL | PostgreSQL: TokenBlacklist table
| (adds to cache if found) |
v |
True/False _blacklist_sync_task()
(periodic refresh)
API Key Data Models¶
Source: src/api/auth/api_keys.py
ApiKeyInfo¶
Information model for API keys (without the secret key). Used for listing and inspecting keys.
class ApiKeyInfo(BaseModel):
id: str # Unique key identifier
name: str # Human-readable name
prefix: str # Key prefix (e.g., "rag_a1b2c3d4")
scopes: List[str] # Permission scopes
created_at: datetime # Creation timestamp
expires_at: Optional[datetime] # Expiration (None = never)
last_used_at: Optional[datetime] # Last usage timestamp
is_revoked: bool # Revocation status
ApiKeyWithSecret¶
Extended model that includes the full key. Only returned at creation time.
class ApiKeyWithSecret(ApiKeyInfo):
key: str # Full API key (e.g., "rag_a1b2c3d4_<48hex>")
Utility Functions¶
from src.api.auth.api_keys import (
generate_api_key, # -> (full_key, prefix, key_hash)
hash_api_key, # key -> SHA-256 hex digest
verify_api_key, # (key, stored_hash) -> bool (constant-time)
extract_prefix, # key -> prefix string
is_key_expired, # expires_at -> bool
calculate_expiration, # days -> Optional[datetime]
validate_api_key, # Full validation pipeline (hash + revoked + expired)
)
Auditing and Logging¶
Authorization Events¶
All authorization events are logged and sent to SIEM:
| Event | Severity | Description |
|---|---|---|
AUTH_SUCCESS |
INFO | Successful authentication |
AUTH_FAILURE |
WARNING | Failed attempt |
TOKEN_ISSUED |
INFO | New token issued |
TOKEN_REVOKED |
INFO | Token revoked (blacklisted) |
PERMISSION_DENIED |
WARNING | Access denied |
API_KEY_CREATED |
INFO | API key created |
API_KEY_REVOKED |
INFO | API key revoked |
WEBHOOK_REPLAY_REJECTED |
WARNING | Webhook replay attack detected |
Log Format¶
{
"timestamp": "2026-03-07T10:30:00.000Z",
"event_type": "AUTH_SUCCESS",
"user_id": "user_123",
"username": "analyst@company.com",
"role": "analyst",
"auth_method": "jwt",
"group_id": null,
"ip_address": "10.0.0.50",
"user_agent": "Mozilla/5.0...",
"request_path": "/api/v1/scenarios",
"request_method": "GET"
}
Group-Level RBAC (Multi-Tenant)¶
When multi_tenant.enabled: true in config.yaml, CodeGraph enforces group-level access control on all data endpoints. Projects belong to groups, and users have per-group roles.
Group Roles¶
Source: src/api/database/models.py (GroupRole enum)
| Role | Permissions |
|---|---|
VIEWER |
Read-only: query, stats, list |
EDITOR |
Read + write: import, edit, activate |
ADMIN |
Full access: delete projects, manage group members |
System administrators (Role.ADMIN) bypass all group checks.
How It Works¶
Every API request resolves a ProjectContext containing project_id, group_id, and db_path. The require_project_access(min_group_role) dependency checks:
- If
multi_tenant.enabledisfalse-> pass-through (no enforcement) - If user is system admin -> bypass group checks
- Otherwise -> verify
user_group_accesstable for minimum role
from src.api.auth.project_auth import require_project_access
# Require at least editor role within the project group
@router.post("/projects/{project_id}/import")
async def import_project(
project_id: str,
auth = Depends(require_project_access("editor"))
):
...
Group ID is resolved from: X-Group-Id header (priority) -> auth.group_id (from JWT/API key).
API Key Group Scoping¶
API keys can optionally be scoped to a specific group via the group_id column:
- NULL – access all groups the user belongs to (backward compatible)
- Set – restricts project resolution to that group only
Audit Context¶
When multi-tenant is enabled, audit log entries include project_id and group_id fields for tenant-aware audit trails.
Implementation Status¶
| Component | File | Status |
|---|---|---|
GroupRole enum |
src/api/database/models.py |
Implemented |
ProjectGroup model |
src/api/database/models.py |
Implemented |
UserGroupAccess model |
src/api/database/models.py |
Implemented |
require_project_access() |
src/api/auth/project_auth.py |
Implemented |
_is_multi_tenant_enabled() |
src/api/auth/project_auth.py |
Implemented |
_resolve_group_id() |
src/api/auth/project_auth.py |
Implemented |
Group RBAC Configuration¶
When multi_tenant.enabled: false (default), all group-level checks are no-ops.
# config.yaml
multi_tenant:
enabled: false # Default: disabled
Migration¶
For existing single-tenant installations, run the migration script once:
python scripts/migrate_default_group.py # Creates "default" group, assigns all users as ADMIN
python scripts/migrate_default_group.py --dry-run # Preview changes without applying
Path Validation Middleware¶
Source: src/api/middleware/path_validation.py
The PathValidationMiddleware prevents path traversal attacks by validating db_path and source_path parameters in JSON request bodies against a whitelist of allowed directories.
How It Works¶
- Intercepts
POST/PUT/PATCHrequests with JSON bodies - Extracts
db_pathandsource_pathfields if present - Normalizes the path (
os.path.realpath(), symlink resolution) - Verifies the path falls within the whitelist
- Rejects invalid paths with
403 Forbidden
Validation Rules¶
- Path must be absolute after normalization
..path components are rejected- Symlinks that resolve outside the whitelist are rejected (configurable)
- Path must start with one of the allowed base directories
Whitelist Sources¶
The whitelist is built automatically from:
security.path_validation_allowed_base_dirsinconfig.yaml(e.g.data/projects/)projects.registry.*.db_path– directories of all registered project databasesprojects.registry.*.source_path– all registered source directories
Path Validation Configuration¶
# config.yaml
security:
path_validation_enabled: true # Default: enabled
path_validation_deny_symlinks: true # Reject symlinks outside whitelist
path_validation_deny_relative: true # Reject relative paths
path_validation_allowed_base_dirs:
- "data/projects/"
When path_validation_enabled: false, the middleware is a no-op.
Attack Vectors Blocked¶
| Attack | Example | Result |
|---|---|---|
| Directory traversal | {"db_path": "../../etc/passwd"} |
403 |
| Absolute path escape | {"db_path": "/etc/shadow"} |
403 |
| Symlink escape | {"db_path": "/data/projects/link-to-root"} |
403 |
| Type confusion | {"db_path": 12345} |
403 |
| Relative path | {"db_path": "relative/path.db"} |
403 |
Best Practices¶
For Administrators¶
- Least privilege principle – assign minimum necessary roles
- Use API keys with limited scopes for automation; use
get_default_scopes_for_api_key()as a baseline - Configure LDAP/AD sync for centralized management
- Enable SIEM integration for security event monitoring
- Regularly audit active sessions and API keys
- Use webhook secrets with replay protection for all platform integrations
- Call
load_blacklist_cache()during application startup to restore blacklisted tokens
For Developers¶
- Use JWT for web apps, API keys for CI/CD, IAM for Yandex Cloud services
- Store refresh tokens securely (httpOnly cookies)
- Handle 401/403 correctly in UI
- Never log tokens or keys in plain text
- Use
require_scope()for fine-grained scope checks on API key endpoints - Use
validate_scopes()to filter user-provided scope lists before storing - Prefer
require_permission()over manualhas_permission()checks in route handlers
Related Documents¶
- Enterprise Security Brief – Security overview
- DLP Security – Data loss prevention
- SIEM Integration – SIEM integration
- OAuth2/OIDC & LDAP/AD Integration – Full auth provider configuration
- REST API Reference – API reference
Version: 2.0 | March 2026