Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ public void delete(
User user,
MoodTracker moodTracker
) {
// redis queueμ—μ„œ λΉ„μ›Œμ€˜μ„œ 쀑볡 처리 μ œμ™Έ
redisReportConsumer.removeFromQueue(moodTracker.getId());

UserWorkspace foundUserWorkspace = userWorkspaceRepository.findByWorkspaceIdAndUserId(moodTracker.getWorkspace().getId(), user.getId())
.orElseThrow(() -> new UserWorkspaceHandler(ErrorStatus.USER_WORKSPACE_NOT_FOUND));

Expand Down
21 changes: 10 additions & 11 deletions src/main/java/com/haru/api/infra/redis/RedisReportConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,21 @@ public class RedisReportConsumer {
private static final long BATCH_SIZE = 20;

@Transactional
@Scheduled(cron = "0 0/30 * * * *") // λ§€μ‹œ 0λΆ„, 30λΆ„ μ‹€ν–‰
@Scheduled(cron = "0 0/5 * * * *") // 정각뢀터 5λΆ„ λ§ˆλ‹€ μ‹€ν–‰
public void pollQueueEvery30Minutes() {
long now = Instant.now().getEpochSecond();

Set<String> dueIds = redisTemplate.opsForZSet()
.rangeByScore(QUEUE_KEY, 0, now, 0, BATCH_SIZE);
while (true) {
Set<String> dueIds = redisTemplate.opsForZSet()
.rangeByScore(QUEUE_KEY, 0, now, 0, BATCH_SIZE);

if (dueIds == null || dueIds.isEmpty()) return;
if (dueIds == null || dueIds.isEmpty()) break;

for (String moodTrackerId : dueIds) {
try {
// PDF, DOCX파일 λ°”μ΄νŠΈ λ°°μ—΄λ‘œ 생성 및 썸넀일 생성 & μ—…λ‘œλ“œ / DB에 keyNameμ €μž₯
moodTrackerReportService.generateAndUploadReportFileAndThumbnail(Long.parseLong(moodTrackerId));
redisTemplate.opsForZSet().remove(QUEUE_KEY, moodTrackerId);
} catch (Exception e) {
log.error("GPT 리포트 생성 μ‹€νŒ¨: {}", moodTrackerId, e);
for (String id : dueIds) {
// Worker Queue둜 push
redisTemplate.opsForList().leftPush("report-worker-queue", id);
// ZSETμ—μ„œλŠ” 제거
redisTemplate.opsForZSet().remove(QUEUE_KEY, id);
}
}
}
Expand Down
51 changes: 51 additions & 0 deletions src/main/java/com/haru/api/infra/redis/ReportWorker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.haru.api.infra.redis;

import com.haru.api.domain.moodTracker.service.MoodTrackerReportService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Component
@RequiredArgsConstructor
@Slf4j
public class ReportWorker {

private final StringRedisTemplate redisTemplate;
private final MoodTrackerReportService reportService;
private final ExecutorService executor = Executors.newFixedThreadPool(5); // 5개 병렬 Worker

private static final String WORKER_QUEUE = "report-worker-queue";

@Scheduled(fixedDelay = 2000) // 2μ΄ˆλ§ˆλ‹€ 큐 확인
public void consumeTasks() {
String task = redisTemplate.opsForList().rightPop(WORKER_QUEUE);
if (task != null) {
Long moodTrackerId = Long.parseLong(task);
executor.submit(() -> process(moodTrackerId));
}
}

private void process(Long moodTrackerId) {
try {
reportService.generateAndUploadReportFileAndThumbnail(moodTrackerId);
log.info("Report 생성 성곡: {}", moodTrackerId);
} catch (Exception e) {
log.error("Report 생성 μ‹€νŒ¨ (μž¬μ‹œλ„ μ˜ˆμ •): {}", moodTrackerId, e);

String key = "retry-count:" + moodTrackerId;
Long retry = redisTemplate.opsForValue().increment(key);

if (retry != null && retry <= 3) {
redisTemplate.opsForList().leftPush(WORKER_QUEUE, moodTrackerId.toString());
} else {
log.error("μž¬μ‹œλ„ ν•œκ³„ 초과, μ‹€νŒ¨ 큐둜 이동: {}", moodTrackerId);
redisTemplate.opsForList().leftPush("report-failed-queue", moodTrackerId.toString());
}
}
}
}