# Mosaico Doc

> The Data Platform for Robotics and Physical AI

This file contains all documentation content in a single document following the llmstxt.org standard.

## Overview

[**Mosaico Alchemy**](https://github.com/mosaico-labs/mosaico-alchemy) is a collection of ready-to-use data ingestion pipelines for Physical AI and Robotics. It is organized in **Packs**, where each Pack targets a specific domain (like [**Robotic Manipulation**](./packs/manipulation.mdx)) and translates heterogeneous dataset formats into the ontology used by the [**Mosaico SDK**](https://docs.mosaico.dev/python-sdk/).

## Why we built it

If you have ever tried to combine datasets from different sources, you already know how this goes. One dataset is a ROS `.bag` from a lab experiment. Another is an HDF5 archive from a simulation run. A third comes from a hardware vendor with a custom binary format and a PDF that vaguely describes the schema. They all contain robot data, but getting them into a shape where you can actually compare or join them is a project in itself.

Most of that work is not interesting. It is timestamp reconciliation, coordinate frame alignment, figuring out whether that `vel` field is in m/s or mm/s, and writing throwaway scripts you will never look at again.

The deeper point is that this problem is not really about file formats. It is about the fact that the same physical concept, a robot's pose, a joint angle, a camera frame, gets expressed differently by every team and every tool. Alchemy's goal is to show that it is possible to bring all of that under a single, coherent representation.

Once data from different sources speaks the same ontology, you can run the same queries against all of it, build datasets that span multiple collection pipelines, and stop worrying about the plumbing every time you add a new source.

Each Pack is also a concrete entry point into Mosaico's data pipeline.
If you are working with a known dataset format, like those covered in the Robotic Manipulation pack, you can use Alchemy to load it directly into the platform without writing any ingestion code yourself. From that point on, the full Mosaico SDK is available: query across sequences, stream specific topics, filter by sensor values, and feed data straight into your training pipeline.

---

## Manipulation

The **Manipulation Pack** is a curated collection of heterogeneous open-source robotic manipulation datasets, each individually studied, analyzed, and mapped into Mosaico's unified semantic ontology. Every dataset was manually inspected to identify its internal topics and source formats, from HDF5 to Parquet, from TensorFlow Records to ROS bags.

The result is a single, ready-to-use ingestion suite where a [`RobotJoint`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/sensors/#mosaicolabs.models.sensors.RobotJoint) topic originating from a ROS bag looks and behaves exactly like a [`RobotJoint`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/sensors/#mosaicolabs.models.sensors.RobotJoint) topic coming from a DeepMind TFRecord. This is the core value proposition: proving that Mosaico acts as the universal standard for semantic sensor data description across deeply fragmented ecosystems.

### Installation

Clone the [repository](https://github.com/mosaico-labs/mosaico-alchemy), then install and run the project with **Poetry**:

```bash
git clone git@github.com:mosaico-labs/mosaico-alchemy.git
cd mosaico-alchemy

# Install via Poetry
poetry install
eval $(poetry env activate)
```

*Note: Requires Python 3.11 or higher.*

To test the installation, run the following command from your terminal:

```bash
mosaico-alchemy manipulation --help
```

To start ingesting data, download the datasets from their repositories (see [Supported Datasets](#supported-datasets)), then run the `mosaico-alchemy manipulation` command followed by your dataset directories:

```bash
mosaico-alchemy manipulation --datasets /path/to/dataset
```

### Configuration Options

The CLI supports the following flags to control the execution environment:

| Option | Default | Description |
| :--- | :--- | :--- |
| `--datasets` | Required | One or more space-separated dataset roots to ingest. |
| `--host` | `localhost` | The hostname of your Mosaico Server. |
| `--port` | `6726` | The Flight port of your Mosaico Server. |
| `--log-level` | `INFO` | Set verbosity (`DEBUG`, `INFO`, `WARNING`, `ERROR`). |
| `--write-mode` | `sync` | Topic execution mode for file-backed data (`sync` or `async`). |

### Supported Datasets

We provide built-in support for multiple open-source formats. We recommend exploring them in the following order to understand the offered capabilities:

#### [Reassemble](https://researchdata.tuwien.ac.at/records/0ewrv-8cb44)

4,551 contact-rich assembly and disassembly demonstrations across 17 objects, with multimodal sensing from event cameras, force-torque sensors, microphones and multi-view RGB cameras.

:::info[Execution Backend]
File (HDF5)
:::

##### Topics Ingested into Mosaico

| Topic | Ontology Type | Description |
| :--- | :--- | :--- |
| `/capture_node-camera-image` | [`CompressedImage`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/sensors/#mosaicolabs.models.sensors.CompressedImage) | RGB video from the DAVIS346 event camera's integrated frame sensor, facing the workspace. |
| `/events` | [*`EventCamera`*](https://github.com/mosaico-labs/mosaico-alchemy/blob/main/src/mosaico_alchemy/manipulation/ontology/event_camera.py)\* | Asynchronous pixel-level brightness change events `[x, y, polarity]` from the event camera. |
| `/hama1` | [`CompressedImage`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/sensors/#mosaicolabs.models.sensors.CompressedImage) | RGB video from the first external camera observing the robot. |
| `/hama1_audio` | [*`AudioDataStamped`*](https://github.com/mosaico-labs/mosaico-alchemy/blob/main/src/mosaico_alchemy/manipulation/ontology/audio.py)\* | Audio stream from the microphone co-located with the first external camera. |
| `/hama2` | [`CompressedImage`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/sensors/#mosaicolabs.models.sensors.CompressedImage) | RGB video from the second external camera observing the robot. |
| `/hama2_audio` | [*`AudioDataStamped`*](https://github.com/mosaico-labs/mosaico-alchemy/blob/main/src/mosaico_alchemy/manipulation/ontology/audio.py)\* | Audio stream from the microphone co-located with the second external camera. |
| `/hand` | [`CompressedImage`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/sensors/#mosaicolabs.models.sensors.CompressedImage) | RGB video from the hand-mounted camera observing the worktable. |
| `/hand_audio` | [*`AudioDataStamped`*](https://github.com/mosaico-labs/mosaico-alchemy/blob/main/src/mosaico_alchemy/manipulation/ontology/audio.py)\* | Audio stream from the microphone co-located with the hand camera. |
| `/grasp_failure_label` | [*`SegmentInfo`*](https://github.com/mosaico-labs/mosaico-alchemy/blob/main/src/mosaico_alchemy/manipulation/ontology/segment_info.py)\* | Hierarchical temporal segmentation of the episode into labelled high-level and low-level action segments, each with start/end timestamps and a success flag. |
| `/robot_state/joint_state` | [`RobotJoint`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/sensors/#mosaicolabs.models.sensors.RobotJoint) | Position, velocity, and effort for each of the 7 robot arm joints. |
| `/robot_state/pose` | [`Pose`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/geometry/#mosaicolabs.models.data.geometry.Pose) | End-effector Cartesian pose `[x, y, z, qx, qy, qz, qw]` in world frame. |
| `/robot_state/velocity` | [`Velocity`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/geometry/#mosaicolabs.models.data.kinematics.Velocity) | End-effector Cartesian velocity `[vx, vy, vz, wx, wy, wz]` in m/s and rad/s. |
| `/robot_state/compensated_base_force_torque` | [`ForceTorque`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/geometry/#mosaicolabs.models.data.dynamics.ForceTorque) | 3D force and torque at the robot base, gravity-compensated. |
| `/robot_state/measured_force_torque` | [`ForceTorque`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/geometry/#mosaicolabs.models.data.dynamics.ForceTorque) | Raw 3D force and torque measured at the end-effector F/T sensor. |
| `/robot_state/end_effector` | [*`EndEffector`*](https://github.com/mosaico-labs/mosaico-alchemy/blob/main/src/mosaico_alchemy/manipulation/ontology/end_effector.py)\* | Position, velocity, and effort for each of the 2 gripper fingers. |


\**The custom ontology models defined within the pack module*

#### [RT-1 (Fractal)](https://www.tensorflow.org/datasets/catalog/fractal20220817_data)

87,212 pick-and-place episodes across 17 objects in Google micro kitchen environments, with RGB observations, natural language instructions, 512D task embeddings and full end-effector action space.

:::info[Execution Backend]
File (TFDS)
:::

##### Topics Ingested into Mosaico

| Topic | Ontology Type | Description |
| :--- | :--- | :--- |
| `step/observation/image` | [`CompressedImage`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/sensors/#mosaicolabs.models.sensors.CompressedImage) | RGB camera frame `(256×320×3)` from the robot's onboard camera. |
| `step/observation/base_pose_tool_reached` | [`Pose`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/geometry/#mosaicolabs.models.data.geometry.Pose) | Current end-effector pose `[x, y, z, qx, qy, qz, qw]` in base-relative frame. |
| `step/observation/orientation_start` | [`Quaternion`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/geometry/#mosaicolabs.models.data.geometry.Quaternion) | End-effector orientation quaternion at the start of the episode (`t=0`), used to normalise subsequent rotations. |
| `step/observation/src_rotation` | [`Quaternion`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/geometry/#mosaicolabs.models.data.geometry.Quaternion) | Reference orientation quaternion for the current phase of the task. |
| `step/observation/natural_language_instruction` | [`String`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/data_types/#mosaicolabs.models.data.base_types.String) | Free-text task instruction (e.g. *"pick rxbar chocolate from bottom drawer and place on counter"*). |
| `step/observation/natural_language_embedding` | [*`TextEmbedding`*](https://github.com/mosaico-labs/mosaico-alchemy/blob/main/src/mosaico_alchemy/manipulation/ontology/text_embedding.py)\* | 512-dimensional float embedding of the natural language instruction. |
| `step/observation/gripper_closed` | [`Floating32`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/data_types/#mosaicolabs.models.data.base_types.Floating32) | Binary-like indicator of whether the gripper is currently closed (`1`) or open (`0`). |
| `step/observation/gripper_closedness_commanded` | [`Floating32`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/data_types/#mosaicolabs.models.data.base_types.Floating32) | Previously commanded continuous gripper position `[0, 1]`; comparing with `gripper_closed` reveals execution error or physical resistance. |
| `step/observation/height_to_bottom` | [`Floating32`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/data_types/#mosaicolabs.models.data.base_types.Floating32) | Altitude of the end-effector above the ground plane, in metres. |
| `step/observation/rotation_delta_to_go` | [`Vector3d`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/geometry/#mosaicolabs.models.data.geometry.Vector3d) | Remaining rotational displacement `[roll, pitch, yaw]` from current orientation to target, in radians. |
| `step/observation/vector_to_go` | [`Vector3d`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/geometry/#mosaicolabs.models.data.geometry.Vector3d) | Displacement `[Δx, Δy, Δz]` from the current end-effector position to the target; used as a closed-loop control signal. |
| `step/observation/orientation_box` | [*`Vector3dBounds`*](https://github.com/mosaico-labs/mosaico-alchemy/blob/main/src/mosaico_alchemy/manipulation/ontology/spatial_bounds.py)\* | Min/max rotational bounds `(2×3)` defining the allowed orientation range for an object of interest, in radians. |
| `step/observation/robot_orientation_positions_box` | [*`Vector3dFrame`*](https://github.com/mosaico-labs/mosaico-alchemy/blob/main/src/mosaico_alchemy/manipulation/ontology/spatial_bounds.py)\* | `3×3` matrix describing the robot body's position and orientation in 3D space. |
| `step/observation/workspace_bounds` | [*`WorkspaceBounds`*](https://github.com/mosaico-labs/mosaico-alchemy/blob/main/src/mosaico_alchemy/manipulation/ontology/spatial_bounds.py)\* | `3×3` matrix defining the per-axis spatial limits within which the robot is authorised to operate. |
| `step/action/world_vector` | [`Vector3d`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/geometry/#mosaicolabs.models.data.geometry.Vector3d) | Commanded Cartesian displacement `[Δx, Δy, Δz]` of the end-effector in base-relative frame. |
| `step/action/rotation_delta` | [`Vector3d`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/geometry/#mosaicolabs.models.data.geometry.Vector3d) | Commanded orientation change `[roll, pitch, yaw]` in base-relative frame, in radians. |
| `step/action/base_displacement_vector` | [`Vector2d`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/geometry/#mosaicolabs.models.data.geometry.Vector2d) | Commanded robot base translation `[x, y]` on the horizontal plane, in metres. |
| `step/action/base_displacement_vertical_rotation` | [`Floating32`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/data_types/#mosaicolabs.models.data.base_types.Floating32) | Commanded robot base yaw rotation (rotation about the vertical Z axis), in radians. |
| `step/action/gripper_closedness_action` | [`Floating32`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/data_types/#mosaicolabs.models.data.base_types.Floating32) | Target gripper closure `[0, 1]` commanding how tightly to grasp an object. |
| `step/action/terminate_episode` | [*`TerminateEpisode`*](https://github.com/mosaico-labs/mosaico-alchemy/blob/main/src/mosaico_alchemy/manipulation/ontology/terminate_episode.py)\* | 3-element signal indicating whether the model believes the task is complete, ongoing, or in error. |
| `step/reward` | [`Floating32`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/data_types/#mosaicolabs.models.data.base_types.Floating32) | Scalar reward for the action taken at this step. |
| `step/is_first` | [`Boolean`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/data_types/#mosaicolabs.models.data.base_types.Boolean) | True for the initial step of an episode. |
| `step/is_last` | [`Boolean`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/data_types/#mosaicolabs.models.data.base_types.Boolean) | True for the final step of an episode. |
| `step/is_terminal` | [`Boolean`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/data_types/#mosaicolabs.models.data.base_types.Boolean) | True when the episode ends (by success or failure), triggering a reset of the expected value computation. |

\**The custom ontology models defined within the pack module*

#### [LeRobot DROID](https://huggingface.co/datasets/lerobot/droid_1.0.1)

95,658 manipulation episodes collected at 13 research institutions, with wrist and dual exterior stereo cameras, joint and Cartesian state, end-effector position and embedded camera extrinsics.

:::info[Execution Backend]
File (Parquet)
:::

##### Topics Ingested into Mosaico

| Topic | Ontology Type | Description |
| :--- | :--- | :--- |
| `/observation/images/wrist_left` | [`CompressedImage`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/sensors/#mosaicolabs.models.sensors.CompressedImage) | RGB video `(180×320×3)` from the wrist-mounted camera near the end-effector. |
| `/observation/images/exterior_1_left` | [`CompressedImage`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/sensors/#mosaicolabs.models.sensors.CompressedImage) | RGB video `(180×320×3)` from the first external scene camera. |
| `/observation/images/exterior_2_left` | [`CompressedImage`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/sensors/#mosaicolabs.models.sensors.CompressedImage) | RGB video `(180×320×3)` from the second external scene camera. |
| `/observation/state/joint_position` | [`RobotJoint`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/sensors/#mosaicolabs.models.sensors.RobotJoint) | Current positions of the 7 Franka arm joints, in radians. |
| `/observation/state/cartesian_position` | [`Pose`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/geometry/#mosaicolabs.models.data.geometry.Pose) | Current end-effector Cartesian pose `[x, y, z, roll, pitch, yaw]` in world frame. |
| `/observation/state/gripper_position` | [*`EndEffector`*](https://github.com/mosaico-labs/mosaico-alchemy/blob/main/src/mosaico_alchemy/manipulation/ontology/end_effector.py)\* | Current gripper opening/closing state as a scalar. |
| `/action/joint_position` | [`RobotJoint`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/sensors/#mosaicolabs.models.sensors.RobotJoint) | Target positions commanded to the 7 Franka arm joints. |
| `/action/cartesian_position` | [`Pose`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/geometry/#mosaicolabs.models.data.geometry.Pose) | Target end-effector Cartesian pose `[x, y, z, roll, pitch, yaw]` commanded at this step. |
| `/action/cartesian_velocity` | [`Velocity`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/geometry/#mosaicolabs.models.data.kinematics.Velocity) | Target end-effector Cartesian velocity `[vx, vy, vz, vroll, vpitch, vyaw]` commanded at this step. |
| `/action/gripper_position` | [*`EndEffector`*](https://github.com/mosaico-labs/mosaico-alchemy/blob/main/src/mosaico_alchemy/manipulation/ontology/end_effector.py)\* | Target gripper position commanded at this step. |
| `/camera_extrinsics/wrist_left` | [`Pose`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/geometry/#mosaicolabs.models.data.geometry.Pose) | Extrinsic pose `[x, y, z, roll, pitch, yaw]` of the wrist camera relative to the robot. |
| `/camera_extrinsics/exterior_1_left` | [`Pose`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/geometry/#mosaicolabs.models.data.geometry.Pose) | Extrinsic pose `[x, y, z, roll, pitch, yaw]` of the first external camera. |
| `/camera_extrinsics/exterior_2_left` | [`Pose`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/geometry/#mosaicolabs.models.data.geometry.Pose) | Extrinsic pose `[x, y, z, roll, pitch, yaw]` of the second external camera. |
| `/step/reward` | [`Floating64`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/data_types/#mosaicolabs.models.data.base_types.Floating64) | Scalar reward for the action taken at this step, ranging from 0 to 1. |
| `/step/discount` | [`Floating64`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/data_types/#mosaicolabs.models.data.base_types.Floating64) | Discount factor applied to future rewards during training. |
| `/step/task_index` | [`Integer64`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/data_types/#mosaicolabs.models.data.base_types.Integer64) | Numeric index identifying the task type for this episode. |
| `/step/frame_index` | [`Integer64`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/data_types/#mosaicolabs.models.data.base_types.Integer64) | Positional index of this step within its episode. |
| `/step/is_first` | [`Boolean`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/data_types/#mosaicolabs.models.data.base_types.Boolean) | True for the initial step of an episode. |
| `/step/is_last` | [`Boolean`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/data_types/#mosaicolabs.models.data.base_types.Boolean) | True for the final stored step of an episode. |
| `/step/is_terminal` | [`Boolean`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/data_types/#mosaicolabs.models.data.base_types.Boolean) | True when the episode ends by reaching a terminal state. |

\**The custom ontology models defined within the pack module*

#### [Multimodal Manipulation Learning](https://zenodo.org/records/6372438)

300 ROS bag recordings of a KUKA iiwa robot with an Allegro hand, combining Tekscan tactile pressure, microphone audio, torque commands and joint states across 5 material classes.

:::info[Execution Backend]
ROS `.bag`
:::

##### Topics Ingested into Mosaico

| Topic | Ontology Type | Description |
| :--- | :--- | :--- |
| `/allegro_hand_right/joint_states` | [`RobotJoint`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/sensors/#mosaicolabs.models.sensors.RobotJoint) | Position, velocity, and effort for the 16 joints of the Allegro dexterous hand. |
| `/iiwa/joint_states` | [`RobotJoint`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/sensors/#mosaicolabs.models.sensors.RobotJoint) | Position, velocity, and effort for the 7 joints of the KUKA iiwa arm. |
| `/iiwa/eePose` | [`Pose`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/geometry/#mosaicolabs.models.data.geometry.Pose) | End-effector Cartesian pose `[x, y, z, qx, qy, qz, qw]` in world frame. |
| `/iiwa/TorqueController/command` | [*`JointTorqueCommand`*](https://github.com/mosaico-labs/mosaico-alchemy/blob/main/src/mosaico_alchemy/manipulation/ontology/joint_torque_command.py)\* | Torque-space control command sent to the 7 iiwa joints at each control cycle, in Nm. |
| `/tekscan/frame` | [*`TekscanSensor`*](https://github.com/mosaico-labs/mosaico-alchemy/blob/main/src/mosaico_alchemy/manipulation/ontology/tekscan_sensor.py)\* | Tactile pressure frame from the Tekscan sensor, represented as a 2944-element array interpretable as a `46×64` pressure matrix. |
| `/audio/audio` | [*`AudioDataStamped`*](https://github.com/mosaico-labs/mosaico-alchemy/blob/main/src/mosaico_alchemy/manipulation/ontology/audio.py)\* | Raw mono audio chunks captured during the manipulation trial. |
| `/audio/audio_info` | [*`AudioInfo`*](https://github.com/mosaico-labs/mosaico-alchemy/blob/main/src/mosaico_alchemy/manipulation/ontology/audio.py)\* | Audio stream metadata: sample rate (16 kHz), format (S16LE), and codec (MP3). |
| `/trialInfo` | [`String`](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/data_types/#mosaicolabs.models.data.base_types.String) | Trial metadata and discrete event markers: the first message contains a YAML block with motion parameters and controller configuration; subsequent messages mark events such as shake start or stop. |

\**The custom ontology models defined within the pack module*

### A Note about the Custom Ontology Models

Some datasets include application-specific data types that do not map cleanly to Mosaico's built-in ontology models; for example, the `TerminateEpisode` model is highly specific to the data format defined in the Fractal dataset. Defining custom ontology models lets you represent these data structures precisely, with typed fields, validation rules, and consistent semantics for your own sensors or proprietary message formats.
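
To make "typed fields and validation rules" concrete, here is a minimal sketch that uses a plain standard-library dataclass as a stand-in. It is not the Mosaico SDK base class and not the pack's actual `TerminateEpisode` model: the class name, the `values` field, and the integer element type are assumptions made for illustration only.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TerminateEpisodeSketch:
    """Stand-in for a custom model: one typed field plus a validation rule."""

    # Hypothetical field name; the documentation only states that the
    # signal has 3 elements.
    values: tuple[int, int, int]

    def __post_init__(self) -> None:
        # Validation rule: reject payloads that do not have exactly 3 integers.
        if len(self.values) != 3:
            raise ValueError(f"expected 3 elements, got {len(self.values)}")
        if not all(isinstance(v, int) for v in self.values):
            raise TypeError("all elements must be integers")

    @classmethod
    def from_raw(cls, raw) -> "TerminateEpisodeSketch":
        """Build the model from any raw sequence (list, tuple, array row)."""
        return cls(values=tuple(int(v) for v in raw))
```

A real custom model would instead follow the SDK's ontology customization path, so that it is registered with the platform rather than living as a standalone class.
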
Mosaico provides a [fast path](https://docs.mosaico.dev/python-sdk/SDK/ontology/#customizing-the-ontology) to define and automatically register custom data models. Once registered, they behave like native models and are accepted by the platform without extra integration work.

## Customizing the Pack

If your robotic data is saved in a proprietary format that is not supported out of the box, you can extend the module. This section explains how to extend the **Manipulation Pack** to support your own custom dataset formats alongside the built-in ones.

By following this guide, you will:

* **Select an Execution Backend**: Decide whether to rely on the native ROS bag backend or to build a file-backed descriptor pipeline.
* **Define Ontologies and Adapters**: Choose the Mosaico types that model your sensor streams and implement the adapters that translate raw dictionaries into them.
* **Implement the Dataset Plugin**: Assemble the ingestion plan that wires your iterators, ontologies, and adapters into a declarative sequence descriptor.
* **Register Your Components**: Expose your plugin and adapters to the CLI layer so they are discovered at runtime.

:::tip[Learn from Mosaico]
* **[Documentation: Data Models & Ontology](https://docs.mosaico.dev/python-sdk/SDK/ontology)**
* **[Example: Customizing the Data Ontology](https://docs.mosaico.dev/examples/custom_ontology)**
* **[API Reference: Base Models and Mixins](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/base)**
* **[Documentation: The ROS Bridge](https://docs.mosaico.dev/python-sdk/SDK/bridges/ros)**
:::

### Choose the Execution Backend

Your first architectural decision is how the data should be accessed. If your dataset relies on standard file and database formats such as HDF5, Parquet, JSON, or custom binary files, use the file-backed executor by generating a `SequenceDescriptor`.
Alternatively, if your dataset consists of native ROS bags and maps naturally to ROS topics, use a `RosbagSequenceDescriptor`. This lets the plugin validate the required topics and then delegate the ingestion itself to the ROS bridge.

### Apply Ontologies and Implement Adapters

For each data stream, you define how it maps to a Mosaico ontology type. If you cannot reuse an existing SDK ontology, you can define your own custom class. We highly recommend reviewing the **[Ontology Customization Guide](https://docs.mosaico.dev/examples/custom_ontology)** to see exactly how to write and register your own custom data models.

#### The Custom Adapter

With your ontologies defining the target structure, you implement a custom adapter.

```python
from mosaicolabs import Message
from mosaicolabs.packs.manipulation.adapters.base import BaseAdapter

# `CustomSensorModel` is your own ontology model, defined and registered
# as described in the Ontology Customization Guide.


# Adapters are generic on the ontology type they produce.
# Specialising BaseAdapter[CustomSensorModel] binds the translate() return type
# and restricts this adapter to messages carrying CustomSensorModel payloads only.
class MyDatasetCustomAdapter(BaseAdapter[CustomSensorModel]):
    # The global identifier that will be referenced by name in your `TopicDescriptor`.
    adapter_id = "mydataset.custom_sensor"

    # Define the fields required in the payload, to be validated
    # in the `translate` function.
    _REQUIRED_KEYS: tuple[str, ...] = ("timestamp", "sensor_readings")

    # The translator factory: an adapter receives the raw payload dictionary
    # and returns an instantiated Mosaico `Message`.
    @classmethod
    def translate(cls, payload: dict) -> Message:
        """Translates a raw custom dictionary into a Mosaico Message container."""
        # Guard against malformed payloads before any field access:
        # sensor_readings must be present and contain exactly 4 elements.
        cls._validate_payload(
            payload=payload,
            constraints={"sensor_readings": {"len": 4}},
        )
        return Message(
            # Convert float seconds into Mosaico's native nanosecond timestamps.
            # round() avoids losing a nanosecond to floating-point truncation.
            timestamp_ns=round(payload["timestamp"] * 1e9),
            data=CustomSensorModel(values=payload["sensor_readings"]),
        )
```

### Develop the Dataset Plugin

With adapters in place, you implement the dataset plugin class. This class satisfies a straightforward internal protocol to detect your dataset format, discover its logical sequences, and assemble the ingestion plan that wires everything together.

#### The Plugin Protocol

```python
from pathlib import Path
from typing import Iterable, Protocol


# `IngestionDescriptor` is the descriptor type returned by an ingestion plan.
class DatasetPlugin(Protocol):
    # A unique string identifier used in downstream logging and operator prompts.
    dataset_id: str

    # A method to verify if a given folder matches the dataset signature
    # (e.g., checking for `*.h5` files) avoiding expensive full-dataset scans.
    def supports(self, root: Path) -> bool: ...

    # A method to discover the logical sequences contained in the root folder,
    # such as individual robot episodes.
    def discover_sequences(self, root: Path) -> Iterable[Path]: ...

    # The core logic to create an ingestion plan for each sequence,
    # returning an `IngestionDescriptor`.
    def create_ingestion_plan(self, sequence_path: Path) -> IngestionDescriptor: ...
```

#### Extracting Raw Data

To keep your plugin code clean and maintainable, raw file I/O operations must be completely separated from the orchestration step. You should create dedicated iterator functions following the **factory pattern**: each function accepts static configuration parameters (file paths, field names) and returns a `Callable[[Path], Iterable[dict]]`. The runner calls that callable later with the actual sequence path to stream the raw payloads. Each payload dict must include a `"timestamp"` key expressed in seconds as a float.
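
As a concrete instance of this factory pattern, the sketch below streams rows from a CSV file using only the standard library. The file layout, the column names, and the helper names `iter_csv_rows` and `count_csv_rows` are illustrative assumptions, not part of the pack. The hypothetical source stores milliseconds, so the iterator converts them to float seconds as the payload contract requires.

```python
import csv
from pathlib import Path
from typing import Callable, Iterable


def iter_csv_rows(
    csv_name: str, time_column: str, value_columns: tuple[str, ...]
) -> Callable[[Path], Iterable[dict]]:
    """Factory: bind static configuration now, read the file later."""

    def _fn(sequence_path: Path) -> Iterable[dict]:
        # The runner supplies sequence_path; nothing is opened before this call.
        with (sequence_path / csv_name).open(newline="") as handle:
            for row in csv.DictReader(handle):
                yield {
                    # Hypothetical source unit is milliseconds; payloads use seconds.
                    "timestamp": float(row[time_column]) / 1000.0,
                    "sensor_readings": [float(row[name]) for name in value_columns],
                }

    return _fn


def count_csv_rows(csv_name: str) -> Callable[[Path], int]:
    """Factory for the matching message counter (the header line is excluded)."""

    def _fn(sequence_path: Path) -> int:
        with (sequence_path / csv_name).open(newline="") as handle:
            return sum(1 for _ in csv.DictReader(handle))

    return _fn
```

Because both factories only capture configuration, the same pair can be reused for every sequence folder the plugin discovers.
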
The skeleton below shows the shape of such a factory for a video stream, paired with a counting factory used for progress reporting:

```python
from pathlib import Path
from typing import Callable, Iterable


def iter_video_frames(video_path: str, timestamps_path: str) -> Callable[[Path], Iterable[dict]]:
    def _fn(sequence_path: Path) -> Iterable[dict]:
        # Open sequence_path and read the data at video_path / timestamps_path.
        # Placeholder payload: a real iterator yields one dict per frame.
        yield {"timestamp": 1234.56, "image": b"..."}

    return _fn


def count_video_frames(timestamps_path: str) -> Callable[[Path], int]:
    def _fn(sequence_path: Path) -> int:
        # Count the number of frames at timestamps_path. Placeholder value:
        # it matches the single payload yielded by the iterator above.
        total_frames = 1
        return total_frames

    return _fn
```

#### Structuring the Ingestion Plan

The plugin implements `create_ingestion_plan` to declare the sequence name, its metadata, and the full list of topics. Each topic references the iterators and adapters you built in the previous steps.

```python
# Inside your plugin class:
def create_ingestion_plan(self, sequence_path: Path) -> IngestionDescriptor:
    return SequenceDescriptor(
        # The unique name of the target sequence being constructed.
        sequence_name=f"{self.dataset_id}_{sequence_path.stem}",
        # The custom metadata of this sequence.
        sequence_metadata={
            "dataset_id": self.dataset_id,
            "ingestion_backend": "file",
        },
        topics=[
            TopicDescriptor(
                # The topic name that will handle this data stream.
                topic_name="/camera/front",
                # The Mosaico ontology type that models this data stream.
                ontology_type=CompressedImage,
                # The identifier of the adapter responsible for translating this specific topic.
                adapter_id=f"{self.dataset_id}.video_frame",
                # The runner calls this callable later with the sequence path
                # to obtain the raw payload stream.
                payload_iter=iter_video_frames("path/to/video", "path/to/timestamps"),
                # The runner uses this to count the total messages ahead of time for progress
                # reporting. It must match the number of items `payload_iter` will yield.
                message_count=count_video_frames("path/to/timestamps"),
            ),
        ],
    )
```

### Register Your Components

Once your dataset plugin, functional iterators, and adapters are implemented, they must be made discoverable to the CLI layer. There are two separate registries to update: one for the dataset plugin and one for the adapter.

#### Registering the Dataset Plugin

```python
from mosaico_alchemy.manipulation.datasets import DatasetRegistry

# DatasetRegistry is a singleton pre-populated with the built-in plugins.
# Registering here makes your plugin visible everywhere the registry is used.
registry = DatasetRegistry()
registry.register(MyDatasetPlugin())
```

Importing `DatasetRegistry` triggers the default registry setup automatically, so the instance returned by `DatasetRegistry()` already contains all built-in plugins. Calling `register` on it adds your plugin to the same shared instance, making it available across both the CLI and the runner without any further wiring:

- **CLI**: your dataset appears alongside the built-in options in the interactive selection prompt.
- **Runner**: auto-detection via `registry.resolve(root)` will call your plugin's `supports` method against the dataset root.

#### Registering the Adapter

The adapter must also be registered in the `AdapterRegistry`. Without this step, the file executor cannot resolve the `adapter_id` declared in your `TopicDescriptor`, and the ingestion will fail.

```python
from mosaico_alchemy.manipulation.adapters import AdapterRegistry

# AdapterRegistry is a singleton pre-populated with the built-in adapters.
# Registering here makes your adapter visible everywhere the registry is used.
registry = AdapterRegistry()
registry.register(MyDatasetCustomAdapter)
```

After registering both components, your dataset plugin is ready to be executed.

---

## C++

The C++ SDK is currently in active design. We are building it to match the capabilities of the Python SDK, with a focus on low-latency, embedded, and resource-constrained deployments where a native C++ client is required.

If you have a specific application or integration that depends on a C++ client, we would like to hear from you. Knowing your use case helps us prioritize the API surface and platform targets that matter most.
Reach out directly at [hello@mosaico.dev](mailto:hello@mosaico.dev). --- ## Python For detailed implementation guides and class references, see the [Mosaico Python SDK Documentation](https://docs.mosaico.dev/python-sdk/). --- ## Rust The Rust SDK is currently in active design. We are building it to match the capabilities of the Python SDK, with a focus on performance-critical systems and environments where Rust's safety guarantees are a hard requirement. If you have a specific application or integration that depends on a Rust client, we would like to hear from you. Knowing your use case helps us prioritize the API surface and platform targets that matter most. Reach out directly at [hello@mosaico.dev](mailto:hello@mosaico.dev). --- ## Actions Mosaico implements its own administrative protocols directly on top of Apache Arrow Flight. Rather than relying on a separate control channel abstraction, Mosaico leverages the Flight `DoAction` RPC mechanism to handle discrete lifecycle events, administrative interfaces, and resource management. Unlike streaming endpoints designed for continuous data throughput, these custom actions manage the platform's overarching state. While individual calls are synchronous, they often initiate or conclude multi-step processes, such as topic upload, that govern the long-term integrity of data within the platform. All custom actions follow a standardized pattern: they expect a JSON-serialized payload defining the request parameters and return a JSON-serialized response containing the result. ## Sequence Management Sequences are the fundamental containers for data recordings in Mosaico. These custom actions enforce a strict lifecycle state machine to guarantee data integrity. | Action | Description | Permission | | --- | ---- | --- | | `sequence_create` | Initializes a new, empty sequence. | `write` | | `sequence_delete` | Permanently removes a sequence from the platform. 
| `delete` |

## Topic Management

Topics represent the individual sensor streams (e.g., `camera/front`, `gps`) contained within a sequence.

| Action | Description | Permission |
| --- | --- | --- |
| `topic_create` | Registers a new topic. | `write` |
| `topic_delete` | Removes a specific topic from a sequence. | `delete` |

## Session Management

Data is uploaded to the platform through sessions. A session can contain one or more topics. Once finalized, a session becomes immutable.

| Action | Description | Permission |
| --- | --- | --- |
| `session_create` | Starts a new upload session. | `write` |
| `session_finalize` | Moves the session status from *uploading* to *archived*. This action locks the session, marking it as immutable. Once finalized, no further data can be added or modified. | `write` |
| `session_delete` | Removes a specific session and all its data. | `delete` |

## Notification System

The platform includes a tagging mechanism to attach alerts or informational messages to resources. For example, if an exception is raised during an upload, the notification system automatically registers the event, ensuring the failure is logged and visible for troubleshooting.

| Action | Description | Permission |
| --- | --- | --- |
| `*_notification_create` | Attaches a notification to a Sequence or Topic, such as logging an error or status update. | `write` |
| `*_notification_list` | Retrieves the history of active notifications for a resource, allowing clients to review alerts. | `read` |
| `*_notification_purge` | Clears the notification history for a resource, useful for cleanup after resolution. | `delete` |

Here, `*` can be either `sequence` or `topic`.
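
The naming rule and the JSON payload pattern described above can be sketched in a few lines of Python. This is only an illustration: the helper name `notification_action` and the `locator` payload key are hypothetical, since the request fields are not listed here. The resulting pair is what a Flight client would wrap in a `DoAction` request.

```python
import json

# Resource kinds and operations, with the permission each operation requires,
# taken from the notification table above.
RESOURCE_KINDS = ("sequence", "topic")
NOTIFICATION_OPS = {"create": "write", "list": "read", "purge": "delete"}


def notification_action(kind: str, op: str, payload: dict) -> tuple[str, bytes]:
    """Build the (action name, JSON body) pair for a notification action.

    The payload keys are hypothetical; only the naming scheme and the
    JSON-in / JSON-out convention come from the documentation.
    """
    if kind not in RESOURCE_KINDS:
        raise ValueError(f"unknown resource kind: {kind!r}")
    if op not in NOTIFICATION_OPS:
        raise ValueError(f"unknown notification operation: {op!r}")
    name = f"{kind}_notification_{op}"  # expands the `*` placeholder
    body = json.dumps(payload).encode("utf-8")
    return name, body


# Hypothetical payload: the `locator` key is an assumption for illustration.
name, body = notification_action("topic", "list", {"locator": "run_2023_01/sensors/lidar_front"})
print(name, "requires", NOTIFICATION_OPS["list"])
```

Keeping the action name and the serialized body separate mirrors the two parts of a Flight action (a type string and an opaque byte body), so the same helper works regardless of which client library sends the request.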
## Query | Action | Description | Permission | | --- | --- | --- | | `query` | This action serves as the gateway to the query system. It accepts a complex filter object and returns a list of resources that match the criteria. | `read` | ## Misc | Action | Description | Permission | | --- | --- | --- | | `version` | Retrieves the current daemon version. | `read` | --- ## API keys In Mosaico, access control is managed through **API keys**. An API key securely binds a unique token to a specific set of permissions. :::note It is important to note that API keys in Mosaico act as a *loose, coarse-grained access mechanism*. They are designed strictly to limit the *types of operations* a user can perform across the platform as a whole. This mechanism *does not support fine-grained policies*. You cannot use an API key to restrict access to specific resources, such as individual topics or sequences. For example, if an API key is granted the `read` permission, the client is allowed to read data globally across the entire platform, rather than being restricted to a single topic or a specific subset of data. ::: ## Properties Each API key in Mosaico acts as a single access control rule and consists of the following properties: | Property | Status | Description | | --- | --- |-----------------------------------------------------------------------------------------| | **Token** | Required | The unique token provided by the client to authenticate with the Mosaico platform. | | **Permissions** | Required | Which privileges (e.g., `read`, `write`) are granted through the API key. | | **Description** | Required | A human-readable text string explaining the specific purpose or use case of the policy. | | **Creation Time** | Auto-generated | The exact timestamp when the API key and its associated policy were generated. | | **Expiration Time** | Optional | A predetermined date and time after which the API key automatically becomes invalid. 
|

## Token Structure

Mosaico API key tokens follow a strict three-part format separated by underscores (`_`):

```text
[HEADER]_[PAYLOAD]_[FINGERPRINT]
```

**Example**

```text
msco_vrfeceju4lqivysxgaseefa3tsxs0vrl_1b676530
```

**Header**. A fixed prefix that identifies the token as a Mosaico API key. This helps developers quickly spot Mosaico tokens in configuration files or environment variables.

**Payload**. The actual secret token. It is a 32-byte string consisting entirely of lowercase alphanumeric ASCII characters. **This payload must be kept secret and should never be exposed in client-side code.**

**Fingerprint**. A hash of the payload. Mosaico uses the fingerprint to identify the key within the system without needing to handle or log the raw secret payload. It is primarily used for administrative actions, such as checking the key's status or revoking it.

## Available Permissions

Permissions dictate the exact global operations an API key can execute.

| Permission | Action Allowed | Typical Use Case |
| --- | --- | --- |
| `read` | Retrieve and view data from anywhere on the Mosaico platform. | Front-end dashboards, data analytics integrations, or public-facing read-only APIs. |
| `write` | Create new data or update existing data anywhere on the platform. | Ingestion scripts, user input forms, or webhooks pushing data to Mosaico. |
| `delete` | Permanently remove data from the platform. | Data lifecycle management, GDPR compliance scripts, or cleanup tasks. |
| `manage` | Perform administrative operations on the platform. | Rotating/revoking API keys, managing users, or running automated maintenance tasks. |

Permissions are hierarchical: each permission automatically inherits all the privileges of the previous one (e.g., `write` also grants `read`, and `manage` inherits `read`, `write`, and `delete`).
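
The inheritance rule can be modeled as an ordered enumeration, where a key may call an action if its level is at least the level the action requires. The sketch below is an illustration of that rule, not the daemon's implementation; the `Permission` enum, the `REQUIRED` table, and `is_allowed` are hypothetical names, and the sketch assumes each key carries a single permission level. The action-to-permission mapping is copied from the action tables in this document.

```python
from enum import IntEnum


class Permission(IntEnum):
    """Permissions ordered so that a higher value includes every lower one."""
    READ = 1
    WRITE = 2
    DELETE = 3
    MANAGE = 4


# Required permission per action, as listed in the action tables.
REQUIRED = {
    "sequence_create": Permission.WRITE,
    "sequence_delete": Permission.DELETE,
    "topic_create": Permission.WRITE,
    "topic_delete": Permission.DELETE,
    "session_create": Permission.WRITE,
    "session_finalize": Permission.WRITE,
    "session_delete": Permission.DELETE,
    "query": Permission.READ,
    "version": Permission.READ,
}


def is_allowed(granted: Permission, action: str) -> bool:
    """Return True if a key holding `granted` may call `action`."""
    return granted >= REQUIRED[action]


# `write` inherits `read`, so a write key can query.
print(is_allowed(Permission.WRITE, "query"))
# `write` does not include `delete`, so it cannot remove a sequence.
print(is_allowed(Permission.WRITE, "sequence_delete"))
```

Because the check is a single comparison, it also makes the coarse-grained nature of the mechanism visible: nothing in it refers to a specific sequence or topic.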
--- ## Background Cleanup Routine To ensure high performance and low latency during user operations, Mosaico does not perform physical deletions in object storage in real-time. When a **Sequence** or **Topic** is deleted via the [Management Actions](actions.md#sequence-management), the database record is removed immediately, but the associated storage files remain. The **Cleanup Routine** is a background process responsible for identifying and permanently purging these orphaned files. ## Deletion Lifecycle The cleanup process operates in two distinct phases to prevent accidental data loss and optimize system resources: * **Marking (Soft Deletion):** The routine identifies folders in object storage that no longer have a corresponding entry in the database. These folders are marked for deletion by creating a `TO_DELETE` marker file inside the directory. * **Purging (Permanent Deletion):** Once a folder has been marked and the configured retention period has elapsed, the routine permanently removes the folder and all its contents from the object store. ## Configuration The routine behavior can be configured via the following [environment variables](env.md#cleanup-routine): | Variable | Description | | :--- |:--------------------------------------------------------------------------------------------| | `MOSAICOD_CLEANUP_TIME_INTERVAL` | The frequency at which the routine checks whether it should run or not (e.g., `1h`, `24h`). | | `MOSAICOD_CLEANUP_RETENTION_DURATION` | The minimum age of a TO_DELETE marker before the folder is eligible for permanent removal. | ## Distributed Coordination In a multi-instance environment, every `mosaicod` daemon runs its own background cleanup routine. To prevent race conditions and redundant resource consumption, the instances coordinate via the database: * **Concurrency Control:** A centralized log history in the database tracks active cleanup sessions. 
If one instance is already performing a cleanup, other instances will remain idle.
* **Execution Logic:** When an instance wakes up, it checks the timestamp of the last successful cleanup. A new cycle begins only if:
    1. No other cleanup is currently "In Progress."
    2. The time elapsed since the last completion exceeds the `MOSAICOD_CLEANUP_TIME_INTERVAL`.

:::note
This cooperative model ensures that even with dozens of daemon instances, the object storage is never overwhelmed by simultaneous deletion requests.
:::

---

## CLI Reference

## mosaicod run

Start the server locally.

```bash
mosaicod run [OPTIONS]
```

### Options

| Option | Default | Description |
| :--- | --- | :--- |
| `--host ` | `127.0.0.1` | Specify a host address. |
| `--port ` | `6726` | Port to listen on. |
| `--tls` | `false` | Enable TLS. When enabled, the following environment variables must be set: `MOSAICOD_TLS_CERT_FILE` and `MOSAICOD_TLS_PRIVATE_KEY_FILE`. |
| `--api-key` | `false` | Require API keys to operate. When enabled, the system requires an API key to perform any action. |

## mosaicod api-key

Manage API keys.

### create

Create a new API key.

```bash
mosaicod api-key create --permissions [read|write|delete|manage] [OPTIONS]
```

| Option | Default | Description |
| :--- | --- | :--- |
| `-d, --description` | | Set a description for the API key to make it easily recognizable. |
| `--expires-in ` | | Define a time duration, using the ISO 8601 format, after which the key is no longer valid (e.g., `P1Y2M3D` for 1 year, 2 months, and 3 days). |
| `--expires-at ` | | Define a datetime, using the RFC 3339 format, after which the key is no longer valid (e.g., `2026-03-27T12:20:00Z`). |

### revoke

Revoke an existing API key.

```bash
mosaicod api-key revoke
```

The [fingerprint](api_key.md#token-structure) is the last 8 characters of the API key.

### status

Check the status of an API key.

```bash
mosaicod api-key status
```

The [fingerprint](api_key.md#token-structure) is the last 8 characters of the API key.
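
Since `revoke` and `status` work on the fingerprint rather than the full token, it can be handy to extract it programmatically and to log only a redacted form of the key. The following is a minimal sketch based on the documented three-part token format; the function names are hypothetical, and the `msco` header and the fingerprint alphabet are assumptions inferred from the single example token.

```python
import re

# Pattern inferred from the documented example token. The fixed `msco` header
# and the 8-character lowercase alphanumeric fingerprint are assumptions.
TOKEN_RE = re.compile(r"^(msco)_([a-z0-9]{32})_([a-z0-9]{8})$")


def parse_token(token: str) -> tuple[str, str, str]:
    """Split a token into (header, payload, fingerprint) or raise ValueError."""
    match = TOKEN_RE.match(token)
    if match is None:
        raise ValueError("not a well-formed Mosaico API key token")
    header, payload, fingerprint = match.groups()
    return header, payload, fingerprint


def fingerprint_of(token: str) -> str:
    """Return the fingerprint, i.e. the last 8 characters of the token."""
    return parse_token(token)[2]


def redact(token: str) -> str:
    """Return a log-safe form of the token that hides the secret payload."""
    header, _payload, fingerprint = parse_token(token)
    return f"{header}_{'*' * 8}_{fingerprint}"


# Example token taken from the Token Structure section.
example = "msco_vrfeceju4lqivysxgaseefa3tsxs0vrl_1b676530"
print(fingerprint_of(example))
print(redact(example))
```

Validating the whole token before slicing avoids passing an arbitrary 8-character suffix to an administrative command when the input is not actually a Mosaico key.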

### list

List all API keys.

```bash
mosaicod api-key list
```

### purge

Remove API keys in bulk. By default, only expired keys are removed; use the `--all` flag to remove every key, regardless of its expiration status.

```bash
mosaicod api-key purge [OPTIONS]
```

| Option | Description |
| --- | --- |
| `-A`, `--all` | Remove **all** API keys, including those that have not yet expired. |

#### Examples

Remove only expired keys:

```bash
mosaicod api-key purge
```

Remove all API keys:

```bash
mosaicod api-key purge --all
```

or, equivalently:

```bash
mosaicod api-key purge -A
```

:::warning
Using `--all` is irreversible: every API key will be permanently revoked, including keys currently in use by services or integrations. Make sure you have a way to reissue the keys before running this command.
:::

## Common Options

Each `mosaicod` command shares the following common options:

| Option | Default | Description |
| :--- | --- | :--- |
| `--log-format ` | `pretty` | Set the log output format. Available values: `json`, `pretty`, `plain`. |
| `--log-level ` | `warning` | Set the log level. Possible values: `warning`, `info`, `debug`. |

---

## Environment Variables

Here we provide a complete list of environment variables that can be used to configure the daemon. These variables allow you to customize various aspects of the daemon's behavior, including database connections, storage options, and more.

## General

- `MOSAICOD_MAX_GRPC_MESSAGE_SIZE`: The maximum allowed [gRPC](https://grpc.io/) message size in bytes. If a message exceeds this size, a protocol error will be returned. Default is `50 MB`. If you change this value, be aware that it is typically smaller than `MOSAICOD_PARQUET_IN_MEMORY_ENCODING_BUFFER_SIZE`.
- `MOSAICOD_TARGET_MESSAGE_SIZE`: Target message size in bytes used during data streaming.
The daemon will try to aggregate a number of [`RecordBatches`](https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html) to create a sufficiently large message. If the resulting batch size exceeds the limit, it will be capped by `MOSAICOD_MAX_BATCH_SIZE`. Defaults to `25MB`. - `MOSAICOD_MAX_CONCURRENT_WRITES`: The maximum number of concurrent encoding and serialization operations. This setting controls how many data batches can be processed and sent to the object store simultaneously. It is important to note that this does not limit the number of topics the server can handle; rather, it constrains the parallel execution of the encoding/serialization pipeline. Each operation runs in a dedicated thread to handle CPU-bound compression and I/O-bound storage tasks. This value should be tuned based on available RAM and CPU. Excessive parallelism may lead to scheduler thrashing or memory exhaustion. Defaults to `MOSAICOD_DEFAULT_PARALLELISM`. - `MOSAICOD_MAX_CONCURRENT_CHUNK_QUERIES`: Maximum number of concurrent queries that can be executed against data chunks. Default is `4`. - `MOSAICOD_MAX_BATCH_SIZE`: Maximum batch size (number of elements inside an Arrow `RecordBatch`) used during data streaming. Defaults to [DataFusion default batch size](https://datafusion.apache.org/user-guide/configs.html#via-sql) `8192`. - `MOSAICOD_DEFAULT_PARALLELISM`: Sets the degree of parallelism. While this is typically detected automatically based on available hardware, this field allows for a manual override in environments where automatic detection might fail or be inaccurate. - `MOSAICOD_QUERY_ENGINE_MEMORY_POOL_SIZE`: Defines the amount of memory (in bytes) used by the query engine. Set this value to a number greater than 0 to enforce a hard limit on the memory allocated by the query engine. Use this setting if mosaicod encounters OOM (Out Of Memory) errors or you plan to use `mosaicod` in a memory constrained environment. Defaults to `0` (no limit). 
- `MOSAICOD_PARQUET_IN_MEMORY_ENCODING_BUFFER_SIZE`: Size (in bytes) of the in-memory buffer used for encoding Parquet data. Defaults to `75MB`.

## TLS

- `MOSAICOD_TLS_CERT_FILE`: Path to the TLS certificate file used for secure communication. Default is an empty string.
- `MOSAICOD_TLS_PRIVATE_KEY_FILE`: Path to the TLS private key file used for secure communication. Default is an empty string.

## DBMS

- `MOSAICOD_DB_URL`: Database connection URL. This should be in the format expected by the database driver being used. **Required**.
- `MOSAICOD_MAX_DB_CONNECTIONS`: Maximum number of database connections that can be established. Default is `10`.

## Store

- `MOSAICOD_STORE_ENDPOINT`: Endpoint URL for the object storage service (e.g., S3). Use `file:///some/absolute/path` to set up a local storage directory. **Required**.
- `MOSAICOD_STORE_BUCKET`: Name of the bucket in the object storage service where data will be stored. When using the local filesystem endpoint, the system creates a new directory named after the bucket within the endpoint path. **Required**.
- `MOSAICOD_STORE_ACCESS_KEY`: Access key for the object storage service. Default is an empty string.
- `MOSAICOD_STORE_SECRET_KEY`: Secret key for the object storage service. Default is an empty string.

## Cleanup routine

- `MOSAICOD_CLEANUP_TIME_INTERVAL`: Specifies (in seconds) how often the routine checks whether it should run. Set it to 0 to disable the cleanup routine. Default is once a day.
- `MOSAICOD_CLEANUP_RETENTION_DURATION`: How long (in seconds) a folder is retained before being permanently deleted. Set it to 0 to delete it right away. Default is 1 day.

---

## Overview(Daemon)

The **Mosaico Daemon**, a.k.a. `mosaicod`, acts as the engine of the data platform. It is engineered to be the high-performance arbiter for all data interactions, guaranteeing that every byte of robotics data is strictly typed, atomically stored, and efficiently retrievable.
It functions on a standard client-server model, mediating between your high-level applications and the low-level storage infrastructure. ## Why `mosaicod`? Robotics produces a data profile that standard infrastructure is rarely equipped to handle. A single system generates dozens of concurrent, asynchronous streams—ranging from high-bandwidth point clouds to sparse IMU readings—each operating at independent frequencies. When this data is treated as a generic stream of bytes, the result is usually significant storage bloat and a high barrier to retrieval. The necessity of mosaicod stems from the requirement to move data management away from the application layer and into a dedicated, persistent daemon. By sitting between the producers and the disk, the daemon can apply compression based on the specific data model rather than a generic algorithm, and it can facilitate partial retrieval so that a two-second event can be extracted from a ten-hour log without scanning unrelated data. Furthermore, robotics development relies on long-term data viability and lineage. As software evolves, schemas change; `mosaicod` anchors type enforcement to the specific schema snapshot present at the time of recording, preventing historical data from becoming unreadable. It also manages the inherent contention of multiple uncoordinated processes writing to the same sink simultaneously. By centralizing these responsibilities, the system ensures that data integrity, lineage, and schema enforcement are built-in properties of the recording process rather than manual tasks for the developer. ## Architectural Design `mosaicod` is architected atop the [Apache Arrow Flight](https://arrow.apache.org/docs/format/Flight.html) protocol, a general-purpose, high-performance client-server framework developed for the exchange of massive datasets. 
It operates directly on [Apache Arrow](https://arrow.apache.org/) columnar data, enabling efficient transport over [gRPC](https://grpc.io/) without the overhead of serialization. Unlike traditional REST APIs which serialize data into text-based JSON, Flight is designed specifically for high-throughput data systems. This architectural choice provides Mosaico with three critical advantages: **Zero-Copy Serialization.** Data is transmitted in the Arrow columnar format, the exact same format used in-memory by modern analytics tools like pandas and Polars. This eliminates the CPU-heavy cost of serializing and deserializing data at every hop. **Parallelized Transport.** Operations are not bound to a single pipe; data transfer can be striped across multiple connections to saturate available bandwidth. **Snapshot-Based Schema Enforcement.** Data types are not guessed, nor are they forced into a rigid global model. Instead, the protocol enforces a rigorous schema handshake that validates data against a specific schema snapshot stored with the sequence. ### Resource Addressing Mosaico treats every entity in the system, whether it's a Sequence or a Topic, as a uniquely addressable resource. These resources are identified by a **Resource Locator**, a uniform logical path that remains consistent across all channels. Mosaico uses two types of resource locators: - A **Sequence Locator** identifies a recording session by its sequence name (e.g., `run_2023_01`). - A **Topic Locator** identifies a specific data stream using a hierarchical path that includes the sequence name and topic path (e.g., `run_2023_01/sensors/lidar_front`). ### Storage Architecture `mosaicod` uses an **RDBMS** to perform fast queries on metadata, manage system state such as sequence and topic definitions, and handle the event queue for processing asynchronous tasks like background data processing or notifications. 
An **object store** (such as S3, MinIO, or local filesystem) provides long-term storage for resilience and durability, holding the bulk sensor data, images, point clouds, and immutable schema snapshots that define data structures. :::info The RDBMS system is not strictly required for data durability. If the metadata database is corrupted or destroyed, `mosaicod` can rebuild the entire catalog by rescanning the durable object storage. This design ensures that while the DBMS is used to create relations between datasets, the store guarantees long-term durability and recovery, protecting your data against catastrophic infrastructure failure. ::: ## The Mosaico Protocol Arrow Flight provides the transport layer, but it is intentionally generic — it knows nothing about sequences, topics, schemas, or the semantics of a robotics recording. `mosaicod` builds its own protocol on top of it: a structured set of commands and message formats that map the Flight primitives onto Mosaico's data model. This protocol defines how a client opens a recording session, registers a topic with its schema, streams typed data into a sequence, and later queries or retrieves that data. It handles the schema handshake that ties a write to a specific snapshot, the resource locators that address sequences and topics uniformly, and the metadata exchanges that keep the catalog consistent. The protocol is designed around the constraints of robotics workloads: high-frequency writes from multiple concurrent producers, reads that may span long time ranges, and the need to mix small structured messages (like joint states) with large binary payloads (like compressed images or point clouds) within the same session. ### SDKs Because the Mosaico protocol is built on top of Flight and gRPC, interacting with `mosaicod` directly would require assembling low-level RPC calls and manually constructing the message formats the daemon expects. The Mosaico SDKs remove this entirely. 
The SDKs speak the protocol natively — they handle connection management, schema registration, the Arrow serialization, and the correct sequencing of operations. From the client's perspective, writing a topic or querying a sequence is a straightforward API call; the protocol details are fully abstracted away. Currently, Mosaico provides an SDK for **Python**, with additional language support planned. The Python SDK is the primary interface for data ingestion, retrieval, and integration with the broader robotics and data science ecosystem. --- ## Ingestion Data ingestion in Mosaico is explicitly engineered to handle write-heavy workloads, enabling the system to absorb high-bandwidth sensor data, such as 4K video streams or high-frequency Lidar point clouds, without contending with administrative traffic. ## Sessions We have intentionally moved away from traditional versioning. While versioning is a common pattern, it often introduces significant architectural complexity and degrades the developer experience when querying historical data. For instance, if multiple versions of a single sequence exist, the query engine faces an ambiguous choice: should it return the latest state, the most stable state, or a specific point-in-time snapshot? Consider a scenario where you are tracking a temperature sensor. If you have versioned sequences, a query for a given temperature becomes problematic: do you search across Version 1.2 or Version 2.0? Has the data changed across versions? Mosaico eliminates this ambiguity by focusing on **Sessions** rather than versions. A session represents an immutable **data layer** within Mosaico. It acts as a logical container for topics that are uploaded together as a single unit. To visualize the hierarchy, think of a sequence as the primary data container. Sessions then represent specific, immutable layers of data within that sequence, while topics serve as a specific instance of an ontology model. 
This design allows for high-concurrency environments; you can upload multiple sessions simultaneously without the risk of data races or state corruption.

Sessions are the primary mechanism used to update and evolve sequences. This model is particularly powerful for parallel workloads that need to contribute data to the same base layer. Because sessions are independent and immutable, separate workloads can operate in isolation. For example, one process might be cleaning a dataset while another is appending real-time logs; both can work based on the same base data layer and commit their results as independent sessions without interfering with one another.

### An example

To illustrate the session workflow in practice, imagine you are managing a dataset for autonomous driving. When you first initialize a sequence, you perform a bulk upload of raw sensor data collected during a drive. This initial upload constitutes your first session, containing the base telemetry and camera streams:

```text
[ SESSION: BASE DATA ]
topolino_amaranto_25032026/camera/front
topolino_amaranto_25032026/camera/back
topolino_amaranto_25032026/car/gps
topolino_amaranto_25032026/car/imu
```

Once this base layer is established, you can trigger two independent processing workloads. The first workload focuses on computer vision, analyzing the front and back camera streams to generate object detection labels. Simultaneously, a second workload runs a visual odometry algorithm, fusing data from the cameras, IMU, and GPS to calculate the precise trajectory of the vehicle.

Because Mosaico supports concurrent sessions, these two tasks do not need to wait for one another or risk overwriting the base data. The vision task commits its results as one session, and the odometry task commits its results as another.

The final state of your sequence is now an enriched, multi-layered data container.
When you query this sequence, the engine provides a unified view that includes the original raw sensors plus the newly generated analytical topics:

```text
[ SESSION: BASE DATA ]
topolino_amaranto_25032026/camera/front
topolino_amaranto_25032026/camera/back
topolino_amaranto_25032026/car/gps
topolino_amaranto_25032026/car/imu

[ SESSION: LABELING DATA ]
topolino_amaranto_25032026/labels/object_detection

[ SESSION: VISUAL ODOMETRY ]
topolino_amaranto_25032026/tracks/visual_odometry
```

This approach allows your data to grow *horizontally* with new topics while maintaining the absolute immutability of the original sensor readings.

## Ingestion Protocol

The data ingestion protocol in Mosaico follows a structured, multi-step flow designed to ensure type safety and prevent race conditions in high-concurrency environments.

The process begins with `sequence_create`, which establishes the primary data container. Because Mosaico uses a session-based update model, you must then initialize a specific *data layer* using `session_create`, returning a session UUID that serves as the active context for all subsequent uploads within that specific batch.

Within this active session, you define individual data streams via `topic_create`, where each topic is assigned a unique path (e.g., `my_sequence/topic/1`) and returns its own topic UUID. Data is then transmitted using the Arrow Flight `do_put` operation, starting with an Arrow schema for structural validation and followed by a stream of `RecordBatch` payloads. By passing the topic UUID to `do_put`, the system ensures the incoming binary stream is mapped correctly to the intended topic and session layer.

Once all topics and their respective data streams are uploaded, the session must be formally committed with `session_finalize`. This action triggers server-side validation against registered ontologies, chunks the data for efficient storage, and locks the session to make the data permanent and available for downstream queries.

Here is a simplified example of the ingestion flow using Python-like pseudocode:

```py title="ingestion_protocol" {9}
# Initialize the Sequence and the Session layer
sequence_create("my_sequence", metadata)
ss_uuid = session_create("my_sequence")

# Create topics within the session and stream data
#
# The `topic_create` action returns a UUID that must be passed
# to the `do_put` call to route the data stream correctly.
t1_uuid = topic_create(ss_uuid, "my_sequence/topic/1", metadata)
do_put(t1_uuid, data_stream)

t2_uuid = topic_create(ss_uuid, "my_sequence/topic/2", metadata)
do_put(t2_uuid, data_stream)

# Commit the session to make data immutable
session_finalize(ss_uuid)
```

During finalization, all resources are consolidated and archived. Alternatively, you can call [`session_delete(ss_uuid)`](actions.md#session-management) to discard the upload, or call [`sequence_delete(sq_uuid)`](actions.md#sequence-management) to discard the entire sequence if you want to start over.

:::note
UUIDs are employed throughout the protocol to prevent contentious uploads. For instance, if two users attempt to create a resource with the same name simultaneously, the system ensures only one successfully receives a UUID. This identifier acts as a *token of authority*, ensuring that subsequent `do_put` or `finalize` operations are performed only by the user who initiated the resource.

UUIDs are available to clients only during the `*_create` actions. Possessing a UUID is a prerequisite for performing any further operations on that resource, such as uploading data or finalizing the session. This design choice ensures that only the creator of a resource can modify it.
:::

### Updating a Sequence

To extend a sequence with new data, you follow a nearly identical flow to the initial ingestion, but you omit the `sequence_create`. This process leverages Mosaico's session model to layer new information onto the established sequence without modifying the original data.
Here is a simplified example of the update flow using a pyhton-like pseudocode: ```py title="extending_a_sequence_with_new_data" {8} # Initialize a NEW Session layer for the existing sequence ss_uuid = session_create("my_sequence") # Create new topics (e.g., processed labels) within this session # # Even though the sequence already exists, the `topic_create` call within a new # session ensures this specific data stream is tracked as a new, immutable contribution. t3_uuid = topic_create(ss_uuid, "my_sequence/labels/object_detection", metadata) do_put(t3_uuid, processed_data_stream) # Commit the new session to merge the layer into the sequence session_finalize(ss_uuid) ``` :::tip Because each upload is encapsulated in its own session, multiple workers can extend the same sequence simultaneously. Mosaico handles the isolation of these sessions, ensuring that `session_finalize` only commits the specific topics and data associated with that worker's session UUID. ::: ### Aborting an Upload If you encounter an error *during the ingestion process* or simply want to discard all the data just uploaded, there is a straightforward way to abort the sequence. Here is a simplified example of the abort flow using a pyhton-like pseudocode: ```py title="aborting_an_upload", hl_lines="13 17" # Initialize the Sequence and the Session layer sequence_create("my_sequence", metadata) ss_uuid = session_create("my_sequence") # Create topics within the session and stream data t1_uuid = topic_create(ss_uuid, "my_sequence/topic/1", metadata) # (1)! 
do_put(t1_uuid, data_stream)

t2_uuid = topic_create(ss_uuid, "my_sequence/topic/2", metadata)
do_put(t2_uuid, data_stream)

# Delete the session to discard all uploaded topic data
session_delete(ss_uuid)

# You can delete the dangling sequence, since no data is associated with it,
# or skip the deletion and start a new session with `session_create` to upload new data
sequence_delete()
```

:::warning
If **API key management** is enabled, the `sequence_delete` and `session_delete` actions require a key with at least `delete` privileges.
:::

## Chunking & Indexing Strategy

The backend automatically manages *chunking* to efficiently handle intra-sequence queries and prevent memory overload from ingesting large data streams. As data streams in, the server buffers the incoming data until a full chunk is accumulated, then writes it to disk as an optimal storage unit called a *chunk*.

:::tip
The chunk size is configurable via the `MOSAICOD_MAX_CHUNK_SIZE_IN_BYTES` environment variable. Setting this value to `0` disables automatic chunking, allowing for unlimited chunk sizes. However, it is generally recommended to set a reasonable chunk size to balance memory usage and query performance. See the [environment variables](env.md#general) section for more details.
:::

For each chunk written to disk, the server calculates and stores *skip indices* in the metadata database. These indices include ontology-specific statistics, such as type-specific metadata (e.g., coordinate bounding boxes for GPS data or value ranges for sensors). This allows the query engine to perform content-based filtering without needing to read the entire bulk data.

---

## Setup

## Precompiled Binaries

Precompiled binaries for `mosaicod` are available for several platforms and can be downloaded directly from the [GitHub Releases page](https://github.com/mosaico-labs/mosaico/releases).

## Running with Containers

For rapid prototyping, we provide a standard Docker Compose configuration.
This creates an isolated network environment containing the `mosaicod` server and its required PostgreSQL database.

```yaml title="compose.yml" {25,34-37,50}
name: "mosaico"

services:
  database:
    image: postgres:18
    container_name: postgres
    hostname: db
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
      POSTGRES_DB: mosaico
    networks:
      - mosaico
    volumes:
      - pg-data:/var/lib/postgresql
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      timeout: 5s
      retries: 5

  mosaicod:
    container_name: mosaicod
    # There are other available predefined tags that you can use.
    image: ghcr.io/mosaico-labs/mosaicod:latest
    networks:
      - mosaico
    # Here you can list any additional command line options for `mosaicod`.
    # In this example, we configure the server to use the local filesystem for
    # storage, which is mounted to the `/data` directory in the container.
    # This allows you to persist data across container restarts and easily access
    # it from the host machine. Additional environment variables can be set here to configure
    # the daemon's behavior.
    environment:
      MOSAICOD_DB_URL: postgresql://postgres:password@db:5432/mosaico
      MOSAICOD_STORE_ENDPOINT: file:///
      MOSAICOD_STORE_BUCKET: data
    volumes:
      - mosaico-data:/data
    command: |
      run --host 0.0.0.0 --port 6726 --log-level info
    depends_on:
      database:
        condition: service_healthy
    ports:
      # Remove `127.0.0.1` to expose this service to external networks.
      # By default, this configuration restricts access to the local machine for security reasons.
      # If you need to access the server from other machines on the network,
      # you can modify the port mapping to allow external connections.
      - "127.0.0.1:6726:6726"

volumes:
  pg-data:
  mosaico-data:

networks:
  mosaico:
```

This configuration provisions both Postgres and mosaicod within a private Docker network. Only the daemon instance is exposed to the host.

:::warning
In this basic prototyping setup, TLS and API key management are disabled. The port mapping is restricted to `127.0.0.1`.
If you need to access this from an external network, consider configuring `mosaicod` to [enable TLS](tls.md) or use a reverse proxy to handle SSL termination.
:::

### Container tags

`mosaicod` images provide four types of container tags.

| Tag Type | Description |
| :--- | :--- |
| **`latest`** | Always points to the most recent official stable release. This is the default choice for general use. |
| **`x.y`** | Points to the latest release within a minor series, like `0.3`. Use this to receive critical patches within a specific version series while avoiding major breaking changes. |
| **`x.y.z`** | Points to a specific, immutable stable release, like `0.3.12`. This is the recommended choice for production environments requiring maximum consistency. |
| **`nightly`** | Updated daily with the latest code from the main branch. Use this to test new features and bug fixes before they are officially released. |

## Building from Source

While Docker images are available for each release, you can compile `mosaicod` from source if you need a specific version not available as a pre-built image. Building from source requires a Rust toolchain.

The project uses `sqlx` for compile-time query verification, which normally requires a live database connection. However, Mosaico supports a simpler offline build mode that uses cached query metadata from the `.sqlx` directory, removing the need for a database during compilation.

### Offline Build

You can run an offline build using cached sqlx queries with a single command.

```bash
SQLX_OFFLINE=true cargo build --release
```

### Online Build

If you need to modify the database schema, a running PostgreSQL instance is required. This allows `sqlx` to verify queries against a live database during compilation. You can use the provided Docker Compose file in `docker/devel`, which sets up an instance of [MinIO](https://www.min.io/) and a PostgreSQL database.
First, start the development environment:

```bash
cd docker/devel

# Start the services in the background
docker compose up -d

# To stop and remove the volumes (which clears all data), run:
# docker compose down -v
```

Next, from the root of the `mosaicod` workspace, install the necessary tools, configure the environment, and apply the database migrations to the running PostgreSQL instance. This brings the database schema up to date, so that `sqlx` can verify queries during compilation. Finally, run the build.

```bash
cd mosaicod

# Install the SQLx command-line tool
cargo install sqlx-cli

# Copy the development environment variables for the database connection
cp env.devel .env

# Apply the database migrations
cd crates/mosaicod-db
cargo sqlx migrate run

# And finally you can build mosaicod
cargo build --release --bin mosaicod
```

## Configuration

The server supports S3-compatible object storage by default but can be configured for local storage via command line options.

### Database

Mosaico requires a connection to a running **PostgreSQL** instance, which is defined via the `MOSAICOD_DB_URL` environment variable.

### Remote Storage Configuration

For production deployments, `mosaicod` should be configured to use an S3-compatible object store (such as AWS S3, Google Cloud Storage, Hetzner Object Store, etc.) for durable, long-term storage. This is configured by setting the proper [environment variables](env/#store) for your object store provider.

### Local Storage Configuration

To start a `mosaicod` instance that uses the local filesystem as its storage layer, set the following variables:

```sh
export MOSAICOD_STORE_ENDPOINT=file:///some/local/directory
export MOSAICOD_STORE_BUCKET=bucket-name
```

and then run `mosaicod run`.

## Advanced

### Bare Metal Deployment

When running `mosaicod` on a bare metal server, a few tuning knobs can significantly improve resource efficiency and network throughput.
#### Limit Tokio Worker Threads

By default, the async runtime used by `mosaicod` spawns one worker thread per logical CPU core. On machines with many cores but limited memory, this can lead to excessive memory usage and unnecessary thread scheduling overhead.

Use `TOKIO_WORKER_THREADS` to cap the thread pool to a number appropriate for your workload:

```sh
export TOKIO_WORKER_THREADS=8
```

Start with the number of physical cores and adjust based on observed CPU utilisation.

#### Tune Allocator Memory Release

`mosaicod` uses [mimalloc](https://github.com/microsoft/mimalloc) as its allocator. By default, mimalloc defers returning freed memory pages to the OS for several seconds. On a long-running server this can make resident memory appear higher than the actual working set.

Set the following variables to release pages back to the OS immediately after they are freed:

```bash
export MIMALLOC_PURGE_DELAY=0
export MIMALLOC_PURGE_DECOMMITS=1
```

#### Enable BBR Congestion Control

On high-throughput bare metal servers, switching the TCP congestion control algorithm from the [default](https://en.wikipedia.org/wiki/CUBIC_TCP) (`cubic`) to [BBR](https://www.ietf.org/archive/id/draft-cardwell-iccrg-bbr-congestion-control-01.html) can reduce latency and improve throughput, especially under load. BBR measures actual bottleneck bandwidth instead of reacting to packet loss, so it keeps the link fully utilised without overfilling buffers. This matters most when streaming large payloads over high-latency or mildly lossy connections.
Run the following commands as root to enable it system-wide: ```shell # Load the BBR kernel module modprobe tcp_bbr # Set BBR as the active congestion control algorithm sysctl -w net.ipv4.tcp_congestion_control=bbr # Enable the FQ packet scheduler, which BBR requires sysctl -w net.core.default_qdisc=fq ``` To make the settings persistent across reboots, add them to `/etc/sysctl.d/99-bbr.conf`: ```ini title="/etc/sysctl.d/99-bbr.conf" net.core.default_qdisc=fq net.ipv4.tcp_congestion_control=bbr ``` --- ## Query Mosaico distinguishes itself from simple file stores with a powerful **Query System** capable of filtering data based on both high-level metadata and content values. The query engine operates through the [`query`](actions.md#query) action, accepting structured JSON-based filter expressions that can span the entire data hierarchy. ## Architecture The query engine is designed around a three-tier filtering model that allows you to construct complex, multi-dimensional searches: **Sequence Filtering.** Target recordings by structural attributes like sequence name, creation timestamp, or user-defined metadata tags. This level allows you to narrow down which recording sessions are relevant to your search. **Topic Filtering.** Refine your search to specific data streams within sequences. You can filter by topic name, ontology tag (the data type), serialization format, or topic-level user metadata. **Ontology Filtering.** Query the actual physical values recorded inside the sensor data without scanning terabytes of files. The engine leverages statistical indices computed during ingestion, min/max bounds stored in the metadata cache for each chunk, to rapidly include or exclude entire segments of data. 
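The ontology-filtering tier described above can be pictured with a minimal sketch. It is not `mosaicod`'s implementation: `ChunkStats`, `may_contain_match`, and `prune` are hypothetical names, and the sketch assumes each chunk stores a single min/max pair for one numeric field. The point is only that a chunk can be skipped whenever its bounds prove that no value inside it can satisfy the condition.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ChunkStats:
    """Hypothetical per-chunk skip index for a single numeric field."""

    chunk_id: int
    min_value: float
    max_value: float


def may_contain_match(stats: ChunkStats, op: str, operand) -> bool:
    """Return False only when the bounds prove the chunk holds no match."""
    lo, hi = stats.min_value, stats.max_value
    if op == "$gt":
        return hi > operand
    if op == "$geq":
        return hi >= operand
    if op == "$lt":
        return lo < operand
    if op == "$leq":
        return lo <= operand
    if op == "$eq":
        return lo <= operand <= hi
    if op == "$between":
        low, high = operand
        # The chunk range and the requested range must overlap.
        return hi >= low and lo <= high
    # Operators that bounds cannot decide are never pruned.
    return True


def prune(chunks, op, operand):
    """Keep the ids of the chunks that still need to be read."""
    return [c.chunk_id for c in chunks if may_contain_match(c, op, operand)]


# Illustrative bounds for an x-axis acceleration field in three chunks.
chunks = [
    ChunkStats(0, -1.2, 3.9),
    ChunkStats(1, 0.4, 7.5),
    ChunkStats(2, 5.0, 5.0),
]
candidates = prune(chunks, "$gt", 5.0)
```

Only chunk `1` survives the "greater than 5.0" condition here, so the other two would never be opened. Surviving chunks are candidates rather than guaranteed matches, which is why the pruning step must err on the side of keeping a chunk.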
## Filter Domains ### Sequence Filter The sequence filter allows you to target specific recording sessions based on their metadata: | Field | Description | | --------------------------- | ------------------------------------------------------------ | | `sequence.name` | The sequence identifier (supports text operations) | | `sequence.created_at` | The creation timestamp in nanoseconds (supports timestamp operations) | | `sequence.user_metadata.` | Custom user-defined metadata attached to the sequence | ### Topic Filter The topic filter narrows the search to specific data streams within matching sequences: | Field | Description | | ------------------------------ | ------------------------------------------------------------ | | `topic.name` | The topic path within the sequence (supports text operations) | | `topic.created_at` | The topic creation timestamp in nanoseconds (supports timestamp operations) | | `topic.ontology_tag` | The data type identifier (e.g., `Lidar`, `Camera`, `IMU`) | | `topic.serialization_format` | The binary layout format (`Default`, `Ragged`, or `Image`) | | `topic.user_metadata.` | Custom user-defined metadata attached to the topic | ### Ontology Filter The ontology filter queries the actual sensor data values. Fields are specified using dot notation: `.`. For example, to query IMU acceleration data: `imu.acceleration.x`, where `imu` is the ontology tag and `acceleration.x` is the field path within that data model. #### Timestamp query support If `include_timestamp_range` is set to `true` the response will also return [timestamps ranges](#timestamps) for each query. ## Supported Operators The query engine supports a rich set of comparison operators. 
Each operator is prefixed with `$` in the JSON syntax:

| Operator | Description |
| --- | --- |
| `$eq` | Equal to (supports all types) |
| `$neq` | Not equal to (supports all types) |
| `$lt` | Less than (numeric and timestamp only) |
| `$gt` | Greater than (numeric and timestamp only) |
| `$leq` | Less than or equal to (numeric and timestamp only) |
| `$geq` | Greater than or equal to (numeric and timestamp only) |
| `$between` | Within a range `[min, max]` inclusive (numeric and timestamp only) |
| `$in` | Value is in a set of options (supports integers and text) |
| `$match` | Matches a pattern (text only, supports SQL LIKE patterns with `%` wildcards) |
| `$ex` | Field exists |
| `$nex` | Field does not exist |

## Syntax

Queries are submitted as JSON objects. Each field is mapped to an operator and value. Multiple conditions are combined with implicit AND logic.

```json {14}
{
  "sequence": {
    "name": { "$match": "test_run_%" },
    "user_metadata": {
      "driver": { "$eq": "Alice" }
    }
  },
  "topic": {
    "ontology_tag": { "$eq": "imu" }
  },
  "ontology": {
    "imu.acceleration.x": { "$gt": 5.0 },
    "imu.acceleration.y": { "$between": [-2.0, 2.0] },
    "include_timestamp_range": true
  }
}
```

This query searches for:

- Sequences with names matching the `test_run_%` pattern
- Where the user metadata field `driver` equals `"Alice"`
- Containing topics with ontology tag `imu`
- Where the IMU's x-axis acceleration exceeds 5.0
- And the y-axis acceleration is between -2.0 and 2.0

:::tip
The `include_timestamp_range` field is optional. If it is set to `true`, the query returns the [timestamp ranges](#timestamps).
:::

## Response Structure

The query response is hierarchically grouped by sequence. For each matching sequence, it provides the list of topics that satisfied the filter criteria, along with optional timestamp ranges indicating when the ontology conditions were met.
```json title="query_response_example" { "items": [ { "sequence": "test_run_01", "topics": [ { "locator": "test_run_01/sensors/imu", "timestamp_range": [1000000000, 2000000000] }, { "locator": "test_run_01/sensors/gps", "timestamp_range": [1000000000, 2000000000] } ] }, { "sequence": "test_run_02", "topics": [ { "locator": "test_run_02/camera/front", "timestamp_range": [1500000000, 2500000000] }, { "locator": "test_run_02/lidar/point_cloud", "timestamp_range": [1500000000, 2500000000] } ] } ] } ``` ### Timestamps It returns the time window `[min, max]` where the filter conditions were met for that topic, with `min` being the timestamp of the first matching event and max being the timestamp of the last matching event. This allows you to retrieve only the relevant data slices using the [retrieval protocol](retrieval.md#the-retrieval-protocol). :::note The `timestamp_range` field is included only when ontology filters are applied and `include_timestamp_range` is set to `true` inside the `ontology` filter. ::: ### Performance Characteristics The query engine is optimized for high performance by minimizing unnecessary data retrieval and I/O operations. During execution, the engine uses index-based pruning to evaluate precomputed min/max statistics and skip indices, allowing it to bypass irrelevant data chunks without reading the underlying files. Performance is further improved by executing metadata cache queries, such as sequence and topic filters, directly within the database, which ensures sub-second response times even across thousands of sequences. The system employs **lazy evaluation** to keep network payloads lightweight; instead of returning raw data immediately, queries return locators and timestamp ranges. This architecture allows client applications to fetch only the required data slices via the retrieval protocol as needed. 
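As an illustration of the lazy-evaluation pattern, the sketch below turns a query response into a list of retrieval requests. It assumes the response has already been decoded into a Python dictionary with the shape shown in the example above; `retrieval_plan` is a hypothetical helper, not part of the Mosaico SDK.

```python
from typing import Optional


def retrieval_plan(response: dict) -> list:
    """Flatten a query response into (locator, time_range) pairs.

    `time_range` is None when the response carries no `timestamp_range`,
    which means the whole topic would have to be requested.
    """
    plan = []
    for item in response.get("items", []):
        for topic in item.get("topics", []):
            raw_range = topic.get("timestamp_range")
            time_range: Optional[tuple] = tuple(raw_range) if raw_range else None
            plan.append((topic["locator"], time_range))
    return plan


# Sample response, trimmed from the documented example.
response = {
    "items": [
        {
            "sequence": "test_run_01",
            "topics": [
                {
                    "locator": "test_run_01/sensors/imu",
                    "timestamp_range": [1000000000, 2000000000],
                },
            ],
        },
        {
            "sequence": "test_run_02",
            "topics": [
                {"locator": "test_run_02/camera/front"},
            ],
        },
    ]
}

plan = retrieval_plan(response)
```

Each pair in `plan` carries what a retrieval request needs: the locator of the topic and, when available, the time window to fetch.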
---

## Retrieval

Unlike simple file downloads, this protocol provides an interface for requesting precise data slices, which are dynamically assembled and streamed back as an optimized stream of Arrow record batches.

## The Retrieval Protocol

Accessing data requires specifying the [Locator](index.md#resource-addressing), which defines the topic path, and an optional time range in nanoseconds.

Upon receiving a request, the server performs an index lookup in the metadata cache to identify physical data chunks intersecting the requested time window. This is followed by pruning, discarding chunks outside the query bounds to avoid redundant I/O. Once relevant segments are identified, the server streams the data by opening underlying files and delivering it in a high-throughput pipeline.

In the protocol, the `get_flight_info` call returns a list of resources, each containing an endpoint (the name of the topic or sequence, such as `my_sequence` or `my_sequence/my/topic`) and a ticket, an opaque binary blob used by the server in the `do_get` call to extract and stream the data.

Calling `get_flight_info` on a sequence returns all topics associated with that sequence, whereas calling it on a specific topic returns only the endpoint and ticket for that topic.
```py title="retrieval_protocol" # Define the locator and the temporal slice locator = "my_sequence/topic/1" time_range = (start_ns, end_ns) # Optional: defaults to full history # Resolve the locator into endpoint and tickets flight_info = get_flight_info(locator, time_range) for endpoint in flight_info.endpoints: # Use the ticket to stream the actual data batches data_stream = do_get(endpoint.ticket) for batch in data_stream: process(batch) ``` ## Sequence List To find the list of all sequences available in the system, you can call `list_flights` with the root locator: ```py title="list_sequences" # List all sequences in the system flight_info = list_flights("") # or list_flights("/") ``` This will return the list of all sequence resource locators available in the platform, which can then be used to retrieve specific topics or data slices. ## Metadata Context Headers To provide full context, the data stream is prefixed with a schema message containing embedded custom metadata. Mosaico injects context into this header for client reconstruction of the environment. This includes `user_metadata`, preserving original project context like experimental tags or vehicle IDs, and the `ontology_tag`, informing the client of sensor data types (e.g., `lidar`, `camera`) for type-safe deserialization. The `serialization_format` guides interpretation of the underlying serialization protocol used. Now the supported formats include: - `Default`: The standard format. - `Ragged`: Optimized for variable-length lists. - `Image`: An optimized array format for high-resolution visual data. --- ## TLS Securing your Mosaico instance is straightforward, as [TLS (Transport Layer Security)](https://en.wikipedia.org/wiki/Transport_Layer_Security) is fully supported out of the box. Enabling TLS ensures that all communications with the daemon are encrypted and secure. To activate it, simply append the `--tls` flag to your `mosaicod run` command. 
When the `--tls` flag is used, `mosaicod` requires a valid certificate and private key. It looks for these credentials via the following [environment variables](env.md#tls): * `MOSAICOD_TLS_CERT_FILE`: The path to the PEM-encoded X.509 certificate. * `MOSAICOD_TLS_PRIVATE_KEY_FILE`: The path to the file containing the PEM-encoded RSA private key. :::tip If you prefer to manage TLS termination separately, you can run `mosaicod` without the `--tls` flag and use a reverse proxy (like [Nginx](https://nginx.org/) or [Caddy](https://caddyserver.com/)) to handle SSL termination. This allows you to centralize TLS management and offload encryption tasks from the daemon. ::: ## Generate a Self-Signed Certificate Run the following command to generate a `cert.pem` a `key.pem` and a `ca.pem` file: ```bash # Generate the root CA openssl genrsa -out ca.key 4096 openssl req -x509 -new -nodes -key ca.key -sha256 -days 365 \ -subj "/CN=MyTestCA" -out ca.pem # Generate the server private key openssl genrsa -out key.pem 4096 # Create a certificate signing request (CSR) for the server openssl req -new -key key.pem -out server.csr \ -subj "/CN=localhost" \ -addext "subjectAltName=DNS:localhost,IP:127.0.0.1" # Sign the server CSR with the root CA to create the end-entity certificate openssl x509 -req -in server.csr -CA ca.pem -CAkey ca.key -CAcreateserial \ -out cert.pem -days 365 -sha256 \ -extfile <(printf "basicConstraints=CA:FALSE\nkeyUsage=digitalSignature,keyEncipherment\nextendedKeyUsage=serverAuth\nsubjectAltName=DNS:localhost,IP:127.0.0.1") ``` The scripts will generate the following files |File|Description| |---|---| |`ca.key`|The private key for the Certificate Authority; used only to sign other certificates and **must be kept secret**.| |`ca.pem`|The public certificate for the Authority; provided to clients so they can verify the server's identity.| |`ca.srl`|A text file containing the next serial number to be assigned by the CA; safe to ignore or delete.| |`key.pem`|The 
private key for the mosaicod server; used to decrypt traffic and prove ownership of the server certificate.|
|`cert.pem`|The end-entity certificate for the server.|
|`server.csr`|A temporary Certificate Signing Request file used to bridge the public key and the identity during generation.|

Use `cert.pem` and `key.pem` for server-side TLS identity in the mosaicod daemon, while distributing `ca.pem` to clients as the trusted root certificate.

:::warning
The certificates produced by the commands above are strictly for local development or testing. Do not use them in production.
:::

## Use a Let's Encrypt Certificate

For production deployments, you should use a publicly trusted certificate issued by [Let's Encrypt](https://letsencrypt.org/) via [Certbot](https://certbot.eff.org/). These certificates are free, automatically trusted by all major clients, and valid for 90 days with built-in renewal support.

### Prerequisites

* A registered domain name (e.g. `mosaico.example.com`) pointing to your server's public IP address.
* Port `80` open and reachable from the internet (required for the ACME HTTP-01 challenge).
* Certbot installed on your server.

Install Certbot on Debian/Ubuntu:

```bash
sudo apt update
sudo apt install -y certbot
```

On other platforms, refer to the [official Certbot installation guide](https://certbot.eff.org/instructions).

### Obtain a Certificate

Certbot will spin up a temporary web server to complete the domain challenge:

```bash
sudo certbot certonly --standalone -d mosaico.example.com
```

If you already have a web server (Nginx, Apache, etc.) running on port 80, use the webroot or appropriate plugin instead. See the [Certbot documentation](https://eff-certbot.readthedocs.io/) for details.

Upon success, Certbot writes the certificate files to `/etc/letsencrypt/live/mosaico.example.com/`:

| File | Description |
|---|---|
| `fullchain.pem` | The server certificate concatenated with the Let's Encrypt intermediate chain.
Use this as your certificate file. | | `privkey.pem` | The private key for the certificate. Keep this secret. | | `cert.pem` | The server certificate alone (without the chain). | | `chain.pem` | The intermediate certificate chain only. | ### Configure mosaicod Point `mosaicod` to the Certbot-managed files using the TLS environment variables: ```bash export MOSAICOD_TLS_CERT_FILE=/etc/letsencrypt/live/mosaico.example.com/fullchain.pem export MOSAICOD_TLS_PRIVATE_KEY_FILE=/etc/letsencrypt/live/mosaico.example.com/privkey.pem mosaicod run --tls ``` :::tip Use `fullchain.pem` rather than `cert.pem` for the certificate file. It includes the full certificate chain, which is required for clients to validate the certificate correctly without needing to fetch intermediate certificates themselves. ::: ### Renew Automatically Let's Encrypt certificates expire after 90 days. Certbot installs a `systemd` timer (or cron job) that attempts renewal automatically. You can verify it is active with: ```bash sudo systemctl status certbot.timer ``` Because `mosaicod` reads the certificate files from disk on startup, you need to restart it after each renewal so it picks up the new certificate. Add a deploy hook to do this automatically: ```bash sudo mkdir -p /etc/letsencrypt/renewal-hooks/deploy sudo tee /etc/letsencrypt/renewal-hooks/deploy/restart-mosaicod.sh > /dev/null <<'EOF' #!/bin/bash systemctl restart mosaicod EOF sudo chmod +x /etc/letsencrypt/renewal-hooks/deploy/restart-mosaicod.sh ``` Certbot will execute this script every time a certificate is successfully renewed. To test the renewal process without actually replacing the certificate, run: ```bash sudo certbot renew --dry-run ``` :::warning The `/etc/letsencrypt/live/` directory is owned by `root`. If you run `mosaicod` as a non-root user, make sure that user has read access to the certificate files, or copy them to a location your service user can reach and update your deploy hook accordingly. 
::: --- ## Release Cycle ## Monorepo Mosaico uses a monorepo to keep `mosaicod` and the SDK in sync. Both components are independently versioned and released, each following [semantic versioning](https://semver.org). ## Branches ### `main` The central integration branch. All finished work lands here via pull request. The version on `main` is always a `-dev` snapshot (e.g. `v0.7.0-dev`). It is never directly released. ### `issue/` Short-lived branches for a single feature or bug fix. Always branched from `main`, merged back via PR, then deleted. ### `release/[py|doc]/vX.Y` Stabilization branch for a specific release, cut from `main`. Once created, only targeted fixes are backported into it, ongoing development continues on `main`. Three parallel branches are cut per release: - `release/vX.Y`: `mosaicod` daemon, covers the full `vX.Y.*` patch series. - `release/py/vX.Y`: Python SDK. - `release/doc/vX.Y`: documentation. ## Release ### Cut the release branch When `main` is ready, three branches are cut from it: `release/vX.Y`, `release/py/vX.Y`, and `release/doc/vX.Y`. At that point several version bumps happen simultaneously: - `main`: every module advances to `vX.(Y+1).0-dev`, opening the next development cycle. - `release/vX.Y`: `mosaicod` is set to `vX.Y.0-rc`. - `release/py/vX.Y`: the Python SDK is set to `py/vX.Y.0-rc`. ### Release candidates Every commit on `release/vX.Y` produces a `mosaicod` container image with two tags: - A **floating tag** (`vX.Y.Z-rc`) pointing to the latest candidate. - A **pinned SHA tag** (e.g. `vX.Y.Z-rc-abc1234`) identifying that exact build. ### Promote to stable Change the version from `vX.Y.Z-rc` to `vX.Y.Z` and tag the commit. CI/CD produces the final release artifacts: binaries, stable container images, and a docs deployment. ### Patch releases Commit fixes directly to `release/[py|doc]/vX.Y` (or backport from `main`), increment the patch version. 
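The version bumps described above can be sketched as a few small functions. This is only an illustration of the naming rules, not the project's release tooling: `cut_release`, `promote`, and `bump_patch` are hypothetical helpers, and the sketch assumes, for simplicity, that the daemon and the Python SDK share the same `X.Y` at the moment the branches are cut.

```python
from __future__ import annotations

import re

# Matches daemon-style versions such as v0.7.0, v0.7.0-dev or v0.7.0-rc.
_VERSION = re.compile(r"^v(\d+)\.(\d+)\.(\d+)(?:-(dev|rc))?$")


def parse(version: str) -> tuple[int, int, int, str | None]:
    """Split a version string into (major, minor, patch, suffix)."""
    match = _VERSION.match(version)
    if match is None:
        raise ValueError(f"unsupported version string: {version!r}")
    major, minor, patch, suffix = match.groups()
    return int(major), int(minor), int(patch), suffix


def cut_release(main_version: str) -> dict[str, str]:
    """Versions assigned to each branch when a release is cut from main."""
    major, minor, patch, suffix = parse(main_version)
    if suffix != "dev" or patch != 0:
        raise ValueError("main is expected to carry a vX.Y.0-dev snapshot")
    return {
        "main": f"v{major}.{minor + 1}.0-dev",
        f"release/v{major}.{minor}": f"v{major}.{minor}.0-rc",
        f"release/py/v{major}.{minor}": f"py/v{major}.{minor}.0-rc",
    }


def promote(candidate: str) -> str:
    """Turn a release candidate (vX.Y.Z-rc) into a stable version."""
    major, minor, patch, suffix = parse(candidate)
    if suffix != "rc":
        raise ValueError("only release candidates can be promoted")
    return f"v{major}.{minor}.{patch}"


def bump_patch(version: str) -> str:
    """Increment the patch number, keeping any -dev or -rc suffix."""
    major, minor, patch, suffix = parse(version)
    tail = f"-{suffix}" if suffix else ""
    return f"v{major}.{minor}.{patch + 1}{tail}"


branches = cut_release("v0.7.0-dev")
```

With `main` at `v0.7.0-dev`, `branches` maps `main` to `v0.8.0-dev`, `release/v0.7` to `v0.7.0-rc`, and `release/py/v0.7` to `py/v0.7.0-rc`. Promoting `v0.7.0-rc` yields `v0.7.0`, and a later patch moves it to `v0.7.1`.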
## Tags | Component | Tag format | Notes | |---|---|---| | Daemon | `vX.Y.Z` | Triggers binary and container builds | | Python SDK | `py/vX.Y.Z` | Triggers PyPI publish and SDK artifacts | | Documentation | `doc/vX.Y` (lightweight) | Rolled forward on each update; triggers a docs deployment | --- ## Testing & Development This section outlines the standard procedures for contributing to the project. It covers local environment orchestration, database schema management, and the verification suite required to maintain code quality. ## Developing the Daemon The development lifecycle relies on a containerized infrastructure that includes a preconfigured **PostgreSQL** instance and a **MinIO** object store. ### Environment Setup Initialize the required services using the provided docker compose file: ```bash cd docker/devel docker compose up -d ``` To configure the application for the development environment, initialize your local `.env` file: ```bash cd mosaicod cp env.devel .env ``` This configuration exports the `MOSAICOD_DB_URL`, storage backend variables, and the `DATABASE_URL` required for compile-time query verification by [sqlx](https://github.com/launchbadge/sqlx). ### Update sqlx Queries Cache If you modify SQL queries, you **must** refresh the offline metadata cache to allow the project to compile. Run the following command to update the cache: ```bash cd mosaicod/crates/mosaicod-db cargo sqlx prepare -- --features postgres ``` ### Apply Migrations As detailed in the [Setup](../daemon/install.md) guide, the build process validates queries against the schema. 
Ensure your local database is synchronized with the latest migrations:

```bash
cd mosaicod/crates/mosaicod-db
cargo sqlx migrate run
```

### Execution

You can execute the daemon directly from the source tree:

```bash
cd mosaicod
cargo run -- run [OPTIONS]
```

or compile it with:

```bash
cd mosaicod
cargo build --bin mosaicod --profile [PROFILE]
```

The currently available profiles are:

- `debug`: Default profile, intended for development, with debug symbols and no optimizations.
- `release`: Intended for production, with a good level of optimization, a smaller binary size, and a few link-time optimizations, but no debug symbols.
- `docker`: Like `release` but without link-time optimizations, to reduce the binary size and speed up compilation.
- `optimized`: Profile with high optimizations, slow to compile but with the best runtime performance.

### Tests

Daemon-specific tests can be run with:

```bash
cd mosaicod
cargo test
```

### Linting

Ensure compliance with the project's style guidelines and static analysis rules before submitting a pull request:

```bash
cd mosaicod
cargo lint
```

## Testing

The project includes a comprehensive suite of unit and integration tests to validate functionality and prevent regressions. The test suite can be executed with:

```bash
./scripts/test
```

Use `--help` to see available options.

## Development Environment

The project provides a script that generates a disposable development environment. This environment launches a clean `mosaicod` instance without any data, allowing you to test and interact with the daemon without needing to set up your own data. This is particularly useful when developing client applications or testing the API, as it provides a consistent and isolated environment for experimentation.

The environment can be launched with:

```bash
./scripts/dev_env
```

Use `--help` to see available options.
---

## Custom Ontology

Demonstrates how to define and register a custom data model for the NVIDIA Isaac Nova wheel encoder, a hardware type not present in the built-in Mosaico ontology. This is the ontology code that underpins the [ROS Ingestion](./ros_ingestion) example.

:::tip[Learn]
[Doc: Data Models & Ontology](https://docs.mosaico.dev/python-sdk/SDK/ontology/)

[API Reference: Base Models and Mixins](https://docs.mosaico.dev/python-sdk/SDK/API_reference/models/base/)
:::

The C++ SDK is currently in development. The Rust SDK is currently in development.

```bash title="Run"
mosaicolabs.examples ros_injection
```

Run the command with `--help` to see all available options.

The custom ontology is loaded as part of the ROS Ingestion example. The full source is on [GitHub](https://github.com/mosaico-labs/mosaico/blob/main/mosaico-sdk-py/src/mosaicolabs/examples/ros_injection/custom_ontology).

## Defining the Model

Data models in Mosaico are defined by inheriting from `Serializable`. This triggers automatic Apache Arrow schema generation and registers the type in the internal ontology registry the moment the class is loaded by the Python interpreter. The type is then valid as an `ontology_type` in `topic_create()` and its fields become available through the `.Q` query proxy for content-based filtering.

`MosaicoType` provides the available primitive types (`uint32`, `uint64`, `float32`, `string`, and more). `MosaicoField` attaches a human-readable description to each field, which is stored alongside the data in the catalog.

```python title="EncoderTicks custom model"
from mosaicolabs import MosaicoField, MosaicoType, Serializable


class EncoderTicks(Serializable):
    left_ticks: MosaicoType.uint32 = MosaicoField(
        description="Cumulative counts from the left wheel encoder."
    )
    right_ticks: MosaicoType.uint32 = MosaicoField(
        description="Cumulative counts from the right wheel encoder."
) encoder_timestamp: MosaicoType.uint64 = MosaicoField( description="Timestamp of the encoder ticks." ) ``` ## Registration via Module Import Registration happens at class definition time, not at application startup. If the module containing the `Serializable` subclass is never imported, the type is never registered and the platform will not recognize it when you pass it to `topic_create()` or `client.query()`. The safe pattern is to import your ontology module explicitly at the application entry point before any SDK calls are made. ```python title="Import to trigger registration" import my_project.ontology.encoders as encoders # registration happens here from mosaicolabs import MosaicoClient with MosaicoClient.connect(...) as client: with client.sequence_create(name="test") as sw: tw = sw.topic_create("ticks", ontology_type=encoders.EncoderTicks) ``` ## Verifying Registration `is_registered()` lets you assert that a type is in the registry before relying on it. This is useful in test setups and during debugging. ```python title="Check registration status" import my_project.ontology.encoders as encoders if encoders.EncoderTicks.is_registered(): print("Registration successful.") ``` --- ## Data Discovery and Inspection import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; Walks through the catalog of a running Mosaico instance, printing each sequence's size, time bounds, and a per-topic breakdown of sensor types and durations — without downloading any raw sensor data. :::tip[Learn] [Doc: The Reading Workflow](https://docs.mosaico.dev/python-sdk/SDK/handling/reading/) [API Reference: Data Retrieval](https://docs.mosaico.dev/python-sdk/SDK/API_reference/handlers/reading/) ::: The C++ SDK is currently in development. The Rust SDK is currently in development. ```bash title="Run" mosaicolabs.examples data_inspection ``` Run the command with `--help` to see all available options. The daemon must be running and contain at least one ingested sequence. 
Run the [ROS Ingestion](./ros_ingestion) example first to populate it. The full source is on [GitHub](https://github.com/mosaico-labs/mosaico/blob/main/mosaico-sdk-py/src/mosaicolabs/examples/data_inspection.py). ## Connecting and Listing Sequences `MosaicoClient.connect()` opens a gRPC channel to the daemon. `list_sequences()` returns the names of all sequences currently in the catalog; no data is transferred, only identifiers. ```python title="List all sequences" from mosaicolabs import MosaicoClient with MosaicoClient.connect(host=MOSAICO_HOST, port=MOSAICO_PORT) as client: seq_list = client.list_sequences() print(f"Discovered {len(seq_list)} sequences on the server.") ``` ## Inspecting Sequence Metadata `client.sequence_handler()` returns a `SequenceHandler`, a lightweight proxy that fetches only catalog metadata from the server. It gives you the total storage size, creation timestamp, and the global time bounds of the session without touching the underlying sensor records. ```python title="Read sequence metadata" for sequence_name in seq_list: shandler = client.sequence_handler(sequence_name) if shandler: size_mb = shandler.total_size_bytes / (1024 * 1024) print(f"Sequence: {shandler.name}") print(f" Remote Size: {size_mb:.2f} MB") print(f" Created At: {shandler.created_datetime}") print(f" Time Span: {shandler.timestamp_ns_min} to {shandler.timestamp_ns_max} ns") ``` ## Inspecting Individual Topics `shandler.get_topic_handler()` returns a `TopicHandler` for a specific channel. Like the sequence handler, it operates at the catalog layer: you can read the ontology tag, per-topic time bounds, and message count without initiating a data stream. 
```python title="Inspect topics within a sequence" for topic_name in shandler.topics: thandler = shandler.get_topic_handler(topic_name) duration_sec = 0 if thandler.timestamp_ns_min and thandler.timestamp_ns_max: duration_sec = (thandler.timestamp_ns_max - thandler.timestamp_ns_min) / 1e9 print(f" [{thandler.ontology_tag}] {topic_name}: {duration_sec:.2f}s") ``` ## Reference ### Sequence Handler vs. Topic Handler | Feature | Sequence Handler | Topic Handler | | :--- | :--- | :--- | | **Scope** | Entire recording session | Single sensor channel | | **Metadata** | Mission-wide (size, creation time) | Sensor-specific (ontology tag, bounds) | | **Time Bounds** | Global min/max across all topics | Min/max for that specific stream | | **Topic List** | All available channels | N/A | ### Handlers vs. Streamers Handlers fetch metadata only; they never download sensor records. Streamers initiate an Arrow Flight Get and deliver typed messages. Use handlers for discovery; switch to a streamer only when you need the actual data. | | Handlers | Streamers | | :--- | :--- | :--- | | **Use** | Discovery and inspection | High-volume data retrieval | | **Types** | `SequenceHandler`, `TopicHandler` | `SequenceDataStreamer`, `TopicDataStreamer` | | **Data** | Size, timestamps, ontology tags | Raw typed sensor messages | --- ## MuJoCo Visualisation import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; Queries robot joint state data from a stored Mosaico sequence and replays the trajectory in a live [MuJoCo](https://mujoco.org/) simulation window, synchronized to the nanosecond timestamps stored in the archive. 
:::tip[Learn] [Doc: The Query Workflow](https://docs.mosaico.dev/python-sdk/SDK/query/) [Doc: The Reading Workflow](https://docs.mosaico.dev/python-sdk/SDK/handling/reading/) [API Reference: Query Builders](https://docs.mosaico.dev/python-sdk/SDK/API_reference/query/builders/) [API Reference: Query Response](https://docs.mosaico.dev/python-sdk/SDK/API_reference/query/response/) [API Reference: Data Retrieval](https://docs.mosaico.dev/python-sdk/SDK/API_reference/handlers/reading/) ::: The C++ SDK is currently in development. The Rust SDK is currently in development. ```bash title="Run" mosaicolabs.examples mujoco_vis ``` Run the command with `--help` to see all available options. The daemon must be running with data from the [ROS Ingestion](./ros_ingestion) example. The full source is on [GitHub](https://github.com/mosaico-labs/mosaico/blob/main/mosaico-sdk-py/src/mosaicolabs/examples/mujoco_vis.py). MuJoCo is not included in the default SDK dependencies — install it before running: ```bash title="Install MuJoCo" pip install mujoco ``` ## Locating the Joint State Topic `QuerySequence` and `QueryTopic` are combined to find the right channel in a single server-side query. Filtering by `RobotJoint.ontology_tag()` ensures the result contains only joint state topics, regardless of how the channel is named in the bag. ```python title="Query for robot joint state" from mosaicolabs import MosaicoClient, QuerySequence, QueryTopic from mosaicolabs.models.sensors.robot import RobotJoint with MosaicoClient.connect(host=MOSAICO_HOST, port=MOSAICO_PORT) as client: result = client.query( QuerySequence().with_name(ROBOT_SEQUENCE_NAME), QueryTopic().with_ontology_tag(RobotJoint.ontology_tag()), ) if result is None: print(f"Sequence '{ROBOT_SEQUENCE_NAME}' not found.") ``` ## Validating the Topic A `TopicHandler` confirms the matched topic carries the expected type before any data is streamed. 
The query already filters by ontology tag, so this check acts as a defensive guard for pipelines where the data source is not fully controlled.

```python title="Validate topic ontology"
for items in result:
    for topic in items.topics:
        top_handler = client.topic_handler(items.sequence.name, topic.name)
        if top_handler is None:
            continue
        if top_handler.ontology_tag != RobotJoint.ontology_tag():
            print(f"Unexpected type: {top_handler.ontology_tag}")
            continue
```

## Streaming into MuJoCo

Each `RobotJoint` message is timestamped in nanoseconds relative to the recording start. That offset drives the simulation forward so the replay matches the original motion at the correct cadence. Joint positions are applied directly to the MuJoCo model state and the viewer is updated each frame.

```python title="Stream joints and drive the simulation"
for joint_msg in top_handler.get_data_streamer():
    relative_ts = (
        joint_msg.timestamp_ns - top_handler.timestamp_ns_min
    ) / 1.0e9
    joints = joint_msg.get_data(RobotJoint)
    mj_data.qpos[:] = joints.positions
    mj.mj_forward(mj_model, mj_data)
    viewer.sync()
```

When running correctly, the MuJoCo viewer should open and replay the recorded trajectory.
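The timestamp-driven pacing described in this section can be pictured with a small standalone sketch. It is not taken from the example's source: `paced_replay` and its parameters are hypothetical names, and it assumes only that each message carries a nanosecond timestamp and that the topic's minimum timestamp marks the start of the replay.

```python
import time
from typing import Callable, Iterable, Iterator, Tuple


def paced_replay(
    timestamps_ns: Iterable[int],
    start_ns: int,
    speed: float = 1.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[Tuple[int, float]]:
    """Yield (timestamp_ns, relative_seconds) for each timestamp.

    Hypothetical helper: each item is released no earlier than its recorded
    offset from start_ns, scaled by the playback speed.
    """
    wall_start = clock()
    for ts_ns in timestamps_ns:
        # Offset of this message from the start of the recording, in seconds.
        relative_s = (ts_ns - start_ns) / 1e9
        # Wall-clock instant at which this message is due.
        target = wall_start + relative_s / speed
        remaining = target - clock()
        if remaining > 0:
            sleep(remaining)
        yield ts_ns, relative_s


if __name__ == "__main__":
    # Three fake messages at 0 s, 0.05 s and 0.1 s into a recording.
    stamps = [1_000_000_000, 1_050_000_000, 1_100_000_000]
    for ts, rel in paced_replay(stamps, start_ns=stamps[0]):
        print(f"t={rel:.2f}s (timestamp {ts} ns)")
```

In a replay loop like the one in this section, the same idea would amount to waiting until a message's offset is due before applying the joint positions and syncing the viewer. The `clock` and `sleep` parameters exist only so the pacing logic can be tested without real delays.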
--- ## Querying Catalogs import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; Demonstrates three levels of catalog search: finding topics by partial name, filtering by sensor type, and running a multi-domain query that locates specific physical events — such as IMU lateral acceleration spikes — across the entire dataset in a single server-side call. :::tip[Learn] [Doc: The Query Workflow](https://docs.mosaico.dev/python-sdk/SDK/query/) [API Reference: Query Builders](https://docs.mosaico.dev/python-sdk/SDK/API_reference/query/builders/) [API Reference: Query Response](https://docs.mosaico.dev/python-sdk/SDK/API_reference/query/response/) ::: The C++ SDK is currently in development. The Rust SDK is currently in development. ```bash title="Run" mosaicolabs.examples query_catalogs ``` Run the command with `--help` to see all available options. The daemon must be running and contain data ingested via the [ROS Ingestion](./ros_ingestion) example. The full source is on [GitHub](https://github.com/mosaico-labs/mosaico/blob/main/mosaico-sdk-py/src/mosaicolabs/examples/query_catalogs.py). ## The Query Builder Pattern Mosaico's query API uses fluent builder objects. Passing multiple builders to `client.query()` joins them with a logical AND, evaluated entirely server-side in a single round trip. Nothing is joined or filtered on the client side. ## Finding Topics by Name `QueryTopic.with_name_match()` performs a substring search across all topic paths in the catalog, equivalent to SQL `LIKE '%pattern%'`. The result is a `QueryResponse` grouped by sequence, so each item contains the parent session name alongside the matching topic list. 
```python title="Fuzzy topic name search" from mosaicolabs import MosaicoClient, QueryTopic with MosaicoClient.connect(host="localhost", port=6726) as client: results = client.query( QueryTopic().with_name_match("image_raw") ) if results: for item in results: print(f"Sequence: {item.sequence.name}") for topic in item.topics: print(f" {topic.name}") ``` ## Filtering by Sensor Type `with_ontology_tag()` queries by the semantic type of the data rather than by path string. The query stays valid if topics are renamed, as long as the sensor type is unchanged. ```python title="Filter by sensor type" from mosaicolabs import IMU, QueryTopic results = client.query( QueryTopic().with_ontology_tag(IMU.ontology_tag()) ) # Returns every IMU topic across all sequences in the catalog. ``` ## Multi-Domain Queries `QueryOntologyCatalog` combined with `QueryTopic` lets you filter by both sensor type and field value in one call. The `.Q` proxy provides type-safe dot-notation expressions: `IMU.Q.acceleration.y.geq(1.0)` means "find messages where IMU y-axis acceleration is >= 1.0 m/s²". Setting `include_timestamp_range=True` tells the server to return the first and last timestamps of the matching window, enabling precise data slicing. ```python title="Multi-domain query with physics filter" from mosaicolabs import IMU, QueryOntologyCatalog, QueryTopic results = client.query( QueryOntologyCatalog( IMU.Q.acceleration.y.geq(1.0), include_timestamp_range=True ), QueryTopic().with_name("/front_stereo_imu/imu") ) ``` ## Replaying an Event The `timestamp_range` returned by a query can be used directly to slice a data stream to the exact window containing the event. The example adds one second of padding on each side to capture the run-up and recovery. 
```python title="Slice and replay the event window" if results: for item in results: for topic in item.topics: streamer = client.sequence_handler(item.sequence.name).get_data_streamer( topics=[topic.name], start_timestamp_ns=topic.timestamp_range.start - 1_000_000_000, end_timestamp_ns=topic.timestamp_range.end + 1_000_000_000, ) for msg in streamer: pass # process the event window ``` --- ## ROS Ingestion import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; Downloads the [NVIDIA R2B Dataset 2024](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/isaac/resources/r2bdataset2024?version=1) and ingests it from `.mcap` ROS 2 bags into a structured, queryable Mosaico archive. Asset download, type translation, batching, and catalog verification are all handled automatically. :::tip[Learn] [Doc: ROS Bridge](https://docs.mosaico.dev/python-sdk/SDK/bridges/ros/) [API Reference: ROS Bridge](https://docs.mosaico.dev/python-sdk/SDK/API_reference/bridges/ros/ros/) ::: The C++ SDK is currently in development. The Rust SDK is currently in development. ```bash title="Run" mosaicolabs.examples ros_injection ``` Run the command with `--help` to see all available options. The daemon must be running before you start. See the [setup guide](/daemon/install) if you have not started it yet. The full source is on [GitHub](https://github.com/mosaico-labs/mosaico/blob/main/mosaico-sdk-py/src/mosaicolabs/examples/ros_injection/main.py). ## Custom Ontology Definition The NVIDIA Isaac Nova encoder produces `isaac_ros_nova_interfaces/msg/EncoderTicks`, a message type with no equivalent in the built-in ontology. Before the injector can run, we define a custom model by inheriting from `Serializable`. This registers the type automatically in the Mosaico ecosystem at import time, making it available as an `ontology_type` for topic creation and enabling the `.Q` query proxy on its fields. 
```python title="Custom EncoderTicks model" from mosaicolabs import MosaicoField, MosaicoType, Serializable class EncoderTicks(Serializable): left_ticks: MosaicoType.uint32 = MosaicoField( description="Cumulative counts from the left wheel encoder." ) right_ticks: MosaicoType.uint32 = MosaicoField( description="Cumulative counts from the right wheel encoder." ) encoder_timestamp: MosaicoType.uint64 = MosaicoField( description="Timestamp of the encoder ticks." ) ``` ## ROS Adapter Because `EncoderTicks` is a custom type, no default adapter exists. We subclass `ROSAdapterBase` and implement `from_dict` to map ROS dictionary fields onto the model. The `@register_default_adapter` decorator ensures the `RosbagInjector` selects this adapter automatically whenever it encounters the matching schema name in the bag. ```python title="EncoderTicks ROS adapter" from mosaicolabs.ros_bridge import ROSMessage, ROSAdapterBase, register_default_adapter from mosaicolabs.ros_bridge.adapters.helpers import _validate_msgdata from .isaac import EncoderTicks @register_default_adapter class EncoderTicksAdapter(ROSAdapterBase[EncoderTicks]): ros_msgtype = ("isaac_ros_nova_interfaces/msg/EncoderTicks",) __mosaico_ontology_type__ = EncoderTicks _REQUIRED_KEYS = ("left_ticks", "right_ticks", "encoder_timestamp") @classmethod def from_dict(cls, ros_data: dict) -> EncoderTicks: _validate_msgdata(cls, ros_data) return EncoderTicks( left_ticks=ros_data["left_ticks"], right_ticks=ros_data["right_ticks"], encoder_timestamp=ros_data["encoder_timestamp"], ) @classmethod def translate(cls, ros_msg: ROSMessage, **kwargs) -> Message: return super().translate(ros_msg, **kwargs) ``` ## Ingestion Pipeline The example loops over each bag file in the dataset and runs three phases: download, ingest, verify. **Download.** `download_asset` fetches the raw `.mcap` from NVIDIA with a progress bar and caches it locally so re-runs skip the download. 
**Ingest.** `RosbagInjector` handles the full write pipeline: it opens the MCAP, routes each message through its registered adapter, batches the resulting `Message` objects, and transmits them to the daemon over gRPC. The sequence name is derived from the filename so each bag becomes a separate, independently queryable recording session. **Verify.** A `list_sequences()` call after ingestion confirms the sequence is visible in the catalog before the script exits. ```python title="Download, ingest, and verify" for bag_path in BAG_FILES_PATH: out_bag_file = download_asset(BASE_BAGFILE_URL + bag_path, ASSET_DIR) config = ROSInjectionConfig( host=MOSAICO_HOST, port=MOSAICO_PORT, file_path=out_bag_file, sequence_name=out_bag_file.stem, metadata={ "source_url": BAGFILE_URL, "ingested_via": "mosaico_example_ros_injection", "download_time_utc": str(downloaded_time), }, ) RosbagInjector(config).run() with MosaicoClient.connect(host=MOSAICO_HOST, port=MOSAICO_PORT) as client: if config.sequence_name in client.list_sequences(): print(f"'{config.sequence_name}' is now queryable.") ``` Once complete, the ingested sequences are available for the [Data Inspection](./data_inspection), [Querying Catalogs](./querying_catalogs), and [MuJoCo Visualisation](./mujoco_visualisation) examples. --- ## Getting Started Interacting with Mosaico requires two components: `mosaicod`, which runs server-side and manages all data operations, and a client SDK to communicate with it from your code. ## Mosaico Daemon `mosaicod` is the engine behind the platform. It handles storage, catalog management, ingestion, and retrieval. No client call will succeed without a running instance. 
Use the following compose file for a quick local setup:

```yaml title="compose.yaml"
services:
  db:
    image: postgres:18
    environment:
      POSTGRES_HOST_AUTH_METHOD: trust
  mosaicod:
    image: ghcr.io/mosaico-labs/mosaicod:latest
    environment:
      MOSAICOD_DB_URL: postgresql://postgres@db:5432/postgres
      MOSAICOD_STORE_ENDPOINT: file:///tmp
      MOSAICOD_STORE_BUCKET: mosaico
    command: run --host 0.0.0.0
    depends_on:
      - db
    ports:
      - "6726:6726"
```

This setup will create a data folder at `/tmp/mosaico` on your machine. You can change this by modifying the `MOSAICOD_STORE_ENDPOINT` variable.

:::note
For more advanced installation options, see the [installation guide](./daemon/install.md).
:::

## Python SDK

The [`mosaicolabs`](https://pypi.org/project/mosaicolabs/) Python SDK is the primary way to interact with the platform. It provides a high-level API for the full data lifecycle (ingesting sensor data, querying catalogs, and streaming data into ML pipelines) without any custom serialization code.

With `mosaicod` running, the SDK is all you need to start working with your data. Install it, point it at your daemon, and you have full programmatic access to the platform: write sequences, query catalogs, and pull data directly into your pipelines. No extra configuration or intermediary services are required.

Install it via `pip`:

```bash
pip install mosaicolabs
```

The following example connects to a local daemon instance and lists available sequences:

```py title="lets_get_physical.py"
from mosaicolabs import MosaicoClient

# Connect to the Mosaico server
with MosaicoClient.connect(host="localhost", port=6726) as client:
    # List available sequences
    sequences = client.list_sequences()
    print(f"Connected! Found sequences: {sequences}")
```

Jump straight into the how-to guides on [writing data](learn/writing_single_topic.mdx) and [querying sequences](learn/query_sequences.mdx), or see the [Python SDK documentation](clients/python.md) for the full reference.
---

## Hub

import DocCardGrid, { Section, Card } from '@site/src/components/DocCardGrid';

# Mosaico Doc Hub

Mosaico is a high-performance data platform for Robotics and Physical AI. These docs cover everything from running the daemon to writing your first sensor stream. Pick a section to get started.
What Mosaico is, why it exists, and how its pieces fit together. Spin up `mosaicod` locally and send your first data in under five minutes. How `mosaicod` works: storage, catalog management, ingestion, and retrieval.
Ingest structured data into a single sensor stream. Ingest several sensor streams in one session. Ingest from an `.mcap` file that contains messages from multiple sensors. Read back sequences and topics as a live stream rather than a batch load.
Filter and retrieve sequences from the catalog using the SDK. Narrow queries down to specific topic streams within a sequence. Combine sequence, topic, and ontology filters in a single query call. Use the result of one query to scope a second, correlating data across topic types.
Using Mosaico with visualization tools. Turn raw diagnostic events into a queryable fleet-wide catalog.
Full API reference
Robot Manipulation Pack
--- ## Diagnostics Mosaico stores sequences with arbitrary metadata, which makes it compatible with any tool that produces structured `.mcap` snapshots or injects records via the SDK. ## ros2_medkit [ros2_medkit](https://github.com/selfpatch/ros2_medkit) is a ROS 2 node that monitors `/diagnostics` topics, holds recent sensor data in a ring buffer, and writes an `.mcap` snapshot when a fault is confirmed. Only the window around the fault event is persisted. The snapshot is ingested into Mosaico with structured metadata: robot ID, fault code, severity, and timestamp. It is stored as a regular sequence and can be queried using the standard SDK. For a walkthrough, see [ros2_medkit and Mosaico: Fault Forensics](https://www.selfpatch.ai/articles/en/medkit-mosaico-fault-forensics). --- ## Visualization Mosaico will not ship a visualization tool. The open-source robotics ecosystem already has excellent options for this, and building a competing tool would be a distraction from what Mosaico is designed to do: reliably store, index, and retrieve high-frequency sensor data at scale. Our focus is the data stack. ## PlotJuggler [PlotJuggler](https://github.com/PlotJuggler/PlotJuggler) ships with first-class integration support for Mosaico. You can connect it directly to a running `mosaicod` instance, browse the sequence catalog, and stream any topic into the timeline for inspection and plotting, without writing any code or exporting files. PlotJuggler is free, open-source, and widely used across the ROS community. If you are working with time-series sensor data, it is the recommended starting point. --- ## Chained Queries import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; ## Using Results to Scope a Second Query A single `client.query()` call applies AND across all conditions for a single **topic**. That constraint is fundamental: a topic has exactly one declared type, so it cannot simultaneously be a `GPS` stream and a `String` log. 
If you need to ask "find me sessions that have high-precision GPS data *and* contain error log messages", you cannot express that in one shot; those two conditions live on two different topics of two different types. Chained queries solve this. The pattern is to run an initial broad query to find candidate sequences, convert the response into a new query builder scoped to exactly those sequences, then run a second query within that narrowed domain. You get the intersection of both conditions without loading any data client-side. :::info[Why is Chaining Necessary?] A single `client.query()` call applies AND across all conditions for a single **topic**. A topic cannot be both a `GPS` stream and a `String` log, so chaining is required to correlate two different topics within the same sequence. ::: :::tip[Learn] [Doc: The Query Workflow](https://docs.mosaico.dev/python-sdk/SDK/query/) [API Reference: Query Builders](https://docs.mosaico.dev/python-sdk/SDK/API_reference/query/builders/) [API Reference: Query Response](https://docs.mosaico.dev/python-sdk/SDK/API_reference/query/response/) ::: The C++ SDK is currently in development. The Rust SDK is currently in development. ### How Chaining Works **Broad search.** The first query casts a wide net. You may use only a `QueryOntologyCatalog` filter here, or combine it with a `QuerySequence` if you already know something about the sessions you care about. The result is a `QueryResponse` containing every sequence where the condition was satisfied. **Domain locking.** Call `to_query_sequence()` on the response. This converts the `QueryResponse` into a new `QuerySequence` builder that is already pre-filtered to the sequence IDs returned by the first query. The second query will only search within those sessions; the daemon will not touch any other data. This is what makes chaining both correct and efficient: you are not re-scanning the entire catalog, just the sessions you already know are relevant. 
**Targeted refinement.** Pass the locked `QuerySequence` as the first argument of a new `client.query()` call, then add whichever `QueryTopic` and `QueryOntologyCatalog` conditions you need. The daemon evaluates the combination and returns only sessions that satisfy every condition across both queries. ```python title="Chained query" from mosaicolabs import MosaicoClient, QueryTopic, QueryOntologyCatalog, GPS, String with MosaicoClient.connect("localhost", 6726) as client: # Step 1: find all sequences with high-precision GPS initial_response = client.query( QueryOntologyCatalog(GPS.Q.status.status.eq(2)) ) if initial_response: # Step 2: lock the search domain to those sequences refined_domain = initial_response.to_query_sequence() # Step 3: within those sequences, find error log messages final_results = client.query( refined_domain, QueryTopic().with_name("/localization/log_string"), QueryOntologyCatalog(String.Q.data.match("[ERR]")) ) if final_results: for item in final_results: print(f"Error found in: {item.sequence.name}") ``` The C++ SDK is currently in development. The Rust SDK is currently in development. ## Key Concepts **`to_query_sequence()`** is the bridge between the two queries. When called on a `QueryResponse`, it produces a `QuerySequence` builder pre-populated with the IDs of all sequences that appeared in the response. Passing this builder as the first argument of the next `client.query()` call tells the daemon to restrict the search scope to those sessions only. No session outside that set will ever be evaluated, regardless of what the other filters say. **`to_query_topic()`** is the topic-level equivalent. Instead of locking down to a set of sequences, it produces a `QueryTopic` builder pre-scoped to the specific topic paths returned by the previous response. 
Use this when you want to chain on exact topic identity, for instance when the first query already narrowed you to a particular channel and you want the second query to be applied to that same channel without re-specifying its name. **Why not just filter client-side?** You could collect all sequence IDs from the first response and write a loop in Python. Chaining is preferable because both queries run entirely server-side. The daemon evaluates the second filter against only the relevant data without transferring intermediate results over the network. For large catalogs, the difference in latency is significant. --- ## Multi-Domain Query import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; ## Combining Sequence, Topic, and Ontology Filters Multi-domain queries are the most powerful form of search in Mosaico. By passing multiple query builders to a single `client.query()` call, you instruct the daemon to evaluate all filter layers simultaneously in one round trip. This is far more efficient than running separate queries and joining results on the client side; the server does all the work and returns only what matches every condition. There are three layers you can combine, and each one narrows the result set from a different angle: - **QuerySequence** filters at the session level. It looks at recording metadata such as the project name, robot identifier, environment conditions, or any custom tag you attached when the recording was created. Use this to restrict results to a meaningful subset of your data catalog before the other filters even run. - **QueryTopic** narrows within the matched sessions down to a specific channel. It matches on the topic name path (e.g. `/front/camera/imu`) and on the declared data type. This ensures you are only inspecting the right stream, not every stream in the session. - **QueryOntologyCatalog** goes one level deeper: it inspects the actual stored data values. 
It leverages Mosaico's ontology, the type system that gives every field a declared type and name, to evaluate expressions directly against the recorded measurements. All three builders passed to `client.query()` are joined with AND. A result is returned only when a sequence, topic, and data window all satisfy every condition you specified. :::tip[Learn] [Doc: The Query Workflow](https://docs.mosaico.dev/python-sdk/SDK/query/) [API Reference: Query Builders](https://docs.mosaico.dev/python-sdk/SDK/API_reference/query/builders/) [API Reference: Query Response](https://docs.mosaico.dev/python-sdk/SDK/API_reference/query/response/) ::: The C++ SDK is currently in development. The Rust SDK is currently in development. ### The `.Q` Proxy Every Ontology model in the SDK (such as `IMU`, `GPS`, `String`) exposes a `.Q` class attribute. This proxy generates type-safe field path expressions that the server can evaluate. When you write `IMU.Q.acceleration.x.gt(5.0)`, you are saying "the X component of the acceleration vector is greater than 5.0". The IDE can autocomplete field names because `.Q` mirrors the structure of the ontology type, so typos in field paths are caught before the query ever runs. The comparison operators available on a field path (`.gt()`, `.lt()`, `.eq()`, `.between()`, `.match()`, and others) correspond to server-side predicates. Nothing is evaluated in Python; the expression is serialised and sent to the daemon, which applies it during the scan. ### Timestamp Ranges When `include_timestamp_range=True` is passed to `QueryOntologyCatalog`, the response includes the exact start and end nanosecond timestamps of the matching event window within each topic. This is useful when you want to replay or analyse only the interval where the condition held true, rather than loading an entire recording. The timestamps are precise enough to feed directly into a streamer or a data export without any further alignment work. 
```python title="Multi-domain query" from mosaicolabs import MosaicoClient, QuerySequence, QueryTopic, QueryOntologyCatalog, IMU with MosaicoClient.connect("localhost", 6726) as client: results = client.query( QuerySequence() .with_user_metadata("project.name", eq="Apollo"), QueryTopic() .with_name("/front/camera/imu"), QueryOntologyCatalog(include_timestamp_range=True) .with_expression(IMU.Q.acceleration.x.gt(5.0)) ) if results: for item in results: print(f"Sequence: {item.sequence.name}") for topic in item.topics: print(f" - Match in Topic: {topic.name}") start = topic.timestamp_range.start end = topic.timestamp_range.end print(f" Event Window: {start} to {end} ns") ``` The C++ SDK is currently in development. The Rust SDK is currently in development. ## Understanding the Response `client.query()` always returns the same `QueryResponse` type, which behaves as a list of `QueryResponseItem` objects. In a multi-domain query, each item carries: - `item.sequence`: the matched sequence (recording session) with its name and metadata - `item.topics`: the list of topics within that sequence that satisfied the Topic and OntologyCatalog conditions When `include_timestamp_range=True` was set on the `QueryOntologyCatalog`, each topic in `item.topics` has its `timestamp_range` populated: - `topic.timestamp_range.start`: nanosecond timestamp marking the beginning of the matching data window - `topic.timestamp_range.end`: nanosecond timestamp marking the end of the matching data window Without `include_timestamp_range`, the `timestamp_range` field is absent and you receive only the structural match (which sequence and which topic). With it, you get a precise temporal slice ready for downstream processing. 
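As a rough illustration of what "ready for downstream processing" can mean, the sketch below pads a matching window and converts it to seconds using plain integer arithmetic. `TimeWindow`, `pad_window`, and `duration_seconds` are hypothetical helpers written for this example, not SDK names; the only assumption taken from the text is that a timestamp range exposes `start` and `end` in nanoseconds.

```python
from dataclasses import dataclass

NS_PER_SECOND = 1_000_000_000


@dataclass(frozen=True)
class TimeWindow:
    """Hypothetical stand-in for the start/end pair of a timestamp range."""

    start: int  # nanoseconds
    end: int  # nanoseconds


def pad_window(window: TimeWindow, padding_s: float, lower_bound_ns: int = 0) -> TimeWindow:
    """Widen the window by padding_s seconds on each side.

    The start is clamped so it never drops below lower_bound_ns.
    """
    pad_ns = int(padding_s * NS_PER_SECOND)
    return TimeWindow(
        start=max(lower_bound_ns, window.start - pad_ns),
        end=window.end + pad_ns,
    )


def duration_seconds(window: TimeWindow) -> float:
    """Length of the window in seconds."""
    return (window.end - window.start) / NS_PER_SECOND


# A matching event that lasted from 5.0 s to 7.5 s.
event = TimeWindow(start=5_000_000_000, end=7_500_000_000)
padded = pad_window(event, padding_s=1.0)

print(duration_seconds(event))   # 2.5
print(duration_seconds(padded))  # 4.5
```

Keeping the arithmetic in integer nanoseconds until the final conversion avoids losing precision on large epoch-style timestamps, which is why the padding is converted to nanoseconds first rather than converting the window to floating-point seconds.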
## Key Concepts - **Single round trip**: all builders passed to `client.query()` are AND-joined and evaluated server-side in one request, avoiding multiple network calls and client-side joins - **`.Q` Proxy**: every Ontology model exposes a `.Q` attribute for type-safe, IDE-autocomplete-friendly field path expressions - **Three filter layers**: Sequence (session metadata) → Topic (channel + type) → OntologyCatalog (actual data values) - **Temporal windows**: `include_timestamp_range=True` populates `topic.timestamp_range` with the exact nanosecond start and end of the matching event window --- ## Query Sequences import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; As the number of recording sessions in your platform grows, browsing them manually becomes impractical. Mosaico's query system solves this with server-side filtering: instead of loading sessions and inspecting them locally, you describe what you are looking for and the daemon finds it for you. The client only receives the sessions that actually match. This keeps network traffic low and makes it feasible to search across thousands of recordings. Sequence-level queries are the right tool when the criteria you care about live at the session level: the name of the mission, environment conditions logged at recording time, project identifiers, or any other metadata you attached to the Sequence when you created it. You are not yet asking questions about what sensor data is inside; you are asking questions about the recording session itself. :::tip[Learn] [Doc: The Query Workflow](https://docs.mosaico.dev/python-sdk/SDK/query/) [API Reference: Query Builders](https://docs.mosaico.dev/python-sdk/SDK/API_reference/query/builders/) [API Reference: Query Response](https://docs.mosaico.dev/python-sdk/SDK/API_reference/query/response/) ::: The C++ SDK is currently in development. The Rust SDK is currently in development. 
## Querying Sequences by Name and Metadata

The `QuerySequence` builder constructs a filter by method chaining. Every `.with_*()` call you add becomes an additional AND condition: the daemon only returns sessions that satisfy every constraint you have specified. This makes it natural to start broad and progressively narrow; you can add conditions one at a time and the semantics are always "all of the above must be true."

`with_name_match()` performs a fuzzy substring match against the Sequence name. Passing `"test_drive"` will match any session whose name contains that string, so you do not need to know the exact recording name in advance. This is useful when you have a naming convention and want all recordings from a particular campaign.

`with_user_metadata()` works differently: it performs an exact or comparison-based match against a specific metadata field. The first argument is the field key, and the keyword arguments (`eq`, `lt`, `gt`, and so on) specify the predicate. When you stored metadata as a nested dictionary during ingestion, you can reach any level of nesting using dot notation. For example, `"project.name"` queries the `name` field inside the `project` sub-object, and `"environment.visibility"` queries the `visibility` field inside the `environment` sub-object. Any structure you stored at recording time is queryable this way.

The example below finds every Sequence whose name contains `"test_drive"`, whose project name is exactly `"Apollo"`, and whose recorded visibility was below 50.
```python title="Query by name and metadata"
from mosaicolabs import MosaicoClient, QuerySequence

with MosaicoClient.connect("localhost", 6726) as client:
    results = client.query(
        QuerySequence()
            .with_name_match("test_drive")
            .with_user_metadata("project.name", eq="Apollo")
            .with_user_metadata("environment.visibility", lt=50)
    )

    if results:
        for item in results:
            print(f"Matched Sequence: {item.sequence.name}")
            print(f" Topics: {[topic.name for topic in item.topics]}")
```

`client.query()` returns `None` on error, or a **QueryResponse**, a list of **QueryResponseItem** objects, one per matched Sequence. Each item carries two things: `item.sequence`, which holds the metadata of the matched session (its name, time boundary, and any stored metadata), and `item.topics`, which is the list of Topics that belong to that session. Because this is a Sequence-level query with no Topic filter applied, `item.topics` contains every Topic recorded under that Sequence, giving you a complete picture of what sensor streams are available before you decide which ones to open.

:::info[Result Normalization]
`topic.name` returns the relative path (e.g. `/front/camera/image`), directly usable with other SDK methods like `topic_handler()` and streamers.
:::

## Key Concepts

**Convenience methods** like `with_name_match()` and `with_user_metadata()` cover the most common filtering patterns. `with_name_match()` is intentionally fuzzy so you can match recording names without knowing them exactly. `with_user_metadata()` gives you precise control over any field you stored at ingestion time, with a full set of comparison operators.

**Generic methods** using `with_expression()` and the `.Q` proxy give you access to the full operator set (`.gt()`, `.lt()`, `.between()`, and others) when the convenience methods do not cover your use case.
The `.Q` proxy builds type-safe field paths from Ontology models, ensuring that the field names in your query match the actual schema.

**Dynamic metadata** through dot notation means you can query any depth of nesting in the metadata you stored at recording time. There is no fixed schema for user metadata; whatever JSON-like structure you attached to the Sequence is fully searchable using `"key.subkey.deeper"` paths.

---

## Query Topics

While Sequence queries let you find recording sessions by their session-level metadata, Topic queries let you go one level deeper: you search for specific sensor channels across your entire dataset, regardless of which recording session they belong to.

This is useful when you are not looking for a particular mission but for a particular kind of sensor. For example, you might want to find every IMU channel recorded through a serial interface across all sessions, perhaps to audit hardware configurations, build a training dataset for a model that requires IMU data, or check which sessions have coverage from a particular sensor type.

The `QueryTopic` builder works the same way as `QuerySequence`: each `.with_*()` call you chain onto it adds another AND condition, and the daemon evaluates all conditions together on the server side. Only Topics that satisfy every constraint are returned.

:::tip[Learn]
[Doc: The Query Workflow](https://docs.mosaico.dev/python-sdk/SDK/query/)
[API Reference: Query Builders](https://docs.mosaico.dev/python-sdk/SDK/API_reference/query/builders/)
[API Reference: Query Response](https://docs.mosaico.dev/python-sdk/SDK/API_reference/query/response/)
:::

## Querying Topics by Type and Metadata

The key filter in a Topic query is `with_ontology_tag()`.
Every type in Mosaico's Ontology (`IMU`, `GPS`, `Pressure`, `String`, and so on) has a unique identifier called an ontology tag. Calling `IMU.ontology_tag()` returns that identifier, and passing it to `with_ontology_tag()` tells the daemon to restrict results to Topics that carry IMU data. This is how you express "I want topics of this type" without knowing their names in advance.

You can narrow the results further by chaining `with_user_metadata()` calls. Metadata can be attached to Topics during ingestion, just as it can be attached to Sequences. In the example below, the query combines the IMU type filter with a metadata filter on the `"interface"` field, so it only returns IMU topics that were recorded through a serial interface. The AND semantics mean both conditions must hold simultaneously.

```python title="Query by type and metadata"
from mosaicolabs import MosaicoClient, QueryTopic, IMU

with MosaicoClient.connect("localhost", 6726) as client:
    results = client.query(
        QueryTopic()
            .with_ontology_tag(IMU.ontology_tag())
            .with_user_metadata("interface", eq="serial")
    )

    if results:
        for item in results:
            print(f"Matched Sequence: {item.sequence.name}")
            print(f" Topics: {[topic.name for topic in item.topics]}")
```

`client.query()` returns `None` on error, or a **QueryResponse**, a list of **QueryResponseItem** objects. The structure is the same as in a Sequence query: each `item.sequence` gives you the parent recording session that contains the matched topics, and `item.topics` gives you the specific Topics within that session that satisfied your filter. This grouping is deliberate; it lets you navigate directly from a matched Topic to the session it belongs to, so you can open a handler for either the topic or the full session depending on what you need next.

:::info[Result Normalization]
`topic.name` returns the relative path (e.g.
`/front/camera/image`), directly usable with other SDK methods like `topic_handler()` and streamers.
:::

## Key Concepts

**Ontology tags** are the primary way to filter by sensor type. Each Ontology type in Mosaico carries a tag that uniquely identifies it in the catalog. Calling `.ontology_tag()` on a model class (for example `IMU.ontology_tag()`) returns the string identifier that the daemon uses internally to classify Topics. Using this tag as a filter guarantees type safety: you get back only Topics whose data actually conforms to the IMU schema, not just Topics that happen to be named `/imu`.

**Combining filters** with `with_ontology_tag()` and `with_user_metadata()` narrows results with AND semantics. Because every `.with_*()` call adds an additional required condition, you can compose arbitrarily specific queries. Start with a type filter to establish the sensor kind, then add metadata conditions to select by hardware variant, interface type, calibration status, or any other field you stored at ingestion time.

**Generic methods** using `with_expression()` and the `.Q` proxy give you access to the full operator set when the convenience methods do not cover your use case. The `.Q` proxy builds type-safe field paths directly from Ontology model definitions, so your queries stay consistent with the schema even as it evolves.

---

## Secure Connection

## Security Model

By default, the Mosaico SDK communicates with the `mosaicod` daemon over plain gRPC. That is acceptable for local development on a single machine or inside a trusted private network where traffic never leaves a controlled environment. For production deployments where the daemon is reachable over the public internet, a cloud network, or a shared internal network, two security layers should be enabled: **transport encryption via TLS** and **caller authentication via API keys**.

These two mechanisms are independent and complementary.
TLS protects the data in transit so that no one on the network path can read or tamper with your queries and results. API keys prove to the daemon that the caller is authorised to make requests at all. In a production setup you will usually want both.

:::tip[Learn]
[Doc: Client](https://docs.mosaico.dev/python-sdk/SDK/client/)
[API Reference: Communication Module](https://docs.mosaico.dev/python-sdk/SDK/API_reference/comm/)
:::

### TLS: Encrypting the Connection

Mosaico supports two TLS modes that correspond to the same distinction you encounter with HTTPS:

**One-way TLS** (`enable_tls=True`) works like standard HTTPS. The server presents a certificate during the TLS handshake; the client verifies it against a trusted Certificate Authority. The server is authenticated, but the client presents no certificate of its own. Use this mode when `mosaicod` is deployed behind a cloud load balancer or a reverse proxy that holds a CA-signed certificate (e.g. Let's Encrypt or a managed certificate from your cloud provider).

**Two-way TLS with a certificate file** (`tls_cert_path`) is used when you need the client to validate the server against a specific certificate, typically a self-signed certificate or one issued by an internal PKI that is not in the system's default CA bundle. You point `tls_cert_path` at the PEM file for the server's CA certificate, and the SDK uses that file exclusively to verify the server identity. This is the right choice for on-premise deployments, edge robots with a private certificate authority, or any environment where you manage your own PKI.

### API Keys: Authenticating the Caller

API keys are opaque tokens that the SDK injects as gRPC metadata on every call to the daemon. The daemon validates the key server-side before executing any request.
Keys are created and revoked through the daemon's key management commands; consult the CLI reference for the exact commands.

Never hard-code an API key in source code that gets committed to version control. If the key is exposed it must be rotated immediately. In production code, read credentials from environment variables or a secrets manager at runtime.

## Option 1: One-Way TLS

Use this when your daemon has a CA-signed certificate and you want encrypted transport without managing certificate files yourself.

```python title="One-way TLS"
from mosaicolabs import MosaicoClient

MOSAICO_HOST = "mosaico.production.yourdomain.com"
MOSAICO_PORT = 6726
# Hard-coded for illustration only; read the key from the environment in production.
MY_API_KEY = "msco_vy9lqa7u4lr7w3vimhz5t8bvvc0xbmk2_9c94a86"

with MosaicoClient.connect(
    host=MOSAICO_HOST,
    port=MOSAICO_PORT,
    api_key=MY_API_KEY,
    enable_tls=True
) as client:
    print(f"Connected to version: {client.version()}")
    sequences = client.list_sequences()
```

## Option 2: Two-Way TLS with Certificate

Use this when the daemon uses a self-signed certificate or an internal CA certificate that is not in your system's default trust store. The `tls_cert_path` parameter tells the SDK exactly which certificate to trust when verifying the server.

```python title="Two-way TLS with certificate"
from mosaicolabs import MosaicoClient

MOSAICO_HOST = "mosaico.production.yourdomain.com"
MOSAICO_PORT = 6726
CERT_PATH = "/etc/mosaico/certs/server_ca.pem"
# Hard-coded for illustration only; read the key from the environment in production.
MY_API_KEY = "msco_vy9lqa7u4lr7w3vimhz5t8bvvc0xbmk2_9c94a86"

with MosaicoClient.connect(
    host=MOSAICO_HOST,
    port=MOSAICO_PORT,
    api_key=MY_API_KEY,
    tls_cert_path=CERT_PATH
) as client:
    print(f"Connected to version: {client.version()}")
    sequences = client.list_sequences()
```

## Production Best Practice: Use Environment Variables

Hard-coding credentials in source code is a security risk.
If the file is accidentally committed to a repository or shared in a log, the key must be rotated immediately. The safe pattern is to read all credentials from environment variables at runtime.

:::tip[Keep credentials out of source code]
```python title="Read credentials from environment"
import os

MY_API_KEY = os.environ["MOSAICO_API_KEY"]
CERT_PATH = os.environ.get("MOSAICO_CERT_PATH")
```

Use a secrets manager (such as AWS Secrets Manager, HashiCorp Vault, or Kubernetes Secrets) to inject these variables into the process environment at deploy time. Never commit `.env` files that contain real keys.
:::

The example below shows a fully environment-driven connection. If `MOSAICO_CERT_PATH` is set, the SDK verifies the server against that certificate file. If it is not set, `tls_cert_path` is `None`; because this example does not pass `enable_tls`, the connection then falls back to the default plain gRPC, so add `enable_tls=True` if your deployment relies on one-way TLS.

```python title="Environment-driven connection"
import os
from mosaicolabs import MosaicoClient

with MosaicoClient.connect(
    host=os.environ["MOSAICO_HOST"],
    port=int(os.environ.get("MOSAICO_PORT", "6726")),
    api_key=os.environ["MOSAICO_API_KEY"],
    tls_cert_path=os.environ.get("MOSAICO_CERT_PATH"),
) as client:
    sequences = client.list_sequences()
```
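If you want the choice between the TLS modes to be driven by the environment as well, one option is to assemble the keyword arguments separately and then unpack them into `MosaicoClient.connect()`. The sketch below is SDK-independent and only builds a dictionary; the parameter names (`host`, `port`, `api_key`, `enable_tls`, `tls_cert_path`) are the ones used in the examples above, while the `MOSAICO_ENABLE_TLS` variable and the `connection_kwargs` helper are hypothetical names introduced here for illustration.

```python
from typing import Any, Dict, Mapping


def connection_kwargs(env: Mapping[str, str]) -> Dict[str, Any]:
    """Build keyword arguments for a connect() call from an environment mapping."""
    kwargs: Dict[str, Any] = {
        "host": env["MOSAICO_HOST"],        # required: raises KeyError if missing
        "port": int(env.get("MOSAICO_PORT", "6726")),
        "api_key": env["MOSAICO_API_KEY"],  # required: raises KeyError if missing
    }

    cert_path = env.get("MOSAICO_CERT_PATH")
    if cert_path:
        # Self-signed or internal CA: verify the server against this file.
        kwargs["tls_cert_path"] = cert_path
    elif env.get("MOSAICO_ENABLE_TLS", "").lower() in {"1", "true", "yes"}:
        # Hypothetical switch (an assumption, not a documented variable):
        # request one-way TLS verified against the system CA bundle.
        kwargs["enable_tls"] = True
    # Otherwise no TLS argument is set and the default transport applies.
    return kwargs


# Demonstration with a plain dict standing in for os.environ.
demo_env = {"MOSAICO_HOST": "localhost", "MOSAICO_API_KEY": "example-key"}
print(sorted(connection_kwargs(demo_env)))
```

With the real SDK installed, the result would be used as `MosaicoClient.connect(**connection_kwargs(os.environ))`. Keeping the decision in one small function makes it easy to unit-test without a running daemon.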
## Key Concepts

- **Plain gRPC** is the default and is suitable for localhost or trusted private networks
- **`enable_tls=True`** enables one-way TLS; the server is authenticated using the system CA bundle; use this for cloud deployments with CA-signed certificates
- **`tls_cert_path`** enables TLS with a specific certificate file; use this for self-signed certificates or internal PKI
- **API keys** are injected as gRPC metadata on every call; they are created and managed via the `mosaicod` key management CLI commands
- **Never hard-code credentials**; read them from environment variables or a secrets manager at runtime

---

## Streaming Data

Once you have located the recording sessions you care about, either by name or through the query system, the next step is to actually read the data back. Mosaico offers two retrieval modes, each designed for a different use case.

The first mode, `SequenceDataStreamer`, is for situations where you need to work with multiple sensor streams together. A real robot records many sensors simultaneously: an IMU firing at 200 Hz, a GPS unit updating at 1 Hz, a pressure sensor somewhere in between. The messages from all of these accumulate independently during recording, but when you replay them you usually want them interleaved in the order they actually happened. `SequenceDataStreamer` handles this automatically by merging N topic streams into a single unified timeline using a K-way merge, the same technique used to combine N already-sorted lists into one sorted output. As you iterate, you receive events one at a time in strict chronological order across all requested topics.

The second mode, `TopicDataStreamer`, is for situations where you only need one sensor. Because there is nothing to merge, the streamer reads directly from a single topic stream with minimal overhead.
This is the right choice when you are isolating a sensor for analysis, building a calibration pipeline, or feeding a single data channel into a machine learning model. There is no point paying the cost of a merge when you only care about one input.

Both streamers support temporal slicing: you supply `start_timestamp_ns` and `end_timestamp_ns` to bound the replay window, so you never have to load a full recording when you only care about a segment.

:::tip[Learn]
[Doc: The Reading Workflow](https://docs.mosaico.dev/python-sdk/SDK/handling/reading/)
[API Reference: Data Retrieval](https://docs.mosaico.dev/python-sdk/SDK/API_reference/handlers/reading/)
:::

## Unified Multi-Sensor Replay

`SequenceDataStreamer` is obtained from a `SequenceHandler`, which is the entry point to reading back a named Sequence. You call `client.sequence_handler("mission_alpha")` to get a handle to the recording session, then call `.get_data_streamer()` on it with the list of topics you want to merge. The streamer returns `(topic_name, message)` tuples as you iterate, so at each step you know both which sensor the message came from and the message itself, its timestamp and its typed data payload.

Before the loop starts you can call `streamer.next_timestamp()` to peek at the timestamp of the very first event without consuming it. This is useful when you need to synchronize the streamer with another data source before you begin processing, or simply to confirm that the time window contains data.

Always call `seq_handler.close()` when you are done. Handlers maintain an open connection to the daemon; closing them releases the server-side resources held for that session.
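To build intuition for the chronological interleaving that `SequenceDataStreamer` is described as producing, the following sketch performs a K-way merge over plain Python lists with the standard library's `heapq.merge`. It is an illustration of the general technique only and makes no claim about how the SDK implements it; the sample data and the `merge_topics` helper are made up for this example.

```python
import heapq
from typing import Dict, Iterable, Iterator, Tuple

Sample = Tuple[int, str]      # (timestamp_ns, payload)
Event = Tuple[int, str, str]  # (timestamp_ns, topic_name, payload)


def tag(topic: str, samples: Iterable[Sample]) -> Iterator[Event]:
    """Attach the topic name to every sample of one already-sorted stream."""
    for timestamp_ns, payload in samples:
        yield (timestamp_ns, topic, payload)


def merge_topics(streams: Dict[str, Iterable[Sample]]) -> Iterator[Tuple[str, int, str]]:
    """Lazily merge per-topic streams into one timeline ordered by timestamp.

    Each input stream must already be sorted by timestamp; heapq.merge then
    only keeps one pending element per stream in memory.
    """
    tagged = [tag(topic, samples) for topic, samples in streams.items()]
    for timestamp_ns, topic, payload in heapq.merge(*tagged, key=lambda e: e[0]):
        yield topic, timestamp_ns, payload


# Synthetic data: a fast "IMU" stream and a slow "GPS" stream.
imu = [(0, "imu-0"), (5, "imu-1"), (10, "imu-2")]
gps = [(3, "gps-0"), (12, "gps-1")]

for topic, timestamp_ns, payload in merge_topics({"/imu": imu, "/gps": gps}):
    print(f"[{topic}] at {timestamp_ns}: {payload}")
```

The loop shape mirrors the `(topic_name, message)` iteration used with the real streamer: the consumer sees one event at a time and never has to hold all streams in memory.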
```python title="Multi-sensor replay"
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    seq_handler = client.sequence_handler("mission_alpha")
    if seq_handler:
        streamer = seq_handler.get_data_streamer(
            topics=["/gps", "/imu"],
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000,
        )

        print(f"Streaming starts at: {streamer.next_timestamp()}")

        for topic, msg in streamer:
            print(f"[{topic}] at {msg.timestamp_ns}: {type(msg.data).__name__}")

        seq_handler.close()
```

## Targeted Access

`TopicDataStreamer` is obtained from a `TopicHandler`, which you get by calling `client.topic_handler("mission_alpha", "/front/imu")`. This gives you a handle to exactly one Topic within one Sequence. Calling `.get_data_streamer()` on it produces a stream of individual messages, with no tuples, just one message at a time from that single channel.

Because there is no merge happening, each message you receive is of a known type. In the example below every iteration yields an IMU message, and you call `.get_data(IMU)` to extract the typed payload directly. This pattern is common in ML data pipelines where a dataloader needs a clean, single-type stream it can batch without branching on sensor identity.

`next_timestamp()` is available here too, and serves the same purpose: peek at the first sample's timestamp before committing to the loop. It is particularly handy when you are building a custom synchronization loop across multiple `TopicDataStreamer` instances and need to decide which one to advance next.

As with `SequenceHandler`, always call `top_handler.close()` when the loop is done to release the server-side connection.
```python title="Single topic stream"
from mosaicolabs import MosaicoClient, IMU

with MosaicoClient.connect("localhost", 6726) as client:
    top_handler = client.topic_handler("mission_alpha", "/front/imu")
    if top_handler:
        imu_stream = top_handler.get_data_streamer(
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000,
        )

        print(f"First sample at: {imu_stream.next_timestamp()}")

        for imu_msg in imu_stream:
            # process_sample() is a placeholder for your own per-sample logic.
            process_sample(imu_msg.get_data(IMU))

        top_handler.close()
```

## Streamer Comparison

The table below summarizes the trade-offs. In practice, reach for `SequenceDataStreamer` whenever you need a global view of what happened during a recording: system-level replays, event correlation, multi-sensor visualizations. Reach for `TopicDataStreamer` whenever your downstream consumer only cares about one channel and you want the lowest possible overhead.

| Feature | SequenceDataStreamer | TopicDataStreamer |
|---|---|---|
| Primary Use Case | Multi-sensor fusion & system-wide replay | Isolated sensor analysis & ML training |
| Merge Logic | K-Way merge sort | Direct stream |
| Output | `(topic_name, message)` tuple | Single `message` |
| Temporal Slicing | Supported | Supported |

---

## Writing Interleaved Topics

This guide demonstrates how to ingest data from a single MCAP container file that holds interleaved messages from multiple sensors. Because messages from different topics are interleaved in the file, topic writers are created lazily on first encounter rather than up front.

:::tip[Learn]
[Doc: The Writing Workflow](https://docs.mosaico.dev/python-sdk/SDK/handling/writing/)
[API Reference: Writing Data](https://docs.mosaico.dev/python-sdk/SDK/API_reference/handlers/writing/)
:::
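The idea of creating writers lazily on first encounter can be tried out without the SDK. The sketch below uses a plain dictionary as the cache and a stand-in `RecordingWriter` class that only collects messages; both names, the `SCHEMA_TO_KIND` mapping, and the sample stream are hypothetical and exist purely to show the control flow, not the Mosaico API.

```python
from typing import Dict, Iterable, List, Tuple


class RecordingWriter:
    """Stand-in for a per-topic writer; it only stores what is pushed to it."""

    def __init__(self, topic: str, kind: str) -> None:
        self.topic = topic
        self.kind = kind
        self.pushed: List[dict] = []

    def push(self, message: dict) -> None:
        self.pushed.append(message)


# Hypothetical mapping from a source schema name to a target type label.
SCHEMA_TO_KIND = {
    "sensor_msgs/msg/Imu": "IMU",
    "sensor_msgs/msg/NavSatFix": "GPS",
}


def ingest(stream: Iterable[Tuple[str, str, dict]]) -> Dict[str, RecordingWriter]:
    """Route interleaved (schema, topic, message) records to per-topic writers."""
    writers: Dict[str, RecordingWriter] = {}
    for schema_name, topic, message in stream:
        writer = writers.get(topic)          # cache lookup
        if writer is None:
            kind = SCHEMA_TO_KIND.get(schema_name)
            if kind is None:
                continue                     # unknown schema: skip the record
            writer = RecordingWriter(topic, kind)
            writers[topic] = writer          # cache population on first encounter
        writer.push(message)
    return writers


# Interleaved records, in the order a container file might deliver them.
records = [
    ("sensor_msgs/msg/Imu", "/sensors/imu", {"t": 0}),
    ("sensor_msgs/msg/NavSatFix", "/sensors/gps", {"t": 1}),
    ("custom/Unknown", "/debug", {"t": 2}),
    ("sensor_msgs/msg/Imu", "/sensors/imu", {"t": 3}),
]
for topic, writer in ingest(records).items():
    print(topic, writer.kind, len(writer.pushed))
```

The set of topics never has to be declared ahead of time: a writer appears the first time its topic is seen and is reused afterwards, which is the property the rest of this guide relies on.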
## What MCAP Is and Why Interleaved Ingestion Matters

MCAP is a high-performance, self-describing container format designed for robotics and autonomous systems data, widely used in ROS 2 and beyond. Unlike a simple CSV where each file contains one type of data, an MCAP file encodes all sensor streams together in a single binary file, with messages ordered chronologically by timestamp. At `10:00:00.000` you might find an IMU record, immediately followed by a GPS fix at `10:00:00.001`, then a pressure reading at `10:00:00.002`, then another IMU record, all interleaved in the order they were originally recorded.

This layout is efficient for playback and cross-sensor analysis, but it poses a challenge for ingestion: you cannot know in advance which sensor topics appear in the file, or in what order they first appear. The serial approach from the previous guide (iterate IMU completely, then GPS, then Pressure) does not apply here. When you read an MCAP file front-to-back, you encounter messages from all topics simultaneously. You need a strategy that can register a new `TopicWriter` the first time a topic appears and then reuse that writer for every subsequent message on the same topic. This is the **lazy registration pattern**, and it is the central technique in this guide.

## Architecture of Interleaved Ingestion

The ingestion pipeline for an MCAP file has four layers that are worth understanding before diving into the code.

First, the **source file stream**: the MCAP reader delivers a flat sequence of `(schema, channel, message)` tuples in chronological order. Each tuple carries the schema name (e.g. `"sensor_msgs/msg/Imu"`), the channel topic string (e.g. `"/sensors/imu"`), and the raw binary payload.

Second, **schema detection and translation**: MCAP schemas are external definitions, typically ROS 2 message types, that do not map directly to Mosaico's Ontology.
You need a translation layer that knows how to convert a `sensor_msgs/msg/Imu` payload into a Mosaico `IMU` object, extract the embedded timestamp, and return a `Message` ready for the SDK. This translator is also responsible for declaring which Mosaico Ontology type corresponds to each schema.

Third, **dynamic TopicWriter registration**: the first time a given channel topic appears in the stream, a new `TopicWriter` is created and cached inside the `SequenceWriter`. Every subsequent message on that channel reuses the cached writer. The `swriter.get_topic_writer()` method is the cache lookup; `swriter.topic_create()` is the cache population.

Fourth, the **push path**: once a writer is obtained, the translated message is pushed through it to the daemon, exactly as in the serial guides. The only difference is that this happens inline within the single chronological scan of the file, interleaving pushes across multiple writers as messages are encountered.

### Custom Translator and Type Mapping

The translator is the bridge between the external world and the Mosaico type system. External robotics schemas like `sensor_msgs/msg/Imu` have their own field naming conventions (`linear_acceleration`, `angular_velocity`) and timestamp representation (a `header.stamp` struct with separate seconds and nanoseconds fields). Mosaico's Ontology types have their own conventions (`acceleration`, `angular_velocity` as `Vector3d`; timestamps as a single nanosecond integer on the `Message` wrapper). The translator is where you perform this mapping explicitly and consciously.

This explicit translation step is not boilerplate to be minimized; it is an important design boundary. It separates the "understanding of external formats" from the "push to Mosaico" logic. If the source schema ever changes (a field rename, a new firmware version), you update the translator in one place. If Mosaico adds a new Ontology type, you add a branch to the translator.
The rest of the ingestion pipeline stays unchanged.

The `determine_mosaico_type()` helper serves a complementary purpose: given a schema name, it returns the Mosaico Ontology class that should be used for the `TopicWriter`. This is needed at writer creation time, before any payload has been translated, so it is kept separate from the translation logic.

```python title="Custom translator"
from mosaicolabs.models import IMU, GPS, Pressure, Vector3d, GPSStatus, Time, Point3d
from mosaicolabs import Message

def custom_translator(schema_name: str, payload: dict):
    if schema_name == "sensor_msgs/msg/Imu":
        ts = Time(seconds=payload['header']['stamp']['sec'],
                  nanoseconds=payload['header']['stamp']['nanosec']).to_nanoseconds()
        return Message(timestamp_ns=ts, data=IMU(
            acceleration=Vector3d(**payload['linear_acceleration']),
            angular_velocity=Vector3d(**payload['angular_velocity'])
        ))

    if schema_name == "sensor_msgs/msg/NavSatFix":
        ts = Time(seconds=payload['header']['stamp']['sec'],
                  nanoseconds=payload['header']['stamp']['nanosec']).to_nanoseconds()
        return Message(timestamp_ns=ts, data=GPS(
            position=Point3d(x=payload['latitude'], y=payload['longitude'], z=payload['altitude']),
            status=GPSStatus(status=payload['status']['status'], service=payload['status']['service'])
        ))

    if schema_name == "sensor_msgs/msg/FluidPressure":
        ts = Time(seconds=payload['header']['stamp']['sec'],
                  nanoseconds=payload['header']['stamp']['nanosec']).to_nanoseconds()
        return Message(timestamp_ns=ts, data=Pressure(value=payload['fluid_pressure']))

    return None

def determine_mosaico_type(schema_name):
    return {"sensor_msgs/msg/Imu": IMU,
            "sensor_msgs/msg/NavSatFix": GPS,
            "sensor_msgs/msg/FluidPressure": Pressure}.get(schema_name)
```

### Main Ingestion

The ingestion loop is where the lazy registration pattern comes to life.
For each message from the MCAP reader, the first question is whether a `TopicWriter` already exists for this channel. `swriter.get_topic_writer(channel.topic)` checks the `SequenceWriter`'s internal cache and returns the writer if it has been registered, or `None` if this is the first time this topic has been encountered. On first encounter, we call `determine_mosaico_type()` to learn which Ontology type this channel carries, and then create the writer with `swriter.topic_create()`. On all subsequent encounters, we skip straight to the push.

The `on_error=TopicLevelErrorPolicy.Finalize` parameter is worth paying close attention to. In the serial guides, we caught push exceptions manually with `try-except` to prevent them from propagating to the sequence level. Here, `TopicLevelErrorPolicy.Finalize` provides an equivalent guarantee at the framework level: if a topic writer encounters an unrecoverable error, it closes itself gracefully (finalizes its stream) rather than raising an exception that could abort the entire sequence. Other topics continue ingesting normally. This is especially important for interleaved ingestion, where a bad message on the `/sensors/imu` topic appears in the same chronological stream as valid GPS and Pressure messages; you want those other messages to still reach the daemon even if the IMU stream has a problem.

The `twriter.is_active` check before each push guards against the case where a topic has already been finalized due to an earlier error. Attempting to push to a finalized writer would raise an error, so the check lets us skip those messages cleanly.
```python title="Ingestion loop"
import json
from mcap.reader import make_reader
from mosaicolabs import MosaicoClient, setup_sdk_logging, SessionLevelErrorPolicy, TopicLevelErrorPolicy

# custom_translator() and determine_mosaico_type() are defined in the previous block.

def deserialize_payload(data: bytes, schema_name: str) -> dict:
    try:
        return json.loads(data.decode("utf-8"))
    except Exception:
        return {}

def main():
    setup_sdk_logging(level="INFO", pretty=True)

    with open("mission_data.mcap", "rb") as f:
        reader = make_reader(f)

        with MosaicoClient.connect("localhost", 6726) as client:
            with client.sequence_create(
                sequence_name="mcap_ingestion",
                metadata={"mission": "alpha_test"},
                on_error=SessionLevelErrorPolicy.Delete
            ) as swriter:
                for schema, channel, message in reader.iter_messages():
                    # Cache lookup: reuse the writer if this topic was seen before.
                    twriter = swriter.get_topic_writer(channel.topic)

                    if twriter is None:
                        ontology_type = determine_mosaico_type(schema.name)
                        if ontology_type is None:
                            continue

                        twriter = swriter.topic_create(
                            topic_name=channel.topic,
                            metadata={},
                            ontology_type=ontology_type,
                            on_error=TopicLevelErrorPolicy.Finalize
                        )

                    # Skip topics that were finalized after an earlier error.
                    if twriter.is_active:
                        with twriter:
                            raw = deserialize_payload(message.data, schema.name)
                            msg = custom_translator(schema.name, raw)
                            if msg is None:
                                continue
                            twriter.push(message=msg)

    print("MCAP ingestion complete!")

if __name__ == "__main__":
    main()
```

:::info[Lazy Topic Registration]
`swriter.get_topic_writer(topic)` returns an existing writer or `None` if the topic has not been registered yet. This pattern avoids pre-declaring all topics when the set of channels is only known at read time. The `SequenceWriter` maintains the writer cache internally; you do not need to manage a dictionary yourself.
:::

:::warning[Topic-Level Error Policy]
`TopicLevelErrorPolicy.Finalize` closes the affected topic writer on error, allowing the remaining topics in the sequence to continue ingesting. Without this policy, a single bad message could trigger the sequence-level error policy and abort the entire upload.
Note that the `if twriter.is_active` guard runs before any topic-related operation (deserializing, pushing, and so on). It is required when using `TopicLevelErrorPolicy.Finalize`: without it the loop would keep writing to finalized topics, which would produce runtime errors.
:::

## Generate a Sample MCAP File

Use the script below to generate a synthetic `mission_data.mcap` file with interleaved IMU, GPS, and barometric pressure messages for testing. The script writes ten timestamped samples for each sensor, with all three sensors emitting a message at each sample time, mimicking a real multi-sensor recording where sensors are synchronized to a common clock.

```python title="Generate sample MCAP file"
import time, json
from mcap.writer import Writer

def generate_mission_mcap(output_path: str):
    with open(output_path, "wb") as f:
        writer = Writer(f)
        writer.start()

        imu_schema = writer.register_schema(name="sensor_msgs/msg/Imu", encoding="jsonschema", data=b"{}")
        gps_schema = writer.register_schema(name="sensor_msgs/msg/NavSatFix", encoding="jsonschema", data=b"{}")
        press_schema = writer.register_schema(name="sensor_msgs/msg/FluidPressure", encoding="jsonschema", data=b"{}")

        imu_chan = writer.register_channel(topic="/sensors/imu", message_encoding="json", schema_id=imu_schema)
        gps_chan = writer.register_channel(topic="/sensors/gps", message_encoding="json", schema_id=gps_schema)
        press_chan = writer.register_channel(topic="/sensors/baro", message_encoding="json", schema_id=press_schema)

        start_time_ns = time.time_ns()
        for i in range(10):
            # One sample every 100 ms, shared by all three sensors.
            t = start_time_ns + (i * 100_000_000)
            sec, nanosec = t // 1_000_000_000, t % 1_000_000_000

            writer.add_message(imu_chan, log_time=t, publish_time=t, data=json.dumps({"header": {"stamp": {"sec": sec, "nanosec": nanosec}}, "linear_acceleration": {"x": 0.01*i, "y": 0.02, "z": 9.81}, "angular_velocity": {"x": 0.0, "y": 0.0, "z": 0.01}}).encode())
            writer.add_message(gps_chan, log_time=t, publish_time=t, data=json.dumps({"header": {"stamp": {"sec": sec, "nanosec": nanosec}}, "latitude": 25.04, "longitude": 121.53, "altitude": 10.5, "status": {"status": 1, "service": 1}}).encode())
            writer.add_message(press_chan, log_time=t, publish_time=t, data=json.dumps({"header": {"stamp": {"sec": sec, "nanosec": nanosec}}, "fluid_pressure": 101325.0 - (i * 10)}).encode())

        writer.finish()

if __name__ == "__main__":
    generate_mission_mcap("mission_data.mcap")
```

## Full Example

```python title="Full example"
import json
from mcap.reader import make_reader
from mosaicolabs import MosaicoClient, setup_sdk_logging, SessionLevelErrorPolicy, TopicLevelErrorPolicy
from mosaicolabs.models import IMU, GPS, Pressure, Vector3d, GPSStatus, Time, Point3d
from mosaicolabs import Message

def custom_translator(schema_name: str, payload: dict):
    if schema_name == "sensor_msgs/msg/Imu":
        ts = Time(seconds=payload['header']['stamp']['sec'],
                  nanoseconds=payload['header']['stamp']['nanosec']).to_nanoseconds()
        return Message(timestamp_ns=ts, data=IMU(
            acceleration=Vector3d(**payload['linear_acceleration']),
            angular_velocity=Vector3d(**payload['angular_velocity'])
        ))

    if schema_name == "sensor_msgs/msg/NavSatFix":
        ts = Time(seconds=payload['header']['stamp']['sec'],
                  nanoseconds=payload['header']['stamp']['nanosec']).to_nanoseconds()
        return Message(timestamp_ns=ts, data=GPS(
            position=Point3d(x=payload['latitude'], y=payload['longitude'], z=payload['altitude']),
            status=GPSStatus(status=payload['status']['status'], service=payload['status']['service'])
        ))

    if schema_name == "sensor_msgs/msg/FluidPressure":
        ts = Time(seconds=payload['header']['stamp']['sec'],
                  nanoseconds=payload['header']['stamp']['nanosec']).to_nanoseconds()
        return Message(timestamp_ns=ts, data=Pressure(value=payload['fluid_pressure']))

    return None

def determine_mosaico_type(schema_name):
    mapping = {
        "sensor_msgs/msg/Imu": IMU,
        "sensor_msgs/msg/NavSatFix": GPS,
        "sensor_msgs/msg/FluidPressure": Pressure,
    }
    return mapping.get(schema_name)

def deserialize_payload(data: bytes, schema_name: str) -> dict:
    try:
        return json.loads(data.decode("utf-8"))
    except Exception:
        return {}

def main():
    setup_sdk_logging(level="INFO", pretty=True)

    with open("mission_data.mcap", "rb") as f:
        reader = make_reader(f)

        with MosaicoClient.connect("localhost", 6726) as client:
            with client.sequence_create(
                sequence_name="mcap_ingestion",
                metadata={"mission": "alpha_test"},
                on_error=SessionLevelErrorPolicy.Delete
            ) as swriter:
                for schema, channel, message in reader.iter_messages():
                    twriter = swriter.get_topic_writer(channel.topic)

                    if twriter is None:
                        ontology_type = determine_mosaico_type(schema.name)
                        if ontology_type is None:
                            continue

                        twriter = swriter.topic_create(
                            topic_name=channel.topic,
                            metadata={},
                            ontology_type=ontology_type,
                            on_error=TopicLevelErrorPolicy.Finalize
                        )

                    if twriter.is_active:
                        with twriter:
                            raw = deserialize_payload(message.data, schema.name)
                            msg = custom_translator(schema.name, raw)
                            if msg is None:
                                continue
                            twriter.push(message=msg)

    print("MCAP ingestion complete!")

if __name__ == "__main__":
    main()
```

---

## Writing Multiple Topics

This guide demonstrates how to ingest data from multiple CSV files into Mosaico, writing several topics serially within a single sequence. The example covers IMU, GPS, and Pressure sensors, each sourced from a separate CSV file.

:::tip[Learn]
[Doc: The Writing Workflow](https://docs.mosaico.dev/python-sdk/SDK/handling/writing/)
[API Reference: Writing Data](https://docs.mosaico.dev/python-sdk/SDK/API_reference/handlers/writing/)
:::

## Why Multiple Topics in One Sequence

In real robotics deployments, a robot rarely records just one sensor.
A field mission might simultaneously produce IMU readings at 200 Hz, GPS fixes at 10 Hz, and barometric pressure samples at 50 Hz. All of these streams are causally related; they describe the same vehicle, in the same environment, during the same run. Storing them in separate, unrelated records would destroy that relationship and make cross-sensor queries needlessly complex. This is exactly what the Sequence model is designed for. A Sequence is not just a container for one sensor stream; it is a session-level grouping that can hold any number of Topics, each carrying a different sensor type. When you create a single Sequence named `multi_sensor_ingestion` and register IMU, GPS, and Pressure topics inside it, you are telling the daemon that all three streams belong to the same operational context. Later, when you query the platform, you can retrieve all topics from that sequence together, knowing their timestamps align to the same mission clock. In this guide, we ingest the three sensors **serially**: we exhaust the IMU file completely, then the GPS file, then the Pressure file. This is the natural approach when your data arrives pre-split into separate files. The daemon does not require messages to arrive in global timestamp order across topics; each topic maintains its own ordered stream independently. If your source data is already interleaved in a single file, see the [Writing Interleaved Topics](./writing_interleaved_topics.mdx) guide instead. ### Streaming Helpers The guide uses three sample files: [`imu.csv`](/assets/sample-data/imu.csv), [`gps.csv`](/assets/sample-data/gps.csv), and [`pressure.csv`](/assets/sample-data/pressure.csv). Download them to follow along locally. For each sensor, we define a dedicated **generator function** that reads its CSV in chunks and `yield`s typed `Message` objects. This mirrors the single-topic pattern from the previous guide, extended to three sensor types. 
Each generator is independent; it knows nothing about the other sensors and can be tested in isolation. The `GPS` type uses a `Point3d` to hold latitude, longitude, and altitude, and a `GPSStatus` struct to carry fix quality metadata. The `Pressure` type is simpler, wrapping a single scalar value. As before, each generator `yield`s `None` on a parse error rather than raising an exception. This keeps the generator implementations clean and makes error handling a concern of the calling code, not the data loading code. ```python title="Define data loaders" import pandas as pd from mosaicolabs import ( MosaicoClient, setup_sdk_logging, SessionLevelErrorPolicy, Message, IMU, Vector3d, GPS, GPSStatus, Pressure, Point3d ) def stream_imu_from_csv(file_path, chunk_size=1000, skipinitialspace=True): for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace): for row in chunk.itertuples(index=False): try: yield Message( timestamp_ns=int(row.timestamp), data=IMU( acceleration=Vector3d(x=float(row.acc_x), y=float(row.acc_y), z=float(row.acc_z)), angular_velocity=Vector3d(x=float(row.gyro_x), y=float(row.gyro_y), z=float(row.gyro_z)) ) ) except Exception: yield None def stream_gps_from_csv(file_path, chunk_size=1000, skipinitialspace=True): for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace): for row in chunk.itertuples(index=False): try: yield Message( timestamp_ns=int(row.timestamp), data=GPS( position=Point3d(x=float(row.latitude), y=float(row.longitude), z=float(row.altitude)), status=GPSStatus(status=int(row.status), service=int(row.service)) ) ) except Exception: yield None def stream_pressure_from_csv(file_path, chunk_size=1000, skipinitialspace=True): for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace): for row in chunk.itertuples(index=False): try: yield Message(timestamp_ns=int(row.timestamp), data=Pressure(value=row.pressure)) except Exception: yield 
None ``` The C++ SDK is currently in development. The Rust SDK is currently in development. ### Connect and Create Sequence Writer Connecting to the daemon and creating a Sequence follows the same pattern as the single-topic guide. The key difference here is the metadata: we tag this sequence with mission-level context (`"mission": "alpha_test"`, `"environment": "laboratory"`) rather than format-level context. Because all three sensors will live inside this one sequence, the metadata should describe the session as a whole, not any individual stream. The `SequenceWriter` returned by `sequence_create()` is the object that governs the entire session. Every topic you create, every message you push, is ultimately coordinated through this writer. When its `with` block exits successfully, the daemon finalizes all topics, seals the sequence's time bounds, and marks the entire session as complete and queryable. ```python title="Connect and create sequence writer" setup_sdk_logging(level="INFO", pretty=True) with MosaicoClient.connect("localhost", 6726) as client: with client.sequence_create( sequence_name="multi_sensor_ingestion", metadata={"mission": "alpha_test", "environment": "laboratory"}, on_error=SessionLevelErrorPolicy.Delete ) as swriter: pass # Steps 3 and 4 here ``` :::warning[Context Management] It is **mandatory** to use `SequenceWriter` inside its own `with` context. Using it outside will raise an exception. ::: **Sequence-Level Error Handling**: `SessionLevelErrorPolicy.Delete` removes the incomplete sequence entirely on an unhandled error; use this when you want an all-or-nothing guarantee. `SessionLevelErrorPolicy.Report` flags the sequence as failed while retaining whatever records were already transmitted, which can be useful when partial data has diagnostic value. In multi-topic ingestion, the stakes of a session-level abort are higher: a crash late in the GPS ingestion phase could discard IMU data that was uploaded cleanly. 
This makes per-topic error containment (covered in the next section) especially important.

### Topic Writers

All three `TopicWriter` objects should be created upfront, before any data is pushed. This is best practice for a straightforward reason: calling `topic_create()` registers the topic with the daemon immediately, causing it to allocate the catalog entry, prepare the typed write path, and associate the topic with the current sequence. By creating all writers at the start, you ensure the daemon has a complete picture of the session's structure before data starts flowing. It also makes the code easier to reason about: the setup phase is clearly separated from the data-movement phase, and you can verify that all three writers exist before committing to any I/O.

Each `TopicWriter` is bound to exactly one Ontology type. Attempting to push a `GPS` message through the `imu_twriter` would fail at the SDK level because the types are incompatible. This strictness is what allows the daemon to store each topic as a homogeneous, columnar stream rather than a mixed bag of records.

```python title="Create topic writers"
# Runs inside the `SequenceWriter` context from the previous step.
# `TopicLevelErrorPolicy` is imported from `mosaicolabs`.
imu_twriter = swriter.topic_create(
    topic_name="sensors/imu",
    metadata={"sensor_id": "accel_01"},
    ontology_type=IMU,
    on_error=TopicLevelErrorPolicy.Ignore,
)
gps_twriter = swriter.topic_create(
    topic_name="sensors/gps",
    metadata={"sensor_id": "gps_01"},
    ontology_type=GPS,
    on_error=TopicLevelErrorPolicy.Ignore,
)
pressure_twriter = swriter.topic_create(
    topic_name="sensors/pressure",
    metadata={"sensor_id": "pressure_01"},
    ontology_type=Pressure,
    on_error=TopicLevelErrorPolicy.Ignore,
)
```

:::info[Topic-Level Error Policy]
The `TopicLevelErrorPolicy.Ignore` policy logs on the server any error raised within the `TopicWriter` context (e.g. while pushing a message) and then ignores it, keeping the ingestion stream active.
For a more detailed explanation, refer to the [Topic-Level Error Handling](https://docs.mosaico.dev/python-sdk/SDK/handling/writing/#topic-level-error-handling) Section. ::: The C++ SDK is currently in development. The Rust SDK is currently in development. ### Pushing Data for Each Sensor With all three writers registered, we iterate over each generator and push its messages. The three `for` loops run one after the other: first all IMU messages, then all GPS messages, then all Pressure messages. Each loop is independent and isolated from the others by its own `with` context around each push call. This per-topic error isolation is the critical design choice in multi-topic ingestion: it allows the SDK to correctly handle errors inside the context, by means of the selected `TopicLevelErrorPolicy` policy. ```python title="Push messages" # Push IMU for msg in stream_imu_from_csv("imu.csv"): with imu_twriter: imu_twriter.push(message=msg) if imu_twriter.status == TopicWriterStatus.IgnoredLastError: # Last cycle produced an error print(f"IMU message skipped with error: {imu_twriter.last_error}") # Push GPS for msg in stream_gps_from_csv("gps.csv"): with gps_twriter: gps_twriter.push(message=msg) if gps_twriter.status == TopicWriterStatus.IgnoredLastError: # Last cycle produced an error print(f"GPS message skipped with error: {gps_twriter.last_error}") # Push Pressure for msg in stream_pressure_from_csv("pressure.csv"): with pressure_twriter: pressure_twriter.push(message=msg) if pressure_twriter.status == TopicWriterStatus.IgnoredLastError: # Last cycle produced an error print(f"Pressure message skipped with error: {pressure_twriter.last_error}") ``` **Topic-Level Error Management**: The **`with`** context around each `push()` call is important. 
Without it, an error at the push level, for example a network hiccup or a type validation failure for a particular message, would propagate up through the loop, exit the `SequenceWriter` context block abnormally, and trigger the sequence-level `on_error` policy, potentially deleting everything. By wrapping each topic-related operation in that topic's `TopicWriter` context, you contain errors to the individual message that caused them. In this case, since `TopicLevelErrorPolicy.Ignore` has been selected, the affected timestamp is logged remotely and ingestion continues with the next message.

## Full Example

```python title="Full example"
import pandas as pd
from mosaicolabs import (
    MosaicoClient, setup_sdk_logging, SessionLevelErrorPolicy,
    TopicLevelErrorPolicy,
    TopicWriterStatus,  # assumption: exported at the top level like the error policies
    Message, IMU, Vector3d, GPS, GPSStatus, Pressure, Point3d,
)


def stream_imu_from_csv(file_path, chunk_size=1000, skipinitialspace=True):
    for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace):
        for row in chunk.itertuples(index=False):
            try:
                yield Message(
                    timestamp_ns=int(row.timestamp),
                    data=IMU(
                        acceleration=Vector3d(x=float(row.acc_x), y=float(row.acc_y), z=float(row.acc_z)),
                        angular_velocity=Vector3d(x=float(row.gyro_x), y=float(row.gyro_y), z=float(row.gyro_z))
                    )
                )
            except Exception:
                yield None  # unparseable row: let the caller decide


def stream_gps_from_csv(file_path, chunk_size=1000, skipinitialspace=True):
    for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace):
        for row in chunk.itertuples(index=False):
            try:
                yield Message(
                    timestamp_ns=int(row.timestamp),
                    data=GPS(
                        position=Point3d(x=float(row.latitude), y=float(row.longitude), z=float(row.altitude)),
                        status=GPSStatus(status=int(row.status), service=int(row.service))
                    )
                )
            except Exception:
                yield None


def stream_pressure_from_csv(file_path, chunk_size=1000, skipinitialspace=True):
    for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace):
        for row in chunk.itertuples(index=False):
            try:
                yield Message(timestamp_ns=int(row.timestamp), data=Pressure(value=row.pressure))
            except Exception:
                yield None


def main():
    setup_sdk_logging(level="INFO", pretty=True)
    with MosaicoClient.connect("localhost", 6726) as client:
        with client.sequence_create(
            sequence_name="multi_sensor_ingestion",
            metadata={"mission": "alpha_test", "environment": "laboratory"},
            on_error=SessionLevelErrorPolicy.Delete
        ) as swriter:
            # Register all topics upfront, with the same per-topic policy as in the steps above
            imu_twriter = swriter.topic_create(
                topic_name="sensors/imu", metadata={"sensor_id": "accel_01"},
                ontology_type=IMU, on_error=TopicLevelErrorPolicy.Ignore,
            )
            gps_twriter = swriter.topic_create(
                topic_name="sensors/gps", metadata={"sensor_id": "gps_01"},
                ontology_type=GPS, on_error=TopicLevelErrorPolicy.Ignore,
            )
            pressure_twriter = swriter.topic_create(
                topic_name="sensors/pressure", metadata={"sensor_id": "pressure_01"},
                ontology_type=Pressure, on_error=TopicLevelErrorPolicy.Ignore,
            )

            # Push IMU
            for msg in stream_imu_from_csv("imu.csv"):
                with imu_twriter:
                    imu_twriter.push(message=msg)
                if imu_twriter.status == TopicWriterStatus.IgnoredLastError:
                    # Last cycle produced an error
                    print(f"IMU message skipped with error: {imu_twriter.last_error}")

            # Push GPS
            for msg in stream_gps_from_csv("gps.csv"):
                with gps_twriter:
                    gps_twriter.push(message=msg)
                if gps_twriter.status == TopicWriterStatus.IgnoredLastError:
                    # Last cycle produced an error
                    print(f"GPS message skipped with error: {gps_twriter.last_error}")

            # Push Pressure
            for msg in stream_pressure_from_csv("pressure.csv"):
                with pressure_twriter:
                    pressure_twriter.push(message=msg)
                if pressure_twriter.status == TopicWriterStatus.IgnoredLastError:
                    # Last cycle produced an error
                    print(f"Pressure message skipped with error: {pressure_twriter.last_error}")

    print("Successfully ingested multi-sensor data from CSV into Mosaico!")


if __name__ == "__main__":
    main()
```
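
The loaders in this guide yield `None` for rows that fail to parse and leave the decision to the caller. The following is a minimal, SDK-free sketch of that contract, useful for testing loader logic without a running daemon. Everything in it is illustrative: `SAMPLE_CSV`, `stream_pressure_rows`, and `ingest` are hypothetical names, plain tuples stand in for `Message` objects, and the standard `csv` module replaces pandas.

```python
import csv
import io

# Hypothetical in-memory stand-in for pressure.csv; the column names
# (timestamp, pressure) follow the pressure loader shown in this guide.
SAMPLE_CSV = (
    "timestamp,pressure\n"
    "1000,101325.0\n"
    "2000,not_a_number\n"
    "3000,101315.0\n"
)


def stream_pressure_rows(text):
    """Yield (timestamp_ns, pressure) tuples, or None for rows that fail to parse."""
    for row in csv.DictReader(io.StringIO(text)):
        try:
            yield int(row["timestamp"]), float(row["pressure"])
        except (KeyError, TypeError, ValueError):
            yield None  # signal a bad row instead of raising


def ingest(records):
    """Split a record stream into accepted records and a count of skipped rows."""
    accepted, skipped = [], 0
    for record in records:
        if record is None:
            skipped += 1  # the caller, not the loader, decides what a bad row means
            continue
        accepted.append(record)  # a real pipeline would push to a topic writer here
    return accepted, skipped


accepted, skipped = ingest(stream_pressure_rows(SAMPLE_CSV))
print(f"accepted={len(accepted)} skipped={skipped}")
```

Filtering `None` before the push is one possible design; the guide's own loops instead hand every item to the `TopicWriter` context and rely on the topic-level policy.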

---

## Writing a Single Topic

This guide demonstrates how to ingest data into Mosaico. The example uses a CSV file as the data source, chosen purely for its simplicity.

:::info
In production, your source data will almost certainly come from a binary container rather than a CSV. Mosaico's SDK is **format-agnostic**: it does not care where your data comes from. The same `push()` call works whether you are reading an `.mcap` file, a `.bag`, a `.hdf5` archive, a proprietary binary log from your hardware vendor, or a live data stream from a running robot. The only thing that changes between formats is the loader code that converts raw bytes into typed ontology models; the connection to the daemon, topic and sequence management, and the push logic are identical regardless of source. For a real-world example using MCAP, see the [Writing Interleaved Topics](./writing_interleaved_topics.mdx) guide.
:::

:::tip[Learn]
[Doc: The Writing Workflow](https://docs.mosaico.dev/python-sdk/SDK/handling/writing/)
[API Reference: Writing Data](https://docs.mosaico.dev/python-sdk/SDK/API_reference/handlers/writing/)
:::

## Understanding the Mosaico Data Model

Before writing a single line of ingestion code, it helps to understand the four concepts you will interact with in this guide: the Sequence, the Topic, the Message, and the Ontology.

A **Sequence** is Mosaico's fundamental unit of a recording session. Think of it as a named, time-bounded container, analogous to a ROS bag or a test run folder, that groups all sensor streams captured during a single operational session. When you create a sequence, you give it a name and optional metadata (key-value tags like `{"source": "manual_upload"}`). That metadata travels with the data forever and makes it searchable later.
Under the hood, creating a sequence causes the Mosaico Daemon (`mosaicod`), the server-side process that owns all storage, to allocate the necessary catalog entries and prepare to receive incoming data. A **Topic** lives inside a Sequence and represents a single sensor stream. Each topic has a strict one-to-one relationship with a data type from the **Ontology**, Mosaico's type system. The Ontology defines the shape of every piece of data the platform can store. Built-in types like `IMU`, `GPS`, and `Pressure` are already registered, so you can start using them immediately. This strictness is intentional: because every record in a topic is the same type, the daemon can serialize and index data far more efficiently using Apache Arrow columnar layout. When you create a topic and bind it to `IMU`, you are telling the daemon to open a typed, append-only stream optimized for that exact schema. A **Message** is the in-memory wrapper you hand to the SDK. It couples a typed data payload (your `IMU`, `GPS`, or `Pressure` object) with a nanosecond-precision timestamp. The timestamp is what Mosaico uses to order and index records in time. It is not the wall-clock time of the ingestion call, but the sensor timestamp embedded in your source data. The SDK serializes the Message and transmits it to the daemon over gRPC. You never deal with bytes directly; you construct typed objects, wrap them in `Message`, and let the SDK handle the rest. ### Chunked Loading for High-Volume Data In this example the source is [`imu.csv`](/assets/sample-data/imu.csv), a file with columns `timestamp`, `acc_x`, `acc_y`, `acc_z`, `gyro_x`, `gyro_y`, `gyro_z`. Download it to follow along locally. When working with large files, loading everything into memory at once is impractical. Instead, we write a **generator function** that reads the CSV in fixed-size chunks using pandas and **yields** one `Message` at a time. 
This way, only a small slice of the file occupies memory at any moment, and the rest of the pipeline (the SDK's internal batching, the gRPC transport, the daemon's write path) can operate in a steady, pipelined fashion rather than in a single massive burst. Notice that the generator **yields `None`** on parse errors rather than raising an exception. This is a deliberate choice: a corrupt or missing value in one row of a CSV should not abort the entire ingestion. By yielding `None`, we signal to the caller that this particular record could not be constructed; the caller logs it and moves on. The generator itself remains simple and composable. ```python title="Define the data loader" import pandas as pd from mosaicolabs import ( MosaicoClient, setup_sdk_logging, SessionLevelErrorPolicy, Message, IMU, Vector3d, ) def stream_imu_from_csv(file_path: str, chunk_size: int = 1000, skipinitialspace: bool = True): for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace): for row in chunk.itertuples(index=False): try: yield Message( timestamp_ns=int(row.timestamp), data=IMU( acceleration=Vector3d( x=float(row.acc_x), y=float(row.acc_y), z=float(row.acc_z), ), angular_velocity=Vector3d( x=float(row.gyro_x), y=float(row.gyro_y), z=float(row.gyro_z), ), ), ) except Exception: yield None ``` The C++ SDK is currently in development. The Rust SDK is currently in development. Here `IMU` and `Vector3d` are built-in Mosaico Ontology types. You do not need to define or register them; they are part of the default type catalog that ships with the SDK. The `Message` wrapper carries the typed `IMU` payload along with the nanosecond timestamp parsed directly from the CSV's `timestamp` column, preserving the original sensor time rather than substituting the current wall clock. ### Sequence Upload With the data loader in place, the next step is to open a connection to the Mosaico Daemon and create a Sequence. 
`MosaicoClient.connect()` establishes a gRPC channel to the daemon running on the specified host and port. Using it as a **context manager** ensures the connection is cleanly closed even if an exception occurs. Inside the **client context**, `client.sequence_create()` registers a new Sequence with the daemon and returns a `SequenceWriter`, the object that orchestrates the entire write session. All topic creation and data pushing happens through this writer. When the **`with` block** exits normally, the `SequenceWriter` flushes any buffered data, finalizes the sequence's time bounds, and marks it as successfully completed in the daemon's catalog. If the block exits due to an **unhandled exception**, the behavior is governed by the `on_error` policy you supply. ```python title="Connect and create sequence writer" setup_sdk_logging(level="INFO", pretty=True) with MosaicoClient.connect("localhost", 6726) as client: with client.sequence_create( sequence_name="csv_ingestion_test", metadata={"source": "manual_upload", "format": "csv"}, on_error=SessionLevelErrorPolicy.Delete # Delete sequence on unhandled error ) as swriter: # Topic creation and data push happen inside this block... pass ``` :::warning[Context Manager] It is **mandatory** to use `SequenceWriter` inside its own **`with` context**. Using it outside will raise an exception. ::: **Sequence-Level Error Handling**: The `on_error` policy is your first line of defense against partial or corrupt data reaching the platform. `SessionLevelErrorPolicy.Delete` instructs the daemon to remove the entire sequence if an **unhandled exception escapes the `with` block**. This is the safer default when you would rather have no data than incomplete data. `SessionLevelErrorPolicy.Report`, by contrast, keeps whatever records were successfully transmitted and marks the sequence with a failed status, allowing you to inspect what arrived before the error. 
Choose `Delete` when data integrity across the full session is critical; choose `Report` when partial data is still useful for debugging or analysis.

### Topic Creation

Inside the sequence writer, you register one or more Topics. Calling `swriter.topic_create()` tells the daemon to open a typed stream named `sensors/imu` that will accept only `IMU` messages. The `metadata` dictionary here annotates the topic itself, useful for tagging the physical sensor unit, calibration version, or any other attribute you want to query later.

The return value is a `TopicWriter`, which manages the actual write operations for this stream. Internally, the `TopicWriter` accumulates messages into batches before transmitting them to the daemon, amortizing the cost of individual gRPC calls. You do not need to manage this batching manually; just call `push()` for each message and the SDK handles the rest.

```python title="Create topic writer"
# `TopicLevelErrorPolicy` is imported from `mosaicolabs`, like `SessionLevelErrorPolicy`.
with client.sequence_create(...) as swriter:
    imu_twriter = swriter.topic_create(
        topic_name="sensors/imu",
        metadata={"sensor_id": "accel_01"},
        ontology_type=IMU,
        on_error=TopicLevelErrorPolicy.Ignore
    )
```

:::info[Topic-Level Error Policy]
The `TopicLevelErrorPolicy.Ignore` policy logs on the server any error raised within the `TopicWriter` context (e.g. while pushing a message) and then ignores it, keeping the ingestion stream active. For a more detailed explanation, refer to the [Topic-Level Error Handling](https://docs.mosaico.dev/python-sdk/SDK/handling/writing/#topic-level-error-handling) section.
:::

### Pushing Data

With the `TopicWriter` in hand, you iterate over the **generator** and push each message. The SDK serializes each `Message` using Apache Arrow, batches it with preceding messages, and transmits batches to the daemon over gRPC.
From your perspective, the call is a simple method invocation; the network and serialization complexity is hidden.

Wrapping the operations related to a topic inside a `with` context block allows the SDK to handle errors raised inside the context according to the selected `TopicLevelErrorPolicy`.

```python title="Push messages"
for msg in stream_imu_from_csv("imu.csv"):
    with imu_twriter:
        # Any exception raised inside this block is handled by `TopicLevelErrorPolicy.Ignore`
        imu_twriter.push(message=msg)
    if imu_twriter.status == TopicWriterStatus.IgnoredLastError:
        # Last cycle produced an error
        print(f"IMU message skipped with error: {imu_twriter.last_error}")
```

**Topic-Level Error Management**: The **`with`** context around each `push()` call is important. Without it, an error at the push level, for example a network hiccup or a type validation failure for a particular message, would propagate up through the loop, exit the `SequenceWriter` context block abnormally, and trigger the sequence-level `on_error` policy, potentially deleting everything. By wrapping each topic-related operation in that topic's `TopicWriter` context, you contain errors to the individual message that caused them. In this case, since `TopicLevelErrorPolicy.Ignore` has been selected, the affected timestamp is logged remotely and ingestion continues with the next message.

## How It Fits Together

It is worth pausing to see the full picture before looking at the complete code. The Mosaico Daemon is always running, managing storage and serving the gRPC API. Your application acts as a client: it opens a connection, instructs the daemon to create a Sequence (a catalog entry with metadata and time bounds), and then registers a Topic within that sequence (a typed, append-only stream). As you iterate through your CSV, each row becomes a `Message`, a typed, timestamped record.
The SDK batches those messages and streams them to the daemon via gRPC using Arrow-encoded payloads. When the `SequenceWriter` scope exits cleanly, the daemon seals the sequence and it becomes queryable.

Error handling is layered deliberately. At the row level, the **generator yields `None`** for unparseable rows, so a malformed row never raises inside the loader. At the push level, the **`TopicWriter` context** applies the selected `TopicLevelErrorPolicy`, containing transport or validation errors for individual messages without aborting the session. At the sequence level, the `on_error` policy decides what to do if something truly unrecoverable happens. This three-layer approach means a single bad CSV row never brings down an entire recording session.

## Full Example

```python title="Full example"
import pandas as pd
from mosaicolabs import (
    MosaicoClient, setup_sdk_logging, SessionLevelErrorPolicy,
    TopicLevelErrorPolicy,
    TopicWriterStatus,  # assumption: exported at the top level like the error policies
    Message, IMU, Vector3d,
)


def stream_imu_from_csv(file_path: str, chunk_size: int = 1000, skipinitialspace: bool = True):
    for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace):
        for row in chunk.itertuples(index=False):
            try:
                yield Message(
                    timestamp_ns=int(row.timestamp),
                    data=IMU(
                        acceleration=Vector3d(x=float(row.acc_x), y=float(row.acc_y), z=float(row.acc_z)),
                        angular_velocity=Vector3d(x=float(row.gyro_x), y=float(row.gyro_y), z=float(row.gyro_z)),
                    ),
                )
            except Exception:
                yield None  # unparseable row: let the caller decide


def main():
    setup_sdk_logging(level="INFO", pretty=True)
    with MosaicoClient.connect("localhost", 6726) as client:
        with client.sequence_create(
            sequence_name="csv_ingestion_test",
            metadata={"source": "manual_upload", "format": "csv"},
            on_error=SessionLevelErrorPolicy.Delete
        ) as swriter:
            imu_twriter = swriter.topic_create(
                topic_name="sensors/imu",
                metadata={"sensor_id": "accel_01"},
                ontology_type=IMU,
                on_error=TopicLevelErrorPolicy.Ignore,
            )
            for msg in stream_imu_from_csv("imu.csv"):
                with imu_twriter:
                    imu_twriter.push(message=msg)
                if imu_twriter.status == TopicWriterStatus.IgnoredLastError:
                    print(f"IMU message skipped with error: {imu_twriter.last_error}")

    print("Successfully ingested data from CSV into Mosaico!")


if __name__ == "__main__":
    main()
```

---

## Overview(Docs)

# Mosaico

**Mosaico** is a high-performance, open-source data platform engineered to bridge the gap between **Robotics** and **Physical AI**.

Robots produce data continuously: cameras, Lidar units, IMUs, GPS receivers, and custom sensors all fire at different rates, generating heterogeneous streams that need to be recorded, stored, and later retrieved for analysis or model training. Traditional approaches to this problem rely on monolithic file formats like [ROS bag](https://wiki.ros.org/Bags), which store data as a linear sequence of messages: simple to write, but difficult to index, query, or stream efficiently at scale.

Mosaico was built to replace that model. Rather than appending bytes to a file, it stores data in a structured, queryable archive, designed from the ground up for the throughput demands of multi-modal sensor data.

Equally important, Mosaico follows a strictly **code-first approach**. Engineers should not have to learn a proprietary query sublanguage to move data around. The SDK exposes all platform capabilities (ingestion, retrieval, querying) as ordinary function calls.
## Core Concepts

What makes structured storage possible is a shared vocabulary for describing data. Mosaico provides this through three interlocking concepts: **Ontology**, **Topic**, and **Sequence**.

### The Ontology

The Ontology is the structural backbone of the platform. Rather than storing data as raw bytes, Mosaico requires every piece of data to have a declared type: a model that describes its shape and semantics. Because all data is treated as a **time series** (even a single data point is a degenerate time series of length one), the ontology's job is to define the structure of each series: the fields it contains, their types, and their meaning.

This declaration is what allows the platform to do more than just store data. By understanding what data *is*, Mosaico can apply targeted processing (custom compression, semantic indexing, efficient storage layout) tuned to each type rather than treating everything as opaque bytes.

To make this concrete, consider how a GPS sensor might be expressed as an Ontology Model:

```python
class GPS:
    latitude: MosaicoType.float32
    longitude: MosaicoType.float32
    altitude: MosaicoType.float32
```

Or the output of an image classification algorithm:

```python
class SimpleImageClassification:
    top_left_corner: mosaicolabs.Vector2d
    bottom_right: mosaicolabs.Vector2d
    label: MosaicoType.string
    confidence: MosaicoType.float32
```

Any structure can be expressed this way. The full set of available types and how to define your own models is covered in the [Ontology Models](https://docs.mosaico.dev/python-sdk/SDK/ontology/) reference.

### Topics and Sequences

With a type declared, data needs somewhere to live. A **Topic** is a concrete instance of an Ontology Model: a container for a single time series of that type. The relationship is strictly one-to-one: one Topic, one model. This constraint is deliberate; it is what allows the platform to index and query topics by their semantic structure, not just by name or timestamp.
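The one-to-one rule between a Topic and its model can be illustrated with a small sketch in plain Python. This is not the Mosaico SDK: `GPSReading` and `TypedTopic` are hypothetical names invented for this illustration, and the type check via `isinstance` is an assumption about how such a constraint could be enforced, not a description of how the platform does it.

```python
from dataclasses import dataclass, field
from typing import Generic, List, Tuple, Type, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class GPSReading:
    """Hypothetical stand-in for an ontology model."""
    latitude: float
    longitude: float
    altitude: float


@dataclass
class TypedTopic(Generic[T]):
    """Hypothetical container: one time series bound to exactly one model."""
    name: str
    model: Type[T]
    samples: List[Tuple[int, T]] = field(default_factory=list)

    def push(self, timestamp_ns: int, value: T) -> None:
        # Reject anything that is not an instance of the declared model.
        if not isinstance(value, self.model):
            raise TypeError(
                f"topic {self.name!r} accepts {self.model.__name__}, "
                f"got {type(value).__name__}"
            )
        self.samples.append((timestamp_ns, value))


gps_topic = TypedTopic(name="sensors/gps", model=GPSReading)
gps_topic.push(1_000_000_000, GPSReading(45.07, 7.69, 239.0))

try:
    # A plain dict is not a GPSReading, so the topic refuses it.
    gps_topic.push(2_000_000_000, {"latitude": 45.07})
except TypeError as exc:
    rejected = str(exc)
```

Because every sample in the container is known to share one structure, anything built on top of it (an index, a query, a storage layout) can rely on that structure instead of inspecting each message.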
Topics, however, rarely exist in isolation. A robot recording session produces many streams simultaneously (Lidar, GPS, camera, accelerometer), and these streams belong together. The **Sequence** is the container that groups them. Where a Topic represents a single sensor stream, a Sequence represents the session as a whole: a coherent, time-bounded collection of related Topics. Both levels support arbitrary metadata, so context can travel alongside the data.

## Architecture

At runtime, Mosaico operates as a client-server system. The SDK is the user-facing interface; it communicates with `mosaicod`, a high-performance daemon written in Rust, over [Apache Arrow](https://arrow.apache.org/), a columnar data format that eliminates serialization overhead and allows zero-copy data exchange between processes.

`mosaicod` handles all core operations: [ingestion](daemon/ingestion.md), [retrieval](daemon/retrieval.md), and [query](daemon/query.md). Metadata and system state are managed in a database, which accelerates lookups and powers the internal event queue for asynchronous tasks. Data itself is stored in an [object store](https://en.wikipedia.org/wiki/Object_storage) ([S3](https://aws.amazon.com/s3/), [MinIO](https://www.min.io/), or the local filesystem) for durable, scalable persistence.

The result is a system that can handle complex multi-modal data at high throughput while exposing a clean, code-first interface to the user.
## Bring Your Own Pipeline

Mosaico places no requirements on how data was collected. The platform is **middleware-agnostic**: existing collection frameworks, proprietary embedded loggers, direct hardware drivers, and synthetic data from simulation environments are all first-class citizens. Any source that can produce structured data can feed into Mosaico.

The ontology is what makes this possible. Instead of preserving the identity of "Topic A from Robot B", every stream is normalized on ingestion into a typed semantic representation: a `Pose`, an `IMU` reading, an `Image`. Once data is in the platform, where it came from becomes irrelevant; what it *is* is all that matters. Custom types are automatically validatable, serializable, and queryable alongside built-in types, so the ontology grows with the domain without requiring changes to the underlying infrastructure.

## Streamlining Data for Physical AI

The deeper motivation behind this architecture is the shift from classical robotics to Physical AI. Classical robotic pipelines are event-driven: sensors fire asynchronously at different rates (a Lidar at 10 Hz, an IMU at 100 Hz, a camera at 30 Hz), and the resulting streams drift relative to one another, stored in files that are hard to align after the fact. Physical AI models have the opposite requirements: they expect synchronous, dense, tabular input, meaning fixed-size tensors at a constant frequency, such as a batch of state vectors at exactly 50 Hz.

Bridging that gap by hand is tedious and error-prone. Mosaico's [ML module](https://docs.mosaico.dev/python-sdk/SDK/bridges/ml/) automates it entirely. It ingests raw, unsynchronized sensor data and transforms it on the fly into the aligned, flattened formats that training pipelines expect, eliminating both the intermediate files and the brittle preprocessing scripts that would otherwise generate them.
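To give a feel for the alignment problem described above, here is a minimal sketch in plain Python that resamples two asynchronous streams onto a common 50 Hz grid. It does not use or represent the ML module's API: `sample_and_hold` and `resample` are hypothetical helpers, the streams are synthetic, and zero-order hold (carrying the latest sample forward) is just one assumed alignment strategy among several.

```python
from bisect import bisect_right
from typing import List, Sequence, Tuple

Stream = Tuple[Sequence[int], Sequence[float]]  # (timestamps in ns, values)


def sample_and_hold(timestamps_ns: Sequence[int], values: Sequence[float], t_ns: int) -> float:
    """Return the most recent value at or before t_ns (zero-order hold)."""
    idx = bisect_right(timestamps_ns, t_ns) - 1
    if idx < 0:
        raise ValueError("no sample available at or before t_ns")
    return values[idx]


def resample(streams: List[Stream], start_ns: int, stop_ns: int, rate_hz: int) -> List[List[float]]:
    """Align asynchronous streams onto one fixed-rate grid, one row per tick."""
    step_ns = 1_000_000_000 // rate_hz
    grid = range(start_ns, stop_ns + 1, step_ns)
    return [[sample_and_hold(ts, vs, t) for ts, vs in streams] for t in grid]


# Synthetic streams covering one second: Lidar at 10 Hz, IMU at 100 Hz.
# Each value is simply the sample index, which makes the result easy to read.
lidar: Stream = ([i * 100_000_000 for i in range(11)], [float(i) for i in range(11)])
imu: Stream = ([i * 10_000_000 for i in range(101)], [float(i) for i in range(101)])

# One row every 20 ms; each row holds [lidar_value, imu_value].
table = resample([lidar, imu], start_ns=0, stop_ns=1_000_000_000, rate_hz=50)
```

Each row of `table` is a fixed-size state vector at a constant frequency, which is the shape a training pipeline expects. In a real pipeline, the choice between holding, interpolating, or dropping samples depends on the sensor and the model.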