Skip to content

Machine Learning Module

mosaicolabs.ml.DataFrameExtractor

DataFrameExtractor(
    sequence_handler, timestamp_column_name=None
)

Extracts and manages data from Mosaico Sequences, converting them into tabular DataFrames.

This class serves as a high-performance bridge for training ML models or performing data analysis. It extracts data from multiple sequence topics and unifies them into a single, flattened, sparse DataFrame aligned by timestamps.

Key Features:

  • Memory Efficiency: Uses a windowed approach to process multi-gigabyte sequences in chunks without overloading RAM.
  • Flattening: Automatically flattens nested structures (e.g., pose.position.x) into dot-notation columns.
  • Sparse Alignment: Merges multiple topics with different frequencies into a single timeline (using NaN for missing values at specific timestamps).

Initializes the DataFrameExtractor.

Parameters:

Name Type Description Default
sequence_handler SequenceHandler

An active handle to a Mosaico sequence.

required
timestamp_column_name str

Name of the timestamp column. Defaults to None, in which case "timestamp_ns" will be used.

None

to_pandas_chunks

to_pandas_chunks(
    topics=None,
    window_sec=5.0,
    timestamp_ns_start=None,
    timestamp_ns_end=None,
)

Generator that yields time-windowed pandas DataFrames from the sequence.

This method leverages server-side filtering and local batch processing to maintain a low memory footprint. It handles batches that cross window boundaries by carrying over the remainder to the next chunk.

Important

This function must be iterated (e.g. called in a for loop)

Warning

Setting window_sec to a very large value might disable windowing. The extractor will attempt to load the entire requested time range into memory. This is only recommended for small sequences or systems with high RAM capacity.

Parameters:

Name Type Description Default
topics List[str]

Topics to extract. Defaults to all topics.

None
window_sec float

Duration of each DataFrame chunk in seconds.

5.0
timestamp_ns_start int

Global start time for extraction.

None
timestamp_ns_end int

Global end time for extraction.

None

Yields:

Type Description
DataFrame

pd.DataFrame: A sparse, flattened DataFrame containing data from all selected topics and their fields within the current time window.

Example
# Obtain a dataframe with DataFrameExtractor
from mosaicolabs import MosaicoClient, IMU, Image
from mosaicolabs.ml import DataFrameExtractor

with MosaicoClient.connect("localhost", 6726) as client:
    sequence_handler = client.sequence_handler("example_sequence")
    for df in DataFrameExtractor(sequence_handler).to_pandas_chunks(
        topics = ["/front/imu", "/front/camera/image_raw"]
    ):
        # Do something with the dataframe.
        # ...

mosaicolabs.ml.SyncTransformer

SyncTransformer(
    target_fps,
    policy=SyncHold(),
    timestamp_column="timestamp_ns",
)

Bases: BaseEstimator, TransformerMixin

Stateful resampler for Mosaico DataFrames.

This class aligns heterogeneous sensor streams onto a uniform time grid. It is designed to consume the windowed outputs of the DataFrameExtractor sequentially, maintaining internal state to ensure signal continuity across batch boundaries.

Scikit-Learn Compatibility

The class implements the standard fit/transform interface, making it fully compliant with Scikit-learn Pipeline and FeatureUnion objects.

  • fit(X): Captures the initial timestamp from the first chunk to align the grid.
  • transform(X): Executes the temporal resampling logic for a single DataFrame chunk and returns a dense DataFrame.
  • fit_transform(X): Fits the transformer to the data and then transforms it.

Key Features:

  • Fixed Frequency: Normalizes multi-rate sensors to a target FPS.
  • Stateful Persistence: Carries the last known sensor state into the next chunk.
  • Semantic Integrity: Correctly handles 'Late Arrivals' by yielding None for ticks preceding the first physical measurement.

Parameters:

Name Type Description Default
target_fps float

The desired output frequency in Hz.

required
policy SyncPolicy

A strategy implementing the SyncPolicy protocol.

SyncHold()
timestamp_column str

The column name containing the timestamp data.

'timestamp_ns'

fit_transform

fit_transform(X, y=None, **fit_params)

Fits the transformer to the data and then transforms it.

fit

fit(X, y=None)

Initializes the grid alignment based on the first observed timestamp.

transform

transform(X)

Syncronizes a sparse DataFrame chunk into a dense, uniform DataFrame.

Example with generic dataframe
from mosaicolabs.ml import SyncTransformer, SyncHold

# 5 Hz Target (200ms steps)
transformer = SyncTransformer(target_fps=5, policy=SyncHold())

# Define a sparse dataframe with two sensors:
# `sensor_a` starts at 0, `sensor_b` arrives at 600ms
sparse_data = {
    "timestamp_ns": [
        0,
        600_000_000,
        900_000_000,
        1_200_000_000,
        1_500_000_000,
    ],
    "val1": [10.0, 11.0, None, 12.0, 13.0],
    "val2": [None, 1.0, 2.0, None, 3.0],
}
df = pd.DataFrame(sparse_data)
dense_df = transformer.fit(df).transform(df)

# Expected output
# "timestamp_ns",   "sensor_a", "sensor_b"
# 0,                10.0,       None        # <- avoid hallucination on `sensor_b`
# 200_000_000,      10.0,       None        # <- avoid hallucination on `sensor_b`
# 400_000_000,      10.0,       None        # <- avoid hallucination on `sensor_b`
# 600_000_000,      11.0,       1.0
# 800_000_000,      11.0,       2.0
# 1_000_000_000,    11.0,       2.0
# 1_200_000_000,    12.0,       2.0
# 1_400_000_000,    12.0,       2.0
Example with Mosaico dataframe
# Obtain a dataframe with DataFrameExtractor
from mosaicolabs import MosaicoClient, IMU, Image
from mosaicolabs.ml import DataFrameExtractor, SyncTransformer

with MosaicoClient.connect("localhost", 6726) as client:
    sequence_handler = client.sequence_handler("example_sequence")
    # Resample at 30Hz and fill the NaNs with a `Hold` policy
    sync_transformer = SyncTransformer(
        target_fps = 30,
    ) # (1)!

    for df in DataFrameExtractor(sequence_handler).to_pandas_chunks(
        topics = ["/front/imu", "/front/camera/image_raw"]
    ):
        synched_df = sync_transformer.fit(df).transform(df)
        # Do something with the synched dataframe
        # ...
  1. Note: the SyncTransformer is created outside the chunk-related for loop: the transformer is a stateful state-machine designed to maintain signal continuity across independent data chunks.

reset

reset()

Resets the internal temporal state and cached sensor values.

mosaicolabs.ml.SyncPolicy

Bases: Protocol

Protocol defining the interface for data synchronization policies.

A SyncPolicy determines how sparse data samples are mapped onto a standard, dense time grid. Classes implementing this protocol are used by SyncTransformer to resample sensor data (e.g., holding the last value, interpolating, or dropping old data).

Common implementations include:

  • SyncHold: Zero-order hold (carries the last value forward).
  • SyncAsOf: Tolerance-based hold (carries value forward only for a specific duration).
  • SyncDrop: Strict interval matching (drops data outside the current grid step).

apply

apply(grid, s_ts, s_val)

Applies the synchronization logic to sparse samples, mapping them to the target grid.

Parameters:

Name Type Description Default
grid ndarray

The target dense timeline (nanosecond timestamps).

required
s_ts ndarray

The source acquisition timestamps (sparse data).

required
s_val ndarray

The source sensor values corresponding to s_ts.

required

Returns:

Type Description
ndarray

np.ndarray: An object-array of the same length as grid, containing the synchronized values. Slots with no valid data are filled with None.

mosaicolabs.ml.SyncHold

Classic Last-Value-Hold (Zero-Order Hold) synchronization.

This policy carries the most recent valid sample forward to all future grid ticks until a new sample is received. It effectively creates a "step" function from the sparse samples.

apply

apply(grid, s_ts, s_val)

Applies the Zero-Order Hold logic.

Parameters:

Name Type Description Default
grid ndarray

The target dense timeline (nanosecond timestamps).

required
s_ts ndarray

The source acquisition timestamps.

required
s_val ndarray

The source sensor values.

required

Returns:

Type Description
ndarray

np.ndarray: Densely populated array where each point holds the last known value.

mosaicolabs.ml.SyncAsOf

SyncAsOf(tolerance_ns)

Tolerance-based 'As-Of' synchronization.

Similar to SyncHold, but limits how far a value can be carried forward. If the time difference between the grid tick and the last sample exceeds tolerance_ns, the value is considered stale and the slot is left as None.

Parameters:

Name Type Description Default
tolerance_ns int

Maximum allowed age (in nanoseconds) for a sample to be valid.

required

apply

apply(grid, s_ts, s_val)

Applies the As-Of synchronization logic with tolerance check.

Parameters:

Name Type Description Default
grid ndarray

The target dense timeline.

required
s_ts ndarray

The source acquisition timestamps.

required
s_val ndarray

The source sensor values.

required

Returns:

Type Description
ndarray

np.ndarray: Densely populated array, with None where data is missing or stale.

mosaicolabs.ml.SyncDrop

SyncDrop(step_ns)

Strict Interval-based 'Drop' synchronization.

Only yields a value if a sample was acquired strictly within the current grid interval (t_grid - step_ns, t_grid]. If no sample falls in this window, the result is None. This is useful for event-based matching.

Parameters:

Name Type Description Default
step_ns int

The duration of the backward-looking window in nanoseconds.

required

apply

apply(grid, s_ts, s_val)

Applies the Drop synchronization logic.

Parameters:

Name Type Description Default
grid ndarray

The target dense timeline.

required
s_ts ndarray

The source acquisition timestamps.

required
s_val ndarray

The source sensor values.

required

Returns:

Type Description
ndarray

np.ndarray: Array containing values only for populated intervals, otherwise None.

mosaicolabs.ml.VideoDecodingTransformer

VideoDecodingTransformer(
    topics,
    stateless_codec=_StatelessDefaultCodec(),
    stateful_decoding_session_type=StatefulDecodingSession,
)

Bases: BaseEstimator, TransformerMixin

A Scikit-Learn compatible stateful transformer that reconstructs CompressedImage byte streams into usable PIL Images chronologically before temporal synchronization.

This transformer bridges the gap between video compression mechanics (which rely on inter-frame dependencies like I/P/B frames) and machine learning batching strategies. It maintains a persistent decoding session across data chunks, ensuring that delta-frames are correctly decoded using the proper reference frames before any temporal synchronization or randomization occurs downstream.

Attributes:

Name Type Description
topics List[str]

A list of Mosaico topic names (e.g., "/front/camera/image") that contain the compressed image data to be decoded.

_stateless_codec

The codec used for stateless formats like JPEG or PNG.

_stateful_decoding_session_type

The class/factory used to instantiate the stateful session for formats like H.264 or HEVC.

_session

The active decoding session instance.

Initializes the VideoDecodingTransformer.

Parameters:

Name Type Description Default
topics List[str]

The list of topics to target for image decoding.

required
stateless_codec optional

An instance of a codec to handle formats that do not require state (e.g., JPEG). Defaults to _StatelessDefaultCodec().

_StatelessDefaultCodec()
stateful_decoding_session_type optional

The class to instantiate for state-dependent video decoding. Defaults to StatefulDecodingSession.

StatefulDecodingSession

fit_transform

fit_transform(X, y=None, **fit_params)

Fits the transformer to the data and then transforms it.

fit

fit(X, y=None)

Initializes the persistent decoding session.

This method should be called before transforming the first chunk of data. It establishes the stateful session required to track reference frames across subsequent transform calls.

Architectural Note

The C-level decoding session is intentionally initialized here rather than in __init__(). This complies with Scikit-Learn's strict "no side-effects" contract for constructors (allowing safe sklearn.base.clone operations), ensures the transformer can be pickled for multiprocessing (e.g., via joblib), and enforces lazy resource allocation.

The session is initialized at the first function call. Any other method call does nothing, unless the reset() method is called.

Parameters:

Name Type Description Default
X DataFrame

The input DataFrame chunk (unused in this method but required by the Scikit-Learn API).

required
y optional

Target values (unused).

None

Returns:

Name Type Description
self VideoDecodingTransformer

Returns the transformer instance.

transform

transform(X)

Decodes the compressed image data within the DataFrame chronologically.

For each requested topic, this method extracts the raw bytes and image format. Stateful formats (H.264, HEVC) are routed through the persistent decoding session, using the topic name as the context to isolate decoder states per camera. Stateless formats (JPEG) are routed to the stateless codec.

The resulting PIL.Image objects are inserted into a new column named {topic}.compressed_image.decoded, and the original raw byte/format columns are dropped to conserve memory.

Example
# Obtain a dataframe with DataFrameExtractor
from mosaicolabs import MosaicoClient, IMU, Image
from mosaicolabs.ml import DataFrameExtractor, VideoDecodingTransformer

with MosaicoClient.connect("localhost", 6726) as client:
    sequence_handler = client.sequence_handler("example_sequence")
    # Resample at 30Hz and fill the NaNs with a `Hold` policy
    vdec_transf = VideoDecodingTransformer(
        topics=["/front_stereo_camera/left/image_compressed"]
    ) # (1)!

    for df in DataFrameExtractor(sequence_handler).to_pandas_chunks():
        decoded_df = vdec_transf.fit_transform(df) # (2)!
        # Do something with the decoded dataframe
        # ...
  1. Note: the VideoDecodingTransformer is created outside the chunk-related for loop: the transformer is a stateful state-machine designed to maintain signal continuity across different data chunks.
  2. The decoded image is here: decoded_df["/front_stereo_camera/left/image_compressed.compressed_image.decoded"]

Parameters:

Name Type Description Default
X DataFrame

A chronologically ordered sparse chunk from the DataFrameExtractor, with a new column named {topic}.compressed_image.decoded.

required

Returns:

Type Description
DataFrame

pd.DataFrame: A new DataFrame containing the fully reconstructed PIL.Image objects in place of the raw byte streams.

reset

reset()

Releases the C-level decoder resources and resets the session state.

This should be called when processing is complete or if the sequence is restarted, to prevent memory leaks and ensure a clean state for the next run.