본문 바로가기

Kafka, RabbitMQ

RabbitMQ에 대해 알아보자 - (10) Header 기반 라우팅 / Headers Exchange 전략

들어가기 앞서

RabbitMQ의 Headers Exchange는 기존의 Fanout, Direct, Topic과는 다르게 라우팅 키가 아닌 메시지의 헤더 값을 기준으로 큐로 라우팅하는 방식입니다.

header routing 패턴 / header exchange 전략


메시지 자체의 메타데이터(header)를 활용하므로, 보다 정교하고 유연한 조건 기반 라우팅이 가능하다는 장점이 있습니다.

복잡한 조건 매칭이나 다중 속성을 고려한 분기 로직이 필요한 경우 유용하게 사용할 수 있습니다.

 

  • 동작 : 메시지의 헤더(key-value)에 따라 큐에 라우팅
  • Exchange : Headers Exchange
  • 사용 예제 : 국가별 정책 분기, 사용자 속성 기반 라우팅, A/B 테스트 등

Headers 모델 / 전략 설명

Headers Exchange는 바인딩할 때 라우팅 키가 아닌 헤더 조건을 명시하며, x-match 속성을 통해 조건 결합 방식을 선택할 수 있습니다.

  • x-match=all : 바인딩된 모든 헤더 조건을 모두 만족해야 큐로 전달
  • x-match=any : 바인딩된 조건 중 하나라도 만족하면 큐로 전달

코드 

1. 메시지 발행

@Component
@RequiredArgsConstructor
public class HeaderProducer {

    private final RabbitTemplate rabbitTemplate;

    public void send(String message, Map<String, Object> headers) {
        MessageProperties properties = new MessageProperties();
        headers.forEach(properties::setHeader);

        Message rabbitMessage = new Message(message.getBytes(), properties);
        rabbitTemplate.send(RabbitMqConfig.HEADERS_EXCHANGE, "", rabbitMessage);
    }
}
 
라우팅 키는 사용하지 않으며, 메시지의 헤더에 조건을 설정합니다.

2. 설정 파일 (RabbitMqConfig)

@Configuration
public class RabbitMqConfig {

    public static final String HEADERS_EXCHANGE = "headersExchange";
    public static final String HEADER_QUEUE_ONE = "header.queue1";
    public static final String HEADER_QUEUE_TWO = "header.queue2";

    @Bean
    public Queue headerQueue1() {
        return new Queue(HEADER_QUEUE_ONE, true);
    }

    @Bean
    public Queue headerQueue2() {
        return new Queue(HEADER_QUEUE_TWO, true);
    }

    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange(HEADERS_EXCHANGE);
    }

    @Bean
    public Binding bindingHeaderQueue1(Queue headerQueue1, HeadersExchange headersExchange) {
        Map<String, Object> headerValues = new HashMap<>();
        headerValues.put("x-match", "all");
        headerValues.put("format", "pdf");
        headerValues.put("type", "report");
        return BindingBuilder.bind(headerQueue1).to(headersExchange).whereAll(headerValues).match();
    }

    @Bean
    public Binding bindingHeaderQueue2(Queue headerQueue2, HeadersExchange headersExchange) {
        Map<String, Object> headerValues = new HashMap<>();
        headerValues.put("x-match", "any");
        headerValues.put("format", "zip");
        headerValues.put("type", "backup");
        return BindingBuilder.bind(headerQueue2).to(headersExchange).whereAny(headerValues).match();
    }
}
  • header.queue1은 format=pdf AND type=report일 때 메시지를 받습니다.
  • header.queue2는 format=zip OR type=backup 중 하나라도 맞으면 수신합니다.

3. Consumer 구성

@Component
public class HeaderConsumer {

    @RabbitListener(queues = RabbitMqConfig.HEADER_QUEUE_ONE)
    public void receiveHeaderQueue1(String message) {
        System.out.println("[HeaderQueue1] " + message);
    }

    @RabbitListener(queues = RabbitMqConfig.HEADER_QUEUE_TWO)
    public void receiveHeaderQueue2(String message) {
        System.out.println("[HeaderQueue2] " + message);
    }
}

실행 결과

  1. 헤더: { format=pdf, type=report }
    • Queue1 수신 (all 조건 만족)
    • Queue2 수신 안됨
  2. 헤더: { format=zip }
    • Queue1 수신 안됨
    • Queue2 수신 (any 조건 만족)
  3. 헤더: { format=txt, type=backup }
    • Queue2 수신

Headers Exchange는 기존 라우팅 키 기반보다 더 세밀한 조건 제어가 가능한 반면, 성능과 복잡도 측면에서는 다소 부담이 될 수 있기 때문에 정말 필요한 경우에만 사용하는 것이 좋습니다. 조건 분기, 다양한 메타데이터 기반 분류, 혹은 라우팅 키 없이 메시지를 라우팅해야 하는 특별한 상황에서 좋은 방안이 될 수 있을것 같습니다.