From 5d646aab26d84baae2690a1a1b3504d4bccd23b5 Mon Sep 17 00:00:00 2001 From: dzdx Date: Mon, 14 Dec 2020 11:44:07 +0800 Subject: [PATCH] [app_discovery] fix compile --- pom.xml | 3 - server/common/model/pom.xml | 8 + .../model/slot/DataSlotDiffSyncResult.java | 3 +- .../common/model/store/Publisher.java | 7 +- .../handler/SlotTableChangeHandler.java | 2 - .../handler/DatumSnapshotHandler.java | 195 ------------------ server/server/integration/pom.xml | 24 +++ .../bootstrap/MetaServerConfiguration.java | 11 - .../connection/MetaConnectionHandler.java | 1 + .../remoting/handler/DataNodeHandler.java | 62 ------ .../handler/GetNodesRequestHandler.java | 64 ------ .../remoting/handler/SessionNodeHandler.java | 62 ------ .../bootstrap/SessionServerConfig.java | 1 + .../bootstrap/SessionServerConfigBean.java | 171 ++++++++------- .../bootstrap/SessionServerConfiguration.java | 1 + .../cache/SessionDatumCacheDecorator.java | 24 +-- .../session/node/SessionNodeManager.java | 117 ----------- .../service/AppRevisionNodeServiceImpl.java | 4 +- .../node/service/ClientNodeService.java | 9 - .../node/service/MetaNodeServiceImpl.java | 86 -------- .../session/registry/SessionRegistry.java | 181 ++++++++-------- .../handler/DataChangeRequestHandler.java | 4 + .../scheduler/task/DataChangeFetchTask.java | 108 +--------- .../registry/test/sync/SessionNotifyTest.java | 2 +- 24 files changed, 245 insertions(+), 905 deletions(-) delete mode 100644 server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/DatumSnapshotHandler.java delete mode 100644 server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/DataNodeHandler.java delete mode 100644 server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/GetNodesRequestHandler.java delete mode 100644 server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/SessionNodeHandler.java delete mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/SessionNodeManager.java delete mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/MetaNodeServiceImpl.java diff --git a/pom.xml b/pom.xml index 4b7088f56..42a69a7f1 100644 --- a/pom.xml +++ b/pom.xml @@ -82,13 +82,10 @@ 6.4.6 ${user.dir} -Dnetwork_interface_denylist=docker0 -<<<<<<< HEAD false false -======= 1.2.51.sec09_noneautotype ->>>>>>> [app_discovery] support subscribe app and interface publisher diff --git a/server/common/model/pom.xml b/server/common/model/pom.xml index 88083b2e0..2cadfa49c 100644 --- a/server/common/model/pom.xml +++ b/server/common/model/pom.xml @@ -39,5 +39,13 @@ commons-collections commons-collections + + commons-collections + commons-collections + + + commons-collections + commons-collections + diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/slot/DataSlotDiffSyncResult.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/slot/DataSlotDiffSyncResult.java index 70585f25c..7c359f505 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/slot/DataSlotDiffSyncResult.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/slot/DataSlotDiffSyncResult.java @@ -20,11 +20,10 @@ import com.alipay.sofa.registry.common.model.store.Publisher; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; +import org.springframework.util.CollectionUtils; import java.io.Serializable; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/Publisher.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/Publisher.java index a02233960..cf6f44b13 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/Publisher.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/Publisher.java @@ -20,14 +20,9 @@ import com.alipay.sofa.registry.common.model.*; import com.fasterxml.jackson.annotation.JsonIgnore; -import com.google.common.collect.Collections2; -import org.apache.commons.collections.CollectionUtils; -import com.alipay.sofa.registry.common.model.AppRegisterServerDataBox; -import com.alipay.sofa.registry.common.model.AppRegisterServerDataBox.ParamInfo; -import com.alipay.sofa.registry.common.model.AppRegisterServerDataBox.ServiceParamInfo; + import com.alipay.sofa.registry.common.model.PublishType; import com.alipay.sofa.registry.common.model.ServerDataBox; -import com.fasterxml.jackson.annotation.JsonIgnore; /** * diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/handler/SlotTableChangeHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/handler/SlotTableChangeHandler.java index 142887b96..a421eb5f6 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/handler/SlotTableChangeHandler.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/metaserver/handler/SlotTableChangeHandler.java @@ -27,8 +27,6 @@ import com.alipay.sofa.registry.server.data.slot.SlotManager; import com.alipay.sofa.registry.server.shared.remoting.AbstractClientHandler; import com.alipay.sofa.registry.util.ParaCheckUtil; -import org.springframework.beans.factory.annotation.Autowired; -import com.alipay.sofa.registry.server.meta.registry.Registry; /** * diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/DatumSnapshotHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/DatumSnapshotHandler.java deleted file mode 100644 index 0275df15a..000000000 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/DatumSnapshotHandler.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alipay.sofa.registry.server.data.remoting.sessionserver.handler; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Executor; -import java.util.concurrent.ThreadPoolExecutor; - -import com.alipay.sofa.registry.common.model.PublisherInternUtil; -import org.springframework.beans.factory.annotation.Autowired; - -import com.alipay.sofa.registry.common.model.CommonResponse; -import com.alipay.sofa.registry.common.model.DatumSnapshotRequest; -import com.alipay.sofa.registry.common.model.Node; -import com.alipay.sofa.registry.common.model.PublisherDigestUtil; -import com.alipay.sofa.registry.common.model.constants.ValueConstants; -import com.alipay.sofa.registry.common.model.store.Publisher; -import com.alipay.sofa.registry.common.model.store.WordCache; -import com.alipay.sofa.registry.log.Logger; -import com.alipay.sofa.registry.log.LoggerFactory; -import com.alipay.sofa.registry.remoting.Channel; -import com.alipay.sofa.registry.server.data.cache.DatumCache; -import com.alipay.sofa.registry.server.data.change.event.DataChangeEventCenter; -import com.alipay.sofa.registry.server.data.change.event.DatumSnapshotEvent; -import com.alipay.sofa.registry.server.data.remoting.handler.AbstractServerHandler; -import com.alipay.sofa.registry.server.data.renew.DatumLeaseManager; -import com.alipay.sofa.registry.util.ParaCheckUtil; - -/** - * handling snapshot request - * - * @author kezhu.wukz - * @version $Id: ClientOffProcessor.java, v 0.1 2019-05-30 15:48 kezhu.wukz Exp $ - */ -public class DatumSnapshotHandler extends AbstractServerHandler { - - private static final Logger RENEW_LOGGER = LoggerFactory.getLogger( - ValueConstants.LOGGER_NAME_RENEW, - "[DatumSnapshotHandler]"); - - /** Limited List Printing */ - private static final int LIMITED_LIST_SIZE_FOR_PRINT = 10; - - @Autowired - private DataChangeEventCenter dataChangeEventCenter; - - @Autowired - private DatumLeaseManager datumLeaseManager; - - @Autowired - private DatumCache datumCache; - - @Autowired - private ThreadPoolExecutor renewDatumProcessorExecutor; - - @Override - public Executor getExecutor() { - return renewDatumProcessorExecutor; - } - - @Override - public void checkParam(DatumSnapshotRequest request) throws RuntimeException { - ParaCheckUtil.checkNotBlank(request.getConnectId(), "DatumSnapshotRequest.connectId"); - ParaCheckUtil.checkNotEmpty(request.getPublishers(), "DatumSnapshotRequest.publishers"); - } - - @Override - public Object doHandle(Channel channel, DatumSnapshotRequest request) { - RENEW_LOGGER.info("Received datumSnapshotRequest: {}", request); - - String connectId = WordCache.getInstance().getWordCache(request.getConnectId()); - - // convert to pubMap, and wrap it by WordCache - Map pubMap = new HashMap<>(); - List publishers = request.getPublishers(); - if (publishers != null) { - for (Publisher publisher : publishers) { - PublisherInternUtil.internPublisher(publisher); - pubMap.put(publisher.getRegisterId(), publisher); - } - } - - // diff the cache and snapshot - boolean isDiff = true; - Map cachePubMap = datumCache.getOwnByConnectId(connectId); - if (cachePubMap == null) { - RENEW_LOGGER - .info( - ">>>>>>> connectId={}, cachePubMap.size=0, pubMap.size={}, isDiff={}, the diff is: pubMap={}", - connectId, pubMap.size(), isDiff, limitedToString(pubMap.values())); - } else { - List diffPub1 = subtract(pubMap, cachePubMap); - List diffPub2 = subtract(cachePubMap, pubMap); - if (diffPub1.size() == 0 && diffPub2.size() == 0) { - isDiff = false; - } - RENEW_LOGGER - .info( - ">>>>>>> connectId={}, cachePubMap.size={}, pubMap.size={}, isDiff={}, the diff is: pubMap-cachePubMap=(size:{}){}, cachePubMap-pubMap=(size:{}){}", - connectId, cachePubMap.size(), pubMap.size(), isDiff, diffPub1.size(), - limitedToString(diffPub1), diffPub2.size(), limitedToString(diffPub2)); - } - - if (isDiff) { - // build DatumSnapshotEvent and send to eventCenter - dataChangeEventCenter.onChange(new DatumSnapshotEvent(connectId, cachePubMap, pubMap)); - } - - // record the renew timestamp - datumLeaseManager.renew(connectId); - - return CommonResponse.buildSuccessResponse(); - } - - /** - * Limited List Printing - */ - private String limitedToString(Collection publishers) { - Iterator it = publishers.iterator(); - if (!it.hasNext()) - return "[]"; - - StringBuilder sb = new StringBuilder(); - sb.append('['); - int i = 1; - for (;;) { - Publisher e = it.next(); - sb.append("Publisher{dataInfoId='").append(e.getDataInfoId()).append('\''); - sb.append(", cell='").append(e.getCell()).append('\''); - sb.append(", registerId='").append(e.getRegisterId()).append('\''); - sb.append(", version=").append(e.getVersion()); - sb.append(", sourceAddress=").append(e.getSourceAddress()); - sb.append(", registerTimestamp=").append(e.getRegisterTimestamp()); - sb.append(", clientRegisterTimestamp=").append(e.getClientRegisterTimestamp()); - sb.append('}'); - if (!it.hasNext() || i++ >= LIMITED_LIST_SIZE_FOR_PRINT) - return sb.append(']').toString(); - sb.append(',').append(' '); - } - } - - private List subtract(Map pubMap1, Map pubMap2) { - List list = new ArrayList(); - for (Map.Entry entry : pubMap1.entrySet()) { - String registerId = entry.getKey(); - Publisher publisher1 = entry.getValue(); - Publisher publisher2 = pubMap2.get(registerId); - if (publisher2 == null - || PublisherDigestUtil.getDigestValue(publisher1) != PublisherDigestUtil - .getDigestValue(publisher2)) { - list.add(publisher1); - } - } - return list; - } - - @Override - public CommonResponse buildFailedResponse(String msg) { - return CommonResponse.buildFailedResponse(msg); - } - - @Override - public HandlerType getType() { - return HandlerType.PROCESSER; - } - - @Override - public Class interest() { - return DatumSnapshotRequest.class; - } - - @Override - protected Node.NodeType getConnectNodeType() { - return Node.NodeType.DATA; - } -} diff --git a/server/server/integration/pom.xml b/server/server/integration/pom.xml index 86398665b..01e030445 100644 --- a/server/server/integration/pom.xml +++ b/server/server/integration/pom.xml @@ -55,6 +55,30 @@ 6.0.1 compile + + com.alipay.sofa + registry-common-util + + + com.alipay.sofa + registry-server-data + + + com.alipay.sofa + registry-server-data + + + com.alipay.sofa + registry-server-session + + + com.alipay.sofa + registry-server-data + + + com.alipay.sofa + registry-server-meta + diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java index e7cc29098..d187a731f 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java @@ -67,7 +67,6 @@ import com.alipay.sofa.registry.server.meta.remoting.connection.SessionConnectionHandler; import com.alipay.sofa.registry.server.meta.remoting.handler.FetchProvideDataRequestHandler; import com.alipay.sofa.registry.server.meta.remoting.handler.HeartbeatRequestHandler; -import com.alipay.sofa.registry.server.meta.resource.*; import com.alipay.sofa.registry.server.meta.slot.impl.*; import com.alipay.sofa.registry.server.shared.remoting.AbstractServerHandler; import com.alipay.sofa.registry.server.meta.repository.NodeConfirmStatusService; @@ -83,19 +82,9 @@ import com.alipay.sofa.registry.server.meta.task.processor.SessionNodeSingleTaskProcessor; import com.alipay.sofa.registry.store.api.DBService; import com.alipay.sofa.registry.util.DefaultExecutorFactory; -import com.alipay.sofa.registry.util.NamedThreadFactory; import com.alipay.sofa.registry.util.OsUtils; import com.alipay.sofa.registry.util.PropertySplitter; -import org.glassfish.jersey.jackson.JacksonFeature; -import org.glassfish.jersey.server.ResourceConfig; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Import; -import java.util.ArrayList; -import java.util.Collection; import java.util.concurrent.*; /** diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/connection/MetaConnectionHandler.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/connection/MetaConnectionHandler.java index c1c2f5588..e28c2512b 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/connection/MetaConnectionHandler.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/connection/MetaConnectionHandler.java @@ -23,6 +23,7 @@ import com.alipay.sofa.registry.remoting.Channel; import com.alipay.sofa.registry.server.meta.bootstrap.config.NodeConfig; import com.alipay.sofa.registry.server.shared.remoting.ListenServerChannelHandler; +import com.alipay.sofa.registry.server.meta.remoting.handler.AbstractServerHandler; import org.springframework.beans.factory.annotation.Autowired; import java.net.InetSocketAddress; diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/DataNodeHandler.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/DataNodeHandler.java deleted file mode 100644 index 37e8ca4f3..000000000 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/DataNodeHandler.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alipay.sofa.registry.server.meta.remoting.handler; - -import com.alipay.sofa.registry.common.model.metaserver.DataNode; -import com.alipay.sofa.registry.common.model.metaserver.NodeChangeResult; -import com.alipay.sofa.registry.log.Logger; -import com.alipay.sofa.registry.log.LoggerFactory; -import com.alipay.sofa.registry.remoting.Channel; -import com.alipay.sofa.registry.server.meta.registry.Registry; -import org.springframework.beans.factory.annotation.Autowired; - -/** - * Handle data node's register request - * @author shangyu.wh - * @version $Id: DataNodeHandler.java, v 0.1 2018-01-18 18:04 shangyu.wh Exp $ - */ -public class DataNodeHandler extends AbstractServerHandler { - - private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeHandler.class); - - @Autowired - private Registry metaServerRegistry; - - @Override - public Object reply(Channel channel, DataNode dataNode) { - NodeChangeResult nodeChangeResult; - try { - nodeChangeResult = metaServerRegistry.register(dataNode); - LOGGER.info("Data node {} register success!result:{}", dataNode, nodeChangeResult); - } catch (Exception e) { - LOGGER.error("Data node register error!", e); - throw new RuntimeException("Data node register error!", e); - } - return nodeChangeResult; - } - - @Override - public Class interest() { - return DataNode.class; - } - - @Override - public HandlerType getType() { - return HandlerType.PROCESSER; - } - -} \ No newline at end of file diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/GetNodesRequestHandler.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/GetNodesRequestHandler.java deleted file mode 100644 index 2fb6944c7..000000000 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/GetNodesRequestHandler.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alipay.sofa.registry.server.meta.remoting.handler; - -import com.alipay.sofa.registry.common.model.metaserver.GetNodesRequest; -import com.alipay.sofa.registry.common.model.metaserver.NodeChangeResult; -import com.alipay.sofa.registry.log.Logger; -import com.alipay.sofa.registry.log.LoggerFactory; -import com.alipay.sofa.registry.remoting.Channel; -import com.alipay.sofa.registry.server.meta.registry.Registry; -import org.springframework.beans.factory.annotation.Autowired; - -/** - * Handle session/data node's query request, such as getAllNodes request - * and current this is no use for meta node, it's instead by RAFT - * @author shangyu.wh - * @version $Id: GetNodesRequestHandler.java, v 0.1 2018-03-02 15:12 shangyu.wh Exp $ - */ -public class GetNodesRequestHandler extends AbstractServerHandler { - - private static final Logger LOGGER = LoggerFactory.getLogger("META-CONNECT"); - - @Autowired - private Registry metaServerRegistry; - - @Override - public Object reply(Channel channel, GetNodesRequest getNodesRequest) { - NodeChangeResult nodeChangeResult; - try { - nodeChangeResult = metaServerRegistry.getAllNodes(getNodesRequest.getNodeType()); - LOGGER.info("Get {} change node list {} success!from {}", - getNodesRequest.getNodeType(), nodeChangeResult, channel.getRemoteAddress()); - } catch (Exception e) { - LOGGER.error("Get node list error!", e); - throw new RuntimeException("Get node list error!", e); - } - return nodeChangeResult; - } - - @Override - public HandlerType getType() { - return HandlerType.PROCESSER; - } - - @Override - public Class interest() { - return GetNodesRequest.class; - } - -} \ No newline at end of file diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/SessionNodeHandler.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/SessionNodeHandler.java deleted file mode 100644 index 613cad34f..000000000 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/SessionNodeHandler.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alipay.sofa.registry.server.meta.remoting.handler; - -import com.alipay.sofa.registry.common.model.metaserver.NodeChangeResult; -import com.alipay.sofa.registry.common.model.metaserver.SessionNode; -import com.alipay.sofa.registry.log.Logger; -import com.alipay.sofa.registry.log.LoggerFactory; -import com.alipay.sofa.registry.remoting.Channel; -import com.alipay.sofa.registry.server.meta.registry.Registry; -import org.springframework.beans.factory.annotation.Autowired; - -/** - * Handle session node's register request - * @author shangyu.wh - * @version $Id: SessionNodeHandler.java, v 0.1 2018-01-11 17:03 shangyu.wh Exp $ - */ -public class SessionNodeHandler extends AbstractServerHandler { - - private static final Logger LOGGER = LoggerFactory.getLogger(SessionNodeHandler.class); - - @Autowired - private Registry metaServerRegistry; - - @Override - public Object reply(Channel channel, SessionNode sessionNode) { - NodeChangeResult nodeChangeResult; - try { - nodeChangeResult = metaServerRegistry.register(sessionNode); - LOGGER.info("Session node {} register success!", sessionNode); - } catch (Exception e) { - LOGGER.error("Session node register error!", e); - throw new RuntimeException("Session node register error!", e); - } - return nodeChangeResult; - } - - @Override - public Class interest() { - return SessionNode.class; - } - - @Override - public HandlerType getType() { - return HandlerType.PROCESSER; - } - -} \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfig.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfig.java index 10875bcf2..250bcde95 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfig.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfig.java @@ -180,5 +180,6 @@ public interface SessionServerConfig { int getSlotSyncPublisherMaxNum(); Set getMetaServerIpAddresses(); + boolean isEnableSessionLoadbalancePolicy(); } \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfigBean.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfigBean.java index f44b91f0e..9d57beb3b 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfigBean.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfigBean.java @@ -27,6 +27,7 @@ /** * The type Session server config bean. + * * @author shangyu.wh * @version $Id : SessionServerConfigBean.java, v 0.1 2017-11-14 11:49 synex Exp $ */ @@ -155,11 +156,6 @@ public class SessionServerConfigBean implements SessionServerConfig { private int renewDatumWheelQueueSize = 10000; private int pushDataTaskRetryFirstDelay = 500; - private int newRevisionTaskMaxBufferSize = 1000000; - - private int newRevisionTaskWorkerSize = 100; - - private int clientNodeExchangeTimeOut = 1000; //time out cause netty HashedWheelTimer occupy a lot of mem private long pushDataTaskRetryIncrementDelay = 500; @@ -191,12 +187,15 @@ public class SessionServerConfigBean implements SessionServerConfig { private boolean beginDataFetchTask = false; - //begin config for enterprise version - /** forever close push zone,such as:RZBETA */ + /** + * forever close push zone,such as:RZBETA + */ private String invalidForeverZones = ""; - /** config regex,exception to the rule of forever close push zone*/ + /** + * config regex,exception to the rule of forever close push zone + */ private String invalidIgnoreDataidRegex = ""; private Set invalidForeverZonesSet; @@ -221,7 +220,7 @@ public class SessionServerConfigBean implements SessionServerConfig { private int slotSyncPublisherMaxNum = 512; - private boolean enableSessionLoadbalancePolicy = false; + private boolean enableSessionLoadbalancePolicy = false; //end config for enterprise version @@ -231,6 +230,7 @@ public class SessionServerConfigBean implements SessionServerConfig { /** * constructor + * * @param commonConfig */ public SessionServerConfigBean(CommonConfig commonConfig) { @@ -249,7 +249,7 @@ public int getRenewDatumWheelThreadSize() { /** * Setter method for property renewDatumWheelThreadSize . * - * @param renewDatumWheelThreadSize value to be assigned to property renewDatumWheelThreadSize + * @param renewDatumWheelThreadSize value to be assigned to property renewDatumWheelThreadSize */ public void setRenewDatumWheelThreadSize(int renewDatumWheelThreadSize) { this.renewDatumWheelThreadSize = renewDatumWheelThreadSize; @@ -267,7 +267,7 @@ public int getRenewDatumWheelQueueSize() { /** * Setter method for property renewDatumWheelQueueSize . * - * @param renewDatumWheelQueueSize value to be assigned to property renewDatumWheelQueueSize + * @param renewDatumWheelQueueSize value to be assigned to property renewDatumWheelQueueSize */ public void setRenewDatumWheelQueueSize(int renewDatumWheelQueueSize) { this.renewDatumWheelQueueSize = renewDatumWheelQueueSize; @@ -286,7 +286,7 @@ public int getUserDataPushRetryExecutorQueueSize() { /** * Setter method for property userDataPushRetryExecutorQueueSize . * - * @param userDataPushRetryExecutorQueueSize value to be assigned to property userDataPushRetryExecutorQueueSize + * @param userDataPushRetryExecutorQueueSize value to be assigned to property userDataPushRetryExecutorQueueSize */ public void setUserDataPushRetryExecutorQueueSize(int userDataPushRetryExecutorQueueSize) { this.userDataPushRetryExecutorQueueSize = userDataPushRetryExecutorQueueSize; @@ -305,7 +305,7 @@ public int getUserDataPushRetryExecutorThreadSize() { /** * Setter method for property userDataPushRetryExecutorThreadSize . * - * @param userDataPushRetryExecutorThreadSize value to be assigned to property userDataPushRetryExecutorThreadSize + * @param userDataPushRetryExecutorThreadSize value to be assigned to property userDataPushRetryExecutorThreadSize */ public void setUserDataPushRetryExecutorThreadSize(int userDataPushRetryExecutorThreadSize) { this.userDataPushRetryExecutorThreadSize = userDataPushRetryExecutorThreadSize; @@ -324,7 +324,7 @@ public int getDataNodeRetryExecutorThreadSize() { /** * Setter method for property dataNodeRetryExecutorThreadSize . * - * @param dataNodeRetryExecutorThreadSize value to be assigned to property dataNodeRetryExecutorThreadSize + * @param dataNodeRetryExecutorThreadSize value to be assigned to property dataNodeRetryExecutorThreadSize */ public void setDataNodeRetryExecutorThreadSize(int dataNodeRetryExecutorThreadSize) { this.dataNodeRetryExecutorThreadSize = dataNodeRetryExecutorThreadSize; @@ -343,7 +343,7 @@ public int getDataNodeRetryExecutorQueueSize() { /** * Setter method for property dataNodeRetryExecutorQueueSize . * - * @param dataNodeRetryExecutorQueueSize value to be assigned to property dataNodeRetryExecutorQueueSize + * @param dataNodeRetryExecutorQueueSize value to be assigned to property dataNodeRetryExecutorQueueSize */ public void setDataNodeRetryExecutorQueueSize(int dataNodeRetryExecutorQueueSize) { this.dataNodeRetryExecutorQueueSize = dataNodeRetryExecutorQueueSize; @@ -361,7 +361,7 @@ public int getWriteDataAcceptorQueueSize() { /** * Setter method for property writeDataAcceptorQueueSize . * - * @param writeDataAcceptorQueueSize value to be assigned to property writeDataAcceptorQueueSize + * @param writeDataAcceptorQueueSize value to be assigned to property writeDataAcceptorQueueSize */ public void setWriteDataAcceptorQueueSize(int writeDataAcceptorQueueSize) { this.writeDataAcceptorQueueSize = writeDataAcceptorQueueSize; @@ -379,7 +379,7 @@ public int getRenewAndSnapshotSilentPeriodSec() { /** * Setter method for property renewAndSnapshotSilentPeriodSec . * - * @param renewAndSnapshotSilentPeriodSec value to be assigned to property renewAndSnapshotSilentPeriodSec + * @param renewAndSnapshotSilentPeriodSec value to be assigned to property renewAndSnapshotSilentPeriodSec */ public void setRenewAndSnapshotSilentPeriodSec(int renewAndSnapshotSilentPeriodSec) { this.renewAndSnapshotSilentPeriodSec = renewAndSnapshotSilentPeriodSec; @@ -398,7 +398,7 @@ public int getPublishDataTaskRetryTimes() { /** * Setter method for property publishDataTaskRetryTimes . * - * @param publishDataTaskRetryTimes value to be assigned to property publishDataTaskRetryTimes + * @param publishDataTaskRetryTimes value to be assigned to property publishDataTaskRetryTimes */ public void setPublishDataTaskRetryTimes(int publishDataTaskRetryTimes) { this.publishDataTaskRetryTimes = publishDataTaskRetryTimes; @@ -417,7 +417,7 @@ public int getUnPublishDataTaskRetryTimes() { /** * Setter method for property unPublishDataTaskRetryTimes . * - * @param unPublishDataTaskRetryTimes value to be assigned to property unPublishDataTaskRetryTimes + * @param unPublishDataTaskRetryTimes value to be assigned to property unPublishDataTaskRetryTimes */ public void setUnPublishDataTaskRetryTimes(int unPublishDataTaskRetryTimes) { this.unPublishDataTaskRetryTimes = unPublishDataTaskRetryTimes; @@ -565,7 +565,7 @@ public long getCancelDataTaskRetryIncrementDelay() { /** * Setter method for property cancelDataTaskRetryIncrementDelay. * - * @param cancelDataTaskRetryIncrementDelay value to be assigned to property cancelDataTaskRetryIncrementDelay + * @param cancelDataTaskRetryIncrementDelay value to be assigned to property cancelDataTaskRetryIncrementDelay */ public void setCancelDataTaskRetryIncrementDelay(long cancelDataTaskRetryIncrementDelay) { this.cancelDataTaskRetryIncrementDelay = cancelDataTaskRetryIncrementDelay; @@ -574,7 +574,7 @@ public void setCancelDataTaskRetryIncrementDelay(long cancelDataTaskRetryIncreme /** * Setter method for property cancelDataTaskRetryFirstDelay . * - * @param cancelDataTaskRetryFirstDelay value to be assigned to property cancelDataTaskRetryFirstDelay + * @param cancelDataTaskRetryFirstDelay value to be assigned to property cancelDataTaskRetryFirstDelay */ public void setCancelDataTaskRetryFirstDelay(long cancelDataTaskRetryFirstDelay) { this.cancelDataTaskRetryFirstDelay = cancelDataTaskRetryFirstDelay; @@ -593,7 +593,7 @@ public long getPublishDataTaskRetryFirstDelay() { /** * Setter method for property publishDataTaskRetryFirstDelay . * - * @param publishDataTaskRetryFirstDelay value to be assigned to property publishDataTaskRetryFirstDelay + * @param publishDataTaskRetryFirstDelay value to be assigned to property publishDataTaskRetryFirstDelay */ public void setPublishDataTaskRetryFirstDelay(long publishDataTaskRetryFirstDelay) { this.publishDataTaskRetryFirstDelay = publishDataTaskRetryFirstDelay; @@ -612,7 +612,7 @@ public long getPublishDataTaskRetryIncrementDelay() { /** * Setter method for property publishDataTaskRetryIncrementDelay . * - * @param publishDataTaskRetryIncrementDelay value to be assigned to property publishDataTaskRetryIncrementDelay + * @param publishDataTaskRetryIncrementDelay value to be assigned to property publishDataTaskRetryIncrementDelay */ public void setPublishDataTaskRetryIncrementDelay(long publishDataTaskRetryIncrementDelay) { this.publishDataTaskRetryIncrementDelay = publishDataTaskRetryIncrementDelay; @@ -631,7 +631,7 @@ public long getUnPublishDataTaskRetryFirstDelay() { /** * Setter method for property unPublishDataTaskRetryFirstDelay . * - * @param unPublishDataTaskRetryFirstDelay value to be assigned to property unPublishDataTaskRetryFirstDelay + * @param unPublishDataTaskRetryFirstDelay value to be assigned to property unPublishDataTaskRetryFirstDelay */ public void setUnPublishDataTaskRetryFirstDelay(long unPublishDataTaskRetryFirstDelay) { this.unPublishDataTaskRetryFirstDelay = unPublishDataTaskRetryFirstDelay; @@ -650,7 +650,7 @@ public long getUnPublishDataTaskRetryIncrementDelay() { /** * Setter method for property unPublishDataTaskRetryIncrementDelay . * - * @param unPublishDataTaskRetryIncrementDelay value to be assigned to property unPublishDataTaskRetryIncrementDelay + * @param unPublishDataTaskRetryIncrementDelay value to be assigned to property unPublishDataTaskRetryIncrementDelay */ public void setUnPublishDataTaskRetryIncrementDelay(long unPublishDataTaskRetryIncrementDelay) { this.unPublishDataTaskRetryIncrementDelay = unPublishDataTaskRetryIncrementDelay; @@ -713,6 +713,25 @@ public void setSubscriberRegisterFetchRetryTimes(int subscriberRegisterFetchRetr this.subscriberRegisterFetchRetryTimes = subscriberRegisterFetchRetryTimes; } + /** + * Getter method for property sessionRegisterDataServerTaskRetryTimes. + * + * @return property value of sessionRegisterDataServerTaskRetryTimes + */ + @Override + public int getSessionRegisterDataServerTaskRetryTimes() { + return sessionRegisterDataServerTaskRetryTimes; + } + + /** + * Setter method for property sessionRegisterDataServerTaskRetryTimes. + * + * @param sessionRegisterDataServerTaskRetryTimes value to be assigned to property sessionRegisterDataServerTaskRetryTimes + */ + public void setSessionRegisterDataServerTaskRetryTimes(int sessionRegisterDataServerTaskRetryTimes) { + this.sessionRegisterDataServerTaskRetryTimes = sessionRegisterDataServerTaskRetryTimes; + } + /** * Getter method for property clientNodeExchangeTimeOut. * @@ -910,7 +929,7 @@ public int getSchedulerCleanInvalidClientBackOffBound() { /** * Setter method for property schedulerCleanInvolidClientTimeOut. * - * @param schedulerCleanInvalidClientTimeOut value to be assigned to property schedulerCleanInvolidClientTimeOut + * @param schedulerCleanInvalidClientTimeOut value to be assigned to property schedulerCleanInvolidClientTimeOut */ public void setSchedulerCleanInvalidClientTimeOut(int schedulerCleanInvalidClientTimeOut) { this.schedulerCleanInvalidClientTimeOut = schedulerCleanInvalidClientTimeOut; @@ -919,7 +938,7 @@ public void setSchedulerCleanInvalidClientTimeOut(int schedulerCleanInvalidClien /** * Setter method for property schedulerCleanInvolidClientFirstDelay. * - * @param schedulerCleanInvalidClientFirstDelay value to be assigned to property schedulerCleanInvolidClientFirstDelay + * @param schedulerCleanInvalidClientFirstDelay value to be assigned to property schedulerCleanInvolidClientFirstDelay */ public void setSchedulerCleanInvalidClientFirstDelay(int schedulerCleanInvalidClientFirstDelay) { this.schedulerCleanInvalidClientFirstDelay = schedulerCleanInvalidClientFirstDelay; @@ -928,7 +947,7 @@ public void setSchedulerCleanInvalidClientFirstDelay(int schedulerCleanInvalidCl /** * Setter method for property schedulerCleanInvolidClientBackOffBound. * - * @param schedulerCleanInvalidClientBackOffBound value to be assigned to property schedulerCleanInvolidClientBackOffBound + * @param schedulerCleanInvalidClientBackOffBound value to be assigned to property schedulerCleanInvolidClientBackOffBound */ public void setSchedulerCleanInvalidClientBackOffBound(int schedulerCleanInvalidClientBackOffBound) { this.schedulerCleanInvalidClientBackOffBound = schedulerCleanInvalidClientBackOffBound; @@ -947,7 +966,7 @@ public boolean isStopPushSwitch() { /** * Setter method for property stopPushSwitch. * - * @param stopPushSwitch value to be assigned to property stopPushSwitch + * @param stopPushSwitch value to be assigned to property stopPushSwitch */ @Override public void setStopPushSwitch(boolean stopPushSwitch) { @@ -967,7 +986,7 @@ public boolean isBeginDataFetchTask() { /** * Setter method for property beginDataFetchTask. * - * @param beginDataFetchTask value to be assigned to property beginDataFetchTask + * @param beginDataFetchTask value to be assigned to property beginDataFetchTask */ @Override public void setBeginDataFetchTask(boolean beginDataFetchTask) { @@ -981,7 +1000,7 @@ public String getInvalidForeverZones() { /** * Setter method for property invalidForeverZones. * - * @param invalidForeverZones value to be assigned to property invalidForeverZones + * @param invalidForeverZones value to be assigned to property invalidForeverZones */ public void setInvalidForeverZones(String invalidForeverZones) { this.invalidForeverZones = invalidForeverZones; @@ -994,7 +1013,7 @@ public String getInvalidIgnoreDataidRegex() { /** * Setter method for property invalidIgnoreDataidRegex. * - * @param invalidIgnoreDataidRegex value to be assigned to property invalidIgnoreDataidRegex + * @param invalidIgnoreDataidRegex value to be assigned to property invalidIgnoreDataidRegex */ public void setInvalidIgnoreDataidRegex(String invalidIgnoreDataidRegex) { this.invalidIgnoreDataidRegex = invalidIgnoreDataidRegex; @@ -1008,7 +1027,7 @@ public int getAccessDataExecutorMinPoolSize() { /** * Setter method for property accessDataExecutorMinPoolSize. * - * @param accessDataExecutorMinPoolSize value to be assigned to property accessDataExecutorMinPoolSize + * @param accessDataExecutorMinPoolSize value to be assigned to property accessDataExecutorMinPoolSize */ public void setAccessDataExecutorMinPoolSize(int accessDataExecutorMinPoolSize) { this.accessDataExecutorMinPoolSize = accessDataExecutorMinPoolSize; @@ -1022,7 +1041,7 @@ public int getAccessDataExecutorMaxPoolSize() { /** * Setter method for property accessDataExecutorMaxPoolSize. * - * @param accessDataExecutorMaxPoolSize value to be assigned to property accessDataExecutorMaxPoolSize + * @param accessDataExecutorMaxPoolSize value to be assigned to property accessDataExecutorMaxPoolSize */ public void setAccessDataExecutorMaxPoolSize(int accessDataExecutorMaxPoolSize) { this.accessDataExecutorMaxPoolSize = accessDataExecutorMaxPoolSize; @@ -1036,7 +1055,7 @@ public int getAccessDataExecutorQueueSize() { /** * Setter method for property accessDataExecutorQueueSize. * - * @param accessDataExecutorQueueSize value to be assigned to property accessDataExecutorQueueSize + * @param accessDataExecutorQueueSize value to be assigned to property accessDataExecutorQueueSize */ public void setAccessDataExecutorQueueSize(int accessDataExecutorQueueSize) { this.accessDataExecutorQueueSize = accessDataExecutorQueueSize; @@ -1050,7 +1069,7 @@ public long getAccessDataExecutorKeepAliveTime() { /** * Setter method for property accessDataExecutorKeepAliveTime. * - * @param accessDataExecutorKeepAliveTime value to be assigned to property accessDataExecutorKeepAliveTime + * @param accessDataExecutorKeepAliveTime value to be assigned to property accessDataExecutorKeepAliveTime */ public void setAccessDataExecutorKeepAliveTime(long accessDataExecutorKeepAliveTime) { this.accessDataExecutorKeepAliveTime = accessDataExecutorKeepAliveTime; @@ -1099,7 +1118,7 @@ public long getDataChangeExecutorKeepAliveTime() { /** * Setter method for property dataChangeExecutorMinPoolSize. * - * @param dataChangeExecutorMinPoolSize value to be assigned to property dataChangeExecutorMinPoolSize + * @param dataChangeExecutorMinPoolSize value to be assigned to property dataChangeExecutorMinPoolSize */ public void setDataChangeExecutorMinPoolSize(int dataChangeExecutorMinPoolSize) { this.dataChangeExecutorMinPoolSize = dataChangeExecutorMinPoolSize; @@ -1108,7 +1127,7 @@ public void setDataChangeExecutorMinPoolSize(int dataChangeExecutorMinPoolSize) /** * Setter method for property dataChangeExecutorMaxPoolSize. * - * @param dataChangeExecutorMaxPoolSize value to be assigned to property dataChangeExecutorMaxPoolSize + * @param dataChangeExecutorMaxPoolSize value to be assigned to property dataChangeExecutorMaxPoolSize */ public void setDataChangeExecutorMaxPoolSize(int dataChangeExecutorMaxPoolSize) { this.dataChangeExecutorMaxPoolSize = dataChangeExecutorMaxPoolSize; @@ -1117,7 +1136,7 @@ public void setDataChangeExecutorMaxPoolSize(int dataChangeExecutorMaxPoolSize) /** * Setter method for property dataChangeExecutorQueueSize. * - * @param dataChangeExecutorQueueSize value to be assigned to property dataChangeExecutorQueueSize + * @param dataChangeExecutorQueueSize value to be assigned to property dataChangeExecutorQueueSize */ public void setDataChangeExecutorQueueSize(int dataChangeExecutorQueueSize) { this.dataChangeExecutorQueueSize = dataChangeExecutorQueueSize; @@ -1126,7 +1145,7 @@ public void setDataChangeExecutorQueueSize(int dataChangeExecutorQueueSize) { /** * Setter method for property dataChangeExecutorKeepAliveTime. * - * @param dataChangeExecutorKeepAliveTime value to be assigned to property dataChangeExecutorKeepAliveTime + * @param dataChangeExecutorKeepAliveTime value to be assigned to property dataChangeExecutorKeepAliveTime */ public void setDataChangeExecutorKeepAliveTime(long dataChangeExecutorKeepAliveTime) { this.dataChangeExecutorKeepAliveTime = dataChangeExecutorKeepAliveTime; @@ -1175,7 +1194,7 @@ public long getPushTaskExecutorKeepAliveTime() { /** * Setter method for property pushTaskExecutorMinPoolSize. * - * @param pushTaskExecutorMinPoolSize value to be assigned to property pushTaskExecutorMinPoolSize + * @param pushTaskExecutorMinPoolSize value to be assigned to property pushTaskExecutorMinPoolSize */ public void setPushTaskExecutorMinPoolSize(int pushTaskExecutorMinPoolSize) { this.pushTaskExecutorMinPoolSize = pushTaskExecutorMinPoolSize; @@ -1184,7 +1203,7 @@ public void setPushTaskExecutorMinPoolSize(int pushTaskExecutorMinPoolSize) { /** * Setter method for property pushTaskExecutorMaxPoolSize. * - * @param pushTaskExecutorMaxPoolSize value to be assigned to property pushTaskExecutorMaxPoolSize + * @param pushTaskExecutorMaxPoolSize value to be assigned to property pushTaskExecutorMaxPoolSize */ public void setPushTaskExecutorMaxPoolSize(int pushTaskExecutorMaxPoolSize) { this.pushTaskExecutorMaxPoolSize = pushTaskExecutorMaxPoolSize; @@ -1193,7 +1212,7 @@ public void setPushTaskExecutorMaxPoolSize(int pushTaskExecutorMaxPoolSize) { /** * Setter method for property pushTaskExecutorQueueSize. * - * @param pushTaskExecutorQueueSize value to be assigned to property pushTaskExecutorQueueSize + * @param pushTaskExecutorQueueSize value to be assigned to property pushTaskExecutorQueueSize */ public void setPushTaskExecutorQueueSize(int pushTaskExecutorQueueSize) { this.pushTaskExecutorQueueSize = pushTaskExecutorQueueSize; @@ -1202,7 +1221,7 @@ public void setPushTaskExecutorQueueSize(int pushTaskExecutorQueueSize) { /** * Setter method for property pushTaskExecutorKeepAliveTime. * - * @param pushTaskExecutorKeepAliveTime value to be assigned to property pushTaskExecutorKeepAliveTime + * @param pushTaskExecutorKeepAliveTime value to be assigned to property pushTaskExecutorKeepAliveTime */ public void setPushTaskExecutorKeepAliveTime(long pushTaskExecutorKeepAliveTime) { this.pushTaskExecutorKeepAliveTime = pushTaskExecutorKeepAliveTime; @@ -1238,7 +1257,7 @@ public long getDefaultSessionExecutorKeepAliveTime() { /** * Setter method for property defaultSessionExecutorMinPoolSize. * - * @param defaultSessionExecutorMinPoolSize value to be assigned to property defaultSessionExecutorMinPoolSize + * @param defaultSessionExecutorMinPoolSize value to be assigned to property defaultSessionExecutorMinPoolSize */ public void setDefaultSessionExecutorMinPoolSize(int defaultSessionExecutorMinPoolSize) { this.defaultSessionExecutorMinPoolSize = defaultSessionExecutorMinPoolSize; @@ -1247,7 +1266,7 @@ public void setDefaultSessionExecutorMinPoolSize(int defaultSessionExecutorMinPo /** * Setter method for property defaultSessionExecutorMaxPoolSize. * - * @param defaultSessionExecutorMaxPoolSize value to be assigned to property defaultSessionExecutorMaxPoolSize + * @param defaultSessionExecutorMaxPoolSize value to be assigned to property defaultSessionExecutorMaxPoolSize */ public void setDefaultSessionExecutorMaxPoolSize(int defaultSessionExecutorMaxPoolSize) { this.defaultSessionExecutorMaxPoolSize = defaultSessionExecutorMaxPoolSize; @@ -1256,7 +1275,7 @@ public void setDefaultSessionExecutorMaxPoolSize(int defaultSessionExecutorMaxPo /** * Setter method for property defaultSessionExecutorKeepAliveTime. * - * @param defaultSessionExecutorKeepAliveTime value to be assigned to property defaultSessionExecutorKeepAliveTime + * @param defaultSessionExecutorKeepAliveTime value to be assigned to property defaultSessionExecutorKeepAliveTime */ public void setDefaultSessionExecutorKeepAliveTime(long defaultSessionExecutorKeepAliveTime) { this.defaultSessionExecutorKeepAliveTime = defaultSessionExecutorKeepAliveTime; @@ -1292,7 +1311,7 @@ public int getConnectClientExecutorQueueSize() { /** * Setter method for property connectClientExecutorMinPoolSize. * - * @param connectClientExecutorMinPoolSize value to be assigned to property connectClientExecutorMinPoolSize + * @param connectClientExecutorMinPoolSize value to be assigned to property connectClientExecutorMinPoolSize */ public void setConnectClientExecutorMinPoolSize(int connectClientExecutorMinPoolSize) { this.connectClientExecutorMinPoolSize = connectClientExecutorMinPoolSize; @@ -1301,7 +1320,7 @@ public void setConnectClientExecutorMinPoolSize(int connectClientExecutorMinPool /** * Setter method for property connectClientExecutorMaxPoolSize. * - * @param connectClientExecutorMaxPoolSize value to be assigned to property connectClientExecutorMaxPoolSize + * @param connectClientExecutorMaxPoolSize value to be assigned to property connectClientExecutorMaxPoolSize */ public void setConnectClientExecutorMaxPoolSize(int connectClientExecutorMaxPoolSize) { this.connectClientExecutorMaxPoolSize = connectClientExecutorMaxPoolSize; @@ -1310,7 +1329,7 @@ public void setConnectClientExecutorMaxPoolSize(int connectClientExecutorMaxPool /** * Setter method for property connectClientExecutorQueueSize. * - * @param connectClientExecutorQueueSize value to be assigned to property connectClientExecutorQueueSize + * @param connectClientExecutorQueueSize value to be assigned to property connectClientExecutorQueueSize */ public void setConnectClientExecutorQueueSize(int connectClientExecutorQueueSize) { this.connectClientExecutorQueueSize = connectClientExecutorQueueSize; @@ -1329,7 +1348,7 @@ public int getDataChangeFetchTaskMaxBufferSize() { /** * Setter method for property dataChangeFetchTaskMaxBufferSize. * - * @param dataChangeFetchTaskMaxBufferSize value to be assigned to property dataChangeFetchTaskMaxBufferSize + * @param dataChangeFetchTaskMaxBufferSize value to be assigned to property dataChangeFetchTaskMaxBufferSize */ public void setDataChangeFetchTaskMaxBufferSize(int dataChangeFetchTaskMaxBufferSize) { this.dataChangeFetchTaskMaxBufferSize = dataChangeFetchTaskMaxBufferSize; @@ -1348,7 +1367,7 @@ public int getDataChangeFetchTaskWorkerSize() { /** * Setter method for property dataChangeFetchTaskWorkerSize. * - * @param dataChangeFetchTaskWorkerSize value to be assigned to property dataChangeFetchTaskWorkerSize + * @param dataChangeFetchTaskWorkerSize value to be assigned to property dataChangeFetchTaskWorkerSize */ public void setDataChangeFetchTaskWorkerSize(int dataChangeFetchTaskWorkerSize) { this.dataChangeFetchTaskWorkerSize = dataChangeFetchTaskWorkerSize; @@ -1367,7 +1386,7 @@ public int getUserDataPushRetryWheelTicksSize() { /** * Setter method for property userDataPushRetryWheelTicksSize. * - * @param userDataPushRetryWheelTicksSize value to be assigned to property userDataPushRetryWheelTicksSize + * @param userDataPushRetryWheelTicksSize value to be assigned to property userDataPushRetryWheelTicksSize */ public void setUserDataPushRetryWheelTicksSize(int userDataPushRetryWheelTicksSize) { this.userDataPushRetryWheelTicksSize = userDataPushRetryWheelTicksSize; @@ -1386,7 +1405,7 @@ public int getUserDataPushRetryWheelTicksDuration() { /** * Setter method for property userDataPushRetryWheelTicksDuration. * - * @param userDataPushRetryWheelTicksDuration value to be assigned to property userDataPushRetryWheelTicksDuration + * @param userDataPushRetryWheelTicksDuration value to be assigned to property userDataPushRetryWheelTicksDuration */ public void setUserDataPushRetryWheelTicksDuration(int userDataPushRetryWheelTicksDuration) { this.userDataPushRetryWheelTicksDuration = userDataPushRetryWheelTicksDuration; @@ -1405,7 +1424,7 @@ public int getPushDataTaskRetryFirstDelay() { /** * Setter method for property pushDataTaskRetryFirstDelay. * - * @param pushDataTaskRetryFirstDelay value to be assigned to property pushDataTaskRetryFirstDelay + * @param pushDataTaskRetryFirstDelay value to be assigned to property pushDataTaskRetryFirstDelay */ public void setPushDataTaskRetryFirstDelay(int pushDataTaskRetryFirstDelay) { this.pushDataTaskRetryFirstDelay = pushDataTaskRetryFirstDelay; @@ -1451,7 +1470,7 @@ public int getRenewDatumWheelTaskDelaySec() { /** * Setter method for property renewDatumWheelTaskDelaySec . * - * @param renewDatumWheelTaskDelaySec value to be assigned to property renewDatumWheelTaskDelaySec + * @param renewDatumWheelTaskDelaySec value to be assigned to property renewDatumWheelTaskDelaySec */ public void setRenewDatumWheelTaskDelaySec(int renewDatumWheelTaskDelaySec) { this.renewDatumWheelTaskDelaySec = renewDatumWheelTaskDelaySec; @@ -1469,7 +1488,7 @@ public int getRenewDatumWheelTaskRandomFirstDelaySec() { /** * Setter method for property renewDatumWheelTaskRandomFirstDelaySec . * - * @param renewDatumWheelTaskRandomFirstDelaySec value to be assigned to property renewDatumWheelTaskRandomFirstDelaySec + * @param renewDatumWheelTaskRandomFirstDelaySec value to be assigned to property renewDatumWheelTaskRandomFirstDelaySec */ public void setRenewDatumWheelTaskRandomFirstDelaySec(int renewDatumWheelTaskRandomFirstDelaySec) { this.renewDatumWheelTaskRandomFirstDelaySec = renewDatumWheelTaskRandomFirstDelaySec; @@ -1478,7 +1497,7 @@ public void setRenewDatumWheelTaskRandomFirstDelaySec(int renewDatumWheelTaskRan /** * Setter method for property renewDatumWheelTicksSize. * - * @param renewDatumWheelTicksSize value to be assigned to property renewDatumWheelTicksSize + * @param renewDatumWheelTicksSize value to be assigned to property renewDatumWheelTicksSize */ public void setRenewDatumWheelTicksSize(int renewDatumWheelTicksSize) { this.renewDatumWheelTicksSize = renewDatumWheelTicksSize; @@ -1487,7 +1506,7 @@ public void setRenewDatumWheelTicksSize(int renewDatumWheelTicksSize) { /** * Setter method for property renewDatumWheelTicksDuration. * - * @param renewDatumWheelTicksDuration value to be assigned to property renewDatumWheelTicksDuration + * @param renewDatumWheelTicksDuration value to be assigned to property renewDatumWheelTicksDuration */ public void setRenewDatumWheelTicksDuration(int renewDatumWheelTicksDuration) { this.renewDatumWheelTicksDuration = renewDatumWheelTicksDuration; @@ -1496,7 +1515,7 @@ public void setRenewDatumWheelTicksDuration(int renewDatumWheelTicksDuration) { /** * Setter method for property pushDataTaskRetryIncrementDelay. * - * @param pushDataTaskRetryIncrementDelay value to be assigned to property pushDataTaskRetryIncrementDelay + * @param pushDataTaskRetryIncrementDelay value to be assigned to property pushDataTaskRetryIncrementDelay */ public void setPushDataTaskRetryIncrementDelay(long pushDataTaskRetryIncrementDelay) { this.pushDataTaskRetryIncrementDelay = pushDataTaskRetryIncrementDelay; @@ -1523,7 +1542,7 @@ public String getBlacklistSubDataIdRegex() { /** * Setter method for property blacklistPubDataIdRegex. * - * @param blacklistPubDataIdRegex value to be assigned to property blacklistPubDataIdRegex + * @param blacklistPubDataIdRegex value to be assigned to property blacklistPubDataIdRegex */ public void setBlacklistPubDataIdRegex(String blacklistPubDataIdRegex) { this.blacklistPubDataIdRegex = blacklistPubDataIdRegex; @@ -1532,7 +1551,7 @@ public void setBlacklistPubDataIdRegex(String blacklistPubDataIdRegex) { /** * Setter method for property blacklistSubDataIdRegex. * - * @param blacklistSubDataIdRegex value to be assigned to property blacklistSubDataIdRegex + * @param blacklistSubDataIdRegex value to be assigned to property blacklistSubDataIdRegex */ public void setBlacklistSubDataIdRegex(String blacklistSubDataIdRegex) { this.blacklistSubDataIdRegex = blacklistSubDataIdRegex; @@ -1550,7 +1569,7 @@ public long getPushTaskConfirmWaitTimeout() { /** * Setter method for property pushTaskConfirmWaitTimeout. * - * @param pushTaskConfirmWaitTimeout value to be assigned to property pushTaskConfirmWaitTimeout + * @param pushTaskConfirmWaitTimeout value to be assigned to property pushTaskConfirmWaitTimeout */ public void setPushTaskConfirmWaitTimeout(long pushTaskConfirmWaitTimeout) { this.pushTaskConfirmWaitTimeout = pushTaskConfirmWaitTimeout; @@ -1595,7 +1614,7 @@ public int getPushTaskConfirmCheckExecutorThreadSize() { /** * Setter method for property pushTaskConfirmCheckWheelTicksSize. * - * @param pushTaskConfirmCheckWheelTicksSize value to be assigned to property pushTaskConfirmCheckWheelTicksSize + * @param pushTaskConfirmCheckWheelTicksSize value to be assigned to property pushTaskConfirmCheckWheelTicksSize */ public void setPushTaskConfirmCheckWheelTicksSize(int pushTaskConfirmCheckWheelTicksSize) { this.pushTaskConfirmCheckWheelTicksSize = pushTaskConfirmCheckWheelTicksSize; @@ -1604,7 +1623,7 @@ public void setPushTaskConfirmCheckWheelTicksSize(int pushTaskConfirmCheckWheelT /** * Setter method for property pushTaskConfirmCheckWheelTicksDuration. * - * @param pushTaskConfirmCheckWheelTicksDuration value to be assigned to property pushTaskConfirmCheckWheelTicksDuration + * @param pushTaskConfirmCheckWheelTicksDuration value to be assigned to property pushTaskConfirmCheckWheelTicksDuration */ public void setPushTaskConfirmCheckWheelTicksDuration(int pushTaskConfirmCheckWheelTicksDuration) { this.pushTaskConfirmCheckWheelTicksDuration = pushTaskConfirmCheckWheelTicksDuration; @@ -1613,7 +1632,7 @@ public void setPushTaskConfirmCheckWheelTicksDuration(int pushTaskConfirmCheckWh /** * Setter method for property pushTaskConfirmCheckExecutorQueueSize. * - * @param pushTaskConfirmCheckExecutorQueueSize value to be assigned to property pushTaskConfirmCheckExecutorQueueSize + * @param pushTaskConfirmCheckExecutorQueueSize value to be assigned to property pushTaskConfirmCheckExecutorQueueSize */ public void setPushTaskConfirmCheckExecutorQueueSize(int pushTaskConfirmCheckExecutorQueueSize) { this.pushTaskConfirmCheckExecutorQueueSize = pushTaskConfirmCheckExecutorQueueSize; @@ -1622,7 +1641,7 @@ public void setPushTaskConfirmCheckExecutorQueueSize(int pushTaskConfirmCheckExe /** * Setter method for property pushTaskConfirmCheckExecutorThreadSize. * - * @param pushTaskConfirmCheckExecutorThreadSize value to be assigned to property pushTaskConfirmCheckExecutorThreadSize + * @param pushTaskConfirmCheckExecutorThreadSize value to be assigned to property pushTaskConfirmCheckExecutorThreadSize */ public void setPushTaskConfirmCheckExecutorThreadSize(int pushTaskConfirmCheckExecutorThreadSize) { this.pushTaskConfirmCheckExecutorThreadSize = pushTaskConfirmCheckExecutorThreadSize; @@ -1667,7 +1686,7 @@ public long getPublishDataExecutorKeepAliveTime() { /** * Setter method for property publishDataExecutorMinPoolSize. * - * @param publishDataExecutorMinPoolSize value to be assigned to property publishDataExecutorMinPoolSize + * @param publishDataExecutorMinPoolSize value to be assigned to property publishDataExecutorMinPoolSize */ public void setPublishDataExecutorMinPoolSize(int publishDataExecutorMinPoolSize) { this.publishDataExecutorMinPoolSize = publishDataExecutorMinPoolSize; @@ -1676,7 +1695,7 @@ public void setPublishDataExecutorMinPoolSize(int publishDataExecutorMinPoolSize /** * Setter method for property publishDataExecutorMaxPoolSize. * - * @param publishDataExecutorMaxPoolSize value to be assigned to property publishDataExecutorMaxPoolSize + * @param publishDataExecutorMaxPoolSize value to be assigned to property publishDataExecutorMaxPoolSize */ public void setPublishDataExecutorMaxPoolSize(int publishDataExecutorMaxPoolSize) { this.publishDataExecutorMaxPoolSize = publishDataExecutorMaxPoolSize; @@ -1685,7 +1704,7 @@ public void setPublishDataExecutorMaxPoolSize(int publishDataExecutorMaxPoolSize /** * Setter method for property publishDataExecutorQueueSize. * - * @param publishDataExecutorQueueSize value to be assigned to property publishDataExecutorQueueSize + * @param publishDataExecutorQueueSize value to be assigned to property publishDataExecutorQueueSize */ public void setPublishDataExecutorQueueSize(int publishDataExecutorQueueSize) { this.publishDataExecutorQueueSize = publishDataExecutorQueueSize; @@ -1694,7 +1713,7 @@ public void setPublishDataExecutorQueueSize(int publishDataExecutorQueueSize) { /** * Setter method for property publishDataExecutorKeepAliveTime. * - * @param publishDataExecutorKeepAliveTime value to be assigned to property publishDataExecutorKeepAliveTime + * @param publishDataExecutorKeepAliveTime value to be assigned to property publishDataExecutorKeepAliveTime */ public void setPublishDataExecutorKeepAliveTime(long publishDataExecutorKeepAliveTime) { this.publishDataExecutorKeepAliveTime = publishDataExecutorKeepAliveTime; @@ -1712,7 +1731,7 @@ public double getAccessLimitRate() { /** * Setter method for property accessLimitRate. * - * @param accessLimitRate value to be assigned to property accessLimitRate + * @param accessLimitRate value to be assigned to property accessLimitRate */ public void setAccessLimitRate(double accessLimitRate) { this.accessLimitRate = accessLimitRate; @@ -1731,7 +1750,7 @@ public int getDataClientConnNum() { /** * Setter method for property dataClientConnNum . * - * @param dataClientConnNum value to be assigned to property dataClientConnNum + * @param dataClientConnNum value to be assigned to property dataClientConnNum */ public void setDataClientConnNum(int dataClientConnNum) { this.dataClientConnNum = dataClientConnNum; @@ -1778,7 +1797,7 @@ public int getSessionSchedulerPoolSize() { /** * Setter method for property sessionSchedulerPoolSize . * - * @param sessionSchedulerPoolSize value to be assigned to property sessionSchedulerPoolSize + * @param sessionSchedulerPoolSize value to be assigned to property sessionSchedulerPoolSize */ public void setSessionSchedulerPoolSize(int sessionSchedulerPoolSize) { this.sessionSchedulerPoolSize = sessionSchedulerPoolSize; @@ -1796,7 +1815,7 @@ public int getDataNodeExchangeForFetchDatumTimeOut() { /** * Setter method for property dataNodeExchangeForFetchDatumTimeOut . * - * @param dataNodeExchangeForFetchDatumTimeOut value to be assigned to property dataNodeExchangeForFetchDatumTimeOut + * @param dataNodeExchangeForFetchDatumTimeOut value to be assigned to property dataNodeExchangeForFetchDatumTimeOut */ public void setDataNodeExchangeForFetchDatumTimeOut(int dataNodeExchangeForFetchDatumTimeOut) { this.dataNodeExchangeForFetchDatumTimeOut = dataNodeExchangeForFetchDatumTimeOut; @@ -1804,6 +1823,7 @@ public void setDataNodeExchangeForFetchDatumTimeOut(int dataNodeExchangeForFetch /** * Getter method for property slotSyncPublisherMaxNum. + * * @return property value of slotSyncPublisherMaxNum */ @Override @@ -1824,6 +1844,7 @@ public Set getMetaServerIpAddresses() { /** * Setter method for property slotSyncPublisherMaxNum. + * * @param slotSyncPublisherMaxNum value to be assigned to property slotSyncPublisherMaxNum */ public void setSlotSyncPublisherMaxNum(int slotSyncPublisherMaxNum) { diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java index 5fd124e97..cf7db8f0f 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Collection; +import com.alipay.sofa.jraft.NodeManager; import com.alipay.sofa.registry.server.session.assemble.AppAssembleService; import com.alipay.sofa.registry.server.session.assemble.AppInterfaceAssembleService; import com.alipay.sofa.registry.server.session.assemble.AssembleService; diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/SessionDatumCacheDecorator.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/SessionDatumCacheDecorator.java index a50172eec..2c50eb0be 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/SessionDatumCacheDecorator.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/SessionDatumCacheDecorator.java @@ -16,16 +16,13 @@ */ package com.alipay.sofa.registry.server.session.cache; -import com.alipay.sofa.registry.common.model.Node; import com.alipay.sofa.registry.common.model.constants.ValueConstants; import com.alipay.sofa.registry.common.model.dataserver.Datum; import com.alipay.sofa.registry.common.model.store.DataInfo; -import com.alipay.sofa.registry.common.model.store.Subscriber; import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; -import com.alipay.sofa.registry.server.session.node.NodeManager; -import com.alipay.sofa.registry.server.session.node.NodeManagerFactory; import com.alipay.sofa.registry.server.session.store.Interests; +import com.alipay.sofa.registry.server.shared.meta.MetaServerService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.CollectionUtils; @@ -33,11 +30,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; import java.util.stream.Collectors; /** - * * @author xiaojian.xj * @version $Id: SessionDatumCacheDecorator.java, v 0.1 2020年11月03日 20:23 xiaojian.xj Exp $ */ @@ -52,6 +47,9 @@ public class SessionDatumCacheDecorator { @Autowired private CacheService sessionCacheService; + @Autowired + private MetaServerService metaServerService; + @Autowired private AppRevisionCacheRegistry appRevisionCacheRegistry; @@ -76,12 +74,12 @@ public Datum getDatumCache(String dataCenter, String dataInfoId) { public Map getDatumsCache(String dataInfoId) { Map map = new HashMap<>(); - NodeManager nodeManager = NodeManagerFactory.getNodeManager(Node.NodeType.META); - Collection dataCenters = nodeManager.getDataCenters(); + + Collection dataCenters = metaServerService.getDataCenters(); if (dataCenters != null) { Collection keys = dataCenters.stream().map(dataCenter -> new Key(Key.KeyType.OBJ, - DatumKey.class.getName(), new DatumKey(dataInfoId, dataCenter))) - .collect(Collectors.toList()); + DatumKey.class.getName(), new DatumKey(dataInfoId, dataCenter))) + .collect(Collectors.toList()); Map values = null; try { @@ -90,10 +88,10 @@ public Map getDatumsCache(String dataInfoId) { // The version is set to 0, so that when session checks the datum versions regularly, it will actively re-query the data. for (String dataCenter : dataCenters) { boolean result = sessionInterests.checkAndUpdateInterestVersionZero(dataCenter, - dataInfoId); + dataInfoId); taskLogger.error(String.format( - "error when access cache, so checkAndUpdateInterestVersionZero(return %s): %s", - result, e.getMessage()), e); + "error when access cache, so checkAndUpdateInterestVersionZero(return %s): %s", + result, e.getMessage()), e); } } diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/SessionNodeManager.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/SessionNodeManager.java deleted file mode 100644 index b08d4a7bc..000000000 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/SessionNodeManager.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alipay.sofa.registry.server.session.node; - -import java.util.*; - -import com.alipay.sofa.registry.common.model.Node.NodeType; -import com.alipay.sofa.registry.common.model.metaserver.NodeChangeResult; -import com.alipay.sofa.registry.common.model.metaserver.RenewNodesRequest; -import com.alipay.sofa.registry.common.model.metaserver.SessionNode; -import com.alipay.sofa.registry.common.model.store.URL; -import com.alipay.sofa.registry.log.Logger; -import com.alipay.sofa.registry.log.LoggerFactory; -import com.alipay.sofa.registry.net.NetUtil; -import com.alipay.sofa.registry.remoting.exchange.RequestException; -import com.alipay.sofa.registry.remoting.exchange.message.Request; - -/** - * - * @author shangyu.wh - * @version $Id: SessionNodeManager.java, v 0.1 2018-03-05 10:42 shangyu.wh Exp $ - */ -public class SessionNodeManager extends AbstractNodeManager { - - private static final Logger LOGGER = LoggerFactory.getLogger(SessionNodeManager.class); - - @Override - public SessionNode getNode(String dataInfoId) { - return null; - } - - @Override - public NodeType getNodeType() { - return NodeType.SESSION; - } - - public List getZoneServerList(String zonename) { - List serverList = new ArrayList<>(); - Collection sessionNodes = getDataCenterNodes(); - if (sessionNodes != null && !sessionNodes.isEmpty()) { - if (zonename == null || zonename.isEmpty()) { - zonename = sessionServerConfig.getSessionServerRegion(); - } - for (SessionNode sessionNode : sessionNodes) { - if (zonename.equals(sessionNode.getRegionId())) { - URL url = sessionNode.getNodeUrl(); - if (url != null) { - serverList.add(url.getIpAddress()); - } - } - } - - } - return serverList; - } - - @Override - public void renewNode() { - try { - - Request renewNodesRequestRequest = new Request() { - - @Override - public RenewNodesRequest getRequestBody() { - URL clientUrl = new URL(NetUtil.getLocalAddress().getHostAddress(), 0); - SessionNode sessionNode = new SessionNode(clientUrl, - sessionServerConfig.getSessionServerRegion()); - - return new RenewNodesRequest(sessionNode); - } - - @Override - public URL getRequestUrl() { - return new URL(raftClientManager.getLeader().getIp(), - sessionServerConfig.getMetaServerPort()); - } - }; - - metaNodeExchanger.request(renewNodesRequestRequest); - } catch (RequestException e) { - throw new RuntimeException("SessionNodeManager renew node error! " + e.getMessage(), e); - } - } - - @Override - public void updateNodes(NodeChangeResult nodeChangeResult) { - write.lock(); - try { - Long receiveVersion = nodeChangeResult.getVersion(); - boolean versionChange = checkAndUpdateListVersions( - sessionServerConfig.getSessionServerDataCenter(), receiveVersion); - if (!versionChange) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Current data type {} list version has not updated!", - getNodeType()); - } - } - nodes = nodeChangeResult.getNodes(); - } finally { - write.unlock(); - } - } -} diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/AppRevisionNodeServiceImpl.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/AppRevisionNodeServiceImpl.java index 851d44a16..c80b0dfd9 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/AppRevisionNodeServiceImpl.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/AppRevisionNodeServiceImpl.java @@ -28,13 +28,13 @@ import com.alipay.sofa.registry.remoting.exchange.message.Response; import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig; import com.alipay.sofa.registry.server.session.node.RaftClientManager; -import com.alipay.sofa.registry.server.session.node.SessionNodeManager; import org.springframework.beans.factory.annotation.Autowired; import java.util.List; public class AppRevisionNodeServiceImpl implements AppRevisionNodeService { - private static final Logger LOGGER = LoggerFactory.getLogger(SessionNodeManager.class, + private static final Logger LOGGER = LoggerFactory.getLogger( + AppRevisionNodeServiceImpl.class, "[AppRevisionService]"); @Autowired diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/ClientNodeService.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/ClientNodeService.java index 4a33cc09c..c23c4ffc5 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/ClientNodeService.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/ClientNodeService.java @@ -18,8 +18,6 @@ import com.alipay.sofa.registry.common.model.store.URL; import com.alipay.sofa.registry.remoting.CallbackHandler; -import com.alipay.sofa.registry.core.model.AppRevisionRegister; -import com.alipay.sofa.registry.core.model.AppRevisionKey; import com.alipay.sofa.registry.common.model.metaserver.ProvideData; /** @@ -29,11 +27,4 @@ public interface ClientNodeService { void pushWithCallback(Object object, URL url, CallbackHandler callbackHandler); - /** - * fetch persistence data from meta server - * - * @param dataInfoId - * @return - */ - ProvideData fetchData(String dataInfoId); } \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/MetaNodeServiceImpl.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/MetaNodeServiceImpl.java deleted file mode 100644 index bbe2caff2..000000000 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/MetaNodeServiceImpl.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alipay.sofa.registry.server.session.node.service; - -import com.alipay.sofa.registry.common.model.metaserver.*; -import org.springframework.beans.factory.annotation.Autowired; - -import com.alipay.sofa.registry.common.model.store.URL; -import com.alipay.sofa.registry.log.Logger; -import com.alipay.sofa.registry.log.LoggerFactory; -import com.alipay.sofa.registry.remoting.exchange.NodeExchanger; -import com.alipay.sofa.registry.remoting.exchange.RequestException; -import com.alipay.sofa.registry.remoting.exchange.message.Request; -import com.alipay.sofa.registry.remoting.exchange.message.Response; -import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig; -import com.alipay.sofa.registry.server.session.node.RaftClientManager; -import com.alipay.sofa.registry.server.session.node.SessionNodeManager; - -import java.util.List; - -/** - * @author shangyu.wh - * @version $Id: MetaNodeServiceImpl.java, v 0.1 2018-04-17 21:23 shangyu.wh Exp $ - */ -public class MetaNodeServiceImpl implements MetaNodeService { - - private static final Logger LOGGER = LoggerFactory.getLogger(SessionNodeManager.class, - "[MetaNodeService]"); - - @Autowired - protected SessionServerConfig sessionServerConfig; - - @Autowired - protected NodeExchanger metaNodeExchanger; - - @Autowired - RaftClientManager raftClientManager; - - @Override - public ProvideData fetchData(String dataInfoId) { - try { - - Request request = new Request() { - - @Override - public FetchProvideDataRequest getRequestBody() { - - return new FetchProvideDataRequest(dataInfoId); - } - - @Override - public URL getRequestUrl() { - return new URL(raftClientManager.getLeader().getIp(), - sessionServerConfig.getMetaServerPort()); - } - }; - - Response response = metaNodeExchanger.request(request); - - Object result = response.getResult(); - if (result instanceof ProvideData) { - return (ProvideData) result; - } else { - LOGGER.error("fetch null provider data!"); - throw new RuntimeException("MetaNodeService fetch null provider data!"); - } - } catch (RequestException e) { - LOGGER.error("fetch provider data error! " + e.getMessage(), e); - throw new RuntimeException("fetch provider data error! " + e.getMessage(), e); - } - } -} \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/registry/SessionRegistry.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/registry/SessionRegistry.java index 641c2b8ba..66f3a781b 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/registry/SessionRegistry.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/registry/SessionRegistry.java @@ -26,9 +26,6 @@ import com.alipay.sofa.registry.server.session.cache.AppRevisionCacheRegistry; import org.springframework.beans.factory.annotation.Autowired; -import com.alipay.sofa.registry.common.model.Node; -import com.alipay.sofa.registry.common.model.RenewDatumRequest; -import com.alipay.sofa.registry.common.model.constants.ValueConstants; import com.alipay.sofa.registry.common.model.store.Publisher; import com.alipay.sofa.registry.common.model.store.StoreData; import com.alipay.sofa.registry.common.model.store.Subscriber; @@ -58,11 +55,6 @@ import com.alipay.sofa.registry.task.listener.TaskEvent; import com.alipay.sofa.registry.task.listener.TaskListenerManager; import com.google.common.collect.Lists; -import org.springframework.beans.factory.annotation.Autowired; - -import java.util.*; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; /** * @author shangyu.wh @@ -70,64 +62,65 @@ */ public class SessionRegistry implements Registry { - private static final Logger LOGGER = LoggerFactory.getLogger(SessionRegistry.class); + private static final Logger LOGGER = LoggerFactory + .getLogger(SessionRegistry.class); - protected static final Logger TASK_LOGGER = LoggerFactory.getLogger(SessionRegistry.class, - "[Task]"); + protected static final Logger TASK_LOGGER = LoggerFactory.getLogger( + SessionRegistry.class, "[Task]"); /** * store subscribers */ @Autowired - private Interests sessionInterests; + private Interests sessionInterests; /** * store watchers */ @Autowired - private Watchers sessionWatchers; + private Watchers sessionWatchers; /** * store publishers */ @Autowired - private DataStore sessionDataStore; + private DataStore sessionDataStore; /** * transfer data to DataNode */ @Autowired - private DataNodeService dataNodeService; + private DataNodeService dataNodeService; /** * trigger task com.alipay.sofa.registry.server.meta.listener process */ @Autowired - private TaskListenerManager taskListenerManager; + private TaskListenerManager taskListenerManager; @Autowired - private SessionServerConfig sessionServerConfig; + private SessionServerConfig sessionServerConfig; @Autowired - private Exchange boltExchange; + private Exchange boltExchange; @Autowired - private SessionRegistryStrategy sessionRegistryStrategy; + private SessionRegistryStrategy sessionRegistryStrategy; @Autowired private WrapperInterceptorManager wrapperInterceptorManager; @Autowired - private DataIdMatchStrategy dataIdMatchStrategy; + private DataIdMatchStrategy dataIdMatchStrategy; @Autowired - private WriteDataAcceptor writeDataAcceptor; + private WriteDataAcceptor writeDataAcceptor; @Autowired - private SlotTableCache slotTableCache; - private AppRevisionCacheRegistry appRevisionCacheRegistry; + private SlotTableCache slotTableCache; + private AppRevisionCacheRegistry appRevisionCacheRegistry; - private volatile boolean enableDataRenewSnapshot = true; + private volatile boolean enableDataRenewSnapshot = true; @Override public void register(StoreData storeData) { @@ -199,7 +192,7 @@ public void unRegister(StoreData storeData) { // All write operations to DataServer (pub/unPub/clientoff/renew/snapshot) // are handed over to WriteDataAcceptor writeDataAcceptor.accept(new PublisherWriteDataRequest(publisher, - WriteDataRequest.WriteDataRequestType.UN_PUBLISHER, slotTableCache + WriteDataRequest.WriteDataRequestType.UN_PUBLISHER, slotTableCache .getLeader(publisher.getDataInfoId()))); sessionRegistryStrategy.afterPublisherUnRegister(publisher); @@ -332,81 +325,81 @@ public void remove(List connectIds) { subMap.forEach((registerId, sub) -> { if (isFireSubscriberPushEmptyTask(sub.getDataId())) { - fireSubscriberPushEmptyTask(sub); - } - }); - } + fireSubscriberPushEmptyTask(sub); + } + }); + } - if (pubExisted || subExisted) { - connectIdsAll.add(connectId); - } - }); - cancel(connectIdsAll); - } + if (pubExisted || subExisted) { + connectIdsAll.add(connectId); + } + }); + cancel(connectIdsAll); + } - protected boolean isFireSubscriberPushEmptyTask (String dataId){ - return dataIdMatchStrategy.match(dataId, () -> sessionServerConfig.getBlacklistSubDataIdRegex()); - } + protected boolean isFireSubscriberPushEmptyTask(String dataId) { + return dataIdMatchStrategy.match(dataId, () -> sessionServerConfig.getBlacklistSubDataIdRegex()); + } - private void fireSubscriberPushEmptyTask (Subscriber subscriber){ - //trigger empty data push - TaskEvent taskEvent = new TaskEvent(subscriber, - TaskEvent.TaskType.SUBSCRIBER_PUSH_EMPTY_TASK); - TASK_LOGGER.info("send " + taskEvent.getTaskType() + " taskEvent:{}", taskEvent); - getTaskListenerManager().sendTaskEvent(taskEvent); - } + private void fireSubscriberPushEmptyTask(Subscriber subscriber) { + //trigger empty data push + TaskEvent taskEvent = new TaskEvent(subscriber, + TaskEvent.TaskType.SUBSCRIBER_PUSH_EMPTY_TASK); + TASK_LOGGER.info("send " + taskEvent.getTaskType() + " taskEvent:{}", taskEvent); + getTaskListenerManager().sendTaskEvent(taskEvent); + } - public void cleanClientConnect () { - - Set connectIndexes = new HashSet<>(); - Set pubIndexes = sessionDataStore.getConnectIds(); - Set subIndexes = sessionInterests.getConnectIds(); - Set watchIndexes = sessionWatchers.getConnectIds(); - connectIndexes.addAll(pubIndexes); - connectIndexes.addAll(subIndexes); - connectIndexes.addAll(watchIndexes); - - Server sessionServer = boltExchange.getServer(sessionServerConfig.getServerPort()); - - List connectIds = new ArrayList<>(); - for (ConnectId connectId : connectIndexes) { - Channel channel = sessionServer.getChannel(new URL(connectId.getClientHostAddress(), - connectId.getClientPort())); - if (channel == null) { - connectIds.add(connectId); - LOGGER.warn("Client connect has not existed!it must be remove!connectId:{}", - connectId); - } - } - if (!connectIds.isEmpty()) { - cancel(connectIds); + public void cleanClientConnect() { + + Set connectIndexes = new HashSet<>(); + Set pubIndexes = sessionDataStore.getConnectIds(); + Set subIndexes = sessionInterests.getConnectIds(); + Set watchIndexes = sessionWatchers.getConnectIds(); + connectIndexes.addAll(pubIndexes); + connectIndexes.addAll(subIndexes); + connectIndexes.addAll(watchIndexes); + + Server sessionServer = boltExchange.getServer(sessionServerConfig.getServerPort()); + + List connectIds = new ArrayList<>(); + for (ConnectId connectId : connectIndexes) { + Channel channel = sessionServer.getChannel(new URL(connectId.getClientHostAddress(), + connectId.getClientPort())); + if (channel == null) { + connectIds.add(connectId); + LOGGER.warn("Client connect has not existed!it must be remove!connectId:{}", + connectId); } } - - /** - * Getter method for property sessionInterests. - * - * @return property value of sessionInterests - */ - protected Interests getSessionInterests () { - return sessionInterests; + if (!connectIds.isEmpty()) { + cancel(connectIds); } + } - /** - * Getter method for property sessionDataStore. - * - * @return property value of sessionDataStore - */ - protected DataStore getSessionDataStore () { - return sessionDataStore; - } + /** + * Getter method for property sessionInterests. + * + * @return property value of sessionInterests + */ + protected Interests getSessionInterests() { + return sessionInterests; + } - /** - * Getter method for property taskListenerManager. - * - * @return property value of taskListenerManager - */ - protected TaskListenerManager getTaskListenerManager () { - return taskListenerManager; - } - } \ No newline at end of file + /** + * Getter method for property sessionDataStore. + * + * @return property value of sessionDataStore + */ + protected DataStore getSessionDataStore() { + return sessionDataStore; + } + + /** + * Getter method for property taskListenerManager. + * + * @return property value of taskListenerManager + */ + protected TaskListenerManager getTaskListenerManager() { + return taskListenerManager; + } +} \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/handler/DataChangeRequestHandler.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/handler/DataChangeRequestHandler.java index 395814ee9..b60f2fbad 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/handler/DataChangeRequestHandler.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/handler/DataChangeRequestHandler.java @@ -27,6 +27,7 @@ import com.alipay.sofa.registry.common.model.store.DataInfo; import com.alipay.sofa.registry.server.session.cache.AppRevisionCacheRegistry; import com.alipay.sofa.registry.server.session.cache.SessionDatumCacheDecorator; +import com.alipay.sofa.registry.util.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; import com.alipay.sofa.registry.common.model.Node.NodeType; @@ -140,6 +141,9 @@ public Object doHandle(Channel channel, DataChangeRequest dataChangeRequest) { } private void refreshMeta(Collection revisions) { + if (revisions == null || revisions.isEmpty()) { + return; + } for (String revision : revisions) { appRevisionCacheRegistry.getRevision(revision); } diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchTask.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchTask.java index 44dd904ba..b25ba2d42 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchTask.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchTask.java @@ -26,13 +26,16 @@ import com.alipay.sofa.registry.common.model.store.Subscriber; import com.alipay.sofa.registry.common.model.store.URL; import com.alipay.sofa.registry.core.model.AssembleType; +import com.alipay.sofa.registry.core.model.ReceivedData; import com.alipay.sofa.registry.core.model.ScopeEnum; import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; import com.alipay.sofa.registry.server.session.assemble.SubscriberAssembleStrategy; import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig; import com.alipay.sofa.registry.server.session.cache.AppRevisionCacheRegistry; +import com.alipay.sofa.registry.server.session.cache.DatumKey; import com.alipay.sofa.registry.server.session.cache.SessionDatumCacheDecorator; +import com.alipay.sofa.registry.server.session.converter.ReceivedDataConverter; import com.alipay.sofa.registry.server.session.push.FirePushService; import com.alipay.sofa.registry.server.session.scheduler.ExecutorManager; import com.alipay.sofa.registry.server.session.store.Interests; @@ -43,16 +46,12 @@ import org.springframework.util.CollectionUtils; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; -import java.util.Set; +import java.util.function.Predicate; import java.util.stream.Collectors; /** - * * @author shangyu.wh * @version $Id: DataChangeFetchTask.java, v 0.1 2017-12-13 12:25 shangyu.wh Exp $ */ @@ -157,7 +156,7 @@ private void doExecute(String dataInfoId) { subscriber -> subscriber.getAssembleType() == assembleType) .collect(Collectors.toList()); - if(subscribersSend.isEmpty()){ + if (subscribersSend.isEmpty()) { continue; } Subscriber defaultSubscriber = subscribersSend.stream().findFirst().get(); @@ -256,99 +255,6 @@ private void evictReSubscribers(Collection subscribersPush) { } } - private void fireReceivedDataMultiPushTask(Datum datum, List subscriberRegisterIdList, - Collection subscribers, ScopeEnum scopeEnum, - Subscriber subscriber, PushTaskClosure pushTaskClosure) { - String dataId = datum.getDataId(); - String clientCell = sessionServerConfig.getClientCell(subscriber.getCell()); - Predicate zonePredicate = (zone) -> { - if (!clientCell.equals(zone)) { - if (ScopeEnum.zone == scopeEnum) { - // zone scope subscribe only return zone list - return true; - - } else if (ScopeEnum.dataCenter == scopeEnum || ScopeEnum.global == scopeEnum) { - // disable zone config - return sessionServerConfig.isInvalidForeverZone(zone) && !sessionServerConfig - .isInvalidIgnored(dataId); - } - } - return false; - }; - ReceivedData receivedData = ReceivedDataConverter - .getReceivedDataMulti(datum, scopeEnum, subscriberRegisterIdList, - clientCell, zonePredicate); - - //trigger push to client node - Map parameter = new HashMap<>(); - parameter.put(receivedData, subscriber.getSourceAddress()); - TaskEvent taskEvent = new TaskEvent(parameter, TaskType.RECEIVED_DATA_MULTI_PUSH_TASK); - taskEvent.setTaskClosure(pushTaskClosure); - taskEvent.setAttribute(Constant.PUSH_CLIENT_SUBSCRIBERS, subscribers); - taskLogger.info("send {} taskURL:{},taskScope:{},,taskId={}", taskEvent.getTaskType(), - subscriber.getSourceAddress(), scopeEnum, taskEvent.getTaskId()); - taskListenerManager.sendTaskEvent(taskEvent); - } - - private Map> getCache(ScopeEnum scopeEnum) { - return sessionInterests.querySubscriberIndex(dataChangeRequest.getDataInfoId(), scopeEnum); - } - - private Datum getDatumCache() { - // build key - DatumKey datumKey = new DatumKey(dataChangeRequest.getDataInfoId(), - dataChangeRequest.getDataCenter()); - Key key = new Key(KeyType.OBJ, DatumKey.class.getName(), datumKey); - - // get from cache (it will fetch from backend server) - Value value = null; - try { - value = sessionCacheService.getValue(key); - } catch (CacheAccessException e) { - LOGGER.error("error when access cache: {}", datumKey, e); - } - - return value == null ? null : value.getPayload(); - } - - private void fireUserDataElementPushTask(InetSocketAddress address, Datum datum, - Collection subscribers, - PushTaskClosure pushTaskClosure) { - - TaskEvent taskEvent = new TaskEvent(TaskType.USER_DATA_ELEMENT_PUSH_TASK); - taskEvent.setTaskClosure(pushTaskClosure); - taskEvent.setSendTimeStamp(DatumVersionUtil.getRealTimestamp(datum.getVersion())); - taskEvent.setAttribute(Constant.PUSH_CLIENT_SUBSCRIBERS, subscribers); - taskEvent.setAttribute(Constant.PUSH_CLIENT_DATUM, datum); - taskEvent.setAttribute(Constant.PUSH_CLIENT_URL, new URL(address)); - - int size = datum != null ? datum.publisherSize() : 0; - - taskLogger.info( - "send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={},subSize={},taskId={}", - taskEvent.getTaskType(), address, datum.getDataInfoId(), datum.getDataCenter(), size, - subscribers.size(), taskEvent.getTaskId()); - taskListenerManager.sendTaskEvent(taskEvent); - } - - private void fireUserDataElementMultiPushTask(InetSocketAddress address, Datum datum, - Collection subscribers, - PushTaskClosure pushTaskClosure) { - - TaskEvent taskEvent = new TaskEvent(TaskType.USER_DATA_ELEMENT_MULTI_PUSH_TASK); - taskEvent.setTaskClosure(pushTaskClosure); - taskEvent.setSendTimeStamp(DatumVersionUtil.getRealTimestamp(datum.getVersion())); - taskEvent.setAttribute(Constant.PUSH_CLIENT_SUBSCRIBERS, subscribers); - taskEvent.setAttribute(Constant.PUSH_CLIENT_DATUM, datum); - taskEvent.setAttribute(Constant.PUSH_CLIENT_URL, new URL(address)); - - int size = datum != null ? datum.publisherSize() : 0; - - taskLogger.info( - "send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={},subSize={},taskId={}", - taskEvent.getTaskType(), address, datum.getDataInfoId(), datum.getDataCenter(), size, - subscribers.size(), taskEvent.getTaskId()); - taskListenerManager.sendTaskEvent(taskEvent); private Map> getCache(String dataInfoId, ScopeEnum scopeEnum) { return sessionInterests.querySubscriberIndex(dataInfoId, scopeEnum); @@ -378,7 +284,7 @@ public void setTaskEvent(TaskEvent taskEvent) { /** * Setter method for property dataChangeRequest. * - * @param dataChangeRequest value to be assigned to property dataChangeRequest + * @param dataChangeRequest value to be assigned to property dataChangeRequest */ public void setDataChangeRequest(DataChangeRequest dataChangeRequest) { this.dataChangeRequest = dataChangeRequest; diff --git a/test/src/test/java/com/alipay/sofa/registry/test/sync/SessionNotifyTest.java b/test/src/test/java/com/alipay/sofa/registry/test/sync/SessionNotifyTest.java index 33a8fca8d..52ba434a2 100644 --- a/test/src/test/java/com/alipay/sofa/registry/test/sync/SessionNotifyTest.java +++ b/test/src/test/java/com/alipay/sofa/registry/test/sync/SessionNotifyTest.java @@ -232,7 +232,7 @@ public void doTest() throws Exception { // post sync data request DataChangeRequest request = new DataChangeRequest(DataInfo.toDataInfoId( DATA_ID, DEFAULT_INSTANCE_ID, DEFAULT_GROUP), LOCAL_DATACENTER, - finalI); + finalI, new HashSet<>()); boltChannelMap.forEach((connect,boltChannel)->{