Streaming Data
Once you have located the recording sessions you care about, either by name or through the query system, the next step is to actually read the data back. Mosaico offers two retrieval modes, each designed for a different use case.
The first mode, SequenceDataStreamer, is for situations where you need to work with multiple sensor streams together. A real robot records many sensors simultaneously: an IMU firing at 200 Hz, a GPS unit updating at 1 Hz, a pressure sensor somewhere in between. The messages from all of these accumulate independently during recording, but when you replay them you usually want them interleaved in the order they actually happened. SequenceDataStreamer handles this automatically by merging N topic streams into a single unified timeline using a K-way merge sort, the same algorithm that merge-sorts N sorted lists efficiently. As you iterate, you receive events one at a time in strict chronological order across all requested topics.
The second mode, TopicDataStreamer, is for situations where you only need one sensor. Because there is nothing to merge, the streamer reads directly from a single topic stream with minimal overhead. This is the right choice when you are isolating a sensor for analysis, building a calibration pipeline, or feeding a single data channel into a machine learning model. There is no point paying the cost of a merge when you only care about one input.
Both streamers support temporal slicing: you supply start_timestamp_ns and end_timestamp_ns to bound the replay window, so you never have to load a full recording when you only care about a segment.
- Python
- C++
- Rust
The C++ SDK is currently in development.
The Rust SDK is currently in development.
Unified Multi-Sensor Replay
- Python
- C++
- Rust
SequenceDataStreamer is obtained from a SequenceHandler, which is the entry point to reading back a named Sequence. You call client.sequence_handler("mission_alpha") to get a handle to the recording session, then call .get_data_streamer() on it with the list of topics you want to merge. The streamer returns (topic_name, message) tuples as you iterate, so at each step you know both which sensor the message came from and the message itself, its timestamp and its typed data payload.
Before the loop starts you can call streamer.next_timestamp() to peek at the timestamp of the very first event without consuming it. This is useful when you need to synchronize the streamer with another data source before you begin processing, or simply to confirm that the time window contains data.
Always call seq_handler.close() when you are done. Handlers maintain an open connection to the daemon; closing them releases the server-side resources held for that session.
from mosaicolabs import MosaicoClient
with MosaicoClient.connect("localhost", 6726) as client:
seq_handler = client.sequence_handler("mission_alpha")
if seq_handler:
streamer = seq_handler.get_data_streamer(
topics=["/gps", "/imu"],
start_timestamp_ns=1738508778000000000,
end_timestamp_ns=1738509618000000000,
)
print(f"Streaming starts at: {streamer.next_timestamp()}")
for topic, msg in streamer:
print(f"[{topic}] at {msg.timestamp_ns}: {type(msg.data).__name__}")
seq_handler.close()
The C++ SDK is currently in development.
The Rust SDK is currently in development.
Targeted Access
TopicDataStreamer is obtained from a TopicHandler, which you get by calling client.topic_handler("mission_alpha", "/front/imu"). This gives you a handle to exactly one Topic within one Sequence. Calling .get_data_streamer() on it produces a stream of individual messages, with no tuples, just one message at a time from that single channel.
Because there is no merge happening, each message you receive is of a known type. In the example below every iteration yields an IMU message, and you call .get_data(IMU) to extract the typed payload directly. This pattern is common in ML data pipelines where a dataloader needs a clean, single-type stream it can batch without branching on sensor identity.
next_timestamp() is available here too, and serves the same purpose: peek at the first sample's timestamp before committing to the loop. It is particularly handy when you are building a custom synchronization loop across multiple TopicDataStreamer instances and need to decide which one to advance next.
As with SequenceHandler, always call top_handler.close() when the loop is done to release the server-side connection.
- Python
- C++
- Rust
from mosaicolabs import MosaicoClient, IMU
with MosaicoClient.connect("localhost", 6726) as client:
top_handler = client.topic_handler("mission_alpha", "/front/imu")
if top_handler:
imu_stream = top_handler.get_data_streamer(
start_timestamp_ns=1738508778000000000,
end_timestamp_ns=1738509618000000000,
)
print(f"First sample at: {imu_stream.next_timestamp()}")
for imu_msg in imu_stream:
process_sample(imu_msg.get_data(IMU))
top_handler.close()
The C++ SDK is currently in development.
The Rust SDK is currently in development.
Streamer Comparison
The table below summarizes the trade-offs. In practice, reach for SequenceDataStreamer whenever you need a global view of what happened during a recording: system-level replays, event correlation, multi-sensor visualizations. Reach for TopicDataStreamer whenever your downstream consumer only cares about one channel and you want the lowest possible overhead.
| Feature | SequenceDataStreamer | TopicDataStreamer |
|---|---|---|
| Primary Use Case | Multi-sensor fusion & system-wide replay | Isolated sensor analysis & ML training |
| Merge Logic | K-Way merge sort | Direct stream |
| Output | (topic_name, message) tuple | Single message |
| Temporal Slicing | Supported | Supported |