Broadcast-notify — fire-and-forget events

broadcast-notify — every plugin of the kind is notified in parallel, return values are not collected, and a single plugin's failure is logged but not propagated. Typical uses: lifecycle events, telemetry, audit hooks.

Scenario: application lifecycle events

The application wants to notify every interested plugin that a request is starting: the metrics exporter increments a counter, the audit logger records an event, the tracer creates a span.

The kind's hookspec

kinds/lifecycle_listener/v1.yaml
kind: lifecycle_listener
kind_api_version: 1.0.0
description: Subscribers to application lifecycle events.

hooks:
  - name: on_request_started
    dispatch: broadcast_notify
    input_schema: schemas/request_event.json
    mcp_exposed: false

  - name: on_request_finished
    dispatch: broadcast_notify
    input_schema: schemas/request_event.json
    mcp_exposed: false

  - name: on_request_failed
    dispatch: broadcast_notify
    input_schema: schemas/request_error_event.json
    mcp_exposed: false

Subscriber plugins

plugins/lifecycle/metrics-exporter/plugin.py
class MetricsExporter:
    async def setup(self, ctx):
        # Resources injected at setup time; `ctx.resources.<name>` is
        # the synchronous attribute form of the `Resources` Protocol
        # (the async `await ctx.resources.get(name)` form is also OK).
        self._counters = ctx.resources.metrics_counter

    def on_request_started(self, ctx, request_id, path, actor):
        self._counters.inc("requests_started_total", labels={"path": path})

    def on_request_finished(self, ctx, request_id, duration_ms):
        self._counters.inc("requests_finished_total")
        # Nothing is returned: the hook is fire-and-forget.

    def on_request_failed(self, ctx, request_id, code):
        self._counters.inc("requests_failed_total", labels={"error_code": code})

plugins/lifecycle/audit-logger/plugin.py
class AuditLogger:
    async def setup(self, ctx):
        self._logger = ctx.logger

    def on_request_started(self, ctx, request_id, path, actor):
        self._logger.info("request started",
                          extra={"request_id": request_id, "actor": actor})

    def on_request_finished(self, ctx, request_id, duration_ms):
        self._logger.info("request finished",
                          extra={"request_id": request_id, "duration_ms": duration_ms})

The plugins return nothing — return types of broadcast-notify hooks are always void/None.

Publishing an event

import time

from dagstack.plugin_system import BroadcastNotifyDispatcher

notifier = BroadcastNotifyDispatcher(registry)

# In the application's middleware (`registry`, `ctx`, and `process` are
# provided by the surrounding application):
async def handle_request(request):
    start = time.monotonic()  # monotonic clock, suitable for measuring durations
    notifier.dispatch(
        "lifecycle_listener", "on_request_started", ctx,
        request_id=request.id,
        path=request.path,
        actor=request.user.id,
    )
    try:
        response = await process(request)
        notifier.dispatch(
            "lifecycle_listener", "on_request_finished", ctx,
            request_id=request.id,
            duration_ms=int((time.monotonic() - start) * 1000),
        )
        return response
    except Exception as exc:
        notifier.dispatch(
            "lifecycle_listener", "on_request_failed", ctx,
            request_id=request.id,
            code=type(exc).__name__,
        )
        raise

Failure handling

When a subscriber plugin fails:

  1. The error is caught by the core.
  2. It is logged as plugin=<name> error=<message> in the structured log.
  3. The other subscribers still receive their events.
  4. The failed plugin remains operational; it simply did not handle that one event.
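The steps above can be illustrated with a minimal standalone sketch. It is not the dagstack implementation: `notify_all`, `Healthy`, and `Broken` are hypothetical names, and the sketch assumes an asyncio-based core. It only shows how a dispatcher might catch and log one subscriber's failure while the others still receive the event.

```python
import asyncio
import logging

logger = logging.getLogger("broadcast_notify_sketch")


async def notify_all(subscribers, hook_name, **payload):
    """Call `hook_name` on every subscriber; log failures, never raise them."""

    async def call_one(plugin):
        hook = getattr(plugin, hook_name, None)
        if hook is None:
            return  # this plugin does not subscribe to the hook
        result = hook(**payload)
        if asyncio.iscoroutine(result):
            await result  # async hooks are awaited, sync hooks ran inline

    # return_exceptions=True keeps one failure from cancelling the others.
    results = await asyncio.gather(
        *(call_one(p) for p in subscribers), return_exceptions=True
    )
    failed = []
    for plugin, result in zip(subscribers, results):
        if isinstance(result, Exception):
            name = type(plugin).__name__
            logger.error("plugin=%s error=%s", name, result)
            failed.append(name)
    return failed  # returned only so the sketch is easy to inspect


class Healthy:
    def __init__(self):
        self.seen = []

    def on_request_started(self, request_id):
        self.seen.append(request_id)


class Broken:
    def on_request_started(self, request_id):
        raise RuntimeError("exporter unavailable")


healthy = Healthy()
failed = asyncio.run(
    notify_all([Broken(), healthy], "on_request_started", request_id="r-1")
)
```

After the call, `healthy.seen` contains the event even though `Broken` raised, and both plugin objects can receive the next event unchanged.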

:::info Phase 2 scope
Repeated-failure tracking (marking a chronically failing plugin as degraded and exposing it through an admin API) is part of the Phase 2 observability roadmap.
:::

Parallel execution

Subscribers are invoked in parallel within a single notify() call. Important consequences:

  • The order in which subscribers handle the event is undefined.
  • The requests_finished_total metric may be incremented after the audit log entry, at the same time, or before it.
  • If business logic depends on ordering, use chain instead of broadcast-notify.
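A small standalone illustration of why ordering cannot be relied on, assuming an asyncio-style concurrent dispatch (the subscriber functions here are hypothetical stand-ins, not dagstack plugins): the subscriber listed first finishes last simply because it takes longer.

```python
import asyncio


async def main():
    completion_order = []

    async def slow_metrics():
        await asyncio.sleep(0.05)  # simulates a slower subscriber
        completion_order.append("metrics")

    async def fast_audit():
        await asyncio.sleep(0)  # yields once, then finishes
        completion_order.append("audit")

    # "metrics" is listed first, but completion order follows timing,
    # not the order in which subscribers were passed in.
    await asyncio.gather(slow_metrics(), fast_audit())
    return completion_order


completion_order = asyncio.run(main())
```

With real subscribers the timings vary from call to call, so the same pair may finish in either order.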

Concurrency safety

Subscribers do not share state with each other. When a plugin writes to a shared resource (a database, metrics), that resource must be thread-safe (or goroutine-safe in Go, async-safe in JS):

  • HTTP clients and metric exporters from standard libraries are typically safe.
  • Custom in-memory structures inside a plugin are not safe without explicit locking.
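As a sketch of the second point, a plugin that keeps its own in-memory counts could guard them with a lock. `InFlightTracker` is a hypothetical subscriber invented for illustration, and the sketch assumes hooks may be called from several threads at once.

```python
import threading


class InFlightTracker:
    """Hypothetical subscriber keeping per-path counts in a plain dict."""

    def __init__(self):
        self._lock = threading.Lock()
        self._started = {}

    def on_request_started(self, ctx, request_id, path, actor):
        # The read-modify-write on the dict is guarded so that concurrent
        # calls cannot overwrite each other's increments.
        with self._lock:
            self._started[path] = self._started.get(path, 0) + 1

    def snapshot(self):
        with self._lock:
            return dict(self._started)


def hammer(tracker, calls):
    for i in range(calls):
        tracker.on_request_started(None, f"r-{i}", "/health", "tester")


tracker = InFlightTracker()
threads = [threading.Thread(target=hammer, args=(tracker, 1000)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
counts = tracker.snapshot()
```

The lock covers only the dictionary update, so the hook stays fast while the counts remain consistent under concurrent calls.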

Common errors

| Symptom | Cause |
| --- | --- |
| The plugin does not receive events | The hook method in the plugin is named differently from the hookspec; check `name` in the YAML against the method name. |
| An event is lost on plugin failure | This is expected: broadcast-notify does not retry. For reliable delivery, use an external bus (Kafka, Redis Streams). |
| notify() blocks the main flow | Acceptable for very fast hooks (logging into a memory buffer). For slow operations, the plugin should accept the event quickly and process it asynchronously on its own. |
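For the last row, one possible shape of "accept the event and handle it asynchronously" is a bounded in-memory queue drained by a background thread. This is a sketch under assumptions: `SlowAuditSink`, its `close()` method, and the drop-on-full policy are hypothetical choices, not part of the plugin system.

```python
import queue
import threading


class SlowAuditSink:
    """Hypothetical subscriber: the hook only enqueues, a worker does the I/O."""

    def __init__(self, maxsize=1000):
        self._events = queue.Queue(maxsize=maxsize)
        self.written = []
        self.dropped = 0
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def on_request_finished(self, ctx, request_id, duration_ms):
        # Returns immediately; a full queue drops the event instead of blocking.
        try:
            self._events.put_nowait((request_id, duration_ms))
        except queue.Full:
            self.dropped += 1

    def _drain(self):
        while True:
            event = self._events.get()
            if event is None:  # sentinel posted by close()
                break
            self.written.append(event)  # stand-in for a slow write

    def close(self):
        self._events.put(None)
        self._worker.join()


sink = SlowAuditSink()
sink.on_request_finished(None, "r-1", 12)
sink.on_request_finished(None, "r-2", 40)
sink.close()
```

Dropping on a full queue matches the fire-and-forget contract; a plugin that cannot afford to lose events needs the external bus mentioned in the table.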

When broadcast-notify is the wrong fit

  • You need a result — use broadcast-collect.
  • You need a guaranteed order — use chain.
  • You need retries on failure — use an orchestrator plugin (enqueue the event as a UoW).
  • You need guaranteed delivery: broadcast-notify is fire-and-forget, so use an external message bus.

See also