본문 바로가기

SpringBoot

스프링부트에서 RabbitMQ 메세지 큐로 사용

RabbitMQ란?

Erlang으로 작성된 RabbitMQ로 Erlang을 먼저 설치해야 한다.

도커에서 Eralng이 포함된 이미지로 컨테이너를 만들어주고 있어서 도커를 사용하면 간단히 사용 가능하다.

RabbitMQ 하나로 여러 개의 MSA를 관리할 수 있다.

Producer, Consumer 개념 

Producer는 메세지 큐에 publish(발행)해주는 역할을 한다.

Consumer는 큐에 적재된 메세지를 꺼내서 소비하는 역할을 한다.

 

대시보드에서는 publish를 통해 메세지큐에 메세제를 적재해서 확인할 수 있다.

대시보드에서는 getMessage 명령을 통해 꺼낼 수 있다

 

도커를 사용한 RabbitMQ

docker run -d -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq

# 위 명령어로 실행후 추가로 
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_management

대시보드는 15672, 코어부(엔진포트)를 5672로 둔다.

대시보드와 처리용 포트번호가 다를 때 포트를 2개 이상 주는데 포트번호 중 맨 앞이 1로 시작하는 것은  대시보드용으로 둡니다.

guest 계정으로 사용 방법

guest 계정에서 localhost:15672로 첫 접속 시 기본적으로 아이디와 비번은 guest입니다.

위 계정은 최초 admin계정에 해당하고, 추가로 계정을 생성하고 싶으면

로그인 후 상단 우측에 보면 아래 사진처럼  admin이라는 페이지가 있어서 그곳에서 새로운 계정을 만들 수 있습니다.

 

Queues and Streams 메뉴탭은 큐를 생성하여 관리하는 탭으로 여러 개의 메세지 큐를 사용할 수 있습니다.

아래 사진처럼 Add a new queue에서 큐를 생성할 수 있습니다.

 

사이트 내 Publish message는 큐에 메세지를 넣어줍니다.

 

사이트 내 Get message는 큐에서 메시지를 뽑아내는 것으로  Ack Mode : Reject requeue false로 설정을 바꿔주셔야 합니다.  왜냐하면, requeue true 기본 설정을 가지고 있으면 뺀 메세지를 다시 집어넣기 때문에 빈 메세지 큐를 확인할 수 없습니다.

큐는 순서대로 나오기 때문에 특정 메시지를 지정해서 뽑을 수 가 없습니다.

위 사진처럼 모든 메세지를 다 빼면 큐 내부가 비어있음을 확인해주는 창이 나옵니다.

 

스프링부트를 사용한 RabbitMQ

mvn에서 Spring Boot Starter AMQP 의존성 build.gradle에 추가합니다.

//spring-boot-starter-amqp
implementation 'org.springframework.boot:spring-boot-starter-amqp:3.1.2'

application.yml 설정 파일을 추가합니다.

// application.yml 파일 
spring:
  rabbitmq:
    host: ip주소
    username : 계정명
    password : 비번
    port: 포트번호(관리페이지말고) -> 5672

 

단일 Message : Producer 구현

package com.example.itemservice.util;

import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class ItemProducer {
    // 멤버 변수로 RabbitTemplate을 생성
    private final RabbitTemplate rabbitTemplate;
    
    public void sendTestMessage(String message){

// 서버 내에서 어떤 큐로 보낼 지 세팅 진행:  rabbitTemplate.convertAndSend("대시보드에서 생성한 큐 이름", 보낼 페이지)//
        rabbitTemplate.convertAndSend("ITEM_CREATE_QUEUE","testMessage");
    }
}

 

itemService

package com.example.itemservice.service;

import com.example.itemservice.domain.Item;
import com.example.itemservice.domain.Order;
import com.example.itemservice.dto.RequestCreateItemDto;
import com.example.itemservice.dto.ResponseOrderByItemDto;
import com.example.itemservice.feginClient.ItemtoOrderFeignClient;
import com.example.itemservice.repository.ItemRepository;
import com.example.itemservice.util.CustomFeignException;
import com.example.itemservice.util.ItemProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.web.server.ResponseStatusException;

import java.util.List;
import java.util.Optional;

@Service
@RequiredArgsConstructor

public class ItemService {
    private final ItemRepository itemRepository;
    // producer 추가
    private final ItemProducer producer;

  public void publishTestMessage(String message){
        producer.sendTestMessage(message);
    }


}

 

itemController

package com.example.itemservice.controller;

import com.example.itemservice.dto.RequestCreateItemDto;
import com.example.itemservice.dto.ResponseOrderByItemDto;
import com.example.itemservice.service.ItemService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.core.env.Environment;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
@RequiredArgsConstructor
@RequestMapping("item-service")
public class ItemController {

    private final ItemService itemService;


    // 빈 초기화 확인 - Spring의 의존성 주입을 사용하는 경우 Environment 빈이 올바르게 초기화되었으며 주입 가능하도록 만들었는지 확인하세요.
    // private final로 하여 @RequiredArgsConstructor 사용
    private final Environment env;
    
      @GetMapping("pro-check")
    public String configProCheck(){
        return env.getProperty("pro.file");
    }


    // PathVariable을 이용해서 message를 큐에 적재할 수 있도록 엔드포인트를 직접 설정
    @GetMapping("items/{message}/message")
    public ResponseEntity<?> publishTestMessage(@PathVariable String message){
        itemService.publishTestMessage(message);
        return ResponseEntity.ok().build();
    }

}

 

포스트맨으로 작성한 API를 요청하면 대시보드에서 Payload에서 message가 꺼내지는 것을 볼 수 있습니다.

한글, 영어 모두 가능합니다.

 

 

 

단일 Message :Consumer 구현

Consumer.java

package com.example.itemservice.util;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class ItemConsumer {

    private final ObjectMapper objectMapper;

// Consumer의 경우 메서드 요청 로직 없이 자동으로 진행됩니다.
    // @RabbitListener 어노테이션이 큐에 담긴 것을 인식합니다. 단 내부 로직을 실행하고 다음을 수행한 준비가 될 때만 동작함
    @RabbitListener(queues = "ITEM_CREATE_QUEUE")
    public void getTestMessage(String message){
        System.out.println("큐에서 뽑아낸 메세지:"+ message);

    }
}


자바 객체 : Producer 구현

이번에는 객체를 큐에 적재하겠습니다.

이때, objectMapper을 이용해 역직렬화 과정이 필요합니다.

objectMapper란, Jackson 라이브러리의 ObjectMapper 클래스입니다. JSON 컨텐츠를 Java 객체로 deserialization 하거나 Java 객체를 JSON으로 serialization하는 역할을 합니다.

 

- itemProducer  코드

package com.example.itemservice.util;

import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class ItemProducer {
    // 멤버 변수로 RabbitTemplate을 생성
    private final RabbitTemplate rabbitTemplate;
    
    // 전송한 message가 큐에 적재 
    // java object를 String으로
    public void sendCreateItemMessage(String message){
    
	// 서버 내에서 어떤 큐로 보낼 지 세팅 진행:  rabbitTemplate.convertAndSend("대시보드에서 생성한 큐 이름", 보낼 페이지)
   
        rabbitTemplate.convertAndSend("ITEM_CREATE_QUEUE",message);
    }
}

 

- itemService 코드

package com.example.itemservice.service;

import com.example.itemservice.domain.Item;
import com.example.itemservice.domain.Order;
import com.example.itemservice.dto.RequestCreateItemDto;
import com.example.itemservice.dto.ResponseOrderByItemDto;
import com.example.itemservice.feginClient.ItemtoOrderFeignClient;
import com.example.itemservice.repository.ItemRepository;
import com.example.itemservice.util.CustomFeignException;
import com.example.itemservice.util.ItemProducer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.web.server.ResponseStatusException;

import java.util.List;
import java.util.Optional;

@Service
@RequiredArgsConstructor
public class ItemService {
    private final ItemRepository itemRepository;
    private final ItemtoOrderFeignClient orderFeignClient;
    private final ItemProducer producer;

    // 직렬화와 역직렬화를 담당하는 라이브러리
    private final ObjectMapper objectMapper;

    public void createItem(RequestCreateItemDto ItemDto){
        Item saveItem = ItemDto.toEntity();
        itemRepository.save(saveItem);
    }

    public void publishCreateItemMessage(RequestCreateItemDto itemDto) throws JsonProcessingException {
        // DTO를 json(String)으로 직렬화
        String message = "";
        message = objectMapper.writeValueAsString(itemDto);
        producer.sendCreateItemMessage(message);
    }
}

 

- itemController 코드

package com.example.itemservice.controller;

import com.example.itemservice.dto.RequestCreateItemDto;
import com.example.itemservice.dto.ResponseOrderByItemDto;
import com.example.itemservice.service.ItemService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.core.env.Environment;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
@RequiredArgsConstructor
@RequestMapping("item-service")
public class ItemController {

    private final ItemService itemService;


    // 빈 초기화 확인 - Spring의 의존성 주입을 사용하는 경우 Environment 빈이 올바르게 초기화되었으며 주입 가능하도록 만들었는지 확인하세요.
    // private final로 하여 @RequiredArgsConstructor 사용
    private final Environment env;

    @GetMapping("health-check")
    public String healthCheck(){
        return "item-service service is available";
    }

    // 등록 성공시 201
    @PostMapping("items")
    public ResponseEntity<?> CreateItem(@RequestBody RequestCreateItemDto ItemDto) throws JsonProcessingException {
        // 서비스에 직접적인 처리 로직을 보내지 않고 적절한 요청인 지 확인, 메세지큐로 보내는 역할만 합니다.
        itemService.publishCreateItemMessage(ItemDto);
        // 메세지 큐에서 서비스로 전달하는 로직을 Cunsumer가 직접적으로 합니다.

        return ResponseEntity.ok("메세지 생성 요청 검증 완료");
    }
}

포스트맨으로 json 형식으로 post요청을 받습니다
자바 객체를 String으로 직렬화하여 저장됨을 볼 수 있습니다.

자바 객체 : Cusmuer구현 

내부적인 호출 없이 producer로 큐에 저장된 값을 자동으로 꺼내줍니다.

package com.example.itemservice.util;

import com.example.itemservice.dto.RequestCreateItemDto;
import com.example.itemservice.service.ItemService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class ItemConsumer {

    private final ObjectMapper objectMapper;
    private final ItemService itemService;

    // 어노테이션이 큐에 담긴 것을 인식합니다. 단 내부 로직을 실행하고 다음을 수행한 준비가 될 때만 동작
    // json-> 엔티티
    @RabbitListener(queues = "ITEM_CREATE_QUEUE")
    public void createItem(String message) throws JsonProcessingException { // 큐에서 뽑아낸 message
        // objectMapper.readValue("String형식인 JSON", 목적객체.class == DTO.class);
        RequestCreateItemDto dto = objectMapper.readValue(message, RequestCreateItemDto.class);
        // service단에서 DTO를 입력받아 DB에 INSERT해주는 로직을 호출
        itemService.createItem(dto);
    }
}

 

=