ROS Ingestion

This example downloads the NVIDIA R2B Dataset 2024 and ingests its ROS 2 bags (.mcap files) into a structured, queryable Mosaico archive. Asset download, type translation, batching, and catalog verification are all handled automatically.

Run
mosaicolabs.examples ros_injection

Run the command with --help to see all available options.

The daemon must be running before you start. See the setup guide if you have not started it yet.

The full source is on GitHub.

Custom Ontology Definition

The NVIDIA Isaac Nova encoder produces isaac_ros_nova_interfaces/msg/EncoderTicks, a message type with no equivalent in the built-in ontology. Before the injector can run, we define a custom model by inheriting from Serializable. This registers the type automatically in the Mosaico ecosystem at import time, making it available as an ontology_type for topic creation and enabling the .Q query proxy on its fields.

Custom EncoderTicks model
from mosaicolabs import MosaicoField, MosaicoType, Serializable

class EncoderTicks(Serializable):
    left_ticks: MosaicoType.uint32 = MosaicoField(
        description="Cumulative counts from the left wheel encoder."
    )
    right_ticks: MosaicoType.uint32 = MosaicoField(
        description="Cumulative counts from the right wheel encoder."
    )
    encoder_timestamp: MosaicoType.uint64 = MosaicoField(
        description="Timestamp of the encoder ticks."
    )
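To make "registers the type automatically at import time" concrete, here is a minimal pure-Python sketch of one common way such registration is done, using the `__init_subclass__` hook. It is an illustration only: `SerializableSketch`, `EncoderTicksSketch`, and the `registry` attribute are hypothetical names, and nothing here is claimed to match how Serializable is actually implemented.

```python
from typing import ClassVar


class SerializableSketch:
    """Hypothetical stand-in for Serializable; not the Mosaico implementation."""

    # Maps a type name to the class registered under it.
    registry: ClassVar[dict] = {}

    def __init_subclass__(cls, **kwargs):
        # Python calls this hook as soon as a subclass body has been executed,
        # which for module-level classes means when the module is imported.
        super().__init_subclass__(**kwargs)
        SerializableSketch.registry[cls.__name__] = cls


class EncoderTicksSketch(SerializableSketch):
    left_ticks: int
    right_ticks: int
    encoder_timestamp: int


# No explicit registration call was needed: defining the class was enough.
registered = SerializableSketch.registry["EncoderTicksSketch"]
```

The practical consequence is the same as in the text above: the module that defines the model has to be imported before anything looks the type up.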

ROS Adapter

Because EncoderTicks is a custom type, no default adapter exists. We subclass ROSAdapterBase and implement from_dict to map ROS dictionary fields onto the model. The @register_default_adapter decorator ensures the RosbagInjector selects this adapter automatically whenever it encounters the matching schema name in the bag.

EncoderTicks ROS adapter
# Assumed import path: the original snippet uses Message without importing it.
from mosaicolabs import Message
from mosaicolabs.ros_bridge import ROSMessage, ROSAdapterBase, register_default_adapter
from mosaicolabs.ros_bridge.adapters.helpers import _validate_msgdata
from .isaac import EncoderTicks

@register_default_adapter
class EncoderTicksAdapter(ROSAdapterBase[EncoderTicks]):
    # ROS schema name(s) this adapter handles.
    ros_msgtype = ("isaac_ros_nova_interfaces/msg/EncoderTicks",)
    __mosaico_ontology_type__ = EncoderTicks
    # Keys that must be present in the decoded ROS message.
    _REQUIRED_KEYS = ("left_ticks", "right_ticks", "encoder_timestamp")

    @classmethod
    def from_dict(cls, ros_data: dict) -> EncoderTicks:
        # Check the required keys before building the model.
        _validate_msgdata(cls, ros_data)
        return EncoderTicks(
            left_ticks=ros_data["left_ticks"],
            right_ticks=ros_data["right_ticks"],
            encoder_timestamp=ros_data["encoder_timestamp"],
        )

    @classmethod
    def translate(cls, ros_msg: ROSMessage, **kwargs) -> Message:
        # Delegate to the base class translation.
        return super().translate(ros_msg, **kwargs)
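The selection mechanism described above (a decorator that lets the injector find an adapter by schema name) can be sketched in plain Python as follows. This is a hedged illustration of the general pattern, not the code behind `@register_default_adapter`: `_ADAPTERS`, `register_adapter_sketch`, `adapter_for`, and `EncoderTicksAdapterSketch` are hypothetical names, and the sketch returns a plain dict instead of an EncoderTicks model.

```python
# Hypothetical registry: ROS schema name -> adapter class.
_ADAPTERS = {}


def register_adapter_sketch(cls):
    """Hypothetical decorator: index the adapter under each schema name it declares."""
    for schema_name in cls.ros_msgtype:
        _ADAPTERS[schema_name] = cls
    return cls


@register_adapter_sketch
class EncoderTicksAdapterSketch:
    ros_msgtype = ("isaac_ros_nova_interfaces/msg/EncoderTicks",)
    _REQUIRED_KEYS = ("left_ticks", "right_ticks", "encoder_timestamp")

    @classmethod
    def from_dict(cls, ros_data: dict) -> dict:
        # Fail early with a clear message if the decoded message lacks a field.
        missing = [key for key in cls._REQUIRED_KEYS if key not in ros_data]
        if missing:
            raise ValueError(f"missing keys: {missing}")
        # Keep only the fields the model declares; extra keys are ignored.
        return {key: ros_data[key] for key in cls._REQUIRED_KEYS}


def adapter_for(schema_name: str):
    """Return the adapter registered for a schema name, or None if there is none."""
    return _ADAPTERS.get(schema_name)
```

With this pattern, a caller that reads a schema name from a bag only needs `adapter_for(schema_name)` to decide whether and how a message can be translated.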

Ingestion Pipeline

The example loops over each bag file in the dataset and runs three phases: download, ingest, verify.

Download. download_asset fetches the raw .mcap from NVIDIA with a progress bar and caches it locally so re-runs skip the download.

Ingest. RosbagInjector handles the full write pipeline: it opens the MCAP, routes each message through its registered adapter, batches the resulting Message objects, and transmits them to the daemon over gRPC. The sequence name is derived from the filename so each bag becomes a separate, independently queryable recording session.

Verify. A list_sequences() call after ingestion confirms the sequence is visible in the catalog before the script exits.
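The batching step of the Ingest phase can be pictured with a small generic helper. This is a sketch of the idea only, assuming fixed-size batches; `batched` and `batch_size` are hypothetical names, and the actual batching policy of RosbagInjector is not described on this page.

```python
from itertools import islice
from typing import Iterable, Iterator


def batched(items: Iterable, batch_size: int) -> Iterator[list]:
    """Yield consecutive lists of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    # islice pulls up to batch_size items per pass; an empty list ends the loop.
    while batch := list(islice(iterator, batch_size)):
        yield batch
```

Because the helper consumes its input lazily, a large bag never has to be held in memory at once: each batch can be transmitted and dropped before the next one is read.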

Download, ingest, and verify
for bag_path in BAG_FILES_PATH:
    out_bag_file = download_asset(BASE_BAGFILE_URL + bag_path, ASSET_DIR)

    config = ROSInjectionConfig(
        host=MOSAICO_HOST,
        port=MOSAICO_PORT,
        file_path=out_bag_file,
        sequence_name=out_bag_file.stem,
        metadata={
            "source_url": BAGFILE_URL,
            "ingested_via": "mosaico_example_ros_injection",
            "download_time_utc": str(downloaded_time),
        },
    )
    RosbagInjector(config).run()

    with MosaicoClient.connect(host=MOSAICO_HOST, port=MOSAICO_PORT) as client:
        if config.sequence_name in client.list_sequences():
            print(f"'{config.sequence_name}' is now queryable.")
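The caching behaviour of the download phase, and the way a sequence name falls out of the filename, can be sketched with the standard library alone. This is not the source of download_asset: `download_cached`, its `fetch` parameter, and the `.part` temporary suffix are assumptions made for illustration, and the progress bar is omitted.

```python
from pathlib import Path
from urllib.request import urlretrieve


def _fetch_with_urllib(url: str, target: Path) -> None:
    # Default fetcher: a plain HTTP download without progress reporting.
    urlretrieve(url, str(target))


def download_cached(url: str, asset_dir, fetch=_fetch_with_urllib) -> Path:
    """Download url into asset_dir unless a file with that name already exists."""
    asset_dir = Path(asset_dir)
    asset_dir.mkdir(parents=True, exist_ok=True)
    target = asset_dir / url.rsplit("/", 1)[-1]
    if not target.exists():
        # Write to a temporary name first so that an interrupted transfer
        # is never mistaken for a complete cached file on the next run.
        partial = target.with_name(target.name + ".part")
        fetch(url, partial)
        partial.replace(target)
    return target
```

The returned path is a `pathlib.Path`, so its `.stem` (the filename without the `.mcap` extension) is directly usable as a sequence name, which mirrors `sequence_name=out_bag_file.stem` in the listing above.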

Once complete, the ingested sequences are available for the Data Inspection, Querying Catalogs, and MuJoCo Visualisation examples.