From 55d87c24e0d04ef4647a14b01116d4d3cf333573 Mon Sep 17 00:00:00 2001 From: bjxiaojian Date: Wed, 28 Oct 2020 14:03:20 +0800 Subject: [PATCH] [app_discovery] support subscribe app and interface publisher --- .../client/event/SubscriberProcessEvent.java | 2 +- .../registry/core/model/AppRevisionKey.java | 55 ---- .../core/model/AppRevisionRegister.java | 8 +- .../registry/core/model/AssembleType.java | 43 +++ .../core/model/SubscriberRegister.java | 25 ++ pom.xml | 10 + .../model/AppRegisterServerDataBox.java | 135 ++++++++++ .../common/model/PublisherInternUtil.java | 81 ++++++ .../model/constants/ValueConstants.java | 3 +- .../common/model/dataserver/Datum.java | 3 +- .../metaserver/FetchRevisionsRequest.java | 6 +- .../sessionserver/DataChangeRequest.java | 46 +++- .../model/sessionserver/DatumSetVersion.java | 61 +++++ .../common/model/store/AppPublisher.java | 49 ++++ .../common/model/store/Publisher.java | 6 + .../common/model/store/Subscriber.java | 21 ++ .../converter/AssembleTypeConverter.java | 39 +++ .../converter/AssembleTypeConverter.java~HEAD | 39 +++ .../registry/task/listener/TaskEvent.java | 2 + .../sofa/registry/util/RevisionUtils.java | 5 +- server/consistency/pom.xml | 2 +- .../change/notify/SessionServerNotifier.java | 22 +- .../handler/DatumSnapshotHandler.java | 195 ++++++++++++++ .../handler/PublishDataHandler.java | 9 + .../service/RaftAppRevisionService.java | 38 ++- .../meta/revision/AppRevisionRegistry.java | 7 +- .../meta/revision/AppRevisionService.java | 9 +- server/server/session/pom.xml | 4 + .../session/assemble/AppAssembleService.java | 51 ++++ .../assemble/AppInterfaceAssembleService.java | 140 ++++++++++ .../session/assemble/AssembleService.java | 37 +++ .../DefaultSubscriberAssembleStrategy.java | 63 +++++ .../assemble/InterfaceAssembleService.java | 51 ++++ .../assemble/SubscriberAssembleStrategy.java | 36 +++ .../bootstrap/SessionServerConfigBean.java | 5 + .../bootstrap/SessionServerConfiguration.java | 51 ++++ .../cache/AppRevisionCacheRegistry.java | 95 ++++--- .../cache/SessionDatumCacheDecorator.java | 135 ++++++++++ .../converter/AppPublisherConverter.java | 107 ++++++++ .../converter/AppRegisterConstant.java | 34 +++ .../session/converter/PublisherConverter.java | 109 +++++--- .../converter/SubscriberConverter.java | 2 + .../listener/DataChangeFetchTaskListener.java | 18 +- .../node/service/AppRevisionNodeService.java | 5 +- .../service/AppRevisionNodeServiceImpl.java | 7 +- .../node/service/ClientNodeService.java | 2 - .../session/predicate/RevisionPredicate.java | 49 ++++ .../session/predicate/ZonePredicate.java | 52 ++++ .../server/session/push/FirePushService.java | 154 +++++++++++ .../session/registry/SessionRegistry.java | 201 ++++++++------- .../handler/DataChangeRequestHandler.java | 61 ++++- .../scheduler/task/DataChangeFetchTask.java | 244 ++++++++++-------- .../session/store/SessionDataStore.java | 4 +- .../session/store/SessionInterests.java | 15 ++ .../DefaultAppRevisionHandlerStrategy.java | 4 +- ...faultDataChangeRequestHandlerStrategy.java | 1 + .../impl/DefaultSessionRegistryStrategy.java | 8 +- .../server/session/utils/AddressUtil.java | 68 +++++ .../store/AppPublisherConverterTest.java | 57 ++++ .../registry/test/sync/SessionNotifyTest.java | 23 +- 60 files changed, 2416 insertions(+), 398 deletions(-) delete mode 100644 core/src/main/java/com/alipay/sofa/registry/core/model/AppRevisionKey.java create mode 100644 core/src/main/java/com/alipay/sofa/registry/core/model/AssembleType.java create mode 100644 server/common/model/src/main/java/com/alipay/sofa/registry/common/model/AppRegisterServerDataBox.java create mode 100644 server/common/model/src/main/java/com/alipay/sofa/registry/common/model/PublisherInternUtil.java create mode 100644 server/common/model/src/main/java/com/alipay/sofa/registry/common/model/sessionserver/DatumSetVersion.java create mode 100644 server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/AppPublisher.java create mode 100644 server/common/util/src/main/java/com/alipay/sofa/registry/converter/AssembleTypeConverter.java create mode 100644 server/common/util/src/main/java/com/alipay/sofa/registry/converter/AssembleTypeConverter.java~HEAD create mode 100644 server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/DatumSnapshotHandler.java create mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/AppAssembleService.java create mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/AppInterfaceAssembleService.java create mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/AssembleService.java create mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/DefaultSubscriberAssembleStrategy.java create mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/InterfaceAssembleService.java create mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/SubscriberAssembleStrategy.java create mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/SessionDatumCacheDecorator.java create mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/AppPublisherConverter.java create mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/AppRegisterConstant.java create mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/predicate/RevisionPredicate.java create mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/predicate/ZonePredicate.java create mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/FirePushService.java create mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/utils/AddressUtil.java create mode 100644 server/server/session/src/test/java/com/alipay/sofa/registry/server/session/store/AppPublisherConverterTest.java diff --git a/client/impl/src/main/java/com/alipay/sofa/registry/client/event/SubscriberProcessEvent.java b/client/impl/src/main/java/com/alipay/sofa/registry/client/event/SubscriberProcessEvent.java index d8c6b2a2b..aa243d09d 100644 --- a/client/impl/src/main/java/com/alipay/sofa/registry/client/event/SubscriberProcessEvent.java +++ b/client/impl/src/main/java/com/alipay/sofa/registry/client/event/SubscriberProcessEvent.java @@ -22,7 +22,7 @@ /** * The type Subscriber process event. - * @author zhuoyu.sjw + * @auThor zhuoyu.sjw * @version $Id : SubscriberProcessEvent.java, v 0.1 2018-07-13 18:40 zhuoyu.sjw Exp $$ */ public class SubscriberProcessEvent implements Event { diff --git a/core/src/main/java/com/alipay/sofa/registry/core/model/AppRevisionKey.java b/core/src/main/java/com/alipay/sofa/registry/core/model/AppRevisionKey.java deleted file mode 100644 index dd89120bc..000000000 --- a/core/src/main/java/com/alipay/sofa/registry/core/model/AppRevisionKey.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alipay.sofa.registry.core.model; - -import java.io.Serializable; -import java.util.Objects; - -public class AppRevisionKey implements Comparable, Serializable { - String appname; - String revision; - - public AppRevisionKey(String appname, String revision) { - this.appname = appname; - this.revision = revision; - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - AppRevisionKey that = (AppRevisionKey) o; - return Objects.equals(appname, that.appname) && Objects.equals(revision, that.revision); - } - - @Override - public int hashCode() { - return Objects.hash(appname, revision); - } - - @Override - public String toString() { - return appname + "@" + revision; - } - - @Override - public int compareTo(AppRevisionKey o) { - return toString().compareTo(o.toString()); - } -} diff --git a/core/src/main/java/com/alipay/sofa/registry/core/model/AppRevisionRegister.java b/core/src/main/java/com/alipay/sofa/registry/core/model/AppRevisionRegister.java index a055e852c..ddbef71c2 100644 --- a/core/src/main/java/com/alipay/sofa/registry/core/model/AppRevisionRegister.java +++ b/core/src/main/java/com/alipay/sofa/registry/core/model/AppRevisionRegister.java @@ -21,8 +21,8 @@ import java.util.Map; public class AppRevisionRegister implements Serializable { - public String revision; - public String appname; - public Map baseParams; - public List interfaces; + public String revision; + public String appname; + public Map> baseParams; + public Map interfaces; } diff --git a/core/src/main/java/com/alipay/sofa/registry/core/model/AssembleType.java b/core/src/main/java/com/alipay/sofa/registry/core/model/AssembleType.java new file mode 100644 index 000000000..830a69db5 --- /dev/null +++ b/core/src/main/java/com/alipay/sofa/registry/core/model/AssembleType.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.core.model; + +/** + * + * @author xiaojian.xj + * @version $Id: AssembleType.java, v 0.1 2020年10月27日 01:51 xiaojian.xj Exp $ + */ +public enum AssembleType { + + /** sub app: only sub data where dataId = app */ + sub_app, + + /** sub interface: only sub data where dataId = interface */ + sub_interface, + + /** sub app and interface: sub data from app and interface */ + sub_app_and_interface, ; + + public static boolean contains(String name) { + for (AssembleType subDataType : values()) { + if (subDataType.name().equals(name)) { + return true; + } + } + return false; + } +} \ No newline at end of file diff --git a/core/src/main/java/com/alipay/sofa/registry/core/model/SubscriberRegister.java b/core/src/main/java/com/alipay/sofa/registry/core/model/SubscriberRegister.java index 8dd822779..a3e990fe5 100644 --- a/core/src/main/java/com/alipay/sofa/registry/core/model/SubscriberRegister.java +++ b/core/src/main/java/com/alipay/sofa/registry/core/model/SubscriberRegister.java @@ -26,6 +26,13 @@ public class SubscriberRegister extends BaseRegister { private String scope; + /** + * interface: only sub interface + * app: only sub app + * app_and_interface: sub app and interface + */ + private String assembleType; + /** * Getter method for property scope. * @@ -44,6 +51,24 @@ public void setScope(String scope) { this.scope = scope; } + /** + * Getter method for property assembleType. + * + * @return property value of assembleType + */ + public String getAssembleType() { + return assembleType; + } + + /** + * Setter method for property assembleType. + * + * @param assembleType value to be assigned to property assembleType + */ + public void setAssembleType(String assembleType) { + this.assembleType = assembleType; + } + /** * To string string. * diff --git a/pom.xml b/pom.xml index 80322e40c..4b7088f56 100644 --- a/pom.xml +++ b/pom.xml @@ -82,9 +82,13 @@ 6.4.6 ${user.dir} -Dnetwork_interface_denylist=docker0 +<<<<<<< HEAD false false +======= + 1.2.51.sec09_noneautotype +>>>>>>> [app_discovery] support subscribe app and interface publisher @@ -322,6 +326,7 @@ 3.2.2 + junit @@ -352,6 +357,11 @@ rocksdbjni ${rocksdbjni.version} + + com.alibaba + fastjson + ${fastjson.version} + diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/AppRegisterServerDataBox.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/AppRegisterServerDataBox.java new file mode 100644 index 000000000..58efe598d --- /dev/null +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/AppRegisterServerDataBox.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.common.model; + +import com.alipay.sofa.registry.common.model.store.WordCache; +import com.google.common.collect.ArrayListMultimap; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * + * @author xiaojian.xj + * @version $Id: AppRegisterServerDataBox.java, v 0.1 2020年11月12日 11:14 xiaojian.xj Exp $ + */ +public class AppRegisterServerDataBox implements Serializable { + private static final long serialVersionUID = -3615677271684611262L; + + /** revision */ + private String revision; + + /** ip:port */ + private String url; + + /** baseParams */ + private HashMap/*values*/> baseParams; + + /** */ + private Map/*value*/>> serviceParams; + + public String extract(String serviceName) { + serviceParams.get(serviceName); + + StringBuilder builder = new StringBuilder(); + builder.append("?"); + baseParams.entrySet().stream().forEach(entry -> { + entry.getValue().forEach(value -> builder.append(entry.getKey()).append("=").append(value).append("&")); + }); + + serviceParams.get(serviceName).entrySet().forEach(entry -> { + entry.getValue().forEach(value -> builder.append(entry.getKey()).append("=").append(value).append("&")); + }); + + + return builder.deleteCharAt(builder.toString().length() - 1).toString(); + } + + /** + * Getter method for property revision. + * + * @return property value of revision + */ + public String getRevision() { + return revision; + } + + /** + * Setter method for property revision. + * + * @param revision value to be assigned to property revision + */ + public void setRevision(String revision) { + this.revision = WordCache.getInstance().getWordCache(revision); + } + + /** + * Getter method for property url. + * + * @return property value of url + */ + public String getUrl() { + return url; + } + + /** + * Setter method for property url. + * + * @param url value to be assigned to property url + */ + public void setUrl(String url) { + this.url = url; + } + + /** + * Getter method for property baseParams. + * + * @return property value of baseParams + */ + public HashMap> getBaseParams() { + return baseParams; + } + + /** + * Setter method for property baseParams. + * + * @param baseParams value to be assigned to property baseParams + */ + public void setBaseParams(HashMap> baseParams) { + this.baseParams = baseParams; + } + + /** + * Getter method for property serviceParams. + * + * @return property value of serviceParams + */ + public Map>> getServiceParams() { + return serviceParams; + } + + /** + * Setter method for property serviceParams. + * + * @param serviceParams value to be assigned to property serviceParams + */ + public void setServiceParams(Map>> serviceParams) { + this.serviceParams = serviceParams; + } +} \ No newline at end of file diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/PublisherInternUtil.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/PublisherInternUtil.java new file mode 100644 index 000000000..71e0dde67 --- /dev/null +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/PublisherInternUtil.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.common.model; + +import com.alipay.sofa.registry.common.model.store.AppPublisher; +import com.alipay.sofa.registry.common.model.store.Publisher; +import com.alipay.sofa.registry.common.model.store.WordCache; +import com.google.common.collect.ArrayListMultimap; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** + * + * @author xiaojian.xj + * @version $Id: PublisherInternUtil.java, v 0.1 2020年11月12日 16:53 xiaojian.xj Exp $ + */ +public class PublisherInternUtil { + + /** + * change publisher word cache + * @param publisher + * @return + */ + public static Publisher internPublisher(Publisher publisher) { + publisher.setRegisterId(publisher.getRegisterId()); + publisher.setDataInfoId(publisher.getDataInfoId()); + publisher.setInstanceId(publisher.getInstanceId()); + publisher.setGroup(publisher.getGroup()); + publisher.setDataId(publisher.getDataId()); + publisher.setClientId(publisher.getClientId()); + publisher.setCell(publisher.getCell()); + publisher.setProcessId(publisher.getProcessId()); + publisher.setAppName(publisher.getAppName()); + + if (publisher instanceof AppPublisher) { + AppPublisher appPublisher = (AppPublisher) publisher; + + for (AppRegisterServerDataBox dataBox : appPublisher.getAppDataList()) { + dataBox.setUrl(dataBox.getUrl()); + dataBox.setRevision(dataBox.getRevision()); + + ArrayListMultimap baseParams = ArrayListMultimap.create(); + dataBox.getBaseParams().entrySet().forEach(entry -> { + + entry.getValue().stream().forEach(value -> { + // cache base params key and value + baseParams.put(WordCache.getInstance().getWordCache(entry.getKey()), WordCache.getInstance().getWordCache(value)); + }); + }); + + Map>> serviceParams = new HashMap<>(); + dataBox.getServiceParams().entrySet().forEach(entry -> { + // cache serviceName + serviceParams.put(WordCache.getInstance().getWordCache(entry.getKey()), entry.getValue()); + + }); + + } + + return appPublisher; + } + return publisher; + } +} \ No newline at end of file diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/constants/ValueConstants.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/constants/ValueConstants.java index e84166067..e917f91dd 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/constants/ValueConstants.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/constants/ValueConstants.java @@ -17,7 +17,6 @@ package com.alipay.sofa.registry.common.model.constants; /** - * * @author zhuoyu.sjw * @version $Id: ValueConstants.java, v 0.1 2018-03-28 23:07 zhuoyu.sjw Exp $$ */ @@ -72,6 +71,8 @@ public class ValueConstants { public static final String DATA_SESSION_LEASE_SEC = "data.session.lease.sec#@#9600#@#CONFIG"; + public static final String SOFA_APP = "SOFA_APP"; + /** * switch key for dataId sensitive is disable or not */ diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/dataserver/Datum.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/dataserver/Datum.java index 961cb8d2b..217abcc21 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/dataserver/Datum.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/dataserver/Datum.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import com.alipay.sofa.registry.common.model.PublisherInternUtil; import com.alipay.sofa.registry.common.model.store.Publisher; import com.alipay.sofa.registry.common.model.store.WordCache; import com.alipay.sofa.registry.util.DatumVersionUtil; @@ -239,7 +240,7 @@ public static Datum internDatum(Datum datum) { // because this Datum is put into Memory directly, by DatumCache.coverDatum publisher.setRegisterId(registerId); // change publisher word cache - Publisher.internPublisher(publisher); + PublisherInternUtil.internPublisher(publisher); }); } diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/FetchRevisionsRequest.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/FetchRevisionsRequest.java index dfdec0b05..a3d2713c0 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/FetchRevisionsRequest.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/FetchRevisionsRequest.java @@ -16,15 +16,13 @@ */ package com.alipay.sofa.registry.common.model.metaserver; -import com.alipay.sofa.registry.core.model.AppRevisionKey; - import java.io.Serializable; import java.util.List; public class FetchRevisionsRequest implements Serializable { - public List keys; + public List keys; - public FetchRevisionsRequest(List keys) { + public FetchRevisionsRequest(List keys) { this.keys = keys; } } diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/sessionserver/DataChangeRequest.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/sessionserver/DataChangeRequest.java index e6c655cee..a91f0fd3d 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/sessionserver/DataChangeRequest.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/sessionserver/DataChangeRequest.java @@ -19,6 +19,7 @@ import com.alipay.sofa.registry.common.model.store.WordCache; import java.io.Serializable; +import java.util.Set; /** * request to notify sessionserver when data changed @@ -31,21 +32,33 @@ public class DataChangeRequest implements Serializable { private static final long serialVersionUID = -7674982522990222894L; private String dataInfoId; + private String changedDataInfoId; private String dataCenter; private long version; + private Set revisions; + + /** + * constructor + */ + public DataChangeRequest() { + } + /** * constructor + * * @param dataInfoId * @param dataCenter * @param version */ - public DataChangeRequest(String dataInfoId, String dataCenter, long version) { + public DataChangeRequest(String dataInfoId, String dataCenter, long version, + Set revisions) { this.dataInfoId = dataInfoId; this.dataCenter = dataCenter; this.version = version; + this.revisions = revisions; } /** @@ -60,7 +73,7 @@ public String getDataInfoId() { /** * Setter method for property dataInfoId. * - * @param dataInfoId value to be assigned to property dataInfoId + * @param dataInfoId value to be assigned to property dataInfoId */ public void setDataInfoId(String dataInfoId) { this.dataInfoId = WordCache.getInstance().getWordCache(dataInfoId); @@ -78,7 +91,7 @@ public String getDataCenter() { /** * Setter method for property dataCenter. * - * @param dataCenter value to be assigned to property dataCenter + * @param dataCenter value to be assigned to property dataCenter */ public void setDataCenter(String dataCenter) { this.dataCenter = WordCache.getInstance().getWordCache(dataCenter); @@ -96,12 +109,30 @@ public long getVersion() { /** * Setter method for property version. * - * @param version value to be assigned to property version + * @param version value to be assigned to property version */ public void setVersion(long version) { this.version = version; } + /** + * Getter method for property revisions. + * + * @return property value of revisions + */ + public Set getRevisions() { + return revisions; + } + + /** + * Setter method for property revisions. + * + * @param revisions value to be assigned to property revisions + */ + public void setRevisions(Set revisions) { + this.revisions = revisions; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder("DataChangeRequest{"); @@ -112,4 +143,11 @@ public String toString() { return sb.toString(); } + public String getChangedDataInfoId() { + return changedDataInfoId; + } + + public void setChangedDataInfoId(String changedDataInfoId) { + this.changedDataInfoId = changedDataInfoId; + } } diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/sessionserver/DatumSetVersion.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/sessionserver/DatumSetVersion.java new file mode 100644 index 000000000..3716a2525 --- /dev/null +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/sessionserver/DatumSetVersion.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.common.model.sessionserver; + +import java.util.HashMap; +import java.util.Map; + +public class DatumSetVersion { + + private Map versions; + + public DatumSetVersion() { + versions = new HashMap<>(); + } + + public DatumSetVersion(String dataInfoId, Long version) { + versions = new HashMap<>(); + addVersion(dataInfoId, version); + } + + public void addVersion(String dataInfoId, Long version) { + versions.put(dataInfoId, version); + } + + public boolean higher(DatumSetVersion other) { + if (!versions.keySet().equals(other.versions.keySet())) { + return true; + } + for (Map.Entry entry : versions.entrySet()) { + if (entry.getValue() > other.versions.get(entry.getKey())) { + return true; + } + } + return false; + } + + public boolean lower(String dataInfoId, Long version) { + if (versions.containsKey(dataInfoId) && versions.get(dataInfoId) >= version) { + return false; + } + return false; + } + + public boolean isZero() { + return versions.size() == 0; + } +} diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/AppPublisher.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/AppPublisher.java new file mode 100644 index 000000000..3f862fc8b --- /dev/null +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/AppPublisher.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.common.model.store; + +import com.alipay.sofa.registry.common.model.AppRegisterServerDataBox; + +import java.util.List; + +/** + * + * @author xiaojian.xj + * @version $Id: AppPublisher.java, v 0.1 2020年11月10日 23:15 xiaojian.xj Exp $ + */ +public class AppPublisher extends Publisher { + + private List appDataList; + + /** + * Getter method for property appDataList. + * + * @return property value of appDataList + */ + public List getAppDataList() { + return appDataList; + } + + /** + * Setter method for property appDataList. + * + * @param appDataList value to be assigned to property appDataList + */ + public void setAppDataList(List appDataList) { + this.appDataList = appDataList; + } +} \ No newline at end of file diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/Publisher.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/Publisher.java index d0ffc93b6..a02233960 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/Publisher.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/Publisher.java @@ -22,6 +22,12 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.collect.Collections2; import org.apache.commons.collections.CollectionUtils; +import com.alipay.sofa.registry.common.model.AppRegisterServerDataBox; +import com.alipay.sofa.registry.common.model.AppRegisterServerDataBox.ParamInfo; +import com.alipay.sofa.registry.common.model.AppRegisterServerDataBox.ServiceParamInfo; +import com.alipay.sofa.registry.common.model.PublishType; +import com.alipay.sofa.registry.common.model.ServerDataBox; +import com.fasterxml.jackson.annotation.JsonIgnore; /** * diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/Subscriber.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/Subscriber.java index 3d9b532b7..77fa91214 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/Subscriber.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/Subscriber.java @@ -21,6 +21,7 @@ import com.alipay.sofa.registry.common.model.ElementType; import com.alipay.sofa.registry.common.model.constants.ValueConstants; +import com.alipay.sofa.registry.core.model.AssembleType; import com.alipay.sofa.registry.core.model.ScopeEnum; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -37,6 +38,8 @@ public class Subscriber extends BaseInfo { private ScopeEnum scope; /** */ private ElementType elementType; + /** */ + private AssembleType assembleType; /** * last push context @@ -65,6 +68,24 @@ public ElementType getElementType() { return elementType; } + /** + * Getter method for property assembleType. + * + * @return property value of assembleType + */ + public AssembleType getAssembleType() { + return assembleType; + } + + /** + * Setter method for property assembleType. + * + * @param assembleType value to be assigned to property assembleType + */ + public void setAssembleType(AssembleType assembleType) { + this.assembleType = assembleType; + } + /** * check version input greater than current version * @param version diff --git a/server/common/util/src/main/java/com/alipay/sofa/registry/converter/AssembleTypeConverter.java b/server/common/util/src/main/java/com/alipay/sofa/registry/converter/AssembleTypeConverter.java new file mode 100644 index 000000000..4d8d30098 --- /dev/null +++ b/server/common/util/src/main/java/com/alipay/sofa/registry/converter/AssembleTypeConverter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.converter; + +import com.alipay.sofa.registry.core.model.AssembleType; + +/** + * + * @author xiaojian.xj + * @version $Id: AssembleTypeConverter.java, v 0.1 2020年10月27日 02:06 xiaojian.xj Exp $ + */ +public class AssembleTypeConverter { + + /** + * subType convert func + * @param subType + * @return + */ + public static AssembleType convertToSubType(String subType) { + if (AssembleType.contains(subType)) { + return AssembleType.valueOf(subType); + } + return AssembleType.sub_app_and_interface; + } +} \ No newline at end of file diff --git a/server/common/util/src/main/java/com/alipay/sofa/registry/converter/AssembleTypeConverter.java~HEAD b/server/common/util/src/main/java/com/alipay/sofa/registry/converter/AssembleTypeConverter.java~HEAD new file mode 100644 index 000000000..4d8d30098 --- /dev/null +++ b/server/common/util/src/main/java/com/alipay/sofa/registry/converter/AssembleTypeConverter.java~HEAD @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.converter; + +import com.alipay.sofa.registry.core.model.AssembleType; + +/** + * + * @author xiaojian.xj + * @version $Id: AssembleTypeConverter.java, v 0.1 2020年10月27日 02:06 xiaojian.xj Exp $ + */ +public class AssembleTypeConverter { + + /** + * subType convert func + * @param subType + * @return + */ + public static AssembleType convertToSubType(String subType) { + if (AssembleType.contains(subType)) { + return AssembleType.valueOf(subType); + } + return AssembleType.sub_app_and_interface; + } +} \ No newline at end of file diff --git a/server/common/util/src/main/java/com/alipay/sofa/registry/task/listener/TaskEvent.java b/server/common/util/src/main/java/com/alipay/sofa/registry/task/listener/TaskEvent.java index d38e8c053..f34ac5a54 100644 --- a/server/common/util/src/main/java/com/alipay/sofa/registry/task/listener/TaskEvent.java +++ b/server/common/util/src/main/java/com/alipay/sofa/registry/task/listener/TaskEvent.java @@ -48,6 +48,8 @@ public enum TaskType { SUBSCRIBER_MULTI_FETCH_TASK("SubscriberMultiFetchTask"), // PUBLISH_DATA_TASK("PublishDataTask"), // UN_PUBLISH_DATA_TASK("UnPublishDataTask"), // + RENEW_DATUM_TASK("RenewDatumTask"), // + DATUM_SNAPSHOT_TASK("DatumSnapshotTask"), // //Session Adapter task USER_DATA_ELEMENT_PUSH_TASK("UserDataElementPushTask"), // diff --git a/server/common/util/src/main/java/com/alipay/sofa/registry/util/RevisionUtils.java b/server/common/util/src/main/java/com/alipay/sofa/registry/util/RevisionUtils.java index 53ba59b74..a167bfc01 100644 --- a/server/common/util/src/main/java/com/alipay/sofa/registry/util/RevisionUtils.java +++ b/server/common/util/src/main/java/com/alipay/sofa/registry/util/RevisionUtils.java @@ -16,17 +16,16 @@ */ package com.alipay.sofa.registry.util; -import com.alipay.sofa.registry.core.model.AppRevisionKey; import org.springframework.util.DigestUtils; import java.util.Collections; import java.util.List; public class RevisionUtils { - public static String revisionsDigest(List keys) { + public static String revisionsDigest(List keys) { Collections.sort(keys); StringBuffer sb = new StringBuffer(); - for (AppRevisionKey key : keys) { + for (String key : keys) { sb.append(key.toString()); } return DigestUtils.md5DigestAsHex(sb.toString().getBytes()); diff --git a/server/consistency/pom.xml b/server/consistency/pom.xml index 390cb59e8..80806eae5 100644 --- a/server/consistency/pom.xml +++ b/server/consistency/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server-parent - 5.5.0-SNAPSHOT + 5.6.0 ../pom.xml 4.0.0 diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/notify/SessionServerNotifier.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/notify/SessionServerNotifier.java index 87205151f..42759cd49 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/notify/SessionServerNotifier.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/notify/SessionServerNotifier.java @@ -24,6 +24,9 @@ import javax.annotation.PostConstruct; +import com.alipay.sofa.registry.common.model.AppRegisterServerDataBox; +import com.alipay.sofa.registry.common.model.store.AppPublisher; +import com.alipay.sofa.registry.common.model.store.Publisher; import org.springframework.beans.factory.annotation.Autowired; import com.alipay.remoting.Connection; @@ -101,8 +104,25 @@ public Set getSuitableSource() { @Override public void notify(Datum datum, Long lastVersion) { + Set revisions = null; + + for (Publisher publisher : datum.getPubMap().values()) { + if (publisher instanceof AppPublisher) { + if (revisions == null) { + revisions = new HashSet<>(); + } + AppPublisher appPublisher = (AppPublisher) publisher; + for (AppRegisterServerDataBox dataBox : appPublisher.getAppDataList()) { + + if (!revisions.contains(dataBox.getRevision())) { + revisions.add(dataBox.getRevision()); + } + } + } + } + DataChangeRequest request = new DataChangeRequest(datum.getDataInfoId(), - datum.getDataCenter(), datum.getVersion()); + datum.getDataCenter(), datum.getVersion(), revisions); List connections = sessionServerConnectionFactory.getSessionConnections(); for (Connection connection : connections) { doNotify(new NotifyCallback(connection, request)); diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/DatumSnapshotHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/DatumSnapshotHandler.java new file mode 100644 index 000000000..0275df15a --- /dev/null +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/DatumSnapshotHandler.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.data.remoting.sessionserver.handler; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +import com.alipay.sofa.registry.common.model.PublisherInternUtil; +import org.springframework.beans.factory.annotation.Autowired; + +import com.alipay.sofa.registry.common.model.CommonResponse; +import com.alipay.sofa.registry.common.model.DatumSnapshotRequest; +import com.alipay.sofa.registry.common.model.Node; +import com.alipay.sofa.registry.common.model.PublisherDigestUtil; +import com.alipay.sofa.registry.common.model.constants.ValueConstants; +import com.alipay.sofa.registry.common.model.store.Publisher; +import com.alipay.sofa.registry.common.model.store.WordCache; +import com.alipay.sofa.registry.log.Logger; +import com.alipay.sofa.registry.log.LoggerFactory; +import com.alipay.sofa.registry.remoting.Channel; +import com.alipay.sofa.registry.server.data.cache.DatumCache; +import com.alipay.sofa.registry.server.data.change.event.DataChangeEventCenter; +import com.alipay.sofa.registry.server.data.change.event.DatumSnapshotEvent; +import com.alipay.sofa.registry.server.data.remoting.handler.AbstractServerHandler; +import com.alipay.sofa.registry.server.data.renew.DatumLeaseManager; +import com.alipay.sofa.registry.util.ParaCheckUtil; + +/** + * handling snapshot request + * + * @author kezhu.wukz + * @version $Id: ClientOffProcessor.java, v 0.1 2019-05-30 15:48 kezhu.wukz Exp $ + */ +public class DatumSnapshotHandler extends AbstractServerHandler { + + private static final Logger RENEW_LOGGER = LoggerFactory.getLogger( + ValueConstants.LOGGER_NAME_RENEW, + "[DatumSnapshotHandler]"); + + /** Limited List Printing */ + private static final int LIMITED_LIST_SIZE_FOR_PRINT = 10; + + @Autowired + private DataChangeEventCenter dataChangeEventCenter; + + @Autowired + private DatumLeaseManager datumLeaseManager; + + @Autowired + private DatumCache datumCache; + + @Autowired + private ThreadPoolExecutor renewDatumProcessorExecutor; + + @Override + public Executor getExecutor() { + return renewDatumProcessorExecutor; + } + + @Override + public void checkParam(DatumSnapshotRequest request) throws RuntimeException { + ParaCheckUtil.checkNotBlank(request.getConnectId(), "DatumSnapshotRequest.connectId"); + ParaCheckUtil.checkNotEmpty(request.getPublishers(), "DatumSnapshotRequest.publishers"); + } + + @Override + public Object doHandle(Channel channel, DatumSnapshotRequest request) { + RENEW_LOGGER.info("Received datumSnapshotRequest: {}", request); + + String connectId = WordCache.getInstance().getWordCache(request.getConnectId()); + + // convert to pubMap, and wrap it by WordCache + Map pubMap = new HashMap<>(); + List publishers = request.getPublishers(); + if (publishers != null) { + for (Publisher publisher : publishers) { + PublisherInternUtil.internPublisher(publisher); + pubMap.put(publisher.getRegisterId(), publisher); + } + } + + // diff the cache and snapshot + boolean isDiff = true; + Map cachePubMap = datumCache.getOwnByConnectId(connectId); + if (cachePubMap == null) { + RENEW_LOGGER + .info( + ">>>>>>> connectId={}, cachePubMap.size=0, pubMap.size={}, isDiff={}, the diff is: pubMap={}", + connectId, pubMap.size(), isDiff, limitedToString(pubMap.values())); + } else { + List diffPub1 = subtract(pubMap, cachePubMap); + List diffPub2 = subtract(cachePubMap, pubMap); + if (diffPub1.size() == 0 && diffPub2.size() == 0) { + isDiff = false; + } + RENEW_LOGGER + .info( + ">>>>>>> connectId={}, cachePubMap.size={}, pubMap.size={}, isDiff={}, the diff is: pubMap-cachePubMap=(size:{}){}, cachePubMap-pubMap=(size:{}){}", + connectId, cachePubMap.size(), pubMap.size(), isDiff, diffPub1.size(), + limitedToString(diffPub1), diffPub2.size(), limitedToString(diffPub2)); + } + + if (isDiff) { + // build DatumSnapshotEvent and send to eventCenter + dataChangeEventCenter.onChange(new DatumSnapshotEvent(connectId, cachePubMap, pubMap)); + } + + // record the renew timestamp + datumLeaseManager.renew(connectId); + + return CommonResponse.buildSuccessResponse(); + } + + /** + * Limited List Printing + */ + private String limitedToString(Collection publishers) { + Iterator it = publishers.iterator(); + if (!it.hasNext()) + return "[]"; + + StringBuilder sb = new StringBuilder(); + sb.append('['); + int i = 1; + for (;;) { + Publisher e = it.next(); + sb.append("Publisher{dataInfoId='").append(e.getDataInfoId()).append('\''); + sb.append(", cell='").append(e.getCell()).append('\''); + sb.append(", registerId='").append(e.getRegisterId()).append('\''); + sb.append(", version=").append(e.getVersion()); + sb.append(", sourceAddress=").append(e.getSourceAddress()); + sb.append(", registerTimestamp=").append(e.getRegisterTimestamp()); + sb.append(", clientRegisterTimestamp=").append(e.getClientRegisterTimestamp()); + sb.append('}'); + if (!it.hasNext() || i++ >= LIMITED_LIST_SIZE_FOR_PRINT) + return sb.append(']').toString(); + sb.append(',').append(' '); + } + } + + private List subtract(Map pubMap1, Map pubMap2) { + List list = new ArrayList(); + for (Map.Entry entry : pubMap1.entrySet()) { + String registerId = entry.getKey(); + Publisher publisher1 = entry.getValue(); + Publisher publisher2 = pubMap2.get(registerId); + if (publisher2 == null + || PublisherDigestUtil.getDigestValue(publisher1) != PublisherDigestUtil + .getDigestValue(publisher2)) { + list.add(publisher1); + } + } + return list; + } + + @Override + public CommonResponse buildFailedResponse(String msg) { + return CommonResponse.buildFailedResponse(msg); + } + + @Override + public HandlerType getType() { + return HandlerType.PROCESSER; + } + + @Override + public Class interest() { + return DatumSnapshotRequest.class; + } + + @Override + protected Node.NodeType getConnectNodeType() { + return Node.NodeType.DATA; + } +} diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/PublishDataHandler.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/PublishDataHandler.java index 6f8269f6b..d3b572518 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/PublishDataHandler.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/PublishDataHandler.java @@ -16,6 +16,15 @@ */ package com.alipay.sofa.registry.server.data.remoting.sessionserver.handler; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +import com.alipay.sofa.registry.common.model.PublisherInternUtil; +import com.alipay.sofa.registry.common.model.constants.ValueConstants; +import org.springframework.beans.factory.annotation.Autowired; + +import com.alipay.sofa.registry.common.model.CommonResponse; +import com.alipay.sofa.registry.common.model.Node; import com.alipay.sofa.registry.common.model.PublishType; import com.alipay.sofa.registry.common.model.dataserver.DatumVersion; import com.alipay.sofa.registry.common.model.dataserver.PublishDataRequest; diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/repository/service/RaftAppRevisionService.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/repository/service/RaftAppRevisionService.java index afeaa240c..ccc696b0b 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/repository/service/RaftAppRevisionService.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/repository/service/RaftAppRevisionService.java @@ -17,7 +17,6 @@ package com.alipay.sofa.registry.server.meta.repository.service; import com.alipay.sofa.registry.core.model.AppRevisionRegister; -import com.alipay.sofa.registry.core.model.AppRevisionKey; import com.alipay.sofa.registry.jraft.processor.AbstractSnapshotProcess; import com.alipay.sofa.registry.jraft.processor.SnapshotProcess; import com.alipay.sofa.registry.log.Logger; @@ -34,21 +33,21 @@ @RaftService public class RaftAppRevisionService extends AbstractSnapshotProcess implements AppRevisionService { - private static final Logger LOGGER = LoggerFactory - .getLogger(RaftAppRevisionService.class); + private static final Logger LOGGER = LoggerFactory + .getLogger(RaftAppRevisionService.class); - private Set snapShotFileNames = new HashSet<>(); + private Set snapShotFileNames = new HashSet<>(); - private Map registry = new ConcurrentHashMap<>(); - private String keysDigest = ""; + private Map registry = new ConcurrentHashMap<>(); + private String keysDigest = ""; - private static final String REVISIONS_NAME = "revisions"; - private ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private static final String REVISIONS_NAME = "revisions"; + private ReadWriteLock rwLock = new ReentrantReadWriteLock(); public RaftAppRevisionService() { } - public RaftAppRevisionService(Map registry) { + public RaftAppRevisionService(Map registry) { this.registry = registry; } @@ -64,7 +63,7 @@ public boolean save(String path) { public boolean load(String path) { try { if (path.endsWith(REVISIONS_NAME)) { - Map reg = load(path, registry.getClass()); + Map reg = load(path, registry.getClass()); if (reg == null) { reg = new HashMap<>(); } @@ -95,34 +94,31 @@ public Set getSnapshotFileNames() { } public void add(AppRevisionRegister appRevision) { - AppRevisionKey key = new AppRevisionKey(appRevision.appname, appRevision.revision); rwLock.writeLock().lock(); - if (registry.putIfAbsent(key, appRevision) == null) { + if (registry.putIfAbsent(appRevision.revision, appRevision) == null) { keysDigest = generateKeysDigest(); } rwLock.writeLock().unlock(); } - public boolean existed(String appname, String revision) { - AppRevisionKey key = new AppRevisionKey(appname, revision); - return registry.containsKey(key); + public boolean existed(String revision) { + return registry.containsKey(revision); } - public AppRevisionRegister get(String appname, String revision) { - AppRevisionKey key = new AppRevisionKey(appname, revision); - return registry.get(key); + public AppRevisionRegister get(String revision) { + return registry.get(revision); } public String getKeysDigest() { return keysDigest; } - public List getMulti(List keys) { + public List getMulti(List keys) { if (keys == null) { return new ArrayList<>(); } List ret = new ArrayList<>(keys.size()); - for (AppRevisionKey key : keys) { + for (String key : keys) { AppRevisionRegister rev = registry.get(key); if (rev != null) { ret.add(rev); @@ -131,7 +127,7 @@ public List getMulti(List keys) { return ret; } - public List getKeys() { + public List getKeys() { return new ArrayList<>(registry.keySet()); } diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/revision/AppRevisionRegistry.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/revision/AppRevisionRegistry.java index c62970c6b..e62702900 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/revision/AppRevisionRegistry.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/revision/AppRevisionRegistry.java @@ -17,7 +17,6 @@ package com.alipay.sofa.registry.server.meta.revision; import com.alipay.sofa.registry.core.model.AppRevisionRegister; -import com.alipay.sofa.registry.core.model.AppRevisionKey; import com.alipay.sofa.registry.store.api.annotation.RaftReference; import java.util.List; @@ -27,20 +26,20 @@ public class AppRevisionRegistry { private AppRevisionService appRevisionService; public void register(AppRevisionRegister appRevision) { - if (appRevisionService.existed(appRevision.appname, appRevision.revision)) { + if (appRevisionService.existed(appRevision.revision)) { return; } appRevisionService.add(appRevision); } - public List checkRevisions(String keysDigest) { + public List checkRevisions(String keysDigest) { if (keysDigest.equals(appRevisionService.getKeysDigest())) { return null; } return appRevisionService.getKeys(); } - public List fetchRevisions(List keys) { + public List fetchRevisions(List keys) { return appRevisionService.getMulti(keys); } } diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/revision/AppRevisionService.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/revision/AppRevisionService.java index 30eeb20ec..738859402 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/revision/AppRevisionService.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/revision/AppRevisionService.java @@ -17,7 +17,6 @@ package com.alipay.sofa.registry.server.meta.revision; import com.alipay.sofa.registry.core.model.AppRevisionRegister; -import com.alipay.sofa.registry.core.model.AppRevisionKey; import com.alipay.sofa.registry.store.api.annotation.ReadOnLeader; import java.util.List; @@ -25,10 +24,10 @@ public interface AppRevisionService { @ReadOnLeader - AppRevisionRegister get(String appname, String revision); + AppRevisionRegister get(String revision); @ReadOnLeader - boolean existed(String appname, String revision); + boolean existed(String revision); void add(AppRevisionRegister appRevision); @@ -36,8 +35,8 @@ public interface AppRevisionService { String getKeysDigest(); @ReadOnLeader - List getKeys(); + List getKeys(); @ReadOnLeader - List getMulti(List keys); + List getMulti(List keys); } diff --git a/server/server/session/pom.xml b/server/server/session/pom.xml index db6901d92..2f2fc6d89 100644 --- a/server/server/session/pom.xml +++ b/server/server/session/pom.xml @@ -30,6 +30,10 @@ spring-boot-starter-test test + + com.alibaba + fastjson + com.alipay.sofa registry-remoting-api diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/AppAssembleService.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/AppAssembleService.java new file mode 100644 index 000000000..f097a8452 --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/AppAssembleService.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.assemble; + +import com.alipay.sofa.registry.common.model.dataserver.Datum; +import com.alipay.sofa.registry.common.model.store.Subscriber; +import com.alipay.sofa.registry.core.model.AssembleType; +import com.alipay.sofa.registry.server.session.cache.SessionDatumCacheDecorator; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.Map; + +/** + * + * @author xiaojian.xj + * @version $Id: AppAssembleService.java, v 0.1 2020年11月24日 19:34 xiaojian.xj Exp $ + */ +public class AppAssembleService implements AssembleService { + + @Autowired + private SessionDatumCacheDecorator sessionDatumCacheDecorator; + + @Override + public AssembleType support() { + return AssembleType.sub_app; + } + + @Override + public Map getAssembleDatum(Subscriber subscriber) { + return sessionDatumCacheDecorator.getDatumsCache(subscriber.getDataInfoId()); + } + + @Override + public Datum getAssembleDatum(String datacenter, Subscriber subscriber) { + return sessionDatumCacheDecorator.getDatumCache(datacenter, subscriber.getDataInfoId()); + } +} \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/AppInterfaceAssembleService.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/AppInterfaceAssembleService.java new file mode 100644 index 000000000..625c9a16d --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/AppInterfaceAssembleService.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.assemble; + +import com.alipay.sofa.registry.common.model.dataserver.Datum; +import com.alipay.sofa.registry.common.model.store.AppPublisher; +import com.alipay.sofa.registry.common.model.store.DataInfo; +import com.alipay.sofa.registry.common.model.store.Publisher; +import com.alipay.sofa.registry.common.model.store.Subscriber; +import com.alipay.sofa.registry.core.model.AssembleType; +import com.alipay.sofa.registry.server.session.cache.AppRevisionCacheRegistry; +import com.alipay.sofa.registry.server.session.cache.SessionDatumCacheDecorator; +import com.alipay.sofa.registry.server.session.converter.AppPublisherConverter; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.util.CollectionUtils; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; + +/** + * @author xiaojian.xj + * @version $Id: AppInterfaceAssembleService.java, v 0.1 2020年11月24日 19:34 xiaojian.xj Exp $ + */ +public class AppInterfaceAssembleService implements AssembleService { + + @Autowired + private SessionDatumCacheDecorator sessionDatumCacheDecorator; + + @Autowired + private AppRevisionCacheRegistry appRevisionCacheRegistry; + + @Override + public AssembleType support() { + return AssembleType.sub_app_and_interface; + } + + @Override + public Map getAssembleDatum(Subscriber subscriber) { + //get interface pub + Map interfaceDatumMap = sessionDatumCacheDecorator + .getDatumsCache(subscriber.getDataInfoId()); + + Map> appDatumMap = sessionDatumCacheDecorator + .getAppDatumCache(subscriber.getDataInfoId(), subscriber.getInstanceId()); + + Set datacenters = new HashSet<>(); + datacenters.addAll(interfaceDatumMap.keySet()); + datacenters.addAll(appDatumMap.keySet()); + + Map assembleResult = new HashMap<>(); + for (String datacenter : datacenters) { + + Datum interfaceDatum = interfaceDatumMap.get(datacenter); + Map appDatum = appDatumMap.get(datacenter); + + Datum assembleDatum = doMerge(interfaceDatum, appDatum, subscriber); + + assembleResult.put(datacenter, assembleDatum); + } + return assembleResult; + } + + @Override + public Datum getAssembleDatum(String datacenter, Subscriber subscriber) { + //get interface pub + Datum interfaceDatum = sessionDatumCacheDecorator.getDatumCache(datacenter, + subscriber.getDataInfoId()); + + Map> appDatumMap = sessionDatumCacheDecorator + .getAppDatumCache(subscriber.getDataInfoId(), subscriber.getInstanceId()); + + Map appDatum = appDatumMap.get(datacenter); + Datum assembleDatum = doMerge(interfaceDatum, appDatum, subscriber); + + return assembleDatum; + } + + private Datum doMerge(Datum interfaceDatum, Map appDatum, Subscriber subscriber) { + if (Objects.isNull(interfaceDatum) && CollectionUtils.isEmpty(appDatum)) { + return null; + } + + String dataCenter; + if (Objects.nonNull(interfaceDatum)) { + dataCenter = interfaceDatum.getDataCenter(); + } else { + dataCenter = appDatum.values().stream().findAny().get().getDataCenter(); + } + + DataInfo dataInfo = DataInfo.valueOf(subscriber.getDataInfoId()); + Datum datum = new Datum(); + datum.setDataInfoId(dataInfo.getDataInfoId()); + datum.setDataCenter(dataCenter); + datum.setDataId(dataInfo.getDataId()); + datum.setInstanceId(dataInfo.getInstanceId()); + datum.setGroup(dataInfo.getDataType()); + + if (interfaceDatum != null) { + datum.setVersion(interfaceDatum.getVersion()); + datum.getPubMap().putAll(interfaceDatum.getPubMap()); + } + + if (!CollectionUtils.isEmpty(appDatum)) { + for (Datum app : appDatum.values()) { + datum.setVersion(Math.max(app.getVersion(), datum.getVersion())); + for (Publisher publisher : app.getPubMap().values()) { + if (!(publisher instanceof AppPublisher)) { + continue; + } + AppPublisher appPublisher = (AppPublisher) publisher; + + datum.getPubMap().put( + appPublisher.getRegisterId(), + AppPublisherConverter.convert(appPublisher, appRevisionCacheRegistry, + dataInfo)); + } + } + } + + return datum; + } +} \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/AssembleService.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/AssembleService.java new file mode 100644 index 000000000..cb442857c --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/AssembleService.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.assemble; + +import com.alipay.sofa.registry.common.model.dataserver.Datum; +import com.alipay.sofa.registry.common.model.store.Subscriber; +import com.alipay.sofa.registry.core.model.AssembleType; + +import java.util.Map; + +/** + * + * @author xiaojian.xj + * @version $Id: AssembleService.java, v 0.1 2020年11月03日 20:09 xiaojian.xj Exp $ + */ +public interface AssembleService { + + AssembleType support(); + + Map getAssembleDatum(Subscriber subscriber); + + Datum getAssembleDatum(String datacenter, Subscriber subscriber); +} \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/DefaultSubscriberAssembleStrategy.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/DefaultSubscriberAssembleStrategy.java new file mode 100644 index 000000000..4d52a360c --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/DefaultSubscriberAssembleStrategy.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.assemble; + +import com.alipay.sofa.registry.common.model.dataserver.Datum; +import com.alipay.sofa.registry.common.model.store.Subscriber; +import com.alipay.sofa.registry.core.model.AssembleType; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * + * @author xiaojian.xj + * @version $Id: DefaultSubscriberAssembleStrategy.java, v 0.1 2020年11月24日 21:33 xiaojian.xj Exp $ + */ +public class DefaultSubscriberAssembleStrategy implements SubscriberAssembleStrategy { + + private Map assembleServiceMap = new ConcurrentHashMap<>(); + + @Override + public void add(AssembleService assembleService) { + assembleServiceMap.put(assembleService.support(), assembleService); + } + + @Override + public Map assembleDatum(AssembleType assembleType, + Subscriber subscriber) { + AssembleService service = assembleServiceMap.get(assembleType); + + if (service == null) { + return null; + } + + return service.getAssembleDatum(subscriber); + + } + + @Override + public Datum assembleDatum(AssembleType assembleType, String datacenter, Subscriber subscriber) { + AssembleService service = assembleServiceMap.get(assembleType); + + if (service == null) { + return null; + } + + return service.getAssembleDatum(datacenter, subscriber); + } +} \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/InterfaceAssembleService.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/InterfaceAssembleService.java new file mode 100644 index 000000000..6d4e6eb41 --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/InterfaceAssembleService.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.assemble; + +import com.alipay.sofa.registry.common.model.dataserver.Datum; +import com.alipay.sofa.registry.common.model.store.Subscriber; +import com.alipay.sofa.registry.core.model.AssembleType; +import com.alipay.sofa.registry.server.session.cache.SessionDatumCacheDecorator; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.Map; + +/** + * + * @author xiaojian.xj + * @version $Id: InterfaceAssembleService.java, v 0.1 2020年11月24日 19:35 xiaojian.xj Exp $ + */ +public class InterfaceAssembleService implements AssembleService { + + @Autowired + private SessionDatumCacheDecorator sessionDatumCacheDecorator; + + @Override + public AssembleType support() { + return AssembleType.sub_interface; + } + + @Override + public Map getAssembleDatum(Subscriber subscriber) { + return sessionDatumCacheDecorator.getDatumsCache(subscriber.getDataInfoId()); + } + + @Override + public Datum getAssembleDatum(String datacenter, Subscriber subscriber) { + return sessionDatumCacheDecorator.getDatumCache(datacenter, subscriber.getDataInfoId()); + } +} \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/SubscriberAssembleStrategy.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/SubscriberAssembleStrategy.java new file mode 100644 index 000000000..a1e173db5 --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/assemble/SubscriberAssembleStrategy.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.assemble; + +import com.alipay.sofa.registry.common.model.dataserver.Datum; +import com.alipay.sofa.registry.common.model.store.Subscriber; +import com.alipay.sofa.registry.core.model.AssembleType; + +import java.util.Map; + +/** + * + * @author xiaojian.xj + * @version $Id: SubscriberAssembleStrategy.java, v 0.1 2020年10月29日 21:59 xiaojian.xj Exp $ + */ +public interface SubscriberAssembleStrategy { + void add(AssembleService assembleService); + + Map assembleDatum(AssembleType assembleType, Subscriber subscriber); + + Datum assembleDatum(AssembleType assembleType, String datacenter, Subscriber subscriber); +} \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfigBean.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfigBean.java index 7a0172b52..f44b91f0e 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfigBean.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfigBean.java @@ -155,6 +155,11 @@ public class SessionServerConfigBean implements SessionServerConfig { private int renewDatumWheelQueueSize = 10000; private int pushDataTaskRetryFirstDelay = 500; + private int newRevisionTaskMaxBufferSize = 1000000; + + private int newRevisionTaskWorkerSize = 100; + + private int clientNodeExchangeTimeOut = 1000; //time out cause netty HashedWheelTimer occupy a lot of mem private long pushDataTaskRetryIncrementDelay = 500; diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java index e430926a7..5fd124e97 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java @@ -19,9 +19,17 @@ import java.util.ArrayList; import java.util.Collection; +import com.alipay.sofa.registry.server.session.assemble.AppAssembleService; +import com.alipay.sofa.registry.server.session.assemble.AppInterfaceAssembleService; +import com.alipay.sofa.registry.server.session.assemble.AssembleService; +import com.alipay.sofa.registry.server.session.assemble.DefaultSubscriberAssembleStrategy; +import com.alipay.sofa.registry.server.session.assemble.InterfaceAssembleService; +import com.alipay.sofa.registry.server.session.assemble.SubscriberAssembleStrategy; import com.alipay.sofa.registry.server.session.cache.*; import com.alipay.sofa.registry.server.session.connections.ConnectionsService; import com.alipay.sofa.registry.server.session.node.service.*; +import com.alipay.sofa.registry.server.session.predicate.RevisionPredicate; +import com.alipay.sofa.registry.server.session.push.FirePushService; import com.alipay.sofa.registry.server.session.remoting.handler.*; import com.alipay.sofa.registry.server.session.resource.*; import com.alipay.sofa.registry.server.session.strategy.*; @@ -398,6 +406,12 @@ public RaftClientManager raftClientManager() { public AppRevisionNodeService appRevisionNodeService() { return new AppRevisionNodeServiceImpl(); } + + @Bean + @ConditionalOnMissingBean + public FirePushService firePushService() { + return new FirePushService(); + } } @Configuration @@ -644,6 +658,43 @@ public ReceivedConfigDataPushTaskStrategy receivedConfigDataPushTaskStrategy() { public AppRevisionHandlerStrategy appRevisionHandlerStrategy() { return new DefaultAppRevisionHandlerStrategy(); } + + @Bean + public RevisionPredicate revisionPredicate() { + return new RevisionPredicate(); + } + + @Bean + public SessionDatumCacheDecorator sessionDatumCacheDecorator() { + return new SessionDatumCacheDecorator(); + } + + // @Bean + // public AssembleService appAssembleService(SubscriberAssembleStrategy subscriberAssembleStrategy) { + // AppAssembleService appAssembleService = new AppAssembleService(); + // subscriberAssembleStrategy.add(appAssembleService); + // return appAssembleService; + // } + // + // @Bean + // public AssembleService interfaceAssembleService(SubscriberAssembleStrategy subscriberAssembleStrategy) { + // InterfaceAssembleService interfaceAssembleService = new InterfaceAssembleService(); + // subscriberAssembleStrategy.add(interfaceAssembleService); + // return interfaceAssembleService; + // } + + @Bean + public AssembleService appInterfaceAssembleService(SubscriberAssembleStrategy subscriberAssembleStrategy) { + AppInterfaceAssembleService appInterfaceAssembleService = new AppInterfaceAssembleService(); + subscriberAssembleStrategy.add(appInterfaceAssembleService); + return appInterfaceAssembleService; + } + + @Bean + public SubscriberAssembleStrategy subscriberAssembleStrategy() { + return new DefaultSubscriberAssembleStrategy(); + } + } @Configuration diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/AppRevisionCacheRegistry.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/AppRevisionCacheRegistry.java index 52d37c452..e7ef32993 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/AppRevisionCacheRegistry.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/AppRevisionCacheRegistry.java @@ -16,10 +16,10 @@ */ package com.alipay.sofa.registry.server.session.cache; +import com.alipay.sofa.registry.common.model.constants.ValueConstants; import com.alipay.sofa.registry.common.model.store.DataInfo; import com.alipay.sofa.registry.core.model.AppRevisionRegister; import com.alipay.sofa.registry.core.model.AppRevisionInterface; -import com.alipay.sofa.registry.core.model.AppRevisionKey; import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; import com.alipay.sofa.registry.server.session.node.service.AppRevisionNodeService; @@ -29,7 +29,6 @@ import org.springframework.beans.factory.annotation.Autowired; import java.util.*; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; public class AppRevisionCacheRegistry { @@ -40,70 +39,84 @@ public class AppRevisionCacheRegistry { @Autowired private AppRevisionNodeService appRevisionNodeService; - final private Map registry = new ConcurrentHashMap<>(); + final private Map registry = new ConcurrentHashMap<>(); private String keysDigest = ""; final private Map>> interfaceRevisions = new ConcurrentHashMap<>(); + final private Map> appInterfaces = new ConcurrentHashMap<>(); private SingleFlight singleFlight = new SingleFlight(); public AppRevisionCacheRegistry() { } public void register(AppRevisionRegister appRevision) throws Exception { - AppRevisionKey key = new AppRevisionKey(appRevision.appname, appRevision.revision); - if (this.registry.containsKey(key)) { + if (this.registry.containsKey(appRevision.revision)) { return; } - singleFlight.execute(key, new AppRevisionRegisterTask(appRevision)); - } - - public Map> search(String dataInfoId) { - return interfaceRevisions.get(dataInfoId); + singleFlight.execute("revisionRegister" + appRevision.revision, () -> { + appRevisionNodeService.register(appRevision); + return null; + }); } - public void refreshAll() { - List revisions = appRevisionNodeService - .fetchMulti(appRevisionNodeService.checkRevisions(keysDigest)); - for (AppRevisionRegister rev : revisions) { - onNewRevision(rev); - } - if (revisions.size() > 0) { - keysDigest = generateKeysDigest(); + public Set getApps(String dataInfoId) { + if (!interfaceRevisions.containsKey(dataInfoId)) { + return new HashSet<>(); } + return interfaceRevisions.get(dataInfoId).keySet(); } - private void onNewRevision(AppRevisionRegister rev) { - AppRevisionKey key = new AppRevisionKey(rev.appname, rev.revision); - if (registry.putIfAbsent(key, rev) != null) { - return; - } - for (AppRevisionInterface inf : rev.interfaces) { - Map> apps = interfaceRevisions.computeIfAbsent( - DataInfo.toDataInfoId(inf.dataId, inf.instanceId, inf.group), - k -> new ConcurrentHashMap<>()); - Set infRevisions = apps.computeIfAbsent(rev.appname, - k -> Sets.newConcurrentHashSet()); - infRevisions.add(rev.revision); + public AppRevisionRegister getRevision(String revision) { + AppRevisionRegister revisionRegister = registry.get(revision); + if (revisionRegister != null) { + return revisionRegister; } + refreshAll(); + return registry.get(revision); } - private String generateKeysDigest() { - return RevisionUtils.revisionsDigest(new ArrayList<>(registry.keySet())); + public Set getInterfaces(String appname) { + return appInterfaces.get(appname); } - private void onInterfaceChanged(String dataInfoId) { + public void refreshAll() { + try { + singleFlight.execute("refreshAll", () -> { + List revisions = appRevisionNodeService + .fetchMulti(appRevisionNodeService.checkRevisions(keysDigest)); + for (AppRevisionRegister rev : revisions) { + onNewRevision(rev); + } + if (revisions.size() > 0) { + keysDigest = generateKeysDigest(); + } + return null; + }); + } catch (Exception e) { + LOG.error("refresh revisions failed ", e); + throw new RuntimeException("refresh revision failed", e); + } } - private class AppRevisionRegisterTask implements Callable { - private AppRevisionRegister revision; + private void onNewRevision(AppRevisionRegister rev) { + for (AppRevisionInterface inf : rev.interfaces.values()) { + String dataInfoId = DataInfo.toDataInfoId(inf.dataId, inf.instanceId, inf.group); + Map> apps = interfaceRevisions.computeIfAbsent(dataInfoId, + k -> new ConcurrentHashMap<>()); + Set infRevisions = apps.computeIfAbsent(rev.appname, + k -> Sets.newConcurrentHashSet()); + infRevisions.add(rev.revision); - public AppRevisionRegisterTask(AppRevisionRegister revision) { - this.revision = revision; + appInterfaces.computeIfAbsent(rev.appname, k -> Sets.newConcurrentHashSet()) + .add(dataInfoId); } + registry.put(rev.revision, rev); + } - @Override - public Object call() throws Exception { - appRevisionNodeService.register(revision); - return null; + private String generateKeysDigest() { + List keys = new ArrayList<>(); + for (Map.Entry entry : registry.entrySet()) { + keys.add(entry.getKey()); } + return RevisionUtils.revisionsDigest(keys); } } diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/SessionDatumCacheDecorator.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/SessionDatumCacheDecorator.java new file mode 100644 index 000000000..a50172eec --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/SessionDatumCacheDecorator.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.cache; + +import com.alipay.sofa.registry.common.model.Node; +import com.alipay.sofa.registry.common.model.constants.ValueConstants; +import com.alipay.sofa.registry.common.model.dataserver.Datum; +import com.alipay.sofa.registry.common.model.store.DataInfo; +import com.alipay.sofa.registry.common.model.store.Subscriber; +import com.alipay.sofa.registry.log.Logger; +import com.alipay.sofa.registry.log.LoggerFactory; +import com.alipay.sofa.registry.server.session.node.NodeManager; +import com.alipay.sofa.registry.server.session.node.NodeManagerFactory; +import com.alipay.sofa.registry.server.session.store.Interests; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.util.CollectionUtils; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * + * @author xiaojian.xj + * @version $Id: SessionDatumCacheDecorator.java, v 0.1 2020年11月03日 20:23 xiaojian.xj Exp $ + */ +public class SessionDatumCacheDecorator { + + private static final Logger taskLogger = LoggerFactory + .getLogger(SessionDatumCacheDecorator.class); + + @Autowired + private Interests sessionInterests; + + @Autowired + private CacheService sessionCacheService; + + @Autowired + private AppRevisionCacheRegistry appRevisionCacheRegistry; + + public Datum getDatumCache(String dataCenter, String dataInfoId) { + DatumKey datumKey = new DatumKey(dataInfoId, dataCenter); + Key key = new Key(Key.KeyType.OBJ, datumKey.getClass().getName(), datumKey); + + Value value = null; + try { + value = sessionCacheService.getValue(key); + } catch (CacheAccessException e) { + // The version is set to 0, so that when session checks the datum versions regularly, it will actively re-query the data. + boolean result = sessionInterests.checkAndUpdateInterestVersionZero(dataCenter, + dataInfoId); + taskLogger.error(String.format( + "error when access cache, so checkAndUpdateInterestVersionZero(return %s): %s", + result, e.getMessage()), e); + } + + return value == null ? null : value.getPayload(); + } + + public Map getDatumsCache(String dataInfoId) { + Map map = new HashMap<>(); + NodeManager nodeManager = NodeManagerFactory.getNodeManager(Node.NodeType.META); + Collection dataCenters = nodeManager.getDataCenters(); + if (dataCenters != null) { + Collection keys = dataCenters.stream().map(dataCenter -> new Key(Key.KeyType.OBJ, + DatumKey.class.getName(), new DatumKey(dataInfoId, dataCenter))) + .collect(Collectors.toList()); + + Map values = null; + try { + values = sessionCacheService.getValues(keys); + } catch (CacheAccessException e) { + // The version is set to 0, so that when session checks the datum versions regularly, it will actively re-query the data. + for (String dataCenter : dataCenters) { + boolean result = sessionInterests.checkAndUpdateInterestVersionZero(dataCenter, + dataInfoId); + taskLogger.error(String.format( + "error when access cache, so checkAndUpdateInterestVersionZero(return %s): %s", + result, e.getMessage()), e); + } + } + + if (values != null) { + values.forEach((key, value) -> { + if (value != null && value.getPayload() != null) { + Datum datum = (Datum) value.getPayload(); + String dataCenter = ((DatumKey) key.getEntityType()).getDataCenter(); + map.put(dataCenter, datum); + } + }); + } + + } + return map; + } + + public Map> getAppDatumCache(String dataInfoId, String instanceId) { + Map> result = new HashMap<>(); + //get metadata from session cache + for (String appName : appRevisionCacheRegistry.getApps(dataInfoId)) { + String appDataInfoId = DataInfo.toDataInfoId(appName, instanceId, ValueConstants.SOFA_APP); + + Map appDatum = this.getDatumsCache(appDataInfoId); + + if (CollectionUtils.isEmpty(appDatum)) { + continue; + } + + for (Entry datumEntry : appDatum.entrySet()) { + String datacenter = datumEntry.getKey(); + Datum datum = datumEntry.getValue(); + Map map = result.computeIfAbsent(datacenter, k -> new HashMap<>()); + map.put(appName, datum); + } + } + return result; + } +} diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/AppPublisherConverter.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/AppPublisherConverter.java new file mode 100644 index 000000000..30638fdee --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/AppPublisherConverter.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.converter; + +import com.alipay.sofa.registry.common.model.AppRegisterServerDataBox; +import com.alipay.sofa.registry.common.model.ServerDataBox; +import com.alipay.sofa.registry.common.model.store.AppPublisher; +import com.alipay.sofa.registry.common.model.store.DataInfo; +import com.alipay.sofa.registry.common.model.store.Publisher; +import com.alipay.sofa.registry.core.model.AppRevisionRegister; +import com.alipay.sofa.registry.server.session.cache.AppRevisionCacheRegistry; + +import javax.ws.rs.core.UriBuilder; +import javax.xml.crypto.Data; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author xiaojian.xj + * @version $Id: AppPublisherConverter.java, v 0.1 2020年11月24日 20:50 xiaojian.xj Exp $ + */ +public class AppPublisherConverter { + + public static Publisher convert(AppPublisher appPublisher, + AppRevisionCacheRegistry appRevisionCacheRegistry, + DataInfo dataInfo) { + Publisher publisher = new Publisher(); + String dataInfoId = dataInfo.getDataInfoId(); + fillCommonRegion(publisher, appPublisher, dataInfo); + List dataList = new ArrayList<>(); + for (AppRegisterServerDataBox appRegisterServerDataBox : appPublisher.getAppDataList()) { + AppRevisionRegister revisionRegister = appRevisionCacheRegistry + .getRevision(appRegisterServerDataBox.getRevision()); + Map> params = extractParams(revisionRegister, + appRegisterServerDataBox, dataInfoId); + dataList.add(new ServerDataBox(buildURL(appRegisterServerDataBox.getUrl(), params))); + } + publisher.setDataList(dataList); + return publisher; + + } + + private static Map> extractParams(AppRevisionRegister revisionRegister, + AppRegisterServerDataBox serverDataBox, + String dataInfoId) { + Map> params = new HashMap<>(); + params.putAll(revisionRegister.baseParams); + if (revisionRegister.interfaces.containsKey(dataInfoId)) { + params.putAll(revisionRegister.interfaces.get(dataInfoId).serviceParams); + } + params.putAll(serverDataBox.getBaseParams()); + if (serverDataBox.getServiceParams().containsKey(dataInfoId)) { + params.putAll(serverDataBox.getServiceParams().get(dataInfoId)); + } + return params; + } + + private static String buildURL(String address, Map> params) { + List querys = new ArrayList<>(); + for (Map.Entry> entry : params.entrySet()) { + String key = entry.getKey(); + for (String value : entry.getValue()) { + querys.add(key + "=" + value); + } + } + String queryStr = String.join("&", querys); + return address + "?" + queryStr; + } + + private static void fillCommonRegion(Publisher publisher, AppPublisher source, DataInfo dataInfo) { + + publisher.setAppName(source.getAppName()); + //ZONE MUST BE CURRENT SESSION ZONE + publisher.setCell(source.getCell()); + publisher.setClientId(source.getClientId()); + publisher.setDataId(dataInfo.getDataId()); + publisher.setGroup(dataInfo.getDataType()); + publisher.setInstanceId(dataInfo.getInstanceId()); + publisher.setRegisterId(source.getRegisterId()); + publisher.setProcessId(source.getProcessId()); + publisher.setVersion(source.getVersion()); + publisher.setRegisterTimestamp(source.getRegisterTimestamp()); + + publisher.setClientRegisterTimestamp(source.getClientRegisterTimestamp()); + publisher.setSourceAddress(source.getSourceAddress()); + + publisher.setClientVersion(source.getClientVersion()); + publisher.setDataInfoId(dataInfo.getDataInfoId()); + } + +} \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/AppRegisterConstant.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/AppRegisterConstant.java new file mode 100644 index 000000000..62407cb51 --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/AppRegisterConstant.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.converter; + +/** + * + * @author xiaojian.xj + * @version $Id: AppRegisterConstant.java, v 0.1 2020年11月12日 15:47 xiaojian.xj Exp $ + */ +public class AppRegisterConstant { + + public static final String URL_KEY = "url"; + + public static final String REVISION_KEY = "revision"; + + public static final String BASE_PARAMS_KEY = "baseParams"; + + public static final String INTERFACE_PARAMS_KEY = "interfaceParams"; + +} \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/PublisherConverter.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/PublisherConverter.java index 472955b54..4add32d22 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/PublisherConverter.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/PublisherConverter.java @@ -16,15 +16,23 @@ */ package com.alipay.sofa.registry.server.session.converter; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.alipay.sofa.registry.common.model.AppRegisterServerDataBox; import com.alipay.sofa.registry.common.model.ServerDataBox; +import com.alipay.sofa.registry.common.model.store.AppPublisher; import com.alipay.sofa.registry.common.model.store.BaseInfo.ClientVersion; import com.alipay.sofa.registry.common.model.store.DataInfo; import com.alipay.sofa.registry.common.model.store.Publisher; import com.alipay.sofa.registry.common.model.store.URL; import com.alipay.sofa.registry.core.model.DataBox; import com.alipay.sofa.registry.core.model.PublisherRegister; +import com.google.common.collect.ArrayListMultimap; +import org.apache.commons.lang.StringUtils; +import org.springframework.util.CollectionUtils; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; /** @@ -34,6 +42,54 @@ */ public class PublisherConverter { + + public static final String PUB_TYPE = "!PublisherType"; + + public static final String APP_PUBLISHER = "APP_PUBLISHER"; + + private static Converter appPublisherConverter = source -> { + AppPublisher appPublisher = new AppPublisher(); + fillCommonRegion(appPublisher, source); + appPublisher.setAppDataList(convert2AppDataList(source.getDataList())); + + return appPublisher; + }; + + private static Converter publisherConverter = source -> { + Publisher publisher = new Publisher(); + + fillCommonRegion(publisher, source); + publisher.setDataList(convert(source.getDataList())); + + return publisher; + }; + + + public static void fillCommonRegion(Publisher publisher, PublisherRegister source) { + publisher.setAppName(source.getAppName()); + //ZONE MUST BE CURRENT SESSION ZONE + publisher.setCell(source.getZone()); + publisher.setClientId(source.getClientId()); + publisher.setDataId(source.getDataId()); + publisher.setGroup(source.getGroup()); + publisher.setInstanceId(source.getInstanceId()); + publisher.setRegisterId(source.getRegistId()); + publisher.setProcessId(source.getProcessId()); + publisher.setVersion(source.getVersion()); + + //registerTimestamp must happen from server,client time maybe different cause pub and unPublisher fail + publisher.setRegisterTimestamp(System.currentTimeMillis()); + + publisher.setClientRegisterTimestamp(source.getTimestamp()); + publisher.setSourceAddress(new URL(source.getIp(), source.getPort())); + + publisher.setClientVersion(ClientVersion.StoreData); + + DataInfo dataInfo = new DataInfo(source.getInstanceId(), source.getDataId(), + source.getGroup()); + publisher.setDataInfoId(dataInfo.getDataInfoId()); + } + /** * PublisherRegister to Publisher * @@ -42,37 +98,11 @@ public class PublisherConverter { */ public static Publisher convert(PublisherRegister publisherRegister) { - Converter messageToData = source -> { - Publisher publisher = new Publisher(); - - publisher.setAppName(source.getAppName()); - //ZONE MUST BE CURRENT SESSION ZONE - publisher.setCell(source.getZone()); - publisher.setClientId(source.getClientId()); - publisher.setDataId(source.getDataId()); - publisher.setGroup(source.getGroup()); - publisher.setInstanceId(source.getInstanceId()); - publisher.setRegisterId(source.getRegistId()); - publisher.setProcessId(source.getProcessId()); - publisher.setVersion(source.getVersion()); - - //registerTimestamp must happen from server,client time maybe different cause pub and unPublisher fail - publisher.setRegisterTimestamp(System.currentTimeMillis()); - - publisher.setClientRegisterTimestamp(source.getTimestamp()); - publisher.setSourceAddress(new URL(source.getIp(), source.getPort())); - - publisher.setClientVersion(ClientVersion.StoreData); - - DataInfo dataInfo = new DataInfo(source.getInstanceId(), source.getDataId(), - source.getGroup()); - publisher.setDataInfoId(dataInfo.getDataInfoId()); - - publisher.setDataList(convert(source.getDataList())); + if (StringUtils.equalsIgnoreCase(APP_PUBLISHER, publisherRegister.getAttributes().get(PUB_TYPE))) { + return appPublisherConverter.convert(publisherRegister); + } - return publisher; - }; - return messageToData.convert(publisherRegister); + return publisherConverter.convert(publisherRegister); } public static List convert(List boxList) { @@ -86,4 +116,23 @@ public static List convert(List boxList) { } return serverDataBoxes; } + + private static List convert2AppDataList(List dataList) { + List dataBoxes = new ArrayList<>(); + if (CollectionUtils.isEmpty(dataList)) { + return dataBoxes; + } + + for (DataBox dataBox : dataList) { + AppRegisterServerDataBox serverDataBox = new AppRegisterServerDataBox(); + JSONObject jsonObject = JSON.parseObject(dataBox.getData()); + serverDataBox.setUrl(jsonObject.getString(AppRegisterConstant.URL_KEY)); + serverDataBox.setRevision(jsonObject.getString(AppRegisterConstant.REVISION_KEY)); + serverDataBox.setBaseParams(JSONObject.parseObject(jsonObject.getString(AppRegisterConstant.BASE_PARAMS_KEY), HashMap.class)); + serverDataBox.setServiceParams(JSONObject.parseObject(jsonObject.getString(AppRegisterConstant.INTERFACE_PARAMS_KEY), HashMap.class)); + dataBoxes.add(serverDataBox); + } + + return dataBoxes; + } } \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/SubscriberConverter.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/SubscriberConverter.java index 5a8a4380f..db8fecf9e 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/SubscriberConverter.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/SubscriberConverter.java @@ -21,6 +21,7 @@ import com.alipay.sofa.registry.common.model.store.Subscriber; import com.alipay.sofa.registry.common.model.store.URL; import com.alipay.sofa.registry.common.model.store.Watcher; +import com.alipay.sofa.registry.converter.AssembleTypeConverter; import com.alipay.sofa.registry.converter.ScopeEnumConverter; import com.alipay.sofa.registry.core.model.ConfiguratorRegister; import com.alipay.sofa.registry.core.model.SubscriberRegister; @@ -63,6 +64,7 @@ public static Subscriber convert(SubscriberRegister subscriberRegister) { source.getGroup()); subscriber.setDataInfoId(dataInfo.getDataInfoId()); + subscriber.setAssembleType(AssembleTypeConverter.convertToSubType(subscriberRegister.getAssembleType())); return subscriber; }; diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/listener/DataChangeFetchTaskListener.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/listener/DataChangeFetchTaskListener.java index a2517509a..0ca2050ee 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/listener/DataChangeFetchTaskListener.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/listener/DataChangeFetchTaskListener.java @@ -16,6 +16,10 @@ */ package com.alipay.sofa.registry.server.session.listener; +import com.alipay.sofa.registry.server.session.assemble.SubscriberAssembleStrategy; +import com.alipay.sofa.registry.server.session.cache.AppRevisionCacheRegistry; +import com.alipay.sofa.registry.server.session.cache.SessionDatumCacheDecorator; +import com.alipay.sofa.registry.server.session.push.FirePushService; import org.springframework.beans.factory.annotation.Autowired; import com.alipay.sofa.registry.log.Logger; @@ -54,7 +58,10 @@ public class DataChangeFetchTaskListener implements TaskListener { private ExecutorManager executorManager; @Autowired - private CacheService sessionCacheService; + private SubscriberAssembleStrategy subscriberAssembleStrategy; + + @Autowired + private AppRevisionCacheRegistry appRevisionCacheRegistry; /** * trigger task com.alipay.sofa.registry.server.meta.listener process @@ -62,6 +69,12 @@ public class DataChangeFetchTaskListener implements TaskListener { @Autowired private TaskListenerManager taskListenerManager; + @Autowired + private SessionDatumCacheDecorator sessionDatumCacheDecorator; + + @Autowired + private FirePushService firePushService; + private volatile TaskDispatcher singleTaskDispatcher; private TaskProcessor dataNodeSingleTaskProcessor; @@ -94,7 +107,8 @@ public TaskType support() { @Override public void handleEvent(TaskEvent event) { SessionTask dataChangeFetchTask = new DataChangeFetchTask(sessionServerConfig, - taskListenerManager, executorManager, sessionInterests, sessionCacheService); + taskListenerManager, executorManager, sessionInterests, subscriberAssembleStrategy, + sessionDatumCacheDecorator, appRevisionCacheRegistry, firePushService); dataChangeFetchTask.setTaskEvent(event); getSingleTaskDispatcher().dispatch(dataChangeFetchTask.getTaskId(), dataChangeFetchTask, diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/AppRevisionNodeService.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/AppRevisionNodeService.java index 89678a26e..0c1110f49 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/AppRevisionNodeService.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/AppRevisionNodeService.java @@ -16,7 +16,6 @@ */ package com.alipay.sofa.registry.server.session.node.service; -import com.alipay.sofa.registry.core.model.AppRevisionKey; import com.alipay.sofa.registry.core.model.AppRevisionRegister; import java.util.List; @@ -25,7 +24,7 @@ public interface AppRevisionNodeService { void register(AppRevisionRegister appRevision); - List fetchMulti(List keys); + List fetchMulti(List keys); - List checkRevisions(String keysDigest); + List checkRevisions(String keysDigest); } diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/AppRevisionNodeServiceImpl.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/AppRevisionNodeServiceImpl.java index beced778f..851d44a16 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/AppRevisionNodeServiceImpl.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/AppRevisionNodeServiceImpl.java @@ -19,7 +19,6 @@ import com.alipay.sofa.registry.common.model.metaserver.CheckRevisionsRequest; import com.alipay.sofa.registry.common.model.metaserver.FetchRevisionsRequest; import com.alipay.sofa.registry.common.model.store.URL; -import com.alipay.sofa.registry.core.model.AppRevisionKey; import com.alipay.sofa.registry.core.model.AppRevisionRegister; import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; @@ -68,7 +67,7 @@ public URL getRequestUrl() { } } - public List checkRevisions(String keysDigest) { + public List checkRevisions(String keysDigest) { Request request = new Request() { @Override public CheckRevisionsRequest getRequestBody() { @@ -83,7 +82,7 @@ public URL getRequestUrl() { }; try { Response response = metaNodeExchanger.request(request); - return (List) response.getResult(); + return (List) response.getResult(); } catch (RequestException e) { LOGGER.error("check app revisions error! " + e.getMessage(), e); throw new RuntimeException("check app revisions error! " + e.getMessage(), e); @@ -91,7 +90,7 @@ public URL getRequestUrl() { } - public List fetchMulti(List keys) { + public List fetchMulti(List keys) { Request request = new Request() { @Override public FetchRevisionsRequest getRequestBody() { diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/ClientNodeService.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/ClientNodeService.java index 489492958..4a33cc09c 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/ClientNodeService.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/node/service/ClientNodeService.java @@ -22,8 +22,6 @@ import com.alipay.sofa.registry.core.model.AppRevisionKey; import com.alipay.sofa.registry.common.model.metaserver.ProvideData; -import java.util.List; - /** * @author shangyu.wh * @version $Id: ClientNodeService.java, v 0.1 2017-12-01 11:16 shangyu.wh Exp $ diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/predicate/RevisionPredicate.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/predicate/RevisionPredicate.java new file mode 100644 index 000000000..8ff5aa251 --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/predicate/RevisionPredicate.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.predicate; + +import com.alipay.sofa.registry.core.model.AppRevisionRegister; +import com.alipay.sofa.registry.server.session.cache.AppRevisionCacheRegistry; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.function.Predicate; + +/** + * + * @author xiaojian.xj + * @version $Id: RevisionPredicate.java, v 0.1 2020年11月13日 15:02 xiaojian.xj Exp $ + */ +public class RevisionPredicate { + + @Autowired + private AppRevisionCacheRegistry appRevisionCacheRegistry; + + public Predicate revisionPredicate(String dataInfoId) { + Predicate predicate = (revision) -> { + + AppRevisionRegister revisionRegister = appRevisionCacheRegistry.getRevision(revision); + if (revisionRegister == null) { + return false; + } + if (!revisionRegister.interfaces.containsKey(dataInfoId)) { + return false; + } + return true; + }; + return predicate; + } +} \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/predicate/ZonePredicate.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/predicate/ZonePredicate.java new file mode 100644 index 000000000..0219dd7e7 --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/predicate/ZonePredicate.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.predicate; + +import com.alipay.sofa.registry.core.model.ScopeEnum; +import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig; + +import java.util.function.Predicate; + +/** + * + * @author xiaojian.xj + * @version $Id: ZonePredicate.java, v 0.1 2020年11月12日 21:57 xiaojian.xj Exp $ + */ +public class ZonePredicate { + + public static Predicate zonePredicate(String dataId, String clientCell, + ScopeEnum scopeEnum, SessionServerConfig sessionServerConfig) { + Predicate zonePredicate = (zone) -> { + if (!clientCell.equals(zone)) { + if (ScopeEnum.zone == scopeEnum) { + // zone scope subscribe only return zone list + return true; + + } else if (ScopeEnum.dataCenter == scopeEnum || ScopeEnum.global == scopeEnum) { + // disable zone config + if (sessionServerConfig.isInvalidForeverZone(zone) && !sessionServerConfig + .isInvalidIgnored(dataId)) { + return true; + } + } + } + return false; + + }; + return zonePredicate; + } +} \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/FirePushService.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/FirePushService.java new file mode 100644 index 000000000..d18bb9a32 --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/FirePushService.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.push; + +import com.alipay.sofa.registry.common.model.AppRegisterServerDataBox; +import com.alipay.sofa.registry.common.model.dataserver.Datum; +import com.alipay.sofa.registry.common.model.store.AppPublisher; +import com.alipay.sofa.registry.common.model.store.Publisher; +import com.alipay.sofa.registry.common.model.store.Subscriber; +import com.alipay.sofa.registry.common.model.store.URL; +import com.alipay.sofa.registry.core.model.DataBox; +import com.alipay.sofa.registry.core.model.ReceivedData; +import com.alipay.sofa.registry.core.model.ScopeEnum; +import com.alipay.sofa.registry.log.Logger; +import com.alipay.sofa.registry.log.LoggerFactory; +import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig; +import com.alipay.sofa.registry.server.session.converter.ReceivedDataConverter; +import com.alipay.sofa.registry.server.session.predicate.RevisionPredicate; +import com.alipay.sofa.registry.server.session.predicate.ZonePredicate; +import com.alipay.sofa.registry.server.session.scheduler.task.Constant; +import com.alipay.sofa.registry.server.session.scheduler.task.PushTaskClosure; +import com.alipay.sofa.registry.task.listener.TaskEvent; +import com.alipay.sofa.registry.task.listener.TaskEvent.TaskType; +import com.alipay.sofa.registry.task.listener.TaskListenerManager; +import com.alipay.sofa.registry.util.DatumVersionUtil; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.util.CollectionUtils; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +/** + * + * @author xiaojian.xj + * @version $Id: FirePushService.java, v 0.1 2020年11月12日 21:41 xiaojian.xj Exp $ + */ +public class FirePushService { + + private static final Logger taskLogger = LoggerFactory.getLogger(FirePushService.class); + + @Autowired + private RevisionPredicate revisionPredicate; + + @Autowired + private SessionServerConfig sessionServerConfig; + + @Autowired + private TaskListenerManager taskListenerManager; + + public void fireUserDataElementPushTask(Subscriber subscriber, Datum datum) { + + List subscribers = Collections.singletonList(subscriber); + this.fireUserDataElementPushTask(subscriber.getSourceAddress(), datum, subscribers, null, + subscriber.getScope()); + } + + public void fireUserDataElementPushTask(URL clientUrl, Datum datum, + Collection subscribers, + PushTaskClosure pushTaskClosure, ScopeEnum scopeEnum) { + TaskEvent taskEvent; + if (scopeEnum == ScopeEnum.zone) { + taskEvent = new TaskEvent(TaskType.USER_DATA_ELEMENT_PUSH_TASK); + } else if (scopeEnum == ScopeEnum.dataCenter) { + taskEvent = new TaskEvent(TaskType.USER_DATA_ELEMENT_MULTI_PUSH_TASK); + } else { + return; + } + taskEvent.setTaskClosure(pushTaskClosure); + taskEvent.setSendTimeStamp(DatumVersionUtil.getRealTimestamp(datum.getVersion())); + taskEvent.setAttribute(Constant.PUSH_CLIENT_SUBSCRIBERS, subscribers); + taskEvent.setAttribute(Constant.PUSH_CLIENT_DATUM, datum); + taskEvent.setAttribute(Constant.PUSH_CLIENT_URL, clientUrl); + + int size = datum != null && datum.getPubMap() != null ? datum.getPubMap().size() : 0; + + taskLogger.info( + "send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={},subSize={},taskId={}", + taskEvent.getTaskType(), clientUrl, datum.getDataInfoId(), datum.getDataCenter(), size, + subscribers.size(), taskEvent.getTaskId()); + taskListenerManager.sendTaskEvent(taskEvent); + } + + public void fireReceivedDataMultiPushTask(Datum datum, Subscriber subscriber) { + List subscriberRegisterIdList = Collections.singletonList(subscriber + .getRegisterId()); + + List subscribers = Collections.singletonList(subscriber); + this.fireReceivedDataMultiPushTask(datum, subscriberRegisterIdList, subscribers, + subscriber.getScope(), subscriber, null); + } + + public void fireReceivedDataMultiPushTask(Datum datum, List subscriberRegisterIdList, + Collection subscribers, + ScopeEnum scopeEnum, Subscriber subscriber, + PushTaskClosure pushTaskClosure) { + String dataId = datum.getDataId(); + String clientCell = sessionServerConfig.getClientCell(subscriber.getCell()); + Predicate zonePredicate = ZonePredicate.zonePredicate(dataId, clientCell, + scopeEnum, sessionServerConfig); + + ReceivedData receivedData = ReceivedDataConverter.getReceivedDataMulti(datum, scopeEnum, + subscriberRegisterIdList, clientCell, zonePredicate); + + //trigger push to client node + Map parameter = new HashMap<>(); + parameter.put(receivedData, subscriber.getSourceAddress()); + TaskEvent taskEvent = new TaskEvent(parameter, TaskType.RECEIVED_DATA_MULTI_PUSH_TASK); + taskEvent.setTaskClosure(pushTaskClosure); + taskEvent.setAttribute(Constant.PUSH_CLIENT_SUBSCRIBERS, subscribers); + taskLogger.info("send {} taskURL:{},taskScope:{},,taskId={}", taskEvent.getTaskType(), + subscriber.getSourceAddress(), scopeEnum, taskEvent.getTaskId()); + taskListenerManager.sendTaskEvent(taskEvent); + } + + public void fireReceivedDataMultiPushTask(Subscriber subscriber) { + + List subscriberRegisterIdList = Collections.singletonList(subscriber + .getRegisterId()); + String clientCell = sessionServerConfig.getClientCell(subscriber.getCell()); + ReceivedData receivedData = ReceivedDataConverter.getReceivedDataMulti( + subscriber.getDataId(), subscriber.getGroup(), subscriber.getInstanceId(), + sessionServerConfig.getSessionServerDataCenter(), subscriber.getScope(), + subscriberRegisterIdList, clientCell); + + //trigger push to client node + Map parameter = new HashMap<>(); + parameter.put(receivedData, subscriber.getSourceAddress()); + TaskEvent taskEvent = new TaskEvent(parameter, + TaskEvent.TaskType.RECEIVED_DATA_MULTI_PUSH_TASK); + taskLogger.info("send {} taskURL:{},taskScope", taskEvent.getTaskType(), + subscriber.getSourceAddress(), receivedData.getScope()); + taskListenerManager.sendTaskEvent(taskEvent); + } +} \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/registry/SessionRegistry.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/registry/SessionRegistry.java index a974e75b8..641c2b8ba 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/registry/SessionRegistry.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/registry/SessionRegistry.java @@ -18,6 +18,22 @@ import com.alipay.sofa.registry.common.model.ConnectId; import com.alipay.sofa.registry.common.model.store.*; + +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import com.alipay.sofa.registry.server.session.cache.AppRevisionCacheRegistry; +import org.springframework.beans.factory.annotation.Autowired; + +import com.alipay.sofa.registry.common.model.Node; +import com.alipay.sofa.registry.common.model.RenewDatumRequest; +import com.alipay.sofa.registry.common.model.constants.ValueConstants; +import com.alipay.sofa.registry.common.model.store.Publisher; +import com.alipay.sofa.registry.common.model.store.StoreData; +import com.alipay.sofa.registry.common.model.store.Subscriber; +import com.alipay.sofa.registry.common.model.store.URL; +import com.alipay.sofa.registry.common.model.store.Watcher; import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; import com.alipay.sofa.registry.remoting.Channel; @@ -54,61 +70,64 @@ */ public class SessionRegistry implements Registry { - private static final Logger LOGGER = LoggerFactory.getLogger(SessionRegistry.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SessionRegistry.class); - protected static final Logger TASK_LOGGER = LoggerFactory.getLogger(SessionRegistry.class, - "[Task]"); + protected static final Logger TASK_LOGGER = LoggerFactory.getLogger(SessionRegistry.class, + "[Task]"); /** * store subscribers */ @Autowired - private Interests sessionInterests; + private Interests sessionInterests; /** * store watchers */ @Autowired - private Watchers sessionWatchers; + private Watchers sessionWatchers; /** * store publishers */ @Autowired - private DataStore sessionDataStore; + private DataStore sessionDataStore; /** * transfer data to DataNode */ @Autowired - private DataNodeService dataNodeService; + private DataNodeService dataNodeService; /** * trigger task com.alipay.sofa.registry.server.meta.listener process */ @Autowired - private TaskListenerManager taskListenerManager; + private TaskListenerManager taskListenerManager; @Autowired - private SessionServerConfig sessionServerConfig; + private SessionServerConfig sessionServerConfig; @Autowired - private Exchange boltExchange; + private Exchange boltExchange; @Autowired - private SessionRegistryStrategy sessionRegistryStrategy; + private SessionRegistryStrategy sessionRegistryStrategy; @Autowired private WrapperInterceptorManager wrapperInterceptorManager; @Autowired - private DataIdMatchStrategy dataIdMatchStrategy; + private DataIdMatchStrategy dataIdMatchStrategy; @Autowired - private WriteDataAcceptor writeDataAcceptor; + private WriteDataAcceptor writeDataAcceptor; @Autowired - private SlotTableCache slotTableCache; + private SlotTableCache slotTableCache; + private AppRevisionCacheRegistry appRevisionCacheRegistry; + + private volatile boolean enableDataRenewSnapshot = true; @Override public void register(StoreData storeData) { @@ -180,7 +199,7 @@ public void unRegister(StoreData storeData) { // All write operations to DataServer (pub/unPub/clientoff/renew/snapshot) // are handed over to WriteDataAcceptor writeDataAcceptor.accept(new PublisherWriteDataRequest(publisher, - WriteDataRequest.WriteDataRequestType.UN_PUBLISHER, slotTableCache + WriteDataRequest.WriteDataRequestType.UN_PUBLISHER, slotTableCache .getLeader(publisher.getDataInfoId()))); sessionRegistryStrategy.afterPublisherUnRegister(publisher); @@ -257,10 +276,18 @@ public void fetchChangDataProcess() { checkDataInfoIds.add(dataInfoId); } }); + Set fetchDataInfoIds = new HashSet<>(); + + for (String dataInfoId : checkDataInfoIds) { + fetchDataInfoIds.add(dataInfoId); + fetchDataInfoIds.addAll(appRevisionCacheRegistry.getApps(dataInfoId)); + } - LOGGER.info("[fetchChangDataProcess] Fetch data versions for {} dataInfoIds", checkDataInfoIds.size()); + LOGGER.info("[fetchChangDataProcess] Fetch data versions for {} dataInfoIds", + fetchDataInfoIds.size()); - Map/*dataInfoIds*/> map = calculateDataNode(checkDataInfoIds); + Map/*dataInfoIds*/> map = calculateDataNode( + fetchDataInfoIds); map.forEach((address, dataInfoIds) -> { @@ -304,82 +331,82 @@ public void remove(List connectIds) { subExisted = true; subMap.forEach((registerId, sub) -> { - if(isFireSubscriberPushEmptyTask(sub.getDataId())){ - fireSubscriberPushEmptyTask(sub); - } - }); - } + if (isFireSubscriberPushEmptyTask(sub.getDataId())) { + fireSubscriberPushEmptyTask(sub); + } + }); + } - if (pubExisted || subExisted) { - connectIdsAll.add(connectId); - } - }); - cancel(connectIdsAll); - } + if (pubExisted || subExisted) { + connectIdsAll.add(connectId); + } + }); + cancel(connectIdsAll); + } - protected boolean isFireSubscriberPushEmptyTask(String dataId) { - return dataIdMatchStrategy.match(dataId, () -> sessionServerConfig.getBlacklistSubDataIdRegex()); - } + protected boolean isFireSubscriberPushEmptyTask (String dataId){ + return dataIdMatchStrategy.match(dataId, () -> sessionServerConfig.getBlacklistSubDataIdRegex()); + } - private void fireSubscriberPushEmptyTask(Subscriber subscriber) { - //trigger empty data push - TaskEvent taskEvent = new TaskEvent(subscriber, - TaskEvent.TaskType.SUBSCRIBER_PUSH_EMPTY_TASK); - TASK_LOGGER.info("send " + taskEvent.getTaskType() + " taskEvent:{}", taskEvent); - getTaskListenerManager().sendTaskEvent(taskEvent); - } + private void fireSubscriberPushEmptyTask (Subscriber subscriber){ + //trigger empty data push + TaskEvent taskEvent = new TaskEvent(subscriber, + TaskEvent.TaskType.SUBSCRIBER_PUSH_EMPTY_TASK); + TASK_LOGGER.info("send " + taskEvent.getTaskType() + " taskEvent:{}", taskEvent); + getTaskListenerManager().sendTaskEvent(taskEvent); + } - public void cleanClientConnect() { - - Set connectIndexes = new HashSet<>(); - Set pubIndexes = sessionDataStore.getConnectIds(); - Set subIndexes = sessionInterests.getConnectIds(); - Set watchIndexes = sessionWatchers.getConnectIds(); - connectIndexes.addAll(pubIndexes); - connectIndexes.addAll(subIndexes); - connectIndexes.addAll(watchIndexes); - - Server sessionServer = boltExchange.getServer(sessionServerConfig.getServerPort()); - - List connectIds = new ArrayList<>(); - for (ConnectId connectId : connectIndexes) { - Channel channel = sessionServer.getChannel(new URL(connectId.getClientHostAddress(), - connectId.getClientPort())); - if (channel == null) { - connectIds.add(connectId); - LOGGER.warn("Client connect has not existed!it must be remove!connectId:{}", - connectId); + public void cleanClientConnect () { + + Set connectIndexes = new HashSet<>(); + Set pubIndexes = sessionDataStore.getConnectIds(); + Set subIndexes = sessionInterests.getConnectIds(); + Set watchIndexes = sessionWatchers.getConnectIds(); + connectIndexes.addAll(pubIndexes); + connectIndexes.addAll(subIndexes); + connectIndexes.addAll(watchIndexes); + + Server sessionServer = boltExchange.getServer(sessionServerConfig.getServerPort()); + + List connectIds = new ArrayList<>(); + for (ConnectId connectId : connectIndexes) { + Channel channel = sessionServer.getChannel(new URL(connectId.getClientHostAddress(), + connectId.getClientPort())); + if (channel == null) { + connectIds.add(connectId); + LOGGER.warn("Client connect has not existed!it must be remove!connectId:{}", + connectId); + } + } + if (!connectIds.isEmpty()) { + cancel(connectIds); } } - if (!connectIds.isEmpty()) { - cancel(connectIds); - } - } - /** - * Getter method for property sessionInterests. - * - * @return property value of sessionInterests - */ - protected Interests getSessionInterests() { - return sessionInterests; - } + /** + * Getter method for property sessionInterests. + * + * @return property value of sessionInterests + */ + protected Interests getSessionInterests () { + return sessionInterests; + } - /** - * Getter method for property sessionDataStore. - * - * @return property value of sessionDataStore - */ - protected DataStore getSessionDataStore() { - return sessionDataStore; - } + /** + * Getter method for property sessionDataStore. + * + * @return property value of sessionDataStore + */ + protected DataStore getSessionDataStore () { + return sessionDataStore; + } - /** - * Getter method for property taskListenerManager. - * - * @return property value of taskListenerManager - */ - protected TaskListenerManager getTaskListenerManager() { - return taskListenerManager; - } -} \ No newline at end of file + /** + * Getter method for property taskListenerManager. + * + * @return property value of taskListenerManager + */ + protected TaskListenerManager getTaskListenerManager () { + return taskListenerManager; + } + } \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/handler/DataChangeRequestHandler.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/handler/DataChangeRequestHandler.java index dc98338c3..395814ee9 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/handler/DataChangeRequestHandler.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/handler/DataChangeRequestHandler.java @@ -16,6 +16,19 @@ */ package com.alipay.sofa.registry.server.session.remoting.handler; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.Executor; + +import com.alipay.sofa.registry.common.model.dataserver.Datum; +import com.alipay.sofa.registry.server.session.cache.Value; +import com.alipay.remoting.util.StringUtils; +import com.alipay.sofa.registry.common.model.constants.ValueConstants; +import com.alipay.sofa.registry.common.model.store.DataInfo; +import com.alipay.sofa.registry.server.session.cache.AppRevisionCacheRegistry; +import com.alipay.sofa.registry.server.session.cache.SessionDatumCacheDecorator; +import org.springframework.beans.factory.annotation.Autowired; + import com.alipay.sofa.registry.common.model.Node.NodeType; import com.alipay.sofa.registry.common.model.sessionserver.DataChangeRequest; import com.alipay.sofa.registry.log.Logger; @@ -35,7 +48,6 @@ import java.util.concurrent.Executor; /** - * * @author kezhu.wukz * @author shangyu.wh * @version $Id: DataChangeRequestHandler.java, v 0.1 2017-12-12 15:09 shangyu.wh Exp $ @@ -66,6 +78,12 @@ public class DataChangeRequestHandler extends AbstractClientHandler interfaces = appRevisionCacheRegistry.getInterfaces(dataInfo + .getDataId()); + for (String interfaceDataInfoId : interfaces) { + DataChangeRequest request = new DataChangeRequest(); + request.setDataInfoId(interfaceDataInfoId); + request.setChangedDataInfoId(dataChangeRequest.getDataInfoId()); + request.setDataCenter(dataChangeRequest.getDataCenter()); + request.setVersion(dataChangeRequest.getVersion()); + fireChangFetch(request); + } + } else { + dataChangeRequest.setChangedDataInfoId(dataChangeRequest.getDataInfoId()); + fireChangFetch(dataChangeRequest); } - EXCHANGE_LOGGER.info( "Data version has change,and will fetch to update!Request={},URL={}", dataChangeRequest, channel.getRemoteAddress()); - - fireChangFetch(dataChangeRequest); } catch (Exception e) { LOGGER.error("DataChange Request error!", e); throw new RuntimeException("DataChangeRequest Request error!", e); @@ -111,11 +139,22 @@ public Object doHandle(Channel channel, DataChangeRequest dataChangeRequest) { return null; } + private void refreshMeta(Collection revisions) { + for (String revision : revisions) { + appRevisionCacheRegistry.getRevision(revision); + } + } + /** - * * @param dataChangeRequest */ private void fireChangFetch(DataChangeRequest dataChangeRequest) { + boolean result = sessionInterests.checkInterestVersions(dataChangeRequest.getDataCenter(), + dataChangeRequest.getDataInfoId(), dataChangeRequest.getVersion()); + + if (!result) { + return; + } dataChangeRequestHandlerStrategy.doFireChangFetch(dataChangeRequest); } diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchTask.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchTask.java index 8d83e5853..44dd904ba 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchTask.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchTask.java @@ -16,40 +16,40 @@ */ package com.alipay.sofa.registry.server.session.scheduler.task; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.function.Predicate; - +import com.alipay.remoting.util.StringUtils; import com.alipay.sofa.registry.common.model.dataserver.Datum; import com.alipay.sofa.registry.common.model.sessionserver.DataChangeRequest; +import com.alipay.sofa.registry.common.model.store.AppPublisher; import com.alipay.sofa.registry.common.model.store.BaseInfo.ClientVersion; +import com.alipay.sofa.registry.common.model.store.DataInfo; +import com.alipay.sofa.registry.common.model.store.Publisher; import com.alipay.sofa.registry.common.model.store.Subscriber; import com.alipay.sofa.registry.common.model.store.URL; -import com.alipay.sofa.registry.core.model.ReceivedData; +import com.alipay.sofa.registry.core.model.AssembleType; import com.alipay.sofa.registry.core.model.ScopeEnum; import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; +import com.alipay.sofa.registry.server.session.assemble.SubscriberAssembleStrategy; import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig; -import com.alipay.sofa.registry.server.session.cache.CacheAccessException; -import com.alipay.sofa.registry.server.session.cache.CacheService; -import com.alipay.sofa.registry.server.session.cache.DatumKey; -import com.alipay.sofa.registry.server.session.cache.Key; -import com.alipay.sofa.registry.server.session.cache.Key.KeyType; -import com.alipay.sofa.registry.server.session.cache.Value; -import com.alipay.sofa.registry.server.session.converter.ReceivedDataConverter; +import com.alipay.sofa.registry.server.session.cache.AppRevisionCacheRegistry; +import com.alipay.sofa.registry.server.session.cache.SessionDatumCacheDecorator; +import com.alipay.sofa.registry.server.session.push.FirePushService; import com.alipay.sofa.registry.server.session.scheduler.ExecutorManager; import com.alipay.sofa.registry.server.session.store.Interests; import com.alipay.sofa.registry.server.session.store.ReSubscribers; import com.alipay.sofa.registry.task.batcher.TaskProcessor.ProcessingResult; import com.alipay.sofa.registry.task.listener.TaskEvent; -import com.alipay.sofa.registry.task.listener.TaskEvent.TaskType; import com.alipay.sofa.registry.task.listener.TaskListenerManager; -import com.alipay.sofa.registry.util.DatumVersionUtil; +import org.springframework.util.CollectionUtils; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.stream.Collectors; /** * @@ -58,122 +58,147 @@ */ public class DataChangeFetchTask extends AbstractSessionTask { - private final static Logger LOGGER = LoggerFactory - .getLogger(DataChangeFetchTask.class); + private final static Logger LOGGER = LoggerFactory + .getLogger(DataChangeFetchTask.class); - private static final Logger taskLogger = LoggerFactory.getLogger( - DataChangeFetchTask.class, "[Task]"); - - private final SessionServerConfig sessionServerConfig; + private final SessionServerConfig sessionServerConfig; /** * trigger task com.alipay.sofa.registry.server.meta.listener process */ - private final TaskListenerManager taskListenerManager; + private final TaskListenerManager taskListenerManager; + + private final ExecutorManager executorManager; + + private DataChangeRequest dataChangeRequest; + + private final Interests sessionInterests; - private final ExecutorManager executorManager; + private final SessionDatumCacheDecorator sessionDatumCacheDecorator; - private DataChangeRequest dataChangeRequest; + private final FirePushService firePushService; - private final Interests sessionInterests; + private final SubscriberAssembleStrategy subscriberAssembleStrategy; - private final CacheService sessionCacheService; + private final AppRevisionCacheRegistry appRevisionCacheRegistry; public DataChangeFetchTask(SessionServerConfig sessionServerConfig, TaskListenerManager taskListenerManager, ExecutorManager executorManager, Interests sessionInterests, - CacheService sessionCacheService) { + SubscriberAssembleStrategy subscriberAssembleStrategy, + SessionDatumCacheDecorator sessionDatumCacheDecorator, + AppRevisionCacheRegistry appRevisionCacheRegistry, + FirePushService firePushService) { this.sessionServerConfig = sessionServerConfig; this.taskListenerManager = taskListenerManager; this.executorManager = executorManager; this.sessionInterests = sessionInterests; - this.sessionCacheService = sessionCacheService; + this.subscriberAssembleStrategy = subscriberAssembleStrategy; + this.appRevisionCacheRegistry = appRevisionCacheRegistry; + this.sessionDatumCacheDecorator = sessionDatumCacheDecorator; + this.firePushService = firePushService; } @Override public void execute() { - String localDataCenterID = sessionServerConfig.getSessionServerDataCenter(); + DataInfo dataInfo = DataInfo.valueOf(dataChangeRequest.getDataInfoId()); + Datum datum = sessionDatumCacheDecorator.getDatumCache(dataChangeRequest.getDataCenter(), + dataChangeRequest.getDataInfoId()); + // FIXME delete + // if (StringUtils.equals(APP_GROUP, dataInfo.getDataType())) { + // + // refreshMeta(datum.getPubMap().values()); + // + // //dataInfoId is app, get relate interfaces dataInfoId from cache + // Set interfaces = appRevisionCacheRegistry.getInterfaces(dataChangeRequest + // .getDataInfoId()); + // for (String interfaceDataInfoId : interfaces) { + // doExecute(interfaceDataInfoId); + // } + // } + doExecute(dataChangeRequest.getDataInfoId()); + } + private void doExecute(String dataInfoId) { + String localDataCenterID = sessionServerConfig.getSessionServerDataCenter(); boolean ifLocalDataCenter = localDataCenterID.equals(dataChangeRequest.getDataCenter()); - Datum datum = getDatumCache(); - - if (datum != null) { - PushTaskClosure pushTaskClosure = getTaskClosure(datum.getVersion()); - - for (ScopeEnum scopeEnum : ScopeEnum.values()) { - Map> map = getCache(scopeEnum); - if (map != null && !map.isEmpty()) { - LOGGER - .info( - "Get all subscribers to send from cache size:{},which dataInfoId:{} on dataCenter:{},scope:{}", - map.size(), dataChangeRequest.getDataInfoId(), - dataChangeRequest.getDataCenter(), scopeEnum); - for (Entry> entry : map.entrySet()) { - Map subscriberMap = entry.getValue(); - if (subscriberMap != null && !subscriberMap.isEmpty()) { - - //check subscriber push version - Collection subscribersSend = subscribersVersionCheck(subscriberMap - .values()); - if (subscribersSend.isEmpty()) { - continue; - } + for (ScopeEnum scopeEnum : ScopeEnum.values()) { + Map> map = getCache(dataInfoId, scopeEnum); + if (CollectionUtils.isEmpty(map)) { + continue; + } + + LOGGER.info("Get all subscribers to send from cache size:{},which dataInfoId:{} on dataCenter:{},scope:{}", + map.size(), dataInfoId, dataChangeRequest.getDataCenter(), scopeEnum); + for (Entry> entry : map.entrySet()) { + Map subscriberMap = entry.getValue(); + + if (CollectionUtils.isEmpty(subscriberMap)) { + continue; + } + + //check subscriber push version + Collection subscribers = subscribersVersionCheck(subscriberMap + .values()); + if (subscribers.isEmpty()) { + continue; + } + + //remove stopPush subscriber avoid push duplicate + evictReSubscribers(subscribers); + + List subscriberRegisterIdList = new ArrayList<>(subscriberMap.keySet()); - //remove stopPush subscriber avoid push duplicate - evictReSubscribers(subscribersSend); + for (AssembleType assembleType : AssembleType.values()) { - List subscriberRegisterIdList = new ArrayList<>( - subscriberMap.keySet()); + List subscribersSend = subscribers.stream().filter( + subscriber -> subscriber.getAssembleType() == assembleType) + .collect(Collectors.toList()); + if(subscribersSend.isEmpty()){ + continue; + } + Subscriber defaultSubscriber = subscribersSend.stream().findFirst().get(); + Datum datum = subscriberAssembleStrategy.assembleDatum(assembleType, + sessionServerConfig.getSessionServerDataCenter(), + defaultSubscriber); + + if (datum == null) { + LOGGER.error("Get publisher data error,which dataInfoId:" + + dataInfoId + " on dataCenter:" + + dataChangeRequest.getDataCenter()); + continue; + } + PushTaskClosure pushTaskClosure = getTaskClosure(dataInfoId, datum.getVersion()); + + switch (scopeEnum) { + case zone: + case dataCenter: + if (!ifLocalDataCenter) { + break; + } Subscriber subscriber = subscriberMap.values().iterator().next(); boolean isOldVersion = !ClientVersion.StoreData.equals(subscriber - .getClientVersion()); - - switch (scopeEnum) { - case zone: - if (ifLocalDataCenter) { - if (isOldVersion) { - fireUserDataElementPushTask(entry.getKey(), datum, - subscribersSend, pushTaskClosure); - } else { - fireReceivedDataMultiPushTask(datum, - subscriberRegisterIdList, subscribersSend, - ScopeEnum.zone, subscriber, pushTaskClosure); - } - } - break; - case dataCenter: - if (ifLocalDataCenter) { - if (isOldVersion) { - fireUserDataElementMultiPushTask(entry.getKey(), datum, - subscribersSend, pushTaskClosure); - } else { - fireReceivedDataMultiPushTask(datum, - subscriberRegisterIdList, subscribersSend, - scopeEnum, subscriber, pushTaskClosure); - } - } - break; - case global: - fireReceivedDataMultiPushTask(datum, subscriberRegisterIdList, - subscribersSend, scopeEnum, subscriber, pushTaskClosure); - break; - default: - LOGGER.warn("unknown scope, {}", subscriber); + .getClientVersion()); + if (isOldVersion) { + firePushService.fireUserDataElementPushTask(new URL(entry.getKey()), datum, subscribersSend, pushTaskClosure, scopeEnum); + } else { + firePushService.fireReceivedDataMultiPushTask(datum, subscriberRegisterIdList, subscribersSend, + scopeEnum, subscriber, pushTaskClosure); } - } + break; + case global: + firePushService.fireReceivedDataMultiPushTask(datum, subscriberRegisterIdList, + subscribersSend, scopeEnum, defaultSubscriber, pushTaskClosure); + break; + default: + LOGGER.warn("unknown scope, {}", scopeEnum); } + pushTaskClosure.start(); } } - - pushTaskClosure.start(); - - } else { - LOGGER.error("Get publisher data error,which dataInfoId:" - + dataChangeRequest.getDataInfoId() + " on dataCenter:" - + dataChangeRequest.getDataCenter()); } } @@ -188,13 +213,12 @@ private Collection subscribersVersionCheck(Collection su return subscribersSend; } - public PushTaskClosure getTaskClosure(Long version) { + public PushTaskClosure getTaskClosure(String dataInfoId, Long version) { //this for all this dataInfoId push result get and call back to change version PushTaskClosure pushTaskClosure = new PushTaskClosure(executorManager.getPushTaskCheckAsyncHashedWheelTimer(), - sessionServerConfig, dataChangeRequest.getDataInfoId()); + sessionServerConfig, dataInfoId); pushTaskClosure.setTaskClosure((status, task) -> { String dataCenter = dataChangeRequest.getDataCenter(); - String dataInfoId = dataChangeRequest.getDataInfoId(); Long changeVersion = dataChangeRequest.getVersion(); if (status == ProcessingResult.Success) { @@ -205,17 +229,20 @@ public PushTaskClosure getTaskClosure(Long version) { } boolean result = sessionInterests.checkAndUpdateInterestVersions(dataCenter, dataInfoId, version); if (result) { - LOGGER.info("Push all tasks success, dataCenter:{}, dataInfoId:{}, changeVersion:{}, pushVersion:{}, update!", dataCenter, + LOGGER.info("Push all tasks success, dataCenter:{}, dataInfoId:{}, changeVersion:{}, pushVersion:{}, update!", + dataCenter, dataInfoId, changeVersion, version); } else { LOGGER.info("Push all tasks success,but dataCenter:{} dataInfoId:{} version:{} need not update!", dataCenter, dataInfoId, version); - LOGGER.info("Push all tasks success, but dataCenter:{}, dataInfoId:{}, changeVersion:{}, pushVersion:{}, need not update!", + LOGGER.info( + "Push all tasks success, but dataCenter:{}, dataInfoId:{}, changeVersion:{}, pushVersion:{}, need not update!", dataCenter, dataInfoId, changeVersion, version); } } else { LOGGER.warn( - "Push tasks found error, subscribers version can not be update! dataCenter:{}, dataInfoId:{}, changeVersion:{}, pushVersion:{}", + "Push tasks found error, subscribers version can not be update! dataCenter:{}, dataInfoId:{}, changeVersion:{}, " + + "pushVersion:{}", dataCenter, dataInfoId, changeVersion, version); } }); @@ -322,6 +349,9 @@ private void fireUserDataElementMultiPushTask(InetSocketAddress address, Datum d taskEvent.getTaskType(), address, datum.getDataInfoId(), datum.getDataCenter(), size, subscribers.size(), taskEvent.getTaskId()); taskListenerManager.sendTaskEvent(taskEvent); + private Map> getCache(String dataInfoId, + ScopeEnum scopeEnum) { + return sessionInterests.querySubscriberIndex(dataInfoId, scopeEnum); } @Override diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionDataStore.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionDataStore.java index 37d49eaa8..a40b2d742 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionDataStore.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionDataStore.java @@ -22,6 +22,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import com.alipay.sofa.registry.common.model.ConnectId; +import com.alipay.sofa.registry.common.model.PublisherInternUtil; +import com.alipay.sofa.registry.common.model.constants.ValueConstants; import com.alipay.sofa.registry.common.model.store.Publisher; import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; @@ -49,7 +51,7 @@ public class SessionDataStore implements DataStore { @Override public void add(Publisher publisher) { - Publisher.internPublisher(publisher); + PublisherInternUtil.internPublisher(publisher); write.lock(); try { diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionInterests.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionInterests.java index 22dc2e477..0fb5d5cbd 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionInterests.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionInterests.java @@ -17,6 +17,21 @@ package com.alipay.sofa.registry.server.session.store; import com.alipay.sofa.registry.common.model.ConnectId; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import com.alipay.sofa.registry.common.model.constants.ValueConstants; +import com.alipay.sofa.registry.common.model.sessionserver.DatumSetVersion; +import org.springframework.beans.factory.annotation.Autowired; + import com.alipay.sofa.registry.common.model.store.Subscriber; import com.alipay.sofa.registry.common.model.store.WordCache; import com.alipay.sofa.registry.core.model.ScopeEnum; diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/DefaultAppRevisionHandlerStrategy.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/DefaultAppRevisionHandlerStrategy.java index 97acec47f..4f97d0b14 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/DefaultAppRevisionHandlerStrategy.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/DefaultAppRevisionHandlerStrategy.java @@ -20,11 +20,13 @@ import com.alipay.sofa.registry.core.model.RegisterResponse; import com.alipay.sofa.registry.remoting.Channel; import com.alipay.sofa.registry.server.session.cache.AppRevisionCacheRegistry; +import com.alipay.sofa.registry.task.listener.TaskEvent; +import com.alipay.sofa.registry.task.listener.TaskListenerManager; import org.springframework.beans.factory.annotation.Autowired; public class DefaultAppRevisionHandlerStrategy implements AppRevisionHandlerStrategy { @Autowired - AppRevisionCacheRegistry appRevisionCacheService; + private AppRevisionCacheRegistry appRevisionCacheService; @Override public void handleAppRevisionRegister(AppRevisionRegister appRevisionRegister, diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/impl/DefaultDataChangeRequestHandlerStrategy.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/impl/DefaultDataChangeRequestHandlerStrategy.java index 76c498598..32186a7ee 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/impl/DefaultDataChangeRequestHandlerStrategy.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/impl/DefaultDataChangeRequestHandlerStrategy.java @@ -43,4 +43,5 @@ public void doFireChangFetch(DataChangeRequest dataChangeRequest) { taskLogger.info("send " + taskEvent.getTaskType() + " taskEvent:{}", taskEvent); taskListenerManager.sendTaskEvent(taskEvent); } + } diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/impl/DefaultSessionRegistryStrategy.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/impl/DefaultSessionRegistryStrategy.java index ac76297ef..527a102a1 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/impl/DefaultSessionRegistryStrategy.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/impl/DefaultSessionRegistryStrategy.java @@ -77,18 +77,20 @@ public void doFetchChangDataProcess(Map> params) { + // if (CollectionUtils.isEmpty(params)) { + // return address; + // } + // URIBuilder builder = null; + // try { + // builder = new URIBuilder(address); + // + // for (Map.Entry> entry : params.entrySet()) { + // String key = entry.getKey(); + // for (String value : entry.getValue()) { + // builder.addParameter(key, value); + // + // } + // } + // return builder.build().toString(); + // } catch (URISyntaxException e) { + // LOGGER.error("build url error.", e); + // return null; + // } + //} + + public static String buildURL(String address, Map> params) { + List querys = new ArrayList<>(); + for (Map.Entry> entry : params.entrySet()) { + String key = entry.getKey(); + for (String value : entry.getValue()) { + querys.add(key + "=" + value); + } + } + String queryStr = String.join("&", querys); + return address + "?" + queryStr; + } +} \ No newline at end of file diff --git a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/store/AppPublisherConverterTest.java b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/store/AppPublisherConverterTest.java new file mode 100644 index 000000000..9a2d8d799 --- /dev/null +++ b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/store/AppPublisherConverterTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.store; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.alipay.sofa.registry.common.model.AppRegisterServerDataBox; +import com.alipay.sofa.registry.server.session.converter.AppRegisterConstant; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; + +/** + * + * @author xiaojian.xj + * @version $Id: AppPublisherConverterTest.java, v 0.1 2020年12月11日 17:34 xiaojian.xj Exp $ + */ +public class AppPublisherConverterTest { + + @Test + public void testConvert() { + + String box = "{\"url\":\"127.0.0.1:8080\",\"revision\":\"faf447f9a7990b4be937f0e06664ee41\",\"baseParams\":{\"a\":[\"2\"]}," + + "\"interfaceParams\":{\"com.alipay.test.Simple4#@#DEFAULT_INSTANCE_ID#@#DEFAULT_GROUP\":{},\"com.alipay.test" + + ".Simple5#@#DEFAULT_INSTANCE_ID#@#DEFAULT_GROUP\":{\"b\":[\"3\",\"4\"]}}}"; + + JSONObject jsonObject = JSON.parseObject(box); + AppRegisterServerDataBox serverDataBox = new AppRegisterServerDataBox(); + serverDataBox.setUrl(jsonObject.getString(AppRegisterConstant.URL_KEY)); + serverDataBox.setRevision(jsonObject.getString(AppRegisterConstant.REVISION_KEY)); + serverDataBox.setBaseParams(JSONObject.parseObject( + jsonObject.getString(AppRegisterConstant.BASE_PARAMS_KEY), HashMap.class)); + serverDataBox.setServiceParams(JSONObject.parseObject( + jsonObject.getString(AppRegisterConstant.INTERFACE_PARAMS_KEY), HashMap.class)); + Assert.assertEquals(serverDataBox.getBaseParams().get("a").size(), 1); + Assert.assertEquals( + serverDataBox.getServiceParams() + .get("com.alipay.test.Simple5#@#DEFAULT_INSTANCE_ID#@#DEFAULT_GROUP").get("b") + .size(), 2); + + } +} \ No newline at end of file diff --git a/test/src/test/java/com/alipay/sofa/registry/test/sync/SessionNotifyTest.java b/test/src/test/java/com/alipay/sofa/registry/test/sync/SessionNotifyTest.java index 1f932889e..33a8fca8d 100644 --- a/test/src/test/java/com/alipay/sofa/registry/test/sync/SessionNotifyTest.java +++ b/test/src/test/java/com/alipay/sofa/registry/test/sync/SessionNotifyTest.java @@ -16,6 +16,28 @@ */ package com.alipay.sofa.registry.test.sync; +import static com.alipay.sofa.registry.client.constants.ValueConstants.DEFAULT_DATA_CENTER; +import static com.alipay.sofa.registry.client.constants.ValueConstants.DEFAULT_GROUP; +import static com.alipay.sofa.registry.common.model.constants.ValueConstants.DEFAULT_INSTANCE_ID; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import com.alipay.sofa.registry.server.session.scheduler.task.DataChangeFetchTask; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.test.context.junit4.SpringRunner; + import com.alipay.remoting.Connection; import com.alipay.sofa.registry.common.model.CommonResponse; import com.alipay.sofa.registry.common.model.GenericResponse; @@ -226,7 +248,6 @@ public void doTest() throws Exception { }); }); } - while (true){ TimeUnit.SECONDS.sleep(10); }