MuJoCo Visualisation
Queries robot joint state data from a stored Mosaico sequence and replays the trajectory in a live MuJoCo simulation window, synchronized to the nanosecond timestamps stored in the archive.
- Python
- C++
- Rust
The C++ SDK is currently in development.
The Rust SDK is currently in development.
mosaicolabs.examples mujoco_vis
Run the command with --help to see all available options.
The daemon must be running with data from the ROS Ingestion example. The full source is on GitHub.
MuJoCo is not included in the default SDK dependencies — install it before running:
pip install mujoco
Locating the Joint State Topic
QuerySequence and QueryTopic are combined to find the right channel in a single server-side query. Filtering by RobotJoint.ontology_tag() ensures the result contains only joint state topics, regardless of how the channel is named in the bag.
from mosaicolabs import MosaicoClient, QuerySequence, QueryTopic
from mosaicolabs.models.sensors.robot import RobotJoint
with MosaicoClient.connect(host=MOSAICO_HOST, port=MOSAICO_PORT) as client:
result = client.query(
QuerySequence().with_name(ROBOT_SEQUENCE_NAME),
QueryTopic().with_ontology_tag(RobotJoint.ontology_tag()),
)
if result is None:
print(f"Sequence '{ROBOT_SEQUENCE_NAME}' not found.")
Validating the Topic
A TopicHandler confirms the matched topic carries the expected type before any data is streamed. The query already filters by ontology tag, so this check acts as a defensive guard for pipelines where the data source is not fully controlled.
for items in result:
for topic in items.topics:
top_handler = client.topic_handler(items.sequence.name, topic.name)
if top_handler is None:
continue
if top_handler.ontology_tag != RobotJoint.ontology_tag():
print(f"Unexpected type: {top_handler.ontology_tag}")
continue
Streaming into MuJoCo
Each RobotJoint message is timestamped in nanoseconds relative to the recording start. That offset drives the simulation forward so the replay matches the original motion at the correct cadence. Joint positions are applied directly to the MuJoCo model state and the viewer is updated each frame.
for joint_msg in top_handler.get_data_streamer():
relative_ts = (
joint_msg.timestamp_ns - top_handler.timestamp_ns_min
) / 1.0e9
joints = joint_msg.get_data(RobotJoint)
mj_data.qpos[:] = joints.positions
mj.mj_forward(mj_model, mj_data)
viewer.sync()
When running correctly, the MuJoCo viewer should open and replay the recorded trajectory:
