From 2ad8db58d155dbd240ea1b2fe3757d18193001fa Mon Sep 17 00:00:00 2001 From: 2ghrms Date: Fri, 22 Aug 2025 00:41:12 +0900 Subject: [PATCH] =?UTF-8?q?refactor/#336:=20=EC=84=A4=EB=AC=B8=20=EB=A7=88?= =?UTF-8?q?=EA=B0=90=EC=9D=BC=EC=8B=9C=EC=97=90=20=EC=8B=A4=ED=8C=A8?= =?UTF-8?q?=EC=8B=9C=20retry=20=EC=A0=84=EB=9E=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 워커 처리 제외하도록 반영 - 재시도 로직 추가 - 5분 간격으로 시도하도록 변경 --- .../MoodTrackerCommandServiceImpl.java | 3 ++ .../api/infra/redis/RedisReportConsumer.java | 21 ++++---- .../haru/api/infra/redis/ReportWorker.java | 51 +++++++++++++++++++ 3 files changed, 64 insertions(+), 11 deletions(-) create mode 100644 src/main/java/com/haru/api/infra/redis/ReportWorker.java diff --git a/src/main/java/com/haru/api/domain/moodTracker/service/MoodTrackerCommandServiceImpl.java b/src/main/java/com/haru/api/domain/moodTracker/service/MoodTrackerCommandServiceImpl.java index fc9b30af..72a79965 100644 --- a/src/main/java/com/haru/api/domain/moodTracker/service/MoodTrackerCommandServiceImpl.java +++ b/src/main/java/com/haru/api/domain/moodTracker/service/MoodTrackerCommandServiceImpl.java @@ -146,6 +146,9 @@ public void delete( User user, MoodTracker moodTracker ) { + // redis queue에서 비워줘서 중복 처리 제외 + redisReportConsumer.removeFromQueue(moodTracker.getId()); + UserWorkspace foundUserWorkspace = userWorkspaceRepository.findByWorkspaceIdAndUserId(moodTracker.getWorkspace().getId(), user.getId()) .orElseThrow(() -> new UserWorkspaceHandler(ErrorStatus.USER_WORKSPACE_NOT_FOUND)); diff --git a/src/main/java/com/haru/api/infra/redis/RedisReportConsumer.java b/src/main/java/com/haru/api/infra/redis/RedisReportConsumer.java index 30a5a61d..7d3c2630 100644 --- a/src/main/java/com/haru/api/infra/redis/RedisReportConsumer.java +++ b/src/main/java/com/haru/api/infra/redis/RedisReportConsumer.java @@ -25,22 +25,21 @@ public class RedisReportConsumer { private static final long BATCH_SIZE = 20; @Transactional - @Scheduled(cron = "0 0/30 * * * *") // 매시 0분, 30분 실행 + @Scheduled(cron = "0 0/5 * * * *") // 정각부터 5분 마다 실행 public void pollQueueEvery30Minutes() { long now = Instant.now().getEpochSecond(); - Set dueIds = redisTemplate.opsForZSet() - .rangeByScore(QUEUE_KEY, 0, now, 0, BATCH_SIZE); + while (true) { + Set dueIds = redisTemplate.opsForZSet() + .rangeByScore(QUEUE_KEY, 0, now, 0, BATCH_SIZE); - if (dueIds == null || dueIds.isEmpty()) return; + if (dueIds == null || dueIds.isEmpty()) break; - for (String moodTrackerId : dueIds) { - try { - // PDF, DOCX파일 바이트 배열로 생성 및 썸네일 생성 & 업로드 / DB에 keyName저장 - moodTrackerReportService.generateAndUploadReportFileAndThumbnail(Long.parseLong(moodTrackerId)); - redisTemplate.opsForZSet().remove(QUEUE_KEY, moodTrackerId); - } catch (Exception e) { - log.error("GPT 리포트 생성 실패: {}", moodTrackerId, e); + for (String id : dueIds) { + // Worker Queue로 push + redisTemplate.opsForList().leftPush("report-worker-queue", id); + // ZSET에서는 제거 + redisTemplate.opsForZSet().remove(QUEUE_KEY, id); } } } diff --git a/src/main/java/com/haru/api/infra/redis/ReportWorker.java b/src/main/java/com/haru/api/infra/redis/ReportWorker.java new file mode 100644 index 00000000..b20e5bf1 --- /dev/null +++ b/src/main/java/com/haru/api/infra/redis/ReportWorker.java @@ -0,0 +1,51 @@ +package com.haru.api.infra.redis; + +import com.haru.api.domain.moodTracker.service.MoodTrackerReportService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Component +@RequiredArgsConstructor +@Slf4j +public class ReportWorker { + + private final StringRedisTemplate redisTemplate; + private final MoodTrackerReportService reportService; + private final ExecutorService executor = Executors.newFixedThreadPool(5); // 5개 병렬 Worker + + private static final String WORKER_QUEUE = "report-worker-queue"; + + @Scheduled(fixedDelay = 2000) // 2초마다 큐 확인 + public void consumeTasks() { + String task = redisTemplate.opsForList().rightPop(WORKER_QUEUE); + if (task != null) { + Long moodTrackerId = Long.parseLong(task); + executor.submit(() -> process(moodTrackerId)); + } + } + + private void process(Long moodTrackerId) { + try { + reportService.generateAndUploadReportFileAndThumbnail(moodTrackerId); + log.info("Report 생성 성공: {}", moodTrackerId); + } catch (Exception e) { + log.error("Report 생성 실패 (재시도 예정): {}", moodTrackerId, e); + + String key = "retry-count:" + moodTrackerId; + Long retry = redisTemplate.opsForValue().increment(key); + + if (retry != null && retry <= 3) { + redisTemplate.opsForList().leftPush(WORKER_QUEUE, moodTrackerId.toString()); + } else { + log.error("재시도 한계 초과, 실패 큐로 이동: {}", moodTrackerId); + redisTemplate.opsForList().leftPush("report-failed-queue", moodTrackerId.toString()); + } + } + } +}