From 9eac00a0851dc42cffbe67704c527cd606cf5e7f Mon Sep 17 00:00:00 2001 From: dzdx Date: Mon, 8 Feb 2021 23:04:20 +0800 Subject: [PATCH] cleanup app subscriber --- .../sofa/registry/client/api/Publisher.java | 2 - .../registration/PublisherRegistration.java | 10 - .../provider/AbstractInternalRegister.java | 58 +++-- .../client/provider/DefaultConfigurator.java | 8 +- .../client/provider/DefaultPublisher.java | 11 - .../provider/DefaultRegistryClient.java | 1 - .../client/provider/DefaultSubscriber.java | 7 +- .../registry/client/task/WorkerThread.java | 14 -- .../registry/core/model/AssembleType.java | 43 ---- .../core/model/SubscriberRegister.java | 26 -- .../common/model/SubscriberUtils.java | 15 +- .../model/client/pb/SubscriberRegisterPb.java | 125 ---------- .../pb/SubscriberRegisterPbOrBuilder.java | 10 - .../pb/SubscriberRegisterPbOuterClass.java | 9 +- .../model/constants/ValueConstants.java | 2 - .../registry/common/model/store/DataInfo.java | 3 - .../common/model/store/Subscriber.java | 105 +------- .../proto/SubscriberRegisterPb.proto | 1 - .../registry/common/model/store/SubTest.java | 5 +- .../converter/AssembleTypeConverter.java | 39 --- .../converter/AssembleTypeConverter.java~HEAD | 39 --- .../data/change/DataChangeEventCenter.java | 20 +- .../com/alipay/sofa/registry/AllTests.java | 5 +- .../converter/AppPublisherConverter.java | 127 ---------- .../session/converter/PublisherConverter.java | 6 - .../converter/ReceivedDataConverter.java | 140 +---------- .../converter/SubscriberConverter.java | 2 - .../pb/SubscriberRegisterConvertor.java | 1 - .../server/session/push/FirePushService.java | 232 ++---------------- .../session/push/PushDataGenerator.java | 33 +-- .../server/session/push/PushProcessor.java | 111 +++------ .../session/store/SessionInterests.java | 18 +- .../DefaultSubscriberHandlerStrategy.java | 13 +- .../server/session/WrapperInvocationTest.java | 2 +- .../server/session/store/DataCacheTest.java | 26 +- .../server/shared/util/DatumUtils.java | 21 ++ .../sofa/registry/test/app/PubSubTest.java | 73 ------ 37 files changed, 159 insertions(+), 1204 deletions(-) delete mode 100644 core/src/main/java/com/alipay/sofa/registry/core/model/AssembleType.java delete mode 100644 server/common/util/src/main/java/com/alipay/sofa/registry/converter/AssembleTypeConverter.java delete mode 100644 server/common/util/src/main/java/com/alipay/sofa/registry/converter/AssembleTypeConverter.java~HEAD delete mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/AppPublisherConverter.java delete mode 100644 test/src/test/java/com/alipay/sofa/registry/test/app/PubSubTest.java diff --git a/client/api/src/main/java/com/alipay/sofa/registry/client/api/Publisher.java b/client/api/src/main/java/com/alipay/sofa/registry/client/api/Publisher.java index 87c9db0c5..45330b543 100644 --- a/client/api/src/main/java/com/alipay/sofa/registry/client/api/Publisher.java +++ b/client/api/src/main/java/com/alipay/sofa/registry/client/api/Publisher.java @@ -30,6 +30,4 @@ public interface Publisher extends Register { * @param data the data */ void republish(String... data); - - void setPreRequest(Object request); } diff --git a/client/api/src/main/java/com/alipay/sofa/registry/client/api/registration/PublisherRegistration.java b/client/api/src/main/java/com/alipay/sofa/registry/client/api/registration/PublisherRegistration.java index 7a5812790..8194b77c5 100644 --- a/client/api/src/main/java/com/alipay/sofa/registry/client/api/registration/PublisherRegistration.java +++ b/client/api/src/main/java/com/alipay/sofa/registry/client/api/registration/PublisherRegistration.java @@ -24,8 +24,6 @@ */ public class PublisherRegistration extends BaseRegistration { - private Object preRequest; - /** * Instantiates a new Publisher registration. * @@ -45,12 +43,4 @@ public String toString() { return "PublisherRegistration{" + "dataId='" + dataId + '\'' + ", group='" + group + '\'' + ", appName='" + appName + '\'' + '}'; } - - public Object getPreRequest() { - return preRequest; - } - - public void setPreRequest(Object preRequest) { - this.preRequest = preRequest; - } } diff --git a/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/AbstractInternalRegister.java b/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/AbstractInternalRegister.java index 7d6bb1361..d0032ab7d 100644 --- a/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/AbstractInternalRegister.java +++ b/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/AbstractInternalRegister.java @@ -37,25 +37,45 @@ */ public abstract class AbstractInternalRegister implements Register { - /** */ + /** + * + */ private final AtomicLong initialVersion = new AtomicLong(VersionConstants.UNINITIALIZED_VERSION); - /** */ + /** + * + */ private AuthManager authManager; - /** */ + /** + * + */ private volatile boolean registered = false; - /** */ + /** + * + */ private volatile boolean enabled = true; - /** */ + /** + * + */ private volatile boolean refused = false; - /** */ + /** + * + */ private AtomicLong pubVersion = new AtomicLong(VersionConstants.UNINITIALIZED_VERSION); - /** */ + /** + * + */ private AtomicLong ackVersion = new AtomicLong(VersionConstants.UNINITIALIZED_VERSION); - /** */ + /** + * + */ private volatile long timestamp = System.currentTimeMillis(); - /** */ + /** + * + */ private volatile int registerCount = 0; - /** */ + /** + * + */ private volatile String requestId = UUID.randomUUID().toString(); private ReadWriteLock lock = new ReentrantReadWriteLock(); @@ -77,8 +97,6 @@ public abstract class AbstractInternalRegister implements Register { */ public abstract Object assembly(); - public abstract Object getPreRequest(); - /** * Is registered boolean. * @@ -111,8 +129,8 @@ void waitToSync() { * Sync ok. * * @param requestId the request id - * @param version the version - * @param refused the refused + * @param version the version + * @param refused the refused * @return the boolean */ public boolean syncOK(String requestId, long version, boolean refused) { @@ -186,7 +204,6 @@ public SyncTask assemblySyncTask() { SyncTask syncTask = new SyncTask(); syncTask.setRequestId(requestId); syncTask.setRequest(assembly()); - syncTask.setPreRequest(getPreRequest()); syncTask.setDone(isDone()); return syncTask; } finally { @@ -305,7 +322,7 @@ void setTimestamp(long timestamp) { /** * Sets auth signature. * - * @param register the register + * @param register the register */ void setAuthSignature(BaseRegister register) { // auth signature @@ -351,8 +368,6 @@ public static class SyncTask { private Object request; - private Object preRequest; - private boolean done; /** @@ -409,12 +424,5 @@ public void setDone(boolean done) { this.done = done; } - public Object getPreRequest() { - return preRequest; - } - - public void setPreRequest(Object preRequest) { - this.preRequest = preRequest; - } } } diff --git a/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultConfigurator.java b/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultConfigurator.java index d02d89a04..45965c426 100644 --- a/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultConfigurator.java +++ b/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultConfigurator.java @@ -35,6 +35,7 @@ /** * The type Default configurator. + * * @author zhuoyu.sjw * @version $Id : DefaultConfigurator.java, v 0.1 2018-04-18 14:41 zhuoyu.sjw Exp $$ */ @@ -56,7 +57,7 @@ public class DefaultConfigurator extends AbstractInternalRegister implements Con /** * Instantiates a new Default configurator. * - * @param config the config + * @param config the config * @param worker the worker */ public DefaultConfigurator(ConfiguratorRegistration registration, RegistryClientConfig config, @@ -150,11 +151,6 @@ public Object assembly() { return register; } - @Override - public Object getPreRequest() { - return null; - } - /** * Put configurator data. * diff --git a/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultPublisher.java b/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultPublisher.java index 3fc79e207..f305a7d11 100644 --- a/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultPublisher.java +++ b/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultPublisher.java @@ -46,7 +46,6 @@ public class DefaultPublisher extends AbstractInternalRegister implements Publis private Worker worker; private Collection dataList; private RegistryClientConfig config; - private Object preRequest; /** * Instantiates a new Default publisher. @@ -90,11 +89,6 @@ public void republish(String... data) { this.worker.schedule(new TaskEvent(this)); } - @Override - public void setPreRequest(Object preReq) { - this.preRequest = preReq; - } - /** * Unregister. */ @@ -155,11 +149,6 @@ public PublisherRegister assembly() { return register; } - @Override - public Object getPreRequest() { - return preRequest; - } - /** * @see Publisher#getDataId() */ diff --git a/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultRegistryClient.java b/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultRegistryClient.java index 6b5e37b1e..b58568d80 100644 --- a/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultRegistryClient.java +++ b/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultRegistryClient.java @@ -260,7 +260,6 @@ public Publisher register(PublisherRegistration registration, String... data) { publisher = new DefaultPublisher(registration, workerThread, registryClientConfig); ((DefaultPublisher) publisher).setAuthManager(authManager); - publisher.setPreRequest(registration.getPreRequest()); Publisher oldPublisher = registrationPublisherMap.putIfAbsent(registration, publisher); if (null != oldPublisher) { diff --git a/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultSubscriber.java b/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultSubscriber.java index 847360900..3cb07b572 100644 --- a/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultSubscriber.java +++ b/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultSubscriber.java @@ -63,7 +63,7 @@ public class DefaultSubscriber extends AbstractInternalRegister implements Subsc /** * Instantiates a new Default subscriber multi. * - * @param registration the registration + * @param registration the registration */ DefaultSubscriber(SubscriberRegistration registration, Worker worker, RegistryClientConfig config) { @@ -217,11 +217,6 @@ public SubscriberRegister assembly() { return register; } - @Override - public Object getPreRequest() { - return null; - } - public void putReceivedData(SegmentData segmentData, String localZone) { writeLock.lock(); try { diff --git a/client/impl/src/main/java/com/alipay/sofa/registry/client/task/WorkerThread.java b/client/impl/src/main/java/com/alipay/sofa/registry/client/task/WorkerThread.java index ebf3ed219..285c656d3 100644 --- a/client/impl/src/main/java/com/alipay/sofa/registry/client/task/WorkerThread.java +++ b/client/impl/src/main/java/com/alipay/sofa/registry/client/task/WorkerThread.java @@ -154,20 +154,6 @@ private void handleTask(TaskEvent event) { return; } - Object preRequest = syncTask.getPreRequest(); - if (preRequest != null) { - Object preResult = client.invokeSync(preRequest); - if (!(preResult instanceof RegisterResponse)) { - LOGGER.warn("[register] result type is wrong, {}", preResult); - return; - } - RegisterResponse preResponse = (RegisterResponse) preResult; - if (!preResponse.isSuccess()) { - LOGGER.info("[register] register to server failed, {}, {}", preRequest, - preResponse); - return; - } - } Object request = syncTask.getRequest(); Object result = client.invokeSync(request); diff --git a/core/src/main/java/com/alipay/sofa/registry/core/model/AssembleType.java b/core/src/main/java/com/alipay/sofa/registry/core/model/AssembleType.java deleted file mode 100644 index 830a69db5..000000000 --- a/core/src/main/java/com/alipay/sofa/registry/core/model/AssembleType.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alipay.sofa.registry.core.model; - -/** - * - * @author xiaojian.xj - * @version $Id: AssembleType.java, v 0.1 2020年10月27日 01:51 xiaojian.xj Exp $ - */ -public enum AssembleType { - - /** sub app: only sub data where dataId = app */ - sub_app, - - /** sub interface: only sub data where dataId = interface */ - sub_interface, - - /** sub app and interface: sub data from app and interface */ - sub_app_and_interface, ; - - public static boolean contains(String name) { - for (AssembleType subDataType : values()) { - if (subDataType.name().equals(name)) { - return true; - } - } - return false; - } -} \ No newline at end of file diff --git a/core/src/main/java/com/alipay/sofa/registry/core/model/SubscriberRegister.java b/core/src/main/java/com/alipay/sofa/registry/core/model/SubscriberRegister.java index a3e990fe5..11d117ce8 100644 --- a/core/src/main/java/com/alipay/sofa/registry/core/model/SubscriberRegister.java +++ b/core/src/main/java/com/alipay/sofa/registry/core/model/SubscriberRegister.java @@ -17,7 +17,6 @@ package com.alipay.sofa.registry.core.model; /** - * * @author zhuoyu.sjw * @version $Id: SubscriberRegister.java, v 0.1 2017-11-28 15:40 zhuoyu.sjw Exp $$ */ @@ -26,13 +25,6 @@ public class SubscriberRegister extends BaseRegister { private String scope; - /** - * interface: only sub interface - * app: only sub app - * app_and_interface: sub app and interface - */ - private String assembleType; - /** * Getter method for property scope. * @@ -51,24 +43,6 @@ public void setScope(String scope) { this.scope = scope; } - /** - * Getter method for property assembleType. - * - * @return property value of assembleType - */ - public String getAssembleType() { - return assembleType; - } - - /** - * Setter method for property assembleType. - * - * @param assembleType value to be assigned to property assembleType - */ - public void setAssembleType(String assembleType) { - this.assembleType = assembleType; - } - /** * To string string. * diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/SubscriberUtils.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/SubscriberUtils.java index 6e3f86a94..4255a33b6 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/SubscriberUtils.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/SubscriberUtils.java @@ -17,7 +17,6 @@ package com.alipay.sofa.registry.common.model; import com.alipay.sofa.registry.common.model.store.Subscriber; -import com.alipay.sofa.registry.core.model.AssembleType; import com.alipay.sofa.registry.core.model.ScopeEnum; import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; @@ -47,20 +46,18 @@ public static Map> groupBySourceAddre return ret; } - public static Map>> groupByAssembleAndScope(Collection subscribers) { + public static Map> groupByScope(Collection subscribers) { if (subscribers.isEmpty()) { return Collections.emptyMap(); } - Map>> ret = Maps.newHashMap(); + Map> ret = Maps.newHashMap(); for (Subscriber subscriber : subscribers) { - final AssembleType assembleType = subscriber.getAssembleType(); final ScopeEnum scopeEnum = subscriber.getScope(); - if (assembleType == null || scopeEnum == null) { - LOGGER.warn("Nil AssembleType or ScopeEnum, {}", subscriber); + if (scopeEnum == null) { + LOGGER.warn("Nil ScopeEnum, {}", subscriber); continue; } - Map> assembleTypeMapMap = ret.computeIfAbsent(assembleType, k -> Maps.newHashMap()); - List subList = assembleTypeMapMap.computeIfAbsent(scopeEnum, k -> Lists.newArrayList()); + List subList = ret.computeIfAbsent(scopeEnum, k -> Lists.newArrayList()); subList.add(subscriber); } return ret; @@ -68,7 +65,7 @@ public static Map>> groupByAssembl public static Set getPushedDataInfoIds(Collection subscribers) { final Set ret = new HashSet<>(256); - subscribers.forEach(s -> ret.addAll(s.getPushedDataInfoIds())); + subscribers.forEach(s -> ret.add(s.getDataInfoId())); return ret; } diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/client/pb/SubscriberRegisterPb.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/client/pb/SubscriberRegisterPb.java index e6824617a..f1dda707b 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/client/pb/SubscriberRegisterPb.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/client/pb/SubscriberRegisterPb.java @@ -34,7 +34,6 @@ private SubscriberRegisterPb(com.google.protobuf.GeneratedMessageV3.Builder b private SubscriberRegisterPb() { scope_ = ""; - assembleType_ = ""; } @java.lang.Override @@ -88,12 +87,6 @@ private SubscriberRegisterPb(com.google.protobuf.CodedInputStream input, break; } - case 26: { - java.lang.String s = input.readStringRequireUtf8(); - - assembleType_ = s; - break; - } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -176,39 +169,6 @@ public com.alipay.sofa.registry.common.model.client.pb.BaseRegisterPbOrBuilder g return getBaseRegister(); } - public static final int ASSEMBLETYPE_FIELD_NUMBER = 3; - private volatile java.lang.Object assembleType_; - - /** - * string assembleType = 3; - */ - public java.lang.String getAssembleType() { - java.lang.Object ref = assembleType_; - if (ref instanceof java.lang.String) { - return (java.lang.String) ref; - } else { - com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - assembleType_ = s; - return s; - } - } - - /** - * string assembleType = 3; - */ - public com.google.protobuf.ByteString getAssembleTypeBytes() { - java.lang.Object ref = assembleType_; - if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = com.google.protobuf.ByteString - .copyFromUtf8((java.lang.String) ref); - assembleType_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -229,9 +189,6 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io if (baseRegister_ != null) { output.writeMessage(2, getBaseRegister()); } - if (!getAssembleTypeBytes().isEmpty()) { - com.google.protobuf.GeneratedMessageV3.writeString(output, 3, assembleType_); - } unknownFields.writeTo(output); } @@ -247,9 +204,6 @@ public int getSerializedSize() { if (baseRegister_ != null) { size += com.google.protobuf.CodedOutputStream.computeMessageSize(2, getBaseRegister()); } - if (!getAssembleTypeBytes().isEmpty()) { - size += com.google.protobuf.GeneratedMessageV3.computeStringSize(3, assembleType_); - } size += unknownFields.getSerializedSize(); memoizedSize = size; return size; @@ -271,7 +225,6 @@ public boolean equals(final java.lang.Object obj) { if (hasBaseRegister()) { result = result && getBaseRegister().equals(other.getBaseRegister()); } - result = result && getAssembleType().equals(other.getAssembleType()); result = result && unknownFields.equals(other.unknownFields); return result; } @@ -289,8 +242,6 @@ public int hashCode() { hash = (37 * hash) + BASEREGISTER_FIELD_NUMBER; hash = (53 * hash) + getBaseRegister().hashCode(); } - hash = (37 * hash) + ASSEMBLETYPE_FIELD_NUMBER; - hash = (53 * hash) + getAssembleType().hashCode(); hash = (29 * hash) + unknownFields.hashCode(); memoizedHashCode = hash; return hash; @@ -431,8 +382,6 @@ public Builder clear() { baseRegister_ = null; baseRegisterBuilder_ = null; } - assembleType_ = ""; - return this; } @@ -462,7 +411,6 @@ public com.alipay.sofa.registry.common.model.client.pb.SubscriberRegisterPb buil } else { result.baseRegister_ = baseRegisterBuilder_.build(); } - result.assembleType_ = assembleType_; onBuilt(); return result; } @@ -514,10 +462,6 @@ public Builder mergeFrom(com.alipay.sofa.registry.common.model.client.pb.Subscri if (other.hasBaseRegister()) { mergeBaseRegister(other.getBaseRegister()); } - if (!other.getAssembleType().isEmpty()) { - assembleType_ = other.assembleType_; - onChanged(); - } this.mergeUnknownFields(other.unknownFields); onChanged(); return this; @@ -734,75 +678,6 @@ private com.google.protobuf.SingleFieldBuilderV3string assembleType = 3; - */ - public java.lang.String getAssembleType() { - java.lang.Object ref = assembleType_; - if (!(ref instanceof java.lang.String)) { - com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - assembleType_ = s; - return s; - } else { - return (java.lang.String) ref; - } - } - - /** - * string assembleType = 3; - */ - public com.google.protobuf.ByteString getAssembleTypeBytes() { - java.lang.Object ref = assembleType_; - if (ref instanceof String) { - com.google.protobuf.ByteString b = com.google.protobuf.ByteString - .copyFromUtf8((java.lang.String) ref); - assembleType_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - - /** - * string assembleType = 3; - */ - public Builder setAssembleType(java.lang.String value) { - if (value == null) { - throw new NullPointerException(); - } - - assembleType_ = value; - onChanged(); - return this; - } - - /** - * string assembleType = 3; - */ - public Builder clearAssembleType() { - - assembleType_ = getDefaultInstance().getAssembleType(); - onChanged(); - return this; - } - - /** - * string assembleType = 3; - */ - public Builder setAssembleTypeBytes(com.google.protobuf.ByteString value) { - if (value == null) { - throw new NullPointerException(); - } - checkByteStringIsUtf8(value); - - assembleType_ = value; - onChanged(); - return this; - } - public final Builder setUnknownFields(final com.google.protobuf.UnknownFieldSet unknownFields) { return super.setUnknownFieldsProto3(unknownFields); } diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/client/pb/SubscriberRegisterPbOrBuilder.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/client/pb/SubscriberRegisterPbOrBuilder.java index 4141b45e7..a23d4091b 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/client/pb/SubscriberRegisterPbOrBuilder.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/client/pb/SubscriberRegisterPbOrBuilder.java @@ -47,14 +47,4 @@ public interface SubscriberRegisterPbOrBuilder extends * .BaseRegisterPb baseRegister = 2; */ com.alipay.sofa.registry.common.model.client.pb.BaseRegisterPbOrBuilder getBaseRegisterOrBuilder(); - - /** - * string assembleType = 3; - */ - java.lang.String getAssembleType(); - - /** - * string assembleType = 3; - */ - com.google.protobuf.ByteString getAssembleTypeBytes(); } diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/client/pb/SubscriberRegisterPbOuterClass.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/client/pb/SubscriberRegisterPbOuterClass.java index 7375166eb..f7e24cc8c 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/client/pb/SubscriberRegisterPbOuterClass.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/client/pb/SubscriberRegisterPbOuterClass.java @@ -40,11 +40,10 @@ public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { private static com.google.protobuf.Descriptors.FileDescriptor descriptor; static { java.lang.String[] descriptorData = { "\n\032SubscriberRegisterPb.proto\032\024BaseRegist" - + "erPb.proto\"b\n\024SubscriberRegisterPb\022\r\n\005sc" + + "erPb.proto\"L\n\024SubscriberRegisterPb\022\r\n\005sc" + "ope\030\001 \001(\t\022%\n\014baseRegister\030\002 \001(\0132\017.BaseRe" - + "gisterPb\022\024\n\014assembleType\030\003 \001(\tB7\n/com.al" - + "ipay.sofa.registry.common.model.client.p" - + "bP\001Z\002pbb\006proto3" }; + + "gisterPbB7\n/com.alipay.sofa.registry.com" + + "mon.model.client.pbP\001Z\002pbb\006proto3" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { public com.google.protobuf.ExtensionRegistry assignDescriptors(com.google.protobuf.Descriptors.FileDescriptor root) { descriptor = root; @@ -59,7 +58,7 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors(com.google.protob internal_static_SubscriberRegisterPb_descriptor = getDescriptor().getMessageTypes().get(0); internal_static_SubscriberRegisterPb_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_SubscriberRegisterPb_descriptor, new java.lang.String[] { "Scope", - "BaseRegister", "AssembleType", }); + "BaseRegister", }); com.alipay.sofa.registry.common.model.client.pb.BaseRegisterPbOuterClass.getDescriptor(); } diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/constants/ValueConstants.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/constants/ValueConstants.java index 01401a2bb..74787beeb 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/constants/ValueConstants.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/constants/ValueConstants.java @@ -71,8 +71,6 @@ public class ValueConstants { public static final String DATA_SESSION_LEASE_SEC = "data.session.lease.sec#@#9600#@#CONFIG"; - public static final String SOFA_APP = "SOFA_APP"; - public static final String DISABLE_DATA_ID_CASE_SENSITIVE_SWITCH = "disable.dataId.case.sensitive"; /** * switch for dataId sensitive is disable or not, default value is false which means dataId is case sensitive diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/DataInfo.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/DataInfo.java index ac962b653..132a4e1f2 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/DataInfo.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/DataInfo.java @@ -189,7 +189,4 @@ public int hashCode() { return Objects.hash(dataInfoId); } - public boolean typeIsSofaApp() { - return ValueConstants.SOFA_APP.equals(dataType); - } } \ No newline at end of file diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/Subscriber.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/Subscriber.java index 2321fb216..c78f904b8 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/Subscriber.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/Subscriber.java @@ -19,7 +19,6 @@ import java.util.*; import com.alipay.sofa.registry.common.model.ElementType; -import com.alipay.sofa.registry.core.model.AssembleType; import com.alipay.sofa.registry.core.model.ScopeEnum; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -44,7 +43,6 @@ public class Subscriber extends BaseInfo { /** * */ - private AssembleType assembleType; /** * last push context @@ -73,82 +71,20 @@ public ElementType getElementType() { return elementType; } - /** - * Getter method for property assembleType. - * - * @return property value of assembleType - */ - public AssembleType getAssembleType() { - return assembleType; - } - - /** - * Setter method for property assembleType. - * - * @param assembleType value to be assigned to property assembleType - */ - public void setAssembleType(AssembleType assembleType) { - this.assembleType = assembleType; - } - - public synchronized Set getPushedDataInfoIds() { - final Set ret = new HashSet<>(4); - for (PushContext ctx : lastPushContexts.values()) { - ret.addAll(ctx.pushDatums.keySet()); - } - return ret; - } - - // check the version of one dataInfoId - public synchronized boolean checkVersion(String dataCenter, String dataInfoId, long version) { - final PushContext ctx = lastPushContexts.get(dataCenter); - if (ctx == null || ctx.fetchSeqEnd == 0) { - return true; - } - - DatumPushContext datumCtx = ctx.pushDatums.get(dataInfoId); - if (datumCtx == null) { - return true; - } - return datumCtx.version < version; - } - - // check all the version - public synchronized boolean checkVersions(String dataCenter, Map datumVersions) { + // check the version + public synchronized boolean checkVersion(String dataCenter, long version) { final PushContext ctx = lastPushContexts.get(dataCenter); - if (ctx == null || ctx.fetchSeqEnd == 0) { + if (ctx == null) { return true; } - - // has diff dataInfoId - if (!datumVersions.keySet().equals(ctx.pushDatums.keySet())) { - return true; - } - for (Map.Entry version : datumVersions.entrySet()) { - DatumPushContext datumCtx = ctx.pushDatums.get(version.getKey()); - if (datumCtx.version < version.getValue()) { - return true; - } - } - return false; + return ctx.pushVersion < version; } - public synchronized boolean checkAndUpdateVersion(String dataCenter, long pushVersion, Map datumVersion, - long fetchSeqStart, long fetchSeqEnd) { + public synchronized boolean checkAndUpdateVersion(String dataCenter, long pushVersion) { final PushContext ctx = lastPushContexts.computeIfAbsent(dataCenter, k -> new PushContext()); - if (ctx.pushVersion < pushVersion && ctx.fetchSeqEnd <= fetchSeqStart) { + if (ctx.pushVersion < pushVersion) { ctx.pushVersion = pushVersion; - ctx.fetchSeqEnd = fetchSeqEnd; - ctx.update(datumVersion); - return true; - } - return false; - } - - public synchronized boolean checkVersion(String dataCenter, long fetchSeqStart) { - final PushContext ctx = lastPushContexts.get(dataCenter); - if (ctx == null || ctx.fetchSeqEnd <= fetchSeqStart) { return true; } return false; @@ -160,7 +96,7 @@ public synchronized boolean hasPushed() { return false; } for (PushContext ctx : lastPushContexts.values()) { - if (ctx.fetchSeqEnd != 0) { + if (ctx.pushVersion != 0) { return true; } } @@ -216,35 +152,12 @@ public static Subscriber internSubscriber(Subscriber subscriber) { return subscriber; } - private static class DatumPushContext { - final long version; - - DatumPushContext(long version) { - this.version = version; - } - - @Override - public String toString() { - return "DatumPushContext{" + "version=" + version + '}'; - } - } - private static class PushContext { - //dataInfoId as key - Map pushDatums = Collections.emptyMap(); - long pushVersion; - long fetchSeqEnd; - - void update(Map datumVersions) { - Map map = new HashMap<>(datumVersions.size()); - datumVersions.forEach((k, v) -> map.put(k, new DatumPushContext(v))); - this.pushDatums = map; - } + long pushVersion; @Override public String toString() { - return "PushContext{" + "pushVersion=" + pushVersion + ", fetchSeqEnd=" + fetchSeqEnd - + ", pushDatums=" + pushDatums + '}'; + return "PushContext{" + "pushVersion=" + pushVersion + '}'; } } diff --git a/server/common/model/src/main/resources/proto/SubscriberRegisterPb.proto b/server/common/model/src/main/resources/proto/SubscriberRegisterPb.proto index dd6dcb4cc..f541683f3 100644 --- a/server/common/model/src/main/resources/proto/SubscriberRegisterPb.proto +++ b/server/common/model/src/main/resources/proto/SubscriberRegisterPb.proto @@ -8,6 +8,5 @@ import "BaseRegisterPb.proto"; message SubscriberRegisterPb { string scope = 1; BaseRegisterPb baseRegister = 2; - string assembleType = 3; } diff --git a/server/common/model/src/test/java/com/alipay/sofa/registry/common/model/store/SubTest.java b/server/common/model/src/test/java/com/alipay/sofa/registry/common/model/store/SubTest.java index 6b96e1754..56b38082a 100644 --- a/server/common/model/src/test/java/com/alipay/sofa/registry/common/model/store/SubTest.java +++ b/server/common/model/src/test/java/com/alipay/sofa/registry/common/model/store/SubTest.java @@ -22,7 +22,6 @@ import java.util.Map; /** - * * @author yuzhi.lyz * @version v 0.1 2020-12-26 11:46 yuzhi.lyz Exp $ */ @@ -49,7 +48,7 @@ public static void main(String[] args) { private static boolean test1(Map map, String dataCenter, long ver) { for (Subscriber s : map.values()) { - if (s.checkVersions(dataCenter, Collections.singletonMap("aa", ver))) { + if (s.checkVersion(dataCenter, ver)) { return true; } } @@ -60,7 +59,7 @@ private static void initMap(Map map, int count) { String key = String.valueOf(System.currentTimeMillis()); for (int i = 0; i < count; i++) { Subscriber sub = new Subscriber(); - sub.checkVersions(dataCenter, Collections.singletonMap("aa", 10L)); + sub.checkVersion(dataCenter, 10L); map.put(key + "_" + i, sub); } } diff --git a/server/common/util/src/main/java/com/alipay/sofa/registry/converter/AssembleTypeConverter.java b/server/common/util/src/main/java/com/alipay/sofa/registry/converter/AssembleTypeConverter.java deleted file mode 100644 index 4d8d30098..000000000 --- a/server/common/util/src/main/java/com/alipay/sofa/registry/converter/AssembleTypeConverter.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alipay.sofa.registry.converter; - -import com.alipay.sofa.registry.core.model.AssembleType; - -/** - * - * @author xiaojian.xj - * @version $Id: AssembleTypeConverter.java, v 0.1 2020年10月27日 02:06 xiaojian.xj Exp $ - */ -public class AssembleTypeConverter { - - /** - * subType convert func - * @param subType - * @return - */ - public static AssembleType convertToSubType(String subType) { - if (AssembleType.contains(subType)) { - return AssembleType.valueOf(subType); - } - return AssembleType.sub_app_and_interface; - } -} \ No newline at end of file diff --git a/server/common/util/src/main/java/com/alipay/sofa/registry/converter/AssembleTypeConverter.java~HEAD b/server/common/util/src/main/java/com/alipay/sofa/registry/converter/AssembleTypeConverter.java~HEAD deleted file mode 100644 index 4d8d30098..000000000 --- a/server/common/util/src/main/java/com/alipay/sofa/registry/converter/AssembleTypeConverter.java~HEAD +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alipay.sofa.registry.converter; - -import com.alipay.sofa.registry.core.model.AssembleType; - -/** - * - * @author xiaojian.xj - * @version $Id: AssembleTypeConverter.java, v 0.1 2020年10月27日 02:06 xiaojian.xj Exp $ - */ -public class AssembleTypeConverter { - - /** - * subType convert func - * @param subType - * @return - */ - public static AssembleType convertToSubType(String subType) { - if (AssembleType.contains(subType)) { - return AssembleType.valueOf(subType); - } - return AssembleType.sub_app_and_interface; - } -} \ No newline at end of file diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/DataChangeEventCenter.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/DataChangeEventCenter.java index eff461c8a..cddbc1db0 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/DataChangeEventCenter.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/DataChangeEventCenter.java @@ -22,7 +22,6 @@ import com.alipay.sofa.registry.common.model.dataserver.DatumVersion; import com.alipay.sofa.registry.common.model.sessionserver.DataChangeRequest; import com.alipay.sofa.registry.common.model.sessionserver.DataPushRequest; -import com.alipay.sofa.registry.common.model.store.DataInfo; import com.alipay.sofa.registry.common.model.store.Publisher; import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; @@ -34,7 +33,9 @@ import com.alipay.sofa.registry.task.KeyedThreadPoolExecutor; import com.alipay.sofa.registry.util.ConcurrentUtils; import com.alipay.sofa.registry.util.LoopRunnable; + import static com.alipay.sofa.registry.server.data.change.ChangeMetrics.*; + import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -343,20 +344,9 @@ public void runUnthrowable() { final Set revisions = new HashSet<>(64); final String dataCenter = event.getDataCenter(); for (String dataInfoId : event.getDataInfoIds()) { - final DataInfo dataInfo = DataInfo.valueOf(dataInfoId); - if (dataInfo.typeIsSofaApp()) { - // get datum is slower than get version - Datum datum = datumCache.get(dataCenter, dataInfoId); - if (datum != null) { - changes.put(dataInfoId, DatumVersion.of(datum.getVersion())); - revisions.addAll(datum.revisions()); - } - } else { - DatumVersion datumVersion = datumCache.getVersion(dataCenter, - dataInfoId); - if (datumVersion != null) { - changes.put(dataInfoId, datumVersion); - } + DatumVersion datumVersion = datumCache.getVersion(dataCenter, dataInfoId); + if (datumVersion != null) { + changes.put(dataInfoId, datumVersion); } } if (changes.isEmpty()) { diff --git a/server/server/meta/src/test/java/com/alipay/sofa/registry/AllTests.java b/server/server/meta/src/test/java/com/alipay/sofa/registry/AllTests.java index 8a730e91b..eae9a097e 100644 --- a/server/server/meta/src/test/java/com/alipay/sofa/registry/AllTests.java +++ b/server/server/meta/src/test/java/com/alipay/sofa/registry/AllTests.java @@ -64,9 +64,10 @@ DefaultProvideDataNotifierTest.class, LocalSlotManagerTest.class, NodeModifiedTest.class, BalanceTaskTest.class, TestAbstractNodeEventTest.class, DefaultSlotManagerTest.class, - DefaultSlotTableMonitorTest.class, DiskSlotTableRecorderTest.class, + DefaultSlotTableMonitorTest.class, + DiskSlotTableRecorderTest.class, SlotTableResourceTest.class, - //SlotMigrationIntegrationTest.class, + //SlotMigrationIntegrationTest.class, SlotTableBuilderTest.class, SlotBuilderTest.class, DataNodeComparatorTest.class, MigrateSlotGroupTest.class, ScheduledSlotArrangerTest.class, LeaderOnlyBalancerTest.class }) diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/AppPublisherConverter.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/AppPublisherConverter.java deleted file mode 100644 index dd3da8043..000000000 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/AppPublisherConverter.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alipay.sofa.registry.server.session.converter; - -import com.alipay.sofa.registry.common.model.AppRegisterServerDataBox; -import com.alipay.sofa.registry.common.model.ServerDataBox; -import com.alipay.sofa.registry.common.model.store.AppPublisher; -import com.alipay.sofa.registry.common.model.store.AppRevision; -import com.alipay.sofa.registry.common.model.store.DataInfo; -import com.alipay.sofa.registry.common.model.store.Publisher; -import com.alipay.sofa.registry.core.model.AppRevisionInterface; -import com.alipay.sofa.registry.log.Logger; -import com.alipay.sofa.registry.log.LoggerFactory; -import com.alipay.sofa.registry.server.session.cache.AppRevisionCacheRegistry; -import com.alipay.sofa.registry.server.shared.util.AddressUtil; -import com.google.common.collect.ArrayListMultimap; -import org.springframework.util.CollectionUtils; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; - -/** - * @author xiaojian.xj - * @version $Id: AppPublisherConverter.java, v 0.1 2020年11月24日 20:50 xiaojian.xj Exp $ - */ -public class AppPublisherConverter { - private static final Logger LOG = LoggerFactory.getLogger(AppPublisherConverter.class); - - public static Publisher convert(AppPublisher appPublisher, - AppRevisionCacheRegistry appRevisionCacheRegistry, - DataInfo dataInfo) { - Publisher publisher = new Publisher(); - String dataInfoId = dataInfo.getDataInfoId(); - fillCommonRegion(publisher, appPublisher, dataInfo); - List dataList = new ArrayList<>(); - for (AppRegisterServerDataBox appRegisterServerDataBox : appPublisher.getAppDataList()) { - AppRevision revisionRegister = appRevisionCacheRegistry - .getRevision(appRegisterServerDataBox.getRevision()); - if (revisionRegister == null) { - LOG.error("Can not find revision {}, url {}", - appRegisterServerDataBox.getRevision(), appRegisterServerDataBox.getUrl()); - throw new RuntimeException(String.format("can not find revision %s, url %s", - appRegisterServerDataBox.getRevision(), appRegisterServerDataBox.getUrl())); - } - - if (!revisionRegister.getInterfaceMap().containsKey(dataInfoId)) { - continue; - } - Map> params = extractParams(revisionRegister, - appRegisterServerDataBox, dataInfoId); - ServerDataBox serverDataBox = new ServerDataBox(AddressUtil.buildURL( - appRegisterServerDataBox.getUrl(), params)); - serverDataBox.object2bytes(); - dataList.add(serverDataBox); - } - publisher.setDataList(dataList); - return publisher; - - } - - private static Map> extractParams(AppRevision revisionRegister, - AppRegisterServerDataBox serverDataBox, - String dataInfoId) { - ArrayListMultimap multimap = ArrayListMultimap.create(); - - combineParams(revisionRegister.getBaseParams(), multimap); - combineParams(serverDataBox.getBaseParams(), multimap); - - if (!CollectionUtils.isEmpty(revisionRegister.getInterfaceMap())) { - AppRevisionInterface appRevisionInterface = revisionRegister.getInterfaceMap().get( - dataInfoId); - if (appRevisionInterface != null) { - combineParams(appRevisionInterface.getServiceParams(), multimap); - } - } - if (!CollectionUtils.isEmpty(serverDataBox.getInterfaceParams())) { - Map> params = serverDataBox.getInterfaceParams().get(dataInfoId); - combineParams(params, multimap); - } - return multimap.asMap(); - } - - private static void combineParams(Map> params, ArrayListMultimap multimap) { - if (CollectionUtils.isEmpty(params)) { - return; - } - params.forEach((key, value) -> multimap.putAll(key, value)); - } - - private static void fillCommonRegion(Publisher publisher, AppPublisher source, DataInfo dataInfo) { - - publisher.setAppName(source.getAppName()); - //ZONE MUST BE CURRENT SESSION ZONE - publisher.setCell(source.getCell()); - publisher.setClientId(source.getClientId()); - publisher.setDataId(dataInfo.getDataId()); - publisher.setGroup(dataInfo.getDataType()); - publisher.setInstanceId(dataInfo.getInstanceId()); - publisher.setRegisterId(source.getRegisterId()); - publisher.setProcessId(source.getProcessId()); - publisher.setVersion(source.getVersion()); - publisher.setRegisterTimestamp(source.getRegisterTimestamp()); - - publisher.setClientRegisterTimestamp(source.getClientRegisterTimestamp()); - publisher.setSourceAddress(source.getSourceAddress()); - - publisher.setClientVersion(source.getClientVersion()); - publisher.setDataInfoId(dataInfo.getDataInfoId()); - } - -} \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/PublisherConverter.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/PublisherConverter.java index eccfc0b80..6a0bc6534 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/PublisherConverter.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/PublisherConverter.java @@ -18,7 +18,6 @@ import com.alipay.sofa.registry.common.model.AppRegisterServerDataBox; import com.alipay.sofa.registry.common.model.ServerDataBox; -import com.alipay.sofa.registry.common.model.constants.ValueConstants; import com.alipay.sofa.registry.common.model.store.AppPublisher; import com.alipay.sofa.registry.common.model.store.BaseInfo.ClientVersion; import com.alipay.sofa.registry.common.model.store.DataInfo; @@ -96,11 +95,6 @@ public static void fillCommonRegion(Publisher publisher, PublisherRegister sourc * @return */ public static Publisher convert(PublisherRegister publisherRegister) { - - if (StringUtils.equalsIgnoreCase(ValueConstants.SOFA_APP, publisherRegister.getGroup())) { - return appPublisherConverter.convert(publisherRegister); - } - return publisherConverter.convert(publisherRegister); } diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/ReceivedDataConverter.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/ReceivedDataConverter.java index 15229049e..67b3d05d7 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/ReceivedDataConverter.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/ReceivedDataConverter.java @@ -24,66 +24,32 @@ import java.util.function.Predicate; import com.alipay.sofa.registry.common.model.ServerDataBox; -import com.alipay.sofa.registry.common.model.constants.ValueConstants; import com.alipay.sofa.registry.common.model.dataserver.Datum; import com.alipay.sofa.registry.common.model.store.DataInfo; import com.alipay.sofa.registry.common.model.store.Publisher; -import com.alipay.sofa.registry.common.model.store.Subscriber; import com.alipay.sofa.registry.core.model.DataBox; import com.alipay.sofa.registry.core.model.ReceivedConfigData; import com.alipay.sofa.registry.core.model.ReceivedData; import com.alipay.sofa.registry.core.model.ScopeEnum; import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; -import com.alipay.sofa.registry.util.DatumVersionUtil; /** * The type Received data converter. + * * @author shangyu.wh * @version $Id : ReceivedDataConverter.java, v 0.1 2017-12-13 13:42 shangyu.wh Exp $ */ public class ReceivedDataConverter { private static final Logger LOGGER = LoggerFactory.getLogger(ReceivedDataConverter.class); - /** - * convert for no datum from data node,just use for first get no datum - * if push empty data to subscriber must reset version,avoid client ignore this empty push - * @param dataId - * @param group - * @param instanceId - * @param dataCenter - * @param scope - * @param subscriberRegisterIdList - * @param regionLocal - * @return - */ - public static ReceivedData getReceivedDataMulti(String dataId, String group, String instanceId, - String dataCenter, ScopeEnum scope, - List subscriberRegisterIdList, - String regionLocal) { - ReceivedData receivedData = new ReceivedData(); - receivedData.setDataId(dataId); - receivedData.setGroup(group); - receivedData.setInstanceId(instanceId); - receivedData.setSubscriberRegistIds(subscriberRegisterIdList); - receivedData.setSegment(dataCenter); - receivedData.setScope(scope.name()); - //no datum set return version as mini as,avoid old client check - receivedData.setVersion(ValueConstants.DEFAULT_NO_DATUM_VERSION); - - receivedData.setLocalZone(regionLocal); - - Map> swizzMap = new HashMap<>(); - receivedData.setData(swizzMap); - return receivedData; - } - /** * Standard RunEnv - * @param datum the datum - * @param scope the scope - * @param subscriberRegisterIdList the subscriber register id list - * @param regionLocal the region local + * + * @param datum the datum + * @param scope the scope + * @param subscriberRegisterIdList the subscriber register id list + * @param regionLocal the region local * @return received data multi */ public static ReceivedData getReceivedDataMulti(Datum datum, ScopeEnum scope, @@ -152,100 +118,6 @@ private static void fillRegionDatas(List regionDatas, List datums, ScopeEnum scope, - List subscriberRegisterIdList, - Subscriber subscriber) { - ReceivedData receivedData = new ReceivedData(); - receivedData.setDataId(subscriber.getDataId()); - receivedData.setGroup(subscriber.getGroup()); - receivedData.setInstanceId(subscriber.getInstanceId()); - receivedData.setSubscriberRegistIds(subscriberRegisterIdList); - receivedData.setSegment(ValueConstants.DEFAULT_DATA_CENTER); - receivedData.setScope(scope.name()); - - String regionLocal = subscriber.getCell(); - receivedData.setLocalZone(regionLocal); - - receivedData.setVersion(DatumVersionUtil.nextId()); - - Map> swizzMap = new HashMap<>(); - - for (Entry entry : datums.entrySet()) { - Datum datum = entry.getValue(); - - Map publisherMap = datum.getPubMap(); - if (publisherMap.isEmpty()) { - continue; - } - - for (Entry publishers : publisherMap.entrySet()) { - Publisher publisher = publishers.getValue(); - List datas = publisher.getDataList(); - - String region = publisher.getCell(); - - if (ScopeEnum.zone == scope && !regionLocal.equals(region)) { - // zone scope subscribe only return zone list - continue; - } - - if (null == datas) { - datas = new ArrayList<>(); - } - - List regionDatas = swizzMap.computeIfAbsent(region, - k -> new ArrayList<>()); - fillRegionDatas(regionDatas, datas); - - } - } - - receivedData.setData(swizzMap); - return receivedData; - } - - /** - * Gets merge datum. - * - * @param datumMap the datum map - * @return the merge datum - */ - public static Datum getMergeDatum(Map datumMap) { - Datum merge = null; - Map mergePublisherMap = new HashMap<>(); - long version = 0; - for (Datum datum : datumMap.values()) { - if (datum.getDataId() == null) { - LOGGER.error("ReceivedData convert error,datum dataId is null,datum={}", datum); - continue; - } - if (null == merge) { - //new Datum avoid to change datumMap - merge = new Datum(datum.getDataInfoId(), datum.getDataCenter()); - merge.setDataId(datum.getDataId()); - merge.setGroup(datum.getGroup()); - merge.setInstanceId(datum.getInstanceId()); - } - mergePublisherMap.putAll(datum.getPubMap()); - version = Math.max(version, datum.getVersion()); - } - if (null == merge) { - return null; - } - merge.setVersion(version); - merge.addPublishers(mergePublisherMap); - merge.setDataCenter(ValueConstants.DEFAULT_DATA_CENTER); - return merge; - } - public static ReceivedConfigData getReceivedConfigData(ServerDataBox dataBox, DataInfo dataInfo, Long version) { ReceivedConfigData receivedConfigData = new ReceivedConfigData(); diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/SubscriberConverter.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/SubscriberConverter.java index db8fecf9e..5a8a4380f 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/SubscriberConverter.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/SubscriberConverter.java @@ -21,7 +21,6 @@ import com.alipay.sofa.registry.common.model.store.Subscriber; import com.alipay.sofa.registry.common.model.store.URL; import com.alipay.sofa.registry.common.model.store.Watcher; -import com.alipay.sofa.registry.converter.AssembleTypeConverter; import com.alipay.sofa.registry.converter.ScopeEnumConverter; import com.alipay.sofa.registry.core.model.ConfiguratorRegister; import com.alipay.sofa.registry.core.model.SubscriberRegister; @@ -64,7 +63,6 @@ public static Subscriber convert(SubscriberRegister subscriberRegister) { source.getGroup()); subscriber.setDataInfoId(dataInfo.getDataInfoId()); - subscriber.setAssembleType(AssembleTypeConverter.convertToSubType(subscriberRegister.getAssembleType())); return subscriber; }; diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/pb/SubscriberRegisterConvertor.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/pb/SubscriberRegisterConvertor.java index d961bb28b..b9c012707 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/pb/SubscriberRegisterConvertor.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/pb/SubscriberRegisterConvertor.java @@ -52,7 +52,6 @@ public static SubscriberRegister convert2Java(SubscriberRegisterPb subscriberReg subscriberRegister.setAttributes(subscriberRegisterPb.getBaseRegister().getAttributesMap()); subscriberRegister.setScope(subscriberRegisterPb.getScope()); - subscriberRegister.setAssembleType(subscriberRegisterPb.getAssembleType()); return subscriberRegister; } diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/FirePushService.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/FirePushService.java index 476534104..b27bc79d8 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/FirePushService.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/FirePushService.java @@ -21,7 +21,6 @@ import com.alipay.sofa.registry.common.model.dataserver.Datum; import com.alipay.sofa.registry.common.model.store.DataInfo; import com.alipay.sofa.registry.common.model.store.Subscriber; -import com.alipay.sofa.registry.core.model.AssembleType; import com.alipay.sofa.registry.core.model.ScopeEnum; import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; @@ -31,10 +30,10 @@ import com.alipay.sofa.registry.server.shared.util.DatumUtils; import com.alipay.sofa.registry.task.KeyedPreemptThreadPoolExecutor; import com.alipay.sofa.registry.task.KeyedThreadPoolExecutor; -import com.alipay.sofa.registry.util.DatumVersionUtil; + import static com.alipay.sofa.registry.server.session.push.PushMetrics.Fetch.*; + import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.CollectionUtils; @@ -43,7 +42,6 @@ import java.util.*; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Predicate; public class FirePushService { public static final long EXCEPT_MIN_VERSION = Long.MIN_VALUE; @@ -59,17 +57,12 @@ public class FirePushService { @Autowired private Interests sessionInterests; - @Autowired - private AppRevisionCacheRegistry appRevisionCacheRegistry; - private KeyedPreemptThreadPoolExecutor changeFetchExecutor; private KeyedThreadPoolExecutor registerFetchExecutor; @Autowired private PushProcessor pushProcessor; - private final AtomicLong fetchSeq = new AtomicLong(); - @PostConstruct public void init() { changeFetchExecutor = new KeyedPreemptThreadPoolExecutor("ChangeFetchExecutor", @@ -100,11 +93,9 @@ public boolean fireOnChange(String dataCenter, String dataInfoId, long expectVer } public boolean fireOnPushEmpty(Subscriber subscriber) { - processPush(true, DatumVersionUtil.nextId(), getDataCenterWhenPushEmpty(), - Collections.emptyMap(), Collections.singletonList(subscriber), Long.MAX_VALUE, - Long.MAX_VALUE); + Datum emptyDatum = DatumUtils.newEmptyDatum(subscriber, getDataCenterWhenPushEmpty()); + processPush(true, emptyDatum, Collections.singletonList(subscriber)); PUSH_EMPTY_COUNTER.inc(); - // use Long.MAX_VALUE as fetch.seq, could not push again after push empty LOGGER.info("firePushEmpty, {}", subscriber); return true; } @@ -127,15 +118,9 @@ public boolean fireOnRegister(Subscriber subscriber) { public boolean fireOnDatum(Datum datum) { DataInfo dataInfo = DataInfo.valueOf(datum.getDataInfoId()); - if (dataInfo.typeIsSofaApp()) { - LOGGER.error("unsupported DataType when fireOnDatum {}", dataInfo); - return false; - } Collection subscribers = sessionInterests.getInterestOfDatum(dataInfo .getDataInfoId()); - processPush(true, datum.getVersion(), datum.getDataCenter(), - Collections.singletonMap(datum.getDataInfoId(), datum), subscribers, - fetchSeq.incrementAndGet(), fetchSeq.incrementAndGet()); + processPush(true, datum, subscribers); PUSH_TEMP_COUNTER.inc(); return true; } @@ -146,7 +131,6 @@ protected String getDataCenterWhenPushEmpty() { } private void doExecuteOnChange(String dataCenter, String changeDataInfoId, long expectVersion) { - final long fetchSeqStart = fetchSeq.incrementAndGet(); final Datum datum = getDatum(dataCenter, changeDataInfoId, expectVersion); if (expectVersion != EXCEPT_MIN_VERSION) { if (datum == null) { @@ -166,151 +150,27 @@ private void doExecuteOnChange(String dataCenter, String changeDataInfoId, long } DataInfo dataInfo = DataInfo.valueOf(changeDataInfoId); - if (dataInfo.typeIsSofaApp()) { - if (datum != null) { - final Set revisions = datum.revisions(); - appRevisionCacheRegistry.refreshMeta(revisions); - } - onAppDatumChange(dataInfo, datum, fetchSeqStart, dataCenter); - } else { - onInterfaceDatumChange(dataInfo, datum, fetchSeqStart, dataCenter); - } + onDatumChange(dataInfo, datum, dataCenter); } - private void onAppDatumChange(DataInfo appDataInfo, Datum appDatum, long fetchSeqStart, String dataCenter) { - //dataInfoId is app, get relate interfaces dataInfoId from cache - Set interfaceInfoIds = appRevisionCacheRegistry.getInterfaces(appDataInfo.getDataId()); - - if (CollectionUtils.isEmpty(interfaceInfoIds)) { - LOGGER.warn("App no interfaces {}", appDataInfo.getDataInfoId()); - return; - } - for (String interfaceDataInfoId : interfaceInfoIds) { - Map>> groups = SubscriberUtils - .groupByAssembleAndScope(sessionInterests.getDatas(interfaceDataInfoId)); - if (groups.isEmpty()) { - continue; - } - for (Map.Entry>> group : groups - .entrySet()) { - final AssembleType assembleType = group.getKey(); - final Map datumMap = Maps.newHashMap(); - collect(datumMap, appDatum); - - switch (assembleType) { - // not care app change - case sub_interface: - continue; - case sub_app_and_interface: { - // not collect self, self has collected - datumMap.putAll(getAppDatumsOfInterface(interfaceDataInfoId, dataCenter, - appDataInfo.getInstanceId(), t -> !t.equals(appDataInfo.getDataId()))); - // add interface datum - Datum interfaceDatum = getDatum(dataCenter, interfaceDataInfoId, Long.MIN_VALUE); - collect(datumMap, interfaceDatum); - break; - } - case sub_app: { - // not collect self, self has collected - datumMap.putAll(getAppDatumsOfInterface(interfaceDataInfoId, dataCenter, - appDataInfo.getInstanceId(), t -> !t.equals(appDataInfo.getDataId()))); - break; - } - default: { - LOGGER.error("unsupported AssembleType:" + assembleType); - continue; - } - } - final long pushVersion = DatumVersionUtil.nextId(); - final long fetchEndSeq = fetchSeq.incrementAndGet(); - // push1.fetchSeq.start > push2.fetchSeq.end, means - // 1. push1.datum > push2.datum - // 2. push1.pushVersion > push2.pushVersion - if (CollectionUtils.isEmpty(datumMap)) { - // TODO datum changed, but - LOGGER.warn("empty push {}, dataCenter={}", interfaceDataInfoId, dataCenter); - } - for (Map.Entry> scopes : group.getValue().entrySet()) { - processPush(false, pushVersion, dataCenter, datumMap, scopes.getValue(), - fetchSeqStart, fetchEndSeq); - } - } - } - } + private void onDatumChange(DataInfo dataInfo, Datum datum, String dataCenter) { - private Map getAppDatumsOfInterface(String interfaceDataInfoId, - String dataCenter, String instanceId, - Predicate predicate) { - Set appDataIds = appRevisionCacheRegistry.getAppRevisions(interfaceDataInfoId) - .keySet(); - if (CollectionUtils.isEmpty(appDataIds)) { - return Collections.emptyMap(); + Map> scopes = SubscriberUtils.groupByScope(sessionInterests + .getDatas(dataInfo.getDataInfoId())); + if (datum == null) { + datum = DatumUtils.newEmptyDatum(dataInfo, dataCenter); + LOGGER.warn("empty push {}, dataCenter={}", dataInfo.getDataInfoId(), dataCenter); } - Map datumMap = Maps.newHashMap(); - for (String appDataId : appDataIds) { - if (predicate == null || predicate.test(appDataId)) { - String appDataInfoId = DataInfo.toDataInfoId(appDataId, instanceId, - ValueConstants.SOFA_APP); - Datum appDatum = getDatum(dataCenter, appDataInfoId, Long.MIN_VALUE); - collect(datumMap, appDatum); - } + for (Map.Entry> scope : scopes.entrySet()) { + processPush(false, datum, scope.getValue()); } - return datumMap; } - private void onInterfaceDatumChange(DataInfo interfaceDataInfo, Datum interfaceDatum, - long fetchSeqStart, String dataCenter) { - Map>> groups = SubscriberUtils - .groupByAssembleAndScope(sessionInterests.getDatas(interfaceDataInfo.getDataInfoId())); - - for (Map.Entry>> group : groups.entrySet()) { - final AssembleType assembleType = group.getKey(); - final Map datumMap = Maps.newHashMap(); - collect(datumMap, interfaceDatum); - long pushVersion = 0; - switch (assembleType) { - case sub_app: - // not care interface change - continue; - case sub_app_and_interface: { - datumMap.putAll(getAppDatumsOfInterface(interfaceDataInfo.getDataInfoId(), - dataCenter, interfaceDataInfo.getInstanceId(), null)); - break; - } - case sub_interface: { - // only care the interface - if (interfaceDatum != null) { - pushVersion = interfaceDatum.getVersion(); - } - break; - } - default: { - LOGGER.error("unsupported AssembleType:" + assembleType); - continue; - } - } - if (pushVersion <= 0) { - pushVersion = DatumVersionUtil.nextId(); - } - final long fetchSeqEnd = fetchSeq.incrementAndGet(); - if (CollectionUtils.isEmpty(datumMap)) { - LOGGER.warn("empty push {}, dataCenter={}", interfaceDataInfo.getDataInfoId(), - dataCenter); - } - for (Map.Entry> scopes : group.getValue().entrySet()) { - processPush(false, pushVersion, dataCenter, datumMap, scopes.getValue(), - fetchSeqStart, fetchSeqEnd); - } - } - } - - private void processPush(boolean noDelay, long pushVersion, String dataCenter, - Map datumMap, Collection subscriberList, - long fetchSeqStart, long fetchSeqEnd) { + private void processPush(boolean noDelay, Datum datum, Collection subscriberList) { if (subscriberList.isEmpty()) { return; } - subscriberList = subscribersPushCheck(dataCenter, DatumUtils.getVesions(datumMap), + subscriberList = subscribersPushCheck(datum.getDataCenter(), datum.getVersion(), subscriberList); if (CollectionUtils.isEmpty(subscriberList)) { return; @@ -320,8 +180,7 @@ private void processPush(boolean noDelay, long pushVersion, String dataCenter, for (Map.Entry> e : group.entrySet()) { final InetSocketAddress addr = e.getKey(); final Map subscriberMap = e.getValue(); - pushProcessor.firePush(noDelay, pushVersion, dataCenter, addr, subscriberMap, datumMap, - fetchSeqStart, fetchSeqEnd); + pushProcessor.firePush(noDelay, addr, subscriberMap, datum); } } @@ -344,11 +203,11 @@ private Datum getDatum(String dataCenter, String dataInfoId, long expectVersion) return value == null ? null : (Datum) value.getPayload(); } - private List subscribersPushCheck(String dataCenter, Map versions, + private List subscribersPushCheck(String dataCenter, Long version, Collection subscribers) { List subscribersSend = Lists.newArrayList(); for (Subscriber subscriber : subscribers) { - if (subscriber.checkVersions(dataCenter, versions)) { + if (subscriber.checkVersion(dataCenter, version)) { subscribersSend.add(subscriber); } } @@ -378,62 +237,17 @@ public void run() { } private void doExecuteOnSubscriber(String dataCenter, Subscriber subscriber) { - final AssembleType assembleType = subscriber.getAssembleType(); final String subDataInfoId = subscriber.getDataInfoId(); - final long fetchSeqStart = fetchSeq.incrementAndGet(); - long pushVersion = 0; - final Map datumMap = Maps.newHashMap(); - switch (assembleType) { - case sub_interface: { - // only care the interface - Datum datum = getDatum(dataCenter, subDataInfoId, Long.MIN_VALUE); - if (datum != null) { - // sub_interface, use the datum.version as push.version - pushVersion = datum.getVersion(); - } - collect(datumMap, datum); - break; - } - case sub_app_and_interface: { - // try get app - datumMap.putAll(getAppDatumsOfInterface(subDataInfoId, dataCenter, - subscriber.getInstanceId(), null)); - // try get interface - Datum datum = getDatum(dataCenter, subDataInfoId, Long.MIN_VALUE); - collect(datumMap, datum); - break; - } - - case sub_app: { - datumMap.putAll(getAppDatumsOfInterface(subDataInfoId, dataCenter, - subscriber.getInstanceId(), null)); - break; - } - - default: - LOGGER.error("unsupported assembleType {}, {}", assembleType, subscriber); - return; - } - if (pushVersion <= 0) { - pushVersion = DatumVersionUtil.nextId(); - } - final long fetchSeqEnd = fetchSeq.incrementAndGet(); - if (CollectionUtils.isEmpty(datumMap)) { - // subscriber register allow push empty + Datum datum = getDatum(dataCenter, subDataInfoId, Long.MIN_VALUE); + if (datum == null) { + datum = DatumUtils.newEmptyDatum(subscriber, dataCenter); LOGGER.warn("empty push, dataCenter={}, {}", dataCenter, subscriber); } if (subscriber.hasPushed()) { return; } - processPush(true, pushVersion, sessionServerConfig.getSessionServerDataCenter(), datumMap, - Collections.singletonList(subscriber), fetchSeqStart, fetchSeqEnd); - } - - private void collect(Map datumMap, Datum datum) { - if (datum != null) { - datumMap.put(datum.getDataInfoId(), datum); - } + processPush(true, datum, Collections.singletonList(subscriber)); } private final class RegisterTask implements Runnable { diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushDataGenerator.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushDataGenerator.java index ebe298033..1c5211959 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushDataGenerator.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushDataGenerator.java @@ -21,8 +21,6 @@ import com.alipay.sofa.registry.common.model.store.*; import com.alipay.sofa.registry.core.model.ReceivedData; import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig; -import com.alipay.sofa.registry.server.session.cache.AppRevisionCacheRegistry; -import com.alipay.sofa.registry.server.session.converter.AppPublisherConverter; import com.alipay.sofa.registry.server.session.converter.ReceivedDataConverter; import com.alipay.sofa.registry.server.session.converter.pb.ReceivedDataConvertor; import com.alipay.sofa.registry.server.session.predicate.ZonePredicate; @@ -35,10 +33,7 @@ public class PushDataGenerator { @Autowired - private SessionServerConfig sessionServerConfig; - - @Autowired - private AppRevisionCacheRegistry appRevisionCacheRegistry; + private SessionServerConfig sessionServerConfig; public Object createPushData(Datum datum, Map subscriberMap) { SubscriberUtils.getAndAssertHasSameScope(subscriberMap.values()); @@ -64,30 +59,4 @@ public Object createPushData(Datum datum, Map subscriberMap) return receivedData; } - public Datum mergeDatum(Subscriber subscriber, String dataCenter, Map datumMap, - long pushVersion) { - DataInfo dataInfo = DataInfo.valueOf(subscriber.getDataInfoId()); - Datum ret = new Datum(); - ret.setVersion(pushVersion); - ret.setDataInfoId(dataInfo.getDataInfoId()); - ret.setDataCenter(dataCenter); - ret.setDataId(dataInfo.getDataId()); - ret.setInstanceId(dataInfo.getInstanceId()); - ret.setGroup(dataInfo.getDataType()); - for (Datum datum : datumMap.values()) { - for (Publisher publisher : datum.getPubMap().values()) { - if (publisher instanceof AppPublisher) { - AppPublisher appPublisher = (AppPublisher) publisher; - Publisher newPublisher = AppPublisherConverter.convert(appPublisher, - appRevisionCacheRegistry, dataInfo); - if (newPublisher.getDataList().size() > 0) { - ret.addPublisher(newPublisher); - } - } else { - ret.addPublisher(publisher); - } - } - } - return ret; - } } diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushProcessor.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushProcessor.java index 55a547e2a..90cbf36aa 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushProcessor.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushProcessor.java @@ -20,7 +20,6 @@ import com.alipay.sofa.registry.common.model.dataserver.Datum; import com.alipay.sofa.registry.common.model.store.BaseInfo; import com.alipay.sofa.registry.common.model.store.Subscriber; -import com.alipay.sofa.registry.core.model.AssembleType; import com.alipay.sofa.registry.core.model.ScopeEnum; import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; @@ -29,7 +28,6 @@ import com.alipay.sofa.registry.remoting.exchange.RequestChannelClosedException; import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig; import com.alipay.sofa.registry.server.session.node.service.ClientNodeService; -import com.alipay.sofa.registry.server.shared.util.DatumUtils; import com.alipay.sofa.registry.task.KeyedThreadPoolExecutor; import com.alipay.sofa.registry.task.MetricsableThreadPoolExecutor; import com.alipay.sofa.registry.trace.TraceID; @@ -37,7 +35,9 @@ import com.alipay.sofa.registry.util.WakeUpLoopRunnable; import com.google.common.collect.Lists; import com.google.common.collect.Maps; + import static com.alipay.sofa.registry.server.session.push.PushMetrics.Push.*; + import org.springframework.beans.factory.annotation.Autowired; import javax.annotation.PostConstruct; @@ -90,56 +90,39 @@ private boolean firePush(PushTask pushTask) { PENDING_NEW_COUNTER.inc(); return true; } - boolean conflict = false; - PushTask prev = null; + PushTask prev; pendingLock.lock(); try { prev = pendingTasks.get(key); if (prev == null) { pendingTasks.put(key, pushTask); PENDING_NEW_COUNTER.inc(); - } else if (pushTask.afterThan(prev)) { + } else { // update the expireTimestamp as prev's, avoid the push block by the continues fire pushTask.expireTimestamp = prev.expireTimestamp; pendingTasks.put(key, pushTask); PENDING_REPLACE_COUNTER.inc(); - } else { - conflict = true; } } finally { pendingLock.unlock(); } - if (!conflict) { - if (pushTask.noDelay) { - watchDog.wakeup(); - } - return true; - } else { - PENDING_CONFLICT_COUNTER.inc(); - LOGGER.info("[ConflictPending] key={}, prev={}, {}, prev {}={} > {}-{}", key, - prev.taskID, prev.pushingTaskKey, prev.fetchSeqEnd, pushTask.taskID, - pushTask.fetchSeqStart); - return false; + if (pushTask.noDelay) { + watchDog.wakeup(); } + return true; } - protected List createPushTask(boolean noDelay, long pushVersion, String dataCenter, - InetSocketAddress addr, - Map subscriberMap, - Map datumMap, long fetchStartSeq, - long fetchEndSeq) { - PushTask pushTask = new PushTask(noDelay, pushVersion, dataCenter, addr, subscriberMap, - datumMap, fetchStartSeq, fetchEndSeq); + protected List createPushTask(boolean noDelay, InetSocketAddress addr, + Map subscriberMap, Datum datum) { + PushTask pushTask = new PushTask(noDelay, addr, subscriberMap, datum); // wait to merge to debouncing pushTask.expireAfter(sessionServerConfig.getPushDataTaskDebouncingMillis()); return Collections.singletonList(pushTask); } - void firePush(boolean noDelay, long pushVersion, String dataCenter, InetSocketAddress addr, - Map subscriberMap, Map datumMap, - long fetchSeqStart, long fetchSeqEnd) { - List fires = createPushTask(noDelay, pushVersion, dataCenter, addr, - subscriberMap, datumMap, fetchSeqStart, fetchSeqEnd); + void firePush(boolean noDelay, InetSocketAddress addr, Map subscriberMap, + Datum datum) { + List fires = createPushTask(noDelay, addr, subscriberMap, datum); for (PushTask task : fires) { boolean fire = firePush(task); LOGGER.info("fire push={}, {}", fire, task); @@ -206,22 +189,8 @@ private boolean checkPushing(PushTask task, PushingTaskKey pushingTaskKey) { // check the pushing task final PushTask prev = pushingTasks.get(pushingTaskKey); if (prev == null) { - // check the subscriber version - for (Subscriber subscriber : task.subscriberMap.values()) { - // TODO need remove the conflict subscriber - if (!subscriber.checkVersion(task.dataCenter, task.fetchSeqStart)) { - LOGGER.warn("conflict push {}, {}, subscriber={}", task.taskID, pushingTaskKey, - subscriber.printPushContext()); - return false; - } - } return true; } - if (!task.afterThan(prev)) { - LOGGER.warn("prev push is newly, {}, prev={}, now={}", pushingTaskKey, prev.taskID, - task.taskID); - return false; - } final long span = System.currentTimeMillis() - prev.pushTimestamp; if (span > sessionServerConfig.getClientNodeExchangeTimeOut() * 2) { // force to remove the prev task @@ -258,11 +227,9 @@ class PushTask implements Runnable { volatile long pushTimestamp; final boolean noDelay; - final long fetchSeqStart; - final long fetchSeqEnd; final String dataCenter; final long pushVersion; - final Map datumMap; + final Datum datum; final InetSocketAddress addr; final Map subscriberMap; final Subscriber subscriber; @@ -270,28 +237,22 @@ class PushTask implements Runnable { final PushingTaskKey pushingTaskKey; - PushTask(boolean noDelay, long pushVersion, String dataCenter, InetSocketAddress addr, - Map subscriberMap, Map datumMap, - long fetchSeqStart, long fetchSeqEnd) { + PushTask(boolean noDelay, InetSocketAddress addr, Map subscriberMap, + Datum datum) { this.taskID = TraceID.newTraceID(); this.noDelay = noDelay; - this.dataCenter = dataCenter; - this.pushVersion = pushVersion; - this.datumMap = datumMap; + this.dataCenter = datum.getDataCenter(); + this.pushVersion = datum.getVersion(); + this.datum = datum; this.addr = addr; this.subscriberMap = subscriberMap; - this.fetchSeqStart = fetchSeqStart; - this.fetchSeqEnd = fetchSeqEnd; this.subscriber = subscriberMap.values().iterator().next(); this.pushingTaskKey = new PushingTaskKey(subscriber.getDataInfoId(), addr, - subscriber.getScope(), subscriber.getAssembleType(), subscriber.getClientVersion()); + subscriber.getScope(), subscriber.getClientVersion()); } protected Object createPushData() { - Datum merged = pushDataGenerator.mergeDatum(subscriber, dataCenter, datumMap, - pushVersion); - LOGGER.info("merged {}, from {}, {}, {}", merged, datumMap, taskID, pushingTaskKey); - return pushDataGenerator.createPushData(merged, subscriberMap); + return pushDataGenerator.createPushData(datum, subscriberMap); } void expireAfter(long intervalMs) { @@ -332,10 +293,6 @@ public void run() { } } - boolean afterThan(PushTask t) { - return fetchSeqStart >= t.fetchSeqEnd; - } - PendingTaskKey pendingKeyOf() { return new PendingTaskKey(dataCenter, addr, subscriber.getDataInfoId(), subscriberMap.keySet()); @@ -346,12 +303,11 @@ public String toString() { StringBuilder sb = new StringBuilder(512); sb.append("PushTask{").append(subscriber.getDataInfoId()).append(",ID=").append(taskID) .append(",createT=").append(createTimestamp).append(",expireT=") - .append(expireTimestamp).append(",seqStart=").append(fetchSeqStart) - .append(",seqEnd=").append(fetchSeqEnd).append(",DC=").append(dataCenter) - .append(",ver=").append(pushVersion).append(",addr=").append(addr) - .append(",scope=").append(subscriber.getScope()).append(",subIds=") - .append(subscriberMap.keySet()).append(",sub=") - .append(subscriber.printPushContext()).append(",retry=").append(retryCount.get()); + .append(expireTimestamp).append(",DC=").append(dataCenter).append(",ver=") + .append(pushVersion).append(",addr=").append(addr).append(",scope=") + .append(subscriber.getScope()).append(",subIds=").append(subscriberMap.keySet()) + .append(",sub=").append(subscriber.printPushContext()).append(",retry=") + .append(retryCount.get()); return sb.toString(); } } @@ -372,11 +328,9 @@ public void onCallback(Channel channel, Object message) { PUSH_CLIENT_SUCCESS_COUNTER.inc(); boolean cleaned = false; try { - final Map versions = DatumUtils.getVesions(pushTask.datumMap); for (Subscriber subscriber : pushTask.subscriberMap.values()) { - if (!subscriber.checkAndUpdateVersion(pushTask.dataCenter, - pushTask.pushVersion, versions, pushTask.fetchSeqStart, - pushTask.fetchSeqEnd)) { + if (!subscriber + .checkAndUpdateVersion(pushTask.dataCenter, pushTask.pushVersion)) { LOGGER.warn("PushY, but failed to updateVersion, {}, {}", pushTask.taskID, pushTask.pushingTaskKey); } @@ -477,15 +431,13 @@ private static final class PushingTaskKey { final InetSocketAddress addr; final String dataInfoId; final ScopeEnum scopeEnum; - final AssembleType assembleType; final BaseInfo.ClientVersion clientVersion; PushingTaskKey(String dataInfoId, InetSocketAddress addr, ScopeEnum scopeEnum, - AssembleType assembleType, BaseInfo.ClientVersion clientVersion) { + BaseInfo.ClientVersion clientVersion) { this.dataInfoId = dataInfoId; this.addr = addr; this.scopeEnum = scopeEnum; - this.assembleType = assembleType; this.clientVersion = clientVersion; } @@ -497,13 +449,12 @@ public boolean equals(Object o) { return false; PushingTaskKey that = (PushingTaskKey) o; return Objects.equals(addr, that.addr) && Objects.equals(dataInfoId, that.dataInfoId) - && scopeEnum == that.scopeEnum && assembleType == that.assembleType - && clientVersion == that.clientVersion; + && scopeEnum == that.scopeEnum && clientVersion == that.clientVersion; } @Override public int hashCode() { - return Objects.hash(addr, dataInfoId, scopeEnum, assembleType, clientVersion); + return Objects.hash(addr, dataInfoId, scopeEnum, clientVersion); } @Override diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionInterests.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionInterests.java index 42bceab66..7bc52a03f 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionInterests.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/store/SessionInterests.java @@ -17,13 +17,11 @@ package com.alipay.sofa.registry.server.session.store; import com.alipay.sofa.registry.common.model.SubscriberUtils; -import com.alipay.sofa.registry.common.model.store.DataInfo; import com.alipay.sofa.registry.common.model.store.Subscriber; import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; import com.alipay.sofa.registry.server.session.cache.AppRevisionCacheRegistry; import com.alipay.sofa.registry.util.ParaCheckUtil; -import com.google.common.collect.Lists; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.CollectionUtils; @@ -47,7 +45,6 @@ public SessionInterests() { @Override public boolean add(Subscriber subscriber) { ParaCheckUtil.checkNotNull(subscriber.getScope(), "subscriber.scope"); - ParaCheckUtil.checkNotNull(subscriber.getAssembleType(), "subscriber.assembleType"); ParaCheckUtil.checkNotNull(subscriber.getClientVersion(), "subscriber.clientVersion"); Subscriber.internSubscriber(subscriber); @@ -71,7 +68,7 @@ public boolean checkInterestVersion(String dataCenter, String datumDataInfoId, l return false; } for (Subscriber subscriber : subscribers) { - if (subscriber.checkVersion(dataCenter, datumDataInfoId, version)) { + if (subscriber.checkVersion(dataCenter, version)) { return true; } } @@ -80,18 +77,7 @@ public boolean checkInterestVersion(String dataCenter, String datumDataInfoId, l @Override public Collection getInterestOfDatum(String datumDataInfoId) { - DataInfo dataInfo = DataInfo.valueOf(datumDataInfoId); - if (dataInfo.typeIsSofaApp()) { - List list = Lists.newArrayList(); - Set interfaceDataInfoIds = appRevisionCacheRegistry.getInterfaces(dataInfo - .getDataId()); - for (String interfaceDataInfoId : interfaceDataInfoIds) { - list.addAll(getDatas(interfaceDataInfoId)); - } - return list; - } else { - return getDatas(datumDataInfoId); - } + return getDatas(datumDataInfoId); } @Override diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/impl/DefaultSubscriberHandlerStrategy.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/impl/DefaultSubscriberHandlerStrategy.java index 2df495701..99d3b84a5 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/impl/DefaultSubscriberHandlerStrategy.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/impl/DefaultSubscriberHandlerStrategy.java @@ -93,14 +93,13 @@ protected void handle(Subscriber subscriber, Channel channel, } private void log(boolean success, SubscriberRegister subscriberRegister, Subscriber subscriber) { - //[Y|N],[R|U|N],app,zone,dataInfoId,registerId,scope,assembleType,elementType,clientVersion,clientIp,clientPort - SUB_LOGGER.info("{},{},{},{},{},{},{},{},{},{},{},{},{},{}", success ? 'Y' : 'N', + //[Y|N],[R|U|N],app,zone,dataInfoId,registerId,scope,elementType,clientVersion,clientIp,clientPort + SUB_LOGGER.info("{},{},{},{},{},{},{},{},{},{},{},{},{}", success ? 'Y' : 'N', EventTypeConstants.getEventTypeFlag(subscriberRegister.getEventType()), - subscriberRegister.getAppName(), subscriberRegister.getZone(), subscriberRegister - .getDataId(), subscriberRegister.getGroup(), subscriberRegister.getInstanceId(), - subscriberRegister.getRegistId(), subscriberRegister.getScope(), - subscriber == null ? "" : subscriber.getAssembleType(), subscriber == null ? "" - : subscriber.getElementType(), + subscriberRegister.getAppName(), subscriberRegister.getZone(), + subscriberRegister.getDataId(), subscriberRegister.getGroup(), + subscriberRegister.getInstanceId(), subscriberRegister.getRegistId(), + subscriberRegister.getScope(), subscriber == null ? "" : subscriber.getElementType(), subscriber == null ? "" : subscriber.getClientVersion(), subscriberRegister.getIp(), subscriberRegister.getPort()); } diff --git a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/WrapperInvocationTest.java b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/WrapperInvocationTest.java index 8100ef592..f8ad38fa0 100644 --- a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/WrapperInvocationTest.java +++ b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/WrapperInvocationTest.java @@ -38,7 +38,7 @@ */ public class WrapperInvocationTest { - private Logger logger = LoggerFactory.getLogger(getClass()); + private Logger logger = LoggerFactory.getLogger(getClass()); @Rule public ExpectedException thrown = ExpectedException.none(); diff --git a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/store/DataCacheTest.java b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/store/DataCacheTest.java index 9e00f13ee..ea2b1a3d6 100644 --- a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/store/DataCacheTest.java +++ b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/store/DataCacheTest.java @@ -21,33 +21,23 @@ import com.alipay.sofa.registry.common.model.constants.ValueConstants; import com.alipay.sofa.registry.common.model.store.*; import com.alipay.sofa.registry.common.model.store.DataInfo; -import com.alipay.sofa.registry.core.model.AssembleType; import com.alipay.sofa.registry.core.model.ScopeEnum; import com.alipay.sofa.registry.net.NetUtil; import com.alipay.sofa.registry.server.session.bootstrap.CommonConfig; import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig; import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfigBean; -import com.alipay.sofa.registry.server.session.cache.CacheGenerator; -import com.alipay.sofa.registry.server.session.cache.CacheService; -import com.alipay.sofa.registry.server.session.cache.SessionCacheService; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.util.concurrent.Uninterruptibles; import org.junit.Assert; import org.junit.Test; import java.net.InetSocketAddress; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** - * * @author shangyu.wh * @version $Id: DataCacheTest.java, v 0.1 2017-12-06 19:42 shangyu.wh Exp $ */ @@ -210,12 +200,9 @@ private Map> getCacheSub(String dataI SessionInterests sessionInterests) { Collection subscribers = sessionInterests.getDatas(dataInfoId); Map> ret = Maps.newHashMap(); - Map>> groups = SubscriberUtils - .groupByAssembleAndScope(subscribers); - for (Map> group : groups.values()) { - List list = group.get(scopeEnum); - ret.putAll(SubscriberUtils.groupBySourceAddress(list)); - } + Map> scopes = SubscriberUtils.groupByScope(subscribers); + List list = scopes.get(scopeEnum); + ret.putAll(SubscriberUtils.groupBySourceAddress(list)); return ret; } @@ -236,7 +223,6 @@ private Subscriber getSub(String dataId, ScopeEnum scopeEnum, String registerId, subscriberRegister.setVersion(version.get()); subscriberRegister.setRegisterTimestamp(System.currentTimeMillis()); subscriberRegister.setScope(scopeEnum); - subscriberRegister.setAssembleType(AssembleType.sub_app_and_interface); subscriberRegister.setClientVersion(BaseInfo.ClientVersion.StoreData); subscriberRegister.setDataInfoId(DataInfo.toDataInfoId(dataId, "instance2", "rpc")); @@ -344,7 +330,6 @@ public void testOverwriteSameConnectIdSubscriber() { Subscriber subscriber1 = new Subscriber(); subscriber1.setScope(ScopeEnum.dataCenter); - subscriber1.setAssembleType(AssembleType.sub_app_and_interface); subscriber1.setClientVersion(BaseInfo.ClientVersion.StoreData); subscriber1.setDataInfoId("dataInfoId1"); subscriber1.setDataId("dataId1"); @@ -354,7 +339,6 @@ public void testOverwriteSameConnectIdSubscriber() { Subscriber subscriber2 = new Subscriber(); subscriber2.setScope(ScopeEnum.dataCenter); - subscriber2.setAssembleType(AssembleType.sub_app_and_interface); subscriber2.setClientVersion(BaseInfo.ClientVersion.StoreData); subscriber2.setDataInfoId("dataInfoId2"); subscriber2.setDataId("dataId2"); @@ -376,7 +360,6 @@ public void testOverwriteSameConnectIdSubscriber() { Subscriber subscriber3 = new Subscriber(); subscriber3.setScope(ScopeEnum.dataCenter); - subscriber3.setAssembleType(AssembleType.sub_app_and_interface); subscriber3.setClientVersion(BaseInfo.ClientVersion.StoreData); subscriber3.setDataInfoId(subscriber1.getDataInfoId()); subscriber3.setDataId(subscriber1.getDataId()); @@ -386,7 +369,6 @@ public void testOverwriteSameConnectIdSubscriber() { Subscriber subscriber4 = new Subscriber(); subscriber4.setScope(ScopeEnum.dataCenter); - subscriber4.setAssembleType(AssembleType.sub_app_and_interface); subscriber4.setClientVersion(BaseInfo.ClientVersion.StoreData); subscriber4.setDataInfoId(subscriber2.getDataInfoId()); subscriber4.setDataId(subscriber2.getDataId()); @@ -469,7 +451,6 @@ public void testSubAndClientOffUnorder() { Subscriber subscriber1 = new Subscriber(); subscriber1.setScope(ScopeEnum.dataCenter); - subscriber1.setAssembleType(AssembleType.sub_app_and_interface); subscriber1.setClientVersion(BaseInfo.ClientVersion.StoreData); subscriber1.setDataInfoId("dataInfoId1"); subscriber1.setDataId("dataId1"); @@ -480,7 +461,6 @@ public void testSubAndClientOffUnorder() { Subscriber subscriber2 = new Subscriber(); subscriber2.setScope(subscriber1.getScope()); - subscriber2.setAssembleType(subscriber1.getAssembleType()); subscriber2.setClientVersion(subscriber1.getClientVersion()); subscriber2.setDataInfoId(subscriber1.getDataInfoId()); subscriber2.setDataId(subscriber1.getDataId()); diff --git a/server/server/shared/src/main/java/com/alipay/sofa/registry/server/shared/util/DatumUtils.java b/server/server/shared/src/main/java/com/alipay/sofa/registry/server/shared/util/DatumUtils.java index 32d0bf750..1e4838469 100644 --- a/server/server/shared/src/main/java/com/alipay/sofa/registry/server/shared/util/DatumUtils.java +++ b/server/server/shared/src/main/java/com/alipay/sofa/registry/server/shared/util/DatumUtils.java @@ -18,6 +18,7 @@ import com.alipay.sofa.registry.common.model.constants.ValueConstants; import com.alipay.sofa.registry.common.model.dataserver.Datum; +import com.alipay.sofa.registry.common.model.store.DataInfo; import com.alipay.sofa.registry.common.model.store.Subscriber; import java.util.HashMap; @@ -55,4 +56,24 @@ public static Map getVesions(Map datumMap) { datumMap.forEach((k, v) -> versions.put(k, v.getVersion())); return versions; } + + public static Datum newEmptyDatum(Subscriber subscriber, String datacenter) { + Datum datum = new Datum(); + datum.setDataId(subscriber.getDataId()); + datum.setInstanceId(subscriber.getInstanceId()); + datum.setGroup(subscriber.getGroup()); + datum.setVersion(ValueConstants.DEFAULT_NO_DATUM_VERSION); + datum.setDataCenter(datacenter); + return datum; + } + + public static Datum newEmptyDatum(DataInfo dataInfo, String datacenter) { + Datum datum = new Datum(); + datum.setDataId(dataInfo.getDataId()); + datum.setInstanceId(dataInfo.getInstanceId()); + datum.setGroup(dataInfo.getDataType()); + datum.setVersion(ValueConstants.DEFAULT_NO_DATUM_VERSION); + datum.setDataCenter(datacenter); + return datum; + } } diff --git a/test/src/test/java/com/alipay/sofa/registry/test/app/PubSubTest.java b/test/src/test/java/com/alipay/sofa/registry/test/app/PubSubTest.java deleted file mode 100644 index 59f54077b..000000000 --- a/test/src/test/java/com/alipay/sofa/registry/test/app/PubSubTest.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alipay.sofa.registry.test.app; - -import com.alipay.sofa.registry.client.api.registration.PublisherRegistration; -import com.alipay.sofa.registry.client.api.registration.SubscriberRegistration; -import com.alipay.sofa.registry.common.model.constants.ValueConstants; - -import com.alipay.sofa.registry.core.model.AppRevisionInterface; -import com.alipay.sofa.registry.core.model.ScopeEnum; -import com.alipay.sofa.registry.server.test.AppDiscoveryBuilder; -import com.alipay.sofa.registry.test.BaseIntegrationTest; -import com.alipay.sofa.registry.util.JsonUtils; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.test.context.junit4.SpringRunner; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -@RunWith(SpringRunner.class) -public class PubSubTest extends BaseIntegrationTest { - @Test - public void reportMeta() throws InterruptedException { - String appname = "foo"; - String revision = "1111"; - AppDiscoveryBuilder builder = new AppDiscoveryBuilder(appname, revision, "127.0.0.1:12220"); - AppRevisionInterface inf1 = builder.addService("func1", ValueConstants.DEFAULT_GROUP, - ValueConstants.DEFAULT_INSTANCE_ID); - AppRevisionInterface inf2 = builder.addService("func2", ValueConstants.DEFAULT_GROUP, - ValueConstants.DEFAULT_INSTANCE_ID); - builder.addMetaBaseParam("metaParam1", "metaValue1"); - builder.addMetaInterfaceParam(inf1, "metaParam2", " metaValue2"); - builder.addMetaInterfaceParam(inf2, "metaParam3", " metaValue3"); - builder.addDataBaseParam("dataParam1", "dataValue1"); - builder.addDataInterfaceParam(inf1, "dataParam2", "dataValue2"); - builder.addDataInterfaceParam(inf1, "dataParam3", "dataValue3"); - - PublisherRegistration publisher = new PublisherRegistration(appname); - publisher.setGroup(ValueConstants.SOFA_APP); - - publisher.setPreRequest(builder.buildAppRevision()); - - registryClient1.register(publisher, JsonUtils.writeValueAsString(builder.buildData())); - - MySubscriberDataObserver observer = new MySubscriberDataObserver(); - SubscriberRegistration subReg = new SubscriberRegistration("func1", observer); - subReg.setScopeEnum(ScopeEnum.dataCenter); - registryClient2.register(subReg); - Thread.sleep(3000L); - - assertEquals("func1", observer.dataId); - assertEquals(LOCAL_REGION, observer.userData.getLocalZone()); - assertEquals(1, observer.userData.getZoneData().size()); - assertEquals(1, observer.userData.getZoneData().values().size()); - assertTrue(observer.userData.getZoneData().containsKey(LOCAL_REGION)); - assertEquals(1, observer.userData.getZoneData().get(LOCAL_REGION).size()); - } -}