|
37 | 37 | import com.google.cloud.firestore.pipeline.expressions.Field; |
38 | 38 | import com.google.cloud.firestore.pipeline.expressions.FunctionExpression; |
39 | 39 | import com.google.cloud.firestore.pipeline.expressions.Ordering; |
| 40 | +import com.google.cloud.firestore.pipeline.expressions.PipelineValueExpression; |
40 | 41 | import com.google.cloud.firestore.pipeline.expressions.Selectable; |
41 | 42 | import com.google.cloud.firestore.pipeline.stages.AddFields; |
42 | 43 | import com.google.cloud.firestore.pipeline.stages.Aggregate; |
43 | 44 | import com.google.cloud.firestore.pipeline.stages.AggregateOptions; |
| 45 | +import com.google.cloud.firestore.pipeline.stages.Define; |
44 | 46 | import com.google.cloud.firestore.pipeline.stages.Delete; |
45 | 47 | import com.google.cloud.firestore.pipeline.stages.Distinct; |
46 | 48 | import com.google.cloud.firestore.pipeline.stages.FindNearest; |
@@ -263,6 +265,213 @@ public Pipeline addFields(Selectable field, Selectable... additionalFields) { |
263 | 265 | .toArray(new Selectable[0])))); |
264 | 266 | } |
265 | 267 |
|
| 268 | + /** |
| 269 | + * Defines one or more variables in the pipeline's scope. `define` is used to bind a value to a |
| 270 | + * variable for internal reuse within the pipeline body (accessed via the {@link |
| 271 | + * Expression#variable(String)} function). |
| 272 | + * |
| 273 | + * <p>This stage is useful for declaring reusable values or intermediate calculations that can be |
| 274 | + * referenced multiple times in later parts of the pipeline, improving readability and |
| 275 | + * maintainability. |
| 276 | + * |
| 277 | + * <p>Each variable is defined using an {@link AliasedExpression}, which pairs an expression with |
| 278 | + * a name (alias). |
| 279 | + * |
| 280 | + * <p>Example: |
| 281 | + * |
| 282 | + * <pre>{@code |
| 283 | + * firestore.pipeline().collection("products") |
| 284 | + * .define( |
| 285 | + * multiply(field("price"), 0.9).as("discountedPrice"), |
| 286 | + * add(field("stock"), 10).as("newStock")) |
| 287 | + * .where(lessThan(variable("discountedPrice"), 100)) |
| 288 | + * .select(field("name"), variable("newStock")); |
| 289 | + * }</pre> |
| 290 | + * |
| 291 | + * @param expression The expression to define using {@link AliasedExpression}. |
| 292 | + * @param additionalExpressions Additional expressions to define using {@link AliasedExpression}. |
| 293 | + * @return A new Pipeline object with this stage appended to the stage list. |
| 294 | + */ |
| 295 | + public Pipeline define(AliasedExpression expression, AliasedExpression... additionalExpressions) { |
| 296 | + return append( |
| 297 | + new Define( |
| 298 | + PipelineUtils.selectablesToMap( |
| 299 | + ImmutableList.<AliasedExpression>builder() |
| 300 | + .add(expression) |
| 301 | + .add(additionalExpressions) |
| 302 | + .build() |
| 303 | + .toArray(new AliasedExpression[0])))); |
| 304 | + } |
| 305 | + |
| 306 | + /** |
| 307 | + * Converts the pipeline into an array expression. |
| 308 | + * |
| 309 | + * <p><b>Result Unwrapping:</b> For simpler access, subqueries producing a single field |
| 310 | + * automatically unwrap that value to the top level, ignoring the inner alias. If the subquery |
| 311 | + * returns multiple fields, they are preserved as a map. |
| 312 | + * |
| 313 | + * <p><b>Example 1: Single field unwrapping</b> |
| 314 | + * |
| 315 | + * <pre>{@code |
| 316 | + * // Get a list of all reviewer names for each book |
| 317 | + * db.pipeline().collection("books") |
| 318 | + * .define(field("id").as("book_id")) |
| 319 | + * .addFields( |
| 320 | + * db.pipeline().collection("reviews") |
| 321 | + * .where(field("book_id").equal(variable("book_id"))) |
| 322 | + * .select(field("reviewer").as("name")) |
| 323 | + * .toArrayExpression() |
| 324 | + * .as("reviewers")) |
| 325 | + * }</pre> |
| 326 | + * |
| 327 | + * <p><i>The result set is unwrapped from {@code "reviewers": [{ "name": "Alice" }, { "name": |
| 328 | + * "Bob" }]} to {@code "reviewers": ["Alice", "Bob"]}.</i> |
| 329 | + * |
| 330 | + * <pre>{@code |
| 331 | + * // Output Document: |
| 332 | + * [ |
| 333 | + * { |
| 334 | + * "id": "1", |
| 335 | + * "title": "1984", |
| 336 | + * "reviewers": ["Alice", "Bob"] |
| 337 | + * } |
| 338 | + * ] |
| 339 | + * }</pre> |
| 340 | + * |
| 341 | + * <p><b>Example 2: Multiple fields (Map)</b> |
| 342 | + * |
| 343 | + * <pre>{@code |
| 344 | + * // Get a list of reviews (reviewer and rating) for each book |
| 345 | + * db.pipeline().collection("books") |
| 346 | + * .define(field("id").as("book_id")) |
| 347 | + * .addFields( |
| 348 | + * db.pipeline().collection("reviews") |
| 349 | + * .where(field("book_id").equal(variable("book_id"))) |
| 350 | + * .select(field("reviewer"), field("rating")) |
| 351 | + * .toArrayExpression() |
| 352 | + * .as("reviews")) |
| 353 | + * }</pre> |
| 354 | + * |
| 355 | + * <p><i>When the subquery produces multiple fields, they are kept as objects in the array:</i> |
| 356 | + * |
| 357 | + * <pre>{@code |
| 358 | + * // Output Document: |
| 359 | + * [ |
| 360 | + * { |
| 361 | + * "id": "1", |
| 362 | + * "title": "1984", |
| 363 | + * "reviews": [ |
| 364 | + * { "reviewer": "Alice", "rating": 5 }, |
| 365 | + * { "reviewer": "Bob", "rating": 4 } |
| 366 | + * ] |
| 367 | + * } |
| 368 | + * ] |
| 369 | + * }</pre> |
| 370 | + * |
| 371 | + * @return A new {@link Expression} representing the pipeline as an array. |
| 372 | + */ |
| 373 | + public Expression toArrayExpression() { |
| 374 | + return new FunctionExpression("array", ImmutableList.of(new PipelineValueExpression(this))); |
| 375 | + } |
| 376 | + |
| 377 | + /** |
| 378 | + * Converts this Pipeline into an expression that evaluates to a single scalar result. Used for |
| 379 | + * 1:1 lookups or Aggregations when the subquery is expected to return a single value or object. |
| 380 | + * |
| 381 | + * <p><b>Runtime Validation:</b> The runtime will validate that the result set contains exactly |
| 382 | + * one item. It throws a runtime error if the result has more than one item, and evaluates to |
| 383 | + * {@code null} if the pipeline has zero results. |
| 384 | + * |
| 385 | + * <p><b>Result Unwrapping:</b> For simpler access, subqueries producing a single field |
| 386 | + * automatically unwrap that value to the top level, ignoring the inner alias. If the subquery |
| 387 | + * returns multiple fields, they are preserved as a map. |
| 388 | + * |
| 389 | + * <p><b>Example 1: Single field unwrapping</b> |
| 390 | + * |
| 391 | + * <pre>{@code |
| 392 | + * // Calculate average rating for each restaurant using a subquery |
| 393 | + * db.pipeline().collection("restaurants") |
| 394 | + * .define(field("id").as("rid")) |
| 395 | + * .addFields( |
| 396 | + * db.pipeline().collection("reviews") |
| 397 | + * .where(field("restaurant_id").equal(variable("rid"))) |
| 398 | + * // Inner aggregation returns a single document |
| 399 | + * .aggregate(AggregateFunction.average("rating").as("value")) |
| 400 | + * // Convert Pipeline -> Scalar Expression (validates result is 1 item) |
| 401 | + * .toScalarExpression() |
| 402 | + * .as("average_rating")) |
| 403 | + * }</pre> |
| 404 | + * |
| 405 | + * <p><i>The result set is unwrapped twice: from {@code "average_rating": [{ "value": 4.5 }]} to |
| 406 | + * {@code "average_rating": { "value": 4.5 }}, and finally to {@code "average_rating": 4.5}.</i> |
| 407 | + * |
| 408 | + * <pre>{@code |
| 409 | + * // Output Document: |
| 410 | + * [ |
| 411 | + * { |
| 412 | + * "id": "123", |
| 413 | + * "name": "The Burger Joint", |
| 414 | + * "cuisine": "American", |
| 415 | + * "average_rating": 4.5 |
| 416 | + * }, |
| 417 | + * { |
| 418 | + * "id": "456", |
| 419 | + * "name": "Sushi World", |
| 420 | + * "cuisine": "Japanese", |
| 421 | + * "average_rating": 4.8 |
| 422 | + * } |
| 423 | + * ] |
| 424 | + * }</pre> |
| 425 | + * |
| 426 | + * <p><b>Example 2: Multiple fields (Map)</b> |
| 427 | + * |
| 428 | + * <pre>{@code |
| 429 | + * // For each restaurant, calculate review statistics (average rating AND total |
| 430 | + * // count) |
| 431 | + * db.pipeline().collection("restaurants") |
| 432 | + * .define(field("id").as("rid")) |
| 433 | + * .addFields( |
| 434 | + * db.pipeline().collection("reviews") |
| 435 | + * .where(field("restaurant_id").equal(variable("rid"))) |
| 436 | + * .aggregate( |
| 437 | + * AggregateFunction.average("rating").as("avg_score"), |
| 438 | + * AggregateFunction.countAll().as("review_count")) |
| 439 | + * .toScalarExpression() |
| 440 | + * .as("stats")) |
| 441 | + * }</pre> |
| 442 | + * |
| 443 | + * <p><i>When the subquery produces multiple fields, they are wrapped in a map:</i> |
| 444 | + * |
| 445 | + * <pre>{@code |
| 446 | + * // Output Document: |
| 447 | + * [ |
| 448 | + * { |
| 449 | + * "id": "123", |
| 450 | + * "name": "The Burger Joint", |
| 451 | + * "cuisine": "American", |
| 452 | + * "stats": { |
| 453 | + * "avg_score": 4.0, |
| 454 | + * "review_count": 3 |
| 455 | + * } |
| 456 | + * }, |
| 457 | + * { |
| 458 | + * "id": "456", |
| 459 | + * "name": "Sushi World", |
| 460 | + * "cuisine": "Japanese", |
| 461 | + * "stats": { |
| 462 | + * "avg_score": 4.8, |
| 463 | + * "review_count": 120 |
| 464 | + * } |
| 465 | + * } |
| 466 | + * ] |
| 467 | + * }</pre> |
| 468 | + * |
| 469 | + * @return A new {@link Expression} representing the pipeline as a scalar. |
| 470 | + */ |
| 471 | + public Expression toScalarExpression() { |
| 472 | + return new FunctionExpression("scalar", ImmutableList.of(new PipelineValueExpression(this))); |
| 473 | + } |
| 474 | + |
266 | 475 | /** |
267 | 476 | * Remove fields from outputs of previous stages. |
268 | 477 | * |
@@ -845,8 +1054,12 @@ public Pipeline sample(Sample sample) { |
845 | 1054 | * @param other The other {@code Pipeline} that is part of union. |
846 | 1055 | * @return A new {@code Pipeline} object with this stage appended to the stage list. |
847 | 1056 | */ |
848 | | - @BetaApi |
849 | 1057 | public Pipeline union(Pipeline other) { |
| 1058 | + if (other.rpcContext == null) { |
| 1059 | + throw new IllegalArgumentException( |
| 1060 | + "Union only supports combining root pipelines, doesn't support relative scope Pipeline" |
| 1061 | + + " like relative subcollection pipeline"); |
| 1062 | + } |
850 | 1063 | return append(new Union(other)); |
851 | 1064 | } |
852 | 1065 |
|
@@ -1227,6 +1440,11 @@ MetricsContext createMetricsContext(String methodName) { |
1227 | 1440 | */ |
1228 | 1441 | @BetaApi |
1229 | 1442 | public void execute(ApiStreamObserver<PipelineResult> observer) { |
| 1443 | + if (this.rpcContext == null) { |
| 1444 | + throw new IllegalStateException( |
| 1445 | + "This pipeline was created without a database (e.g., as a subcollection pipeline) and" |
| 1446 | + + " cannot be executed directly. It can only be used as part of another pipeline."); |
| 1447 | + } |
1230 | 1448 | MetricsContext metricsContext = |
1231 | 1449 | createMetricsContext(TelemetryConstants.METHOD_NAME_EXECUTE_PIPELINE_EXECUTE); |
1232 | 1450 |
|
@@ -1257,6 +1475,12 @@ ApiFuture<Snapshot> execute( |
1257 | 1475 | @Nonnull PipelineExecuteOptions options, |
1258 | 1476 | @Nullable final ByteString transactionId, |
1259 | 1477 | @Nullable com.google.protobuf.Timestamp readTime) { |
| 1478 | + if (this.rpcContext == null) { |
| 1479 | + throw new IllegalStateException( |
| 1480 | + "This pipeline was created without a database (e.g., as a subcollection pipeline) and" |
| 1481 | + + " cannot be executed directly. It can only be used as part of another pipeline."); |
| 1482 | + } |
| 1483 | + |
1260 | 1484 | TraceUtil.Span span = |
1261 | 1485 | rpcContext |
1262 | 1486 | .getFirestore() |
|
0 commit comments