Query Builders
mosaicolabs.models.query.builders ¶
This module provides the high-level "Fluent" API for constructing complex searches across the Mosaico Data Platform.
It implements a Domain-Specific Language that allows users to filter Sequences, Topics, and Ontology data using a type-safe, method-chaining interface.
Key Components:
Query: The root container that aggregates multiple specialized sub-queries.QueryOntologyCatalog: For fine-grained filtering based on sensor-specific field values (e.g.,IMU.Q.acceleration.x > 9.8).QueryTopic: Specifically for filtering topic-level metadata.QuerySequence: Specifically for filtering sequence-level metadata.
QueryOntologyCatalog ¶
A top-level query object for the Data Catalog that combines multiple sensor-field expressions.
This builder allows for fine-grained filtering based on the actual values contained within sensor payloads
(e.g., IMU acceleration, GPS coordinates, or custom telemetry).
It produces a "flat" dictionary output where field paths utilize dot-notation (e.g., "imu.acceleration.x").
This class is designed to work with the .Q query proxy injected into every
Serializable data ontology model.
You can use this proxy on any registered sensor class (like IMU,
Vector3d,
Point3d), etc.
to create type-safe expressions.
Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Filter for a specific data value (using constructor)
qresponse = client.query(
QueryOntologyCatalog(IMU.Q.acceleration.x.lt(-4.0)) # Using constructor
.with_expression(IMU.Q.acceleration.y.gt(5.0)) # Using with_expression
)
# Inspect the response
if qresponse is not None:
# Results are automatically grouped by Sequence for easier data management
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
# Filter for a specific component value and extract the first and last occurrence times
qresponse = client.query(
QueryOntologyCatalog(IMU.Q.acceleration.x.lt(-4.0), include_timestamp_range=True)
.with_expression(IMU.Q.acceleration.y.gt(5.0))
)
# Inspect the response
if qresponse is not None:
# Results are automatically grouped by Sequence for easier data management
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {{topic.name:
[topic.timestamp_range.start, topic.timestamp_range.end]
for topic in item.topics}}")
The constructor initializes the query with an optional list of
_QueryCatalogExpression objects, generated
via <Model>.Q. proxy, where model is any of the available data ontology (e.g. IMU.Q, GPS.Q, String.Q, etc.)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*expressions
|
_QueryExpression
|
A variable number of expressions, generated via the |
()
|
include_timestamp_range
|
Optional[bool]
|
If |
None
|
Raises:
| Type | Description |
|---|---|
TypeError
|
If an expression is not of the supported type. |
ValueError
|
If an operator does not start with the required '$' prefix. |
NotImplementedError
|
If a duplicate key (field path) is detected within the same query. |
with_expression ¶
Adds a new _QueryCatalogExpression
expression to the query using a fluent interface.
Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Chain multiple sensor filters together
qresponse = client.query(
QueryOntologyCatalog()
.with_expression(GPS.Q.status.satellites.geq(8))
.with_expression(GPS.Q.position.x.between([44.0, 45.0]))
)
# Inspect the response
if qresponse is not None:
# Results are automatically grouped by Sequence for easier data management
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
# Filter for a specific component value and extract the first and last occurrence times
qresponse = client.query(
QueryOntologyCatalog(include_timestamp_range=True)
.with_expression(IMU.Q.acceleration.x.lt(-4.0))
.with_expression(IMU.Q.acceleration.y.gt(5.0))
)
# Inspect the response
if qresponse is not None:
# Results are automatically grouped by Sequence for easier data management
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {{topic.name:
[topic.timestamp_range.start, topic.timestamp_range.end]
for topic in item.topics}}")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
expr
|
_QueryExpression
|
A valid expression generated via the |
required |
Returns:
| Type | Description |
|---|---|
QueryOntologyCatalog
|
The |
to_dict ¶
Serializes the ontology expressions into a flat dictionary for the platform API.
Example Output
{"imu.timestamp_ns": {"$between": [...]}, "imu.acceleration.x": {"$leq": 10}}
Returns:
| Type | Description |
|---|---|
Dict[str, Any]
|
A dictionary containing all merged sensor-field expressions. |
QueryTopic ¶
A top-level query object for Topic data that combines multiple expressions with a logical AND.
This builder handles the complex partitioning required to query both flat system fields
(like name or ontology_tag) and nested dictionary fields (like user_metadata).
The resulting dictionary output preserves this hierarchical structure for server-side processing.
Example
from mosaicolabs import MosaicoClient, Image, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Query for all 'image' topics created in a specific timeframe, matching some metadata (key, value) pair
qresponse = client.query(
QueryTopic()
.with_ontology_tag(Image.ontology_tag())
.with_created_timestamp(time_start=Time.from_float(1700000000))
.with_user_metadata("camera_id.serial_number", eq="ABC123_XYZ")
)
# Inspect the response
if qresponse is not None:
# Results are automatically grouped by Sequence for easier data management
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
The constructor initializes the query with an optional list of
_QueryTopicExpression objects, generated
via Topic.Q. proxy.
Deprecated
The constructor is deprecated. Use the with_user_metadata()
convenience method instead, if wanting to query the user metadata.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*expressions
|
_QueryExpression
|
A variable number of |
()
|
Raises:
| Type | Description |
|---|---|
TypeError
|
If an expression is not of the supported |
ValueError
|
If an operator does not follow the required internal '$' prefix format. |
NotImplementedError
|
If a duplicate key is detected, as the current implementation enforces unique keys per query. |
with_expression ¶
Adds a new expression to the query using a fluent interface.
Deprecated API
This is the old way to add filters for nested metadata. Use the
with_user_metadata
convenience method
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
expr
|
_QueryExpression
|
A |
required |
Returns:
| Type | Description |
|---|---|
QueryTopic
|
The |
with_user_metadata ¶
Appends a metadata filter to the query using a fluent, operator-based interface.
This method simplifies metadata discovery by allowing direct filtering on the user_metadata
dictionary of the Topic. Each call adds a logical AND condition to the query.
Note
The previous method using Topic.Q.user_metadata is maintained for backward
compatibility but is scheduled for removal in release 0.4.0.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
str
|
The metadata key to filter on (e.g., "sensor_id"). Supports dot-notation for nested dictionary access (e.g., "calibration.focal_length"). |
required |
**operator_kwargs
|
Any
|
A single keyword argument where the key is the operator
and the value is the comparison target, e.g. |
{}
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If no operator is provided, if multiple operators are provided in a single call, or if an unsupported operator is used. |
Operators Supported
eq: Equal toneq: Not equal togt: Greater thangeq: Greater than or equal tolt: Less thanleq: Less than or equal tobetween: Range filter (expects a list of [min, max])
Example
Returns:
| Name | Type | Description |
|---|---|---|
QueryTopic |
QueryTopic
|
The current instance to support method chaining. |
with_name ¶
Adds an exact match filter for the topic 'name' field.
Example
from mosaicolabs import MosaicoClient, Topic, QueryTopic
with MosaicoClient.connect("localhost", 6726) as client:
# Target a specific known topic path
qresponse = client.query(
QueryTopic().with_name("vehicle/front/camera")
)
# Inspect the response
if qresponse is not None:
# Results are automatically grouped by Sequence for easier data management
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
The exact name of the topic to match. |
required |
Returns:
| Type | Description |
|---|---|
QueryTopic
|
The |
with_name_match ¶
Adds a partial (fuzzy) match filter for the topic 'name' field.
This performs an 'in-between' search (equivalent to %name%) on the full
sequence/topic path.
Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Search for all topics containing the word 'camera'
qresponse = client.query(
QueryTopic().with_name_match("camera")
)
# Inspect the response
if qresponse is not None:
# Results are automatically grouped by Sequence for easier data management
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
The string pattern to search for within the topic name. |
required |
Returns:
| Type | Description |
|---|---|
QueryTopic
|
The |
with_ontology_tag ¶
Adds an exact match filter for the 'ontology_tag' field.
This filter restricts the search to topics belonging to a specific data type identifier (e.g., 'imu', 'gnss').
Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Filter for IMU-only data streams
qresponse = client.query(
QueryTopic().with_ontology_tag(IMU.ontology_tag())
)
# Inspect the response
if qresponse is not None:
# Results are automatically grouped by Sequence for easier data management
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
ontology_tag()
method of the desired ontology class.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ontology_tag
|
str
|
The string tag (e.g., 'imu', 'gps') to filter by. |
required |
Returns:
| Type | Description |
|---|---|
QueryTopic
|
The |
with_created_timestamp ¶
Adds a filter for the 'created_at_ns' field using high-precision Time.
Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Find sequences created during a specific day
qresponse = client.query(
QueryTopic().with_created_timestamp(
time_start=Time.from_float(1704067200.0), # 2024-01-01
time_end=Time.from_float(1704153600.0) # 2024-01-02
)
)
# Inspect the response
if qresponse is not None:
# Results are automatically grouped by Sequence for easier data management
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
time_start
|
Optional[Time]
|
Optional lower bound (inclusive). |
None
|
time_end
|
Optional[Time]
|
Optional upper bound (inclusive). |
None
|
Returns:
| Type | Description |
|---|---|
QueryTopic
|
The |
Raises:
| Type | Description |
|---|---|
ValueError
|
If both bounds are None or if |
to_dict ¶
Serializes the query into a nested dictionary for the platform API.
This method partitions expressions into two groups:
- System Fields: Standard fields like
nameare kept in the root dictionary. - Metadata Fields: Fields starting with a dictionary-type model key (e.g.,
user_metadata) are stripped of their prefix and nested under that key.
Returns:
| Type | Description |
|---|---|
Dict[str, Any]
|
A dictionary representation of the query, e.g., |
QuerySequence ¶
A top-level query object for Sequence data that combines multiple expressions with a logical AND.
This builder handles the complex partitioning required to query both flat system fields
(like name) and nested dictionary fields (like user_metadata).
The resulting dictionary output preserves this hierarchical structure for server-side processing.
Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Search for sequences by project name and creation date
qresponse = client.query(
QuerySequence()
.with_user_metadata("project", eq="Apollo")
.with_created_timestamp(time_start=Time.from_float(1690000000.0))
)
# Inspect the response
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
The constructor initializes the query with an optional list of
_QuerySequenceExpression objects, generated
via Sequence.Q. proxy.
Deprecated
The constructor is deprecated. Use the with_user_metadata()
convenience method instead, if wanting to query the user metadata.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*expressions
|
_QueryExpression
|
A variable number of |
()
|
Raises:
| Type | Description |
|---|---|
TypeError
|
If an expression is not of the supported |
ValueError
|
If an operator does not follow the required internal '$' prefix format. |
NotImplementedError
|
If a duplicate key is detected, as the current implementation enforces unique keys per query. |
with_expression ¶
Adds a new expression to the query using a fluent interface.
Deprecated API
This is the old way to add filters for nested metadata. Use the
with_user_metadata
convenience method
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
expr
|
_QueryExpression
|
A |
required |
Returns:
| Type | Description |
|---|---|
QuerySequence
|
The |
with_user_metadata ¶
Appends a metadata filter to the query using a fluent, operator-based interface.
This method simplifies metadata discovery by allowing direct filtering on the user_metadata
dictionary of the Sequence. Each call adds a logical AND condition to the query.
Note
The previous method using Sequence.Q.user_metadata is maintained for backward
compatibility but is scheduled for removal in release 0.4.0.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
str
|
The metadata key to filter on (e.g., "project"). Supports dot-notation for nested dictionary access (e.g., "vehicle.id"). |
required |
**operator_kwargs
|
Any
|
A single keyword argument where the key is the operator
and the value is the comparison target, e.g. |
{}
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If no operator is provided, if multiple operators are provided in a single call, or if an unsupported operator is used. |
Operators Supported
eq: Equal toneq: Not equal togt: Greater thangeq: Greater than or equal tolt: Less thanleq: Less than or equal tobetween: Range filter (expects a list of [min, max])
Example
Returns:
| Name | Type | Description |
|---|---|---|
QuerySequence |
QuerySequence
|
The current instance to support method chaining. |
with_name ¶
Adds an exact match filter for the sequence 'name' field.
Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Find all sequences with name equal to 'test_winter_01'
qresponse = client.query(
QuerySequence().with_name("test_winter_01")
)
# Inspect the response
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
The exact name of the sequence to match. |
required |
Returns:
| Type | Description |
|---|---|
QuerySequence
|
The |
with_name_match ¶
Adds a partial (fuzzy) match filter for the sequence 'name' field.
This performs an 'in-between' search (equivalent to %name%) on the sequence name.
Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Find all sequences with name containing 'calibration_run_'
qresponse = client.query(
QuerySequence().with_name_match("calibration_run_")
)
# Inspect the response
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
The string pattern to search for within the sequence name. |
required |
Returns:
| Type | Description |
|---|---|
QuerySequence
|
The |
with_created_timestamp ¶
Adds a filter for the 'created_at_ns' field using high-precision Time.
Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Find sequences created during a specific time range
qresponse = client.query(
QuerySequence().with_created_timestamp(
time_start=Time.from_float(1704067200.0), # 2024-01-01
time_end=Time.from_float(1704153600.0) # 2024-01-02
)
)
# Inspect the response
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
time_start
|
Optional[Time]
|
Optional lower bound (inclusive). |
None
|
time_end
|
Optional[Time]
|
Optional upper bound (inclusive). |
None
|
Returns:
| Type | Description |
|---|---|
QuerySequence
|
The |
Raises:
| Type | Description |
|---|---|
ValueError
|
If both bounds are |
to_dict ¶
Serializes the query into a nested dictionary for the platform API.
This method partitions expressions into:
- Normal Fields: Fields like
nameare kept in a flat dictionary. - Metadata Fields: Fields targeting
user_metadataare collected and nested.
Returns:
| Type | Description |
|---|---|
Dict[str, Any]
|
A dictionary representation preserving the hierarchical structure. |
Query ¶
A top-level "root" query object that aggregates multiple specialized sub-queries into a single request body.
This class serves as the final envelope for multi-domain queries, ensuring that different query types (Topic, Sequence, Ontology) do not overwrite each other.
Example
from mosaicolabs import QueryOntologyCatalog, QuerySequence, Query, IMU, MosaicoClient
# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
# Build a filter with name pattern and metadata-related expression
query = Query(
# Append a filter for sequence metadata
QuerySequence()
.with_user_metadata("environment.visibility", lt=50)
.with_name_match("test_drive"),
# Append a filter with deep time-series data discovery and measurement time windowing
QueryOntologyCatalog(include_timestamp_range=True)
.with_expression(IMU.Q.acceleration.x.gt(5.0))
.with_expression(IMU.Q.timestamp_ns.gt(1700134567))
)
# Perform the server side query
qresponse = client.query(query=query)
# Inspect the response
if qresponse is not None:
# Results are automatically grouped by Sequence for easier data management
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {{topic.name:
[topic.timestamp_range.start, topic.timestamp_range.end]
for topic in item.topics}}")
Initializes the root query with a set of sub-queries.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*queries
|
QueryableProtocol
|
A variable number of sub-query objects (e.g., |
()
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If duplicate query types are detected in the initial arguments. |
append ¶
Adds additional sub-queries to the existing root query.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*queries
|
QueryableProtocol
|
Additional sub-query instances. |
()
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If an appended query type is already present in the request. |
Example
from mosaicolabs import QueryOntologyCatalog, QuerySequence, Query, IMU, MosaicoClient
# Build a filter with name pattern and metadata-related expression
query = Query(
# Append a filter for sequence metadata
QuerySequence()
.with_user_metadata("environment.visibility", lt=50)
.with_name_match("test_drive")
)
# Append a filter with deep time-series data discovery and measurement time windowing
query.append(
QueryOntologyCatalog()
.with_expression(IMU.Q.acceleration.x.gt(5.0))
.with_expression(IMU.Q.timestamp_ns.gt(1700134567))
)
to_dict ¶
Serializes the entire multi-domain query into the final JSON dictionary.
It orchestrates the conversion by calling the .name() and .to_dict()
methods of each contained sub-query.
Example Output
Returns:
| Type | Description |
|---|---|
Dict[str, Any]
|
The final aggregated query dictionary. |