Skip to content
4 changes: 4 additions & 0 deletions notification/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ dependencies {
implementation 'org.springframework.cloud:spring-cloud-starter-openfeign'
implementation 'org.springframework.cloud:spring-cloud-starter-bootstrap'

// Kafka
implementation 'org.springframework.kafka:spring-kafka'

// OpenAPI (Swagger)
implementation "org.springdoc:springdoc-openapi-starter-webmvc-ui:${springdocVersion}"

Expand All @@ -68,6 +71,7 @@ dependencies {

// Test
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableDiscoveryClient
@EnableJpaAuditing
@EnableFeignClients
@EnableScheduling
public class NotificationApplication {

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.hubEleven.notification.slack.application.service;

import com.hubEleven.notification.slack.domain.model.SlackOutbox;
import com.hubEleven.notification.slack.domain.repository.SlackOutboxRepository;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class SlackOutboxScheduler {

  private final SlackOutboxRepository slackOutboxRepository;
  private final KafkaTemplate<String, String> kafkaTemplate;

  /** Topic the outbox relays to; consumed by SlackMessageConsumer. */
  private static final String TOPIC = "slack-message-send";

  /**
   * Transactional-outbox relay: every second, loads all rows not yet published
   * and forwards them to Kafka. The send is synchronous (blocks up to 3s for
   * broker acknowledgement) so the {@code published} flag is flipped only after
   * a confirmed send; failed rows stay unpublished and are retried next cycle.
   */
  @Scheduled(fixedDelay = 1000) // runs 1s after the previous execution completes
  public void publishOutboxMessages() {
    List<SlackOutbox> unpublishedOutboxes = slackOutboxRepository.findByPublishedFalse();

    for (SlackOutbox outbox : unpublishedOutboxes) {
      try {
        // Kafka 발행 (동기 처리로 확실하게 전송 보장) — keyed by messageId so
        // records for the same message land on the same partition.
        kafkaTemplate
            .send(TOPIC, outbox.getMessageId().toString(), outbox.getPayload())
            .get(3, TimeUnit.SECONDS);

        // 발행 성공 시 상태 업데이트
        outbox.markAsPublished();
        slackOutboxRepository.save(outbox);
        log.info("Kafka 발행 및 Outbox 업데이트 성공 - messageId={}", outbox.getMessageId());

      } catch (InterruptedException e) {
        // Fix: restore the interrupt flag and stop processing — the scheduler
        // thread is being shut down; remaining rows are picked up on restart.
        Thread.currentThread().interrupt();
        log.error("Kafka 발행 실패 - messageId={}", outbox.getMessageId(), e);
        return;
      } catch (ExecutionException | TimeoutException e) {
        log.error("Kafka 발행 실패 - messageId={}", outbox.getMessageId(), e);
        // 실패 시 다음 주기에 재시도 (DB 업데이트 하지 않음)
      } catch (Exception e) {
        log.error("Outbox 처리 중 알 수 없는 오류 - outboxId={}", outbox.getId(), e);
      }
    }
  }
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
package com.hubEleven.notification.slack.domain.event.handler;

import com.commonLib.common.exception.GlobalException;
import com.hubEleven.notification.slack.application.port.SlackClient;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hubEleven.notification.slack.domain.event.SlackMessageSavedEvent;
import com.hubEleven.notification.slack.domain.model.SlackMessage;
import com.hubEleven.notification.slack.domain.repository.SlackMessageRepository;
import com.hubEleven.notification.slack.domain.vo.SlackMessageStatus;
import com.hubEleven.notification.slack.exception.SlackErrorCode;
import java.util.UUID;
import com.hubEleven.notification.slack.domain.model.SlackOutbox;
import com.hubEleven.notification.slack.domain.repository.SlackOutboxRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

Expand All @@ -21,59 +15,19 @@
@RequiredArgsConstructor
public class SlackDomainEventHandler {

  private final SlackOutboxRepository slackOutboxRepository;
  private final ObjectMapper objectMapper;

  /**
   * Writes a Slack outbox row for a saved SlackMessage.
   *
   * <p>Runs BEFORE_COMMIT, i.e. inside the same transaction that persisted the
   * message, so the outbox row commits (or rolls back) atomically with it.
   * Actual delivery happens asynchronously: SlackOutboxScheduler relays the row
   * to Kafka and SlackMessageConsumer performs the Slack send.
   *
   * @param event carries the messageId; the full event is serialized as the
   *     outbox payload
   * @throws RuntimeException if serialization or the save fails, forcing the
   *     enclosing transaction to roll back so message and outbox stay in sync
   */
  @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
  public void handle(SlackMessageSavedEvent event) {
    try {
      String payload = objectMapper.writeValueAsString(event);
      SlackOutbox outbox = SlackOutbox.create(event.messageId(), payload);
      slackOutboxRepository.save(outbox);
      log.info("Slack Outbox 저장 완료 - messageId={}", event.messageId());
    } catch (Exception e) {
      // Rethrow so the surrounding transaction rolls back together with the
      // message save — a message without an outbox row would never be delivered.
      log.error("Slack Outbox 저장 실패 - messageId={}", event.messageId(), e);
      throw new RuntimeException("Slack Outbox 저장 실패", e);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.hubEleven.notification.slack.domain.model;

import com.commonLib.common.model.BaseEntity;
import jakarta.persistence.*;
import java.time.LocalDateTime;
import java.util.UUID;
import lombok.Getter;
import lombok.NoArgsConstructor;

// Transactional-outbox row for Slack notifications: created in the same
// transaction as the SlackMessage, relayed to Kafka by SlackOutboxScheduler,
// then marked published. Auditing columns come from BaseEntity.
@Entity
@Getter
@Table(name = "p_slack_outbox")
@NoArgsConstructor
public class SlackOutbox extends BaseEntity {

  // Surrogate key, DB/provider-generated UUID (JPA GenerationType.UUID).
  @Id
  @GeneratedValue(strategy = GenerationType.UUID)
  @Column(name = "outbox_id")
  private UUID id;

  // Id of the SlackMessage this outbox row will deliver; also used as the
  // Kafka record key by the scheduler.
  @Column(name = "message_id", nullable = false)
  private UUID messageId;

  // JSON-serialized SlackMessageSavedEvent, produced by SlackDomainEventHandler.
  @Column(name = "payload", columnDefinition = "text", nullable = false)
  private String payload;

  // false until the Kafka send is broker-acknowledged; the scheduler polls
  // rows where this is false (findByPublishedFalse).
  @Column(name = "published")
  private boolean published = false;

  // Set when the row is successfully relayed; null while unpublished.
  @Column(name = "published_at")
  private LocalDateTime publishedAt;

  // Factory for a fresh, not-yet-published outbox row.
  public static SlackOutbox create(UUID messageId, String payload) {
    SlackOutbox outbox = new SlackOutbox();
    outbox.messageId = messageId;
    outbox.payload = payload;
    outbox.published = false;
    return outbox;
  }

  // Flip to published and record the publish time; called by the scheduler
  // only after Kafka confirms the send.
  public void markAsPublished() {
    this.published = true;
    this.publishedAt = LocalDateTime.now();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.hubEleven.notification.slack.domain.repository;

import com.hubEleven.notification.slack.domain.model.SlackOutbox;
import java.util.List;
import java.util.UUID;
import org.springframework.data.jpa.repository.JpaRepository;

// Spring Data repository for SlackOutbox rows (PK is the generated UUID).
public interface SlackOutboxRepository extends JpaRepository<SlackOutbox, UUID> {
  // Derived query: all rows not yet relayed to Kafka (published = false).
  // Polled every second by SlackOutboxScheduler.
  List<SlackOutbox> findByPublishedFalse();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.hubEleven.notification.slack.infrastructure.config;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.ExponentialBackOff;

// Kafka wiring for the Slack notification pipeline: a String/String producer
// (used by SlackOutboxScheduler), a manually-acknowledged consumer factory,
// and a listener container with exponential-backoff retry + dead-letter topic.
@EnableKafka
@Configuration
public class KafkaConfig {

  @Value("${spring.kafka.bootstrap-servers}")
  private String bootstrapServers;

  @Value("${spring.kafka.consumer.group-id}")
  private String groupId;

  // Producer with plain String key/value serializers.
  @Bean
  public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
  }

  @Bean
  public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }

  // Consumer with auto-commit disabled — offsets are committed only via the
  // Acknowledgment in the listener (see MANUAL_IMMEDIATE ack mode below).
  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return new DefaultKafkaConsumerFactory<>(props);
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
      KafkaTemplate<String, String> kafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

    // Retry with exponential backoff: 1s initial, doubling each attempt,
    // giving up once 10s of total elapsed retry time is exceeded.
    ExponentialBackOff backOff = new ExponentialBackOff(1000L, 2.0);
    backOff.setMaxElapsedTime(10000L);

    // After retries are exhausted, publish the failed record to "<topic>.DLQ",
    // preserving the original partition number.
    // NOTE(review): same-partition mapping requires the .DLQ topic to have at
    // least as many partitions as the source topic — confirm topic setup.
    DeadLetterPublishingRecoverer recoverer =
        new DeadLetterPublishingRecoverer(
            kafkaTemplate, (r, e) -> new TopicPartition(r.topic() + ".DLQ", r.partition()));

    DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, backOff);
    factory.setCommonErrorHandler(errorHandler);

    return factory;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.hubEleven.notification.slack.infrastructure.messaging;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.hubEleven.notification.slack.application.port.SlackClient;
import com.hubEleven.notification.slack.domain.event.SlackMessageSavedEvent;
import com.hubEleven.notification.slack.domain.model.SlackMessage;
import com.hubEleven.notification.slack.domain.repository.SlackMessageRepository;
import com.hubEleven.notification.slack.domain.vo.SlackMessageStatus;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Component
@RequiredArgsConstructor
public class SlackMessageConsumer {

  private final SlackMessageRepository slackMessageRepository;
  private final SlackClient slackClient;
  private final ObjectMapper objectMapper;

  /**
   * Delivers a Slack message for an outbox-relayed event.
   *
   * <p>Throwing from this method hands the record to the container's
   * DefaultErrorHandler (exponential-backoff retry, then the .DLQ topic); the
   * offset is acknowledged only on success or on the idempotency skip.
   *
   * <p>NOTE(review): the offset is acknowledged before the surrounding
   * transaction commits — if the commit then fails, the record is not
   * redelivered. Confirm this trade-off is acceptable.
   *
   * @param message JSON payload written by SlackDomainEventHandler
   *     (a serialized SlackMessageSavedEvent)
   * @param ack manual offset acknowledgment (MANUAL_IMMEDIATE ack mode)
   */
  @KafkaListener(topics = "slack-message-send", groupId = "${spring.kafka.consumer.group-id}")
  @Transactional
  public void consume(String message, Acknowledgment ack) {
    try {
      SlackMessageSavedEvent event = objectMapper.readValue(message, SlackMessageSavedEvent.class);
      UUID messageId = event.messageId();

      SlackMessage slackMessage =
          slackMessageRepository
              .findById(messageId)
              .orElseThrow(() -> new RuntimeException("Slack Message not found: " + messageId));

      // Idempotency guard: Kafka redelivery of an already-sent message is a no-op.
      if (slackMessage.getStatus() == SlackMessageStatus.SENT) {
        log.info("이미 전송된 메시지입니다. - messageId={}", messageId);
        ack.acknowledge();
        return;
      }

      String channel = slackMessage.getChannel();
      String text = slackMessage.getMessage();

      // Channel takes precedence; otherwise fall back to a DM via the
      // recipient's email.
      SlackClient.SlackSendResult result;
      if (channel != null && !channel.isBlank()) {
        result = slackClient.sendToChannel(channel, text);
      } else {
        String email = slackMessage.getRecipientId();
        result = slackClient.sendDmByEmail(email, text);
      }

      if (result.success()) {
        slackMessage.markAsSent();
        slackMessageRepository.save(slackMessage);
        log.info("슬랙 전송 성공 - messageId={}", messageId);
        ack.acknowledge();
      } else {
        // No ack: throwing triggers retry / DLQ via the error handler.
        log.warn("슬랙 전송 실패 - messageId={}, error={}", messageId, result.error());
        throw new RuntimeException("Slack send failed: " + result.error());
      }

    } catch (RuntimeException e) {
      // Fix: rethrow RuntimeExceptions as-is instead of re-wrapping — the
      // original code wrapped its own "Slack send failed" exception a second
      // time, hiding the real cause from the error handler / DLQ headers.
      log.error("메시지 처리 중 오류 발생", e);
      throw e;
    } catch (Exception e) {
      // Checked exceptions (e.g. Jackson parse failures) still need wrapping.
      log.error("메시지 처리 중 오류 발생", e);
      throw new RuntimeException(e);
    }
  }
}