RabbitMQ란?
Erlang으로 작성된 RabbitMQ로 Erlang을 먼저 설치해야 한다.
도커에서 Eralng이 포함된 이미지로 컨테이너를 만들어주고 있어서 도커를 사용하면 간단히 사용 가능하다.
RabbitMQ 하나로 여러 개의 MSA를 관리할 수 있다.
Producer, Consumer 개념
Producer는 메세지 큐에 publish(발행)해주는 역할을 한다.
Consumer는 큐에 적재된 메세지를 꺼내서 소비하는 역할을 한다.
대시보드에서는 publish를 통해 메세지큐에 메세제를 적재해서 확인할 수 있다.
대시보드에서는 getMessage 명령을 통해 꺼낼 수 있다
도커를 사용한 RabbitMQ
docker run -d -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq
# 위 명령어로 실행후 추가로
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_management
대시보드는 15672, 코어부(엔진포트)를 5672로 둔다.
guest 계정으로 사용 방법
guest 계정에서 localhost:15672로 첫 접속 시 기본적으로 아이디와 비번은 guest입니다.
위 계정은 최초 admin계정에 해당하고, 추가로 계정을 생성하고 싶으면
로그인 후 상단 우측에 보면 아래 사진처럼 admin이라는 페이지가 있어서 그곳에서 새로운 계정을 만들 수 있습니다.
Queues and Streams 메뉴탭은 큐를 생성하여 관리하는 탭으로 여러 개의 메세지 큐를 사용할 수 있습니다.
아래 사진처럼 Add a new queue에서 큐를 생성할 수 있습니다.
사이트 내 Publish message는 큐에 메세지를 넣어줍니다.
사이트 내 Get message는 큐에서 메시지를 뽑아내는 것으로 Ack Mode : Reject requeue false로 설정을 바꿔주셔야 합니다. 왜냐하면, requeue true 기본 설정을 가지고 있으면 뺀 메세지를 다시 집어넣기 때문에 빈 메세지 큐를 확인할 수 없습니다.
큐는 순서대로 나오기 때문에 특정 메시지를 지정해서 뽑을 수 가 없습니다.
위 사진처럼 모든 메세지를 다 빼면 큐 내부가 비어있음을 확인해주는 창이 나옵니다.
스프링부트를 사용한 RabbitMQ
mvn에서 Spring Boot Starter AMQP 의존성 build.gradle에 추가합니다.
//spring-boot-starter-amqp
implementation 'org.springframework.boot:spring-boot-starter-amqp:3.1.2'
application.yml 설정 파일을 추가합니다.
// application.yml 파일
spring:
rabbitmq:
host: ip주소
username : 계정명
password : 비번
port: 포트번호(관리페이지말고) -> 5672
단일 Message : Producer 구현
package com.example.itemservice.util;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class ItemProducer {
// 멤버 변수로 RabbitTemplate을 생성
private final RabbitTemplate rabbitTemplate;
public void sendTestMessage(String message){
// 서버 내에서 어떤 큐로 보낼 지 세팅 진행: rabbitTemplate.convertAndSend("대시보드에서 생성한 큐 이름", 보낼 페이지)//
rabbitTemplate.convertAndSend("ITEM_CREATE_QUEUE","testMessage");
}
}
itemService
package com.example.itemservice.service;
import com.example.itemservice.domain.Item;
import com.example.itemservice.domain.Order;
import com.example.itemservice.dto.RequestCreateItemDto;
import com.example.itemservice.dto.ResponseOrderByItemDto;
import com.example.itemservice.feginClient.ItemtoOrderFeignClient;
import com.example.itemservice.repository.ItemRepository;
import com.example.itemservice.util.CustomFeignException;
import com.example.itemservice.util.ItemProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.web.server.ResponseStatusException;
import java.util.List;
import java.util.Optional;
@Service
@RequiredArgsConstructor
public class ItemService {
private final ItemRepository itemRepository;
// producer 추가
private final ItemProducer producer;
public void publishTestMessage(String message){
producer.sendTestMessage(message);
}
}
itemController
package com.example.itemservice.controller;
import com.example.itemservice.dto.RequestCreateItemDto;
import com.example.itemservice.dto.ResponseOrderByItemDto;
import com.example.itemservice.service.ItemService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.core.env.Environment;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
@RestController
@RequiredArgsConstructor
@RequestMapping("item-service")
public class ItemController {
private final ItemService itemService;
// 빈 초기화 확인 - Spring의 의존성 주입을 사용하는 경우 Environment 빈이 올바르게 초기화되었으며 주입 가능하도록 만들었는지 확인하세요.
// private final로 하여 @RequiredArgsConstructor 사용
private final Environment env;
@GetMapping("pro-check")
public String configProCheck(){
return env.getProperty("pro.file");
}
// PathVariable을 이용해서 message를 큐에 적재할 수 있도록 엔드포인트를 직접 설정
@GetMapping("items/{message}/message")
public ResponseEntity<?> publishTestMessage(@PathVariable String message){
itemService.publishTestMessage(message);
return ResponseEntity.ok().build();
}
}
포스트맨으로 작성한 API를 요청하면 대시보드에서 Payload에서 message가 꺼내지는 것을 볼 수 있습니다.
한글, 영어 모두 가능합니다.
단일 Message :Consumer 구현
Consumer.java
package com.example.itemservice.util;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class ItemConsumer {
private final ObjectMapper objectMapper;
// Consumer의 경우 메서드 요청 로직 없이 자동으로 진행됩니다.
// @RabbitListener 어노테이션이 큐에 담긴 것을 인식합니다. 단 내부 로직을 실행하고 다음을 수행한 준비가 될 때만 동작함
@RabbitListener(queues = "ITEM_CREATE_QUEUE")
public void getTestMessage(String message){
System.out.println("큐에서 뽑아낸 메세지:"+ message);
}
}
자바 객체 : Producer 구현
이번에는 객체를 큐에 적재하겠습니다.
이때, objectMapper을 이용해 역직렬화 과정이 필요합니다.
objectMapper란, Jackson 라이브러리의 ObjectMapper 클래스입니다. JSON 컨텐츠를 Java 객체로 deserialization 하거나 Java 객체를 JSON으로 serialization하는 역할을 합니다.
- itemProducer 코드
package com.example.itemservice.util;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class ItemProducer {
// 멤버 변수로 RabbitTemplate을 생성
private final RabbitTemplate rabbitTemplate;
// 전송한 message가 큐에 적재
// java object를 String으로
public void sendCreateItemMessage(String message){
// 서버 내에서 어떤 큐로 보낼 지 세팅 진행: rabbitTemplate.convertAndSend("대시보드에서 생성한 큐 이름", 보낼 페이지)
rabbitTemplate.convertAndSend("ITEM_CREATE_QUEUE",message);
}
}
- itemService 코드
package com.example.itemservice.service;
import com.example.itemservice.domain.Item;
import com.example.itemservice.domain.Order;
import com.example.itemservice.dto.RequestCreateItemDto;
import com.example.itemservice.dto.ResponseOrderByItemDto;
import com.example.itemservice.feginClient.ItemtoOrderFeignClient;
import com.example.itemservice.repository.ItemRepository;
import com.example.itemservice.util.CustomFeignException;
import com.example.itemservice.util.ItemProducer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.web.server.ResponseStatusException;
import java.util.List;
import java.util.Optional;
@Service
@RequiredArgsConstructor
public class ItemService {
private final ItemRepository itemRepository;
private final ItemtoOrderFeignClient orderFeignClient;
private final ItemProducer producer;
// 직렬화와 역직렬화를 담당하는 라이브러리
private final ObjectMapper objectMapper;
public void createItem(RequestCreateItemDto ItemDto){
Item saveItem = ItemDto.toEntity();
itemRepository.save(saveItem);
}
public void publishCreateItemMessage(RequestCreateItemDto itemDto) throws JsonProcessingException {
// DTO를 json(String)으로 직렬화
String message = "";
message = objectMapper.writeValueAsString(itemDto);
producer.sendCreateItemMessage(message);
}
}
- itemController 코드
package com.example.itemservice.controller;
import com.example.itemservice.dto.RequestCreateItemDto;
import com.example.itemservice.dto.ResponseOrderByItemDto;
import com.example.itemservice.service.ItemService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.core.env.Environment;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
@RestController
@RequiredArgsConstructor
@RequestMapping("item-service")
public class ItemController {
private final ItemService itemService;
// 빈 초기화 확인 - Spring의 의존성 주입을 사용하는 경우 Environment 빈이 올바르게 초기화되었으며 주입 가능하도록 만들었는지 확인하세요.
// private final로 하여 @RequiredArgsConstructor 사용
private final Environment env;
@GetMapping("health-check")
public String healthCheck(){
return "item-service service is available";
}
// 등록 성공시 201
@PostMapping("items")
public ResponseEntity<?> CreateItem(@RequestBody RequestCreateItemDto ItemDto) throws JsonProcessingException {
// 서비스에 직접적인 처리 로직을 보내지 않고 적절한 요청인 지 확인, 메세지큐로 보내는 역할만 합니다.
itemService.publishCreateItemMessage(ItemDto);
// 메세지 큐에서 서비스로 전달하는 로직을 Cunsumer가 직접적으로 합니다.
return ResponseEntity.ok("메세지 생성 요청 검증 완료");
}
}
자바 객체 : Cusmuer구현
내부적인 호출 없이 producer로 큐에 저장된 값을 자동으로 꺼내줍니다.
package com.example.itemservice.util;
import com.example.itemservice.dto.RequestCreateItemDto;
import com.example.itemservice.service.ItemService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class ItemConsumer {
private final ObjectMapper objectMapper;
private final ItemService itemService;
// 어노테이션이 큐에 담긴 것을 인식합니다. 단 내부 로직을 실행하고 다음을 수행한 준비가 될 때만 동작
// json-> 엔티티
@RabbitListener(queues = "ITEM_CREATE_QUEUE")
public void createItem(String message) throws JsonProcessingException { // 큐에서 뽑아낸 message
// objectMapper.readValue("String형식인 JSON", 목적객체.class == DTO.class);
RequestCreateItemDto dto = objectMapper.readValue(message, RequestCreateItemDto.class);
// service단에서 DTO를 입력받아 DB에 INSERT해주는 로직을 호출
itemService.createItem(dto);
}
}
'SpringBoot' 카테고리의 다른 글
카프카를 활용한 메세지 큐 (0) | 2024.04.02 |
---|---|
artillery 스트레스 테스트 (0) | 2024.02.29 |
스프링 시큐리티 5.X에서 6버전으로 변경 (0) | 2023.12.31 |
메세지큐 톺아보기 (2) | 2023.12.07 |
프로젝트 설정 : DB 연동 및 민감한 정보 마스킹 (0) | 2023.10.25 |