일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- 추천시스템
- 빅데이터플랫폼
- 데이터엔지니어링
- recommendation system
- DataEngineering
- apache spark
- spark
- 클라우데라
- Python
- pyspark
- kafka
- mlops
- Terraform
- cloudera
- 데이터엔지니어
- redis bloom filter
- 개발자혜성
- Spark structured streaming
- eks
- 개발자
- 하둡에코시스템
- 하둡
- AWS SageMaker
- BigData
- kubernetes
- dataengineer
- 빅데이터
- 블로그
- Data engineering
- hadoop
- Today
- Total
Hyesung Oh
Kafka 내부 동작 원리 이해하기 (2) Consumer 본문
Kafka 내부 동작 원리 이해하기 (1) Replication 편에 이어서. 다시 한번 강조하면 이 책은 고승덕님의 실전 카프카 개발부터 운영까지를 스터디하며 이해한 내용을 내 나름대로 정리한 포스팅입니다. 따라서 책과 다른 내용이 있다면 그것은 본인이 실수이니 수정 코멘트 부탁드리겠고, 자세하고 정확한 내용은 책을 참고바랍니다.
컨슈머 오프셋 관리
컨슈머가 제대로 동작하려면 오프셋(Offset) 관리가 핵심이다. 어디까지 읽었는지에 대한 북마크라고 이해하면 된다. 그렇다면 오프셋 정보는 어디에서 관리될까? 카프카에 맡길 수도 있고, 컨슈머 자체적으로 HDFS compatible storage에 저장하여 관리할 수도 있다 (ex. Spark Structured Streaming).
전자 방식의 경우 컨슈머 그룹은 자신의 오프셋 정보를 카프카의 전용 토픽에 저장하는데 default로 __consumer_offsets를 이용하며 offset 정보는 숫자로 표기된다. __consumer_offsets 토픽 또한 마찬가지로 partition(default 50), replication factor(default 3) 를 지정할 수 있다.
그룹 코디네이터
컨슈머들은 하나의 컨슈머 그룹에 속하며, 컨슈머 그룹내 컨슈머들은 자신의 정보를 공유한다. 컨슈머 그룹내 컨슈머들은 서로 통신을 할 뿐 독립적인 프로세스이므로 장애 또는 의도한 상황에서 떠날 수도 새롭게 합류할 수도 있다.
상황을 예를 들어보자. A Topic의 3개의 파티션을 A 컨슈머그룹의 1,2,3번 컨슈머가 각각 파티션과 1대1 매핑되어 컨슈밍하고 있다고 가정하자. 이 때 1번 컨슈머가 어떠한 이유로 컨슈머 그룹을 떠나게 되었다. 이때 1번 컨슈머의 빈자리를 남은 2,3번 컨슈머 중 하나가 맡아야 할 것이다. 이 경우 2,3번 컨슈머 중 아무나 맡아도 상관이 없지만, 특정 컨슈머가 이미 여러 파티션을 담당하고 있을 경우 부하는 다른 컨슈머에게 분산되어야 할 것이다. 작업을 균등하게 분배하는 동작을 `컨슈머 리밸런싱`이라고 한다. 컨슈머 그룹은 이러한 변화를 인지하고 각 컨슈머들에게 작업을 균등하게 배분해야 하고 이러한 역할을 `그룹 코디네이터`가 하게 된다.
그룹 코디네이터는 각 컨슈머 그룹 별로 존재하며 카프카 클러스터 내의 브로커 중 하나에 위치하게 된다. 그룹 코디네이터는 컨슈머 그룹의 컨슈머 변경과 구독하는 토픽 파티션 변경 등을 감지하며, 토픽의 파티션과 그룹의 맴버 변경이 일어나면 변경된 내용을 컨슈머들에게 알리는 역할도 한다.
개발자에게는 단순하게 컨슈머 객체 생성시, bootstrap.brokers와 group.id만 설정하면 컨슈머 그룹이 생성되어 카프카로부터 구독한 메세지를 읽어오는 것처럼 보이게 된다. 하지만 자세한 과정을 알고 있는 것이 중요하니 책에서 설명한 순서를 한번 나열하고 넘어가도록 하자.
1.컨슈머 클라이언트는 bootstrap.brokers 리스트에 있는 브로커에게 초기 커넥션 연결을 위한 요청을 보냄
2. 요청 받은 브로커는 그룹 코디네이터 프로세스를 생성하고 컨슈머에게 응답
3. 시작된 그룹 코디네이터는 group.initial.rebalance.delay.ms 설정 값 동안 컨슈머의 요청을 기다림.
4. 컨슈머는 그룹 코디네이터에 컨슈머 그룹 등록 요청을 보냄. 이때 가장 먼저 요청을 보낸 컨슈머를 그룹 코디네이터는 컨슈머 그룹의 리더로 정함.
5. 그룹 코디네이터는 해당 컨슈머 그룹이 구독하는 토픽, 파티션 리스트 그리고 4번에서 지정한 리더 컨슈머 정보를 응답으로 보냄
6. 리더 컨슈머는 정해진 컨슈머 파티션 할당 전략에 따라 그룹 내 컨슈머들에게 파티션 할당한 뒤 해당 결과를 그룹 코디네이터에게 전달. 7. 그룹 코디네이터는 해당 정보를 로컬 캐시하고 그룹 내 컨슈머들에게 성공을 알림
8. 각 컨슈머들은 할당된 파티션들을 컨슈밍하기 시작
여기서 집고 넘어갈 부분은 개인적으로 4~6번이었다. 컨슈머 그룹내에서도 리더가 있으며, 리더 컨슈머는 가장 먼저 요청을 보낸 순서를 기준으로 그룹 코디네이터 측에서 결정하여 통보한다는 사실. 그리고 컨슈머 그룹내 팔로워 컨슈머들에게 컨슈밍할 파티션을 할당하는 전략은 리더 파티션에게 위임한다는 사실이다. 그룹 코디네이터와 리더 파티션의 역할 또한 명확하게 구분지어져 있단 사실을 알 수 있었다.
그룹 코디네이터는 컨슈머들의 변경을 어떻게 감지할까?
그룹 코디네이터와 컨슈머들은 서로 하트비트를 주고받음으로서 컨슈머가 살아있는지 제대로 동작하는지 트래킹 할 수 있다. 그룹 코디네이터는 하트비트를 통한 컨슈머 상태 체크 뿐만 아니라, 할당된 파티션에서 컨슈머가 정상적으로 메세지를 가져가고 있는지 까지 트래킹하기 위해 컨슈머의 poll() 동작을 추가적으로 감지한다.
하지만 이러한 리밸런싱 동작은 우아하고 정교하게 일어나지만 매우 비싼 작업이다. 장애 상황을 빠르게 감지하고 대응하기 위해 session.timeout.ms 과 max.poll.interval.ms (자세한 옵션은 책 또는 공식 문서를 참고) 값을 작게 하면 일시적인 TCP 패킷 손실이나 컨슈머 타임아웃에도 불구하고 원치 않는 리밸런싱이 일어날 수 있다. 그렇다고 또 이 값을 너무 크게 잡으면 그 시간만큼 해당 파티션의 메세지를 읽지 못하는 시간이 길어지는 것이니 장단점이 명확하게 존재한다. 따라서 운영환경에서는 가능하면 기본 설정을 유지하길 권장한다고 한다.
스태틱 맴버십
일반적인 컨슈머 그룹 동작에서는 각 컨슈머를 식별하기 위해 엔티티 ID를 부여한다. 컨슈머별로 부여된 엔티티 ID는 임시 값이므로 컨슈머의 설정 변경이나 소프트웨어 업데이트가 필요하여 컨슈머를 재시작하는 경우, 컨슈머 그룹 내의 동일한 컨슈머임에도 불구하고 새로운 엔티티 ID가 부여되고 리밸런싱이 일어나게 된다. 하지만 이는 원치 않는 동작이므로 이를 보완하기 위해 Kafka 2.3 버전부터 스태틱 맴버십(static membership)이라는 개념이 도입되었다.
스태틱 맴버십은 간단하게 말하면 컨슈머 그룹내 컨슈머들을 인식할 수 있는 ID를 부여함으로서 다시 합류하더라도 그룹 코디네이터는 해당 컨슈머가 기존 구성원임을 인식할 수 있도록 하는 원리이다. 이를 통해 2번의 의도치 않은 리밸런싱 비용을 아낄 수 있다.
1. 스태틱 맴버십이 적용된 컨슈머는 그룹에서 떠날 때 그룹 코디네이터에게 통보하지 않으므로 리밸런싱이 일어나지 않음.
2. 스태틱 맴버십이 적용된 컨슈머는 그룹에 재합류 할 때 그룹 코디네이터는 기존 구성원임을 확인하고 리밸런싱이 일어나지 않음.
group.instance.id를 명시하면 간단하게 적용할 수 있지만 Kafka 버전 2.3 이상부터 사용가능하다고 한다. 하지만 여기에도 사용 시 주의해야할 점이 몇가지 있으니 집고 넘어가자.
1. gropu.instance.id는 그룹 코디네이터가 컨슈머를 식별하는데 사용하는 고유 식별자이므로, 컨슈머 인스턴스별로 고유한 값을 지정해야한다. prefix-suffix 의 구조로 consumer-host1, consumer-host2 와 같은 방법을 예로 들 수 있다.
2. 스태틱 맴버십을 적용한다는 것은 리밸런싱을 최대한 피해겠다는 의도이다. 따라서 session.timeout.ms 값을 기본값보다 큰 값으로 조정하는 것이 좋다. 왜냐면 스태틱 맴버십이 컨슈머에 적용되었다 할지라도, 떠난 후 재합류 하기 까지의 시간이 session.timeout.ms 값 보다 길어진다면 그룹 코디네이터는 리밸런싱을 하기 때문이다.
컨슈머 파티션 할당 전략
위에서 그룹 코디네이터로 부터 지정된 리더 컨슈머는 나머지 컨슈머들에게 각자 맡을 파티션을 할당한다고 했다. 이때 사용되는 전략이 바로 컨슈머 파티션 할당 전략이다. 컨슈머 파티션 할당 전략은 partition.assignment.stratgey 옵션을 통해 지정할 수 있고 크게 RangeAssignor(레인지 전략), RoundRobinAssignor(라운드 로빈 전략), StickyAssignor(스티키 전략), CooperativeStickyAssignor(협력적 스티키 전략) 총 네 가지를 제공한다.
각 전략별로 자세한 동작 원리는 책에 그림과 함께 잘 설명되어 있으니 책을 참고 바라며, 전략별 특징을 간단하게 표로만 정리하고 넘어가도록 하겠다.
파티션 할당 전략 | 설명 |
레인지 파티션 할당 전략 RangeAssignor | 파티션 할당 전략의 기본값으로서 토픽별로 할당 전략을 사용함. 동일한 키를 이용하는 2개 이상의 토픽을 컨슘할 때 유용 |
라운드 로빈 파티션 할당 전략 RoundRobinAssignor | 사용 가능한 파티션과 컨슈머들을 라운드 로빈으로 할당함. 균등한 분배 가능. |
스티키 파티션 할당 전략 StickyAssignor | 컨슈머가 컨슘하고 있는 파티션을 계속 유지할 수 있음. |
협렵적 스티키 파티션 할당 전략 CooperativeStickyAssignor | 스티키 방식과 유사하지만, 전체 일시 정지가 아닌 연속적인 재조정 방식임. |
출처: 실전 카프카 개발부터 운영까지
정확히 한 번 컨슈머 동작
이 내용은 사실 프로듀서의 정확히 한 번 전송에 대한 설명이 선행되어야 하며 후속 포스팅에서 더 자세히 다룰 예정이지만 이번 포스팅에서는 간략하게만 정리하고 넘어가도록 하겠다.
프로듀서의 '정확히 한 번 전송' 동작을 위해 브로커 측에서는 트랜잭션 관리 및 프로듀서의 동작을 보조하는 별도의 트랜잭션 코디네이터라는 프로세스가 실행된다. 트랜잭션 코디네이터는 프로듀서의 정확히 한 번 전송이 성공하면 해당 레코드의 트랜잭션 성공을 표시하는 특수한 메세지를 추가하는데, 따라서 컨슈머에게 이러한 특수한 메세지가 추가된 데이터만 읽도록 설정하면, 정확히 한 번 컨슈머가 동작할 수 있게 되는 원리이다.
컨슈머에서는 ISOLATION_LEVEL_CONFIG 옵션을 "read_committed" (default read_uncommited)로 설정하면 정확히 한 번 컨슈머 동작을 활성화 할 수 있다. 컨슈머가 트랜잭션 코디네이터와 따로 통신하는 부분은 없으며 위에서 설명했다싶이 트랜잭션이 완료되었다는 표시가 있는 데이터만 읽을 수 있게 하는 것이다.
하지만 여기엔 함정이 있는데, 컨슈머가 트랜잭션 코디네이터와 통신하는 부분이 없으므로 정확하게 메세지를 한 번 가져오는지는 보장할 수 없단 사실이다. 정확히 한번 보낸 메세지만 읽어갈 수 있게 한다고 했지, 카프카 클라이언트인 컨슈머의 어플리케이션 레벨에서 정확히 한번 처리하는지는 보장할 수 없단 것이다. 이런 컨슈머의 동작까지 정확히 한 번 처리하게 하려면 `컨슘-메세지 처리-프로듀싱` 과정을 하나의 트랜잭션으로 처리돼야 한다.
sendOffsetsToTransaction 메소드를 이용하여 컨슈머 그룹의 오프셋 커밋을 트랜잭션에 포함시키고, 이 처리 과정이 실패하면 해당 컨슈머 그룹의 커밋 오프셋이 증가하지 않게 함으로써 실패한 전체 트랜잭션을 다시 시작할 수 있게 하는 원리로 이해된다.
일부 컨슈머 애플리케이션에서는 정확히 한번 지원하기도 하며 Spark Structured Streaming 애플리케이션에서는 HDFS, S3에 쓰기시 Checkpointing 및 WAL(Write Ahead Log)를 통해 정확히 한 번을 지원하는 메커니즘이 구현되어있다. 더 자세한 내용은 여기를 참고 (또는 Spark Structured Streaming 공식 문서를 참고 바란다).
이상으로 글을 줄이며 긴 글 읽어주신 분들께 감사드립니다.
'Data Engineering > Apache Kafka' 카테고리의 다른 글
Kafka 내부 동작 원리 이해하기 (1) Replication (0) | 2022.05.31 |
---|