Spring
Spring
Message Queue

메시지 큐

Spring에서의 동기 처리는 API Gateway -> API Server -> 가맹점 Server -> 가맹점 파트너의 응답 -> 가맹점 Server -> API Server -> API Gateway 순으로 이루어진다. 이때 가맹점 파트너의 응답이 늦어지면 가맹점 Server는 응답을 받지 못하고 계속 대기하게 되는데, 이를 방지하기 위해 비동기 처리를 사용한다.
메시지 큐를 사용하기 위해서 메시지큐를 사용하게 되는데 메시지 큐를 사용하게 되면 User 요청 -> Queue에 스택으로 쌓이고 가맹점 파트너의 응답이 오면 Queue에서 꺼내서 처리하게 된다. 이때 가맹점 파트너의 처리가 완료된 시점으로 Push Notifiaction을 통해서 Client에서 처리가 완료되었음을 알려줄 수 있다.

RabbitMQ

  1. RabbitMQ는 AMQP(Advanced Message Queuing Protocol)를 기반으로 작동하며, 대규모 분산 시스템에서 사용되는 메시지 큐 서비스를 제공한다.
  2. RabbitMQ는 프로듀서(메시지를 생성하는 어플리케이션)와 컨슈머(메시지를 소비하는 어플리케이션) 간의 비동기적인 통신을 용이하게 한다.
ℹ️

메시지 브로커 : 송신자와 수신자 간의 효율적인 메시지 전달을 중개하는 역할을 담당

Docker Compose

도커 컴포즈 파일을 만들고 👇

docker-compose.yaml
version: '3.7'
services:
  rabbitmq:
    image: rabbitmq:latest
    ports:
      - "5672:5672" # rabbit amap port
      - "15672:15672" # manage port
    environment:
      - RABBITMQ_DEFAULT_USER=admin # 기본사용자 이름 
      - RABBITMQ_DEFAULT_PASS=df159357 # 기본사용자 비밀번호

컴포즈 명령어로 도커를 실행 👇

sudo docker compose up -d 
ℹ️

-d : 백그라운드에서 실행

실행된 컨테이너에 접속해서 👇 명령어 실행.
해당 명령어는 RabbitMQ 서버에서 관리 플러그인을 활성화하는 명령어임.

rabbitmq-plugins enable rabbitmq_management

해당 명령어를 활성화하면 다음👇과 같이 RabbitMQ 관리자 화면에 접속할 수 있음.

RabbitMQ 로직

Spring과 연동

Producer

RabbitMqConfig 추가

MessageConver는 바로 아래에 있고 ConnectionFactory는 application.yaml에 작성함.

@Configuration
public class RabbitMqConfig {
 
  @Bean
  /// Exchnage 생성
  public DirectExchange directExchange() {
    return new DirectExchange("delivery.exchange");
  }
 
  @Bean
  /// Queue 생성
  public Queue queue() {
    return new Queue("delivery.queue");
  }
 
  @Bean
  /// Excnage랑 Queue를 delivery.key로 binding
  public Binding binding(DirectExchange directExchange, Queue queue) {
    return BindingBuilder.bind(queue)
        .to(directExchange)
        .with("delivery.key");
  }
 
  @Bean
  /// RabbitTemplate : Object <-> Json 해주는 역할
  public RabbitTemplate rabbitTemplate(
      /// rabbit.connection.ConnectionFactory
      /// application.yaml 파일에 작성
      ConnectionFactory connectionFactory,
      /// amqp.support.converter.MessageConverter
      MessageConverter messageConverter
  ) {
 
    var rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(messageConverter);
    return rabbitTemplate;
  }
 
  @Bean
  /// Spring이 Bean으로 등록되어 있는 ObjectMapper를 사용
  public MessageConverter messageConverter(ObjectMapper objectMapper) {
    return new Jackson2JsonMessageConverter(objectMapper);
  }
}

application.yaml

spring:
  rabbitmq:
    host: # rabbit mq server ip
    port: 5672
    username: # 내가 설정한 username
    password: # password
  
  ... other code ...

Producer 추가

실제 Controller등이나 Service에서 사용할 Producer를 추가함.
‼️ exchange, routekey는 반드시 RabbitMqConfig에서 작성한 것과 동일하게 작성해야 함.

@Component
@RequiredArgsConstructor
public class Producer {
  private final RabbitTemplate rabbitTemplate;
  public void send(String exchange, String routekey, Object object) {
    rabbitTemplate.convertAndSend(exchange, routekey, object);
  }
}

Consumer

Consumer는 Producer 보다 더 간단하다.
Queue나 Exchange, binding 등을 설정할 필요가 없어서 MessageConverter만 설정해주면 된다.

storeadmin/config/rabbitmq/RabbitMqConfig.java
@Configuration
public class RabbitMqConfig {
  @Bean
  public MessageConverter messageConverter(ObjectMapper objectMapper) {
    return new Jackson2JsonMessageConverter(objectMapper);
  }
}
storeadmin/domain/userorder/consumer/UserOrderConsumer.java
@Component
@Slf4j
public class UserOrderConsumer {
  @RabbitListener(queues = "delivery.queue")
  public void consume(
      UserOrderMessage userOrderMessage
  ) {
    log.info("userOrderMessage : {}", userOrderMessage);
  }
}

상세 구현 Issue Link

https://github.com/rookedsysc/delivery-app/issues/11 (opens in a new tab)

Server Side Envents(SSE)란?

단방향 통신을 통해 서버에서 클라이언트로 실시간 이벤트를 전송하는 웹 기술이다.

  1. 일반적인 웹 소켓과 비교하면, SSE는 단방향 통신만을 지원하며, 추가적인 설정 없이도 웹 브라우저에서 내장된 기능으로 지원됩니다.
  2. SSE는 텍스트 기반 형식으로 데이터를 전송한다. (JSON)
  3. SSE는 기존의 HTTP 연결을 재사용하여 데이터를 전송한다. 따라서 별도의 특별한 프로토콜이나 서버구성이 필요하지 않다.

SSE with RabbitMQ

초기화 로직이 중요하다 Sse를 사용해서

@Slf4j
@RequiredArgsConstructor
@RestController
@RequestMapping("/api/sse")
public class SseApiController {
 
  /// Thread Safe한 자료구조
  /// TODO: ConcurrentHashMap에 대해 공부
  private static final Map<String, SseEmitter> userConnection = new ConcurrentHashMap<>();
 
  @GetMapping(path = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  public ResponseBodyEmitter connect(
      @Parameter(hidden = true) String userId,
      @AuthenticationPrincipal UserSession userSesssion
  ) {
    log.info("login user {}", userSesssion);
 
    /// milisecond 단위로 timeout 시간 설정가능
    var emitter = new SseEmitter();
    userConnection.put(userSesssion.getUserId()
        .toString(), emitter);
 
 
    /// Default Timeout: 30s
    emitter.onTimeout(() -> {
      log.info("on timeout");
      emitter.complete();
    });
 
    /// 연결이 종료될 때(timeout 등으로) 해줘야 하는 작업
    emitter.onCompletion(() -> {
      log.info("on completion");
      /// connection 제거
      userConnection.remove(userSesssion.getUserId()
          .toString());
    });
 
    /// 최초 연결시 응답 전송
    var event = SseEmitter.event()
        .name("connect")
        .data("connected");
    try {
      emitter.send(event);
    } catch (IOException e) {
      emitter.completeWithError(e);
    }
 
    return emitter;
  }
 
  @GetMapping("/push-event")
  public void pushEvent(@Parameter(hidden = true) @AuthenticationPrincipal UserSession userSession) {
    /// 기존에 연결된 유저 찾기
    var emitter = userConnection.get(userSession.getUserId()
        .toString());
 
    var event = SseEmitter.event()
        .data("hello"); // onmessage
 
    try {
      emitter.send(event);
    } catch (IOException e) {
      emitter.completeWithError(e);
    }
  }
}

Sse Consumer 단순화

SseConnectPool 생성

sse connection 상태를 관리하기 위한 SseConnectPool 객체를 생성한다.

/domain/sse/connection/SseConnectPool
@Component
/// UserSseConnection은 매번 new로 생성되는 객체이고
/// SseConnectionPool은 Spring이 관리하는 하나만 존재하는 static 객체임
public class SseConnectionPool implements ConnectionPoolIfs<String, UserSseConnection> {
  private final Map<String, UserSseConnection> connectionPool = new ConcurrentHashMap<>();
 
  @Override
  public void addSession(String uniqueKey, UserSseConnection userSseConnection) {
    connectionPool.put(uniqueKey, userSseConnection);
  }
 
  @Override
  public UserSseConnection getSession(String uniqueKey) {
    return connectionPool.get(uniqueKey);
  }
 
  @Override
  public void onCompletionCallback(UserSseConnection session) {
    connectionPool.remove(session.getUniqueKey());
  }
}

SseConnection 생성

실제로 맺어져 있는 Connection에 해당하는 UserSseConnection 객체를 생성한다.

/domain/sse/connection/model/UserSseConnection
@Getter
@ToString
@EqualsAndHashCode
@RequiredArgsConstructor
public class UserSseConnection {
  private final String uniqueKey;
  private final SseEmitter sseEmitter;
  private final ConnectionPoolIfs<String, UserSseConnection> connectionPoolIfs;
  private final ObjectMapper objectMapper;
 
  public static UserSseConnection connect(
      String uniqueKey,
      ConnectionPoolIfs<String, UserSseConnection> connectionPoolIfs,
      ObjectMapper objectMapper
  ) {
    return new UserSseConnection(uniqueKey, connectionPoolIfs, objectMapper);
  }
 
  private UserSseConnection(
      String uniqueKey,
      ConnectionPoolIfs<String, UserSseConnection> connectionPoolIfs,
      ObjectMapper objectMapper
  ) {
    this.uniqueKey = uniqueKey;
    this.connectionPoolIfs = connectionPoolIfs;
    this.objectMapper = objectMapper;
 
    this.sseEmitter = new SseEmitter(60 * 1000L);
 
    this.sseEmitter.onCompletion(() -> {
      this.connectionPoolIfs.onCompletionCallback(this);
    });
 
    this.sseEmitter.onTimeout(() -> {
      this.sseEmitter.complete();
    });
 
    /// sse가 초기화되면 반드시 onopen 메시지를 처리해야함
    sendMessage("onopen", "connected");
  }
 
  public void sendMessage(String eventName, Object data) {
 
    try {
      var json = this.objectMapper.writeValueAsString(data);
      var event = SseEmitter.event()
          .name(eventName)
          .data(json);
      this.sseEmitter.send(event);
    } catch (IOException e) {
      this.sseEmitter.completeWithError(e);
    }
  }
 
  public void sendMessage(Object data) {
 
    try {
      var json = this.objectMapper.writeValueAsString(data);
      var event = SseEmitter.event()
          .data(json);
      this.sseEmitter.send(event);
    } catch (IOException e) {
      this.sseEmitter.completeWithError(e);
    }
  }
}

SseApiController

SseController에서 userConnection을 초기화하고 event를 수정하던 로직을 위에서 생성한 객체를 이용해서 단순화 한다.

@Slf4j
@RequiredArgsConstructor
@RestController
@RequestMapping("/api/sse")
public class SseApiController {
  /// Thread Safe한 자료구조
  private final SseConnectionPool sseConnectionPool;
  private final ObjectMapper objectMapper;
 
  @GetMapping(path = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  public ResponseBodyEmitter connect(
      @Parameter(hidden = true) String userId,
      @AuthenticationPrincipal UserSession userSesssion
  ) {
    log.info("login user {}", userSesssion);
 
    /// userSseConnection 생성
    var userSseConnection = UserSseConnection.connect(userSesssion.getStoreId()
        .toString(), sseConnectionPool, objectMapper);
 
    /// connection pool에 session 추가 
    sseConnectionPool.addSession(userSseConnection.getUniqueKey(), userSseConnection);
 
    return userSseConnection.getSseEmitter();
  }
 
  @GetMapping("/push-event")
  public void pushEvent(@Parameter(hidden = true) @AuthenticationPrincipal UserSession userSession) {
    var userSseConnection = sseConnectionPool.getSession(userSession.getStoreId()
        .toString());
 
    Optional.ofNullable(userSseConnection)
        .ifPresent(it -> {
          it.sendMessage("hello world");
        });
  }
}

Producer의 Consumer 수정

Producer의 UserOrderConsumer 부분에서 주문이 들어오면 주문을 Message Queue에 넣게 되고
Consumer의 RabbitListener에서 이를 듣고 있다가 Message를 send해주게 수정해준다.

기존 코드 👇

@Component
@Slf4j
public class UserOrderConsumer {
  /// 그냥 듣고만 있는 코드 
  @RabbitListener(queues = "delivery.queue")
  public void consume(
      UserOrderMessage userOrderMessage
  ) {
    log.info("userOrderMessage : {}", userOrderMessage);
  }
}

수정한 코드 👇

@Component
@Slf4j
@RequiredArgsConstructor
public class UserOrderConsumer {
  private final UserOrderBusiness userOrderBusiness;
 
  @RabbitListener(queues = "delivery.queue")
  public void consume(
      UserOrderMessage userOrderMessage
  ) {
    log.info("userOrderMessage : {}", userOrderMessage);
 
    try {
      userOrderBusiness.pushUserOrder(userOrderMessage);
    } catch (Exception e) {
      log.error("consume error : {}", e.getMessage());
    }
  }
}