diff --git a/notification/build.gradle b/notification/build.gradle index 4bdfea4c..3fcec962 100644 --- a/notification/build.gradle +++ b/notification/build.gradle @@ -43,6 +43,9 @@ dependencies { implementation 'org.springframework.cloud:spring-cloud-starter-openfeign' implementation 'org.springframework.cloud:spring-cloud-starter-bootstrap' + // Kafka + implementation 'org.springframework.kafka:spring-kafka' + // OpenAPI (Swagger) implementation "org.springdoc:springdoc-openapi-starter-webmvc-ui:${springdocVersion}" @@ -68,6 +71,7 @@ dependencies { // Test testImplementation 'org.springframework.boot:spring-boot-starter-test' + testImplementation 'org.springframework.kafka:spring-kafka-test' testRuntimeOnly 'org.junit.platform:junit-platform-launcher' } diff --git a/notification/src/main/java/com/hubEleven/notification/NotificationApplication.java b/notification/src/main/java/com/hubEleven/notification/NotificationApplication.java index 609f5246..9ca405d1 100644 --- a/notification/src/main/java/com/hubEleven/notification/NotificationApplication.java +++ b/notification/src/main/java/com/hubEleven/notification/NotificationApplication.java @@ -5,11 +5,13 @@ import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.data.jpa.repository.config.EnableJpaAuditing; +import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableDiscoveryClient @EnableJpaAuditing @EnableFeignClients +@EnableScheduling public class NotificationApplication { public static void main(String[] args) { diff --git a/notification/src/main/java/com/hubEleven/notification/slack/application/service/SlackOutboxScheduler.java b/notification/src/main/java/com/hubEleven/notification/slack/application/service/SlackOutboxScheduler.java new file mode 100644 index 00000000..911c765f --- /dev/null +++ b/notification/src/main/java/com/hubEleven/notification/slack/application/service/SlackOutboxScheduler.java @@ -0,0 +1,48 @@ +package com.hubEleven.notification.slack.application.service; + +import com.hubEleven.notification.slack.domain.model.SlackOutbox; +import com.hubEleven.notification.slack.domain.repository.SlackOutboxRepository; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +public class SlackOutboxScheduler { + + private final SlackOutboxRepository slackOutboxRepository; + private final KafkaTemplate kafkaTemplate; + private static final String TOPIC = "slack-message-send"; + + @Scheduled(fixedDelay = 1000) // 1초마다 실행 + public void publishOutboxMessages() { + List unpublishedOutboxes = slackOutboxRepository.findByPublishedFalse(); + + for (SlackOutbox outbox : unpublishedOutboxes) { + try { + // Kafka 발행 (동기 처리로 확실하게 전송 보장) + kafkaTemplate + .send(TOPIC, outbox.getMessageId().toString(), outbox.getPayload()) + .get(3, TimeUnit.SECONDS); + + // 발행 성공 시 상태 업데이트 + outbox.markAsPublished(); + slackOutboxRepository.save(outbox); + log.info("Kafka 발행 및 Outbox 업데이트 성공 - messageId={}", outbox.getMessageId()); + + } catch (InterruptedException | ExecutionException | TimeoutException e) { + log.error("Kafka 발행 실패 - messageId={}", outbox.getMessageId(), e); + // 실패 시 다음 주기에 재시도 (DB 업데이트 하지 않음) + } catch (Exception e) { + log.error("Outbox 처리 중 알 수 없는 오류 - outboxId={}", outbox.getId(), e); + } + } + } +} diff --git a/notification/src/main/java/com/hubEleven/notification/slack/domain/event/handler/SlackDomainEventHandler.java b/notification/src/main/java/com/hubEleven/notification/slack/domain/event/handler/SlackDomainEventHandler.java index 878a113d..ca00d478 100644 --- a/notification/src/main/java/com/hubEleven/notification/slack/domain/event/handler/SlackDomainEventHandler.java +++ b/notification/src/main/java/com/hubEleven/notification/slack/domain/event/handler/SlackDomainEventHandler.java @@ -1,18 +1,12 @@ package com.hubEleven.notification.slack.domain.event.handler; -import com.commonLib.common.exception.GlobalException; -import com.hubEleven.notification.slack.application.port.SlackClient; +import com.fasterxml.jackson.databind.ObjectMapper; import com.hubEleven.notification.slack.domain.event.SlackMessageSavedEvent; -import com.hubEleven.notification.slack.domain.model.SlackMessage; -import com.hubEleven.notification.slack.domain.repository.SlackMessageRepository; -import com.hubEleven.notification.slack.domain.vo.SlackMessageStatus; -import com.hubEleven.notification.slack.exception.SlackErrorCode; -import java.util.UUID; +import com.hubEleven.notification.slack.domain.model.SlackOutbox; +import com.hubEleven.notification.slack.domain.repository.SlackOutboxRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Propagation; -import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.event.TransactionPhase; import org.springframework.transaction.event.TransactionalEventListener; @@ -21,59 +15,19 @@ @RequiredArgsConstructor public class SlackDomainEventHandler { - private final SlackMessageRepository slackMessageRepository; - private final SlackClient slackClient; + private final SlackOutboxRepository slackOutboxRepository; + private final ObjectMapper objectMapper; - @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) - @Transactional(propagation = Propagation.REQUIRES_NEW) + @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) public void handle(SlackMessageSavedEvent event) { - UUID messageId = event.messageId(); - try { - SlackMessage slackMessage = - slackMessageRepository - .findById(messageId) - .orElseThrow(() -> new GlobalException(SlackErrorCode.SLACK_MESSAGE_NOT_FOUND)); - - if (slackMessage.getStatus() == SlackMessageStatus.SENT) { - log.info("슬랙 전송 스킵 - messageId={}", messageId); - return; - } - - String channel = slackMessage.getChannel(); - String text = slackMessage.getMessage(); - - SlackClient.SlackSendResult result; - if (channel != null && !channel.isBlank()) { - result = slackClient.sendToChannel(channel, text); - } else { - String email = slackMessage.getRecipientId(); - result = slackClient.sendDmByEmail(email, text); - } - - if (result.success()) { - slackMessage.markAsSent(); - slackMessageRepository.save(slackMessage); - log.info( - "슬랙 전송 성공 - messageId={}, channelId={}, ts={}", - messageId, - result.channelId(), - result.ts()); - } else { - slackMessage.markAsFailed(); - slackMessageRepository.save(slackMessage); - log.warn("슬랙 전송 실패 - messageId={}, error={}", messageId, result.error()); - } - + String payload = objectMapper.writeValueAsString(event); + SlackOutbox outbox = SlackOutbox.create(event.messageId(), payload); + slackOutboxRepository.save(outbox); + log.info("Slack Outbox 저장 완료 - messageId={}", event.messageId()); } catch (Exception e) { - slackMessageRepository - .findById(messageId) - .ifPresent( - m -> { - m.markAsFailed(); - slackMessageRepository.save(m); - }); - log.error("슬랙 전송 처리 중 예외 발생 - messageId={}", messageId, e); + log.error("Slack Outbox 저장 실패 - messageId={}", event.messageId(), e); + throw new RuntimeException("Slack Outbox 저장 실패", e); } } } diff --git a/notification/src/main/java/com/hubEleven/notification/slack/domain/model/SlackOutbox.java b/notification/src/main/java/com/hubEleven/notification/slack/domain/model/SlackOutbox.java new file mode 100644 index 00000000..468b3eb8 --- /dev/null +++ b/notification/src/main/java/com/hubEleven/notification/slack/domain/model/SlackOutbox.java @@ -0,0 +1,45 @@ +package com.hubEleven.notification.slack.domain.model; + +import com.commonLib.common.model.BaseEntity; +import jakarta.persistence.*; +import java.time.LocalDateTime; +import java.util.UUID; +import lombok.Getter; +import lombok.NoArgsConstructor; + +@Entity +@Getter +@Table(name = "p_slack_outbox") +@NoArgsConstructor +public class SlackOutbox extends BaseEntity { + + @Id + @GeneratedValue(strategy = GenerationType.UUID) + @Column(name = "outbox_id") + private UUID id; + + @Column(name = "message_id", nullable = false) + private UUID messageId; + + @Column(name = "payload", columnDefinition = "text", nullable = false) + private String payload; + + @Column(name = "published") + private boolean published = false; + + @Column(name = "published_at") + private LocalDateTime publishedAt; + + public static SlackOutbox create(UUID messageId, String payload) { + SlackOutbox outbox = new SlackOutbox(); + outbox.messageId = messageId; + outbox.payload = payload; + outbox.published = false; + return outbox; + } + + public void markAsPublished() { + this.published = true; + this.publishedAt = LocalDateTime.now(); + } +} diff --git a/notification/src/main/java/com/hubEleven/notification/slack/domain/repository/SlackOutboxRepository.java b/notification/src/main/java/com/hubEleven/notification/slack/domain/repository/SlackOutboxRepository.java new file mode 100644 index 00000000..2884324c --- /dev/null +++ b/notification/src/main/java/com/hubEleven/notification/slack/domain/repository/SlackOutboxRepository.java @@ -0,0 +1,10 @@ +package com.hubEleven.notification.slack.domain.repository; + +import com.hubEleven.notification.slack.domain.model.SlackOutbox; +import java.util.List; +import java.util.UUID; +import org.springframework.data.jpa.repository.JpaRepository; + +public interface SlackOutboxRepository extends JpaRepository { + List findByPublishedFalse(); +} diff --git a/notification/src/main/java/com/hubEleven/notification/slack/infrastructure/config/KafkaConfig.java b/notification/src/main/java/com/hubEleven/notification/slack/infrastructure/config/KafkaConfig.java new file mode 100644 index 00000000..055bb5f8 --- /dev/null +++ b/notification/src/main/java/com/hubEleven/notification/slack/infrastructure/config/KafkaConfig.java @@ -0,0 +1,76 @@ +package com.hubEleven.notification.slack.infrastructure.config; + +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.*; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.util.backoff.ExponentialBackOff; + +@EnableKafka +@Configuration +public class KafkaConfig { + + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Value("${spring.kafka.consumer.group-id}") + private String groupId; + + @Bean + public ProducerFactory producerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public ConsumerFactory consumerFactory() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + KafkaTemplate kafkaTemplate) { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); + + ExponentialBackOff backOff = new ExponentialBackOff(1000L, 2.0); + backOff.setMaxElapsedTime(10000L); + + DeadLetterPublishingRecoverer recoverer = + new DeadLetterPublishingRecoverer( + kafkaTemplate, (r, e) -> new TopicPartition(r.topic() + ".DLQ", r.partition())); + + DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, backOff); + factory.setCommonErrorHandler(errorHandler); + + return factory; + } +} diff --git a/notification/src/main/java/com/hubEleven/notification/slack/infrastructure/messaging/SlackMessageConsumer.java b/notification/src/main/java/com/hubEleven/notification/slack/infrastructure/messaging/SlackMessageConsumer.java new file mode 100644 index 00000000..ebfb39d5 --- /dev/null +++ b/notification/src/main/java/com/hubEleven/notification/slack/infrastructure/messaging/SlackMessageConsumer.java @@ -0,0 +1,70 @@ +package com.hubEleven.notification.slack.infrastructure.messaging; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.hubEleven.notification.slack.application.port.SlackClient; +import com.hubEleven.notification.slack.domain.event.SlackMessageSavedEvent; +import com.hubEleven.notification.slack.domain.model.SlackMessage; +import com.hubEleven.notification.slack.domain.repository.SlackMessageRepository; +import com.hubEleven.notification.slack.domain.vo.SlackMessageStatus; +import java.util.UUID; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +@Slf4j +@Component +@RequiredArgsConstructor +public class SlackMessageConsumer { + + private final SlackMessageRepository slackMessageRepository; + private final SlackClient slackClient; + private final ObjectMapper objectMapper; + + @KafkaListener(topics = "slack-message-send", groupId = "${spring.kafka.consumer.group-id}") + @Transactional + public void consume(String message, Acknowledgment ack) { + try { + SlackMessageSavedEvent event = objectMapper.readValue(message, SlackMessageSavedEvent.class); + UUID messageId = event.messageId(); + + SlackMessage slackMessage = + slackMessageRepository + .findById(messageId) + .orElseThrow(() -> new RuntimeException("Slack Message not found: " + messageId)); + + if (slackMessage.getStatus() == SlackMessageStatus.SENT) { + log.info("이미 전송된 메시지입니다. - messageId={}", messageId); + ack.acknowledge(); + return; + } + + String channel = slackMessage.getChannel(); + String text = slackMessage.getMessage(); + + SlackClient.SlackSendResult result; + if (channel != null && !channel.isBlank()) { + result = slackClient.sendToChannel(channel, text); + } else { + String email = slackMessage.getRecipientId(); + result = slackClient.sendDmByEmail(email, text); + } + + if (result.success()) { + slackMessage.markAsSent(); + slackMessageRepository.save(slackMessage); + log.info("슬랙 전송 성공 - messageId={}", messageId); + ack.acknowledge(); + } else { + log.warn("슬랙 전송 실패 - messageId={}, error={}", messageId, result.error()); + throw new RuntimeException("Slack send failed: " + result.error()); + } + + } catch (Exception e) { + log.error("메시지 처리 중 오류 발생", e); + throw new RuntimeException(e); + } + } +}