1. MQTT
- MQTT(Queue Telemetry Transport)는 Iot(Internet of Thins) 장치 간의 경량 메시징 프로토콜
- 네트워크 대역폭이 제한적이거나 지연 시간이 민감한 환경에서도 효율적인 통신을 가능하게함
2. QoS(Quality of Service)
- MQTT는 QoS 레벨을 사용하여 메시지 전달의 품질을 보장함
- 메시지가 어떻게, 그리고 얼마나 신뢰성 있게 전송되어야 하는지 정의
- 세 가지 QoS 레벨로 나눠짐
1) QoS 0(At most once)
- 메시지가 최대 한 번 전송되도록 보장
- 메시지가 전송 후 전송 여부에 대한 확인이나 재시도가 없으므로 네트워크 오류로 인한 메시지 손실 가능성이 있음
- 가장 낮은 레벨로 가장 낮은 오버헤드를 가지며 빠르고 경량의 통신이 필요할 때 사용
2) QoS 1(At least once)
- 메시지가 최소 한 번은 수신자에게 도달하도록 보장
- 수신자는 메시지를 받았을 때 응답을 보내고, 송신자는 해당 응답을 받을 때까지 메시지를 재전송
- 중요한 메시지 전송에 적합한 레벨로 메시지 손실 없이 전송될 수 있지만, 네트워크 지연이나 오버헤드가 더 클 수 있음
3) QoS(Exactly once)
- 메시지가 정확히 한번만 수신자에게 도달하도록 보장
- 가장 신뢰성 높은 전송 방식이며 송수신 양측에서 여러 단계의 핸드셰이크를 통해 이를 보장함
- 메시지가 중복되지 않고 정확히 한 번만 전송되도록 함
- 이 레벨은 가장 높은신뢰성을 요구하는 통신에 사용되지만 그만큼 오버헤드와 지연 시간이 증가할 수 있음
3. 에러 발생과 해결 과정
🤯 에러 발생
MQTT 브로커로 연속적으로 publish 하는 상황(스트리밍 데이터를 전송하는 상황)에서 11개 발송 후 에러가 발생해서 그 후 메시지 발행이 안되는 상태
Error receiving data: Failed to publish to MQTT in the [bean 'mqttOutbound'; defined in: 'class path resource [com/*/projecttemplate/config/MqttConfig.class]'; from source: 'com.*.projecttemplate.config.MqttConfig.mqttOutbound()']
상세에러
ERROR 16566 --- [ctor-http-nio-3] [] c.d.p.service.FinManagerService : [org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler.publish(MqttPahoMessageHandler.java:248), org.springframework.integration.mqtt.outbound.AbstractMqttMessageHandler.handleMessageInternal(AbstractMqttMessageHandler.java:372), org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105), org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90), org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70), org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:59), org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:145), org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105), org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90), org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70), org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132), org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133), org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106), org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72), org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378), org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349), org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329), org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:302), com.douzone.projecttemplate.service.FinManagerService.send(FinManagerService.java:275), com.douzone.projecttemplate.service.FinManagerService.lambda$financial$0(FinManagerService.java:147), reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160), reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99), reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onNext(FluxTimeout.java:181), reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:251), reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79), reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539), reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onNext(FluxConcatMapNoPrefetch.java:197), reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNextNewBuffer(FluxBufferPredicate.java:317), reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.tryOnNext(FluxBufferPredicate.java:227), reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNext(FluxBufferPredicate.java:200), reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onNext(FluxPeekFuseable.java:503), reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299), reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107), reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113), reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onNext(FluxConcatArray.java:202), reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:453), reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:724), reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:256), reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122), reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200), reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122), reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294), reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403), reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426), reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:804), reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114), io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444), io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420), io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412), io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103), io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444), io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420), io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412), io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436), io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346), io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333), io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:454), io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290), io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251), io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442), io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420), io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412), io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410), io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440), io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420), io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919), io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166), io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788), io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724), io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650), io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562), io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997), io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74), io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30), java.base/java.lang.Thread.run(Thread.java:1583)]
- Spring Integration의 MQTT 아웃바운드 채널에서 메시지를 게시하는 과정에서 실패했음을 나타냄
- 에러 발생 위치는 MqttPahoMessageHandler.publish() 메서드로 MQTT 메시지를 실제로 게시하는 컴포넌트
📝 에러 원인 찾기
- 검토사항: MQTT 브로커 연결, QoS 설정, 토픽 설정, 메시지 페이로드, 보안 및 권한 설정 등을 확인해야함
- QoS 2 설정 사용 시 아래와 같은 사항을 고려해야함
1) MQTT 브로커 지원
- 사용중인 MQTT 브로커가 QoS를 지원하는지 확인해야함
- 대부분의 MQTT 브로커는 QoS 2를 지원하지만 설정 또는 특정 조건 하에서 제한을 둘 수 있음
- 일부 브로커에서는 높은 QoS 레벨을 사용하기 위해 추가적인 설정이나 권한 부여가 필요할 수 있음
2) 클라이언트와 브로커 간의 연결 안정성
- QoS 1, 2는 메시지 전송 성공 보장을 위한 추가적인 통신을 수행하기 때문에 네트워크 연결이 불안정하거나 지연이 심한 환경에서는 메시지 전송이 실패할 수 있으므로 연결 품질이 좋아야함
- QoS 2를 사용할 때 세션 지속성이 중요함, 클라이언트가 브로커에 재연결할 때 이전 세션을 이어갈 수 있도록 적절한 클라이언트 ID와 클린 세션 설정을 사용하는지 확인 필요
3) 메시지 크기와 전송 빈도
- 큰 메시지 크기는 QoS 1, 2를 사용할 때 메시지 전송 시간과 네트워크 부하를 증가시킬 수 있으므로 가능한 메시지 크기를 최소화 할 것
- 높은 전송 빈도는 QoS 1, 2를 사용할 때 오버헤드를 증가시키며 이는 성능 저하 또는 전송 실패로 이어질 수 있음
4) 클라이언트 및 브로커의 로그와 디버깅
- 에러의 구체적인 원인을 파악하기 위해서는 클라이언트와 브로커의 로그 정보를 보고 원인 파악해야함
- 가능하다면 MQTT 클라이언트 라이브러리의 디버깅 기능을 활성화하여 추가적인 정보를 수집할 것
5) 소프트웨어 버전 및 호환성
- 사용중인 MQTT 클라이언트 라이브러리와 MQTT 브로커의 버전이 최신인지 확인할 것, 일부 버전에서는 QoS 관련 문제가 해결 될 수 있음
👊🏻 해결 과정
- QoS 설정이 기존에 2였기 때문에 1, 0으로 테스트 해봤더니 0에서는 에러가 발생하지 않고 MQTT 발송되는 것을 확인할 수 있었음
- 하지만 QoS 0으로 설정하고 나니까 메시지 손실이 발생하는 상황 발생하였고 결국 QoS1, 2를 설정해야만 하는 상황
- 에러 원인 파악 과정에서 MQTT 버전이 최신이 아닌 것을 확인함, 현재 MQTT 클라이언트 라이브러리가 MQTT 3버전을 사용중인데, 5버전을 사용하면 QoS 문제를 해결할 수 있다는 글을 보고 MQTT 클라이언트 버전을 변경함
1) MQTT v3 과 v5 비교
- MQTT 프로토콜 버전 3(특히 3.1.1)과 버전 5 사이에는 기능성, 성능, 사용 용이성 측면에서 여러 중요한 차이점이 있으며 MQTT를 사용하는 애플리케이션에 영향을 줌
① 에러 보고 및 응답 메시지
MQTTv3 | 클라이언트와 서버 간의 통신에서 에러 발생 시 에러의 원인이나 세부사항을 알려주는 명확한 메커니즘이 부족함 |
MQTTv5 | 에러 응답과 함께 상태 코드와 이유 문자열을 반환할 수 있으므로 문제 해결이 용이해져 개발자는 에러의 원인을 좀 더 쉽게 진단할 수 있음 |
② 세션 만료
MQTTv3 | 클라이언트가 연결을 끊을 때, 모든 세션 데이터를 즉시 제거하거나 서버 재시작 시까지 유지해야 함 세부적인 제어가 부족함 |
MQTTv5 | 세션 만료 간격을 설정할 수 있어 연결이 끊긴 후 클라이언트 세션이 얼마나 지속될 지 제어할 수 있음 애플리케이션 요구에 따라 세션 유지 관리를 더 세밀하게 조장할 수 있음 |
③ 향상된 메시지 속성
MQTTv3 | 메시지에 대한 메타데이터나 추가 정보를 포함하는 방법이 제한적 |
MQTTv5 | 사용자 정의 속성을 메시지에 포함할 수 있으며, 페이로드 형식 지정자, 응답 토픽, 코렐레이션 데이터 등 메시지와 관련된 다양한 정보를 전송할 수 있음 |
④ 토픽 별명
MQTTv3 | 지원하지 않음 |
MQTTv5 | 네트워크 대역폭을 절약하기 위해 토픽 이름 대신 토픽 별명을 사용할 수 있음, 이는 반복적인 토픽 이름의 전송을 줄이는데 도움이 됨 |
⑤ 흐름 제어
MQTTv3 | QoS를 통한 메시지 전달 보장 수준의 설정만 제공 |
MQTTv5 | 요청-응답 패턴을 구현할 수 있는 기능과 서비스 품질(QoS), 구독 옵션(No Local Subscription, Retain as Published, Subscription Identifier 등)을 통해 메시지 전송의 흐름 제어가 세밀해짐 |
⑥ 보안 기능
MQTTv3 | 기본적인 사용자 이름과 비밀번호 기반 인증 지원 |
MQTTv5 | 향상된 인증 메커니즘을 지원하여, 연결 설정 과정에서 다양한 인증 방식과 인증 과정 중 정보 교환을 할 수 있음 |
2) 결과
mqtt publish 10개 연속 요청에도 위 에러가 발생하지 않는 것을 확인
참고로 브로커는 모스키토 브로커 사용
'오늘의 공부 & 기록' 카테고리의 다른 글
랭체인 공부 (0) | 2024.06.25 |
---|---|
[Kubernetes] 쿠버네티스와 마이크로 서비스(Microservice) (0) | 2024.04.14 |
api 동시 요청 수 제한하는 방법 (0) | 2023.12.11 |
HTTP 상태코드 (0) | 2023.09.30 |
HTTP API설계 예시 (0) | 2023.09.30 |