ROS Ingestion
This example downloads the NVIDIA R2B Dataset 2024 and ingests its .mcap ROS 2 bags into a structured, queryable Mosaico archive. Asset download, type translation, batching, and catalog verification are all handled automatically.
This example is available for the Python SDK. The C++ and Rust SDKs are currently in development.
mosaicolabs.examples ros_injection
Run the command with --help to see all available options.
The daemon must be running before you start. See the setup guide if you have not started it yet.
The full source is on GitHub.
Custom Ontology Definition
The NVIDIA Isaac Nova encoder produces isaac_ros_nova_interfaces/msg/EncoderTicks, a message type with no equivalent in the built-in ontology. Before the injector can run, we define a custom model by inheriting from Serializable. This registers the type automatically in the Mosaico ecosystem at import time, making it available as an ontology_type for topic creation and enabling the .Q query proxy on its fields.
from mosaicolabs import MosaicoField, MosaicoType, Serializable


class EncoderTicks(Serializable):
    left_ticks: MosaicoType.uint32 = MosaicoField(
        description="Cumulative counts from the left wheel encoder."
    )
    right_ticks: MosaicoType.uint32 = MosaicoField(
        description="Cumulative counts from the right wheel encoder."
    )
    encoder_timestamp: MosaicoType.uint64 = MosaicoField(
        description="Timestamp of the encoder ticks."
    )
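To make "registers the type automatically at import time" concrete, here is a minimal, library-free sketch of the general Python technique: a base class that records each subclass as it is defined. It does not reproduce how Serializable is actually implemented; `SketchSerializable`, `ONTOLOGY_REGISTRY`, and `EncoderTicksSketch` are hypothetical names used only for illustration.

```python
from dataclasses import dataclass

# Hypothetical registry; the real Serializable keeps its own bookkeeping.
ONTOLOGY_REGISTRY: dict[str, type] = {}


class SketchSerializable:
    """Stand-in base class that records every subclass as it is defined."""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Runs once per subclass, when the class statement executes,
        # which for module-level classes means at import time.
        ONTOLOGY_REGISTRY[cls.__name__] = cls


@dataclass
class EncoderTicksSketch(SketchSerializable):
    left_ticks: int
    right_ticks: int
    encoder_timestamp: int


# No explicit register() call was needed: defining the class was enough.
registered = ONTOLOGY_REGISTRY["EncoderTicksSketch"]
sample = registered(left_ticks=10, right_ticks=12, encoder_timestamp=1_000)
```

The practical consequence is the same as in the example above: the module that defines the custom type has to be imported before anything tries to look the type up by name.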
ROS Adapter
Because EncoderTicks is a custom type, no default adapter exists. We subclass ROSAdapterBase and implement from_dict to map ROS dictionary fields onto the model. The @register_default_adapter decorator ensures the RosbagInjector selects this adapter automatically whenever it encounters the matching schema name in the bag.
# Message is referenced in the translate() annotation below; importing it from
# the top-level package is an assumption, so adjust the path if it lives elsewhere.
from mosaicolabs import Message
from mosaicolabs.ros_bridge import ROSMessage, ROSAdapterBase, register_default_adapter
from mosaicolabs.ros_bridge.adapters.helpers import _validate_msgdata

from .isaac import EncoderTicks


@register_default_adapter
class EncoderTicksAdapter(ROSAdapterBase[EncoderTicks]):
    # ROS schema name(s) this adapter handles.
    ros_msgtype = ("isaac_ros_nova_interfaces/msg/EncoderTicks",)
    __mosaico_ontology_type__ = EncoderTicks
    # Keys that must be present in the incoming ROS dictionary.
    _REQUIRED_KEYS = ("left_ticks", "right_ticks", "encoder_timestamp")

    @classmethod
    def from_dict(cls, ros_data: dict) -> EncoderTicks:
        _validate_msgdata(cls, ros_data)
        return EncoderTicks(
            left_ticks=ros_data["left_ticks"],
            right_ticks=ros_data["right_ticks"],
            encoder_timestamp=ros_data["encoder_timestamp"],
        )

    @classmethod
    def translate(cls, ros_msg: ROSMessage, **kwargs) -> Message:
        # Defer to the base-class translation.
        return super().translate(ros_msg, **kwargs)
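The adapter selection described above can be pictured as a lookup table keyed by schema name. The following is a simplified, self-contained sketch of that pattern, not the mosaicolabs implementation: `ADAPTERS`, `register_adapter_sketch`, `EncoderTicksAdapterSketch`, and `translate_sketch` are hypothetical names, and the adapter returns a plain dict instead of a real ontology object.

```python
# Hypothetical registry mapping a ROS schema name to an adapter class.
ADAPTERS: dict[str, type] = {}


def register_adapter_sketch(adapter_cls: type) -> type:
    """Class decorator: index the adapter under each schema name it declares."""
    for schema_name in adapter_cls.ros_msgtype:
        ADAPTERS[schema_name] = adapter_cls
    return adapter_cls


@register_adapter_sketch
class EncoderTicksAdapterSketch:
    ros_msgtype = ("isaac_ros_nova_interfaces/msg/EncoderTicks",)
    required_keys = ("left_ticks", "right_ticks", "encoder_timestamp")

    @classmethod
    def from_dict(cls, ros_data: dict) -> dict:
        # Fail early when the bag message lacks an expected field.
        missing = [key for key in cls.required_keys if key not in ros_data]
        if missing:
            raise ValueError(f"missing keys: {missing}")
        return {key: ros_data[key] for key in cls.required_keys}


def translate_sketch(schema_name: str, ros_data: dict) -> dict:
    """Find the adapter registered for a schema name and apply it."""
    adapter = ADAPTERS.get(schema_name)
    if adapter is None:
        raise LookupError(f"no adapter registered for {schema_name}")
    return adapter.from_dict(ros_data)


ticks = translate_sketch(
    "isaac_ros_nova_interfaces/msg/EncoderTicks",
    {"left_ticks": 5, "right_ticks": 7, "encoder_timestamp": 42},
)
```

Keying the table on the schema name is what lets a single injector handle bags that mix built-in and custom message types without any per-topic configuration.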
Ingestion Pipeline
The example loops over each bag file in the dataset and runs three phases: download, ingest, verify.
Download. download_asset fetches the raw .mcap from NVIDIA with a progress bar and caches it locally so re-runs skip the download.
Ingest. RosbagInjector handles the full write pipeline: it opens the MCAP, routes each message through its registered adapter, batches the resulting Message objects, and transmits them to the daemon over gRPC. The sequence name is derived from the filename so each bag becomes a separate, independently queryable recording session.
Verify. A list_sequences() call after ingestion confirms the sequence is visible in the catalog before the script exits.
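# MOSAICO_HOST, MOSAICO_PORT, ASSET_DIR, BASE_BAGFILE_URL, BAG_FILES_PATH,
# BAGFILE_URL, and downloaded_time are not shown here; see the full source.
for bag_path in BAG_FILES_PATH:
    # Download: fetch the bag, or reuse the locally cached copy.
    out_bag_file = download_asset(BASE_BAGFILE_URL + bag_path, ASSET_DIR)

    # Ingest: one sequence per bag, named after the file stem.
    config = ROSInjectionConfig(
        host=MOSAICO_HOST,
        port=MOSAICO_PORT,
        file_path=out_bag_file,
        sequence_name=out_bag_file.stem,
        metadata={
            "source_url": BAGFILE_URL,
            "ingested_via": "mosaico_example_ros_injection",
            "download_time_utc": str(downloaded_time),
        },
    )
    RosbagInjector(config).run()

    # Verify: confirm the new sequence is visible in the catalog.
    with MosaicoClient.connect(host=MOSAICO_HOST, port=MOSAICO_PORT) as client:
        if config.sequence_name in client.list_sequences():
            print(f"'{config.sequence_name}' is now queryable.")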
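The batching step inside the injector is not shown in this example. As a rough illustration of the idea only, the sketch below groups a stream of items into fixed-size batches before they would be sent; `batched_sketch` and the batch size of 3 are hypothetical, and the real injector may batch by size, time, or topic in ways this does not capture.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def batched_sketch(items: Iterable[T], batch_size: int) -> Iterator[list[T]]:
    """Yield consecutive lists of up to batch_size items from a stream."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        # islice pulls at most batch_size items without loading the whole stream.
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Stand-in for translated messages; a real pipeline would send each batch
# to the daemon instead of collecting it in a list.
sent_batches = [batch for batch in batched_sketch(range(7), batch_size=3)]
```

Because the generator consumes its input lazily, memory use stays bounded by the batch size rather than by the size of the bag.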
Once complete, the ingested sequences are available for the Data Inspection, Querying Catalogs, and MuJoCo Visualisation examples.