Introduction

Remotia is an open-source media processing and streaming framework written in Rust. Although it was designed with remote rendering and cloud gaming in mind, its components are versatile and can be used in a variety of contexts. Bindings and plug-and-play modules are available for well-established libraries and protocols such as ffmpeg and SRT, and more are in development.

Remotia is actively used in research projects at the IPLab of the University of Catania.

Bibliography

If you use remotia in your work, please cite the following paper:

@article{remotia,
  title={An open source framework for video streaming in cloud gaming},
  author={Catania, Lorenzo and Giudice, Oliver and Battiato, Sebastiano and Stanco, Filippo and Allegra, Dario},
  journal={Multimedia Tools and Applications},
  pages={1--24},
  year={2025},
  publisher={Springer}
}

Quickstart

Add the dependency

[dependencies.remotia]
version = "0.1.0" # Check crates.io for the latest version
default-features = false
features = []     # Enable only what you need (see Crate map)

Note: The feature flag is named profilation for historical reasons, but it enables profiling utilities.

A minimal pipeline

This example creates a two-pipeline architecture: a main pipeline that ticks, increments a counter, and prints it, and an overflow pipeline that receives the frames a switch redirects to it.

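use remotia::{
    traits::{FrameError, FrameProcessor},
    // PipelineRegistry is imported from pipeline::registry, as in the
    // Pipelines & lifecycle page; the component path is assumed by analogy.
    pipeline::{component::Component, registry::PipelineRegistry, Pipeline},
    processors::{
        ticker::Ticker,
        switch::Switch,
        functional::Closure,
    },
};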
use async_trait::async_trait;

// ── 1. Define the DTO ────────────────────────────────────────────────

#[derive(Debug, Default)]
struct MyDto {
    counter: u64,
    error: Option<String>,
}

impl FrameError<String> for MyDto {
    fn report_error(&mut self, error: String) { self.error = Some(error); }
    fn get_error(&self) -> Option<String> { self.error.clone() }
}

// ── 2. Define a custom processor ─────────────────────────────────────

struct Incrementer;

#[async_trait]
impl FrameProcessor<MyDto> for Incrementer {
    async fn process(&mut self, mut dto: MyDto) -> Option<MyDto> {
        dto.counter += 1;
        Some(dto)
    }
}

// ── 3. Build and run the pipelines ───────────────────────────────────

#[tokio::main]
async fn main() {
    let mut registry = PipelineRegistry::<MyDto, &str>::new();

    // Register the overflow pipeline first so we can get a feeder for the switch
    let overflow_pipeline = Pipeline::new()
        .link(
            Component::singleton(
                Closure::new(|dto: MyDto| {
                    println!("overflow: counter = {}", dto.counter);
                    None // consumed — pipeline stops processing this frame
                })
            )
        )
        .tag("overflow");
    registry.register("overflow", overflow_pipeline);

    // Build a switch that redirects to the overflow pipeline
    let overflow_switch = {
        let overflow_pipe = registry.get_mut(&"overflow");
        Switch::new(overflow_pipe)
    };

    // Build the main pipeline
    let main_pipeline = Pipeline::new()
        .link(
            Component::new()
                .append(Ticker::new(100))     // tick every 100ms
                .append(Incrementer)           // increment counter
                .append(Closure::new(|dto: MyDto| {  // print & check
                    println!("main: counter = {}", dto.counter);
                    if dto.counter > 10 {
                        None // frame is consumed here and never reaches the switch
                    } else {
                        Some(dto)
                    }
                }))
                .append(overflow_switch)       // unconditional: redirects every frame that reaches it
                .tag("main-step")
        )
        .tag("main");

    registry.register("main", main_pipeline);

    // Run all pipelines (blocks until all tasks complete)
    registry.run().await;
}

What happens at runtime

  1. The Ticker paces the loop at ~10 Hz.
  2. Incrementer bumps counter by 1 each tick.
  3. The closure prints the counter. If counter > 10, it returns None.
  4. Returning None means the DTO does not reach the Switch. If you want the switch to fire instead of the closure consuming the frame, have the closure always return Some(dto) and place the switch after it. A typical pattern is:

Component::new()
    .append(Ticker::new(100))
    .append(Incrementer)
    .append(Closure::new(|dto: MyDto| {
        println!("main: counter = {}", dto.counter);
        Some(dto) // always pass forward
    }))
    .append(overflow_switch) // the switch always redirects; the frame never continues past it

To conditionally redirect, use OnErrorSwitch or implement the branching logic inside a single closure.
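The "branching logic inside a single closure" option can be sketched without the framework. The block below is a std-only model, not remotia code: the MyDto struct, the route_overflow function, and the use of a std::sync::mpsc sender as the "other pipeline" are all assumptions made for illustration. It shows the shape of the decision: send the frame elsewhere and return None, or return Some to let it continue.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for the quickstart DTO.
#[derive(Debug, Default, PartialEq)]
pub struct MyDto {
    pub counter: u64,
}

/// Branches inside a single processor: frames above `threshold` are sent to
/// the overflow channel and consumed (None); all others continue (Some).
pub fn route_overflow(dto: MyDto, threshold: u64, overflow: &Sender<MyDto>) -> Option<MyDto> {
    if dto.counter > threshold {
        // A send error means the receiving side is gone; the frame is dropped.
        let _ = overflow.send(dto);
        None
    } else {
        Some(dto)
    }
}
```

In a real pipeline the closure would capture whatever handle the destination pipeline exposes instead of a plain channel sender.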

Next steps

  • Read the Key concepts for the full trait and API reference.
  • Browse the Processors catalog to find built-in processors for your pipeline.
  • See Pipelines & lifecycle for shutdown, feedable pipelines, and the registry.
  • Check the Crate map for feature flags and optional crates.

Key concepts

Remotia is built around four abstractions that compose into streaming pipelines. This page defines each one and shows the Rust APIs you implement and use.

Pipeline
 ┌───────────────────────────────┐      ┌───────────────────────────────┐
 │ Component A                   │      │ Component B                   │
 │ ┌───────────┐   ┌───────────┐ │  ch  │ ┌───────────┐   ┌───────────┐ │
 │ │Processor 1│ → │Processor 2│ │─────▶│ │Processor 3│ → │Processor 4│ │
 │ └───────────┘   └───────────┘ │      │ └───────────┘   └───────────┘ │
 └───────────────────────────────┘      └───────────────────────────────┘

  • A DTO (Data Transfer Object) carries frame data and metadata through the pipeline.
  • A processor performs one atomic operation on a DTO.
  • A component groups processors into a single async task, connected to neighbors by channels.
  • A pipeline links components into a directed chain; multiple pipelines are connected by switches.

Data Transfer Object (DTO)

The DTO is the data structure that flows through the pipeline. It typically holds frame buffers and per-frame statistics. You define your own DTO type and implement the traits that the processors in your pipeline require.

Core trait: FrameProcessor input

Every processor operates on a generic type F. Your DTO is that type F. The framework does not mandate a specific struct — you define one adapted to your use case.

Trait: FrameProperties<K, V>

Used by switches that read or write routing keys on the DTO (e.g. PoolingSwitch, DepoolingSwitch).

pub trait FrameProperties<K, V> {
    fn set(&mut self, key: K, value: V);
    fn get(&self, key: &K) -> Option<V>;
}

Trait: FrameError<E>

Used by OnErrorSwitch to inspect error state on the DTO.

pub trait FrameError<E> {
    fn report_error(&mut self, error: E);
    fn get_error(&self) -> Option<E>;
}

Other available traits

Trait                          | Purpose                                     | Used by
-------------------------------+---------------------------------------------+---------------------
PullableFrameProperties<K, V>  | Push/pull semantics for properties          | Advanced routing
OptionalFrameData<D>           | Access optional embedded data               | Buffer utilities
BorrowFrameProperties<K, V>    | Get a reference to a property value         | Read-only inspection
BorrowMutFrameProperties<K, V> | Get a mutable reference to a property value | In-place mutation

Minimal DTO example

use remotia::traits::{FrameProperties, FrameError};

#[derive(Debug, Default)]
struct MyDto {
    buffer: Vec<u8>,
    frame_id: u64,
    error: Option<String>,
}

impl FrameProperties<String, u64> for MyDto {
    fn set(&mut self, key: String, value: u64) {
        if key == "frame_id" { self.frame_id = value; }
    }
    fn get(&self, key: &String) -> Option<u64> {
        if key == "frame_id" { Some(self.frame_id) } else { None }
    }
}

impl FrameError<String> for MyDto {
    fn report_error(&mut self, error: String) { self.error = Some(error); }
    fn get_error(&self) -> Option<String> { self.error.clone() }
}

Custom processor modules may define additional traits. Your DTO must implement them if you use those modules in your pipeline.


Processors

A processor is a single unit of work applied to a DTO. The core trait is:

#[async_trait]
pub trait FrameProcessor<F> {
    async fn process(&mut self, frame_data: F) -> Option<F>;
}

Return contract:

  • Some(dto) — the DTO is passed to the next processor in the component.
  • None — the DTO is consumed. The pipeline interprets this as "this frame is done here" — it may have been redirected to another pipeline (by a switch), stored, or dropped.
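The return contract can be illustrated with a small synchronous model. This is a sketch, not the framework's implementation: processors are modelled as boxed closures rather than async FrameProcessor implementors, and the Processor alias and run_chain function are names invented here.

```rust
/// A processor modelled as a boxed closure (synchronous for brevity).
pub type Processor<F> = Box<dyn FnMut(F) -> Option<F>>;

/// Runs `dto` through `processors` in order and stops at the first `None`.
pub fn run_chain<F>(processors: &mut [Processor<F>], dto: F) -> Option<F> {
    let mut current = dto;
    for processor in processors.iter_mut() {
        // `?` returns early with None as soon as a processor consumes the DTO.
        current = processor(current)?;
    }
    Some(current)
}
```

With a chain of "add one", "drop if above 10", and "double", an input of 1 comes out as 4, while an input of 10 is consumed by the second processor and the third never runs.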

Processors have full ownership of the DTO while processing it. This avoids borrowing conflicts and makes the data flow explicit.

See the Processors page for the catalog of built-in processor types.


Components

A component groups an ordered sequence of processors into a single async task (a Tokio task). Each component:

  1. Receives a DTO from an input channel (or allocates one itself).
  2. Passes it through its processors sequentially.
  3. Sends the resulting DTO (if any) to the next component via an output channel.

Components are the unit of concurrency. By grouping processors into different components, you control which work shares a task and which runs in parallel. The framework uses unbounded mpsc channels to connect adjacent components within a pipeline.
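To make the concurrency model concrete, here is a std-only analogy in which OS threads stand in for Tokio tasks and std::sync::mpsc::channel (which is also unbounded) stands in for the framework's channel. The function name and the arithmetic performed by each stage are assumptions chosen for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs two "components" on separate threads, joined by an unbounded channel.
/// Component A adds 1 to each value; component B doubles what it receives.
pub fn run_two_components(inputs: Vec<u64>) -> Vec<u64> {
    let (tx, rx) = mpsc::channel::<u64>();

    let component_a = thread::spawn(move || {
        for value in inputs {
            if tx.send(value + 1).is_err() {
                break; // the downstream component has exited
            }
        }
        // `tx` is dropped here, which closes the channel.
    });

    // Component B runs until the channel is closed and drained.
    let component_b = thread::spawn(move || rx.iter().map(|v| v * 2).collect::<Vec<u64>>());

    component_a.join().expect("component A panicked");
    component_b.join().expect("component B panicked")
}
```

Work placed in the same stage runs sequentially; work split across the two stages can overlap, which is the trade-off the component boundary controls.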

Builder API

Component::new()
    .append(processor_a)
    .append(processor_b)
    .tag("encoder")

Or, for a single-processor component:

Component::singleton(processor)

Pipelines

A pipeline is a chain of components connected by channels. Components within a pipeline run concurrently (each is a separate Tokio task), while processors within a component run sequentially.

Builder API

Pipeline::new()
    .link(component_a)
    .link(component_b)
    .tag("main")
    .run()

Or, for a single-component pipeline:

Pipeline::singleton(component)

Calling .run() automatically creates the channels between adjacent components and spawns each component as a Tokio task. It returns Vec<JoinHandle<()>>.

Feedable pipelines

Mark a pipeline as .feedable() to allow external code to inject DTOs into its head:

let mut pipeline = Pipeline::new().link(component).feedable();
let feeder = pipeline.get_feeder();
feeder.feed(my_dto);

Multi-pipeline architectures

Complex systems use multiple pipelines connected by switches. For example, a main streaming pipeline and a separate error-handling pipeline, linked by an OnErrorSwitch. See the Pipelines & Lifecycle page for the full API and lifecycle details.


Switches

Switches are processors that move DTOs between pipelines. Instead of returning Some(dto) to continue in the current pipeline, they send the DTO to a different pipeline and return None.

The framework provides several switch types:

Switch          | Behavior
----------------+---------------------------------------------------------------------------------
Switch          | Unconditionally redirects the DTO to another pipeline
CloneSwitch     | Clones the DTO, sends the clone to another pipeline, passes the original forward
OnErrorSwitch   | Redirects the DTO to another pipeline if it carries a matching error
PoolingSwitch   | Picks a random destination from a pool and stamps the DTO with the pool key
DepoolingSwitch | Routes the DTO to the destination matching its pool key

See the Processors page for constructor signatures and usage details.

Processors

Processors implement the FrameProcessor<F> trait and perform a single atomic operation on the DTO. This page catalogs every built-in processor in remotia-core.

#[async_trait]
pub trait FrameProcessor<F> {
    async fn process(&mut self, frame_data: F) -> Option<F>;
}

Routing switches

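Switches move DTOs between pipelines. A switch returns None when it has handed the DTO to another pipeline, which ends processing of that frame in the current pipeline, or Some(dto) when the original frame continues (as CloneSwitch always does, and as OnErrorSwitch does when no error matches).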

Switch

Redirects the DTO into a different pipeline. Returns None — the current pipeline stops processing this frame.

use remotia::processors::switch::Switch;

let switch = Switch::new(&mut destination_pipeline);

Use when: branching — e.g. sending frames down an error or debug pipeline instead of the main one.


CloneSwitch

Clones the DTO, sends the clone to another pipeline, and passes the original forward. The DTO type must implement Clone.

use remotia::processors::clone_switch::CloneSwitch;

let clone_switch = CloneSwitch::new(&mut profiling_pipeline);

Use when: parallel side-channels — e.g. a profiling pipeline that receives every frame while the main pipeline continues uninterrupted.


OnErrorSwitch

Redirects the DTO to a destination pipeline if it carries a matching error. If no error is present (or the error does not match), the frame passes through with Some(frame). The DTO must implement FrameError<E>.

use remotia::processors::error_switch::OnErrorSwitch;

let error_switch = OnErrorSwitch::new::<MyDto, MyError>(&mut error_pipeline)
    .detect(MyError::Timeout)
    .detect(MyError::ConnectionError);

Use when: conditional error routing — only certain error variants are diverted.
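The matching behaviour described above can be modelled in plain Rust. The sketch below is not the OnErrorSwitch implementation: ErrorSwitchModel, Frame, and the MyError variants are hypothetical, the FrameError trait is a local copy of the definition from Key concepts, and a std channel sender stands in for the destination pipeline.

```rust
use std::sync::mpsc::Sender;

/// Local copy of the FrameError trait shown in Key concepts.
pub trait FrameError<E> {
    fn report_error(&mut self, error: E);
    fn get_error(&self) -> Option<E>;
}

/// Hypothetical error type and DTO used only by this sketch.
#[derive(Debug, Clone, PartialEq)]
pub enum MyError { Timeout, ConnectionError, Decode }

#[derive(Debug, Default, PartialEq)]
pub struct Frame { pub id: u64, pub error: Option<MyError> }

impl FrameError<MyError> for Frame {
    fn report_error(&mut self, error: MyError) { self.error = Some(error); }
    fn get_error(&self) -> Option<MyError> { self.error.clone() }
}

/// Hypothetical model of a conditional error switch; not the remotia type.
pub struct ErrorSwitchModel<F, E> {
    destination: Sender<F>,
    detected: Vec<E>,
}

impl<F: FrameError<E>, E: PartialEq> ErrorSwitchModel<F, E> {
    pub fn new(destination: Sender<F>) -> Self {
        Self { destination, detected: Vec::new() }
    }

    /// Registers an error value that should trigger redirection.
    pub fn detect(mut self, error: E) -> Self {
        self.detected.push(error);
        self
    }

    /// Redirects frames carrying a registered error; passes the rest through.
    pub fn process(&mut self, frame: F) -> Option<F> {
        match frame.get_error() {
            Some(error) if self.detected.contains(&error) => {
                let _ = self.destination.send(frame);
                None
            }
            _ => Some(frame),
        }
    }
}
```

A model built with only detect(MyError::Timeout) redirects timed-out frames and lets a frame carrying MyError::Decode pass through unchanged.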


PoolingSwitch

Picks a random destination from a registered pool, stamps the DTO with the chosen pool key (via FrameProperties), and sends it. Returns None. The DTO must implement FrameProperties<P, K>.

use remotia::processors::pool_switch::PoolingSwitch;

let pool_switch = PoolingSwitch::<MyDto, &str, usize>::new("worker_id")
    .entry(0, &mut worker_pipeline_0)
    .entry(1, &mut worker_pipeline_1)
    .entry(2, &mut worker_pipeline_2);

Use when: fan-out to a pool of workers — e.g. distributing encoding across N encoder pipelines.


DepoolingSwitch

Routes the DTO to the destination matching its pool key (read via FrameProperties). Returns None. The DTO must implement FrameProperties<P, K>.

use remotia::processors::pool_switch::DepoolingSwitch;

let depool_switch = DepoolingSwitch::<MyDto, &str, usize>::new("worker_id")
    .entry(0, &mut merger_pipeline_0)
    .entry(1, &mut merger_pipeline_1);

Use when: fan-in — merging results from a worker pool back into per-worker downstream pipelines.
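Key-based routing of this kind can be sketched with a map from pool key to destination. The block below is a std-only model under stated assumptions: Job and DepoolModel are hypothetical names, the key is stored in a plain field rather than read through FrameProperties, and returning the job on an unknown key is a choice made for this sketch, not documented DepoolingSwitch behaviour.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;

/// Hypothetical DTO that carries the pool key stamped by an upstream stage.
#[derive(Debug, PartialEq)]
pub struct Job {
    pub worker_id: Option<usize>,
    pub payload: u64,
}

/// Hypothetical model of key-based routing; not the remotia type.
#[derive(Default)]
pub struct DepoolModel {
    routes: HashMap<usize, Sender<Job>>,
}

impl DepoolModel {
    /// Registers a destination for a pool key.
    pub fn entry(mut self, key: usize, destination: Sender<Job>) -> Self {
        self.routes.insert(key, destination);
        self
    }

    /// Sends the job to the route matching its key. A job with no key, or
    /// with an unregistered key, is handed back so the caller can decide.
    pub fn route(&self, job: Job) -> Result<(), Job> {
        let Some(sender) = job.worker_id.and_then(|key| self.routes.get(&key)) else {
            return Err(job);
        };
        // If the destination is closed, recover the job from the send error.
        sender.send(job).map_err(|failed| failed.0)
    }
}
```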


Timing

Ticker

Waits for the configured interval, then passes the DTO forward unchanged.

use remotia::processors::ticker::Ticker;

let ticker = Ticker::new(16); // 16 ms ≈ 60 FPS

Use when: frame-rate pacing at the head of a capture pipeline, or throttling any component.
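The interval values used in this book (16 ms for about 60 FPS, 33 ms for about 30 FPS, 100 ms for about 10 Hz) follow from dividing 1000 ms by the target rate and rounding down. The helper names below are hypothetical and not part of remotia; they only spell out the arithmetic.

```rust
/// Interval in whole milliseconds for a target frame rate (rounded down).
pub fn interval_ms_for_fps(fps: u64) -> u64 {
    assert!(fps > 0, "fps must be positive");
    1000 / fps
}

/// Frame rate implied by a given interval in milliseconds.
pub fn fps_for_interval_ms(interval_ms: u64) -> f64 {
    assert!(interval_ms > 0, "interval must be positive");
    1000.0 / interval_ms as f64
}
```

Because of the rounding, a 16 ms interval corresponds to 62.5 ticks per second rather than exactly 60.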


Inline processors

These wrap bare functions, closures, or async functions as FrameProcessor implementors. Use them for quick logic without defining a dedicated struct.

Function

Wraps a function pointer.

use remotia::processors::functional::Function;

let proc = Function::new(|mut dto: MyDto| -> Option<MyDto> {
    dto.frame_id += 1;
    Some(dto)
});

Closure

Wraps a capturing closure.

use remotia::processors::functional::Closure;

let counter = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let counter_clone = counter.clone();

let proc = Closure::new(move |dto: MyDto| -> Option<MyDto> {
    counter_clone.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    Some(dto)
});

Convenience method on Component:

component.closure(|dto: MyDto| Some(dto))

AsyncFunction

Wraps an async function pointer. Use the async_func! macro to create the pinned future.

use remotia::processors::async_functional::AsyncFunction;
use remotia::async_func;

async fn fetch_frame(dto: MyDto) -> Option<MyDto> {
    // async I/O ...
    Some(dto)
}

let proc = AsyncFunction::new(|dto| async_func!(async move {
    fetch_frame(dto).await
}));

Containers

Sequential

Runs a sequence of processors in order inside a single component. Each processor receives the output of the previous one. If any processor returns None, the sequence stops.

use remotia::processors::containers::sequential::Sequential;

let seq = Sequential::new()
    .append(processor_a)
    .append(processor_b)
    .append(processor_c);

Use when: grouping processors that must run in the same async task — e.g. a tick-then-capture pattern, or a sequence of buffer operations that should not be split across component boundaries.

Pipelines & lifecycle

This page covers the full Pipeline, PipelineFeeder, PipelineRegistry, and PipelineHandle APIs, plus the shutdown/drain lifecycle.


Pipeline

A Pipeline<F> is an ordered chain of Component<F> instances. When you call .run(), the framework:

  1. Binds — creates unbounded mpsc channels between each pair of adjacent components.
  2. Spawns — launches each component as a separate Tokio task.
  3. Returns Vec<JoinHandle<()>> you can await.

Builder API

use remotia::pipeline::Pipeline;

let handles = Pipeline::new()
    .link(capture_component)
    .link(encoder_component)
    .link(transmission_component)
    .tag("main")
    .run();

Method                         | Description
-------------------------------+--------------------------------------------------------------------
Pipeline::new()                | Create an empty pipeline
Pipeline::singleton(component) | Create a one-component pipeline
.link(component)               | Append a component
.tag(name)                     | Set a tag for log messages
.feedable()                    | Mark the pipeline to accept external DTOs (call before .run())
.get_feeder()                  | Get a PipelineFeeder for injecting DTOs (pipeline must be feedable)
.get_handle()                  | Get a PipelineHandle for requesting shutdown
.shutdown_signal()             | Get the Arc<AtomicBool> shutdown signal
.run()                         | Bind channels, spawn tasks, return join handles

PipelineFeeder

A PipelineFeeder<F> lets you inject DTOs into the head of a feedable pipeline from external code.

let mut pipeline = Pipeline::new()
    .link(component)
    .feedable();

let feeder = pipeline.get_feeder();

// Later, from any task:
feeder.feed(my_dto); // panics if the channel is closed

PipelineFeeder implements Clone, so you can share it across tasks.


PipelineHandle

A PipelineHandle lets you request graceful shutdown of a pipeline.

let mut pipeline = Pipeline::new().link(component).tag("main");
let handle = pipeline.get_handle();
let handles = pipeline.run();

// From another task:
handle.request_shutdown();

When request_shutdown() is called, the shared AtomicBool is set. Every component in the pipeline checks this signal each iteration and exits when it becomes true.

PipelineHandle is Clone — multiple callers can hold a handle to the same pipeline.


PipelineRegistry

PipelineRegistry<F, K> manages multiple named pipelines and runs them together. This is the standard way to launch a multi-pipeline architecture.

Builder API

use remotia::pipeline::registry::PipelineRegistry;

let mut registry = PipelineRegistry::<MyDto, &str>::new();

registry.register("main", main_pipeline);
registry.register("errors", error_pipeline);

registry.run().await; // runs all pipelines, blocks until all finish

Method                     | Description
---------------------------+---------------------------------------------------------------
PipelineRegistry::new()    | Create an empty registry
.register(id, pipeline)    | Insert a pipeline with a key
.register_empty(id)        | Insert an empty pipeline
.get(&id) / .get_mut(&id)  | Access a pipeline by key
.lazy_handle(id)           | Get a PipelineHandle for a pipeline (works even before .run())
.run()                     | Bind and spawn all pipelines, await all tasks

Connecting pipelines with switches

When using a registry, you typically get feeders from destination pipelines before calling .run():

let mut registry = PipelineRegistry::<MyDto, &str>::new();

let error_pipeline = Pipeline::new()
    .link(error_component)
    .tag("errors");

registry.register("errors", error_pipeline);

let error_switch = {
    let error_pipe = registry.get_mut(&"errors");
    OnErrorSwitch::new(error_pipe).detect(MyError::Timeout)
};

let main_pipeline = Pipeline::new()
    .link(capture_component.append(ticker).append(error_switch))
    .tag("main");

registry.register("main", main_pipeline);
registry.run().await;

Lifecycle: shutdown and drain

Understanding the component lifecycle is important for graceful shutdown.

Normal operation

Component receives DTO from input channel
  → passes through processors
  → sends result to output channel
  → repeats

Shutdown signal

When PipelineHandle::request_shutdown() is called, the shared AtomicBool is set. On the next iteration, each component observes the signal and breaks out of its loop.
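The shared-flag mechanism can be sketched with the standard library alone. This is a model under stated assumptions: ShutdownModel and run_component are hypothetical names, a thread stands in for a component task, and the memory ordering used here is a conservative choice for the sketch, not a statement about what remotia uses.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Cloneable handle around a shared flag (a model, not PipelineHandle).
#[derive(Clone, Default)]
pub struct ShutdownModel {
    flag: Arc<AtomicBool>,
}

impl ShutdownModel {
    /// Sets the shared flag; every clone observes the change.
    pub fn request_shutdown(&self) {
        self.flag.store(true, Ordering::SeqCst);
    }

    pub fn is_shutdown(&self) -> bool {
        self.flag.load(Ordering::SeqCst)
    }
}

/// Loops until the flag is set, checking it once per iteration, and
/// returns how many iterations ran.
pub fn run_component(signal: ShutdownModel) -> u64 {
    let mut iterations = 0;
    while !signal.is_shutdown() {
        iterations += 1;
        thread::sleep(Duration::from_millis(1)); // stand-in for real work
    }
    iterations
}
```

Because the flag is only checked at the top of the loop, an iteration that is already running finishes before the component exits.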

Drain mode

If a component's input channel is closed (e.g. the upstream component has exited), the component enters drain mode:

  1. It stops reading from the channel (a read would always return None).
  2. It continues the loop, yielding F::default() as the DTO, and running processors.
  3. When a processor returns None (frame consumed), the component checks if it should exit.
  4. If the input channel was closed and the last processor returned None, the component exits.

This ensures that in-flight DTOs in downstream components can finish processing before shutdown.
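One possible reading of the drain steps above, written as a synchronous std-only loop. This is an interpretation for illustration, not the framework's code: run_with_drain is a hypothetical name, a single closure stands in for the component's processors, and std::sync::mpsc replaces the async channel.

```rust
use std::sync::mpsc::Receiver;

/// Receives DTOs until the channel closes, then keeps running with
/// `F::default()` until the processor consumes a frame (returns None).
pub fn run_with_drain<F, P>(input: Receiver<F>, mut process: P) -> Vec<F>
where
    F: Default,
    P: FnMut(F) -> Option<F>,
{
    let mut outputs = Vec::new();
    let mut closed = false;
    loop {
        let dto = if closed {
            F::default() // drain mode: the channel is no longer read
        } else {
            match input.recv() {
                Ok(dto) => dto,
                Err(_) => {
                    closed = true; // all senders are gone
                    F::default()
                }
            }
        };
        match process(dto) {
            Some(out) => outputs.push(out),
            None if closed => break, // input closed and frame consumed: exit
            None => {}               // frame consumed, input still open
        }
    }
    outputs
}
```

Under this reading, a processor chain that never returns None for a default DTO would keep the loop running after the input closes, so at least one processor needs to consume empty frames.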

Component without an input channel

A component with no receiver (e.g. the head of a non-feedable pipeline that generates its own DTOs) relies solely on the shutdown signal. It checks is_shutdown() each iteration and exits when the signal fires.


Channel binding details

When .run() is called on a pipeline that has not been bound, the framework automatically creates an unbounded mpsc channel between each pair of adjacent components:

Component[i]  ──sender──▶  channel  ──receiver──▶  Component[i+1]

The last component has no sender; the first component has no receiver unless the pipeline is feedable (in which case the feeder's sender is wired to the first component's receiver).
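The wiring rule can be sketched as a function that hands each component its two channel ends. The Ports struct and bind function are hypothetical, and std::sync::mpsc stands in for the framework's channel type; only the topology is meant to match the description above.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// The channel ends owned by one component: what it reads from and writes to.
pub struct Ports<F> {
    pub receiver: Option<Receiver<F>>,
    pub sender: Option<Sender<F>>,
}

/// Creates ports for `count` components, wiring component i to i + 1.
/// When `feedable` is true, also returns a sender wired to the first component.
pub fn bind<F>(count: usize, feedable: bool) -> (Vec<Ports<F>>, Option<Sender<F>>) {
    let mut ports: Vec<Ports<F>> = (0..count)
        .map(|_| Ports { receiver: None, sender: None })
        .collect();

    // One channel per pair of adjacent components.
    for i in 1..count {
        let (tx, rx) = channel();
        ports[i - 1].sender = Some(tx);
        ports[i].receiver = Some(rx);
    }

    // A feedable pipeline gets an extra channel into its first component.
    let feeder = if feedable && count > 0 {
        let (tx, rx) = channel();
        ports[0].receiver = Some(rx);
        Some(tx)
    } else {
        None
    };

    (ports, feeder)
}
```

For three components and no feeder this yields two channels: the first component has only a sender, the last only a receiver.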

Crate map

Remotia is organized as a workspace of crates. The remotia umbrella crate re-exports them behind feature flags so you only compile what you need.

Core crate

Feature flag     | Crate        | Purpose                                           | Key types
-----------------+--------------+---------------------------------------------------+----------
(always enabled) | remotia-core | Core traits, pipeline engine, built-in processors | FrameProcessor, FrameProperties, FrameError, Pipeline, Component, PipelineFeeder, PipelineHandle, PipelineRegistry, Ticker, Switch, CloneSwitch, OnErrorSwitch, PoolingSwitch, DepoolingSwitch, Sequential, Function, Closure, AsyncFunction

Optional crates (feature flags on the remotia crate)

Feature flag  | Crate(s)                                          | Purpose                               | Key types
--------------+---------------------------------------------------+---------------------------------------+----------
buffers       | remotia-buffer-utils, remotia-buffer-utils-macros | Buffer pool management and allocation | BuffersPool, BufferAllocator, BufferBorrower, BufferRedeemer, PoolRegistry, #[buffers_map] macro
capture       | remotia-core-capturers                            | Screen and file capture               | ScrapFrameCapturer, Y4MFrameCapturer, Y4MRGBAFrameCapturer, yuv420_to_rgba()
render        | remotia-core-renderers                            | Window rendering via winit + pixels   | WinitRenderer, WinitRunner
transmission  | remotia-core-transmission                         | TCP frame transport                   | TcpFrameSender, TcpFrameReceiver
profilation   | remotia-profilation-utils                         | Profiling, logging, frame-dropping    | TimestampAdder, TimestampDiffCalculator, ProfiledSequential, ThresholdBasedFrameDropper, TimestampBasedFrameDropper, ConsoleAverageStatsLogger, CSVFrameDataSerializer, ConsoleDropReasonLogger
serialization | remotia-serialization-utils                       | Bincode serialization of frame data   | BincodeSerializer, BincodeDeserializer

External crates (separate Cargo dependencies)

These live in separate repositories and are not part of the remotia umbrella crate. Add them as direct dependencies.

Crate                 | Purpose                                                           | Key types
----------------------+-------------------------------------------------------------------+----------
remotia-ffmpeg-codecs | FFmpeg encoder/decoder integration via pusher/puller architecture | EncoderPusher, EncoderPuller, DecoderPusher, DecoderPuller, EncoderBuilder, DecoderBuilder, ScalerBuilder
remotia-srt           | SRT protocol sender/receiver processors                           | SRT sender/receiver processors

Feature flag usage

[dependencies.remotia]
version = "0.1.0"
default-features = false
features = ["capture", "transmission"]  # only what you need

All features are off by default. Enable only the ones you use to minimize compile time and dependencies.

Examples

This chapter contains worked examples that demonstrate the framework's mechanisms and design patterns.

Each example includes a step-by-step walkthrough with inline code. Full source code is available on GitHub.

Screen snapper — Capture the screen and save to disk

Periodically captures the primary display and saves each frame as a PNG file.

Full source: GitHub — screen-snapper

Architecture

Two components in a single pipeline:

Component 1: Capture                          Component 2: Save
┌──────────────────────────────────┐          ┌─────────────────┐
│ Ticker → BufferAllocator →       │  channel │ PNGBufferSaver  │
│ XCapCapturer                     │─────────▶│                 │
└──────────────────────────────────┘          └─────────────────┘
  • Component 1 paces the capture at 1 FPS, allocates a buffer, fills it with screen pixels.
  • Component 2 reads the buffer and writes a PNG.

The DTO

The DTO holds a single optional BytesMut buffer identified by an enum key. It implements PullableFrameProperties so processors can pull and push the buffer by key.

#[derive(Default, Debug)]
pub struct RecorderData {
    screen_buffer: Option<BytesMut>,
}

#[derive(Clone, Copy)]
pub enum Buffers {
    CapturedScreenBuffer,
}

impl PullableFrameProperties<Buffers, BytesMut> for RecorderData {
    fn push(&mut self, key: Buffers, value: BytesMut) {
        match key {
            Buffers::CapturedScreenBuffer => self.screen_buffer.replace(value),
        };
    }
    fn pull(&mut self, key: &Buffers) -> Option<BytesMut> {
        match key {
            Buffers::CapturedScreenBuffer => self.screen_buffer.take(),
        }
    }
}

The pull/push pattern ensures the buffer is taken out of the DTO during processing and returned after, avoiding mutable aliasing.
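A processor built on this pattern might look like the sketch below. It is a std-only model: the trait is a local copy of the shape implemented by RecorderData above, Vec<u8> replaces BytesMut to avoid an external dependency, and the Frame type and fill function are hypothetical.

```rust
/// Local copy of the trait shape implemented by the example DTO.
pub trait PullableFrameProperties<K, V> {
    fn push(&mut self, key: K, value: V);
    fn pull(&mut self, key: &K) -> Option<V>;
}

#[derive(Clone, Copy)]
pub enum Buffers {
    Screen,
}

/// Hypothetical DTO holding one optional buffer.
#[derive(Default)]
pub struct Frame {
    screen: Option<Vec<u8>>,
}

impl PullableFrameProperties<Buffers, Vec<u8>> for Frame {
    fn push(&mut self, key: Buffers, value: Vec<u8>) {
        match key {
            Buffers::Screen => self.screen = Some(value),
        }
    }
    fn pull(&mut self, key: &Buffers) -> Option<Vec<u8>> {
        match key {
            Buffers::Screen => self.screen.take(),
        }
    }
}

/// Pulls the buffer, fills it with `value`, and pushes it back.
/// Returns None (consuming the frame) if the buffer was missing.
pub fn fill<F>(mut frame: F, key: Buffers, value: u8) -> Option<F>
where
    F: PullableFrameProperties<Buffers, Vec<u8>>,
{
    let mut buffer = frame.pull(&key)?; // the frame no longer holds the buffer
    buffer.iter_mut().for_each(|byte| *byte = value);
    frame.push(key, buffer); // hand it back for the next processor
    Some(frame)
}
```

Between pull and push the processor owns the buffer outright, so it can mutate it without holding a borrow on the DTO.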

Pipeline construction

fn capturer(monitor_id: usize, height: u32, width: u32) -> Component<RecorderData> {
    Component::new()
        .append(Ticker::new(1000))       // 1 tick/second
        .append(BufferAllocator::new(    // allocate a fresh BytesMut
            Buffers::CapturedScreenBuffer,
            height as usize * width as usize * 3,
        ))
        .append(
            XCapCapturer::builder()
                .buffer_key(Buffers::CapturedScreenBuffer)
                .monitor_id(monitor_id)
                .build(),
        )
}

fn saver(height: u32, width: u32) -> Component<RecorderData> {
    Component::new().append(
        PNGBufferSaver::builder()
            .buffer_key(Buffers::CapturedScreenBuffer)
            .path("./screenshots/")
            .height(height)
            .width(width)
            .build(),
    )
}

let pipeline = Pipeline::<RecorderData>::new()
    .link(capturer(monitor_id, height, width))
    .link(saver(height, width));

for handle in pipeline.run() {
    handle.await.unwrap();
}

Key takeaways

  • Ticker at the head of a component paces frame generation.
  • BufferAllocator is a processor that allocates a new BytesMut into the DTO each tick — useful when the buffer size is known ahead of time.
  • Custom processors (XCapCapturer, PNGBufferSaver) implement FrameProcessor and use PullableFrameProperties to exchange buffers with the DTO.

Platform-dependent screen snapper — Handling platform-specific backends

Same capture-and-save pipeline as the screen snapper, but uses Cargo feature flags to select between X11/XCap and Wayland/libwayshot capture backends at compile time.

Full source: GitHub — platform-dependant-screen-snapper

Architecture

Identical runtime layout — two components in one pipeline. The difference is in how the capturer is selected:

Component 1: Capture                          Component 2: Save
┌──────────────────────────────────┐          ┌─────────────────┐
│ Ticker → BufferAllocator →       │  channel │ PNGBufferSaver  │
│ [xcap OR wayshot] Capturer       │─────────▶│                 │
└──────────────────────────────────┘          └─────────────────┘

Feature flags in Cargo.toml

[features]
default = ["wayshot"]
xcap = ["dep:xcap"]
wayshot = ["dep:libwayshot"]

The lib.rs uses compile-time guards to prevent misconfiguration:

#[cfg(not(any(feature = "xcap", feature = "wayshot")))]
compile_error!("No snapper backend enabled");

#[cfg(all(feature = "xcap", feature = "wayshot"))]
compile_error!("Compiling with both wayshot and xcap is not currently supported.");

#[cfg(feature = "wayshot")]
pub mod wayshot_capturer;

#[cfg(feature = "xcap")]
pub mod xcap_capturer;

Backend selection with conditional compilation

A capture.rs module provides the same interface regardless of backend:

#[cfg(feature = "xcap")]
pub mod xcap {
    pub fn fetch_screen_resolution() -> (u32, u32) {
        xcap_utils::display_size(MONITOR_ID)
    }
    pub fn capturer_processor() -> XCapCapturer<Buffers> {
        XCapCapturer::builder()
            .buffer_key(Buffers::CapturedScreenBuffer)
            .monitor_id(MONITOR_ID)
            .build()
    }
}
#[cfg(feature = "xcap")]
pub use xcap::*;

#[cfg(feature = "wayshot")]
pub mod libwayshot {
    pub fn fetch_screen_resolution() -> (u32, u32) {
        wayshot_utils::display_size()
    }
    pub fn capturer_processor() -> WayshotCapturer<Buffers> {
        WayshotCapturer::builder()
            .buffer_key(Buffers::CapturedScreenBuffer)
            .build()
    }
}
#[cfg(feature = "wayshot")]
pub use libwayshot::*;

The main.rs calls fetch_screen_resolution() and capturer_processor() without knowing which backend is active.

Building

# Default (wayshot)
cargo run --example autosnapper

# X11/XCap backend
cargo run --example autosnapper --no-default-features --features xcap

Key takeaways

  • Cargo feature flags are the idiomatic Rust way to handle platform-specific dependencies. Remotia's FrameProcessor trait makes it easy — both backends implement the same trait, so the pipeline code is backend-agnostic.
  • compile_error! guards prevent invalid feature combinations at compile time.
  • The DTO and save component are identical across backends — only the capture processor changes.

Multi-pipeline with switches — Connecting pipelines

This example demonstrates the core differentiator of remotia: multiple pipelines connected by switches. A main processing pipeline routes frames to different downstream pipelines based on conditions.

Full source: GitHub — examples

Architecture

Three pipelines connected by switches:

Pipeline "main"
┌─────────────────────────────────────────────┐
│ Component:                                  │
│  Ticker → Closure(print) → OnErrorSwitch ───┼──▶ Pipeline "errors" (if error)
│                          → CloneSwitch ─────┼──▶ Pipeline "profiling"
└─────────────────────────────────────────────┘

  • Main pipeline: generates frames, prints their state, redirects errored frames to the error pipeline, and clones every remaining frame to the profiling pipeline.
  • Profiling pipeline: receives a clone of every frame that passes the error switch and logs statistics.
  • Error pipeline: receives only frames that carry an error.

The DTO

#[derive(Debug, Default, Clone)]
struct FrameDto {
    frame_id: u64,
    timestamp: Option<u128>,
    error: Option<String>,
}

impl FrameProperties<String, u64> for FrameDto {
    fn set(&mut self, key: String, value: u64) {
        if key == "frame_id" { self.frame_id = value; }
    }
    fn get(&self, key: &String) -> Option<u64> {
        if key == "frame_id" { Some(self.frame_id) } else { None }
    }
}

impl FrameError<String> for FrameDto {
    fn report_error(&mut self, error: String) { self.error = Some(error); }
    fn get_error(&self) -> Option<String> { self.error.clone() }
}

Note that Clone is required because CloneSwitch clones the DTO.

Building the pipelines

let mut registry = PipelineRegistry::<FrameDto, &str>::new();

// 1. Register profiling pipeline (receives a clone of every frame that reaches clone_switch)
let profiling_pipeline = Pipeline::new()
    .link(Component::singleton(Closure::new(|dto: FrameDto| {
        log::info!("[profiling] frame_id={}", dto.frame_id);
        None
    })))
    .tag("profiling");
registry.register("profiling", profiling_pipeline);

// 2. Register error pipeline (receives only errored frames)
let error_pipeline = Pipeline::new()
    .link(Component::singleton(Closure::new(|dto: FrameDto| {
        log::warn!("[errors] frame_id={} error={:?}", dto.frame_id, dto.error);
        None
    })))
    .tag("errors");
registry.register("errors", error_pipeline);

// 3. Build switches that target the above pipelines
let clone_switch = CloneSwitch::new(registry.get_mut(&"profiling"));
let error_switch = OnErrorSwitch::new(registry.get_mut(&"errors"));

// 4. Build main pipeline
let main_pipeline = Pipeline::new()
    .link(
        Component::new()
            .append(Ticker::new(33))       // ~30 FPS
            .append(Closure::new(|mut dto: FrameDto| {
                dto.frame_id += 1;
                log::info!("[main] frame_id={}", dto.frame_id);
                Some(dto)
            }))
            .append(error_switch)           // redirect errored frames
            .append(clone_switch)           // clone to profiling pipeline
    )
    .tag("main");
registry.register("main", main_pipeline);

registry.run().await;

How switches compose

The processor order matters:

  1. error_switch checks if the frame has an error. If yes, it sends the frame to the error pipeline and returns None — the frame never reaches clone_switch. If no error, it returns Some(frame) and processing continues.
  2. clone_switch clones the frame, sends the clone to profiling, and returns Some(original). The original then exits the component and flows to the next component in the main pipeline (if any).

By placing error_switch before clone_switch, errored frames are not cloned to profiling. Reverse the order if you want profiling to see every frame including errored ones.
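The effect of the ordering can be checked with a small std-only model. Everything here is a stand-in: Frame and the four functions are hypothetical, std channel senders replace the destination pipelines, and the error switch in this sketch treats any error as a match.

```rust
use std::sync::mpsc::Sender;

#[derive(Debug, Clone, PartialEq)]
pub struct Frame {
    pub id: u64,
    pub error: Option<String>,
}

/// Model of a cloning switch: a copy goes to `side`, the original continues.
pub fn clone_switch(frame: Frame, side: &Sender<Frame>) -> Option<Frame> {
    let _ = side.send(frame.clone());
    Some(frame)
}

/// Model of an error switch: errored frames go to `errors` and are consumed.
pub fn error_switch(frame: Frame, errors: &Sender<Frame>) -> Option<Frame> {
    if frame.error.is_some() {
        let _ = errors.send(frame);
        None
    } else {
        Some(frame)
    }
}

/// Error switch first: errored frames never reach the cloning switch.
pub fn error_then_clone(frame: Frame, side: &Sender<Frame>, errors: &Sender<Frame>) -> Option<Frame> {
    error_switch(frame, errors).and_then(|f| clone_switch(f, side))
}

/// Cloning switch first: the side channel sees every frame, errored or not.
pub fn clone_then_error(frame: Frame, side: &Sender<Frame>, errors: &Sender<Frame>) -> Option<Frame> {
    clone_switch(frame, side).and_then(|f| error_switch(f, errors))
}
```

Feeding the same errored frame through both orderings shows the difference: only clone_then_error delivers a copy to the side channel, while both deliver the frame to the error channel.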

Key takeaways

  • CloneSwitch is non-destructive: it forwards the original and sends a copy elsewhere. Use it for side-channels like profiling or logging.
  • OnErrorSwitch is conditional: it only redirects when the DTO carries a matching error. Otherwise the frame passes through.
  • Switch is unconditional: it always redirects and returns None. Use it when the main pipeline should not continue for that frame.
  • Processor ordering within a component determines which switches fire and in what sequence. Plan the order carefully when combining multiple switches.