@@ -5414,8 +5414,43 @@ public final <TIntermediate, TResult> Observable<TResult> multicast(
54145414 * @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#observablepublish-and-observablemulticast">RxJava wiki: Observable.publish and Observable.multicast</a>
54155415 * @see <a href="http://msdn.microsoft.com/en-us/library/hh229708.aspx">MSDN: Observable.Multicast</a>
54165416 */
5417- public final <R> ConnectableObservable<R> multicast(Subject<? super T, ? extends R> subject) {
5418- return new OperatorMulticast<T, R>(this, subject);
5417+ public final <R> ConnectableObservable<R> multicast(final Subject<? super T, ? extends R> subject) {
5418+ return new OperatorMulticast<T, R>(this, new Func0<Subject<? super T, ? extends R>>() {
5419+
5420+ @Override
5421+ public Subject<? super T, ? extends R> call() {
5422+ // same one every time, no factory behavior
5423+ return subject;
5424+ }
5425+
5426+ });
5427+ }
5428+
5429+ /**
5430+ * Returns a {@link ConnectableObservable} that upon connection causes the source Observable to push results
5431+ * into the specified subject. A Connectable Observable resembles an ordinary Observable, except that it
5432+ * does not begin emitting items when it is subscribed to, but only when its {@code connect} method
5433+ * is called.
5434+ * <dl>
5435+ * <dt><b>Backpressure Support:</b></dt>
5436+ * <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
5437+ * multiple subscribers. Each child will need to manage backpressure independently using operators such
5438+ * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
5439+ * <dt><b>Scheduler:</b></dt>
5440+ * <dd>{@code multicast} does not operate by default on a particular {@link Scheduler}.</dd>
5441+ * </dl>
5442+ *
5443+ * @param subjectFactory
5444+ * Func that creates a new {@link Subject} for the {@link ConnectableObservable} to push source items into
5445+ * @param <R>
5446+ * the type of items emitted by the resulting {@code ConnectableObservable}
5447+ * @return a {@link ConnectableObservable} that upon connection causes the source Observable to push results
5448+ * into the specified {@link Subject}
5449+ * @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#observablepublish-and-observablemulticast">RxJava wiki: Observable.publish and Observable.multicast</a>
5450+ * @see <a href="http://msdn.microsoft.com/en-us/library/hh229708.aspx">MSDN: Observable.Multicast</a>
5451+ */
5452+ public final <R> ConnectableObservable<R> multicast(Func0<? extends Subject<? super T, ? extends R>> subjectFactory) {
5453+ return new OperatorMulticast<T, R>(this, subjectFactory);
54195454 }
54205455
54215456 /**
@@ -5724,7 +5759,14 @@ public final <R> Observable<R> parallel(final Func1<Observable<T>, Observable<R>
57245759 * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publish.aspx">MSDN: Observable.Publish</a>
57255760 */
57265761 public final ConnectableObservable<T> publish() {
5727- return new OperatorMulticast<T, T>(this, PublishSubject.<T> create());
5762+ return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
5763+
5764+ @Override
5765+ public Subject<? super T, ? extends T> call() {
5766+ return PublishSubject.<T> create();
5767+ }
5768+
5769+ });
57285770 }
57295771
57305772 /**
@@ -5819,8 +5861,15 @@ public final Subject<T, T> call() {
58195861 * @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#observablepublish-and-observablemulticast">RxJava wiki: publish</a>
58205862 * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publish.aspx">MSDN: Observable.Publish</a>
58215863 */
5822- public final ConnectableObservable<T> publish(T initialValue) {
5823- return new OperatorMulticast<T, T>(this, BehaviorSubject.<T> create(initialValue));
5864+ public final ConnectableObservable<T> publish(final T initialValue) {
5865+ return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
5866+
5867+ @Override
5868+ public Subject<? super T, ? extends T> call() {
5869+ return BehaviorSubject.<T> create(initialValue);
5870+ }
5871+
5872+ });
58245873 }
58255874
58265875 /**
@@ -5842,7 +5891,14 @@ public final ConnectableObservable<T> publish(T initialValue) {
58425891 * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publishlast.aspx">MSDN: Observable.PublishLast</a>
58435892 */
58445893 public final ConnectableObservable<T> publishLast() {
5845- return new OperatorMulticast<T, T>(this, AsyncSubject.<T> create());
5894+ return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
5895+
5896+ @Override
5897+ public Subject<? super T, ? extends T> call() {
5898+ return AsyncSubject.<T> create();
5899+ }
5900+
5901+ });
58465902 }
58475903
58485904 /**
@@ -6112,7 +6168,14 @@ public final Observable<T> repeatWhen(Func1<? super Observable<? extends Notific
61126168 * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.replay.aspx">MSDN: Observable.Replay</a>
61136169 */
61146170 public final ConnectableObservable<T> replay() {
6115- return new OperatorMulticast<T, T>(this, ReplaySubject.<T> create());
6171+ return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
6172+
6173+ @Override
6174+ public Subject<? super T, ? extends T> call() {
6175+ return ReplaySubject.<T> create();
6176+ }
6177+
6178+ });
61166179 }
61176180
61186181 /**
@@ -6444,8 +6507,15 @@ public final Subject<T, T> call() {
64446507 * @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#observablereplay">RxJava wiki: replay</a>
64456508 * @see <a href="http://msdn.microsoft.com/en-us/library/hh211976.aspx">MSDN: Observable.Replay</a>
64466509 */
6447- public final ConnectableObservable<T> replay(int bufferSize) {
6448- return new OperatorMulticast<T, T>(this, ReplaySubject.<T>createWithSize(bufferSize));
6510+ public final ConnectableObservable<T> replay(final int bufferSize) {
6511+ return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
6512+
6513+ @Override
6514+ public Subject<? super T, ? extends T> call() {
6515+ return ReplaySubject.<T>createWithSize(bufferSize);
6516+ }
6517+
6518+ });
64496519 }
64506520
64516521 /**
@@ -6512,11 +6582,18 @@ public final ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit
65126582 * @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#observablereplay">RxJava wiki: replay</a>
65136583 * @see <a href="http://msdn.microsoft.com/en-us/library/hh211759.aspx">MSDN: Observable.Replay</a>
65146584 */
6515- public final ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler) {
6585+ public final ConnectableObservable<T> replay(final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) {
65166586 if (bufferSize < 0) {
65176587 throw new IllegalArgumentException("bufferSize < 0");
65186588 }
6519- return new OperatorMulticast<T, T>(this, ReplaySubject.<T>createWithTimeAndSize(time, unit, bufferSize, scheduler));
6589+ return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
6590+
6591+ @Override
6592+ public Subject<? super T, ? extends T> call() {
6593+ return ReplaySubject.<T>createWithTimeAndSize(time, unit, bufferSize, scheduler);
6594+ }
6595+
6596+ });
65206597 }
65216598
65226599 /**
@@ -6544,10 +6621,15 @@ public final ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit
65446621 * @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#observablereplay">RxJava wiki: replay</a>
65456622 * @see <a href="http://msdn.microsoft.com/en-us/library/hh229814.aspx">MSDN: Observable.Replay</a>
65466623 */
6547- public final ConnectableObservable<T> replay(int bufferSize, Scheduler scheduler) {
6548- return new OperatorMulticast<T, T>(this,
6549- OperatorReplay.createScheduledSubject(
6550- ReplaySubject.<T>createWithSize(bufferSize), scheduler));
6624+ public final ConnectableObservable<T> replay(final int bufferSize, final Scheduler scheduler) {
6625+ return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
6626+
6627+ @Override
6628+ public Subject<? super T, ? extends T> call() {
6629+ return OperatorReplay.createScheduledSubject(ReplaySubject.<T>createWithSize(bufferSize), scheduler);
6630+ }
6631+
6632+ });
65516633 }
65526634
65536635 /**
@@ -6606,8 +6688,15 @@ public final ConnectableObservable<T> replay(long time, TimeUnit unit) {
66066688 * @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#observablereplay">RxJava wiki: replay</a>
66076689 * @see <a href="http://msdn.microsoft.com/en-us/library/hh211811.aspx">MSDN: Observable.Replay</a>
66086690 */
6609- public final ConnectableObservable<T> replay(long time, TimeUnit unit, Scheduler scheduler) {
6610- return new OperatorMulticast<T, T>(this, ReplaySubject.<T>createWithTime(time, unit, scheduler));
6691+ public final ConnectableObservable<T> replay(final long time, final TimeUnit unit, final Scheduler scheduler) {
6692+ return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
6693+
6694+ @Override
6695+ public Subject<? super T, ? extends T> call() {
6696+ return ReplaySubject.<T>createWithTime(time, unit, scheduler);
6697+ }
6698+
6699+ });
66116700 }
66126701
66136702 /**
@@ -6634,8 +6723,15 @@ public final ConnectableObservable<T> replay(long time, TimeUnit unit, Scheduler
66346723 * @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#observablereplay">RxJava wiki: replay</a>
66356724 * @see <a href="http://msdn.microsoft.com/en-us/library/hh211699.aspx">MSDN: Observable.Replay</a>
66366725 */
6637- public final ConnectableObservable<T> replay(Scheduler scheduler) {
6638- return new OperatorMulticast<T, T>(this, OperatorReplay.createScheduledSubject(ReplaySubject.<T> create(), scheduler));
6726+ public final ConnectableObservable<T> replay(final Scheduler scheduler) {
6727+ return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
6728+
6729+ @Override
6730+ public Subject<? super T, ? extends T> call() {
6731+ return OperatorReplay.createScheduledSubject(ReplaySubject.<T> create(), scheduler);
6732+ }
6733+
6734+ });
66396735 }
66406736
66416737 /**
0 commit comments