Runtime API IoSink Configuration#
Design note for runtime-service IoSink configuration with the same
registration model already used for IoSource.
Implementation status: implemented in this branch.
Pre-Implementation State#
The runtime API stores source registrations on each session and rebuilds an
IoSources registry when processing starts. The flow is:
PipelineSession.sources stores normalized registration dictionaries.
RuntimeService.upsert_sources(...) and patch_source(...) expose the HTTP behavior through FastAPI routes.
build_sources_from_session(...) adapts session state to build_sources_from_specs(...).
run_pipeline_job(...) receives the resulting IoSources registry.
The sink infrastructure already exists but is only partially connected to the runtime API:
IoSinks and IoSink mirror the source registry and base class contracts.
run_pipeline_job(...) already accepts an optional sinks registry and assigns it to every process step.
build_sinks_from_specs(...) exists, but currently supports only CSV sinks.
The runtime service never stores session sinks, never builds an IoSinks registry from session state, and never passes sinks into run_pipeline_job.
Process-level write_hdf is a separate artifact export shortcut using HDFProcessingSink; it is not currently a registered session sink.
Goal#
Allow clients to register, inspect, update, and delete output sinks through the runtime API in the same shape as source registration:
{
  "sinks": [
    {
      "ref": "export_csv",
      "type": "csv",
      "location": "/data/out/current.csv",
      "kwargs": {"delimiter": ","}
    }
  ]
}
Pipelines can then use SinkProcessingData or other sink-aware steps with
targets such as export_csv:: while the concrete file path remains
runtime-configurable.
API#
The implementation adds these endpoints under the existing
/v1/sessions/{session_id} resource:
PUT /sinks: upsert one or more sink registrations.
POST /sinks/patch: convenience upsert for one sink registration.
DELETE /sinks/{ref}: remove one sink registration.
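A client call against the upsert endpoint might be prepared as in the sketch below. It only builds the request and does not send it. The base URL and session id are placeholders chosen for illustration, and the JSON content type is an assumption; the path and payload shape are the ones described in this note.

```python
import json
import urllib.request

# Placeholders for illustration only; neither value comes from the project.
BASE_URL = "http://localhost:8000"
SESSION_ID = "example-session"

payload = {
    "sinks": [
        {
            "ref": "export_csv",
            "type": "csv",
            "location": "/data/out/current.csv",
            "kwargs": {"delimiter": ","},
        }
    ]
}


def build_upsert_sinks_request(base_url, session_id, body):
    """Prepare (but do not send) a PUT request for the session sinks endpoint."""
    return urllib.request.Request(
        url=f"{base_url}/v1/sessions/{session_id}/sinks",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},  # assumed content type
        method="PUT",
    )


request = build_upsert_sinks_request(BASE_URL, SESSION_ID, payload)
```

Passing the prepared request to `urllib.request.urlopen(request)` would perform the call against a running service.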
Session detail responses should include both:
sources: current source registrations.
sinks: current sink registrations.
Use the same normalized registration shape for sinks as for sources:
ref: stable runtime reference used before ::.
type: built-in type or custom.
location: resource location passed to the sink constructor.
kwargs: optional sink-specific keyword configuration.
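A shared normalization helper for this shape could look roughly like the sketch below. The function name, the decision to treat ref, type, and location as required, and the error wording are all assumptions made for illustration; only the four field names come from this note.

```python
REQUIRED_KEYS = ("ref", "type", "location")  # assumed to be mandatory


def normalize_registration(raw, kind):
    """Return a registration dict with ref, type, location, and kwargs.

    `kind` is "source" or "sink" and is only used in error messages, so both
    registration flavours report malformed payloads the same way.
    """
    if not isinstance(raw, dict):
        raise ValueError(f"{kind} registration must be an object")
    missing = [key for key in REQUIRED_KEYS if not raw.get(key)]
    if missing:
        raise ValueError(f"{kind} registration is missing: {', '.join(missing)}")
    kwargs = raw.get("kwargs") or {}  # kwargs is optional
    if not isinstance(kwargs, dict):
        raise ValueError(f"{kind} kwargs must be an object")
    return {
        "ref": str(raw["ref"]),
        "type": str(raw["type"]),
        "location": str(raw["location"]),
        "kwargs": dict(kwargs),
    }


def upsert(registrations, raw_items, kind):
    """Validate every item first, then store each one under its ref."""
    normalized = [normalize_registration(item, kind) for item in raw_items]
    for item in normalized:
        registrations[item["ref"]] = item
    return registrations


sinks = upsert({}, [{"ref": "export_csv", "type": "csv",
                     "location": "/data/out/current.csv"}], "sink")
```

Validating the whole batch before storing anything keeps a malformed entry from leaving a half-applied upsert behind.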
Built-in sink types:
csv -> modacor.io.csv.csv_sink.CSVSink
hdf -> modacor.io.hdf.hdf_processing_sink.HDFProcessingSink
hdf_processing -> HDFProcessingSink alias for clarity
custom -> class selected by kwargs.class_path
For custom sinks, mirror custom source behavior: kwargs.class_path is consumed
as the fully qualified import path, and the remaining kwargs are forwarded as
sink method kwargs.
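The type lookup and the custom class_path handling can be sketched as follows. This is an illustration under stated assumptions, not the project's builder: the helper names are hypothetical, and the map holds dotted paths as strings so the sketch runs without modacor installed.

```python
import importlib

# Dotted paths copied from the built-in type list above.
SINK_TYPE_MAP = {
    "csv": "modacor.io.csv.csv_sink.CSVSink",
    "hdf": "modacor.io.hdf.hdf_processing_sink.HDFProcessingSink",
    "hdf_processing": "modacor.io.hdf.hdf_processing_sink.HDFProcessingSink",
}


def resolve_sink_class_path(sink_type, kwargs):
    """Return (dotted class path, remaining kwargs) for one registration."""
    remaining = dict(kwargs)  # copy so the stored registration is not mutated
    if sink_type == "custom":
        # class_path is consumed here and is not forwarded to the sink.
        class_path = remaining.pop("class_path", None)
        if not class_path:
            raise ValueError("custom sinks require kwargs.class_path")
        return class_path, remaining
    if sink_type not in SINK_TYPE_MAP:
        raise ValueError(f"unsupported sink type: {sink_type!r}")
    return SINK_TYPE_MAP[sink_type], remaining


def import_class(class_path):
    """Import the object named by a fully qualified dotted path."""
    module_name, _, class_name = class_path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


# A standard-library class stands in for a custom sink class here.
path, forwarded = resolve_sink_class_path(
    "custom", {"class_path": "collections.OrderedDict", "mode": "w"}
)
sink_class = import_class(path)
```

In this example `forwarded` no longer contains `class_path`, which matches the rule that only the remaining kwargs reach the sink.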
Runtime Behavior#
At process start, build sources and sinks from the current session snapshot:
Parse the pipeline YAML.
Build IoSources from session.sources.
Build IoSinks from session.sinks.
Call run_pipeline_job(..., sources=sources, sinks=sinks, ...).
The auto-fallback path must reuse the same built sink registry when a failed partial run falls back to a full run.
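The start-up order and the fallback rule can be illustrated with stand-ins. Everything in this sketch is hypothetical scaffolding: `SessionSnapshot`, `build_registry`, `start_process`, and `fake_run_job` are illustrative names, and the real service methods have different signatures. The point it shows is that registries are built once and the same objects are handed to the fallback run.

```python
from dataclasses import dataclass, field


@dataclass
class SessionSnapshot:
    """Stand-in for the session state read at process start."""
    sources: list = field(default_factory=list)
    sinks: list = field(default_factory=list)


def build_registry(specs):
    """Stand-in for the real builders: index registrations by ref."""
    return {spec["ref"]: spec for spec in specs}


def start_process(session, run_job, partial=True):
    # Build both registries once, from the same snapshot.
    sources = build_registry(session.sources)
    sinks = build_registry(session.sinks)
    try:
        return run_job(sources=sources, sinks=sinks, partial=partial)
    except RuntimeError:
        if not partial:
            raise
        # Auto fallback: full run with the very same registries.
        return run_job(sources=sources, sinks=sinks, partial=False)


calls = []


def fake_run_job(*, sources, sinks, partial):
    """Records each call and fails whenever a partial run is requested."""
    calls.append((partial, sinks))
    if partial:
        raise RuntimeError("partial run failed")
    return "full run ok"


session = SessionSnapshot(
    sinks=[{"ref": "export_csv", "type": "csv", "location": "/data/out/current.csv"}]
)
result = start_process(session, fake_run_job)
```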
write_hdf should remain supported for now. It is a process-level artifact
shortcut that adds pipeline YAML/spec and trace metadata. Registered HDF sinks
serve pipeline steps that explicitly write through IoSinks; they should not
replace write_hdf in this enhancement.
API-registered HDF sinks may optionally request runtime metadata, but metadata
inclusion must not be the default. The process-level write_hdf path remains
the main artifact export path and continues to include pipeline and trace
metadata by default when requested. Registered HDF sinks should only receive
pipeline/trace metadata when their sink registration explicitly opts in through
sink-specific kwargs. Because in-pipeline sink writes happen before the complete
run result exists, complete trace metadata is naturally better served by
write_hdf; any HDF-sink metadata opt-in should document exactly which metadata
is available at the time of the write.
If an API-registered sink has the same ref as a pipeline AppendSink step, the
existing AppendSink behavior means the pre-registered sink wins because
AppendSink skips refs that are already present. This mirrors the current
AppendSource pattern and keeps runtime-provided I/O definitions authoritative.
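The precedence rule amounts to "register only if absent". The sketch below uses a plain dict in place of the IoSinks registry and a hypothetical helper name; it is not the AppendSink implementation, only an illustration of the behavior described above.

```python
def append_sink_if_absent(registry, ref, sink):
    """Add `sink` under `ref` unless that ref is already registered.

    Returns True when the sink was added and False when it was skipped.
    """
    if ref in registry:
        return False
    registry[ref] = sink
    return True


# The runtime API registered export_csv before the pipeline started.
registry = {"export_csv": {"location": "/data/out/current.csv"}}

# A pipeline step later tries to register the same ref with its own default.
added = append_sink_if_absent(
    registry, "export_csv", {"location": "pipeline_default.csv"}
)
```

Here `added` is False and the registry still points at the API-provided location.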
Sink writes are external side effects. A failed partial run can restore
ProcessingData, but it cannot roll back files already written by a sink step.
This is already true for pipeline-defined sinks; the API feature should document
the same non-transactional behavior rather than hiding it.
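A small stand-alone illustration of the non-transactional behavior, under the assumption that a failed run restores in-memory state from a snapshot: the state rolls back, while the file written before the failure stays on disk. The dict here merely stands in for ProcessingData, and the function name is hypothetical.

```python
import copy
import os
import tempfile


def run_step_with_restore(state, sink_path):
    """Mutate state, write a file, then fail and restore only the state."""
    snapshot = copy.deepcopy(state)
    try:
        state["rows"].append("corrected")
        with open(sink_path, "w", encoding="utf-8") as handle:
            handle.write("partial output\n")  # external side effect
        raise RuntimeError("a later step failed")
    except RuntimeError:
        # In-memory state can be rolled back to the snapshot...
        state.clear()
        state.update(snapshot)
        # ...but nothing here undoes the file write.
        return False


state = {"rows": ["raw"]}
with tempfile.TemporaryDirectory() as tmp_dir:
    sink_path = os.path.join(tmp_dir, "current.csv")
    succeeded = run_step_with_restore(state, sink_path)
    file_survived = os.path.exists(sink_path)
```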
Implementation Summary#
Extend session state.
Add PipelineSession.sinks alongside sources. Add SessionManager methods
for upsert/delete sink registrations. Prefer a small shared registration
normalization helper so source and sink handling stay aligned and malformed
payload errors are consistently reported.
Extend runtime I/O helpers.
Add build_sinks_from_session(session) to src/modacor/server/io_utils.py and
use existing modacor.io.runtime_support.build_sinks_from_specs(...).
Make sink specs as capable as source specs.
Update build_sinks_from_specs(...) to use a type map and support custom
with kwargs.class_path. Forward kwargs.get("iosink_method_kwargs", kwargs)
to mirror source builder behavior. For HDF sinks, keep any metadata inclusion
explicit via kwargs; do not silently add pipeline or trace metadata.
Wire runtime execution.
Update RuntimeService.process(...), _execute_process_run(...),
_handle_process_failure(...), and _run_auto_fallback(...) so both the
normal execution path and auto fallback receive the configured IoSinks.
Add API methods and routes.
Add service methods matching the source methods: upsert_sinks,
patch_sink, and delete_sink. Add the corresponding FastAPI routes in
src/modacor/server/api.py.
Update CLI wrappers.
Add modacor session set-sink and modacor session delete-sink for parity
with set-source and delete-source. Keep modacor run --csv-sink unchanged.
Update public docs.
Updated public docs:
docs/pipeline_operations/runtime_service_api.md
docs/pipeline_operations/runtime_service_openapi.yaml
docs/pipeline_operations/pipeline_basics.md
docs/extending/io_source_sink_guide.md
docs/getting_started/cli_and_runner.md
Test Coverage#
Focused unit tests cover:
build_sinks_from_specs(...) builds CSV and HDF sinks.
build_sinks_from_specs(...) supports custom sinks with kwargs.class_path.
unsupported sink types fail with a clear ValueError.
nested iosink_method_kwargs and flat kwargs both work.
SessionManager can upsert/delete sinks without disturbing sources.
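The nested-versus-flat kwargs case could be checked along the lines of the sketch below. The helper and test names are hypothetical and the real tests exercise build_sinks_from_specs(...) rather than a stand-in. The selection expression itself is the one quoted in the implementation summary.

```python
def select_method_kwargs(kwargs):
    """Prefer the nested iosink_method_kwargs mapping, else use kwargs as given."""
    return kwargs.get("iosink_method_kwargs", kwargs)


def test_nested_and_flat_kwargs_are_equivalent():
    flat = {"delimiter": ","}
    nested = {"iosink_method_kwargs": {"delimiter": ","}}
    # Both spellings should hand the same method kwargs to the sink.
    assert select_method_kwargs(flat) == {"delimiter": ","}
    assert select_method_kwargs(nested) == {"delimiter": ","}


# Called directly so the sketch runs without a test runner.
test_nested_and_flat_kwargs_are_equivalent()
```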
Service-level tests cover:
session detail includes sinks.
upsert_sinks, patch_sink, and delete_sink mirror source behavior.
RuntimeService.process(...) passes configured sinks to run_pipeline_job.
auto fallback also receives configured sinks.
a seeded partial run with SinkProcessingData writes to an API-registered CSV sink.
CLI tests cover:
modacor session set-sink sends PUT /v1/sessions/{id}/sinks.
modacor session delete-sink sends DELETE /v1/sessions/{id}/sinks/{ref}.
Verification commands:
targeted runtime/sink tests first,
server API tests with FastAPI available,
full project tests if the environment has the required dependencies.
Targeted check on 2026-05-06:
./.venv-dev/bin/python -m pytest \
tests/io/test_runtime_support.py \
tests/server/test_io_utils.py \
tests/server/test_api_e2e.py \
tests/server/test_session_manager.py \
tests/modules/base_modules/test_append_sink.py \
tests/modules/base_modules/test_sink_processing_data.py \
tests/io/hdf/test_hdf_processing_sink.py \
tests/test_cli.py
Result: 42 passed.
Full check on 2026-05-06:
./.venv-dev/bin/python -m pytest
Result: 425 passed, with 3 existing numerical runtime warnings.
Resolved Decisions#
Should the first implementation include the CLI session wrappers?
Decision: yes. They are small and keep session source/sink ergonomics consistent.
Should API-registered HDF sinks include trace and pipeline metadata automatically?
Decision: no by default. API-registered HDF sinks may optionally include
pipeline/trace metadata through explicit sink configuration, but the main
write_hdf path remains responsible for the default metadata-rich HDF artifact.
Should sink changes participate in partial-run invalidation?
Decision: not in the first implementation. Sink-only re-export is not a main
use case, and full invalidation is acceptable for now. A dedicated
changed_sinks field can be designed later if sink-only re-export becomes an
operational requirement.