Skip to content

Query Builders

mosaicolabs.models.query.builders

This module provides the high-level "Fluent" API for constructing complex searches across the Mosaico Data Platform.

It implements a Domain-Specific Language that allows users to filter Sequences, Topics, and Ontology data using a type-safe, method-chaining interface.

Key Components:

  • Query: The root container that aggregates multiple specialized sub-queries.
  • QueryOntologyCatalog: For fine-grained filtering based on sensor-specific field values (e.g., IMU.Q.acceleration.x > 9.8).
  • QueryTopic: Specifically for filtering topic-level metadata.
  • QuerySequence: Specifically for filtering sequence-level metadata.

QueryOntologyCatalog

QueryOntologyCatalog(
    *expressions, include_timestamp_range=None
)

A top-level query object for the Data Catalog that combines multiple sensor-field expressions.

This builder allows for fine-grained filtering based on the actual values contained within sensor payloads (e.g., IMU acceleration, GPS coordinates, or custom telemetry). It produces a "flat" dictionary output where field paths utilize dot-notation (e.g., "imu.acceleration.x").

This class is designed to work with the .Q query proxy injected into every Serializable data ontology model. You can use this proxy on any registered sensor class (like IMU, Vector3d, Point3d), etc. to create type-safe expressions.

Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter for a specific data value (using constructor)
    qresponse = client.query(
        QueryOntologyCatalog(IMU.Q.acceleration.x.lt(-4.0)) # Using constructor
        .with_expression(IMU.Q.acceleration.y.gt(5.0)) # Using with_expression
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

    # Filter for a specific component value and extract the first and last occurrence times
    qresponse = client.query(
        QueryOntologyCatalog(IMU.Q.acceleration.x.lt(-4.0), include_timestamp_range=True)
        .with_expression(IMU.Q.acceleration.y.gt(5.0))
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {{topic.name:
                        [topic.timestamp_range.start, topic.timestamp_range.end]
                        for topic in item.topics}}")

The constructor initializes the query with an optional list of _QueryCatalogExpression objects, generated via <Model>.Q. proxy, where model is any of the available data ontology (e.g. IMU.Q, GPS.Q, String.Q, etc.)

Parameters:

Name Type Description Default
*expressions _QueryExpression

A variable number of expressions, generated via the .Q proxy on an ontology model.

()
include_timestamp_range Optional[bool]

If True, the server will return the start and end timestamps corresponding to the temporal bounds of the matched data.

None

Raises:

Type Description
TypeError

If an expression is not of the supported type.

ValueError

If an operator does not start with the required '$' prefix.

NotImplementedError

If a duplicate key (field path) is detected within the same query.

with_expression

with_expression(expr)

Adds a new _QueryCatalogExpression expression to the query using a fluent interface.

Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence

with MosaicoClient.connect("localhost", 6726) as client:
    # Chain multiple sensor filters together
    qresponse = client.query(
        QueryOntologyCatalog()
        .with_expression(GPS.Q.status.satellites.geq(8))
        .with_expression(GPS.Q.position.x.between([44.0, 45.0]))
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

    # Filter for a specific component value and extract the first and last occurrence times
    qresponse = client.query(
        QueryOntologyCatalog(include_timestamp_range=True)
        .with_expression(IMU.Q.acceleration.x.lt(-4.0))
        .with_expression(IMU.Q.acceleration.y.gt(5.0))
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {{topic.name:
                        [topic.timestamp_range.start, topic.timestamp_range.end]
                        for topic in item.topics}}")

Parameters:

Name Type Description Default
expr _QueryExpression

A valid expression generated via the .Q proxy on an ontology model, e.g., GPS.Q.status.satellites.leq(10).

required

Returns:

Type Description
QueryOntologyCatalog

The QueryOntologyCatalog instance for method chaining.

name

name()

Returns the top-level key ('ontology') used for nesting inside a root Query.

to_dict

to_dict()

Serializes the ontology expressions into a flat dictionary for the platform API.

Example Output

{"imu.timestamp_ns": {"$between": [...]}, "imu.acceleration.x": {"$leq": 10}}

Returns:

Type Description
Dict[str, Any]

A dictionary containing all merged sensor-field expressions.

QueryTopic

QueryTopic(*expressions)

A top-level query object for Topic data that combines multiple expressions with a logical AND.

This builder handles the complex partitioning required to query both flat system fields (like name or ontology_tag) and nested dictionary fields (like user_metadata). The resulting dictionary output preserves this hierarchical structure for server-side processing.

Example
from mosaicolabs import MosaicoClient, Image, Topic, QuerySequence

with MosaicoClient.connect("localhost", 6726) as client:
    # Query for all 'image' topics created in a specific timeframe, matching some metadata (key, value) pair
    qresponse = client.query(
        QueryTopic()
        .with_ontology_tag(Image.ontology_tag())
        .with_created_timestamp(time_start=Time.from_float(1700000000))
        .with_expression(Topic.Q.user_metadata["camera_id.serial_number"].eq("ABC123_XYZ"))
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

The constructor initializes the query with an optional list of _QueryTopicExpression objects, generated via Topic.Q. proxy.

This builder leverages the .Q query proxy on the user_metadata field of the Topic model to provide a type-safe, fluent interface for filtering.

Parameters:

Name Type Description Default
*expressions _QueryExpression

A variable number of Topic.Q (_QueryTopicExpression) expression objects.

()

Raises:

Type Description
TypeError

If an expression is not of the supported Topic.Q type.

ValueError

If an operator does not follow the required internal '$' prefix format.

NotImplementedError

If a duplicate key is detected, as the current implementation enforces unique keys per query.

with_expression

with_expression(expr)

Adds a new expression to the query using a fluent interface.

This is the way to add filters for nested metadata.

Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence

with MosaicoClient.connect("localhost", 6726) as client:
    # Target a specific known topic path
    qresponse = client.query(
        QueryTopic().with_expression(Topic.Q.user_metadata["version"].eq("1.0"))
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

Parameters:

Name Type Description Default
expr _QueryExpression

A _QueryTopicExpression constructed via a Topic.Q proxy.

required

Returns:

Type Description
QueryTopic

The QueryTopic instance for method chaining.

with_name

with_name(name)

Adds an exact match filter for the topic 'name' field.

Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence

with MosaicoClient.connect("localhost", 6726) as client:
    # Target a specific known topic path
    qresponse = client.query(
        QueryTopic().with_name("vehicle/front/camera")
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

Parameters:

Name Type Description Default
name str

The exact name of the topic to match.

required

Returns:

Type Description
QueryTopic

The QueryTopic instance for method chaining.

with_name_match

with_name_match(name)

Adds a partial (fuzzy) match filter for the topic 'name' field.

This performs an 'in-between' search (equivalent to %name%) on the full sequence/topic path.

Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence

with MosaicoClient.connect("localhost", 6726) as client:
    # Search for all topics containing the word 'camera'
    qresponse = client.query(
        QueryTopic().with_name_match("camera")
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

Parameters:

Name Type Description Default
name str

The string pattern to search for within the topic name.

required

Returns:

Type Description
QueryTopic

The QueryTopic instance for method chaining.

with_ontology_tag

with_ontology_tag(ontology_tag)

Adds an exact match filter for the 'ontology_tag' field.

This filter restricts the search to topics belonging to a specific data type identifier (e.g., 'imu', 'gnss').

Example

from mosaicolabs import MosaicoClient, Topic, QuerySequence

with MosaicoClient.connect("localhost", 6726) as client:
    # Filter for IMU-only data streams
    qresponse = client.query(
        QueryTopic().with_ontology_tag(IMU.ontology_tag())
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")
Note: To ensure compatibility and avoid hardcoding strings, it is highly recommended to retrieve the tag dynamically using the ontology_tag() method of the desired ontology class.

Parameters:

Name Type Description Default
ontology_tag str

The string tag (e.g., 'imu', 'gps') to filter by.

required

Returns:

Type Description
QueryTopic

The QueryTopic instance for method chaining.

with_created_timestamp

with_created_timestamp(time_start=None, time_end=None)

Adds a filter for the 'created_timestamp' field using high-precision Time.

Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence

with MosaicoClient.connect("localhost", 6726) as client:
    # Find sequences created during a specific day
    qresponse = client.query(
        QueryTopic().with_created_timestamp(
            time_start=Time.from_float(1704067200.0), # 2024-01-01
            time_end=Time.from_float(1704153600.0)    # 2024-01-02
        )
    )

    # Inspect the response
    if qresponse is not None:
        # Results are automatically grouped by Sequence for easier data management
        for item in qresponse:
            print(f"Sequence: {item.sequence.name}")
            print(f"Topics: {[topic.name for topic in item.topics]}")

Parameters:

Name Type Description Default
time_start Optional[Time]

Optional lower bound (inclusive).

None
time_end Optional[Time]

Optional upper bound (inclusive).

None

Returns:

Type Description
QueryTopic

The QueryTopic instance for method chaining.

Raises:

Type Description
ValueError

If both bounds are None or if time_start > time_end.

name

name()

Returns the top-level key ('topic') used when nesting this query inside a root Query.

to_dict

to_dict()

Serializes the query into a nested dictionary for the platform API.

This method partitions expressions into two groups:

  1. System Fields: Standard fields like name are kept in the root dictionary.
  2. Metadata Fields: Fields starting with a dictionary-type model key (e.g., user_metadata) are stripped of their prefix and nested under that key.

Returns:

Type Description
Dict[str, Any]

A dictionary representation of the query, e.g., {"name": {"$eq": "..."}, "user_metadata": {"key": {"$eq": "..."}}}.

QuerySequence

QuerySequence(*expressions)

A top-level query object for Sequence data that combines multiple expressions with a logical AND.

This builder handles the complex partitioning required to query both flat system fields (like name) and nested dictionary fields (like user_metadata). The resulting dictionary output preserves this hierarchical structure for server-side processing.

Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence

with MosaicoClient.connect("localhost", 6726) as client:
    # Search for sequences by project name and creation date
    qresponse = client.query(
        QuerySequence()
        .with_expression(Sequence.Q.user_metadata["project"].eq("Apollo"))
        .with_created_timestamp(time_start=Time.from_float(1690000000.0))
    )

    # Inspect the response
    for item in qresponse:
        print(f"Sequence: {item.sequence.name}")
        print(f"Topics: {[topic.name for topic in item.topics]}")

The constructor initializes the query with an optional list of _QuerySequenceExpression objects, generated via Sequence.Q. proxy.

This builder leverages the .Q query proxy specifically on the user_metadata field of the Sequence model to provide a type-safe, fluent interface for filtering.

Parameters:

Name Type Description Default
*expressions _QueryExpression

A variable number of Sequence.Q (_QuerySequenceExpression) objects.

()

Raises:

Type Description
TypeError

If an expression is not of the supported Sequence.Q type.

ValueError

If an operator does not follow the required internal '$' prefix format.

NotImplementedError

If a duplicate key is detected, as the current implementation enforces unique keys per query.

with_expression

with_expression(expr)

Adds a new expression to the query using a fluent interface.

This is the way to add filters for nested metadata.

Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence

with MosaicoClient.connect("localhost", 6726) as client:
    # Target a specific known topic path
    qresponse = client.query(
        QuerySequence().with_expression(Sequence.Q.user_metadata["project"].eq("Apollo"))
    )

    # Inspect the response
    for item in qresponse:
        print(f"Sequence: {item.sequence.name}")
        print(f"Topics: {[topic.name for topic in item.topics]}")

Parameters:

Name Type Description Default
expr _QueryExpression

A _QuerySequenceExpression constructed via a Sequence.Q proxy.

required

Returns:

Type Description
QuerySequence

The QuerySequence instance for method chaining.

with_name

with_name(name)

Adds an exact match filter for the sequence 'name' field.

Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence

with MosaicoClient.connect("localhost", 6726) as client:
    # Find all sequences with name equal to 'test_winter_01'
    qresponse = client.query(
        QuerySequence().with_name("test_winter_01")
    )

    # Inspect the response
    for item in qresponse:
        print(f"Sequence: {item.sequence.name}")
        print(f"Topics: {[topic.name for topic in item.topics]}")

Parameters:

Name Type Description Default
name str

The exact name of the sequence to match.

required

Returns:

Type Description
QuerySequence

The QuerySequence instance for method chaining.

with_name_match

with_name_match(name)

Adds a partial (fuzzy) match filter for the sequence 'name' field.

This performs an 'in-between' search (equivalent to %name%) on the sequence name.

Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence

with MosaicoClient.connect("localhost", 6726) as client:
    # Find all sequences with name containing 'calibration_run_'
    qresponse = client.query(
        QuerySequence().with_name_match("calibration_run_")
    )

    # Inspect the response
    for item in qresponse:
        print(f"Sequence: {item.sequence.name}")
        print(f"Topics: {[topic.name for topic in item.topics]}")

Parameters:

Name Type Description Default
name str

The string pattern to search for within the sequence name.

required

Returns:

Type Description
QuerySequence

The QuerySequence instance for method chaining.

with_created_timestamp

with_created_timestamp(time_start=None, time_end=None)

Adds a filter for the 'created_timestamp' field using high-precision Time.

Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence

with MosaicoClient.connect("localhost", 6726) as client:
    # Find sequences created during a specific time range
    qresponse = client.query(
        QuerySequence().with_created_timestamp(
            time_start=Time.from_float(1704067200.0), # 2024-01-01
            time_end=Time.from_float(1704153600.0)    # 2024-01-02
        )
    )

    # Inspect the response
    for item in qresponse:
        print(f"Sequence: {item.sequence.name}")
        print(f"Topics: {[topic.name for topic in item.topics]}")

Parameters:

Name Type Description Default
time_start Optional[Time]

Optional lower bound (inclusive).

None
time_end Optional[Time]

Optional upper bound (inclusive).

None

Returns:

Type Description
QuerySequence

The QuerySequence instance for method chaining.

Raises:

Type Description
ValueError

If both bounds are None or if time_start > time_end.

name

name()

Returns the top-level key ('sequence') used for nesting inside a root Query.

to_dict

to_dict()

Serializes the query into a nested dictionary for the platform API.

This method partitions expressions into:

  1. Normal Fields: Fields like name are kept in a flat dictionary.
  2. Metadata Fields: Fields targeting user_metadata are collected and nested.

Returns:

Type Description
Dict[str, Any]

A dictionary representation preserving the hierarchical structure.

Query

Query(*queries)

A top-level "root" query object that aggregates multiple specialized sub-queries into a single request body.

This class serves as the final envelope for multi-domain queries, ensuring that different query types (Topic, Sequence, Ontology) do not overwrite each other.

Example
from mosaicolabs import QueryOntologyCatalog, QuerySequence, Query, IMU, MosaicoClient

# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
    # Build a filter with name pattern and metadata-related expression
    query = Query(
        # Append a filter for sequence metadata
        QuerySequence()
        .with_expression(
            # Use query proxy for generating a _QuerySequenceExpression
            Sequence.Q.user_metadata["environment.visibility"].lt(50)
        )
        .with_name_match("test_drive"),
        # Append a filter with deep time-series data discovery and measurement time windowing
        QueryOntologyCatalog(include_timestamp_range=True)
        .with_expression(IMU.Q.acceleration.x.gt(5.0))
        .with_expression(IMU.Q.header.stamp.sec.gt(1700134567))
        .with_expression(IMU.Q.header.stamp.nanosec.between([123456, 789123])),
    )
    # Perform the server side query
    qresponse = client.query(query=query)
    # Inspect the response
        if qresponse is not None:
            # Results are automatically grouped by Sequence for easier data management
            for item in qresponse:
                print(f"Sequence: {item.sequence.name}")
                print(f"Topics: {{topic.name:
                            [topic.timestamp_range.start, topic.timestamp_range.end]
                            for topic in item.topics}}")

Initializes the root query with a set of sub-queries.

Parameters:

Name Type Description Default
*queries QueryableProtocol

A variable number of sub-query objects (e.g., QueryTopic(), QuerySequence()).

()

Raises:

Type Description
ValueError

If duplicate query types are detected in the initial arguments.

append

append(*queries)

Adds additional sub-queries to the existing root query.

Parameters:

Name Type Description Default
*queries QueryableProtocol

Additional sub-query instances.

()

Raises:

Type Description
ValueError

If an appended query type is already present in the request.

Example
from mosaicolabs import QueryOntologyCatalog, QuerySequence, Query, IMU, MosaicoClient

# Build a filter with name pattern and metadata-related expression
query = Query(
    # Append a filter for sequence metadata
    QuerySequence()
    .with_expression(
        # Use query proxy for generating a _QuerySequenceExpression
        Sequence.Q.user_metadata["environment.visibility"].lt(50)
    )
    .with_name_match("test_drive")
)

# Append a filter with deep time-series data discovery and measurement time windowing
query.append(
    QueryOntologyCatalog()
    .with_expression(IMU.Q.acceleration.x.gt(5.0))
    .with_expression(IMU.Q.header.stamp.sec.gt(1700134567))
    .with_expression(IMU.Q.header.stamp.nanosec.between([123456, 789123])),
)

to_dict

to_dict()

Serializes the entire multi-domain query into the final JSON dictionary.

It orchestrates the conversion by calling the .name() and .to_dict() methods of each contained sub-query.

Example Output
{
    "topic": { ... topic filters ... },
    "sequence": { ... sequence filters ... },
    "ontology": { ... ontology filters ... }
}

Returns:

Type Description
Dict[str, Any]

The final aggregated query dictionary.