Broadcast-notify — fire-and-forget события
broadcast-notify — все плагины вида уведомляются параллельно, возвращаемые значения не собираются, падение одного плагина логируется и не пропагируется. Типичное применение — lifecycle-события, telemetry, audit-hooks.
Сценарий: lifecycle-события приложения
Приложение хочет уведомить всех заинтересованных плагинов о начале обработки запроса: metrics-экспортёр увеличит счётчик, audit-logger запишет событие, tracer создаст span.
Hookspec вида
kind: lifecycle_listener
kind_api_version: 1.0.0
description: Подписчики на lifecycle-события приложения.
hooks:
- name: on_request_started
dispatch: broadcast_notify
input_schema: schemas/request_event.json
mcp_exposed: false
- name: on_request_finished
dispatch: broadcast_notify
input_schema: schemas/request_event.json
mcp_exposed: false
- name: on_request_failed
dispatch: broadcast_notify
input_schema: schemas/request_error_event.json
mcp_exposed: false
Плагины-подписчики
- Python
- TypeScript
- Go
class MetricsExporter:
async def setup(self, ctx):
# Resources injected at setup time; `ctx.resources.<name>` is
# the synchronous attribute form of the `Resources` Protocol
# (the async `await ctx.resources.get(name)` form is also OK).
self._counters = ctx.resources.metrics_counter
def on_request_started(self, ctx, request_id, path, actor):
self._counters.inc("requests_started_total", labels={"path": path})
def on_request_finished(self, ctx, request_id, duration_ms):
self._counters.inc("requests_finished_total")
# And: we return nothing — fire-and-forget.
def on_request_failed(self, ctx, request_id, code):
self._counters.inc("requests_failed_total", labels={"error_code": code})
class AuditLogger:
async def setup(self, ctx):
self._logger = ctx.logger
def on_request_started(self, ctx, request_id, path, actor):
self._logger.info("request started",
extra={"request_id": request_id, "actor": actor})
def on_request_finished(self, ctx, request_id, duration_ms):
self._logger.info("request finished",
extra={"request_id": request_id, "duration_ms": duration_ms})
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
package metrics
import "context"
type MetricsExporter struct {
counters Counters
}
func (p *MetricsExporter) OnRequestStarted(ctx context.Context, event RequestEvent) error {
p.counters.Inc("requests_started_total", map[string]string{"path": event.Path})
return nil
}
func (p *MetricsExporter) OnRequestFinished(ctx context.Context, event RequestEvent) error {
p.counters.Inc("requests_finished_total", nil)
return nil
}
Плагины не возвращают значений — возвращаемые типы хуков broadcast-notify всегда void/None.
Публикация события
- Python
- TypeScript
- Go
from dagstack.plugin_system import BroadcastNotifyDispatcher
notifier = BroadcastNotifyDispatcher(registry)
# In the application's middleware:
async def handle_request(request):
notifier.dispatch(
"lifecycle_listener", "on_request_started", ctx,
request_id=request.id,
path=request.path,
actor=request.user.id,
)
try:
response = await process(request)
notifier.dispatch(
"lifecycle_listener", "on_request_finished", ctx,
request_id=request.id,
duration_ms=int((time.time() - start) * 1000),
)
return response
except Exception as exc:
notifier.dispatch(
"lifecycle_listener", "on_request_failed", ctx,
request_id=request.id,
code=type(exc).__name__,
)
raise
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
import (
"context"
"fmt"
pluginsystem "go.dagstack.dev/plugin-system"
)
notifier := pluginsystem.NewDispatchBroadcastNotify(reg, "lifecycle_listener")
func handleRequest(ctx context.Context, req Request) (Response, error) {
notifier.Dispatch(ctx, func(ctx context.Context, p pluginsystem.Plugin) error {
return p.Unwrap().(LifecycleListener).OnRequestStarted(ctx, RequestEvent{
RequestID: req.ID,
Path: req.Path,
})
})
resp, err := process(ctx, req)
if err != nil {
notifier.Dispatch(ctx, func(ctx context.Context, p pluginsystem.Plugin) error {
return p.Unwrap().(LifecycleListener).OnRequestFailed(ctx, RequestErrorEvent{
RequestID: req.ID,
Code: fmt.Sprintf("%T", err),
})
})
}
return resp, err
}
Dispatch is fire-and-forget: per-subscriber errors are logged through the registry's logger but never returned to the caller. The closure receives a pluginsystem.Plugin — call Unwrap() and assert it to the kind contract.
Обработка падений
Если плагин-подписчик упал:
- Ошибка перехватывается ядром.
- Логируется как
plugin=<name> error=<message>в структурированный лог. - Остальные подписчики продолжают получать события.
- Упавший плагин не попадает в
unavailable— он работоспособен, просто одно событие не обработал.
Исключение: если плагин падает каждый раз — пометить как degraded, приложение видит это через admin API.
Parallel execution
Подписчики вызываются параллельно в пределах одного notify()-вызова. Важные следствия:
- Порядок обработки события между подписчиками не определён.
- Метрика
requests_finished_totalможет инкрементироваться после audit-записи, одновременно с ней или до неё. - Если бизнес-логика зависит от порядка — используйте
chainвместоbroadcast-notify.
Безопасность параллелизма
Подписчики не разделяют состояние между собой. Если плагин пишет в общий ресурс (БД, метрики), этот ресурс обязан быть thread-safe (или goroutine-safe в Go, async-safe в JS):
- HTTP-клиенты и metric-экспортёры стандартных библиотек обычно safe.
- Кастомные in-memory структуры внутри плагина — не safe без явных блокировок.
Типичные ошибки
| Симптом | Причина |
|---|---|
| Плагин не получает события | Хук в плагине называется иначе, чем в hookspec; проверить name в YAML vs имя метода. |
| Событие теряется при падении плагина | Это ожидаемо — broadcast-notify не делает retry. Для надёжной доставки используйте внешнюю шину (Kafka, Redis Streams). |
notify() блокирует основной flow | Нормально для очень быстрых хуков (логирование в memory-буфер). Для медленных операций — плагин сам должен принимать событие и асинхронно обрабатывать. |
Когда broadcast-notify не подходит
- Нужен результат — используйте
broadcast-collect. - Нужен гарантированный порядок — используйте
chain. - Нужно retry при падении — используйте orchestrator-плагин (enqueue событие как UoW).
- Нужна гарантированная доставка —
broadcast-notifyfire-and-forget, используйте внешнюю шину сообщений.
См. также
- Диспетчеризация — обзор всех классов.
- ADR-0002 §3 broadcast-notify — нормативный контракт.
- Broadcast-collect — когда нужен результат от каждого.