Перейти к основному содержимому

Ресурсы (Resources DI)

Plugin-system следует принципу dependency injection для ресурсов: плагин не создаёт HTTP-клиенты, БД-соединения или другие долгоживущие объекты — он объявляет, какие ресурсы ему нужны, и получает их через PluginContext.resources. Host-приложение отвечает за конфигурацию и lifecycle ресурсов.

Это ключевая часть ADR-0003 — инвариант 3 «Resources через dependency injection».

Зачем нужна DI для ресурсов

Альтернатива — плагин создаёт ресурсы сам:

# Антипаттерн: плагин создаёт HTTP-клиент сам.
class BadPlugin:
def setup(self, ctx):
self.http = httpx.AsyncClient(
verify="/etc/ssl/certs/corp-ca.pem",
timeout=30,
)

Проблемы:

  1. Хост теряет контроль — корпоративный CA-bundle, TLS-конфиг, rate-limiter должны быть едиными для всех плагинов, но каждый плагин настраивает по-своему.
  2. Тестирование. Подменить httpx.AsyncClient на mock в тестах — гимнастика с monkey-patching.
  3. Изоляция. Плагин с утечкой соединений забивает connection-pool всего приложения; с DI хост управляет общим пулом.
  4. Orchestration-neutrality — если плагин запускается в отдельном процессе (через mcp_stdio), его локально-созданный HTTP-клиент бесполезен для хоста.

С DI плагин получает настроенный хостом клиент через контекст:

# Правильно: плагин получает готовый клиент.
class GoodPlugin:
def setup(self, ctx):
self.http = ctx.resources.http_client

Стандартные ресурсы

v1.0 plugin-system фиксирует пять стандартных ресурсов, доступных в PluginContext.resources. Каждый из них — протокол (Protocol в Python, interface в TS/Go), реализации живут в рамках конкретного binding-пакета.

РесурсПротоколНазначение
http_clientHttpClientHTTP-клиент, предконфигурированный TLS/CA-bundle/таймаутами.
tmpdirTempDirВременная директория, автоочистка в teardown().
blob_storeBlobStoreАбстрактное хранилище blobs (S3 / FS / in-memory).
clockClockИсточник времени (в тестах — freezable).
rngRngИсточник случайности (в тестах — seedable).

Фабричные имплементации на примере Python:

  • Production: SystemClock, RandomRng, InMemoryBlobStore, HttpClient на основе httpx.AsyncClient.
  • Test: FrozenClock, DeterministicRng с seed.

http_client

HTTP-клиент, настроенный хостом с учётом корпоративной политики: CA-bundle для внутренних сертификатов, retry-политика, rate-limiter, таймауты.

class MyPlugin:
def setup(self, ctx):
self._http = ctx.resources.http_client

async def fetch(self, url: str) -> dict:
response = await self._http.get(url)
response.raise_for_status()
return response.json()

clock и rng

Источники времени и случайности всегда должны использоваться через инъекцию — не time.time() / Math.random(). Это:

  • делает плагин тестируемым (freezable clock, seedable rng дают воспроизводимые тесты);
  • обеспечивает детерминизм для плагинов с idempotency_mode = "output_hash" (инвариант 8 из ADR-0003).
class TimestampedProcessor:
def setup(self, ctx):
self._clock = ctx.resources.clock
self._rng = ctx.resources.rng

def process(self, data: dict) -> dict:
return {
**data,
"processed_at": self._clock.now().isoformat(),
"trace_id": f"trace-{self._rng.randint(0, 2**32)}",
}

blob_store и tmpdir

blob_store — абстрактный интерфейс с методами put(key, bytes), get(key), delete(key), list(prefix). Реализация выбирается хостом: в тестах — InMemoryBlobStore, в production — реализация, работающая с S3 или файловой системой.

tmpdir — готовая временная директория (создаётся хостом, автоочистка в teardown()). Плагин не занимается самостоятельным управлением temp-файлами.

Декларация ресурсов в манифесте

Плагин объявляет, какие ресурсы нужны:

[plugin.resources]
required = ["http_client", "clock"]
optional = ["blob_store", "rng"]
  • required — без них плагин не работает. Если host не предоставляет http_client, плагин помечается unavailable с понятной ошибкой.
  • optional — плагин работает и без них; ctx.resources.blob_store будет None, плагин обязан обработать.

Подмена в тестах

Подмена стандартных ресурсов тестовыми версиями — стандартный приём для unit-тестов:

import logging
from datetime import datetime, timezone

from dagstack.plugin_system import PluginContext, PluginRegistry
from dagstack.plugin_system import (
FrozenClock,
DeterministicRng,
InMemoryBlobStore,
ResourceRegistry,
)


def test_timestamped_processor():
resources = ResourceRegistry()
resources.register(
"clock",
FrozenClock(now=datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc)),
)
resources.register("rng", DeterministicRng(seed=42))
resources.register("blob_store", InMemoryBlobStore())

registry = PluginRegistry(resource_registry=resources)
ctx = PluginContext(
config={},
logger=logging.getLogger("test"),
registry=registry,
)

plugin = TimestampedProcessor()
plugin.setup(ctx)

result = plugin.process({"text": "hello"})
assert result["processed_at"].startswith("2026-01-01T12:00:00")

Кастомные ресурсы

Приложение может регистрировать собственные ресурсы — например, postgres, tenant_registry, rate_limiter. Плагин объявляет их в resources.required/optional, приложение при инициализации создаёт и передаёт.

# Inside the application (FastAPI lifespan, Dagster main()):
import logging
from dagstack.plugin_system import PluginContext, PluginRegistry
from dagstack.plugin_system import ResourceRegistry

resources = ResourceRegistry()
resources.register("http_client", await create_http_client())
resources.register("postgres", await create_pg_pool()) # custom
resources.register("tenant_registry", tenant_registry) # custom

registry = PluginRegistry(resource_registry=resources)
registry.discover("plugins/")

ctx = PluginContext(
config={},
logger=logging.getLogger("app"),
registry=registry,
)
await registry.setup_all(ctx)

Правила для ресурсов

  1. Протокольно типизированы. Интерфейс ресурса обязан быть формально специфицирован (Protocol / interface). Без этого ломается decoration (см. ADR-0005 §3).
  2. Thread-safe / goroutine-safe. Ресурсы — синглтоны на host, доступны нескольким плагинам одновременно.
  3. Lifecycle управляется хостом. Ресурс создаётся до setup-плагинов, уничтожается после teardown.
  4. Не передавать кастомное состояние через ресурсы. Ресурс — способ доступа к инфраструктуре, не контейнер бизнес-логики.

См. также