From 5ed7039da15f1fa1de60c53f732ed8517bb2c568 Mon Sep 17 00:00:00 2001 From: Synex-wh <241809311@qq.com> Date: Thu, 1 Aug 2019 20:38:54 +0800 Subject: [PATCH] Fix issue24 (#41) * fix temp push * update version 5.2.1-SNAPSHOT * fix test case * fix jetty version,and fix rest api for dataInfoIds * fix hashcode test * fix working to init bug * fix start task log * fix Watcher can't get providate data,retry and finally return new * add data server list api * add server list api * remove log * fix isssue 21 * add query by id function * fix issue 22 * delay client off process and sync data process to working status * fix data connet meta error * fix inject NotifyDataSyncHandler * fix start log * add send sub log * fix subscriber to send log * fix word cache clientid * add clientoff delay time * fix clientOffDelayMs * fix jetty version * fix version to 5.2.1 release * fix version * fix .travis.yml * fix test case * fix * fix test sync case * fix test case * fix test case * fix case * fix notify online no connect break,and add connect log * add test case * add test case * fix test case * fix format * fix resource test case * fix --- .travis.yml | 4 +- pom.xml | 4 +- .../common/model/metaserver/DataNode.java | 4 - .../common/model/metaserver/MetaNode.java | 5 +- .../common/model/metaserver/SessionNode.java | 13 +- .../registry/common/model/store/BaseInfo.java | 4 +- .../registry/task/listener/TaskEvent.java | 28 +- .../DataServerBeanConfiguration.java | 27 + .../data/bootstrap/DataServerBootstrap.java | 12 +- .../data/bootstrap/DataServerConfig.java | 4 +- .../server/data/cache/DataServerCache.java | 41 +- .../server/data/change/DataChangeHandler.java | 8 +- .../change/event/DataChangeEventQueue.java | 9 +- .../LocalDataServerCleanHandler.java | 12 +- .../datasync/sync/AbstractAcceptorStore.java | 31 +- .../data/event/AfterWorkingProcess.java | 29 + .../server/data/event/StartTaskEvent.java | 16 +- .../server/data/event/StartTaskTypeEnum.java | 45 ++ .../handler/AfterWorkingProcessHandler.java | 50 ++ .../handler/DataServerChangeEventHandler.java | 2 + .../LocalDataServerChangeEventHandler.java | 26 +- .../handler/MetaServerChangeEventHandler.java | 9 + .../event/handler/StartTaskEventHandler.java | 26 +- .../DataServerConnectionFactory.java | 14 +- .../handler/NotifyDataSyncHandler.java | 92 ++- .../dataserver/task/AbstractTask.java | 6 + .../task/ConnectionRefreshTask.java | 6 + .../dataserver/task/ReNewNodeTask.java | 6 + .../handler/AbstractServerHandler.java | 22 +- .../metaserver/DefaultMetaServiceImpl.java | 19 +- .../task/ConnectionRefreshMetaTask.java | 6 + .../disconnect/DisconnectEventHandler.java | 63 +- .../handler/ClientOffHandler.java | 2 +- .../handler/DataServerConnectionHandler.java | 7 +- .../data/resource/DataDigestResource.java | 112 ++++ .../src/main/resources/application.properties | 1 + .../src/main/resources/logback-spring.xml | 69 ++- .../registry/server/data/BackupTriadTest.java | 21 +- .../bootstrap/AbstractNodeConfigBean.java | 4 +- .../server/meta/remoting/RaftExchanger.java | 22 +- .../service/MetaRepositoryService.java | 9 + .../src/main/resources/logback-spring.xml | 18 + .../service/DataRepositoryServiceTest.java | 241 ++++++++ .../service/MetaRepositoryServiceTest.java | 241 ++++++++ .../service/SessionRepositoryServiceTest.java | 182 ++++++ .../ReceivedDataMultiPushTaskListener.java | 7 + .../resource/SessionDigestResource.java | 91 ++- .../scheduler/task/AbstractSessionTask.java | 4 + .../scheduler/task/CancelDataTask.java | 6 + .../task/DataChangeFetchCloudTask.java | 20 +- .../scheduler/task/DataChangeFetchTask.java | 31 +- .../session/scheduler/task/DataPushTask.java | 6 +- .../task/ProvideDataChangeFetchTask.java | 5 + .../scheduler/task/PushTaskClosure.java | 28 +- .../task/ReceivedConfigDataPushTask.java | 6 + .../task/ReceivedDataMultiPushTask.java | 66 ++- .../task/SessionRegisterDataTask.java | 6 + .../task/SubscriberMultiFetchTask.java | 5 + .../task/SubscriberRegisterFetchTask.java | 6 + .../task/WatcherRegisterFetchTask.java | 50 +- .../server/session/store/Interests.java | 10 +- .../session/store/SessionInterests.java | 10 + ...ltSubscriberRegisterFetchTaskStrategy.java | 12 +- .../src/main/resources/logback-spring.xml | 6 + .../test/java/TestServiceStateMachine.java | 558 ++++++++++++++++++ .../resource/data/DataDigestResourceTest.java | 28 +- .../sofa/registry/test/sync/DataSyncTest.java | 16 +- 67 files changed, 2330 insertions(+), 219 deletions(-) create mode 100644 server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/AfterWorkingProcess.java create mode 100644 server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/StartTaskTypeEnum.java create mode 100644 server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/AfterWorkingProcessHandler.java create mode 100644 server/server/meta/src/test/java/com/alipay/sofa/registry/server/meta/test/service/DataRepositoryServiceTest.java create mode 100644 server/server/meta/src/test/java/com/alipay/sofa/registry/server/meta/test/service/MetaRepositoryServiceTest.java create mode 100644 server/server/meta/src/test/java/com/alipay/sofa/registry/server/meta/test/service/SessionRepositoryServiceTest.java create mode 100644 server/store/jraft/src/test/java/TestServiceStateMachine.java diff --git a/.travis.yml b/.travis.yml index cb53cacfe..e8363e468 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,8 @@ language: java sudo: false +dist: trusty + jdk: - oraclejdk8 @@ -12,4 +14,4 @@ script: - sh ./tools/check_format.sh after_success: -- bash <(curl -s https://codecov.io/bash) \ No newline at end of file +- bash <(curl -s https://codecov.io/bash) diff --git a/pom.xml b/pom.xml index 1b15dc405..624993605 100644 --- a/pom.xml +++ b/pom.xml @@ -2,8 +2,8 @@ + 4.0.0 - com.alipay.sofa registry-parent 5.2.1-SNAPSHOT @@ -78,7 +78,7 @@ 1.2.4 4.0.2 2.4 - [9.4.17.v20190418,) + [9.4.17.v20190418,9.4.19.v20190610] ${user.dir} -Dnetwork_interface_denylist=docker0 diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/DataNode.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/DataNode.java index ca3bba378..e5e623f63 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/DataNode.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/DataNode.java @@ -213,10 +213,6 @@ public void setRegistrationTimestamp(long registrationTimestamp) { public String toString() { final StringBuilder sb = new StringBuilder("DataNode{"); sb.append("ip=").append(getIp()); - sb.append(", dataCenter='").append(dataCenter).append('\''); - sb.append(", regionId='").append(regionId).append('\''); - sb.append(", nodeStatus=").append(nodeStatus); - sb.append(", registrationTimestamp=").append(registrationTimestamp); sb.append('}'); return sb.toString(); } diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/MetaNode.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/MetaNode.java index 6adf2457c..01d107cb0 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/MetaNode.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/MetaNode.java @@ -131,10 +131,7 @@ public void setRegionId(String regionId) { @Override public String toString() { final StringBuilder sb = new StringBuilder("MetaNode{"); - sb.append("nodeUrl=").append(getIp()); - sb.append(", dataCenter='").append(dataCenter).append('\''); - sb.append(", regionId='").append(regionId).append('\''); - sb.append(", nodeStatus=").append(nodeStatus); + sb.append("ip=").append(getIp()); sb.append('}'); return sb.toString(); } diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/SessionNode.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/SessionNode.java index d6beeaf67..8c9a3487e 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/SessionNode.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/SessionNode.java @@ -55,6 +55,14 @@ public URL getNodeUrl() { return nodeUrl; } + /** + * get ip address from nodeUrl + * @return + */ + public String getIp() { + return nodeUrl == null ? "" : nodeUrl.getIpAddress(); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -167,10 +175,7 @@ public void setNodeStatus(NodeStatus nodeStatus) { @Override public String toString() { final StringBuilder sb = new StringBuilder("SessionNode{"); - sb.append("nodeUrl=").append(nodeUrl); - sb.append(", regionId='").append(regionId).append('\''); - sb.append(", name='").append(name).append('\''); - sb.append(", nodeStatus=").append(nodeStatus); + sb.append("ip=").append(getIp()); sb.append('}'); return sb.toString(); } diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/BaseInfo.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/BaseInfo.java index 7c4983356..2242e5354 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/BaseInfo.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/BaseInfo.java @@ -193,7 +193,7 @@ public void setAttributes(Map attributes) { Map newAttributes = new HashMap<>(); if (attributes != null && !attributes.isEmpty()) { attributes.forEach((key, value) -> newAttributes - .put(WordCache.getInstance().getWordCache(key), value)); + .put(key, value)); } this.attributes = newAttributes; } @@ -273,7 +273,7 @@ public String getClientId() { * @param clientId value to be assigned to property clientId */ public void setClientId(String clientId) { - this.clientId = WordCache.getInstance().getWordCache(clientId); + this.clientId = clientId; } /** diff --git a/server/common/util/src/main/java/com/alipay/sofa/registry/task/listener/TaskEvent.java b/server/common/util/src/main/java/com/alipay/sofa/registry/task/listener/TaskEvent.java index 16f1abcc3..3e1d354a4 100644 --- a/server/common/util/src/main/java/com/alipay/sofa/registry/task/listener/TaskEvent.java +++ b/server/common/util/src/main/java/com/alipay/sofa/registry/task/listener/TaskEvent.java @@ -19,6 +19,7 @@ import com.alipay.sofa.registry.task.TaskClosure; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; /** @@ -75,6 +76,10 @@ public String getName() { private long createTime; + private TaskClosure taskClosure; + + private final String taskId; + private final Map attributes = new ConcurrentHashMap(); /** @@ -84,10 +89,9 @@ public String getName() { public TaskEvent(TaskType taskType) { this.taskType = taskType; this.createTime = System.currentTimeMillis(); + taskId = UUID.randomUUID().toString(); } - private TaskClosure taskClosure; - /** * constructor * @param eventObj @@ -96,6 +100,17 @@ public TaskEvent(TaskType taskType) { public TaskEvent(Object eventObj, TaskType taskType) { this.eventObj = eventObj; this.taskType = taskType; + this.createTime = System.currentTimeMillis(); + taskId = UUID.randomUUID().toString(); + } + + /** + * Getter method for property taskId. + * + * @return property value of taskId + */ + public String getTaskId() { + return taskId; } /** @@ -195,7 +210,12 @@ public long getCreateTime() { @Override public String toString() { - return "TaskEvent{" + "eventObj=" + eventObj + ", sendTimeStamp=" + sendTimeStamp - + ", attributes=" + attributes + '}'; + final StringBuilder sb = new StringBuilder("TaskEvent{"); + sb.append("eventObj=").append(eventObj); + sb.append(", sendTimeStamp=").append(sendTimeStamp); + sb.append(", attributes=").append(attributes); + sb.append(", taskId='").append(taskId).append('\''); + sb.append('}'); + return sb.toString(); } } \ No newline at end of file diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/bootstrap/DataServerBeanConfiguration.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/bootstrap/DataServerBeanConfiguration.java index 5087a451b..776982906 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/bootstrap/DataServerBeanConfiguration.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/bootstrap/DataServerBeanConfiguration.java @@ -33,7 +33,9 @@ import com.alipay.sofa.registry.server.data.datasync.sync.Scheduler; import com.alipay.sofa.registry.server.data.datasync.sync.StoreServiceFactory; import com.alipay.sofa.registry.server.data.datasync.sync.SyncDataServiceImpl; +import com.alipay.sofa.registry.server.data.event.AfterWorkingProcess; import com.alipay.sofa.registry.server.data.event.EventCenter; +import com.alipay.sofa.registry.server.data.event.handler.AfterWorkingProcessHandler; import com.alipay.sofa.registry.server.data.event.handler.DataServerChangeEventHandler; import com.alipay.sofa.registry.server.data.event.handler.LocalDataServerChangeEventHandler; import com.alipay.sofa.registry.server.data.event.handler.MetaServerChangeEventHandler; @@ -76,6 +78,7 @@ import com.alipay.sofa.registry.util.PropertySplitter; import org.glassfish.jersey.jackson.JacksonFeature; import org.glassfish.jersey.server.ResourceConfig; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -449,4 +452,28 @@ public DataDigestResource dataDigestResource() { return new DataDigestResource(); } } + + @Configuration + public static class AfterWorkingProcessConfiguration { + + @Autowired + DisconnectEventHandler disconnectEventHandler; + + @Autowired + AbstractClientHandler notifyDataSyncHandler; + + @Bean(name = "afterWorkProcessors") + public List afterWorkingProcessors() { + List list = new ArrayList<>(); + list.add(disconnectEventHandler); + list.add((NotifyDataSyncHandler) notifyDataSyncHandler); + return list; + } + + @Bean + public AfterWorkingProcessHandler afterWorkingProcessHandler() { + return new AfterWorkingProcessHandler(); + } + + } } \ No newline at end of file diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/bootstrap/DataServerBootstrap.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/bootstrap/DataServerBootstrap.java index 02bec192f..e42293b84 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/bootstrap/DataServerBootstrap.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/bootstrap/DataServerBootstrap.java @@ -23,10 +23,12 @@ import com.alipay.sofa.registry.remoting.ChannelHandler; import com.alipay.sofa.registry.remoting.Server; import com.alipay.sofa.registry.remoting.exchange.Exchange; +import com.alipay.sofa.registry.server.data.cache.CacheDigestTask; import com.alipay.sofa.registry.server.data.datasync.sync.Scheduler; import com.alipay.sofa.registry.server.data.event.EventCenter; import com.alipay.sofa.registry.server.data.event.MetaServerChangeEvent; import com.alipay.sofa.registry.server.data.event.StartTaskEvent; +import com.alipay.sofa.registry.server.data.event.StartTaskTypeEnum; import com.alipay.sofa.registry.server.data.remoting.handler.AbstractServerHandler; import com.alipay.sofa.registry.server.data.remoting.metaserver.IMetaServerService; import org.glassfish.jersey.server.ResourceConfig; @@ -38,10 +40,12 @@ import javax.ws.rs.Path; import javax.ws.rs.ext.Provider; import java.lang.annotation.Annotation; +import java.util.Arrays; import java.util.Collection; import java.util.Date; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; /** * @@ -186,7 +190,13 @@ private void startScheduler() { try { if (schedulerStarted.compareAndSet(false, true)) { syncDataScheduler.startScheduler(); - eventCenter.post(StartTaskEvent.getInstance()); + // start all startTask except renew task + eventCenter.post(new StartTaskEvent( + Arrays.stream(StartTaskTypeEnum.values()).filter(type->type != StartTaskTypeEnum.RENEW).collect( + Collectors.toSet()))); + + //start dump log + new CacheDigestTask().start(); } } catch (Exception e) { schedulerStarted.set(false); diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/bootstrap/DataServerConfig.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/bootstrap/DataServerConfig.java index 58eeb29cd..aa0d2f3aa 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/bootstrap/DataServerConfig.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/bootstrap/DataServerConfig.java @@ -51,9 +51,9 @@ public class DataServerConfig { private int queueSize; - private int notifyIntervalMs; + private int notifyIntervalMs = 500; - private int clientOffDelayMs; + private int clientOffDelayMs = 1000; private int notifyTempDataIntervalMs; diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/cache/DataServerCache.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/cache/DataServerCache.java index 163d633ff..5cc050ba3 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/cache/DataServerCache.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/cache/DataServerCache.java @@ -21,14 +21,15 @@ import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig; +import com.alipay.sofa.registry.server.data.event.handler.AfterWorkingProcessHandler; import com.alipay.sofa.registry.server.data.node.DataNodeStatus; import com.alipay.sofa.registry.server.data.util.LocalServerStatusEnum; +import com.google.common.collect.Sets; import org.springframework.beans.factory.annotation.Autowired; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -53,8 +54,13 @@ public class DataServerCache { @Autowired private DataServerConfig dataServerConfig; + @Autowired + private AfterWorkingProcessHandler afterWorkingProcessHandler; + + /** current dataServer list and version */ private volatile DataServerChangeItem dataServerChangeItem = new DataServerChangeItem(); + /** new input dataServer list and version */ private volatile DataServerChangeItem newDataServerChangeItem = new DataServerChangeItem(); private final AtomicBoolean HAS_NOTIFY_ALL = new AtomicBoolean( @@ -242,13 +248,14 @@ private void updateDataServerStatus() { Map map = nodeStatusMap.get(curVersion.get()); if (map != null) { Set ips = map.keySet(); - if (!ips.containsAll(newDataServerChangeItem.getServerMap() - .get(dataServerConfig.getLocalDataCenter()).keySet())) { - LOGGER.info( - "nodeStatusMap not contains all push list,nodeStatusMap {} push {}", - nodeStatusMap, - newDataServerChangeItem.getServerMap() - .get(dataServerConfig.getLocalDataCenter()).keySet()); + Set itemIps = newDataServerChangeItem.getServerMap() + .get(dataServerConfig.getLocalDataCenter()).keySet(); + if (!ips.containsAll(itemIps)) { + + LOGGER + .info( + "nodeStatusMap not contains all push list,nodeStatusMap {},push {},diff {}", + nodeStatusMap, itemIps, Sets.difference(ips, itemIps)); return; } } else { @@ -265,6 +272,9 @@ private void updateDataServerStatus() { //after working status,must clean this map,because calculate backupTriad need add not working node,see LocalDataServerChangeEventHandler getToBeSyncMap resetStatusMapToWorking(); + + //after working process + afterWorkingProcessHandler.afterWorkingProcess(); } } @@ -351,8 +361,12 @@ public Long getDataCenterNewVersion(String dataCenter) { } } - public BackupTriad calculateOldBackupTriad(String dataInfoId, String dataCenter, - DataServerConfig dataServerBootstrapConfig) { + /** + * calculate ConsistentHash base current data server list + * @param dataCenter + * @return + */ + public ConsistentHash calculateOldConsistentHash(String dataCenter) { Map> dataServerMap = dataServerChangeItem.getServerMap(); Map dataNodeMap = dataServerMap.get(dataCenter); @@ -361,12 +375,9 @@ public BackupTriad calculateOldBackupTriad(String dataInfoId, String dataCenter, Collection dataServerNodes = dataNodeMap.values(); ConsistentHash consistentHash = new ConsistentHash<>( - dataServerBootstrapConfig.getNumberOfReplicas(), dataServerNodes); - - List list = consistentHash.getNUniqueNodesFor(dataInfoId, - dataServerBootstrapConfig.getStoreNodes()); + dataServerConfig.getNumberOfReplicas(), dataServerNodes); - return new BackupTriad(dataInfoId, list); + return consistentHash; } else { LOGGER.warn("Calculate Old BackupTriad,old dataServer list is empty!"); return null; diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/DataChangeHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/DataChangeHandler.java index a45c718ba..6f9e0104f 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/DataChangeHandler.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/DataChangeHandler.java @@ -43,7 +43,10 @@ */ public class DataChangeHandler implements InitializingBean { - private static final Logger LOGGER = LoggerFactory.getLogger(DataChangeHandler.class); + private static final Logger LOGGER = LoggerFactory + .getLogger(DataChangeHandler.class); + + private static final Logger LOGGER_START = LoggerFactory.getLogger("DATA-START-LOGS"); @Autowired private DataServerConfig dataServerBootstrapConfig; @@ -74,7 +77,6 @@ public void start() { for (int idx = 0; idx < queueCount; idx++) { final DataChangeEventQueue dataChangeEventQueue = queues[idx]; final String name = dataChangeEventQueue.getName(); - LOGGER.info("[DataChangeHandler] begin to notify datum in queue:{}", name); executor.execute(() -> { while (true) { try { @@ -85,7 +87,7 @@ public void start() { } } }); - LOGGER.info("[DataChangeHandler] notify datum in queue:{} success", name); + LOGGER_START.info("[DataChangeHandler] notify datum in queue:{} success", name); } } diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/event/DataChangeEventQueue.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/event/DataChangeEventQueue.java index ae996d04f..99cd31bd3 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/event/DataChangeEventQueue.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/event/DataChangeEventQueue.java @@ -16,7 +16,6 @@ */ package com.alipay.sofa.registry.server.data.change.event; -import com.alipay.sofa.registry.common.model.PublishType; import com.alipay.sofa.registry.common.model.dataserver.Datum; import com.alipay.sofa.registry.common.model.store.Publisher; import com.alipay.sofa.registry.log.Logger; @@ -51,6 +50,9 @@ public class DataChangeEventQueue { private static final Logger LOGGER = LoggerFactory .getLogger(DataChangeEventQueue.class); + private static final Logger LOGGER_START = LoggerFactory + .getLogger("DATA-START-LOGS"); + /** * */ @@ -170,7 +172,6 @@ private ChangeData getChangeData(String dataCenter, String dataInfoId, * */ public void start() { - LOGGER.info("[{}] begin start DataChangeEventQueue", getName()); Executor executor = ExecutorFactory.newSingleThreadExecutor( String.format("%s_%s", DataChangeEventQueue.class.getSimpleName(), getName())); executor.execute(() -> { @@ -197,7 +198,7 @@ public void start() { } } }); - LOGGER.info("[{}] start DataChangeEventQueue success", getName()); + LOGGER_START.info("[{}] start DataChangeEventQueue success", getName()); } private void handleHost(ClientChangeEvent event) { @@ -205,6 +206,8 @@ private void handleHost(ClientChangeEvent event) { synchronized (Interners.newWeakInterner().intern(clientHost)) { Map pubMap = DatumCache.getByHost(clientHost); if (pubMap != null && !pubMap.isEmpty()) { + LOGGER.info("[{}] client off begin, host={}, occurTimestamp={},all pub size={}", + getName(), clientHost, event.getOccurredTimestamp(), pubMap.size()); int count = 0; for (Publisher publisher : pubMap.values()) { DataServerNode dataServerNode = DataServerNodeFactory.computeDataServerNode( diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/correction/LocalDataServerCleanHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/correction/LocalDataServerCleanHandler.java index bfc1fc3c8..fe498dbae 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/correction/LocalDataServerCleanHandler.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/correction/LocalDataServerCleanHandler.java @@ -44,8 +44,11 @@ */ public class LocalDataServerCleanHandler { - private static final Logger LOGGER = LoggerFactory - .getLogger(LocalDataServerCleanHandler.class); + private static final Logger LOGGER = LoggerFactory + .getLogger(LocalDataServerCleanHandler.class); + + private static final Logger LOGGER_START = LoggerFactory + .getLogger("DATA-START-LOGS"); @Autowired private DataServerConfig dataServerBootstrapConfig; @@ -61,13 +64,12 @@ public class LocalDataServerCleanHandler { /** * a DelayQueue that contains clean task */ - private final DelayQueue> EVENT_QUEUE = new DelayQueue<>(); + private final DelayQueue> EVENT_QUEUE = new DelayQueue<>(); /** * constructor */ public LocalDataServerCleanHandler() { - LOGGER.info("[LocalDataServerCleanHandler] begin start LocalDataServerCleanHandler"); Executor executor = ExecutorFactory .newSingleThreadExecutor(LocalDataServerCleanHandler.class.getSimpleName()); executor.execute(() -> { @@ -81,7 +83,7 @@ public LocalDataServerCleanHandler() { } } }); - LOGGER.info("[LocalDataServerCleanHandler] start LocalDataServerCleanHandler success"); + LOGGER_START.info("[LocalDataServerCleanHandler] start LocalDataServerCleanHandler success"); } /** diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/datasync/sync/AbstractAcceptorStore.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/datasync/sync/AbstractAcceptorStore.java index c85ff165c..70e644c0e 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/datasync/sync/AbstractAcceptorStore.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/datasync/sync/AbstractAcceptorStore.java @@ -31,6 +31,7 @@ import com.alipay.sofa.registry.server.data.remoting.dataserver.DataServerConnectionFactory; import com.alipay.sofa.registry.server.data.remoting.metaserver.IMetaServerService; import com.alipay.sofa.registry.server.data.util.DelayItem; +import com.alipay.sofa.registry.server.data.util.TimeUtil; import org.springframework.beans.factory.annotation.Autowired; import java.util.List; @@ -191,25 +192,31 @@ private void notifyChange(Acceptor acceptor) { continue; } - Connection connection = dataServerConnectionFactory.getConnection(targetDataIp); - if (connection == null) { - LOGGER.error(getLogByClass(String.format( - "Can not get notify data server connection!ip: %s", targetDataIp))); - continue; - } - LOGGER.info(getLogByClass("Notify data server {} change data {} to sync"), - connection.getRemoteIP(), request); + Server syncServer = boltExchange.getServer(dataServerBootstrapConfig.getSyncDataPort()); + for (int tryCount = 0; tryCount < NOTIFY_RETRY; tryCount++) { try { - Server syncServer = boltExchange.getServer(dataServerBootstrapConfig - .getSyncDataPort()); + + Connection connection = dataServerConnectionFactory.getConnection(targetDataIp); + if (connection == null) { + LOGGER.error(getLogByClass(String.format( + "Can not get notify data server connection!ip: %s,retry=%s", + targetDataIp, tryCount))); + TimeUtil.randomDelay(1000); + continue; + } + LOGGER.info( + getLogByClass("Notify data server {} change data {} to sync,retry={}"), + connection.getRemoteIP(), request, tryCount); + syncServer.sendSync(syncServer.getChannel(connection.getRemoteAddress()), request, 1000); break; } catch (Exception e) { LOGGER.error(getLogByClass(String.format( - "Notify data server %s failed, NotifyDataSyncRequest:%s", targetDataIp, - request)), e); + "Notify data server %s failed, NotifyDataSyncRequest:%s,retry=%s", + targetDataIp, request, tryCount)), e); + TimeUtil.randomDelay(1000); } } } diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/AfterWorkingProcess.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/AfterWorkingProcess.java new file mode 100644 index 000000000..f15aca14d --- /dev/null +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/AfterWorkingProcess.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.data.event; + +/** + * + * @author shangyu.wh + * @version $Id: AfterWorkingProcessor.java, v 0.1 2019-05-21 11:25 shangyu.wh Exp $ + */ +public interface AfterWorkingProcess { + + void afterWorkingProcess(); + + int getOrder(); +} \ No newline at end of file diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/StartTaskEvent.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/StartTaskEvent.java index f49f3c01a..a892a2ab2 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/StartTaskEvent.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/StartTaskEvent.java @@ -16,6 +16,8 @@ */ package com.alipay.sofa.registry.server.data.event; +import java.util.Set; + /** * * @author qian.lqlq @@ -23,17 +25,19 @@ */ public class StartTaskEvent { - private static final StartTaskEvent INSTANCE = new StartTaskEvent(); + private final Set suitableTypes; - private StartTaskEvent() { + public StartTaskEvent(Set suitableTypes) { + this.suitableTypes = suitableTypes; } /** - * get instance of StartTaskEvent - * @return + * Getter method for property suitableTypes. + * + * @return property value of suitableTypes */ - public static StartTaskEvent getInstance() { - return INSTANCE; + public Set getSuitableTypes() { + return suitableTypes; } } \ No newline at end of file diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/StartTaskTypeEnum.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/StartTaskTypeEnum.java new file mode 100644 index 000000000..a015a1f4d --- /dev/null +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/StartTaskTypeEnum.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.data.event; + +/** + * + * @author shangyu.wh + * @version $Id: StartTaskTypeEnum.java, v 0.1 2019-04-22 14:35 shangyu.wh Exp $ + */ +public enum StartTaskTypeEnum { + + /** + * ConnectionRefreshMetaTask + */ + CONNECT_META, + + /** + * ConnectionRefreshDataTask + */ + CONNECT_DATA, + + /** + * ReNewNodeTask + */ + RENEW, + + /** + * VersionCompareTask + */ + VERSION_COMPARE +} \ No newline at end of file diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/AfterWorkingProcessHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/AfterWorkingProcessHandler.java new file mode 100644 index 000000000..3e7f3cfba --- /dev/null +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/AfterWorkingProcessHandler.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.data.event.handler; + +import com.alipay.sofa.registry.server.data.event.AfterWorkingProcess; + +import javax.annotation.Resource; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +/** + * + * @author shangyu.wh + * @version $Id: AfterWorkingProcessHandler.java, v 0.1 2019-05-21 11:46 shangyu.wh Exp $ + */ +public class AfterWorkingProcessHandler implements AfterWorkingProcess { + + @Resource(name = "afterWorkProcessors") + private List afterWorkingProcessors; + + @Override + public void afterWorkingProcess() { + + if(afterWorkingProcessors != null){ + List list = afterWorkingProcessors.stream().sorted(Comparator.comparing(AfterWorkingProcess::getOrder)).collect(Collectors.toList()); + + list.forEach(AfterWorkingProcess::afterWorkingProcess); + } + } + + @Override + public int getOrder() { + return 0; + } +} \ No newline at end of file diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/DataServerChangeEventHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/DataServerChangeEventHandler.java index 931877b03..b2b1ec0a5 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/DataServerChangeEventHandler.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/DataServerChangeEventHandler.java @@ -189,6 +189,8 @@ private void connectDataServer(String dataCenter, String ip) { "[DataServerChangeEventHandler] connect dataserver %s in %s failed five times,dataServer will not work,please check connect!", ip, dataCenter)); } + LOGGER.info("[DataServerChangeEventHandler] connect dataserver {} in {} success", ip, + dataCenter); //maybe get dataNode from metaServer,current has not start! register dataNode info to factory,wait for connect task next execute DataServerNodeFactory.register(new DataServerNode(ip, dataCenter, conn), dataServerBootstrapConfig); diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/LocalDataServerChangeEventHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/LocalDataServerChangeEventHandler.java index 774b7572f..9a79171e6 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/LocalDataServerChangeEventHandler.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/LocalDataServerChangeEventHandler.java @@ -228,6 +228,13 @@ private void notifyToFetch(LocalDataServerChangeEvent event, long changeVersion) Map>> toBeSyncMap = new HashMap<>(); Map> triadCache = new HashMap<>(); + ConsistentHash consistentHashOld = dataServerCache + .calculateOldConsistentHash(dataServerBootstrapConfig.getLocalDataCenter()); + if (consistentHash == null) { + LOGGER.error("Calculate Old ConsistentHash error!"); + throw new RuntimeException("Calculate Old ConsistentHash error!"); + } + //compute new triad for every datum in cache Map> allMap = DatumCache.getAll(); for (Entry> dataCenterEntry : allMap.entrySet()) { @@ -247,8 +254,9 @@ private void notifyToFetch(LocalDataServerChangeEvent event, long changeVersion) dataServerBootstrapConfig.getStoreNodes()); triadCache.put(dataInfoId, backupNodes); } - BackupTriad backupTriad = dataServerCache.calculateOldBackupTriad(dataInfoId, - dataServerBootstrapConfig.getLocalDataCenter(), dataServerBootstrapConfig); + BackupTriad backupTriad = new BackupTriad(dataInfoId, + consistentHashOld.getNUniqueNodesFor(dataInfoId, + dataServerBootstrapConfig.getStoreNodes())); if (backupTriad != null) { List newJoinedNodes = backupTriad.getNewJoined(backupNodes, dataServerCache.getNotWorking()); @@ -346,6 +354,20 @@ private void notifyOnline(long changeVersion) { try { if (dataServerNode.getConnection() == null || !dataServerNode.getConnection().isFine()) { + LOGGER + .warn( + "notify Online dataserver connect {} not existed or not fine!version={}", + ip, changeVersion); + Map dataServerNodeMapCurrent = DataServerNodeFactory + .getDataServerNodes(dataServerBootstrapConfig.getLocalDataCenter()); + DataServerNode dataServerNodeCurrent = dataServerNodeMapCurrent.get(ip); + if (dataServerNodeCurrent == null) { + LOGGER + .warn( + "notify Online dataserver {} has not existed in DataServerNodeFactory!version={}", + ip, changeVersion); + break; + } //maybe get dataNode from metaServer,current has not connected!wait for connect task execute TimeUtil.randomDelay(1000); continue; diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/MetaServerChangeEventHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/MetaServerChangeEventHandler.java index 3518503cb..9b035c98b 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/MetaServerChangeEventHandler.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/MetaServerChangeEventHandler.java @@ -30,6 +30,8 @@ import com.alipay.sofa.registry.server.data.event.DataServerChangeEvent; import com.alipay.sofa.registry.server.data.event.EventCenter; import com.alipay.sofa.registry.server.data.event.MetaServerChangeEvent; +import com.alipay.sofa.registry.server.data.event.StartTaskEvent; +import com.alipay.sofa.registry.server.data.event.StartTaskTypeEnum; import com.alipay.sofa.registry.server.data.remoting.MetaNodeExchanger; import com.alipay.sofa.registry.server.data.remoting.metaserver.IMetaServerService; import com.alipay.sofa.registry.server.data.remoting.metaserver.MetaServerConnectionFactory; @@ -37,6 +39,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.CollectionUtils; +import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -150,6 +153,12 @@ public URL getRequestUrl() { if (obj instanceof NodeChangeResult) { NodeChangeResult result = (NodeChangeResult) obj; Map versionMap = result.getDataCenterListVersions(); + + //send renew after first register dataNode + Set set = new HashSet<>(); + set.add(StartTaskTypeEnum.RENEW); + eventCenter.post(new StartTaskEvent(set)); + eventCenter.post(new DataServerChangeEvent(result.getNodes(), versionMap)); break; } diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/StartTaskEventHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/StartTaskEventHandler.java index c6adfffe1..8da403f3e 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/StartTaskEventHandler.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/StartTaskEventHandler.java @@ -18,7 +18,6 @@ import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; -import com.alipay.sofa.registry.server.data.cache.CacheDigestTask; import com.alipay.sofa.registry.server.data.event.StartTaskEvent; import com.alipay.sofa.registry.server.data.executor.ExecutorFactory; import com.alipay.sofa.registry.server.data.remoting.dataserver.task.AbstractTask; @@ -34,13 +33,15 @@ */ public class StartTaskEventHandler extends AbstractEventHandler { - private static final Logger LOGGER = LoggerFactory - .getLogger(StartTaskEventHandler.class); + private static final Logger LOGGER = LoggerFactory + .getLogger(StartTaskEventHandler.class); + + private static final Logger LOGGER_START = LoggerFactory.getLogger("DATA-START-LOGS"); @Resource(name = "tasks") private List tasks; - private ScheduledExecutorService executor = null; + private ScheduledExecutorService executor = null; @Override public Class interest() { @@ -50,16 +51,21 @@ public Class interest() { @Override public void doHandle(StartTaskEvent event) { if (executor == null || executor.isShutdown()) { - executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass() - .getSimpleName()); - for (AbstractTask task : tasks) { - LOGGER.info("[StartTaskEventHandler] start task:{}", task.getName()); + getExecutor(); + } + + for (AbstractTask task : tasks) { + if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) { executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(), task.getTimeUnit()); - LOGGER.info("[StartTaskEventHandler] start task:{} success", task.getName()); + LOGGER_START.info("[StartTaskEventHandler] start task:{} success", task.getName()); } - new CacheDigestTask().start(); } } + private void getExecutor() { + executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass() + .getSimpleName()); + } + } \ No newline at end of file diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/DataServerConnectionFactory.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/DataServerConnectionFactory.java index 6cb2d8f9b..c7f62a93f 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/DataServerConnectionFactory.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/DataServerConnectionFactory.java @@ -31,7 +31,7 @@ public class DataServerConnectionFactory { /** * collection of connections - * key:ip + * key:connectId ip:port */ private final Map MAP = new ConcurrentHashMap<>(); @@ -41,16 +41,16 @@ public class DataServerConnectionFactory { * @param connection */ public void register(Connection connection) { - MAP.put(connection.getRemoteIP(), connection); + MAP.put(getConnectId(connection), connection); } /** - * remove connection by specific ip + * remove connection by specific ip+port * * @param connection */ public void remove(Connection connection) { - MAP.remove(connection.getRemoteIP()); + MAP.remove(getConnectId(connection)); } /** @@ -60,6 +60,10 @@ public void remove(Connection connection) { * @return */ public Connection getConnection(String ip) { - return MAP.get(ip); + return MAP.values().stream().filter(connection -> ip.equals(connection.getRemoteIP()) && connection.isFine()).findFirst().orElse(null); + } + + private String getConnectId(Connection connection) { + return connection.getRemoteIP() + ":" + connection.getRemotePort(); } } \ No newline at end of file diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/handler/NotifyDataSyncHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/handler/NotifyDataSyncHandler.java index 5938070bb..af94482d1 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/handler/NotifyDataSyncHandler.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/handler/NotifyDataSyncHandler.java @@ -29,17 +29,22 @@ import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig; import com.alipay.sofa.registry.server.data.cache.DatumCache; import com.alipay.sofa.registry.server.data.change.event.DataChangeEventCenter; +import com.alipay.sofa.registry.server.data.event.AfterWorkingProcess; import com.alipay.sofa.registry.server.data.executor.ExecutorFactory; +import com.alipay.sofa.registry.server.data.node.DataNodeStatus; import com.alipay.sofa.registry.server.data.remoting.dataserver.GetSyncDataHandler; import com.alipay.sofa.registry.server.data.remoting.dataserver.SyncDataCallback; import com.alipay.sofa.registry.server.data.remoting.handler.AbstractClientHandler; +import com.alipay.sofa.registry.server.data.util.LocalServerStatusEnum; import com.alipay.sofa.registry.server.data.util.ThreadPoolExecutorDataServer; import com.alipay.sofa.registry.util.NamedThreadFactory; import com.alipay.sofa.registry.util.ParaCheckUtil; import org.springframework.beans.factory.annotation.Autowired; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -48,23 +53,33 @@ * @author qian.lqlq * @version $Id: NotifyDataSyncProcessor.java, v 0.1 2018-03-06 20:04 qian.lqlq Exp $ */ -public class NotifyDataSyncHandler extends AbstractClientHandler { +public class NotifyDataSyncHandler extends AbstractClientHandler implements + AfterWorkingProcess { - private static final Logger LOGGER = LoggerFactory.getLogger(NotifyDataSyncHandler.class); + private static final Logger LOGGER = LoggerFactory + .getLogger(NotifyDataSyncHandler.class); @Autowired - private DataServerConfig dataServerBootstrapConfig; + private DataServerConfig dataServerBootstrapConfig; @Autowired - private GetSyncDataHandler getSyncDataHandler; + private GetSyncDataHandler getSyncDataHandler; @Autowired - private DataChangeEventCenter dataChangeEventCenter; + private DataChangeEventCenter dataChangeEventCenter; - private Executor executor = ExecutorFactory.newFixedThreadPool(10, - NotifyDataSyncHandler.class.getSimpleName()); + private Executor executor = ExecutorFactory + .newFixedThreadPool( + 10, + NotifyDataSyncHandler.class + .getSimpleName()); - private ThreadPoolExecutor notifyExecutor; + private ThreadPoolExecutor notifyExecutor; + + @Autowired + private DataNodeStatus dataNodeStatus; + + private static final BlockingQueue noWorkQueue = new LinkedBlockingQueue<>(); @Override public void checkParam(NotifyDataSyncRequest request) throws RuntimeException { @@ -74,6 +89,16 @@ public void checkParam(NotifyDataSyncRequest request) throws RuntimeException { @Override public Object doHandle(Channel channel, NotifyDataSyncRequest request) { final Connection connection = ((BoltChannel) channel).getConnection(); + if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) { + LOGGER.info("receive notifyDataSync request,but data server not working!"); + noWorkQueue.add(new SyncDataRequestForWorking(connection, request)); + return CommonResponse.buildSuccessResponse(); + } + executorRequest(connection, request); + return CommonResponse.buildSuccessResponse(); + } + + private void executorRequest(Connection connection,NotifyDataSyncRequest request){ executor.execute(() -> { String dataInfoId = request.getDataInfoId(); String dataCenter = request.getDataCenter(); @@ -92,7 +117,25 @@ public Object doHandle(Channel channel, NotifyDataSyncRequest request) { "[NotifyDataSyncHandler] not need to sync data, version={}", version); } }); - return CommonResponse.buildSuccessResponse(); + } + + @Override + public void afterWorkingProcess() { + try { + while (!noWorkQueue.isEmpty()) { + SyncDataRequestForWorking event = noWorkQueue.poll(1, TimeUnit.SECONDS); + if (event != null) { + executorRequest(event.getConnection(), event.getRequest()); + } + } + } catch (InterruptedException e) { + LOGGER.error("receive disconnect event after working interrupted!", e); + } + } + + @Override + public int getOrder() { + return 1; } @Override @@ -128,4 +171,35 @@ public HandlerType getType() { protected Node.NodeType getConnectNodeType() { return Node.NodeType.DATA; } + + private class SyncDataRequestForWorking { + + private final Connection connection; + + private final NotifyDataSyncRequest request; + + public SyncDataRequestForWorking(Connection connection, NotifyDataSyncRequest request) { + this.connection = connection; + this.request = request; + } + + /** + * Getter method for property connection. + * + * @return property value of connection + */ + public Connection getConnection() { + return connection; + } + + /** + * Getter method for property request. + * + * @return property value of request + */ + public NotifyDataSyncRequest getRequest() { + return request; + } + + } } \ No newline at end of file diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/task/AbstractTask.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/task/AbstractTask.java index 64f80fc16..39c7d0488 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/task/AbstractTask.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/task/AbstractTask.java @@ -18,6 +18,7 @@ import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; +import com.alipay.sofa.registry.server.data.event.StartTaskTypeEnum; import java.util.concurrent.TimeUnit; @@ -74,4 +75,9 @@ public void run() { */ public abstract TimeUnit getTimeUnit(); + /** + * get type to match post + * @return + */ + public abstract StartTaskTypeEnum getStartTaskTypeEnum(); } \ No newline at end of file diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/task/ConnectionRefreshTask.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/task/ConnectionRefreshTask.java index 6a19451ca..041bb2bc4 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/task/ConnectionRefreshTask.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/task/ConnectionRefreshTask.java @@ -19,6 +19,7 @@ import com.alipay.sofa.registry.server.data.cache.DataServerChangeItem; import com.alipay.sofa.registry.server.data.event.DataServerChangeEvent; import com.alipay.sofa.registry.server.data.event.EventCenter; +import com.alipay.sofa.registry.server.data.event.StartTaskTypeEnum; import com.alipay.sofa.registry.server.data.remoting.metaserver.IMetaServerService; import org.springframework.beans.factory.annotation.Autowired; @@ -60,4 +61,9 @@ public TimeUnit getTimeUnit() { return TimeUnit.SECONDS; } + @Override + public StartTaskTypeEnum getStartTaskTypeEnum() { + return StartTaskTypeEnum.CONNECT_DATA; + } + } \ No newline at end of file diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/task/ReNewNodeTask.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/task/ReNewNodeTask.java index 6fda74933..8275cbfef 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/task/ReNewNodeTask.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/task/ReNewNodeTask.java @@ -16,6 +16,7 @@ */ package com.alipay.sofa.registry.server.data.remoting.dataserver.task; +import com.alipay.sofa.registry.server.data.event.StartTaskTypeEnum; import com.alipay.sofa.registry.server.data.remoting.metaserver.IMetaServerService; import org.springframework.beans.factory.annotation.Autowired; @@ -50,4 +51,9 @@ public int getInitialDelay() { public TimeUnit getTimeUnit() { return TimeUnit.SECONDS; } + + @Override + public StartTaskTypeEnum getStartTaskTypeEnum() { + return StartTaskTypeEnum.RENEW; + } } \ No newline at end of file diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/handler/AbstractServerHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/handler/AbstractServerHandler.java index 34630ed39..ec7fd1432 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/handler/AbstractServerHandler.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/handler/AbstractServerHandler.java @@ -30,23 +30,28 @@ */ public abstract class AbstractServerHandler implements ChannelHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractServerHandler.class); + private static final Logger LOGGER = LoggerFactory + .getLogger(AbstractServerHandler.class); + + private static final Logger LOGGER_CONNECT = LoggerFactory.getLogger("DATA-CONNECT"); + + private static final Logger LOGGER_EXCHANGE = LoggerFactory.getLogger("DATA-EXCHANGE"); @Override public void connected(Channel channel) throws RemotingException { if (channel != null && channel.isConnected()) { - LOGGER - .info(getConnectNodeType() + " node connected,remote address:" - + channel.getRemoteAddress() + " localAddress:" + channel.getLocalAddress()); + LOGGER_CONNECT.info(getConnectNodeType() + " node connected,remote address:" + + channel.getRemoteAddress() + " localAddress:" + + channel.getLocalAddress()); } } @Override public void disconnected(Channel channel) throws RemotingException { if (channel != null && !channel.isConnected()) { - LOGGER - .info(getConnectNodeType() + " node disconnected,remote address:" - + channel.getRemoteAddress() + " localAddress:" + channel.getLocalAddress()); + LOGGER_CONNECT.info(getConnectNodeType() + " node disconnected,remote address:" + + channel.getRemoteAddress() + " localAddress:" + + channel.getLocalAddress()); } } @@ -112,7 +117,6 @@ public Class interest() { * @param request */ protected void logRequest(Channel channel, T request) { - log(request.toString()); if (channel != null) { log(new StringBuilder("Remote:").append(channel.getRemoteAddress()).append(" Request:") .append(request).toString()); @@ -127,7 +131,7 @@ protected void logRequest(Channel channel, T request) { * @param log */ protected void log(String log) { - LOGGER.info(new StringBuilder("[").append(getClassName()).append("] ").append(log) + LOGGER_EXCHANGE.info(new StringBuilder("[").append(getClassName()).append("] ").append(log) .toString()); } diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/DefaultMetaServiceImpl.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/DefaultMetaServiceImpl.java index dd9ecdc49..ac94deb3e 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/DefaultMetaServiceImpl.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/DefaultMetaServiceImpl.java @@ -39,6 +39,7 @@ import org.springframework.beans.factory.annotation.Autowired; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -81,11 +82,19 @@ public Map> getMetaServerMap() { Connection connection = null; try { if (connectionMap.isEmpty()) { - connection = ((BoltChannel) metaNodeExchanger.connect(new URL( - set.iterator().next(), dataServerBootstrapConfig.getMetaServerPort()))) - .getConnection(); + List list = new ArrayList(set); + Collections.shuffle(list); + connection = ((BoltChannel) metaNodeExchanger.connect(new URL(list.iterator() + .next(), dataServerBootstrapConfig.getMetaServerPort()))).getConnection(); } else { - connection = connectionMap.values().iterator().next(); + List connections = new ArrayList<>(connectionMap.values()); + Collections.shuffle(connections); + connection = connections.iterator().next(); + if (!connection.isFine()) { + connection = ((BoltChannel) metaNodeExchanger.connect(new URL(connection + .getRemoteIP(), dataServerBootstrapConfig.getMetaServerPort()))) + .getConnection(); + } } GetNodesRequest request = new GetNodesRequest(NodeType.META); @@ -127,6 +136,8 @@ public URL getRequestUrl() { .warn( "[DefaultMetaServiceImpl] refresh connections from metaServer error,refresh leader : {}", con); + throw new RuntimeException( + "[DefaultMetaServiceImpl] refresh connections from metaServer error!", e); } return map; } diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/task/ConnectionRefreshMetaTask.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/task/ConnectionRefreshMetaTask.java index a36967ba0..50946a93a 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/task/ConnectionRefreshMetaTask.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/task/ConnectionRefreshMetaTask.java @@ -18,6 +18,7 @@ import com.alipay.sofa.registry.server.data.event.EventCenter; import com.alipay.sofa.registry.server.data.event.MetaServerChangeEvent; +import com.alipay.sofa.registry.server.data.event.StartTaskTypeEnum; import com.alipay.sofa.registry.server.data.remoting.dataserver.task.AbstractTask; import com.alipay.sofa.registry.server.data.remoting.metaserver.IMetaServerService; import org.springframework.beans.factory.annotation.Autowired; @@ -56,4 +57,9 @@ public int getInitialDelay() { public TimeUnit getTimeUnit() { return TimeUnit.SECONDS; } + + @Override + public StartTaskTypeEnum getStartTaskTypeEnum() { + return StartTaskTypeEnum.CONNECT_META; + } } \ No newline at end of file diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/disconnect/DisconnectEventHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/disconnect/DisconnectEventHandler.java index f262b420f..fa870f910 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/disconnect/DisconnectEventHandler.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/disconnect/DisconnectEventHandler.java @@ -21,37 +21,53 @@ import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig; import com.alipay.sofa.registry.server.data.change.event.ClientChangeEvent; import com.alipay.sofa.registry.server.data.change.event.DataChangeEventCenter; +import com.alipay.sofa.registry.server.data.event.AfterWorkingProcess; import com.alipay.sofa.registry.server.data.executor.ExecutorFactory; +import com.alipay.sofa.registry.server.data.node.DataNodeStatus; import com.alipay.sofa.registry.server.data.remoting.sessionserver.SessionServerConnectionFactory; +import com.alipay.sofa.registry.server.data.util.LocalServerStatusEnum; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; /** * @author qian.lqlq * @version $Id: ClientDisconnectEventHandler.java, v 0.1 2017-12-07 15:32 qian.lqlq Exp $ */ -public class DisconnectEventHandler implements InitializingBean { +public class DisconnectEventHandler implements InitializingBean, AfterWorkingProcess { - private static final Logger LOGGER = LoggerFactory - .getLogger(DisconnectEventHandler.class); + private static final Logger LOGGER = LoggerFactory + .getLogger(DisconnectEventHandler.class); + + private static final Logger LOGGER_START = LoggerFactory + .getLogger("DATA-START-LOGS"); /** * a DelayQueue that contains client disconnect events */ - private final DelayQueue EVENT_QUEUE = new DelayQueue<>(); + private final DelayQueue EVENT_QUEUE = new DelayQueue<>(); + + @Autowired + private SessionServerConnectionFactory sessionServerConnectionFactory; @Autowired - private SessionServerConnectionFactory sessionServerConnectionFactory; + private DataChangeEventCenter dataChangeEventCenter; @Autowired - private DataChangeEventCenter dataChangeEventCenter; + private DataServerConfig dataServerConfig; @Autowired - private DataServerConfig dataServerConfig; + private DataNodeStatus dataNodeStatus; + + private static final int BLOCK_FOR_ALL_SYNC = 5000; + + private static final BlockingQueue noWorkQueue = new LinkedBlockingQueue<>(); /** * receive disconnect event of client @@ -64,13 +80,42 @@ public void receive(DisconnectEvent event) { LOGGER.info("receive session off event: sessionServerHost={}, processId={}", sessionServerDisconnectEvent.getSessionServerHost(), sessionServerDisconnectEvent.getProcessId()); + } else if (event.getType() == DisconnectTypeEnum.CLIENT) { + ClientDisconnectEvent clientDisconnectEvent = (ClientDisconnectEvent) event; + LOGGER.info("receive client off event: clientHost={}", clientDisconnectEvent.getHost()); + } + + if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) { + LOGGER.info("receive disconnect event,but data server not working!"); + noWorkQueue.add(event); + return; } EVENT_QUEUE.add(event); } + public void afterWorkingProcess() { + try { + //sleep for sync all done + TimeUnit.MILLISECONDS.sleep(BLOCK_FOR_ALL_SYNC); + + while (!noWorkQueue.isEmpty()) { + DisconnectEvent event = noWorkQueue.poll(1, TimeUnit.SECONDS); + if (event != null) { + receive(event); + } + } + } catch (InterruptedException e) { + LOGGER.error("receive disconnect event after working interrupted!", e); + } + } + + @Override + public int getOrder() { + return 0; + } + @Override public void afterPropertiesSet() { - LOGGER.info("begin start DisconnectEventHandler"); Executor executor = ExecutorFactory .newSingleThreadExecutor(DisconnectEventHandler.class.getSimpleName()); executor.execute(() -> { @@ -109,7 +154,7 @@ public void afterPropertiesSet() { } } }); - LOGGER.info("start DisconnectEventHandler success"); + LOGGER_START.info("start DisconnectEventHandler success"); } /** diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/ClientOffHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/ClientOffHandler.java index 2ebd53d7f..61724135b 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/ClientOffHandler.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/ClientOffHandler.java @@ -53,7 +53,7 @@ public Object doHandle(Channel channel, ClientOffRequest request) { List hosts = request.getHosts(); for (String host : hosts) { disconnectEventHandler.receive(new ClientDisconnectEvent(host, request.getGmtOccur(), - dataServerBootstrapConfig.getClientOffDelayMs() * 10)); + dataServerBootstrapConfig.getClientOffDelayMs())); } return CommonResponse.buildSuccessResponse(); } diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/DataServerConnectionHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/DataServerConnectionHandler.java index 6f41e1f95..2340fa18d 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/DataServerConnectionHandler.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/DataServerConnectionHandler.java @@ -17,6 +17,7 @@ package com.alipay.sofa.registry.server.data.remoting.sessionserver.handler; import com.alipay.sofa.registry.common.model.Node; +import com.alipay.sofa.registry.common.model.Node.NodeType; import com.alipay.sofa.registry.net.NetUtil; import com.alipay.sofa.registry.remoting.Channel; import com.alipay.sofa.registry.remoting.ChannelHandler; @@ -26,6 +27,10 @@ import org.springframework.beans.factory.annotation.Autowired; /** + * + * Connection handler for session connect data server + * + * fix shangyu.wh * @author xuanbei * @since 2019/2/15 */ @@ -52,7 +57,7 @@ public void disconnected(Channel channel) throws RemotingException { @Override protected Node.NodeType getConnectNodeType() { - return Node.NodeType.DATA; + return NodeType.SESSION; } @Override diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/resource/DataDigestResource.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/resource/DataDigestResource.java index 786bf7af0..d61ea6da3 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/resource/DataDigestResource.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/resource/DataDigestResource.java @@ -16,21 +16,34 @@ */ package com.alipay.sofa.registry.server.data.resource; +import com.alipay.remoting.Connection; import com.alipay.sofa.registry.common.model.dataserver.Datum; import com.alipay.sofa.registry.common.model.store.DataInfo; import com.alipay.sofa.registry.common.model.store.Publisher; import com.alipay.sofa.registry.net.NetUtil; +import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig; import com.alipay.sofa.registry.server.data.cache.DatumCache; +import com.alipay.sofa.registry.server.data.node.DataServerNode; +import com.alipay.sofa.registry.server.data.remoting.dataserver.DataServerNodeFactory; +import com.alipay.sofa.registry.server.data.remoting.metaserver.MetaServerConnectionFactory; +import com.alipay.sofa.registry.server.data.remoting.sessionserver.SessionServerConnectionFactory; +import org.springframework.beans.factory.annotation.Autowired; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; +import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; +import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; +import java.util.stream.Collectors; /** * @@ -40,6 +53,21 @@ @Path("digest") public class DataDigestResource { + private final static String SESSION = "SESSION"; + + private final static String DATA = "DATA"; + + private final static String META = "META"; + + @Autowired + private SessionServerConnectionFactory sessionServerConnectionFactory; + + @Autowired + private MetaServerConnectionFactory metaServerConnectionFactory; + + @Autowired + private DataServerConfig dataServerConfig; + @GET @Path("datum/query") @Produces(MediaType.APPLICATION_JSON) @@ -111,6 +139,90 @@ public String getDatumCount() { return sb.toString(); } + @GET + @Path("{type}/serverList/query") + @Produces(MediaType.APPLICATION_JSON) + public Map> getServerListAll(@PathParam("type") String type) { + + Map> map = new HashMap<>(); + if (type != null && !type.isEmpty()) { + String inputType = type.toUpperCase(); + + switch (inputType) { + case SESSION: + List sessionList = getSessionServerList(); + if (sessionList != null) { + map = new HashMap<>(); + map.put(dataServerConfig.getLocalDataCenter(), sessionList); + } + break; + case DATA: + map = getDataServerList(); + break; + case META: + map = getMetaServerList(); + break; + default: + map = new HashMap<>(); + break; + } + + } + return map; + } + + public List getSessionServerList() { + List connections = sessionServerConnectionFactory.getConnections().stream().filter(connection -> connection != null && connection.isFine()) + .map(connection -> connection.getRemoteIP() + ":" + connection.getRemotePort()) + .collect(Collectors.toList()); + return connections; + } + + public Map> getDataServerList() { + + Map> map = new HashMap<>(); + Set allDataCenter = new HashSet<>(DataServerNodeFactory.getAllDataCenters()); + for (String dataCenter:allDataCenter) { + + List list = map.computeIfAbsent(dataCenter,k->new ArrayList<>()); + + Map dataNodes = DataServerNodeFactory.getDataServerNodes(dataCenter); + if(dataNodes != null && !dataNodes.isEmpty()){ + + dataNodes.forEach((ip,dataServerNode)->{ + if (ip != null && !ip.equals(DataServerConfig.IP)) { + Connection connection = dataServerNode.getConnection(); + if (connection != null && connection.isFine()) { + list.add(connection.getRemoteIP()); + } + } + }); + } + } + return map; + } + + public Map> getMetaServerList() { + + Map> map = new HashMap<>(); + Set allDataCenter = new HashSet<>(metaServerConnectionFactory.getAllDataCenters()); + for (String dataCenter:allDataCenter) { + + List list = map.computeIfAbsent(dataCenter,k->new ArrayList<>()); + + Map metaConnections = metaServerConnectionFactory.getConnections(dataCenter); + if(metaConnections != null && !metaConnections.isEmpty()){ + + metaConnections.forEach((ip,connection)->{ + if (connection != null && connection.isFine()) { + list.add(connection.getRemoteIP()); + } + }); + } + } + return map; + } + private boolean isBlank(String dataInfoId) { return dataInfoId == null || dataInfoId.isEmpty(); } diff --git a/server/server/data/src/main/resources/application.properties b/server/server/data/src/main/resources/application.properties index 6a28ef486..d172a1405 100644 --- a/server/server/data/src/main/resources/application.properties +++ b/server/server/data/src/main/resources/application.properties @@ -10,6 +10,7 @@ data.server.httpServerPort=9622 data.server.queueCount=4 data.server.queueSize=10240 data.server.notifyIntervalMs=500 +data.server.clientOffDelayMs=1000 data.server.notifyTempDataIntervalMs=10 data.server.rpcTimeout=3000 data.server.metaServerPort=9611 diff --git a/server/server/data/src/main/resources/logback-spring.xml b/server/server/data/src/main/resources/logback-spring.xml index eab8360d6..f64cde75d 100644 --- a/server/server/data/src/main/resources/logback-spring.xml +++ b/server/server/data/src/main/resources/logback-spring.xml @@ -173,6 +173,46 @@ + + true + + ${LOG_LEVEL} + + + ERROR + DENY + + ${DATA_LOG_HOME}/registry-connect.log + + ${DATA_LOG_HOME}/registry-connect.log.%d{yyyy-MM-dd} + 30 + + + [%d{ISO8601}][%p][%t][%c{0}] - %m%n + ${LOG_ENCODE} + + + + + true + + ${LOG_LEVEL} + + + ERROR + DENY + + ${DATA_LOG_HOME}/registry-exchange.log + + ${DATA_LOG_HOME}/registry-exchange.log.%d{yyyy-MM-dd} + 30 + + + [%d{ISO8601}][%p][%t][%c{0}] - %m%n + ${LOG_ENCODE} + + + @@ -198,7 +238,7 @@ - @@ -214,6 +254,33 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/server/server/data/src/test/java/com/alipay/sofa/registry/server/data/BackupTriadTest.java b/server/server/data/src/test/java/com/alipay/sofa/registry/server/data/BackupTriadTest.java index ba4bba93a..3f5e7985d 100644 --- a/server/server/data/src/test/java/com/alipay/sofa/registry/server/data/BackupTriadTest.java +++ b/server/server/data/src/test/java/com/alipay/sofa/registry/server/data/BackupTriadTest.java @@ -51,12 +51,10 @@ public void doTest() { Set notWorking = new HashSet<>(); notWorking.add("192.168.0.2"); assertEquals(2, backupTriad.getNewJoined(newTriad, notWorking).size()); - assertEquals( - "DataNode{ip=192.168.0.2, dataCenter='DefaultDataCenter', regionId='null', nodeStatus=INIT, registrationTimestamp=0}", - backupTriad.getNewJoined(newTriad, notWorking).get(0).toString()); - assertEquals( - "DataNode{ip=192.168.0.4, dataCenter='DefaultDataCenter', regionId='null', nodeStatus=INIT, registrationTimestamp=0}", - backupTriad.getNewJoined(newTriad, notWorking).get(1).toString()); + assertEquals("DataNode{ip=192.168.0.2}", backupTriad.getNewJoined(newTriad, notWorking) + .get(0).toString()); + assertEquals("DataNode{ip=192.168.0.4}", backupTriad.getNewJoined(newTriad, notWorking) + .get(1).toString()); assertEquals("TestDataInfoId", backupTriad.getDataInfoId()); backupTriad.setDataInfoId("AnotherTestDataInfoId"); @@ -68,14 +66,9 @@ public void doTest() { backupTriad.setTriad(nodeList); assertTrue(backupTriad.containsSelf()); assertEquals(2, backupTriad.getTriad().size()); - assertEquals( - "DataNode{ip=192.168.0.1, dataCenter='DefaultDataCenter', regionId='null', nodeStatus=INIT, registrationTimestamp=0}", - backupTriad.getTriad().get(0).toString()); - assertEquals( - "DataNode{ip=" - + DataServerConfig.IP - + ", dataCenter='DefaultDataCenter', regionId='null', nodeStatus=INIT, registrationTimestamp=0}", - backupTriad.getTriad().get(1).toString()); + assertEquals("DataNode{ip=192.168.0.1}", backupTriad.getTriad().get(0).toString()); + assertEquals("DataNode{ip=" + DataServerConfig.IP + "}", backupTriad.getTriad().get(1) + .toString()); assertTrue(backupTriad.toString().contains( "BackupTriad{dataInfoId='AnotherTestDataInfoId', ipSetOfNode=") && backupTriad.toString().contains("192.168.0.1") diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/AbstractNodeConfigBean.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/AbstractNodeConfigBean.java index c5b8e78a5..430e29b45 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/AbstractNodeConfigBean.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/AbstractNodeConfigBean.java @@ -105,9 +105,7 @@ public String getMetaDataCenter(String metaIpAddress) { } }); - if (ret.get() == null) { - LOGGER.error("node ipAddress:" + metaIpAddress + " cannot be found on config list!"); - } + dataCenterRet = ret.get(); } return dataCenterRet; diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/RaftExchanger.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/RaftExchanger.java index 6a35b4b7a..981c1a19e 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/RaftExchanger.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/RaftExchanger.java @@ -58,6 +58,8 @@ public class RaftExchanger { private static final Logger METRICS_LOGGER = LoggerFactory.getLogger("META-JRAFT-METRICS"); + private static final Logger LOGGER_START = LoggerFactory.getLogger("META-START-LOGS"); + @Autowired private MetaServerConfig metaServerConfig; @@ -93,9 +95,9 @@ public void startRaftServer(final ExecutorManager executorManager) { raftServer.setLeaderProcessListener(new LeaderProcessListener() { @Override public void startProcess() { - LOGGER.info("Start leader process..."); + LOGGER_START.info("Start leader process..."); executorManager.startScheduler(); - LOGGER.info("Initialize server scheduler success!"); + LOGGER_START.info("Initialize server scheduler success!"); PeerId leader = new PeerId(NetUtil.getLocalAddress().getHostAddress(), metaServerConfig.getRaftServerPort()); raftServer.sendNotify(leader, "leader"); @@ -104,9 +106,9 @@ public void startProcess() { @Override public void stopProcess() { - LOGGER.info("Stop leader process..."); + LOGGER_START.info("Stop leader process..."); executorManager.stopScheduler(); - LOGGER.info("Stop server scheduler success!"); + LOGGER_START.info("Stop server scheduler success!"); PeerId leader = new PeerId(NetUtil.getLocalAddress().getHostAddress(), metaServerConfig.getRaftServerPort()); raftServer.sendNotify(leader, "leader"); @@ -116,14 +118,14 @@ public void stopProcess() { raftServer.setFollowerProcessListener(new FollowerProcessListener() { @Override public void startProcess(PeerId leader) { - LOGGER.info("Start follower process leader {}...", leader); + LOGGER_START.info("Start follower process leader {}...", leader); raftServer.sendNotify(leader, "follower"); registerCurrentNode(); } @Override public void stopProcess(PeerId leader) { - LOGGER.info("Stop follower process leader {}...", leader); + LOGGER_START.info("Stop follower process leader {}...", leader); raftServer.sendNotify(leader, "follower"); } }); @@ -136,7 +138,7 @@ public void stopProcess(PeerId leader) { } } catch (Exception e) { serverStart.set(false); - LOGGER.error("Start raft server error!", e); + LOGGER_START.error("Start raft server error!", e); throw new RuntimeException("Start raft server error!", e); } } @@ -160,7 +162,7 @@ public void startRaftClient() { } } catch (Exception e) { clientStart.set(false); - LOGGER.error("Start raft client error!", e); + LOGGER_START.error("Start raft client error!", e); throw new RuntimeException("Start raft client error!", e); } } @@ -174,7 +176,7 @@ public void startCliService() { cliService = new CliServiceImpl(); cliService.init(new CliOptions()); } catch (Exception e) { - LOGGER.error("Start raft cliService error!", e); + LOGGER_START.error("Start raft cliService error!", e); throw new RuntimeException("Start raft cliService error!", e); } } @@ -190,7 +192,7 @@ private void registerCurrentNode() { metaServerRegistry.register(new MetaNode(new URL(ip, 0), nodeConfig .getLocalDataCenter())); } else { - LOGGER.error( + LOGGER_START.error( "Register CurrentNode fail!meta node list config not contains current ip {}", ip); throw new RuntimeException( diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/repository/service/MetaRepositoryService.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/repository/service/MetaRepositoryService.java index 06bb4a26d..927247aa9 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/repository/service/MetaRepositoryService.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/repository/service/MetaRepositoryService.java @@ -347,4 +347,13 @@ public Set getSnapshotFileNames() { snapShotFileNames.add(this.getClass().getSimpleName()); return snapShotFileNames; } + + /** + * Setter method for property nodeConfig. + * + * @param nodeConfig value to be assigned to property nodeConfig + */ + public void setNodeConfig(NodeConfig nodeConfig) { + this.nodeConfig = nodeConfig; + } } \ No newline at end of file diff --git a/server/server/meta/src/main/resources/logback-spring.xml b/server/server/meta/src/main/resources/logback-spring.xml index 9251cda98..6294f4dcf 100644 --- a/server/server/meta/src/main/resources/logback-spring.xml +++ b/server/server/meta/src/main/resources/logback-spring.xml @@ -213,6 +213,24 @@ + + + + + + + + + + + + + + + + + + diff --git a/server/server/meta/src/test/java/com/alipay/sofa/registry/server/meta/test/service/DataRepositoryServiceTest.java b/server/server/meta/src/test/java/com/alipay/sofa/registry/server/meta/test/service/DataRepositoryServiceTest.java new file mode 100644 index 000000000..d8a2fd1ed --- /dev/null +++ b/server/server/meta/src/test/java/com/alipay/sofa/registry/server/meta/test/service/DataRepositoryServiceTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.meta.test.service; + +import com.alipay.sofa.registry.common.model.Node.NodeStatus; +import com.alipay.sofa.registry.common.model.metaserver.DataNode; +import com.alipay.sofa.registry.common.model.store.URL; +import com.alipay.sofa.registry.server.meta.bootstrap.NodeConfig; +import com.alipay.sofa.registry.server.meta.repository.NodeRepository; +import com.alipay.sofa.registry.server.meta.repository.service.DataRepositoryService; +import com.alipay.sofa.registry.server.meta.store.RenewDecorate; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * + * @author shangyu.wh + * @version 1.0: MetaRepositoryServiceTest.java, v 0.1 2019-08-01 12:18 shangyu.wh Exp $ + */ +public class DataRepositoryServiceTest { + + @Test + public void testDataRepositoryServicePut() { + Map registry = new ConcurrentHashMap<>(); + DataRepositoryService metaRepositoryService = new DataRepositoryService(registry); + + String ip = "192.1.1.1"; + String dataCenter = "zue"; + + metaRepositoryService.setNodeConfig(new NodeConfig() { + @Override + public Map> getMetaNode() { + return null; + } + + @Override + public Map> getMetaNodeIP() { + return null; + } + + @Override + public String getLocalDataCenter() { + return dataCenter; + } + + @Override + public String getMetaDataCenter(String metaIpAddress) { + return dataCenter; + } + + @Override + public Set getDataCenterMetaServers(String dataCenter) { + return null; + } + }); + + RenewDecorate metaNode = new RenewDecorate(new DataNode(new URL(ip, 0), + dataCenter)); + metaRepositoryService.put(ip, metaNode); + + Assert.assertEquals(metaRepositoryService.get(ip).getRenewal().getIp(), ip); + } + + @Test + public void testDataRepositoryServiceRemove() { + Map registry = new ConcurrentHashMap<>(); + DataRepositoryService metaRepositoryService = new DataRepositoryService(registry); + + String ip = "192.1.1.1"; + String dataCenter = "zue"; + + metaRepositoryService.setNodeConfig(new NodeConfig() { + @Override + public Map> getMetaNode() { + return null; + } + + @Override + public Map> getMetaNodeIP() { + return null; + } + + @Override + public String getLocalDataCenter() { + return dataCenter; + } + + @Override + public String getMetaDataCenter(String metaIpAddress) { + return dataCenter; + } + + @Override + public Set getDataCenterMetaServers(String dataCenter) { + return null; + } + }); + + RenewDecorate metaNode = new RenewDecorate(new DataNode(new URL(ip, 0), + dataCenter)); + + metaRepositoryService.put(ip, metaNode); + + metaRepositoryService.remove(ip); + + Assert.assertEquals(metaRepositoryService.getAllData().size(), 0); + } + + @Test + public void testMetaRepositoryServiceReplace() { + Map registry = new ConcurrentHashMap<>(); + DataRepositoryService metaRepositoryService = new DataRepositoryService(registry); + + String ip = "192.1.1.1"; + String dataCenter = "zue"; + + metaRepositoryService.setNodeConfig(new NodeConfig() { + @Override + public Map> getMetaNode() { + return null; + } + + @Override + public Map> getMetaNodeIP() { + return null; + } + + @Override + public String getLocalDataCenter() { + return dataCenter; + } + + @Override + public String getMetaDataCenter(String metaIpAddress) { + return dataCenter; + } + + @Override + public Set getDataCenterMetaServers(String dataCenter) { + return null; + } + }); + + RenewDecorate metaNode = new RenewDecorate(new DataNode(new URL(ip, 0), + dataCenter)); + + DataNode metaNodeFix = new DataNode(new URL(ip, 0), dataCenter); + metaNodeFix.setNodeStatus(NodeStatus.WORKING); + + RenewDecorate metaNode2 = new RenewDecorate(metaNodeFix); + + metaRepositoryService.put(ip, metaNode); + + Assert.assertEquals(metaRepositoryService.get(ip).getRenewal().getNodeStatus(), + NodeStatus.INIT); + + metaRepositoryService.replace(ip, metaNode2); + + Assert.assertEquals(metaRepositoryService.get(ip).getRenewal().getNodeStatus(), + NodeStatus.WORKING); + Assert.assertEquals(metaRepositoryService.getAllData().size(), 1); + } + + @Test + public void testMetaRepositoryServiceReplaceAll() { + Map registry = new ConcurrentHashMap<>(); + DataRepositoryService metaRepositoryService = new DataRepositoryService(registry); + + String ip = "192.1.1.1"; + String dataCenter = "zue"; + + metaRepositoryService.setNodeConfig(new NodeConfig() { + @Override + public Map> getMetaNode() { + return null; + } + + @Override + public Map> getMetaNodeIP() { + return null; + } + + @Override + public String getLocalDataCenter() { + return dataCenter; + } + + @Override + public String getMetaDataCenter(String metaIpAddress) { + return dataCenter; + } + + @Override + public Set getDataCenterMetaServers(String dataCenter) { + return null; + } + }); + + RenewDecorate metaNode = new RenewDecorate(new DataNode(new URL(ip, 0), + dataCenter)); + + DataNode metaNodeFix = new DataNode(new URL(ip, 0), dataCenter); + metaNodeFix.setNodeStatus(NodeStatus.WORKING); + + RenewDecorate metaNode2 = new RenewDecorate(metaNodeFix); + + metaRepositoryService.put(ip, metaNode); + + Assert.assertEquals(metaRepositoryService.get(ip).getRenewal().getNodeStatus(), + NodeStatus.INIT); + + Map> map = new HashMap<>(); + map.put(ip, metaNode2); + + metaRepositoryService.replaceAll(dataCenter, map, 1234l); + + Assert.assertEquals(metaRepositoryService.get(ip).getRenewal().getNodeStatus(), + NodeStatus.WORKING); + Assert.assertEquals(metaRepositoryService.getAllDataMap().get(dataCenter).size(), 1); + } +} \ No newline at end of file diff --git a/server/server/meta/src/test/java/com/alipay/sofa/registry/server/meta/test/service/MetaRepositoryServiceTest.java b/server/server/meta/src/test/java/com/alipay/sofa/registry/server/meta/test/service/MetaRepositoryServiceTest.java new file mode 100644 index 000000000..479012abd --- /dev/null +++ b/server/server/meta/src/test/java/com/alipay/sofa/registry/server/meta/test/service/MetaRepositoryServiceTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.meta.test.service; + +import com.alipay.sofa.registry.common.model.Node.NodeStatus; +import com.alipay.sofa.registry.common.model.metaserver.MetaNode; +import com.alipay.sofa.registry.common.model.store.URL; +import com.alipay.sofa.registry.server.meta.bootstrap.NodeConfig; +import com.alipay.sofa.registry.server.meta.repository.NodeRepository; +import com.alipay.sofa.registry.server.meta.repository.service.MetaRepositoryService; +import com.alipay.sofa.registry.server.meta.store.RenewDecorate; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * + * @author shangyu.wh + * @version 1.0: MetaRepositoryServiceTest.java, v 0.1 2019-08-01 12:18 shangyu.wh Exp $ + */ +public class MetaRepositoryServiceTest { + + @Test + public void testMetaRepositoryServicePut() { + Map registry = new ConcurrentHashMap<>(); + MetaRepositoryService metaRepositoryService = new MetaRepositoryService(registry); + + String ip = "192.1.1.1"; + String dataCenter = "zue"; + + metaRepositoryService.setNodeConfig(new NodeConfig() { + @Override + public Map> getMetaNode() { + return null; + } + + @Override + public Map> getMetaNodeIP() { + return null; + } + + @Override + public String getLocalDataCenter() { + return dataCenter; + } + + @Override + public String getMetaDataCenter(String metaIpAddress) { + return dataCenter; + } + + @Override + public Set getDataCenterMetaServers(String dataCenter) { + return null; + } + }); + + RenewDecorate metaNode = new RenewDecorate(new MetaNode(new URL(ip, 0), + dataCenter)); + metaRepositoryService.put(ip, metaNode); + + Assert.assertEquals(metaRepositoryService.get(ip).getRenewal().getIp(), ip); + } + + @Test + public void testMetaRepositoryServiceRemove() { + Map registry = new ConcurrentHashMap<>(); + MetaRepositoryService metaRepositoryService = new MetaRepositoryService(registry); + + String ip = "192.1.1.1"; + String dataCenter = "zue"; + + metaRepositoryService.setNodeConfig(new NodeConfig() { + @Override + public Map> getMetaNode() { + return null; + } + + @Override + public Map> getMetaNodeIP() { + return null; + } + + @Override + public String getLocalDataCenter() { + return dataCenter; + } + + @Override + public String getMetaDataCenter(String metaIpAddress) { + return dataCenter; + } + + @Override + public Set getDataCenterMetaServers(String dataCenter) { + return null; + } + }); + + RenewDecorate metaNode = new RenewDecorate(new MetaNode(new URL(ip, 0), + dataCenter)); + + metaRepositoryService.put(ip, metaNode); + + metaRepositoryService.remove(ip); + + Assert.assertEquals(metaRepositoryService.getAllData().size(), 0); + } + + @Test + public void testMetaRepositoryServiceReplace() { + Map registry = new ConcurrentHashMap<>(); + MetaRepositoryService metaRepositoryService = new MetaRepositoryService(registry); + + String ip = "192.1.1.1"; + String dataCenter = "zue"; + + metaRepositoryService.setNodeConfig(new NodeConfig() { + @Override + public Map> getMetaNode() { + return null; + } + + @Override + public Map> getMetaNodeIP() { + return null; + } + + @Override + public String getLocalDataCenter() { + return dataCenter; + } + + @Override + public String getMetaDataCenter(String metaIpAddress) { + return dataCenter; + } + + @Override + public Set getDataCenterMetaServers(String dataCenter) { + return null; + } + }); + + RenewDecorate metaNode = new RenewDecorate(new MetaNode(new URL(ip, 0), + dataCenter)); + + MetaNode metaNodeFix = new MetaNode(new URL(ip, 0), dataCenter); + metaNodeFix.setNodeStatus(NodeStatus.WORKING); + + RenewDecorate metaNode2 = new RenewDecorate(metaNodeFix); + + metaRepositoryService.put(ip, metaNode); + + Assert.assertEquals(metaRepositoryService.get(ip).getRenewal().getNodeStatus(), + NodeStatus.INIT); + + metaRepositoryService.replace(ip, metaNode2); + + Assert.assertEquals(metaRepositoryService.get(ip).getRenewal().getNodeStatus(), + NodeStatus.WORKING); + Assert.assertEquals(metaRepositoryService.getAllData().size(), 1); + } + + @Test + public void testMetaRepositoryServiceReplaceAll() { + Map registry = new ConcurrentHashMap<>(); + MetaRepositoryService metaRepositoryService = new MetaRepositoryService(registry); + + String ip = "192.1.1.1"; + String dataCenter = "zue"; + + metaRepositoryService.setNodeConfig(new NodeConfig() { + @Override + public Map> getMetaNode() { + return null; + } + + @Override + public Map> getMetaNodeIP() { + return null; + } + + @Override + public String getLocalDataCenter() { + return dataCenter; + } + + @Override + public String getMetaDataCenter(String metaIpAddress) { + return dataCenter; + } + + @Override + public Set getDataCenterMetaServers(String dataCenter) { + return null; + } + }); + + RenewDecorate metaNode = new RenewDecorate(new MetaNode(new URL(ip, 0), + dataCenter)); + + MetaNode metaNodeFix = new MetaNode(new URL(ip, 0), dataCenter); + metaNodeFix.setNodeStatus(NodeStatus.WORKING); + + RenewDecorate metaNode2 = new RenewDecorate(metaNodeFix); + + metaRepositoryService.put(ip, metaNode); + + Assert.assertEquals(metaRepositoryService.get(ip).getRenewal().getNodeStatus(), + NodeStatus.INIT); + + Map> map = new HashMap<>(); + map.put(ip, metaNode2); + + metaRepositoryService.replaceAll(dataCenter, map, 1234l); + + Assert.assertEquals(metaRepositoryService.get(ip).getRenewal().getNodeStatus(), + NodeStatus.WORKING); + Assert.assertEquals(metaRepositoryService.getAllDataMap().get(dataCenter).size(), 1); + } +} \ No newline at end of file diff --git a/server/server/meta/src/test/java/com/alipay/sofa/registry/server/meta/test/service/SessionRepositoryServiceTest.java b/server/server/meta/src/test/java/com/alipay/sofa/registry/server/meta/test/service/SessionRepositoryServiceTest.java new file mode 100644 index 000000000..da0997188 --- /dev/null +++ b/server/server/meta/src/test/java/com/alipay/sofa/registry/server/meta/test/service/SessionRepositoryServiceTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.meta.test.service; + +import com.alipay.sofa.registry.common.model.Node.NodeStatus; +import com.alipay.sofa.registry.common.model.metaserver.SessionNode; +import com.alipay.sofa.registry.common.model.store.URL; +import com.alipay.sofa.registry.server.meta.bootstrap.NodeConfig; +import com.alipay.sofa.registry.server.meta.repository.service.SessionRepositoryService; +import com.alipay.sofa.registry.server.meta.store.RenewDecorate; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * + * @author shangyu.wh + * @version 1.0: SessionRepositoryServiceTest.java, v 0.1 2019-08-01 13:04 shangyu.wh Exp $ + */ +public class SessionRepositoryServiceTest { + + @Test + public void testSessionRepositoryServicePut() { + ConcurrentHashMap> registry = new ConcurrentHashMap<>(); + SessionRepositoryService sessionRepositoryService = new SessionRepositoryService(registry); + + String ip = "192.1.1.1"; + String dataCenter = "zue"; + + sessionRepositoryService.setNodeConfig(new NodeConfig() { + @Override + public Map> getMetaNode() { + return null; + } + + @Override + public Map> getMetaNodeIP() { + return null; + } + + @Override + public String getLocalDataCenter() { + return dataCenter; + } + + @Override + public String getMetaDataCenter(String metaIpAddress) { + return dataCenter; + } + + @Override + public Set getDataCenterMetaServers(String dataCenter) { + return null; + } + }); + + RenewDecorate sessionNode = new RenewDecorate(new SessionNode(new URL(ip, 0), + dataCenter)); + sessionRepositoryService.put(ip, sessionNode); + + Assert.assertEquals(sessionRepositoryService.get(ip).getRenewal().getIp(), ip); + } + + @Test + public void testSessionRepositoryServiceRemove() { + ConcurrentHashMap> registry = new ConcurrentHashMap<>(); + SessionRepositoryService sessionRepositoryService = new SessionRepositoryService(registry); + + String ip = "192.1.1.1"; + String dataCenter = "zue"; + + sessionRepositoryService.setNodeConfig(new NodeConfig() { + @Override + public Map> getMetaNode() { + return null; + } + + @Override + public Map> getMetaNodeIP() { + return null; + } + + @Override + public String getLocalDataCenter() { + return dataCenter; + } + + @Override + public String getMetaDataCenter(String metaIpAddress) { + return dataCenter; + } + + @Override + public Set getDataCenterMetaServers(String dataCenter) { + return null; + } + }); + + RenewDecorate sessionNode = new RenewDecorate(new SessionNode(new URL(ip, 0), + dataCenter)); + sessionRepositoryService.put(ip, sessionNode); + + sessionRepositoryService.remove(ip); + + Assert.assertEquals(sessionRepositoryService.getAllData().size(), 0); + } + + @Test + public void testSessionRepositoryServiceReplace() { + ConcurrentHashMap> registry = new ConcurrentHashMap<>(); + SessionRepositoryService sessionRepositoryService = new SessionRepositoryService(registry); + + String ip = "192.1.1.1"; + String dataCenter = "zue"; + + sessionRepositoryService.setNodeConfig(new NodeConfig() { + @Override + public Map> getMetaNode() { + return null; + } + + @Override + public Map> getMetaNodeIP() { + return null; + } + + @Override + public String getLocalDataCenter() { + return dataCenter; + } + + @Override + public String getMetaDataCenter(String metaIpAddress) { + return dataCenter; + } + + @Override + public Set getDataCenterMetaServers(String dataCenter) { + return null; + } + }); + + RenewDecorate sessionNod = new RenewDecorate(new SessionNode(new URL(ip, 0), + dataCenter)); + + SessionNode metaNodeFix = new SessionNode(new URL(ip, 0), dataCenter); + metaNodeFix.setNodeStatus(NodeStatus.WORKING); + + RenewDecorate metaNode2 = new RenewDecorate(metaNodeFix); + + sessionRepositoryService.put(ip, sessionNod); + + Assert.assertEquals(sessionRepositoryService.get(ip).getRenewal().getNodeStatus(), + NodeStatus.INIT); + + sessionRepositoryService.replace(ip, metaNode2); + + Assert.assertEquals(sessionRepositoryService.get(ip).getRenewal().getNodeStatus(), + NodeStatus.WORKING); + Assert.assertEquals(sessionRepositoryService.getAllData().size(), 1); + Assert.assertEquals(sessionRepositoryService.getAllDataMap().get(dataCenter).size(), 1); + } + +} \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/listener/ReceivedDataMultiPushTaskListener.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/listener/ReceivedDataMultiPushTaskListener.java index e4c8b06b3..fa8e67694 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/listener/ReceivedDataMultiPushTaskListener.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/listener/ReceivedDataMultiPushTaskListener.java @@ -22,10 +22,12 @@ import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig; import com.alipay.sofa.registry.server.session.node.service.ClientNodeService; import com.alipay.sofa.registry.server.session.scheduler.ExecutorManager; +import com.alipay.sofa.registry.server.session.scheduler.task.PushTaskClosure; import com.alipay.sofa.registry.server.session.scheduler.task.ReceivedDataMultiPushTask; import com.alipay.sofa.registry.server.session.scheduler.task.SessionTask; import com.alipay.sofa.registry.server.session.strategy.ReceivedDataMultiPushTaskStrategy; import com.alipay.sofa.registry.server.session.strategy.TaskMergeProcessorStrategy; +import com.alipay.sofa.registry.task.TaskClosure; import com.alipay.sofa.registry.task.batcher.TaskProcessor; import com.alipay.sofa.registry.task.listener.TaskEvent; import com.alipay.sofa.registry.task.listener.TaskEvent.TaskType; @@ -103,6 +105,11 @@ public boolean support(TaskEvent event) { @Override public void handleEvent(TaskEvent event) { + TaskClosure taskClosure = event.getTaskClosure(); + + if (taskClosure != null && taskClosure instanceof PushTaskClosure) { + ((PushTaskClosure) taskClosure).addTask(event); + } receiveDataTaskMergeProcessorStrategy.handleEvent(event); } diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/SessionDigestResource.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/SessionDigestResource.java index 46d408b4f..3bb5e1110 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/SessionDigestResource.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/SessionDigestResource.java @@ -16,11 +16,16 @@ */ package com.alipay.sofa.registry.server.session.resource; +import com.alipay.sofa.registry.common.model.Node; +import com.alipay.sofa.registry.common.model.Node.NodeType; import com.alipay.sofa.registry.common.model.store.Publisher; import com.alipay.sofa.registry.common.model.store.StoreData; import com.alipay.sofa.registry.common.model.store.Subscriber; import com.alipay.sofa.registry.common.model.store.Watcher; import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig; +import com.alipay.sofa.registry.server.session.node.NodeManager; +import com.alipay.sofa.registry.server.session.node.NodeManagerFactory; +import com.alipay.sofa.registry.server.session.node.SessionNodeManager; import com.alipay.sofa.registry.server.session.store.DataStore; import com.alipay.sofa.registry.server.session.store.Interests; import com.alipay.sofa.registry.server.session.store.Watchers; @@ -70,11 +75,17 @@ public class SessionDigestResource { @Autowired private SessionServerConfig sessionServerConfig; - private final static String SUB = "SUB"; + private final static String SUB = "SUB"; - private final static String PUB = "PUB"; + private final static String PUB = "PUB"; - private final static String WAT = "WAT"; + private final static String WAT = "WAT"; + + private final static String SESSION = "SESSION"; + + private final static String DATA = "DATA"; + + private final static String META = "META"; @GET @Path("{type}/data/query") @@ -208,4 +219,78 @@ private void fillServerList(String type, } } } + + @GET + @Path("{type}/serverList/query") + @Produces(MediaType.APPLICATION_JSON) + public List getServerListAll(@PathParam("type") String type) { + List serverList = new ArrayList<>(); + if (type != null && !type.isEmpty()) { + String inputType = type.toUpperCase(); + + switch (inputType) { + case SESSION: + serverList = getSessionServerList(); + break; + case DATA: + serverList = getDataServerList(); + break; + case META: + serverList = getMetaServerList(); + break; + default: + serverList = new ArrayList<>(); + break; + } + + } + return serverList; + } + + public List getSessionServerList() { + + List serverList = new ArrayList<>(); + NodeManager nodeManager = NodeManagerFactory.getNodeManager(NodeType.SESSION); + + if (nodeManager instanceof SessionNodeManager) { + SessionNodeManager sessionNodeManager = (SessionNodeManager) nodeManager; + serverList = sessionNodeManager.getZoneServerList(sessionServerConfig + .getSessionServerRegion()); + } + return serverList; + } + + public List getDataServerList() { + List serverList = new ArrayList<>(); + NodeManager dataNodeManager = NodeManagerFactory.getNodeManager(NodeType.DATA); + + Collection dataNodes = dataNodeManager.getDataCenterNodes(); + + if (!CollectionUtils.isEmpty(dataNodes)) { + for (Node dataNode : dataNodes) { + if (dataNode.getNodeUrl() == null || dataNode.getNodeUrl().getIpAddress() == null) { + continue; + } + serverList.add(dataNode.getNodeUrl().getIpAddress()); + } + } + return serverList; + } + + public List getMetaServerList() { + List serverList = new ArrayList<>(); + NodeManager metaNodeManager = NodeManagerFactory.getNodeManager(NodeType.META); + + Collection metaNodes = metaNodeManager.getDataCenterNodes(); + + if (!CollectionUtils.isEmpty(metaNodes)) { + for (Node metaNode : metaNodes) { + if (metaNode.getNodeUrl() == null || metaNode.getNodeUrl().getIpAddress() == null) { + continue; + } + serverList.add(metaNode.getNodeUrl().getIpAddress()); + } + } + return serverList; + } } \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/AbstractSessionTask.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/AbstractSessionTask.java index 3135c1461..47d9e853b 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/AbstractSessionTask.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/AbstractSessionTask.java @@ -46,6 +46,10 @@ public synchronized String getTaskId() { return taskId; } + protected synchronized void setTaskId(String taskId) { + this.taskId = taskId; + } + protected boolean checkRetryTimes(int configTimes) { if (configTimes > 0) { if (execCount.incrementAndGet() > configTimes) { diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/CancelDataTask.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/CancelDataTask.java index df380c58b..17ef2a827 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/CancelDataTask.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/CancelDataTask.java @@ -82,6 +82,12 @@ public void execute() { @Override public void setTaskEvent(TaskEvent taskEvent) { + + //taskId create from event + if (taskEvent.getTaskId() != null) { + setTaskId(taskEvent.getTaskId()); + } + Object obj = taskEvent.getEventObj(); if (obj instanceof List) { diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchCloudTask.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchCloudTask.java index a149fe500..0a31df7ce 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchCloudTask.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchCloudTask.java @@ -98,6 +98,11 @@ public long getExpiryTime() { @Override public void setTaskEvent(TaskEvent taskEvent) { + //taskId create from event + if (taskEvent.getTaskId() != null) { + setTaskId(taskEvent.getTaskId()); + } + Object obj = taskEvent.getEventObj(); if (!(obj instanceof String)) { @@ -249,8 +254,9 @@ private void fireReceiveDataPushTask(Map datums, TaskEvent taskEvent = new TaskEvent(parameter, TaskType.RECEIVED_DATA_MULTI_PUSH_TASK); taskEvent.setTaskClosure(pushTaskClosure); taskEvent.setAttribute(Constant.PUSH_CLIENT_SUBSCRIBERS, subscribers); - taskLogger.info("send {} taskURL:{},taskScope:{},version:{}", taskEvent.getTaskType(), - subscriber.getSourceAddress(), scopeEnum, receivedData.getVersion()); + taskLogger.info("send {} taskURL:{},taskScope:{},version:{},taskId={}", + taskEvent.getTaskType(), subscriber.getSourceAddress(), scopeEnum, + receivedData.getVersion(), taskEvent.getTaskId()); taskListenerManager.sendTaskEvent(taskEvent); } @@ -268,9 +274,10 @@ private void fireUserDataElementPushTask(Datum datum, Subscriber subscriber, int size = datum != null && datum.getPubMap() != null ? datum.getPubMap().size() : 0; - taskLogger.info("send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={},subSize={}", + taskLogger.info( + "send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={},subSize={},taskId={}", taskEvent.getTaskType(), subscriber.getSourceAddress(), datum.getDataInfoId(), - datum.getDataCenter(), size, subscribers.size()); + datum.getDataCenter(), size, subscribers.size(), taskEvent.getTaskId()); taskListenerManager.sendTaskEvent(taskEvent); } @@ -287,9 +294,10 @@ private void fireUserDataElementMultiPushTask(Datum datum, Subscriber subscriber int size = datum != null && datum.getPubMap() != null ? datum.getPubMap().size() : 0; - taskLogger.info("send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={},subSize={}", + taskLogger.info( + "send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={},subSize={},taskId={}", taskEvent.getTaskType(), subscriber.getSourceAddress(), datum.getDataInfoId(), - datum.getDataCenter(), size, subscribers.size()); + datum.getDataCenter(), size, subscribers.size(), taskEvent.getTaskId()); taskListenerManager.sendTaskEvent(taskEvent); } diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchTask.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchTask.java index 3757a3117..8a95b8548 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchTask.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchTask.java @@ -103,6 +103,11 @@ public void execute() { for (ScopeEnum scopeEnum : ScopeEnum.values()) { Map> map = getCache(scopeEnum); if (map != null && !map.isEmpty()) { + LOGGER + .info( + "Get all subscribers to send from cache size:{},which dataInfoId:{} on dataCenter:{},scope:{}", + map.size(), dataChangeRequest.getDataInfoId(), + dataChangeRequest.getDataCenter(), scopeEnum); for (Entry> entry : map.entrySet()) { Map subscriberMap = entry.getValue(); if (subscriberMap != null && !subscriberMap.isEmpty()) { @@ -111,6 +116,12 @@ public void execute() { Collection subscribersSend = subscribersVersionCheck(subscriberMap .values()); if (subscribersSend.isEmpty()) { + LOGGER + .warn( + "Subscribers to send empty,which dataInfoId:{} on dataCenter:{},scope:{},address:{},size:{}", + dataChangeRequest.getDataInfoId(), + dataChangeRequest.getDataCenter(), scopeEnum, + entry.getKey(), subscriberMap.size()); continue; } @@ -248,8 +259,8 @@ private void fireReceivedDataMultiPushTask(Datum datum, List subscriberR TaskEvent taskEvent = new TaskEvent(parameter, TaskType.RECEIVED_DATA_MULTI_PUSH_TASK); taskEvent.setTaskClosure(pushTaskClosure); taskEvent.setAttribute(Constant.PUSH_CLIENT_SUBSCRIBERS, subscribers); - taskLogger.info("send {} taskURL:{},taskScope:{}", taskEvent.getTaskType(), subscriber.getSourceAddress(), - scopeEnum); + taskLogger.info("send {} taskURL:{},taskScope:{},,taskId={}", taskEvent.getTaskType(), subscriber.getSourceAddress(), + scopeEnum,taskEvent.getTaskId()); taskListenerManager.sendTaskEvent(taskEvent); } @@ -278,9 +289,10 @@ private void fireUserDataElementPushTask(InetSocketAddress address, Datum datum, int size = datum != null && datum.getPubMap() != null ? datum.getPubMap().size() : 0; - taskLogger.info("send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={},subSize={}", + taskLogger.info( + "send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={},subSize={},taskId={}", taskEvent.getTaskType(), address, datum.getDataInfoId(), datum.getDataCenter(), size, - subscribers.size()); + subscribers.size(), taskEvent.getTaskId()); taskListenerManager.sendTaskEvent(taskEvent); } @@ -297,9 +309,10 @@ private void fireUserDataElementMultiPushTask(InetSocketAddress address, Datum d int size = datum != null && datum.getPubMap() != null ? datum.getPubMap().size() : 0; - taskLogger.info("send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={},subSize={}", + taskLogger.info( + "send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={},subSize={},taskId={}", taskEvent.getTaskType(), address, datum.getDataInfoId(), datum.getDataCenter(), size, - subscribers.size()); + subscribers.size(), taskEvent.getTaskId()); taskListenerManager.sendTaskEvent(taskEvent); } @@ -310,6 +323,12 @@ public long getExpiryTime() { @Override public void setTaskEvent(TaskEvent taskEvent) { + + //taskId create from event + if (taskEvent.getTaskId() != null) { + setTaskId(taskEvent.getTaskId()); + } + Object obj = taskEvent.getEventObj(); if (!(obj instanceof DataChangeRequest)) { diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataPushTask.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataPushTask.java index 52eb5bcfc..8e768b46f 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataPushTask.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataPushTask.java @@ -29,7 +29,6 @@ import com.alipay.sofa.registry.server.session.converter.ReceivedDataConverter; import com.alipay.sofa.registry.server.session.scheduler.ExecutorManager; import com.alipay.sofa.registry.server.session.store.Interests; -import com.alipay.sofa.registry.task.batcher.TaskProcessor.ProcessingResult; import com.alipay.sofa.registry.task.listener.TaskEvent; import com.alipay.sofa.registry.task.listener.TaskEvent.TaskType; import com.alipay.sofa.registry.task.listener.TaskListenerManager; @@ -225,6 +224,11 @@ public long getExpiryTime() { @Override public void setTaskEvent(TaskEvent taskEvent) { + //taskId create from event + if (taskEvent.getTaskId() != null) { + setTaskId(taskEvent.getTaskId()); + } + Object obj = taskEvent.getEventObj(); if (!(obj instanceof DataPushRequest)) { diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/ProvideDataChangeFetchTask.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/ProvideDataChangeFetchTask.java index 19ffb86c1..7d6c69ca9 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/ProvideDataChangeFetchTask.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/ProvideDataChangeFetchTask.java @@ -100,6 +100,11 @@ public ProvideDataChangeFetchTask(SessionServerConfig sessionServerConfig, @Override public void setTaskEvent(TaskEvent taskEvent) { + //taskId create from event + if (taskEvent.getTaskId() != null) { + setTaskId(taskEvent.getTaskId()); + } + Object obj = taskEvent.getEventObj(); if (!(obj instanceof NotifyProvideDataChange)) { diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/PushTaskClosure.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/PushTaskClosure.java index cf7d46ece..68a7a4b45 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/PushTaskClosure.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/PushTaskClosure.java @@ -21,8 +21,9 @@ import com.alipay.sofa.registry.task.Task; import com.alipay.sofa.registry.task.TaskClosure; import com.alipay.sofa.registry.task.batcher.TaskProcessor.ProcessingResult; +import com.alipay.sofa.registry.task.listener.TaskEvent; -import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -39,7 +40,8 @@ public class PushTaskClosure implements TaskClosure { private final static Logger LOGGER = LoggerFactory .getLogger(PushTaskClosure.class); - private ConcurrentHashMap taskMap = new ConcurrentHashMap<>(); + private Set tasks = ConcurrentHashMap + .newKeySet(); private ConcurrentHashMap taskResultMap = new ConcurrentHashMap<>(); @@ -64,21 +66,21 @@ public void run(ProcessingResult processingResult, Task task) { } } - public void addTask(Task task) { - taskMap.putIfAbsent(task.getTaskId(), task); + public void addTask(TaskEvent taskEvent) { + tasks.add(taskEvent.getTaskId()); } public void start() { pushTaskClosureExecutor.execute(() -> { try { - int size = taskMap.size(); - LOGGER.info("Push task queue size {},map size {}", completionQueue.size(), size); + int size = tasks.size(); + LOGGER.info("Push task queue size {},all task size {}", completionQueue.size(), size); for (int i = 0; i < size; i++) { String taskId = completionQueue.poll(6000, TimeUnit.MILLISECONDS); if(taskId != null) { ProcessingResult result = taskResultMap.get(taskId); if (result == ProcessingResult.Success) { - taskMap.remove(taskId); + tasks.remove(taskId); } } } @@ -86,14 +88,14 @@ public void start() { LOGGER.error("Push task check InterruptedException!", e); } - if (taskMap.isEmpty()) { + if (tasks.isEmpty()) { LOGGER.info("Push all tasks success"); if (taskClosure != null) { taskClosure.run(ProcessingResult.Success, null); } } else { - LOGGER.warn("Push tasks found error tasks {} !", taskMap); + LOGGER.warn("Push tasks found error tasks {} !", tasks.size()); if (taskClosure != null) { taskClosure.run(ProcessingResult.PermanentError, null); } @@ -102,12 +104,12 @@ public void start() { } /** - * Getter method for property taskMap. + * Getter method for property tasks. * - * @return property value of taskMap + * @return property value of tasks */ - public Map getTaskMap() { - return taskMap; + public Set getTasks() { + return tasks; } /** diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/ReceivedConfigDataPushTask.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/ReceivedConfigDataPushTask.java index 168e86474..11f6df8b6 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/ReceivedConfigDataPushTask.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/ReceivedConfigDataPushTask.java @@ -98,6 +98,12 @@ public long getExpiryTime() { @Override public void setTaskEvent(TaskEvent taskEvent) { + + //taskId create from event + if (taskEvent.getTaskId() != null) { + setTaskId(taskEvent.getTaskId()); + } + Object obj = taskEvent.getEventObj(); if (obj instanceof Map) { diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/ReceivedDataMultiPushTask.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/ReceivedDataMultiPushTask.java index dde2fd9be..52328b44e 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/ReceivedDataMultiPushTask.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/ReceivedDataMultiPushTask.java @@ -19,6 +19,7 @@ import com.alipay.sofa.registry.common.model.PushDataRetryRequest; import com.alipay.sofa.registry.common.model.store.Subscriber; import com.alipay.sofa.registry.common.model.store.URL; +import com.alipay.sofa.registry.core.model.DataBox; import com.alipay.sofa.registry.core.model.ReceivedData; import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; @@ -37,6 +38,7 @@ import com.alipay.sofa.registry.timer.AsyncHashedWheelTimer; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; @@ -62,6 +64,8 @@ public class ReceivedDataMultiPushTask extends AbstractSessionTask implements Ta private ReceivedDataMultiPushTaskStrategy receivedDataMultiPushTaskStrategy; private AsyncHashedWheelTimer asyncHashedWheelTimer; + private String dataPush; + public ReceivedDataMultiPushTask(SessionServerConfig sessionServerConfig, ClientNodeService clientNodeService, ExecutorManager executorManager, @@ -94,10 +98,11 @@ public void execute() { CallbackHandler callbackHandler = new CallbackHandler() { @Override public void onCallback(Channel channel, Object message) { - LOGGER.info( - "Push ReceivedData success! dataId:{},group:{},Instance:{},version:{},url: {}", - receivedData.getDataId(), receivedData.getGroup(), - receivedData.getInstanceId(), receivedData.getVersion(), url); + LOGGER + .info( + "Push ReceivedData success! dataId:{},group:{},Instance:{},version:{},url: {},dataPush:{}", + receivedData.getDataId(), receivedData.getGroup(), + receivedData.getInstanceId(), receivedData.getVersion(), url, dataPush); if (taskClosure != null) { confirmCallBack(true); @@ -106,10 +111,12 @@ public void onCallback(Channel channel, Object message) { @Override public void onException(Channel channel, Throwable exception) { - LOGGER.error( - "Push ReceivedData error! dataId:{},group:{},Instance:{},version:{},url: {}", - receivedData.getDataId(), receivedData.getGroup(), - receivedData.getInstanceId(), receivedData.getVersion(), url, exception); + LOGGER + .error( + "Push ReceivedData error! dataId:{},group:{},Instance:{},version:{},url: {},dataPush:{}", + receivedData.getDataId(), receivedData.getGroup(), + receivedData.getInstanceId(), receivedData.getVersion(), url, dataPush, + exception); if (taskClosure != null) { confirmCallBack(false); @@ -154,31 +161,31 @@ private void retrySendReceiveData(PushDataRetryRequest pushDataRetryRequest) { clientNodeService.pushWithCallback(infoPackage, targetUrl, new CallbackHandler() { @Override public void onCallback(Channel channel, Object message) { - LOGGER.info("Retry Push ReceivedData success! dataId:{}, group:{},url:{},retryTimes:{}", - receivedData.getDataId(), receivedData.getGroup(), targetUrl,retryTimes); + LOGGER.info("Retry Push ReceivedData success! dataId:{}, group:{},url:{},taskId:{},dataPush:{},retryTimes:{}", + receivedData.getDataId(), receivedData.getGroup(), targetUrl,getTaskId(),dataPush,retryTimes); } @Override public void onException(Channel channel, Throwable exception) { - LOGGER.error("Retry Push ReceivedData callback error! url:{}, dataId:{}, group:{},taskId:{},retryTimes:{}", targetUrl, - receivedData.getDataId(), receivedData.getGroup(),getTaskId(),retryTimes); + LOGGER.error("Retry Push ReceivedData callback error! url:{}, dataId:{}, group:{},taskId:{},dataPush:{},retryTimes:{}", targetUrl, + receivedData.getDataId(), receivedData.getGroup(),getTaskId(),dataPush,retryTimes); retrySendReceiveData(pushDataRetryRequest); } }); } catch (Exception e) { - LOGGER.error("Retry Push ReceivedData error! url:{}, dataId:{}, group:{},taskId:{},retryTimes:{}", targetUrl, - receivedData.getDataId(), receivedData.getGroup(),getTaskId(),retryTimes); + LOGGER.error("Retry Push ReceivedData error! url:{}, dataId:{}, group:{},taskId:{},dataPush:{},retryTimes:{}", targetUrl, + receivedData.getDataId(), receivedData.getGroup(),getTaskId(),dataPush,retryTimes); retrySendReceiveData(pushDataRetryRequest); } },getBlockTime(retryTimes),TimeUnit.MILLISECONDS); } else { - LOGGER.error("Retry Push ReceivedData error, connect be null or disconnected,stop retry!dataId:{}, group:{},url:{},taskId:{},retryTimes:{}", - receivedData.getDataId(), receivedData.getGroup(), targetUrl,getTaskId(),retryTimes); + LOGGER.error("Retry Push ReceivedData error, connect be null or disconnected,stop retry!dataId:{}, group:{},url:{},taskId:{},dataPush:{},retryTimes:{}", + receivedData.getDataId(), receivedData.getGroup(), targetUrl,getTaskId(),dataPush,retryTimes); } } else { - LOGGER.error("Retry Push ReceivedData times have exceeded!dataId:{}, group:{},url:{},taskId:{},retryTimes:{}", - receivedData.getDataId(), receivedData.getGroup(), targetUrl,getTaskId(),retryTimes); + LOGGER.error("Retry Push ReceivedData times have exceeded!dataId:{}, group:{},url:{},taskId:{},dataPush:{},retryTimes:{}", + receivedData.getDataId(), receivedData.getGroup(), targetUrl,getTaskId(),dataPush,retryTimes); } } } @@ -191,6 +198,11 @@ public long getExpiryTime() { @Override public void setTaskEvent(TaskEvent taskEvent) { + //taskId create from event + if (taskEvent.getTaskId() != null) { + setTaskId(taskEvent.getTaskId()); + } + Object obj = taskEvent.getEventObj(); if (obj instanceof Map) { @@ -210,12 +222,24 @@ public void setTaskEvent(TaskEvent taskEvent) { } } - taskClosure = taskEvent.getTaskClosure(); + if (receivedData != null && receivedData.getData() != null) { + StringBuilder sb = new StringBuilder(); + sb.append("["); + Map> map = receivedData.getData(); + if (!map.isEmpty()) { - if (taskClosure instanceof PushTaskClosure) { - ((PushTaskClosure) taskClosure).addTask(this); + for (Map.Entry> entry1 : map.entrySet()) { + sb.append(entry1.getKey()).append("="); + int size = entry1.getValue() != null ? entry1.getValue().size() : 0; + sb.append(size).append(","); + } + } + sb.append("]"); + dataPush = sb.toString(); } + taskClosure = taskEvent.getTaskClosure(); + subscribers = (Collection) taskEvent .getAttribute(Constant.PUSH_CLIENT_SUBSCRIBERS); //taskClosure must confirm all subscriber push success diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/SessionRegisterDataTask.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/SessionRegisterDataTask.java index 0640760ed..925bc27a5 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/SessionRegisterDataTask.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/SessionRegisterDataTask.java @@ -64,6 +64,12 @@ public boolean checkRetryTimes() { @Override public void setTaskEvent(TaskEvent taskEvent) { + + //taskId create from event + if (taskEvent.getTaskId() != null) { + setTaskId(taskEvent.getTaskId()); + } + Object obj = taskEvent.getEventObj(); if (obj instanceof URL) { diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/SubscriberMultiFetchTask.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/SubscriberMultiFetchTask.java index ffb1495c0..b2b35d1e3 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/SubscriberMultiFetchTask.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/SubscriberMultiFetchTask.java @@ -63,6 +63,11 @@ public SubscriberMultiFetchTask(SessionServerConfig sessionServerConfig, @Override public void setTaskEvent(TaskEvent taskEvent) { + //taskId create from event + if (taskEvent.getTaskId() != null) { + setTaskId(taskEvent.getTaskId()); + } + Object obj = taskEvent.getEventObj(); if (!(obj instanceof String)) { diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/SubscriberRegisterFetchTask.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/SubscriberRegisterFetchTask.java index 815325337..9a539dd81 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/SubscriberRegisterFetchTask.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/SubscriberRegisterFetchTask.java @@ -66,6 +66,12 @@ public long getExpiryTime() { @Override public void setTaskEvent(TaskEvent taskEvent) { + + //taskId create from event + if (taskEvent.getTaskId() != null) { + setTaskId(taskEvent.getTaskId()); + } + Object obj = taskEvent.getEventObj(); if (!(obj instanceof Subscriber)) { diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/WatcherRegisterFetchTask.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/WatcherRegisterFetchTask.java index ad5bdf1b0..441b9dd07 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/WatcherRegisterFetchTask.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/WatcherRegisterFetchTask.java @@ -35,6 +35,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; /** * @@ -58,6 +59,8 @@ public class WatcherRegisterFetchTask extends AbstractSessionTask { private Watcher watcher; + private static final int TRY_COUNT = 3; + public WatcherRegisterFetchTask(SessionServerConfig sessionServerConfig, TaskListenerManager taskListenerManager, MetaNodeService metaNodeService) { @@ -68,6 +71,12 @@ public WatcherRegisterFetchTask(SessionServerConfig sessionServerConfig, @Override public void setTaskEvent(TaskEvent taskEvent) { + + //taskId create from event + if (taskEvent.getTaskId() != null) { + setTaskId(taskEvent.getTaskId()); + } + Object obj = taskEvent.getEventObj(); if (!(obj instanceof Watcher)) { @@ -88,17 +97,30 @@ public void execute() { boolean isOldVersion = !ClientVersion.StoreData.equals(watcher.getClientVersion()); - ProvideData provideData = metaNodeService.fetchData(watcher.getDataInfoId()); - if (provideData != null) { - if (!isOldVersion) { - DataInfo dataInfo = DataInfo.valueOf(provideData.getDataInfoId()); - ReceivedConfigData receivedConfigData = ReceivedDataConverter - .getReceivedConfigData(provideData.getProvideData(), dataInfo, - provideData.getVersion()); - receivedConfigData.setConfiguratorRegistIds(subscriberRegisterIdList); - firePushTask(receivedConfigData); + ProvideData provideData = null; + + for (int tryCount = 0; tryCount < TRY_COUNT; tryCount++) { + try { + provideData = metaNodeService.fetchData(watcher.getDataInfoId()); + break; + } catch (Exception e) { + randomDelay(3000); } } + + if (provideData == null) { + taskLogger.error("Fetch provider data error,set null value return.dataInfoId={}", + watcher.getDataId()); + provideData = new ProvideData(null, watcher.getDataInfoId(), null); + } + + if (!isOldVersion) { + DataInfo dataInfo = DataInfo.valueOf(provideData.getDataInfoId()); + ReceivedConfigData receivedConfigData = ReceivedDataConverter.getReceivedConfigData( + provideData.getProvideData(), dataInfo, provideData.getVersion()); + receivedConfigData.setConfiguratorRegistIds(subscriberRegisterIdList); + firePushTask(receivedConfigData); + } } private void firePushTask(ReceivedConfigData receivedConfigData) { @@ -114,6 +136,16 @@ public boolean checkRetryTimes() { return checkRetryTimes(sessionServerConfig.getSubscriberRegisterFetchRetryTimes()); } + private void randomDelay(int max) { + Random random = new Random(); + int randomNum = random.nextInt(max); + try { + Thread.sleep(randomNum); + } catch (InterruptedException e) { + taskLogger.error("[TimeUtil] random delay error", e); + } + } + @Override public String toString() { return "WATCHER_REGISTER_FETCH_TASK{" + "taskId='" + getTaskId() + '\'' + ", watcher=" diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/Interests.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/Interests.java index 3fcf99a39..e3c0b65c1 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/Interests.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/Interests.java @@ -70,11 +70,19 @@ public interface Interests extends DataManager { Collection getInterestDataInfoIds(); /** - * get subscribers whith specify dataInfo and scope,and order by source InetSocketAddress + * get subscribers whith specify dataInfo and scope,and group by source InetSocketAddress * @param dataInfoId * @param scope * @return */ Map> querySubscriberIndex(String dataInfoId, ScopeEnum scope); + + /** + * get subscriber by dataInfoId and registerId + * @param registerId + * @param dataInfoId + * @return + */ + Subscriber queryById(String registerId, String dataInfoId); } \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionInterests.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionInterests.java index 68341371e..a1b0719be 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionInterests.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionInterests.java @@ -180,6 +180,16 @@ public Map queryByConnectId(String connectId) { return connectIndex.get(connectId); } + public Subscriber queryById(String registerId, String dataInfoId) { + + Map subscribers = interests.get(dataInfoId); + + if (subscribers == null) { + return null; + } + return subscribers.get(registerId); + } + @Override public Collection getInterests(String dataInfoId) { Map subscribers = interests.get(dataInfoId); diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/impl/DefaultSubscriberRegisterFetchTaskStrategy.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/impl/DefaultSubscriberRegisterFetchTaskStrategy.java index 0c7f1ac91..9ef7fdbe3 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/impl/DefaultSubscriberRegisterFetchTaskStrategy.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/impl/DefaultSubscriberRegisterFetchTaskStrategy.java @@ -108,8 +108,8 @@ private void firePush(ReceivedData receivedData, Subscriber subscriber, parameter.put(receivedData, subscriber.getSourceAddress()); TaskEvent taskEvent = new TaskEvent(parameter, TaskEvent.TaskType.RECEIVED_DATA_MULTI_PUSH_TASK); - taskLogger.info("send {} taskURL:{},taskScope:{}", taskEvent.getTaskType(), - subscriber.getSourceAddress(), receivedData.getScope()); + taskLogger.info("send {} taskURL:{},taskScope:{},taskId:{}", taskEvent.getTaskType(), + subscriber.getSourceAddress(), receivedData.getScope(), taskEvent.getTaskId()); taskListenerManager.sendTaskEvent(taskEvent); } @@ -141,9 +141,9 @@ private void fireUserDataElementPushTask(Datum datum, Subscriber subscriber, taskEvent.setAttribute(Constant.PUSH_CLIENT_URL, subscriber.getSourceAddress()); int size = datum.getPubMap() != null ? datum.getPubMap().size() : 0; - taskLogger.info("send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={}", + taskLogger.info("send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={},taskId={}", taskEvent.getTaskType(), subscriber.getSourceAddress(), datum.getDataInfoId(), - datum.getDataCenter(), size); + datum.getDataCenter(), size, taskEvent.getTaskId()); taskListenerManager.sendTaskEvent(taskEvent); } @@ -161,9 +161,9 @@ private void fireUserDataElementMultiPushTask(Datum datum, Subscriber subscriber int size = datum.getPubMap() != null ? datum.getPubMap().size() : 0; - taskLogger.info("send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={}", + taskLogger.info("send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={},taskId={}", taskEvent.getTaskType(), subscriber.getSourceAddress(), datum.getDataInfoId(), - datum.getDataCenter(), size); + datum.getDataCenter(), size, taskEvent.getTaskId()); taskListenerManager.sendTaskEvent(taskEvent); } } diff --git a/server/server/session/src/main/resources/logback-spring.xml b/server/server/session/src/main/resources/logback-spring.xml index 7df0a5104..96f141a5b 100644 --- a/server/server/session/src/main/resources/logback-spring.xml +++ b/server/server/session/src/main/resources/logback-spring.xml @@ -291,6 +291,12 @@ + + + + + + diff --git a/server/store/jraft/src/test/java/TestServiceStateMachine.java b/server/store/jraft/src/test/java/TestServiceStateMachine.java new file mode 100644 index 000000000..0503a7e84 --- /dev/null +++ b/server/store/jraft/src/test/java/TestServiceStateMachine.java @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import com.alipay.sofa.jraft.Closure; +import com.alipay.sofa.jraft.Iterator; +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.entity.LeaderChangeContext; +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.entity.RaftOutter.SnapshotMeta; +import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; +import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; +import com.alipay.sofa.registry.jraft.bootstrap.ServiceStateMachine; +import com.alipay.sofa.registry.jraft.command.ProcessRequest; +import com.alipay.sofa.registry.jraft.command.ProcessResponse; +import com.alipay.sofa.registry.jraft.processor.FollowerProcessListener; +import com.alipay.sofa.registry.jraft.processor.LeaderProcessListener; +import com.alipay.sofa.registry.jraft.processor.LeaderTaskClosure; +import com.alipay.sofa.registry.jraft.processor.Processor; +import com.alipay.sofa.registry.jraft.processor.SnapshotProcess; +import com.caucho.hessian.io.Hessian2Output; +import com.caucho.hessian.io.SerializerFactory; +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedOutputStream; +import com.google.protobuf.Descriptors.Descriptor; +import com.google.protobuf.Descriptors.FieldDescriptor; +import com.google.protobuf.Descriptors.OneofDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.Parser; +import com.google.protobuf.UnknownFieldSet; +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * + * @author shangyu.wh + * @version 1.0: TestServiceStateMachine.java, v 0.1 2019-08-01 15:16 shangyu.wh Exp $ + */ +public class TestServiceStateMachine { + + @Test + public void testApply() { + ServiceStateMachine serviceStateMachine = ServiceStateMachine.getInstance(); + + Processor processor = Processor.getInstance(); + + processor.addWorker(TestServiceStateMachine.class.getSimpleName(), + TestServiceStateMachine.class, new TestServiceStateMachine()); + + AtomicInteger count = new AtomicInteger(); + + LeaderTaskClosure leaderTaskClosure = new LeaderTaskClosure(); + ProcessRequest processRequest = new ProcessRequest(); + processRequest + .setMethodArgs(new Object[] { TestServiceStateMachine.class.getSimpleName() }); + processRequest.setMethodArgSigs(new String[] { "java.lang.String" }); + processRequest.setMethodName("testMethod"); + processRequest.setServiceName(TestServiceStateMachine.class.getSimpleName()); + leaderTaskClosure.setRequest(processRequest); + + serviceStateMachine.onApply(new Iterator() { + + @Override + public boolean hasNext() { + if (count.get() > 0) + return false; + return true; + } + + @Override + public ByteBuffer next() { + count.getAndIncrement(); + return null; + } + + @Override + public ByteBuffer getData() { + return ByteBuffer.allocate(10); + } + + @Override + public long getIndex() { + return 0; + } + + @Override + public long getTerm() { + return 0; + } + + @Override + public Closure done() { + return leaderTaskClosure; + } + + @Override + public void setErrorAndRollback(long ntail, Status st) { + + } + }); + + Assert.assertEquals(TestServiceStateMachine.class.getSimpleName(), + ((ProcessResponse) leaderTaskClosure.getResponse()).getEntity()); + } + + @Test + public void testApply2() { + ServiceStateMachine serviceStateMachine = ServiceStateMachine.getInstance(); + + Processor processor = Processor.getInstance(); + + processor.addWorker(TestServiceStateMachine.class.getSimpleName(), + TestServiceStateMachine.class, new TestServiceStateMachine()); + + AtomicInteger count = new AtomicInteger(); + + LeaderTaskClosure leaderTaskClosure = new LeaderTaskClosure(); + ProcessRequest processRequest = new ProcessRequest(); + processRequest + .setMethodArgs(new Object[] { TestServiceStateMachine.class.getSimpleName() }); + processRequest.setMethodArgSigs(new String[] { "java.lang.String" }); + processRequest.setMethodName("testMethod"); + processRequest.setServiceName(TestServiceStateMachine.class.getSimpleName()); + leaderTaskClosure.setRequest(processRequest); + + final ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + Hessian2Output hessianOutput = new Hessian2Output(byteStream); + SerializerFactory serializerFactory = new SerializerFactory(); + hessianOutput.setSerializerFactory(serializerFactory); + try { + hessianOutput.writeObject(processRequest); + hessianOutput.close(); + } catch (IOException e) { + } + + serviceStateMachine.onApply(new Iterator() { + + @Override + public boolean hasNext() { + if (count.get() > 0) + return false; + return true; + } + + @Override + public ByteBuffer next() { + count.getAndIncrement(); + return null; + } + + @Override + public ByteBuffer getData() { + byte[] cmdBytes = byteStream.toByteArray(); + + ByteBuffer data = ByteBuffer.allocate(cmdBytes.length); + data.put(cmdBytes); + data.flip(); + return data; + } + + @Override + public long getIndex() { + return 0; + } + + @Override + public long getTerm() { + return 0; + } + + @Override + public Closure done() { + return null; + } + + @Override + public void setErrorAndRollback(long ntail, Status st) { + + } + }); + + } + + @Test + public void testOnSnapshotSave() throws InterruptedException { + ServiceStateMachine serviceStateMachine = ServiceStateMachine.getInstance(); + Processor processor = Processor.getInstance(); + LeaderTaskClosure leaderTaskClosure = new LeaderTaskClosure(); + AtomicReference status = new AtomicReference<>(); + leaderTaskClosure.setDone(statusIn -> { + status.set(statusIn); + }); + + SnapshotProcess process = new SnapshotProcess(){ + + @Override + public boolean save(String path) { + return true; + } + + @Override + public boolean load(String path) { + return true; + } + + @Override + public SnapshotProcess copy() { + return this; + } + + @Override + public Set getSnapshotFileNames() { + return new HashSet<>(Arrays.asList(new String[]{"aaa","sss"})); + } + }; + processor.addWorker(process.getClass().getSimpleName(),process.getClass(),process); + + serviceStateMachine.onSnapshotSave(new SnapshotWriter(){ + + @Override + public void close() throws IOException { + + } + + @Override + public String getPath() { + return "path"; + } + + @Override + public Set listFiles() { + return null; + } + + @Override + public Message getFileMeta(String fileName) { + return null; + } + + @Override + public boolean init(Void opts) { + return false; + } + + @Override + public void shutdown() { + + } + + @Override + public boolean saveMeta(SnapshotMeta meta) { + return false; + } + + public boolean addFile(String fileName){ + return true; + } + + @Override + public boolean addFile(String fileName, Message fileMeta) { + return true; + } + + @Override + public boolean removeFile(String fileName) { + return false; + } + + @Override + public void close(boolean keepDataOnError) throws IOException { + + } + },leaderTaskClosure); + + TimeUnit.MILLISECONDS.sleep(1000); + + Assert.assertTrue(status.get().isOk()); + } + + @Test + public void testOnSnapshotLoad() throws InterruptedException { + ServiceStateMachine serviceStateMachine = ServiceStateMachine.getInstance(); + Processor processor = Processor.getInstance(); + SnapshotProcess process = new SnapshotProcess() { + + @Override + public boolean save(String path) { + return true; + } + + @Override + public boolean load(String path) { + return true; + } + + @Override + public SnapshotProcess copy() { + return this; + } + + @Override + public Set getSnapshotFileNames() { + return new HashSet<>(Arrays.asList(new String[] { "aaa", "sss" })); + } + }; + processor.addWorker(process.getClass().getSimpleName(), process.getClass(), process); + + boolean ret = serviceStateMachine.onSnapshotLoad(new SnapshotReader() { + + @Override + public void close() throws IOException { + + } + + @Override + public String getPath() { + return "Path"; + } + + @Override + public Set listFiles() { + return null; + } + + @Override + public Message getFileMeta(String fileName) { + return new Message() { + @Override + public Message getDefaultInstanceForType() { + return null; + } + + @Override + public boolean isInitialized() { + return false; + } + + @Override + public List findInitializationErrors() { + return null; + } + + @Override + public String getInitializationErrorString() { + return null; + } + + @Override + public Descriptor getDescriptorForType() { + return null; + } + + @Override + public Map getAllFields() { + return null; + } + + @Override + public boolean hasOneof(OneofDescriptor oneofDescriptor) { + return false; + } + + @Override + public FieldDescriptor getOneofFieldDescriptor(OneofDescriptor oneofDescriptor) { + return null; + } + + @Override + public boolean hasField(FieldDescriptor fieldDescriptor) { + return false; + } + + @Override + public Object getField(FieldDescriptor fieldDescriptor) { + return null; + } + + @Override + public int getRepeatedFieldCount(FieldDescriptor fieldDescriptor) { + return 0; + } + + @Override + public Object getRepeatedField(FieldDescriptor fieldDescriptor, int i) { + return null; + } + + @Override + public UnknownFieldSet getUnknownFields() { + return null; + } + + @Override + public void writeTo(CodedOutputStream codedOutputStream) throws IOException { + + } + + @Override + public int getSerializedSize() { + return 0; + } + + @Override + public Parser getParserForType() { + return null; + } + + @Override + public ByteString toByteString() { + return null; + } + + @Override + public byte[] toByteArray() { + return new byte[0]; + } + + @Override + public void writeTo(OutputStream outputStream) throws IOException { + + } + + @Override + public void writeDelimitedTo(OutputStream outputStream) throws IOException { + + } + + @Override + public boolean equals(Object o) { + return false; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public String toString() { + return null; + } + + @Override + public Builder newBuilderForType() { + return null; + } + + @Override + public Builder toBuilder() { + return null; + } + }; + } + + @Override + public boolean init(Void opts) { + return false; + } + + @Override + public void shutdown() { + + } + + @Override + public SnapshotMeta load() { + return null; + } + + @Override + public String generateURIForCopy() { + return null; + } + }); + + TimeUnit.MILLISECONDS.sleep(1000); + + Assert.assertTrue(ret); + } + + @Test + public void testRemain() throws InterruptedException { + ServiceStateMachine serviceStateMachine = ServiceStateMachine.getInstance(); + + AtomicInteger leaderstart = new AtomicInteger(); + AtomicInteger leaderstop = new AtomicInteger(); + + serviceStateMachine.setFollowerProcessListener(new FollowerProcessListener() { + @Override + public void startProcess(PeerId leader) { + leaderstop.getAndIncrement(); + } + + @Override + public void stopProcess(PeerId leader) { + leaderstop.getAndIncrement(); + } + }); + + serviceStateMachine.setLeaderProcessListener(new LeaderProcessListener() { + @Override + public void startProcess() { + leaderstart.getAndIncrement(); + } + + @Override + public void stopProcess() { + leaderstart.getAndIncrement(); + } + }); + + serviceStateMachine.onLeaderStart(1); + TimeUnit.MILLISECONDS.sleep(500); + Assert.assertEquals(leaderstart.get(), 1); + + serviceStateMachine.onLeaderStop(Status.OK()); + TimeUnit.MILLISECONDS.sleep(500); + Assert.assertTrue(leaderstart.get() == 2); + + serviceStateMachine.onStartFollowing(new LeaderChangeContext(new PeerId(), 1, Status.OK())); + TimeUnit.MILLISECONDS.sleep(500); + Assert.assertTrue(leaderstop.get() == 1); + + serviceStateMachine.onStopFollowing(new LeaderChangeContext(new PeerId(), 1, Status.OK())); + TimeUnit.MILLISECONDS.sleep(500); + Assert.assertTrue(leaderstop.get() == 2); + + } + + public String testMethod(String ss) { + return ss; + } +} \ No newline at end of file diff --git a/test/src/test/java/com/alipay/sofa/registry/test/resource/data/DataDigestResourceTest.java b/test/src/test/java/com/alipay/sofa/registry/test/resource/data/DataDigestResourceTest.java index 2db2e7361..5bacd905e 100644 --- a/test/src/test/java/com/alipay/sofa/registry/test/resource/data/DataDigestResourceTest.java +++ b/test/src/test/java/com/alipay/sofa/registry/test/resource/data/DataDigestResourceTest.java @@ -33,6 +33,7 @@ import javax.ws.rs.core.GenericType; import javax.ws.rs.core.MediaType; import java.util.HashMap; +import java.util.List; import java.util.Map; import static com.alipay.sofa.registry.client.constants.ValueConstants.DEFAULT_GROUP; @@ -53,10 +54,11 @@ public static void beforeClass() throws Exception { clientOff(); dataId = "test-dataId-" + System.currentTimeMillis(); value = "DataDigestResourceTest"; + Thread.sleep(1000L); PublisherRegistration registration = new PublisherRegistration(dataId); registryClient1.register(registration, value); - Thread.sleep(500L); + Thread.sleep(1000L); SubscriberRegistration subReg = new SubscriberRegistration(dataId, new MySubscriberDataObserver()); @@ -117,4 +119,28 @@ public void testGetDatumCount() { .request(APPLICATION_JSON).get(String.class); assertTrue(countResult.contains("[Publisher] size of publisher in DefaultDataCenter is 1")); } + + @Test + public void testGetServerListAll() throws Exception { + + Map> sessionMap = dataChannel.getWebTarget() + .path("digest/session/serverList/query").request(APPLICATION_JSON) + .get(new GenericType>>() { + }); + assertEquals(1, sessionMap.size()); + assertEquals(1, sessionMap.get(LOCAL_DATACENTER).size()); + assertTrue(sessionMap.get(LOCAL_DATACENTER).get(0).contains(LOCAL_ADDRESS)); + + Map> metaMap = dataChannel.getWebTarget() + .path("digest/meta/serverList/query").request(APPLICATION_JSON) + .get(new GenericType>>() { + }); + assertEquals(metaMap.get(LOCAL_DATACENTER).get(0), LOCAL_ADDRESS); + + Map> dataMap = dataChannel.getWebTarget() + .path("digest/data/serverList/query").request(APPLICATION_JSON) + .get(new GenericType>>() { + }); + assertEquals(dataMap.size(), 0); + } } \ No newline at end of file diff --git a/test/src/test/java/com/alipay/sofa/registry/test/sync/DataSyncTest.java b/test/src/test/java/com/alipay/sofa/registry/test/sync/DataSyncTest.java index 2dcbb7416..0bccb8810 100644 --- a/test/src/test/java/com/alipay/sofa/registry/test/sync/DataSyncTest.java +++ b/test/src/test/java/com/alipay/sofa/registry/test/sync/DataSyncTest.java @@ -16,7 +16,6 @@ */ package com.alipay.sofa.registry.test.sync; -import com.alipay.remoting.Connection; import com.alipay.sofa.registry.client.api.registration.SubscriberRegistration; import com.alipay.sofa.registry.common.model.CommonResponse; import com.alipay.sofa.registry.common.model.dataserver.NotifyDataSyncRequest; @@ -53,6 +52,7 @@ public class DataSyncTest extends BaseIntegrationTest { private static DataServerConnectionFactory dataServerConnectionFactory; private static Server dataSyncServer; private static String remoteIP; + private static BoltChannel boltChannel; @BeforeClass public static void beforeClass() throws Exception { @@ -67,20 +67,24 @@ public static void beforeClass() throws Exception { dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(), TEST_SYNC_PORT), new ChannelHandler[] { new MockSyncDataHandler(), dataApplicationContext.getBean(DataSyncServerConnectionHandler.class) }); - remoteIP = ((BoltChannel) dataNodeExchanger.connect(new URL(LOCAL_ADDRESS, TEST_SYNC_PORT))) - .getConnection().getLocalIP(); + boltChannel = (BoltChannel) dataNodeExchanger + .connect(new URL(LOCAL_ADDRESS, TEST_SYNC_PORT)); + System.out.println("testsyncserver remote connect remote:" + + boltChannel.getConnection().getRemoteAddress() + " local:" + + boltChannel.getConnection().getLocalAddress()); + remoteIP = boltChannel.getConnection().getLocalIP(); Thread.sleep(500); } @Test public void doTest() throws Exception { // post sync data request - Connection connection = dataServerConnectionFactory.getConnection(remoteIP); NotifyDataSyncRequest request = new NotifyDataSyncRequest(DataInfo.toDataInfoId( MockSyncDataHandler.dataId, DEFAULT_INSTANCE_ID, DEFAULT_GROUP), LOCAL_DATACENTER, MockSyncDataHandler.version, DataSourceTypeEnum.SYNC.toString()); - CommonResponse commonResponse = (CommonResponse) dataSyncServer.sendSync( - dataSyncServer.getChannel(connection.getRemoteAddress()), request, 1000); + CommonResponse commonResponse = (CommonResponse) dataSyncServer.sendSync(dataSyncServer + .getChannel(new URL(remoteIP, boltChannel.getConnection().getLocalPort())), request, + 1000); assertTrue(commonResponse.isSuccess()); // register Subscriber