1+ package io .reactivex .internal .operators ;
2+
3+ import static org .mockito .Matchers .any ;
4+ import static org .mockito .Mockito .*;
5+
6+ import java .util .concurrent .TimeUnit ;
7+
8+ import org .junit .*;
9+ import org .mockito .InOrder ;
10+ import org .reactivestreams .*;
11+
12+ import io .reactivex .*;
13+ import io .reactivex .internal .subscriptions .EmptySubscription ;
14+ import io .reactivex .schedulers .TestScheduler ;
15+ import io .reactivex .subjects .PublishSubject ;
16+
17+ public class OperatorSampleTest {
18+ private TestScheduler scheduler ;
19+ private Scheduler .Worker innerScheduler ;
20+ private Subscriber <Long > observer ;
21+ private Subscriber <Object > observer2 ;
22+
23+ @ Before
24+ // due to mocking
25+ public void before () {
26+ scheduler = new TestScheduler ();
27+ innerScheduler = scheduler .createWorker ();
28+ observer = TestHelper .mockSubscriber ();
29+ observer2 = TestHelper .mockSubscriber ();
30+ }
31+
32+ @ Test
33+ public void testSample () {
34+ Observable <Long > source = Observable .create (new Publisher <Long >() {
35+ @ Override
36+ public void subscribe (final Subscriber <? super Long > observer1 ) {
37+ observer1 .onSubscribe (EmptySubscription .INSTANCE );
38+ innerScheduler .schedule (new Runnable () {
39+ @ Override
40+ public void run () {
41+ observer1 .onNext (1L );
42+ }
43+ }, 1 , TimeUnit .SECONDS );
44+ innerScheduler .schedule (new Runnable () {
45+ @ Override
46+ public void run () {
47+ observer1 .onNext (2L );
48+ }
49+ }, 2 , TimeUnit .SECONDS );
50+ innerScheduler .schedule (new Runnable () {
51+ @ Override
52+ public void run () {
53+ observer1 .onComplete ();
54+ }
55+ }, 3 , TimeUnit .SECONDS );
56+ }
57+ });
58+
59+ Observable <Long > sampled = source .sample (400L , TimeUnit .MILLISECONDS , scheduler );
60+ sampled .subscribe (observer );
61+
62+ InOrder inOrder = inOrder (observer );
63+
64+ scheduler .advanceTimeTo (800L , TimeUnit .MILLISECONDS );
65+ verify (observer , never ()).onNext (any (Long .class ));
66+ verify (observer , never ()).onComplete ();
67+ verify (observer , never ()).onError (any (Throwable .class ));
68+
69+ scheduler .advanceTimeTo (1200L , TimeUnit .MILLISECONDS );
70+ inOrder .verify (observer , times (1 )).onNext (1L );
71+ verify (observer , never ()).onNext (2L );
72+ verify (observer , never ()).onComplete ();
73+ verify (observer , never ()).onError (any (Throwable .class ));
74+
75+ scheduler .advanceTimeTo (1600L , TimeUnit .MILLISECONDS );
76+ inOrder .verify (observer , never ()).onNext (1L );
77+ verify (observer , never ()).onNext (2L );
78+ verify (observer , never ()).onComplete ();
79+ verify (observer , never ()).onError (any (Throwable .class ));
80+
81+ scheduler .advanceTimeTo (2000L , TimeUnit .MILLISECONDS );
82+ inOrder .verify (observer , never ()).onNext (1L );
83+ inOrder .verify (observer , times (1 )).onNext (2L );
84+ verify (observer , never ()).onComplete ();
85+ verify (observer , never ()).onError (any (Throwable .class ));
86+
87+ scheduler .advanceTimeTo (3000L , TimeUnit .MILLISECONDS );
88+ inOrder .verify (observer , never ()).onNext (1L );
89+ inOrder .verify (observer , never ()).onNext (2L );
90+ verify (observer , times (1 )).onComplete ();
91+ verify (observer , never ()).onError (any (Throwable .class ));
92+ }
93+
94+ @ Test
95+ public void sampleWithSamplerNormal () {
96+ PublishSubject <Integer > source = PublishSubject .create ();
97+ PublishSubject <Integer > sampler = PublishSubject .create ();
98+
99+ Observable <Integer > m = source .sample (sampler );
100+ m .subscribe (observer2 );
101+
102+ source .onNext (1 );
103+ source .onNext (2 );
104+ sampler .onNext (1 );
105+ source .onNext (3 );
106+ source .onNext (4 );
107+ sampler .onNext (2 );
108+ source .onComplete ();
109+ sampler .onNext (3 );
110+
111+ InOrder inOrder = inOrder (observer2 );
112+ inOrder .verify (observer2 , never ()).onNext (1 );
113+ inOrder .verify (observer2 , times (1 )).onNext (2 );
114+ inOrder .verify (observer2 , never ()).onNext (3 );
115+ inOrder .verify (observer2 , times (1 )).onNext (4 );
116+ inOrder .verify (observer2 , times (1 )).onComplete ();
117+ verify (observer , never ()).onError (any (Throwable .class ));
118+ }
119+
120+ @ Test
121+ public void sampleWithSamplerNoDuplicates () {
122+ PublishSubject <Integer > source = PublishSubject .create ();
123+ PublishSubject <Integer > sampler = PublishSubject .create ();
124+
125+ Observable <Integer > m = source .sample (sampler );
126+ m .subscribe (observer2 );
127+
128+ source .onNext (1 );
129+ source .onNext (2 );
130+ sampler .onNext (1 );
131+ sampler .onNext (1 );
132+
133+ source .onNext (3 );
134+ source .onNext (4 );
135+ sampler .onNext (2 );
136+ sampler .onNext (2 );
137+
138+ source .onComplete ();
139+ sampler .onNext (3 );
140+
141+ InOrder inOrder = inOrder (observer2 );
142+ inOrder .verify (observer2 , never ()).onNext (1 );
143+ inOrder .verify (observer2 , times (1 )).onNext (2 );
144+ inOrder .verify (observer2 , never ()).onNext (3 );
145+ inOrder .verify (observer2 , times (1 )).onNext (4 );
146+ inOrder .verify (observer2 , times (1 )).onComplete ();
147+ verify (observer , never ()).onError (any (Throwable .class ));
148+ }
149+
150+ @ Test
151+ public void sampleWithSamplerTerminatingEarly () {
152+ PublishSubject <Integer > source = PublishSubject .create ();
153+ PublishSubject <Integer > sampler = PublishSubject .create ();
154+
155+ Observable <Integer > m = source .sample (sampler );
156+ m .subscribe (observer2 );
157+
158+ source .onNext (1 );
159+ source .onNext (2 );
160+ sampler .onNext (1 );
161+ sampler .onComplete ();
162+
163+ source .onNext (3 );
164+ source .onNext (4 );
165+
166+ InOrder inOrder = inOrder (observer2 );
167+ inOrder .verify (observer2 , never ()).onNext (1 );
168+ inOrder .verify (observer2 , times (1 )).onNext (2 );
169+ inOrder .verify (observer2 , times (1 )).onComplete ();
170+ inOrder .verify (observer2 , never ()).onNext (any ());
171+ verify (observer , never ()).onError (any (Throwable .class ));
172+ }
173+
174+ @ Test
175+ public void sampleWithSamplerEmitAndTerminate () {
176+ PublishSubject <Integer > source = PublishSubject .create ();
177+ PublishSubject <Integer > sampler = PublishSubject .create ();
178+
179+ Observable <Integer > m = source .sample (sampler );
180+ m .subscribe (observer2 );
181+
182+ source .onNext (1 );
183+ source .onNext (2 );
184+ sampler .onNext (1 );
185+ source .onNext (3 );
186+ source .onComplete ();
187+ sampler .onNext (2 );
188+ sampler .onComplete ();
189+
190+ InOrder inOrder = inOrder (observer2 );
191+ inOrder .verify (observer2 , never ()).onNext (1 );
192+ inOrder .verify (observer2 , times (1 )).onNext (2 );
193+ inOrder .verify (observer2 , never ()).onNext (3 );
194+ inOrder .verify (observer2 , times (1 )).onComplete ();
195+ inOrder .verify (observer2 , never ()).onNext (any ());
196+ verify (observer , never ()).onError (any (Throwable .class ));
197+ }
198+
199+ @ Test
200+ public void sampleWithSamplerEmptySource () {
201+ PublishSubject <Integer > source = PublishSubject .create ();
202+ PublishSubject <Integer > sampler = PublishSubject .create ();
203+
204+ Observable <Integer > m = source .sample (sampler );
205+ m .subscribe (observer2 );
206+
207+ source .onComplete ();
208+ sampler .onNext (1 );
209+
210+ InOrder inOrder = inOrder (observer2 );
211+ inOrder .verify (observer2 , times (1 )).onComplete ();
212+ verify (observer2 , never ()).onNext (any ());
213+ verify (observer , never ()).onError (any (Throwable .class ));
214+ }
215+
216+ @ Test
217+ public void sampleWithSamplerSourceThrows () {
218+ PublishSubject <Integer > source = PublishSubject .create ();
219+ PublishSubject <Integer > sampler = PublishSubject .create ();
220+
221+ Observable <Integer > m = source .sample (sampler );
222+ m .subscribe (observer2 );
223+
224+ source .onNext (1 );
225+ source .onError (new RuntimeException ("Forced failure!" ));
226+ sampler .onNext (1 );
227+
228+ InOrder inOrder = inOrder (observer2 );
229+ inOrder .verify (observer2 , times (1 )).onError (any (Throwable .class ));
230+ verify (observer2 , never ()).onNext (any ());
231+ verify (observer , never ()).onComplete ();
232+ }
233+
234+ @ Test
235+ public void sampleWithSamplerThrows () {
236+ PublishSubject <Integer > source = PublishSubject .create ();
237+ PublishSubject <Integer > sampler = PublishSubject .create ();
238+
239+ Observable <Integer > m = source .sample (sampler );
240+ m .subscribe (observer2 );
241+
242+ source .onNext (1 );
243+ sampler .onNext (1 );
244+ sampler .onError (new RuntimeException ("Forced failure!" ));
245+
246+ InOrder inOrder = inOrder (observer2 );
247+ inOrder .verify (observer2 , times (1 )).onNext (1 );
248+ inOrder .verify (observer2 , times (1 )).onError (any (RuntimeException .class ));
249+ verify (observer , never ()).onComplete ();
250+ }
251+
252+ @ Test
253+ public void testSampleUnsubscribe () {
254+ final Subscription s = mock (Subscription .class );
255+ Observable <Integer > o = Observable .create (
256+ new Publisher <Integer >() {
257+ @ Override
258+ public void subscribe (Subscriber <? super Integer > subscriber ) {
259+ subscriber .onSubscribe (s );
260+ }
261+ }
262+ );
263+ o .throttleLast (1 , TimeUnit .MILLISECONDS ).subscribe ().dispose ();
264+ verify (s ).cancel ();
265+ }
266+ }
0 commit comments