Machine Learning & Analytics
The Mosaico ML module serves as the high-performance bridge between the Mosaico Data Platform and the modern Data Science ecosystem. While the platform is optimized for high-speed raw message streaming, this module provides the abstractions necessary to transform asynchronous sensor data into tabular formats compatible with Physical AI, Deep Learning, and Predictive Analytics.
Working with robotics and multi-modal datasets presents three primary technical hurdles that the ML module is designed to solve:
- Heterogeneous Sampling: Sensors like LIDAR (low frequency), IMU (high frequency), and GPS (intermittent) operate at different rates.
- High Volume: Datasets often exceed the available system RAM.
- Nested Structures: Robotics data is typically deeply nested with coordinate transformations and covariance matrices.
From Sequences to DataFrames¶
API Reference: mosaicolabs.ml.DataFrameExtractor
The DataFrameExtractor is a specialized utility designed to convert Mosaico sequences into tabular formats. Unlike standard streamers that instantiate individual Python objects, this extractor operates at the Batch Level by pulling raw RecordBatch objects directly from the underlying stream to maximize throughput.
Key Technical Features¶
- Recursive Flattening: Automatically "unpacks" deeply nested Mosaico Ontology structures into primitive columns.
- Semantic Naming: Columns use a
{topic_name}.{ontology_tag}.{field_path}convention (e.g.,/front/camera/imu.imu.acceleration.x) to remain self-describing. - Namespace Isolation: Topic names are included in column headers to prevent collisions when multiple sensors of the same type are present.
- Memory-Efficient Windowing: Uses a generator-based approach to yield data in time-based "chunks" (e.g., 5-second windows) while handling straddling batches via a carry-over buffer.
- Sparse Merging: Creates a "sparse" DataFrame containing the union of all timestamps, using
NaNfor missing sensor readings at specific intervals.
This example demonstrates iterating through a sequence in 10-second tabular chunks.
from mosaicolabs import MosaicoClient
from mosaicolabs.ml import DataFrameExtractor
with MosaicoClient.connect("localhost", 6726):
# Initialize from an existing SequenceHandler
seq_handler = client.sequence_handler("drive_session_01")
extractor = DataFrameExtractor(seq_handler)
# Iterate through 10-second chunks
for df in extractor.to_pandas_chunks(window_sec=10.0):
# 'df' is a pandas DataFrame with semantic columns
# Example: df["/front/camera/imu.imu.acceleration.x"]
print(f"Processing chunk with {len(df)} rows")
For complex types like images that require specialized decoding, Mosaico allows you to "inflate" a flattened DataFrame row back into a strongly-typed Message object.
from mosaicolabs import MosaicoClient
from mosaicolabs.ml import DataFrameExtractor
from mosaicolabs.models import Message, Image
with MosaicoClient.connect("localhost", 6726):
# Initialize from an existing SequenceHandler
seq_handler = client.sequence_handler("drive_session_01")
extractor = DataFrameExtractor(seq_handler)
# Get data chunks
for df in extractor.to_pandas_chunks(topics=["/sensors/front/image_raw"]):
for _, row in df.iterrows():
# Reconstruct the full Message (envelope + payload) from a row
img_msg = Message.from_dataframe_row(
row=row,
topic_name="/sensors/front/image_raw",
)
if img_msg:
img = img_msg.get_data(Image).to_pillow()
# Access typed fields with IDE autocompletion
print(f"Time: {img_msg.timestamp_ns}")
img.show()
Sparse to Dense Representation¶
API Reference: mosaicolabs.ml.SyncTransformer
The SyncTransformer is a temporal resampler designed to solve the Heterogeneous Sampling problem inherent in robotics and Physical AI.
It aligns multi-rate sensor streams (for example, an IMU at 100Hz and a GPS at 5Hz) onto a uniform, fixed-frequency grid to prepare them for machine learning models.
The SyncTransformer operates as a processor that bridges the gaps between windowed chunks yielded by the DataFrameExtractor.
Unlike standard resamplers that treat each data batch in isolation, this transformer maintains internal state to ensure signal continuity across batch boundaries.
Key Design Principles¶
- Stateful Continuity: It maintains an internal cache of the last known sensor values and the next expected grid tick, allowing signals to bridge the gap between independent DataFrame chunks.
- Semantic Integrity: It respects the physical reality of data acquisition by yielding
Nonefor grid ticks that occur before a sensor's first physical measurement, avoiding data "hallucination". - Vectorized Performance: Internal kernels leverage high-speed lookups for high-throughput processing.
- Protocol-Based Extensibility: The mathematical logic for resampling is decoupled through a
SynchPolicyprotocol, allowing for custom kernel injection.
Implemented Synchronization Policies¶
API Reference: mosaicolabs.ml.SyncPolicy
Each policy defines a specific logic for how the transformer bridges temporal gaps between sparse data points.
1. SyncHold (Last-Value-Hold)¶
- Behavior: Finds the most recent valid measurement and "holds" it constant until a new one arrives.
- Best For: Sensors where states remain valid until explicitly changed, such as robot joint positions or battery levels.
2. SyncAsOf (Staleness Guard)¶
- Behavior: Carries the last known value forward only if it has not exceeded a defined maximum "tolerance" (fresher than a specific age).
- Best For: High-speed signals that become unreliable if not updated frequently, such as localization coordinates.
3. SyncDrop (Interval Filter)¶
- Behavior: Ensures a grid tick only receives a value if a new measurement actually occurred within that specific grid interval; otherwise, it returns
None. - Best For: Downsampling high-frequency data where a strict 1-to-1 relationship between windows and unique hardware events is required.
Scikit-Learn Compatibility¶
By implementing the standard fit/transform interface, the SyncTransformer makes robotics data a "first-class citizen" of the Scikit-learn ecosystem. This allows for the plug-and-play integration of multi-rate sensor data into standard pipelines.
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from mosaicolabs import MosaicoClient
from mosaicolabs.ml import DataFrameExtractor, SyncTransformer, SynchHold
# Define a pipeline for physical AI preprocessing
pipeline = Pipeline([
('sync', SyncTransformer(target_fps=30.0, policy=SynchHold())),
('scaler', StandardScaler())
])
with MosaicoClient.connect("localhost", 6726):
# Initialize from an existing SequenceHandler
seq_handler = client.sequence_handler("drive_session_01")
extractor = DataFrameExtractor(seq_handler)
# Process sequential chunks while maintaining signal continuity
for sparse_chunk in extractor.to_pandas_chunks(window_sec=5.0):
# The transformer automatically carries state across sequential calls
normalized_dense_chunk = pipeline.transform(sparse_chunk)