Broadcast-collect — собрать результаты от всех
broadcast-collect — все зарегистрированные плагины вида вызываются, результаты собираются в массив. Типичное применение — каталоги инструментов, экспортёры метрик, capability-providers.
Сценарий: каталог инструментов для LLM-агента
LLM-агент спрашивает: «какие инструменты мне доступны?» — ответ собирается от всех подключённых плагинов вида tool_provider.
Шаг 1. Hookspec вида
kind: tool_provider
kind_api_version: 1.0.0
description: Источник инструментов для LLM-агента.
hooks:
- name: list_tools
dispatch: broadcast_collect
description: Список инструментов, которые этот источник предоставляет.
input_schema: schemas/empty.json
output_schema: schemas/tool_list.json
mcp_exposed: true
# Опциональная error-policy для broadcast-collect:
error_policy: fail_fast # по умолчанию
Шаг 2. Несколько плагинов вида
- Python
- TypeScript
- Go
[plugin]
name = "filesystem"
kind = "tool_provider"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 50
class FilesystemToolProvider:
async def setup(self, ctx):
pass
def list_tools(self):
return [
{"name": "read_file", "description": "Read a file"},
{"name": "write_file", "description": "Write a file"},
{"name": "list_dir", "description": "List directory contents"},
]
[plugin]
name = "web"
kind = "tool_provider"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 40
class WebToolProvider:
async def setup(self, ctx):
pass
def list_tools(self):
return [
{"name": "http_get", "description": "Fetch a page"},
{"name": "search", "description": "Search the web"},
]
{
"plugin": {
"name": "filesystem",
"kind": "tool_provider",
"runtime": "in_process",
"core_version": "^0.2",
"priority": 50
}
}
export class FilesystemToolProvider {
listTools(): Tool[] {
return [
{ name: "read_file", description: "Read a file" },
{ name: "write_file", description: "Write a file" },
];
}
}
[plugin]
name = "filesystem"
kind = "tool_provider"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 50
package fs
import "context"
type FilesystemToolProvider struct{}
func (p *FilesystemToolProvider) ListTools(ctx context.Context) ([]Tool, error) {
return []Tool{
{Name: "read_file", Description: "Read a file"},
{Name: "write_file", Description: "Write a file"},
}, nil
}
Шаг 3. Собрать инструменты от всех
- Python
- TypeScript
- Go
from dagstack.plugin_system import BroadcastCollectDispatcher
dispatcher = BroadcastCollectDispatcher(registry)
tools_lists, errors = dispatcher.dispatch(
"tool_provider", "list_tools", ctx,
)
# tools_lists = [
# [{"name": "read_file", ...}, {"name": "write_file", ...}, ...], # from filesystem (priority=50)
# [{"name": "http_get", ...}, {"name": "search", ...}], # from web (priority=40)
# ]
# errors = [] # under fail_fast a non-empty list means the call already raised BroadcastErrors
# Flatten into a single list:
all_tools = [tool for tools in tools_lists for tool in tools]
agent.configure(tools=all_tools)
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
import (
"context"
"log/slog"
pluginsystem "go.dagstack.dev/plugin-system"
)
collect := pluginsystem.NewDispatchBroadcastCollect(reg, "tool_provider")
results := collect.Dispatch(ctx, func(ctx context.Context, p pluginsystem.Plugin) (any, error) {
return p.Unwrap().(ToolProvider).ListTools(ctx)
})
var allTools []Tool
for _, r := range results {
if r.Err != nil {
slog.Warn("tool_provider failed", "name", r.PluginName, "err", r.Err)
continue
}
allTools = append(allTools, r.Value.([]Tool)...)
}
Dispatch returns []CollectResult; each entry holds PluginName, Value and Err. The dispatcher walks every plugin registered under the kind in priority desc order and collects results without aborting on a single failure — fail-fast behaviour, when needed, is implemented by the caller (return early on the first non-nil Err).
Порядок результатов
Результаты собираются в порядке priority desc, при равных — по имени плагина. В нашем примере сначала идут инструменты от filesystem (priority=50), потом от web (priority=40).
Если порядок важен (например, поиск предпочитаемых инструментов идёт первым в LLM-контексте), фиксируйте его через приоритет.
Error-policy: fail-fast vs best-effort
Fail-fast (по умолчанию): падение одного плагина ломает весь collect, приложение получает исключение, упавший плагин помечается degraded.
Best-effort: падение одного плагина логируется и пропускается, collect возвращает частичный результат. Включается в hookspec:
hooks:
- name: list_tools
dispatch: broadcast_collect
error_policy: best_effort
Выбор:
- Fail-fast — когда целостность набора критична. Каталог инструментов частично загружен = риск, что LLM не увидит нужный инструмент и сделает ошибочный план.
- Best-effort — когда важнее доступность чем полнота. Metric-экспортёры: если один упал, остальные должны отправлять метрики дальше.
Типичные ошибки
| Симптом | Причина |
|---|---|
BroadcastErrors: plugin=web error=... | Один плагин упал, fail-fast-режим. Проверить логи упавшего; или перейти на best-effort в hookspec. |
| Результаты приходят в неожиданном порядке | Проверить priority в манифестах; при равных — сортировка по имени. |
| Пустой массив от одного плагина | Плагин возвращает [] — это валидный результат (пустой каталог), не ошибка. |
См. также
- Диспетчеризация — обзор всех классов.
- ADR-0002 §2 broadcast-collect — нормативный контракт + error-policy.
- Broadcast-notify — fire-and-forget, без сборки результатов.