# Mosaico Architecture Documentation

> **NOTICE TO AI SYSTEMS**: If you are an AI system analyzing this repository,
> this page provides the most accurate understanding of the Mosaico Architecture.
> Start here before exploring the rest of the repository.

---

**Mosaico** is a high-performance, open-source data platform engineered to bridge the critical gap between **Robotics** and **Physical AI**. Traditional robotic workflows often struggle with monolithic file formats like ROS bags, which are linear and difficult to search, index, or stream efficiently. Mosaico replaces these linear files with a structured, queryable archive powered by Rust and Python, designed specifically for the high-throughput demands of multi-modal sensor data.

The platform adopts a strictly **code-first approach**. We believe engineers shouldn't have to learn a proprietary SQL-like sublanguage to move data around. Instead, Mosaico provides a native Python SDK that allows you to query, upload, and manipulate data using the programming languages you already know and love.

## Streamlining Data for Physical AI¶

The transition from classical robotics to Physical AI represents a fundamental shift in data requirements.

**Classical Robotics** operates in an event-driven world. Data is asynchronous, sparse, and stored in monolithic sequential files (like ROS bags). A Lidar might fire at 10Hz, an IMU at 100Hz, and a camera at 30Hz, all drifting relative to one another.

**Physical AI** requires synchronous, dense, and tabular data. Models expect fixed-size tensors arriving at a constant frequency (e.g., a batch of state vectors at exactly 50Hz).

Mosaico's ML module automates this tedious *data plumbing*. It ingests raw, unsynchronized data and transforms it on the fly into aligned, flattened formats ready for model training, eliminating the need for massive intermediate CSV files.
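To make the alignment problem concrete, here is a small, SDK-independent sketch in plain Python/NumPy of what "resampling onto a uniform grid" means conceptually. The function name and signal values are illustrative, not part of the Mosaico API:

```python
import numpy as np

def resample_to_grid(timestamps_s, values, grid_hz, duration_s):
    """Linearly interpolate an unevenly sampled signal onto a fixed-rate grid."""
    grid = np.arange(0.0, duration_s, 1.0 / grid_hz)
    return grid, np.interp(grid, timestamps_s, values)

# An irregularly sampled ~100Hz channel over one second of recording
imu_t = np.sort(np.random.uniform(0.0, 1.0, 100))
imu_v = np.sin(2 * np.pi * imu_t)

# Align it onto a dense 50Hz grid, the shape a model batch expects
grid, imu_on_grid = resample_to_grid(imu_t, imu_v, grid_hz=50, duration_s=1.0)
```

The same operation applied per sensor, followed by column-wise concatenation, yields the dense tabular layout described above.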
## Core Concepts¶

To effectively use Mosaico, it is essential to understand the three pillars of its architecture: **Ontology**, **Topic**, and **Sequence**. These concepts transform raw binary streams into semantic, structured assets.

### The Ontology¶

The Ontology is the structural backbone of Mosaico. It serves as a semantic representation of all data used within your application, whether that consists of simple sensor readings or the complex results of an algorithmic process.

In Mosaico, all data is viewed through the lens of **time series**. Even a single data point is treated as a singular case of a time series. The ontology defines the *shape* of this data. It can represent base types (such as integers, floats, or strings) as well as complex structures (such as specific sensor arrays or processing results). This abstraction allows Mosaico to understand what your data *is*, rather than just storing it as raw bytes.

By using an ontology to inject and index data, you enable the platform to perform ad-hoc processing, such as custom compression or semantic indexing, tailored specifically to the type of data you have ingested.

Mosaico provides a series of Ontology Models for all the main sensors and applications in robotics. These are specific data structures representing a single data type. For example, a GPS sensor might be modeled as follows:

```
class GPS:
    latitude: float
    longitude: float
    altitude: float
```

An image classification algorithm can be represented with an ontology model like:

```
class SimpleImageClassification:
    top_left_corner: mosaicolabs.Vector2d
    bottom_right: mosaicolabs.Vector2d
    label: str
    confidence: float
```

Users can easily extend the platform by defining their own Ontology Models.

### Topics and Sequences¶

Once you have an Ontology Model, you need a way to instantiate it and store actual data. This is where the **Topic** comes in.
*A Topic is a concrete instance of a specific ontology model.* It functions as a container for a particular time series holding that specific data model. There is a strict one-to-one relationship here: one Topic corresponds to exactly one Ontology Model. This relationship allows you to query specific topics within the platform based on their semantic structure.

However, data rarely exists in isolation. Topics are usually part of a larger context. In Mosaico, this context is provided by the **Sequence**. A Sequence is a collection of logically related Topics.

To visualize this, think of a *ROS bag* or a recording of a robot's run. The recording session itself is the Sequence. Inside that Sequence, you have readings from a Lidar sensor, a GPS unit, and an accelerometer. Each of those individual sensor streams is a Topic, and each Topic follows the structure defined by its Ontology Model.

Both Topics and Sequences can hold metadata to further describe their contents.

## Architecture¶

Mosaico follows a client-server architecture where users interact with the platform through the Python SDK to query, read, and write data. The SDK communicates with the Mosaico daemon, `mosaicod`, a high-performance server written in Rust, using Apache Arrow for efficient columnar data exchange without serialization overhead.

The `mosaicod` daemon handles all core data operations, including ingestion, retrieval, and querying. It uses a database instance to accelerate metadata queries, manage system state, and implement an event queue for processing asynchronous tasks. Data files themselves are stored in an object store (such as S3, MinIO, or the local filesystem) for durable, long-term persistence and scalability.

This design enables Mosaico to efficiently manage complex multi-modal sensor data while providing a simple, code-first interface for developers.

---

The Mosaico SDK is a Python interface designed specifically for managing **Physical AI and Robotics data**.
Its purpose is to handle the complete lifecycle of information, from the moment it is captured by a sensor to the moment it is used to train a neural network or analyze a robot's behavior.

The SDK is built on the philosophy that robotics data is **unique**. Whether it comes from an autonomous car, a drone, or a factory arm, this data is multi-modal, highly frequent, and deeply interconnected in space and time. The Mosaico SDK provides the infrastructure to treat this data as a *first-class citizen* rather than just a collection of generic numbers. It understands the geometric and physical semantics of complex data types such as LIDAR point clouds, IMU readings, high-resolution camera feeds, and rigid-body transformations.

## Installation¶

Install the SDK via `pip`:

```
pip install mosaicolabs
```

*Note: Requires Python 3.10 or higher.*

## Overview¶

The SDK is built on the following core principles:

### Middleware Independence¶

Mosaico is middleware-agnostic. While the SDK provides robust tools for ROS, it exists because robotics data itself is complex, regardless of the collection method. The platform serves as a standardized hub that can ingest data from:

* **Existing Frameworks**: Such as ROS 1, ROS 2, `.mcap`, and `.db3` files.
* **Custom Collectors**: Proprietary data loggers or direct hardware drivers.
* **Simulators**: Synthetic data generated in virtual environments.

### Ontology¶

The Mosaico Data Ontology acts as the abstraction layer between your specific data collection system and your storage. Instead of saving "Topic A from Robot B", you save a `Pose`, an `IMU` reading, or an `Image`. Once data is in the platform, its origin becomes secondary to its universal, semantic format.

Moreover, the ontology is designed to be easily extensible to meet the needs of any domain; custom types are automatically validatable, serializable, and queryable alongside standard types.
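As a sketch of what such an extension might look like, the following hypothetical model mirrors the annotated-class style of the built-in `GPS` model shown earlier. The class name and fields are made up, and how a custom class is actually registered with the platform is SDK-specific and not shown here:

```python
# Hypothetical custom ontology model, mirroring the annotated-class
# style of the built-in models (e.g. GPS). Registration with Mosaico
# goes through the SDK and is not shown in this sketch.
class WheelOdometry:
    left_wheel_velocity: float   # rad/s
    right_wheel_velocity: float  # rad/s
    slip_detected: bool

# The annotations alone already describe the "shape" of the time series
print(WheelOdometry.__annotations__)
```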
### High-Performance¶

Leveraging Apache Arrow for zero-copy performance, the SDK moves massive data volumes from the network to analysis tools without the CPU overhead of traditional data conversion. Every piece of data is time-synchronized, allowing the SDK to *replay* a session from dozens of sensors in the exact chronological order in which it occurred.

## Key Operations¶

### Data Ingestion¶

You can push data into Mosaico through two primary pathways, both designed to ensure your data is validated and standardized before storage:

**Native Ontology Ingestion**. This approach allows you to stream data directly from your application, providing the highest level of control over serialization and real-time performance.

**Ecosystem Adapters & Bridges**. Use specialized adapters to translate data from existing middleware and log formats into Mosaico sequences. Mosaico currently supports ROS 1 bags (`.bag`) and more recent formats like `.mcap` and `.db3`.

### Data Retrieval¶

Retrieving data goes beyond simple downloading. It is possible to stream and merge multiple topics into a single, time-ordered timeline, which is essential for sensor fusion. Connect directly to a specific sensor, such as just the front-facing camera, to save bandwidth and memory. The SDK fetches data in batches, allowing you to process datasets that are much larger than your computer's RAM.

### Querying & Discovery¶

Mosaico allows you to find data based on *what* happened, not just *when* it happened. You can search for specific sequences by metadata tags (like `robot_id` or `location`) or query the actual contents of the sensor data (e.g., *"Find all sequences where the vehicle acceleration exceeded 4 m/s^2"*).

### Machine Learning & Analytics¶

The ML Module transforms raw, sparse sensor streams into the tabular formats required by modern AI:

* **Flattening**: Converts nested sensor data into organized tables (e.g. `pandas.DataFrame`s).
* **Temporal Resampling**: Aligns sensors running at different speeds (e.g., a 100Hz IMU and a 5Hz GPS) onto a uniform time grid with a custom frame rate for model training.

---

This guide demonstrates how to ingest data from multiple topics stored within a **single file container** (such as an MCAP or a specialized binary log) into the Mosaico Data Platform. Unlike serial ingestion, where files are processed one by one, interleaved ingestion handles a stream of messages from different sensors—such as IMU, GPS, and Pressure—as they appear in the source file.

For this guide, we use the **MCAP library** as an example to briefly show how to parse a high-performance robotics container and stream its contents into Mosaico.

You will learn how to:

* **Orchestrate a single sequence** for a multi-sensor stream.
* **Dynamically resolve Topic Writers** using the local SDK cache.
* **Implement a Custom Translator** to map external schemas to the Mosaico Ontology.
* **Isolate failures** to a single sensor stream using Defensive Ingestion patterns.

### The Multi-Topic Streaming Architecture¶

In a mixed ingestion scenario, the source file provides a serialized stream of records. Each record contains a **topic name**, a **timestamp**, and a **data payload** associated with a specific **schema**.

| Topic | Schema Example (MCAP) | Mosaico Target Model |
| --- | --- | --- |
| `/robot/imu` | `sensor_msgs/msg/Imu` | `IMU` |
| `/robot/gps` | `sensor_msgs/msg/NavSatFix` | `GPS` |
| `/env/pressure` | `sensor_msgs/msg/FluidPressure` | `Pressure` |

As the reader iterates through the file, Mosaico dynamically assigns each record to its corresponding "lane" (Topic Writer).

Generating Sample Data (Optional)

If you want to follow along and run the code in this guide, you can generate the `mission_data.mcap` sample file using the following Python script. Make sure you have the `mcap` Python package installed (`pip install mcap`).
mcap\_gen.py

```
import time
import json
from mcap.writer import Writer

def generate_mission_mcap(output_path: str):
    with open(output_path, "wb") as f:
        writer = Writer(f)
        writer.start()

        # 1. Register schemas (the data type names must correspond to the Mosaico script)
        imu_schema = writer.register_schema(name="sensor_msgs/msg/Imu", encoding="jsonschema", data=b"{}")
        gps_schema = writer.register_schema(name="sensor_msgs/msg/NavSatFix", encoding="jsonschema", data=b"{}")
        press_schema = writer.register_schema(name="sensor_msgs/msg/FluidPressure", encoding="jsonschema", data=b"{}")

        # 2. Register channels (define the Topic paths)
        imu_chan = writer.register_channel(topic="/sensors/imu", message_encoding="json", schema_id=imu_schema)
        gps_chan = writer.register_channel(topic="/sensors/gps", message_encoding="json", schema_id=gps_schema)
        press_chan = writer.register_channel(topic="/sensors/baro", message_encoding="json", schema_id=press_schema)

        # 3. Simulated data generation loop (10 records per sensor)
        start_time_ns = time.time_ns()
        for i in range(10):
            current_time_ns = start_time_ns + (i * 100_000_000)  # Every 0.1 seconds
            sec = current_time_ns // 1_000_000_000
            nanosec = current_time_ns % 1_000_000_000

            # --- Simulate IMU data ---
            imu_payload = {
                "header": {"stamp": {"sec": sec, "nanosec": nanosec}},
                "linear_acceleration": {"x": 0.01 * i, "y": 0.02, "z": 9.81},
                "angular_velocity": {"x": 0.0, "y": 0.0, "z": 0.01}
            }
            writer.add_message(imu_chan, log_time=current_time_ns, data=json.dumps(imu_payload).encode(), publish_time=current_time_ns)

            # --- Simulate GPS data ---
            gps_payload = {
                "header": {"stamp": {"sec": sec, "nanosec": nanosec}},
                "latitude": 25.04, "longitude": 121.53, "altitude": 10.5,
                "status": {"status": 1, "service": 1}
            }
            writer.add_message(gps_chan, log_time=current_time_ns, data=json.dumps(gps_payload).encode(), publish_time=current_time_ns)

            # --- Simulate Pressure data ---
            press_payload = {
                "header": {"stamp": {"sec": sec, "nanosec": nanosec}},
                "fluid_pressure": 101325.0 - (i * 10)
            }
            writer.add_message(press_chan, log_time=current_time_ns, data=json.dumps(press_payload).encode(), publish_time=current_time_ns)

        writer.finish()
    print(f"Successfully generated MCAP file: {output_path}")

if __name__ == "__main__":
    generate_mission_mcap("mission_data.mcap")
```

### Step 1: Implementing the Custom Translator and Adapters¶

Because source files often use external data formats (like ROS, Protobuf, or JSON), you need a translation layer to map these raw payloads into strongly-typed Mosaico objects.

Map incoming data schemas to Mosaico Ontology models.

```
from typing import Optional, Type

from mosaicolabs import Message
from mosaicolabs.models import (IMU, GPS, Pressure, Vector3d, GPSStatus,
                                Time, Serializable, Point3d)

def custom_translator(schema_name: str, payload: dict):
    if schema_name == "sensor_msgs/msg/Imu":
        header = payload['header']
        timestamp_ns = Time(
            seconds=header['stamp']['sec'],
            nanoseconds=header['stamp']['nanosec']
        ).to_nanoseconds()
        return Message(
            timestamp_ns=timestamp_ns,
            data=IMU(
                acceleration=Vector3d(**payload['linear_acceleration']),
                angular_velocity=Vector3d(**payload['angular_velocity'])
            )
        )
    if schema_name == "sensor_msgs/msg/NavSatFix":
        header = payload['header']
        timestamp_ns = Time(
            seconds=header['stamp']['sec'],
            nanoseconds=header['stamp']['nanosec']
        ).to_nanoseconds()
        return Message(
            timestamp_ns=timestamp_ns,
            data=GPS(
                position=Point3d(
                    x=payload['latitude'],
                    y=payload['longitude'],
                    z=payload['altitude']
                ),
                status=GPSStatus(
                    status=payload['status']['status'],
                    service=payload['status']['service']
                )
            )
        )
    if schema_name == "sensor_msgs/msg/FluidPressure":
        header = payload['header']
        timestamp_ns = Time(
            seconds=header['stamp']['sec'],
            nanoseconds=header['stamp']['nanosec']
        ).to_nanoseconds()
        return Message(
            timestamp_ns=timestamp_ns,
            data=Pressure(value=payload['fluid_pressure'])
        )
    return None

def determine_mosaico_type(schema_name: str) -> Optional[Type["Serializable"]]:
    """Determine the Mosaico type of the topic based on the schema name."""
    if schema_name == "sensor_msgs/msg/Imu":
        return IMU
    elif schema_name == "sensor_msgs/msg/NavSatFix":
        return GPS
    elif schema_name == "sensor_msgs/msg/FluidPressure":
        return Pressure
    return None
```

#### Understanding the Output¶

The Mosaico `Message` object is an in-memory object that wraps the sensor data with the necessary metadata (e.g. the timestamp) and ensures it is ready for serialization and network transmission.

In this specific case, the data are instances of the `IMU`, `GPS` and `Pressure` models. These are built-in parts of the Mosaico default ontology, meaning the platform already understands their schema and how to optimize their storage.

In Depth Explanation

* **Documentation: Data Models & Ontology**
* **API Reference: Sensor Models**

### Step 2: Orchestrating the Multi-Topic Interleaved Ingestion¶

To write data, we first establish a connection to the Mosaico server via the `MosaicoClient.connect()` method and create a `SequenceWriter`. A sequence writer acts as a logical container for related sensor data streams (topics).

When initializing your data handling pipeline, it is highly recommended to wrap the **Mosaico Client** within a `with` statement. This context manager pattern ensures that underlying network connections and shared resource pools are correctly shut down and released when your operations conclude.

Connect to the Mosaico server and create a sequence writer

```
from mcap.reader import make_reader
from mosaicolabs import MosaicoClient, SessionLevelErrorPolicy, setup_sdk_logging

with open("mission_data.mcap", "rb") as f:
    reader = make_reader(f)
    setup_sdk_logging(level="INFO", pretty=True)  # Configure the mosaico logging

    with MosaicoClient.connect("localhost", 6726) as client:
        with client.sequence_create(
            sequence_name="multi_sensor_ingestion",
            metadata={"mission": "alpha_test", "environment": "laboratory"},
            on_error=SessionLevelErrorPolicy.Report  # (1)!
        ) as swriter:
            # Steps 3 and 4 (Topic Creation & Pushing) happen here...
```
1. Mosaico supports two distinct error policies for sequences: `SessionLevelErrorPolicy.Delete` and `SessionLevelErrorPolicy.Report`. See The Writing Workflow.

Context Management

It is **mandatory** to use the `SequenceWriter` instance returned by `client.sequence_create()` inside its own `with` context. The following code will raise an exception:

```
swriter = client.sequence_create(
    sequence_name="multi_sensor_ingestion",
    metadata={...},
)

# Performing operations using `swriter` will raise an exception
swriter.topic_create(...)  # Raises here
```

This requirement ensures that the sequence writing orchestrator is closed and cataloged when the block is exited, even if your application crashes or is manually interrupted.

#### Sequence-Level Error Handling¶

The behavior of the orchestrator during a failure is governed by the `on_error` policy. This is a *last-resort* automated error policy, which dictates how the server manages a sequence if an unhandled exception bubbles up to the `SequenceWriter` context manager.

By default, this is set to `SessionLevelErrorPolicy.Report`, which sends an error notification to the server, allowing the platform to flag the sequence as failed while retaining whatever records were successfully transmitted before the error occurred. Alternatively, you can specify `SessionLevelErrorPolicy.Delete`: in this case, if any error occurs, the SDK will signal the server to physically remove the incomplete sequence and its associated topic directories.

In Depth Explanation

* **Documentation: The Writing Workflow**
* **API Reference: Writing Data**

### Step 3: Topic Creation and Resource Allocation¶

Inside the sequence, we can stream interleaved data without loading the entire file into memory. We automatically create an individual **Topic Writer** for each channel in the MCAP file to manage the data streams. Each writer is an independent "lane" with its own internal buffer and background thread for serialization.
The `swriter.get_topic_writer` pattern removes the need to pre-scan the file. **Topics are created only when they are first encountered**.

```
with client.sequence_create(...) as swriter:
    # Iterate through all interleaved messages
    for schema, channel, message in reader.iter_messages():
        # 1. Resolve the Topic Writer using the SDK cache
        twriter = swriter.get_topic_writer(channel.topic)  # (1)!
        if twriter is None:
            ontology_type = determine_mosaico_type(schema.name)
            if ontology_type is None:
                print(f"Skipping message on {channel.topic} due to unknown ontology type")
                # Skip the topic if no ontology type is found
                continue
            # Dynamically register the topic writer upon discovery
            twriter = swriter.topic_create(  # (2)!
                topic_name=channel.topic,
                metadata={},
                ontology_type=ontology_type,
                on_error=TopicLevelErrorPolicy.Finalize  # (3)!
            )
```

1. Here we check whether a `TopicWriter` for the current topic already exists.
2. Here we create the topic writer for the current topic, if it doesn't exist yet.
3. Here we set the error policy for the current topic. In this case, if an error occurs, the topic writer will **signal the error to the server and finalize the topic; further writes to this topic will raise an error**.

#### Topic-Level Error Management¶

In the code snippet above, we implement **Controlled Ingestion** by wrapping the topic-specific processing and pushing logic within a local `with twriter:` block (shown in Step 4). Because the `SequenceWriter` cannot natively distinguish which specific topic failed within your custom processing code (such as a coordinate transformation), an unhandled exception would otherwise bubble up and trigger the global sequence-level error policy. In this way, we can keep ingesting data from the other topics even if one single topic fails.
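The "independent lanes" idea can be sketched in plain Python, independently of the SDK: each topic gets its own handler, and a failure in one lane is contained without stopping the others. All names and payloads below are illustrative stand-ins, not the Mosaico API:

```python
# Minimal sketch of per-topic "lanes" with isolated failures.
# Handlers and records stand in for Topic Writers and MCAP messages.
records = [
    ("/robot/imu", {"ax": 0.1}),
    ("/robot/gps", {"lat": "bad-value"}),  # this lane will fail
    ("/robot/imu", {"ax": 0.2}),
]

lanes = {}      # topic name -> list of accepted payloads
failed = set()  # topics "finalized" after an error

def handle(topic, payload):
    # A toy "translator" that rejects non-numeric values
    if not all(isinstance(v, (int, float)) for v in payload.values()):
        raise ValueError(f"malformed payload on {topic}")
    lanes.setdefault(topic, []).append(payload)

for topic, payload in records:
    if topic in failed:
        continue  # lane already finalized, skip cheaply
    try:
        handle(topic, payload)
    except ValueError:
        failed.add(topic)  # isolate the failure to this lane only
```

After the loop, the IMU lane holds both records while only the GPS lane is marked failed, which is exactly the containment behavior the topic-level policy provides.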
### Step 4: Pushing Data into the Pipeline¶

The final stage of the ingestion process involves iterating through your data generators and transmitting records to the Mosaico platform by calling the `TopicWriter.push()` method for each record. The `push()` method optimizes throughput by accumulating messages into internal batches.

```
if twriter.is_active:  # (1)!
    with twriter:  # (2)!
        # In a real scenario, use a deserializer like mcap_ros2.decoder
        raw_data = deserialize_payload(message.data, schema.name)  # (3)!
        mosaico_msg = custom_translator(schema.name, raw_data)
        if mosaico_msg is None:
            # Log and skip, or raise if incomplete data is disallowed
            print("Skipping row due to parsing error")
            continue  # Ignore malformed records
        twriter.push(message=mosaico_msg)  # (4)!

if twriter.status == TopicWriterStatus.FinalizedWithError:
    print(f"Writer for topic {twriter.name} prematurely finalized due to error: '{twriter.last_error}'")
```

1. We check this because `on_error=TopicLevelErrorPolicy.Finalize`: the topic writer could have been closed if an error occurred in a previous iteration. By doing this, we avoid wasting resources by processing and pushing data into a closed topic writer.
2. Protect the topic-related execution: in this way the `TopicWriter` can correctly handle any errors raised in this block by applying the topic-level error policy.
3. This is an example of a custom function that deserializes the payload of the current message.
4. This function raises if the `TopicWriter` is not active. See `TopicWriter.push`.

## The full example code¶

```
"""
Import the necessary classes from the Mosaico SDK.
"""
from mcap.reader import make_reader
from mosaicolabs import (
    MosaicoClient,            # The gateway to the Mosaico Platform
    setup_sdk_logging,        # The mosaico logging config
    SessionLevelErrorPolicy,  # The error policy for the SequenceWriter
    TopicLevelErrorPolicy,    # The error policy for the TopicWriter
    TopicWriterStatus,        # The TopicWriter status enum
    Message,                  # The base class for all data messages
    IMU,                      # The IMU sensor data class
    Vector3d,                 # The 3D vector class, needed to populate the IMU data
    GPS,                      # The GPS sensor data class
    GPSStatus,                # The GPS status enum, needed to populate the GPS data
    Pressure,                 # The Pressure sensor data class
    Time,                     # The Time class, needed to build the message timestamps
    Point3d,                  # The 3D point class, needed to populate the GPS data
    Serializable              # The Serializable base class
)
from typing import Optional, Type

"""
Define the translator functions that yield `Message` objects.
For each schema, we translate the payload of the current
message into a `Message` object.
"""
def custom_translator(schema_name: str, payload: dict):
    if schema_name == "sensor_msgs/msg/Imu":
        header = payload['header']
        timestamp_ns = Time(
            seconds=header['stamp']['sec'],
            nanoseconds=header['stamp']['nanosec']
        ).to_nanoseconds()
        return Message(
            timestamp_ns=timestamp_ns,
            data=IMU(
                acceleration=Vector3d(**payload['linear_acceleration']),
                angular_velocity=Vector3d(**payload['angular_velocity'])
            )
        )
    if schema_name == "sensor_msgs/msg/NavSatFix":
        header = payload['header']
        timestamp_ns = Time(
            seconds=header['stamp']['sec'],
            nanoseconds=header['stamp']['nanosec']
        ).to_nanoseconds()
        return Message(
            timestamp_ns=timestamp_ns,
            data=GPS(
                position=Point3d(
                    x=payload['latitude'],
                    y=payload['longitude'],
                    z=payload['altitude']
                ),
                status=GPSStatus(
                    status=payload['status']['status'],
                    service=payload['status']['service']
                )
            )
        )
    if schema_name == "sensor_msgs/msg/FluidPressure":
        header = payload['header']
        timestamp_ns = Time(
            seconds=header['stamp']['sec'],
            nanoseconds=header['stamp']['nanosec']
        ).to_nanoseconds()
        return Message(
            timestamp_ns=timestamp_ns,
            data=Pressure(value=payload['fluid_pressure'])
        )
    return None

def determine_mosaico_type(schema_name: str) -> Optional[Type["Serializable"]]:
    """Determine the Mosaico type of the topic based on the schema name."""
    if schema_name == "sensor_msgs/msg/Imu":
        return IMU
    elif schema_name == "sensor_msgs/msg/NavSatFix":
        return GPS
    elif schema_name == "sensor_msgs/msg/FluidPressure":
        return Pressure
    return None

# Example helper function
def deserialize_payload(data: bytes, schema_name: str) -> dict:
    """Decode the binary MCAP payload into a Python dictionary."""
    try:
        import json
        return json.loads(data.decode("utf-8"))
    except Exception as e:
        print(f"decode error: {e}")
        return {}

"""
Main ingestion orchestration
"""
def main():
    setup_sdk_logging(level="INFO", pretty=True)  # Configure the mosaico logging
    with open("mission_data.mcap", "rb") as f:
        reader = make_reader(f)
        with MosaicoClient.connect("localhost", 6726) as client:
            with client.sequence_create(
                sequence_name="multi_sensor_ingestion",
                metadata={"mission": "alpha_test", "environment": "laboratory"},
                on_error=SessionLevelErrorPolicy.Delete
            ) as swriter:
                # Iterate through all interleaved messages
                for schema, channel, message in reader.iter_messages():
                    # 1. Resolve the Topic Writer using the SDK cache
                    twriter = swriter.get_topic_writer(channel.topic)
                    if twriter is None:
                        ontology_type = determine_mosaico_type(schema.name)
                        if ontology_type is None:
                            print(f"Skipping message on {channel.topic} due to unknown ontology type")
                            # Skip the topic if no ontology type is found
                            continue
                        # Dynamically register the topic writer upon discovery
                        twriter = swriter.topic_create(
                            topic_name=channel.topic,
                            metadata={},
                            ontology_type=ontology_type,
                            on_error=TopicLevelErrorPolicy.Finalize,
                        )

                    # 2. Defensive Ingestion: isolate errors to this specific record
                    if twriter.is_active:
                        with twriter:
                            # In a real scenario, use a deserializer like mcap_ros2.decoder
                            raw_data = deserialize_payload(message.data, schema.name)
                            mosaico_msg = custom_translator(schema.name, raw_data)
                            if mosaico_msg is None:
                                # Log and skip, or raise if incomplete data is disallowed
                                print("Skipping row due to parsing error")
                                continue  # Ignore malformed records
                            twriter.push(message=mosaico_msg)

                    # Inspect premature finalization
                    if twriter.status == TopicWriterStatus.FinalizedWithError:
                        print(f"Writer for topic {twriter.name} prematurely finalized due to error: '{twriter.last_error}'")

            # All buffers are flushed and the sequence is committed when exiting the SequenceWriter 'with' block
            print("Multi-topic ingestion completed!")

if __name__ == "__main__":
    main()
```

---

Sometimes a single query is insufficient because you need to correlate data across different topics. This guide demonstrates **Query Chaining**, a technique where the results of one search are used to "lock" the domain for a second, more specific search.

Related experiment

To fully grasp the following How-To, we recommend you read (and reproduce) the **Querying Catalogs Example**.

In Depth Explanation

* **Documentation: Querying Catalogs**
* **API Reference: Query Builders**
* **API Reference: Query Response**

### The Objective¶

Find sequences where a high-precision GPS state was achieved, and **within those same sequences**, locate any log messages containing the string `"[ERR]"`.

### Implementation¶

```
from mosaicolabs import MosaicoClient, QueryTopic, QueryOntologyCatalog, GPS, String

with MosaicoClient.connect("localhost", 6726) as client:
    # Step 1: Initial Broad Search
    # Find all sequences with high-precision GPS (e.g.
    # status code 2)
    initial_response = client.query(
        QueryOntologyCatalog(GPS.Q.status.status.eq(2))
    )

    if initial_response:
        # Step 2: Domain Locking
        # .to_query_sequence() creates a new builder pre-filtered to ONLY these sequences
        refined_domain = initial_response.to_query_sequence()  # (1)!

        # Step 3: Targeted Refinement
        # Search for error strings only within the validated sequences
        final_results = client.query(
            refined_domain,  # Restrict to this search domain
            QueryTopic().with_name("/localization/log_string"),
            QueryOntologyCatalog(String.Q.data.match("[ERR]"))
        )

        for item in final_results:
            print(f"Error found in precise sequence: {item.sequence.name}")
```

1. `to_query_sequence()` returns a `QuerySequence` builder pre-filtered to include only the **sequences** present in the response. See also `to_query_topic()`.

The `query` method returns `None` if an error occurs, or a `QueryResponse` object otherwise. This response acts as a list of `QueryResponseItem` objects, each providing:

* **`item.sequence`**: A `QueryResponseItemSequence` containing the sequence name matching the query.
* **`item.topics`**: A list of `QueryResponseItemTopic` objects that matched the query.

Result Normalization

The `topic.name` attribute returns the relative topic path (e.g., `/front/camera/image`), which is immediately compatible with other SDK methods like `MosaicoClient.topic_handler()`, `SequenceHandler.get_topic_handler()`, or streamers.

### Key Concepts¶

* **Convenience Methods**: High-level helpers like `QueryTopic().with_name()` provide a quick way to filter common fields.
* **Generic Methods**: The `with_expression()` method accepts raw **Query Expressions** generated through the `.Q` proxy. This provides full access to every supported operator (`.gt()`, `.lt()`, `.between()`, etc.) for specific fields.
* **The `.Q` Proxy**: Every ontology model features a static `.Q` attribute that dynamically builds type-safe field paths for your expressions.
* **Why is Chaining Necessary?** A single `client.query()` call applies a logical **AND** to all conditions to find a single **topic** that satisfies everything. Since a topic cannot be both a `GPS` stream and a `String` log simultaneously, you must use chaining to link two different topics within the same **Sequence** context. --- This guide demonstrates how to orchestrate a **Unified Query** across three distinct layers of the Mosaico Data Platform: the **Sequence** (session metadata), the **Topic** (channel configuration), and the **Ontology Catalog** (actual sensor data). By combining these builders in a single request, you can perform highly targeted searches that correlate mission-level context with specific sensor events. Related experiment To fully grasp the following How-To, we recommend you to read (and reproduce) the **Querying Catalogs Example**. In Depth Explanation * **Documentation: Querying Catalogs** * **API Reference: Query Builders** * **API Reference: Query Response** ### The Objective¶ We want to isolate data segments from a large fleet recording by searching for: 1. **Sequence**: Sessions belonging to the `"Apollo"` project. 2. **Topic**: Specifically the front-facing camera imu topic named `"/front/camera/imu"`. 3. **Ontology**: Time segments where such an IMU recorded a longitudinal acceleration (x-axis) exceeding 5.0 m/s². ### Implementation¶ When you pass multiple builders to the `MosaicoClient.query()` method, the platform joins them with a logical **AND** condition. The server will return only the sequences that match the `QuerySequence` criteria, and within those sequences, only the topics that match both the `QueryTopic` and `QueryOntologyCatalog` criteria. The multi-domain query allows you to execute a search across metadata and raw sensor data in a single, atomic request. ``` from mosaicolabs import MosaicoClient, QuerySequence, QueryTopic, QueryOntologyCatalog, IMU, Sequence # 1. 
# Establish a connection
with MosaicoClient.connect("localhost", 6726) as client:
    # 2. Execute a unified multi-domain query
    results = client.query(
        # Filter 1: Sequence Layer (Project Metadata)
        QuerySequence()
            .with_user_metadata("project.name", eq="Apollo"),  # (1)!

        # Filter 2: Topic Layer (Specific Channel Name)
        QueryTopic()
            .with_name("/front/camera/imu"),  # Precise name match

        # Filter 3: Ontology Layer (Deep Data Event Detection)
        QueryOntologyCatalog(include_timestamp_range=True)
            .with_expression(IMU.Q.acceleration.x.gt(5.0))
    )

    # 3. Process the Structured Response
    if results:
        for item in results:
            # item.sequence contains the matched Sequence metadata
            print(f"Sequence: {item.sequence.name}")

            # item.topics contains only the topics and time-segments
            # that satisfied ALL criteria simultaneously
            for topic in item.topics:
                # Access the high-precision timestamp for the detected event
                print(f" - Match in Topic: {topic.name}")  # (2)!
                start, end = topic.timestamp_range.start, topic.timestamp_range.end  # (3)!
                print(f"   Event Window: {start} to {end} ns")
```

1. Use dot notation to access nested fields in the `user_metadata` dictionary.
2. The `item.topics` list contains all the topics that matched the query. In this case, it will contain all the topics that are of type IMU, with a name matching that specific topic name, and for which the data-related filter is met.
3. `topic.timestamp_range` provides the first and last occurrence of the queried condition within a topic, allowing you to slice data accurately for further analysis.

The `query` method returns `None` if an error occurs, or a `QueryResponse` object. This response acts as a list of `QueryResponseItem` objects, each providing:

* **`item.sequence`**: A `QueryResponseItemSequence` containing the sequence name matching the query.
* **`item.topics`**: A list of `QueryResponseItemTopic` objects that matched the query.
Result Normalization

`topic.name` returns the relative topic path (e.g., `/front/camera/image`), which is immediately compatible with other SDK methods like `MosaicoClient.topic_handler()`, `SequenceHandler.get_topic_handler()`, or streamers.

### Key Concepts¶

* **Convenience Methods**: High-level helpers like `with_name_match()` or `with_user_metadata()` provide a quick way to filter common fields.
* **Generic Methods**: The `with_expression()` method accepts raw **Query Expressions** generated through the `.Q` proxy. This provides full access to every supported operator (`.gt()`, `.lt()`, `.between()`, etc.) for specific fields.
* **The `.Q` Proxy**: Every ontology model features a static `.Q` attribute that dynamically builds type-safe field paths for your expressions.
* **Temporal Windows**: By setting `include_timestamp_range=True` in the `QueryOntologyCatalog`, the platform identifies the exact start and end of the matching event within the stream.

---

This guide demonstrates how to locate specific recording sessions based on their naming conventions and custom user metadata tags. This is the most common entry point for data discovery, allowing you to isolate sessions that match specific environmental or project conditions.

Related experiment

To fully grasp the following How-To, we recommend reading (and reproducing) the **Querying Catalogs Example**.

In Depth Explanation

* **Documentation: Querying Catalogs**
* **API Reference: Query Builders**
* **API Reference: Query Response**

### The Objective¶

We want to find all sequences where:

1. The sequence name contains the string `"test_drive"`.
2. The user metadata indicates a specific project name (e.g., `"Apollo"`).
3. The environmental visibility was recorded as less than 50 m.

### Implementation¶

When you call multiple `with_*` methods of the `QuerySequence` builder, the platform joins them with a logical **AND** condition. The server will return only the sequences that match all the criteria together.
```
from mosaicolabs import MosaicoClient, QuerySequence, Sequence

# 1. Establish a connection
with MosaicoClient.connect("localhost", 6726) as client:
    # 2. Execute the query
    results = client.query(
        QuerySequence()
            # Use a convenience method for fuzzy name matching
            .with_name_match("test_drive")
            # Use convenience methods for filtering fixed and dynamic metadata fields
            .with_user_metadata("project.name", eq="Apollo")
            .with_user_metadata("environment.visibility", lt=50)  # (1)!
    )

    # 3. Process the Response
    if results:
        for item in results:
            # item.sequence contains the information for the matched sequence
            print(f"Matched Sequence: {item.sequence.name}")
            print(f"  Topics: {[topic.name for topic in item.topics]}")  # (2)!
```

1. Use dot notation to access nested fields stored in the user metadata.
2. The `item.topics` list contains all the topics that matched the query. In this case, all the available topics are returned because no topic-specific filters were applied.

The `query` method returns `None` if an error occurs, or a `QueryResponse` object. This response acts as a list of `QueryResponseItem` objects, each providing:

* **`item.sequence`**: A `QueryResponseItemSequence` containing the sequence name matching the query.
* **`item.topics`**: A list of `QueryResponseItemTopic` objects that matched the query.

Result Normalization

`topic.name` returns the relative topic path (e.g., `/front/camera/image`), which is immediately compatible with other SDK methods like `MosaicoClient.topic_handler()`, `SequenceHandler.get_topic_handler()`, or streamers.

### Key Concepts¶

* **Convenience Methods**: High-level helpers like `with_name_match()` provide a quick way to filter common fields.
* **Generic Methods**: The `with_expression()` method accepts raw **Query Expressions** generated through the `.Q` proxy. This provides full access to every supported operator (`.gt()`, `.lt()`, `.between()`, etc.) for specific fields.
* **Dynamic Metadata Access**: Using the nested notation in `with_user_metadata("key.subkey", ...)` allows you to query any custom tag you attached during the ingestion phase.

---

This guide demonstrates how to locate specific topics based on their ontology type and custom user metadata tags. This allows you to isolate data channels that match specific sensor or configuration conditions.

Related experiment

To fully grasp the following How-To, we recommend reading (and reproducing) the **Querying Catalogs Example**.

In Depth Explanation

* **Documentation: Querying Catalogs**
* **API Reference: Query Builders**
* **API Reference: Query Response**

### The Objective¶

We want to find all topics where:

1. The topic refers to an IMU sensor.
2. The user metadata indicates a specific sensor interface (e.g., `"serial"`).

### Implementation¶

When you call multiple `with_*` methods of the `QueryTopic` builder, the platform joins them with a logical **AND** condition. The server will return only the topics that match all the criteria together.

```
from mosaicolabs import MosaicoClient, QueryTopic, Topic, IMU

# 1. Establish a connection
with MosaicoClient.connect("localhost", 6726) as client:
    # 2. Execute the query
    results = client.query(
        QueryTopic()
            # Use a convenience method to filter by ontology type
            .with_ontology_tag(IMU.ontology_tag())
            # Use a convenience method for filtering fixed and dynamic metadata fields
            .with_user_metadata("interface", eq="serial")
    )

    # 3. Process the Response
    if results:
        for item in results:
            # item.sequence contains the metadata for the matched sequence
            print(f"Matched Sequence: {item.sequence.name}")
            print(f"  Topics: {[topic.name for topic in item.topics]}")  # (1)!
```

1. The `item.topics` list contains all the topics that matched the query. In this case, it will contain all the topics that are of type IMU and have the user metadata field `interface` set to `"serial"`.
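The dotted-path convention accepted by `with_user_metadata("key.subkey", ...)` can be pictured as a plain nested-dictionary lookup. The helper below is purely illustrative (it is not part of the SDK; the actual resolution happens server-side):

```python
def resolve_metadata_path(metadata: dict, path: str):
    """Walk a nested dict following a dotted path like 'project.name'."""
    node = metadata
    for key in path.split("."):
        node = node[key]  # descend one level per dotted segment
    return node

# Hypothetical user metadata attached at ingestion time
meta = {"interface": "serial", "project": {"name": "Apollo"}}

print(resolve_metadata_path(meta, "interface"))     # serial
print(resolve_metadata_path(meta, "project.name"))  # Apollo
```

Flat keys and nested paths share the same syntax, which is why `with_user_metadata("interface", eq="serial")` and `with_user_metadata("project.name", eq="Apollo")` behave uniformly.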
The `query` method returns `None` if an error occurs, or a `QueryResponse` object. This response acts as a list of `QueryResponseItem` objects, each providing:

* **`item.sequence`**: A `QueryResponseItemSequence` containing the sequence metadata.
* **`item.topics`**: A list of `QueryResponseItemTopic` objects that matched the query.

Result Normalization

`topic.name` returns the relative topic path (e.g., `/front/camera/image`), which is immediately compatible with other SDK methods like `MosaicoClient.topic_handler()`, `SequenceHandler.get_topic_handler()`, or streamers.

### Key Concepts¶

* **Convenience Methods**: High-level helpers like `with_ontology_tag()` provide a quick way to filter by ontology tags.
* **Generic Methods**: The `with_expression()` method accepts raw **Query Expressions** generated through the `.Q` proxy. This provides full access to every supported operator (`.gt()`, `.lt()`, `.between()`, etc.) for specific fields.
* **Dynamic Metadata Access**: Using the nested notation in `with_user_metadata("key.subkey", ...)` allows you to query any custom tag you attached during the ingestion phase.

---

By following this guide, you will learn how to:

1. **Initialize an Authenticated Session** using the `connect` method.
2. **Handle TLS Certificates** for encrypted communication.
3. **Use Environment Discovery** for production-grade credential management.

## Option 1: Connecting via the `connect()` Factory¶

The `MosaicoClient.connect()` method is the standard way to provide security credentials. Note that when `tls_cert_path` is provided, the SDK automatically switches to an encrypted gRPC channel.

```
from mosaicolabs import MosaicoClient

# 1. Configuration constants
MOSAICO_HOST = "mosaico.production.yourdomain.com"
MOSAICO_PORT = 6726

# The certificate used by the server (CA or self-signed)
CERT_PATH = "/etc/mosaico/certs/server_ca.pem"

# Your secret API key
MY_API_KEY = "msco_vy9lqa7u4lr7w3vimhz5t8bvvc0xbmk2_9c94a86"

# 2.
# Establish the secure connection
with MosaicoClient.connect(
    host=MOSAICO_HOST,
    port=MOSAICO_PORT,
    api_key=MY_API_KEY,      # Injects Auth Middleware
    tls_cert_path=CERT_PATH  # Enables TLS encryption
) as client:
    # All operations inside this block are now encrypted and authenticated
    print(f"Connected to version: {client.version()}")
    sequences = client.list_sequences()
```

## Option 2: Environment Discovery¶

In real robotics fleets or CI/CD pipelines, hardcoding keys is a major security risk. The `MosaicoClient` includes a `from_env()` method that automatically looks for specific environment variables to configure the client.

### 1. Set your environment variables¶

In your terminal or Docker Compose file, set the following:

```
export MOSAICO_API_KEY="msco_your_secret_key_here"
export MOSAICO_TLS_CERT_FILE="/path/to/ca.pem"
```

### 2. Implement the "Zero-Config" Connection¶

The SDK will now automatically pick up these values:

```
from mosaicolabs import MosaicoClient

# The host and port are still required, but credentials are discovered
with MosaicoClient.from_env(
    host="mosaico.internal.cluster",
    port=6726
) as client:
    # The client is already authenticated via MOSAICO_API_KEY
    # and encrypted via MOSAICO_TLS_CERT_FILE
    pass
```

---

This guide demonstrates how to ingest data into the Mosaico Data Platform from custom files. A CSV file is used as the example, but the logic is compatible with any file format and I/O library. You will learn how to use the Mosaico SDK for:

* **Opening a connection** to the Mosaico server.
* **Creating a sequence**.
* **Creating a topic**.
* **Pushing data into a topic**.
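Independently of the Mosaico SDK, the chunked-reading pattern this guide relies on can be exercised with plain `pandas`. A self-contained sketch, with the CSV content inlined via `io.StringIO` purely for illustration:

```python
import io

import pandas as pd

CSV = """timestamp, acc_x, acc_y, acc_z
1110022, 0.0032, 0.001, -0.002
1111022, 0.0041, 0.002, -0.001
1112022, 0.0028, 0.0005, -0.003
"""

def stream_rows(buffer, chunk_size: int = 2):
    # read_csv with `chunksize` returns a TextFileReader that yields
    # DataFrames of at most `chunk_size` rows, keeping memory bounded
    # no matter how large the source file is.
    for chunk in pd.read_csv(buffer, chunksize=chunk_size, skipinitialspace=True):
        for row in chunk.itertuples(index=False):
            yield int(row.timestamp), float(row.acc_x)

rows = list(stream_rows(io.StringIO(CSV)))
print(rows)  # [(1110022, 0.0032), (1111022, 0.0041), (1112022, 0.0028)]
```

The same generator shape is what the ingestion steps below wrap in `Message` objects; only the `yield` payload changes.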
### Step 1: Chunked Loading for High-Volume Data¶

In this example, we assume our CSV file contains the following columns:

imu.csv

```
timestamp, acc_x, acc_y, acc_z, gyro_x, gyro_y, gyro_z
1110022, 0.0032, 0.001, -0.002, 0.01, 0.005, -0.003
1111022, 0.0041, 0.002, -0.001, 0.012, 0.006, -0.004
1112022, 0.0028, 0.0005, -0.003, 0.009, 0.004, -0.002
```

The implementation below uses `pandas` to stream the data, but the logic is compatible with any streaming I/O library. When dealing with massive datasets, we adopt a **chunked loading approach** for each sensor type.

Define the generator functions that yield `Message` objects:

```
import pandas as pd

from mosaicolabs import (
    MosaicoClient,            # The gateway to the Mosaico Platform
    setup_sdk_logging,        # The mosaico logging config
    SessionLevelErrorPolicy,  # The error policy for the SequenceWriter
    Message,                  # The base class for all data messages
    IMU,                      # The IMU sensor data class
    Vector3d,                 # The 3D vector class, needed to populate the IMU data
)

def stream_imu_from_csv(file_path: str, chunk_size: int = 1000, skipinitialspace: bool = True):
    for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace):  # (1)!
        for row in chunk.itertuples(index=False):
            try:
                yield Message(
                    timestamp_ns=int(row.timestamp),
                    data=IMU(
                        acceleration=Vector3d(
                            x=float(row.acc_x),
                            y=float(row.acc_y),
                            z=float(row.acc_z),
                        ),
                        angular_velocity=Vector3d(
                            x=float(row.gyro_x),
                            y=float(row.gyro_y),
                            z=float(row.gyro_z),
                        ),
                    ),
                )
            except Exception:
                # Yield None only for parsing/type-related errors
                yield None
```

1. Use the pandas `TextFileReader` to stream the file in chunks.

The Mosaico `Message` object is an in-memory object wrapping the sensor data with the necessary metadata (e.g. the timestamp), ensuring it is ready for serialization and network transmission. In this specific case, the data is an instance of the `IMU` model.
This is a built-in part of the Mosaico default ontology, meaning the platform already understands its schema and how to optimize its storage.

In Depth Explanation

* **Documentation: Data Models & Ontology**
* **API Reference: Sensor Models**

### Step 2: Orchestrating the Sequence Upload¶

To write data, we first establish a connection to the Mosaico server via the `MosaicoClient.connect()` method and create a `SequenceWriter`. A sequence writer acts as a logical container for related data streams (topics). When initializing your data handling pipeline, it is highly recommended to wrap the `MosaicoClient` within a `with` statement. This context manager pattern ensures that underlying network connections and shared resource pools are correctly shut down and released when your operations conclude.

Connect to the Mosaico server and create a sequence writer:

```
setup_sdk_logging(level="INFO", pretty=True)  # Configure the mosaico logging

with MosaicoClient.connect("localhost", 6726) as client:
    # Initialize the Sequence Orchestrator
    with client.sequence_create(
        sequence_name="csv_ingestion_test",
        metadata={"source": "manual_upload", "format": "csv"},
        on_error=SessionLevelErrorPolicy.Delete  # (1)!
    ) as swriter:
        # Steps 3 and 4 happen inside this block...
```

1. Mosaico supports two distinct error policies for sequences: `SessionLevelErrorPolicy.Delete` and `SessionLevelErrorPolicy.Report`.

Context Management

It is **mandatory** to use the `SequenceWriter` instance returned by `client.sequence_create()` inside its own `with` context. The following code will raise an exception:

```
swriter = client.sequence_create(
    sequence_name="csv_ingestion_test",
    metadata={...},
)

# Performing operations using `swriter` will raise an exception
swriter.topic_create(...)  # Raises here
```

This choice ensures that the sequence writing orchestrator is closed and cataloged when the block is exited, even if your application encounters a crash or is manually interrupted.
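The "must be used inside `with`" rule behaves like a context manager that only arms itself on `__enter__`. A pure-Python analogue to illustrate the behavior (this is not the SDK's actual implementation, just the general pattern):

```python
class GuardedWriter:
    """Illustrative stand-in for a SequenceWriter-style orchestrator."""

    def __init__(self):
        self._active = False

    def __enter__(self):
        self._active = True  # operations become legal only from here on
        return self

    def __exit__(self, exc_type, exc, tb):
        self._active = False  # flushing/cataloging would happen here
        return False  # never swallow errors: an on_error policy needs to see them

    def topic_create(self, topic_name: str):
        if not self._active:
            raise RuntimeError("writer used outside its 'with' block")
        return topic_name

writer = GuardedWriter()
try:
    writer.topic_create("sensors/imu")  # raises: context was never entered
except RuntimeError as err:
    print(err)

with GuardedWriter() as w:
    print(w.topic_create("sensors/imu"))  # legal inside the context
```

Because `__exit__` runs even when an exception propagates, the orchestrator always gets a chance to close and catalog the sequence, which is what makes the `on_error` policy reliable.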
#### Sequence-Level Error Handling¶

The behavior of the orchestrator during a failure is governed by the `on_error` policy. This is a *last-resort* automated error policy, which dictates how the server manages a sequence if an unhandled exception bubbles up to the `SequenceWriter` context manager. By default, this is set to `SessionLevelErrorPolicy.Report`, which sends an error notification to the server, allowing the platform to flag the sequence as failed while retaining whatever records were successfully transmitted before the error occurred. Alternatively, you can specify `SessionLevelErrorPolicy.Delete`: in this case, if any error occurs, the SDK will signal the server to physically remove the incomplete sequence and its associated topic directories.

In Depth Explanation

* **Documentation: The Writing Workflow**
* **API Reference: Writing Data**

### Step 3: Topic Creation¶

Inside the sequence, we create a Topic Writer, which is assigned to the IMU topic.

```
with client.sequence_create(...) as swriter:
    imu_twriter = swriter.topic_create(  # (1)!
        topic_name="sensors/imu",
        metadata={"sensor_id": "accel_01"},
        ontology_type=IMU,
    )
```

1. Here we are creating a dedicated writer for the IMU topic.

### Step 4: Pushing Data into the Pipeline¶

The final stage of the ingestion process involves iterating through your data generators and transmitting records to the Mosaico platform by calling the `TopicWriter.push()` method for each record. The `push()` method optimizes throughput by accumulating messages into internal batches.

```
with client.sequence_create(...) as swriter:
    imu_twriter = swriter.topic_create(...)

    for msg in stream_imu_from_csv("imu_data.csv"):
        if msg is None:
            # Log and skip, or raise if incomplete data is disallowed
            print("Skipping row due to parsing error")
            continue  # Ignore malformed records
        try:
            imu_twriter.push(message=msg)
        except Exception as e:
            # Log and skip, or raise if incomplete data is disallowed
            print(f"Error at time: {msg.timestamp_ns}.
Inner err: {e}")
```

#### Topic-Level Error Management¶

In the code snippet above, we implemented a **Controlled Ingestion** pattern by wrapping the topic-specific processing and pushing logic within a local `try-except` block. Because the `SequenceWriter` cannot natively distinguish which specific topic failed within your custom processing code (such as a coordinate transformation or a malformed CSV row), an unhandled exception will bubble up and trigger the global sequence-level error policy. To avoid this, you should catch errors locally for each topic.

Upcoming versions of the SDK will introduce native **Topic-Level Error Policies**. This feature will allow you to define the error behavior directly when creating the topic, removing the need for boilerplate `try-except` blocks around every sensor stream.

## The full example code¶

```
"""
Import the necessary classes from the Mosaico SDK.
"""
import pandas as pd

from mosaicolabs import (
    MosaicoClient,            # The gateway to the Mosaico Platform
    setup_sdk_logging,        # The mosaico logging config
    SessionLevelErrorPolicy,  # The error policy for the SequenceWriter
    Message,                  # The base class for all data messages
    IMU,                      # The IMU sensor data class
    Vector3d,                 # The 3D vector class, needed to populate the IMU data
)

"""
Define the generator functions that yield `Message` objects.
"""
def stream_imu_from_csv(file_path: str, chunk_size: int = 1000, skipinitialspace: bool = True):
    """
    Efficiently reads a large CSV in chunks to prevent memory exhaustion.
""" # Use pandas TextFileReader to stream the file in chunks for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace): for row in chunk.itertuples(index=False): try: yield Message( timestamp_ns=int(row.timestamp), data=IMU( acceleration=Vector3d( x=float(row.acc_x), y=float(row.acc_y), z=float(row.acc_z), ), angular_velocity=Vector3d( x=float(row.gyro_x), y=float(row.gyro_y), z=float(row.gyro_z), ), ), ) except Exception: # Yield None only for parsing/type-related errors yield None """ Main ingestion orchestration """ def main(): setup_sdk_logging(level="INFO", pretty=True) # Configure the mosaico logging with MosaicoClient.connect("localhost", 6726) as client: # Initialize the Sequence Orchestrator with client.sequence_create( sequence_name="csv_ingestion_test", metadata={"source": "manual_upload", "format": "csv"}, on_error = SessionLevelErrorPolicy.Delete # Default ) as swriter: # Create a dedicated writer for the IMU topic imu_twriter = swriter.topic_create( topic_name="sensors/imu", metadata={"sensor_id": "accel_01"}, ontology_type=IMU, ) # --- Push IMU Data --- for msg in stream_imu_from_csv("imu.csv"): if msg is None: # Log and skip, or raise if incomplete data is disallowed print("Skipping row due to parsing error") continue # Ignore malformed records try: imu_twriter.push(message=msg) except Exception as e: # Log and skip, or raise if incomplete data is disallowed print(f"Error processing IMU at time: {msg.timestamp_ns}. Inner err: {e}") # All buffers are flushed and the sequence is committed when exiting the SequenceWriter 'with' block print("Successfully injected data from CSV into Mosaico!") # Here the `MosaicoClient` context and all connections are closed if __name__ == "__main__": main() ``` --- This guide demonstrates how to ingest data from multiple custom files into the Mosaico Data Platform. 
While the logic below uses CSV files as the primary example, the SDK's modular design is compatible with any file format (JSON, Parquet, binary) and any I/O library. You will learn how to use the Mosaico SDK to:

* **Open a connection** to the Mosaico server.
* **Create a sequence**.
* **Create topics**.
* **Push data into topics**, via **Controlled Ingestion Patterns** that prevent a single file failure from aborting the entire upload.

### Step 1: Chunked Loading for Heterogeneous Data¶

The following implementation defines three distinct generators to stream IMU, GPS, and Pressure data. In this example, we assume our CSV files contain the following columns:

imu.csv

```
timestamp, acc_x, acc_y, acc_z, gyro_x, gyro_y, gyro_z
1110022, 0.0032, 0.001, -0.002, 0.01, 0.005, -0.003
```

gps.csv

```
timestamp, latitude, longitude, altitude, status, service
1110022, 45.123456, -93.123456, 250.0, 1, 1
```

pressure.csv

```
timestamp, pressure
1110022, 101325.0
```

When dealing with massive datasets spread across multiple files, we adopt a **chunked loading approach** for each sensor type.

Define the generator functions that yield `Message` objects:

```
import pandas as pd

from mosaicolabs import (
    MosaicoClient,            # The gateway to the Mosaico Platform
    setup_sdk_logging,        # The mosaico logging config
    SessionLevelErrorPolicy,  # The error policy for the SequenceWriter
    Message,                  # The base class for all data messages
    IMU,                      # The IMU sensor data class
    Vector3d,                 # The 3D vector class, needed to populate the IMU and GPS data
    GPS,                      # The GPS sensor data class
    GPSStatus,                # The GPS status enum, needed to populate the GPS data
    Pressure,                 # The Pressure sensor data class
    Point3d,                  # The 3D point class, needed to populate the GPS data
)

# Define the generator functions that yield `Message` objects.
# For each file, open the reading process and yield the messages one by one.
def stream_imu_from_csv(file_path: str, chunk_size: int = 1000, skipinitialspace: bool = True):
    for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace):
        for row in chunk.itertuples(index=False):
            try:
                yield Message(
                    timestamp_ns=int(row.timestamp),
                    data=IMU(
                        acceleration=Vector3d(
                            x=float(row.acc_x),
                            y=float(row.acc_y),
                            z=float(row.acc_z),
                        ),
                        angular_velocity=Vector3d(
                            x=float(row.gyro_x),
                            y=float(row.gyro_y),
                            z=float(row.gyro_z),
                        ),
                    ),
                )
            except Exception:
                # Yield None only for parsing/type-related errors
                yield None

def stream_gps_from_csv(file_path: str, chunk_size: int = 1000, skipinitialspace: bool = True):
    for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace):
        for row in chunk.itertuples(index=False):
            try:
                yield Message(
                    timestamp_ns=int(row.timestamp),
                    data=GPS(
                        position=Point3d(
                            x=float(row.latitude),
                            y=float(row.longitude),
                            z=float(row.altitude),
                        ),
                        status=GPSStatus(
                            status=int(row.status),
                            service=int(row.service),
                        ),
                    ),
                )
            except Exception:
                # Yield None only for parsing/type-related errors
                yield None

def stream_pressure_from_csv(file_path: str, chunk_size: int = 1000, skipinitialspace: bool = True):
    for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace):
        for row in chunk.itertuples(index=False):
            try:
                yield Message(
                    timestamp_ns=int(row.timestamp),
                    data=Pressure(value=float(row.pressure)),
                )
            except Exception:
                # Yield None only for parsing/type-related errors
                yield None
```

#### Understanding the Output¶

The Mosaico `Message` object is an in-memory object wrapping the sensor data with the necessary metadata (e.g. the timestamp), ensuring it is ready for serialization and network transmission. In this specific case, the data are instances of the `IMU`, `GPS` and `Pressure` models. These are built-in parts of the Mosaico default ontology, meaning the platform already understands their schema and how to optimize their storage.

In Depth Explanation

* **Documentation: Data Models & Ontology**
* **API Reference: Sensor Models**

### Step 2: Orchestrating the Multi-Topic Sequence¶

To write data, we first establish a connection to the Mosaico server via the `MosaicoClient.connect()` method and create a `SequenceWriter`.
A sequence writer acts as a logical container for related sensor data streams (topics). When initializing your data handling pipeline, it is highly recommended to wrap the **Mosaico Client** within a `with` statement. This context manager pattern ensures that underlying network connections and shared resource pools are correctly shut down and released when your operations conclude.

Connect to the Mosaico server and create a sequence writer:

```
setup_sdk_logging(level="INFO", pretty=True)  # Configure the mosaico logging

with MosaicoClient.connect("localhost", 6726) as client:
    with client.sequence_create(
        sequence_name="multi_sensor_ingestion",
        metadata={"mission": "alpha_test", "environment": "laboratory"},
        on_error=SessionLevelErrorPolicy.Delete  # (1)!
    ) as swriter:
        # Steps 3 and 4 (Topic Creation & serial Pushing) happen here...
```

1. Mosaico supports two distinct error policies for sequences: `SessionLevelErrorPolicy.Delete` and `SessionLevelErrorPolicy.Report`.

Context Management

It is **mandatory** to use the `SequenceWriter` instance returned by `client.sequence_create()` inside its own `with` context. The following code will raise an exception:

```
swriter = client.sequence_create(
    sequence_name="multi_sensor_ingestion",
    metadata={...},
)

# Performing operations using `swriter` will raise an exception
swriter.topic_create(...)  # Raises here
```

This choice ensures that the sequence writing orchestrator is closed and cataloged when the block is exited, even if your application encounters a crash or is manually interrupted.

#### Sequence-Level Error Handling¶

The behavior of the orchestrator during a failure is governed by the `on_error` policy. This is a *last-resort* automated error policy, which dictates how the server manages a sequence if an unhandled exception bubbles up to the `SequenceWriter` context manager.
By default, this is set to `SessionLevelErrorPolicy.Report`, which sends an error notification to the server, allowing the platform to flag the sequence as failed while retaining whatever records were successfully transmitted before the error occurred. Alternatively, you can specify `SessionLevelErrorPolicy.Delete`: in this case, if any error occurs, the SDK will signal the server to physically remove the incomplete sequence and its associated topic directories.

In Depth Explanation

* **Documentation: The Writing Workflow**
* **API Reference: Writing Data**

### Step 3: Topic Creation and Resource Allocation¶

Inside the sequence, we create individual **Topic Writers** to manage data streams. Each writer is an independent "lane" assigned its own internal buffer and background thread for serialization.

```
with client.sequence_create(...) as swriter:
    # Create dedicated Topic Writers for each sensor stream
    imu_twriter = swriter.topic_create(  # (1)!
        topic_name="sensors/imu",
        metadata={"sensor_id": "accel_01"},
        ontology_type=IMU,
    )
    gps_twriter = swriter.topic_create(  # (2)!
        topic_name="sensors/gps",
        metadata={"sensor_id": "gps_01"},
        ontology_type=GPS,
    )
    pressure_twriter = swriter.topic_create(  # (3)!
        topic_name="sensors/pressure",
        metadata={"sensor_id": "pressure_01"},
        ontology_type=Pressure,
    )
```

1. Here we are creating a dedicated writer for the IMU topic.
2. Here we are creating a dedicated writer for the GPS topic.
3. Here we are creating a dedicated writer for the Pressure topic.

### Step 4: Pushing Data into the Pipeline¶

The final stage of the ingestion process involves iterating through your data generators and transmitting records to the Mosaico platform by calling the `TopicWriter.push()` method for each record. The `push()` method optimizes throughput by accumulating messages into internal batches.

```
# 1.
# Push IMU Data
for msg in stream_imu_from_csv("imu.csv"):
    if msg is None:
        # Log and skip, or raise if incomplete data is disallowed
        print("Skipping row due to parsing error")
        continue  # Ignore malformed records
    try:
        imu_twriter.push(message=msg)
    except Exception as e:
        # Log and skip, or raise if incomplete data is disallowed
        print(f"Error processing IMU at time: {msg.timestamp_ns}. Inner err: {e}")

# 2. Push GPS Data with Custom Processing
for msg in stream_gps_from_csv("gps.csv"):
    if msg is None:
        # Log and skip, or raise if incomplete data is disallowed
        print("Skipping row due to parsing error")
        continue  # Ignore malformed records
    try:
        # This custom processing might fail
        process_gps_message(msg)
        gps_twriter.push(message=msg)
    except Exception as e:
        # Log and skip, or raise if incomplete data is disallowed
        print(f"Error processing GPS at time: {msg.timestamp_ns}. Inner err: {e}")

# 3. Push Pressure Data
for msg in stream_pressure_from_csv("pressure.csv"):
    if msg is None:
        # Log and skip, or raise if incomplete data is disallowed
        print("Skipping row due to parsing error")
        continue  # Ignore malformed records
    try:
        pressure_twriter.push(message=msg)
    except Exception as e:
        # Log and skip, or raise if incomplete data is disallowed
        print(f"Error processing pressure at time: {msg.timestamp_ns}. Inner err: {e}")
```

#### Topic-Level Error Management¶

In the code snippet above, we implemented a **Controlled Ingestion** pattern by wrapping the topic-specific processing and pushing logic within a local `try-except` block. Because the `SequenceWriter` cannot natively distinguish which specific topic failed within your custom processing code (such as a coordinate transformation or a malformed CSV row), an unhandled exception will bubble up and trigger the global sequence-level error policy. To avoid this, you should catch errors locally for each topic.

Upcoming versions of the SDK will introduce native **Topic-Level Error Policies**.
This feature will allow you to define the error behavior directly when creating the topic, removing the need for boilerplate `try-except` blocks around every sensor stream.

## The full example code¶

```
"""
Import the necessary classes from the Mosaico SDK.
"""
import pandas as pd

from mosaicolabs import (
    MosaicoClient,            # The gateway to the Mosaico Platform
    setup_sdk_logging,        # The mosaico logging config
    SessionLevelErrorPolicy,  # The error policy for the SequenceWriter
    Message,                  # The base class for all data messages
    IMU,                      # The IMU sensor data class
    Vector3d,                 # The 3D vector class, needed to populate the IMU data
    GPS,                      # The GPS sensor data class
    GPSStatus,                # The GPS status enum, needed to populate the GPS data
    Pressure,                 # The Pressure sensor data class
    Point3d,                  # The 3D point class, needed to populate the GPS data
)

"""
Define the generator functions that yield `Message` objects.
For each file, open the reading process and yield the messages one by one.
"""
def stream_imu_from_csv(file_path: str, chunk_size: int = 1000, skipinitialspace: bool = True):
    """Efficiently streams IMU data."""
    for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace):
        for row in chunk.itertuples(index=False):
            try:
                yield Message(
                    timestamp_ns=int(row.timestamp),
                    data=IMU(
                        acceleration=Vector3d(
                            x=float(row.acc_x),
                            y=float(row.acc_y),
                            z=float(row.acc_z),
                        ),
                        angular_velocity=Vector3d(
                            x=float(row.gyro_x),
                            y=float(row.gyro_y),
                            z=float(row.gyro_z),
                        ),
                    ),
                )
            except Exception:
                # Yield None only for parsing/type-related errors
                yield None

def stream_gps_from_csv(file_path: str, chunk_size: int = 1000, skipinitialspace: bool = True):
    """Efficiently streams GPS data."""
    for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace):
        for row in chunk.itertuples(index=False):
            try:
                yield Message(
                    timestamp_ns=int(row.timestamp),
                    data=GPS(
                        position=Point3d(
                            x=float(row.latitude),
                            y=float(row.longitude),
                            z=float(row.altitude),
                        ),
status=GPSStatus( status=int(row.status), service=int(row.service), ) ) ) except Exception: # Yield None only for parsing/type-related errors yield None def stream_pressure_from_csv(file_path: str, chunk_size: int = 1000, skipinitialspace: bool = True): """Efficiently streams Barometric Pressure data.""" for chunk in pd.read_csv(file_path, chunksize=chunk_size, skipinitialspace=skipinitialspace): for row in chunk.itertuples(index=False): try: yield Message( timestamp_ns=int(row.timestamp), data=Pressure(value=row.pressure) ) except Exception: # Yield None only for parsing/type-related errors yield None """ Main ingestion orchestration """ def main(): setup_sdk_logging(level="INFO", pretty=True) # Configure the mosaico logging with MosaicoClient.connect("localhost", 6726) as client: # Initialize the Orchestrator for the entire mission with client.sequence_create( sequence_name="multi_sensor_ingestion", metadata={"mission": "alpha_test", "environment": "laboratory"}, on_error=SessionLevelErrorPolicy.Delete # Deletes the whole sequence if a fatal crash occurs ) as swriter: # Create dedicated Topic Writers for each sensor stream imu_twriter = swriter.topic_create( topic_name="sensors/imu", metadata={"sensor_id": "accel_01"}, ontology_type=IMU, ) gps_twriter = swriter.topic_create( topic_name="sensors/gps", metadata={"sensor_id": "gps_01"}, ontology_type=GPS, ) pressure_twriter = swriter.topic_create( topic_name="sensors/pressure", metadata={"sensor_id": "pressure_01"}, ontology_type=Pressure, ) # --- 1. Push IMU Data --- for msg in stream_imu_from_csv("imu.csv"): if msg is None: # Log and skip, or raise if incomplete data is disallowed print("Skipping row due to parsing error") continue # Ignore malformed records try: imu_twriter.push(message=msg) except Exception as e: # Log and skip, or raise if incomplete data is disallowed print(f"Error processing IMU at time: {msg.timestamp_ns}. Inner err: {e}") # --- 2. 
Push GPS Data with Custom Processing --- for msg in stream_gps_from_csv("gps.csv"): if msg is None: # Log and skip, or raise if incomplete data is disallowed print("Skipping row due to parsing error") continue # Ignore malformed records try: # This custom processing might fail process_gps_message(msg) gps_twriter.push(message=msg) except Exception as e: # Log and skip, or raise if incomplete data is disallowed print(f"Error processing GPS at time: {msg.timestamp_ns}. Inner err: {e}") # --- 3. Push Pressure Data --- for msg in stream_pressure_from_csv("pressure.csv"): if msg is None: # Log and skip, or raise if incomplete data is disallowed print("Skipping row due to parsing error") continue # Ignore malformed records try: pressure_twriter.push(message=msg) except Exception as e: # Log and skip, or raise if incomplete data is disallowed print(f"Error processing pressure at time: {msg.timestamp_ns}. Inner err: {e}") # All buffers are flushed and the sequence is committed when exiting the SequenceWriter 'with' block print("Multi-topic ingestion completed!") if __name__ == "__main__": main() ``` --- This guide demonstrates how to interact with the Mosaico Data Platform to retrieve the data stream that has been previously ingested. You will learn how to use the Mosaico SDK to: * **Obtain a `SequenceDataStreamer`** to consume recordings from a sequence. * **Obtain a `TopicDataStreamer`** to consume recordings from a topic. Prerequisites To fully grasp the following How-To, we recommend you to read (and reproduce) the **Reading a Sequence and its Topics Example**. In Depth Explanation * **Documentation: The Reading Workflow** * **API Reference: Data Retrieval** ### Unified Multi-Sensor Replay¶ A `SequenceDataStreamer` is designed for sensor fusion and full-system replay. It allows you to consume synchronized multiple data streams—such as high-rate IMU data and low-rate GPS fixes—as if they were a single, coherent timeline. 
```
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    seq_handler = client.sequence_handler("mission_alpha")
    if seq_handler:
        # Initialize a Unified Stream for synchronized multi-sensor analysis
        streamer = seq_handler.get_data_streamer(
            # Filter specific topics
            topics=["/gps", "/imu"],
            # Define the optional temporal window: only data in this range will be streamed
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000,
        )
        print(f"Streaming starts at: {streamer.next_timestamp()}")

        # Consume the stream. The loop yields messages from both topics in chronological order
        for topic, msg in streamer:
            print(f"[{topic}] at {msg.timestamp_ns}: {type(msg.data).__name__}")

        # Finalize the reading channel to release server resources
        seq_handler.close()
```

### Targeted Access¶

A `TopicDataStreamer` provides a dedicated channel for interacting with a single data resource.

```
from mosaicolabs import MosaicoClient, IMU

with MosaicoClient.connect("localhost", 6726) as client:
    # Access a specific topic handler directly via the client
    top_handler = client.topic_handler("mission_alpha", "/front/imu")
    if top_handler:
        # Start a Targeted Stream for isolated, low-overhead replay
        imu_stream = top_handler.get_data_streamer(
            # Define the optional temporal window: only data in this range will be streamed
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000,
        )
        # Query the next timestamp, without consuming the message
        print(f"First sample at: {imu_stream.next_timestamp()}")

        # Direct loop for maximum efficiency
        for imu_msg in imu_stream:
            # Access the strongly-typed IMU data directly
            process_sample(imu_msg.get_data(IMU))

        # Finalize the topic channel
        top_handler.close()
```

### Streamer Comparison¶

| Feature | `SequenceDataStreamer` | `TopicDataStreamer` |
| --- | --- | --- |
| **Primary Use Case** | Multi-sensor fusion & system-wide replay | Isolated sensor analysis & ML training |
| **Logic Overhead** | K-Way Merge Sorting | Direct Stream |
| **Output Type** | Tuple of `(topic_name, message)` | Single `message` object |
| **Temporal Slicing** | Supported | Supported |

---

This guide demonstrates how to programmatically explore the Mosaico Data Platform to discover ingested sequences and inspect their internal structures. By following this guide, you will learn how to:

1. **List all sequences** available on a remote server.
2. **Access high-level metadata** (size, creation time, duration) for a specific recording session.
3. **Drill down into individual topics** to identify sensor types and sampling spans.

Prerequisites

This tutorial assumes you have already ingested data into your Mosaico instance using the example described in the ROS Injection guide.

Experiment Yourself

This guide is **fully executable**.

1. **Start the Mosaico Infrastructure**
2. **Run the example**

```
mosaicolabs.examples data_inspection
```

Full Code

The full code of the example is available **here**.

In Depth Explanation

* **Documentation: The Reading Workflow**
* **API Reference: Data Retrieval**

## Step 1: Connecting to the Catalog¶

The first step is establishing a control connection. We use the `MosaicoClient` within a context manager to ensure that network resources and internal connection pools are gracefully released when the script finishes.

```
from mosaicolabs import MosaicoClient

# Connect to the gateway of the Mosaico Data Platform
with MosaicoClient.connect(host=MOSAICO_HOST, port=MOSAICO_PORT) as client:
    # Retrieve a simple list of all unique sequence identifiers
    seq_list = client.list_sequences()
    print(f"Discovered {len(seq_list)} sequences on the server.")
```

## Step 2: Inspecting Sequence Metadata¶

A **Sequence** represents a holistic recording session. To inspect its metadata without downloading bulk data, we request a `SequenceHandler`. This object acts as a "lazy" proxy to the server-side resource.
```
# Assuming we are iterating through the seq_list from Step 1
for sequence_name in seq_list:
    shandler = client.sequence_handler(sequence_name)
    if shandler:
        # Access physical and logical diagnostics
        size_mb = shandler.total_size_bytes / (1024 * 1024)
        print(f"Sequence: {shandler.name}")
        print(f"• Remote Size: {size_mb:.2f} MB")
        print(f"• Created At: {shandler.created_datetime}")

        # Determine the global temporal bounds of the entire session
        start, end = shandler.timestamp_ns_min, shandler.timestamp_ns_max
        print(f"• Time Span: {start} to {end} ns")
```

## Step 3: Inspecting Individual Topics¶

Inside a sequence, data is partitioned into **Topics**, each corresponding to a specific sensor stream or data channel. We can use the `SequenceHandler` to spawn a `TopicHandler` for granular inspection.

```
# Iterate through all data channels in this sequence
for topic_name in shandler.topics:
    # Obtain a handler for the specific sensor stream
    thandler = shandler.get_topic_handler(topic_name)

    # Identify the semantic type of the data (e.g., 'imu', 'image')
    ontology = thandler.ontology_tag

    # Calculate duration for this specific sensor
    duration_sec = 0
    if thandler.timestamp_ns_min and thandler.timestamp_ns_max:
        duration_sec = (thandler.timestamp_ns_max - thandler.timestamp_ns_min) / 1e9

    print(f" - [{ontology}] {topic_name}: {duration_sec:.2f}s of data")
```

## Comparisons¶

### Sequence vs. Topic Handlers¶

| Feature | Sequence Handler | Topic Handler |
| --- | --- | --- |
| **Scope** | Entire Recording Session | Single Sensor |
| **Metadata** | Mission-wide (e.g., driver, weather) | Sensor-specific (e.g., model, serial) |
| **Time Bounds** | Global min/max of all topics | Min/max for that specific stream |
| **Topics** | List of all available streams | N/A |

### Catalog Layer vs. Data Layer (Handlers vs Streamers)¶

| Feature | Handlers (Catalog Layer) | Streamers (Data Layer) |
| --- | --- | --- |
| **Primary Use** | Metadata inspection & discovery | High-volume data retrieval |
| **Object Type** | `SequenceHandler` / `TopicHandler` | `SequenceDataStreamer` / `TopicDataStreamer` |
| **Data Scope** | Size, Timestamps, Ontology Tags | Raw sensor messages |

---

The Mosaico SDK is designed with a **code-first philosophy**. To help you move from architectural concepts to production implementation, we provide a suite of pedagogical examples. These examples aren't just snippets; they are **fully executable blueprints** that demonstrate high-performance data handling, zero-copy serialization with Apache Arrow, and deep discovery via our type-safe query engine.

We provide a specialized CLI utility to run these examples without manual configuration. This runner automatically injects connection parameters into the example logic, allowing you to point the samples at your specific Mosaico instance.

### Basic Usage¶

From your terminal, use the `mosaicolabs.examples` command followed by the name of the example:

```
# Run the ROS Ingestion example
mosaicolabs.examples ros_injection
```

### Configuration Options¶

The CLI supports several global flags to control the execution environment:

| Option | Default | Description |
| --- | --- | --- |
| `--host` | `localhost` | The hostname of your Mosaico Server. |
| `--port` | `6726` | The Flight port of your Mosaico Server. |
| `--log-level` | `INFO` | Set verbosity (`DEBUG`, `INFO`, `WARNING`, `ERROR`). |

**Example with custom server:**

```
mosaicolabs.examples data_inspection --host 192.168.1.50 --port 6276 --log-level DEBUG
```

### Available Blueprints¶

We recommend exploring the examples in the following order to understand the platform's flow:

#### ROS Ingestion¶

* **CLI Name**: `ros_injection`
* **Purpose**: The "Hello World" of Mosaico.
This example automates the download of the NVIDIA R2B Dataset 2024 and performs a high-performance injection from `.mcap` files.
* **Key Concept**: Demonstrates **Adaptation Logic**—translating raw ROS dictionaries into strongly-typed Mosaico Ontology models.

#### Data Discovery and Inspection¶

* **CLI Name**: `data_inspection`
* **Purpose**: Learn how to browse the server-side catalog.
* **Key Concept**: Explains the **Lazy Handler** pattern. You will see how to retrieve sequence metadata (size, timestamps) and topic details without downloading bulk sensor data.

#### Deep Querying¶

* **CLI Name**: `query_catalogs`
* **Purpose**: Move beyond simple time-based retrieval.
* **Key Concept**: Showcases the **`.Q` Query Proxy**. You will learn to search for specific physical events (e.g., "Find IMU lateral acceleration >= 1.0 m/s^2") across your entire dataset.

### Infrastructure Prerequisite¶

Before running any example, ensure your Mosaico infrastructure is active. The easiest way to start is using the provided **Quick Start environment**. Please refer to the **daemon Setup** for setting up the environment.

### Ready to start?¶

We recommend beginning with the **ROS Ingestion Guide** to populate your local server with high-fidelity robotics data. The other examples will run on the data ingested via the ROS Ingestion example.

---

This guide demonstrates how to extend the Mosaico Data Platform with custom data models. While Mosaico provides a rich default ontology for robotics (IMU, GPS, Images, etc.), specialized hardware often requires proprietary data structures. By following this guide, you will be able to:

* **Define** strongly-typed data models using Python and Apache Arrow.
* **Register** these models so they are recognized by the Mosaico Ecosystem.
* **Integrate** them into the ingestion and retrieval pipelines.

Full Code

The full code of the example is available **here**.
In Depth Explanation

* **Documentation: Data Models & Ontology**
* **API Reference: Base Models and Mixins**

### Step 1: Define the Custom Data Model¶

In Mosaico, data models are defined by inheriting from the **`Serializable`** base class. This ensures that your model can be automatically translated into the platform's high-performance storage format. For this example, we will create a model for **`EncoderTicks`**, found in the NVIDIA R2B Dataset 2024.

```
import pyarrow as pa
from mosaicolabs import Serializable

class EncoderTicks(
    Serializable,  # Automatically registers the model via `Serializable.__init_subclass__`
):
    """
    Custom model for hardware-level encoder tick readings.
    """

    # --- Wire Schema Definition (Apache Arrow) ---
    # This defines the high-performance binary storage format on the server.
    __msco_pyarrow_struct__ = pa.struct([
        pa.field("left_ticks", pa.uint32(), nullable=False),
        pa.field("right_ticks", pa.uint32(), nullable=False),
        pa.field("encoder_timestamp", pa.uint64(), nullable=False),
    ])

    # --- Data Fields ---
    # Names and types must strictly match the Apache Arrow schema above.
    left_ticks: int
    right_ticks: int
    encoder_timestamp: int
```

### Step 2: Ensure "Discovery" via Module Import¶

It is a common pitfall to define a class and expect the platform to "see" it immediately. Mosaico uses the `Serializable.__init_subclass__` hook to perform **automatic registration** the moment the class is loaded into memory by the Python interpreter. For your custom type to be available in your application (especially during ingestion or when using the `ROSBridge`), you **must** ensure the module containing the class is imported.

#### Best Practice: The Registry Pattern¶

Create a dedicated `models.py` or `ontology/` package for your project and import it at your application's entry point.
```
# app/main.py
import my_project.ontology.encoders as encoders  # <-- This triggers the registration

from mosaicolabs import MosaicoClient

def run_ingestion():
    with MosaicoClient.connect(...) as client:
        # Now 'EncoderTicks' is a valid ontology_type for topic creation
        with client.sequence_create(name="test") as sw:
            tw = sw.topic_create("ticks", ontology_type=encoders.EncoderTicks)
            # ...
```

### Step 3: Verifying Registration¶

If you are unsure whether your model has been correctly "seen" by the ecosystem, you can check the internal registry of the `Serializable` class.

```
from mosaicolabs import Serializable
import my_project.ontology.encoders as encoders  # <-- This triggers the registration

if encoders.EncoderTicks.is_registered():
    print("Registration successful!")
```

---

By following this guide, you will learn how to:

1. **Perform Fuzzy Searches**: Find topics based on partial name matches.
2. **Filter by Sensor Type**: Isolate all topics belonging to a specific ontology (e.g., all IMUs).
3. **Execute Multi-Domain Queries**: Correlate sensor names with physical events (e.g., "Find the front camera IMU data where acceleration on the y-axis exceeded 1.0 m/s^2").

Prerequisites

This tutorial assumes you have already ingested data into your Mosaico instance using the example described in the ROS Injection guide.

Experiment Yourself

This guide is **fully executable**.

1. **Start the Mosaico Infrastructure**
2. **Run the example**

```
mosaicolabs.examples query_catalogs
```

Full Code

The full code of the example is available **here**.

In Depth Explanation

* **Documentation: Querying Catalogs**
* **API Reference: Query Builders**
* **API Reference: Query Response**

## The Query Philosophy: Chaining & Builders¶

Mosaico uses specialized **Query Builders** that provide a "fluent" interface.
When you pass multiple builders to the `client.query()` method, the platform joins them with a logical **AND** condition—returning only results that satisfy every criterion simultaneously.

## Step 1: Finding Topics by Name Match¶

The `QueryTopic` builder allows you to search for data channels without knowing their exact full path.

```
from mosaicolabs import MosaicoClient, QueryTopic

with MosaicoClient.connect(host="localhost", port=6726) as client:
    # Use a convenience method for fuzzy name matching (equivalent to SQL %match%)
    results = client.query(
        QueryTopic().with_name_match("image_raw")  # (1)!
    )

    if results:
        for item in results:
            print(f"Sequence: {item.sequence.name}")
            for topic in item.topics:
                print(f" - Matched: {topic.name}")
```

1. The `with_name_match()` method allows you to filter for topics that match a pattern. In this case, we are filtering for topics that contain *"image\_raw"* in their name. This is equivalent to using the SQL `LIKE` operator with a wildcard.

### Understanding the Response Structure¶

The `query()` method returns a `QueryResponse` object, which is hierarchically grouped to make data management easy:

| Object | Purpose |
| --- | --- |
| **`item.sequence`** | Contains the name of the sequence where the match was found. |
| **`item.topics`** | A list of only the topics within that sequence that satisfied the criteria. |
| **`topic.timestamp_range`** | Provides the `start` and `end` nanoseconds for the matching event. |

Result Normalization

`topic.name` returns the relative topic path (e.g., `/front/camera/image`), which is immediately compatible with other SDK methods like `MosaicoClient.topic_handler()`.

## Step 2: Filtering by Ontology (Sensor Type)¶

Instead of hardcoding strings, Mosaico allows you to query by the **semantic type** of the data. This ensures that if you rename a topic, your analysis scripts won't break as long as the sensor type remains the same.
```
from mosaicolabs import IMU, QueryTopic

# Retrieve the unique ontology tag dynamically from the class
results = client.query(
    QueryTopic().with_ontology_tag(IMU.ontology_tag())  # (1)!
)
# This returns every IMU topic across all sequences in the database.
```

1. The `ontology_tag()` method returns the unique identifier for the ontology class.

## Step 3: Multi-Domain Queries (Physics + Metadata)¶

This is the most powerful feature of the platform. We use the **`.Q` Query Proxy**, a type-safe bridge that allows you to construct filters using standard Python dot notation. In this example, we identify the time segments where a specific IMU sensor recorded a lateral acceleration (y-axis) greater than or equal to 1.0 m/s^2.

```
from mosaicolabs import IMU, QueryOntologyCatalog, QueryTopic

results = client.query(
    # Layer 1: Physics (Ontology Catalog)
    QueryOntologyCatalog(
        IMU.Q.acceleration.y.geq(1.0),
        include_timestamp_range=True  # (1)!
    ),
    # Layer 2: Configuration (Topic Name)
    QueryTopic().with_name("/front_stereo_imu/imu")  # (2)!
)
```

1. The `include_timestamp_range=True` flag tells the server to return the first and last timestamps of the queried condition within a topic, allowing you to slice data accurately for further analysis.
2. The `with_name()` method allows you to filter by exact topic name. In this case, we are filtering for exactly the `/front_stereo_imu/imu` topic. You can also use the `with_name_match()` method to filter for topics that match a pattern.

**Architect's Note:** Notice how we pass two different builders. The server will only return the `/front_stereo_imu/imu` topic, and only if it contains values >= 1.0.
## Practical Application: Replaying the "Impact"¶

Once you have the results, you can immediately slice the data for playback:

```
if results:
    for item in results:
        for topic in item.topics:
            # Slice the sequence to the event plus a 1-second margin on each side
            streamer = client.sequence_handler(item.sequence.name).get_data_streamer(
                topics=[topic.name],
                start_timestamp_ns=topic.timestamp_range.start - 1_000_000_000,
                end_timestamp_ns=topic.timestamp_range.end + 1_000_000_000
            )
            # Replay the high-acceleration event...
```

---

This tutorial demonstrates a complete Mosaico data ingestion workflow using the NVIDIA R2B Dataset 2024. You will learn how to automate the transition from monolithic ROS bags (`.mcap`) to a structured, queryable archive. By following this guide, you will:

* **Automate Asset Preparation**: Programmatically download and manage remote datasets.
* **Implement Adaptation Logic**: Translate raw ROS types into the strongly-typed Mosaico Ontology.
* **Execute Ingestion**: Use the `RosbagInjector` to handle batching and network transmission.
* **Verify Integrity**: Programmatically inspect the server to ensure the data is cataloged.

Experiment Yourself

This guide is **fully executable**.

1. **Start the Mosaico Infrastructure**
2. **Run the example**

```
mosaicolabs.examples ros_injection
```

Full Code

The full code of the example is available **here**.

In Depth Explanation

* **How-To: Customizing the Data Ontology**
* **Documentation: The ROS Bridge**
* **API Reference: The ROS Bridge**

## Step 1: Custom Ontology Definition (`isaac.py`)¶

In Mosaico, data is strongly typed. When dealing with specialized hardware like the NVIDIA Isaac Nova encoders, whose custom data models are not available in the SDK, we must define a model that the platform understands.

### The Data Model¶

The `EncoderTicks` class defines the physical storage format.

```
import pyarrow as pa
from mosaicolabs import Serializable

class EncoderTicks(Serializable):  # (1)!

    # --- Wire Schema Definition ---
    __msco_pyarrow_struct__ = pa.struct([
        pa.field("left_ticks", pa.uint32(), nullable=False),
        pa.field("right_ticks", pa.uint32(), nullable=False),
        pa.field("encoder_timestamp", pa.uint64(), nullable=False),
    ])

    # --- Pydantic Fields ---
    left_ticks: int  # (2)!
    right_ticks: int
    encoder_timestamp: int
```

1. Inheriting from `Serializable` automatically registers your model in the Mosaico ecosystem, making it dispatchable to the data platform and enabling the `.Q` query proxy.
2. The field names in the `pa.struct` **must match exactly** the names of the Python attributes.

In Depth Explanation

* **How-To: Customizing the Data Ontology**
* **Documentation: Data Models & Ontology**

## Step 2: Implementing the ROS Adapter (`isaac_adapters.py`)¶

The translation between the raw ROS dictionary and the Mosaico ontology data model is handled by *adapters*. Because `"isaac_ros_nova_interfaces/msg/EncoderTicks"` is a custom message type, it does not have a default adapter, so we must define one. The `ROSAdapterBase` class provides the necessary infrastructure; we just need to implement the `from_dict` method, which converts the raw ROS message dictionary into our custom ontology model.

### The Adapter Implementation¶

```
from typing import Any

from mosaicolabs import Message
from mosaicolabs.ros_bridge import ROSMessage, ROSAdapterBase, register_default_adapter
from mosaicolabs.ros_bridge.adapters.helpers import _validate_msgdata

from .isaac import EncoderTicks

@register_default_adapter
class EncoderTicksAdapter(ROSAdapterBase[EncoderTicks]):
    ros_msgtype = ("isaac_ros_nova_interfaces/msg/EncoderTicks",)  # (1)!
    __mosaico_ontology_type__ = EncoderTicks  # (2)!
    _REQUIRED_KEYS = ("left_ticks", "right_ticks", "encoder_timestamp")  # (3)!

    @classmethod
    def from_dict(cls, ros_data: dict) -> EncoderTicks:  # (4)!
        """
        Convert a ROS message dictionary to an EncoderTicks object.
        """
        _validate_msgdata(cls, ros_data)
        return EncoderTicks(
            left_ticks=ros_data["left_ticks"],
            right_ticks=ros_data["right_ticks"],
            encoder_timestamp=ros_data["encoder_timestamp"],
        )

    @classmethod
    def translate(cls, ros_msg: ROSMessage, **kwargs: Any) -> Message:  # (5)!
        """
        Translates a ROS EncoderTicks message into a Mosaico Message container.
        """
        return super().translate(ros_msg, **kwargs)
```

1. A tuple of strings representing the ROS message types that this adapter can handle.
2. The Mosaico ontology type that this adapter produces.
3. A tuple of strings representing the required keys in the ROS message. This is used by the `_validate_msgdata` helper to check that the ROS message contains the required fields.
4. This is the heart of the translator. It takes a Python dictionary and maps the keys to our `EncoderTicks` ontology model.
5. This method is called by the `RosbagInjector` class for each message in the bag. It is responsible for converting the raw ROS message into a Mosaico `Message`, wrapping the custom ontology model.

**Key Operation**: The `@register_default_adapter` decorator ensures the `RosbagInjector` automatically knows how to handle this message type during the ingestion loop.

## Step 3: Orchestrating the Ingestion Loop¶

In a real-world scenario, you often need to ingest a batch of files. The `main.py` implementation uses a 3-phase loop to manage this workflow.

### Phase 1: Asset Preparation¶

Before we can ingest data, we need the raw file. This phase downloads a verified dataset from NVIDIA.

```
for bag_path in BAG_FILES_PATH:
    bag_file_url = BASE_BAGFILE_URL + bag_path
    # This utility handles downloads with visual progress bars
    out_bag_file = download_asset(bag_file_url, ASSET_DIR)
```

### Phase 2: High-Performance Ingestion¶

Configure and run the injector. This layer handles connection pooling, asynchronous serialization, and batching.
```
config = ROSInjectionConfig(
    host=MOSAICO_HOST,
    port=MOSAICO_PORT,
    file_path=out_bag_file,
    sequence_name=out_bag_file.stem,  # Sequence name derived from filename
    # Some example metadata for the sequence
    metadata={
        "source_url": BAGFILE_URL,
        "ingested_via": "mosaico_example_ros_injection",
        "download_time_utc": str(downloaded_time),
    },
    log_level="INFO",
)

# Handles connection, loading, adaptation, and batching
injector = RosbagInjector(config)
injector.run()  # Starts the ingestion process
```

### Phase 3: Verification & Retrieval¶

Programmatically confirm the sequence is available on the server using a context manager.

```
with MosaicoClient.connect(host=MOSAICO_HOST, port=MOSAICO_PORT) as client:  # (1)!
    seq_list = client.list_sequences()  # (2)!
    if config.sequence_name in seq_list:
        print(f"Success: '{config.sequence_name}' is now queryable.")
```

1. Establishes a secure connection to the platform.
2. Asks for the list of sequences on the server.

In Depth Explanation

* **How-To: Data Discovery and Inspection**
* **Documentation: The Reading Workflow**
* **API Reference: Data Retrieval**

---

API Reference: `mosaicolabs.comm.MosaicoClient`.

The `MosaicoClient` is a resource manager designed to orchestrate three distinct **Layers** of communication and processing. This layered architecture ensures that high-throughput sensor data does not block critical control operations or application logic. Creating a new client is done via the `MosaicoClient.connect()` factory method. It is recommended to always use the client inside a `with` context to ensure resources in all layers are cleanly released.

```
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Logic goes here
    pass
# Pools and connections are closed automatically
```

## Control Layer¶

A single, dedicated connection is maintained for metadata operations. This layer handles lightweight tasks such as creating sequences, querying the catalog, and managing schema definitions.
By isolating control traffic, the client ensures that critical commands (like `sequence_finalize`) are never queued behind heavy data transfers.

## Data Layer¶

For high-bandwidth data ingestion (e.g., uploading 4x 1080p cameras simultaneously), the client maintains a **Connection Pool** of multiple Flight clients. The SDK automatically stripes writes across these connections in a round-robin fashion, allowing the application to saturate the available network bandwidth.

## Processing Layer¶

Serialization of complex sensor data (like compressing images or encoding LIDAR point clouds) is CPU-intensive. The SDK uses an **Executor Pool** of background threads to offload these tasks. This ensures that while one thread is serializing the *next* batch of data, another thread is already transmitting the *previous* batch over the network.

It is vital to emphasize that the **Security Layer** is not an "add-on" but a foundational component of the `MosaicoClient` lifecycle. In robotics, where data often moves from edge devices to centralized clusters, this layer ensures that your Physical AI assets remain protected against unauthorized access and interception.

## Security Layer¶

The Security Layer manages the confidentiality and integrity of the communication channel. It is composed of two primary mechanisms that work in tandem to harden the connection.

### 1. Encryption (TLS)¶

For deployments over public or shared networks, the client supports **Transport Layer Security (TLS)**. By providing a `tls_cert_path`, the client automatically switches from an insecure channel to an encrypted one. The SDK handles the heavy lifting of reading the certificate bytes and configuring the underlying Flight/gRPC drivers to verify the server's identity and encrypt the data stream.

### 2. Authentication (API Key)¶

Mosaico uses an **API Key** system to authorize every operation.
When a key is provided, the client automatically attaches your unique credentials to the metadata of every gRPC and Flight call. This ensures that even if your endpoint is public, only requests with a valid, non-revoked key are processed by the server. The client supports 4 permission levels, each with increasing privileges:

| Permission | Description |
| --- | --- |
| `APIKeyPermissionEnum.Read` | Read-only access to resources |
| `APIKeyPermissionEnum.Write` | Write access to resources (create and update sequences) |
| `APIKeyPermissionEnum.Delete` | Delete access to resources (delete sequences, sessions, and topics) |
| `APIKeyPermissionEnum.Manage` | Full access to resources + manage API keys (create, retrieve status, revoke) |

```
from mosaicolabs import MosaicoClient

# The client handles the handshake and credential injection automatically
with MosaicoClient.connect(
    host="mosaico.production.cluster",
    port=6726,
    api_key="",
) as client:
    print(client.version())
```

API-Key and TLS

TLS is not mandatory when connecting via API keys. However, it is recommended to enable TLS support on the server, to avoid sensitive credentials being sent over unencrypted channels.

### Recommended Patterns¶

#### Explicit Connection¶

Ideal for local development or scripts where credentials are managed by a secrets manager.

```
from mosaicolabs import MosaicoClient

# The client handles the handshake and credential injection automatically
with MosaicoClient.connect(
    host="mosaico.production.cluster",
    port=6726,
    api_key="",
    tls_cert_path="/etc/mosaico/certs/ca.pem",
) as client:
    print(client.version())
```

#### Environment-Based Configuration (`from_env`)¶

As a best practice for production and containerized environments (Docker/Kubernetes), the `MosaicoClient` supports **Zero-Config Discovery** via `MosaicoClient.from_env`. This prevents sensitive keys from ever being hardcoded in your source files. The client automatically searches for:

* `MOSAICO_API_KEY`: Your authentication token.
* `MOSAICO_TLS_CERT_FILE`: The path to your CA certificate.

```
import os
from mosaicolabs import MosaicoClient

# Standardize your deployment by using environment variables
# export MOSAICO_API_KEY=...
# export MOSAICO_TLS_CERT_FILE=...
with MosaicoClient.from_env(host="mosaico.internal", port=6726) as client:
    # Security is initialized automatically from the environment
    print(client.version())
```

---

The **Mosaico Data Ontology** is the semantic backbone of the SDK. It defines the structural "rules" that transform raw binary streams into meaningful physical data, such as GPS coordinates, inertial measurements, or camera frames. By using a strongly-typed ontology, Mosaico ensures that your data remains consistent, validatable, and highly optimized for both high-throughput transport and complex queries.

## Core Philosophy¶

The ontology is designed to solve the "generic data" problem in robotics by ensuring every data object is:

1. **Validatable**: Uses Pydantic for strict runtime type checking of sensor fields.
2. **Serializable**: Automatically maps Python objects to efficient **PyArrow** schemas for high-speed binary transport.
3. **Queryable**: Injects a fluent API (`.Q`) into every class, allowing you to filter databases based on physical values (e.g., `IMU.Q.acceleration.x > 6.0`).
4. **Middleware-Agnostic**: Acts as an abstraction layer so that your analysis code doesn't care if the data originally came from ROS, a simulator, or a custom logger.

## Available Ontology Classes¶

The Mosaico SDK provides a comprehensive library of models that transform raw binary streams into validated, queryable Python objects. These are grouped by their physical and logical application below.

### Base Data Models¶

API Reference: Base Data Types

These models serve as timestamped, metadata-aware wrappers for standard primitives. They allow simple diagnostic or scalar values to be treated as first-class members of the platform.
| Module | Classes | Purpose |
| --- | --- | --- |
| **Primitives** | `String`, `LargeString` | UTF-8 text data for logs or status messages. |
| **Booleans** | `Boolean` | Logic flags (True/False). |
| **Signed Integers** | `Integer8`, `Integer16`, `Integer32`, `Integer64` | Signed whole numbers of varying bit-depth. |
| **Unsigned Integers** | `Unsigned8`, `Unsigned16`, `Unsigned32`, `Unsigned64` | Non-negative integers for counters or IDs. |
| **Floating Point** | `Floating16`, `Floating32`, `Floating64` | Real numbers for high-precision physical values. |

### Geometry & Kinematics Models¶

API Reference: Geometry Models

These structures define spatial relationships and the movement states of objects in 2D or 3D coordinate frames.

| Module | Classes | Purpose |
| --- | --- | --- |
| **Points & Vectors** | `Vector2d/3d/4d`, `Point2d/3d` | Fundamental spatial directions and locations. |
| **Rotations** | `Quaternion` | Compact, singularity-free 3D orientation. |
| **Spatial State** | `Pose`, `Transform` | Absolute positions or relative coordinate frame shifts. |
| **Motion** | `Velocity`, `Acceleration` | Linear and angular movement rates (Twists and Accels). |
| **Aggregated State** | `MotionState` | An atomic snapshot combining Pose, Velocity, and Acceleration. |

### Sensor Models¶

API Reference: Sensor Models

High-level models representing physical hardware devices and their processed outputs.

| Module | Classes | Purpose |
| --- | --- | --- |
| **Inertial** | `IMU` | 6-DOF inertial data: linear acceleration and angular velocity. |
| **Navigation** | `GPS`, `GPSStatus`, `NMEASentence` | Geodetic fixes (WGS 84), signal quality, and raw NMEA strings. |
| **Vision** | `Image`, `CompressedImage`, `CameraInfo`, `ROI` | Raw pixels, encoded streams (JPEG/H264), calibration, and regions of interest. |
| **Environment** | `Temperature`, `Pressure`, `Range` | Thermal readings (K), pressure (Pa), and distance intervals (m). |
| **Dynamics** | `ForceTorque` | 3D force and torque vectors for load sensing. |
| **Magnetic** | `Magnetometer` | Magnetic field vectors measured in microtesla (µT). |
| **Robotics** | `RobotJoint` | States (position, velocity, effort) for index-aligned actuator arrays. |

## Architecture¶

The ontology architecture relies on three primary abstractions: the **Factory** (`Serializable`), the **Envelope** (`Message`), and the **Mixins**.

### `Serializable` (The Factory)¶

API Reference: `mosaicolabs.models.Serializable`

Every data payload in Mosaico inherits from the `Serializable` class. It manages the global registry of data types and ensures that the system knows exactly how to convert a string tag like `"imu"` back into a Python class with a specific binary schema. `Serializable` uses the `__init_subclass__` hook, which is automatically called whenever a developer defines a new subclass.

```
class MyCustomSensor(Serializable):  # <--- __init_subclass__ triggers here
    ...
```

When this happens, `Serializable` performs the following steps automatically:

1. **Validates Schema:** Checks whether the subclass defined the PyArrow struct schema (`__msco_pyarrow_struct__`). If missing, it raises an error at definition time (import time), preventing runtime failures later.
2. **Generates Tag:** If the class doesn't define `__ontology_tag__`, it auto-generates one from the class name (e.g., `MyCustomSensor` -> `"my_custom_sensor"`).
3. **Registers Class:** It adds the new class to the global types registry.
4. **Injects Query Proxy:** It dynamically adds a `.Q` attribute to the class, enabling the fluent query syntax (e.g., `MyCustomSensor.Q.voltage > 12.0`).

### `Message` (The Envelope)¶

API Reference: `mosaicolabs.models.Message`

The **`Message`** class is the universal transport envelope for all data within the Mosaico platform.
It acts as the "Source of Truth" for synchronization and spatial context, combining specific sensor data (the payload) with critical middleware-level metadata. By centralizing metadata at the envelope level, Mosaico ensures that every data point—regardless of its complexity—carries a consistent temporal and spatial identity.

```
from mosaicolabs import Message, Time, Temperature

# Create a Temperature message with unified envelope metadata
meas_time = Time.now()
temp_msg = Message(
    timestamp_ns=meas_time.to_nanoseconds(),  # Primary synchronization clock
    frame_id="comp_case",  # Spatial reference frame
    seq_id=101,  # Optional sequence ID for ordering
    data=Temperature.from_celsius(
        value=57,
        variance=0.03
    )
)
```

While logically a `Message` contains a `data` object, the physical representation on the wire (PyArrow/Parquet) is **flattened**, ensuring zero-overhead access to nested data during queries while maintaining a clean, object-oriented API in Python.

* **Logical:** `Message(timestamp_ns=123, frame_id="map", data=IMU(acceleration=Vector3d(x=1.0,...)))`
* **Physical:** `Struct(timestamp_ns=123, frame_id="map", seq_id=null, acceleration, ...)`

The `Message` mechanism enables a flexible dual-usage pattern for every Mosaico ontology type, supporting both **Standalone Messages** and **Embedded Fields**.

#### Standalone Messages¶

Any `Serializable` type (from elementary types like `String` and `Float32` to complex sensors like `IMU`) can be used as a standalone message. When assigned to the `data` field of a `Message` envelope, the type represents an independent data stream with its own global timestamp and metadata that can be pushed via a dedicated `TopicWriter`. This is ideal for pushing processed signals, debug values, or simple sensor readings.
```
# Sending a raw Vector3d as a timestamped standalone message with its own uncertainty
accel_msg = Message(
    timestamp_ns=ts,
    frame_id="base_link",
    data=Vector3d(
        x=0.0, y=0.0, z=9.81,
        covariance=[0.01, 0, 0, 0, 0.01, 0, 0, 0, 0.01]  # 3x3 diagonal matrix
    )
)
accel_writer.push(message=accel_msg)

# Sending a raw String as a timestamped standalone message
log_msg = Message(
    timestamp_ns=ts,
    frame_id="base_link",
    data=String(data="Waypoint-miss in navigation detected!")
)
log_writer.push(message=log_msg)
```

#### Embedded Fields¶

`Serializable` types can also be embedded as internal fields within a larger structure. In this context, they behave as standard data types. While the parent `Message` provides the global temporal context, the embedded fields can carry their own granular attributes, such as unique uncertainty matrices.

```
# Embedding Vector3d inside a complex IMU model
imu_payload = IMU(
    # Embedded Field 1: Acceleration with its own specific uncertainty.
    # Here the Vector3d instance inherits the timestamp and frame_id
    # from the parent IMU Message.
    acceleration=Vector3d(
        x=0.5, y=-0.2, z=9.8,
        covariance=[0.1, 0, 0, 0, 0.1, 0, 0, 0, 0.1]
    ),
    # Embedded Field 2: Angular Velocity
    angular_velocity=Vector3d(x=0.0, y=0.0, z=0.0)
)

# Wrap the complex payload in the Message envelope
imu_writer.push(Message(timestamp_ns=ts, frame_id="imu_link", data=imu_payload))
```

### Mixins: Uncertainty & Robustness¶

Mosaico uses **Mixins** to inject standard uncertainty fields across different data types, ensuring a consistent interface for sensor fusion and error analysis. These fields are typically used to represent the precision of the sensor data.

#### `CovarianceMixin`¶

API Reference: `mosaicolabs.models.mixins.CovarianceMixin`

Injects multidimensional uncertainty fields, typically used for flattened covariance matrices (e.g., 3x3 or 6x6) in sensor fusion applications.
```
class MySensor(Serializable, CovarianceMixin):
    # Automatically receives covariance and covariance_type fields
    ...
```

#### `VarianceMixin`¶

API Reference: `mosaicolabs.models.mixins.VarianceMixin`

Injects monodimensional uncertainty fields, useful for sensors with 1-dimensional uncertain data like `Temperature` or `Pressure`.

```
class MySensor(Serializable, VarianceMixin):
    # Automatically receives variance and variance_type fields
    ...
```

By leveraging these mixins, the platform can perform deep analysis on data quality—such as filtering for only "high-confidence" segments—without requiring unique logic for every sensor type.

## Querying Data Ontology with the Query (`.Q`) Proxy¶

The Mosaico SDK allows you to perform deep discovery directly on the physical content of your sensor streams. Every class inheriting from `Serializable`, including standard sensors, geometric primitives, and custom user models, is automatically injected with a static **`.Q` proxy** attribute. This proxy acts as a type-safe bridge between your Python data models and the platform's search engine, enabling you to construct complex filters using standard Python dot notation.

### How the Proxy Works¶

The `.Q` proxy recursively inspects the model’s schema to expose every queryable field path. It identifies the data type of each field and provides only the operators valid for that type (e.g., numeric comparisons for acceleration, substring matches for frame IDs).

* **Direct Field Access**: Filter based on primary values, such as `Temperature.Q.value.gt(25.0)`.
* **Nested Navigation**: Traverse complex, embedded structures. For example, in the `GPS` model, you can drill down into the status sub-field: `GPS.Q.status.satellites.geq(8)`.
* **Mixin Integration**: Fields inherited from mixins are automatically included in the proxy. This allows you to query uncertainty metrics (from `VarianceMixin` or `CovarianceMixin`) across any model.
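The attribute-chaining mechanism behind such a proxy can be sketched in plain Python. This is a conceptual illustration only, not the SDK's implementation; `FieldProxy`, `QueryableMeta`, and the tuple expression format are hypothetical names invented for the sketch.

```python
# Conceptual sketch of an attribute-chaining query proxy (NOT the Mosaico
# implementation): each attribute access extends a dotted field path, and
# operator methods turn the path into a serializable filter expression.

class FieldProxy:
    def __init__(self, path=""):
        self._path = path

    def __getattr__(self, name):
        # Chain nested field access: Q.status.hdop -> "status.hdop"
        prefix = f"{self._path}." if self._path else ""
        return FieldProxy(prefix + name)

    # Operator methods produce (path, operator, value) expression tuples;
    # a real implementation would restrict them by the field's data type.
    def gt(self, value):
        return (self._path, ">", value)

    def eq(self, value):
        return (self._path, "==", value)


class QueryableMeta(type):
    """Exposes a fresh proxy root as the `.Q` attribute of each class."""
    @property
    def Q(cls):
        return FieldProxy()


class IMU(metaclass=QueryableMeta):
    pass


# Building an expression never touches real data; it only records intent,
# which a server can later translate into an efficient physical scan.
expr = IMU.Q.acceleration.x.gt(6.0)
print(expr)  # ('acceleration.x', '>', 6.0)
```

The key design point this sketch captures is that expressions are *descriptions* of a filter, built client-side with ordinary dot notation, rather than computations over downloaded data.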
### Queryability Examples¶

The following table illustrates how the proxy flattens complex hierarchies into queryable paths:

| Type Field Path | Proxy Field Path | Source Type | Queryable Type | Supported Operators |
| --- | --- | --- | --- | --- |
| `IMU.acceleration.x` | `IMU.Q.acceleration.x` | `float` | **Numeric** | `.eq()`, `.neq()`, `.lt()`, `.gt()`, `.leq()`, `.geq()`, `.in_()`, `.between()` |
| `GPS.status.hdop` | `GPS.Q.status.hdop` | `float` | **Numeric** | `.eq()`, `.neq()`, `.lt()`, `.gt()`, `.leq()`, `.geq()`, `.in_()`, `.between()` |
| `IMU.frame_id` | `IMU.Q.frame_id` | `str` | **String** | `.eq()`, `.neq()`, `.match()`, `.in_()` |
| `GPS.covariance_type` | `GPS.Q.covariance_type` | `int` | **Numeric** | `.eq()`, `.neq()`, `.lt()`, `.gt()`, `.leq()`, `.geq()`, `.in_()`, `.between()` |

### Practical Usage¶

To execute these filters, pass the expressions generated by the proxy to the `QueryOntologyCatalog` builder.

```
from mosaicolabs import MosaicoClient, IMU, GPS, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Orchestrate a query filtering by physical thresholds AND metadata
    qresponse = client.query(
        QueryOntologyCatalog(include_timestamp_range=True)  # Ask for the start/end timestamps of occurrences
        .with_expression(IMU.Q.acceleration.z.gt(15.0))
        .with_expression(GPS.Q.status.service.eq(2))
    )

    # The server returns a QueryResponse grouped by Sequence for structured data management
    if qresponse is not None:
        for item in qresponse:
            # 'item.sequence' contains the name for the matched sequence
            print(f"Sequence: {item.sequence.name}")
            # 'item.topics' contains only the topics and time-segments
            # that satisfied the QueryOntologyCatalog criteria
            for topic in item.topics:
                # Access high-precision timestamps for the data segments found
                start, end = topic.timestamp_range.start, topic.timestamp_range.end
                print(f"  Topic: {topic.name} | Match Window: {start} to {end}")
```

For a comprehensive list of all supported operators and
advanced filtering strategies (such as query chaining), see the **Full Query Documentation** and the Ontology types SDK Reference in the **API Reference**:

* Base Data Models
* Sensor Models
* Geometry Models
* Platform Models

## Customizing the Ontology¶

The Mosaico SDK is built for extensibility, allowing you to define domain-specific data structures that can be registered to the platform and live alongside standard types. Custom types are automatically validatable, serializable, and queryable once registered in the platform. Follow these three steps to implement a compatible custom data type:

### 1. Inheritance and Mixins¶

Your custom class **must** inherit from `Serializable` to enable auto-registration, factory creation, and the queryability of the model. To align with the Mosaico ecosystem, use the following mixins:

* **`CovarianceMixin`**: Used for data including measurement uncertainty, standardizing the storage of covariance matrices.

### 2. Define the Wire Schema (`__msco_pyarrow_struct__`)¶

You must define a class-level `__msco_pyarrow_struct__` using `pyarrow.struct`. This explicitly dictates how your Python object is serialized into high-performance Apache Arrow/Parquet buffers for network transmission and storage.

#### 2.1 Serialization Format Optimization¶

API Reference: `mosaicolabs.enum.SerializationFormat`

You can optimize remote server performance by overriding the `__serialization_format__` attribute. This controls how the server compresses and organizes your data.

| Format | Identifier | Use Case Recommendation |
| --- | --- | --- |
| **Default** | `"default"` | **Standard Table**: Fixed-width data with a constant number of fields. |
| **Ragged** | `"ragged"` | **Variable Length**: Best for lists, sequences, or point clouds. |
| **Image** | `"image"` | **Blobs**: Raw or compressed images requiring specialized codec handling. |

If not explicitly set, the system defaults to the `Default` format.

### 3. Define Class Fields¶

Define the Python attributes for your class using standard type hints. Note that the names of your Python class fields **must match exactly** the field names defined in your `__msco_pyarrow_struct__` schema.

### Customization Example: `EnvironmentSensor`¶

This example demonstrates a custom sensor for environmental monitoring that tracks temperature, humidity, and pressure.

```
# file: custom_ontology.py
from typing import Optional

import pyarrow as pa

from mosaicolabs.models import Serializable


class EnvironmentSensor(Serializable):
    """
    Custom sensor reading for Temperature, Humidity, and Pressure.
    """

    # --- 1. Define the Wire Schema (PyArrow Layout) ---
    __msco_pyarrow_struct__ = pa.struct(
        [
            pa.field("temperature", pa.float32(), nullable=False),
            pa.field("humidity", pa.float32(), nullable=True),
            pa.field("pressure", pa.float32(), nullable=True),
        ]
    )

    # --- 2. Define Python Fields (Must match schema exactly) ---
    temperature: float
    humidity: Optional[float] = None
    pressure: Optional[float] = None


# --- Usage Example ---
from mosaicolabs.models import Message, Header, Time

# Initialize with standard metadata
meas = EnvironmentSensor(
    header=Header(stamp=Time.now(), frame_id="lab_sensor_1"),
    temperature=23.5,
    humidity=0.45
)

# Ready for streaming or querying
# writer.push(Message(timestamp_ns=ts, data=meas))
```

Schema for defining a custom ontology model.

---

The **Data Handling** module serves as the high-performance operational core of the Mosaico SDK, providing a unified interface for moving multi-modal sensor data between local applications and the Mosaico Data Platform. Engineered to solve the "Big Data" challenges of robotics and autonomous systems, this module abstracts the complexities of network I/O, asynchronous buffering, and high-precision temporal alignment.
### Asymmetric Architecture¶

The SDK employs a specialized architecture that separates concerns into **Writers/Updaters** and **Handlers**, ensuring each layer is optimized for its unique traffic pattern:

* **Ingestion (Writing/Updating)**: Designed for low-latency, high-throughput ingestion of 4K video, high-frequency IMU telemetry, and dense point clouds. It utilizes a "Multi-Lane" approach where each sensor stream operates in isolation with dedicated system resources.
* **Discovery & Retrieval (Reading)**: Architected to separate metadata-based resource discovery from high-volume data transmission. This separation allows developers to inspect sequence and topic catalogs—querying metadata and temporal bounds—before committing to a high-bandwidth data stream.

### Memory-Efficient Data Flow¶

The Mosaico SDK is engineered to handle massive data volumes without exhausting local system resources, enabling the processing of datasets that span terabytes while maintaining a minimal and predictable memory footprint.

* **Smart Batching & Buffering**: Both reading and writing operations are executed in memory-limited batches rather than loading or sending entire sequences at once.
* **Asynchronous Processing**: The SDK offloads CPU-intensive tasks, such as image serialization and network I/O, to background threads within the `MosaicoClient`.
* **Automated Lifecycle**: In reading workflows, processed batches are automatically discarded and replaced with new data from the server. In writing workflows, buffers are automatically flushed based on configurable size or record limits.
* **Stream Persistence**: Integrated **Error Policies** allow developers to prioritize either a "clean slate" data state or "recovery" of partial data in the event of an application crash.

---

The **Reading Workflow** in Mosaico is architected to separate resource discovery from high-volume data transmission.
This is achieved through two distinct layers: **Handlers**, which serve as metadata proxies, and **Streamers**, which act as the high-performance data engines.

API-Keys

When the connection is established via the authorization middleware (i.e. using an API-Key), the reading workflow requires at least the `APIKeyPermissionEnum.Read` permission.

Try-It Out

You can experiment with the Handlers module yourself via the **Data Discovery and Inspection Example**.

### Handlers: The Catalog Layer¶

Handlers are lightweight objects that represent a server-side resource. Their primary role is to provide immediate access to system information and user-defined metadata **without downloading the actual sensor data**. They act as the "Catalog" layer of the SDK, allowing you to inspect the contents of the platform before committing to a high-bandwidth data stream. Mosaico provides two specialized handler types: `SequenceHandler` and `TopicHandler`.

#### `SequenceHandler`¶

API Reference: `mosaicolabs.handlers.SequenceHandler`.

Represents a complete recording session. It provides a holistic view, allowing you to inspect all available topic names, global sequence metadata, and the overall temporal bounds (earliest and latest timestamps) of the session. Spawning a new sequence handler is done via the `MosaicoClient.sequence_handler()` factory method.

This example demonstrates how to use a Sequence handler to inspect metadata.
```
import sys

from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Use a Handler to inspect the catalog
    seq_handler = client.sequence_handler("mission_alpha")
    if seq_handler:
        print(f"Sequence: {seq_handler.name}")
        print(f"\t| Topics: {seq_handler.topics}")
        print(f"\t| User metadata: {seq_handler.user_metadata}")
        print(f"\t| Timestamp span: {seq_handler.timestamp_ns_min} - {seq_handler.timestamp_ns_max}")
        print(f"\t| Created {seq_handler.sequence_info.created_datetime}")
        print(f"\t| Size (MB) {seq_handler.sequence_info.total_size_bytes/(1024*1024)}")
        # Once done, close the reading channel (recommended)
        seq_handler.close()
```

#### `TopicHandler`¶

API Reference: `mosaicolabs.handlers.TopicHandler`.

Represents a specific data channel within a sequence (e.g., a single IMU or Camera). It provides granular system info, such as the specific ontology model used and the data volume of that individual stream. Spawning a new topic handler is done via the `MosaicoClient.topic_handler()` factory method, or via the `SequenceHandler.get_topic_handler()` factory method.

This example demonstrates how to use a Topic handler to inspect metadata.
```
import sys

from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Use a Handler to inspect the catalog
    top_handler = client.topic_handler("mission_alpha", "/front/imu")
    # Note that the same handler can be retrieved via the SequenceHandler of the parent sequence:
    # seq_handler = client.sequence_handler("mission_alpha")
    # top_handler = seq_handler.get_topic_handler("/front/imu")
    if top_handler:
        print(f"Sequence:Topic: {top_handler.sequence_name}:{top_handler.name}")
        print(f"\t| User metadata: {top_handler.user_metadata}")
        print(f"\t| Timestamp span: {top_handler.timestamp_ns_min} - {top_handler.timestamp_ns_max}")
        print(f"\t| Created {top_handler.topic_info.created_datetime}")
        print(f"\t| Size (MB) {top_handler.topic_info.total_size_bytes/(1024*1024)}")
        # Once done, close the reading channel (recommended)
        top_handler.close()
```

### Streamers: The Data Engines¶

Both handlers serve as **factories**; once you have identified the resource you need, the handler is used to spawn the appropriate Streamer to begin data consumption. Streamers are the active components that manage the physical data exchange between the server and your application. They handle the complexities of network buffering, batch management, and the de-serialization of raw bytes into Mosaico `Message` objects.

#### `SequenceDataStreamer` (Unified Replay)¶

API Reference: `mosaicolabs.handlers.SequenceDataStreamer`.

The **`SequenceDataStreamer`** is a unified engine designed specifically for sensor fusion and full-system replay. It allows you to consume multiple data streams as if they were a single, coherent timeline. Spawning a new sequence data streamer is done via the `SequenceHandler.get_data_streamer()` factory method. When streaming data, the streamer employs the following technical mechanisms:

* **K-Way Merge Sorting**: The streamer monitors the timestamps across all requested topics simultaneously.
On every iteration, it "peeks" at the next available message from each topic and yields the one with the lowest timestamp.

* **Strict Chronological Order**: This sorting ensures that messages are delivered in exact acquisition order, effectively normalizing topics that may operate at vastly different frequencies (e.g., high-rate IMU vs. low-rate GPS).
* **Temporal Slicing**: You can request a "windowed" extraction by specifying `start_timestamp_ns` and `end_timestamp_ns`. This is highly efficient as it avoids downloading the entire sequence, focusing only on the specific event or time range of interest.
* **Smart Buffering**: To maintain memory efficiency, the streamer retrieves data in memory-limited batches. As you iterate, processed batches are discarded and replaced with new data from the server, allowing you to stream sequences that exceed your available RAM.

This example demonstrates how to initiate and use the Sequence data stream.

```
import sys

from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Use a Handler to inspect the catalog
    seq_handler = client.sequence_handler("mission_alpha")
    if seq_handler:
        # Start a Unified Stream (K-Way Merge) for multi-sensor replay
        # We only want GPS and IMU data for this synchronized analysis
        streamer = seq_handler.get_data_streamer(
            topics=["/gps", "/imu"],  # Optionally filter topics
            # Optionally set the time window to extract
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000
        )
        # Check the start message timestamp
        print(f"Recording starts at: {streamer.next_timestamp()}")
        for topic, msg in streamer:
            # Processes GPS and IMU in perfect chronological order
            print(f"[{topic}] at {msg.timestamp_ns}: {type(msg.data).__name__}")
        # Once done, close the reading channel (recommended)
        seq_handler.close()
```

#### `TopicDataStreamer` (Targeted Access)¶

API Reference: `mosaicolabs.handlers.TopicDataStreamer`.
The **`TopicDataStreamer`** provides a dedicated, high-throughput channel for interacting with a single data resource. By bypassing the complex synchronization logic required for merging multiple topics, it offers the lowest possible overhead for tasks requiring isolated data streams, such as training models on specific camera frames or IMU logs. Spawning a new topic data streamer is done via the `TopicHandler.get_data_streamer()` factory method. To ensure efficiency, the streamer supports the following features:

* **Temporal Slicing**: Much like the `SequenceDataStreamer`, you can extract data in a time-windowed fashion by specifying `start_timestamp_ns` and `end_timestamp_ns`. This ensures that only the relevant portion of the stream is retrieved rather than the entire dataset.
* **Smart Buffering**: Data is not downloaded all at once; instead, the SDK retrieves information in memory-limited batches, substituting old data with new batches as you iterate to maintain a constant, minimal memory footprint.

This example demonstrates how to initiate and use the Topic data stream.

```
import sys

from mosaicolabs import MosaicoClient, IMU

with MosaicoClient.connect("localhost", 6726) as client:
    # Retrieve the topic handler using (e.g.) MosaicoClient
    top_handler = client.topic_handler("mission_alpha", "/front/imu")
    if top_handler:
        # Start a Targeted Stream for single-sensor replay
        imu_stream = top_handler.get_data_streamer(
            # Optionally set the time window to extract
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000
        )
        # Peek at the start time
        print(f"Recording starts at: {imu_stream.next_timestamp()}")
        # Direct, low-overhead loop
        for imu_msg in imu_stream:
            process_sample(imu_msg.get_data(IMU))  # Some custom process function
        # Once done, close the reading channel (recommended)
        top_handler.close()
```

---

The **Writing Workflow** in Mosaico is designed for high-throughput data ingestion, ensuring that your application remains responsive even when streaming high-bandwidth sensor data like 4K video or high-frequency IMU telemetry. The architecture is built around a **"Multi-Lane"** approach, where each sensor stream operates in its own isolated lane with dedicated system resources.

API-Keys

When the connection is established via the authorization middleware (i.e. using an API-Key), the writing workflow is allowed only if the key has at least the `APIKeyPermissionEnum.Write` permission.

## `SequenceWriter`¶

API Reference: `mosaicolabs.handlers.SequenceWriter`.

The `SequenceWriter` acts as the central controller for a recording session. It manages the high-level lifecycle of the data on the server and serves as the factory for individual sensor streams. Spawning a new sequence writer is done via the `MosaicoClient.sequence_create()` factory method.

**Key Roles:**

* **Lifecycle Management**: It handles the lifecycle of a new sequence resource and the related writing Session, ensuring that the sequence is successfully committed as immutable data. In the event of a failure, the sequence and the written data are handled according to the configured `SessionLevelErrorPolicy`.
* **Resource Distribution**: The writer pulls network connections from the **Connection Pool** and background threads from the **Executor Pool**, assigning them to individual topics. This isolation prevents a slow network connection on one topic from bottlenecking others.
* **Context Safety**: To ensure data integrity, the `SequenceWriter` must be used within a Python `with` block. This guarantees that all buffers are flushed and the sequence is closed properly, even if your application crashes.

```
from mosaicolabs import MosaicoClient, SessionLevelErrorPolicy, TopicLevelErrorPolicy

# Open the connection with the Mosaico Client
with MosaicoClient.connect("localhost", 6726) as client:
    # Start the Sequence Orchestrator
    with client.sequence_create(
        sequence_name="mission_log_042",
        # Custom metadata for this data sequence.
        metadata={  # (1)!
            "vehicle": {
                "vehicle_id": "veh_sim_042",
                "powertrain": "EV",
                "sensor_rig_version": "v3.2.1",
                "software_stack": {
                    "perception": "perception-5.14.0",
                    "localization": "loc-2.9.3",
                    "planning": "plan-4.1.7",
                },
            },
            "driver": {
                "driver_id": "drv_sim_017",
                "role": "validation",
                "experience_level": "senior",
            },
        },
        on_error=SessionLevelErrorPolicy.Delete
    ) as seq_writer:
        # `seq_writer` is the writing handler of the new 'mission_log_042' sequence
        # Data will be uploaded by spawning topic writers that will manage
        # the actual data stream push... See below.
```

1. The metadata fields will be queryable via the `Query` mechanism. The mechanism allows creating queries like: `QuerySequence().with_user_metadata("vehicle.software_stack.planning", eq="plan-4.1.7")`

### Sequence-Level Error Handling¶

API Reference: `mosaicolabs.enum.SessionLevelErrorPolicy`.

Deprecated OnErrorPolicy

In release 0.3.0, `OnErrorPolicy` is declared **deprecated** in favor of `SessionLevelErrorPolicy`. Support for the class will be removed in release 0.4.0.
The enum values of the new class `SessionLevelErrorPolicy` are identical to those of the deprecated class.

Configured via the `on_error` parameter when instantiating a new `SequenceWriter`, these policies dictate how the server handles a sequence if an unhandled exception bubbles up to the `SequenceWriter` context manager. By default, this policy is set to `SessionLevelErrorPolicy.Report`, which means an error notification is sent to the server, allowing the platform to flag the sequence as failed while retaining whatever records were successfully transmitted before the error occurred. Alternatively, the `SessionLevelErrorPolicy.Delete` policy signals the server to physically remove the incomplete sequence and its associated topic directories if any error occurred.

Error Handling and API-Key

When the connection is established via the authorization middleware (i.e. using an API-Key), the `SessionLevelErrorPolicy.Delete` policy is successfully executed by the server only if the API-Key has the `APIKeyPermissionEnum.Delete` permission. If this is not the case, the server will raise an error and the current writing Session will remain in an unlocked state.

A schematic rationale for deciding between the two policies:

| Scenario | Recommended Policy | Rationale |
| --- | --- | --- |
| **Edge/Field Tests** | `SessionLevelErrorPolicy.Report` | Forensic value: "Partial data is better than no data" for crash analysis. |
| **Automated CI/CD** | `SessionLevelErrorPolicy.Delete` | Platform hygiene: Prevents cluttering the catalog with junk data from failed runs. |
| **Ground Truth Generation** | `SessionLevelErrorPolicy.Delete` | Integrity: Ensures only 100% verified, complete sequences enter the database. |

## `TopicWriter`¶

API Reference: `mosaicolabs.handlers.TopicWriter`.

Once a topic is created via `SequenceWriter.topic_create`, a `TopicWriter` is spawned to handle the actual transmission of data for that specific stream.
It abstracts the underlying networking protocols, allowing you to simply "push" Python objects while it handles the heavy lifting.

**Key Roles:**

* **Smart Buffering**: Instead of sending every single message over the network—which would be highly inefficient—the `TopicWriter` accumulates records in a memory buffer.
* **Automated Flushing**: The writer automatically triggers a "flush" to the server whenever the internal buffer exceeds your configured limits, such as a maximum byte size or a specific number of records.
* **Asynchronous Serialization**: For CPU-intensive data (like encoding images), the writer can offload the serialization process to background threads, ensuring your main application loop stays fast.

```
# Continues from the code above...
# with client.sequence_create(...) as seq_writer:

# Create individual Topic Writers
# Each writer gets its own assigned resources from the pools
imu_writer = seq_writer.topic_create(
    topic_name="sensors/imu",  # The unique topic name
    metadata={  # The topic/sensor custom metadata
        "vendor": "inertix-dynamics",
        "model": "ixd-f100",
        "firmware_version": "1.2.0",
        "serial_number": "IMUF-9A31D72X",
        "calibrated": "false",
    },
    error_policy=TopicLevelErrorPolicy.Raise,  # Raises an exception if an error occurs
    ontology_type=IMU,  # The ontology type stored in this topic
)

# Another individual topic writer for the GPS device
gps_writer = seq_writer.topic_create(
    topic_name="sensors/gps",  # The unique topic name
    metadata={  # The topic/sensor custom metadata
        "role": "primary_gps",
        "vendor": "satnavics",
        "model": "snx-g500",
        "firmware_version": "3.2.0",
        "serial_number": "GPS-7C1F4A9B",
        "interface": {  # (1)!
            "type": "UART",
            "baudrate": 115200,
            "protocol": "NMEA",
        },
    },
    error_policy=TopicLevelErrorPolicy.Ignore,  # Ignore errors in this topic
    ontology_type=GPS,  # The ontology type stored in this topic
)

# Suppose we have a stream of IMU data, e.g.
from a file
# Push data in a controlled context - The SDK handles batching and background I/O
for imu_data in imu_data_stream:
    with imu_writer:  # Protect the execution
        imu_data = process_imu(imu_data)  # This code may raise an exception: will re-raise
        imu_writer.push(
            message=Message(
                timestamp_ns=imu_data.timestamp_ns,
                data=imu_data,
            )
        )

for gps_data in gps_data_stream:
    with gps_writer:  # Protect the execution
        gps_data = process_gps(gps_data)  # This code may raise an exception: will be ignored
        gps_writer.push(
            message=Message(
                timestamp_ns=gps_data.timestamp_ns,
                data=gps_data,
            )
        )

# Exiting the block automatically flushes all topic buffers, finalizes the sequence on the server
# and closes all connections and pools
```

1. The metadata fields will be queryable via the `Query` mechanism, which allows creating query expressions like `QueryTopic().with_user_metadata("interface.type", eq="UART")`.

API Reference:

* `mosaicolabs.models.platform.Topic`
* `mosaicolabs.models.query.builders.QueryTopic`.

### Topic-Level Error Handling¶

By default, the `SequenceWriter` context manager cannot natively distinguish which specific topic failed during custom processing or data pushing. An unhandled exception in one stream will bubble up and trigger the global **Sequence-Level Error Policy**, potentially aborting the entire upload. To prevent this, the SDK introduces native **Topic-Level Error Policies**, which automate the "Defensive Ingestion" pattern directly within the `TopicWriter`. This pattern is highly recommended, in particular for complex ingestion pipelines (see for example the interleaved ingestion how-to).

!!! note
    Error handling is only possible inside the `TopicWriter` context manager, i.e. by wrapping the processing and pushing code inside a `with topic_writer:` block.

When creating a topic via the `SequenceWriter.topic_create` function, users can specify a `TopicLevelErrorPolicy` that isolates failures to that specific data "lane".
This ensures that a single malformed message or transformation error does not compromise the parent sequence. By defining these behaviors at the configuration level, you eliminate the need for boilerplate error-handling code around every topic writer context.

The `TopicLevelErrorPolicy` can be set to:

* `TopicLevelErrorPolicy.Raise`: (Default) Raises an exception if an error occurs; this defers error handling to the `SequenceWriter.on_error` policy.
* `TopicLevelErrorPolicy.Ignore`: Reports the error to the server and continues the ingestion process.
* `TopicLevelErrorPolicy.Finalize`: Reports the error to the server and finalizes the topic, but does not interrupt the ingestion process.

## `SequenceUpdater`¶

API Reference: `mosaicolabs.handlers.SequenceUpdater`.

The `SequenceUpdater` is used to update an existing sequence on the server. Updating a sequence means opening a new writing Session to add new topics only; the `SequenceUpdater` cannot be used to update the metadata of a sequence or of its existing topics. A new sequence updater is spawned via the `SequenceHandler.update()` factory method.

**Key Roles:**

* **Lifecycle Management**: It handles the lifecycle of a new writing Session on an existing sequence and ensures that it is either successfully committed as immutable data or, in the event of a failure, cleaned up according to the configured `SessionLevelErrorPolicy`.
* **Resource Distribution**: The writer pulls network connections from the **Connection Pool** and background threads from the **Executor Pool**, assigning them to individual topics. This isolation prevents a slow network connection on one topic from bottlenecking others.
* **Context Safety**: To ensure data integrity, the `SequenceUpdater` must be used within a Python `with` block. This guarantees that all buffers are flushed and the writing Session is closed properly, even if your application crashes.
```
from mosaicolabs import MosaicoClient, SessionLevelErrorPolicy

# Open the connection with the Mosaico Client
with MosaicoClient.connect("localhost", 6726) as client:
    # Get the handler for the sequence
    seq_handler = client.sequence_handler("mission_log_042")

    # Update the sequence
    with seq_handler.update(
        on_error=SessionLevelErrorPolicy.Delete  # Relative to this session only
    ) as seq_updater:
        ...  # Start creating topics and pushing data
```

**Session-level Error Handling**

Configured via the `on_error` parameter when instantiating a new `SequenceUpdater`, the `SessionLevelErrorPolicy` dictates how the server handles the new writing Session if an unhandled exception bubbles up to the `SequenceUpdater` context manager. The very same semantics as for the `SequenceWriter` apply.

These policies are relative to the **current writing Session only**: the data already stored in the sequence by previous sessions is not affected and is kept as immutable data.

**Error Handling and API-Key**

When the connection is established via the authorization middleware (i.e. using an API-Key), the `SessionLevelErrorPolicy.Delete` policy is successfully executed by the server only if the API-Key has the `APIKeyPermissionEnum.Delete` permission. If this is not the case, the server raises an error and the current writing Session remains in an unlocked state.

Once obtained, the `SequenceUpdater` can be used to create new topics and push data to them, in the very same way as explained in the `TopicWriter` section.

```
# Continues from the code above...
# seq_handler.update(...) as seq_updater:

# Create individual Topic Writers
# Each writer gets its own assigned resources from the pools
imu_writer = seq_updater.topic_create(...)

# Push data - The SDK handles batching and background I/O
imu_writer.push(...)
# Exiting the block automatically flushes all topic buffers, finalizes the sequence on the server
# and closes all connections and pools
```

---

The **Query Workflow** in Mosaico provides a high-performance, **fluent** interface for discovering and filtering data within the Mosaico Data Platform. It is designed to move beyond simple keyword searches, allowing you to perform deep, semantic queries across metadata, system catalogs, and the physical content of sensor streams.

**API-Keys**

When the connection is established via the authorization middleware (i.e. using an API-Key), the query workflow requires the minimum `APIKeyPermissionEnum.Read` permission.

**Try-It Out**

You can experiment with the Query module yourself via the **Querying Catalogs Example**.

A typical query workflow involves chaining methods within specialized builders to create a unified request that the server executes atomically. In the example below, the code orchestrates a multi-domain search to isolate high-interest data segments. Specifically, it queries for:

* **Sequence Discovery**: Finds any recording session whose name contains the string `"test_drive"` **AND** whose custom user metadata indicates an `"environment.visibility"` value strictly less than 50.
* **Topic Filtering**: Restricts the search specifically to the data channel named `"/front/camera/image"`.
* **Ontology Analysis**: Performs a deep inspection of IMU sensor payloads to identify specific time segments where the **X-axis acceleration exceeds a certain threshold** while simultaneously the **Y-axis acceleration exceeds a certain threshold**.
```
from mosaicolabs import QueryOntologyCatalog, QuerySequence, QueryTopic, IMU, MosaicoClient

# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
    # Perform a unified server-side query across multiple domains:
    qresponse = client.query(
        # Filter Sequence-level metadata
        QuerySequence()
        .with_name_match("test_drive")  # Use convenience method for fuzzy name matching
        .with_user_metadata("environment.visibility", lt=50),  # Use convenience method for filtering user metadata

        # Search on topics with specific names
        QueryTopic()
        .with_name("/front/camera/image"),

        # Perform deep time-series discovery within sensor payloads
        QueryOntologyCatalog(include_timestamp_range=True)  # Request temporal bounds for matches
        .with_expression(IMU.Q.acceleration.x.gt(5.0))  # Use the .Q proxy to filter the `acceleration` field
        .with_expression(IMU.Q.acceleration.y.gt(4.0)),
    )

    # The server returns a QueryResponse grouped by Sequence for structured data management
    if qresponse is not None:
        for item in qresponse:
            # 'item.sequence' contains the name for the matched sequence
            print(f"Sequence: {item.sequence.name}")

            # 'item.topics' contains only the topics and time-segments
            # that satisfied the QueryOntologyCatalog criteria
            for topic in item.topics:
                # Access high-precision timestamps for the data segments found
                start, end = topic.timestamp_range.start, topic.timestamp_range.end
                print(f"  Topic: {topic.name} | Match Window: {start} to {end}")
```

The provided example illustrates the core architecture of the Mosaico Query DSL. To effectively use this module, it is important to understand the two primary mechanisms that drive data discovery:

* **Query Builders (Fluent Logic Collectors)**: Specialized builders like `QuerySequence`, `QueryTopic`, and `QueryOntologyCatalog` serve as containers for your search criteria.
They provide a **Fluent Interface** where you can chain two types of methods:
    + **Convenience Methods**: High-level helpers for common fields, such as `with_user_metadata()`, `with_name_match()`, or `with_created_timestamp()`.
    + **Generic `with_expression()`**: A versatile method that accepts any expression obtained via the **`.Q` proxy**, allowing you to define complex filters for deep sensor payloads.
* **The `.Q` Proxy (Dynamic Model Inspection)**: Every `Serializable` model in the Mosaico ontology features a static `.Q` attribute. This proxy dynamically inspects the model's underlying schema to build dot-notated field paths and intercepts attribute access (e.g., `IMU.Q.acceleration.x`). When a terminal method is called—such as `.gt()`, `.lt()`, or `.between()`—it generates a type-safe **Atomic Expression** used by the platform to filter physical sensor data or metadata fields.

By combining these mechanisms, the Query Module delivers a robust filtering experience:

* **Multi-Domain Orchestration**: Execute searches across Sequence metadata, Topic configurations, and raw Ontology sensor data in a single, atomic request.
* **Structured Response Management**: Results are returned in a `QueryResponse` that is automatically grouped by `Sequence`, making it easier to manage multi-sensor datasets.

## Query Execution & The Response Model¶

Queries are executed via the `query()` method exposed by the `MosaicoClient` class. When multiple builders are provided, they are combined with a logical **AND**.

| Method | Return | Description |
| --- | --- | --- |
| `query(*queries, query)` | `Optional[QueryResponse]` | Executes one or more queries against the platform catalogs. The provided queries are joined in an AND condition. The method accepts a variable number of query builder objects or a pre-constructed `Query` object. |

The query execution returns a `QueryResponse` object, which behaves like a standard Python list containing `QueryResponseItem` objects.
| Class | Description |
| --- | --- |
| `QueryResponseItem` | Groups all matches belonging to the same **Sequence**. Contains a `QueryResponseItemSequence` and a list of related `QueryResponseItemTopic`. |
| `QueryResponseItemSequence` | Represents a specific **Sequence** where matches were found. It includes the sequence name. |
| `QueryResponseItemTopic` | Represents a specific **Topic** where matches were found. It includes the normalized topic path and the optional `timestamp_range` (the first and last occurrence of the condition). |

```
from mosaicolabs import MosaicoClient, QueryOntologyCatalog
from mosaicolabs.models.sensors import IMU

# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
    # Define a Deep Data Filter using the .Q Query Proxy
    # We are searching for vertical impact events where acceleration.z > 15.0 m/s^2
    impact_qbuilder = QueryOntologyCatalog(
        IMU.Q.acceleration.z.gt(15.0),
        # include_timestamp_range returns the precise start/end of the matching event
        include_timestamp_range=True
    )

    # Execute the query via the client
    results = client.query(impact_qbuilder)

    # The same can be obtained by using the Query object
    # results = client.query(
    #     query=Query(
    #         impact_qbuilder
    #     )
    # )

    if results is not None:
        # Parse the structured QueryResponse object
        # Results are automatically grouped by Sequence for easier data management
        for item in results:
            print(f"Sequence: {item.sequence.name}")

            # Iterate through matching topics within the sequence
            for topic in item.topics:
                # Topic names are normalized (sequence prefix is stripped) for direct use
                print(f" - Match in: {topic.name}")

                # Extract the temporal bounds of the event
                if topic.timestamp_range:
                    start = topic.timestamp_range.start
                    end = topic.timestamp_range.end
                    print(f"   Occurrence: {start} ns to {end} ns")
```

* **Temporal Windows**: The `timestamp_range` provides the first and last occurrence of the queried condition within a topic, allowing
you to slice data accurately for further analysis.
* **Result Normalization**: `topic.name` returns the relative topic path (e.g., `/sensors/imu`), making it immediately compatible with other SDK methods like `topic_handler()`.

### Restricted Queries (Chaining)¶

The `QueryResponse` class enables a powerful mechanism for **iterative search refinement** by allowing you to convert your current results back into a new query builder. This approach is essential for resolving complex, multi-modal dependencies where a single monolithic query would be logically ambiguous, inefficient, or technically impossible.

| Method | Return Type | Description |
| --- | --- | --- |
| `to_query_sequence()` | `QuerySequence` | Returns a query builder pre-filtered to include only the **sequences** present in the response. |
| `to_query_topic()` | `QueryTopic` | Returns a query builder pre-filtered to include only the specific **topics** identified in the response. |

When you invoke these factory methods, the SDK generates a new query expression containing an explicit `$in` filter populated with the identifiers held in the current response. This effectively **"locks" the search domain**, allowing you to apply new criteria to a restricted subset of your data without re-scanning the entire platform catalog.

```
from mosaicolabs import MosaicoClient, QueryTopic, QueryOntologyCatalog, GPS, String

with MosaicoClient.connect("localhost", 6726) as client:
    # Broad Search: Find all sequences where a GPS sensor reached a high-precision state (status=2)
    initial_response = client.query(
        QueryOntologyCatalog(GPS.Q.status.status.eq(2))
    )

    # 'initial_response' now acts as a filtered container of matching sequences.

    # Domain Locking: Restrict the search scope to the results of the initial query
    if not initial_response.is_empty():
        # .to_query_sequence() generates a QuerySequence pre-filled with the matching sequence names.
        refined_query_builder = initial_response.to_query_sequence()

        # Targeted Refinement: Search for error patterns ONLY within the restricted domain
        # This ensures the platform only scans for '[ERR]' strings within sequences already validated for GPS precision.
        final_response = client.query(
            refined_query_builder,                               # The "locked" sequence domain
            QueryTopic().with_name("/localization/log_string"),  # Target a specific log topic
            QueryOntologyCatalog(String.Q.data.match("[ERR]"))   # Filter by exact data content pattern
        )
```

When a specific set of topics has been identified through a data-driven query (e.g., finding every camera topic that recorded a specific event), you can use `to_query_topic()` to "lock" your next search to those specific data channels. This is particularly useful when you need to verify a condition on a very specific subset of sensors across many sequences, bypassing the need to re-identify those topics in the next step.

In the next example, we first find all topics of a specific channel within sequences matching a specific name pattern, and then search specifically within *those* topics for any instances where the data content matches a specific pattern.
```
from mosaicolabs import MosaicoClient, QuerySequence, QueryTopic, QueryOntologyCatalog, String

with MosaicoClient.connect("localhost", 6726) as client:
    # Broad Search: Find the log topics inside sequences matching a name pattern
    initial_response = client.query(
        QueryTopic().with_name("/localization/log_string"),   # Target a specific log topic
        QuerySequence().with_name_match("test_winter_2025_")  # Filter by sequence name pattern
    )

    # Chaining: Use results to "lock" the domain and find specific log-patterns in those topics
    if not initial_response.is_empty():
        final_response = client.query(
            initial_response.to_query_topic(),                  # The "locked" topic domain
            QueryOntologyCatalog(String.Q.data.match("[ERR]"))  # Filter by content
        )
```

#### When Chaining is Necessary¶

The previous example of the `GPS.status` query and the subsequent `/localization/log_string` topic search highlight exactly when *query chaining* becomes a technical necessity rather than just a recommendation.

In the Mosaico Data Platform, a single `client.query()` call applies a logical **AND** across all provided builders to locate individual **data streams (topics)** that satisfy every condition simultaneously. Because a single topic cannot physically represent two different sensor types at once, such as being both a `GPS` sensor and a `String` log, a monolithic query attempting to filter for both on the same stream will inherently return zero results. Chaining resolves this by allowing you to find the correct **Sequence** context in step one, then "locking" that domain to find a different **Topic** within that same context in step two.
```
# AMBIGUOUS: This looks for ONE topic that is BOTH GPS and String
response = client.query(
    QueryOntologyCatalog(GPS.Q.status.status.eq(DGPS_FIX)),
    QueryOntologyCatalog(String.Q.data.match("[ERR]")),
    QueryTopic().with_name("/localization/log_string")
)
```

## Architecture¶

### Query Layers¶

Mosaico organizes data into three distinct architectural layers, each with its own specialized Query Builder:

#### `QuerySequence` (Sequence Layer)¶

API Reference: `mosaicolabs.models.query.builders.QuerySequence`.

Filters recordings based on high-level session metadata, such as the sequence name or the time it was created.

**Example** Querying for sequences by name and creation date

```
from mosaicolabs import MosaicoClient, QuerySequence, Time

with MosaicoClient.connect("localhost", 6726) as client:
    # Search for sequences by project name and creation date
    qresponse = client.query(
        QuerySequence()
        .with_name_match("test_drive")
        .with_user_metadata("project", eq="Apollo")
        .with_created_timestamp(time_start=Time.from_float(1690000000.0))
    )

    # Inspect the response
    if qresponse is not None:
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")
```

#### `QueryTopic` (Topic Layer)¶

API Reference: `mosaicolabs.models.query.builders.QueryTopic`.

Targets specific data channels within a sequence. You can search for topics by name pattern or by their specific Ontology type (e.g., "Find all GPS topics").
**Example** Querying for image topics by ontology tag, metadata key and topic creation timestamp

```
from mosaicolabs import MosaicoClient, Image, QueryTopic, Time

with MosaicoClient.connect("localhost", 6726) as client:
    # Query for all 'image' topics created in a specific timeframe, matching some metadata (key, value) pair
    qresponse = client.query(
        QueryTopic()
        .with_ontology_tag(Image.ontology_tag())
        .with_created_timestamp(time_start=Time.from_float(170000000))
        .with_user_metadata("camera_id.serial_number", eq="ABC123_XYZ")
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")
```

#### `QueryOntologyCatalog` (Ontology Catalog Layer)¶

API Reference: `mosaicolabs.models.query.builders.QueryOntologyCatalog`.

Filters based on the **actual time-series content** of the sensors (e.g., "Find events where `acceleration.z` exceeded a specific value").
**Example** Querying for mixed sensor data

```
from mosaicolabs import (
    MosaicoClient,
    QueryOntologyCatalog,
    GPS,
    IMU,
    Temperature,
    Pressure,
    Pose,
)

with MosaicoClient.connect("localhost", 6726) as client:
    # Chain multiple sensor filters together
    qresponse = client.query(
        QueryOntologyCatalog()
        .with_expression(GPS.Q.status.satellites.geq(8))
        .with_expression(Temperature.Q.value.between([273.15, 373.15]))
        .with_expression(Pressure.Q.value.geq(100000))
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

    # Filter for a specific component value and extract the first and last occurrence times
    qresponse = client.query(
        QueryOntologyCatalog(include_timestamp_range=True)
        .with_expression(IMU.Q.acceleration.x.lt(-4.0))
        .with_expression(IMU.Q.acceleration.y.gt(5.0))
        .with_expression(Pose.Q.rotation.z.geq(0.707))
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: { {topic.name: [topic.timestamp_range.start, topic.timestamp_range.end] for topic in item.topics} }")
```

The Mosaico Query Module offers two distinct paths for defining filters, **Convenience Methods** and the **Generic Expression Method**, both of which support **method chaining** to compose multiple criteria into a single query using a logical **AND**.

#### Convenience Methods¶

The query layers provide high-level fluent helpers (`with_`), built directly into the query builder classes and designed for ease of use. They allow you to filter data without deep knowledge of the internal model schema. The builder automatically selects the appropriate field and operator (such as exact match vs. substring pattern) based on the method used.
```
from mosaicolabs import QuerySequence, QueryTopic, RobotJoint

# Assumes an open `client` connection (see previous examples)
# and two `Time` bounds t1, t2

# Build a filter with a name pattern
qbuilder = QuerySequence().with_name_match("test_drive")

# Execute the query
qresponse = client.query(qbuilder)

# Inspect the response
if qresponse is not None:
    # Results are automatically grouped by Sequence for easier data management
    for item in qresponse:
        print(f"Sequence: {item.sequence.name}")
        print(f"Topics: {[topic.name for topic in item.topics]}")

# Build a filter with an ontology tag AND a specific creation time window
qbuilder = (
    QueryTopic()
    .with_ontology_tag(RobotJoint.ontology_tag())
    .with_created_timestamp(start=t1, end=t2)
)

# Execute the query
qresponse = client.query(qbuilder)

# Inspect the response
if qresponse is not None:
    # Results are automatically grouped by Sequence for easier data management
    for item in qresponse:
        print(f"Sequence: {item.sequence.name}")
        print(f"Topics: {[topic.name for topic in item.topics]}")
```

* **Best For**: Standard system-level fields like Names and Timestamps.

#### Generic Expression Method¶

The `with_expression()` method accepts raw **Query Expressions** generated through the `.Q` proxy. This provides full access to every supported operator (`.gt()`, `.lt()`, `.between()`, etc.) for specific fields.

```
from mosaicolabs import QueryOntologyCatalog, IMU

# Build a filter with deep time-series data discovery and measurement time windowing
qresponse = client.query(
    QueryOntologyCatalog()
    .with_expression(IMU.Q.acceleration.x.gt(5.0))
    .with_expression(IMU.Q.timestamp_ns.gt(1700134567))
)

# Inspect the response
if qresponse is not None:
    # Results are automatically grouped by Sequence for easier data management
    for item in qresponse:
        print(f"Sequence: {item.sequence.name}")
        print(f"Topics: {[topic.name for topic in item.topics]}")
```

* **Used For**: Accessing specific Ontology data fields (e.g., acceleration, position, etc.) in stored time-series data.
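The chaining semantics described above, where each `with_*` call appends one criterion and all criteria are combined with a logical AND, can be illustrated with a small framework-agnostic sketch. Everything below (`ToyQueryBuilder`, the in-memory record dictionaries) is hypothetical illustration code, not part of the Mosaico SDK:

```python
# Hypothetical sketch of a fluent query builder (NOT the Mosaico SDK):
# each with_* helper appends one predicate and returns self, so chained
# calls accumulate criteria that are later combined with a logical AND.
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class ToyQueryBuilder:
    criteria: List[Callable[[Dict[str, Any]], bool]] = field(default_factory=list)

    def with_name_match(self, pattern: str) -> "ToyQueryBuilder":
        # Substring match on a "name" field, like a fuzzy name helper
        self.criteria.append(lambda rec: pattern in rec.get("name", ""))
        return self  # returning self is what enables method chaining

    def with_field_gt(self, path: str, value: float) -> "ToyQueryBuilder":
        # Dot-notated field path comparison, loosely mimicking a .Q expression
        def check(rec: Dict[str, Any]) -> bool:
            node: Any = rec
            for part in path.split("."):
                node = node.get(part, {}) if isinstance(node, dict) else {}
            return isinstance(node, (int, float)) and node > value
        self.criteria.append(check)
        return self

    def matches(self, rec: Dict[str, Any]) -> bool:
        # All chained criteria are ANDed together
        return all(c(rec) for c in self.criteria)


records = [
    {"name": "test_drive_01", "imu": {"acceleration": {"x": 6.2}}},
    {"name": "test_drive_02", "imu": {"acceleration": {"x": 1.1}}},
    {"name": "bench_run", "imu": {"acceleration": {"x": 9.9}}},
]

q = ToyQueryBuilder().with_name_match("test_drive").with_field_gt("imu.acceleration.x", 5.0)
hits = [r["name"] for r in records if q.matches(r)]
print(hits)  # → ['test_drive_01'] — only the record satisfying BOTH conditions
```

The same shape scales to the real builders: convenience helpers and `with_expression()` both just collect predicates, and the server evaluates their conjunction.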
### The `.Q` Proxy Mechanism¶

The Query Proxy is the cornerstone of Mosaico's type-safe data discovery. Every data model in the Mosaico Ontology (e.g., `IMU`, `GPS`, `Image`) is automatically injected with a static `.Q` attribute during class initialization. This mechanism transforms static data structures into dynamic, fluent interfaces for constructing complex filters.

The proxy follows a three-step lifecycle to ensure that your queries are both semantically correct and high-performance:

1. **Intelligent Mapping**: During system initialization, the proxy inspects the sensor's schema recursively. It maps every nested field path (e.g., `"acceleration.x"`) to a dedicated *queryable* object, i.e. an object providing comparison operators and expression generation methods.
2. **Type-Aware Operators**: The proxy identifies the data type of each field (numeric, string, dictionary, or boolean) and exposes only the operators valid for that type. This prevents logical errors, such as attempting a substring `.match()` on a numeric acceleration value.
3. **Intent Generation**: When you invoke an operator (e.g., `.gt(15.0)`), the proxy generates a `QueryExpression`. This object encapsulates your search intent and is serialized into an optimized JSON format for the platform to execute.

To understand how the proxy handles nested structures, inherited attributes, and data types, consider the `IMU` ontology class:

```
class IMU(Serializable):
    acceleration: Vector3d                    # Composed type: contains x, y, z
    angular_velocity: Vector3d                # Composed type: contains x, y, z
    orientation: Optional[Quaternion] = None  # Composed type: contains x, y, z, w
```

The `.Q` proxy lets you navigate the data exactly as it is defined in the model. Starting from `IMU.Q`, you can drill down through nested fields and inherited mixins using standard dot notation until you reach a base queryable type.
The proxy automatically flattens the hierarchy, assigning the correct queryable type and operators to each leaf node (API Reference: `mosaicolabs.models.sensors.IMU`):

| Proxy Field Path | Queryable Type | Supported Operators (Examples) |
| --- | --- | --- |
| **`IMU.Q.acceleration.x/y/z`** | **Numeric** | `.gt()`, `.lt()`, `.geq()`, `.leq()`, `.eq()`, `.between()`, `.in_()` |
| **`IMU.Q.angular_velocity.x/y/z`** | **Numeric** | `.gt()`, `.lt()`, `.geq()`, `.leq()`, `.eq()`, `.between()`, `.in_()` |
| **`IMU.Q.orientation.x/y/z/w`** | **Numeric** | `.gt()`, `.lt()`, `.geq()`, `.leq()`, `.eq()`, `.between()`, `.in_()` |
| **`IMU.Q.timestamp_ns`** | **Numeric** | `.gt()`, `.lt()`, `.geq()`, `.leq()`, `.eq()`, `.between()`, `.in_()` |
| **`IMU.Q.recording_timestamp_ns`** | **Numeric** | `.gt()`, `.lt()`, `.geq()`, `.leq()`, `.eq()`, `.between()`, `.in_()` |
| **`IMU.Q.frame_id`** | **String** | `.eq()`, `.neq()`, `.match()`, `.in_()` |
| **`IMU.Q.sequence_id`** | **Numeric** | `.gt()`, `.lt()`, `.geq()`, `.leq()`, `.eq()`, `.between()`, `.in_()` |

The following table lists the supported operators for each data type:

| Data Type | Operators |
| --- | --- |
| **Numeric** | `.eq()`, `.neq()`, `.lt()`, `.leq()`, `.gt()`, `.geq()`, `.between()`, `.in_()` |
| **String** | `.eq()`, `.neq()`, `.match()` (i.e. substring), `.in_()` |
| **Boolean** | `.eq(True/False)` |
| **Dictionary** | `.eq()`, `.lt()`, `.leq()`, `.gt()`, `.geq()`, `.between()` |

#### Supported vs. Unsupported Types¶

While the `.Q` proxy is highly versatile, it enforces specific rules on which data structures can be queried:

* **Supported Types**: The proxy resolves all simple (int, float, str, bool) or composed types (like `Vector3d` or `Quaternion`). It will continue to expose nested fields as long as they lead to a primitive base type.
* **Dictionaries**: Dynamic fields, i.e.
derived from dictionaries in the ontology models, are fully queryable through the proxy using bracket notation (e.g., `.Q.dict_field["key"]` or `.Q.dict_field["key.subkey.subsubkey"]`). This approach provides the flexibility to search across custom tags and dynamic properties that aren't part of a fixed schema. This dictionary-based querying logic applies to any **custom ontology model** created by the user that contains a `dict` field.
    + **Syntax**: Instead of the standard dot notation used for fixed fields, you must use square brackets `["key"]` to target specific dictionary entries.
    + **Nested Access**: For dictionaries containing nested structures, you can use **dot notation within the key string** (e.g., `["environment.visibility"]`) to traverse sub-fields.
    + **Operator Support**: Because dictionary values are dynamic, these fields are "promiscuous": they support mixed numeric, string, and boolean operators without strict SDK-level type checking.
* **Unsupported Types (Lists and Tuples)**: Any field defined as a container, such as a **List** or **Tuple** (e.g., `covariance: List[float]`), is currently skipped by the proxy generator. These fields will not appear in autocomplete and cannot be used in a query expression.

## Constraints & Limitations¶

While fully functional, the current implementation (v0.x) has a **Single Occurrence Constraint**.

* **Constraint**: A specific data field path may appear **only once** within a single query builder instance. You cannot chain two separate conditions on the same field (e.g., `.gt(0.5)` and `.lt(1.0)`).

```
# INVALID: The same field (acceleration.x) is used twice in the same builder
QueryOntologyCatalog() \
    .with_expression(IMU.Q.acceleration.x.gt(0.5)) \
    .with_expression(IMU.Q.acceleration.x.lt(1.0))  # <- Error! Duplicate field path
```

* **Solution**: Use the built-in **`.between([min, max])`** operator to perform range filtering on a single field path.
* **Note**: You can still query multiple *different* fields from the same sensor model (e.g., `acceleration.x` and `acceleration.y`) in one builder.

```
# VALID: Each expression targets a unique field path
QueryOntologyCatalog(
    IMU.Q.acceleration.x.gt(0.5),              # Unique field
    IMU.Q.acceleration.y.lt(1.0),              # Unique field
    IMU.Q.angular_velocity.x.between([0, 1]),  # Correct way to do ranges
    include_timestamp_range=True
)
```

---

The **Mosaico ML** module serves as the high-performance bridge between the Mosaico Data Platform and the modern Data Science ecosystem. While the platform is optimized for high-speed raw message streaming, this module provides the abstractions necessary to transform asynchronous sensor data into tabular formats compatible with **Physical AI**, **Deep Learning**, and **Predictive Analytics**.

Working with robotics and multi-modal datasets presents three primary technical hurdles that the ML module is designed to solve:

* **Heterogeneous Sampling**: Sensors like LIDAR (low frequency), IMU (high frequency), and GPS (intermittent) operate at different rates.
* **High Volume**: Datasets often exceed the available system RAM.
* **Nested Structures**: Robotics data is typically deeply nested with coordinate transformations and covariance matrices.

## From Sequences to DataFrames¶

API Reference: `mosaicolabs.ml.DataFrameExtractor`

The `DataFrameExtractor` is a specialized utility designed to convert Mosaico sequences into tabular formats. Unlike standard streamers that instantiate individual Python objects, this extractor operates at the **Batch Level** by pulling raw `RecordBatch` objects directly from the underlying stream to maximize throughput.

### Key Technical Features¶

* **Recursive Flattening**: Automatically "unpacks" deeply nested Mosaico Ontology structures into primitive columns.
* **Semantic Naming**: Columns use a `{topic_name}.{ontology_tag}.{field_path}` convention (e.g., `/front/camera/imu.imu.acceleration.x`) to remain self-describing.
* **Namespace Isolation**: Topic names are included in column headers to prevent collisions when multiple sensors of the same type are present.
* **Memory-Efficient Windowing**: Uses a generator-based approach to yield data in time-based "chunks" (e.g., 5-second windows) while handling straddling batches via a carry-over buffer.
* **Sparse Merging**: Creates a "sparse" DataFrame containing the union of all timestamps, using `NaN` for missing sensor readings at specific intervals.

This example demonstrates iterating through a sequence in 10-second tabular chunks.

```
from mosaicolabs import MosaicoClient
from mosaicolabs.ml import DataFrameExtractor

with MosaicoClient.connect("localhost", 6726) as client:
    # Initialize from an existing SequenceHandler
    seq_handler = client.sequence_handler("drive_session_01")
    extractor = DataFrameExtractor(seq_handler)

    # Iterate through 10-second chunks
    for df in extractor.to_pandas_chunks(window_sec=10.0):
        # 'df' is a pandas DataFrame with semantic columns
        # Example: df["/front/camera/imu.imu.acceleration.x"]
        print(f"Processing chunk with {len(df)} rows")
```

For complex types like images that require specialized decoding, Mosaico allows you to "inflate" a flattened DataFrame row back into a strongly-typed `Message` object.
```
from mosaicolabs import MosaicoClient
from mosaicolabs.ml import DataFrameExtractor
from mosaicolabs.models import Message, Image

with MosaicoClient.connect("localhost", 6726) as client:
    # Initialize from an existing SequenceHandler
    seq_handler = client.sequence_handler("drive_session_01")
    extractor = DataFrameExtractor(seq_handler)

    # Get data chunks
    for df in extractor.to_pandas_chunks(topics=["/sensors/front/image_raw"]):
        for _, row in df.iterrows():
            # Reconstruct the full Message (envelope + payload) from a row
            img_msg = Message.from_dataframe_row(
                row=row,
                topic_name="/sensors/front/image_raw",
            )
            if img_msg:
                img = img_msg.get_data(Image).to_pillow()
                # Access typed fields with IDE autocompletion
                print(f"Time: {img_msg.timestamp_ns}")
                img.show()
```

## Sparse to Dense Representation¶

API Reference: `mosaicolabs.ml.SyncTransformer`

The `SyncTransformer` is a temporal resampler designed to solve the **Heterogeneous Sampling** problem inherent in robotics and Physical AI. It aligns multi-rate sensor streams (for example, an IMU at 100Hz and a GPS at 5Hz) onto a uniform, fixed-frequency grid to prepare them for machine learning models.

The `SyncTransformer` operates as a processor that bridges the gaps between windowed chunks yielded by the `DataFrameExtractor`. Unlike standard resamplers that treat each data batch in isolation, this transformer maintains internal state to ensure signal continuity across batch boundaries.

### Key Design Principles¶

* **Stateful Continuity**: It maintains an internal cache of the last known sensor values and the next expected grid tick, allowing signals to bridge the gap between independent DataFrame chunks.
* **Semantic Integrity**: It respects the physical reality of data acquisition by yielding `None` for grid ticks that occur before a sensor's first physical measurement, avoiding data "hallucination".
* **Vectorized Performance**: Internal kernels leverage high-speed lookups for high-throughput processing.
* **Protocol-Based Extensibility**: The mathematical logic for resampling is decoupled through a `SyncPolicy` protocol, allowing for custom kernel injection.

### Implemented Synchronization Policies¶

API Reference: `mosaicolabs.ml.SyncPolicy`

Each policy defines a specific logic for how the transformer bridges temporal gaps between sparse data points.

#### 1. **`SyncHold`** (Last-Value-Hold)¶

* **Behavior**: Finds the most recent valid measurement and "holds" it constant until a new one arrives.
* **Best For**: Sensors where states remain valid until explicitly changed, such as robot joint positions or battery levels.

#### 2. **`SyncAsOf`** (Staleness Guard)¶

* **Behavior**: Carries the last known value forward only if it has not exceeded a defined maximum "tolerance" (i.e., it is fresher than a specific age).
* **Best For**: High-speed signals that become unreliable if not updated frequently, such as localization coordinates.

#### 3. **`SyncDrop`** (Interval Filter)¶

* **Behavior**: Ensures a grid tick only receives a value if a new measurement actually occurred within that specific grid interval; otherwise, it returns `None`.
* **Best For**: Downsampling high-frequency data where a strict 1-to-1 relationship between windows and unique hardware events is required.

### Scikit-Learn Compatibility¶

By implementing the standard `fit`/`transform` interface, the `SyncTransformer` makes robotics data a "first-class citizen" of the Scikit-learn ecosystem. This allows for the plug-and-play integration of multi-rate sensor data into standard pipelines.
```
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from mosaicolabs import MosaicoClient
from mosaicolabs.ml import DataFrameExtractor, SyncTransformer, SyncHold

# Define a pipeline for physical AI preprocessing
pipeline = Pipeline([
    ('sync', SyncTransformer(target_fps=30.0, policy=SyncHold())),
    ('scaler', StandardScaler())
])

with MosaicoClient.connect("localhost", 6726) as client:
    # Initialize from an existing SequenceHandler
    seq_handler = client.sequence_handler("drive_session_01")
    extractor = DataFrameExtractor(seq_handler)

    # Process sequential chunks while maintaining signal continuity
    for sparse_chunk in extractor.to_pandas_chunks(window_sec=5.0):
        # The transformer automatically carries state across sequential calls
        # (the scaler is assumed to have been fitted beforehand)
        normalized_dense_chunk = pipeline.transform(sparse_chunk)
```

---

The **ROS Bridge** module serves as the ingestion gateway for ROS (Robot Operating System) data into the Mosaico Data Platform. Its primary function is to solve the interoperability challenges associated with ROS bag files—specifically format fragmentation (ROS 1 `.bag` vs. ROS 2 `.mcap`/`.db3`) and the lack of strict schema enforcement in custom message definitions.

> **API-Keys**: When the connection is established via the authorization middleware (i.e., using an API key), the ROS ingestion employs the Mosaico Writing Workflow, which is allowed only if the key has at least `APIKeyPermissionEnum.Write` permission.

The core philosophy of the module is **"Adaptation, Not Just Parsing."** Rather than simply extracting raw dictionaries from ROS messages, the bridge actively translates them into the standardized **Mosaico Ontology**. For example, a `geometry_msgs/Pose` is validated, normalized, and instantiated as a strongly-typed `mosaicolabs.models.data.Pose` object before ingestion.

> **Try It Out**: You can experiment with the ROS Bridge ingestion yourself via the **ROS Ingestion Example**.
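The difference between parsing and adaptation can be shown with a small, self-contained sketch. The `Vector3d`, `Quaternion`, and `Pose` classes below are simplified stand-ins for the actual `mosaicolabs.models.data` types, and `adapt_pose` is an illustrative helper, not the real adapter:

```
from dataclasses import dataclass

# Simplified stand-ins for the mosaicolabs ontology types (illustrative only)
@dataclass
class Vector3d:
    x: float
    y: float
    z: float

@dataclass
class Quaternion:
    x: float
    y: float
    z: float
    w: float

@dataclass
class Pose:
    position: Vector3d
    orientation: Quaternion

def adapt_pose(ros_data: dict) -> Pose:
    """Translate a deserialized geometry_msgs/Pose dict into a typed Pose.

    Parsing alone would stop at the raw dict; adaptation validates the
    structure and instantiates a strongly-typed ontology object.
    """
    p, q = ros_data["position"], ros_data["orientation"]
    return Pose(
        position=Vector3d(p["x"], p["y"], p["z"]),
        orientation=Quaternion(q["x"], q["y"], q["z"], q["w"]),
    )

raw = {
    "position": {"x": 1.0, "y": 2.0, "z": 0.0},
    "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0},
}
pose = adapt_pose(raw)
```

The payoff is typed attribute access (`pose.position.x`) instead of nested dict lookups, and early validation: a malformed message raises at ingestion time rather than surfacing downstream.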
## Architecture¶ The module is composed of four distinct layers that handle the pipeline from raw file access to server transmission. ### The Loader (`ROSLoader`)¶ The `ROSLoader` acts as the abstraction layer over the physical bag files. It utilizes the `rosbags` library to provide a unified interface for reading both ROS 1 and ROS 2 formats (`.bag`, `.db3`, `.mcap`). * **Responsibilities:** File I/O, raw deserialization, and topic filtering (supporting glob patterns like `/cam/*`). * **Error Handling:** It implements configurable policies (`IGNORE`, `LOG_WARN`, `RAISE`) to handle corrupted messages or deserialization failures without crashing the entire pipeline. ### The Orchestrator (`RosbagInjector`)¶ The **`RosbagInjector`** is the central command center of the ROS Bridge module. It is designed to be the primary entry point for developers who want to embed high-performance ROS ingestion directly into their Python applications or automation scripts. The ingestor orchestrates the interaction between the **`ROSLoader`** (file access), the **`ROSBridge`** (data adaptation), and the **`MosaicoClient`** (network transmission). It handles the complex lifecycle of a data upload—including connection management, batching, and transaction safety—while providing real-time feedback through a visual CLI interface. #### Core Workflow Execution: `run()`¶ The `run()` method is the heart of the ingestor. When called, it initiates a multi-phase pipeline: 1. **Handshake & Registry**: Establishes a connection to the Mosaico server and registers any provided custom `.msg` definitions into the global `ROSTypeRegistry`. 2. **Sequence Creation**: Requests the server to initialize a new data sequence based on the provided name and metadata. 3. **Adaptive Streaming**: Iterates through the ROS bag records. For each message, it identifies the correct adapter, translates the ROS dictionary into a Mosaico object, and pushes it into an optimized asynchronous write buffer. 4. 
**Transaction Finalization**: Once the bag is exhausted, it flushes all remaining buffers and signals the server to commit the sequence.

#### Configuring the Ingestion¶

The behavior of the ingestor is entirely driven by the **`ROSInjectionConfig`**. This configuration object ensures that the ingestion logic is decoupled from the user interface, allowing for consistent behavior whether triggered via the CLI or a complex script.

#### Practical Example: Programmatic Usage¶

```
from pathlib import Path

from mosaicolabs import SessionLevelErrorPolicy, TopicLevelErrorPolicy
from mosaicolabs.ros_bridge import RosbagInjector, ROSInjectionConfig, Stores
from my_adapters import MyCustomRGBDAdapter  # hypothetical: your custom adapter, defined elsewhere

def run_injection():
    # Define the Injection Configuration
    # This data class acts as the single source for the operation.
    config = ROSInjectionConfig(
        # Input Data
        file_path=Path("data/session_01.db3"),

        # Target Platform Metadata
        sequence_name="test_ros_sequence",
        metadata={
            "driver_version": "v2.1",
            "weather": "sunny",
            "location": "test_track_A"
        },

        # Topic Filtering (supports glob patterns)
        # This will only upload topics starting with '/cam'
        topics=["/cam*"],

        # ROS Configuration
        # Specifying the distro ensures correct parsing of standard messages
        # (.db3 sqlite3 rosbags need the specification of distro)
        ros_distro=Stores.ROS2_HUMBLE,

        # Custom Message Registration
        # Register proprietary messages before loading to prevent errors
        custom_msgs=[
            (
                "my_custom_pkg",                # ROS Package Name
                Path("./definitions/my_pkg/"),  # Path to directory containing .msg files
                Stores.ROS2_HUMBLE,             # Scope (valid for this distro)
            )
            # registry will automatically infer type names from the .msg file names
        ],

        # Adapter Overrides
        # Use specific adapters for designated topics instead of the default.
        # In this case, instead of using PointCloudAdapter for the depth camera,
        # MyCustomRGBDAdapter will be used for the specified topic.
        adapter_override={
            "/camera/depth/points": MyCustomRGBDAdapter,
        },

        # Execution Settings
        log_level="WARNING",  # Reduce verbosity for automated scripts

        # Session Level Error Handling
        on_error=SessionLevelErrorPolicy.Report,  # Report the error and terminate the session

        # Topic Level Error Handling
        topics_on_error=TopicLevelErrorPolicy.Raise  # Re-raise any exception
    )

    # Instantiate the Controller
    ingestor = RosbagInjector(config)

    # Execute
    # The run method handles connection, loading, and uploading automatically.
    # It raises exceptions for fatal errors, allowing you to wrap it in try/except blocks.
    try:
        ingestor.run()
        print("Injection job completed successfully.")
    except Exception as e:
        print(f"Injection job failed: {e}")

# Use as script or call the injection function in your code
if __name__ == "__main__":
    run_injection()
```

### The Adaptation Layer (`ROSBridge` & Adapters)¶

This layer represents the default semantic core of the module, translating raw ROS data into the Mosaico Ontology.

* **`ROSAdapterBase`:** An abstract base class that establishes the **default** contracts for converting specific ROS message types into their corresponding Mosaico Ontology types.
* **Concrete Adapters:** The library provides built-in implementations for common standards, such as `IMUAdapter` (mapping `sensor_msgs/Imu` to `IMU`) and `ImageAdapter` (mapping `sensor_msgs/Image` to `Image`). These adapters include advanced logic for recursive unwrapping, automatically extracting data from complex nested wrappers like `PoseWithCovarianceStamped`. Developers can also implement custom adapters to handle non-standard or proprietary types.
* **`ROSBridge`:** A central registry and dispatch mechanism that maps ROS message type strings (e.g., `sensor_msgs/msg/Imu`) to their corresponding default adapter classes, ensuring the correct translation logic is applied for each message.
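Conceptually, the `ROSBridge` dispatch described above boils down to a mapping from ROS type strings to adapter classes, with topic-level overrides taking precedence. Here is a minimal, self-contained sketch of that mechanism; it is illustrative only, and the names `_REGISTRY` and `resolve_adapter` are not part of the actual API:

```
# Illustrative sketch of a type-string -> adapter registry.
# Not the actual ROSBridge implementation; internal names are hypothetical.
_REGISTRY: dict[str, type] = {}

def register_default_adapter(cls: type) -> type:
    """Decorator: register an adapter under its declared ROS type string(s)."""
    msgtypes = cls.ros_msgtype
    if isinstance(msgtypes, str):
        msgtypes = (msgtypes,)
    for msgtype in msgtypes:
        _REGISTRY[msgtype] = cls
    return cls

def resolve_adapter(topic: str, msgtype: str, overrides: dict[str, type]) -> type:
    """Topic-level overrides win; otherwise fall back to the default adapter."""
    if topic in overrides:
        return overrides[topic]
    try:
        return _REGISTRY[msgtype]
    except KeyError:
        raise LookupError(f"No adapter registered for {msgtype}") from None

@register_default_adapter
class ImuAdapter:
    ros_msgtype = "sensor_msgs/msg/Imu"

# Default resolution picks ImuAdapter; an entry in `overrides`
# would take precedence for that specific topic.
adapter = resolve_adapter("/imu/data", "sensor_msgs/msg/Imu", overrides={})
```

This decoupling is what makes both `@register_default_adapter` (global fallback) and `adapter_override` (per-topic) composable without touching the dispatch logic itself.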
#### Extending the Bridge (Custom Adapters)¶

Users can extend the bridge to support new ROS message types by implementing a custom adapter and registering it.

1. **Inherit from `ROSAdapterBase`**: Define the input ROS type string and the target Mosaico Ontology type.
2. **Implement `from_dict`**: Define the logic to convert the `ROSMessage.data` dictionary into an instance of the target ontology object.
3. **Register**: Decorate the class with `@register_default_adapter`.

```
from mosaicolabs.ros_bridge import ROSAdapterBase, register_default_adapter
from my_ontology import MyCustomData  # Assuming this class exists

@register_default_adapter
class MyCustomAdapter(ROSAdapterBase[MyCustomData]):
    ros_msgtype = "my_pkg/msg/MyCustomType"
    __mosaico_ontology_type__ = MyCustomData

    @classmethod
    def from_dict(cls, ros_data: dict, **kwargs) -> MyCustomData:
        # Transformation logic here
        return MyCustomData(...)
```

#### Override Adapters¶

This section explains how to extend the bridge's capabilities by implementing and registering Override Adapters.

##### Overriding and Extending Adapters¶

While the ROS Bridge provides a robust set of default adapters for standard message types, real-world robotics often involve proprietary message definitions or non-standard uses of common types. Through the **`adapter_override`** parameter in the `ROSInjectionConfig`, you can explicitly map a specific topic to a chosen adapter. This is particularly useful for types like `sensor_msgs/msg/PointCloud2`, where, for example, different LiDAR vendors may encode data in unique ways that require specialized parsing logic.

> **Override adapter usage**: Use adapter overrides for versatile message types like `sensor_msgs/msg/PointCloud2`, where different sensors (LiDAR, Radar, etc.) share the same ROS type but require unique parsing logic.
Overrides should be defined when a given ROS message type already has a **default adapter** registered in the `ROSBridge` registry, but that adapter cannot satisfy topic-specific requirements. If your message type is used consistently across all topics, simply use the `@register_default_adapter` decorator to establish a global fallback.

##### Implementing a Custom Adapter Override¶

To create a custom adapter, you must inherit from `ROSAdapterBase` and define the transformation logic. Here is a structural example of a custom LiDAR adapter; note that the adapter is not registered as **default**, since the message type `sensor_msgs/msg/PointCloud2` already has one:

```
from typing import Any, Optional, Type, Tuple

from mosaicolabs.ros_bridge import ROSAdapterBase, ROSMessage
from mosaicolabs.models import Message
from my_ontology import MyLidar  # Your target Ontology class

class MyCustomLidarAdapter(ROSAdapterBase[MyLidar]):
    # Define which ROS type this adapter handles
    ros_msgtype: str | Tuple[str, ...] = "sensor_msgs/msg/PointCloud2"

    # Define the target Mosaico Ontology class
    __mosaico_ontology_type__: Type[MyLidar] = MyLidar

    @classmethod
    def translate(
        cls,
        ros_msg: ROSMessage,
        **kwargs: Any,
    ) -> Message:
        """
        Optional: Override the high-level translation if you need to
        manipulate the ROSMessage envelope before processing.
        """
        # Optionally add pre/post processing logic around the base translation.
        return super().translate(ros_msg, **kwargs)

    @classmethod
    def from_dict(cls, ros_data: dict) -> MyLidar:
        """
        The primary transformation logic. Converts the deserialized
        ROS dictionary into a Mosaico object.
        """
        # Core transformation logic: map raw ROS fields to your ontology type.
        return MyLidar(
            # ... map ros_data fields to MyLidar fields
        )

    @classmethod
    def schema_metadata(cls, ros_data: dict, **kwargs: Any) -> Optional[dict]:
        """
        Optional: Extract specific metadata from the ROS message
        to be stored in the Mosaico schema registry.
""" return None ``` ##### Key Methods¶ **`from_dict(ros_data)`**: This is the most important method. It receives the ROS message as a Python dictionary and must return an instance of your target Mosaico Ontology class. **`translate(ros_msg)`**: This is the entry point called by the ROSBridge. It just calls `from_dict`, but you can override it if you need access to the message metadata during conversion. **`schema_metadata(ros_data)`**: Use this to extract metadata information about the sensor configuration that is useful for the platform to know. ##### Registering the Override¶ Once implemented, the adapter is registered against a specific topic via `adapter_override` in ROSInjectionConfig: ``` from .my_adapter import MyCustomLidarAdapter ... config = ROSInjectionConfig( file_path=Path("sensor_data.mcap"), sequence_name="custom_lidar_run", # Explicitly tell the bridge to use your custom adapter for this topic adapter_override={ "/lidar/front/pointcloud": MyCustomLidarAdapter, } ) ... ingestor = RosbagInjector(config) ingestor.run() ``` With this configuration, all the `sensor_msgs/msg/PointCloud2` message received on `/lidar/front/pointcloud`, will be processed exclusively by `MyCustomLidarAdapter`. All other topics continue to use the standard resolution logic. By using this pattern, you can maintain a clean separation between your raw ROS data and your high-level Mosaico data models, ensuring that even the most "exotic" sensor data is correctly ingested and indexed. #### CLI Usage¶ The module includes a command-line interface for quick ingestion tasks. 
The full list of options can be retrieved by running `mosaicolabs.ros_injector -h`.

```
# Basic Usage
mosaicolabs.ros_injector ./data.mcap --name "Test_Run_01"

# Advanced Usage: Filtering topics and adding metadata
mosaicolabs.ros_injector ./data.db3 \
    --name "Test_Run_01" \
    --topics /camera/front/* /gps/fix \
    --metadata ./metadata.json \
    --ros-distro ros2_humble
```

### The Type Registry (`ROSTypeRegistry`)¶

The **`ROSTypeRegistry`** is a context-aware singleton designed to manage the schemas required to decode ROS data. ROS message definitions are frequently external to the data files themselves—this is especially true for ROS 2 `.db3` (SQLite) formats and proprietary datasets containing custom sensors. Without these definitions, the bridge cannot deserialize the raw binary "blobs" into readable dictionaries.

* **Schema Resolution**: It allows the `ROSLoader` to resolve custom `.msg` definitions on-the-fly during bag playback.
* **Version Isolation (Stores)**: ROS messages often vary across distributions (e.g., a "Header" in ROS 1 Noetic is structurally different from ROS 2 Humble). The registry uses a "Profile" system to store these version-specific definitions separately, preventing cross-distribution conflicts.
* **Global vs. Scoped Definitions**: You can register definitions **Globally** (available to all loaders) or **Scoped** to a specific distribution.

#### Pre-loading Definitions¶

While you can pass custom messages via `ROSInjectionConfig`, this can become cumbersome for large-scale projects with hundreds of proprietary types. The recommended approach is to pre-load the registry at the start of your application. This makes the definitions available to all subsequent loaders automatically.

| Method | Scope | Description |
| --- | --- | --- |
| **`register(...)`** | Single Message | Registers a single custom type. The source can be a path to a `.msg` file or a raw string containing the definition. |
| **`register_directory(...)`** | Batch Package | Scans a directory for all `.msg` files and registers them under a specific package name (e.g., `my_pkg/msg/Sensor`). |
| **`get_types(...)`** | Internal | Implements a "Cascade" logic: merges Global definitions with distribution-specific overrides for a loader. |
| **`reset()`** | Utility | Clears all stored definitions. Primarily used for unit testing to ensure process isolation. |

#### Centralized Registration Example¶

A clean way to manage large projects is to centralize your message registration in a single setup function (e.g., `setup_registry.py`):

```
from pathlib import Path
from mosaicolabs.ros_bridge import ROSTypeRegistry, Stores

def initialize_project_schemas():
    # 1. Register a proprietary message valid for all ROS versions
    ROSTypeRegistry.register(
        msg_type="common_msgs/msg/SystemHeartbeat",
        source=Path("./definitions/Heartbeat.msg")
    )

    # 2. Batch register an entire package for ROS 2 Humble
    ROSTypeRegistry.register_directory(
        package_name="robot_v3_msgs",
        dir_path=Path("./definitions/robot_v3/msgs"),
        store=Stores.ROS2_HUMBLE
    )
```

Once registered, the `RosbagInjector` (and the underlying `ROSLoader`) automatically detects and uses these definitions. There is no longer any need to pass the `custom_msgs` list in the `ROSInjectionConfig`.

```
# main_injection.py
import setup_registry  # Runs the registration logic above

from mosaicolabs.ros_bridge import RosbagInjector, ROSInjectionConfig, Stores
from pathlib import Path

# Initialize registry
setup_registry.initialize_project_schemas()

# Configure injection WITHOUT listing custom messages again
config = ROSInjectionConfig(
    file_path=Path("mission_data.mcap"),
    sequence_name="mission_01",
    metadata={"operator": "Alice"},
    ros_distro=Stores.ROS2_HUMBLE,  # Loader will pull the Humble-specific types we registered
    # custom_msgs=[] <-- No longer needed!
) ingestor = RosbagInjector(config) ingestor.run() ``` ### Testing & Validation¶ The ROS Bag Injection module has been validated against a variety of standard datasets to ensure compatibility with different ROS distributions, message serialization formats (CDR/ROS 1), and bag container formats (`.bag`, `.mcap`, `.db3`). #### Recommended Dataset for Verification¶ For evaluating Mosaico capabilities, we recommend the **NVIDIA NGC Catalog - R2B Dataset 2024**. This dataset has been verified to be fully compatible with the injection pipeline. The following table details the injection performance for the **NVIDIA R2B Dataset 2024**. These benchmarks were captured on a system running **macOS 26.2** with an **Apple M2 Pro (10 cores, 16GB RAM)**. #### NVIDIA R2B Dataset 2024 Injection Performance¶ | Sequence Name | Compression Factor | Injection Time | Hardware Architecture | Notes | | --- | --- | --- | --- | --- | | **`r2b_galileo2`** | ~70% | ~40 sec | Apple M2 Pro (16GB) | High compression achieved for telemetry data. | | **`r2b_galileo`** | ~1% | ~30 sec | Apple M2 Pro (16GB) | Low compression due to pre-compressed source images. | | **`r2b_robotarm`** | ~66% | ~50 sec | Apple M2 Pro (16GB) | High efficiency for high-frequency state updates. | | **`r2b_whitetunnel`** | ~1% | ~30 sec | Apple M2 Pro (16GB) | Low compression; contains topics with no available adapter. | #### Understanding Performance Factors¶ * **Compression Factors**: Sequences like `r2b_galileo2` achieve high ratios (~70%) because Mosaico optimizes the underlying columnar storage for scalar telemetry. Conversely, sequences with pre-compressed video feeds show minimal gains (~1%) because the data is already in a dense format. * **Injection Time**: This metric includes the overhead of local MCAP/DB3 deserialization via `ROSLoader`, semantic translation through the `ROSBridge`, and the asynchronous transmission to the Mosaico server. 
* **Hardware Impact**: On the **Apple M2 Pro**, the `RosbagInjector` utilizes multi-threading for the **Adaptation Layer**, allowing serialization tasks to run in parallel while the main thread manages the Flight stream.

#### Known Issues & Limitations¶

While the underlying `rosbags` library supports the majority of standard ROS 2 bag files, specific datasets with non-standard serialization alignment or proprietary encodings may encounter compatibility issues.

**NVIDIA Isaac ROS Benchmark Dataset (2023)**

* **Source:** NVIDIA NGC Catalog - R2B Dataset 2023
* **Issue:** Deserialization failure during ingestion.
* **Technical Details:** The ingestion process fails within the `AnyReader.deserialize` method of the `rosbags` library. The internal CDR deserializer triggers an assertion error indicating a mismatch between the expected data length and the raw payload size.
* **Error Signature:**

  ```
  # In rosbags.serde.cdr:
  assert pos + 4 + 3 >= len(rawdata)
  ```

* **Recommendation:** This issue originates in the upstream parser's handling of this specific dataset's serialization alignment. It is currently recommended to exclude this dataset or transcode it using standard ROS 2 tools before ingestion.

## Supported Message Types

***ROS-Specific Data Models***

In addition to mapping standard ROS messages to the core Mosaico ontology, the `ros-bridge` module implements three specialized data models. These are defined specifically for this module to handle ROS-native concepts that are not yet part of the official Mosaico standard:

* **`FrameTransform`**: Designed to handle coordinate frame transformations (modeled after `tf2_msgs/msg/TFMessage`). It encapsulates a list of `Transform` objects to manage spatial relationships.
* **`BatteryState`**: Modeled after `sensor_msgs/msg/BatteryState`, this class captures comprehensive power supply metrics.
It includes core data (voltage, current, capacity, percentage) and detailed metadata such as power supply health, technology status, and individual cell readings.
* **`PointCloud2`**: Modeled after `sensor_msgs/msg/PointCloud2`, this class captures raw point cloud data including field layout, endianness, and binary payload. It includes the companion `PointField` model to describe each data channel (e.g., `x`, `y`, `z`, `intensity`).

> **Note:** Although these are provisional additions, `FrameTransform`, `BatteryState`, and `PointCloud2` all inherit from `Serializable`. This ensures they remain fully compatible with Mosaico’s existing serialization infrastructure.

### Supported Message Types Table

| ROS Message Type | Mosaico Ontology Type | Adapter |
| --- | --- | --- |
| `geometry_msgs/msg/Pose`, `PoseStamped`... | `Pose` | `PoseAdapter` |
| `geometry_msgs/msg/Twist`, `TwistStamped`... | `Velocity` | `TwistAdapter` |
| `geometry_msgs/msg/Accel`, `AccelStamped`... | `Acceleration` | `AccelAdapter` |
| `geometry_msgs/msg/Vector3`, `Vector3Stamped` | `Vector3d` | `Vector3Adapter` |
| `geometry_msgs/msg/Point`, `PointStamped` | `Point3d` | `PointAdapter` |
| `geometry_msgs/msg/Quaternion`, `QuaternionStamped` | `Quaternion` | `QuaternionAdapter` |
| `geometry_msgs/msg/Transform`, `TransformStamped` | `Transform` | `TransformAdapter` |
| `geometry_msgs/msg/Wrench`, `WrenchStamped` | `ForceTorque` | `WrenchAdapter` |
| `nav_msgs/msg/Odometry` | `MotionState` | `OdometryAdapter` |
| `nmea_msgs/msg/Sentence` | `NMEASentence` | `NMEASentenceAdapter` |
| `sensor_msgs/msg/Image`, `CompressedImage` | `Image`, `CompressedImage` | `ImageAdapter`, `CompressedImageAdapter` |
| `sensor_msgs/msg/Imu` | `IMU` | `IMUAdapter` |
| `sensor_msgs/msg/NavSatFix` | `GPS`, `GPSStatus` | `GPSAdapter`, `NavSatStatusAdapter` |
| `sensor_msgs/msg/CameraInfo` | `CameraInfo` | `CameraInfoAdapter` |
| `sensor_msgs/msg/RegionOfInterest` | `ROI` | `ROIAdapter` |
| `sensor_msgs/msg/JointState` | `RobotJoint` | `RobotJointAdapter` |
| `sensor_msgs/msg/BatteryState` | `BatteryState` (ROS-specific) | `BatteryStateAdapter` |
| `std_msgs/msg/String` | `String` | `_GenericStdAdapter` |
| `std_msgs/msg/Int8(16,32,64)` | `Integer8(16,32,64)` | `_GenericStdAdapter` |
| `std_msgs/msg/UInt8(16,32,64)` | `Unsigned8(16,32,64)` | `_GenericStdAdapter` |
| `std_msgs/msg/Float32(64)` | `Floating32(64)` | `_GenericStdAdapter` |
| `std_msgs/msg/Bool` | `Boolean` | `_GenericStdAdapter` |
| `tf2_msgs/msg/TFMessage` | `FrameTransform` (ROS-specific) | `FrameTransformAdapter` |
| `sensor_msgs/msg/PointCloud2` | `PointCloud2` (ROS-specific) | `PointCloudAdapter` |

---

# Contributing¶

If you plan to contribute to the codebase, you need to set up the pre-commit hooks in addition to the standard installation. These hooks enforce code quality checks automatically on every commit.

## Prerequisites¶

* **Python:** Version **3.10** or newer is required.
* **Poetry:** For package management.

## Development Setup¶

Clone the repository and navigate to the SDK directory:

```
cd mosaico/mosaico-sdk-py
```

Install dependencies **and** register the pre-commit hooks in a single step:

```
poetry install && poetry run pre-commit install
```

The `pre-commit install` step registers the Git hook under `.git/hooks/pre-commit`, wiring it to the rules defined in `.pre-commit-config.yaml`. From that point on, every `git commit` will automatically run **Ruff** (linting and formatting) against your staged files — the commit is blocked if any check fails, keeping the codebase consistently clean.

> **Why this matters:** Skipping `pre-commit install` means your commits will bypass all quality gates. CI will catch the issues anyway, but fixing them after the fact is more disruptive than catching them locally before pushing.
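For reference, a typical Ruff-based `.pre-commit-config.yaml` looks like the sketch below. The hook ids (`ruff`, `ruff-format`) come from the upstream `astral-sh/ruff-pre-commit` repository; the exact `rev` and arguments pinned by this project may differ, so treat this as an illustration rather than the repository's actual file:

```
repos:
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.6.9  # pin to the version used by the project
    hooks:
      - id: ruff         # linting
        args: [--fix]
      - id: ruff-format  # formatting
```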
## What the hooks do¶ The `.pre-commit-config.yaml` currently configures the following actions via Ruff: * **Linting** — detects common errors, unused imports, and style violations * **Formatting** — auto-formats code to a consistent style ## Running checks manually¶ You can trigger the hooks on demand without committing: ``` # Run on all files poetry run pre-commit run --all-files # Run on staged files only poetry run pre-commit run ``` ## Verify the hook is installed¶ After setup, confirm the hook is in place: ``` ls .git/hooks/pre-commit ``` You should see the file present. If it is missing, re-run `poetry run pre-commit install`. --- # Custom Actions¶ Mosaico implements its own administrative protocols directly on top of Apache Arrow Flight. Rather than relying on a separate control channel abstraction, Mosaico leverages the Flight `DoAction` RPC mechanism to handle discrete lifecycle events, administrative interfaces, and resource management. Unlike streaming endpoints designed for continuous data throughput, these custom actions manage the platform's overarching state. While individual calls are synchronous, they often initiate or conclude multi-step processes, such as topic upload, that govern the long-term integrity of data within the platform. All custom actions follow a standardized pattern: they expect a JSON-serialized payload defining the request parameters and return a JSON-serialized response containing the result. ## Sequence Management¶ Sequences are the fundamental containers for data recordings in Mosaico. These custom actions enforce a strict lifecycle state machine to guarantee data integrity. | Action | Description | Permission | | --- | --- | --- | | `sequence_create` | Initializes a new, empty sequence. | `write` | | `sequence_delete` | Permanently removes a sequence from the platform. | `delete` | ## Topic Management¶ Topics represent the individual sensor streams (e.g., `camera/front`, `gps`) contained within a sequence. 
| Action | Description | Permission | | --- | --- | --- | | `topic_create` | Registers a new topic. | `write` | | `topic_delete` | Removes a specific topic from a sequence. | `delete` | ## Session Management¶ Uploading data to the platform is made through sessions. Within a session it is possible to load one or more topics. Once closed, it becomes immutable. | Action | Description | Permission | | --- | --- | --- | | `session_create` | Start a new upload session. | `write` | | `session_finalize` | Moves the session status from *uploading* to *archived*. This action locks the session, marking it as immutable. Once finalized, no further data can be added or modified. | `write` | | `session_delete` | Removes a specific session and all its data. | `delete` | ## Notification System¶ The platform includes a tagging mechanism to attach alerts or informational messages to resources. For example, if an exception is raised during an upload, the notification system automatically registers the event, ensuring the failure is logged and visible for troubleshooting. | Action | Description | Permission | | --- | --- | --- | | `*_notification_create` | Attaches a notification to a Sequence or Topic, such as logging an error or status update. | `write` | | `*_notification_list` | Retrieves the history of active notifications for a resource, allowing clients to review alerts. | `read` | | `*_notification_purge` | Clears the notification history for a resource, useful for cleanup after resolution. | `delete` | Here, `*` can be either `sequence` or `topic`. ## Query¶ | Action | Description | Permission | | --- | --- | --- | | `query` | This action serves as the gateway to the query system. It accepts a complex filter object and returns a list of resources that match the criteria. | `read` | ## Misc¶ | Action | Description | Permission | | --- | --- | --- | | `version` | Retrieves the current daemon version. 
| `read` | --- # API keys¶ In Mosaico, access control is managed through **API keys**. An API key securely binds a unique token to a specific set of permissions. Scope of access It is important to note that API keys in Mosaico act as a *loose, coarse-grained access mechanism*. They are designed strictly to limit the *types of operations* a user can perform across the platform as a whole. This mechanism *does not support fine-grained policies*. You cannot use an API key to restrict access to specific resources, such as individual topics or sequences. For example, if an API key is granted the `read` permission, the client is allowed to read data globally across the entire platform, rather than being restricted to a single topic or a specific subset of data. ## Properties¶ Each API key in Mosaico acts as a single access control rule and consists of the following properties: | Property | Status | Description | | --- | --- | --- | | **Token** | Required | The unique token provided by the client to authenticate with the Mosaico platform. | | **Permissions** | Required | Which privileges (e.g., `read`, `write`) are granted through the API key. | | **Description** | Required | A human-readable text string explaining the specific purpose or use case of the policy. | | **Creation Time** | Auto-generated | The exact timestamp when the API key and its associated policy were generated. | | **Expiration Time** | Optional | A predetermined date and time after which the API key automatically becomes invalid. | ## Token Structure¶ Mosaico API key tokens follow a strict three-part format separated by underscores (`_`): ``` [HEADER]_[PAYLOAD]_[FINGERPRINT] ``` **Example** ``` msco_vrfeceju4lqivysxgaseefa3tsxs0vrl_1b676530 ``` **Header**. A fixed prefix that easily identifies the token as a Mosaico API key. This helps developers quickly spot Mosaico tokens in configuration files or environment variables. **Payload**. The actual secret token.
It is a 32-byte string consisting entirely of lowercase, alphanumeric ASCII characters. **This payload must be kept secret and should never be exposed in client-side code.** **Fingerprint**. A hash of the payload. Mosaico uses the fingerprint to identify the key within the system without needing to handle or log the raw secret payload. It is primarily used for administrative actions, such as checking the key's status or revoking it. ## Available Permissions¶ Permissions dictate the exact global operations an API key can execute. | Permission | Action Allowed | Typical Use Case | | --- | --- | --- | | `read` | Retrieve and view data from anywhere on the Mosaico platform. | Front-end dashboards, data analytics integrations, or public-facing read-only APIs. | | `write` | Create new data or update existing data anywhere on the platform. | Ingestion scripts, user input forms, or webhooks pushing data to Mosaico. | | `delete` | Permanently remove data from the platform. | Data lifecycle management, GDPR compliance scripts, or cleanup tasks. | | `manage` | Perform administrative operations on the platform. | Rotating/revoking API keys, managing users, or running automated maintenance tasks. | Permissions form a hierarchy: each level automatically inherits all the privileges of the levels below it (e.g. `write` also grants `read`, while `manage` inherits `read`, `write`, and `delete`). --- # CLI Reference¶ ## mosaicod run¶ Start the server locally. ``` mosaicod run [OPTIONS] ``` ### Options¶ | Option | Default | Description | | --- | --- | --- | | `--host ` | `127.0.0.1` | Specify a host address. | | `--port ` | `6726` | Port to listen on. | | `--local-store ` | `None` | Enable storage of objects on the local filesystem at the specified directory path. | | `--tls` | `false` | Enable TLS.
When enabled, the following environment variables need to be set: `MOSAICOD_TLS_CERT_FILE` and `MOSAICOD_TLS_PRIVATE_KEY_FILE` | | `--api-key` | `false` | Require API keys to operate. When enabled, the system requires API keys to perform any action. | ## mosaicod api-key¶ Manage API keys. ### Subcommands¶ | Command | Description | | --- | --- | | `create` | Create a new API key with a custom scope | | `revoke` | Revoke an existing API key | | `status` | Check the status of an API key | | `list` | List all API keys | ### mosaicod api-key create¶ Create a new API key. ``` mosaicod api-key create --permission [read|write|delete|manage] [OPTIONS] ``` | Option | Default | Description | | --- | --- | --- | | `-d, --description` | | Set a description for the API key to make it easily recognizable. | | `--expires-in ` | | Define a time duration, using the ISO 8601 format, after which the key is no longer valid (e.g. `P1Y2M3D` for 1 year, 2 months, and 3 days) | | `--expires-at ` | | Define a datetime, using the RFC 3339 format, after which the key is no longer valid (e.g. `2026-03-27T12:20:00Z`) | ### mosaicod api-key revoke¶ Revoke an existing API key. ``` mosaicod api-key revoke ``` The fingerprint is the last 8 characters of the API key. ### mosaicod api-key status¶ Check the status of an API key. ``` mosaicod api-key status ``` The fingerprint is the last 8 characters of the API key. ### mosaicod api-key list¶ List all API keys. ``` mosaicod api-key list ``` ## Common Options¶ Each `mosaicod` command shares the following common options: | Option | Default | Description | | --- | --- | --- | | `--log-format ` | `pretty` | Set the log output format. Available values are: `json`, `pretty`, `plain` | | `--log-level ` | `warning` | Set the log level. Possible values: `warning`, `info`, `debug` | --- The **Mosaico Daemon**, a.k.a. `mosaicod`, acts as the engine of the data platform.
Developed in **Rust**, it is engineered to be the high-performance arbiter for all data interactions, guaranteeing that every byte of robotics data is strictly typed, atomically stored, and efficiently retrievable. It functions on a standard client-server model, mediating between your high-level applications (via the SDKs) and the low-level storage infrastructure. ## Architectural Design¶ `mosaicod` is architected atop the Apache Arrow Flight protocol. Apache Arrow Flight is a general-purpose, high-performance client-server framework developed for the exchange of massive datasets. It operates directly on Apache Arrow columnar data, enabling efficient transport over gRPC without the overhead of serialization. Unlike traditional REST APIs which serialize data into text-based JSON, Flight is designed specifically for high-throughput data systems. This architectural choice provides Mosaico with three critical advantages: **Zero-Copy Serialization.** Data is transmitted in the Arrow columnar format, the exact same format used in-memory by modern analytics tools like pandas and Polars. This eliminates the CPU-heavy cost of serializing and deserializing data at every hop. **Parallelized Transport.** Operations are not bound to a single pipe; data transfer can be striped across multiple connections to saturate available bandwidth. **Snapshot-Based Schema Enforcement.** Data types are not guessed, nor are they forced into a rigid global model. Instead, the protocol enforces a rigorous schema handshake that validates data against a specific schema snapshot stored with the sequence. ### Resource Addressing¶ Mosaico treats every entity in the system, whether it's a Sequence or a Topic, as a uniquely addressable resource. These resources are identified by a **Resource Locator**, a uniform logical path that remains consistent across all channels. 
Mosaico uses two types of resource locators: * A **Sequence Locator** identifies a recording session by its sequence name (e.g., `run_2023_01`). * A **Topic Locator** identifies a specific data stream using a hierarchical path that includes the sequence name and topic path (e.g., `run_2023_01/sensors/lidar_front`). ### Flight Endpoints¶ The daemon exposes Apache Arrow Flight endpoints that handle various operations using Flight's core methods: `list_flights` and `get_flight_info` for discovery and metadata management, `do_put` for high-speed data ingestion, and `do_get` for efficient data retrieval. This design ensures administrative operations don't interfere with data throughput while maintaining low-latency columnar data access. ### Storage Architecture¶ `mosaicod` uses an RDBMS to perform fast queries on metadata, manage system state such as sequence and topic definitions, and handle the event queue for processing asynchronous tasks like background data processing or notifications. An object store (such as S3, MinIO, or local filesystem) provides long-term storage for resilience and durability, holding the bulk sensor data, images, point clouds, and immutable schema snapshots that define data structures. Database Durability and Recovery The DBMS state is not strictly required for data durability. The object store is the source of truth for all data, while the database serves as a metadata catalog for efficient querying and management. If the metadata database is corrupted or destroyed, `mosaicod` can rebuild the entire catalog by rescanning the durable object storage. This design ensures that while the DBMS is used to create relations between datasets, the store guarantees long-term durability and recovery, protecting your data against catastrophic infrastructure failure. 
--- # Ingestion¶ Data ingestion in Mosaico is explicitly engineered to handle write-heavy workloads, enabling the system to absorb high-bandwidth sensor data, such as 4K video streams or high-frequency Lidar point clouds, without contending with administrative traffic. ## Sessions¶ We have intentionally moved away from traditional versioning. While versioning is a common pattern, it often introduces significant architectural complexity and degrades the developer experience when querying historical data. For instance, if multiple versions of a single sequence exist, the query engine faces an ambiguous choice: should it return the latest state, the most stable state, or a specific point-in-time snapshot? Consider a scenario where you are tracking a temperature sensor. If you have versioned sequences, a query for a given temperature becomes problematic: do you search across Version 1.2 or Version 2.0? Has the data changed across versions? Mosaico eliminates this ambiguity by focusing on **Sessions** rather than versions. A session represents an immutable **data layer** within Mosaico. It acts as a logical container for topics that are uploaded together as a single unit. To visualize the hierarchy, think of a sequence as the primary data container. Sessions then represent specific, immutable layers of data within that sequence, while topics serve as specific instances of an ontology model. This design allows for high-concurrency environments; you can upload multiple sessions simultaneously without the risk of data races or state corruption. Sessions are the primary mechanism used to update and evolve sequences. This model is particularly powerful for parallel workloads that need to contribute data to the same base layer. Because sessions are independent and immutable, separate workloads can operate in isolation.
For example, one process might be cleaning a dataset while another is appending real-time logs; both can work based on the same base data layer and commit their results as independent sessions without interfering with one another. ### An example¶ To illustrate the session workflow in practice, imagine you are managing a dataset for autonomous driving. When you first initialize a sequence, you perform a bulk upload of raw sensor data collected during a drive. This initial upload constitutes your first session, containing the base telemetry and camera streams: ``` [ SESSION: BASE DATA ] topolino_amaranto_25032026/camera/front topolino_amaranto_25032026/camera/back topolino_amaranto_25032026/car/gps topolino_amaranto_25032026/car/imu ``` Once this base layer is established, you can trigger two independent processing workloads. The first workload focuses on computer vision, analyzing the front and back camera streams to generate object detection labels. Simultaneously, a second workload runs a visual odometry algorithm, fusing data from the cameras, IMU, and GPS to calculate the precise trajectory of the vehicle. Because Mosaico supports concurrent sessions, these two tasks do not need to wait for one another or risk overwriting the base data. The vision task commits its results as one session, and the odometry task commits its results as another. The final state of your sequence is now an enriched, multi-layered data container. 
When you query this sequence, the engine provides a unified view that includes the original raw sensors plus the newly generated analytical topics: ``` [ SESSION: BASE DATA ] topolino_amaranto_25032026/camera/front topolino_amaranto_25032026/camera/back topolino_amaranto_25032026/car/gps topolino_amaranto_25032026/car/imu [ SESSION: LABELING DATA ] topolino_amaranto_25032026/labels/object_detection [ SESSION: VISUAL ODOMETRY ] topolino_amaranto_25032026/tracks/visual_odometry ``` This approach allows your data to grow *horizontally* with new topics while maintaining the absolute immutability of the original sensor readings. ## Ingestion Protocol¶ The data ingestion protocol in Mosaico follows a structured, multi-step flow designed to ensure type safety and prevent race conditions in high-concurrency environments. The process begins with `sequence_create`, which establishes the primary data container. Because Mosaico uses a session-based update model, you must then initialize a specific *data layer* using `session_create`, which returns a session UUID that serves as the active context for all subsequent uploads within that specific batch. Within this active session, you define individual data streams via `topic_create`, where each topic is assigned a unique path (e.g., `my_sequence/topic/1`) and receives its own topic UUID. Data is then transmitted using the Arrow Flight `do_put` operation, starting with an Arrow schema for structural validation and followed by a stream of `RecordBatch` payloads. By passing the topic UUID to `do_put`, the system ensures the incoming binary stream is mapped correctly to the intended topic and session layer. Once all topics and their respective data streams are uploaded, the session must be formally committed with `session_finalize`. This action triggers server-side validation against registered ontologies, chunks the data for efficient storage, and locks the session to make the data permanent and available for downstream queries.
Here is a simplified example of the ingestion flow using Python-like pseudocode: Ingestion protocol ``` # Initialize the Sequence and the Session layer sq_uuid = sequence_create("my_sequence", metadata) ss_uuid = session_create("my_sequence") # Create topics within the session and stream data t1_uuid = topic_create(ss_uuid, "my_sequence/topic/1", metadata) # (1)! do_put(t1_uuid, data_stream) t2_uuid = topic_create(ss_uuid, "my_sequence/topic/2", metadata) do_put(t2_uuid, data_stream) # Commit the session to make data immutable session_finalize(ss_uuid) # (2)! ``` 1. The `topic_create` action returns a UUID that must be passed to the `do_put` call to route the data stream correctly. 2. During finalization, all resources are consolidated and locked. Alternatively, you can call `session_delete(ss_uuid)` to discard the upload. Or call `sequence_delete(sq_uuid)` to discard the entire sequence if you want to start over. Why UUIDs? UUIDs are employed throughout the protocol to prevent contention between concurrent uploads. For instance, if two users attempt to create a resource with the same name simultaneously, the system ensures only one successfully receives a UUID. This identifier acts as a "token of authority," ensuring that subsequent `do_put` or `finalize` operations are performed only by the user who created the resource. UUIDs are available to clients only during the `*_create` actions. Possessing a UUID is a prerequisite for performing any further operations on that resource, such as uploading data or finalizing the session. This design choice ensures that only the creator of a resource can modify it. ### Updating a Sequence¶ To extend a sequence with new data, you follow a nearly identical flow to the initial ingestion, but you omit the `sequence_create` step. This process leverages Mosaico's session model to layer new information onto the established sequence without modifying the original data.
Here is a simplified example of the update flow using Python-like pseudocode: Extending a sequence with new data ``` # Initialize a NEW Session layer for the existing sequence ss_uuid = session_create("my_sequence") # Create new topics (e.g., processed labels) within this session t3_uuid = topic_create(ss_uuid, "my_sequence/labels/object_detection", metadata) # (1)! do_put(t3_uuid, processed_data_stream) # Commit the new session to merge the layer into the sequence session_finalize(ss_uuid) ``` 1. Even though the sequence already exists, the `topic_create` call within a new session ensures this specific data stream is tracked as a new, immutable contribution. Concurrency Because each upload is encapsulated in its own session, multiple workers can extend the same sequence simultaneously. Mosaico handles the isolation of these sessions, ensuring that `session_finalize` only commits the specific topics and data associated with that worker's session UUID. ### Aborting an Upload¶ If you encounter an error *during the ingestion process* or simply want to discard all the data just uploaded, there is a straightforward way to abort the upload. Here is a simplified example of the abort flow using Python-like pseudocode: Aborting an upload ``` # Initialize the Sequence and the Session layer sq_uuid = sequence_create("my_sequence", metadata) ss_uuid = session_create("my_sequence") # Create topics within the session and stream data t1_uuid = topic_create(ss_uuid, "my_sequence/topic/1", metadata)
do_put(t1_uuid, data_stream) t2_uuid = topic_create(ss_uuid, "my_sequence/topic/2", metadata) do_put(t2_uuid, data_stream) # Delete the session to discard all uploaded topic data session_delete(ss_uuid) # You can delete the dangling sequence since there is no data associated with it, # or skip deletion and start a new session with `session_create` to upload new data sequence_delete(sq_uuid) ``` Permissions If **API key management** is enabled, the `sequence_delete` and `session_delete` actions require a key with at least `delete` privileges. ## Chunking & Indexing Strategy¶ The backend automatically manages *chunking* to efficiently handle intra-sequence queries and prevent memory overload from ingesting large data streams. As data streams in, the server buffers the incoming data until the configured chunk size is reached, then writes the buffer to disk as a self-contained storage unit called a *chunk*. Configuring Chunk Size The chunk size is configurable via the `MOSAICOD_MAX_CHUNK_SIZE_IN_BYTES` environment variable. Setting this value to `0` disables automatic chunking, allowing for unlimited chunk sizes. However, it is generally recommended to set a reasonable chunk size to balance memory usage and query performance. See the environment variables section for more details. For each chunk written to disk, the server calculates and stores *skip indices* in the metadata database. These indices include ontology-specific statistics, such as type-specific metadata (e.g., coordinate bounding boxes for GPS data or value ranges for sensors). This allows the query engine to perform content-based filtering without reading the bulk data itself. --- # Setup¶ ## Running with Containers¶ For rapid prototyping, we provide a standard Docker Compose configuration. This creates an isolated network environment containing the `mosaicod` server and its required PostgreSQL database.
Daemon compose file ``` name: "mosaico" services: database: image: postgres:18 container_name: postgres hostname: db environment: POSTGRES_USER: postgres POSTGRES_PASSWORD: password POSTGRES_DB: mosaico networks: - mosaico volumes: - pg-data:/var/lib/postgresql healthcheck: # (3)! test: ["CMD-SHELL", "pg_isready -U postgres"] interval: 5s timeout: 5s retries: 5 mosaicod: image: ghcr.io/mosaico-labs/mosaicod container_name: mosaicod networks: - mosaico environment: # (4)! MOSAICOD_DB_URL: postgresql://postgres:password@db:5432/mosaico volumes: - mosaico-data:/data command: | # (1)! run --host 0.0.0.0 --port 6726 --log-level info --local-store /data depends_on: database: condition: service_healthy ports: - "127.0.0.1:6726:6726" # (2)! volumes: pg-data: mosaico-data: networks: mosaico: ``` 1. Here you can list any additional command line options for `mosaicod`. In this example, we configure the server to use the local filesystem for storage, which is mounted to the `/data` directory in the container. This allows you to persist data across container restarts and easily access it from the host machine. If you prefer to use S3-compatible storage, simply remove the `--local-store` option and set the appropriate environment variables for your object storage configuration. Note that the server binds `0.0.0.0` *inside* the container so the published port mapping can reach it; external access is still limited by the `127.0.0.1` port binding below. 2. Remove `127.0.0.1` to expose this service to external networks. By default, this configuration restricts access to the local machine for security reasons. If you need to access the server from other machines on the network, you can modify the port mapping to allow external connections. 3. The `healthcheck` ensures that the `mosaicod` service only starts after the PostgreSQL database is ready to accept connections. This prevents startup errors related to database connectivity. 4. Additional environment variables can be set here to configure the daemon's behavior; see the environment variables section for a complete list of options.
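Assuming the compose file above is saved as `compose.yaml` in the working directory (the filename is an assumption; Docker Compose also accepts `docker-compose.yml`), the stack can be brought up and inspected with the standard Compose commands:

```shell
# Start the database and the daemon in the background
docker compose up -d

# Check that both containers are running (the database must report healthy)
docker compose ps

# Follow the daemon logs to confirm it is listening on port 6726
docker compose logs -f mosaicod
```

Running `docker compose down` stops the stack while preserving the named volumes, so the Postgres catalog and the local object store survive restarts.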
This configuration provisions both Postgres and mosaicod within a private Docker network. Only the daemon instance is exposed to the host. Security In this basic prototyping setup, TLS and API key management are disabled. The port mapping is restricted to `127.0.0.1`. If you need to access this from an external network, consider configuring `mosaicod` to enable TLS or use a reverse proxy to handle SSL termination. ## Building from Source¶ While Docker images are available for each release, you can compile `mosaicod` from source if you need a specific version not available as a pre-built image. Building from source requires a Rust toolchain. The project uses `sqlx` for compile-time query verification, which normally requires a live database connection. However, Mosaico supports a simpler offline build mode that uses cached query metadata from the `.sqlx` directory, removing the need for a database during compilation. ### Offline Build¶ You can run an offline build using cached `sqlx` queries with a single command: ``` SQLX_OFFLINE=true cargo build --release ``` ### Online Build¶ If you need to modify the database schema, a running PostgreSQL instance is required. This allows `sqlx` to verify queries against a live database during compilation. You can use the provided Docker Compose file in `docker/devel`, which sets up an instance of MinIO and a PostgreSQL database. First, start the development environment: ``` cd docker/devel # Start the services in the background docker compose up -d # To stop and remove the volumes (which clears all data), run: docker compose down -v ``` Apply database migrations to the running PostgreSQL instance. This ensures that the database schema is up-to-date and allows `sqlx` to verify queries during compilation. Next, from the root of the `mosaicod` workspace, install the necessary tools, configure the environment, and run the build.
``` cd mosaicod # Install the SQLx command-line tool cargo install sqlx-cli # Copy the development environment variables for the database connection cp env.devel .env # Apply the database migrations cd crates/mosaicod-db cargo sqlx migrate run # And finally you can build mosaicod cargo build --release --bin mosaicod ``` ## Configuration¶ The server supports S3-compatible object storage by default but can be configured for local storage via command line options. ### Database¶ Mosaico requires a connection to a running **PostgreSQL** instance, which is defined via the `MOSAICOD_DB_URL` environment variable. ### Remote Storage Configuration¶ For production deployments, `mosaicod` should be configured to use an S3-compatible object store (such as AWS S3, Google Cloud Storage, Hetzner Object Store, etc.) for durable, long-term storage. This is configured by setting the proper environment variables for your object store provider. ### Local Storage Configuration¶ This command will start a `mosaicod` instance using the local filesystem as its storage layer: ``` mosaicod run --local-store /tmp/mosaicod ``` --- # Queries¶ Mosaico distinguishes itself from simple file stores with a powerful **Query System** capable of filtering data based on both high-level metadata and content values. The query engine operates through the `query` action, accepting structured JSON-based filter expressions that can span the entire data hierarchy. ## Query Architecture¶ The query engine is designed around a three-tier filtering model that allows you to construct complex, multi-dimensional searches: **Sequence Filtering.** Target recordings by structural attributes like sequence name, creation timestamp, or user-defined metadata tags. This level allows you to narrow down which recording sessions are relevant to your search. **Topic Filtering.** Refine your search to specific data streams within sequences.
You can filter by topic name, ontology tag (the data type), serialization format, or topic-level user metadata. **Ontology Filtering.** Query the actual physical values recorded inside the sensor data without scanning terabytes of files. The engine leverages statistical indices computed during ingestion (min/max bounds stored in the metadata cache for each chunk) to rapidly include or exclude entire segments of data. ## Filter Domains¶ ### Sequence Filter¶ The sequence filter allows you to target specific recording sessions based on their metadata: | Field | Description | | --- | --- | | `sequence.name` | The sequence identifier (supports text operations) | | `sequence.created_at` | The creation timestamp in nanoseconds (supports timestamp operations) | | `sequence.user_metadata.` | Custom user-defined metadata attached to the sequence | ### Topic Filter¶ The topic filter narrows the search to specific data streams within matching sequences: | Field | Description | | --- | --- | | `topic.name` | The topic path within the sequence (supports text operations) | | `topic.created_at` | The topic creation timestamp in nanoseconds (supports timestamp operations) | | `topic.ontology_tag` | The data type identifier (e.g., `Lidar`, `Camera`, `IMU`) | | `topic.serialization_format` | The binary layout format (`Default`, `Ragged`, or `Image`) | | `topic.user_metadata.` | Custom user-defined metadata attached to the topic | ### Ontology Filter¶ The ontology filter queries the actual sensor data values. Fields are specified using dot notation, joining the ontology tag with the field path. For example, to query IMU acceleration data: `imu.acceleration.x`, where `imu` is the ontology tag and `acceleration.x` is the field path within that data model. #### Timestamp query support¶ If `include_timestamp_range` is set to `true`, the response will also return timestamp ranges for each matching topic. ## Supported Operators¶ The query engine supports a rich set of comparison operators.
Each operator is prefixed with `$` in the JSON syntax: | Operator | Description | | --- | --- | | `$eq` | Equal to (supports all types) | | `$neq` | Not equal to (supports all types) | | `$lt` | Less than (numeric and timestamp only) | | `$gt` | Greater than (numeric and timestamp only) | | `$leq` | Less than or equal to (numeric and timestamp only) | | `$geq` | Greater than or equal to (numeric and timestamp only) | | `$between` | Within a range `[min, max]` inclusive (numeric and timestamp only) | | `$in` | Value is in a set of options (supports integers and text) | | `$match` | Matches a pattern (text only, supports SQL LIKE patterns with `%` wildcards) | | `$ex` | Field exists | | `$nex` | Field does not exist | ## Query Syntax¶ Queries are submitted as JSON objects. Each field is mapped to an operator and value. Multiple conditions are combined with implicit AND logic. ``` { "sequence": { "name": { "$match": "test_run_%" }, "user_metadata": { "driver": { "$eq": "Alice" } } }, "topic": { "ontology_tag": { "$eq": "imu" } }, "ontology": { "imu.acceleration.x": { "$gt": 5.0 }, "imu.acceleration.y": { "$between": [-2.0, 2.0] }, "include_timestamp_range": true // (1)! } } ``` 1. This field is optional; if set to `true`, the query returns the timestamp ranges. This query searches for: * Sequences with names matching the `test_run_%` pattern * Where the user metadata field `driver` equals `"Alice"` * Containing topics with ontology tag `imu` * Where the IMU's x-axis acceleration exceeds 5.0 * And the y-axis acceleration is between -2.0 and 2.0 ## Response Structure¶ The query response is hierarchically grouped by sequence. For each matching sequence, it provides the list of topics that satisfied the filter criteria, along with optional timestamp ranges indicating when the ontology conditions were met.
Query response example ``` { "items": [ { "sequence": "test_run_01", "topics": [ { "locator": "test_run_01/sensors/imu", "timestamp_range": [1000000000, 2000000000] }, { "locator": "test_run_01/sensors/gps", "timestamp_range": [1000000000, 2000000000] } ] }, { "sequence": "test_run_02", "topics": [ { "locator": "test_run_02/camera/front", "timestamp_range": [1500000000, 2500000000] }, { "locator": "test_run_02/lidar/point_cloud", "timestamp_range": [1500000000, 2500000000] } ] } ] } ``` ### Timestamps¶ The `timestamp_range` field reports the time window `[min, max]` in which the filter conditions were met for that topic, with `min` being the timestamp of the first matching event and `max` the timestamp of the last matching event. This allows you to retrieve only the relevant data slices using the retrieval protocol. Note The `timestamp_range` field is included only when ontology filters are applied and `include_timestamp_range` is set to `true` inside the `ontology` filter. ## Performance Characteristics¶ The query engine is optimized for high performance by minimizing unnecessary data retrieval and I/O operations. During execution, the engine uses index-based pruning to evaluate precomputed min/max statistics and skip indices, allowing it to bypass irrelevant data chunks without reading the underlying files. Performance is further improved by executing metadata cache queries, such as sequence and topic filters, directly within the database, which ensures sub-second response times even across thousands of sequences. The system employs **lazy evaluation** to keep network payloads lightweight; instead of returning raw data immediately, queries return locators and timestamp ranges. This architecture allows client applications to fetch only the required data slices via the retrieval protocol as needed.
--- # Retrieval¶ Unlike simple file downloads, this protocol provides an interface for requesting precise data slices, dynamically assembled and streamed back as an optimized stream of Arrow record batches. ## The Retrieval Protocol¶ Accessing data requires specifying the Locator, which defines the topic path, and an optional time range in nanoseconds. Upon receiving a request, the server performs an index lookup in the metadata cache to identify physical data chunks intersecting the requested time window. This is followed by pruning, discarding chunks outside the query bounds to avoid redundant I/O. Once relevant segments are identified, the server streams the data by opening the underlying files and delivering it in a high-throughput pipeline. In the protocol, the `get_flight_info` call returns a list of resources, each containing an endpoint (the name of the topic or sequence, such as `my_sequence` or `my_sequence/my/topic`) and a ticket, an opaque binary blob used by the server in the `do_get` call to extract and stream the data. Calling `get_flight_info` on a sequence returns all topics associated with that sequence, whereas calling it on a specific topic returns only the endpoint and ticket for that topic.
Retrieval protocol

```
# Define the locator and the temporal slice
locator = "my_sequence/topic/1"
time_range = (start_ns, end_ns)  # Optional: defaults to full history

# Resolve the locator into endpoints and tickets
flight_info = get_flight_info(locator, time_range)

for endpoint in flight_info.endpoints:
    # Use the ticket to stream the actual data batches
    data_stream = do_get(endpoint.ticket)
    for batch in data_stream:
        process(batch)
```

## Sequence List¶

To list all sequences available in the system, call `list_flights` with the root locator:

List sequences

```
# List all sequences in the system
flight_info = list_flights("")  # or list_flights("/")
```

This returns the resource locators of all sequences available in the platform, which can then be used to retrieve specific topics or data slices.

## Metadata Context Headers¶

To provide full context, the data stream is prefixed with a schema message containing embedded custom metadata. Mosaico injects context into this header so the client can reconstruct the environment. It includes `user_metadata`, which preserves original project context such as experimental tags or vehicle IDs, and the `ontology_tag`, which tells the client the sensor data type (e.g., `lidar`, `camera`) for type-safe deserialization. The `serialization_format` field indicates how the underlying serialization protocol should be interpreted. The currently supported formats are:

* `Default`: The standard format.
* `Ragged`: Optimized for variable-length lists.
* `Image`: An optimized array format for high-resolution visual data.

---

# TLS Support¶

Securing your Mosaico instance is straightforward, as TLS (Transport Layer Security) is fully supported out of the box. Enabling TLS ensures that all communication with the daemon is encrypted. To activate it, simply append the `--tls` flag to your `mosaicod run` command. When the `--tls` flag is used, `mosaicod` requires a valid certificate and private key.
It looks for these credentials via the following environment variables:

* `MOSAICOD_TLS_CERT_FILE`: the path to the PEM-encoded X.509 certificate.
* `MOSAICOD_TLS_PRIVATE_KEY_FILE`: the path to the file containing the PEM-encoded RSA private key.

Use a reverse proxy for TLS termination

If you prefer to manage TLS termination separately, you can run `mosaicod` without the `--tls` flag and use a reverse proxy (such as Nginx or Caddy) to handle TLS termination. This allows you to centralize TLS management and offload encryption tasks from the daemon.

## Generate a Self-Signed Certificate¶

Run the following commands to generate `cert.pem`, `key.pem`, and `ca.pem` files:

```
# Generate the root CA
openssl genrsa -out ca.key 4096
openssl req -x509 -new -nodes -key ca.key -sha256 -days 365 \
  -subj "/CN=MyTestCA" -out ca.pem

# Generate the server private key
openssl genrsa -out key.pem 4096

# Create a certificate signing request (CSR) for the server
openssl req -new -key key.pem -out server.csr \
  -subj "/CN=localhost" \
  -addext "subjectAltName=DNS:localhost,IP:127.0.0.1"

# Sign the server CSR with the root CA to create the end-entity certificate
openssl x509 -req -in server.csr -CA ca.pem -CAkey ca.key -CAcreateserial \
  -out cert.pem -days 365 -sha256 \
  -extfile <(printf "basicConstraints=CA:FALSE\nkeyUsage=digitalSignature,keyEncipherment\nextendedKeyUsage=serverAuth\nsubjectAltName=DNS:localhost,IP:127.0.0.1")
```

The commands generate the following files:

| File | Description |
| --- | --- |
| `ca.key` | The private key for the Certificate Authority; used only to sign other certificates and **must be kept secret**. |
| `ca.pem` | The public certificate of the Authority; provided to clients so they can verify the server's identity. |
| `ca.srl` | A text file containing the next serial number to be assigned by the CA; safe to ignore or delete. |
| `key.pem` | The private key for the mosaicod server; used to decrypt traffic and prove ownership of the server certificate. |
| `cert.pem` | The end-entity certificate for the server. |
| `server.csr` | A temporary Certificate Signing Request file used to bridge the public key and the identity during generation. |

Use `cert.pem` and `key.pem` as the server-side TLS identity for the mosaicod daemon, and distribute `ca.pem` to clients as the trusted root certificate.

Test certificate

The certificates produced by the commands above are strictly for local development or testing. Do not use them in production.

---

# Release Cycle¶

This document briefly describes how the release process is handled for the Mosaico project.

## Monorepo¶

Mosaico utilizes a monorepo structure to simplify integration and testing between the `mosaicod` daemon and the Python SDK. While these components reside in the same repository, they are decoupled: each component maintains its own release schedule, and both follow semantic versioning.

## Development workflow¶

The development workflow relies on a specific set of branches and tags to manage stability and feature development.

* `main`: the primary integration branch. All stable features and fixes eventually land here. Official release tags are cut directly from `main` once sufficient changes have accumulated.
* `issue/`: feature or bug-fix branches linked to a specific GitHub issue. Branched from `main` and merged back via pull request upon completion.
* `release/[mosaicod|mosaico-py]/vX.Y.`: maintenance branches for *critical hotfixes*. Created from an existing version tag *when a patch is required* for an older release. The final commit is tagged with the incremented version, and relevant fixes should be cherry-picked or merged back into `main` if applicable. These branches might be used in the future to support pre-release stages.

## Tags¶

We use specific tag prefixes to trigger CI/CD pipelines and to distinguish between *stable releases* of the daemon and the SDK:

| Component | Tag |
| --- | --- |
| Daemon | `mosaicod/vX.Y.Z` |
| Python SDK | `mosaico-py/vX.Y.Z` |

---
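Since both components follow semantic versioning under these tag prefixes, deriving the next hotfix tag for a maintenance branch is mechanical. A small illustrative helper (hypothetical, not part of any Mosaico tooling):

```python
def next_patch_tag(tag: str) -> str:
    """Given a release tag like 'mosaicod/v1.4.2', return the next patch tag."""
    # Split the component prefix from the semantic version.
    prefix, version = tag.rsplit("/v", 1)
    major, minor, patch = (int(part) for part in version.split("."))
    # A hotfix only increments the patch number.
    return f"{prefix}/v{major}.{minor}.{patch + 1}"

print(next_patch_tag("mosaicod/v1.4.2"))    # mosaicod/v1.4.3
print(next_patch_tag("mosaico-py/v0.9.0"))  # mosaico-py/v0.9.1
```

Pushing the resulting tag (with its `mosaicod/` or `mosaico-py/` prefix) is what selects which component's CI/CD pipeline runs.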