Query Builders¶
mosaicolabs.models.query.builders ¶
This module provides the high-level "Fluent" API for constructing complex searches across the Mosaico Data Platform.
It implements a Domain-Specific Language that allows users to filter Sequences, Topics, and Ontology data using a type-safe, method-chaining interface.
Key Components:
Query: The root container that aggregates multiple specialized sub-queries.QueryOntologyCatalog: For fine-grained filtering based on sensor-specific field values (e.g.,IMU.Q.acceleration.x > 9.8).QueryTopic: Specifically for filtering topic-level metadata.QuerySequence: Specifically for filtering sequence-level metadata.
QueryOntologyCatalog ¶
A top-level query object for the Data Catalog that combines multiple sensor-field expressions.
This builder allows for fine-grained filtering based on the actual values contained within sensor payloads
(e.g., IMU acceleration, GPS coordinates, or custom telemetry).
It produces a "flat" dictionary output where field paths utilize dot-notation (e.g., "imu.acceleration.x").
This class is designed to work with the .Q query proxy injected into every
Serializable data ontology model.
You can use this proxy on any registered sensor class (like IMU,
Vector3d,
Point3d), etc.
to create type-safe expressions.
Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Filter for a specific data value (using constructor)
qresponse = client.query(
QueryOntologyCatalog(IMU.Q.acceleration.x.lt(-4.0)) # Using constructor
.with_expression(IMU.Q.acceleration.y.gt(5.0)) # Using with_expression
)
# Inspect the response
if qresponse is not None:
# Results are automatically grouped by Sequence for easier data management
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
# Filter for a specific component value and extract the first and last occurrence times
qresponse = client.query(
QueryOntologyCatalog(IMU.Q.acceleration.x.lt(-4.0), include_timestamp_range=True)
.with_expression(IMU.Q.acceleration.y.gt(5.0))
)
# Inspect the response
if qresponse is not None:
# Results are automatically grouped by Sequence for easier data management
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {{topic.name:
[topic.timestamp_range.start, topic.timestamp_range.end]
for topic in item.topics}}")
The constructor initializes the query with an optional list of
_QueryCatalogExpression objects, generated
via <Model>.Q. proxy, where model is any of the available data ontology (e.g. IMU.Q, GPS.Q, String.Q, etc.)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*expressions
|
_QueryExpression
|
A variable number of expressions, generated via the |
()
|
include_timestamp_range
|
Optional[bool]
|
If |
None
|
Raises:
| Type | Description |
|---|---|
TypeError
|
If an expression is not of the supported type. |
ValueError
|
If an operator does not start with the required '$' prefix. |
NotImplementedError
|
If a duplicate key (field path) is detected within the same query. |
with_expression ¶
Adds a new _QueryCatalogExpression
expression to the query using a fluent interface.
Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Chain multiple sensor filters together
qresponse = client.query(
QueryOntologyCatalog()
.with_expression(GPS.Q.status.satellites.geq(8))
.with_expression(GPS.Q.position.x.between([44.0, 45.0]))
)
# Inspect the response
if qresponse is not None:
# Results are automatically grouped by Sequence for easier data management
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
# Filter for a specific component value and extract the first and last occurrence times
qresponse = client.query(
QueryOntologyCatalog(include_timestamp_range=True)
.with_expression(IMU.Q.acceleration.x.lt(-4.0))
.with_expression(IMU.Q.acceleration.y.gt(5.0))
)
# Inspect the response
if qresponse is not None:
# Results are automatically grouped by Sequence for easier data management
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {{topic.name:
[topic.timestamp_range.start, topic.timestamp_range.end]
for topic in item.topics}}")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
expr
|
_QueryExpression
|
A valid expression generated via the |
required |
Returns:
| Type | Description |
|---|---|
QueryOntologyCatalog
|
The |
to_dict ¶
Serializes the ontology expressions into a flat dictionary for the platform API.
Example Output
{"imu.timestamp_ns": {"$between": [...]}, "imu.acceleration.x": {"$leq": 10}}
Returns:
| Type | Description |
|---|---|
Dict[str, Any]
|
A dictionary containing all merged sensor-field expressions. |
QueryTopic ¶
A top-level query object for Topic data that combines multiple expressions with a logical AND.
This builder handles the complex partitioning required to query both flat system fields
(like name or ontology_tag) and nested dictionary fields (like user_metadata).
The resulting dictionary output preserves this hierarchical structure for server-side processing.
Example
from mosaicolabs import MosaicoClient, Image, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Query for all 'image' topics created in a specific timeframe, matching some metadata (key, value) pair
qresponse = client.query(
QueryTopic()
.with_ontology_tag(Image.ontology_tag())
.with_created_timestamp(time_start=Time.from_float(1700000000))
.with_expression(Topic.Q.user_metadata["camera_id.serial_number"].eq("ABC123_XYZ"))
)
# Inspect the response
if qresponse is not None:
# Results are automatically grouped by Sequence for easier data management
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
The constructor initializes the query with an optional list of
_QueryTopicExpression objects, generated
via Topic.Q. proxy.
This builder leverages the .Q query proxy on the user_metadata
field of the Topic model to provide
a type-safe, fluent interface for filtering.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*expressions
|
_QueryExpression
|
A variable number of |
()
|
Raises:
| Type | Description |
|---|---|
TypeError
|
If an expression is not of the supported |
ValueError
|
If an operator does not follow the required internal '$' prefix format. |
NotImplementedError
|
If a duplicate key is detected, as the current implementation enforces unique keys per query. |
with_expression ¶
Adds a new expression to the query using a fluent interface.
This is the way to add filters for nested metadata.
Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Target a specific known topic path
qresponse = client.query(
QueryTopic().with_expression(Topic.Q.user_metadata["version"].eq("1.0"))
)
# Inspect the response
if qresponse is not None:
# Results are automatically grouped by Sequence for easier data management
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
expr
|
_QueryExpression
|
A |
required |
Returns:
| Type | Description |
|---|---|
QueryTopic
|
The |
with_name ¶
Adds an exact match filter for the topic 'name' field.
Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Target a specific known topic path
qresponse = client.query(
QueryTopic().with_name("vehicle/front/camera")
)
# Inspect the response
if qresponse is not None:
# Results are automatically grouped by Sequence for easier data management
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
The exact name of the topic to match. |
required |
Returns:
| Type | Description |
|---|---|
QueryTopic
|
The |
with_name_match ¶
Adds a partial (fuzzy) match filter for the topic 'name' field.
This performs an 'in-between' search (equivalent to %name%) on the full
sequence/topic path.
Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Search for all topics containing the word 'camera'
qresponse = client.query(
QueryTopic().with_name_match("camera")
)
# Inspect the response
if qresponse is not None:
# Results are automatically grouped by Sequence for easier data management
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
The string pattern to search for within the topic name. |
required |
Returns:
| Type | Description |
|---|---|
QueryTopic
|
The |
with_ontology_tag ¶
Adds an exact match filter for the 'ontology_tag' field.
This filter restricts the search to topics belonging to a specific data type identifier (e.g., 'imu', 'gnss').
Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Filter for IMU-only data streams
qresponse = client.query(
QueryTopic().with_ontology_tag(IMU.ontology_tag())
)
# Inspect the response
if qresponse is not None:
# Results are automatically grouped by Sequence for easier data management
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
ontology_tag()
method of the desired ontology class.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ontology_tag
|
str
|
The string tag (e.g., 'imu', 'gps') to filter by. |
required |
Returns:
| Type | Description |
|---|---|
QueryTopic
|
The |
with_created_timestamp ¶
Adds a filter for the 'created_timestamp' field using high-precision Time.
Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Find sequences created during a specific day
qresponse = client.query(
QueryTopic().with_created_timestamp(
time_start=Time.from_float(1704067200.0), # 2024-01-01
time_end=Time.from_float(1704153600.0) # 2024-01-02
)
)
# Inspect the response
if qresponse is not None:
# Results are automatically grouped by Sequence for easier data management
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
time_start
|
Optional[Time]
|
Optional lower bound (inclusive). |
None
|
time_end
|
Optional[Time]
|
Optional upper bound (inclusive). |
None
|
Returns:
| Type | Description |
|---|---|
QueryTopic
|
The |
Raises:
| Type | Description |
|---|---|
ValueError
|
If both bounds are None or if |
to_dict ¶
Serializes the query into a nested dictionary for the platform API.
This method partitions expressions into two groups:
- System Fields: Standard fields like
nameare kept in the root dictionary. - Metadata Fields: Fields starting with a dictionary-type model key (e.g.,
user_metadata) are stripped of their prefix and nested under that key.
Returns:
| Type | Description |
|---|---|
Dict[str, Any]
|
A dictionary representation of the query, e.g., |
QuerySequence ¶
A top-level query object for Sequence data that combines multiple expressions with a logical AND.
This builder handles the complex partitioning required to query both flat system fields
(like name) and nested dictionary fields (like user_metadata).
The resulting dictionary output preserves this hierarchical structure for server-side processing.
Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Search for sequences by project name and creation date
qresponse = client.query(
QuerySequence()
.with_expression(Sequence.Q.user_metadata["project"].eq("Apollo"))
.with_created_timestamp(time_start=Time.from_float(1690000000.0))
)
# Inspect the response
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
The constructor initializes the query with an optional list of
_QuerySequenceExpression objects, generated
via Sequence.Q. proxy.
This builder leverages the .Q query proxy specifically on the user_metadata
field of the Sequence model to provide
a type-safe, fluent interface for filtering.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*expressions
|
_QueryExpression
|
A variable number of |
()
|
Raises:
| Type | Description |
|---|---|
TypeError
|
If an expression is not of the supported |
ValueError
|
If an operator does not follow the required internal '$' prefix format. |
NotImplementedError
|
If a duplicate key is detected, as the current implementation enforces unique keys per query. |
with_expression ¶
Adds a new expression to the query using a fluent interface.
This is the way to add filters for nested metadata.
Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Target a specific known topic path
qresponse = client.query(
QuerySequence().with_expression(Sequence.Q.user_metadata["project"].eq("Apollo"))
)
# Inspect the response
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
expr
|
_QueryExpression
|
A |
required |
Returns:
| Type | Description |
|---|---|
QuerySequence
|
The |
with_name ¶
Adds an exact match filter for the sequence 'name' field.
Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Find all sequences with name equal to 'test_winter_01'
qresponse = client.query(
QuerySequence().with_name("test_winter_01")
)
# Inspect the response
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
The exact name of the sequence to match. |
required |
Returns:
| Type | Description |
|---|---|
QuerySequence
|
The |
with_name_match ¶
Adds a partial (fuzzy) match filter for the sequence 'name' field.
This performs an 'in-between' search (equivalent to %name%) on the sequence name.
Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Find all sequences with name containing 'calibration_run_'
qresponse = client.query(
QuerySequence().with_name_match("calibration_run_")
)
# Inspect the response
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
The string pattern to search for within the sequence name. |
required |
Returns:
| Type | Description |
|---|---|
QuerySequence
|
The |
with_created_timestamp ¶
Adds a filter for the 'created_timestamp' field using high-precision Time.
Example
from mosaicolabs import MosaicoClient, Topic, QuerySequence
with MosaicoClient.connect("localhost", 6726) as client:
# Find sequences created during a specific time range
qresponse = client.query(
QuerySequence().with_created_timestamp(
time_start=Time.from_float(1704067200.0), # 2024-01-01
time_end=Time.from_float(1704153600.0) # 2024-01-02
)
)
# Inspect the response
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {[topic.name for topic in item.topics]}")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
time_start
|
Optional[Time]
|
Optional lower bound (inclusive). |
None
|
time_end
|
Optional[Time]
|
Optional upper bound (inclusive). |
None
|
Returns:
| Type | Description |
|---|---|
QuerySequence
|
The |
Raises:
| Type | Description |
|---|---|
ValueError
|
If both bounds are |
to_dict ¶
Serializes the query into a nested dictionary for the platform API.
This method partitions expressions into:
- Normal Fields: Fields like
nameare kept in a flat dictionary. - Metadata Fields: Fields targeting
user_metadataare collected and nested.
Returns:
| Type | Description |
|---|---|
Dict[str, Any]
|
A dictionary representation preserving the hierarchical structure. |
Query ¶
A top-level "root" query object that aggregates multiple specialized sub-queries into a single request body.
This class serves as the final envelope for multi-domain queries, ensuring that different query types (Topic, Sequence, Ontology) do not overwrite each other.
Example
from mosaicolabs import QueryOntologyCatalog, QuerySequence, Query, IMU, MosaicoClient
# Establish a connection to the Mosaico Data Platform
with MosaicoClient.connect("localhost", 6726) as client:
# Build a filter with name pattern and metadata-related expression
query = Query(
# Append a filter for sequence metadata
QuerySequence()
.with_expression(
# Use query proxy for generating a _QuerySequenceExpression
Sequence.Q.user_metadata["environment.visibility"].lt(50)
)
.with_name_match("test_drive"),
# Append a filter with deep time-series data discovery and measurement time windowing
QueryOntologyCatalog(include_timestamp_range=True)
.with_expression(IMU.Q.acceleration.x.gt(5.0))
.with_expression(IMU.Q.header.stamp.sec.gt(1700134567))
.with_expression(IMU.Q.header.stamp.nanosec.between([123456, 789123])),
)
# Perform the server side query
qresponse = client.query(query=query)
# Inspect the response
if qresponse is not None:
# Results are automatically grouped by Sequence for easier data management
for item in qresponse:
print(f"Sequence: {item.sequence.name}")
print(f"Topics: {{topic.name:
[topic.timestamp_range.start, topic.timestamp_range.end]
for topic in item.topics}}")
Initializes the root query with a set of sub-queries.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*queries
|
QueryableProtocol
|
A variable number of sub-query objects (e.g., |
()
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If duplicate query types are detected in the initial arguments. |
append ¶
Adds additional sub-queries to the existing root query.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*queries
|
QueryableProtocol
|
Additional sub-query instances. |
()
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If an appended query type is already present in the request. |
Example
from mosaicolabs import QueryOntologyCatalog, QuerySequence, Query, IMU, MosaicoClient
# Build a filter with name pattern and metadata-related expression
query = Query(
# Append a filter for sequence metadata
QuerySequence()
.with_expression(
# Use query proxy for generating a _QuerySequenceExpression
Sequence.Q.user_metadata["environment.visibility"].lt(50)
)
.with_name_match("test_drive")
)
# Append a filter with deep time-series data discovery and measurement time windowing
query.append(
QueryOntologyCatalog()
.with_expression(IMU.Q.acceleration.x.gt(5.0))
.with_expression(IMU.Q.header.stamp.sec.gt(1700134567))
.with_expression(IMU.Q.header.stamp.nanosec.between([123456, 789123])),
)
to_dict ¶
Serializes the entire multi-domain query into the final JSON dictionary.
It orchestrates the conversion by calling the .name() and .to_dict()
methods of each contained sub-query.
Example Output
Returns:
| Type | Description |
|---|---|
Dict[str, Any]
|
The final aggregated query dictionary. |