33import org .junit .Before ;
44import org .junit .Test ;
55import org .mockito .MockitoAnnotations ;
6- import rx .observables .ConnectableObservable ;
76import rx .subscriptions .Subscriptions ;
7+ import rx .util .functions .Action0 ;
88
9- import static org .mockito .Mockito .*;
9+ import java .util .concurrent .atomic .AtomicInteger ;
10+
11+ import static org .junit .Assert .assertEquals ;
12+ import static org .mockito .Mockito .mock ;
1013
1114public class RefCountTests {
1215
@@ -16,67 +19,30 @@ public void setUp() {
1619 }
1720
1821 @ Test
19- public void subscriptionToUnderlyingOnFirstSubscription () {
20- @ SuppressWarnings ("unchecked" )
21- ConnectableObservable <Integer > connectable = mock (ConnectableObservable .class );
22- Observable <Integer > refCounted = ConnectableObservable .refCount (connectable );
23- @ SuppressWarnings ("unchecked" )
24- Observer <Integer > observer = mock (Observer .class );
25- when (connectable .subscribe (any (Observer .class ))).thenReturn (Subscriptions .empty ());
26- when (connectable .connect ()).thenReturn (Subscriptions .empty ());
27- refCounted .subscribe (observer );
28- verify (connectable , times (1 )).subscribe (any (Observer .class ));
29- verify (connectable , times (1 )).connect ();
30- }
31-
32- @ Test
33- public void noSubscriptionToUnderlyingOnSecondSubscription () {
34- @ SuppressWarnings ("unchecked" )
35- ConnectableObservable <Integer > connectable = mock (ConnectableObservable .class );
36- Observable <Integer > refCounted = ConnectableObservable .refCount (connectable );
37- @ SuppressWarnings ("unchecked" )
38- Observer <Integer > observer = mock (Observer .class );
39- when (connectable .subscribe (any (Observer .class ))).thenReturn (Subscriptions .empty ());
40- when (connectable .connect ()).thenReturn (Subscriptions .empty ());
41- refCounted .subscribe (observer );
42- refCounted .subscribe (observer );
43- verify (connectable , times (2 )).subscribe (any (Observer .class ));
44- verify (connectable , times (1 )).connect ();
45- }
46-
47- @ Test
48- public void unsubscriptionFromUnderlyingOnLastUnsubscription () {
49- @ SuppressWarnings ("unchecked" )
50- ConnectableObservable <Integer > connectable = mock (ConnectableObservable .class );
51- Observable <Integer > refCounted = ConnectableObservable .refCount (connectable );
52- @ SuppressWarnings ("unchecked" )
53- Observer <Integer > observer = mock (Observer .class );
54- Subscription underlying = mock (Subscription .class );
55- when (connectable .subscribe (any (Observer .class ))).thenReturn (underlying );
56- Subscription connection = mock (Subscription .class );
57- when (connectable .connect ()).thenReturn (connection );
58- Subscription first = refCounted .subscribe (observer );
59- first .unsubscribe ();
60- verify (underlying , times (1 )).unsubscribe ();
61- verify (connection , times (1 )).unsubscribe ();
62- }
63-
64- @ Test
65- public void noUnsubscriptionFromUnderlyingOnFirstUnsubscription () {
66- @ SuppressWarnings ("unchecked" )
67- ConnectableObservable <Integer > connectable = mock (ConnectableObservable .class );
68- Observable <Integer > refCounted = ConnectableObservable .refCount (connectable );
69- @ SuppressWarnings ("unchecked" )
22+ public void onlyFirstShouldSubscribeAndLastUnsubscribe () {
23+ final AtomicInteger subscriptionCount = new AtomicInteger ();
24+ final AtomicInteger unsubscriptionCount = new AtomicInteger ();
25+ Observable <Integer > observable = Observable .create (new Observable .OnSubscribeFunc <Integer >() {
26+ @ Override
27+ public Subscription onSubscribe (Observer <? super Integer > observer ) {
28+ subscriptionCount .incrementAndGet ();
29+ return Subscriptions .create (new Action0 () {
30+ @ Override
31+ public void call () {
32+ unsubscriptionCount .incrementAndGet ();
33+ }
34+ });
35+ }
36+ });
37+ Observable <Integer > refCounted = observable .publish ().refCount ();
7038 Observer <Integer > observer = mock (Observer .class );
71- Subscription underlying = mock (Subscription .class );
72- when (connectable .subscribe (any (Observer .class ))).thenReturn (underlying );
73- Subscription connection = mock (Subscription .class );
74- when (connectable .connect ()).thenReturn (connection );
7539 Subscription first = refCounted .subscribe (observer );
40+ assertEquals (1 , subscriptionCount .get ());
7641 Subscription second = refCounted .subscribe (observer );
42+ assertEquals (1 , subscriptionCount .get ());
7743 first .unsubscribe ();
44+ assertEquals (0 , unsubscriptionCount .get ());
7845 second .unsubscribe ();
79- verify (underlying , times (2 )).unsubscribe ();
80- verify (connection , times (1 )).unsubscribe ();
46+ assertEquals (1 , unsubscriptionCount .get ());
8147 }
8248}
0 commit comments