Broadcast-notify — fire-and-forget events
broadcast-notify — every plugin of the kind is notified in parallel, return values are not collected, and a single plugin's failure is logged but not propagated. Typical uses: lifecycle events, telemetry, audit hooks.
Scenario: application lifecycle events
The application wants to notify every interested plugin that a request is starting: the metrics exporter increments a counter, the audit logger records an event, the tracer creates a span.
The kind's hookspec
# Hookspec for the `lifecycle_listener` kind: three hooks, each dispatched
# as broadcast_notify.
kind: lifecycle_listener
kind_api_version: 1.0.0
description: Subscribers to application lifecycle events.
hooks:
# The started and finished hooks share the request_event input schema.
- name: on_request_started
dispatch: broadcast_notify
input_schema: schemas/request_event.json
mcp_exposed: false
- name: on_request_finished
dispatch: broadcast_notify
input_schema: schemas/request_event.json
mcp_exposed: false
# The failure hook uses its own input schema (request_error_event).
- name: on_request_failed
dispatch: broadcast_notify
input_schema: schemas/request_error_event.json
mcp_exposed: false
Subscriber plugins
- Python
- TypeScript
- Go
class MetricsExporter:
    """Lifecycle listener that bumps one counter per request event.

    Every hook returns ``None``: broadcast-notify is fire-and-forget, so
    nothing is reported back to the publisher.
    """

    async def setup(self, ctx):
        """Pick up the counter resource injected at setup time."""
        # Attribute access (`ctx.resources.<name>`) is the synchronous form
        # of the `Resources` Protocol; awaiting `ctx.resources.get(name)`
        # is the equivalent async form.
        resources = ctx.resources
        self._counters = resources.metrics_counter

    def on_request_started(self, ctx, request_id, path, actor):
        """Count a started request, labelled with its path."""
        path_labels = {"path": path}
        self._counters.inc("requests_started_total", labels=path_labels)

    def on_request_finished(self, ctx, request_id, duration_ms):
        """Count a finished request (no labels)."""
        self._counters.inc("requests_finished_total")
        # Deliberately no return value — fire-and-forget.

    def on_request_failed(self, ctx, request_id, code):
        """Count a failed request, labelled with its error code."""
        error_labels = {"error_code": code}
        self._counters.inc("requests_failed_total", labels=error_labels)
class AuditLogger:
    """Lifecycle listener that writes an audit log line per request event."""

    async def setup(self, ctx):
        """Keep a reference to the logger carried by the plugin context."""
        self._logger = ctx.logger

    def on_request_started(self, ctx, request_id, path, actor):
        """Log the start of a request together with the acting user."""
        fields = {"request_id": request_id, "actor": actor}
        self._logger.info("request started", extra=fields)

    def on_request_finished(self, ctx, request_id, duration_ms):
        """Log the end of a request together with its duration."""
        fields = {"request_id": request_id, "duration_ms": duration_ms}
        self._logger.info("request finished", extra=fields)
import type { PluginContext } from "@dagstack/plugin-system";
/** Payload delivered to the request-started hook. */
type RequestEvent = {
  requestId: string;
  path: string;
  actor: string;
};

/** Payload delivered to the request-failed hook. */
type ErrorEvent = {
  requestId: string;
  code: string;
};

/**
 * Lifecycle listener that bumps one counter per request event.
 * Every hook returns `void`: broadcast-notify does not collect results.
 */
export class MetricsExporter {
  private counters!: Counters;

  /** Resolves the "counters" resource from the plugin context. */
  async setup(ctx: PluginContext): Promise<void> {
    const counters = ctx.resources.get<Counters>("counters");
    this.counters = counters;
  }

  /** Counts a started request, labelled with its path. */
  onRequestStarted({ path }: RequestEvent): void {
    this.counters.inc("requests_started_total", { path });
  }

  /** Counts a finished request (empty label set). */
  onRequestFinished(): void {
    this.counters.inc("requests_finished_total", {});
  }

  /** Counts a failed request, labelled with its error code. */
  onRequestFailed({ code }: ErrorEvent): void {
    this.counters.inc("requests_failed_total", { error_code: code });
  }
}
import type { PluginContext } from "@dagstack/plugin-system";
/** Lifecycle listener that writes an audit log line per request event. */
export class AuditLogger {
  private logger!: Logger;

  /** Resolves the "logger" resource from the plugin context. */
  async setup(ctx: PluginContext): Promise<void> {
    const logger = ctx.resources.get<Logger>("logger");
    this.logger = logger;
  }

  /** Logs the start of a request together with the acting user. */
  onRequestStarted({ requestId, actor }: { requestId: string; actor: string }): void {
    this.logger.info("request started", { request_id: requestId, actor });
  }

  /** Logs the end of a request together with its duration in milliseconds. */
  onRequestFinished({ requestId, durationMs }: { requestId: string; durationMs: number }): void {
    this.logger.info("request finished", { request_id: requestId, duration_ms: durationMs });
  }
}
package metrics
import "context"
// MetricsExporter is a lifecycle listener that bumps one counter per
// request event.
type MetricsExporter struct {
	counters Counters
}

// OnRequestStarted counts a started request, labelled with its path.
// It always returns nil.
func (p *MetricsExporter) OnRequestStarted(ctx context.Context, event RequestEvent) error {
	p.counters.Inc("requests_started_total", map[string]string{"path": event.Path})
	return nil
}

// OnRequestFinished counts a finished request (no labels).
// It always returns nil.
func (p *MetricsExporter) OnRequestFinished(ctx context.Context, event RequestEvent) error {
	p.counters.Inc("requests_finished_total", nil)
	return nil
}

// OnRequestFailed counts a failed request, labelled with its error code.
// Without this method the exporter would not cover the failure hook that
// the publisher invokes through the LifecycleListener contract.
// It always returns nil.
func (p *MetricsExporter) OnRequestFailed(ctx context.Context, event RequestErrorEvent) error {
	p.counters.Inc("requests_failed_total", map[string]string{"error_code": event.Code})
	return nil
}
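The plugins return nothing to the publisher — broadcast-notify hooks return None (Python) or void (TypeScript); in Go a hook returns only an `error`, which the dispatcher logs and discards.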
Publishing an event
- Python
- TypeScript
- Go
import time

from dagstack.plugin_system import BroadcastNotifyDispatcher
notifier = BroadcastNotifyDispatcher(registry)


# In the application's middleware:
async def handle_request(request):
    """Process one request and publish lifecycle events around it.

    Publishes ``on_request_started`` before processing, then either
    ``on_request_finished`` (with the elapsed time in milliseconds) or
    ``on_request_failed`` (with the exception class name as ``code``).

    Returns:
        Whatever ``process(request)`` returns.

    Raises:
        Any exception raised by ``process(request)``; it is re-raised
        unchanged after the failure event has been published.
    """
    # Capture the start before any work so duration_ms covers the whole
    # request; a monotonic clock is unaffected by wall-clock adjustments.
    start = time.monotonic()
    # NOTE(review): dispatch() is called without `await`, as in the original
    # snippet — confirm it is synchronous in the Python binding.
    notifier.dispatch(
        "lifecycle_listener", "on_request_started", ctx,
        request_id=request.id,
        path=request.path,
        actor=request.user.id,
    )
    try:
        response = await process(request)
        notifier.dispatch(
            "lifecycle_listener", "on_request_finished", ctx,
            request_id=request.id,
            duration_ms=int((time.monotonic() - start) * 1000),
        )
        return response
    except Exception as exc:
        notifier.dispatch(
            "lifecycle_listener", "on_request_failed", ctx,
            request_id=request.id,
            code=type(exc).__name__,
        )
        raise
import { BroadcastNotifyDispatcher } from "@dagstack/plugin-system";
const notifier = new BroadcastNotifyDispatcher("lifecycle_listener");

/**
 * Processes one request and publishes lifecycle events around it.
 *
 * Publishes `onRequestStarted` before processing, then either
 * `onRequestFinished` (with the elapsed milliseconds) or `onRequestFailed`
 * (with the error's constructor name, or its string form for non-Error
 * throwables).
 *
 * @returns The response produced by `process(request)`.
 * @throws Whatever `process(request)` throws, re-thrown unchanged after the
 *   failure event has been published.
 */
async function handleRequest(request: Request): Promise<Response> {
  const startTime = Date.now();
  // Errors in listeners are swallowed and logged via console.warn, so a
  // failing listener never fails the request. Each dispatch is awaited,
  // though: the handler resumes only once dispatch() resolves, so
  // listeners should stay fast.
  await notifier.dispatch(registry.getLoadedPlugins(), "onRequestStarted", {
    requestId: request.id,
    path: request.path,
    actor: request.user.id,
  });
  try {
    const response = await process(request);
    await notifier.dispatch(registry.getLoadedPlugins(), "onRequestFinished", {
      requestId: request.id,
      durationMs: Date.now() - startTime,
    });
    return response;
  } catch (exc) {
    await notifier.dispatch(registry.getLoadedPlugins(), "onRequestFailed", {
      requestId: request.id,
      code: exc instanceof Error ? exc.constructor.name : String(exc),
    });
    throw exc;
  }
}
import (
"context"
"fmt"
pluginsystem "go.dagstack.dev/plugin-system"
)
notifier := pluginsystem.NewDispatchBroadcastNotify(reg, "lifecycle_listener")
func handleRequest(ctx context.Context, req Request) (Response, error) {
notifier.Dispatch(ctx, func(ctx context.Context, p pluginsystem.Plugin) error {
return p.Unwrap().(LifecycleListener).OnRequestStarted(ctx, RequestEvent{
RequestID: req.ID,
Path: req.Path,
})
})
resp, err := process(ctx, req)
if err != nil {
notifier.Dispatch(ctx, func(ctx context.Context, p pluginsystem.Plugin) error {
return p.Unwrap().(LifecycleListener).OnRequestFailed(ctx, RequestErrorEvent{
RequestID: req.ID,
Code: fmt.Sprintf("%T", err),
})
})
}
return resp, err
}
Dispatch is fire-and-forget: per-subscriber errors are logged through the registry's logger but never returned to the caller. The closure receives a pluginsystem.Plugin — call Unwrap() and assert it to the kind contract.
Failure handling
When a subscriber plugin fails:
- The error is caught by the core.
- It is logged as `plugin=<name> error=<message>` in the structured log.
- The other subscribers still receive their events.
- The failed plugin remains operational; it simply did not handle that one event.
:::info Phase 2 scope
Repeated-failure tracking (marking a chronically failing plugin as degraded and exposing it through an admin API) is part of the Phase 2 observability roadmap.
:::
Parallel execution
Subscribers are invoked in parallel within a single notify() call. Important consequences:
- The order in which subscribers handle the event is undefined.
- The `requests_finished_total` metric may be incremented after the audit log entry, at the same time, or before it.
- If business logic depends on order — use `chain` instead of `broadcast-notify`.
Concurrency safety
Subscribers do not share state with each other. When a plugin writes to a shared resource (a database, metrics), that resource must be thread-safe (or goroutine-safe in Go, async-safe in JS):
- HTTP clients and metric exporters from standard libraries are typically safe.
- Custom in-memory structures inside a plugin — not safe without explicit locking.
Common errors
| Symptom | Cause |
|---|---|
| The plugin does not receive events | The hook in the plugin is named differently from the hookspec; check name in YAML against the method name. |
| An event is lost on plugin failure | This is expected — broadcast-notify does not retry. For reliable delivery, use an external bus (Kafka, Redis Streams). |
| `notify()` blocks the main flow | Acceptable for very fast hooks (logging into a memory buffer). For slow operations, the plugin should accept the event itself and handle it asynchronously. |
When broadcast-notify is the wrong fit
- You need a result — use `broadcast-collect`.
- You need a guaranteed order — use `chain`.
- You need retries on failure — use an orchestrator plugin (enqueue the event as a UoW).
- You need guaranteed delivery — `broadcast-notify` is fire-and-forget; use an external message bus.
See also
- Dispatch — overview of all classes.
- ADR-0002 §3 broadcast-notify — the normative contract.
- Broadcast-collect — when you need a result from each.