Skip to content

Commit

Permalink
cleanup app subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
dzdx committed Feb 19, 2021
1 parent d8806fd commit 9eac00a
Show file tree
Hide file tree
Showing 37 changed files with 159 additions and 1,204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,4 @@ public interface Publisher extends Register {
* @param data the data
*/
void republish(String... data);

void setPreRequest(Object request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
*/
public class PublisherRegistration extends BaseRegistration {

private Object preRequest;

/**
* Instantiates a new Publisher registration.
*
Expand All @@ -45,12 +43,4 @@ public String toString() {
return "PublisherRegistration{" + "dataId='" + dataId + '\'' + ", group='" + group + '\''
+ ", appName='" + appName + '\'' + '}';
}

public Object getPreRequest() {
return preRequest;
}

public void setPreRequest(Object preRequest) {
this.preRequest = preRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,45 @@
*/
public abstract class AbstractInternalRegister implements Register {

/** */
/**
*
*/
private final AtomicLong initialVersion = new AtomicLong(VersionConstants.UNINITIALIZED_VERSION);
/** */
/**
*
*/
private AuthManager authManager;
/** */
/**
*
*/
private volatile boolean registered = false;
/** */
/**
*
*/
private volatile boolean enabled = true;
/** */
/**
*
*/
private volatile boolean refused = false;
/** */
/**
*
*/
private AtomicLong pubVersion = new AtomicLong(VersionConstants.UNINITIALIZED_VERSION);
/** */
/**
*
*/
private AtomicLong ackVersion = new AtomicLong(VersionConstants.UNINITIALIZED_VERSION);
/** */
/**
*
*/
private volatile long timestamp = System.currentTimeMillis();
/** */
/**
*
*/
private volatile int registerCount = 0;
/** */
/**
*
*/
private volatile String requestId = UUID.randomUUID().toString();

private ReadWriteLock lock = new ReentrantReadWriteLock();
Expand All @@ -77,8 +97,6 @@ public abstract class AbstractInternalRegister implements Register {
*/
public abstract Object assembly();

public abstract Object getPreRequest();

/**
* Is registered boolean.
*
Expand Down Expand Up @@ -111,8 +129,8 @@ void waitToSync() {
* Sync ok.
*
* @param requestId the request id
* @param version the version
* @param refused the refused
* @param version the version
* @param refused the refused
* @return the boolean
*/
public boolean syncOK(String requestId, long version, boolean refused) {
Expand Down Expand Up @@ -186,7 +204,6 @@ public SyncTask assemblySyncTask() {
SyncTask syncTask = new SyncTask();
syncTask.setRequestId(requestId);
syncTask.setRequest(assembly());
syncTask.setPreRequest(getPreRequest());
syncTask.setDone(isDone());
return syncTask;
} finally {
Expand Down Expand Up @@ -305,7 +322,7 @@ void setTimestamp(long timestamp) {
/**
* Sets auth signature.
*
* @param register the register
* @param register the register
*/
void setAuthSignature(BaseRegister register) {
// auth signature
Expand Down Expand Up @@ -351,8 +368,6 @@ public static class SyncTask {

private Object request;

private Object preRequest;

private boolean done;

/**
Expand Down Expand Up @@ -409,12 +424,5 @@ public void setDone(boolean done) {
this.done = done;
}

public Object getPreRequest() {
return preRequest;
}

public void setPreRequest(Object preRequest) {
this.preRequest = preRequest;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

/**
* The type Default configurator.
*
* @author zhuoyu.sjw
* @version $Id : DefaultConfigurator.java, v 0.1 2018-04-18 14:41 zhuoyu.sjw Exp $$
*/
Expand All @@ -56,7 +57,7 @@ public class DefaultConfigurator extends AbstractInternalRegister implements Con
/**
* Instantiates a new Default configurator.
*
* @param config the config
* @param config the config
* @param worker the worker
*/
public DefaultConfigurator(ConfiguratorRegistration registration, RegistryClientConfig config,
Expand Down Expand Up @@ -150,11 +151,6 @@ public Object assembly() {
return register;
}

@Override
public Object getPreRequest() {
return null;
}

/**
* Put configurator data.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public class DefaultPublisher extends AbstractInternalRegister implements Publis
private Worker worker;
private Collection<String> dataList;
private RegistryClientConfig config;
private Object preRequest;

/**
* Instantiates a new Default publisher.
Expand Down Expand Up @@ -90,11 +89,6 @@ public void republish(String... data) {
this.worker.schedule(new TaskEvent(this));
}

@Override
public void setPreRequest(Object preReq) {
this.preRequest = preReq;
}

/**
* Unregister.
*/
Expand Down Expand Up @@ -155,11 +149,6 @@ public PublisherRegister assembly() {
return register;
}

@Override
public Object getPreRequest() {
return preRequest;
}

/**
* @see Publisher#getDataId()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ public Publisher register(PublisherRegistration registration, String... data) {

publisher = new DefaultPublisher(registration, workerThread, registryClientConfig);
((DefaultPublisher) publisher).setAuthManager(authManager);
publisher.setPreRequest(registration.getPreRequest());

Publisher oldPublisher = registrationPublisherMap.putIfAbsent(registration, publisher);
if (null != oldPublisher) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class DefaultSubscriber extends AbstractInternalRegister implements Subsc
/**
* Instantiates a new Default subscriber multi.
*
* @param registration the registration
* @param registration the registration
*/
DefaultSubscriber(SubscriberRegistration registration, Worker worker,
RegistryClientConfig config) {
Expand Down Expand Up @@ -217,11 +217,6 @@ public SubscriberRegister assembly() {
return register;
}

@Override
public Object getPreRequest() {
return null;
}

public void putReceivedData(SegmentData segmentData, String localZone) {
writeLock.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,20 +154,6 @@ private void handleTask(TaskEvent event) {
return;
}

Object preRequest = syncTask.getPreRequest();
if (preRequest != null) {
Object preResult = client.invokeSync(preRequest);
if (!(preResult instanceof RegisterResponse)) {
LOGGER.warn("[register] result type is wrong, {}", preResult);
return;
}
RegisterResponse preResponse = (RegisterResponse) preResult;
if (!preResponse.isSuccess()) {
LOGGER.info("[register] register to server failed, {}, {}", preRequest,
preResponse);
return;
}
}
Object request = syncTask.getRequest();

Object result = client.invokeSync(request);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.alipay.sofa.registry.core.model;

/**
*
* @author zhuoyu.sjw
* @version $Id: SubscriberRegister.java, v 0.1 2017-11-28 15:40 zhuoyu.sjw Exp $$
*/
Expand All @@ -26,13 +25,6 @@ public class SubscriberRegister extends BaseRegister {

private String scope;

/**
* interface: only sub interface
* app: only sub app
* app_and_interface: sub app and interface
*/
private String assembleType;

/**
* Getter method for property <tt>scope</tt>.
*
Expand All @@ -51,24 +43,6 @@ public void setScope(String scope) {
this.scope = scope;
}

/**
* Getter method for property <tt>assembleType</tt>.
*
* @return property value of assembleType
*/
public String getAssembleType() {
return assembleType;
}

/**
* Setter method for property <tt>assembleType</tt>.
*
* @param assembleType value to be assigned to property assembleType
*/
public void setAssembleType(String assembleType) {
this.assembleType = assembleType;
}

/**
* To string string.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.alipay.sofa.registry.common.model;

import com.alipay.sofa.registry.common.model.store.Subscriber;
import com.alipay.sofa.registry.core.model.AssembleType;
import com.alipay.sofa.registry.core.model.ScopeEnum;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
Expand Down Expand Up @@ -47,28 +46,26 @@ public static Map<InetSocketAddress, Map<String, Subscriber>> groupBySourceAddre
return ret;
}

public static Map<AssembleType, Map<ScopeEnum, List<Subscriber>>> groupByAssembleAndScope(Collection<Subscriber> subscribers) {
public static Map<ScopeEnum, List<Subscriber>> groupByScope(Collection<Subscriber> subscribers) {
if (subscribers.isEmpty()) {
return Collections.emptyMap();
}
Map<AssembleType, Map<ScopeEnum, List<Subscriber>>> ret = Maps.newHashMap();
Map<ScopeEnum, List<Subscriber>> ret = Maps.newHashMap();
for (Subscriber subscriber : subscribers) {
final AssembleType assembleType = subscriber.getAssembleType();
final ScopeEnum scopeEnum = subscriber.getScope();
if (assembleType == null || scopeEnum == null) {
LOGGER.warn("Nil AssembleType or ScopeEnum, {}", subscriber);
if (scopeEnum == null) {
LOGGER.warn("Nil ScopeEnum, {}", subscriber);
continue;
}
Map<ScopeEnum, List<Subscriber>> assembleTypeMapMap = ret.computeIfAbsent(assembleType, k -> Maps.newHashMap());
List<Subscriber> subList = assembleTypeMapMap.computeIfAbsent(scopeEnum, k -> Lists.newArrayList());
List<Subscriber> subList = ret.computeIfAbsent(scopeEnum, k -> Lists.newArrayList());
subList.add(subscriber);
}
return ret;
}

public static Set<String> getPushedDataInfoIds(Collection<Subscriber> subscribers) {
final Set<String> ret = new HashSet<>(256);
subscribers.forEach(s -> ret.addAll(s.getPushedDataInfoIds()));
subscribers.forEach(s -> ret.add(s.getDataInfoId()));
return ret;
}

Expand Down
Loading

0 comments on commit 9eac00a

Please sign in to comment.