ROS Ingestion
This tutorial demonstrates a complete Mosaico data ingestion using the NVIDIA R2B Dataset 2024. You will learn how to automate the transition from monolithic ROS bags (.mcap) to a structured, queryable archive.
By following this guide, you will:
- Automate Asset Preparation: Programmatically download and manage remote datasets.
- Implement Adaptation Logic: Translate raw ROS types into the strongly-typed Mosaico Ontology.
- Execute Ingestion: Use the
RosbagInjectorto handle batching and network transmission. - Verify Integrity: Programmatically inspect the server to ensure the data is cataloged.
Experiment Yourself
This guide is fully executable.
- Start the Mosaico Infrastructure
- Run the example
Full Code
The full code of the example is available here.
In Depth Explanation
Step 1: Custom Ontology Definition (isaac.py)¶
In Mosaico, data is strongly typed. When dealing with specialized hardware like the NVIDIA Isaac Nova encoders, with custom data models, not available in the SDK, we must define a model that the platform understands.
The Data Model¶
The EncoderTicks class defines the physical storage format.
import pyarrow as pa
from mosaicolabs import Serializable
class EncoderTicks(Serializable): # (1)!
# --- Wire Schema Definition ---
__msco_pyarrow_struct__ = pa.struct([
pa.field("left_ticks", pa.uint32(), nullable=False),
pa.field("right_ticks", pa.uint32(), nullable=False),
pa.field("encoder_timestamp", pa.uint64(), nullable=False),
])
# --- Pydantic Fields ---
left_ticks: int # (2)!
right_ticks: int
encoder_timestamp: int
- Inheriting from
Serializableautomatically registers your model in the Mosaico ecosystem, making it dispatchable to the data platform, and enables the.Qquery proxy. - The field names in the
pa.structmust match exactly the names of the Python attributes.
In Depth Explanation
Step 2: Implementing the ROS Adapter (isaac_adapters.py)¶
The translation between the raw ROS dictionary and the Mosaico ontology data model is made via adapters. Being a custom message type, "isaac_ros_nova_interfaces/msg/EncoderTicks" does not have a default adapter. Therefore we must define one:
the ROSAdapterBase class provides the necessary infrastructure for this. We just need to implement the from_dict method, which is responsible for converting the raw ROS message dictionary into our custom ontology model.
The Adapter Implementation¶
from mosaicolabs.ros_bridge import ROSMessage, ROSAdapterBase, register_default_adapter
from mosaicolabs.ros_bridge.adapters.helpers import _validate_msgdata
from .isaac import EncoderTicks
@register_default_adapter
class EncoderTicksAdapter(ROSAdapterBase[EncoderTicks]):
ros_msgtype = ("isaac_ros_nova_interfaces/msg/EncoderTicks",) # (1)!
__mosaico_ontology_type__ = EncoderTicks # (2)!
_REQUIRED_KEYS = ("left_ticks", "right_ticks", "encoder_timestamp") # (3)!
@classmethod
def from_dict(cls, ros_data: dict) -> EncoderTicks: # (4)!
"""
Convert a ROS message dictionary to an EncoderTicks object.
"""
_validate_msgdata(cls, ros_data)
return EncoderTicks(
left_ticks=ros_data["left_ticks"],
right_ticks=ros_data["right_ticks"],
encoder_timestamp=ros_data["encoder_timestamp"],
)
@classmethod
def translate(cls, ros_msg: ROSMessage, **kwargs: Any) -> Message: # (5)!
"""
Translates a ROS EncoderTicks message into a Mosaico Message container.
"""
return super().translate(ros_msg, **kwargs)
- A tuple of strings representing the ROS message types that this adapter can handle.
- The Mosaico ontology type that this adapter can handle.
- A tuple of strings representing the required keys in the ROS message. This is used by the
_validate_msgdatamethod to check that the ROS message does contains the required fields. - This is the heart of the translator. It takes a Python dictionary and maps the keys to our
EncoderTicksontology model. - This method is called by the
RosbagInjectorclass for each message in the bag. It is responsible for converting the raw ROS message dictionary into the Mosaico message, wrapping the custom ontology model.
Key Operation: The @register_default_adapter decorator ensures the RosbagInjector automatically knows how to handle this message type during the ingestion loop.
Step 3: Orchestrating the Ingestion Loop¶
In a real-world scenario, you often need to ingest a batch of files. The main.py implementation uses a 3-phase loop to manage this workflow.
Phase 1: Asset Preparation¶
Before we can ingest data, we need the raw file. This phase downloads a verified dataset from NVIDIA.
for bag_path in BAG_FILES_PATH:
bag_file_url = BASE_BAGFILE_URL + bag_path
# This utility handles downloads with visual progress bars
out_bag_file = download_asset(bag_file_url, ASSET_DIR)
Phase 2: High-Performance Ingestion¶
Configure and run the injector. This layer handles connection pooling, asynchronous serialization, and batching.
config = ROSInjectionConfig(
host=MOSAICO_HOST,
port=MOSAICO_PORT,
file_path=out_bag_file,
sequence_name=out_bag_file.stem, # Sequence name derived from filename
# Some example metadata for the sequence
metadata={
"source_url": BAGFILE_URL,
"ingested_via": "mosaico_example_ros_injection",
"download_time_utc": str(downloaded_time),
},
log_level="INFO",
)
# Handles connection, loading, adaptation, and batching
injector = RosbagInjector(config)
injector.run() # Starts the ingestion process
Phase 3: Verification & Retrieval¶
Programmatically confirm the sequence is available on the server using a context manager.
with MosaicoClient.connect(host=MOSAICO_HOST, port=MOSAICO_PORT) as client: # (1)!
seq_list = client.list_sequences() # (2)!
if config.sequence_name in seq_list:
print(f"Success: '{config.sequence_name}' is now queryable.")
- Establishes a secure connection to the platform.
- Asks for the lists of sequences on the server