Skip to content

sensor_msgs Adapters

mosaicolabs.ros_bridge.adapters.sensor_msgs

CameraInfoAdapter

Bases: ROSAdapterBase[CameraInfo]

Adapter for translating ROS CameraInfo messages to Mosaico CameraInfo.

Supported ROS Types:

Example
ros_msg = ROSMessage(
    timestamp=17000,
    topic="/camera_info",
    msg_type="sensor_msgs/msg/CameraInfo",
    data=
    {
        "height": 480,
        "width": 640,
        "binning_x": 1,
        "binning_y": 1,
        "roi": {
            "x_offset": 0,
            "y_offset": 0,
            "height": 480,
            "width": 640,
            "do_rectify": False,
        },
        "distortion_model": "plumb_bob",
        "d": [0.0, 0.0, 0.0, 0.0, 0.0],
        "k": [1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0],
        "p": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
        "r": [1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0],
    }
)
# Automatically resolves to a flat Mosaico CameraInfo with attached metadata
mosaico_camera_info = CameraInfoAdapter.translate(ros_msg)

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing a CameraInfo object.

Raises:

Type Description
Exception

Wraps any translation error with context (topic name, timestamp).

from_dict classmethod

from_dict(ros_data)

Converts the raw dictionary data into the specific Mosaico type.

Example
ros_data=
{
    "height": 480,
    "width": 640,
    "binning_x": 1,
    "binning_y": 1,
    "roi": {
        "x_offset": 0,
        "y_offset": 0,
        "height": 480,
        "width": 640,
        "do_rectify": False,
    },
    "distortion_model": "plumb_bob",
    "d": [0.0, 0.0, 0.0, 0.0, 0.0],
    "k": [1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0],
    "p": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
    "r": [1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0],
}
# Automatically resolves to a flat Mosaico CameraInfo with attached metadata
mosaico_camera_info = CameraInfoAdapter.from_dict(ros_data)

Parameters:

Name Type Description Default
ros_data dict

The raw dictionary from the ROS message.

required

Returns:

Name Type Description
CameraInfo CameraInfo

The constructed Mosaico CameraInfo object.

Raises:

Type Description
ValueError

If the recursive 'roi' key exists but is not a dict, or if required keys are missing.

schema_metadata classmethod

schema_metadata(ros_data, **kwargs)

Extract the ROS message specific schema metadata, if any.

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

NavSatStatusAdapter

Bases: ROSAdapterBase[GPSStatus]

Adapter for translating ROS NavSatStatus messages to Mosaico GPSStatus.

Supported ROS Types:

Example
ros_msg = ROSMessage(
    timestamp=17000,
    topic="/gps_status",
    msg_type="sensor_msgs/msg/NavSatFix",
    data=
    {
        "status": 0,
        "service": 1,
    }
)
# Automatically resolves to a flat Mosaico GPSStatus with attached metadata
mosaico_gps_status = NavSatStatusAdapter.translate(ros_msg)

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing a GPSStatus object.

Raises:

Type Description
Exception

Wraps any translation error with context (topic name, timestamp).

from_dict classmethod

from_dict(ros_data)

Converts the raw dictionary data into the specific Mosaico type.

Example
ros_data=
{
    "status": 0,
    "service": 1,
}
# Automatically resolves to a flat Mosaico GPSStatus with attached metadata
mosaico_gps_status = NavSatStatusAdapter.from_dict(ros_data)

Parameters:

Name Type Description Default
ros_data dict

The raw dictionary from the ROS message.

required

Returns:

Name Type Description
GPSStatus GPSStatus

The constructed Mosaico GPSStatus object.

Raises:

Type Description
ValueError

If the recursive 'roi' key exists but is not a dict, or if required keys are missing.

schema_metadata classmethod

schema_metadata(ros_data, **kwargs)

Extract the ROS message specific schema metadata, if any.

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

GPSAdapter

Bases: ROSAdapterBase[GPS]

Adapter for translating ROS NavSatFix messages to Mosaico GPS.

Supported ROS Types:

Example
ros_msg = ROSMessage(
    timestamp=17000,
    topic="/gps",
    msg_type="sensor_msgs/msg/NavSatFix",
    data=
    {
        "latitude": 45.5,
        "longitude": -122.5,
        "altitude": 100.0,
        "status": {
            "status": 0,
            "service": 1,
        },
    }
)
# Automatically resolves to a flat Mosaico GPS with attached metadata
mosaico_gps = GPSAdapter.translate(ros_msg)

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing a GPS object.

Raises:

Type Description
Exception

Wraps any translation error with context (topic name, timestamp).

from_dict classmethod

from_dict(ros_data)

Converts the raw dictionary data into the specific Mosaico type.

Example
ros_data=
{
    "latitude": 45.5,
    "longitude": -122.5,
    "altitude": 100.0,
    "status": {
        "status": 0,
        "service": 1,
    },
}
# Automatically resolves to a flat Mosaico GPS with attached metadata
mosaico_gps = GPSAdapter.from_dict(ros_data)

Parameters:

Name Type Description Default
ros_data dict

The raw dictionary from the ROS message.

required

Returns:

Name Type Description
GPS GPS

The constructed Mosaico GPS object.

Raises:

Type Description
ValueError

If the recursive 'roi' key exists but is not a dict, or if required keys are missing.

schema_metadata classmethod

schema_metadata(ros_data, **kwargs)

Extract the ROS message specific schema metadata, if any.

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

IMUAdapter

Bases: ROSAdapterBase[IMU]

Adapter for translating ROS Imu messages to Mosaico IMU.

Supported ROS Types:

Example
ros_msg = ROSMessage(
    timestamp=17000,
    topic="/imu",
    msg_type="sensor_msgs/msg/Imu",
    data=
    {
        "linear_acceleration": {"x": 0.0, "y": 0.0, "z": 0.0},
        "angular_velocity": {"x": 0.0, "y": 0.0, "z": 0.0},
        "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0},
        "orientation_covariance": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
        "linear_acceleration_covariance": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
        "angular_velocity_covariance": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
        "header": {"seq": 0, "stamp": {"sec": 0, "nanosec": 0}, "frame_id": "robot_link"},
    }
)
# Automatically resolves to a flat Mosaico IMU with attached metadata
mosaico_imu = IMUAdapter.translate(ros_msg)

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing a IMU object.

Raises:

Type Description
Exception

Wraps any translation error with context (topic name, timestamp).

from_dict classmethod

from_dict(ros_data)

Converts the raw dictionary data into the specific Mosaico type.

Example
ros_data=
{
    "linear_acceleration": {"x": 0.0, "y": 0.0, "z": 0.0},
    "angular_velocity": {"x": 0.0, "y": 0.0, "z": 0.0},
    "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0},
    "orientation_covariance": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
    "linear_acceleration_covariance": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
    "angular_velocity_covariance": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
    "header": {"seq": 0, "stamp": {"sec": 0, "nanosec": 0}, "frame_id": "robot_link"},
}
# Automatically resolves to a flat Mosaico IMU with attached metadata
mosaico_imu = IMUAdapter.from_dict(ros_data)

Parameters:

Name Type Description Default
ros_data dict

The raw dictionary from the ROS message.

required

Returns:

Name Type Description
IMU IMU

The constructed Mosaico IMU object.

Raises:

Type Description
ValueError

If the recursive 'roi' key exists but is not a dict, or if required keys are missing.

schema_metadata classmethod

schema_metadata(ros_data, **kwargs)

Extract the ROS message specific schema metadata, if any.

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

NMEASentenceAdapter

Bases: ROSAdapterBase[NMEASentence]

Adapter for translating ROS NMEASentence messages to Mosaico NMEASentence.

Supported ROS Types:

Example
ros_msg = ROSMessage(
    timestamp=17000,
    topic="/gps/fix",
    msg_type="sensor_msgs/msg/GPSFix",
    data=
    {
        "sentence": "GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47",
    }
)
# Automatically resolves to a flat Mosaico GPS with attached metadata
mosaico_gps = NMEASentenceAdapter.translate(ros_msg)

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing a NMEASenetence object.

Raises:

Type Description
Exception

Wraps any translation error with context (topic name, timestamp).

from_dict classmethod

from_dict(ros_data)

Converts the raw dictionary data into the specific Mosaico type.

Example
ros_data=
{
    "sentence": "GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47",
}
# Automatically resolves to a flat Mosaico NMEASentence with attached metadata
mosaico_nmea_sentence = NMEASentenceAdapter.from_dict(ros_data)

Parameters:

Name Type Description Default
ros_data dict

The raw dictionary from the ROS message.

required

Returns:

Name Type Description
NMEASentence NMEASentence

The constructed Mosaico NMEASentence object.

schema_metadata classmethod

schema_metadata(ros_data, **kwargs)

Extract the ROS message specific schema metadata, if any.

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

ImageAdapter

Bases: ROSAdapterBase[Image]

Adapter for translating ROS Image messages to Mosaico Image.

Supported ROS Types:

Example
ros_msg = ROSMessage(
    timestamp=17000,
    topic="/image",
    msg_type="sensor_msgs/msg/Image",
    data=
    {
        "data": [...],
        "width": 1,
        "height": 1,
        "step": 4,
        "encoding": "bgr8",
    }
)
# Automatically resolves to a flat Mosaico Image with attached metadata
mosaico_image = ImageAdapter.translate(ros_msg)

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing a Image object.

Raises:

Type Description
Exception

Wraps any translation error with context (topic name, timestamp).

from_dict classmethod

from_dict(ros_data, **kwargs)

Converts the raw dictionary data into the specific Mosaico type.

Example
ros_data=
{
    "data": [...],
    "width": 1,
    "height": 1,
    "step": 4,
    "encoding": "bgr8",
}
# Automatically resolves to a flat Mosaico Image with attached metadata
mosaico_image = ImageAdapter.from_dict(ros_data)

Parameters:

Name Type Description Default
ros_data dict

The raw dictionary from the ROS message.

required

Returns:

Name Type Description
Image Image

The constructed Mosaico Image object.

schema_metadata classmethod

schema_metadata(ros_data, **kwargs)

Extract the ROS message specific schema metadata, if any.

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

CompressedImageAdapter

Bases: ROSAdapterBase[CompressedImage]

Adapter for translating ROS CompressedImage messages to Mosaico CompressedImage.

Supported ROS Types:

Example
ros_msg = ROSMessage(
    timestamp=17000,
    topic="/compressed_image",
    msg_type="sensor_msgs/msg/CompressedImage",
    data=
    {
        "data": [...],
        "format": "jpeg",
    }
)
# Automatically resolves to a flat Mosaico CompressedImage with attached metadata
mosaico_compressed_image = CompressedImageAdapter.translate(ros_msg)

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing a CompressedImage object.

Raises:

Type Description
Exception

Wraps any translation error with context (topic name, timestamp).

from_dict classmethod

from_dict(ros_data)

Converts the raw dictionary data into the specific Mosaico type.

Example
ros_data=
{
    "data": [...],
    "format": "jpeg",
}
# Automatically resolves to a flat Mosaico CompressedImage with attached metadata
mosaico_compressed_image = CompressedImageAdapter.from_dict(ros_data)

Parameters:

Name Type Description Default
ros_data dict

The raw dictionary from the ROS message.

required

Returns:

Name Type Description
CompressedImage CompressedImage

The constructed Mosaico CompressedImage object.

schema_metadata classmethod

schema_metadata(ros_data, **kwargs)

Extract the ROS message specific schema metadata, if any.

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

ROIAdapter

Bases: ROSAdapterBase[ROI]

Adapter for translating ROS RegionOfInterest messages to Mosaico ROI.

Supported ROS Types:

Example
ros_msg = ROSMessage(
    timestamp=17000,
    topic="/roi",
    msg_type="sensor_msgs/RegionOfInterest",
    data=
    {
        "height": 1,
        "width": 1,
        "x_offset": 0,
        "y_offset": 0,
    }
)
# Automatically resolves to a flat Mosaico ROI with attached metadata
mosaico_roi = ROIAdapter.translate(ros_msg)

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing a ROI object.

Raises:

Type Description
Exception

Wraps any translation error with context (topic name, timestamp).

from_dict classmethod

from_dict(ros_data)

Converts the raw dictionary data into the specific Mosaico type.

Example
ros_data=
{
    "height": 1,
    "width": 1,
    "x_offset": 0,
    "y_offset": 0,
}
# Automatically resolves to a flat Mosaico ROI with attached metadata
mosaico_roi = ROIAdapter.from_dict(ros_data)

Parameters:

Name Type Description Default
ros_data dict

The raw dictionary from the ROS message.

required

Returns:

Name Type Description
ROI ROI

The constructed Mosaico ROI object.

schema_metadata classmethod

schema_metadata(ros_data, **kwargs)

Extract the ROS message specific schema metadata, if any.

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

BatteryStateAdapter

Bases: ROSAdapterBase[BatteryState]

Adapter for translating ROS BatteryState messages to Mosaico BatteryState.

Supported ROS Types:

Example
ros_msg = ROSMessage(
    timestamp=17000,
    topic="/battery_state",
    msg_type="sensor_msgs/msg/BatteryState",
    data=
    {
        "voltage": 12.6,
        "capacity": 100,
        "cell_temperature": 25,
        "cell_voltage": [12.6],
        "location": "battery",
        "charge": 100,
        "current": 0,
        "design_capacity": 100,
        "location": "battery",
        "percentage": 100,
        "power_supply_health": "good",
        "power_supply_status": "charging",
        "power_supply_technology": "li-ion",
        "present": True,
        "serial_number": "1234567890",
        "temperature": 25,
    }
)
# Automatically resolves to a flat Mosaico BatteryState with attached metadata
mosaico_battery_state = BatteryStateAdapter.translate(ros_msg)

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing a BatteryState object.

Raises:

Type Description
Exception

Wraps any translation error with context (topic name, timestamp).

from_dict classmethod

from_dict(ros_data)

Converts the raw dictionary data into the specific Mosaico type.

Example
ros_data=
{
    "voltage": 12.6,
    "capacity": 100,
    "cell_temperature": 25,
    "cell_voltage": [12.6],
    "location": "battery",
    "charge": 100,
    "current": 0,
    "design_capacity": 100,
    "location": "battery",
    "percentage": 100,
    "power_supply_health": "good",
    "power_supply_status": "charging",
    "power_supply_technology": "li-ion",
    "present": True,
    "serial_number": "1234567890",
    "temperature": 25,
}
# Automatically resolves to a flat Mosaico BatteryState with attached metadata
mosaico_battery_state = BatteryStateAdapter.from_dict(ros_data)

Parameters:

Name Type Description Default
ros_data dict

The raw dictionary from the ROS message.

required

Returns:

Name Type Description
BatteryState BatteryState

The constructed Mosaico BatteryState object.

schema_metadata classmethod

schema_metadata(ros_data, **kwargs)

Extract the ROS message specific schema metadata, if any.

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

RobotJointAdapter

Bases: ROSAdapterBase[RobotJoint]

Adapter for translating ROS JointState messages to Mosaico RobotJoint.

Supported ROS Types:

Example
ros_msg = ROSMessage(
    timestamp=17000,
    topic="/joint_states",
    msg_type="sensor_msgs/msg/JointState",
    data={
        "header": {
            "stamp": {
                "sec": 17000,
                "nanosec": 0,
            },
            "frame_id": "",
        },
        "name": ["joint1", "joint2"],
        "position": [0.0, 0.0],
        "velocity": [0.0, 0.0],
        "effort": [0.0, 0.0],
    },
)
# Automatically resolves to a flat Mosaico RobotJoint with attached metadata
mosaico_robot_joint = RobotJointAdapter.translate(ros_msg)

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing a RobotJoint object.

Raises:

Type Description
Exception

Wraps any translation error with context (topic name, timestamp).

from_dict classmethod

from_dict(ros_data)

Converts the raw dictionary data into the specific Mosaico type.

Example
ros_data={
    "header": {
        "stamp": {
            "sec": 17000,
            "nanosec": 0,
        },
        "frame_id": "",
    },
    "name": ["joint1", "joint2"],
    "position": [0.0, 0.0],
    "velocity": [0.0, 0.0],
    "effort": [0.0, 0.0],
}
# Automatically resolves to a flat Mosaico RobotJoint with attached metadata
mosaico_robot_joint = RobotJointAdapter.from_dict(ros_data)

schema_metadata classmethod

schema_metadata(ros_data, **kwargs)

Extract the ROS message specific schema metadata, if any.

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

PointCloudAdapterBase

Bases: ROSAdapterBase[PointCloudModel]

Base adapter for translating ROS PointCloud2 message to Mosaico specific ontology.

decode classmethod

decode(ros_data)

Deserialize the binary buffer of a ROS sensor_msgs/msg/PointCloud2 message into named field arrays.

Parameters:

Name Type Description Default
ros_data dict

Raw ROS message payload. Relevant keys: data, height, width, fields, is_bigendian, point_step.

required

Returns:

Type Description
dict[str, list]

Dictionary mapping each field name to a list of decoded values. Returns empty lists for all fields if height * width == 0.

Raises:

Type Description
ValueError

If a field exposes an unsupported datatype.

from_dict classmethod

from_dict(ros_data)

Convert a raw ROS PointCloud2 message dictionary into a typed Mosaico model.

Parameters:

Name Type Description Default
ros_data dict

Raw ROS message payload as a dictionary.

required

Returns:

Type Description
PointCloudModel

A fully populated instance of the target PointCloudModel subtype (e.g. Lidar, Radar, RGBDCamera, ...).

Raises:

Type Description
ValueError

If any required message key is missing from ros_data, or if any required field is absent after decoding.

Note

This method is not meant to be overridden in most subclasses. Only PointCloudAdapter overrides it, as it operates at a different abstraction level, returning the raw PointCloud2 message instead of a decoded model.

schema_metadata classmethod

schema_metadata(ros_data, **kwargs)

Extract the ROS message specific schema metadata, if any.

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message instance into a Mosaico Message.

Implementation should handle recursive unwrapping, unit conversion, and validation.

Parameters:

Name Type Description Default
ros_msg ROSMessage

The source container yielded by the ROSLoader.

required
**kwargs Any

Contextual data such as calibration parameters or frame overrides.

{}

Returns:

Type Description
Message

A Mosaico Message object containing the instantiated ontology data.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

PointCloudAdapter

Bases: PointCloudAdapterBase[PointCloud2]

Adapter for translating ROS PointCloud2 messages to Mosaico PointCloud2.

Supported ROS Types:

Example:

ros_msg = ROSMessage(
            timestamp=17000,
            topic="/point_cloud",
            msg_type="sensor_msgs/PointCloud2",
            data = {
                "height": 1,
                "width": 3,
                "fields": [
                    {"name": "x", "offset": 0,  "datatype": 7, "count": 1},
                    {"name": "y", "offset": 4,  "datatype": 7, "count": 1},
                    {"name": "z", "offset": 8,  "datatype": 7, "count": 1},
                ],
            "is_bigendian": False,
            "point_step": 12,
            "row_step": 36,
            "data": ...,
            "is_dense": True,
            }
        )
# Automatically resolves to a flat Mosaico PointCloud2 with attached metadata
mosaico_point_cloud = PointCloudAdapter.translate(ros_msg)

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing a PointCloud2 object.

Raises:

Type Description
Exception

Wraps any translation error with context (topic name, timestamp).

from_dict classmethod

from_dict(ros_data)

Converts the raw dictionary data into the specific Mosaico type.

Example:

ros_data = {
    "height": 1,           # unorganized point cloud = 1 row
    "width": 3,            # 3 points
    "fields": [
        {
            "name": "x",
            "offset": 0,
            "datatype": 7,  # FLOAT32
            "count": 1
        },
        {
            "name": "y",
            "offset": 4,
            "datatype": 7,  # FLOAT32
            "count": 1
        },
        {
            "name": "z",
            "offset": 8,
            "datatype": 7,  # FLOAT32
            "count": 1
        },
    ],
    "is_bigendian": False,
    "point_step": 12,      # 3 fields * 4 bytes (float32) = 12 bytes per point
    "row_step": 36,        # point_step * width = 12 * 3 = 36 bytes per row
    "data": ...,
    "is_dense": True
    }

schema_metadata classmethod

schema_metadata(ros_data, **kwargs)

Extract the ROS message specific schema metadata, if any.

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

decode classmethod

decode(ros_data)

Deserialize the binary buffer of a ROS sensor_msgs/msg/PointCloud2 message into named field arrays.

Parameters:

Name Type Description Default
ros_data dict

Raw ROS message payload. Relevant keys: data, height, width, fields, is_bigendian, point_step.

required

Returns:

Type Description
dict[str, list]

Dictionary mapping each field name to a list of decoded values. Returns empty lists for all fields if height * width == 0.

Raises:

Type Description
ValueError

If a field exposes an unsupported datatype.

LaserScannerAdapterBase

Bases: ROSAdapterBase[_LT]

Base adapter for translating ROS LaserScan and MultiEchoLaserScan messages to Mosaico LaserScan and MultiEchoLaserScan .

from_dict classmethod

from_dict(ros_data)

Create a LaserScan/MultiEchoLaserScan instance from a ROS message dictionary.

schema_metadata classmethod

schema_metadata(ros_data, **kwargs)

Extract the ROS message specific schema metadata, if any.

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message instance into a Mosaico Message.

Implementation should handle recursive unwrapping, unit conversion, and validation.

Parameters:

Name Type Description Default
ros_msg ROSMessage

The source container yielded by the ROSLoader.

required
**kwargs Any

Contextual data such as calibration parameters or frame overrides.

{}

Returns:

Type Description
Message

A Mosaico Message object containing the instantiated ontology data.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

LaserScanAdapter

Bases: LaserScannerAdapterBase[LaserScan]

Adapter for translating ROS LaserScan messages to Mosaico LaserScan.

Supported ROS Types:

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing a LaserScan object.

Raises:

Type Description
Exception

Wraps any translation error with context (topic name, timestamp).

from_dict classmethod

from_dict(ros_data)

Create a LaserScan instance from a ROS message dictionary.

Example:

ros_data = {
    "angle_min": -1.57,
    "angle_max":  1.57,
    "angle_increment": 0.01,
    "time_increment": 0.0,
    "scan_time": 0.1,
    "range_min": 0.2,
    "range_max": 10.0,
    "ranges": [1.0, 1.1, 1.2],
    "intensities": [100.0, 110.0, 120.0],
}
# Automatically resolves to a flat Mosaico LaserScan with attached data
mosaico_laser_scan = LaserScanAdapter.from_dict(ros_data)

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

schema_metadata classmethod

schema_metadata(ros_data, **kwargs)

Extract the ROS message specific schema metadata, if any.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

MultiEchoLaserScanAdapter

Bases: LaserScannerAdapterBase[MultiEchoLaserScan]

Adapter for translating ROS MultiEchoLaserScan messages to Mosaico MultiEchoLaserScan.

Supported ROS Types:

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing a MultiEchoLaserScan object.

Raises:

Type Description
Exception

Wraps any translation error with context (topic name, timestamp).

from_dict classmethod

from_dict(ros_data)

Create a MultiEchoLaserScan instance from a ROS message dictionary.

Example:

ros_data = {
    "angle_min": -1.57,
    "angle_max":  1.57,
    "angle_increment": 0.01,
    "time_increment": 0.0,
    "scan_time": 0.1,
    "range_min": 0.2,
    "range_max": 10.0,
    "ranges": [[1.0, 1.1, 1.2], [2.0, 2.1, 2.2], [3.0, 3.1, 3.2]],
    "intensities": [[100.0, 110.0, 120.0], [200.0, 210.0, 220.0], [300.0, 310.0, 320.0]],
}
# Automatically resolves to a flat Mosaico MultiEchoLaserScanAdapter with attached data
mosaico_laser_scan = MultiEchoLaserScanAdapter.from_dict(ros_data)

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

schema_metadata classmethod

schema_metadata(ros_data, **kwargs)

Extract the ROS message specific schema metadata, if any.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

MagneticFieldAdapter

Bases: ROSAdapterBase[Magnetometer]

Adapter for translating ROS MagneticField messages to Mosaico Magnetometer.

Supported ROS Types:

  • sensor_msgs/msg/MagneticField
Example
ros_msg = ROSMessage(
    timestamp=17000,
    topic="/magnetic_field",
    msg_type="sensor_msgs/msg/MagneticField",
    data={
        "magnetic_field": {
            "x": 0.12,
            "y": -0.05,
            "z": 0.98,
        }
    }
)

mosaico_magnetometer = MagneticFieldAdapter.translate(ros_msg)

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing a Magnetometer object.

from_dict classmethod

from_dict(ros_data)

Converts the raw dictionary data into the specific Mosaico type.

Parameters:

Name Type Description Default
ros_data dict

The raw dictionary from the ROS message.

required

Returns:

Name Type Description
Magnetometer Magnetometer

The constructed Mosaico Magnetometer object.

schema_metadata classmethod

schema_metadata(ros_data, **kwargs)

Extract the ROS message specific schema metadata, if any.

MagneticField messages typically do not include additional schema metadata, so this returns None unless extended in the future.

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

JoyAdapter

Bases: ROSAdapterBase[Joy]

Adapter for translating ROS Joy messages to Mosaico Joy.

Supported ROS Types:

  • sensor_msgs/msg/Joy
Example
ros_msg = ROSMessage(
    timestamp=17000,
    topic="/joy",
    msg_type="sensor_msgs/msg/Joy",
    data={
        # Axes and buttons are list-based fields (not queryable via `.Q`)
        "axes": [0.0, -1.0, 0.5],
        "buttons": [0, 1, 0, 1],
    }
)

mosaico_joy = JoyAdapter.translate(ros_msg)

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing a Joy object.

from_dict classmethod

from_dict(ros_data)

Converts the raw dictionary data into the specific Mosaico type.

Parameters:

Name Type Description Default
ros_data dict

The raw dictionary from the ROS message.

required

Returns:

Name Type Description
Joy Joy

The constructed Mosaico Joy object.

schema_metadata classmethod

schema_metadata(ros_data, **kwargs)

Extract the ROS message specific schema metadata, if any.

Joy messages typically do not include additional schema metadata, so this returns None unless extended in the future.

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.