Skip to content

Commit 2b30598

Browse files
committed
merge from 4.7.2
1 parent 32b0bfd commit 2b30598

File tree

86 files changed

+1858
-444
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

86 files changed

+1858
-444
lines changed

mq-client-common-open/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<parent>
77
<groupId>com.sohu.tv</groupId>
88
<artifactId>mq</artifactId>
9-
<version>4.7.1</version>
9+
<version>4.7.2</version>
1010
</parent>
1111

1212
<artifactId>mq-client-common-open</artifactId>

mq-client-common-open/src/main/java/com/sohu/index/tv/mq/common/BatchConsumerCallback.java

+2-36
Original file line numberDiff line numberDiff line change
@@ -10,46 +10,12 @@
1010
* @param <T> msg obj
1111
* @param MessageExt
1212
*/
13-
public interface BatchConsumerCallback<T, MessageExt> {
13+
public interface BatchConsumerCallback<T> {
1414

1515
/**
1616
* 订阅回调方法
1717
*
1818
* @return
1919
*/
20-
void call(List<MQMessage<T, MessageExt>> batchMessage) throws Exception;
21-
22-
/**
23-
* 批量消息
24-
*
25-
* @author yongfeigao
26-
* @date 2019年10月18日
27-
* @param <T>
28-
* @param <MessageExt>
29-
*/
30-
public class MQMessage<T, MessageExt> {
31-
private T message;
32-
private MessageExt messageExt;
33-
34-
public MQMessage(T message, MessageExt messageExt) {
35-
this.message = message;
36-
this.messageExt = messageExt;
37-
}
38-
39-
public T getMessage() {
40-
return message;
41-
}
42-
43-
public void setMessage(T message) {
44-
this.message = message;
45-
}
46-
47-
public MessageExt getMessageExt() {
48-
return messageExt;
49-
}
50-
51-
public void setMessageExt(MessageExt messageExt) {
52-
this.messageExt = messageExt;
53-
}
54-
}
20+
void call(List<MQMessage<T>> batchMessage) throws Exception;
5521
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package com.sohu.index.tv.mq.common;
2+
3+
import java.nio.ByteBuffer;
4+
5+
import org.apache.rocketmq.common.message.MessageDecoder;
6+
import org.apache.rocketmq.common.message.MessageExt;
7+
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
8+
9+
/**
10+
* 批量消息
11+
*
12+
* @author yongfeigao
13+
* @date 2019年10月18日
14+
* @param <T>
15+
* @param <MessageExt>
16+
*/
17+
public class MQMessage<T> {
18+
private T message;
19+
private MessageExt messageExt;
20+
21+
public MQMessage(T message, MessageExt messageExt) {
22+
this.message = message;
23+
this.messageExt = messageExt;
24+
}
25+
26+
public T getMessage() {
27+
return message;
28+
}
29+
30+
public void setMessage(T message) {
31+
this.message = message;
32+
}
33+
34+
public MessageExt getMessageExt() {
35+
return messageExt;
36+
}
37+
38+
public void setMessageExt(MessageExt messageExt) {
39+
this.messageExt = messageExt;
40+
}
41+
42+
/**
43+
* 构建offsetMsgId
44+
* @return
45+
*/
46+
public String buildOffsetMsgId() {
47+
int msgIdLength = (messageExt.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8
48+
: 16 + 4 + 8;
49+
ByteBuffer byteBufferMsgId = ByteBuffer.allocate(msgIdLength);
50+
return MessageDecoder.createMessageId(byteBufferMsgId, messageExt.getStoreHostBytes(),
51+
messageExt.getCommitLogOffset());
52+
}
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.sohu.tv.mq.metric;
2+
3+
import java.util.LinkedList;
4+
import java.util.List;
5+
import java.util.concurrent.atomic.AtomicLong;
6+
7+
/**
8+
* 消费失败统计
9+
*
10+
* @author yongfeigao
11+
* @date 2021年4月29日
12+
*/
13+
public class ConsumeFailedStat {
14+
// 计数数组的下标
15+
private volatile AtomicLong indexer = new AtomicLong();
16+
17+
private MessageExceptionMetric[] messageExceptionMetricArray;
18+
19+
public ConsumeFailedStat(int size) {
20+
messageExceptionMetricArray = new MessageExceptionMetric[size];
21+
}
22+
23+
public void set(MessageExceptionMetric messageExceptionMetric) {
24+
int index = (int) (indexer.getAndIncrement() % messageExceptionMetricArray.length);
25+
// 溢出重置
26+
if (index < 0) {
27+
indexer.set(0);
28+
index = 0;
29+
}
30+
messageExceptionMetricArray[index] = messageExceptionMetric;
31+
}
32+
33+
/**
34+
* 获取所有统计
35+
*
36+
* @return
37+
*/
38+
public List<StackTraceMetric> getAll() {
39+
List<StackTraceMetric> list = new LinkedList<>();
40+
for (MessageExceptionMetric metric : messageExceptionMetricArray) {
41+
if (metric == null) {
42+
continue;
43+
}
44+
StackTraceMetric threadMetric = new StackTraceMetric(metric.getStartTime(), metric.getMsgIdList());
45+
threadMetric.setId(metric.getThreadId());
46+
threadMetric.setName(metric.getThreadName());
47+
threadMetric.setStackTraceArray(metric.getException().getStackTrace());
48+
threadMetric.setErrorClass(metric.getException().getClass().toString());
49+
threadMetric.setMessage(metric.getException().getMessage());
50+
list.add(threadMetric);
51+
}
52+
return list;
53+
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.sohu.tv.mq.metric;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
6+
/**
7+
* 消费统计管理
8+
*
9+
* @author yongfeigao
10+
* @date 2021年4月14日
11+
*/
12+
public class ConsumeStatManager {
13+
14+
private static ConsumeStatManager instance = new ConsumeStatManager();
15+
16+
private Map<String, ConsumeThreadStat> consumeThreadMetricsMap = new HashMap<>();
17+
18+
private Map<String, ConsumeFailedStat> consumeFailedMetricsMap = new HashMap<>();
19+
20+
private ConsumeStatManager() {
21+
}
22+
23+
public static ConsumeStatManager getInstance() {
24+
return instance;
25+
}
26+
27+
/**
28+
* 注册
29+
*
30+
* @param consuemrGroup
31+
*/
32+
public void register(String consuemrGroup) {
33+
consumeThreadMetricsMap.put(consuemrGroup, new ConsumeThreadStat());
34+
consumeFailedMetricsMap.put(consuemrGroup, new ConsumeFailedStat(10));
35+
}
36+
37+
public ConsumeThreadStat getConsumeThreadMetrics(String consuemrGroup) {
38+
return consumeThreadMetricsMap.get(consuemrGroup);
39+
}
40+
41+
public ConsumeFailedStat getConsumeFailedMetrics(String consuemrGroup) {
42+
return consumeFailedMetricsMap.get(consuemrGroup);
43+
}
44+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.sohu.tv.mq.metric;
2+
/**
3+
* 消费线程统计
4+
*
5+
* @author yongfeigao
6+
* @date 2021年4月14日
7+
*/
8+
9+
import java.util.LinkedList;
10+
import java.util.List;
11+
import java.util.concurrent.ConcurrentHashMap;
12+
import java.util.concurrent.ConcurrentMap;
13+
14+
/**
15+
* 消费线程统计
16+
*
17+
* @author yongfeigao
18+
* @date 2021年4月14日
19+
*/
20+
public class ConsumeThreadStat {
21+
22+
// 线程消息统计
23+
private ConcurrentMap<Thread, MessageMetric> threadMessageMetricMap = new ConcurrentHashMap<>();
24+
25+
/**
26+
* 设置消息统计
27+
*
28+
* @param messageMetric
29+
*/
30+
public void set(MessageMetric messageMetric) {
31+
threadMessageMetricMap.put(Thread.currentThread(), messageMetric);
32+
}
33+
34+
/**
35+
* 移除
36+
*/
37+
public void remove() {
38+
threadMessageMetricMap.remove(Thread.currentThread());
39+
}
40+
41+
/**
42+
* 获取所有统计
43+
*
44+
* @return
45+
*/
46+
public List<StackTraceMetric> getAll() {
47+
List<StackTraceMetric> list = new LinkedList<>();
48+
threadMessageMetricMap.forEach((thread, messageMetric) -> {
49+
StackTraceMetric threadMetric = new StackTraceMetric(messageMetric.getStartTime(), messageMetric.getMsgIdList());
50+
threadMetric.initThreadMetric(thread);
51+
list.add(threadMetric);
52+
});
53+
return list;
54+
}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.sohu.tv.mq.metric;
2+
/**
3+
* 消息异常统计
4+
*
5+
* @author yongfeigao
6+
* @date 2021年4月29日
7+
*/
8+
public class MessageExceptionMetric extends MessageMetric {
9+
// 异常
10+
private Throwable exception;
11+
// 线程id
12+
private long threadId;
13+
// 线程名
14+
private String threadName;
15+
16+
public Throwable getException() {
17+
return exception;
18+
}
19+
20+
public void setException(Throwable exception) {
21+
this.exception = exception;
22+
}
23+
24+
public long getThreadId() {
25+
return threadId;
26+
}
27+
28+
public void setThreadId(long threadId) {
29+
this.threadId = threadId;
30+
}
31+
32+
public String getThreadName() {
33+
return threadName;
34+
}
35+
36+
public void setThreadName(String threadName) {
37+
this.threadName = threadName;
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.sohu.tv.mq.metric;
2+
3+
import java.util.List;
4+
5+
/**
6+
* 消息统计
7+
*
8+
* @author yongfeigao
9+
* @date 2021年4月14日
10+
*/
11+
public class MessageMetric {
12+
// 开始时间
13+
private long startTime;
14+
// 消费的消息id
15+
private List<String> msgIdList;
16+
17+
public long getStartTime() {
18+
return startTime;
19+
}
20+
21+
public void setStartTime(long startTime) {
22+
this.startTime = startTime;
23+
}
24+
25+
public List<String> getMsgIdList() {
26+
return msgIdList;
27+
}
28+
29+
public void setMsgIdList(List<String> msgIdList) {
30+
this.msgIdList = msgIdList;
31+
}
32+
}

0 commit comments

Comments
 (0)