From 06b75b6a64a016c18684b15d88248cbed2f1f42b Mon Sep 17 00:00:00 2001 From: Kim Taemin Date: Thu, 6 Nov 2025 21:32:55 +0900 Subject: [PATCH] =?UTF-8?q?[FEAT]=20=EC=9E=90=EC=9E=AC=20=EC=A3=BC?= =?UTF-8?q?=EB=AC=B8=20=EB=AA=A9=EB=A1=9D=20=EC=9D=B4=EB=B2=A4=ED=8A=B8=20?= =?UTF-8?q?=EB=B0=9C=ED=96=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 4 + .../sampoom/purchase/PurchaseApplication.java | 2 + .../controller/PurchaseController.java | 7 + .../purchase/dto/PurchaseOrderItemDto.java | 2 + .../purchase/dto/PurchaseOrderRequestDto.java | 5 +- .../dto/PurchaseOrderResponseDto.java | 7 +- .../api/purchase/entity/PurchaseOrder.java | 8 +- .../purchase/entity/PurchaseOrderItem.java | 2 + .../api/purchase/service/PurchaseService.java | 131 +++++++++++++++++- .../common/config/swagger/SwaggerConfig.java | 2 +- .../purchase/common/event/OutboxStatus.java | 8 ++ .../purchase/common/event/PurchaseEvent.java | 29 ++++ .../common/event/PurchaseEventService.java | 92 ++++++++++++ .../purchase/common/event/PurchaseOutbox.java | 96 +++++++++++++ .../common/event/PurchaseOutboxPublisher.java | 75 ++++++++++ .../event/PurchaseOutboxRepository.java | 19 +++ 16 files changed, 475 insertions(+), 14 deletions(-) create mode 100644 src/main/java/com/sampoom/purchase/common/event/OutboxStatus.java create mode 100644 src/main/java/com/sampoom/purchase/common/event/PurchaseEvent.java create mode 100644 src/main/java/com/sampoom/purchase/common/event/PurchaseEventService.java create mode 100644 src/main/java/com/sampoom/purchase/common/event/PurchaseOutbox.java create mode 100644 src/main/java/com/sampoom/purchase/common/event/PurchaseOutboxPublisher.java create mode 100644 src/main/java/com/sampoom/purchase/common/event/PurchaseOutboxRepository.java diff --git a/build.gradle b/build.gradle index 5d96016..9fc996e 100644 --- a/build.gradle +++ b/build.gradle @@ -35,6 +35,10 @@ dependencies { //Swagger implementation 'org.springdoc:springdoc-openapi-starter-webmvc-ui:2.7.0' + + + implementation 'org.springframework.kafka:spring-kafka' + implementation 'com.fasterxml.jackson.core:jackson-databind' } tasks.named('test') { diff --git a/src/main/java/com/sampoom/purchase/PurchaseApplication.java b/src/main/java/com/sampoom/purchase/PurchaseApplication.java index 4d5c62d..d5ea182 100644 --- a/src/main/java/com/sampoom/purchase/PurchaseApplication.java +++ b/src/main/java/com/sampoom/purchase/PurchaseApplication.java @@ -2,8 +2,10 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication +@EnableScheduling public class PurchaseApplication { public static void main(String[] args) { diff --git a/src/main/java/com/sampoom/purchase/api/purchase/controller/PurchaseController.java b/src/main/java/com/sampoom/purchase/api/purchase/controller/PurchaseController.java index 52735a3..be56ca0 100644 --- a/src/main/java/com/sampoom/purchase/api/purchase/controller/PurchaseController.java +++ b/src/main/java/com/sampoom/purchase/api/purchase/controller/PurchaseController.java @@ -38,6 +38,13 @@ public ResponseEntity> cancelOrder( return ApiResponse.success(SuccessStatus.OK, purchaseService.cancelOrder(orderId)); } + @Operation(summary = "자재 주문 입고 처리", description = "주문된 자재를 입고 처리합니다.") + @PatchMapping("/{orderId}/receive") + public ResponseEntity> receiveOrder( + @PathVariable Long orderId) { + return ApiResponse.success(SuccessStatus.OK, purchaseService.receiveOrder(orderId)); + } + @Operation(summary = "자재 주문 삭제", description = "주문을 삭제합니다(소프트 삭제).") @DeleteMapping("/{orderId}") public ResponseEntity> deleteOrder( diff --git a/src/main/java/com/sampoom/purchase/api/purchase/dto/PurchaseOrderItemDto.java b/src/main/java/com/sampoom/purchase/api/purchase/dto/PurchaseOrderItemDto.java index afb30c0..ea49aa7 100644 --- a/src/main/java/com/sampoom/purchase/api/purchase/dto/PurchaseOrderItemDto.java +++ b/src/main/java/com/sampoom/purchase/api/purchase/dto/PurchaseOrderItemDto.java @@ -18,6 +18,7 @@ public class PurchaseOrderItemDto { private String unit; private Long quantity; private BigDecimal unitPrice; + private Integer leadTimeDays; // 자재 리드타임 (일 단위) public static PurchaseOrderItemDto from(PurchaseOrderItem item) { return PurchaseOrderItemDto.builder() @@ -26,6 +27,7 @@ public static PurchaseOrderItemDto from(PurchaseOrderItem item) { .unit(item.getUnit()) .quantity(item.getQuantity()) .unitPrice(item.getUnitPrice()) + .leadTimeDays(item.getLeadTimeDays()) .build(); } } diff --git a/src/main/java/com/sampoom/purchase/api/purchase/dto/PurchaseOrderRequestDto.java b/src/main/java/com/sampoom/purchase/api/purchase/dto/PurchaseOrderRequestDto.java index 727d004..9703f42 100644 --- a/src/main/java/com/sampoom/purchase/api/purchase/dto/PurchaseOrderRequestDto.java +++ b/src/main/java/com/sampoom/purchase/api/purchase/dto/PurchaseOrderRequestDto.java @@ -7,7 +7,7 @@ import lombok.NoArgsConstructor; import org.springframework.format.annotation.DateTimeFormat; -import java.time.LocalDate; +import java.time.LocalDateTime; import java.util.List; @Getter @@ -18,8 +18,7 @@ public class PurchaseOrderRequestDto { private Long factoryId; private String factoryName; - @JsonFormat(pattern = "yyyy-MM-dd") - private LocalDate requiredAt; + private LocalDateTime requiredAt; private String requesterName; // 요청자 이름 추가 diff --git a/src/main/java/com/sampoom/purchase/api/purchase/dto/PurchaseOrderResponseDto.java b/src/main/java/com/sampoom/purchase/api/purchase/dto/PurchaseOrderResponseDto.java index 753730f..f04a95c 100644 --- a/src/main/java/com/sampoom/purchase/api/purchase/dto/PurchaseOrderResponseDto.java +++ b/src/main/java/com/sampoom/purchase/api/purchase/dto/PurchaseOrderResponseDto.java @@ -1,6 +1,5 @@ package com.sampoom.purchase.api.purchase.dto; -import com.fasterxml.jackson.annotation.JsonFormat; import com.sampoom.purchase.api.purchase.entity.OrderStatus; import com.sampoom.purchase.api.purchase.entity.PurchaseOrder; import com.sampoom.purchase.api.purchase.entity.PurchaseOrderItem; @@ -25,8 +24,9 @@ public class PurchaseOrderResponseDto { private String orderCode; private LocalDateTime orderAt; - @JsonFormat(pattern = "yyyy-MM-dd") - private LocalDate requiredAt; // 날짜만 + private LocalDateTime requiredAt; + + private LocalDateTime expectedDeliveryAt; private Long factoryId; private String factoryName; @@ -43,6 +43,7 @@ public static PurchaseOrderResponseDto from(PurchaseOrder order, List items; + public void receive() { if (this.status != OrderStatus.ORDERED) { throw new BadRequestException(ErrorStatus.ORDER_ALREADY_PROCESSED); diff --git a/src/main/java/com/sampoom/purchase/api/purchase/entity/PurchaseOrderItem.java b/src/main/java/com/sampoom/purchase/api/purchase/entity/PurchaseOrderItem.java index e81be9f..9031da8 100644 --- a/src/main/java/com/sampoom/purchase/api/purchase/entity/PurchaseOrderItem.java +++ b/src/main/java/com/sampoom/purchase/api/purchase/entity/PurchaseOrderItem.java @@ -31,4 +31,6 @@ public class PurchaseOrderItem { @Column(precision = 19, scale = 2) private BigDecimal unitPrice; + + private Integer leadTimeDays; // 자재 리드타임 (일 단위) } diff --git a/src/main/java/com/sampoom/purchase/api/purchase/service/PurchaseService.java b/src/main/java/com/sampoom/purchase/api/purchase/service/PurchaseService.java index d32f263..934f23b 100644 --- a/src/main/java/com/sampoom/purchase/api/purchase/service/PurchaseService.java +++ b/src/main/java/com/sampoom/purchase/api/purchase/service/PurchaseService.java @@ -5,6 +5,7 @@ import com.sampoom.purchase.api.purchase.entity.*; import com.sampoom.purchase.api.purchase.repository.PurchaseOrderItemRepository; import com.sampoom.purchase.api.purchase.repository.PurchaseOrderRepository; +import com.sampoom.purchase.common.event.PurchaseEventService; import com.sampoom.purchase.common.exception.NotFoundException; import com.sampoom.purchase.common.response.ErrorStatus; import com.sampoom.purchase.common.response.PageResponseDto; @@ -30,6 +31,7 @@ public class PurchaseService { private final PurchaseOrderRepository orderRepository; private final PurchaseOrderItemRepository orderItemRepository; + private final PurchaseEventService purchaseEventService; @Transactional public PurchaseOrderResponseDto createMaterialOrder(PurchaseOrderRequestDto requestDto) { @@ -43,6 +45,16 @@ public PurchaseOrderResponseDto createMaterialOrder(PurchaseOrderRequestDto requ // 긴급도 계산: 오늘과 필요일 차이 기준 UrgencyLevel urgency = calculateUrgency(requestDto.getRequiredAt()); + // 최대 리드타임 계산: 자재들 중 가장 긴 리드타임 + Integer maxLeadTime = requestDto.getItems() == null ? 0 : + requestDto.getItems().stream() + .mapToInt(item -> item.getLeadTimeDays() == null ? 0 : item.getLeadTimeDays()) + .max() + .orElse(0); + + // 예정일 계산: 주문일 + 최대 리드타임 + LocalDateTime expectedDeliveryAt = LocalDateTime.now().plusDays(maxLeadTime); + PurchaseOrder order = PurchaseOrder.builder() .code(generateOrderCode()) .factoryId(requestDto.getFactoryId()) @@ -50,6 +62,7 @@ public PurchaseOrderResponseDto createMaterialOrder(PurchaseOrderRequestDto requ .orderAt(LocalDateTime.now()) .factoryName(requestDto.getFactoryName()) .requiredAt(requestDto.getRequiredAt()) + .expectedDeliveryAt(expectedDeliveryAt) .requesterName(requestDto.getRequesterName()) .expectedAmount(expectedAmount) .urgency(urgency) @@ -57,25 +70,50 @@ public PurchaseOrderResponseDto createMaterialOrder(PurchaseOrderRequestDto requ orderRepository.save(order); + // lambda에서 사용하기 위해 final 변수로 복사 + final PurchaseOrder savedOrder = order; + List orderItems = requestDto.getItems().stream() .map(itemDto -> PurchaseOrderItem.builder() - .purchaseOrder(order) + .purchaseOrder(savedOrder) .materialCode(itemDto.getMaterialCode()) .materialName(itemDto.getMaterialName()) .unit(itemDto.getUnit()) .quantity(itemDto.getQuantity()) .unitPrice(itemDto.getUnitPrice()) + .leadTimeDays(itemDto.getLeadTimeDays()) .build()) .collect(Collectors.toList()); orderItemRepository.saveAll(orderItems); - return PurchaseOrderResponseDto.from(order, orderItems); + // order에 items 설정 (이벤트에서 사용하기 위해) + PurchaseOrder orderWithItems = PurchaseOrder.builder() + .id(savedOrder.getId()) + .code(savedOrder.getCode()) + .factoryId(savedOrder.getFactoryId()) + .factoryName(savedOrder.getFactoryName()) + .status(savedOrder.getStatus()) + .orderAt(savedOrder.getOrderAt()) + .receivedAt(savedOrder.getReceivedAt()) + .canceledAt(savedOrder.getCanceledAt()) + .requiredAt(savedOrder.getRequiredAt()) + .expectedDeliveryAt(savedOrder.getExpectedDeliveryAt()) + .requesterName(savedOrder.getRequesterName()) + .expectedAmount(savedOrder.getExpectedAmount()) + .urgency(savedOrder.getUrgency()) + .items(orderItems) + .build(); + + // 주문 생성 이벤트 발행 + purchaseEventService.recordOrderCreated(orderWithItems); + + return PurchaseOrderResponseDto.from(savedOrder, orderItems); } - private UrgencyLevel calculateUrgency(LocalDate requiredAt) { + private UrgencyLevel calculateUrgency(LocalDateTime requiredAt) { if (requiredAt == null) return UrgencyLevel.LOW; - long days = ChronoUnit.DAYS.between(LocalDate.now(), requiredAt); + long days = ChronoUnit.DAYS.between(LocalDateTime.now(), requiredAt); if (days <= 1) return UrgencyLevel.HIGH; if (days <= 3) return UrgencyLevel.MEDIUM; return UrgencyLevel.LOW; @@ -119,7 +157,31 @@ public PurchaseOrderResponseDto cancelOrder(Long orderId) { .orElseThrow(() -> new NotFoundException(ErrorStatus.ORDER_NOT_FOUND)); order.cancel(); orderRepository.save(order); + + // items 로드하여 이벤트에 포함 List items = orderItemRepository.findByPurchaseOrderId(orderId); + + // order에 items 설정 + order = PurchaseOrder.builder() + .id(order.getId()) + .code(order.getCode()) + .factoryId(order.getFactoryId()) + .factoryName(order.getFactoryName()) + .status(order.getStatus()) + .orderAt(order.getOrderAt()) + .receivedAt(order.getReceivedAt()) + .canceledAt(order.getCanceledAt()) + .requiredAt(order.getRequiredAt()) + .expectedDeliveryAt(order.getExpectedDeliveryAt()) + .requesterName(order.getRequesterName()) + .expectedAmount(order.getExpectedAmount()) + .urgency(order.getUrgency()) + .items(items) + .build(); + + // 주문 취소 이벤트 발행 + purchaseEventService.recordOrderCanceled(order); + return PurchaseOrderResponseDto.from(order, items); } @@ -127,12 +189,71 @@ public PurchaseOrderResponseDto cancelOrder(Long orderId) { public void deleteOrder(Long orderId) { PurchaseOrder order = orderRepository.findById(orderId) .orElseThrow(() -> new NotFoundException(ErrorStatus.ORDER_NOT_FOUND)); + + // items 로드하여 이벤트에 포함 + List items = orderItemRepository.findByPurchaseOrderId(orderId); + + // order에 items 설정 + order = PurchaseOrder.builder() + .id(order.getId()) + .code(order.getCode()) + .factoryId(order.getFactoryId()) + .factoryName(order.getFactoryName()) + .status(order.getStatus()) + .orderAt(order.getOrderAt()) + .receivedAt(order.getReceivedAt()) + .canceledAt(order.getCanceledAt()) + .requiredAt(order.getRequiredAt()) + .expectedDeliveryAt(order.getExpectedDeliveryAt()) + .requesterName(order.getRequesterName()) + .expectedAmount(order.getExpectedAmount()) + .urgency(order.getUrgency()) + .items(items) + .build(); + + // 주문 삭제 이벤트 발행 + purchaseEventService.recordOrderDeleted(order); + orderRepository.delete(order); } + @Transactional + public PurchaseOrderResponseDto receiveOrder(Long orderId) { + PurchaseOrder order = orderRepository.findById(orderId) + .orElseThrow(() -> new NotFoundException(ErrorStatus.ORDER_NOT_FOUND)); + order.receive(); + orderRepository.save(order); + + // items 로드하여 이벤트에 포함 + List items = orderItemRepository.findByPurchaseOrderId(orderId); + + // order에 items 설정 + order = PurchaseOrder.builder() + .id(order.getId()) + .code(order.getCode()) + .factoryId(order.getFactoryId()) + .factoryName(order.getFactoryName()) + .status(order.getStatus()) + .orderAt(order.getOrderAt()) + .receivedAt(order.getReceivedAt()) + .canceledAt(order.getCanceledAt()) + .requiredAt(order.getRequiredAt()) + .expectedDeliveryAt(order.getExpectedDeliveryAt()) + .requesterName(order.getRequesterName()) + .expectedAmount(order.getExpectedAmount()) + .urgency(order.getUrgency()) + .items(items) + .build(); + + // 자재 입고 처리 이벤트 발행 + purchaseEventService.recordOrderReceived(order); + + return PurchaseOrderResponseDto.from(order, items); + } + private String generateOrderCode() { String datePart = java.time.LocalDate.now() - .format(java.time.format.DateTimeFormatter.BASIC_ISO_DATE); // YYYYMMDD + .format(java.time.format.DateTimeFormatter.ofPattern("yyyy")); // YYYY만 사용 String prefix = "ORD-" + datePart + "-"; String lastCode = orderRepository diff --git a/src/main/java/com/sampoom/purchase/common/config/swagger/SwaggerConfig.java b/src/main/java/com/sampoom/purchase/common/config/swagger/SwaggerConfig.java index e47eb0d..66a1b09 100644 --- a/src/main/java/com/sampoom/purchase/common/config/swagger/SwaggerConfig.java +++ b/src/main/java/com/sampoom/purchase/common/config/swagger/SwaggerConfig.java @@ -20,7 +20,7 @@ public class SwaggerConfig { @Bean public OpenAPI openAPI() { Server localServer = new Server() - .url("http://localhost:8080/") + .url("http://localhost:8081/") .description("로컬 서버"); Server prodServer = new Server() diff --git a/src/main/java/com/sampoom/purchase/common/event/OutboxStatus.java b/src/main/java/com/sampoom/purchase/common/event/OutboxStatus.java new file mode 100644 index 0000000..97e7ef2 --- /dev/null +++ b/src/main/java/com/sampoom/purchase/common/event/OutboxStatus.java @@ -0,0 +1,8 @@ +package com.sampoom.purchase.common.event; + +public enum OutboxStatus { + READY, + PUBLISHED, + FAILED, + DEAD +} diff --git a/src/main/java/com/sampoom/purchase/common/event/PurchaseEvent.java b/src/main/java/com/sampoom/purchase/common/event/PurchaseEvent.java new file mode 100644 index 0000000..50deda2 --- /dev/null +++ b/src/main/java/com/sampoom/purchase/common/event/PurchaseEvent.java @@ -0,0 +1,29 @@ +package com.sampoom.purchase.common.event; + +import java.util.List; + +public record PurchaseEvent( + String eventId, + String eventType, + Long version, + String occurredAt, + Payload payload +) { + public record Payload( + Long orderId, + String orderCode, + Long factoryId, + String factoryName, + String status, + String receivedAt, + Boolean deleted, + List materials + ) {} + + public record Material( + String materialCode, + String materialName, + Integer quantity, + String unit + ) {} +} diff --git a/src/main/java/com/sampoom/purchase/common/event/PurchaseEventService.java b/src/main/java/com/sampoom/purchase/common/event/PurchaseEventService.java new file mode 100644 index 0000000..e570f4f --- /dev/null +++ b/src/main/java/com/sampoom/purchase/common/event/PurchaseEventService.java @@ -0,0 +1,92 @@ +package com.sampoom.purchase.common.event; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.sampoom.purchase.api.purchase.entity.PurchaseOrder; +import com.sampoom.purchase.api.purchase.entity.PurchaseOrderItem; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.OffsetDateTime; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +@Service +@RequiredArgsConstructor +public class PurchaseEventService { + + private final PurchaseOutboxRepository outboxRepository; + private final ObjectMapper objectMapper; + + @Transactional + public void recordOrderReceived(PurchaseOrder order) { + enqueueEvent("PurchaseOrderReceived", order, false); + } + + @Transactional + public void recordOrderCanceled(PurchaseOrder order) { + enqueueEvent("PurchaseOrderCanceled", order, false); + } + + @Transactional + public void recordOrderCreated(PurchaseOrder order) { + enqueueEvent("PurchaseOrderCreated", order, false); + } + + @Transactional + public void recordOrderDeleted(PurchaseOrder order) { + enqueueEvent("PurchaseOrderDeleted", order, true); + } + + // 공통 헬퍼 메서드 + private void enqueueEvent(String eventType, PurchaseOrder order, Boolean deleted) { + // 자재 정보를 Material 리스트로 변환 + List materials = order.getItems() != null ? + order.getItems().stream() + .map(this::convertToMaterial) + .collect(Collectors.toList()) : + List.of(); + + PurchaseEvent evt = new PurchaseEvent( + UUID.randomUUID().toString(), + eventType, + 1L, // 버전 정보 + OffsetDateTime.now().toString(), + new PurchaseEvent.Payload( + order.getId(), + order.getCode(), + order.getFactoryId(), + order.getFactoryName(), + order.getStatus().name(), + order.getReceivedAt() != null ? order.getReceivedAt().toString() : null, + deleted, + materials + ) + ); + + try { + JsonNode payload = objectMapper.valueToTree(evt); + outboxRepository.save( + PurchaseOutbox.ready( + order.getId(), + eventType, + UUID.fromString(evt.eventId()), + payload + ) + ); + } catch (Exception e) { + throw new IllegalStateException("Serialize " + eventType + " event failed", e); + } + } + + private PurchaseEvent.Material convertToMaterial(PurchaseOrderItem item) { + return new PurchaseEvent.Material( + item.getMaterialCode(), + item.getMaterialName(), + item.getQuantity().intValue(), // Long을 Integer로 변환 + item.getUnit() + ); + } +} diff --git a/src/main/java/com/sampoom/purchase/common/event/PurchaseOutbox.java b/src/main/java/com/sampoom/purchase/common/event/PurchaseOutbox.java new file mode 100644 index 0000000..fc0cb72 --- /dev/null +++ b/src/main/java/com/sampoom/purchase/common/event/PurchaseOutbox.java @@ -0,0 +1,96 @@ +package com.sampoom.purchase.common.event; + +import com.fasterxml.jackson.databind.JsonNode; +import jakarta.persistence.*; +import lombok.*; +import org.hibernate.annotations.JdbcTypeCode; +import org.hibernate.type.SqlTypes; + +import java.time.LocalDateTime; +import java.util.UUID; + +@Entity +@Table( + name = "purchase_outbox", + uniqueConstraints = { + @UniqueConstraint(name = "uq_purchase_outbox_event_id", columnNames = {"event_id"}) + } +) +@Getter +@NoArgsConstructor(access = AccessLevel.PROTECTED) +@AllArgsConstructor +@Builder +public class PurchaseOutbox { + + @Id + @Column(name = "purchase_outbox_id") + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + @Column(nullable = false) + private String eventType; + + @Column(nullable = false) + private Long aggregateId; + + @Column(name = "event_id", nullable = false, unique = true, columnDefinition = "uuid") + private UUID eventId; + + @JdbcTypeCode(SqlTypes.JSON) + @Column(nullable = false, columnDefinition = "jsonb") + private JsonNode payload; + + @Enumerated(EnumType.STRING) + @Column(nullable = false, length = 20) + private OutboxStatus status; + + @Column(nullable = false) + private LocalDateTime occurredAt; + + @Builder.Default + @Column(nullable = false) + private Integer retryCount = 0; + + @Column(columnDefinition = "text") + private String lastError; + + private LocalDateTime publishedAt; + + private LocalDateTime lastTriedAt; + + private LocalDateTime nextRetryAt; + + // 상태 전환 메서드 + public void markPublished() { + this.status = OutboxStatus.PUBLISHED; + this.publishedAt = LocalDateTime.now(); + this.lastTriedAt = LocalDateTime.now(); + this.nextRetryAt = null; + } + + public void markFailed(String error, LocalDateTime nextRetryAt) { + this.status = OutboxStatus.FAILED; + this.lastError = error; + this.retryCount = (this.retryCount == null ? 1 : this.retryCount + 1); + this.lastTriedAt = LocalDateTime.now(); + this.nextRetryAt = nextRetryAt; + } + + public void markDead(String error) { + this.status = OutboxStatus.DEAD; + this.lastError = error; + this.lastTriedAt = LocalDateTime.now(); + } + + public static PurchaseOutbox ready(Long aggregateId, String eventType, UUID eventId, JsonNode payloadJson) { + return PurchaseOutbox.builder() + .aggregateId(aggregateId) + .eventType(eventType) + .eventId(eventId) + .payload(payloadJson) + .occurredAt(LocalDateTime.now()) + .status(OutboxStatus.READY) + .retryCount(0) + .build(); + } +} diff --git a/src/main/java/com/sampoom/purchase/common/event/PurchaseOutboxPublisher.java b/src/main/java/com/sampoom/purchase/common/event/PurchaseOutboxPublisher.java new file mode 100644 index 0000000..0d9e876 --- /dev/null +++ b/src/main/java/com/sampoom/purchase/common/event/PurchaseOutboxPublisher.java @@ -0,0 +1,75 @@ +package com.sampoom.purchase.common.event; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Component +@RequiredArgsConstructor +public class PurchaseOutboxPublisher { + + private final PurchaseOutboxRepository repo; + private final KafkaTemplate kafkaTemplate; + + private static final String TOPIC_PURCHASE = "purchase-events"; + private static final int BATCH = 100; + private static final int MAX_RETRY = 10; + private static final long BASE_BACKOFF_MS = 500; // 0.5s + private static final long MAX_BACKOFF_MS = 60_000; // 60s + + + @Scheduled(fixedDelay = 500) + @Transactional + public void publishBatch() { + List batch = repo.pickReadyBatch(BATCH, MAX_RETRY); + if (batch.isEmpty()) return; + + for (PurchaseOutbox o : batch) { + try { + Object evt; + String topic; + + // Purchase 이벤트 처리 - JsonNode를 직접 전송 + topic = TOPIC_PURCHASE; + evt = o.getPayload(); // JsonNode를 직접 사용 + + kafkaTemplate.send(topic, String.valueOf(o.getAggregateId()), evt) + .get(5, TimeUnit.SECONDS); + + o.markPublished(); + + } catch (Exception e) { + int nextRetry = o.getRetryCount() + 1; + + if (nextRetry >= MAX_RETRY) { + o.markDead(shorten(e.getMessage(), 2000)); + log.error("Outbox DEAD id={} retry={} cause={}", o.getId(), o.getRetryCount(), e.toString()); + continue; + } + + long backoffMs = computeBackoffMs(nextRetry); + LocalDateTime next = LocalDateTime.now().plusNanos(backoffMs * 1_000_000); + o.markFailed(shorten(e.getMessage(), 2000), next); + log.warn("Outbox publish failed id={} retry={} cause={}", o.getId(), o.getRetryCount(), e.toString()); + } + } + } + + private String shorten(String s, int max) { + return (s == null || s.length() <= max) ? s : s.substring(0, max); + } + + private long computeBackoffMs(int retry) { + double exp = Math.min(MAX_BACKOFF_MS, BASE_BACKOFF_MS * Math.pow(2, Math.max(0, retry - 1))); + double jitter = exp * (Math.random() * 0.1); // 0~10% 지터 + return (long) Math.min(MAX_BACKOFF_MS, exp + jitter); + } +} diff --git a/src/main/java/com/sampoom/purchase/common/event/PurchaseOutboxRepository.java b/src/main/java/com/sampoom/purchase/common/event/PurchaseOutboxRepository.java new file mode 100644 index 0000000..3ad7fa8 --- /dev/null +++ b/src/main/java/com/sampoom/purchase/common/event/PurchaseOutboxRepository.java @@ -0,0 +1,19 @@ +package com.sampoom.purchase.common.event; + +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; + +import java.util.List; + +public interface PurchaseOutboxRepository extends JpaRepository { + + @Query(""" + SELECT o FROM PurchaseOutbox o + WHERE o.status = 'READY' + OR (o.status = 'FAILED' AND o.nextRetryAt <= CURRENT_TIMESTAMP AND o.retryCount < :maxRetry) + ORDER BY o.occurredAt ASC + LIMIT :batchSize + """) + List pickReadyBatch(@Param("batchSize") int batchSize, @Param("maxRetry") int maxRetry); +}