# API Buffer Source/Sink
Status: implementation plan for the keyed in-memory buffer MVP.
## Goal
MoDaCor should be able to run as a correction engine for externally chunked
data while keeping the normal `IoSource` and `IoSink` contracts. The external
pipeline remains responsible for unpacking, chunk selection, retry policy, and
final repacking. MoDaCor receives the data currently available in a session
buffer, processes it through a normal pipeline, and writes selected results back
to a session buffer.
## Data Model
The runtime buffer is session-scoped and keyed by
`(session_id, kind, ref, data_key)`, where `kind` is either `source` or `sink`,
`ref` is the registered source or sink reference, and `data_key` is the key
passed to `IoSource.get_data()` or used as an `IoSink.write()` subpath.
Array entries are stored internally as `numpy.ndarray`. Metadata and attributes
are JSON-compatible Python values. Source keys are exact key strings, for example:

```text
sample/signal/signal
sample/signal/weights
sample/signal/uncertainties/poisson
sample/signal@units
sample/mask/signal
sample/Q/signal
sample/Psi/signal
```
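A minimal sketch of such a keyed store, assuming a plain Python dict is adequate for the MVP. `SessionBufferStore` and its `put`/`get`/`clear` methods are hypothetical names, not MoDaCor's actual classes; the optional filters on `clear` are modeled on the `kind`, `ref`, and `data_key` query parameters of the clear endpoint.

```python
from typing import Any, Dict, Literal, Optional, Tuple

Kind = Literal["source", "sink"]
# (session_id, kind, ref, data_key)
BufferKey = Tuple[str, str, str, str]


class SessionBufferStore:
    """Hypothetical keyed in-memory buffer; not MoDaCor's actual class."""

    def __init__(self) -> None:
        self._entries: Dict[BufferKey, Any] = {}

    def put(self, session_id: str, kind: Kind, ref: str, data_key: str, value: Any) -> None:
        if kind not in ("source", "sink"):
            raise ValueError(f"kind must be 'source' or 'sink', got {kind!r}")
        # Writing the same full key again replaces the previous value.
        self._entries[(session_id, kind, ref, data_key)] = value

    def get(self, session_id: str, kind: Kind, ref: str, data_key: str) -> Any:
        # Exact lookup on the full 4-tuple; raises KeyError if absent.
        return self._entries[(session_id, kind, ref, data_key)]

    def clear(self, session_id: str, kind: Optional[Kind] = None,
              ref: Optional[str] = None, data_key: Optional[str] = None) -> int:
        """Remove one session's entries, optionally narrowed; returns the count."""
        doomed = [
            key for key in self._entries
            if key[0] == session_id
            and (kind is None or key[1] == kind)
            and (ref is None or key[2] == ref)
            and (data_key is None or key[3] == data_key)
        ]
        for key in doomed:
            del self._entries[key]
        return len(doomed)
```

Because `session_id` is the first key component, clearing one session never drops another session's entries.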
## Wire Format
The MVP supports `.npy` for array upload and download, and JSON for metadata and
attributes. `.npy` preserves array shape, signed/unsigned integer dtypes,
floating dtypes, and memory order (C or Fortran).
JSON/base64 array transfer is intentionally deferred. The buffer store and
buffer IO classes only work with `numpy.ndarray`; codecs live at the API
boundary so that additional wire formats can be added later.
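The codec boundary can be illustrated with NumPy's own `.npy` writer and reader (`numpy.save` / `numpy.load` on an in-memory `io.BytesIO`). `encode_npy` and `decode_npy` are hypothetical helper names, and `allow_pickle=False` is an assumed default for untrusted uploads: it refuses object arrays instead of unpickling them.

```python
import io

import numpy as np


def encode_npy(array: np.ndarray) -> bytes:
    """Serialize an array to .npy bytes; the header records shape, dtype and memory order."""
    buf = io.BytesIO()
    # allow_pickle=False raises ValueError for object arrays instead of pickling them.
    np.save(buf, array, allow_pickle=False)
    return buf.getvalue()


def decode_npy(payload: bytes) -> np.ndarray:
    """Parse .npy bytes back into an ndarray without executing pickled objects."""
    return np.load(io.BytesIO(payload), allow_pickle=False)
```

Round-tripping a Fortran-ordered `uint16` array through these helpers returns an array with the same shape, dtype, and Fortran layout, which is the property the MVP relies on.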
## Runtime Registration
Buffer IO is registered like other runtime IO:
```json
{
  "sources": [
    {"ref": "chunk_input", "type": "buffer", "location": "buffer://session"}
  ],
  "sinks": [
    {"ref": "chunk_output", "type": "buffer", "location": "buffer://session"}
  ]
}
```
The `location` value is only a placeholder, kept for parity with other IO
registrations. The actual storage is the runtime session buffer.
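To show why `location` can stay a placeholder, here is a sketch of buffer-backed source and sink objects that read from and write to the session buffer directly. These are simplified stand-ins: the method names echo `IoSource.get_data()` and `IoSink.write()` from the text, but the constructor arguments, the signatures, and the `SessionBuffer` class are assumptions, not MoDaCor's actual interfaces.

```python
from typing import Any, Dict, Tuple


class SessionBuffer:
    """Hypothetical per-session store: {(kind, ref, data_key): value}."""

    def __init__(self) -> None:
        self.entries: Dict[Tuple[str, str, str], Any] = {}


class BufferSource:
    """Simplified stand-in for a buffer-backed IoSource (signatures assumed)."""

    def __init__(self, ref: str, location: str, buffer: SessionBuffer) -> None:
        self.ref = ref
        # Kept only for registration parity; reads never use this location.
        self.location = location
        self._buffer = buffer

    def get_data(self, data_key: str) -> Any:
        try:
            return self._buffer.entries[("source", self.ref, data_key)]
        except KeyError:
            raise KeyError(f"{self.ref}::{data_key} has not been uploaded") from None


class BufferSink:
    """Simplified stand-in for a buffer-backed IoSink (signatures assumed)."""

    def __init__(self, ref: str, location: str, buffer: SessionBuffer) -> None:
        self.ref = ref
        self.location = location  # placeholder, as for the source
        self._buffer = buffer

    def write(self, data_key: str, value: Any) -> None:
        # Latest-only retention: a second write to the same key replaces the first.
        self._buffer.entries[("sink", self.ref, data_key)] = value
```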
## Example Pipeline
```yaml
steps:
  load_signal:
    module: AppendProcessingData
    requires_steps: []
    configuration:
      processing_key: sample
      databundle_output_key: signal
      signal_location: "chunk_input::sample/signal/signal"
      units_location: "chunk_input::sample/signal/signal@units"
      weights_location: "chunk_input::sample/signal/weights"
      uncertainties_sources:
        poisson: "chunk_input::sample/signal/uncertainties/poisson"
      rank_of_data: 2
  load_mask:
    module: AppendProcessingData
    requires_steps: []
    configuration:
      processing_key: sample
      databundle_output_key: mask
      signal_location: "chunk_input::sample/mask/signal"
      units_override: dimensionless
      rank_of_data: 2
  export:
    module: SinkProcessingData
    requires_steps:
      # placeholder for the correction step(s) producing sample/corrected (not shown)
      - corrected_step
    configuration:
      target: "chunk_output::current"
      data_paths:
        - /sample/corrected
```
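The `*_location` values above follow a `ref::data_key` pattern, with an optional `@attribute` suffix such as `@units`. A hypothetical parser, assuming `::` separates the registered reference from the key and the text after the last `@` names an attribute of the preceding key (the `Location` and `parse_location` names are illustrative only):

```python
from typing import NamedTuple, Optional


class Location(NamedTuple):
    """Hypothetical parsed form of a "ref::data_key[@attribute]" string."""

    ref: str                  # registered source/sink reference, e.g. "chunk_input"
    data_key: str             # buffer key, e.g. "sample/signal/signal"
    attribute: Optional[str]  # name after the last "@", e.g. "units"; None if absent


def parse_location(location: str) -> Location:
    ref, sep, rest = location.partition("::")
    if not sep or not ref or not rest:
        raise ValueError(f"expected 'ref::data_key', got {location!r}")
    if "@" in rest:
        # rpartition splits at the last "@", so only the final suffix is an attribute.
        data_key, _, attribute = rest.rpartition("@")
        if not data_key or not attribute:
            raise ValueError(f"malformed attribute suffix in {location!r}")
        return Location(ref, data_key, attribute)
    return Location(ref, rest, None)
```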
Masks remain separate `BaseData` entries because `BaseData` has no mask
attribute and existing modules look for `mask`/`Mask` entries in a
`DataBundle`. Generic axes attachment is deferred for the MVP; axes can be
loaded as separate `BaseData` entries.
## API
Array upload:

```http
PUT /v1/sessions/{session_id}/buffers/sources/{source_ref}/arrays/{data_key:path}
Content-Type: application/x-npy
```

Array attributes:

```http
PUT /v1/sessions/{session_id}/buffers/sources/{source_ref}/attrs/{data_key:path}
Content-Type: application/json
```

Standalone metadata:

```http
PUT /v1/sessions/{session_id}/buffers/sources/{source_ref}/metadata/{data_key:path}
Content-Type: application/json
```

The metadata payload is a JSON object with a `value` field: `{"value": ...}`.

Sink array fetch:

```http
GET /v1/sessions/{session_id}/buffers/sinks/{sink_ref}/arrays/{data_key:path}
Accept: application/x-npy
```

Manifest:

```http
GET /v1/sessions/{session_id}/buffers/{kind}/{ref}/manifest
```

Clear:

```http
DELETE /v1/sessions/{session_id}/buffers
DELETE /v1/sessions/{session_id}/buffers/sinks/{sink_ref}
DELETE /v1/sessions/{session_id}/buffers/sinks/{sink_ref}/arrays/{data_key:path}
```

The general clear endpoint (`DELETE /v1/sessions/{session_id}/buffers`) accepts
optional `kind`, `ref`, and `data_key` query parameters.
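A client-side sketch using only the standard library's `urllib.request` plus NumPy. It builds, but does not send, requests for the array upload, metadata upload, and sink fetch endpoints above. `BASE_URL` and the function names are hypothetical; `data_key` is percent-encoded with `/` left intact because the routes declare it as a `{data_key:path}` parameter.

```python
import io
import json
import urllib.parse
import urllib.request
from typing import Any

import numpy as np

BASE_URL = "http://localhost:8000"  # hypothetical server address


def _buffer_url(session_id: str, *segments: str) -> str:
    # Keep "/" unescaped so data_key still matches the {data_key:path} route.
    path = "/".join(urllib.parse.quote(s, safe="/") for s in segments)
    return f"{BASE_URL}/v1/sessions/{urllib.parse.quote(session_id, safe='')}/buffers/{path}"


def array_upload_request(session_id: str, source_ref: str, data_key: str,
                         array: np.ndarray) -> urllib.request.Request:
    buf = io.BytesIO()
    np.save(buf, array, allow_pickle=False)  # .npy body: shape, dtype, memory order
    return urllib.request.Request(
        _buffer_url(session_id, "sources", source_ref, "arrays", data_key),
        data=buf.getvalue(),
        method="PUT",
        headers={"Content-Type": "application/x-npy"},
    )


def metadata_upload_request(session_id: str, source_ref: str, data_key: str,
                            value: Any) -> urllib.request.Request:
    # The metadata payload is a JSON object wrapping the value.
    body = json.dumps({"value": value}).encode("utf-8")
    return urllib.request.Request(
        _buffer_url(session_id, "sources", source_ref, "metadata", data_key),
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


def sink_fetch_request(session_id: str, sink_ref: str,
                       data_key: str) -> urllib.request.Request:
    return urllib.request.Request(
        _buffer_url(session_id, "sinks", sink_ref, "arrays", data_key),
        method="GET",
        headers={"Accept": "application/x-npy"},
    )
```

Sending a request is then `urllib.request.urlopen(req)`; for a sink fetch, the response body can be decoded with `np.load(io.BytesIO(body), allow_pickle=False)`.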
## Memory Behavior
The sink buffer uses latest-only retention. Writing the same sink key overwrites the previous value.
Chunk clients should call `/process` with:

```json
{
  "mode": "partial",
  "changed_keys": ["sample.signal"],
  "rollback_snapshot": false
}
```
Setting `rollback_snapshot` to `false` avoids deep-copying large chunk-derived
`ProcessingData` during partial execution. Existing clients keep the previous
rollback behavior because the default remains `rollback_snapshot: true`.
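The intended chunk loop, sketched with the transport abstracted away. `run_chunks` and its `upload`, `process`, and `fetch` callables are hypothetical stand-ins for the array PUT, the `/process` call, and the sink GET; the request body is the one shown above. The result is fetched before the next chunk is uploaded because, with latest-only retention, the next run overwrites the sink key.

```python
from typing import Any, Callable, Dict, Iterable, Iterator

import numpy as np

# Body for the /process call, taken from the text; False skips the rollback deep copy.
PARTIAL_PROCESS_BODY: Dict[str, Any] = {
    "mode": "partial",
    "changed_keys": ["sample.signal"],
    "rollback_snapshot": False,
}


def run_chunks(
    chunks: Iterable[np.ndarray],
    upload: Callable[[np.ndarray], None],
    process: Callable[[Dict[str, Any]], Any],
    fetch: Callable[[], np.ndarray],
) -> Iterator[np.ndarray]:
    """Hypothetical client loop: upload a chunk, run a partial pass, fetch the result.

    upload/process/fetch stand in for the source PUT, the /process call and
    the sink GET; how they reach the server is left to the caller.
    """
    for chunk in chunks:
        upload(chunk)
        process(PARTIAL_PROCESS_BODY)
        # Latest-only sink retention: read the result before the next chunk overwrites it.
        yield fetch()
```

Repacking the yielded results (for example with `np.concatenate`) stays with the external pipeline, consistent with MoDaCor not repacking final arrays.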
## Limitations
- Buffer data lives in process memory only and is lost on server restart.
- MoDaCor does not schedule chunks or repack final arrays.
- The rule of one active run per session is unchanged.
- Parallel chunk processing should use separate sessions in the MVP.
- JSON/base64 array transfer and generic axes attachment are deferred.
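One way to parallelize while keeping at most one active run per session is to give each worker its own session and let it process its share of the chunks sequentially. `process_in_sessions` is a hypothetical helper; `process_chunk` would wrap the upload, `/process`, and fetch calls for one chunk in the given session. Threads are assumed to be adequate here because the client mostly waits on HTTP.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def process_in_sessions(
    chunks: Sequence[T],
    session_ids: Sequence[str],
    process_chunk: Callable[[str, T], R],
) -> List[R]:
    """Hypothetical helper: one worker per session, chunks run sequentially per session."""
    if not session_ids:
        raise ValueError("need at least one session id")
    n = len(session_ids)
    results: List[Any] = [None] * len(chunks)

    def worker(slot: int) -> None:
        session_id = session_ids[slot]
        # Round-robin: chunk i belongs to session i % n; each worker writes only its own indices.
        for i in range(slot, len(chunks), n):
            results[i] = process_chunk(session_id, chunks[i])

    with ThreadPoolExecutor(max_workers=n) as pool:
        # Consuming the map iterator re-raises a worker's exception, if any.
        list(pool.map(worker, range(n)))
    return results
```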