
Standard resources

Plugin-system v1.0 fixes five standard resources available through PluginContext.resources. Each one is a protocol (Protocol in Python, interface in TS/Go); concrete implementations live inside the corresponding binding package.

For a conceptual overview see Resources (Resources DI). This page lists exact signatures and the available implementations.

:::info What ships in dagstack-plugin-system 0.1.0-rc.2 (Python)

The four protocols (Clock, Rng, HttpClient, BlobStore) are defined and exported. Concrete test implementations shipped today: FrozenClock, DeterministicRng, InMemoryBlobStore (alias MemoryBlobStore). The production implementations (SystemClock, RandomRng, the host-configured HttpClient, S3-backed blob store) and the tmpdir resource catalogue land in Phase 1+.

For Phase 2 resources without a shipped reference implementation, the application or the test injects its own object that satisfies the protocol; see the "Reference implementation not yet shipped" admonitions below.

:::

:::info What ships in go.dagstack.dev/plugin-system 0.1.x

The Go binding exports only the open Resources interface (Get(name string) (any, error)) — see context.go. The typed-interface catalogue (Clock, Rng, HTTPClient, BlobStore, TempDir) and the test fakes (FrozenClock, DeterministicRng, InMemoryBlobStore, …) are a Phase 2 deliverable. Today, a Go plugin pulls a host-supplied resource through ctx.Resources.Get("name") and the plugin author defines the local interface they expect:

// In plugin code today: declare the shape you need locally, then assert it.
type clock interface {
	Now() time.Time
}

raw, err := pluginCtx.Resources.Get("clock")
if err != nil {
	return err
}
c, ok := raw.(clock)
if !ok {
	return fmt.Errorf("resource 'clock' does not satisfy the expected interface")
}

The Go-flavoured snippets below show planned signatures so the kind contract is visible cross-language; in 0.1.x neither the typed interfaces nor the named-method receivers below are exported by pluginsystem.

:::

clock

An injectable time source. Mandatory for plugins with idempotency_mode = "output_hash" (invariant 8).

Interface

from typing import Protocol
from datetime import datetime


class Clock(Protocol):
    def now(self) -> datetime:
        """Current time. Takes no arguments."""

    def monotonic_ns(self) -> int:
        """Monotonic counter in nanoseconds, used to measure intervals."""

Implementations

| Name | Where | Status |
| --- | --- | --- |
| SystemClock | production | Phase 1+. The application provides its own implementation today. |
| FrozenClock(now=…) | tests | Shipped in 0.1.0-rc.2. Call advance(seconds=…) to move time forward. |
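
Until SystemClock ships, an application has to supply its own production clock. The following is a minimal sketch of one, not the library's implementation. `AppSystemClock` is a hypothetical name, the Protocol is redeclared locally so the snippet runs without the package, and returning a timezone-aware UTC datetime from `now()` is an assumption (this page does not state which timezone the protocol expects).

```python
import time
from datetime import datetime, timezone
from typing import Protocol, runtime_checkable


@runtime_checkable
class Clock(Protocol):
    """Local copy of the Clock shape shown above."""

    def now(self) -> datetime: ...

    def monotonic_ns(self) -> int: ...


class AppSystemClock:
    """Hypothetical application-side clock backed by the operating system."""

    def now(self) -> datetime:
        # Assumption: the host wants timezone-aware UTC timestamps.
        return datetime.now(timezone.utc)

    def monotonic_ns(self) -> int:
        # Never goes backwards, so it is safe for measuring intervals.
        return time.monotonic_ns()


clock = AppSystemClock()
start = clock.monotonic_ns()
elapsed_ns = clock.monotonic_ns() - start
```

Intervals are computed from `monotonic_ns()` rather than from two `now()` calls because wall-clock time can jump when the system clock is adjusted.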

rng

An injectable randomness source. Mandatory for plugins with idempotency_mode = "output_hash".

Interface

from typing import Any, Protocol


class Rng(Protocol):
    def next_float(self) -> float:
        """A float in [0.0, 1.0)."""

    def next_int(self, low: int, high: int) -> int:
        """An integer in [low, high]."""

    def uuid4(self) -> str:
        """A random UUID v4."""

    def choice(self, items: list) -> Any:
        """A random element of the list."""

Implementations

| Name | Where | Status |
| --- | --- | --- |
| RandomRng | production | Phase 1+. The application provides its own OS-backed implementation today. |
| DeterministicRng(*, seed=…) | tests | Shipped in 0.1.0-rc.2. Keyword-only seed, reproducible stream. |
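
Since the production RandomRng is not shipped yet, the sketch below shows one way an application might write its own OS-backed source with the same four methods. `OsBackedRng` is a hypothetical name, and the choice of `random.SystemRandom` and `uuid.uuid4` from the standard library is an assumption, not something the spec prescribes.

```python
import random
import uuid
from typing import Any


class OsBackedRng:
    """Hypothetical application-side Rng drawing from OS entropy."""

    def __init__(self) -> None:
        # SystemRandom reads from os.urandom and cannot be seeded.
        self._rng = random.SystemRandom()

    def next_float(self) -> float:
        return self._rng.random()  # value in [0.0, 1.0)

    def next_int(self, low: int, high: int) -> int:
        return self._rng.randint(low, high)  # both ends inclusive

    def uuid4(self) -> str:
        return str(uuid.uuid4())

    def choice(self, items: list) -> Any:
        return self._rng.choice(items)


rng = OsBackedRng()
roll = rng.next_int(1, 6)
request_id = rng.uuid4()
```

Because this source cannot be seeded, it suits production only; tests that need a reproducible stream would use DeterministicRng instead.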

http_client

An injectable HTTP client preconfigured by the host (CA bundle, TLS, timeouts, retry policy).

Interface

from __future__ import annotations  # HttpResponse is referenced before it is defined

from typing import Any, Protocol


class HttpClient(Protocol):
    async def get(self, url: str, *, headers: dict | None = None) -> HttpResponse: ...
    async def post(self, url: str, *, json: dict | None = None, headers: dict | None = None) -> HttpResponse: ...
    async def put(self, url: str, *, json: dict | None = None, headers: dict | None = None) -> HttpResponse: ...
    async def delete(self, url: str, *, headers: dict | None = None) -> HttpResponse: ...


class HttpResponse(Protocol):
    status_code: int
    headers: dict[str, str]

    def json(self) -> Any: ...
    def text(self) -> str: ...

Implementations

:::info Reference implementation not yet shipped

dagstack-plugin-system 0.1.0-rc.2 exports the HttpClient Protocol but no concrete implementation — neither the production httpx/fetch wrapper nor a programmable test double. They land together with the resource catalogue formalisation in Phase 1+. Until then, applications and tests inject their own implementation that satisfies the Protocol.

:::

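Test double for HttpClient (pure stdlib; get and post only)
from json import loads
from typing import Any

from dagstack.plugin_system import HttpClient  # the Protocol


class StubResponse:
    """Canned response exposing the HttpResponse members."""

    def __init__(self, body: str = "{}") -> None:
        self.status_code = 200
        self.headers: dict[str, str] = {}
        self._body = body

    def json(self) -> Any:
        return loads(self._body)

    def text(self) -> str:
        return self._body


class StubHttpClient:
    """In-process programmable HTTP client; add put() and delete() the same way."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, str]] = []

    async def get(self, url: str, *, headers: dict | None = None) -> StubResponse:
        self.calls.append(("GET", url))
        return StubResponse()

    async def post(self, url: str, *, json=None, headers=None) -> StubResponse:
        self.calls.append(("POST", url))
        return StubResponse()


resources.register("http_client", StubHttpClient())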

blob_store

An abstract store for blobs (byte-array objects).

Interface

class BlobStore(Protocol):
    async def put(self, key: str, data: bytes, *, content_type: str | None = None) -> None: ...
    async def get(self, key: str) -> bytes: ...
    async def delete(self, key: str) -> None: ...
    async def list(self, prefix: str = "") -> list[str]: ...
    async def exists(self, key: str) -> bool: ...

Implementations

| Name | Where | Status |
| --- | --- | --- |
| InMemoryBlobStore() | tests / dev | Shipped in 0.1.0-rc.2 (alias MemoryBlobStore); dict[str, bytes] in memory. |
| Filesystem-backed store | dev / in-process | Phase 1+. The application provides its own today. |
| S3-compatible store | production | Phase 1+. The application provides its own today. |
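
For the filesystem-backed row, an application-provided store could look roughly like the sketch below. Everything here is an assumption rather than the planned implementation: the name `LocalDirBlobStore`, the one-file-per-key layout, treating `delete` of a missing key as a no-op, and ignoring `content_type`. Blocking file I/O is pushed to a worker thread with `asyncio.to_thread` so the async signatures do not stall the event loop.

```python
from __future__ import annotations

import asyncio
import tempfile
from pathlib import Path


class LocalDirBlobStore:
    """Hypothetical filesystem-backed store: one file per key under a root."""

    def __init__(self, root: Path) -> None:
        self._root = root.resolve()
        self._root.mkdir(parents=True, exist_ok=True)

    def _path(self, key: str) -> Path:
        path = (self._root / key).resolve()
        # Reject empty keys and keys such as "../x" that escape the root.
        if path == self._root or not path.is_relative_to(self._root):
            raise ValueError(f"invalid blob key: {key!r}")
        return path

    async def put(self, key: str, data: bytes, *, content_type: str | None = None) -> None:
        path = self._path(key)  # content_type is not persisted in this sketch

        def write() -> None:
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_bytes(data)

        await asyncio.to_thread(write)

    async def get(self, key: str) -> bytes:
        return await asyncio.to_thread(self._path(key).read_bytes)

    async def delete(self, key: str) -> None:
        path = self._path(key)
        await asyncio.to_thread(lambda: path.unlink(missing_ok=True))

    async def exists(self, key: str) -> bool:
        return await asyncio.to_thread(self._path(key).is_file)

    async def list(self, prefix: str = "") -> list[str]:
        def scan() -> list[str]:
            keys = (
                p.relative_to(self._root).as_posix()
                for p in self._root.rglob("*")
                if p.is_file()
            )
            return sorted(k for k in keys if k.startswith(prefix))

        return await asyncio.to_thread(scan)


async def demo() -> list[str]:
    with tempfile.TemporaryDirectory() as tmp:
        store = LocalDirBlobStore(Path(tmp))
        await store.put("reports/a.bin", b"\x00\x01")
        await store.put("logs/b.bin", b"\x02")
        await store.delete("logs/b.bin")
        return await store.list("reports/")


keys = asyncio.run(demo())
```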

tmpdir

Temporary directory with automatic cleanup.

:::info Reference implementation not yet shipped

The TempDir resource is reserved in the spec; in dagstack-plugin-system 0.1.0-rc.2 neither the Protocol nor concrete implementations are exported yet. The catalogue lands in Phase 1+ together with the rest of the resource implementations. Until then, applications that need a temp-dir resource inject their own object satisfying the shape below; plugins read it through ctx.resources.tmpdir.

:::

Planned interface (Phase 1+)

from pathlib import Path
from typing import Protocol


class TempDir(Protocol):
    path: Path

    def create_file(self, name: str, *, suffix: str = "") -> Path: ...
    def create_subdir(self, name: str) -> Path: ...
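
An application that needs this resource today could inject something like the sketch below. It is an illustration under assumptions, not the planned implementation: `ScopedTempDir` is a hypothetical name, the file name is taken to be `name` followed by `suffix`, and the `close()` method (not part of the planned interface) is how this sketch lets the host trigger cleanup.

```python
import tempfile
from pathlib import Path


class ScopedTempDir:
    """Hypothetical TempDir-shaped object built on tempfile.TemporaryDirectory."""

    def __init__(self) -> None:
        self._tmp = tempfile.TemporaryDirectory()
        self.path = Path(self._tmp.name)

    def create_file(self, name: str, *, suffix: str = "") -> Path:
        # Assumption: the resulting file name is name + suffix.
        file_path = self.path / f"{name}{suffix}"
        file_path.touch()
        return file_path

    def create_subdir(self, name: str) -> Path:
        sub = self.path / name
        sub.mkdir(parents=True, exist_ok=True)
        return sub

    def close(self) -> None:
        # Removes the directory and everything created inside it.
        self._tmp.cleanup()


tmpdir = ScopedTempDir()
report = tmpdir.create_file("report", suffix=".json")
cache = tmpdir.create_subdir("cache")
```

The host, not the plugin, would call `tmpdir.close()` at shutdown, in line with the ownership rule described under Lifecycle.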

Custom resources

Applications register their own resources through ResourceRegistry:

resources.register("postgres", await create_pg_pool())
resources.register("tenant_registry", TenantRegistryImpl(...))
resources.register("rate_limiter", RateLimiter(...))

A plugin declares them in the manifest (resources.required / resources.optional) and then reads them via ctx.resources.postgres, ctx.resources.tenant_registry, and so on.

The naming of custom resources is up to the application; the convention is snake_case.
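
To make the register-then-read flow concrete, here is a toy stand-in for the registry. It is an illustration only and says nothing about how the real ResourceRegistry is implemented: `SketchRegistry` and `RateLimiter` are hypothetical, and resolving names through `__getattr__` is just one way to obtain the `ctx.resources.<name>` access style described above.

```python
from typing import Any


class SketchRegistry:
    """Illustration only; not the library's ResourceRegistry."""

    def __init__(self) -> None:
        self._items: dict[str, Any] = {}

    def register(self, name: str, resource: Any) -> None:
        self._items[name] = resource

    def __getattr__(self, name: str) -> Any:
        # Invoked only when normal attribute lookup fails, so registered
        # names become readable as attributes.
        try:
            return self._items[name]
        except KeyError:
            raise AttributeError(f"resource {name!r} is not registered") from None


class RateLimiter:
    """Hypothetical custom resource."""

    def __init__(self, per_second: int) -> None:
        self.per_second = per_second


resources = SketchRegistry()
resources.register("rate_limiter", RateLimiter(per_second=5))

limiter = resources.rate_limiter
# An optional resource that was never registered can be probed safely.
tenants = getattr(resources, "tenant_registry", None)
```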

Lifecycle

  1. Application startup: the host creates a ResourceRegistry and populates it with standard and custom resources.
  2. PluginContext build: the host attaches the populated ResourceRegistry to a PluginContext and passes the context to await registry.setup_all(ctx).
  3. Per-plugin setup(): the plugin reads the resources it needs from ctx.resources.*.
  4. Application running: the plugin uses the resources inside its hooks.
  5. Per-plugin teardown(): the plugin releases whatever it built on top of the resources (but not the resources themselves — those belong to the host).
  6. Application shutdown: the host closes the resources (DB pools, HTTP clients) only after every teardown() call has completed.
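
The ordering above can be sketched with plain asyncio. This is a simplified model, not the library's API: a dict stands in for ResourceRegistry and PluginContext, and `FakePool`, `DemoPlugin`, and `run_host` are hypothetical names. The point it illustrates is that the plugin never closes the pool and the host closes it only after teardown has finished.

```python
import asyncio

events: list[str] = []


class FakePool:
    """Stand-in for a host-owned resource such as a DB pool."""

    async def close(self) -> None:
        events.append("host: pool closed")


class DemoPlugin:
    async def setup(self, resources: dict) -> None:
        # Step 3: read the resource and keep a reference; the host still owns it.
        self._pool = resources["pool"]
        events.append("plugin: setup")

    async def teardown(self) -> None:
        # Step 5: release plugin-side state only; do not close the pool.
        self._pool = None
        events.append("plugin: teardown")


async def run_host() -> None:
    resources = {"pool": FakePool()}      # step 1: populate resources
    plugins = [DemoPlugin()]
    for plugin in plugins:                # steps 2-3: hand resources to setup()
        await plugin.setup(resources)
    events.append("host: running")        # step 4: hooks would run here
    for plugin in plugins:                # step 5: every teardown() first
        await plugin.teardown()
    for resource in resources.values():   # step 6: then close host resources
        await resource.close()


asyncio.run(run_host())
```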

Requirements for resource implementations

  • Thread-safe / goroutine-safe. Resources are singletons on the host and are accessed by several plugins concurrently.
  • Specified by a protocol. The interface must be formally specified.
  • Idempotently closeable. Calling close a second time must not raise.
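
A minimal sketch of a resource that meets the thread-safety and idempotent-close requirements follows. `CounterResource` is a hypothetical example; guarding all state with a single `threading.Lock` is one simple approach, not a requirement of the spec.

```python
import threading


class CounterResource:
    """Hypothetical host resource: thread-safe and idempotently closeable."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._closed = False
        self._count = 0

    def increment(self) -> int:
        # The lock makes the read-modify-write atomic across threads.
        with self._lock:
            if self._closed:
                raise RuntimeError("resource is closed")
            self._count += 1
            return self._count

    def close(self) -> None:
        with self._lock:
            if self._closed:
                return  # second and later calls are no-ops
            self._closed = True


resource = CounterResource()


def worker() -> None:
    for _ in range(1000):
        resource.increment()


threads = [threading.Thread(target=worker) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

total = resource.increment()
resource.close()
resource.close()  # must not raise
```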

See also