Skip to content

Commit cbf87ac

Browse files
feat: Add subquery support in pipeline (#2323)
* save work * Fix breaking tests * Fix testSingleLookupScalarSubquery * Fix broken tests * change syntax for define and return type of as function * add documentation and overload for getField() * change proto * chore: generate libraries at Thu Feb 26 19:35:06 UTC 2026 * format code * add tests for toArrayExpression * move the subquery test to independent file * Add tests for subcollection * Format code * refactor PipelineExpression * Address feedbacks * Improve documentation * change the test * Test subcollection used within Union stage * replace mapGet with getField * support subcollection in test * chore: generate libraries at Wed Mar 11 18:14:28 UTC 2026 * improve error message * chore: generate libraries at Tue Mar 24 17:04:45 UTC 2026 * add decode support * Make the subquery test only run in nightly environment * address feedbacks * chore: generate libraries at Fri Mar 27 22:19:12 UTC 2026 * remove beta annotation --------- Co-authored-by: cloud-java-bot <cloud-java-bot@google.com>
1 parent 2dc2464 commit cbf87ac

11 files changed

Lines changed: 1529 additions & 4 deletions

File tree

google-cloud-firestore/src/main/java/com/google/cloud/firestore/Pipeline.java

Lines changed: 225 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,12 @@
3737
import com.google.cloud.firestore.pipeline.expressions.Field;
3838
import com.google.cloud.firestore.pipeline.expressions.FunctionExpression;
3939
import com.google.cloud.firestore.pipeline.expressions.Ordering;
40+
import com.google.cloud.firestore.pipeline.expressions.PipelineValueExpression;
4041
import com.google.cloud.firestore.pipeline.expressions.Selectable;
4142
import com.google.cloud.firestore.pipeline.stages.AddFields;
4243
import com.google.cloud.firestore.pipeline.stages.Aggregate;
4344
import com.google.cloud.firestore.pipeline.stages.AggregateOptions;
45+
import com.google.cloud.firestore.pipeline.stages.Define;
4446
import com.google.cloud.firestore.pipeline.stages.Delete;
4547
import com.google.cloud.firestore.pipeline.stages.Distinct;
4648
import com.google.cloud.firestore.pipeline.stages.FindNearest;
@@ -263,6 +265,213 @@ public Pipeline addFields(Selectable field, Selectable... additionalFields) {
263265
.toArray(new Selectable[0]))));
264266
}
265267

268+
/**
269+
* Defines one or more variables in the pipeline's scope. {@code define} is used to bind a value to a
270+
* variable for internal reuse within the pipeline body (accessed via the {@link
271+
* Expression#variable(String)} function).
272+
*
273+
* <p>This stage is useful for declaring reusable values or intermediate calculations that can be
274+
* referenced multiple times in later parts of the pipeline, improving readability and
275+
* maintainability.
276+
*
277+
* <p>Each variable is defined using an {@link AliasedExpression}, which pairs an expression with
278+
* a name (alias).
279+
*
280+
* <p>Example:
281+
*
282+
* <pre>{@code
283+
* firestore.pipeline().collection("products")
284+
* .define(
285+
* multiply(field("price"), 0.9).as("discountedPrice"),
286+
* add(field("stock"), 10).as("newStock"))
287+
* .where(lessThan(variable("discountedPrice"), 100))
288+
* .select(field("name"), variable("newStock"));
289+
* }</pre>
290+
*
291+
* @param expression The expression to define using {@link AliasedExpression}.
292+
* @param additionalExpressions Additional expressions to define using {@link AliasedExpression}.
293+
* @return A new Pipeline object with this stage appended to the stage list.
294+
*/
295+
public Pipeline define(AliasedExpression expression, AliasedExpression... additionalExpressions) {
296+
return append(
297+
new Define(
298+
PipelineUtils.selectablesToMap(
299+
ImmutableList.<AliasedExpression>builder()
300+
.add(expression)
301+
.add(additionalExpressions)
302+
.build()
303+
.toArray(new AliasedExpression[0]))));
304+
}
305+
306+
/**
307+
* Converts the pipeline into an array expression.
308+
*
309+
* <p><b>Result Unwrapping:</b> For simpler access, subqueries producing a single field
310+
* automatically unwrap that value to the top level, ignoring the inner alias. If the subquery
311+
* returns multiple fields, they are preserved as a map.
312+
*
313+
* <p><b>Example 1: Single field unwrapping</b>
314+
*
315+
* <pre>{@code
316+
* // Get a list of all reviewer names for each book
317+
* db.pipeline().collection("books")
318+
* .define(field("id").as("book_id"))
319+
* .addFields(
320+
* db.pipeline().collection("reviews")
321+
* .where(field("book_id").equal(variable("book_id")))
322+
* .select(field("reviewer").as("name"))
323+
* .toArrayExpression()
324+
* .as("reviewers"))
325+
* }</pre>
326+
*
327+
* <p><i>The result set is unwrapped from {@code "reviewers": [{ "name": "Alice" }, { "name":
328+
* "Bob" }]} to {@code "reviewers": ["Alice", "Bob"]}.</i>
329+
*
330+
* <pre>{@code
331+
* // Output Document:
332+
* [
333+
* {
334+
* "id": "1",
335+
* "title": "1984",
336+
* "reviewers": ["Alice", "Bob"]
337+
* }
338+
* ]
339+
* }</pre>
340+
*
341+
* <p><b>Example 2: Multiple fields (Map)</b>
342+
*
343+
* <pre>{@code
344+
* // Get a list of reviews (reviewer and rating) for each book
345+
* db.pipeline().collection("books")
346+
* .define(field("id").as("book_id"))
347+
* .addFields(
348+
* db.pipeline().collection("reviews")
349+
* .where(field("book_id").equal(variable("book_id")))
350+
* .select(field("reviewer"), field("rating"))
351+
* .toArrayExpression()
352+
* .as("reviews"))
353+
* }</pre>
354+
*
355+
* <p><i>When the subquery produces multiple fields, they are kept as objects in the array:</i>
356+
*
357+
* <pre>{@code
358+
* // Output Document:
359+
* [
360+
* {
361+
* "id": "1",
362+
* "title": "1984",
363+
* "reviews": [
364+
* { "reviewer": "Alice", "rating": 5 },
365+
* { "reviewer": "Bob", "rating": 4 }
366+
* ]
367+
* }
368+
* ]
369+
* }</pre>
370+
*
371+
* @return A new {@link Expression} representing the pipeline as an array.
372+
*/
373+
public Expression toArrayExpression() {
374+
return new FunctionExpression("array", ImmutableList.of(new PipelineValueExpression(this)));
375+
}
376+
377+
/**
378+
* Converts this Pipeline into an expression that evaluates to a single scalar result. Used for
379+
* 1:1 lookups or Aggregations when the subquery is expected to return a single value or object.
380+
*
381+
* <p><b>Runtime Validation:</b> The runtime will validate that the result set contains at most
382+
* one item. It throws a runtime error if the result has more than one item, and evaluates to
383+
* {@code null} if the pipeline has zero results.
384+
*
385+
* <p><b>Result Unwrapping:</b> For simpler access, subqueries producing a single field
386+
* automatically unwrap that value to the top level, ignoring the inner alias. If the subquery
387+
* returns multiple fields, they are preserved as a map.
388+
*
389+
* <p><b>Example 1: Single field unwrapping</b>
390+
*
391+
* <pre>{@code
392+
* // Calculate average rating for each restaurant using a subquery
393+
* db.pipeline().collection("restaurants")
394+
* .define(field("id").as("rid"))
395+
* .addFields(
396+
* db.pipeline().collection("reviews")
397+
* .where(field("restaurant_id").equal(variable("rid")))
398+
* // Inner aggregation returns a single document
399+
* .aggregate(AggregateFunction.average("rating").as("value"))
400+
* // Convert Pipeline -> Scalar Expression (validates result is 1 item)
401+
* .toScalarExpression()
402+
* .as("average_rating"))
403+
* }</pre>
404+
*
405+
* <p><i>The result set is unwrapped twice: from {@code "average_rating": [{ "value": 4.5 }]} to
406+
* {@code "average_rating": { "value": 4.5 }}, and finally to {@code "average_rating": 4.5}.</i>
407+
*
408+
* <pre>{@code
409+
* // Output Document:
410+
* [
411+
* {
412+
* "id": "123",
413+
* "name": "The Burger Joint",
414+
* "cuisine": "American",
415+
* "average_rating": 4.5
416+
* },
417+
* {
418+
* "id": "456",
419+
* "name": "Sushi World",
420+
* "cuisine": "Japanese",
421+
* "average_rating": 4.8
422+
* }
423+
* ]
424+
* }</pre>
425+
*
426+
* <p><b>Example 2: Multiple fields (Map)</b>
427+
*
428+
* <pre>{@code
429+
* // For each restaurant, calculate review statistics (average rating AND total
430+
* // count)
431+
* db.pipeline().collection("restaurants")
432+
* .define(field("id").as("rid"))
433+
* .addFields(
434+
* db.pipeline().collection("reviews")
435+
* .where(field("restaurant_id").equal(variable("rid")))
436+
* .aggregate(
437+
* AggregateFunction.average("rating").as("avg_score"),
438+
* AggregateFunction.countAll().as("review_count"))
439+
* .toScalarExpression()
440+
* .as("stats"))
441+
* }</pre>
442+
*
443+
* <p><i>When the subquery produces multiple fields, they are wrapped in a map:</i>
444+
*
445+
* <pre>{@code
446+
* // Output Document:
447+
* [
448+
* {
449+
* "id": "123",
450+
* "name": "The Burger Joint",
451+
* "cuisine": "American",
452+
* "stats": {
453+
* "avg_score": 4.0,
454+
* "review_count": 3
455+
* }
456+
* },
457+
* {
458+
* "id": "456",
459+
* "name": "Sushi World",
460+
* "cuisine": "Japanese",
461+
* "stats": {
462+
* "avg_score": 4.8,
463+
* "review_count": 120
464+
* }
465+
* }
466+
* ]
467+
* }</pre>
468+
*
469+
* @return A new {@link Expression} representing the pipeline as a scalar.
470+
*/
471+
public Expression toScalarExpression() {
472+
return new FunctionExpression("scalar", ImmutableList.of(new PipelineValueExpression(this)));
473+
}
474+
266475
/**
267476
* Remove fields from outputs of previous stages.
268477
*
@@ -845,8 +1054,12 @@ public Pipeline sample(Sample sample) {
8451054
* @param other The other {@code Pipeline} that is part of union.
8461055
* @return A new {@code Pipeline} object with this stage appended to the stage list.
8471056
*/
848-
@BetaApi
8491057
public Pipeline union(Pipeline other) {
1058+
if (other.rpcContext == null) {
1059+
throw new IllegalArgumentException(
1060+
"Union only supports combining root pipelines, doesn't support relative scope Pipeline"
1061+
+ " like relative subcollection pipeline");
1062+
}
8501063
return append(new Union(other));
8511064
}
8521065

@@ -1227,6 +1440,11 @@ MetricsContext createMetricsContext(String methodName) {
12271440
*/
12281441
@BetaApi
12291442
public void execute(ApiStreamObserver<PipelineResult> observer) {
1443+
if (this.rpcContext == null) {
1444+
throw new IllegalStateException(
1445+
"This pipeline was created without a database (e.g., as a subcollection pipeline) and"
1446+
+ " cannot be executed directly. It can only be used as part of another pipeline.");
1447+
}
12301448
MetricsContext metricsContext =
12311449
createMetricsContext(TelemetryConstants.METHOD_NAME_EXECUTE_PIPELINE_EXECUTE);
12321450

@@ -1257,6 +1475,12 @@ ApiFuture<Snapshot> execute(
12571475
@Nonnull PipelineExecuteOptions options,
12581476
@Nullable final ByteString transactionId,
12591477
@Nullable com.google.protobuf.Timestamp readTime) {
1478+
if (this.rpcContext == null) {
1479+
throw new IllegalStateException(
1480+
"This pipeline was created without a database (e.g., as a subcollection pipeline) and"
1481+
+ " cannot be executed directly. It can only be used as part of another pipeline.");
1482+
}
1483+
12601484
TraceUtil.Span span =
12611485
rpcContext
12621486
.getFirestore()

google-cloud-firestore/src/main/java/com/google/cloud/firestore/PipelineSource.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.cloud.firestore.pipeline.stages.Database;
2626
import com.google.cloud.firestore.pipeline.stages.Documents;
2727
import com.google.cloud.firestore.pipeline.stages.Literals;
28+
import com.google.cloud.firestore.pipeline.stages.Subcollection;
2829
import com.google.common.base.Preconditions;
2930
import java.util.Arrays;
3031
import javax.annotation.Nonnull;
@@ -209,4 +210,30 @@ public Pipeline createFrom(Query query) {
209210
public Pipeline createFrom(AggregateQuery query) {
210211
return query.pipeline();
211212
}
213+
214+
/**
215+
* Initializes a pipeline scoped to a subcollection.
216+
*
217+
* <p>This method allows you to start a new pipeline that operates on a subcollection of the
218+
* current document. It is intended to be used as a subquery.
219+
*
220+
* <p><b>Note:</b> A pipeline created with {@code subcollection} cannot be executed directly using
221+
* {@link Pipeline#execute()}. It must be used within a parent pipeline.
222+
*
223+
* <p>Example:
224+
*
225+
* <pre>{@code
226+
* firestore.pipeline().collection("books")
227+
* .addFields(
228+
* PipelineSource.subcollection("reviews")
229+
* .aggregate(AggregateFunction.average("rating").as("avg_rating"))
230+
* .toScalarExpression().as("average_rating"));
231+
* }</pre>
232+
*
233+
* @param path The path of the subcollection.
234+
* @return A new {@code Pipeline} instance scoped to the subcollection.
235+
*/
236+
public static Pipeline subcollection(String path) {
237+
return new Pipeline(null, new Subcollection(path));
238+
}
212239
}

google-cloud-firestore/src/main/java/com/google/cloud/firestore/UserDataConverter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,10 @@ static Object decodeValue(FirestoreRpcContext<?> rpcContext, Value v) {
247247
case REFERENCE_VALUE:
248248
String pathName = v.getReferenceValue();
249249
return new DocumentReference(rpcContext, ResourcePath.create(pathName));
250+
case FIELD_REFERENCE_VALUE:
251+
return v.getFieldReferenceValue();
252+
case VARIABLE_REFERENCE_VALUE:
253+
return v.getVariableReferenceValue();
250254
case GEO_POINT_VALUE:
251255
return new GeoPoint(
252256
v.getGeoPointValue().getLatitude(), v.getGeoPointValue().getLongitude());

0 commit comments

Comments
 (0)