Skip to content

Commit 02a8a38

Browse files
authored
Merge pull request crossoverJie#164 from crossoverJie/optimization-proto
Protocol add properties
2 parents 48adb95 + c72deb2 commit 02a8a38

File tree

17 files changed

+79
-63
lines changed

17 files changed

+79
-63
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ Using `CIM`, you can achieve the following requirements:
4747
| [群聊](https://youtu.be/_9a4lIkQ5_o) [私聊](https://youtu.be/kfEfQFPLBTQ) | [群聊](https://www.bilibili.com/video/av39405501) [私聊](https://www.bilibili.com/video/av39405821) |
4848
| <img src="https://i.loli.net//2019//05//08//5cd1d9e788004.jpg" height="295px" /> | <img src="https://i.loli.net//2019//05//08//5cd1da2f943c5.jpg" height="295px" />
4949

50+
![demo.gif](pic/demo.gif)
5051

5152
## TODO LIST
5253

cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientConfigurationData.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public static class Auth{
3838

3939
@JsonIgnore
4040
private MessageListener messageListener =
41-
(client, msg) -> System.out.printf("id:[%s] msg:[%s]%n \n", client.getAuth(), msg);
41+
(client, properties, msg) -> System.out.printf("id:[%s] msg:[%s]%n \n", client.getAuth(), msg);
4242

4343
@JsonIgnore
4444
private OkHttpClient okHttpClient = new OkHttpClient();

cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientImpl.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66
import com.crossoverjie.cim.client.sdk.ReConnectManager;
77
import com.crossoverjie.cim.client.sdk.RouteManager;
88
import com.crossoverjie.cim.client.sdk.io.CIMClientHandleInitializer;
9-
import com.crossoverjie.cim.common.constant.Constants;
109
import com.crossoverjie.cim.common.exception.CIMException;
1110
import com.crossoverjie.cim.common.kit.HeartBeatHandler;
1211
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
12+
import com.crossoverjie.cim.common.protocol.BaseCommand;
1313
import com.crossoverjie.cim.common.protocol.Request;
1414
import com.crossoverjie.cim.route.api.vo.req.ChatReqVO;
1515
import com.crossoverjie.cim.route.api.vo.req.LoginReqVO;
@@ -82,7 +82,7 @@ public ClientImpl(ClientConfigurationData conf) {
8282
heartBeatPacket = Request.newBuilder()
8383
.setRequestId(this.conf.getAuth().getUserId())
8484
.setReqMsg("ping")
85-
.setType(Constants.CommandType.PING)
85+
.setCmd(BaseCommand.PING)
8686
.build();
8787
client = this;
8888

@@ -177,7 +177,7 @@ private void loginServer() {
177177
Request login = Request.newBuilder()
178178
.setRequestId(this.conf.getAuth().getUserId())
179179
.setReqMsg(this.conf.getAuth().getUserName())
180-
.setType(Constants.CommandType.LOGIN)
180+
.setCmd(BaseCommand.LOGIN_REQUEST)
181181
.build();
182182
channel.writeAndFlush(login)
183183
.addListener((ChannelFutureListener) channelFuture ->

cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/io/CIMClientHandle.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import com.crossoverjie.cim.client.sdk.ClientState;
44
import com.crossoverjie.cim.client.sdk.impl.ClientImpl;
5-
import com.crossoverjie.cim.common.constant.Constants;
5+
import com.crossoverjie.cim.common.protocol.BaseCommand;
66
import com.crossoverjie.cim.common.protocol.Response;
77
import com.crossoverjie.cim.common.util.NettyAttrUtil;
88
import io.netty.channel.ChannelFutureListener;
@@ -60,15 +60,15 @@ public void channelInactive(ChannelHandlerContext ctx) {
6060
protected void channelRead0(ChannelHandlerContext ctx, Response msg) {
6161

6262

63-
if (msg.getType() == Constants.CommandType.PING) {
63+
if (msg.getCmd() == BaseCommand.PING) {
6464
ClientImpl.getClient().getConf().getEvent().debug("received ping from server");
6565
NettyAttrUtil.updateReaderTime(ctx.channel(), System.currentTimeMillis());
6666
}
6767

68-
if (msg.getType() != Constants.CommandType.PING) {
68+
if (msg.getCmd() != BaseCommand.PING) {
6969
// callback
7070
ClientImpl.getClient().getConf().getCallbackThreadPool().execute(() -> {
71-
ClientImpl.getClient().getConf().getMessageListener().received(ClientImpl.getClient(), msg.getResMsg());
71+
ClientImpl.getClient().getConf().getMessageListener().received(ClientImpl.getClient(), msg.getPropertiesMap(), msg.getResMsg());
7272
});
7373
}
7474

Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package com.crossoverjie.cim.client.sdk.io;
22

33
import com.crossoverjie.cim.client.sdk.Client;
4+
import java.util.Map;
45

56
public interface MessageListener {
67

78
/**
8-
* @param client client
9-
* @param msg msgs
9+
* @param client client
10+
* @param properties meta data
11+
* @param msg msgs
1012
*/
11-
void received(Client client, String msg);
13+
void received(Client client, Map<String, String> properties, String msg);
1214
}

cim-client-sdk/src/test/java/com/crossoverjie/cim/client/sdk/ClientTest.java

+17-12
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.crossoverjie.cim.client.sdk.impl.ClientConfigurationData;
44
import com.crossoverjie.cim.client.sdk.io.backoff.RandomBackoff;
55
import com.crossoverjie.cim.client.sdk.route.AbstractRouteBaseTest;
6+
import com.crossoverjie.cim.common.constant.Constants;
67
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
78
import com.crossoverjie.cim.route.api.vo.req.P2PReqVO;
89
import com.crossoverjie.cim.route.api.vo.res.CIMServerResVO;
@@ -63,7 +64,11 @@ public void groupChat() throws Exception {
6364
Client client2 = Client.builder()
6465
.auth(auth2)
6566
.routeUrl(routeUrl)
66-
.messageListener((client, message) -> client2Receive.set(message))
67+
.messageListener((client, properties, message) -> {
68+
client2Receive.set(message);
69+
Assertions.assertEquals(properties.get(Constants.MetaKey.USER_ID), String.valueOf(auth1.getUserId()));
70+
Assertions.assertEquals(properties.get(Constants.MetaKey.USER_NAME), auth1.getUserName());
71+
})
6772
.build();
6873
TimeUnit.SECONDS.sleep(3);
6974
ClientState.State state2 = client2.getState();
@@ -91,7 +96,7 @@ public void groupChat() throws Exception {
9196
});
9297

9398
Awaitility.await().untilAsserted(
94-
() -> Assertions.assertEquals(String.format("crossoverJie:%s", msg), client2Receive.get()));
99+
() -> Assertions.assertEquals(msg, client2Receive.get()));
95100
super.stopSingle();
96101
}
97102

@@ -139,7 +144,7 @@ public void testP2PChat() throws Exception {
139144
Client client2 = Client.builder()
140145
.auth(auth2)
141146
.routeUrl(routeUrl)
142-
.messageListener((client, message) -> client2Receive.set(message))
147+
.messageListener((client, properties, message) -> client2Receive.set(message))
143148
.build();
144149
TimeUnit.SECONDS.sleep(3);
145150
ClientState.State state2 = client2.getState();
@@ -156,7 +161,7 @@ public void testP2PChat() throws Exception {
156161
Client client3 = Client.builder()
157162
.auth(auth3)
158163
.routeUrl(routeUrl)
159-
.messageListener((client, message) -> {
164+
.messageListener((client, properties, message) -> {
160165
log.info("client3 receive message = {}", message);
161166
client3Receive.set(message);
162167
})
@@ -192,7 +197,7 @@ public void testP2PChat() throws Exception {
192197
});
193198

194199
Awaitility.await().untilAsserted(
195-
() -> Assertions.assertEquals(String.format("%s:%s", cj, msg), client3Receive.get()));
200+
() -> Assertions.assertEquals(msg, client3Receive.get()));
196201
Awaitility.await().untilAsserted(
197202
() -> Assertions.assertNull(client2Receive.get()));
198203
super.stopSingle();
@@ -244,7 +249,7 @@ public void testReconnect() throws Exception {
244249
Client client2 = Client.builder()
245250
.auth(auth2)
246251
.routeUrl(routeUrl)
247-
.messageListener((client, message) -> client2Receive.set(message))
252+
.messageListener((client, properties, message) -> client2Receive.set(message))
248253
.backoffStrategy(backoffStrategy)
249254
.build();
250255
TimeUnit.SECONDS.sleep(3);
@@ -260,7 +265,7 @@ public void testReconnect() throws Exception {
260265
String msg = "hello";
261266
client1.sendGroup(msg);
262267
Awaitility.await()
263-
.untilAsserted(() -> Assertions.assertEquals(String.format("cj:%s", msg), client2Receive.get()));
268+
.untilAsserted(() -> Assertions.assertEquals(msg, client2Receive.get()));
264269
client2Receive.set("");
265270

266271

@@ -287,7 +292,7 @@ public void testReconnect() throws Exception {
287292
log.info("send message again, client2Receive = {}", client2Receive.get());
288293
client1.sendGroup(msg);
289294
Awaitility.await()
290-
.untilAsserted(() -> Assertions.assertEquals(String.format("cj:%s", msg), client2Receive.get()));
295+
.untilAsserted(() -> Assertions.assertEquals(msg, client2Receive.get()));
291296
super.stopTwoServer();
292297
}
293298

@@ -327,7 +332,7 @@ public void offLineAndOnline() throws Exception {
327332
Client client2 = Client.builder()
328333
.auth(auth2)
329334
.routeUrl(routeUrl)
330-
.messageListener((client, message) -> client2Receive.set(message))
335+
.messageListener((client, properties, message) -> client2Receive.set(message))
331336
// Avoid auto reconnect, this test will manually close client.
332337
.reconnectCheck((client) -> false)
333338
.build();
@@ -344,7 +349,7 @@ public void offLineAndOnline() throws Exception {
344349
String msg = "hello";
345350
client1.sendGroup(msg);
346351
Awaitility.await().untilAsserted(
347-
() -> Assertions.assertEquals(String.format("crossoverJie:%s", msg), client2Receive.get()));
352+
() -> Assertions.assertEquals(msg, client2Receive.get()));
348353
client2Receive.set("");
349354

350355
// Manually offline
@@ -353,7 +358,7 @@ public void offLineAndOnline() throws Exception {
353358
client2 = Client.builder()
354359
.auth(auth2)
355360
.routeUrl(routeUrl)
356-
.messageListener((client, message) -> client2Receive.set(message))
361+
.messageListener((client, properties, message) -> client2Receive.set(message))
357362
// Avoid to auto reconnect, this test will manually close client.
358363
.reconnectCheck((client) -> false)
359364
.build();
@@ -364,7 +369,7 @@ public void offLineAndOnline() throws Exception {
364369
// send msg again
365370
client1.sendGroup(msg);
366371
Awaitility.await().untilAsserted(
367-
() -> Assertions.assertEquals(String.format("crossoverJie:%s", msg), client2Receive.get()));
372+
() -> Assertions.assertEquals(msg, client2Receive.get()));
368373

369374
super.stopSingle();
370375
}

cim-client/src/main/java/com/crossoverjie/cim/client/service/impl/MsgCallBackListener.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import com.crossoverjie.cim.client.sdk.Event;
55
import com.crossoverjie.cim.client.sdk.io.MessageListener;
66
import com.crossoverjie.cim.client.service.MsgLogger;
7+
import com.crossoverjie.cim.common.constant.Constants;
8+
import java.util.Map;
79

810
/**
911
* Function:自定义收到消息回调
@@ -25,8 +27,9 @@ public MsgCallBackListener(MsgLogger msgLogger, Event event) {
2527

2628

2729
@Override
28-
public void received(Client client, String msg) {
29-
this.msgLogger.log(msg);
30-
this.event.info(msg);
30+
public void received(Client client, Map<String, String> properties, String msg) {
31+
String sendUserName = properties.getOrDefault(Constants.MetaKey.USER_NAME, "nobody");
32+
this.msgLogger.log(sendUserName + ":" + msg);
33+
this.event.info(sendUserName + ":" + msg);
3134
}
3235
}

cim-common/src/main/java/com/crossoverjie/cim/common/constant/Constants.java

+3-19
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,9 @@ public class Constants {
2121
*/
2222
public static final String COUNTER_CLIENT_PUSH_COUNT = "counter.client.push.count" ;
2323

24-
25-
/**
26-
* 自定义报文类型
27-
*/
28-
public static class CommandType{
29-
/**
30-
* 登录
31-
*/
32-
public static final int LOGIN = 1 ;
33-
/**
34-
* 业务消息
35-
*/
36-
public static final int MSG = 2 ;
37-
38-
/**
39-
* ping
40-
*/
41-
public static final int PING = 3 ;
24+
public static class MetaKey {
25+
public static final String USER_ID = "userId" ;
26+
public static final String USER_NAME = "userName" ;
4227
}
4328

44-
4529
}

cim-common/src/main/proto/cim.proto

+14-7
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,21 @@ option java_package = "com.crossoverjie.cim.common.protocol";
44
option java_multiple_files = true;
55

66
message Request{
7-
// todo source user info
8-
int64 requestId = 2;
9-
string reqMsg = 1;
10-
int32 type = 3;
7+
int64 requestId = 2;
8+
string reqMsg = 1;
9+
BaseCommand cmd = 3;
10+
map<string, string> properties = 4;
1111
}
1212

1313
message Response{
14-
int64 responseId = 2;
15-
string resMsg = 1;
16-
int32 type = 3;
14+
int64 responseId = 2;
15+
string resMsg = 1;
16+
BaseCommand cmd = 3;
17+
map<string, string> properties = 4;
18+
}
19+
20+
enum BaseCommand{
21+
LOGIN_REQUEST = 0;
22+
MESSAGE = 1;
23+
PING = 2;
1724
}

cim-common/src/test/java/com/crossoverjie/cim/common/util/ProtocolTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.crossoverjie.cim.common.util;
22

3+
import com.crossoverjie.cim.common.protocol.BaseCommand;
34
import com.crossoverjie.cim.common.protocol.Request;
45
import com.google.protobuf.InvalidProtocolBufferException;
56
import org.junit.Test;
@@ -11,7 +12,7 @@ public void testProtocol() throws InvalidProtocolBufferException {
1112
Request protocol = Request.newBuilder()
1213
.setRequestId(123L)
1314
.setReqMsg("你好啊")
14-
.setType(1)
15+
.setCmd(BaseCommand.LOGIN_REQUEST)
1516
.build();
1617

1718
byte[] encode = encode(protocol);

cim-forward-route/src/main/java/com/crossoverjie/cim/route/service/impl/AccountServiceRedisImpl.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.crossoverjie.cim.route.service.impl;
22

3+
import com.crossoverjie.cim.common.constant.Constants;
34
import com.crossoverjie.cim.common.core.proxy.RpcProxyManager;
45
import com.crossoverjie.cim.common.enums.StatusEnum;
56
import com.crossoverjie.cim.common.exception.CIMException;
@@ -10,6 +11,7 @@
1011
import com.crossoverjie.cim.route.api.vo.req.LoginReqVO;
1112
import com.crossoverjie.cim.route.api.vo.res.CIMServerResVO;
1213
import com.crossoverjie.cim.route.api.vo.res.RegisterInfoResVO;
14+
import com.crossoverjie.cim.route.constant.Constant;
1315
import com.crossoverjie.cim.route.service.AccountService;
1416
import com.crossoverjie.cim.route.service.UserInfoCacheService;
1517
import com.crossoverjie.cim.server.api.ServerApi;
@@ -159,7 +161,11 @@ public void pushMsg(CIMServerResVO cimServerResVO, long sendUserId, ChatReqVO gr
159161
cimUserInfo.ifPresent(userInfo -> {
160162
String url = "http://" + cimServerResVO.getIp() + ":" + cimServerResVO.getHttpPort();
161163
SendMsgReqVO vo =
162-
new SendMsgReqVO(userInfo.getUserName() + ":" + groupReqVO.getMsg(), groupReqVO.getUserId());
164+
new SendMsgReqVO(groupReqVO.getMsg(), groupReqVO.getUserId());
165+
vo.setProperties(Map.of(
166+
Constants.MetaKey.USER_ID, String.valueOf(sendUserId),
167+
Constants.MetaKey.USER_NAME, userInfo.getUserName())
168+
);
163169
serverApi.sendMsg(vo, url);
164170

165171
});

cim-forward-route/src/main/java/com/crossoverjie/cim/route/service/impl/UserInfoCacheServiceImpl.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ public class UserInfoCacheServiceImpl implements UserInfoCacheService {
4141
@Override
4242
public Optional<CIMUserInfo> loadUserInfoByUserId(Long userId) {
4343
//Retrieve user information using a second-level cache.
44-
Optional<CIMUserInfo> cimUserInfo = userInfoMap.getUnchecked(userId);
45-
return cimUserInfo;
44+
return userInfoMap.getUnchecked(userId);
4645
}
4746

4847
@Override

cim-server-api/src/main/java/com/crossoverjie/cim/server/api/vo/req/SendMsgReqVO.java

+7
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
import io.swagger.v3.oas.annotations.media.Schema;
66
import jakarta.validation.constraints.NotNull;
7+
import java.util.Map;
8+
import lombok.Getter;
9+
import lombok.Setter;
710

811
/**
912
* Function:
@@ -22,6 +25,10 @@ public class SendMsgReqVO extends BaseRequest {
2225
@Schema(requiredMode = Schema.RequiredMode.REQUIRED, description = "userId", example = "11")
2326
private Long userId ;
2427

28+
@Setter
29+
@Getter
30+
private Map<String, String> properties;
31+
2532
public SendMsgReqVO() {
2633
}
2734

cim-server/src/main/java/com/crossoverjie/cim/server/config/BeanConfig.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package com.crossoverjie.cim.server.config;
22

3-
import com.crossoverjie.cim.common.constant.Constants;
43
import com.crossoverjie.cim.common.core.proxy.RpcProxyManager;
54
import com.crossoverjie.cim.common.metastore.MetaStore;
65
import com.crossoverjie.cim.common.metastore.ZkMetaStoreImpl;
6+
import com.crossoverjie.cim.common.protocol.BaseCommand;
77
import com.crossoverjie.cim.common.protocol.Request;
88
import com.crossoverjie.cim.route.api.RouteApi;
99
import jakarta.annotation.Resource;
@@ -54,7 +54,7 @@ public Request heartBeat() {
5454
return Request.newBuilder()
5555
.setRequestId(0L)
5656
.setReqMsg("pong")
57-
.setType(Constants.CommandType.PING)
57+
.setCmd(BaseCommand.PING)
5858
.build();
5959
}
6060

0 commit comments

Comments
 (0)