Skip to main content

Chain — a sequential plugin chain

chain — plugins line up in a chain by priority desc; the input of each plugin is the output of the previous one. Typical uses: request middleware, sequential post-processing, context-aware rewriting.

Before vector search, the user's query goes through several stages: normalisation, synonyms, expansion with keywords from the tenant context.

The kind's hookspec

kinds/query_rewriter/v1.yaml
kind: query_rewriter
kind_api_version: 1.0.0
description: Query transformer applied before search.

hooks:
- name: rewrite
dispatch: chain
description: Transform the query text.
input_schema: schemas/query.json
output_schema: schemas/query.json # input == output — data flows through the chain
mcp_exposed: true

Three plugins in the chain

plugins/query_rewriter/normalizer/dagstack.toml
[plugin]
name = "normalizer"
kind = "query_rewriter"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 100 # first — basic normalisation
plugins/query_rewriter/normalizer/plugin.py
import re

class Normalizer:
async def setup(self, ctx):
pass

def rewrite(self, ctx, query):
text = query["text"].lower().strip()
text = re.sub(r"\s+", " ", text)
return {**query, "text": text}
plugins/query_rewriter/synonymizer/dagstack.toml
[plugin]
name = "synonymizer"
kind = "query_rewriter"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 50 # after normalisation
plugins/query_rewriter/synonymizer/plugin.py
SYNONYMS = {"launch": "start", "setup": "configuration"}

class Synonymizer:
async def setup(self, ctx):
pass

def rewrite(self, ctx, query):
text = " ".join(
SYNONYMS.get(word, word) for word in query["text"].split()
)
return {**query, "text": text}
plugins/query_rewriter/tenant-enricher/dagstack.toml
[plugin]
name = "tenant-enricher"
kind = "query_rewriter"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 10 # last — context enrichment
plugins/query_rewriter/tenant-enricher/plugin.py
class TenantEnricher:
async def setup(self, ctx):
self._tenant_keywords = ctx.registry.resource_registry.get("tenant_keywords")

def rewrite(self, ctx, query):
tenant_id = query.get("tenant_id")
if tenant_id is None:
return query
keywords = self._tenant_keywords.get(tenant_id, [])
if keywords:
return {**query, "text": query["text"] + " " + " ".join(keywords)}
return query

Running the chain

from dagstack.plugin_system import ChainDispatcher

dispatcher = ChainDispatcher(registry)

original = {"text": " How to LAUNCH dagstack? ", "tenant_id": "acme-corp"}
rewritten = dispatcher.dispatch(
"query_rewriter", "rewrite", ctx, initial_value=original,
)
# After normalizer: {"text": "how to launch dagstack?", "tenant_id": "..."}
# After synonymizer: {"text": "how to start dagstack?", ...}
# After tenant-enricher: {"text": "how to start dagstack? microservices python", ...}

Aborting the chain

A plugin can abort the chain early — by returning a sentinel or by raising an exception:

from dagstack.plugin_system import STOP_CHAIN

class Blacklist:
"""Blocks queries containing forbidden words — returns STOP_CHAIN."""

BANNED = {"delete_database", "rm -rf"}

async def setup(self, ctx):
pass

def rewrite(self, ctx, query):
if any(bad in query["text"].lower() for bad in self.BANNED):
return STOP_CHAIN # subsequent plugins are not invoked
return query

Aborting via an exception: the hook raises an exception, the chain stops, and the error propagates upward through the dispatcher.

Middleware pattern: horizontal extensions

The chain is the canonical mechanism for horizontal middleware (see ADR-0005). Governance, quota, and observability are plugged in as chain plugins with priority >= 1000 and wrap every hook of every kind:

plugins/governance-middleware/dagstack.toml
[plugin]
name = "governance"
kind = "horizontal_middleware"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 1000 # horizontal range

[plugin.chain_wrap]
kinds = ["*"]
hooks = ["*"]

The range [1000, ∞) is reserved for middleware. Business plugins keep priority < 1000.

Limitations

  • Chain hooks must be RPC-safe (compatible with mcp_exposed = true). Streams and complex cyclic objects are not supported; the contract test verifies this.
  • The stage-N output schema must be compatible with the stage-N+1 input schema. In our example, input and output match (schemas/query.json). When the schemas diverge, hookspec compilation in the emitters surfaces an error.

Common errors

SymptomCause
DispatchMismatch: plugin returned type X, expected YThe plugin returned something the next stage was not expecting. Check the input/output schema compatibility.
Changes from one plugin do not reach the nextThe plugin returned a copy without changes, not the modified input. Check the logic — make sure the spread/...query is not lost.
The plugin runs in an unexpected orderCheck priority in the manifest; ties are sorted by name.
The chain aborts unexpectedlySome plugin returned the sentinel; check the dispatcher logs for chain stopped by <plugin>.

When chain is the wrong fit

  • All plugins must run in parallel — use broadcast-collect or broadcast-notify.
  • You need exactly one active plugin — use singleton.
  • Plugin selection depends on the input type, not on sequential processing — use capability.

See also