Skip to content

Base Models and Mixins

mosaicolabs.models.Message

Bases: BaseModel

The universal transport envelope for Mosaico data.

The Message class wraps a polymorphic Serializable payload with middleware metadata, such as recording timestamps and headers.

Attributes:

Name Type Description
timestamp_ns int

Sensor acquisition timestamp in nanoseconds (event time). This represents the time at which the underlying physical event occurred — i.e., when the sensor actually captured or generated the data.

data Serializable

The actual ontology data payload (e.g., an IMU or GPS instance).

recording_timestamp_ns Optional[int]

Ingestion timestamp in nanoseconds (record time). This represents the time at which the message was received and persisted by the recording system (e.g., rosbag, parquet writer, logging pipeline, or database).

frame_id Optional[str]

A string identifier for the coordinate frame (spatial context).

sequence_id Optional[int]

An optional sequence ID, primarily used for legacy tracking.

Querying with the .Q Proxy

When constructing a QueryOntologyCatalog, the Message attributes are fully queryable.

Field Access Path Queryable Type Supported Operators
<Model>.Q.timestamp_ns Numeric .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between()
<Model>.Q.recording_timestamp_ns Numeric .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between()
<Model>.Q.frame_id String .eq(), .neq(), .match(), .in_()
<Model>.Q.sequence_id Numeric .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between()
Universal Compatibility

The <Model> placeholder represents any Mosaico ontology class (e.g., IMU, GPS, Floating64) or any custom user-defined class that is a subclass of Serializable.

Example
from mosaicolabs import MosaicoClient, IMU, Floating64, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter IMU data by a specific acquisition second
    qresponse = client.query(
        QueryOntologyCatalog(IMU.Q.timestamp_ns.lt(1770282868))
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

    # Filter primitive Floating64 telemetry by frame identifier
    qresponse = client.query(
        QueryOntologyCatalog(Floating64.Q.frame_id.eq("robot_base"))
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

data instance-attribute

data

The actual ontology data payload (e.g., an IMU or GPS instance).

timestamp_ns instance-attribute

timestamp_ns

Sensor acquisition timestamp in nanoseconds (event time).

This represents the time at which the underlying physical event occurred — i.e., when the sensor actually captured or generated the data.

Ideally, this value originates from:

  • the sensor hardware clock, or
  • a driver-converted hardware timestamp expressed in system time.

This is the authoritative time describing when the data happened and should be used for:

  • synchronization across sensors
  • state estimation and sensor fusion
  • temporal alignment
  • latency analysis (when compared to recording time)
Querying with the .Q Proxy

The timestamp_ns field is queryable using the .Q proxy.

Field Access Path Queryable Type Supported Operators
<Model>.Q.timestamp_ns Numeric .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between()

The <Model> placeholder represents any Mosaico ontology class (e.g., IMU, GPS, Floating64) or any custom user-defined class that is a subclass of Serializable.

Example
from mosaicolabs import MosaicoClient, IMU, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter IMU data by a specific acquisition second
    qresponse = client.query(
        QueryOntologyCatalog(IMU.Q.timestamp_ns.lt(1770282868))
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

recording_timestamp_ns class-attribute instance-attribute

recording_timestamp_ns = None

Ingestion timestamp in nanoseconds (record time).

This represents the time at which this message was received and persisted by the recording system (e.g., rosbag, parquet writer, logging pipeline, or database).

This timestamp reflects infrastructure timing and may include:

  • transport delay
  • middleware delay
  • serialization/deserialization delay
  • scheduling delay

It does NOT represent when the sensor measurement occurred.

Typical usage:

  • latency measurement (recording_timestamp_ns - timestamp_ns)
  • debugging transport or pipeline delays
  • ordering messages by arrival time

If not explicitly set, this value may be None.

Querying with the .Q Proxy

The recording_timestamp_ns field is queryable using the .Q proxy.

Field Access Path Queryable Type Supported Operators
<Model>.Q.recording_timestamp_ns Numeric .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between()

The <Model> placeholder represents any Mosaico ontology class (e.g., IMU, GPS, Floating64) or any custom user-defined class that is a subclass of Serializable

Example
from mosaicolabs import MosaicoClient, IMU, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter IMU data by a specific recording second
    qresponse = client.query(
        QueryOntologyCatalog(IMU.Q.recording_timestamp_ns.lt(1770282868))
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

frame_id class-attribute instance-attribute

frame_id = None

A string identifier for the coordinate frame (spatial context).

Querying with the .Q Proxy

The frame_id field is queryable using the .Q proxy.

Field Access Path Queryable Type Supported Operators
<Model>.Q.frame_id String .eq(), .neq(), .in_(), .match()

The <Model> placeholder represents any Mosaico ontology class (e.g., IMU, GPS, Floating64) or any custom user-defined class that is a subclass of Serializable

Example
from mosaicolabs import MosaicoClient, IMU, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter IMU data by a specific frame_id
    qresponse = client.query(
        QueryOntologyCatalog(IMU.Q.frame_id.eq("base_link"))
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

sequence_id class-attribute instance-attribute

sequence_id = None

An optional sequence ID, primarily used for legacy tracking.

Querying with the .Q Proxy

The sequence_id field is queryable using the .Q proxy.

Field Access Path Queryable Type Supported Operators
<Model>.Q.sequence_id Numeric .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between()

The <Model> placeholder represents any Mosaico ontology class (e.g., IMU, GPS, Floating64) or any custom user-defined class that is a subclass of Serializable

Example
from mosaicolabs import MosaicoClient, IMU, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter IMU data by a specific sequence_id
    qresponse = client.query(
        QueryOntologyCatalog(IMU.Q.sequence_id.eq(123))
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

model_post_init

model_post_init(context)

Validates the message structure after initialization.

Ensures that there are no field name collisions between the envelope (e.g., timestamp_ns) and the data payload.

ontology_type

ontology_type()

Retrieves the class type of the ontology object stored in the data field.

ontology_tag

ontology_tag()

Returns the unique ontology tag name associated with the object in the data field.

get_data

get_data(target_type)

Safe, type-hinted accessor for the data payload.

Parameters:

Name Type Description Default
target_type Type[TSerializable]

The expected Serializable subclass type.

required

Returns:

Type Description
Optional[TSerializable]

The data object cast to the requested type.

Raises:

Type Description
TypeError

If the actual data type does not match the requested target_type.

Example
# Get the IMU data from the message
image_data = message.get_data(Image)
print(f"Timestamp: {message.timestamp_ns}")
print(f"Image size: {image_data.height}x{image_data.width}")
# Show the image
image_data.to_pillow().show()

# Get the Floating64 data from the message
floating64_data = message.get_data(Floating64)
print(f"Timestamp: {message.timestamp_ns}")
print(f"Data value: {floating64_data.data}")

from_dataframe_row staticmethod

from_dataframe_row(
    row, topic_name, timestamp_column_name="timestamp_ns"
)

Reconstructs a Message object from a flattened DataFrame row.

In the Mosaico Data Platform, DataFrames represent topics using a nested naming convention: {topic}.{tag}.{field}. This method performs Smart Reconstruction by:

  1. Topic Validation: Verifying if any columns associated with the topic_name exist in the row.
  2. Tag Inference: Inspecting the column headers to automatically determine the original ontology tag (e.g., "imu").
  3. Data Extraction: Stripping prefixes and re-nesting the flat columns into their original dictionary structures.
  4. Type Casting: Re-instantiating the specific Serializable subclass and wrapping it in a Message envelope.

Parameters:

Name Type Description Default
row Series

A single row from a Pandas DataFrame, representing a point in time across one or more topics.

required
topic_name str

The name of the specific topic to extract from the row.

required
timestamp_column_name str

The name of the column containing the timestamp.

'timestamp_ns'

Returns:

Type Description
Optional[Message]

A reconstructed Message instance containing the typed ontology data, or None if the topic is not present or the data is incomplete.

Example
# Obtain a dataframe with DataFrameExtractor
from mosaicolabs import MosaicoClient, IMU, Image
from mosaicolabs.ml import DataFrameExtractor, SyncTransformer

with MosaicoClient.connect("localhost", 6726) as client:
    sequence_handler = client.get_sequence_handler("example_sequence")
    for df in DataFrameExtractor(sequence_handler).to_pandas_chunks(
        topics = ["/front/imu", "/front/camera/image_raw"]
    ):
        # Do something with the dataframe.
        # For example, you can sync the data using the `SyncTransformer`:
        sync_transformer = SyncTransformer(
            target_fps = 30, # resample at 30 Hz and fill the Nans with a Hold policy
        )
        synced_df = sync_transformer.transform(df)

        # Reconstruct the image message from a dataframe row
        image_msg = Message.from_dataframe_row(synced_df, "/front/camera/image_raw")
        image_data = image_msg.get_data(Image)
        # Show the image
        image_data.to_pillow().show()
        # ...

mosaicolabs.models.Serializable

Bases: BaseModel, _QueryProxyMixin

The base class for all Mosaico ontology data payloads.

This class serves as the root for every sensor and data type in the Mosaico ecosystem. By inheriting from Serializable, data models are automatically compatible with the platform's storage, querying, and serialization engines.

Dynamic Attributes Injection

When you define a subclass, several key attributes are automatically managed or required. Understanding these is essential for customizing how your data is treated by the platform:

  • __serialization_format__: Determines the batching strategy and storage optimization.

    • Role: It tells the SequenceWriter whether to flush data based on byte size (optimal for heavy data like Images) or record count (optimal for light telemetry like IMU).
    • Default: SerializationFormat.Default.
  • __ontology_tag__: The unique string identifier for the class (e.g., "imu", "gps_raw").

    • Role: This tag is used in the global registry to reconstruct objects from raw platform data.
    • Generation: If not explicitly provided, it is auto-generated by converting the class name from CamelCase to snake_case.
  • __class_type__: A reference to the concrete class itself.

    • Role: Injected during initialization to facilitate polymorphic instantiation and safe type-checking when extracting data from a Message.
Requirements for Custom Ontologies

To create a valid custom ontology, your subclass must:

  1. Inherit from Serializable.
  2. Define a __msco_pyarrow_struct__ attribute using pa.StructType to specify the physical schema.
  3. Define the class fields (using Pydantic syntax) matching the Arrow structure.
Automatic Registration

Any subclass of Serializable is automatically registered in the global Mosaico registry upon definition. This enables the use of the factory methods and the .Q query proxy immediately.

is_registered classmethod

is_registered()

Checks if a class is registered.

Returns:

Name Type Description
bool bool

True if registered.

ontology_tag classmethod

ontology_tag()

Retrieves the unique identifier (tag) for the current ontology class, automatically generated during class definition.

This method provides the string key used by the Mosaico platform to identify and route specific data types within the ontology registry. It abstracts away the internal naming conventions, ensuring that you always use the correct identifier for queries and serialization.

Returns:

Type Description
str

The registered string tag for this class (e.g., "imu", "gps").

Raises:

Type Description
Exception

If the class was not properly initialized via __init_subclass__.

Practical Application: Topic Filtering

This method is particularly useful when constructing QueryTopic requests. By using the convenience method QueryTopic.with_ontology_tag(), you can filter topics by data type without hardcoding strings that might change.

Example:

from mosaicolabs import MosaicoClient, Topic, IMU, QueryTopic

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter for a specific data value (using constructor)
    qresponse = client.query(
        QueryTopic(
            Topic.with_ontology_tag(IMU.ontology_tag()),
        )
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

mosaicolabs.models.CovarianceMixin

Bases: BaseModel

A mixin that adds uncertainty fields (covariance and covariance_type) to data models.

This is particularly useful for complex sensors like IMUs, Odometry, or GNSS receivers that provide multidimensional uncertainty matrices along with their primary measurements.

Dynamic Schema Injection

This mixin uses the __init_subclass__ hook to perform a Schema Append operation:

  1. It inspects the child class's existing __msco_pyarrow_struct__.
  2. It appends a covariance and covariance_type fields.
  3. It reconstructs the final pa.struct for the class.
Collision Safety

The mixin performs a collision check during class definition. If the child class already defines a covariance or covariance_type field in its PyArrow struct, a ValueError will be raised to prevent schema corruption.

Attributes:

Name Type Description
covariance Optional[List[float]]

Optional list of 64-bit floats representing the flattened matrix.

covariance_type Optional[int]

Optional 16-bit integer representing the covariance enum.

Querying with the .Q Proxy

When constructing a QueryOntologyCatalog, the class fields are queryable across any model inheriting from this mixin, according to the following table:

Field Access Path Queryable Type Supported Operators
<Model>.Q.covariance_type Numeric .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between()
<Model>.Q.covariance Non-Queryable None
Universal Compatibility

The <Model> placeholder adapts based on how the CovarianceMixin is integrated into your data structure:

  • Direct Inheritance: Represents any class (e.g., Vector3d, Quaternion) that inherits directly from CovarianceMixin.
  • Composition (Nested Fields): When a complex model (like IMU) contains fields that are themselves covariance-aware, the proxy allows you to "drill down" to that specific attribute. For example, since IMU.acceleration is a Vector3d, you access its covariance type via IMU.Q.acceleration.covariance_type.
Example
from mosaicolabs import MosaicoClient, IMU, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter IMU data by a specific acquisition second
    # `FROM_CALIBRATED_PROCEDURE` is some enum value defined by the user
    qresponse = client.query(
        QueryOntologyCatalog(IMU.Q.acceleration.covariance_type.eq(FROM_CALIBRATED_PROCEDURE))
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

covariance class-attribute instance-attribute

covariance = None

Optional list of 64-bit floats representing the flattened matrix.

Querying with the .Q Proxy
Non-Queryable

The field is not queryable with the .Q Proxy.

covariance_type class-attribute instance-attribute

covariance_type = None

Optional 16-bit integer representing the covariance enum.

This field is injected into the model via composition, ensuring that sensor data is paired with the optional covariance type attribute.

Querying with the .Q Proxy

The covariance_type field is fully queryable via the .Q Proxy. The <Model> placeholder in the path represents any Mosaico class that exposes covariance information, either directly or through its internal fields.

Field Access Path Queryable Type Supported Operators
<Model>.Q.covariance_type Numeric .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between()

The <Model> placeholder adapts based on how the CovarianceMixin is integrated into your data structure:

  • Direct Inheritance: Represents any class (e.g., Vector3d, Quaternion) that inherits directly from CovarianceMixin.
  • Composition (Nested Fields): When a complex model (like IMU) contains fields that are themselves covariance-aware, the proxy allows you to "drill down" to that specific attribute. For example, since IMU.acceleration is a Vector3d, you access its covariance type via IMU.Q.acceleration.covariance_type.
Filtering by Calibration Type

This example demonstrates searching for data segments where the acceleration was derived from a specific calibrated procedure.

from mosaicolabs import MosaicoClient, IMU, QueryOntologyCatalog

# Assume FROM_CALIBRATED_PROCEDURE is a user-defined integer constant
with MosaicoClient.connect("localhost", 6726) as client:
    # Target the covariance_type nested within the acceleration field
    qbuilder = QueryOntologyCatalog(
        IMU.Q.acceleration.covariance_type.eq(FROM_CALIBRATED_PROCEDURE)
    )

    results = client.query(qbuilder)

    if results:
        for item in results:
            print(f"Sequence: {item.sequence.name}")
            print(f"Matching Topics: {[topic.name for topic in item.topics]}")

mosaicolabs.models.VarianceMixin

Bases: BaseModel

A mixin that adds 1-dimensional uncertainty fields (variance and variance_type).

Recommended for sensors with scalar uncertain outputs, such as ultrasonic rangefinders, temperature sensors, or individual encoders.

Dynamic Schema Injection

This mixin uses the __init_subclass__ hook to perform a Schema Append operation:

  1. It inspects the child class's existing __msco_pyarrow_struct__.
  2. It appends a variance and variance_type field.
  3. It reconstructs the final pa.struct for the class.
Collision Safety

The mixin performs a collision check during class definition. If the child class already defines a variance or variance_type field in its PyArrow struct, a ValueError will be raised to prevent schema corruption.

Querying with the .Q Proxy

When constructing a QueryOntologyCatalog, the class fields are queryable across any model inheriting from this mixin, according to the following table:

Field Access Path Queryable Type Supported Operators
<Model>.Q.variance Numeric .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between()
<Model>.Q.variance_type Numeric .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between()
Universal Compatibility

The <Model> placeholder adapts based on how the VarianceMixin is integrated into the data structure:

  • Direct Inheritance: Used for classes like Pressure or Temperature that inherit directly from VarianceMixin to represent 1D uncertainty.
  • Composition (Nested Access): If a complex model contains a field that is a subclass of VarianceMixin, the proxy allows you to traverse the hierarchy to that specific attribute.
Example
from mosaicolabs import MosaicoClient, Pressure, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter Pressure data by a specific variance value
    qresponse = client.query(
        QueryOntologyCatalog(Pressure.Q.variance.lt(0.76))
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

    # Filter Pressure data by a specific variance type
    # `FROM_CALIBRATED_PROCEDURE` is some enum value defined by the user
    qresponse = client.query(
        QueryOntologyCatalog(Pressure.Q.variance_type.eq(FROM_CALIBRATED_PROCEDURE))
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

variance class-attribute instance-attribute

variance = None

Optional 64-bit float representing the variance of the data.

This field is injected into the model via composition, ensuring that sensor data is paired with the optional variance attribute.

Querying with the .Q Proxy

The variance field is queryable with the .Q Proxy.

Field Access Path Queryable Type Supported Operators
<Model>.Q.variance Numeric .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between()

The <Model> placeholder adapts based on how the VarianceMixin is integrated into the data structure:

  • Direct Inheritance: Used for classes like Pressure or Temperature that inherit directly from VarianceMixin to represent 1D uncertainty.
  • Composition (Nested Access): If a complex model contains a field that is a subclass of VarianceMixin, the proxy allows you to traverse the hierarchy to that specific attribute.
Example
from mosaicolabs import MosaicoClient, Pressure, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter Pressure data by a specific acquisition second
    qresponse = client.query(
        QueryOntologyCatalog(Pressure.Q.variance.lt(0.76))
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

variance_type class-attribute instance-attribute

variance_type = None

Optional 16-bit integer representing the variance parameterization.

This field is injected into the model via composition, ensuring that sensor data is paired with the optional covariance type attribute.

Querying with the .Q Proxy

The variance_type field is fully queryable via the .Q Proxy.

Field Access Path Queryable Type Supported Operators
<Model>.Q.variance_type Numeric .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between()

The <Model> placeholder adapts based on how the VarianceMixin is integrated into the data structure:

  • Direct Inheritance: Used for classes like Pressure or Temperature that inherit directly from VarianceMixin to represent 1D uncertainty.
  • Composition (Nested Access): If a complex model contains a field that is a subclass of VarianceMixin, the proxy allows you to traverse the hierarchy to that specific attribute.
Filtering by Precision

The following examples demonstrate how to filter sensor data based on the magnitude of the variance or the specific procedure used to calculate it.

from mosaicolabs import MosaicoClient, Pressure, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter by variance threshold
    results = client.query(QueryOntologyCatalog(
        Pressure.Q.variance.lt(0.76)
    ))

    if results:
        for item in results:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

mosaicolabs.models.BaseModel

Bases: BaseModel

The root base class for SDK data models.

It inherits from pydantic.BaseModel to provide runtime type checking and initialization logic. It adds a hook for defining the corresponding PyArrow structure (__msco_pyarrow_struct__), enabling the SDK to auto-generate Flight schemas.

Note

This class has been added mainly for wrapping pydantic, toward future implementation where other fields mapping and checks are implemented