Skip to content

Commit 910fc25

Browse files
committed
Implemented aggregation pipeline.
1 parent 4d81516 commit 910fc25

7 files changed

Lines changed: 398 additions & 32 deletions

File tree

driver/src/main/org/mongodb/MongoCollection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,6 @@ public interface MongoCollection<T> {
6565
WriteResult insert(List<T> document);
6666

6767
WriteResult save(T document);
68+
69+
MongoPipeline<T> pipe();
6870
}

driver/src/main/org/mongodb/MongoCollectionImpl.java

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.mongodb.async.AsyncBlock;
2020
import org.mongodb.async.MongoAsyncQueryCursor;
21+
import org.mongodb.command.AggregateCommand;
2122
import org.mongodb.command.Count;
2223
import org.mongodb.command.CountCommandResult;
2324
import org.mongodb.command.FindAndModifyCommandResult;
@@ -47,6 +48,7 @@
4748
import org.mongodb.operation.Update;
4849
import org.mongodb.operation.UpdateOperation;
4950

51+
import java.util.ArrayList;
5052
import java.util.Collection;
5153
import java.util.List;
5254

@@ -86,6 +88,11 @@ public WriteResult save(final T document) {
8688
return new MongoCollectionView().save(document);
8789
}
8890

91+
@Override
92+
public MongoPipeline<T> pipe() {
93+
return new MongoCollectionPipeline();
94+
}
95+
8996
@Override
9097
public CollectionAdministration tools() {
9198
return admin;
@@ -265,7 +272,7 @@ public long count() {
265272
@Override
266273
public MongoIterable<T> mapReduce(final String map, final String reduce) {
267274
final CommandResult commandResult = getDatabase().executeCommand(new MapReduceCommand(findOp, getName(), map, reduce));
268-
return new MappedReducedIterable<T>(commandResult);
275+
return new SingleShotCommandIterable<T>(commandResult);
269276
}
270277

271278
@Override
@@ -522,4 +529,112 @@ public boolean run(final T t) {
522529
}
523530
}
524531

532+
private class MongoCollectionPipeline implements MongoPipeline<T> {
533+
private final List<Document> pipeline;
534+
535+
private MongoCollectionPipeline() {
536+
pipeline = new ArrayList<Document>();
537+
}
538+
539+
public MongoCollectionPipeline(final MongoCollectionPipeline from) {
540+
pipeline = new ArrayList<Document>(from.pipeline);
541+
}
542+
543+
@Override
544+
public MongoPipeline<T> sort(final Document sortCriteria) {
545+
MongoCollectionPipeline newPipeline = new MongoCollectionPipeline(this);
546+
newPipeline.pipeline.add(new Document("$sort", sortCriteria));
547+
return newPipeline;
548+
}
549+
550+
@Override
551+
public MongoPipeline<T> skip(final long skip) {
552+
MongoCollectionPipeline newPipeline = new MongoCollectionPipeline(this);
553+
newPipeline.pipeline.add(new Document("$skip", skip));
554+
return newPipeline;
555+
}
556+
557+
@Override
558+
public MongoPipeline<T> limit(final long limit) {
559+
MongoCollectionPipeline newPipeline = new MongoCollectionPipeline(this);
560+
newPipeline.pipeline.add(new Document("$limit", limit));
561+
return newPipeline;
562+
}
563+
564+
@Override
565+
public MongoPipeline<T> match(final Document match) {
566+
MongoCollectionPipeline newPipeline = new MongoCollectionPipeline(this);
567+
newPipeline.pipeline.add(new Document("$match", match));
568+
return newPipeline;
569+
}
570+
571+
@Override
572+
public MongoPipeline<T> project(final Document projection) {
573+
MongoCollectionPipeline newPipeline = new MongoCollectionPipeline(this);
574+
newPipeline.pipeline.add(new Document("$project", projection));
575+
return newPipeline;
576+
}
577+
578+
@Override
579+
public MongoPipeline<T> group(final Document group) {
580+
MongoCollectionPipeline newPipeline = new MongoCollectionPipeline(this);
581+
newPipeline.pipeline.add(new Document("$group", group));
582+
return newPipeline;
583+
}
584+
585+
@Override
586+
public MongoPipeline<T> unwind(final String field) {
587+
MongoCollectionPipeline newPipeline = new MongoCollectionPipeline(this);
588+
newPipeline.pipeline.add(new Document("$unwind", field));
589+
return newPipeline;
590+
}
591+
592+
@Override
593+
public <U> MongoIterable<U> map(final Function<T, U> mapper) {
594+
return new MappingIterable<T, U>(this, mapper);
595+
}
596+
597+
@Override
598+
public void asyncForEach(final AsyncBlock<? super T> block) {
599+
throw new UnsupportedOperationException();
600+
}
601+
602+
@Override
603+
public <A extends Collection<? super T>> MongoFuture<A> asyncInto(final A target) {
604+
throw new UnsupportedOperationException();
605+
}
606+
607+
@Override
608+
@SuppressWarnings("unchecked")
609+
public MongoCursor<T> iterator() {
610+
final CommandResult commandResult = getDatabase().executeCommand(new AggregateCommand(getNamespace(), pipeline));
611+
return new SingleShotCursor<T>((Iterable<T>) commandResult.getResponse().get("result"));
612+
}
613+
614+
@Override
615+
public void forEach(final Block<? super T> block) {
616+
MongoCursor<T> cursor = iterator();
617+
try {
618+
while (cursor.hasNext()) {
619+
if (!block.run(cursor.next())) {
620+
break;
621+
}
622+
}
623+
} finally {
624+
cursor.close();
625+
}
626+
}
627+
628+
@Override
629+
public <A extends Collection<? super T>> A into(final A target) {
630+
forEach(new Block<T>() {
631+
@Override
632+
public boolean run(final T t) {
633+
target.add(t);
634+
return true;
635+
}
636+
});
637+
return target;
638+
}
639+
}
525640
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright (c) 2008 - 2013 10gen, Inc. <http://10gen.com>
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.mongodb;
18+
19+
public interface MongoPipeline<T> extends MongoIterable<T> {
20+
MongoPipeline<T> sort(Document id);
21+
22+
MongoPipeline<T> skip(long skip);
23+
24+
MongoPipeline<T> limit(long limit);
25+
26+
MongoPipeline<T> match(Document match);
27+
28+
MongoPipeline<T> project(Document projection);
29+
30+
MongoPipeline<T> group(Document group);
31+
32+
MongoPipeline<T> unwind(String field);
33+
}

driver/src/main/org/mongodb/MappedReducedIterable.java renamed to driver/src/main/org/mongodb/SingleShotCommandIterable.java

Lines changed: 3 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,20 @@
1919
import org.mongodb.async.AsyncBlock;
2020
import org.mongodb.operation.CommandResult;
2121
import org.mongodb.operation.MongoFuture;
22-
import org.mongodb.operation.ServerCursor;
2322
import org.mongodb.operation.SingleResultFuture;
2423

2524
import java.util.Collection;
26-
import java.util.Iterator;
2725

28-
class MappedReducedIterable<T> implements MongoIterable<T> {
26+
class SingleShotCommandIterable<T> implements MongoIterable<T> {
2927
private CommandResult commandResult;
3028

31-
public MappedReducedIterable(final CommandResult commandResult) {
29+
public SingleShotCommandIterable(final CommandResult commandResult) {
3230
this.commandResult = commandResult;
3331
}
3432

3533
@Override
3634
public MongoCursor<T> iterator() {
37-
return new MappedReducedCursor();
35+
return new SingleShotCursor<T>(getResults());
3836
}
3937

4038
@Override
@@ -83,30 +81,4 @@ private Iterable<T> getResults() {
8381
return ((Iterable<T>) commandResult.getResponse().get("results"));
8482
}
8583

86-
private class MappedReducedCursor implements MongoCursor<T> {
87-
private final Iterator<T> iterator = getResults().iterator();
88-
@Override
89-
public void close() {
90-
}
91-
92-
@Override
93-
public boolean hasNext() {
94-
return iterator.hasNext();
95-
}
96-
97-
@Override
98-
public T next() {
99-
return iterator.next();
100-
}
101-
102-
@Override
103-
public ServerCursor getServerCursor() {
104-
return null;
105-
}
106-
107-
@Override
108-
public void remove() {
109-
throw new UnsupportedOperationException();
110-
}
111-
}
11284
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright (c) 2008 - 2013 10gen, Inc. <http://10gen.com>
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.mongodb;
18+
19+
import org.mongodb.operation.ServerCursor;
20+
21+
import java.util.Iterator;
22+
23+
class SingleShotCursor<T> implements MongoCursor<T> {
24+
private final Iterator<T> iterator;
25+
26+
SingleShotCursor(final Iterable<T> results) {
27+
iterator = results.iterator();
28+
}
29+
30+
@Override
31+
public void close() {
32+
}
33+
34+
@Override
35+
public boolean hasNext() {
36+
return iterator.hasNext();
37+
}
38+
39+
@Override
40+
public T next() {
41+
return iterator.next();
42+
}
43+
44+
@Override
45+
public ServerCursor getServerCursor() {
46+
return null;
47+
}
48+
49+
@Override
50+
public void remove() {
51+
throw new UnsupportedOperationException();
52+
}
53+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright (c) 2008 - 2013 10gen, Inc. <http://10gen.com>
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.mongodb.command;
18+
19+
import org.mongodb.Document;
20+
import org.mongodb.MongoNamespace;
21+
22+
import java.util.List;
23+
24+
public class AggregateCommand extends Command {
25+
public AggregateCommand(final MongoNamespace namespace, final List<Document> pipeline) {
26+
super(asDocument(namespace, pipeline));
27+
}
28+
29+
private static Document asDocument(final MongoNamespace namespace, final List<Document> pipeline) {
30+
return new Document("aggregate", namespace.getCollectionName()).append("pipeline", pipeline);
31+
}
32+
}

0 commit comments

Comments
 (0)