Перейти к основному содержимому

Диспетчеризация

Когда приложение делает вызов через plugin-system — например, «найди релевантные документы по запросу» — какой именно плагин обработает вызов? Ответ зависит от класса диспетчеризации вида плагина.

Plugin-system фиксирует пять классов — нормативно, закрытым перечислением (см. ADR-0002 для формального контракта). Каждый вид плагина в своём hookspec объявляет один из пяти классов для каждого своего хука.

Обзор пяти классов

КлассСколько плагинов вызываетсяЧто возвращаетСемантика
Singletonодинрезультат плагина«выбери активного»
Broadcast-collectвсемассив результатов«соберу всё»
Broadcast-notifyвсеничего (void)«пусть все узнают»
Chainцепочка по порядкурезультат последнего«middleware»
Capabilityодин (подходящий)результат плагина«роутинг по типу входа»

Singleton

Один активный плагин обрабатывает все вызовы вида. Используется для:

  • Backend connectors — ровно один LLM-провайдер, ровно один vector-store.
  • Оркестраторы — один orchestrator per runtime.
  • Любой вид, где «несколько активных» не имеет смысла.

Алгоритм выбора активного плагина:

  1. Явная routing-policy приложения (per-tenant, blue/green) — если задана.
  2. Override через env-переменную DAGSTACK_ACTIVE_<KIND>=<plugin_name>.
  3. Сортировка кандидатов по priority desc в манифесте; выбирается с максимальным.
  4. При равном priority без override → AmbiguousPlugin, ядро не стартует.
# Get the active plugin (singleton kinds resolve a single instance):
llm = registry.get_plugin("llm")
response = llm.complete(prompt="Hello")

# Or pick by an explicit name (when two LLMs are registered):
llm = registry.get_plugin("llm", name="openai_compatible")

Broadcast-collect

Все зарегистрированные плагины вида вызываются, результаты собираются в массив. Используется для:

  • Каталоги инструментов (каждый плагин возвращает свой список tools) — одноимённое агрегирование.
  • Metric-экспортёры (каждый плагин отправляет свои метрики).
  • Capability-providers (плагины декларируют, что они умеют — collector собирает).

Порядок: по priority desc, при равных — по имени. Error-policy: fail-fast по умолчанию (падение одного плагина ломает collect) — можно переопределить на best_effort в hookspec-метаданных вида.

from dagstack.plugin_system import BroadcastCollectDispatcher

dispatcher = BroadcastCollectDispatcher(registry)
results, errors = dispatcher.dispatch(
"metric_exporter",
"on_event",
ctx,
event="request_finished",
duration_ms=42,
)
# results = [report_from_prometheus, report_from_statsd, ...]
if errors:
for plugin_name, original in errors.errors:
ctx.logger.warning("plugin %s failed: %s", plugin_name, original)

Broadcast-notify

Fire-and-forget — все плагины уведомляются параллельно, возвращаемые значения не собираются, падение одного плагина логируется и не пропагируется. Используется для:

  • Lifecycle-события (on_started, on_request, on_error).
  • Telemetry-события без агрегации.
  • Audit-hooks.
from dagstack.plugin_system import BroadcastNotifyDispatcher

notifier = BroadcastNotifyDispatcher(registry)
notifier.dispatch(
"event_listener",
"on_event",
ctx,
event_type="user_logged_in",
user_id="acme-42",
)
# No return value; plugins are called sequentially, exceptions are logged.

Chain

Последовательная цепочка: output[N] становится input[N+1]. Строгий порядок по priority desc. Используется для:

  • Middleware — переписывание запроса, постобработка, re-ranking результатов.
  • Pipeline-шаги, которые трансформируют данные.
  • Горизонтальные middleware (governance, quota — см. ADR-0005).

Прерывание цепочки — явный return sentinel-значения (STOP_CHAIN в Python) или выброс исключения; последующие плагины не вызываются.

from dagstack.plugin_system import ChainDispatcher, STOP_CHAIN

chain = ChainDispatcher(registry)
# Each plugin receives the previous plugin's output via `initial_value=`:
rewritten = chain.dispatch(
"query_rewriter",
"rewrite",
ctx,
initial_value="how does dagstack work",
)
# query_rewriter #1 → normaliser → query_rewriter #2 → synonymiser → ...
# A plugin returning STOP_CHAIN halts the chain and that value is returned.

Важно: в диапазоне priority >= 1000 выполняются горизонтальные middleware (governance, quota, observability) — зарезервировано ADR-0005. Бизнес-плагины используют priority < 1000.

Capability

Роутинг по типу входа — несколько плагинов вида, каждый умеет обрабатывать своё подмножество входов. Ровно один плагин получает запрос (выбор по декларированным supports_*-полям манифеста).

Сценарий: файловый индексатор

У приложения-потребителя есть индексатор кода, который обходит репозиторий и нуждается в специализированном обработчике каждого типа файла: для Python-файлов — AST-парсер, для Markdown — heading-extractor, для JSON — схема-детектор, для binary-артефактов — hash-tagger. Всё это — плагины одного вида file_indexer, диспетчеризация — capability.

Каждый специализированный плагин объявляет, что умеет:

plugins/python-indexer/dagstack.toml
[plugin]
name = "python-indexer"
kind = "file_indexer"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
supports_extensions = [".py"]
priority = 50
plugins/markdown-indexer/dagstack.toml
[plugin]
name = "markdown-indexer"
kind = "file_indexer"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
supports_extensions = [".md", ".mdx"]
priority = 50
plugins/binary-hasher/dagstack.toml
[plugin]
name = "binary-hasher"
kind = "file_indexer"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
fallback = true # получает всё, что не распознали другие
priority = 0

В коде индексатора — один вызов диспетчера на файл:

from dagstack.plugin_system import CapabilityDispatcher

capability = CapabilityDispatcher(registry)

for file_path in repo_files:
indexer = capability.dispatch(
"file_indexer",
"index",
ctx,
payload={"extension": file_path.suffix, "path": str(file_path)},
)
index_entry = indexer.index(file_path.read_bytes())

Добавление нового типа файла (.go, .rs) = новый плагин с supports_extensions = [".go"]. Код индексатора не трогается, binary-hasher не получит файл, потому что новый плагин имеет приоритетную capability. Если добавили неизвестное расширение — binary-hasher (fallback) обеспечит корректную обработку.

Fallback-плагин — ровно один плагин вида может декларировать fallback = true. Контракт: fallback обязан обработать любой валидный вход без исключений.

Как выбрать класс диспетчеризации для нового вида

СценарийКласс
Один активный на вид (LLM, vector-store, orchestrator)singleton
Собрать список/каталог от всех (tools, metrics, capabilities)broadcast_collect
Событие с N независимыми подписчиками (lifecycle, telemetry)broadcast_notify
Middleware с трансформацией данных (rewrite, post-process)chain
Реализации специализированы по типу входа (ext/lang/mime)capability

Когда класс нельзя изменить

Класс диспетчеризации фиксируется в hookspec вида и фактически зашит в контракт. Смена класса = breaking change (major bump kind_api_version) — все плагины вида должны быть переписаны под новую семантику.

В проектировании нового вида аккуратно выбирайте класс: если стартуете с singleton и потом добавляются альтернативные реализации под разные входы, переход на capability потребует миграции. Альтернатива: сразу проектируйте под capability с одной реализацией, которая декларирует fallback = true и supports_* = все случаи — будущий переход добавлением специализированных плагинов без breaking change.

См. также