Communication Module

mosaicolabs.comm.MosaicoClient

MosaicoClient(
    *,
    host,
    port,
    timeout,
    control_client,
    connection_pool,
    executor_pool,
    sentinel,
    tls_cert,
    api_key_fingerprint,
    middlewares,
)

The gateway to the Mosaico Data Platform.

This class centralizes connection management, resource pooling, and serves as a factory for specialized handlers. It is designed to manage the lifecycle of both network connections and asynchronous executors efficiently.

Context Manager Usage

The MosaicoClient is best used as a context manager to ensure all internal pools and connections are gracefully closed.

from mosaicolabs import MosaicoClient

with MosaicoClient.connect(
    "localhost",
    6726,
    api_key="msco_s3l8gcdwuadege3pkhou0k0n2t5omfij_f9010b9e",
) as client:
    sequences = client.list_sequences()
    print(f"Available data: {sequences}")

Internal Constructor (do not call this directly): The MosaicoClient enforces a strict factory pattern for security and proper resource setup. Please use the connect() method instead to obtain an initialized client.

Sentinel Enforcement

This constructor checks for a private internal sentinel. Attempting to instantiate this class manually will result in a RuntimeError.
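
The sentinel check can be pictured with a small, library-independent sketch. The class `GuardedClient` and the module-level `_SENTINEL` are hypothetical stand-ins and not the SDK's actual implementation; only the observable behavior (a RuntimeError on direct instantiation, success through a factory method) is taken from the description above.

```python
# Hypothetical illustration of a sentinel-enforced factory; not the SDK's code.
_SENTINEL = object()  # private marker known only to this module


class GuardedClient:
    def __init__(self, host, port, *, sentinel=None):
        # Reject any caller that does not hold the private marker.
        if sentinel is not _SENTINEL:
            raise RuntimeError("Use GuardedClient.connect() instead of the constructor.")
        self.host = host
        self.port = port

    @classmethod
    def connect(cls, host, port):
        # The factory is the only code path that passes the marker.
        return cls(host, port, sentinel=_SENTINEL)


# Going through the factory succeeds.
client = GuardedClient.connect("localhost", 6726)

# Calling the constructor directly is rejected.
try:
    GuardedClient("localhost", 6726)
    direct_call_failed = False
except RuntimeError:
    direct_call_failed = True
```

Because the marker is compared by identity, no value created outside the module can pass the check.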

Parameters:

Name Type Description Default
host str

The remote server host.

required
port int

The remote server port.

required
timeout int

The connection timeout.

required
control_client FlightClient

The primary PyArrow Flight control client.

required
connection_pool Optional[_ConnectionPool]

Internal pool for data connections.

required
executor_pool Optional[_ExecutorPool]

Internal pool for async I/O.

required
sentinel object

Private object used to verify factory-based instantiation.

required
tls_cert Optional[bytes]

The TLS certificate.

required
api_key_fingerprint Optional[str]

The fingerprint of the API key to use for authentication.

required
middlewares dict[str, ClientMiddlewareFactory]

The middlewares to be used for the connection.

required

connect classmethod

connect(
    host, port, timeout=5, tls_cert_path=None, api_key=None
)

The primary entry point to the Mosaico Data Platform.

This factory method is the only recommended way to obtain a valid MosaicoClient instance. It orchestrates the necessary handshake, initializes the primary control channel, and prepares the internal resource pools.

Factory Pattern

Direct instantiation via __init__ is restricted through a sentinel pattern and will raise a RuntimeError. This ensures that every client in use has been correctly configured with a valid network connection.

Note

If using the Authorization middleware (via an API-Key), this method requires at least APIKeyPermissionEnum.Read permission.

Parameters:

Name Type Description Default
host str

The server host address (e.g., "127.0.0.1" or "mosaico.local").

required
port int

The server port (e.g., 6726).

required
timeout int

Maximum time in seconds to wait for a connection response. Defaults to 5.

5
tls_cert_path Optional[str]

Path to the TLS certificate file. Defaults to None.

None
api_key Optional[str]

The API key for authentication. Defaults to None.

None

Returns:

Name Type Description
MosaicoClient MosaicoClient

An initialized and connected client ready for operations.

Raises:

Type Description
ConnectionError

If the server is unreachable or the handshake fails.

ValueError

If tls_cert_path is invalid or the certificate cannot be read (when using TLS).

FileNotFoundError

If the tls_cert_path does not exist (if using TLS).

Example
from mosaicolabs import MosaicoClient

# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect(
    "localhost",
    6726,
    api_key="msco_vy9lqa7u4lr7w3vimhz5t8bvvc0xbmk2_9c94a86",
) as client:
    # Perform operations using the client
    pass

from_env classmethod

from_env(host, port, timeout=5)

Creates a MosaicoClient instance by resolving configuration from environment variables.

This method acts as a smart constructor that automatically discovers system settings. It currently focuses on security configurations, specifically resolving TLS settings and Auth API-Key if the required environment variables are present.

As the SDK evolves, this method will be expanded to automatically detect additional parameters from the environment.
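
The discovery step can be sketched roughly as follows. This is an illustration under stated assumptions, not the SDK's implementation: `MOSAICOD_TLS_CERT_FILE` is the variable named in this method's example, while `MOSAICO_API_KEY`, the helper `resolve_security_settings`, and the certificate path are hypothetical names chosen for the sketch.

```python
import os

# MOSAICOD_TLS_CERT_FILE is named in the documentation for from_env;
# MOSAICO_API_KEY is a hypothetical variable name used only in this sketch.
TLS_CERT_ENV = "MOSAICOD_TLS_CERT_FILE"
API_KEY_ENV = "MOSAICO_API_KEY"


def resolve_security_settings(environ=os.environ):
    """Collect connect()-style keyword arguments found in the environment."""
    settings = {}
    tls_cert_path = environ.get(TLS_CERT_ENV)
    if tls_cert_path:
        settings["tls_cert_path"] = tls_cert_path
    api_key = environ.get(API_KEY_ENV)
    if api_key:
        settings["api_key"] = api_key
    return settings


# With no variables set, nothing is overridden and the defaults apply.
no_overrides = resolve_security_settings({})

# With only the TLS variable set (placeholder path), only TLS is configured.
tls_only = resolve_security_settings({TLS_CERT_ENV: "/path/to/cert.pem"})
```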

Parameters:

Name Type Description Default
host str

The server hostname or IP address.

required
port int

The port number of the Mosaico service.

required
timeout int

Maximum time in seconds to wait for a connection response. Defaults to 5.

5

Returns:

Name Type Description
MosaicoClient MosaicoClient

A client instance pre-configured with discovered settings.

If no specific environment variables are found, it returns a MosaicoClient with default settings.

Example
# If MOSAICOD_TLS_CERT_FILE is set in the shell:
client = MosaicoClient.from_env("localhost", 6726)

sequence_handler

sequence_handler(sequence_name)

Retrieves a SequenceHandler for the given sequence.

Handlers are cached; subsequent calls for the same sequence return the existing object to avoid redundant handshakes.
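
The caching behavior can be illustrated with a minimal sketch. `FakeHandler` and `HandlerCache` are hypothetical stand-ins, since the SDK's real cache is internal; the sketch only shows the documented effect that repeated lookups for the same name return the same object, and that clearing the cache forces a fresh lookup.

```python
# Hypothetical stand-ins; the SDK's real handler and cache are internal.
class FakeHandler:
    def __init__(self, name):
        self.name = name


class HandlerCache:
    def __init__(self):
        self._handlers = {}
        self.lookups = 0  # counts simulated server round trips

    def get(self, sequence_name):
        # Return the cached handler when one exists for this name.
        if sequence_name in self._handlers:
            return self._handlers[sequence_name]
        self.lookups += 1  # a real client would contact the server here
        handler = FakeHandler(sequence_name)
        self._handlers[sequence_name] = handler
        return handler

    def clear(self):
        # Same idea as clear_sequence_handlers_cache(): drop every cached entry.
        self._handlers.clear()


cache = HandlerCache()
first = cache.get("my_sequence")
second = cache.get("my_sequence")  # served from the cache
cache.clear()
third = cache.get("my_sequence")   # rebuilt after the cache was cleared
```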

Note

If using the Authorization middleware (via an API-Key), this method requires at least APIKeyPermissionEnum.Read permission.

Parameters:

Name Type Description Default
sequence_name str

The unique identifier of the sequence.

required

Returns:

Type Description
Optional[SequenceHandler]

Optional[SequenceHandler]: A handler for managing sequence operations, or None if not found.

Example
from mosaicolabs import MosaicoClient

# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
    # Retrieve a sequence handler
    sequence_handler = client.sequence_handler("my_sequence")
    if sequence_handler:
        # Print sequence details
        print(f"Sequence: {sequence_handler.name}")
        print(f"Created: {sequence_handler.created_datetime}")
        print(f"Topic list: {sequence_handler.topics}")
        print(f"User Metadata: {sequence_handler.user_metadata}")
        print(f"Size (MB): {sequence_handler.total_size_bytes / 1024 / 1024}")

topic_handler

topic_handler(sequence_name, topic_name)

Retrieves a TopicHandler for a specific data channel.

Note

If using the Authorization middleware (via an API-Key), this method requires at least APIKeyPermissionEnum.Read permission.

Parameters:

Name Type Description Default
sequence_name str

The parent sequence name.

required
topic_name str

The specific topic name.

required

Returns:

Type Description
Optional[TopicHandler]

Optional[TopicHandler]: A handler for managing topic operations, or None if not found.

Example
from mosaicolabs import MosaicoClient

# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
    # Retrieve a topic handler
    topic_handler = client.topic_handler("my_sequence", "/front/camera/image_raw")
    if topic_handler:
        # Print topic details
        print(f"Topic: {topic_handler.sequence_name}:{topic_handler.name}")
        print(f"Ontology Tag: {topic_handler.ontology_tag}")
        print(f"Created: {topic_handler.created_datetime}")
        print(f"User Metadata: {topic_handler.user_metadata}")
        print(f"Size (MB): {topic_handler.total_size_bytes / 1024 / 1024}")

sequence_create

sequence_create(
    sequence_name,
    metadata,
    on_error=Report,
    max_batch_size_bytes=None,
    max_batch_size_records=None,
)

Creates a new sequence on the platform and returns a SequenceWriter for ingestion.

Important

The function must be called inside a with context, otherwise a RuntimeError is raised.

Note

If using the Authorization middleware (via an API-Key), this method requires at least APIKeyPermissionEnum.Write permission.

Parameters:

Name Type Description Default
sequence_name str

Unique name for the sequence.

required
metadata dict[str, Any]

User-defined metadata to attach.

required
on_error SessionLevelErrorPolicy | OnErrorPolicy

Behavior on write failure. Defaults to SessionLevelErrorPolicy.Report.

Deprecated: OnErrorPolicy is deprecated since v0.3.0; use SessionLevelErrorPolicy instead. It will be removed in v0.4.0.

Report
max_batch_size_bytes Optional[int]

Max bytes per Arrow batch.

None
max_batch_size_records Optional[int]

Max records per Arrow batch.

None

Returns:

Name Type Description
SequenceWriter SequenceWriter

An initialized writer instance.

Raises:

Type Description
RuntimeError

If the method is called outside a with context.

Exception

If any error occurs during sequence injection.

Example
from mosaicolabs import MosaicoClient, SessionLevelErrorPolicy

# Open the connection with the Mosaico Client
with MosaicoClient.connect("localhost", 6726) as client:
    # Start the Sequence Orchestrator
    with client.sequence_create(
        sequence_name="mission_log_042",
        # Custom metadata for this data sequence.
        metadata={
            "driver": {
                "driver_id": "drv_sim_017",
                "role": "validation",
                "experience_level": "senior",
            },
            "location": {
                "city": "Milan",
                "country": "IT",
                "facility": "Downtown",
                "gps": {
                    "lat": 45.46481,
                    "lon": 9.19201,
                },
            },
        },
        on_error=SessionLevelErrorPolicy.Delete,
    ) as seq_writer:
        # Start creating topics and pushing data...
        pass

sequence_delete

sequence_delete(sequence_name)

Permanently deletes a sequence and all its associated data from the server.

This operation is destructive and triggers a cascading deletion of all underlying resources, including all topics and data chunks belonging to the sequence. Once executed, all storage occupied by the sequence is freed.

Note

If using the Authorization middleware (via an API-Key), this method requires at least APIKeyPermissionEnum.Delete permission.

Parameters:

Name Type Description Default
sequence_name str

The unique name of the sequence to remove.

required

Raises:

Type Description
Exception

If any error occurs during sequence deletion.

session_delete

session_delete(session_uuid)

Permanently deletes a session and all its associated data from the server.

This operation is destructive and triggers a cascading deletion of all underlying resources, including all topics and data chunks stored in the session. Once executed, all storage occupied by the session is freed.

Note

If using the Authorization middleware (via an API-Key), this method requires at least APIKeyPermissionEnum.Delete permission.

Parameters:

Name Type Description Default
session_uuid str

The unique identifier of the session to remove.

required

Raises:

Type Description
Exception

If any error occurs during session deletion.

list_sequences

list_sequences()

Retrieves a list of all sequence names available on the server.

Returns:

Type Description
List[str]

List[str]: The list of sequence identifiers.

Example
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    sequences = client.list_sequences()
    print(f"Available sequences: {sequences}")

list_sequence_notifications

list_sequence_notifications(sequence_name)

Retrieves a list of all notifications available on the server for a specific sequence.

Note

If using the Authorization middleware (via an API-Key), this method requires at least APIKeyPermissionEnum.Read permission.

Parameters:

Name Type Description Default
sequence_name str

The name of the sequence to list notifications for.

required

Returns:

Type Description
List[Notification]

List[Notification]: The list of sequence notifications.

Raises:

Type Description
Exception

If any error occurs during sequence notification listing.

Example
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    sequence_notifications = client.list_sequence_notifications("my_sequence")
    for notification in sequence_notifications:
        print(f"Notification Type: {notification.type}")
        print(f"Notification Message: {notification.message}")
        print(f"Notification Created: {notification.created_datetime}")

clear_sequence_notifications

clear_sequence_notifications(sequence_name)

Clears the notifications for a specific sequence from the server.

Note

If using the Authorization middleware (via an API-Key), this method requires at least APIKeyPermissionEnum.Delete permission.

Parameters:

Name Type Description Default
sequence_name str

The name of the sequence.

required

Raises:

Type Description
Exception

If any error occurs during sequence notification clearing.

list_topic_notifications

list_topic_notifications(sequence_name, topic_name)

Retrieves a list of all notifications available on the server for a specific topic.

Note

If using the Authorization middleware (via an API-Key), this method requires at least APIKeyPermissionEnum.Read permission.

Parameters:

Name Type Description Default
sequence_name str

The name of the sequence to list notifications for.

required
topic_name str

The name of the topic to list notifications for.

required

Returns:

Type Description
List[Notification]

List[Notification]: The list of topic notifications.

Raises:

Type Description
Exception

If any error occurs during topic notification listing.

Example
from mosaicolabs import MosaicoClient

with MosaicoClient.connect("localhost", 6726) as client:
    topic_notifications = client.list_topic_notifications("my_sequence", "my_topic")
    for notification in topic_notifications:
        print(f"Notification Type: {notification.type}")
        print(f"Notification Message: {notification.message}")
        print(f"Notification Created: {notification.created_datetime}")

clear_topic_notifications

clear_topic_notifications(sequence_name, topic_name)

Clears the notifications for a specific topic from the server.

Note

If using the Authorization middleware (via an API-Key), this method requires at least APIKeyPermissionEnum.Delete permission.

Parameters:

Name Type Description Default
sequence_name str

The name of the sequence.

required
topic_name str

The name of the topic.

required

Raises:

Type Description
Exception

If any error occurs during topic notification clearing.

query

query(*queries, query=None)

Executes one or more queries against the Mosaico database.

Multiple provided queries are joined using a logical AND condition.
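
To make the AND semantics concrete, here is a small local analogy. It does not use the Mosaico query builders: the records, the two predicate functions, and `run_query` are all hypothetical, and the real filtering is performed by the server.

```python
# Hypothetical in-memory records; the real data lives on the server.
records = [
    {"name": "test_drive_01", "visibility": 30},
    {"name": "test_drive_02", "visibility": 80},
    {"name": "calibration_01", "visibility": 20},
]


def name_matches(record):
    return "test_drive" in record["name"]


def low_visibility(record):
    return record["visibility"] < 50


def run_query(*filters):
    # A record is returned only if every filter accepts it (logical AND).
    return [r["name"] for r in records if all(f(r) for f in filters)]


matches = run_query(name_matches, low_visibility)
```

Only `test_drive_01` satisfies both conditions; the other two records each fail one of them.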

Note

If using the Authorization middleware (via an API-Key), this method requires at least APIKeyPermissionEnum.Read permission.

Parameters:

Name Type Description Default
*queries QueryableProtocol

Variable arguments of query builder objects (e.g., QuerySequence).

()
query Optional[Query]

An alternative pre-constructed Query object.

None

Returns:

Type Description
Optional[QueryResponse]

Optional[QueryResponse]: The query results, or None if an error occurs.

Raises:

Type Description
ValueError

If conflicting query types are passed or no queries are provided.

Exception

If any error occurs during query execution.

Query with variadic arguments
from mosaicolabs import QueryOntologyCatalog, QuerySequence, Query, IMU, MosaicoClient

# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
    # Perform the server side query
    results = client.query(
        # Append a filter for sequence metadata
        QuerySequence()
        .with_user_metadata("environment.visibility", lt=50)
        .with_name_match("test_drive"),
        # Append a filter with deep time-series data discovery and measurement time windowing
        QueryOntologyCatalog()
        .with_expression(IMU.Q.acceleration.x.gt(5.0))
        .with_expression(IMU.Q.timestamp_ns.gt(1700134567))
    )
    # Inspect the results
    if results is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in results:
            print(f"Sequence: {item.sequence.name}")

Query with Query object
from mosaicolabs import QueryOntologyCatalog, QuerySequence, Query, IMU, MosaicoClient

# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
    # Build a filter with name pattern and metadata-related expression
    query = Query(
        # Append a filter for sequence metadata
        QuerySequence()
        .with_user_metadata("environment.visibility", lt=50)
        .with_name_match("test_drive"),
        # Append a filter with deep time-series data discovery and measurement time windowing
        QueryOntologyCatalog()
        .with_expression(IMU.Q.acceleration.x.gt(5.0))
        .with_expression(IMU.Q.timestamp_ns.gt(1700134567))
    )
    # Perform the server side query
    results = client.query(query=query)
    # Inspect the results
    if results is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in results:
            print(f"Sequence: {item.sequence.name}")
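
The field name `timestamp_ns` used in the query examples suggests nanoseconds since the Unix epoch. Assuming that is the unit, a time-window bound can be derived from a datetime with a helper like the one below. `to_epoch_ns` is a hypothetical name and is not part of the SDK.

```python
from datetime import datetime, timezone


def to_epoch_ns(moment):
    """Convert a timezone-aware datetime to integer nanoseconds since the epoch."""
    if moment.tzinfo is None:
        raise ValueError("Pass a timezone-aware datetime to avoid local-time ambiguity.")
    # Handle whole seconds and microseconds separately to stay in integer arithmetic.
    whole_seconds = int(moment.replace(microsecond=0).timestamp())
    return whole_seconds * 1_000_000_000 + moment.microsecond * 1_000


window_start_ns = to_epoch_ns(datetime(2023, 11, 16, 11, 36, 7, tzinfo=timezone.utc))
```

The resulting integer could then be passed to an expression such as `IMU.Q.timestamp_ns.gt(window_start_ns)`.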

version

version()

Get the version of the Mosaico server.

Note

If using the Authorization middleware (via an API-Key), this method requires at least APIKeyPermissionEnum.Read permission.

Returns:

Name Type Description
str str

The version of the Mosaico server.

Raises:

Type Description
Exception

If any error occurs during version retrieval.

clear_sequence_handlers_cache

clear_sequence_handlers_cache()

Clears the internal cache of SequenceHandler objects.

clear_topic_handlers_cache

clear_topic_handlers_cache()

Clears the internal cache of TopicHandler objects.

api_key_create

api_key_create(permission, description, expires_at_ns=None)

Creates a new API key with the specified permissions.

Note

Requires the client to have APIKeyPermissionEnum.Manage permission. You can also optionally set an expiration time and a description for the key.

Parameters:

Name Type Description Default
permission APIKeyPermissionEnum

Permission for the key.

required
description str

Description for the key.

required
expires_at_ns Optional[int]

Optional expiration timestamp in nanoseconds.

None

Returns:

Name Type Description
str Optional[str]

The generated API key token or None.

Raises:

Type Description
Exception

If any error occurs during API key creation.

Example
from mosaicolabs import MosaicoClient, APIKeyPermissionEnum

# Open the connection with the Mosaico Client
with MosaicoClient.connect("localhost", 6726, api_key="<API_KEY_MANAGE>") as client:
    # Create a new API key with read and write permissions
    api_key = client.api_key_create(
        permission=APIKeyPermissionEnum.Write,
        description="API key for data ingestion",
    )
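
A value for expires_at_ns can be computed with the standard library. The sketch below assumes the timestamp is expressed in nanoseconds since the Unix epoch (the parameter description states nanoseconds; the epoch reference is an assumption). `expiry_in_days` is a hypothetical helper, not an SDK function.

```python
import time

NS_PER_DAY = 24 * 60 * 60 * 1_000_000_000


def expiry_in_days(days, now_ns=None):
    """Return an absolute expiry timestamp in nanoseconds, `days` from now."""
    if now_ns is None:
        now_ns = time.time_ns()  # current wall-clock time in nanoseconds
    return now_ns + days * NS_PER_DAY


# A fixed "now" keeps this example reproducible; omit now_ns in real use.
expires_at_ns = expiry_in_days(30, now_ns=1_700_000_000_000_000_000)
```

The result could then be passed as `expires_at_ns=expires_at_ns` to api_key_create.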

api_key_status

api_key_status(api_key_fingerprint=None)

Retrieves the status and metadata of an API key.

Note

Requires the client to have APIKeyPermissionEnum.Manage permission.

Parameters:

Name Type Description Default
api_key_fingerprint Optional[str]

The fingerprint of the API key to query. If not provided, the fingerprint of the current API key will be used.

None

Returns:

Name Type Description
APIKeyStatus Optional[APIKeyStatus]

An object containing the API key's status information, or None if the query fails.

Raises:

Type Description
Exception

If any error occurs during API key status retrieval.

api_key_revoke

api_key_revoke(api_key_fingerprint)

Revokes an API key by its fingerprint.

Note

Requires the client to have APIKeyPermissionEnum.Manage permission.

Parameters:

Name Type Description Default
api_key_fingerprint str

The fingerprint of the API key to revoke.

required

Returns:

Type Description
None

None.

Raises:

Type Description
Exception

If any error occurs during API key revocation.

close

close()

Gracefully shuts down the Mosaico client and releases all underlying resources.

This method cleanly terminates the client's lifecycle through the following actions:

* Closing Handlers: Invalidates and closes all cached SequenceHandlers and TopicHandlers to prevent stale data access.
* Network Cleanup: Terminates the connection pool to the mosaicod backend.
* Thread Termination: Shuts down the internal thread executor pool responsible for asynchronous data fetching and background streaming.

Note

If using the client as a context manager (via with MosaicoClient.connect(...)), this method is invoked automatically on exit. Explicit calls are required only for manual lifecycle management.

Example
from mosaicolabs import MosaicoClient

# Manual connection management
client = MosaicoClient.connect("localhost", 6726)
# High-performance streaming or ML extraction
qresp = client.query(...)
# Do something else...

# Ensure resources are consistently freed.
client.close()
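
When the lifecycle is managed manually, wrapping the work in try/finally guarantees that close() runs even if an operation raises. The sketch below uses a hypothetical `StubClient` in place of MosaicoClient so that it runs without a server; only the try/finally shape is the point.

```python
# Hypothetical stand-in for MosaicoClient; it only tracks whether close() ran.
class StubClient:
    def __init__(self):
        self.closed = False

    def query(self):
        raise ValueError("simulated failure while processing")

    def close(self):
        self.closed = True


def run_job(client):
    try:
        return client.query()
    finally:
        # Runs on success and on failure, so resources are always released.
        client.close()


client = StubClient()
try:
    run_job(client)
    outcome = "ok"
except ValueError:
    outcome = "failed"
```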

mosaicolabs.comm.NotificationType

Bases: Enum

Classification of platform-level notifications.

These identifiers distinguish the severity and intent of messages sent from the Mosaico server regarding resource states or operation failures.

Attributes:

Name Type Description
ERROR

Indicates a critical failure during resource operations, such as a writing interruption or serialization fault.

ERROR class-attribute instance-attribute

ERROR = 'error'

Critical error notification.

mosaicolabs.comm.Notification dataclass

Notification(
    sequence_name,
    type,
    message,
    created_datetime,
    topic_name=None,
)

Platform diagnostic notification.

A Notification object represents a specific event or error report stored on the platform server. These are typically generated by asynchronous ingestion tasks and are critical for debugging failures when using SessionLevelErrorPolicy.Report.

Discovery

Notifications can be retrieved at both the sequence and topic level via the Mosaico client:

Example
with MosaicoClient.connect("localhost", 6726) as client:
    # Retrieve notifications for a problematic sequence
    errors = client.list_sequence_notifications("mission_alpha")
    for error in errors:
        print(f"[{error.created_datetime}] {error.type}: {error.message}")

Attributes:

Name Type Description
sequence_name str

The unique identifier of the associated sequence.

type NotificationType

The NotificationType categorization of this event.

message str

A detailed string describing the event or error cause.

created_datetime datetime

The timestamp when the server generated the notification.

topic_name Optional[str]

Optional; the specific topic name if the notification is granular to a single data channel.
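
Since topic_name is optional, a common follow-up is to separate sequence-level notifications from topic-level ones. The sketch below groups messages by topic. `NotificationStub` is a hypothetical mirror of the documented attributes (with type simplified to a plain string) so the example runs without the SDK, and the sample data is invented for illustration.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class NotificationStub:
    """Hypothetical mirror of the documented Notification attributes."""
    sequence_name: str
    type: str
    message: str
    created_datetime: datetime
    topic_name: Optional[str] = None


def group_by_topic(notifications):
    # Sequence-level notifications end up under the None key.
    grouped = defaultdict(list)
    for notification in notifications:
        grouped[notification.topic_name].append(notification.message)
    return dict(grouped)


created = datetime(2024, 1, 1, tzinfo=timezone.utc)
sample = [
    NotificationStub("mission_alpha", "error", "sequence write interrupted", created),
    NotificationStub("mission_alpha", "error", "serialization fault", created, "/imu"),
    NotificationStub("mission_alpha", "error", "write interrupted", created, "/imu"),
]
grouped = group_by_topic(sample)
```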

mosaicolabs.comm.middlewares

MosaicoAuthMiddleware

MosaicoAuthMiddleware(api_key)

Bases: ClientMiddleware

Middleware adding the API token to every flight request.

Initialize the middleware

Parameters:

Name Type Description Default
api_key str

The API key to use for authentication

required

sending_headers

sending_headers()

Called before sending headers to the server

Returns:

Name Type Description
dict Dict[str, List[str] | List[bytes]]

Headers to be sent to the server

received_headers

received_headers(headers)

Called after receiving headers from the server

Parameters:

Name Type Description Default
headers Dict[str, List[str] | List[bytes]]

Headers received from the server

required

MosaicoAuthMiddlewareFactory

MosaicoAuthMiddlewareFactory(api_key)

Bases: ClientMiddlewareFactory

Factory to create instances of MosaicoAuthMiddleware.

Initialize the factory

Parameters:

Name Type Description Default
api_key str

The API key to use for authentication

required

api_key_fingerprint property

api_key_fingerprint

The fingerprint of the API key

Returns:

Name Type Description
str str

The fingerprint of the API key

start_call

start_call(info)

Called at every flight client operation (GetFlightInfo, DoAction, etc.)

Parameters:

Name Type Description Default
info CallInfo

Information about the flight call

required

Returns:

Name Type Description
MosaicoAuthMiddleware MosaicoAuthMiddleware

The middleware to be used for the flight call
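
The factory and middleware cooperate per call: the factory's start_call produces a middleware object, and that object's sending_headers supplies the headers for the request. The duck-typed sketch below mirrors this flow without depending on PyArrow. The classes `StubAuthMiddleware` and `StubAuthMiddlewareFactory` are hypothetical, and the header name `x-api-key` is a placeholder, since the header actually used by MosaicoAuthMiddleware is not documented here.

```python
# Hypothetical, dependency-free mirror of the middleware/factory pairing.
class StubAuthMiddleware:
    def __init__(self, api_key):
        self._api_key = api_key

    def sending_headers(self):
        # "x-api-key" is a placeholder header name chosen for this sketch.
        return {"x-api-key": [self._api_key]}

    def received_headers(self, headers):
        # Nothing to do with the response headers in this sketch.
        pass


class StubAuthMiddlewareFactory:
    def __init__(self, api_key):
        self._api_key = api_key

    def start_call(self, info):
        # A middleware instance is handed out for each client operation.
        return StubAuthMiddleware(self._api_key)


factory = StubAuthMiddlewareFactory("<API_KEY>")
headers_per_call = [
    factory.start_call(info=operation).sending_headers()
    for operation in ("GetFlightInfo", "DoAction")
]
```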