From 948cc3bd131c18510fdcb112da7fbb442d5cb162 Mon Sep 17 00:00:00 2001 From: sunohkim Date: Thu, 27 Feb 2025 00:34:02 +0900 Subject: [PATCH 1/5] =?UTF-8?q?Fix:=20=EC=B5=9C=EB=8C=80=20=EC=97=B0?= =?UTF-8?q?=EA=B2=B0=20=EA=B0=80=EB=8A=A5=ED=95=9C=20SSE=20=ED=86=B5?= =?UTF-8?q?=EC=8B=A0=20=EA=B0=9C=EC=88=98=20=EC=A0=9C=ED=95=9C=20#251?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 서버가 최대 1703개의 connection을 동시에 연결 가능하다는 점을 고려 - 최대 연결 가능한 SSE 통신 개수를 1500개로 지정 --- .../domain/sse/service/SseService.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java b/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java index d7c715d..be562ce 100644 --- a/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java +++ b/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java @@ -22,11 +22,15 @@ public class SseService { private final Map>> sinks = new ConcurrentHashMap<>(); private final Map> courseStudentMap = new ConcurrentHashMap<>(); + private static final int MAX_SINKS = 1500; private final int CONNECTION_TIMEOUT_MINUTES_COURSE = 10; private final int CONNECTION_TIMEOUT_MINUTES_STUDENT = 5; private final int RETRY_INTERVAL_SECONDS = 1; public Flux>> subscribeCourseMessages(String courseId) { + if (checkIfConnectionFull()) { + return Flux.empty(); + } Sinks.Many> sink = sinks.computeIfAbsent(courseId, k -> Sinks.many().multicast().onBackpressureBuffer()); courseStudentMap.computeIfAbsent(courseId, k -> ConcurrentHashMap.newKeySet()); MessageResponse initMessage = new MessageResponse<>("CONNECTION_ESTABLISHED", null); @@ -40,6 +44,9 @@ public Flux>> subscribeCourseMessages(String } public Flux>> subscribeStudentMessages(String studentId, String courseId) { + if (checkIfConnectionFull()) { + return Flux.empty(); + } if (!courseStudentMap.containsKey(courseId)) { log.debug("courseId와 일치하는 수업을 찾을 수 없습니다."); return Flux.empty(); @@ -84,7 +91,7 @@ private Flux>> openCourseConnection(Sinks.Man ) .timeout(Duration.ofMinutes(CONNECTION_TIMEOUT_MINUTES_COURSE)) .onErrorResume(TimeoutException.class, e -> { - log.warn("SSE 연결 제한 시간이 초과되었습니다. : courseId={}", courseId); + log.warn("SSE 연결 제한 시간이 초과되었습니다. : courseId = {}", courseId); closeConnection(sink, courseId); return Flux.empty(); }) @@ -101,7 +108,7 @@ private Flux>> openStudentConnection(Sinks.Ma ) .timeout(Duration.ofMinutes(CONNECTION_TIMEOUT_MINUTES_STUDENT)) .onErrorResume(TimeoutException.class, e -> { - log.warn("학생 SSE 연결 제한 시간이 초과되었습니다. : studentId={}", studentId); + log.warn("학생 SSE 연결 제한 시간이 초과되었습니다. : studentId = {}", studentId); closeConnection(sink, studentId); return Flux.empty(); }) @@ -135,4 +142,12 @@ private void closeConnection(Sinks.Many> sink, String courseI sinks.remove(courseId); sink.tryEmitComplete(); } + + private boolean checkIfConnectionFull() { + if (sinks.size() >= MAX_SINKS) { + log.debug("SSE 연결 제한을 초과했습니다. : 현재 연결 개수 = {}", sinks.size()); + return true; + } + return false; + } } From 7f3b3465d502c9b4d3c1e58647c087bf3a96a279 Mon Sep 17 00:00:00 2001 From: sunohkim Date: Thu, 27 Feb 2025 00:43:43 +0900 Subject: [PATCH 2/5] =?UTF-8?q?Chore:=20=ED=98=84=EC=9E=AC=20=EC=97=B0?= =?UTF-8?q?=EA=B2=B0=EB=90=9C=20=EC=82=AC=EC=9A=A9=EC=9E=90=20=EC=88=98?= =?UTF-8?q?=EB=A5=BC=20=ED=99=95=EC=9D=B8=ED=95=98=EB=8A=94=20=EB=A1=9C?= =?UTF-8?q?=EA=B7=B8=20=EC=B6=94=EA=B0=80=20#251?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../reacton_classroom/domain/sse/service/SseService.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java b/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java index be562ce..c23950b 100644 --- a/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java +++ b/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java @@ -85,6 +85,7 @@ public void sendMessageToStudents(String courseId, MessageResponse message) { } private Flux>> openCourseConnection(Sinks.Many> sink, ServerSentEvent> initEvent, String courseId) { + showCurrentUsers(); return Flux.concat(Mono.just(initEvent), sink.asFlux() .map(data -> ServerSentEvent.>builder(data).build()) @@ -102,6 +103,7 @@ private Flux>> openCourseConnection(Sinks.Man } private Flux>> openStudentConnection(Sinks.Many> sink, ServerSentEvent> initEvent, String studentId, String courseId) { + showCurrentUsers(); return Flux.concat(Mono.just(initEvent), sink.asFlux() .map(data -> ServerSentEvent.>builder(data).build()) @@ -150,4 +152,8 @@ private boolean checkIfConnectionFull() { } return false; } + + private void showCurrentUsers() { + log.debug("현재 {} 명의 사용자가 연결 중입니다. ", sinks.size()); + } } From 1b7cd1b1d02057be26e88341b9bee6e5cacab9f6 Mon Sep 17 00:00:00 2001 From: sunohkim Date: Thu, 27 Feb 2025 11:03:56 +0900 Subject: [PATCH 3/5] =?UTF-8?q?Fix:=20=EB=8F=99=EC=8B=9C=EC=84=B1=20?= =?UTF-8?q?=EC=9D=B4=EC=8A=88=EB=A5=BC=20=ED=95=B4=EA=B2=B0=ED=95=98?= =?UTF-8?q?=EA=B8=B0=20=EC=9C=84=ED=95=B4=20=EC=97=B0=EA=B2=B0=EB=90=9C=20?= =?UTF-8?q?=EC=82=AC=EC=9A=A9=EC=9E=90=20=EC=88=98=EB=A5=BC=20=EA=B4=80?= =?UTF-8?q?=EB=A6=AC=ED=95=98=EB=8A=94=20AtomicInteger=20=EB=8F=84?= =?UTF-8?q?=EC=9E=85=20#251?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - AtomicInteger로 멀티스레드 환경에서 race condition 방지 - 실제 connection을 추가하는 시점에서 개수 체크를 수행함으로써, 동시성 이슈 개선 --- .../domain/sse/service/SseService.java | 34 ++++++++++++++----- .../global/exception/code/SseErrorCode.java | 1 + 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java b/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java index c23950b..0933821 100644 --- a/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java +++ b/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java @@ -15,6 +15,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; @Slf4j @Service @@ -23,6 +24,7 @@ public class SseService { private final Map>> sinks = new ConcurrentHashMap<>(); private final Map> courseStudentMap = new ConcurrentHashMap<>(); private static final int MAX_SINKS = 1500; + private final AtomicInteger currentConnections = new AtomicInteger(0); private final int CONNECTION_TIMEOUT_MINUTES_COURSE = 10; private final int CONNECTION_TIMEOUT_MINUTES_STUDENT = 5; private final int RETRY_INTERVAL_SECONDS = 1; @@ -31,7 +33,13 @@ public Flux>> subscribeCourseMessages(String if (checkIfConnectionFull()) { return Flux.empty(); } - Sinks.Many> sink = sinks.computeIfAbsent(courseId, k -> Sinks.many().multicast().onBackpressureBuffer()); + Sinks.Many> sink = sinks.computeIfAbsent(courseId, k -> { + if (currentConnections.incrementAndGet() > MAX_SINKS) { + currentConnections.decrementAndGet(); + throw new BaseException(SseErrorCode.CONNECTION_LIMIT_EXCEEDED); + } + return Sinks.many().multicast().onBackpressureBuffer(); + }); courseStudentMap.computeIfAbsent(courseId, k -> ConcurrentHashMap.newKeySet()); MessageResponse initMessage = new MessageResponse<>("CONNECTION_ESTABLISHED", null); ServerSentEvent> initEvent = ServerSentEvent.>builder() @@ -51,7 +59,13 @@ public Flux>> subscribeStudentMessages(String log.debug("courseId와 일치하는 수업을 찾을 수 없습니다."); return Flux.empty(); } - Sinks.Many> sink = sinks.computeIfAbsent(studentId, k -> Sinks.many().multicast().onBackpressureBuffer()); + Sinks.Many> sink = sinks.computeIfAbsent(studentId, k -> { + if (currentConnections.incrementAndGet() > MAX_SINKS) { + currentConnections.decrementAndGet(); + throw new BaseException(SseErrorCode.CONNECTION_LIMIT_EXCEEDED); + } + return Sinks.many().multicast().onBackpressureBuffer(); + }); courseStudentMap.get(courseId).add(studentId); MessageResponse initMessage = new MessageResponse<>("CONNECTION_ESTABLISHED", null); ServerSentEvent> initEvent = ServerSentEvent.>builder() @@ -140,20 +154,24 @@ private void closeStudentConnection(String studentId, String courseId) { closeConnection(sinks.get(studentId), studentId); } - private void closeConnection(Sinks.Many> sink, String courseId) { - sinks.remove(courseId); - sink.tryEmitComplete(); + private void closeConnection(Sinks.Many> sink, String id) { + if (sink != null) { + sink.tryEmitComplete(); + sinks.remove(id); + currentConnections.decrementAndGet(); + } } private boolean checkIfConnectionFull() { - if (sinks.size() >= MAX_SINKS) { - log.debug("SSE 연결 제한을 초과했습니다. : 현재 연결 개수 = {}", sinks.size()); + int connectionCount = currentConnections.get(); + if (connectionCount >= MAX_SINKS) { + log.debug("SSE 연결 제한을 초과했습니다. : 현재 연결 개수 = {}", connectionCount); return true; } return false; } private void showCurrentUsers() { - log.debug("현재 {} 명의 사용자가 연결 중입니다. ", sinks.size()); + log.debug("현재 {} 명의 사용자가 연결 중입니다. ", currentConnections.get()); } } diff --git a/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/global/exception/code/SseErrorCode.java b/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/global/exception/code/SseErrorCode.java index d35984e..21dc477 100644 --- a/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/global/exception/code/SseErrorCode.java +++ b/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/global/exception/code/SseErrorCode.java @@ -8,6 +8,7 @@ @AllArgsConstructor public enum SseErrorCode implements ErrorCode { USER_NOT_FOUND(HttpStatus.NOT_FOUND, "전송 대상을 찾을 수 없습니다."), + CONNECTION_LIMIT_EXCEEDED(HttpStatus.TOO_MANY_REQUESTS, "현재 사용자가 너무 많습니다."), MESSAGE_SEND_FAILURE(HttpStatus.INTERNAL_SERVER_ERROR, "서버 문제로 메시지 전송에 실패했습니다."); private final HttpStatus status; From b8a37ed27a9ea5491efad753110b31d6f221ef81 Mon Sep 17 00:00:00 2001 From: sunohkim Date: Thu, 27 Feb 2025 11:36:26 +0900 Subject: [PATCH 4/5] =?UTF-8?q?Fix:=20AtomicInteger=EB=A5=BC=20=EC=A0=9C?= =?UTF-8?q?=EA=B1=B0=ED=95=98=EA=B3=A0=20ConcurrentHashMap.size()=EB=A1=9C?= =?UTF-8?q?=20=EC=97=B0=EA=B2=B0=20=EA=B0=9C=EC=88=98=20=EA=B4=80=EB=A6=AC?= =?UTF-8?q?=20#251?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../domain/sse/service/SseService.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java b/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java index 0933821..fd011d3 100644 --- a/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java +++ b/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java @@ -15,7 +15,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; @Slf4j @Service @@ -24,7 +23,6 @@ public class SseService { private final Map>> sinks = new ConcurrentHashMap<>(); private final Map> courseStudentMap = new ConcurrentHashMap<>(); private static final int MAX_SINKS = 1500; - private final AtomicInteger currentConnections = new AtomicInteger(0); private final int CONNECTION_TIMEOUT_MINUTES_COURSE = 10; private final int CONNECTION_TIMEOUT_MINUTES_STUDENT = 5; private final int RETRY_INTERVAL_SECONDS = 1; @@ -34,8 +32,7 @@ public Flux>> subscribeCourseMessages(String return Flux.empty(); } Sinks.Many> sink = sinks.computeIfAbsent(courseId, k -> { - if (currentConnections.incrementAndGet() > MAX_SINKS) { - currentConnections.decrementAndGet(); + if (sinks.size() >= MAX_SINKS) { throw new BaseException(SseErrorCode.CONNECTION_LIMIT_EXCEEDED); } return Sinks.many().multicast().onBackpressureBuffer(); @@ -60,8 +57,7 @@ public Flux>> subscribeStudentMessages(String return Flux.empty(); } Sinks.Many> sink = sinks.computeIfAbsent(studentId, k -> { - if (currentConnections.incrementAndGet() > MAX_SINKS) { - currentConnections.decrementAndGet(); + if (sinks.size() >= MAX_SINKS) { throw new BaseException(SseErrorCode.CONNECTION_LIMIT_EXCEEDED); } return Sinks.many().multicast().onBackpressureBuffer(); @@ -158,20 +154,19 @@ private void closeConnection(Sinks.Many> sink, String id) { if (sink != null) { sink.tryEmitComplete(); sinks.remove(id); - currentConnections.decrementAndGet(); } } private boolean checkIfConnectionFull() { - int connectionCount = currentConnections.get(); - if (connectionCount >= MAX_SINKS) { - log.debug("SSE 연결 제한을 초과했습니다. : 현재 연결 개수 = {}", connectionCount); + int currentConnections = sinks.size(); + if (currentConnections >= MAX_SINKS) { + log.debug("SSE 연결 제한을 초과했습니다. : 현재 연결 개수 = {}", currentConnections); return true; } return false; } private void showCurrentUsers() { - log.debug("현재 {} 명의 사용자가 연결 중입니다. ", currentConnections.get()); + log.debug("현재 {} 명의 사용자가 연결 중입니다. ", sinks.size()); } } From 8f06300ccacbb67b9b02c9cc6d6deb8abe59b1e8 Mon Sep 17 00:00:00 2001 From: sunohkim Date: Thu, 27 Feb 2025 11:41:44 +0900 Subject: [PATCH 5/5] =?UTF-8?q?Revert=20"Fix:=20AtomicInteger=EB=A5=BC=20?= =?UTF-8?q?=EC=A0=9C=EA=B1=B0=ED=95=98=EA=B3=A0=20ConcurrentHashMap.size()?= =?UTF-8?q?=EB=A1=9C=20=EC=97=B0=EA=B2=B0=20=EA=B0=9C=EC=88=98=20=EA=B4=80?= =?UTF-8?q?=EB=A6=AC=20#251"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit b8a37ed27a9ea5491efad753110b31d6f221ef81. --- .../domain/sse/service/SseService.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java b/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java index fd011d3..0933821 100644 --- a/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java +++ b/back-end/reacton-classroom/src/main/java/com/softeer/reacton_classroom/domain/sse/service/SseService.java @@ -15,6 +15,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; @Slf4j @Service @@ -23,6 +24,7 @@ public class SseService { private final Map>> sinks = new ConcurrentHashMap<>(); private final Map> courseStudentMap = new ConcurrentHashMap<>(); private static final int MAX_SINKS = 1500; + private final AtomicInteger currentConnections = new AtomicInteger(0); private final int CONNECTION_TIMEOUT_MINUTES_COURSE = 10; private final int CONNECTION_TIMEOUT_MINUTES_STUDENT = 5; private final int RETRY_INTERVAL_SECONDS = 1; @@ -32,7 +34,8 @@ public Flux>> subscribeCourseMessages(String return Flux.empty(); } Sinks.Many> sink = sinks.computeIfAbsent(courseId, k -> { - if (sinks.size() >= MAX_SINKS) { + if (currentConnections.incrementAndGet() > MAX_SINKS) { + currentConnections.decrementAndGet(); throw new BaseException(SseErrorCode.CONNECTION_LIMIT_EXCEEDED); } return Sinks.many().multicast().onBackpressureBuffer(); @@ -57,7 +60,8 @@ public Flux>> subscribeStudentMessages(String return Flux.empty(); } Sinks.Many> sink = sinks.computeIfAbsent(studentId, k -> { - if (sinks.size() >= MAX_SINKS) { + if (currentConnections.incrementAndGet() > MAX_SINKS) { + currentConnections.decrementAndGet(); throw new BaseException(SseErrorCode.CONNECTION_LIMIT_EXCEEDED); } return Sinks.many().multicast().onBackpressureBuffer(); @@ -154,19 +158,20 @@ private void closeConnection(Sinks.Many> sink, String id) { if (sink != null) { sink.tryEmitComplete(); sinks.remove(id); + currentConnections.decrementAndGet(); } } private boolean checkIfConnectionFull() { - int currentConnections = sinks.size(); - if (currentConnections >= MAX_SINKS) { - log.debug("SSE 연결 제한을 초과했습니다. : 현재 연결 개수 = {}", currentConnections); + int connectionCount = currentConnections.get(); + if (connectionCount >= MAX_SINKS) { + log.debug("SSE 연결 제한을 초과했습니다. : 현재 연결 개수 = {}", connectionCount); return true; } return false; } private void showCurrentUsers() { - log.debug("현재 {} 명의 사용자가 연결 중입니다. ", sinks.size()); + log.debug("현재 {} 명의 사용자가 연결 중입니다. ", currentConnections.get()); } }