Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.boot:spring-boot-starter-validation'

/** 🧩 WebSocket & Messaging **/
/** 🧩 WebSocket & Messaging & RabbitMQ **/
implementation 'org.springframework.boot:spring-boot-starter-websocket'
implementation 'org.springframework.security:spring-security-messaging'

// implementation 'org.springframework.boot:spring-boot-starter-amqp'
// implementation 'org.springframework.boot:spring-boot-starter-reactor-netty'

/** 🛢 Database **/
// runtimeOnly 'com.mysql:mysql-connector-j'
// PostgreSQL + pgvector (추천 시스템용)
Expand Down
85 changes: 75 additions & 10 deletions src/main/java/com/goteego/chat/controller/ChatController.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,53 @@
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.stereotype.Controller;
import org.springframework.transaction.annotation.Transactional;

import java.security.Principal;
import java.util.Map;

//@Controller
//@RequiredArgsConstructor
//@Slf4j
//public class ChatController {
//
// private final ChatService chatService;
//
// // 1:1 채팅
// @MessageMapping("/chat.direct.send/{roomId}")
// public void sendDirectMessage(@Payload DirectMessageRequest directMessageRequest,
// @DestinationVariable(value = "roomId") String roomId,
// Principal principal) {
//
// log.info("✅✅✅ [ChatController] /chat.direct.send/{roomId} 메서드 진입! ✅✅✅");
// User user = (User) ((UsernamePasswordAuthenticationToken) principal).getPrincipal();
// chatService.sendDirectMessage(roomId, directMessageRequest, user.getId());
//
// Long userId = user.getId();
// log.warn("메시지 발신자 ID: {}", userId);
//
// chatService.sendDirectMessage(roomId, directMessageRequest, userId);
// }
//
// // 그룹 채팅
// @MessageMapping("/chat.group.send/{roomId}")
// public void sendGroupMessage(@Payload GroupMessageRequest groupMessageRequest,
// @DestinationVariable(value = "roomId") String roomId,
// Principal principal) {
// User user = (User) ((UsernamePasswordAuthenticationToken) principal).getPrincipal();
// chatService.sendGroupMessage(roomId, groupMessageRequest, user.getId());
// Long userId = user.getId();
//
// chatService.sendGroupMessage(roomId, groupMessageRequest, userId);
//
// }
//
//}



@Controller
@RequiredArgsConstructor
Expand All @@ -23,20 +65,43 @@ public class ChatController {

private final ChatService chatService;

// 1:1 채팅
@MessageMapping("/chat.direct.send/{roomId}")
public void sendDirectMessage(@Payload DirectMessageRequest directMessageRequest, @DestinationVariable(value = "roomId") String roomId, Principal principal) {
public void sendDirectMessage(
@Payload DirectMessageRequest directMessageRequest,
@DestinationVariable(value = "roomId") String roomId,
SimpMessageHeaderAccessor headerAccessor
) {
Map<String, Object> sessionAttributes = headerAccessor.getSessionAttributes();
Object userIdObj = sessionAttributes.get("userId");

if (userIdObj == null) {
log.error("!!!!!!!!!! [ChatController] Session attributes에서 userId를 찾을 수 없습니다. !!!!!!!!!!!");
return;
}

Long userId = (Long) userIdObj;
log.warn("메시지 발신자 ID: {}", userId);

log.info("✅✅✅ [ChatController] /chat.direct.send/{roomId} 메서드 진입! ✅✅✅");
User user = (User) ((UsernamePasswordAuthenticationToken) principal).getPrincipal();
chatService.sendDirectMessage(roomId, directMessageRequest, user.getId());
chatService.sendDirectMessage(roomId, directMessageRequest, userId);
}

// 그룹 채팅
@MessageMapping("/chat.group.send/{roomId}")
public void sendGroupMessage(@Payload GroupMessageRequest groupMessageRequest, @DestinationVariable(value = "roomId") String roomId, Principal principal) {
User user = (User) ((UsernamePasswordAuthenticationToken) principal).getPrincipal();
chatService.sendGroupMessage(roomId, groupMessageRequest, user.getId());
}
public void sendGroupMessage(
@Payload GroupMessageRequest groupMessageRequest,
@DestinationVariable(value = "roomId") String roomId,
SimpMessageHeaderAccessor headerAccessor
) {
Map<String, Object> sessionAttributes = headerAccessor.getSessionAttributes();
Object userIdObj = sessionAttributes.get("userId");

if (userIdObj == null) {
log.error("!!!!!!!!!! [ChatController] Session attributes에서 userId를 찾을 수 없습니다. !!!!!!!!!!!");
return;
}

Long userId = (Long) userIdObj;
log.warn("그룹 메시지 발신자 ID: {}", userId);

chatService.sendGroupMessage(roomId, groupMessageRequest, userId);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.goteego.chat.dto.message;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.goteego.chat.domain.enumerate.MessageType;
import lombok.AllArgsConstructor;
import lombok.Data;
Expand All @@ -8,6 +9,7 @@
@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class DirectMessageRequest {
private String content; // 필수: 메시지 내용
private Long recipientId; // 필수: 수신자 아이디
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.goteego.chat.dto.message;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.goteego.chat.domain.enumerate.MessageType;
import lombok.AllArgsConstructor;
import lombok.Data;
Expand All @@ -8,6 +9,7 @@
@Data
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class GroupMessageRequest {
private String content;
private MessageType type;
Expand Down
69 changes: 51 additions & 18 deletions src/main/java/com/goteego/chat/service/redis/RedisSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,48 +6,44 @@
import com.goteego.chat.dto.message.NotificationResponse;
import com.goteego.chat.dto.message.transfer.DirectMessageTransferDto;
import com.goteego.chat.dto.message.transfer.NotificationTransferDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.user.SimpUser;
import org.springframework.stereotype.Service;
import org.springframework.messaging.simp.user.SimpUserRegistry;


@Service
@Slf4j
public class RedisSubscriber {

private final ObjectMapper objectMapper;
private final SimpMessagingTemplate messagingTemplate;

// ChannelTopic을 주입받아 토픽 이름을 비교하는 데 사용합니다.
// private final ChannelTopic chatTopic;
private final ChannelTopic notificationTopic;
private final ChannelTopic directChatTopic;
private final ChannelTopic groupChatTopic;

private final SimpUserRegistry userRegistry;

private static final String DIRECT_MESSAGE_PATH = "/queue/messages";
private static final String GROUP_MESSAGE_PATH = "/sub/chat/room/";
private static final String NOTIFICATION_PATH = "/queue/notifications";

public RedisSubscriber(ObjectMapper objectMapper, SimpMessagingTemplate messagingTemplate,
// @Qualifier("chatTopic") ChannelTopic chatTopic,
@Qualifier("directChatTopic") ChannelTopic directChatTopic,
@Qualifier("groupChatTopic") ChannelTopic groupChatTopic,
@Qualifier("notificationTopic") ChannelTopic notificationTopic) {
@Qualifier("notificationTopic") ChannelTopic notificationTopic,
SimpUserRegistry userRegistry) {
this.objectMapper = objectMapper;
this.messagingTemplate = messagingTemplate;
// this.chatTopic = chatTopic;
this.directChatTopic = directChatTopic;
this.groupChatTopic = groupChatTopic;
this.notificationTopic = notificationTopic;
this.userRegistry = userRegistry;
}

private static final String DIRECT_MESSAGE_PATH = "/queue/messages";
private static final String GROUP_MESSAGE_PATH = "/sub/chat/room/";
private static final String NOTIFICATION_PATH = "/queue/notifications";

/**
* Redis에서 메시지가 발행(publish)되면 대기하고 있던 Redis Subscriber가 해당 메시지를 받아 처리
* @param publishMessage 직렬화된 메시지 객체
* @param channel 메시지가 발행된 채널(토픽) 이름
*/
public void sendMessage(String publishMessage, String channel) {
try {
log.info("✅ Redis에서 메시지 수신, 채널: {}", channel);
Expand Down Expand Up @@ -75,6 +71,10 @@ public void sendMessage(String publishMessage, String channel) {
}


// 1. SimpMessagingTemplate은 이 메시지를 자신의 서버에 있는 로컬 SimpleBroker에게 전달
// 2. 로컬 SimpleBroker는 자신에게 연결된 모든 웹소켓 세션을 모두 조회
// 3. 세션들 중에서 /sub/chat/room/... 경로를 구독(subscribe)하고 있는 클라이언트에게만 메시지를 전달
// 4. 만약 이 서버에 해당 채팅방을 구독 중인 클라이언트가 한 명도 없다면, SimpleBroker는 아무 일도 하지 않고 조용히 작업을 종료
private void handleGroupMessage(GroupMessageResponse message) {
messagingTemplate.convertAndSend(GROUP_MESSAGE_PATH + message.getRoomId(), message);
log.info("RedisSubscriber - Group message sent to /sub/chat/room/{}", message.getRoomId());
Expand All @@ -96,8 +96,41 @@ private void handleNotification(NotificationTransferDto dto) {
log.info("RedisSubscriber - Notification sent to user {}", dto.getRecipientId());
}


// 1. SimpMessagingTemplate은 자신의 서버에 연결된 모든 웹소켓 세션 중에서, StompHandler에서 설정했던 Principal의 이름(name)이 "userId"와 일치하는 세션 조회
// 2. 일치하는 세션을 찾으면, 해당 클라이언트에게만 메시지를 전달
// 3. 만약 이 서버에 해당 "userId"를 가진 사용자의 세션이 없다면, 아무에게도 메시지를 보내지 않고 조용히 작업을 종료
private void sendToUser(Long userId, String destination, Object payload) {
log.info("Attempting to send to user: {}, destination: {}", userId, destination);
messagingTemplate.convertAndSendToUser(String.valueOf(userId), destination, payload);


String userIdStr = String.valueOf(userId);
String podName = System.getenv("HOSTNAME"); // 현재 컨테이너(Pod)의 이름

// ======================== [진단용 로그 시작] ========================
// 현재 이 서버 인스턴스의 SimpUserRegistry에 등록된 모든 사용자 이름을 가져옵니다.
java.util.Set<String> connectedUsers = userRegistry.getUsers().stream()
.map(org.springframework.messaging.simp.user.SimpUser::getName)
.collect(java.util.stream.Collectors.toSet());

log.warn("##### DIAGNOSTIC [Server: {}] #####", podName);
log.warn(" -> Checking for User: '{}'", userIdStr);
log.warn(" -> Currently registered users on THIS server: {}", connectedUsers);
// ======================== [진단용 로그 끝] ==========================

// 1. 일단 메시지 전송을 시도
// ㄴ 해당 사용자가 현재 서버에 없으면 이 코드는 아무 일도 없음
messagingTemplate.convertAndSendToUser(userIdStr, destination, payload);

// 2. 실제 메시지 전송이 '이 서버에서' 일어났는지 확인하고 로그 출력
// ㄴ userRegistry.getUsers()는 현재 서버에 연결된 모든 사용자를 반환
boolean wasSentFromThisServer = userRegistry.getUsers().stream()
.anyMatch(simpUser -> simpUser.getName().equals(userIdStr));

if (wasSentFromThisServer) {
log.info("✅✅✅ [Server: {}] Successfully sent message to User '{}' on THIS server.", podName, userId);
}


}

}
27 changes: 9 additions & 18 deletions src/main/java/com/goteego/global/redis/RedisConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,17 @@ public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connec
public RedisMessageListenerContainer redisMessageListener(
RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter,
@Qualifier("directChatTopic") ChannelTopic directChatTopic, // directChatTopic 주입
@Qualifier("groupChatTopic") ChannelTopic groupChatTopic, // groupChatTopic 주입
// @Qualifier("chatTopic") ChannelTopic chatTopic,
@Qualifier("notificationTopic") ChannelTopic notificationTopic
) {
@Qualifier("directChatTopic") ChannelTopic directChatTopic,
@Qualifier("groupChatTopic") ChannelTopic groupChatTopic,
@Qualifier("notificationTopic") ChannelTopic notificationTopic) {

RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// [chat] 채널로부터 메시지가 오면 listenerAdapter가 처리하도록 설정
// container.addMessageListener(listenerAdapter, chatTopic);

container.addMessageListener(listenerAdapter, directChatTopic);
container.addMessageListener(listenerAdapter, groupChatTopic);
// [notification] 채널로부터 메시지가 오면 listenerAdapter가 처리하도록 설정
container.addMessageListener(listenerAdapter, notificationTopic);

return container;
}

Expand All @@ -88,29 +86,22 @@ public MessageListenerAdapter listenerAdapter(RedisSubscriber subscriber) {
return new MessageListenerAdapter(subscriber, "sendMessage");
}


// Pub/Sub에서 사용할 채팅 관련 공용 채널 정의
// @Bean
// @Qualifier("chatTopic")
// public ChannelTopic chatTopic() {
// // 여기서는 모든 채팅 메시지를 "chat"이라는 단일 토픽으로 처리
// return new ChannelTopic("chat");
// }
// 1:1 채팅 메시지 전용 토픽
@Bean
@Qualifier("directChatTopic")
public ChannelTopic directChatTopic() {
return new ChannelTopic("directChat");
}

// [수정] 그룹 채팅 메시지 전용 토픽
// 그룹 채팅 메시지 전용 토픽
@Bean
@Qualifier("groupChatTopic")
public ChannelTopic groupChatTopic() {
return new ChannelTopic("groupChat");
}


// Pub/Sub에서 사용할 알림 관련 공용 채널 정의
// 알림 전용 토픽
@Bean
@Qualifier("notificationTopic")
public ChannelTopic notificationTopic() {
Expand Down
Loading