@@ -301,18 +301,18 @@ class BeatTask implements Runnable {
301301 try {
302302 // 与nacos进行一次rest请求交互
303303 JSONObject result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
304- long interval = result.getIntValue("clientBeatInterval" );
304+ long interval = result.get(CLIENT_BEAT_INTERVAL_FIELD).asLong( );
305305 boolean lightBeatEnabled = false;
306- if (result.containsKey (CommonParams.LIGHT_BEAT_ENABLED)) {
307- lightBeatEnabled = result.getBooleanValue (CommonParams.LIGHT_BEAT_ENABLED);
306+ if (result.has (CommonParams.LIGHT_BEAT_ENABLED)) {
307+ lightBeatEnabled = result.get (CommonParams.LIGHT_BEAT_ENABLED).asBoolean( );
308308 }
309309 BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
310310 if (interval > 0) {
311311 nextTime = interval;
312312 }
313313 int code = NamingResponseCode.OK;
314- if (result.containsKey (CommonParams.CODE)) {
315- code = result.getIntValue (CommonParams.CODE);
314+ if (result.has (CommonParams.CODE)) {
315+ code = result.get (CommonParams.CODE).asInt( );
316316 }
317317 // 如果nacos找不到当前实例,
318318 if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
@@ -336,8 +336,12 @@ class BeatTask implements Runnable {
336336 NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
337337 JSON.toJSONString(beatInfo), ne.getErrCode(), ne.getErrMsg());
338338
339+ } catch (Exception unknownEx) {
340+ NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
341+ JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
342+ } finally {
343+ executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
339344 }
340- executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
341345 }
342346}
343347```
@@ -359,47 +363,41 @@ class BeatTask implements Runnable {
359363
360364 NacosException exception = new NacosException ();
361365
362- if (servers != null && ! servers. isEmpty()) {
363-
364- Random random = new Random (System . currentTimeMillis());
365- int index = random. nextInt(servers. size());
366-
367- for (int i = 0 ; i < servers. size(); i++ ) {
368- // 获取nacos所在的ip+port地址
369- String server = servers. get(index);
370- try {
371- // 进行请求
372- return callServer(api, params, body, server, method);
373- } catch (NacosException e) {
374- exception = e;
375- if (NAMING_LOGGER . isDebugEnabled()) {
376- NAMING_LOGGER . debug(" request {} failed." , server, e);
377- }
378- }
379- index = (index + 1 ) % servers. size();
380- }
381- }
382-
383- if (StringUtils . isNotBlank(nacosDomain)) {
384- for (int i = 0 ; i < UtilAndComs . REQUEST_DOMAIN_RETRY_COUNT ; i++ ) {
385- try {
386- return callServer(api, params, body, nacosDomain, method);
387- } catch (NacosException e) {
388- exception = e;
389- if (NAMING_LOGGER . isDebugEnabled()) {
390- NAMING_LOGGER . debug(" request {} failed." , nacosDomain, e);
391- }
392- }
393- }
394- }
395-
396- NAMING_LOGGER . error(" request: {} failed, servers: {}, code: {}, msg: {}" ,
397- api, servers, exception. getErrCode(), exception. getErrMsg());
398-
399- throw new NacosException (exception. getErrCode(), " failed to req API:/api/" + api + " after all servers(" + servers + " ) tried: "
400- + exception. getMessage());
401-
402- }
366+ if (serverListManager. isDomain()) {
367+ String nacosDomain = serverListManager. getNacosDomain();
368+ for (int i = 0 ; i < maxRetry; i++ ) {
369+ try {
370+ return callServer(api, params, body, nacosDomain, method);
371+ } catch (NacosException e) {
372+ exception = e;
373+ if (NAMING_LOGGER . isDebugEnabled()) {
374+ NAMING_LOGGER . debug(" request {} failed." , nacosDomain, e);
375+ }
376+ }
377+ }
378+ } else {
379+ Random random = new Random (System . currentTimeMillis());
380+ int index = random. nextInt(servers. size());
381+
382+ for (int i = 0 ; i < servers. size(); i++ ) {
383+ String server = servers. get(index);
384+ try {
385+ return callServer(api, params, body, server, method);
386+ } catch (NacosException e) {
387+ exception = e;
388+ if (NAMING_LOGGER . isDebugEnabled()) {
389+ NAMING_LOGGER . debug(" request {} failed." , server, e);
390+ }
391+ }
392+ index = (index + 1 ) % servers. size();
393+ }
394+ }
395+
396+ NAMING_LOGGER . error(" request: {} failed, servers: {}, code: {}, msg: {}" , api, servers, exception. getErrCode(),
397+ exception. getErrMsg());
398+
399+ throw new NacosException (exception. getErrCode(),
400+ " failed to req API:" + api + " after all servers(" + servers + " ) tried: " + exception. getMessage());
403401 ```
404402
405403** 学习点**
@@ -437,19 +435,24 @@ public void registerService(String serviceName, String groupName, Instance insta
437435
438436 NAMING_LOGGER . info(" [REGISTER-SERVICE] {} registering service {} with instance: {}" ,
439437 namespaceId, serviceName, instance);
438+ String groupedServiceName = NamingUtils . getGroupedName(serviceName, groupName);
439+ if (instance. isEphemeral()) {
440+ BeatInfo beatInfo = beatReactor. buildBeatInfo(groupedServiceName, instance);
441+ beatReactor. addBeatInfo(groupedServiceName, beatInfo);
442+ }
440443
441- final Map<String , String > params = new HashMap<String , String > (9 );
444+ final Map<String , String > params = new HashMap<String , String > (32 );
442445 params. put(CommonParams . NAMESPACE_ID , namespaceId);
443- params. put(CommonParams . SERVICE_NAME , serviceName );
446+ params. put(CommonParams . SERVICE_NAME , groupedServiceName );
444447 params. put(CommonParams . GROUP_NAME , groupName);
445448 params. put(CommonParams . CLUSTER_NAME , instance. getClusterName());
446- params. put(" ip " , instance. getIp());
447- params. put(" port " , String . valueOf(instance. getPort()));
448- params. put(" weight " , String . valueOf(instance. getWeight()));
449- params. put(" enable " , String . valueOf(instance. isEnabled()));
450- params. put(" healthy " , String . valueOf(instance. isHealthy()));
451- params. put(" ephemeral " , String . valueOf(instance. isEphemeral()));
452- params. put(" metadata " , JSON . toJSONString (instance. getMetadata()));
449+ params. put(IP_PARAM , instance. getIp());
450+ params. put(PORT_PARAM , String . valueOf(instance. getPort()));
451+ params. put(WEIGHT_PARAM , String . valueOf(instance. getWeight()));
452+ params. put(REGISTER_ENABLE_PARAM , String . valueOf(instance. isEnabled()));
453+ params. put(HEALTHY_PARAM , String . valueOf(instance. isHealthy()));
454+ params. put(EPHEMERAL_PARAM , String . valueOf(instance. isEphemeral()));
455+ params. put(META_PARAM , JacksonUtils . toJson (instance. getMetadata()));
453456
454457 reqAPI(UtilAndComs . NACOS_URL_INSTANCE , params, HttpMethod . POST );
455458
@@ -645,76 +648,41 @@ public Instance getInstance(String namespaceId, String serviceName, String clust
645648``` java
646649@CanDistro
647650@PutMapping (" /beat" )
648- @Secured (parser = NamingResourceParser . class, action = ActionTypes . WRITE )
649- public JSONObject beat(HttpServletRequest request) throws Exception {
650-
651- JSONObject result = new JSONObject ();
652-
653- result. put(" clientBeatInterval" , switchDomain. getClientBeatInterval());
654- String serviceName = WebUtils . required(request, CommonParams . SERVICE_NAME );
655- String namespaceId = WebUtils . optional(request, CommonParams . NAMESPACE_ID ,
656- Constants . DEFAULT_NAMESPACE_ID );
657- String clusterName = WebUtils . optional(request, CommonParams . CLUSTER_NAME ,
658- UtilsAndCommons . DEFAULT_CLUSTER_NAME );
659- String ip = WebUtils . optional(request, " ip" , StringUtils . EMPTY );
660- int port = Integer . parseInt(WebUtils . optional(request, " port" , " 0" ));
661- String beat = WebUtils . optional(request, " beat" , StringUtils . EMPTY );
662-
663- RsInfo clientBeat = null ;
664- if (StringUtils . isNotBlank(beat)) {
665- clientBeat = JSON . parseObject(beat, RsInfo . class);
666- }
667-
668- if (clientBeat != null ) {
669- if (StringUtils . isNotBlank(clientBeat. getCluster())) {
670- clusterName = clientBeat. getCluster();
671- }
651+ @Secured (action = ActionTypes . WRITE )
652+ public ObjectNode beat(@RequestParam (defaultValue = Constants . DEFAULT_NAMESPACE_ID ) String namespaceId,
653+ @RequestParam String serviceName,
654+ @RequestParam (defaultValue = StringUtils . EMPTY ) String ip,
655+ @RequestParam (defaultValue = UtilsAndCommons . DEFAULT_CLUSTER_NAME ) String clusterName,
656+ @RequestParam (defaultValue = " 0" ) Integer port,
657+ @RequestParam (defaultValue = StringUtils . EMPTY ) String beat)throws Exception {
658+
659+ ObjectNode result = JacksonUtils . createEmptyJsonNode();
660+ result. put(SwitchEntry . CLIENT_BEAT_INTERVAL , switchDomain. getClientBeatInterval());
661+ RsInfo clientBeat = null ;
662+ if (StringUtils . isNotBlank(beat)) {
663+ clientBeat = JacksonUtils . toObj(beat, RsInfo . class);
664+ }
665+ if (clientBeat != null ) {
666+ if (StringUtils . isNotBlank(clientBeat. getCluster())) {
667+ clusterName = clientBeat. getCluster();
668+ } else {
669+ // fix #2533
670+ clientBeat. setCluster(clusterName);
671+ }
672672 ip = clientBeat. getIp();
673673 port = clientBeat. getPort();
674- }
675-
676- if (Loggers . SRV_LOG. isDebugEnabled()) {
677- Loggers . SRV_LOG. debug(" [CLIENT-BEAT] full arguments: beat: {}, serviceName: {}" , clientBeat, serviceName);
678- }
679- // 获取实例
680- Instance instance = serviceManager. getInstance(namespaceId, serviceName, clusterName, ip, port);
681-
682- if (instance == null ) {
683- if (clientBeat == null ) {
684- result. put(CommonParams . CODE , NamingResponseCode . RESOURCE_NOT_FOUND );
685- return result;
686674 }
687- instance = new Instance ();
688- instance. setPort(clientBeat. getPort());
689- instance. setIp(clientBeat. getIp());
690- instance. setWeight(clientBeat. getWeight());
691- instance. setMetadata(clientBeat. getMetadata());
692- instance. setClusterName(clusterName);
693- instance. setServiceName(serviceName);
694- instance. setInstanceId(instance. getInstanceId());
695- instance. setEphemeral(clientBeat. isEphemeral());
696-
697- serviceManager. registerInstance(namespaceId, serviceName, instance);
698- }
699-
700- Service service = serviceManager. getService(namespaceId, serviceName);
701-
702- if (service == null ) {
703- throw new NacosException (NacosException . SERVER_ERROR ,
704- " service not found: " + serviceName + " @" + namespaceId);
705- }
706- if (clientBeat == null ) {
707- clientBeat = new RsInfo ();
708- clientBeat. setIp(ip);
709- clientBeat. setPort(port);
710- clientBeat. setCluster(clusterName);
711- }
712- // 处理心跳方法
713- service. processClientBeat(clientBeat);
714675
715- result. put(CommonParams . CODE , NamingResponseCode . OK );
716- result. put(" clientBeatInterval" , instance. getInstanceHeartBeatInterval());
717- result. put(SwitchEntry . LIGHT_BEAT_ENABLED , switchDomain. isLightBeatEnabled());
718- return result;
676+ NamingUtils . checkServiceNameFormat(serviceName);
677+ Loggers . SRV_LOG. debug(" [CLIENT-BEAT] full arguments: beat: {}, serviceName: {}, namespaceId: {}" , clientBeat,
678+ serviceName, namespaceId);
679+ BeatInfoInstanceBuilder builder = BeatInfoInstanceBuilder . newBuilder();
680+ int resultCode = instanceServiceV2
681+ .handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder);
682+ result. put(CommonParams . CODE , resultCode);
683+ result. put(SwitchEntry . CLIENT_BEAT_INTERVAL ,
684+ instanceServiceV2. getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName));
685+ result. put(SwitchEntry . LIGHT_BEAT_ENABLED , switchDomain. isLightBeatEnabled());
686+ return result;
719687}
720688```
0 commit comments