Перейти к основному содержимому

Как написать плагин

В этом руководстве вы напишете плагин-чанкер, который разбивает текстовый файл на фрагменты фиксированной длины. По ходу будут раскрыты: структура папки плагина, манифест, реализация, локальная проверка и контрактные тесты.

Структура папки

Создайте папку и два файла в ней:

plugins/
└── fixed-chunker/
├── dagstack.toml
└── plugin.py

Имя папки должно совпадать с именем плагина в манифесте — это не обязательное техническое требование, но соглашение, принятое в экосистеме.

Манифест

Объявите плагин как реализацию вида chunker для среды in_process. Объявите минимальный набор ресурсов: только часы (для меток времени в трассировке).

plugins/fixed-chunker/dagstack.toml
[plugin]
name = "fixed"
kind = "chunker"
runtime = "in_process"
core_version = ">=0.1.0,<1.0.0"
entry_point = "plugin:FixedChunker"
version = "0.1.0"
description = "Фиксированный чанкер — разбивает текст на куски равной длины."
execution_model = "sync" # стиль исполнения хуков: sync | async | thread_cpu_bound | process_cpu_bound

[plugin.resources]
required = ["clock"]

Реализация

Плагин реализует контракт вида chunker: метод chunk(text: str, max_size: int) -> list[Chunk]. В минимальной форме достаточно трёх методов жизненного цикла (setup, teardown) и основного chunk.

plugins/fixed-chunker/plugin.py
from dataclasses import dataclass
from dagstack.plugin_system import PluginContext


@dataclass
class Chunk:
text: str
offset: int


class FixedChunker:
"""Splits the input text into fragments of exactly max_size characters."""

async def setup(self, context: PluginContext) -> None:
self._clock = context.resources.clock

def chunk(self, text: str, max_size: int = 512) -> list[Chunk]:
if max_size <= 0:
raise ValueError("max_size must be positive")
return [
Chunk(text=text[i : i + max_size], offset=i)
for i in range(0, len(text), max_size)
]

async def teardown(self) -> None:
pass

Локальная проверка

Убедитесь, что плагин виден реестру и проходит базовую smoke-проверку:

import asyncio
import logging
from dagstack.plugin_system import PluginContext, PluginRegistry

async def main() -> None:
registry = PluginRegistry()
registry.discover("plugins/")
ctx = PluginContext(
config={},
logger=logging.getLogger("smoke"),
registry=registry,
)
await registry.setup_all(ctx)
try:
chunker = registry.get_plugin("chunker", name="fixed")
pieces = chunker.chunk("Hello, dagstack!", max_size=5)
assert [p.text for p in pieces] == ["Hello", ", dag", "stack", "!"]
finally:
await registry.teardown_all()

asyncio.run(main())

Контрактные тесты

Контрактные тесты проверяют, что плагин соблюдает восемь runtime-инвариантов (см. ADR-0003). Запускайте их в CI каждого плагина.

from dagstack.plugin_system import (
load_manifest,
run_contract_suite,
ALL_CHECKS,
)
from plugins.fixed_chunker.plugin import FixedChunker


def test_contract():
manifest = load_manifest("plugins/fixed-chunker/dagstack.toml")
result = run_contract_suite(
plugin_class=FixedChunker,
manifest=manifest,
checks=ALL_CHECKS,
)
assert result.ok, result.format_failures()

Проваленные проверки возвращают структурированный отчёт с указанием нарушенного инварианта. Наиболее частые замечания:

  • AmbientStateViolation — плагин читает/пишет глобальное состояние (переменные модуля, синглтоны вне реестра). Выносите состояние в Resources.
  • SerializationError — вход или выход плагина не сериализуется в JSON. Используйте только dict/list/str/int/float/bool/None на границе вызова.
  • LeakDetectedteardown не освобождает ресурсы, которые создал setup (файловые дескрипторы, сетевые соединения).
  • DeterminismError — плагин производит разный результат на одних и тех же входных данных без явного ресурса (Clock, Rng).

Что дальше