|
15 | 15 | */ |
16 | 16 | package rx.subscriptions; |
17 | 17 |
|
| 18 | +import static org.junit.Assert.*; |
| 19 | + |
| 20 | +import java.util.ArrayList; |
| 21 | +import java.util.Collection; |
18 | 22 | import java.util.List; |
19 | 23 | import java.util.concurrent.ConcurrentLinkedQueue; |
20 | 24 | import java.util.concurrent.atomic.AtomicBoolean; |
| 25 | +import java.util.concurrent.atomic.AtomicInteger; |
21 | 26 |
|
22 | | -import org.slf4j.Logger; |
23 | | -import org.slf4j.LoggerFactory; |
| 27 | +import org.junit.Test; |
24 | 28 |
|
25 | 29 | import rx.Subscription; |
26 | | -import rx.util.functions.Functions; |
| 30 | +import rx.util.CompositeException; |
27 | 31 |
|
28 | 32 | /** |
29 | 33 | * Subscription that represents a group of Subscriptions that are unsubscribed together. |
|
32 | 36 | */ |
33 | 37 | public class CompositeSubscription implements Subscription { |
34 | 38 |
|
35 | | - private static final Logger logger = LoggerFactory.getLogger(Functions.class); |
36 | | - |
37 | 39 | /* |
38 | 40 | * The reason 'synchronized' is used on 'add' and 'unsubscribe' is because AtomicBoolean/ConcurrentLinkedQueue are both being modified so it needs to be done atomically. |
39 | 41 | * |
@@ -67,13 +69,80 @@ public synchronized void add(Subscription s) { |
67 | 69 | @Override |
68 | 70 | public synchronized void unsubscribe() { |
69 | 71 | if (unsubscribed.compareAndSet(false, true)) { |
| 72 | + Collection<Exception> es = null; |
70 | 73 | for (Subscription s : subscriptions) { |
71 | 74 | try { |
72 | 75 | s.unsubscribe(); |
73 | 76 | } catch (Exception e) { |
74 | | - logger.error("Failed to unsubscribe.", e); |
| 77 | + if (es == null) { |
| 78 | + es = new ArrayList<Exception>(); |
| 79 | + } |
| 80 | + es.add(e); |
75 | 81 | } |
76 | 82 | } |
| 83 | + if (es != null) { |
| 84 | + throw new CompositeException("Failed to unsubscribe to 1 or more subscriptions.", es); |
| 85 | + } |
| 86 | + } |
| 87 | + } |
| 88 | + |
| 89 | + public static class UnitTest { |
| 90 | + |
| 91 | + @Test |
| 92 | + public void testSuccess() { |
| 93 | + final AtomicInteger counter = new AtomicInteger(); |
| 94 | + CompositeSubscription s = new CompositeSubscription(); |
| 95 | + s.add(new Subscription() { |
| 96 | + |
| 97 | + @Override |
| 98 | + public void unsubscribe() { |
| 99 | + counter.incrementAndGet(); |
| 100 | + } |
| 101 | + }); |
| 102 | + |
| 103 | + s.add(new Subscription() { |
| 104 | + |
| 105 | + @Override |
| 106 | + public void unsubscribe() { |
| 107 | + counter.incrementAndGet(); |
| 108 | + } |
| 109 | + }); |
| 110 | + |
| 111 | + s.unsubscribe(); |
| 112 | + |
| 113 | + assertEquals(2, counter.get()); |
| 114 | + } |
| 115 | + |
| 116 | + @Test |
| 117 | + public void testException() { |
| 118 | + final AtomicInteger counter = new AtomicInteger(); |
| 119 | + CompositeSubscription s = new CompositeSubscription(); |
| 120 | + s.add(new Subscription() { |
| 121 | + |
| 122 | + @Override |
| 123 | + public void unsubscribe() { |
| 124 | + throw new RuntimeException("failed on first one"); |
| 125 | + } |
| 126 | + }); |
| 127 | + |
| 128 | + s.add(new Subscription() { |
| 129 | + |
| 130 | + @Override |
| 131 | + public void unsubscribe() { |
| 132 | + counter.incrementAndGet(); |
| 133 | + } |
| 134 | + }); |
| 135 | + |
| 136 | + try { |
| 137 | + s.unsubscribe(); |
| 138 | + fail("Expecting an exception"); |
| 139 | + } catch (CompositeException e) { |
| 140 | + // we expect this |
| 141 | + assertEquals(1, e.getExceptions().size()); |
| 142 | + } |
| 143 | + |
| 144 | + // we should still have unsubscribed to the second one |
| 145 | + assertEquals(1, counter.get()); |
77 | 146 | } |
78 | 147 | } |
79 | 148 |
|
|
0 commit comments