Python SDK Reference

Complete Python SDK documentation for CodeGraph.

REST API endpoints are documented in REST API Documentation. Agent classes (AnalyzerAgent, RetrieverAgent, GeneratorAgent, etc.) are documented in Agents Reference.

Overview

Data Flow

graph LR
    Q[User Query] --> IC[Intent Classifier]
    IC --> PR[Pre-Retrieval]
    PR --> R{Router}
    R --> VS[VectorStoreReal<br/>ChromaDB]
    R --> CPG[CPGQueryService<br/>DuckDB]
    VS --> RRF[RRF Merger]
    CPG --> RRF
    RRF --> RK[ResultRanker]
    RK --> LLM[LLM Provider]
    LLM --> A[Response]

Module Map

Module Purpose Key Class
src/services/cpg/ DuckDB CPG queries (mixin-based) CPGQueryService
src/retrieval/vector_store_real.py ChromaDB vector search VectorStoreReal
src/retrieval/hybrid/ Hybrid retrieval with RRF HybridRetriever
src/ranking/result_ranker.py Multi-signal relevance ranking ResultRanker
src/workflow/orchestration/copilot.py LangGraph orchestrator MultiScenarioCopilot
src/workflow/state.py Workflow state definitions MultiScenarioState
src/config/ Pydantic configuration get_unified_config()
src/domains/registry.py Domain plugin registry DomainRegistry
src/api/context.py Per-request project scope ProjectContext
src/api/services/project_services.py LRU service cache ProjectScopedServices
src/api/dependencies.py FastAPI DI functions get_project_context()
src/api/errors.py Localized HTTP exceptions LocalizedHTTPException
src/security/hardening/ D3FEND compliance scanner HardeningScanner
src/agents/ Pipeline agents See AGENTS.md

Core Services

CPGQueryService

File: src/services/cpg/__init__.py (composed from src/services/cpg/base.py + 12 mixins)

The primary interface for querying Code Property Graphs stored in DuckDB. Built using a mixin architecture where each mixin adds a set of domain-specific query methods.

class CPGQueryService(
    CPGQueryBase,
    SubsystemQueriesMixin,
    CallGraphQueriesMixin,
    SecurityQueriesMixin,
    PerformanceQueriesMixin,
    QualityQueriesMixin,
    SemanticQueriesMixin,
    StatisticsQueriesMixin,
    CommentQueriesMixin,
    ExternalQueriesMixin,
    TypeQueriesMixin,
    PatternQueriesMixin,
    CollectionQueriesMixin,
):
    pass  # All methods inherited from mixins

Constructor

CPGQueryService(
    db_path: Optional[str] = None,
    allowed_db_paths: Optional[set] = None
)
Parameter Type Default Description
db_path Optional[str] None Path to DuckDB file. If None, resolved via ProjectManager.get_active_db_path()
allowed_db_paths Optional[set] None Whitelist of allowed DB paths (multi-tenant security)

Usage

from src.services.cpg import CPGQueryService

# Auto-resolve DB path from active project
service = CPGQueryService()

# Explicit path
service = CPGQueryService("data/projects/myproject.duckdb")

# Context manager
with CPGQueryService("data/projects/myproject.duckdb") as svc:
    stats = svc.get_database_stats()
    hotspots = svc.get_security_hotspots(limit=20)

Base Methods

From CPGQueryBase (src/services/cpg/base.py):

Method Description
execute_query(query, parameters=None) Execute parameterized SQL, return list of tuples
execute_sql_dict(query) Execute SQL, return list of dicts
execute_custom_sql(query, limit=100) Execute user SQL with row limit enforcement
set_database(db_path) Switch to a different DuckDB file
switch_project(project_name) Switch to a named project (resolves path via ProjectManager)
close() Close the DuckDB connection

# Raw SQL with dict results
rows = service.execute_sql_dict("SELECT name, file FROM nodes_method LIMIT 5")
for row in rows:
    print(f"{row['name']} in {row['file']}")

# Switch project at runtime
service.switch_project("postgresql")
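
execute_query binds parameters instead of interpolating them into the SQL string. A minimal sketch of the binding style, shown with stdlib sqlite3 as a stand-in for DuckDB (both accept ? placeholders; the table and data here are illustrative):

```python
import sqlite3

# In-memory stand-in schema mirroring a nodes_method-style table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE nodes_method (name TEXT, file TEXT)")
conn.execute("INSERT INTO nodes_method VALUES ('heap_insert', 'src/heapam.c')")

# Bound parameters keep user input out of the SQL string itself.
user_input = "heap_insert"
rows = conn.execute(
    "SELECT name, file FROM nodes_method WHERE name = ?",
    (user_input,),
).fetchall()
print(rows)  # [('heap_insert', 'src/heapam.c')]
```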

Subsystem Queries

From SubsystemQueriesMixin (src/services/cpg/subsystem_queries.py):

Method Description
get_subsystems() List all subsystems with method counts
get_methods_by_subsystem(subsystem, limit=100) Get methods in a subsystem
get_subsystem_method_counts() Method count per subsystem
get_subsystem_for_method(filename) Determine subsystem from file path

subsystems = service.get_subsystems()
for s in subsystems[:3]:
    print(f"{s['name']}: {s['method_count']} methods")

methods = service.get_methods_by_subsystem("executor", limit=50)

Call Graph Queries

From CallGraphQueriesMixin (src/services/cpg/callgraph_queries.py):

Method Description
get_call_graph(method_id, depth=2, direction="both") Get call graph around a method
get_callers(function_name, limit=20) Find callers of a function
get_callees(function_name, limit=20) Find callees of a function

callers = service.get_callers("LWLockAcquire", limit=10)
callees = service.get_callees("CommitTransaction")

# Full call graph with depth control
graph = service.get_call_graph(method_id=123, depth=3, direction="both")

Security Queries

From SecurityQueriesMixin (src/services/cpg/security_queries.py):

Method Description
get_security_hotspots(limit=100) Functions with high security risk
get_taint_sources(limit=100) Taint analysis source functions
get_taint_sinks(limit=100) Taint analysis sink functions
find_sql_injections(...) Detect SQL injection patterns
find_shell_injections(min_risk_score=0.7) Detect command injection patterns
find_sql_injections_dataflow(...) SQL injection via data flow analysis
find_code_injections(...) Detect code injection patterns

hotspots = service.get_security_hotspots(limit=20)
sources = service.get_taint_sources()
sinks = service.get_taint_sinks()

# Data flow-based SQL injection detection
sqli = service.find_sql_injections_dataflow()
for finding in sqli:
    print(f"{finding['method']} in {finding['file']}: {finding['risk']}")

Performance Queries

From PerformanceQueriesMixin (src/services/cpg/performance_queries.py):

Method Description
get_performance_hotspots(limit=100) High-complexity / high-call-count methods
get_allocation_heavy_methods(limit=100) Methods with many memory allocations
get_recursive_methods(limit=100) Recursive call detection
get_io_heavy_methods(limit=100) I/O-intensive methods
get_lock_heavy_methods(limit=100) Methods with lock contention risk

hotspots = service.get_performance_hotspots(limit=10)
recursive = service.get_recursive_methods()

Quality Queries

From QualityQueriesMixin (src/services/cpg/quality_queries.py):

Method Description
get_methods_without_tests(...) Methods lacking test coverage
count_methods_without_tests() Count of untested methods
get_complex_methods(...) Methods above cyclomatic complexity threshold
get_long_methods(min_lines=100, limit=100) Methods exceeding line count threshold
get_methods_with_many_parameters(...) Methods with excessive parameter counts

untested = service.get_methods_without_tests()
complex_methods = service.get_complex_methods()
long_methods = service.get_long_methods(min_lines=150, limit=50)

Semantic Queries

From SemanticQueriesMixin (src/services/cpg/semantic_queries.py):

Method Description
search_by_function_purpose(...) Search by semantic function purpose
search_by_comment_content(keyword, limit=50) Search within code comments
search_methods_by_name_pattern(...) Regex-based method name search

results = service.search_by_comment_content("TODO", limit=20)
pattern_matches = service.search_methods_by_name_pattern(".*Handler$")

Statistics Queries

From StatisticsQueriesMixin (src/services/cpg/statistics.py):

Method Description
get_database_stats() Overall CPG database statistics
get_node_type_counts() Count of each node type
get_edge_type_counts() Count of each edge type
get_method_quality_stats() Aggregate quality metrics
get_git_state() Git commit state at parse time

stats = service.get_database_stats()
print(f"Methods: {stats['method_count']}, Files: {stats['file_count']}")

git = service.get_git_state()
print(f"Parsed at commit: {git['commit_hash']}")

Comment Queries

From CommentQueriesMixin (src/services/cpg/comment_queries.py):

Method Description
get_method_comments(method_name, limit=50) Comments for a specific method
get_file_comments(filename, limit=100) All comments in a file
search_comments(pattern, limit=100) Regex search across all comments
get_todo_comments(limit=100) TODO/FIXME/HACK comments
get_comment_statistics() Comment density metrics
get_functions_descriptions(...) Function descriptions from docstrings

todos = service.get_todo_comments(limit=50)
stats = service.get_comment_statistics()
descs = service.get_functions_descriptions()

External / Git Queries

From ExternalQueriesMixin (src/services/cpg/external_queries.py):

Method Description
get_methods_by_author(author_email, limit=100) Methods by git author
get_git_authors(limit=50) All git authors
get_git_hotspots(min_churn=5, limit=100) High-churn files (change frequency)
get_error_prone_methods(...) Methods correlated with bug-fix commits
get_bus_factor_candidates(...) Single-author knowledge silos

authors = service.get_git_authors()
hotspots = service.get_git_hotspots(min_churn=10, limit=20)
bus_factor = service.get_bus_factor_candidates()

Type Queries

From TypeQueriesMixin (src/services/cpg/type_queries.py):

Method Description
find_type_conversions(...) Detect type conversion operations
get_type_statistics() Type usage distribution
find_polymorphic_variables(...) Variables with multiple types
find_cast_operations(...) Explicit cast operations

casts = service.find_cast_operations()
poly = service.find_polymorphic_variables()

Pattern Queries

From PatternQueriesMixin (src/services/cpg/pattern_queries.py):

Method Description
get_pattern_findings(...) Pattern engine findings
get_pattern_statistics() Summary of pattern matches
get_pattern_rules() Available pattern rules

findings = service.get_pattern_findings()
stats = service.get_pattern_statistics()
rules = service.get_pattern_rules()

Collection Queries

From CollectionQueriesMixin (src/services/cpg/collection_queries.py):

Method Description
get_collection_declarations(...) Collection/container declarations
get_collection_stats() Collection usage statistics
find_knowledge_base_collections(...) Knowledge-base-specific collections

collections = service.get_collection_declarations()
kb = service.find_knowledge_base_collections()

VectorStoreReal

File: src/retrieval/vector_store_real.py

ChromaDB-based vector store for semantic search across Q&A pairs, SQL examples, generated documentation, code comments, markdown docs, and domain patterns. Uses paraphrase-multilingual-MiniLM-L12-v2 for embeddings with an LRU cache.

Constructor

VectorStoreReal(
    persist_directory: Optional[str] = None,
    cache_size: int = 100,
    cache_ttl: int = 3600,
    collection_prefix: str = ""
)
Parameter Type Default Description
persist_directory Optional[str] None ChromaDB storage path. Defaults to <project_root>/chroma_db
cache_size int 100 Max cached embeddings (LRU)
cache_ttl int 3600 Cache entry TTL in seconds
collection_prefix str "" Per-project collection name prefix for tenant isolation

Methods

Method Description
initialize_collections() Create/load Q&A and SQL example collections
retrieve_qa(query, top_k=3, filter_dict=None) Search Q&A pairs
retrieve_sql(query, keywords=None, query_type=None, top_k=5) Search SQL examples
retrieve_generated_docs(query, top_k=5, language=None) Search auto-generated docs
retrieve_comments(query, top_k=5, comment_type=None, subsystem=None) Search code comments
retrieve_documentation(query, top_k=5, section=None) Search markdown docs
retrieve_domain_patterns(query, top_k=5, pattern_type=None) Search domain patterns
get_cache_stats() Get embedding cache hit/miss stats

Usage

from src.retrieval.vector_store_real import VectorStoreReal

store = VectorStoreReal(persist_directory="chroma_db", collection_prefix="postgresql")
store.initialize_collections()

# Search Q&A pairs
qa = store.retrieve_qa("How does transaction commit work?", top_k=3)

# Search SQL examples with type hint
sql = store.retrieve_sql(
    "find callers",
    keywords=["call_graph"],
    query_type="structural",
    top_k=5
)

# Search auto-generated documentation
docs = store.retrieve_generated_docs("buffer manager", top_k=5, language="en")

# Search code comments by type and subsystem
comments = store.retrieve_comments(
    "memory allocation",
    top_k=10,
    comment_type="docstring",
    subsystem="executor"
)

HybridRetriever

File: src/retrieval/hybrid/retriever.py

Parallel hybrid search combining vector (ChromaDB) and graph (DuckDB CPG) retrieval with Reciprocal Rank Fusion (RRF) merging and adaptive weighting by query type.

Constructor

HybridRetriever(
    vector_store,
    cpg_service,
    config: Optional[HybridRetrievalConfig] = None
)
Parameter Type Default Description
vector_store VectorStoreReal required Vector search backend
cpg_service CPGQueryService required Graph search backend
config Optional[HybridRetrievalConfig] None Weight/top-k configuration

Methods

Method Description
async retrieve(query, mode="hybrid", query_type=None, **kwargs) Perform retrieval, returns List[RetrievalResult]

Usage

import asyncio
from src.retrieval.hybrid.retriever import HybridRetriever
from src.retrieval.hybrid.models import HybridRetrievalConfig

config = HybridRetrievalConfig(
    vector_weight=0.6,
    graph_weight=0.4,
    final_top_k=10
)

retriever = HybridRetriever(
    vector_store=store,
    cpg_service=service,
    config=config
)

results = asyncio.run(retriever.retrieve(
    query="transaction commit handling",
    mode="hybrid",        # "hybrid", "vector_only", "graph_only"
    query_type="semantic"  # "semantic", "structural", "security"
))

for r in results:
    print(f"[{r.source}] {r.content[:80]}... (score={r.score:.3f})")

Adaptive weight presets by query_type:

Query Type Vector Weight Graph Weight
semantic 0.75 0.25
structural 0.25 0.75
security 0.40 0.60
default 0.60 0.40
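
The RRF merge itself follows the standard Reciprocal Rank Fusion formula. A generic weighted sketch (the k=60 constant and the exact weighting scheme are assumptions, not the project's implementation):

```python
from collections import defaultdict

def rrf_merge(ranked_lists, weights, k=60):
    """Merge ranked id lists; score(id) = sum over sources of weight / (k + rank)."""
    scores = defaultdict(float)
    for results, weight in zip(ranked_lists, weights):
        for rank, result_id in enumerate(results, start=1):
            scores[result_id] += weight / (k + rank)
    # Highest fused score first.
    return sorted(scores, key=scores.get, reverse=True)

vector_hits = ["m1", "m2", "m3"]   # ids from ChromaDB, best first
graph_hits = ["m3", "m4", "m1"]    # ids from the CPG, best first
merged = rrf_merge([vector_hits, graph_hits], weights=[0.6, 0.4])
print(merged)  # ['m1', 'm3', 'm2', 'm4'] - ids seen by both sources rise
```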

Retrieval Models

All defined in src/retrieval/hybrid/models.py.

RetrievalResult

File: src/retrieval/hybrid/models.py

Unified result from any retrieval source.

@dataclass
class RetrievalResult:
    id: str
    content: str
    score: float
    source: str                                  # "vector", "graph", or "hybrid"
    entity_type: str = "method"                  # "method", "struct", "macro", "type", "caller", "callee"
    metadata: Dict[str, Any] = field(default_factory=dict)
    node_id: Optional[int] = None                # CPG node ID (for deduplication)

Supports __hash__ and __eq__ based on id for set deduplication.
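
That id-based identity can be sketched as follows (a minimal stand-in mirroring the described behavior, not the project's code):

```python
from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass
class Result:
    id: str
    content: str
    metadata: Dict[str, Any] = field(default_factory=dict)

    # Identity is the id alone, so the same entity retrieved by two
    # backends deduplicates in a set even if content differs slightly.
    def __hash__(self) -> int:
        return hash(self.id)

    def __eq__(self, other: object) -> bool:
        return isinstance(other, Result) and self.id == other.id

vector_hit = Result(id="m42", content="heap_insert(...)")
graph_hit = Result(id="m42", content="heap_insert(...)  [callers: 3]")
print(len({vector_hit, graph_hit}))  # 1
```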


HybridRetrievalConfig

File: src/retrieval/hybrid/models.py

@dataclass
class HybridRetrievalConfig:
    vector_weight: float = 0.6
    graph_weight: float = 0.4
    vector_top_k: int = 20
    graph_top_k: int = 20
    final_top_k: int = 10
    min_score_threshold: float = 0.1
    enable_reranking: bool = False

Validation in __post_init__: raises ValueError if vector_weight + graph_weight does not sum to 1.0 (within tolerance).
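
A minimal sketch of that validation (field names match the config above; the tolerance value is an assumption):

```python
import math
from dataclasses import dataclass

@dataclass
class WeightedConfig:
    vector_weight: float = 0.6
    graph_weight: float = 0.4

    def __post_init__(self) -> None:
        # Reject weight pairs that do not sum to 1.0 (within float tolerance).
        total = self.vector_weight + self.graph_weight
        if not math.isclose(total, 1.0, abs_tol=1e-6):
            raise ValueError(f"weights must sum to 1.0, got {total}")

WeightedConfig(0.75, 0.25)     # valid
try:
    WeightedConfig(0.8, 0.4)   # sums to 1.2 -> rejected
except ValueError as e:
    print(e)
```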


DomainBoostContext

File: src/retrieval/hybrid/models.py

Domain-aware boosting context for the RRF merger. Multiplicatively boosts results matching taint sources/sinks or target subsystems.

@dataclass
class DomainBoostContext:
    taint_sources: Set[str] = field(default_factory=set)
    taint_sinks: Set[str] = field(default_factory=set)
    target_subsystem: Optional[str] = None
    query_type: Optional[str] = None
    entry_point_boost: float = 1.2
    security_boost: float = 1.5
    subsystem_boost: float = 1.3

Factory Method

ctx = DomainBoostContext.from_config(
    taint_sources={"user_input", "env_var"},
    taint_sinks={"exec_sql", "system"},
    target_subsystem="executor",
    query_type="security"
)

from_config() reads boost factors from get_unified_config().reranking.


Ranking

RelevanceScore

File: src/ranking/result_ranker.py

Multi-signal relevance score with full breakdown.

@dataclass
class RelevanceScore:
    total: float
    keyword_match: float = 0.0
    tag_coverage: float = 0.0
    name_match: float = 0.0
    length_bonus: float = 0.0
    semantic_similarity: float = 0.0
    source_confidence: float = 0.0
    retrieval_score: float = 0.0

    def get_breakdown(self) -> Dict[str, float]

score = ranker.rank_result(result, question="find memory leaks")
breakdown = score.get_breakdown()
# {'total': 0.82, 'keyword_match': 0.3, 'semantic_similarity': 0.45, ...}

ResultRanker

File: src/ranking/result_ranker.py

Ranks query results using five relevance signals: keyword overlap, tag coverage, name match, length/complexity bonus, and semantic similarity (embedding-based). Supports cross-source ranking for hybrid retrieval.

from src.ranking.result_ranker import ResultRanker

ranker = ResultRanker()
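
As an illustration of one signal, keyword overlap is typically a normalized token intersection; a plausible sketch (not the project's exact scoring):

```python
def keyword_match(question: str, content: str) -> float:
    """Fraction of question tokens that also appear in the result content."""
    q_tokens = set(question.lower().split())
    c_tokens = set(content.lower().split())
    if not q_tokens:
        return 0.0
    return len(q_tokens & c_tokens) / len(q_tokens)

score = keyword_match("find memory leaks", "possible memory leaks in palloc paths")
print(score)  # 2 of 3 question tokens match -> 0.666...
```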

See AGENTS.md for how agents use the ranker in retrieval pipelines.


Workflow

MultiScenarioCopilot

File: src/workflow/orchestration/copilot.py

The main LangGraph orchestrator. Routes queries through intent classification, pre-retrieval, and one of 21 scenario handlers.

Constructor

MultiScenarioCopilot()

Takes no arguments; the internal LangGraph is built in __init__.

Methods

Method Returns Description
run(query, context=None) Dict[str, Any] Run a query through the full workflow pipeline

Usage

from src.workflow.orchestration.copilot import MultiScenarioCopilot

copilot = MultiScenarioCopilot()

# Auto-detect scenario from intent
result = copilot.run("Find buffer overflow risks in the parser module")
print(result["answer"])
print(f"Scenario: {result['scenario_id']}, Confidence: {result['confidence']}")

# Force a specific scenario
result = copilot.run(
    "Analyze the executor module",
    context={"scenario_id": "scenario_2"}
)

The returned dict follows the MultiScenarioState shape: query, intent, scenario_id, confidence, answer, evidence, metadata, error.

See Workflows Reference for the full list of 21 scenarios and composite workflows.


MultiScenarioState

File: src/workflow/state.py

TypedDict defining the shape of data flowing through LangGraph nodes.

class MultiScenarioState(TypedDict):
    # Input
    query: str
    context: Optional[Dict[str, Any]]
    language: Optional[str]

    # Intent Classification
    intent: Optional[str]
    scenario_id: Optional[str]
    confidence: Optional[float]
    classification_method: Optional[str]

    # CPG Data
    cpg_results: Optional[List[Dict]]
    subsystems: Optional[List[str]]
    methods: Optional[List[Dict]]
    call_graph: Optional[Any]

    # Output
    answer: Optional[str]
    evidence: Optional[List[str]]
    metadata: Optional[Dict[str, Any]]
    retrieved_functions: Optional[List[str]]

    # Error Handling
    error: Optional[str]
    retry_count: int

    # Workflow Configuration
    enrichment_config: Optional[Dict[str, Any]]
    vector_store: Optional[Any]

    # Multi-tenant scoping
    db_path: Optional[str]
    collection_prefix: Optional[str]

    # Pre-retrieval (Phase E)
    pre_retrieval_results: Optional[List[Dict[str, Any]]]

Helper function:

from src.workflow.state import create_initial_state

state = create_initial_state(
    query="Where is heap_insert defined?",
    context={"subsystem": "executor"}
)

Configuration

CPGConfig

File: src/config/cpg_config.py

Controls which domain-specific prompts and analyst persona are used.

Methods

Method Description
set_cpg_type(cpg_type: str) Set the active domain (e.g., "postgresql_v2", "python_django", "go")
get_code_analyst_title() -> str Get domain-specific analyst title

from src.config.cpg_config import CPGConfig

config = CPGConfig()
config.set_cpg_type("postgresql_v2")
title = config.get_code_analyst_title()
# "PostgreSQL 17.6 expert"

DomainRegistry

File: src/domains/registry.py

Central registry for domain plugins (13 domains). Each plugin provides domain-specific taint sources/sinks, subsystem definitions, and pattern rules.

Class Methods

Method Returns Description
activate(domain_name) DomainPlugin Activate a domain plugin by name
get_active_or_none() Optional[DomainPlugin] Get currently active domain, or None

from src.domains.registry import DomainRegistry

plugin = DomainRegistry.activate("postgresql")
print(plugin.get_taint_sources())
print(plugin.get_subsystem_names())

# Check active domain
domain = DomainRegistry.get_active_or_none()
if domain:
    print(f"Active domain: {domain.name}")

get_unified_config()

File: src/config/__init__.py

Returns a Pydantic-based configuration singleton loaded from config.yaml. Always use attribute access, never .get().

from src.config import get_unified_config

cfg = get_unified_config()

# Retrieval weights
cfg.retrieval_weights.default_vector_weight   # 0.6
cfg.retrieval_weights.final_top_k             # 10

# Reranking
cfg.reranking.boost_domain_match              # 1.3

# Timeouts (never hardcode)
cfg.timeouts.external_api                     # 30
cfg.timeouts.llm_generation                   # 60

# Batch processing (never hardcode)
cfg.batch_processing.extraction_default_limit # 100
cfg.batch_processing.vector_store_batch       # 100

# Cache
cfg.cache.embedding_cache_size                # 100
cfg.cache.embedding_cache_ttl                 # 3600

To add new configuration fields: add a dataclass field in unified_config.py, add the corresponding YAML key in config.yaml, and access via get_unified_config().
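
For illustration, a hypothetical timeouts.llm_retry field would follow this pattern (the field name is invented for the example; only external_api and llm_generation are real):

```python
# unified_config.py - add the field to the matching dataclass
from dataclasses import dataclass

@dataclass
class TimeoutsConfig:
    external_api: int = 30
    llm_generation: int = 60
    llm_retry: int = 10  # hypothetical new field

# config.yaml gains the matching key:
# timeouts:
#   llm_retry: 10

cfg = TimeoutsConfig()
print(cfg.llm_retry)  # 10, read via attribute access, never .get()
```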


API Infrastructure

ProjectContext

File: src/api/context.py

Frozen dataclass injected into every API request handler. Replaces direct calls to ProjectManager.get_active_db_path() in the API layer.

@dataclass(frozen=True)
class ProjectContext:
    project_id: UUID
    group_id: UUID
    project_name: str
    db_path: str
    domain: Optional[str] = None
    language: Optional[str] = None
    collection_prefix: str = ""

Resolution priority in the API layer:

1. X-Project-Id header: look up the project and validate user access
2. User’s active project: from the database
3. Global fallback: ProjectContext.from_global_fallback() (CLI, demo mode)

# In a FastAPI route handler
@router.get("/methods")
async def list_methods(ctx: ProjectContext = Depends(get_project_context)):
    cpg = ProjectScopedServices.get_cpg(ctx)
    return cpg.get_database_stats()

ProjectScopedServices

File: src/api/services/project_services.py

LRU-cached per-project service instances. Avoids opening a new DuckDB connection per request while bounding memory.

Class Methods

Method Returns Description
get_cpg(ctx: ProjectContext) CPGQueryService Get or create a CPG service for the project
get_vector_store(ctx: ProjectContext) VectorStoreReal Get or create a vector store with project prefix
health_check() Dict[str, Any] Check health of all cached connections
init_from_config() None Set cache size from unified config
reset() None Close all cached services (for testing/shutdown)
set_max_cache(size: int) None Update max concurrent project connections

from src.api.services.project_services import ProjectScopedServices

# Initialize from config on startup
ProjectScopedServices.init_from_config()

# Get project-scoped services
cpg = ProjectScopedServices.get_cpg(ctx)
vs = ProjectScopedServices.get_vector_store(ctx)

# Health check
status = ProjectScopedServices.health_check()
# {'cpg_cache_size': 2, 'vector_cache_size': 1, 'max_cache': 10, ...}

Default max cache: 10 concurrent project connections. Configurable via multi_tenant.max_project_connections in config.yaml.
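
The close-on-evict LRU behavior can be sketched with an OrderedDict (a generic sketch of the semantics with a stub service, not the project's implementation):

```python
from collections import OrderedDict

class BoundedServiceCache:
    """Keep at most max_size live services; close the least recently used on eviction."""

    def __init__(self, factory, max_size=10):
        self._factory = factory
        self._max_size = max_size
        self._cache = OrderedDict()

    def get(self, project_id):
        if project_id in self._cache:
            self._cache.move_to_end(project_id)   # mark as most recently used
            return self._cache[project_id]
        if len(self._cache) >= self._max_size:
            _, evicted = self._cache.popitem(last=False)  # least recently used
            evicted.close()                       # release its DB connection
        service = self._factory(project_id)
        self._cache[project_id] = service
        return service

class StubService:
    """Stand-in for a per-project CPG service."""
    def __init__(self, project_id):
        self.project_id = project_id
        self.closed = False
    def close(self):
        self.closed = True

cache = BoundedServiceCache(StubService, max_size=2)
a = cache.get("a")
b = cache.get("b")
cache.get("a")   # touch "a", so "b" is now least recently used
cache.get("c")   # evicts and closes "b"
print(b.closed)  # True
```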


FastAPI Dependencies

File: src/api/dependencies.py

Dependency injection functions for FastAPI route handlers.

Authentication Dependencies

Dependency Returns Description
get_current_user_optional() Optional[User] Get user from JWT or API key (returns None if unauthenticated)
get_current_user() User Get user (raises 401 if unauthenticated)
get_current_active_user() User Get active user (raises 403 if disabled)

Authorization Dependencies

Dependency Description
require_permission(permission: Permission) Factory: require a specific permission
require_role(min_role: UserRole) Factory: require minimum role level
RequireAdmin Shortcut for require_role(UserRole.ADMIN)
RequireReviewer Shortcut for require_role(UserRole.REVIEWER)
RequireAnalyst Shortcut for require_role(UserRole.ANALYST)

Project & Utility Dependencies

Dependency Returns Description
get_project_context() ProjectContext Resolve per-request project scope
get_cpg_service() CPGQueryService Get project-scoped CPG service
get_request_id() str Get request ID from middleware
get_client_ip() str Get client IP (supports X-Forwarded-For)

from fastapi import Depends
from src.api.dependencies import (
    get_project_context,
    get_cpg_service,
    RequireAdmin,
    require_permission,
)
from src.api.auth.permissions import Permission

@router.post("/admin/reset")
async def reset_cache(user=Depends(RequireAdmin)):
    ProjectScopedServices.reset()

@router.get("/query")
async def run_query(
    ctx=Depends(get_project_context),
    cpg=Depends(get_cpg_service),
    user=Depends(require_permission(Permission.QUERY_EXECUTE)),
):
    return cpg.get_database_stats()

Middleware Stack

File: src/api/main.py

Middleware is applied in this order (outermost first):

Order Middleware Module Description
1 CORSMiddleware fastapi.middleware.cors Cross-origin resource sharing
2 RateLimiterMiddleware src.api.middleware.rate_limiter 3-tier rate limiting (IP / JWT / API key)
3 SecurityHeadersMiddleware src.api.middleware.security_headers CSP, X-Frame-Options, HSTS
4 MetricsMiddleware src.api.middleware.metrics Prometheus request metrics
5 DLPMiddleware src.api.middleware.dlp Data Loss Prevention (conditional)

Request ID and timing headers are added via @app.middleware("http") handlers.
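
The order matters because each middleware wraps everything after it. The wrapping semantics in miniature, with plain functions standing in for the middleware classes:

```python
trace = []

def with_logging(handler):
    # Outermost layer: first to see the request, last to see the response.
    def wrapped(request):
        trace.append("logging:in")
        response = handler(request)
        trace.append("logging:out")
        return response
    return wrapped

def with_auth(handler):
    def wrapped(request):
        trace.append("auth:in")
        return handler(request)
    return wrapped

def endpoint(request):
    trace.append("endpoint")
    return "ok"

# Outermost first: logging wraps auth wraps the endpoint.
app = with_logging(with_auth(endpoint))
app({})
print(trace)  # ['logging:in', 'auth:in', 'endpoint', 'logging:out']
```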


Authentication & Authorization

CodeGraph supports four authentication mechanisms:

Mechanism Module Description
Local JWT src/api/auth/jwt_handler.py Username/password with JWT tokens
OAuth2 src/api/auth/oauth.py SourceCraft (Yandex ID) and GitVerse (Sber ID)
LDAP src/api/auth/ldap_auth.py Enterprise LDAP/AD integration
API Keys src/api/auth/api_keys.py Programmatic access with scoped keys

Role hierarchy: VIEWER (0) < ANALYST (1) < REVIEWER (2) < ADMIN (3).
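
Because the roles are ordered, require_role reduces to a numeric comparison; a sketch consistent with the levels above (the actual UserRole definition may differ):

```python
from enum import IntEnum

class UserRole(IntEnum):
    VIEWER = 0
    ANALYST = 1
    REVIEWER = 2
    ADMIN = 3

def meets_minimum(role: UserRole, min_role: UserRole) -> bool:
    # "Require minimum role" is just an integer comparison on the level.
    return role >= min_role

print(meets_minimum(UserRole.ADMIN, UserRole.REVIEWER))   # True
print(meets_minimum(UserRole.VIEWER, UserRole.ANALYST))   # False
```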

When multi_tenant.enabled: false (default in config.yaml), RBAC checks are no-ops.

For HTTP endpoint details, authentication headers, and token lifecycle, see REST API Documentation.


Error Handling

LocalizedHTTPException

File: src/api/errors.py

HTTPException subclass with automatic i18n. Messages are retrieved from src/config/localization/{en,ru}.yaml.

from src.api.errors import LocalizedHTTPException

raise LocalizedHTTPException(
    status_code=401,
    detail_key="invalid_credentials",
    lang="en"
)

# With format parameters
raise LocalizedHTTPException(
    status_code=404,
    detail_key="user_not_found",
    lang="ru",
    username="john"
)

Factory Functions

Pre-built factory functions for common HTTP errors:

Function Status Description
raise_invalid_credentials(lang) 401 Invalid username/password
raise_user_disabled(lang) 401 Disabled account
raise_token_expired(lang) 401 Expired JWT
raise_token_invalid(lang) 401 Invalid JWT
raise_insufficient_permissions(lang) 403 Permission denied
raise_user_not_found(lang, username) 404 User not found
raise_api_key_not_found(lang) 404 API key not found
raise_session_not_found(lang) 404 Session not found
raise_duplicate_username(lang, username) 409 Username conflict
raise_duplicate_email(lang, email) 409 Email conflict
raise_rate_limit_exceeded(lang) 429 Rate limit hit
raise_internal_error(lang) 500 Internal server error
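
Each factory fixes the status code and message key, and forwards language plus format parameters. The pattern in miniature, with a stand-in exception class and message table (both illustrative, not the project's code):

```python
# Illustrative message table; the real messages live in {en,ru}.yaml files.
MESSAGES = {
    ("user_not_found", "en"): "User {username} not found",
    ("user_not_found", "ru"): "Пользователь {username} не найден",
}

class MiniHTTPError(Exception):
    """Stand-in for LocalizedHTTPException."""
    def __init__(self, status_code, detail_key, lang="en", **params):
        self.status_code = status_code
        self.detail = MESSAGES[(detail_key, lang)].format(**params)
        super().__init__(self.detail)

def raise_user_not_found(lang: str, username: str):
    # Factory: fixed status and key; caller supplies language and parameters.
    raise MiniHTTPError(404, "user_not_found", lang=lang, username=username)

try:
    raise_user_not_found("en", "john")
except MiniHTTPError as e:
    print(e.status_code, e.detail)  # 404 User john not found
```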

LLMProviderError

File: src/llm/base_provider.py

Raised when an LLM provider (Yandex, GigaChat, OpenAI, local) fails.

from src.llm.base_provider import LLMProviderError

try:
    response = provider.generate(prompt)
except LLMProviderError as e:
    logger.error(f"LLM generation failed: {e}")

AgentExecutionError

File: src/workflow/error_handling.py

Raised when a workflow agent fails during execution.

from src.workflow.error_handling import AgentExecutionError

try:
    result = copilot.run("analyze security")
except AgentExecutionError as e:
    logger.error(f"Agent failed: {e}")

Other Exceptions

Exception File Description
TokenError src/api/auth/jwt_handler.py JWT decode/validation failure
OAuthError src/api/auth/oauth.py OAuth2 flow failure
LDAPError src/api/auth/ldap_auth.py LDAP bind/search failure
PlatformAPIError src/api/services/platform_client.py SourceCraft/GitVerse API failure
GoCPGProcessError src/services/gocpg/subprocess_runner.py GoCPG process execution failure
GoCPGTimeoutError src/services/gocpg/subprocess_runner.py GoCPG process timeout
VaultError src/security/vault/client.py HashiCorp Vault operation failure
DatabaseNotConfiguredError src/project_manager.py No active project / DB path available

Error Handling Pattern

from src.project_manager import DatabaseNotConfiguredError
from src.llm.base_provider import LLMProviderError
from src.workflow.error_handling import AgentExecutionError

try:
    service = CPGQueryService()
    result = copilot.run(query)
except DatabaseNotConfiguredError:
    print("No project configured. Use: python -m src.cli import /path/to/source")
except LLMProviderError as e:
    print(f"LLM provider error: {e}")
except AgentExecutionError as e:
    print(f"Workflow error: {e}")

Security Hardening Classes

HardeningScanner

D3FEND Source Code Hardening compliance scanner.

from src.security import HardeningScanner, HardeningCategory, HardeningSeverity

scanner = HardeningScanner(cpg_service=cpg_service, language="c")

Methods

scan_all(limit_per_check: int = 50) -> List[HardeningFinding]

Run all applicable hardening checks.

findings = scanner.scan_all(limit_per_check=50)
# Returns: [HardeningFinding(d3fend_id='D3-VI', severity='high', ...)]

scan_by_d3fend_id(d3fend_ids: List[str], limit: int = 50) -> List[HardeningFinding]

Run checks for specific D3FEND technique IDs.

findings = scanner.scan_by_d3fend_id(["D3-VI", "D3-NPC", "D3-TL"])
# Returns: Findings for Variable Initialization, Null Pointer Checking, Trusted Library

scan_by_category(category: HardeningCategory, limit: int = 50) -> List[HardeningFinding]

Run checks for a specific category.

findings = scanner.scan_by_category(HardeningCategory.MEMORY_SAFETY)
# Returns: Findings for all memory safety checks

scan_by_severity(min_severity: HardeningSeverity, limit: int = 50) -> List[HardeningFinding]

Run checks at or above a minimum severity level.

findings = scanner.scan_by_severity(HardeningSeverity.HIGH)
# Returns: Findings with CRITICAL or HIGH severity

get_compliance_score(findings: List[HardeningFinding]) -> Dict

Calculate compliance scores from findings.

scores = scanner.get_compliance_score(findings)
# Returns: {
#     'overall_score': 85.3,
#     'total_findings': 12,
#     'by_category': {'initialization': 3, 'pointer_safety': 5, ...},
#     'by_d3fend': {'D3-VI': 3, 'D3-NPC': 5, ...},
#     'by_severity': {'high': 2, 'medium': 6, 'low': 4},
#     'category_scores': {'initialization': 70, 'pointer_safety': 50, ...},
#     'd3fend_scores': {'D3-VI': 70, 'D3-NPC': 50, ...}
# }

get_remediation_report(findings: List[HardeningFinding]) -> str

Generate a Markdown remediation report.

report = scanner.get_remediation_report(findings)
print(report)
# # D3FEND Source Code Hardening Report
# ## Summary
# - **Overall Compliance Score**: 85.3%
# - **Total Findings**: 12
# ...

get_checks_summary() -> Dict

Get summary of available checks.

summary = scanner.get_checks_summary()
# Returns: {
#     'total_checks': 22,
#     'language': 'c',
#     'by_category': {...},
#     'by_d3fend': {...},
#     'domain_checks': 10
# }

HardeningCategory

Enum for D3FEND-aligned hardening categories.

from src.security import HardeningCategory

class HardeningCategory(Enum):
    INITIALIZATION = "initialization"           # D3-VI
    CREDENTIAL_MANAGEMENT = "credential_mgmt"   # D3-CS
    INTEGER_SAFETY = "integer_safety"           # D3-IRV
    POINTER_SAFETY = "pointer_safety"           # D3-PV, D3-NPC, D3-MBSV
    MEMORY_SAFETY = "memory_safety"             # D3-RN
    LIBRARY_SAFETY = "library_safety"           # D3-TL
    TYPE_SAFETY = "type_safety"                 # D3-VTV
    DOMAIN_VALIDATION = "domain_validation"     # D3-DLV
    OPERATIONAL_VALIDATION = "operational"      # D3-OLV

HardeningSeverity

Enum for severity levels.

from src.security import HardeningSeverity

class HardeningSeverity(Enum):
    CRITICAL = "critical"  # Directly exploitable
    HIGH = "high"          # Significant security risk
    MEDIUM = "medium"      # Moderate security risk
    LOW = "low"            # Minor security concern
    INFO = "info"          # Best practice recommendation
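Because the enum values are plain strings, severity comparisons need an explicit ordering. A hedged sketch of the kind of "at or above" filtering `scan_by_severity` performs; the rank table and `at_or_above` helper are assumptions for illustration, not SDK internals:

```python
from enum import Enum

class HardeningSeverity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"

# Illustrative rank table: declaration order doubles as severity order,
# so a lower rank number means a more severe level.
_SEVERITY_RANK = {s: i for i, s in enumerate(HardeningSeverity)}

def at_or_above(severities, minimum: HardeningSeverity) -> list:
    """Keep severities at or above the minimum (e.g. HIGH keeps CRITICAL and HIGH)."""
    return [s for s in severities if _SEVERITY_RANK[s] <= _SEVERITY_RANK[minimum]]

found = [HardeningSeverity.LOW, HardeningSeverity.CRITICAL, HardeningSeverity.MEDIUM]
print([s.name for s in at_or_above(found, HardeningSeverity.HIGH)])  # ['CRITICAL']
```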

HardeningCheck

Definition of a hardening check.

from src.security import HardeningCheck

@dataclass
class HardeningCheck:
    id: str                    # "D3-VI-001"
    d3fend_id: str             # "D3-VI"
    d3fend_name: str           # "Variable Initialization"
    category: HardeningCategory
    severity: HardeningSeverity
    description: str
    cpgql_query: str           # SQL query for CPG database
    cwe_ids: List[str]         # ["CWE-457"]
    language_scope: List[str]  # ["c", "cpp"] or ["*"]
    indicators: List[str]
    good_patterns: List[str]
    remediation: str
    example_code: str
    confidence_weight: float   # 0.0-1.0

Methods

applies_to_language(language: str) -> bool

Determine whether this check applies to the given language.

check = get_check_by_id("D3-VI-001")
if check.applies_to_language("c"):
    print("Applies to C code")
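Based on the `language_scope` field documented above (a list such as `["c", "cpp"]`, or `["*"]` for language-agnostic checks), the matching logic is plausibly a simple membership test. A sketch using a minimal stand-in type; `Check` here is a stub, not the real `HardeningCheck`:

```python
from dataclasses import dataclass

@dataclass
class Check:
    """Minimal stand-in for HardeningCheck, carrying only the scope field."""
    language_scope: list

def applies_to_language(check: Check, language: str) -> bool:
    """True when the check targets this language or is language-agnostic ("*")."""
    return "*" in check.language_scope or language in check.language_scope

c_only = Check(language_scope=["c", "cpp"])
universal = Check(language_scope=["*"])
print(applies_to_language(c_only, "python"))    # False
print(applies_to_language(universal, "python")) # True
```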

HardeningFinding

Result from running a hardening check.

from src.security import HardeningFinding

@dataclass
class HardeningFinding:
    finding_id: str      # Unique ID
    check_id: str        # "D3-VI-001"
    d3fend_id: str       # "D3-VI"
    category: str        # "initialization"
    severity: str        # "high"
    method_name: str     # "process_input"
    filename: str        # "src/input.c"
    line_number: int     # 142
    code_snippet: str    # "int x; use(x);"
    description: str
    cwe_ids: List[str]
    remediation: str
    confidence: float    # 0.0-1.0
    metadata: Dict

Methods

to_dict() -> Dict

Convert the finding to a dictionary for serialization.

finding_dict = finding.to_dict()
# Returns: {'finding_id': 'a1b2c3', 'd3fend_id': 'D3-VI', ...}
from_check_and_row(check, row, confidence) -> HardeningFinding

Create a finding from a check definition and a query result row.

finding = HardeningFinding.from_check_and_row(check, row, confidence=0.9)

Hardening Utility Functions

from src.security import (
    get_check_by_id,
    get_checks_by_category,
    get_checks_by_d3fend_id,
    get_all_checks,
    get_checks_for_language,
    D3FEND_TECHNIQUES,
    D3FEND_TECHNIQUE_IDS,
)
get_check_by_id(check_id: str) -> Optional[HardeningCheck]

Get a check by its ID.

check = get_check_by_id("D3-VI-001")
get_checks_by_category(category: HardeningCategory) -> List[HardeningCheck]

Get all checks in a category.

memory_checks = get_checks_by_category(HardeningCategory.MEMORY_SAFETY)
get_checks_by_d3fend_id(d3fend_id: str) -> List[HardeningCheck]

Get all checks for a D3FEND technique.

null_checks = get_checks_by_d3fend_id("D3-NPC")
get_all_checks() -> List[HardeningCheck]

Get all registered hardening checks.

all_checks = get_all_checks()
print(f"Total checks: {len(all_checks)}")
get_checks_for_language(language: str) -> List[HardeningCheck]

Get checks applicable to a specific language.

c_checks = get_checks_for_language("c")

D3FEND Constants

# Available D3FEND technique IDs
D3FEND_TECHNIQUE_IDS = [
    "D3-VI",   # Variable Initialization
    "D3-CS",   # Credential Scrubbing
    "D3-IRV",  # Integer Range Validation
    "D3-PV",   # Pointer Validation
    "D3-RN",   # Reference Nullification
    "D3-TL",   # Trusted Library
    "D3-VTV",  # Variable Type Validation
    "D3-MBSV", # Memory Block Start Validation
    "D3-NPC",  # Null Pointer Checking
    "D3-DLV",  # Domain Logic Validation
    "D3-OLV",  # Operational Logic Validation
]

# D3FEND technique metadata
D3FEND_TECHNIQUES = {
    "D3-VI": {
        "name": "Variable Initialization",
        "description": "Setting variables to a known value before use",
        "url": "https://next.d3fend.mitre.org/technique/d3f:VariableInitialization",
    },
    # ... other techniques
}
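The metadata dict can be joined against a finding's `d3fend_id` to label reports. A small sketch using the `D3-VI` entry shown above; the `technique_label` helper is illustrative and not part of the SDK:

```python
# Assumes the D3FEND_TECHNIQUES shape documented above (only D3-VI reproduced here).
D3FEND_TECHNIQUES = {
    "D3-VI": {
        "name": "Variable Initialization",
        "description": "Setting variables to a known value before use",
        "url": "https://next.d3fend.mitre.org/technique/d3f:VariableInitialization",
    },
}

def technique_label(d3fend_id: str) -> str:
    """Human-readable label for a technique ID, falling back to the raw ID."""
    meta = D3FEND_TECHNIQUES.get(d3fend_id)
    return f"{d3fend_id} ({meta['name']})" if meta else d3fend_id

print(technique_label("D3-VI"))  # D3-VI (Variable Initialization)
print(technique_label("D3-XX"))  # D3-XX
```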

Next Steps