diff --git a/README.md b/README.md
index 670bb26f1..1d3535c42 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,7 @@
SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。SOFARegistry 最早源自于淘宝的 ConfigServer,十年来,随着蚂蚁金服的业务发展,注册中心架构已经演进至第五代。目前 SOFARegistry 不仅全面服务于蚂蚁金服的自有业务,还随着蚂蚁金融科技服务众多合作伙伴,同时也兼容开源生态。SOFARegistry 采用 AP 架构,支持秒级时效性推送,同时采用分层架构支持无限水平扩展。
-## 功能特性
+## 功能特性
- 支持服务发布与服务订阅
- 支持服务变更时的主动推送
diff --git a/client/all/pom.xml b/client/all/pom.xml
index 7924275e0..9dc05f8f9 100644
--- a/client/all/pom.xml
+++ b/client/all/pom.xml
@@ -6,7 +6,7 @@
com.alipay.sofa
registry-client-all
- 5.4.0-SNAPSHOT
+ 5.4.0
${project.groupId}:${project.artifactId}
http://github.com/alipay/sofa-registry
diff --git a/client/api/pom.xml b/client/api/pom.xml
index 80d6d32ee..2bf83a14d 100644
--- a/client/api/pom.xml
+++ b/client/api/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-client-parent
- 5.4.0-SNAPSHOT
+ 5.4.0
../pom.xml
4.0.0
diff --git a/client/impl/pom.xml b/client/impl/pom.xml
index b9078b693..141863f09 100644
--- a/client/impl/pom.xml
+++ b/client/impl/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-client-parent
- 5.4.0-SNAPSHOT
+ 5.4.0
../pom.xml
4.0.0
diff --git a/client/log/pom.xml b/client/log/pom.xml
index 1d71d2f24..18d70036b 100644
--- a/client/log/pom.xml
+++ b/client/log/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-client-parent
- 5.4.0-SNAPSHOT
+ 5.4.0
../pom.xml
4.0.0
diff --git a/client/pom.xml b/client/pom.xml
index a9569fdcc..640de410e 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -7,7 +7,7 @@
com.alipay.sofa
registry-parent
- 5.4.0-SNAPSHOT
+ 5.4.0
../pom.xml
diff --git a/core/pom.xml b/core/pom.xml
index 97a9770fa..c524641da 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-parent
- 5.4.0-SNAPSHOT
+ 5.4.0
../pom.xml
4.0.0
diff --git a/pom.xml b/pom.xml
index e4f048e1d..0167137f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
4.0.0
com.alipay.sofa
registry-parent
- 5.4.0-SNAPSHOT
+ 5.4.0
pom
${project.groupId}:${project.artifactId}
diff --git a/server/common/model/pom.xml b/server/common/model/pom.xml
index 0bf111bde..2918f0959 100644
--- a/server/common/model/pom.xml
+++ b/server/common/model/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-common
- 5.4.0-SNAPSHOT
+ 5.4.0
../pom.xml
4.0.0
diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/ServerDataBox.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/ServerDataBox.java
index 75dc51877..f65d8b942 100644
--- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/ServerDataBox.java
+++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/ServerDataBox.java
@@ -16,8 +16,6 @@
*/
package com.alipay.sofa.registry.common.model;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -25,6 +23,8 @@
import java.io.ObjectOutputStream;
import java.io.Serializable;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
/**
*
* @author zhuoyu.sjw
@@ -93,7 +93,7 @@ public Object getObject() {
* @throws ClassNotFoundException the class not found exception
*/
public Object extract() throws IOException, ClassNotFoundException {
- if (isInBytes()) {
+ if (object == null && isInBytes()) {
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
if (serialization != SERIALIZED_BY_JAVA) {
throw new IOException("Unsupported serialization type: " + serialization);
diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/NotifyProvideDataChange.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/NotifyProvideDataChange.java
index a72d16d12..3fa7f078c 100644
--- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/NotifyProvideDataChange.java
+++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/NotifyProvideDataChange.java
@@ -16,9 +16,11 @@
*/
package com.alipay.sofa.registry.common.model.metaserver;
-import com.alipay.sofa.registry.common.model.Node.NodeType;
-
import java.io.Serializable;
+import java.util.Set;
+
+import com.alipay.sofa.registry.common.model.Node.NodeType;
+import com.google.common.collect.Sets;
/**
*
@@ -27,13 +29,13 @@
*/
public class NotifyProvideDataChange implements Serializable {
- private String dataInfoId;
+ private String dataInfoId;
- private Long version;
+ private Long version;
- private DataOperator dataOperator;
+ private DataOperator dataOperator;
- private NodeType nodeType = NodeType.SESSION;
+ private Set nodeTypes;
/**
* constructor
@@ -42,9 +44,15 @@ public class NotifyProvideDataChange implements Serializable {
* @param dataOperator
*/
public NotifyProvideDataChange(String dataInfoId, Long version, DataOperator dataOperator) {
+ this(dataInfoId, version, dataOperator, Sets.newHashSet(NodeType.SESSION));
+ }
+
+ public NotifyProvideDataChange(String dataInfoId, Long version, DataOperator dataOperator,
+ Set nodeTypes) {
this.dataInfoId = dataInfoId;
this.version = version;
this.dataOperator = dataOperator;
+ this.nodeTypes = nodeTypes;
}
/**
@@ -106,17 +114,17 @@ public void setDataOperator(DataOperator dataOperator) {
*
* @return property value of nodeType
*/
- public NodeType getNodeType() {
- return nodeType;
+ public Set getNodeTypes() {
+ return nodeTypes;
}
/**
* Setter method for property nodeType.
*
- * @param nodeType value to be assigned to property nodeType
+ * @param nodeTypes value to be assigned to property nodeType
*/
- public void setNodeType(NodeType nodeType) {
- this.nodeType = nodeType;
+ public void setNodeTypes(Set nodeTypes) {
+ this.nodeTypes = nodeTypes;
}
@Override
@@ -125,7 +133,7 @@ public String toString() {
sb.append("dataInfoId='").append(dataInfoId).append('\'');
sb.append(", version=").append(version);
sb.append(", dataOperator=").append(dataOperator);
- sb.append(", nodeType=").append(nodeType);
+ sb.append(", nodeTypes=").append(nodeTypes);
sb.append('}');
return sb.toString();
}
diff --git a/server/common/pom.xml b/server/common/pom.xml
index 49f73e00a..1e9880e2f 100644
--- a/server/common/pom.xml
+++ b/server/common/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-server-parent
- 5.4.0-SNAPSHOT
+ 5.4.0
../pom.xml
4.0.0
diff --git a/server/common/util/pom.xml b/server/common/util/pom.xml
index 241cac4b2..bc5dca7c4 100644
--- a/server/common/util/pom.xml
+++ b/server/common/util/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-common
- 5.4.0-SNAPSHOT
+ 5.4.0
../pom.xml
4.0.0
diff --git a/server/consistency/pom.xml b/server/consistency/pom.xml
index 9a6f5d060..bf2695035 100644
--- a/server/consistency/pom.xml
+++ b/server/consistency/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-server-parent
- 5.4.0-SNAPSHOT
+ 5.4.0
../pom.xml
4.0.0
diff --git a/server/distribution/data/pom.xml b/server/distribution/data/pom.xml
index c08fc60c0..f37a13820 100644
--- a/server/distribution/data/pom.xml
+++ b/server/distribution/data/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-distribution
- 5.4.0-SNAPSHOT
+ 5.4.0
../pom.xml
4.0.0
diff --git a/server/distribution/integration/pom.xml b/server/distribution/integration/pom.xml
index 8c2726175..7d2cd007e 100644
--- a/server/distribution/integration/pom.xml
+++ b/server/distribution/integration/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-distribution
- 5.4.0-SNAPSHOT
+ 5.4.0
../pom.xml
4.0.0
diff --git a/server/distribution/meta/pom.xml b/server/distribution/meta/pom.xml
index 55bc020c7..4f0457f1b 100644
--- a/server/distribution/meta/pom.xml
+++ b/server/distribution/meta/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-distribution
- 5.4.0-SNAPSHOT
+ 5.4.0
../pom.xml
4.0.0
diff --git a/server/distribution/pom.xml b/server/distribution/pom.xml
index 0b3ebafb5..ffcc4df42 100644
--- a/server/distribution/pom.xml
+++ b/server/distribution/pom.xml
@@ -6,7 +6,7 @@
com.alipay.sofa
registry-server-parent
- 5.4.0-SNAPSHOT
+ 5.4.0
../pom.xml
4.0.0
diff --git a/server/distribution/session/pom.xml b/server/distribution/session/pom.xml
index 84d017a4f..5f3c463b7 100644
--- a/server/distribution/session/pom.xml
+++ b/server/distribution/session/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-distribution
- 5.4.0-SNAPSHOT
+ 5.4.0
../pom.xml
4.0.0
diff --git a/server/pom.xml b/server/pom.xml
index b59a5ee33..d2fe09dab 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -7,7 +7,7 @@
com.alipay.sofa
registry-parent
- 5.4.0-SNAPSHOT
+ 5.4.0
../pom.xml
diff --git a/server/remoting/api/pom.xml b/server/remoting/api/pom.xml
index 19cee7447..4b12641af 100644
--- a/server/remoting/api/pom.xml
+++ b/server/remoting/api/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-remoting
- 5.4.0-SNAPSHOT
+ 5.4.0
../pom.xml
4.0.0
diff --git a/server/remoting/api/src/main/java/com/alipay/sofa/registry/remoting/Server.java b/server/remoting/api/src/main/java/com/alipay/sofa/registry/remoting/Server.java
index 586e97d94..5b3de39d7 100644
--- a/server/remoting/api/src/main/java/com/alipay/sofa/registry/remoting/Server.java
+++ b/server/remoting/api/src/main/java/com/alipay/sofa/registry/remoting/Server.java
@@ -16,11 +16,11 @@
*/
package com.alipay.sofa.registry.remoting;
-import com.alipay.sofa.registry.common.model.store.URL;
-
import java.net.InetSocketAddress;
import java.util.Collection;
+import com.alipay.sofa.registry.common.model.store.URL;
+
/**
*
* @author shangyu.wh
diff --git a/server/remoting/api/src/main/java/com/alipay/sofa/registry/remoting/exchange/RequestException.java b/server/remoting/api/src/main/java/com/alipay/sofa/registry/remoting/exchange/RequestException.java
index 6f00d8edb..1d5fa3e06 100644
--- a/server/remoting/api/src/main/java/com/alipay/sofa/registry/remoting/exchange/RequestException.java
+++ b/server/remoting/api/src/main/java/com/alipay/sofa/registry/remoting/exchange/RequestException.java
@@ -25,7 +25,8 @@
*/
public class RequestException extends Exception {
- private Request request;
+ private static final int MAX_BODY_SIZE = 512;
+ private Request request;
/**
* constructor
@@ -81,8 +82,13 @@ public RequestException(Throwable cause) {
public String getMessage() {
StringBuilder sb = new StringBuilder();
if (request != null) {
+ String requestBody = request.getRequestBody() != null ? request.getRequestBody()
+ .toString() : "null";
+ if (requestBody.length() > MAX_BODY_SIZE) {
+ requestBody = requestBody.substring(0, MAX_BODY_SIZE);
+ }
sb.append("request url: ").append(request.getRequestUrl()).append(", body: ")
- .append(request.getRequestBody()).append(", ");
+ .append(requestBody).append(", ");
}
sb.append(super.getMessage());
return sb.toString();
diff --git a/server/remoting/api/src/main/java/com/alipay/sofa/registry/remoting/exchange/message/Request.java b/server/remoting/api/src/main/java/com/alipay/sofa/registry/remoting/exchange/message/Request.java
index 7bf15fff0..473ed2fc4 100644
--- a/server/remoting/api/src/main/java/com/alipay/sofa/registry/remoting/exchange/message/Request.java
+++ b/server/remoting/api/src/main/java/com/alipay/sofa/registry/remoting/exchange/message/Request.java
@@ -16,11 +16,11 @@
*/
package com.alipay.sofa.registry.remoting.exchange.message;
+import java.util.concurrent.atomic.AtomicInteger;
+
import com.alipay.sofa.registry.common.model.store.URL;
import com.alipay.sofa.registry.remoting.CallbackHandler;
-import java.util.concurrent.atomic.AtomicInteger;
-
/**
* The interface Request.
*
@@ -60,4 +60,8 @@ default CallbackHandler getCallBackHandler() {
default AtomicInteger getRetryTimes() {
return new AtomicInteger();
}
+
+ default Integer getTimeout() {
+ return null;
+ }
}
\ No newline at end of file
diff --git a/server/remoting/bolt/pom.xml b/server/remoting/bolt/pom.xml
index b747b68a9..6da3b7f87 100644
--- a/server/remoting/bolt/pom.xml
+++ b/server/remoting/bolt/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-remoting
- 5.4.0-SNAPSHOT
+ 5.4.0
../pom.xml
4.0.0
diff --git a/server/remoting/bolt/src/main/java/com/alipay/sofa/registry/remoting/bolt/BoltClient.java b/server/remoting/bolt/src/main/java/com/alipay/sofa/registry/remoting/bolt/BoltClient.java
index cb206d35c..b7dc0b04d 100644
--- a/server/remoting/bolt/src/main/java/com/alipay/sofa/registry/remoting/bolt/BoltClient.java
+++ b/server/remoting/bolt/src/main/java/com/alipay/sofa/registry/remoting/bolt/BoltClient.java
@@ -120,7 +120,6 @@ public Channel connect(URL url) {
throw new IllegalArgumentException("Create connection url can not be null!");
}
try {
- getBoltConnection(rpcClient, url);
Connection connection = getBoltConnection(rpcClient, url);
BoltChannel channel = new BoltChannel();
channel.setConnection(connection);
diff --git a/server/remoting/http/pom.xml b/server/remoting/http/pom.xml
index 76c8a0ad4..343ef66cb 100644
--- a/server/remoting/http/pom.xml
+++ b/server/remoting/http/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-remoting
- 5.4.0-SNAPSHOT
+ 5.4.0
../pom.xml
4.0.0
diff --git a/server/remoting/http/src/main/java/com/alipay/sofa/registry/remoting/jersey/JerseyJettyServer.java b/server/remoting/http/src/main/java/com/alipay/sofa/registry/remoting/jersey/JerseyJettyServer.java
index 672422037..0e08653fa 100644
--- a/server/remoting/http/src/main/java/com/alipay/sofa/registry/remoting/jersey/JerseyJettyServer.java
+++ b/server/remoting/http/src/main/java/com/alipay/sofa/registry/remoting/jersey/JerseyJettyServer.java
@@ -16,14 +16,14 @@
*/
package com.alipay.sofa.registry.remoting.jersey;
-import com.alipay.sofa.registry.common.model.store.URL;
-import com.alipay.sofa.registry.log.Logger;
-import com.alipay.sofa.registry.log.LoggerFactory;
-import com.alipay.sofa.registry.remoting.CallbackHandler;
-import com.alipay.sofa.registry.remoting.Channel;
-import com.alipay.sofa.registry.remoting.ChannelHandler;
-import com.alipay.sofa.registry.remoting.Server;
-import com.alipay.sofa.registry.remoting.jersey.jetty.server.HttpConnectionCustomFactory;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Collection;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.ws.rs.ProcessingException;
+
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
@@ -35,13 +35,13 @@
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.server.spi.Container;
-import javax.ws.rs.ProcessingException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
+import com.alipay.sofa.registry.common.model.store.URL;
+import com.alipay.sofa.registry.log.Logger;
+import com.alipay.sofa.registry.log.LoggerFactory;
+import com.alipay.sofa.registry.remoting.CallbackHandler;
+import com.alipay.sofa.registry.remoting.Channel;
+import com.alipay.sofa.registry.remoting.Server;
+import com.alipay.sofa.registry.remoting.jersey.jetty.server.HttpConnectionCustomFactory;
/**
*
diff --git a/server/remoting/pom.xml b/server/remoting/pom.xml
index a75591b4f..03ac1717b 100644
--- a/server/remoting/pom.xml
+++ b/server/remoting/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-server-parent
- 5.4.0-SNAPSHOT
+ 5.4.0
../pom.xml
4.0.0
diff --git a/server/server/data/pom.xml b/server/server/data/pom.xml
index cfbdee9f6..fc1d25c50 100644
--- a/server/server/data/pom.xml
+++ b/server/server/data/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-server
- 5.4.0-SNAPSHOT
+ 5.4.0
../pom.xml
4.0.0
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/bootstrap/DataServerBeanConfiguration.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/bootstrap/DataServerBeanConfiguration.java
index 7b8a2d590..d0aa2506f 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/bootstrap/DataServerBeanConfiguration.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/bootstrap/DataServerBeanConfiguration.java
@@ -25,7 +25,6 @@
import org.glassfish.jersey.jackson.JacksonFeature;
import org.glassfish.jersey.server.ResourceConfig;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
@@ -38,6 +37,7 @@
import com.alipay.sofa.registry.server.data.cache.CacheDigestTask;
import com.alipay.sofa.registry.server.data.cache.DataServerCache;
import com.alipay.sofa.registry.server.data.cache.DatumCache;
+import com.alipay.sofa.registry.server.data.cache.LocalDatumStorage;
import com.alipay.sofa.registry.server.data.change.DataChangeHandler;
import com.alipay.sofa.registry.server.data.change.event.DataChangeEventCenter;
import com.alipay.sofa.registry.server.data.change.notify.BackUpNotifier;
@@ -80,6 +80,9 @@
import com.alipay.sofa.registry.server.data.remoting.metaserver.handler.NotifyProvideDataChangeHandler;
import com.alipay.sofa.registry.server.data.remoting.metaserver.handler.ServerChangeHandler;
import com.alipay.sofa.registry.server.data.remoting.metaserver.handler.StatusConfirmHandler;
+import com.alipay.sofa.registry.server.data.remoting.metaserver.provideData.ProvideDataProcessor;
+import com.alipay.sofa.registry.server.data.remoting.metaserver.provideData.ProvideDataProcessorManager;
+import com.alipay.sofa.registry.server.data.remoting.metaserver.provideData.processor.DatumExpireProvideDataProcessor;
import com.alipay.sofa.registry.server.data.remoting.metaserver.task.ConnectionRefreshMetaTask;
import com.alipay.sofa.registry.server.data.remoting.sessionserver.SessionServerConnectionFactory;
import com.alipay.sofa.registry.server.data.remoting.sessionserver.disconnect.DisconnectEventHandler;
@@ -137,14 +140,26 @@ public DataNodeStatus dataNodeStatus() {
return new DataNodeStatus();
}
+ @Bean(name = "PropertySplitter")
+ public PropertySplitter propertySplitter() {
+ return new PropertySplitter();
+ }
+
+ }
+
+ @Configuration
+ public static class DataServerStorageConfiguration {
+
@Bean
+ @ConditionalOnMissingBean
public DatumCache datumCache() {
return new DatumCache();
}
- @Bean(name = "PropertySplitter")
- public PropertySplitter propertySplitter() {
- return new PropertySplitter();
+ @Bean
+ @ConditionalOnMissingBean
+ public LocalDatumStorage localDatumStorage() {
+ return new LocalDatumStorage();
}
}
@@ -161,6 +176,7 @@ public CacheDigestTask cacheDigestTask() {
@Configuration
public static class SessionRemotingConfiguration {
+
@Bean
public Exchange jerseyExchange() {
return new JerseyExchange();
@@ -317,7 +333,8 @@ public AbstractServerHandler syncDataHandler() {
}
@Bean
- public AbstractClientHandler notifyDataSyncHandler() {
+ @ConditionalOnMissingBean
+ public NotifyDataSyncHandler notifyDataSyncHandler() {
return new NotifyDataSyncHandler();
}
@@ -340,6 +357,32 @@ public AbstractClientHandler statusConfirmHandler() {
public NotifyProvideDataChangeHandler notifyProvideDataChangeHandler() {
return new NotifyProvideDataChangeHandler();
}
+
+ @Bean(name = "afterWorkProcessors")
+ public List afterWorkingProcessors() {
+ List list = new ArrayList<>();
+ list.add(renewDatumHandler());
+ list.add(datumLeaseManager());
+ list.add(disconnectEventHandler());
+ list.add(notifyDataSyncHandler());
+ return list;
+ }
+
+ @Bean
+ public AfterWorkingProcessHandler afterWorkingProcessHandler() {
+ return new AfterWorkingProcessHandler();
+ }
+
+ @Bean
+ public DatumLeaseManager datumLeaseManager() {
+ return new DatumLeaseManager();
+ }
+
+ @Bean
+ public DisconnectEventHandler disconnectEventHandler() {
+ return new DisconnectEventHandler();
+ }
+
}
@Configuration
@@ -377,6 +420,7 @@ public List dataChangeNotifiers() {
list.add(backUpNotifier());
return list;
}
+
}
@Configuration
@@ -401,6 +445,7 @@ public Scheduler syncDataScheduler() {
public StoreServiceFactory storeServiceFactory() {
return new StoreServiceFactory();
}
+
}
@Configuration
@@ -431,21 +476,11 @@ public LocalDataServerCleanHandler localDataServerCleanHandler() {
return new LocalDataServerCleanHandler();
}
- @Bean
- public DatumLeaseManager datumLeaseManager() {
- return new DatumLeaseManager();
- }
-
@Bean
public GetSyncDataHandler getSyncDataHandler() {
return new GetSyncDataHandler();
}
- @Bean
- public DisconnectEventHandler disconnectEventHandler() {
- return new DisconnectEventHandler();
- }
-
@Bean
public EventCenter eventCenter() {
return new EventCenter();
@@ -455,6 +490,7 @@ public EventCenter eventCenter() {
public DataChangeEventCenter dataChangeEventCenter() {
return new DataChangeEventCenter();
}
+
}
@Configuration
@@ -488,6 +524,7 @@ public List tasks() {
public IMetaServerService metaServerService() {
return new DefaultMetaServiceImpl();
}
+
}
@Configuration
@@ -506,51 +543,18 @@ public HealthResource healthResource() {
}
@Bean
+ @ConditionalOnMissingBean
public DataDigestResource dataDigestResource() {
return new DataDigestResource();
}
- }
-
- @Configuration
- public static class AfterWorkingProcessConfiguration {
-
- @Autowired
- DisconnectEventHandler disconnectEventHandler;
-
- @Autowired
- AbstractClientHandler notifyDataSyncHandler;
-
- @Autowired
- RenewDatumHandler renewDatumHandler;
-
- @Autowired
- DatumLeaseManager datumLeaseManager;
-
- @Bean(name = "afterWorkProcessors")
- public List afterWorkingProcessors() {
- List list = new ArrayList<>();
- list.add(renewDatumHandler);
- list.add(datumLeaseManager);
- list.add(disconnectEventHandler);
- list.add((NotifyDataSyncHandler) notifyDataSyncHandler);
- return list;
- }
-
- @Bean
- public AfterWorkingProcessHandler afterWorkingProcessHandler() {
- return new AfterWorkingProcessHandler();
- }
}
@Configuration
public static class ExecutorConfiguration {
- @Autowired
- DataServerConfig dataServerConfig;
-
@Bean(name = "publishProcessorExecutor")
- public ThreadPoolExecutor publishProcessorExecutor() {
+ public ThreadPoolExecutor publishProcessorExecutor(DataServerConfig dataServerConfig) {
return new ThreadPoolExecutorDataServer("PublishProcessorExecutor",
dataServerConfig.getPublishExecutorMinPoolSize(),
dataServerConfig.getPublishExecutorMaxPoolSize(), 300, TimeUnit.SECONDS,
@@ -559,7 +563,7 @@ public ThreadPoolExecutor publishProcessorExecutor() {
}
@Bean(name = "renewDatumProcessorExecutor")
- public ThreadPoolExecutor renewDatumProcessorExecutor() {
+ public ThreadPoolExecutor renewDatumProcessorExecutor(DataServerConfig dataServerConfig) {
return new ThreadPoolExecutorDataServer("RenewDatumProcessorExecutor",
dataServerConfig.getRenewDatumExecutorMinPoolSize(),
dataServerConfig.getRenewDatumExecutorMaxPoolSize(), 300, TimeUnit.SECONDS,
@@ -568,7 +572,7 @@ public ThreadPoolExecutor renewDatumProcessorExecutor() {
}
@Bean(name = "getDataProcessorExecutor")
- public ThreadPoolExecutor getDataProcessorExecutor() {
+ public ThreadPoolExecutor getDataProcessorExecutor(DataServerConfig dataServerConfig) {
return new ThreadPoolExecutorDataServer("GetDataProcessorExecutor",
dataServerConfig.getGetDataExecutorMinPoolSize(),
dataServerConfig.getGetDataExecutorMaxPoolSize(),
@@ -578,4 +582,23 @@ public ThreadPoolExecutor getDataProcessorExecutor() {
}
}
+
+ @Configuration
+ public static class DataProvideDataConfiguration {
+
+ @Bean
+ public ProvideDataProcessor provideDataProcessorManager() {
+ return new ProvideDataProcessorManager();
+ }
+
+ @Bean
+ public ProvideDataProcessor datumExpireProvideDataProcessor(ProvideDataProcessor provideDataProcessorManager) {
+ ProvideDataProcessor datumExpireProvideDataProcessor = new DatumExpireProvideDataProcessor();
+ ((ProvideDataProcessorManager) provideDataProcessorManager)
+ .addProvideDataProcessor(datumExpireProvideDataProcessor);
+ return datumExpireProvideDataProcessor;
+ }
+
+ }
+
}
\ No newline at end of file
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/bootstrap/DataServerConfig.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/bootstrap/DataServerConfig.java
index 375656dc2..f81deebf5 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/bootstrap/DataServerConfig.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/bootstrap/DataServerConfig.java
@@ -16,17 +16,18 @@
*/
package com.alipay.sofa.registry.server.data.bootstrap;
-import com.alipay.sofa.registry.net.NetUtil;
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+import com.alipay.sofa.registry.net.NetUtil;
+
/**
*
*
@@ -118,6 +119,10 @@ public class DataServerConfig {
private int renewEnableDelaySec = 30;
+ private int dataSyncDelayTimeout = 1000;
+
+ private int dataSyncNotifyRetry = 3;
+
/**
* constructor
* @param commonConfig
@@ -126,6 +131,10 @@ public DataServerConfig(CommonConfig commonConfig) {
this.commonConfig = commonConfig;
}
+ public boolean isLocalDataCenter(String dataCenter) {
+ return commonConfig.getLocalDataCenter().equals(dataCenter);
+ }
+
/**
* Getter method for property renewEnableDelaySec.
*
@@ -803,8 +812,45 @@ public void setSessionServerNotifierRetryExecutorQueueSize(int sessionServerNoti
this.sessionServerNotifierRetryExecutorQueueSize = sessionServerNotifierRetryExecutorQueueSize;
}
+ /**
+ * Getter method for property dataSyncDelayTimeout.
+ *
+ * @return property value of dataSyncDelayTimeout
+ */
+ public int getDataSyncDelayTimeout() {
+ return dataSyncDelayTimeout;
+ }
+
+ /**
+ * Setter method for property dataSyncDelayTimeout .
+ *
+ * @param dataSyncDelayTimeout value to be assigned to property dataSyncDelayTimeout
+ */
+ public void setDataSyncDelayTimeout(int dataSyncDelayTimeout) {
+ this.dataSyncDelayTimeout = dataSyncDelayTimeout;
+ }
+
+ /**
+ * Getter method for property dataSyncNotifyRetry.
+ *
+ * @return property value of dataSyncNotifyRetry
+ */
+ public int getDataSyncNotifyRetry() {
+ return dataSyncNotifyRetry;
+ }
+
+ /**
+ * Setter method for property dataSyncNotifyRetry .
+ *
+ * @param dataSyncNotifyRetry value to be assigned to property dataSyncNotifyRetry
+ */
+ public void setDataSyncNotifyRetry(int dataSyncNotifyRetry) {
+ this.dataSyncNotifyRetry = dataSyncNotifyRetry;
+ }
+
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
}
+
}
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/cache/DataServerCache.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/cache/DataServerCache.java
index df481af15..64d3d9ee0 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/cache/DataServerCache.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/cache/DataServerCache.java
@@ -16,7 +16,20 @@
*/
package com.alipay.sofa.registry.server.data.cache;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.springframework.beans.factory.annotation.Autowired;
+
import com.alipay.sofa.registry.common.model.metaserver.DataNode;
+import com.alipay.sofa.registry.common.model.store.URL;
import com.alipay.sofa.registry.consistency.hash.ConsistentHash;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
@@ -25,18 +38,8 @@
import com.alipay.sofa.registry.server.data.event.handler.AfterWorkingProcessHandler;
import com.alipay.sofa.registry.server.data.node.DataNodeStatus;
import com.alipay.sofa.registry.server.data.util.LocalServerStatusEnum;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
/**
* cache of dataservers
@@ -371,24 +374,35 @@ public Long getDataCenterNewVersion(String dataCenter) {
/**
* calculate ConsistentHash base current data server list
- * @param dataCenter
- * @return
+ * do not return null, otherwise will lead to unexpected consequences
+ *
+ * 20200211 update: bugfix: empty dataServerList cause NPE because calculateOldConsistentHash return null
*/
public ConsistentHash calculateOldConsistentHash(String dataCenter) {
Map> dataServerMap = dataServerChangeItem.getServerMap();
Map dataNodeMap = dataServerMap.get(dataCenter);
+ Collection dataServerNodes;
if (dataNodeMap != null && !dataNodeMap.isEmpty()) {
-
- Collection dataServerNodes = dataNodeMap.values();
-
- ConsistentHash consistentHash = new ConsistentHash<>(
- dataServerConfig.getNumberOfReplicas(), dataServerNodes);
-
- return consistentHash;
+ dataServerNodes = dataNodeMap.values();
} else {
- LOGGER.warn("Calculate Old BackupTriad,old dataServer list is empty!");
- return null;
+ dataServerNodes = Lists.newArrayList(new DataNode(new URL(DataServerConfig.IP),
+ dataCenter));
+ LOGGER
+ .error("[calculateOldConsistentHash] Old dataServer list is empty, add on the local IP");
}
+ ConsistentHash consistentHash = new ConsistentHash<>(
+ dataServerConfig.getNumberOfReplicas(), dataServerNodes);
+
+ return consistentHash;
+ }
+
+ /**
+ * get all datacenters
+ *
+ * @return
+ */
+ public Set getAllDataCenters() {
+ return newDataServerChangeItem.getVersionMap().keySet();
}
}
\ No newline at end of file
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/cache/DatumCache.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/cache/DatumCache.java
index 493811b7e..53b577ada 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/cache/DatumCache.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/cache/DatumCache.java
@@ -17,61 +17,27 @@
package com.alipay.sofa.registry.server.data.cache;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.store.Publisher;
-import com.alipay.sofa.registry.common.model.store.WordCache;
-import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.change.DataChangeTypeEnum;
-import com.alipay.sofa.registry.server.data.node.DataServerNode;
-import com.alipay.sofa.registry.server.data.remoting.dataserver.DataServerNodeFactory;
-
-import org.apache.commons.lang.StringUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import com.alipay.sofa.registry.common.model.dataserver.Datum;
-import com.alipay.sofa.registry.common.model.store.Publisher;
-import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
-import com.alipay.sofa.registry.server.data.change.DataChangeTypeEnum;
-import com.alipay.sofa.registry.server.data.node.DataServerNode;
-import com.alipay.sofa.registry.server.data.remoting.dataserver.DataServerNodeFactory;
/**
* cache of datum, providing query function to the upper module
*
+ * @author kezhu.wukz
* @author qian.lqlq
* @version $Id: DatumCache.java, v 0.1 2017-12-06 20:50 qian.lqlq Exp $
*/
public class DatumCache {
- public static final long ERROR_DATUM_VERSION = -2L;
-
- /**
- * row: dataCenter
- * column: dataInfoId
- * value: datum
- */
- private final Map> DATUM_MAP = new ConcurrentHashMap<>();
-
- /**
- * all datum index
- *
- * row: ip:port
- * column: registerId
- * value: publisher
- */
- private final Map> ALL_CONNECT_ID_INDEX = new ConcurrentHashMap<>();
-
@Autowired
- private DataServerConfig dataServerConfig;
+ private DatumStorage localDatumStorage;
/**
* get datum by specific dataCenter and dataInfoId
@@ -81,27 +47,37 @@ public class DatumCache {
* @return
*/
public Datum get(String dataCenter, String dataInfoId) {
- Map map = DATUM_MAP.get(dataCenter);
- if (map != null) {
- return map.get(dataInfoId);
- }
- return null;
+ return localDatumStorage.get(dataCenter, dataInfoId);
}
/**
- * get datum of all datercenters by dataInfoId
+ * get datum of all data centers by dataInfoId
*
* @param dataInfoId
* @return
*/
public Map get(String dataInfoId) {
Map datumMap = new HashMap<>();
- DATUM_MAP.forEach((dataCenter, datums) -> {
- Datum datum = datums.get(dataInfoId);
- if (datum != null) {
- datumMap.put(dataCenter, datum);
- }
- });
+
+ //local
+ Map localDataCenterToMap = localDatumStorage.get(dataInfoId);
+ datumMap.putAll(localDataCenterToMap);
+
+ return datumMap;
+ }
+
+ /**
+ * get datum of all data centers by dataInfoId
+ *
+ * @param dataInfoId
+ * @return
+ */
+ public Map getVersions(String dataInfoId) {
+ Map datumMap = new HashMap<>();
+
+ //local
+ Map localVersions = localDatumStorage.getVersions(dataInfoId);
+ datumMap.putAll(localVersions);
return datumMap;
}
@@ -132,7 +108,7 @@ public Map getDatumGroupByDataCenter(String dataCenter, String da
* @return
*/
public Map> getAll() {
- return DATUM_MAP;
+ return localDatumStorage.getAll();
}
/**
@@ -142,27 +118,14 @@ public Map> getAll() {
* @return
*/
public Map getByConnectId(String connectId) {
- return ALL_CONNECT_ID_INDEX.getOrDefault(connectId, null);
+ return localDatumStorage.getByConnectId(connectId);
}
/**
* get own publishers by connectId
*/
public Map getOwnByConnectId(String connectId) {
- Map ownPubMap = new HashMap<>();
- Map allPubMap = ALL_CONNECT_ID_INDEX.getOrDefault(connectId, null);
- if (allPubMap != null) {
- for (Map.Entry entry : allPubMap.entrySet()) {
- String registerId = entry.getKey();
- Publisher publisher = entry.getValue();
- DataServerNode dataServerNode = DataServerNodeFactory.computeDataServerNode(
- dataServerConfig.getLocalDataCenter(), publisher.getDataInfoId());
- if (DataServerConfig.IP.equals(dataServerNode.getIp())) {
- ownPubMap.put(registerId, publisher);
- }
- }
- }
- return ownPubMap;
+ return localDatumStorage.getOwnByConnectId(connectId);
}
/**
@@ -173,67 +136,7 @@ public Map getOwnByConnectId(String connectId) {
* @return the last version before datum changed, if datum is not exist, return null
*/
public MergeResult putDatum(DataChangeTypeEnum changeType, Datum datum) {
- MergeResult mergeResult;
- String dataCenter = datum.getDataCenter();
- String dataInfoId = datum.getDataInfoId();
- Map map = getDatumMapByDataCenter(dataCenter);
-
- //first put UnPublisher datum(dataId group instanceId is null),can not add to cache
- if (datum.getDataId() == null && map.get(dataInfoId) == null) {
- mergeResult = new MergeResult(ERROR_DATUM_VERSION, false);
- return mergeResult;
- }
-
- // filter out the unPubs of datum when first put.
- // Otherwise, "syncData" or "fetchData" when get Datum with unPubs, which will result something error
- boolean[] exists = { true };
- Datum cacheDatum = map.computeIfAbsent(dataInfoId, k -> filterUnPubs(exists, datum));
- if (!exists[0]) {
- Iterator> iterator = datum.getPubMap().entrySet().iterator();
- while (iterator.hasNext()) {
- Entry entry = iterator.next();
- Publisher publisher = entry.getValue();
- addToIndex(publisher);
- }
- mergeResult = new MergeResult(null, true);
- } else {
- if (changeType == DataChangeTypeEnum.MERGE) {
- mergeResult = mergeDatum(cacheDatum, datum);
- } else {
- Long lastVersion = coverDatum(datum);
- mergeResult = new MergeResult(lastVersion, true);
- }
- }
- return mergeResult;
- }
-
- /**
- * remove unPubs from datum
- */
- private Datum filterUnPubs(boolean[] exists, Datum datum) {
- Iterator> iterator = datum.getPubMap().entrySet().iterator();
- while (iterator.hasNext()) {
- Entry entry = iterator.next();
- Publisher publisher = entry.getValue();
- if (publisher instanceof UnPublisher) {
- //first put to cache,UnPublisher data must remove,not so got error pub data exist
- iterator.remove();
- }
- }
- exists[0] = false;
- return datum;
- }
-
- private Map getDatumMapByDataCenter(String dataCenter) {
- Map map = DATUM_MAP.get(dataCenter);
- if (map == null) {
- map = new ConcurrentHashMap<>();
- Map ret = DATUM_MAP.putIfAbsent(dataCenter, map);
- if (ret != null) {
- map = ret;
- }
- }
- return map;
+ return localDatumStorage.putDatum(changeType, datum);
}
/**
@@ -243,51 +146,7 @@ private Map getDatumMapByDataCenter(String dataCenter) {
* @return
*/
public boolean cleanDatum(String dataCenter, String dataInfoId) {
-
- Map datumMap = DATUM_MAP.get(dataCenter);
- if (datumMap != null) {
- Datum cacheDatum = datumMap.remove(dataInfoId);
- if (cacheDatum != null) {
- Map cachePubMap = cacheDatum.getPubMap();
-
- for (Entry cachePubEntry : cachePubMap.entrySet()) {
- String registerId = cachePubEntry.getKey();
- Publisher cachePub = cachePubEntry.getValue();
- //remove from cache
- if (cachePub != null) {
- cachePubMap.remove(registerId);
- removeFromIndex(cachePub);
- }
- }
- return true;
- }
- }
- return false;
- }
-
- /**
- * merge datum in cache
- *
- * @param datum
- * @return
- */
- private MergeResult mergeDatum(Datum cacheDatum, Datum datum) {
- boolean isChanged = false;
- Map cachePubMap = cacheDatum.getPubMap();
- Map pubMap = datum.getPubMap();
- for (Entry pubEntry : pubMap.entrySet()) {
- String registerId = pubEntry.getKey();
- Publisher pub = pubEntry.getValue();
- Publisher cachePub = cachePubMap.get(registerId);
- if (mergePublisher(pub, cachePubMap, cachePub)) {
- isChanged = true;
- }
- }
- Long lastVersion = cacheDatum.getVersion();
- if (isChanged) {
- cacheDatum.setVersion(datum.getVersion());
- }
- return new MergeResult(lastVersion, isChanged);
+ return localDatumStorage.cleanDatum(dataCenter, dataInfoId);
}
/**
@@ -295,131 +154,7 @@ private MergeResult mergeDatum(Datum cacheDatum, Datum datum) {
*/
public Datum putSnapshot(String dataInfoId, Map toBeDeletedPubMap,
Map snapshotPubMap) {
- // get cache datum
- Map datumMap = getDatumMapByDataCenter(dataServerConfig.getLocalDataCenter());
- Datum cacheDatum = datumMap.get(dataInfoId);
- if (cacheDatum == null) {
- cacheDatum = new Datum(dataInfoId, dataServerConfig.getLocalDataCenter());
- Publisher publisher = snapshotPubMap.values().iterator().next();
- cacheDatum.setInstanceId(publisher.getInstanceId());
- cacheDatum.setDataId(publisher.getDataId());
- cacheDatum.setGroup(publisher.getGroup());
- Datum datum = datumMap.putIfAbsent(dataInfoId, cacheDatum);
- if (datum != null) {
- cacheDatum = datum;
- }
- }
- //remove toBeDeletedPubMap from cacheDatum
- for (Entry toBeDeletedPubEntry : toBeDeletedPubMap.entrySet()) {
- String registerId = toBeDeletedPubEntry.getKey();
- Publisher toBeDeletedPub = toBeDeletedPubEntry.getValue();
- if (cacheDatum != null) {
- cacheDatum.getPubMap().remove(registerId);
- removeFromIndex(toBeDeletedPub);
- }
- }
- // add snapshotPubMap to cacheDatum
- for (Entry pubEntry : snapshotPubMap.entrySet()) {
- String registerId = pubEntry.getKey();
- Publisher snapshotPub = pubEntry.getValue();
- Publisher cachePub = cacheDatum.getPubMap().put(registerId, snapshotPub);
- if (cachePub != null) {
- removeFromIndex(cachePub);
- }
- addToIndex(snapshotPub);
- }
-
- cacheDatum.updateVersion();
-
- return cacheDatum;
- }
-
- private boolean mergePublisher(Publisher pub, Map cachePubMap,
- Publisher cachePub) {
- boolean isChanged = false;
- String registerId = pub.getRegisterId();
- if (pub instanceof UnPublisher) {
- //remove from cache
- if (cachePub != null && pub.getRegisterTimestamp() > cachePub.getRegisterTimestamp()) {
- cachePubMap.remove(registerId);
- removeFromIndex(cachePub);
- isChanged = true;
- }
- } else {
- long version = pub.getVersion();
- long cacheVersion = cachePub == null ? 0L : cachePub.getVersion();
- if (cacheVersion <= version) {
- cachePubMap.put(registerId, pub);
- // connectId and cacheConnectId may not be equal, so indexes need to be deleted and added, rather than overwritten directly.
- // why connectId and cacheConnectId may not be equal?
- // eg: sessionserver crash, client(RegistryClient but not ConfregClient) reconnect to other sessionserver, sourceAddress changed, version not changed
- removeFromIndex(cachePub);
- addToIndex(pub);
- isChanged = true;
- }
- }
- return isChanged;
- }
-
- /**
- *
- * @param datum
- * @return
- */
- private Long coverDatum(Datum datum) {
- String dataCenter = datum.getDataCenter();
- String dataInfoId = datum.getDataInfoId();
- Datum cacheDatum = DATUM_MAP.get(dataCenter).get(dataInfoId);
- if (datum.getVersion() != cacheDatum.getVersion()) {
- DATUM_MAP.get(dataCenter).put(dataInfoId, datum);
- Map pubMap = datum.getPubMap();
- Map cachePubMap = new HashMap<>(cacheDatum.getPubMap());
- for (Entry pubEntry : pubMap.entrySet()) {
- String registerId = pubEntry.getKey();
- Publisher pub = pubEntry.getValue();
- addToIndex(pub);
- Publisher cachePub = cachePubMap.get(registerId);
- if (cachePub != null && getConnectId(pub).equals(getConnectId(cachePub))) {
- cachePubMap.remove(registerId);
- }
- }
- if (!cachePubMap.isEmpty()) {
- for (Publisher cachePub : cachePubMap.values()) {
- removeFromIndex(cachePub);
- }
- }
- }
- return cacheDatum.getVersion();
- }
-
- private void removeFromIndex(Publisher publisher) {
- if (publisher == null) {
- return;
- }
- String connectId = getConnectId(publisher);
-
- // remove from ALL_CONNECT_ID_INDEX
- Map publisherMap = ALL_CONNECT_ID_INDEX.get(connectId);
- if (publisherMap != null) {
- publisherMap.remove(publisher.getRegisterId());
- }
- }
-
- private void addToIndex(Publisher publisher) {
- if (publisher == null) {
- return;
- }
- String connectId = getConnectId(publisher);
-
- // add to ALL_CONNECT_ID_INDEX
- Map publisherMap = ALL_CONNECT_ID_INDEX
- .computeIfAbsent(connectId, s -> new ConcurrentHashMap<>());
- publisherMap.put(publisher.getRegisterId(), publisher);
-
- }
-
- private String getConnectId(Publisher cachePub) {
- return WordCache.getInstance().getWordCache(cachePub.getSourceAddress().getAddressString());
+ return localDatumStorage.putSnapshot(dataInfoId, toBeDeletedPubMap, snapshotPubMap);
}
/**
@@ -428,6 +163,7 @@ private String getConnectId(Publisher cachePub) {
* @return property value of OWN_CONNECT_ID_INDEX
*/
public Set getAllConnectIds() {
- return ALL_CONNECT_ID_INDEX.keySet();
+ return localDatumStorage.getAllConnectIds();
}
+
}
\ No newline at end of file
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/cache/DatumStorage.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/cache/DatumStorage.java
new file mode 100644
index 000000000..fc71511c8
--- /dev/null
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/cache/DatumStorage.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.registry.server.data.cache;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.alipay.sofa.registry.common.model.dataserver.Datum;
+import com.alipay.sofa.registry.common.model.store.Publisher;
+import com.alipay.sofa.registry.server.data.change.DataChangeTypeEnum;
+
+/**
+ *
+ * @author kezhu.wukz
+ * @version $Id: DatumAccessService.java, v 0.1 2019-12-05 11:51 kezhu.wukz Exp $
+ */
+public interface DatumStorage {
+
+ /**
+ * get datum by specific dataCenter and dataInfoId
+ *
+ * @param dataCenter
+ * @param dataInfoId
+ * @return
+ */
+ Datum get(String dataCenter, String dataInfoId);
+
+ /**
+ * get datum of all datercenters by dataInfoId
+ *
+ * @param dataInfoId
+ * @return
+ */
+ Map get(String dataInfoId);
+
+ /**
+ * get all datum
+ *
+ * @return
+ */
+ Map> getAll();
+
+ /**
+ *
+ *
+ * @param connectId
+ * @return
+ */
+ Map getByConnectId(String connectId);
+
+ /**
+ * get own publishers by connectId
+ */
+ Map getOwnByConnectId(String connectId);
+
+ /**
+ * Getter method for property OWN_CONNECT_ID_INDEX.
+ *
+ * @return property value of OWN_CONNECT_ID_INDEX
+ */
+ Set getAllConnectIds();
+
+ /**
+ * put datum into cache
+ *
+ * @param changeType
+ * @param datum
+ * @return the last version before datum changed, if datum is not exist, return null
+ */
+ MergeResult putDatum(DataChangeTypeEnum changeType, Datum datum);
+
+ /**
+ * remove datum ant contains all pub data,and clean all the client map reference
+ * @param dataCenter
+ * @param dataInfoId
+ * @return
+ */
+ boolean cleanDatum(String dataCenter, String dataInfoId);
+
+ /**
+ * cover datum by snapshot
+ */
+ Datum putSnapshot(String dataInfoId, Map toBeDeletedPubMap,
+ Map snapshotPubMap);
+
+ Map getVersions(String dataInfoId);
+
+}
\ No newline at end of file
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/cache/LocalDatumStorage.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/cache/LocalDatumStorage.java
new file mode 100644
index 000000000..e326174b5
--- /dev/null
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/cache/LocalDatumStorage.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.registry.server.data.cache;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.springframework.beans.factory.annotation.Autowired;
+
+import com.alipay.sofa.registry.common.model.dataserver.Datum;
+import com.alipay.sofa.registry.common.model.store.Publisher;
+import com.alipay.sofa.registry.common.model.store.WordCache;
+import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
+import com.alipay.sofa.registry.server.data.change.DataChangeTypeEnum;
+import com.alipay.sofa.registry.server.data.node.DataServerNode;
+import com.alipay.sofa.registry.server.data.remoting.dataserver.DataServerNodeFactory;
+
+/**
+ * datum storage of local dataCenter
+ *
+ * @author kezhu.wukz
+ * @version $Id: LocalDatumAccessService.java, v 0.1 2019-12-06 15:22 kezhu.wukz Exp $
+ */
+public class LocalDatumStorage implements DatumStorage {
+
+ public static final long ERROR_DATUM_VERSION = -2L;
+
+ /**
+ * row: dataCenter
+ * column: dataInfoId
+ * value: datum
+ */
+ protected final Map> DATUM_MAP = new ConcurrentHashMap<>();
+
+ /**
+ * all datum index
+ *
+ * row: ip:port
+ * column: registerId
+ * value: publisher
+ */
+ protected final Map> ALL_CONNECT_ID_INDEX = new ConcurrentHashMap<>();
+
+ @Autowired
+ private DataServerConfig dataServerConfig;
+
+ /**
+ * get datum by specific dataCenter and dataInfoId
+ *
+ * @param dataCenter
+ * @param dataInfoId
+ * @return
+ */
+ public Datum get(String dataCenter, String dataInfoId) {
+ Map map = DATUM_MAP.get(dataCenter);
+ if (map != null) {
+ return map.get(dataInfoId);
+ }
+ return null;
+ }
+
+ /**
+ * get datum of all datercenters by dataInfoId
+ *
+ * @param dataInfoId
+ * @return
+ */
+ public Map get(String dataInfoId) {
+ Map datumMap = new HashMap<>();
+ DATUM_MAP.forEach((dataCenter, datums) -> {
+ Datum datum = datums.get(dataInfoId);
+ if (datum != null) {
+ datumMap.put(dataCenter, datum);
+ }
+ });
+
+ return datumMap;
+ }
+
+ /**
+ * get all datum
+ *
+ * @return
+ */
+ public Map> getAll() {
+ return DATUM_MAP;
+ }
+
+ /**
+ *
+ *
+ * @param connectId
+ * @return
+ */
+ public Map getByConnectId(String connectId) {
+ return ALL_CONNECT_ID_INDEX.getOrDefault(connectId, null);
+ }
+
+ /**
+ * get own publishers by connectId
+ */
+ public Map getOwnByConnectId(String connectId) {
+ Map ownPubMap = new HashMap<>();
+ Map allPubMap = ALL_CONNECT_ID_INDEX.getOrDefault(connectId, null);
+ if (allPubMap != null) {
+ for (Entry entry : allPubMap.entrySet()) {
+ String registerId = entry.getKey();
+ Publisher publisher = entry.getValue();
+ if (isOwnByMyself(publisher.getDataInfoId())) {
+ ownPubMap.put(registerId, publisher);
+ }
+ }
+ }
+ return ownPubMap;
+ }
+
+ /**
+ * whether dataInfoId own by self
+ */
+ protected boolean isOwnByMyself(String dataInfoId) {
+ DataServerNode dataServerNode = DataServerNodeFactory.computeDataServerNode(
+ dataServerConfig.getLocalDataCenter(), dataInfoId);
+ return DataServerConfig.IP.equals(dataServerNode.getIp());
+ }
+
+ /**
+ * put datum into cache
+ *
+ * @param changeType
+ * @param datum
+ * @return the last version before datum changed, if datum is not exist, return null
+ */
+ public MergeResult putDatum(DataChangeTypeEnum changeType, Datum datum) {
+ MergeResult mergeResult;
+ String dataCenter = datum.getDataCenter();
+ String dataInfoId = datum.getDataInfoId();
+ Map map = getDatumMapByDataCenter(dataCenter);
+
+ //first put UnPublisher datum(dataId group instanceId is null),can not add to cache
+ if (datum.getDataId() == null && map.get(dataInfoId) == null) {
+ mergeResult = new MergeResult(ERROR_DATUM_VERSION, false);
+ return mergeResult;
+ }
+
+ // filter out the unPubs of datum when first put.
+ // Otherwise, "syncData" or "fetchData" when get Datum with unPubs, which will result something error
+ boolean[] exists = { true };
+ Datum cacheDatum = map.computeIfAbsent(dataInfoId, k -> filterUnPubs(exists, datum));
+ if (!exists[0]) {
+ Iterator> iterator = datum.getPubMap().entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry entry = iterator.next();
+ Publisher publisher = entry.getValue();
+ addToIndex(publisher);
+ }
+ mergeResult = new MergeResult(null, true);
+ } else {
+ if (changeType == DataChangeTypeEnum.MERGE) {
+ mergeResult = mergeDatum(cacheDatum, datum);
+ } else {
+ Long lastVersion = coverDatum(datum);
+ mergeResult = new MergeResult(lastVersion, true);
+ }
+ }
+ return mergeResult;
+ }
+
+ /**
+ * remove unPubs from datum
+ */
+ private Datum filterUnPubs(boolean[] exists, Datum datum) {
+ Iterator> iterator = datum.getPubMap().entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry entry = iterator.next();
+ Publisher publisher = entry.getValue();
+ if (publisher instanceof UnPublisher) {
+ //first put to cache,UnPublisher data must remove,not so got error pub data exist
+ iterator.remove();
+ }
+ }
+ exists[0] = false;
+ return datum;
+ }
+
+ private Map getDatumMapByDataCenter(String dataCenter) {
+ Map map = DATUM_MAP.get(dataCenter);
+ if (map == null) {
+ map = new ConcurrentHashMap<>();
+ Map ret = DATUM_MAP.putIfAbsent(dataCenter, map);
+ if (ret != null) {
+ map = ret;
+ }
+ }
+ return map;
+ }
+
+ /**
+ * remove datum ant contains all pub data,and clean all the client map reference
+ * @param dataCenter
+ * @param dataInfoId
+ * @return
+ */
+ public boolean cleanDatum(String dataCenter, String dataInfoId) {
+
+ Map datumMap = DATUM_MAP.get(dataCenter);
+ if (datumMap != null) {
+ Datum cacheDatum = datumMap.remove(dataInfoId);
+ if (cacheDatum != null) {
+ Map cachePubMap = cacheDatum.getPubMap();
+
+ for (Entry cachePubEntry : cachePubMap.entrySet()) {
+ String registerId = cachePubEntry.getKey();
+ Publisher cachePub = cachePubEntry.getValue();
+ //remove from cache
+ if (cachePub != null) {
+ cachePubMap.remove(registerId);
+ removeFromIndex(cachePub);
+ }
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * merge datum in cache
+ *
+ * @param datum
+ * @return
+ */
+ private MergeResult mergeDatum(Datum cacheDatum, Datum datum) {
+ boolean isChanged = false;
+ Map cachePubMap = cacheDatum.getPubMap();
+ Map pubMap = datum.getPubMap();
+ for (Entry pubEntry : pubMap.entrySet()) {
+ String registerId = pubEntry.getKey();
+ Publisher pub = pubEntry.getValue();
+ Publisher cachePub = cachePubMap.get(registerId);
+ if (mergePublisher(pub, cachePubMap, cachePub)) {
+ isChanged = true;
+ }
+ }
+ Long lastVersion = cacheDatum.getVersion();
+ if (isChanged) {
+ cacheDatum.setVersion(datum.getVersion());
+ }
+ return new MergeResult(lastVersion, isChanged);
+ }
+
+ /**
+ * cover datum by snapshot
+ */
+ public Datum putSnapshot(String dataInfoId, Map toBeDeletedPubMap,
+ Map snapshotPubMap) {
+ // get cache datum
+ Map datumMap = getDatumMapByDataCenter(dataServerConfig.getLocalDataCenter());
+ Datum cacheDatum = datumMap.get(dataInfoId);
+ if (cacheDatum == null) {
+ cacheDatum = new Datum(dataInfoId, dataServerConfig.getLocalDataCenter());
+ Publisher publisher = snapshotPubMap.values().iterator().next();
+ cacheDatum.setInstanceId(publisher.getInstanceId());
+ cacheDatum.setDataId(publisher.getDataId());
+ cacheDatum.setGroup(publisher.getGroup());
+ Datum datum = datumMap.putIfAbsent(dataInfoId, cacheDatum);
+ if (datum != null) {
+ cacheDatum = datum;
+ }
+ }
+ //remove toBeDeletedPubMap from cacheDatum
+ for (Entry toBeDeletedPubEntry : toBeDeletedPubMap.entrySet()) {
+ String registerId = toBeDeletedPubEntry.getKey();
+ Publisher toBeDeletedPub = toBeDeletedPubEntry.getValue();
+ if (cacheDatum != null) {
+ cacheDatum.getPubMap().remove(registerId);
+ removeFromIndex(toBeDeletedPub);
+ }
+ }
+ // add snapshotPubMap to cacheDatum
+ for (Entry pubEntry : snapshotPubMap.entrySet()) {
+ String registerId = pubEntry.getKey();
+ Publisher snapshotPub = pubEntry.getValue();
+ Publisher cachePub = cacheDatum.getPubMap().put(registerId, snapshotPub);
+ if (cachePub != null) {
+ removeFromIndex(cachePub);
+ }
+ addToIndex(snapshotPub);
+ }
+
+ cacheDatum.updateVersion();
+
+ return cacheDatum;
+ }
+
+ @Override
+ public Map getVersions(String dataInfoId) {
+ Map versions = new HashMap<>(1);
+ Map datumMap = this.get(dataInfoId);
+ if (datumMap != null) {
+ for (Map.Entry entry : datumMap.entrySet()) {
+ String dataCenter = entry.getKey();
+ Datum datum = entry.getValue();
+ versions.put(dataCenter, datum.getVersion());
+ }
+ }
+ return versions;
+ }
+
+ private boolean mergePublisher(Publisher pub, Map cachePubMap,
+ Publisher cachePub) {
+ boolean isChanged = false;
+ String registerId = pub.getRegisterId();
+ if (pub instanceof UnPublisher) {
+ //remove from cache
+ if (cachePub != null && pub.getRegisterTimestamp() > cachePub.getRegisterTimestamp()) {
+ cachePubMap.remove(registerId);
+ removeFromIndex(cachePub);
+ isChanged = true;
+ }
+ } else {
+ long version = pub.getVersion();
+ long cacheVersion = cachePub == null ? 0L : cachePub.getVersion();
+ if (cacheVersion <= version) {
+ cachePubMap.put(registerId, pub);
+ // connectId and cacheConnectId may not be equal, so indexes need to be deleted and added, rather than overwritten directly.
+ // why connectId and cacheConnectId may not be equal?
+ // eg: sessionserver crash, client(RegistryClient but not ConfregClient) reconnect to other sessionserver, sourceAddress changed, version not changed
+ removeFromIndex(cachePub);
+ addToIndex(pub);
+ isChanged = true;
+ }
+ }
+ return isChanged;
+ }
+
+ /**
+ *
+ * @param datum
+ * @return
+ */
+ private Long coverDatum(Datum datum) {
+ String dataCenter = datum.getDataCenter();
+ String dataInfoId = datum.getDataInfoId();
+ Datum cacheDatum = DATUM_MAP.get(dataCenter).get(dataInfoId);
+ if (datum.getVersion() != cacheDatum.getVersion()) {
+ DATUM_MAP.get(dataCenter).put(dataInfoId, datum);
+ Map pubMap = datum.getPubMap();
+ Map cachePubMap = new HashMap<>(cacheDatum.getPubMap());
+ for (Entry pubEntry : pubMap.entrySet()) {
+ String registerId = pubEntry.getKey();
+ Publisher pub = pubEntry.getValue();
+ addToIndex(pub);
+ Publisher cachePub = cachePubMap.get(registerId);
+ if (cachePub != null && getConnectId(pub).equals(getConnectId(cachePub))) {
+ cachePubMap.remove(registerId);
+ }
+ }
+ if (!cachePubMap.isEmpty()) {
+ for (Publisher cachePub : cachePubMap.values()) {
+ removeFromIndex(cachePub);
+ }
+ }
+ }
+ return cacheDatum.getVersion();
+ }
+
+ private void removeFromIndex(Publisher publisher) {
+ if (publisher == null) {
+ return;
+ }
+ String connectId = getConnectId(publisher);
+
+ // remove from ALL_CONNECT_ID_INDEX
+ Map publisherMap = ALL_CONNECT_ID_INDEX.get(connectId);
+ if (publisherMap != null) {
+ publisherMap.remove(publisher.getRegisterId());
+ }
+ }
+
+ private void addToIndex(Publisher publisher) {
+ if (publisher == null) {
+ return;
+ }
+ String connectId = getConnectId(publisher);
+
+ // add to ALL_CONNECT_ID_INDEX
+ Map publisherMap = ALL_CONNECT_ID_INDEX
+ .computeIfAbsent(connectId, s -> new ConcurrentHashMap<>());
+ publisherMap.put(publisher.getRegisterId(), publisher);
+
+ }
+
+ private String getConnectId(Publisher cachePub) {
+ return WordCache.getInstance().getWordCache(cachePub.getSourceAddress().getAddressString());
+ }
+
+ /**
+ * Getter method for property OWN_CONNECT_ID_INDEX.
+ *
+ * @return property value of OWN_CONNECT_ID_INDEX
+ */
+ public Set getAllConnectIds() {
+ return ALL_CONNECT_ID_INDEX.keySet();
+ }
+
+}
\ No newline at end of file
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/DataChangeHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/DataChangeHandler.java
index 665095b0d..f79dc8682 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/DataChangeHandler.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/DataChangeHandler.java
@@ -16,12 +16,22 @@
*/
package com.alipay.sofa.registry.server.data.change;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+
+import org.springframework.beans.factory.annotation.Autowired;
+
import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.store.Publisher;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.cache.DatumCache;
+import com.alipay.sofa.registry.server.data.cache.LocalDatumStorage;
import com.alipay.sofa.registry.server.data.cache.MergeResult;
import com.alipay.sofa.registry.server.data.change.event.DataChangeEventCenter;
import com.alipay.sofa.registry.server.data.change.event.DataChangeEventQueue;
@@ -155,7 +165,7 @@ public void run() {
Long lastVersion = mergeResult.getLastVersion();
if (lastVersion != null
- && lastVersion.longValue() == datumCache.ERROR_DATUM_VERSION) {
+ && lastVersion.longValue() == LocalDatumStorage.ERROR_DATUM_VERSION) {
LOGGER
.error(
"[DataChangeHandler][{}] first put unPub datum into cache error, dataCenter={}, dataInfoId={}, version={}, sourceType={},isContainsUnPub={}",
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/notify/BackUpNotifier.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/notify/BackUpNotifier.java
index 293376031..45ac0e0b5 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/notify/BackUpNotifier.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/notify/BackUpNotifier.java
@@ -16,14 +16,15 @@
*/
package com.alipay.sofa.registry.server.data.change.notify;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.springframework.beans.factory.annotation.Autowired;
+
import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.server.data.change.DataSourceTypeEnum;
import com.alipay.sofa.registry.server.data.datasync.Operator;
import com.alipay.sofa.registry.server.data.datasync.SyncDataService;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.HashSet;
-import java.util.Set;
/**
*
@@ -39,7 +40,6 @@ public class BackUpNotifier implements IDataChangeNotifier {
public Set getSuitableSource() {
Set set = new HashSet<>();
set.add(DataSourceTypeEnum.PUB);
- set.add(DataSourceTypeEnum.SYNC);
return set;
}
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/notify/TempPublisherNotifier.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/notify/TempPublisherNotifier.java
index 64d61ed90..38096aa96 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/notify/TempPublisherNotifier.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/notify/TempPublisherNotifier.java
@@ -16,6 +16,13 @@
*/
package com.alipay.sofa.registry.server.data.change.notify;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+import org.springframework.beans.factory.annotation.Autowired;
+
import com.alipay.remoting.Connection;
import com.alipay.remoting.InvokeCallback;
import com.alipay.sofa.registry.common.model.CommonResponse;
@@ -31,12 +38,6 @@
import com.alipay.sofa.registry.server.data.change.DataSourceTypeEnum;
import com.alipay.sofa.registry.server.data.executor.ExecutorFactory;
import com.alipay.sofa.registry.server.data.remoting.sessionserver.SessionServerConnectionFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Executor;
/**
*
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/datasync/sync/AbstractAcceptorStore.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/datasync/sync/AbstractAcceptorStore.java
index b07713814..0c010c8f9 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/datasync/sync/AbstractAcceptorStore.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/datasync/sync/AbstractAcceptorStore.java
@@ -54,8 +54,6 @@ public abstract class AbstractAcceptorStore implements AcceptorStore {
"[SyncDataService]");
private static final int DEFAULT_MAX_BUFFER_SIZE = 30;
- private static final int DEFAULT_DELAY_TIMEOUT = 3000;
- private static final int NOTIFY_RETRY = 3;
@Autowired
protected IMetaServerService metaServerService;
@@ -178,7 +176,7 @@ private void removeCache(Acceptor acceptor) {
}
private void addQueue(Acceptor acceptor) {
- delayQueue.put(new DelayItem(acceptor, DEFAULT_DELAY_TIMEOUT));
+ delayQueue.put(new DelayItem(acceptor, dataServerConfig.getDataSyncDelayTimeout()));
}
private void notifyChange(Acceptor acceptor) {
@@ -208,7 +206,7 @@ private void notifyChange(Acceptor acceptor) {
Server syncServer = boltExchange.getServer(dataServerConfig.getSyncDataPort());
- for (int tryCount = 0; tryCount < NOTIFY_RETRY; tryCount++) {
+ for (int tryCount = 0; tryCount < dataServerConfig.getDataSyncNotifyRetry(); tryCount++) {
try {
Connection connection = dataServerConnectionFactory.getConnection(targetDataIp);
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/DataServerChangeEvent.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/DataServerChangeEvent.java
index 447c3af8c..9dfaa60f8 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/DataServerChangeEvent.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/DataServerChangeEvent.java
@@ -16,18 +16,18 @@
*/
package com.alipay.sofa.registry.server.data.event;
-import com.alipay.sofa.registry.common.model.metaserver.DataNode;
-import com.alipay.sofa.registry.server.data.cache.DataServerChangeItem;
-
import java.util.HashMap;
import java.util.Map;
+import com.alipay.sofa.registry.common.model.metaserver.DataNode;
+import com.alipay.sofa.registry.server.data.cache.DataServerChangeItem;
+
/**
*
* @author qian.lqlq
* @version $Id: DataServerChangeEvent.java, v 0.1 2018-03-13 14:37 qian.lqlq Exp $
*/
-public class DataServerChangeEvent {
+public class DataServerChangeEvent implements Event {
/**
* node type enum
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/Event.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/Event.java
new file mode 100644
index 000000000..1d79e6d49
--- /dev/null
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/Event.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.registry.server.data.event;
+
+/**
+ *
+ * @author kezhu.wukz
+ * @version $Id: Event.java, v 0.1 2019-12-12 15:42 kezhu.wukz Exp $
+ */
+public interface Event {
+}
\ No newline at end of file
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/EventCenter.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/EventCenter.java
index 3e5dd8daf..aa4da7d98 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/EventCenter.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/EventCenter.java
@@ -16,10 +16,12 @@
*/
package com.alipay.sofa.registry.server.data.event;
-import com.alipay.sofa.registry.server.data.event.handler.AbstractEventHandler;
+import java.util.Collection;
+import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import com.alipay.sofa.registry.server.data.event.handler.AbstractEventHandler;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
/**
*
@@ -28,24 +30,32 @@
*/
public class EventCenter {
- private final Map MAP = new ConcurrentHashMap<>();
+ private Multimap, AbstractEventHandler> MAP = ArrayListMultimap.create();
/**
* eventHandler register
* @param handler
*/
public void register(AbstractEventHandler handler) {
- MAP.put(handler.interest(), handler);
+ List> interests = handler.interest();
+ for (Class extends Event> interest : interests) {
+ MAP.put(interest, handler);
+ }
}
/**
* event handler handle process
* @param event
*/
- public void post(Object event) {
+ public void post(Event event) {
Class clazz = event.getClass();
if (MAP.containsKey(clazz)) {
- MAP.get(clazz).handle(event);
+ Collection handlers = MAP.get(clazz);
+ if (handlers != null) {
+ for (AbstractEventHandler handler : handlers) {
+ handler.handle(event);
+ }
+ }
} else {
throw new RuntimeException("no suitable handler was found:" + clazz);
}
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/LocalDataServerChangeEvent.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/LocalDataServerChangeEvent.java
index 100af3482..8a89432db 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/LocalDataServerChangeEvent.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/LocalDataServerChangeEvent.java
@@ -16,17 +16,17 @@
*/
package com.alipay.sofa.registry.server.data.event;
-import com.alipay.sofa.registry.common.model.metaserver.DataNode;
-
import java.util.Map;
import java.util.Set;
+import com.alipay.sofa.registry.common.model.metaserver.DataNode;
+
/**
*
* @author qian.lqlq
* @version $Id: LocalDataServerChangeEvent.java, v 0.1 2018-05-07 20:13 qian.lqlq Exp $
*/
-public class LocalDataServerChangeEvent {
+public class LocalDataServerChangeEvent implements Event {
private Map localDataServerMap;
@@ -87,4 +87,18 @@ public Set getNewJoined() {
public long getLocalDataCenterversion() {
return localDataCenterversion;
}
+
+ /**
+ * @see Object#toString()
+ */
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("LocalDataServerChangeEvent{");
+ sb.append("localDataServerMap=").append(localDataServerMap);
+ sb.append(", localDataCenterversion=").append(localDataCenterversion);
+ sb.append(", newJoined=").append(newJoined);
+ sb.append(", version=").append(version);
+ sb.append('}');
+ return sb.toString();
+ }
}
\ No newline at end of file
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/MetaServerChangeEvent.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/MetaServerChangeEvent.java
index d726eb664..6057f0f07 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/MetaServerChangeEvent.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/MetaServerChangeEvent.java
@@ -24,7 +24,7 @@
* @author qian.lqlq
* @version $Id: MetaServerChangeEvent.java, v 0.1 2018-03-13 15:31 qian.lqlq Exp $
*/
-public class MetaServerChangeEvent {
+public class MetaServerChangeEvent implements Event {
private Map> ipMap;
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/RemoteDataServerChangeEvent.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/RemoteDataServerChangeEvent.java
new file mode 100644
index 000000000..dd57ed815
--- /dev/null
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/RemoteDataServerChangeEvent.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.registry.server.data.event;
+
+import java.util.Map;
+
+import com.alipay.sofa.registry.common.model.metaserver.DataNode;
+
+/**
+ *
+ * @author kezhu.wukz
+ * @version $Id: RemoteDataServerChangeEvent.java, v 0.1 2019-12-17 20:13 kezhu.wukz Exp $
+ */
+public class RemoteDataServerChangeEvent implements Event {
+
+ private final String dataCenter;
+ private Map remoteDataServerMap;
+
+ private long remoteDataCenterVersion;
+
+ private long version;
+
+ /**
+ * constructor
+ * @param remoteDataServerMap
+ * @param version
+ * @param remoteDataCenterVersion
+ */
+ public RemoteDataServerChangeEvent(String dataCenter,
+ Map remoteDataServerMap, long version,
+ long remoteDataCenterVersion) {
+ this.dataCenter = dataCenter;
+ this.remoteDataServerMap = remoteDataServerMap;
+ this.version = version;
+ this.remoteDataCenterVersion = remoteDataCenterVersion;
+ }
+
+ /**
+ * Getter method for property dataCenter.
+ *
+ * @return property value of dataCenter
+ */
+ public String getDataCenter() {
+ return dataCenter;
+ }
+
+ /**
+ * Getter method for property remoteDataServerMap.
+ *
+ * @return property value of remoteDataServerMap
+ */
+ public Map getRemoteDataServerMap() {
+ return remoteDataServerMap;
+ }
+
+ /**
+ * Getter method for property version.
+ *
+ * @return property value of version
+ */
+ public long getVersion() {
+ return version;
+ }
+
+ /**
+ * Getter method for property remoteDataCenterVersion.
+ *
+ * @return property value of remoteDataCenterVersion
+ */
+ public long getRemoteDataCenterVersion() {
+ return remoteDataCenterVersion;
+ }
+
+ /**
+ * @see Object#toString()
+ */
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("RemoteDataServerChangeEvent{");
+ sb.append("dataCenter='").append(dataCenter).append('\'');
+ sb.append(", remoteDataServerMap=").append(remoteDataServerMap);
+ sb.append(", remoteDataCenterVersion=").append(remoteDataCenterVersion);
+ sb.append(", version=").append(version);
+ sb.append('}');
+ return sb.toString();
+ }
+}
\ No newline at end of file
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/StartTaskEvent.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/StartTaskEvent.java
index a892a2ab2..23591b6b6 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/StartTaskEvent.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/StartTaskEvent.java
@@ -23,7 +23,7 @@
* @author qian.lqlq
* @version $Id: StartTaskEvent.java, v 0.1 2018-03-13 15:13 qian.lqlq Exp $
*/
-public class StartTaskEvent {
+public class StartTaskEvent implements Event {
private final Set suitableTypes;
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/AbstractEventHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/AbstractEventHandler.java
index e33401b61..6b79d868e 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/AbstractEventHandler.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/AbstractEventHandler.java
@@ -16,18 +16,21 @@
*/
package com.alipay.sofa.registry.server.data.event.handler;
+import java.util.List;
+
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.data.event.EventCenter;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Autowired;
/**
*
* @author qian.lqlq
* @version $Id: AbstractEventHandler.java, v 0.1 2018-03-13 15:34 qian.lqlq Exp $
*/
-public abstract class AbstractEventHandler implements InitializingBean {
+public abstract class AbstractEventHandler implements InitializingBean {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractEventHandler.class);
@@ -43,7 +46,7 @@ public void afterPropertiesSet() throws Exception {
* event handle func
* @param event
*/
- public void handle(T event) {
+ public void handle(Event event) {
try {
doHandle(event);
} catch (Exception e) {
@@ -51,7 +54,7 @@ public void handle(T event) {
}
}
- public abstract Class interest();
+ public abstract List> interest();
- public abstract void doHandle(T event);
+ public abstract void doHandle(Event event);
}
\ No newline at end of file
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/DataServerChangeEventHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/DataServerChangeEventHandler.java
index d9a0a3d49..5690f69a2 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/DataServerChangeEventHandler.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/DataServerChangeEventHandler.java
@@ -16,6 +16,18 @@
*/
package com.alipay.sofa.registry.server.data.event.handler;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.util.CollectionUtils;
+
import com.alipay.remoting.Connection;
import com.alipay.sofa.registry.common.model.metaserver.DataNode;
import com.alipay.sofa.registry.common.model.store.URL;
@@ -28,19 +40,12 @@
import com.alipay.sofa.registry.server.data.event.DataServerChangeEvent;
import com.alipay.sofa.registry.server.data.event.EventCenter;
import com.alipay.sofa.registry.server.data.event.LocalDataServerChangeEvent;
+import com.alipay.sofa.registry.server.data.event.RemoteDataServerChangeEvent;
import com.alipay.sofa.registry.server.data.node.DataServerNode;
import com.alipay.sofa.registry.server.data.remoting.DataNodeExchanger;
import com.alipay.sofa.registry.server.data.remoting.dataserver.DataServerNodeFactory;
import com.alipay.sofa.registry.server.data.util.TimeUtil;
-import org.apache.commons.lang.StringUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.util.CollectionUtils;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.collect.Lists;
/**
*
@@ -67,8 +72,8 @@ public class DataServerChangeEventHandler extends AbstractEventHandler> interest() {
+ return Lists.newArrayList(DataServerChangeEvent.class);
}
@Override
@@ -81,19 +86,21 @@ public void doHandle(DataServerChangeEvent event) {
Set localDataServers = dataServerCache.getDataServers(
dataServerConfig.getLocalDataCenter()).keySet();
//get changed dataservers
- Map> changedMap = dataServerCache
- .compareAndSet(dataServerChangeItem,event.getFromType());
- if(!changedMap.isEmpty()) {
+ Map> changedMap = dataServerCache.compareAndSet(
+ dataServerChangeItem, event.getFromType());
+ if (!changedMap.isEmpty()) {
for (Entry> changeEntry : changedMap.entrySet()) {
String dataCenter = changeEntry.getKey();
Set ips = changeEntry.getValue();
+ Long newVersion = dataServerCache.getDataCenterNewVersion(dataCenter);
if (!CollectionUtils.isEmpty(ips)) {
for (String ip : ips) {
if (!StringUtils.equals(ip, DataServerConfig.IP)) {
DataServerNode dataServerNode = DataServerNodeFactory
- .getDataServerNode(dataCenter, ip);
- if (dataServerNode == null || dataServerNode.getConnection() == null
- || !dataServerNode.getConnection().isFine()) {
+ .getDataServerNode(dataCenter, ip);
+ if (dataServerNode == null
+ || dataServerNode.getConnection() == null
+ || !dataServerNode.getConnection().isFine()) {
connectDataServer(dataCenter, ip);
}
}
@@ -103,57 +110,69 @@ public void doHandle(DataServerChangeEvent event) {
for (String ip : ipSet) {
if (!ips.contains(ip)) {
DataServerNodeFactory.remove(dataCenter, ip, dataServerConfig);
- LOGGER.info(
+ LOGGER
+ .info(
"[DataServerChangeEventHandler] remove connection, datacenter:{}, ip:{},from:{}",
- dataCenter, ip,event.getFromType());
+ dataCenter, ip, event.getFromType());
}
}
- Long newVersion = dataServerCache.getDataCenterNewVersion(dataCenter);
- Map newDataNodes = dataServerCache.getNewDataServerMap(dataCenter);
- //if the datacenter is self, post LocalDataServerChangeEvent
- if (dataServerConfig.getLocalDataCenter().equals(dataCenter)) {
+ Map newDataNodes = dataServerCache
+ .getNewDataServerMap(dataCenter);
+
+ //avoid input map reference operation DataServerNodeFactory MAP
+ Map map = new ConcurrentHashMap<>(newDataNodes);
+
+ //if the dataCenter is self, post LocalDataServerChangeEvent
+ if (dataServerConfig.isLocalDataCenter(dataCenter)) {
Set newjoined = new HashSet<>(ips);
newjoined.removeAll(localDataServers);
- //avoid input map reference operation DataServerNodeFactory MAP
- Map map = new ConcurrentHashMap<>(newDataNodes);
-
- LOGGER.info("Node list change fire LocalDataServerChangeEvent,current node list={},version={},from:{}",
- map.keySet(), newVersion,event.getFromType());
+ LOGGER
+ .info(
+ "Node list change fire LocalDataServerChangeEvent,current node list={},version={},from:{}",
+ map.keySet(), newVersion, event.getFromType());
eventCenter.post(new LocalDataServerChangeEvent(map, newjoined,
- dataServerChangeItem.getVersionMap()
- .get(dataServerConfig.getLocalDataCenter()),
- newVersion));
+ dataServerChangeItem.getVersionMap().get(dataCenter), newVersion));
} else {
dataServerCache.updateItem(newDataNodes, newVersion, dataCenter);
+ eventCenter.post(new RemoteDataServerChangeEvent(dataCenter, map,
+ dataServerChangeItem.getVersionMap().get(dataCenter), newVersion));
}
} else {
- //if the datacenter which has no dataservers is not self, remove it
- if (!dataServerConfig.getLocalDataCenter().equals(dataCenter)) {
+ //if the dataCenter which has no dataServers is not self, remove it
+ if (!dataServerConfig.isLocalDataCenter(dataCenter)) {
removeDataCenter(dataCenter);
+ eventCenter.post(new RemoteDataServerChangeEvent(dataCenter,
+ Collections.EMPTY_MAP, dataServerChangeItem.getVersionMap().get(
+ dataCenter), newVersion));
}
- Long newVersion = dataServerCache.getDataCenterNewVersion(dataCenter);
- Map newDataNodes = dataServerCache.getNewDataServerMap(dataCenter);
+ Map newDataNodes = dataServerCache
+ .getNewDataServerMap(dataCenter);
dataServerCache.updateItem(newDataNodes, newVersion, dataCenter);
}
}
} else {
- //refresh for keep connect
- Set allDataCenter = new HashSet<>(DataServerNodeFactory.getAllDataCenters());
- for (String dataCenter:allDataCenter) {
- Map dataNodes = DataServerNodeFactory.getDataServerNodes(dataCenter);
- if(dataNodes != null && !dataNodes.isEmpty()){
-
- dataNodes.forEach((ip,dataServerNode)->{
- if (!StringUtils.equals(ip, DataServerConfig.IP)) {
+ //refresh for keep connect other dataServers
+ Set allDataCenter = new HashSet<>(dataServerCache.getAllDataCenters());
+ for (String dataCenter : allDataCenter) {
+ Map dataNodes = dataServerCache
+ .getNewDataServerMap(dataCenter);
+ if (dataNodes != null) {
+ for (DataNode dataNode : dataNodes.values()) {
+ if (!StringUtils.equals(dataNode.getIp(), DataServerConfig.IP)) {
+ DataServerNode dataServerNode = DataServerNodeFactory
+ .getDataServerNode(dataCenter, dataNode.getIp());
Connection connection = dataServerNode.getConnection();
- if (connection != null && !connection.isFine()) {
- LOGGER.warn("[DataServerChangeEventHandler] dataServer connections is not fine,try to reconnect it,old connection={},dataCenter={},from:{}",
- connection.getRemoteAddress(), dataCenter,event.getFromType());
- connectDataServer(dataCenter, ip);
+ if (connection == null || !connection.isFine()) {
+ LOGGER
+ .warn(
+ "[DataServerChangeEventHandler] dataServer connections is not fine, try to reconnect it, old connection={}, dataNode={}, dataCenter={}, from:{}",
+ connection, dataNode.getIp(), dataCenter,
+ event.getFromType());
+ connectDataServer(dataCenter, dataNode.getIp());
}
}
- });
+ }
}
}
}
@@ -181,12 +200,12 @@ private void connectDataServer(String dataCenter, String ip) {
}
if (conn == null || !conn.isFine()) {
LOGGER.error(
- "[DataServerChangeEventHandler] connect dataserver {} in {} failed five times", ip,
+ "[DataServerChangeEventHandler] connect dataServer {} in {} failed five times", ip,
dataCenter);
throw new RuntimeException(
String
.format(
- "[DataServerChangeEventHandler] connect dataserver %s in %s failed five times,dataServer will not work,please check connect!",
+ "[DataServerChangeEventHandler] connect dataServer %s in %s failed five times,dataServer will not work,please check connect!",
ip, dataCenter));
}
//maybe get dataNode from metaServer,current has not start! register dataNode info to factory,wait for connect task next execute
@@ -194,7 +213,7 @@ private void connectDataServer(String dataCenter, String ip) {
}
/**
- * remove datacenter, and close connections of dataservers in this datacenter
+ * remove dataCenter, and close connections of dataServers in this dataCenter
*
* @param dataCenter
*/
@@ -202,7 +221,6 @@ private void removeDataCenter(String dataCenter) {
DataServerNodeFactory.getDataServerNodes(dataCenter).values().stream().map(DataServerNode::getConnection)
.filter(connection -> connection != null && connection.isFine()).forEach(Connection::close);
DataServerNodeFactory.remove(dataCenter);
- LOGGER.info(
- "[DataServerChangeEventHandler] remove connections of datacenter : {}", dataCenter);
+ LOGGER.info("[DataServerChangeEventHandler] remove connections of dataCenter : {}", dataCenter);
}
}
\ No newline at end of file
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/LocalDataServerChangeEventHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/LocalDataServerChangeEventHandler.java
index d303d192e..67a01d65b 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/LocalDataServerChangeEventHandler.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/LocalDataServerChangeEventHandler.java
@@ -16,6 +16,18 @@
*/
package com.alipay.sofa.registry.server.data.event.handler;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.springframework.beans.factory.annotation.Autowired;
+
import com.alipay.remoting.Connection;
import com.alipay.sofa.registry.common.model.CommonResponse;
import com.alipay.sofa.registry.common.model.dataserver.Datum;
@@ -43,17 +55,6 @@
import com.alipay.sofa.registry.server.data.util.LocalServerStatusEnum;
import com.alipay.sofa.registry.server.data.util.TimeUtil;
import com.google.common.collect.Lists;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
*
@@ -95,8 +96,8 @@ public class LocalDataServerChangeEventHandler extends
private static final int TRY_COUNT = 5;
@Override
- public Class interest() {
- return LocalDataServerChangeEvent.class;
+ public List> interest() {
+ return Lists.newArrayList(LocalDataServerChangeEvent.class);
}
@Override
@@ -244,10 +245,6 @@ private void notifyToFetch(LocalDataServerChangeEvent event, long changeVersion)
ConsistentHash consistentHashOld = dataServerCache
.calculateOldConsistentHash(dataServerConfig.getLocalDataCenter());
- if (consistentHash == null) {
- LOGGER.error("Calculate Old ConsistentHash error!");
- throw new RuntimeException("Calculate Old ConsistentHash error!");
- }
//compute new triad for every datum in cache
Map> allMap = datumCache.getAll();
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/MetaServerChangeEventHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/MetaServerChangeEventHandler.java
index 1498d8947..b3d98de26 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/MetaServerChangeEventHandler.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/MetaServerChangeEventHandler.java
@@ -16,6 +16,15 @@
*/
package com.alipay.sofa.registry.server.data.event.handler;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.util.CollectionUtils;
+
import com.alipay.remoting.Connection;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.registry.common.model.metaserver.DataNode;
@@ -36,13 +45,7 @@
import com.alipay.sofa.registry.server.data.remoting.metaserver.IMetaServerService;
import com.alipay.sofa.registry.server.data.remoting.metaserver.MetaServerConnectionFactory;
import com.alipay.sofa.registry.server.data.util.TimeUtil;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.util.CollectionUtils;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
+import com.google.common.collect.Lists;
/**
*
@@ -71,8 +74,8 @@ public class MetaServerChangeEventHandler extends AbstractEventHandler> interest() {
+ return Lists.newArrayList(MetaServerChangeEvent.class);
}
@Override
@@ -147,8 +150,10 @@ public URL getRequestUrl() {
LOGGER
.error(
- "[MetaServerChangeEventHandler] register data node send error!retry once leader :{} error",
- newLeader.getIp(), e);
+ String
+ .format(
+ "[MetaServerChangeEventHandler] register data node send error!retry once leader :%s error",
+ newLeader.getIp()), e);
}
if (obj instanceof NodeChangeResult) {
NodeChangeResult result = (NodeChangeResult) obj;
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/StartTaskEventHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/StartTaskEventHandler.java
index 8da403f3e..886aaf59f 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/StartTaskEventHandler.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/event/handler/StartTaskEventHandler.java
@@ -16,15 +16,17 @@
*/
package com.alipay.sofa.registry.server.data.event.handler;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import javax.annotation.Resource;
+
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.data.event.StartTaskEvent;
import com.alipay.sofa.registry.server.data.executor.ExecutorFactory;
import com.alipay.sofa.registry.server.data.remoting.dataserver.task.AbstractTask;
-
-import javax.annotation.Resource;
-import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
+import com.google.common.collect.Lists;
/**
*
@@ -44,8 +46,8 @@ public class StartTaskEventHandler extends AbstractEventHandler
private ScheduledExecutorService executor = null;
@Override
- public Class interest() {
- return StartTaskEvent.class;
+ public List> interest() {
+ return Lists.newArrayList(StartTaskEvent.class);
}
@Override
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/DataNodeExchanger.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/DataNodeExchanger.java
index 8d7555000..a87bd7221 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/DataNodeExchanger.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/DataNodeExchanger.java
@@ -16,6 +16,12 @@
*/
package com.alipay.sofa.registry.server.data.remoting;
+import java.util.Collection;
+
+import javax.annotation.Resource;
+
+import org.springframework.beans.factory.annotation.Autowired;
+
import com.alipay.sofa.registry.common.model.store.URL;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
@@ -28,10 +34,6 @@
import com.alipay.sofa.registry.remoting.exchange.message.Response;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.remoting.handler.AbstractClientHandler;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import javax.annotation.Resource;
-import java.util.Collection;
/**
* @author xuanbei
@@ -59,7 +61,7 @@ public Response request(Request request) {
if (null != request.getCallBackHandler()) {
client.sendCallback(request.getRequestUrl(), request.getRequestBody(),
request.getCallBackHandler(),
- dataServerConfig.getRpcTimeout());
+ request.getTimeout() != null ? request.getTimeout() : dataServerConfig.getRpcTimeout());
return () -> Response.ResultStatus.SUCCESSFUL;
} else {
final Object result = client.sendSync(request.getRequestUrl(), request.getRequestBody(),
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/MetaNodeExchanger.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/MetaNodeExchanger.java
index faca923de..b60f1c609 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/MetaNodeExchanger.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/MetaNodeExchanger.java
@@ -16,6 +16,12 @@
*/
package com.alipay.sofa.registry.server.data.remoting;
+import java.util.Collection;
+
+import javax.annotation.Resource;
+
+import org.springframework.beans.factory.annotation.Autowired;
+
import com.alipay.sofa.registry.common.model.store.URL;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
@@ -29,10 +35,6 @@
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.remoting.handler.AbstractClientHandler;
import com.alipay.sofa.registry.server.data.remoting.metaserver.IMetaServerService;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import javax.annotation.Resource;
-import java.util.Collection;
/**
* @author xuanbei
@@ -71,7 +73,7 @@ public Response request(Request request) {
LOGGER.warn("MetaNode Exchanger request send error!It will be retry once!Request url:{}", url);
final Object result = client.sendSync(url, request.getRequestBody(),
- dataServerConfig.getRpcTimeout());
+ request.getTimeout() != null ? request.getTimeout() : dataServerConfig.getRpcTimeout());
return () -> result;
}
}
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/DataServerNodeFactory.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/DataServerNodeFactory.java
index fb511a683..44976090d 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/DataServerNodeFactory.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/DataServerNodeFactory.java
@@ -208,13 +208,4 @@ public static List computeDataServerNodes(String dataCenter, Str
return null;
}
- /**
- * get all datacenters
- *
- * @return
- */
- public static Set getAllDataCenters() {
- return MAP.keySet();
- }
-
}
\ No newline at end of file
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/handler/NotifyDataSyncHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/handler/NotifyDataSyncHandler.java
index afdf1f191..987ae9ccf 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/handler/NotifyDataSyncHandler.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/dataserver/handler/NotifyDataSyncHandler.java
@@ -16,6 +16,15 @@
*/
package com.alipay.sofa.registry.server.data.remoting.dataserver.handler;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.springframework.beans.factory.annotation.Autowired;
+
import com.alipay.remoting.Connection;
import com.alipay.sofa.registry.common.model.CommonResponse;
import com.alipay.sofa.registry.common.model.Node;
@@ -39,14 +48,6 @@
import com.alipay.sofa.registry.server.data.util.ThreadPoolExecutorDataServer;
import com.alipay.sofa.registry.util.NamedThreadFactory;
import com.alipay.sofa.registry.util.ParaCheckUtil;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
/**
*
@@ -103,23 +104,31 @@ public Object doHandle(Channel channel, NotifyDataSyncRequest request) {
private void executorRequest(Connection connection, NotifyDataSyncRequest request) {
executor.execute(() -> {
- String dataInfoId = request.getDataInfoId();
- String dataCenter = request.getDataCenter();
- Datum datum = datumCache.get(dataCenter, dataInfoId);
- Long version = (datum == null) ? null : datum.getVersion();
- Long requestVersion = request.getVersion();
- if (version == null || requestVersion == 0L || version < requestVersion) {
- LOGGER.info("[NotifyDataSyncProcessor] begin get sync data, currentVersion={},request={}", version,
- request);
- getSyncDataHandler.syncData(new SyncDataCallback(getSyncDataHandler, connection,
- new SyncDataRequest(dataInfoId, dataCenter, version, request.getDataSourceType()),
- dataChangeEventCenter));
- } else {
- LOGGER.info("[NotifyDataSyncHandler] not need to sync data, currentVersion={},request={}", version,request);
- }
+ fetchSyncData(connection, request);
});
}
+ protected void fetchSyncData(Connection connection, NotifyDataSyncRequest request) {
+ String dataInfoId = request.getDataInfoId();
+ String dataCenter = request.getDataCenter();
+ Datum datum = datumCache.get(dataCenter, dataInfoId);
+ Long version = (datum == null) ? null : datum.getVersion();
+ Long requestVersion = request.getVersion();
+
+ if (version == null || requestVersion == 0L || version < requestVersion) {
+ LOGGER.info(
+ "[NotifyDataSyncProcessor] begin get sync data, currentVersion={},request={}",
+ version, request);
+ getSyncDataHandler.syncData(new SyncDataCallback(getSyncDataHandler, connection,
+ new SyncDataRequest(dataInfoId, dataCenter, version, request.getDataSourceType()),
+ dataChangeEventCenter));
+ } else {
+ LOGGER.info(
+ "[NotifyDataSyncHandler] not need to sync data, currentVersion={},request={}",
+ version, request);
+ }
+ }
+
@Override
public void afterWorkingProcess() {
try {
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/DefaultMetaServiceImpl.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/DefaultMetaServiceImpl.java
index 5a5ebf4c1..0d456314b 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/DefaultMetaServiceImpl.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/DefaultMetaServiceImpl.java
@@ -16,6 +16,20 @@
*/
package com.alipay.sofa.registry.server.data.remoting.metaserver;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import org.springframework.beans.factory.annotation.Autowired;
+
import com.alipay.remoting.Connection;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.registry.common.model.Node.NodeType;
@@ -35,22 +49,11 @@
import com.alipay.sofa.registry.remoting.exchange.message.Request;
import com.alipay.sofa.registry.remoting.exchange.message.Response;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
+import com.alipay.sofa.registry.server.data.cache.DataServerCache;
import com.alipay.sofa.registry.server.data.cache.DataServerChangeItem;
import com.alipay.sofa.registry.server.data.node.DataServerNode;
import com.alipay.sofa.registry.server.data.remoting.MetaNodeExchanger;
import com.alipay.sofa.registry.server.data.remoting.dataserver.DataServerNodeFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
/**
*
@@ -71,6 +74,9 @@ public class DefaultMetaServiceImpl implements IMetaServerService {
@Autowired
private MetaServerConnectionFactory metaServerConnectionFactory;
+ @Autowired
+ private DataServerCache dataServerCache;
+
private RaftClient raftClient;
private AtomicBoolean clientStart = new AtomicBoolean(false);
@@ -154,6 +160,11 @@ public List getDataServers(String dataCenter, String dataInfoId)
dataServerConfig.getStoreNodes());
}
+ @Override
+ public Collection getDataServers(String dataCenter) {
+ return DataServerNodeFactory.getDataServerNodes(dataCenter).values();
+ }
+
@Override
public DataServerChangeItem getDateServers() {
Map connectionMap = metaServerConnectionFactory
@@ -203,7 +214,7 @@ public URL getRequestUrl() {
@Override
public List getOtherDataCenters() {
- Set all = new HashSet<>(DataServerNodeFactory.getAllDataCenters());
+ Set all = new HashSet<>(dataServerCache.getAllDataCenters());
all.remove(dataServerConfig.getLocalDataCenter());
return new ArrayList<>(all);
}
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/IMetaServerService.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/IMetaServerService.java
index d1cf4192f..e6621f341 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/IMetaServerService.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/IMetaServerService.java
@@ -16,15 +16,16 @@
*/
package com.alipay.sofa.registry.server.data.remoting.metaserver;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.registry.common.model.metaserver.ProvideData;
import com.alipay.sofa.registry.server.data.cache.DataServerChangeItem;
import com.alipay.sofa.registry.server.data.node.DataServerNode;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
/**
* The interface Meta server service.
* @author qian.lqlq
@@ -57,6 +58,14 @@ public interface IMetaServerService {
*/
List getDataServers(String dataCenter, String dataInfoId);
+ /**
+ * Gets data servers.
+ *
+ * @param dataCenter the data center
+ * @return the data servers
+ */
+ Collection getDataServers(String dataCenter);
+
/**
* Gets date servers.
*
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/handler/NotifyProvideDataChangeHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/handler/NotifyProvideDataChangeHandler.java
index 756a025cb..d6ea6b2a9 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/handler/NotifyProvideDataChangeHandler.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/handler/NotifyProvideDataChangeHandler.java
@@ -20,7 +20,6 @@
import com.alipay.sofa.registry.common.model.CommonResponse;
import com.alipay.sofa.registry.common.model.Node.NodeType;
-import com.alipay.sofa.registry.common.model.constants.ValueConstants;
import com.alipay.sofa.registry.common.model.metaserver.DataOperator;
import com.alipay.sofa.registry.common.model.metaserver.NotifyProvideDataChange;
import com.alipay.sofa.registry.common.model.metaserver.ProvideData;
@@ -29,7 +28,7 @@
import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.server.data.remoting.handler.AbstractClientHandler;
import com.alipay.sofa.registry.server.data.remoting.metaserver.IMetaServerService;
-import com.alipay.sofa.registry.server.data.renew.DatumLeaseManager;
+import com.alipay.sofa.registry.server.data.remoting.metaserver.provideData.ProvideDataProcessor;
/**
*
@@ -38,14 +37,14 @@
*/
public class NotifyProvideDataChangeHandler extends AbstractClientHandler {
- private static final Logger LOGGER = LoggerFactory
- .getLogger(NotifyProvideDataChangeHandler.class);
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(NotifyProvideDataChangeHandler.class);
@Autowired
- private IMetaServerService metaServerService;
+ private IMetaServerService metaServerService;
@Autowired
- private DatumLeaseManager datumLeaseManager;
+ private ProvideDataProcessor provideDataProcessorManager;
@Override
public HandlerType getType() {
@@ -64,32 +63,15 @@ public void checkParam(Object request) throws RuntimeException {
@Override
public Object doHandle(Channel channel, Object request) {
+ LOGGER.info("Received notifyProvideDataChange: {}", request);
NotifyProvideDataChange notifyProvideDataChange = (NotifyProvideDataChange) request;
-
- fireDataChangeFetchTask(notifyProvideDataChange);
- return null;
- }
-
- private void fireDataChangeFetchTask(NotifyProvideDataChange notifyProvideDataChange) {
-
String dataInfoId = notifyProvideDataChange.getDataInfoId();
if (notifyProvideDataChange.getDataOperator() != DataOperator.REMOVE) {
-
- if (ValueConstants.ENABLE_DATA_DATUM_EXPIRE.equals(dataInfoId)) {
- ProvideData provideData = metaServerService.fetchData(dataInfoId);
- if (provideData == null || provideData.getProvideData() == null
- || provideData.getProvideData().getObject() == null) {
- LOGGER
- .info("Fetch enableDataDatumExpire but no data existed, current config not change!");
- return;
- }
- boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData
- .getProvideData().getObject());
- LOGGER.info("Fetch enableDataDatumExpire {} success!", enableDataDatumExpire);
- datumLeaseManager.setRenewEnable(enableDataDatumExpire);
- }
+ ProvideData provideData = metaServerService.fetchData(dataInfoId);
+ provideDataProcessorManager.changeDataProcess(provideData);
}
+ return null;
}
@Override
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/handler/ServerChangeHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/handler/ServerChangeHandler.java
index 0ace90cc8..66f18f9ea 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/handler/ServerChangeHandler.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/handler/ServerChangeHandler.java
@@ -16,11 +16,19 @@
*/
package com.alipay.sofa.registry.server.data.remoting.metaserver.handler;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.springframework.beans.factory.annotation.Autowired;
+
import com.alipay.sofa.registry.common.model.CommonResponse;
import com.alipay.sofa.registry.common.model.Node;
import com.alipay.sofa.registry.common.model.Node.NodeType;
import com.alipay.sofa.registry.common.model.metaserver.MetaNode;
import com.alipay.sofa.registry.common.model.metaserver.NodeChangeResult;
+import com.alipay.sofa.registry.log.Logger;
+import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.event.DataServerChangeEvent;
@@ -29,11 +37,6 @@
import com.alipay.sofa.registry.server.data.event.MetaServerChangeEvent;
import com.alipay.sofa.registry.server.data.executor.ExecutorFactory;
import com.alipay.sofa.registry.server.data.remoting.handler.AbstractClientHandler;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
/**
*
@@ -42,11 +45,13 @@
*/
public class ServerChangeHandler extends AbstractClientHandler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ServerChangeHandler.class);
+
@Autowired
- private EventCenter eventCenter;
+ private EventCenter eventCenter;
@Autowired
- private DataServerConfig dataServerConfig;
+ private DataServerConfig dataServerConfig;
@Override
public void checkParam(NodeChangeResult request) throws RuntimeException {
@@ -55,6 +60,7 @@ public void checkParam(NodeChangeResult request) throws RuntimeException {
@Override
public Object doHandle(Channel channel, NodeChangeResult request) {
+ LOGGER.info("Received NodeChangeResult: {}", request);
ExecutorFactory.getCommonExecutor().execute(() -> {
if (request.getNodeType() == NodeType.DATA) {
eventCenter.post(new DataServerChangeEvent(request.getNodes(),
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/provideData/ProvideDataProcessor.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/provideData/ProvideDataProcessor.java
new file mode 100644
index 000000000..dc08d1fe6
--- /dev/null
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/provideData/ProvideDataProcessor.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.registry.server.data.remoting.metaserver.provideData;
+
+import com.alipay.sofa.registry.common.model.metaserver.ProvideData;
+
+/**
+ *
+ * @author kezhu.wukz
+ * @version 1.0: ProvideDataProcessor.java, v 0.1 2019-12-25 17:26 kezhu.wukz Exp $
+ */
+public interface ProvideDataProcessor {
+
+ void changeDataProcess(ProvideData provideData);
+
+ boolean support(ProvideData provideData);
+}
\ No newline at end of file
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/provideData/ProvideDataProcessorManager.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/provideData/ProvideDataProcessorManager.java
new file mode 100644
index 000000000..24346f4d9
--- /dev/null
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/provideData/ProvideDataProcessorManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.registry.server.data.remoting.metaserver.provideData;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import com.alipay.sofa.registry.common.model.metaserver.ProvideData;
+
+/**
+ *
+ * @author kezhu.wukz
+ * @version 1.0: ProvideDataProcessorManager.java, v 0.1 2019-12-25 17:39 kezhu.wukz Exp $
+ */
+public class ProvideDataProcessorManager implements ProvideDataProcessor {
+
+ private Collection provideDataProcessors = new ArrayList<>();
+
+ public void addProvideDataProcessor(ProvideDataProcessor provideDataProcessor) {
+ provideDataProcessors.add(provideDataProcessor);
+ }
+
+ @Override
+ public void changeDataProcess(ProvideData provideData) {
+ for (ProvideDataProcessor provideDataProcessor : provideDataProcessors) {
+ if (provideDataProcessor.support(provideData)) {
+ provideDataProcessor.changeDataProcess(provideData);
+ }
+ }
+ }
+
+ @Override
+ public boolean support(ProvideData provideData) {
+ return false;
+ }
+}
\ No newline at end of file
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/provideData/processor/DatumExpireProvideDataProcessor.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/provideData/processor/DatumExpireProvideDataProcessor.java
new file mode 100644
index 000000000..ceba5865e
--- /dev/null
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/provideData/processor/DatumExpireProvideDataProcessor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.registry.server.data.remoting.metaserver.provideData.processor;
+
+import org.springframework.beans.factory.annotation.Autowired;
+
+import com.alipay.sofa.registry.common.model.constants.ValueConstants;
+import com.alipay.sofa.registry.common.model.metaserver.ProvideData;
+import com.alipay.sofa.registry.log.Logger;
+import com.alipay.sofa.registry.log.LoggerFactory;
+import com.alipay.sofa.registry.server.data.remoting.metaserver.provideData.ProvideDataProcessor;
+import com.alipay.sofa.registry.server.data.renew.DatumLeaseManager;
+
+/**
+ *
+ * @author kezhu.wukz
+ * @version 1.0: DatumExpireProvideDataProcessor.java, v 0.1 2019-12-26 20:30 kezhu.wukz Exp $
+ */
+public class DatumExpireProvideDataProcessor implements ProvideDataProcessor {
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(DatumExpireProvideDataProcessor.class);
+
+ @Autowired
+ private DatumLeaseManager datumLeaseManager;
+
+ @Override
+ public void changeDataProcess(ProvideData provideData) {
+ if (checkInvalid(provideData)) {
+ return;
+ }
+ boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData.getProvideData()
+ .getObject());
+ LOGGER.info("Fetch enableDataDatumExpire {} success!", enableDataDatumExpire);
+ datumLeaseManager.setRenewEnable(enableDataDatumExpire);
+ }
+
+ private boolean checkInvalid(ProvideData provideData) {
+ boolean invalid = provideData == null || provideData.getProvideData() == null
+ || provideData.getProvideData().getObject() == null;
+ if (invalid) {
+ LOGGER.warn("Fetch enableDataDatumExpire return invalid data, provideData: {}",
+ provideData);
+ }
+ return invalid;
+ }
+
+ @Override
+ public boolean support(ProvideData provideData) {
+ return ValueConstants.ENABLE_DATA_DATUM_EXPIRE.equals(provideData.getDataInfoId());
+ }
+}
\ No newline at end of file
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/disconnect/DisconnectEventHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/disconnect/DisconnectEventHandler.java
index 43c49622d..8d5333d26 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/disconnect/DisconnectEventHandler.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/disconnect/DisconnectEventHandler.java
@@ -16,6 +16,16 @@
*/
package com.alipay.sofa.registry.server.data.remoting.sessionserver.disconnect;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
@@ -26,15 +36,6 @@
import com.alipay.sofa.registry.server.data.node.DataNodeStatus;
import com.alipay.sofa.registry.server.data.remoting.sessionserver.SessionServerConnectionFactory;
import com.alipay.sofa.registry.server.data.util.LocalServerStatusEnum;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
/**
* @author qian.lqlq
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/GetDataVersionsHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/GetDataVersionsHandler.java
index 756280f63..24f58a89b 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/GetDataVersionsHandler.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/GetDataVersionsHandler.java
@@ -28,7 +28,6 @@
import com.alipay.sofa.registry.common.model.GenericResponse;
import com.alipay.sofa.registry.common.model.Node;
-import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.dataserver.GetDataVersionRequest;
import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.server.data.cache.DatumCache;
@@ -68,19 +67,17 @@ public Object doHandle(Channel channel, GetDataVersionRequest request) {
Map> map = new HashMap<>();
List dataInfoIds = request.getDataInfoIds();
for (String dataInfoId : dataInfoIds) {
- Map datumMap = datumCache.get(dataInfoId);
- Set> entrySet = datumMap.entrySet();
- for (Entry entry : entrySet) {
+ Map datumMap = datumCache.getVersions(dataInfoId);
+ Set> entrySet = datumMap.entrySet();
+ for (Entry entry : entrySet) {
String dataCenter = entry.getKey();
- Datum datum = entry.getValue();
- if (datum != null) {
- Map dataInfoIdToVersionMap = map.get(dataCenter);
- if (dataInfoIdToVersionMap == null) {
- dataInfoIdToVersionMap = new HashMap<>(dataInfoIds.size());
- map.put(dataCenter, dataInfoIdToVersionMap);
- }
- dataInfoIdToVersionMap.put(dataInfoId, datum.getVersion());
+ Long version = entry.getValue();
+ Map dataInfoIdToVersionMap = map.get(dataCenter);
+ if (dataInfoIdToVersionMap == null) {
+ dataInfoIdToVersionMap = new HashMap<>(dataInfoIds.size());
+ map.put(dataCenter, dataInfoIdToVersionMap);
}
+ dataInfoIdToVersionMap.put(dataInfoId, version);
}
}
return new GenericResponse