1+ /*
2+ * Copyright 1999-2011 Alibaba Group.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+ package com .alibaba .dubbo .registry .simple ;
17+
18+ import com .alibaba .dubbo .common .Constants ;
19+ import com .alibaba .dubbo .common .URL ;
20+ import com .alibaba .dubbo .common .logger .Logger ;
21+ import com .alibaba .dubbo .common .logger .LoggerFactory ;
22+ import com .alibaba .dubbo .common .utils .ConcurrentHashSet ;
23+ import com .alibaba .dubbo .common .utils .NetUtils ;
24+ import com .alibaba .dubbo .common .utils .UrlUtils ;
25+ import com .alibaba .dubbo .registry .NotifyListener ;
26+ import com .alibaba .dubbo .registry .RegistryService ;
27+ import com .alibaba .dubbo .registry .support .AbstractRegistry ;
28+ import com .alibaba .dubbo .rpc .RpcContext ;
29+
30+ import java .util .ArrayList ;
31+ import java .util .HashMap ;
32+ import java .util .List ;
33+ import java .util .Map ;
34+ import java .util .Set ;
35+ import java .util .concurrent .ConcurrentHashMap ;
36+ import java .util .concurrent .ConcurrentMap ;
37+
38+ /**
39+ * SimpleRegistryService
40+ *
41+ * @author william.liangf
42+ */
43+ public class SimpleRegistryService extends AbstractRegistry {
44+
45+ private final static Logger logger = LoggerFactory .getLogger (SimpleRegistryService .class );
46+ private final ConcurrentMap <String , Set <URL >> remoteRegistered = new ConcurrentHashMap <String , Set <URL >>();
47+ private final ConcurrentMap <String , ConcurrentMap <URL , Set <NotifyListener >>> remoteSubscribed = new ConcurrentHashMap <String , ConcurrentMap <URL , Set <NotifyListener >>>();
48+
49+ public SimpleRegistryService () {
50+ super (new URL ("dubbo" , NetUtils .getLocalHost (), 0 , RegistryService .class .getName (), "file" , "N/A" ));
51+ }
52+
53+ public boolean isAvailable () {
54+ return true ;
55+ }
56+
57+ public List <URL > lookup (URL url ) {
58+ List <URL > urls = new ArrayList <URL >();
59+ for (URL u : getRegistered ()) {
60+ if (UrlUtils .isMatch (url , u )) {
61+ urls .add (u );
62+ }
63+ }
64+ return urls ;
65+ }
66+
67+ public void register (URL url ) {
68+ String client = RpcContext .getContext ().getRemoteAddressString ();
69+ Set <URL > urls = remoteRegistered .get (client );
70+ if (urls == null ) {
71+ remoteRegistered .putIfAbsent (client , new ConcurrentHashSet <URL >());
72+ urls = remoteRegistered .get (client );
73+ }
74+ urls .add (url );
75+ super .register (url );
76+ registered (url );
77+ }
78+
79+ public void unregister (URL url ) {
80+ String client = RpcContext .getContext ().getRemoteAddressString ();
81+ Set <URL > urls = remoteRegistered .get (client );
82+ if (urls != null && urls .size () > 0 ) {
83+ urls .remove (url );
84+ }
85+ super .unregister (url );
86+ unregistered (url );
87+ }
88+
89+ public void subscribe (URL url , NotifyListener listener ) {
90+ if (getUrl ().getPort () == 0 ) {
91+ URL registryUrl = RpcContext .getContext ().getUrl ();
92+ if (registryUrl != null && registryUrl .getPort () > 0
93+ && RegistryService .class .getName ().equals (registryUrl .getPath ())) {
94+ super .setUrl (registryUrl );
95+ super .register (registryUrl );
96+ }
97+ }
98+ String client = RpcContext .getContext ().getRemoteAddressString ();
99+ ConcurrentMap <URL , Set <NotifyListener >> clientListeners = remoteSubscribed .get (client );
100+ if (clientListeners == null ) {
101+ remoteSubscribed .putIfAbsent (client , new ConcurrentHashMap <URL , Set <NotifyListener >>());
102+ clientListeners = remoteSubscribed .get (client );
103+ }
104+ Set <NotifyListener > listeners = clientListeners .get (url );
105+ if (listeners == null ) {
106+ clientListeners .putIfAbsent (url , new ConcurrentHashSet <NotifyListener >());
107+ listeners = clientListeners .get (url );
108+ }
109+ listeners .add (listener );
110+ super .subscribe (url , listener );
111+ subscribed (url , listener );
112+ }
113+
114+ public void unsubscribe (URL url , NotifyListener listener ) {
115+ if (!Constants .ANY_VALUE .equals (url .getServiceInterface ())
116+ && url .getParameter (Constants .REGISTER_KEY , true )) {
117+ unregister (url );
118+ }
119+ String client = RpcContext .getContext ().getRemoteAddressString ();
120+ Map <URL , Set <NotifyListener >> clientListeners = remoteSubscribed .get (client );
121+ if (clientListeners != null && clientListeners .size () > 0 ) {
122+ Set <NotifyListener > listeners = clientListeners .get (url );
123+ if (listeners != null && listeners .size () > 0 ) {
124+ listeners .remove (listener );
125+ }
126+ }
127+ }
128+
129+ protected void registered (URL url ) {
130+ for (Map .Entry <URL , Set <NotifyListener >> entry : getSubscribed ().entrySet ()) {
131+ URL key = entry .getKey ();
132+ if (UrlUtils .isMatch (key , url )) {
133+ List <URL > list = lookup (key );
134+ for (NotifyListener listener : entry .getValue ()) {
135+ listener .notify (list );
136+ }
137+ }
138+ }
139+ }
140+
141+ protected void unregistered (URL url ) {
142+ for (Map .Entry <URL , Set <NotifyListener >> entry : getSubscribed ().entrySet ()) {
143+ URL key = entry .getKey ();
144+ if (UrlUtils .isMatch (key , url )) {
145+ List <URL > list = lookup (key );
146+ for (NotifyListener listener : entry .getValue ()) {
147+ listener .notify (list );
148+ }
149+ }
150+ }
151+ }
152+
153+ protected void subscribed (final URL url , final NotifyListener listener ) {
154+ if (Constants .ANY_VALUE .equals (url .getServiceInterface ())) {
155+ new Thread (new Runnable () {
156+ public void run () {
157+ Map <String , List <URL >> map = new HashMap <String , List <URL >>();
158+ for (URL u : getRegistered ()) {
159+ if (UrlUtils .isMatch (url , u )) {
160+ String service = u .getServiceInterface ();
161+ List <URL > list = map .get (service );
162+ if (list == null ) {
163+ list = new ArrayList <URL >();
164+ map .put (service , list );
165+ }
166+ list .add (u );
167+ }
168+ }
169+ for (List <URL > list : map .values ()) {
170+ try {
171+ listener .notify (list );
172+ } catch (Throwable e ) {
173+ logger .warn ("Discard to notify " + url .getServiceKey () + " to listener " + listener );
174+ }
175+ }
176+ }
177+ }, "DubboMonitorNotifier" ).start ();
178+ } else {
179+ List <URL > list = lookup (url );
180+ try {
181+ listener .notify (list );
182+ } catch (Throwable e ) {
183+ logger .warn ("Discard to notify " + url .getServiceKey () + " to listener " + listener );
184+ }
185+ }
186+ }
187+
188+ public void disconnect () {
189+ String client = RpcContext .getContext ().getRemoteAddressString ();
190+ if (logger .isInfoEnabled ()) {
191+ logger .info ("Disconnected " + client );
192+ }
193+ Set <URL > urls = remoteRegistered .get (client );
194+ if (urls != null && urls .size () > 0 ) {
195+ for (URL url : urls ) {
196+ unregister (url );
197+ }
198+ }
199+ Map <URL , Set <NotifyListener >> listeners = remoteSubscribed .get (client );
200+ if (listeners != null && listeners .size () > 0 ) {
201+ for (Map .Entry <URL , Set <NotifyListener >> entry : listeners .entrySet ()) {
202+ URL url = entry .getKey ();
203+ for (NotifyListener listener : entry .getValue ()) {
204+ unsubscribe (url , listener );
205+ }
206+ }
207+ }
208+ }
209+
210+ }
0 commit comments