Data Retrieval
mosaicolabs.handlers.system_info.SystemInfo (dataclass)
Metadata and structural information for a Mosaico Sequence or Topic resource.
This Data Transfer Object summarizes the physical and logical state of a sequence or topic on the server, typically retrieved via a system-info action.
Attributes:
| Name | Type | Description |
|---|---|---|
| total_size_bytes | int | The aggregate size of all data chunks in bytes. |
| created_datetime | datetime | The UTC timestamp of when the resource was first initialized. |
| is_locked | bool | Indicates if the resource is currently read-only. Usually True if an upload is finalized or a retention policy is active. |
| chunks_number | Optional[int] | The total count of data partitions (chunks) stored on the server. Defaults to None if not applicable. |
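These fields combine naturally into a human-readable summary. The sketch below uses a local stand-in dataclass, SystemInfoSketch (hypothetical, not part of the SDK), that mirrors the four documented attributes and shows one way to handle the optional chunks_number without dividing by None or zero.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


# Hypothetical stand-in mirroring the documented SystemInfo fields;
# it is NOT the SDK class, just a local model for illustration.
@dataclass
class SystemInfoSketch:
    total_size_bytes: int
    created_datetime: datetime
    is_locked: bool
    chunks_number: Optional[int] = None


def describe(info: SystemInfoSketch) -> str:
    """Build a one-line summary, tolerating a missing chunk count."""
    size_mib = info.total_size_bytes / (1024 * 1024)
    parts = [f"{size_mib:.2f} MiB", "locked" if info.is_locked else "unlocked"]
    # chunks_number may be None (or 0): only derive a per-chunk average when it is set
    if info.chunks_number:
        avg = info.total_size_bytes / info.chunks_number
        parts.append(f"{info.chunks_number} chunks (~{avg / 1024:.1f} KiB each)")
    parts.append(f"created {info.created_datetime.isoformat()}")
    return ", ".join(parts)


info = SystemInfoSketch(
    total_size_bytes=8 * 1024 * 1024,
    created_datetime=datetime(2025, 2, 2, tzinfo=timezone.utc),
    is_locked=True,
    chunks_number=4,
)
print(describe(info))
```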
mosaicolabs.handlers.SequenceHandler
Represents a client-side handle for an existing Sequence on the Mosaico platform.
The SequenceHandler acts as a primary container for inspecting sequence-level metadata,
listing available topics, and accessing data reading interfaces like the
SequenceDataStreamer.
Obtaining a Handler
Users should not instantiate this class directly. The recommended way to
obtain a handler is via the MosaicoClient.sequence_handler()
factory method.
Internal constructor for SequenceHandler.
Do not call this directly. Users should retrieve instances via
MosaicoClient.sequence_handler(),
while internal modules should use the SequenceHandler._connect() factory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sequence_model | Sequence | The underlying metadata and system info model for the sequence. | required |
| client | FlightClient | The active FlightClient for remote operations. | required |
| timestamp_ns_min | Optional[int] | The lowest timestamp (in ns) available in this sequence. | required |
| timestamp_ns_max | Optional[int] | The highest timestamp (in ns) available in this sequence. | required |
name (property)
The unique name of the sequence.
Returns:
| Type | Description |
|---|---|
| str | The unique name of the sequence. |
topics (property)
The list of topic names (data channels) available within this sequence.
Returns:
| Type | Description |
|---|---|
| List[str] | The list of topic names (data channels) available within this sequence. |
user_metadata (property)
The user-defined metadata dictionary associated with this sequence.
Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | The user-defined metadata dictionary associated with this sequence. |
created_datetime (property)
The UTC timestamp indicating when the entity was created on the server.
Returns:
| Type | Description |
|---|---|
| datetime | The UTC timestamp indicating when the entity was created on the server. |
is_locked (property)
Indicates if the resource is currently locked.
A locked state typically occurs during active writing or maintenance operations, preventing deletion or structural modifications.
Returns:
| Type | Description |
|---|---|
| bool | The lock status of the sequence. |
total_size_bytes (property)
The total physical storage footprint of the entity on the server in bytes.
Returns:
| Type | Description |
|---|---|
| int | The total physical storage footprint of the entity on the server in bytes. |
timestamp_ns_min (property)
The lowest timestamp (nanoseconds) recorded in the sequence across all topics.
Returns:
| Type | Description |
|---|---|
| Optional[int] | The lowest timestamp (nanoseconds) recorded in the sequence across all topics, or None if no timestamp is available. |
timestamp_ns_max (property)
The highest timestamp (nanoseconds) recorded in the sequence across all topics.
Returns:
| Type | Description |
|---|---|
| Optional[int] | The highest timestamp (nanoseconds) recorded in the sequence across all topics, or None if no timestamp is available. |
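Timestamps are plain integer nanoseconds since the Unix epoch. A minimal sketch for turning the two bounds into UTC datetimes and a duration using only the standard library; ns_to_utc and span_seconds are hypothetical helpers, and the sample values are the ones used in the examples on this page. Note that datetime only resolves microseconds, so the last three digits of a nanosecond timestamp are truncated.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ns_to_utc(ts_ns: Optional[int]) -> Optional[datetime]:
    """Convert an integer nanosecond epoch timestamp to an aware UTC datetime."""
    if ts_ns is None:  # the bounds are typed Optional[int], so None must be handled
        return None
    # Integer division avoids float rounding; datetime only resolves microseconds.
    return EPOCH + timedelta(microseconds=ts_ns // 1_000)


def span_seconds(ts_min: Optional[int], ts_max: Optional[int]) -> Optional[float]:
    """Duration between the two bounds in seconds, or None if either is missing."""
    if ts_min is None or ts_max is None:
        return None
    return (ts_max - ts_min) / 1e9


# Sample values from this page; in practice they would come from
# seq_handler.timestamp_ns_min / seq_handler.timestamp_ns_max.
t_min, t_max = 1738508778000000000, 1738509618000000000
print(ns_to_utc(t_min), "->", ns_to_utc(t_max))
print(f"span: {span_seconds(t_min, t_max):.0f} s")
```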
get_data_streamer
Opens a reading channel for iterating over the sequence data.
The returned SequenceDataStreamer performs a K-way merge sort to provide
a single, time-synchronized chronological stream of messages from
multiple topics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topics | List[str] | A subset of topic names to stream. If empty, all topics in the sequence are streamed. | [] |
| start_timestamp_ns | Optional[int] | The inclusive lower bound (t >= start) for the time window in nanoseconds. The stream starts at the first message with a timestamp greater than or equal to this value. | None |
| end_timestamp_ns | Optional[int] | The exclusive upper bound (t < end) for the time window in nanoseconds. The stream stops before the first message with a timestamp greater than or equal to this value. | None |
Returns:
| Type | Description |
|---|---|
| SequenceDataStreamer | A SequenceDataStreamer that yields (topic, message) pairs in chronological order. |
Raises:
| Type | Description |
|---|---|
| ValueError | If the provided topic names do not exist or if the sequence contains no data. |
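The time window is half-open, [start, end). The sketch below models that rule on the client side with a hypothetical in_window helper (the SDK applies the filter server-side). It also shows why the convention is convenient: adjacent windows that share a boundary cover every message exactly once.

```python
from typing import Iterable, List, Optional


def in_window(t_ns: int, start_ns: Optional[int] = None, end_ns: Optional[int] = None) -> bool:
    """Half-open window test: start <= t < end; a None bound means unbounded."""
    if start_ns is not None and t_ns < start_ns:
        return False
    if end_ns is not None and t_ns >= end_ns:
        return False
    return True


def select(timestamps: Iterable[int], start_ns: Optional[int] = None,
           end_ns: Optional[int] = None) -> List[int]:
    """Keep only the timestamps that fall inside [start_ns, end_ns)."""
    return [t for t in timestamps if in_window(t, start_ns, end_ns)]


ts = [100, 200, 300, 400]
print(select(ts, 200, 400))  # 200 is included, 400 is excluded
# Splitting at 300 yields every message once, with no duplicate at the boundary.
print(select(ts, None, 300) + select(ts, 300, None))
```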
Example
```python
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Use a Handler to inspect the catalog
    seq_handler = client.sequence_handler("mission_alpha")
    if seq_handler:
        # Start a Unified Stream (K-Way Merge) for multi-sensor replay
        streamer = seq_handler.get_data_streamer(
            topics=["/gps", "/imu"],  # Optionally filter topics
            # Optionally set the time window to extract
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000,
        )
        # Peek at the start time (without consuming data)
        print(f"Recording starts at: {streamer.next_timestamp()}")
        # Start timed data-stream
        for topic, msg in streamer:
            print(f"[{topic}] at {msg.timestamp_ns}: {type(msg.data).__name__}")
        # Once done, close the handler, its topic handlers and related reading channels (recommended).
        seq_handler.close()
```
Important
Every call to get_data_streamer() automatically invokes close() on any previously spawned SequenceDataStreamer instance and its associated Apache Arrow Flight channels before initializing the new stream.
Example:
```python
# Assumes `client` is an open MosaicoClient connection;
# T1 and T2 are placeholder start times in nanoseconds.
T1 = 1738508778000000000
T2 = 1738509000000000000

seq_handler = client.sequence_handler("mission_alpha")
# Opens first stream
streamer_v1 = seq_handler.get_data_streamer(start_timestamp_ns=T1)
# Calling this again automatically CLOSES streamer_v1 and opens a new channel
streamer_v2 = seq_handler.get_data_streamer(start_timestamp_ns=T2)
# Using `streamer_v1` will raise a ValueError
for topic, msg in streamer_v1:  # raises here!
    pass
```
get_topic_handler
Get a specific TopicHandler for a child topic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic_name | str | The relative name of the topic (e.g., "/camera/front"). | required |
| force_new_instance | bool | If True, a new TopicHandler is created instead of reusing a cached one. | False |
Returns:
| Type | Description |
|---|---|
| TopicHandler | A TopicHandler for the requested topic. |
Raises:
| Type | Description |
|---|---|
| ValueError | If the topic is not available in this sequence or an internal connection error occurs. |
Example
```python
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    seq_handler = client.sequence_handler("mission_alpha")
    if seq_handler:
        # Use a Handler to inspect the catalog
        top_handler = seq_handler.get_topic_handler("/front/imu")
        if top_handler:
            print(f"Sequence: {top_handler.sequence_name}")
            print(f" |Topic: {top_handler.sequence_name}:{top_handler.name}")
            print(f" |User metadata: {top_handler.user_metadata}")
            print(f" |Timestamp span: {top_handler.timestamp_ns_min} - {top_handler.timestamp_ns_max}")
            print(f" |Created {top_handler.created_datetime}")
            print(f" |Size (MB) {top_handler.total_size_bytes / (1024 * 1024)}")
        # Once done, close the handler, its topic handlers and related reading channels (recommended).
        seq_handler.close()
```
close
Gracefully closes all cached topic handlers and active data streamers.
This method should be called to release network and memory resources when the handler is no longer needed.
Example
```python
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Use a Handler to inspect the catalog
    seq_handler = client.sequence_handler("mission_alpha")
    if seq_handler:
        # Perform operations
        # ...
        # Once done, close the handler, its topic handlers and related reading channels (recommended).
        seq_handler.close()
```
mosaicolabs.handlers.TopicHandler
Represents an existing topic on the Mosaico platform.
The TopicHandler provides a client-side interface for interacting with an individual
data stream (topic). It allows users to inspect static metadata and system diagnostics (via the Topic model),
and access the raw message stream through a dedicated TopicDataStreamer.
Obtaining a Handler
Direct instantiation of this class is discouraged. Use the
MosaicoClient.topic_handler()
factory method to retrieve an initialized handler.
Internal constructor for TopicHandler.
Do not call this directly. Users should retrieve instances via
MosaicoClient.topic_handler(),
or by using the get_topic_handler() method from the
SequenceHandler instance of the parent sequence.
Internal modules should use the TopicHandler._connect() factory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | FlightClient | The active FlightClient for remote operations. | required |
| topic_model | Topic | The underlying metadata and system info model for the topic. | required |
| ticket | Ticket | The remote resource ticket used for data retrieval. | required |
| timestamp_ns_min | Optional[int] | The lowest timestamp (in ns) available in this topic. | required |
| timestamp_ns_max | Optional[int] | The highest timestamp (in ns) available in this topic. | required |
name (property)
The relative name of the topic (e.g., "/front_cam/image_raw").
Returns:
| Type | Description |
|---|---|
| str | The relative name of the topic. |
sequence_name (property)
The name of the parent sequence containing this topic.
Returns:
| Type | Description |
|---|---|
| str | The name of the parent sequence. |
user_metadata (property)
The user-defined metadata dictionary associated with this topic.
Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | The user-defined metadata dictionary. |
created_datetime (property)
The UTC timestamp indicating when the entity was created on the server.
Returns:
| Type | Description |
|---|---|
| datetime | The UTC timestamp indicating when the entity was created on the server. |
is_locked (property)
Indicates if the resource is currently locked.
A locked state typically occurs during active writing or maintenance operations, preventing deletion or structural modifications.
Returns:
| Type | Description |
|---|---|
| bool | True if the resource is currently locked, False otherwise. |
chunks_number (property)
The number of physical data chunks stored for this topic.
Returns:
| Type | Description |
|---|---|
| Optional[int] | The number of physical data chunks stored for this topic, or None if not applicable. |
ontology_tag (property)
The ontology type identifier (e.g., 'imu', 'gnss').
This corresponds to the __ontology_tag__ defined in the Serializable class registry.
Returns:
| Type | Description |
|---|---|
| str | The ontology type identifier. |
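A tag-to-class registry of this kind is commonly built with Python's __init_subclass__ hook. The sketch below illustrates that general pattern under that assumption; it is not the SDK's Serializable implementation, and SerializableSketch, ImuSketch, and GnssSketch are hypothetical names.

```python
from typing import ClassVar, Dict, Type


class SerializableSketch:
    """Hypothetical base class: each subclass registers under its __ontology_tag__."""

    __ontology_tag__: ClassVar[str]
    _registry: ClassVar[Dict[str, Type["SerializableSketch"]]] = {}

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        tag = getattr(cls, "__ontology_tag__", None)
        if tag is None:
            raise TypeError(f"{cls.__name__} must define __ontology_tag__")
        if tag in SerializableSketch._registry:
            raise ValueError(f"duplicate ontology tag: {tag!r}")
        SerializableSketch._registry[tag] = cls

    @classmethod
    def lookup(cls, tag: str) -> Type["SerializableSketch"]:
        """Resolve a tag string (e.g. a handler's ontology_tag) to its class."""
        return SerializableSketch._registry[tag]


class ImuSketch(SerializableSketch):
    __ontology_tag__ = "imu"


class GnssSketch(SerializableSketch):
    __ontology_tag__ = "gnss"


print(SerializableSketch.lookup("imu").__name__)
```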
serialization_format (property)
The format used to serialize the topic data (e.g., 'arrow', 'image').
This corresponds to the SerializationFormat enum.
Returns:
| Type | Description |
|---|---|
| str | The serialization format. |
total_size_bytes (property)
The total physical storage footprint of the entity on the server in bytes.
Returns:
| Type | Description |
|---|---|
| int | The total physical storage footprint of the entity on the server in bytes. |
timestamp_ns_min (property)
The lowest timestamp (nanoseconds) recorded in this topic.
Returns:
| Type | Description |
|---|---|
| Optional[int] | The lowest timestamp (nanoseconds) recorded in this topic, or None if no timestamp is available. |
timestamp_ns_max (property)
The highest timestamp (nanoseconds) recorded in this topic.
Returns:
| Type | Description |
|---|---|
| Optional[int] | The highest timestamp (nanoseconds) recorded in this topic, or None if no timestamp is available. |
get_data_streamer
Opens a high-performance reading channel for iterating over this topic's data.
Stream Lifecycle Policy: Single-Active-Streamer
To optimize resource utilization and prevent backend socket exhaustion, this handler maintains at most one active stream at a time.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| start_timestamp_ns | Optional[int] | The inclusive lower bound (t >= start) in nanoseconds. The stream begins at the first message with a timestamp >= this value. | None |
| end_timestamp_ns | Optional[int] | The exclusive upper bound (t < end) in nanoseconds. The stream terminates before reaching any message with a timestamp >= this value. | None |
Returns:
| Type | Description |
|---|---|
| TopicDataStreamer | A chronological iterator for the requested data window. |
Raises:
| Type | Description |
|---|---|
| ValueError | If the topic contains no data or the handler is in an invalid state. |
Example
```python
from mosaicolabs import MosaicoClient, IMU


def process_sample(imu):
    # Placeholder for your own processing logic
    print(imu)


with MosaicoClient.connect("localhost", 6726) as client:
    # Retrieve the topic handler using (e.g.) MosaicoClient
    top_handler = client.topic_handler("mission_alpha", "/front/imu")
    if top_handler:
        imu_stream = top_handler.get_data_streamer(
            # Optionally set the time window to extract
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000,
        )
        # Peek at the start time (without consuming data)
        print(f"Recording starts at: {imu_stream.next_timestamp()}")
        # Direct, low-overhead loop
        for imu_msg in imu_stream:
            process_sample(imu_msg.get_data(IMU))
        # Once done, close the reading channel (recommended)
        top_handler.close()
```
Important
Every call to get_data_streamer() automatically invokes close() on any previously spawned TopicDataStreamer instance and its associated Apache Arrow Flight channel before initializing the new stream.
Example:
```python
# Assumes `client` is an open MosaicoClient connection;
# T1 and T2 are placeholder start times in nanoseconds.
T1 = 1738508778000000000
T2 = 1738509000000000000

top_handler = client.topic_handler("mission_alpha", "/front/imu")
# Opens first stream
streamer_v1 = top_handler.get_data_streamer(start_timestamp_ns=T1)
# Calling this again automatically CLOSES streamer_v1 and opens a new channel
streamer_v2 = top_handler.get_data_streamer(start_timestamp_ns=T2)
# Using `streamer_v1` will raise a ValueError
for msg in streamer_v1:  # raises here!
    pass
```
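The single-active-streamer policy can be modelled in a few lines. In this sketch, ToyHandler and ToyStreamer are hypothetical stand-ins with no Arrow Flight involved: get_data_streamer() closes the previous streamer before creating a new one, a closed streamer raises ValueError when iterated, and close() is a no-op when nothing has been spawned.

```python
from typing import Iterator, List, Optional


class ToyStreamer:
    """Stand-in for a data streamer that becomes unusable once closed."""

    def __init__(self, records: List[int]):
        self._it: Optional[Iterator[int]] = iter(records)

    def __iter__(self):
        return self

    def __next__(self) -> int:
        if self._it is None:
            raise ValueError("streamer has been closed")
        return next(self._it)

    def close(self) -> None:
        self._it = None  # drop the underlying source; repeated calls are harmless


class ToyHandler:
    """Hypothetical handler enforcing at most one active streamer at a time."""

    def __init__(self, records: List[int]):
        self._records = records
        self._active: Optional[ToyStreamer] = None

    def get_data_streamer(self, start: int = 0) -> ToyStreamer:
        self.close()  # invalidate any previously spawned streamer first
        self._active = ToyStreamer([r for r in self._records if r >= start])
        return self._active

    def close(self) -> None:
        if self._active is not None:  # no-op when nothing was spawned
            self._active.close()
            self._active = None


h = ToyHandler([10, 20, 30])
v1 = h.get_data_streamer(start=10)
v2 = h.get_data_streamer(start=20)  # implicitly closes v1
print(list(v2))
try:
    next(v1)
except ValueError as exc:
    print("v1:", exc)
```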
close
Terminates the active data streamer associated with this topic and releases allocated system resources.
In the Mosaico architecture, a TopicHandler acts as a factory for
TopicDataStreamers. Calling close() ensures that any background data
fetching, buffering, or network sockets held by an active streamer are
immediately shut down.
Note
- If no streamer has been spawned (via get_data_streamer), this method performs no operation and returns safely.
- Explicitly closing handlers is a best practice when iterating through large datasets to prevent resource accumulation.
Example
```python
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Access a specific sensor topic (e.g., IMU)
    top_handler = client.topic_handler("mission_alpha", "/front/imu")
    if top_handler:
        # Initialize a high-performance data stream
        imu_stream = top_handler.get_data_streamer(
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000,
        )
        # Consume data for ML training or analysis
        # for msg in imu_stream: ...
        # Release the streaming channel and backend resources
        top_handler.close()
```
mosaicolabs.handlers.SequenceDataStreamer
A unified, time-ordered iterator for reading multi-topic sequences.
The SequenceDataStreamer performs a K-Way Merge across multiple topic streams to
provide a single, coherent chronological view of an entire sequence.
This is essential when topics have different recording rates or asynchronous sampling
times.
Key Capabilities
- Temporal Slicing: Supports server-side filtering to stream data within specific time windows (t >= start and t < end).
- Peek-Ahead: Provides the next_timestamp() method, allowing the system to inspect chronological order without consuming the record, a core requirement for the K-way merge sorting algorithm.
The Merge Algorithm
This class manages multiple internal TopicDataStreamer
instances. On every iteration, it:
- Peeks at the next available timestamp from every active topic stream.
- Selects the topic currently holding the lowest absolute timestamp.
- Yields that specific record and advances only the "winning" topic stream.
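The three steps above can be sketched with in-memory streams. PeekableTopic and k_way_merge are hypothetical names, and each step here is a linear scan over the K stream heads; a heap (for example Python's heapq) would reduce the selection to O(log K), and this page does not state which variant the SDK uses. The minimum over the stream heads is also the value next_timestamp() reports for the whole sequence.

```python
from typing import Dict, Iterator, List, Optional, Tuple

Record = Tuple[int, str]  # (timestamp_ns, payload)


class PeekableTopic:
    """Toy topic stream with a one-record look-ahead buffer."""

    def __init__(self, records: List[Record]):
        self._it = iter(records)
        self._head: Optional[Record] = next(self._it, None)

    def next_timestamp(self) -> Optional[int]:
        return None if self._head is None else self._head[0]

    def pop(self) -> Record:
        rec = self._head
        self._head = next(self._it, None)  # advance only this stream
        return rec


def k_way_merge(topics: Dict[str, PeekableTopic]) -> Iterator[Tuple[str, Record]]:
    while True:
        # 1. Peek at the next timestamp of every stream that still has data.
        heads = [(s.next_timestamp(), name) for name, s in topics.items()
                 if s.next_timestamp() is not None]
        if not heads:
            return
        # 2. Pick the stream holding the lowest timestamp (ties broken by topic name).
        _, winner = min(heads)
        # 3. Yield its record and advance only the winning stream.
        yield winner, topics[winner].pop()


streams = {
    "/gps": PeekableTopic([(0, "fix0"), (100, "fix1")]),  # slower sensor
    "/imu": PeekableTopic([(0, "a0"), (25, "a1"), (50, "a2"), (125, "a3")]),
}
for topic, (ts, payload) in k_way_merge(streams):
    print(ts, topic, payload)
```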
Obtaining a Streamer
Do not instantiate this class directly. Use the
SequenceHandler.get_data_streamer()
method to obtain a configured instance.
Internal constructor for SequenceDataStreamer.
Do not call this directly. Internal library modules should use the
SequenceDataStreamer._connect() factory.
Example
```python
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Use a Handler to inspect the catalog
    seq_handler = client.sequence_handler("mission_alpha")
    if seq_handler:
        # Start a Unified Stream (K-Way Merge) for multi-sensor replay
        streamer = seq_handler.get_data_streamer(
            topics=["/gps", "/imu"],  # Optionally filter topics
            # Optionally set the time window to extract
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000,
        )
        # Start timed data-stream
        for topic, msg in streamer:
            pass  # Do some processing...
        # Once done, close the handler, its topic handlers and related reading channels (recommended).
        seq_handler.close()
```
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sequence_name | str | The name of the sequence being streamed. | required |
| client | FlightClient | The active FlightClient for remote operations. | required |
| topic_readers | Dict[str, TopicDataStreamer] | A dictionary mapping topic names to their respective TopicDataStreamer instances. | required |
next_timestamp
Peeks at the timestamp of the next chronological measurement without consuming the record.
Returns:
| Type | Description |
|---|---|
| Optional[int] | The minimum timestamp (nanoseconds) found across all active topics, or None if all topic streams are exhausted. |
Example
```python
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Use a Handler to inspect the catalog
    seq_handler = client.sequence_handler("mission_alpha")
    if seq_handler:
        # Start a Unified Stream (K-Way Merge) for multi-sensor replay
        streamer = seq_handler.get_data_streamer(
            topics=["/gps", "/imu"],  # Optionally filter topics
            # Optionally set the time window to extract
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000,
        )
        # Peek at the start time (without consuming data)
        print(f"Recording starts at: {streamer.next_timestamp()}")
        # Do some processing...
        # Once done, close the handler, its topic handlers and related reading channels (recommended).
        seq_handler.close()
```
close
Gracefully terminates all underlying topic streams and releases allocated resources.
This method iterates through all active TopicDataStreamer
instances, ensuring that each remote connection is closed and local memory buffers
are cleared.
Automatic Cleanup
In standard workflows, you do not need to call this manually. This function is
automatically invoked by the SequenceHandler.close()
method, which in turn is triggered by the __exit__ logic of the parent
SequenceHandler when used within a
with context.
mosaicolabs.handlers.TopicDataStreamer
An iterator that streams ontology records from a single topic.
The TopicDataStreamer wraps a PyArrow Flight DoGet stream to fetch RecordBatches
from the server and reconstruct individual Message objects.
It is designed for efficient row-by-row iteration while providing peek-ahead
capabilities for time-synchronized merging.
Key Capabilities
- Temporal Slicing: Supports server-side filtering to stream data within specific time windows (t >= start and t < end).
- Peek-Ahead: Provides the next_timestamp() method, allowing the system to inspect chronological order without consuming the record, a core requirement for the K-way merge sorting performed by the SequenceDataStreamer.
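Row-by-row iteration with a one-record look-ahead over batched data can be sketched without Arrow. In this hypothetical BatchRowStreamer, each batch is a dict of equal-length columns standing in for a RecordBatch, and a timestamp_ns column is assumed. next_timestamp() fills a peek buffer without consuming it, batches are pulled lazily, and both methods raise ValueError after close(), mirroring the documented behaviour.

```python
from typing import Dict, Iterable, Iterator, List, Optional

Row = Dict[str, object]
Batch = Dict[str, List[object]]


class BatchRowStreamer:
    """Flatten column-oriented batches into rows, with a one-row peek buffer."""

    def __init__(self, batches: Iterable[Batch]):
        self._batches: Optional[Iterator[Batch]] = iter(batches)
        self._rows: Iterator[Row] = iter(())
        self._peeked: Optional[Row] = None
        self._closed = False

    def _fill(self) -> None:
        """Make _peeked hold the next row if one exists, pulling batches lazily."""
        while self._peeked is None:
            row = next(self._rows, None)
            if row is not None:
                self._peeked = row
                return
            batch = next(self._batches, None)
            if batch is None:
                return  # stream exhausted
            cols = list(batch)
            self._rows = (dict(zip(cols, vals)) for vals in zip(*batch.values()))

    def next_timestamp(self) -> Optional[int]:
        if self._closed:
            raise ValueError("data streamer has been closed")
        self._fill()
        return None if self._peeked is None else self._peeked["timestamp_ns"]

    def __iter__(self):
        return self

    def __next__(self) -> Row:
        if self._closed:
            raise ValueError("data streamer has been closed")
        self._fill()
        if self._peeked is None:
            raise StopIteration
        row, self._peeked = self._peeked, None  # consume the buffered row
        return row

    def close(self) -> None:
        self._closed = True
        self._batches, self._rows, self._peeked = None, iter(()), None


s = BatchRowStreamer([
    {"timestamp_ns": [10, 20], "value": [0.1, 0.2]},
    {"timestamp_ns": [30], "value": [0.3]},
])
print(s.next_timestamp())                  # peek only, nothing consumed
print([row["timestamp_ns"] for row in s])
s.close()
```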
Obtaining a Streamer
Users should typically not instantiate this class directly.
The recommended way to obtain a streamer is via the
TopicHandler.get_data_streamer()
method.
Internal constructor for TopicDataStreamer.
Do not call this directly. Internal library modules should use the
TopicDataStreamer._connect() or TopicDataStreamer._connect_from_ticket() factory methods instead.
Example
```python
from mosaicolabs import MosaicoClient, IMU

with MosaicoClient.connect("localhost", 6726) as client:
    # Retrieve the topic handler using (e.g.) MosaicoClient
    top_handler = client.topic_handler("mission_alpha", "/front/imu")
    if top_handler:
        imu_stream = top_handler.get_data_streamer(
            # Optionally set the time window to extract
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000,
        )
        # Peek at the start time (without consuming data)
        print(f"Recording starts at: {imu_stream.next_timestamp()}")
        # Direct, low-overhead loop
        for imu_msg in imu_stream:
            imu = imu_msg.get_data(IMU)  # Do some processing...
        # Once done, close the reading channel (recommended)
        top_handler.close()
```
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | FlightClient | The active FlightClient used for remote operations. | required |
| state | _TopicReadState | The internal state object managing the Arrow reader and peek buffers. | required |
ontology_tag (property)
The ontology tag associated with this streamer.
Returns:
| Type | Description |
|---|---|
| str | The ontology tag. |
name
The name of the topic associated with this streamer.
Returns:
| Type | Description |
|---|---|
| str | The name of the topic. |
next_timestamp
Peeks at the timestamp of the next record without consuming it.
Returns:
| Type | Description |
|---|---|
| Optional[int] | The next timestamp in nanoseconds, or None if the stream is exhausted. |
Raises:
| Type | Description |
|---|---|
| ValueError | If the data streamer instance has been closed. |
Example
```python
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    # Retrieve the topic handler using (e.g.) MosaicoClient
    top_handler = client.topic_handler("mission_alpha", "/front/imu")
    if top_handler:
        imu_stream = top_handler.get_data_streamer(
            # Optionally set the time window to extract
            start_timestamp_ns=1738508778000000000,
            end_timestamp_ns=1738509618000000000,
        )
        # Peek at the start time (without consuming data)
        print(f"Recording starts at: {imu_stream.next_timestamp()}")
        # Do some processing...
        # Once done, close the reading channel (recommended)
        top_handler.close()
```
close
Gracefully terminates the underlying Apache Arrow Flight stream and releases buffers.
Automatic Lifecycle Management
In most production workflows, manual invocation is not required. This method is
automatically called by the parent TopicHandler.close().
If the handler is managed within a with context, the SDK ensures a top-down cleanup
of the handler and its associated streamers upon exit.
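The top-down cleanup can be illustrated with a context manager. The classes below are hypothetical stand-ins for the handler/streamer hierarchy, not SDK types: __exit__ calls close() on the parent, which closes each child handler and its streamer; closing a stream is idempotent, and exceptions raised inside the with block are not suppressed.

```python
from typing import List


class ToyStream:
    def __init__(self, name: str, log: List[str]):
        self.name, self._log, self.closed = name, log, False

    def close(self) -> None:
        if not self.closed:  # idempotent: a second close does nothing
            self.closed = True
            self._log.append(f"closed stream {self.name}")


class ToyTopicHandler:
    """Hypothetical topic handler that owns one streamer."""

    def __init__(self, name: str, log: List[str]):
        self.name, self._log = name, log
        self.stream = ToyStream(name, log)

    def close(self) -> None:
        self.stream.close()
        self._log.append(f"closed handler {self.name}")


class ToySequenceHandler:
    """Hypothetical parent: closing it closes every child, top-down."""

    def __init__(self, topics: List[str]):
        self.log: List[str] = []
        self.children = [ToyTopicHandler(t, self.log) for t in topics]

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.close()
        return False  # never swallow exceptions raised inside the with-block

    def close(self) -> None:
        for child in self.children:
            child.close()


with ToySequenceHandler(["/gps", "/imu"]) as seq:
    pass  # iterate the streams here
print(seq.log)
```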