Перейти к основному содержимому

ADR-0003 · Orchestration-neutral runtime

Статус: accepted v1.0 (2026-04-16) · Полный нормативный текст

Зачем инварианты

ADR-0001 ввёл три runtime-адаптера (in_process / mcp_stdio / mcp_http) и подразумевает принцип «плагин не знает, где он исполняется». Но какие именно ограничения делают этот принцип рабочим — не зафиксированы. Пока единственный вызывающий — ядро приложения в том же процессе, нарушения не проявляются: внешний event loop, общие синглтоны, локальные пути в файловой системе работают «сами собой».

Есть два сценария, которые это свойство сломают, если не зафиксировать инварианты заранее:

  1. Масштаб до 10⁴+ партиций — когда один плагин обрабатывает тысячи независимых единиц работы (per-tenant, per-repo), нужен orchestrator с очередями, retries, backfill, аудитом.
  2. Внешний orchestrator как kind плагина — Dagster, Celery, Airflow, k8s Job интегрируются через ту же plugin-систему. Плагин запускается в отдельном процессе/поде, прогресс стримится через абстрактный sink, checkpoint — в Postgres.

Если плагин опирается на внешний event loop или process-синглтоны, переход из in-process ядра в orchestrator-среду требует переписывания каждого плагина. ADR-0003 делает дисциплину нормативной до появления таких плагинов — чтобы не переписывать потом.

Восемь инвариантов

Полная таблица. Детали каждого — ниже.

ИнвариантЧто проверяется
1Orchestration-neutralityПлагин не читает внешнее состояние среды (event loop, синглтоны, working dir).
2Сериализуемые границыВходы/выходы хуков round-trip сериализуются в JSON.
3Ресурсы через DIHTTP-клиенты, БД-соединения, tmpdir инъектируются через PluginContext.resources.
4Явная декларация execution_modelПлагин объявляет sync/async/thread_cpu_bound/process_cpu_bound.
5Unit of Work для долгоживущихПлагины на минуты+ декларируют partition_key, idempotency_mode, checkpointable.
6Abstract Progress/Checkpoint sinksПрогресс и checkpoint — через абстрактные интерфейсы, не WebSocket/файл напрямую.
7Режимы идемпотентностиinput_hash / output_hash / none, участвуют в оркестраторском skip-логике.
8Детерминизм для output_hash плагиновВремя и random через ctx.clock / ctx.rng, без обращения к внешней среде.

1. Orchestration-neutrality — запрет внешнего состояния среды

Запрещено:

  • Полагаться на текущий event loop / executor / runtime-контекст в момент инициализации.
  • Читать/писать process-level синглтоны (thread-locals, module-level mutable state, «глобальный клиент»).
  • Использовать файловую систему за пределами путей, инъектированных через PluginContext.
  • Читать переменные окружения вне setup() (считывать и кэшировать — OK; читать «на лету» — нет, в распределённых сценариях у разных воркеров они различаются).
  • Полагаться на текущий working directory.

Результат: плагин одинаково работает, когда вызван из ядра в том же процессе, из orchestrator-op в отдельном процессе с новым event loop, из task-queue worker без event loop и из unit-теста в синхронном контексте.

Контрактный тест: плагин прогоняется в трёх хостах — in_process_host, forked_process_host, fresh_event_loop_host — с одинаковым input. Результаты сравниваются на equality.

2. Сериализуемые границы

Всё, что пересекает границу плагина (входы хука, выходы хука, checkpoint, progress-событие) должно быть round-trip JSON-сериализуемо.

Запрещено: live HTTP/DB-клиенты, session-объекты, connection pools, открытые файловые дескрипторы, native-library handles (C-extension), async-генераторы / корутины / потоки / каналы, замыкания, методы объектов, любые не-сериализуемые callable.

Разрешено: структурные данные, валидируемые JSON Schema (Python pydantic / TS zod / Go struct-tag), примитивы, массивы/словари из них, bytes (до лимита ≤10 МБ MVP), ссылки на внешние ресурсы (s3://..., file://...) — но не сами ресурсы.

Исключение для in_process_only плагинов: могут передавать живые объекты внутри одного вызова хука (например, streaming-плагин стримит AsyncIterator в SSE на клиент). Но checkpoint и progress всё равно сериализуемы — иначе resume после рестарта невозможен.

Контрактный тест: round-trip JSON-сериализация / десериализация / deep-equal для каждого hook-input/output. Отдельно проверяются типы, которые молча ломают JSON: datetime (ISO 8601), Decimal / BigInt (строка), set (→ массив), Enum (→ значение), UUID (→ строка).

3. Ресурсы через dependency injection

Плагин не создаёт долгоживущих ресурсов внутри себя. HTTP-клиент, БД-клиент, connection pool, blob store — всё это инъектируется хостом через PluginContext.resources.

Стандартные ресурсы (Phase 0):

РесурсНазначение
http_clientHTTP-клиент, предконфигурированный корпоративным CA bundle / TLS.
tmpdirВременная директория, cleanup по teardown().
blob_storeАбстрактный BlobStore (S3 / FS / в памяти).
clockИнъектируемый источник времени (freezable в тестах).
rngИнъектируемый источник случайности (seedable в тестах).

Декларация в манифесте:

[plugin.resources]
required = ["http_client", "blob_store"]
optional = ["postgres"]

При несоответствии (required = ["postgres"], host не предоставляет) плагин помечается unavailable с внятным объяснением. При optional — просто None в ctx.resources.postgres, плагин обязан обработать.

4. Явная декларация execution_model

Плагин объявляет стиль исполнения в манифесте. Host подбирает executor.

ЗначениеСемантикаКогда применять
asyncAsync-aware (async/await в Python/TS, goroutines в Go); host вызывает в event loop / runtime.I/O-bound — HTTP, БД.
syncЧистый синхронный, не блокирующий.Быстрая трансформация (parse/format).
thread_cpu_boundSync, CPU-bound; host ходит в thread pool / worker thread.Re-ranking, небольшие ML-модели.
process_cpu_boundSync, тяжёлый CPU; host выделяет отдельный процесс.Большие ML-модели, тяжёлый парсинг.

Контрактный тест: sync-плагин не делает блокирующих I/O (проверка через language-specific tooling); async-плагин не занимает CPU без явного yield.

5. Unit of Work для долгоживущих плагинов

Плагины с операциями на минуты+ декларируют единицу работы в манифесте:

[plugin.unit_of_work]
declared = true
partition_key = "tenant_id"
estimated_duration_sec = 600
idempotency_mode = "input_hash" # input_hash | output_hash | none
checkpointable = true
  • partition_key — ключ шардирования; orchestrator кладёт units одного partition в одну очередь.
  • estimated_duration_sec — подсказка планировщику против блокировки очереди.
  • idempotency_mode — см. инвариант 7.
  • checkpointable = true — плагин умеет resume через ctx.checkpoint.

UoW-плагины вызываются оркестратором (kind = "orchestrator"), не напрямую ядром. Если внешний orchestrator не зарегистрирован — fallback на встроенный LocalExecutorOrchestrator (обязателен в каждой реализации).

6. Abstract Progress / Checkpoint sinks

Плагин публикует прогресс через абстрактный sink ctx.progress, не напрямую через WebSocket / SSE / файл лога.

Интерфейс ProgressSink:

ctx.progress.update(percent: float, message: str, payload: dict | None)
ctx.progress.event(event_type: str, payload: dict)

Реализация sink — ответственность хоста: в ядре in-process это может быть WebSocket-broadcast, в orchestrator-op — стрим в state, в unit-тесте — append в список для assertions.

Интерфейс CheckpointStore:

ctx.checkpoint.save(key: str, state: dict) → version_id
ctx.checkpoint.load(key: str) → (state: dict | None, version_id)
ctx.checkpoint.list(key_prefix: str) → list[(key, version_id)]

Контракт: save атомарен (write-then-rename для файлового backend; транзакция для БД), load после save всегда возвращает последний сохранённый state (read-after-write consistency), state — JSON-сериализуем (инвариант 2).

Контрактный тест: плагин запускается с in-memory CheckpointStore. Resume после смоделированного crash в произвольной точке прогресса даёт финальный результат, равный single-pass прогону.

7. Режимы идемпотентности

UoW-плагин декларирует idempotency_mode:

  • input_hash — повторный запуск с тем же input даёт тот же output. Hash входа = идентификатор результата. Orchestrator может пропустить повтор.
  • output_hash — повторный запуск может дать новый output, но если hash совпадает с предыдущим — это один результат (для оптимизации downstream).
  • none — каждый запуск — новый результат. Incremental rerun даёт duplicate (дедупликация на стороне потребителя).

Orchestrator перед запуском вычисляет input hash → проверяет registry на завершённый run с этим hash → если есть, пропускает и возвращает сохранённый output.

8. Детерминизм для output_hash

Плагин с idempotency_mode = "output_hash" обязан быть детерминистичным: тот же input + то же состояние ресурсов → тот же output.

Запрещено:

  • Использовать внешнее time.now() / Date.now() / time.Now() — только ctx.clock.now().
  • Использовать внешнее random() / Math.random() / rand.Int() — только ctx.rng.next(...).
  • Опираться на ordering в неупорядоченных коллекциях (Python dict до 3.7, Go map, JS-объекты без Map) — нужна явная сортировка.
  • Полагаться на locale-зависимое поведение (сравнение строк, форматирование чисел).

Инъектируемые clock и rng позволяют тестировать с frozen-clock и seeded-rng — bit-equal output между прогонами. В production clock = системный, rng = OS — output корректный, но не bit-equal (это OK — output_hash считается по структуре результата, не по noise).

Пример: UoW-плагин с resources и checkpoint

plugins/repo-indexer/dagstack.toml
[plugin]
name = "repo-indexer"
kind = "pipeline"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
execution_model = "async"

[plugin.resources]
required = ["http_client", "blob_store", "clock"]

[plugin.unit_of_work]
declared = true
partition_key = "repo_id"
estimated_duration_sec = 1800
idempotency_mode = "input_hash"
checkpointable = true
plugins/repo-indexer/plugin.py
from dagstack.plugin_system import PluginContext


class RepoIndexer:
async def setup(self, context: PluginContext) -> None:
self._http = context.resources.http_client
self._blobs = context.resources.blob_store
self._clock = context.resources.clock
self._progress = context.progress
self._checkpoint = context.checkpoint

async def index(self, repo_id: str) -> dict:
state, _ = self._checkpoint.load(key=f"repo-{repo_id}")
processed = state["processed_files"] if state else []

files = await self._list_files(repo_id)
for i, file in enumerate(f for f in files if f not in processed):
await self._process_file(file)
processed.append(file)

self._checkpoint.save(
key=f"repo-{repo_id}",
state={"processed_files": processed, "at": self._clock.now().isoformat()},
)
self._progress.update(
percent=(i + 1) / len(files),
message=f"Processed {i + 1}/{len(files)}",
payload={"current_file": file},
)

return {"repo_id": repo_id, "total": len(files)}

Один и тот же код работает:

  • из ядра in-process — LocalExecutorOrchestrator запускает index(repo_id="acme-core") в той же event loop;
  • из Dagster — orchestrator-плагин для Dagster оборачивает index как asset, partition = repo_id;
  • из Celery — та же обёртка для Celery, partition_key становится routing-key очереди;
  • из unit-теста — ctx собирается вручную с in-memory CheckpointStore и frozen Clock.

Новый kind: orchestrator

ADR-0003 вводит новый вид плагина — orchestrator. Он singleton, всегда in_process_only (держит состояние очереди, in-flight tracking, retry counters в памяти хоста; через MCP wire жить не может).

Хуки вида:

  • enqueue(plugin_name, args, idempotency_key?, partition_key?) → (unit_id, deduplicated) — поставить UoW в очередь, идемпотентно по idempotency_key.
  • status(unit_id) → (state, started_at?, finished_at?, progress?, error?) — текущий статус.
  • backfill(plugin_name?, partition_key?, since?, until?, states?) → (enqueued_count, skipped_count) — повторная постановка failed / expired units.

Встроенная реализация: LocalExecutorOrchestrator — обязательный in-tree плагин каждой реализации. Запускает enqueue-ed UoW в том же event loop / thread pool, очередь в памяти (или персистный JSON в Phase 0). Реальные очереди (Postgres-backed / Redis-backed) — внешние оркестраторы в Phase 1+ через Dagster-wrapper, Celery-wrapper и т.д.

Последствия

Положительные:

  • Плагин одинаково работает в четырёх средах без условного кода.
  • Resume долгоживущих UoW после crash — через ctx.checkpoint, без переписывания логики плагина.
  • Добавление интеграции с Dagster/Celery/Airflow — это новый orchestrator-плагин, существующие плагины не трогаются.
  • Unit-тесты дёшевы: альтернативные ресурсы (frozen clock, in-memory checkpoint) дают воспроизводимые прогоны.

Компромиссы:

  • Цена дисциплины. Авторы плагинов должны соблюдать все восемь инвариантов. Контрактные тесты ловят основное, но не всё (внешнее время через стороннюю библиотеку требует tracing). Нужна культура ревью.
  • Phase 0 не даёт реальной DI. В первой фазе Resources, ProgressSink, CheckpointStore — Protocol-стабы; плагины, объявившие их, могут получить None или NotImplementedError при доступе. Смягчается защитными warnings в реестре при несоответствии.
  • output_hash детерминизм — строгий контракт. Нарушается случайно (итерация set, сторонние нативные библиотеки без frozen-clock). Контрактный тест ловит очевидное, но полная гарантия требует ручного ревью.

Что запрещено этим ADR:

  • Чтение внешнего окружения (os.environ, Date.now(), текущий working directory) в runtime-фазе.
  • Создание ресурсов внутри плагина (httpx.AsyncClient() в конструкторе) — должны инъектироваться.
  • Прямой вызов WebSocket / SSE / файлового лога из плагина для публикации прогресса — только через ctx.progress.

Связанные ADR

  • ADR-0001PluginManifest и PluginContext, которые ADR-0003 расширяет полями execution_model, resources, unit_of_work, а также progress / checkpoint в PluginContext.
  • ADR-0002 — lifecycle-ordering согласован с инвариантами (особенно 4 — sync vs async).
  • ADR-0004execution_model фиксируется в hookspec-YAML и эмитируется в типы реализаций.

Нормативный источник

Полный текст ADR-0003 с формальными определениями каждого инварианта, контрактными тестами, Phase plan для реализации и обсуждением open questions: plugin-system-spec/adr/0003-orchestration-neutral-runtime.md.