본문 바로가기

Kafka, RabbitMQ

RabbitMQ에 대해 알아보자 - (7) Pub-Sub 패턴 / Fanout Exchange 전략

들어가기 앞서

RabbitMQ의 Publish-Subscribe 패턴은 하나의 메시지를 여러 소비자에게 동시에 전달해야 할 때 사용하는 대표적인 메시징 방식이다.
보통 Fanout Exchange 전략을 활용해 구현하며, 로그 수집, 이벤트 브로드캐스트, 시스템 알림 등에서 자주 사용된다.

Pub-Sub 패턴 / Fanout Exchange 전략

  • 동작 : 하나의 메시지를 여러 소비자에게 동시에 전달
  • Exchange : Fanout Exchange
  • 사용 예제 : 로그 수집, 이벤트 브로드캐스트, 시스템 알림 등

Pub-Sub 모델 / Fanout Exchange 전략

Pub/Sub는 Producer가 메시지를 발행하고, Subscriber가 해당 메시지를 구독하는 방식의 메시징 패턴 입니다. RabbitMQ에서는 Fanout Exchange를 활용하여, 발행된 메시지를 연결된 모든 큐에 브로드캐스트함으로써 이 패턴을 구현할 수 있다. Fanout Exchange에서는 라우팅 키가 필요 없습니다. 단순히 모든 메시지를 모든 큐에 복사해서 전달하는 구조입니다. 

 

Pub/Sub은 프로듀서와 컨슈머를 동시에 동작할 필요가 없으며, 구독자는 비동기로 메시지를 수신합니다. 또한 컨슈머를 언제든지 추가와 제거가 가능하며, 시스템에 영향을 주지 않고 유연하게 확장이 가능합니다.

 

코드는 아래 링크에서 확인해 주시면 감사하겠습니다.

https://www.notion.so/Fanout-Exchange-Code-1ec3565ac5e4805aab81f65c8d789589

 

간단하게 코드를 살펴 보겠습니다.

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class WorkQueueProducer {

    private final RabbitTemplate rabbitTemplate;

    public void send(String workQueueMessage) {
        String message = workQueueMessage;
        rabbitTemplate.convertAndSend(RabbitMqConfig.FANOUT_EXCHANGE, "", message);
    }

}
converAndSend 라이브러리

 

위 코드는 Producer 입니다. API 호출이 되면 프로듀서가 호출되고, 여기서 RabbitTemplate를 통해 Fanout Exchange에 바인딩 된 모든 큐에 메시지를 보내게 됩니다. 이때, 두번째 파라미터는 라우팅 키를 지정하는 부분 입니다. Fanout Exchange는 라우팅 키와 상관 없이 바인딩 된 모든 큐에 메시지를 전달 합니다. 따라서 저렇게 공백을 넣어도 동작에 이상이 없습니다.

 

 

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitMqConfig {

    public static final String Queue_Name = "workqueue1";
    public static final String Queue_Name2 = "workqueue2";
    public static final String Queue_Name3 = "workqueue3";
    public static final String FANOUT_EXCHANGE = "fanoutExchange";

    @Bean
    public Queue queue() {
        return new Queue(Queue_Name, true);
    }

    @Bean
    public Queue queue2() {
        return new Queue(Queue_Name2, true);
    }

    @Bean
    public Queue queue3() {
        return new Queue(Queue_Name3, true);
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        // 메시지를 수신하면 연결된 모든 큐로 브로드캐스트
        return new FanoutExchange(FANOUT_EXCHANGE);
    }


    @Bean
    public Binding javaBinding(Queue queue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }


    @Bean
    public Binding vueBinding(Queue queue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }


    @Bean
    public Binding springBinding(Queue queue3, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue3).to(fanoutExchange);
    }


}

 

위 코드는 RabbitMQ에 대한 Config 코드 입니다. 큐를 3개 생성하고 Fanout Exchange에 모두 바인딩 하였습니다. 

 

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class WorkQueueConsumer {

    @RabbitListener(queues = RabbitMqConfig.Queue_Name)
    public void workQueueTask(String message) {
        System.out.println("# Consumer 1 : " + message);
    }

    @RabbitListener(queues = RabbitMqConfig.Queue_Name2)
    public void workQueueTask2(String message) {
        System.out.println("# Consumer 2 : " + message);
    }

    @RabbitListener(queues = RabbitMqConfig.Queue_Name3)
    public void workQueueTask3(String message) {
        System.out.println("# Consumer 3 : " + message);
    }

}

 

그리고 각 Consumer에서는 전달 받은 메시지를 출력하도록 구성하였습니다.

 

Fanout Exchange 실행 결과

 

실행 결과를 살펴보면, 모든 큐에 메시지가 동일하게 전달 되는 것을 확인할 수 있습니다.