Writing Multiple Topics
This guide demonstrates how to ingest data from multiple CSV files into Mosaico, writing several topics serially within a single sequence. The example covers IMU, GPS, and Pressure sensors, each sourced from a separate CSV file.
- Python
- C++
- Rust
The C++ SDK is currently in development.
The Rust SDK is currently in development.
Why Multiple Topics in One Sequence
In real robotics deployments, a robot rarely records just one sensor. A field mission might simultaneously produce IMU readings at 200 Hz, GPS fixes at 10 Hz, and barometric pressure samples at 50 Hz. All of these streams are causally related; they describe the same vehicle, in the same environment, during the same run. Storing them in separate, unrelated records would destroy that relationship and make cross-sensor queries needlessly complex.
This is exactly what the Sequence model is designed for. A Sequence is not just a container for one sensor stream; it is a session-level grouping that can hold any number of Topics, each carrying a different sensor type. When you create a single Sequence named multi_sensor_ingestion and register IMU, GPS, and Pressure topics inside it, you are telling the daemon that all three streams belong to the same operational context. Later, when you query the platform, you can retrieve all topics from that sequence together, knowing their timestamps align to the same mission clock.
In this guide, we ingest the three sensors serially: we exhaust the IMU file completely, then the GPS file, then the Pressure file. This is the natural approach when your data arrives pre-split into separate files. The daemon does not require messages to arrive in global timestamp order across topics; each topic maintains its own ordered stream independently. If your source data is already interleaved in a single file, see the Writing Interleaved Topics guide instead.
Streaming Helpers
The guide uses three sample files: imu.csv, gps.csv, and pressure.csv. Download them to follow along locally.
- Python
- C++
- Rust
For each sensor, we define a dedicated generator function that reads its CSV in chunks and yields typed Message objects. This mirrors the single-topic pattern from the previous guide, extended to three sensor types. Each generator is independent; it knows nothing about the other sensors and can be tested in isolation. The GPS type uses a Point3d to hold latitude, longitude, and altitude, and a GPSStatus struct to carry fix quality metadata. The Pressure type is simpler, wrapping a single scalar value.
As before, each generator yields None on a parse error rather than raising an exception. This keeps the generator implementations clean and makes error handling a concern of the calling code, not the data loading code.
import pandas as pd
from mosaicolabs import (
MosaicoClient, setup_sdk_logging, SessionLevelErrorPolicy,
Message, IMU, Vector3d, GPS, GPSStatus, Pressure, Point3d
)
def stream_imu_from_csv(file_path, chunk_size=1000, skipinitialspace=True):
for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace):
for row in chunk.itertuples(index=False):
try:
yield Message(
timestamp_ns=int(row.timestamp),
data=IMU(
acceleration=Vector3d(x=float(row.acc_x), y=float(row.acc_y), z=float(row.acc_z)),
angular_velocity=Vector3d(x=float(row.gyro_x), y=float(row.gyro_y), z=float(row.gyro_z))
)
)
except Exception:
yield None
def stream_gps_from_csv(file_path, chunk_size=1000, skipinitialspace=True):
for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace):
for row in chunk.itertuples(index=False):
try:
yield Message(
timestamp_ns=int(row.timestamp),
data=GPS(
position=Point3d(x=float(row.latitude), y=float(row.longitude), z=float(row.altitude)),
status=GPSStatus(status=int(row.status), service=int(row.service))
)
)
except Exception:
yield None
def stream_pressure_from_csv(file_path, chunk_size=1000, skipinitialspace=True):
for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace):
for row in chunk.itertuples(index=False):
try:
yield Message(timestamp_ns=int(row.timestamp), data=Pressure(value=row.pressure))
except Exception:
yield None
The C++ SDK is currently in development.
The Rust SDK is currently in development.
Connect and Create Sequence Writer
- Python
- C++
- Rust
Connecting to the daemon and creating a Sequence follows the same pattern as the single-topic guide. The key difference here is the metadata: we tag this sequence with mission-level context ("mission": "alpha_test", "environment": "laboratory") rather than format-level context. Because all three sensors will live inside this one sequence, the metadata should describe the session as a whole, not any individual stream.
The SequenceWriter returned by sequence_create() is the object that governs the entire session. Every topic you create, every message you push, is ultimately coordinated through this writer. When its with block exits successfully, the daemon finalizes all topics, seals the sequence's time bounds, and marks the entire session as complete and queryable.
setup_sdk_logging(level="INFO", pretty=True)
with MosaicoClient.connect("localhost", 6726) as client:
with client.sequence_create(
sequence_name="multi_sensor_ingestion",
metadata={"mission": "alpha_test", "environment": "laboratory"},
on_error=SessionLevelErrorPolicy.Delete
) as swriter:
pass # Steps 3 and 4 here
It is mandatory to use SequenceWriter inside its own with context. Using it outside will raise an exception.
Sequence-Level Error Handling: SessionLevelErrorPolicy.Delete removes the incomplete sequence entirely on an unhandled error; use this when you want an all-or-nothing guarantee. SessionLevelErrorPolicy.Report flags the sequence as failed while retaining whatever records were already transmitted, which can be useful when partial data has diagnostic value. In multi-topic ingestion, the stakes of a session-level abort are higher: a crash late in the GPS ingestion phase could discard IMU data that was uploaded cleanly. This makes per-topic error containment (covered in the next section) especially important.
The C++ SDK is currently in development.
The Rust SDK is currently in development.
Topic Writers
All three TopicWriter objects should be created upfront, before any data is pushed. This is best practice for a straightforward reason: calling topic_create() registers the topic with the daemon immediately, causing it to allocate the catalog entry, prepare the typed write path, and associate the topic with the current sequence. By creating all writers at the start, you ensure the daemon has a complete picture of the session's structure before data starts flowing. It also makes the code easier to reason about: the setup phase is clearly separated from the data-movement phase, and you can verify that all three writers exist before committing to any I/O.
Each TopicWriter is bound to exactly one Ontology type. Attempting to push a GPS message through the imu_twriter would fail at the SDK level because the types are incompatible. This strictness is what allows the daemon to store each topic as a homogeneous, columnar stream rather than a mixed bag of records.
- Python
- C++
- Rust
imu_twriter = swriter.topic_create(
topic_name="sensors/imu",
metadata={"sensor_id": "accel_01"},
ontology_type=IMU,
on_error=TopicLevelErrorPolicy.Ignore,
)
gps_twriter = swriter.topic_create(
topic_name="sensors/gps",
metadata={"sensor_id": "gps_01"},
ontology_type=GPS,
on_error=TopicLevelErrorPolicy.Ignore,
)
pressure_twriter = swriter.topic_create(
topic_name="sensors/pressure",
metadata={"sensor_id": "pressure_01"},
ontology_type=Pressure,
on_error=TopicLevelErrorPolicy.Ignore,
)
The TopicLevelErrorPolicy.Ignore error policy will log on the server any error happening within the TopicWriter context
(e.g. while pushing a message), and then will ignore it, keeping the injestion stream up and active. For a more detailed explanation,
refer to the Topic-Level Error Handling Section.
The C++ SDK is currently in development.
The Rust SDK is currently in development.
Pushing Data for Each Sensor
- Python
- C++
- Rust
With all three writers registered, we iterate over each generator and push its messages. The three for loops run one after the other: first all IMU messages, then all GPS messages, then all Pressure messages.
Each loop is independent and isolated from the others by its own with context around each push call.
This per-topic error isolation is the critical design choice in multi-topic ingestion: it allows the SDK to correctly handle errors inside the context,
by means of the selected TopicLevelErrorPolicy policy.
# Push IMU
for msg in stream_imu_from_csv("imu.csv"):
with imu_twriter:
imu_twriter.push(message=msg)
if imu_twriter.status == TopicWriterStatus.IgnoredLastError:
# Last cycle produced an error
print(f"IMU message skipped with error: {imu_twriter.last_error}")
# Push GPS
for msg in stream_gps_from_csv("gps.csv"):
with gps_twriter:
gps_twriter.push(message=msg)
if gps_twriter.status == TopicWriterStatus.IgnoredLastError:
# Last cycle produced an error
print(f"GPS message skipped with error: {gps_twriter.last_error}")
# Push Pressure
for msg in stream_pressure_from_csv("pressure.csv"):
with pressure_twriter:
pressure_twriter.push(message=msg)
if pressure_twriter.status == TopicWriterStatus.IgnoredLastError:
# Last cycle produced an error
print(f"Pressure message skipped with error: {pressure_twriter.last_error}")
Topic-Level Error Management: The with context around each push() call is important.
If an error occurs at the push level, for example a network hiccup or a type validation failure for a particular message, it will propagate up through the generator loop,
exit the SequenceWriter context block abnormally, and trigger the sequence-level on_error policy, potentially deleting everything.
By wrapping each operation related to a topic in the related TopicWriter context, you contain errors to the individual message that caused them.
In this case, since the TopicLevelErrorPolicy.Ignore policy has been selected, The affected timestamp is logged remotely, and ingestion continues with the next message.
The C++ SDK is currently in development.
The Rust SDK is currently in development.
Full Example
- Python
- C++
- Rust
import pandas as pd
from mosaicolabs import (
MosaicoClient, setup_sdk_logging, SessionLevelErrorPolicy,
Message, IMU, Vector3d, GPS, GPSStatus, Pressure, Point3d
)
def stream_imu_from_csv(file_path, chunk_size=1000, skipinitialspace=True):
for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace):
for row in chunk.itertuples(index=False):
try:
yield Message(
timestamp_ns=int(row.timestamp),
data=IMU(
acceleration=Vector3d(x=float(row.acc_x), y=float(row.acc_y), z=float(row.acc_z)),
angular_velocity=Vector3d(x=float(row.gyro_x), y=float(row.gyro_y), z=float(row.gyro_z))
)
)
except Exception:
yield None
def stream_gps_from_csv(file_path, chunk_size=1000, skipinitialspace=True):
for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace):
for row in chunk.itertuples(index=False):
try:
yield Message(
timestamp_ns=int(row.timestamp),
data=GPS(
position=Point3d(x=float(row.latitude), y=float(row.longitude), z=float(row.altitude)),
status=GPSStatus(status=int(row.status), service=int(row.service))
)
)
except Exception:
yield None
def stream_pressure_from_csv(file_path, chunk_size=1000, skipinitialspace=True):
for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace):
for row in chunk.itertuples(index=False):
try:
yield Message(timestamp_ns=int(row.timestamp), data=Pressure(value=row.pressure))
except Exception:
yield None
def main():
setup_sdk_logging(level="INFO", pretty=True)
with MosaicoClient.connect("localhost", 6726) as client:
with client.sequence_create(
sequence_name="multi_sensor_ingestion",
metadata={"mission": "alpha_test", "environment": "laboratory"},
on_error=SessionLevelErrorPolicy.Delete
) as swriter:
imu_twriter = swriter.topic_create(topic_name="sensors/imu", metadata={"sensor_id": "accel_01"}, ontology_type=IMU)
gps_twriter = swriter.topic_create(topic_name="sensors/gps", metadata={"sensor_id": "gps_01"}, ontology_type=GPS)
pressure_twriter = swriter.topic_create(topic_name="sensors/pressure", metadata={"sensor_id": "pressure_01"}, ontology_type=Pressure)
# Push IMU
for msg in stream_imu_from_csv("imu.csv"):
with imu_twriter:
imu_twriter.push(message=msg)
if imu_twriter.status == TopicWriterStatus.IgnoredLastError:
# Last cycle produced an error
print(f"IMU message skipped with error: {imu_twriter.last_error}")
# Push GPS
for msg in stream_gps_from_csv("gps.csv"):
with gps_twriter:
gps_twriter.push(message=msg)
if gps_twriter.status == TopicWriterStatus.IgnoredLastError:
# Last cycle produced an error
print(f"GPS message skipped with error: {gps_twriter.last_error}")
# Push Pressure
for msg in stream_pressure_from_csv("pressure.csv"):
with pressure_twriter:
pressure_twriter.push(message=msg)
if pressure_twriter.status == TopicWriterStatus.IgnoredLastError:
# Last cycle produced an error
print(f"Pressure message skipped with error: {pressure_twriter.last_error}")
print("Successfully injected multi-sensor data from CSV into Mosaico!")
if __name__ == "__main__":
main()
The C++ SDK is currently in development.
The Rust SDK is currently in development.