diff --git a/src/main/java/com/haru/api/domain/moodTracker/service/MoodTrackerCommandServiceImpl.java b/src/main/java/com/haru/api/domain/moodTracker/service/MoodTrackerCommandServiceImpl.java index fc9b30af..72a79965 100644 --- a/src/main/java/com/haru/api/domain/moodTracker/service/MoodTrackerCommandServiceImpl.java +++ b/src/main/java/com/haru/api/domain/moodTracker/service/MoodTrackerCommandServiceImpl.java @@ -146,6 +146,9 @@ public void delete( User user, MoodTracker moodTracker ) { + // redis queue에서 비워줘서 중복 처리 제외 + redisReportConsumer.removeFromQueue(moodTracker.getId()); + UserWorkspace foundUserWorkspace = userWorkspaceRepository.findByWorkspaceIdAndUserId(moodTracker.getWorkspace().getId(), user.getId()) .orElseThrow(() -> new UserWorkspaceHandler(ErrorStatus.USER_WORKSPACE_NOT_FOUND)); diff --git a/src/main/java/com/haru/api/infra/redis/RedisReportConsumer.java b/src/main/java/com/haru/api/infra/redis/RedisReportConsumer.java index 30a5a61d..7d3c2630 100644 --- a/src/main/java/com/haru/api/infra/redis/RedisReportConsumer.java +++ b/src/main/java/com/haru/api/infra/redis/RedisReportConsumer.java @@ -25,22 +25,21 @@ public class RedisReportConsumer { private static final long BATCH_SIZE = 20; @Transactional - @Scheduled(cron = "0 0/30 * * * *") // 매시 0분, 30분 실행 + @Scheduled(cron = "0 0/5 * * * *") // 정각부터 5분 마다 실행 public void pollQueueEvery30Minutes() { long now = Instant.now().getEpochSecond(); - Set dueIds = redisTemplate.opsForZSet() - .rangeByScore(QUEUE_KEY, 0, now, 0, BATCH_SIZE); + while (true) { + Set dueIds = redisTemplate.opsForZSet() + .rangeByScore(QUEUE_KEY, 0, now, 0, BATCH_SIZE); - if (dueIds == null || dueIds.isEmpty()) return; + if (dueIds == null || dueIds.isEmpty()) break; - for (String moodTrackerId : dueIds) { - try { - // PDF, DOCX파일 바이트 배열로 생성 및 썸네일 생성 & 업로드 / DB에 keyName저장 - moodTrackerReportService.generateAndUploadReportFileAndThumbnail(Long.parseLong(moodTrackerId)); - redisTemplate.opsForZSet().remove(QUEUE_KEY, moodTrackerId); - } catch (Exception e) { - log.error("GPT 리포트 생성 실패: {}", moodTrackerId, e); + for (String id : dueIds) { + // Worker Queue로 push + redisTemplate.opsForList().leftPush("report-worker-queue", id); + // ZSET에서는 제거 + redisTemplate.opsForZSet().remove(QUEUE_KEY, id); } } } diff --git a/src/main/java/com/haru/api/infra/redis/ReportWorker.java b/src/main/java/com/haru/api/infra/redis/ReportWorker.java new file mode 100644 index 00000000..b20e5bf1 --- /dev/null +++ b/src/main/java/com/haru/api/infra/redis/ReportWorker.java @@ -0,0 +1,51 @@ +package com.haru.api.infra.redis; + +import com.haru.api.domain.moodTracker.service.MoodTrackerReportService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Component +@RequiredArgsConstructor +@Slf4j +public class ReportWorker { + + private final StringRedisTemplate redisTemplate; + private final MoodTrackerReportService reportService; + private final ExecutorService executor = Executors.newFixedThreadPool(5); // 5개 병렬 Worker + + private static final String WORKER_QUEUE = "report-worker-queue"; + + @Scheduled(fixedDelay = 2000) // 2초마다 큐 확인 + public void consumeTasks() { + String task = redisTemplate.opsForList().rightPop(WORKER_QUEUE); + if (task != null) { + Long moodTrackerId = Long.parseLong(task); + executor.submit(() -> process(moodTrackerId)); + } + } + + private void process(Long moodTrackerId) { + try { + reportService.generateAndUploadReportFileAndThumbnail(moodTrackerId); + log.info("Report 생성 성공: {}", moodTrackerId); + } catch (Exception e) { + log.error("Report 생성 실패 (재시도 예정): {}", moodTrackerId, e); + + String key = "retry-count:" + moodTrackerId; + Long retry = redisTemplate.opsForValue().increment(key); + + if (retry != null && retry <= 3) { + redisTemplate.opsForList().leftPush(WORKER_QUEUE, moodTrackerId.toString()); + } else { + log.error("재시도 한계 초과, 실패 큐로 이동: {}", moodTrackerId); + redisTemplate.opsForList().leftPush("report-failed-queue", moodTrackerId.toString()); + } + } + } +}