diff --git a/client/api/src/main/java/com/alipay/sofa/registry/client/api/Publisher.java b/client/api/src/main/java/com/alipay/sofa/registry/client/api/Publisher.java index 45330b543..87c9db0c5 100644 --- a/client/api/src/main/java/com/alipay/sofa/registry/client/api/Publisher.java +++ b/client/api/src/main/java/com/alipay/sofa/registry/client/api/Publisher.java @@ -30,4 +30,6 @@ public interface Publisher extends Register { * @param data the data */ void republish(String... data); + + void setPreRequest(Object request); } diff --git a/client/api/src/main/java/com/alipay/sofa/registry/client/api/registration/PublisherRegistration.java b/client/api/src/main/java/com/alipay/sofa/registry/client/api/registration/PublisherRegistration.java index 8194b77c5..7a5812790 100644 --- a/client/api/src/main/java/com/alipay/sofa/registry/client/api/registration/PublisherRegistration.java +++ b/client/api/src/main/java/com/alipay/sofa/registry/client/api/registration/PublisherRegistration.java @@ -24,6 +24,8 @@ */ public class PublisherRegistration extends BaseRegistration { + private Object preRequest; + /** * Instantiates a new Publisher registration. * @@ -43,4 +45,12 @@ public String toString() { return "PublisherRegistration{" + "dataId='" + dataId + '\'' + ", group='" + group + '\'' + ", appName='" + appName + '\'' + '}'; } + + public Object getPreRequest() { + return preRequest; + } + + public void setPreRequest(Object preRequest) { + this.preRequest = preRequest; + } } diff --git a/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/AbstractInternalRegister.java b/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/AbstractInternalRegister.java index 928e1825e..7d6bb1361 100644 --- a/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/AbstractInternalRegister.java +++ b/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/AbstractInternalRegister.java @@ -77,6 +77,8 @@ public abstract class AbstractInternalRegister implements Register { */ public abstract Object assembly(); + public abstract Object getPreRequest(); + /** * Is registered boolean. * @@ -184,6 +186,7 @@ public SyncTask assemblySyncTask() { SyncTask syncTask = new SyncTask(); syncTask.setRequestId(requestId); syncTask.setRequest(assembly()); + syncTask.setPreRequest(getPreRequest()); syncTask.setDone(isDone()); return syncTask; } finally { @@ -348,6 +351,8 @@ public static class SyncTask { private Object request; + private Object preRequest; + private boolean done; /** @@ -403,5 +408,13 @@ public boolean isDone() { public void setDone(boolean done) { this.done = done; } + + public Object getPreRequest() { + return preRequest; + } + + public void setPreRequest(Object preRequest) { + this.preRequest = preRequest; + } } } diff --git a/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultConfigurator.java b/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultConfigurator.java index 7fce34e63..d02d89a04 100644 --- a/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultConfigurator.java +++ b/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultConfigurator.java @@ -150,6 +150,11 @@ public Object assembly() { return register; } + @Override + public Object getPreRequest() { + return null; + } + /** * Put configurator data. * diff --git a/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultPublisher.java b/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultPublisher.java index 14dd00884..3fc79e207 100644 --- a/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultPublisher.java +++ b/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultPublisher.java @@ -46,12 +46,13 @@ public class DefaultPublisher extends AbstractInternalRegister implements Publis private Worker worker; private Collection dataList; private RegistryClientConfig config; + private Object preRequest; /** * Instantiates a new Default publisher. * * @param registration the publisher registration - * @param worker the worker + * @param worker the worker */ DefaultPublisher(PublisherRegistration registration, Worker worker, RegistryClientConfig config) { this.registration = registration; @@ -89,6 +90,11 @@ public void republish(String... data) { this.worker.schedule(new TaskEvent(this)); } + @Override + public void setPreRequest(Object preReq) { + this.preRequest = preReq; + } + /** * Unregister. */ @@ -149,6 +155,11 @@ public PublisherRegister assembly() { return register; } + @Override + public Object getPreRequest() { + return preRequest; + } + /** * @see Publisher#getDataId() */ diff --git a/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultRegistryClient.java b/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultRegistryClient.java index b58568d80..6b5e37b1e 100644 --- a/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultRegistryClient.java +++ b/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultRegistryClient.java @@ -260,6 +260,7 @@ public Publisher register(PublisherRegistration registration, String... data) { publisher = new DefaultPublisher(registration, workerThread, registryClientConfig); ((DefaultPublisher) publisher).setAuthManager(authManager); + publisher.setPreRequest(registration.getPreRequest()); Publisher oldPublisher = registrationPublisherMap.putIfAbsent(registration, publisher); if (null != oldPublisher) { diff --git a/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultSubscriber.java b/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultSubscriber.java index f3edf8f4a..9a5ee4e3b 100644 --- a/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultSubscriber.java +++ b/client/impl/src/main/java/com/alipay/sofa/registry/client/provider/DefaultSubscriber.java @@ -217,6 +217,11 @@ public SubscriberRegister assembly() { return register; } + @Override + public Object getPreRequest() { + return null; + } + public void putReceivedData(SegmentData segmentData, String localZone) { writeLock.lock(); try { diff --git a/client/impl/src/main/java/com/alipay/sofa/registry/client/task/WorkerThread.java b/client/impl/src/main/java/com/alipay/sofa/registry/client/task/WorkerThread.java index 285c656d3..ebf3ed219 100644 --- a/client/impl/src/main/java/com/alipay/sofa/registry/client/task/WorkerThread.java +++ b/client/impl/src/main/java/com/alipay/sofa/registry/client/task/WorkerThread.java @@ -154,6 +154,20 @@ private void handleTask(TaskEvent event) { return; } + Object preRequest = syncTask.getPreRequest(); + if (preRequest != null) { + Object preResult = client.invokeSync(preRequest); + if (!(preResult instanceof RegisterResponse)) { + LOGGER.warn("[register] result type is wrong, {}", preResult); + return; + } + RegisterResponse preResponse = (RegisterResponse) preResult; + if (!preResponse.isSuccess()) { + LOGGER.info("[register] register to server failed, {}, {}", preRequest, + preResponse); + return; + } + } Object request = syncTask.getRequest(); Object result = client.invokeSync(request); diff --git a/client/pom.xml b/client/pom.xml index aa30333fe..f4eb6cbc9 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -30,7 +30,7 @@ - true + false true true 1.6 diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/AppRegisterServerDataBox.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/AppRegisterServerDataBox.java index 08e7c5bf8..fa3a53385 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/AppRegisterServerDataBox.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/AppRegisterServerDataBox.java @@ -38,7 +38,7 @@ public class AppRegisterServerDataBox implements Serializable { private String url; /** baseParams */ - private HashMap/*values*/> baseParams; + private Map/*values*/> baseParams; /** */ private Map/*value*/>> interfaceParams; @@ -84,7 +84,7 @@ public void setUrl(String url) { * * @return property value of baseParams */ - public HashMap> getBaseParams() { + public Map> getBaseParams() { return baseParams; } @@ -93,7 +93,7 @@ public HashMap> getBaseParams() { * * @param baseParams value to be assigned to property baseParams */ - public void setBaseParams(HashMap> baseParams) { + public void setBaseParams(Map> baseParams) { this.baseParams = baseParams; } diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/PublisherInternUtil.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/PublisherInternUtil.java index bdc454e9f..fd9d85de1 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/PublisherInternUtil.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/PublisherInternUtil.java @@ -20,13 +20,11 @@ import com.alipay.sofa.registry.common.model.store.Publisher; import com.alipay.sofa.registry.common.model.store.WordCache; import com.google.common.collect.ArrayListMultimap; +import com.sun.corba.se.spi.orbutil.threadpool.Work; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; /** - * * @author xiaojian.xj * @version $Id: PublisherInternUtil.java, v 0.1 2020年11月12日 16:53 xiaojian.xj Exp $ */ @@ -34,6 +32,7 @@ public class PublisherInternUtil { /** * change publisher word cache + * * @param publisher * @return */ @@ -55,22 +54,22 @@ public static Publisher internPublisher(Publisher publisher) { dataBox.setUrl(dataBox.getUrl()); dataBox.setRevision(dataBox.getRevision()); - ArrayListMultimap baseParams = ArrayListMultimap.create(); - dataBox.getBaseParams().entrySet().forEach(entry -> { - - entry.getValue().stream().forEach(value -> { - // cache base params key and value - baseParams.put(WordCache.getInstance().getWordCache(entry.getKey()), WordCache.getInstance().getWordCache(value)); - }); - }); + if(dataBox.getBaseParams() != null){ + Map> baseParams = new HashMap<>(); + dataBox.getBaseParams().forEach((key, value) -> baseParams.put(WordCache.getInstance().getWordCache(key), value)); + dataBox.setBaseParams(baseParams); + } - Map>> serviceParams = new HashMap<>(); - dataBox.getInterfaceParams().entrySet().forEach(entry -> { - // cache serviceName - serviceParams.put(WordCache.getInstance().getWordCache(entry.getKey()), entry.getValue()); - - }); + if(dataBox.getInterfaceParams() != null) { + Map>> interfaceParams = new HashMap<>(); + dataBox.getInterfaceParams().forEach((key, value) -> { + // cache serviceName + String interfaceName = WordCache.getInstance().getWordCache(key); + value.forEach((key1, value1) -> interfaceParams.computeIfAbsent(interfaceName, k -> new HashMap<>()).put(WordCache.getInstance().getWordCache(key1), value1)); + }); + dataBox.setInterfaceParams(interfaceParams); + } } return appPublisher; diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/notify/SessionServerNotifier.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/notify/SessionServerNotifier.java index 42759cd49..0f4b9d38c 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/notify/SessionServerNotifier.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/notify/SessionServerNotifier.java @@ -113,10 +113,7 @@ public void notify(Datum datum, Long lastVersion) { } AppPublisher appPublisher = (AppPublisher) publisher; for (AppRegisterServerDataBox dataBox : appPublisher.getAppDataList()) { - - if (!revisions.contains(dataBox.getRevision())) { - revisions.add(dataBox.getRevision()); - } + revisions.add(dataBox.getRevision()); } } } diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java index e69ec31ed..212fd7c8c 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java @@ -280,7 +280,7 @@ public Collection sessionServerHandlers() { list.add(sessionConnectionHandler()); list.add(renewNodesRequestHandler()); list.add(fetchProvideDataRequestHandler()); - list.add(addAppRevisionHandler()); + list.add(appRevisionRegisterHandler()); list.add(checkRevisionsHandler()); list.add(fetchRevisionsHandler()); return list; @@ -328,8 +328,8 @@ public AbstractServerHandler fetchProvideDataRequestHandler() { } @Bean - public AbstractServerHandler addAppRevisionHandler() { - return new AddAppRevisionHandler(); + public AbstractServerHandler appRevisionRegisterHandler() { + return new AppRevisionRegisterHandler(); } @Bean diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/SessionNodeExchanger.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/SessionNodeExchanger.java index 7b7ed6670..c02bc2945 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/SessionNodeExchanger.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/SessionNodeExchanger.java @@ -66,6 +66,9 @@ public Response request(Request request) throws RequestException { final Object result = sessionServer.sendSync(channel, request.getRequestBody(), request.getTimeout() != null ? request.getTimeout() : metaServerConfig.getSessionNodeExchangeTimeout()); response = () -> result; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("SessionNodeExchanger response result:{} ", response.getResult()); + } } } else { String errorMsg = "SessionNode Exchanger get channel error! channel with url:" diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/AddAppRevisionHandler.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/AppRevisionRegisterHandler.java similarity index 95% rename from server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/AddAppRevisionHandler.java rename to server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/AppRevisionRegisterHandler.java index e08f385da..a515ae076 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/AddAppRevisionHandler.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/AppRevisionRegisterHandler.java @@ -23,7 +23,7 @@ import com.alipay.sofa.registry.server.shared.remoting.AbstractServerHandler; import org.springframework.beans.factory.annotation.Autowired; -public class AddAppRevisionHandler extends AbstractServerHandler { +public class AppRevisionRegisterHandler extends AbstractServerHandler { @Autowired private AppRevisionRegistry appRevisionRegistry; diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/AppRevisionCacheRegistry.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/AppRevisionCacheRegistry.java index 2e41f1cd7..79b0f9461 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/AppRevisionCacheRegistry.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/cache/AppRevisionCacheRegistry.java @@ -97,6 +97,9 @@ public void refreshAll() { } private void onNewRevision(AppRevisionRegister rev) { + if (rev.getInterfaces() == null) { + return; + } for (AppRevisionInterface inf : rev.getInterfaces().values()) { String dataInfoId = DataInfo.toDataInfoId(inf.getDataId(), inf.getInstanceId(), inf.getGroup()); Map> apps = interfaceRevisions.computeIfAbsent(dataInfoId, diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/PublisherConverter.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/PublisherConverter.java index 1d13b9355..f8ebfed73 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/PublisherConverter.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/converter/PublisherConverter.java @@ -19,6 +19,7 @@ import com.alibaba.fastjson.JSONObject; import com.alipay.sofa.registry.common.model.AppRegisterServerDataBox; import com.alipay.sofa.registry.common.model.ServerDataBox; +import com.alipay.sofa.registry.common.model.constants.ValueConstants; import com.alipay.sofa.registry.common.model.store.AppPublisher; import com.alipay.sofa.registry.common.model.store.BaseInfo.ClientVersion; import com.alipay.sofa.registry.common.model.store.DataInfo; @@ -33,17 +34,11 @@ import java.util.List; /** - * * @author shangyu.wh * @version $Id: PublisherConvert.java, v 0.1 2017-11-30 17:54 shangyu.wh Exp $ */ public class PublisherConverter { - - public static final String PUB_TYPE = "!PublisherType"; - - public static final String APP_PUBLISHER = "APP_PUBLISHER"; - private static Converter appPublisherConverter = source -> { AppPublisher appPublisher = new AppPublisher(); fillCommonRegion(appPublisher, source); @@ -95,7 +90,7 @@ public static void fillCommonRegion(Publisher publisher, PublisherRegister sourc */ public static Publisher convert(PublisherRegister publisherRegister) { - if (StringUtils.equalsIgnoreCase(APP_PUBLISHER, publisherRegister.getAttributes().get(PUB_TYPE))) { + if (StringUtils.equalsIgnoreCase(ValueConstants.SOFA_APP, publisherRegister.getGroup())) { return appPublisherConverter.convert(publisherRegister); } @@ -107,7 +102,7 @@ public static List convert(List boxList) { if (null != boxList) { for (DataBox dataBox : boxList) { ServerDataBox serverDataBox = new ServerDataBox(ServerDataBox.getBytes(dataBox - .getData())); + .getData())); serverDataBoxes.add(serverDataBox); } } diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/FirePushService.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/FirePushService.java index d5f287e01..a908157de 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/FirePushService.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/FirePushService.java @@ -78,8 +78,11 @@ public void fireUserDataElementPushTask(URL clientUrl, Datum datum, } else { return; } + if (datum == null) { + datum = emptyDatum(subscribers.stream().findAny().get()); + } taskEvent.setTaskClosure(pushTaskClosure); - taskEvent.setSendTimeStamp(DatumVersionUtil.getRealTimestamp(datum.getVersion())); + taskEvent.setSendTimeStamp(DatumVersionUtil.nextId()); taskEvent.setAttribute(Constant.PUSH_CLIENT_SUBSCRIBERS, subscribers); taskEvent.setAttribute(Constant.PUSH_CLIENT_DATUM, datum); taskEvent.setAttribute(Constant.PUSH_CLIENT_URL, clientUrl); @@ -144,4 +147,16 @@ public void fireReceivedDataMultiPushTask(Subscriber subscriber) { subscriber.getSourceAddress(), receivedData.getScope()); taskListenerManager.sendTaskEvent(taskEvent); } + + private Datum emptyDatum(Subscriber subscriber) { + Datum datum = new Datum(); + datum.setDataInfoId(subscriber.getDataId()); + datum.setDataId(subscriber.getDataId()); + datum.setInstanceId(subscriber.getInstanceId()); + datum.setGroup(subscriber.getGroup()); + datum.setVersion(DatumVersionUtil.nextId()); + datum.setPubMap(new HashMap<>()); + datum.setDataCenter(sessionServerConfig.getSessionServerDataCenter()); + return datum; + } } \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/handler/DataChangeRequestHandler.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/handler/DataChangeRequestHandler.java index 87732ee9a..3c59c023d 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/handler/DataChangeRequestHandler.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/handler/DataChangeRequestHandler.java @@ -114,6 +114,9 @@ public Object doHandle(Channel channel, DataChangeRequest dataChangeRequest) { //dataInfoId is app, get relate interfaces dataInfoId from cache Set interfaces = appRevisionCacheRegistry.getInterfaces(dataInfo .getDataId()); + if (interfaces == null || interfaces.isEmpty()) { + return null; + } for (String interfaceDataInfoId : interfaces) { DataChangeRequest request = new DataChangeRequest(); request.setDataInfoId(interfaceDataInfoId); diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchTask.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchTask.java index bed5e9f92..87e0279eb 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchTask.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchTask.java @@ -92,22 +92,6 @@ public DataChangeFetchTask(SessionServerConfig sessionServerConfig, @Override public void execute() { - - // DataInfo dataInfo = DataInfo.valueOf(dataChangeRequest.getDataInfoId()); - // Datum datum = sessionDatumCacheDecorator.getDatumCache(dataChangeRequest.getDataCenter(), - // dataChangeRequest.getDataInfoId()); - // FIXME delete - // if (StringUtils.equals(APP_GROUP, dataInfo.getDataType())) { - // - // refreshMeta(datum.getPubMap().values()); - // - // //dataInfoId is app, get relate interfaces dataInfoId from cache - // Set interfaces = appRevisionCacheRegistry.getInterfaces(dataChangeRequest - // .getDataInfoId()); - // for (String interfaceDataInfoId : interfaces) { - // doExecute(interfaceDataInfoId); - // } - // } doExecute(dataChangeRequest.getDataInfoId()); } diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/impl/DefaultDataChangeRequestHandlerStrategy.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/impl/DefaultDataChangeRequestHandlerStrategy.java index 32186a7ee..6c2259ce2 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/impl/DefaultDataChangeRequestHandlerStrategy.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/strategy/impl/DefaultDataChangeRequestHandlerStrategy.java @@ -38,8 +38,8 @@ public class DefaultDataChangeRequestHandlerStrategy implements DataChangeReques @Override public void doFireChangFetch(DataChangeRequest dataChangeRequest) { - TaskEvent taskEvent = new TaskEvent(dataChangeRequest.getDataInfoId(), - TaskEvent.TaskType.DATA_CHANGE_FETCH_CLOUD_TASK); + TaskEvent taskEvent = new TaskEvent(dataChangeRequest, + TaskEvent.TaskType.DATA_CHANGE_FETCH_TASK); taskLogger.info("send " + taskEvent.getTaskType() + " taskEvent:{}", taskEvent); taskListenerManager.sendTaskEvent(taskEvent); } diff --git a/test/src/main/java/com/alipay/sofa/registry/server/test/AppDiscoveryBuilder.java b/test/src/main/java/com/alipay/sofa/registry/server/test/AppDiscoveryBuilder.java new file mode 100644 index 000000000..dc7ecedc4 --- /dev/null +++ b/test/src/main/java/com/alipay/sofa/registry/server/test/AppDiscoveryBuilder.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.test; + +import com.alipay.sofa.registry.common.model.AppRegisterServerDataBox; +import com.alipay.sofa.registry.common.model.store.DataInfo; +import com.alipay.sofa.registry.core.model.AppRevisionInterface; +import com.alipay.sofa.registry.core.model.AppRevisionRegister; + +import java.util.*; + +public class AppDiscoveryBuilder { + private AppRevisionRegister revisionRegister = new AppRevisionRegister(); + private AppRegisterServerDataBox dataBox = new AppRegisterServerDataBox(); + + public AppDiscoveryBuilder(String appname, String revision, String address) { + dataBox.setUrl(address); + dataBox.setRevision(revision); + revisionRegister.setAppName(appname); + revisionRegister.setRevision(revision); + } + + public String addService(String serviceName, String group, String instanceID) { + if(revisionRegister.getInterfaces()==null){ + revisionRegister.setInterfaces(new HashMap<>()); + } + String serviceId = DataInfo.toDataInfoId(serviceName, instanceID, group); + AppRevisionInterface inf =revisionRegister.getInterfaces().computeIfAbsent(DataInfo.toDataInfoId(serviceName, instanceID, group), k->new AppRevisionInterface()); + inf.setGroup(group); + inf.setInstanceId(instanceID); + inf.setDataId(serviceName); + return serviceId; + } + + public void addMetaInterfaceParam(String serviceId, String key, String value){ + AppRevisionInterface inf =revisionRegister.getInterfaces().get(serviceId); + if(inf.getServiceParams()== null){ + inf.setServiceParams(new HashMap<>()); + } + inf.getServiceParams().computeIfAbsent(key, k->new ArrayList<>()).add(value); + } + + public void addMetaBaseParam(String key, String value){ + if(revisionRegister.getBaseParams()== null){ + revisionRegister.setBaseParams(new HashMap<>()); + } + revisionRegister.getBaseParams().computeIfAbsent(key, k->new ArrayList<>()).add(value); + } + + public void addDataBaseParam(String key, String value){ + if(dataBox.getBaseParams() == null){ + dataBox.setBaseParams(new HashMap<>()); + } + dataBox.getBaseParams().computeIfAbsent(key, k -> new ArrayList<>()).add(value); + } + + public void addDataInterfaceParam(String serviceId, String key, String value){ + if(dataBox.getInterfaceParams()==null){ + dataBox.setInterfaceParams(new HashMap<>()); + } + dataBox.getInterfaceParams().computeIfAbsent(serviceId, k -> new HashMap<>()).computeIfAbsent(key, k->new ArrayList<>()).add(value); + } + + public AppRevisionRegister buildAppRevision() { + return revisionRegister; + } + + public AppRegisterServerDataBox buildData() { + return dataBox; + } +} diff --git a/test/src/test/java/com/alipay/sofa/registry/test/app/PubSubTest.java b/test/src/test/java/com/alipay/sofa/registry/test/app/PubSubTest.java new file mode 100644 index 000000000..0770a5ecb --- /dev/null +++ b/test/src/test/java/com/alipay/sofa/registry/test/app/PubSubTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.test.app; + +import com.alibaba.fastjson.JSON; +import com.alipay.sofa.registry.client.api.registration.PublisherRegistration; +import com.alipay.sofa.registry.client.api.registration.SubscriberRegistration; +import com.alipay.sofa.registry.common.model.constants.ValueConstants; + +import com.alipay.sofa.registry.core.model.ScopeEnum; +import com.alipay.sofa.registry.server.test.AppDiscoveryBuilder; +import com.alipay.sofa.registry.test.BaseIntegrationTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@RunWith(SpringRunner.class) +public class PubSubTest extends BaseIntegrationTest { + @Test + public void reportMeta() throws InterruptedException { + String appname = "foo"; + String revision = "1111"; + AppDiscoveryBuilder builder = new AppDiscoveryBuilder(appname, revision, "127.0.0.1:12220"); + String serviceId1 = builder.addService("func1", ValueConstants.DEFAULT_GROUP, + ValueConstants.DEFAULT_INSTANCE_ID); + String serviceId2 = builder.addService("func2", ValueConstants.DEFAULT_GROUP, + ValueConstants.DEFAULT_INSTANCE_ID); + builder.addMetaBaseParam("metaParam1", "metaValue1"); + builder.addMetaInterfaceParam(serviceId1, "metaParam2", " metaValue2"); + builder.addMetaInterfaceParam(serviceId2, "metaParam3", " metaValue3"); + builder.addDataBaseParam("dataParam1", "dataValue1"); + builder.addDataInterfaceParam(serviceId1, "dataParam2", "dataValue2"); + builder.addDataInterfaceParam(serviceId2, "dataParam3", "dataValue3"); + + PublisherRegistration publisher = new PublisherRegistration(appname); + publisher.setGroup(ValueConstants.SOFA_APP); + + publisher.setPreRequest(builder.buildAppRevision()); + + registryClient1.register(publisher, JSON.toJSONString(builder.buildData())); + + MySubscriberDataObserver observer = new MySubscriberDataObserver(); + SubscriberRegistration subReg = new SubscriberRegistration("func1", observer); + subReg.setScopeEnum(ScopeEnum.dataCenter); + registryClient2.register(subReg); + Thread.sleep(3000L); + + assertEquals("func1", observer.dataId); + assertEquals(LOCAL_REGION, observer.userData.getLocalZone()); + assertEquals(1, observer.userData.getZoneData().size()); + assertEquals(1, observer.userData.getZoneData().values().size()); + assertTrue(observer.userData.getZoneData().containsKey(LOCAL_REGION)); + assertEquals(1, observer.userData.getZoneData().get(LOCAL_REGION).size()); + } +}