@@ -449,6 +449,7 @@ private static final class InnerSubscriber<T> extends Subscriber<T> {
449449 private boolean mayNeedToDrain = false ;
450450 /* protected by emitLock */
451451 int emitted = 0 ;
452+ final int THRESHOLD = (int ) (q .capacity () * 0.7 );
452453
453454 public InnerSubscriber (MergeSubscriber <T > parent , MergeProducer <T > producer ) {
454455 this .parentSubscriber = parent ;
@@ -553,8 +554,25 @@ private void emit(T t, boolean complete) {
553554 } finally {
554555 drain = parentSubscriber .releaseEmitLock ();
555556 }
556- if (emitted > 256 ) {
557+ if (emitted > THRESHOLD ) {
557558 // this is for batching requests when we're in a use case that isn't queueing, always fast-pathing the onNext
559+ /**
560+ * <pre> {@code
561+ * Without this batching:
562+ *
563+ * Benchmark (size) Mode Samples Score Score error Units
564+ * r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5060743.715 100445.513 ops/s
565+ * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 36606.582 1610.582 ops/s
566+ * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 38.476 0.973 ops/s
567+ *
568+ * With this batching:
569+ *
570+ * Benchmark (size) Mode Samples Score Score error Units
571+ * r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5367945.738 262740.137 ops/s
572+ * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 62703.930 8496.036 ops/s
573+ * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 72.711 3.746 ops/s
574+ *} </pre>
575+ */
558576 request (emitted );
559577 // we are modifying this outside of the emit lock ... but this can be considered a "lazySet"
560578 // and it will be flushed before anything else touches it because the emitLock will be obtained
0 commit comments