2727import rx .Observer ;
2828import rx .Subscription ;
2929import rx .subscriptions .CompositeSubscription ;
30- import rx .subscriptions .SingleAssignmentSubscription ;
3130import rx .util .functions .Func2 ;
3231import rx .util .functions .Func3 ;
3332import rx .util .functions .Func4 ;
@@ -61,37 +60,42 @@ public class OperationCombineLatest {
6160 * The aggregation function used to combine the source observable values.
6261 * @return A function from an observer to a subscription. This can be used to create an observable from.
6362 */
63+ @ SuppressWarnings ("unchecked" )
6464 public static <T0 , T1 , R > OnSubscribeFunc <R > combineLatest (Observable <? extends T0 > w0 , Observable <T1 > w1 , Func2 <? super T0 , ? super T1 , ? extends R > combineLatestFunction ) {
6565 return new CombineLatest <Object , R >(Arrays .asList (w0 , w1 ), Functions .fromFunc (combineLatestFunction ));
6666 }
6767
6868 /**
6969 * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
7070 */
71+ @ SuppressWarnings ("unchecked" )
7172 public static <T0 , T1 , T2 , R > OnSubscribeFunc <R > combineLatest (Observable <? extends T0 > w0 , Observable <? extends T1 > w1 , Observable <? extends T2 > w2 ,
7273 Func3 <? super T0 , ? super T1 , ? super T2 , ? extends R > combineLatestFunction ) {
73- return new CombineLatest <Object , R >(Arrays .asList (w0 , w1 , w2 ), Functions .fromFunc (combineLatestFunction ));
74+ return new CombineLatest <Object , R >(Arrays .asList (w0 , w1 , w2 ), Functions .fromFunc (combineLatestFunction ));
7475 }
7576
7677 /**
7778 * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
7879 */
80+ @ SuppressWarnings ("unchecked" )
7981 public static <T0 , T1 , T2 , T3 , R > OnSubscribeFunc <R > combineLatest (Observable <? extends T0 > w0 , Observable <? extends T1 > w1 , Observable <? extends T2 > w2 , Observable <? extends T3 > w3 ,
8082 Func4 <? super T0 , ? super T1 , ? super T2 , ? super T3 , ? extends R > combineLatestFunction ) {
81- return new CombineLatest <Object , R >(Arrays .asList (w0 , w1 , w2 , w3 ), Functions .fromFunc (combineLatestFunction ));
83+ return new CombineLatest <Object , R >(Arrays .asList (w0 , w1 , w2 , w3 ), Functions .fromFunc (combineLatestFunction ));
8284 }
8385
8486 /**
8587 * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
8688 */
89+ @ SuppressWarnings ("unchecked" )
8790 public static <T0 , T1 , T2 , T3 , T4 , R > OnSubscribeFunc <R > combineLatest (Observable <? extends T0 > w0 , Observable <? extends T1 > w1 , Observable <? extends T2 > w2 , Observable <? extends T3 > w3 , Observable <? extends T4 > w4 ,
8891 Func5 <? super T0 , ? super T1 , ? super T2 , ? super T3 , ? super T4 , ? extends R > combineLatestFunction ) {
89- return new CombineLatest <Object , R >(Arrays .asList (w0 , w1 , w2 , w3 , w4 ), Functions .fromFunc (combineLatestFunction ));
92+ return new CombineLatest <Object , R >(Arrays .asList (w0 , w1 , w2 , w3 , w4 ), Functions .fromFunc (combineLatestFunction ));
9093 }
9194
9295 /**
9396 * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
9497 */
98+ @ SuppressWarnings ("unchecked" )
9599 public static <T0 , T1 , T2 , T3 , T4 , T5 , R > OnSubscribeFunc <R > combineLatest (Observable <? extends T0 > w0 , Observable <? extends T1 > w1 , Observable <? extends T2 > w2 , Observable <? extends T3 > w3 , Observable <? extends T4 > w4 , Observable <? extends T5 > w5 ,
96100 Func6 <? super T0 , ? super T1 , ? super T2 , ? super T3 , ? super T4 , ? super T5 , ? extends R > combineLatestFunction ) {
97101 return new CombineLatest <Object , R >(Arrays .asList (w0 , w1 , w2 , w3 , w4 , w5 ), Functions .fromFunc (combineLatestFunction ));
@@ -100,6 +104,7 @@ public static <T0, T1, T2, T3, T4, T5, R> OnSubscribeFunc<R> combineLatest(Obser
100104 /**
101105 * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
102106 */
107+ @ SuppressWarnings ("unchecked" )
103108 public static <T0 , T1 , T2 , T3 , T4 , T5 , T6 , R > OnSubscribeFunc <R > combineLatest (Observable <? extends T0 > w0 , Observable <? extends T1 > w1 , Observable <? extends T2 > w2 , Observable <? extends T3 > w3 , Observable <? extends T4 > w4 , Observable <? extends T5 > w5 , Observable <? extends T6 > w6 ,
104109 Func7 <? super T0 , ? super T1 , ? super T2 , ? super T3 , ? super T4 , ? super T5 , ? super T6 , ? extends R > combineLatestFunction ) {
105110 return new CombineLatest <Object , R >(Arrays .asList (w0 , w1 , w2 , w3 , w4 , w5 , w6 ), Functions .fromFunc (combineLatestFunction ));
@@ -108,6 +113,7 @@ public static <T0, T1, T2, T3, T4, T5, T6, R> OnSubscribeFunc<R> combineLatest(O
108113 /**
109114 * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
110115 */
116+ @ SuppressWarnings ("unchecked" )
111117 public static <T0 , T1 , T2 , T3 , T4 , T5 , T6 , T7 , R > OnSubscribeFunc <R > combineLatest (Observable <? extends T0 > w0 , Observable <? extends T1 > w1 , Observable <? extends T2 > w2 , Observable <? extends T3 > w3 , Observable <? extends T4 > w4 , Observable <? extends T5 > w5 , Observable <? extends T6 > w6 , Observable <? extends T7 > w7 ,
112118 Func8 <? super T0 , ? super T1 , ? super T2 , ? super T3 , ? super T4 , ? super T5 , ? super T6 , ? super T7 , ? extends R > combineLatestFunction ) {
113119 return new CombineLatest <Object , R >(Arrays .asList (w0 , w1 , w2 , w3 , w4 , w5 , w6 , w7 ), Functions .fromFunc (combineLatestFunction ));
@@ -116,6 +122,7 @@ public static <T0, T1, T2, T3, T4, T5, T6, T7, R> OnSubscribeFunc<R> combineLate
116122 /**
117123 * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
118124 */
125+ @ SuppressWarnings ("unchecked" )
119126 public static <T0 , T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , R > OnSubscribeFunc <R > combineLatest (Observable <? extends T0 > w0 , Observable <? extends T1 > w1 , Observable <? extends T2 > w2 , Observable <? extends T3 > w3 , Observable <? extends T4 > w4 , Observable <? extends T5 > w5 , Observable <? extends T6 > w6 , Observable <? extends T7 > w7 ,
120127 Observable <? extends T8 > w8 ,
121128 Func9 <? super T0 , ? super T1 , ? super T2 , ? super T3 , ? super T4 , ? super T5 , ? super T6 , ? super T7 , ? super T8 , ? extends R > combineLatestFunction ) {
@@ -125,6 +132,7 @@ public static <T0, T1, T2, T3, T4, T5, T6, T7, T8, R> OnSubscribeFunc<R> combine
125132 static final class CombineLatest <T , R > implements OnSubscribeFunc <R > {
126133 final List <Observable <? extends T >> sources ;
127134 final FuncN <? extends R > combiner ;
135+
128136 public CombineLatest (Iterable <? extends Observable <? extends T >> sources , FuncN <? extends R > combiner ) {
129137 this .sources = new ArrayList <Observable <? extends T >>();
130138 this .combiner = combiner ;
@@ -136,27 +144,28 @@ public CombineLatest(Iterable<? extends Observable<? extends T>> sources, FuncN<
136144 @ Override
137145 public Subscription onSubscribe (Observer <? super R > t1 ) {
138146 CompositeSubscription csub = new CompositeSubscription ();
139-
147+
140148 Collector collector = new Collector (t1 , csub , sources .size ());
141-
149+
142150 int index = 0 ;
143151 List <SourceObserver > observers = new ArrayList <SourceObserver >(sources .size () + 1 );
144152 for (Observable <? extends T > source : sources ) {
145- SingleAssignmentSubscription sas = new SingleAssignmentSubscription ();
153+ SafeObservableSubscription sas = new SafeObservableSubscription ();
146154 csub .add (sas );
147155 observers .add (new SourceObserver (collector , sas , index , source ));
148156 index ++;
149157 }
150-
158+
151159 for (SourceObserver so : observers ) {
152160 // if we run to completion, don't bother any further
153161 if (!csub .isUnsubscribed ()) {
154162 so .connect ();
155163 }
156164 }
157-
165+
158166 return csub ;
159167 }
168+
160169 /**
161170 * The collector that combines the latest values from many sources.
162171 */
@@ -173,6 +182,7 @@ final class Collector {
173182 int hasCount ;
174183 /** Number of completed source observers. */
175184 int completedCount ;
185+
176186 public Collector (Observer <? super R > observer , Subscription cancel , int count ) {
177187 this .observer = observer ;
178188 this .cancel = cancel ;
@@ -181,6 +191,7 @@ public Collector(Observer<? super R> observer, Subscription cancel, int count) {
181191 this .completed = new BitSet (count );
182192 this .lock = new ReentrantLock ();
183193 }
194+
184195 public void next (int index , T value ) {
185196 Throwable err = null ;
186197 lock .lock ();
@@ -210,6 +221,7 @@ public void next(int index, T value) {
210221 cancel .unsubscribe ();
211222 }
212223 }
224+
213225 public void error (int index , Throwable e ) {
214226 boolean unsub = false ;
215227 lock .lock ();
@@ -226,13 +238,16 @@ public void error(int index, Throwable e) {
226238 cancel .unsubscribe ();
227239 }
228240 }
241+
229242 boolean isTerminated () {
230243 return completedCount == values .length + 1 ;
231244 }
245+
232246 void terminate () {
233247 completedCount = values .length + 1 ;
234248 Arrays .fill (values , null );
235249 }
250+
236251 public void completed (int index ) {
237252 boolean unsub = false ;
238253 lock .lock ();
@@ -256,22 +271,25 @@ public void completed(int index) {
256271 }
257272 }
258273 }
274+
259275 /**
260276 * Observes a specific source and communicates with the collector.
261277 */
262- final class SourceObserver implements Observer <T > {
263- final SingleAssignmentSubscription self ;
278+ final class SourceObserver implements Observer <T > {
279+ final SafeObservableSubscription self ;
264280 final Collector collector ;
265281 final int index ;
266282 Observable <? extends T > source ;
267- public SourceObserver (Collector collector ,
268- SingleAssignmentSubscription self , int index ,
283+
284+ public SourceObserver (Collector collector ,
285+ SafeObservableSubscription self , int index ,
269286 Observable <? extends T > source ) {
270287 this .self = self ;
271288 this .collector = collector ;
272289 this .index = index ;
273290 this .source = source ;
274291 }
292+
275293 @ Override
276294 public void onNext (T args ) {
277295 collector .next (index , args );
@@ -287,9 +305,10 @@ public void onCompleted() {
287305 collector .completed (index );
288306 self .unsubscribe ();
289307 }
308+
290309 /** Connect to the source. */
291310 void connect () {
292- self .set (source .subscribe (this ));
311+ self .wrap (source .subscribe (this ));
293312 source = null ;
294313 }
295314 }
0 commit comments