Runtime-инварианты
Plugin-system гарантирует, что один и тот же плагин работает одинаково:
- в приложении in-process (FastAPI, Django, обычный Python/Node/Go-сервис);
- в orchestrator-op (Dagster asset, Airflow task);
- в task-queue worker (Celery, RQ);
- в unit-тесте (с mock-ресурсами).
Это свойство не случайное. Оно возникает только если плагин соблюдает восемь runtime-инвариантов, зафиксированных в ADR-0003. Если хоть один нарушен — плагин перестаёт работать при переходе из одной среды в другую.
Страница-обзор, как применять каждый инвариант на практике. Нормативный контракт — в ADR-0003.
Сводная таблица
| № | Инвариант | Как проверяется |
|---|---|---|
| 1 | Orchestration-neutrality (запрет внешнего состояния среды) | Автоматический тест в трёх хостах. |
| 2 | Сериализуемые границы | Round-trip JSON serialize для каждого входа/выхода. |
| 3 | Ресурсы через DI | Проверка, что плагин не создаёт HTTP/DB-клиентов сам. |
| 4 | Явный execution_model | Декларация в манифесте; runtime-детектор блокирующих I/O в sync-плагинах. |
| 5 | Unit of Work для долгоживущих | Декларация в манифесте. |
| 6 | Абстрактные ProgressSink / CheckpointStore | Отсутствие прямых импортов WebSocket / файлового I/O. |
| 7 | Режимы идемпотентности (input_hash / output_hash / none) | Декларация в манифесте + повторный запуск с тем же input. |
| 8 | Детерминизм для output_hash | Двойной запуск с frozen-clock и seeded-rng, bit-equal проверка. |
1. Orchestration-neutrality
Правило: плагин не опирается на внешнее состояние среды хоста — текущий event loop, process-синглтоны, locale, working directory.
Что запрещено:
- Полагаться на
asyncio.get_event_loop()/Tokio::current()/ эквиваленты в конструкторе илиsetup(). - Читать/писать process-level синглтоны (thread-locals, module-mutable state, «глобальный клиент»).
- Использовать файловую систему за пределами
ctx.resources.tmpdir. - Читать
os.environв runtime-фазе (читать вsetup()и кэшировать — OK). - Полагаться на
os.getcwd().
Как исправить нарушения:
- Python
- TypeScript
- Go
# ✗ Bad — capturing an external event loop.
class BadPlugin:
def __init__(self):
self.loop = asyncio.get_event_loop() # breaks inside an orchestrator
# ✓ Good — never hold a reference to the loop.
class GoodPlugin:
def __init__(self):
pass # lazy-init; everything happens in setup()
async def setup(self, ctx):
# asyncio.get_running_loop() inside an async function returns the current run's loop — fine.
pass
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
// ✗ Bad — global state.
var cachedPool *sql.DB
// ✓ Good — through Resources DI.
type GoodPlugin struct {
db DBClient // user-defined interface
}
func (p *GoodPlugin) Unwrap() any { return p }
func (p *GoodPlugin) Setup(ctx context.Context, pluginCtx *pluginsystem.PluginContext) error {
raw, err := pluginCtx.Resources.Get("postgres")
if err != nil {
return err
}
p.db = raw.(DBClient)
return nil
}
2. Сериализуемые границы
Правило: всё, что пересекает границу плагина (вход хука, выход хука, checkpoint, progress-событие), должно быть JSON-сериализуемо.
Что нельзя передавать:
- Живые HTTP/DB-клиенты, session-объекты, connection pools.
- Открытые файловые дескрипторы.
- Async-генераторы, корутины, потоки, каналы.
- Замыкания и методы объектов.
- C-extension native handles.
Что можно:
- Структурные данные, описуемые JSON Schema.
- Примитивы (строки, числа, bool, null).
- Массивы и словари из них.
- Bytes (до ≤10 МБ в v1.0 MVP).
- Ссылки на внешние ресурсы в виде URL:
s3://bucket/key,file://path.
Типичные «молчаливо ломают JSON» случаи:
| Тип | Как сериализовать |
|---|---|
datetime | ISO 8601 строка ("2026-01-01T12:00:00Z") |
Decimal / BigInt | строка |
set | массив |
Enum | value enum |
UUID | строка ("a1b2c3...") |
Исключение для in_process_only плагинов: могут передавать живые объекты (например, stream-объекты) внутри одного вызова хука. Но checkpoint и progress всё равно сериализуемы.
3. Ресурсы через DI
Правило: HTTP-клиенты, БД-соединения, connection pools — инъектируются через ctx.resources, не создаются внутри плагина.
Детали — на отдельной странице «Ресурсы (Resources DI)».
4. Явный execution_model
Правило: плагин декларирует в манифесте стиль исполнения. Host подбирает подходящий executor.
execution_model = "async"
# или
execution_model = "sync"
execution_model = "thread_cpu_bound"
execution_model = "process_cpu_bound"
Последствия:
sync-плагин не должен блокироваться на I/O — host проверит через специализированный тулинг (в Python —blockbuster).async-плагин не должен занимать CPU без явного yield — блокирует event loop.thread_cpu_bound-плагин ходит в thread-pool, не в event-loop.process_cpu_bound— отдельный процесс (для очень тяжёлых задач).
5. Unit of Work для долгоживущих плагинов
Правило: плагин, чья операция занимает минуты+, декларирует параметры UoW в манифесте:
[plugin.unit_of_work]
declared = true
partition_key = "tenant_id"
estimated_duration_sec = 600
idempotency_mode = "input_hash"
checkpointable = true
partition_key— ключ шардирования; orchestrator параллелит units по этому ключу.idempotency_mode— см. инвариант 7.checkpointable = true— плагин сохраняет прогресс черезctx.checkpointи умеет resume.
UoW-плагины вызываются оркестратором (kind = "orchestrator"), не напрямую приложением.
6. Абстрактные ProgressSink / CheckpointStore
Правило: плагин публикует прогресс через абстрактный ctx.progress, не напрямую через WebSocket / SSE / файл лога. Аналогично ctx.checkpoint для сохранения/восстановления состояния.
- Python
- TypeScript
- Go
# ✗ Bad — a direct dependency on WebSocket.
class BadPlugin:
async def process(self):
await broadcast_to_websocket({"event": "progress", "percent": 50})
# ✓ Good — through the abstract sink.
class GoodPlugin:
async def process(self):
self._progress.update(percent=0.5, message="Halfway there")
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
func (p *GoodPlugin) Process() error {
p.progress.Update(0.5, "Halfway there", nil)
return nil
}
Host подставляет разные реализации sink:
- В web-приложении — WebSocket-broadcast.
- В orchestrator-op — stream в state оркестратора.
- В unit-тесте — append в массив для assertions.
7. Режимы идемпотентности
Правило: UoW-плагин декларирует idempotency_mode:
| Режим | Семантика | Когда применять |
|---|---|---|
input_hash | Повтор с тем же input → тот же output. Orchestrator пропускает повтор. | Индексация, обогащение данных. |
output_hash | Повтор может дать новый output, но если hash совпал — это один результат. | Идемпотентное обновление downstream по hash результата. |
none | Каждый запуск — новый результат. Дедупликация — ответственность consumer. | Notifications, fire-and-forget. |
8. Детерминизм для output_hash
Правило: плагин с idempotency_mode = "output_hash" обязан быть детерминистичным — тот же input + тот же state ресурсов → тот же output.
Что запрещено:
time.now()/Date.now()/time.Now()— толькоctx.resources.clock.now().random()/Math.random()/rand.Int()— толькоctx.resources.rng.next(...).- Итерация по
setбез сортировки. - Итерация по неупорядоченному
dict/map. - Locale-зависимое форматирование.
Как проверить детерминизм: прогоните плагин дважды с frozen-clock и seeded-rng, сравните outputs byte-by-byte. Если различаются — какой-то источник недетерминизма не через DI.
Как контрактная рамка тестов проверяет инварианты
В Python-биндинге (dagstack-plugin-system >= 0.2) есть встроенная рамка контрактных тестов. Эквивалентные API в TypeScript / Go реализациях появятся при их выходе в stable. Ниже — Python-пример:
from dagstack.plugin_system import (
run_contract_suite,
assert_no_ambient_state,
assert_json_serializable_boundaries,
assert_lifecycle_clean,
assert_deterministic,
assert_manifest_valid,
ALL_CHECKS,
)
def test_my_plugin_contracts():
manifest = load_manifest("plugins/my-plugin/dagstack.toml")
result = run_contract_suite(
plugin_class=MyPlugin,
manifest=manifest,
checks=ALL_CHECKS,
)
assert result.ok, result.format_failures()
Каждая проверка покрывает один или несколько инвариантов:
| Проверка | Инварианты |
|---|---|
assert_manifest_valid | 4, 5, 7 (декларации в манифесте) |
assert_no_ambient_state | 1 |
assert_json_serializable_boundaries | 2 |
assert_lifecycle_clean | 3 (Resources DI — не утекают ресурсы) |
assert_deterministic | 8 |
Детали запуска — на странице Руководство: Тестирование плагинов.
См. также
- ADR-0003: Orchestration-neutral runtime — нормативный контракт каждого инварианта.
- Ресурсы (Resources DI) — инвариант 3 в деталях.
- Жизненный цикл плагина — где именно применяются инварианты в lifecycle.
- Руководство: Написать плагин — walkthrough с запуском контрактных тестов.