From e40b8996fd178fe6bec0589a5c6512417c7b733a Mon Sep 17 00:00:00 2001 From: Sangmin Date: Tue, 6 Aug 2024 19:23:35 +0900 Subject: [PATCH 1/2] =?UTF-8?q?docs=20:=20=EB=82=98=EC=83=81=EB=AF=BC=20-?= =?UTF-8?q?=204=EC=A3=BC=EC=B0=A8=20=EC=A0=95=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../chapter_4_4.md" | 156 ++++++++++++++++++ 1 file changed, 156 insertions(+) diff --git "a/study/week_04/\353\202\230\354\203\201\353\257\274/chapter_4_4.md" "b/study/week_04/\353\202\230\354\203\201\353\257\274/chapter_4_4.md" index 530961d..f1e346a 100644 --- "a/study/week_04/\353\202\230\354\203\201\353\257\274/chapter_4_4.md" +++ "b/study/week_04/\353\202\230\354\203\201\353\257\274/chapter_4_4.md" @@ -109,4 +109,160 @@ public class SpringProducerApplication implements CommandLineRunner { }); } } +``` + +## 4.4.2 스프링 카프카 컨슈머 +스프링 카프카의 컨슈머는 기존 컨슈멀를 2개의 타입으로 나누고 커밋을 7가지로 나누어 세분화했다. +우선, 타입은 레코드 리스너와 배치 리스너가 있다. + +레코드 리스너는 단 1개의 레코드를 처리한다. 반면, 배치 리스너는 기존 카프카 클라이언트 라이브러리의 poll() 메서드로 리턴받은 ConsumerRecords처럼 한 번에 여러개 레코드들을 처리할 수 있다. + +### 메시지 리스너 종류와 구현에 필요한 파라미터(Record) +| 리스너 이름 | 생성 메서드 파라미터 | 상세 설명 | +|----------------|-------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------| +| MessageListener| onMessage(ConsumerRecord data)
onMessage(V data) | Record 인스턴스 단위로 프로세싱, 오토 커밋 또는 컨슈머 컨테이너의 AckMode를 사용하는 경우 | +|AcknowledgingMessageListener| onMessage(ConsumerRecord data, Acknowledgment acknowledgment)
onMessage(V data, Acknowledgment acknowledgment)| Record 인스턴스 단위로 프로세싱, 매뉴얼 커밋을 사용하는 경우 | +|ConsumerAwareMessageListener| onMessage(ConsumerRecord data, Consumer consumer)
onMessage(V data, Consumer consumer)| Record 인스턴스 단위로 프로세싱, 컨슈머 객체를 활용하고 싶은 경우 | +|AcknowledgingConsumerAwareMessageListener| onMessage(ConsumerRecord data, Acknowledgment acknowledgment, Consumer consumer)
onMessage(V data, Acknowledgment acknowledgment, Consumer consumer)| Record 인스턴스 단위로 프로세싱, 매뉴얼 커밋을 사용하고 컨슈머 객체를 활용하고 싶은 경우 | + +### 메시지 리스너 종류와 구현에 필요한 파라미터(Batch) +| 리스너 이름 | 생성 메서드 파라미터 | 상세 설명 | +|-----------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------| +| BatchMessageListener | onMessage(ConsumerRecords data)
onMessage(List data) | Record 인스턴스 단위로 프로세싱, 오토 커밋 또는 컨슈머 컨테이너의 AckMode를 사용하는 경우 | +| BatchAcknowledgingMessageListener | onMessage(ConsumerRecords data, Acknowledgment acknowledgment)
onMessage(List data, Acknowledgment acknowledgment) | Record 인스턴스 단위로 프로세싱, 매뉴얼 커밋을 사용하는 경우 | +| BatchConsumerAwareMessageListener | onMessage(ConsumerRecords data, Consumer consumer)
onMessage(List data, Consumer consumer) | Record 인스턴스 단위로 프로세싱, 컨슈머 객체를 활용하고 싶은 경우 | +| BatchAcknowledgingConsumerAwareMessageListener | onMessage(ConsumerRecords data, Acknowledgment acknowledgment, Consumer consumer)
onMessage(List data, Acknowledgment acknowledgment, Consumer consumer) | Record 인스턴스 단위로 프로세싱, 매뉴얼 커밋을 사용하고 컨슈머 객체를 활용하고 싶은 경우 | + +카프카 컨슈머에서 커밋을 직접 구현할 때는 오토 커밋, 동기 커밋, 비동기 커밋 크게 세가지로 나뉘지만 실제 운영환경에서는 다양한 종류의 커밋을 구현해서 사용한다. +그러나 스프링 카프카에서는 사용자가 사용할 만한 커밋의 종류를 7가지(RECORD,VATCH,TIME,COUNT,COUNT_TIME,MANUAL,MANUAL_IMMEDIATE)로 나누어 제공한다. +스프링 카프카에서는 커밋이라고 부르지 않고 'AckMode'라고 부른다. + +### AckMode 종류 +| AckMode 이름 | 설명 | +|-------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| RECORD | 레코드 단위로 프로세싱 이후 커밋 | +| BATCH | poll() 메서드로 호출된 레코드가 모두 처리된 이후 커밋
스프링 카프카 컨슈머의 AckMode 기본값 | +| TIME | 특정시간 이후에 커밋
이 옵션을 사용할 경우에는 시간 간격을 선언하는 AckTime 옵션을 설정해야 한다. | +| COUNT | 특정 개수만큼 레코드가 처리된 이후에 커밋
이 옵션을 사용할 경우에는 레코드 개수를 선언하는 AckCount 옵션을 설정해야 한다 | +| COUNT_TIME | TIME, COUNT 옵션 중 맞는 조건이 하나라도 나올 경우 커밋 | +| MANUAL | Acknowledgement.acknowledge() 메서드가 호출되면 다음번 poll()때 커밋을 한다. 매번 acknowledge() 메서드를 호출하면 BATCH 옵션과 동일하게 동작한다. 이 옵션을 사용할 경우에는 AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야한다 | +| MANUAL_IMMEDIATE | Acknowledgement.acknowledge()메서드를 호출한 즉시 커밋한다. 이 옵션을 사용할 경우에는 AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야 한다. | + +리스너를 생성하고 사용하는 방식은 크게 두 가지가 있다. 첫 번째는 기본 리스너 컨테이너를 사용하는 것이고 두 번째는 컨테이너 팩토리를 사용하여 직접 리스너를 만드는 것이다. + +### 기본 리스너 컨테이너 + +### record 리스너 예제 +- application.yaml +```yaml +spring: + kafka: + consumer: + bootstrap-servers: localhost + listener: + ack-mode: MANUAL_IMMEDIATE + type: RECORD +``` +- SpringConsumerApplication.java +```java +@SpringBootApplication +public class SpringConsumerApplication { + public static Logger logger = LoggerFactory.getLogger(SpringConsumerApplication.class); + + + public static void main(String[] args) { + SpringApplication application = new SpringApplication(SpringConsumerApplication.class); + application.run(args); + } + + // 기본적인 리스너 선언 + @KafkaListener(topics = "test", + groupId = "test-group-00") + public void recordListener(ConsumerRecord record) { + logger.info(record.toString()); + } + + // 메시지 값을 파라미터로 받는 리스너 + @KafkaListener(topics = "test", + groupId = "test-group-01") + public void singleTopicListener(String messageValue) { + logger.info(messageValue); + } + + // 개별 리스너에 카프카 컨슈머 옵션값을 부여하고 싶다면 properties를 사용한다. + @KafkaListener(topics = "test", + groupId = "test-group-02", properties = { + "max.poll.interval.ms:60000", + "auto.offset.reset:earliest" + }) + public void singleTopicWithPropertiesListener(String messageValue) { + logger.info(messageValue); + } + + // 2개 이상의 카프카 컨슈머 스레드를 실행하고 싶다면 concurrency 옵션을 사용하면 된다. + @KafkaListener(topics = "test", + groupId = "test-group-03", + concurrency = "3") + public void concurrentTopicListener(String messageValue) { + logger.info(messageValue); + } + + // 특정 파티션을 리스닝하고 싶다면 topicPartitions를 사용한다. + @KafkaListener(topicPartitions = + { + @TopicPartition(topic = "test01", partitions = {"0", "1"}), + @TopicPartition(topic = "test02", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "3")) + }, + groupId = "test-group-04") + public void listenSpecificPartition(ConsumerRecord record) { + logger.info(record.toString()); + } +} +``` + +### batch 리스너 예제 +배치 리스너는 레코드 리스너와 다르게 메서드의 파라미터를 List또는 ConsumerRecords로 받는다. + +- application.yaml +```yaml +spring: + kafka: + consumer: + bootstrap-servers: localhost + listener: + ack-mode: MANUAL_IMMEDIATE + type: BATCH +``` + +- SpringConsumerApplication.java +```java +@SpringBootApplication +public class SpringConsumerApplication { + public static Logger logger = LoggerFactory.getLogger(SpringConsumerApplication.class); + + public static void main(String[] args) { + SpringApplication application = new SpringApplication(SpringConsumerApplication.class); + application.run(args); + } + + @KafkaListener(topics = "test", + groupId = "test-group-01") + public void batchListener(ConsumerRecords records) { + records.forEach(record -> logger.info(record.toString())); + } + + @KafkaListener(topics = "test", + groupId = "test-group-02") + public void batchListener(List list) { + list.forEach(recordValue -> logger.info(recordValue)); + } + + @KafkaListener(topics = "test", + groupId = "test-group-03", + concurrency = "3") + public void concurrentBatchListener(ConsumerRecords records) { + records.forEach(record -> logger.info(record.toString())); + } + +} ``` \ No newline at end of file From b28ff3a09457773ffd079fc81afafe93332e2a41 Mon Sep 17 00:00:00 2001 From: Sangmin Date: Thu, 22 Aug 2024 21:48:42 +0900 Subject: [PATCH 2/2] =?UTF-8?q?docs=20:=20=EB=82=98=EC=83=81=EB=AF=BC=20-?= =?UTF-8?q?=205=EC=A3=BC=EC=B0=A8=20=EC=A0=95=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../chapter_5_1.md" | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 "study/week_05/\353\202\230\354\203\201\353\257\274/chapter_5_1.md" diff --git "a/study/week_05/\353\202\230\354\203\201\353\257\274/chapter_5_1.md" "b/study/week_05/\353\202\230\354\203\201\353\257\274/chapter_5_1.md" new file mode 100644 index 0000000..4313c2c --- /dev/null +++ "b/study/week_05/\353\202\230\354\203\201\353\257\274/chapter_5_1.md" @@ -0,0 +1,87 @@ +# 5.1 웹 페이지 이벤트 적재 파이프라인 생성 +웹 페이지에서 생성되는 이벤트들을 분석하기 위해 HDFS와 엘라스틱서치에 적재하는 파이프라인을 만드는 프로젝트를 진행한다. + +## 5.1.1 요구 사항 +이름을 입력하고 자신이 좋아하는 색상을 고르는 버튼을 누르면 해당 이벤트와 유저 에이전트 정보를 카프카 토픽으로 전달하고 최종적으로 하둡과 엘라스틱서치에 적재되는 것을 목표로 한다. + +## 5.1.2 정책 및 기능 정의 +### 1. 적재 정책 + +웹 페이지의 사용자 이벤트는 사용자가 사용하는 환경에 따라 유실이 발생할 가능성이 크다. + +최종 적재 하고자 하는 HDFS적재의 경우 트랜잭션이 지원되지 않으므로 컨슈머의 장애가 발생했을 때 데이터의 중복이 발생할 여지가 있다. 따라서 다음과 같은 정책이 나온다. + +### 파이프라인 정책 +- 일부 데이터의 유실 또는 중복 허용 +- 안정적으로 끊임없는 적재 +- 갑작스럽게 발생하는 많은 데이터양을 허용 + +### 2. 데이터 포맷 +데이터 포맷을 선택할 때 두 가지를 우선적으로 생각해 볼 수 있는데 첫 번째는 스키마의 변화의 유연성이고 두 번째는 명령어를 통한 디버깅의 편리성이다. + +현재 데이어 파이프라인을 통해 데이터가 최종 적재되는 타깃 애플리케이션은 HDFS와 엘라스틱서치인데 두 애플리케이션 다 JSON 형태의 데이터에 친화적이다. + +### 3. 웹 페이지 +사용자의 이벤트를 수집하는 웹 페이지는 html만 사용하여 개발한다. button 태그를 누르면 ajax를 사용하여 비동기로 사용자 이벤트르 전송한다. + +### 4. 프로듀서 +프로듀서를 운영할 때 고민해야 할 옵션 몇 가지가 있다. + +첫 번째 고민해야 할 옵션은 ack를 어떤 값으로 설정할지다. acks를 all로 설정하면 클러스터 네트워크에 이상이 생겼을 경우 복구할 확률이 높지만 그만큼 프로듀서가 클러스터로 데이터를 저장하는 데에 시간이 오래 걸린다. +이번 에플리케이션은 데이터 전송에 있어 일부 유실이나 중복이 발생하더라도 안정적이고 빠른 파이프라인을 구성하는 것이 목표이므로 acks를 1로 설정한다. +acks를 1로 설정함으로써 최소한 리더 파티션에는 데이터가 적재되는 것을 보장할 수 있다. + +다음은 최소 동기화 맆ㄹ리카 설정이다. acks를 1로 선택한 경우에는 `min.insync.replicas` 옵션을 무시하고 리더 파티션에 계속 적재하므로 따로 설정할 필요가 없다. + +다음은 파티셔너 설정이다. 파티셔너를 활용하면 메시지 키 또는 메시지 값을 기반으로 어떤 파티션으로 전달될지 결정하는 로직을 적용할 수 있다. +그러나 웹 페이지에서 생성된 데이터는 특별히 파티션을 분류할 필요가 없기 때문에 프로듀서에 설정할 파티셔너는 기본 파티셔너인 `UniformStickyPartitioner`를 사용한다. + +다음은 재시도 설정이다. 클러스터 또는 네트워크 이슈로 인해 데이터가 정상적으로 전송되지 않았을 때 프로듀서는 다시 전송을 시도한다. +프로듀서가 전송을 재시도할 경우 토픽으로 전송된 데이터의 중복이 발생할 수 있고, 전송 시점의 역전으로 인해 전송 순서와 토픽에 적재된 데이터의 순서가 바뀔 수 있다. +여기서는 토픽의 데이터 순서를 지키지 않고 데이터의 중복을 허용하기 때문에 retries 옵션은 따로 설정하지 않고 기본값을 사용한다. + +다음은 프로듀서의 압축 옵션이다. 압축을 하면 클로서터에 적재되는 데이터의 총 용량을 줄이고 네트워크의 사용량을 줄이는 데에 효과적이지만 프로듀서와 컨슈머에서 데이터를 사용할 때 CPU와 메모리 사용량이 늘어나는 단점이 있다. +이 애플리케이션은 처리량이 많지 않고 실습용 파이프라인을 구축하는 데에 목적이 있으므로 여기서는 압축을 하지 않는다. + +나머지 프로듀서 옵션들은 기본 옵션값으로 사용한다. + +### 5. 토픽 +토픽을 설정할 때 가장 처음 고민하는 것은 파티션 개수이다. +여기서는 데이터 처리 순서를 지키지 않아도 되므로 파티션 개수를 엄격하게 정해서 가져가지 않아도 된다. 그러므로 토픽의 파티션 개수는 2개 이상으로 설정한다. + +다음으로 고민해야 할 부분은 메시지 키의 사용 여부다. +여기서는 웹 페이지의 데이터를 메시지 값에만 저장하고 메시지 키에는 저장하지 않는다. 메시지 키를 사용하지 않으므로 토픽에 들어오는 데이터의 양에 따라 파티션 개수를 가변적으로 설정할 수 있다. + +다음으로는 복제 개수이다. +복제 개수가 높으면 높을수록 데이터의 복구 확률이 높아진다. +다만, 복제 개수가 너무 높으면 팔로워 파티션이 데이터를 복제하는 데에 시간이 오래 걸릴 수 있으며 클러스터 전체를 봤을 때 저장되는 데이터의 용량도 그만큼 늘어난다. +여기서는 클러스터의 브로커 1대에 이슈가 발생했을 경우에도 안정적으로 데이터를 받기 위한 최소 설정으로 2를 설정한다. + +나머지 토픽 옵션은 기본으로 한다. + +### 6. 컨슈머 +토픽에 저장되어 있는 웹 이벤트를 하둡과 엘라스틱서치에 저장하는 로직을 만드는 방법은 크게 두 가지 방법이 있다. + +첫번째는 컨슈머 API를 사용하여 직접 애플리케이션을 개발하는 방법이다. +컨슈머 API를 직접 개발하고 운영하는 것은 컨슈머를 직관적으로 운영하는 가장 좋은 방법이다. + +두번째는 커넥트를 사용하는 것이다. 분산 커넥트를 사용하면 REST API를 통해 커넥터로 반복적인 파이프라인을 쉽게 생성할 수 있다. + +컨슈머API를 사용할 것인지 커스텀 싱크 커넥터를 사용할 것인지 각자 개발하는 환경에 따라 선택하면 된다. +이미 상용 인프라에 분산 커넥트가 구축되어 있다면 싱크 커넥터를 활용하는 것이 좋지만 분산 커넥트가 없다면 구축하는 데에 시간과 인프라 비용이 발생할 수 있으므로 컨슈머API를 활용하여 멀티 스레드 자바 애플리케이션을 만들고 배포하여 운영하는 것이 좋다. + +## 5.1.3 기능 구현 + +하둡을 실행 할 때 기본 파일 시스템을 하둡으로 설정하기 위해 fs.defaultFS값을 hdfs://localhost:9000으로 설정한다. +```xml + + + fs.defaultFS + hdfs://localhost:9000 + + +``` +토픽 생성 +```shell +./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic select-color +```