1313# limitations under the License.
1414
1515from __future__ import annotations
16- from typing import AsyncIterable , TYPE_CHECKING
16+ from typing import TYPE_CHECKING
1717from google .cloud .firestore_v1 import pipeline_stages as stages
1818from google .cloud .firestore_v1 .base_pipeline import _BasePipeline
19+ from google .cloud .firestore_v1 .pipeline_result import AsyncPipelineStream
20+ from google .cloud .firestore_v1 .pipeline_result import PipelineSnapshot
21+ from google .cloud .firestore_v1 .pipeline_result import PipelineResult
1922
2023if TYPE_CHECKING : # pragma: NO COVER
2124 import datetime
2225 from google .cloud .firestore_v1 .async_client import AsyncClient
23- from google .cloud .firestore_v1 .pipeline_result import PipelineResult
2426 from google .cloud .firestore_v1 .async_transaction import AsyncTransaction
27+ from google .cloud .firestore_v1 .pipeline_expressions import Constant
28+ from google .cloud .firestore_v1 .types .document import Value
29+ from google .cloud .firestore_v1 .query_profile import PipelineExplainOptions
2530
2631
2732class AsyncPipeline (_BasePipeline ):
@@ -41,7 +46,7 @@ class AsyncPipeline(_BasePipeline):
4146 ... .collection("books")
4247 ... .where(Field.of("published").gt(1980))
4348 ... .select("title", "author")
44- ... async for result in pipeline.execute ():
49+ ... async for result in pipeline.stream ():
4550 ... print(result)
4651
4752 Use `client.pipeline()` to create instances of this class.
@@ -59,15 +64,18 @@ def __init__(self, client: AsyncClient, *stages: stages.Stage):
5964
6065 async def execute (
6166 self ,
67+ * ,
6268 transaction : "AsyncTransaction" | None = None ,
6369 read_time : datetime .datetime | None = None ,
64- ) -> list [PipelineResult ]:
70+ explain_options : PipelineExplainOptions | None = None ,
71+ index_mode : str | None = None ,
72+ additional_options : dict [str , Value | Constant ] = {},
73+ ) -> PipelineSnapshot [PipelineResult ]:
6574 """
6675 Executes this pipeline and returns results as a list
6776
6877 Args:
69- transaction
70- (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
78+ transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
7179 An existing transaction that this query will run in.
7280 If a ``transaction`` is used and it already has write operations
7381 added, this method cannot be used (i.e. read-after-write is not
@@ -76,25 +84,33 @@ async def execute(
7684 time. This must be a microsecond precision timestamp within the past one hour, or
7785 if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
7886 within the past 7 days. For the most accurate results, use UTC timezone.
87+ explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]):
88+ Options to enable query profiling for this query. When set,
89+ explain_metrics will be available on the returned list.
90+ index_mode (Optional[str]): Configures the pipeline to require a certain type of indexes to be present.
91+ Firestore will reject the request if there is not appropiate indexes to serve the query.
92+ additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
93+ These options will take precedence over method argument if there is a conflict (e.g. explain_options, index_mode)
7994 """
80- return [
81- result
82- async for result in self .stream (
83- transaction = transaction , read_time = read_time
84- )
85- ]
95+ kwargs = {k : v for k , v in locals ().items () if k != "self" }
96+ stream = AsyncPipelineStream (PipelineResult , self , ** kwargs )
97+ results = [result async for result in stream ]
98+ return PipelineSnapshot (results , stream )
8699
87- async def stream (
100+ def stream (
88101 self ,
89- transaction : "AsyncTransaction" | None = None ,
102+ * ,
90103 read_time : datetime .datetime | None = None ,
91- ) -> AsyncIterable [PipelineResult ]:
104+ transaction : "AsyncTransaction" | None = None ,
105+ explain_options : PipelineExplainOptions | None = None ,
106+ index_mode : str | None = None ,
107+ additional_options : dict [str , Value | Constant ] = {},
108+ ) -> AsyncPipelineStream [PipelineResult ]:
92109 """
93- Process this pipeline as a stream, providing results through an Iterable
110+ Process this pipeline as a stream, providing results through an AsyncIterable
94111
95112 Args:
96- transaction
97- (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
113+ transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
98114 An existing transaction that this query will run in.
99115 If a ``transaction`` is used and it already has write operations
100116 added, this method cannot be used (i.e. read-after-write is not
@@ -103,10 +119,13 @@ async def stream(
103119 time. This must be a microsecond precision timestamp within the past one hour, or
104120 if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
105121 within the past 7 days. For the most accurate results, use UTC timezone.
122+ explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]):
123+ Options to enable query profiling for this query. When set,
124+ explain_metrics will be available on the returned generator.
125+ index_mode (Optional[str]): Configures the pipeline to require a certain type of indexes to be present.
126+ Firestore will reject the request if there is not appropiate indexes to serve the query.
127+ additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
128+ These options will take precedence over method argument if there is a conflict (e.g. explain_options, index_mode)
106129 """
107- request = self ._prep_execute_request (transaction , read_time )
108- async for response in await self ._client ._firestore_api .execute_pipeline (
109- request
110- ):
111- for result in self ._execute_response_helper (response ):
112- yield result
130+ kwargs = {k : v for k , v in locals ().items () if k != "self" }
131+ return AsyncPipelineStream (PipelineResult , self , ** kwargs )
0 commit comments