Machine Learning & Analytics
The Mosaico ML module serves as the high-performance bridge between the Mosaico Data Platform and the modern Data Science ecosystem. While the platform is optimized for high-speed raw message streaming, this module provides the abstractions necessary to transform asynchronous sensor data into tabular formats compatible with Physical AI, Deep Learning, and Predictive Analytics.
Working with robotics and multi-modal datasets presents three primary technical hurdles that the ML module is designed to solve:
- Heterogeneous Sampling: Sensors like LIDAR (low frequency), IMU (high frequency), and GPS (intermittent) operate at different rates.
- High Volume: Datasets often exceed the available system RAM.
- Nested Structures: Robotics data is typically deeply nested with coordinate transformations and covariance matrices.
From Sequences to DataFrames¶
API Reference
The DataFrameExtractor is a specialized utility designed to convert Mosaico sequences into tabular formats. Unlike standard streamers that instantiate individual Python objects, this extractor operates at the Batch Level by pulling raw RecordBatch objects directly from the underlying stream to maximize throughput.
Key Technical Features¶
- Recursive Flattening: Automatically "unpacks" deeply nested Mosaico Ontology structures into primitive columns.
- Semantic Naming: Columns use a
{topic_name}.{ontology_tag}.{field_path}convention (e.g.,/front/camera/imu.imu.acceleration.x) to remain self-describing. - Namespace Isolation: Topic names are included in column headers to prevent collisions when multiple sensors of the same type are present.
- Memory-Efficient Windowing: Uses a generator-based approach to yield data in time-based "chunks" (e.g., 5-second windows) while handling straddling batches via a carry-over buffer.
- Sparse Merging: Creates a "sparse" DataFrame containing the union of all timestamps, using
NaNfor missing sensor readings at specific intervals.
This example demonstrates iterating through a sequence in 10-second tabular chunks.
from mosaicolabs import MosaicoClient
from mosaicolabs.ml import DataFrameExtractor
with MosaicoClient.connect("localhost", 6726):
# Initialize from an existing SequenceHandler
seq_handler = client.sequence_handler("drive_session_01")
extractor = DataFrameExtractor(seq_handler)
# Iterate through 10-second chunks
for df in extractor.to_pandas_chunks(window_sec=10.0):
# 'df' is a pandas DataFrame with semantic columns
# Example: df["/front/camera/imu.imu.acceleration.x"]
print(f"Processing chunk with {len(df)} rows")
For complex types like images that require specialized decoding, Mosaico allows you to "inflate" a flattened DataFrame row back into a strongly-typed Message object.
from mosaicolabs import MosaicoClient
from mosaicolabs.ml import DataFrameExtractor
from mosaicolabs.models import Message, Image
with MosaicoClient.connect("localhost", 6726):
# Initialize from an existing SequenceHandler
seq_handler = client.sequence_handler("drive_session_01")
extractor = DataFrameExtractor(seq_handler)
# Get data chunks
for df in extractor.to_pandas_chunks(topics=["/sensors/front/image_raw"]):
for _, row in df.iterrows():
# Reconstruct the full Message (envelope + payload) from a row
img_msg = Message.from_dataframe_row(
row=row,
topic_name="/sensors/front/image_raw",
)
if img_msg:
img = img_msg.get_data(Image).to_pillow()
# Access typed fields with IDE autocompletion
print(f"Time: {img_msg.timestamp_ns}")
img.show()
Video Decoding for ML Pipelines¶
API Reference
mosaicolabs.ml.VideoDecodingTransformer
When handling multi-camera robotics datasets, managing image data presents a fundamental architectural conflict between video compression mechanics and machine learning dataloader strategies.
Stateful video compression formats like H.264 and HEVC rely on inter-frame dependencies to achieve high compression ratios. They transmit a complete image (an I-frame or Keyframe) followed by a sequence of delta-frames (P-frames or B-frames) that encode only the pixel differences from the preceding frames.
This creates a critical issue for ML pipelines:
- Randomization Conflict: If a dataloader shuffles or randomizes DataFrame rows before decoding, the decoder receives delta-frames without the necessary reference buffers, resulting in severe visual artifacts or outright crashes.
- Synchronization Conflict: If temporal synchronization (like
SyncTransformerwith aSyncHoldpolicy) is applied before decoding, the exact same compressed delta-frame bytes are fed into the decoder multiple times, instantly corrupting the C-level state machine of the codec.
To resolve this, the ML module introduces the VideoDecodingTransformer, which enforces a strict "Decode First, Sync Second, Shuffle Last" architecture.
Key Technical Features¶
- Stateful Chronological Decoding: The transformer maintains a persistent
StatefulDecodingSessionacross sequential DataFrame chunks. It unpacks the raw byte streams into fully reconstructedPIL.Imageobjects while the timeline is still perfectly sequential and unaltered. - Intelligent Format Routing: The transformer inspects the
ImageFormatmetadata of each row. Stateful streams (H.264, HEVC) are securely routed to the isolated, stateful C-level decoders, while stateless formats (JPEG, PNG) automatically fall back to lightweight stateless codecs. - Lazy Resource Allocation: To ensure complete compatibility with Scikit-Learn's
cloneutility and multiprocessing tools likejoblib, the underlying C-level FFmpeg/PyAV pointers are never instantiated during__init__. The session is lazily allocated during thefit()phase, preventing pickling errors when distributing ML pipelines across multiple CPU cores. - Aggressive Memory Management: Immediately after reconstructing the
PIL.Image, the transformer drops the rawcompressed_image.databyte columns from the Pandas DataFrame. This actively mitigates RAM saturation before the data reaches the downstream model. - Zero-Bloat Soft Dependency: The transformer uses a soft-dependency architecture. If
scikit-learnis installed, it inherits natively fromBaseEstimatorandTransformerMixin. If not, it safely falls back to dummy classes, ensuring Mosaico remains lightweight for users who only require core extraction features.
Pipeline Integration¶
Because the VideoDecodingTransformer implements the standard Scikit-Learn fit/transform contract, it can be chained with the SyncTransformer. Once the data exits this pipeline, it is dense, fully synchronized, completely decoded, and safe to shuffle for PyTorch or TensorFlow dataloaders.
from sklearn.pipeline import Pipeline
from mosaicolabs import MosaicoClient
from mosaicolabs.ml import DataFrameExtractor, SyncTransformer, VideoDecodingTransformer
from mosaicolabs.ml.sync_policies import SyncHold
# 1. Define the execution pipeline order (Decode -> Sync)
pipeline = Pipeline([
('decoder', VideoDecodingTransformer(topics=["/front/camera/image"])),
])
with MosaicoClient.connect("localhost", 6726) as client:
seq_handler = client.sequence_handler("multi_camera_mission")
extractor = DataFrameExtractor(seq_handler)
# 2. Process sequential row-based chunks
for sparse_chunk in extractor.to_pandas_chunks(
topics=["/front/camera/image", "/chassis/imu"],
batch_size=2048
):
# The decoder reconstructs video frames chronologically.
decoded_chunk = pipeline.transform(sparse_chunk)
# 'decoded_chunk' now contains arrays of usable PIL.Image objects.
# It is now perfectly safe to shuffle, batch, or convert to PyTorch Tensors.
del decoded_chunk # Explicitly signal for garbage collection
Sparse to Dense Representation¶
API Reference
The SyncTransformer is a temporal resampler designed to solve the Heterogeneous Sampling problem inherent in robotics and Physical AI.
It aligns multi-rate sensor streams (for example, an IMU at 100Hz and a GPS at 5Hz) onto a uniform, fixed-frequency grid to prepare them for machine learning models.
The SyncTransformer operates as a processor that bridges the gaps between windowed chunks yielded by the DataFrameExtractor.
Unlike standard resamplers that treat each data batch in isolation, this transformer maintains internal state to ensure signal continuity across batch boundaries.
Key Design Principles¶
- Stateful Continuity: It maintains an internal cache of the last known sensor values and the next expected grid tick, allowing signals to bridge the gap between independent DataFrame chunks.
- Semantic Integrity: It respects the physical reality of data acquisition by yielding
Nonefor grid ticks that occur before a sensor's first physical measurement, avoiding data "hallucination". - Vectorized Performance: Internal kernels leverage high-speed lookups for high-throughput processing.
- Protocol-Based Extensibility: The mathematical logic for resampling is decoupled through a
SyncPolicyprotocol, allowing for custom kernel injection.
Implementation and Stateful Lifecycle¶
Architecturally, the SyncTransformer is implemented as a stateful state-machine designed to maintain signal continuity across independent data chunks. Unlike standard library resamplers that often treat each input batch as an isolated event, this transformer is engineered to "stitch" the temporal gaps between the windowed DataFrames yielded by the DataFrameExtractor.
Internal State Management¶
The transformer maintains two critical internal buffers that persist for its entire lifecycle:
_next_timestamp_ns: A scalar value tracking the exact nanosecond where the next grid tick must occur. This prevents temporal drift across hours of recording by ensuring the fixed-frequency grid remains globally aligned._last_values: A dictionary that caches the final (timestamp, value) tuple for every topic processed in the previous chunk.
The Transformation Pipeline¶
When transform() is called on a new sparse_chunk, the following internal operations occur:
- Grid Synthesis: The transformer generates a new time grid starting precisely from
_next_timestamp_nsand extending to the end of the current chunk. - Data Prepending: Through the
_prepare_datamethod, the transformer retrieves the last known value of each topic and prepends it to the current chunk's arrays. This ensures that the resampling algorithm (e.g.,SyncHold) has a reference point to fill the gap at the very beginning of the new window. - Policy Delegation: The actual mathematical mapping is delegated to the configured
SyncPolicy, which yields the final dense values for the synthesized grid.
The 'Re-instantiation' Trap
A common architectural error is re-instantiating the SyncTransformer inside a processing loop. Because the transformer is stateful, it must be instantiated once outside the loop to function correctly.
Re-instantiating the transformer inside the loop destroys the internal cache, causing signal discontinuities and potential data "hallucination" at the start of every chunk.
# ERROR: Transformer is created inside the loop
for sparse_chunk in extractor.to_pandas_chunks():
# This resets the grid and last_values every iteration!
transformer = SyncTransformer(target_fps=50)
dense_chunk = transformer.transform(sparse_chunk)
Correct Pattern (Persistent State)¶
By keeping the instance outside the loop, the transformer correctly bridges the gap between sparse_chunk_N and sparse_chunk_N+1.
# CORRECT: Instantiate once
transformer = SyncTransformer(target_fps=50)
for sparse_chunk in extractor.to_pandas_chunks():
# transform() updates internal state and prepares it for the next call
dense_chunk = transformer.transform(sparse_chunk)
If you need to reuse the same transformer instance for a different recording or sequence, you must call the .reset() method to clear the internal buffers and prepare for a new grid alignment.
Implemented Synchronization Policies¶
API Reference
Each policy defines a specific logic for how the transformer bridges temporal gaps between sparse data points.
1. SyncHold (Last-Value-Hold)¶
- Behavior: Finds the most recent valid measurement and "holds" it constant until a new one arrives.
- Best For: Sensors where states remain valid until explicitly changed, such as robot joint positions or battery levels.
2. SyncAsOf (Staleness Guard)¶
- Behavior: Carries the last known value forward only if it has not exceeded a defined maximum "tolerance" (fresher than a specific age).
- Best For: High-speed signals that become unreliable if not updated frequently, such as localization coordinates.
3. SyncDrop (Interval Filter)¶
- Behavior: Ensures a grid tick only receives a value if a new measurement actually occurred within that specific grid interval; otherwise, it returns
None. - Best For: Downsampling high-frequency data where a strict 1-to-1 relationship between windows and unique hardware events is required.
Memory & Performance Best Practices¶
The memory footprint of your ML pipeline is primarily governed by the product of two variables: the temporal window size (window_sec) in DataFrameExtractor.to_pandas_chunks() and the synthesis frequency (target_fps) in SyncTransformer.transform().
Resource Consumption Matrix¶
| Parameter Configuration | Memory Impact | CPU Impact | Use Case |
|---|---|---|---|
| Small Window / Low FPS (1s / 10Hz) |
Minimal: Lowest RAM overhead per chunk. | Low | Real-time monitoring, low-power edge devices. |
| Large Window / Low FPS (60s / 10Hz) |
Moderate: High memory usage in the DataFrameExtractor due to large sparse buffers. |
Moderate | Batch analysis where global context is needed before transformation. |
| Small Window / High FPS (1s / 1000Hz) |
High: Rapid creation of many dense rows. Heavy overhead on Python's Garbage Collector. | High | High-fidelity signal processing (e.g., vibration analysis). |
| Large Window / High FPS (60s / 1000Hz) |
Critical: Risk of MemoryError. Can generate millions of rows in a single transform() call. |
Very High | Deep Learning training on high-end workstations with 64GB+ RAM. |
Optimization Strategies¶
-
Avoid the "Full Load" Trap: The
DataFrameExtractoris designed for (batched) streaming. If you setwindow_secto a value greater than the total sequence duration, the extractor will fall back to a "Full Load" mode, attempting to load the entire recording into RAM, which can mean multi-GB batch for heavy datasets. -
The Rule of 100k Rows: As a general architectural guideline, aim for a configuration where
window_sec * target_fps < 100,000rows per chunk (for time-series). This keeps the Pandas operations within the L3 cache limits of most modern CPUs and ensures the garbage collector can keep up with the loop iterations. -
Downsampling before Transformation: If you have high-frequency raw data (e.g., IMU at 400Hz) but only need 50Hz for your ML model, use the
SyncDroppolicy. It is significantly faster thanSyncHoldbecause it discards redundant intermediate samples before the dense grid is synthesized, reducing the internal array sizes during the_prepare_dataphase. -
Garbage Collection in Long Loops: When processing sequences that span several hours, Python's automatic reference counting may not trigger fast enough. If you observe a "memory leak" (steadily increasing RAM usage), explicitly delete the
dense_chunkat the end of your loop:
Scikit-Learn Compatibility¶
By implementing the standard fit/transform interface, the SyncTransformer makes robotics data a "first-class citizen" of the Scikit-learn ecosystem. This allows for the plug-and-play integration of multi-rate sensor data into standard pipelines.
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from mosaicolabs import MosaicoClient
from mosaicolabs.ml import DataFrameExtractor, SyncTransformer, SynchHold
# Define a pipeline for physical AI preprocessing
pipeline = Pipeline([
('sync', SyncTransformer(target_fps=30.0, policy=SynchHold())),
('scaler', StandardScaler())
])
with MosaicoClient.connect("localhost", 6726):
# Initialize from an existing SequenceHandler
seq_handler = client.sequence_handler("drive_session_01")
extractor = DataFrameExtractor(seq_handler)
# Process sequential chunks while maintaining signal continuity
for sparse_chunk in extractor.to_pandas_chunks(window_sec=5.0):
# The transformer automatically carries state across sequential calls
normalized_dense_chunk = pipeline.transform(sparse_chunk)