
Streaming Data

Once you have located the recording sessions you care about, either by name or through the query system, the next step is to actually read the data back. Mosaico offers two retrieval modes, each designed for a different use case.

The first mode, SequenceDataStreamer, is for situations where you need to work with multiple sensor streams together. A real robot records many sensors simultaneously: an IMU firing at 200 Hz, a GPS unit updating at 1 Hz, a pressure sensor somewhere in between. The messages from all of these accumulate independently during recording, but when you replay them you usually want them interleaved in the order they actually happened. SequenceDataStreamer handles this automatically by merging the requested topic streams into a single unified timeline with a K-way merge sort, the classic algorithm for combining K individually sorted lists into one sorted sequence. As you iterate, you receive events one at a time in strict chronological order across all requested topics.
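To make the merge concrete, here is a minimal sketch of a K-way merge over per-topic streams that are each already in time order, using Python's `heapq`. It illustrates the technique only and is not Mosaico's internal implementation; the `(timestamp_ns, payload)` event shape, the topic names, and the `k_way_merge` helper are assumptions made for the example.

```python
import heapq
from typing import Dict, Iterable, Iterator, List, Tuple

Event = Tuple[int, str]  # assumed shape: (timestamp_ns, payload)


def k_way_merge(streams: Dict[str, Iterable[Event]]) -> Iterator[Tuple[str, Event]]:
    """Yield (topic, event) pairs from K time-ordered streams in global timestamp order."""
    iters = {topic: iter(s) for topic, s in streams.items()}
    heap: List[Tuple[int, str, Event]] = []
    # Seed the heap with the head of every non-empty stream.
    for topic, it in iters.items():
        first = next(it, None)
        if first is not None:
            heap.append((first[0], topic, first))
    heapq.heapify(heap)
    while heap:
        # Pop the globally earliest event; ties on timestamp are broken by topic name.
        _ts, topic, event = heapq.heappop(heap)
        yield topic, event
        # Refill from the stream just consumed so it stays represented in the heap.
        nxt = next(iters[topic], None)
        if nxt is not None:
            heapq.heappush(heap, (nxt[0], topic, nxt))


streams = {
    "/imu": [(0, "imu0"), (5, "imu1"), (10, "imu2")],
    "/gps": [(3, "gps0"), (12, "gps1")],
}
for topic, (ts, data) in k_way_merge(streams):
    print(ts, topic, data)
```

The heap never holds more than one pending event per topic, so each step costs O(log K) regardless of how long the recordings are. The standard library's `heapq.merge` applies the same idea in a single call; the explicit loop is shown here so the topic bookkeeping is visible.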

The second mode, TopicDataStreamer, is for situations where you only need one sensor. Because there is nothing to merge, the streamer reads directly from a single topic stream with minimal overhead. This is the right choice when you are isolating a sensor for analysis, building a calibration pipeline, or feeding a single data channel into a machine learning model. There is no point paying the cost of a merge when you only care about one input.

Both streamers support temporal slicing: you supply start_timestamp_ns and end_timestamp_ns to bound the replay window, so you never have to load a full recording when you only care about a segment.
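The bounds are integers in nanoseconds. Assuming they count from the Unix epoch in UTC (consistent with the `_ns` suffix and with the values used in the examples on this page, which fall in February 2025), a small standard-library helper can turn a readable time into the integers the streamers expect. The `to_ns` and `from_ns` names below are hypothetical helpers, not part of Mosaico.

```python
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_ns(dt: datetime) -> int:
    """Convert a timezone-aware datetime to integer nanoseconds since the Unix epoch."""
    if dt.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid local-time ambiguity")
    # Integer timedelta division avoids float rounding; datetime resolution is 1 microsecond.
    return (dt - _EPOCH) // timedelta(microseconds=1) * 1_000


def from_ns(ns: int) -> datetime:
    """Inverse of to_ns (sub-microsecond digits are truncated)."""
    return _EPOCH + timedelta(microseconds=ns // 1_000)


start_ns = to_ns(datetime(2025, 2, 2, 15, 6, 18, tzinfo=timezone.utc))
end_ns = start_ns + 14 * 60 * 1_000_000_000  # a 14-minute window
print(start_ns, end_ns)  # 1738508778000000000 1738509618000000000
```

These are exactly the bounds passed to `get_data_streamer()` in the examples that follow.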

Unified Multi-Sensor Replay

SequenceDataStreamer is obtained from a SequenceHandler, which is the entry point to reading back a named Sequence. You call client.sequence_handler("mission_alpha") to get a handle to the recording session, then call .get_data_streamer() on it with the list of topics you want to merge. The streamer yields (topic_name, message) tuples as you iterate, so at each step you know which sensor produced the message, and the message itself carries its timestamp and its typed data payload.
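Because every item carries its topic name, a common pattern is to route messages to per-sensor handlers through a lookup table rather than a chain of `if` statements. The sketch below uses plain `(topic, message)` tuples as stand-ins for what the streamer yields; `dispatch` and the handler table are hypothetical names, and in real use you would pass the streamer itself as `events`.

```python
from typing import Any, Callable, Dict, Iterable, Tuple


def dispatch(events: Iterable[Tuple[str, Any]],
             handlers: Dict[str, Callable[[Any], None]]) -> int:
    """Route each (topic, message) pair to its topic's handler.

    Returns how many events had no registered handler.
    """
    unhandled = 0
    for topic, msg in events:
        handler = handlers.get(topic)
        if handler is None:
            unhandled += 1  # topic not subscribed to; skip it
            continue
        handler(msg)
    return unhandled


gps_fixes, imu_samples = [], []
events = [("/imu", "a"), ("/gps", "b"), ("/imu", "c"), ("/baro", "d")]
missed = dispatch(events, {"/gps": gps_fixes.append, "/imu": imu_samples.append})
print(gps_fixes, imu_samples, missed)  # ['b'] ['a', 'c'] 1
```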

Before the loop starts you can call streamer.next_timestamp() to peek at the timestamp of the very first event without consuming it. This is useful when you need to synchronize the streamer with another data source before you begin processing, or simply to confirm that the time window contains data.
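Conceptually, peeking is a one-element lookahead buffer. The class below is a hypothetical stand-in that shows those semantics on a plain iterator of `(timestamp_ns, payload)` pairs; it is not Mosaico's implementation, and returning `None` at the end of the stream is an assumption of this sketch rather than documented behavior of `next_timestamp()`.

```python
from typing import Any, Iterable, Optional, Tuple

_NOTHING = object()  # marks "no event buffered"


class PeekableStream:
    """Hypothetical wrapper: iterate (timestamp_ns, payload) events, with a
    next_timestamp() that inspects the next event without consuming it."""

    def __init__(self, events: Iterable[Tuple[int, Any]]):
        self._it = iter(events)
        self._head: Any = _NOTHING  # one-element lookahead buffer

    def next_timestamp(self) -> Optional[int]:
        # Fill the buffer on demand; repeated calls return the same value.
        if self._head is _NOTHING:
            self._head = next(self._it, _NOTHING)
        return None if self._head is _NOTHING else self._head[0]

    def __iter__(self) -> "PeekableStream":
        return self

    def __next__(self) -> Tuple[int, Any]:
        if self.next_timestamp() is None:
            raise StopIteration
        event, self._head = self._head, _NOTHING  # hand out the buffered event
        return event


stream = PeekableStream([(100, "a"), (200, "b")])
print(stream.next_timestamp())  # 100 (nothing consumed yet)
print(list(stream))             # [(100, 'a'), (200, 'b')]
print(stream.next_timestamp())  # None (exhausted)
```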

Always call seq_handler.close() when you are done. Handlers maintain an open connection to the daemon; closing them releases the server-side resources held for that session.

Multi-sensor replay
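```python
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Open a handle to the recorded Sequence; proceed only if one was returned.
    seq_handler = client.sequence_handler("mission_alpha")
    if seq_handler:
        try:
            # Merge /gps and /imu into one chronological stream, bounded to this window (ns).
            streamer = seq_handler.get_data_streamer(
                topics=["/gps", "/imu"],
                start_timestamp_ns=1738508778000000000,
                end_timestamp_ns=1738509618000000000,
            )
            # Peek at the first event's timestamp without consuming it.
            print(f"Streaming starts at: {streamer.next_timestamp()}")
            # Each item is a (topic_name, message) tuple in global time order.
            for topic, msg in streamer:
                print(f"[{topic}] at {msg.timestamp_ns}: {type(msg.data).__name__}")
        finally:
            # Always release the server-side resources, even if the loop raises.
            seq_handler.close()
```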

Targeted Access

TopicDataStreamer is obtained from a TopicHandler, which you get by calling client.topic_handler("mission_alpha", "/front/imu"). This gives you a handle to exactly one Topic within one Sequence. Calling .get_data_streamer() on it produces a stream of plain messages rather than tuples: one message at a time from that single channel.

Because there is no merge happening, each message you receive is of a known type. In the example below every iteration yields an IMU message, and you call .get_data(IMU) to extract the typed payload directly. This pattern is common in ML data pipelines where a dataloader needs a clean, single-type stream it can batch without branching on sensor identity.
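Because every message has the same type, grouping the stream into batches needs no branching on sensor identity. A minimal standard-library sketch follows; `ImuSample` is a hypothetical stand-in for the payload returned by `.get_data(IMU)` (its real fields are not documented here, so `accel_z` is invented), and `batched` is an illustrative helper, not a Mosaico API.

```python
from dataclasses import dataclass
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


@dataclass
class ImuSample:  # hypothetical stand-in for the typed IMU payload
    timestamp_ns: int
    accel_z: float


def batched(samples: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Group a single-type stream into fixed-size batches; the last one may be shorter."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    it = iter(samples)
    while True:
        batch = list(islice(it, batch_size))  # pull up to batch_size items
        if not batch:
            return
        yield batch


# Ten samples spaced 5 ms apart, i.e. a 200 Hz IMU.
stream = (ImuSample(t * 5_000_000, 9.81) for t in range(10))
for batch in batched(stream, batch_size=4):
    print(len(batch), batch[0].timestamp_ns)
```

On Python 3.12 and later, `itertools.batched` offers the same grouping (yielding tuples instead of lists).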

next_timestamp() is available here too, and serves the same purpose: peek at the first sample's timestamp before committing to the loop. It is particularly handy when you are building a custom synchronization loop across multiple TopicDataStreamer instances and need to decide which one to advance next.
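One such loop is sample-and-hold alignment: for each event of a primary stream, advance a secondary stream only while its next timestamp has not passed the primary event, then pair the two. The sketch below uses a hypothetical `DemoStreamer` that mimics the two operations described on this page, iteration and `next_timestamp()`; treating `None` as end of stream is an assumption, and the camera/IMU topics and payloads are made up.

```python
from typing import Iterator, List, Optional, Tuple

Event = Tuple[int, str]  # (timestamp_ns, payload)


class DemoStreamer:
    """Hypothetical stand-in for a single-topic streamer: iterable, with
    next_timestamp() peeking at the next event (None when exhausted)."""

    def __init__(self, events: List[Event]):
        self._events, self._pos = events, 0

    def next_timestamp(self) -> Optional[int]:
        return self._events[self._pos][0] if self._pos < len(self._events) else None

    def __iter__(self) -> "DemoStreamer":
        return self

    def __next__(self) -> Event:
        if self._pos >= len(self._events):
            raise StopIteration
        self._pos += 1
        return self._events[self._pos - 1]


def align_latest(primary: DemoStreamer,
                 secondary: DemoStreamer) -> Iterator[Tuple[Event, Optional[Event]]]:
    """Pair each primary event with the latest secondary event at or before it."""
    latest: Optional[Event] = None
    for p_ts, p_data in primary:
        # Advance the secondary stream only while its next event is not in the future.
        while (ts := secondary.next_timestamp()) is not None and ts <= p_ts:
            latest = next(secondary)
        yield (p_ts, p_data), latest


camera = DemoStreamer([(10, "frame0"), (20, "frame1"), (30, "frame2")])
imu = DemoStreamer([(5, "imu0"), (12, "imu1"), (18, "imu2"), (35, "imu3")])
for frame, imu_evt in align_latest(camera, imu):
    print(frame, imu_evt)
```

Peeking is what makes this work: without it, the loop would have to consume `imu3` to discover that it lies in the future and then buffer it by hand.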

As with SequenceHandler, always call top_handler.close() when the loop is done to release the server-side connection.

Single topic stream
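```python
from mosaicolabs import MosaicoClient, IMU


def process_sample(imu) -> None:
    # Placeholder for your own per-sample logic (hypothetical; not part of Mosaico).
    print(imu)


with MosaicoClient.connect("localhost", 6726) as client:
    # Handle to exactly one Topic within one Sequence.
    top_handler = client.topic_handler("mission_alpha", "/front/imu")
    if top_handler:
        try:
            imu_stream = top_handler.get_data_streamer(
                start_timestamp_ns=1738508778000000000,
                end_timestamp_ns=1738509618000000000,
            )
            print(f"First sample at: {imu_stream.next_timestamp()}")
            # Each item is a single message; no topic name is attached.
            for imu_msg in imu_stream:
                # Extract the typed IMU payload from the message.
                process_sample(imu_msg.get_data(IMU))
        finally:
            # Release the server-side connection, even if processing raises.
            top_handler.close()
```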

Streamer Comparison

The table below summarizes the trade-offs. In practice, reach for SequenceDataStreamer whenever you need a global view of what happened during a recording: system-level replays, event correlation, multi-sensor visualizations. Reach for TopicDataStreamer whenever your downstream consumer only cares about one channel and you want the lowest possible overhead.

| Feature | SequenceDataStreamer | TopicDataStreamer |
|---|---|---|
| Primary use case | Multi-sensor fusion & system-wide replay | Isolated sensor analysis & ML training |
| Merge logic | K-way merge sort | Direct stream |
| Output | (topic_name, message) tuple | Single message |
| Temporal slicing | Supported | Supported |