Skip to content

Commit

Permalink
- Increase checkversion frequency
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzhi.lyz committed Dec 17, 2020
1 parent e42ff42 commit f7869f9
Show file tree
Hide file tree
Showing 34 changed files with 564 additions and 552 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,8 @@ public String toString() {
return "PublisherVersion{" + "version=" + version + ", registerTimestamp="
+ registerTimestamp + '}';
}

public PublisherVersion incrRegisterTimestamp() {
return PublisherVersion.of(this.version, this.registerTimestamp + 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package com.alipay.sofa.registry.common.model;

import java.util.Objects;

/**
*
* @author yuzhi.lyz
Expand All @@ -29,4 +31,24 @@ public Tuple(T1 o1, T2 o2) {
this.o1 = o1;
this.o2 = o2;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (!(o instanceof Tuple))
return false;
Tuple<?, ?> tuple = (Tuple<?, ?>) o;
return Objects.equals(o1, tuple.o1) && Objects.equals(o2, tuple.o2);
}

@Override
public int hashCode() {
return Objects.hash(o1, o2);
}

@Override
public String toString() {
return "Tuple{" + "o1=" + o1 + ", o2=" + o2 + '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.registry.task;

import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.util.ConcurrentUtils;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/**
*
* thread unsafe, could not use concurrently
* @author yuzhi.lyz
* @version v 0.1 2020-12-15 13:38 yuzhi.lyz Exp $
*/
public class KeyedThreadPoolExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(KeyedThreadPoolExecutor.class);

private final Worker[] workers;
private final String executorName;
private final int coreBufferSize;

public KeyedThreadPoolExecutor(String executorName, int coreSize, int coreBufferSize) {
this.executorName = executorName;
this.coreBufferSize = coreBufferSize;
workers = new Worker[coreSize];
for (int i = 0; i < coreSize; i++) {
Worker w = new Worker(i, new ArrayBlockingQueue<>(coreBufferSize));
workers[i] = w;
ConcurrentUtils.createDaemonThread(executorName + "_" + i, w).start();
}
}

private final class Worker implements Runnable {
final BlockingQueue<KeyedTask> queue;
final int idx;

Worker(int idx, BlockingQueue<KeyedTask> queue) {
this.idx = idx;
this.queue = queue;
}

@Override
public void run() {
for (;;) {
try {
final KeyedTask task = queue.poll(60, TimeUnit.SECONDS);
if (task == null) {
LOGGER.info("{}_{} idle", executorName, idx);
continue;
}
task.run();
} catch (Throwable e) {
LOGGER.error("{}_{} run task error", executorName, idx, e);
}
}
}
}

public <T extends Runnable> KeyedTask<T> execute(Object key, T runnable) {
KeyedTask task = new KeyedTask(key, runnable);
Worker w = workerOf(key);
if (!w.queue.offer(task)) {
throw new RejectedExecutionException(String.format("%s_%d full, max=%d, now=%d",
executorName, w.idx, coreBufferSize, w.queue.size()));
}
return task;
}

private Worker workerOf(Object key) {
int n = (key.hashCode() & 0x7fffffff) % workers.length;
return workers[n];
}

public static final class KeyedTask<T extends Runnable> implements Runnable {
final long createTime = System.currentTimeMillis();
final Object key;
final T runnable;

volatile long startTime;
volatile long endTime;
volatile boolean success;
volatile boolean canceled;

private KeyedTask(Object key, T runnable) {
this.key = key;
this.runnable = runnable;
}

public void cancel() {
this.canceled = true;
}

@Override
public void run() {
try {
if (!canceled) {
runnable.run();
}
this.success = true;
} catch (Throwable e) {
LOGGER.error("failed to run task {}, {}", key, runnable, e);
} finally {
this.endTime = System.currentTimeMillis();
}
}

public boolean isFinished() {
return this.endTime > 0;
}

public boolean isSuccess() {
return isFinished() && success;
}

public boolean isFailed() {
return isFinished() && !success;
}

public long getCreateTime() {
return createTime;
}

public long getStartTime() {
return startTime;
}

public long getEndTime() {
return endTime;
}

public Object key() {
return key;
}

public T getRunnable() {
return runnable;
}

public boolean isOverAfter(int intervalMs) {
if (!isFinished()) {
return false;
}
return canceled || System.currentTimeMillis() - endTime >= intervalMs;
}

@Override
public String toString() {
return "KeyedTask{" + "createTime=" + createTime + ", key=" + key + ", runnable="
+ runnable + ", startTime=" + startTime + ", endTime=" + endTime + ", success="
+ success + ", canceled=" + canceled + '}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,42 +14,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.registry.server.session.scheduler;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
package com.alipay.sofa.registry.task;

import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.metrics.TaskMetrics;

import java.util.concurrent.*;

/**
*
* @author shangyu.wh
* @version $Id: ThreadPoolExecutorSession.java, v 0.1 2018-10-11 19:07 shangyu.wh Exp $
* @author yuzhi.lyz
* @version v 0.1 2020-12-15 11:50 yuzhi.lyz Exp $
*/
public class SessionThreadPoolExecutor extends ThreadPoolExecutor {
public class MetricsableThreadPoolExecutor extends ThreadPoolExecutor {
private static final Logger LOGGER = LoggerFactory
.getLogger(MetricsableThreadPoolExecutor.class);

private static final Logger LOGGER = LoggerFactory.getLogger(SessionThreadPoolExecutor.class);
protected final String executorName;

private String executorName;

public SessionThreadPoolExecutor(String executorName, int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler) {
public MetricsableThreadPoolExecutor(String executorName, int corePoolSize,
int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.executorName = executorName;
registerTaskMetrics();
this.setRejectedExecutionHandler(handler);
}

public SessionThreadPoolExecutor(String executorName, int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
public MetricsableThreadPoolExecutor(String executorName, int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(executorName, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,
(r, executor) -> {
String msg = String.format("Task(%s) %s rejected from %s, throw RejectedExecutionException.",
Expand All @@ -67,5 +64,4 @@ private void registerTaskMetrics() {
public String toString() {
return (new StringBuilder(executorName).append(" ").append(super.toString())).toString();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/**
* @author chen.zhu
Expand Down Expand Up @@ -92,4 +93,13 @@ public static void objectWaitUninterruptibly(Object o, int timeoutMs) {
LOGGER.warn("Interrupted waiting", ignored);
}
}

public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
try {
unit.sleep(sleepFor);
} catch (InterruptedException ignored) {
// no need to remark Thread.currentThread().interrupt();
LOGGER.warn("Interrupted sleeping", ignored);
}
}
}
38 changes: 0 additions & 38 deletions server/consistency/pom.xml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* @author shangyu.wh
* @version $Id: RequestException.java, v 0.1 2018-01-15 18:16 shangyu.wh Exp $
*/
public class RequestException extends Exception {
public class RequestException extends RuntimeException {

private static final int MAX_BODY_SIZE = 512;
private Request request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@
import com.alipay.sofa.registry.server.data.resource.HealthResource;
import com.alipay.sofa.registry.server.data.slot.SlotManager;
import com.alipay.sofa.registry.server.data.slot.SlotManagerImpl;
import com.alipay.sofa.registry.server.data.util.ThreadPoolExecutorDataServer;
import com.alipay.sofa.registry.server.shared.remoting.AbstractClientHandler;
import com.alipay.sofa.registry.server.shared.remoting.AbstractServerHandler;
import com.alipay.sofa.registry.task.MetricsableThreadPoolExecutor;
import com.alipay.sofa.registry.util.NamedThreadFactory;
import com.alipay.sofa.registry.util.PropertySplitter;
import org.glassfish.jersey.jackson.JacksonFeature;
Expand Down Expand Up @@ -325,7 +325,7 @@ public static class ExecutorConfiguration {

@Bean(name = "publishProcessorExecutor")
public ThreadPoolExecutor publishProcessorExecutor(DataServerConfig dataServerConfig) {
return new ThreadPoolExecutorDataServer("PublishProcessorExecutor",
return new MetricsableThreadPoolExecutor("PublishProcessorExecutor",
dataServerConfig.getPublishExecutorMinPoolSize(),
dataServerConfig.getPublishExecutorMaxPoolSize(), 300, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(dataServerConfig.getPublishExecutorQueueSize()),
Expand All @@ -334,7 +334,7 @@ public ThreadPoolExecutor publishProcessorExecutor(DataServerConfig dataServerCo

@Bean(name = "getDataProcessorExecutor")
public ThreadPoolExecutor getDataProcessorExecutor(DataServerConfig dataServerConfig) {
return new ThreadPoolExecutorDataServer("GetDataProcessorExecutor",
return new MetricsableThreadPoolExecutor("GetDataProcessorExecutor",
dataServerConfig.getGetDataExecutorMinPoolSize(),
dataServerConfig.getGetDataExecutorMaxPoolSize(),
dataServerConfig.getGetDataExecutorKeepAliveTime(), TimeUnit.SECONDS,
Expand All @@ -344,7 +344,7 @@ public ThreadPoolExecutor getDataProcessorExecutor(DataServerConfig dataServerCo

@Bean(name = "slotSyncRequestProcessorExecutor")
public ThreadPoolExecutor slotSyncRequestProcessorExecutor(DataServerConfig dataServerConfig) {
return new ThreadPoolExecutorDataServer("SlotSyncRequestProcessorExecutor",
return new MetricsableThreadPoolExecutor("SlotSyncRequestProcessorExecutor",
dataServerConfig.getSlotSyncRequestExecutorMinPoolSize(),
dataServerConfig.getSlotSyncRequestExecutorMaxPoolSize(), 300, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(dataServerConfig.getSlotSyncRequestExecutorQueueSize()),
Expand Down
Loading

0 comments on commit f7869f9

Please sign in to comment.