Machine Learning Module

mosaicolabs.ml.DataFrameExtractor

DataFrameExtractor(sequence_handler)

Extracts and manages data from Mosaico Sequences, converting them into tabular DataFrames.

This class serves as a high-performance bridge for training ML models or performing data analysis. It extracts data from multiple sequence topics and unifies them into a single, flattened, sparse DataFrame aligned by timestamps.

Key Features:

  • Memory Efficiency: Uses a windowed approach to process multi-gigabyte sequences in chunks without overloading RAM.
  • Flattening: Automatically flattens nested structures (e.g., pose.position.x) into dot-notation columns.
  • Sparse Alignment: Merges multiple topics with different frequencies into a single timeline (using NaN for missing values at specific timestamps).
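
The flattening and sparse-alignment behavior described above can be illustrated with plain pandas. This is a minimal sketch, not the extractor's actual implementation: the message dictionaries, the `flatten` helper, and the `<topic>.<field>` column naming are assumptions made for illustration only.

```python
import pandas as pd

# Hypothetical raw messages from two topics sampled at different rates.
imu_msgs = [
    {"timestamp_ns": 0, "acc": {"x": 0.1, "y": 0.0}},
    {"timestamp_ns": 10, "acc": {"x": 0.2, "y": 0.1}},
]
pose_msgs = [
    {"timestamp_ns": 10, "pose": {"position": {"x": 1.0}}},
    {"timestamp_ns": 25, "pose": {"position": {"x": 1.5}}},
]


def flatten(msgs, topic):
    """Flatten nested messages into dot-notation columns (illustrative helper)."""
    # json_normalize turns nested dicts into columns such as "acc.x".
    df = pd.json_normalize(msgs, sep=".")
    # Prefix data columns with the topic name; keep the timestamp as the join key.
    renamed = {c: f"{topic}.{c}" for c in df.columns if c != "timestamp_ns"}
    return df.rename(columns=renamed)


imu_df = flatten(imu_msgs, "/imu")
pose_df = flatten(pose_msgs, "/pose")

# An outer merge on the timestamp yields a single timeline;
# cells with no measurement at a given timestamp become NaN.
sparse_df = (
    imu_df.merge(pose_df, on="timestamp_ns", how="outer")
    .sort_values("timestamp_ns")
    .reset_index(drop=True)
)
print(sparse_df)
```

The resulting frame has one row per distinct timestamp, which is why a downstream resampler is typically needed before feeding the data to a model.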

Initializes the DataFrameExtractor.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| sequence_handler | SequenceHandler | An active handle to a Mosaico sequence. | required |

to_pandas_chunks

to_pandas_chunks(
    topics=None,
    window_sec=5.0,
    timestamp_ns_start=None,
    timestamp_ns_end=None,
)

Generator that yields time-windowed pandas DataFrames from the sequence.

This method leverages server-side filtering and local batch processing to maintain a low memory footprint. It handles batches that cross window boundaries by carrying over the remainder to the next chunk.
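
The carry-over idea can be sketched independently of Mosaico. The generator below regroups time-ordered batches into fixed-duration windows and keeps the rows that spill past a window boundary for the next chunk. The function name `rewindow` and its parameters are hypothetical, and the real extractor may handle boundaries differently.

```python
from typing import Iterable, Iterator

import pandas as pd


def rewindow(
    batches: Iterable[pd.DataFrame],
    window_ns: int,
    ts_col: str = "timestamp_ns",
) -> Iterator[pd.DataFrame]:
    """Regroup time-ordered batches into fixed-duration windows."""
    carry = None       # rows belonging to a window that is not complete yet
    window_end = None  # exclusive upper bound of the current window
    for batch in batches:
        if batch.empty:
            continue
        buf = batch if carry is None else pd.concat([carry, batch], ignore_index=True)
        if window_end is None:
            window_end = int(buf[ts_col].iloc[0]) + window_ns
        # Emit every window that the buffered rows are known to have completed.
        while int(buf[ts_col].iloc[-1]) >= window_end:
            done = buf[buf[ts_col] < window_end]
            if not done.empty:
                yield done.reset_index(drop=True)
            # The remainder is carried over to the next window.
            buf = buf[buf[ts_col] >= window_end]
            window_end += window_ns
        carry = buf
    # Flush whatever is left once the input is exhausted.
    if carry is not None and not carry.empty:
        yield carry.reset_index(drop=True)


# Three batches whose boundaries do not line up with the 5 ns windows.
batches = [pd.DataFrame({"timestamp_ns": ts}) for ts in ([0, 1, 2, 3], [4, 5, 6], [7, 12])]
chunks = [c["timestamp_ns"].tolist() for c in rewindow(batches, window_ns=5)]
print(chunks)
```

Only the current buffer is held in memory at any time, which is the property that keeps the footprint low for long sequences.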

Important

This method is a generator: no data is extracted until it is iterated (e.g., in a for loop).

Warning

Setting window_sec to a very large value effectively disables windowing: the extractor will attempt to load the entire requested time range into memory. This is only recommended for small sequences or systems with high RAM capacity.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| topics | List[str] | Topics to extract. Defaults to all topics. | None |
| window_sec | float | Duration of each DataFrame chunk in seconds. | 5.0 |
| timestamp_ns_start | int | Global start time for extraction, in nanoseconds. | None |
| timestamp_ns_end | int | Global end time for extraction, in nanoseconds. | None |

Yields:

| Type | Description |
|---|---|
| DataFrame | pd.DataFrame: A sparse, flattened DataFrame containing data from all selected topics and their fields within the current time window. |

Example
# Obtain a dataframe with DataFrameExtractor
# NOTE: `Message` is assumed to be importable from the top-level package.
from mosaicolabs import MosaicoClient, Image, Message
from mosaicolabs.ml import DataFrameExtractor, SyncTransformer

# Create the transformer once, outside the loop, so that its internal
# state carries over from one chunk to the next.
sync_transformer = SyncTransformer(
    target_fps=30,  # resample at 30 Hz and fill the NaNs with a Hold policy
)

with MosaicoClient.connect("localhost", 6726) as client:
    sequence_handler = client.get_sequence_handler("example_sequence")
    for df in DataFrameExtractor(sequence_handler).to_pandas_chunks(
        topics=["/front/imu", "/front/camera/image_raw"]
    ):
        # Do something with the dataframe.
        # For example, you can sync the data using the `SyncTransformer`:
        synced_df = sync_transformer.transform(df)

        # Reconstruct the image message from a dataframe row
        image_msg = Message.from_dataframe_row(synced_df, "/front/camera/image_raw")
        image_data = image_msg.get_data(Image)
        # Show the image
        image_data.to_pillow().show()
        # ...

mosaicolabs.ml.SyncTransformer

SyncTransformer(
    target_fps,
    policy=SyncHold(),
    timestamp_column="timestamp_ns",
)

Stateful resampler for Mosaico DataFrames.

This class aligns heterogeneous sensor streams onto a uniform time grid. It is designed to consume the windowed outputs of the DataFrameExtractor sequentially, maintaining internal state to ensure signal continuity across batch boundaries.

Scikit-Learn Compatibility

The class implements the standard fit/transform interface, so it can be used inside scikit-learn Pipeline and FeatureUnion objects.

  • fit(X): Captures the initial timestamp from the first chunk to align the grid.
  • transform(X): Executes the temporal resampling logic for a single DataFrame chunk and returns a dense DataFrame.
  • fit_transform(X): Fits the transformer to the data and then transforms it.

Key Features:

  • Fixed Frequency: Normalizes multi-rate sensors to a target FPS.
  • Stateful Persistence: Carries the last known sensor state into the next chunk.
  • Semantic Integrity: Correctly handles 'Late Arrivals' by yielding None for ticks preceding the first physical measurement.
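
To make the stateful behavior concrete, here is a minimal single-signal sketch written with NumPy only. It is not the SyncTransformer implementation: the class name `StatefulHold`, the `chunk_end_ns` argument, and the assumption that the first chunk is non-empty are all hypothetical choices made for illustration.

```python
import numpy as np


class StatefulHold:
    """Toy resampler that remembers the grid position and last value between chunks."""

    def __init__(self, target_fps: float):
        self.step_ns = int(round(1e9 / target_fps))
        self.next_tick = None  # first grid tick that has not been emitted yet
        self.last_val = None   # last sample seen in any earlier chunk

    def transform(self, ts: np.ndarray, vals: np.ndarray, chunk_end_ns: int):
        """Resample one chunk; `chunk_end_ns` is the exclusive end of the chunk."""
        if self.next_tick is None:
            # Align the grid on the first observed timestamp (first chunk assumed non-empty).
            self.next_tick = int(ts[0])
        grid = np.arange(self.next_tick, chunk_end_ns, self.step_ns, dtype=np.int64)
        out = np.empty(len(grid), dtype=object)  # object arrays start filled with None
        # Index of the latest sample at or before each tick (-1: none in this chunk).
        idx = np.searchsorted(ts, grid, side="right") - 1
        for i, j in enumerate(idx):
            # Fall back to the value carried over from the previous chunk.
            out[i] = vals[j] if j >= 0 else self.last_val
        if len(ts):
            self.last_val = vals[-1]
        if len(grid):
            self.next_tick = int(grid[-1]) + self.step_ns
        return grid, out


MS = 1_000_000  # nanoseconds per millisecond
resampler = StatefulHold(target_fps=5)  # 200 ms steps
g1, o1 = resampler.transform(np.array([0, 300 * MS]), np.array([1.0, 2.0]), 600 * MS)
# The second chunk has no sample before 900 ms: the first two ticks reuse the carried value.
g2, o2 = resampler.transform(np.array([900 * MS]), np.array([3.0]), 1200 * MS)
print(o1.tolist(), o2.tolist())
```

If a new resampler were created for every chunk, the first ticks of the second chunk would be empty, which is why the state has to persist across calls.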

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| target_fps | float | The desired output frequency in Hz. | required |
| policy | SyncPolicy | A strategy implementing the SyncPolicy protocol. | SyncHold() |
| timestamp_column | str | The column name containing the timestamp data. | 'timestamp_ns' |

fit

fit(X, y=None)

Initializes the grid alignment based on the first observed timestamp.

transform

transform(X)

Synchronizes a sparse DataFrame chunk into a dense, uniform DataFrame.

Example with generic dataframe
import pandas as pd
from mosaicolabs.ml import SyncTransformer, SyncHold

# 5 Hz Target (200ms steps)
transformer = SyncTransformer(target_fps=5, policy=SyncHold())

# Define a sparse dataframe with two sensors:
# `sensor_a` starts at 0, `sensor_b` arrives at 600ms
sparse_data = {
    "timestamp_ns": [
        0,
        600_000_000,
        900_000_000,
        1_200_000_000,
        1_500_000_000,
    ],
    "sensor_a": [10.0, 11.0, None, 12.0, 13.0],
    "sensor_b": [None, 1.0, 2.0, None, 3.0],
}
df = pd.DataFrame(sparse_data)
dense_df = transformer.fit(df).transform(df)

# Expected output
# "timestamp_ns",   "sensor_a", "sensor_b"
# 0,                10.0,       None        # <- avoid hallucination on `sensor_b`
# 200_000_000,      10.0,       None        # <- avoid hallucination on `sensor_b`
# 400_000_000,      10.0,       None        # <- avoid hallucination on `sensor_b`
# 600_000_000,      11.0,       1.0
# 800_000_000,      11.0,       1.0         # <- the 900 ms sample of `sensor_b` has not arrived yet
# 1_000_000_000,    11.0,       2.0
# 1_200_000_000,    12.0,       2.0
# 1_400_000_000,    12.0,       2.0
Example with Mosaico dataframe
# Obtain a dataframe with DataFrameExtractor
from mosaicolabs import MosaicoClient, IMU, Image
from mosaicolabs.ml import DataFrameExtractor, SyncTransformer

# Create the transformer once, outside the loop, so that its internal
# state carries over from one chunk to the next.
sync_transformer = SyncTransformer(
    target_fps=30,  # resample at 30 Hz and fill the NaNs with a `Hold` policy
)

with MosaicoClient.connect("localhost", 6726) as client:
    sequence_handler = client.get_sequence_handler("example_sequence")
    for df in DataFrameExtractor(sequence_handler).to_pandas_chunks(
        topics=["/front/imu", "/front/camera/image_raw"]
    ):
        # Sync the data at 30 Hz:
        synced_df = sync_transformer.transform(df)
        # Do something with the synced dataframe
        # ...

fit_transform

fit_transform(X, y=None)

Fits the transformer to the data and then transforms it.

reset

reset()

Resets the internal temporal state and cached sensor values.

mosaicolabs.ml.SyncPolicy

Bases: Protocol

Protocol defining the interface for data synchronization policies.

A SyncPolicy determines how sparse data samples are mapped onto a standard, dense time grid. Classes implementing this protocol are used by SyncTransformer to resample sensor data (e.g., holding the last value, interpolating, or dropping old data).

Common implementations include:

  • SyncHold: Zero-order hold (carries the last value forward).
  • SyncAsOf: Tolerance-based hold (carries value forward only for a specific duration).
  • SyncDrop: Strict interval matching (drops data outside the current grid step).

apply

apply(grid, s_ts, s_val)

Applies the synchronization logic to sparse samples, mapping them to the target grid.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| grid | ndarray | The target dense timeline (nanosecond timestamps). | required |
| s_ts | ndarray | The source acquisition timestamps (sparse data). | required |
| s_val | ndarray | The source sensor values corresponding to s_ts. | required |

Returns:

| Type | Description |
|---|---|
| ndarray | np.ndarray: An object-array of the same length as grid, containing the synchronized values. Slots with no valid data are filled with None. |
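
Because SyncPolicy is a Protocol, any object exposing a matching apply method should satisfy it structurally. The sketch below shows what a custom policy could look like. `SyncPolicyLike` is a local stand-in that mirrors the documented signature, and `SyncNearest` is a hypothetical policy that is not part of the library. Both `grid` and `s_ts` are assumed to be sorted in ascending order.

```python
from typing import Protocol

import numpy as np


class SyncPolicyLike(Protocol):
    """Local stand-in mirroring the documented `apply` signature."""

    def apply(self, grid: np.ndarray, s_ts: np.ndarray, s_val: np.ndarray) -> np.ndarray: ...


class SyncNearest:
    """Hypothetical policy: pick the sample closest in time to each grid tick."""

    def apply(self, grid: np.ndarray, s_ts: np.ndarray, s_val: np.ndarray) -> np.ndarray:
        out = np.empty(len(grid), dtype=object)  # object arrays start filled with None
        if len(s_ts) == 0:
            return out
        # Candidate neighbours: the first sample at/after the tick and the one before it.
        right = np.clip(np.searchsorted(s_ts, grid, side="left"), 0, len(s_ts) - 1)
        left = np.clip(right - 1, 0, len(s_ts) - 1)
        pick_left = np.abs(grid - s_ts[left]) <= np.abs(s_ts[right] - grid)
        idx = np.where(pick_left, left, right)
        for i, j in enumerate(idx):
            out[i] = s_val[j]
        return out


policy: SyncPolicyLike = SyncNearest()  # structural typing: no inheritance needed
nearest = policy.apply(
    grid=np.array([0, 10, 20]),
    s_ts=np.array([4, 18]),
    s_val=np.array([1.0, 2.0]),
)
print(nearest.tolist())
```

Note that a nearest-neighbour policy looks ahead in time, so unlike the built-in policies it is not causal. It is used here only to show the shape of the interface.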

mosaicolabs.ml.SyncHold

Classic Last-Value-Hold (Zero-Order Hold) synchronization.

This policy carries the most recent valid sample forward to all future grid ticks until a new sample is received. It effectively creates a "step" function from the sparse samples.
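
A zero-order hold can be sketched in a few lines of NumPy. The function below follows the documented apply(grid, s_ts, s_val) shape, but it is an illustrative approximation rather than the library's code. The name `hold` is hypothetical and `s_ts` is assumed to be sorted.

```python
import numpy as np


def hold(grid: np.ndarray, s_ts: np.ndarray, s_val: np.ndarray) -> np.ndarray:
    """Carry the latest sample at or before each tick forward (zero-order hold)."""
    out = np.empty(len(grid), dtype=object)  # object arrays start filled with None
    # Index of the latest sample whose timestamp is <= the tick (-1: none yet).
    idx = np.searchsorted(s_ts, grid, side="right") - 1
    valid = idx >= 0
    out[valid] = s_val[idx[valid]]
    return out


held = hold(
    grid=np.array([0, 2, 4, 6, 8]),
    s_ts=np.array([3, 7]),
    s_val=np.array([1.0, 2.0]),
)
print(held.tolist())
```

Ticks 0 and 2 precede the first sample and stay None. From tick 4 onward the output is a step function that changes only when a new sample has arrived.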

apply

apply(grid, s_ts, s_val)

Applies the Zero-Order Hold logic.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| grid | ndarray | The target dense timeline (nanosecond timestamps). | required |
| s_ts | ndarray | The source acquisition timestamps. | required |
| s_val | ndarray | The source sensor values. | required |

Returns:

| Type | Description |
|---|---|
| ndarray | np.ndarray: Densely populated array where each point holds the last known value. |

mosaicolabs.ml.SyncAsOf

SyncAsOf(tolerance_ns)

Tolerance-based 'As-Of' synchronization.

Similar to SyncHold, but limits how far a value can be carried forward. If the time difference between the grid tick and the last sample exceeds tolerance_ns, the value is considered stale and the slot is left as None.
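
The staleness check can be sketched as a hold followed by an age filter. This is a minimal illustration under stated assumptions, not the library's implementation: the function name `as_of` is hypothetical, `s_ts` is assumed sorted, and a sample whose age equals tolerance_ns exactly is treated as still valid.

```python
import numpy as np


def as_of(grid: np.ndarray, s_ts: np.ndarray, s_val: np.ndarray, tolerance_ns: int) -> np.ndarray:
    """Hold the last sample forward, but only while its age is within the tolerance."""
    out = np.empty(len(grid), dtype=object)  # object arrays start filled with None
    if len(s_ts) == 0:
        return out
    # Index of the latest sample at or before each tick (-1: none yet).
    idx = np.searchsorted(s_ts, grid, side="right") - 1
    has_prev = idx >= 0
    # Age of that sample at each tick (meaningless where has_prev is False).
    age = grid - s_ts[np.clip(idx, 0, None)]
    fresh = has_prev & (age <= tolerance_ns)
    out[fresh] = s_val[idx[fresh]]
    return out


synced = as_of(
    grid=np.array([0, 2, 4, 6, 8]),
    s_ts=np.array([1, 5]),
    s_val=np.array([1.0, 2.0]),
    tolerance_ns=2,
)
print(synced.tolist())
```

At ticks 4 and 8 the last sample is 3 ns old, which exceeds the 2 ns tolerance, so those slots stay None instead of repeating a stale value.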

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| tolerance_ns | int | Maximum allowed age (in nanoseconds) for a sample to be valid. | required |

apply

apply(grid, s_ts, s_val)

Applies the As-Of synchronization logic with tolerance check.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| grid | ndarray | The target dense timeline. | required |
| s_ts | ndarray | The source acquisition timestamps. | required |
| s_val | ndarray | The source sensor values. | required |

Returns:

| Type | Description |
|---|---|
| ndarray | np.ndarray: Densely populated array, with None where data is missing or stale. |

mosaicolabs.ml.SyncDrop

SyncDrop(step_ns)

Strict Interval-based 'Drop' synchronization.

Only yields a value if a sample was acquired strictly within the current grid interval (t_grid - step_ns, t_grid]. If no sample falls in this window, the result is None. This is useful for event-based matching.
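
The half-open interval (t_grid - step_ns, t_grid] can be sketched as follows. This is an illustrative approximation, not the library's code: the function name `drop` is hypothetical, `s_ts` is assumed sorted, and when several samples fall in the same interval the most recent one is kept (an assumption, since the text above does not specify it).

```python
import numpy as np


def drop(grid: np.ndarray, s_ts: np.ndarray, s_val: np.ndarray, step_ns: int) -> np.ndarray:
    """Keep a value only if a sample falls inside (t_grid - step_ns, t_grid]."""
    out = np.empty(len(grid), dtype=object)  # object arrays start filled with None
    if len(s_ts) == 0:
        return out
    # Latest sample at or before each tick: this enforces the closed upper bound.
    idx = np.searchsorted(s_ts, grid, side="right") - 1
    prev_ts = s_ts[np.clip(idx, 0, None)]
    # The strict comparison enforces the open lower bound of the interval.
    in_window = (idx >= 0) & (prev_ts > grid - step_ns)
    out[in_window] = s_val[idx[in_window]]
    return out


matched = drop(
    grid=np.array([0, 2, 4, 6, 8]),
    s_ts=np.array([1, 4]),
    s_val=np.array([1.0, 2.0]),
    step_ns=2,
)
print(matched.tolist())
```

The sample at t=4 is matched by tick 4 (closed upper bound) but not by tick 6, because 4 sits exactly on the open lower bound of the interval (4, 6].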

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| step_ns | int | The duration of the backward-looking window in nanoseconds. | required |

apply

apply(grid, s_ts, s_val)

Applies the Drop synchronization logic.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| grid | ndarray | The target dense timeline. | required |
| s_ts | ndarray | The source acquisition timestamps. | required |
| s_val | ndarray | The source sensor values. | required |

Returns:

| Type | Description |
|---|---|
| ndarray | np.ndarray: Array containing values only for populated intervals, otherwise None. |