Содержание¶
- Обзор
- Возможности
- 1. Журналирование запросов и ответов LLM
- 2. Интеграция с SIEM
- 3. Предотвращение утечки данных (DLP)
- 4. Интеграция с HashiCorp Vault
- Архитектура
- Конфигурация
- Включение модуля безопасности
- Полная конфигурация (config.yaml)
- Переменные окружения
- Основные компоненты
- SecureLLMProvider
- ContentScanner
- SIEMDispatcher
- VaultClient
- Расширенные функции безопасности
- FileSecurityScanner
- TaintVerifiedScanner
- HardeningScanner
- ReportGenerator
- AutofixEngine
- Система гипотез безопасности
- Шаблоны DLP
- Встроенные шаблоны
- Пользовательские шаблоны
- Таблицы базы данных
- llm_audit_log
- dlp_events
- security_events
- Форматы событий SIEM
- SysLog (RFC 5424)
- CEF
- LEEF
- Интеграция через вебхуки
- Использование через CLI
- Команды аудита безопасности
- Аудит с автоисправлением
- Рекомендации по обеспечению безопасности
- Соответствие нормативным требованиям
- Инструменты MCP
- codegraph_autofix
- codegraph_taint_analysis
- Структура модуля безопасности
- Краткое руководство
- 1. Включение функций безопасности
- 2. Запуск аудита безопасности
- 3. Просмотр отчётов
- 4. Интеграция с CI/CD
- Смотрите также
Обзор¶
CodeGraph включает корпоративный модуль безопасности для защиты чувствительных данных при работе с внешними LLM-провайдерами, такими как GigaChat, Yandex AI и OpenAI. Он помогает выполнять требования по защите данных, вести аудит и подключать внешние системы контроля.
graph TD
A[Запрос пользователя] --> B[SecureLLMProvider]
B --> C[Сканер DLP<br/>до/после]
B --> D[Фильтр содержимого]
B --> E[Журнал аудита LLM<br/>БД + SIEM]
C --> F{Действие DLP}
F -->|BLOCK| G[Запрос отклонён]
F -->|MASK| H[Маскировка данных]
F -->|WARN| I[Предупреждение в лог]
F -->|LOG_ONLY| J[Запись в лог]
D --> K[Поставщик LLM<br/>GigaChat / Yandex AI]
E --> L[Диспетчер SIEM]
L --> M[SysLog]
L --> N[CEF]
L --> O[LEEF]
C --> P[Вебхук DLP]
Q[HashiCorp Vault] -.->|Ротация секретов| B
Возможности¶
1. Журналирование запросов и ответов LLM¶
- Полная аудиторская запись всех взаимодействий с LLM
- Настраиваемое маскирование промптов перед записью в журнал
- Метрики использования токенов и задержки
- Хранение в базе данных с политиками хранения
2. Интеграция с SIEM¶
Передача событий и журналов в корпоративные SIEM-системы в режиме реального времени: - SysLog (RFC 5424) — стандартный syslog со структурированными данными - CEF (Common Event Format) — для интеграции с ArcSight - LEEF (Log Event Extended Format) — для интеграции с IBM QRadar
3. Предотвращение утечки данных (DLP)¶
Сканирование по шаблонам для предотвращения утечек данных: - Обнаружение учётных данных — ключи API, пароли, закрытые ключи - Обнаружение персональных данных (ПДн) — электронная почта, телефоны, номера кредитных карт, ИНН/СНИЛС - Пути к исходному коду — внутренние пути, строки подключения - Пользовательские ключевые слова — списки запрещённых слов, специфичные для организации
Настраиваемые действия для каждой категории:
- BLOCK — полностью отклонить запрос
- MASK — заменить конфиденциальные данные на [REDACTED]
- WARN — записать предупреждение, но разрешить запрос
- LOG_ONLY — записать в журнал только для аудита
4. Интеграция с HashiCorp Vault¶
Безопасное управление секретами: - Динамическое получение учётных данных - Поддержка нескольких методов аутентификации (Token, AppRole, Kubernetes) - Автоматическая ротация секретов - Кеширование с ограничением времени жизни (TTL)
Архитектура¶
┌─────────────────────────────────────────────────────────────────┐
│ Запрос пользователя │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ SecureLLMProvider │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐ │
│ │ Сканер DLP │ │ Фильтр │ │ Журнал запросов к │ │
│ │ (до/после) │ │ содержимого │ │ LLM (БД + SIEM) │ │
│ └──────────────┘ └──────────────┘ └──────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐
│ Действия DLP │ │ BaseLLMProv. │ │ Диспетчер SIEM │
│ BLOCK/MASK │ │ (GigaChat) │ │ ┌────┐┌────┐┌────┐ │
│ /WARN/LOG │ │ │ │ │Sys ││CEF ││LEEF│ │
└──────────────┘ └──────────────┘ │ │Log ││ ││ │ │
│ │ └────┘└────┘└────┘ │
▼ └──────────────────────┘
┌──────────────┐ │
│ Вебхук DLP │ ▼
│ (внешний) │ ┌──────────────┐
└──────────────┘ │ SIEM │
│ (Splunk/ │
┌──────────────┐ │ QRadar) │
│ HashiCorp │◄── Ротация секретов └──────────────┘
│ Vault │
└──────────────┘
Конфигурация¶
Включение модуля безопасности¶
Установите переменные окружения или обновите файл config.yaml:
export SECURITY_ENABLED=true
export SIEM_ENABLED=true
export DLP_ENABLED=true
Полная конфигурация (config.yaml)¶
security:
# Главный переключатель
enabled: true
# Журналирование запросов к LLM
llm_logging:
enabled: true
log_prompts: true
redact_prompts: true
max_prompt_length: 2000
log_responses: true
log_token_usage: true
log_latency: true
log_to_database: true
# Интеграция с SIEM
siem:
enabled: true
syslog:
enabled: true
protocol: "tls" # udp, tcp, tls
host: "siem.company.com"
port: 6514
facility: 16 # local0
app_name: "codegraph"
cef:
enabled: true
host: "arcsight.company.com"
port: 514
leef:
enabled: false
# Предотвращение утечки данных
dlp:
enabled: true
pre_request:
enabled: true
default_action: "WARN"
post_response:
enabled: true
default_action: "MASK"
categories:
credentials:
enabled: true
action: "BLOCK"
severity: "critical"
pii:
enabled: true
action: "MASK"
severity: "high"
webhook:
enabled: true
endpoint: "https://dlp.company.com/api/alerts"
notify_on: ["BLOCK", "WARN"]
# HashiCorp Vault
vault:
enabled: true
url: "https://vault.company.com:8200"
auth_method: "approle"
secrets_mount_point: "secret"
llm_secrets_path: "codegraph/llm"
Переменные окружения¶
| Переменная | Описание | По умолчанию |
|---|---|---|
SECURITY_ENABLED |
Включить модуль безопасности | false |
SIEM_ENABLED |
Включить интеграцию с SIEM | false |
SIEM_SYSLOG_HOST |
Хост сервера SysLog | localhost |
SIEM_SYSLOG_PORT |
Порт сервера SysLog | 514 |
SIEM_CEF_HOST |
Хост сервера CEF | localhost |
SIEM_LEEF_HOST |
Хост сервера LEEF | localhost |
DLP_ENABLED |
Включить сканирование DLP | true |
DLP_WEBHOOK_URL |
Адрес вебхука DLP | — |
DLP_WEBHOOK_AUTH |
Заголовок авторизации вебхука DLP | — |
VAULT_ENABLED |
Включить интеграцию с Vault | false |
VAULT_ADDR |
Адрес сервера Vault | http://localhost:8200 |
VAULT_TOKEN |
Токен Vault | — |
VAULT_ROLE_ID |
Идентификатор роли AppRole | — |
VAULT_SECRET_ID |
Секретный идентификатор AppRole | — |
Основные компоненты¶
SecureLLMProvider¶
Обёртка безопасности над поставщиком LLM. Автоматически перехватывает запросы и ответы, выполняет сканирование DLP и ведёт аудиторский журнал.
Расположение: src/security/llm/secure_provider.py
from src.security.llm.secure_provider import SecureLLMProvider
secure_provider = SecureLLMProvider(
wrapped_provider=base_provider,
config=get_security_config()
)
Автоматическое использование¶
При включённом модуле безопасности обёртка подключается автоматически:
from src.llm import create_llm_provider
# Поставщик автоматически оборачивается в защитный слой
provider = create_llm_provider()
# Все запросы фильтруются и записываются в журнал
response = provider.generate(
system_prompt="Вы — аналитик кода",
user_prompt="Проанализируйте эту функцию",
)
Ручная настройка¶
from src.llm import GigaChatProvider
from src.security import get_security_config, SecureLLMProvider
# Создание базового поставщика
base_provider = GigaChatProvider(config)
# Оборачивание в защитный слой
secure_provider = SecureLLMProvider(
wrapped_provider=base_provider,
config=get_security_config()
)
# Использование защищённого поставщика
response = secure_provider.generate(
system_prompt="Проанализируйте код",
user_prompt="def process_payment(card_number='4111111111111111')...",
_user_id="user-123", # Контекст пользователя для аудита
_ip_address="192.168.1.100", # IP-адрес для журнала
)
ContentScanner¶
Сканер содержимого для обнаружения конфиденциальных данных. Применяет действия DLP в зависимости от категории совпадения.
Расположение: src/security/dlp/scanner.py
Конструктор: ContentScanner(config: DLPConfig)
Методы:
| Метод | Описание |
|---|---|
scan(content) |
Сканирование строки, возвращает List[DLPMatch] |
scan_request(content) |
Сканирование входящего запроса, возвращает ScanResult |
scan_response(content) |
Сканирование исходящего ответа, возвращает ScanResult |
get_action(matches) |
Определение действия по списку совпадений |
mask(content, matches) |
Маскировка совпадений в строке |
ScanResult — результат сканирования:
- has_matches — были ли обнаружены совпадения
- matches — список объектов DLPMatch
- action — применённое действие
- modified_content — содержимое после маскировки
- blocked — был ли запрос заблокирован
DLPMatch — обнаруженное совпадение:
- category — категория (credentials, pii, source_code)
- pattern_name — название шаблона
- match_type — тип совпадения
- matched_text — совпавший текст
- start, end — позиция совпадения
- action — рекомендуемое действие
- mask_with — строка замены при маскировке
- severity — уровень серьёзности
from src.security.dlp import ContentScanner
from src.security.config import get_security_config
config = get_security_config()
scanner = ContentScanner(config.dlp)
# Сканирование содержимого
result = scanner.scan_request("API_KEY=sk-1234567890abcdef")
if result.blocked:
print(f"Содержимое заблокировано! Совпадения: {result.matches}")
elif result.has_matches:
print(f"Обнаружены конфиденциальные данные: {result.matches}")
# Использовать замаскированное содержимое
safe_content = result.modified_content
SIEMDispatcher¶
Диспетчер событий безопасности для отправки в системы SIEM. Поддерживает несколько обработчиков одновременно (SysLog, CEF, LEEF).
Расположение: src/security/siem/dispatcher.py
Конструктор: SIEMDispatcher(config: SIEMConfig)
Методы:
| Метод | Описание |
|---|---|
dispatch(event) |
Отправка события во все подключённые обработчики |
add_handler(handler) |
Добавление обработчика SIEM |
close() |
Закрытие всех соединений |
Фабричные функции:
| Функция | Описание |
|---|---|
init_siem_dispatcher(config) |
Создание и инициализация диспетчера |
get_siem_dispatcher() |
Получение ранее созданного экземпляра |
SecurityEventType — перечисление из 17 типов событий (LLM_REQUEST, LLM_RESPONSE, DLP_BLOCK, DLP_MASK, DLP_WARN, AUTH_SUCCESS, AUTH_FAILURE, MCP_AUTH_FAILURE и др.)
SecurityEvent — класс данных с 19 полями (event_id, event_type, severity, message, timestamp, request_id, user_id, session_id, ip_address, details и др.)
from src.security.siem import (
SecurityEvent, SecurityEventType,
init_siem_dispatcher
)
from src.security.config import get_security_config
# Инициализация диспетчера
dispatcher = init_siem_dispatcher(get_security_config().siem)
# Создание и отправка события
event = SecurityEvent.create(
event_type=SecurityEventType.DLP_BLOCK,
message="Учётные данные обнаружены в запросе LLM",
severity=3, # Ошибка
user_id="user-123",
request_id="req-456",
details={"pattern": "aws_key", "category": "credentials"}
)
dispatcher.dispatch(event)
VaultClient¶
Клиент для интеграции с HashiCorp Vault. Обеспечивает безопасное хранение и получение секретов.
Расположение: src/security/vault/client.py
Конструктор: VaultClient(config: VaultConfig)
from src.security.vault.client import VaultClient
client = VaultClient(config=vault_config)
Расширенные функции безопасности¶
FileSecurityScanner¶
Быстрое сканирование безопасности на уровне файлов без построения CPG. Подходит для оперативной оценки проекта.
Расположение: src/security/file_scanner.py
Метод: scan_project(project_path) — возвращает результат со списком обнаруженных проблем.
from src.security.file_scanner import FileSecurityScanner
scanner = FileSecurityScanner()
result = scanner.scan_project("/path/to/project")
print(f"Критических: {result.critical_count}")
print(f"Высокого уровня: {result.high_count}")
for finding in result.findings:
print(f"{finding.severity}: {finding.description}")
print(f" Файл: {finding.file_path}:{finding.line_number}")
TaintVerifiedScanner¶
Сканер с верификацией через taint-анализ. Снижает количество ложных срабатываний за счёт анализа потока данных от источников к приёмникам с использованием CPG.
Расположение: src/security/taint_verified_scanner.py
Конструктор: TaintVerifiedScanner(cpg_service) — принимает экземпляр сервиса CPG (не путь к базе данных).
Метод: verify_sql_injection(findings, ...) — возвращает List[VerifiedFinding].
Принцип работы¶
Традиционное сопоставление по шаблонам даёт много ложных срабатываний. Сканер с верификацией: 1. Выявляет потенциальные уязвимости с помощью шаблонов 2. Отслеживает поток данных от источников заражения (ввод пользователя) к приёмникам (опасные функции) 3. Сообщает только о проблемах с подтверждённым путём потока данных
Источники заражённых данных (Python/Django)¶
# Данные запроса Django
request.GET, request.POST, request.data
request.body, request.FILES, request.META
# Данные запроса Flask
request.args, request.form, request.json
# Стандартный ввод
input(), raw_input(), sys.stdin, os.getenv()
# Ввод из файлов и сети
open(), read(), recv(), urlopen()
Опасные приёмники по категориям¶
| Категория | Приёмники |
|---|---|
| SQL-инъекция | execute, raw, cursor.execute, RawSQL |
| Инъекция команд | os.system, subprocess.run, eval, exec |
| Обход пути | open, os.path.join, send_file |
| XSS | mark_safe, HttpResponse |
| Десериализация | pickle.loads, yaml.load, marshal.loads |
Пример использования¶
from src.security.taint_verified_scanner import TaintVerifiedScanner
from src.services.cpg import CPGQueryService
cpg_service = CPGQueryService(db_path)
scanner = TaintVerifiedScanner(cpg_service)
# Верификация результатов, связанных с SQL-инъекцией
verified = scanner.verify_sql_injection(raw_findings)
for finding in verified:
print(f"Подтверждено: {finding.description}")
print(f"Путь потока данных: {finding.taint_path}")
HardeningScanner¶
Сканер проверок усиления безопасности по MITRE D3FEND. Реализует все методы усиления безопасности исходного кода из стандарта D3FEND.
Расположение: src/security/hardening/hardening_scanner.py
Конструктор: HardeningScanner(cpg_service, language="c") — принимает экземпляр сервиса CPG (не путь к базе данных).
Методы:
| Метод | Описание |
|---|---|
scan_all(limit_per_check=50) |
Запуск всех проверок, возвращает List[HardeningFinding] |
scan_by_d3fend_id(d3fend_ids, limit=50) |
Запуск проверок по идентификаторам D3FEND |
scan_by_category(category, limit=50) |
Запуск проверок по категории |
scan_by_severity(min_severity, limit=50) |
Запуск проверок по минимальному уровню серьёзности |
get_compliance_score(findings) |
Расчёт оценки соответствия |
get_checks_summary() |
Сводка по всем доступным проверкам |
get_remediation_report(findings) |
Отчёт с рекомендациями по устранению |
Результат — плоский список List[HardeningFinding] (не объекты с вложенными полями .check / .violations).
Поле запроса в HardeningCheck называется cpgql_query (не sql_query).
Поддерживаемые проверки¶
| Идентификатор D3FEND | Название | Описание |
|---|---|---|
| D3-VI | Инициализация переменных | Обнаружение неинициализированных переменных |
| D3-CS | Очистка учётных данных | Гарантия удаления учётных данных из памяти |
| D3-IRV | Проверка диапазона целых чисел | Проверка рисков переполнения целых чисел |
| D3-RN | Обнуление ссылок | Проверка очистки указателей после освобождения памяти |
| D3-TL | Проверка доверенных библиотек | Проверка использования безопасных функций |
| D3-VTV | Проверка типов переменных | Проверка безопасности типов |
| D3-MBSV | Проверка начала блока памяти | Проверка границ блоков памяти |
| D3-NPC | Проверка на null-указатели | Обнаружение отсутствующих проверок на null |
| D3-DLV | Проверка бизнес-логики | Проверка валидации бизнес-логики |
| D3-OLV | Проверка операционной логики | Проверка операционных ограничений |
Пример использования¶
from src.security.hardening.hardening_scanner import HardeningScanner
from src.services.cpg import CPGQueryService
cpg_service = CPGQueryService(db_path)
scanner = HardeningScanner(cpg_service, language="c")
findings = scanner.scan_all(limit_per_check=50)
for finding in findings:
print(f"[{finding.d3fend_id}] {finding.check_id}")
print(f" Серьёзность: {finding.severity}")
print(f" Рекомендация: {finding.remediation}")
Конфигурация¶
security:
hardening:
enabled: true
checks:
D3-VI: true # Инициализация переменных
D3-CS: true # Очистка учётных данных
D3-NPC: true # Проверка на null-указатели
severity_threshold: "medium" # Пропускать проблемы низкого уровня
ReportGenerator¶
Генератор отчётов по результатам аудита безопасности. Поддерживает экспорт в несколько форматов.
Расположение: src/security/report_generator.py
Имя класса: ReportGenerator (не SecurityReportGenerator).
Конструктор: ReportGenerator() — без параметров. Язык указывается при сохранении отчёта.
Порядок использования: create_report() -> add_*_findings() -> save_report(output_dir, formats, language="en")
Методы добавления результатов:
| Метод | Описание |
|---|---|
create_report(project_name, ...) |
Создание нового отчёта |
add_cpg_findings(findings) |
Добавление результатов анализа CPG |
add_dlp_findings(findings) |
Добавление результатов сканирования DLP |
add_hardening_findings(findings) |
Добавление результатов проверок D3FEND |
save_report(output_dir, formats, language) |
Сохранение отчёта в указанных форматах |
SecurityAuditReport — объект отчёта:
- to_json() — экспорт в формат JSON
- to_markdown(language) — экспорт в Markdown с локализацией
- to_sarif() — экспорт в формат SARIF
Поддерживаемые форматы¶
| Формат | Назначение |
|---|---|
| JSON | Интеграция в CI/CD, программный доступ |
| Markdown | Документация, ручной анализ |
| SARIF | Оповещения безопасности GitHub, интеграция в IDE |
Языки отчётов¶
- Английский (
en) - Русский (
ru)
Пример использования¶
from src.security.report_generator import ReportGenerator
generator = ReportGenerator()
# Создание отчёта
generator.create_report(
project_name="MyProject",
project_path="./myproject",
)
# Добавление результатов из разных источников
generator.add_cpg_findings(cpg_findings)
generator.add_dlp_findings(dlp_findings)
generator.add_hardening_findings(hardening_findings)
# Сохранение в нескольких форматах
generator.save_report(
output_dir="./security_reports",
formats=["json", "markdown", "sarif"],
language="ru",
)
Разделы отчёта¶
- Сводка — общее количество обнаруженных проблем
- Критические уязвимости — требуют немедленного устранения
- Высокий уровень серьёзности — необходимо устранить до развёртывания
- Соответствие D3FEND — результаты проверок усиления безопасности
- Соответствие OWASP Top 10 — статус по категориям с тегами
OWASP-Axx - Пути потока заражённых данных — Mermaid-диаграммы путей эксплуатации (источник -> санитайзер -> приёмник)
- Подробные уязвимости — полный список с OWASP-тегами и рекомендациями по устранению
- Метрики — покрытие, точность, полнота
Возможности SARIF 2.1.0¶
codeFlowsсогласно SARIF 2.1.0 параграф 3.36 для визуализации путей зараженияOWASP-Axxтеги на правилахreportingDescriptorдля соответствия стандартам- Таксономии инструмента ссылаются на таксономию OWASP Top 10 2021
AutofixEngine¶
Автоматическая генерация исправлений безопасности для уязвимостей, обнаруженных taint-анализом. Сочетает шаблонные исправления на основе регулярных выражений с генерацией кода через LLM.
Расположение: src/analysis/autofix/engine.py
Конструктор: AutofixEngine(source_root="", dry_run=True)
Принцип работы¶
Движок использует стратегию «сначала шаблон, затем LLM»:
- Шаблонное исправление — сопоставление на основе регулярных выражений для 6 типов уязвимостей (SQL-инъекция, инъекция команд, переполнение буфера, форматная строка, обход пути, XSS). Степень уверенности: 0.7–0.9.
- Резервный вызов LLM — когда шаблоны не совпадают, строится промпт с контекстом уязвимости (CWE, поток заражения, исходный код) и вызывается настроенный поставщик LLM. Степень уверенности ограничена значением 0.6.
- Валидация —
DiffValidatorпроверяет, что оригинальный код присутствует в реальном исходном файле и коэффициент изменений ниже 50%.
Соответствие CWE¶
| Категория приёмника | CWE ID | Тип уязвимости |
|---|---|---|
sql |
CWE-89 | SQL-инъекция |
command |
CWE-78 | Инъекция команд ОС |
file |
CWE-22 | Обход пути |
xss |
CWE-79 | Межсайтовый скриптинг (XSS) |
buffer |
CWE-120 | Переполнение буфера |
null |
CWE-476 | Разыменование NULL-указателя |
Пример использования¶
from src.analysis.autofix.engine import AutofixEngine
engine = AutofixEngine(source_root="/path/to/project", dry_run=True)
results = engine.generate_fixes(taint_paths=paths)
for result in results:
print(f"[{result.strategy}] {result.cwe_id}: {result.fix.file_path}:{result.fix.line_number}")
print(f" Валидировано: {result.validated}, Уверенность: {result.fix.confidence}")
Конфигурация¶
# config.yaml -> workflows.handlers.autofix
autofix:
enabled: true
context_lines_before: 10
context_lines_after: 10
llm_max_confidence: 0.6
llm_temperature: 0.1
llm_max_tokens: 2048
max_fixes_per_run: 20
Система гипотез безопасности¶
Система формирования и проверки гипотез безопасности на основе базы CWE (58 записей) и CAPEC (27 записей). Включает 13 SQL-шаблонов и 12 шаблонов taint-анализа.
Подробная документация: Справочник по системе гипотез безопасности.
Шаблоны DLP¶
Встроенные шаблоны¶
Учётные данные¶
api_key— общие ключи APIaws_key— идентификаторы ключей доступа AWS (AKIA…)aws_secret— секретные ключи AWSprivate_key— закрытые ключи в формате PEMpassword— шаблоны паролейjwt_token— JSON Web Token (JWT)bearer_token— токены аутентификации Bearerbasic_auth— строки аутентификации Basic (Base64)
Персональные данные (российская локаль)¶
email— адреса электронной почтыphone_ru— российские номера телефоновcredit_card— номера кредитных картinn— российский ИННsnils— российский СНИЛСpassport_ru— номера российских паспортов
Исходный код¶
connection_string— строки подключения к базам данныхinternal_path— внутренние пути к файламip_address— IP-адреса
Пользовательские шаблоны¶
Добавление пользовательских шаблонов через конфигурацию:
dlp:
categories:
custom:
enabled: true
action: "WARN"
severity: "medium"
patterns:
- name: "project_code"
regex: "PROJECT-[A-Z]{2,4}-\\d{4,6}"
mask_with: "[PROJECT-ID]"
Таблицы базы данных¶
llm_audit_log¶
Содержит аудиторскую запись всех взаимодействий с LLM (25 столбцов):
| Столбец | Тип | Описание |
|---|---|---|
id |
BIGINT | Первичный ключ |
request_id |
UUID | Уникальный идентификатор запроса |
user_id |
UUID | Пользователь, выполнивший запрос |
session_id |
UUID | Идентификатор сеанса |
ip_address |
INET | IP-адрес клиента |
provider |
VARCHAR(50) | Название поставщика LLM |
model |
VARCHAR(100) | Название модели |
system_prompt_hash |
VARCHAR(64) | SHA256 системного промпта |
system_prompt_length |
INT | Длина системного промпта |
user_prompt_preview |
TEXT | Предварительный просмотр промпта (замаскированный) |
user_prompt_length |
INT | Длина пользовательского промпта |
response_preview |
TEXT | Предварительный просмотр ответа |
response_length |
INT | Длина ответа |
status |
VARCHAR(20) | Статус запроса (success, error, blocked) |
prompt_tokens |
INT | Количество токенов в промпте |
completion_tokens |
INT | Количество токенов в ответе |
total_tokens |
INT | Общее количество токенов |
latency_ms |
FLOAT | Задержка запроса (мс) |
dlp_action |
VARCHAR(20) | Применённое действие DLP |
dlp_match_count |
INT | Количество совпадений DLP |
dlp_categories |
ARRAY(VARCHAR) | Совпавшие категории DLP |
error_type |
VARCHAR(100) | Тип ошибки (при неудаче) |
error_message |
TEXT | Сообщение об ошибке (при неудаче) |
timestamp |
TIMESTAMP(tz) | Время запроса |
metadata |
JSONB | Дополнительные метаданные |
dlp_events¶
Детальные события срабатывания DLP (13 столбцов):
| Столбец | Тип | Описание |
|---|---|---|
id |
BIGINT | Первичный ключ |
audit_log_id |
BIGINT | Ссылка на запись в llm_audit_log |
request_id |
UUID | Идентификатор запроса |
event_type |
VARCHAR(50) | Тип события (request_scan, response_scan) |
action |
VARCHAR(20) | Применённое действие |
category |
VARCHAR(50) | Категория DLP |
pattern_name |
VARCHAR(100) | Название совпавшего шаблона |
severity |
VARCHAR(20) | Уровень серьёзности |
match_preview |
VARCHAR(200) | Замаскированный фрагмент совпадения |
position |
INT | Позиция совпадения в содержимом |
user_id |
UUID | Идентификатор пользователя |
ip_address |
INET | IP-адрес клиента |
timestamp |
TIMESTAMP(tz) | Время события |
security_events¶
Журнал событий безопасности для интеграции с SIEM (13 столбцов):
| Столбец | Тип | Описание |
|---|---|---|
id |
BIGINT | Первичный ключ |
event_id |
UUID | Уникальный идентификатор события |
event_type |
VARCHAR(50) | Тип события безопасности |
severity |
SMALLINT | Уровень серьёзности по syslog (0–7) |
request_id |
UUID | Идентификатор запроса |
user_id |
UUID | Идентификатор пользователя |
session_id |
UUID | Идентификатор сеанса |
ip_address |
INET | IP-адрес клиента |
message |
TEXT | Описание события |
details |
JSONB | Дополнительные данные о событии |
dispatched |
BOOLEAN | Было ли событие отправлено в SIEM |
dispatch_error |
TEXT | Ошибка отправки (при неудаче) |
timestamp |
TIMESTAMP(tz) | Время события |
Форматы событий SIEM¶
SysLog (RFC 5424)¶
<134>1 2024-12-09T10:30:00.000Z codegraph.company.com codegraph - llm.dlp.block [llm@12345 requestId="req-123" userId="user-456" provider="GigaChat" action="BLOCK" category="credentials"] DLP BLOCK: 2 patterns in request
CEF¶
CEF:0|CodeGraph|CodeAnalysis|1.0|llm.dlp.block|DLP Block|7|rt=Dec 09 2024 10:30:00 src=192.168.1.100 suser=user-456 cs1=req-123 cs1Label=RequestID cs2=GigaChat cs2Label=Provider act=BLOCK cat=credentials
LEEF¶
LEEF:2.0|CodeGraph|CodeAnalysis|1.0|llm.dlp.block| devTime=Dec 09 2024 10:30:00 src=192.168.1.100 usrName=user-456 requestId=req-123 provider=GigaChat action=BLOCK category=credentials
Интеграция через вебхуки¶
Формат полезной нагрузки вебхука DLP:
{
"alert_id": "a1b2c3d4e5f6",
"timestamp": "2024-12-09T10:30:00.000Z",
"action": "BLOCK",
"match_count": 2,
"categories": ["credentials"],
"patterns": ["api_key", "aws_key"],
"request_id": "req-123",
"user_id": "user-456",
"ip_address": "192.168.1.100",
"severity": "critical",
"context": {}
}
Использование через CLI¶
Команды аудита безопасности¶
Модуль src/cli/security_audit.py предоставляет следующие подкоманды:
# Полное сканирование проекта
python -m src.cli.security_audit full --path ./myproject --format all --verbose
# Быстрое сканирование (только файловый анализ)
python -m src.cli.security_audit quick --path ./myproject
# Просмотр текущих настроек безопасности
python -m src.cli.security_audit settings
# Поиск секретов в коде
python -m src.cli.security_audit secrets --path ./myproject
Доступные флаги: --path, --output, --format, --verbose, --language.
Аудит с автоисправлением¶
Флаг --autofix находится в отдельном модуле src/cli/audit_commands.py:
# Аудит с автоматическим формированием исправлений
python -m src.cli audit --autofix
Флаг --autofix запускает taint-анализ после сканирования и формирует предложения по исправлению, отображаемые в виде таблицы Rich с диффами.
Рекомендации по обеспечению безопасности¶
- Включите TLS для подключений SIEM в рабочей среде
- Используйте AppRole или аутентификацию через Kubernetes для Vault (а не простые токены)
- Настройте соответствующие действия DLP — BLOCK для учётных данных, MASK для персональных данных
- Настройте политики хранения журналов в вашей системе SIEM для соответствия требованиям
- Отслеживайте события DLP_BLOCK на предмет попыток несанкционированного копирования данных
- Регулярно обновляйте шаблоны для обнаружения новых форматов учётных данных
- Тестируйте шаблоны DLP перед развёртыванием в рабочей среде
Соответствие нормативным требованиям¶
Модуль безопасности помогает выполнить требования следующих нормативных актов:
- 152-ФЗ «О персональных данных» — обнаружение и маскировка персональных данных (ПДн)
- ГОСТ Р 57580 — аудит действий и контроль доступа для финансовых организаций
- PCI DSS — обнаружение номеров кредитных карт
- Требования ФСТЭК — защита информации ограниченного доступа (с использованием пользовательских шаблонов)
Инструменты MCP¶
codegraph_autofix¶
Автоматическое формирование исправлений безопасности. Доступен для интеграции с IDE через протокол MCP.
codegraph_autofix(
method_name="process_query", # Фокус на конкретном методе
cwe="CWE-89", # Фильтр по идентификатору CWE
)
Параметры:
- method_name (обязательный) — название метода для анализа
- cwe (необязательный) — фильтр по идентификатору CWE
codegraph_taint_analysis¶
Запуск taint-анализа для метода. Отслеживает поток данных от источников к приёмникам.
codegraph_taint_analysis(
method_name="handle_request",
source_category="user_input",
sink_category="sql",
)
Структура модуля безопасности¶
src/security/
├── __init__.py # Экспорт модуля
├── _base.py # Базовые классы (Severity, Category)
├── config.py # Конфигурация безопасности
│
├── dlp/ # Предотвращение утечки данных
│ ├── patterns.py # Шаблоны DLP (учётные данные, ПДн)
│ ├── scanner.py # Сканер содержимого (ContentScanner)
│ ├── actions.py # Действия DLP (BLOCK, MASK, WARN)
│ └── webhook.py # Вебхуки оповещений
│
├── siem/ # Интеграция с SIEM
│ ├── base_handler.py # Базовый класс обработчика, SecurityEvent
│ ├── syslog_handler.py # Обработчик SysLog
│ ├── cef_handler.py # Обработчик CEF
│ ├── leef_handler.py # Обработчик LEEF
│ ├── buffer.py # Буферизация событий
│ └── dispatcher.py # Диспетчер (SIEMDispatcher)
│
├── vault/ # HashiCorp Vault
│ ├── client.py # Клиент API Vault (VaultClient)
│ └── secret_manager.py # Управление секретами
│
├── llm/ # Безопасность LLM
│ ├── secure_provider.py # Обёртка SecureLLMProvider
│ └── request_logger.py # Журналирование запросов
│
├── hardening/ # Усиление безопасности (D3FEND)
│ ├── base.py # Определения проверок, HardeningFinding
│ ├── d3fend_checks.py # Все проверки D3FEND
│ └── hardening_scanner.py # Сканер (HardeningScanner)
│
├── patterns/ # Шаблоны уязвимостей
│ ├── injection.py # Инъекции SQL/команд
│ ├── memory.py # Безопасность работы с памятью
│ ├── crypto.py # Криптографические проблемы
│ ├── auth.py # Ошибки аутентификации
│ ├── concurrency.py # Состояния гонки
│ └── python_django.py # Специфичные для Python/Django
│
├── file_scanner.py # Файловый сканер (FileSecurityScanner)
├── taint_verified_scanner.py # Верификация через taint-анализ
├── report_generator.py # Генерация отчётов (ReportGenerator)
└── report_localizer.py # Поддержка локализации
Краткое руководство¶
1. Включение функций безопасности¶
export SECURITY_ENABLED=true
export DLP_ENABLED=true
export SIEM_ENABLED=true
2. Запуск аудита безопасности¶
# Полный аудит со всеми проверками
python -m src.cli.security_audit full \
--path ./myproject \
--format all \
--verbose
# Полный аудит с автоисправлением
python -m src.cli audit --autofix
3. Просмотр отчётов¶
# Отчёты сохраняются в ./security_reports/
ls security_reports/
# security_audit_20241209_103000.json
# security_audit_20241209_103000.md
# security_audit_20241209_103000.sarif
4. Интеграция с CI/CD¶
# .github/workflows/security.yml
- name: Security Audit
run: |
python -m src.cli.security_audit full \
--path . \
--format sarif \
--output security.sarif
- name: Upload SARIF
uses: github/codeql-action/upload-sarif@v2
with:
sarif_file: security.sarif
Смотрите также¶
- Руководство по CLI — использование CLI для аудита безопасности
- Интеграция с OpenCode — интеграция с OpenCode
- Документация по API — конечные точки безопасности