Introduction
Remotia is an open-source media processing and streaming framework written in Rust. Although it was designed with remote rendering and cloud gaming in mind, its components are versatile and can be used in a variety of contexts. Bindings and plug-and-play modules are available for well-established libraries and protocols such as FFmpeg and SRT, and more are in development.
Remotia is actively used for research projects of the IPLab of the University of Catania.
Bibliography
If you use Remotia in your work, please cite the following paper:
@article{remotia,
title={An open source framework for video streaming in cloud gaming},
author={Catania, Lorenzo and Giudice, Oliver and Battiato, Sebastiano and Stanco, Filippo and Allegra, Dario},
journal={Multimedia Tools and Applications},
pages={1--24},
year={2025},
publisher={Springer}
}
Quickstart
Add the dependency
[dependencies.remotia]
version = "0.1.0" # Check crates.io for the latest version
default-features = false
features = [] # Enable only what you need (see Crate map)
Note: The feature flag is named profilation for historical reasons, but it enables profiling utilities.
A minimal pipeline
This example creates a two-pipeline architecture: a main pipeline that ticks, increments a counter, and prints it, and an overflow pipeline fed by a Switch at the end of the main component. A closure in the main pipeline consumes frames once the counter exceeds a threshold.
use async_trait::async_trait;
use remotia::{
    // Component was missing from the original imports; its module path is assumed here.
    pipeline::{component::Component, Pipeline, PipelineRegistry},
    processors::{functional::Closure, switch::Switch, ticker::Ticker},
    traits::{FrameError, FrameProcessor},
};

// 1. Define the DTO
#[derive(Debug, Default)]
struct MyDto {
    counter: u64,
    error: Option<String>,
}

impl FrameError<String> for MyDto {
    fn report_error(&mut self, error: String) {
        self.error = Some(error);
    }

    fn get_error(&self) -> Option<String> {
        self.error.clone()
    }
}

// 2. Define a custom processor
struct Incrementer;

#[async_trait]
impl FrameProcessor<MyDto> for Incrementer {
    async fn process(&mut self, mut dto: MyDto) -> Option<MyDto> {
        dto.counter += 1;
        Some(dto)
    }
}

// 3. Build and run the pipelines
#[tokio::main]
async fn main() {
    let mut registry = PipelineRegistry::<MyDto, &str>::new();

    // Register the overflow pipeline first so we can get a feeder for the switch
    let overflow_pipeline = Pipeline::new()
        .link(Component::singleton(Closure::new(|dto: MyDto| {
            println!("overflow: counter = {}", dto.counter);
            None // consumed: the pipeline stops processing this frame
        })))
        .tag("overflow");
    registry.register("overflow", overflow_pipeline);

    // Build a switch that redirects to the overflow pipeline
    let overflow_switch = {
        let overflow_pipe = registry.get_mut(&"overflow");
        Switch::new(overflow_pipe)
    };

    // Build the main pipeline
    let main_pipeline = Pipeline::new()
        .link(
            Component::new()
                .append(Ticker::new(100)) // tick every 100 ms
                .append(Incrementer) // increment counter
                .append(Closure::new(|dto: MyDto| {
                    // print and check
                    println!("main: counter = {}", dto.counter);
                    if dto.counter > 10 {
                        None // consumed here: the frame never reaches the switch below
                    } else {
                        Some(dto)
                    }
                }))
                .append(overflow_switch) // redirects every frame that reaches it
                .tag("main-step"),
        )
        .tag("main");
    registry.register("main", main_pipeline);

    // Run all pipelines (blocks until all tasks complete)
    registry.run().await;
}
What happens at runtime
- The Ticker paces the loop at ~10 Hz.
- Incrementer bumps counter by 1 each tick.
- The closure prints the counter. If counter > 10, it returns None.
- Returning None means the DTO does not reach the Switch. If you want the switch to fire instead of consuming the frame, restructure so the switch comes before the conditional return. A typical pattern is:
Component::new()
    .append(Ticker::new(100))
    .append(Incrementer)
    .append(Closure::new(|dto: MyDto| {
        println!("main: counter = {}", dto.counter);
        Some(dto) // always pass forward
    }))
    // The switch always redirects: the frame never continues past it
    .append(overflow_switch)
To conditionally redirect, use OnErrorSwitch or implement the branching logic inside a single closure.
Next steps
- Read Key concepts for the full trait and API reference.
- Browse the Processors catalog to find built-in processors for your pipeline.
- See Pipelines & lifecycle for shutdown, feedable pipelines, and the registry.
- Check the Crate map for feature flags and optional crates.
Key concepts
Remotia is built around four abstractions that compose into streaming pipelines. This page defines each one and shows the Rust APIs you implement and use.
Pipeline
┌────────────────────────────────┐       ┌────────────────────────────────┐
│          Component A           │       │          Component B           │
│ ┌─────────────┐ ┌─────────────┐│  ch   │ ┌─────────────┐ ┌─────────────┐│
│ │ Processor 1 │→│ Processor 2 ││──────▶│ │ Processor 3 │→│ Processor 4 ││
│ └─────────────┘ └─────────────┘│       │ └─────────────┘ └─────────────┘│
└────────────────────────────────┘       └────────────────────────────────┘
- A DTO (Data Transfer Object) carries frame data and metadata through the pipeline.
- A processor performs one atomic operation on a DTO.
- A component groups processors into a single async task, connected to neighbors by channels.
- A pipeline links components into a directed chain; multiple pipelines are connected by switches.
Data Transfer Object (DTO)
The DTO is the data structure that flows through the pipeline. It typically holds frame buffers and per-frame statistics. You define your own DTO type and implement the traits that the processors in your pipeline require.
Core trait: FrameProcessor input
Every processor operates on a generic type F. Your DTO is that type F. The framework does not mandate a specific struct — you define one adapted to your use case.
Trait: FrameProperties<K, V>
Used by switches that read or write routing keys on the DTO (e.g. PoolingSwitch, DepoolingSwitch).
pub trait FrameProperties<K, V> {
    fn set(&mut self, key: K, value: V);
    fn get(&self, key: &K) -> Option<V>;
}
Trait: FrameError<E>
Used by OnErrorSwitch to inspect error state on the DTO.
pub trait FrameError<E> {
    fn report_error(&mut self, error: E);
    fn get_error(&self) -> Option<E>;
}
Other available traits
| Trait | Purpose | Used by |
|---|---|---|
| PullableFrameProperties<K, V> | Push/pull semantics for properties | Advanced routing |
| OptionalFrameData<D> | Access optional embedded data | Buffer utilities |
| BorrowFrameProperties<K, V> | Get a reference to a property value | Read-only inspection |
| BorrowMutFrameProperties<K, V> | Get a mutable reference to a property value | In-place mutation |
Minimal DTO example
use remotia::traits::{FrameError, FrameProperties};

#[derive(Debug, Default)]
struct MyDto {
    buffer: Vec<u8>,
    frame_id: u64,
    error: Option<String>,
}

impl FrameProperties<String, u64> for MyDto {
    fn set(&mut self, key: String, value: u64) {
        if key == "frame_id" {
            self.frame_id = value;
        }
    }

    fn get(&self, key: &String) -> Option<u64> {
        if key == "frame_id" {
            Some(self.frame_id)
        } else {
            None
        }
    }
}

impl FrameError<String> for MyDto {
    fn report_error(&mut self, error: String) {
        self.error = Some(error);
    }

    fn get_error(&self) -> Option<String> {
        self.error.clone()
    }
}
Custom processor modules may define additional traits. Your DTO must implement them if you use those modules in your pipeline.
Processors
A processor is a single unit of work applied to a DTO. The core trait is:
#[async_trait]
pub trait FrameProcessor<F> {
    async fn process(&mut self, frame_data: F) -> Option<F>;
}
Return contract:
- Some(dto): the DTO is passed to the next processor in the component.
- None: the DTO is consumed. The pipeline interprets this as "this frame is done here": it may have been redirected to another pipeline (by a switch), stored, or dropped.
Processors have full ownership of the DTO while processing it. This avoids borrowing conflicts and makes the data flow explicit.
See the Processors page for the catalog of built-in processor types.
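The return contract can be tried out without the framework. The sketch below is a synchronous, std-only stand-in: `SyncProcessor`, `Increment`, `DropAbove`, and `run_chain` are hypothetical names introduced for illustration, not Remotia APIs (the real trait is async).

```rust
// Hypothetical synchronous stand-in for FrameProcessor<F>; the real trait is async.
trait SyncProcessor<F> {
    fn process(&mut self, frame: F) -> Option<F>;
}

#[derive(Debug, Default, PartialEq)]
struct Frame {
    counter: u64,
}

// Always passes the frame forward after incrementing its counter.
struct Increment;

impl SyncProcessor<Frame> for Increment {
    fn process(&mut self, mut frame: Frame) -> Option<Frame> {
        frame.counter += 1;
        Some(frame)
    }
}

// Consumes frames whose counter exceeds `limit`.
struct DropAbove {
    limit: u64,
}

impl SyncProcessor<Frame> for DropAbove {
    fn process(&mut self, frame: Frame) -> Option<Frame> {
        if frame.counter > self.limit {
            None
        } else {
            Some(frame)
        }
    }
}

// Runs the processors in order and stops as soon as one returns None.
fn run_chain(chain: &mut [Box<dyn SyncProcessor<Frame>>], frame: Frame) -> Option<Frame> {
    let mut current = frame;
    for processor in chain.iter_mut() {
        current = processor.process(current)?;
    }
    Some(current)
}
```

Each processor takes the frame by value and hands it back, which mirrors the ownership rule described above: only one processor can touch the DTO at a time.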
Components
A component groups an ordered sequence of processors into a single async task (a Tokio task). Each component:
- Receives a DTO from an input channel (or allocates one itself).
- Passes it through its processors sequentially.
- Sends the resulting DTO (if any) to the next component via an output channel.
Components are the unit of concurrency. By grouping processors into different components, you control which work shares a task and which runs in parallel. The framework uses unbounded mpsc channels to connect adjacent components within a pipeline.
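As a rough analogy for this concurrency model, the sketch below uses plain threads and `std::sync::mpsc` instead of Tokio tasks. `run_two_components` is a hypothetical helper, and the arithmetic steps only stand in for processors.

```rust
use std::sync::mpsc;
use std::thread;

// Two "components" modelled as threads joined by an unbounded channel.
// Component A runs two steps sequentially; component B runs one.
fn run_two_components(inputs: Vec<u64>) -> Vec<u64> {
    let (tx, rx) = mpsc::channel::<u64>();

    let component_a = thread::spawn(move || {
        for value in inputs {
            let value = value + 1; // stand-in for processor 1
            let value = value * 2; // stand-in for processor 2
            if tx.send(value).is_err() {
                break; // the downstream component is gone
            }
        }
        // `tx` is dropped here, which closes the channel.
    });

    let component_b = thread::spawn(move || {
        let mut results = Vec::new();
        for value in rx {
            results.push(value + 100); // stand-in for processor 3
        }
        results
    });

    component_a.join().expect("component A panicked");
    component_b.join().expect("component B panicked")
}
```

Steps inside one thread run strictly in order, while the two threads overlap in time: component A can start on the next input while component B is still handling the previous one.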
Builder API
Component::new()
    .append(processor_a)
    .append(processor_b)
    .tag("encoder")
Or, for a single-processor component:
Component::singleton(processor)
Pipelines
A pipeline is a chain of components connected by channels. Components within a pipeline run concurrently (each is a separate Tokio task), while processors within a component run sequentially.
Builder API
Pipeline::new()
    .link(component_a)
    .link(component_b)
    .tag("main")
    .run()
Or, for a single-component pipeline:
Pipeline::singleton(component)
Calling .run() automatically creates the channels between adjacent components and spawns each component as a Tokio task. It returns Vec<JoinHandle<()>>.
Feedable pipelines
Mark a pipeline as .feedable() to allow external code to inject DTOs into its head:
let mut pipeline = Pipeline::new().link(component).feedable();
let feeder = pipeline.get_feeder();
feeder.feed(my_dto);
Multi-pipeline architectures
Complex systems use multiple pipelines connected by switches. For example, a main streaming pipeline and a separate error-handling pipeline, linked by an OnErrorSwitch. See the Pipelines & Lifecycle page for the full API and lifecycle details.
Switches
Switches are processors that move DTOs between pipelines. Instead of returning Some(dto) to continue in the current pipeline, they send the DTO to a different pipeline and return None.
The framework provides several switch types:
| Switch | Behavior |
|---|---|
| Switch | Unconditionally redirects the DTO to another pipeline |
| CloneSwitch | Clones the DTO, sends the clone to another pipeline, passes the original forward |
| OnErrorSwitch | Redirects the DTO to another pipeline if it carries a matching error |
| PoolingSwitch | Picks a random destination from a pool and stamps the DTO with the pool key |
| DepoolingSwitch | Routes the DTO to the destination matching its pool key |
See the Processors page for constructor signatures and usage details.
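To make the difference between redirecting and cloning concrete, here is a minimal std-only sketch. `RedirectSwitch` and `TeeSwitch` are hypothetical stand-ins for Switch and CloneSwitch, with a `std::sync::mpsc::Sender` playing the role of the destination pipeline's feeder; the real types are async processors and may be implemented differently.

```rust
use std::sync::mpsc::Sender;

#[derive(Debug, Clone, PartialEq)]
struct Frame {
    id: u64,
}

// Stand-in for Switch: hands the frame to another pipeline and returns None,
// so the current pipeline stops processing it.
struct RedirectSwitch {
    destination: Sender<Frame>,
}

impl RedirectSwitch {
    fn process(&mut self, frame: Frame) -> Option<Frame> {
        self.destination
            .send(frame)
            .expect("destination pipeline closed");
        None
    }
}

// Stand-in for CloneSwitch: sends a copy elsewhere and forwards the original.
struct TeeSwitch {
    destination: Sender<Frame>,
}

impl TeeSwitch {
    fn process(&mut self, frame: Frame) -> Option<Frame> {
        self.destination
            .send(frame.clone())
            .expect("destination pipeline closed");
        Some(frame)
    }
}
```

The only difference between the two is the return value, which is exactly what decides whether the current pipeline keeps the frame.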
Processors
Processors implement the FrameProcessor<F> trait and perform a single atomic operation on the DTO. This page catalogs every built-in processor in remotia-core.
#[async_trait]
pub trait FrameProcessor<F> {
    async fn process(&mut self, frame_data: F) -> Option<F>;
}
Routing switches
Switches move DTOs between pipelines. They return None when the frame leaves the current pipeline, or Some with the original frame when the current pipeline should keep processing it.
Switch
Redirects the DTO into a different pipeline. Returns None — the current pipeline stops processing this frame.
use remotia::processors::switch::Switch;

let switch = Switch::new(&mut destination_pipeline);
Use when: branching — e.g. sending frames down an error or debug pipeline instead of the main one.
CloneSwitch
Clones the DTO, sends the clone to another pipeline, and passes the original forward. The DTO type must implement Clone.
use remotia::processors::clone_switch::CloneSwitch;

let clone_switch = CloneSwitch::new(&mut profiling_pipeline);
Use when: parallel side-channels — e.g. a profiling pipeline that receives every frame while the main pipeline continues uninterrupted.
OnErrorSwitch
Redirects the DTO to a destination pipeline if it carries a matching error. If no error is present (or the error does not match), the frame passes through with Some(frame). The DTO must implement FrameError<E>.
use remotia::processors::error_switch::OnErrorSwitch;

let error_switch = OnErrorSwitch::new::<MyDto, MyError>(&mut error_pipeline)
    .detect(MyError::Timeout)
    .detect(MyError::ConnectionError);
Use when: conditional error routing — only certain error variants are diverted.
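The matching rule can be sketched independently of the framework. In the std-only example below, `ErrorRouter` is a hypothetical stand-in for OnErrorSwitch: it keeps a list of error variants registered with `detect` and diverts a frame only when the frame's error is in that list. The `Corrupted` variant is added purely to show a non-matching error.

```rust
use std::sync::mpsc::Sender;

#[derive(Debug, Clone, PartialEq)]
enum MyError {
    Timeout,
    ConnectionError,
    Corrupted, // illustrative variant that is never registered below
}

#[derive(Debug, PartialEq)]
struct Frame {
    id: u64,
    error: Option<MyError>,
}

// Stand-in for OnErrorSwitch: diverts frames carrying a registered error.
struct ErrorRouter {
    destination: Sender<Frame>,
    detected: Vec<MyError>,
}

impl ErrorRouter {
    fn new(destination: Sender<Frame>) -> Self {
        Self { destination, detected: Vec::new() }
    }

    // Builder-style registration of an error variant to divert.
    fn detect(mut self, error: MyError) -> Self {
        self.detected.push(error);
        self
    }

    fn process(&mut self, frame: Frame) -> Option<Frame> {
        let matches = frame
            .error
            .as_ref()
            .map_or(false, |error| self.detected.contains(error));
        if matches {
            self.destination.send(frame).expect("error pipeline closed");
            None
        } else {
            Some(frame) // no error, or an error nobody registered
        }
    }
}
```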
PoolingSwitch
Picks a random destination from a registered pool, stamps the DTO with the chosen pool key (via FrameProperties), and sends it. Returns None. The DTO must implement FrameProperties<P, K>, where P is the type of the property key (&str in the example) and K is the type of the pool key (usize in the example).
use remotia::processors::pool_switch::PoolingSwitch;

let pool_switch = PoolingSwitch::<MyDto, &str, usize>::new("worker_id")
    .entry(0, &mut worker_pipeline_0)
    .entry(1, &mut worker_pipeline_1)
    .entry(2, &mut worker_pipeline_2);
Use when: fan-out to a pool of workers — e.g. distributing encoding across N encoder pipelines.
DepoolingSwitch
Routes the DTO to the destination matching its pool key (read via FrameProperties). Returns None. The DTO must implement FrameProperties<P, K>, where P is the type of the property key (&str in the example) and K is the type of the pool key (usize in the example).
use remotia::processors::pool_switch::DepoolingSwitch;

let depool_switch = DepoolingSwitch::<MyDto, &str, usize>::new("worker_id")
    .entry(0, &mut merger_pipeline_0)
    .entry(1, &mut merger_pipeline_1);
Use when: fan-in — merging results from a worker pool back into per-worker downstream pipelines.
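The stamp-then-route idea behind the two pool switches can be sketched with std channels. `PoolRouter` and `DepoolRouter` are hypothetical stand-ins; note that this sketch picks destinations round-robin rather than randomly, only to stay dependency-free and deterministic.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;

#[derive(Debug, Default)]
struct Frame {
    payload: u64,
    worker_id: Option<usize>, // the pool key stamped on the frame
}

// Fan-out: choose a destination, stamp the frame with its key, send it.
struct PoolRouter {
    entries: Vec<(usize, Sender<Frame>)>,
    next: usize,
}

impl PoolRouter {
    fn process(&mut self, mut frame: Frame) -> Option<Frame> {
        // Round-robin choice; the switch described above picks randomly.
        let index = self.next % self.entries.len();
        self.next += 1;
        let (key, destination) = &self.entries[index];
        frame.worker_id = Some(*key);
        destination.send(frame).expect("worker pipeline closed");
        None
    }
}

// Fan-in: read the key stamped earlier and route to the matching destination.
struct DepoolRouter {
    entries: HashMap<usize, Sender<Frame>>,
}

impl DepoolRouter {
    fn process(&mut self, frame: Frame) -> Option<Frame> {
        let key = frame.worker_id.expect("frame was never stamped with a pool key");
        let destination = self.entries.get(&key).expect("no destination for this pool key");
        destination.send(frame).expect("destination pipeline closed");
        None
    }
}
```

Because the key travels with the frame, the fan-in side needs no knowledge of how the fan-out side made its choice.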
Timing
Ticker
Waits for the configured interval, then passes the DTO forward unchanged.
use remotia::processors::ticker::Ticker;

let ticker = Ticker::new(16); // 16 ms ≈ 60 FPS
Use when: frame-rate pacing at the head of a capture pipeline, or throttling any component.
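For intuition about the interval argument, the sketch below shows the arithmetic and a blocking stand-in for the pacing behavior. `interval_ms_for_fps` and `SleepTicker` are hypothetical names; the real Ticker is an async processor and does not block a thread like this.

```rust
use std::thread;
use std::time::Duration;

// Integer milliseconds per frame for a target frame rate (rounds down).
// Assumes `fps` is non-zero.
fn interval_ms_for_fps(fps: u64) -> u64 {
    1000 / fps
}

// Blocking stand-in for a ticker: wait, then pass the frame on unchanged.
struct SleepTicker {
    interval: Duration,
}

impl SleepTicker {
    fn new(interval_ms: u64) -> Self {
        Self { interval: Duration::from_millis(interval_ms) }
    }

    fn process<F>(&mut self, frame: F) -> Option<F> {
        thread::sleep(self.interval);
        Some(frame)
    }
}
```

With integer milliseconds, 60 FPS maps to 16 ms, which actually yields 62.5 ticks per second; processing time inside the component adds to the period on top of that.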
Inline processors
These wrap bare functions, closures, or async functions as FrameProcessor implementors. Use them for quick logic without defining a dedicated struct.
Function
Wraps a function pointer.
use remotia::processors::functional::Function;

let proc = Function::new(|mut dto: MyDto| -> Option<MyDto> {
    dto.frame_id += 1;
    Some(dto)
});
Closure
Wraps a capturing closure.
use remotia::processors::functional::Closure;

let counter = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let counter_clone = counter.clone();

let proc = Closure::new(move |dto: MyDto| -> Option<MyDto> {
    counter_clone.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    Some(dto)
});
Convenience method on Component:
component.closure(|dto: MyDto| Some(dto))
AsyncFunction
Wraps an async function pointer. Use the async_func! macro to create the pinned future.
use remotia::async_func;
use remotia::processors::async_functional::AsyncFunction;

async fn fetch_frame(dto: MyDto) -> Option<MyDto> {
    // async I/O ...
    Some(dto)
}

let proc = AsyncFunction::new(|dto| async_func!(async move { fetch_frame(dto).await }));
Containers
Sequential
Runs a sequence of processors in order inside a single component. Each processor receives the output of the previous one. If any processor returns None, the sequence stops.
use remotia::processors::containers::sequential::Sequential;

let seq = Sequential::new()
    .append(processor_a)
    .append(processor_b)
    .append(processor_c);
Use when: grouping processors that must run in the same async task — e.g. a tick-then-capture pattern, or a sequence of buffer operations that should not be split across component boundaries.
Pipelines & lifecycle
This page covers the full Pipeline, PipelineFeeder, PipelineRegistry, and PipelineHandle APIs, plus the shutdown/drain lifecycle.
Pipeline
A Pipeline<F> is an ordered chain of Component<F> instances. When you call .run(), the framework:
- Binds: creates unbounded mpsc channels between each pair of adjacent components.
- Spawns: launches each component as a separate Tokio task.
- Returns Vec<JoinHandle<()>> you can await.
Builder API
use remotia::pipeline::Pipeline;

let handles = Pipeline::new()
    .link(capture_component)
    .link(encoder_component)
    .link(transmission_component)
    .tag("main")
    .run();
| Method | Description |
|---|---|
| Pipeline::new() | Create an empty pipeline |
| Pipeline::singleton(component) | Create a one-component pipeline |
| .link(component) | Append a component |
| .tag(name) | Set a tag for log messages |
| .feedable() | Mark the pipeline to accept external DTOs (call before .run()) |
| .get_feeder() | Get a PipelineFeeder for injecting DTOs (pipeline must be feedable) |
| .get_handle() | Get a PipelineHandle for requesting shutdown |
| .shutdown_signal() | Get the Arc<AtomicBool> shutdown signal |
| .run() | Bind channels, spawn tasks, return join handles |
PipelineFeeder
A PipelineFeeder<F> lets you inject DTOs into the head of a feedable pipeline from external code.
let mut pipeline = Pipeline::new()
    .link(component)
    .feedable();
let feeder = pipeline.get_feeder();

// Later, from any task:
feeder.feed(my_dto); // panics if the channel is closed
PipelineFeeder implements Clone, so you can share it across tasks.
PipelineHandle
A PipelineHandle lets you request graceful shutdown of a pipeline.
let mut pipeline = Pipeline::new().link(component).tag("main");
let handle = pipeline.get_handle();
let handles = pipeline.run();

// From another task:
handle.request_shutdown();
When request_shutdown() is called, the shared AtomicBool is set. Every component in the pipeline checks this signal each iteration and exits when it becomes true.
PipelineHandle is Clone — multiple callers can hold a handle to the same pipeline.
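The shutdown mechanism described here is a shared flag polled by each worker loop. A minimal std-only sketch, using threads instead of Tokio tasks, could look like this; `ShutdownHandle` and `spawn_component` are hypothetical names, not the framework's types.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Cloneable handle around a shared shutdown flag.
#[derive(Clone)]
struct ShutdownHandle {
    signal: Arc<AtomicBool>,
}

impl ShutdownHandle {
    fn new() -> Self {
        Self { signal: Arc::new(AtomicBool::new(false)) }
    }

    fn request_shutdown(&self) {
        self.signal.store(true, Ordering::SeqCst);
    }

    fn is_shutdown(&self) -> bool {
        self.signal.load(Ordering::SeqCst)
    }
}

// A worker loop that checks the flag once per iteration and returns
// how many iterations it completed before exiting.
fn spawn_component(handle: ShutdownHandle) -> thread::JoinHandle<u64> {
    thread::spawn(move || {
        let mut iterations = 0;
        while !handle.is_shutdown() {
            iterations += 1; // stand-in for "process one frame"
            thread::sleep(Duration::from_millis(1));
        }
        iterations
    })
}
```

Because the flag is only checked between iterations, a worker always finishes the frame it is currently processing before it exits.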
PipelineRegistry
PipelineRegistry<F, K> manages multiple named pipelines and runs them together. This is the standard way to launch a multi-pipeline architecture.
Builder API
use remotia::pipeline::registry::PipelineRegistry;

let mut registry = PipelineRegistry::<MyDto, &str>::new();

registry.register("main", main_pipeline);
registry.register("errors", error_pipeline);

registry.run().await; // runs all pipelines, blocks until all finish
| Method | Description |
|---|---|
| PipelineRegistry::new() | Create an empty registry |
| .register(id, pipeline) | Insert a pipeline with a key |
| .register_empty(id) | Insert an empty pipeline |
| .get(&id) / .get_mut(&id) | Access a pipeline by key |
| .lazy_handle(id) | Get a PipelineHandle for a pipeline (works even before .run()) |
| .run() | Bind and spawn all pipelines, await all tasks |
Connecting pipelines with switches
When using a registry, you typically get feeders from destination pipelines before calling .run():
let mut registry = PipelineRegistry::<MyDto, &str>::new();

let error_pipeline = Pipeline::new()
    .link(error_component)
    .tag("errors");
registry.register("errors", error_pipeline);

let error_switch = {
    let error_pipe = registry.get_mut(&"errors");
    OnErrorSwitch::new(error_pipe).detect(MyError::Timeout)
};

let main_pipeline = Pipeline::new()
    .link(capture_component.append(ticker).append(error_switch))
    .tag("main");
registry.register("main", main_pipeline);

registry.run().await;
Lifecycle: shutdown and drain
Understanding the component lifecycle is important for graceful shutdown.
Normal operation
Component receives DTO from input channel
→ passes through processors
→ sends result to output channel
→ repeats
Shutdown signal
When PipelineHandle::request_shutdown() is called, the shared AtomicBool is set. On the next iteration, each component observes the signal and breaks out of its loop.
Drain mode
If a component's input channel is closed (e.g. the upstream component has exited), the component enters drain mode:
- It stops reading from the channel (it will always be None).
- It continues the loop, yielding F::default() as the DTO, and running processors.
- When a processor returns None (frame consumed), the component checks if it should exit.
- If the input channel was closed and the last processor returned None, the component exits.
This ensures that in-flight DTOs in downstream components can finish processing before shutdown.
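One possible reading of the drain rules, written as a blocking std-only loop, is sketched below. `run_component` is a hypothetical function, and the exit condition is an interpretation of the list above rather than the framework's actual code.

```rust
use std::sync::mpsc::Receiver;

// Runs a single-processor "component" until it has drained.
// Returns the frames it would have forwarded downstream.
fn run_component<F, P>(input: Receiver<F>, mut process: P) -> Vec<F>
where
    F: Default,
    P: FnMut(F) -> Option<F>,
{
    let mut forwarded = Vec::new();
    let mut draining = false;
    loop {
        let frame = if draining {
            // Drain mode: stop reading the channel, use a default DTO.
            F::default()
        } else {
            match input.recv() {
                Ok(frame) => frame,
                Err(_) => {
                    // Channel closed and empty: switch to drain mode.
                    draining = true;
                    F::default()
                }
            }
        };
        match process(frame) {
            Some(result) => forwarded.push(result),
            // A consumed frame while draining means there is nothing left to do.
            None if draining => break,
            None => {}
        }
    }
    forwarded
}
```

Note that in this reading a processor chain that never returns None for a default DTO would keep the loop alive, so at least one processor has to consume empty frames for the component to exit.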
Component without an input channel
A component with no receiver (e.g. the head of a non-feedable pipeline that generates its own DTOs) relies solely on the shutdown signal. It checks is_shutdown() each iteration and exits when the signal fires.
Channel binding details
When .run() is called on a pipeline that has not been bound, the framework automatically creates an unbounded mpsc channel between each pair of adjacent components:
Component[i] ──sender──▶ channel ──receiver──▶ Component[i+1]
The last component has no sender; the first component has no receiver unless the pipeline is feedable (in which case the feeder's sender is wired to the first component's receiver).
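The wiring can be sketched with std channels and threads. In the example below, `Stage` and `bind_and_run` are hypothetical names: N stages get N-1 connecting channels plus one feeder channel at the head, and the last stage, having no sender, simply keeps its results so they can be inspected.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

// One boxed closure per "component".
type Stage = Box<dyn FnMut(u64) -> Option<u64> + Send>;

// Wires the stages together and spawns one thread per stage.
// Returns the feeder for the head stage and the join handles.
fn bind_and_run(stages: Vec<Stage>) -> (Sender<u64>, Vec<JoinHandle<Vec<u64>>>) {
    let (feeder, head_rx) = channel::<u64>();
    let stage_count = stages.len();
    let mut upstream: Option<Receiver<u64>> = Some(head_rx);
    let mut handles = Vec::with_capacity(stage_count);

    for (index, mut stage) in stages.into_iter().enumerate() {
        let input = upstream.take().expect("every stage has an upstream receiver");
        // Every stage except the last gets a sender to its successor.
        let output: Option<Sender<u64>> = if index + 1 < stage_count {
            let (tx, rx) = channel();
            upstream = Some(rx);
            Some(tx)
        } else {
            None
        };
        handles.push(thread::spawn(move || {
            let mut kept = Vec::new();
            for frame in input {
                if let Some(result) = stage(frame) {
                    match &output {
                        Some(tx) => {
                            if tx.send(result).is_err() {
                                break; // downstream stage is gone
                            }
                        }
                        None => kept.push(result), // last stage: nothing downstream
                    }
                }
            }
            kept
        }));
    }
    (feeder, handles)
}
```

Dropping the feeder closes the head channel; each stage then finishes its loop and drops its own sender, so the shutdown cascades down the chain.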
Crate map
Remotia is organized as a workspace of crates. The remotia umbrella crate re-exports them behind feature flags so you only compile what you need.
Core crate
| Feature flag | Crate | Purpose | Key types |
|---|---|---|---|
| (always enabled) | remotia-core | Core traits, pipeline engine, built-in processors | FrameProcessor, FrameProperties, FrameError, Pipeline, Component, PipelineFeeder, PipelineHandle, PipelineRegistry, Ticker, Switch, CloneSwitch, OnErrorSwitch, PoolingSwitch, DepoolingSwitch, Sequential, Function, Closure, AsyncFunction |
Optional crates (feature flags on the remotia crate)
| Feature flag | Crate(s) | Purpose | Key types |
|---|---|---|---|
| buffers | remotia-buffer-utils, remotia-buffer-utils-macros | Buffer pool management and allocation | BuffersPool, BufferAllocator, BufferBorrower, BufferRedeemer, PoolRegistry, #[buffers_map] macro |
| capture | remotia-core-capturers | Screen and file capture | ScrapFrameCapturer, Y4MFrameCapturer, Y4MRGBAFrameCapturer, yuv420_to_rgba() |
| render | remotia-core-renderers | Window rendering via winit + pixels | WinitRenderer, WinitRunner |
| transmission | remotia-core-transmission | TCP frame transport | TcpFrameSender, TcpFrameReceiver |
| profilation | remotia-profilation-utils | Profiling, logging, frame-dropping | TimestampAdder, TimestampDiffCalculator, ProfiledSequential, ThresholdBasedFrameDropper, TimestampBasedFrameDropper, ConsoleAverageStatsLogger, CSVFrameDataSerializer, ConsoleDropReasonLogger |
| serialization | remotia-serialization-utils | Bincode serialization of frame data | BincodeSerializer, BincodeDeserializer |
External crates (separate Cargo dependencies)
These live in separate repositories and are not part of the remotia umbrella crate. Add them as direct dependencies.
| Crate | Purpose | Key types |
|---|---|---|
| remotia-ffmpeg-codecs | FFmpeg encoder/decoder integration via pusher/puller architecture | EncoderPusher, EncoderPuller, DecoderPusher, DecoderPuller, EncoderBuilder, DecoderBuilder, ScalerBuilder |
| remotia-srt | SRT protocol sender/receiver processors | SRT sender/receiver processors |
Feature flag usage
[dependencies.remotia]
version = "0.1.0"
default-features = false
features = ["capture", "transmission"] # only what you need
All features are off by default. Enable only the ones you use to minimize compile time and dependencies.
Examples
This chapter contains worked examples that demonstrate the framework's mechanisms and design patterns.
Each example includes a step-by-step walkthrough with inline code. Full source code is available on GitHub.
Screen snapper — Capture the screen and save to disk
Periodically captures the primary display and saves each frame as a PNG file.
Full source: GitHub — screen-snapper
Architecture
Two components in a single pipeline:
Component 1: Capture                          Component 2: Save
┌──────────────────────────────────┐          ┌─────────────────┐
│ Ticker → BufferAllocator →       │ channel  │ PNGBufferSaver  │
│ XCapCapturer                     │─────────▶│                 │
└──────────────────────────────────┘          └─────────────────┘
- Component 1 paces the capture at 1 FPS, allocates a buffer, fills it with screen pixels.
- Component 2 reads the buffer and writes a PNG.
The DTO
The DTO holds a single optional BytesMut buffer identified by an enum key. It implements PullableFrameProperties so processors can pull and push the buffer by key.
#[derive(Default, Debug)]
pub struct RecorderData {
    screen_buffer: Option<BytesMut>,
}

#[derive(Clone, Copy)]
pub enum Buffers {
    CapturedScreenBuffer,
}

impl PullableFrameProperties<Buffers, BytesMut> for RecorderData {
    fn push(&mut self, key: Buffers, value: BytesMut) {
        match key {
            Buffers::CapturedScreenBuffer => self.screen_buffer.replace(value),
        };
    }

    fn pull(&mut self, key: &Buffers) -> Option<BytesMut> {
        match key {
            Buffers::CapturedScreenBuffer => self.screen_buffer.take(),
        }
    }
}
The pull/push pattern ensures the buffer is taken out of the DTO during processing and returned after, avoiding mutable aliasing.
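How a processor might use this pattern can be sketched with std types only. Below, `Vec<u8>` stands in for BytesMut, the key argument is dropped because there is a single buffer, and `fill_buffer` is a hypothetical processor body; none of this is the framework's actual code.

```rust
#[derive(Default, Debug)]
struct RecorderData {
    screen_buffer: Option<Vec<u8>>, // Vec<u8> stands in for BytesMut
}

impl RecorderData {
    // Stores the buffer back in the DTO.
    fn push(&mut self, value: Vec<u8>) {
        self.screen_buffer = Some(value);
    }

    // Takes the buffer out of the DTO, leaving None behind.
    fn pull(&mut self) -> Option<Vec<u8>> {
        self.screen_buffer.take()
    }
}

// Hypothetical processor body: pull the buffer, write into it, push it back.
// Returns None (frame consumed) if no buffer was allocated upstream.
fn fill_buffer(mut dto: RecorderData, byte: u8) -> Option<RecorderData> {
    let mut buffer = dto.pull()?;
    for slot in buffer.iter_mut() {
        *slot = byte;
    }
    dto.push(buffer);
    Some(dto)
}
```

While the buffer is pulled, the processor owns it outright, so it can be written to without holding a mutable borrow on the DTO.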
Pipeline construction
fn capturer(monitor_id: usize, height: u32, width: u32) -> Component<RecorderData> {
    Component::new()
        .append(Ticker::new(1000)) // 1 tick/second
        .append(BufferAllocator::new(
            // allocate a fresh BytesMut
            Buffers::CapturedScreenBuffer,
            height as usize * width as usize * 3,
        ))
        .append(
            XCapCapturer::builder()
                .buffer_key(Buffers::CapturedScreenBuffer)
                .monitor_id(monitor_id)
                .build(),
        )
}

fn saver(height: u32, width: u32) -> Component<RecorderData> {
    Component::new().append(
        PNGBufferSaver::builder()
            .buffer_key(Buffers::CapturedScreenBuffer)
            .path("./screenshots/")
            .height(height)
            .width(width)
            .build(),
    )
}

let pipeline = Pipeline::<RecorderData>::new()
    .link(capturer(monitor_id, height, width))
    .link(saver(height, width));

for handle in pipeline.run() {
    handle.await.unwrap();
}
Key takeaways
- Ticker at the head of a component paces frame generation.
- BufferAllocator is a processor that allocates a new BytesMut into the DTO each tick, which is useful when the buffer size is known ahead of time.
- Custom processors (XCapCapturer, PNGBufferSaver) implement FrameProcessor and use PullableFrameProperties to exchange buffers with the DTO.
Platform-dependant screen snapper — Handling platform-specific backends
Same capture-and-save pipeline as the screen snapper, but uses Cargo feature flags to select between X11/XCap and Wayland/libwayshot capture backends at compile time.
Full source: GitHub — platform-dependant-screen-snapper
Architecture
Identical runtime layout — two components in one pipeline. The difference is in how the capturer is selected:
Component 1: Capture                          Component 2: Save
┌──────────────────────────────────┐          ┌─────────────────┐
│ Ticker → BufferAllocator →       │ channel  │ PNGBufferSaver  │
│ [xcap OR wayshot] Capturer       │─────────▶│                 │
└──────────────────────────────────┘          └─────────────────┘
Feature flags in Cargo.toml
[features]
default = ["wayshot"]
xcap = ["dep:xcap"]
wayshot = ["dep:libwayshot"]
The lib.rs uses compile-time guards to prevent misconfiguration:
#[cfg(not(any(feature = "xcap", feature = "wayshot")))]
compile_error!("No snapper backend enabled");

#[cfg(all(feature = "xcap", feature = "wayshot"))]
compile_error!("Compiling with both wayshot and xcap is not currently supported.");

#[cfg(feature = "wayshot")]
pub mod wayshot_capturer;

#[cfg(feature = "xcap")]
pub mod xcap_capturer;
Backend selection with conditional compilation
A capture.rs module provides the same interface regardless of backend:
#[cfg(feature = "xcap")]
pub mod xcap {
    pub fn fetch_screen_resolution() -> (u32, u32) {
        xcap_utils::display_size(MONITOR_ID)
    }

    pub fn capturer_processor() -> XCapCapturer<Buffers> {
        XCapCapturer::builder()
            .buffer_key(Buffers::CapturedScreenBuffer)
            .monitor_id(MONITOR_ID)
            .build()
    }
}

#[cfg(feature = "xcap")]
pub use xcap::*;

#[cfg(feature = "wayshot")]
pub mod libwayshot {
    pub fn fetch_screen_resolution() -> (u32, u32) {
        wayshot_utils::display_size()
    }

    pub fn capturer_processor() -> WayshotCapturer<Buffers> {
        WayshotCapturer::builder()
            .buffer_key(Buffers::CapturedScreenBuffer)
            .build()
    }
}

#[cfg(feature = "wayshot")]
pub use libwayshot::*;
The main.rs calls fetch_screen_resolution() and capturer_processor() without knowing which backend is active.
Building
# Default (wayshot)
cargo run --example autosnapper
# X11/XCap backend
cargo run --example autosnapper --no-default-features --features xcap
Key takeaways
- Cargo feature flags are the idiomatic Rust way to handle platform-specific dependencies. Remotia's FrameProcessor trait makes it easy: both backends implement the same trait, so the pipeline code is backend-agnostic.
- compile_error! guards prevent invalid feature combinations at compile time.
- The DTO and save component are identical across backends; only the capture processor changes.
Multi-pipeline with switches — Connecting pipelines
This example demonstrates the core differentiator of remotia: multiple pipelines connected by switches. A main processing pipeline routes frames to different downstream pipelines based on conditions.
Full source: GitHub — examples
Architecture
Three pipelines connected by switches:
Pipeline "main"
┌───────────────────────────────────────────┐
│ Component:                                │
│ Ticker → Closure(print) → OnErrorSwitch ──┼──▶ Pipeline "errors" (if error)
│                         → CloneSwitch ────┼──▶ Pipeline "profiling" (clone)
└───────────────────────────────────────────┘
- Main pipeline: generates frames, prints their state, switches errored frames to the error pipeline, and clones the remaining frames to the profiling pipeline.
- Profiling pipeline: receives a clone of every frame that passes the error switch and logs statistics.
- Error pipeline: receives only frames that carry an error.
The DTO
#[derive(Debug, Default, Clone)]
struct FrameDto {
    frame_id: u64,
    timestamp: Option<u128>,
    error: Option<String>,
}

impl FrameProperties<String, u64> for FrameDto {
    fn set(&mut self, key: String, value: u64) {
        if key == "frame_id" {
            self.frame_id = value;
        }
    }

    fn get(&self, key: &String) -> Option<u64> {
        if key == "frame_id" {
            Some(self.frame_id)
        } else {
            None
        }
    }
}

impl FrameError<String> for FrameDto {
    fn report_error(&mut self, error: String) {
        self.error = Some(error);
    }

    fn get_error(&self) -> Option<String> {
        self.error.clone()
    }
}
Note that Clone is required because CloneSwitch clones the DTO.
Building the pipelines
let mut registry = PipelineRegistry::<FrameDto, &str>::new();

// 1. Register profiling pipeline (receives a clone of every frame that reaches clone_switch)
let profiling_pipeline = Pipeline::new()
    .link(Component::singleton(Closure::new(|dto: FrameDto| {
        log::info!("[profiling] frame_id={}", dto.frame_id);
        None
    })))
    .tag("profiling");
registry.register("profiling", profiling_pipeline);

// 2. Register error pipeline (receives only errored frames)
let error_pipeline = Pipeline::new()
    .link(Component::singleton(Closure::new(|dto: FrameDto| {
        log::warn!("[errors] frame_id={} error={:?}", dto.frame_id, dto.error);
        None
    })))
    .tag("errors");
registry.register("errors", error_pipeline);

// 3. Build switches that target the above pipelines
let clone_switch = CloneSwitch::new(registry.get_mut(&"profiling"));
let error_switch = OnErrorSwitch::new(registry.get_mut(&"errors"));

// 4. Build main pipeline
let main_pipeline = Pipeline::new()
    .link(
        Component::new()
            .append(Ticker::new(33)) // ~30 FPS
            .append(Closure::new(|mut dto: FrameDto| {
                dto.frame_id += 1;
                log::info!("[main] frame_id={}", dto.frame_id);
                Some(dto)
            }))
            .append(error_switch) // redirect errored frames
            .append(clone_switch), // clone to profiling pipeline
    )
    .tag("main");
registry.register("main", main_pipeline);

registry.run().await;
How switches compose
The processor order matters:
- error_switch checks if the frame has an error. If yes, it sends the frame to the error pipeline and returns None, so the frame never reaches clone_switch. If no error, it returns Some(frame) and processing continues.
- clone_switch clones the frame, sends the clone to profiling, and returns Some(original). The original then exits the component and flows to the next component in the main pipeline (if any).
By placing error_switch before clone_switch, errored frames are not cloned to profiling. Reverse the order if you want profiling to see every frame including errored ones.
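The effect of the ordering can be checked with a small std-only simulation. Here `error_switch` and `clone_switch` are plain functions that record frame ids in a `Sinks` struct instead of feeding real pipelines; all names in the sketch are hypothetical.

```rust
#[derive(Debug, Clone, PartialEq)]
struct Frame {
    id: u64,
    error: Option<String>,
}

// Records which frame ids reached each side pipeline.
#[derive(Default)]
struct Sinks {
    profiling: Vec<u64>,
    errors: Vec<u64>,
}

// Diverts errored frames; passes the rest through.
fn error_switch(frame: Frame, sinks: &mut Sinks) -> Option<Frame> {
    if frame.error.is_some() {
        sinks.errors.push(frame.id);
        None
    } else {
        Some(frame)
    }
}

// Sends a copy to profiling and forwards the original.
fn clone_switch(frame: Frame, sinks: &mut Sinks) -> Option<Frame> {
    let copy = frame.clone();
    sinks.profiling.push(copy.id);
    Some(frame)
}

type Step = fn(Frame, &mut Sinks) -> Option<Frame>;

// Runs every frame through the steps in order, stopping a frame at the first None.
fn run(order: &[Step], frames: Vec<Frame>) -> Sinks {
    let mut sinks = Sinks::default();
    for frame in frames {
        let mut current = Some(frame);
        for step in order {
            current = match current {
                Some(frame) => step(frame, &mut sinks),
                None => break,
            };
        }
    }
    sinks
}
```

Running a clean frame and an errored frame through both orders shows the difference: with the error switch first, profiling sees only the clean frame; with the clone switch first, profiling sees both, and the error pipeline receives the errored frame either way.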
Key takeaways
- CloneSwitch is non-destructive: it forwards the original and sends a copy elsewhere. Use it for side-channels like profiling or logging.
- OnErrorSwitch is conditional: it only redirects when the DTO carries a matching error. Otherwise the frame passes through.
- Switch is unconditional: it always redirects and returns None. Use it when the main pipeline should not continue for that frame.
- Processor ordering within a component determines which switches fire and in what sequence. Plan the order carefully when combining multiple switches.