RabbitMQ

로컬 테스트 환경

Docker :

↓shell

$ rabbitmqctl add_user tutorial tutorial Adding user "tutorial" ... Done. Don't forget to grant the user permissions to some virtual hosts! See 'rabbitmqctl help set_permissions' to learn more. $ rabbitmqctl set_permissions -p / tutorial ".*" ".*" ".*" Setting permissions for user "tutorial" in vhost "/" ...

AMQP; Advanced Message Queuing Protocol

AMP 모델

  • Queue : 복수의 메시지 큐를 이용한다
  • Queue option

    • 큐 설정은 불변이므로, 설정을 변경하려면 큐를 삭제하고 다시 만들어야 한다
    • auto-delete : 모든 소비자와의 연결 종료 > 큐 자동 제거
    • exclusive : 단일 소비자만 연결 허용 > 연결 종료 > 큐 자동 제거
    • x-expires : 지정 ms 동안 미사용 시 큐 자동 제거
    • durable : 메시지에 delivery-mode=2 설정이 없어도 디스크에 저장
    • x-message-ttl : 지정 ms 동안 소비되지 않은 메시지 만료; DLX 설정 시 DLX로 라우팅된다
    • x-max-length : 지정된 개수의 메시지까지만 보관. 초과시 가장 오래된 메시지 만료; DLX 설정 시 DLX로 라우팅된다
  • Exchange : 메시지 브로커(여기서는 RabbitMQ)에 들어온 메시지를 적절한 큐에 전달한다
  • Exchange type

    • direct : 바인드된 큐 중, 지정된 routing-key에 해당하는 모든 큐에 전달
    • fanout : 바인드된 모든 큐에 전달
    • topic : 바인드된 큐 중, 지정된 routing-key가 패턴에 매치하는 모든 큐에 전달
      • * 와일드카드 : .을 제외한 임의 문자열 == /[^.]+/
      • # 와일드카드 : 해당 위치 이후, .을 포함한 전체 문자열 == /.*$/
    • headers : 바인드된 큐 중, 헤더 키-값 조건에 맞는 모든 큐에 전달
      • x-match="any" : 하나라도 일치하면 라우팅
      • x-match="all" : 모두 일치해야 라우팅
  • Binding : 메시지로부터 적절한 큐를 찾는 평가 규칙
  • 단순하게 사용하는 경우, 메시지의 routing-key를 큐의 이름으로 설정할 수 있다

통신 규약

  • 채널 : 하나의 AMQP 연결은 여러 채널을 이용해 대화를 수행한다
  • 클래스 & 메서드 : AMQP 명령은 Connection.Start처럼 클래스와 메서드로 구별된다
  • 프레임 : 프레임 유형 | 채널 번호 | 프레임 바이트 크기 | 페이로드 | 0xce

예. Exchange와 Queue를 정의하고 Binding 설정

예. 메시지 발행/구독

    • channel.basicPublish(...)
    • channel.basicConsume(...)
      • channel.basicCancel(consumerTag)을 호출하기 전까지 RabbitMQ로부터 메시지를 수신한다
      • 소비자 등록 시 no-ack=true로 설정하면, RabbitMQ가 수신 응답을 기다리지 않는다
      • 소비자 생성 시 auto-ack를 설정하면, 수신 메시지에 대해 ack를 자동으로 응답한다
      • 클라이언트측 수신 소켓 버퍼 크기 조정 관련 참고사항 : 수신한 패킷이 버려짐
  • Basic.Ack 시 multiple=true로 설정하면, 이전 모든 메시지에 대해 Ack하는 것과 동일하다

  • GET http://localhost:55555/mq2/%EC%95%88%EB%85%95~%20%EC%84%B8%EC%83%81
  • ↓text

    10:05:19.801 I [pool-1-thread-7] [SimpleConsumer] exchange=MQ2, consumerTag=amq.ctag-DKpzPbkDiYCgN_39fSVgMg, deliveryTag=1, routingKey=MQ2, body=안녕~ 세상 10:05:25.821 I [pool-1-thread-8] [SimpleConsumer] exchange=MQ2, consumerTag=amq.ctag-DKpzPbkDiYCgN_39fSVgMg, deliveryTag=2, routingKey=MQ2, body=안녕~ 세상

publishers.html#message-properties

  • 아래 테이블에서 "app-specific"은 RabbitMQ 코어에서 기본으로 사용하지 않는, 생산자-소비자 사이의 약속임을 의미한다. 예를 들어, content-encoding이 무엇이든 AMQP는 이진 메시지를 그대로 전달하고, 각 소비자는 자기가 받아들이는 content-encoding과 일치하는 메시지만 ack하여 소비할 수 있다
  • Queue 정의 시 durable=true로 설정하면, 해당 큐에서는 모든 메시지가 delivery-mode=2로 전달된 것처럼 디스크에 저장한다
  • 디스크에 저장된 메시지는, 메시지에 대한 참조 카운트가 0이 되면 삭제된다

PropertyTypeDescriptionRequired?
delivery-mode1 or 22(persistent; 메시지를 디스크에 저장), 1(transient; 메시지 별도 저장 안 함)Yes
typeStringapp-specific 메시지 타입. e.g. order.createdNo
headersMap (string => any)app-specific 메시지 속성. RabbitMQ가 이를 해석하여 라우팅하도록 설정할 수 있다No
content-typeStringapp-specific 메시지 형식. e.g. text/plain, application/jsonNo
content-encodingStringapp-specific 메시지 인코딩. e.g. UTF-8, gzipNo
message-idStringapp-specific 메시지 IDNo
correlation-idStringapp-specific 메시지 식별 ID. 특정 요청 메시지에 대한 응답 메시지임을 식별할 수 있도록 돕는다. e.g. 요청 메시지의 message-idNo
reply-toStringapp-specific 응답 정보. e.g. 응답 큐 이름No
expirationString메시지 TTL(ms)No
timestampTimestampapp-specific 임의 시각(밀리초 단위 UNIX Timestamp)No
user-idString설정한 경우, 연결한 유저와 일치할 경우에만 발행된다No
app-idString앱 이름No

메시지 속성은 아니나, 메시지 배달 시 추가되는 속성

PropertyTypeDescription
Delivery tagPositive integer순증하는 배달 번호
RedeliveredBooleantrue인 경우, NACK(Negative ACK)로 거절 -> requeue -> 재전달(아마 다른 소비자로) 됐음을 의미
ExchangeStringExchange which routed this message
Routing keyStringRouting key used by the publisher
Consumer tagStringConsumer (subscription) identifier

예. 메시지 발행에 대한 결과 수신

  • ACK는 메시지 발행을 보장하지만, 그 외의 상태가 메시지 미발행/미수신을 보장하지는 않음에 유의
  • ACK가 아니어도 메시지가 발행-수신될 수 있다

  • channel.waitForConfirms(ms)

  • GET http://localhost:55555/mq3/%EC%95%88%EB%85%95~%20%EC%84%B8%EC%83%81
  • ↓text

    10:05:48.799 I [reactor-http-nio-2] [MQ3] Message[안녕~ 세상] <<< publish UNKNOWN 10:05:48.799 I [pool-1-thread-9] [SimpleConsumer] exchange=MQ3, consumerTag=amq.ctag-_lNyNthMsgokgSwUllxE6Q, deliveryTag=3, routingKey=MQ3, body=안녕~ 세상 10:05:52.115 I [reactor-http-nio-2] [MQ3] Message[안녕~ 세상] <<< publish ACK 10:05:52.115 I [pool-1-thread-10] [SimpleConsumer] exchange=MQ3, consumerTag=amq.ctag-_lNyNthMsgokgSwUllxE6Q, deliveryTag=4, routingKey=MQ3, body=안녕~ 세상

예. direct 익스체인지를 fallback으로 사용

  • Exchange 정의 시 alternate-exchange 속성 설정

  • GET http://localhost:55555/mq4/%EC%95%88%EB%85%95~%20%EC%84%B8%EC%83%81
  • ↓text

    10:27:01.393 I [pool-1-thread-7] [FallbackConsumer] Fallback >> exchange=MQ4, consumerTag=amq.ctag-juY9fQmTnu43a0lnBYF0IQ, deliveryTag=1, routingKey=NON_REGISTERED_ROUTING_KEY, body=안녕~ 세상 10:27:18.795 I [pool-1-thread-8] [FallbackConsumer] Fallback >> exchange=MQ4, consumerTag=amq.ctag-juY9fQmTnu43a0lnBYF0IQ, deliveryTag=2, routingKey=NON_REGISTERED_ROUTING_KEY, body=안녕~ 세상

예. 트랜잭션을 이용한 메시지 발행 보장

  • 트랜잭션은 단일 큐에 대한 작업만 원자적으로 수행함에 유의
  • 메시지 전달 보장이 아님에 유의

  • channel.txSelect() ~ txCommit() / txRollback()

  • GET http://localhost:55555/mq5/%EC%95%88%EB%85%95~%20%EC%84%B8%EC%83%81
  • ↓text

    12:07:44.882 I [pool-1-thread-10] [SimpleConsumer] exchange=MQ5, consumerTag=amq.ctag-fdWv9vjJw47g9U_x_jHxWw, deliveryTag=3, routingKey=MQ5, body=안녕~ 세상 12:07:47.475 I [reactor-http-nio-2] [MQ5] Message rejected forcibly

RabbitMQ 확장 기능

Basic.Nack

Basic.Reject가 메시지 1건에 대해 거부할 때, Nack는 이전 메시지를 포함하여 거부한다

Exchange.Bind

Exchange에서 바로 큐로 라우팅하지 않고, 다른 exchange로 라우팅하여 n단계 라우팅을 가능하게 한다

Consistent Hash Exchange

메시지 로드 밸런싱 또는 샤딩을 수행한다

Cluster

클러스터 런타임 상태

Exchange, queue, binding, virtual host, user, policy 등

클러스터 노드

  • 디스크 노드 : 클러스터 런타임 상태를 RAM과 디스크 모두에 저장
    • 디스크 노드는 클러스터 재접속 시 디스크 정보를 이용해 장애를 복구한다
    • 클러스터를 구성하려면 최소 1개의 디스크 노드가 필요하다
    • 여러 디스크 노드가 장애 상태가 되고, 과반이 공유 상태에 동의하지 않는 경우, 정상 노드만 먼저 재시작하면 된다
  • 램 노드 : 클러스터 런타임 상태를 RAM에만 저장
  • 클러스터 구성 예
  • 메인 디스크 노드 1(rabbitmq-management 플러그인 사용), 보조 디스크 노드 1(rabbitmq-management 플러그인 사용), 램 노드 N

Credit

  • 새로운 연결이 수립되면, 연결에 일정량의 credit을 할당한다
  • 연결로부터 RPC를 수신하면 제공된 credit에서 일정량을 차감한다
  • 수신된 RPC를 처리하면 차감된 credit을 복구한다
  • 소지한 credit이 부족한 경우, 수신한 RPC를 폐기한다
  • credit으로 인한 가용/비가용 상태 변경은 Connection.Blocked, Connection.Unblocked 메서드로 알려준다

DLX; Dead-Letter Exchange

  • 큐 정의 시 x-dead-letter-exchange를 설정하면, 거부된 메시지를 해당 exchange로 라우팅한다
  • 큐 정의 시 추가로 x-dead-letter-routing-key를 설정하면, 거부된 메시지의 routing-key를 해당 값으로 설정하여 DLX로 라우팅한다
  • 큐에 x-message-ttl이 설정된 경우, 만료된 메시지 역시 DLX로 라우팅된다

HA;Highly Available 큐

  • RabbitMQ 클러스터 노드 사이에 마스터-슬레이브 관계를 형성하고, 슬레이브 큐는 마스터 큐를 미러링한다
  • 미러링 종류
    • all : 클러스터 내 모든 노드가 복제본을 공유
    • exactly : min(총 노드 수, 지정 노드 수)개의 노드가 복제본을 공유
    • nodes : 지정한 노드들끼리 복제본을 공유
  • HA 큐에 대한 트랜잭션은, 모든 활성 미러링 큐가 일치할 때까지 응답을 대기한다

MQTT; MQ Telemetry Transport

  • MQTT 플러그인을 사용하면, MQTT 클라이언트들이 RabbitMQ를 통해 메시지를 발행-구독할 수 있다
  • MQTT는 불안정한 네트워크에서의 짧은 메시지 송수신에 적합함에 유의

STOMP; Streaming Text Oriented Message Protocol

  • STOMP 플러그인을 사용하면, STOMP 클라이언트들이 RabbitMQ를 통해 메시지를 발행-구독할 수 있다
  • destination="/queue/xxx"으로 메시지를 발행하면, RabbitMQ는 xxx 큐로 메시지를 라우팅한다; 큐가 없으면 auto-delete 큐가 생성된다
  • 소비할 때도 destination="/queue/xxx"

  • destination="/amq/queue/xxx"으로 메시지를 발행하면, RabbitMQ는 사용자 정의 xxx 큐로 메시지를 라우팅한다
  • 소비할 때도 destination="/amq/queue/xxx"

  • destination="/exchange/xxx/yyy"으로 메시지를 발행하면, RabbitMQ는 사용자 정의 xxx 익스체인지에 yyy 라우팅 키로 메시지를 라우팅한다; 큐가 없으면 auto-delete 큐가 생성된다
  • 소비할 때도 destination="/exchange/xxx/yyy"

  • destination="/topic/xxx"으로 메시지를 발행하면, RabbitMQ는 amp.topic 익스체인지에 xxx 라우팅 키로 메시지를 라우팅한다; 큐가 없으면 auto-delete 큐가 생성된다
  • amp.topic 익스체인지에 바인드된 모든 큐가 라우팅 검사 대상. 소비할 때도 destination="/topic/xxx"를 이용하고, xxx에 와일드카드(#, *) 사용 가능

  • reply-to="xxx" 헤더로 응답 메시지를 발행하면, RabbitMQ는 요청 메시지 발행자만 1회 구독 가능(exclusive && auto-delete)한 큐를 자동으로 생성하여 라우팅한다
  • STOMP 발행 메시지 헤더 중, AMQP 메시지 속성으로 정의된 키-값들은 자동으로 AMQP 메시지 속성으로 설정된다
  • AMQP 메시지 속성들은 자동으로 STOMP 구독 메시지 헤더로 설정된다

Web STOMP

기본 15670 포트로 동작하는 WebSocket 호환 HTTP 서버 + 이와 STOMP 통신하는 웹 브라우저용 SockJS 기반 클라이어트