Base Models and Mixins
mosaicolabs.models.BaseModel
Bases: BaseModel
The root base class for SDK data models.
It inherits from pydantic.BaseModel to provide runtime type checking and
initialization logic. It adds a hook for defining the corresponding
PyArrow structure (__msco_pyarrow_struct__), enabling the SDK to auto-generate
Flight schemas.
Note
This class exists mainly as a wrapper around pydantic, in preparation for future releases that will add further field mappings and checks.
mosaicolabs.models.Serializable
Bases: BaseModel, _QueryProxyMixin
The base class for all Mosaico ontology data payloads.
This class serves as the root for every sensor and data type in the Mosaico ecosystem.
By inheriting from Serializable, data models are automatically compatible with the platform's storage,
querying, and serialization engines.
Dynamic Attributes Injection
When you define a subclass, several key attributes are automatically managed or required. Understanding these is essential for customizing how your data is treated by the platform:
- __serialization_format__: Determines the batching strategy and storage optimization.
  - Role: It tells the SequenceWriter whether to flush data based on byte size (optimal for heavy data like Images) or record count (optimal for light telemetry like IMU).
  - Default: SerializationFormat.Default.
- __ontology_tag__: The unique string identifier for the class (e.g., "imu", "gps_raw").
  - Role: This tag is used in the global registry to reconstruct objects from raw platform data.
  - Generation: If not explicitly provided, it is auto-generated by converting the class name from CamelCase to snake_case.
- __class_type__: A reference to the concrete class itself.
  - Role: Injected during initialization to facilitate polymorphic instantiation and safe type-checking when extracting data from a Message.
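The CamelCase to snake_case conversion mentioned for __ontology_tag__ can be pictured with a small standard-library sketch. The helper name `camel_to_snake` is hypothetical, and the way acronyms are split here is an assumption; the SDK's own rule may differ.

```python
import re


def camel_to_snake(name: str) -> str:
    """Hypothetical helper: derive a snake_case tag from a CamelCase class name."""
    # Put an underscore between a lowercase letter or digit and an uppercase letter.
    step = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
    # Split an acronym from a following capitalized word, e.g. "GPSRaw" -> "GPS_Raw".
    step = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", step)
    return step.lower()


tags = [camel_to_snake(n) for n in ("IMU", "GpsRaw", "GPSRaw", "WheelOdometry")]
```

Setting __ontology_tag__ explicitly on the class avoids relying on whichever conversion rule is in effect.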
Requirements for Custom Ontologies
To create a valid custom ontology, your subclass must:
- Inherit from Serializable.
- Define a __msco_pyarrow_struct__ attribute using pa.StructType to specify the physical schema.
- Define the class fields (using Pydantic syntax) matching the Arrow structure.
Automatic Registration
Any subclass of Serializable is automatically registered in the global Mosaico registry upon definition. This enables the use of the factory methods and the .Q query proxy immediately.
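Registration "upon definition" is commonly implemented with the `__init_subclass__` hook. The sketch below is a standard-library stand-in, not the SDK's code: `SerializableSketch` and its `_registry` dictionary are hypothetical names, and the default tag is simply the lower-cased class name to keep the example short.

```python
from typing import ClassVar, Dict, Type


class SerializableSketch:
    """Stand-in for a self-registering base class (not the SDK's Serializable)."""

    _registry: ClassVar[Dict[str, Type["SerializableSketch"]]] = {}

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Use an explicitly declared tag if present, else fall back to the class name.
        tag = cls.__dict__.get("__ontology_tag__") or cls.__name__.lower()
        cls.__ontology_tag__ = tag
        SerializableSketch._registry[tag] = cls

    @classmethod
    def is_registered(cls) -> bool:
        tag = getattr(cls, "__ontology_tag__", None)
        return SerializableSketch._registry.get(tag) is cls

    @classmethod
    def ontology_tag(cls) -> str:
        return cls.__ontology_tag__


class Temperature(SerializableSketch):
    pass


class GpsRaw(SerializableSketch):
    __ontology_tag__ = "gps_raw"
```

Because the hook runs while the class statement executes, both subclasses are present in the registry before any instance is created.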
is_registered
classmethod
Checks if a class is registered.
Returns:
| Name | Type | Description |
|---|---|---|
| bool | bool | True if registered. |
ontology_tag
classmethod
Retrieves the unique identifier (tag) for the current ontology class, automatically generated during class definition.
This method provides the string key used by the Mosaico platform to identify and route specific data types within the ontology registry. It abstracts away the internal naming conventions, ensuring that you always use the correct identifier for queries and serialization.
Returns:
| Type | Description |
|---|---|
str
|
The registered string tag for this class (e.g., |
Raises:
| Type | Description |
|---|---|
Exception
|
If the class was not properly initialized via |
Practical Application: Topic Filtering
This method is particularly useful when constructing QueryTopic
requests. By using the convenience method QueryTopic.with_ontology_tag(),
you can filter topics by data type without hardcoding strings that might change.
Example:
from mosaicolabs import MosaicoClient, Topic, IMU, QueryTopic

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter topics by ontology tag, without hardcoding the tag string
    qresponse = client.query(
        QueryTopic(
            Topic.with_ontology_tag(IMU.ontology_tag()),
        )
    )
    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")
mosaicolabs.models.Message
Bases: BaseModel
The universal transport envelope for Mosaico data.
The Message class wraps a polymorphic Serializable
payload with middleware metadata, such as recording timestamps and headers.
Attributes:
| Name | Type | Description |
|---|---|---|
| timestamp_ns | Optional[int] | Message/sensor acquisition timestamp in nanoseconds (resembles the high-precision time header of the data ontology). |
| data | Serializable | The actual ontology data payload (e.g., an IMU or GPS instance). |
| recording_timestamp_ns | Optional[int] | Recording timestamp in nanoseconds. This is the time at which the message was recorded in the receiving store file (such as rosbags or parquet files), which differs from the sensor acquisition time. |
timestamp_ns
class-attribute
instance-attribute
Message/sensor acquisition timestamp in nanoseconds (resembles the high-precision time header of the data ontology).
Can be omitted if the data ontology already contains the timestamp (e.g. data.header.stamp) or if recording_timestamp_ns is set.
If all timestamps are None, the message is rejected and a ValueError is raised.
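The "at least one timestamp" rule can be illustrated with a dataclass stand-in. This is only a sketch of the rule as described: `EnvelopeSketch` and `payload_stamp_ns` (standing for a time carried by the payload header) are hypothetical names, and the real check lives in the SDK's own validation.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class EnvelopeSketch:
    """Stand-in for the timestamp fields of a message envelope."""

    timestamp_ns: Optional[int] = None
    recording_timestamp_ns: Optional[int] = None
    payload_stamp_ns: Optional[int] = None  # hypothetical: time from the payload header

    def __post_init__(self) -> None:
        candidates = (
            self.timestamp_ns,
            self.recording_timestamp_ns,
            self.payload_stamp_ns,
        )
        # Reject the envelope only when no time source is available at all.
        if all(value is None for value in candidates):
            raise ValueError("at least one timestamp must be provided")


accepted = EnvelopeSketch(recording_timestamp_ns=1_770_282_868_000_000_000)

try:
    EnvelopeSketch()
    rejected = False
except ValueError:
    rejected = True
```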
recording_timestamp_ns
class-attribute
instance-attribute
Recording timestamp in nanoseconds (different from sensor acquisition time).
This is the time at which the message was recorded in the receiving store file (such as rosbags or parquet files).
model_post_init
Validates the message structure after initialization.
Ensures that there are no field name collisions between the envelope
(e.g., timestamp_ns) and the data payload.
ontology_type
Retrieves the class type of the ontology object stored in the data field.
ontology_tag
Returns the unique ontology tag name associated with the object in the data field.
get_data
Safe, type-hinted accessor for the data payload.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
target_type
|
Type[TSerializable]
|
The expected |
required |
Returns:
| Type | Description |
|---|---|
TSerializable
|
The data object cast to the requested type. |
Raises:
| Type | Description |
|---|---|
TypeError
|
If the actual data type does not match the requested |
Example
# Get the Image data from the message
image_data = message.get_data(Image)
print(f"Message time: {message.timestamp_ns}: Sensor time: {image_data.header.stamp.to_nanoseconds()}")
print(f"Message time: {message.timestamp_ns}: Image size: {image_data.height}x{image_data.width}")
# Show the image
image_data.to_pillow().show()
# Get the Floating64 data from the message
floating64_data = message.get_data(Floating64)
print(f"Message time: {message.timestamp_ns}: Data time: {floating64_data.header.stamp.to_nanoseconds()}")
print(f"Message time: {message.timestamp_ns}: Data value: {floating64_data.data}")
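A type-checked accessor of this kind can be sketched in a few lines. The function name `get_data_sketch` is hypothetical, and using `isinstance` (which also accepts subclasses of the requested type) is an assumption; the SDK may compare types more strictly.

```python
from typing import Any, Type, TypeVar

T = TypeVar("T")


def get_data_sketch(payload: Any, target_type: Type[T]) -> T:
    """Return the payload typed as target_type, or raise TypeError on mismatch."""
    if not isinstance(payload, target_type):
        raise TypeError(
            f"expected {target_type.__name__}, got {type(payload).__name__}"
        )
    return payload


value = get_data_sketch(3, int)

try:
    get_data_sketch("not an int", int)
    mismatch_raised = False
except TypeError:
    mismatch_raised = True
```

The generic return type lets static checkers infer the concrete class at the call site, which is what makes the accessor "type-hinted".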
from_dataframe_row
staticmethod
Reconstructs a Message object from a flattened DataFrame row.
In the Mosaico Data Platform, DataFrames represent topics using a nested naming
convention: {topic}.{tag}.{field}. This method performs
Smart Reconstruction by:
- Topic Validation: Verifying whether any columns associated with the topic_name exist in the row.
- Tag Inference: Inspecting the column headers to automatically determine the original ontology tag (e.g., "imu").
- Data Extraction: Stripping prefixes and re-nesting the flat columns into their original dictionary structures.
- Type Casting: Re-instantiating the specific Serializable subclass and wrapping it in a Message envelope.
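The first three steps above (validation, tag inference, re-nesting) can be sketched on a plain dictionary standing in for a DataFrame row. The function `renest_topic_columns` and the sample column names are hypothetical; only the {topic}.{tag}.{field} naming convention comes from the description.

```python
from typing import Any, Dict, Optional, Tuple


def renest_topic_columns(
    row: Dict[str, Any], topic_name: str
) -> Optional[Tuple[str, Dict[str, Any]]]:
    """Return (tag, nested payload dict) for one topic, or None if it is absent."""
    prefix = topic_name + "."
    tag: Optional[str] = None
    nested: Dict[str, Any] = {}
    for column, value in row.items():
        if not column.startswith(prefix):
            continue  # column belongs to another topic or to the envelope
        column_tag, _, field_path = column[len(prefix):].partition(".")
        if not field_path:
            continue  # not in {topic}.{tag}.{field} form
        if tag is None:
            tag = column_tag  # tag inference from the first matching column
        elif column_tag != tag:
            raise ValueError(f"conflicting tags for topic {topic_name!r}")
        # Walk or create the intermediate dictionaries, then set the leaf value.
        *parents, leaf = field_path.split(".")
        node = nested
        for key in parents:
            node = node.setdefault(key, {})
        node[leaf] = value
    return None if tag is None else (tag, nested)


row = {
    "timestamp_ns": 10,
    "/front/imu.imu.acceleration.x": 0.1,
    "/front/imu.imu.acceleration.y": 0.2,
    "/front/imu.imu.header.frame_id": "base",
    "/front/gps.gps.status": 1,
}
result = renest_topic_columns(row, "/front/imu")
missing = renest_topic_columns(row, "/rear/imu")
```

The final step, turning the nested dictionary back into the concrete Serializable subclass, depends on the SDK's registry and is left out of the sketch.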
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| row | Series | A single row from a Pandas DataFrame, representing a point in time across one or more topics. | required |
| topic_name | str | The name of the specific topic to extract from the row. | required |
| timestamp_column_name | str | The name of the column containing the timestamp. | 'timestamp_ns' |
Returns:
| Type | Description |
|---|---|
Optional[Message]
|
A reconstructed |
Example
# Obtain a dataframe with DataFrameExtractor
from mosaicolabs import MosaicoClient, IMU, Image
from mosaicolabs.models import Message
from mosaicolabs.ml import DataFrameExtractor, SyncTransformer

with MosaicoClient.connect("localhost", 6726) as client:
    sequence_handler = client.get_sequence_handler("example_sequence")
    for df in DataFrameExtractor(sequence_handler).to_pandas_chunks(
        topics=["/front/imu", "/front/camera/image_raw"]
    ):
        # Do something with the dataframe.
        # For example, you can sync the data using the `SyncTransformer`:
        sync_transformer = SyncTransformer(
            target_fps=30,  # resample at 30 Hz and fill the NaNs with a Hold policy
        )
        synced_df = sync_transformer.transform(df)

        # from_dataframe_row expects a single row (a Series), not the whole dataframe
        for _, row in synced_df.iterrows():
            image_msg = Message.from_dataframe_row(row, "/front/camera/image_raw")
            if image_msg is None:
                continue  # no data for this topic in this row
            image_data = image_msg.get_data(Image)
            # Show the image
            image_data.to_pillow().show()
            break  # one frame is enough for the example
        # ...
mosaicolabs.models.Time
Bases: BaseModel
A high-precision time representation designed to prevent precision loss.
The Time class splits a timestamp into a 64-bit integer for seconds and a 32-bit
unsigned integer for nanoseconds. This dual-integer structure follows
robotics standards (like ROS) to ensure temporal accuracy that standard 64-bit
floating-point timestamps cannot maintain over long durations.
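The precision argument can be checked with plain Python numbers. The sketch below uses the Unix time that appears in this page's query examples and adds a single nanosecond; no SDK classes are involved.

```python
import math

sec, nanosec = 1_770_282_868, 1

# Integer pair: the extra nanosecond survives exactly.
as_pair_ns = sec * 1_000_000_000 + nanosec

# Single float64: the extra nanosecond is rounded away.
as_float = sec + nanosec * 1e-9
nanosecond_lost = as_float == float(sec)

# Spacing between adjacent float64 values at this magnitude, in nanoseconds.
float_step_ns = math.ulp(float(sec)) * 1e9
```

At this magnitude adjacent float64 values are 2**-22 seconds apart, a little over 238 ns, so any finer detail cannot be stored in a single float.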
Attributes:
| Name | Type | Description |
|---|---|---|
| sec | int | Seconds since the epoch (Unix time). |
| nanosec | int | Nanoseconds component within the current second, ranging from 0 to 999,999,999. |
nanosec
instance-attribute
Nanoseconds component within the current second, ranging from 0 to 999,999,999.
validate_nanosec
classmethod
Ensures nanoseconds are within the valid [0, 1e9) range.
from_float
classmethod
Factory method to create a Time object from a float (seconds since epoch).
This method carefully handles floating-point artifacts by using rounding for the fractional part to ensure stable nanosecond conversion.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ftime
|
float
|
Total seconds since epoch (e.g., from |
required |
Returns:
| Type | Description |
|---|---|
Time
|
A normalized |
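One plausible way to split a float into seconds and nanoseconds with rounding is sketched below. The function `split_float_seconds` is hypothetical and returns a plain tuple; the carry step and the use of floor for negative inputs are assumptions about what "normalized" means here.

```python
import math
from typing import Tuple


def split_float_seconds(ftime: float) -> Tuple[int, int]:
    """Split float seconds into (sec, nanosec) with nanosec in [0, 1e9)."""
    sec = math.floor(ftime)
    # Round rather than truncate, so 0.3499999999999996 s becomes 350000000 ns.
    nanosec = round((ftime - sec) * 1_000_000_000)
    if nanosec == 1_000_000_000:
        # Rounding reached a full second: carry it into sec.
        sec += 1
        nanosec = 0
    return sec, nanosec


half = split_float_seconds(1.5)
artifact = split_float_seconds(4.35)      # 4.35 is stored slightly below 4.35
carried = split_float_seconds(2.9999999999)
negative = split_float_seconds(-0.25)
```

With truncation instead of rounding, the 4.35 case would yield 349999999 ns, which is the kind of floating-point artifact the method description refers to.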
from_milliseconds
classmethod
Factory method to create a Time object from a total count of milliseconds.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| total_milliseconds | int | Total time elapsed in milliseconds. | required |
Returns:
| Type | Description |
|---|---|
Time
|
A |
from_nanoseconds
classmethod
Factory method to create a Time object from a total count of nanoseconds.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| total_nanoseconds | int | Total time elapsed in nanoseconds. | required |
Returns:
| Type | Description |
|---|---|
Time
|
A |
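For integer inputs the split into seconds and nanoseconds is exact, and `divmod` expresses it directly. The helper names below are hypothetical and return plain tuples rather than Time objects; the SDK's handling of negative totals is not documented here, so the floor behaviour of `divmod` is an assumption.

```python
from typing import Tuple


def split_nanoseconds(total_nanoseconds: int) -> Tuple[int, int]:
    """Split a nanosecond count into (sec, nanosec)."""
    return divmod(total_nanoseconds, 1_000_000_000)


def split_milliseconds(total_milliseconds: int) -> Tuple[int, int]:
    """Split a millisecond count into (sec, nanosec)."""
    sec, millis = divmod(total_milliseconds, 1_000)
    return sec, millis * 1_000_000


from_ns = split_nanoseconds(1_770_282_868_000_000_001)
from_ms = split_milliseconds(1_500)
before_epoch = split_nanoseconds(-1)
```

Python integers have arbitrary precision, so no rounding occurs anywhere in these conversions.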
from_datetime
classmethod
Factory method to create a Time object from a Python datetime instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
dt
|
datetime
|
A standard Python |
required |
Returns:
| Type | Description |
|---|---|
Time
|
A |
to_float
Converts the high-precision time to a float.
Precision Loss
Converting to a 64-bit float may result in the loss of nanosecond precision due to mantissa limitations.
to_nanoseconds
Converts the time to a total integer of nanoseconds.
This conversion preserves full precision.
to_milliseconds
Converts the time to a total integer of milliseconds.
Precision below one millisecond is lost in this conversion.
to_datetime
Converts the time to a Python UTC datetime object.
Microsecond Limitation
Python's datetime objects typically support microsecond precision;
nanosecond data below that threshold will be truncated.
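The microsecond limit is easy to see with the standard library alone. The helper `to_utc_datetime` is a hypothetical stand-in that takes the two integer components directly; it builds the result from a timedelta so that no float conversion is involved.

```python
from datetime import datetime, timedelta, timezone


def to_utc_datetime(sec: int, nanosec: int) -> datetime:
    """Build a UTC datetime; digits below one microsecond are dropped."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return epoch + timedelta(seconds=sec, microseconds=nanosec // 1_000)


dt = to_utc_datetime(1, 123_456_789)
dropped_ns = 123_456_789 - dt.microsecond * 1_000
```

Here the trailing 789 ns cannot be represented and are discarded by the integer division.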
mosaicolabs.models.Header
Bases: BaseModel
Standard metadata header used to provide context to ontology data.
The Header structure provides spatial and temporal context, matching common
industry standards for sensor data. It is typically injected
into sensor models via the HeaderMixin.
Attributes:
| Name | Type | Description |
|---|---|---|
stamp |
Time
|
The high-precision |
frame_id |
Optional[str]
|
A string identifier for the coordinate frame (spatial context). |
seq |
Optional[int]
|
An optional sequence ID, primarily used for legacy tracking. |
Nullable Fields
In the underlying PyArrow schema, all header fields are explicitly marked as
nullable=True. This ensures that empty headers are correctly
deserialized as None rather than default-initialized objects.
mosaicolabs.models.HeaderMixin
Bases: BaseModel
A mixin that injects a standard header field into any inheriting ontology model.
The HeaderMixin is used to add standard metadata (such as acquisition timestamps
or frame IDs) to a sensor model through composition. It ensures that the
underlying PyArrow schema remains synchronized with the Pydantic data model.
Dynamic Schema Injection
This mixin uses the __init_subclass__ hook to perform a Schema Append operation:
- It inspects the child class's existing __msco_pyarrow_struct__.
- It appends a header field of type Header.
- It reconstructs the final pa.struct for the class.
Collision Safety
The mixin performs a collision check during class definition. If the child
class already defines a header field in its PyArrow struct, a ValueError
will be raised to prevent schema corruption.
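The append-and-check behaviour can be sketched without PyArrow by modelling the schema as a list of (name, type) pairs. Everything here is a stand-in: `HeaderMixinSketch` and `__schema_fields__` are hypothetical names that play the roles of the mixin and of __msco_pyarrow_struct__.

```python
from typing import ClassVar, List, Tuple

Field = Tuple[str, str]  # (field name, type name), a stand-in for a pyarrow field


class HeaderMixinSketch:
    """Stand-in mixin that appends a 'header' field to each subclass schema."""

    __schema_fields__: ClassVar[List[Field]] = []

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Step 1: inspect the fields declared by the child class itself.
        own = list(cls.__dict__.get("__schema_fields__", []))
        # Collision check: refuse to overwrite a user-defined 'header' field.
        if any(name == "header" for name, _ in own):
            raise ValueError(f"{cls.__name__} already defines a 'header' field")
        # Steps 2 and 3: append the header field and store the rebuilt schema.
        cls.__schema_fields__ = own + [("header", "Header")]


class ImuSketch(HeaderMixinSketch):
    __schema_fields__ = [("acceleration", "vector3"), ("angular_velocity", "vector3")]


try:
    class Conflicting(HeaderMixinSketch):
        __schema_fields__ = [("header", "string")]
    collision_raised = False
except ValueError:
    collision_raised = True
```

Because the check runs inside `__init_subclass__`, the conflicting class fails at definition time, before any data can be written with a corrupted schema.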
Attributes:
| Name | Type | Description |
|---|---|---|
header |
Optional[Header]
|
An optional |
Querying with the .Q Proxy
When constructing a QueryOntologyCatalog,
the header component is fully queryable across any model inheriting from this mixin.
| Field Access Path | Queryable Type | Supported Operators |
|---|---|---|
| <Model>.Q.header.seq | Numeric | .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between() |
| <Model>.Q.header.stamp.sec | Numeric | .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between() |
| <Model>.Q.header.stamp.nanosec | Numeric | .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between() |
| <Model>.Q.header.frame_id | String | .eq(), .neq(), .match(), .in_() |
Universal Compatibility
The <Model> placeholder represents any Mosaico ontology class (e.g., IMU, GPS, Floating64)
or any custom user-defined Serializable class that inherits
from HeaderMixin.
Example
from mosaicolabs import MosaicoClient, IMU, Floating64, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter IMU data acquired before a given second
    qresponse = client.query(
        QueryOntologyCatalog(IMU.Q.header.stamp.sec.lt(1770282868))
    )
    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

    # Filter primitive Floating64 telemetry by frame identifier
    qresponse = client.query(
        QueryOntologyCatalog(Floating64.Q.header.frame_id.eq("robot_base"))
    )
    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")
header
class-attribute
instance-attribute
An optional metadata header providing temporal and spatial context to the ontology model.
This field is injected into the model via composition, ensuring that sensor data is paired with standard acquisition attributes like sequence IDs and high-precision timestamps.
Querying with the .Q Proxy
Check the documentation of the HeaderMixin to construct a valid expression for the
QueryOntologyCatalog builder involving the header component.
mosaicolabs.models.CovarianceMixin
Bases: BaseModel
A mixin that adds uncertainty fields (covariance and covariance_type) to data models.
This is particularly useful for complex sensors like IMUs, Odometry, or GNSS receivers that provide multidimensional uncertainty matrices along with their primary measurements.
Dynamic Schema Injection
This mixin uses the __init_subclass__ hook to perform a Schema Append operation:
- It inspects the child class's existing __msco_pyarrow_struct__.
- It appends the covariance and covariance_type fields.
- It reconstructs the final pa.struct for the class.
Collision Safety
The mixin performs a collision check during class definition. If the child
class already defines a covariance or covariance_type field in its PyArrow struct, a ValueError
will be raised to prevent schema corruption.
Attributes:
| Name | Type | Description |
|---|---|---|
| covariance | Optional[List[float]] | Optional list of 64-bit floats representing the flattened matrix. |
| covariance_type | Optional[int] | Optional 16-bit integer representing the covariance enum. |
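As an illustration of what a "flattened matrix" looks like, the sketch below converts a square covariance matrix to a flat list and back. Row-major ordering is an assumption made for the example (the ordering is not stated here), and both helper names are hypothetical.

```python
import math
from typing import List, Sequence


def flatten_row_major(matrix: Sequence[Sequence[float]]) -> List[float]:
    """Flatten a square matrix row by row (assumed ordering)."""
    size = len(matrix)
    if any(len(row) != size for row in matrix):
        raise ValueError("covariance matrix must be square")
    return [float(value) for row in matrix for value in row]


def unflatten_row_major(flat: Sequence[float]) -> List[List[float]]:
    """Rebuild the square matrix from its row-major flattening."""
    size = math.isqrt(len(flat))
    if size * size != len(flat):
        raise ValueError("length is not a perfect square")
    return [list(flat[i * size:(i + 1) * size]) for i in range(size)]


matrix = [[0.01, 0.0, 0.0], [0.0, 0.02, 0.0], [0.0, 0.0, 0.03]]
flat = flatten_row_major(matrix)
restored = unflatten_row_major(flat)
```

For a symmetric covariance matrix, row-major and column-major flattening give the same list, so the assumption only matters for non-symmetric inputs.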
Querying with the .Q Proxy
When constructing a QueryOntologyCatalog,
the class fields are queryable across any model inheriting from this mixin, according to the following table:
| Field Access Path | Queryable Type | Supported Operators |
|---|---|---|
| <Model>.Q.covariance_type | Numeric | .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between() |
| <Model>.Q.covariance | Non-Queryable | None |
Universal Compatibility
The <Model> placeholder represents any Mosaico ontology class (e.g., IMU, GPS, Floating64)
or any custom user-defined Serializable class that inherits
from CovarianceMixin.
Example
from mosaicolabs import MosaicoClient, IMU, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter IMU data by covariance type
    # `FROM_CALIBRATED_PROCEDURE` is some enum value defined by the user
    qresponse = client.query(
        QueryOntologyCatalog(IMU.Q.covariance_type.eq(FROM_CALIBRATED_PROCEDURE))
    )
    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")
covariance
class-attribute
instance-attribute
Optional list of 64-bit floats representing the flattened matrix.
Querying with the .Q Proxy
Non-Queryable
The field is not queryable with the .Q Proxy.
covariance_type
class-attribute
instance-attribute
Optional 16-bit integer representing the covariance enum.
This field is injected into the model via composition, ensuring that sensor data is paired with the optional covariance type attribute.
Querying with the .Q Proxy
Check the documentation of the CovarianceMixin to construct a valid expression for the
QueryOntologyCatalog builder involving the covariance_type component.
mosaicolabs.models.VarianceMixin
Bases: BaseModel
A mixin that adds 1-dimensional uncertainty fields (variance and variance_type).
Recommended for sensors with scalar uncertain outputs, such as ultrasonic rangefinders, temperature sensors, or individual encoders.
Dynamic Schema Injection
This mixin uses the __init_subclass__ hook to perform a Schema Append operation:
- It inspects the child class's existing __msco_pyarrow_struct__.
- It appends the variance and variance_type fields.
- It reconstructs the final pa.struct for the class.
Collision Safety
The mixin performs a collision check during class definition. If the child
class already defines a variance or variance_type field in its PyArrow struct, a ValueError
will be raised to prevent schema corruption.
Querying with the .Q Proxy
When constructing a QueryOntologyCatalog,
the class fields are queryable across any model inheriting from this mixin, according to the following table:
| Field Access Path | Queryable Type | Supported Operators |
|---|---|---|
| <Model>.Q.variance | Numeric | .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between() |
| <Model>.Q.variance_type | Numeric | .eq(), .neq(), .lt(), .gt(), .leq(), .geq(), .in_(), .between() |
Universal Compatibility
The <Model> placeholder represents any Mosaico ontology class (e.g., IMU, GPS, Floating64)
or any custom user-defined Serializable class that inherits
from VarianceMixin.
Example
from mosaicolabs import MosaicoClient, IMU, QueryOntologyCatalog

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter IMU data by variance value
    qresponse = client.query(
        QueryOntologyCatalog(IMU.Q.variance.lt(0.76))
    )
    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

    # Filter IMU data by variance type
    # `FROM_CALIBRATED_PROCEDURE` is some enum value defined by the user
    qresponse = client.query(
        QueryOntologyCatalog(IMU.Q.variance_type.eq(FROM_CALIBRATED_PROCEDURE))
    )
    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")
variance
class-attribute
instance-attribute
Optional 64-bit float representing the variance of the data.
This field is injected into the model via composition, ensuring that sensor data is paired with the optional variance attribute.
Querying with the .Q Proxy
Check the documentation of the VarianceMixin to construct a valid expression for the
QueryOntologyCatalog builder involving the variance component.
variance_type
class-attribute
instance-attribute
Optional 16-bit integer representing the variance parameterization.
This field is injected into the model via composition, ensuring that sensor data is paired with the optional variance type attribute.
Querying with the .Q Proxy
Check the documentation of the VarianceMixin to construct a valid expression for the
QueryOntologyCatalog builder involving the variance_type component.