Complete Python SDK documentation for CodeGraph.
REST API endpoints are documented in REST API Documentation. Agent classes (AnalyzerAgent, RetrieverAgent, GeneratorAgent, etc.) are documented in Agents Reference.
Table of Contents¶
- Overview
- Data Flow
- Module Map
- Core Services
- CPGQueryService
- VectorStoreReal
- HybridRetriever
- Retrieval Models
- RetrievalResult
- HybridRetrievalConfig
- DomainBoostContext
- Ranking
- RelevanceScore
- ResultRanker
- Workflow
- MultiScenarioCopilot
- MultiScenarioState
- Configuration
- CPGConfig
- DomainRegistry
- get_unified_config()
- API Infrastructure
- ProjectContext
- ProjectScopedServices
- FastAPI Dependencies
- Middleware Stack
- Authentication & Authorization
- Error Handling
- LocalizedHTTPException
- LLMProviderError
- AgentExecutionError
- Other Exceptions
- Security Hardening Classes
- HardeningScanner
- HardeningCategory
- HardeningSeverity
- HardeningCheck
- HardeningFinding
- Hardening Utility Functions
- Next Steps
Overview¶
Data Flow¶
graph LR
Q[User Query] --> IC[Intent Classifier]
IC --> PR[Pre-Retrieval]
PR --> R{Router}
R --> VS[VectorStoreReal<br/>ChromaDB]
R --> CPG[CPGQueryService<br/>DuckDB]
VS --> RRF[RRF Merger]
CPG --> RRF
RRF --> RK[ResultRanker]
RK --> LLM[LLM Provider]
LLM --> A[Response]
Module Map¶
| Module | Purpose | Key Class |
|---|---|---|
| src/services/cpg/ | DuckDB CPG queries (mixin-based) | CPGQueryService |
| src/retrieval/vector_store_real.py | ChromaDB vector search | VectorStoreReal |
| src/retrieval/hybrid/ | Hybrid retrieval with RRF | HybridRetriever |
| src/ranking/result_ranker.py | Multi-signal relevance ranking | ResultRanker |
| src/workflow/orchestration/copilot.py | LangGraph orchestrator | MultiScenarioCopilot |
| src/workflow/state.py | Workflow state definitions | MultiScenarioState |
| src/config/ | Pydantic configuration | get_unified_config() |
| src/domains/registry.py | Domain plugin registry | DomainRegistry |
| src/api/context.py | Per-request project scope | ProjectContext |
| src/api/services/project_services.py | LRU service cache | ProjectScopedServices |
| src/api/dependencies.py | FastAPI DI functions | get_project_context() |
| src/api/errors.py | Localized HTTP exceptions | LocalizedHTTPException |
| src/security/hardening/ | D3FEND compliance scanner | HardeningScanner |
| src/agents/ | Pipeline agents | See AGENTS.md |
Core Services¶
CPGQueryService¶
File: src/services/cpg/__init__.py (composed from src/services/cpg/base.py + 12 mixins)
The primary interface for querying Code Property Graphs stored in DuckDB. Built using a mixin architecture where each mixin adds a set of domain-specific query methods.
class CPGQueryService(
CPGQueryBase,
SubsystemQueriesMixin,
CallGraphQueriesMixin,
SecurityQueriesMixin,
PerformanceQueriesMixin,
QualityQueriesMixin,
SemanticQueriesMixin,
StatisticsQueriesMixin,
CommentQueriesMixin,
ExternalQueriesMixin,
TypeQueriesMixin,
PatternQueriesMixin,
CollectionQueriesMixin,
):
pass # All methods inherited from mixins
Constructor¶
CPGQueryService(
db_path: Optional[str] = None,
allowed_db_paths: Optional[set] = None
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| db_path | Optional[str] | None | Path to DuckDB file. If None, resolved via ProjectManager.get_active_db_path() |
| allowed_db_paths | Optional[set] | None | Whitelist of allowed DB paths (multi-tenant security) |
Usage¶
from src.services.cpg import CPGQueryService
# Auto-resolve DB path from active project
service = CPGQueryService()
# Explicit path
service = CPGQueryService("data/projects/myproject.duckdb")
# Context manager
with CPGQueryService("data/projects/myproject.duckdb") as svc:
stats = svc.get_database_stats()
hotspots = svc.get_security_hotspots(limit=20)
Base Methods¶
From CPGQueryBase (src/services/cpg/base.py):
| Method | Description |
|---|---|
| execute_query(query, parameters=None) | Execute parameterized SQL, return list of tuples |
| execute_sql_dict(query) | Execute SQL, return list of dicts |
| execute_custom_sql(query, limit=100) | Execute user SQL with row limit enforcement |
| set_database(db_path) | Switch to a different DuckDB file |
| switch_project(project_name) | Switch to a named project (resolves path via ProjectManager) |
| close() | Close the DuckDB connection |
# Raw SQL with dict results
rows = service.execute_sql_dict("SELECT name, file FROM nodes_method LIMIT 5")
for row in rows:
print(f"{row['name']} in {row['file']}")
# Switch project at runtime
service.switch_project("postgresql")
Subsystem Queries¶
From SubsystemQueriesMixin (src/services/cpg/subsystem_queries.py):
| Method | Description |
|---|---|
| get_subsystems() | List all subsystems with method counts |
| get_methods_by_subsystem(subsystem, limit=100) | Get methods in a subsystem |
| get_subsystem_method_counts() | Method count per subsystem |
| get_subsystem_for_method(filename) | Determine subsystem from file path |
subsystems = service.get_subsystems()
for s in subsystems[:3]:
print(f"{s['name']}: {s['method_count']} methods")
methods = service.get_methods_by_subsystem("executor", limit=50)
Call Graph Queries¶
From CallGraphQueriesMixin (src/services/cpg/callgraph_queries.py):
| Method | Description |
|---|---|
| get_call_graph(method_id, depth=2, direction="both") | Get call graph around a method |
| get_callers(function_name, limit=20) | Find callers of a function |
| get_callees(function_name, limit=20) | Find callees of a function |
callers = service.get_callers("LWLockAcquire", limit=10)
callees = service.get_callees("CommitTransaction")
# Full call graph with depth control
graph = service.get_call_graph(method_id=123, depth=3, direction="both")
Security Queries¶
From SecurityQueriesMixin (src/services/cpg/security_queries.py):
| Method | Description |
|---|---|
| get_security_hotspots(limit=100) | Functions with high security risk |
| get_taint_sources(limit=100) | Taint analysis source functions |
| get_taint_sinks(limit=100) | Taint analysis sink functions |
| find_sql_injections(...) | Detect SQL injection patterns |
| find_shell_injections(min_risk_score=0.7) | Detect command injection patterns |
| find_sql_injections_dataflow(...) | SQL injection via data flow analysis |
| find_code_injections(...) | Detect code injection patterns |
hotspots = service.get_security_hotspots(limit=20)
sources = service.get_taint_sources()
sinks = service.get_taint_sinks()
# Data flow-based SQL injection detection
sqli = service.find_sql_injections_dataflow()
for finding in sqli:
print(f"{finding['method']} in {finding['file']}: {finding['risk']}")
Performance Queries¶
From PerformanceQueriesMixin (src/services/cpg/performance_queries.py):
| Method | Description |
|---|---|
| get_performance_hotspots(limit=100) | High-complexity / high-call-count methods |
| get_allocation_heavy_methods(limit=100) | Methods with many memory allocations |
| get_recursive_methods(limit=100) | Recursive call detection |
| get_io_heavy_methods(limit=100) | I/O-intensive methods |
| get_lock_heavy_methods(limit=100) | Methods with lock contention risk |
hotspots = service.get_performance_hotspots(limit=10)
recursive = service.get_recursive_methods()
Quality Queries¶
From QualityQueriesMixin (src/services/cpg/quality_queries.py):
| Method | Description |
|---|---|
| get_methods_without_tests(...) | Methods lacking test coverage |
| count_methods_without_tests() | Count of untested methods |
| get_complex_methods(...) | Methods above cyclomatic complexity threshold |
| get_long_methods(min_lines=100, limit=100) | Methods exceeding line count threshold |
| get_methods_with_many_parameters(...) | Methods with excessive parameter counts |
untested = service.get_methods_without_tests()
complex_methods = service.get_complex_methods()
long_methods = service.get_long_methods(min_lines=150, limit=50)
Semantic Queries¶
From SemanticQueriesMixin (src/services/cpg/semantic_queries.py):
| Method | Description |
|---|---|
| search_by_function_purpose(...) | Search by semantic function purpose |
| search_by_comment_content(keyword, limit=50) | Search within code comments |
| search_methods_by_name_pattern(...) | Regex-based method name search |
results = service.search_by_comment_content("TODO", limit=20)
pattern_matches = service.search_methods_by_name_pattern(".*Handler$")
Statistics Queries¶
From StatisticsQueriesMixin (src/services/cpg/statistics.py):
| Method | Description |
|---|---|
| get_database_stats() | Overall CPG database statistics |
| get_node_type_counts() | Count of each node type |
| get_edge_type_counts() | Count of each edge type |
| get_method_quality_stats() | Aggregate quality metrics |
| get_git_state() | Git commit state at parse time |
stats = service.get_database_stats()
print(f"Methods: {stats['method_count']}, Files: {stats['file_count']}")
git = service.get_git_state()
print(f"Parsed at commit: {git['commit_hash']}")
Comment Queries¶
From CommentQueriesMixin (src/services/cpg/comment_queries.py):
| Method | Description |
|---|---|
| get_method_comments(method_name, limit=50) | Comments for a specific method |
| get_file_comments(filename, limit=100) | All comments in a file |
| search_comments(pattern, limit=100) | Regex search across all comments |
| get_todo_comments(limit=100) | TODO/FIXME/HACK comments |
| get_comment_statistics() | Comment density metrics |
| get_functions_descriptions(...) | Function descriptions from docstrings |
todos = service.get_todo_comments(limit=50)
stats = service.get_comment_statistics()
descs = service.get_functions_descriptions()
External / Git Queries¶
From ExternalQueriesMixin (src/services/cpg/external_queries.py):
| Method | Description |
|---|---|
| get_methods_by_author(author_email, limit=100) | Methods by git author |
| get_git_authors(limit=50) | All git authors |
| get_git_hotspots(min_churn=5, limit=100) | High-churn files (change frequency) |
| get_error_prone_methods(...) | Methods correlated with bug-fix commits |
| get_bus_factor_candidates(...) | Single-author knowledge silos |
authors = service.get_git_authors()
hotspots = service.get_git_hotspots(min_churn=10, limit=20)
bus_factor = service.get_bus_factor_candidates()
Type Queries¶
From TypeQueriesMixin (src/services/cpg/type_queries.py):
| Method | Description |
|---|---|
| find_type_conversions(...) | Detect type conversion operations |
| get_type_statistics() | Type usage distribution |
| find_polymorphic_variables(...) | Variables with multiple types |
| find_cast_operations(...) | Explicit cast operations |
casts = service.find_cast_operations()
poly = service.find_polymorphic_variables()
Pattern Queries¶
From PatternQueriesMixin (src/services/cpg/pattern_queries.py):
| Method | Description |
|---|---|
| get_pattern_findings(...) | Pattern engine findings |
| get_pattern_statistics() | Summary of pattern matches |
| get_pattern_rules() | Available pattern rules |
findings = service.get_pattern_findings()
stats = service.get_pattern_statistics()
rules = service.get_pattern_rules()
Collection Queries¶
From CollectionQueriesMixin (src/services/cpg/collection_queries.py):
| Method | Description |
|---|---|
| get_collection_declarations(...) | Collection/container declarations |
| get_collection_stats() | Collection usage statistics |
| find_knowledge_base_collections(...) | Knowledge-base-specific collections |
collections = service.get_collection_declarations()
kb = service.find_knowledge_base_collections()
VectorStoreReal¶
File: src/retrieval/vector_store_real.py
ChromaDB-based vector store for semantic search across Q&A pairs, SQL examples, generated documentation, code comments, markdown docs, and domain patterns. Uses paraphrase-multilingual-MiniLM-L12-v2 for embeddings with an LRU cache.
Constructor¶
VectorStoreReal(
persist_directory: Optional[str] = None,
cache_size: int = 100,
cache_ttl: int = 3600,
collection_prefix: str = ""
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| persist_directory | Optional[str] | None | ChromaDB storage path. Defaults to <project_root>/chroma_db |
| cache_size | int | 100 | Max cached embeddings (LRU) |
| cache_ttl | int | 3600 | Cache entry TTL in seconds |
| collection_prefix | str | "" | Per-project collection name prefix for tenant isolation |
Methods¶
| Method | Description |
|---|---|
| initialize_collections() | Create/load Q&A and SQL example collections |
| retrieve_qa(query, top_k=3, filter_dict=None) | Search Q&A pairs |
| retrieve_sql(query, keywords=None, query_type=None, top_k=5) | Search SQL examples |
| retrieve_generated_docs(query, top_k=5, language=None) | Search auto-generated docs |
| retrieve_comments(query, top_k=5, comment_type=None, subsystem=None) | Search code comments |
| retrieve_documentation(query, top_k=5, section=None) | Search markdown docs |
| retrieve_domain_patterns(query, top_k=5, pattern_type=None) | Search domain patterns |
| get_cache_stats() | Get embedding cache hit/miss stats |
Usage¶
from src.retrieval.vector_store_real import VectorStoreReal
store = VectorStoreReal(persist_directory="chroma_db", collection_prefix="postgresql")
store.initialize_collections()
# Search Q&A pairs
qa = store.retrieve_qa("How does transaction commit work?", top_k=3)
# Search SQL examples with type hint
sql = store.retrieve_sql(
"find callers",
keywords=["call_graph"],
query_type="structural",
top_k=5
)
# Search auto-generated documentation
docs = store.retrieve_generated_docs("buffer manager", top_k=5, language="en")
# Search code comments by type and subsystem
comments = store.retrieve_comments(
"memory allocation",
top_k=10,
comment_type="docstring",
subsystem="executor"
)
HybridRetriever¶
File: src/retrieval/hybrid/retriever.py
Parallel hybrid search combining vector (ChromaDB) and graph (DuckDB CPG) retrieval with Reciprocal Rank Fusion (RRF) merging and adaptive weighting by query type.
Constructor¶
HybridRetriever(
vector_store,
cpg_service,
config: Optional[HybridRetrievalConfig] = None
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| vector_store | VectorStoreReal | – | Vector search backend |
| cpg_service | CPGQueryService | – | Graph search backend |
| config | Optional[HybridRetrievalConfig] | None | Weight/top-k configuration |
Methods¶
| Method | Description |
|---|---|
| async retrieve(query, mode="hybrid", query_type=None, **kwargs) | Perform retrieval, returns List[RetrievalResult] |
Usage¶
import asyncio
from src.retrieval.hybrid.retriever import HybridRetriever
from src.retrieval.hybrid.models import HybridRetrievalConfig
config = HybridRetrievalConfig(
vector_weight=0.6,
graph_weight=0.4,
final_top_k=10
)
retriever = HybridRetriever(
vector_store=store,
cpg_service=service,
config=config
)
results = asyncio.run(retriever.retrieve(
query="transaction commit handling",
mode="hybrid", # "hybrid", "vector_only", "graph_only"
query_type="semantic" # "semantic", "structural", "security"
))
for r in results:
print(f"[{r.source}] {r.content[:80]}... (score={r.score:.3f})")
Adaptive weight presets by query_type:
| Query Type | Vector Weight | Graph Weight |
|---|---|---|
| semantic | 0.75 | 0.25 |
| structural | 0.25 | 0.75 |
| security | 0.40 | 0.60 |
| default | 0.60 | 0.40 |
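The RRF merge itself is straightforward. Below is a minimal sketch of weighted Reciprocal Rank Fusion using the conventional k=60 constant; the production merger may tune k and applies domain boosts (see DomainBoostContext) on top, so treat this as illustrative only:

```python
# Weighted Reciprocal Rank Fusion (sketch). Each document's score is the
# weighted sum of 1/(k + rank) over every ranking it appears in.
def rrf_merge(vector_ids, graph_ids, vector_weight=0.6, graph_weight=0.4, k=60):
    scores = {}
    for weight, ranking in ((vector_weight, vector_ids), (graph_weight, graph_ids)):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + weight / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

# "b" appears in both rankings, so it outranks documents seen only once
merged = rrf_merge(["a", "b", "c"], ["b", "d"])
```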
Retrieval Models¶
All defined in src/retrieval/hybrid/models.py.
RetrievalResult¶
File: src/retrieval/hybrid/models.py
Unified result from any retrieval source.
@dataclass
class RetrievalResult:
id: str
content: str
score: float
source: str # "vector", "graph", or "hybrid"
entity_type: str = "method" # "method", "struct", "macro", "type", "caller", "callee"
metadata: Dict[str, Any] = field(default_factory=dict)
node_id: Optional[int] = None # CPG node ID (for deduplication)
Supports __hash__ and __eq__ based on id for set deduplication.
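That identity semantics can be sketched with a stand-in dataclass (not the real class; the field set is trimmed for brevity and eq=False ensures the hand-written methods are used):

```python
from dataclasses import dataclass

# Stand-in mirroring RetrievalResult's id-based identity (sketch only).
@dataclass(eq=False)
class Result:
    id: str
    content: str
    score: float
    source: str

    def __hash__(self) -> int:
        return hash(self.id)

    def __eq__(self, other) -> bool:
        return isinstance(other, Result) and self.id == other.id

# The same entity surfaced by both backends collapses to one set entry
vector_hit = Result("node:42", "heap_insert(...)", 0.91, "vector")
graph_hit = Result("node:42", "heap_insert(...)", 0.77, "graph")
deduped = {vector_hit, graph_hit}
```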
HybridRetrievalConfig¶
File: src/retrieval/hybrid/models.py
@dataclass
class HybridRetrievalConfig:
vector_weight: float = 0.6
graph_weight: float = 0.4
vector_top_k: int = 20
graph_top_k: int = 20
final_top_k: int = 10
min_score_threshold: float = 0.1
enable_reranking: bool = False
Validation in __post_init__: raises ValueError if vector_weight + graph_weight does not sum to 1.0 (within tolerance).
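A minimal sketch of that check (the exact tolerance is an assumption; the real class may use a different epsilon):

```python
import math
from dataclasses import dataclass

# Sketch of the __post_init__ weight validation described above.
@dataclass
class WeightedConfig:
    vector_weight: float = 0.6
    graph_weight: float = 0.4

    def __post_init__(self):
        total = self.vector_weight + self.graph_weight
        if not math.isclose(total, 1.0, abs_tol=1e-6):
            raise ValueError(f"weights must sum to 1.0, got {total}")

WeightedConfig(0.75, 0.25)      # valid
try:
    WeightedConfig(0.8, 0.4)    # sums to 1.2
except ValueError as exc:
    print(f"rejected: {exc}")
```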
DomainBoostContext¶
File: src/retrieval/hybrid/models.py
Domain-aware boosting context for the RRF merger. Multiplicatively boosts results matching taint sources/sinks or target subsystems.
@dataclass
class DomainBoostContext:
taint_sources: Set[str] = field(default_factory=set)
taint_sinks: Set[str] = field(default_factory=set)
target_subsystem: Optional[str] = None
query_type: Optional[str] = None
entry_point_boost: float = 1.2
security_boost: float = 1.5
subsystem_boost: float = 1.3
Factory Method¶
ctx = DomainBoostContext.from_config(
taint_sources={"user_input", "env_var"},
taint_sinks={"exec_sql", "system"},
target_subsystem="executor",
query_type="security"
)
from_config() reads boost factors from get_unified_config().reranking.
Ranking¶
RelevanceScore¶
File: src/ranking/result_ranker.py
Multi-signal relevance score with full breakdown.
@dataclass
class RelevanceScore:
total: float
keyword_match: float = 0.0
tag_coverage: float = 0.0
name_match: float = 0.0
length_bonus: float = 0.0
semantic_similarity: float = 0.0
source_confidence: float = 0.0
retrieval_score: float = 0.0
def get_breakdown(self) -> Dict[str, float]
score = ranker.rank_result(result, question="find memory leaks")
breakdown = score.get_breakdown()
# {'total': 0.82, 'keyword_match': 0.3, 'semantic_similarity': 0.45, ...}
ResultRanker¶
File: src/ranking/result_ranker.py
Ranks query results using five relevance signals: keyword overlap, tag coverage, name match, length/complexity bonus, and semantic similarity (embedding-based). Supports cross-source ranking for hybrid retrieval.
from src.ranking.result_ranker import ResultRanker
ranker = ResultRanker()
See AGENTS.md for how agents use the ranker in retrieval pipelines.
Workflow¶
MultiScenarioCopilot¶
File: src/workflow/orchestration/copilot.py
The main LangGraph orchestrator. Routes queries through intent classification, pre-retrieval, and one of 21 scenario handlers.
Constructor¶
MultiScenarioCopilot()
No arguments – internal LangGraph is built in __init__.
Methods¶
| Method | Returns | Description |
|---|---|---|
| run(query, context=None) | Dict[str, Any] | Run a query through the full workflow pipeline |
Usage¶
from src.workflow.orchestration.copilot import MultiScenarioCopilot
copilot = MultiScenarioCopilot()
# Auto-detect scenario from intent
result = copilot.run("Find buffer overflow risks in the parser module")
print(result["answer"])
print(f"Scenario: {result['scenario_id']}, Confidence: {result['confidence']}")
# Force a specific scenario
result = copilot.run(
"Analyze the executor module",
context={"scenario_id": "scenario_2"}
)
The returned dict follows the MultiScenarioState shape: query, intent, scenario_id, confidence, answer, evidence, metadata, error.
See Workflows Reference for the full list of 21 scenarios and composite workflows.
MultiScenarioState¶
File: src/workflow/state.py
TypedDict defining the shape of data flowing through LangGraph nodes.
class MultiScenarioState(TypedDict):
# Input
query: str
context: Optional[Dict[str, Any]]
language: Optional[str]
# Intent Classification
intent: Optional[str]
scenario_id: Optional[str]
confidence: Optional[float]
classification_method: Optional[str]
# CPG Data
cpg_results: Optional[List[Dict]]
subsystems: Optional[List[str]]
methods: Optional[List[Dict]]
call_graph: Optional[Any]
# Output
answer: Optional[str]
evidence: Optional[List[str]]
metadata: Optional[Dict[str, Any]]
retrieved_functions: Optional[List[str]]
# Error Handling
error: Optional[str]
retry_count: int
# Workflow Configuration
enrichment_config: Optional[Dict[str, Any]]
vector_store: Optional[Any]
# Multi-tenant scoping
db_path: Optional[str]
collection_prefix: Optional[str]
# Pre-retrieval (Phase E)
pre_retrieval_results: Optional[List[Dict[str, Any]]]
Helper function:
from src.workflow.state import create_initial_state
state = create_initial_state(
query="Where is heap_insert defined?",
context={"subsystem": "executor"}
)
Configuration¶
CPGConfig¶
File: src/config/cpg_config.py
Controls which domain-specific prompts and analyst persona are used.
Methods¶
| Method | Description |
|---|---|
| set_cpg_type(cpg_type: str) | Set the active domain (e.g., "postgresql_v2", "python_django", "go") |
| get_code_analyst_title() -> str | Get domain-specific analyst title |
from src.config.cpg_config import CPGConfig
config = CPGConfig()
config.set_cpg_type("postgresql_v2")
title = config.get_code_analyst_title()
# "PostgreSQL 17.6 expert"
DomainRegistry¶
File: src/domains/registry.py
Central registry for domain plugins (13 domains). Each plugin provides domain-specific taint sources/sinks, subsystem definitions, and pattern rules.
Class Methods¶
| Method | Returns | Description |
|---|---|---|
| activate(domain_name) | DomainPlugin | Activate a domain plugin by name |
| get_active_or_none() | Optional[DomainPlugin] | Get currently active domain, or None |
from src.domains.registry import DomainRegistry
plugin = DomainRegistry.activate("postgresql")
print(plugin.get_taint_sources())
print(plugin.get_subsystem_names())
# Check active domain
domain = DomainRegistry.get_active_or_none()
if domain:
print(f"Active domain: {domain.name}")
get_unified_config()¶
File: src/config/__init__.py
Returns a Pydantic-based configuration singleton loaded from config.yaml. Always use attribute access, never .get().
from src.config import get_unified_config
cfg = get_unified_config()
# Retrieval weights
cfg.retrieval_weights.default_vector_weight # 0.6
cfg.retrieval_weights.final_top_k # 10
# Reranking
cfg.reranking.boost_domain_match # 1.3
# Timeouts (never hardcode)
cfg.timeouts.external_api # 30
cfg.timeouts.llm_generation # 60
# Batch processing (never hardcode)
cfg.batch_processing.extraction_default_limit # 100
cfg.batch_processing.vector_store_batch # 100
# Cache
cfg.cache.embedding_cache_size # 100
cfg.cache.embedding_cache_ttl # 3600
To add new configuration fields: add a dataclass field in unified_config.py, add the corresponding YAML key in config.yaml, and access via get_unified_config().
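A sketch of that three-step pattern (the query_cache_size field is hypothetical, shown only for illustration; the real CacheConfig lives in unified_config.py):

```python
from dataclasses import dataclass

# 1. Add a dataclass field with a sensible default in unified_config.py
@dataclass
class CacheConfig:
    embedding_cache_size: int = 100
    embedding_cache_ttl: int = 3600
    query_cache_size: int = 50  # hypothetical new field

# 2. Add the matching key in config.yaml:
#    cache:
#      query_cache_size: 50
#
# 3. Access via attribute access (never .get()):
#    get_unified_config().cache.query_cache_size
```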
API Infrastructure¶
ProjectContext¶
File: src/api/context.py
Frozen dataclass injected into every API request handler. Replaces direct calls to ProjectManager.get_active_db_path() in the API layer.
@dataclass(frozen=True)
class ProjectContext:
project_id: UUID
group_id: UUID
project_name: str
db_path: str
domain: Optional[str] = None
language: Optional[str] = None
collection_prefix: str = ""
Resolution priority in the API layer:
1. X-Project-Id header – look up project, validate user access
2. User’s active project – from database
3. Global fallback – ProjectContext.from_global_fallback() (CLI, demo mode)
# In a FastAPI route handler
@router.get("/methods")
async def list_methods(ctx: ProjectContext = Depends(get_project_context)):
cpg = ProjectScopedServices.get_cpg(ctx)
return cpg.get_database_stats()
ProjectScopedServices¶
File: src/api/services/project_services.py
LRU-cached per-project service instances. Avoids opening a new DuckDB connection per request while bounding memory.
Class Methods¶
| Method | Returns | Description |
|---|---|---|
| get_cpg(ctx: ProjectContext) | CPGQueryService | Get or create a CPG service for the project |
| get_vector_store(ctx: ProjectContext) | VectorStoreReal | Get or create a vector store with project prefix |
| health_check() | Dict[str, Any] | Check health of all cached connections |
| init_from_config() | None | Set cache size from unified config |
| reset() | None | Close all cached services (for testing/shutdown) |
| set_max_cache(size: int) | None | Update max concurrent project connections |
from src.api.services.project_services import ProjectScopedServices
# Initialize from config on startup
ProjectScopedServices.init_from_config()
# Get project-scoped services
cpg = ProjectScopedServices.get_cpg(ctx)
vs = ProjectScopedServices.get_vector_store(ctx)
# Health check
status = ProjectScopedServices.health_check()
# {'cpg_cache_size': 2, 'vector_cache_size': 1, 'max_cache': 10, ...}
Default max cache: 10 concurrent project connections. Configurable via multi_tenant.max_project_connections in config.yaml.
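The caching idea can be sketched as a bounded LRU keyed by project (names here are hypothetical; the real class keys on ProjectContext and caches CPGQueryService / VectorStoreReal instances):

```python
from collections import OrderedDict

# Bounded LRU cache of per-project services (illustrative sketch).
class BoundedServiceCache:
    def __init__(self, max_size: int = 10):
        self.max_size = max_size
        self._cache = OrderedDict()

    def get(self, key, factory):
        if key in self._cache:
            self._cache.move_to_end(key)  # mark most recently used
            return self._cache[key]
        service = factory()
        self._cache[key] = service
        if len(self._cache) > self.max_size:
            _, evicted = self._cache.popitem(last=False)  # drop LRU entry
            if hasattr(evicted, "close"):
                evicted.close()  # release the underlying DB handle
        return service
```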
FastAPI Dependencies¶
File: src/api/dependencies.py
Dependency injection functions for FastAPI route handlers.
Authentication Dependencies¶
| Dependency | Returns | Description |
|---|---|---|
| get_current_user_optional() | Optional[User] | Get user from JWT or API key (returns None if unauthenticated) |
| get_current_user() | User | Get user (raises 401 if unauthenticated) |
| get_current_active_user() | User | Get active user (raises 403 if disabled) |
Authorization Dependencies¶
| Dependency | Description |
|---|---|
| require_permission(permission: Permission) | Factory: require a specific permission |
| require_role(min_role: UserRole) | Factory: require minimum role level |
| RequireAdmin | Shortcut for require_role(UserRole.ADMIN) |
| RequireReviewer | Shortcut for require_role(UserRole.REVIEWER) |
| RequireAnalyst | Shortcut for require_role(UserRole.ANALYST) |
Project & Utility Dependencies¶
| Dependency | Returns | Description |
|---|---|---|
| get_project_context() | ProjectContext | Resolve per-request project scope |
| get_cpg_service() | CPGQueryService | Get project-scoped CPG service |
| get_request_id() | str | Get request ID from middleware |
| get_client_ip() | str | Get client IP (supports X-Forwarded-For) |
from fastapi import Depends
from src.api.dependencies import (
get_project_context,
get_cpg_service,
RequireAdmin,
require_permission,
)
from src.api.auth.permissions import Permission
@router.post("/admin/reset")
async def reset_cache(user=Depends(RequireAdmin)):
ProjectScopedServices.reset()
@router.get("/query")
async def run_query(
ctx=Depends(get_project_context),
cpg=Depends(get_cpg_service),
user=Depends(require_permission(Permission.QUERY_EXECUTE)),
):
return cpg.get_database_stats()
Middleware Stack¶
File: src/api/main.py
Middleware is applied in this order (outermost first):
| Order | Middleware | Module | Description |
|---|---|---|---|
| 1 | CORSMiddleware | fastapi.middleware.cors | Cross-origin resource sharing |
| 2 | RateLimiterMiddleware | src.api.middleware.rate_limiter | 3-tier rate limiting (IP / JWT / API key) |
| 3 | SecurityHeadersMiddleware | src.api.middleware.security_headers | CSP, X-Frame-Options, HSTS |
| 4 | MetricsMiddleware | src.api.middleware.metrics | Prometheus request metrics |
| 5 | DLPMiddleware | src.api.middleware.dlp | Data Loss Prevention (conditional) |
Request ID and timing headers are added via @app.middleware("http") handlers.
Authentication & Authorization¶
CodeGraph supports four authentication mechanisms:
| Mechanism | Module | Description |
|---|---|---|
| Local JWT | src/api/auth/jwt_handler.py | Username/password with JWT tokens |
| OAuth2 | src/api/auth/oauth.py | SourceCraft (Yandex ID) and GitVerse (Sber ID) |
| LDAP | src/api/auth/ldap_auth.py | Enterprise LDAP/AD integration |
| API Keys | src/api/auth/api_keys.py | Programmatic access with scoped keys |
Role hierarchy: VIEWER (0) < ANALYST (1) < REVIEWER (2) < ADMIN (3).
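That ordering can be sketched as an IntEnum comparison (levels taken from the hierarchy above; the real UserRole enum in the auth module may differ in detail):

```python
from enum import IntEnum

# Role levels from the documented hierarchy (sketch).
class UserRole(IntEnum):
    VIEWER = 0
    ANALYST = 1
    REVIEWER = 2
    ADMIN = 3

def meets_minimum(user_role: UserRole, min_role: UserRole) -> bool:
    # Conceptually, require_role(min_role) reduces to this comparison
    return user_role >= min_role
```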
When multi_tenant.enabled: false (default in config.yaml), RBAC checks are no-ops.
For HTTP endpoint details, authentication headers, and token lifecycle, see REST API Documentation.
Error Handling¶
LocalizedHTTPException¶
File: src/api/errors.py
HTTPException subclass with automatic i18n. Messages are retrieved from src/config/localization/{en,ru}.yaml.
from src.api.errors import LocalizedHTTPException
raise LocalizedHTTPException(
status_code=401,
detail_key="invalid_credentials",
lang="en"
)
# With format parameters
raise LocalizedHTTPException(
status_code=404,
detail_key="user_not_found",
lang="ru",
username="john"
)
Factory Functions¶
Pre-built factory functions for common HTTP errors:
| Function | Status | Description |
|---|---|---|
| raise_invalid_credentials(lang) | 401 | Invalid username/password |
| raise_user_disabled(lang) | 401 | Disabled account |
| raise_token_expired(lang) | 401 | Expired JWT |
| raise_token_invalid(lang) | 401 | Invalid JWT |
| raise_insufficient_permissions(lang) | 403 | Permission denied |
| raise_user_not_found(lang, username) | 404 | User not found |
| raise_api_key_not_found(lang) | 404 | API key not found |
| raise_session_not_found(lang) | 404 | Session not found |
| raise_duplicate_username(lang, username) | 409 | Username conflict |
| raise_duplicate_email(lang, email) | 409 | Email conflict |
| raise_rate_limit_exceeded(lang) | 429 | Rate limit hit |
| raise_internal_error(lang) | 500 | Internal server error |
LLMProviderError¶
File: src/llm/base_provider.py
Raised when an LLM provider (Yandex, GigaChat, OpenAI, local) fails.
from src.llm.base_provider import LLMProviderError
try:
response = provider.generate(prompt)
except LLMProviderError as e:
logger.error(f"LLM generation failed: {e}")
AgentExecutionError¶
File: src/workflow/error_handling.py
Raised when a workflow agent fails during execution.
from src.workflow.error_handling import AgentExecutionError
try:
result = copilot.run("analyze security")
except AgentExecutionError as e:
logger.error(f"Agent failed: {e}")
Other Exceptions¶
| Exception | File | Description |
|---|---|---|
| TokenError | src/api/auth/jwt_handler.py | JWT decode/validation failure |
| OAuthError | src/api/auth/oauth.py | OAuth2 flow failure |
| LDAPError | src/api/auth/ldap_auth.py | LDAP bind/search failure |
| PlatformAPIError | src/api/services/platform_client.py | SourceCraft/GitVerse API failure |
| GoCPGProcessError | src/services/gocpg/subprocess_runner.py | GoCPG process execution failure |
| GoCPGTimeoutError | src/services/gocpg/subprocess_runner.py | GoCPG process timeout |
| VaultError | src/security/vault/client.py | HashiCorp Vault operation failure |
| DatabaseNotConfiguredError | src/project_manager.py | No active project / DB path available |
Error Handling Pattern¶
from src.project_manager import DatabaseNotConfiguredError
from src.llm.base_provider import LLMProviderError
from src.workflow.error_handling import AgentExecutionError
try:
service = CPGQueryService()
result = copilot.run(query)
except DatabaseNotConfiguredError:
print("No project configured. Use: python -m src.cli import /path/to/source")
except LLMProviderError as e:
print(f"LLM provider error: {e}")
except AgentExecutionError as e:
print(f"Workflow error: {e}")
Security Hardening Classes¶
HardeningScanner¶
D3FEND Source Code Hardening compliance scanner.
from src.security import HardeningScanner, HardeningCategory, HardeningSeverity
scanner = HardeningScanner(cpg_service=cpg_service, language="c")
Methods¶
scan_all(limit_per_check: int = 50) -> List[HardeningFinding]¶
Run all applicable hardening checks.
findings = scanner.scan_all(limit_per_check=50)
# Returns: [HardeningFinding(d3fend_id='D3-VI', severity='high', ...)]
scan_by_d3fend_id(d3fend_ids: List[str], limit: int = 50) -> List[HardeningFinding]¶
Run checks for specific D3FEND technique IDs.
findings = scanner.scan_by_d3fend_id(["D3-VI", "D3-NPC", "D3-TL"])
# Returns: Findings for Variable Initialization, Null Pointer Checking, Trusted Library
scan_by_category(category: HardeningCategory, limit: int = 50) -> List[HardeningFinding]¶
Run checks for a specific category.
findings = scanner.scan_by_category(HardeningCategory.MEMORY_SAFETY)
# Returns: Findings for all memory safety checks
scan_by_severity(min_severity: HardeningSeverity, limit: int = 50) -> List[HardeningFinding]¶
Run checks at or above a minimum severity level.
findings = scanner.scan_by_severity(HardeningSeverity.HIGH)
# Returns: Findings with CRITICAL or HIGH severity
get_compliance_score(findings: List[HardeningFinding]) -> Dict¶
Calculate compliance scores from findings.
scores = scanner.get_compliance_score(findings)
# Returns: {
# 'overall_score': 85.3,
# 'total_findings': 12,
# 'by_category': {'initialization': 3, 'pointer_safety': 5, ...},
# 'by_d3fend': {'D3-VI': 3, 'D3-NPC': 5, ...},
# 'by_severity': {'high': 2, 'medium': 6, 'low': 4},
# 'category_scores': {'initialization': 70, 'pointer_safety': 50, ...},
# 'd3fend_scores': {'D3-VI': 70, 'D3-NPC': 50, ...}
# }
get_remediation_report(findings: List[HardeningFinding]) -> str¶
Generate a Markdown remediation report.
report = scanner.get_remediation_report(findings)
print(report)
# # D3FEND Source Code Hardening Report
# ## Summary
# - **Overall Compliance Score**: 85.3%
# - **Total Findings**: 12
# ...
get_checks_summary() -> Dict¶
Get a summary of the available checks.
summary = scanner.get_checks_summary()
# Returns: {
# 'total_checks': 22,
# 'language': 'c',
# 'by_category': {...},
# 'by_d3fend': {...},
# 'domain_checks': 10
# }
HardeningCategory¶
Enum for D3FEND-aligned hardening categories.
from src.security import HardeningCategory
class HardeningCategory(Enum):
    INITIALIZATION = "initialization"          # D3-VI
    CREDENTIAL_MANAGEMENT = "credential_mgmt"  # D3-CS
    INTEGER_SAFETY = "integer_safety"          # D3-IRV
    POINTER_SAFETY = "pointer_safety"          # D3-PV, D3-NPC, D3-MBSV
    MEMORY_SAFETY = "memory_safety"            # D3-RN
    LIBRARY_SAFETY = "library_safety"          # D3-TL
    TYPE_SAFETY = "type_safety"                # D3-VTV
    DOMAIN_VALIDATION = "domain_validation"    # D3-DLV
    OPERATIONAL_VALIDATION = "operational"     # D3-OLV
HardeningSeverity¶
Enum for severity levels.
from src.security import HardeningSeverity
class HardeningSeverity(Enum):
    CRITICAL = "critical"  # Directly exploitable
    HIGH = "high"          # Significant security risk
    MEDIUM = "medium"      # Moderate security risk
    LOW = "low"            # Minor security concern
    INFO = "info"          # Best practice recommendation
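These severity levels form an ordered scale, which is what makes the "at or above" threshold of `scan_by_severity` meaningful. The following standalone sketch mirrors the enum above and shows one way such a threshold filter could work; it is an illustration, not the SDK's actual code:

```python
from enum import Enum

class Severity(Enum):
    """Local mirror of HardeningSeverity, for illustration only."""
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"

# Rank severities from most to least severe (CRITICAL=0 ... INFO=4).
_ORDER = {s: i for i, s in enumerate(Severity)}

def at_or_above(severities, minimum):
    """Keep severities at or above the given threshold, preserving order."""
    return [s for s in severities if _ORDER[s] <= _ORDER[minimum]]

found = [Severity.LOW, Severity.CRITICAL, Severity.MEDIUM, Severity.HIGH]
print(at_or_above(found, Severity.HIGH))  # CRITICAL and HIGH only
```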
HardeningCheck¶
Definition of a hardening check.
from src.security import HardeningCheck
@dataclass
class HardeningCheck:
    id: str                     # "D3-VI-001"
    d3fend_id: str              # "D3-VI"
    d3fend_name: str            # "Variable Initialization"
    category: HardeningCategory
    severity: HardeningSeverity
    description: str
    cpgql_query: str            # SQL query for CPG database
    cwe_ids: List[str]          # ["CWE-457"]
    language_scope: List[str]   # ["c", "cpp"] or ["*"]
    indicators: List[str]
    good_patterns: List[str]
    remediation: str
    example_code: str
    confidence_weight: float    # 0.0-1.0
Methods¶
applies_to_language(language: str) -> bool¶
Return whether this check applies to the given language (a `"*"` language scope matches any language).
check = get_check_by_id("D3-VI-001")
if check.applies_to_language("c"):
    print("Applies to C code")
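The `language_scope` semantics described above (a `"*"` entry matches any language) can be sketched in isolation. This is an illustrative stand-in, not the SDK's actual implementation:

```python
def applies_to_language(language_scope, language):
    """Return True if the scope covers the language; "*" matches everything."""
    return "*" in language_scope or language in language_scope

print(applies_to_language(["c", "cpp"], "c"))     # True
print(applies_to_language(["c", "cpp"], "rust"))  # False
print(applies_to_language(["*"], "rust"))         # True
```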
HardeningFinding¶
Result from running a hardening check.
from src.security import HardeningFinding
@dataclass
class HardeningFinding:
    finding_id: str    # Unique ID
    check_id: str      # "D3-VI-001"
    d3fend_id: str     # "D3-VI"
    category: str      # "initialization"
    severity: str      # "high"
    method_name: str   # "process_input"
    filename: str      # "src/input.c"
    line_number: int   # 142
    code_snippet: str  # "int x; use(x);"
    description: str
    cwe_ids: List[str]
    remediation: str
    confidence: float  # 0.0-1.0
    metadata: Dict
Methods¶
to_dict() -> Dict¶
Convert the finding to a dictionary for serialization.
finding_dict = finding.to_dict()
# Returns: {'finding_id': 'a1b2c3', 'd3fend_id': 'D3-VI', ...}
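Since `HardeningFinding` is a dataclass, this kind of serialization follows the standard `dataclasses.asdict` pattern. A minimal self-contained sketch with a reduced stand-in dataclass (not the SDK class itself):

```python
from dataclasses import dataclass, asdict

@dataclass
class Finding:
    """Reduced stand-in for HardeningFinding, for illustration."""
    finding_id: str
    d3fend_id: str
    confidence: float

f = Finding("a1b2c3", "D3-VI", 0.9)
print(asdict(f))  # {'finding_id': 'a1b2c3', 'd3fend_id': 'D3-VI', 'confidence': 0.9}
```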
from_check_and_row(check, row, confidence) -> HardeningFinding¶
Create a finding from a check definition and a query result row.
finding = HardeningFinding.from_check_and_row(check, row, confidence=0.9)
Hardening Utility Functions¶
from src.security import (
    get_check_by_id,
    get_checks_by_category,
    get_checks_by_d3fend_id,
    get_all_checks,
    get_checks_for_language,
    D3FEND_TECHNIQUES,
    D3FEND_TECHNIQUE_IDS,
)
get_check_by_id(check_id: str) -> Optional[HardeningCheck]¶
Get a check by its ID.
check = get_check_by_id("D3-VI-001")
get_checks_by_category(category: HardeningCategory) -> List[HardeningCheck]¶
Get all checks in a category.
memory_checks = get_checks_by_category(HardeningCategory.MEMORY_SAFETY)
get_checks_by_d3fend_id(d3fend_id: str) -> List[HardeningCheck]¶
Get all checks for a D3FEND technique.
null_checks = get_checks_by_d3fend_id("D3-NPC")
get_all_checks() -> List[HardeningCheck]¶
Get all registered hardening checks.
all_checks = get_all_checks()
print(f"Total checks: {len(all_checks)}")
get_checks_for_language(language: str) -> List[HardeningCheck]¶
Get checks applicable to a specific language.
c_checks = get_checks_for_language("c")
D3FEND Constants¶
# Available D3FEND technique IDs
D3FEND_TECHNIQUE_IDS = [
    "D3-VI",    # Variable Initialization
    "D3-CS",    # Credential Scrubbing
    "D3-IRV",   # Integer Range Validation
    "D3-PV",    # Pointer Validation
    "D3-RN",    # Reference Nullification
    "D3-TL",    # Trusted Library
    "D3-VTV",   # Variable Type Validation
    "D3-MBSV",  # Memory Block Start Validation
    "D3-NPC",   # Null Pointer Checking
    "D3-DLV",   # Domain Logic Validation
    "D3-OLV",   # Operational Logic Validation
]
# D3FEND technique metadata
D3FEND_TECHNIQUES = {
    "D3-VI": {
        "name": "Variable Initialization",
        "description": "Setting variables to a known value before use",
        "url": "https://next.d3fend.mitre.org/technique/d3f:VariableInitialization",
    },
    # ... other techniques
}
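One common use of this metadata is resolving the terse technique IDs attached to findings into human-readable names, e.g. for reports. The sketch below uses a local subset of the mapping (entries taken from the constants above) rather than importing `src.security`, so it is illustrative only:

```python
# Local subset of D3FEND_TECHNIQUES, for illustration.
techniques = {
    "D3-VI": {"name": "Variable Initialization"},
    "D3-NPC": {"name": "Null Pointer Checking"},
}

# Technique IDs as they might appear on a list of findings.
finding_ids = ["D3-VI", "D3-NPC", "D3-VI"]

# Resolve each ID to its technique name.
labels = [techniques[fid]["name"] for fid in finding_ids]
print(labels)
# ['Variable Initialization', 'Null Pointer Checking', 'Variable Initialization']
```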
Next Steps¶
- Agents Reference – Detailed agent documentation (AnalyzerAgent, RetrieverAgent, GeneratorAgent, etc.)
- Workflows Reference – 21 scenarios and composite workflows
- REST API Documentation – HTTP endpoints and authentication
- OpenCode Plugin – OpenCode CPG integration
- MCP Tools Reference – MCP server tools for IDE integration