Spring Boot EP 18:使用RabbitMQ,資料不漏接
前言
實際上以本系列的範例,是沒有Message Queue (以下簡稱MQ)的需求,但活用MQ其實是很重要的技能,MQ可以讓系統做到:
- 將MQ作為Buffer,讓系統慢慢消化不斷進來的資訊。
- 透過MQ來保證資料不會因為來不及處理而遺失。
- 非同步作業。
- 平行處理。
- …等等
本篇使用RabbitMQ (官方網站),以存取MQ作為範例,將每天收盤價放到MQ裡,並使用一個Restful API來取得Queue內的資料。
實作 – 將資料(每日收盤價)寫入MQ
步驟1:在pom.xml加入spring-boot-starter-amqp。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
步驟2:在application.properties寫入RabbitMQ的設定參數。
# IP或fqdn
spring.rabbitmq.host=127.0.0.1
# 連接埠
spring.rabbitmq.port=5672
# 帳號
spring.rabbitmq.username=guest
# 密碼
spring.rabbitmq.password=guest
步驟3:在”Repositories/StockRepository.java”裡增加從資料庫取得收盤資料的程式。
// 從資料庫取得一段時間區間的收盤價,並用字串接起來
@Query(value = "SELECT CONCAT(STOCK_ID, '|', DATE_FORMAT(DATE_OF_TRADING, '%Y-%m-%d'), '|', CAST(CLOSE_PRICE AS CHAR)) FROM STOCK_MARKET.STOCKS_PRICE WHERE STOCK_ID=:STOCK_ID AND DATE_OF_TRADING BETWEEN :START_DATE AND :END_DATE ORDER BY DATE_OF_TRADING;", nativeQuery = true)
public List<String> getStockClosePriceWithDate(@Param("STOCK_ID") String STOCK_ID, @Param("START_DATE") String START_DATE, @Param("END_DATE") String END_DATE);
步驟4:在”Controllers/StockController.java”裡增加一個Controller。
@Autowired
// 注入RabbitTemplate
RabbitTemplate rabbitTemplate;
@GetMapping("stocks/price/{id}/{star}/{end}")
// 從資料庫取得一段時間的收盤資料,並寫入Queue中
public RspBody setStockPriceIntoMq(@PathVariable("id") String strId, @PathVariable("star") String dateStar, @PathVariable("end") String dateEnd) {
// 取得指定股票代碼與區間的收盤價
List<String> listPrice = stockRepository.getStockClosePriceWithDate(strId, dateStar, dateEnd);
for (String price : listPrice) {
// 寫入Queue
rabbitTemplate.convertAndSend("stock_price", price);
}
return new RspBody("0000", "Success", "OK.");
}
步驟5:呼叫步驟4的API,將資料寫入MQ。
步驟6:可以從RabbitMQ的後台看到透過API將收盤價寫入的數量。
實作 – 取用MQ內的每日收盤價資料
步驟1:在”Controllers/StockController.java”裡增加一個Controller。
@GetMapping("stocks/price")
// 從Queue取得收盤價資料
public RspBody getStockPriceFromMq() {
// 從名為stock_price的Queue取得資料(dequeue)
Message message = rabbitTemplate.receive("stock_price");
// 檢查是否有取得資料
if (message != null) {
// 將自Queue取得的資料(型態為Bytes)轉成String,方便輸出
String strMessageBoby = new String(message.getBody(), StandardCharsets.UTF_8);
return new RspBody("0000", "Success", strMessageBoby);
} else {
return new RspBody("0000", "Success", "Queue size = 0.");
}
}
步驟2:透過步驟1建立的API取得MQ的資料。
步驟3:可以看到Queue裡面的長度減少了,原本20,Dequeue一次後剩下19。
~ END ~