Communication Module
mosaicolabs.comm.MosaicoClient ¶
MosaicoClient(
    *,
    host,
    port,
    timeout,
    control_client,
    connection_pool,
    executor_pool,
    sentinel,
    tls_cert,
    api_key_fingerprint,
    middlewares,
)
The gateway to the Mosaico Data Platform.
This class centralizes connection management and resource pooling, and serves as a factory for specialized handlers. It is designed to manage the lifecycle of both network connections and asynchronous executors efficiently.
Context Manager Usage
The MosaicoClient is best used as a context manager to ensure all internal pools and connections are gracefully closed.
Internal Constructor (do not call this directly): The MosaicoClient enforces a strict factory pattern for security and proper resource setup. Please use the connect() method instead to obtain an initialized client.
Sentinel Enforcement
This constructor checks for a private internal sentinel. Attempting to instantiate this class manually will result in a RuntimeError.
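The sentinel check described above can be sketched in plain Python. This is a minimal illustration of the general pattern, not the SDK's source: `DemoClient` and `_SENTINEL` are hypothetical names, and the real constructor takes more arguments.

```python
# Hypothetical stand-in for a sentinel-guarded class; not the mosaicolabs source.
_SENTINEL = object()  # module-private marker that only the factory passes in


class DemoClient:
    def __init__(self, *, host: str, port: int, sentinel: object = None) -> None:
        # Reject any caller that does not hold the private sentinel.
        if sentinel is not _SENTINEL:
            raise RuntimeError("Use DemoClient.connect() instead of the constructor.")
        self.host = host
        self.port = port

    @classmethod
    def connect(cls, host: str, port: int) -> "DemoClient":
        # A real factory would set up connections before building the instance.
        return cls(host=host, port=port, sentinel=_SENTINEL)


# The factory succeeds.
client = DemoClient.connect("localhost", 6726)

# Direct instantiation is rejected.
try:
    DemoClient(host="localhost", port=6726)
except RuntimeError as exc:
    direct_error = str(exc)
```

Because the sentinel is compared by identity, a caller cannot satisfy the check by passing an equal-looking value.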
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | The remote server host. | required |
| port | int | The remote server port. | required |
| timeout | int | The connection timeout. | required |
| control_client | FlightClient | The primary PyArrow Flight control client. | required |
| connection_pool | Optional[_ConnectionPool] | Internal pool for data connections. | required |
| executor_pool | Optional[_ExecutorPool] | Internal pool for async I/O. | required |
| sentinel | object | Private object used to verify factory-based instantiation. | required |
| tls_cert | Optional[bytes] | The TLS certificate. | required |
| api_key_fingerprint | Optional[str] | The fingerprint of the API key to use for authentication. | required |
| middlewares | dict[str, ClientMiddlewareFactory] | The middlewares to be used for the connection. | required |
connect classmethod ¶
The primary entry point to the Mosaico Data Platform.
This factory method is the only recommended way to obtain a valid
MosaicoClient instance. It orchestrates the
necessary handshake, initializes the primary control channel, and prepares
the internal resource pools.
Factory Pattern
Direct instantiation via __init__ is restricted through a sentinel
pattern and will raise a RuntimeError. This
ensures that every client in use has been correctly configured with a
valid network connection.
Note
If using the Authorization middleware (via an API-Key), this method requires at least the APIKeyPermissionEnum.Read permission.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | The server host address (e.g., "127.0.0.1" or "mosaico.local"). | required |
| port | int | The server port (e.g., 6726). | required |
| timeout | int | Maximum time in seconds to wait for a connection response. Defaults to 5. | 5 |
| tls_cert_path | Optional[str] | Path to the TLS certificate file. Defaults to None. | None |
| api_key | Optional[str] | The API key for authentication. Defaults to None. | None |
Returns:
| Name | Type | Description |
|---|---|---|
| MosaicoClient | MosaicoClient | An initialized and connected client ready for operations. |
Raises:
| Type | Description |
|---|---|
| ConnectionError | If the server is unreachable or the handshake fails. |
| ValueError | If tls_cert_path is invalid or the certificate cannot be read (when using TLS). |
| FileNotFoundError | If tls_cert_path does not exist (when using TLS). |
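The TLS-related errors listed for connect can be pictured with a small standalone loader. This is a sketch under assumptions, not the SDK's code: `load_tls_cert` is a hypothetical helper, and the exact conditions under which the SDK raises ValueError are not spelled out in this section.

```python
import tempfile
from pathlib import Path
from typing import Optional


def load_tls_cert(tls_cert_path: Optional[str]) -> Optional[bytes]:
    """Read certificate bytes, mapping failures to the documented error types."""
    if tls_cert_path is None:
        return None  # TLS was not requested
    path = Path(tls_cert_path)
    if not path.exists():
        raise FileNotFoundError(f"TLS certificate not found: {path}")
    try:
        data = path.read_bytes()
    except OSError as exc:  # e.g. the path is a directory or is unreadable
        raise ValueError(f"Unable to read TLS certificate: {path}") from exc
    if not data:
        raise ValueError(f"TLS certificate is empty: {path}")
    return data


with tempfile.TemporaryDirectory() as tmp:
    cert_file = Path(tmp) / "ca.pem"
    cert_file.write_bytes(b"-----BEGIN CERTIFICATE-----\n")
    cert_bytes = load_tls_cert(str(cert_file))

    missing_reported = False
    try:
        load_tls_cert(str(Path(tmp) / "missing.pem"))
    except FileNotFoundError:
        missing_reported = True

    invalid_reported = False
    try:
        load_tls_cert(tmp)  # a directory exists but cannot be read as a file
    except ValueError:
        invalid_reported = True
```

Checking for existence first keeps the two error types distinct, so a caller can tell a wrong path from an unreadable file.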
from_env classmethod ¶
Creates a MosaicoClient instance by resolving configuration from environment variables.
This method acts as a smart constructor that automatically discovers system settings. It currently focuses on security configuration, specifically resolving TLS settings and the authentication API key when the required environment variables are present.
As the SDK evolves, this method will be expanded to automatically detect additional parameters from the environment.
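Resolving optional settings from the environment usually reduces to a lookup with a fallback. The sketch below shows that idea only. The variable names `DEMO_TLS_CERT_PATH` and `DEMO_API_KEY` and the helper `resolve_security_settings` are made up for illustration; this section does not list the variables the SDK actually reads.

```python
import os
from typing import Mapping, Optional, TypedDict


class SecuritySettings(TypedDict):
    tls_cert_path: Optional[str]
    api_key: Optional[str]


def resolve_security_settings(env: Mapping[str, str] = os.environ) -> SecuritySettings:
    # DEMO_TLS_CERT_PATH and DEMO_API_KEY are hypothetical names for this sketch.
    # Unset or empty variables resolve to None, i.e. the default behavior.
    return {
        "tls_cert_path": env.get("DEMO_TLS_CERT_PATH") or None,
        "api_key": env.get("DEMO_API_KEY") or None,
    }


configured = resolve_security_settings({"DEMO_API_KEY": "secret"})
defaults = resolve_security_settings({})
```

Passing the environment as a mapping keeps the function easy to test without touching the real process environment.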
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | The server hostname or IP address. | required |
| port | int | The port number of the Mosaico service. | required |
| timeout | int | Maximum time in seconds to wait for a connection response. Defaults to 5. | 5 |
Returns:
| Name | Type | Description |
|---|---|---|
| MosaicoClient | MosaicoClient | A client instance pre-configured with discovered settings. If no specific environment variables are found, it returns a client with default settings. |
sequence_handler ¶
Retrieves a SequenceHandler for the given sequence.
Handlers are cached; subsequent calls for the same sequence return the existing object to avoid redundant handshakes.
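The caching behavior described above can be sketched as a dictionary keyed by name. `HandlerCache` and `fake_loader` are hypothetical stand-ins, and the choice not to cache missing resources is an assumption of this sketch rather than documented SDK behavior.

```python
from typing import Callable, Dict, List, Optional


class HandlerCache:
    """Caches one handler per name; the loader runs only on a cache miss."""

    def __init__(self, loader: Callable[[str], Optional[object]]) -> None:
        self._loader = loader
        self._handlers: Dict[str, object] = {}

    def get(self, name: str) -> Optional[object]:
        handler = self._handlers.get(name)
        if handler is None:
            handler = self._loader(name)  # stands in for the server handshake
            if handler is not None:
                self._handlers[name] = handler  # missing resources are not cached
        return handler

    def clear(self) -> None:
        self._handlers.clear()


calls: List[str] = []


def fake_loader(name: str) -> Optional[object]:
    calls.append(name)  # record every simulated handshake
    return {"name": name} if name == "my_sequence" else None


cache = HandlerCache(fake_loader)
first = cache.get("my_sequence")
second = cache.get("my_sequence")  # served from the cache, no second handshake
missing = cache.get("unknown")
```

Clearing such a cache forces the next lookup to perform the handshake again, which is the role the clear_sequence_handlers_cache method plays for the client.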
Note
If using the Authorization middleware (via an API-Key), this method requires at least the APIKeyPermissionEnum.Read permission.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sequence_name | str | The unique identifier of the sequence. | required |
Returns:
| Type | Description |
|---|---|
| Optional[SequenceHandler] | A handler for managing sequence operations, or None if not found. |
Example
from mosaicolabs import MosaicoClient
# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
    # Retrieve a sequence handler
    sequence_handler = client.sequence_handler("my_sequence")
    if sequence_handler:
        # Print sequence details
        print(f"Sequence: {sequence_handler.name}")
        print(f"Created: {sequence_handler.created_datetime}")
        print(f"Topic list: {sequence_handler.topics}")
        print(f"User Metadata: {sequence_handler.user_metadata}")
        print(f"Size (MB): {sequence_handler.total_size_bytes / 1024 / 1024}")
topic_handler ¶
Retrieves a TopicHandler for a specific data channel.
Note
If using the Authorization middleware (via an API-Key), this method requires at least the APIKeyPermissionEnum.Read permission.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sequence_name | str | The parent sequence name. | required |
| topic_name | str | The specific topic name. | required |
Returns:
| Type | Description |
|---|---|
| Optional[TopicHandler] | A handler for managing topic operations, or None if not found. |
Example
from mosaicolabs import MosaicoClient
# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
    # Retrieve a topic handler
    topic_handler = client.topic_handler("my_sequence", "/front/camera/image_raw")
    if topic_handler:
        # Print topic details
        print(f"Topic: {topic_handler.sequence_name}:{topic_handler.name}")
        print(f"Ontology Tag: {topic_handler.ontology_tag}")
        print(f"Created: {topic_handler.created_datetime}")
        print(f"User Metadata: {topic_handler.user_metadata}")
        print(f"Size (MB): {topic_handler.total_size_bytes / 1024 / 1024}")
sequence_create ¶
sequence_create(
    sequence_name,
    metadata,
    on_error=Report,
    max_batch_size_bytes=None,
    max_batch_size_records=None,
)
Creates a new sequence on the platform and returns a SequenceWriter for ingestion.
Important
The function must be called inside a with context, otherwise a RuntimeError is raised.
Note
If using the Authorization middleware (via an API-Key), this method requires at least the APIKeyPermissionEnum.Write permission.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sequence_name | str | Unique name for the sequence. | required |
| metadata | dict[str, Any] | User-defined metadata to attach. | required |
| on_error | SessionLevelErrorPolicy \| OnErrorPolicy | Behavior on write failure. Defaults to Report. Deprecated: | Report |
| max_batch_size_bytes | Optional[int] | Max bytes per Arrow batch. | None |
| max_batch_size_records | Optional[int] | Max records per Arrow batch. | None |
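The two batch limits above can be pictured with a generic batching helper. This is a conceptual sketch, not the SDK's writer: `batch_records` is a hypothetical function, and it assumes a batch is closed as soon as adding another record would exceed either limit.

```python
from typing import Iterable, Iterator, List, Optional


def batch_records(
    records: Iterable[bytes],
    max_batch_size_bytes: Optional[int] = None,
    max_batch_size_records: Optional[int] = None,
) -> Iterator[List[bytes]]:
    """Yield batches, closing one before it would exceed either limit."""
    batch: List[bytes] = []
    batch_bytes = 0
    for record in records:
        too_many = (
            max_batch_size_records is not None
            and len(batch) >= max_batch_size_records
        )
        too_big = (
            max_batch_size_bytes is not None
            and bool(batch)  # an oversized single record still gets its own batch
            and batch_bytes + len(record) > max_batch_size_bytes
        )
        if too_many or too_big:
            yield batch
            batch, batch_bytes = [], 0
        batch.append(record)
        batch_bytes += len(record)
    if batch:
        yield batch  # flush the final partial batch


records = [b"aaaa", b"bbbb", b"cc", b"dddddd"]
by_count = list(batch_records(records, max_batch_size_records=3))
by_bytes = list(batch_records(records, max_batch_size_bytes=8))
```

With None for both limits, everything ends up in a single batch, which mirrors how an unset limit is usually interpreted.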
Returns:
| Name | Type | Description |
|---|---|---|
| SequenceWriter | SequenceWriter | An initialized writer instance. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If the method is called outside a with context. |
| Exception | If any error occurs during sequence injection. |
Example
from mosaicolabs import MosaicoClient, SessionLevelErrorPolicy
# Open the connection with the Mosaico Client
with MosaicoClient.connect("localhost", 6726) as client:
    # Start the Sequence Orchestrator
    with client.sequence_create(
        sequence_name="mission_log_042",
        # Custom metadata for this data sequence.
        metadata={
            "driver": {
                "driver_id": "drv_sim_017",
                "role": "validation",
                "experience_level": "senior",
            },
            "location": {
                "city": "Milan",
                "country": "IT",
                "facility": "Downtown",
                "gps": {
                    "lat": 45.46481,
                    "lon": 9.19201,
                },
            },
        },
        on_error=SessionLevelErrorPolicy.Delete,
    ) as seq_writer:
        # Start creating topics and pushing data...
        pass
sequence_delete ¶
Permanently deletes a sequence and all its associated data from the server.
This operation is destructive and triggers a cascading deletion of all underlying resources, including all topics and data chunks belonging to the sequence. Once executed, all storage occupied by the sequence is freed.
Note
If using the Authorization middleware (via an API-Key), this method requires at least the APIKeyPermissionEnum.Delete permission.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sequence_name | str | The unique name of the sequence to remove. | required |
Raises:
| Type | Description |
|---|---|
| Exception | If any error occurs during sequence deletion. |
session_delete ¶
Permanently deletes a session and all its associated data from the server.
This operation is destructive and triggers a cascading deletion of all underlying resources, including all topics and data chunks stored in the session. Once executed, all storage occupied by the session is freed.
Note
If using the Authorization middleware (via an API-Key), this method requires at least the APIKeyPermissionEnum.Delete permission.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session_uuid | str | The unique identifier of the session to remove. | required |
Raises:
| Type | Description |
|---|---|
| Exception | If any error occurs during session deletion. |
list_sequences ¶
Retrieves a list of all sequence names available on the server.
Returns:
| Type | Description |
|---|---|
| List[str] | The list of sequence identifiers. |
list_sequence_notifications ¶
Retrieves a list of all notifications available on the server for a specific sequence.
Note
If using the Authorization middleware (via an API-Key), this method requires at least the APIKeyPermissionEnum.Read permission.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sequence_name | str | The name of the sequence to list notifications for. | required |
Returns:
| Type | Description |
|---|---|
| List[Notification] | The list of sequence notifications. |
Raises:
| Type | Description |
|---|---|
| Exception | If any error occurs during sequence notification listing. |
Example
from mosaicolabs import MosaicoClient
with MosaicoClient.connect("localhost", 6726) as client:
    sequence_notifications = client.list_sequence_notifications("my_sequence")
    for notification in sequence_notifications:
        print(f"Notification Type: {notification.type}")
        print(f"Notification Message: {notification.message}")
        print(f"Notification Created: {notification.created_datetime}")
clear_sequence_notifications ¶
Clears the notifications for a specific sequence from the server.
Note
If using the Authorization middleware (via an API-Key), this method requires at least the APIKeyPermissionEnum.Delete permission.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sequence_name | str | The name of the sequence. | required |
Raises:
| Type | Description |
|---|---|
| Exception | If any error occurs during sequence notification clearing. |
list_topic_notifications ¶
Retrieves a list of all notifications available on the server for a specific topic.
Note
If using the Authorization middleware (via an API-Key), this method requires at least the APIKeyPermissionEnum.Read permission.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sequence_name | str | The name of the sequence to list notifications for. | required |
| topic_name | str | The name of the topic to list notifications for. | required |
Returns:
| Type | Description |
|---|---|
| List[Notification] | The list of topic notifications. |
Raises:
| Type | Description |
|---|---|
| Exception | If any error occurs during topic notification listing. |
Example
from mosaicolabs import MosaicoClient
with MosaicoClient.connect("localhost", 6726) as client:
    topic_notifications = client.list_topic_notifications("my_sequence", "my_topic")
    for notification in topic_notifications:
        print(f"Notification Type: {notification.type}")
        print(f"Notification Message: {notification.message}")
        print(f"Notification Created: {notification.created_datetime}")
clear_topic_notifications ¶
Clears the notifications for a specific topic from the server.
Note
If using the Authorization middleware (via an API-Key), this method requires at least the APIKeyPermissionEnum.Delete permission.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sequence_name | str | The name of the sequence. | required |
| topic_name | str | The name of the topic. | required |
Raises:
| Type | Description |
|---|---|
| Exception | If any error occurs during topic notification clearing. |
query ¶
Executes one or more queries against the Mosaico database.
Multiple provided queries are joined using a logical AND condition.
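The AND semantics can be illustrated without the SDK by combining plain predicates. This is a conceptual sketch only: `all_of`, the record fields, and the two predicates are hypothetical, and the server evaluates real queries very differently.

```python
from typing import Callable, Dict, List

Record = Dict[str, float]
Predicate = Callable[[Record], bool]


def all_of(*predicates: Predicate) -> Predicate:
    """Combine predicates so a record matches only if every one accepts it."""
    if not predicates:
        raise ValueError("at least one predicate is required")
    return lambda record: all(p(record) for p in predicates)


def low_visibility(record: Record) -> bool:
    return record["visibility"] < 50


def hard_acceleration(record: Record) -> bool:
    return record["acceleration_x"] > 5.0


records: List[Record] = [
    {"visibility": 30.0, "acceleration_x": 6.2},  # passes both filters
    {"visibility": 30.0, "acceleration_x": 1.0},  # fails the acceleration filter
    {"visibility": 80.0, "acceleration_x": 7.5},  # fails the visibility filter
]

combined = all_of(low_visibility, hard_acceleration)
matches = [r for r in records if combined(r)]
```

Adding a filter under AND can only narrow the result set, so each extra query builder passed to query makes the match stricter.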
Note
If using the Authorization middleware (via an API-Key), this method requires at least the APIKeyPermissionEnum.Read permission.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| *queries | QueryableProtocol | Variable arguments of query builder objects (e.g., QuerySequence, QueryOntologyCatalog). | () |
| query | Optional[Query] | An alternative pre-constructed Query object. | None |
Returns:
| Type | Description |
|---|---|
| Optional[QueryResponse] | The query results, or None if an error occurs. |
Raises:
| Type | Description |
|---|---|
| ValueError | If conflicting query types are passed or no queries are provided. |
| Exception | If any error occurs during query execution. |
Query with variadic arguments
from mosaicolabs import QueryOntologyCatalog, QuerySequence, Query, IMU, MosaicoClient
# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
    # Perform the server side query
    results = client.query(
        # Append a filter for sequence metadata
        QuerySequence()
        .with_user_metadata("environment.visibility", lt=50)
        .with_name_match("test_drive"),
        # Append a filter with deep time-series data discovery and measurement time windowing
        QueryOntologyCatalog()
        .with_expression(IMU.Q.acceleration.x.gt(5.0))
        .with_expression(IMU.Q.timestamp_ns.gt(1700134567)),
    )
    # Inspect the results
    if results is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in results:
            print(f"Sequence: {item.sequence.name}")
Query with Query object
from mosaicolabs import QueryOntologyCatalog, QuerySequence, Query, IMU, MosaicoClient
# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
    # Build a filter with name pattern and metadata-related expression
    query = Query(
        # Append a filter for sequence metadata
        QuerySequence()
        .with_user_metadata("environment.visibility", lt=50)
        .with_name_match("test_drive"),
        # Append a filter with deep time-series data discovery and measurement time windowing
        QueryOntologyCatalog()
        .with_expression(IMU.Q.acceleration.x.gt(5.0))
        .with_expression(IMU.Q.timestamp_ns.gt(1700134567)),
    )
    # Perform the server side query
    results = client.query(query=query)
    # Inspect the results
    if results is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in results:
            print(f"Sequence: {item.sequence.name}")
version ¶
Get the version of the Mosaico server.
Note
If using the Authorization middleware (via an API-Key), this method requires at least the APIKeyPermissionEnum.Read permission.
Returns:
| Name | Type | Description |
|---|---|---|
| str | str | The version of the Mosaico server. |
Raises:
| Type | Description |
|---|---|
| Exception | If any error occurs during version retrieval. |
clear_sequence_handlers_cache ¶
Clears the internal cache of SequenceHandler objects.
clear_topic_handlers_cache ¶
Clears the internal cache of TopicHandler objects.
api_key_create ¶
Creates a new API key with the specified permissions.
Note
Requires the client to have the APIKeyPermissionEnum.Manage permission. You can also optionally set an expiration time and a description for the key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| permission | APIKeyPermissionEnum | Permission for the key. | required |
| description | str | Description for the key. | required |
| expires_at_ns | Optional[int] | Optional expiration timestamp in nanoseconds. | None |
Returns:
| Name | Type | Description |
|---|---|---|
| str | Optional[str] | The generated API key token or None. |
Raises:
| Type | Description |
|---|---|
| Exception | If any error occurs during API key creation. |
Example
from mosaicolabs import MosaicoClient, APIKeyPermissionEnum
# Open the connection with the Mosaico Client
with MosaicoClient.connect("localhost", 6726, api_key="<API_KEY_MANAGE>") as client:
    # Create a new API key with read and write permissions
    api_key = client.api_key_create(
        permission=APIKeyPermissionEnum.Write,
        description="API key for data ingestion",
    )
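Since expires_at_ns is a timestamp in nanoseconds, a small helper can turn a lifetime into an absolute value. This is a sketch under one assumption that this section does not state: that the timestamp is measured from the Unix epoch. `expiry_in_ns` is a hypothetical helper, not part of the SDK.

```python
import time
from datetime import timedelta
from typing import Optional


def expiry_in_ns(lifetime: timedelta, now_ns: Optional[int] = None) -> int:
    """Return an absolute timestamp in nanoseconds, `lifetime` from now."""
    if now_ns is None:
        now_ns = time.time_ns()  # nanoseconds since the Unix epoch (assumed reference)
    # Integer arithmetic avoids float rounding at nanosecond resolution.
    lifetime_ns = (lifetime // timedelta(microseconds=1)) * 1_000
    return now_ns + lifetime_ns


# A fixed "now" keeps the example deterministic.
expires_at_ns = expiry_in_ns(timedelta(days=30), now_ns=1_700_000_000_000_000_000)
```

The resulting integer could then be passed as the expires_at_ns argument of api_key_create.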
api_key_status ¶
Retrieves the status and metadata of an API key.
Note
Requires the client to have the APIKeyPermissionEnum.Manage permission.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| api_key_fingerprint | Optional[str] | The fingerprint of the API key to query. If not provided, the fingerprint of the current API key will be used. | None |
Returns:
| Name | Type | Description |
|---|---|---|
| APIKeyStatus | Optional[APIKeyStatus] | An object containing the API key's status information, or None if the query fails. |
Raises:
| Type | Description |
|---|---|
| Exception | If any error occurs during API key status retrieval. |
api_key_revoke ¶
Revokes an API key by its fingerprint.
Note
Requires the client to have the APIKeyPermissionEnum.Manage permission.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| api_key_fingerprint | str | The fingerprint of the API key to revoke. | required |
Returns:
| Type | Description |
|---|---|
| None | None. |
Raises:
| Type | Description |
|---|---|
| Exception | If any error occurs during API key revocation. |
close ¶
Gracefully shuts down the Mosaico client and releases all underlying resources.
This method ensures a clean termination of the client's lifecycle by:
* Closing Handlers: Invalidates and closes all cached SequenceHandlers and TopicHandlers to prevent stale data access.
* Network Cleanup: Terminates the connection pool to the mosaicod backend.
* Thread Termination: Shuts down the internal thread executor pool responsible for asynchronous data fetching and background streaming.
Note
If using the client as a context manager (via with MosaicoClient.connect(...)),
this method is invoked automatically on exit. Explicit calls are required
only for manual lifecycle management.
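The two lifecycle styles mentioned in the note can be sketched with a generic resource class. `DemoResource` is a hypothetical stand-in, the recorded events only mirror the three cleanup steps listed above, and making close safe to call twice is an assumption of this sketch.

```python
from typing import List


class DemoResource:
    """Hypothetical stand-in for a client that owns pools and cached handlers."""

    def __init__(self) -> None:
        self.events: List[str] = []
        self.closed = False

    def close(self) -> None:
        if self.closed:  # repeated calls are harmless in this sketch
            return
        # Same order as the list above: handlers, then network, then threads.
        self.events += [
            "handlers closed",
            "connection pool closed",
            "executor pool shut down",
        ]
        self.closed = True

    def __enter__(self) -> "DemoResource":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()  # runs on normal exit and when the block raises


# Context-manager style: cleanup happens automatically on exit.
with DemoResource() as managed:
    pass

# Manual style: the caller is responsible for calling close().
manual = DemoResource()
try:
    pass  # work with the resource here
finally:
    manual.close()
```

The try/finally form is what the with statement does on the caller's behalf, which is why the context-manager style is the less error-prone of the two.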
mosaicolabs.comm.NotificationType ¶
Bases: Enum
Classification of platform-level notifications.
These identifiers distinguish the severity and intent of messages sent from the Mosaico server regarding resource states or operation failures.
Attributes:
| Name | Type | Description |
|---|---|---|
| ERROR | | Indicates a critical failure during resource operations, such as a writing interruption or serialization fault. |
mosaicolabs.comm.Notification dataclass ¶
Platform diagnostic notification.
A Notification object represents a specific event or error report stored on the
platform server. These are typically generated by asynchronous ingestion tasks
and are critical for debugging failures when using
SessionLevelErrorPolicy.Report.
Discovery¶
Notifications can be retrieved at both the sequence and topic level via the Mosaico client, using list_sequence_notifications and list_topic_notifications.
Attributes:
| Name | Type | Description |
|---|---|---|
| sequence_name | str | The unique identifier of the associated sequence. |
| type | NotificationType | The type of the notification. |
| message | str | A detailed string describing the event or error cause. |
| created_datetime | datetime | The timestamp when the server generated the notification. |
| topic_name | Optional[str] | Optional; the specific topic name if the notification is granular to a single data channel. |
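To show how these attributes fit together, the sketch below mirrors them in a local dataclass and groups messages by topic. `DemoNotification`, `DemoNotificationType`, and `group_by_topic` are stand-ins written for this example, not SDK classes, and treating a missing topic_name as a sequence-level notification is an assumption drawn from the attribute description.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, List, Optional


class DemoNotificationType(Enum):
    ERROR = "error"  # the string value is a placeholder


@dataclass
class DemoNotification:
    sequence_name: str
    type: DemoNotificationType
    message: str
    created_datetime: datetime
    topic_name: Optional[str] = None  # None: not tied to a single topic


def group_by_topic(
    notifications: List[DemoNotification],
) -> Dict[Optional[str], List[str]]:
    """Map each topic name (None for sequence-level) to its messages, oldest first."""
    grouped: Dict[Optional[str], List[str]] = defaultdict(list)
    for item in sorted(notifications, key=lambda n: n.created_datetime):
        grouped[item.topic_name].append(item.message)
    return dict(grouped)


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
grouped = group_by_topic(
    [
        DemoNotification("my_sequence", DemoNotificationType.ERROR, "write interrupted", now, "/imu"),
        DemoNotification("my_sequence", DemoNotificationType.ERROR, "sequence aborted", now),
    ]
)
```

Grouping like this makes it easy to see which data channel produced which errors when debugging an ingestion run.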
mosaicolabs.comm.middlewares ¶
MosaicoAuthMiddleware ¶
Bases: ClientMiddleware
Middleware adding the API token to every flight request.
Initialize the middleware
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| api_key | str | The API key to use for authentication | required |
sending_headers ¶
Called before sending headers to the server
Returns:
| Name | Type | Description |
|---|---|---|
| dict | Dict[str, List[str] \| List[bytes]] | Headers to be sent to the server |
received_headers ¶
Called after receiving headers from the server
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| headers | Dict[str, List[str] \| List[bytes]] | Headers received from the server | required |
MosaicoAuthMiddlewareFactory ¶
Bases: ClientMiddlewareFactory
Factory to create instances of MosaicoAuthMiddleware.
Initialize the factory
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| api_key | str | The API key to use for authentication | required |
api_key_fingerprint property ¶
The fingerprint of the API key
Returns:
| Name | Type | Description |
|---|---|---|
| str | str | The fingerprint of the API key |
start_call ¶
Called at every flight client operation (GetFlightInfo, DoAction, etc.)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| info | CallInfo | Information about the flight call | required |
Returns:
| Name | Type | Description |
|---|---|---|
| MosaicoAuthMiddleware | MosaicoAuthMiddleware | The middleware to be used for the flight call |
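The factory and middleware pair documented above follows a common shape: the factory is asked for a middleware at the start of each call, and the middleware supplies the headers for that call. The sketch below reproduces only that shape in plain Python, without PyArrow. The `Demo*` classes are stand-ins, and the header name `x-demo-api-key` is made up; the header the SDK actually sends is not given in this section.

```python
from typing import Dict, List


class DemoAuthMiddleware:
    """Plain-Python stand-in with the same method names as the class above."""

    def __init__(self, api_key: str) -> None:
        self._api_key = api_key

    def sending_headers(self) -> Dict[str, List[str]]:
        # "x-demo-api-key" is a hypothetical header name for this sketch.
        return {"x-demo-api-key": [self._api_key]}

    def received_headers(self, headers: Dict[str, List[str]]) -> None:
        # Nothing is read from the response in this sketch.
        return None


class DemoAuthMiddlewareFactory:
    def __init__(self, api_key: str) -> None:
        self._api_key = api_key

    def start_call(self, info: object) -> DemoAuthMiddleware:
        # A fresh middleware per call, so every request carries the key.
        return DemoAuthMiddleware(self._api_key)


factory = DemoAuthMiddlewareFactory("my-api-key")
headers_first = factory.start_call(info=None).sending_headers()
headers_second = factory.start_call(info=None).sending_headers()
```

Keeping the key in the factory and creating a lightweight middleware per call means no per-request state is shared between concurrent calls.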