Broadcast-collect — gather results from all
broadcast-collect — every registered plugin of the kind is called and the results are collected into an array. Typical uses: tool catalogues, metric exporters, capability providers.
Scenario: a tool catalogue for an LLM agent
The LLM agent asks: "what tools are available to me?" — the answer is collected from every plugged-in plugin of the kind tool_provider.
Step 1. The kind's hookspec
kind: tool_provider
kind_api_version: 1.0.0
description: A source of tools for an LLM agent.
hooks:
- name: list_tools
dispatch: broadcast_collect
description: List of tools that this source provides.
input_schema: schemas/empty.json
output_schema: schemas/tool_list.json
mcp_exposed: true
# Optional error policy for broadcast-collect:
error_policy: fail_fast # default
Step 2. Several plugins of the kind
- Python
- TypeScript
- Go
[plugin]
name = "filesystem"
kind = "tool_provider"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 50
class FilesystemToolProvider:
async def setup(self, ctx):
pass
def list_tools(self):
return [
{"name": "read_file", "description": "Read a file"},
{"name": "write_file", "description": "Write a file"},
{"name": "list_dir", "description": "List directory contents"},
]
[plugin]
name = "web"
kind = "tool_provider"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 40
class WebToolProvider:
async def setup(self, ctx):
pass
def list_tools(self):
return [
{"name": "http_get", "description": "Fetch a page"},
{"name": "search", "description": "Search the web"},
]
{
"plugin": {
"name": "filesystem",
"kind": "tool_provider",
"runtime": "in_process",
"core_version": "^0.2",
"priority": 50
}
}
export class FilesystemToolProvider {
listTools(): Tool[] {
return [
{ name: "read_file", description: "Read a file" },
{ name: "write_file", description: "Write a file" },
];
}
}
[plugin]
name = "filesystem"
kind = "tool_provider"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
priority = 50
package fs
import "context"
type FilesystemToolProvider struct{}
func (p *FilesystemToolProvider) ListTools(ctx context.Context) ([]Tool, error) {
return []Tool{
{Name: "read_file", Description: "Read a file"},
{Name: "write_file", Description: "Write a file"},
}, nil
}
Step 3. Collect tools from all of them
- Python
- TypeScript
- Go
from dagstack.plugin_system import BroadcastCollectDispatcher
dispatcher = BroadcastCollectDispatcher(registry)
tools_lists, errors = dispatcher.dispatch(
"tool_provider", "list_tools", ctx,
)
# tools_lists = [
# [{"name": "read_file", ...}, {"name": "write_file", ...}, ...], # from filesystem (priority=50)
# [{"name": "http_get", ...}, {"name": "search", ...}], # from web (priority=40)
# ]
# errors = [] # under fail_fast a non-empty list means the call already raised BroadcastErrors
# Flatten into a single list:
all_tools = [tool for tools in tools_lists for tool in tools]
agent.configure(tools=all_tools)
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
import (
"context"
"log/slog"
pluginsystem "go.dagstack.dev/plugin-system"
)
collect := pluginsystem.NewDispatchBroadcastCollect(reg, "tool_provider")
results := collect.Dispatch(ctx, func(ctx context.Context, p pluginsystem.Plugin) (any, error) {
return p.Unwrap().(ToolProvider).ListTools(ctx)
})
var allTools []Tool
for _, r := range results {
if r.Err != nil {
slog.Warn("tool_provider failed", "name", r.PluginName, "err", r.Err)
continue
}
allTools = append(allTools, r.Value.([]Tool)...)
}
Dispatch returns []CollectResult; each entry holds PluginName, Value and Err. The dispatcher walks every plugin registered under the kind in priority desc order and collects results without aborting on a single failure — fail-fast behaviour, when needed, is implemented by the caller (return early on the first non-nil Err).
Result ordering
Results are gathered in priority desc order, with ties broken by plugin name. In our example, tools from filesystem (priority=50) come first, then those from web (priority=40).
If order matters (for example, preferred tools should appear first in the LLM context), pin it via priority.
Error policy: fail-fast vs best-effort
Fail-fast (default): a single plugin's failure aborts the entire collect, the application receives an exception, and the failed plugin is marked degraded.
Best-effort: a single plugin's failure is logged and skipped, and the collect returns a partial result. Enabled in the hookspec:
hooks:
- name: list_tools
dispatch: broadcast_collect
error_policy: best_effort
When to choose which:
- Fail-fast — when set integrity is critical. A partially loaded tool catalogue means the LLM might not see the tool it needs and produce a wrong plan.
- Best-effort — when availability matters more than completeness. Metric exporters: if one fails, the others must keep emitting metrics.
Common errors
| Symptom | Cause |
|---|---|
BroadcastErrors: plugin=web error=... | A plugin failed under fail-fast. Check the failed plugin's logs, or switch to best-effort in the hookspec. |
| Results arrive in an unexpected order | Check priority in the manifests; ties are sorted by name. |
| An empty array from a plugin | The plugin returned [] — this is a valid result (an empty catalogue), not an error. |
See also
- Dispatch — overview of all classes.
- ADR-0002 §2 broadcast-collect — the normative contract plus error policy.
- Broadcast-notify — fire-and-forget, no result aggregation.