Skip to content

Commit 2ed8da6

Browse files
aietcnpivovarit
authored andcommitted
BAEL-1270 intro to dubbo (eugenp#3109)
* BAEL-1270 intro to dubbo * BAEL-1270: add cluster and load-balancing tests * BAEL-1270 exclude *LiveTest * BAEL-1270 using call() instead of run() in executorService.submit()
1 parent 8ec495b commit 2ed8da6

28 files changed

Lines changed: 995 additions & 0 deletions

dubbo/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
## Relevant articles:
2+
3+
- [Intro to Dubbo](http://www.baeldung.com/dubbo-intro)
4+

dubbo/pom.xml

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
<modelVersion>4.0.0</modelVersion>
4+
5+
<parent>
6+
<artifactId>parent-modules</artifactId>
7+
<groupId>com.baeldung</groupId>
8+
<version>1.0.0-SNAPSHOT</version>
9+
</parent>
10+
11+
<artifactId>dubbo</artifactId>
12+
13+
<properties>
14+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15+
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
16+
<java.version>1.8</java.version>
17+
<dubbo.version>2.5.7</dubbo.version>
18+
<zookeeper.version>3.4.11</zookeeper.version>
19+
<zkclient.version>0.10</zkclient.version>
20+
<surefire.version>2.19.1</surefire.version>
21+
</properties>
22+
23+
<dependencies>
24+
<dependency>
25+
<groupId>com.alibaba</groupId>
26+
<artifactId>dubbo</artifactId>
27+
<version>${dubbo.version}</version>
28+
</dependency>
29+
<dependency>
30+
<groupId>junit</groupId>
31+
<artifactId>junit</artifactId>
32+
<version>4.12</version>
33+
<scope>test</scope>
34+
</dependency>
35+
36+
<dependency>
37+
<groupId>org.apache.zookeeper</groupId>
38+
<artifactId>zookeeper</artifactId>
39+
<version>${zookeeper.version}</version>
40+
</dependency>
41+
42+
<dependency>
43+
<groupId>com.101tec</groupId>
44+
<artifactId>zkclient</artifactId>
45+
<version>${zkclient.version}</version>
46+
</dependency>
47+
48+
</dependencies>
49+
50+
<build>
51+
<plugins>
52+
<plugin>
53+
<groupId>org.apache.maven.plugins</groupId>
54+
<artifactId>maven-compiler-plugin</artifactId>
55+
<configuration>
56+
<source>1.8</source>
57+
<target>1.8</target>
58+
</configuration>
59+
</plugin>
60+
<plugin>
61+
<artifactId>maven-surefire-plugin</artifactId>
62+
<version>${surefire.version}</version>
63+
<configuration>
64+
<excludes>
65+
<exclude>**/*LiveTest.java</exclude>
66+
</excludes>
67+
</configuration>
68+
</plugin>
69+
</plugins>
70+
</build>
71+
72+
</project>
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
/*
2+
* Copyright 1999-2011 Alibaba Group.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.alibaba.dubbo.registry.simple;
17+
18+
import com.alibaba.dubbo.common.Constants;
19+
import com.alibaba.dubbo.common.URL;
20+
import com.alibaba.dubbo.common.logger.Logger;
21+
import com.alibaba.dubbo.common.logger.LoggerFactory;
22+
import com.alibaba.dubbo.common.utils.ConcurrentHashSet;
23+
import com.alibaba.dubbo.common.utils.NetUtils;
24+
import com.alibaba.dubbo.common.utils.UrlUtils;
25+
import com.alibaba.dubbo.registry.NotifyListener;
26+
import com.alibaba.dubbo.registry.RegistryService;
27+
import com.alibaba.dubbo.registry.support.AbstractRegistry;
28+
import com.alibaba.dubbo.rpc.RpcContext;
29+
30+
import java.util.ArrayList;
31+
import java.util.HashMap;
32+
import java.util.List;
33+
import java.util.Map;
34+
import java.util.Set;
35+
import java.util.concurrent.ConcurrentHashMap;
36+
import java.util.concurrent.ConcurrentMap;
37+
38+
/**
39+
* SimpleRegistryService
40+
*
41+
* @author william.liangf
42+
*/
43+
public class SimpleRegistryService extends AbstractRegistry {
44+
45+
private final static Logger logger = LoggerFactory.getLogger(SimpleRegistryService.class);
46+
private final ConcurrentMap<String, Set<URL>> remoteRegistered = new ConcurrentHashMap<String, Set<URL>>();
47+
private final ConcurrentMap<String, ConcurrentMap<URL, Set<NotifyListener>>> remoteSubscribed = new ConcurrentHashMap<String, ConcurrentMap<URL, Set<NotifyListener>>>();
48+
49+
public SimpleRegistryService() {
50+
super(new URL("dubbo", NetUtils.getLocalHost(), 0, RegistryService.class.getName(), "file", "N/A"));
51+
}
52+
53+
public boolean isAvailable() {
54+
return true;
55+
}
56+
57+
public List<URL> lookup(URL url) {
58+
List<URL> urls = new ArrayList<URL>();
59+
for (URL u : getRegistered()) {
60+
if (UrlUtils.isMatch(url, u)) {
61+
urls.add(u);
62+
}
63+
}
64+
return urls;
65+
}
66+
67+
public void register(URL url) {
68+
String client = RpcContext.getContext().getRemoteAddressString();
69+
Set<URL> urls = remoteRegistered.get(client);
70+
if (urls == null) {
71+
remoteRegistered.putIfAbsent(client, new ConcurrentHashSet<URL>());
72+
urls = remoteRegistered.get(client);
73+
}
74+
urls.add(url);
75+
super.register(url);
76+
registered(url);
77+
}
78+
79+
public void unregister(URL url) {
80+
String client = RpcContext.getContext().getRemoteAddressString();
81+
Set<URL> urls = remoteRegistered.get(client);
82+
if (urls != null && urls.size() > 0) {
83+
urls.remove(url);
84+
}
85+
super.unregister(url);
86+
unregistered(url);
87+
}
88+
89+
public void subscribe(URL url, NotifyListener listener) {
90+
if (getUrl().getPort() == 0) {
91+
URL registryUrl = RpcContext.getContext().getUrl();
92+
if (registryUrl != null && registryUrl.getPort() > 0
93+
&& RegistryService.class.getName().equals(registryUrl.getPath())) {
94+
super.setUrl(registryUrl);
95+
super.register(registryUrl);
96+
}
97+
}
98+
String client = RpcContext.getContext().getRemoteAddressString();
99+
ConcurrentMap<URL, Set<NotifyListener>> clientListeners = remoteSubscribed.get(client);
100+
if (clientListeners == null) {
101+
remoteSubscribed.putIfAbsent(client, new ConcurrentHashMap<URL, Set<NotifyListener>>());
102+
clientListeners = remoteSubscribed.get(client);
103+
}
104+
Set<NotifyListener> listeners = clientListeners.get(url);
105+
if (listeners == null) {
106+
clientListeners.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
107+
listeners = clientListeners.get(url);
108+
}
109+
listeners.add(listener);
110+
super.subscribe(url, listener);
111+
subscribed(url, listener);
112+
}
113+
114+
public void unsubscribe(URL url, NotifyListener listener) {
115+
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
116+
&& url.getParameter(Constants.REGISTER_KEY, true)) {
117+
unregister(url);
118+
}
119+
String client = RpcContext.getContext().getRemoteAddressString();
120+
Map<URL, Set<NotifyListener>> clientListeners = remoteSubscribed.get(client);
121+
if (clientListeners != null && clientListeners.size() > 0) {
122+
Set<NotifyListener> listeners = clientListeners.get(url);
123+
if (listeners != null && listeners.size() > 0) {
124+
listeners.remove(listener);
125+
}
126+
}
127+
}
128+
129+
protected void registered(URL url) {
130+
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
131+
URL key = entry.getKey();
132+
if (UrlUtils.isMatch(key, url)) {
133+
List<URL> list = lookup(key);
134+
for (NotifyListener listener : entry.getValue()) {
135+
listener.notify(list);
136+
}
137+
}
138+
}
139+
}
140+
141+
protected void unregistered(URL url) {
142+
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
143+
URL key = entry.getKey();
144+
if (UrlUtils.isMatch(key, url)) {
145+
List<URL> list = lookup(key);
146+
for (NotifyListener listener : entry.getValue()) {
147+
listener.notify(list);
148+
}
149+
}
150+
}
151+
}
152+
153+
protected void subscribed(final URL url, final NotifyListener listener) {
154+
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
155+
new Thread(new Runnable() {
156+
public void run() {
157+
Map<String, List<URL>> map = new HashMap<String, List<URL>>();
158+
for (URL u : getRegistered()) {
159+
if (UrlUtils.isMatch(url, u)) {
160+
String service = u.getServiceInterface();
161+
List<URL> list = map.get(service);
162+
if (list == null) {
163+
list = new ArrayList<URL>();
164+
map.put(service, list);
165+
}
166+
list.add(u);
167+
}
168+
}
169+
for (List<URL> list : map.values()) {
170+
try {
171+
listener.notify(list);
172+
} catch (Throwable e) {
173+
logger.warn("Discard to notify " + url.getServiceKey() + " to listener " + listener);
174+
}
175+
}
176+
}
177+
}, "DubboMonitorNotifier").start();
178+
} else {
179+
List<URL> list = lookup(url);
180+
try {
181+
listener.notify(list);
182+
} catch (Throwable e) {
183+
logger.warn("Discard to notify " + url.getServiceKey() + " to listener " + listener);
184+
}
185+
}
186+
}
187+
188+
public void disconnect() {
189+
String client = RpcContext.getContext().getRemoteAddressString();
190+
if (logger.isInfoEnabled()) {
191+
logger.info("Disconnected " + client);
192+
}
193+
Set<URL> urls = remoteRegistered.get(client);
194+
if (urls != null && urls.size() > 0) {
195+
for (URL url : urls) {
196+
unregister(url);
197+
}
198+
}
199+
Map<URL, Set<NotifyListener>> listeners = remoteSubscribed.get(client);
200+
if (listeners != null && listeners.size() > 0) {
201+
for (Map.Entry<URL, Set<NotifyListener>> entry : listeners.entrySet()) {
202+
URL url = entry.getKey();
203+
for (NotifyListener listener : entry.getValue()) {
204+
unsubscribe(url, listener);
205+
}
206+
}
207+
}
208+
}
209+
210+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.baeldung.dubbo.remote;
2+
3+
/**
4+
* @author aiet
5+
*/
6+
public class GreetingsFailoverServiceImpl implements GreetingsService {
7+
8+
@Override
9+
public String sayHi(String name) {
10+
System.out.println("failover implementation");
11+
return "hi, failover " + name;
12+
}
13+
14+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.baeldung.dubbo.remote;
2+
3+
/**
4+
* @author aiet
5+
*/
6+
public interface GreetingsService {
7+
8+
String sayHi(String name);
9+
10+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.baeldung.dubbo.remote;
2+
3+
/**
4+
* @author aiet
5+
*/
6+
public class GreetingsServiceImpl implements GreetingsService {
7+
8+
@Override
9+
public String sayHi(String name) {
10+
System.out.println("default implementation");
11+
return "hi, " + name;
12+
}
13+
14+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.baeldung.dubbo.remote;
2+
3+
import static java.util.concurrent.TimeUnit.SECONDS;
4+
5+
/**
6+
* @author aiet
7+
*/
8+
public class GreetingsServiceSpecialImpl implements GreetingsService {
9+
10+
@Override
11+
public String sayHi(String name) {
12+
try {
13+
System.out.println("specially called");
14+
SECONDS.sleep(5);
15+
} catch (Exception ignored) {
16+
}
17+
return "hi, " + name;
18+
}
19+
20+
}

0 commit comments

Comments
 (0)