# ADR-0003 · Orchestration-neutral runtime
Status: accepted v1.0 (2026-04-16) · Full normative text
## Why these invariants
ADR-0001 introduced three runtime adapters (in_process / mcp_stdio / mcp_http) and implies the principle that "a plugin does not know where it is executed." The concrete constraints that make this principle workable, however, are not pinned down. As long as the only caller is the application core in the same process, violations stay invisible: an ambient event loop, shared singletons, and local filesystem paths all just work.
Two scenarios will break this property unless the invariants are fixed up front:
- Scale to 10⁴+ partitions — when a single plugin processes thousands of independent units of work (per-tenant, per-repo), an orchestrator with queues, retries, backfill, and audit becomes necessary.
- External orchestrator as a plugin kind — Dagster, Celery, Airflow, k8s Job all integrate through the same plugin-system. The plugin runs in a separate process / pod, progress is streamed through an abstract sink, and checkpoints land in Postgres.
If a plugin relies on an ambient event loop or process-level singletons, moving from the in-process core to an orchestrator environment forces every plugin to be rewritten. ADR-0003 makes the discipline normative before such plugins appear — so they need not be rewritten later.
## Eight invariants
The full table. Details for each follow below.
| # | Invariant | What it checks |
|---|---|---|
| 1 | Orchestration-neutrality | The plugin does not read ambient host state (event loop, singletons, working directory). |
| 2 | Serialisable boundaries | Hook inputs and outputs round-trip through JSON. |
| 3 | Resources via DI | HTTP clients, DB connections, tmpdir are injected through PluginContext.resources. |
| 4 | Explicit execution_model declaration | The plugin declares sync / async / thread_cpu_bound / process_cpu_bound. |
| 5 | Unit of Work for long-running plugins | Plugins running for minutes+ declare partition_key, idempotency_mode, checkpointable. |
| 6 | Abstract progress / checkpoint sinks | Progress and checkpoints travel through abstract interfaces, not WebSocket / file directly. |
| 7 | Idempotency modes | input_hash / output_hash / none, used by orchestrator skip logic. |
| 8 | Determinism for output_hash plugins | Time and randomness flow through ctx.clock / ctx.rng, never through the ambient host. |
### 1. Orchestration-neutrality — ban on ambient host state
Forbidden:
- Relying on the current event loop / executor / runtime context at initialisation time.
- Reading or writing process-level singletons (thread-locals, module-level mutable state, "the global client").
- Using the filesystem outside paths injected through `PluginContext`.
- Reading environment variables outside `setup()` (read once and cache: OK; read on the fly: no, because in distributed scenarios different workers see different values).
- Relying on the current working directory.
Result: the plugin behaves identically when invoked from the in-process core, from an orchestrator op in a separate process with a fresh event loop, from a task-queue worker with no event loop, and from a unit test in a sync context.
Contract test: the plugin runs in three hosts — in_process_host, forked_process_host, fresh_event_loop_host — with identical input. Results MUST compare equal.
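A minimal sketch of such a harness, assuming POSIX `fork` and a hypothetical `run_plugin` entry point (the host names come from the test description above; everything else is illustrative):

```python
import asyncio
import multiprocessing


def run_plugin(payload: dict) -> dict:
    """Stand-in for a hook invocation on an orchestration-neutral plugin."""
    return {"doubled": [x * 2 for x in payload["values"]]}


def in_process_host(payload: dict) -> dict:
    return run_plugin(payload)


def forked_process_host(payload: dict) -> dict:
    # A fresh process: no inherited event loop, singletons, or cwd tricks.
    with multiprocessing.get_context("fork").Pool(1) as pool:
        return pool.apply(run_plugin, (payload,))


def fresh_event_loop_host(payload: dict) -> dict:
    # A brand-new event loop, as an orchestrator op would provide.
    async def call() -> dict:
        return run_plugin(payload)

    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(call())
    finally:
        loop.close()


payload = {"values": [1, 2, 3]}
results = [h(payload) for h in (in_process_host, forked_process_host, fresh_event_loop_host)]
assert results[0] == results[1] == results[2]  # the invariant under test
```

A plugin that cached an event loop or mutated module-level state in one host would fail the final equality check.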
### 2. Serialisable boundaries
Anything that crosses a plugin boundary (hook input, hook output, checkpoint, progress event) MUST be round-trip JSON-serialisable.
Forbidden: live HTTP/DB clients, session objects, connection pools, open file descriptors, native-library handles (C extensions), async generators / coroutines / threads / channels, closures, bound methods, any non-serialisable callables.
Allowed: structured data validatable through JSON Schema (Python pydantic / TS zod / Go struct tag), primitives, arrays / dictionaries of those, bytes (up to the ≤10MB MVP limit), references to external resources (s3://..., file://...) — but not the resources themselves.
Exception for in_process_only plugins: live objects MAY be passed within a single hook invocation (for example, a streaming plugin emits an AsyncIterator to SSE on the client). Checkpoints and progress remain serialisable regardless — otherwise resume after a restart is impossible.
Contract test: round-trip JSON serialise / deserialise / deep-equal for every hook input/output. Special checks for types that silently break JSON: datetime (ISO 8601), Decimal / BigInt (string), set (→ array), Enum (→ value), UUID (→ string).
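A sketch of that check in Python, with the normalisations the test lists for JSON-fragile types (the `normalise` helper name is illustrative, not part of the spec):

```python
import json
import uuid
from datetime import datetime, timezone
from decimal import Decimal
from enum import Enum


def normalise(value):
    """Coerce JSON-fragile types into the wire forms the contract test expects."""
    if isinstance(value, datetime):
        return value.isoformat()                        # ISO 8601
    if isinstance(value, Decimal):
        return str(value)                               # string, never float
    if isinstance(value, set):
        return sorted(normalise(v) for v in value)      # deterministic array
    if isinstance(value, Enum):
        return normalise(value.value)                   # the underlying value
    if isinstance(value, uuid.UUID):
        return str(value)
    if isinstance(value, dict):
        return {k: normalise(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [normalise(v) for v in value]
    return value


def assert_round_trips(payload) -> None:
    wire = normalise(payload)
    assert json.loads(json.dumps(wire)) == wire  # serialise / deserialise / deep-equal


class State(Enum):
    DONE = "done"


assert_round_trips({
    "at": datetime(2026, 4, 16, tzinfo=timezone.utc),
    "amount": Decimal("19.99"),
    "tags": {"b", "a"},
    "state": State.DONE,
    "unit": uuid.UUID("00000000-0000-0000-0000-000000000001"),
})
```

Without the normalisation step, `json.dumps` would either raise (`set`, `UUID`, `datetime`) or silently lose precision (`Decimal` as float), which is exactly what the contract test exists to catch.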
### 3. Resources via dependency injection
A plugin MUST NOT create long-lived resources internally. HTTP clients, DB clients, connection pools, blob stores — all of these are injected by the host through PluginContext.resources.
Standard resources (Phase 0):
| Resource | Purpose |
|---|---|
| `http_client` | HTTP client preconfigured with the corporate CA bundle / TLS. |
| `tmpdir` | Temporary directory; cleaned up on `teardown()`. |
| `blob_store` | Abstract `BlobStore` (S3 / FS / in-memory). |
| `clock` | Injectable time source (freezable in tests). |
| `rng` | Injectable randomness source (seedable in tests). |
Manifest declaration:
```toml
[plugin.resources]
required = ["http_client", "blob_store"]
optional = ["postgres"]
```
If the contract is not met (for example `required = ["postgres"]` but the host does not provide it), the plugin is marked unavailable with a clear explanation. For optional resources, `ctx.resources.postgres` is simply `None`, and the plugin MUST handle that.
### 4. Explicit `execution_model` declaration
The plugin declares its execution style in the manifest. The host picks the executor.
| Value | Semantics | When to use |
|---|---|---|
async | Async-aware (async/await in Python/TS, goroutines in Go); the host calls into the event loop / runtime. | I/O-bound — HTTP, DB. |
sync | Pure synchronous, non-blocking. | Fast transformations (parse/format). |
thread_cpu_bound | Sync, CPU-bound; the host runs it on a thread pool / worker thread. | Re-ranking, small ML models. |
process_cpu_bound | Sync, heavy CPU; the host allocates a separate process. | Large ML models, heavy parsing. |
Contract test: a sync plugin MUST NOT perform blocking I/O (verified through language-specific tooling); an async plugin MUST NOT occupy the CPU without an explicit yield.
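How a host might route a hook call based on the declared model can be sketched as follows (the `dispatch` helper is illustrative; the real dispatch lives inside the host):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


async def dispatch(execution_model: str, fn, *args):
    """Route a hook call to the executor implied by its manifest declaration."""
    loop = asyncio.get_running_loop()
    if execution_model == "async":
        return await fn(*args)                    # coroutine on this event loop
    if execution_model == "sync":
        return fn(*args)                          # fast and non-blocking: call inline
    if execution_model == "thread_cpu_bound":
        with ThreadPoolExecutor() as pool:        # keep the event loop responsive
            return await loop.run_in_executor(pool, fn, *args)
    if execution_model == "process_cpu_bound":
        with ProcessPoolExecutor(1) as pool:      # separate process for heavy CPU
            return await loop.run_in_executor(pool, fn, *args)
    raise ValueError(f"unknown execution_model: {execution_model}")


def _double(x: int) -> int:
    return x * 2


assert asyncio.run(dispatch("sync", _double, 21)) == 42
assert asyncio.run(dispatch("thread_cpu_bound", _double, 21)) == 42
```

The point of the declaration is exactly this: the plugin supplies `fn`, the host alone decides which branch runs it.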
### 5. Unit of Work for long-running plugins
Plugins with operations that take minutes+ declare a unit of work in the manifest:
```toml
[plugin.unit_of_work]
declared = true
partition_key = "tenant_id"
estimated_duration_sec = 600
idempotency_mode = "input_hash" # input_hash | output_hash | none
checkpointable = true
```
- `partition_key`: sharding key; the orchestrator places units of one partition into a single queue.
- `estimated_duration_sec`: a hint to the scheduler against head-of-line blocking.
- `idempotency_mode`: see invariant 7.
- `checkpointable = true`: the plugin can resume through `ctx.checkpoint`.
UoW plugins are invoked by the orchestrator (kind = "orchestrator"), not directly by the core. If no external orchestrator is registered, execution falls back to the built-in LocalExecutorOrchestrator (mandatory in every implementation).
### 6. Abstract progress / checkpoint sinks
A plugin publishes progress through the abstract sink ctx.progress, never directly through a WebSocket / SSE / log file.
ProgressSink interface:
```
ctx.progress.update(percent: float, message: str, payload: dict | None)
ctx.progress.event(event_type: str, payload: dict)
```
The sink implementation is the host's responsibility: in the in-process core it MAY be a WebSocket broadcast, in an orchestrator op a stream into state, in a unit test an append to a list for assertions.
CheckpointStore interface:
```
ctx.checkpoint.save(key: str, state: dict) → version_id
ctx.checkpoint.load(key: str) → (state: dict | None, version_id)
ctx.checkpoint.list(key_prefix: str) → list[(key, version_id)]
```
Contract: save is atomic (write-then-rename for a file-backed store; transactional for a database), load after save always returns the latest saved state (read-after-write consistency), state is JSON-serialisable (invariant 2).
Contract test: the plugin runs against an in-memory CheckpointStore. Resume after a simulated crash at an arbitrary point in progress yields a final result equal to a single-pass run.
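A file-backed store satisfying the write-then-rename contract could be sketched as follows (a minimal sketch; the class name and integer versioning scheme are assumptions, not part of the spec):

```python
import json
import os
import tempfile


class FileCheckpointStore:
    """Atomic save via write-to-temp + os.replace; read-after-write by construction."""

    def __init__(self, root: str) -> None:
        self.root = root
        os.makedirs(root, exist_ok=True)

    def _path(self, key: str) -> str:
        return os.path.join(self.root, f"{key}.json")

    def save(self, key: str, state: dict) -> str:
        _, old = self.load(key)
        version = (old or 0) + 1
        fd, tmp = tempfile.mkstemp(dir=self.root)
        with os.fdopen(fd, "w") as f:
            json.dump({"version": version, "state": state}, f)
        os.replace(tmp, self._path(key))  # atomic on POSIX: a reader never sees a torn file
        return str(version)

    def load(self, key: str):
        try:
            with open(self._path(key)) as f:
                doc = json.load(f)
            return doc["state"], doc["version"]
        except FileNotFoundError:
            return None, None

    def list(self, key_prefix: str):
        out = []
        for name in sorted(os.listdir(self.root)):
            if name.endswith(".json") and name.startswith(key_prefix):
                key = name[: -len(".json")]
                out.append((key, self.load(key)[1]))
        return out
```

`os.replace` gives the atomicity clause for free on POSIX filesystems; a database-backed store would use a transaction instead, as the contract notes.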
### 7. Idempotency modes
A UoW plugin declares idempotency_mode:
- `input_hash`: a re-run with the same input produces the same output. The hash of the input is the result identifier. The orchestrator MAY skip the repeat.
- `output_hash`: a re-run MAY produce a new output, but if the hash matches the previous one, it is treated as a single result (a downstream optimisation).
- `none`: every run is a new result. An incremental rerun yields a duplicate (deduplication is the consumer's responsibility).
Before launching a unit, the orchestrator computes the input hash → checks the registry for a completed run with that hash → if one exists, skips it and returns the saved output.
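That check can be sketched with canonical-JSON hashing (sorted keys, fixed separators, so semantically equal inputs hash equally; the registry shape is an assumption):

```python
import hashlib
import json


def input_hash(args: dict) -> str:
    """Canonical JSON → stable SHA-256, so key order does not change the hash."""
    canonical = json.dumps(args, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()


def run_or_skip(registry: dict, args: dict, run) -> tuple[dict, bool]:
    """registry maps input hash → saved output of a completed run."""
    h = input_hash(args)
    if h in registry:
        return registry[h], True          # skip: return the saved output
    output = run(args)
    registry[h] = output
    return output, False


registry: dict = {}
out1, skipped1 = run_or_skip(registry, {"repo_id": "acme"}, lambda a: {"files": 10})
out2, skipped2 = run_or_skip(registry, {"repo_id": "acme"}, lambda a: {"files": 99})
assert (skipped1, skipped2) == (False, True) and out2 == {"files": 10}
```

Note how the second run's callable never executes: the saved output wins, which is exactly the `input_hash` contract.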
### 8. Determinism for `output_hash`
A plugin with idempotency_mode = "output_hash" MUST be deterministic: the same input + the same resource state → the same output.
Forbidden:
- Using ambient `time.now()` / `Date.now()` / `time.Now()`; only `ctx.clock.now()`.
- Using ambient `random()` / `Math.random()` / `rand.Int()`; only `ctx.rng.next(...)`.
- Relying on iteration order in unordered collections (Python `dict` before 3.7, Go `map`, JS objects without `Map`); explicit sorting is required.
- Relying on locale-dependent behaviour (string comparison, number formatting).
Injectable clock and rng allow testing with a frozen clock and seeded rng — bit-equal output between runs. In production clock is the system clock, rng is the OS source — output is correct but not bit-equal between runs (this is fine — output_hash is computed from the structure of the result, not from the noise).
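A frozen clock and seeded rng in a test might be sketched like this (class and method names are illustrative; only the `ctx.clock.now()` / `ctx.rng.next(...)` call shape comes from the text above):

```python
import random
from datetime import datetime, timezone


class FrozenClock:
    """Test double for ctx.clock: always returns the same instant."""

    def __init__(self, at: datetime) -> None:
        self._at = at

    def now(self) -> datetime:
        return self._at


class SeededRng:
    """Test double for ctx.rng: a deterministic, seeded stream."""

    def __init__(self, seed: int) -> None:
        self._rng = random.Random(seed)

    def next(self, upper: int) -> int:
        return self._rng.randrange(upper)


def sample(ctx_clock, ctx_rng) -> dict:
    # All time and randomness flows through the injected sources (invariant 8).
    return {"at": ctx_clock.now().isoformat(), "pick": ctx_rng.next(100)}


a = sample(FrozenClock(datetime(2026, 4, 16, tzinfo=timezone.utc)), SeededRng(7))
b = sample(FrozenClock(datetime(2026, 4, 16, tzinfo=timezone.utc)), SeededRng(7))
assert a == b  # bit-equal output between runs
```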
## Example: a UoW plugin with resources and checkpoint
**Python**
```toml
[plugin]
name = "repo-indexer"
kind = "pipeline"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
execution_model = "async"

[plugin.resources]
required = ["http_client", "blob_store", "clock"]

[plugin.unit_of_work]
declared = true
partition_key = "repo_id"
estimated_duration_sec = 1800
idempotency_mode = "input_hash"
checkpointable = true
```
```python
from dagstack.plugin_system import PluginContext


class RepoIndexer:
    async def setup(self, context: PluginContext) -> None:
        self._http = context.resources.http_client
        self._blobs = context.resources.blob_store
        self._clock = context.resources.clock
        self._progress = context.progress
        self._checkpoint = context.checkpoint

    async def index(self, repo_id: str) -> dict:
        # Resume from the last checkpoint, if any (invariants 5 and 6).
        state, _ = self._checkpoint.load(key=f"repo-{repo_id}")
        processed = state["processed_files"] if state else []
        files = await self._list_files(repo_id)
        for file in (f for f in files if f not in processed):
            await self._process_file(file)
            processed.append(file)
            self._checkpoint.save(
                key=f"repo-{repo_id}",
                state={"processed_files": processed, "at": self._clock.now().isoformat()},
            )
            # len(processed) counts resumed + new files, so progress stays
            # correct after a restart.
            self._progress.update(
                percent=len(processed) / len(files),
                message=f"Processed {len(processed)}/{len(files)}",
                payload={"current_file": file},
            )
        return {"repo_id": repo_id, "total": len(files)}
```
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
**Go**

```toml
[plugin]
name = "repo-indexer"
kind = "pipeline"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
execution_model = "async"

[plugin.resources]
required = ["http_client", "blob_store", "clock"]

[plugin.unit_of_work]
declared = true
partition_key = "repo_id"
idempotency_mode = "input_hash"
checkpointable = true
```
```go
package indexer

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	pluginsystem "go.dagstack.dev/plugin-system"
)

// Local interfaces describe the shape the plugin needs from each named
// resource. The Go binding's Phase 1 Resources interface is open
// (`Get(name) (any, error)`); the typed catalogue lands in Phase 2, so
// plugins declare local interfaces and assert against ctx.Resources.Get.
type httpClient interface {
	Get(ctx context.Context, url string) ([]byte, error)
}

type blobStore interface {
	Put(ctx context.Context, key string, data []byte) error
}

type clock interface {
	Now() time.Time
}

// IndexResult is the JSON-serialisable hook output (invariant 2).
type IndexResult struct {
	RepoID string `json:"repo_id"`
	Total  int    `json:"total"`
}

type RepoIndexer struct {
	http       httpClient
	blobs      blobStore
	clock      clock
	progress   pluginsystem.ProgressSink
	checkpoint pluginsystem.CheckpointStore
}

func (p *RepoIndexer) Unwrap() any { return p }

func (p *RepoIndexer) Setup(ctx context.Context, pluginCtx *pluginsystem.PluginContext) error {
	rawHTTP, err := pluginCtx.Resources.Get("http_client")
	if err != nil {
		return err
	}
	p.http, _ = rawHTTP.(httpClient)
	rawBlobs, err := pluginCtx.Resources.Get("blob_store")
	if err != nil {
		return err
	}
	p.blobs, _ = rawBlobs.(blobStore)
	rawClock, err := pluginCtx.Resources.Get("clock")
	if err != nil {
		return err
	}
	p.clock, _ = rawClock.(clock)
	if p.http == nil || p.blobs == nil || p.clock == nil {
		return fmt.Errorf("required resource does not satisfy expected interface")
	}
	p.progress = pluginCtx.Progress
	p.checkpoint = pluginCtx.Checkpoint
	return nil
}

// listFiles and processFile are the plugin's own I/O and are omitted here.
func (p *RepoIndexer) Index(ctx context.Context, repoID string) (IndexResult, error) {
	// ctx.checkpoint round-trips raw bytes; encode/decode JSON state at the
	// boundary so the contract test can byte-compare across runs.
	var state struct {
		Processed []string `json:"processed_files"`
	}
	if raw, err := p.checkpoint.Load("repo-" + repoID); err == nil && len(raw) > 0 {
		_ = json.Unmarshal(raw, &state)
	}
	files, err := p.listFiles(repoID)
	if err != nil {
		return IndexResult{}, err
	}
	for _, file := range filterNew(files, state.Processed) {
		if err := p.processFile(file); err != nil {
			return IndexResult{}, err
		}
		state.Processed = append(state.Processed, file)
		payload, _ := json.Marshal(state)
		_ = p.checkpoint.Save("repo-"+repoID, payload)
		// len(state.Processed) counts resumed + new files, so progress
		// stays correct after a restart.
		p.progress.Report(pluginsystem.Progress{
			Current: len(state.Processed),
			Total:   len(files),
			Message: "Processed " + file,
		})
	}
	return IndexResult{RepoID: repoID, Total: len(files)}, nil
}

// filterNew returns the files that have not been processed yet.
func filterNew(files, processed []string) []string {
	seen := make(map[string]bool, len(processed))
	for _, f := range processed {
		seen[f] = true
	}
	out := make([]string, 0, len(files))
	for _, f := range files {
		if !seen[f] {
			out = append(out, f)
		}
	}
	return out
}
```
The same code runs:
- from the in-process core: `LocalExecutorOrchestrator` invokes `index(repo_id="acme-core")` on the same event loop;
- from Dagster: the Dagster `orchestrator` plugin wraps `index` as an asset, with partition = `repo_id`;
- from Celery: the same wrapper for Celery, where `partition_key` becomes the routing key of the queue;
- from a unit test: `ctx` is assembled by hand with an in-memory `CheckpointStore` and a frozen `Clock`.
## New kind: `orchestrator`
ADR-0003 introduces a new plugin kind — orchestrator. It is a singleton, always in_process_only (it holds queue state, in-flight tracking, and retry counters in host memory; it cannot live behind an MCP wire).
Hooks:
- `enqueue(plugin_name, args, idempotency_key?, partition_key?) → (unit_id, deduplicated)`: place a UoW into the queue, idempotent on `idempotency_key`.
- `status(unit_id) → (state, started_at?, finished_at?, progress?, error?)`: report current status.
- `backfill(plugin_name?, partition_key?, since?, until?, states?) → (enqueued_count, skipped_count)`: re-enqueue failed / expired units.
Built-in implementation: LocalExecutorOrchestrator — a mandatory in-tree plugin in every implementation. It runs enqueued UoW on the same event loop / thread pool, with the queue held in memory (or as persistent JSON in Phase 0). Real queues (Postgres-backed / Redis-backed) are external orchestrators in Phase 1+ via a Dagster wrapper, a Celery wrapper, and so on.
## Consequences
Positive:
- The plugin behaves identically across four environments with no conditional code.
- Resume of a long-running UoW after a crash flows through `ctx.checkpoint`, with no rewrite of the plugin's logic.
- Adding a Dagster / Celery / Airflow integration is a new orchestrator plugin; existing plugins are not touched.
- Unit tests are cheap: alternative resources (frozen clock, in-memory checkpoint) yield reproducible runs.
Trade-offs:
- The discipline tax. Plugin authors MUST honour all eight invariants. Contract tests catch the basics, but not everything (ambient time accessed through a third-party library requires tracing). A review culture is required.
- Phase 0 does not deliver real DI. In the first phase `Resources`, `ProgressSink`, and `CheckpointStore` are Protocol stubs; plugins that declare them MAY receive `None` or `NotImplementedError` on access. Mitigated by defensive warnings in the registry on a contract mismatch.
- `output_hash` determinism is a strict contract that is easy to break accidentally (iteration over a `set`, third-party native libraries unaware of a frozen clock). The contract test catches the obvious cases, but a full guarantee requires manual review.
What this ADR forbids:
- Reading the ambient environment (`os.environ`, `Date.now()`, the current working directory) in the runtime phase.
- Creating resources inside the plugin (`httpx.AsyncClient()` in the constructor); they MUST be injected.
- Publishing progress directly from a plugin to a WebSocket / SSE / log file; only `ctx.progress` may be used.
## Related ADRs
- ADR-0001: `PluginManifest` and `PluginContext`, which ADR-0003 extends with the fields `execution_model`, `resources`, and `unit_of_work`, plus `progress` / `checkpoint` in `PluginContext`.
- ADR-0002: lifecycle ordering aligned with the invariants (especially invariant 4, sync vs async).
- ADR-0004: `execution_model` is fixed in the hookspec YAML and emitted into the implementation types.
## Normative source
The full text of ADR-0003 with formal definitions of every invariant, contract tests, an implementation phase plan, and a discussion of open questions: `plugin-system-spec/adr/0003-orchestration-neutral-runtime.md`.