
Writing Data

mosaicolabs.handlers.config.WriterConfig dataclass

WriterConfig(
    on_error, max_batch_size_bytes, max_batch_size_records
)

Configuration settings for Sequence and Topic writers.

Internal Usage

This is currently not a user-facing class. It is automatically instantiated by the MosaicoClient when allocating new SequenceWriter instances via sequence_create().

This dataclass defines the operational parameters for data ingestion, controlling both the error recovery strategy and the performance-critical buffering logic used by the SequenceWriter and TopicWriter.

on_error instance-attribute

on_error

Determines the terminal behavior when an exception occurs during the ingestion lifecycle.

max_batch_size_bytes instance-attribute

max_batch_size_bytes

The memory threshold in bytes before a data batch is flushed to the server.

When the internal buffer of a TopicWriter exceeds this value, it triggers a serialization and transmission event. Larger values increase throughput by reducing network overhead but require more client-side memory.

max_batch_size_records instance-attribute

max_batch_size_records

The threshold in row (record) count before a data batch is flushed to the server.

A flush is triggered whenever either this record limit or the max_batch_size_bytes limit is reached, ensuring that data is transmitted regularly even for topics with very small individual records.
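The dual-threshold rule above can be sketched as a small predicate. This is a minimal illustration, not the library's code: `BatchLimits` and `should_flush` are hypothetical names, and the sketch assumes that reaching either limit (rather than strictly exceeding it) triggers the flush.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BatchLimits:
    """Hypothetical stand-in for the two WriterConfig batch thresholds."""
    max_batch_size_bytes: int
    max_batch_size_records: int


def should_flush(buffered_bytes: int, buffered_records: int, limits: BatchLimits) -> bool:
    # A flush fires as soon as EITHER limit is reached; whichever comes first wins.
    return (
        buffered_bytes >= limits.max_batch_size_bytes
        or buffered_records >= limits.max_batch_size_records
    )


limits = BatchLimits(max_batch_size_bytes=1_000_000, max_batch_size_records=1_000)

# Large records (e.g. images): the byte limit trips long before the record limit.
print(should_flush(1_200_000, 4, limits))   # True
# Tiny records (e.g. scalar telemetry): the record limit keeps data moving.
print(should_flush(16_000, 1_000, limits))  # True
# Neither limit reached yet: keep buffering.
print(should_flush(16_000, 999, limits))    # False
```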

mosaicolabs.handlers.SequenceWriter

SequenceWriter(
    *,
    sequence_name,
    client,
    connection_pool,
    executor_pool,
    metadata,
    config,
)

Bases: BaseSequenceWriter

Orchestrates the creation and data ingestion lifecycle of a Mosaico Sequence.

The SequenceWriter is the central controller for high-performance data writing. It manages the transition of a sequence through its lifecycle states: Create -> Write -> Finalize.

Key Responsibilities
  • Lifecycle Management: Coordinates creation, finalization, or abort signals with the server.
  • Resource Distribution: Implements a "Multi-Lane" architecture by distributing network connections from a Connection Pool and thread executors from an Executor Pool to individual TopicWriter instances. This ensures strict isolation and maximum parallelism between diverse data streams.
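The "Multi-Lane" idea can be pictured as each topic receiving its own connection and its own single-threaded executor. The sketch below is hypothetical throughout: `Lane` and `LaneAllocator` are invented names, connections are plain strings, and raising on an exhausted pool is an assumption of the sketch, not documented Mosaico behavior.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass


@dataclass
class Lane:
    """Hypothetical bundle of the resources one topic writer receives."""
    connection: str               # stand-in for a pooled data connection
    executor: ThreadPoolExecutor  # single-thread executor dedicated to one topic


class LaneAllocator:
    """Hands out one connection and one dedicated executor per topic (sketch only)."""

    def __init__(self, connections: list[str]) -> None:
        self._free = list(connections)
        self.lanes: dict[str, Lane] = {}

    def assign(self, topic: str) -> Lane:
        if not self._free:
            # Assumption for this sketch: fail loudly when the pool is exhausted.
            raise RuntimeError("connection pool exhausted")
        lane = Lane(self._free.pop(0), ThreadPoolExecutor(max_workers=1))
        self.lanes[topic] = lane
        return lane

    def shutdown(self) -> None:
        for lane in self.lanes.values():
            lane.executor.shutdown(wait=True)


allocator = LaneAllocator(["conn-0", "conn-1"])
imu_lane = allocator.assign("sensors/imu")
gps_lane = allocator.assign("sensors/gps")

# Work submitted for each topic runs on that topic's own worker thread.
imu_thread = imu_lane.executor.submit(lambda: threading.current_thread().name).result()
gps_thread = gps_lane.executor.submit(lambda: threading.current_thread().name).result()
print(imu_lane.connection, gps_lane.connection, imu_thread != gps_thread)  # conn-0 conn-1 True
allocator.shutdown()
```

Because no lane is shared, a slow topic (for example, large camera frames) cannot stall the serialization or transmission of another topic in this model.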

Usage Pattern

This class must be used within a with statement (Context Manager). The context entry triggers sequence registration on the server, while the exit handles automatic finalization or error cleanup based on the configured OnErrorPolicy.
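As a rough model of this lifecycle, the toy context manager below records what happens on entry, on a clean exit, and on an exit caused by an exception. It is an illustrative sketch, not the SequenceWriter implementation: `SequenceLifecycleSketch` and `Policy` are hypothetical, `Policy.KEEP` is an invented alternative (only `OnErrorPolicy.Delete` appears on this page), and the sketch assumes that `Delete` discards the partially written sequence.

```python
from enum import Enum


class Policy(Enum):
    DELETE = "delete"  # mirrors OnErrorPolicy.Delete from this page
    KEEP = "keep"      # hypothetical alternative, for illustration only


class SequenceLifecycleSketch:
    """Toy model of Create -> Write -> Finalize driven by a context manager."""

    def __init__(self, name: str, on_error: Policy = Policy.DELETE) -> None:
        self.name = name
        self.on_error = on_error
        self.events: list[str] = []
        self._active = False

    def __enter__(self) -> "SequenceLifecycleSketch":
        self.events.append("create")  # entry registers the sequence
        self._active = True
        return self

    def write(self, record: str) -> None:
        if not self._active:
            raise RuntimeError("write called outside of a with block")
        self.events.append(f"write:{record}")

    def __exit__(self, exc_type, exc, tb) -> bool:
        self._active = False
        if exc_type is None:
            self.events.append("finalize")          # clean exit: flush and finalize
        elif self.on_error is Policy.DELETE:
            self.events.append("delete")            # error: discard the partial sequence
        else:
            self.events.append("finalize-partial")  # hypothetical KEEP behavior
        return False                                # never swallow the exception


with SequenceLifecycleSketch("ok_run") as seq:
    seq.write("a")
print(seq.events)  # ['create', 'write:a', 'finalize']

failing = SequenceLifecycleSketch("bad_run")
try:
    with failing:
        raise ValueError("sensor decode failed")
except ValueError:
    pass
print(failing.events)  # ['create', 'delete']
```

Returning `False` from `__exit__` lets the original exception reach the caller after cleanup, which matches the idea that the error policy governs cleanup rather than hiding the failure.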

Obtaining a Writer

Do not instantiate this class directly. Use the MosaicoClient.sequence_create() factory method.

Internal constructor for SequenceWriter.

Do not call this directly. Users must call MosaicoClient.sequence_create() to obtain an initialized writer.

Example
from mosaicolabs import MosaicoClient, OnErrorPolicy

# Open the connection with the Mosaico Client
with MosaicoClient.connect("localhost", 6726) as client:
    # Start the Sequence Orchestrator
    with client.sequence_create( # (1)!
        sequence_name="mission_log_042",
        # Custom metadata for this data sequence.
        metadata={
            "driver": {
                "driver_id": "drv_sim_017",
                "role": "validation",
                "experience_level": "senior",
            },
            "location": {
                "city": "Milan",
                "country": "IT",
                "facility": "Downtown",
                "gps": {
                    "lat": 45.46481,
                    "lon": 9.19201,
                },
            },
        },
        on_error=OnErrorPolicy.Delete,  # Default
    ) as seq_writer:
        # Start creating topics and pushing data
        ...  # (2)!

    # Exiting the block automatically flushes all topic buffers, finalizes the sequence on the server
    # and closes all connections and pools
  1. See also: MosaicoClient.sequence_create()
  2. See also:

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| sequence_name | str | Unique name for the new sequence. | required |
| client | FlightClient | The primary control FlightClient. | required |
| connection_pool | Optional[_ConnectionPool] | Shared pool of data connections for parallel writing. | required |
| executor_pool | Optional[_ExecutorPool] | Shared pool of thread executors for asynchronous I/O. | required |
| metadata | dict[str, Any] | User-defined metadata dictionary. | required |
| config | WriterConfig | Operational configuration (e.g., error policies, batch sizes). | required |

sequence_status property

sequence_status

Returns the current operational status of the sequence.

Returns:

| Type | Description |
|---|---|
| SequenceStatus | The current operational status of the sequence. |

topic_writer_exists

topic_writer_exists(topic_name)

Checks whether a TopicWriter has already been initialized for the given topic name.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| topic_name | str | The name of the topic to check. | required |

Returns:

| Type | Description |
|---|---|
| bool | True if the topic writer exists locally, False otherwise. |

list_topic_writers

list_topic_writers()

Returns the list of all topic names currently managed by this writer.
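The lookup methods above behave like views over a name-keyed cache. A minimal sketch, assuming a plain dictionary keyed by topic name: `TopicRegistry` is hypothetical, its `topic_create` takes only a name for brevity, the writer objects are placeholders, and treating a duplicate name as a failed creation (returning None) is an assumption made for illustration.

```python
from typing import Optional


class TopicRegistry:
    """Sketch of a name-keyed writer cache (hypothetical, not the Mosaico API)."""

    def __init__(self) -> None:
        self._writers: dict[str, object] = {}

    def topic_create(self, topic_name: str) -> Optional[object]:
        if topic_name in self._writers:
            # Assumption for this sketch: a duplicate name is a failed creation.
            return None
        writer = object()  # placeholder for a real TopicWriter
        self._writers[topic_name] = writer
        return writer

    def topic_writer_exists(self, topic_name: str) -> bool:
        return topic_name in self._writers

    def get_topic_writer(self, topic_name: str) -> Optional[object]:
        return self._writers.get(topic_name)  # None if never created

    def list_topic_writers(self) -> list[str]:
        return list(self._writers)  # dicts preserve insertion order


registry = TopicRegistry()
imu = registry.topic_create("sensors/imu")
print(registry.topic_writer_exists("sensors/imu"))      # True
print(registry.get_topic_writer("sensors/imu") is imu)  # True
print(registry.get_topic_writer("sensors/gps"))         # None
print(registry.list_topic_writers())                    # ['sensors/imu']
```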

get_topic_writer

get_topic_writer(topic_name)

Retrieves an existing TopicWriter instance from the internal cache.

This method is particularly useful when ingesting data from unified recording formats where different sensor types (e.g., Vision, IMU, Odometry) are stored chronologically in a single stream or file.

In these scenarios, messages for various topics appear in an interleaved fashion. Using get_topic_writer allows the developer to:

  • Reuse Buffers: Efficiently switch between writers for different sensor streams.
  • Ensure Data Ordering: Maintain a consistent batching logic for each topic as you iterate through a mixed-content log.
  • Optimize Throughput: Leverage Mosaico's background I/O by routing all data for a specific identifier through a single, persistent writer instance.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| topic_name | str | The unique name or identifier of the topic writer to retrieve. | required |

Returns:

| Type | Description |
|---|---|
| Optional[TopicWriter] | The TopicWriter instance if it has been previously initialized within this SequenceWriter context, otherwise None. |

Example

Processing a generic interleaved sensor log (like a ROS bag or a custom JSON log):

from mosaicolabs import SequenceWriter, IMU, Image

# Topic to Ontology Mapping: Defines the schema for each sensor stream
# Example: {"/camera": Image, "/imu": IMU}
topic_to_ontology = { ... }

# Adapter Factory: Maps raw sensor payloads to Mosaico Ontology instances
# Example: {"/imu": lambda p: IMU(acceleration=Vector3d.from_list(p['acc']), ...)}
adapter = { ... }

with client.sequence_create("physical_ai_trial_01") as seq_writer:
    # log_iterator represents an interleaved stream (e.g., ROSbags, MCAP, or custom logs).
    for ts, topic, payload in log_iterator:

        # Access the topic-specific buffer.
        # get_topic_writer retrieves a persistent writer from the internal cache
        twriter = seq_writer.get_topic_writer(topic)

        if twriter is None:
            # Dynamic Topic Registration.
            # If the topic is encountered for the first time, register it using the
            # pre-defined Ontology type to ensure data integrity.
            twriter = seq_writer.topic_create(
                topic_name=topic,
                metadata={},  # no topic-level metadata in this example
                ontology_type=topic_to_ontology[topic],
            )

        # Data Transformation & Ingestion.
        # The adapter converts the raw payload into a validated Mosaico object.
        # push() handles high-performance batching and asynchronous I/O to the Rust backend.
        twriter.push( # (1)!
            message=Message(
                timestamp_ns=ts,
                data=adapter[topic](payload),
            )
        )

# SequenceWriter automatically calls _finalize() on all internal TopicWriters,
# guaranteeing that every sensor measurement is safely committed to the platform.
  1. See also: TopicWriter.push()

topic_create

topic_create(topic_name, metadata, ontology_type)

Creates a new topic within the active sequence.

This method performs a "Multi-Lane" resource assignment, granting the new TopicWriter its own connection from the pool and a dedicated executor for background serialization and I/O.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| topic_name | str | The relative name of the new topic. | required |
| metadata | dict[str, Any] | Topic-specific user metadata. | required |
| ontology_type | Type[Serializable] | The Serializable data model class defining the topic's schema. | required |

Returns:

| Type | Description |
|---|---|
| Optional[TopicWriter] | A TopicWriter instance configured for parallel ingestion, or None if creation fails. |

Raises:

| Type | Description |
|---|---|
| RuntimeError | If called outside of a with block. |

Example
with MosaicoClient.connect("localhost", 6726) as client:
    # Start the Sequence Orchestrator
    with client.sequence_create(...) as seq_writer: # (1)!
        # Create individual Topic Writers
        # Each writer gets its own assigned resources from the pools
        imu_writer = seq_writer.topic_create(
            topic_name="sensors/imu",  # The unique topic name
            metadata={ # The topic/sensor custom metadata
                "vendor": "inertix-dynamics",
                "model": "ixd-f100",
                "firmware_version": "1.2.0",
                "serial_number": "IMUF-9A31D72X",
                "calibrated":"false",
            },
            ontology_type=IMU, # The ontology type stored in this topic
        )

        # Another individual topic writer for the GPS device
        gps_writer = seq_writer.topic_create(
            topic_name="sensors/gps",  # The unique topic name
            metadata={ # The topic/sensor custom metadata
                "role": "primary_gps",
                "vendor": "satnavics",
                "model": "snx-g500",
                "firmware_version": "3.2.0",
                "serial_number": "GPS-7C1F4A9B",
                "interface": { # (2)!
                    "type": "UART",
                    "baudrate": 115200,
                    "protocol": "NMEA",
                },
            }, # The topic/sensor custom metadata
            ontology_type=GPS, # The ontology type stored in this topic
        )

        # Push data
        imu_writer.push( # (3)!
            message=Message(
                timestamp_ns=1700000000000,
                data=IMU(acceleration=Vector3d(x=0, y=0, z=9.81), ...),
            )
        )
        # ...

    # Exiting the block automatically flushes all topic buffers, finalizes the sequence on the server
    # and closes all connections and pools
  1. See also: MosaicoClient.sequence_create()
  2. The metadata fields will be queryable via the Query mechanism. The mechanism allows creating query expressions like: Topic.Q.user_metadata["interface.type"].eq("UART"). See also:
  3. See also: TopicWriter.push()

mosaicolabs.handlers.TopicWriter

TopicWriter(
    *, topic_name, sequence_name, client, state, config
)

Manages a high-performance data stream for a single Mosaico topic.

The TopicWriter abstracts the complexity of the PyArrow Flight DoPut protocol, handling internal buffering, serialization, and network transmission. It accumulates records in memory and automatically flushes them to the server when configured batch limits—defined by either byte size or record count—are exceeded.

Performance & Parallelism

If an executor pool is provided by the parent client, the TopicWriter performs data serialization on background threads, preventing I/O operations from blocking the main application logic.
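The background-serialization pattern can be sketched with the standard `concurrent.futures` module: the caller submits a batch and returns immediately, while a worker thread serializes and "sends" it. This is an illustration under stated assumptions, not Mosaico's implementation: `BackgroundFlusher` is hypothetical, JSON stands in for Arrow serialization, and an in-memory list stands in for the Flight stream.

```python
import json
from concurrent.futures import Future, ThreadPoolExecutor


class BackgroundFlusher:
    """Sketch: hand full batches to a worker thread so the caller never blocks on I/O."""

    def __init__(self, executor: ThreadPoolExecutor) -> None:
        self._executor = executor
        self._pending: list[Future] = []
        self.sent: list[bytes] = []  # stand-in for the network transport

    def _serialize_and_send(self, batch: list[dict]) -> int:
        payload = json.dumps(batch).encode()  # JSON stands in for Arrow serialization
        self.sent.append(payload)
        return len(payload)

    def flush_async(self, batch: list[dict]) -> None:
        # Returns immediately; a copy of the batch is processed on the worker thread.
        self._pending.append(self._executor.submit(self._serialize_and_send, list(batch)))

    def wait(self) -> int:
        # Block until every submitted batch is done; re-raises any worker error.
        total = sum(f.result() for f in self._pending)
        self._pending.clear()
        return total


with ThreadPoolExecutor(max_workers=1) as pool:
    flusher = BackgroundFlusher(pool)
    flusher.flush_async([{"t": 0, "z": 9.81}])
    flusher.flush_async([{"t": 1, "z": 9.80}])
    flusher.wait()
print(len(flusher.sent))  # 2
```

Using a single worker per topic keeps batches in submission order; with several workers per topic, batches could reach the transport out of order.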

Obtaining a Writer

End-users should not instantiate this class directly. Use the SequenceWriter.topic_create() factory method to obtain an active writer.

Internal constructor for TopicWriter.

Do not call this directly. Internal library modules should use the _create() factory. Users must call SequenceWriter.topic_create() to obtain an initialized writer.

Example
with MosaicoClient.connect("localhost", 6726) as client:
    # Start the Sequence Orchestrator
    with client.sequence_create(...) as seq_writer: # (1)!
        # Create individual Topic Writers
        # Each writer gets its own assigned resources from the pools
        imu_writer = seq_writer.topic_create( # (2)!
            topic_name="sensors/imu",  # The unique topic name
            metadata={ # The topic/sensor custom metadata
                "vendor": "inertix-dynamics",
                "model": "ixd-f100",
                "firmware_version": "1.2.0",
                "serial_number": "IMUF-9A31D72X",
                "calibrated":"false",
            },
            ontology_type=IMU, # The ontology type stored in this topic
        )

        # Push data...
        imu_writer.push( # (3)!
            message=Message(
                data=IMU(acceleration=Vector3d(x=0, y=0, z=9.81), ...),
                timestamp_ns=1700000000000,
            )
        )
    # On exiting the seq_writer `with` block, `_finalize()` is called on every topic writer.
  1. See also: MosaicoClient.sequence_create()
  2. See also: SequenceWriter.topic_create()
  3. See also: TopicWriter.push()

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| topic_name | str | The name of the specific topic. | required |
| sequence_name | str | The name of the parent sequence. | required |
| client | FlightClient | The FlightClient used for data transmission. | required |
| state | _TopicWriteState | The internal state object managing buffers and streams. | required |
| config | WriterConfig | Operational configuration for batching and error handling. | required |

name property

name

Returns the name of the topic.

push

push(message)

Adds a new record to the internal write buffer.

Records are accumulated in memory. If a push triggers a batch limit, the buffer is automatically serialized and transmitted to the server.
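The buffering behavior of push() can be sketched as follows. This is a hypothetical model, not the TopicWriter source: `BufferedTopicSketch` is invented, `transmit` stands in for the Flight stream, and `sys.getsizeof` is only a rough, shallow proxy for serialized size. The sketch clears the buffer only after a successful transmit, so a failed flush surfaces as an exception from push() and leaves the records buffered; whether the real writer retains them is not stated here.

```python
import sys
from typing import Any, Callable


class BufferedTopicSketch:
    """Hypothetical model of push(): buffer records, flush when a limit is reached."""

    def __init__(self, transmit: Callable[[list], None],
                 max_batch_size_bytes: int, max_batch_size_records: int) -> None:
        self._transmit = transmit
        self._max_bytes = max_batch_size_bytes
        self._max_records = max_batch_size_records
        self._buffer: list = []
        self._buffered_bytes = 0

    def push(self, message: Any) -> None:
        self._buffer.append(message)
        self._buffered_bytes += sys.getsizeof(message)  # rough, shallow size estimate
        if (self._buffered_bytes >= self._max_bytes
                or len(self._buffer) >= self._max_records):
            self._flush()

    def _flush(self) -> None:
        self._transmit(self._buffer)  # an error here propagates out of push()
        self._buffer = []             # only reached if the transmit succeeded
        self._buffered_bytes = 0


batches: list = []
writer = BufferedTopicSketch(batches.append, max_batch_size_bytes=10**9,
                             max_batch_size_records=3)
for i in range(7):
    writer.push({"timestamp_ns": i})
print(len(batches), [len(b) for b in batches])  # 2 [3, 3]


def failing_transmit(batch: list) -> None:
    raise ConnectionError("server unreachable")


broken = BufferedTopicSketch(failing_transmit, max_batch_size_bytes=10**9,
                             max_batch_size_records=1)
try:
    broken.push({"timestamp_ns": 0})
except ConnectionError as err:
    print("push raised:", err)  # push raised: server unreachable
```

After seven pushes with a three-record limit, two full batches have been transmitted and one record is still waiting in the buffer for the next flush (or for finalization).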

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| message | Message | A pre-constructed Message object. | required |

Raises:

| Type | Description |
|---|---|
| Exception | If a buffer flush fails during the operation. |

Example
with MosaicoClient.connect("localhost", 6726) as client:
    # Start the Sequence Orchestrator
    with client.sequence_create(...) as seq_writer: # (1)!
        # Create individual Topic Writers
        # Each writer gets its own assigned resources from the pools
        imu_writer = seq_writer.topic_create( # (2)!
            topic_name="sensors/imu",  # The unique topic name
            metadata={ # The topic/sensor custom metadata
                "vendor": "inertix-dynamics",
                "model": "ixd-f100",
                "firmware_version": "1.2.0",
                "serial_number": "IMUF-9A31D72X",
                "calibrated":"false",
            },
            ontology_type=IMU, # The ontology type stored in this topic
        )

        # Another individual topic writer for the GPS device
        gps_writer = seq_writer.topic_create(
            topic_name="sensors/gps",  # The unique topic name
            metadata={ # The topic/sensor custom metadata
                "role": "primary_gps",
                "vendor": "satnavics",
                "model": "snx-g500",
                "firmware_version": "3.2.0",
                "serial_number": "GPS-7C1F4A9B",
                "interface": {
                    "type": "UART",
                    "baudrate": 115200,
                    "protocol": "NMEA",
                },
            }, # The topic/sensor custom metadata
            ontology_type=GPS, # The ontology type stored in this topic
        )

        gps_msg = Message(timestamp_ns=1700000000100, data=GPS(...))
        gps_writer.push(message=gps_msg)
    # On exiting the seq_writer `with` block, `_finalize()` is called on every topic writer.
  1. See also: MosaicoClient.sequence_create()
  2. See also: SequenceWriter.topic_create()

is_active

is_active()

Returns True if the writing stream is open and the writer accepts new messages.