Writing Data
mosaicolabs.handlers.config.WriterConfig
dataclass
Configuration for common settings for Sequence and Topic writers.
Internal Usage
This is currently not a user-facing class. It is extended
by the SessionWriterConfig.
This dataclass defines the operational parameters for data ingestion, controlling
both the error recovery strategy and the performance-critical buffering logic
used by the SequenceWriter and
TopicWriter.
max_batch_size_bytes
instance-attribute
The memory threshold in bytes before a data batch is flushed to the server.
When the internal buffer of a TopicWriter
exceeds this value, it triggers a serialization and transmission event.
Larger values increase throughput by reducing network overhead but require more
client-side memory.
max_batch_size_records
instance-attribute
The threshold in row (record) count before a data batch is flushed to the server.
A flush is triggered whenever either this record limit or the
max_batch_size_bytes limit is reached, ensuring that data is transmitted
regularly even for topics with very small individual records.
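The interaction between the two thresholds can be sketched with a small stand-alone buffer. This is an illustrative toy, not the mosaicolabs implementation: the `BatchBuffer` class and its `flushed_batches` list are hypothetical, and the sketch flushes as soon as a limit is reached (`>=`), which is an assumption about the boundary behavior.

```python
from dataclasses import dataclass, field


@dataclass
class BatchBuffer:
    """Toy stand-in for a topic writer's buffer (hypothetical, for illustration)."""

    max_batch_size_bytes: int
    max_batch_size_records: int
    records: list = field(default_factory=list)
    size_bytes: int = 0
    flushed_batches: list = field(default_factory=list)

    def push(self, payload: bytes) -> None:
        # Accumulate the record, then flush if either limit has been reached.
        self.records.append(payload)
        self.size_bytes += len(payload)
        if (
            self.size_bytes >= self.max_batch_size_bytes
            or len(self.records) >= self.max_batch_size_records
        ):
            self.flush()

    def flush(self) -> None:
        # A real writer would serialize and transmit here; this sketch only stores the batch.
        if self.records:
            self.flushed_batches.append(list(self.records))
        self.records.clear()
        self.size_bytes = 0


buf = BatchBuffer(max_batch_size_bytes=1024, max_batch_size_records=3)
for _ in range(7):
    buf.push(b"x" * 10)  # tiny records: the record-count limit triggers each flush

print(len(buf.flushed_batches), len(buf.records))  # 2 1
```

With 10-byte records the byte limit is never approached, so only the record limit keeps data flowing, which is the case the description above calls out.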
mosaicolabs.handlers.SequenceWriter
Bases: _BaseSessionWriter
Orchestrates the creation and data ingestion lifecycle of a Mosaico Sequence.
The SequenceWriter is the central controller for high-performance data writing.
It manages the transition of a sequence through its lifecycle states: Create -> Write -> Finalize.
Usage Pattern
This class must be used within a with statement (Context Manager).
The context entry triggers sequence registration on the server, while the exit handles
automatic finalization or error cleanup based on the configured SessionLevelErrorPolicy.
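The entry/exit behavior described above follows the standard Python context-manager protocol. The sketch below is a toy model of that lifecycle, not the SequenceWriter code: `ToySession` and `Policy` are hypothetical names, and the `KEEP` member is invented for contrast (only `SessionLevelErrorPolicy.Delete` appears in this section).

```python
from enum import Enum


class Policy(Enum):
    # Hypothetical stand-ins for a session-level error policy.
    DELETE = "delete"
    KEEP = "keep"


class ToySession:
    """Illustrative context manager mirroring Create -> Write -> Finalize."""

    def __init__(self, name, on_error):
        self.name = name
        self.on_error = on_error
        self.events = []

    def __enter__(self):
        self.events.append("registered")  # context entry: register the sequence
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self.events.append("finalized")  # clean exit: finalize
        elif self.on_error is Policy.DELETE:
            self.events.append("deleted")  # failed exit: clean up
        else:
            self.events.append("kept")
        return False  # do not swallow the exception


ok = ToySession("a", Policy.DELETE)
with ok:
    pass

failed = ToySession("b", Policy.DELETE)
try:
    with failed:
        raise ValueError("boom")
except ValueError:
    pass

print(ok.events)      # ['registered', 'finalized']
print(failed.events)  # ['registered', 'deleted']
```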
Obtaining a Writer
Do not instantiate this class directly. Use the
MosaicoClient.sequence_create()
factory method.
Internal constructor for SequenceWriter.
Do not call this directly. Users must call
MosaicoClient.sequence_create()
to obtain an initialized writer.
Example
from mosaicolabs import MosaicoClient, SessionLevelErrorPolicy
# Open the connection with the Mosaico Client
with MosaicoClient.connect("localhost", 6726) as client:
    # Start the Sequence Orchestrator
    with client.sequence_create(  # (1)!
        sequence_name="mission_log_042",
        # Custom metadata for this data sequence.
        metadata={
            "driver": {
                "driver_id": "drv_sim_017",
                "role": "validation",
                "experience_level": "senior",
            },
            "location": {
                "city": "Milan",
                "country": "IT",
                "facility": "Downtown",
                "gps": {
                    "lat": 45.46481,
                    "lon": 9.19201,
                },
            },
        },
        on_error=SessionLevelErrorPolicy.Delete,
    ) as seq_writer:
        # Start creating topics and pushing data
        ...  # (2)!
# Exiting the block automatically flushes all topic buffers and finalizes the sequence on the server
- See also: MosaicoClient.sequence_create()
- See also:
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sequence_name | str | Unique name for the new sequence. | required |
| client | FlightClient | The primary control FlightClient. | required |
| metadata | dict[str, Any] | User-defined metadata dictionary. | required |
| config | SessionWriterConfig | Operational configuration (e.g., error policies, batch sizes). | required |
session_status
property
Returns the current operational status of the session corresponding to this sequence write or update.
Returns:
| Type | Description |
|---|---|
| SessionStatus | The |
session_locator
property
Returns the locator of the session corresponding to this sequence write or update.
The locator format is: 'sequence_name:session_identifier'.
Returns:
| Type | Description |
|---|---|
| str | The locator of the session. |
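Given the documented 'sequence_name:session_identifier' format, a locator can be taken apart with plain string handling. The helper below is hypothetical (it is not part of mosaicolabs), the identifier `abc123` is made up, and splitting on the last colon assumes the session identifier itself contains no colon.

```python
def split_session_locator(locator: str) -> tuple[str, str]:
    """Split 'sequence_name:session_identifier' into its two parts (hypothetical helper)."""
    sequence_name, sep, session_identifier = locator.rpartition(":")
    if not sep or not sequence_name or not session_identifier:
        raise ValueError(f"not a session locator: {locator!r}")
    return sequence_name, session_identifier


print(split_session_locator("mission_log_042:abc123"))  # ('mission_log_042', 'abc123')
```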
status
property
Returns the current operational status of the sequence.
Returns:
| Type | Description |
|---|---|
| SequenceStatus | The |
topic_writer_exists
Checks if a TopicWriter has already been initialized for the given name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic_name | str | The name of the topic to check. | required |

Returns:
| Type | Description |
|---|---|
| bool | True if the topic writer exists locally, False otherwise. |
list_topic_writers
Returns the list of all topic names currently managed by this writer.
get_topic_writer
Retrieves an existing TopicWriter instance from the internal cache.
This method is particularly useful when ingesting data from unified recording formats where different sensor types (e.g., Vision, IMU, Odometry) are stored chronologically in a single stream or file.
In these scenarios, messages for various topics appear in an interleaved fashion.
Using get_topic_writer allows the developer to:
- Reuse Buffers: Efficiently switch between writers for different sensor streams.
- Ensure Data Ordering: Maintain a consistent batching logic for each topic as you iterate through a mixed-content log.
- Optimize Throughput: Leverage Mosaico's background I/O by routing all data for a specific identifier through a single, persistent writer instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic_name | str | The unique name or identifier of the topic writer to retrieve. | required |

Returns:
| Type | Description |
|---|---|
| Optional[TopicWriter] | The |
Example
Processing a generic interleaved sensor log (like a ROS bag or a custom JSON log):
from mosaicolabs import SequenceWriter, IMU, Image
# Topic to Ontology Mapping: Defines the schema for each sensor stream
# Example: {"/camera": Image, "/imu": IMU}
topic_to_ontology = { ... }
# Adapter Factory: Maps raw sensor payloads to Mosaico Ontology instances
# Example: {"/imu": lambda p: IMU(acceleration=Vector3d.from_list(p['acc']), ...)}
adapter = { ... }
with client.sequence_create("physical_ai_trial_01") as seq_writer:
    # log_iterator represents an interleaved stream (e.g., ROSbags, MCAP, or custom logs).
    for ts, topic, payload in log_iterator:
        # Access the topic-specific buffer.
        # get_topic_writer retrieves a persistent writer from the internal cache
        twriter = seq_writer.get_topic_writer(topic)
        if twriter is None:
            # Dynamic Topic Registration.
            # If the topic is encountered for the first time, register it using the
            # pre-defined Ontology type to ensure data integrity.
            twriter = seq_writer.topic_create(
                topic_name=topic,
                metadata={},  # required by topic_create; left empty in this example
                ontology_type=topic_to_ontology[topic],
            )
        # Data Transformation & Ingestion.
        # The adapter converts the raw payload into a validated Mosaico object.
        # push() handles high-performance batching and transmission to the Rust backend.
        twriter.push(  # (1)!
            message=Message(
                timestamp_ns=ts,
                data=adapter[topic](payload),
            )
        )
# SequenceWriter automatically calls _finalize() on all internal TopicWriters,
# guaranteeing that every sensor measurement is safely committed to the platform.
- See also: TopicWriter.push()
topic_create
topic_create(topic_name, metadata, ontology_type, on_error=Raise)
Creates a new topic within the active sequence.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic_name | str | The relative name of the new topic. | required |
| metadata | dict[str, Any] | Topic-specific user metadata. | required |
| ontology_type | Type[Serializable] | The | required |
| on_error | TopicLevelErrorPolicy | The error policy to use in the | Raise |

Returns:
| Type | Description |
|---|---|
| Optional[TopicWriter] | A |

Raises:
| Type | Description |
|---|---|
| RuntimeError | If called outside of a |
Example
with MosaicoClient.connect("localhost", 6726) as client:
    # Start the Sequence Orchestrator
    with client.sequence_create(...) as seq_writer:  # (1)!
        # Create individual Topic Writers
        imu_writer = seq_writer.topic_create(
            topic_name="sensors/imu",  # The unique topic name
            metadata={  # The topic/sensor custom metadata
                "vendor": "inertix-dynamics",
                "model": "ixd-f100",
                "firmware_version": "1.2.0",
                "serial_number": "IMUF-9A31D72X",
                "calibrated": "false",
            },
            ontology_type=IMU,  # The ontology type stored in this topic
        )
        # Another individual topic writer for the GPS device
        gps_writer = seq_writer.topic_create(
            topic_name="sensors/gps",  # The unique topic name
            metadata={  # The topic/sensor custom metadata
                "role": "primary_gps",
                "vendor": "satnavics",
                "model": "snx-g500",
                "firmware_version": "3.2.0",
                "serial_number": "GPS-7C1F4A9B",
                "interface": {  # (2)!
                    "type": "UART",
                    "baudrate": 115200,
                    "protocol": "NMEA",
                },
            },
            ontology_type=GPS,  # The ontology type stored in this topic
        )
        # Push data
        imu_writer.push(  # (3)!
            message=Message(
                timestamp_ns=1700000000000,
                data=IMU(acceleration=Vector3d(x=0, y=0, z=9.81), ...),
            )
        )
        # ...
# Exiting the block automatically flushes all topic buffers and finalizes the sequence on the server
- See also: MosaicoClient.sequence_create()
- The metadata fields will be queryable via the Query mechanism. The mechanism allows creating query expressions like: QueryTopic().with_user_metadata("interface.type", eq="UART"). See also:
- See also: TopicWriter.push()
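The dotted key in a query expression such as "interface.type" addresses a nested field of the topic metadata. How the server evaluates such queries is not described here; the hypothetical helper below only illustrates how a dotted path maps onto the nested dictionary from the GPS example.

```python
from typing import Any, Optional


def lookup_dotted(metadata: dict, path: str) -> Optional[Any]:
    """Resolve a dotted key such as 'interface.type' in nested metadata (hypothetical helper)."""
    node: Any = metadata
    for key in path.split("."):
        # Stop as soon as the path leaves the nested-dict structure.
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


gps_metadata = {
    "role": "primary_gps",
    "vendor": "satnavics",
    "interface": {
        "type": "UART",
        "baudrate": 115200,
        "protocol": "NMEA",
    },
}

print(lookup_dotted(gps_metadata, "interface.type"))  # UART
```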
mosaicolabs.handlers.TopicWriter
Manages a high-performance data stream for a single Mosaico topic.
The TopicWriter abstracts the complexity of the PyArrow Flight DoPut protocol,
handling internal buffering, serialization, and network transmission.
It accumulates records in memory and automatically flushes them to the server when
configured batch limits—defined by either byte size or record count—are exceeded.
Obtaining a Writer
End-users should not instantiate this class directly. Use the
SequenceWriter.topic_create()
factory method to obtain an active writer.
Internal constructor for TopicWriter.
Do not call this directly. Internal library modules should use the
_create() factory. Users must call
SequenceWriter.topic_create()
to obtain an initialized writer.
Example
with MosaicoClient.connect("localhost", 6726) as client:
    # Start the Sequence Orchestrator
    with client.sequence_create(...) as seq_writer:  # (1)!
        # Create individual Topic Writers
        imu_writer = seq_writer.topic_create(  # (2)!
            topic_name="sensors/imu",  # The unique topic name
            metadata={  # The topic/sensor custom metadata
                "vendor": "inertix-dynamics",
                "model": "ixd-f100",
                "firmware_version": "1.2.0",
                "serial_number": "IMUF-9A31D72X",
                "calibrated": "false",
            },
            ontology_type=IMU,  # The ontology type stored in this topic
        )
        # Push data...
        imu_writer.push(  # (3)!
            message=Message(
                data=IMU(acceleration=Vector3d(x=0, y=0, z=9.81), ...),
                timestamp_ns=1700000000000,
            )
        )
# On exiting the seq_writer `with` block, `_finalize()` is called on all topic writers.
- See also: MosaicoClient.sequence_create()
- See also: SequenceWriter.topic_create()
- See also: TopicWriter.push()
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic_name | str | The name of the specific topic. | required |
| sequence_name | str | The name of the parent sequence. | required |
| client | FlightClient | The FlightClient used for data transmission. | required |
| state | _TopicWriteState | The internal state object managing buffers and streams. | required |
| config | TopicWriterConfig | Operational configuration for batching and error handling. | required |
last_error
property
Returns the last cached error, if any. The value is reset after a new successful push.
Example
with twriter:  # (1)!
    mosaico_msg = custom_translator(twriter.name, raw_data)  # Example helper function
    twriter.push(message=mosaico_msg)
# Inspect failed processing/ingestion
if twriter.status == TopicWriterStatus.IgnoredLastError:  # (2)!
    print(f"Error raised during last ingestion operation for topic {twriter.name}. Inner err: '{twriter.last_error}'")
- twriter is a TopicWriter instance, created with on_error=TopicLevelErrorPolicy.Ignore
- Important: this check must be done outside the with block: any exception inside the context would exit the context prematurely.
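The "cache the error, reset on success" behavior can be pictured with a toy writer. This is a sketch under assumptions, not the TopicWriter implementation: `ToyTopicWriter` is hypothetical, and a NaN value merely stands in for whatever makes a real ingestion fail.

```python
import math


class ToyTopicWriter:
    """Illustrative only: mimics an error-ignoring policy with a cached last error."""

    def __init__(self):
        self.last_error = None
        self.accepted = []

    def push(self, value: float) -> None:
        if math.isnan(value):
            # Failure: cache the error instead of raising it.
            self.last_error = ValueError("invalid sample")
            return
        self.accepted.append(value)
        self.last_error = None  # a successful push resets the cached error


writer = ToyTopicWriter()
writer.push(float("nan"))
print(writer.last_error)  # invalid sample
writer.push(9.81)
print(writer.last_error)  # None
```

Because the cached value is cleared by the next successful push, callers that care about a specific failure need to inspect it before pushing again.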
is_active
property
Returns True if the writing stream is open and the writer accepts new messages.
push
Adds a new record to the internal write buffer.
Records are accumulated in memory. If a push triggers a batch limit, the buffer is automatically serialized and transmitted to the server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | Message | A pre-constructed Message object. | required |

Raises:
| Type | Description |
|---|---|
| Exception | If a buffer flush fails during the operation. |
Example
with MosaicoClient.connect("localhost", 6726) as client:
    # Start the Sequence Orchestrator
    with client.sequence_create(...) as seq_writer:  # (1)!
        # Create individual Topic Writers
        imu_writer = seq_writer.topic_create(  # (2)!
            topic_name="sensors/imu",  # The unique topic name
            metadata={  # The topic/sensor custom metadata
                "vendor": "inertix-dynamics",
                "model": "ixd-f100",
                "firmware_version": "1.2.0",
                "serial_number": "IMUF-9A31D72X",
                "calibrated": "false",
            },
            ontology_type=IMU,  # The ontology type stored in this topic
        )
        # Another individual topic writer for the GPS device
        gps_writer = seq_writer.topic_create(
            topic_name="sensors/gps",  # The unique topic name
            metadata={  # The topic/sensor custom metadata
                "role": "primary_gps",
                "vendor": "satnavics",
                "model": "snx-g500",
                "firmware_version": "3.2.0",
                "serial_number": "GPS-7C1F4A9B",
                "interface": {
                    "type": "UART",
                    "baudrate": 115200,
                    "protocol": "NMEA",
                },
            },
            ontology_type=GPS,  # The ontology type stored in this topic
        )
        gps_msg = Message(timestamp_ns=1700000000100, data=GPS(...))
        gps_writer.push(message=gps_msg)
# On exiting the seq_writer `with` block, `_finalize()` is called on all topic writers.
- See also: MosaicoClient.sequence_create()
- See also: SequenceWriter.topic_create()
mosaicolabs.handlers.SequenceUpdater
Bases: _BaseSessionWriter
Orchestrates the sequence update and related data ingestion lifecycle of a Mosaico Sequence.
The SequenceUpdater is the central controller for high-performance data writing.
Usage Pattern
This class must be used within a with statement (Context Manager).
The context entry triggers sequence registration on the server, while the exit handles
automatic finalization or error cleanup based on the configured SessionLevelErrorPolicy.
Obtaining a Writer
Do not instantiate this class directly. Use the
SequenceHandler.update()
factory method.
Internal constructor for SequenceUpdater.
Do not call this directly. Users must call
SequenceHandler.update()
to obtain an initialized writer.
Example
from mosaicolabs import MosaicoClient, SessionLevelErrorPolicy
# Open the connection with the Mosaico Client
with MosaicoClient.connect("localhost", 6726) as client:
    # Get the handler for the sequence
    seq_handler = client.sequence_handler("mission_log_042")
    # Update the sequence
    with seq_handler.update(  # (1)!
        on_error=SessionLevelErrorPolicy.Delete,
    ) as seq_updater:
        # Start creating topics and pushing data
        ...  # (2)!
# Exiting the block automatically flushes all topic buffers and finalizes the sequence on the server
- See also: SequenceHandler.update()
- See also:
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sequence_name | str | Unique name for the new sequence. | required |
| client | FlightClient | The primary control FlightClient. | required |
| config | SessionWriterConfig | Operational configuration (e.g., error policies, batch sizes). | required |
session_status
property
Returns the current operational status of the session corresponding to this sequence write or update.
Returns:
| Type | Description |
|---|---|
| SessionStatus | The |
session_locator
property
Returns the locator of the session corresponding to this sequence write or update.
The locator format is: 'sequence_name:session_identifier'.
Returns:
| Type | Description |
|---|---|
| str | The locator of the session. |
topic_writer_exists
Checks if a TopicWriter has already been initialized for the given name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic_name | str | The name of the topic to check. | required |

Returns:
| Type | Description |
|---|---|
| bool | True if the topic writer exists locally, False otherwise. |
list_topic_writers
Returns the list of all topic names currently managed by this writer.
get_topic_writer
Retrieves an existing TopicWriter instance from the internal cache.
This method is particularly useful when ingesting data from unified recording formats where different sensor types (e.g., Vision, IMU, Odometry) are stored chronologically in a single stream or file.
In these scenarios, messages for various topics appear in an interleaved fashion.
Using get_topic_writer allows the developer to:
- Reuse Buffers: Efficiently switch between writers for different sensor streams.
- Ensure Data Ordering: Maintain a consistent batching logic for each topic as you iterate through a mixed-content log.
- Optimize Throughput: Leverage Mosaico's background I/O by routing all data for a specific identifier through a single, persistent writer instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic_name | str | The unique name or identifier of the topic writer to retrieve. | required |

Returns:
| Type | Description |
|---|---|
| Optional[TopicWriter] | The |
Example
Processing a generic interleaved sensor log (like a ROS bag or a custom JSON log):
from mosaicolabs import SequenceWriter, IMU, Image
# Topic to Ontology Mapping: Defines the schema for each sensor stream
# Example: {"/camera": Image, "/imu": IMU}
topic_to_ontology = { ... }
# Adapter Factory: Maps raw sensor payloads to Mosaico Ontology instances
# Example: {"/imu": lambda p: IMU(acceleration=Vector3d.from_list(p['acc']), ...)}
adapter = { ... }
with client.sequence_create("physical_ai_trial_01") as seq_writer:
    # log_iterator represents an interleaved stream (e.g., ROSbags, MCAP, or custom logs).
    for ts, topic, payload in log_iterator:
        # Access the topic-specific buffer.
        # get_topic_writer retrieves a persistent writer from the internal cache
        twriter = seq_writer.get_topic_writer(topic)
        if twriter is None:
            # Dynamic Topic Registration.
            # If the topic is encountered for the first time, register it using the
            # pre-defined Ontology type to ensure data integrity.
            twriter = seq_writer.topic_create(
                topic_name=topic,
                metadata={},  # required by topic_create; left empty in this example
                ontology_type=topic_to_ontology[topic],
            )
        # Data Transformation & Ingestion.
        # The adapter converts the raw payload into a validated Mosaico object.
        # push() handles high-performance batching and transmission to the Rust backend.
        twriter.push(  # (1)!
            message=Message(
                timestamp_ns=ts,
                data=adapter[topic](payload),
            )
        )
# SequenceWriter automatically calls _finalize() on all internal TopicWriters,
# guaranteeing that every sensor measurement is safely committed to the platform.
- See also: TopicWriter.push()
topic_create
topic_create(topic_name, metadata, ontology_type, on_error=Raise)
Creates a new topic within the active sequence.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic_name | str | The relative name of the new topic. | required |
| metadata | dict[str, Any] | Topic-specific user metadata. | required |
| ontology_type | Type[Serializable] | The | required |
| on_error | TopicLevelErrorPolicy | The error policy to use in the | Raise |

Returns:
| Type | Description |
|---|---|
| Optional[TopicWriter] | A |

Raises:
| Type | Description |
|---|---|
| RuntimeError | If called outside of a |
Example
with MosaicoClient.connect("localhost", 6726) as client:
    # Start the Sequence Orchestrator
    with client.sequence_create(...) as seq_writer:  # (1)!
        # Create individual Topic Writers
        imu_writer = seq_writer.topic_create(
            topic_name="sensors/imu",  # The unique topic name
            metadata={  # The topic/sensor custom metadata
                "vendor": "inertix-dynamics",
                "model": "ixd-f100",
                "firmware_version": "1.2.0",
                "serial_number": "IMUF-9A31D72X",
                "calibrated": "false",
            },
            ontology_type=IMU,  # The ontology type stored in this topic
        )
        # Another individual topic writer for the GPS device
        gps_writer = seq_writer.topic_create(
            topic_name="sensors/gps",  # The unique topic name
            metadata={  # The topic/sensor custom metadata
                "role": "primary_gps",
                "vendor": "satnavics",
                "model": "snx-g500",
                "firmware_version": "3.2.0",
                "serial_number": "GPS-7C1F4A9B",
                "interface": {  # (2)!
                    "type": "UART",
                    "baudrate": 115200,
                    "protocol": "NMEA",
                },
            },
            ontology_type=GPS,  # The ontology type stored in this topic
        )
        # Push data
        imu_writer.push(  # (3)!
            message=Message(
                timestamp_ns=1700000000000,
                data=IMU(acceleration=Vector3d(x=0, y=0, z=9.81), ...),
            )
        )
        # ...
# Exiting the block automatically flushes all topic buffers and finalizes the sequence on the server
- See also: MosaicoClient.sequence_create()
- The metadata fields will be queryable via the Query mechanism. The mechanism allows creating query expressions like: QueryTopic().with_user_metadata("interface.type", eq="UART"). See also:
- See also: TopicWriter.push()