diff --git a/pom.xml b/pom.xml
index 4b7088f56..42a69a7f1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,13 +82,10 @@
6.4.6
${user.dir}
-Dnetwork_interface_denylist=docker0
-<<<<<<< HEAD
false
false
-=======
1.2.51.sec09_noneautotype
->>>>>>> [app_discovery] support subscribe app and interface publisher
diff --git a/server/common/model/pom.xml b/server/common/model/pom.xml
index 88083b2e0..2cadfa49c 100644
--- a/server/common/model/pom.xml
+++ b/server/common/model/pom.xml
@@ -39,5 +39,13 @@
commons-collections
commons-collections
+
+ commons-collections
+ commons-collections
+
+
+ commons-collections
+ commons-collections
+
diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/slot/DataSlotDiffSyncResult.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/slot/DataSlotDiffSyncResult.java
index 70585f25c..7c359f505 100644
--- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/slot/DataSlotDiffSyncResult.java
+++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/slot/DataSlotDiffSyncResult.java
@@ -20,11 +20,10 @@
import com.alipay.sofa.registry.common.model.store.Publisher;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
+import org.springframework.util.CollectionUtils;
import java.io.Serializable;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/Publisher.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/Publisher.java
index a02233960..cf6f44b13 100644
--- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/Publisher.java
+++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/Publisher.java
@@ -20,14 +20,9 @@
import com.alipay.sofa.registry.common.model.*;
import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.common.collect.Collections2;
-import org.apache.commons.collections.CollectionUtils;
-import com.alipay.sofa.registry.common.model.AppRegisterServerDataBox;
-import com.alipay.sofa.registry.common.model.AppRegisterServerDataBox.ParamInfo;
-import com.alipay.sofa.registry.common.model.AppRegisterServerDataBox.ServiceParamInfo;
+
import com.alipay.sofa.registry.common.model.PublishType;
import com.alipay.sofa.registry.common.model.ServerDataBox;
-import com.fasterxml.jackson.annotation.JsonIgnore;
/**
*
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/handler/SlotTableChangeHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/handler/SlotTableChangeHandler.java
index 142887b96..a421eb5f6 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/handler/SlotTableChangeHandler.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/handler/SlotTableChangeHandler.java
@@ -27,8 +27,6 @@
import com.alipay.sofa.registry.server.data.slot.SlotManager;
import com.alipay.sofa.registry.server.shared.remoting.AbstractClientHandler;
import com.alipay.sofa.registry.util.ParaCheckUtil;
-import org.springframework.beans.factory.annotation.Autowired;
-import com.alipay.sofa.registry.server.meta.registry.Registry;
/**
*
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/DatumSnapshotHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/DatumSnapshotHandler.java
deleted file mode 100644
index 0275df15a..000000000
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/DatumSnapshotHandler.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.registry.server.data.remoting.sessionserver.handler;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ThreadPoolExecutor;
-
-import com.alipay.sofa.registry.common.model.PublisherInternUtil;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import com.alipay.sofa.registry.common.model.CommonResponse;
-import com.alipay.sofa.registry.common.model.DatumSnapshotRequest;
-import com.alipay.sofa.registry.common.model.Node;
-import com.alipay.sofa.registry.common.model.PublisherDigestUtil;
-import com.alipay.sofa.registry.common.model.constants.ValueConstants;
-import com.alipay.sofa.registry.common.model.store.Publisher;
-import com.alipay.sofa.registry.common.model.store.WordCache;
-import com.alipay.sofa.registry.log.Logger;
-import com.alipay.sofa.registry.log.LoggerFactory;
-import com.alipay.sofa.registry.remoting.Channel;
-import com.alipay.sofa.registry.server.data.cache.DatumCache;
-import com.alipay.sofa.registry.server.data.change.event.DataChangeEventCenter;
-import com.alipay.sofa.registry.server.data.change.event.DatumSnapshotEvent;
-import com.alipay.sofa.registry.server.data.remoting.handler.AbstractServerHandler;
-import com.alipay.sofa.registry.server.data.renew.DatumLeaseManager;
-import com.alipay.sofa.registry.util.ParaCheckUtil;
-
-/**
- * handling snapshot request
- *
- * @author kezhu.wukz
- * @version $Id: ClientOffProcessor.java, v 0.1 2019-05-30 15:48 kezhu.wukz Exp $
- */
-public class DatumSnapshotHandler extends AbstractServerHandler {
-
- private static final Logger RENEW_LOGGER = LoggerFactory.getLogger(
- ValueConstants.LOGGER_NAME_RENEW,
- "[DatumSnapshotHandler]");
-
- /** Limited List Printing */
- private static final int LIMITED_LIST_SIZE_FOR_PRINT = 10;
-
- @Autowired
- private DataChangeEventCenter dataChangeEventCenter;
-
- @Autowired
- private DatumLeaseManager datumLeaseManager;
-
- @Autowired
- private DatumCache datumCache;
-
- @Autowired
- private ThreadPoolExecutor renewDatumProcessorExecutor;
-
- @Override
- public Executor getExecutor() {
- return renewDatumProcessorExecutor;
- }
-
- @Override
- public void checkParam(DatumSnapshotRequest request) throws RuntimeException {
- ParaCheckUtil.checkNotBlank(request.getConnectId(), "DatumSnapshotRequest.connectId");
- ParaCheckUtil.checkNotEmpty(request.getPublishers(), "DatumSnapshotRequest.publishers");
- }
-
- @Override
- public Object doHandle(Channel channel, DatumSnapshotRequest request) {
- RENEW_LOGGER.info("Received datumSnapshotRequest: {}", request);
-
- String connectId = WordCache.getInstance().getWordCache(request.getConnectId());
-
- // convert to pubMap, and wrap it by WordCache
- Map pubMap = new HashMap<>();
- List publishers = request.getPublishers();
- if (publishers != null) {
- for (Publisher publisher : publishers) {
- PublisherInternUtil.internPublisher(publisher);
- pubMap.put(publisher.getRegisterId(), publisher);
- }
- }
-
- // diff the cache and snapshot
- boolean isDiff = true;
- Map cachePubMap = datumCache.getOwnByConnectId(connectId);
- if (cachePubMap == null) {
- RENEW_LOGGER
- .info(
- ">>>>>>> connectId={}, cachePubMap.size=0, pubMap.size={}, isDiff={}, the diff is: pubMap={}",
- connectId, pubMap.size(), isDiff, limitedToString(pubMap.values()));
- } else {
- List diffPub1 = subtract(pubMap, cachePubMap);
- List diffPub2 = subtract(cachePubMap, pubMap);
- if (diffPub1.size() == 0 && diffPub2.size() == 0) {
- isDiff = false;
- }
- RENEW_LOGGER
- .info(
- ">>>>>>> connectId={}, cachePubMap.size={}, pubMap.size={}, isDiff={}, the diff is: pubMap-cachePubMap=(size:{}){}, cachePubMap-pubMap=(size:{}){}",
- connectId, cachePubMap.size(), pubMap.size(), isDiff, diffPub1.size(),
- limitedToString(diffPub1), diffPub2.size(), limitedToString(diffPub2));
- }
-
- if (isDiff) {
- // build DatumSnapshotEvent and send to eventCenter
- dataChangeEventCenter.onChange(new DatumSnapshotEvent(connectId, cachePubMap, pubMap));
- }
-
- // record the renew timestamp
- datumLeaseManager.renew(connectId);
-
- return CommonResponse.buildSuccessResponse();
- }
-
- /**
- * Limited List Printing
- */
- private String limitedToString(Collection publishers) {
- Iterator it = publishers.iterator();
- if (!it.hasNext())
- return "[]";
-
- StringBuilder sb = new StringBuilder();
- sb.append('[');
- int i = 1;
- for (;;) {
- Publisher e = it.next();
- sb.append("Publisher{dataInfoId='").append(e.getDataInfoId()).append('\'');
- sb.append(", cell='").append(e.getCell()).append('\'');
- sb.append(", registerId='").append(e.getRegisterId()).append('\'');
- sb.append(", version=").append(e.getVersion());
- sb.append(", sourceAddress=").append(e.getSourceAddress());
- sb.append(", registerTimestamp=").append(e.getRegisterTimestamp());
- sb.append(", clientRegisterTimestamp=").append(e.getClientRegisterTimestamp());
- sb.append('}');
- if (!it.hasNext() || i++ >= LIMITED_LIST_SIZE_FOR_PRINT)
- return sb.append(']').toString();
- sb.append(',').append(' ');
- }
- }
-
- private List subtract(Map pubMap1, Map pubMap2) {
- List list = new ArrayList();
- for (Map.Entry entry : pubMap1.entrySet()) {
- String registerId = entry.getKey();
- Publisher publisher1 = entry.getValue();
- Publisher publisher2 = pubMap2.get(registerId);
- if (publisher2 == null
- || PublisherDigestUtil.getDigestValue(publisher1) != PublisherDigestUtil
- .getDigestValue(publisher2)) {
- list.add(publisher1);
- }
- }
- return list;
- }
-
- @Override
- public CommonResponse buildFailedResponse(String msg) {
- return CommonResponse.buildFailedResponse(msg);
- }
-
- @Override
- public HandlerType getType() {
- return HandlerType.PROCESSER;
- }
-
- @Override
- public Class interest() {
- return DatumSnapshotRequest.class;
- }
-
- @Override
- protected Node.NodeType getConnectNodeType() {
- return Node.NodeType.DATA;
- }
-}
diff --git a/server/server/integration/pom.xml b/server/server/integration/pom.xml
index 86398665b..01e030445 100644
--- a/server/server/integration/pom.xml
+++ b/server/server/integration/pom.xml
@@ -55,6 +55,30 @@
6.0.1
compile
+
+ com.alipay.sofa
+ registry-common-util
+
+
+ com.alipay.sofa
+ registry-server-data
+
+
+ com.alipay.sofa
+ registry-server-data
+
+
+ com.alipay.sofa
+ registry-server-session
+
+
+ com.alipay.sofa
+ registry-server-data
+
+
+ com.alipay.sofa
+ registry-server-meta
+
diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java
index e7cc29098..d187a731f 100644
--- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java
+++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java
@@ -67,7 +67,6 @@
import com.alipay.sofa.registry.server.meta.remoting.connection.SessionConnectionHandler;
import com.alipay.sofa.registry.server.meta.remoting.handler.FetchProvideDataRequestHandler;
import com.alipay.sofa.registry.server.meta.remoting.handler.HeartbeatRequestHandler;
-import com.alipay.sofa.registry.server.meta.resource.*;
import com.alipay.sofa.registry.server.meta.slot.impl.*;
import com.alipay.sofa.registry.server.shared.remoting.AbstractServerHandler;
import com.alipay.sofa.registry.server.meta.repository.NodeConfirmStatusService;
@@ -83,19 +82,9 @@
import com.alipay.sofa.registry.server.meta.task.processor.SessionNodeSingleTaskProcessor;
import com.alipay.sofa.registry.store.api.DBService;
import com.alipay.sofa.registry.util.DefaultExecutorFactory;
-import com.alipay.sofa.registry.util.NamedThreadFactory;
import com.alipay.sofa.registry.util.OsUtils;
import com.alipay.sofa.registry.util.PropertySplitter;
-import org.glassfish.jersey.jackson.JacksonFeature;
-import org.glassfish.jersey.server.ResourceConfig;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Import;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.concurrent.*;
/**
diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/connection/MetaConnectionHandler.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/connection/MetaConnectionHandler.java
index c1c2f5588..e28c2512b 100644
--- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/connection/MetaConnectionHandler.java
+++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/connection/MetaConnectionHandler.java
@@ -23,6 +23,7 @@
import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.server.meta.bootstrap.config.NodeConfig;
import com.alipay.sofa.registry.server.shared.remoting.ListenServerChannelHandler;
+import com.alipay.sofa.registry.server.meta.remoting.handler.AbstractServerHandler;
import org.springframework.beans.factory.annotation.Autowired;
import java.net.InetSocketAddress;
diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/DataNodeHandler.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/DataNodeHandler.java
deleted file mode 100644
index 37e8ca4f3..000000000
--- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/DataNodeHandler.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.registry.server.meta.remoting.handler;
-
-import com.alipay.sofa.registry.common.model.metaserver.DataNode;
-import com.alipay.sofa.registry.common.model.metaserver.NodeChangeResult;
-import com.alipay.sofa.registry.log.Logger;
-import com.alipay.sofa.registry.log.LoggerFactory;
-import com.alipay.sofa.registry.remoting.Channel;
-import com.alipay.sofa.registry.server.meta.registry.Registry;
-import org.springframework.beans.factory.annotation.Autowired;
-
-/**
- * Handle data node's register request
- * @author shangyu.wh
- * @version $Id: DataNodeHandler.java, v 0.1 2018-01-18 18:04 shangyu.wh Exp $
- */
-public class DataNodeHandler extends AbstractServerHandler {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeHandler.class);
-
- @Autowired
- private Registry metaServerRegistry;
-
- @Override
- public Object reply(Channel channel, DataNode dataNode) {
- NodeChangeResult nodeChangeResult;
- try {
- nodeChangeResult = metaServerRegistry.register(dataNode);
- LOGGER.info("Data node {} register success!result:{}", dataNode, nodeChangeResult);
- } catch (Exception e) {
- LOGGER.error("Data node register error!", e);
- throw new RuntimeException("Data node register error!", e);
- }
- return nodeChangeResult;
- }
-
- @Override
- public Class interest() {
- return DataNode.class;
- }
-
- @Override
- public HandlerType getType() {
- return HandlerType.PROCESSER;
- }
-
-}
\ No newline at end of file
diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/GetNodesRequestHandler.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/GetNodesRequestHandler.java
deleted file mode 100644
index 2fb6944c7..000000000
--- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/GetNodesRequestHandler.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.registry.server.meta.remoting.handler;
-
-import com.alipay.sofa.registry.common.model.metaserver.GetNodesRequest;
-import com.alipay.sofa.registry.common.model.metaserver.NodeChangeResult;
-import com.alipay.sofa.registry.log.Logger;
-import com.alipay.sofa.registry.log.LoggerFactory;
-import com.alipay.sofa.registry.remoting.Channel;
-import com.alipay.sofa.registry.server.meta.registry.Registry;
-import org.springframework.beans.factory.annotation.Autowired;
-
-/**
- * Handle session/data node's query request, such as getAllNodes request
- * and current this is no use for meta node, it's instead by RAFT
- * @author shangyu.wh
- * @version $Id: GetNodesRequestHandler.java, v 0.1 2018-03-02 15:12 shangyu.wh Exp $
- */
-public class GetNodesRequestHandler extends AbstractServerHandler {
-
- private static final Logger LOGGER = LoggerFactory.getLogger("META-CONNECT");
-
- @Autowired
- private Registry metaServerRegistry;
-
- @Override
- public Object reply(Channel channel, GetNodesRequest getNodesRequest) {
- NodeChangeResult nodeChangeResult;
- try {
- nodeChangeResult = metaServerRegistry.getAllNodes(getNodesRequest.getNodeType());
- LOGGER.info("Get {} change node list {} success!from {}",
- getNodesRequest.getNodeType(), nodeChangeResult, channel.getRemoteAddress());
- } catch (Exception e) {
- LOGGER.error("Get node list error!", e);
- throw new RuntimeException("Get node list error!", e);
- }
- return nodeChangeResult;
- }
-
- @Override
- public HandlerType getType() {
- return HandlerType.PROCESSER;
- }
-
- @Override
- public Class interest() {
- return GetNodesRequest.class;
- }
-
-}
\ No newline at end of file
diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/SessionNodeHandler.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/SessionNodeHandler.java
deleted file mode 100644
index 613cad34f..000000000
--- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/SessionNodeHandler.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.registry.server.meta.remoting.handler;
-
-import com.alipay.sofa.registry.common.model.metaserver.NodeChangeResult;
-import com.alipay.sofa.registry.common.model.metaserver.SessionNode;
-import com.alipay.sofa.registry.log.Logger;
-import com.alipay.sofa.registry.log.LoggerFactory;
-import com.alipay.sofa.registry.remoting.Channel;
-import com.alipay.sofa.registry.server.meta.registry.Registry;
-import org.springframework.beans.factory.annotation.Autowired;
-
-/**
- * Handle session node's register request
- * @author shangyu.wh
- * @version $Id: SessionNodeHandler.java, v 0.1 2018-01-11 17:03 shangyu.wh Exp $
- */
-public class SessionNodeHandler extends AbstractServerHandler {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SessionNodeHandler.class);
-
- @Autowired
- private Registry metaServerRegistry;
-
- @Override
- public Object reply(Channel channel, SessionNode sessionNode) {
- NodeChangeResult nodeChangeResult;
- try {
- nodeChangeResult = metaServerRegistry.register(sessionNode);
- LOGGER.info("Session node {} register success!", sessionNode);
- } catch (Exception e) {
- LOGGER.error("Session node register error!", e);
- throw new RuntimeException("Session node register error!", e);
- }
- return nodeChangeResult;
- }
-
- @Override
- public Class interest() {
- return SessionNode.class;
- }
-
- @Override
- public HandlerType getType() {
- return HandlerType.PROCESSER;
- }
-
-}
\ No newline at end of file
diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfig.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfig.java
index 10875bcf2..250bcde95 100644
--- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfig.java
+++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfig.java
@@ -180,5 +180,6 @@ public interface SessionServerConfig {
int getSlotSyncPublisherMaxNum();
Set getMetaServerIpAddresses();
+
boolean isEnableSessionLoadbalancePolicy();
}
\ No newline at end of file
diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfigBean.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfigBean.java
index f44b91f0e..9d57beb3b 100644
--- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfigBean.java
+++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfigBean.java
@@ -27,6 +27,7 @@
/**
* The type Session server config bean.
+ *
* @author shangyu.wh
* @version $Id : SessionServerConfigBean.java, v 0.1 2017-11-14 11:49 synex Exp $
*/
@@ -155,11 +156,6 @@ public class SessionServerConfigBean implements SessionServerConfig {
private int renewDatumWheelQueueSize = 10000;
private int pushDataTaskRetryFirstDelay = 500;
- private int newRevisionTaskMaxBufferSize = 1000000;
-
- private int newRevisionTaskWorkerSize = 100;
-
- private int clientNodeExchangeTimeOut = 1000; //time out cause netty HashedWheelTimer occupy a lot of mem
private long pushDataTaskRetryIncrementDelay = 500;
@@ -191,12 +187,15 @@ public class SessionServerConfigBean implements SessionServerConfig {
private boolean beginDataFetchTask = false;
-
//begin config for enterprise version
- /** forever close push zone,such as:RZBETA */
+ /**
+ * forever close push zone,such as:RZBETA
+ */
private String invalidForeverZones = "";
- /** config regex,exception to the rule of forever close push zone*/
+ /**
+ * config regex,exception to the rule of forever close push zone
+ */
private String invalidIgnoreDataidRegex = "";
private Set invalidForeverZonesSet;
@@ -221,7 +220,7 @@ public class SessionServerConfigBean implements SessionServerConfig {
private int slotSyncPublisherMaxNum = 512;
- private boolean enableSessionLoadbalancePolicy = false;
+ private boolean enableSessionLoadbalancePolicy = false;
//end config for enterprise version
@@ -231,6 +230,7 @@ public class SessionServerConfigBean implements SessionServerConfig {
/**
* constructor
+ *
* @param commonConfig
*/
public SessionServerConfigBean(CommonConfig commonConfig) {
@@ -249,7 +249,7 @@ public int getRenewDatumWheelThreadSize() {
/**
* Setter method for property renewDatumWheelThreadSize .
*
- * @param renewDatumWheelThreadSize value to be assigned to property renewDatumWheelThreadSize
+ * @param renewDatumWheelThreadSize value to be assigned to property renewDatumWheelThreadSize
*/
public void setRenewDatumWheelThreadSize(int renewDatumWheelThreadSize) {
this.renewDatumWheelThreadSize = renewDatumWheelThreadSize;
@@ -267,7 +267,7 @@ public int getRenewDatumWheelQueueSize() {
/**
* Setter method for property renewDatumWheelQueueSize .
*
- * @param renewDatumWheelQueueSize value to be assigned to property renewDatumWheelQueueSize
+ * @param renewDatumWheelQueueSize value to be assigned to property renewDatumWheelQueueSize
*/
public void setRenewDatumWheelQueueSize(int renewDatumWheelQueueSize) {
this.renewDatumWheelQueueSize = renewDatumWheelQueueSize;
@@ -286,7 +286,7 @@ public int getUserDataPushRetryExecutorQueueSize() {
/**
* Setter method for property userDataPushRetryExecutorQueueSize .
*
- * @param userDataPushRetryExecutorQueueSize value to be assigned to property userDataPushRetryExecutorQueueSize
+ * @param userDataPushRetryExecutorQueueSize value to be assigned to property userDataPushRetryExecutorQueueSize
*/
public void setUserDataPushRetryExecutorQueueSize(int userDataPushRetryExecutorQueueSize) {
this.userDataPushRetryExecutorQueueSize = userDataPushRetryExecutorQueueSize;
@@ -305,7 +305,7 @@ public int getUserDataPushRetryExecutorThreadSize() {
/**
* Setter method for property userDataPushRetryExecutorThreadSize .
*
- * @param userDataPushRetryExecutorThreadSize value to be assigned to property userDataPushRetryExecutorThreadSize
+ * @param userDataPushRetryExecutorThreadSize value to be assigned to property userDataPushRetryExecutorThreadSize
*/
public void setUserDataPushRetryExecutorThreadSize(int userDataPushRetryExecutorThreadSize) {
this.userDataPushRetryExecutorThreadSize = userDataPushRetryExecutorThreadSize;
@@ -324,7 +324,7 @@ public int getDataNodeRetryExecutorThreadSize() {
/**
* Setter method for property dataNodeRetryExecutorThreadSize .
*
- * @param dataNodeRetryExecutorThreadSize value to be assigned to property dataNodeRetryExecutorThreadSize
+ * @param dataNodeRetryExecutorThreadSize value to be assigned to property dataNodeRetryExecutorThreadSize
*/
public void setDataNodeRetryExecutorThreadSize(int dataNodeRetryExecutorThreadSize) {
this.dataNodeRetryExecutorThreadSize = dataNodeRetryExecutorThreadSize;
@@ -343,7 +343,7 @@ public int getDataNodeRetryExecutorQueueSize() {
/**
* Setter method for property dataNodeRetryExecutorQueueSize .
*
- * @param dataNodeRetryExecutorQueueSize value to be assigned to property dataNodeRetryExecutorQueueSize
+ * @param dataNodeRetryExecutorQueueSize value to be assigned to property dataNodeRetryExecutorQueueSize
*/
public void setDataNodeRetryExecutorQueueSize(int dataNodeRetryExecutorQueueSize) {
this.dataNodeRetryExecutorQueueSize = dataNodeRetryExecutorQueueSize;
@@ -361,7 +361,7 @@ public int getWriteDataAcceptorQueueSize() {
/**
* Setter method for property writeDataAcceptorQueueSize .
*
- * @param writeDataAcceptorQueueSize value to be assigned to property writeDataAcceptorQueueSize
+ * @param writeDataAcceptorQueueSize value to be assigned to property writeDataAcceptorQueueSize
*/
public void setWriteDataAcceptorQueueSize(int writeDataAcceptorQueueSize) {
this.writeDataAcceptorQueueSize = writeDataAcceptorQueueSize;
@@ -379,7 +379,7 @@ public int getRenewAndSnapshotSilentPeriodSec() {
/**
* Setter method for property renewAndSnapshotSilentPeriodSec .
*
- * @param renewAndSnapshotSilentPeriodSec value to be assigned to property renewAndSnapshotSilentPeriodSec
+ * @param renewAndSnapshotSilentPeriodSec value to be assigned to property renewAndSnapshotSilentPeriodSec
*/
public void setRenewAndSnapshotSilentPeriodSec(int renewAndSnapshotSilentPeriodSec) {
this.renewAndSnapshotSilentPeriodSec = renewAndSnapshotSilentPeriodSec;
@@ -398,7 +398,7 @@ public int getPublishDataTaskRetryTimes() {
/**
* Setter method for property publishDataTaskRetryTimes .
*
- * @param publishDataTaskRetryTimes value to be assigned to property publishDataTaskRetryTimes
+ * @param publishDataTaskRetryTimes value to be assigned to property publishDataTaskRetryTimes
*/
public void setPublishDataTaskRetryTimes(int publishDataTaskRetryTimes) {
this.publishDataTaskRetryTimes = publishDataTaskRetryTimes;
@@ -417,7 +417,7 @@ public int getUnPublishDataTaskRetryTimes() {
/**
* Setter method for property unPublishDataTaskRetryTimes .
*
- * @param unPublishDataTaskRetryTimes value to be assigned to property unPublishDataTaskRetryTimes
+ * @param unPublishDataTaskRetryTimes value to be assigned to property unPublishDataTaskRetryTimes
*/
public void setUnPublishDataTaskRetryTimes(int unPublishDataTaskRetryTimes) {
this.unPublishDataTaskRetryTimes = unPublishDataTaskRetryTimes;
@@ -565,7 +565,7 @@ public long getCancelDataTaskRetryIncrementDelay() {
/**
* Setter method for property cancelDataTaskRetryIncrementDelay.
*
- * @param cancelDataTaskRetryIncrementDelay value to be assigned to property cancelDataTaskRetryIncrementDelay
+ * @param cancelDataTaskRetryIncrementDelay value to be assigned to property cancelDataTaskRetryIncrementDelay
*/
public void setCancelDataTaskRetryIncrementDelay(long cancelDataTaskRetryIncrementDelay) {
this.cancelDataTaskRetryIncrementDelay = cancelDataTaskRetryIncrementDelay;
@@ -574,7 +574,7 @@ public void setCancelDataTaskRetryIncrementDelay(long cancelDataTaskRetryIncreme
/**
* Setter method for property cancelDataTaskRetryFirstDelay .
*
- * @param cancelDataTaskRetryFirstDelay value to be assigned to property cancelDataTaskRetryFirstDelay
+ * @param cancelDataTaskRetryFirstDelay value to be assigned to property cancelDataTaskRetryFirstDelay
*/
public void setCancelDataTaskRetryFirstDelay(long cancelDataTaskRetryFirstDelay) {
this.cancelDataTaskRetryFirstDelay = cancelDataTaskRetryFirstDelay;
@@ -593,7 +593,7 @@ public long getPublishDataTaskRetryFirstDelay() {
/**
* Setter method for property publishDataTaskRetryFirstDelay .
*
- * @param publishDataTaskRetryFirstDelay value to be assigned to property publishDataTaskRetryFirstDelay
+ * @param publishDataTaskRetryFirstDelay value to be assigned to property publishDataTaskRetryFirstDelay
*/
public void setPublishDataTaskRetryFirstDelay(long publishDataTaskRetryFirstDelay) {
this.publishDataTaskRetryFirstDelay = publishDataTaskRetryFirstDelay;
@@ -612,7 +612,7 @@ public long getPublishDataTaskRetryIncrementDelay() {
/**
* Setter method for property publishDataTaskRetryIncrementDelay .
*
- * @param publishDataTaskRetryIncrementDelay value to be assigned to property publishDataTaskRetryIncrementDelay
+ * @param publishDataTaskRetryIncrementDelay value to be assigned to property publishDataTaskRetryIncrementDelay
*/
public void setPublishDataTaskRetryIncrementDelay(long publishDataTaskRetryIncrementDelay) {
this.publishDataTaskRetryIncrementDelay = publishDataTaskRetryIncrementDelay;
@@ -631,7 +631,7 @@ public long getUnPublishDataTaskRetryFirstDelay() {
/**
* Setter method for property unPublishDataTaskRetryFirstDelay .
*
- * @param unPublishDataTaskRetryFirstDelay value to be assigned to property unPublishDataTaskRetryFirstDelay
+ * @param unPublishDataTaskRetryFirstDelay value to be assigned to property unPublishDataTaskRetryFirstDelay
*/
public void setUnPublishDataTaskRetryFirstDelay(long unPublishDataTaskRetryFirstDelay) {
this.unPublishDataTaskRetryFirstDelay = unPublishDataTaskRetryFirstDelay;
@@ -650,7 +650,7 @@ public long getUnPublishDataTaskRetryIncrementDelay() {
/**
* Setter method for property unPublishDataTaskRetryIncrementDelay .
*
- * @param unPublishDataTaskRetryIncrementDelay value to be assigned to property unPublishDataTaskRetryIncrementDelay
+ * @param unPublishDataTaskRetryIncrementDelay value to be assigned to property unPublishDataTaskRetryIncrementDelay
*/
public void setUnPublishDataTaskRetryIncrementDelay(long unPublishDataTaskRetryIncrementDelay) {
this.unPublishDataTaskRetryIncrementDelay = unPublishDataTaskRetryIncrementDelay;
@@ -713,6 +713,25 @@ public void setSubscriberRegisterFetchRetryTimes(int subscriberRegisterFetchRetr
this.subscriberRegisterFetchRetryTimes = subscriberRegisterFetchRetryTimes;
}
+ /**
+ * Getter method for property sessionRegisterDataServerTaskRetryTimes.
+ *
+ * @return property value of sessionRegisterDataServerTaskRetryTimes
+ */
+ @Override
+ public int getSessionRegisterDataServerTaskRetryTimes() {
+ return sessionRegisterDataServerTaskRetryTimes;
+ }
+
+ /**
+ * Setter method for property sessionRegisterDataServerTaskRetryTimes.
+ *
+ * @param sessionRegisterDataServerTaskRetryTimes value to be assigned to property sessionRegisterDataServerTaskRetryTimes
+ */
+ public void setSessionRegisterDataServerTaskRetryTimes(int sessionRegisterDataServerTaskRetryTimes) {
+ this.sessionRegisterDataServerTaskRetryTimes = sessionRegisterDataServerTaskRetryTimes;
+ }
+
/**
* Getter method for property clientNodeExchangeTimeOut.
*
@@ -910,7 +929,7 @@ public int getSchedulerCleanInvalidClientBackOffBound() {
/**
* Setter method for property schedulerCleanInvolidClientTimeOut.
*
- * @param schedulerCleanInvalidClientTimeOut value to be assigned to property schedulerCleanInvolidClientTimeOut
+ * @param schedulerCleanInvalidClientTimeOut value to be assigned to property schedulerCleanInvolidClientTimeOut
*/
public void setSchedulerCleanInvalidClientTimeOut(int schedulerCleanInvalidClientTimeOut) {
this.schedulerCleanInvalidClientTimeOut = schedulerCleanInvalidClientTimeOut;
@@ -919,7 +938,7 @@ public void setSchedulerCleanInvalidClientTimeOut(int schedulerCleanInvalidClien
/**
* Setter method for property schedulerCleanInvolidClientFirstDelay.
*
- * @param schedulerCleanInvalidClientFirstDelay value to be assigned to property schedulerCleanInvolidClientFirstDelay
+ * @param schedulerCleanInvalidClientFirstDelay value to be assigned to property schedulerCleanInvolidClientFirstDelay
*/
public void setSchedulerCleanInvalidClientFirstDelay(int schedulerCleanInvalidClientFirstDelay) {
this.schedulerCleanInvalidClientFirstDelay = schedulerCleanInvalidClientFirstDelay;
@@ -928,7 +947,7 @@ public void setSchedulerCleanInvalidClientFirstDelay(int schedulerCleanInvalidCl
/**
* Setter method for property schedulerCleanInvolidClientBackOffBound.
*
- * @param schedulerCleanInvalidClientBackOffBound value to be assigned to property schedulerCleanInvolidClientBackOffBound
+ * @param schedulerCleanInvalidClientBackOffBound value to be assigned to property schedulerCleanInvolidClientBackOffBound
*/
public void setSchedulerCleanInvalidClientBackOffBound(int schedulerCleanInvalidClientBackOffBound) {
this.schedulerCleanInvalidClientBackOffBound = schedulerCleanInvalidClientBackOffBound;
@@ -947,7 +966,7 @@ public boolean isStopPushSwitch() {
/**
* Setter method for property stopPushSwitch.
*
- * @param stopPushSwitch value to be assigned to property stopPushSwitch
+ * @param stopPushSwitch value to be assigned to property stopPushSwitch
*/
@Override
public void setStopPushSwitch(boolean stopPushSwitch) {
@@ -967,7 +986,7 @@ public boolean isBeginDataFetchTask() {
/**
* Setter method for property beginDataFetchTask.
*
- * @param beginDataFetchTask value to be assigned to property beginDataFetchTask
+ * @param beginDataFetchTask value to be assigned to property beginDataFetchTask
*/
@Override
public void setBeginDataFetchTask(boolean beginDataFetchTask) {
@@ -981,7 +1000,7 @@ public String getInvalidForeverZones() {
/**
* Setter method for property invalidForeverZones.
*
- * @param invalidForeverZones value to be assigned to property invalidForeverZones
+ * @param invalidForeverZones value to be assigned to property invalidForeverZones
*/
public void setInvalidForeverZones(String invalidForeverZones) {
this.invalidForeverZones = invalidForeverZones;
@@ -994,7 +1013,7 @@ public String getInvalidIgnoreDataidRegex() {
/**
* Setter method for property invalidIgnoreDataidRegex.
*
- * @param invalidIgnoreDataidRegex value to be assigned to property invalidIgnoreDataidRegex
+ * @param invalidIgnoreDataidRegex value to be assigned to property invalidIgnoreDataidRegex
*/
public void setInvalidIgnoreDataidRegex(String invalidIgnoreDataidRegex) {
this.invalidIgnoreDataidRegex = invalidIgnoreDataidRegex;
@@ -1008,7 +1027,7 @@ public int getAccessDataExecutorMinPoolSize() {
/**
* Setter method for property accessDataExecutorMinPoolSize.
*
- * @param accessDataExecutorMinPoolSize value to be assigned to property accessDataExecutorMinPoolSize
+ * @param accessDataExecutorMinPoolSize value to be assigned to property accessDataExecutorMinPoolSize
*/
public void setAccessDataExecutorMinPoolSize(int accessDataExecutorMinPoolSize) {
this.accessDataExecutorMinPoolSize = accessDataExecutorMinPoolSize;
@@ -1022,7 +1041,7 @@ public int getAccessDataExecutorMaxPoolSize() {
/**
* Setter method for property accessDataExecutorMaxPoolSize.
*
- * @param accessDataExecutorMaxPoolSize value to be assigned to property accessDataExecutorMaxPoolSize
+ * @param accessDataExecutorMaxPoolSize value to be assigned to property accessDataExecutorMaxPoolSize
*/
public void setAccessDataExecutorMaxPoolSize(int accessDataExecutorMaxPoolSize) {
this.accessDataExecutorMaxPoolSize = accessDataExecutorMaxPoolSize;
@@ -1036,7 +1055,7 @@ public int getAccessDataExecutorQueueSize() {
/**
* Setter method for property accessDataExecutorQueueSize.
*
- * @param accessDataExecutorQueueSize value to be assigned to property accessDataExecutorQueueSize
+ * @param accessDataExecutorQueueSize value to be assigned to property accessDataExecutorQueueSize
*/
public void setAccessDataExecutorQueueSize(int accessDataExecutorQueueSize) {
this.accessDataExecutorQueueSize = accessDataExecutorQueueSize;
@@ -1050,7 +1069,7 @@ public long getAccessDataExecutorKeepAliveTime() {
/**
* Setter method for property accessDataExecutorKeepAliveTime.
*
- * @param accessDataExecutorKeepAliveTime value to be assigned to property accessDataExecutorKeepAliveTime
+ * @param accessDataExecutorKeepAliveTime value to be assigned to property accessDataExecutorKeepAliveTime
*/
public void setAccessDataExecutorKeepAliveTime(long accessDataExecutorKeepAliveTime) {
this.accessDataExecutorKeepAliveTime = accessDataExecutorKeepAliveTime;
@@ -1099,7 +1118,7 @@ public long getDataChangeExecutorKeepAliveTime() {
/**
* Setter method for property dataChangeExecutorMinPoolSize.
*
- * @param dataChangeExecutorMinPoolSize value to be assigned to property dataChangeExecutorMinPoolSize
+ * @param dataChangeExecutorMinPoolSize value to be assigned to property dataChangeExecutorMinPoolSize
*/
public void setDataChangeExecutorMinPoolSize(int dataChangeExecutorMinPoolSize) {
this.dataChangeExecutorMinPoolSize = dataChangeExecutorMinPoolSize;
@@ -1108,7 +1127,7 @@ public void setDataChangeExecutorMinPoolSize(int dataChangeExecutorMinPoolSize)
/**
* Setter method for property dataChangeExecutorMaxPoolSize.
*
- * @param dataChangeExecutorMaxPoolSize value to be assigned to property dataChangeExecutorMaxPoolSize
+ * @param dataChangeExecutorMaxPoolSize value to be assigned to property dataChangeExecutorMaxPoolSize
*/
public void setDataChangeExecutorMaxPoolSize(int dataChangeExecutorMaxPoolSize) {
this.dataChangeExecutorMaxPoolSize = dataChangeExecutorMaxPoolSize;
@@ -1117,7 +1136,7 @@ public void setDataChangeExecutorMaxPoolSize(int dataChangeExecutorMaxPoolSize)
/**
* Setter method for property dataChangeExecutorQueueSize.
*
- * @param dataChangeExecutorQueueSize value to be assigned to property dataChangeExecutorQueueSize
+ * @param dataChangeExecutorQueueSize value to be assigned to property dataChangeExecutorQueueSize
*/
public void setDataChangeExecutorQueueSize(int dataChangeExecutorQueueSize) {
this.dataChangeExecutorQueueSize = dataChangeExecutorQueueSize;
@@ -1126,7 +1145,7 @@ public void setDataChangeExecutorQueueSize(int dataChangeExecutorQueueSize) {
/**
* Setter method for property dataChangeExecutorKeepAliveTime.
*
- * @param dataChangeExecutorKeepAliveTime value to be assigned to property dataChangeExecutorKeepAliveTime
+ * @param dataChangeExecutorKeepAliveTime value to be assigned to property dataChangeExecutorKeepAliveTime
*/
public void setDataChangeExecutorKeepAliveTime(long dataChangeExecutorKeepAliveTime) {
this.dataChangeExecutorKeepAliveTime = dataChangeExecutorKeepAliveTime;
@@ -1175,7 +1194,7 @@ public long getPushTaskExecutorKeepAliveTime() {
/**
* Setter method for property pushTaskExecutorMinPoolSize.
*
- * @param pushTaskExecutorMinPoolSize value to be assigned to property pushTaskExecutorMinPoolSize
+ * @param pushTaskExecutorMinPoolSize value to be assigned to property pushTaskExecutorMinPoolSize
*/
public void setPushTaskExecutorMinPoolSize(int pushTaskExecutorMinPoolSize) {
this.pushTaskExecutorMinPoolSize = pushTaskExecutorMinPoolSize;
@@ -1184,7 +1203,7 @@ public void setPushTaskExecutorMinPoolSize(int pushTaskExecutorMinPoolSize) {
/**
* Setter method for property pushTaskExecutorMaxPoolSize.
*
- * @param pushTaskExecutorMaxPoolSize value to be assigned to property pushTaskExecutorMaxPoolSize
+ * @param pushTaskExecutorMaxPoolSize value to be assigned to property pushTaskExecutorMaxPoolSize
*/
public void setPushTaskExecutorMaxPoolSize(int pushTaskExecutorMaxPoolSize) {
this.pushTaskExecutorMaxPoolSize = pushTaskExecutorMaxPoolSize;
@@ -1193,7 +1212,7 @@ public void setPushTaskExecutorMaxPoolSize(int pushTaskExecutorMaxPoolSize) {
/**
* Setter method for property pushTaskExecutorQueueSize.
*
- * @param pushTaskExecutorQueueSize value to be assigned to property pushTaskExecutorQueueSize
+ * @param pushTaskExecutorQueueSize value to be assigned to property pushTaskExecutorQueueSize
*/
public void setPushTaskExecutorQueueSize(int pushTaskExecutorQueueSize) {
this.pushTaskExecutorQueueSize = pushTaskExecutorQueueSize;
@@ -1202,7 +1221,7 @@ public void setPushTaskExecutorQueueSize(int pushTaskExecutorQueueSize) {
/**
* Setter method for property pushTaskExecutorKeepAliveTime.
*
- * @param pushTaskExecutorKeepAliveTime value to be assigned to property pushTaskExecutorKeepAliveTime
+ * @param pushTaskExecutorKeepAliveTime value to be assigned to property pushTaskExecutorKeepAliveTime
*/
public void setPushTaskExecutorKeepAliveTime(long pushTaskExecutorKeepAliveTime) {
this.pushTaskExecutorKeepAliveTime = pushTaskExecutorKeepAliveTime;
@@ -1238,7 +1257,7 @@ public long getDefaultSessionExecutorKeepAliveTime() {
/**
* Setter method for property defaultSessionExecutorMinPoolSize.
*
- * @param defaultSessionExecutorMinPoolSize value to be assigned to property defaultSessionExecutorMinPoolSize
+ * @param defaultSessionExecutorMinPoolSize value to be assigned to property defaultSessionExecutorMinPoolSize
*/
public void setDefaultSessionExecutorMinPoolSize(int defaultSessionExecutorMinPoolSize) {
this.defaultSessionExecutorMinPoolSize = defaultSessionExecutorMinPoolSize;
@@ -1247,7 +1266,7 @@ public void setDefaultSessionExecutorMinPoolSize(int defaultSessionExecutorMinPo
/**
* Setter method for property defaultSessionExecutorMaxPoolSize.
*
- * @param defaultSessionExecutorMaxPoolSize value to be assigned to property defaultSessionExecutorMaxPoolSize
+ * @param defaultSessionExecutorMaxPoolSize value to be assigned to property defaultSessionExecutorMaxPoolSize
*/
public void setDefaultSessionExecutorMaxPoolSize(int defaultSessionExecutorMaxPoolSize) {
this.defaultSessionExecutorMaxPoolSize = defaultSessionExecutorMaxPoolSize;
@@ -1256,7 +1275,7 @@ public void setDefaultSessionExecutorMaxPoolSize(int defaultSessionExecutorMaxPo
/**
* Setter method for property defaultSessionExecutorKeepAliveTime.
*
- * @param defaultSessionExecutorKeepAliveTime value to be assigned to property defaultSessionExecutorKeepAliveTime
+ * @param defaultSessionExecutorKeepAliveTime value to be assigned to property defaultSessionExecutorKeepAliveTime
*/
public void setDefaultSessionExecutorKeepAliveTime(long defaultSessionExecutorKeepAliveTime) {
this.defaultSessionExecutorKeepAliveTime = defaultSessionExecutorKeepAliveTime;
@@ -1292,7 +1311,7 @@ public int getConnectClientExecutorQueueSize() {
/**
* Setter method for property connectClientExecutorMinPoolSize.
*
- * @param connectClientExecutorMinPoolSize value to be assigned to property connectClientExecutorMinPoolSize
+ * @param connectClientExecutorMinPoolSize value to be assigned to property connectClientExecutorMinPoolSize
*/
public void setConnectClientExecutorMinPoolSize(int connectClientExecutorMinPoolSize) {
this.connectClientExecutorMinPoolSize = connectClientExecutorMinPoolSize;
@@ -1301,7 +1320,7 @@ public void setConnectClientExecutorMinPoolSize(int connectClientExecutorMinPool
/**
* Setter method for property connectClientExecutorMaxPoolSize.
*
- * @param connectClientExecutorMaxPoolSize value to be assigned to property connectClientExecutorMaxPoolSize
+ * @param connectClientExecutorMaxPoolSize value to be assigned to property connectClientExecutorMaxPoolSize
*/
public void setConnectClientExecutorMaxPoolSize(int connectClientExecutorMaxPoolSize) {
this.connectClientExecutorMaxPoolSize = connectClientExecutorMaxPoolSize;
@@ -1310,7 +1329,7 @@ public void setConnectClientExecutorMaxPoolSize(int connectClientExecutorMaxPool
/**
* Setter method for property connectClientExecutorQueueSize.
*
- * @param connectClientExecutorQueueSize value to be assigned to property connectClientExecutorQueueSize
+ * @param connectClientExecutorQueueSize value to be assigned to property connectClientExecutorQueueSize
*/
public void setConnectClientExecutorQueueSize(int connectClientExecutorQueueSize) {
this.connectClientExecutorQueueSize = connectClientExecutorQueueSize;
@@ -1329,7 +1348,7 @@ public int getDataChangeFetchTaskMaxBufferSize() {
/**
* Setter method for property dataChangeFetchTaskMaxBufferSize.
*
- * @param dataChangeFetchTaskMaxBufferSize value to be assigned to property dataChangeFetchTaskMaxBufferSize
+ * @param dataChangeFetchTaskMaxBufferSize value to be assigned to property dataChangeFetchTaskMaxBufferSize
*/
public void setDataChangeFetchTaskMaxBufferSize(int dataChangeFetchTaskMaxBufferSize) {
this.dataChangeFetchTaskMaxBufferSize = dataChangeFetchTaskMaxBufferSize;
@@ -1348,7 +1367,7 @@ public int getDataChangeFetchTaskWorkerSize() {
/**
* Setter method for property dataChangeFetchTaskWorkerSize.
*
- * @param dataChangeFetchTaskWorkerSize value to be assigned to property dataChangeFetchTaskWorkerSize
+ * @param dataChangeFetchTaskWorkerSize value to be assigned to property dataChangeFetchTaskWorkerSize
*/
public void setDataChangeFetchTaskWorkerSize(int dataChangeFetchTaskWorkerSize) {
this.dataChangeFetchTaskWorkerSize = dataChangeFetchTaskWorkerSize;
@@ -1367,7 +1386,7 @@ public int getUserDataPushRetryWheelTicksSize() {
/**
* Setter method for property userDataPushRetryWheelTicksSize.
*
- * @param userDataPushRetryWheelTicksSize value to be assigned to property userDataPushRetryWheelTicksSize
+ * @param userDataPushRetryWheelTicksSize value to be assigned to property userDataPushRetryWheelTicksSize
*/
public void setUserDataPushRetryWheelTicksSize(int userDataPushRetryWheelTicksSize) {
this.userDataPushRetryWheelTicksSize = userDataPushRetryWheelTicksSize;
@@ -1386,7 +1405,7 @@ public int getUserDataPushRetryWheelTicksDuration() {
/**
* Setter method for property userDataPushRetryWheelTicksDuration.
*
- * @param userDataPushRetryWheelTicksDuration value to be assigned to property userDataPushRetryWheelTicksDuration
+ * @param userDataPushRetryWheelTicksDuration value to be assigned to property userDataPushRetryWheelTicksDuration
*/
public void setUserDataPushRetryWheelTicksDuration(int userDataPushRetryWheelTicksDuration) {
this.userDataPushRetryWheelTicksDuration = userDataPushRetryWheelTicksDuration;
@@ -1405,7 +1424,7 @@ public int getPushDataTaskRetryFirstDelay() {
/**
* Setter method for property pushDataTaskRetryFirstDelay.
*
- * @param pushDataTaskRetryFirstDelay value to be assigned to property pushDataTaskRetryFirstDelay
+ * @param pushDataTaskRetryFirstDelay value to be assigned to property pushDataTaskRetryFirstDelay
*/
public void setPushDataTaskRetryFirstDelay(int pushDataTaskRetryFirstDelay) {
this.pushDataTaskRetryFirstDelay = pushDataTaskRetryFirstDelay;
@@ -1451,7 +1470,7 @@ public int getRenewDatumWheelTaskDelaySec() {
/**
* Setter method for property renewDatumWheelTaskDelaySec .
*
- * @param renewDatumWheelTaskDelaySec value to be assigned to property renewDatumWheelTaskDelaySec
+ * @param renewDatumWheelTaskDelaySec value to be assigned to property renewDatumWheelTaskDelaySec
*/
public void setRenewDatumWheelTaskDelaySec(int renewDatumWheelTaskDelaySec) {
this.renewDatumWheelTaskDelaySec = renewDatumWheelTaskDelaySec;
@@ -1469,7 +1488,7 @@ public int getRenewDatumWheelTaskRandomFirstDelaySec() {
/**
* Setter method for property renewDatumWheelTaskRandomFirstDelaySec .
*
- * @param renewDatumWheelTaskRandomFirstDelaySec value to be assigned to property renewDatumWheelTaskRandomFirstDelaySec
+ * @param renewDatumWheelTaskRandomFirstDelaySec value to be assigned to property renewDatumWheelTaskRandomFirstDelaySec
*/
public void setRenewDatumWheelTaskRandomFirstDelaySec(int renewDatumWheelTaskRandomFirstDelaySec) {
this.renewDatumWheelTaskRandomFirstDelaySec = renewDatumWheelTaskRandomFirstDelaySec;
@@ -1478,7 +1497,7 @@ public void setRenewDatumWheelTaskRandomFirstDelaySec(int renewDatumWheelTaskRan
/**
* Setter method for property renewDatumWheelTicksSize.
*
- * @param renewDatumWheelTicksSize value to be assigned to property renewDatumWheelTicksSize
+ * @param renewDatumWheelTicksSize value to be assigned to property renewDatumWheelTicksSize
*/
public void setRenewDatumWheelTicksSize(int renewDatumWheelTicksSize) {
this.renewDatumWheelTicksSize = renewDatumWheelTicksSize;
@@ -1487,7 +1506,7 @@ public void setRenewDatumWheelTicksSize(int renewDatumWheelTicksSize) {
/**
* Setter method for property renewDatumWheelTicksDuration.
*
- * @param renewDatumWheelTicksDuration value to be assigned to property renewDatumWheelTicksDuration
+ * @param renewDatumWheelTicksDuration value to be assigned to property renewDatumWheelTicksDuration
*/
public void setRenewDatumWheelTicksDuration(int renewDatumWheelTicksDuration) {
this.renewDatumWheelTicksDuration = renewDatumWheelTicksDuration;
@@ -1496,7 +1515,7 @@ public void setRenewDatumWheelTicksDuration(int renewDatumWheelTicksDuration) {
/**
* Setter method for property pushDataTaskRetryIncrementDelay.
*
- * @param pushDataTaskRetryIncrementDelay value to be assigned to property pushDataTaskRetryIncrementDelay
+ * @param pushDataTaskRetryIncrementDelay value to be assigned to property pushDataTaskRetryIncrementDelay
*/
public void setPushDataTaskRetryIncrementDelay(long pushDataTaskRetryIncrementDelay) {
this.pushDataTaskRetryIncrementDelay = pushDataTaskRetryIncrementDelay;
@@ -1523,7 +1542,7 @@ public String getBlacklistSubDataIdRegex() {
/**
* Setter method for property blacklistPubDataIdRegex.
*
- * @param blacklistPubDataIdRegex value to be assigned to property blacklistPubDataIdRegex
+ * @param blacklistPubDataIdRegex value to be assigned to property blacklistPubDataIdRegex
*/
public void setBlacklistPubDataIdRegex(String blacklistPubDataIdRegex) {
this.blacklistPubDataIdRegex = blacklistPubDataIdRegex;
@@ -1532,7 +1551,7 @@ public void setBlacklistPubDataIdRegex(String blacklistPubDataIdRegex) {
/**
* Setter method for property blacklistSubDataIdRegex.
*
- * @param blacklistSubDataIdRegex value to be assigned to property blacklistSubDataIdRegex
+ * @param blacklistSubDataIdRegex value to be assigned to property blacklistSubDataIdRegex
*/
public void setBlacklistSubDataIdRegex(String blacklistSubDataIdRegex) {
this.blacklistSubDataIdRegex = blacklistSubDataIdRegex;
@@ -1550,7 +1569,7 @@ public long getPushTaskConfirmWaitTimeout() {
/**
* Setter method for property pushTaskConfirmWaitTimeout.
*
- * @param pushTaskConfirmWaitTimeout value to be assigned to property pushTaskConfirmWaitTimeout
+ * @param pushTaskConfirmWaitTimeout value to be assigned to property pushTaskConfirmWaitTimeout
*/
public void setPushTaskConfirmWaitTimeout(long pushTaskConfirmWaitTimeout) {
this.pushTaskConfirmWaitTimeout = pushTaskConfirmWaitTimeout;
@@ -1595,7 +1614,7 @@ public int getPushTaskConfirmCheckExecutorThreadSize() {
/**
* Setter method for property pushTaskConfirmCheckWheelTicksSize.
*
- * @param pushTaskConfirmCheckWheelTicksSize value to be assigned to property pushTaskConfirmCheckWheelTicksSize
+ * @param pushTaskConfirmCheckWheelTicksSize value to be assigned to property pushTaskConfirmCheckWheelTicksSize
*/
public void setPushTaskConfirmCheckWheelTicksSize(int pushTaskConfirmCheckWheelTicksSize) {
this.pushTaskConfirmCheckWheelTicksSize = pushTaskConfirmCheckWheelTicksSize;
@@ -1604,7 +1623,7 @@ public void setPushTaskConfirmCheckWheelTicksSize(int pushTaskConfirmCheckWheelT
/**
* Setter method for property pushTaskConfirmCheckWheelTicksDuration.
*
- * @param pushTaskConfirmCheckWheelTicksDuration value to be assigned to property pushTaskConfirmCheckWheelTicksDuration
+ * @param pushTaskConfirmCheckWheelTicksDuration value to be assigned to property pushTaskConfirmCheckWheelTicksDuration
*/
public void setPushTaskConfirmCheckWheelTicksDuration(int pushTaskConfirmCheckWheelTicksDuration) {
this.pushTaskConfirmCheckWheelTicksDuration = pushTaskConfirmCheckWheelTicksDuration;
@@ -1613,7 +1632,7 @@ public void setPushTaskConfirmCheckWheelTicksDuration(int pushTaskConfirmCheckWh
/**
* Setter method for property pushTaskConfirmCheckExecutorQueueSize.
*
- * @param pushTaskConfirmCheckExecutorQueueSize value to be assigned to property pushTaskConfirmCheckExecutorQueueSize
+ * @param pushTaskConfirmCheckExecutorQueueSize value to be assigned to property pushTaskConfirmCheckExecutorQueueSize
*/
public void setPushTaskConfirmCheckExecutorQueueSize(int pushTaskConfirmCheckExecutorQueueSize) {
this.pushTaskConfirmCheckExecutorQueueSize = pushTaskConfirmCheckExecutorQueueSize;
@@ -1622,7 +1641,7 @@ public void setPushTaskConfirmCheckExecutorQueueSize(int pushTaskConfirmCheckExe
/**
* Setter method for property pushTaskConfirmCheckExecutorThreadSize.
*
- * @param pushTaskConfirmCheckExecutorThreadSize value to be assigned to property pushTaskConfirmCheckExecutorThreadSize
+ * @param pushTaskConfirmCheckExecutorThreadSize value to be assigned to property pushTaskConfirmCheckExecutorThreadSize
*/
public void setPushTaskConfirmCheckExecutorThreadSize(int pushTaskConfirmCheckExecutorThreadSize) {
this.pushTaskConfirmCheckExecutorThreadSize = pushTaskConfirmCheckExecutorThreadSize;
@@ -1667,7 +1686,7 @@ public long getPublishDataExecutorKeepAliveTime() {
/**
* Setter method for property publishDataExecutorMinPoolSize.
*
- * @param publishDataExecutorMinPoolSize value to be assigned to property publishDataExecutorMinPoolSize
+ * @param publishDataExecutorMinPoolSize value to be assigned to property publishDataExecutorMinPoolSize
*/
public void setPublishDataExecutorMinPoolSize(int publishDataExecutorMinPoolSize) {
this.publishDataExecutorMinPoolSize = publishDataExecutorMinPoolSize;
@@ -1676,7 +1695,7 @@ public void setPublishDataExecutorMinPoolSize(int publishDataExecutorMinPoolSize
/**
* Setter method for property publishDataExecutorMaxPoolSize.
*
- * @param publishDataExecutorMaxPoolSize value to be assigned to property publishDataExecutorMaxPoolSize
+ * @param publishDataExecutorMaxPoolSize value to be assigned to property publishDataExecutorMaxPoolSize
*/
public void setPublishDataExecutorMaxPoolSize(int publishDataExecutorMaxPoolSize) {
this.publishDataExecutorMaxPoolSize = publishDataExecutorMaxPoolSize;
@@ -1685,7 +1704,7 @@ public void setPublishDataExecutorMaxPoolSize(int publishDataExecutorMaxPoolSize
/**
* Setter method for property publishDataExecutorQueueSize.
*
- * @param publishDataExecutorQueueSize value to be assigned to property publishDataExecutorQueueSize
+ * @param publishDataExecutorQueueSize value to be assigned to property publishDataExecutorQueueSize
*/
public void setPublishDataExecutorQueueSize(int publishDataExecutorQueueSize) {
this.publishDataExecutorQueueSize = publishDataExecutorQueueSize;
@@ -1694,7 +1713,7 @@ public void setPublishDataExecutorQueueSize(int publishDataExecutorQueueSize) {
/**
* Setter method for property publishDataExecutorKeepAliveTime.
*
- * @param publishDataExecutorKeepAliveTime value to be assigned to property publishDataExecutorKeepAliveTime
+ * @param publishDataExecutorKeepAliveTime value to be assigned to property publishDataExecutorKeepAliveTime
*/
public void setPublishDataExecutorKeepAliveTime(long publishDataExecutorKeepAliveTime) {
this.publishDataExecutorKeepAliveTime = publishDataExecutorKeepAliveTime;
@@ -1712,7 +1731,7 @@ public double getAccessLimitRate() {
/**
* Setter method for property accessLimitRate.
*
- * @param accessLimitRate value to be assigned to property accessLimitRate
+ * @param accessLimitRate value to be assigned to property accessLimitRate
*/
public void setAccessLimitRate(double accessLimitRate) {
this.accessLimitRate = accessLimitRate;
@@ -1731,7 +1750,7 @@ public int getDataClientConnNum() {
/**
* Setter method for property dataClientConnNum .
*
- * @param dataClientConnNum value to be assigned to property dataClientConnNum
+ * @param dataClientConnNum value to be assigned to property dataClientConnNum
*/
public void setDataClientConnNum(int dataClientConnNum) {
this.dataClientConnNum = dataClientConnNum;
@@ -1778,7 +1797,7 @@ public int getSessionSchedulerPoolSize() {
/**
* Setter method for property sessionSchedulerPoolSize .
*
- * @param sessionSchedulerPoolSize value to be assigned to property sessionSchedulerPoolSize
+ * @param sessionSchedulerPoolSize value to be assigned to property sessionSchedulerPoolSize
*/
public void setSessionSchedulerPoolSize(int sessionSchedulerPoolSize) {
this.sessionSchedulerPoolSize = sessionSchedulerPoolSize;
@@ -1796,7 +1815,7 @@ public int getDataNodeExchangeForFetchDatumTimeOut() {
/**
* Setter method for property dataNodeExchangeForFetchDatumTimeOut .
*
- * @param dataNodeExchangeForFetchDatumTimeOut value to be assigned to property dataNodeExchangeForFetchDatumTimeOut
+ * @param dataNodeExchangeForFetchDatumTimeOut value to be assigned to property dataNodeExchangeForFetchDatumTimeOut
*/
public void setDataNodeExchangeForFetchDatumTimeOut(int dataNodeExchangeForFetchDatumTimeOut) {
this.dataNodeExchangeForFetchDatumTimeOut = dataNodeExchangeForFetchDatumTimeOut;
@@ -1804,6 +1823,7 @@ public void setDataNodeExchangeForFetchDatumTimeOut(int dataNodeExchangeForFetch
/**
* Getter method for property slotSyncPublisherMaxNum.
+ *
* @return property value of slotSyncPublisherMaxNum
*/
@Override
@@ -1824,6 +1844,7 @@ public Set getMetaServerIpAddresses() {
/**
* Setter method for property slotSyncPublisherMaxNum.
+ *
* @param slotSyncPublisherMaxNum value to be assigned to property slotSyncPublisherMaxNum
*/
public void setSlotSyncPublisherMaxNum(int slotSyncPublisherMaxNum) {
diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java
index 5fd124e97..cf7db8f0f 100644
--- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java
+++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java
@@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.Collection;
+import com.alipay.sofa.jraft.NodeManager;
import com.alipay.sofa.registry.server.session.assemble.AppAssembleService;
import com.alipay.sofa.registry.server.session.assemble.AppInterfaceAssembleService;
import com.alipay.sofa.registry.server.session.assemble.AssembleService;
diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/SessionDatumCacheDecorator.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/SessionDatumCacheDecorator.java
index a50172eec..2c50eb0be 100644
--- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/SessionDatumCacheDecorator.java
+++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/SessionDatumCacheDecorator.java
@@ -16,16 +16,13 @@
*/
package com.alipay.sofa.registry.server.session.cache;
-import com.alipay.sofa.registry.common.model.Node;
import com.alipay.sofa.registry.common.model.constants.ValueConstants;
import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.store.DataInfo;
-import com.alipay.sofa.registry.common.model.store.Subscriber;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
-import com.alipay.sofa.registry.server.session.node.NodeManager;
-import com.alipay.sofa.registry.server.session.node.NodeManagerFactory;
import com.alipay.sofa.registry.server.session.store.Interests;
+import com.alipay.sofa.registry.server.shared.meta.MetaServerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
@@ -33,11 +30,9 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
import java.util.stream.Collectors;
/**
- *
* @author xiaojian.xj
* @version $Id: SessionDatumCacheDecorator.java, v 0.1 2020年11月03日 20:23 xiaojian.xj Exp $
*/
@@ -52,6 +47,9 @@ public class SessionDatumCacheDecorator {
@Autowired
private CacheService sessionCacheService;
+ @Autowired
+ private MetaServerService metaServerService;
+
@Autowired
private AppRevisionCacheRegistry appRevisionCacheRegistry;
@@ -76,12 +74,12 @@ public Datum getDatumCache(String dataCenter, String dataInfoId) {
public Map getDatumsCache(String dataInfoId) {
Map map = new HashMap<>();
- NodeManager nodeManager = NodeManagerFactory.getNodeManager(Node.NodeType.META);
- Collection dataCenters = nodeManager.getDataCenters();
+
+ Collection dataCenters = metaServerService.getDataCenters();
if (dataCenters != null) {
Collection keys = dataCenters.stream().map(dataCenter -> new Key(Key.KeyType.OBJ,
- DatumKey.class.getName(), new DatumKey(dataInfoId, dataCenter)))
- .collect(Collectors.toList());
+ DatumKey.class.getName(), new DatumKey(dataInfoId, dataCenter)))
+ .collect(Collectors.toList());
Map values = null;
try {
@@ -90,10 +88,10 @@ public Map getDatumsCache(String dataInfoId) {
// The version is set to 0, so that when session checks the datum versions regularly, it will actively re-query the data.
for (String dataCenter : dataCenters) {
boolean result = sessionInterests.checkAndUpdateInterestVersionZero(dataCenter,
- dataInfoId);
+ dataInfoId);
taskLogger.error(String.format(
- "error when access cache, so checkAndUpdateInterestVersionZero(return %s): %s",
- result, e.getMessage()), e);
+ "error when access cache, so checkAndUpdateInterestVersionZero(return %s): %s",
+ result, e.getMessage()), e);
}
}
diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/SessionNodeManager.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/SessionNodeManager.java
deleted file mode 100644
index b08d4a7bc..000000000
--- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/SessionNodeManager.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.registry.server.session.node;
-
-import java.util.*;
-
-import com.alipay.sofa.registry.common.model.Node.NodeType;
-import com.alipay.sofa.registry.common.model.metaserver.NodeChangeResult;
-import com.alipay.sofa.registry.common.model.metaserver.RenewNodesRequest;
-import com.alipay.sofa.registry.common.model.metaserver.SessionNode;
-import com.alipay.sofa.registry.common.model.store.URL;
-import com.alipay.sofa.registry.log.Logger;
-import com.alipay.sofa.registry.log.LoggerFactory;
-import com.alipay.sofa.registry.net.NetUtil;
-import com.alipay.sofa.registry.remoting.exchange.RequestException;
-import com.alipay.sofa.registry.remoting.exchange.message.Request;
-
-/**
- *
- * @author shangyu.wh
- * @version $Id: SessionNodeManager.java, v 0.1 2018-03-05 10:42 shangyu.wh Exp $
- */
-public class SessionNodeManager extends AbstractNodeManager {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SessionNodeManager.class);
-
- @Override
- public SessionNode getNode(String dataInfoId) {
- return null;
- }
-
- @Override
- public NodeType getNodeType() {
- return NodeType.SESSION;
- }
-
- public List getZoneServerList(String zonename) {
- List serverList = new ArrayList<>();
- Collection sessionNodes = getDataCenterNodes();
- if (sessionNodes != null && !sessionNodes.isEmpty()) {
- if (zonename == null || zonename.isEmpty()) {
- zonename = sessionServerConfig.getSessionServerRegion();
- }
- for (SessionNode sessionNode : sessionNodes) {
- if (zonename.equals(sessionNode.getRegionId())) {
- URL url = sessionNode.getNodeUrl();
- if (url != null) {
- serverList.add(url.getIpAddress());
- }
- }
- }
-
- }
- return serverList;
- }
-
- @Override
- public void renewNode() {
- try {
-
- Request renewNodesRequestRequest = new Request() {
-
- @Override
- public RenewNodesRequest getRequestBody() {
- URL clientUrl = new URL(NetUtil.getLocalAddress().getHostAddress(), 0);
- SessionNode sessionNode = new SessionNode(clientUrl,
- sessionServerConfig.getSessionServerRegion());
-
- return new RenewNodesRequest(sessionNode);
- }
-
- @Override
- public URL getRequestUrl() {
- return new URL(raftClientManager.getLeader().getIp(),
- sessionServerConfig.getMetaServerPort());
- }
- };
-
- metaNodeExchanger.request(renewNodesRequestRequest);
- } catch (RequestException e) {
- throw new RuntimeException("SessionNodeManager renew node error! " + e.getMessage(), e);
- }
- }
-
- @Override
- public void updateNodes(NodeChangeResult nodeChangeResult) {
- write.lock();
- try {
- Long receiveVersion = nodeChangeResult.getVersion();
- boolean versionChange = checkAndUpdateListVersions(
- sessionServerConfig.getSessionServerDataCenter(), receiveVersion);
- if (!versionChange) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Current data type {} list version has not updated!",
- getNodeType());
- }
- }
- nodes = nodeChangeResult.getNodes();
- } finally {
- write.unlock();
- }
- }
-}
diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/AppRevisionNodeServiceImpl.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/AppRevisionNodeServiceImpl.java
index 851d44a16..c80b0dfd9 100644
--- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/AppRevisionNodeServiceImpl.java
+++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/AppRevisionNodeServiceImpl.java
@@ -28,13 +28,13 @@
import com.alipay.sofa.registry.remoting.exchange.message.Response;
import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig;
import com.alipay.sofa.registry.server.session.node.RaftClientManager;
-import com.alipay.sofa.registry.server.session.node.SessionNodeManager;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
public class AppRevisionNodeServiceImpl implements AppRevisionNodeService {
- private static final Logger LOGGER = LoggerFactory.getLogger(SessionNodeManager.class,
+ private static final Logger LOGGER = LoggerFactory.getLogger(
+ AppRevisionNodeServiceImpl.class,
"[AppRevisionService]");
@Autowired
diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/ClientNodeService.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/ClientNodeService.java
index 4a33cc09c..c23c4ffc5 100644
--- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/ClientNodeService.java
+++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/ClientNodeService.java
@@ -18,8 +18,6 @@
import com.alipay.sofa.registry.common.model.store.URL;
import com.alipay.sofa.registry.remoting.CallbackHandler;
-import com.alipay.sofa.registry.core.model.AppRevisionRegister;
-import com.alipay.sofa.registry.core.model.AppRevisionKey;
import com.alipay.sofa.registry.common.model.metaserver.ProvideData;
/**
@@ -29,11 +27,4 @@
public interface ClientNodeService {
void pushWithCallback(Object object, URL url, CallbackHandler callbackHandler);
- /**
- * fetch persistence data from meta server
- *
- * @param dataInfoId
- * @return
- */
- ProvideData fetchData(String dataInfoId);
}
\ No newline at end of file
diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/MetaNodeServiceImpl.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/MetaNodeServiceImpl.java
deleted file mode 100644
index bbe2caff2..000000000
--- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/MetaNodeServiceImpl.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.registry.server.session.node.service;
-
-import com.alipay.sofa.registry.common.model.metaserver.*;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import com.alipay.sofa.registry.common.model.store.URL;
-import com.alipay.sofa.registry.log.Logger;
-import com.alipay.sofa.registry.log.LoggerFactory;
-import com.alipay.sofa.registry.remoting.exchange.NodeExchanger;
-import com.alipay.sofa.registry.remoting.exchange.RequestException;
-import com.alipay.sofa.registry.remoting.exchange.message.Request;
-import com.alipay.sofa.registry.remoting.exchange.message.Response;
-import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig;
-import com.alipay.sofa.registry.server.session.node.RaftClientManager;
-import com.alipay.sofa.registry.server.session.node.SessionNodeManager;
-
-import java.util.List;
-
-/**
- * @author shangyu.wh
- * @version $Id: MetaNodeServiceImpl.java, v 0.1 2018-04-17 21:23 shangyu.wh Exp $
- */
-public class MetaNodeServiceImpl implements MetaNodeService {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SessionNodeManager.class,
- "[MetaNodeService]");
-
- @Autowired
- protected SessionServerConfig sessionServerConfig;
-
- @Autowired
- protected NodeExchanger metaNodeExchanger;
-
- @Autowired
- RaftClientManager raftClientManager;
-
- @Override
- public ProvideData fetchData(String dataInfoId) {
- try {
-
- Request request = new Request() {
-
- @Override
- public FetchProvideDataRequest getRequestBody() {
-
- return new FetchProvideDataRequest(dataInfoId);
- }
-
- @Override
- public URL getRequestUrl() {
- return new URL(raftClientManager.getLeader().getIp(),
- sessionServerConfig.getMetaServerPort());
- }
- };
-
- Response response = metaNodeExchanger.request(request);
-
- Object result = response.getResult();
- if (result instanceof ProvideData) {
- return (ProvideData) result;
- } else {
- LOGGER.error("fetch null provider data!");
- throw new RuntimeException("MetaNodeService fetch null provider data!");
- }
- } catch (RequestException e) {
- LOGGER.error("fetch provider data error! " + e.getMessage(), e);
- throw new RuntimeException("fetch provider data error! " + e.getMessage(), e);
- }
- }
-}
\ No newline at end of file
diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/registry/SessionRegistry.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/registry/SessionRegistry.java
index 641c2b8ba..66f3a781b 100644
--- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/registry/SessionRegistry.java
+++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/registry/SessionRegistry.java
@@ -26,9 +26,6 @@
import com.alipay.sofa.registry.server.session.cache.AppRevisionCacheRegistry;
import org.springframework.beans.factory.annotation.Autowired;
-import com.alipay.sofa.registry.common.model.Node;
-import com.alipay.sofa.registry.common.model.RenewDatumRequest;
-import com.alipay.sofa.registry.common.model.constants.ValueConstants;
import com.alipay.sofa.registry.common.model.store.Publisher;
import com.alipay.sofa.registry.common.model.store.StoreData;
import com.alipay.sofa.registry.common.model.store.Subscriber;
@@ -58,11 +55,6 @@
import com.alipay.sofa.registry.task.listener.TaskEvent;
import com.alipay.sofa.registry.task.listener.TaskListenerManager;
import com.google.common.collect.Lists;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
/**
* @author shangyu.wh
@@ -70,64 +62,65 @@
*/
public class SessionRegistry implements Registry {
- private static final Logger LOGGER = LoggerFactory.getLogger(SessionRegistry.class);
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(SessionRegistry.class);
- protected static final Logger TASK_LOGGER = LoggerFactory.getLogger(SessionRegistry.class,
- "[Task]");
+ protected static final Logger TASK_LOGGER = LoggerFactory.getLogger(
+ SessionRegistry.class, "[Task]");
/**
* store subscribers
*/
@Autowired
- private Interests sessionInterests;
+ private Interests sessionInterests;
/**
* store watchers
*/
@Autowired
- private Watchers sessionWatchers;
+ private Watchers sessionWatchers;
/**
* store publishers
*/
@Autowired
- private DataStore sessionDataStore;
+ private DataStore sessionDataStore;
/**
* transfer data to DataNode
*/
@Autowired
- private DataNodeService dataNodeService;
+ private DataNodeService dataNodeService;
/**
* trigger task com.alipay.sofa.registry.server.meta.listener process
*/
@Autowired
- private TaskListenerManager taskListenerManager;
+ private TaskListenerManager taskListenerManager;
@Autowired
- private SessionServerConfig sessionServerConfig;
+ private SessionServerConfig sessionServerConfig;
@Autowired
- private Exchange boltExchange;
+ private Exchange boltExchange;
@Autowired
- private SessionRegistryStrategy sessionRegistryStrategy;
+ private SessionRegistryStrategy sessionRegistryStrategy;
@Autowired
private WrapperInterceptorManager wrapperInterceptorManager;
@Autowired
- private DataIdMatchStrategy dataIdMatchStrategy;
+ private DataIdMatchStrategy dataIdMatchStrategy;
@Autowired
- private WriteDataAcceptor writeDataAcceptor;
+ private WriteDataAcceptor writeDataAcceptor;
@Autowired
- private SlotTableCache slotTableCache;
- private AppRevisionCacheRegistry appRevisionCacheRegistry;
+ private SlotTableCache slotTableCache;
+ private AppRevisionCacheRegistry appRevisionCacheRegistry;
- private volatile boolean enableDataRenewSnapshot = true;
+ private volatile boolean enableDataRenewSnapshot = true;
@Override
public void register(StoreData storeData) {
@@ -199,7 +192,7 @@ public void unRegister(StoreData storeData) {
// All write operations to DataServer (pub/unPub/clientoff/renew/snapshot)
// are handed over to WriteDataAcceptor
writeDataAcceptor.accept(new PublisherWriteDataRequest(publisher,
- WriteDataRequest.WriteDataRequestType.UN_PUBLISHER, slotTableCache
+ WriteDataRequest.WriteDataRequestType.UN_PUBLISHER, slotTableCache
.getLeader(publisher.getDataInfoId())));
sessionRegistryStrategy.afterPublisherUnRegister(publisher);
@@ -332,81 +325,81 @@ public void remove(List connectIds) {
subMap.forEach((registerId, sub) -> {
if (isFireSubscriberPushEmptyTask(sub.getDataId())) {
- fireSubscriberPushEmptyTask(sub);
- }
- });
- }
+ fireSubscriberPushEmptyTask(sub);
+ }
+ });
+ }
- if (pubExisted || subExisted) {
- connectIdsAll.add(connectId);
- }
- });
- cancel(connectIdsAll);
- }
+ if (pubExisted || subExisted) {
+ connectIdsAll.add(connectId);
+ }
+ });
+ cancel(connectIdsAll);
+ }
- protected boolean isFireSubscriberPushEmptyTask (String dataId){
- return dataIdMatchStrategy.match(dataId, () -> sessionServerConfig.getBlacklistSubDataIdRegex());
- }
+ protected boolean isFireSubscriberPushEmptyTask(String dataId) {
+ return dataIdMatchStrategy.match(dataId, () -> sessionServerConfig.getBlacklistSubDataIdRegex());
+ }
- private void fireSubscriberPushEmptyTask (Subscriber subscriber){
- //trigger empty data push
- TaskEvent taskEvent = new TaskEvent(subscriber,
- TaskEvent.TaskType.SUBSCRIBER_PUSH_EMPTY_TASK);
- TASK_LOGGER.info("send " + taskEvent.getTaskType() + " taskEvent:{}", taskEvent);
- getTaskListenerManager().sendTaskEvent(taskEvent);
- }
+ private void fireSubscriberPushEmptyTask(Subscriber subscriber) {
+ //trigger empty data push
+ TaskEvent taskEvent = new TaskEvent(subscriber,
+ TaskEvent.TaskType.SUBSCRIBER_PUSH_EMPTY_TASK);
+ TASK_LOGGER.info("send " + taskEvent.getTaskType() + " taskEvent:{}", taskEvent);
+ getTaskListenerManager().sendTaskEvent(taskEvent);
+ }
- public void cleanClientConnect () {
-
- Set connectIndexes = new HashSet<>();
- Set pubIndexes = sessionDataStore.getConnectIds();
- Set subIndexes = sessionInterests.getConnectIds();
- Set watchIndexes = sessionWatchers.getConnectIds();
- connectIndexes.addAll(pubIndexes);
- connectIndexes.addAll(subIndexes);
- connectIndexes.addAll(watchIndexes);
-
- Server sessionServer = boltExchange.getServer(sessionServerConfig.getServerPort());
-
- List connectIds = new ArrayList<>();
- for (ConnectId connectId : connectIndexes) {
- Channel channel = sessionServer.getChannel(new URL(connectId.getClientHostAddress(),
- connectId.getClientPort()));
- if (channel == null) {
- connectIds.add(connectId);
- LOGGER.warn("Client connect has not existed!it must be remove!connectId:{}",
- connectId);
- }
- }
- if (!connectIds.isEmpty()) {
- cancel(connectIds);
+ public void cleanClientConnect() {
+
+ Set connectIndexes = new HashSet<>();
+ Set pubIndexes = sessionDataStore.getConnectIds();
+ Set subIndexes = sessionInterests.getConnectIds();
+ Set watchIndexes = sessionWatchers.getConnectIds();
+ connectIndexes.addAll(pubIndexes);
+ connectIndexes.addAll(subIndexes);
+ connectIndexes.addAll(watchIndexes);
+
+ Server sessionServer = boltExchange.getServer(sessionServerConfig.getServerPort());
+
+ List connectIds = new ArrayList<>();
+ for (ConnectId connectId : connectIndexes) {
+ Channel channel = sessionServer.getChannel(new URL(connectId.getClientHostAddress(),
+ connectId.getClientPort()));
+ if (channel == null) {
+ connectIds.add(connectId);
+ LOGGER.warn("Client connect has not existed!it must be remove!connectId:{}",
+ connectId);
}
}
-
- /**
- * Getter method for property sessionInterests.
- *
- * @return property value of sessionInterests
- */
- protected Interests getSessionInterests () {
- return sessionInterests;
+ if (!connectIds.isEmpty()) {
+ cancel(connectIds);
}
+ }
- /**
- * Getter method for property sessionDataStore.
- *
- * @return property value of sessionDataStore
- */
- protected DataStore getSessionDataStore () {
- return sessionDataStore;
- }
+ /**
+ * Getter method for property sessionInterests.
+ *
+ * @return property value of sessionInterests
+ */
+ protected Interests getSessionInterests() {
+ return sessionInterests;
+ }
- /**
- * Getter method for property taskListenerManager.
- *
- * @return property value of taskListenerManager
- */
- protected TaskListenerManager getTaskListenerManager () {
- return taskListenerManager;
- }
- }
\ No newline at end of file
+ /**
+ * Getter method for property sessionDataStore.
+ *
+ * @return property value of sessionDataStore
+ */
+ protected DataStore getSessionDataStore() {
+ return sessionDataStore;
+ }
+
+ /**
+ * Getter method for property taskListenerManager.
+ *
+ * @return property value of taskListenerManager
+ */
+ protected TaskListenerManager getTaskListenerManager() {
+ return taskListenerManager;
+ }
+}
\ No newline at end of file
diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/handler/DataChangeRequestHandler.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/handler/DataChangeRequestHandler.java
index 395814ee9..b60f2fbad 100644
--- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/handler/DataChangeRequestHandler.java
+++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/handler/DataChangeRequestHandler.java
@@ -27,6 +27,7 @@
import com.alipay.sofa.registry.common.model.store.DataInfo;
import com.alipay.sofa.registry.server.session.cache.AppRevisionCacheRegistry;
import com.alipay.sofa.registry.server.session.cache.SessionDatumCacheDecorator;
+import com.alipay.sofa.registry.util.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import com.alipay.sofa.registry.common.model.Node.NodeType;
@@ -140,6 +141,9 @@ public Object doHandle(Channel channel, DataChangeRequest dataChangeRequest) {
}
private void refreshMeta(Collection revisions) {
+ if (revisions == null || revisions.isEmpty()) {
+ return;
+ }
for (String revision : revisions) {
appRevisionCacheRegistry.getRevision(revision);
}
diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchTask.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchTask.java
index 44dd904ba..b25ba2d42 100644
--- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchTask.java
+++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchTask.java
@@ -26,13 +26,16 @@
import com.alipay.sofa.registry.common.model.store.Subscriber;
import com.alipay.sofa.registry.common.model.store.URL;
import com.alipay.sofa.registry.core.model.AssembleType;
+import com.alipay.sofa.registry.core.model.ReceivedData;
import com.alipay.sofa.registry.core.model.ScopeEnum;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.session.assemble.SubscriberAssembleStrategy;
import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig;
import com.alipay.sofa.registry.server.session.cache.AppRevisionCacheRegistry;
+import com.alipay.sofa.registry.server.session.cache.DatumKey;
import com.alipay.sofa.registry.server.session.cache.SessionDatumCacheDecorator;
+import com.alipay.sofa.registry.server.session.converter.ReceivedDataConverter;
import com.alipay.sofa.registry.server.session.push.FirePushService;
import com.alipay.sofa.registry.server.session.scheduler.ExecutorManager;
import com.alipay.sofa.registry.server.session.store.Interests;
@@ -43,16 +46,12 @@
import org.springframework.util.CollectionUtils;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
-import java.util.Set;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
- *
* @author shangyu.wh
* @version $Id: DataChangeFetchTask.java, v 0.1 2017-12-13 12:25 shangyu.wh Exp $
*/
@@ -157,7 +156,7 @@ private void doExecute(String dataInfoId) {
subscriber -> subscriber.getAssembleType() == assembleType)
.collect(Collectors.toList());
- if(subscribersSend.isEmpty()){
+ if (subscribersSend.isEmpty()) {
continue;
}
Subscriber defaultSubscriber = subscribersSend.stream().findFirst().get();
@@ -256,99 +255,6 @@ private void evictReSubscribers(Collection subscribersPush) {
}
}
- private void fireReceivedDataMultiPushTask(Datum datum, List subscriberRegisterIdList,
- Collection subscribers, ScopeEnum scopeEnum,
- Subscriber subscriber, PushTaskClosure pushTaskClosure) {
- String dataId = datum.getDataId();
- String clientCell = sessionServerConfig.getClientCell(subscriber.getCell());
- Predicate zonePredicate = (zone) -> {
- if (!clientCell.equals(zone)) {
- if (ScopeEnum.zone == scopeEnum) {
- // zone scope subscribe only return zone list
- return true;
-
- } else if (ScopeEnum.dataCenter == scopeEnum || ScopeEnum.global == scopeEnum) {
- // disable zone config
- return sessionServerConfig.isInvalidForeverZone(zone) && !sessionServerConfig
- .isInvalidIgnored(dataId);
- }
- }
- return false;
- };
- ReceivedData receivedData = ReceivedDataConverter
- .getReceivedDataMulti(datum, scopeEnum, subscriberRegisterIdList,
- clientCell, zonePredicate);
-
- //trigger push to client node
- Map parameter = new HashMap<>();
- parameter.put(receivedData, subscriber.getSourceAddress());
- TaskEvent taskEvent = new TaskEvent(parameter, TaskType.RECEIVED_DATA_MULTI_PUSH_TASK);
- taskEvent.setTaskClosure(pushTaskClosure);
- taskEvent.setAttribute(Constant.PUSH_CLIENT_SUBSCRIBERS, subscribers);
- taskLogger.info("send {} taskURL:{},taskScope:{},,taskId={}", taskEvent.getTaskType(),
- subscriber.getSourceAddress(), scopeEnum, taskEvent.getTaskId());
- taskListenerManager.sendTaskEvent(taskEvent);
- }
-
- private Map> getCache(ScopeEnum scopeEnum) {
- return sessionInterests.querySubscriberIndex(dataChangeRequest.getDataInfoId(), scopeEnum);
- }
-
- private Datum getDatumCache() {
- // build key
- DatumKey datumKey = new DatumKey(dataChangeRequest.getDataInfoId(),
- dataChangeRequest.getDataCenter());
- Key key = new Key(KeyType.OBJ, DatumKey.class.getName(), datumKey);
-
- // get from cache (it will fetch from backend server)
- Value value = null;
- try {
- value = sessionCacheService.getValue(key);
- } catch (CacheAccessException e) {
- LOGGER.error("error when access cache: {}", datumKey, e);
- }
-
- return value == null ? null : value.getPayload();
- }
-
- private void fireUserDataElementPushTask(InetSocketAddress address, Datum datum,
- Collection subscribers,
- PushTaskClosure pushTaskClosure) {
-
- TaskEvent taskEvent = new TaskEvent(TaskType.USER_DATA_ELEMENT_PUSH_TASK);
- taskEvent.setTaskClosure(pushTaskClosure);
- taskEvent.setSendTimeStamp(DatumVersionUtil.getRealTimestamp(datum.getVersion()));
- taskEvent.setAttribute(Constant.PUSH_CLIENT_SUBSCRIBERS, subscribers);
- taskEvent.setAttribute(Constant.PUSH_CLIENT_DATUM, datum);
- taskEvent.setAttribute(Constant.PUSH_CLIENT_URL, new URL(address));
-
- int size = datum != null ? datum.publisherSize() : 0;
-
- taskLogger.info(
- "send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={},subSize={},taskId={}",
- taskEvent.getTaskType(), address, datum.getDataInfoId(), datum.getDataCenter(), size,
- subscribers.size(), taskEvent.getTaskId());
- taskListenerManager.sendTaskEvent(taskEvent);
- }
-
- private void fireUserDataElementMultiPushTask(InetSocketAddress address, Datum datum,
- Collection subscribers,
- PushTaskClosure pushTaskClosure) {
-
- TaskEvent taskEvent = new TaskEvent(TaskType.USER_DATA_ELEMENT_MULTI_PUSH_TASK);
- taskEvent.setTaskClosure(pushTaskClosure);
- taskEvent.setSendTimeStamp(DatumVersionUtil.getRealTimestamp(datum.getVersion()));
- taskEvent.setAttribute(Constant.PUSH_CLIENT_SUBSCRIBERS, subscribers);
- taskEvent.setAttribute(Constant.PUSH_CLIENT_DATUM, datum);
- taskEvent.setAttribute(Constant.PUSH_CLIENT_URL, new URL(address));
-
- int size = datum != null ? datum.publisherSize() : 0;
-
- taskLogger.info(
- "send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={},subSize={},taskId={}",
- taskEvent.getTaskType(), address, datum.getDataInfoId(), datum.getDataCenter(), size,
- subscribers.size(), taskEvent.getTaskId());
- taskListenerManager.sendTaskEvent(taskEvent);
private Map> getCache(String dataInfoId,
ScopeEnum scopeEnum) {
return sessionInterests.querySubscriberIndex(dataInfoId, scopeEnum);
@@ -378,7 +284,7 @@ public void setTaskEvent(TaskEvent taskEvent) {
/**
* Setter method for property dataChangeRequest.
*
- * @param dataChangeRequest value to be assigned to property dataChangeRequest
+ * @param dataChangeRequest value to be assigned to property dataChangeRequest
*/
public void setDataChangeRequest(DataChangeRequest dataChangeRequest) {
this.dataChangeRequest = dataChangeRequest;
diff --git a/test/src/test/java/com/alipay/sofa/registry/test/sync/SessionNotifyTest.java b/test/src/test/java/com/alipay/sofa/registry/test/sync/SessionNotifyTest.java
index 33a8fca8d..52ba434a2 100644
--- a/test/src/test/java/com/alipay/sofa/registry/test/sync/SessionNotifyTest.java
+++ b/test/src/test/java/com/alipay/sofa/registry/test/sync/SessionNotifyTest.java
@@ -232,7 +232,7 @@ public void doTest() throws Exception {
// post sync data request
DataChangeRequest request = new DataChangeRequest(DataInfo.toDataInfoId(
DATA_ID, DEFAULT_INSTANCE_ID, DEFAULT_GROUP), LOCAL_DATACENTER,
- finalI);
+ finalI, new HashSet<>());
boltChannelMap.forEach((connect,boltChannel)->{