Writing a plugin
In this guide you will write a chunker plugin that splits a text file into fragments of a fixed length. Along the way we cover the plugin folder layout, the manifest, the implementation, local verification, and the contract test suite.
Folder layout
Create a folder with two files inside:
plugins/
└── fixed-chunker/
├── dagstack.toml
└── plugin.py
The folder name should match the plugin name in the manifest — this is not a strict technical requirement but a convention adopted across the ecosystem.
Manifest
Declare the plugin as a chunker-kind implementation for the in_process runtime. Declare the minimum set of resources: only the clock (for timestamps in tracing).
[plugin]
name = "fixed"
kind = "chunker"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
entry_point = "plugin:FixedChunker"
version = "0.1.0"
description = "Fixed chunker — splits text into pieces of equal length."
execution_model = "sync" # hook execution style: sync | async | thread_cpu_bound | process_cpu_bound
[plugin.resources]
required = ["clock"]
Implementation
The plugin implements the chunker contract: a chunk(text: str, max_size: int) -> list[Chunk] method. In its minimal form, three lifecycle methods (setup, teardown) plus the main chunk method are sufficient.
- Python
- TypeScript
- Go
from dataclasses import dataclass
from dagstack.plugin_system import PluginContext
@dataclass
class Chunk:
text: str
offset: int
class FixedChunker:
"""Splits the input text into fragments of exactly max_size characters."""
async def setup(self, context: PluginContext) -> None:
self._clock = context.resources.clock
def chunk(self, text: str, max_size: int = 512) -> list[Chunk]:
if max_size <= 0:
raise ValueError("max_size must be positive")
return [
Chunk(text=text[i : i + max_size], offset=i)
for i in range(0, len(text), max_size)
]
async def teardown(self) -> None:
pass
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
package fixed
import (
"context"
"errors"
pluginsystem "go.dagstack.dev/plugin-system"
)
type Chunk struct {
Text string
Offset int
}
type FixedChunker struct {
resources pluginsystem.Resources
}
func (c *FixedChunker) Setup(ctx context.Context, pluginCtx *pluginsystem.PluginContext) error {
c.resources = pluginCtx.Resources
return nil
}
func (c *FixedChunker) Chunk(text string, maxSize int) ([]Chunk, error) {
if maxSize <= 0 {
return nil, errors.New("maxSize must be positive")
}
chunks := make([]Chunk, 0, len(text)/maxSize+1)
for i := 0; i < len(text); i += maxSize {
end := i + maxSize
if end > len(text) {
end = len(text)
}
chunks = append(chunks, Chunk{Text: text[i:end], Offset: i})
}
return chunks, nil
}
func (c *FixedChunker) Teardown(ctx context.Context) error {
return nil
}
Local verification
Make sure the plugin is visible to the registry and passes a basic smoke check:
- Python
- TypeScript
- Go
import asyncio
import logging
from dagstack.plugin_system import PluginContext, PluginRegistry
async def main() -> None:
registry = PluginRegistry()
registry.discover("plugins/")
ctx = PluginContext(
config={},
logger=logging.getLogger("smoke"),
registry=registry,
)
await registry.setup_all(ctx)
try:
chunker = registry.get_plugin("chunker", name="fixed")
pieces = chunker.chunk("Hello, dagstack!", max_size=5)
assert [p.text for p in pieces] == ["Hello", ", dag", "stack", "!"]
finally:
await registry.teardown_all()
asyncio.run(main())
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
package main
import (
"context"
"fmt"
"log/slog"
pluginsystem "go.dagstack.dev/plugin-system"
fixed "github.com/your-org/plugins/fixed-chunker"
)
func main() {
ctx := context.Background()
entries, err := pluginsystem.Discover("plugins/")
if err != nil {
panic(err)
}
reg := pluginsystem.NewRegistry()
for _, entry := range entries {
// The discoverer reads the manifest; the host binds an instance.
plugin := buildPlugin(entry.Manifest)
if err := reg.RegisterManifest(entry.Manifest, plugin); err != nil {
panic(err)
}
}
pluginCtx := &pluginsystem.PluginContext{
Logger: slog.Default(),
Registry: reg,
}
if err := reg.SetupAll(ctx, pluginCtx); err != nil {
panic(err)
}
defer reg.TeardownAll(ctx)
disp := pluginsystem.NewDispatchSingleton[*fixed.FixedChunker](reg, "chunker")
chunker, err := disp.Resolve()
if err != nil {
panic(err)
}
pieces, _ := chunker.Chunk("Hello, dagstack!", 5)
for _, p := range pieces {
fmt.Println(p.Text)
}
// pieces = ["Hello", ", dag", "stack", "!"]
}
Contract tests
Contract tests verify that the plugin honours the eight runtime invariants (see ADR-0003). Run them in every plugin's CI pipeline.
- Python
- TypeScript
- Go
from dagstack.plugin_system import (
load_manifest,
run_contract_suite,
ALL_CHECKS,
)
from plugins.fixed_chunker.plugin import FixedChunker
def test_contract():
manifest = load_manifest("plugins/fixed-chunker/dagstack.toml")
result = run_contract_suite(
plugin_class=FixedChunker,
manifest=manifest,
checks=ALL_CHECKS,
)
assert result.ok, result.format_failures()
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
:::info Contract suite is Python-only in Phase 1
The full contract suite (run_contract_suite, assert_lifecycle_clean, assert_orchestration_neutral, assert_deterministic, etc.) ships only in dagstack-plugin-system (Python) for the v0.1 line. The Go binding will gain a parallel suite in Phase 2; until then, validate Go plugins via standard go test against the same scenarios.
:::
package fixed_test
import (
"testing"
pluginsystem "go.dagstack.dev/plugin-system"
)
func TestManifestValid(t *testing.T) {
manifest, err := pluginsystem.LoadManifestFromFile("plugins/fixed-chunker/dagstack.toml")
if err != nil {
t.Fatal(err)
}
if err := manifest.Validate(); err != nil {
t.Fatalf("manifest validation failed: %v", err)
}
}
Failed checks return a structured report identifying the violated invariant. The most frequent findings are:
AmbientStateViolation— the plugin reads from or writes to global state (module-level variables, singletons outside the registry). Move state intoResources.SerializationError— the plugin's input or output is not JSON-serialisable. Use onlydict/list/str/int/float/bool/Noneat the call boundary.LeakDetected—teardowndoes not release resources thatsetupallocated (file descriptors, network connections).DeterminismError— the plugin produces different output for the same input without an explicit resource (Clock,Rng).
What's next
- Plugin configuration — how a plugin receives its settings from
dagstack/config. - Lifecycle — details of
setup,teardown, and dependencies between plugins. - Plugin registry — how plugins combine into a single runtime.