Skip to content
This repository was archived by the owner on Mar 2, 2026. It is now read-only.

Commit 4848fbe

Browse files
feat: pipeline explain stats and index mode (#1128)
1 parent 2d3ed73 commit 4848fbe

11 files changed

Lines changed: 1034 additions & 178 deletions

google/cloud/firestore_v1/async_pipeline.py

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,20 @@
1313
# limitations under the License.
1414

1515
from __future__ import annotations
16-
from typing import AsyncIterable, TYPE_CHECKING
16+
from typing import TYPE_CHECKING
1717
from google.cloud.firestore_v1 import pipeline_stages as stages
1818
from google.cloud.firestore_v1.base_pipeline import _BasePipeline
19+
from google.cloud.firestore_v1.pipeline_result import AsyncPipelineStream
20+
from google.cloud.firestore_v1.pipeline_result import PipelineSnapshot
21+
from google.cloud.firestore_v1.pipeline_result import PipelineResult
1922

2023
if TYPE_CHECKING: # pragma: NO COVER
2124
import datetime
2225
from google.cloud.firestore_v1.async_client import AsyncClient
23-
from google.cloud.firestore_v1.pipeline_result import PipelineResult
2426
from google.cloud.firestore_v1.async_transaction import AsyncTransaction
27+
from google.cloud.firestore_v1.pipeline_expressions import Constant
28+
from google.cloud.firestore_v1.types.document import Value
29+
from google.cloud.firestore_v1.query_profile import PipelineExplainOptions
2530

2631

2732
class AsyncPipeline(_BasePipeline):
@@ -41,7 +46,7 @@ class AsyncPipeline(_BasePipeline):
4146
... .collection("books")
4247
... .where(Field.of("published").gt(1980))
4348
... .select("title", "author")
44-
... async for result in pipeline.execute():
49+
... async for result in pipeline.stream():
4550
... print(result)
4651
4752
Use `client.pipeline()` to create instances of this class.
@@ -59,15 +64,18 @@ def __init__(self, client: AsyncClient, *stages: stages.Stage):
5964

6065
async def execute(
6166
self,
67+
*,
6268
transaction: "AsyncTransaction" | None = None,
6369
read_time: datetime.datetime | None = None,
64-
) -> list[PipelineResult]:
70+
explain_options: PipelineExplainOptions | None = None,
71+
index_mode: str | None = None,
72+
additional_options: dict[str, Value | Constant] = {},
73+
) -> PipelineSnapshot[PipelineResult]:
6574
"""
6675
Executes this pipeline and returns results as a list
6776
6877
Args:
69-
transaction
70-
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
78+
transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
7179
An existing transaction that this query will run in.
7280
If a ``transaction`` is used and it already has write operations
7381
added, this method cannot be used (i.e. read-after-write is not
@@ -76,25 +84,33 @@ async def execute(
7684
time. This must be a microsecond precision timestamp within the past one hour, or
7785
if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
7886
within the past 7 days. For the most accurate results, use UTC timezone.
87+
explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]):
88+
Options to enable query profiling for this query. When set,
89+
explain_metrics will be available on the returned list.
90+
index_mode (Optional[str]): Configures the pipeline to require a certain type of indexes to be present.
91+
Firestore will reject the request if there is not appropiate indexes to serve the query.
92+
additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
93+
These options will take precedence over method argument if there is a conflict (e.g. explain_options, index_mode)
7994
"""
80-
return [
81-
result
82-
async for result in self.stream(
83-
transaction=transaction, read_time=read_time
84-
)
85-
]
95+
kwargs = {k: v for k, v in locals().items() if k != "self"}
96+
stream = AsyncPipelineStream(PipelineResult, self, **kwargs)
97+
results = [result async for result in stream]
98+
return PipelineSnapshot(results, stream)
8699

87-
async def stream(
100+
def stream(
88101
self,
89-
transaction: "AsyncTransaction" | None = None,
102+
*,
90103
read_time: datetime.datetime | None = None,
91-
) -> AsyncIterable[PipelineResult]:
104+
transaction: "AsyncTransaction" | None = None,
105+
explain_options: PipelineExplainOptions | None = None,
106+
index_mode: str | None = None,
107+
additional_options: dict[str, Value | Constant] = {},
108+
) -> AsyncPipelineStream[PipelineResult]:
92109
"""
93-
Process this pipeline as a stream, providing results through an Iterable
110+
Process this pipeline as a stream, providing results through an AsyncIterable
94111
95112
Args:
96-
transaction
97-
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
113+
transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
98114
An existing transaction that this query will run in.
99115
If a ``transaction`` is used and it already has write operations
100116
added, this method cannot be used (i.e. read-after-write is not
@@ -103,10 +119,13 @@ async def stream(
103119
time. This must be a microsecond precision timestamp within the past one hour, or
104120
if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
105121
within the past 7 days. For the most accurate results, use UTC timezone.
122+
explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]):
123+
Options to enable query profiling for this query. When set,
124+
explain_metrics will be available on the returned generator.
125+
index_mode (Optional[str]): Configures the pipeline to require a certain type of indexes to be present.
126+
Firestore will reject the request if there is not appropiate indexes to serve the query.
127+
additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
128+
These options will take precedence over method argument if there is a conflict (e.g. explain_options, index_mode)
106129
"""
107-
request = self._prep_execute_request(transaction, read_time)
108-
async for response in await self._client._firestore_api.execute_pipeline(
109-
request
110-
):
111-
for result in self._execute_response_helper(response):
112-
yield result
130+
kwargs = {k: v for k, v in locals().items() if k != "self"}
131+
return AsyncPipelineStream(PipelineResult, self, **kwargs)

google/cloud/firestore_v1/base_pipeline.py

Lines changed: 4 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,13 @@
1313
# limitations under the License.
1414

1515
from __future__ import annotations
16-
from typing import Iterable, Sequence, TYPE_CHECKING
16+
from typing import Sequence, TYPE_CHECKING
1717
from google.cloud.firestore_v1 import pipeline_stages as stages
1818
from google.cloud.firestore_v1.types.pipeline import (
1919
StructuredPipeline as StructuredPipeline_pb,
2020
)
2121
from google.cloud.firestore_v1.vector import Vector
2222
from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
23-
from google.cloud.firestore_v1.types.firestore import ExecutePipelineRequest
24-
from google.cloud.firestore_v1.pipeline_result import PipelineResult
2523
from google.cloud.firestore_v1.pipeline_expressions import (
2624
AggregateFunction,
2725
AliasedExpression,
@@ -30,14 +28,10 @@
3028
BooleanExpression,
3129
Selectable,
3230
)
33-
from google.cloud.firestore_v1 import _helpers
3431

3532
if TYPE_CHECKING: # pragma: NO COVER
36-
import datetime
3733
from google.cloud.firestore_v1.client import Client
3834
from google.cloud.firestore_v1.async_client import AsyncClient
39-
from google.cloud.firestore_v1.types.firestore import ExecutePipelineResponse
40-
from google.cloud.firestore_v1.transaction import BaseTransaction
4135

4236

4337
class _BasePipeline:
@@ -88,9 +82,10 @@ def __repr__(self):
8882
stages_str = ",\n ".join([repr(s) for s in self.stages])
8983
return f"{cls_str}(\n {stages_str}\n)"
9084

91-
def _to_pb(self) -> StructuredPipeline_pb:
85+
def _to_pb(self, **options) -> StructuredPipeline_pb:
9286
return StructuredPipeline_pb(
93-
pipeline={"stages": [s._to_pb() for s in self.stages]}
87+
pipeline={"stages": [s._to_pb() for s in self.stages]},
88+
options=options,
9489
)
9590

9691
def _append(self, new_stage):
@@ -99,47 +94,6 @@ def _append(self, new_stage):
9994
"""
10095
return self.__class__._create_with_stages(self._client, *self.stages, new_stage)
10196

102-
def _prep_execute_request(
103-
self,
104-
transaction: BaseTransaction | None,
105-
read_time: datetime.datetime | None,
106-
) -> ExecutePipelineRequest:
107-
"""
108-
shared logic for creating an ExecutePipelineRequest
109-
"""
110-
database_name = (
111-
f"projects/{self._client.project}/databases/{self._client._database}"
112-
)
113-
transaction_id = (
114-
_helpers.get_transaction_id(transaction)
115-
if transaction is not None
116-
else None
117-
)
118-
request = ExecutePipelineRequest(
119-
database=database_name,
120-
transaction=transaction_id,
121-
structured_pipeline=self._to_pb(),
122-
read_time=read_time,
123-
)
124-
return request
125-
126-
def _execute_response_helper(
127-
self, response: ExecutePipelineResponse
128-
) -> Iterable[PipelineResult]:
129-
"""
130-
shared logic for unpacking an ExecutePipelineReponse into PipelineResults
131-
"""
132-
for doc in response.results:
133-
ref = self._client.document(doc.name) if doc.name else None
134-
yield PipelineResult(
135-
self._client,
136-
doc.fields,
137-
ref,
138-
response._pb.execution_time,
139-
doc._pb.create_time if doc.create_time else None,
140-
doc._pb.update_time if doc.update_time else None,
141-
)
142-
14397
def add_fields(self, *fields: Selectable) -> "_BasePipeline":
14498
"""
14599
Adds new fields to outputs from previous stages.

google/cloud/firestore_v1/pipeline.py

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,20 @@
1313
# limitations under the License.
1414

1515
from __future__ import annotations
16-
from typing import Iterable, TYPE_CHECKING
16+
from typing import TYPE_CHECKING
1717
from google.cloud.firestore_v1 import pipeline_stages as stages
1818
from google.cloud.firestore_v1.base_pipeline import _BasePipeline
19+
from google.cloud.firestore_v1.pipeline_result import PipelineStream
20+
from google.cloud.firestore_v1.pipeline_result import PipelineSnapshot
21+
from google.cloud.firestore_v1.pipeline_result import PipelineResult
1922

2023
if TYPE_CHECKING: # pragma: NO COVER
2124
import datetime
2225
from google.cloud.firestore_v1.client import Client
23-
from google.cloud.firestore_v1.pipeline_result import PipelineResult
26+
from google.cloud.firestore_v1.pipeline_expressions import Constant
2427
from google.cloud.firestore_v1.transaction import Transaction
28+
from google.cloud.firestore_v1.types.document import Value
29+
from google.cloud.firestore_v1.query_profile import PipelineExplainOptions
2530

2631

2732
class Pipeline(_BasePipeline):
@@ -56,15 +61,18 @@ def __init__(self, client: Client, *stages: stages.Stage):
5661

5762
def execute(
5863
self,
64+
*,
5965
transaction: "Transaction" | None = None,
6066
read_time: datetime.datetime | None = None,
61-
) -> list[PipelineResult]:
67+
explain_options: PipelineExplainOptions | None = None,
68+
index_mode: str | None = None,
69+
additional_options: dict[str, Value | Constant] = {},
70+
) -> PipelineSnapshot[PipelineResult]:
6271
"""
6372
Executes this pipeline and returns results as a list
6473
6574
Args:
66-
transaction
67-
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
75+
transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
6876
An existing transaction that this query will run in.
6977
If a ``transaction`` is used and it already has write operations
7078
added, this method cannot be used (i.e. read-after-write is not
@@ -73,23 +81,33 @@ def execute(
7381
time. This must be a microsecond precision timestamp within the past one hour, or
7482
if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
7583
within the past 7 days. For the most accurate results, use UTC timezone.
84+
explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]):
85+
Options to enable query profiling for this query. When set,
86+
explain_metrics will be available on the returned list.
87+
index_mode (Optional[str]): Configures the pipeline to require a certain type of indexes to be present.
88+
Firestore will reject the request if there is not appropiate indexes to serve the query.
89+
additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
90+
These options will take precedence over method argument if there is a conflict (e.g. explain_options, index_mode)
7691
"""
77-
return [
78-
result
79-
for result in self.stream(transaction=transaction, read_time=read_time)
80-
]
92+
kwargs = {k: v for k, v in locals().items() if k != "self"}
93+
stream = PipelineStream(PipelineResult, self, **kwargs)
94+
results = [result for result in stream]
95+
return PipelineSnapshot(results, stream)
8196

8297
def stream(
8398
self,
99+
*,
84100
transaction: "Transaction" | None = None,
85101
read_time: datetime.datetime | None = None,
86-
) -> Iterable[PipelineResult]:
102+
explain_options: PipelineExplainOptions | None = None,
103+
index_mode: str | None = None,
104+
additional_options: dict[str, Value | Constant] = {},
105+
) -> PipelineStream[PipelineResult]:
87106
"""
88107
Process this pipeline as a stream, providing results through an Iterable
89108
90109
Args:
91-
transaction
92-
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
110+
transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
93111
An existing transaction that this query will run in.
94112
If a ``transaction`` is used and it already has write operations
95113
added, this method cannot be used (i.e. read-after-write is not
@@ -98,7 +116,13 @@ def stream(
98116
time. This must be a microsecond precision timestamp within the past one hour, or
99117
if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
100118
within the past 7 days. For the most accurate results, use UTC timezone.
119+
explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]):
120+
Options to enable query profiling for this query. When set,
121+
explain_metrics will be available on the returned generator.
122+
index_mode (Optional[str]): Configures the pipeline to require a certain type of indexes to be present.
123+
Firestore will reject the request if there is not appropiate indexes to serve the query.
124+
additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
125+
These options will take precedence over method argument if there is a conflict (e.g. explain_options, index_mode)
101126
"""
102-
request = self._prep_execute_request(transaction, read_time)
103-
for response in self._client._firestore_api.execute_pipeline(request):
104-
yield from self._execute_response_helper(response)
127+
kwargs = {k: v for k, v in locals().items() if k != "self"}
128+
return PipelineStream(PipelineResult, self, **kwargs)

0 commit comments

Comments
 (0)