카테고리 없음
[Kafka] 카프카(Kafka) - 프로듀서(Producer)
늘이
2023. 12. 4. 16:15
"Kafka 프로듀서는 단순히 메시지를 전송하는 게 아니라, 성능, 신뢰성, 확장성을 조절할 수 있는 다양한 설정을 가진 강력한 구성요소입니다."
1. 프로듀서 기본 흐름
Producer | 👉🏻 | send(record) | 👉🏻 | serializer | 👉🏻 | partitioner | 👉🏻 | Accumulator | 👉🏻 | Sender | 👉🏻 | Broker |
전송 | byte[]로 변환 | 파티션 결정 | 배치에 모음 | 전송 |
- send(): 메시지 전송 시작
- Serializer: 문자열 → byte[] 변환
- Partitioner: 어떤 파티션으로 보낼지 결정
- Accumulator: 배치 버퍼에 잠시 쌓아둠
- Sender: 실제 전송 담당 스레드
프로듀서의 파티션 선택 방법
라운드로빈 또는 키로 선택
같은 키를 갖는 메시지는 같은 파티션에 저장(같은 키는 순서 유지됨)
2. 전송 성능 튜닝 요소
# application.properties
spring.kafka.producer.batch-size=32768 # 한 배치에 최대 32KB
spring.kafka.producer.linger-ms=100 # 최대 100ms까지 기다림
1) batch.size
- 한 번에 전송할 메시지 크기(= 배치 크기)
- 배치가 다 차면 바로 전송
- 너무 작으면 전송 요청이 많아져 성능 저하
2) linger.ms
- 전송 대기 시간(기본값O)
- 대기 시간이 없으면 배치가 덜 차도 브로커로 바로 전송
- 대기 시간을 주면 그 시간 만큼 배치에 메시지 추가가 가능해서 한번의 전송 요청에 더 많은 데이터 처리 가능
3. 전송 신뢰성
1. send() 메소드 사용
1) 전송 결과가 필요 없는경우
코드
// controller
@GetMapping("/send/no-result")
public String sendNoResult(@RequestParam String message) {
kafkaProducerService.sendWithoutResult(message);
return "전송 요청 완료 (no result)";
}
// service
public void sendWithoutResult(String message) {
kafkaTemplate.send(TOPIC, message);
log.info("메시지 전송 요청 (결과 기다리지 않음): {}", message);
}
요청
로그 확인
2) 전송 결과가 필요한 경우
① Future 사용
- 처리량 저하 있음
코드
// controller
@GetMapping("/send/future")
public String sendWithFuture(@RequestParam String message) {
kafkaProducerService.sendWithFuture(message);
return "전송 완료 (with future)";
}
// service
public void sendWithFuture(String message) {
try {
SendResult<String, String> result = kafkaTemplate.send(TOPIC, message).get(); // blocking
log.info("전송 성공 (Future): offset={}, partition={}",
result.getRecordMetadata().offset(),
result.getRecordMetadata().partition());
} catch (Exception e) {
log.error("전송 실패 (Future): {}", e.getMessage());
}
}
요청
로그
② Callback 사용
- 처리량 저하 없음
코드
// controller
@GetMapping("/send/callback")
public String sendWithCallback(@RequestParam String message) {
kafkaProducerService.sendWithCallback(message);
return "전송 요청 완료 (with callback)";
}
// service
public void sendWithCallback(String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("전송 성공 (Callback): offset={}, partition={}",
result.getRecordMetadata().offset(),
result.getRecordMetadata().partition());
}
@Override
public void onFailure(Throwable ex) {
log.error("전송 실패 (Callback): {}", ex.getMessage());
}
});
}
요청
로그
2. 전송 보장과 ACK 설정
"Kafka에서 acks=all은 모든 리플리카에 저장될 때만 전송 성공으로 간주하지만, 실제로는 min.insync.replicas 수만큼만 저장되어도 성공으로 판단됩니다. 이 수보다 적어지면 Kafka는 안전하지 않다고 판단해 전송을 실패시킵니다."
1) acks란?
- 프로듀서가 메시지를 전송한 뒤, 카프카가 응답을 언제 주느냐에 대한 설정
ACK 옵션 | 의미 | 장단점 |
acks=0 | 응답 기다리지 않음, 전송 보장 안됨 | - 빠르지만 메시지 유실 가능 - 성능 좋음 |
acks=1 | 파티션의 리더 브로커에 저장되면 응답 | - 리더 장애 시 유실 가능 - 성능 보통 |
acks=all(또는 -1) | - 모든 리플리카(리더 + 팔로워) 모두 저장되면 응답 - 브로커 설정 *min.insync.replicas에 따라 달라짐 |
- 안정성 높음 - 성능 낮음 |
2) *min.insync.replicas란?
- 브로커 옵션 중 하나인 min.insync.replicas
- acks=all 일 때, 카프카가 저장에 성공했다고 응답할 수 있는 동기화된 리플리카 최소 개수
- 예시 상황(acks 설정이 all 이라고 가정)
리플리카 | min.insync.replicas | 장애상황 | 전송 결과 | 설명 |
3 | 1 | 없음 | 성공 | - 리플리카 3개(리더 1 + 팔로워2) 중 리더 1개에 저장하면 성공 응답 - ack =1 과 동일하게 동작(리더 장애 시 메시지 유실 가능) |
3 | 2 | 없음 | 성공 | - 리플리카 3개(리더 1 + 팔로워2) 중 리더 1개, 팔로워 1개 총 2개 저장 시 성공 응답 |
3 | 3 | 팔로워 1 다운 | 실패 | - 리플리카 3개(리더 1 + 팔로워2) 중 리더 1개, 팔로워 2개 총 3개 저장 시 성공 응답 - 장애 상황 발생, 팔로워 1개가 다운됐기 때문에 리더 1개, 팔로워 1개 저장되어 결과는 실패 |
4. 프로듀서의 파티션 선택 방식
- Kafka 프로듀서는 메시지를 특정 토픽에 보낼 때, 어떤 파티션에 보낼지를 결정해야함
- 파티션 선택은 전송 성능뿐 아니라 순서 보장과 메시지 처리 전략에도 영향을 미칠 수 있으므로 전송 신뢰성 설정과 함께 파티션 전략도 반드시 고려 필요
1. 선택 방식
- Partitioner라는 컴포넌트를 통해 결정되며 기본적으로 두 가지 방식이 있음
방식 | 설명 | 특징 |
라운드로빈(Round-robin) | 키 없이 보낼 경우 자동 분산 | 균등 분산, 순서 보장X 로그 같이 순서 보장이 필요하지 않은 경우 사용 |
Key 기반 분산 | 키가 있는 경우, 동일 키 -> 동일 파티션 | 순서보장, 특정 파티션 쏠림 가능 사용자별 처리, 트랜잭션, 이벤트 순서가 필요한 경우 사용 |
① 라운드로빈(Round-robin)
kafkaTemplate.send("topic-name", "hello");
- 파티션이 3개라면 순차적으로 0 → 1 → 2 → 0 식으로 분산
② Key 기반 분산
kafkaTemplate.send("topic-name", "user123", "hello");
- "user123"이라는 키의 Hash값을 기준으로 특정 파티션에 고정
- 같은 키를 가진 메시지는 항상 같은 파티션으로 가기 때문에 순서가 보장됨
5. 에러 유형 및 실패 대응 전략
1. 에러 유형
1) 전송 과정에서 실패
실패 원인 | 설명 | 대응 전략 |
네트워크 타임아웃, 일시적 네트워크 오류 | 브로커 응답 지연 | retries 설정 -> 재시도 |
리더 다운 -> 리더 재선출 중 | 일시적 장애 | 재시도 또는 큐 재적재 |
메시지 크기 제한 초과 | 브로커 설정 한도 초과 | max.request.size, 브로커 message.max.bytes 조정 |
2) 전송 전 실패
실패 원인 | 설명 | 대응 전략 |
직렬화 실패 | DTO 변환 오류 등 | 예외 처리 & 기록 |
요청 크기 초과 | 프로듀서 측 제한 초과 | buffer.memory 조정 |
대기 시간 초과 | 배치 버퍼 가득 찬 경우 | linger.ms, batch.size 최적화 |
2. 실패 대응 전략
1) 재시도
- 재시도 가능한 에러는 재시도 처
- 프로듀서는 자체적으로 브로커 전송 과정에서 에러가 발생하면 재시도 가능한 에러에 대해 재전송 시도(retries 속성)
- send()메서드에서 익셉션 받으면 타입에 따라 send() 재호출
- 재시도가 필요한 예: 브로커 응답 타임 아웃, 일시적인 리더 없음 등
- 특별한 이유가 없다면 무한 재시도는 하지 않는 것이 좋음
① 재시도 관련 설정
spring.kafka.producer.retries=3 // 전송 실패 시 3번까지 재시도
spring.kafka.producer.enable-idempotence=true // 중복 방지
spring.kafka.producer.max-in-flight-requests-per-connection=1 // 순서 보장
- spring.kafka.producer.retries=3
- 전송 실패 시 최대 몇 번까지 재시도할지 설정
- 설정 값이 3이라면 1회 원래 전송 + 3회 재시도로 총 4번의 전송을함
- 재시도하면 메시지가 중복될 가능성이 생길 수 있으므로 enable-idempotence=true와 함께 설정해야 안전함
- spring.kafka.producer.max-in-flight-requests-per-connection=1
- Kafka 서버에 동시에 전송 중인 요청의 개수 제한
- 설정 값이 1이면 동시에 1개 메시지를 전송하여 순서를 보장하지만 성능은 떨어질 수 있음
- 설정값이 1 이상이면 동시에 여러 메시지를 전송할 수 있지만 순서를 보장할 수 없음
2) 기록
- 추후 처리를 위한 기록
- 별도 파일, DB, Redis 등을 이용해서 실패한 메시지 기록 저장
- 추후에 수동(또는 자동) 보정 작업 진행
실패 기록 위치 별 상세 설명
위치 | 설명 |
send() 호출 자체에서 예외 발생 | 직렬화 오류 등 전송 시도 전 예외 |
send().get() 호출 시 | Future 방식으로 응답 받을 때 예외 발생 |
addCallback()의 onFailure() | 전송 요청은 성공했지만 실제 전송 실패 시 |
① send() 메서드 자체에서 예외 발생
try {
kafkaTemplate.send(topic, message);
} catch (Exception e) {
log.error("Kafka 전송 시도 자체 실패: {}", e.getMessage());
saveFailedMessage(topic, message, e.getMessage());
}
② send().get() 호출 시 예외 발생(Futrue 방식)
try {
SendResult<String, String> result = kafkaTemplate.send(topic, message).get(); // Blocking
log.info("Kafka 전송 성공: offset = {}", result.getRecordMetadata().offset());
} catch (Exception e) {
log.error("Kafka 전송 실패 (Future): {}", e.getMessage());
saveFailedMessage(topic, message, e.getMessage());
}
③ Callback 방식에서 실패 시 예외 발생
kafkaTemplate.send(topic, message).addCallback(
result -> log.info("Kafka 전송 성공 (Callback): offset = {}", result.getRecordMetadata().offset()),
ex -> {
log.error("Kafka 전송 실패 (Callback): {}", ex.getMessage());
saveFailedMessage(topic, message, ex.getMessage());
}
);