Data Retrieval

mosaicolabs.handlers.SequenceHandler

SequenceHandler(
    *,
    sequence_model,
    client,
    connection_pool_allocator,
    executor_pool_allocator,
    timestamp_ns_min,
    timestamp_ns_max,
)

Represents a client-side handle for an existing Sequence on the Mosaico platform.

The SequenceHandler acts as a primary container for inspecting sequence-level metadata, listing available topics, and accessing data reading interfaces like the SequenceDataStreamer.

Obtaining a Handler

Users should not instantiate this class directly. The recommended way to obtain a handler is via the MosaicoClient.sequence_handler() factory method.

Internal constructor for SequenceHandler.

Do not call this directly. Users should retrieve instances via MosaicoClient.sequence_handler(), while internal modules should use the SequenceHandler._connect() factory.

Parameters:

Name Type Description Default
sequence_model Sequence

The underlying metadata and system info model for the sequence.

required
client FlightClient

The active FlightClient for remote operations.

required
timestamp_ns_min Optional[int]

The lowest timestamp (in ns) available in this sequence.

required
timestamp_ns_max Optional[int]

The highest timestamp (in ns) available in this sequence.

required

name property

name

The unique name of the sequence.

Returns:

Type Description
str

The unique name of the sequence.

topics property

topics

The list of topic names (data channels) available within this sequence.

Returns:

Type Description
List[str]

The list of topic names (data channels) available within this sequence.

sessions property

sessions

The list of all the writing sessions that produced this sequence (upon creation or updates).

Returns:

Type Description
List[Session]

A list of Session instances.

user_metadata property

user_metadata

The user-defined metadata dictionary associated with this sequence.

Returns:

Type Description
Dict[str, Any]

The user-defined metadata dictionary associated with this sequence.

created_timestamp property

created_timestamp

The UTC timestamp indicating when the entity was created on the server.

Returns:

Type Description
int

The UTC creation timestamp.

updated_timestamps property

updated_timestamps

The list of UTC timestamps indicating when the entity was updated on the server.

Returns:

Type Description
List[int]

The list of UTC update timestamps.

total_size_bytes property

total_size_bytes

The total physical storage footprint of the entity on the server in bytes.

Returns:

Type Description
int

The total physical storage in bytes.

timestamp_ns_min property

timestamp_ns_min

The lowest timestamp (nanoseconds) recorded in the sequence across all topics.

Returns:

Type Description
Optional[int]

The lowest timestamp (nanoseconds) recorded in the sequence across all topics, or None if the sequence contains no data or the timestamps could not be derived.

timestamp_ns_max property

timestamp_ns_max

The highest timestamp (nanoseconds) recorded in the sequence across all topics.

Returns:

Type Description
Optional[int]

The highest timestamp (nanoseconds) recorded in the sequence across all topics, or None if the sequence contains no data or the timestamps could not be derived.
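Because both bounds are `Optional[int]`, code that derives a duration from them has to handle the `None` case. The following is a minimal sketch of such a guard. `span_seconds` is a hypothetical helper and not part of the SDK; the sample values are the nanosecond timestamps used in the examples on this page.

```python
from typing import Optional

NS_PER_SECOND = 1_000_000_000


def span_seconds(ts_min_ns: Optional[int], ts_max_ns: Optional[int]) -> Optional[float]:
    """Return the covered time span in seconds, or None if a bound is missing."""
    # Hypothetical helper: either bound may be None for an empty sequence.
    if ts_min_ns is None or ts_max_ns is None:
        return None
    return (ts_max_ns - ts_min_ns) / NS_PER_SECOND


print(span_seconds(1738508778000000000, 1738509618000000000))  # 840.0
print(span_seconds(None, 1738509618000000000))  # None
```

In real code the two arguments would come from `seq_handler.timestamp_ns_min` and `seq_handler.timestamp_ns_max`.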

get_data_streamer

get_data_streamer(
    topics=[],
    start_timestamp_ns=None,
    end_timestamp_ns=None,
)

Opens a reading channel for iterating over the sequence data.

The returned SequenceDataStreamer performs a K-way merge sort to provide a single, time-synchronized chronological stream of messages from multiple topics.

Parameters:

Name Type Description Default
topics List[str]

A subset of topic names to stream. If empty, all topics in the sequence are streamed.

[]
start_timestamp_ns Optional[int]

The inclusive lower bound (t >= start) for the time window in nanoseconds. The stream starts at the first message with a timestamp greater than or equal to this value.

None
end_timestamp_ns Optional[int]

The exclusive upper bound (t < end) for the time window in nanoseconds. The stream stops before the first message with a timestamp greater than or equal to this value.

None

Returns:

Type Description
SequenceDataStreamer

A SequenceDataStreamer iterator yielding (topic_name, message) tuples.

Raises:

Type Description
ValueError

If the provided topic names do not exist or if the sequence contains no data.
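The half-open time window described for `start_timestamp_ns` and `end_timestamp_ns` can be illustrated with plain Python. This is only a local sketch of the documented semantics (t >= start and t < end), with `None` assumed to mean "no bound"; `in_window` is a hypothetical name, and the SDK applies the filter itself rather than exposing such a function.

```python
from typing import Optional


def in_window(t_ns: int, start_ns: Optional[int] = None, end_ns: Optional[int] = None) -> bool:
    """Return True if t_ns falls inside the half-open window [start_ns, end_ns)."""
    if start_ns is not None and t_ns < start_ns:
        return False  # before the inclusive lower bound
    if end_ns is not None and t_ns >= end_ns:
        return False  # at or after the exclusive upper bound
    return True


timestamps = [10, 20, 30, 40]
selected = [t for t in timestamps if in_window(t, start_ns=20, end_ns=40)]
print(selected)  # [20, 30]
```

One consequence of the exclusive upper bound is that consecutive windows such as [10, 30) and [30, 50) never return the same message twice.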

Example
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Use a Handler to inspect the catalog
    seq_handler = client.sequence_handler("mission_alpha")
    if seq_handler:
        # Start a Unified Stream (K-Way Merge) for multi-sensor replay
        streamer = seq_handler.get_data_streamer(
            topics=["/gps", "/imu"], # Optionally filter topics
            # Optionally set the time window to extract
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000
        )

        # Peek at the start time (without consuming data)
        print(f"Recording starts at: {streamer.next_timestamp()}")

        # Start timed data-stream
        for topic, msg in streamer:
            print(f"[{topic}] at {msg.timestamp_ns}: {type(msg.data).__name__}")

        # Once done, close the resources, topic handler and related reading channels (recommended).
        seq_handler.close()
Important

Every call to get_data_streamer() will automatically invoke close() on any previously spawned SequenceDataStreamer instance and its associated Apache Arrow Flight channels before initializing the new stream.

Example:

seq_handler = client.sequence_handler("mission_alpha")

# Opens first stream
streamer_v1 = seq_handler.get_data_streamer(start_timestamp_ns=T1)

# Calling this again automatically CLOSES streamer_v1 and opens a new channel
streamer_v2 = seq_handler.get_data_streamer(start_timestamp_ns=T2)

# Using `streamer_v1` will raise a ValueError
for topic, msg in streamer_v1:  # raises here!
    pass
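The single-active-streamer behavior can be pictured with a small stand-alone model. The classes below are hypothetical stand-ins, not the SDK implementation: they only mirror the documented contract that requesting a new streamer closes the previous one, and that using a closed streamer raises a ValueError.

```python
class DemoStreamer:
    """Hypothetical stand-in for a data streamer."""

    def __init__(self, items):
        self._items = iter(items)
        self.closed = False

    def close(self):
        self.closed = True

    def __iter__(self):
        return self

    def __next__(self):
        if self.closed:
            raise ValueError("streamer has been closed")
        return next(self._items)


class DemoHandler:
    """Hypothetical stand-in for a handler that keeps one active streamer."""

    def __init__(self, items):
        self._items = list(items)
        self._active = None

    def get_data_streamer(self):
        # Close the previously spawned streamer before opening a new one.
        if self._active is not None:
            self._active.close()
        self._active = DemoStreamer(self._items)
        return self._active


handler = DemoHandler([1, 2, 3])
v1 = handler.get_data_streamer()
v2 = handler.get_data_streamer()  # closes v1

try:
    next(v1)
except ValueError as exc:
    print(f"v1 rejected: {exc}")

print(list(v2))  # [1, 2, 3]
```

If two time windows must be read side by side, a practical consequence is to finish consuming the first streamer before requesting the second.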

get_topic_handler

get_topic_handler(topic_name, force_new_instance=False)

Get a specific TopicHandler for a child topic.

Parameters:

Name Type Description Default
topic_name str

The relative name of the topic (e.g., "/camera/front").

required
force_new_instance bool

If True, bypasses the internal cache and recreates the handler.

False

Returns:

Type Description
TopicHandler

A TopicHandler dedicated to the specified topic.

Raises:

Type Description
ValueError

If the topic is not available in this sequence or an internal connection error occurs.
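The effect of `force_new_instance` is easier to see with a toy cache. This sketch assumes a simple dictionary keyed by topic name; the real internal cache is not documented here, and `HandlerCache` is a hypothetical name.

```python
from typing import Callable, Dict


class HandlerCache:
    """Hypothetical cache that reuses one handler object per topic name."""

    def __init__(self, factory: Callable[[str], object]):
        self._factory = factory
        self._cache: Dict[str, object] = {}

    def get(self, topic_name: str, force_new_instance: bool = False) -> object:
        # Recreate the entry when forced, or when it was never created.
        if force_new_instance or topic_name not in self._cache:
            self._cache[topic_name] = self._factory(topic_name)
        return self._cache[topic_name]


cache = HandlerCache(factory=lambda name: {"topic": name})
first = cache.get("/front/imu")
again = cache.get("/front/imu")
fresh = cache.get("/front/imu", force_new_instance=True)

print(first is again)  # True: served from the cache
print(first is fresh)  # False: the cached entry was rebuilt
```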

Example
import sys
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    seq_handler = client.sequence_handler("mission_alpha")
    if seq_handler:
        # Use a Handler to inspect the catalog
        top_handler = seq_handler.get_topic_handler("/front/imu")
        if top_handler:
            print(f"Sequence: {top_handler.sequence_name}")
            print(f"        |Topic: {top_handler.sequence_name}:{top_handler.name}")
            print(f"        |User metadata: {top_handler.user_metadata}")
            print(f"        |Timestamp span: {top_handler.timestamp_ns_min} - {top_handler.timestamp_ns_max}")
            print(f"        |Created {top_handler.created_datetime}")
            print(f"        |Size (MB) {top_handler.total_size_bytes/(1024*1024)}")

        # Once done, close the resources, topic handler and related reading channels (recommended).
        seq_handler.close()

update

update(
    on_error=Report,
    max_batch_size_bytes=None,
    max_batch_size_records=None,
)

Updates the sequence on the platform and returns a SequenceUpdater for ingestion.

Important

The function must be called inside a with context, otherwise a RuntimeError is raised.

Parameters:

Name Type Description Default
on_error SessionLevelErrorPolicy | OnErrorPolicy

Behavior on write failure. Defaults to SessionLevelErrorPolicy.Report.

Deprecated: OnErrorPolicy is deprecated since v0.3.0; use SessionLevelErrorPolicy instead. It will be removed in v0.4.0.

Report
max_batch_size_bytes Optional[int]

Max bytes per Arrow batch.

None
max_batch_size_records Optional[int]

Max records per Arrow batch.

None

Returns:

Name Type Description
SequenceUpdater SequenceUpdater

An initialized updater instance.

Raises:

Type Description
RuntimeError

If the method is called outside a with context.

Exception

If any error occurs during sequence ingestion.

Example
from mosaicolabs import MosaicoClient, SessionLevelErrorPolicy

# Open the connection with the Mosaico Client
with MosaicoClient.connect("localhost", 6726) as client:
    # Get the handler for the sequence
    seq_handler = client.sequence_handler("mission_log_042")
    # Update the sequence
    with seq_handler.update(
        on_error=SessionLevelErrorPolicy.Delete
    ) as seq_updater:
        # Start creating topics and pushing data
        # (1)!
        pass  # Placeholder so the block is valid Python

    # Exiting the block automatically flushes all topic buffers, finalizes the sequence on the server
    # and closes all connections and pools
  1. See also:

reload

reload()

Reloads the handler's data from the server. Use this method when you need to retrieve the latest sequence information, e.g. after a sequence update.

Note

This method does not close active topic handlers or data streamers, and it does not affect ongoing sequence data streams. It is therefore safe to call repeatedly without closing any active resources.

Returns:

Name Type Description
bool bool

True if the reload was successful, False otherwise.

Example
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Use a Handler to inspect the catalog
    seq_handler = client.sequence_handler("mission_alpha")
    if seq_handler:
        # Perform operations, typically updating the sequence on the server
        # ...
        # (1)!

        # Refresh the handler's data from the server
        if not seq_handler.reload():
            print("Failed to reload sequence handler")
  1. See also: SequenceUpdater

close

close()

Gracefully closes all cached topic handlers and active data streamers.

This method should be called to release network and memory resources when the handler is no longer needed.

Example
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Use a Handler to inspect the catalog
    seq_handler = client.sequence_handler("mission_alpha")
    if seq_handler:
        # Perform operations
        # ...

        # Once done, close the resources, topic handler and related reading channels (recommended).
        seq_handler.close()

mosaicolabs.handlers.TopicHandler

TopicHandler(
    *,
    client,
    topic_model,
    ticket,
    timestamp_ns_min,
    timestamp_ns_max,
)

Represents an existing topic on the Mosaico platform.

The TopicHandler provides a client-side interface for interacting with an individual data stream (topic). It allows users to inspect static metadata and system diagnostics (via the Topic model), and access the raw message stream through a dedicated TopicDataStreamer.

Obtaining a Handler

Direct instantiation of this class is discouraged. Use the MosaicoClient.topic_handler() factory method to retrieve an initialized handler.

Internal constructor for TopicHandler.

Do not call this directly. Users should retrieve instances via MosaicoClient.topic_handler(), or by using the get_topic_handler() method of the parent sequence's SequenceHandler instance. Internal modules should use the TopicHandler._connect() factory.

Parameters:

Name Type Description Default
client FlightClient

The active FlightClient for remote operations.

required
topic_model Topic

The underlying metadata and system info model for the topic.

required
ticket Ticket

The remote resource ticket used for data retrieval.

required
timestamp_ns_min Optional[int]

The lowest timestamp (in ns) available in this topic.

required
timestamp_ns_max Optional[int]

The highest timestamp (in ns) available in this topic.

required

name property

name

The relative name of the topic (e.g., "/front_cam/image_raw").

Returns:

Type Description
str

The relative name of the topic.

sequence_name property

sequence_name

The name of the parent sequence containing this topic.

Returns:

Type Description
str

The name of the parent sequence.

user_metadata property

user_metadata

The user-defined metadata dictionary associated with this topic.

Returns:

Type Description
Dict[str, Any]

The user-defined metadata dictionary.

created_timestamp property

created_timestamp

The UTC timestamp indicating when the entity was created on the server.

Returns:

Type Description
int

The UTC timestamp indicating when the entity was created on the server.

locked property

locked

Indicates if the resource is currently locked.

A locked state typically occurs during active writing or maintenance operations, preventing deletion or structural modifications.

Returns:

Type Description
bool

True if the resource is currently locked, False otherwise.

chunks_number property

chunks_number

The number of physical data chunks stored for this topic.

Returns:

Type Description
Optional[int]

The number of physical data chunks stored for this topic, or None if the server did not provide detailed storage statistics.

ontology_tag property

ontology_tag

The ontology type identifier (e.g., 'imu', 'gnss').

This corresponds to the __ontology_tag__ defined in the Serializable class registry.

Returns:

Type Description
str

The ontology type identifier.

serialization_format property

serialization_format

The format used to serialize the topic data (e.g., 'arrow', 'image').

This corresponds to the SerializationFormat enum.

Returns:

Type Description
str

The serialization format.

total_size_bytes property

total_size_bytes

The total physical storage footprint of the entity on the server in bytes.

Returns:

Type Description
int

The total physical storage footprint of the entity on the server in bytes.

timestamp_ns_min property

timestamp_ns_min

The lowest timestamp (nanoseconds) recorded in this topic.

Returns:

Type Description
Optional[int]

The lowest timestamp (nanoseconds) recorded in this topic, or None if the topic is empty or timestamps are unavailable.

timestamp_ns_max property

timestamp_ns_max

The highest timestamp (nanoseconds) recorded in this topic.

Returns:

Type Description
Optional[int]

The highest timestamp (nanoseconds) recorded in this topic, or None if the topic is empty or timestamps are unavailable.

get_data_streamer

get_data_streamer(
    start_timestamp_ns=None, end_timestamp_ns=None
)

Opens a high-performance reading channel for iterating over this topic's data.

Stream Lifecycle Policy: Single-Active-Streamer

To optimize resource utilization and prevent backend socket exhaustion, this handler maintains at most one active stream at a time.

Parameters:

Name Type Description Default
start_timestamp_ns Optional[int]

The inclusive lower bound (t >= start) in nanoseconds. The stream begins at the first message with a timestamp >= this value.

None
end_timestamp_ns Optional[int]

The exclusive upper bound (t < end) in nanoseconds. The stream terminates before reaching any message with a timestamp >= this value.

None

Returns:

Name Type Description
TopicDataStreamer TopicDataStreamer

A chronological iterator for the requested data window.

Raises:

Type Description
ValueError

If the topic contains no data or the handler is in an invalid state.

Example
from mosaicolabs import MosaicoClient, IMU

with MosaicoClient.connect("localhost", 6726) as client:
    # Retrieve the topic handler using (e.g.) MosaicoClient
    top_handler = client.topic_handler("mission_alpha", "/front/imu")
    if top_handler:
        imu_stream = top_handler.get_data_streamer(
            # Optionally set the time window to extract
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000
        )

        # Peek at the start time (without consuming data)
        print(f"Recording starts at: {imu_stream.next_timestamp()}")

        # Direct, low-overhead loop
        for imu_msg in imu_stream:
            process_sample(imu_msg.get_data(IMU)) # Some custom process function

        # Once done, close the reading channel (recommended)
        top_handler.close()
Important

Every call to get_data_streamer() will automatically invoke close() on any previously spawned TopicDataStreamer instance and its associated Apache Arrow Flight channel before initializing the new stream.

Example:

top_handler = client.topic_handler("mission_alpha", "/front/imu")

# Opens first stream
streamer_v1 = top_handler.get_data_streamer(start_timestamp_ns=T1)

# Calling this again automatically CLOSES streamer_v1 and opens a new channel
streamer_v2 = top_handler.get_data_streamer(start_timestamp_ns=T2)

# Using `streamer_v1` will raise a ValueError
for msg in streamer_v1:  # raises here!
    pass

close

close()

Terminates the active data streamer associated with this topic and releases allocated system resources.

In the Mosaico architecture, a TopicHandler acts as a factory for TopicDataStreamers. Calling close() ensures that any background data fetching, buffering, or network sockets held by an active streamer are immediately shut down.

Note
  • If no streamer has been spawned (via get_data_streamer), this method performs no operation and returns safely.
  • Explicitly closing handlers is a best practice when iterating through large datasets to prevent resource accumulation.
Example
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Access a specific sensor topic (e.g., IMU)
    top_handler = client.topic_handler("mission_alpha", "/front/imu")

    if top_handler:
        # Initialize a high-performance data stream
        imu_stream = top_handler.get_data_streamer(
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000
        )

        # Consume data for ML training or analysis
        # for msg in imu_stream: ...

        # Release the streaming channel and backend resources
        top_handler.close()

mosaicolabs.handlers.SequenceDataStreamer

SequenceDataStreamer(
    *, sequence_name, client, topic_readers
)

A unified, time-ordered iterator for reading multi-topic sequences.

The SequenceDataStreamer performs a K-Way Merge across multiple topic streams to provide a single, coherent chronological view of an entire sequence. This is essential when topics have different recording rates or asynchronous sampling times.

Key Capabilities
  • Temporal Slicing: Supports server-side filtering to stream data within specific time windows (t >= start and t < end).
  • Peek-Ahead: Provides the next_timestamp() method, allowing the system to inspect chronological order without consuming the record—a core requirement for the K-way merge sorting algorithm.
The Merge Algorithm

This class manages multiple internal TopicDataStreamer instances. On every iteration, it:

  1. Peeks at the next available timestamp from every active topic stream.
  2. Selects the topic currently holding the lowest absolute timestamp.
  3. Yields that specific record and advances only the "winning" topic stream.
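The three steps above can be sketched in plain Python. This is an illustration under assumptions, not the SDK's code: records are modeled as hypothetical `(timestamp_ns, payload)` tuples, each topic stream is assumed to be already sorted by timestamp, and a heap replaces the linear "peek at every stream" scan while yielding the same ordering.

```python
import heapq
from typing import Dict, Iterable, Iterator, Tuple

Record = Tuple[int, str]  # hypothetical (timestamp_ns, payload) pair


def merge_by_timestamp(streams: Dict[str, Iterable[Record]]) -> Iterator[Tuple[str, int, str]]:
    """Yield (topic, timestamp_ns, payload) in chronological order."""
    iterators = {topic: iter(records) for topic, records in streams.items()}

    # Seed the heap with the first record of every non-empty stream.
    heap = []
    for topic, it in iterators.items():
        first = next(it, None)
        if first is not None:
            heap.append((first[0], topic, first[1]))
    heapq.heapify(heap)

    while heap:
        # The heap root is the lowest timestamp (ties broken by topic name).
        ts, topic, payload = heapq.heappop(heap)
        yield topic, ts, payload
        # Advance only the stream that just produced a record.
        following = next(iterators[topic], None)
        if following is not None:
            heapq.heappush(heap, (following[0], topic, following[1]))


streams = {
    "/gps": [(10, "g0"), (40, "g1")],
    "/imu": [(5, "i0"), (10, "i1"), (30, "i2")],
}
merged = list(merge_by_timestamp(streams))
print([(topic, ts) for topic, ts, _ in merged])
# [('/imu', 5), ('/gps', 10), ('/imu', 10), ('/imu', 30), ('/gps', 40)]
```

The heap holds at most one entry per topic, so each step costs O(log K) for K topics instead of a scan over all of them.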
Obtaining a Streamer

Do not instantiate this class directly. Use the SequenceHandler.get_data_streamer() method to obtain a configured instance.

Internal constructor for SequenceDataStreamer.

Do not call this directly. Internal library modules should use the SequenceDataStreamer._connect() factory.

Example
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Use a Handler to inspect the catalog
    seq_handler = client.sequence_handler("mission_alpha")
    if seq_handler:
        # Start a Unified Stream (K-Way Merge) for multi-sensor replay
        streamer = seq_handler.get_data_streamer(
            topics=["/gps", "/imu"], # Optionally filter topics
            # Optionally set the time window to extract
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000
        )

        # Start timed data-stream
        for topic, msg in streamer:
            pass  # Do some processing...

        # Once done, close the resources, topic handler and related reading channels (recommended).
        seq_handler.close()

Parameters:

Name Type Description Default
sequence_name str

The name of the sequence being streamed.

required
client FlightClient

The active FlightClient for remote operations.

required
topic_readers Dict[str, TopicDataStreamer]

A dictionary mapping topic names to their respective TopicDataStreamer instances.

required

next_timestamp

next_timestamp()

Peeks at the timestamp of the next chronological measurement without consuming the record.

Returns:

Type Description
Optional[int]

The minimum timestamp (nanoseconds) found across all active topics, or None if all streams are exhausted.

Example
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Use a Handler to inspect the catalog
    seq_handler = client.sequence_handler("mission_alpha")
    if seq_handler:
        # Start a Unified Stream (K-Way Merge) for multi-sensor replay
        streamer = seq_handler.get_data_streamer(
            topics=["/gps", "/imu"], # Optionally filter topics
            # Optionally set the time window to extract
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000
        )

        # Peek at the start time (without consuming data)
        print(f"Recording starts at: {streamer.next_timestamp()}")

        # Do some processing...

        # Once done, close the resources, topic handler and related reading channels (recommended).
        seq_handler.close()

close

close()

Gracefully terminates all underlying topic streams and releases allocated resources.

This method iterates through all active TopicDataStreamer instances, ensuring that each remote connection is closed and local memory buffers are cleared.

Automatic Cleanup

In standard workflows, you do not need to call this manually. This function is automatically invoked by the SequenceHandler.close() method, which in turn is triggered by the __exit__ logic of the parent SequenceHandler when used within a with context.

mosaicolabs.handlers.TopicDataStreamer

TopicDataStreamer(*, client, state)

An iterator that streams ontology records from a single topic.

The TopicDataStreamer wraps a PyArrow Flight DoGet stream to fetch RecordBatches from the server and reconstruct individual Message objects. It is designed for efficient row-by-row iteration while providing peek-ahead capabilities for time-synchronized merging.

Key Capabilities
  • Temporal Slicing: Supports server-side filtering to stream data within specific time windows (t >= start and t < end).
  • Peek-Ahead: Provides the next_timestamp() method, allowing the system to inspect chronological order without consuming the record—a core requirement for the K-way merge sorting performed by the SequenceDataStreamer.
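The peek-ahead capability can be modeled with a one-record buffer in front of an ordinary iterator. The class below is a hypothetical sketch, not the actual TopicDataStreamer; records are again assumed to be `(timestamp_ns, payload)` tuples.

```python
from typing import Iterable, Optional, Tuple

Record = Tuple[int, str]  # hypothetical (timestamp_ns, payload) pair


class PeekableStream:
    """Iterator wrapper that can report the next timestamp without consuming it."""

    def __init__(self, records: Iterable[Record]):
        self._it = iter(records)
        self._buffered: Optional[Record] = None

    def _fill(self) -> None:
        # Pull one record into the buffer if it is currently empty.
        if self._buffered is None:
            self._buffered = next(self._it, None)

    def next_timestamp(self) -> Optional[int]:
        self._fill()
        return None if self._buffered is None else self._buffered[0]

    def __iter__(self) -> "PeekableStream":
        return self

    def __next__(self) -> Record:
        self._fill()
        if self._buffered is None:
            raise StopIteration
        record, self._buffered = self._buffered, None
        return record


stream = PeekableStream([(100, "a"), (200, "b")])
print(stream.next_timestamp())  # 100
print(stream.next_timestamp())  # 100 (peeking does not consume)
print(next(stream))             # (100, 'a')
print(list(stream))             # [(200, 'b')]
print(stream.next_timestamp())  # None (stream exhausted)
```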
Obtaining a Streamer

Users should typically not instantiate this class directly. The recommended way to obtain a streamer is via the TopicHandler.get_data_streamer() method.

Internal constructor for TopicDataStreamer.

Do not call this directly. Internal library modules should use the TopicDataStreamer._connect() or TopicDataStreamer._connect_from_ticket() factory methods instead.

Example
from mosaicolabs import MosaicoClient, IMU

with MosaicoClient.connect("localhost", 6726) as client:
    # Retrieve the topic handler using (e.g.) MosaicoClient
    top_handler = client.topic_handler("mission_alpha", "/front/imu")
    if top_handler:
        imu_stream = top_handler.get_data_streamer(
            # Optionally set the time window to extract
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000
        )

        # Peek at the start time (without consuming data)
        print(f"Recording starts at: {imu_stream.next_timestamp()}")

        # Direct, low-overhead loop
        for imu_msg in imu_stream:
            pass  # Do some processing...

        # Once done, close the reading channel (recommended)
        top_handler.close()

Parameters:

Name Type Description Default
client FlightClient

The active FlightClient used for remote operations.

required
state _TopicReadState

The internal state object managing the Arrow reader and peek buffers.

required

ontology_tag property

ontology_tag

The ontology tag associated with this streamer.

Returns:

Type Description
str

The ontology tag.

name

name()

The name of the topic associated with this streamer.

Returns:

Type Description
str

The name of the topic.

next_timestamp

next_timestamp()

Peeks at the timestamp of the next record without consuming it.

Returns:

Type Description
Optional[int]

The next timestamp in nanoseconds, or None if the stream is empty.

Raises:

Type Description
ValueError

If the data streamer instance has been closed.

Example
from mosaicolabs import MosaicoClient, IMU

with MosaicoClient.connect("localhost", 6726) as client:
    # Retrieve the topic handler using (e.g.) MosaicoClient
    top_handler = client.topic_handler("mission_alpha", "/front/imu")
    if top_handler:
        imu_stream = top_handler.get_data_streamer(
            # Optionally set the time window to extract
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000
        )

        # Peek at the start time (without consuming data)
        print(f"Recording starts at: {imu_stream.next_timestamp()}")

        # Do some processing...

        # Once done, close the reading channel (recommended)
        top_handler.close()

close

close()

Gracefully terminates the underlying Apache Arrow Flight stream and releases buffers.

Automatic Lifecycle Management

In most production workflows, manual invocation is not required. This method is automatically called by the parent TopicHandler.close(). If the handler is managed within a with context, the SDK ensures a top-down cleanup of the handler and its associated streamers upon exit.

Example
# Manual resource management (if not using 'with' block)
streamer = topic_handler.get_data_streamer()
try:
    for meas in streamer:
        process_robot_data(meas)
finally:
    streamer.close()
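As an alternative to the explicit try/finally above, the standard library's `contextlib.closing` calls `close()` on exit for any object that exposes that method. The sketch below uses a hypothetical stand-in class so that it runs without a server; whether the real streamer also works as a context manager on its own is not stated here, so only the documented `close()` method is relied on.

```python
from contextlib import closing


class DemoStreamer:
    """Hypothetical stand-in exposing iteration and close()."""

    def __init__(self, items):
        self._items = list(items)
        self.closed = False

    def __iter__(self):
        return iter(self._items)

    def close(self):
        self.closed = True


streamer = DemoStreamer([1, 2, 3])
with closing(streamer) as active:
    total = sum(active)  # consume the stream inside the managed block

print(total, streamer.closed)  # 6 True
```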