Skip to content

Commit 0bf6ece

Browse files
author
Justin Lee
committed
cleaning up force readPreference to primary with $out
use the correct input collection for aggregation
1 parent 1660765 commit 0bf6ece

3 files changed

Lines changed: 17 additions & 13 deletions

File tree

src/main/com/mongodb/DB.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
* @dochub databases
4646
*/
4747
public abstract class DB {
48-
4948
private static final Set<String> _obedientCommands = new HashSet<String>();
5049

5150
static {
@@ -101,7 +100,7 @@ ReadPreference getCommandReadPreference(DBObject command, ReadPreference request
101100
}
102101
} else if(comString.equals("aggregate")) {
103102
List<DBObject> pipeline = (List<DBObject>) command.get("pipeline");
104-
primaryRequired = pipeline.get(pipeline.size()-1).get("$out") == null;
103+
primaryRequired = pipeline.get(pipeline.size()-1).get("$out") != null;
105104
} else {
106105
primaryRequired = !_obedientCommands.contains(comString.toLowerCase());
107106
}
@@ -294,11 +293,11 @@ public CommandResult command( DBObject cmd , int options, ReadPreference readPre
294293
* @dochub commands
295294
*/
296295
public CommandResult command( DBObject cmd , int options, ReadPreference readPrefs, DBEncoder encoder ){
297-
readPrefs = getCommandReadPreference(cmd, readPrefs);
298-
cmd = wrapCommand(cmd, readPrefs);
296+
ReadPreference effectiveReadPrefs = getCommandReadPreference(cmd, readPrefs);
297+
cmd = wrapCommand(cmd, effectiveReadPrefs);
299298

300299
Iterator<DBObject> i =
301-
getCollection("$cmd").__find(cmd, new BasicDBObject(), 0, -1, 0, options, readPrefs ,
300+
getCollection("$cmd").__find(cmd, new BasicDBObject(), 0, -1, 0, options, effectiveReadPrefs ,
302301
DefaultDBDecoder.FACTORY.create(), encoder);
303302
if ( i == null || ! i.hasNext() )
304303
return null;

src/main/com/mongodb/DBCollection.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1325,11 +1325,13 @@ public AggregationOutput aggregate(final List<DBObject> pipeline) {
13251325
* @return the aggregation's result set
13261326
*/
13271327
public AggregationOutput aggregate(final List<DBObject> pipeline, ReadPreference readPreference) {
1328-
AggregationOptions options = AggregationOptions.builder().outputMode(AggregationOptions.OutputMode.INLINE).build();
1328+
AggregationOptions options = AggregationOptions.builder()
1329+
.outputMode(AggregationOptions.OutputMode.INLINE)
1330+
.build();
13291331

13301332
DBObject command = prepareCommand(pipeline, options);
13311333

1332-
CommandResult res = _db.command(command, getOptions(), _db.getCommandReadPreference(command, readPreference));
1334+
CommandResult res = _db.command(command, getOptions(), readPreference);
13331335

13341336
return new AggregationOutput(command, res);
13351337
}
@@ -1367,8 +1369,7 @@ public MongoCursor aggregate(final List<DBObject> pipeline, final AggregationOpt
13671369
String outCollection = (String) last.get("$out");
13681370
if (outCollection != null) {
13691371
DBCollection collection = _db.getCollection(outCollection);
1370-
return new DBCursorAdapter(new DBCursor(collection, new BasicDBObject(), null,
1371-
_db.getCommandReadPreference(command, readPreference)));
1372+
return new DBCursorAdapter(new DBCursor(collection, new BasicDBObject(), null, ReadPreference.primary()));
13721373
} else {
13731374
return new ResultsCursor(res, this, options.getBatchSize());
13741375
}

src/test/com/mongodb/AggregationTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,15 +144,15 @@ public void dollarOutOnSecondary() throws UnknownHostException {
144144
}
145145
MongoClient rsClient = new MongoClient(asList(new ServerAddress("localhost"), new ServerAddress("localhost", 27018)));
146146
DB rsDatabase = rsClient.getDB(database.getName());
147-
DBCollection aggCollection = rsDatabase.getCollection("aggCollection");
147+
DBCollection aggCollection = rsDatabase.getCollection(collection.getName());
148148
aggCollection.drop();
149149

150150
final List<DBObject> pipeline = new ArrayList<DBObject>(prepareData());
151151
pipeline.add(new BasicDBObject("$out", aggCollection.getName()));
152152
AggregationOptions options = AggregationOptions.builder()
153153
.outputMode(OutputMode.CURSOR)
154154
.build();
155-
verify(pipeline, options, ReadPreference.secondary());
155+
verify(pipeline, options, ReadPreference.secondary(), aggCollection);
156156
assertEquals(2, aggCollection.count());
157157
}
158158

@@ -176,8 +176,12 @@ private void verify(final List<DBObject> pipeline, final AggregationOptions opti
176176
verify(pipeline, options, ReadPreference.primary());
177177
}
178178

179-
private void verify(final List<DBObject> pipeline, final AggregationOptions options,
180-
final ReadPreference readPreference) {
179+
private void verify(final List<DBObject> pipeline, final AggregationOptions options, final ReadPreference readPreference) {
180+
verify(pipeline, options, readPreference, collection);
181+
}
182+
183+
private void verify(final List<DBObject> pipeline, final AggregationOptions options, final ReadPreference readPreference,
184+
final DBCollection collection) {
181185
final MongoCursor out = collection.aggregate(pipeline, options, readPreference);
182186

183187
final Map<String, DBObject> results = new HashMap<String, DBObject>();

0 commit comments

Comments
 (0)