ADR-0003 · Orchestration-neutral runtime
Статус: accepted v1.0 (2026-04-16) · Полный нормативный текст
Зачем инварианты
ADR-0001 ввёл три runtime-адаптера (in_process / mcp_stdio / mcp_http) и подразумевает принцип «плагин не знает, где он исполняется». Но какие именно ограничения делают этот принцип рабочим — не зафиксированы. Пока единственный вызывающий — ядро приложения в том же процессе, нарушения не проявляются: внешний event loop, общие синглтоны, локальные пути в файловой системе работают «сами собой».
Есть два сценария, которые это свойство сломают, если не зафиксировать инварианты заранее:
- Масштаб до 10⁴+ партиций — когда один плагин обрабатывает тысячи независимых единиц работы (per-tenant, per-repo), нужен orchestrator с очередями, retries, backfill, аудитом.
- Внешний orchestrator как kind плагина — Dagster, Celery, Airflow, k8s Job интегрируются через ту же plugin-систему. Плагин запускается в отдельном процессе/поде, прогресс стримится через абстрактный sink, checkpoint — в Postgres.
Если плагин опирается на внешний event loop или process-синглтоны, переход из in-process ядра в orchestrator-среду требует переписывания каждого плагина. ADR-0003 делает дисциплину нормативной до появления таких плагинов — чтобы не переписывать потом.
Восемь инвариантов
Полная таблица. Детали каждого — ниже.
| № | Инвариант | Что проверяется |
|---|---|---|
| 1 | Orchestration-neutrality | Плагин не читает внешнее состояние среды (event loop, синглтоны, working dir). |
| 2 | Сериализуемые границы | Входы/выходы хуков round-trip сериализуются в JSON. |
| 3 | Ресурсы через DI | HTTP-клиенты, БД-соединения, tmpdir инъектируются через PluginContext.resources. |
| 4 | Явная декларация execution_model | Плагин объявляет sync/async/thread_cpu_bound/process_cpu_bound. |
| 5 | Unit of Work для долгоживущих | Плагины на минуты+ декларируют partition_key, idempotency_mode, checkpointable. |
| 6 | Abstract Progress/Checkpoint sinks | Прогресс и checkpoint — через абстрактные интерфейсы, не WebSocket/файл напрямую. |
| 7 | Режимы идемпотентности | input_hash / output_hash / none, участвуют в оркестраторском skip-логике. |
| 8 | Детерминизм для output_hash плагинов | Время и random через ctx.clock / ctx.rng, без обращения к внешней среде. |
1. Orchestration-neutrality — запрет внешнего состояния среды
Запрещено:
- Полагаться на текущий event loop / executor / runtime-контекст в момент инициализации.
- Читать/писать process-level синглтоны (thread-locals, module-level mutable state, «глобальный клиент»).
- Использовать файловую систему за пределами путей, инъектированных через
PluginContext. - Читать переменные окружения вне
setup()(считывать и кэшировать — OK; читать «на лету» — нет, в распределённых сценариях у разных воркеров они различаются). - Полагаться на текущий working directory.
Результат: плагин одинаково работает, когда вызван из ядра в том же процессе, из orchestrator-op в отдельном процессе с новым event loop, из task-queue worker без event loop и из unit-теста в синхронном контексте.
Контрактный тест: плагин прогоняется в трёх хостах — in_process_host, forked_process_host, fresh_event_loop_host — с одинаковым input. Результаты сравниваются на equality.
2. Сериализуемые границы
Всё, что пересекает границу плагина (входы хука, выходы хука, checkpoint, progress-событие) должно быть round-trip JSON-сериализуемо.
Запрещено: live HTTP/DB-клиенты, session-объекты, connection pools, открытые файловые дескрипторы, native-library handles (C-extension), async-генераторы / корутины / потоки / каналы, замыкания, методы объектов, любые не-сериализуемые callable.
Разрешено: структурные данные, валидируемые JSON Schema (Python pydantic / TS zod / Go struct-tag), примитивы, массивы/словари из них, bytes (до лимита ≤10 МБ MVP), ссылки на внешние ресурсы (s3://..., file://...) — но не сами ресурсы.
Исключение для in_process_only плагинов: могут передавать живые объекты внутри одного вызова хука (например, streaming-плагин стримит AsyncIterator в SSE на клиент). Но checkpoint и progress всё равно сериализуемы — иначе resume после рестарта невозможен.
Контрактный тест: round-trip JSON-сериализация / десериализация / deep-equal для каждого hook-input/output. Отдельно проверяются типы, которые молча ломают JSON: datetime (ISO 8601), Decimal / BigInt (строка), set (→ массив), Enum (→ значение), UUID (→ строка).
3. Ресурсы через dependency injection
Плагин не создаёт долгоживущих ресурсов внутри себя. HTTP-клиент, БД-клиент, connection pool, blob store — всё это инъектируется хостом через PluginContext.resources.
Стандартные ресурсы (Phase 0):
| Ресурс | Назначение |
|---|---|
http_client | HTTP-клиент, предконфигурированный корпоративным CA bundle / TLS. |
tmpdir | Временная директория, cleanup по teardown(). |
blob_store | Абстрактный BlobStore (S3 / FS / в памяти). |
clock | Инъектируемый источник времени (freezable в тестах). |
rng | Инъектируемый источник случайности (seedable в тестах). |
Декларация в манифесте:
[plugin.resources]
required = ["http_client", "blob_store"]
optional = ["postgres"]
При несоответствии (required = ["postgres"], host не предоставляет) плагин помечается unavailable с внятным объяснением. При optional — просто None в ctx.resources.postgres, плагин обязан обработать.
4. Явная декларация execution_model
Плагин объявляет стиль исполнения в манифесте. Host подбирает executor.
| Значение | Семантика | Когда применять |
|---|---|---|
async | Async-aware (async/await в Python/TS, goroutines в Go); host вызывает в event loop / runtime. | I/O-bound — HTTP, БД. |
sync | Чистый синхронный, не блокирующий. | Быстрая трансформация (parse/format). |
thread_cpu_bound | Sync, CPU-bound; host ходит в thread pool / worker thread. | Re-ranking, небольшие ML-модели. |
process_cpu_bound | Sync, тяжёлый CPU; host выделяет отдельный процесс. | Большие ML-модели, тяжёлый парсинг. |
Контрактный тест: sync-плагин не делает блокирующих I/O (проверка через language-specific tooling); async-плагин не занимает CPU без явного yield.
5. Unit of Work для долгоживущих плагинов
Плагины с операциями на минуты+ декларируют единицу работы в манифесте:
[plugin.unit_of_work]
declared = true
partition_key = "tenant_id"
estimated_duration_sec = 600
idempotency_mode = "input_hash" # input_hash | output_hash | none
checkpointable = true
partition_key— ключ шардирования; orchestrator кладёт units одного partition в одну очередь.estimated_duration_sec— подсказка планировщику против блокировки очереди.idempotency_mode— см. инвариант 7.checkpointable = true— плагин умеет resume черезctx.checkpoint.
UoW-плагины вызываются оркестратором (kind = "orchestrator"), не напрямую ядром. Если внешний orchestrator не зарегистрирован — fallback на встроенный LocalExecutorOrchestrator (обязателен в каждой реализации).
6. Abstract Progress / Checkpoint sinks
Плагин публикует прогресс через абстрактный sink ctx.progress, не напрямую через WebSocket / SSE / файл лога.
Интерфейс ProgressSink:
ctx.progress.update(percent: float, message: str, payload: dict | None)
ctx.progress.event(event_type: str, payload: dict)
Реализация sink — ответственность хоста: в ядре in-process это может быть WebSocket-broadcast, в orchestrator-op — стрим в state, в unit-тесте — append в список для assertions.
Интерфейс CheckpointStore:
ctx.checkpoint.save(key: str, state: dict) → version_id
ctx.checkpoint.load(key: str) → (state: dict | None, version_id)
ctx.checkpoint.list(key_prefix: str) → list[(key, version_id)]
Контракт: save атомарен (write-then-rename для файлового backend; транзакция для БД), load после save всегда возвращает последний сохранённый state (read-after-write consistency), state — JSON-сериализуем (инвариант 2).
Контрактный тест: плагин запускается с in-memory CheckpointStore. Resume после смоделированного crash в произвольной точке прогресса даёт финальный результат, равный single-pass прогону.
7. Режимы идемпотентности
UoW-плагин декларирует idempotency_mode:
input_hash— повторный запуск с тем же input даёт тот же output. Hash входа = идентификатор результата. Orchestrator может пропустить повтор.output_hash— повторный запуск может дать новый output, но если hash совпадает с предыдущим — это один результат (для оптимизации downstream).none— каждый запуск — новый результат. Incremental rerun даёт duplicate (дедупликация на стороне потребителя).
Orchestrator перед запуском вычисляет input hash → проверяет registry на завершённый run с этим hash → если есть, пропускает и возвращает сохранённый output.
8. Детерминизм для output_hash
Плагин с idempotency_mode = "output_hash" обязан быть детерминистичным: тот же input + то же состояние ресурсов → тот же output.
Запрещено:
- Использовать внешнее
time.now()/Date.now()/time.Now()— толькоctx.clock.now(). - Использовать внешнее
random()/Math.random()/rand.Int()— толькоctx.rng.next(...). - Опираться на ordering в неупорядоченных коллекциях (Python
dictдо 3.7, Gomap, JS-объекты безMap) — нужна явная сортировка. - Полагаться на locale-зависимое поведение (сравнение строк, форматирование чисел).
Инъектируемые clock и rng позволяют тестировать с frozen-clock и seeded-rng — bit-equal output между прогонами. В production clock = системный, rng = OS — output корректный, но не bit-equal (это OK — output_hash считается по структуре результата, не по noise).
Пример: UoW-плагин с resources и checkpoint
- Python
- TypeScript
- Go
[plugin]
name = "repo-indexer"
kind = "pipeline"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
execution_model = "async"
[plugin.resources]
required = ["http_client", "blob_store", "clock"]
[plugin.unit_of_work]
declared = true
partition_key = "repo_id"
estimated_duration_sec = 1800
idempotency_mode = "input_hash"
checkpointable = true
from dagstack.plugin_system import PluginContext
class RepoIndexer:
async def setup(self, context: PluginContext) -> None:
self._http = context.resources.http_client
self._blobs = context.resources.blob_store
self._clock = context.resources.clock
self._progress = context.progress
self._checkpoint = context.checkpoint
async def index(self, repo_id: str) -> dict:
state, _ = self._checkpoint.load(key=f"repo-{repo_id}")
processed = state["processed_files"] if state else []
files = await self._list_files(repo_id)
for i, file in enumerate(f for f in files if f not in processed):
await self._process_file(file)
processed.append(file)
self._checkpoint.save(
key=f"repo-{repo_id}",
state={"processed_files": processed, "at": self._clock.now().isoformat()},
)
self._progress.update(
percent=(i + 1) / len(files),
message=f"Processed {i + 1}/{len(files)}",
payload={"current_file": file},
)
return {"repo_id": repo_id, "total": len(files)}
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
[plugin]
name = "repo-indexer"
kind = "pipeline"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
execution_model = "async"
[plugin.resources]
required = ["http_client", "blob_store", "clock"]
[plugin.unit_of_work]
declared = true
partition_key = "repo_id"
idempotency_mode = "input_hash"
checkpointable = true
package indexer
import (
"context"
"encoding/json"
"fmt"
"time"
pluginsystem "go.dagstack.dev/plugin-system"
)
// Local interfaces describe the shape the plugin needs from each named
// resource. The Go binding's Phase 1 Resources interface is open
// (`Get(name) (any, error)`); the typed catalogue lands in Phase 2, so
// plugins declare local interfaces and assert against ctx.Resources.Get.
type httpClient interface {
Get(ctx context.Context, url string) ([]byte, error)
}
type blobStore interface {
Put(ctx context.Context, key string, data []byte) error
}
type clock interface {
Now() time.Time
}
type RepoIndexer struct {
http httpClient
blobs blobStore
clock clock
progress pluginsystem.ProgressSink
checkpoint pluginsystem.CheckpointStore
}
func (p *RepoIndexer) Unwrap() any { return p }
func (p *RepoIndexer) Setup(ctx context.Context, pluginCtx *pluginsystem.PluginContext) error {
rawHTTP, err := pluginCtx.Resources.Get("http_client")
if err != nil { return err }
p.http, _ = rawHTTP.(httpClient)
rawBlobs, err := pluginCtx.Resources.Get("blob_store")
if err != nil { return err }
p.blobs, _ = rawBlobs.(blobStore)
rawClock, err := pluginCtx.Resources.Get("clock")
if err != nil { return err }
p.clock, _ = rawClock.(clock)
if p.http == nil || p.blobs == nil || p.clock == nil {
return fmt.Errorf("required resource does not satisfy expected interface")
}
p.progress = pluginCtx.Progress
p.checkpoint = pluginCtx.Checkpoint
return nil
}
func (p *RepoIndexer) Index(ctx context.Context, repoID string) (IndexResult, error) {
// ctx.checkpoint round-trips raw bytes; encode/decode JSON state at the
// boundary so the contract test can byte-compare across runs.
var state struct{ Processed []string `json:"processed_files"` }
if raw, err := p.checkpoint.Load("repo-" + repoID); err == nil && len(raw) > 0 {
_ = json.Unmarshal(raw, &state)
}
files, _ := p.listFiles(repoID)
for i, file := range filterNew(files, state.Processed) {
if err := p.processFile(file); err != nil {
return IndexResult{}, err
}
state.Processed = append(state.Processed, file)
payload, _ := json.Marshal(state)
_ = p.checkpoint.Save("repo-"+repoID, payload)
p.progress.Report(pluginsystem.Progress{
Current: i + 1,
Total: len(files),
Message: "Processed " + file,
})
}
return IndexResult{RepoID: repoID, Total: len(files)}, nil
}
Один и тот же код работает:
- из ядра in-process —
LocalExecutorOrchestratorзапускаетindex(repo_id="acme-core")в той же event loop; - из Dagster —
orchestrator-плагин для Dagster оборачиваетindexкак asset, partition =repo_id; - из Celery — та же обёртка для Celery, partition_key становится routing-key очереди;
- из unit-теста —
ctxсобирается вручную с in-memoryCheckpointStoreи frozenClock.
Новый kind: orchestrator
ADR-0003 вводит новый вид плагина — orchestrator. Он singleton, всегда in_process_only (держит состояние очереди, in-flight tracking, retry counters в памяти хоста; через MCP wire жить не может).
Хуки вида:
enqueue(plugin_name, args, idempotency_key?, partition_key?) → (unit_id, deduplicated)— поставить UoW в очередь, идемпотентно поidempotency_key.status(unit_id) → (state, started_at?, finished_at?, progress?, error?)— текущий статус.backfill(plugin_name?, partition_key?, since?, until?, states?) → (enqueued_count, skipped_count)— повторная постановка failed / expired units.
Встроенная реализация: LocalExecutorOrchestrator — обязательный in-tree плагин каждой реализации. Запускает enqueue-ed UoW в том же event loop / thread pool, очередь в памяти (или персистный JSON в Phase 0). Реальные очереди (Postgres-backed / Redis-backed) — внешние оркестраторы в Phase 1+ через Dagster-wrapper, Celery-wrapper и т.д.
Последствия
Положительные:
- Плагин одинаково работает в четырёх средах без условного кода.
- Resume долгоживущих UoW после crash — через
ctx.checkpoint, без переписывания логики плагина. - Добавление интеграции с Dagster/Celery/Airflow — это новый orchestrator-плагин, существующие плагины не трогаются.
- Unit-тесты дёшевы: альтернативные ресурсы (frozen clock, in-memory checkpoint) дают воспроизводимые прогоны.
Компромиссы:
- Цена дисциплины. Авторы плагинов должны соблюдать все восемь инвариантов. Контрактные тесты ловят основное, но не всё (внешнее время через стороннюю библиотеку требует tracing). Нужна культура ревью.
- Phase 0 не даёт реальной DI. В первой фазе
Resources,ProgressSink,CheckpointStore— Protocol-стабы; плагины, объявившие их, могут получитьNoneилиNotImplementedErrorпри доступе. Смягчается защитными warnings в реестре при несоответствии. output_hashдетерминизм — строгий контракт. Нарушается случайно (итерацияset, сторонние нативные библиотеки без frozen-clock). Контрактный тест ловит очевидное, но полная гарантия требует ручного ревью.
Что запрещено этим ADR:
- Чтение внешнего окружения (
os.environ,Date.now(), текущий working directory) в runtime-фазе. - Создание ресурсов внутри плагина (
httpx.AsyncClient()в конструкторе) — должны инъектироваться. - Прямой вызов WebSocket / SSE / файлового лога из плагина для публикации прогресса — только через
ctx.progress.
Связанные ADR
- ADR-0001 —
PluginManifestиPluginContext, которые ADR-0003 расширяет полямиexecution_model,resources,unit_of_work, а такжеprogress/checkpointвPluginContext. - ADR-0002 — lifecycle-ordering согласован с инвариантами (особенно 4 — sync vs async).
- ADR-0004 —
execution_modelфиксируется в hookspec-YAML и эмитируется в типы реализаций.
Нормативный источник
Полный текст ADR-0003 с формальными определениями каждого инварианта, контрактными тестами, Phase plan для реализации и обсуждением open questions: plugin-system-spec/adr/0003-orchestration-neutral-runtime.md.