Chain — последовательная цепочка плагинов
chain — плагины выстраиваются в цепочку по priority desc, вход очередного плагина = выход предыдущего. Типичное применение — middleware для запросов, последовательная постобработка, переписывание с учётом контекста.
Сценарий: переписывание запроса перед векторным поиском
До векторного поиска запрос пользователя проходит через несколько этапов: нормализация, синонимы, расширение ключевыми словами из tenant-контекста.
Hookspec вида
kind: query_rewriter
kind_api_version: 1.0.0
description: Преобразователь запросов перед поиском.
hooks:
- name: rewrite
dispatch: chain
description: Преобразовать текст запроса.
input_schema: schemas/query.json
output_schema: schemas/query.json # input == output — data flow через цепочку
mcp_exposed: true
Три плагина в цепочке
- Python
- TypeScript
- Go
[plugin]
name = "normalizer"
kind = "query_rewriter"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 100 # first — basic normalisation
import re
class Normalizer:
async def setup(self, ctx):
pass
def rewrite(self, ctx, query):
text = query["text"].lower().strip()
text = re.sub(r"\s+", " ", text)
return {**query, "text": text}
[plugin]
name = "synonymizer"
kind = "query_rewriter"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 50 # after normalisation
SYNONYMS = {"launch": "start", "setup": "configuration"}
class Synonymizer:
async def setup(self, ctx):
pass
def rewrite(self, ctx, query):
text = " ".join(
SYNONYMS.get(word, word) for word in query["text"].split()
)
return {**query, "text": text}
[plugin]
name = "tenant-enricher"
kind = "query_rewriter"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 10 # last — context enrichment
class TenantEnricher:
async def setup(self, ctx):
self._tenant_keywords = ctx.registry.resource_registry.get("tenant_keywords")
def rewrite(self, ctx, query):
tenant_id = query.get("tenant_id")
if tenant_id is None:
return query
keywords = self._tenant_keywords.get(tenant_id, [])
if keywords:
return {**query, "text": query["text"] + " " + " ".join(keywords)}
return query
export class Normalizer {
rewrite(query: Query): Query {
return {
...query,
text: query.text.toLowerCase().trim().replace(/\s+/g, " "),
};
}
}
package normalizer
import (
"context"
"regexp"
"strings"
)
var spaceRe = regexp.MustCompile(`\s+`)
type Normalizer struct{}
func (p *Normalizer) Rewrite(ctx context.Context, q Query) (Query, error) {
q.Text = spaceRe.ReplaceAllString(strings.TrimSpace(strings.ToLower(q.Text)), " ")
return q, nil
}
Запуск цепочки
- Python
- TypeScript
- Go
from dagstack.plugin_system import ChainDispatcher
dispatcher = ChainDispatcher(registry)
original = {"text": " How to LAUNCH dagstack? ", "tenant_id": "acme-corp"}
rewritten = dispatcher.dispatch(
"query_rewriter", "rewrite", ctx, initial_value=original,
)
# After normalizer: {"text": "how to launch dagstack?", "tenant_id": "..."}
# After synonymizer: {"text": "how to start dagstack?", ...}
# After tenant-enricher: {"text": "how to start dagstack? microservices python", ...}
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
import (
"context"
pluginsystem "go.dagstack.dev/plugin-system"
)
chain := pluginsystem.NewDispatchChain(reg, "query_rewriter")
final, err := chain.Dispatch(ctx, Query{
Text: " How to LAUNCH dagstack? ",
TenantID: "acme-corp",
}, func(ctx context.Context, p pluginsystem.Plugin, v any) (any, error) {
return p.Unwrap().(QueryRewriter).Rewrite(ctx, v.(Query))
})
if err != nil {
return err
}
rewritten := final.(Query)
Dispatch walks the plugins of the kind in priority desc order, threading the value through each step. The closure receives the previous step's output and returns the next step's input. Stop early by returning pluginsystem.StopChain from the closure.
Прерывание цепочки
Плагин может прервать цепочку досрочно — возвратом sentinel или исключением:
- Python
- TypeScript
- Go
from dagstack.plugin_system import STOP_CHAIN
class Blacklist:
"""Blocks queries containing forbidden words — returns STOP_CHAIN."""
BANNED = {"delete_database", "rm -rf"}
async def setup(self, ctx):
pass
def rewrite(self, ctx, query):
if any(bad in query["text"].lower() for bad in self.BANNED):
return STOP_CHAIN # subsequent plugins are not invoked
return query
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
package blacklist
import (
"context"
pluginsystem "go.dagstack.dev/plugin-system"
)
type Blacklist struct{}
// The signature returns `any` so the chain dispatcher can short-circuit on
// `pluginsystem.StopChain`. The closure passed to `chain.Dispatch` returns
// this value verbatim — the dispatcher detects the sentinel via
// `pluginsystem.IsStopChain` and stops the chain.
func (p *Blacklist) Rewrite(ctx context.Context, q Query) (any, error) {
if containsBanned(q.Text) {
return pluginsystem.StopChain, nil
}
return q, nil
}
Прерывание исключением: хук бросает exception, цепочка останавливается, ошибка пропагируется наверх через диспетчер.
Middleware pattern: горизонтальные расширения
Цепочка — канонический механизм для горизонтальных middleware (см. ADR-0005). Governance, quota, observability подключаются как chain-плагины с priority >= 1000 и оборачивают все хуки всех видов:
[plugin]
name = "governance"
kind = "horizontal_middleware"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 1000 # horizontal-range
[plugin.chain_wrap]
kinds = ["*"]
hooks = ["*"]
Range [1000, ∞) зарезервирован для middleware. Бизнес-плагины держат priority < 1000.
Ограничения
- Chain-хуки обязаны быть RPC-safe (
mcp_exposed = trueсовместимо). Потоки, сложные циклические объекты — не поддерживаются; контрактный тест это проверяет. - Output-schema этапа N обязан быть совместим с input-schema этапа N+1. Для нашего примера input и output одинаковы (
schemas/query.json). Если схемы расходятся — компиляция hookspec в эмиттерах покажет ошибку.
Типичные ошибки
| Симптом | Причина |
|---|---|
DispatchMismatch: plugin returned type X, expected Y | Плагин вернул не то, что ожидает следующий этап. Проверить совместимость input/output-schema. |
| Изменения из одного плагина не доходят до следующего | Плагин вернул копию без изменений, а не модифицированный input. Проверить логику — не теряется ли spread/...query. |
| Плагин выполняется в неожиданном порядке | Проверить priority в манифесте; при равных — сортировка по имени. |
| Chain неожиданно прерывается | Какой-то плагин вернул sentinel; проверить логи диспетчера на chain stopped by <plugin>. |
Когда chain не подходит
- Все плагины должны вызваться параллельно — используйте
broadcast-collectилиbroadcast-notify. - Нужен ровно один активный плагин — используйте
singleton. - Выбор плагина зависит от типа входа, а не от последовательной обработки — используйте
capability.
См. также
- Диспетчеризация — обзор всех классов.
- ADR-0002 §4 chain — нормативный контракт.
- ADR-0005: Горизонтальные расширения — chain как механизм middleware.