Skip to main content

MuJoCo Visualisation

Queries robot joint state data from a stored Mosaico sequence and replays the trajectory in a live MuJoCo simulation window, synchronized to the nanosecond timestamps stored in the archive.

Run
mosaicolabs.examples mujoco_vis

Run the command with --help to see all available options.

The daemon must be running with data from the ROS Ingestion example. The full source is on GitHub.

MuJoCo is not included in the default SDK dependencies — install it before running:

Install MuJoCo
pip install mujoco

Locating the Joint State Topic

QuerySequence and QueryTopic are combined to find the right channel in a single server-side query. Filtering by RobotJoint.ontology_tag() ensures the result contains only joint state topics, regardless of how the channel is named in the bag.

Query for robot joint state
from mosaicolabs import MosaicoClient, QuerySequence, QueryTopic
from mosaicolabs.models.sensors.robot import RobotJoint

with MosaicoClient.connect(host=MOSAICO_HOST, port=MOSAICO_PORT) as client:
result = client.query(
QuerySequence().with_name(ROBOT_SEQUENCE_NAME),
QueryTopic().with_ontology_tag(RobotJoint.ontology_tag()),
)
if result is None:
print(f"Sequence '{ROBOT_SEQUENCE_NAME}' not found.")

Validating the Topic

A TopicHandler confirms the matched topic carries the expected type before any data is streamed. The query already filters by ontology tag, so this check acts as a defensive guard for pipelines where the data source is not fully controlled.

Validate topic ontology
for items in result:
for topic in items.topics:
top_handler = client.topic_handler(items.sequence.name, topic.name)
if top_handler is None:
continue
if top_handler.ontology_tag != RobotJoint.ontology_tag():
print(f"Unexpected type: {top_handler.ontology_tag}")
continue

Streaming into MuJoCo

Each RobotJoint message is timestamped in nanoseconds relative to the recording start. That offset drives the simulation forward so the replay matches the original motion at the correct cadence. Joint positions are applied directly to the MuJoCo model state and the viewer is updated each frame.

Stream joints and drive the simulation
for joint_msg in top_handler.get_data_streamer():
relative_ts = (
joint_msg.timestamp_ns - top_handler.timestamp_ns_min
) / 1.0e9

joints = joint_msg.get_data(RobotJoint)
mj_data.qpos[:] = joints.positions
mj.mj_forward(mj_model, mj_data)
viewer.sync()

When running correctly, the MuJoCo viewer should open and replay the recorded trajectory: