Chain — a sequential plugin chain
With chain dispatch, plugins run one after another in descending priority order, and each plugin receives the output of the previous one as its input. Typical uses: request middleware, sequential post-processing, context-aware rewriting.
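The ordering and threading rule can be sketched in a few lines of plain Python. This is a minimal illustration, not the dagstack dispatcher: `Stage` and `run_chain` are hypothetical names, and the tie-break by name follows the note under Common errors.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Stage:
    """Hypothetical stand-in for a registered chain plugin."""
    name: str
    priority: int
    hook: Callable[[Any], Any]


def run_chain(stages: list[Stage], initial_value: Any) -> Any:
    """Run stages by descending priority, feeding each output to the next stage."""
    # Higher priority first; equal priorities fall back to name order.
    ordered = sorted(stages, key=lambda s: (-s.priority, s.name))
    value = initial_value
    for stage in ordered:
        value = stage.hook(value)
    return value


stages = [
    Stage("append-b", 10, lambda v: v + "b"),
    Stage("append-a", 100, lambda v: v + "a"),
]
result = run_chain(stages, "")  # "append-a" runs first, so result == "ab"
```

The registration order of the stages does not matter here; only priority and, for ties, the name decide the execution order.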
Scenario: rewriting a query before vector search
Before vector search, the user's query goes through several stages: normalisation, synonyms, expansion with keywords from the tenant context.
The kind's hookspec
kind: query_rewriter
kind_api_version: 1.0.0
description: Query transformer applied before search.
hooks:
  - name: rewrite
    dispatch: chain
    description: Transform the query text.
    input_schema: schemas/query.json
    output_schema: schemas/query.json  # input == output: data flows through the chain
    mcp_exposed: true
Three plugins in the chain
- Python
- TypeScript
- Go
[plugin]
name = "normalizer"
kind = "query_rewriter"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 100 # first — basic normalisation
import re


class Normalizer:
    async def setup(self, ctx):
        pass

    def rewrite(self, ctx, query):
        # Lowercase, trim, and collapse runs of whitespace into single spaces.
        text = query["text"].lower().strip()
        text = re.sub(r"\s+", " ", text)
        return {**query, "text": text}
[plugin]
name = "synonymizer"
kind = "query_rewriter"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 50 # after normalisation
SYNONYMS = {"launch": "start", "setup": "configuration"}


class Synonymizer:
    async def setup(self, ctx):
        pass

    def rewrite(self, ctx, query):
        # Replace each known word with its canonical synonym.
        text = " ".join(
            SYNONYMS.get(word, word) for word in query["text"].split()
        )
        return {**query, "text": text}
[plugin]
name = "tenant-enricher"
kind = "query_rewriter"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 10 # last — context enrichment
class TenantEnricher:
    async def setup(self, ctx):
        self._tenant_keywords = ctx.registry.resource_registry.get("tenant_keywords")

    def rewrite(self, ctx, query):
        tenant_id = query.get("tenant_id")
        if tenant_id is None:
            return query
        keywords = self._tenant_keywords.get(tenant_id, [])
        if keywords:
            return {**query, "text": query["text"] + " " + " ".join(keywords)}
        return query
export class Normalizer {
  rewrite(query: Query): Query {
    return {
      ...query,
      text: query.text.toLowerCase().trim().replace(/\s+/g, " "),
    };
  }
}
package normalizer

import (
	"context"
	"regexp"
	"strings"
)

var spaceRe = regexp.MustCompile(`\s+`)

type Normalizer struct{}

// Rewrite lowercases the query text, trims it, and collapses whitespace runs.
func (p *Normalizer) Rewrite(ctx context.Context, q Query) (Query, error) {
	q.Text = spaceRe.ReplaceAllString(strings.TrimSpace(strings.ToLower(q.Text)), " ")
	return q, nil
}
Running the chain
- Python
- TypeScript
- Go
from dagstack.plugin_system import ChainDispatcher
dispatcher = ChainDispatcher(registry)
original = {"text": " How to LAUNCH dagstack? ", "tenant_id": "acme-corp"}
rewritten = dispatcher.dispatch(
    "query_rewriter", "rewrite", ctx, initial_value=original,
)
# After normalizer: {"text": "how to launch dagstack?", "tenant_id": "..."}
# After synonymizer: {"text": "how to start dagstack?", ...}
# After tenant-enricher: {"text": "how to start dagstack? microservices python", ...}
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
import (
	"context"

	pluginsystem "go.dagstack.dev/plugin-system"
)

chain := pluginsystem.NewDispatchChain(reg, "query_rewriter")
final, err := chain.Dispatch(ctx, Query{
	Text:     " How to LAUNCH dagstack? ",
	TenantID: "acme-corp",
}, func(ctx context.Context, p pluginsystem.Plugin, v any) (any, error) {
	return p.Unwrap().(QueryRewriter).Rewrite(ctx, v.(Query))
})
if err != nil {
	return err
}
rewritten := final.(Query)
Dispatch walks the plugins of the kind in descending priority order, threading the value through each step. The closure receives the previous step's output and returns the next step's input. To stop early, return pluginsystem.StopChain from the closure.
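When debugging a chain it helps to look at the value after every stage. The sketch below reproduces the three example stages as plain functions and records each intermediate result. It does not use the dagstack dispatcher; `trace_chain` and the `TENANT_KEYWORDS` table are hypothetical stand-ins for illustration.

```python
import re

SYNONYMS = {"launch": "start", "setup": "configuration"}
# Hypothetical tenant data, standing in for the tenant_keywords resource.
TENANT_KEYWORDS = {"acme-corp": ["microservices", "python"]}


def normalize(query):
    text = re.sub(r"\s+", " ", query["text"].lower().strip())
    return {**query, "text": text}


def synonymize(query):
    words = (SYNONYMS.get(w, w) for w in query["text"].split())
    return {**query, "text": " ".join(words)}


def enrich(query):
    keywords = TENANT_KEYWORDS.get(query.get("tenant_id"), [])
    if not keywords:
        return query
    return {**query, "text": query["text"] + " " + " ".join(keywords)}


def trace_chain(stages, value):
    """Return (stage name, text) pairs, one per stage, in execution order."""
    steps = []
    for stage in stages:
        value = stage(value)
        steps.append((stage.__name__, value["text"]))
    return steps


steps = trace_chain(
    [normalize, synonymize, enrich],
    {"text": "  How to   LAUNCH dagstack?  ", "tenant_id": "acme-corp"},
)
for name, text in steps:
    print(f"{name}: {text}")
```

Comparing the recorded steps with the expected values makes it clear which stage introduced an unexpected change.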
Aborting the chain
A plugin can abort the chain early — by returning a sentinel or by raising an exception:
- Python
- TypeScript
- Go
from dagstack.plugin_system import STOP_CHAIN


class Blacklist:
    """Blocks queries containing forbidden words by returning STOP_CHAIN."""

    BANNED = {"delete_database", "rm -rf"}

    async def setup(self, ctx):
        pass

    def rewrite(self, ctx, query):
        if any(bad in query["text"].lower() for bad in self.BANNED):
            return STOP_CHAIN  # subsequent plugins are not invoked
        return query
package blacklist

import (
	"context"

	pluginsystem "go.dagstack.dev/plugin-system"
)

type Blacklist struct{}

// The signature returns `any` so the chain dispatcher can short-circuit on
// `pluginsystem.StopChain`. The closure passed to `chain.Dispatch` returns
// this value verbatim; the dispatcher detects the sentinel via
// `pluginsystem.IsStopChain` and stops the chain.
func (p *Blacklist) Rewrite(ctx context.Context, q Query) (any, error) {
	if containsBanned(q.Text) {
		return pluginsystem.StopChain, nil
	}
	return q, nil
}
Aborting via an exception: the hook raises an exception, the chain stops, and the error propagates upward through the dispatcher.
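Both abort paths can be illustrated with a toy dispatcher. The sketch is independent of dagstack: `STOP` is a local sentinel rather than the library's STOP_CHAIN, and returning the last value produced before the stop is an assumption about the result, not documented behavior.

```python
STOP = object()  # local sentinel; stands in for the library's stop marker


def run_chain(hooks, value):
    """Thread value through hooks; stop at the sentinel, let exceptions propagate."""
    for hook in hooks:
        result = hook(value)
        if result is STOP:
            # Assumption: the value produced before the stop is the result.
            return value, True
        value = result
    return value, False


def block_banned(query):
    return STOP if "rm -rf" in query["text"] else query


def shout(query):
    return {**query, "text": query["text"].upper()}


def broken(query):
    raise ValueError("cannot rewrite query")


stopped_value, stopped = run_chain([block_banned, shout], {"text": "rm -rf /"})
passed_value, passed_stop = run_chain([block_banned, shout], {"text": "hello"})

try:
    run_chain([broken, shout], {"text": "hello"})
    error = None
except ValueError as exc:
    # The exception reaches the caller; later hooks never ran.
    error = str(exc)
```

The sentinel is compared with `is`, so no ordinary hook result can be mistaken for a stop request.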
Middleware pattern: horizontal extensions
The chain is the canonical mechanism for horizontal middleware (see ADR-0005). Governance, quota, and observability are plugged in as chain plugins with priority >= 1000 and wrap every hook of every kind:
[plugin]
name = "governance"
kind = "horizontal_middleware"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 1000 # horizontal range
[plugin.chain_wrap]
kinds = ["*"]
hooks = ["*"]
The range [1000, ∞) is reserved for middleware. Business plugins keep priority < 1000.
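A manifest check for this convention might look like the sketch below. The function name, the `MIDDLEWARE_MIN_PRIORITY` constant, and treating kind == "horizontal_middleware" as the marker for middleware are assumptions made for illustration; the real loader may enforce the range differently or not at all.

```python
MIDDLEWARE_MIN_PRIORITY = 1000  # lower bound of the reserved range [1000, inf)


def check_priority(manifest: dict) -> None:
    """Raise ValueError when a plugin's priority is outside the range for its role."""
    plugin = manifest["plugin"]
    is_middleware = plugin["kind"] == "horizontal_middleware"  # assumed marker
    in_reserved_range = plugin["priority"] >= MIDDLEWARE_MIN_PRIORITY
    if is_middleware and not in_reserved_range:
        raise ValueError(f"{plugin['name']}: middleware needs priority >= 1000")
    if not is_middleware and in_reserved_range:
        raise ValueError(f"{plugin['name']}: business plugins need priority < 1000")


# Both manifests follow the convention, so neither call raises.
check_priority({"plugin": {"name": "governance", "kind": "horizontal_middleware", "priority": 1000}})
check_priority({"plugin": {"name": "normalizer", "kind": "query_rewriter", "priority": 100}})
```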
Limitations
- Chain hooks must be RPC-safe (compatible with mcp_exposed = true). Streams and complex cyclic objects are not supported; the contract test verifies this.
- The stage-N output schema must be compatible with the stage-N+1 input schema. In our example, input and output match (schemas/query.json). When the schemas diverge, hookspec compilation in the emitters surfaces an error.
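One way to approximate the RPC-safety requirement locally is a JSON round trip, since values that cannot be serialized cannot cross a process boundary. This is a sketch of the idea, not the project's contract test; `is_rpc_safe` is a hypothetical helper.

```python
import json


def is_rpc_safe(value) -> bool:
    """Return True when value survives a JSON round trip unchanged."""
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # TypeError: unsupported type such as a generator or file handle.
        # ValueError: circular reference detected by json.dumps.
        return False


plain = {"text": "how to start dagstack?", "tenant_id": "acme-corp"}

cyclic = {"text": "loop"}
cyclic["self"] = cyclic  # the object refers to itself

stream = {"text": (chunk for chunk in ["a", "b"])}  # a generator, not data

print(is_rpc_safe(plain), is_rpc_safe(cyclic), is_rpc_safe(stream))
```

The check is deliberately strict: values that change shape in transit, such as tuples that come back as lists, are also reported as unsafe.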
Common errors
| Symptom | Cause |
|---|---|
| DispatchMismatch: plugin returned type X, expected Y | The plugin returned something the next stage was not expecting. Check the input/output schema compatibility. |
| Changes from one plugin do not reach the next | The plugin returned an unchanged copy instead of the modified value. Check the logic: the returned object must contain both the spread (...query) and the overridden fields. |
| The plugin runs in an unexpected order | Check priority in the manifest; ties are sorted by name. |
| The chain aborts unexpectedly | Some plugin returned the sentinel; check the dispatcher logs for chain stopped by <plugin>. |
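The second row is easy to reproduce. In the sketch below, which uses plain functions rather than real plugins, `rewrite_lost` builds a new text but returns the original query, while `rewrite_kept` returns a copy that carries the change. Both function names are illustrative.

```python
def rewrite_lost(query):
    text = query["text"].strip()  # computed, then never used
    return query  # bug: the next stage still sees the untrimmed text


def rewrite_kept(query):
    text = query["text"].strip()
    return {**query, "text": text}  # copy every field, then override "text"


query = {"text": "  hello  ", "tenant_id": "acme-corp"}
lost = rewrite_lost(query)
kept = rewrite_kept(query)
print(repr(lost["text"]), repr(kept["text"]))
```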
When chain is the wrong fit
- All plugins must run in parallel: use broadcast-collector or broadcast-notify.
- You need exactly one active plugin: use singleton.
- Plugin selection depends on the input type, not on sequential processing: use capability.
See also
- Dispatch — overview of all classes.
- ADR-0002 §4 chain — the normative contract.
- ADR-0005: Horizontal extensions — chain as the middleware mechanism.