Перейти к основному содержимому

Runtime-инварианты

Plugin-system гарантирует, что один и тот же плагин работает одинаково:

  • в приложении in-process (FastAPI, Django, обычный Python/Node/Go-сервис);
  • в orchestrator-op (Dagster asset, Airflow task);
  • в task-queue worker (Celery, RQ);
  • в unit-тесте (с mock-ресурсами).

Это свойство не случайное. Оно возникает только если плагин соблюдает восемь runtime-инвариантов, зафиксированных в ADR-0003. Если хоть один нарушен — плагин перестаёт работать при переходе из одной среды в другую.

Страница-обзор, как применять каждый инвариант на практике. Нормативный контракт — в ADR-0003.

Сводная таблица

ИнвариантКак проверяется
1Orchestration-neutrality (запрет внешнего состояния среды)Автоматический тест в трёх хостах.
2Сериализуемые границыRound-trip JSON serialize для каждого входа/выхода.
3Ресурсы через DIПроверка, что плагин не создаёт HTTP/DB-клиентов сам.
4Явный execution_modelДекларация в манифесте; runtime-детектор блокирующих I/O в sync-плагинах.
5Unit of Work для долгоживущихДекларация в манифесте.
6Абстрактные ProgressSink / CheckpointStoreОтсутствие прямых импортов WebSocket / файлового I/O.
7Режимы идемпотентности (input_hash / output_hash / none)Декларация в манифесте + повторный запуск с тем же input.
8Детерминизм для output_hashДвойной запуск с frozen-clock и seeded-rng, bit-equal проверка.

1. Orchestration-neutrality

Правило: плагин не опирается на внешнее состояние среды хоста — текущий event loop, process-синглтоны, locale, working directory.

Что запрещено:

  • Полагаться на asyncio.get_event_loop() / Tokio::current() / эквиваленты в конструкторе или setup().
  • Читать/писать process-level синглтоны (thread-locals, module-mutable state, «глобальный клиент»).
  • Использовать файловую систему за пределами ctx.resources.tmpdir.
  • Читать os.environ в runtime-фазе (читать в setup() и кэшировать — OK).
  • Полагаться на os.getcwd().

Как исправить нарушения:

# ✗ Bad — capturing an external event loop.
class BadPlugin:
def __init__(self):
self.loop = asyncio.get_event_loop() # breaks inside an orchestrator

# ✓ Good — never hold a reference to the loop.
class GoodPlugin:
def __init__(self):
pass # lazy-init; everything happens in setup()

async def setup(self, ctx):
# asyncio.get_running_loop() inside an async function returns the current run's loop — fine.
pass

2. Сериализуемые границы

Правило: всё, что пересекает границу плагина (вход хука, выход хука, checkpoint, progress-событие), должно быть JSON-сериализуемо.

Что нельзя передавать:

  • Живые HTTP/DB-клиенты, session-объекты, connection pools.
  • Открытые файловые дескрипторы.
  • Async-генераторы, корутины, потоки, каналы.
  • Замыкания и методы объектов.
  • C-extension native handles.

Что можно:

  • Структурные данные, описуемые JSON Schema.
  • Примитивы (строки, числа, bool, null).
  • Массивы и словари из них.
  • Bytes (до ≤10 МБ в v1.0 MVP).
  • Ссылки на внешние ресурсы в виде URL: s3://bucket/key, file://path.

Типичные «молчаливо ломают JSON» случаи:

ТипКак сериализовать
datetimeISO 8601 строка ("2026-01-01T12:00:00Z")
Decimal / BigIntстрока
setмассив
Enumvalue enum
UUIDстрока ("a1b2c3...")

Исключение для in_process_only плагинов: могут передавать живые объекты (например, stream-объекты) внутри одного вызова хука. Но checkpoint и progress всё равно сериализуемы.

3. Ресурсы через DI

Правило: HTTP-клиенты, БД-соединения, connection pools — инъектируются через ctx.resources, не создаются внутри плагина.

Детали — на отдельной странице «Ресурсы (Resources DI)».

4. Явный execution_model

Правило: плагин декларирует в манифесте стиль исполнения. Host подбирает подходящий executor.

execution_model = "async"
# или
execution_model = "sync"
execution_model = "thread_cpu_bound"
execution_model = "process_cpu_bound"

Последствия:

  • sync-плагин не должен блокироваться на I/O — host проверит через специализированный тулинг (в Python — blockbuster).
  • async-плагин не должен занимать CPU без явного yield — блокирует event loop.
  • thread_cpu_bound-плагин ходит в thread-pool, не в event-loop.
  • process_cpu_bound — отдельный процесс (для очень тяжёлых задач).

5. Unit of Work для долгоживущих плагинов

Правило: плагин, чья операция занимает минуты+, декларирует параметры UoW в манифесте:

[plugin.unit_of_work]
declared = true
partition_key = "tenant_id"
estimated_duration_sec = 600
idempotency_mode = "input_hash"
checkpointable = true
  • partition_key — ключ шардирования; orchestrator параллелит units по этому ключу.
  • idempotency_mode — см. инвариант 7.
  • checkpointable = true — плагин сохраняет прогресс через ctx.checkpoint и умеет resume.

UoW-плагины вызываются оркестратором (kind = "orchestrator"), не напрямую приложением.

6. Абстрактные ProgressSink / CheckpointStore

Правило: плагин публикует прогресс через абстрактный ctx.progress, не напрямую через WebSocket / SSE / файл лога. Аналогично ctx.checkpoint для сохранения/восстановления состояния.

# ✗ Bad — a direct dependency on WebSocket.
class BadPlugin:
async def process(self):
await broadcast_to_websocket({"event": "progress", "percent": 50})

# ✓ Good — through the abstract sink.
class GoodPlugin:
async def process(self):
self._progress.update(percent=0.5, message="Halfway there")

Host подставляет разные реализации sink:

  • В web-приложении — WebSocket-broadcast.
  • В orchestrator-op — stream в state оркестратора.
  • В unit-тесте — append в массив для assertions.

7. Режимы идемпотентности

Правило: UoW-плагин декларирует idempotency_mode:

РежимСемантикаКогда применять
input_hashПовтор с тем же input → тот же output. Orchestrator пропускает повтор.Индексация, обогащение данных.
output_hashПовтор может дать новый output, но если hash совпал — это один результат.Идемпотентное обновление downstream по hash результата.
noneКаждый запуск — новый результат. Дедупликация — ответственность consumer.Notifications, fire-and-forget.

8. Детерминизм для output_hash

Правило: плагин с idempotency_mode = "output_hash" обязан быть детерминистичным — тот же input + тот же state ресурсов → тот же output.

Что запрещено:

  • time.now() / Date.now() / time.Now() — только ctx.resources.clock.now().
  • random() / Math.random() / rand.Int() — только ctx.resources.rng.next(...).
  • Итерация по set без сортировки.
  • Итерация по неупорядоченному dict/map.
  • Locale-зависимое форматирование.

Как проверить детерминизм: прогоните плагин дважды с frozen-clock и seeded-rng, сравните outputs byte-by-byte. Если различаются — какой-то источник недетерминизма не через DI.

Как контрактная рамка тестов проверяет инварианты

В Python-биндинге (dagstack-plugin-system >= 0.2) есть встроенная рамка контрактных тестов. Эквивалентные API в TypeScript / Go реализациях появятся при их выходе в stable. Ниже — Python-пример:

from dagstack.plugin_system import (
run_contract_suite,
assert_no_ambient_state,
assert_json_serializable_boundaries,
assert_lifecycle_clean,
assert_deterministic,
assert_manifest_valid,
ALL_CHECKS,
)


def test_my_plugin_contracts():
manifest = load_manifest("plugins/my-plugin/dagstack.toml")
result = run_contract_suite(
plugin_class=MyPlugin,
manifest=manifest,
checks=ALL_CHECKS,
)
assert result.ok, result.format_failures()

Каждая проверка покрывает один или несколько инвариантов:

ПроверкаИнварианты
assert_manifest_valid4, 5, 7 (декларации в манифесте)
assert_no_ambient_state1
assert_json_serializable_boundaries2
assert_lifecycle_clean3 (Resources DI — не утекают ресурсы)
assert_deterministic8

Детали запуска — на странице Руководство: Тестирование плагинов.

См. также