Pipelines & lifecycle
This page covers the full Pipeline, PipelineFeeder, PipelineRegistry, and PipelineHandle APIs, plus the shutdown/drain lifecycle.
Pipeline
A Pipeline<F> is an ordered chain of Component<F> instances. When you call .run(), the framework:
- Binds — creates unbounded
mpscchannels between each pair of adjacent components. - Spawns — launches each component as a separate Tokio task.
- Returns
Vec<JoinHandle<()>>you can await.
Builder API
#![allow(unused)] fn main() { use remotia::pipeline::Pipeline; let handles = Pipeline::new() .link(capture_component) .link(encoder_component) .link(transmission_component) .tag("main") .run(); }
| Method | Description |
|---|---|
Pipeline::new() | Create an empty pipeline |
Pipeline::singleton(component) | Create a one-component pipeline |
.link(component) | Append a component |
.tag(name) | Set a tag for log messages |
.feedable() | Mark the pipeline to accept external DTOs (call before .run()) |
.get_feeder() | Get a PipelineFeeder for injecting DTOs (pipeline must be feedable) |
.get_handle() | Get a PipelineHandle for requesting shutdown |
.shutdown_signal() | Get the Arc<AtomicBool> shutdown signal |
.run() | Bind channels, spawn tasks, return join handles |
PipelineFeeder
A PipelineFeeder<F> lets you inject DTOs into the head of a feedable pipeline from external code.
#![allow(unused)] fn main() { let mut pipeline = Pipeline::new() .link(component) .feedable(); let feeder = pipeline.get_feeder(); // Later, from any task: feeder.feed(my_dto); // panics if the channel is closed }
PipelineFeeder is Clone-safe — you can share it across tasks.
PipelineHandle
A PipelineHandle lets you request graceful shutdown of a pipeline.
#![allow(unused)] fn main() { let mut pipeline = Pipeline::new().link(component).tag("main"); let handle = pipeline.get_handle(); let handles = pipeline.run(); // From another task: handle.request_shutdown(); }
When request_shutdown() is called, the shared AtomicBool is set. Every component in the pipeline checks this signal each iteration and exits when it becomes true.
PipelineHandle is Clone — multiple callers can hold a handle to the same pipeline.
PipelineRegistry
PipelineRegistry<F, K> manages multiple named pipelines and runs them together. This is the standard way to launch a multi-pipeline architecture.
Builder API
#![allow(unused)] fn main() { use remotia::pipeline::registry::PipelineRegistry; let mut registry = PipelineRegistry::<MyDto, &str>::new(); registry.register("main", main_pipeline); registry.register("errors", error_pipeline); registry.run().await; // runs all pipelines, blocks until all finish }
| Method | Description |
|---|---|
PipelineRegistry::new() | Create an empty registry |
.register(id, pipeline) | Insert a pipeline with a key |
.register_empty(id) | Insert an empty pipeline |
.get(&id) / .get_mut(&mut id) | Access a pipeline by key |
.lazy_handle(id) | Get a PipelineHandle for a pipeline (works even before .run()) |
.run() | Bind and spawn all pipelines, await all tasks |
Connecting pipelines with switches
When using a registry, you typically get feeders from destination pipelines before calling .run():
#![allow(unused)] fn main() { let mut registry = PipelineRegistry::<MyDto, &str>::new(); let error_pipeline = Pipeline::new() .link(error_component) .tag("errors"); registry.register("errors", error_pipeline); let error_switch = { let error_pipe = registry.get_mut(&"errors"); OnErrorSwitch::new(error_pipe).detect(MyError::Timeout) }; let main_pipeline = Pipeline::new() .link(capture_component.append(ticker).append(error_switch)) .tag("main"); registry.register("main", main_pipeline); registry.run().await; }
Lifecycle: shutdown and drain
Understanding the component lifecycle is important for graceful shutdown.
Normal operation
Component receives DTO from input channel
→ passes through processors
→ sends result to output channel
→ repeats
Shutdown signal
When PipelineHandle::request_shutdown() is called, the shared AtomicBool is set. On the next iteration, each component observes the signal and breaks out of its loop.
Drain mode
If a component's input channel is closed (e.g. the upstream component has exited), the component enters drain mode:
- It stops reading from the channel (it will always be
None). - It continues the loop, yielding
F::default()as the DTO, and running processors. - When a processor returns
None(frame consumed), the component checks if it should exit. - If the input channel was closed and the last processor returned
None, the component exits.
This ensures that in-flight DTOs in downstream components can finish processing before shutdown.
Component without an input channel
A component with no receiver (e.g. the head of a non-feedable pipeline that generates its own DTOs) relies solely on the shutdown signal. It checks is_shutdown() each iteration and exits when the signal fires.
Channel binding details
When .run() is called on a pipeline that has not been bound, the framework automatically creates an unbounded mpsc channel between each pair of adjacent components:
Component[i] ──sender──▶ channel ──receiver──▶ Component[i+1]
The last component has no sender; the first component has no receiver unless the pipeline is feedable (in which case the feeder's sender is wired to the first component's receiver).