Skip to content

Communication Module

mosaicolabs.comm.MosaicoClient

MosaicoClient(
    *,
    host,
    port,
    timeout,
    control_client,
    connection_pool,
    executor_pool,
    sentinel,
)

The gateway to the Mosaico Data Platform.

This class centralizes connection management, resource pooling, and serves as a factory for specialized handlers. It is designed to manage the lifecycle of both network connections and asynchronous executors efficiently.

Context Manager Usage

The MosaicoClient is best used as a context manager to ensure all internal pools and connections are gracefully closed.

from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    sequences = client.list_sequences()
    print(f"Available data: {sequences}")

Internal Constructor (do not call this directly): The MosaicoClient enforces a strict factory pattern for security and proper resource setup. Please use the connect() method instead to obtain an initialized client.

Sentinel Enforcement

This constructor checks for a private internal sentinel. Attempting to instantiate this class manually will result in a RuntimeError.

Parameters:

Name Type Description Default
host str

The remote server host.

required
port int

The remote server port.

required
timeout int

The connection timeout.

required
control_client FlightClient

The primary PyArrow Flight control client.

required
connection_pool Optional[_ConnectionPool]

Internal pool for data connections.

required
executor_pool Optional[_ExecutorPool]

Internal pool for async I/O.

required
sentinel object

Private object used to verify factory-based instantiation.

required

connect classmethod

connect(host, port, timeout=5)

The primary entry point to the Mosaico Data Platform.

This factory method is the only recommended way to obtain a valid MosaicoClient instance. It orchestrates the necessary handshake, initializes the primary control channel, and prepares the internal resource pools.

Factory Pattern

Direct instantiation via __init__ is restricted through a sentinel pattern and will raise a RuntimeError. This ensures that every client in use has been correctly configured with a valid network connection.

Parameters:

Name Type Description Default
host str

The server host address (e.g., "127.0.0.1" or "mosaico.local").

required
port int

The server port (e.g., 6726).

required
timeout int

Maximum time in seconds to wait for a connection response. Defaults to 5.

5

Returns:

Name Type Description
MosaicoClient MosaicoClient

An initialized and connected client ready for operations.

Raises:

Type Description
ConnectionError

If the server is unreachable or the handshake fails.

RuntimeError

If the class is instantiated directly instead of using this method.

Example
from mosaicolabs import MosaicoClient

# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
    # Perform operations using the client
    pass

sequence_handler

sequence_handler(sequence_name)

Retrieves a SequenceHandler for the given sequence.

Handlers are cached; subsequent calls for the same sequence return the existing object to avoid redundant handshakes.

Parameters:

Name Type Description Default
sequence_name str

The unique identifier of the sequence.

required

Returns:

Type Description
Optional[SequenceHandler]

Optional[SequenceHandler]: A handler for managing sequence operations, or None if not found.

Example
from mosaicolabs import MosaicoClient

# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
    # Retrieve a sequence handler
    sequence_handler = client.sequence_handler("my_sequence")
    if sequence_handler:
        # Print sequence details
        print(f"Sequence: {sequence_handler.name}")
        print(f"Created: {sequence_handler.created_datetime}")
        print(f"Topic list: {sequence_handler.topics}")
        print(f"User Metadata: {sequence_handler.user_metadata}")
        print(f"Size (MB): {sequence_handler.total_size_bytes / 1024 / 1024}")

topic_handler

topic_handler(sequence_name, topic_name)

Retrieves a TopicHandler for a specific data channel.

Parameters:

Name Type Description Default
sequence_name str

The parent sequence name.

required
topic_name str

The specific topic name.

required

Returns:

Type Description
Optional[TopicHandler]

Optional[TopicHandler]: A handler for managing topic operations, or None if not found.

Example
from mosaicolabs import MosaicoClient

# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
    # Retrieve a topic handler
    topic_handler = client.topic_handler("my_sequence", "/front/camera/image_raw)
    if topic_handler:
        # Print topic details
        print(f"Topic: {topic_handler.sequence_name}:{topic_handler.name}")
        print(f"Ontology Tag: {topic_handler.ontology_tag}")
        print(f"Created: {topic_handler.created_datetime}")
        print(f"User Metadata: {topic_handler.user_metadata}")
        print(f"Size (MB): {topic_handler.total_size_bytes / 1024 / 1024}")

sequence_create

sequence_create(
    sequence_name,
    metadata,
    on_error=Delete,
    max_batch_size_bytes=None,
    max_batch_size_records=None,
)

Creates a new sequence on the platform and returns a SequenceWriter for ingestion.

Important

The function must be called inside a with context, otherwise a RuntimeError is raised.

Parameters:

Name Type Description Default
sequence_name str

Unique name for the sequence.

required
metadata dict[str, Any]

User-defined metadata to attach.

required
on_error OnErrorPolicy

Behavior on write failure. Defaults to Delete.

Delete
max_batch_size_bytes Optional[int]

Max bytes per Arrow batch.

None
max_batch_size_records Optional[int]

Max records per Arrow batch.

None

Returns:

Name Type Description
SequenceWriter SequenceWriter

An initialized writer instance.

Raises:

Type Description
RuntimeError

If the method is called outside a with context.

Exception

If any error occurs during sequence injection.

Example
from mosaicolabs import MosaicoClient, OnErrorPolicy

# Open the connection with the Mosaico Client
with MosaicoClient.connect("localhost", 6726) as client:
    # Start the Sequence Orchestrator
    with client.sequence_create(
        sequence_name="mission_log_042",
        # Custom metadata for this data sequence.
        metadata={
            "driver": {
                "driver_id": "drv_sim_017",
                "role": "validation",
                "experience_level": "senior",
            },
            "location": {
                "city": "Milan",
                "country": "IT",
                "facility": "Downtown",
                "gps": {
                    "lat": 45.46481,
                    "lon": 9.19201,
                },
            },
        }
        on_error = OnErrorPolicy.Delete # Default
        ) as seq_writer:
            # Start creating topics and pushing data...
            # (1)!
  1. See also:

sequence_delete

sequence_delete(sequence_name)

Permanently deletes a sequence and all its associated data from the server.

This operation is destructive and triggers a cascading deletion of all underlying resources, including all topics and data chunks belonging to the sequence. Once executed, all storage occupied by the sequence is freed.

Sequence Locking

This action can only be performed on unlocked sequences. If a sequence is currently locked (e.g., for archival or safety reasons), the deletion request will be rejected by the server.

Parameters:

Name Type Description Default
sequence_name str

The unique name of the sequence to remove.

required

list_sequences

list_sequences()

Retrieves a list of all sequence names available on the server.

Returns:

Type Description
List[str]

List[str]: The list of sequence identifiers.

Example
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    sequences = client.list_sequences()
    print(f"Available sequences: {sequences}")

list_sequence_notify

list_sequence_notify(sequence_name)

Retrieves a list of all notifications available on the server for a specific sequence.

Parameters:

Name Type Description Default
sequence_name str

The name of the sequence to list notifications for.

required

Returns:

Type Description
List[Notified]

List[Notified]: The list of sequence notifications.

Example
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    sequence_notifications = client.list_sequence_notify("my_sequence")
    for notify in sequence_notifications:
        print(f"Notification Type: {notify.notify_type}")
        print(f"Notification Message: {notify.message}")
        print(f"Notification Created: {notify.created_datetime}")

clear_sequence_notify

clear_sequence_notify(sequence_name)

Clears the notifications for a specific sequence from the server.

Parameters:

Name Type Description Default
sequence_name str

The name of the sequence.

required

list_topic_notify

list_topic_notify(sequence_name, topic_name)

Retrieves a list of all notifications available on the server for a specific topic

Parameters:

Name Type Description Default
sequence_name str

The name of the sequence to list notifications for.

required
topic_name str

The name of the topic to list notifications for.

required

Returns:

Type Description
List[Notified]

List[str]: The list of topic notifications.

Example
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    topic_notifications = client.list_topic_notify("my_sequence", "my_topic")
    for notify in topic_notifications:
        print(f"Notification Type: {notify.notify_type}")
        print(f"Notification Message: {notify.message}")
        print(f"Notification Created: {notify.created_datetime}")

clear_topic_notify

clear_topic_notify(sequence_name, topic_name)

Clears the notifications for a specific topic from the server.

Parameters:

Name Type Description Default
sequence_name str

The name of the sequence.

required
topic_name str

The name of the topic.

required

query

query(*queries, query=None)

Executes one or more queries against the Mosaico database.

Multiple provided queries are joined using a logical AND condition.

Parameters:

Name Type Description Default
*queries QueryableProtocol

Variable arguments of query builder objects (e.g., QuerySequence).

()
query Optional[Query]

An alternative pre-constructed Query object.

None

Returns:

Type Description
Optional[QueryResponse]

Optional[QueryResponse]: The query results, or None if an error occurs.

Raises:

Type Description
ValueError

If conflicting query types are passed or no queries are provided.

Query with variadic arguments
from mosaicolabs import QueryOntologyCatalog, QuerySequence, Query, IMU, MosaicoClient

# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
    # Perform the server side query
    results = client.query(
        # Append a filter for sequence metadata
        QuerySequence()
        .with_expression(
            # Use query proxy for generating a _QuerySequenceExpression
            Sequence.Q.user_metadata["environment.visibility"].lt(50)
        )
        .with_name_match("test_drive"),
        # Append a filter with deep time-series data discovery and measurement time windowing
        QueryOntologyCatalog()
        .with_expression(IMU.Q.acceleration.x.gt(5.0))
        .with_expression(IMU.Q.header.stamp.sec.gt(1700134567))
        .with_expression(IMU.Q.header.stamp.nanosec.between([123456, 789123])),
    )
    # Inspect the results
    if results is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in results:
            print(f"Sequence: {item.sequence.name}")
Query with Query object
from mosaicolabs import QueryOntologyCatalog, QuerySequence, Query, IMU, MosaicoClient

# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
    # Build a filter with name pattern and metadata-related expression
    query = Query(
        # Append a filter for sequence metadata
        QuerySequence()
        .with_expression(
            # Use query proxy for generating a _QuerySequenceExpression
            Sequence.Q.user_metadata["environment.visibility"].lt(50)
        )
        .with_name_match("test_drive"),
        # Append a filter with deep time-series data discovery and measurement time windowing
        QueryOntologyCatalog()
        .with_expression(IMU.Q.acceleration.x.gt(5.0))
        .with_expression(IMU.Q.header.stamp.sec.gt(1700134567))
        .with_expression(IMU.Q.header.stamp.nanosec.between([123456, 789123])),
    )
    # Perform the server side query
    results = client.query(query=query)
    # Inspect the results
    if results is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in results:
            print(f"Sequence: {item.sequence.name}")

clear_sequence_handlers_cache

clear_sequence_handlers_cache()

Clears the internal cache of SequenceHandler objects.

clear_topic_handlers_cache

clear_topic_handlers_cache()

Clears the internal cache of TopicHandler objects.

close

close()

Gracefully shuts down the Mosaico client and releases all underlying resources.

This method ensures a clean termination of the client's lifecycle by: * Closing Handlers: Invalidates and closes all cached SequenceHandlers and TopicHandlers to prevent stale data access. * Network Cleanup: Terminated the connection pool to the mosaicod backend. * Thread Termination: Shuts down the internal thread executor pool responsible for asynchronous data fetching and background streaming.

Note

If using the client as a context manager (via with MosaicoClient.connect(...)), this method is invoked automatically on exit. Explicit calls are required only for manual lifecycle management.

Example
from mosaicolabs import MosaicoClient

# Manual connection management
client = MosaicoClient.connect("localhost", 6726)
# High-performance streaming or ML extraction
qresp = client.query(...)
# Do something else...

# Ensure resources are consistently freed.
client.close()

mosaicolabs.comm.NotifyType

Bases: StrEnum

Classification of platform-level notifications.

These identifiers distinguish the severity and intent of messages sent from the Mosaico server regarding resource states or operation failures.

Attributes:

Name Type Description
ERROR

Indicates a critical failure during resource operations, such as a writing interruption or serialization fault.

ERROR class-attribute instance-attribute

ERROR = 'error'

Critical error notification.

mosaicolabs.comm.Notified dataclass

Notified(
    sequence_name,
    notify_type,
    message,
    created_datetime,
    topic_name=None,
)

Platform diagnostic notification.

A Notified object represents a specific event or error report stored on the platform server. These are typically generated by asynchronous ingestion tasks and are critical for debugging failures when using OnErrorPolicy.Report.

Discovery

Notifications can be retrieved at both the sequence and topic level via the Mosaico client:

Example
with MosaicoClient.connect("localhost", 6726) as client:
    # Retrieve notifications for a problematic sequence
    errors = client.list_sequence_notify("mission_alpha")
    for error in errors:
        print(f"[{error.created_datetime}] {error.notify_type}: {error.message}")

Attributes:

Name Type Description
sequence_name str

The unique identifier of the associated sequence.

notify_type NotifyType

The NotifyType categorization of this event.

message str

A detailed string describing the event or error cause.

created_datetime datetime

The timestamp when the server generated the notification.

topic_name Optional[str]

Optional; the specific topic name if the notification is granular to a single data channel.