Machine Learning Module
mosaicolabs.ml.DataFrameExtractor ¶
Extracts and manages data from Mosaico Sequences, converting them into tabular DataFrames.
This class serves as a high-performance bridge for training ML models or performing data analysis. It extracts data from multiple sequence topics and unifies them into a single, flattened, sparse DataFrame aligned by timestamps.
Key Features:
- Memory Efficiency: Uses a windowed approach to process multi-gigabyte sequences in chunks without overloading RAM.
- Flattening: Automatically flattens nested structures (e.g., pose.position.x) into dot-notation columns.
- Sparse Alignment: Merges multiple topics with different frequencies into a single timeline (using NaN for missing values at specific timestamps).
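The flattening and sparse-alignment features can be pictured with a small plain-Python sketch. It is not the library's implementation: `flatten`, `sparse_align`, and the sample topics are hypothetical names invented for illustration, and the topic-prefixed column naming is an assumption.

```python
import math

def flatten(record, prefix=""):
    """Flatten a nested dict into a single-level dict with dot-notation keys."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat

def sparse_align(topics, timestamp_column="timestamp_ns"):
    """Merge per-topic (timestamp, record) samples into one sorted timeline.

    Cells with no sample at a given timestamp are filled with NaN.
    """
    rows, columns = {}, []
    for topic, samples in topics.items():
        for ts, record in samples:
            for field, value in flatten(record, topic).items():
                rows.setdefault(ts, {})[field] = value
                if field not in columns:
                    columns.append(field)
    return [
        {timestamp_column: ts, **{c: rows[ts].get(c, math.nan) for c in columns}}
        for ts in sorted(rows)
    ]

# Two hypothetical topics sampled at different rates.
topics = {
    "/imu": [(0, {"acc": {"x": 0.1}}), (10, {"acc": {"x": 0.2}})],
    "/pose": [(10, {"pose": {"position": {"x": 1.0}}})],
}
table = sparse_align(topics)
for row in table:
    print(row)
```

The row at timestamp 0 has a NaN in the `/pose.pose.position.x` column, because only the `/imu` topic produced a sample at that instant.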
Initializes the DataFrameExtractor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sequence_handler | SequenceHandler | An active handle to a Mosaico sequence. | required |
| timestamp_column_name | str | Name of the timestamp column. Defaults to | None |
to_pandas_chunks ¶
Generator that yields time-windowed pandas DataFrames from the sequence.
This method leverages server-side filtering and local batch processing to maintain a low memory footprint. It handles batches that cross window boundaries by carrying over the remainder to the next chunk.
Important
This function is a generator and must be iterated (e.g., in a for loop); calling it without iterating extracts nothing.
Warning
Setting window_sec to a very large value effectively disables windowing:
the extractor will attempt to load the entire requested time range into memory.
This is only recommended for small sequences or systems with high RAM capacity.
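The carry-over behavior described above can be sketched as a plain-Python generator. This is an illustrative model only, not the extractor's code: `windowed_chunks` and the `(timestamp, value)` row layout are assumptions.

```python
def windowed_chunks(batches, window_ns):
    """Regroup time-ordered batches of (timestamp_ns, value) rows into fixed windows.

    Rows of a batch that fall beyond the current window are carried over
    to a later chunk instead of being dropped.
    """
    pending = []        # rows belonging to the window currently being filled
    window_end = None   # exclusive end of the current window
    for batch in batches:
        for row in batch:
            ts = row[0]
            if window_end is None:
                window_end = ts + window_ns
            while ts >= window_end:       # the row crosses the window boundary
                if pending:
                    yield pending
                    pending = []
                window_end += window_ns
            pending.append(row)
    if pending:
        yield pending

# Two incoming batches whose boundaries do not match the 5 ns windows.
batches = [[(0, "a"), (3, "b"), (6, "c")], [(7, "d"), (12, "e")]]
chunks = list(windowed_chunks(batches, window_ns=5))
print(chunks)
# [[(0, 'a'), (3, 'b')], [(6, 'c'), (7, 'd')], [(12, 'e')]]
```

Because the function is a generator, nothing is processed until the result is iterated, and only one window of rows is held in memory at a time.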
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topics | List[str] | Topics to extract. Defaults to all topics. | None |
| window_sec | float | Duration of each DataFrame chunk in seconds. | 5.0 |
| timestamp_ns_start | int | Global start time for extraction. | None |
| timestamp_ns_end | int | Global end time for extraction. | None |
Yields:
| Type | Description |
|---|---|
| DataFrame | pd.DataFrame: A sparse, flattened DataFrame containing data from all selected topics and their fields within the current time window. |
Example
# Obtain a dataframe with DataFrameExtractor
from mosaicolabs import MosaicoClient, IMU, Image
from mosaicolabs.ml import DataFrameExtractor

with MosaicoClient.connect("localhost", 6726) as client:
    sequence_handler = client.sequence_handler("example_sequence")
    for df in DataFrameExtractor(sequence_handler).to_pandas_chunks(
        topics=["/front/imu", "/front/camera/image_raw"]
    ):
        # Do something with the dataframe.
        # ...
        pass
mosaicolabs.ml.SyncTransformer ¶
SyncTransformer(
    target_fps,
    policy=SyncHold(),
    timestamp_column="timestamp_ns",
)
Bases: BaseEstimator, TransformerMixin
Stateful resampler for Mosaico DataFrames.
This class aligns heterogeneous sensor streams onto a uniform time grid.
It is designed to consume the windowed outputs of the DataFrameExtractor
sequentially, maintaining internal state to ensure signal continuity across
batch boundaries.
Scikit-Learn Compatibility¶
The class implements the standard fit/transform interface, making it
fully compliant with Scikit-learn Pipeline and FeatureUnion objects.
- fit(X): Captures the initial timestamp from the first chunk to align the grid.
- transform(X): Executes the temporal resampling logic for a single DataFrame chunk and returns a dense DataFrame.
- fit_transform(X): Fits the transformer to the data and then transforms it.
Key Features:
- Fixed Frequency: Normalizes multi-rate sensors to a target FPS.
- Stateful Persistence: Carries the last known sensor state into the next chunk.
- Semantic Integrity: Correctly handles 'Late Arrivals' by yielding None for ticks preceding the first physical measurement.
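To make the idea of stateful persistence concrete, here is a minimal single-column sketch in plain Python. `HoldResampler` is a hypothetical toy, not the `SyncTransformer` implementation; it assumes chunks are time-ordered lists of `(timestamp_ns, value)` pairs and uses a hold strategy.

```python
class HoldResampler:
    """Toy single-column resampler that keeps its state between chunks."""

    def __init__(self, target_fps):
        self.step_ns = round(1_000_000_000 / target_fps)
        self.next_tick = None    # next grid timestamp to emit
        self.last_value = None   # last known sample, carried across chunks

    def fit(self, chunk):
        # Anchor the grid on the first timestamp ever seen; later calls keep it.
        if self.next_tick is None and chunk:
            self.next_tick = chunk[0][0]
        return self

    def transform(self, chunk):
        if not chunk or self.next_tick is None:
            return []
        out, i = [], 0
        chunk_end = chunk[-1][0]
        while self.next_tick <= chunk_end:
            # Consume every sample acquired at or before this tick.
            while i < len(chunk) and chunk[i][0] <= self.next_tick:
                if chunk[i][1] is not None:
                    self.last_value = chunk[i][1]
                i += 1
            out.append((self.next_tick, self.last_value))
            self.next_tick += self.step_ns
        # Samples after the last emitted tick still update the carried state.
        for _, value in chunk[i:]:
            if value is not None:
                self.last_value = value
        return out

resampler = HoldResampler(target_fps=5)  # 200 ms grid
chunk_1 = [(0, 10.0), (300_000_000, 11.0)]
chunk_2 = [(700_000_000, None), (800_000_000, 12.0)]

first = resampler.fit(chunk_1).transform(chunk_1)
second = resampler.fit(chunk_2).transform(chunk_2)
print(first)   # [(0, 10.0), (200000000, 10.0)]
print(second)  # [(400000000, 11.0), (600000000, 11.0), (800000000, 12.0)]
```

The first tick of the second chunk (400 ms) receives 11.0, a value that was acquired in the first chunk. That is only possible because the same instance is reused across chunks.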
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| target_fps | float | The desired output frequency in Hz. | required |
| policy | SyncPolicy | A strategy implementing the | SyncHold() |
| timestamp_column | str | The column name containing the timestamp data. | 'timestamp_ns' |
fit_transform ¶
Fits the transformer to the data and then transforms it.
transform ¶
Synchronizes a sparse DataFrame chunk into a dense, uniform DataFrame.
Example with generic dataframe
import pandas as pd
from mosaicolabs.ml import SyncTransformer, SyncHold

# 5 Hz Target (200ms steps)
transformer = SyncTransformer(target_fps=5, policy=SyncHold())

# Define a sparse dataframe with two sensors:
# `sensor_a` starts at 0, `sensor_b` arrives at 600ms
sparse_data = {
    "timestamp_ns": [
        0,
        600_000_000,
        900_000_000,
        1_200_000_000,
        1_500_000_000,
    ],
    "sensor_a": [10.0, 11.0, None, 12.0, 13.0],
    "sensor_b": [None, 1.0, 2.0, None, 3.0],
}
df = pd.DataFrame(sparse_data)
dense_df = transformer.fit(df).transform(df)

# Expected output
# "timestamp_ns", "sensor_a", "sensor_b"
# 0, 10.0, None # <- avoid hallucination on `sensor_b`
# 200_000_000, 10.0, None # <- avoid hallucination on `sensor_b`
# 400_000_000, 10.0, None # <- avoid hallucination on `sensor_b`
# 600_000_000, 11.0, 1.0
# 800_000_000, 11.0, 2.0
# 1_000_000_000, 11.0, 2.0
# 1_200_000_000, 12.0, 2.0
# 1_400_000_000, 12.0, 2.0
Example with Mosaico dataframe
# Obtain a dataframe with DataFrameExtractor
from mosaicolabs import MosaicoClient, IMU, Image
from mosaicolabs.ml import DataFrameExtractor, SyncTransformer

with MosaicoClient.connect("localhost", 6726) as client:
    sequence_handler = client.sequence_handler("example_sequence")
    # Resample at 30Hz and fill the NaNs with a `Hold` policy
    sync_transformer = SyncTransformer(
        target_fps=30,
    )  # (1)!
    for df in DataFrameExtractor(sequence_handler).to_pandas_chunks(
        topics=["/front/imu", "/front/camera/image_raw"]
    ):
        synched_df = sync_transformer.fit(df).transform(df)
        # Do something with the synched dataframe
        # ...
- Note: the SyncTransformer is created outside the chunk-related for loop: the transformer is a stateful state machine designed to maintain signal continuity across independent data chunks.
mosaicolabs.ml.SyncPolicy ¶
Bases: Protocol
Protocol defining the interface for data synchronization policies.
A SyncPolicy determines how sparse data samples are mapped onto a standard, dense time grid.
Classes implementing this protocol are used by SyncTransformer to resample sensor data
(e.g., holding the last value, interpolating, or dropping old data).
Common implementations include:
- SyncHold: Zero-order hold (carries the last value forward).
- SyncAsOf: Tolerance-based hold (carries value forward only for a specific duration).
- SyncDrop: Strict interval matching (drops data outside the current grid step).
apply ¶
Applies the synchronization logic to sparse samples, mapping them to the target grid.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| grid | ndarray | The target dense timeline (nanosecond timestamps). | required |
| s_ts | ndarray | The source acquisition timestamps (sparse data). | required |
| s_val | ndarray | The source sensor values corresponding to | required |
Returns:
| Type | Description |
|---|---|
| ndarray | np.ndarray: An object-array of the same length as |
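Since the interface is a protocol, any object exposing a compatible `apply` method should qualify structurally. The sketch below illustrates that idea with `typing.Protocol`; `PolicyLike` is a hypothetical stand-in (not the library class), `ExactMatch` is an invented custom policy, and plain lists are used in place of arrays.

```python
from typing import Any, List, Optional, Protocol, Sequence, runtime_checkable

@runtime_checkable
class PolicyLike(Protocol):
    """Hypothetical stand-in mirroring the documented apply(grid, s_ts, s_val)."""

    def apply(
        self, grid: Sequence[int], s_ts: Sequence[int], s_val: Sequence[Any]
    ) -> List[Optional[Any]]: ...

class ExactMatch:
    """Custom policy: keep a value only when a sample shares the tick's timestamp."""

    def apply(self, grid, s_ts, s_val):
        by_ts = dict(zip(s_ts, s_val))
        return [by_ts.get(t) for t in grid]

policy = ExactMatch()
is_policy = isinstance(policy, PolicyLike)   # structural check, no inheritance
mapped = policy.apply([0, 10, 20], [0, 20], ["a", "b"])
print(is_policy)  # True
print(mapped)     # ['a', None, 'b']
```

Whether the real `SyncTransformer` accepts such a user-defined policy unchanged is an assumption that should be checked against the library.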
mosaicolabs.ml.SyncHold ¶
Classic Last-Value-Hold (Zero-Order Hold) synchronization.
This policy carries the most recent valid sample forward to all future grid ticks until a new sample is received. It effectively creates a "step" function from the sparse samples.
apply ¶
Applies the Zero-Order Hold logic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| grid | ndarray | The target dense timeline (nanosecond timestamps). | required |
| s_ts | ndarray | The source acquisition timestamps. | required |
| s_val | ndarray | The source sensor values. | required |
Returns:
| Type | Description |
|---|---|
| ndarray | np.ndarray: Densely populated array where each point holds the last known value. |
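A zero-order hold can be sketched in a few lines of plain Python. This is an illustration of the technique, not the library's code: `hold` is a hypothetical function, the timestamps are assumed sorted, and a sample acquired exactly on a tick is assumed to count for that tick.

```python
from bisect import bisect_right

def hold(grid, s_ts, s_val):
    """Zero-order hold: each tick takes the latest sample at or before it."""
    out = []
    for t in grid:
        i = bisect_right(s_ts, t)   # number of samples with timestamp <= t
        out.append(s_val[i - 1] if i else None)
    return out

grid = [0, 100, 200, 300, 400]
dense = hold(grid, s_ts=[150, 350], s_val=[1.0, 2.0])
print(dense)  # [None, None, 1.0, 1.0, 2.0]
```

Ticks that precede the first sample stay `None`, and every later tick repeats the most recent value, which yields the "step" shape described above.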
mosaicolabs.ml.SyncAsOf ¶
Tolerance-based 'As-Of' synchronization.
Similar to SyncHold, but limits how far a value can be carried forward.
If the time difference between the grid tick and the last sample exceeds
tolerance_ns, the value is considered stale and the slot is left as None.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tolerance_ns | int | Maximum allowed age (in nanoseconds) for a sample to be valid. | required |
apply ¶
Applies the As-Of synchronization logic with tolerance check.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| grid | ndarray | The target dense timeline. | required |
| s_ts | ndarray | The source acquisition timestamps. | required |
| s_val | ndarray | The source sensor values. | required |
Returns:
| Type | Description |
|---|---|
| ndarray | np.ndarray: Densely populated array, with |
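The tolerance check can be sketched as a small variation on a hold. The function below is a hypothetical plain-Python illustration, not the library's implementation; it assumes sorted timestamps and treats a sample whose age equals `tolerance_ns` as still valid.

```python
from bisect import bisect_right

def as_of(grid, s_ts, s_val, tolerance_ns):
    """Hold the latest sample, but only while its age is within tolerance_ns."""
    out = []
    for t in grid:
        i = bisect_right(s_ts, t)   # number of samples with timestamp <= t
        fresh = i > 0 and t - s_ts[i - 1] <= tolerance_ns
        out.append(s_val[i - 1] if fresh else None)
    return out

grid = [0, 100, 200, 300, 400]
dense = as_of(grid, s_ts=[100], s_val=[7.0], tolerance_ns=150)
print(dense)  # [None, 7.0, 7.0, None, None]
```

The single sample at 100 is carried to the tick at 200 (age 100) but not to the tick at 300 (age 200), where it is considered stale.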
mosaicolabs.ml.SyncDrop ¶
Strict Interval-based 'Drop' synchronization.
Only yields a value if a sample was acquired strictly within the
current grid interval (t_grid - step_ns, t_grid]. If no sample falls
in this window, the result is None. This is useful for event-based matching.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| step_ns | int | The duration of the backward-looking window in nanoseconds. | required |
apply ¶
Applies the Drop synchronization logic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| grid | ndarray | The target dense timeline. | required |
| s_ts | ndarray | The source acquisition timestamps. | required |
| s_val | ndarray | The source sensor values. | required |
Returns:
| Type | Description |
|---|---|
| ndarray | np.ndarray: Array containing values only for populated intervals, otherwise |
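The half-open interval rule (t_grid - step_ns, t_grid] can be illustrated with the plain-Python sketch below. It is a hypothetical model rather than the library's code: it assumes sorted timestamps and, when several samples fall in the same interval, keeps the most recent one.

```python
from bisect import bisect_right

def drop(grid, s_ts, s_val, step_ns):
    """Keep a value only if a sample falls inside (t - step_ns, t]."""
    out = []
    for t in grid:
        i = bisect_right(s_ts, t)   # number of samples with timestamp <= t
        hit = i > 0 and s_ts[i - 1] > t - step_ns
        out.append(s_val[i - 1] if hit else None)
    return out

grid = [100, 200, 300, 400]
dense = drop(grid, s_ts=[150, 200, 390], s_val=["a", "b", "c"], step_ns=100)
print(dense)  # [None, 'b', None, 'c']
```

The sample acquired exactly at 200 belongs to the interval ending at 200, not to the one ending at 300, so the tick at 300 stays empty.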
mosaicolabs.ml.VideoDecodingTransformer ¶
VideoDecodingTransformer(
    topics,
    stateless_codec=_StatelessDefaultCodec(),
    stateful_decoding_session_type=StatefulDecodingSession,
)
Bases: BaseEstimator, TransformerMixin
A Scikit-Learn compatible stateful transformer that reconstructs CompressedImage byte streams into usable PIL Images chronologically before temporal synchronization.
This transformer bridges the gap between video compression mechanics (which rely on inter-frame dependencies like I/P/B frames) and machine learning batching strategies. It maintains a persistent decoding session across data chunks, ensuring that delta-frames are correctly decoded using the proper reference frames before any temporal synchronization or randomization occurs downstream.
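The need for a persistent session can be shown with a toy delta decoder. This is not a real codec and not the library's session class: `ToyDeltaDecoder`, the `"I"`/`"P"` frame tuples, and the integer payloads are all invented for illustration.

```python
class ToyDeltaDecoder:
    """Toy stand-in for a stateful video decoder (not a real codec)."""

    def __init__(self):
        self._reference = {}   # last decoded value per context (topic)

    def decode(self, context, frame):
        kind, payload = frame
        if kind == "I":        # key frame: self-contained
            value = payload
        else:                  # "P" frame: delta against the reference frame
            if context not in self._reference:
                raise ValueError(f"no reference frame for {context!r}")
            value = self._reference[context] + payload
        self._reference[context] = value
        return value

# Two consecutive chunks; the second one contains only delta frames.
chunk_1 = [("/cam_left", ("I", 100)), ("/cam_right", ("I", 500)), ("/cam_left", ("P", 5))]
chunk_2 = [("/cam_right", ("P", -1)), ("/cam_left", ("P", 2))]

decoder = ToyDeltaDecoder()   # created once, reused for every chunk
decoded = [
    [decoder.decode(topic, frame) for topic, frame in chunk]
    for chunk in (chunk_1, chunk_2)
]
print(decoded)  # [[100, 500, 105], [499, 107]]
```

A fresh decoder created for the second chunk would raise, because the delta frames have no reference to build on. Keying the state by topic keeps the two cameras from corrupting each other's reference.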
Attributes:
| Name | Type | Description |
|---|---|---|
| topics | List[str] | A list of Mosaico topic names (e.g., "/front/camera/image") that contain the compressed image data to be decoded. |
| _stateless_codec | | The codec used for stateless formats like JPEG or PNG. |
| _stateful_decoding_session_type | | The class/factory used to instantiate the stateful session for formats like H.264 or HEVC. |
| _session | | The active decoding session instance. |
Initializes the VideoDecodingTransformer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topics | List[str] | The list of topics to target for image decoding. | required |
| stateless_codec | optional | An instance of a codec to handle formats that do not require state (e.g., JPEG). Defaults to _StatelessDefaultCodec(). | _StatelessDefaultCodec() |
| stateful_decoding_session_type | optional | The class to instantiate for state-dependent video decoding. Defaults to StatefulDecodingSession. | StatefulDecodingSession |
fit_transform ¶
Fits the transformer to the data and then transforms it.
fit ¶
Initializes the persistent decoding session.
This method should be called before transforming the first chunk of data.
It establishes the stateful session required to track reference frames
across subsequent transform calls.
Architectural Note
The C-level decoding session is intentionally initialized here rather than in
__init__(). This complies with Scikit-Learn's strict "no side-effects"
contract for constructors (allowing safe sklearn.base.clone operations),
ensures the transformer can be pickled for multiprocessing (e.g., via joblib),
and enforces lazy resource allocation.
The session is initialized on the first call to this method. Subsequent calls
do nothing, unless the reset() method has been called in between.
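The lazy-allocation pattern described in this note can be sketched as follows. `LazyDecoder` and `FakeSession` are hypothetical stand-ins written for illustration; they only model the lifecycle (construct, fit, reset), not any real decoding.

```python
import copy

class FakeSession:
    """Hypothetical stand-in for a native decoding session."""

    def __init__(self):
        self.open = True

    def close(self):
        self.open = False

class LazyDecoder:
    def __init__(self, topics):
        self.topics = topics    # the constructor only stores parameters
        self._session = None    # no resource is allocated yet

    def fit(self, X=None, y=None):
        if self._session is None:   # allocate once; later calls are no-ops
            self._session = FakeSession()
        return self

    def reset(self):
        if self._session is not None:
            self._session.close()   # release the resource
            self._session = None

decoder = LazyDecoder(["/front/camera/image"])
duplicate = copy.deepcopy(decoder)    # safe: nothing but parameters to copy

first = decoder.fit()._session
same = decoder.fit()._session is first          # second fit reuses the session
decoder.reset()
renewed = decoder.fit()._session is not first   # reset forces a new session
print(same, renewed, first.open)  # True True False
```

Keeping allocation out of the constructor means an unfitted instance holds nothing but plain parameters, which is what makes copying or cloning it cheap and safe.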
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| X | DataFrame | The input DataFrame chunk (unused in this method but required by the Scikit-Learn API). | required |
| y | optional | Target values (unused). | None |
Returns:
| Name | Type | Description |
|---|---|---|
| self | VideoDecodingTransformer | Returns the transformer instance. |
transform ¶
Decodes the compressed image data within the DataFrame chronologically.
For each requested topic, this method extracts the raw bytes and image format. Stateful formats (H.264, HEVC) are routed through the persistent decoding session, using the topic name as the context to isolate decoder states per camera. Stateless formats (JPEG) are routed to the stateless codec.
The resulting PIL.Image objects are inserted into a new column named
{topic}.compressed_image.decoded, and the original raw byte/format columns are dropped
to conserve memory.
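The routing step can be pictured with the sketch below. It is only a model of the dispatch logic: `route`, the fake decoders, and the lowercase format labels (`"h264"`, `"hevc"`, `"jpeg"`) are assumptions made for illustration, and the actual format identifiers used by the library may differ.

```python
STATEFUL_FORMATS = {"h264", "hevc"}   # assumed labels, for illustration only

def route(rows, stateful_decode, stateless_decode):
    """Decode (topic, fmt, data) rows in order, picking a decoder by format."""
    decoded = []
    for topic, fmt, data in rows:
        if fmt.lower() in STATEFUL_FORMATS:
            # The topic name is passed along so decoder state stays per camera.
            decoded.append(stateful_decode(topic, data))
        else:
            decoded.append(stateless_decode(data))
    return decoded

calls = []

def fake_stateful(topic, data):
    calls.append(("stateful", topic))
    return data.upper()

def fake_stateless(data):
    calls.append(("stateless", None))
    return data.upper()

rows = [("/cam_left", "h264", "aa"), ("/cam_top", "jpeg", "bb"), ("/cam_right", "hevc", "cc")]
result = route(rows, fake_stateful, fake_stateless)
print(result)  # ['AA', 'BB', 'CC']
print(calls)   # [('stateful', '/cam_left'), ('stateless', None), ('stateful', '/cam_right')]
```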
Example
# Obtain a dataframe with DataFrameExtractor
from mosaicolabs import MosaicoClient, IMU, Image
from mosaicolabs.ml import DataFrameExtractor, VideoDecodingTransformer

with MosaicoClient.connect("localhost", 6726) as client:
    sequence_handler = client.sequence_handler("example_sequence")
    # Create the decoder once, before iterating over the chunks
    vdec_transf = VideoDecodingTransformer(
        topics=["/front_stereo_camera/left/image_compressed"]
    )  # (1)!
    for df in DataFrameExtractor(sequence_handler).to_pandas_chunks():
        decoded_df = vdec_transf.fit_transform(df)  # (2)!
        # Do something with the decoded dataframe
        # ...
- Note: the VideoDecodingTransformer is created outside the chunk-related for loop: the transformer is a stateful state machine designed to maintain signal continuity across different data chunks.
- The decoded image is here: decoded_df["/front_stereo_camera/left/image_compressed.compressed_image.decoded"]
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| X | DataFrame | A chronologically ordered sparse chunk from the | required |
Returns:
| Type | Description |
|---|---|
| DataFrame | pd.DataFrame: A new DataFrame containing the fully reconstructed |
reset ¶
Releases the C-level decoder resources and resets the session state.
This should be called when processing is complete or if the sequence is restarted, to prevent memory leaks and ensure a clean state for the next run.