
Writing a plugin

In this guide you will write a chunker plugin that splits a text file into fragments of a fixed length. Along the way we cover the plugin folder layout, the manifest, the implementation, local verification, and the contract test suite.

Folder layout

Create a folder with two files inside:

```text
plugins/
└── fixed-chunker/
    ├── dagstack.toml
    └── plugin.py
```

The folder name should correspond to the plugin name in the manifest (here, the chunker named fixed lives in fixed-chunker/). This is not a strict technical requirement but a convention adopted across the ecosystem.
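If you prefer to script the layout, the standard-library sketch below creates the plugin folder with two empty files. The helper name scaffold_plugin is hypothetical and not part of dagstack; it only reproduces the tree above.

```python
from pathlib import Path


def scaffold_plugin(root: Path, folder: str) -> list[Path]:
    """Create root/folder with empty dagstack.toml and plugin.py files.

    Hypothetical helper: it only mirrors the layout described in this guide.
    """
    plugin_dir = root / folder
    # Create plugins/ and the plugin folder if they do not exist yet.
    plugin_dir.mkdir(parents=True, exist_ok=True)
    created = []
    for name in ("dagstack.toml", "plugin.py"):
        path = plugin_dir / name
        path.touch(exist_ok=True)  # leaves an existing file's contents untouched
        created.append(path)
    return created
```

For example, scaffold_plugin(Path("plugins"), "fixed-chunker") creates the two files shown in the tree.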

Manifest

Declare the plugin as a chunker-kind implementation for the in_process runtime, and request only the resources it actually needs: here, just the clock (used for timestamps in tracing).

plugins/fixed-chunker/dagstack.toml

```toml
[plugin]
name = "fixed"
kind = "chunker"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
entry_point = "plugin:FixedChunker"
version = "0.1.0"
description = "Fixed chunker — splits text into pieces of equal length."
execution_model = "sync" # hook execution style: sync | async | thread_cpu_bound | process_cpu_bound

[plugin.resources]
required = ["clock"]
```

Implementation

The plugin implements the chunker contract: a chunk(text: str, max_size: int) -> list[Chunk] method. In its minimal form, two lifecycle methods (setup and teardown) plus the main chunk method are sufficient.

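plugins/fixed-chunker/plugin.py

```python
from dataclasses import dataclass

from dagstack.plugin_system import PluginContext


@dataclass
class Chunk:
    text: str    # fragment contents
    offset: int  # start index of the fragment in the input text


class FixedChunker:
    """Splits the input text into fragments of max_size characters.

    Every fragment except the last has exactly max_size characters;
    the last one holds the remainder and may be shorter.
    """

    async def setup(self, context: PluginContext) -> None:
        # Keep the clock declared in [plugin.resources] for tracing timestamps.
        self._clock = context.resources.clock

    def chunk(self, text: str, max_size: int = 512) -> list[Chunk]:
        if max_size <= 0:
            raise ValueError("max_size must be positive")
        # Step through the text in max_size strides and slice each window.
        return [
            Chunk(text=text[i : i + max_size], offset=i)
            for i in range(0, len(text), max_size)
        ]

    async def teardown(self) -> None:
        # Nothing to release: setup allocates no files or connections.
        pass
```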
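The splitting logic does not depend on dagstack, so it can be checked in isolation. The sketch below copies the body of chunk into a plain function (the name fixed_chunks is illustrative, not part of the plugin API) and verifies properties that follow from the slicing: the fragments concatenate back to the input, each offset is the fragment's start index, there are ⌈len(text) / max_size⌉ fragments, and only the last one may be shorter than max_size.

```python
from dataclasses import dataclass


@dataclass
class Chunk:
    text: str
    offset: int


def fixed_chunks(text: str, max_size: int = 512) -> list[Chunk]:
    """Same slicing as FixedChunker.chunk, without the plugin wrapper."""
    if max_size <= 0:
        raise ValueError("max_size must be positive")
    return [
        Chunk(text=text[i : i + max_size], offset=i)
        for i in range(0, len(text), max_size)
    ]


def check_properties(text: str, max_size: int) -> None:
    pieces = fixed_chunks(text, max_size)
    # Concatenating the fragments restores the original text.
    assert "".join(p.text for p in pieces) == text
    # Each offset points at the fragment's position in the input.
    assert all(text[p.offset : p.offset + len(p.text)] == p.text for p in pieces)
    # ceil(len(text) / max_size) fragments, using integer-only arithmetic.
    assert len(pieces) == -(-len(text) // max_size)
    # All fragments except possibly the last are exactly max_size long.
    assert all(len(p.text) == max_size for p in pieces[:-1])


for sample, size in [("Hello, dagstack!", 5), ("", 3), ("abc", 10), ("abcdef", 3)]:
    check_properties(sample, size)
```

For "Hello, dagstack!" (16 characters) and max_size=5 this gives ⌈16 / 5⌉ = 4 fragments, the same four pieces the smoke check expects.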

Local verification

Make sure the plugin is visible to the registry and passes a basic smoke check:

```python
import asyncio
import logging

from dagstack.plugin_system import PluginContext, PluginRegistry


async def main() -> None:
    registry = PluginRegistry()
    registry.discover("plugins/")
    ctx = PluginContext(
        config={},
        logger=logging.getLogger("smoke"),
        registry=registry,
    )
    await registry.setup_all(ctx)
    try:
        chunker = registry.get_plugin("chunker", name="fixed")
        pieces = chunker.chunk("Hello, dagstack!", max_size=5)
        assert [p.text for p in pieces] == ["Hello", ", dag", "stack", "!"]
    finally:
        # Always run teardown, even if the assertion above fails.
        await registry.teardown_all()


asyncio.run(main())
```

Contract tests

Contract tests verify that the plugin honours the eight runtime invariants (see ADR-0003). Run them in every plugin's CI pipeline.

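```python
import importlib.util
import sys

from dagstack.plugin_system import (
    ALL_CHECKS,
    load_manifest,
    run_contract_suite,
)

# The folder name contains a hyphen, so it cannot be imported as a regular
# package ("plugins.fixed_chunker" does not exist); load plugin.py by path.
_spec = importlib.util.spec_from_file_location(
    "fixed_chunker_plugin", "plugins/fixed-chunker/plugin.py"
)
_module = importlib.util.module_from_spec(_spec)
sys.modules[_spec.name] = _module
_spec.loader.exec_module(_module)
FixedChunker = _module.FixedChunker


def test_contract():
    manifest = load_manifest("plugins/fixed-chunker/dagstack.toml")
    result = run_contract_suite(
        plugin_class=FixedChunker,
        manifest=manifest,
        checks=ALL_CHECKS,
    )
    # On failure, show the structured report naming the violated invariant.
    assert result.ok, result.format_failures()
```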

Failed checks return a structured report identifying the violated invariant. The most frequent findings are:

  • AmbientStateViolation: the plugin reads from or writes to global state (module-level variables, singletons outside the registry). Move state into Resources.
  • SerializationError: the plugin's input or output is not JSON-serialisable. Use only dict/list/str/int/float/bool/None at the call boundary.
  • LeakDetected: teardown does not release resources that setup allocated (file descriptors, network connections).
  • DeterminismError: the plugin produces different output for the same input without an explicit resource (Clock, Rng).
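Two of these findings can be approximated locally before running the full suite. The sketch below is not dagstack's implementation of the checks; check_boundary and chunk_to_primitives are hypothetical helpers. Whether the suite accepts dataclass instances such as Chunk at the call boundary is not stated here; if it does not, dataclasses.asdict is one way to reduce them to the primitives the SerializationError entry lists. The helper then confirms the output survives a json round trip unchanged and calls the function twice on the same input to catch non-deterministic output.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Callable


@dataclass
class Chunk:
    text: str
    offset: int


def chunk_to_primitives(text: str, max_size: int) -> list[dict[str, Any]]:
    """Fixed-size chunking that returns only JSON-compatible values."""
    if max_size <= 0:
        raise ValueError("max_size must be positive")
    pieces = [Chunk(text[i : i + max_size], i) for i in range(0, len(text), max_size)]
    return [asdict(p) for p in pieces]  # each Chunk becomes {"text": ..., "offset": ...}


def check_boundary(fn: Callable[..., Any], *args: Any) -> None:
    """Hypothetical local stand-in for the serialization and determinism checks."""
    first = fn(*args)
    # Serialization: json.dumps raises TypeError on unsupported objects, and
    # values such as tuples come back as lists, so compare after a round trip.
    if json.loads(json.dumps(first)) != first:
        raise TypeError("output changes after a JSON round trip")
    # Determinism: the same input must give the same output.
    if fn(*args) != first:
        raise RuntimeError("output differs between identical calls")


check_boundary(chunk_to_primitives, "Hello, dagstack!", 5)
```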

What's next

  • Plugin configuration — how a plugin receives its settings from dagstack/config.
  • Lifecycle — details of setup, teardown, and dependencies between plugins.
  • Plugin registry — how plugins combine into a single runtime.