본문 바로가기

오늘의 공부 & 기록

[MQTT] 에러 해결 과정 정리

 

 

 

1. MQTT

  • MQTT(Queue Telemetry Transport)는 Iot(Internet of Thins) 장치 간의 경량 메시징 프로토콜
  • 네트워크 대역폭이 제한적이거나 지연 시간이 민감한 환경에서도 효율적인 통신을 가능하게함

 

2. QoS(Quality of Service)

  • MQTT는 QoS 레벨을 사용하여 메시지 전달의 품질을 보장함
  • 메시지가 어떻게, 그리고 얼마나 신뢰성 있게 전송되어야 하는지 정의
  • 세 가지 QoS 레벨로 나눠짐

 

1) QoS 0(At most once) 

  • 메시지가 최대 한 번 전송되도록 보장
  • 메시지가 전송 후 전송 여부에 대한 확인이나 재시도가 없으므로 네트워크 오류로 인한 메시지 손실 가능성이 있음
  • 가장 낮은 레벨로 가장 낮은 오버헤드를 가지며 빠르고 경량의 통신이 필요할 때 사용

 

2) QoS 1(At least once)

  • 메시지가 최소 한 번은 수신자에게 도달하도록 보장
  • 수신자는 메시지를 받았을 때 응답을 보내고, 송신자는 해당 응답을 받을 때까지 메시지를 재전송
  • 중요한 메시지 전송에 적합한 레벨로 메시지 손실 없이 전송될 수 있지만, 네트워크 지연이나 오버헤드가 더 클 수 있음

 

3) QoS(Exactly once)

  • 메시지가 정확히 한번만 수신자에게 도달하도록 보장
  • 가장 신뢰성 높은 전송 방식이며 송수신 양측에서 여러 단계의 핸드셰이크를 통해 이를 보장함
  • 메시지가 중복되지 않고 정확히 한 번만 전송되도록 함
  • 이 레벨은 가장 높은신뢰성을 요구하는 통신에 사용되지만 그만큼 오버헤드와 지연 시간이 증가할 수 있음

 

 

3. 에러 발생과 해결 과정

 

🤯 에러 발생

MQTT 브로커로 연속적으로 publish 하는 상황(스트리밍 데이터를 전송하는 상황)에서 11개 발송 후 에러가 발생해서 그 후 메시지 발행이 안되는 상태

Error receiving data: Failed to publish to MQTT in the [bean 'mqttOutbound'; defined in: 'class path resource [com/*/projecttemplate/config/MqttConfig.class]'; from source: 'com.*.projecttemplate.config.MqttConfig.mqttOutbound()']

상세에러
 ERROR 16566 --- [ctor-http-nio-3] [] c.d.p.service.FinManagerService          : [org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler.publish(MqttPahoMessageHandler.java:248), org.springframework.integration.mqtt.outbound.AbstractMqttMessageHandler.handleMessageInternal(AbstractMqttMessageHandler.java:372), org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105), org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90), org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70), org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:59), org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:145), org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105), org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90), org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70), org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132), org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133), org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106), org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72), org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378), org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349), org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329), org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:302), com.douzone.projecttemplate.service.FinManagerService.send(FinManagerService.java:275), com.douzone.projecttemplate.service.FinManagerService.lambda$financial$0(FinManagerService.java:147), reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160), reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99), reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onNext(FluxTimeout.java:181), reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:251), reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79), reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539), reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onNext(FluxConcatMapNoPrefetch.java:197), reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNextNewBuffer(FluxBufferPredicate.java:317), reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.tryOnNext(FluxBufferPredicate.java:227), reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNext(FluxBufferPredicate.java:200), reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onNext(FluxPeekFuseable.java:503), reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299), reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107), reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113), reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onNext(FluxConcatArray.java:202), reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:453), reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:724), reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:256), reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122), reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200), reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122), reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294), reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403), reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426), reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:804), reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114), io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444), io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420), io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412), io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103), io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444), io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420), io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412), io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436), io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346), io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333), io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:454), io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290), io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251), io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442), io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420), io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412), io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410), io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440), io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420), io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919), io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166), io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788), io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724), io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650), io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562), io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997), io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74), io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30), java.base/java.lang.Thread.run(Thread.java:1583)]
  • Spring Integration의 MQTT 아웃바운드 채널에서 메시지를 게시하는 과정에서 실패했음을 나타냄
  • 에러 발생 위치는 MqttPahoMessageHandler.publish() 메서드로 MQTT 메시지를 실제로 게시하는 컴포넌트

 

 

📝 에러 원인 찾기

  • 검토사항: MQTT 브로커 연결, QoS 설정, 토픽 설정, 메시지 페이로드, 보안 및 권한 설정 등을 확인해야함
  • QoS 2 설정 사용 시 아래와 같은 사항을 고려해야함

 

1) MQTT 브로커 지원

  • 사용중인 MQTT 브로커가 QoS를 지원하는지 확인해야함
  • 대부분의 MQTT 브로커는 QoS 2를 지원하지만 설정 또는 특정 조건 하에서 제한을 둘 수 있음
  • 일부 브로커에서는 높은 QoS 레벨을 사용하기 위해 추가적인 설정이나 권한 부여가 필요할 수 있음

 

2) 클라이언트와 브로커 간의 연결 안정성

  • QoS 1, 2는 메시지 전송 성공 보장을 위한 추가적인 통신을 수행하기 때문에 네트워크 연결이 불안정하거나 지연이 심한 환경에서는 메시지 전송이 실패할 수 있으므로 연결 품질이 좋아야함
  • QoS 2를 사용할 때 세션 지속성이 중요함, 클라이언트가 브로커에 재연결할 때 이전 세션을 이어갈 수 있도록 적절한 클라이언트 ID와 클린 세션 설정을 사용하는지 확인 필요

 

3) 메시지 크기와 전송 빈도

  • 큰 메시지 크기는 QoS 1, 2를 사용할 때 메시지 전송 시간과 네트워크 부하를 증가시킬 수 있으므로 가능한 메시지 크기를 최소화 할 것
  • 높은 전송 빈도는 QoS 1, 2를 사용할 때 오버헤드를 증가시키며 이는 성능 저하 또는 전송 실패로 이어질 수 있음

 

4) 클라이언트 및 브로커의 로그와 디버깅

  • 에러의 구체적인 원인을 파악하기 위해서는 클라이언트와 브로커의 로그 정보를 보고 원인 파악해야함
  • 가능하다면 MQTT 클라이언트 라이브러리의 디버깅 기능을 활성화하여 추가적인 정보를 수집할 것

 

5) 소프트웨어 버전 및 호환성

  • 사용중인 MQTT 클라이언트 라이브러리와 MQTT 브로커의 버전이 최신인지 확인할 것, 일부 버전에서는 QoS 관련 문제가 해결 될 수 있음

 

 

👊🏻 해결 과정

  • QoS 설정이 기존에 2였기 때문에 1, 0으로 테스트 해봤더니 0에서는 에러가 발생하지 않고 MQTT 발송되는 것을 확인할 수 있었음
  • 하지만 QoS 0으로 설정하고 나니까 메시지 손실이 발생하는 상황 발생하였고 결국 QoS1, 2를 설정해야만 하는 상황
  • 에러 원인 파악 과정에서 MQTT 버전이 최신이 아닌 것을 확인함, 현재 MQTT 클라이언트 라이브러리가 MQTT 3버전을 사용중인데, 5버전을 사용하면 QoS 문제를 해결할 수 있다는 글을 보고 MQTT 클라이언트 버전을 변경함

 

1) MQTT v3 과 v5 비교

  • MQTT 프로토콜 버전 3(특히 3.1.1)과 버전 5 사이에는 기능성, 성능, 사용 용이성 측면에서 여러 중요한 차이점이 있으며 MQTT를 사용하는 애플리케이션에 영향을 줌

 

① 에러 보고 및 응답 메시지

MQTTv3 클라이언트와 서버 간의 통신에서 에러 발생 시 에러의 원인이나 세부사항을 알려주는 명확한 메커니즘이 부족함
MQTTv5 에러 응답과 함께 상태 코드와 이유 문자열을 반환할 수 있으므로 문제 해결이 용이해져 개발자는 에러의 원인을 좀 더 쉽게 진단할 수 있음

 

② 세션 만료

MQTTv3 클라이언트가 연결을 끊을 때, 모든 세션 데이터를 즉시 제거하거나 서버 재시작 시까지 유지해야 함
세부적인 제어가 부족함
MQTTv5 세션 만료 간격을 설정할 수 있어 연결이 끊긴 후 클라이언트 세션이 얼마나 지속될 지 제어할 수 있음
애플리케이션 요구에 따라 세션 유지 관리를 더 세밀하게 조장할 수 있음

 

③ 향상된 메시지 속성

MQTTv3 메시지에 대한 메타데이터나 추가 정보를 포함하는 방법이 제한적 
MQTTv5 사용자 정의 속성을 메시지에 포함할 수 있으며, 페이로드 형식 지정자, 응답 토픽, 코렐레이션 데이터 등 메시지와 관련된 다양한 정보를 전송할 수 있음

 

④ 토픽 별명

MQTTv3 지원하지 않음
MQTTv5 네트워크 대역폭을 절약하기 위해 토픽 이름 대신 토픽 별명을 사용할 수 있음, 이는 반복적인 토픽 이름의 전송을 줄이는데 도움이 됨

 

⑤ 흐름 제어

MQTTv3 QoS를 통한 메시지 전달 보장 수준의 설정만 제공
MQTTv5 요청-응답 패턴을 구현할 수 있는 기능과 서비스 품질(QoS), 구독 옵션(No Local Subscription, Retain as Published, Subscription Identifier 등)을 통해 메시지 전송의 흐름 제어가 세밀해짐

 

⑥ 보안 기능

MQTTv3 기본적인 사용자 이름과 비밀번호 기반 인증 지원
MQTTv5 향상된 인증 메커니즘을 지원하여, 연결 설정 과정에서 다양한 인증 방식과 인증 과정 중 정보 교환을 할 수 있음

 

2) 결과

mqtt  publish 10개 연속 요청에도 위 에러가 발생하지 않는 것을 확인

참고로 브로커는 모스키토 브로커 사용

'오늘의 공부 & 기록' 카테고리의 다른 글

랭체인 공부  (0) 2024.06.25
[Kubernetes] 쿠버네티스와 마이크로 서비스(Microservice)  (0) 2024.04.14
api 동시 요청 수 제한하는 방법  (0) 2023.12.11
HTTP 상태코드  (0) 2023.09.30
HTTP API설계 예시  (0) 2023.09.30