
Data Retrieval

mosaicolabs.handlers.system_info.SystemInfo dataclass

SystemInfo(
    total_size_bytes,
    created_datetime,
    is_locked,
    chunks_number=None,
)

Metadata and structural information for a Mosaico Sequence or Topic resource.

This Data Transfer Object summarizes the physical and logical state of a sequence or topic on the server, typically retrieved via a system-info action.

Attributes:

Name Type Description
total_size_bytes int

The aggregate size of all data chunks in bytes.

created_datetime datetime

The UTC timestamp of when the resource was first initialized.

is_locked bool

Indicates if the resource is currently read-only. Usually true if an upload is finalized or a retention policy is active.

chunks_number Optional[int]

The total count of data partitions (chunks) stored on the server. Defaults to None if not applicable.
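Tests that run without a Mosaico server can use a local stand-in with the same four fields. The sketch below is a hypothetical mirror of SystemInfo, not the SDK class. The helper `average_chunk_bytes` is an illustrative addition showing one way to handle the optional `chunks_number`.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class SystemInfoStub:
    """Hypothetical local stand-in mirroring the documented SystemInfo fields."""

    total_size_bytes: int
    created_datetime: datetime
    is_locked: bool
    chunks_number: Optional[int] = None


def average_chunk_bytes(info: SystemInfoStub) -> Optional[float]:
    """Return the mean chunk size, or None when chunk statistics are unavailable."""
    if not info.chunks_number:  # None or 0: no meaningful average
        return None
    return info.total_size_bytes / info.chunks_number


info = SystemInfoStub(
    total_size_bytes=8 * 1024 * 1024,
    created_datetime=datetime(2025, 2, 2, 15, 6, 18, tzinfo=timezone.utc),
    is_locked=True,
    chunks_number=4,
)
print(average_chunk_bytes(info))  # 2097152.0
```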

mosaicolabs.handlers.SequenceHandler

SequenceHandler(
    *,
    sequence_model,
    client,
    timestamp_ns_min,
    timestamp_ns_max,
)

Represents a client-side handle for an existing Sequence on the Mosaico platform.

The SequenceHandler acts as a primary container for inspecting sequence-level metadata, listing available topics, and accessing data reading interfaces like the SequenceDataStreamer.

Obtaining a Handler

Users should not instantiate this class directly. The recommended way to obtain a handler is via the MosaicoClient.sequence_handler() factory method.

Internal constructor for SequenceHandler.

Do not call this directly. Users should retrieve instances via MosaicoClient.sequence_handler(), while internal modules should use the SequenceHandler._connect() factory.

Parameters:

Name Type Description Default
sequence_model Sequence

The underlying metadata and system info model for the sequence.

required
client FlightClient

The active FlightClient for remote operations.

required
timestamp_ns_min Optional[int]

The lowest timestamp (in ns) available in this sequence.

required
timestamp_ns_max Optional[int]

The highest timestamp (in ns) available in this sequence.

required

name property

name

The unique name of the sequence.

Returns:

Type Description
str

The unique name of the sequence.

topics property

topics

The list of topic names (data channels) available within this sequence.

Returns:

Type Description
List[str]

The list of topic names (data channels) available within this sequence.

user_metadata property

user_metadata

The user-defined metadata dictionary associated with this sequence.

Returns:

Type Description
Dict[str, Any]

The user-defined metadata dictionary associated with this sequence.

created_datetime property

created_datetime

The UTC timestamp indicating when the entity was created on the server.

Returns:

Type Description
datetime

The UTC timestamp indicating when the entity was created on the server.

is_locked property

is_locked

Indicates if the resource is currently locked.

A locked state typically occurs during active writing or maintenance operations, preventing deletion or structural modifications.

Returns:

Type Description
bool

The lock status of the sequence.

total_size_bytes property

total_size_bytes

The total physical storage footprint of the entity on the server in bytes.

Returns:

Type Description
int

The total physical storage footprint of the entity on the server in bytes.

timestamp_ns_min property

timestamp_ns_min

The lowest timestamp (nanoseconds) recorded in the sequence across all topics.

Returns:

Type Description
Optional[int]

The lowest timestamp (nanoseconds) recorded in the sequence across all topics, or None if the sequence contains no data or the timestamps could not be derived.

timestamp_ns_max property

timestamp_ns_max

The highest timestamp (nanoseconds) recorded in the sequence across all topics.

Returns:

Type Description
Optional[int]

The highest timestamp (nanoseconds) recorded in the sequence across all topics, or None if the sequence contains no data or the timestamps could not be derived.
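The timestamp bounds are epoch nanoseconds and may be None, so callers usually need a conversion and a None-aware duration. The helpers below are illustrative and not part of the SDK. They use integer arithmetic so that no precision is lost to floats, and they assume the timestamps are relative to the Unix epoch in UTC. For example, you might call `span_seconds(seq_handler.timestamp_ns_min, seq_handler.timestamp_ns_max)`.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ns_to_datetime(ts_ns: int) -> datetime:
    """Convert epoch nanoseconds to an aware UTC datetime (truncated to microseconds)."""
    return EPOCH + timedelta(microseconds=ts_ns // 1000)


def datetime_to_ns(dt: datetime) -> int:
    """Convert an aware datetime to epoch nanoseconds using exact integer arithmetic."""
    return (dt - EPOCH) // timedelta(microseconds=1) * 1000


def span_seconds(ts_min: Optional[int], ts_max: Optional[int]) -> Optional[float]:
    """Duration covered by [ts_min, ts_max] in seconds, or None if either bound is missing."""
    if ts_min is None or ts_max is None:
        return None
    return (ts_max - ts_min) / 1e9


print(ns_to_datetime(1738508778000000000))  # 2025-02-02 15:06:18+00:00
print(span_seconds(1738508778000000000, 1738509618000000000))  # 840.0
```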

get_data_streamer

get_data_streamer(
    topics=[],
    start_timestamp_ns=None,
    end_timestamp_ns=None,
)

Opens a reading channel for iterating over the sequence data.

The returned SequenceDataStreamer performs a K-way merge sort to provide a single, time-synchronized chronological stream of messages from multiple topics.

Parameters:

Name Type Description Default
topics List[str]

A subset of topic names to stream. If empty, all topics in the sequence are streamed.

[]
start_timestamp_ns Optional[int]

The inclusive lower bound (t >= start) for the time window in nanoseconds. The stream starts at the first message with a timestamp greater than or equal to this value.

None
end_timestamp_ns Optional[int]

The exclusive upper bound (t < end) for the time window in nanoseconds. The stream stops before the first message with a timestamp greater than or equal to this value.

None
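Because the start bound is inclusive and the end bound is exclusive, two consecutive windows that share a boundary never return the same message twice. The client-side sketch below only illustrates these semantics, since the real filtering is done on the server. The `(timestamp_ns, payload)` tuples are a simplified stand-in for Mosaico messages.

```python
from typing import Iterable, List, Optional, Tuple

Record = Tuple[int, str]  # (timestamp_ns, payload): illustrative record shape


def in_window(ts: int, start: Optional[int], end: Optional[int]) -> bool:
    """Half-open test start <= ts < end, where None means 'unbounded'."""
    if start is not None and ts < start:
        return False
    if end is not None and ts >= end:
        return False
    return True


def slice_records(
    records: Iterable[Record], start: Optional[int], end: Optional[int]
) -> List[Record]:
    """Keep only the records whose timestamp falls inside the window."""
    return [r for r in records if in_window(r[0], start, end)]


records = [(10, "a"), (20, "b"), (30, "c"), (40, "d")]
first = slice_records(records, 10, 30)     # [(10, 'a'), (20, 'b')]
second = slice_records(records, 30, None)  # [(30, 'c'), (40, 'd')]
# Adjacent half-open windows partition the data: nothing is lost or duplicated.
assert first + second == records
```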

Returns:

Type Description
SequenceDataStreamer

A SequenceDataStreamer iterator yielding (topic_name, message) tuples.

Raises:

Type Description
ValueError

If the provided topic names do not exist or if the sequence contains no data.
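To fail fast before opening a stream, a caller can check the requested names against `SequenceHandler.topics`. The hypothetical helper `resolve_topics` below mirrors the documented rules: an empty list selects every topic, and unknown names raise ValueError. It is not part of the SDK. A typical call would be `resolve_topics(["/gps", "/imu"], seq_handler.topics)`.

```python
from typing import List, Sequence


def resolve_topics(requested: Sequence[str], available: Sequence[str]) -> List[str]:
    """Return the topics to stream; an empty request selects all available topics."""
    if not requested:
        return list(available)
    known = set(available)
    missing = [t for t in requested if t not in known]
    if missing:
        raise ValueError(f"Unknown topics: {missing}")
    return list(requested)


available = ["/gps", "/imu", "/front/camera"]
print(resolve_topics([], available))  # ['/gps', '/imu', '/front/camera']

try:
    resolve_topics(["/lidar"], available)
except ValueError as exc:
    error_message = str(exc)
print(error_message)  # Unknown topics: ['/lidar']
```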

Example
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Use a Handler to inspect the catalog
    seq_handler = client.sequence_handler("mission_alpha")
    if seq_handler:
        # Start a Unified Stream (K-Way Merge) for multi-sensor replay
        streamer = seq_handler.get_data_streamer(
            topics=["/gps", "/imu"], # Optionally filter topics
            # Optionally set the time window to extract
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000
        )

        # Peek at the start time (without consuming data)
        print(f"Recording starts at: {streamer.next_timestamp()}")

        # Start timed data-stream
        for topic, msg in streamer:
            print(f"[{topic}] at {msg.timestamp_ns}: {type(msg.data).__name__}")

        # Once done, close the resources, topic handler and related reading channels (recommended).
        seq_handler.close()
Important

Every call to get_data_streamer() will automatically invoke close() on any previously spawned SequenceDataStreamer instance and its associated Apache Arrow Flight channels before initializing the new stream.

Example:

# Assumes `client` is an open MosaicoClient (see the example above)
seq_handler = client.sequence_handler("mission_alpha")
T1, T2 = 1738508778000000000, 1738509000000000000  # example start timestamps (ns)

# Opens first stream
streamer_v1 = seq_handler.get_data_streamer(start_timestamp_ns=T1)

# Calling this again automatically CLOSES streamer_v1 and opens a new channel
streamer_v2 = seq_handler.get_data_streamer(start_timestamp_ns=T2)

# Using `streamer_v1` will raise a ValueError
for topic, msg in streamer_v1:  # raises here!
    pass

get_topic_handler

get_topic_handler(topic_name, force_new_instance=False)

Get a specific TopicHandler for a child topic.

Parameters:

Name Type Description Default
topic_name str

The relative name of the topic (e.g., "/camera/front").

required
force_new_instance bool

If True, bypasses the internal cache and recreates the handler.

False

Returns:

Type Description
TopicHandler

A TopicHandler dedicated to the specified topic.

Raises:

Type Description
ValueError

If the topic is not available in this sequence or an internal connection error occurs.
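The `force_new_instance` flag implies a per-sequence cache of topic handlers keyed by topic name. The sketch below uses hypothetical stub classes to show that caching pattern and is not the SDK implementation. Closing the stale handler before replacing it is a design choice made for this sketch only.

```python
from typing import Dict, List


class StubTopicHandler:
    """Hypothetical stand-in for a TopicHandler; only tracks its name and state."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.closed = False

    def close(self) -> None:
        self.closed = True


class StubSequenceHandler:
    """Sketch of a topic-handler cache keyed by topic name."""

    def __init__(self, topics: List[str]) -> None:
        self._topics = set(topics)
        self._cache: Dict[str, StubTopicHandler] = {}

    def get_topic_handler(
        self, topic_name: str, force_new_instance: bool = False
    ) -> StubTopicHandler:
        if topic_name not in self._topics:
            raise ValueError(f"Topic {topic_name!r} is not in this sequence")
        cached = self._cache.get(topic_name)
        if cached is not None and not force_new_instance:
            return cached  # reuse the existing handler
        if cached is not None:
            cached.close()  # sketch assumption: release the stale handler first
        handler = StubTopicHandler(topic_name)
        self._cache[topic_name] = handler
        return handler


seq = StubSequenceHandler(["/front/imu", "/gps"])
a = seq.get_topic_handler("/front/imu")
b = seq.get_topic_handler("/front/imu")
c = seq.get_topic_handler("/front/imu", force_new_instance=True)
print(a is b, a is c, a.closed)  # True False True
```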

Example
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    seq_handler = client.sequence_handler("mission_alpha")
    if seq_handler:
        # Use a Handler to inspect the catalog
        top_handler = seq_handler.get_topic_handler("/front/imu")
        if top_handler:
            print(f"Sequence: {top_handler.sequence_name}")
            print(f"        |Topic: {top_handler.sequence_name}:{top_handler.name}")
            print(f"        |User metadata: {top_handler.user_metadata}")
            print(f"        |Timestamp span: {top_handler.timestamp_ns_min} - {top_handler.timestamp_ns_max}")
            print(f"        |Created {top_handler.created_datetime}")
            print(f"        |Size (MB) {top_handler.total_size_bytes/(1024*1024)}")

        # Once done, close the resources, topic handler and related reading channels (recommended).
        seq_handler.close()

close

close()

Gracefully closes all cached topic handlers and active data streamers.

This method should be called to release network and memory resources when the handler is no longer needed.

Example
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Use a Handler to inspect the catalog
    seq_handler = client.sequence_handler("mission_alpha")
    if seq_handler:
        # Perform operations
        # ...

        # Once done, close the resources, topic handler and related reading channels (recommended).
        seq_handler.close()

mosaicolabs.handlers.TopicHandler

TopicHandler(
    *,
    client,
    topic_model,
    ticket,
    timestamp_ns_min,
    timestamp_ns_max,
)

Represents an existing topic on the Mosaico platform.

The TopicHandler provides a client-side interface for interacting with an individual data stream (topic). It allows users to inspect static metadata and system diagnostics (via the Topic model), and access the raw message stream through a dedicated TopicDataStreamer.

Obtaining a Handler

Direct instantiation of this class is discouraged. Use the MosaicoClient.topic_handler() factory method to retrieve an initialized handler.

Internal constructor for TopicHandler.

Do not call this directly. Users should retrieve instances via MosaicoClient.topic_handler(), or via the get_topic_handler() method of the parent sequence's SequenceHandler. Internal modules should use the TopicHandler._connect() factory.

Parameters:

Name Type Description Default
client FlightClient

The active FlightClient for remote operations.

required
topic_model Topic

The underlying metadata and system info model for the topic.

required
ticket Ticket

The remote resource ticket used for data retrieval.

required
timestamp_ns_min Optional[int]

The lowest timestamp (in ns) available in this topic.

required
timestamp_ns_max Optional[int]

The highest timestamp (in ns) available in this topic.

required

name property

name

The relative name of the topic (e.g., "/front_cam/image_raw").

Returns:

Type Description
str

The relative name of the topic.

sequence_name property

sequence_name

The name of the parent sequence containing this topic.

Returns:

Type Description
str

The name of the parent sequence.

user_metadata property

user_metadata

The user-defined metadata dictionary associated with this topic.

Returns:

Type Description
Dict[str, Any]

The user-defined metadata dictionary.

created_datetime property

created_datetime

The UTC timestamp indicating when the entity was created on the server.

Returns:

Type Description
datetime

The UTC timestamp indicating when the entity was created on the server.

is_locked property

is_locked

Indicates if the resource is currently locked.

A locked state typically occurs during active writing or maintenance operations, preventing deletion or structural modifications.

Returns:

Type Description
bool

True if the resource is currently locked, False otherwise.

chunks_number property

chunks_number

The number of physical data chunks stored for this topic.

Returns:

Type Description
Optional[int]

The number of physical data chunks stored for this topic, or None if the server did not provide detailed storage statistics.

ontology_tag property

ontology_tag

The ontology type identifier (e.g., 'imu', 'gnss').

This corresponds to the __ontology_tag__ defined in the Serializable class registry.

Returns:

Type Description
str

The ontology type identifier.

serialization_format property

serialization_format

The format used to serialize the topic data (e.g., 'arrow', 'image').

This corresponds to the SerializationFormat enum.

Returns:

Type Description
str

The serialization format.

total_size_bytes property

total_size_bytes

The total physical storage footprint of the entity on the server in bytes.

Returns:

Type Description
int

The total physical storage footprint of the entity on the server in bytes.

timestamp_ns_min property

timestamp_ns_min

The lowest timestamp (nanoseconds) recorded in this topic.

Returns:

Type Description
Optional[int]

The lowest timestamp (nanoseconds) recorded in this topic, or None if the topic is empty or timestamps are unavailable.

timestamp_ns_max property

timestamp_ns_max

The highest timestamp (nanoseconds) recorded in this topic.

Returns:

Type Description
Optional[int]

The highest timestamp (nanoseconds) recorded in this topic, or None if the topic is empty or timestamps are unavailable.

get_data_streamer

get_data_streamer(
    start_timestamp_ns=None, end_timestamp_ns=None
)

Opens a high-performance reading channel for iterating over this topic's data.

Stream Lifecycle Policy: Single-Active-Streamer

To optimize resource utilization and prevent backend socket exhaustion, this handler maintains at most one active stream at a time.

Parameters:

Name Type Description Default
start_timestamp_ns Optional[int]

The inclusive lower bound (t >= start) in nanoseconds. The stream begins at the first message with a timestamp >= this value.

None
end_timestamp_ns Optional[int]

The exclusive upper bound (t < end) in nanoseconds. The stream terminates before reaching any message with a timestamp >= this value.

None

Returns:

Type Description
TopicDataStreamer

A chronological iterator for the requested data window.

Raises:

Type Description
ValueError

If the topic contains no data or the handler is in an invalid state.

Example
from mosaicolabs import MosaicoClient, IMU

with MosaicoClient.connect("localhost", 6726) as client:
    # Retrieve the topic handler using (e.g.) MosaicoClient
    top_handler = client.topic_handler("mission_alpha", "/front/imu")
    if top_handler:
        imu_stream = top_handler.get_data_streamer(
            # Optionally set the time window to extract
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000
        )

        # Peek at the start time (without consuming data)
        print(f"Recording starts at: {imu_stream.next_timestamp()}")

        # Direct, low-overhead loop
        for imu_msg in imu_stream:
            process_sample(imu_msg.get_data(IMU)) # Some custom process function

        # Once done, close the reading channel (recommended)
        top_handler.close()
Important

Every call to get_data_streamer() will automatically invoke close() on any previously spawned TopicDataStreamer instance and its associated Apache Arrow Flight channel before initializing the new stream.

Example:

# Assumes `client` is an open MosaicoClient (see the example above)
top_handler = client.topic_handler("mission_alpha", "/front/imu")
T1, T2 = 1738508778000000000, 1738509000000000000  # example start timestamps (ns)

# Opens first stream
streamer_v1 = top_handler.get_data_streamer(start_timestamp_ns=T1)

# Calling this again automatically CLOSES streamer_v1 and opens a new channel
streamer_v2 = top_handler.get_data_streamer(start_timestamp_ns=T2)

# Using `streamer_v1` will raise a ValueError
for msg in streamer_v1:  # raises here!
    pass
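The Single-Active-Streamer policy amounts to "close the previous streamer before opening a new one", with `close()` being a no-op when nothing is open. The toy classes below model that behaviour over an in-memory list. They are not the SDK implementation, and the filter on `start` stands in for the server-side time window.

```python
from typing import Iterator, List, Optional


class ToyStreamer:
    """Hypothetical streamer over an in-memory list of message timestamps."""

    def __init__(self, messages: List[int]) -> None:
        self._it: Iterator[int] = iter(messages)
        self.closed = False

    def __iter__(self) -> "ToyStreamer":
        return self

    def __next__(self) -> int:
        if self.closed:
            raise ValueError("streamer has been closed")
        return next(self._it)

    def close(self) -> None:
        self.closed = True


class ToyTopicHandler:
    """Sketch of the Single-Active-Streamer policy."""

    def __init__(self, messages: List[int]) -> None:
        self._messages = messages
        self._active: Optional[ToyStreamer] = None

    def get_data_streamer(self, start: int = 0) -> ToyStreamer:
        self.close()  # any previous streamer is closed before a new one opens
        self._active = ToyStreamer([m for m in self._messages if m >= start])
        return self._active

    def close(self) -> None:
        if self._active is not None:  # no-op when nothing was spawned
            self._active.close()
            self._active = None


handler = ToyTopicHandler([10, 20, 30])
handler.close()  # safe: no streamer spawned yet
v1 = handler.get_data_streamer(start=10)
v2 = handler.get_data_streamer(start=20)
print(list(v2))  # [20, 30]
try:
    next(v1)
except ValueError as exc:
    print(exc)  # streamer has been closed
```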

close

close()

Terminates the active data streamer associated with this topic and releases allocated system resources.

In the Mosaico architecture, a TopicHandler acts as a factory for TopicDataStreamers. Calling close() ensures that any background data fetching, buffering, or network sockets held by an active streamer are immediately shut down.

Note
  • If no streamer has been spawned (via get_data_streamer), this method performs no operation and returns safely.
  • Explicitly closing handlers is a best practice when iterating through large datasets to prevent resource accumulation.
Example
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Access a specific sensor topic (e.g., IMU)
    top_handler = client.topic_handler("mission_alpha", "/front/imu")

    if top_handler:
        # Initialize a high-performance data stream
        imu_stream = top_handler.get_data_streamer(
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000
        )

        # Consume data for ML training or analysis
        # for msg in imu_stream: ...

        # Release the streaming channel and backend resources
        top_handler.close()

mosaicolabs.handlers.SequenceDataStreamer

SequenceDataStreamer(
    *, sequence_name, client, topic_readers
)

A unified, time-ordered iterator for reading multi-topic sequences.

The SequenceDataStreamer performs a K-Way Merge across multiple topic streams to provide a single, coherent chronological view of an entire sequence. This is essential when topics have different recording rates or asynchronous sampling times.

Key Capabilities
  • Temporal Slicing: Supports server-side filtering to stream data within specific time windows (t >= start and t < end).
  • Peek-Ahead: Provides the next_timestamp() method, allowing the system to inspect chronological order without consuming the record—a core requirement for the K-way merge sorting algorithm.
The Merge Algorithm

This class manages multiple internal TopicDataStreamer instances. On every iteration, it:

  1. Peeks at the next available timestamp from every active topic stream.
  2. Selects the topic currently holding the lowest absolute timestamp.
  3. Yields that specific record and advances only the "winning" topic stream.
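The three steps above can be sketched in plain Python over in-memory, time-sorted lists. `PeekableStream` and `merge_streams` are illustrative names, and the `(timestamp_ns, payload)` tuples stand in for real messages. This version does a linear scan over the K streams per message, which matches the steps as described. With many topics, a binary heap (for example Python's `heapq`) would reduce the selection cost to O(log K).

```python
from typing import Dict, Iterator, List, Optional, Tuple

Message = Tuple[int, str]  # (timestamp_ns, payload): simplified stand-in


class PeekableStream:
    """Minimal peekable stream over a time-sorted list of messages."""

    def __init__(self, messages: List[Message]) -> None:
        self._messages = messages
        self._pos = 0

    def next_timestamp(self) -> Optional[int]:
        if self._pos >= len(self._messages):
            return None
        return self._messages[self._pos][0]

    def pop(self) -> Message:
        msg = self._messages[self._pos]
        self._pos += 1
        return msg


def merge_streams(streams: Dict[str, PeekableStream]) -> Iterator[Tuple[str, Message]]:
    """Yield (topic, message) pairs in global timestamp order."""
    while True:
        # 1. Peek at the next timestamp of every stream that still has data.
        heads = {t: s.next_timestamp() for t, s in streams.items()}
        active = {t: ts for t, ts in heads.items() if ts is not None}
        if not active:
            return
        # 2. Pick the topic holding the lowest timestamp (ties: first topic wins).
        winner = min(active, key=active.__getitem__)
        # 3. Yield its record and advance only that stream.
        yield winner, streams[winner].pop()


streams = {
    "/gps": PeekableStream([(0, "g0"), (100, "g1")]),
    "/imu": PeekableStream([(0, "i0"), (50, "i1"), (150, "i2")]),
}
print([topic for topic, _ in merge_streams(streams)])
# ['/gps', '/imu', '/imu', '/gps', '/imu']
```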
Obtaining a Streamer

Do not instantiate this class directly. Use the SequenceHandler.get_data_streamer() method to obtain a configured instance.

Internal constructor for SequenceDataStreamer.

Do not call this directly. Internal library modules should use the SequenceDataStreamer._connect() factory.

Example
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Use a Handler to inspect the catalog
    seq_handler = client.sequence_handler("mission_alpha")
    if seq_handler:
        # Start a Unified Stream (K-Way Merge) for multi-sensor replay
        streamer = seq_handler.get_data_streamer(
            topics=["/gps", "/imu"], # Optionally filter topics
            # Optionally set the time window to extract
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000
        )

        # Start timed data-stream
        for topic, msg in streamer:
            pass  # Do some processing...

        # Once done, close the resources, topic handler and related reading channels (recommended).
        seq_handler.close()

Parameters:

Name Type Description Default
sequence_name str

The name of the sequence being streamed.

required
client FlightClient

The active FlightClient for remote operations.

required
topic_readers Dict[str, TopicDataStreamer]

A dictionary mapping topic names to their respective TopicDataStreamer instances.

required

next_timestamp

next_timestamp()

Peeks at the timestamp of the next chronological measurement without consuming the record.

Returns:

Type Description
Optional[int]

The minimum timestamp (nanoseconds) found across all active topics, or None if all streams are exhausted.

Example
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Use a Handler to inspect the catalog
    seq_handler = client.sequence_handler("mission_alpha")
    if seq_handler:
        # Start a Unified Stream (K-Way Merge) for multi-sensor replay
        streamer = seq_handler.get_data_streamer(
            topics=["/gps", "/imu"], # Optionally filter topics
            # Optionally set the time window to extract
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000
        )

        # Peek at the start time (without consuming data)
        print(f"Recording starts at: {streamer.next_timestamp()}")

        # Do some processing...

        # Once done, close the resources, topic handler and related reading channels (recommended).
        seq_handler.close()

close

close()

Gracefully terminates all underlying topic streams and releases allocated resources.

This method iterates through all active TopicDataStreamer instances, ensuring that each remote connection is closed and local memory buffers are cleared.

Automatic Cleanup

In standard workflows, you do not need to call this manually. This function is automatically invoked by the SequenceHandler.close() method, which in turn is triggered by the __exit__ logic of the parent SequenceHandler when used within a with context.
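The top-down cleanup chain (`__exit__` calls the handler's `close()`, which closes the streamer, which closes each topic reader) can be modelled with a few toy classes. The sketch below is hypothetical and shows only the cascade. It is not the SDK code, and it models only the streamer part of SequenceHandler.close(), not the cached topic handlers.

```python
from typing import Dict, List


class ToyTopicReader:
    """Hypothetical per-topic reader that records whether it was closed."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.closed = False

    def close(self) -> None:
        self.closed = True


class ToySequenceStreamer:
    def __init__(self, readers: Dict[str, ToyTopicReader]) -> None:
        self._readers = readers

    def close(self) -> None:
        # Close every underlying topic reader.
        for reader in self._readers.values():
            reader.close()


class ToySequenceHandler:
    """Sketch of the cascade: __exit__ -> close() -> streamer.close()."""

    def __init__(self, topics: List[str]) -> None:
        self.readers = {t: ToyTopicReader(t) for t in topics}
        self.streamer = ToySequenceStreamer(self.readers)

    def close(self) -> None:
        self.streamer.close()

    def __enter__(self) -> "ToySequenceHandler":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()  # runs even if the with-body raised; exceptions still propagate


with ToySequenceHandler(["/gps", "/imu"]) as handler:
    pass
print(all(r.closed for r in handler.readers.values()))  # True
```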

mosaicolabs.handlers.TopicDataStreamer

TopicDataStreamer(*, client, state)

An iterator that streams ontology records from a single topic.

The TopicDataStreamer wraps a PyArrow Flight DoGet stream to fetch RecordBatches from the server and reconstruct individual Message objects. It is designed for efficient row-by-row iteration while providing peek-ahead capabilities for time-synchronized merging.

Key Capabilities
  • Temporal Slicing: Supports server-side filtering to stream data within specific time windows (t >= start and t < end).
  • Peek-Ahead: Provides the next_timestamp() method, allowing the system to inspect chronological order without consuming the record—a core requirement for the K-way merge sorting performed by the SequenceDataStreamer.
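The combination of batch fetching, row-by-row iteration, and peek-ahead can be sketched without Arrow. In this pure-Python stand-in, each "batch" is a list of row dictionaries in place of a RecordBatch, and the `timestamp_ns` column name is an assumption. The key idea is a one-row buffer: `next_timestamp()` pulls one row into the buffer, and the next iteration step returns that buffered row first.

```python
from typing import Any, Dict, Iterable, Iterator, List, Optional

Row = Dict[str, Any]


class BatchRowStreamer:
    """Flatten an iterable of batches into rows, with a one-row peek buffer."""

    def __init__(self, batches: Iterable[List[Row]]) -> None:
        # Lazily chain rows across batches, as if reading batch by batch.
        self._rows: Iterator[Row] = (row for batch in batches for row in batch)
        self._peeked: Optional[Row] = None

    def next_timestamp(self) -> Optional[int]:
        if self._peeked is None:
            self._peeked = next(self._rows, None)  # fill the buffer without consuming
        return None if self._peeked is None else self._peeked["timestamp_ns"]

    def __iter__(self) -> "BatchRowStreamer":
        return self

    def __next__(self) -> Row:
        if self._peeked is not None:
            row, self._peeked = self._peeked, None  # hand out the buffered row first
            return row
        return next(self._rows)  # raises StopIteration when exhausted


batches = [
    [{"timestamp_ns": 10, "x": 0.1}, {"timestamp_ns": 20, "x": 0.2}],
    [{"timestamp_ns": 30, "x": 0.3}],
]
stream = BatchRowStreamer(batches)
print(stream.next_timestamp())              # 10 (row not consumed)
print([r["timestamp_ns"] for r in stream])  # [10, 20, 30]
print(stream.next_timestamp())              # None
```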
Obtaining a Streamer

Users should typically not instantiate this class directly. The recommended way to obtain a streamer is via the TopicHandler.get_data_streamer() method.

Internal constructor for TopicDataStreamer.

Do not call this directly. Internal library modules should use the TopicDataStreamer._connect() or TopicDataStreamer._connect_from_ticket() factory methods instead.

Example
from mosaicolabs import MosaicoClient, IMU

with MosaicoClient.connect("localhost", 6726) as client:
    # Retrieve the topic handler using (e.g.) MosaicoClient
    top_handler = client.topic_handler("mission_alpha", "/front/imu")
    if top_handler:
        imu_stream = top_handler.get_data_streamer(
            # Optionally set the time window to extract
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000
        )

        # Peek at the start time (without consuming data)
        print(f"Recording starts at: {imu_stream.next_timestamp()}")

        # Direct, low-overhead loop
        for imu_msg in imu_stream:
            pass  # Do some processing...

        # Once done, close the reading channel (recommended)
        top_handler.close()

Parameters:

Name Type Description Default
client FlightClient

The active FlightClient used for remote operations.

required
state _TopicReadState

The internal state object managing the Arrow reader and peek buffers.

required

ontology_tag property

ontology_tag

The ontology tag associated with this streamer.

Returns:

Type Description
str

The ontology tag.

name

name()

The name of the topic associated with this streamer.

Returns:

Type Description
str

The name of the topic.

next_timestamp

next_timestamp()

Peeks at the timestamp of the next record without consuming it.

Returns:

Type Description
Optional[int]

The next timestamp in nanoseconds, or None if the stream is empty.

Raises:

Type Description
ValueError

If the data streamer instance has been closed.

Example
from mosaicolabs import MosaicoClient, IMU

with MosaicoClient.connect("localhost", 6726) as client:
    # Retrieve the topic handler using (e.g.) MosaicoClient
    top_handler = client.topic_handler("mission_alpha", "/front/imu")
    if top_handler:
        imu_stream = top_handler.get_data_streamer(
            # Optionally set the time window to extract
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000
        )

        # Peek at the start time (without consuming data)
        print(f"Recording starts at: {imu_stream.next_timestamp()}")

        # Do some processing...

        # Once done, close the reading channel (recommended)
        top_handler.close()

close

close()

Gracefully terminates the underlying Apache Arrow Flight stream and releases buffers.

Automatic Lifecycle Management

In most production workflows, manual invocation is not required. This method is automatically called by the parent TopicHandler.close(). If the handler is managed within a with context, the SDK ensures a top-down cleanup of the handler and its associated streamers upon exit.

Example
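# Manual resource management (if not using a 'with' block).
# Assumes `topic_handler` is a TopicHandler obtained earlier,
# e.g. via client.topic_handler("mission_alpha", "/front/imu").
def process_robot_data(meas):  # placeholder for user-defined processing
    print(meas.timestamp_ns)

streamer = topic_handler.get_data_streamer()
try:
    for meas in streamer:
        process_robot_data(meas)
finally:
    streamer.close()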