diff --git a/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java b/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java index d7c715d..0933821 100644 --- a/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java +++ b/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java @@ -15,6 +15,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; @Slf4j @Service @@ -22,12 +23,23 @@ public class SseService { private final Map>> sinks = new ConcurrentHashMap<>(); private final Map> courseStudentMap = new ConcurrentHashMap<>(); + private static final int MAX_SINKS = 1500; + private final AtomicInteger currentConnections = new AtomicInteger(0); private final int CONNECTION_TIMEOUT_MINUTES_COURSE = 10; private final int CONNECTION_TIMEOUT_MINUTES_STUDENT = 5; private final int RETRY_INTERVAL_SECONDS = 1; public Flux>> subscribeCourseMessages(String courseId) { - Sinks.Many> sink = sinks.computeIfAbsent(courseId, k -> Sinks.many().multicast().onBackpressureBuffer()); + if (checkIfConnectionFull()) { + return Flux.empty(); + } + Sinks.Many> sink = sinks.computeIfAbsent(courseId, k -> { + if (currentConnections.incrementAndGet() > MAX_SINKS) { + currentConnections.decrementAndGet(); + throw new BaseException(SseErrorCode.CONNECTION_LIMIT_EXCEEDED); + } + return Sinks.many().multicast().onBackpressureBuffer(); + }); courseStudentMap.computeIfAbsent(courseId, k -> ConcurrentHashMap.newKeySet()); MessageResponse initMessage = new MessageResponse<>("CONNECTION_ESTABLISHED", null); ServerSentEvent> initEvent = ServerSentEvent.>builder() @@ -40,11 +52,20 @@ public Flux>> subscribeCourseMessages(String } public Flux>> subscribeStudentMessages(String studentId, String courseId) { + if (checkIfConnectionFull()) { + return Flux.empty(); + } if (!courseStudentMap.containsKey(courseId)) { log.debug("courseId와 일치하는 수업을 찾을 수 없습니다."); return Flux.empty(); } - Sinks.Many> sink = sinks.computeIfAbsent(studentId, k -> Sinks.many().multicast().onBackpressureBuffer()); + Sinks.Many> sink = sinks.computeIfAbsent(studentId, k -> { + if (currentConnections.incrementAndGet() > MAX_SINKS) { + currentConnections.decrementAndGet(); + throw new BaseException(SseErrorCode.CONNECTION_LIMIT_EXCEEDED); + } + return Sinks.many().multicast().onBackpressureBuffer(); + }); courseStudentMap.get(courseId).add(studentId); MessageResponse initMessage = new MessageResponse<>("CONNECTION_ESTABLISHED", null); ServerSentEvent> initEvent = ServerSentEvent.>builder() @@ -78,13 +99,14 @@ public void sendMessageToStudents(String courseId, MessageResponse message) { } private Flux>> openCourseConnection(Sinks.Many> sink, ServerSentEvent> initEvent, String courseId) { + showCurrentUsers(); return Flux.concat(Mono.just(initEvent), sink.asFlux() .map(data -> ServerSentEvent.>builder(data).build()) ) .timeout(Duration.ofMinutes(CONNECTION_TIMEOUT_MINUTES_COURSE)) .onErrorResume(TimeoutException.class, e -> { - log.warn("SSE 연결 제한 시간이 초과되었습니다. : courseId={}", courseId); + log.warn("SSE 연결 제한 시간이 초과되었습니다. : courseId = {}", courseId); closeConnection(sink, courseId); return Flux.empty(); }) @@ -95,13 +117,14 @@ private Flux>> openCourseConnection(Sinks.Man } private Flux>> openStudentConnection(Sinks.Many> sink, ServerSentEvent> initEvent, String studentId, String courseId) { + showCurrentUsers(); return Flux.concat(Mono.just(initEvent), sink.asFlux() .map(data -> ServerSentEvent.>builder(data).build()) ) .timeout(Duration.ofMinutes(CONNECTION_TIMEOUT_MINUTES_STUDENT)) .onErrorResume(TimeoutException.class, e -> { - log.warn("학생 SSE 연결 제한 시간이 초과되었습니다. : studentId={}", studentId); + log.warn("학생 SSE 연결 제한 시간이 초과되었습니다. : studentId = {}", studentId); closeConnection(sink, studentId); return Flux.empty(); }) @@ -131,8 +154,24 @@ private void closeStudentConnection(String studentId, String courseId) { closeConnection(sinks.get(studentId), studentId); } - private void closeConnection(Sinks.Many> sink, String courseId) { - sinks.remove(courseId); - sink.tryEmitComplete(); + private void closeConnection(Sinks.Many> sink, String id) { + if (sink != null) { + sink.tryEmitComplete(); + sinks.remove(id); + currentConnections.decrementAndGet(); + } + } + + private boolean checkIfConnectionFull() { + int connectionCount = currentConnections.get(); + if (connectionCount >= MAX_SINKS) { + log.debug("SSE 연결 제한을 초과했습니다. : 현재 연결 개수 = {}", connectionCount); + return true; + } + return false; + } + + private void showCurrentUsers() { + log.debug("현재 {} 명의 사용자가 연결 중입니다. ", currentConnections.get()); } } diff --git a/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/global/exception/code/SseErrorCode.java b/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/global/exception/code/SseErrorCode.java index d35984e..21dc477 100644 --- a/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/global/exception/code/SseErrorCode.java +++ b/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/global/exception/code/SseErrorCode.java @@ -8,6 +8,7 @@ @AllArgsConstructor public enum SseErrorCode implements ErrorCode { USER_NOT_FOUND(HttpStatus.NOT_FOUND, "전송 대상을 찾을 수 없습니다."), + CONNECTION_LIMIT_EXCEEDED(HttpStatus.TOO_MANY_REQUESTS, "현재 사용자가 너무 많습니다."), MESSAGE_SEND_FAILURE(HttpStatus.INTERNAL_SERVER_ERROR, "서버 문제로 메시지 전송에 실패했습니다."); private final HttpStatus status;