Communication Module¶
mosaicolabs.comm.MosaicoClient ¶
The gateway to the Mosaico Data Platform.
This class centralizes connection management, resource pooling, and serves as a factory for specialized handlers. It is designed to manage the lifecycle of both network connections and asynchronous executors efficiently.
Context Manager Usage
The MosaicoClient is best used as a context manager to ensure all
internal pools and connections are gracefully closed.
Internal Constructor (do not call this directly): The MosaicoClient enforces a strict
factory pattern for security and proper resource setup.
Please use the connect() method
instead to obtain an initialized client.
Sentinel Enforcement
This constructor checks for a private internal sentinel.
Attempting to instantiate this class manually will result in a
RuntimeError.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
host
|
str
|
The remote server host. |
required |
port
|
int
|
The remote server port. |
required |
timeout
|
int
|
The connection timeout. |
required |
control_client
|
FlightClient
|
The primary PyArrow Flight control client. |
required |
connection_pool
|
Optional[_ConnectionPool]
|
Internal pool for data connections. |
required |
executor_pool
|
Optional[_ExecutorPool]
|
Internal pool for async I/O. |
required |
sentinel
|
object
|
Private object used to verify factory-based instantiation. |
required |
connect
classmethod
¶
The primary entry point to the Mosaico Data Platform.
This factory method is the only recommended way to obtain a valid
MosaicoClient instance. It orchestrates the
necessary handshake, initializes the primary control channel, and prepares
the internal resource pools.
Factory Pattern
Direct instantiation via __init__ is restricted through a sentinel
pattern and will raise a RuntimeError. This
ensures that every client in use has been correctly configured with a
valid network connection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
host
|
str
|
The server host address (e.g., "127.0.0.1" or "mosaico.local"). |
required |
port
|
int
|
The server port (e.g., 6726). |
required |
timeout
|
int
|
Maximum time in seconds to wait for a connection response. Defaults to 5. |
5
|
Returns:
| Name | Type | Description |
|---|---|---|
MosaicoClient |
MosaicoClient
|
An initialized and connected client ready for operations. |
Raises:
| Type | Description |
|---|---|
ConnectionError
|
If the server is unreachable or the handshake fails. |
RuntimeError
|
If the class is instantiated directly instead of using this method. |
sequence_handler ¶
Retrieves a SequenceHandler for the given sequence.
Handlers are cached; subsequent calls for the same sequence return the existing object to avoid redundant handshakes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
sequence_name
|
str
|
The unique identifier of the sequence. |
required |
Returns:
| Type | Description |
|---|---|
Optional[SequenceHandler]
|
Optional[SequenceHandler]: A handler for managing sequence operations, or None if not found. |
Example
from mosaicolabs import MosaicoClient
# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
# Retrieve a sequence handler
sequence_handler = client.sequence_handler("my_sequence")
if sequence_handler:
# Print sequence details
print(f"Sequence: {sequence_handler.name}")
print(f"Created: {sequence_handler.created_datetime}")
print(f"Topic list: {sequence_handler.topics}")
print(f"User Metadata: {sequence_handler.user_metadata}")
print(f"Size (MB): {sequence_handler.total_size_bytes / 1024 / 1024}")
topic_handler ¶
Retrieves a TopicHandler for a specific data channel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
sequence_name
|
str
|
The parent sequence name. |
required |
topic_name
|
str
|
The specific topic name. |
required |
Returns:
| Type | Description |
|---|---|
Optional[TopicHandler]
|
Optional[TopicHandler]: A handler for managing topic operations, or None if not found. |
Example
from mosaicolabs import MosaicoClient
# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
# Retrieve a topic handler
topic_handler = client.topic_handler("my_sequence", "/front/camera/image_raw)
if topic_handler:
# Print topic details
print(f"Topic: {topic_handler.sequence_name}:{topic_handler.name}")
print(f"Ontology Tag: {topic_handler.ontology_tag}")
print(f"Created: {topic_handler.created_datetime}")
print(f"User Metadata: {topic_handler.user_metadata}")
print(f"Size (MB): {topic_handler.total_size_bytes / 1024 / 1024}")
sequence_create ¶
sequence_create(
sequence_name,
metadata,
on_error=Delete,
max_batch_size_bytes=None,
max_batch_size_records=None,
)
Creates a new sequence on the platform and returns a SequenceWriter for ingestion.
Important
The function must be called inside a with context, otherwise a RuntimeError is raised.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
sequence_name
|
str
|
Unique name for the sequence. |
required |
metadata
|
dict[str, Any]
|
User-defined metadata to attach. |
required |
on_error
|
OnErrorPolicy
|
Behavior on write failure. Defaults to |
Delete
|
max_batch_size_bytes
|
Optional[int]
|
Max bytes per Arrow batch. |
None
|
max_batch_size_records
|
Optional[int]
|
Max records per Arrow batch. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
SequenceWriter |
SequenceWriter
|
An initialized writer instance. |
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If the method is called outside a |
Exception
|
If any error occurs during sequence injection. |
Example
from mosaicolabs import MosaicoClient, OnErrorPolicy
# Open the connection with the Mosaico Client
with MosaicoClient.connect("localhost", 6726) as client:
# Start the Sequence Orchestrator
with client.sequence_create(
sequence_name="mission_log_042",
# Custom metadata for this data sequence.
metadata={
"driver": {
"driver_id": "drv_sim_017",
"role": "validation",
"experience_level": "senior",
},
"location": {
"city": "Milan",
"country": "IT",
"facility": "Downtown",
"gps": {
"lat": 45.46481,
"lon": 9.19201,
},
},
}
on_error = OnErrorPolicy.Delete # Default
) as seq_writer:
# Start creating topics and pushing data...
# (1)!
sequence_delete ¶
Permanently deletes a sequence and all its associated data from the server.
This operation is destructive and triggers a cascading deletion of all underlying resources, including all topics and data chunks belonging to the sequence. Once executed, all storage occupied by the sequence is freed.
Sequence Locking
This action can only be performed on unlocked sequences. If a sequence is currently locked (e.g., for archival or safety reasons), the deletion request will be rejected by the server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
sequence_name
|
str
|
The unique name of the sequence to remove. |
required |
list_sequences ¶
Retrieves a list of all sequence names available on the server.
Returns:
| Type | Description |
|---|---|
List[str]
|
List[str]: The list of sequence identifiers. |
list_sequence_notify ¶
Retrieves a list of all notifications available on the server for a specific sequence.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
sequence_name
|
str
|
The name of the sequence to list notifications for. |
required |
Returns:
| Type | Description |
|---|---|
List[Notified]
|
List[Notified]: The list of sequence notifications. |
Example
from mosaicolabs import MosaicoClient
with MosaicoClient.connect("localhost", 6726) as client:
sequence_notifications = client.list_sequence_notify("my_sequence")
for notify in sequence_notifications:
print(f"Notification Type: {notify.notify_type}")
print(f"Notification Message: {notify.message}")
print(f"Notification Created: {notify.created_datetime}")
clear_sequence_notify ¶
Clears the notifications for a specific sequence from the server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
sequence_name
|
str
|
The name of the sequence. |
required |
list_topic_notify ¶
Retrieves a list of all notifications available on the server for a specific topic
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
sequence_name
|
str
|
The name of the sequence to list notifications for. |
required |
topic_name
|
str
|
The name of the topic to list notifications for. |
required |
Returns:
| Type | Description |
|---|---|
List[Notified]
|
List[str]: The list of topic notifications. |
Example
from mosaicolabs import MosaicoClient
with MosaicoClient.connect("localhost", 6726) as client:
topic_notifications = client.list_topic_notify("my_sequence", "my_topic")
for notify in topic_notifications:
print(f"Notification Type: {notify.notify_type}")
print(f"Notification Message: {notify.message}")
print(f"Notification Created: {notify.created_datetime}")
clear_topic_notify ¶
Clears the notifications for a specific topic from the server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
sequence_name
|
str
|
The name of the sequence. |
required |
topic_name
|
str
|
The name of the topic. |
required |
query ¶
Executes one or more queries against the Mosaico database.
Multiple provided queries are joined using a logical AND condition.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*queries
|
QueryableProtocol
|
Variable arguments of query builder objects (e.g., |
()
|
query
|
Optional[Query]
|
An alternative pre-constructed Query object. |
None
|
Returns:
| Type | Description |
|---|---|
Optional[QueryResponse]
|
Optional[QueryResponse]: The query results, or None if an error occurs. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If conflicting query types are passed or no queries are provided. |
Query with variadic arguments
from mosaicolabs import QueryOntologyCatalog, QuerySequence, Query, IMU, MosaicoClient
# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
# Perform the server side query
results = client.query(
# Append a filter for sequence metadata
QuerySequence()
.with_expression(
# Use query proxy for generating a _QuerySequenceExpression
Sequence.Q.user_metadata["environment.visibility"].lt(50)
)
.with_name_match("test_drive"),
# Append a filter with deep time-series data discovery and measurement time windowing
QueryOntologyCatalog()
.with_expression(IMU.Q.acceleration.x.gt(5.0))
.with_expression(IMU.Q.header.stamp.sec.gt(1700134567))
.with_expression(IMU.Q.header.stamp.nanosec.between([123456, 789123])),
)
# Inspect the results
if results is not None:
# Results are automatically grouped by Sequence for easier data management
for item in results:
print(f"Sequence: {item.sequence.name}")
Query with Query object
from mosaicolabs import QueryOntologyCatalog, QuerySequence, Query, IMU, MosaicoClient
# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
# Build a filter with name pattern and metadata-related expression
query = Query(
# Append a filter for sequence metadata
QuerySequence()
.with_expression(
# Use query proxy for generating a _QuerySequenceExpression
Sequence.Q.user_metadata["environment.visibility"].lt(50)
)
.with_name_match("test_drive"),
# Append a filter with deep time-series data discovery and measurement time windowing
QueryOntologyCatalog()
.with_expression(IMU.Q.acceleration.x.gt(5.0))
.with_expression(IMU.Q.header.stamp.sec.gt(1700134567))
.with_expression(IMU.Q.header.stamp.nanosec.between([123456, 789123])),
)
# Perform the server side query
results = client.query(query=query)
# Inspect the results
if results is not None:
# Results are automatically grouped by Sequence for easier data management
for item in results:
print(f"Sequence: {item.sequence.name}")
clear_sequence_handlers_cache ¶
Clears the internal cache of SequenceHandler objects.
clear_topic_handlers_cache ¶
Clears the internal cache of TopicHandler objects.
close ¶
Gracefully shuts down the Mosaico client and releases all underlying resources.
This method ensures a clean termination of the client's lifecycle by:
* Closing Handlers: Invalidates and closes all cached SequenceHandlers and TopicHandlers to prevent stale data access.
* Network Cleanup: Terminated the connection pool to the mosaicod backend.
* Thread Termination: Shuts down the internal thread executor pool responsible for asynchronous data fetching and background streaming.
Note
If using the client as a context manager (via with MosaicoClient.connect(...)),
this method is invoked automatically on exit. Explicit calls are required
only for manual lifecycle management.
mosaicolabs.comm.NotifyType ¶
Bases: StrEnum
Classification of platform-level notifications.
These identifiers distinguish the severity and intent of messages sent from the Mosaico server regarding resource states or operation failures.
Attributes:
| Name | Type | Description |
|---|---|---|
ERROR |
Indicates a critical failure during resource operations, such as a writing interruption or serialization fault. |
mosaicolabs.comm.Notified
dataclass
¶
Platform diagnostic notification.
A Notified object represents a specific event or error report stored on the
platform server. These are typically generated by asynchronous ingestion tasks
and are critical for debugging failures when using
OnErrorPolicy.Report.
Discovery¶
Notifications can be retrieved at both the sequence and topic level via the Mosaico client:
Example
Attributes:
| Name | Type | Description |
|---|---|---|
sequence_name |
str
|
The unique identifier of the associated sequence. |
notify_type |
NotifyType
|
The |
message |
str
|
A detailed string describing the event or error cause. |
created_datetime |
datetime
|
The timestamp when the server generated the notification. |
topic_name |
Optional[str]
|
Optional; the specific topic name if the notification is granular to a single data channel. |