Writing Interleaved Topics
This guide demonstrates how to ingest data from a single MCAP container file that holds interleaved messages from multiple sensors. Because messages from different topics are interleaved in the file, topic writers are created lazily on first encounter rather than up front.
The C++ SDK is currently in development.
The Rust SDK is currently in development.
What MCAP Is and Why Interleaved Ingestion Matters
MCAP is a high-performance, self-describing container format designed for robotics and autonomous systems data, widely used in ROS 2 and beyond. Unlike a simple CSV, where each file contains one type of data, an MCAP file stores all sensor streams together in a single binary file, with messages typically recorded in chronological (log-time) order. At 10:00:00.000 you might find an IMU record, immediately followed by a GPS fix at 10:00:00.001, then a pressure reading at 10:00:00.002, then another IMU record, all interleaved in the order they were originally recorded. This layout is efficient for playback and cross-sensor analysis, but it makes ingestion harder: when you stream the file front to back, you do not know in advance which sensor topics it contains or in what order they will first appear.
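To make "interleaved" concrete, the following stdlib-only sketch merges three per-sensor streams, each already sorted by time, into one chronological stream with `heapq.merge`. It does not read MCAP; the `(timestamp_ns, topic)` tuples and topic names are illustrative assumptions, and the merged list only models the order that a log-time-ordered read of a recording would produce.

```python
import heapq

# Illustrative per-sensor streams of (timestamp_ns, topic), each sorted by time,
# as they might arrive from separate sensors during recording.
imu = [(0, "/sensors/imu"), (3_000_000, "/sensors/imu")]
gps = [(1_000_000, "/sensors/gps")]
baro = [(2_000_000, "/sensors/baro")]

# Merge into one chronological stream, similar to reading a recording by log time.
interleaved = list(heapq.merge(imu, gps, baro, key=lambda rec: rec[0]))

for ts, topic in interleaved:
    print(ts, topic)
# 0 /sensors/imu
# 1000000 /sensors/gps
# 2000000 /sensors/baro
# 3000000 /sensors/imu
```

A consumer of `interleaved` sees the IMU topic twice before it has finished with the others, which is exactly why writers cannot be processed one topic at a time.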
The serial approach from the previous guide (iterate IMU completely, then GPS, then Pressure) does not apply here. When you read an MCAP file front to back, you encounter messages from all topics interleaved with one another. You need a strategy that registers a new TopicWriter the first time a topic appears and then reuses that writer for every subsequent message on the same topic. This is the lazy registration pattern, and it is the central technique in this guide.
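The SequenceWriter keeps the writer cache for you, so the following is only a model of the pattern, not Mosaico code: a plain dict plays the role of the cache, and `StubTopicWriter` is a hypothetical stand-in that simply records what it receives.

```python
class StubTopicWriter:
    """Hypothetical stand-in for a TopicWriter; it only records pushed payloads."""

    def __init__(self, topic: str):
        self.topic = topic
        self.pushed = []

    def push(self, payload):
        self.pushed.append(payload)


def ingest(stream):
    """Create one writer per topic on first encounter, then reuse it."""
    writers = {}  # topic -> writer (the SDK keeps an equivalent cache internally)
    for topic, payload in stream:
        writer = writers.get(topic)          # cache lookup
        if writer is None:
            writer = StubTopicWriter(topic)  # cache population on first encounter
            writers[topic] = writer
        writer.push(payload)
    return writers


stream = [("/imu", 1), ("/gps", 2), ("/imu", 3), ("/baro", 4), ("/gps", 5)]
writers = ingest(stream)
print(list(writers))           # ['/imu', '/gps', '/baro']
print(writers["/imu"].pushed)  # [1, 3]
```

Each topic's messages keep their relative order even though pushes to different writers interleave, and writers are created in first-appearance order.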
Architecture of Interleaved Ingestion
The ingestion pipeline for an MCAP file has four layers that are worth understanding before diving into the code.
First, the source file stream: the MCAP reader delivers a flat sequence of (schema, channel, message) tuples in chronological order. The schema carries the schema name (e.g. "sensor_msgs/msg/Imu"), the channel carries the topic string (e.g. "/sensors/imu"), and the message carries the raw binary payload in its data field.
Second, schema detection and translation: MCAP schemas are external definitions, typically ROS 2 message types, that do not map directly to Mosaico's Ontology. You need a translation layer that knows how to convert a sensor_msgs/msg/Imu payload into a Mosaico IMU object, extract the embedded timestamp, and return a Message ready for the SDK. This translator is also responsible for declaring which Mosaico Ontology type corresponds to each schema.
Third, dynamic TopicWriter registration: the first time a given channel topic appears in the stream, a new TopicWriter is created and cached inside the SequenceWriter. Every subsequent message on that channel reuses the cached writer. The swriter.get_topic_writer() method is the cache lookup; swriter.topic_create() is the cache population.
Fourth, the push path: once a writer is obtained, the translated message is pushed through it to the daemon, exactly as in the serial guides. The only difference is that this happens inline within the single chronological scan of the file, interleaving pushes across multiple writers as messages are encountered.
Custom Translator and Type Mapping
The translator is the bridge between the external world and the Mosaico type system. External robotics schemas like sensor_msgs/msg/Imu have their own field naming conventions (linear_acceleration, angular_velocity) and timestamp representation (a header.stamp struct with separate seconds and nanoseconds fields). Mosaico's Ontology types have their own conventions (acceleration, angular_velocity as Vector3d; timestamps as a single nanosecond integer on the Message wrapper). The translator is where you perform this mapping explicitly and consciously.
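The timestamp part of this mapping is plain arithmetic. Assuming `Time(...).to_nanoseconds()` performs the usual `sec * 1_000_000_000 + nanosec` combination (an assumption; check the SDK reference), a stdlib equivalent looks like this. `stamp_to_ns` and `ns_to_stamp` are hypothetical helper names.

```python
NS_PER_SEC = 1_000_000_000


def stamp_to_ns(stamp: dict) -> int:
    """Collapse a ROS 2 style {'sec', 'nanosec'} stamp into one nanosecond integer."""
    sec, nanosec = stamp["sec"], stamp["nanosec"]
    if not 0 <= nanosec < NS_PER_SEC:
        raise ValueError(f"nanosec out of range: {nanosec}")
    return sec * NS_PER_SEC + nanosec


def ns_to_stamp(ts_ns: int) -> dict:
    """Inverse split, the same // and % arithmetic the sample generator uses."""
    return {"sec": ts_ns // NS_PER_SEC, "nanosec": ts_ns % NS_PER_SEC}


ts = stamp_to_ns({"sec": 1_700_000_000, "nanosec": 250_000_000})
print(ts)  # 1700000000250000000
```

The range check is a defensive choice: a `nanosec` value of one second or more would silently shift the timestamp instead of failing.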
This explicit translation step is not boilerplate to be minimized; it is an important design boundary. It separates the "understanding of external formats" from the "push to Mosaico" logic. If the source schema ever changes (a field rename, a new firmware version), you update the translator in one place. If Mosaico adds a new Ontology type, you add a branch to the translator. The rest of the ingestion pipeline stays unchanged.
The determine_mosaico_type() helper serves a complementary purpose: given a schema name, it returns the Mosaico Ontology class that should be used for the TopicWriter. This is needed at writer creation time, before any payload has been translated, so it is kept separate from the translation logic.
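One way to keep the type lookup and the translator from drifting apart is a single registry keyed by schema name. The sketch below is an alternative layout, not the SDK's API: `FakePressure` and `FakeTemperature` are hypothetical stand-ins for Ontology classes, and `determine_type` / `translate` play the roles of `determine_mosaico_type()` and `custom_translator()`.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional, Tuple


# Hypothetical stand-ins for Mosaico Ontology classes (illustration only).
@dataclass
class FakePressure:
    value: float


@dataclass
class FakeTemperature:
    value: float


# schema name -> (ontology class, function turning a decoded payload into it)
REGISTRY: Dict[str, Tuple[type, Callable[[dict], object]]] = {
    "sensor_msgs/msg/FluidPressure": (
        FakePressure,
        lambda p: FakePressure(value=p["fluid_pressure"]),
    ),
}


def determine_type(schema_name: str) -> Optional[type]:
    """Used at writer-creation time, before any payload is translated."""
    entry = REGISTRY.get(schema_name)
    return entry[0] if entry else None


def translate(schema_name: str, payload: dict):
    """Used per message; returns None for unsupported schemas."""
    entry = REGISTRY.get(schema_name)
    return entry[1](payload) if entry else None


# Supporting a new schema is one new entry; the ingestion loop is unchanged.
REGISTRY["sensor_msgs/msg/Temperature"] = (
    FakeTemperature,
    lambda p: FakeTemperature(value=p["temperature"]),
)

print(determine_type("sensor_msgs/msg/Temperature").__name__)  # FakeTemperature
print(translate("sensor_msgs/msg/FluidPressure", {"fluid_pressure": 101325.0}))
# FakePressure(value=101325.0)
```

The two lookups still stay separate functions, because the type is needed when the writer is created while the translation runs once per message.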
```python
from mosaicolabs.models import IMU, GPS, Pressure, Vector3d, GPSStatus, Time, Point3d
from mosaicolabs import Message


def custom_translator(schema_name: str, payload: dict):
    """Convert a decoded ROS 2 payload into a Mosaico Message, or None if unsupported."""
    if schema_name == "sensor_msgs/msg/Imu":
        # header.stamp is split into sec/nanosec; Message expects a single nanosecond integer.
        ts = Time(seconds=payload['header']['stamp']['sec'], nanoseconds=payload['header']['stamp']['nanosec']).to_nanoseconds()
        return Message(timestamp_ns=ts, data=IMU(
            acceleration=Vector3d(**payload['linear_acceleration']),
            angular_velocity=Vector3d(**payload['angular_velocity'])
        ))
    if schema_name == "sensor_msgs/msg/NavSatFix":
        ts = Time(seconds=payload['header']['stamp']['sec'], nanoseconds=payload['header']['stamp']['nanosec']).to_nanoseconds()
        return Message(timestamp_ns=ts, data=GPS(
            position=Point3d(x=payload['latitude'], y=payload['longitude'], z=payload['altitude']),
            status=GPSStatus(status=payload['status']['status'], service=payload['status']['service'])
        ))
    if schema_name == "sensor_msgs/msg/FluidPressure":
        ts = Time(seconds=payload['header']['stamp']['sec'], nanoseconds=payload['header']['stamp']['nanosec']).to_nanoseconds()
        return Message(timestamp_ns=ts, data=Pressure(value=payload['fluid_pressure']))
    # Unknown schema: the caller skips the message.
    return None


def determine_mosaico_type(schema_name: str):
    """Return the Ontology class for a schema name, or None if unsupported."""
    return {
        "sensor_msgs/msg/Imu": IMU,
        "sensor_msgs/msg/NavSatFix": GPS,
        "sensor_msgs/msg/FluidPressure": Pressure,
    }.get(schema_name)
```
Main Ingestion
The ingestion loop is where the lazy registration pattern comes to life. For each message from the MCAP reader, the first question is whether a TopicWriter already exists for its channel. swriter.get_topic_writer(channel.topic) checks the SequenceWriter's internal cache and returns the writer if it has been registered, or None if this is the first time the topic has been encountered. On first encounter, we call determine_mosaico_type() to learn which Ontology type the channel carries and then create the writer with swriter.topic_create(); if the schema is not supported, determine_mosaico_type() returns None and the message is skipped. On all subsequent encounters, we skip straight to the push.
The on_error=TopicLevelErrorPolicy.Finalize parameter is worth paying close attention to. In the serial guides, we caught push exceptions manually with try-except to prevent them from propagating to the sequence level. Here, TopicLevelErrorPolicy.Finalize provides an equivalent guarantee at the framework level: if a topic writer encounters an unrecoverable error, it closes itself gracefully (finalizes its stream) rather than raising an exception that could abort the entire sequence. Other topics continue ingesting normally. This is especially important for interleaved ingestion, where a bad message on the /sensors/imu topic appears in the same chronological stream as valid GPS and Pressure messages; you want those other messages to still reach the daemon even if the IMU stream has a problem.
The twriter.is_active check before each push guards against the case where a topic has already been finalized due to an earlier error. Attempting to push to a finalized writer would raise an error, so the check lets us skip those messages cleanly.
```python
import json

from mcap.reader import make_reader
from mosaicolabs import MosaicoClient, setup_sdk_logging, SessionLevelErrorPolicy, TopicLevelErrorPolicy


def deserialize_payload(data: bytes, schema_name: str) -> dict:
    """Decode a JSON-encoded payload; schema_name is unused for JSON but kept for other encodings."""
    try:
        return json.loads(data.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # An empty dict makes custom_translator raise KeyError inside the twriter block.
        return {}


def main():
    setup_sdk_logging(level="INFO", pretty=True)
    with open("mission_data.mcap", "rb") as f:
        reader = make_reader(f)
        with MosaicoClient.connect("localhost", 6726) as client:
            with client.sequence_create(
                sequence_name="mcap_ingestion",
                metadata={"mission": "alpha_test"},
                on_error=SessionLevelErrorPolicy.Delete
            ) as swriter:
                for schema, channel, message in reader.iter_messages():
                    # Cache lookup: None means this topic has not been seen yet.
                    twriter = swriter.get_topic_writer(channel.topic)
                    if twriter is None:
                        ontology_type = determine_mosaico_type(schema.name)
                        if ontology_type is None:
                            continue  # unsupported schema: skip the message
                        # Cache population: register the writer on first encounter.
                        twriter = swriter.topic_create(
                            topic_name=channel.topic,
                            metadata={},
                            ontology_type=ontology_type,
                            on_error=TopicLevelErrorPolicy.Finalize
                        )
                    # Skip topics that an earlier error has already finalized.
                    if twriter.is_active:
                        with twriter:
                            raw = deserialize_payload(message.data, schema.name)
                            msg = custom_translator(schema.name, raw)
                            if msg is None:
                                continue
                            twriter.push(message=msg)
    print("MCAP ingestion complete!")


if __name__ == "__main__":
    main()
```
swriter.get_topic_writer(topic) returns an existing writer or None if the topic has not been registered yet. This pattern avoids pre-declaring all topics when the set of channels is only known at read time. The SequenceWriter maintains the writer cache internally; you do not need to manage a dictionary yourself.
TopicLevelErrorPolicy.Finalize closes the affected topic writer on error, allowing the remaining topics in the sequence to continue ingesting. Without this policy, a single bad message could trigger the sequence-level error policy and abort the entire upload.
Because of this, every topic-related operation (deserializing, translating, pushing) is guarded by if twriter.is_active: once a writer has been finalized, further writes to it would raise runtime errors.
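To see why the guard matters, here is a toy model of the finalize-on-error behavior described above. `FinalizingWriter` is hypothetical and does not reproduce Mosaico's internals; it only mimics the documented outcome: the writer that hits an error stops accepting data, while the other topics keep ingesting.

```python
class FinalizingWriter:
    """Hypothetical writer mimicking a 'finalize on error' topic policy."""

    def __init__(self, topic: str):
        self.topic = topic
        self.is_active = True
        self.pushed = []

    def push(self, payload: dict) -> None:
        if not self.is_active:
            raise RuntimeError(f"{self.topic} is finalized")
        try:
            # Stand-in for translate + push; a missing key plays the role of bad data.
            self.pushed.append(payload["value"])
        except KeyError:
            self.is_active = False  # close this stream instead of propagating


writers = {t: FinalizingWriter(t) for t in ("/imu", "/gps")}
stream = [("/imu", {"value": 1}), ("/gps", {"value": 10}),
          ("/imu", {}),  # malformed IMU message
          ("/gps", {"value": 11}), ("/imu", {"value": 2})]

for topic, payload in stream:
    w = writers[topic]
    if w.is_active:  # same role as the twriter.is_active check in the ingestion loop
        w.push(payload)

print(writers["/imu"].pushed, writers["/imu"].is_active)  # [1] False
print(writers["/gps"].pushed, writers["/gps"].is_active)  # [10, 11] True
```

Without the `is_active` check, the last IMU message would hit the finalized writer and raise `RuntimeError`, which is the failure mode the guard avoids.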
Generate a Sample MCAP File
Use the script below to generate a synthetic mission_data.mcap file with interleaved IMU, GPS, and barometric pressure messages for testing. The script writes ten timestamped samples for each sensor, with all three sensors emitting a message at each sample time, mimicking a real multi-sensor recording where sensors are synchronized to a common clock.
```python
import json
import time

from mcap.writer import Writer


def generate_mission_mcap(output_path: str):
    with open(output_path, "wb") as f:
        writer = Writer(f)
        writer.start()
        # Placeholder JSON schemas: the ingestion code keys on the schema names only.
        imu_schema = writer.register_schema(name="sensor_msgs/msg/Imu", encoding="jsonschema", data=b"{}")
        gps_schema = writer.register_schema(name="sensor_msgs/msg/NavSatFix", encoding="jsonschema", data=b"{}")
        press_schema = writer.register_schema(name="sensor_msgs/msg/FluidPressure", encoding="jsonschema", data=b"{}")
        imu_chan = writer.register_channel(topic="/sensors/imu", message_encoding="json", schema_id=imu_schema)
        gps_chan = writer.register_channel(topic="/sensors/gps", message_encoding="json", schema_id=gps_schema)
        press_chan = writer.register_channel(topic="/sensors/baro", message_encoding="json", schema_id=press_schema)
        start_time_ns = time.time_ns()
        for i in range(10):
            t = start_time_ns + (i * 100_000_000)  # one sample every 100 ms
            sec, nanosec = t // 1_000_000_000, t % 1_000_000_000
            # All three sensors emit at the same instant t, so their messages interleave.
            writer.add_message(imu_chan, log_time=t, publish_time=t,
                               data=json.dumps({"header": {"stamp": {"sec": sec, "nanosec": nanosec}},
                                                "linear_acceleration": {"x": 0.01*i, "y": 0.02, "z": 9.81},
                                                "angular_velocity": {"x": 0.0, "y": 0.0, "z": 0.01}}).encode())
            writer.add_message(gps_chan, log_time=t, publish_time=t,
                               data=json.dumps({"header": {"stamp": {"sec": sec, "nanosec": nanosec}},
                                                "latitude": 25.04, "longitude": 121.53, "altitude": 10.5,
                                                "status": {"status": 1, "service": 1}}).encode())
            writer.add_message(press_chan, log_time=t, publish_time=t,
                               data=json.dumps({"header": {"stamp": {"sec": sec, "nanosec": nanosec}},
                                                "fluid_pressure": 101325.0 - (i * 10)}).encode())
        writer.finish()


if __name__ == "__main__":
    generate_mission_mcap("mission_data.mcap")
```
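To know what a successful ingestion of this file should contain, it helps to enumerate the messages the generator writes. This stdlib-only sketch mirrors the script's loop (ten samples, 100 ms apart, three topics per sample); `planned_messages` is a hypothetical helper, and the start time is fixed at 0 instead of `time.time_ns()` so the output is reproducible.

```python
from collections import Counter

PERIOD_NS = 100_000_000  # 100 ms between samples, as in the generator script
TOPICS = ("/sensors/imu", "/sensors/gps", "/sensors/baro")  # write order per sample


def planned_messages(start_ns: int, samples: int = 10):
    """List (log_time, topic) pairs in the order the generator writes them."""
    return [(start_ns + i * PERIOD_NS, topic)
            for i in range(samples) for topic in TOPICS]


plan = planned_messages(start_ns=0)
counts = Counter(topic for _, topic in plan)
print(len(plan))  # 30
print(counts["/sensors/imu"], counts["/sensors/gps"], counts["/sensors/baro"])  # 10 10 10
```

After ingestion, each of the three topics should therefore hold ten messages.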
Full Example
```python
import json

from mcap.reader import make_reader
from mosaicolabs import MosaicoClient, setup_sdk_logging, SessionLevelErrorPolicy, TopicLevelErrorPolicy
from mosaicolabs.models import IMU, GPS, Pressure, Vector3d, GPSStatus, Time, Point3d
from mosaicolabs import Message


def custom_translator(schema_name: str, payload: dict):
    """Convert a decoded ROS 2 payload into a Mosaico Message, or None if unsupported."""
    if schema_name == "sensor_msgs/msg/Imu":
        ts = Time(seconds=payload['header']['stamp']['sec'], nanoseconds=payload['header']['stamp']['nanosec']).to_nanoseconds()
        return Message(timestamp_ns=ts, data=IMU(
            acceleration=Vector3d(**payload['linear_acceleration']),
            angular_velocity=Vector3d(**payload['angular_velocity'])
        ))
    if schema_name == "sensor_msgs/msg/NavSatFix":
        ts = Time(seconds=payload['header']['stamp']['sec'], nanoseconds=payload['header']['stamp']['nanosec']).to_nanoseconds()
        return Message(timestamp_ns=ts, data=GPS(
            position=Point3d(x=payload['latitude'], y=payload['longitude'], z=payload['altitude']),
            status=GPSStatus(status=payload['status']['status'], service=payload['status']['service'])
        ))
    if schema_name == "sensor_msgs/msg/FluidPressure":
        ts = Time(seconds=payload['header']['stamp']['sec'], nanoseconds=payload['header']['stamp']['nanosec']).to_nanoseconds()
        return Message(timestamp_ns=ts, data=Pressure(value=payload['fluid_pressure']))
    return None


def determine_mosaico_type(schema_name: str):
    """Return the Ontology class for a schema name, or None if unsupported."""
    mapping = {
        "sensor_msgs/msg/Imu": IMU,
        "sensor_msgs/msg/NavSatFix": GPS,
        "sensor_msgs/msg/FluidPressure": Pressure,
    }
    return mapping.get(schema_name)


def deserialize_payload(data: bytes, schema_name: str) -> dict:
    """Decode a JSON-encoded payload; schema_name is unused for JSON."""
    try:
        return json.loads(data.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return {}


def main():
    setup_sdk_logging(level="INFO", pretty=True)
    with open("mission_data.mcap", "rb") as f:
        reader = make_reader(f)
        with MosaicoClient.connect("localhost", 6726) as client:
            with client.sequence_create(
                sequence_name="mcap_ingestion",
                metadata={"mission": "alpha_test"},
                on_error=SessionLevelErrorPolicy.Delete
            ) as swriter:
                for schema, channel, message in reader.iter_messages():
                    # Lazy registration: look up the writer, create it on first encounter.
                    twriter = swriter.get_topic_writer(channel.topic)
                    if twriter is None:
                        ontology_type = determine_mosaico_type(schema.name)
                        if ontology_type is None:
                            continue  # unsupported schema
                        twriter = swriter.topic_create(
                            topic_name=channel.topic,
                            metadata={},
                            ontology_type=ontology_type,
                            on_error=TopicLevelErrorPolicy.Finalize
                        )
                    # Skip topics already finalized by an earlier error.
                    if twriter.is_active:
                        with twriter:
                            raw = deserialize_payload(message.data, schema.name)
                            msg = custom_translator(schema.name, raw)
                            if msg is None:
                                continue
                            twriter.push(message=msg)
    print("MCAP ingestion complete!")


if __name__ == "__main__":
    main()
```