Диспетчеризация
Когда приложение делает вызов через plugin-system — например, «найди релевантные документы по запросу» — какой именно плагин обработает вызов? Ответ зависит от класса диспетчеризации вида плагина.
Plugin-system фиксирует пять классов — нормативно, закрытым перечислением (см. ADR-0002 для формального контракта). Каждый вид плагина в своём hookspec объявляет один из пяти классов для каждого своего хука.
Обзор пяти классов
| Класс | Сколько плагинов вызывается | Что возвращает | Семантика |
|---|---|---|---|
| Singleton | один | результат плагина | «выбери активного» |
| Broadcast-collect | все | массив результатов | «соберу всё» |
| Broadcast-notify | все | ничего (void) | «пусть все узнают» |
| Chain | цепочка по порядку | результат последнего | «middleware» |
| Capability | один (подходящий) | результат плагина | «роутинг по типу входа» |
Singleton
Один активный плагин обрабатывает все вызовы вида. Используется для:
- Backend connectors — ровно один LLM-провайдер, ровно один vector-store.
- Оркестраторы — один orchestrator per runtime.
- Любой вид, где «несколько активных» не имеет смысла.
Алгоритм выбора активного плагина:
- Явная routing-policy приложения (per-tenant, blue/green) — если задана.
- Override через env-переменную
DAGSTACK_ACTIVE_<KIND>=<plugin_name>. - Сортировка кандидатов по
priority descв манифесте; выбирается с максимальным. - При равном
priorityбез override →AmbiguousPlugin, ядро не стартует.
- Python
- TypeScript
- Go
# Get the active plugin (singleton kinds resolve a single instance):
llm = registry.get_plugin("llm")
response = llm.complete(prompt="Hello")
# Or pick by an explicit name (when two LLMs are registered):
llm = registry.get_plugin("llm", name="openai_compatible")
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
disp := pluginsystem.NewDispatchSingleton[LLMPlugin](reg, "llm")
llm, err := disp.Resolve()
if err != nil {
return err
}
resp, _ := llm.Complete(ctx, "Hello")
Broadcast-collect
Все зарегистрированные плагины вида вызываются, результаты собираются в массив. Используется для:
- Каталоги инструментов (каждый плагин возвращает свой список tools) — одноимённое агрегирование.
- Metric-экспортёры (каждый плагин отправляет свои метрики).
- Capability-providers (плагины декларируют, что они умеют — collector собирает).
Порядок: по priority desc, при равных — по имени. Error-policy: fail-fast по умолчанию (падение одного плагина ломает collect) — можно переопределить на best_effort в hookspec-метаданных вида.
- Python
- TypeScript
- Go
from dagstack.plugin_system import BroadcastCollectDispatcher
dispatcher = BroadcastCollectDispatcher(registry)
results, errors = dispatcher.dispatch(
"metric_exporter",
"on_event",
ctx,
event="request_finished",
duration_ms=42,
)
# results = [report_from_prometheus, report_from_statsd, ...]
if errors:
for plugin_name, original in errors.errors:
ctx.logger.warning("plugin %s failed: %s", plugin_name, original)
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
collect := pluginsystem.NewDispatchBroadcastCollect(reg, "metric_exporter")
results := collect.Dispatch(ctx, func(ctx context.Context, p pluginsystem.Plugin) (any, error) {
exporter := p.Unwrap().(MetricExporter)
return exporter.OnEvent(ctx, "request_finished", 42)
})
for _, r := range results {
if r.Err != nil {
slog.Warn("plugin failed", "name", r.PluginName, "err", r.Err)
}
}
Broadcast-notify
Fire-and-forget — все плагины уведомляются параллельно, возвращаемые значения не собираются, падение одного плагина логируется и не пропагируется. Используется для:
- Lifecycle-события (
on_started,on_request,on_error). - Telemetry-события без агрегации.
- Audit-hooks.
- Python
- TypeScript
- Go
from dagstack.plugin_system import BroadcastNotifyDispatcher
notifier = BroadcastNotifyDispatcher(registry)
notifier.dispatch(
"event_listener",
"on_event",
ctx,
event_type="user_logged_in",
user_id="acme-42",
)
# No return value; plugins are called sequentially, exceptions are logged.
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
notify := pluginsystem.NewDispatchBroadcastNotify(reg, "event_listener")
notify.Dispatch(ctx, func(ctx context.Context, p pluginsystem.Plugin) error {
listener := p.Unwrap().(EventListener)
return listener.OnEvent(ctx, "user_logged_in", "acme-42")
})
Chain
Последовательная цепочка: output[N] становится input[N+1]. Строгий порядок по priority desc. Используется для:
- Middleware — переписывание запроса, постобработка, re-ranking результатов.
- Pipeline-шаги, которые трансформируют данные.
- Горизонтальные middleware (governance, quota — см. ADR-0005).
Прерывание цепочки — явный return sentinel-значения (STOP_CHAIN в Python) или выброс исключения; последующие плагины не вызываются.
- Python
- TypeScript
- Go
from dagstack.plugin_system import ChainDispatcher, STOP_CHAIN
chain = ChainDispatcher(registry)
# Each plugin receives the previous plugin's output via `initial_value=`:
rewritten = chain.dispatch(
"query_rewriter",
"rewrite",
ctx,
initial_value="how does dagstack work",
)
# query_rewriter #1 → normaliser → query_rewriter #2 → synonymiser → ...
# A plugin returning STOP_CHAIN halts the chain and that value is returned.
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
chain := pluginsystem.NewDispatchChain(reg, "query_rewriter")
final, err := chain.Dispatch(ctx, "how does dagstack work",
func(ctx context.Context, p pluginsystem.Plugin, v any) (any, error) {
rewriter := p.Unwrap().(QueryRewriter)
return rewriter.Rewrite(ctx, v.(string))
},
)
if err != nil {
return err
}
// Returning pluginsystem.StopChain from a step halts the chain;
// pluginsystem.IsStopChain(final) reports whether the chain stopped early.
_ = final
Важно: в диапазоне priority >= 1000 выполняются горизонтальные middleware (governance, quota, observability) — зарезервировано ADR-0005. Бизнес-плагины используют priority < 1000.
Capability
Роутинг по типу входа — несколько плагинов вида, каждый умеет обрабатывать своё подмножество входов. Ровно один плагин получает запрос (выбор по декларированным supports_*-полям манифеста).
Сценарий: файловый индексатор
У приложения-потребителя есть индексатор кода, который обходит репозиторий и нуждается в специализированном обработчике каждого типа файла: для Python-файлов — AST-парсер, для Markdown — heading-extractor, для JSON — схема-детектор, для binary-артефактов — hash-tagger. Всё это — плагины одного вида file_indexer, диспетчеризация — capability.
Каждый специализированный плагин объявляет, что умеет:
[plugin]
name = "python-indexer"
kind = "file_indexer"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
supports_extensions = [".py"]
priority = 50
[plugin]
name = "markdown-indexer"
kind = "file_indexer"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
supports_extensions = [".md", ".mdx"]
priority = 50
[plugin]
name = "binary-hasher"
kind = "file_indexer"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
fallback = true # получает всё, что не распознали другие
priority = 0
В коде индексатора — один вызов диспетчера на файл:
- Python
- TypeScript
- Go
from dagstack.plugin_system import CapabilityDispatcher
capability = CapabilityDispatcher(registry)
for file_path in repo_files:
indexer = capability.dispatch(
"file_indexer",
"index",
ctx,
payload={"extension": file_path.suffix, "path": str(file_path)},
)
index_entry = indexer.index(file_path.read_bytes())
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
cap := pluginsystem.NewDispatchCapability(reg, "file_indexer")
for _, file := range repoFiles {
ext := filepath.Ext(file)
p, err := cap.Resolve(func(p pluginsystem.Plugin) bool {
return p.Unwrap().(FileIndexer).Supports(ext)
})
if err != nil {
return err
}
data, _ := os.ReadFile(file)
entry, _ := p.Unwrap().(FileIndexer).Index(ctx, data)
_ = entry
}
Добавление нового типа файла (.go, .rs) = новый плагин с supports_extensions = [".go"]. Код индексатора не трогается, binary-hasher не получит файл, потому что новый плагин имеет приоритетную capability. Если добавили неизвестное расширение — binary-hasher (fallback) обеспечит корректную обработку.
Fallback-плагин — ровно один плагин вида может декларировать fallback = true. Контракт: fallback обязан обработать любой валидный вход без исключений.
Как выбрать класс диспетчеризации для нового вида
| Сценарий | Класс |
|---|---|
| Один активный на вид (LLM, vector-store, orchestrator) | singleton |
| Собрать список/каталог от всех (tools, metrics, capabilities) | broadcast_collect |
| Событие с N независимыми подписчиками (lifecycle, telemetry) | broadcast_notify |
| Middleware с трансформацией данных (rewrite, post-process) | chain |
| Реализации специализированы по типу входа (ext/lang/mime) | capability |
Когда класс нельзя изменить
Класс диспетчеризации фиксируется в hookspec вида и фактически зашит в контракт. Смена класса = breaking change (major bump kind_api_version) — все плагины вида должны быть переписаны под новую семантику.
В проектировании нового вида аккуратно выбирайте класс: если стартуете с singleton и потом добавляются альтернативные реализации под разные входы, переход на capability потребует миграции. Альтернатива: сразу проектируйте под capability с одной реализацией, которая декларирует fallback = true и supports_* = все случаи — будущий переход добавлением специализированных плагинов без breaking change.
См. также
- ADR-0002: Семантика вызова хуков — нормативный контракт каждого класса.
- Манифест плагина — поля
priority,supports_*,fallback. - Руководство: Написать плагин — практический walkthrough с контрактными тестами.