Модуль корпоративной безопасности

Содержание

Обзор

CodeGraph включает корпоративный модуль безопасности для защиты чувствительных данных при работе с внешними LLM-провайдерами, такими как GigaChat, Yandex AI и OpenAI. Он помогает выполнять требования по защите данных, вести аудит и подключать внешние системы контроля.

graph TD
    A[Запрос пользователя] --> B[SecureLLMProvider]
    B --> C[Сканер DLP<br/>до/после]
    B --> D[Фильтр содержимого]
    B --> E[Журнал аудита LLM<br/>БД + SIEM]
    C --> F{Действие DLP}
    F -->|BLOCK| G[Запрос отклонён]
    F -->|MASK| H[Маскировка данных]
    F -->|WARN| I[Предупреждение в лог]
    F -->|LOG_ONLY| J[Запись в лог]
    D --> K[Поставщик LLM<br/>GigaChat / Yandex AI]
    E --> L[Диспетчер SIEM]
    L --> M[SysLog]
    L --> N[CEF]
    L --> O[LEEF]
    C --> P[Вебхук DLP]
    Q[HashiCorp Vault] -.->|Ротация секретов| B

Возможности

1. Журналирование запросов и ответов LLM

  • Полная аудиторская запись всех взаимодействий с LLM
  • Настраиваемое маскирование промптов перед записью в журнал
  • Метрики использования токенов и задержки
  • Хранение в базе данных с политиками хранения

2. Интеграция с SIEM

Передача событий и журналов в корпоративные SIEM-системы в режиме реального времени: - SysLog (RFC 5424) — стандартный syslog со структурированными данными - CEF (Common Event Format) — для интеграции с ArcSight - LEEF (Log Event Extended Format) — для интеграции с IBM QRadar

3. Предотвращение утечки данных (DLP)

Сканирование по шаблонам для предотвращения утечек данных: - Обнаружение учётных данных — ключи API, пароли, закрытые ключи - Обнаружение персональных данных (ПДн) — электронная почта, телефоны, номера кредитных карт, ИНН/СНИЛС - Пути к исходному коду — внутренние пути, строки подключения - Пользовательские ключевые слова — списки запрещённых слов, специфичные для организации

Настраиваемые действия для каждой категории: - BLOCK — полностью отклонить запрос - MASK — заменить конфиденциальные данные на [REDACTED] - WARN — записать предупреждение, но разрешить запрос - LOG_ONLY — записать в журнал только для аудита

4. Интеграция с HashiCorp Vault

Безопасное управление секретами: - Динамическое получение учётных данных - Поддержка нескольких методов аутентификации (Token, AppRole, Kubernetes) - Автоматическая ротация секретов - Кеширование с ограничением времени жизни (TTL)

Архитектура

┌─────────────────────────────────────────────────────────────────┐
│                     Запрос пользователя                          │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                     SecureLLMProvider                            │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────────────┐  │
│  │  Сканер DLP  │  │ Фильтр      │  │  Журнал запросов к   │  │
│  │  (до/после)  │  │ содержимого  │  │  LLM (БД + SIEM)    │  │
│  └──────────────┘  └──────────────┘  └──────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
         │                    │                    │
         ▼                    ▼                    ▼
┌──────────────┐     ┌──────────────┐     ┌──────────────────────┐
│ Действия DLP │     │ BaseLLMProv. │     │  Диспетчер SIEM      │
│ BLOCK/MASK   │     │ (GigaChat)   │     │  ┌────┐┌────┐┌────┐ │
│ /WARN/LOG    │     │              │     │  │Sys ││CEF ││LEEF│ │
└──────────────┘     └──────────────┘     │  │Log ││    ││    │ │
         │                                │  └────┘└────┘└────┘ │
         ▼                                └──────────────────────┘
┌──────────────┐                                    │
│ Вебхук DLP   │                                    ▼
│ (внешний)    │                           ┌──────────────┐
└──────────────┘                           │    SIEM      │
                                           │  (Splunk/    │
┌──────────────┐                           │   QRadar)    │
│ HashiCorp    │◄── Ротация секретов       └──────────────┘
│ Vault        │
└──────────────┘

Конфигурация

Включение модуля безопасности

Установите переменные окружения или обновите файл config.yaml:

export SECURITY_ENABLED=true
export SIEM_ENABLED=true
export DLP_ENABLED=true

Полная конфигурация (config.yaml)

security:
  # Главный переключатель
  enabled: true

  # Журналирование запросов к LLM
  llm_logging:
    enabled: true
    log_prompts: true
    redact_prompts: true
    max_prompt_length: 2000
    log_responses: true
    log_token_usage: true
    log_latency: true
    log_to_database: true

  # Интеграция с SIEM
  siem:
    enabled: true
    syslog:
      enabled: true
      protocol: "tls"  # udp, tcp, tls
      host: "siem.company.com"
      port: 6514
      facility: 16  # local0
      app_name: "codegraph"
    cef:
      enabled: true
      host: "arcsight.company.com"
      port: 514
    leef:
      enabled: false

  # Предотвращение утечки данных
  dlp:
    enabled: true
    pre_request:
      enabled: true
      default_action: "WARN"
    post_response:
      enabled: true
      default_action: "MASK"
    categories:
      credentials:
        enabled: true
        action: "BLOCK"
        severity: "critical"
      pii:
        enabled: true
        action: "MASK"
        severity: "high"
    webhook:
      enabled: true
      endpoint: "https://dlp.company.com/api/alerts"
      notify_on: ["BLOCK", "WARN"]

  # HashiCorp Vault
  vault:
    enabled: true
    url: "https://vault.company.com:8200"
    auth_method: "approle"
    secrets_mount_point: "secret"
    llm_secrets_path: "codegraph/llm"

Переменные окружения

Переменная Описание По умолчанию
SECURITY_ENABLED Включить модуль безопасности false
SIEM_ENABLED Включить интеграцию с SIEM false
SIEM_SYSLOG_HOST Хост сервера SysLog localhost
SIEM_SYSLOG_PORT Порт сервера SysLog 514
SIEM_CEF_HOST Хост сервера CEF localhost
SIEM_LEEF_HOST Хост сервера LEEF localhost
DLP_ENABLED Включить сканирование DLP true
DLP_WEBHOOK_URL Адрес вебхука DLP
DLP_WEBHOOK_AUTH Заголовок авторизации вебхука DLP
VAULT_ENABLED Включить интеграцию с Vault false
VAULT_ADDR Адрес сервера Vault http://localhost:8200
VAULT_TOKEN Токен Vault
VAULT_ROLE_ID Идентификатор роли AppRole
VAULT_SECRET_ID Секретный идентификатор AppRole

Основные компоненты

SecureLLMProvider

Обёртка безопасности над поставщиком LLM. Автоматически перехватывает запросы и ответы, выполняет сканирование DLP и ведёт аудиторский журнал.

Расположение: src/security/llm/secure_provider.py

from src.security.llm.secure_provider import SecureLLMProvider

secure_provider = SecureLLMProvider(
    wrapped_provider=base_provider,
    config=get_security_config()
)

Автоматическое использование

При включённом модуле безопасности обёртка подключается автоматически:

from src.llm import create_llm_provider

# Поставщик автоматически оборачивается в защитный слой
provider = create_llm_provider()

# Все запросы фильтруются и записываются в журнал
response = provider.generate(
    system_prompt="Вы — аналитик кода",
    user_prompt="Проанализируйте эту функцию",
)

Ручная настройка

from src.llm import GigaChatProvider
from src.security import get_security_config, SecureLLMProvider

# Создание базового поставщика
base_provider = GigaChatProvider(config)

# Оборачивание в защитный слой
secure_provider = SecureLLMProvider(
    wrapped_provider=base_provider,
    config=get_security_config()
)

# Использование защищённого поставщика
response = secure_provider.generate(
    system_prompt="Проанализируйте код",
    user_prompt="def process_payment(card_number='4111111111111111')...",
    _user_id="user-123",        # Контекст пользователя для аудита
    _ip_address="192.168.1.100",  # IP-адрес для журнала
)

ContentScanner

Сканер содержимого для обнаружения конфиденциальных данных. Применяет действия DLP в зависимости от категории совпадения.

Расположение: src/security/dlp/scanner.py

Конструктор: ContentScanner(config: DLPConfig)

Методы:

Метод Описание
scan(content) Сканирование строки, возвращает List[DLPMatch]
scan_request(content) Сканирование входящего запроса, возвращает ScanResult
scan_response(content) Сканирование исходящего ответа, возвращает ScanResult
get_action(matches) Определение действия по списку совпадений
mask(content, matches) Маскировка совпадений в строке

ScanResult — результат сканирования: - has_matches — были ли обнаружены совпадения - matches — список объектов DLPMatch - action — применённое действие - modified_content — содержимое после маскировки - blocked — был ли запрос заблокирован

DLPMatch — обнаруженное совпадение: - category — категория (credentials, pii, source_code) - pattern_name — название шаблона - match_type — тип совпадения - matched_text — совпавший текст - start, end — позиция совпадения - action — рекомендуемое действие - mask_with — строка замены при маскировке - severity — уровень серьёзности

from src.security.dlp import ContentScanner
from src.security.config import get_security_config

config = get_security_config()
scanner = ContentScanner(config.dlp)

# Сканирование содержимого
result = scanner.scan_request("API_KEY=sk-1234567890abcdef")

if result.blocked:
    print(f"Содержимое заблокировано! Совпадения: {result.matches}")
elif result.has_matches:
    print(f"Обнаружены конфиденциальные данные: {result.matches}")
    # Использовать замаскированное содержимое
    safe_content = result.modified_content

SIEMDispatcher

Диспетчер событий безопасности для отправки в системы SIEM. Поддерживает несколько обработчиков одновременно (SysLog, CEF, LEEF).

Расположение: src/security/siem/dispatcher.py

Конструктор: SIEMDispatcher(config: SIEMConfig)

Методы:

Метод Описание
dispatch(event) Отправка события во все подключённые обработчики
add_handler(handler) Добавление обработчика SIEM
close() Закрытие всех соединений

Фабричные функции:

Функция Описание
init_siem_dispatcher(config) Создание и инициализация диспетчера
get_siem_dispatcher() Получение ранее созданного экземпляра

SecurityEventType — перечисление из 17 типов событий (LLM_REQUEST, LLM_RESPONSE, DLP_BLOCK, DLP_MASK, DLP_WARN, AUTH_SUCCESS, AUTH_FAILURE, MCP_AUTH_FAILURE и др.)

SecurityEvent — класс данных с 19 полями (event_id, event_type, severity, message, timestamp, request_id, user_id, session_id, ip_address, details и др.)

from src.security.siem import (
    SecurityEvent, SecurityEventType,
    init_siem_dispatcher
)
from src.security.config import get_security_config

# Инициализация диспетчера
dispatcher = init_siem_dispatcher(get_security_config().siem)

# Создание и отправка события
event = SecurityEvent.create(
    event_type=SecurityEventType.DLP_BLOCK,
    message="Учётные данные обнаружены в запросе LLM",
    severity=3,  # Ошибка
    user_id="user-123",
    request_id="req-456",
    details={"pattern": "aws_key", "category": "credentials"}
)

dispatcher.dispatch(event)

VaultClient

Клиент для интеграции с HashiCorp Vault. Обеспечивает безопасное хранение и получение секретов.

Расположение: src/security/vault/client.py

Конструктор: VaultClient(config: VaultConfig)

from src.security.vault.client import VaultClient

client = VaultClient(config=vault_config)

Расширенные функции безопасности

FileSecurityScanner

Быстрое сканирование безопасности на уровне файлов без построения CPG. Подходит для оперативной оценки проекта.

Расположение: src/security/file_scanner.py

Метод: scan_project(project_path) — возвращает результат со списком обнаруженных проблем.

from src.security.file_scanner import FileSecurityScanner

scanner = FileSecurityScanner()
result = scanner.scan_project("/path/to/project")

print(f"Критических: {result.critical_count}")
print(f"Высокого уровня: {result.high_count}")

for finding in result.findings:
    print(f"{finding.severity}: {finding.description}")
    print(f"  Файл: {finding.file_path}:{finding.line_number}")

TaintVerifiedScanner

Сканер с верификацией через taint-анализ. Снижает количество ложных срабатываний за счёт анализа потока данных от источников к приёмникам с использованием CPG.

Расположение: src/security/taint_verified_scanner.py

Конструктор: TaintVerifiedScanner(cpg_service) — принимает экземпляр сервиса CPG (не путь к базе данных).

Метод: verify_sql_injection(findings, ...) — возвращает List[VerifiedFinding].

Принцип работы

Традиционное сопоставление по шаблонам даёт много ложных срабатываний. Сканер с верификацией: 1. Выявляет потенциальные уязвимости с помощью шаблонов 2. Отслеживает поток данных от источников заражения (ввод пользователя) к приёмникам (опасные функции) 3. Сообщает только о проблемах с подтверждённым путём потока данных

Источники заражённых данных (Python/Django)

# Данные запроса Django
request.GET, request.POST, request.data
request.body, request.FILES, request.META

# Данные запроса Flask
request.args, request.form, request.json

# Стандартный ввод
input(), raw_input(), sys.stdin, os.getenv()

# Ввод из файлов и сети
open(), read(), recv(), urlopen()

Опасные приёмники по категориям

Категория Приёмники
SQL-инъекция execute, raw, cursor.execute, RawSQL
Инъекция команд os.system, subprocess.run, eval, exec
Обход пути open, os.path.join, send_file
XSS mark_safe, HttpResponse
Десериализация pickle.loads, yaml.load, marshal.loads

Пример использования

from src.security.taint_verified_scanner import TaintVerifiedScanner
from src.services.cpg import CPGQueryService

cpg_service = CPGQueryService(db_path)
scanner = TaintVerifiedScanner(cpg_service)

# Верификация результатов, связанных с SQL-инъекцией
verified = scanner.verify_sql_injection(raw_findings)

for finding in verified:
    print(f"Подтверждено: {finding.description}")
    print(f"Путь потока данных: {finding.taint_path}")

HardeningScanner

Сканер проверок усиления безопасности по MITRE D3FEND. Реализует все методы усиления безопасности исходного кода из стандарта D3FEND.

Расположение: src/security/hardening/hardening_scanner.py

Конструктор: HardeningScanner(cpg_service, language="c") — принимает экземпляр сервиса CPG (не путь к базе данных).

Методы:

Метод Описание
scan_all(limit_per_check=50) Запуск всех проверок, возвращает List[HardeningFinding]
scan_by_d3fend_id(d3fend_ids, limit=50) Запуск проверок по идентификаторам D3FEND
scan_by_category(category, limit=50) Запуск проверок по категории
scan_by_severity(min_severity, limit=50) Запуск проверок по минимальному уровню серьёзности
get_compliance_score(findings) Расчёт оценки соответствия
get_checks_summary() Сводка по всем доступным проверкам
get_remediation_report(findings) Отчёт с рекомендациями по устранению

Результат — плоский список List[HardeningFinding] (не объекты с вложенными полями .check / .violations).

Поле запроса в HardeningCheck называется cpgql_query (не sql_query).

Поддерживаемые проверки

Идентификатор D3FEND Название Описание
D3-VI Инициализация переменных Обнаружение неинициализированных переменных
D3-CS Очистка учётных данных Гарантия удаления учётных данных из памяти
D3-IRV Проверка диапазона целых чисел Проверка рисков переполнения целых чисел
D3-RN Обнуление ссылок Проверка очистки указателей после освобождения памяти
D3-TL Проверка доверенных библиотек Проверка использования безопасных функций
D3-VTV Проверка типов переменных Проверка безопасности типов
D3-MBSV Проверка начала блока памяти Проверка границ блоков памяти
D3-NPC Проверка на null-указатели Обнаружение отсутствующих проверок на null
D3-DLV Проверка бизнес-логики Проверка валидации бизнес-логики
D3-OLV Проверка операционной логики Проверка операционных ограничений

Пример использования

from src.security.hardening.hardening_scanner import HardeningScanner
from src.services.cpg import CPGQueryService

cpg_service = CPGQueryService(db_path)
scanner = HardeningScanner(cpg_service, language="c")
findings = scanner.scan_all(limit_per_check=50)

for finding in findings:
    print(f"[{finding.d3fend_id}] {finding.check_id}")
    print(f"  Серьёзность: {finding.severity}")
    print(f"  Рекомендация: {finding.remediation}")

Конфигурация

security:
  hardening:
    enabled: true
    checks:
      D3-VI: true    # Инициализация переменных
      D3-CS: true    # Очистка учётных данных
      D3-NPC: true   # Проверка на null-указатели
    severity_threshold: "medium"  # Пропускать проблемы низкого уровня

ReportGenerator

Генератор отчётов по результатам аудита безопасности. Поддерживает экспорт в несколько форматов.

Расположение: src/security/report_generator.py

Имя класса: ReportGenerator (не SecurityReportGenerator).

Конструктор: ReportGenerator() — без параметров. Язык указывается при сохранении отчёта.

Порядок использования: create_report() -> add_*_findings() -> save_report(output_dir, formats, language="en")

Методы добавления результатов:

Метод Описание
create_report(project_name, ...) Создание нового отчёта
add_cpg_findings(findings) Добавление результатов анализа CPG
add_dlp_findings(findings) Добавление результатов сканирования DLP
add_hardening_findings(findings) Добавление результатов проверок D3FEND
save_report(output_dir, formats, language) Сохранение отчёта в указанных форматах

SecurityAuditReport — объект отчёта: - to_json() — экспорт в формат JSON - to_markdown(language) — экспорт в Markdown с локализацией - to_sarif() — экспорт в формат SARIF

Поддерживаемые форматы

Формат Назначение
JSON Интеграция в CI/CD, программный доступ
Markdown Документация, ручной анализ
SARIF Оповещения безопасности GitHub, интеграция в IDE

Языки отчётов

  • Английский (en)
  • Русский (ru)

Пример использования

from src.security.report_generator import ReportGenerator

generator = ReportGenerator()

# Создание отчёта
generator.create_report(
    project_name="MyProject",
    project_path="./myproject",
)

# Добавление результатов из разных источников
generator.add_cpg_findings(cpg_findings)
generator.add_dlp_findings(dlp_findings)
generator.add_hardening_findings(hardening_findings)

# Сохранение в нескольких форматах
generator.save_report(
    output_dir="./security_reports",
    formats=["json", "markdown", "sarif"],
    language="ru",
)

Разделы отчёта

  1. Сводка — общее количество обнаруженных проблем
  2. Критические уязвимости — требуют немедленного устранения
  3. Высокий уровень серьёзности — необходимо устранить до развёртывания
  4. Соответствие D3FEND — результаты проверок усиления безопасности
  5. Соответствие OWASP Top 10 — статус по категориям с тегами OWASP-Axx
  6. Пути потока заражённых данных — Mermaid-диаграммы путей эксплуатации (источник -> санитайзер -> приёмник)
  7. Подробные уязвимости — полный список с OWASP-тегами и рекомендациями по устранению
  8. Метрики — покрытие, точность, полнота

Возможности SARIF 2.1.0

  • codeFlows согласно SARIF 2.1.0 параграф 3.36 для визуализации путей заражения
  • OWASP-Axx теги на правилах reportingDescriptor для соответствия стандартам
  • Таксономии инструмента ссылаются на таксономию OWASP Top 10 2021

AutofixEngine

Автоматическая генерация исправлений безопасности для уязвимостей, обнаруженных taint-анализом. Сочетает шаблонные исправления на основе регулярных выражений с генерацией кода через LLM.

Расположение: src/analysis/autofix/engine.py

Конструктор: AutofixEngine(source_root="", dry_run=True)

Принцип работы

Движок использует стратегию «сначала шаблон, затем LLM»:

  1. Шаблонное исправление — сопоставление на основе регулярных выражений для 6 типов уязвимостей (SQL-инъекция, инъекция команд, переполнение буфера, форматная строка, обход пути, XSS). Степень уверенности: 0.7–0.9.
  2. Резервный вызов LLM — когда шаблоны не совпадают, строится промпт с контекстом уязвимости (CWE, поток заражения, исходный код) и вызывается настроенный поставщик LLM. Степень уверенности ограничена значением 0.6.
  3. ВалидацияDiffValidator проверяет, что оригинальный код присутствует в реальном исходном файле и коэффициент изменений ниже 50%.

Соответствие CWE

Категория приёмника CWE ID Тип уязвимости
sql CWE-89 SQL-инъекция
command CWE-78 Инъекция команд ОС
file CWE-22 Обход пути
xss CWE-79 Межсайтовый скриптинг (XSS)
buffer CWE-120 Переполнение буфера
null CWE-476 Разыменование NULL-указателя

Пример использования

from src.analysis.autofix.engine import AutofixEngine

engine = AutofixEngine(source_root="/path/to/project", dry_run=True)
results = engine.generate_fixes(taint_paths=paths)

for result in results:
    print(f"[{result.strategy}] {result.cwe_id}: {result.fix.file_path}:{result.fix.line_number}")
    print(f"  Валидировано: {result.validated}, Уверенность: {result.fix.confidence}")

Конфигурация

# config.yaml -> workflows.handlers.autofix
autofix:
  enabled: true
  context_lines_before: 10
  context_lines_after: 10
  llm_max_confidence: 0.6
  llm_temperature: 0.1
  llm_max_tokens: 2048
  max_fixes_per_run: 20

Система гипотез безопасности

Система формирования и проверки гипотез безопасности на основе базы CWE (58 записей) и CAPEC (27 записей). Включает 13 SQL-шаблонов и 12 шаблонов taint-анализа.

Подробная документация: Справочник по системе гипотез безопасности.

Шаблоны DLP

Встроенные шаблоны

Учётные данные

  • api_key — общие ключи API
  • aws_key — идентификаторы ключей доступа AWS (AKIA…)
  • aws_secret — секретные ключи AWS
  • private_key — закрытые ключи в формате PEM
  • password — шаблоны паролей
  • jwt_token — JSON Web Token (JWT)
  • bearer_token — токены аутентификации Bearer
  • basic_auth — строки аутентификации Basic (Base64)

Персональные данные (российская локаль)

  • email — адреса электронной почты
  • phone_ru — российские номера телефонов
  • credit_card — номера кредитных карт
  • inn — российский ИНН
  • snils — российский СНИЛС
  • passport_ru — номера российских паспортов

Исходный код

  • connection_string — строки подключения к базам данных
  • internal_path — внутренние пути к файлам
  • ip_address — IP-адреса

Пользовательские шаблоны

Добавление пользовательских шаблонов через конфигурацию:

dlp:
  categories:
    custom:
      enabled: true
      action: "WARN"
      severity: "medium"
      patterns:
        - name: "project_code"
          regex: "PROJECT-[A-Z]{2,4}-\\d{4,6}"
          mask_with: "[PROJECT-ID]"

Таблицы базы данных

llm_audit_log

Содержит аудиторскую запись всех взаимодействий с LLM (25 столбцов):

Столбец Тип Описание
id BIGINT Первичный ключ
request_id UUID Уникальный идентификатор запроса
user_id UUID Пользователь, выполнивший запрос
session_id UUID Идентификатор сеанса
ip_address INET IP-адрес клиента
provider VARCHAR(50) Название поставщика LLM
model VARCHAR(100) Название модели
system_prompt_hash VARCHAR(64) SHA256 системного промпта
system_prompt_length INT Длина системного промпта
user_prompt_preview TEXT Предварительный просмотр промпта (замаскированный)
user_prompt_length INT Длина пользовательского промпта
response_preview TEXT Предварительный просмотр ответа
response_length INT Длина ответа
status VARCHAR(20) Статус запроса (success, error, blocked)
prompt_tokens INT Количество токенов в промпте
completion_tokens INT Количество токенов в ответе
total_tokens INT Общее количество токенов
latency_ms FLOAT Задержка запроса (мс)
dlp_action VARCHAR(20) Применённое действие DLP
dlp_match_count INT Количество совпадений DLP
dlp_categories ARRAY(VARCHAR) Совпавшие категории DLP
error_type VARCHAR(100) Тип ошибки (при неудаче)
error_message TEXT Сообщение об ошибке (при неудаче)
timestamp TIMESTAMP(tz) Время запроса
metadata JSONB Дополнительные метаданные

dlp_events

Детальные события срабатывания DLP (13 столбцов):

Столбец Тип Описание
id BIGINT Первичный ключ
audit_log_id BIGINT Ссылка на запись в llm_audit_log
request_id UUID Идентификатор запроса
event_type VARCHAR(50) Тип события (request_scan, response_scan)
action VARCHAR(20) Применённое действие
category VARCHAR(50) Категория DLP
pattern_name VARCHAR(100) Название совпавшего шаблона
severity VARCHAR(20) Уровень серьёзности
match_preview VARCHAR(200) Замаскированный фрагмент совпадения
position INT Позиция совпадения в содержимом
user_id UUID Идентификатор пользователя
ip_address INET IP-адрес клиента
timestamp TIMESTAMP(tz) Время события

security_events

Журнал событий безопасности для интеграции с SIEM (13 столбцов):

Столбец Тип Описание
id BIGINT Первичный ключ
event_id UUID Уникальный идентификатор события
event_type VARCHAR(50) Тип события безопасности
severity SMALLINT Уровень серьёзности по syslog (0–7)
request_id UUID Идентификатор запроса
user_id UUID Идентификатор пользователя
session_id UUID Идентификатор сеанса
ip_address INET IP-адрес клиента
message TEXT Описание события
details JSONB Дополнительные данные о событии
dispatched BOOLEAN Было ли событие отправлено в SIEM
dispatch_error TEXT Ошибка отправки (при неудаче)
timestamp TIMESTAMP(tz) Время события

Форматы событий SIEM

SysLog (RFC 5424)

<134>1 2024-12-09T10:30:00.000Z codegraph.company.com codegraph - llm.dlp.block [llm@12345 requestId="req-123" userId="user-456" provider="GigaChat" action="BLOCK" category="credentials"] DLP BLOCK: 2 patterns in request

CEF

CEF:0|CodeGraph|CodeAnalysis|1.0|llm.dlp.block|DLP Block|7|rt=Dec 09 2024 10:30:00 src=192.168.1.100 suser=user-456 cs1=req-123 cs1Label=RequestID cs2=GigaChat cs2Label=Provider act=BLOCK cat=credentials

LEEF

LEEF:2.0|CodeGraph|CodeAnalysis|1.0|llm.dlp.block|  devTime=Dec 09 2024 10:30:00    src=192.168.1.100   usrName=user-456    requestId=req-123   provider=GigaChat   action=BLOCK    category=credentials

Интеграция через вебхуки

Формат полезной нагрузки вебхука DLP:

{
  "alert_id": "a1b2c3d4e5f6",
  "timestamp": "2024-12-09T10:30:00.000Z",
  "action": "BLOCK",
  "match_count": 2,
  "categories": ["credentials"],
  "patterns": ["api_key", "aws_key"],
  "request_id": "req-123",
  "user_id": "user-456",
  "ip_address": "192.168.1.100",
  "severity": "critical",
  "context": {}
}

Использование через CLI

Команды аудита безопасности

Модуль src/cli/security_audit.py предоставляет следующие подкоманды:

# Полное сканирование проекта
python -m src.cli.security_audit full --path ./myproject --format all --verbose

# Быстрое сканирование (только файловый анализ)
python -m src.cli.security_audit quick --path ./myproject

# Просмотр текущих настроек безопасности
python -m src.cli.security_audit settings

# Поиск секретов в коде
python -m src.cli.security_audit secrets --path ./myproject

Доступные флаги: --path, --output, --format, --verbose, --language.

Аудит с автоисправлением

Флаг --autofix находится в отдельном модуле src/cli/audit_commands.py:

# Аудит с автоматическим формированием исправлений
python -m src.cli audit --autofix

Флаг --autofix запускает taint-анализ после сканирования и формирует предложения по исправлению, отображаемые в виде таблицы Rich с диффами.

Рекомендации по обеспечению безопасности

  1. Включите TLS для подключений SIEM в рабочей среде
  2. Используйте AppRole или аутентификацию через Kubernetes для Vault (а не простые токены)
  3. Настройте соответствующие действия DLP — BLOCK для учётных данных, MASK для персональных данных
  4. Настройте политики хранения журналов в вашей системе SIEM для соответствия требованиям
  5. Отслеживайте события DLP_BLOCK на предмет попыток несанкционированного копирования данных
  6. Регулярно обновляйте шаблоны для обнаружения новых форматов учётных данных
  7. Тестируйте шаблоны DLP перед развёртыванием в рабочей среде

Соответствие нормативным требованиям

Модуль безопасности помогает выполнить требования следующих нормативных актов:

  • 152-ФЗ «О персональных данных» — обнаружение и маскировка персональных данных (ПДн)
  • ГОСТ Р 57580 — аудит действий и контроль доступа для финансовых организаций
  • PCI DSS — обнаружение номеров кредитных карт
  • Требования ФСТЭК — защита информации ограниченного доступа (с использованием пользовательских шаблонов)

Инструменты MCP

codegraph_autofix

Автоматическое формирование исправлений безопасности. Доступен для интеграции с IDE через протокол MCP.

codegraph_autofix(
    method_name="process_query",  # Фокус на конкретном методе
    cwe="CWE-89",                 # Фильтр по идентификатору CWE
)

Параметры: - method_name (обязательный) — название метода для анализа - cwe (необязательный) — фильтр по идентификатору CWE

codegraph_taint_analysis

Запуск taint-анализа для метода. Отслеживает поток данных от источников к приёмникам.

codegraph_taint_analysis(
    method_name="handle_request",
    source_category="user_input",
    sink_category="sql",
)

Структура модуля безопасности

src/security/
├── __init__.py               # Экспорт модуля
├── _base.py                  # Базовые классы (Severity, Category)
├── config.py                 # Конфигурация безопасности
│
├── dlp/                      # Предотвращение утечки данных
│   ├── patterns.py           # Шаблоны DLP (учётные данные, ПДн)
│   ├── scanner.py            # Сканер содержимого (ContentScanner)
│   ├── actions.py            # Действия DLP (BLOCK, MASK, WARN)
│   └── webhook.py            # Вебхуки оповещений
│
├── siem/                     # Интеграция с SIEM
│   ├── base_handler.py       # Базовый класс обработчика, SecurityEvent
│   ├── syslog_handler.py     # Обработчик SysLog
│   ├── cef_handler.py        # Обработчик CEF
│   ├── leef_handler.py       # Обработчик LEEF
│   ├── buffer.py             # Буферизация событий
│   └── dispatcher.py         # Диспетчер (SIEMDispatcher)
│
├── vault/                    # HashiCorp Vault
│   ├── client.py             # Клиент API Vault (VaultClient)
│   └── secret_manager.py     # Управление секретами
│
├── llm/                      # Безопасность LLM
│   ├── secure_provider.py    # Обёртка SecureLLMProvider
│   └── request_logger.py     # Журналирование запросов
│
├── hardening/                # Усиление безопасности (D3FEND)
│   ├── base.py               # Определения проверок, HardeningFinding
│   ├── d3fend_checks.py      # Все проверки D3FEND
│   └── hardening_scanner.py  # Сканер (HardeningScanner)
│
├── patterns/                 # Шаблоны уязвимостей
│   ├── injection.py          # Инъекции SQL/команд
│   ├── memory.py             # Безопасность работы с памятью
│   ├── crypto.py             # Криптографические проблемы
│   ├── auth.py               # Ошибки аутентификации
│   ├── concurrency.py        # Состояния гонки
│   └── python_django.py      # Специфичные для Python/Django
│
├── file_scanner.py           # Файловый сканер (FileSecurityScanner)
├── taint_verified_scanner.py # Верификация через taint-анализ
├── report_generator.py       # Генерация отчётов (ReportGenerator)
└── report_localizer.py       # Поддержка локализации

Краткое руководство

1. Включение функций безопасности

export SECURITY_ENABLED=true
export DLP_ENABLED=true
export SIEM_ENABLED=true

2. Запуск аудита безопасности

# Полный аудит со всеми проверками
python -m src.cli.security_audit full \
    --path ./myproject \
    --format all \
    --verbose

# Полный аудит с автоисправлением
python -m src.cli audit --autofix

3. Просмотр отчётов

# Отчёты сохраняются в ./security_reports/
ls security_reports/
# security_audit_20241209_103000.json
# security_audit_20241209_103000.md
# security_audit_20241209_103000.sarif

4. Интеграция с CI/CD

# .github/workflows/security.yml
- name: Security Audit
  run: |
    python -m src.cli.security_audit full \
      --path . \
      --format sarif \
      --output security.sarif

- name: Upload SARIF
  uses: github/codeql-action/upload-sarif@v2
  with:
    sarif_file: security.sarif

Смотрите также