rabbitmq

Spring Boot EP 18:使用RabbitMQ,資料不漏接

前言

實際上以本系列的範例,是沒有Message Queue (以下簡稱MQ)的需求,但活用MQ其實是很重要的技能,MQ可以讓系統做到:

  1. 將MQ作為Buffer,讓系統慢慢消化不斷進來的資訊。
  2. 透過MQ來保證資料不會因為來不及處理而遺失。
  3. 非同步作業。
  4. 平行處理。
  5. …等等

本篇使用RabbitMQ (官方網站),以存取MQ作為範例,將每天收盤價放到MQ裡,並使用一個Restful API來取得Queue內的資料。

實作 – 將資料(每日收盤價)寫入MQ

步驟1:在pom.xml加入spring-boot-starter-amqp。

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

步驟2:在application.properties寫入RabbitMQ的設定參數。

# IP或fqdn
spring.rabbitmq.host=127.0.0.1
# 連接埠
spring.rabbitmq.port=5672
# 帳號
spring.rabbitmq.username=guest
# 密碼
spring.rabbitmq.password=guest

步驟3:在”Repositories/StockRepository.java”裡增加從資料庫取得收盤資料的程式。

// 從資料庫取得一段時間區間的收盤價,並用字串接起來
@Query(value = "SELECT CONCAT(STOCK_ID, '|', DATE_FORMAT(DATE_OF_TRADING, '%Y-%m-%d'), '|', CAST(CLOSE_PRICE AS CHAR)) FROM STOCK_MARKET.STOCKS_PRICE WHERE STOCK_ID=:STOCK_ID AND DATE_OF_TRADING BETWEEN :START_DATE AND :END_DATE ORDER BY DATE_OF_TRADING;", nativeQuery = true)
public List<String> getStockClosePriceWithDate(@Param("STOCK_ID") String STOCK_ID, @Param("START_DATE") String START_DATE, @Param("END_DATE") String END_DATE);

步驟4:在”Controllers/StockController.java”裡增加一個Controller。

@Autowired
// 注入RabbitTemplate
RabbitTemplate rabbitTemplate;

@GetMapping("stocks/price/{id}/{star}/{end}")
// 從資料庫取得一段時間的收盤資料,並寫入Queue中
public RspBody setStockPriceIntoMq(@PathVariable("id") String strId, @PathVariable("star") String dateStar, @PathVariable("end") String dateEnd) {

    // 取得指定股票代碼與區間的收盤價
    List<String> listPrice = stockRepository.getStockClosePriceWithDate(strId, dateStar, dateEnd);

    for (String price : listPrice) {
        // 寫入Queue
        rabbitTemplate.convertAndSend("stock_price", price);
    }

    return new RspBody("0000", "Success", "OK.");
}

步驟5:呼叫步驟4的API,將資料寫入MQ。

步驟6:可以從RabbitMQ的後台看到透過API將收盤價寫入的數量。

實作 – 取用MQ內的每日收盤價資料

步驟1:在”Controllers/StockController.java”裡增加一個Controller。

@GetMapping("stocks/price")
// 從Queue取得收盤價資料
public RspBody getStockPriceFromMq() {

    // 從名為stock_price的Queue取得資料(dequeue)
    Message message = rabbitTemplate.receive("stock_price");

    // 檢查是否有取得資料
    if (message != null) {
        // 將自Queue取得的資料(型態為Bytes)轉成String,方便輸出
        String strMessageBoby = new String(message.getBody(), StandardCharsets.UTF_8);
        return new RspBody("0000", "Success", strMessageBoby);
    } else {
        return new RspBody("0000", "Success", "Queue size = 0.");
    }
}

步驟2:透過步驟1建立的API取得MQ的資料。

步驟3:可以看到Queue裡面的長度減少了,原本20,Dequeue一次後剩下19。

~ END ~


, ,

Related posts

Latest posts