geometry_msgs

mosaicolabs.ros_bridge.adapters.geometry_msgs

Geometry Messages Adaptation Module.

This module provides specialized adapters for translating ROS geometry_msgs into the standardized Mosaico Ontology. It implements recursive unwrapping to handle common ROS patterns, such as "Stamped" envelopes and covariance wrappers, ensuring that spatial data is normalized before ingestion.
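The unwrapping idea described above can be illustrated with a small standalone sketch. It does not use the Mosaico classes: `unwrap` is a hypothetical helper written for this illustration, and the input mirrors the standard layout of `geometry_msgs/msg/PoseWithCovarianceStamped`, where a stamped envelope contains a covariance wrapper that contains the actual pose.

```python
from typing import Any, Dict, Tuple


def unwrap(data: Dict[str, Any], key: str) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Recurse through nested `key` entries; return (leaf, collected metadata).

    Hypothetical helper for illustration only, not part of the library.
    """
    if key not in data:
        # Leaf node reached: nothing left to unwrap.
        return data, {}
    inner = data[key]
    if not isinstance(inner, dict):
        raise ValueError(f"'{key}' must be a dict, got {type(inner).__name__}")
    leaf, metadata = unwrap(inner, key)
    # While the recursion unwinds, attach metadata found at this level.
    for field in ("header", "covariance"):
        if field in data:
            metadata[field] = data[field]
    return leaf, metadata


# Two levels of 'pose': the Stamped envelope and the covariance wrapper.
msg = {
    "header": {"frame_id": "map", "stamp": {"sec": 17000, "nanosec": 0}},
    "pose": {
        "pose": {
            "position": {"x": 1.0, "y": 2.0, "z": 0.0},
            "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0},
        },
        "covariance": [0.0] * 36,
    },
}

leaf, metadata = unwrap(msg, "pose")
print(sorted(leaf))      # ['orientation', 'position']
print(sorted(metadata))  # ['covariance', 'header']
```

The same helper works for any of the wrapper keys used in this module ('twist', 'accel', 'vector', and so on), which is why the adapters can share one strategy.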

PoseAdapter

Bases: ROSAdapterBase[Pose]

Adapter for translating ROS Pose-related messages to Mosaico Pose.

This adapter follows the "Adaptation, Not Just Parsing" philosophy by actively unwrapping nested ROS structures and normalizing them into strongly-typed Mosaico Pose objects.

Supported ROS Types:

Recursive Unwrapping Strategy: The adapter checks for nested 'pose' keys. If found (as in PoseStamped), it recurses to the leaf node while collecting metadata like headers and covariance matrices along the way.

Example
# Internal usage within the ROS Bridge
ros_msg = ROSMessage(
    timestamp=17000,
    topic="/pose",
    msg_type="geometry_msgs/msg/PoseStamped",
    data={
        "header": {"frame_id": "map", "stamp": {"sec": 17000, "nanosec": 0}},
        "pose": {
            "position": {"x": 1.0, "y": 2.0, "z": 0.0},
            "orientation": {"x": 0, "y": 0, "z": 0, "w": 1}
        },
    }
)
# Automatically resolves to a flat Mosaico Pose with attached metadata
mosaico_pose = PoseAdapter.translate(ros_msg)

translate classmethod

translate(ros_msg, **kwargs)

Main entry point for translating a high-level ROSMessage.

Parameters:

Name Type Description Default
ros_msg ROSMessage

The source ROS message yielded by the loader.

required
**kwargs Any

Additional context for the translation.

{}

Returns:

Type Description
Message

A Mosaico Message containing the normalized Pose payload.

from_dict classmethod

from_dict(ros_data)

Recursively parses a dictionary to extract a Pose object.

Strategy:

  • Recurse: If a 'pose' key is found, dive deeper into the structure.
  • Leaf Node: At the base level, map 'position' and 'orientation' to Point3d and Quaternion.
  • Metadata Binding: Headers and covariances are attached during recursion unwinding.

Example
ros_data = {
    "header": {"frame_id": "map", "stamp": {"sec": 17000, "nanosec": 0}},
    "pose": {
        "position": {"x": 1.0, "y": 2.0, "z": 0.0},
        "orientation": {"x": 0, "y": 0, "z": 0, "w": 1}
    },
}
# Automatically resolves to a flat Mosaico Pose with attached metadata
mosaico_pose = PoseAdapter.from_dict(ros_data)

Parameters:

Name Type Description Default
ros_data dict

The raw dictionary from the ROS message.

required

Returns:

Name Type Description
Pose Pose

The constructed Mosaico Pose object.

Raises:

Type Description
ValueError

If the recursive 'pose' key exists but is not a dict, or if required keys are missing.
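As a rough illustration of the leaf-node mapping and the two `ValueError` conditions listed above, the following sketch uses stand-in dataclasses. `SketchPoint3d`, `SketchQuaternion`, `SketchPose`, and `pose_from_dict` are hypothetical names, not the Mosaico types, and metadata binding is left out to keep the example short.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class SketchPoint3d:
    x: float
    y: float
    z: float


@dataclass(frozen=True)
class SketchQuaternion:
    x: float
    y: float
    z: float
    w: float


@dataclass(frozen=True)
class SketchPose:
    position: SketchPoint3d
    orientation: SketchQuaternion


def pose_from_dict(ros_data: Dict[str, Any]) -> SketchPose:
    # Recurse: a nested 'pose' key means we are still inside a wrapper.
    if "pose" in ros_data:
        inner = ros_data["pose"]
        if not isinstance(inner, dict):
            raise ValueError("'pose' key exists but is not a dict")
        return pose_from_dict(inner)
    # Leaf node: both components must be present.
    missing = [k for k in ("position", "orientation") if k not in ros_data]
    if missing:
        raise ValueError(f"missing required keys: {missing}")
    try:
        position = ros_data["position"]
        orientation = ros_data["orientation"]
        return SketchPose(
            position=SketchPoint3d(**{k: float(position[k]) for k in ("x", "y", "z")}),
            orientation=SketchQuaternion(
                **{k: float(orientation[k]) for k in ("x", "y", "z", "w")}
            ),
        )
    except (KeyError, TypeError) as exc:
        raise ValueError(f"malformed pose component: {exc}") from exc


pose = pose_from_dict(
    {
        "header": {"frame_id": "map", "stamp": {"sec": 17000, "nanosec": 0}},
        "pose": {
            "position": {"x": 1.0, "y": 2.0, "z": 0.0},
            "orientation": {"x": 0, "y": 0, "z": 0, "w": 1},
        },
    }
)
print(pose.position)  # SketchPoint3d(x=1.0, y=2.0, z=0.0)

# Both failure modes surface as ValueError.
for bad in ({"pose": 3}, {"position": {"x": 0.0, "y": 0.0, "z": 0.0}}):
    try:
        pose_from_dict(bad)
    except ValueError as exc:
        print("rejected:", exc)
```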

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

schema_metadata abstractmethod classmethod

schema_metadata(ros_data, **kwargs)

Extracts ROS-specific schema metadata for the Mosaico platform.

This allows preserving original ROS attributes that may not fit directly into the physical ontology fields.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.
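To show how the class methods listed above might fit together, here is a minimal sketch of a generic adapter base and one concrete adapter. Everything in it is an assumption made for illustration: `SketchAdapterBase`, `SketchPointAdapter`, `SketchPoint3d`, the `_ontology_type` attribute, the single-string return type of `ros_msg_type()`, and the registry dictionary are hypothetical and do not describe the actual `ROSAdapterBase` internals.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, Generic, Type, TypeVar

T = TypeVar("T")


class SketchAdapterBase(ABC, Generic[T]):
    """Hypothetical stand-in for a generic adapter base class."""

    _ontology_type: Type[T]  # set by each concrete adapter (assumption)

    @classmethod
    @abstractmethod
    def ros_msg_type(cls) -> str:
        """ROS message type handled by the adapter."""

    @classmethod
    def ontology_data_type(cls) -> Type[T]:
        """Ontology class produced by the adapter."""
        return cls._ontology_type

    @classmethod
    @abstractmethod
    def from_dict(cls, ros_data: Dict[str, Any]) -> T:
        """Build the ontology object from raw ROS data."""


@dataclass(frozen=True)
class SketchPoint3d:
    x: float
    y: float
    z: float


class SketchPointAdapter(SketchAdapterBase[SketchPoint3d]):
    _ontology_type = SketchPoint3d

    @classmethod
    def ros_msg_type(cls) -> str:
        return "geometry_msgs/msg/Point"

    @classmethod
    def from_dict(cls, ros_data: Dict[str, Any]) -> SketchPoint3d:
        return SketchPoint3d(
            x=float(ros_data["x"]), y=float(ros_data["y"]), z=float(ros_data["z"])
        )


# A lookup keyed by ROS type lets a caller pick the adapter for a message.
registry = {SketchPointAdapter.ros_msg_type(): SketchPointAdapter}
adapter = registry["geometry_msgs/msg/Point"]
point = adapter.from_dict({"x": 5.0, "y": 0.0, "z": 0.0})
print(isinstance(point, adapter.ontology_data_type()))  # True
```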

TwistAdapter

Bases: ROSAdapterBase[Velocity]

Adapter for translating ROS Twist-related messages to Mosaico Velocity.

Commonly referred to as a "Twist" in ROS, this model captures the instantaneous motion of an object, split into linear and angular velocity components.

Supported ROS Types:

Recursive Unwrapping Strategy: The adapter checks for nested 'twist' keys. If found (as in TwistStamped), it recurses to the leaf node while collecting metadata like headers and covariance matrices along the way.

Example
ros_msg = ROSMessage(
    timestamp=1700000000000,
    topic="/cmd_vel",
    msg_type="geometry_msgs/msg/TwistStamped",
    data={
        "header": {"frame_id": "map", "stamp": {"sec": 17000, "nanosec": 0}},
        "twist": {
            "linear": {"x": 5.0, "y": 0.0, "z": 0.0},
            "angular": {"x": 0.0, "y": 0.0, "z": 1.0}
        },
        "covariance": [0.1] * 36
    }
)
# Automatically resolves to a flat Mosaico Velocity with attached metadata
mosaico_velocity = TwistAdapter.translate(ros_msg)

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing a Velocity object.

Raises:

Type Description
Exception

Wraps any translation error with context (topic name, timestamp).
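One common way to wrap an error with context such as the topic name and timestamp is Python's exception chaining (`raise ... from`). The sketch below is only an illustration of that pattern: `TranslationError`, `translate_with_context`, and `parse_twist` are hypothetical names, and the library may raise a different exception type or format the message differently.

```python
from typing import Any, Callable, Dict


class TranslationError(Exception):
    """Hypothetical wrapper exception used only in this sketch."""


def translate_with_context(
    topic: str,
    timestamp: int,
    data: Dict[str, Any],
    parse: Callable[[Dict[str, Any]], Any],
) -> Any:
    try:
        return parse(data)
    except Exception as exc:
        # Re-raise with the topic and timestamp; the original error stays
        # reachable through __cause__ for debugging.
        raise TranslationError(
            f"failed to translate message on topic '{topic}' "
            f"at timestamp {timestamp}: {exc}"
        ) from exc


def parse_twist(data: Dict[str, Any]) -> Dict[str, Any]:
    # Stand-in parser: raises KeyError when a component is missing.
    return {"linear": data["linear"], "angular": data["angular"]}


caught = None
try:
    translate_with_context(
        "/cmd_vel",
        1700000000000,
        {"linear": {"x": 5.0, "y": 0.0, "z": 0.0}},  # 'angular' is missing
        parse_twist,
    )
except TranslationError as err:
    caught = err

print(caught)
print(repr(caught.__cause__))
```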

from_dict classmethod

from_dict(ros_data)

Recursively parses the ROS data dictionary to extract a Velocity (Twist).

Strategy:

  • Recurse: If a 'twist' key is found, dive deeper into the structure.
  • Leaf Node: At the base level, map 'linear' and 'angular' to Vector3.
  • Metadata Binding: Headers and covariances are attached during recursion unwinding.

Example
ros_data = {
    "header": {"frame_id": "map", "stamp": {"sec": 17000, "nanosec": 0}},
    "twist": {
        "linear": {"x": 5.0, "y": 0.0, "z": 0.0},
        "angular": {"x": 0.0, "y": 0.0, "z": 1.0}
    },
    "covariance": [0.1] * 36
}
# Automatically resolves to a flat Mosaico Velocity with attached metadata
mosaico_velocity = TwistAdapter.from_dict(ros_data)

Parameters:

Name Type Description Default
ros_data dict

The raw dictionary from the ROS message.

required

Returns:

Name Type Description
Velocity Velocity

The constructed Mosaico Velocity object.

Raises:

Type Description
ValueError

If the recursive 'twist' key exists but is not a dict, or if required keys are missing.
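The 36-element `covariance` list in the example above follows the ROS convention for `geometry_msgs` covariance fields: a 6x6 matrix stored in row-major order, with the three linear components followed by the three angular ones. The sketch below shows how such a flat list can be viewed as rows; `covariance_rows` is a hypothetical helper, and how Mosaico stores the matrix internally is not specified here.

```python
from typing import List, Sequence


def covariance_rows(flat: Sequence[float], size: int = 6) -> List[List[float]]:
    """Split a row-major flat covariance into `size` rows of `size` values."""
    if len(flat) != size * size:
        raise ValueError(f"expected {size * size} values, got {len(flat)}")
    return [list(flat[r * size:(r + 1) * size]) for r in range(size)]


flat = [0.0] * 36
flat[0] = 0.04   # variance of linear x  (row 0, column 0)
flat[35] = 0.01  # variance of angular z (row 5, column 5)

matrix = covariance_rows(flat)
print(matrix[0][0], matrix[5][5])  # 0.04 0.01
```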

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

AccelAdapter

Bases: ROSAdapterBase[Acceleration]

Adapter for translating ROS Accel-related messages to Mosaico Acceleration.

Supported ROS Types:

Recursive Unwrapping Strategy: The adapter checks for nested 'accel' keys. If found (as in AccelStamped), it recurses to the leaf node while collecting metadata like headers and covariance matrices along the way.

Example
ros_msg = ROSMessage(
    topic="/accel",
    timestamp=17000,
    msg_type="geometry_msgs/msg/AccelStamped",
    data=
    {
        "header": {"frame_id": "map", "stamp": {"sec": 17000, "nanosec": 0}},
        "accel": {
            "linear": {"x": 5.0, "y": 0.0, "z": 0.0},
            "angular": {"x": 0.0, "y": 0.0, "z": 1.0}
        },
        "covariance": [0.1] * 36
    }
)
# Automatically resolves to a flat Mosaico Acceleration with attached metadata
mosaico_acceleration = AccelAdapter.translate(ros_msg)

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing an Acceleration object.

Raises:

Type Description
Exception

Wraps any translation error with context (topic name, timestamp).

from_dict classmethod

from_dict(ros_data)

Recursively parses the ROS data dictionary to extract an Acceleration.

Strategy:

  • Recurse: If an 'accel' key is found, dive deeper into the structure.
  • Leaf Node: At the base level, map 'linear' and 'angular' to Vector3.
  • Metadata Binding: Headers and covariances are attached during recursion unwinding.

Example
ros_data = {
    "header": {"frame_id": "map", "stamp": {"sec": 17000, "nanosec": 0}},
    "accel": {
        "linear": {"x": 5.0, "y": 0.0, "z": 0.0},
        "angular": {"x": 0.0, "y": 0.0, "z": 1.0}
    },
    "covariance": [0.1] * 36
}
# Automatically resolves to a flat Mosaico Acceleration with attached metadata
mosaico_acceleration = AccelAdapter.from_dict(ros_data)

Parameters:

Name Type Description Default
ros_data dict

The raw dictionary from the ROS message.

required

Returns:

Name Type Description
Acceleration Acceleration

The constructed Mosaico Acceleration object.

Raises:

Type Description
ValueError

If the recursive 'accel' key exists but is not a dict, or if required keys are missing.

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

Vector3Adapter

Bases: ROSAdapterBase[Vector3d]

Adapter for translating ROS Vector3 messages to Mosaico Vector3d.

Supported ROS Types:

Recursive Unwrapping Strategy: The adapter checks for nested 'vector' keys. If found (as in Vector3Stamped), it recurses to the leaf node while collecting metadata like headers and covariance matrices along the way.

Example
ros_msg = ROSMessage(
    topic="/vector3",
    timestamp=17000,
    msg_type="geometry_msgs/msg/Vector3Stamped",
    data=
    {
        "header": {"frame_id": "map", "stamp": {"sec": 17000, "nanosec": 0}},
        "vector": {"x": 5.0, "y": 0.0, "z": 0.0},
    }
)
# Automatically resolves to a flat Mosaico Vector3d with attached metadata
mosaico_vector3 = Vector3Adapter.translate(ros_msg)

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing a Vector3d object.

Raises:

Type Description
Exception

Wraps any translation error with context (topic name, timestamp).

from_dict classmethod

from_dict(ros_data)

Recursively parses the ROS data to extract a Vector3d.

Strategy:

  • Recurse: If a 'vector' key is found, dive deeper into the structure.
  • Leaf Node: At the base level, map 'x', 'y' and 'z' to Vector3d.
  • Metadata Binding: Headers and covariances are attached during recursion unwinding.

Example
ros_data = {
    "header": {"frame_id": "map", "stamp": {"sec": 17000, "nanosec": 0}},
    "vector": {"x": 5.0, "y": 0.0, "z": 0.0},
}
# Automatically resolves to a flat Mosaico Vector3d with attached metadata
mosaico_vector3d = Vector3Adapter.from_dict(ros_data)

Parameters:

Name Type Description Default
ros_data dict

The raw dictionary from the ROS message.

required

Returns:

Name Type Description
Vector3d Vector3d

The constructed Mosaico Vector3d object.

Raises:

Type Description
ValueError

If the recursive 'vector' key exists but is not a dict, or if required keys are missing.

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

PointAdapter

Bases: ROSAdapterBase[Point3d]

Adapter for translating ROS Point messages to Mosaico Point3d.

Supported ROS Types:

Recursive Unwrapping Strategy: The adapter checks for nested 'point' keys. If found (as in PointStamped), it recurses to the leaf node while collecting metadata like headers and covariance matrices along the way.

Example
ros_msg = ROSMessage(
    topic="/point",
    timestamp=17000,
    msg_type="geometry_msgs/msg/PointStamped",
    data=
    {
        "header": {"frame_id": "map", "stamp": {"sec": 17000, "nanosec": 0}},
        "point": {"x": 5.0, "y": 0.0, "z": 0.0},
    }
)
# Automatically resolves to a flat Mosaico Point3d with attached metadata
mosaico_point3d = PointAdapter.translate(ros_msg)

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing a Point3d object.

Raises:

Type Description
Exception

Wraps any translation error with context (topic name, timestamp).

from_dict classmethod

from_dict(ros_data)

Recursively parses the ROS data to extract a Point3d.

Strategy
  • Recurse: If a 'point' key is found, dive deeper into the structure.
  • Leaf Node: At the base level, map 'x', 'y' and 'z' to Point3d.
  • Metadata Binding: Headers and covariances are attached during recursion unwinding.

Example
ros_data = {
    "header": {"frame_id": "map", "stamp": {"sec": 17000, "nanosec": 0}},
    "point": {"x": 5.0, "y": 0.0, "z": 0.0},
}
# Automatically resolves to a flat Mosaico Point3d with attached metadata
mosaico_point3d = PointAdapter.from_dict(ros_data)

Parameters:

Name Type Description Default
ros_data dict

The raw dictionary from the ROS message.

required

Returns:

Name Type Description
Point3d Point3d

The constructed Mosaico Point3d object.

Raises:

Type Description
ValueError

If the recursive 'point' key exists but is not a dict, or if required keys are missing.

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

QuaternionAdapter

Bases: ROSAdapterBase[Quaternion]

Adapter for translating ROS Quaternion messages to Mosaico Quaternion.

Supported ROS Types:

Recursive Unwrapping Strategy: The adapter checks for nested 'quaternion' keys. If found (as in QuaternionStamped), it recurses to the leaf node while collecting metadata like headers and covariance matrices along the way.

Example
ros_msg = ROSMessage(
    topic="/quaternion",
    timestamp=17000,
    msg_type="geometry_msgs/msg/QuaternionStamped",
    data=
    {
        "header": {"frame_id": "map", "stamp": {"sec": 17000, "nanosec": 0}},
        "quaternion": {"x": 5.0, "y": 0.0, "z": 0.0, "w": 1.0},
    }
)
# Automatically resolves to a flat Mosaico Quaternion with attached metadata
mosaico_quaternion = QuaternionAdapter.translate(ros_msg)

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing a Quaternion object.

Raises:

Type Description
Exception

Wraps any translation error with context (topic name, timestamp).

from_dict classmethod

from_dict(ros_data)

Recursively parses the ROS data to extract a Quaternion.

Strategy
  • Recurse: If a 'quaternion' key is found, dive deeper into the structure.
  • Leaf Node: At the base level, map 'x', 'y', 'z' and 'w' to Quaternion.
  • Metadata Binding: Headers and covariances are attached during recursion unwinding.

Example
ros_data = {
    "header": {"frame_id": "map", "stamp": {"sec": 17000, "nanosec": 0}},
    "quaternion": {"x": 5.0, "y": 0.0, "z": 0.0, "w": 1.0},
}
# Automatically resolves to a flat Mosaico Quaternion with attached metadata
mosaico_quaternion = QuaternionAdapter.from_dict(ros_data)

Parameters:

Name Type Description Default
ros_data dict

The raw dictionary from the ROS message.

required

Returns:

Name Type Description
Quaternion Quaternion

The constructed Mosaico Quaternion object.

Raises:

Type Description
ValueError

If the recursive 'quaternion' key exists but is not a dict, or if required keys are missing.

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

TransformAdapter

Bases: ROSAdapterBase[Transform]

Adapter for translating ROS Transform messages to Mosaico Transform.

Supported ROS Types:

Recursive Unwrapping Strategy: The adapter checks for nested 'transform' keys. If found (as in TransformStamped), it recurses to the leaf node while collecting metadata like headers and covariance matrices along the way.

Example
ros_msg = ROSMessage(
    topic="/transform",
    timestamp=17000,
    msg_type="geometry_msgs/msg/TransformStamped",
    data=
    {
        "header": {"frame_id": "map", "stamp": {"sec": 17000, "nanosec": 0}},
        "transform": {"translation": {"x": 5.0, "y": 0.0, "z": 0.0}, "rotation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}},
    }
)
# Automatically resolves to a flat Mosaico Transform with attached metadata
mosaico_transform = TransformAdapter.translate(ros_msg)

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing a Transform object.

Raises:

Type Description
Exception

Wraps any translation error with context (topic name, timestamp).

from_dict classmethod

from_dict(ros_data)

Parses ROS Transform data. Handles both nested 'transform' field (from Stamped) and flat structure.

Strategy
  • Recurse: If a 'transform' key is found, dive deeper into the structure.
  • Leaf Node: At the base level, map 'translation' and 'rotation' to Transform.
  • Metadata Binding: Headers and covariances are attached during recursion unwinding.

Example
ros_data = {
    "header": {"frame_id": "map", "stamp": {"sec": 17000, "nanosec": 0}},
    "transform": {"translation": {"x": 5.0, "y": 0.0, "z": 0.0}, "rotation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}},
}
# Automatically resolves to a flat Mosaico Transform with attached metadata
mosaico_transform = TransformAdapter.from_dict(ros_data)

Parameters:

Name Type Description Default
ros_data dict

The raw dictionary from the ROS message.

required

Returns:

Name Type Description
Transform Transform

The constructed Mosaico Transform object.

Raises:

Type Description
ValueError

If the recursive 'transform' key exists but is not a dict, or if required keys are missing.

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.

WrenchAdapter

Bases: ROSAdapterBase[ForceTorque]

Adapter for translating ROS Wrench messages to Mosaico ForceTorque.

Supported ROS Types:

Recursive Unwrapping Strategy: The adapter checks for nested 'wrench' keys. If found (as in WrenchStamped), it recurses to the leaf node while collecting metadata like headers and covariance matrices along the way.

Example
ros_msg = ROSMessage(
    topic="/wrench",
    timestamp=17000,
    msg_type="geometry_msgs/msg/WrenchStamped",
    data=
    {
        "header": {"frame_id": "map", "stamp": {"sec": 17000, "nanosec": 0}},
        "wrench": {"force": {"x": 5.0, "y": 0.0, "z": 0.0}, "torque": {"x": 0.0, "y": 0.0, "z": 0.0}},
    }
)
# Automatically resolves to a flat Mosaico ForceTorque with attached metadata
mosaico_wrench = WrenchAdapter.translate(ros_msg)

translate classmethod

translate(ros_msg, **kwargs)

Translates a ROS message into a Mosaico Message.

Returns:

Name Type Description
Message Message

The translated message containing a ForceTorque object.

Raises:

Type Description
Exception

Wraps any translation error with context (topic name, timestamp).

from_dict classmethod

from_dict(ros_data)

Parses ROS Wrench data into a Mosaico ForceTorque. Handles both the nested 'wrench' field (from Stamped messages) and the flat structure.

Strategy
  • Recurse: If a 'wrench' key is found, dive deeper into the structure.
  • Leaf Node: At the base level, map 'force' and 'torque' to ForceTorque.
  • Metadata Binding: Headers and covariances are attached during recursion unwinding.

Example
ros_data = {
    "header": {"frame_id": "map", "stamp": {"sec": 17000, "nanosec": 0}},
    "wrench": {"force": {"x": 5.0, "y": 0.0, "z": 0.0}, "torque": {"x": 0.0, "y": 0.0, "z": 0.0}},
}
# Automatically resolves to a flat Mosaico ForceTorque with attached metadata
mosaico_wrench = WrenchAdapter.from_dict(ros_data)

ros_msg_type abstractmethod classmethod

ros_msg_type()

Returns the specific ROS message type handled by this adapter.

ontology_data_type classmethod

ontology_data_type()

Returns the Ontology class type associated with this adapter.