diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/DataNodeExchanger.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/DataNodeExchanger.java index 4c281260e..1237f4627 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/DataNodeExchanger.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/DataNodeExchanger.java @@ -19,7 +19,7 @@ import com.alipay.sofa.registry.remoting.ChannelHandler; import com.alipay.sofa.registry.remoting.exchange.Exchange; import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig; -import com.alipay.sofa.registry.server.shared.remoting.ClientExchanger; +import com.alipay.sofa.registry.server.shared.remoting.ClientSideExchanger; import org.springframework.beans.factory.annotation.Autowired; import java.util.Collection; @@ -29,7 +29,7 @@ * @author xuanbei * @since 2019/2/15 */ -public class DataNodeExchanger extends ClientExchanger { +public class DataNodeExchanger extends ClientSideExchanger { @Autowired private DataServerConfig dataServerConfig; diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/SessionNodeExchanger.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/SessionNodeExchanger.java index ba74f7086..8a7ae873e 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/SessionNodeExchanger.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/SessionNodeExchanger.java @@ -19,7 +19,7 @@ import com.alipay.sofa.registry.remoting.ChannelHandler; import com.alipay.sofa.registry.remoting.exchange.Exchange; import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig; -import com.alipay.sofa.registry.server.shared.remoting.ClientExchanger; +import com.alipay.sofa.registry.server.shared.remoting.ClientSideExchanger; import org.springframework.beans.factory.annotation.Autowired; import java.util.Collection; @@ -30,7 +30,7 @@ * @author yuzhi.lyz * @version v 0.1 2020-11-30 21:28 yuzhi.lyz Exp $ */ -public final class SessionNodeExchanger extends ClientExchanger { +public final class SessionNodeExchanger extends ClientSideExchanger { @Autowired private DataServerConfig dataServerConfig; diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/slot/SlotDiffSyncer.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/slot/SlotDiffSyncer.java index 734eb6a0d..70eb399cb 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/slot/SlotDiffSyncer.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/slot/SlotDiffSyncer.java @@ -31,7 +31,7 @@ import com.alipay.sofa.registry.server.data.lease.SessionLeaseManager; import com.alipay.sofa.registry.server.data.remoting.DataNodeExchanger; import com.alipay.sofa.registry.server.data.remoting.SessionNodeExchanger; -import com.alipay.sofa.registry.server.shared.remoting.ClientExchanger; +import com.alipay.sofa.registry.server.shared.remoting.ClientSideExchanger; import org.glassfish.jersey.internal.guava.Sets; import java.util.*; @@ -120,7 +120,7 @@ private DataSlotDiffSyncResult processSyncResp(int slotId, return result; } - public boolean syncDataInfoIds(int slotId, String targetAddress, ClientExchanger exchanger, + public boolean syncDataInfoIds(int slotId, String targetAddress, ClientSideExchanger exchanger, long slotTableEpoch, String summaryTargetIp) { for (;;) { Map summaryMap = datumStorage.getDatumSummary(slotId, @@ -142,7 +142,7 @@ public boolean syncDataInfoIds(int slotId, String targetAddress, ClientExchanger } } - public boolean syncPublishers(int slotId, String targetAddress, ClientExchanger exchanger, long slotTableEpoch, + public boolean syncPublishers(int slotId, String targetAddress, ClientSideExchanger exchanger, long slotTableEpoch, String summaryTargetIp, int maxPublishers) { Map summaryMap = datumStorage.getDatumSummary(slotId, summaryTargetIp); Map round = pickSummarys(summaryMap, maxPublishers); diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/config/MetaServerConfig.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/config/MetaServerConfig.java index 0353e009f..aeebe99be 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/config/MetaServerConfig.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/config/MetaServerConfig.java @@ -51,14 +51,6 @@ public interface MetaServerConfig { int getMetaNodeExchangeTimeout(); - int getDataCenterChangeNotifyTaskRetryTimes(); - - int getDataNodeChangePushTaskRetryTimes(); - - int getGetDataCenterChangeListTaskRetryTimes(); - - int getSessionNodeChangePushTaskRetryTimes(); - String getRaftGroup(); String getRaftDataPath(); diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/config/MetaServerConfigBean.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/config/MetaServerConfigBean.java index a39c12f11..2da4e072b 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/config/MetaServerConfigBean.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/config/MetaServerConfigBean.java @@ -70,14 +70,6 @@ public class MetaServerConfigBean implements MetaServerConfig { private int metaNodeExchangeTimeout = 3000; - private int dataCenterChangeNotifyTaskRetryTimes = 3; - - private int dataNodeChangePushTaskRetryTimes = 1; - - private int getDataCenterChangeListTaskRetryTimes = 3; - - private int sessionNodeChangePushTaskRetryTimes = 3; - private int raftElectionTimeout = 1000; private int initialSlotTableLockTimeMilli = 1000; @@ -341,82 +333,6 @@ public void setMetaNodeExchangeTimeout(int metaNodeExchangeTimeout) { this.metaNodeExchangeTimeout = metaNodeExchangeTimeout; } - /** - * Getter method for property dataCenterChangeNotifyTaskRetryTimes. - * - * @return property value of dataCenterChangeNotifyTaskRetryTimes - */ - @Override - public int getDataCenterChangeNotifyTaskRetryTimes() { - return dataCenterChangeNotifyTaskRetryTimes; - } - - /** - * Setter method for property dataCenterChangeNotifyTaskRetryTimes. - * - * @param dataCenterChangeNotifyTaskRetryTimes value to be assigned to property dataCenterChangeNotifyTaskRetryTimes - */ - public void setDataCenterChangeNotifyTaskRetryTimes(int dataCenterChangeNotifyTaskRetryTimes) { - this.dataCenterChangeNotifyTaskRetryTimes = dataCenterChangeNotifyTaskRetryTimes; - } - - /** - * Getter method for property dataNodeChangePushTaskRetryTimes. - * - * @return property value of dataNodeChangePushTaskRetryTimes - */ - @Override - public int getDataNodeChangePushTaskRetryTimes() { - return dataNodeChangePushTaskRetryTimes; - } - - /** - * Setter method for property dataNodeChangePushTaskRetryTimes. - * - * @param dataNodeChangePushTaskRetryTimes value to be assigned to property dataNodeChangePushTaskRetryTimes - */ - public void setDataNodeChangePushTaskRetryTimes(int dataNodeChangePushTaskRetryTimes) { - this.dataNodeChangePushTaskRetryTimes = dataNodeChangePushTaskRetryTimes; - } - - /** - * Getter method for property getDataCenterChangeListTaskRetryTimes. - * - * @return property value of getDataCenterChangeListTaskRetryTimes - */ - @Override - public int getGetDataCenterChangeListTaskRetryTimes() { - return getDataCenterChangeListTaskRetryTimes; - } - - /** - * Setter method for property getDataCenterChangeListTaskRetryTimes. - * - * @param getDataCenterChangeListTaskRetryTimes value to be assigned to property getDataCenterChangeListTaskRetryTimes - */ - public void setGetDataCenterChangeListTaskRetryTimes(int getDataCenterChangeListTaskRetryTimes) { - this.getDataCenterChangeListTaskRetryTimes = getDataCenterChangeListTaskRetryTimes; - } - - /** - * Getter method for property sessionNodeChangePushTaskRetryTimes. - * - * @return property value of sessionNodeChangePushTaskRetryTimes - */ - @Override - public int getSessionNodeChangePushTaskRetryTimes() { - return sessionNodeChangePushTaskRetryTimes; - } - - /** - * Setter method for property sessionNodeChangePushTaskRetryTimes. - * - * @param sessionNodeChangePushTaskRetryTimes value to be assigned to property sessionNodeChangePushTaskRetryTimes - */ - public void setSessionNodeChangePushTaskRetryTimes(int sessionNodeChangePushTaskRetryTimes) { - this.sessionNodeChangePushTaskRetryTimes = sessionNodeChangePushTaskRetryTimes; - } - /** * Getter method for property schedulerCheckNodeListChangePushTimeout. * diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/DataNodeExchanger.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/DataNodeExchanger.java index bf9bdd043..ed3bc6873 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/DataNodeExchanger.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/DataNodeExchanger.java @@ -16,18 +16,9 @@ */ package com.alipay.sofa.registry.server.meta.remoting; -import com.alipay.sofa.registry.common.model.store.URL; -import com.alipay.sofa.registry.log.Logger; -import com.alipay.sofa.registry.log.LoggerFactory; -import com.alipay.sofa.registry.remoting.Channel; -import com.alipay.sofa.registry.remoting.Client; -import com.alipay.sofa.registry.remoting.Server; import com.alipay.sofa.registry.remoting.exchange.Exchange; -import com.alipay.sofa.registry.remoting.exchange.NodeExchanger; -import com.alipay.sofa.registry.remoting.exchange.RequestException; -import com.alipay.sofa.registry.remoting.exchange.message.Request; -import com.alipay.sofa.registry.remoting.exchange.message.Response; import com.alipay.sofa.registry.server.meta.bootstrap.config.MetaServerConfig; +import com.alipay.sofa.registry.server.shared.remoting.ServerSideExchanger; import com.google.common.annotations.VisibleForTesting; import org.springframework.beans.factory.annotation.Autowired; @@ -36,57 +27,19 @@ * @author shangyu.wh * @version $Id: DataNodeExchanger.java, v 0.1 2018-01-23 19:18 shangyu.wh Exp $ */ -public class DataNodeExchanger implements NodeExchanger { - - private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeExchanger.class); - - @Autowired - private MetaServerConfig metaServerConfig; +public class DataNodeExchanger extends ServerSideExchanger { @Autowired - private Exchange boltExchange; + private MetaServerConfig metaServerConfig; @Override - public Response request(Request request) throws RequestException { - Response response = null; - try { - Server dataServer = boltExchange.getServer(metaServerConfig.getDataServerPort()); - - if (dataServer != null) { - URL url = request.getRequestUrl(); - if (url != null) { - - Channel channel = dataServer.getChannel(url); - - if (channel != null && channel.isConnected()) { - if(request.getCallBackHandler() != null) { - dataServer.sendCallback(channel, request.getRequestBody(), request.getCallBackHandler(), - request.getTimeout() != null ? request.getTimeout() : metaServerConfig.getDataNodeExchangeTimeout()); - } else { - final Object result = dataServer.sendSync(channel, request.getRequestBody(), - request.getTimeout() != null ? request.getTimeout() : metaServerConfig.getDataNodeExchangeTimeout()); - response = () -> result; - } - } else { - LOGGER.error("DataNode Exchanger get channel error! channel with url:" + url - + " can not be null or disconnected!"); - throw new RequestException( - "DataNode Exchanger get channel error! channel with url:" + url - + " can not be null or disconnected!", - request); - } - } - } - } catch (Exception e) { - LOGGER.error("DataNode Exchanger request data error!", e); - throw new RequestException("DataNode Exchanger request data error!", request, e); - } - return response; + public int getRpcTimeout() { + return metaServerConfig.getDataNodeExchangeTimeout(); } @Override - public Client connectServer() { - return null; + public int getServerPort() { + return metaServerConfig.getDataServerPort(); } @VisibleForTesting diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/MetaServerExchanger.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/MetaServerExchanger.java index 4f66d1ac0..ef74a1ece 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/MetaServerExchanger.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/MetaServerExchanger.java @@ -16,18 +16,8 @@ */ package com.alipay.sofa.registry.server.meta.remoting; -import com.alipay.sofa.registry.common.model.store.URL; -import com.alipay.sofa.registry.log.Logger; -import com.alipay.sofa.registry.log.LoggerFactory; -import com.alipay.sofa.registry.remoting.Channel; -import com.alipay.sofa.registry.remoting.Client; -import com.alipay.sofa.registry.remoting.Server; -import com.alipay.sofa.registry.remoting.exchange.Exchange; -import com.alipay.sofa.registry.remoting.exchange.NodeExchanger; -import com.alipay.sofa.registry.remoting.exchange.RequestException; -import com.alipay.sofa.registry.remoting.exchange.message.Request; -import com.alipay.sofa.registry.remoting.exchange.message.Response; import com.alipay.sofa.registry.server.meta.bootstrap.config.MetaServerConfig; +import com.alipay.sofa.registry.server.shared.remoting.ServerSideExchanger; import org.springframework.beans.factory.annotation.Autowired; /** @@ -35,51 +25,18 @@ * @author shangyu.wh * @version $Id: MetaNodeExchanger.java, v 0.1 2018-02-12 14:22 shangyu.wh Exp $ */ -public class MetaServerExchanger implements NodeExchanger { - - private static final Logger LOGGER = LoggerFactory.getLogger(MetaServerExchanger.class); - - @Autowired - private MetaServerConfig metaServerConfig; +public class MetaServerExchanger extends ServerSideExchanger { @Autowired - private Exchange boltExchange; + private MetaServerConfig metaServerConfig; @Override - public Response request(Request request) throws RequestException { - Response response = null; - try { - Server metaServer = boltExchange.getServer(metaServerConfig.getMetaServerPort()); - - if (metaServer != null) { - URL url = request.getRequestUrl(); - if (url != null) { - - Channel channel = metaServer.getChannel(url); - - if (channel != null && channel.isConnected()) { - final Object result = metaServer.sendSync(channel, request.getRequestBody(), - request.getTimeout() != null ? request.getTimeout() : metaServerConfig.getDataNodeExchangeTimeout()); - response = () -> result; - } else { - LOGGER.error("MetaServer Exchanger get channel error! channel with url:" - + url + " can not be null or disconnected!"); - throw new RequestException( - "MetaServer Exchanger get channel error! channel with url:" + url - + " can not be null or disconnected!", - request); - } - } - } - } catch (Exception e) { - LOGGER.error("MetaServer Exchanger request data error!", e); - throw new RequestException("MetaServer Exchanger request data error!", request, e); - } - return response; + public int getRpcTimeout() { + return metaServerConfig.getMetaNodeExchangeTimeout(); } @Override - public Client connectServer() { - return null; + public int getServerPort() { + return metaServerConfig.getMetaServerPort(); } } \ No newline at end of file diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/SessionNodeExchanger.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/SessionNodeExchanger.java index c02bc2945..6d79c0b5f 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/SessionNodeExchanger.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/SessionNodeExchanger.java @@ -16,17 +16,8 @@ */ package com.alipay.sofa.registry.server.meta.remoting; -import com.alipay.sofa.registry.log.Logger; -import com.alipay.sofa.registry.log.LoggerFactory; -import com.alipay.sofa.registry.remoting.Channel; -import com.alipay.sofa.registry.remoting.Client; -import com.alipay.sofa.registry.remoting.Server; -import com.alipay.sofa.registry.remoting.exchange.Exchange; -import com.alipay.sofa.registry.remoting.exchange.NodeExchanger; -import com.alipay.sofa.registry.remoting.exchange.RequestException; -import com.alipay.sofa.registry.remoting.exchange.message.Request; -import com.alipay.sofa.registry.remoting.exchange.message.Response; import com.alipay.sofa.registry.server.meta.bootstrap.config.MetaServerConfig; +import com.alipay.sofa.registry.server.shared.remoting.ServerSideExchanger; import org.springframework.beans.factory.annotation.Autowired; /** @@ -34,55 +25,18 @@ * @author shangyu.wh * @version $Id: SessionNodeExchanger.java, v 0.1 2018-01-15 21:21 shangyu.wh Exp $ */ -public class SessionNodeExchanger implements NodeExchanger { - - private static final Logger LOGGER = LoggerFactory.getLogger(SessionNodeExchanger.class); - - @Autowired - private Exchange boltExchange; +public class SessionNodeExchanger extends ServerSideExchanger { @Autowired - private MetaServerConfig metaServerConfig; + private MetaServerConfig metaServerConfig; @Override - public Response request(Request request) throws RequestException { - Response response = null; - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("SessionNodeExchanger request body:{} url:{}", request.getRequestBody(), - request.getRequestUrl()); - } - - Server sessionServer = boltExchange.getServer(metaServerConfig.getSessionServerPort()); - - if (sessionServer != null) { - - Channel channel = sessionServer.getChannel(request.getRequestUrl()); - if (channel != null && channel.isConnected()) { - if(request.getCallBackHandler() != null) { - sessionServer.sendCallback(channel, request.getRequestBody(), request.getCallBackHandler(), - request.getTimeout() != null ? request.getTimeout() : metaServerConfig.getSessionNodeExchangeTimeout()); - } else { - final Object result = sessionServer.sendSync(channel, request.getRequestBody(), - request.getTimeout() != null ? request.getTimeout() : metaServerConfig.getSessionNodeExchangeTimeout()); - response = () -> result; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("SessionNodeExchanger response result:{} ", response.getResult()); - } - } - } else { - String errorMsg = "SessionNode Exchanger get channel error! channel with url:" - + channel == null ? "" : channel.getRemoteAddress() + " can not be null or disconnected!"; - LOGGER.error(errorMsg); - throw new RequestException(errorMsg, request); - } - - } - return response; + public int getRpcTimeout() { + return metaServerConfig.getSessionNodeExchangeTimeout(); } @Override - public Client connectServer() { - return null; + public int getServerPort() { + return metaServerConfig.getSessionServerPort(); } } \ No newline at end of file diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/AppRevisionRegisterHandler.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/AppRevisionRegisterHandler.java index a515ae076..10263a39f 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/AppRevisionRegisterHandler.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/AppRevisionRegisterHandler.java @@ -20,10 +20,9 @@ import com.alipay.sofa.registry.core.model.AppRevisionRegister; import com.alipay.sofa.registry.remoting.Channel; import com.alipay.sofa.registry.server.meta.revision.AppRevisionRegistry; -import com.alipay.sofa.registry.server.shared.remoting.AbstractServerHandler; import org.springframework.beans.factory.annotation.Autowired; -public class AppRevisionRegisterHandler extends AbstractServerHandler { +public class AppRevisionRegisterHandler extends MetaServerHandler { @Autowired private AppRevisionRegistry appRevisionRegistry; @@ -42,9 +41,4 @@ public Object doHandle(Channel channel, AppRevisionRegister message) { public Class interest() { return AppRevisionRegister.class; } - - @Override - public HandlerType getType() { - return HandlerType.PROCESSER; - } } diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/CheckRevisionsHandler.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/CheckRevisionsHandler.java index 7d94e08cd..0f8c6e95d 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/CheckRevisionsHandler.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/CheckRevisionsHandler.java @@ -20,10 +20,9 @@ import com.alipay.sofa.registry.common.model.metaserver.CheckRevisionsRequest; import com.alipay.sofa.registry.remoting.Channel; import com.alipay.sofa.registry.server.meta.revision.AppRevisionRegistry; -import com.alipay.sofa.registry.server.shared.remoting.AbstractServerHandler; import org.springframework.beans.factory.annotation.Autowired; -public class CheckRevisionsHandler extends AbstractServerHandler { +public class CheckRevisionsHandler extends MetaServerHandler { @Autowired private AppRevisionRegistry appRevisionRegistry; @@ -38,11 +37,6 @@ public Object doHandle(Channel channel, CheckRevisionsRequest request) { return appRevisionRegistry.checkRevisions(request.keysDigest); } - @Override - public HandlerType getType() { - return HandlerType.PROCESSER; - } - @Override public Class interest() { return CheckRevisionsRequest.class; diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/FetchRevisionsHandler.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/FetchRevisionsHandler.java index 92247b69f..a24995a6e 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/FetchRevisionsHandler.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/remoting/handler/FetchRevisionsHandler.java @@ -37,11 +37,6 @@ public Object doHandle(Channel channel, FetchRevisionsRequest message) { return appRevisionRegistry.fetchRevisions(message.keys); } - @Override - public HandlerType getType() { - return HandlerType.PROCESSER; - } - @Override public Class interest() { return FetchRevisionsRequest.class; diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/ClientNodeExchanger.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/ClientNodeExchanger.java index 686534cac..fd1e23747 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/ClientNodeExchanger.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/ClientNodeExchanger.java @@ -16,79 +16,43 @@ */ package com.alipay.sofa.registry.server.session.remoting; -import org.springframework.beans.factory.annotation.Autowired; - -import com.alipay.sofa.registry.common.model.store.URL; import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; -import com.alipay.sofa.registry.remoting.Channel; -import com.alipay.sofa.registry.remoting.Client; -import com.alipay.sofa.registry.remoting.Server; -import com.alipay.sofa.registry.remoting.exchange.Exchange; -import com.alipay.sofa.registry.remoting.exchange.NodeExchanger; import com.alipay.sofa.registry.remoting.exchange.RequestException; import com.alipay.sofa.registry.remoting.exchange.message.Request; import com.alipay.sofa.registry.remoting.exchange.message.Response; -import com.alipay.sofa.registry.remoting.exchange.message.Response.ResultStatus; import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig; +import com.alipay.sofa.registry.server.shared.remoting.ServerSideExchanger; +import org.springframework.beans.factory.annotation.Autowired; /** * * @author shangyu.wh * @version $Id: ClientNodeExchanger.java, v 0.1 2017-12-12 12:13 shangyu.wh Exp $ */ -public class ClientNodeExchanger implements NodeExchanger { +public class ClientNodeExchanger extends ServerSideExchanger { private static final Logger LOGGER = LoggerFactory.getLogger("SESSION-PUSH", "[Exchange]"); - @Autowired - private Exchange boltExchange; - @Autowired private SessionServerConfig sessionServerConfig; @Override public Response request(Request request) throws RequestException { - Response response = null; try { - //sender who session node send message to client node - Server sessionServer = boltExchange.getServer(sessionServerConfig.getServerPort()); - - if (sessionServer != null) { - URL url = request.getRequestUrl(); - if (url != null) { - Channel channel = sessionServer.getChannel(url); - - if (channel != null && channel.isConnected()) { - if (null != request.getCallBackHandler()) { - //TODO log ASYNC - sessionServer.sendCallback(channel, request.getRequestBody(), - request.getCallBackHandler(), - request.getTimeout() != null ? request.getTimeout() : sessionServerConfig.getClientNodeExchangeTimeOut()); - response = () -> ResultStatus.SUCCESSFUL; - } else { - final Object result = sessionServer.sendSync(channel, - request.getRequestBody(), - request.getTimeout() != null ? request.getTimeout() : sessionServerConfig.getClientNodeExchangeTimeOut()); - response = () -> result; - } - } else { - LOGGER.error( - "ClientNode Exchanger get channel {} error! Can't be null or disconnected!", - url); - throw new RequestException("ClientNode Exchanger get channel " + url - + "error! Can't be null or disconnected!", - request); - } - } - } - } catch (Exception e) { - throw new RequestException("ClientNode Exchanger request data error!", request, e); + return super.request(request); + } catch (Throwable e) { + LOGGER.error("ClientNode Exchanger request data error, {}", request, e); + throw new RequestException("ClientNode Exchanger request data error", request, e); } - return response; } @Override - public Client connectServer() { - return null; + public int getRpcTimeout() { + return sessionServerConfig.getClientNodeExchangeTimeOut(); + } + + @Override + public int getServerPort() { + return sessionServerConfig.getServerPort(); } } \ No newline at end of file diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/DataNodeExchanger.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/DataNodeExchanger.java index ff09632a6..1294c4203 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/DataNodeExchanger.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/DataNodeExchanger.java @@ -26,7 +26,7 @@ import com.alipay.sofa.registry.remoting.exchange.message.Request; import com.alipay.sofa.registry.remoting.exchange.message.Response; import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig; -import com.alipay.sofa.registry.server.shared.remoting.ClientExchanger; +import com.alipay.sofa.registry.server.shared.remoting.ClientSideExchanger; import org.springframework.beans.factory.annotation.Autowired; import javax.annotation.Resource; @@ -38,7 +38,7 @@ * @author shangyu.wh * @version $Id : DataNodeExchanger.java, v 0.1 2017-12-01 11:51 shangyu.wh Exp $ */ -public class DataNodeExchanger extends ClientExchanger { +public class DataNodeExchanger extends ClientSideExchanger { private static final Logger LOGGER = LoggerFactory .getLogger(DataNodeExchanger.class); diff --git a/server/server/shared/src/main/java/com/alipay/sofa/registry/server/shared/meta/AbstractMetaNodeExchanger.java b/server/server/shared/src/main/java/com/alipay/sofa/registry/server/shared/meta/AbstractMetaNodeExchanger.java index 89602c8c3..9b0b64e1c 100644 --- a/server/server/shared/src/main/java/com/alipay/sofa/registry/server/shared/meta/AbstractMetaNodeExchanger.java +++ b/server/server/shared/src/main/java/com/alipay/sofa/registry/server/shared/meta/AbstractMetaNodeExchanger.java @@ -25,7 +25,7 @@ import com.alipay.sofa.registry.remoting.exchange.RequestException; import com.alipay.sofa.registry.remoting.exchange.message.Request; import com.alipay.sofa.registry.remoting.exchange.message.Response; -import com.alipay.sofa.registry.server.shared.remoting.ClientExchanger; +import com.alipay.sofa.registry.server.shared.remoting.ClientSideExchanger; import com.google.common.collect.Sets; import org.springframework.beans.factory.annotation.Autowired; @@ -37,7 +37,7 @@ * @author yuzhi.lyz * @version v 0.1 2020-11-28 15:36 yuzhi.lyz Exp $ */ -public abstract class AbstractMetaNodeExchanger extends ClientExchanger { +public abstract class AbstractMetaNodeExchanger extends ClientSideExchanger { private static final Logger LOGGER = LoggerFactory .getLogger(AbstractMetaNodeExchanger.class); diff --git a/server/server/shared/src/main/java/com/alipay/sofa/registry/server/shared/remoting/ClientExchanger.java b/server/server/shared/src/main/java/com/alipay/sofa/registry/server/shared/remoting/ClientSideExchanger.java similarity index 89% rename from server/server/shared/src/main/java/com/alipay/sofa/registry/server/shared/remoting/ClientExchanger.java rename to server/server/shared/src/main/java/com/alipay/sofa/registry/server/shared/remoting/ClientSideExchanger.java index 521e52997..ed4415bd3 100644 --- a/server/server/shared/src/main/java/com/alipay/sofa/registry/server/shared/remoting/ClientExchanger.java +++ b/server/server/shared/src/main/java/com/alipay/sofa/registry/server/shared/remoting/ClientSideExchanger.java @@ -42,8 +42,8 @@ * @author yuzhi.lyz * @version v 0.1 2020-11-29 12:08 yuzhi.lyz Exp $ */ -public abstract class ClientExchanger implements NodeExchanger { - private static final Logger LOGGER = LoggerFactory.getLogger(ClientExchanger.class); +public abstract class ClientSideExchanger implements NodeExchanger { + private static final Logger LOGGER = LoggerFactory.getLogger(ClientSideExchanger.class); private final String serverType; @Autowired @@ -52,7 +52,7 @@ public abstract class ClientExchanger implements NodeExchanger { protected volatile Set serverIps = Sets.newHashSet(); private final Connector connector; - protected ClientExchanger(String serverType) { + protected ClientSideExchanger(String serverType) { this.serverType = serverType; this.connector = new Connector(); ConcurrentUtils.createDaemonThread(serverType + "-async-connector", connector).start(); @@ -60,20 +60,28 @@ protected ClientExchanger(String serverType) { @Override public Response request(Request request) throws RequestException { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("serverPort={} to server, url:{}, request body:{} ", getServerPort(), request.getRequestUrl(), + request.getRequestBody()); + } + + final URL url = request.getRequestUrl(); + if (url == null) { + throw new RequestException("null url", request); + } Client client = boltExchange.getClient(serverType); final int timeout = request.getTimeout() != null ? request.getTimeout() : getRpcTimeout(); try { CallbackHandler callback = request.getCallBackHandler(); if (callback == null) { - final Object result = client.sendSync(request.getRequestUrl(), request.getRequestBody(), timeout); + final Object result = client.sendSync(url, request.getRequestBody(), timeout); return () -> result; } else { - client.sendCallback(request.getRequestUrl(), request.getRequestBody(), callback, timeout); + client.sendCallback(url, request.getRequestBody(), callback, timeout); return () -> Response.ResultStatus.SUCCESSFUL; } } catch (Throwable e) { - throw new RequestException(serverType + "Exchanger request error! Request url:" + request.getRequestUrl(), - request, e); + throw new RequestException(serverType + "Exchanger request error! Request url:" + url, request, e); } } diff --git a/server/server/shared/src/main/java/com/alipay/sofa/registry/server/shared/remoting/ServerSideExchanger.java b/server/server/shared/src/main/java/com/alipay/sofa/registry/server/shared/remoting/ServerSideExchanger.java new file mode 100644 index 000000000..cb52fc7ab --- /dev/null +++ b/server/server/shared/src/main/java/com/alipay/sofa/registry/server/shared/remoting/ServerSideExchanger.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.shared.remoting; + +import com.alipay.sofa.registry.common.model.store.URL; +import com.alipay.sofa.registry.log.Logger; +import com.alipay.sofa.registry.log.LoggerFactory; +import com.alipay.sofa.registry.remoting.Channel; +import com.alipay.sofa.registry.remoting.Client; +import com.alipay.sofa.registry.remoting.Server; +import com.alipay.sofa.registry.remoting.exchange.Exchange; +import com.alipay.sofa.registry.remoting.exchange.NodeExchanger; +import com.alipay.sofa.registry.remoting.exchange.RequestException; +import com.alipay.sofa.registry.remoting.exchange.message.Request; +import com.alipay.sofa.registry.remoting.exchange.message.Response; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * + * @author yuzhi.lyz + * @version v 0.1 2020-12-18 11:40 yuzhi.lyz Exp $ + */ +public abstract class ServerSideExchanger implements NodeExchanger { + private static final Logger LOGGER = LoggerFactory.getLogger(ServerSideExchanger.class); + + @Autowired + protected Exchange boltExchange; + + @Override + public Response request(Request request) throws RequestException { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("serverPort={} to client, url:{}, request body:{} ", getServerPort(), request.getRequestUrl(), + request.getRequestBody()); + } + final URL url = request.getRequestUrl(); + if (url == null) { + throw new RequestException("null url", request); + } + final Server server = boltExchange.getServer(getServerPort()); + if (server == null) { + throw new RequestException("no server for " + getServerPort(), request); + } + final int timeout = request.getTimeout() != null ? request.getTimeout() : getRpcTimeout(); + + Channel channel = server.getChannel(url); + if (channel == null || !channel.isConnected()) { + throw new RequestException(getServerPort() + ", get channel error! channel with url:" + url + + " can not be null or disconnected!", request); + } + try { + if (request.getCallBackHandler() != null) { + server.sendCallback(channel, request.getRequestBody(), request.getCallBackHandler(), timeout); + return () -> Response.ResultStatus.SUCCESSFUL; + } else { + final Object result = server.sendSync(channel, request.getRequestBody(), timeout); + return () -> result; + } + } catch (Throwable e) { + throw new RequestException(getServerPort() + ", Exchanger request error! Request url:" + url, request, e); + } + } + + @Override + public Client connectServer() { + throw new UnsupportedOperationException(); + } + + public abstract int getRpcTimeout(); + + public abstract int getServerPort(); +}