Как написать плагин
В этом руководстве вы напишете плагин-чанкер, который разбивает текстовый файл на фрагменты фиксированной длины. По ходу будут раскрыты: структура папки плагина, манифест, реализация, локальная проверка и контрактные тесты.
Структура папки
Создайте папку и два файла в ней:
plugins/
└── fixed-chunker/
├── dagstack.toml
└── plugin.py
Имя папки должно совпадать с именем плагина в манифесте — это не обязательное техническое требование, но соглашение, принятое в экосистеме.
Манифест
Объявите плагин как реализацию вида chunker для среды in_process. Объявите минимальный набор ресурсов: только часы (для меток времени в трассировке).
[plugin]
name = "fixed"
kind = "chunker"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
entry_point = "plugin:FixedChunker"
version = "0.1.0"
description = "Фиксированный чанкер — разбивает текст на куски равной длины."
execution_model = "sync" # стиль исполнения хуков: sync | async | thread_cpu_bound | process_cpu_bound
[plugin.resources]
required = ["clock"]
Реализация
Плагин реализует контракт вида chunker: метод chunk(text: str, max_size: int) -> list[Chunk]. В минимальной форме достаточно трёх методов жизненного цикла (setup, teardown) и основного chunk.
- Python
- TypeScript
- Go
from dataclasses import dataclass
from dagstack.plugin_system import PluginContext
@dataclass
class Chunk:
text: str
offset: int
class FixedChunker:
"""Splits the input text into fragments of exactly max_size characters."""
async def setup(self, context: PluginContext) -> None:
self._clock = context.resources.clock
def chunk(self, text: str, max_size: int = 512) -> list[Chunk]:
if max_size <= 0:
raise ValueError("max_size must be positive")
return [
Chunk(text=text[i : i + max_size], offset=i)
for i in range(0, len(text), max_size)
]
async def teardown(self) -> None:
pass
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
package fixed
import (
"context"
"errors"
pluginsystem "go.dagstack.dev/plugin-system"
)
type Chunk struct {
Text string
Offset int
}
type FixedChunker struct {
resources pluginsystem.Resources
}
func (c *FixedChunker) Setup(ctx context.Context, pluginCtx *pluginsystem.PluginContext) error {
c.resources = pluginCtx.Resources
return nil
}
func (c *FixedChunker) Chunk(text string, maxSize int) ([]Chunk, error) {
if maxSize <= 0 {
return nil, errors.New("maxSize must be positive")
}
chunks := make([]Chunk, 0, len(text)/maxSize+1)
for i := 0; i < len(text); i += maxSize {
end := i + maxSize
if end > len(text) {
end = len(text)
}
chunks = append(chunks, Chunk{Text: text[i:end], Offset: i})
}
return chunks, nil
}
func (c *FixedChunker) Teardown(ctx context.Context) error {
return nil
}
Локальная проверка
Убедитесь, что плагин виден реестру и проходит базовую smoke-проверку:
- Python
- TypeScript
- Go
import asyncio
import logging
from dagstack.plugin_system import PluginContext, PluginRegistry
async def main() -> None:
registry = PluginRegistry()
registry.discover("plugins/")
ctx = PluginContext(
config={},
logger=logging.getLogger("smoke"),
registry=registry,
)
await registry.setup_all(ctx)
try:
chunker = registry.get_plugin("chunker", name="fixed")
pieces = chunker.chunk("Hello, dagstack!", max_size=5)
assert [p.text for p in pieces] == ["Hello", ", dag", "stack", "!"]
finally:
await registry.teardown_all()
asyncio.run(main())
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
package main
import (
"context"
"fmt"
"log/slog"
pluginsystem "go.dagstack.dev/plugin-system"
fixed "github.com/your-org/plugins/fixed-chunker"
)
func main() {
ctx := context.Background()
entries, err := pluginsystem.Discover("plugins/")
if err != nil {
panic(err)
}
reg := pluginsystem.NewRegistry()
for _, entry := range entries {
// The discoverer reads the manifest; the host binds an instance.
plugin := buildPlugin(entry.Manifest)
if err := reg.RegisterManifest(entry.Manifest, plugin); err != nil {
panic(err)
}
}
pluginCtx := &pluginsystem.PluginContext{
Logger: slog.Default(),
Registry: reg,
}
if err := reg.SetupAll(ctx, pluginCtx); err != nil {
panic(err)
}
defer reg.TeardownAll(ctx)
disp := pluginsystem.NewDispatchSingleton[*fixed.FixedChunker](reg, "chunker")
chunker, err := disp.Resolve()
if err != nil {
panic(err)
}
pieces, _ := chunker.Chunk("Hello, dagstack!", 5)
for _, p := range pieces {
fmt.Println(p.Text)
}
// pieces = ["Hello", ", dag", "stack", "!"]
}
Контрактные тесты
Контрактные тесты проверяют, что плагин соблюдает восемь runtime-инвариантов (см. ADR-0003). Запускайте их в CI каждого плагина.
- Python
- TypeScript
- Go
from dagstack.plugin_system import (
load_manifest,
run_contract_suite,
ALL_CHECKS,
)
from plugins.fixed_chunker.plugin import FixedChunker
def test_contract():
manifest = load_manifest("plugins/fixed-chunker/dagstack.toml")
result = run_contract_suite(
plugin_class=FixedChunker,
manifest=manifest,
checks=ALL_CHECKS,
)
assert result.ok, result.format_failures()
:::warning TypeScript runtime ships in Phase 1
@dagstack/plugin-system@0.1.0-rc.2 exports only the spec-emitted types — VERSION, ToolV1, OrchestratorV1. The runtime (PluginRegistry, discover, dispatchers, contract suite) lands in Phase 1. Today: implement the kind contract against the published types, then host plugins through Python over mcp_stdio or wait for the Phase 1 release. See the TypeScript API reference for the planned shape.
:::
:::info Contract suite is Python-only in Phase 1
The full contract suite (run_contract_suite, assert_lifecycle_clean, assert_orchestration_neutral, assert_deterministic, etc.) ships only in dagstack-plugin-system (Python) for the v0.1 line. The Go binding will gain a parallel suite in Phase 2; until then, validate Go plugins via standard go test against the same scenarios.
:::
package fixed_test
import (
"testing"
pluginsystem "go.dagstack.dev/plugin-system"
)
func TestManifestValid(t *testing.T) {
manifest, err := pluginsystem.LoadManifestFromFile("plugins/fixed-chunker/dagstack.toml")
if err != nil {
t.Fatal(err)
}
if err := manifest.Validate(); err != nil {
t.Fatalf("manifest validation failed: %v", err)
}
}
Проваленные проверки возвращают структурированный отчёт с указанием нарушенного инварианта. Наиболее частые замечания:
AmbientStateViolation— плагин читает/пишет глобальное состояние (переменные модуля, синглтоны вне реестра). Выносите состояние вResources.SerializationError— вход или выход плагина не сериализуется в JSON. Используйте толькоdict/list/str/int/float/bool/Noneна границе вызова.LeakDetected—teardownне освобождает ресурсы, которые создалsetup(файловые дескрипторы, сетевые соединения).DeterminismError— плагин производит разный результат на одних и тех же входных данных без явного ресурса (Clock,Rng).
Что дальше
- Конфигурация плагинов — как плагин получает свои настройки из
dagstack/config. - Жизненный цикл — детали
setup,teardown, зависимостей между плагинами. - Реестр плагинов — как плагины сочетаются в единый runtime.