Dispatch
When an application makes a call through plugin-system — for example, "find documents relevant to this query" — which plugin actually handles the call? The answer depends on the dispatch class of the plugin kind.
Plugin-system fixes five classes — normatively, as a closed enumeration (see ADR-0002 for the formal contract). Each plugin kind declares one of the five classes for every hook in its hookspec.
Overview of the five classes
| Class | Plugins called | Returns | Semantics |
|---|---|---|---|
| Singleton | one | the plugin's result | "pick the active one" |
| Broadcast-collect | all | array of results | "collect everything" |
| Broadcast-notify | all | nothing (void) | "let everyone know" |
| Chain | a chain in order | result of the last | "middleware" |
| Capability | one (matching) | the plugin's result | "route by input type" |
Singleton
One active plugin handles every call to the kind. Used for:
- Backend connectors — exactly one LLM provider, exactly one vector store.
- Orchestrators — one orchestrator per runtime.
- Any kind for which "several active" makes no sense.
Algorithm for picking the active plugin:
- An explicit application routing policy (per-tenant, blue/green) — if one is supplied.
- Override via the env var DAGSTACK_ACTIVE_<KIND>=<plugin_name>.
- Sort candidates by priority desc from the manifest; pick the highest.
- On a tie in priority with no override → AmbiguousPlugin, the core does not start.
- Python
- TypeScript
- Go
# Get the active plugin (singleton kinds resolve a single instance):
llm = registry.get_plugin("llm")
response = llm.complete(prompt="Hello")
# Or pick by an explicit name (when two LLMs are registered):
llm = registry.get_plugin("llm", name="openai_compatible")
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
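// Go: resolve the single active plugin of the kind, then call it.
disp := pluginsystem.NewDispatchSingleton[LLMPlugin](reg, "llm")
llm, err := disp.Resolve()
if err != nil {
	return err
}
resp, err := llm.Complete(ctx, "Hello")
if err != nil {
	return err
}
_ = resp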
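The selection algorithm listed earlier in this section can be modelled in a few lines. This is a minimal sketch, not the library's implementation: `Candidate`, `pick_active`, and the `routing_policy` callback are hypothetical names, and three behaviours are assumptions made for illustration (the kind is upper-cased in the env var name, a policy that returns `None` falls through to the next step, and an override naming an unregistered plugin raises `LookupError`).

```python
import os
from dataclasses import dataclass
from typing import Callable, Mapping, Optional


class AmbiguousPlugin(Exception):
    """Two candidates tie on priority and nothing breaks the tie."""


@dataclass(frozen=True)
class Candidate:
    # Hypothetical stand-in for a registered plugin's manifest entry.
    name: str
    priority: int


def pick_active(
    kind: str,
    candidates: list[Candidate],
    routing_policy: Optional[Callable[[list[Candidate]], Optional[Candidate]]] = None,
    env: Optional[Mapping[str, str]] = None,
) -> Candidate:
    env = os.environ if env is None else env
    # Step 1: an explicit routing policy wins if the application supplied one.
    if routing_policy is not None:
        chosen = routing_policy(candidates)
        if chosen is not None:
            return chosen
    # Step 2: environment override, DAGSTACK_ACTIVE_<KIND>=<plugin_name>.
    override = env.get(f"DAGSTACK_ACTIVE_{kind.upper()}")  # upper-casing is assumed
    if override is not None:
        for candidate in candidates:
            if candidate.name == override:
                return candidate
        raise LookupError(f"{override!r} is not a registered {kind} plugin")
    # Step 3: highest priority wins. Step 4: a tie at the top is an error.
    if not candidates:
        raise LookupError(f"no {kind} plugin registered")
    ranked = sorted(candidates, key=lambda c: c.priority, reverse=True)
    if len(ranked) > 1 and ranked[0].priority == ranked[1].priority:
        raise AmbiguousPlugin(f"{ranked[0].name} and {ranked[1].name} tie for {kind}")
    return ranked[0]


plugins = [Candidate("openai_compatible", 50), Candidate("local_llama", 10)]
active = pick_active("llm", plugins, env={})
forced = pick_active("llm", plugins, env={"DAGSTACK_ACTIVE_LLM": "local_llama"})
```

Here `active` is the higher-priority `openai_compatible` candidate, while `forced` is `local_llama` because the override is consulted before priorities are compared.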
Broadcast-collect
Every registered plugin of the kind is called and the results are collected into an array. Used for:
- Tool catalogues (each plugin returns its own list of tools) — uniform aggregation.
- Metric exporters (each plugin emits its own metrics).
- Capability providers (plugins declare what they can do — the collector gathers it).
Order: by priority desc, ties broken by name. Error policy: fail-fast by default (a single plugin's failure aborts the collect) — overridable to best_effort in the kind's hookspec metadata.
- Python
- TypeScript
- Go
from dagstack.plugin_system import BroadcastCollectDispatcher

dispatcher = BroadcastCollectDispatcher(registry)
results, errors = dispatcher.dispatch(
    "metric_exporter",
    "on_event",
    ctx,
    event="request_finished",
    duration_ms=42,
)
# results = [report_from_prometheus, report_from_statsd, ...]
if errors:
    for plugin_name, original in errors.errors:
        ctx.logger.warning("plugin %s failed: %s", plugin_name, original)
// Go: call every metric_exporter plugin, then inspect per-plugin errors.
collect := pluginsystem.NewDispatchBroadcastCollect(reg, "metric_exporter")
results := collect.Dispatch(ctx, func(ctx context.Context, p pluginsystem.Plugin) (any, error) {
	exporter := p.Unwrap().(MetricExporter)
	return exporter.OnEvent(ctx, "request_finished", 42)
})
for _, r := range results {
	if r.Err != nil {
		slog.Warn("plugin failed", "name", r.PluginName, "err", r.Err)
	}
}
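The ordering rule and the two error policies described in this section can be illustrated with a small standalone model. It is a sketch under stated assumptions rather than the dispatcher's real code: the `Plugin` tuple, the `collect` function, and the `policy` keyword are hypothetical, and only the names `fail_fast` and `best_effort` come from the text above.

```python
from typing import Any, Callable

# Hypothetical plugin record: (name, priority, hook callable).
Plugin = tuple[str, int, Callable[..., Any]]


def collect(plugins: list[Plugin], *args: Any, policy: str = "fail_fast", **kwargs: Any):
    """Call every plugin in priority-desc order, ties broken by name."""
    ordered = sorted(plugins, key=lambda p: (-p[1], p[0]))
    results: list[Any] = []
    errors: list[tuple[str, Exception]] = []
    for name, _priority, hook in ordered:
        try:
            results.append(hook(*args, **kwargs))
        except Exception as exc:
            if policy == "fail_fast":
                raise  # one failure aborts the whole collect
            errors.append((name, exc))  # best_effort: record and continue
    return results, errors


def broken(event: str) -> str:
    raise RuntimeError("exporter offline")


exporters: list[Plugin] = [
    ("statsd", 10, lambda event: f"statsd:{event}"),
    ("prometheus", 10, lambda event: f"prometheus:{event}"),
    ("flaky", 50, broken),
]
results, errors = collect(exporters, "request_finished", policy="best_effort")
```

With `best_effort`, the failing `flaky` exporter ends up in `errors` and the two remaining results arrive in name order because their priorities tie. Under the default `fail_fast` policy the same call would raise on the first failure.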
Broadcast-notify
Fire-and-forget — every plugin is notified sequentially in priority desc order; return values are not collected, and a single plugin's failure is logged but not propagated. Used for:
- Lifecycle events (on_started, on_request, on_error).
- Telemetry events without aggregation.
- Audit hooks.
- Python
- TypeScript
- Go
from dagstack.plugin_system import BroadcastNotifyDispatcher

notifier = BroadcastNotifyDispatcher(registry)
notifier.dispatch(
    "event_listener",
    "on_event",
    ctx,
    event_type="user_logged_in",
    user_id="acme-42",
)
# No return value; plugins are called sequentially, exceptions are logged.
// Go: notify every event_listener plugin; there are no results to collect.
notify := pluginsystem.NewDispatchBroadcastNotify(reg, "event_listener")
notify.Dispatch(ctx, func(ctx context.Context, p pluginsystem.Plugin) error {
	listener := p.Unwrap().(EventListener)
	return listener.OnEvent(ctx, "user_logged_in", "acme-42")
})
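To make the fire-and-forget semantics concrete, the following standalone model shows a failing listener being logged while the remaining listeners still run. It is only a sketch: `notify`, the `Listener` tuple, and the three example listeners are hypothetical and do not reflect the dispatcher's actual internals.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger("notify_sketch")

# Hypothetical listener record: (name, priority, hook callable).
Listener = tuple[str, int, Callable[..., None]]


def notify(listeners: list[Listener], *args: Any, **kwargs: Any) -> None:
    """Notify listeners in priority-desc order; return nothing, propagate nothing."""
    for name, _priority, hook in sorted(listeners, key=lambda item: -item[1]):
        try:
            hook(*args, **kwargs)
        except Exception:
            # A failing listener is logged and the remaining ones still run.
            logger.exception("listener %s failed", name)


seen: list[str] = []


def audit(event_type: str, user_id: str) -> None:
    seen.append(f"audit:{event_type}:{user_id}")


def crashing(event_type: str, user_id: str) -> None:
    raise RuntimeError("boom")


def metrics(event_type: str, user_id: str) -> None:
    seen.append(f"metrics:{event_type}")


outcome = notify(
    [("metrics", 10, metrics), ("crashing", 50, crashing), ("audit", 90, audit)],
    event_type="user_logged_in",
    user_id="acme-42",
)
```

After the call, `seen` holds the audit entry followed by the metrics entry, and `outcome` is `None`: the crash in the middle listener is visible only in the log.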
Chain
A sequential chain: output[N] becomes input[N+1]. Strict order by priority desc. Used for:
- Middleware — query rewriting, post-processing, result re-ranking.
- Pipeline steps that transform data.
- Horizontal middleware (governance, quota — see ADR-0005).
Aborting the chain — return an explicit sentinel value (STOP_CHAIN in Python) or raise an exception; subsequent plugins are not invoked.
- Python
- TypeScript
- Go
from dagstack.plugin_system import ChainDispatcher, STOP_CHAIN

chain = ChainDispatcher(registry)
# `initial_value=` seeds the first plugin; each later plugin receives the previous plugin's output:
rewritten = chain.dispatch(
    "query_rewriter",
    "rewrite",
    ctx,
    initial_value="how does dagstack work",
)
# query_rewriter #1 → normaliser → query_rewriter #2 → synonymiser → ...
# A plugin returning STOP_CHAIN halts the chain and that value is returned.
// Go: thread a value through every query_rewriter plugin in priority order.
chain := pluginsystem.NewDispatchChain(reg, "query_rewriter")
final, err := chain.Dispatch(ctx, "how does dagstack work",
	func(ctx context.Context, p pluginsystem.Plugin, v any) (any, error) {
		rewriter := p.Unwrap().(QueryRewriter)
		return rewriter.Rewrite(ctx, v.(string))
	},
)
if err != nil {
	return err
}
// Returning pluginsystem.StopChain from a step halts the chain;
// pluginsystem.IsStopChain(final) reports whether the chain stopped early.
_ = final
Important: the priority >= 1000 range is reserved for horizontal middleware (governance, quota, observability) — see ADR-0005. Business plugins use priority < 1000.
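The chain semantics, including the effect of the reserved priority range, can be sketched as a standalone model. The names `run_chain`, `STOP`, `Step`, and the three example steps are hypothetical. Returning the sentinel itself when the chain halts is this sketch's reading of the Python comment above and may differ from the real dispatcher.

```python
from typing import Any, Callable

STOP = object()           # hypothetical sentinel standing in for STOP_CHAIN
RESERVED_PRIORITY = 1000  # priorities >= 1000 are reserved for horizontal middleware

# Hypothetical step record: (name, priority, transform callable).
Step = tuple[str, int, Callable[[Any], Any]]


def run_chain(steps: list[Step], initial_value: Any) -> Any:
    """Feed each step's output into the next one, in priority-desc order."""
    value = initial_value
    for _name, _priority, step in sorted(steps, key=lambda s: -s[1]):
        result = step(value)
        if result is STOP:
            return STOP  # later steps are not invoked
        value = result
    return value


def quota_guard(query: str) -> Any:
    # Horizontal middleware: runs first because of its reserved priority.
    return STOP if len(query) > 200 else query


def normalise(query: str) -> str:
    return " ".join(query.lower().split())


def add_suffix(query: str) -> str:
    return query + " architecture"


steps: list[Step] = [
    ("synonymiser", 10, add_suffix),
    ("quota", RESERVED_PRIORITY, quota_guard),
    ("normaliser", 50, normalise),
]
rewritten = run_chain(steps, "  How does  DAGSTACK work ")
blocked = run_chain(steps, "x" * 500)
```

Registration order does not matter: the quota guard runs before both business steps because its priority sits in the reserved range. For the oversized input it halts the chain before any rewriting happens.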
Capability
Routing by input type — several plugins of the kind, each handling its own subset of inputs. Exactly one plugin receives the request (chosen by the manifest's declared supports_* fields).
Scenario: a file indexer
A consumer application has a code indexer that walks a repository and needs a specialised handler per file type: an AST parser for Python files, a heading extractor for Markdown, a schema detector for JSON, a hash tagger for binary artefacts. All of them are plugins of the same kind file_indexer, dispatched by capability.
Each specialised plugin declares what it handles:
[plugin]
name = "python-indexer"
kind = "file_indexer"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
supports_extensions = [".py"]
priority = 50
[plugin]
name = "markdown-indexer"
kind = "file_indexer"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
supports_extensions = [".md", ".mdx"]
priority = 50
[plugin]
name = "binary-hasher"
kind = "file_indexer"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
fallback = true # picks up whatever the others did not recognise
priority = 0
In the indexer's code — one dispatcher call per file:
- Python
- TypeScript
- Go
from dagstack.plugin_system import CapabilityDispatcher

capability = CapabilityDispatcher(registry)
for file_path in repo_files:
    indexer = capability.dispatch(
        "file_indexer",
        "index",
        ctx,
        payload={"extension": file_path.suffix, "path": str(file_path)},
    )
    index_entry = indexer.index(file_path.read_bytes())
// Go: resolve one file_indexer plugin per file, matched by extension.
cap := pluginsystem.NewDispatchCapability(reg, "file_indexer")
for _, file := range repoFiles {
	ext := filepath.Ext(file)
	p, err := cap.Resolve(func(p pluginsystem.Plugin) bool {
		return p.Unwrap().(FileIndexer).Supports(ext)
	})
	if err != nil {
		return err
	}
	data, _ := os.ReadFile(file)
	entry, _ := p.Unwrap().(FileIndexer).Index(ctx, data)
	_ = entry
}
Adding a new file type (.go, .rs) means adding a new plugin with, for example, supports_extensions = [".go"]. The indexer code is untouched, and binary-hasher no longer receives those files because the new plugin advertises a more specific capability. If an unknown extension shows up, binary-hasher (the fallback) still handles it.
Fallback plugin — exactly one plugin per kind may declare fallback = true. Contract: the fallback must handle any valid input without raising.
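The routing rule and the fallback contract can be modelled with the three manifests from the scenario. This is a sketch, not the dispatcher's code: `IndexerManifest` and `resolve` are hypothetical, and picking the highest priority when several specific plugins match the same extension is an assumption the text does not state.

```python
from dataclasses import dataclass
from pathlib import PurePosixPath


@dataclass(frozen=True)
class IndexerManifest:
    # Mirrors the manifest fields shown above; the class itself is hypothetical.
    name: str
    priority: int = 0
    supports_extensions: tuple[str, ...] = ()
    fallback: bool = False


def resolve(manifests: list[IndexerManifest], extension: str) -> IndexerManifest:
    """Pick the one plugin that should receive a file with this extension."""
    fallbacks = [m for m in manifests if m.fallback]
    if len(fallbacks) > 1:
        raise ValueError("only one plugin per kind may declare fallback = true")
    matching = [m for m in manifests if extension in m.supports_extensions]
    if matching:
        # Assumption: among several specific matches, the highest priority wins.
        return max(matching, key=lambda m: m.priority)
    if fallbacks:
        return fallbacks[0]
    raise LookupError(f"no plugin supports {extension!r} and no fallback is registered")


manifests = [
    IndexerManifest("python-indexer", 50, (".py",)),
    IndexerManifest("markdown-indexer", 50, (".md", ".mdx")),
    IndexerManifest("binary-hasher", 0, fallback=True),
]
chosen = {
    path: resolve(manifests, PurePosixPath(path).suffix).name
    for path in ("src/app.py", "docs/intro.mdx", "assets/logo.png")
}
```

In this model `.py` and `.mdx` files go to their specialised indexers and the `.png` file falls through to `binary-hasher`. Appending a manifest with `supports_extensions=(".go",)` would route `.go` files to it without any change to `resolve` or to the calling loop.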
How to choose a dispatch class for a new kind
| Scenario | Class |
|---|---|
| One active per kind (LLM, vector store, orchestrator) | singleton |
| Collect a list/catalogue from everyone (tools, metrics, capabilities) | broadcast_collect |
| Event with N independent subscribers (lifecycle, telemetry) | broadcast_notify |
| Middleware that transforms data (rewrite, post-process) | chain |
| Implementations specialised by input type (ext/lang/mime) | capability |
When the class cannot be changed
A dispatch class is fixed in the kind's hookspec and is part of the contract. Changing the class is a breaking change (a major bump of kind_api_version): every plugin of the kind must be rewritten against the new semantics.
When designing a new kind, choose the class carefully: if you start with singleton and later add alternative implementations for different inputs, switching to capability will require a migration. An alternative is to design for capability from the start, with a single implementation that declares fallback = true and whose supports_* fields cover all cases; specialised plugins can then be added later without a breaking change.
See also
- ADR-0002: Hook invocation semantics (the normative contract for each class).
- Plugin manifest (the priority, supports_*, and fallback fields).
- Guide: Writing a plugin (a practical walkthrough with contract tests).