Перейти к основному содержимому

Broadcast-notify — fire-and-forget события

broadcast-notify — все плагины вида уведомляются параллельно, возвращаемые значения не собираются, падение одного плагина логируется и не пропагируется. Типичное применение — lifecycle-события, telemetry, audit-hooks.

Сценарий: lifecycle-события приложения

Приложение хочет уведомить всех заинтересованных плагинов о начале обработки запроса: metrics-экспортёр увеличит счётчик, audit-logger запишет событие, tracer создаст span.

Hookspec вида

kinds/lifecycle_listener/v1.yaml
kind: lifecycle_listener
kind_api_version: 1.0.0
description: Подписчики на lifecycle-события приложения.

hooks:
- name: on_request_started
dispatch: broadcast_notify
input_schema: schemas/request_event.json
mcp_exposed: false

- name: on_request_finished
dispatch: broadcast_notify
input_schema: schemas/request_event.json
mcp_exposed: false

- name: on_request_failed
dispatch: broadcast_notify
input_schema: schemas/request_error_event.json
mcp_exposed: false

Плагины-подписчики

plugins/lifecycle/metrics-exporter/plugin.py
class MetricsExporter:
async def setup(self, ctx):
# Resources injected at setup time; `ctx.resources.<name>` is
# the synchronous attribute form of the `Resources` Protocol
# (the async `await ctx.resources.get(name)` form is also OK).
self._counters = ctx.resources.metrics_counter

def on_request_started(self, ctx, request_id, path, actor):
self._counters.inc("requests_started_total", labels={"path": path})

def on_request_finished(self, ctx, request_id, duration_ms):
self._counters.inc("requests_finished_total")
# And: we return nothing — fire-and-forget.

def on_request_failed(self, ctx, request_id, code):
self._counters.inc("requests_failed_total", labels={"error_code": code})
plugins/lifecycle/audit-logger/plugin.py
class AuditLogger:
async def setup(self, ctx):
self._logger = ctx.logger

def on_request_started(self, ctx, request_id, path, actor):
self._logger.info("request started",
extra={"request_id": request_id, "actor": actor})

def on_request_finished(self, ctx, request_id, duration_ms):
self._logger.info("request finished",
extra={"request_id": request_id, "duration_ms": duration_ms})

Плагины не возвращают значений — возвращаемые типы хуков broadcast-notify всегда void/None.

Публикация события

from dagstack.plugin_system import BroadcastNotifyDispatcher

notifier = BroadcastNotifyDispatcher(registry)

# In the application's middleware:
async def handle_request(request):
notifier.dispatch(
"lifecycle_listener", "on_request_started", ctx,
request_id=request.id,
path=request.path,
actor=request.user.id,
)
try:
response = await process(request)
notifier.dispatch(
"lifecycle_listener", "on_request_finished", ctx,
request_id=request.id,
duration_ms=int((time.time() - start) * 1000),
)
return response
except Exception as exc:
notifier.dispatch(
"lifecycle_listener", "on_request_failed", ctx,
request_id=request.id,
code=type(exc).__name__,
)
raise

Обработка падений

Если плагин-подписчик упал:

  1. Ошибка перехватывается ядром.
  2. Логируется как plugin=<name> error=<message> в структурированный лог.
  3. Остальные подписчики продолжают получать события.
  4. Упавший плагин не попадает в unavailable — он работоспособен, просто одно событие не обработал.

Исключение: если плагин падает каждый раз — пометить как degraded, приложение видит это через admin API.

Parallel execution

Подписчики вызываются параллельно в пределах одного notify()-вызова. Важные следствия:

  • Порядок обработки события между подписчиками не определён.
  • Метрика requests_finished_total может инкрементироваться после audit-записи, одновременно с ней или до неё.
  • Если бизнес-логика зависит от порядка — используйте chain вместо broadcast-notify.

Безопасность параллелизма

Подписчики не разделяют состояние между собой. Если плагин пишет в общий ресурс (БД, метрики), этот ресурс обязан быть thread-safe (или goroutine-safe в Go, async-safe в JS):

  • HTTP-клиенты и metric-экспортёры стандартных библиотек обычно safe.
  • Кастомные in-memory структуры внутри плагина — не safe без явных блокировок.

Типичные ошибки

СимптомПричина
Плагин не получает событияХук в плагине называется иначе, чем в hookspec; проверить name в YAML vs имя метода.
Событие теряется при падении плагинаЭто ожидаемо — broadcast-notify не делает retry. Для надёжной доставки используйте внешнюю шину (Kafka, Redis Streams).
notify() блокирует основной flowНормально для очень быстрых хуков (логирование в memory-буфер). Для медленных операций — плагин сам должен принимать событие и асинхронно обрабатывать.

Когда broadcast-notify не подходит

  • Нужен результат — используйте broadcast-collect.
  • Нужен гарантированный порядок — используйте chain.
  • Нужно retry при падении — используйте orchestrator-плагин (enqueue событие как UoW).
  • Нужна гарантированная доставкаbroadcast-notify fire-and-forget, используйте внешнюю шину сообщений.

См. также