|
18 | 18 |
|
19 | 19 | import org.mongodb.async.AsyncBlock; |
20 | 20 | import org.mongodb.async.MongoAsyncQueryCursor; |
| 21 | +import org.mongodb.command.AggregateCommand; |
21 | 22 | import org.mongodb.command.Count; |
22 | 23 | import org.mongodb.command.CountCommandResult; |
23 | 24 | import org.mongodb.command.FindAndModifyCommandResult; |
|
47 | 48 | import org.mongodb.operation.Update; |
48 | 49 | import org.mongodb.operation.UpdateOperation; |
49 | 50 |
|
| 51 | +import java.util.ArrayList; |
50 | 52 | import java.util.Collection; |
51 | 53 | import java.util.List; |
52 | 54 |
|
@@ -86,6 +88,11 @@ public WriteResult save(final T document) { |
86 | 88 | return new MongoCollectionView().save(document); |
87 | 89 | } |
88 | 90 |
|
| 91 | + @Override |
| 92 | + public MongoPipeline<T> pipe() { |
| 93 | + return new MongoCollectionPipeline(); |
| 94 | + } |
| 95 | + |
89 | 96 | @Override |
90 | 97 | public CollectionAdministration tools() { |
91 | 98 | return admin; |
@@ -265,7 +272,7 @@ public long count() { |
265 | 272 | @Override |
266 | 273 | public MongoIterable<T> mapReduce(final String map, final String reduce) { |
267 | 274 | final CommandResult commandResult = getDatabase().executeCommand(new MapReduceCommand(findOp, getName(), map, reduce)); |
268 | | - return new MappedReducedIterable<T>(commandResult); |
| 275 | + return new SingleShotCommandIterable<T>(commandResult); |
269 | 276 | } |
270 | 277 |
|
271 | 278 | @Override |
@@ -522,4 +529,112 @@ public boolean run(final T t) { |
522 | 529 | } |
523 | 530 | } |
524 | 531 |
|
| 532 | + private class MongoCollectionPipeline implements MongoPipeline<T> { |
| 533 | + private final List<Document> pipeline; |
| 534 | + |
| 535 | + private MongoCollectionPipeline() { |
| 536 | + pipeline = new ArrayList<Document>(); |
| 537 | + } |
| 538 | + |
| 539 | + public MongoCollectionPipeline(final MongoCollectionPipeline from) { |
| 540 | + pipeline = new ArrayList<Document>(from.pipeline); |
| 541 | + } |
| 542 | + |
| 543 | + @Override |
| 544 | + public MongoPipeline<T> sort(final Document sortCriteria) { |
| 545 | + MongoCollectionPipeline newPipeline = new MongoCollectionPipeline(this); |
| 546 | + newPipeline.pipeline.add(new Document("$sort", sortCriteria)); |
| 547 | + return newPipeline; |
| 548 | + } |
| 549 | + |
| 550 | + @Override |
| 551 | + public MongoPipeline<T> skip(final long skip) { |
| 552 | + MongoCollectionPipeline newPipeline = new MongoCollectionPipeline(this); |
| 553 | + newPipeline.pipeline.add(new Document("$skip", skip)); |
| 554 | + return newPipeline; |
| 555 | + } |
| 556 | + |
| 557 | + @Override |
| 558 | + public MongoPipeline<T> limit(final long limit) { |
| 559 | + MongoCollectionPipeline newPipeline = new MongoCollectionPipeline(this); |
| 560 | + newPipeline.pipeline.add(new Document("$limit", limit)); |
| 561 | + return newPipeline; |
| 562 | + } |
| 563 | + |
| 564 | + @Override |
| 565 | + public MongoPipeline<T> match(final Document match) { |
| 566 | + MongoCollectionPipeline newPipeline = new MongoCollectionPipeline(this); |
| 567 | + newPipeline.pipeline.add(new Document("$match", match)); |
| 568 | + return newPipeline; |
| 569 | + } |
| 570 | + |
| 571 | + @Override |
| 572 | + public MongoPipeline<T> project(final Document projection) { |
| 573 | + MongoCollectionPipeline newPipeline = new MongoCollectionPipeline(this); |
| 574 | + newPipeline.pipeline.add(new Document("$project", projection)); |
| 575 | + return newPipeline; |
| 576 | + } |
| 577 | + |
| 578 | + @Override |
| 579 | + public MongoPipeline<T> group(final Document group) { |
| 580 | + MongoCollectionPipeline newPipeline = new MongoCollectionPipeline(this); |
| 581 | + newPipeline.pipeline.add(new Document("$group", group)); |
| 582 | + return newPipeline; |
| 583 | + } |
| 584 | + |
| 585 | + @Override |
| 586 | + public MongoPipeline<T> unwind(final String field) { |
| 587 | + MongoCollectionPipeline newPipeline = new MongoCollectionPipeline(this); |
| 588 | + newPipeline.pipeline.add(new Document("$unwind", field)); |
| 589 | + return newPipeline; |
| 590 | + } |
| 591 | + |
| 592 | + @Override |
| 593 | + public <U> MongoIterable<U> map(final Function<T, U> mapper) { |
| 594 | + return new MappingIterable<T, U>(this, mapper); |
| 595 | + } |
| 596 | + |
| 597 | + @Override |
| 598 | + public void asyncForEach(final AsyncBlock<? super T> block) { |
| 599 | + throw new UnsupportedOperationException(); |
| 600 | + } |
| 601 | + |
| 602 | + @Override |
| 603 | + public <A extends Collection<? super T>> MongoFuture<A> asyncInto(final A target) { |
| 604 | + throw new UnsupportedOperationException(); |
| 605 | + } |
| 606 | + |
| 607 | + @Override |
| 608 | + @SuppressWarnings("unchecked") |
| 609 | + public MongoCursor<T> iterator() { |
| 610 | + final CommandResult commandResult = getDatabase().executeCommand(new AggregateCommand(getNamespace(), pipeline)); |
| 611 | + return new SingleShotCursor<T>((Iterable<T>) commandResult.getResponse().get("result")); |
| 612 | + } |
| 613 | + |
| 614 | + @Override |
| 615 | + public void forEach(final Block<? super T> block) { |
| 616 | + MongoCursor<T> cursor = iterator(); |
| 617 | + try { |
| 618 | + while (cursor.hasNext()) { |
| 619 | + if (!block.run(cursor.next())) { |
| 620 | + break; |
| 621 | + } |
| 622 | + } |
| 623 | + } finally { |
| 624 | + cursor.close(); |
| 625 | + } |
| 626 | + } |
| 627 | + |
| 628 | + @Override |
| 629 | + public <A extends Collection<? super T>> A into(final A target) { |
| 630 | + forEach(new Block<T>() { |
| 631 | + @Override |
| 632 | + public boolean run(final T t) { |
| 633 | + target.add(t); |
| 634 | + return true; |
| 635 | + } |
| 636 | + }); |
| 637 | + return target; |
| 638 | + } |
| 639 | + } |
525 | 640 | } |
0 commit comments