Writing Data¶
mosaicolabs.handlers.config.WriterConfig dataclass ¶
Configuration settings for Sequence and Topic writers.
Internal Usage
This is currently not a user-facing class. It is automatically
instantiated by the MosaicoClient when
allocating new SequenceWriter
instances via sequence_create().
This dataclass defines the operational parameters for data ingestion, controlling
both the error recovery strategy and the performance-critical buffering logic
used by the SequenceWriter and
TopicWriter.
on_error instance-attribute ¶
Determines the terminal behavior when an exception occurs during the ingestion lifecycle.
- If set to OnErrorPolicy.Delete, the system purges all data from the failed sequence.
- If set to OnErrorPolicy.Report, the system retains the partial data in an unlocked state for debugging.
max_batch_size_bytes instance-attribute ¶
The memory threshold in bytes before a data batch is flushed to the server.
When the internal buffer of a TopicWriter
exceeds this value, it triggers a serialization and transmission event.
Larger values increase throughput by reducing network overhead but require more
client-side memory.
max_batch_size_records instance-attribute ¶
The threshold in row (record) count before a data batch is flushed to the server.
A flush is triggered whenever either this record limit or the
max_batch_size_bytes limit is reached, ensuring that data is transmitted
regularly even for topics with very small individual records.
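The interaction between the two limits can be sketched in plain Python. This is only an illustration of the "flush when either limit is hit" rule described above, not the Mosaico implementation: `ToyBatchBuffer`, its fields, and the use of `>=` for both comparisons are assumptions made for the example.

```python
from dataclasses import dataclass, field


@dataclass
class ToyBatchBuffer:
    """Illustrative buffer; hypothetical, not the real TopicWriter."""

    max_batch_size_bytes: int
    max_batch_size_records: int
    records: list = field(default_factory=list)
    size_bytes: int = 0
    flushed_batches: list = field(default_factory=list)

    def push(self, payload: bytes) -> None:
        # Accumulate the record and track the buffered byte count.
        self.records.append(payload)
        self.size_bytes += len(payload)
        # Flush as soon as either limit is hit (assumed comparison: >=).
        if (
            self.size_bytes >= self.max_batch_size_bytes
            or len(self.records) >= self.max_batch_size_records
        ):
            self.flush()

    def flush(self) -> None:
        # Stand-in for serialization and transmission to the server.
        if self.records:
            self.flushed_batches.append(list(self.records))
            self.records.clear()
            self.size_bytes = 0


# Small records: the record-count limit triggers the flushes.
small = ToyBatchBuffer(max_batch_size_bytes=1024, max_batch_size_records=3)
for _ in range(7):
    small.push(b"x")

# One large record: the byte limit triggers the flush first.
large = ToyBatchBuffer(max_batch_size_bytes=1024, max_batch_size_records=3)
large.push(b"y" * 2048)
```

In this sketch the seven one-byte records produce two full batches with one record left in the buffer, while the single large record is flushed on its own.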
mosaicolabs.handlers.SequenceWriter ¶
Bases: BaseSequenceWriter
Orchestrates the creation and data ingestion lifecycle of a Mosaico Sequence.
The SequenceWriter is the central controller for high-performance data writing.
It manages the transition of a sequence through its lifecycle states: Create -> Write -> Finalize.
Key Responsibilities¶
- Lifecycle Management: Coordinates creation, finalization, or abort signals with the server.
- Resource Distribution: Implements a "Multi-Lane" architecture by distributing network connections
from a Connection Pool and thread executors from an Executor Pool to individual
TopicWriter instances. This ensures strict isolation and maximum parallelism between diverse data streams.
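To make the "Multi-Lane" idea concrete, here is a minimal sketch in plain Python. It assumes a round-robin hand-out of connections and single-thread executors; `ToyLaneAllocator`, the connection strings, and the allocation strategy are hypothetical and are not taken from the Mosaico source.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import cycle


class ToyLaneAllocator:
    """Hands each topic one connection and one executor (illustrative only)."""

    def __init__(self, connections, n_executors):
        self._connections = cycle(connections)
        self._executors = [ThreadPoolExecutor(max_workers=1) for _ in range(n_executors)]
        self._executor_cycle = cycle(self._executors)
        self.lanes = {}

    def assign(self, topic_name):
        # Each topic keeps the lane it was first given.
        if topic_name not in self.lanes:
            self.lanes[topic_name] = (next(self._connections), next(self._executor_cycle))
        return self.lanes[topic_name]

    def shutdown(self):
        for executor in self._executors:
            executor.shutdown(wait=True)


allocator = ToyLaneAllocator(connections=["conn-0", "conn-1"], n_executors=2)
imu_conn, imu_exec = allocator.assign("sensors/imu")
gps_conn, gps_exec = allocator.assign("sensors/gps")

# Work submitted for one topic runs on that topic's executor only.
imu_result = imu_exec.submit(lambda: "imu batch sent on " + imu_conn).result()
gps_result = gps_exec.submit(lambda: "gps batch sent on " + gps_conn).result()
allocator.shutdown()
```

Because each topic holds its own connection and executor in this sketch, a slow transmission on one lane does not delay work queued on the other.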
Usage Pattern
This class must be used within a with statement (Context Manager).
The context entry triggers sequence registration on the server, while the exit handles
automatic finalization or error cleanup based on the configured OnErrorPolicy.
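The entry and exit behavior described above can be illustrated with a small stand-alone context manager. Everything here is hypothetical (`ToyPolicy`, `ToySequence`, the state names, and the choice to re-raise the exception); it mirrors the documented Create -> Write -> Finalize flow and the two error policies, not the actual SequenceWriter code.

```python
from enum import Enum


class ToyPolicy(Enum):
    DELETE = "delete"
    REPORT = "report"


class ToySequence:
    """Illustrative lifecycle only; names and states are hypothetical."""

    def __init__(self, name, on_error=ToyPolicy.DELETE):
        self.name = name
        self.on_error = on_error
        self.state = "new"
        self.records = []

    def __enter__(self):
        # Context entry: stand-in for registering the sequence on the server.
        self.state = "writing"
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            # Clean exit: finalize the sequence.
            self.state = "finalized"
        elif self.on_error is ToyPolicy.DELETE:
            # Failure with the delete policy: purge everything written so far.
            self.records.clear()
            self.state = "deleted"
        else:
            # Failure with the report policy: keep the partial data for debugging.
            self.state = "reported"
        return False  # this sketch never swallows the exception


def run(policy, fail):
    seq = ToySequence("demo", on_error=policy)
    try:
        with seq as writer:
            writer.records.append("sample-0")
            if fail:
                raise ValueError("ingestion failed")
    except ValueError:
        pass
    return seq.state, len(seq.records)
```

Calling `run(ToyPolicy.DELETE, fail=True)` leaves no records behind, whereas `run(ToyPolicy.REPORT, fail=True)` keeps the one record that was written before the failure.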
Obtaining a Writer
Do not instantiate this class directly. Use the
MosaicoClient.sequence_create()
factory method.
Internal constructor for SequenceWriter.
Do not call this directly. Users must call
MosaicoClient.sequence_create()
to obtain an initialized writer.
Example
    from mosaicolabs import MosaicoClient, OnErrorPolicy

    # Open the connection with the Mosaico Client
    with MosaicoClient.connect("localhost", 6726) as client:
        # Start the Sequence Orchestrator
        with client.sequence_create(  # (1)!
            sequence_name="mission_log_042",
            # Custom metadata for this data sequence.
            metadata={
                "driver": {
                    "driver_id": "drv_sim_017",
                    "role": "validation",
                    "experience_level": "senior",
                },
                "location": {
                    "city": "Milan",
                    "country": "IT",
                    "facility": "Downtown",
                    "gps": {
                        "lat": 45.46481,
                        "lon": 9.19201,
                    },
                },
            },
            on_error=OnErrorPolicy.Delete,  # Default
        ) as seq_writer:
            # Start creating topics and pushing data
            pass  # (2)!

    # Exiting the block automatically flushes all topic buffers, finalizes the sequence on the server
    # and closes all connections and pools
- See also: MosaicoClient.sequence_create()
- See also:
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sequence_name | str | Unique name for the new sequence. | required |
| client | FlightClient | The primary control FlightClient. | required |
| connection_pool | Optional[_ConnectionPool] | Shared pool of data connections for parallel writing. | required |
| executor_pool | Optional[_ExecutorPool] | Shared pool of thread executors for asynchronous I/O. | required |
| metadata | dict[str, Any] | User-defined metadata dictionary. | required |
| config | WriterConfig | Operational configuration (e.g., error policies, batch sizes). | required |
sequence_status property ¶
Returns the current operational status of the sequence.
Returns:
| Type | Description |
|---|---|
| SequenceStatus | The |
topic_writer_exists ¶
Checks if a TopicWriter has already been initialized
for the given name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic_name | str | The name of the topic to check. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the topic writer exists locally, False otherwise. |
list_topic_writers ¶
Returns the list of all topic names currently managed by this writer.
get_topic_writer ¶
Retrieves an existing TopicWriter instance from the internal cache.
This method is particularly useful when ingesting data from unified recording formats where different sensor types (e.g., Vision, IMU, Odometry) are stored chronologically in a single stream or file.
In these scenarios, messages for various topics appear in an interleaved fashion.
Using get_topic_writer allows the developer to:
- Reuse Buffers: Efficiently switch between writers for different sensor streams.
- Ensure Data Ordering: Maintain a consistent batching logic for each topic as you iterate through a mixed-content log.
- Optimize Throughput: Leverage Mosaico's background I/O by routing all data for a specific identifier through a single, persistent writer instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic_name | str | The unique name or identifier of the topic writer to retrieve. | required |
Returns:
| Type | Description |
|---|---|
| Optional[TopicWriter] | The |
Example
Processing a generic interleaved sensor log (like a ROS bag or a custom JSON log):
    from mosaicolabs import SequenceWriter, IMU, Image

    # Topic to Ontology Mapping: Defines the schema for each sensor stream
    # Example: {"/camera": Image, "/imu": IMU}
    topic_to_ontology = { ... }

    # Adapter Factory: Maps raw sensor payloads to Mosaico Ontology instances
    # Example: {"/imu": lambda p: IMU(acceleration=Vector3d.from_list(p['acc']), ...)}
    adapter = { ... }

    with client.sequence_create("physical_ai_trial_01") as seq_writer:
        # log_iterator represents an interleaved stream (e.g., ROSbags, MCAP, or custom logs).
        for ts, topic, payload in log_iterator:
            # Access the topic-specific buffer.
            # get_topic_writer retrieves a persistent writer from the internal cache
            twriter = seq_writer.get_topic_writer(topic)

            if twriter is None:
                # Dynamic Topic Registration.
                # If the topic is encountered for the first time, register it using the
                # pre-defined Ontology type to ensure data integrity.
                twriter = seq_writer.topic_create(
                    topic_name=topic,
                    ontology_type=topic_to_ontology[topic],
                )

            # Data Transformation & Ingestion.
            # The adapter converts the raw payload into a validated Mosaico object.
            # push() handles high-performance batching and asynchronous I/O to the Rust backend.
            twriter.push(  # (1)!
                message=Message(
                    timestamp_ns=ts,
                    data=adapter[topic](payload),
                )
            )

    # SequenceWriter automatically calls _finalize() on all internal TopicWriters,
    # guaranteeing that every sensor measurement is safely committed to the platform.
- See also: TopicWriter.push()
topic_create ¶
Creates a new topic within the active sequence.
This method performs a "Multi-Lane" resource assignment, granting the new
TopicWriter its own connection from the pool
and a dedicated executor for background serialization and I/O.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic_name | str | The relative name of the new topic. | required |
| metadata | dict[str, Any] | Topic-specific user metadata. | required |
| ontology_type | Type[Serializable] | The | required |
Returns:
| Type | Description |
|---|---|
| Optional[TopicWriter] | A |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If called outside of a |
Example
    with MosaicoClient.connect("localhost", 6726) as client:
        # Start the Sequence Orchestrator
        with client.sequence_create(...) as seq_writer:  # (1)!
            # Create individual Topic Writers
            # Each writer gets its own assigned resources from the pools
            imu_writer = seq_writer.topic_create(
                topic_name="sensors/imu",  # The unique topic name
                metadata={  # The topic/sensor custom metadata
                    "vendor": "inertix-dynamics",
                    "model": "ixd-f100",
                    "firmware_version": "1.2.0",
                    "serial_number": "IMUF-9A31D72X",
                    "calibrated": "false",
                },
                ontology_type=IMU,  # The ontology type stored in this topic
            )

            # Another individual topic writer for the GPS device
            gps_writer = seq_writer.topic_create(
                topic_name="sensors/gps",  # The unique topic name
                metadata={  # The topic/sensor custom metadata
                    "role": "primary_gps",
                    "vendor": "satnavics",
                    "model": "snx-g500",
                    "firmware_version": "3.2.0",
                    "serial_number": "GPS-7C1F4A9B",
                    "interface": {  # (2)!
                        "type": "UART",
                        "baudrate": 115200,
                        "protocol": "NMEA",
                    },
                },
                ontology_type=GPS,  # The ontology type stored in this topic
            )

            # Push data
            imu_writer.push(  # (3)!
                message=Message(
                    timestamp_ns=1700000000000,
                    data=IMU(acceleration=Vector3d(x=0, y=0, z=9.81), ...),
                )
            )
            # ...

    # Exiting the block automatically flushes all topic buffers, finalizes the sequence on the server
    # and closes all connections and pools
- See also: MosaicoClient.sequence_create()
- The metadata fields will be queryable via the Query mechanism. The mechanism allows creating query expressions like: Topic.Q.user_metadata["interface.type"].eq("UART"). See also:
- See also: TopicWriter.push()
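The dotted key in the query expression above ("interface.type") addresses a value nested inside the topic metadata. As a rough illustration of that addressing scheme only, the following plain-Python helper walks a nested dictionary along a dotted path. The `lookup` function is hypothetical and says nothing about how the Query mechanism is implemented.

```python
def lookup(metadata, dotted_key):
    """Resolve a dotted key such as "interface.type" in a nested dict."""
    current = metadata
    for part in dotted_key.split("."):
        # Stop as soon as the path leaves the nested-dict structure.
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


# Metadata shaped like the GPS topic example above.
gps_metadata = {
    "role": "primary_gps",
    "interface": {"type": "UART", "baudrate": 115200, "protocol": "NMEA"},
}

interface_type = lookup(gps_metadata, "interface.type")
missing_value = lookup(gps_metadata, "interface.parity")
```

Here `interface_type` resolves to the nested "UART" value, and a path that does not exist yields `None` rather than raising.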
mosaicolabs.handlers.TopicWriter ¶
Manages a high-performance data stream for a single Mosaico topic.
The TopicWriter abstracts the complexity of the PyArrow Flight DoPut protocol,
handling internal buffering, serialization, and network transmission.
It accumulates records in memory and automatically flushes them to the server when
configured batch limits—defined by either byte size or record count—are exceeded.
Performance & Parallelism¶
If an executor pool is provided by the parent client, the TopicWriter performs
data serialization on background threads, preventing I/O operations from blocking
the main application logic.
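The effect of an optional executor can be sketched with the standard library. This is a toy model under stated assumptions: `ToyAsyncWriter`, `serialize_batch`, and the JSON encoding are all invented for illustration, and the real TopicWriter serializes and transmits over PyArrow Flight rather than JSON.

```python
import json
from concurrent.futures import ThreadPoolExecutor


def serialize_batch(batch):
    # Stand-in for the real serialization step (JSON is an assumption here).
    return json.dumps(batch).encode("utf-8")


class ToyAsyncWriter:
    """Illustrative only; not the Mosaico TopicWriter."""

    def __init__(self, executor=None):
        self._executor = executor
        self._pending = []
        self.sent = []

    def flush(self, batch):
        if self._executor is None:
            # No executor: serialize inline on the caller's thread.
            self.sent.append(serialize_batch(batch))
        else:
            # Executor available: hand the work off and return immediately.
            self._pending.append(self._executor.submit(serialize_batch, batch))

    def finalize(self):
        # Wait for outstanding work, preserving submission order.
        for future in self._pending:
            self.sent.append(future.result())
        self._pending.clear()


with ThreadPoolExecutor(max_workers=1) as pool:
    background = ToyAsyncWriter(executor=pool)
    background.flush([{"t": 1}, {"t": 2}])
    background.flush([{"t": 3}])
    background.finalize()

inline = ToyAsyncWriter()
inline.flush([{"t": 1}, {"t": 2}])
inline.flush([{"t": 3}])
```

Both writers end up with the same serialized batches in the same order; the only difference in this sketch is which thread pays the serialization cost.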
Obtaining a Writer
End-users should not instantiate this class directly. Use the
SequenceWriter.topic_create()
factory method to obtain an active writer.
Internal constructor for TopicWriter.
Do not call this directly. Internal library modules should use the
_create() factory. Users must call
SequenceWriter.topic_create()
to obtain an initialized writer.
Example
    with MosaicoClient.connect("localhost", 6726) as client:
        # Start the Sequence Orchestrator
        with client.sequence_create(...) as seq_writer:  # (1)!
            # Create individual Topic Writers
            # Each writer gets its own assigned resources from the pools
            imu_writer = seq_writer.topic_create(  # (2)!
                topic_name="sensors/imu",  # The unique topic name
                metadata={  # The topic/sensor custom metadata
                    "vendor": "inertix-dynamics",
                    "model": "ixd-f100",
                    "firmware_version": "1.2.0",
                    "serial_number": "IMUF-9A31D72X",
                    "calibrated": "false",
                },
                ontology_type=IMU,  # The ontology type stored in this topic
            )

            # Push data...
            imu_writer.push(  # (3)!
                message=Message(
                    data=IMU(acceleration=Vector3d(x=0, y=0, z=9.81), ...),
                    timestamp_ns=1700000000000,
                )
            )

    # On exit from the seq_writer `with` block, the `_finalize()` method of all topic writers is called.
- See also: MosaicoClient.sequence_create()
- See also: SequenceWriter.topic_create()
- See also: TopicWriter.push()
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic_name | str | The name of the specific topic. | required |
| sequence_name | str | The name of the parent sequence. | required |
| client | FlightClient | The FlightClient used for data transmission. | required |
| state | _TopicWriteState | The internal state object managing buffers and streams. | required |
| config | WriterConfig | Operational configuration for batching and error handling. | required |
push ¶
Adds a new record to the internal write buffer.
Records are accumulated in memory. If a push triggers a batch limit, the buffer is automatically serialized and transmitted to the server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | Message | A pre-constructed Message object. | required |
Raises:
| Type | Description |
|---|---|
| Exception | If a buffer flush fails during the operation. |
Example
    with MosaicoClient.connect("localhost", 6726) as client:
        # Start the Sequence Orchestrator
        with client.sequence_create(...) as seq_writer:  # (1)!
            # Create individual Topic Writers
            # Each writer gets its own assigned resources from the pools
            imu_writer = seq_writer.topic_create(  # (2)!
                topic_name="sensors/imu",  # The unique topic name
                metadata={  # The topic/sensor custom metadata
                    "vendor": "inertix-dynamics",
                    "model": "ixd-f100",
                    "firmware_version": "1.2.0",
                    "serial_number": "IMUF-9A31D72X",
                    "calibrated": "false",
                },
                ontology_type=IMU,  # The ontology type stored in this topic
            )

            # Another individual topic writer for the GPS device
            gps_writer = seq_writer.topic_create(
                topic_name="sensors/gps",  # The unique topic name
                metadata={  # The topic/sensor custom metadata
                    "role": "primary_gps",
                    "vendor": "satnavics",
                    "model": "snx-g500",
                    "firmware_version": "3.2.0",
                    "serial_number": "GPS-7C1F4A9B",
                    "interface": {
                        "type": "UART",
                        "baudrate": 115200,
                        "protocol": "NMEA",
                    },
                },
                ontology_type=GPS,  # The ontology type stored in this topic
            )

            gps_msg = Message(timestamp_ns=1700000000100, data=GPS(...))
            gps_writer.push(message=gps_msg)

    # On exit from the seq_writer `with` block, the `_finalize()` method of all topic writers is called.
- See also: MosaicoClient.sequence_create()
- See also: SequenceWriter.topic_create()
is_active ¶
Returns True if the writing stream is open and the writer accepts new messages.