카테고리 없음

[Kafka] 카프카(Kafka) - 프로듀서(Producer)

늘이 2023. 12. 4. 16:15

 

 

"Kafka 프로듀서는 단순히 메시지를 전송하는 게 아니라, 성능, 신뢰성, 확장성을 조절할 수 있는 다양한 설정을 가진 강력한 구성요소입니다."

 

 

1. 프로듀서 기본 흐름

Producer 👉🏻 send(record) 👉🏻 serializer 👉🏻 partitioner 👉🏻 Accumulator 👉🏻 Sender 👉🏻 Broker
전송 byte[]로 변환 파티션 결정 배치에 모음 전송

 

  • send(): 메시지 전송 시작
  • Serializer: 문자열 → byte[] 변환
  • Partitioner: 어떤 파티션으로 보낼지 결정
  • Accumulator: 배치 버퍼에 잠시 쌓아둠
  • Sender: 실제 전송 담당 스레드

 

프로듀서의 파티션 선택 방법

라운드로빈 또는 키로 선택

같은 키를 갖는 메시지는 같은 파티션에 저장(같은 키는 순서 유지됨)

 

2. 전송 성능 튜닝 요소

# application.properties
spring.kafka.producer.batch-size=32768        # 한 배치에 최대 32KB
spring.kafka.producer.linger-ms=100           # 최대 100ms까지 기다림

1) batch.size

  • 한 번에 전송할 메시지 크기(= 배치 크기)
  • 배치가 다 차면 바로 전송
  • 너무 작으면 전송 요청이 많아져 성능 저하

 

2) linger.ms

  • 전송 대기 시간(기본값O)
  • 대기 시간이 없으면 배치가 덜 차도 브로커로 바로 전송
  • 대기 시간을 주면 그 시간 만큼 배치에 메시지 추가가 가능해서 한번의 전송 요청에 더 많은 데이터 처리 가능

 

 

 

3. 전송 신뢰성

 

1. send() 메소드 사용

1) 전송 결과가 필요 없는경우

코드

// controller
@GetMapping("/send/no-result")
    public String sendNoResult(@RequestParam String message) {
        kafkaProducerService.sendWithoutResult(message);
        return "전송 요청 완료 (no result)";
    }



// service
public void sendWithoutResult(String message) {
        kafkaTemplate.send(TOPIC, message);
        log.info("메시지 전송 요청 (결과 기다리지 않음): {}", message);
    }

 

요청

로그 확인

 

2) 전송 결과가 필요한 경우

① Future 사용

  • 처리량 저하 있음

코드

// controller
    @GetMapping("/send/future")
    public String sendWithFuture(@RequestParam String message) {
        kafkaProducerService.sendWithFuture(message);
        return "전송 완료 (with future)";
    }

// service
public void sendWithFuture(String message) {
        try {
            SendResult<String, String> result = kafkaTemplate.send(TOPIC, message).get(); // blocking
            log.info("전송 성공 (Future): offset={}, partition={}",
                    result.getRecordMetadata().offset(),
                    result.getRecordMetadata().partition());
        } catch (Exception e) {
            log.error("전송 실패 (Future): {}", e.getMessage());
        }
    }

 

요청

 

로그

 

 Callback 사용

  • 처리량 저하 없음

코드

// controller
 @GetMapping("/send/callback")
    public String sendWithCallback(@RequestParam String message) {
        kafkaProducerService.sendWithCallback(message);
        return "전송 요청 완료 (with callback)";
    }
    
// service
public void sendWithCallback(String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, message);

        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("전송 성공 (Callback): offset={}, partition={}",
                        result.getRecordMetadata().offset(),
                        result.getRecordMetadata().partition());
            }

            @Override
            public void onFailure(Throwable ex) {
                log.error("전송 실패 (Callback): {}", ex.getMessage());
            }
        });
    }

 

요청

 

로그

 

 

2. 전송 보장과 ACK 설정

"Kafka에서 acks=all은 모든 리플리카에 저장될 때만 전송 성공으로 간주하지만, 실제로는 min.insync.replicas 수만큼만 저장되어도 성공으로 판단됩니다. 이 수보다 적어지면 Kafka는 안전하지 않다고 판단해 전송을 실패시킵니다."

 

1) acks란?

  • 프로듀서가 메시지를 전송한 뒤, 카프카가 응답을 언제 주느냐에 대한 설정
ACK 옵션 의미 장단점
acks=0 응답 기다리지 않음, 전송 보장 안됨 - 빠르지만 메시지 유실 가능
- 성능 좋음
acks=1 파티션의 리더 브로커에 저장되면 응답 - 리더 장애 시 유실 가능
- 성능 보통
acks=all(또는 -1) - 모든 리플리카(리더 + 팔로워) 모두 저장되면 응답
- 브로커 설정 *min.insync.replicas에 따라 달라짐
- 안정성 높음
- 성능 낮음

 

2) *min.insync.replicas란?

  • 브로커 옵션 중 하나인 min.insync.replicas
  • acks=all 일 때, 카프카가 저장에 성공했다고 응답할 수 있는 동기화된 리플리카 최소 개수
  • 예시 상황(acks 설정이 all 이라고 가정)
리플리카 min.insync.replicas 장애상황 전송 결과 설명
3 1 없음 성공 - 리플리카 3개(리더 1 + 팔로워2) 중 리더 1개에 저장하면 성공 응답
- ack =1 과 동일하게 동작(리더 장애 시 메시지 유실 가능)
3 2 없음 성공 - 리플리카 3개(리더 1 + 팔로워2) 중 리더 1개, 팔로워 1개 총 2개 저장 시 성공 응답
3 3 팔로워 1 다운 실패 - 리플리카 3개(리더 1 + 팔로워2) 중 리더 1개, 팔로워 2개 총 3개 저장 시 성공 응답
- 장애 상황 발생, 팔로워 1개가 다운됐기 때문에 리더 1개, 팔로워 1개 저장되어 결과는 실패

 

4. 프로듀서의 파티션 선택 방식

  • Kafka 프로듀서는 메시지를 특정 토픽에 보낼 때, 어떤 파티션에 보낼지를 결정해야함
  • 파티션 선택은 전송 성능뿐 아니라 순서 보장과 메시지 처리 전략에도 영향을 미칠 수 있으므로 전송 신뢰성 설정과 함께 파티션 전략도 반드시 고려 필요

 

1. 선택 방식

  • Partitioner라는 컴포넌트를 통해 결정되며 기본적으로 두 가지 방식이 있음
방식 설명 특징
라운드로빈(Round-robin) 키 없이 보낼 경우 자동 분산 균등 분산, 순서 보장X
로그 같이 순서 보장이 필요하지 않은 경우 사용
Key 기반 분산 키가 있는 경우, 동일 키 -> 동일 파티션 순서보장, 특정 파티션 쏠림 가능
사용자별 처리, 트랜잭션, 이벤트 순서가 필요한 경우 사용

 

① 라운드로빈(Round-robin)

kafkaTemplate.send("topic-name", "hello");
  • 파티션이 3개라면 순차적으로 0 → 1 → 2 → 0 식으로 분산

 

② Key 기반 분산

kafkaTemplate.send("topic-name", "user123", "hello");
  • "user123"이라는 키의 Hash값을 기준으로 특정 파티션에 고정
  • 같은 키를 가진 메시지는 항상 같은 파티션으로 가기 때문에 순서가 보장됨

 

 

5. 에러 유형 및 실패 대응 전략

 

1. 에러 유형

1) 전송 과정에서 실패

실패 원인 설명 대응 전략
네트워크 타임아웃, 일시적 네트워크 오류 브로커 응답 지연 retries 설정 -> 재시도
리더 다운 -> 리더 재선출 중 일시적 장애 재시도 또는 큐 재적재
메시지 크기 제한 초과 브로커 설정 한도 초과 max.request.size, 브로커 message.max.bytes 조정

 

2) 전송 전 실패

실패 원인 설명 대응 전략
직렬화 실패 DTO 변환 오류 등 예외 처리 & 기록
요청 크기 초과 프로듀서 측 제한 초과 buffer.memory 조정
대기 시간 초과 배치 버퍼 가득 찬 경우 linger.ms, batch.size 최적화

 

2. 실패 대응 전략

1) 재시도

  • 재시도 가능한 에러는 재시도 처
  • 프로듀서는 자체적으로 브로커 전송 과정에서 에러가 발생하면 재시도 가능한 에러에 대해 재전송 시도(retries 속성)
  • send()메서드에서 익셉션 받으면 타입에 따라 send() 재호출
  • 재시도가 필요한 예: 브로커 응답 타임 아웃, 일시적인 리더 없음 등
  • 특별한 이유가 없다면 무한 재시도는 하지 않는 것이 좋음

 

① 재시도 관련 설정

spring.kafka.producer.retries=3 // 전송 실패 시 3번까지 재시도
spring.kafka.producer.enable-idempotence=true // 중복 방지
spring.kafka.producer.max-in-flight-requests-per-connection=1 // 순서 보장

 

  • spring.kafka.producer.retries=3 
    • 전송 실패 시 최대 몇 번까지 재시도할지 설정
    • 설정 값이 3이라면 1회 원래 전송 + 3회 재시도로 총 4번의 전송을함
    • 재시도하면 메시지가 중복될 가능성이 생길 수 있으므로 enable-idempotence=true와 함께 설정해야 안전함
  • spring.kafka.producer.max-in-flight-requests-per-connection=1
    • Kafka 서버에 동시에 전송 중인 요청의 개수 제한
    • 설정 값이 1이면 동시에 1개 메시지를 전송하여 순서를 보장하지만 성능은 떨어질 수 있음
    • 설정값이 1 이상이면 동시에 여러 메시지를 전송할 수 있지만 순서를 보장할 수 없음

 

 

2) 기록

  • 추후 처리를 위한 기록
  • 별도 파일, DB, Redis 등을 이용해서 실패한 메시지 기록 저장
  • 추후에 수동(또는 자동) 보정 작업 진행

 

실패 기록 위치 별 상세 설명

위치 설명
send() 호출 자체에서 예외 발생 직렬화 오류 등 전송 시도 전 예외
send().get() 호출 시 Future 방식으로 응답 받을 때 예외 발생
addCallback()의 onFailure() 전송 요청은 성공했지만 실제 전송 실패 시

 

① send() 메서드 자체에서 예외 발생

try {
    kafkaTemplate.send(topic, message);
} catch (Exception e) {
    log.error("Kafka 전송 시도 자체 실패: {}", e.getMessage());
    saveFailedMessage(topic, message, e.getMessage());
}

 

② send().get() 호출 시 예외 발생(Futrue 방식)

try {
    SendResult<String, String> result = kafkaTemplate.send(topic, message).get(); // Blocking
    log.info("Kafka 전송 성공: offset = {}", result.getRecordMetadata().offset());
} catch (Exception e) {
    log.error("Kafka 전송 실패 (Future): {}", e.getMessage());
    saveFailedMessage(topic, message, e.getMessage());
}

 

③ Callback 방식에서 실패 시 예외 발생

kafkaTemplate.send(topic, message).addCallback(
    result -> log.info("Kafka 전송 성공 (Callback): offset = {}", result.getRecordMetadata().offset()),
    ex -> {
        log.error("Kafka 전송 실패 (Callback): {}", ex.getMessage());
        saveFailedMessage(topic, message, ex.getMessage());
    }
);