Broadcast-notify — fire-and-forget events
broadcast-notify — every plugin of the kind is notified in parallel, return values are not collected, and a single plugin's failure is logged but not propagated. Typical uses: lifecycle events, telemetry, audit hooks.
Scenario: application lifecycle events
The application wants to notify every interested plugin that a request is starting: the metrics exporter increments a counter, the audit logger records an event, the tracer creates a span.
The kind's hookspec
kind: lifecycle_listener
kind_api_version: 1.0.0
description: Subscribers to application lifecycle events.
hooks:
- name: on_request_started
dispatch: broadcast_notify
input_schema: schemas/request_event.json
mcp_exposed: false
- name: on_request_finished
dispatch: broadcast_notify
input_schema: schemas/request_event.json
mcp_exposed: false
- name: on_request_failed
dispatch: broadcast_notify
input_schema: schemas/request_error_event.json
mcp_exposed: false
Subscriber plugins
- Python
- TypeScript
- Go
class MetricsExporter:
async def setup(self, ctx):
# Resources injected at setup time; `ctx.resources.<name>` is
# the synchronous attribute form of the `Resources` Protocol
# (the async `await ctx.resources.get(name)` form is also OK).
self._counters = ctx.resources.metrics_counter
def on_request_started(self, ctx, request_id, path, actor):
self._counters.inc("requests_started_total", labels={"path": path})
def on_request_finished(self, ctx, request_id, duration_ms):
self._counters.inc("requests_finished_total")
# And: we return nothing — fire-and-forget.
def on_request_failed(self, ctx, request_id, code):
self._counters.inc("requests_failed_total", labels={"error_code": code})
class AuditLogger:
async def setup(self, ctx):
self._logger = ctx.logger
def on_request_started(self, ctx, request_id, path, actor):
self._logger.info("request started",
extra={"request_id": request_id, "actor": actor})
def on_request_finished(self, ctx, request_id, duration_ms):
self._logger.info("request finished",
extra={"request_id": request_id, "duration_ms": duration_ms})
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
package metrics
import "context"
type MetricsExporter struct {
counters Counters
}
func (p *MetricsExporter) OnRequestStarted(ctx context.Context, event RequestEvent) error {
p.counters.Inc("requests_started_total", map[string]string{"path": event.Path})
return nil
}
func (p *MetricsExporter) OnRequestFinished(ctx context.Context, event RequestEvent) error {
p.counters.Inc("requests_finished_total", nil)
return nil
}
The plugins return nothing — return types of broadcast-notify hooks are always void/None.
Publishing an event
- Python
- TypeScript
- Go
from dagstack.plugin_system import BroadcastNotifyDispatcher
notifier = BroadcastNotifyDispatcher(registry)
# In the application's middleware:
async def handle_request(request):
notifier.dispatch(
"lifecycle_listener", "on_request_started", ctx,
request_id=request.id,
path=request.path,
actor=request.user.id,
)
try:
response = await process(request)
notifier.dispatch(
"lifecycle_listener", "on_request_finished", ctx,
request_id=request.id,
duration_ms=int((time.time() - start) * 1000),
)
return response
except Exception as exc:
notifier.dispatch(
"lifecycle_listener", "on_request_failed", ctx,
request_id=request.id,
code=type(exc).__name__,
)
raise
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
import (
"context"
"fmt"
pluginsystem "go.dagstack.dev/plugin-system"
)
notifier := pluginsystem.NewDispatchBroadcastNotify(reg, "lifecycle_listener")
func handleRequest(ctx context.Context, req Request) (Response, error) {
notifier.Dispatch(ctx, func(ctx context.Context, p pluginsystem.Plugin) error {
return p.Unwrap().(LifecycleListener).OnRequestStarted(ctx, RequestEvent{
RequestID: req.ID,
Path: req.Path,
})
})
resp, err := process(ctx, req)
if err != nil {
notifier.Dispatch(ctx, func(ctx context.Context, p pluginsystem.Plugin) error {
return p.Unwrap().(LifecycleListener).OnRequestFailed(ctx, RequestErrorEvent{
RequestID: req.ID,
Code: fmt.Sprintf("%T", err),
})
})
}
return resp, err
}
Dispatch is fire-and-forget: per-subscriber errors are logged through the registry's logger but never returned to the caller. The closure receives a pluginsystem.Plugin — call Unwrap() and assert it to the kind contract.
Failure handling
When a subscriber plugin fails:
- The error is caught by the core.
- It is logged as
plugin=<name> error=<message>in the structured log. - The other subscribers still receive their events.
- The failed plugin remains operational; it simply did not handle that one event.
:::info Phase 2 scope
Repeated-failure tracking (marking a chronically failing plugin as degraded and exposing it through an admin API) is part of the Phase 2 observability roadmap.
:::
Parallel execution
Subscribers are invoked in parallel within a single notify() call. Important consequences:
- The order in which subscribers handle the event is undefined.
- The
requests_finished_totalmetric may be incremented after the audit log entry, at the same time, or before it. - If business logic depends on order — use
chaininstead ofbroadcast-notify.
Concurrency safety
Subscribers do not share state with each other. When a plugin writes to a shared resource (a database, metrics), that resource must be thread-safe (or goroutine-safe in Go, async-safe in JS):
- HTTP clients and metric exporters from standard libraries are typically safe.
- Custom in-memory structures inside a plugin — not safe without explicit locking.
Common errors
| Symptom | Cause |
|---|---|
| The plugin does not receive events | The hook in the plugin is named differently from the hookspec; check name in YAML against the method name. |
| An event is lost on plugin failure | This is expected — broadcast-notify does not retry. For reliable delivery, use an external bus (Kafka, Redis Streams). |
notify() blocks the main flow | Acceptable for very fast hooks (logging into a memory buffer). For slow operations, the plugin should accept the event itself and handle it asynchronously. |
When broadcast-notify is the wrong fit
- You need a result — use
broadcast-collect. - You need a guaranteed order — use
chain. - You need retries on failure — use an orchestrator plugin (enqueue the event as a UoW).
- You need guaranteed delivery —
broadcast-notifyis fire-and-forget; use an external message bus.
See also
- Dispatch — overview of all classes.
- ADR-0002 §3 broadcast-notify — the normative contract.
- Broadcast-collect — when you need a result from each.