Machine Learning Module¶
mosaicolabs.ml.DataFrameExtractor ¶
Extracts and manages data from Mosaico Sequences, converting them into tabular DataFrames.
This class serves as a high-performance bridge for training ML models or performing data analysis. It extracts data from multiple sequence topics and unifies them into a single, flattened, sparse DataFrame aligned by timestamps.
Key Features:
- Memory Efficiency: Uses a windowed approach to process multi-gigabyte sequences in chunks without overloading RAM.
- Flattening: Automatically flattens nested structures (e.g., pose.position.x) into dot-notation columns.
- Sparse Alignment: Merges multiple topics with different frequencies into a single timeline (using NaN for missing values at specific timestamps).
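The flattening and sparse alignment described above can be pictured with plain pandas. This is a minimal sketch, not the extractor's actual implementation: the record layout and field names (`acceleration`, `pose`) are hypothetical, and the real extractor may name its columns differently (for example, by prefixing them with the topic name).

```python
import pandas as pd

# Hypothetical records from two topics sampled at different rates.
imu_records = [
    {"timestamp_ns": 0, "acceleration": {"x": 0.1, "y": 0.0}},
    {"timestamp_ns": 10_000_000, "acceleration": {"x": 0.2, "y": 0.1}},
]
pose_records = [
    {"timestamp_ns": 10_000_000, "pose": {"position": {"x": 1.0, "y": 2.0}}},
]

# json_normalize flattens nested dicts into dot-notation columns,
# e.g. {"pose": {"position": {"x": ...}}} becomes "pose.position.x".
imu_df = pd.json_normalize(imu_records)
pose_df = pd.json_normalize(pose_records)

# An outer merge on the timestamp keeps every timestamp from both topics;
# fields that have no sample at a given timestamp become NaN.
sparse_df = (
    pd.merge(imu_df, pose_df, on="timestamp_ns", how="outer")
    .sort_values("timestamp_ns")
    .reset_index(drop=True)
)
print(sparse_df)
```

The first row (timestamp 0) has no pose sample, so its `pose.position.*` columns are NaN, while the second row carries values from both topics.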
Initializes the DataFrameExtractor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sequence_handler | SequenceHandler | An active handle to a Mosaico sequence. | required |
to_pandas_chunks ¶
Generator that yields time-windowed pandas DataFrames from the sequence.
This method leverages server-side filtering and local batch processing to maintain a low memory footprint. It handles batches that cross window boundaries by carrying over the remainder to the next chunk.
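The carry-over behaviour can be sketched as a small generator that regroups incoming batches into fixed-duration windows. This is an illustration under stated assumptions, not the library's code: `window_chunks` is a hypothetical helper, and batches are assumed to arrive as time-ordered DataFrames with a `timestamp_ns` column.

```python
from typing import Iterable, Iterator, Optional

import pandas as pd

S = 1_000_000_000  # nanoseconds per second


def window_chunks(
    batches: Iterable[pd.DataFrame],
    window_ns: int,
    ts_col: str = "timestamp_ns",
) -> Iterator[pd.DataFrame]:
    """Regroup time-ordered batches into windows of `window_ns` nanoseconds."""
    carry: Optional[pd.DataFrame] = None
    window_end: Optional[int] = None
    for batch in batches:
        if batch.empty:
            continue
        # Prepend the remainder left over from the previous batch.
        buf = batch if carry is None else pd.concat([carry, batch], ignore_index=True)
        if window_end is None:
            window_end = int(buf[ts_col].iloc[0]) + window_ns
        # Emit every window that this batch proves to be complete.
        while int(buf[ts_col].iloc[-1]) >= window_end:
            done = buf[buf[ts_col] < window_end]
            buf = buf[buf[ts_col] >= window_end].reset_index(drop=True)
            if not done.empty:
                yield done.reset_index(drop=True)
            window_end += window_ns
        carry = buf  # rows belonging to a window that is still open
    if carry is not None and not carry.empty:
        yield carry  # flush the final, possibly partial, window


batches = [
    pd.DataFrame({"timestamp_ns": [0 * S, 2 * S, 4 * S]}),
    pd.DataFrame({"timestamp_ns": [6 * S, 8 * S, 11 * S]}),
]
chunks = list(window_chunks(batches, window_ns=5 * S))
print([(c["timestamp_ns"] // S).tolist() for c in chunks])
# prints [[0, 2, 4], [6, 8], [11]]
```

The second batch straddles the 10-second boundary, so its last row is held back and emitted with the following window. Only one window's worth of rows (plus one batch) is held in memory at a time.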
Important
This function is a generator and must be iterated (e.g., in a for loop); calling it without iterating extracts nothing.
Warning
Setting window_sec to a very large value effectively disables windowing: the extractor will attempt to load the entire requested time range into memory.
This is only recommended for small sequences or systems with high RAM capacity.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topics | List[str] | Topics to extract. Defaults to all topics. | None |
| window_sec | float | Duration of each DataFrame chunk in seconds. | 5.0 |
| timestamp_ns_start | int | Global start time for extraction. | None |
| timestamp_ns_end | int | Global end time for extraction. | None |
Yields:
| Type | Description |
|---|---|
| DataFrame | pd.DataFrame: A sparse, flattened DataFrame containing data from all selected topics and their fields within the current time window. |
Example
# Obtain a dataframe with DataFrameExtractor
from mosaicolabs import MosaicoClient, IMU, Image
from mosaicolabs.ml import DataFrameExtractor, SyncTransformer
# NOTE: `Message` (used below) must also be imported; its import path is not shown in this example.

with MosaicoClient.connect("localhost", 6726) as client:
    sequence_handler = client.get_sequence_handler("example_sequence")
    # Create the transformer once, outside the loop, so that its internal
    # state carries over from one chunk to the next.
    sync_transformer = SyncTransformer(
        target_fps=30,  # resample at 30 Hz and fill the NaNs with a Hold policy
    )
    for df in DataFrameExtractor(sequence_handler).to_pandas_chunks(
        topics=["/front/imu", "/front/camera/image_raw"]
    ):
        # Do something with the dataframe.
        # For example, you can sync the data using the `SyncTransformer`:
        synced_df = sync_transformer.transform(df)
        # Reconstruct the image message from a dataframe row
        image_msg = Message.from_dataframe_row(synced_df, "/front/camera/image_raw")
        image_data = image_msg.get_data(Image)
        # Show the image
        image_data.to_pillow().show()
        # ...
mosaicolabs.ml.SyncTransformer ¶
SyncTransformer(
    target_fps,
    policy=SyncHold(),
    timestamp_column="timestamp_ns",
)
Stateful resampler for Mosaico DataFrames.
This class aligns heterogeneous sensor streams onto a uniform time grid.
It is designed to consume the windowed outputs of the DataFrameExtractor
sequentially, maintaining internal state to ensure signal continuity across
batch boundaries.
Scikit-Learn Compatibility¶
The class implements the standard fit/transform interface, making it
fully compliant with Scikit-learn Pipeline and FeatureUnion objects.
- fit(X): Captures the initial timestamp from the first chunk to align the grid.
- transform(X): Executes the temporal resampling logic for a single DataFrame chunk and returns a dense DataFrame.
- fit_transform(X): Fits the transformer to the data and then transforms it.
Key Features:
- Fixed Frequency: Normalizes multi-rate sensors to a target FPS.
- Stateful Persistence: Carries the last known sensor state into the next chunk.
- Semantic Integrity: Correctly handles 'Late Arrivals' by yielding None for ticks preceding the first physical measurement.
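To make the stateful behaviour concrete, here is a minimal sketch of a hold-style resampler that remembers the last value per column and the next grid tick between calls. `TinyStatefulHold` is a hypothetical stand-in written for illustration; it is not the `SyncTransformer` implementation and supports only a hold strategy.

```python
import numpy as np
import pandas as pd


class TinyStatefulHold:
    """Hypothetical stand-in: hold-resample chunks while remembering state."""

    def __init__(self, target_fps: float, timestamp_column: str = "timestamp_ns"):
        self.step_ns = int(round(1e9 / target_fps))
        self.ts_col = timestamp_column
        self.next_tick = None  # first grid tick of the next chunk
        self.last = {}         # last valid value seen per column

    def transform(self, chunk: pd.DataFrame) -> pd.DataFrame:
        ts = chunk[self.ts_col].to_numpy()
        if self.next_tick is None:
            self.next_tick = int(ts[0])  # anchor the grid on the first sample
        grid = np.arange(self.next_tick, int(ts[-1]) + 1, self.step_ns)
        out = {self.ts_col: grid}
        for col in chunk.columns.drop(self.ts_col):
            valid = chunk[col].notna().to_numpy()
            s_ts, s_val = ts[valid], chunk[col].to_numpy()[valid]
            # Index of the latest sample at or before each tick (-1 if none).
            idx = np.searchsorted(s_ts, grid, side="right") - 1
            # Ticks before the chunk's first sample fall back to the carried
            # state, which is None until the column has been seen at all.
            dense = np.full(len(grid), self.last.get(col), dtype=object)
            dense[idx >= 0] = s_val[idx[idx >= 0]]
            out[col] = dense
            if len(s_val):
                self.last[col] = s_val[-1]
        if len(grid):
            self.next_tick = int(grid[-1]) + self.step_ns
        return pd.DataFrame(out)


resampler = TinyStatefulHold(target_fps=5)  # 200 ms grid
chunk1 = pd.DataFrame(
    {"timestamp_ns": [0, 300_000_000], "a": [1.0, None], "b": [None, 5.0]}
)
chunk2 = pd.DataFrame(
    {"timestamp_ns": [700_000_000, 900_000_000], "a": [2.0, None], "b": [None, None]}
)
out1 = resampler.transform(chunk1)
out2 = resampler.transform(chunk2)
print(out1)
print(out2)
```

In the first chunk, `b` stays `None` on both ticks because its first sample (at 300 ms) arrives after them. In the second chunk, `b` has no samples at all, yet every tick reports 5.0 because the value was carried across the chunk boundary.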
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| target_fps | float | The desired output frequency in Hz. | required |
| policy | SyncPolicy | A strategy implementing the SyncPolicy protocol. | SyncHold() |
| timestamp_column | str | The column name containing the timestamp data. | 'timestamp_ns' |
transform ¶
Synchronizes a sparse DataFrame chunk into a dense, uniform DataFrame.
Example with generic dataframe
import pandas as pd
from mosaicolabs.ml import SyncTransformer, SyncHold

# 5 Hz Target (200ms steps)
transformer = SyncTransformer(target_fps=5, policy=SyncHold())
# Define a sparse dataframe with two sensors:
# `sensor_a` starts at 0, `sensor_b` arrives at 600ms
sparse_data = {
    "timestamp_ns": [
        0,
        600_000_000,
        900_000_000,
        1_200_000_000,
        1_500_000_000,
    ],
    "sensor_a": [10.0, 11.0, None, 12.0, 13.0],
    "sensor_b": [None, 1.0, 2.0, None, 3.0],
}
df = pd.DataFrame(sparse_data)
dense_df = transformer.fit(df).transform(df)
# Expected output
# "timestamp_ns", "sensor_a", "sensor_b"
# 0,             10.0, None  # <- avoid hallucination on `sensor_b`
# 200_000_000,   10.0, None  # <- avoid hallucination on `sensor_b`
# 400_000_000,   10.0, None  # <- avoid hallucination on `sensor_b`
# 600_000_000,   11.0, 1.0
# 800_000_000,   11.0, 1.0   # <- the 2.0 sample only arrives at 900ms
# 1_000_000_000, 11.0, 2.0
# 1_200_000_000, 12.0, 2.0
# 1_400_000_000, 12.0, 2.0
Example with Mosaico dataframe
# Obtain a dataframe with DataFrameExtractor
from mosaicolabs import MosaicoClient, IMU, Image
from mosaicolabs.ml import DataFrameExtractor, SyncTransformer

with MosaicoClient.connect("localhost", 6726) as client:
    sequence_handler = client.get_sequence_handler("example_sequence")
    # Create the transformer once, outside the loop, so that its internal
    # state carries over from one chunk to the next.
    sync_transformer = SyncTransformer(
        target_fps=30,  # resample at 30 Hz and fill the NaNs with a `Hold` policy
    )
    for df in DataFrameExtractor(sequence_handler).to_pandas_chunks(
        topics=["/front/imu", "/front/camera/image_raw"]
    ):
        # Sync the data at 30 Hz:
        synced_df = sync_transformer.transform(df)
        # Do something with the synced dataframe
        # ...
mosaicolabs.ml.SyncPolicy ¶
Bases: Protocol
Protocol defining the interface for data synchronization policies.
A SyncPolicy determines how sparse data samples are mapped onto a standard, dense time grid.
Classes implementing this protocol are used by SyncTransformer to resample sensor data
(e.g., holding the last value, interpolating, or dropping old data).
Common implementations include:
- SyncHold: Zero-order hold (carries the last value forward).
- SyncAsOf: Tolerance-based hold (carries value forward only for a specific duration).
- SyncDrop: Strict interval matching (drops data outside the current grid step).
apply ¶
Applies the synchronization logic to sparse samples, mapping them to the target grid.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| grid | ndarray | The target dense timeline (nanosecond timestamps). | required |
| s_ts | ndarray | The source acquisition timestamps (sparse data). | required |
| s_val | ndarray | The source sensor values corresponding to s_ts. | required |
Returns:
| Type | Description |
|---|---|
| ndarray | np.ndarray: An object-array of the same length as grid. |
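Because SyncPolicy is a Protocol, any object with a matching `apply` method should satisfy it structurally, without inheriting from a base class. The sketch below illustrates that idea with a local stand-in protocol and a custom policy; `PolicyLike` and `ExactMatch` are hypothetical names introduced here, and whether SyncTransformer accepts arbitrary user-defined policies is an assumption.

```python
from typing import Protocol, runtime_checkable

import numpy as np


@runtime_checkable
class PolicyLike(Protocol):
    """Local stand-in mirroring the documented `apply` signature."""

    def apply(
        self, grid: np.ndarray, s_ts: np.ndarray, s_val: np.ndarray
    ) -> np.ndarray: ...


class ExactMatch:
    """Hypothetical policy: emit a value only on ticks that coincide with a sample."""

    def apply(self, grid, s_ts, s_val):
        out = np.full(len(grid), None, dtype=object)
        if len(s_ts) == 0:
            return out
        # First sample at or after each tick, clipped to a valid index.
        pos = np.clip(np.searchsorted(s_ts, grid), 0, len(s_ts) - 1)
        hit = s_ts[pos] == grid
        out[hit] = s_val[pos[hit]]
        return out


grid = np.array([0, 100, 200, 300])
s_ts = np.array([100, 250, 300])
s_val = np.array([1.0, 2.0, 3.0])
result = ExactMatch().apply(grid, s_ts, s_val)
print(result.tolist())                       # prints [None, 1.0, None, 3.0]
print(isinstance(ExactMatch(), PolicyLike))  # prints True
```

The returned array has one slot per grid tick and uses `None` for ticks without a value, matching the object-array contract described above.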
mosaicolabs.ml.SyncHold ¶
Classic Last-Value-Hold (Zero-Order Hold) synchronization.
This policy carries the most recent valid sample forward to all future grid ticks until a new sample is received. It effectively creates a "step" function from the sparse samples.
apply ¶
Applies the Zero-Order Hold logic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| grid | ndarray | The target dense timeline (nanosecond timestamps). | required |
| s_ts | ndarray | The source acquisition timestamps. | required |
| s_val | ndarray | The source sensor values. | required |
Returns:
| Type | Description |
|---|---|
| ndarray | np.ndarray: Densely populated array where each point holds the last known value. |
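A zero-order hold over the `(grid, s_ts, s_val)` arguments can be sketched with `np.searchsorted`. This is an illustrative re-implementation, not the library's code: `hold` is a hypothetical free function, and `s_ts` is assumed to be sorted in ascending order.

```python
import numpy as np


def hold(grid: np.ndarray, s_ts: np.ndarray, s_val: np.ndarray) -> np.ndarray:
    """Carry the latest sample at or before each tick forward (zero-order hold)."""
    out = np.full(len(grid), None, dtype=object)
    # Index of the latest sample with timestamp <= tick; -1 means "none yet".
    idx = np.searchsorted(s_ts, grid, side="right") - 1
    seen = idx >= 0
    out[seen] = s_val[idx[seen]]
    return out


grid = np.arange(0, 1_500_000_000, 200_000_000)  # 5 Hz, 0 ms to 1400 ms
s_ts = np.array([600_000_000, 900_000_000, 1_500_000_000])
s_val = np.array([1.0, 2.0, 3.0])
held = hold(grid, s_ts, s_val)
print(held.tolist())
# prints [None, None, None, 1.0, 1.0, 2.0, 2.0, 2.0]
```

Ticks before the first sample stay `None`, and each later tick reports the most recent sample, which produces the step-shaped signal described above.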
mosaicolabs.ml.SyncAsOf ¶
Tolerance-based 'As-Of' synchronization.
Similar to SyncHold, but limits how far a value can be carried forward.
If the time difference between the grid tick and the last sample exceeds
tolerance_ns, the value is considered stale and the slot is left as None.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tolerance_ns | int | Maximum allowed age (in nanoseconds) for a sample to be valid. | required |
apply ¶
Applies the As-Of synchronization logic with tolerance check.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| grid | ndarray | The target dense timeline. | required |
| s_ts | ndarray | The source acquisition timestamps. | required |
| s_val | ndarray | The source sensor values. | required |
Returns:
| Type | Description |
|---|---|
| ndarray | np.ndarray: Densely populated array, with None where the last sample is older than tolerance_ns. |
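The tolerance check can be sketched as a hold followed by an age filter. As before, this is a minimal illustration rather than the library's implementation: `as_of` is a hypothetical free function, `s_ts` is assumed sorted, and a sample whose age equals `tolerance_ns` exactly is treated as still valid (the text above only says that exceeding the tolerance makes it stale).

```python
import numpy as np


def as_of(
    grid: np.ndarray, s_ts: np.ndarray, s_val: np.ndarray, tolerance_ns: int
) -> np.ndarray:
    """Hold the latest sample, but only while its age is <= tolerance_ns."""
    out = np.full(len(grid), None, dtype=object)
    if len(s_ts) == 0:
        return out
    idx = np.searchsorted(s_ts, grid, side="right") - 1
    safe = np.clip(idx, 0, None)  # avoid indexing with -1 before the first sample
    fresh = (idx >= 0) & (grid - s_ts[safe] <= tolerance_ns)
    out[fresh] = s_val[safe[fresh]]
    return out


grid = np.arange(0, 1_500_000_000, 200_000_000)  # 0 ms to 1400 ms in 200 ms steps
s_ts = np.array([600_000_000, 900_000_000])
s_val = np.array([1.0, 2.0])
result = as_of(grid, s_ts, s_val, tolerance_ns=250_000_000)
print(result.tolist())
# prints [None, None, None, 1.0, 1.0, 2.0, None, None]
```

At 1200 ms the last sample (900 ms) is 300 ms old, which exceeds the 250 ms tolerance, so that slot and the following one are left as `None`.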
mosaicolabs.ml.SyncDrop ¶
Strict Interval-based 'Drop' synchronization.
Only yields a value if a sample was acquired strictly within the
current grid interval (t_grid - step_ns, t_grid]. If no sample falls
in this window, the result is None. This is useful for event-based matching.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| step_ns | int | The duration of the backward-looking window in nanoseconds. | required |
apply ¶
Applies the Drop synchronization logic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| grid | ndarray | The target dense timeline. | required |
| s_ts | ndarray | The source acquisition timestamps. | required |
| s_val | ndarray | The source sensor values. | required |
Returns:
| Type | Description |
|---|---|
| ndarray | np.ndarray: Array containing values only for populated intervals, otherwise None. |
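The interval rule `(t_grid - step_ns, t_grid]` can be sketched in the same style. This is an illustration only: `drop` is a hypothetical free function, `s_ts` is assumed sorted, and when several samples fall into one interval the sketch keeps the latest one, which is an assumption about the intended behaviour.

```python
import numpy as np


def drop(
    grid: np.ndarray, s_ts: np.ndarray, s_val: np.ndarray, step_ns: int
) -> np.ndarray:
    """Emit a value only if a sample lies in (tick - step_ns, tick]."""
    out = np.full(len(grid), None, dtype=object)
    if len(s_ts) == 0:
        return out
    # Latest sample at or before each tick; if it is outside the window,
    # no earlier sample can be inside it either.
    idx = np.searchsorted(s_ts, grid, side="right") - 1
    safe = np.clip(idx, 0, None)
    inside = (idx >= 0) & (grid - s_ts[safe] < step_ns)
    out[inside] = s_val[safe[inside]]
    return out


grid = np.arange(0, 1_000_000_000, 200_000_000)  # 0 ms to 800 ms in 200 ms steps
s_ts = np.array([0, 250_000_000, 600_000_000])
s_val = np.array([1.0, 2.0, 3.0])
result = drop(grid, s_ts, s_val, step_ns=200_000_000)
print(result.tolist())
# prints [1.0, None, 2.0, 3.0, None]
```

The sample at 0 ms is not reused at the 200 ms tick because the interval is open on its left side, so each sample contributes to at most one tick when `step_ns` equals the grid spacing.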