Writing a Single Topic
This guide demonstrates how to ingest data into Mosaico. The example uses a CSV file as the data source, chosen purely for its simplicity.
In production, your source data will almost certainly come from a binary container rather than a CSV.
Mosaico's SDK is format-agnostic: it does not care where your data come from.
The same push() call works whether you are reading an .mcap file, a .bag, a .hdf5 archive, a proprietary binary log from your hardware vendor, or a live data stream from a running robot.
The only thing that changes between formats is the loader code that converts raw bytes into typed ontology models. The connection to the daemon, topic and sequence management, and the push logic are identical regardless of source.
For a real-world example using MCAP, see the Writing Interleaved Topics guide.
- Python
- C++
- Rust
The C++ SDK is currently in development.
The Rust SDK is currently in development.
Understanding the Mosaico Data Model
Before writing a single line of ingestion code, it helps to understand the four concepts you will interact with in this guide: the Sequence, the Topic, the Message, and the Ontology.
A Sequence is Mosaico's fundamental unit of a recording session. Think of it as a named, time-bounded container, analogous to a ROS bag or a test run folder, that groups all sensor streams captured during a single operational session. When you create a sequence, you give it a name and optional metadata (key-value tags like {"source": "manual_upload"}). That metadata travels with the data forever and makes it searchable later. Under the hood, creating a sequence causes the Mosaico Daemon (mosaicod), the server-side process that owns all storage, to allocate the necessary catalog entries and prepare to receive incoming data.
A Topic lives inside a Sequence and represents a single sensor stream. Each topic has a strict one-to-one relationship with a data type from the Ontology, Mosaico's type system. The Ontology defines the shape of every piece of data the platform can store. Built-in types like IMU, GPS, and Pressure are already registered, so you can start using them immediately. This strictness is intentional: because every record in a topic is the same type, the daemon can serialize and index data far more efficiently using Apache Arrow columnar layout. When you create a topic and bind it to IMU, you are telling the daemon to open a typed, append-only stream optimized for that exact schema.
A Message is the in-memory wrapper you hand to the SDK. It couples a typed data payload (your IMU, GPS, or Pressure object) with a nanosecond-precision timestamp. The timestamp is what Mosaico uses to order and index records in time. It is not the wall-clock time of the ingestion call, but the sensor timestamp embedded in your source data. The SDK serializes the Message and transmits it to the daemon over gRPC. You never deal with bytes directly; you construct typed objects, wrap them in Message, and let the SDK handle the rest.
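If your source stores sensor time as decimal seconds rather than integer nanoseconds, the conversion deserves some care. The following is a minimal sketch of one way to do it; `seconds_to_ns` is a hypothetical helper and not part of the Mosaico SDK. It uses `Decimal` because a 64-bit float cannot represent nanosecond resolution at present-day epoch magnitudes.

```python
from decimal import Decimal


def seconds_to_ns(seconds_text: str) -> int:
    """Convert a decimal-seconds string to integer nanoseconds (hypothetical helper)."""
    # scaleb(9) shifts the decimal point nine places without rounding,
    # so all nine fractional digits of the sensor timestamp survive.
    return int(Decimal(seconds_text).scaleb(9))


sensor_ns = seconds_to_ns("1700000000.123456789")
print(sensor_ns)  # 1700000000123456789

# For comparison: going through float loses the low-order digits.
lossy_ns = int(float("1700000000.123456789") * 1e9)
print(lossy_ns == sensor_ns)  # False
```

The resulting integer is the kind of value you would pass as `timestamp_ns` when building a Message.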
Chunked Loading for High-Volume Data
In this example the source is imu.csv, a file with columns timestamp, acc_x, acc_y, acc_z, gyro_x, gyro_y, gyro_z. Download it to follow along locally.
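If you do not have the file at hand, a small synthetic stand-in with the same columns is enough to exercise the pipeline. The sketch below is an assumption-laden convenience, not part of the guide's dataset: the file name `imu_sample.csv`, the start time, the 10 ms sample period, and the values themselves are all made up for illustration.

```python
import csv
import math

# Column names as listed in this guide.
COLUMNS = ["timestamp", "acc_x", "acc_y", "acc_z", "gyro_x", "gyro_y", "gyro_z"]


def write_sample_imu_csv(
    path: str,
    rows: int = 100,
    start_ns: int = 1_700_000_000_000_000_000,  # arbitrary start time (assumption)
    period_ns: int = 10_000_000,  # 10 ms between samples (assumption)
) -> None:
    """Write a synthetic IMU CSV with nanosecond integer timestamps."""
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(COLUMNS)
        for i in range(rows):
            elapsed_s = i * period_ns / 1e9
            # Gravity on z plus a slow yaw oscillation; purely illustrative values.
            gyro_z = round(0.1 * math.sin(elapsed_s), 6)
            writer.writerow([start_ns + i * period_ns, 0.0, 0.0, 9.81, 0.0, 0.0, gyro_z])


write_sample_imu_csv("imu_sample.csv", rows=5)
```

Point the loader at `imu_sample.csv` instead of `imu.csv` if you use this file.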
When working with large files, loading everything into memory at once is impractical. Instead, we write a generator function that reads the CSV in fixed-size chunks using pandas and yields one Message at a time. This way, only a small slice of the file occupies memory at any moment, and the rest of the pipeline (the SDK's internal batching, the gRPC transport, the daemon's write path) can operate in a steady, pipelined fashion rather than in a single massive burst.
Notice that the generator yields None on parse errors rather than raising an exception. This is a deliberate choice: a corrupt or missing value in one row of a CSV should not abort the entire ingestion. By yielding None, we signal to the caller that this particular record could not be constructed; the caller logs it and moves on. The generator itself remains simple and composable.
import pandas as pd
from mosaicolabs import (
    MosaicoClient,
    setup_sdk_logging,
    SessionLevelErrorPolicy,
    Message,
    IMU,
    Vector3d,
)
def stream_imu_from_csv(file_path: str, chunk_size: int = 1000, skipinitialspace: bool = True):
    # Read the CSV in fixed-size chunks so only one chunk is in memory at a time
    for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace):
        for row in chunk.itertuples(index=False):
            try:
                yield Message(
                    timestamp_ns=int(row.timestamp),
                    data=IMU(
                        acceleration=Vector3d(
                            x=float(row.acc_x),
                            y=float(row.acc_y),
                            z=float(row.acc_z),
                        ),
                        angular_velocity=Vector3d(
                            x=float(row.gyro_x),
                            y=float(row.gyro_y),
                            z=float(row.gyro_z),
                        ),
                    ),
                )
            except Exception:
                # Signal an unparseable row to the caller instead of aborting
                yield None
Here IMU and Vector3d are built-in Mosaico Ontology types. You do not need to define or register them; they are part of the default type catalog that ships with the SDK. The Message wrapper carries the typed IMU payload along with the nanosecond timestamp parsed directly from the CSV's timestamp column, preserving the original sensor time rather than substituting the current wall clock.
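On the consuming side, the None convention needs a matching filter. As a minimal sketch of the "log it and move on" step, the hypothetical helper `skip_failed` below wraps any iterable that may yield None; it uses only the standard library and plain dictionaries stand in for Message objects.

```python
import logging
from typing import Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")
log = logging.getLogger("ingest")


def skip_failed(records: Iterable[Optional[T]]) -> Iterator[T]:
    """Yield only the records that were built; log the position of each None."""
    for index, record in enumerate(records):
        if record is None:
            log.warning("record %d could not be parsed; skipping", index)
            continue
        yield record


# Plain dicts stand in for Message objects in this sketch.
parsed = list(skip_failed([{"t": 1}, None, {"t": 3}]))
print(parsed)  # [{'t': 1}, {'t': 3}]
```

Because the helper is itself a generator, it composes with the loader without buffering: each record is still produced one at a time.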
Sequence Upload
With the data loader in place, the next step is to open a connection to the Mosaico Daemon and create a Sequence. MosaicoClient.connect() establishes a gRPC channel to the daemon running on the specified host and port. Using it as a context manager ensures the connection is cleanly closed even if an exception occurs.
Inside the client context, client.sequence_create() registers a new Sequence with the daemon and returns a SequenceWriter, the object that orchestrates the entire write session. All topic creation and data pushing happens through this writer. When the with block exits normally, the SequenceWriter flushes any buffered data, finalizes the sequence's time bounds, and marks it as successfully completed in the daemon's catalog. If the block exits due to an unhandled exception, the behavior is governed by the on_error policy you supply.
setup_sdk_logging(level="INFO", pretty=True)
with MosaicoClient.connect("localhost", 6726) as client:
    with client.sequence_create(
        sequence_name="csv_ingestion_test",
        metadata={"source": "manual_upload", "format": "csv"},
        on_error=SessionLevelErrorPolicy.Delete  # Delete sequence on unhandled error
    ) as swriter:
        # Topic creation and data push happen inside this block...
        pass
It is mandatory to use SequenceWriter inside its own with context. Using it outside will raise an exception.
Sequence-Level Error Handling: The on_error policy is your first line of defense against partial or corrupt data reaching the platform. SessionLevelErrorPolicy.Delete instructs the daemon to remove the entire sequence if an unhandled exception escapes the with block. This is the safer default when you would rather have no data than incomplete data. SessionLevelErrorPolicy.Report, by contrast, keeps whatever records were successfully transmitted and marks the sequence with a failed status, allowing you to inspect what arrived before the error. Choose Delete when data integrity across the full session is critical; choose Report when partial data is still useful for debugging or analysis.
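To make the difference between the two policies concrete, here is a toy in-memory model of the semantics described above. It is not SDK code: `ToySequence`, `Policy`, and the dictionary used as a catalog are all hypothetical, and letting the exception propagate after the policy runs is an assumption of this sketch rather than documented SDK behavior.

```python
from enum import Enum


class Policy(Enum):
    DELETE = "delete"
    REPORT = "report"


class ToySequence:
    """Toy stand-in for a sequence writer backed by a plain dict catalog."""

    def __init__(self, catalog: dict, name: str, on_error: Policy):
        self.catalog, self.name, self.on_error = catalog, name, on_error

    def __enter__(self):
        self.catalog[self.name] = {"status": "open", "records": []}
        return self

    def push(self, record):
        self.catalog[self.name]["records"].append(record)

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self.catalog[self.name]["status"] = "completed"
        elif self.on_error is Policy.DELETE:
            del self.catalog[self.name]  # no data rather than partial data
        else:
            self.catalog[self.name]["status"] = "failed"  # keep what arrived
        return False  # assumption: the original exception still propagates


def run(policy: Policy) -> dict:
    catalog = {}
    try:
        with ToySequence(catalog, "demo", policy) as seq:
            seq.push(1)
            raise RuntimeError("simulated failure mid-session")
    except RuntimeError:
        pass
    return catalog


print(run(Policy.DELETE))  # {}
print(run(Policy.REPORT))  # {'demo': {'status': 'failed', 'records': [1]}}
```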
Topic Creation
Inside the sequence writer, you register one or more Topics. Calling swriter.topic_create() tells the daemon to open a typed stream named sensors/imu that will accept only IMU messages. The metadata dictionary here annotates the topic itself, useful for tagging the physical sensor unit, calibration version, or any other attribute you want to query later.
The return value is a TopicWriter, which manages the actual write operations for this stream. Internally, the TopicWriter accumulates messages into batches before transmitting them to the daemon, amortizing the cost of individual gRPC calls. You do not need to manage this batching manually; just call push() for each message and the SDK handles the rest.
with client.sequence_create(...) as swriter:
    imu_twriter = swriter.topic_create(
        topic_name="sensors/imu",
        metadata={"sensor_id": "accel_01"},
        ontology_type=IMU,
        on_error=TopicLevelErrorPolicy.Ignore
    )
The TopicLevelErrorPolicy.Ignore policy logs on the server any error raised within the TopicWriter context
(for example, while pushing a message) and then ignores it, keeping the ingestion stream up and active. For a more detailed explanation,
refer to the Topic-Level Error Handling section.
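As a rough picture of the batching a TopicWriter performs before each network call, the sketch below accumulates items and hands them to a send function in groups. `BatchingBuffer`, its `batch_size` of 3, and the flush-on-size rule are assumptions made for illustration; the SDK's actual batching strategy is not described in this guide.

```python
from typing import Callable, List


class BatchingBuffer:
    """Toy buffer that forwards items to `send` in fixed-size batches."""

    def __init__(self, send: Callable[[List[object]], None], batch_size: int = 3):
        self._send = send
        self._batch_size = batch_size
        self._pending: List[object] = []

    def push(self, item: object) -> None:
        self._pending.append(item)
        if len(self._pending) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        # Send whatever is pending (if anything) and start a fresh batch.
        if self._pending:
            self._send(self._pending)
            self._pending = []


sent: List[List[object]] = []
buffer = BatchingBuffer(send=sent.append, batch_size=3)
for i in range(7):
    buffer.push(i)
buffer.flush()  # final partial batch, as a writer would do on close
print(sent)  # [[0, 1, 2], [3, 4, 5], [6]]
```

Seven pushes result in three send calls instead of seven, which is the amortization the TopicWriter provides for gRPC traffic.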
Pushing Data
With the TopicWriter in hand, you iterate over the generator and push each message. The SDK serializes each Message
using Apache Arrow, batches it with preceding messages, and transmits batches to the daemon over gRPC.
From your perspective, the call is a simple method invocation; the network and serialization complexity is hidden.
Wrapping the operations related to a topic inside a with context block allows the SDK to handle errors raised inside that block
according to the selected TopicLevelErrorPolicy.
for msg in stream_imu_from_csv("imu.csv"):
    if msg is None:
        # The loader could not build a Message from this row: log it and move on
        print("Skipping a CSV row that could not be parsed")
        continue
    with imu_twriter:
        # Any exception inside here will trigger the `TopicLevelErrorPolicy.Ignore`
        imu_twriter.push(message=msg)
    if imu_twriter.status == TopicWriterStatus.IgnoredLastError:
        # Last cycle produced an error
        print(f"IMU message skipped with error: {imu_twriter.last_error}")
Topic-Level Error Handling: The with context around each push() call is important.
Without it, an error at the push level, for example a network hiccup or a type validation failure for a particular message, would propagate up through the loop over the generator,
exit the SequenceWriter context block abnormally, and trigger the sequence-level on_error policy, potentially deleting everything.
By wrapping each topic operation in its TopicWriter context, you contain errors to the individual message that caused them.
In this case, since TopicLevelErrorPolicy.Ignore has been selected, the affected timestamp is logged remotely, and ingestion continues with the next message.
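The containment mechanism itself is ordinary Python: a context manager can swallow an exception raised in its block and remember it for later inspection. The sketch below shows that pattern with a hypothetical `ToyTopicWriter`; it models only the behavior described above and says nothing about how the real TopicWriter is implemented.

```python
class ToyTopicWriter:
    """Toy stand-in: contains errors raised inside its `with` block."""

    def __init__(self):
        self.accepted = []
        self.last_error = None

    def push(self, message):
        if message is None:
            raise ValueError("cannot push an empty message")
        self.accepted.append(message)

    def __enter__(self):
        self.last_error = None  # reset the status for this cycle
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is not None and issubclass(exc_type, Exception):
            self.last_error = exc
            return True  # suppress: the surrounding loop keeps running
        return False  # no error, or something like KeyboardInterrupt


writer = ToyTopicWriter()
skipped = 0
for msg in ["a", None, "b"]:
    with writer:
        writer.push(msg)
    if writer.last_error is not None:
        skipped += 1

print(writer.accepted, skipped)  # ['a', 'b'] 1
```

The failing push in the middle affects only its own iteration; the messages before and after it are accepted.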
How It Fits Together
It is worth pausing to see the full picture before looking at the complete code. The Mosaico Daemon is always running, managing storage and serving the gRPC API. Your application acts as a client: it opens a connection, instructs the daemon to create a Sequence (a catalog entry with metadata and time bounds), and then registers a Topic within that sequence (a typed, append-only stream). As you iterate through your CSV, each row becomes a Message, a typed, timestamped record. The SDK batches those messages and streams them to the daemon via gRPC using Arrow-encoded payloads. When the SequenceWriter scope exits cleanly, the daemon seals the sequence and it becomes queryable.
Error handling is layered deliberately. At the row level, the generator yields None for unparseable rows, keeping bad source data out of the message stream entirely. At the push level, the TopicWriter context applies TopicLevelErrorPolicy.Ignore, so transport or validation errors for individual messages are logged and skipped without aborting the session. At the sequence level, the on_error policy decides what to do if something truly unrecoverable happens. This three-layer approach means a single bad CSV row never brings down an entire recording session.
Full Example
import pandas as pd
from mosaicolabs import (
    MosaicoClient,
    setup_sdk_logging,
    SessionLevelErrorPolicy,
    # The next two names are used below; importing them from the package
    # top level is an assumption, mirroring SessionLevelErrorPolicy.
    TopicLevelErrorPolicy,
    TopicWriterStatus,
    Message,
    IMU,
    Vector3d,
)
def stream_imu_from_csv(file_path: str, chunk_size: int = 1000, skipinitialspace: bool = True):
    # Read the CSV in fixed-size chunks so only one chunk is in memory at a time
    for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace):
        for row in chunk.itertuples(index=False):
            try:
                yield Message(
                    timestamp_ns=int(row.timestamp),
                    data=IMU(
                        acceleration=Vector3d(x=float(row.acc_x), y=float(row.acc_y), z=float(row.acc_z)),
                        angular_velocity=Vector3d(x=float(row.gyro_x), y=float(row.gyro_y), z=float(row.gyro_z)),
                    ),
                )
            except Exception:
                # Signal an unparseable row to the caller instead of aborting
                yield None
def main():
    setup_sdk_logging(level="INFO", pretty=True)
    with MosaicoClient.connect("localhost", 6726) as client:
        with client.sequence_create(
            sequence_name="csv_ingestion_test",
            metadata={"source": "manual_upload", "format": "csv"},
            on_error=SessionLevelErrorPolicy.Delete
        ) as swriter:
            imu_twriter = swriter.topic_create(
                topic_name="sensors/imu",
                metadata={"sensor_id": "accel_01"},
                ontology_type=IMU,
                on_error=TopicLevelErrorPolicy.Ignore,
            )
            for msg in stream_imu_from_csv("imu.csv"):
                if msg is None:
                    # Row-level failure: the loader could not build a Message
                    print("Skipping a CSV row that could not be parsed")
                    continue
                with imu_twriter:
                    # Errors raised here are handled by TopicLevelErrorPolicy.Ignore
                    imu_twriter.push(message=msg)
                if imu_twriter.status == TopicWriterStatus.IgnoredLastError:
                    print(f"IMU message skipped with error: {imu_twriter.last_error}")
    print("Successfully ingested data from CSV into Mosaico!")
if __name__ == "__main__":
    main()