Skip to content

Commit 9cbb9e3

Browse files
committed
Merge remote-tracking branch 'origin/master'
2 parents 0a3c60e + c0bec11 commit 9cbb9e3

File tree

14 files changed

+632
-40
lines changed

14 files changed

+632
-40
lines changed

mq-cloud/Dockerfile

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
FROM openjdk:8-jdk-alpine
2+
3+
# 更新源、安装openssh 并修改配置文件和生成key 并且同步时间
4+
RUN apk update && \
5+
echo "http://dl-cdn.alpinelinux.org/alpine/edge/testing/" >> /etc/apk/repositories && \
6+
apk add openssh-server tzdata nmon bash sudo net-tools && \
7+
cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \
8+
sed -i "s/#PermitRootLogin.*/PermitRootLogin yes/g" /etc/ssh/sshd_config && \
9+
sed -i 's/#StrictModes yes/StrictModes no/' /etc/ssh/sshd_config && \
10+
sed -i "s/Subsystem.*/Subsystem sftp internal-sftp/g" /etc/ssh/sshd_config && \
11+
sed -i 's/ash/bash/g' /etc/passwd && \
12+
ssh-keygen -t rsa -P "" -f /etc/ssh/ssh_host_rsa_key && \
13+
ssh-keygen -t ecdsa -P "" -f /etc/ssh/ssh_host_ecdsa_key && \
14+
ssh-keygen -t ed25519 -P "" -f /etc/ssh/ssh_host_ed25519_key && \
15+
echo "root:admin" | chpasswd && \
16+
cp /usr/bin/nmon /tmp/ && \
17+
echo "JAVA_HOME=/usr/lib/jvm/default-jvm" >> /etc/profile && \
18+
echo "PATH=\$JAVA_HOME/bin:\$PATH" >> /etc/profile && \
19+
echo "export JAVA_HOME PATH" >> /etc/profile
20+
21+
USER root
22+
23+
# 添加mqcloud用户并设置权限
24+
ARG user=mqcloud
25+
ARG passwd=9j7t4SDJOIusddca+Mzd6Q==
26+
ARG ssh_path=/home/mqcloud/.ssh
27+
RUN mkdir -p $ssh_path && \
28+
adduser -D $user && echo "$user:$passwd" | chpasswd && \
29+
echo "mqcloud ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers && \
30+
sed -i "s/Defaults requiretty/#Defaults requiretty/g" /etc/sudoers
31+
32+
# 安装ns
33+
ARG ns_path=/opt/mqcloud/ns
34+
RUN mkdir -p $ns_path $ns_path/data/config $ns_path/logs
35+
RUN chown mqcloud:mqcloud -R $ns_path
36+
ADD src/main/resources/static/software/rocketmq-docker.tar $ns_path
37+
38+
# 安装broker
39+
ARG broker_path=/opt/mqcloud/broker-a
40+
RUN mkdir -p $broker_path $broker_path/data/consumequeue $broker_path/data/commitlog $broker_path/logs
41+
ADD src/main/resources/static/software/rocketmq-docker.tar $broker_path
42+
RUN chown mqcloud:mqcloud -R $broker_path
43+
44+
# mqcloud
45+
ARG JAR_FILE
46+
ADD ${JAR_FILE} /mq-cloud.war
47+
ENTRYPOINT ["sh","/run.sh"]
48+
49+
ADD run.sh /
50+
EXPOSE 8080
51+
EXPOSE 22

mq-cloud/pom.xml

+14-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
<dependencies>
3838
<dependency>
3939
<groupId>com.sohu.tv</groupId>
40-
<artifactId>mq-client-common-open</artifactId>
40+
<artifactId>mq-client-open</artifactId>
4141
</dependency>
4242
<dependency>
4343
<groupId>com.sohu.tv</groupId>
@@ -191,6 +191,19 @@
191191
<groupId>org.apache.maven.plugins</groupId>
192192
<artifactId>maven-resources-plugin</artifactId>
193193
</plugin>
194+
<!-- docker plugin -->
195+
<plugin>
196+
<groupId>com.spotify</groupId>
197+
<artifactId>dockerfile-maven-plugin</artifactId>
198+
<configuration>
199+
<repository>com.sohu.tv/mqcloud</repository>
200+
<tag>${project.version}</tag>
201+
<buildArgs>
202+
<JAR_FILE>target/mq-cloud.war</JAR_FILE>
203+
</buildArgs>
204+
<skip>false</skip>
205+
</configuration>
206+
</plugin>
194207
</plugins>
195208
</build>
196209

mq-cloud/run.sh

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#!/bin/sh
2+
/usr/sbin/sshd -D &
3+
sh /opt/mqcloud/ns/bin/mqnamesrv -c /opt/mqcloud/ns/ns.conf >> /opt/mqcloud/ns/logs/startup.log 2>&1 &
4+
sh /opt/mqcloud/broker-a/bin/mqbroker -c /opt/mqcloud/broker-a/broker.conf >> /opt/mqcloud/broker-a/logs/startup.log 2>&1 &
5+
java -jar -Dfile.encoding=UTF-8 -Dmqcloud.env=demo -Dspring.datasource.data=classpath:h2db/demo.sql -DPROJECT_DIR=/ /mq-cloud.war
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
package com.sohu.tv.mq.cloud.demo;
2+
3+
import com.sohu.index.tv.mq.common.MQMessage;
4+
import com.sohu.tv.mq.cloud.bo.*;
5+
import com.sohu.tv.mq.cloud.mq.DefaultCallback;
6+
import com.sohu.tv.mq.cloud.mq.MQAdminTemplate;
7+
import com.sohu.tv.mq.cloud.service.*;
8+
import com.sohu.tv.mq.cloud.util.Result;
9+
import com.sohu.tv.mq.rocketmq.RocketMQConsumer;
10+
import com.sohu.tv.mq.rocketmq.RocketMQProducer;
11+
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
12+
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
13+
import org.apache.rocketmq.tools.admin.MQAdminExt;
14+
import org.slf4j.Logger;
15+
import org.slf4j.LoggerFactory;
16+
import org.springframework.beans.factory.annotation.Autowired;
17+
import org.springframework.boot.CommandLineRunner;
18+
import org.springframework.stereotype.Component;
19+
20+
import java.util.LinkedList;
21+
import java.util.List;
22+
import java.util.Properties;
23+
import java.util.concurrent.atomic.AtomicLong;
24+
25+
/**
26+
* demo环境数据初始化
27+
*
28+
* @author: yongfeigao
29+
* @date: 2022/4/19 10:43
30+
*/
31+
@Component
32+
public class DemoDataInitializer implements CommandLineRunner {
33+
private final Logger logger = LoggerFactory.getLogger(getClass());
34+
35+
@Autowired
36+
private ServerDataService serverDataService;
37+
38+
@Autowired
39+
private ClusterService clusterService;
40+
41+
@Autowired
42+
private NameServerService nameServerService;
43+
44+
@Autowired
45+
private BrokerService brokerService;
46+
47+
@Autowired
48+
private TopicService topicService;
49+
50+
@Autowired
51+
private ConsumerService consumerService;
52+
53+
@Autowired
54+
private UserConsumerService userConsumerService;
55+
56+
@Autowired
57+
private CommonConfigService commonConfigService;
58+
59+
@Autowired
60+
private MQAdminTemplate mqAdminTemplate;
61+
62+
@Override
63+
public void run(String... args) throws Exception {
64+
String env = System.getProperty("mqcloud.env");
65+
if (!"demo".equals(env)) {
66+
logger.info("mqcloud.env={} return", env);
67+
return;
68+
}
69+
String topicName = "mqcloud-demo-topic";
70+
String topicProducer = "mqcloud-demo-topic-producer";
71+
String topicConsumer = "mqcloud-demo-consumer";
72+
Result<Topic> topicResult = topicService.queryTopic(topicName);
73+
if (topicResult.isOK()) {
74+
logger.info("data has initialized!");
75+
// 启动生产消费
76+
produceAndConsume(topicName, topicProducer, topicConsumer);
77+
return;
78+
}
79+
80+
int cid = 1;
81+
// 1.等待broker注册
82+
Cluster cluster = clusterService.getMQClusterById(cid);
83+
while (true) {
84+
ClusterInfo clusterInfo = mqAdminTemplate.execute(new DefaultCallback<ClusterInfo>() {
85+
public ClusterInfo callback(MQAdminExt mqAdmin) throws Exception {
86+
return mqAdmin.examineBrokerClusterInfo();
87+
}
88+
public ClusterInfo exception(Exception e) {
89+
logger.error("examineBrokerClusterInfo err", e);
90+
return null;
91+
}
92+
public Cluster mqCluster() {
93+
return cluster;
94+
}
95+
});
96+
if (clusterInfo != null && clusterInfo.getBrokerAddrTable() != null && clusterInfo.getBrokerAddrTable().size() > 0) {
97+
logger.info("brokerClusterInfo:{}", clusterInfo.getBrokerAddrTable());
98+
break;
99+
}
100+
logger.info("wait broker register to ns, sleep 3000");
101+
Thread.sleep(3000);
102+
}
103+
104+
// 2.添加topic
105+
Cluster mqCluster = clusterService.getMQClusterById(cid);
106+
Audit audit = new Audit();
107+
audit.setInfo("demo topic");
108+
audit.setUid(1);
109+
AuditTopic auditTopic = new AuditTopic();
110+
auditTopic.setName(topicName);
111+
auditTopic.setQueueNum(8);
112+
auditTopic.setProducer(topicProducer);
113+
Result<?> createTopicResult = topicService.createTopic(mqCluster, audit, auditTopic);
114+
if (createTopicResult.isNotOK()) {
115+
if (createTopicResult.getException() != null) {
116+
logger.error("create topic failed", createTopicResult.getException());
117+
} else {
118+
logger.error("create topic failed:{}", createTopicResult.getMessage());
119+
}
120+
// broker直接创建topic
121+
return;
122+
} else {
123+
logger.info("create topic:{} OK!", topicName);
124+
}
125+
126+
// 3.添加消费者
127+
topicResult = topicService.queryTopic(topicName);
128+
Topic topic = topicResult.getResult();
129+
UserConsumer userConsumer = new UserConsumer();
130+
userConsumer.setUid(audit.getUid());
131+
userConsumer.setTid(topic.getId());
132+
Consumer consumer = new Consumer();
133+
consumer.setInfo("demo 消费");
134+
consumer.setTid(topic.getId());
135+
consumer.setName(topicConsumer);
136+
Result<?> createUserConsumerResult = userConsumerService.saveUserConsumer(cluster, userConsumer, consumer);
137+
if (createUserConsumerResult.isNotOK()) {
138+
logger.error("create consumer failed", createUserConsumerResult.getException());
139+
return;
140+
} else {
141+
logger.info("create consumer:{} OK!", topicConsumer);
142+
}
143+
144+
// 启动生产消费
145+
produceAndConsume(topicName, topicProducer, topicConsumer);
146+
}
147+
148+
/**
149+
* 生产和消费
150+
*
151+
* @param topic
152+
* @param producerGroup
153+
* @param consumerGroup
154+
*/
155+
public void produceAndConsume(String topic, String producerGroup, String consumerGroup) {
156+
new Thread() {
157+
public void run() {
158+
try {
159+
Result<Topic> topicResult = topicService.queryTopic(topic);
160+
while (true) {
161+
TopicRouteData topicRouteData = topicService.route(topicResult.getResult());
162+
if (topicRouteData != null && topicRouteData.getBrokerDatas() != null) {
163+
break;
164+
}
165+
logger.info("waiting topic route...{}", topicRouteData);
166+
Thread.sleep(10000);
167+
}
168+
169+
// 1.启动消费者
170+
RocketMQConsumer rocketMQConsumer = new RocketMQConsumer(consumerGroup, topic);
171+
rocketMQConsumer.setMqCloudDomain("127.0.0.1:8080");
172+
AtomicLong counter = new AtomicLong();
173+
rocketMQConsumer.setConsumerCallback((msg, msgExt) -> {
174+
if (counter.incrementAndGet() % 10 == 0) {
175+
logger.info("receive msg count:{}", counter.get());
176+
}
177+
});
178+
rocketMQConsumer.start();
179+
180+
// 2.启动生产者
181+
RocketMQProducer producer = new RocketMQProducer(producerGroup, topic);
182+
producer.setMqCloudDomain("127.0.0.1:8080");
183+
producer.start();
184+
for (int i = 0; i < Long.MAX_VALUE; ++i) {
185+
try {
186+
producer.send(MQMessage.build("demo message" + i));
187+
Thread.sleep(1000);
188+
} catch (Exception e) {
189+
e.printStackTrace();
190+
}
191+
}
192+
} catch (Exception e) {
193+
logger.error("start produceAndConsume err", e);
194+
}
195+
}
196+
}.start();
197+
}
198+
}

mq-cloud/src/main/java/com/sohu/tv/mq/cloud/service/TopicService.java

+4
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,10 @@ public Result<?> createAndUpdateTopicOnCluster(Cluster mqCluster, TopicConfig to
432432
public Result<?> callback(MQAdminExt mqAdmin) throws Exception {
433433
long start = System.currentTimeMillis();
434434
Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdmin, mqCluster.getName());
435+
if (masterSet.size() == 0) {
436+
logger.error("create or update topic failed:no master node, cluser:{}, topic:{}", mqCluster, topicConfig);
437+
return Result.getResult(Status.BROKER_NOT_EXIST_ERROR);
438+
}
435439
for (String addr : masterSet) {
436440
mqAdmin.createAndUpdateTopicConfig(addr, topicConfig);
437441
}

mq-cloud/src/main/java/com/sohu/tv/mq/cloud/task/ServerStatusTask.java

+8-9
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,5 @@
11
package com.sohu.tv.mq.cloud.task;
22

3-
import java.util.Date;
4-
import java.util.List;
5-
6-
import org.slf4j.Logger;
7-
import org.slf4j.LoggerFactory;
8-
import org.springframework.beans.factory.annotation.Autowired;
9-
import org.springframework.scheduling.annotation.Scheduled;
10-
113
import com.sohu.tv.mq.cloud.bo.ServerInfo;
124
import com.sohu.tv.mq.cloud.service.SSHTemplate;
135
import com.sohu.tv.mq.cloud.service.SSHTemplate.DefaultLineProcessor;
@@ -19,8 +11,14 @@
1911
import com.sohu.tv.mq.cloud.task.server.data.Server;
2012
import com.sohu.tv.mq.cloud.task.server.nmon.NMONService;
2113
import com.sohu.tv.mq.cloud.util.Result;
22-
2314
import net.javacrumbs.shedlock.core.SchedulerLock;
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
17+
import org.springframework.beans.factory.annotation.Autowired;
18+
import org.springframework.scheduling.annotation.Scheduled;
19+
20+
import java.util.Date;
21+
import java.util.List;
2422

2523
/**
2624
* 服务器状态监控任务
@@ -121,6 +119,7 @@ public void process(String line, int lineNum) throws Exception {
121119
server.parse(line, null);
122120
}
123121
});
122+
server.resetDateTime();
124123
if(!result.isSuccess()) {
125124
logger.error("collect " + ip + " err:" + result.getResult(), result.getExcetion());
126125
}

mq-cloud/src/main/java/com/sohu/tv/mq/cloud/task/monitor/SohuMonitorListener.java

+6
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.Set;
1212
import java.util.TreeMap;
1313

14+
import com.alibaba.fastjson.JSON;
1415
import org.apache.rocketmq.common.MixAll;
1516
import org.apache.rocketmq.common.message.MessageQueue;
1617
import org.apache.rocketmq.common.protocol.body.Connection;
@@ -312,6 +313,11 @@ public void reportConsumerRunningInfo(String consumerGroup, TreeMap<String, Cons
312313
consumerStat.setSbscription(sbscription);
313314
consumerStatDao.saveSimpleConsumerStat(consumerStat);
314315
subscriptionWarn(consumerGroup, sbscription);
316+
317+
if (uniqSet.size() <= 1){
318+
log.debug("analyze subscription result is {},but find subscription number only one,the criTable is {}",
319+
false, JSON.toJSON(criTable));
320+
}
315321
}
316322
} catch (NumberFormatException e) {
317323
log.warn("num parse err");

0 commit comments

Comments
 (0)