Skip to content

Commit 3f324c6

Browse files
authored
2.x: add dedicated concat for array of publishers (ReactiveX#4266)
1 parent 708310a commit 3f324c6

File tree

4 files changed

+201
-11
lines changed

4 files changed

+201
-11
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -373,12 +373,11 @@ public static <T> Flowable<T> concat(
373373
return concatArray(p1, p2, p3, p4, p5, p6, p7, p8, p9);
374374
}
375375

376-
@SuppressWarnings({ "unchecked", "rawtypes" })
377376
@BackpressureSupport(BackpressureKind.FULL)
378377
@SchedulerSupport(SchedulerSupport.NONE)
378+
@Deprecated // prefetch is unnecessary when the sources is synchronously available
379379
public static <T> Flowable<T> concatArray(int prefetch, Publisher<? extends T>... sources) {
380-
Objects.requireNonNull(sources, "sources is null");
381-
return fromArray(sources).concatMap((Function)Functions.identity(), prefetch);
380+
return concatArray(sources);
382381
}
383382

384383
/**
@@ -387,10 +386,9 @@ public static <T> Flowable<T> concatArray(int prefetch, Publisher<? extends T>..
387386
* Note: named this way because of overload conflict with concat(NbpObservable&lt;NbpObservable&gt)
388387
* @param sources the array of sources
389388
* @param <T> the common base value type
390-
* @return the new NbpObservable instance
389+
* @return the new Observable instance
391390
* @throws NullPointerException if sources is null
392391
*/
393-
@SuppressWarnings({ "unchecked", "rawtypes" })
394392
@BackpressureSupport(BackpressureKind.FULL)
395393
@SchedulerSupport(SchedulerSupport.NONE)
396394
public static <T> Flowable<T> concatArray(Publisher<? extends T>... sources) {
@@ -400,7 +398,27 @@ public static <T> Flowable<T> concatArray(Publisher<? extends T>... sources) {
400398
if (sources.length == 1) {
401399
return fromPublisher(sources[0]);
402400
}
403-
return fromArray(sources).concatMap((Function)Functions.identity());
401+
return new FlowableConcatArray<T>(sources, false);
402+
}
403+
404+
/**
405+
* Concatenates a variable number of Observable sources and delays errors from any of them
406+
* till all terminate.
407+
* @param sources the array of sources
408+
* @param <T> the common base value type
409+
* @return the new Flowable instance
410+
* @throws NullPointerException if sources is null
411+
*/
412+
@BackpressureSupport(BackpressureKind.FULL)
413+
@SchedulerSupport(SchedulerSupport.NONE)
414+
public static <T> Flowable<T> concatArrayDelayError(Publisher<? extends T>... sources) {
415+
if (sources.length == 0) {
416+
return empty();
417+
} else
418+
if (sources.length == 1) {
419+
return fromPublisher(sources[0]);
420+
}
421+
return new FlowableConcatArray<T>(sources, true);
404422
}
405423

406424
public static <T> Flowable<T> concatArrayEager(Publisher<? extends T>... sources) {
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex.internal.operators.flowable;
14+
15+
import java.util.*;
16+
import java.util.concurrent.atomic.AtomicInteger;
17+
18+
import org.reactivestreams.*;
19+
20+
import io.reactivex.Flowable;
21+
import io.reactivex.exceptions.CompositeException;
22+
import io.reactivex.internal.subscriptions.SubscriptionArbiter;
23+
24+
public final class FlowableConcatArray<T> extends Flowable<T> {
25+
26+
final Publisher<? extends T>[] sources;
27+
28+
final boolean delayError;
29+
30+
public FlowableConcatArray(Publisher<? extends T>[] sources, boolean delayError) {
31+
this.sources = sources;
32+
this.delayError = delayError;
33+
}
34+
35+
@Override
36+
protected void subscribeActual(Subscriber<? super T> s) {
37+
ConcatArraySubscriber<T> parent = new ConcatArraySubscriber<T>(sources, delayError, s);
38+
s.onSubscribe(parent);
39+
40+
parent.onComplete();
41+
}
42+
43+
static final class ConcatArraySubscriber<T> extends SubscriptionArbiter implements Subscriber<T> {
44+
/** */
45+
private static final long serialVersionUID = -8158322871608889516L;
46+
47+
final Subscriber<? super T> actual;
48+
49+
final Publisher<? extends T>[] sources;
50+
51+
final boolean delayError;
52+
53+
final AtomicInteger wip;
54+
55+
int index;
56+
57+
List<Throwable> errors;
58+
59+
long produced;
60+
61+
public ConcatArraySubscriber(Publisher<? extends T>[] sources, boolean delayError, Subscriber<? super T> actual) {
62+
this.actual = actual;
63+
this.sources = sources;
64+
this.delayError = delayError;
65+
this.wip = new AtomicInteger();
66+
}
67+
68+
@Override
69+
public void onSubscribe(Subscription s) {
70+
setSubscription(s);
71+
}
72+
73+
@Override
74+
public void onNext(T t) {
75+
produced++;
76+
actual.onNext(t);
77+
}
78+
79+
@Override
80+
public void onError(Throwable t) {
81+
if (delayError) {
82+
List<Throwable> list = errors;
83+
if (list == null) {
84+
list = new ArrayList<Throwable>(sources.length - index + 1);
85+
errors = list;
86+
}
87+
list.add(t);
88+
onComplete();
89+
} else {
90+
actual.onError(t);
91+
}
92+
}
93+
94+
@Override
95+
public void onComplete() {
96+
if (wip.getAndIncrement() == 0) {
97+
Publisher<? extends T>[] srcs = sources;
98+
int n = srcs.length;
99+
int i = index;
100+
for (;;) {
101+
102+
if (i == n) {
103+
List<Throwable> list = errors;
104+
if (list != null) {
105+
if (list.size() == 1) {
106+
actual.onError(list.get(0));
107+
} else {
108+
actual.onError(new CompositeException(list));
109+
}
110+
} else {
111+
actual.onComplete();
112+
}
113+
return;
114+
}
115+
116+
Publisher<? extends T> p = srcs[i];
117+
118+
if (p == null) {
119+
Throwable ex = new NullPointerException("A Publisher entry is null");
120+
if (delayError) {
121+
List<Throwable> list = errors;
122+
if (list == null) {
123+
list = new ArrayList<Throwable>(n - i + 1);
124+
errors = list;
125+
}
126+
list.add(ex);
127+
i++;
128+
continue;
129+
} else {
130+
actual.onError(ex);
131+
return;
132+
}
133+
} else {
134+
long r = produced;
135+
if (r != 0L) {
136+
produced = 0L;
137+
produced(r);
138+
}
139+
p.subscribe(this);
140+
}
141+
142+
index = ++i;
143+
144+
if (wip.decrementAndGet() == 0) {
145+
break;
146+
}
147+
}
148+
}
149+
}
150+
}
151+
152+
}

src/main/java/io/reactivex/subscribers/TestSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -888,7 +888,7 @@ public final TestSubscriber<T> assertResult(T... values) {
888888
* @param values the expected values, asserted in order
889889
* @return this
890890
*/
891-
public final TestSubscriber<T> assertFailure(Class<Throwable> error, T... values) {
891+
public final TestSubscriber<T> assertFailure(Class<? extends Throwable> error, T... values) {
892892
return assertValues(values)
893893
.assertError(error)
894894
.assertNotComplete();

src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatTest.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@
2727

2828
import io.reactivex.*;
2929
import io.reactivex.disposables.*;
30+
import io.reactivex.exceptions.*;
3031
import io.reactivex.functions.Function;
31-
import io.reactivex.internal.subscriptions.*;
32+
import io.reactivex.internal.subscriptions.BooleanSubscription;
3233
import io.reactivex.processors.*;
3334
import io.reactivex.schedulers.*;
34-
import io.reactivex.subscribers.DefaultObserver;
35-
import io.reactivex.subscribers.TestSubscriber;
35+
import io.reactivex.subscribers.*;
3636

3737
public class FlowableConcatTest {
3838

@@ -824,5 +824,25 @@ public Flowable<Integer> apply(Integer t) {
824824
assertEquals((Integer)999, ts.values().get(999));
825825
}
826826
}
827-
827+
828+
@SuppressWarnings("unchecked")
829+
@Test
830+
public void arrayDelayError() {
831+
Publisher<Integer>[] sources = new Publisher[] {
832+
Flowable.just(1),
833+
null,
834+
Flowable.range(2, 3),
835+
Flowable.error(new TestException()),
836+
Flowable.empty()
837+
};
838+
839+
TestSubscriber<Integer> ts = Flowable.concatArrayDelayError(sources).test();
840+
841+
ts.assertFailure(CompositeException.class, 1, 2, 3, 4);
842+
843+
CompositeException composite = (CompositeException)ts.errors().get(0);
844+
List<Throwable> list = composite.getExceptions();
845+
assertTrue(list.get(0).toString(), list.get(0) instanceof NullPointerException);
846+
assertTrue(list.get(1).toString(), list.get(1) instanceof TestException);
847+
}
828848
}

0 commit comments

Comments
 (0)