카테고리 없음

[Kafka] 맥북 m2 homebrew 카프카 설치

늘이 2023. 12. 4. 16:55

 

1. 설치하기

아래 명령어 입력하면 끝!

$ brew install kafka

 

2. 실행

 

1) 주키퍼(zookeeper) 실행


주키퍼 실행 명령

$./bin/zookeeper-server-start.sh ./config/zookeeper.properties

 

  • 카프카 실행 하기 전 주키퍼 먼저 실행해야함(주키퍼 먼저 실행 안해서 아래와 같이 에러 발생)
[2023-12-05 10:38:59,603] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) [2023-12-05 10:38:59,731] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util) [2023-12-05 10:38:59,769] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler) [2023-12-05 10:38:59,770] INFO starting (kafka.server.KafkaServer) [2023-12-05 10:38:59,770] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer) [2023-12-05 10:38:59,777] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient) [2023-12-05 10:38:59,779] INFO Client environment:zookeeper.version=3.8.2-139d619b58292d7734b4fc83a0f44be4e7b0c986, built on 2023-07-05 19:24 UTC (org.apache.zookeeper.ZooKeeper) [2023-12-05 10:38:59,779] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper) [2023-12-05 10:38:59,780] INFO Client environment:java.version=17.0.9 (org.apache.zookeeper.ZooKeeper) [2023-12-05 10:38:59,780] INFO Client environment:java.vendor=Homebrew (org.apache.zookeeper.ZooKeeper) [2023-12-05 10:38:59,780] INFO Client environment:java.home=/opt/homebrew/Cellar/openjdk@17/17.0.9/libexec/openjdk.jdk/Contents/Home (org.apache.zookeeper.ZooKeeper)
// 생략
[2023-12-05 10:39:18,739] ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$) kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:258) at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:116) at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:2266) at kafka.zk.KafkaZkClient$.createZkClient(KafkaZkClient.scala:2358) at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:657) at kafka.server.KafkaServer.startup(KafkaServer.scala:222) at kafka.Kafka$.main(Kafka.scala:113) at kafka.Kafka.main(Kafka.scala) [2023-12-05 10:39:18,739] INFO shutting down (kafka.server.KafkaServer)

 

 

 

2) 카프카 실행

  • 주키퍼 실행되고 있는 터미널 유지하고, 새로운 터미널을 열어 아래와 같이 명령어를 입력해 실행
$./bin/kafka-server-start.sh ./config/server.properties

 

명령어 해석

'kafka-server.start.sh 스크립트를 실행하되 설정파일로 sever.properties파일을 사용해라'라는 의미

{kafka-server-start.sh 경로} {server.properties파일 상대경로}

  • server-start.sh: 카프카 서버를 시작하는 스크립트 파일 , 위치는 /opt/homebrew/Cellar/kafka/3.6.0/libexec/bin
  • server.properties: 카프카 서버를 시작하는데 사용할 설정 파일, 위치는 /opt/homebrew/Cellar/kafka/3.6.0/libexec/config 

경로 확인

 

 

 

3. 사용하기

 

1) 토픽(Topic) 생성하기

$./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic {토픽이름}

/bin/kafka-topics.sh: 카프카 토픽을 관리하기 위한 스크립트

--create: 토픽을 생성하는 옵션

--bootstrap-server localhost:9092: 카프카 클러스터에 연결할 브로커의 부트스트랩 서버 지정, 토픽을 생성할 때 브로커에 연결할 수 있는 주소(localhost:9092)를 제공함 

--topic {토픽이름}: 생성할 토픽 이름 지정 

 

2) 토픽 목록 확인

$./bin/kafka-topics.sh --list --bootstrap-server localhost:9092

 

3) 토픽 정보 확인

$./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic {토픽이름}

 

4) 토픽 삭제

$./bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic {토픽이름}

 

5) Kafka 통신 테스트

터미널 새 창을 열고 Producer 실행

$sh ./bin/kafka-console-producer.sh --topic {토픽이름} --bootstrap-server localhost:9092

 

또 다른 터미널 새 창을 열어 Consumer 실행

$sh ./bin/kafka-console-consumer.sh --topic {토픽이름} --from-beginning --bootstrap-server localhost:9092

 

Producer에서 메시지를 입력하면 Counsumer에서 출력되는 것을 확인할 수 있음

 

 

6) 기타 명령어

# Topic 생성 - partitions, replication-factor
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic {토픽이름} --partitions 1 --replication-factor 1

# Topic 확인
./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic {토픽이름}

# Topic 목록
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092

# Topic 삭제
./bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic {토픽이름}

# Producer
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic {토픽이름}

# Producer - Key, Value
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic {토픽이름} --property "parse.key=true" --property "key.separator=:" --property "print.key=ture"

# Producer - Message
ABCDE

# Producer - Key, Value Message
key:{"val1":"A","val2":"B","val3":3}

# Consumer
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic {토픽이름} --from-beginning

# Consumer Group 확인
./bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

# Consumer Group Topic 확인
./bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group {그룹이름}