Skip to content

Commit 2201a8d

Browse files
myluckagainpauljervis
authored andcommitted
bael-1874 (eugenp#4574)
* bael-1874 * bael-1874 fix * bael-1874 fix1
1 parent b4969dd commit 2201a8d

2 files changed

Lines changed: 211 additions & 0 deletions

File tree

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package com.baeldung.rxjava;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
6+
import rx.Observable;
7+
import rx.Subscription;
8+
import rx.observables.ConnectableObservable;
9+
import rx.subscriptions.Subscriptions;
10+
11+
public class MultipleSubscribersColdObs {
12+
private static final Logger LOGGER = LoggerFactory.getLogger(MultipleSubscribersColdObs.class);
13+
14+
public static void main(String[] args) throws InterruptedException {
15+
defaultBehaviour();
16+
// subscribeBeforeConnect();
17+
18+
}
19+
20+
private static void defaultBehaviour() {
21+
Observable obs = getObservable();
22+
23+
LOGGER.info("Subscribing");
24+
Subscription s1 = obs.subscribe(i -> LOGGER.info("subscriber#1 is printing " + i));
25+
Subscription s2 = obs.subscribe(i -> LOGGER.info("subscriber#2 is printing " + i));
26+
27+
s1.unsubscribe();
28+
s2.unsubscribe();
29+
}
30+
31+
private static void subscribeBeforeConnect() throws InterruptedException {
32+
ConnectableObservable obs = getObservable().publish();
33+
34+
LOGGER.info("Subscribing");
35+
obs.subscribe(i -> LOGGER.info("subscriber #1 is printing " + i));
36+
obs.subscribe(i -> LOGGER.info("subscriber #2 is printing " + i));
37+
Thread.sleep(1000);
38+
LOGGER.info("Connecting");
39+
Subscription s = obs.connect();
40+
s.unsubscribe();
41+
42+
}
43+
44+
private static Observable getObservable() {
45+
return Observable.create(subscriber -> {
46+
subscriber.onNext(gettingValue(1));
47+
subscriber.onNext(gettingValue(2));
48+
49+
subscriber.add(Subscriptions.create(() -> {
50+
LOGGER.info("Clear resources");
51+
}));
52+
});
53+
}
54+
55+
private static Integer gettingValue(int i) {
56+
LOGGER.info("Getting " + i);
57+
return i;
58+
}
59+
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
package com.baeldung.rxjava;
2+
3+
import java.awt.Color;
4+
import java.awt.Dimension;
5+
import java.awt.event.MouseAdapter;
6+
import java.awt.event.MouseEvent;
7+
import java.awt.event.MouseListener;
8+
import java.lang.reflect.InvocationTargetException;
9+
10+
import javax.swing.JFrame;
11+
import rx.Observable;
12+
import rx.Subscription;
13+
import rx.observables.ConnectableObservable;
14+
import rx.subscriptions.Subscriptions;
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
17+
18+
public class MultipleSubscribersHotObs {
19+
private static final Logger LOGGER = LoggerFactory.getLogger(MultipleSubscribersHotObs.class);
20+
private static JFrame frame;
21+
22+
public static void main(String[] args) throws InterruptedException, InvocationTargetException {
23+
24+
javax.swing.SwingUtilities.invokeAndWait(new Runnable() {
25+
public void run() {
26+
createAndShowGUI();
27+
}
28+
});
29+
30+
defaultBehaviour();
31+
// subscribeBeforeConnect();
32+
// connectBeforeSubscribe();
33+
// autoConnectAndSubscribe();
34+
// refCountAndSubscribe();
35+
}
36+
37+
private static void createAndShowGUI() {
38+
frame = new JFrame("Hot Observable Demo");
39+
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
40+
frame.getContentPane().setBackground(Color.GRAY);
41+
frame.setPreferredSize(new Dimension(500, 500));
42+
frame.pack();
43+
frame.setVisible(true);
44+
}
45+
46+
public static void defaultBehaviour() throws InterruptedException {
47+
Observable obs = getObservable();
48+
49+
LOGGER.info("subscribing #1");
50+
Subscription subscription1 = obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
51+
52+
Thread.sleep(1000);
53+
LOGGER.info("subscribing #2");
54+
Subscription subscription2 = obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
55+
Thread.sleep(1000);
56+
LOGGER.info("unsubscribe#1");
57+
subscription1.unsubscribe();
58+
Thread.sleep(1000);
59+
LOGGER.info("unsubscribe#2");
60+
subscription2.unsubscribe();
61+
}
62+
63+
public static void subscribeBeforeConnect() throws InterruptedException {
64+
65+
ConnectableObservable obs = getObservable().publish();
66+
67+
LOGGER.info("subscribing #1");
68+
Subscription subscription1 = obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
69+
Thread.sleep(1000);
70+
LOGGER.info("subscribing #2");
71+
Subscription subscription2 = obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
72+
Thread.sleep(1000);
73+
LOGGER.info("connecting:");
74+
Subscription s = obs.connect();
75+
Thread.sleep(1000);
76+
LOGGER.info("unsubscribe connected");
77+
s.unsubscribe();
78+
79+
}
80+
81+
public static void connectBeforeSubscribe() throws InterruptedException {
82+
83+
ConnectableObservable obs = getObservable().doOnNext(x -> LOGGER.info("saving " + x)).publish();
84+
LOGGER.info("connecting:");
85+
Subscription s = obs.connect();
86+
Thread.sleep(1000);
87+
LOGGER.info("subscribing #1");
88+
obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
89+
Thread.sleep(1000);
90+
LOGGER.info("subscribing #2");
91+
obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
92+
Thread.sleep(1000);
93+
s.unsubscribe();
94+
95+
}
96+
97+
public static void autoConnectAndSubscribe() throws InterruptedException {
98+
Observable obs = getObservable().doOnNext(x -> LOGGER.info("saving " + x)).publish().autoConnect();
99+
100+
LOGGER.info("autoconnect()");
101+
Thread.sleep(1000);
102+
LOGGER.info("subscribing #1");
103+
Subscription s1 = obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
104+
Thread.sleep(1000);
105+
LOGGER.info("subscribing #2");
106+
Subscription s2 = obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
107+
108+
Thread.sleep(1000);
109+
LOGGER.info("unsubscribe 1");
110+
s1.unsubscribe();
111+
Thread.sleep(1000);
112+
LOGGER.info("unsubscribe 2");
113+
s2.unsubscribe();
114+
}
115+
116+
public static void refCountAndSubscribe() throws InterruptedException {
117+
Observable obs = getObservable().doOnNext(x -> LOGGER.info("saving " + x)).publish().refCount();
118+
119+
LOGGER.info("refcount()");
120+
Thread.sleep(1000);
121+
LOGGER.info("subscribing #1");
122+
Subscription subscription1 = obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
123+
Thread.sleep(1000);
124+
LOGGER.info("subscribing #2");
125+
Subscription subscription2 = obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
126+
127+
Thread.sleep(1000);
128+
LOGGER.info("unsubscribe#1");
129+
subscription1.unsubscribe();
130+
Thread.sleep(1000);
131+
LOGGER.info("unsubscribe#2");
132+
subscription2.unsubscribe();
133+
134+
}
135+
136+
private static Observable getObservable() {
137+
return Observable.create(subscriber -> {
138+
frame.addMouseListener(new MouseAdapter() {
139+
@Override
140+
public void mouseClicked(MouseEvent e) {
141+
subscriber.onNext(e.getX());
142+
}
143+
});
144+
subscriber.add(Subscriptions.create(() -> {
145+
LOGGER.info("Clear resources");
146+
for (MouseListener listener : frame.getListeners(MouseListener.class)) {
147+
frame.removeMouseListener(listener);
148+
}
149+
}));
150+
});
151+
}
152+
}

0 commit comments

Comments
 (0)