Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,31 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Service
public class SseService {

private final Map<String, Sinks.Many<MessageResponse<?>>> sinks = new ConcurrentHashMap<>();
private final Map<String, Set<String>> courseStudentMap = new ConcurrentHashMap<>();
private static final int MAX_SINKS = 1500;
private final AtomicInteger currentConnections = new AtomicInteger(0);
private final int CONNECTION_TIMEOUT_MINUTES_COURSE = 10;
private final int CONNECTION_TIMEOUT_MINUTES_STUDENT = 5;
private final int RETRY_INTERVAL_SECONDS = 1;

public Flux<ServerSentEvent<MessageResponse<?>>> subscribeCourseMessages(String courseId) {
Sinks.Many<MessageResponse<?>> sink = sinks.computeIfAbsent(courseId, k -> Sinks.many().multicast().onBackpressureBuffer());
if (checkIfConnectionFull()) {
return Flux.empty();
}
Sinks.Many<MessageResponse<?>> sink = sinks.computeIfAbsent(courseId, k -> {
if (currentConnections.incrementAndGet() > MAX_SINKS) {
currentConnections.decrementAndGet();
throw new BaseException(SseErrorCode.CONNECTION_LIMIT_EXCEEDED);
}
return Sinks.many().multicast().onBackpressureBuffer();
});
courseStudentMap.computeIfAbsent(courseId, k -> ConcurrentHashMap.newKeySet());
MessageResponse<?> initMessage = new MessageResponse<>("CONNECTION_ESTABLISHED", null);
ServerSentEvent<MessageResponse<?>> initEvent = ServerSentEvent.<MessageResponse<?>>builder()
Expand All @@ -40,11 +52,20 @@ public Flux<ServerSentEvent<MessageResponse<?>>> subscribeCourseMessages(String
}

public Flux<ServerSentEvent<MessageResponse<?>>> subscribeStudentMessages(String studentId, String courseId) {
if (checkIfConnectionFull()) {
return Flux.empty();
}
if (!courseStudentMap.containsKey(courseId)) {
log.debug("courseId와 일치하는 수업을 찾을 수 없습니다.");
return Flux.empty();
}
Sinks.Many<MessageResponse<?>> sink = sinks.computeIfAbsent(studentId, k -> Sinks.many().multicast().onBackpressureBuffer());
Sinks.Many<MessageResponse<?>> sink = sinks.computeIfAbsent(studentId, k -> {
if (currentConnections.incrementAndGet() > MAX_SINKS) {
currentConnections.decrementAndGet();
throw new BaseException(SseErrorCode.CONNECTION_LIMIT_EXCEEDED);
}
return Sinks.many().multicast().onBackpressureBuffer();
});
courseStudentMap.get(courseId).add(studentId);
MessageResponse<?> initMessage = new MessageResponse<>("CONNECTION_ESTABLISHED", null);
ServerSentEvent<MessageResponse<?>> initEvent = ServerSentEvent.<MessageResponse<?>>builder()
Expand Down Expand Up @@ -78,13 +99,14 @@ public void sendMessageToStudents(String courseId, MessageResponse<?> message) {
}

private Flux<ServerSentEvent<MessageResponse<?>>> openCourseConnection(Sinks.Many<MessageResponse<?>> sink, ServerSentEvent<MessageResponse<?>> initEvent, String courseId) {
showCurrentUsers();
return Flux.concat(Mono.just(initEvent),
sink.asFlux()
.map(data -> ServerSentEvent.<MessageResponse<?>>builder(data).build())
)
.timeout(Duration.ofMinutes(CONNECTION_TIMEOUT_MINUTES_COURSE))
.onErrorResume(TimeoutException.class, e -> {
log.warn("SSE 연결 제한 시간이 초과되었습니다. : courseId={}", courseId);
log.warn("SSE 연결 제한 시간이 초과되었습니다. : courseId = {}", courseId);
closeConnection(sink, courseId);
return Flux.empty();
})
Expand All @@ -95,13 +117,14 @@ private Flux<ServerSentEvent<MessageResponse<?>>> openCourseConnection(Sinks.Man
}

private Flux<ServerSentEvent<MessageResponse<?>>> openStudentConnection(Sinks.Many<MessageResponse<?>> sink, ServerSentEvent<MessageResponse<?>> initEvent, String studentId, String courseId) {
showCurrentUsers();
return Flux.concat(Mono.just(initEvent),
sink.asFlux()
.map(data -> ServerSentEvent.<MessageResponse<?>>builder(data).build())
)
.timeout(Duration.ofMinutes(CONNECTION_TIMEOUT_MINUTES_STUDENT))
.onErrorResume(TimeoutException.class, e -> {
log.warn("학생 SSE 연결 제한 시간이 초과되었습니다. : studentId={}", studentId);
log.warn("학생 SSE 연결 제한 시간이 초과되었습니다. : studentId = {}", studentId);
closeConnection(sink, studentId);
return Flux.empty();
})
Expand Down Expand Up @@ -131,8 +154,24 @@ private void closeStudentConnection(String studentId, String courseId) {
closeConnection(sinks.get(studentId), studentId);
}

private void closeConnection(Sinks.Many<MessageResponse<?>> sink, String courseId) {
sinks.remove(courseId);
sink.tryEmitComplete();
private void closeConnection(Sinks.Many<MessageResponse<?>> sink, String id) {
if (sink != null) {
sink.tryEmitComplete();
sinks.remove(id);
currentConnections.decrementAndGet();
}
}

private boolean checkIfConnectionFull() {
int connectionCount = currentConnections.get();
if (connectionCount >= MAX_SINKS) {
log.debug("SSE 연결 제한을 초과했습니다. : 현재 연결 개수 = {}", connectionCount);
return true;
}
return false;
}

private void showCurrentUsers() {
log.debug("현재 {} 명의 사용자가 연결 중입니다. ", currentConnections.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
@AllArgsConstructor
public enum SseErrorCode implements ErrorCode {
USER_NOT_FOUND(HttpStatus.NOT_FOUND, "전송 대상을 찾을 수 없습니다."),
CONNECTION_LIMIT_EXCEEDED(HttpStatus.TOO_MANY_REQUESTS, "현재 사용자가 너무 많습니다."),
MESSAGE_SEND_FAILURE(HttpStatus.INTERNAL_SERVER_ERROR, "서버 문제로 메시지 전송에 실패했습니다.");

private final HttpStatus status;
Expand Down