3939import rx .util .functions .Func0 ;
4040import rx .util .functions .Func1 ;
4141
42- public class DefaultSubject <T > extends Subject <T , T > {
43- public static <T > DefaultSubject <T > create () {
42+ /**
43+ * Subject that publishes a single event to each {@link Observer} that has subscribed.
44+ * <p>
45+ * Example usage:
46+ * <p>
47+ * <pre> {@code
48+
49+ PublishSubject<Object> subject = PublishSubject.create();
50+ // observer1 will receive all onNext events
51+ subject.subscribe(observer1);
52+ subject.onNext("one");
53+ subject.onNext("two");
54+ // observer2 will only receive "three" and onCompleted
55+ subject.subscribe(observer2);
56+ subject.onNext("three");
57+ subject.onCompleted();
58+
59+ } </pre>
60+ *
61+ * @param <T>
62+ */
63+ public class PublishSubject <T > extends Subject <T , T > {
64+ public static <T > PublishSubject <T > create () {
4465 final ConcurrentHashMap <Subscription , Observer <T >> observers = new ConcurrentHashMap <Subscription , Observer <T >>();
4566
4667 Func1 <Observer <T >, Subscription > onSubscribe = new Func1 <Observer <T >, Subscription >() {
@@ -62,12 +83,12 @@ public void unsubscribe() {
6283 }
6384 };
6485
65- return new DefaultSubject <T >(onSubscribe , observers );
86+ return new PublishSubject <T >(onSubscribe , observers );
6687 }
6788
6889 private final ConcurrentHashMap <Subscription , Observer <T >> observers ;
6990
70- protected DefaultSubject (Func1 <Observer <T >, Subscription > onSubscribe , ConcurrentHashMap <Subscription , Observer <T >> observers ) {
91+ protected PublishSubject (Func1 <Observer <T >, Subscription > onSubscribe , ConcurrentHashMap <Subscription , Observer <T >> observers ) {
7192 super (onSubscribe );
7293 this .observers = observers ;
7394 }
@@ -96,7 +117,7 @@ public void onNext(T args) {
96117 public static class UnitTest {
97118 @ Test
98119 public void test () {
99- DefaultSubject <Integer > subject = DefaultSubject .create ();
120+ PublishSubject <Integer > subject = PublishSubject .create ();
100121 final AtomicReference <List <Notification <String >>> actualRef = new AtomicReference <List <Notification <String >>>();
101122
102123 Observable <List <Notification <Integer >>> wNotificationsList = subject .materialize ().toList ();
@@ -147,7 +168,7 @@ public void unsubscribe() {
147168
148169 @ Test
149170 public void testCompleted () {
150- DefaultSubject <Object > subject = DefaultSubject .create ();
171+ PublishSubject <Object > subject = PublishSubject .create ();
151172
152173 @ SuppressWarnings ("unchecked" )
153174 Observer <String > aObserver = mock (Observer .class );
@@ -188,7 +209,7 @@ private void assertNeverObserver(Observer<String> aObserver)
188209
189210 @ Test
190211 public void testError () {
191- DefaultSubject <Object > subject = DefaultSubject .create ();
212+ PublishSubject <Object > subject = PublishSubject .create ();
192213
193214 @ SuppressWarnings ("unchecked" )
194215 Observer <String > aObserver = mock (Observer .class );
@@ -222,7 +243,7 @@ private void assertErrorObserver(Observer<String> aObserver)
222243
223244 @ Test
224245 public void testSubscribeMidSequence () {
225- DefaultSubject <Object > subject = DefaultSubject .create ();
246+ PublishSubject <Object > subject = PublishSubject .create ();
226247
227248 @ SuppressWarnings ("unchecked" )
228249 Observer <String > aObserver = mock (Observer .class );
@@ -255,7 +276,7 @@ private void assertCompletedStartingWithThreeObserver(Observer<String> aObserver
255276
256277 @ Test
257278 public void testUnsubscribeFirstObserver () {
258- DefaultSubject <Object > subject = DefaultSubject .create ();
279+ PublishSubject <Object > subject = PublishSubject .create ();
259280
260281 @ SuppressWarnings ("unchecked" )
261282 Observer <String > aObserver = mock (Observer .class );
@@ -290,31 +311,31 @@ private void assertObservedUntilTwo(Observer<String> aObserver)
290311 @ Test
291312 public void testUnsubscribe ()
292313 {
293- UnsubscribeTester .test (new Func0 <DefaultSubject <Object >>()
314+ UnsubscribeTester .test (new Func0 <PublishSubject <Object >>()
294315 {
295316 @ Override
296- public DefaultSubject <Object > call ()
317+ public PublishSubject <Object > call ()
297318 {
298- return DefaultSubject .create ();
319+ return PublishSubject .create ();
299320 }
300- }, new Action1 <DefaultSubject <Object >>()
321+ }, new Action1 <PublishSubject <Object >>()
301322 {
302323 @ Override
303- public void call (DefaultSubject <Object > DefaultSubject )
324+ public void call (PublishSubject <Object > DefaultSubject )
304325 {
305326 DefaultSubject .onCompleted ();
306327 }
307- }, new Action1 <DefaultSubject <Object >>()
328+ }, new Action1 <PublishSubject <Object >>()
308329 {
309330 @ Override
310- public void call (DefaultSubject <Object > DefaultSubject )
331+ public void call (PublishSubject <Object > DefaultSubject )
311332 {
312333 DefaultSubject .onError (new Exception ());
313334 }
314- }, new Action1 <DefaultSubject <Object >>()
335+ }, new Action1 <PublishSubject <Object >>()
315336 {
316337 @ Override
317- public void call (DefaultSubject <Object > DefaultSubject )
338+ public void call (PublishSubject <Object > DefaultSubject )
318339 {
319340 DefaultSubject .onNext ("one" );
320341 }
0 commit comments