Перейти к основному содержимому

Broadcast-collect — собрать результаты от всех

broadcast-collect — все зарегистрированные плагины вида вызываются, результаты собираются в массив. Типичное применение — каталоги инструментов, экспортёры метрик, capability-providers.

Сценарий: каталог инструментов для LLM-агента

LLM-агент спрашивает: «какие инструменты мне доступны?» — ответ собирается от всех подключённых плагинов вида tool_provider.

Шаг 1. Hookspec вида

kinds/tool_provider/v1.yaml
kind: tool_provider
kind_api_version: 1.0.0
description: Источник инструментов для LLM-агента.

hooks:
- name: list_tools
dispatch: broadcast_collect
description: Список инструментов, которые этот источник предоставляет.
input_schema: schemas/empty.json
output_schema: schemas/tool_list.json
mcp_exposed: true
# Опциональная error-policy для broadcast-collect:
error_policy: fail_fast # по умолчанию

Шаг 2. Несколько плагинов вида

plugins/tool_provider/filesystem/dagstack.toml
[plugin]
name = "filesystem"
kind = "tool_provider"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 50
plugins/tool_provider/filesystem/plugin.py
class FilesystemToolProvider:
async def setup(self, ctx):
pass

def list_tools(self):
return [
{"name": "read_file", "description": "Read a file"},
{"name": "write_file", "description": "Write a file"},
{"name": "list_dir", "description": "List directory contents"},
]
plugins/tool_provider/web/dagstack.toml
[plugin]
name = "web"
kind = "tool_provider"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 40
plugins/tool_provider/web/plugin.py
class WebToolProvider:
async def setup(self, ctx):
pass

def list_tools(self):
return [
{"name": "http_get", "description": "Fetch a page"},
{"name": "search", "description": "Search the web"},
]

Шаг 3. Собрать инструменты от всех

from dagstack.plugin_system import BroadcastCollectDispatcher

dispatcher = BroadcastCollectDispatcher(registry)
tools_lists, errors = dispatcher.dispatch(
"tool_provider", "list_tools", ctx,
)
# tools_lists = [
# [{"name": "read_file", ...}, {"name": "write_file", ...}, ...], # from filesystem (priority=50)
# [{"name": "http_get", ...}, {"name": "search", ...}], # from web (priority=40)
# ]
# errors = [] # under fail_fast a non-empty list means the call already raised BroadcastErrors

# Flatten into a single list:
all_tools = [tool for tools in tools_lists for tool in tools]
agent.configure(tools=all_tools)

Порядок результатов

Результаты собираются в порядке priority desc, при равных — по имени плагина. В нашем примере сначала идут инструменты от filesystem (priority=50), потом от web (priority=40).

Если порядок важен (например, поиск предпочитаемых инструментов идёт первым в LLM-контексте), фиксируйте его через приоритет.

Error-policy: fail-fast vs best-effort

Fail-fast (по умолчанию): падение одного плагина ломает весь collect, приложение получает исключение, упавший плагин помечается degraded.

Best-effort: падение одного плагина логируется и пропускается, collect возвращает частичный результат. Включается в hookspec:

hooks:
- name: list_tools
dispatch: broadcast_collect
error_policy: best_effort

Выбор:

  • Fail-fast — когда целостность набора критична. Каталог инструментов частично загружен = риск, что LLM не увидит нужный инструмент и сделает ошибочный план.
  • Best-effort — когда важнее доступность чем полнота. Metric-экспортёры: если один упал, остальные должны отправлять метрики дальше.

Типичные ошибки

СимптомПричина
BroadcastErrors: plugin=web error=...Один плагин упал, fail-fast-режим. Проверить логи упавшего; или перейти на best-effort в hookspec.
Результаты приходят в неожиданном порядкеПроверить priority в манифестах; при равных — сортировка по имени.
Пустой массив от одного плагинаПлагин возвращает [] — это валидный результат (пустой каталог), не ошибка.

См. также