Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs : 나상민 - 4주차 정리 #13

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions study/week_04/나상민/chapter_4_4.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,160 @@ public class SpringProducerApplication implements CommandLineRunner {
});
}
}
```

## 4.4.2 스프링 카프카 컨슈머
스프링 카프카의 컨슈머는 기존 컨슈멀를 2개의 타입으로 나누고 커밋을 7가지로 나누어 세분화했다.
우선, 타입은 레코드 리스너와 배치 리스너가 있다.

레코드 리스너는 단 1개의 레코드를 처리한다. 반면, 배치 리스너는 기존 카프카 클라이언트 라이브러리의 poll() 메서드로 리턴받은 ConsumerRecords처럼 한 번에 여러개 레코드들을 처리할 수 있다.

### 메시지 리스너 종류와 구현에 필요한 파라미터(Record)
| 리스너 이름 | 생성 메서드 파라미터 | 상세 설명 |
|----------------|-------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------|
| MessageListener| onMessage(ConsumerRecord<K, V> data)<br/>onMessage(V data) | Record 인스턴스 단위로 프로세싱, 오토 커밋 또는 컨슈머 컨테이너의 AckMode를 사용하는 경우 |
|AcknowledgingMessageListener| onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment)<br/>onMessage(V data, Acknowledgment acknowledgment)| Record 인스턴스 단위로 프로세싱, 매뉴얼 커밋을 사용하는 경우 |
|ConsumerAwareMessageListener| onMessage(ConsumerRecord<K, V> data, Consumer<K, V> consumer)<br/>onMessage(V data, Consumer<K, V> consumer)| Record 인스턴스 단위로 프로세싱, 컨슈머 객체를 활용하고 싶은 경우 |
|AcknowledgingConsumerAwareMessageListener| onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer)<br/>onMessage(V data, Acknowledgment acknowledgment, Consumer<K, V> consumer)| Record 인스턴스 단위로 프로세싱, 매뉴얼 커밋을 사용하고 컨슈머 객체를 활용하고 싶은 경우 |

### 메시지 리스너 종류와 구현에 필요한 파라미터(Batch)
| 리스너 이름 | 생성 메서드 파라미터 | 상세 설명 |
|-----------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------|
| BatchMessageListener | onMessage(ConsumerRecords<K, V> data)<br/>onMessage(List<V> data) | Record 인스턴스 단위로 프로세싱, 오토 커밋 또는 컨슈머 컨테이너의 AckMode를 사용하는 경우 |
| BatchAcknowledgingMessageListener | onMessage(ConsumerRecords<K, V> data, Acknowledgment acknowledgment)<br/>onMessage(List<V> data, Acknowledgment acknowledgment) | Record 인스턴스 단위로 프로세싱, 매뉴얼 커밋을 사용하는 경우 |
| BatchConsumerAwareMessageListener | onMessage(ConsumerRecords<K, V> data, Consumer<K, V> consumer)<br/>onMessage(List<V> data, Consumer<K, V> consumer) | Record 인스턴스 단위로 프로세싱, 컨슈머 객체를 활용하고 싶은 경우 |
| BatchAcknowledgingConsumerAwareMessageListener | onMessage(ConsumerRecords<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer)<br/>onMessage(List<V> data, Acknowledgment acknowledgment, Consumer<K, V> consumer) | Record 인스턴스 단위로 프로세싱, 매뉴얼 커밋을 사용하고 컨슈머 객체를 활용하고 싶은 경우 |

카프카 컨슈머에서 커밋을 직접 구현할 때는 오토 커밋, 동기 커밋, 비동기 커밋 크게 세가지로 나뉘지만 실제 운영환경에서는 다양한 종류의 커밋을 구현해서 사용한다.
그러나 스프링 카프카에서는 사용자가 사용할 만한 커밋의 종류를 7가지(RECORD,VATCH,TIME,COUNT,COUNT_TIME,MANUAL,MANUAL_IMMEDIATE)로 나누어 제공한다.
스프링 카프카에서는 커밋이라고 부르지 않고 'AckMode'라고 부른다.

### AckMode 종류
| AckMode 이름 | 설명 |
|-------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| RECORD | 레코드 단위로 프로세싱 이후 커밋 |
| BATCH | poll() 메서드로 호출된 레코드가 모두 처리된 이후 커밋<br/>스프링 카프카 컨슈머의 AckMode 기본값 |
| TIME | 특정시간 이후에 커밋<br/>이 옵션을 사용할 경우에는 시간 간격을 선언하는 AckTime 옵션을 설정해야 한다. |
| COUNT | 특정 개수만큼 레코드가 처리된 이후에 커밋<br/>이 옵션을 사용할 경우에는 레코드 개수를 선언하는 AckCount 옵션을 설정해야 한다 |
| COUNT_TIME | TIME, COUNT 옵션 중 맞는 조건이 하나라도 나올 경우 커밋 |
| MANUAL | Acknowledgement.acknowledge() 메서드가 호출되면 다음번 poll()때 커밋을 한다. 매번 acknowledge() 메서드를 호출하면 BATCH 옵션과 동일하게 동작한다. 이 옵션을 사용할 경우에는 AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야한다 |
| MANUAL_IMMEDIATE | Acknowledgement.acknowledge()메서드를 호출한 즉시 커밋한다. 이 옵션을 사용할 경우에는 AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야 한다. |

리스너를 생성하고 사용하는 방식은 크게 두 가지가 있다. 첫 번째는 기본 리스너 컨테이너를 사용하는 것이고 두 번째는 컨테이너 팩토리를 사용하여 직접 리스너를 만드는 것이다.

### 기본 리스너 컨테이너

### record 리스너 예제
- application.yaml
```yaml
spring:
kafka:
consumer:
bootstrap-servers: localhost
listener:
ack-mode: MANUAL_IMMEDIATE
type: RECORD
```
- SpringConsumerApplication.java
```java
@SpringBootApplication
public class SpringConsumerApplication {
public static Logger logger = LoggerFactory.getLogger(SpringConsumerApplication.class);


public static void main(String[] args) {
SpringApplication application = new SpringApplication(SpringConsumerApplication.class);
application.run(args);
}

// 기본적인 리스너 선언
@KafkaListener(topics = "test",
groupId = "test-group-00")
public void recordListener(ConsumerRecord<String,String> record) {
logger.info(record.toString());
}

// 메시지 값을 파라미터로 받는 리스너
@KafkaListener(topics = "test",
groupId = "test-group-01")
public void singleTopicListener(String messageValue) {
logger.info(messageValue);
}

// 개별 리스너에 카프카 컨슈머 옵션값을 부여하고 싶다면 properties를 사용한다.
@KafkaListener(topics = "test",
groupId = "test-group-02", properties = {
"max.poll.interval.ms:60000",
"auto.offset.reset:earliest"
})
public void singleTopicWithPropertiesListener(String messageValue) {
logger.info(messageValue);
}

// 2개 이상의 카프카 컨슈머 스레드를 실행하고 싶다면 concurrency 옵션을 사용하면 된다.
@KafkaListener(topics = "test",
groupId = "test-group-03",
concurrency = "3")
public void concurrentTopicListener(String messageValue) {
logger.info(messageValue);
}

// 특정 파티션을 리스닝하고 싶다면 topicPartitions를 사용한다.
@KafkaListener(topicPartitions =
{
@TopicPartition(topic = "test01", partitions = {"0", "1"}),
@TopicPartition(topic = "test02", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "3"))
},
groupId = "test-group-04")
public void listenSpecificPartition(ConsumerRecord<String, String> record) {
logger.info(record.toString());
}
}
```

### batch 리스너 예제
배치 리스너는 레코드 리스너와 다르게 메서드의 파라미터를 List또는 ConsumerRecords로 받는다.

- application.yaml
```yaml
spring:
kafka:
consumer:
bootstrap-servers: localhost
listener:
ack-mode: MANUAL_IMMEDIATE
type: BATCH
```

- SpringConsumerApplication.java
```java
@SpringBootApplication
public class SpringConsumerApplication {
public static Logger logger = LoggerFactory.getLogger(SpringConsumerApplication.class);

public static void main(String[] args) {
SpringApplication application = new SpringApplication(SpringConsumerApplication.class);
application.run(args);
}

@KafkaListener(topics = "test",
groupId = "test-group-01")
public void batchListener(ConsumerRecords<String, String> records) {
records.forEach(record -> logger.info(record.toString()));
}

@KafkaListener(topics = "test",
groupId = "test-group-02")
public void batchListener(List<String> list) {
list.forEach(recordValue -> logger.info(recordValue));
}

@KafkaListener(topics = "test",
groupId = "test-group-03",
concurrency = "3")
public void concurrentBatchListener(ConsumerRecords<String, String> records) {
records.forEach(record -> logger.info(record.toString()));
}

}
```
87 changes: 87 additions & 0 deletions study/week_05/나상민/chapter_5_1.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# 5.1 웹 페이지 이벤트 적재 파이프라인 생성
웹 페이지에서 생성되는 이벤트들을 분석하기 위해 HDFS와 엘라스틱서치에 적재하는 파이프라인을 만드는 프로젝트를 진행한다.

## 5.1.1 요구 사항
이름을 입력하고 자신이 좋아하는 색상을 고르는 버튼을 누르면 해당 이벤트와 유저 에이전트 정보를 카프카 토픽으로 전달하고 최종적으로 하둡과 엘라스틱서치에 적재되는 것을 목표로 한다.

## 5.1.2 정책 및 기능 정의
### 1. 적재 정책

웹 페이지의 사용자 이벤트는 사용자가 사용하는 환경에 따라 유실이 발생할 가능성이 크다.

최종 적재 하고자 하는 HDFS적재의 경우 트랜잭션이 지원되지 않으므로 컨슈머의 장애가 발생했을 때 데이터의 중복이 발생할 여지가 있다. 따라서 다음과 같은 정책이 나온다.

### 파이프라인 정책
- 일부 데이터의 유실 또는 중복 허용
- 안정적으로 끊임없는 적재
- 갑작스럽게 발생하는 많은 데이터양을 허용

### 2. 데이터 포맷
데이터 포맷을 선택할 때 두 가지를 우선적으로 생각해 볼 수 있는데 첫 번째는 스키마의 변화의 유연성이고 두 번째는 명령어를 통한 디버깅의 편리성이다.

현재 데이어 파이프라인을 통해 데이터가 최종 적재되는 타깃 애플리케이션은 HDFS와 엘라스틱서치인데 두 애플리케이션 다 JSON 형태의 데이터에 친화적이다.

### 3. 웹 페이지
사용자의 이벤트를 수집하는 웹 페이지는 html만 사용하여 개발한다. button 태그를 누르면 ajax를 사용하여 비동기로 사용자 이벤트르 전송한다.

### 4. 프로듀서
프로듀서를 운영할 때 고민해야 할 옵션 몇 가지가 있다.

첫 번째 고민해야 할 옵션은 ack를 어떤 값으로 설정할지다. acks를 all로 설정하면 클러스터 네트워크에 이상이 생겼을 경우 복구할 확률이 높지만 그만큼 프로듀서가 클러스터로 데이터를 저장하는 데에 시간이 오래 걸린다.
이번 에플리케이션은 데이터 전송에 있어 일부 유실이나 중복이 발생하더라도 안정적이고 빠른 파이프라인을 구성하는 것이 목표이므로 acks를 1로 설정한다.
acks를 1로 설정함으로써 최소한 리더 파티션에는 데이터가 적재되는 것을 보장할 수 있다.

다음은 최소 동기화 맆ㄹ리카 설정이다. acks를 1로 선택한 경우에는 `min.insync.replicas` 옵션을 무시하고 리더 파티션에 계속 적재하므로 따로 설정할 필요가 없다.

다음은 파티셔너 설정이다. 파티셔너를 활용하면 메시지 키 또는 메시지 값을 기반으로 어떤 파티션으로 전달될지 결정하는 로직을 적용할 수 있다.
그러나 웹 페이지에서 생성된 데이터는 특별히 파티션을 분류할 필요가 없기 때문에 프로듀서에 설정할 파티셔너는 기본 파티셔너인 `UniformStickyPartitioner`를 사용한다.

다음은 재시도 설정이다. 클러스터 또는 네트워크 이슈로 인해 데이터가 정상적으로 전송되지 않았을 때 프로듀서는 다시 전송을 시도한다.
프로듀서가 전송을 재시도할 경우 토픽으로 전송된 데이터의 중복이 발생할 수 있고, 전송 시점의 역전으로 인해 전송 순서와 토픽에 적재된 데이터의 순서가 바뀔 수 있다.
여기서는 토픽의 데이터 순서를 지키지 않고 데이터의 중복을 허용하기 때문에 retries 옵션은 따로 설정하지 않고 기본값을 사용한다.

다음은 프로듀서의 압축 옵션이다. 압축을 하면 클로서터에 적재되는 데이터의 총 용량을 줄이고 네트워크의 사용량을 줄이는 데에 효과적이지만 프로듀서와 컨슈머에서 데이터를 사용할 때 CPU와 메모리 사용량이 늘어나는 단점이 있다.
이 애플리케이션은 처리량이 많지 않고 실습용 파이프라인을 구축하는 데에 목적이 있으므로 여기서는 압축을 하지 않는다.

나머지 프로듀서 옵션들은 기본 옵션값으로 사용한다.

### 5. 토픽
토픽을 설정할 때 가장 처음 고민하는 것은 파티션 개수이다.
여기서는 데이터 처리 순서를 지키지 않아도 되므로 파티션 개수를 엄격하게 정해서 가져가지 않아도 된다. 그러므로 토픽의 파티션 개수는 2개 이상으로 설정한다.

다음으로 고민해야 할 부분은 메시지 키의 사용 여부다.
여기서는 웹 페이지의 데이터를 메시지 값에만 저장하고 메시지 키에는 저장하지 않는다. 메시지 키를 사용하지 않으므로 토픽에 들어오는 데이터의 양에 따라 파티션 개수를 가변적으로 설정할 수 있다.

다음으로는 복제 개수이다.
복제 개수가 높으면 높을수록 데이터의 복구 확률이 높아진다.
다만, 복제 개수가 너무 높으면 팔로워 파티션이 데이터를 복제하는 데에 시간이 오래 걸릴 수 있으며 클러스터 전체를 봤을 때 저장되는 데이터의 용량도 그만큼 늘어난다.
여기서는 클러스터의 브로커 1대에 이슈가 발생했을 경우에도 안정적으로 데이터를 받기 위한 최소 설정으로 2를 설정한다.

나머지 토픽 옵션은 기본으로 한다.

### 6. 컨슈머
토픽에 저장되어 있는 웹 이벤트를 하둡과 엘라스틱서치에 저장하는 로직을 만드는 방법은 크게 두 가지 방법이 있다.

첫번째는 컨슈머 API를 사용하여 직접 애플리케이션을 개발하는 방법이다.
컨슈머 API를 직접 개발하고 운영하는 것은 컨슈머를 직관적으로 운영하는 가장 좋은 방법이다.

두번째는 커넥트를 사용하는 것이다. 분산 커넥트를 사용하면 REST API를 통해 커넥터로 반복적인 파이프라인을 쉽게 생성할 수 있다.

컨슈머API를 사용할 것인지 커스텀 싱크 커넥터를 사용할 것인지 각자 개발하는 환경에 따라 선택하면 된다.
이미 상용 인프라에 분산 커넥트가 구축되어 있다면 싱크 커넥터를 활용하는 것이 좋지만 분산 커넥트가 없다면 구축하는 데에 시간과 인프라 비용이 발생할 수 있으므로 컨슈머API를 활용하여 멀티 스레드 자바 애플리케이션을 만들고 배포하여 운영하는 것이 좋다.

## 5.1.3 기능 구현

하둡을 실행 할 때 기본 파일 시스템을 하둡으로 설정하기 위해 fs.defaultFS값을 hdfs://localhost:9000으로 설정한다.
```xml
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
```
토픽 생성
```shell
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic select-color
```