본문 바로가기

카테고리 없음

[Kafka] 카프카(Kafka) - 컨슈머(consumer)

 

토픽 파티션에서 레코드 조회

 

토픽 파티션은 그룹 단위로 할당

컨슈머 그룹 단위로 파티션 할당(파티션  개수 >= 컨슈머 그룹 개수)

 

커밋과 오프셋

 

커밋된 오프셋이 없는 경우

처음 접근이거나 커밋한 오프셋이 없는 경우

auto.offset.reset. 설정 사용

earliest: 맨 처음 오프셋 사용

latest: 가장 마지막 오프세 사용(기본값)

none: 컨슈머 그룹에 대한 이전 커밋이 없으면 익셉션 발생

 

컨슈머 설정

조회에 영향을 주는 주요 설정

fetch.min.bytes 조회시 브로커가 전송할 최소 데이터 크기
기본값: 1, 이 값이 크면 대기 시간을 늘지만 처리량 증가
fetch.max.wait.ms 데이터 최소 크기가 될 때까지 기다릴 시간
기본값: 500(ms), 브로커가 리턴할 때까지 대기하는 시간으로 poll() 메서드의 대기 시간과 다름
max.partition.fetch.bytes 파티션 당 서버가 리턴할 수 있는 최대 크기
기본값: 1048576(1MB)

 

자동 커밋/수동 커밋

enable.auto.commit  true: 일정 주기로 컨슈머가 읽은 오프셋을 커밋(기본값)
false: 수동으로 커밋 실행
enable.commit.interval.ms 자동 커밋 주기
기본값: 5000(ms)
poll(), close() 메서드 호출 시 자동 커밋 실행

 

수동 커밋: 동기/ 비동기

consumer.commitSync() 동기 커밋
커밋 실패 시 에러 발생
consumer.commitAsync() 비동기 커밋
성공 실패 여부를 확인하려면 콜백 사용 필요

 

 

재처리와 순서

동일 메시지 조회 가능성

일시적 커밋 실패, 리밸런스 등에 의해 발생

 

컨슈머는 멱등성(idempotence)을 고려해야 함

에를들어 '조회수 1증가 -> 좋아요 1증가 -> 조회수 1증가'를 재처리 할 경우 단순 처리하면 조회수는 2가 아닌 4가 될 수 있음

 

데이터 특성에 따라 타임스탬프, 일련 번호 등을 활용

 

세션 타임아웃, 하트비트, 최대 poll 간격

컨슈머는 하트비트를 전송해서 연결 유지

브로커는 일정 시간 컨슈머로부터 하트비트가 없으면 컨슈머를 그룹에서 빼고 리밸런스 진행

관련 설정

session.timeout.ms 세션 타임 아웃 시간
기본값: 10초
heartbeat.interval.ms 하트비트 전송 주기
기본값: 3초
session.timeout.ms의 1/3 이하 추천
max.poll.interval.ms poll()메서드의 최대 호출 간격
이 시간이 자나도록 poll() 하지 않으면 컨슈머를 그룹에서 빼고 리밸런스 진행

 

종료 처리

다른 쓰레드에서 wakeup() 메서드 호출

poll() 메서드가 WakeupException 발생 -> close() 메서드로 종료 처리

 

주의: 쓰레드 안전하지 않음

KafkaConsumer는 스레드에 안전핮 ㅣ않음

여러 스레드에서 동시에 사용하지 말 것

wakeup() 메서드는 예외