2525import com .google .api .gax .rpc .StreamController ;
2626import com .google .cloud .Timestamp ;
2727import com .google .cloud .firestore .v1 .FirestoreSettings ;
28+ import com .google .common .collect .ImmutableMap ;
2829import com .google .firestore .v1 .RunAggregationQueryRequest ;
2930import com .google .firestore .v1 .RunAggregationQueryResponse ;
3031import com .google .firestore .v1 .RunQueryRequest ;
4041import java .util .Map ;
4142import java .util .Objects ;
4243import java .util .Set ;
43- import java .util .concurrent .atomic .AtomicBoolean ;
4444import javax .annotation .Nonnull ;
4545import javax .annotation .Nullable ;
4646
4949public class AggregateQuery {
5050 @ Nonnull private final Query query ;
5151
52- @ Nonnull private List <AggregateField > aggregateFieldList ;
52+ @ Nonnull private final List <AggregateField > aggregateFieldList ;
5353
54- @ Nonnull private Map <String , String > aliasMap ;
54+ @ Nonnull private final Map <String , String > aliasMap ;
5555
5656 AggregateQuery (@ Nonnull Query query , @ Nonnull List <AggregateField > aggregateFields ) {
5757 this .query = query ;
@@ -75,6 +75,26 @@ public ApiFuture<AggregateQuerySnapshot> get() {
7575 return get (null , null );
7676 }
7777
78+ /**
79+ * Plans and optionally executes this query. Returns an ApiFuture that will be resolved with the
80+ * planner information, statistics from the query execution (if any), and the query results (if
81+ * any).
82+ *
83+ * @return An ApiFuture that will be resolved with the planner information, statistics from the
84+ * query execution (if any), and the query results (if any).
85+ */
86+ @ Nonnull
87+ public ApiFuture <ExplainResults <AggregateQuerySnapshot >> explain (ExplainOptions options ) {
88+ AggregateQueryExplainResponseDeliverer responseDeliverer =
89+ new AggregateQueryExplainResponseDeliverer (
90+ /* transactionId= */ null ,
91+ /* readTime= */ null ,
92+ /* startTimeNanos= */ query .rpcContext .getClock ().nanoTime (),
93+ /* explainOptions= */ options );
94+ runQuery (responseDeliverer );
95+ return responseDeliverer .getFuture ();
96+ }
97+
7898 @ Nonnull
7999 ApiFuture <AggregateQuerySnapshot > get (
80100 @ Nullable final ByteString transactionId , @ Nullable com .google .protobuf .Timestamp readTime ) {
@@ -85,25 +105,34 @@ ApiFuture<AggregateQuerySnapshot> get(
85105 return responseDeliverer .getFuture ();
86106 }
87107
88- private void runQuery (AggregateQueryResponseDeliverer responseDeliverer ) {
108+ private < T > void runQuery (ResponseDeliverer < T > responseDeliverer ) {
89109 RunAggregationQueryRequest request =
90- toProto (responseDeliverer .transactionId , responseDeliverer .readTime );
91- AggregateQueryResponseObserver responseObserver =
92- new AggregateQueryResponseObserver (responseDeliverer );
110+ toProto (
111+ responseDeliverer .getTransactionId (),
112+ responseDeliverer .getReadTime (),
113+ responseDeliverer .getExplainOptions ());
114+ AggregateQueryResponseObserver <T > responseObserver =
115+ new AggregateQueryResponseObserver <T >(responseDeliverer );
93116 ServerStreamingCallable <RunAggregationQueryRequest , RunAggregationQueryResponse > callable =
94117 query .rpcContext .getClient ().runAggregationQueryCallable ();
95118 query .rpcContext .streamRequest (request , responseObserver , callable );
96119 }
97120
98- private final class AggregateQueryResponseDeliverer {
121+ @ Nonnull
122+ private Map <String , Value > convertServerAggregateFieldsMapToClientAggregateFieldsMap (
123+ @ Nonnull Map <String , Value > data ) {
124+ ImmutableMap .Builder <String , Value > builder = ImmutableMap .builder ();
125+ data .forEach ((serverAlias , value ) -> builder .put (aliasMap .get (serverAlias ), value ));
126+ return builder .build ();
127+ }
99128
100- @ Nullable private final ByteString transactionId ;
101- @ Nullable private final com .google .protobuf .Timestamp readTime ;
129+ private abstract static class ResponseDeliverer <T > {
130+ private final @ Nullable ByteString transactionId ;
131+ private final @ Nullable com .google .protobuf .Timestamp readTime ;
102132 private final long startTimeNanos ;
103- private final SettableApiFuture <AggregateQuerySnapshot > future = SettableApiFuture .create ();
104- private final AtomicBoolean isFutureCompleted = new AtomicBoolean (false );
133+ private final SettableApiFuture <T > future = SettableApiFuture .create ();
105134
106- AggregateQueryResponseDeliverer (
135+ ResponseDeliverer (
107136 @ Nullable ByteString transactionId ,
108137 @ Nullable com .google .protobuf .Timestamp readTime ,
109138 long startTimeNanos ) {
@@ -112,52 +141,148 @@ private final class AggregateQueryResponseDeliverer {
112141 this .startTimeNanos = startTimeNanos ;
113142 }
114143
115- ApiFuture <AggregateQuerySnapshot > getFuture () {
144+ @ Nullable
145+ ByteString getTransactionId () {
146+ return transactionId ;
147+ }
148+
149+ @ Nullable
150+ com .google .protobuf .Timestamp getReadTime () {
151+ return readTime ;
152+ }
153+
154+ long getStartTimeNanos () {
155+ return startTimeNanos ;
156+ }
157+
158+ @ Nullable
159+ ExplainOptions getExplainOptions () {
160+ return null ;
161+ }
162+
163+ ApiFuture <T > getFuture () {
116164 return future ;
117165 }
118166
119- void deliverResult (@ Nonnull Map <String , Value > data , Timestamp readTime ) {
120- if (isFutureCompleted .compareAndSet (false , true )) {
121- Map <String , Value > mappedData = new HashMap <>();
122- data .forEach ((serverAlias , value ) -> mappedData .put (aliasMap .get (serverAlias ), value ));
123- future .set (new AggregateQuerySnapshot (AggregateQuery .this , readTime , mappedData ));
124- }
167+ protected void setFuture (T value ) {
168+ future .set (value );
125169 }
126170
127171 void deliverError (Throwable throwable ) {
128- if (isFutureCompleted .compareAndSet (false , true )) {
129- future .setException (throwable );
172+ future .setException (throwable );
173+ }
174+
175+ abstract void deliverResult (
176+ @ Nullable Map <String , Value > serverData ,
177+ Timestamp readTime ,
178+ @ Nullable ExplainMetrics metrics );
179+ }
180+
181+ private class AggregateQueryResponseDeliverer extends ResponseDeliverer <AggregateQuerySnapshot > {
182+ AggregateQueryResponseDeliverer (
183+ @ Nullable ByteString transactionId ,
184+ @ Nullable com .google .protobuf .Timestamp readTime ,
185+ long startTimeNanos ) {
186+ super (transactionId , readTime , startTimeNanos );
187+ }
188+
189+ @ Override
190+ void deliverResult (
191+ @ Nullable Map <String , Value > serverData ,
192+ Timestamp readTime ,
193+ @ Nullable ExplainMetrics metrics ) {
194+ if (serverData == null ) {
195+ deliverError (new RuntimeException ("Did not receive any aggregate query results." ));
196+ return ;
130197 }
198+ setFuture (
199+ new AggregateQuerySnapshot (
200+ AggregateQuery .this ,
201+ readTime ,
202+ convertServerAggregateFieldsMapToClientAggregateFieldsMap (serverData )));
131203 }
132204 }
133205
134- private final class AggregateQueryResponseObserver
135- implements ResponseObserver <RunAggregationQueryResponse > {
206+ private final class AggregateQueryExplainResponseDeliverer
207+ extends ResponseDeliverer <ExplainResults <AggregateQuerySnapshot >> {
208+ private final @ Nullable ExplainOptions explainOptions ;
136209
137- private final AggregateQueryResponseDeliverer responseDeliverer ;
138- private StreamController streamController ;
210+ AggregateQueryExplainResponseDeliverer (
211+ @ Nullable ByteString transactionId ,
212+ @ Nullable com .google .protobuf .Timestamp readTime ,
213+ long startTimeNanos ,
214+ @ Nullable ExplainOptions explainOptions ) {
215+ super (transactionId , readTime , startTimeNanos );
216+ this .explainOptions = explainOptions ;
217+ }
139218
140- AggregateQueryResponseObserver (AggregateQueryResponseDeliverer responseDeliverer ) {
141- this .responseDeliverer = responseDeliverer ;
219+ @ Override
220+ @ Nullable
221+ ExplainOptions getExplainOptions () {
222+ return explainOptions ;
142223 }
143224
144225 @ Override
145- public void onStart (StreamController streamController ) {
146- this .streamController = streamController ;
226+ void deliverResult (
227+ @ Nullable Map <String , Value > serverData ,
228+ Timestamp readTime ,
229+ @ Nullable ExplainMetrics metrics ) {
230+ // The server is required to provide ExplainMetrics for explain queries.
231+ if (metrics == null ) {
232+ deliverError (new RuntimeException ("Did not receive any metrics for explain query." ));
233+ return ;
234+ }
235+ AggregateQuerySnapshot snapshot =
236+ serverData == null
237+ ? null
238+ : new AggregateQuerySnapshot (
239+ AggregateQuery .this ,
240+ readTime ,
241+ convertServerAggregateFieldsMapToClientAggregateFieldsMap (serverData ));
242+ setFuture (new ExplainResults <>(metrics , snapshot ));
243+ }
244+ }
245+
246+ private final class AggregateQueryResponseObserver <T >
247+ implements ResponseObserver <RunAggregationQueryResponse > {
248+ private final ResponseDeliverer <T > responseDeliverer ;
249+ private Timestamp readTime = Timestamp .MAX_VALUE ;
250+ @ Nullable private Map <String , Value > aggregateFieldsMap = null ;
251+ @ Nullable private ExplainMetrics metrics = null ;
252+
253+ AggregateQueryResponseObserver (ResponseDeliverer <T > responseDeliverer ) {
254+ this .responseDeliverer = responseDeliverer ;
147255 }
148256
257+ private boolean isExplainQuery () {
258+ return this .responseDeliverer .getExplainOptions () != null ;
259+ }
260+
261+ @ Override
262+ public void onStart (StreamController streamController ) {}
263+
149264 @ Override
150265 public void onResponse (RunAggregationQueryResponse response ) {
151- // Close the stream to avoid it dangling, since we're not expecting any more responses.
152- streamController .cancel ();
266+ if (response .hasReadTime ()) {
267+ readTime = Timestamp .fromProto (response .getReadTime ());
268+ }
269+
270+ if (response .hasResult ()) {
271+ aggregateFieldsMap = response .getResult ().getAggregateFieldsMap ();
272+ }
153273
154- // Extract the aggregations and read time from the RunAggregationQueryResponse.
155- Timestamp readTime = Timestamp .fromProto (response .getReadTime ());
274+ if (response .hasExplainMetrics ()) {
275+ metrics = new ExplainMetrics (response .getExplainMetrics ());
276+ }
156277
157- // Deliver the result; even though the `RunAggregationQuery` RPC is a "streaming" RPC, meaning
158- // that `onResponse()` can be called multiple times, it _should_ only be called once. But even
159- // if it is called more than once, `responseDeliverer` will drop superfluous results.
160- responseDeliverer .deliverResult (response .getResult ().getAggregateFieldsMap (), readTime );
278+ if (!isExplainQuery ()) {
279+ // Deliver the result; even though the `RunAggregationQuery` RPC is a "streaming" RPC,
280+ // meaning that `onResponse()` can be called multiple times, it _should_ only be called
281+ // once for non-explain queries. But even if it is called more than once,
282+ // `responseDeliverer` will drop superfluous results. For explain queries, there will
283+ // be more than one response, and the last response will contain the metrics.
284+ onComplete ();
285+ }
161286 }
162287
163288 @ Override
@@ -170,17 +295,26 @@ public void onError(Throwable throwable) {
170295 }
171296
172297 private boolean shouldRetry (Throwable throwable ) {
298+ // Do not retry EXPLAIN requests because it'd be executing
299+ // multiple queries. This means stats would have to be aggregated,
300+ // and that may not even make sense for many statistics.
301+ if (isExplainQuery ()) {
302+ return false ;
303+ }
304+
173305 Set <StatusCode .Code > retryableCodes =
174306 FirestoreSettings .newBuilder ().runAggregationQuerySettings ().getRetryableCodes ();
175307 return query .shouldRetryQuery (
176308 throwable ,
177- responseDeliverer .transactionId ,
178- responseDeliverer .startTimeNanos ,
309+ responseDeliverer .getTransactionId () ,
310+ responseDeliverer .getStartTimeNanos () ,
179311 retryableCodes );
180312 }
181313
182314 @ Override
183- public void onComplete () {}
315+ public void onComplete () {
316+ responseDeliverer .deliverResult (aggregateFieldsMap , readTime , metrics );
317+ }
184318 }
185319
186320 /**
@@ -191,13 +325,14 @@ public void onComplete() {}
191325 */
192326 @ Nonnull
193327 public RunAggregationQueryRequest toProto () {
194- return toProto (null , null );
328+ return toProto (/* transactionId= */ null , /* readTime= */ null , /* explainOptions= */ null );
195329 }
196330
197331 @ Nonnull
198332 RunAggregationQueryRequest toProto (
199333 @ Nullable final ByteString transactionId ,
200- @ Nullable final com .google .protobuf .Timestamp readTime ) {
334+ @ Nullable final com .google .protobuf .Timestamp readTime ,
335+ @ Nullable ExplainOptions explainOptions ) {
201336 RunQueryRequest runQueryRequest = query .toProto ();
202337
203338 RunAggregationQueryRequest .Builder request = RunAggregationQueryRequest .newBuilder ();
@@ -209,6 +344,10 @@ RunAggregationQueryRequest toProto(
209344 request .setReadTime (readTime );
210345 }
211346
347+ if (explainOptions != null ) {
348+ request .setExplainOptions (explainOptions .toProto ());
349+ }
350+
212351 StructuredAggregationQuery .Builder structuredAggregationQuery =
213352 request .getStructuredAggregationQueryBuilder ();
214353 structuredAggregationQuery .setStructuredQuery (runQueryRequest .getStructuredQuery ());
0 commit comments