Base Models and Mixins
mosaicolabs.models.Message ¶
Bases: BaseModel
The universal transport envelope for Mosaico data.
The Message class wraps a polymorphic Serializable
payload with middleware metadata, such as recording timestamps and headers.
Attributes:
| Name | Type | Description |
|---|---|---|
| timestamp_ns | int | Sensor acquisition timestamp in nanoseconds (event time). This represents the time at which the underlying physical event occurred, i.e., when the sensor actually captured or generated the data. |
| data | Serializable | The actual ontology data payload (e.g., an IMU or GPS instance). |
| recording_timestamp_ns | Optional[int] | Ingestion timestamp in nanoseconds (record time). This represents the time at which the message was received and persisted by the recording system (e.g., rosbag, parquet writer, logging pipeline, or database). |
| frame_id | Optional[str] | A string identifier for the coordinate frame (spatial context). |
| sequence_id | Optional[int] | An optional sequence ID, primarily used for legacy tracking. |
Querying with the .Q Proxy¶
When constructing a QueryOntologyCatalog,
the Message attributes are fully queryable.
| Field Access Path | Queryable Type | Supported Operators |
|---|---|---|
| <Model>.Q.timestamp_ns | Numeric | .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between() |
| <Model>.Q.recording_timestamp_ns | Numeric | .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between() |
| <Model>.Q.frame_id | String | .eq(), .neq(), .match(), .in_() |
| <Model>.Q.sequence_id | Numeric | .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between() |
Universal Compatibility
The <Model> placeholder represents any Mosaico ontology class (e.g., IMU, GPS, Floating64)
or any custom user-defined class that is a subclass of Serializable.
Example
from mosaicolabs import MosaicoClient, IMU, Floating64, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter IMU data acquired before a given instant.
    # timestamp_ns is in nanoseconds, so the seconds value is scaled to nanoseconds.
    qresponse = client.query(
        QueryOntologyCatalog(IMU.Q.timestamp_ns.lt(1770282868 * 1_000_000_000))
    )
    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

    # Filter primitive Floating64 telemetry by frame identifier
    qresponse = client.query(
        QueryOntologyCatalog(Floating64.Q.frame_id.eq("robot_base"))
    )
    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")
timestamp_ns instance-attribute ¶
Sensor acquisition timestamp in nanoseconds (event time).
This represents the time at which the underlying physical event occurred — i.e., when the sensor actually captured or generated the data.
Ideally, this value originates from:
- the sensor hardware clock, or
- a driver-converted hardware timestamp expressed in system time.
This is the authoritative time describing when the data happened and should be used for:
- synchronization across sensors
- state estimation and sensor fusion
- temporal alignment
- latency analysis (when compared to recording time)
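Because the value is an integer count of nanoseconds, wall-clock bounds have to be converted before they can be compared with it. The following is a minimal sketch, assuming the timestamps count nanoseconds since the Unix epoch (the text above only states "nanoseconds"); `to_epoch_ns` is a hypothetical helper and not part of the SDK.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ns(dt: datetime) -> int:
    """Convert a timezone-aware datetime to integer nanoseconds since the Unix epoch."""
    if dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    delta = dt - EPOCH
    # Integer arithmetic avoids the rounding error of float seconds * 1e9.
    whole_seconds = delta.days * 86_400 + delta.seconds
    return whole_seconds * 1_000_000_000 + delta.microseconds * 1_000


cutoff_ns = to_epoch_ns(datetime(2026, 2, 5, 9, 14, 28, tzinfo=timezone.utc))
print(cutoff_ns)
```

A value produced this way can be used as the bound of a numeric operator such as `.lt()` or `.between()`.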
Querying with the .Q Proxy¶
The timestamp_ns field is queryable using the .Q proxy.
| Field Access Path | Queryable Type | Supported Operators |
|---|---|---|
| <Model>.Q.timestamp_ns | Numeric | .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between() |
The <Model> placeholder represents any Mosaico ontology class (e.g., IMU, GPS, Floating64)
or any custom user-defined class that is a subclass of Serializable.
Example
from mosaicolabs import MosaicoClient, IMU, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter IMU data acquired before a given instant.
    # timestamp_ns is in nanoseconds, so the seconds value is scaled to nanoseconds.
    qresponse = client.query(
        QueryOntologyCatalog(IMU.Q.timestamp_ns.lt(1770282868 * 1_000_000_000))
    )
    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")
recording_timestamp_ns class-attribute instance-attribute ¶
Ingestion timestamp in nanoseconds (record time).
This represents the time at which this message was received and persisted by the recording system (e.g., rosbag, parquet writer, logging pipeline, or database).
This timestamp reflects infrastructure timing and may include:
- transport delay
- middleware delay
- serialization/deserialization delay
- scheduling delay
It does NOT represent when the sensor measurement occurred.
Typical usage:
- latency measurement (recording_timestamp_ns - timestamp_ns)
- debugging transport or pipeline delays
- ordering messages by arrival time
If not explicitly set, this value may be None.
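The latency calculation listed above has to account for the unset case. A minimal sketch using plain integers; `latency_ns` is a hypothetical helper, and the timestamp values are made up for illustration.

```python
from typing import Optional


def latency_ns(timestamp_ns: int, recording_timestamp_ns: Optional[int]) -> Optional[int]:
    """Return record time minus event time, or None when no record time is set."""
    if recording_timestamp_ns is None:
        return None
    return recording_timestamp_ns - timestamp_ns


# Hypothetical values: the message was persisted 2.5 ms after acquisition.
event_ns = 1_770_282_868_000_000_000
record_ns = event_ns + 2_500_000

print(latency_ns(event_ns, record_ns))
print(latency_ns(event_ns, None))
```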
Querying with the .Q Proxy¶
The recording_timestamp_ns field is queryable using the .Q proxy.
| Field Access Path | Queryable Type | Supported Operators |
|---|---|---|
| <Model>.Q.recording_timestamp_ns | Numeric | .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between() |
The <Model> placeholder represents any Mosaico ontology class (e.g., IMU, GPS, Floating64)
or any custom user-defined class that is a subclass of Serializable.
Example
from mosaicolabs import MosaicoClient, IMU, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter IMU data recorded before a given instant.
    # recording_timestamp_ns is in nanoseconds, so the seconds value is scaled to nanoseconds.
    qresponse = client.query(
        QueryOntologyCatalog(IMU.Q.recording_timestamp_ns.lt(1770282868 * 1_000_000_000))
    )
    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")
frame_id class-attribute instance-attribute ¶
A string identifier for the coordinate frame (spatial context).
Querying with the .Q Proxy¶
The frame_id field is queryable using the .Q proxy.
| Field Access Path | Queryable Type | Supported Operators |
|---|---|---|
| <Model>.Q.frame_id | String | .eq(), .neq(), .in_(), .match() |
The <Model> placeholder represents any Mosaico ontology class (e.g., IMU, GPS, Floating64)
or any custom user-defined class that is a subclass of Serializable.
Example
from mosaicolabs import MosaicoClient, IMU, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter IMU data by a specific frame_id
    qresponse = client.query(
        QueryOntologyCatalog(IMU.Q.frame_id.eq("base_link"))
    )
    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")
sequence_id class-attribute instance-attribute ¶
An optional sequence ID, primarily used for legacy tracking.
Querying with the .Q Proxy¶
The sequence_id field is queryable using the .Q proxy.
| Field Access Path | Queryable Type | Supported Operators |
|---|---|---|
| <Model>.Q.sequence_id | Numeric | .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between() |
The <Model> placeholder represents any Mosaico ontology class (e.g., IMU, GPS, Floating64)
or any custom user-defined class that is a subclass of Serializable.
Example
from mosaicolabs import MosaicoClient, IMU, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter IMU data by a specific sequence_id
    qresponse = client.query(
        QueryOntologyCatalog(IMU.Q.sequence_id.eq(123))
    )
    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")
model_post_init ¶
Validates the message structure after initialization.
Ensures that there are no field name collisions between the envelope
(e.g., timestamp_ns) and the data payload.
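The idea behind this check can be sketched with plain sets. This is an illustration only, not the SDK's code: `check_no_collisions` is a hypothetical function, the exception type is an assumption, and the set of reserved names is taken from the envelope metadata attributes listed for Message.

```python
# Envelope metadata names, as listed in the Message attributes table.
ENVELOPE_FIELDS = {"timestamp_ns", "recording_timestamp_ns", "frame_id", "sequence_id"}


def check_no_collisions(payload_fields):
    """Raise ValueError if a payload field name shadows an envelope field name."""
    clashes = sorted(ENVELOPE_FIELDS & set(payload_fields))
    if clashes:
        raise ValueError(f"payload fields collide with envelope fields: {clashes}")


# A payload with unrelated field names passes silently.
check_no_collisions(["acceleration", "angular_velocity"])

# A payload that reuses an envelope name is rejected.
try:
    check_no_collisions(["timestamp_ns", "value"])
except ValueError as exc:
    error_message = str(exc)
    print(error_message)
```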
ontology_type ¶
Retrieves the class type of the ontology object stored in the data field.
ontology_tag ¶
Returns the unique ontology tag name associated with the object in the data field.
get_data ¶
Safe, type-hinted accessor for the data payload.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| target_type | Type[TSerializable] | The expected | required |
Returns:
| Type | Description |
|---|---|
| Optional[TSerializable] | The data object cast to the requested type. |
Raises:
| Type | Description |
|---|---|
| TypeError | If the actual data type does not match the requested |
Example
# Get the Image data from the message
image_data = message.get_data(Image)
print(f"Timestamp: {message.timestamp_ns}")
print(f"Image size: {image_data.height}x{image_data.width}")
# Show the image
image_data.to_pillow().show()
# Get the Floating64 data from the message
floating64_data = message.get_data(Floating64)
print(f"Timestamp: {message.timestamp_ns}")
print(f"Data value: {floating64_data.data}")
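How a type-checked accessor of this kind can behave is sketched below with stand-in classes. `Envelope` and `Reading` are hypothetical and do not come from the SDK; the use of `isinstance` is an assumption, since the documentation only states that a TypeError is raised on a mismatch.

```python
from typing import Type, TypeVar

T = TypeVar("T")


class Envelope:
    """Hypothetical stand-in for Message, holding a payload of unknown type."""

    def __init__(self, data: object) -> None:
        self.data = data

    def get_data(self, target_type: Type[T]) -> T:
        # Refuse to hand back a payload of the wrong type.
        if not isinstance(self.data, target_type):
            raise TypeError(
                f"expected {target_type.__name__}, got {type(self.data).__name__}"
            )
        return self.data


class Reading:
    """Hypothetical payload type."""

    def __init__(self, value: float) -> None:
        self.value = value


msg = Envelope(Reading(21.5))
print(msg.get_data(Reading).value)

try:
    msg.get_data(str)
except TypeError as exc:
    mismatch_error = str(exc)
    print(f"TypeError: {mismatch_error}")
```

The benefit of this pattern is that a static type checker can infer the return type from the argument, so attribute access on the result is checked as well.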
from_dataframe_row staticmethod ¶
Reconstructs a Message object from a flattened DataFrame row.
In the Mosaico Data Platform, DataFrames represent topics using a nested naming
convention: {topic}.{tag}.{field}. This method performs
Smart Reconstruction by:
- Topic Validation: Verifying if any columns associated with the topic_name exist in the row.
- Tag Inference: Inspecting the column headers to automatically determine the original ontology tag (e.g., "imu").
- Data Extraction: Stripping prefixes and re-nesting the flat columns into their original dictionary structures.
- Type Casting: Re-instantiating the specific Serializable subclass and wrapping it in a Message envelope.
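The first three steps can be sketched on a plain dictionary standing in for a DataFrame row. This is an illustration under stated assumptions, not the SDK's implementation: `extract_topic` is a hypothetical function, each topic is assumed to carry a single tag, topic names are assumed not to contain dots, and the final type-casting step is omitted.

```python
from typing import Any, Dict, Optional, Tuple


def extract_topic(
    row: Dict[str, Any], topic_name: str
) -> Optional[Tuple[str, Dict[str, Any]]]:
    """Return (tag, nested payload) for one topic, or None if the topic is absent."""
    topic_prefix = topic_name + "."
    # Topic validation: keep only the columns that belong to this topic.
    columns = {k: v for k, v in row.items() if k.startswith(topic_prefix)}
    if not columns:
        return None

    # Tag inference: the segment right after the topic name.
    first_column = next(iter(columns))
    tag = first_column[len(topic_prefix):].split(".", 1)[0]

    # Data extraction: strip "{topic}.{tag}." and re-nest on the remaining dots.
    full_prefix = topic_prefix + tag + "."
    payload: Dict[str, Any] = {}
    for column, value in columns.items():
        if not column.startswith(full_prefix):
            continue  # not a field of the inferred tag
        *parents, leaf = column[len(full_prefix):].split(".")
        node = payload
        for key in parents:
            node = node.setdefault(key, {})
        node[leaf] = value
    return tag, payload


# Hypothetical flattened row following the {topic}.{tag}.{field} convention.
row = {
    "timestamp_ns": 1_000,
    "/front/imu.imu.acceleration.x": 0.1,
    "/front/imu.imu.acceleration.y": 0.0,
    "/front/imu.imu.acceleration.z": 9.8,
    "/front/gps.gps.status": 1,
}
print(extract_topic(row, "/front/imu"))
print(extract_topic(row, "/rear/imu"))
```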
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| row | Series | A single row from a Pandas DataFrame, representing a point in time across one or more topics. | required |
| topic_name | str | The name of the specific topic to extract from the row. | required |
| timestamp_column_name | str | The name of the column containing the timestamp. | 'timestamp_ns' |
Returns:
| Type | Description |
|---|---|
| Optional[Message] | A reconstructed |
Example
# Obtain a dataframe with DataFrameExtractor
from mosaicolabs import MosaicoClient, IMU, Image
from mosaicolabs.models import Message
from mosaicolabs.ml import DataFrameExtractor, SyncTransformer

with MosaicoClient.connect("localhost", 6726) as client:
    sequence_handler = client.get_sequence_handler("example_sequence")
    for df in DataFrameExtractor(sequence_handler).to_pandas_chunks(
        topics=["/front/imu", "/front/camera/image_raw"]
    ):
        # Do something with the dataframe.
        # For example, you can sync the data using the `SyncTransformer`:
        sync_transformer = SyncTransformer(
            target_fps=30,  # resample at 30 Hz and fill the NaNs with a Hold policy
        )
        synced_df = sync_transformer.transform(df)
        # Reconstruct the image message from a single dataframe row (a Series)
        image_msg = Message.from_dataframe_row(synced_df.iloc[0], "/front/camera/image_raw")
        # The method returns an Optional[Message], so check the result before using it
        if image_msg is not None:
            image_data = image_msg.get_data(Image)
            # Show the image
            image_data.to_pillow().show()
        # ...
mosaicolabs.models.Serializable ¶
Bases: BaseModel, _QueryProxyMixin
The base class for all Mosaico ontology data payloads.
This class serves as the root for every sensor and data type in the Mosaico ecosystem.
By inheriting from Serializable, data models are automatically compatible with the platform's storage,
querying, and serialization engines.
Dynamic Attributes Injection¶
When you define a subclass, several key attributes are automatically managed or required. Understanding these is essential for customizing how your data is treated by the platform:
- __serialization_format__: Determines the batching strategy and storage optimization.
    - Role: It tells the SequenceWriter whether to flush data based on byte size (optimal for heavy data like Images) or record count (optimal for light telemetry like IMU).
    - Default: SerializationFormat.Default.
- __ontology_tag__: The unique string identifier for the class (e.g., "imu", "gps_raw").
    - Role: This tag is used in the global registry to reconstruct objects from raw platform data.
    - Generation: If not explicitly provided, it is auto-generated by converting the class name from CamelCase to snake_case.
- __class_type__: A reference to the concrete class itself.
    - Role: Injected during initialization to facilitate polymorphic instantiation and safe type-checking when extracting data from a Message.
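The CamelCase to snake_case conversion mentioned for `__ontology_tag__` can be sketched with a regular expression. The exact rule used by the SDK is not documented here, so the treatment of acronyms and digits below is an assumption, and `camel_to_snake` is a hypothetical helper.

```python
import re


def camel_to_snake(name: str) -> str:
    """Convert a CamelCase class name to snake_case."""
    # Insert "_" before a capital that starts a new word ("GpsRaw" -> "Gps_Raw"),
    # while keeping acronym runs together ("IMU" stays "IMU", "IMUData" -> "IMU_Data").
    with_breaks = re.sub(
        r"(?<=[a-z0-9])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])", "_", name
    )
    return with_breaks.lower()


for class_name in ("IMU", "GpsRaw", "Floating64", "IMUData"):
    print(class_name, "->", camel_to_snake(class_name))
```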
Requirements for Custom Ontologies¶
To create a valid custom ontology, your subclass must:
- Inherit from Serializable.
- Define a __msco_pyarrow_struct__ attribute using pa.StructType to specify the physical schema.
- Define the class fields (using Pydantic syntax) matching the Arrow structure.
Automatic Registration
Any subclass of Serializable is automatically registered in the global Mosaico registry upon definition. This enables the use of the factory methods and the .Q query proxy immediately.
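Registration at class-definition time is commonly built on Python's `__init_subclass__` hook. The sketch below shows that general pattern with a stand-in base class; `Payload`, `_registry`, and the lowercase fallback tag are hypothetical simplifications and are not claimed to match the SDK's internals.

```python
from typing import ClassVar, Dict, Type


class Payload:
    """Hypothetical stand-in for Serializable with a class-level registry."""

    _registry: ClassVar[Dict[str, Type["Payload"]]] = {}
    __ontology_tag__: ClassVar[str]

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Use an explicit tag if the subclass body defines one, else derive one.
        # (Simplification: lowercases the name instead of a full snake_case conversion.)
        tag = cls.__dict__.get("__ontology_tag__", cls.__name__.lower())
        cls.__ontology_tag__ = tag
        Payload._registry[tag] = cls

    @classmethod
    def is_registered(cls) -> bool:
        tag = getattr(cls, "__ontology_tag__", "")
        return Payload._registry.get(tag) is cls


class Temperature(Payload):
    pass


class GpsFix(Payload):
    __ontology_tag__ = "gps_raw"


print(sorted(Payload._registry))
print(Temperature.is_registered(), Payload.is_registered())
```

Because the hook runs when the `class` statement finishes executing, the subclass is available in the registry before any instance is created.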
is_registered classmethod ¶
Checks if a class is registered.
Returns:
| Name | Type | Description |
|---|---|---|
| bool | bool | True if registered. |
ontology_tag classmethod ¶
Retrieves the unique identifier (tag) for the current ontology class, automatically generated during class definition.
This method provides the string key used by the Mosaico platform to identify and route specific data types within the ontology registry. It abstracts away the internal naming conventions, ensuring that you always use the correct identifier for queries and serialization.
Returns:
| Type | Description |
|---|---|
| str | The registered string tag for this class (e.g., |
Raises:
| Type | Description |
|---|---|
| Exception | If the class was not properly initialized via |
Practical Application: Topic Filtering
This method is particularly useful when constructing QueryTopic
requests. By using the convenience method QueryTopic.with_ontology_tag(),
you can filter topics by data type without hardcoding strings that might change.
Example:
from mosaicolabs import MosaicoClient, Topic, IMU, QueryTopic

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter topics by ontology tag (using the constructor)
    qresponse = client.query(
        QueryTopic(
            Topic.with_ontology_tag(IMU.ontology_tag()),
        )
    )
    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")
mosaicolabs.models.CovarianceMixin ¶
Bases: BaseModel
A mixin that adds uncertainty fields (covariance and covariance_type) to data models.
This is particularly useful for complex sensors like IMUs, Odometry, or GNSS receivers that provide multidimensional uncertainty matrices along with their primary measurements.
Dynamic Schema Injection¶
This mixin uses the __init_subclass__ hook to perform a Schema Append operation:
- It inspects the child class's existing __msco_pyarrow_struct__.
- It appends the covariance and covariance_type fields.
- It reconstructs the final pa.struct for the class.
Collision Safety
The mixin performs a collision check during class definition. If the child
class already defines a covariance or covariance_type field in its PyArrow struct, a ValueError
will be raised to prevent schema corruption.
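The append-with-collision-check pattern can be sketched without PyArrow by modelling the schema as a list of (name, type) pairs. Everything below is a hypothetical stand-in: `UncertaintyMixin`, `__schema__`, and the type strings are illustrative, and the sketch only handles direct subclasses.

```python
from typing import ClassVar, List, Tuple

Field = Tuple[str, str]  # (field name, type name): a stand-in for a PyArrow field


class UncertaintyMixin:
    """Hypothetical mixin that appends two fields to each subclass's schema."""

    __schema__: ClassVar[List[Field]] = []
    _EXTRA: ClassVar[List[Field]] = [
        ("covariance", "list<float64>"),
        ("covariance_type", "int16"),
    ]

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Step 1: inspect the schema declared in the subclass body.
        existing = list(cls.__dict__.get("__schema__", []))
        names = {name for name, _ in existing}
        # Collision check: refuse to overwrite fields the subclass already has.
        clashes = [name for name, _ in UncertaintyMixin._EXTRA if name in names]
        if clashes:
            raise ValueError(f"{cls.__name__} already defines {clashes}")
        # Steps 2 and 3: append the extra fields and store the rebuilt schema.
        cls.__schema__ = existing + UncertaintyMixin._EXTRA


class Vector(UncertaintyMixin):
    __schema__ = [("x", "float64"), ("y", "float64")]


print([name for name, _ in Vector.__schema__])

try:
    class Broken(UncertaintyMixin):
        __schema__ = [("covariance", "float64")]
except ValueError as exc:
    collision_error = str(exc)
    print(collision_error)
```

Raising inside `__init_subclass__` aborts the class definition itself, so a conflicting schema is rejected at import time rather than when data is first written.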
Attributes:
| Name | Type | Description |
|---|---|---|
| covariance | Optional[List[float]] | Optional list of 64-bit floats representing the flattened matrix. |
| covariance_type | Optional[int] | Optional 16-bit integer representing the covariance enum. |
Querying with the .Q Proxy¶
When constructing a QueryOntologyCatalog,
the class fields are queryable across any model inheriting from this mixin, according to the following table:
| Field Access Path | Queryable Type | Supported Operators |
|---|---|---|
| <Model>.Q.covariance_type | Numeric | .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between() |
| <Model>.Q.covariance | Non-Queryable | None |
Universal Compatibility
The <Model> placeholder adapts based on how the CovarianceMixin is integrated into your data structure:
- Direct Inheritance: Represents any class (e.g., Vector3d, Quaternion) that inherits directly from CovarianceMixin.
- Composition (Nested Fields): When a complex model (like IMU) contains fields that are themselves covariance-aware, the proxy allows you to "drill down" to that specific attribute. For example, since IMU.acceleration is a Vector3d, you access its covariance type via IMU.Q.acceleration.covariance_type.
Example
from mosaicolabs import MosaicoClient, IMU, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter IMU data by the covariance type of the acceleration field
    # `FROM_CALIBRATED_PROCEDURE` is some enum value defined by the user
    qresponse = client.query(
        QueryOntologyCatalog(IMU.Q.acceleration.covariance_type.eq(FROM_CALIBRATED_PROCEDURE))
    )
    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")
covariance class-attribute instance-attribute ¶
Optional list of 64-bit floats representing the flattened matrix.
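What "flattened" means in practice is sketched below for a square matrix. Row-major ordering is an assumption, since the documentation does not state the layout, and both helper functions are hypothetical.

```python
import math
from typing import List, Sequence


def flatten_row_major(matrix: Sequence[Sequence[float]]) -> List[float]:
    """Flatten a square matrix into a list, one row after another."""
    size = len(matrix)
    if any(len(row) != size for row in matrix):
        raise ValueError("covariance matrix must be square")
    return [float(value) for row in matrix for value in row]


def unflatten_row_major(flat: Sequence[float]) -> List[List[float]]:
    """Rebuild the square matrix from its row-major flattened form."""
    size = math.isqrt(len(flat))
    if size * size != len(flat):
        raise ValueError("length is not a perfect square")
    return [list(flat[i * size:(i + 1) * size]) for i in range(size)]


# Hypothetical 3x3 covariance with independent axes (diagonal matrix).
cov = [
    [0.01, 0.0, 0.0],
    [0.0, 0.01, 0.0],
    [0.0, 0.0, 0.04],
]
flat = flatten_row_major(cov)
print(len(flat), unflatten_row_major(flat) == cov)
```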
Querying with the .Q Proxy¶
Non-Queryable
The field is not queryable with the .Q Proxy.
covariance_type class-attribute instance-attribute ¶
Optional 16-bit integer representing the covariance enum.
This field is injected into the model via composition, ensuring that sensor data is paired with the optional covariance type attribute.
Querying with the .Q Proxy¶
The covariance_type field is fully queryable via the .Q Proxy. The <Model> placeholder
in the path represents any Mosaico class that exposes covariance information, either directly or through its internal fields.
| Field Access Path | Queryable Type | Supported Operators |
|---|---|---|
| <Model>.Q.covariance_type | Numeric | .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between() |
The <Model> placeholder adapts based on how the CovarianceMixin is integrated into your data structure:
- Direct Inheritance: Represents any class (e.g., Vector3d, Quaternion) that inherits directly from CovarianceMixin.
- Composition (Nested Fields): When a complex model (like IMU) contains fields that are themselves covariance-aware, the proxy allows you to "drill down" to that specific attribute. For example, since IMU.acceleration is a Vector3d, you access its covariance type via IMU.Q.acceleration.covariance_type.
Filtering by Calibration Type
This example demonstrates searching for data segments where the acceleration was derived from a specific calibrated procedure.
from mosaicolabs import MosaicoClient, IMU, QueryOntologyCatalog

# Assume FROM_CALIBRATED_PROCEDURE is a user-defined integer constant
with MosaicoClient.connect("localhost", 6726) as client:
    # Target the covariance_type nested within the acceleration field
    qbuilder = QueryOntologyCatalog(
        IMU.Q.acceleration.covariance_type.eq(FROM_CALIBRATED_PROCEDURE)
    )
    results = client.query(qbuilder)
    if results:
        for item in results:
            print(f"Sequence: {item.sequence.name}")
            print(f"Matching Topics: {[topic.name for topic in item.topics]}")
mosaicolabs.models.VarianceMixin ¶
Bases: BaseModel
A mixin that adds 1-dimensional uncertainty fields (variance and variance_type).
Recommended for sensors whose output is a single scalar value with an associated uncertainty, such as ultrasonic rangefinders, temperature sensors, or individual encoders.
Dynamic Schema Injection¶
This mixin uses the __init_subclass__ hook to perform a Schema Append operation:
- It inspects the child class's existing __msco_pyarrow_struct__.
- It appends the variance and variance_type fields.
- It reconstructs the final pa.struct for the class.
Collision Safety
The mixin performs a collision check during class definition. If the child
class already defines a variance or variance_type field in its PyArrow struct, a ValueError
will be raised to prevent schema corruption.
Querying with the .Q Proxy¶
When constructing a QueryOntologyCatalog,
the class fields are queryable across any model inheriting from this mixin, according to the following table:
| Field Access Path | Queryable Type | Supported Operators |
|---|---|---|
| <Model>.Q.variance | Numeric | .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between() |
| <Model>.Q.variance_type | Numeric | .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between() |
Universal Compatibility
The <Model> placeholder adapts based on how the VarianceMixin is integrated into the data structure:
- Direct Inheritance: Used for classes like Pressure or Temperature that inherit directly from VarianceMixin to represent 1D uncertainty.
- Composition (Nested Access): If a complex model contains a field that is a subclass of VarianceMixin, the proxy allows you to traverse the hierarchy to that specific attribute.
Example
from mosaicolabs import MosaicoClient, Pressure, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter Pressure data by a specific variance value
    qresponse = client.query(
        QueryOntologyCatalog(Pressure.Q.variance.lt(0.76))
    )
    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

    # Filter Pressure data by a specific variance type
    # `FROM_CALIBRATED_PROCEDURE` is some enum value defined by the user
    qresponse = client.query(
        QueryOntologyCatalog(Pressure.Q.variance_type.eq(FROM_CALIBRATED_PROCEDURE))
    )
    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")
variance class-attribute instance-attribute ¶
Optional 64-bit float representing the variance of the data.
This field is injected into the model via composition, ensuring that sensor data is paired with the optional variance attribute.
Querying with the .Q Proxy¶
The variance field is queryable with the .Q Proxy.
| Field Access Path | Queryable Type | Supported Operators |
|---|---|---|
| <Model>.Q.variance | Numeric | .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between() |
The <Model> placeholder adapts based on how the VarianceMixin is integrated into the data structure:
- Direct Inheritance: Used for classes like Pressure or Temperature that inherit directly from VarianceMixin to represent 1D uncertainty.
- Composition (Nested Access): If a complex model contains a field that is a subclass of VarianceMixin, the proxy allows you to traverse the hierarchy to that specific attribute.
Example
from mosaicolabs import MosaicoClient, Pressure, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter Pressure data by a variance threshold
    qresponse = client.query(
        QueryOntologyCatalog(Pressure.Q.variance.lt(0.76))
    )
    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")
variance_type class-attribute instance-attribute ¶
Optional 16-bit integer representing the variance parameterization.
This field is injected into the model via composition, ensuring that sensor data is paired with the optional variance type attribute.
Querying with the .Q Proxy¶
The variance_type field is fully queryable via the .Q Proxy.
| Field Access Path | Queryable Type | Supported Operators |
|---|---|---|
| <Model>.Q.variance_type | Numeric | .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between() |
The <Model> placeholder adapts based on how the VarianceMixin is integrated into the data structure:
- Direct Inheritance: Used for classes like Pressure or Temperature that inherit directly from VarianceMixin to represent 1D uncertainty.
- Composition (Nested Access): If a complex model contains a field that is a subclass of VarianceMixin, the proxy allows you to traverse the hierarchy to that specific attribute.
Filtering by Precision
The following examples demonstrate how to filter sensor data based on the magnitude of the variance or the specific procedure used to calculate it.
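from mosaicolabs import MosaicoClient, Pressure, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter by variance threshold
    results = client.query(QueryOntologyCatalog(
        Pressure.Q.variance.lt(0.76)
    ))
    if results:
        for item in results:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

    # Filter by the procedure used to calculate the variance
    # `FROM_CALIBRATED_PROCEDURE` is some enum value defined by the user
    results = client.query(QueryOntologyCatalog(
        Pressure.Q.variance_type.eq(FROM_CALIBRATED_PROCEDURE)
    ))
    if results:
        for item in results:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")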
mosaicolabs.models.BaseModel ¶
Bases: BaseModel
The root base class for SDK data models.
It inherits from pydantic.BaseModel to provide runtime type checking and
initialization logic. It adds a hook for defining the corresponding
PyArrow structure (__msco_pyarrow_struct__), enabling the SDK to auto-generate
Flight schemas.
Note
This class mainly wraps pydantic, in preparation for future versions in which additional field mappings and checks may be implemented.