Перейти к основному содержимому

Chain — последовательная цепочка плагинов

chain — плагины выстраиваются в цепочку по priority desc, вход очередного плагина = выход предыдущего. Типичное применение — middleware для запросов, последовательная постобработка, переписывание с учётом контекста.

Сценарий: переписывание запроса перед векторным поиском

До векторного поиска запрос пользователя проходит через несколько этапов: нормализация, синонимы, расширение ключевыми словами из tenant-контекста.

Hookspec вида

kinds/query_rewriter/v1.yaml
kind: query_rewriter
kind_api_version: 1.0.0
description: Преобразователь запросов перед поиском.

hooks:
- name: rewrite
dispatch: chain
description: Преобразовать текст запроса.
input_schema: schemas/query.json
output_schema: schemas/query.json # input == output — data flow через цепочку
mcp_exposed: true

Три плагина в цепочке

plugins/query_rewriter/normalizer/dagstack.toml
[plugin]
name = "normalizer"
kind = "query_rewriter"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 100 # first — basic normalisation
plugins/query_rewriter/normalizer/plugin.py
import re

class Normalizer:
async def setup(self, ctx):
pass

def rewrite(self, ctx, query):
text = query["text"].lower().strip()
text = re.sub(r"\s+", " ", text)
return {**query, "text": text}
plugins/query_rewriter/synonymizer/dagstack.toml
[plugin]
name = "synonymizer"
kind = "query_rewriter"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 50 # after normalisation
plugins/query_rewriter/synonymizer/plugin.py
SYNONYMS = {"launch": "start", "setup": "configuration"}

class Synonymizer:
async def setup(self, ctx):
pass

def rewrite(self, ctx, query):
text = " ".join(
SYNONYMS.get(word, word) for word in query["text"].split()
)
return {**query, "text": text}
plugins/query_rewriter/tenant-enricher/dagstack.toml
[plugin]
name = "tenant-enricher"
kind = "query_rewriter"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 10 # last — context enrichment
plugins/query_rewriter/tenant-enricher/plugin.py
class TenantEnricher:
async def setup(self, ctx):
self._tenant_keywords = ctx.registry.resource_registry.get("tenant_keywords")

def rewrite(self, ctx, query):
tenant_id = query.get("tenant_id")
if tenant_id is None:
return query
keywords = self._tenant_keywords.get(tenant_id, [])
if keywords:
return {**query, "text": query["text"] + " " + " ".join(keywords)}
return query

Запуск цепочки

from dagstack.plugin_system import ChainDispatcher

dispatcher = ChainDispatcher(registry)

original = {"text": " How to LAUNCH dagstack? ", "tenant_id": "acme-corp"}
rewritten = dispatcher.dispatch(
"query_rewriter", "rewrite", ctx, initial_value=original,
)
# After normalizer: {"text": "how to launch dagstack?", "tenant_id": "..."}
# After synonymizer: {"text": "how to start dagstack?", ...}
# After tenant-enricher: {"text": "how to start dagstack? microservices python", ...}

Прерывание цепочки

Плагин может прервать цепочку досрочно — возвратом sentinel или исключением:

from dagstack.plugin_system import STOP_CHAIN

class Blacklist:
"""Blocks queries containing forbidden words — returns STOP_CHAIN."""

BANNED = {"delete_database", "rm -rf"}

async def setup(self, ctx):
pass

def rewrite(self, ctx, query):
if any(bad in query["text"].lower() for bad in self.BANNED):
return STOP_CHAIN # subsequent plugins are not invoked
return query

Прерывание исключением: хук бросает exception, цепочка останавливается, ошибка пропагируется наверх через диспетчер.

Middleware pattern: горизонтальные расширения

Цепочка — канонический механизм для горизонтальных middleware (см. ADR-0005). Governance, quota, observability подключаются как chain-плагины с priority >= 1000 и оборачивают все хуки всех видов:

plugins/governance-middleware/dagstack.toml
[plugin]
name = "governance"
kind = "horizontal_middleware"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 1000 # horizontal-range

[plugin.chain_wrap]
kinds = ["*"]
hooks = ["*"]

Range [1000, ∞) зарезервирован для middleware. Бизнес-плагины держат priority < 1000.

Ограничения

  • Chain-хуки обязаны быть RPC-safe (mcp_exposed = true совместимо). Потоки, сложные циклические объекты — не поддерживаются; контрактный тест это проверяет.
  • Output-schema этапа N обязан быть совместим с input-schema этапа N+1. Для нашего примера input и output одинаковы (schemas/query.json). Если схемы расходятся — компиляция hookspec в эмиттерах покажет ошибку.

Типичные ошибки

СимптомПричина
DispatchMismatch: plugin returned type X, expected YПлагин вернул не то, что ожидает следующий этап. Проверить совместимость input/output-schema.
Изменения из одного плагина не доходят до следующегоПлагин вернул копию без изменений, а не модифицированный input. Проверить логику — не теряется ли spread/...query.
Плагин выполняется в неожиданном порядкеПроверить priority в манифесте; при равных — сортировка по имени.
Chain неожиданно прерываетсяКакой-то плагин вернул sentinel; проверить логи диспетчера на chain stopped by <plugin>.

Когда chain не подходит

  • Все плагины должны вызваться параллельно — используйте broadcast-collect или broadcast-notify.
  • Нужен ровно один активный плагин — используйте singleton.
  • Выбор плагина зависит от типа входа, а не от последовательной обработки — используйте capability.

См. также