Skip to content

Commit

Permalink
What type of PR is this?
Browse files Browse the repository at this point in the history
 Refactor the serverside nodeexchanger
  • Loading branch information
yuzhi.lyz committed Dec 18, 2020
1 parent ff1a607 commit b2cfd53
Show file tree
Hide file tree
Showing 16 changed files with 148 additions and 336 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.alipay.sofa.registry.remoting.ChannelHandler;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.shared.remoting.ClientExchanger;
import com.alipay.sofa.registry.server.shared.remoting.ClientSideExchanger;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Collection;
Expand All @@ -29,7 +29,7 @@
* @author xuanbei
* @since 2019/2/15
*/
public class DataNodeExchanger extends ClientExchanger {
public class DataNodeExchanger extends ClientSideExchanger {

@Autowired
private DataServerConfig dataServerConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.alipay.sofa.registry.remoting.ChannelHandler;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.shared.remoting.ClientExchanger;
import com.alipay.sofa.registry.server.shared.remoting.ClientSideExchanger;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Collection;
Expand All @@ -30,7 +30,7 @@
* @author yuzhi.lyz
* @version v 0.1 2020-11-30 21:28 yuzhi.lyz Exp $
*/
public final class SessionNodeExchanger extends ClientExchanger {
public final class SessionNodeExchanger extends ClientSideExchanger {
@Autowired
private DataServerConfig dataServerConfig;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import com.alipay.sofa.registry.server.data.lease.SessionLeaseManager;
import com.alipay.sofa.registry.server.data.remoting.DataNodeExchanger;
import com.alipay.sofa.registry.server.data.remoting.SessionNodeExchanger;
import com.alipay.sofa.registry.server.shared.remoting.ClientExchanger;
import com.alipay.sofa.registry.server.shared.remoting.ClientSideExchanger;
import org.glassfish.jersey.internal.guava.Sets;

import java.util.*;
Expand Down Expand Up @@ -120,7 +120,7 @@ private DataSlotDiffSyncResult processSyncResp(int slotId,
return result;
}

public boolean syncDataInfoIds(int slotId, String targetAddress, ClientExchanger exchanger,
public boolean syncDataInfoIds(int slotId, String targetAddress, ClientSideExchanger exchanger,
long slotTableEpoch, String summaryTargetIp) {
for (;;) {
Map<String, DatumSummary> summaryMap = datumStorage.getDatumSummary(slotId,
Expand All @@ -142,7 +142,7 @@ public boolean syncDataInfoIds(int slotId, String targetAddress, ClientExchanger
}
}

public boolean syncPublishers(int slotId, String targetAddress, ClientExchanger exchanger, long slotTableEpoch,
public boolean syncPublishers(int slotId, String targetAddress, ClientSideExchanger exchanger, long slotTableEpoch,
String summaryTargetIp, int maxPublishers) {
Map<String, DatumSummary> summaryMap = datumStorage.getDatumSummary(slotId, summaryTargetIp);
Map<String, DatumSummary> round = pickSummarys(summaryMap, maxPublishers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,6 @@ public interface MetaServerConfig {

int getMetaNodeExchangeTimeout();

int getDataCenterChangeNotifyTaskRetryTimes();

int getDataNodeChangePushTaskRetryTimes();

int getGetDataCenterChangeListTaskRetryTimes();

int getSessionNodeChangePushTaskRetryTimes();

String getRaftGroup();

String getRaftDataPath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,6 @@ public class MetaServerConfigBean implements MetaServerConfig {

private int metaNodeExchangeTimeout = 3000;

private int dataCenterChangeNotifyTaskRetryTimes = 3;

private int dataNodeChangePushTaskRetryTimes = 1;

private int getDataCenterChangeListTaskRetryTimes = 3;

private int sessionNodeChangePushTaskRetryTimes = 3;

private int raftElectionTimeout = 1000;

private int initialSlotTableLockTimeMilli = 1000;
Expand Down Expand Up @@ -341,82 +333,6 @@ public void setMetaNodeExchangeTimeout(int metaNodeExchangeTimeout) {
this.metaNodeExchangeTimeout = metaNodeExchangeTimeout;
}

/**
* Getter method for property <tt>dataCenterChangeNotifyTaskRetryTimes</tt>.
*
* @return property value of dataCenterChangeNotifyTaskRetryTimes
*/
@Override
public int getDataCenterChangeNotifyTaskRetryTimes() {
return dataCenterChangeNotifyTaskRetryTimes;
}

/**
* Setter method for property <tt>dataCenterChangeNotifyTaskRetryTimes</tt>.
*
* @param dataCenterChangeNotifyTaskRetryTimes value to be assigned to property dataCenterChangeNotifyTaskRetryTimes
*/
public void setDataCenterChangeNotifyTaskRetryTimes(int dataCenterChangeNotifyTaskRetryTimes) {
this.dataCenterChangeNotifyTaskRetryTimes = dataCenterChangeNotifyTaskRetryTimes;
}

/**
* Getter method for property <tt>dataNodeChangePushTaskRetryTimes</tt>.
*
* @return property value of dataNodeChangePushTaskRetryTimes
*/
@Override
public int getDataNodeChangePushTaskRetryTimes() {
return dataNodeChangePushTaskRetryTimes;
}

/**
* Setter method for property <tt>dataNodeChangePushTaskRetryTimes</tt>.
*
* @param dataNodeChangePushTaskRetryTimes value to be assigned to property dataNodeChangePushTaskRetryTimes
*/
public void setDataNodeChangePushTaskRetryTimes(int dataNodeChangePushTaskRetryTimes) {
this.dataNodeChangePushTaskRetryTimes = dataNodeChangePushTaskRetryTimes;
}

/**
* Getter method for property <tt>getDataCenterChangeListTaskRetryTimes</tt>.
*
* @return property value of getDataCenterChangeListTaskRetryTimes
*/
@Override
public int getGetDataCenterChangeListTaskRetryTimes() {
return getDataCenterChangeListTaskRetryTimes;
}

/**
* Setter method for property <tt>getDataCenterChangeListTaskRetryTimes</tt>.
*
* @param getDataCenterChangeListTaskRetryTimes value to be assigned to property getDataCenterChangeListTaskRetryTimes
*/
public void setGetDataCenterChangeListTaskRetryTimes(int getDataCenterChangeListTaskRetryTimes) {
this.getDataCenterChangeListTaskRetryTimes = getDataCenterChangeListTaskRetryTimes;
}

/**
* Getter method for property <tt>sessionNodeChangePushTaskRetryTimes</tt>.
*
* @return property value of sessionNodeChangePushTaskRetryTimes
*/
@Override
public int getSessionNodeChangePushTaskRetryTimes() {
return sessionNodeChangePushTaskRetryTimes;
}

/**
* Setter method for property <tt>sessionNodeChangePushTaskRetryTimes</tt>.
*
* @param sessionNodeChangePushTaskRetryTimes value to be assigned to property sessionNodeChangePushTaskRetryTimes
*/
public void setSessionNodeChangePushTaskRetryTimes(int sessionNodeChangePushTaskRetryTimes) {
this.sessionNodeChangePushTaskRetryTimes = sessionNodeChangePushTaskRetryTimes;
}

/**
* Getter method for property <tt>schedulerCheckNodeListChangePushTimeout</tt>.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,9 @@
*/
package com.alipay.sofa.registry.server.meta.remoting;

import com.alipay.sofa.registry.common.model.store.URL;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.remoting.Client;
import com.alipay.sofa.registry.remoting.Server;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.remoting.exchange.NodeExchanger;
import com.alipay.sofa.registry.remoting.exchange.RequestException;
import com.alipay.sofa.registry.remoting.exchange.message.Request;
import com.alipay.sofa.registry.remoting.exchange.message.Response;
import com.alipay.sofa.registry.server.meta.bootstrap.config.MetaServerConfig;
import com.alipay.sofa.registry.server.shared.remoting.ServerSideExchanger;
import com.google.common.annotations.VisibleForTesting;
import org.springframework.beans.factory.annotation.Autowired;

Expand All @@ -36,57 +27,19 @@
* @author shangyu.wh
* @version $Id: DataNodeExchanger.java, v 0.1 2018-01-23 19:18 shangyu.wh Exp $
*/
public class DataNodeExchanger implements NodeExchanger {

private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeExchanger.class);

@Autowired
private MetaServerConfig metaServerConfig;
public class DataNodeExchanger extends ServerSideExchanger {

@Autowired
private Exchange boltExchange;
private MetaServerConfig metaServerConfig;

@Override
public Response request(Request request) throws RequestException {
Response response = null;
try {
Server dataServer = boltExchange.getServer(metaServerConfig.getDataServerPort());

if (dataServer != null) {
URL url = request.getRequestUrl();
if (url != null) {

Channel channel = dataServer.getChannel(url);

if (channel != null && channel.isConnected()) {
if(request.getCallBackHandler() != null) {
dataServer.sendCallback(channel, request.getRequestBody(), request.getCallBackHandler(),
request.getTimeout() != null ? request.getTimeout() : metaServerConfig.getDataNodeExchangeTimeout());
} else {
final Object result = dataServer.sendSync(channel, request.getRequestBody(),
request.getTimeout() != null ? request.getTimeout() : metaServerConfig.getDataNodeExchangeTimeout());
response = () -> result;
}
} else {
LOGGER.error("DataNode Exchanger get channel error! channel with url:" + url
+ " can not be null or disconnected!");
throw new RequestException(
"DataNode Exchanger get channel error! channel with url:" + url
+ " can not be null or disconnected!",
request);
}
}
}
} catch (Exception e) {
LOGGER.error("DataNode Exchanger request data error!", e);
throw new RequestException("DataNode Exchanger request data error!", request, e);
}
return response;
public int getRpcTimeout() {
return metaServerConfig.getDataNodeExchangeTimeout();
}

@Override
public Client connectServer() {
return null;
public int getServerPort() {
return metaServerConfig.getDataServerPort();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,70 +16,27 @@
*/
package com.alipay.sofa.registry.server.meta.remoting;

import com.alipay.sofa.registry.common.model.store.URL;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.remoting.Client;
import com.alipay.sofa.registry.remoting.Server;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.remoting.exchange.NodeExchanger;
import com.alipay.sofa.registry.remoting.exchange.RequestException;
import com.alipay.sofa.registry.remoting.exchange.message.Request;
import com.alipay.sofa.registry.remoting.exchange.message.Response;
import com.alipay.sofa.registry.server.meta.bootstrap.config.MetaServerConfig;
import com.alipay.sofa.registry.server.shared.remoting.ServerSideExchanger;
import org.springframework.beans.factory.annotation.Autowired;

/**
*
* @author shangyu.wh
* @version $Id: MetaNodeExchanger.java, v 0.1 2018-02-12 14:22 shangyu.wh Exp $
*/
public class MetaServerExchanger implements NodeExchanger {

private static final Logger LOGGER = LoggerFactory.getLogger(MetaServerExchanger.class);

@Autowired
private MetaServerConfig metaServerConfig;
public class MetaServerExchanger extends ServerSideExchanger {

@Autowired
private Exchange boltExchange;
private MetaServerConfig metaServerConfig;

@Override
public Response request(Request request) throws RequestException {
Response response = null;
try {
Server metaServer = boltExchange.getServer(metaServerConfig.getMetaServerPort());

if (metaServer != null) {
URL url = request.getRequestUrl();
if (url != null) {

Channel channel = metaServer.getChannel(url);

if (channel != null && channel.isConnected()) {
final Object result = metaServer.sendSync(channel, request.getRequestBody(),
request.getTimeout() != null ? request.getTimeout() : metaServerConfig.getDataNodeExchangeTimeout());
response = () -> result;
} else {
LOGGER.error("MetaServer Exchanger get channel error! channel with url:"
+ url + " can not be null or disconnected!");
throw new RequestException(
"MetaServer Exchanger get channel error! channel with url:" + url
+ " can not be null or disconnected!",
request);
}
}
}
} catch (Exception e) {
LOGGER.error("MetaServer Exchanger request data error!", e);
throw new RequestException("MetaServer Exchanger request data error!", request, e);
}
return response;
public int getRpcTimeout() {
return metaServerConfig.getMetaNodeExchangeTimeout();
}

@Override
public Client connectServer() {
return null;
public int getServerPort() {
return metaServerConfig.getMetaServerPort();
}
}
Loading

0 comments on commit b2cfd53

Please sign in to comment.