Skip to main content

Broadcast-collect — gather results from all

broadcast-collect — every registered plugin of the kind is called and the results are collected into an array. Typical uses: tool catalogues, metric exporters, capability providers.

Scenario: a tool catalogue for an LLM agent

The LLM agent asks: "what tools are available to me?" — the answer is collected from every plugged-in plugin of the kind tool_provider.

Step 1. The kind's hookspec

kinds/tool_provider/v1.yaml
kind: tool_provider
kind_api_version: 1.0.0
description: A source of tools for an LLM agent.

hooks:
- name: list_tools
dispatch: broadcast_collect
description: List of tools that this source provides.
input_schema: schemas/empty.json
output_schema: schemas/tool_list.json
mcp_exposed: true
# Optional error policy for broadcast-collect:
error_policy: fail_fast # default

Step 2. Several plugins of the kind

plugins/tool_provider/filesystem/dagstack.toml
[plugin]
name = "filesystem"
kind = "tool_provider"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 50
plugins/tool_provider/filesystem/plugin.py
class FilesystemToolProvider:
async def setup(self, ctx):
pass

def list_tools(self):
return [
{"name": "read_file", "description": "Read a file"},
{"name": "write_file", "description": "Write a file"},
{"name": "list_dir", "description": "List directory contents"},
]
plugins/tool_provider/web/dagstack.toml
[plugin]
name = "web"
kind = "tool_provider"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 40
plugins/tool_provider/web/plugin.py
class WebToolProvider:
async def setup(self, ctx):
pass

def list_tools(self):
return [
{"name": "http_get", "description": "Fetch a page"},
{"name": "search", "description": "Search the web"},
]

Step 3. Collect tools from all of them

from dagstack.plugin_system import BroadcastCollectDispatcher

dispatcher = BroadcastCollectDispatcher(registry)
tools_lists, errors = dispatcher.dispatch(
"tool_provider", "list_tools", ctx,
)
# tools_lists = [
# [{"name": "read_file", ...}, {"name": "write_file", ...}, ...], # from filesystem (priority=50)
# [{"name": "http_get", ...}, {"name": "search", ...}], # from web (priority=40)
# ]
# errors = [] # under fail_fast a non-empty list means the call already raised BroadcastErrors

# Flatten into a single list:
all_tools = [tool for tools in tools_lists for tool in tools]
agent.configure(tools=all_tools)

Result ordering

Results are gathered in priority desc order, with ties broken by plugin name. In our example, tools from filesystem (priority=50) come first, then those from web (priority=40).

If order matters (for example, preferred tools should appear first in the LLM context), pin it via priority.

Error policy: fail-fast vs best-effort

Fail-fast (default): a single plugin's failure aborts the entire collect, the application receives an exception, and the failed plugin is marked degraded.

Best-effort: a single plugin's failure is logged and skipped, and the collect returns a partial result. Enabled in the hookspec:

hooks:
- name: list_tools
dispatch: broadcast_collect
error_policy: best_effort

When to choose which:

  • Fail-fast — when set integrity is critical. A partially loaded tool catalogue means the LLM might not see the tool it needs and produce a wrong plan.
  • Best-effort — when availability matters more than completeness. Metric exporters: if one fails, the others must keep emitting metrics.

Common errors

SymptomCause
BroadcastErrors: plugin=web error=...A plugin failed under fail-fast. Check the failed plugin's logs, or switch to best-effort in the hookspec.
Results arrive in an unexpected orderCheck priority in the manifests; ties are sorted by name.
An empty array from a pluginThe plugin returned [] — this is a valid result (an empty catalogue), not an error.

See also