Skip to content

Commit bf212d2

Browse files
yongfeigaoyongfeigao
yongfeigao
authored and
yongfeigao
committed
merge from branch 4.9.1
1 parent 34e833f commit bf212d2

File tree

135 files changed

+4810
-1683
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

135 files changed

+4810
-1683
lines changed

mq-client-common-open/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<parent>
77
<groupId>com.sohu.tv</groupId>
88
<artifactId>mq</artifactId>
9-
<version>4.7.2</version>
9+
<version>4.9.1</version>
1010
</parent>
1111

1212
<artifactId>mq-client-common-open</artifactId>

mq-client-common-open/src/main/java/com/sohu/index/tv/mq/common/BatchConsumerCallback.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@
1010
* @param <T> msg obj
1111
* @param MessageExt
1212
*/
13-
public interface BatchConsumerCallback<T> {
13+
public interface BatchConsumerCallback<T, C> {
1414

1515
/**
1616
* 订阅回调方法
17-
*
18-
* @return
17+
* @param batchMessage
18+
* @param context @ConsumeConcurrentlyContext or @ConsumeOrderlyContext
19+
* @throws Exception
1920
*/
20-
void call(List<MQMessage<T>> batchMessage) throws Exception;
21+
void call(List<MQMessage<T>> batchMessage, C context) throws Exception;
2122
}
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package com.sohu.index.tv.mq.common;
22

3-
import java.nio.ByteBuffer;
4-
5-
import org.apache.rocketmq.common.message.MessageDecoder;
3+
import org.apache.rocketmq.common.message.Message;
4+
import org.apache.rocketmq.common.message.MessageClientExt;
65
import org.apache.rocketmq.common.message.MessageExt;
7-
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
86

97
/**
108
* 批量消息
@@ -15,12 +13,27 @@
1513
* @param <MessageExt>
1614
*/
1715
public class MQMessage<T> {
16+
17+
public static final String IDEMPOTENT_ID = "IDEMPOTENT_ID";
18+
19+
// 发送的原始对象
1820
private T message;
19-
private MessageExt messageExt;
2021

21-
public MQMessage(T message, MessageExt messageExt) {
22+
// 可以重试的次数
23+
private int retryTimes = -1;
24+
25+
// rocketmq 消息
26+
private Message innerMessage;
27+
28+
// 发送异常,测试用
29+
private boolean exceptionForTest;
30+
31+
public MQMessage() {
32+
}
33+
34+
public MQMessage(T message, Message innerMessage) {
2235
this.message = message;
23-
this.messageExt = messageExt;
36+
this.innerMessage = innerMessage;
2437
}
2538

2639
public T getMessage() {
@@ -32,22 +45,129 @@ public void setMessage(T message) {
3245
}
3346

3447
public MessageExt getMessageExt() {
35-
return messageExt;
48+
return (MessageExt) innerMessage;
3649
}
3750

3851
public void setMessageExt(MessageExt messageExt) {
39-
this.messageExt = messageExt;
52+
setInnerMessage(messageExt);
4053
}
41-
54+
55+
public Message getInnerMessage() {
56+
return innerMessage;
57+
}
58+
59+
public void setInnerMessage(Message innerMessage) {
60+
this.innerMessage = innerMessage;
61+
}
62+
63+
public MQMessage<T> setKeys(String keys) {
64+
innerMessage.setKeys(keys);
65+
return this;
66+
}
67+
68+
public String getKeys() {
69+
return innerMessage.getKeys();
70+
}
71+
72+
public String getTags() {
73+
return innerMessage.getTags();
74+
}
75+
76+
public MQMessage<T> setTags(String tags) {
77+
innerMessage.setTags(tags);
78+
return this;
79+
}
80+
81+
public MQMessage<T> setDelayTimeLevel(int level) {
82+
innerMessage.setDelayTimeLevel(level);
83+
return this;
84+
}
85+
86+
public int getDelayTimeLevel() {
87+
return innerMessage.getDelayTimeLevel();
88+
}
89+
90+
public byte[] getBody() {
91+
return innerMessage.getBody();
92+
}
93+
94+
public MQMessage<T> setBody(byte[] body) {
95+
innerMessage.setBody(body);
96+
return this;
97+
}
98+
99+
public boolean isWaitStoreMsgOK() {
100+
return innerMessage.isWaitStoreMsgOK();
101+
}
102+
103+
public MQMessage<T> setWaitStoreMsgOK(boolean waitStoreMsgOK) {
104+
innerMessage.setWaitStoreMsgOK(waitStoreMsgOK);
105+
return this;
106+
}
107+
108+
public MQMessage<T> setTopic(String topic) {
109+
innerMessage.setTopic(topic);
110+
return this;
111+
}
112+
113+
public String getTopic() {
114+
return innerMessage.getTopic();
115+
}
116+
117+
public int getRetryTimes() {
118+
return retryTimes;
119+
}
120+
121+
public MQMessage<T> setRetryTimes(int retryTimes) {
122+
this.retryTimes = retryTimes;
123+
return this;
124+
}
125+
126+
public MQMessage<T> resetRetryTimes(int retryTimes) {
127+
if (this.retryTimes == -1) {
128+
this.retryTimes = retryTimes;
129+
}
130+
return this;
131+
}
132+
42133
/**
43134
* 构建offsetMsgId
135+
*
44136
* @return
45137
*/
46138
public String buildOffsetMsgId() {
47-
int msgIdLength = (messageExt.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8
48-
: 16 + 4 + 8;
49-
ByteBuffer byteBufferMsgId = ByteBuffer.allocate(msgIdLength);
50-
return MessageDecoder.createMessageId(byteBufferMsgId, messageExt.getStoreHostBytes(),
51-
messageExt.getCommitLogOffset());
139+
return innerMessage instanceof MessageClientExt ? ((MessageClientExt)innerMessage).getOffsetMsgId() : null;
140+
}
141+
142+
public static <T> MQMessage<T> build(T message) {
143+
MQMessage<T> mqMessage = new MQMessage<>();
144+
mqMessage.setMessage(message);
145+
mqMessage.innerMessage = new Message();
146+
mqMessage.setWaitStoreMsgOK(true);
147+
return mqMessage;
148+
}
149+
150+
/**
151+
* 设置幂等id
152+
*
153+
* @param idempotentId
154+
*/
155+
public MQMessage<T> setIdempotentID(String idempotentId) {
156+
innerMessage.putUserProperty(IDEMPOTENT_ID, idempotentId);
157+
return this;
158+
}
159+
160+
public MQMessage<T> setExceptionForTest(boolean exceptionForTest) {
161+
this.exceptionForTest = exceptionForTest;
162+
return this;
163+
}
164+
165+
public boolean isExceptionForTest() {
166+
return exceptionForTest;
167+
}
168+
169+
@Override
170+
public String toString() {
171+
return "[topic=" + getTopic() + ", message=" + message + ", retryTimes=" + retryTimes + "]";
52172
}
53173
}

mq-client-common-open/src/main/java/com/sohu/index/tv/mq/common/Result.java

+40-5
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,15 @@ public class Result<T> implements Serializable {
2222
/**
2323
* 异常信息
2424
*/
25-
private Exception exception;
25+
private Throwable exception;
26+
27+
// 正在重试
28+
private boolean retrying;
29+
30+
// 重试过的次数
31+
private int retriedTimes;
32+
33+
private MQMessage<?> mqMessage;
2634

2735
public Result(boolean isSuccess) {
2836
this.isSuccess = isSuccess;
@@ -33,7 +41,7 @@ public Result(boolean isSuccess, T result) {
3341
this.result = result;
3442
}
3543

36-
public Result(boolean isSuccess, Exception exception) {
44+
public Result(boolean isSuccess, Throwable exception) {
3745
this.isSuccess = isSuccess;
3846
this.exception = exception;
3947
}
@@ -55,15 +63,42 @@ public void setResult(T result) {
5563
}
5664

5765
public Exception getException() {
58-
return exception;
66+
return (Exception) exception;
5967
}
6068

61-
public void setException(Exception exception) {
69+
public void setException(Throwable exception) {
6270
this.exception = exception;
6371
}
6472

73+
public boolean isRetrying() {
74+
return retrying;
75+
}
76+
77+
public Result<T> setRetrying(boolean retrying) {
78+
this.retrying = retrying;
79+
return this;
80+
}
81+
82+
public int getRetriedTimes() {
83+
return retriedTimes;
84+
}
85+
86+
public void setRetriedTimes(int retriedTimes) {
87+
this.retriedTimes = retriedTimes;
88+
}
89+
90+
@SuppressWarnings("unchecked")
91+
public <R> MQMessage<R> getMqMessage() {
92+
return (MQMessage<R>) mqMessage;
93+
}
94+
95+
public void setMqMessage(MQMessage<?> mqMessage) {
96+
this.mqMessage = mqMessage;
97+
}
98+
6599
@Override
66100
public String toString() {
67-
return "Result [isSuccess=" + isSuccess + ", result=" + result + ", exception=" + exception + "]";
101+
return "Result [isSuccess=" + isSuccess + ", result=" + result + ", exception=" + exception + ", retrying="
102+
+ retrying + ", retriedTimes=" + retriedTimes + "]";
68103
}
69104
}

mq-client-common-open/src/main/java/com/sohu/tv/mq/common/AbstractCommand.java

+22-6
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,32 @@ public AbstractCommand(String groupKey, String commandKey, int poolSize, int tim
4242
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(poolSize)));
4343
this.alerter = alerter;
4444
}
45+
46+
/**
47+
* 构建(信号量隔离)
48+
*
49+
* @param groupKey
50+
* @param commandKey
51+
* @param timeout 超时时间
52+
*/
53+
public AbstractCommand(String groupKey, String commandKey, int timeout, Alerter alerter) {
54+
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey + "-semaphore"))
55+
.andCommandKey(HystrixCommandKey.Factory.asKey(commandKey + "-semaphore"))
56+
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy(
57+
HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
58+
.withFallbackIsolationSemaphoreMaxConcurrentRequests(100)
59+
.withExecutionIsolationSemaphoreMaxConcurrentRequests(50)
60+
.withExecutionTimeoutInMilliseconds(timeout)));
61+
this.alerter = alerter;
62+
}
4563

4664
protected T run() throws Exception {
4765
try {
4866
return invoke();
4967
} catch (Exception e) {
50-
logger.error("send err! "+getCommandGroup().name() + "-" +
51-
getCommandKey().name() + ":" +
52-
invokeErrorInfo(),
53-
e);
54-
throw new RuntimeException(e);
68+
logger.error("group:{} command:{} param:{}", getCommandGroup().name(), getCommandKey().name(),
69+
invokeErrorInfo(), e);
70+
throw e;
5571
}
5672
}
5773

@@ -77,7 +93,7 @@ public T getFallback() {
7793
// 判断熔断器是否打开
7894
if (super.isCircuitBreakerOpen()) {
7995
if (null != alerter) {
80-
String info = "group:" + getCommandGroup().name() + " command:" + getCommandKey().name() + " err!";
96+
String info = "group:" + getCommandGroup().name() + " command:" + getCommandKey().name() + " circuitBreakerOpen!";
8197
alerter.alert(info);
8298
}
8399
}

mq-client-common-open/src/main/java/com/sohu/tv/mq/common/AbstractConfig.java

+17-7
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public abstract class AbstractConfig {
6969

7070
// 是否设置了instanceName
7171
protected String instanceName;
72+
73+
protected SohuAsyncTraceDispatcher traceDispatcher;
7274

7375
public AbstractConfig(String group, String topic) {
7476
this.topic = topic;
@@ -122,12 +124,14 @@ protected void init() {
122124
logger.error("http err, topic:{},group:{}", topic, group, e);
123125
}
124126
if (clusterInfoDTO == null) {
125-
if (clusterInfoDTOResult.getStatus() == 201) {
126-
logger.warn("please register your {}:{} topic:{} in MQCloud first, times:{}",
127-
role() == 1 ? "producer" : "consumer", group, topic, times++);
128-
} else {
129-
logger.warn("fetch topic:{} group:{} cluster info err:{}, times:{}", getTopic(), group,
130-
clusterInfoDTOResult.getMessage(), times++);
127+
if (clusterInfoDTOResult != null) {
128+
if (clusterInfoDTOResult.getStatus() == 201) {
129+
logger.warn("please register your {}:{} topic:{} in MQCloud first, times:{}",
130+
role() == 1 ? "producer" : "consumer", group, topic, times++);
131+
} else {
132+
logger.warn("fetch topic:{} group:{} cluster info err:{}, times:{}", getTopic(), group,
133+
clusterInfoDTOResult.getMessage(), times++);
134+
}
131135
}
132136
try {
133137
Thread.sleep(1000);
@@ -231,7 +235,7 @@ protected void initTrace() {
231235
TraceRocketMQProducer traceRocketMQProducer = new TraceRocketMQProducer(
232236
CommonUtil.buildTraceTopicProducer(traceTopic), traceTopic);
233237
// 初始化TraceDispatcher
234-
SohuAsyncTraceDispatcher traceDispatcher = new SohuAsyncTraceDispatcher(traceTopic);
238+
traceDispatcher = new SohuAsyncTraceDispatcher(traceTopic);
235239
// 设置producer属性
236240
traceRocketMQProducer.getProducer().setSendMsgTimeout(5000);
237241
traceRocketMQProducer.getProducer().setMaxMessageSize(traceDispatcher.getMaxMsgSize() - 10 * 1000);
@@ -316,4 +320,10 @@ public void setInstanceName(String instanceName) {
316320
public String getInstanceName() {
317321
return instanceName;
318322
}
323+
324+
public void shutdown(){
325+
if(traceDispatcher != null){
326+
traceDispatcher.shutdown();
327+
}
328+
}
319329
}

0 commit comments

Comments
 (0)