Skip to content

Commit

Permalink
[app-discovery] testcase
Browse files Browse the repository at this point in the history
  • Loading branch information
dzdx committed Dec 18, 2020
1 parent f7869f9 commit ff1a607
Show file tree
Hide file tree
Showing 23 changed files with 275 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ public interface Publisher extends Register {
* @param data the data
*/
void republish(String... data);

void setPreRequest(Object request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
*/
public class PublisherRegistration extends BaseRegistration {

private Object preRequest;

/**
* Instantiates a new Publisher registration.
*
Expand All @@ -43,4 +45,12 @@ public String toString() {
return "PublisherRegistration{" + "dataId='" + dataId + '\'' + ", group='" + group + '\''
+ ", appName='" + appName + '\'' + '}';
}

public Object getPreRequest() {
return preRequest;
}

public void setPreRequest(Object preRequest) {
this.preRequest = preRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public abstract class AbstractInternalRegister implements Register {
*/
public abstract Object assembly();

public abstract Object getPreRequest();

/**
* Is registered boolean.
*
Expand Down Expand Up @@ -184,6 +186,7 @@ public SyncTask assemblySyncTask() {
SyncTask syncTask = new SyncTask();
syncTask.setRequestId(requestId);
syncTask.setRequest(assembly());
syncTask.setPreRequest(getPreRequest());
syncTask.setDone(isDone());
return syncTask;
} finally {
Expand Down Expand Up @@ -348,6 +351,8 @@ public static class SyncTask {

private Object request;

private Object preRequest;

private boolean done;

/**
Expand Down Expand Up @@ -403,5 +408,13 @@ public boolean isDone() {
public void setDone(boolean done) {
this.done = done;
}

public Object getPreRequest() {
return preRequest;
}

public void setPreRequest(Object preRequest) {
this.preRequest = preRequest;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ public Object assembly() {
return register;
}

@Override
public Object getPreRequest() {
return null;
}

/**
* Put configurator data.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ public class DefaultPublisher extends AbstractInternalRegister implements Publis
private Worker worker;
private Collection<String> dataList;
private RegistryClientConfig config;
private Object preRequest;

/**
* Instantiates a new Default publisher.
*
* @param registration the publisher registration
* @param worker the worker
* @param worker the worker
*/
DefaultPublisher(PublisherRegistration registration, Worker worker, RegistryClientConfig config) {
this.registration = registration;
Expand Down Expand Up @@ -89,6 +90,11 @@ public void republish(String... data) {
this.worker.schedule(new TaskEvent(this));
}

@Override
public void setPreRequest(Object preReq) {
this.preRequest = preReq;
}

/**
* Unregister.
*/
Expand Down Expand Up @@ -149,6 +155,11 @@ public PublisherRegister assembly() {
return register;
}

@Override
public Object getPreRequest() {
return preRequest;
}

/**
* @see Publisher#getDataId()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ public Publisher register(PublisherRegistration registration, String... data) {

publisher = new DefaultPublisher(registration, workerThread, registryClientConfig);
((DefaultPublisher) publisher).setAuthManager(authManager);
publisher.setPreRequest(registration.getPreRequest());

Publisher oldPublisher = registrationPublisherMap.putIfAbsent(registration, publisher);
if (null != oldPublisher) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ public SubscriberRegister assembly() {
return register;
}

@Override
public Object getPreRequest() {
return null;
}

public void putReceivedData(SegmentData segmentData, String localZone) {
writeLock.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,20 @@ private void handleTask(TaskEvent event) {
return;
}

Object preRequest = syncTask.getPreRequest();
if (preRequest != null) {
Object preResult = client.invokeSync(preRequest);
if (!(preResult instanceof RegisterResponse)) {
LOGGER.warn("[register] result type is wrong, {}", preResult);
return;
}
RegisterResponse preResponse = (RegisterResponse) preResult;
if (!preResponse.isSuccess()) {
LOGGER.info("[register] register to server failed, {}, {}", preRequest,
preResponse);
return;
}
}
Object request = syncTask.getRequest();

Object result = client.invokeSync(request);
Expand Down
2 changes: 1 addition & 1 deletion client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

<properties>
<!-- Build args -->
<module.install.skip>true</module.install.skip>
<module.install.skip>false</module.install.skip>
<module.deploy.skip>true</module.deploy.skip>
<nexus.staging.deploy.mojo.skip>true</nexus.staging.deploy.mojo.skip>
<maven.compiler.source>1.6</maven.compiler.source>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class AppRegisterServerDataBox implements Serializable {
private String url;

/** baseParams */
private HashMap<String/*key*/, List<String>/*values*/> baseParams;
private Map<String/*key*/, List<String>/*values*/> baseParams;

/** */
private Map<String/*service*/, Map<String/*key*/, List<String>/*value*/>> interfaceParams;
Expand Down Expand Up @@ -84,7 +84,7 @@ public void setUrl(String url) {
*
* @return property value of baseParams
*/
public HashMap<String, List<String>> getBaseParams() {
public Map<String, List<String>> getBaseParams() {
return baseParams;
}

Expand All @@ -93,7 +93,7 @@ public HashMap<String, List<String>> getBaseParams() {
*
* @param baseParams value to be assigned to property baseParams
*/
public void setBaseParams(HashMap<String, List<String>> baseParams) {
public void setBaseParams(Map<String, List<String>> baseParams) {
this.baseParams = baseParams;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,19 @@
import com.alipay.sofa.registry.common.model.store.Publisher;
import com.alipay.sofa.registry.common.model.store.WordCache;
import com.google.common.collect.ArrayListMultimap;
import com.sun.corba.se.spi.orbutil.threadpool.Work;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

/**
*
* @author xiaojian.xj
* @version $Id: PublisherInternUtil.java, v 0.1 2020年11月12日 16:53 xiaojian.xj Exp $
*/
public class PublisherInternUtil {

/**
* change publisher word cache
*
* @param publisher
* @return
*/
Expand All @@ -55,22 +54,22 @@ public static Publisher internPublisher(Publisher publisher) {
dataBox.setUrl(dataBox.getUrl());
dataBox.setRevision(dataBox.getRevision());

ArrayListMultimap<String, String> baseParams = ArrayListMultimap.create();
dataBox.getBaseParams().entrySet().forEach(entry -> {

entry.getValue().stream().forEach(value -> {
// cache base params key and value
baseParams.put(WordCache.getInstance().getWordCache(entry.getKey()), WordCache.getInstance().getWordCache(value));
});
});
if(dataBox.getBaseParams() != null){
Map<String, List<String>> baseParams = new HashMap<>();
dataBox.getBaseParams().forEach((key, value) -> baseParams.put(WordCache.getInstance().getWordCache(key), value));
dataBox.setBaseParams(baseParams);
}

Map<String, Map<String, List<String>>> serviceParams = new HashMap<>();
dataBox.getInterfaceParams().entrySet().forEach(entry -> {
// cache serviceName
serviceParams.put(WordCache.getInstance().getWordCache(entry.getKey()), entry.getValue());

});
if(dataBox.getInterfaceParams() != null) {
Map<String, Map<String, List<String>>> interfaceParams = new HashMap<>();
dataBox.getInterfaceParams().forEach((key, value) -> {
// cache serviceName
String interfaceName = WordCache.getInstance().getWordCache(key);
value.forEach((key1, value1) -> interfaceParams.computeIfAbsent(interfaceName, k -> new HashMap<>()).put(WordCache.getInstance().getWordCache(key1), value1));

});
dataBox.setInterfaceParams(interfaceParams);
}
}

return appPublisher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,7 @@ public void notify(Datum datum, Long lastVersion) {
}
AppPublisher appPublisher = (AppPublisher) publisher;
for (AppRegisterServerDataBox dataBox : appPublisher.getAppDataList()) {

if (!revisions.contains(dataBox.getRevision())) {
revisions.add(dataBox.getRevision());
}
revisions.add(dataBox.getRevision());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public Collection<AbstractServerHandler> sessionServerHandlers() {
list.add(sessionConnectionHandler());
list.add(renewNodesRequestHandler());
list.add(fetchProvideDataRequestHandler());
list.add(addAppRevisionHandler());
list.add(appRevisionRegisterHandler());
list.add(checkRevisionsHandler());
list.add(fetchRevisionsHandler());
return list;
Expand Down Expand Up @@ -328,8 +328,8 @@ public AbstractServerHandler fetchProvideDataRequestHandler() {
}

@Bean
public AbstractServerHandler addAppRevisionHandler() {
return new AddAppRevisionHandler();
public AbstractServerHandler appRevisionRegisterHandler() {
return new AppRevisionRegisterHandler();
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public Response request(Request request) throws RequestException {
final Object result = sessionServer.sendSync(channel, request.getRequestBody(),
request.getTimeout() != null ? request.getTimeout() : metaServerConfig.getSessionNodeExchangeTimeout());
response = () -> result;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("SessionNodeExchanger response result:{} ", response.getResult());
}
}
} else {
String errorMsg = "SessionNode Exchanger get channel error! channel with url:"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.alipay.sofa.registry.server.shared.remoting.AbstractServerHandler;
import org.springframework.beans.factory.annotation.Autowired;

public class AddAppRevisionHandler extends AbstractServerHandler<AppRevisionRegister> {
public class AppRevisionRegisterHandler extends AbstractServerHandler<AppRevisionRegister> {
@Autowired
private AppRevisionRegistry appRevisionRegistry;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ public void refreshAll() {
}

private void onNewRevision(AppRevisionRegister rev) {
if (rev.getInterfaces() == null) {
return;
}
for (AppRevisionInterface inf : rev.getInterfaces().values()) {
String dataInfoId = DataInfo.toDataInfoId(inf.getDataId(), inf.getInstanceId(), inf.getGroup());
Map<String, Set<String>> apps = interfaceRevisions.computeIfAbsent(dataInfoId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.alibaba.fastjson.JSONObject;
import com.alipay.sofa.registry.common.model.AppRegisterServerDataBox;
import com.alipay.sofa.registry.common.model.ServerDataBox;
import com.alipay.sofa.registry.common.model.constants.ValueConstants;
import com.alipay.sofa.registry.common.model.store.AppPublisher;
import com.alipay.sofa.registry.common.model.store.BaseInfo.ClientVersion;
import com.alipay.sofa.registry.common.model.store.DataInfo;
Expand All @@ -33,17 +34,11 @@
import java.util.List;

/**
*
* @author shangyu.wh
* @version $Id: PublisherConvert.java, v 0.1 2017-11-30 17:54 shangyu.wh Exp $
*/
public class PublisherConverter {


public static final String PUB_TYPE = "!PublisherType";

public static final String APP_PUBLISHER = "APP_PUBLISHER";

private static Converter<PublisherRegister, AppPublisher> appPublisherConverter = source -> {
AppPublisher appPublisher = new AppPublisher();
fillCommonRegion(appPublisher, source);
Expand Down Expand Up @@ -95,7 +90,7 @@ public static void fillCommonRegion(Publisher publisher, PublisherRegister sourc
*/
public static Publisher convert(PublisherRegister publisherRegister) {

if (StringUtils.equalsIgnoreCase(APP_PUBLISHER, publisherRegister.getAttributes().get(PUB_TYPE))) {
if (StringUtils.equalsIgnoreCase(ValueConstants.SOFA_APP, publisherRegister.getGroup())) {
return appPublisherConverter.convert(publisherRegister);
}

Expand All @@ -107,7 +102,7 @@ public static List<ServerDataBox> convert(List<DataBox> boxList) {
if (null != boxList) {
for (DataBox dataBox : boxList) {
ServerDataBox serverDataBox = new ServerDataBox(ServerDataBox.getBytes(dataBox
.getData()));
.getData()));
serverDataBoxes.add(serverDataBox);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,11 @@ public void fireUserDataElementPushTask(URL clientUrl, Datum datum,
} else {
return;
}
if (datum == null) {
datum = emptyDatum(subscribers.stream().findAny().get());
}
taskEvent.setTaskClosure(pushTaskClosure);
taskEvent.setSendTimeStamp(DatumVersionUtil.getRealTimestamp(datum.getVersion()));
taskEvent.setSendTimeStamp(DatumVersionUtil.nextId());
taskEvent.setAttribute(Constant.PUSH_CLIENT_SUBSCRIBERS, subscribers);
taskEvent.setAttribute(Constant.PUSH_CLIENT_DATUM, datum);
taskEvent.setAttribute(Constant.PUSH_CLIENT_URL, clientUrl);
Expand Down Expand Up @@ -144,4 +147,16 @@ public void fireReceivedDataMultiPushTask(Subscriber subscriber) {
subscriber.getSourceAddress(), receivedData.getScope());
taskListenerManager.sendTaskEvent(taskEvent);
}

private Datum emptyDatum(Subscriber subscriber) {
Datum datum = new Datum();
datum.setDataInfoId(subscriber.getDataId());
datum.setDataId(subscriber.getDataId());
datum.setInstanceId(subscriber.getInstanceId());
datum.setGroup(subscriber.getGroup());
datum.setVersion(DatumVersionUtil.nextId());
datum.setPubMap(new HashMap<>());
datum.setDataCenter(sessionServerConfig.getSessionServerDataCenter());
return datum;
}
}
Loading

0 comments on commit ff1a607

Please sign in to comment.