From 39954fb31a5bc4739da673cdcff886ff3224e5db Mon Sep 17 00:00:00 2001 From: "yuzhi.lyz" Date: Fri, 18 Dec 2020 19:04:25 +0800 Subject: [PATCH] What type of PR is this? refactor the session-store --- .aci.yml | 14 +- .master.aci.yml | 3 +- .../sofa/registry/common/model/IPPort.java | 79 ++++ .../registry/common/model/store/BaseInfo.java | 2 + .../sofa/registry/util/ParaCheckUtil.java | 6 + .../session/cache/SubscriberResult.java | 81 ----- .../StopPushProvideDataProcessor.java | 9 +- .../session/registry/SessionRegistry.java | 4 +- .../resource/SessionDigestResource.java | 11 +- .../session/store/AbstractDataManager.java | 163 +++++++++ .../server/session/store/DataManager.java | 32 +- .../server/session/store/DataStore.java | 31 -- .../server/session/store/Interests.java | 38 +- .../session/store/SessionDataStore.java | 229 ++---------- .../session/store/SessionInterests.java | 338 +++++------------- .../server/session/store/SessionWatchers.java | 184 ++-------- .../session/store/SlotSessionDataStore.java | 22 +- .../server/session/store/StoreHelpers.java | 93 +++++ .../server/session/store/Watchers.java | 13 - .../server/session/store/DataCacheTest.java | 19 +- .../shared/remoting/ServerSideExchanger.java | 23 +- 21 files changed, 560 insertions(+), 834 deletions(-) create mode 100644 server/common/model/src/main/java/com/alipay/sofa/registry/common/model/IPPort.java delete mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/SubscriberResult.java create mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/AbstractDataManager.java create mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/StoreHelpers.java diff --git a/.aci.yml b/.aci.yml index d7f22ad03..23c2d1f8a 100644 --- a/.aci.yml +++ b/.aci.yml @@ -49,16 +49,4 @@ STC扫描: - export PATH=$PATH:/opt/taobao/java/bin:/opt/taobao/maven/bin - java -version - mvn -version - - mvn clean package -Dmaven.test.skip=true -Pdev - -发布JAR包: - stage: 发布JAR包 - plugin: ANT-BUILD - pluginConfig: - image: reg.docker.alibaba-inc.com/antb/jarbuild:0.0.1 - script: - - java -version - - mvn -version - - sed -i '//,/<\/mirrors>/d' /opt/taobao/maven_settings/settings-release.xml - - sed -i '//,/<\/profiles>/d' /opt/taobao/maven_settings/settings-release.xml - - mvn clean deploy -Dmaven.test.skip=true -s /opt/taobao/maven_settings/settings-release.xml -Pdev -Pantcode-release \ No newline at end of file + - mvn clean package -Dmaven.test.skip=true -Pdev \ No newline at end of file diff --git a/.master.aci.yml b/.master.aci.yml index 10d391c7a..1c0740781 100644 --- a/.master.aci.yml +++ b/.master.aci.yml @@ -28,9 +28,10 @@ STC扫描: 集成测试: stage: 测试 plugin: LINKQ-IT # java集成测试插件 + checkRule: + - passRate = 100 tools: jdk: '1.8' - maven: 3.0.3 parameters: encoding: UTF-8 # 编码设置 pluginConfig: diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/IPPort.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/IPPort.java new file mode 100644 index 000000000..19ff4b84e --- /dev/null +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/IPPort.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.common.model; + +import com.alipay.sofa.registry.util.ParaCheckUtil; + +import java.io.Serializable; +import java.util.Objects; + +/** + * + * @author yuzhi.lyz + * @version v 0.1 2020-12-21 11:14 yuzhi.lyz Exp $ + */ +public final class IPPort implements Serializable { + private final String ip; + private final int port; + + private IPPort(String ip, int port) { + this.ip = ip; + this.port = port; + } + + /** + * Getter method for property ip. + * @return property value of ip + */ + public String getIp() { + return ip; + } + + /** + * Getter method for property port. + * @return property value of port + */ + public int getPort() { + return port; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (!(o instanceof IPPort)) + return false; + IPPort ipPort = (IPPort) o; + return port == ipPort.port && Objects.equals(ip, ipPort.ip); + } + + @Override + public int hashCode() { + return Objects.hash(ip, port); + } + + @Override + public String toString() { + return ip + ":" + port; + } + + public static IPPort of(String ip, int port) { + ParaCheckUtil.checkNotBlank(ip, "ip"); + ParaCheckUtil.checkIsPositive(port, "port"); + return new IPPort(ip, port); + } +} diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/BaseInfo.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/BaseInfo.java index df97df025..6918085b8 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/BaseInfo.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/BaseInfo.java @@ -19,9 +19,11 @@ import java.io.Serializable; import java.util.HashMap; import java.util.Map; +import java.util.Set; import com.alipay.sofa.registry.common.model.ConnectId; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.collect.Sets; /** * diff --git a/server/common/util/src/main/java/com/alipay/sofa/registry/util/ParaCheckUtil.java b/server/common/util/src/main/java/com/alipay/sofa/registry/util/ParaCheckUtil.java index 1bac303b3..0c18e8e1f 100644 --- a/server/common/util/src/main/java/com/alipay/sofa/registry/util/ParaCheckUtil.java +++ b/server/common/util/src/main/java/com/alipay/sofa/registry/util/ParaCheckUtil.java @@ -68,6 +68,12 @@ public static void checkNonNegative(long v, String paraName) { } } + public static void checkIsPositive(long v, String paraName) { + if (v <= 0) { + throw new RuntimeException(String.format("%s is require positive, %d", paraName, v)); + } + } + public static void checkContains(Set sets, Object param, String paraName) throws RuntimeException { if (!sets.contains(param)) { diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/SubscriberResult.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/SubscriberResult.java deleted file mode 100644 index cdea4d663..000000000 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/SubscriberResult.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alipay.sofa.registry.server.session.cache; - -import com.alipay.sofa.registry.core.model.ScopeEnum; - -/** - * - * @author shangyu.wh - * @version $Id: SubscriberResult.java, v 0.1 2017-12-06 17:16 shangyu.wh Exp $ - */ -public class SubscriberResult implements EntityType { - - private final String dataInfoId; - - private final ScopeEnum scope; - - public SubscriberResult(String dataInfoId, ScopeEnum scope) { - this.dataInfoId = dataInfoId; - this.scope = scope; - } - - @Override - public String getUniqueKey() { - StringBuilder sb = new StringBuilder(dataInfoId); - sb.append(COMMA).append(scope); - return sb.toString(); - } - - @Override - public int hashCode() { - String hashKey = getUniqueKey(); - return hashKey.hashCode(); - } - - @Override - public boolean equals(Object other) { - if (other instanceof SubscriberResult) { - return getUniqueKey().equals(((SubscriberResult) other).getUniqueKey()); - } else { - return false; - } - } - - /** - * Getter method for property dataInfoId. - * - * @return property value of dataInfoId - */ - public String getDataInfoId() { - return dataInfoId; - } - - /** - * Getter method for property scope. - * - * @return property value of scope - */ - public ScopeEnum getScope() { - return scope; - } - - @Override - public String toString() { - return "SubscriberResult{" + "dataInfoId='" + dataInfoId + '\'' + ", scope=" + scope + '}'; - } -} \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/provideData/processor/StopPushProvideDataProcessor.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/provideData/processor/StopPushProvideDataProcessor.java index bb9c0bbbc..cdca185dd 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/provideData/processor/StopPushProvideDataProcessor.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/provideData/processor/StopPushProvideDataProcessor.java @@ -30,6 +30,7 @@ import com.alipay.sofa.registry.task.listener.TaskEvent; import com.alipay.sofa.registry.task.listener.TaskEvent.TaskType; import com.alipay.sofa.registry.task.listener.TaskListenerManager; +import com.alipay.sofa.registry.util.ConcurrentUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.CollectionUtils; @@ -103,12 +104,8 @@ private void fireReSubscriber() { LOGGER.error("Open push switch first fetch task execute error", e); } - try { - //wait 1 MINUTES for dataFetch task evict duplicate subscriber push - TimeUnit.MINUTES.sleep(1); - } catch (InterruptedException e) { - LOGGER.error("Wait for dataFetch Task Interrupted!"); - } + //wait 1 MINUTES for dataFetch task evict duplicate subscriber push + ConcurrentUtils.sleepUninterruptibly(1, TimeUnit.MINUTES); //fetch task process 1 minutes,can schedule execute fetch task sessionServerConfig.setBeginDataFetchTask(true); diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/registry/SessionRegistry.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/registry/SessionRegistry.java index 413d47fbf..9d7cbcd63 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/registry/SessionRegistry.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/registry/SessionRegistry.java @@ -275,7 +275,7 @@ public void waitingUnthrowable() { } private void fetchVersions() { - Collection checkDataInfoIds = sessionInterests.getInterestDataInfoIds(); + Collection checkDataInfoIds = sessionInterests.getDataInfoIds(); Map/*dataInfoIds*/> map = calculateDataNode(checkDataInfoIds); for (Map.Entry> e : map.entrySet()) { final int slotId = e.getKey(); @@ -318,7 +318,7 @@ public String toString() { @Override public void fetchChangDataProcess() { - Collection checkDataInfoIds = sessionInterests.getInterestDataInfoIds(); + Collection checkDataInfoIds = sessionInterests.getDataInfoIds(); Map/*dataInfoIds*/> map = calculateDataNode(checkDataInfoIds); map.forEach((slotId, dataInfoIds) -> { diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/SessionDigestResource.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/SessionDigestResource.java index 744567c73..15573ab32 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/SessionDigestResource.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/SessionDigestResource.java @@ -111,10 +111,9 @@ public Map> getSessionDataByDataInfoId(@ @PathParam("type") String type) { Map> serverList = new HashMap<>(); if (dataInfoId != null) { - Collection publishers = sessionDataStore - .getStoreDataByDataInfoId(dataInfoId); - Collection subscribers = sessionInterests.getInterests(dataInfoId); - Collection watchers = sessionWatchers.getWatchers(dataInfoId); + Collection publishers = sessionDataStore.getDatas(dataInfoId); + Collection subscribers = sessionInterests.getDatas(dataInfoId); + Collection watchers = sessionWatchers.getDatas(dataInfoId); fillServerList(type, serverList, publishers, subscribers, watchers); } @@ -189,8 +188,8 @@ public Map getPushSwitch() { @Produces(MediaType.APPLICATION_JSON) public Collection getDataInfoIdList() { Collection ret = new HashSet<>(); - ret.addAll(sessionInterests.getInterestDataInfoIds()); - ret.addAll(sessionDataStore.getStoreDataInfoIds()); + ret.addAll(sessionInterests.getDataInfoIds()); + ret.addAll(sessionDataStore.getDataInfoIds()); return ret; } diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/AbstractDataManager.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/AbstractDataManager.java new file mode 100644 index 000000000..f2c4455fe --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/AbstractDataManager.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.store; + +import com.alipay.sofa.registry.common.model.ConnectId; +import com.alipay.sofa.registry.common.model.store.BaseInfo; +import com.alipay.sofa.registry.log.Logger; +import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig; +import com.alipay.sofa.registry.util.ParaCheckUtil; +import com.google.common.collect.Lists; +import org.apache.commons.collections.MapUtils; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +/** + * + * @author yuzhi.lyz + * @version v 0.1 2020-12-18 17:18 yuzhi.lyz Exp $ + */ +public abstract class AbstractDataManager implements + DataManager { + private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + protected final Lock read = readWriteLock + .readLock(); + protected final Lock write = readWriteLock + .writeLock(); + + protected final ConcurrentHashMap> stores = new ConcurrentHashMap<>(); + protected final Logger logger; + + @Autowired + protected SessionServerConfig sessionServerConfig; + + AbstractDataManager(Logger logger) { + this.logger = logger; + } + + @Override + public boolean deleteById(String registerId, String dataInfoId) { + Map datas = stores.get(dataInfoId); + if (datas == null) { + logger.error("Delete failed because is not registered for dataInfoId: {}", dataInfoId); + return false; + } + boolean modified = false; + write.lock(); + try { + T dataToDelete = datas.remove(registerId); + if (dataToDelete != null) { + modified = true; + postDelete(dataToDelete); + } + } finally { + write.unlock(); + } + if (modified) { + logger.error("Delete failed because is not registered for registerId: {}", registerId); + } + return modified; + } + + protected abstract void postDelete(T data); + + @Override + public boolean deleteByConnectId(ConnectId connectId) { + boolean modified = false; + write.lock(); + try { + for (Map map : stores.values()) { + for (Iterator it = map.values().iterator(); it.hasNext();) { + T data = (T) it.next(); + if (connectId.equals(data.connectId())) { + modified = true; + it.remove(); + postDelete(data); + } + } + } + } finally { + write.unlock(); + } + return modified; + } + + @Override + public Collection getDatas(String dataInfoId) { + ParaCheckUtil.checkNotBlank(dataInfoId, "dataInfoId"); + Map dataMap = stores.get(dataInfoId); + if (MapUtils.isEmpty(dataMap)) { + return Collections.emptyList(); + } + return Lists.newArrayList(dataMap.values()); + } + + @Override + public Map> getDatas() { + return StoreHelpers.copyMap((Map)stores); + } + + @Override + public Map queryByConnectId(ConnectId connectId) { + return StoreHelpers.getByConnectId(connectId, stores); + } + + @Override + public T queryById(String registerId, String dataInfoId) { + final Map datas = stores.get(dataInfoId); + return datas == null ? null : datas.get(registerId); + } + + @Override + public long count() { + return StoreHelpers.count(stores); + } + + @Override + public Set getConnectIds() { + return StoreHelpers.collectConnectIds(stores); + } + + @Override + public Set collectProcessIds() { + return StoreHelpers.collectProcessIds(stores); + } + + @Override + public Collection getDataInfoIds() { + return stores.entrySet().stream().filter(e -> !(e.getValue().isEmpty())).map(e -> e.getKey()) + .collect(Collectors.toSet()); + } + + public SessionServerConfig getSessionServerConfig() { + return sessionServerConfig; + } + + /** + * Setter method for property sessionServerConfig. + * + * @param sessionServerConfig value to be assigned to property sessionServerConfig + */ + public void setSessionServerConfig(SessionServerConfig sessionServerConfig) { + this.sessionServerConfig = sessionServerConfig; + } +} diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/DataManager.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/DataManager.java index 1444a1917..d5194adf0 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/DataManager.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/DataManager.java @@ -18,7 +18,9 @@ import com.alipay.sofa.registry.common.model.ConnectId; +import java.util.Collection; import java.util.Map; +import java.util.Set; /** * Session Data store manager,according base data function @@ -41,15 +43,7 @@ public interface DataManager { * * @param data */ - void add(DATA data); - - /** - * remove single data by register id - * @param registerId - * @param dataInfoId - * @return - */ - boolean deleteById(ID registerId, DATAINFOID dataInfoId); + boolean add(DATA data); /** * query data by client node connectId @@ -66,9 +60,29 @@ public interface DataManager { */ boolean deleteByConnectId(ConnectId connectId); + DATA queryById(ID registerId, DATAINFOID dataInfoId); + + /** + * remove single data by register id + * @param registerId + * @param dataInfoId + * @return + */ + boolean deleteById(ID registerId, DATAINFOID dataInfoId); + /** * count pub and sub number * @return */ long count(); + + Set getConnectIds(); + + Collection getDatas(DATAINFOID dataInfoId); + + Map> getDatas(); + + Set collectProcessIds(); + + Collection getDataInfoIds(); } \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/DataStore.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/DataStore.java index 1a0aaa229..364730330 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/DataStore.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/DataStore.java @@ -16,12 +16,9 @@ */ package com.alipay.sofa.registry.server.session.store; -import com.alipay.sofa.registry.common.model.ConnectId; import com.alipay.sofa.registry.common.model.store.Publisher; -import java.util.Collection; import java.util.Map; -import java.util.Set; /** * @@ -30,33 +27,5 @@ */ public interface DataStore extends DataManager { - /** - * get all publishers by dataInfoId - * @param dataInfoId - * @return - */ - Collection getStoreDataByDataInfoId(String dataInfoId); - - /*** - * get Publiser by registerId and dataInfoId - * @param registerId - * @param dataInfoId - * @return - */ - Publisher queryById(String registerId, String dataInfoId); - - /** - * get all publisher dataInfoIds - * - * @return - */ - Collection getStoreDataInfoIds(); - - Set getConnectIds(); - - Set getPublisherProcessIds(); - - Map> getDataInfoIdPublishers(); - Map> getDataInfoIdPublishers(int slotId); } \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/Interests.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/Interests.java index b379a6842..0ce696568 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/Interests.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/Interests.java @@ -16,15 +16,12 @@ */ package com.alipay.sofa.registry.server.session.store; +import com.alipay.sofa.registry.common.model.store.Subscriber; +import com.alipay.sofa.registry.core.model.ScopeEnum; + import java.net.InetSocketAddress; -import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Set; - -import com.alipay.sofa.registry.common.model.ConnectId; -import com.alipay.sofa.registry.common.model.store.Subscriber; -import com.alipay.sofa.registry.core.model.ScopeEnum; /** * @@ -33,14 +30,6 @@ */ public interface Interests extends DataManager { - /** - * query subscribers by dataInfoID - * - * @param dataInfoId - * @return - */ - Collection getInterests(String dataInfoId); - /** * check subscribers interest dataInfoId version,very dataCenter dataInfoId version different * if return false @@ -74,14 +63,7 @@ public interface Interests extends DataManager { boolean checkAndUpdateInterestVersionZero(String dataCenter, String dataInfoId); /** - * get all subscriber dataInfoIds - * - * @return - */ - Collection getInterestDataInfoIds(); - - /** - * get subscribers whith specify dataInfo and scope,and group by source InetSocketAddress + * get subscribers whith specify dataInfo and scope,and group by source address * @param dataInfoId * @param scope * @return @@ -89,17 +71,5 @@ public interface Interests extends DataManager { Map> querySubscriberIndex(String dataInfoId, ScopeEnum scope); - /** - * get subscriber by dataInfoId and registerId - * @param registerId - * @param dataInfoId - * @return - */ - Subscriber queryById(String registerId, String dataInfoId); - - Set getConnectIds(); - List getDataCenters(); - - Set getSubscriberProcessIds(); } \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionDataStore.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionDataStore.java index 47b454fa6..ed688dad7 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionDataStore.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionDataStore.java @@ -16,237 +16,62 @@ */ package com.alipay.sofa.registry.server.session.store; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import com.alipay.sofa.registry.common.model.ConnectId; import com.alipay.sofa.registry.common.model.PublisherInternUtil; import com.alipay.sofa.registry.common.model.store.Publisher; import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; +import com.alipay.sofa.registry.util.ParaCheckUtil; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; + +import java.util.Map; /** * @author shangyu.wh * @version $Id: SessionDataStore.java, v 0.1 2017-12-01 18:14 shangyu.wh Exp $ */ -public class SessionDataStore implements DataStore { - - private static final Logger LOGGER = LoggerFactory - .getLogger(SessionDataStore.class); - private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - private final Lock write = readWriteLock - .writeLock(); +public class SessionDataStore extends AbstractDataManager implements DataStore { - /** - * publisher store - */ - private Map> registry = new ConcurrentHashMap<>(); + private static final Logger LOGGER = LoggerFactory.getLogger(SessionDataStore.class); - /*** index */ - private Map> connectIndex = new ConcurrentHashMap<>(); + public SessionDataStore() { + super(LOGGER); + } @Override - public void add(Publisher publisher) { + public boolean add(Publisher publisher) { + ParaCheckUtil.checkNotNull(publisher.getVersion(), "publisher.version"); + ParaCheckUtil.checkNotNull(publisher.getRegisterTimestamp(), "publisher.registerTimestamp"); + PublisherInternUtil.internPublisher(publisher); + Map publishers = stores.computeIfAbsent(publisher.getDataInfoId(), k -> Maps + .newConcurrentMap()); + boolean toAdd = true; + Publisher existingPublisher = null; write.lock(); try { - Map publishers = registry.computeIfAbsent(publisher.getDataInfoId(), k-> Maps - .newConcurrentMap()); - - Publisher existingPublisher = publishers.get(publisher.getRegisterId()); - + existingPublisher = publishers.get(publisher.getRegisterId()); if (existingPublisher != null) { - if (existingPublisher.getVersion() != null) { - long oldVersion = existingPublisher.getVersion(); - Long newVersion = publisher.getVersion(); - if (newVersion == null) { - LOGGER.error("There is publisher input version can't be null!"); - return; - } else if (oldVersion > newVersion) { - LOGGER - .warn( - "There is publisher already exists,but old version {} higher than input {},it will not be overwrite! {}", - oldVersion, newVersion, existingPublisher); - return; - } else if (oldVersion == newVersion) { - Long newTime = publisher.getRegisterTimestamp(); - long oldTime = existingPublisher.getRegisterTimestamp(); - if (newTime == null) { - LOGGER - .error("There is publisher input Register Timestamp can not be null!"); - return; - } - if (oldTime > newTime) { - LOGGER - .warn( - "There is publisher already exists,but old timestamp {} higher than input {},it will not be overwrite! {}", - oldTime, newTime, existingPublisher); - return; - } - } + if (!existingPublisher.publisherVersion().orderThan(publisher.publisherVersion())) { + toAdd = false; } - LOGGER - .warn( - "There is publisher already exists,version:{},it will be overwrite!Input version:{},info:{}", - existingPublisher.getVersion(), publisher.getVersion(), existingPublisher); - removeFromConnectIndex(existingPublisher); } - publishers.put(publisher.getRegisterId(), publisher); - addToConnectIndex(publisher); - - } finally { - write.unlock(); - } - } - - @Override - public boolean deleteById(String registerId, String dataInfoId) { - - write.lock(); - try { - Map publishers = registry.get(dataInfoId); - - if (publishers == null) { - LOGGER.warn("Delete failed because publisher is not registered for dataInfoId: {}", - dataInfoId); - return false; - } else { - Publisher publisherTodelete = publishers.remove(registerId); - - if (publisherTodelete == null) { - LOGGER.warn( - "Delete failed because publisher is not registered for registerId: {}", - registerId); - return false; - - } else { - removeFromConnectIndex(publisherTodelete); - return true; - } + if (toAdd) { + publishers.put(publisher.getRegisterId(), publisher); } } finally { write.unlock(); } - } - - @Override - public Map queryByConnectId(ConnectId connectId) { - return connectIndex.get(connectId); - } - - @Override - public boolean deleteByConnectId(ConnectId connectId) { - write.lock(); - try { - for (Map map : registry.values()) { - for (Iterator it = map.values().iterator(); it.hasNext();) { - Publisher publisher = (Publisher) it.next(); - if (publisher != null && connectId.equals(publisher.connectId())) { - it.remove(); - LOGGER.info("remove publisher by connectId={}, {}", connectId, publisher); - } - } - } - connectIndex.remove(connectId); - return true; - } finally { - write.unlock(); - } - } - - @Override - public Collection getStoreDataByDataInfoId(String dataInfoId) { - - Map publishers = registry.get(dataInfoId); - - if (publishers == null) { - LOGGER.info("There is not registered publisher for dataInfoId: {}", dataInfoId); - return null; - } else { - return publishers.values(); - } - } - - @Override - public Publisher queryById(String registerId, String dataInfoId) { - - Map publishers = registry.get(dataInfoId); - - if (publishers == null) { - LOGGER.warn("Publisher is not registered for dataInfoId: {}", dataInfoId); - return null; + // log without lock + if (existingPublisher != null) { + LOGGER.warn("exist publisher, added={}, existVer={}, inputVer={}, {}", toAdd, + existingPublisher.publisherVersion(), publisher.publisherVersion(), existingPublisher); } - return publishers.get(registerId); - } + return toAdd; - @Override - public Collection getStoreDataInfoIds() { - return registry.keySet(); } - @Override - public long count() { - long count = 0; - for (Map map : registry.values()) { - count += map.size(); - } - return count; - } - - private void addToConnectIndex(Publisher publisher) { - ConnectId connectId = publisher.connectId(); - Map publisherMap = connectIndex.get(connectId); - if (publisherMap == null) { - Map newPublisherMap = new ConcurrentHashMap<>(); - publisherMap = connectIndex.putIfAbsent(connectId, newPublisherMap); - if (publisherMap == null) { - publisherMap = newPublisherMap; - } - } - - publisherMap.put(publisher.getRegisterId(), publisher); - } - - private void removeFromConnectIndex(Publisher publisher) { - ConnectId connectId = publisher.connectId(); - Map publisherMap = connectIndex.get(connectId); - if (publisherMap != null) { - publisherMap.remove(publisher.getRegisterId()); - } else { - LOGGER.warn("ConnectId {} not existed in Index to remove!", connectId); - } - } - - @Override - public Set getConnectIds() { - return Sets.newHashSet(connectIndex.keySet()); - } - - @Override - public Set getPublisherProcessIds() { - HashSet processIds = Sets.newHashSet(); - for (Map publishers : getDataInfoIdPublishers().values()) { - for (Publisher publisher : publishers.values()) { - if (publisher.getProcessId() != null) { - processIds.add(publisher.getProcessId()); - } - } - } - return processIds; - } - - @Override - public Map> getDataInfoIdPublishers() { - Map> ret = new HashMap<>(registry.size()); - for (Map.Entry> e : registry.entrySet()) { - ret.put(e.getKey(), new HashMap<>(e.getValue())); - } - return ret; + protected void postDelete(Publisher data) { } @Override diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionInterests.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionInterests.java index cf9c1f162..4e9171892 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionInterests.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionInterests.java @@ -16,53 +16,33 @@ */ package com.alipay.sofa.registry.server.session.store; -import com.alipay.sofa.registry.common.model.ConnectId; import com.alipay.sofa.registry.common.model.store.Subscriber; import com.alipay.sofa.registry.common.model.store.WordCache; import com.alipay.sofa.registry.core.model.ScopeEnum; import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; -import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig; -import com.alipay.sofa.registry.server.session.cache.SubscriberResult; import com.alipay.sofa.registry.util.VersionsMapUtils; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.springframework.beans.factory.annotation.Autowired; +import org.apache.commons.collections.MapUtils; import java.net.InetSocketAddress; -import java.util.*; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; /** * @author shangyu.wh * @version $Id: AbstractSessionInterests.java, v 0.1 2017-11-30 20:42 shangyu.wh Exp $ */ -public class SessionInterests implements Interests, ReSubscribers { +public class SessionInterests extends AbstractDataManager implements Interests, + ReSubscribers { private static final Logger LOGGER = LoggerFactory .getLogger(SessionInterests.class); - private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - private final Lock read = readWriteLock - .readLock(); - private final Lock write = readWriteLock - .writeLock(); - - @Autowired - private SessionServerConfig sessionServerConfig; - - /** - * store all register subscriber - */ - private final ConcurrentHashMap> interests = new ConcurrentHashMap<>(); - - private final Map> connectIndex = new ConcurrentHashMap<>(); - private final Map>> resultIndex = new ConcurrentHashMap<>(); /** @@ -72,145 +52,63 @@ public class SessionInterests implements Interests, ReSubscribers { private final Map> stopPushInterests = new ConcurrentHashMap<>(); + public SessionInterests() { + super(LOGGER); + } + @Override - public void add(Subscriber subscriber) { + public boolean add(Subscriber subscriber) { Subscriber.internSubscriber(subscriber); + Map subscribers = stores.computeIfAbsent(subscriber.getDataInfoId(), + k -> Maps.newConcurrentMap()); + + Subscriber existingSubscriber = null; write.lock(); try { - Map subscribers = interests.computeIfAbsent(subscriber.getDataInfoId(), - k->Maps.newConcurrentMap()); - - Subscriber existingSubscriber = subscribers.get(subscriber.getRegisterId()); + existingSubscriber = subscribers.get(subscriber.getRegisterId()); if (existingSubscriber != null) { - LOGGER.warn("There is subscriber already exists,it will be overwrite! {}", - existingSubscriber); if (sessionServerConfig.isStopPushSwitch()) { deleteReSubscriber(existingSubscriber); } - invalidateIndex(existingSubscriber); + invalidateResultIndex(existingSubscriber); } subscribers.put(subscriber.getRegisterId(), subscriber); addReSubscriber(subscriber); - - addIndex(subscriber); - + addResultIndex(subscriber); } finally { write.unlock(); } - - } - - @Override - public boolean deleteById(String registerId, String dataInfoId) { - write.lock(); - try { - - Map subscribers = interests.get(dataInfoId); - - if (subscribers == null) { - LOGGER.error( - "Delete failed because subscriber is not registered for dataInfoId: {}", - dataInfoId); - return false; - } else { - Subscriber subscriberTodelete = subscribers.remove(registerId); - - if (subscriberTodelete == null) { - LOGGER.error( - "Delete failed because subscriber is not registered for registerId: {}", - registerId); - return false; - } else { - if (sessionServerConfig.isStopPushSwitch()) { - deleteReSubscriber(subscriberTodelete); - } - removeIndex(subscriberTodelete); - - return true; - } - } - } finally { - write.unlock(); - } - - } - - @Override - public boolean deleteByConnectId(ConnectId connectId) { - write.lock(); - try { - for (Map map : interests.values()) { - for (Iterator it = map.values().iterator(); it.hasNext();) { - Subscriber subscriber = (Subscriber) it.next(); - if (connectId.equals(subscriber.connectId())) { - it.remove(); - if (sessionServerConfig.isStopPushSwitch()) { - deleteReSubscriber(subscriber); - } - - invalidateIndex(subscriber); - } - } - } - //force remove connectId - invalidateConnectIndex(connectId); - return true; - } catch (Exception e) { - LOGGER.error("Delete subscriber by connectId {} error!", connectId, e); - return false; - } finally { - write.unlock(); - } - } - - @Override - public long count() { - AtomicLong count = new AtomicLong(0); - for (Map map : interests.values()) { - count.addAndGet(map.size()); - } - return count.get(); - } - - @Override - public Map queryByConnectId(ConnectId connectId) { - return connectIndex.get(connectId); - } - - public Subscriber queryById(String registerId, String dataInfoId) { - Map subscribers = interests.get(dataInfoId); - - if (subscribers == null) { - return null; + // log without lock + if (existingSubscriber != null) { + LOGGER.warn("There is subscriber already exists,it will be overwrite! {}", + existingSubscriber); } - return subscribers.get(registerId); + return true; } @Override - public Collection getInterests(String dataInfoId) { - Map subscribers = interests.get(dataInfoId); - if (subscribers == null) { - LOGGER.info("There is not registered subscriber for : {}", dataInfoId); - return Collections.emptyList(); + protected void postDelete(Subscriber data) { + if (sessionServerConfig.isStopPushSwitch()) { + deleteReSubscriber(data); } - return subscribers.values(); + removeResultIndex(data); } @Override public boolean checkInterestVersions(String dataCenter, String dataInfoId, Long version) { - Map subscribers = interests.get(dataInfoId); + Map subscribers = stores.get(dataInfoId); - if (subscribers == null || subscribers.isEmpty()) { + if (MapUtils.isEmpty(subscribers)) { return false; } Map dataInfoVersions = interestVersions - .computeIfAbsent(dataCenter, k->Maps.newConcurrentMap()); + .computeIfAbsent(dataCenter, k -> Maps.newConcurrentMap()); Long oldValue = dataInfoVersions.get(dataInfoId); @@ -220,21 +118,18 @@ public boolean checkInterestVersions(String dataCenter, String dataInfoId, Long @Override public boolean checkAndUpdateInterestVersions(String dataCenter, String dataInfoId, Long version) { - read.lock(); - try { - dataInfoId = WordCache.getInstance().getWordCache(dataInfoId); + dataInfoId = WordCache.getInstance().getWordCache(dataInfoId); + final Map subscribers = stores.get(dataInfoId); - Map subscribers = interests.get(dataInfoId); - - if (subscribers == null || subscribers.isEmpty()) { - LOGGER.info( - "There are not Subscriber Existed! Who are interest with dataInfoId {} !", - dataInfoId); - return false; - } + if (MapUtils.isEmpty(subscribers)) { + LOGGER.info("There is no Subscriber Existed, dataInfoId={}", dataInfoId); + return false; + } + Map dataInfoVersions = interestVersions + .computeIfAbsent(dataCenter, k -> Maps.newConcurrentMap()); - Map dataInfoVersions = interestVersions - .computeIfAbsent(dataCenter, k -> Maps.newConcurrentMap()); + read.lock(); + try { //set zero if (version.longValue() == 0l) { return dataInfoVersions.put(dataInfoId, version) != null; @@ -245,84 +140,23 @@ public boolean checkAndUpdateInterestVersions(String dataCenter, String dataInfo } } + @Override public boolean checkAndUpdateInterestVersionZero(String dataCenter, String dataInfoId) { return checkAndUpdateInterestVersions(dataCenter, dataInfoId, 0l); } - @Override - public Collection getInterestDataInfoIds() { - return interests.entrySet().stream().filter(e -> !(e.getValue().isEmpty())).map(e -> e.getKey()) - .collect(Collectors.toSet()); - } - - private void addIndex(Subscriber subscriber) { - addConnectIndex(subscriber); - addResultIndex(subscriber); - } - - private void removeIndex(Subscriber subscriber) { - removeConnectIndex(subscriber); - removeResultIndex(subscriber); - } - - private void invalidateIndex(Subscriber subscriber) { - removeConnectIndex(subscriber); - invalidateResultIndex(subscriber); - } - - private void addConnectIndex(Subscriber subscriber) { - ConnectId connectId = subscriber.connectId(); - - Map subscriberMap = connectIndex.get(connectId); - if (subscriberMap == null) { - Map newSubscriberMap = new ConcurrentHashMap<>(); - subscriberMap = connectIndex.putIfAbsent(connectId, newSubscriberMap); - if (subscriberMap == null) { - subscriberMap = newSubscriberMap; - } - } - - subscriberMap.put(subscriber.getRegisterId(), subscriber); - } - private void addResultIndex(Subscriber subscriber) { - SubscriberResult subscriberResult = new SubscriberResult(subscriber.getDataInfoId(), - subscriber.getScope()); - Map> mapSub = resultIndex.get(subscriberResult); - if (mapSub == null) { - Map> newMap = new ConcurrentHashMap<>(); - mapSub = resultIndex.putIfAbsent(subscriberResult, newMap); - if (mapSub == null) { - mapSub = newMap; - } - } + subscriber.getScope()); + final Map> mapSub = resultIndex + .computeIfAbsent(subscriberResult, k -> Maps.newConcurrentMap()); - InetSocketAddress address = new InetSocketAddress(subscriber.getSourceAddress() - .getIpAddress(), subscriber.getSourceAddress().getPort()); - - Map subscribers = mapSub.get(address); - if (subscribers == null) { - Map newSubs = new ConcurrentHashMap<>(); - subscribers = mapSub.putIfAbsent(address, newSubs); - if (subscribers == null) { - subscribers = newSubs; - } - } + InetSocketAddress address = new InetSocketAddress(subscriber.getSourceAddress().getIpAddress(), subscriber.getSourceAddress().getPort()); + Map subscribers = mapSub.computeIfAbsent(address, k -> Maps.newConcurrentMap()); subscribers.put(subscriber.getRegisterId(), subscriber); } - private void removeConnectIndex(Subscriber subscriber) { - ConnectId connectId = subscriber.connectId(); - Map subscriberMap = connectIndex.get(connectId); - if (subscriberMap != null) { - subscriberMap.remove(subscriber.getRegisterId()); - } else { - LOGGER.warn("ConnectId {} not existed in Index to remove!", connectId); - } - } - private void removeResultIndex(Subscriber subscriber) { SubscriberResult subscriberResult = new SubscriberResult(subscriber.getDataInfoId(), subscriber.getScope()); @@ -334,7 +168,7 @@ private void removeResultIndex(Subscriber subscriber) { if (subscribers != null) { subscribers.remove(subscriber.getRegisterId()); } else { - LOGGER.warn("InetSocketAddress {} not existed in Index to remove!", address); + LOGGER.warn("Address {} not existed in Index to remove!", address); } } else { @@ -342,10 +176,6 @@ private void removeResultIndex(Subscriber subscriber) { } } - private void invalidateConnectIndex(ConnectId connectId) { - connectIndex.remove(connectId); - } - private void invalidateResultIndex(Subscriber subscriber) { SubscriberResult subscriberResult = new SubscriberResult(subscriber.getDataInfoId(), subscriber.getScope()); @@ -353,9 +183,7 @@ private void invalidateResultIndex(Subscriber subscriber) { if (mapSub != null) { InetSocketAddress address = new InetSocketAddress(subscriber.getSourceAddress() .getIpAddress(), subscriber.getSourceAddress().getPort()); - mapSub.remove(address); - } else { LOGGER.warn("SubscriberResult {} not existed in Index to remove!", subscriberResult); } @@ -364,19 +192,18 @@ private void invalidateResultIndex(Subscriber subscriber) { @Override public Map> querySubscriberIndex(String dataInfoId, ScopeEnum scope) { + final SubscriberResult subscriberResult = new SubscriberResult(dataInfoId, scope); read.lock(); try { - SubscriberResult subscriberResult = new SubscriberResult(dataInfoId, scope); Map> map = resultIndex.get(subscriberResult); - if (map != null && !map.isEmpty()) { - return new ConcurrentHashMap<>(map); + if (!MapUtils.isEmpty(map)) { + return StoreHelpers.copyMap((Map)map); } else { - return new ConcurrentHashMap<>(); + return Collections.emptyMap(); } } finally { read.unlock(); } - } @Override @@ -393,7 +220,6 @@ public void addReSubscriber(Subscriber subscriber) { @Override public boolean deleteReSubscriber(Subscriber subscriber) { - Map subscribers = stopPushInterests.get(subscriber.getDataInfoId()); if (subscribers == null) { @@ -406,48 +232,48 @@ public boolean deleteReSubscriber(Subscriber subscriber) { @Override public Map> getReSubscribers() { - return stopPushInterests; + return StoreHelpers.copyMap((Map)stopPushInterests); } @Override - public void clearReSubscribers() { - stopPushInterests.clear(); + public List getDataCenters() { + return Lists.newArrayList(interestVersions.keySet()); } @Override - public Set getConnectIds() { - return Sets.newHashSet(connectIndex.keySet()); + public void clearReSubscribers() { + stopPushInterests.clear(); } - public SessionServerConfig getSessionServerConfig() { - return sessionServerConfig; - } + private static final class SubscriberResult { + final String dataInfoId; + final ScopeEnum scope; - /** - * Setter method for property sessionServerConfig. - * - * @param sessionServerConfig value to be assigned to property sessionServerConfig - */ - public void setSessionServerConfig(SessionServerConfig sessionServerConfig) { - this.sessionServerConfig = sessionServerConfig; - } + SubscriberResult(String dataInfoId, ScopeEnum scope) { + this.dataInfoId = dataInfoId; + this.scope = scope; + } - @Override - public List getDataCenters() { - return Lists.newArrayList(interestVersions.keySet()); - } + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (!(o instanceof SubscriberResult)) + return false; + SubscriberResult that = (SubscriberResult) o; + return Objects.equals(dataInfoId, that.dataInfoId) && scope == that.scope; + } - @Override - public Set getSubscriberProcessIds() { - HashSet processIds = Sets.newHashSet(); - for (Map subscribers : interests.values()) { - for (Subscriber subscriber : subscribers.values()) { - if (subscriber.getProcessId() != null) { - processIds.add(subscriber.getProcessId()); - } - } + @Override + public int hashCode() { + return Objects.hash(dataInfoId, scope); + } + + @Override + public String toString() { + return "SubscriberResult{" + "dataInfoId='" + dataInfoId + '\'' + ", scope=" + scope + + '}'; } - return processIds; } } \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionWatchers.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionWatchers.java index 263f63898..9c0f534f0 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionWatchers.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionWatchers.java @@ -16,101 +16,58 @@ */ package com.alipay.sofa.registry.server.session.store; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import com.alipay.sofa.registry.common.model.ConnectId; import com.alipay.sofa.registry.common.model.store.Watcher; import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; import com.alipay.sofa.registry.util.VersionsMapUtils; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; +import org.apache.commons.collections.MapUtils; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * @author shangyu.wh * @version $Id: SessionWatchers.java, v 0.1 2018-04-17 19:00 shangyu.wh Exp $ */ -public class SessionWatchers implements Watchers { - - private static final Logger LOGGER = LoggerFactory - .getLogger(SessionWatchers.class); +public class SessionWatchers extends AbstractDataManager implements Watchers { + private static final Logger LOGGER = LoggerFactory.getLogger(SessionWatchers.class); - private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - private final Lock read = readWriteLock - .readLock(); - private final Lock write = readWriteLock - .writeLock(); - - /** - * store all register watchers - */ - private ConcurrentHashMap> watchers = new ConcurrentHashMap<>(); - - private Map> connectIndex = new ConcurrentHashMap<>(); + public SessionWatchers() { + super(LOGGER); + } /** * store watcher dataInfo version */ - private ConcurrentHashMap watcherVersions = new ConcurrentHashMap<>(); + private ConcurrentHashMap watcherVersions = new ConcurrentHashMap<>(); @Override - public void add(Watcher watcher) { + public boolean add(Watcher watcher) { Watcher.internWatcher(watcher); - + Map watcherMap = stores + .computeIfAbsent(watcher.getDataInfoId(), k -> Maps.newConcurrentMap()); + Watcher existingWatcher = null; write.lock(); try { - Map watcherMap = watchers.computeIfAbsent(watcher.getDataInfoId(), k->Maps.newConcurrentMap()); - Watcher existingWatcher = watcherMap.get(watcher.getRegisterId()); - - if (existingWatcher != null) { - LOGGER.warn("There is watcher already exists,it will be overwrite! {}", - existingWatcher); - removeConnectIndex(existingWatcher); - } - + existingWatcher = watcherMap.get(watcher.getRegisterId()); watcherMap.put(watcher.getRegisterId(), watcher); - - addConnectIndex(watcher); - } finally { write.unlock(); } - } - - @Override - public boolean deleteByConnectId(ConnectId connectId) { - write.lock(); - try { - for (Map map : watchers.values()) { - for (Iterator it = map.values().iterator(); it.hasNext();) { - Watcher watcher = (Watcher) it.next(); - if (watcher != null && connectId.equals(watcher.connectId())) { - it.remove(); - invalidateConnectIndex(connectId); - } - } - } - return true; - } catch (Exception e) { - LOGGER.error("Delete watcher by connectId {} error!", connectId, e); - return false; - } finally { - write.unlock(); + // log without lock + if (existingWatcher != null) { + LOGGER.warn("watcher already exists,it will be overwrite! {}", existingWatcher); } + return true; } @Override public boolean checkWatcherVersions(String dataInfoId, Long version) { read.lock(); try { - - Map watcherMap = watchers.get(dataInfoId); - - if (watcherMap == null || watcherMap.isEmpty()) { + Map watcherMap = stores.get(dataInfoId); + if (MapUtils.isEmpty(watcherMap)) { LOGGER.info("There are not Watcher Existed! Who are interest with dataInfoId {} !", dataInfoId); return false; @@ -123,103 +80,6 @@ public boolean checkWatcherVersions(String dataInfoId, Long version) { } @Override - public Collection getWatchers(String dataInfoId) { - read.lock(); - try { - - if (dataInfoId == null) { - throw new IllegalArgumentException("Input dataInfoId can not be null!"); - } - Map watcherMap = watchers.get(dataInfoId); - if (watcherMap == null) { - LOGGER.info("There is not registered Watcher for : {}", dataInfoId); - return null; - } - return watcherMap.values(); - } finally { - read.unlock(); - } - } - - @Override - public boolean deleteById(String registerId, String dataInfoId) { - write.lock(); - try { - - Map watcherMap = watchers.get(dataInfoId); - - if (watcherMap == null) { - LOGGER.error("Delete failed because watcher is not registered for dataInfoId: {}", - dataInfoId); - return false; - } else { - Watcher watcher = watcherMap.remove(registerId); - - if (watcher == null) { - LOGGER.error( - "Delete failed because watcher is not registered for registerId: {}", - registerId); - return false; - } else { - removeConnectIndex(watcher); - return true; - } - } - } finally { - write.unlock(); - } - } - - @Override - public Map queryByConnectId(ConnectId connectId) { - read.lock(); - try { - return connectIndex.get(connectId); - } finally { - read.unlock(); - } + protected void postDelete(Watcher data) { } - - @Override - public long count() { - AtomicLong count = new AtomicLong(0); - for (Map map : watchers.values()) { - count.addAndGet(map.size()); - } - return count.get(); - } - - private void addConnectIndex(Watcher watcher) { - ConnectId connectId = watcher.connectId(); - Map subscriberMap = connectIndex.get(connectId); - if (subscriberMap == null) { - Map newSubscriberMap = new ConcurrentHashMap<>(); - subscriberMap = connectIndex.putIfAbsent(connectId, newSubscriberMap); - if (subscriberMap == null) { - subscriberMap = newSubscriberMap; - } - } - - subscriberMap.put(watcher.getRegisterId(), watcher); - } - - private void removeConnectIndex(Watcher watcher) { - ConnectId connectId = watcher.connectId(); - Map subscriberMap = connectIndex.get(connectId); - if (subscriberMap != null) { - subscriberMap.remove(watcher.getRegisterId()); - } else { - LOGGER.warn("ConnectId {} not existed in Index to invalidate!", connectId); - } - } - - private void invalidateConnectIndex(ConnectId connectId) { - connectIndex.remove(connectId); - } - - @Override - public Set getConnectIds() { - return Sets.newHashSet(connectIndex.keySet()); - } - } \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SlotSessionDataStore.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SlotSessionDataStore.java index 40e2d083d..e8169a35a 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SlotSessionDataStore.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SlotSessionDataStore.java @@ -52,9 +52,9 @@ private DataStore getOrCreateDataStore(String dataInfoId) { } @Override - public Collection getStoreDataByDataInfoId(String dataInfoId) { + public Collection getDatas(String dataInfoId) { DataStore ds = getOrCreateDataStore(dataInfoId); - return ds.getStoreDataByDataInfoId(dataInfoId); + return ds.getDatas(dataInfoId); } @Override @@ -64,10 +64,10 @@ public Publisher queryById(String registerId, String dataInfoId) { } @Override - public Collection getStoreDataInfoIds() { + public Collection getDataInfoIds() { Set set = new HashSet<>(128); slot2DataStores.values().forEach(ds -> { - set.addAll(ds.getStoreDataInfoIds()); + set.addAll(ds.getDataInfoIds()); }); return set; } @@ -80,17 +80,17 @@ public Set getConnectIds() { } @Override - public Set getPublisherProcessIds() { + public Set collectProcessIds() { Set ret = Sets.newHashSet(); - slot2DataStores.values().forEach(d -> ret.addAll(d.getPublisherProcessIds())); + slot2DataStores.values().forEach(d -> ret.addAll(d.collectProcessIds())); return ret; } @Override - public Map> getDataInfoIdPublishers() { + public Map> getDatas() { Map> ret = new HashMap<>(512); for (DataStore ds : slot2DataStores.values()) { - Map> m = ds.getDataInfoIdPublishers(); + Map> m = ds.getDatas(); for (Map.Entry> e : m.entrySet()) { Map publisherMap = ret.computeIfAbsent(e.getKey(), k -> new HashMap<>(128)); publisherMap.putAll(e.getValue()); @@ -102,13 +102,13 @@ public Map> getDataInfoIdPublishers() { @Override public Map> getDataInfoIdPublishers(int slotId) { DataStore ds = slot2DataStores.computeIfAbsent(slotId, k -> new SessionDataStore()); - return ds.getDataInfoIdPublishers(); + return ds.getDatas(); } @Override - public void add(Publisher publisher) { + public boolean add(Publisher publisher) { DataStore ds = getOrCreateDataStore(publisher.getDataInfoId()); - ds.add(publisher); + return ds.add(publisher); } @Override diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/StoreHelpers.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/StoreHelpers.java new file mode 100644 index 000000000..5fb6ed6d4 --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/StoreHelpers.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.store; + +import com.alipay.sofa.registry.common.model.ConnectId; +import com.alipay.sofa.registry.common.model.store.BaseInfo; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * + * @author yuzhi.lyz + * @version v 0.1 2020-12-18 16:42 yuzhi.lyz Exp $ + */ +public final class StoreHelpers { + private StoreHelpers() { + } + + public static Set collectConnectId(Map map) { + Set sets = Sets.newHashSet(); + map.values().forEach(r -> sets.add(r.connectId())); + return sets; + } + + public static Set collectConnectIds(Map> maps) { + Set sets = Sets.newHashSet(); + maps.values().forEach(m -> sets.addAll(collectConnectId(m))); + return sets; + } + + public static long count(Map> maps) { + long count = 0; + for (Map map : maps.values()) { + count += map.size(); + } + return count; + } + + public static Map getByConnectId(ConnectId connectId, + Map> maps) { + Map retMap = Maps.newHashMap(); + maps.values().forEach(m -> { + m.values().forEach(r -> { + if (connectId.equals(r.connectId())) { + retMap.put(r.getRegisterId(), r); + } + }); + }); + + return retMap; + } + + public static Set collectProcessIds(Map> maps) { + HashSet processIds = Sets.newHashSet(); + for (Map map : maps.values()) { + for (T t : map.values()) { + if (t.getProcessId() != null) { + processIds.add(t.getProcessId()); + } + } + } + return processIds; + } + + public static Map copyMap(Map m) { + Map ret = new HashMap<>(m.size()); + for (Map.Entry e : m.entrySet()) { + if (!e.getValue().isEmpty()) { + ret.put(e.getKey(), new HashMap<>(e.getValue())); + } + } + return ret; + } +} diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/Watchers.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/Watchers.java index 1d9e0be6f..d4423e35a 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/Watchers.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/Watchers.java @@ -16,12 +16,8 @@ */ package com.alipay.sofa.registry.server.session.store; -import com.alipay.sofa.registry.common.model.ConnectId; import com.alipay.sofa.registry.common.model.store.Watcher; -import java.util.Collection; -import java.util.Set; - /** * * @author shangyu.wh @@ -29,14 +25,6 @@ */ public interface Watchers extends DataManager { - /** - * query watcher by dataInfoID - * - * @param dataInfoId - * @return - */ - Collection getWatchers(String dataInfoId); - /** * check watchers interest dataInfoId version * if not exist add @@ -48,5 +36,4 @@ public interface Watchers extends DataManager { */ boolean checkWatcherVersions(String dataInfoId, Long version); - Set getConnectIds(); } \ No newline at end of file diff --git a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/store/DataCacheTest.java b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/store/DataCacheTest.java index f5f285596..7adea2f22 100644 --- a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/store/DataCacheTest.java +++ b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/store/DataCacheTest.java @@ -118,13 +118,13 @@ public void testDeleteSubById() { Assert.assertEquals(2, map.size()); //remain 100 - Assert.assertEquals(map2.keySet().size(), 2); + Assert.assertEquals(map2.keySet().size(), 1); InetSocketAddress address = new InetSocketAddress("192.168.1.2", 9000); InetSocketAddress addressDel = new InetSocketAddress("192.168.1.9", 8000); Assert.assertFalse(map2.get(address).isEmpty()); - Assert.assertFalse(!map2.get(addressDel).isEmpty()); + Assert.assertTrue(map2.get(addressDel) == null); //Assert.assertEquals(NetUtil.toAddressString(map2.keySet().iterator().next()),"192.168.1.2:9000"); Assert.assertEquals(map2.get(address).size(), 100); @@ -280,6 +280,8 @@ public void testOverwriteSameConnectIdPublisher() { publisher1.setRegisterId("RegisterId1"); publisher1.setSourceAddress(new URL("192.168.1.1", 12345)); publisher1.setTargetAddress(new URL("192.168.1.2", 9600)); + publisher1.setVersion(1L); + publisher1.setRegisterTimestamp(System.currentTimeMillis()); Publisher publisher2 = new Publisher(); publisher2.setDataInfoId("dataInfoId2"); @@ -287,7 +289,8 @@ public void testOverwriteSameConnectIdPublisher() { publisher2.setRegisterId("RegisterId2"); publisher2.setSourceAddress(new URL("192.168.1.1", 12345)); publisher2.setTargetAddress(new URL("192.168.1.2", 9600)); - + publisher2.setVersion(2L); + publisher2.setRegisterTimestamp(System.currentTimeMillis()); sessionDataStore.add(publisher1); sessionDataStore.add(publisher2); @@ -306,6 +309,8 @@ public void testOverwriteSameConnectIdPublisher() { publisher3.setRegisterId(publisher1.getRegisterId()); publisher3.setSourceAddress(new URL("192.168.1.1", 12346)); publisher3.setTargetAddress(new URL("192.168.1.2", 9600)); + publisher3.setVersion(2L); + publisher3.setRegisterTimestamp(System.currentTimeMillis()); Publisher publisher4 = new Publisher(); publisher4.setDataInfoId(publisher2.getDataInfoId()); @@ -313,6 +318,8 @@ public void testOverwriteSameConnectIdPublisher() { publisher4.setRegisterId(publisher2.getRegisterId()); publisher4.setSourceAddress(new URL("192.168.1.1", 12346)); publisher4.setTargetAddress(new URL("192.168.1.2", 9600)); + publisher4.setVersion(2L); + publisher4.setRegisterTimestamp(System.currentTimeMillis()); sessionDataStore.add(publisher3); sessionDataStore.add(publisher4); @@ -468,7 +475,7 @@ public void testSubAndClientOffUnorder() { .getAddressString() + "_" + subscriber1.getTargetAddress().getAddressString())); Assert.assertEquals(sessionInterests.queryByConnectId(ConnectId - .parse("192.168.1.1:12345_192.168.1.2:9600")), null); + .parse("192.168.1.1:12345_192.168.1.2:9600")).isEmpty(), true); Assert.assertEquals( sessionInterests .queryByConnectId(ConnectId.parse("192.168.1.1:12346_192.168.1.2:9600")).size(), 1); @@ -480,8 +487,8 @@ public void testSubAndClientOffUnorder() { sessionInterests .querySubscriberIndex(subscriber1.getDataInfoId(), subscriber1.getScope()) .get(new InetSocketAddress("192.168.1.1", 12346)).size(), 1); - Assert.assertEquals(sessionInterests.getInterests(subscriber1.getDataInfoId()).size(), 1); - Assert.assertTrue(sessionInterests.getInterests(subscriber1.getDataInfoId()).contains( + Assert.assertEquals(sessionInterests.getDatas(subscriber1.getDataInfoId()).size(), 1); + Assert.assertTrue(sessionInterests.getDatas(subscriber1.getDataInfoId()).contains( subscriber2)); } } \ No newline at end of file diff --git a/server/server/shared/src/main/java/com/alipay/sofa/registry/server/shared/remoting/ServerSideExchanger.java b/server/server/shared/src/main/java/com/alipay/sofa/registry/server/shared/remoting/ServerSideExchanger.java index cb52fc7ab..594a4a417 100644 --- a/server/server/shared/src/main/java/com/alipay/sofa/registry/server/shared/remoting/ServerSideExchanger.java +++ b/server/server/shared/src/main/java/com/alipay/sofa/registry/server/shared/remoting/ServerSideExchanger.java @@ -27,8 +27,12 @@ import com.alipay.sofa.registry.remoting.exchange.RequestException; import com.alipay.sofa.registry.remoting.exchange.message.Request; import com.alipay.sofa.registry.remoting.exchange.message.Response; +import com.alipay.sofa.registry.util.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; +import java.util.Collection; +import java.util.Optional; + /** * * @author yuzhi.lyz @@ -38,7 +42,7 @@ public abstract class ServerSideExchanger implements NodeExchanger { private static final Logger LOGGER = LoggerFactory.getLogger(ServerSideExchanger.class); @Autowired - protected Exchange boltExchange; + protected Exchange boltExchange; @Override public Response request(Request request) throws RequestException { @@ -74,6 +78,23 @@ public Response request(Request request) throws RequestException { } } + public Channel choseChannel() { + Server sessionServer = boltExchange.getServer(getServerPort()); + if (sessionServer == null) { + return null; + + } + Collection channels = sessionServer.getChannels(); + Optional channelOptional = CollectionUtils.getRandom(channels); + if (channelOptional.isPresent()) { + Channel channel = channelOptional.get(); + if (channel.isConnected()) { + return channel; + } + } + return null; + } + @Override public Client connectServer() { throw new UnsupportedOperationException();