1818import static org .junit .Assert .*;
1919import static org .mockito .Matchers .*;
2020import static org .mockito .Mockito .*;
21- import static rx .operators .OperationTake .*;
21+ import static rx .operators .OperatorTake .*;
2222
23+ import java .util .Arrays ;
2324import java .util .concurrent .TimeUnit ;
2425import java .util .concurrent .atomic .AtomicBoolean ;
2526
3536import rx .subscriptions .Subscriptions ;
3637import rx .util .functions .Func1 ;
3738
38- public class OperationTakeTest {
39+ public class OperatorTakeTest {
3940
4041 @ Test
4142 public void testTake1 () {
42- Observable <String > w = Observable .from ("one" , "two" , "three" );
43- Observable <String > take = Observable . create ( take ( w , 2 ));
43+ Observable <String > w = Observable .from (Arrays . asList ( "one" , "two" , "three" ) );
44+ Observable <String > take = w . bind ( new OperatorTake < String >( 2 ));
4445
4546 @ SuppressWarnings ("unchecked" )
4647 Observer <String > aObserver = mock (Observer .class );
@@ -54,8 +55,8 @@ public void testTake1() {
5455
5556 @ Test
5657 public void testTake2 () {
57- Observable <String > w = Observable .from ("one" , "two" , "three" );
58- Observable <String > take = Observable . create ( take ( w , 1 ));
58+ Observable <String > w = Observable .from (Arrays . asList ( "one" , "two" , "three" ) );
59+ Observable <String > take = w . bind ( new OperatorTake < String >( 1 ));
5960
6061 @ SuppressWarnings ("unchecked" )
6162 Observer <String > aObserver = mock (Observer .class );
@@ -69,7 +70,7 @@ public void testTake2() {
6970
7071 @ Test (expected = IllegalArgumentException .class )
7172 public void testTakeWithError () {
72- Observable .from (1 , 2 , 3 ).take (1 ).map (new Func1 <Integer , Integer >() {
73+ Observable .from (Arrays . asList ( 1 , 2 , 3 ) ).take (1 ).map (new Func1 <Integer , Integer >() {
7374 public Integer call (Integer t1 ) {
7475 throw new IllegalArgumentException ("some error" );
7576 }
@@ -78,7 +79,7 @@ public Integer call(Integer t1) {
7879
7980 @ Test
8081 public void testTakeWithErrorHappeningInOnNext () {
81- Observable <Integer > w = Observable .from (1 , 2 , 3 ).take (2 ).map (new Func1 <Integer , Integer >() {
82+ Observable <Integer > w = Observable .from (Arrays . asList ( 1 , 2 , 3 ) ).take (2 ).map (new Func1 <Integer , Integer >() {
8283 public Integer call (Integer t1 ) {
8384 throw new IllegalArgumentException ("some error" );
8485 }
@@ -94,7 +95,7 @@ public Integer call(Integer t1) {
9495
9596 @ Test
9697 public void testTakeWithErrorHappeningInTheLastOnNext () {
97- Observable <Integer > w = Observable .from (1 , 2 , 3 ).take (1 ).map (new Func1 <Integer , Integer >() {
98+ Observable <Integer > w = Observable .from (Arrays . asList ( 1 , 2 , 3 ) ).take (1 ).map (new Func1 <Integer , Integer >() {
9899 public Integer call (Integer t1 ) {
99100 throw new IllegalArgumentException ("some error" );
100101 }
@@ -122,7 +123,7 @@ public Subscription onSubscribe(Observer<? super String> observer) {
122123 @ SuppressWarnings ("unchecked" )
123124 Observer <String > aObserver = mock (Observer .class );
124125
125- Observable . create ( take ( source , 1 )).subscribe (aObserver );
126+ source . bind ( new OperatorTake < String >( 1 )).subscribe (aObserver );
126127
127128 verify (aObserver , times (1 )).onNext ("one" );
128129 // even though onError is called we take(1) so shouldn't see it
@@ -152,7 +153,7 @@ public void unsubscribe() {
152153 @ SuppressWarnings ("unchecked" )
153154 Observer <String > aObserver = mock (Observer .class );
154155
155- Observable . create ( take ( source , 0 )).subscribe (aObserver );
156+ source . bind ( new OperatorTake < String >( 0 )).subscribe (aObserver );
156157 assertTrue ("source subscribed" , subscribed .get ());
157158 assertTrue ("source unsubscribed" , unSubscribed .get ());
158159
@@ -171,7 +172,7 @@ public void testUnsubscribeAfterTake() {
171172
172173 @ SuppressWarnings ("unchecked" )
173174 Observer <String > aObserver = mock (Observer .class );
174- Observable <String > take = Observable . create ( take ( w , 1 ));
175+ Observable <String > take = w . bind ( new OperatorTake < String >( 1 ));
175176 take .subscribe (aObserver );
176177
177178 // wait for the Observable to complete
@@ -228,99 +229,4 @@ public void run() {
228229 return s ;
229230 }
230231 }
231-
232- @ Test
233- public void testTakeTimed () {
234- TestScheduler scheduler = new TestScheduler ();
235-
236- PublishSubject <Integer > source = PublishSubject .create ();
237-
238- Observable <Integer > result = source .take (1 , TimeUnit .SECONDS , scheduler );
239-
240- Observer <Object > o = mock (Observer .class );
241-
242- result .subscribe (o );
243-
244- source .onNext (1 );
245- source .onNext (2 );
246- source .onNext (3 );
247-
248- scheduler .advanceTimeBy (1 , TimeUnit .SECONDS );
249-
250- source .onNext (4 );
251-
252- InOrder inOrder = inOrder (o );
253- inOrder .verify (o ).onNext (1 );
254- inOrder .verify (o ).onNext (2 );
255- inOrder .verify (o ).onNext (3 );
256- inOrder .verify (o ).onCompleted ();
257- inOrder .verifyNoMoreInteractions ();
258-
259- verify (o , never ()).onNext (4 );
260- verify (o , never ()).onError (any (Throwable .class ));
261- }
262-
263- @ Test
264- public void testTakeTimedErrorBeforeTime () {
265- TestScheduler scheduler = new TestScheduler ();
266-
267- PublishSubject <Integer > source = PublishSubject .create ();
268-
269- Observable <Integer > result = source .take (1 , TimeUnit .SECONDS , scheduler );
270-
271- Observer <Object > o = mock (Observer .class );
272-
273- result .subscribe (o );
274-
275- source .onNext (1 );
276- source .onNext (2 );
277- source .onNext (3 );
278- source .onError (new CustomException ());
279-
280- scheduler .advanceTimeBy (1 , TimeUnit .SECONDS );
281-
282- source .onNext (4 );
283-
284- InOrder inOrder = inOrder (o );
285- inOrder .verify (o ).onNext (1 );
286- inOrder .verify (o ).onNext (2 );
287- inOrder .verify (o ).onNext (3 );
288- inOrder .verify (o ).onError (any (CustomException .class ));
289- inOrder .verifyNoMoreInteractions ();
290-
291- verify (o , never ()).onCompleted ();
292- verify (o , never ()).onNext (4 );
293- }
294-
295- @ Test
296- public void testTakeTimedErrorAfterTime () {
297- TestScheduler scheduler = new TestScheduler ();
298-
299- PublishSubject <Integer > source = PublishSubject .create ();
300-
301- Observable <Integer > result = source .take (1 , TimeUnit .SECONDS , scheduler );
302-
303- Observer <Object > o = mock (Observer .class );
304-
305- result .subscribe (o );
306-
307- source .onNext (1 );
308- source .onNext (2 );
309- source .onNext (3 );
310-
311- scheduler .advanceTimeBy (1 , TimeUnit .SECONDS );
312-
313- source .onNext (4 );
314- source .onError (new CustomException ());
315-
316- InOrder inOrder = inOrder (o );
317- inOrder .verify (o ).onNext (1 );
318- inOrder .verify (o ).onNext (2 );
319- inOrder .verify (o ).onNext (3 );
320- inOrder .verify (o ).onCompleted ();
321- inOrder .verifyNoMoreInteractions ();
322-
323- verify (o , never ()).onNext (4 );
324- verify (o , never ()).onError (any (CustomException .class ));
325- }
326232}
0 commit comments