'使用Redis實現延時任務'

Redis MySQL PowerPoint 設計 Java架構師CAT 2019-08-25
"

前提

最近在生產環境剛好遇到了延時任務的場景,調研了一下目前主流的方案,分析了一下優劣並且敲定了最終的方案。這篇文章記錄了調研的過程,以及初步方案的實現。

候選方案對比

下面是想到的幾種實現延時任務的方案,總結了一下相應的優勢和劣勢。

"

前提

最近在生產環境剛好遇到了延時任務的場景,調研了一下目前主流的方案,分析了一下優劣並且敲定了最終的方案。這篇文章記錄了調研的過程,以及初步方案的實現。

候選方案對比

下面是想到的幾種實現延時任務的方案,總結了一下相應的優勢和劣勢。

使用Redis實現延時任務

如果應用的數據量不高,實時性要求比較低,選用調度框架和MySQL進行短間隔輪詢這個方案是最優的方案。但是筆者遇到的場景數據量相對比較大,實時性並不高,採用掃庫的方案一定會對MySQL實例造成比較大的壓力。記得很早之前,看過一個PPT叫《盒子科技聚合支付系統演進》,其中裡面有一張圖片給予筆者一點啟發:

"

前提

最近在生產環境剛好遇到了延時任務的場景,調研了一下目前主流的方案,分析了一下優劣並且敲定了最終的方案。這篇文章記錄了調研的過程,以及初步方案的實現。

候選方案對比

下面是想到的幾種實現延時任務的方案,總結了一下相應的優勢和劣勢。

使用Redis實現延時任務

如果應用的數據量不高,實時性要求比較低,選用調度框架和MySQL進行短間隔輪詢這個方案是最優的方案。但是筆者遇到的場景數據量相對比較大,實時性並不高,採用掃庫的方案一定會對MySQL實例造成比較大的壓力。記得很早之前,看過一個PPT叫《盒子科技聚合支付系統演進》,其中裡面有一張圖片給予筆者一點啟發:

使用Redis實現延時任務

裡面剛好用到了調度框架和Redis進行短間隔輪詢實現延時任務的方案,不過為了分攤應用的壓力,圖中的方案還做了分片處理。鑑於筆者當前業務緊迫,所以在第一期的方案暫時不考慮分片,只做了一個簡化版的實現。

由於PPT中沒有任何的代碼或者框架貼出,有些需要解決的技術點需要自行思考,下面會重現一次整個方案實現的詳細過程。

場景設計

實際的生產場景是筆者負責的某個系統需要對接一個外部的資金方,每一筆資金下單後需要延時30分鐘推送對應的附件。這裡簡化為一個訂單信息數據延遲處理的場景,就是每一筆下單記錄一條訂單消息(暫時叫做OrderMessage),訂單消息需要延遲5到15秒後進行異步處理。

"

前提

最近在生產環境剛好遇到了延時任務的場景,調研了一下目前主流的方案,分析了一下優劣並且敲定了最終的方案。這篇文章記錄了調研的過程,以及初步方案的實現。

候選方案對比

下面是想到的幾種實現延時任務的方案,總結了一下相應的優勢和劣勢。

使用Redis實現延時任務

如果應用的數據量不高,實時性要求比較低,選用調度框架和MySQL進行短間隔輪詢這個方案是最優的方案。但是筆者遇到的場景數據量相對比較大,實時性並不高,採用掃庫的方案一定會對MySQL實例造成比較大的壓力。記得很早之前,看過一個PPT叫《盒子科技聚合支付系統演進》,其中裡面有一張圖片給予筆者一點啟發:

使用Redis實現延時任務

裡面剛好用到了調度框架和Redis進行短間隔輪詢實現延時任務的方案,不過為了分攤應用的壓力,圖中的方案還做了分片處理。鑑於筆者當前業務緊迫,所以在第一期的方案暫時不考慮分片,只做了一個簡化版的實現。

由於PPT中沒有任何的代碼或者框架貼出,有些需要解決的技術點需要自行思考,下面會重現一次整個方案實現的詳細過程。

場景設計

實際的生產場景是筆者負責的某個系統需要對接一個外部的資金方,每一筆資金下單後需要延時30分鐘推送對應的附件。這裡簡化為一個訂單信息數據延遲處理的場景,就是每一筆下單記錄一條訂單消息(暫時叫做OrderMessage),訂單消息需要延遲5到15秒後進行異步處理。

使用Redis實現延時任務

否決的候選方案實現思路

下面介紹一下其它四個不選用的候選方案,結合一些偽代碼和流程分析一下實現過程。

JDK內置延遲隊列

DelayQueue是一個阻塞隊列的實現,它的隊列元素必須是Delayed的子類,這裡做個簡單的例子:

public class DelayQueueMain {
private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueMain.class);
public static void main(String[] args) throws Exception {
DelayQueue<OrderMessage> queue = new DelayQueue<>();
// 默認延遲5秒
OrderMessage message = new OrderMessage("ORDER_ID_10086");
queue.add(message);
// 延遲6秒
message = new OrderMessage("ORDER_ID_10087", 6);
queue.add(message);
// 延遲10秒
message = new OrderMessage("ORDER_ID_10088", 10);
queue.add(message);
ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setName("DelayWorker");
thread.setDaemon(true);
return thread;
});
LOGGER.info("開始執行調度線程...");
executorService.execute(() -> {
while (true) {
try {
OrderMessage task = queue.take();
LOGGER.info("延遲處理訂單消息,{}", task.getDescription());
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
});
Thread.sleep(Integer.MAX_VALUE);
}
private static class OrderMessage implements Delayed {
private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
/**
* 默認延遲5000毫秒
*/
private static final long DELAY_MS = 1000L * 5;
/**
* 訂單ID
*/
private final String orderId;
/**
* 創建時間戳
*/
private final long timestamp;
/**
* 過期時間
*/
private final long expire;
/**
* 描述
*/
private final String description;
public OrderMessage(String orderId, long expireSeconds) {
this.orderId = orderId;
this.timestamp = System.currentTimeMillis();
this.expire = this.timestamp + expireSeconds * 1000L;
this.description = String.format("訂單[%s]-創建時間為:%s,超時時間為:%s", orderId,
LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
}
public OrderMessage(String orderId) {
this.orderId = orderId;
this.timestamp = System.currentTimeMillis();
this.expire = this.timestamp + DELAY_MS;
this.description = String.format("訂單[%s]-創建時間為:%s,超時時間為:%s", orderId,
LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
}
public String getOrderId() {
return orderId;
}
public long getTimestamp() {
return timestamp;
}
public long getExpire() {
return expire;
}
public String getDescription() {
return description;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}
}

注意一下,OrderMessage實現Delayed接口,關鍵是需要實現Delayed#getDelay()和Delayed#compareTo()。運行一下main()方法:

10:16:08.240 [main] INFO club.throwable.delay.DelayQueueMain - 開始執行調度線程...
10:16:13.224 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延遲處理訂單消息,訂單[ORDER_ID_10086]-創建時間為:2019-08-20 10:16:08,超時時間為:2019-08-20 10:16:13
10:16:14.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延遲處理訂單消息,訂單[ORDER_ID_10087]-創建時間為:2019-08-20 10:16:08,超時時間為:2019-08-20 10:16:14
10:16:18.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延遲處理訂單消息,訂單[ORDER_ID_10088]-創建時間為:2019-08-20 10:16:08,超時時間為:2019-08-20 10:16:18

調度框架 + MySQL

使用調度框架對MySQL表進行短間隔輪詢是實現難度比較低的方案,通常服務剛上線,表數據不多並且實時性不高的情況下應該首選這個方案。不過要注意以下幾點:

  • 注意輪詢間隔不能太短,否則會對MySQL實例產生影響。
  • 注意每次查詢的數量,結果集數量太多有可能會導致調度阻塞和佔用應用大量內存,從而影響時效性。
  • 注意要設計狀態值和最大重試次數,這樣才能儘量避免大量數據積壓和重複查詢的問題。
  • 最好通過時間列做索引,查詢指定時間範圍內的數據。

引入Quartz、MySQL的Java驅動包和spring-boot-starter-jdbc(這裡只是為了方便用相對輕量級的框架實現,生產中可以按場景按需選擇其他更合理的框架):

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.48</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<version>2.1.7.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.1</version>
<scope>test</scope>
</dependency>

假設表設計如下:

CREATE DATABASE `delayTask` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci;
USE `delayTask`;
CREATE TABLE `t_order_message`
(
id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT,
order_id VARCHAR(50) NOT NULL COMMENT '訂單ID',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建日期時間',
edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改日期時間',
retry_times TINYINT NOT NULL DEFAULT 0 COMMENT '重試次數',
order_status TINYINT NOT NULL DEFAULT 0 COMMENT '訂單狀態',
INDEX idx_order_id (order_id),
INDEX idx_create_time (create_time)
) COMMENT '訂單信息表';
# 寫入兩條測試數據
INSERT INTO t_order_message(order_id) VALUES ('10086'),('10087');

編寫代碼:

// 常量
public class OrderConstants {
public static final int MAX_RETRY_TIMES = 5;
public static final int PENDING = 0;
public static final int SUCCESS = 1;
public static final int FAIL = -1;
public static final int LIMIT = 10;
}
// 實體
@Builder
@Data
public class OrderMessage {
private Long id;
private String orderId;
private LocalDateTime createTime;
private LocalDateTime editTime;
private Integer retryTimes;
private Integer orderStatus;
}
// DAO
@RequiredArgsConstructor
public class OrderMessageDao {
private final JdbcTemplate jdbcTemplate;
private static final ResultSetExtractor<List<OrderMessage>> M = r -> {
List<OrderMessage> list = Lists.newArrayList();
while (r.next()) {
list.add(OrderMessage.builder()
.id(r.getLong("id"))
.orderId(r.getString("order_id"))
.createTime(r.getTimestamp("create_time").toLocalDateTime())
.editTime(r.getTimestamp("edit_time").toLocalDateTime())
.retryTimes(r.getInt("retry_times"))
.orderStatus(r.getInt("order_status"))
.build());
}
return list;
};
public List<OrderMessage> selectPendingRecords(LocalDateTime start,
LocalDateTime end,
List<Integer> statusList,
int maxRetryTimes,
int limit) {
StringJoiner joiner = new StringJoiner(",");
statusList.forEach(s -> joiner.add(String.valueOf(s)));
return jdbcTemplate.query("SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? " +
"AND order_status IN (?) AND retry_times < ? LIMIT ?",
p -> {
p.setTimestamp(1, Timestamp.valueOf(start));
p.setTimestamp(2, Timestamp.valueOf(end));
p.setString(3, joiner.toString());
p.setInt(4, maxRetryTimes);
p.setInt(5, limit);
}, M);
}
public int updateOrderStatus(Long id, int status) {
return jdbcTemplate.update("UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?",
p -> {
p.setInt(1, status);
p.setTimestamp(2, Timestamp.valueOf(LocalDateTime.now()));
p.setLong(3, id);
});
}
}
// Service
@RequiredArgsConstructor
public class OrderMessageService {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageService.class);
private final OrderMessageDao orderMessageDao;
private static final List<Integer> STATUS = Lists.newArrayList();
static {
STATUS.add(OrderConstants.PENDING);
STATUS.add(OrderConstants.FAIL);
}
public void executeDelayJob() {
LOGGER.info("訂單處理定時任務開始執行......");
LocalDateTime end = LocalDateTime.now();
// 一天前
LocalDateTime start = end.minusDays(1);
List<OrderMessage> list = orderMessageDao.selectPendingRecords(start, end, STATUS, OrderConstants.MAX_RETRY_TIMES, OrderConstants.LIMIT);
if (!list.isEmpty()) {
for (OrderMessage m : list) {
LOGGER.info("處理訂單[{}],狀態由{}更新為{}", m.getOrderId(), m.getOrderStatus(), OrderConstants.SUCCESS);
// 這裡其實可以優化為批量更新
orderMessageDao.updateOrderStatus(m.getId(), OrderConstants.SUCCESS);
}
}
LOGGER.info("訂單處理定時任務開始完畢......");
}
}
// Job
@DisallowConcurrentExecution
public class OrderMessageDelayJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
OrderMessageService service = (OrderMessageService) jobExecutionContext.getMergedJobDataMap().get("orderMessageService");
service.executeDelayJob();
}
public static void main(String[] args) throws Exception {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/delayTask?useSSL=false&characterEncoding=utf8");
config.setDriverClassName(Driver.class.getName());
config.setUsername("root");
config.setPassword("root");
HikariDataSource dataSource = new HikariDataSource(config);
OrderMessageDao orderMessageDao = new OrderMessageDao(new JdbcTemplate(dataSource));
OrderMessageService service = new OrderMessageService(orderMessageDao);
// 內存模式的調度器
StdSchedulerFactory factory = new StdSchedulerFactory();
Scheduler scheduler = factory.getScheduler();
// 這裡沒有用到IOC容器,直接用Quartz數據集合傳遞服務引用
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("orderMessageService", service);
// 新建Job
JobDetail job = JobBuilder.newJob(OrderMessageDelayJob.class)
.withIdentity("orderMessageDelayJob", "delayJob")
.usingJobData(jobDataMap)
.build();
// 新建觸發器,10秒執行一次
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("orderMessageDelayTrigger", "delayJob")
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
.build();
scheduler.scheduleJob(job, trigger);
// 啟動調度器
scheduler.start();
Thread.sleep(Integer.MAX_VALUE);
}
}

這個例子裡面用了create_time做輪詢,實際上可以添加一個調度時間schedule_time列做輪詢,這樣子才能更容易定製空閒時和忙碌時候的調度策略。上面的示例的運行效果如下:

11:58:27.202 [main] INFO org.quartz.core.QuartzScheduler - Scheduler meta-data: Quartz Scheduler (v2.3.1) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED'
Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
NOT STARTED.
Currently in standby mode.
Number of jobs executed: 0
Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.
11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler version: 2.3.1
11:58:27.209 [main] INFO org.quartz.core.QuartzScheduler - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
11:58:27.212 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:27.217 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:27.219 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@10eb8c53
11:58:27.220 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers
11:58:27.221 [DefaultQuartzScheduler_Worker-1] DEBUG org.quartz.core.JobRunShell - Calling execute on job delayJob.orderMessageDelayJob
11:58:34.440 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 訂單處理定時任務開始執行......
11:58:34.451 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@3d27ece4
11:58:34.459 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@64e808af
11:58:34.470 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@79c8c2b7
11:58:34.477 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@19a62369
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@1673d017
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - After adding stats (total=10, active=0, idle=10, waiting=0)
11:58:34.559 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL query
11:58:34.565 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? AND order_status IN (?) AND retry_times < ? LIMIT ?]
11:58:34.645 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.210 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - SQLWarning ignored: SQL state '22007', error code '1292', message [Truncated incorrect DOUBLE value: '0,-1']
11:58:35.335 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 處理訂單[10086],狀態由0更新為1
11:58:35.342 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.346 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.347 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.354 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 處理訂單[10087],狀態由0更新為1
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.361 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 訂單處理定時任務開始完畢......
11:58:35.363 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers

RabbitMQ死信隊列

使用RabbitMQ死信隊列依賴於RabbitMQ的兩個特性:TTL和DLX。

  • TTL:Time To Live,消息存活時間,包括兩個維度:隊列消息存活時間和消息本身的存活時間。
  • DLX:Dead Letter Exchange,死信交換器。

畫個圖描述一下這兩個特性:

"

前提

最近在生產環境剛好遇到了延時任務的場景,調研了一下目前主流的方案,分析了一下優劣並且敲定了最終的方案。這篇文章記錄了調研的過程,以及初步方案的實現。

候選方案對比

下面是想到的幾種實現延時任務的方案,總結了一下相應的優勢和劣勢。

使用Redis實現延時任務

如果應用的數據量不高,實時性要求比較低,選用調度框架和MySQL進行短間隔輪詢這個方案是最優的方案。但是筆者遇到的場景數據量相對比較大,實時性並不高,採用掃庫的方案一定會對MySQL實例造成比較大的壓力。記得很早之前,看過一個PPT叫《盒子科技聚合支付系統演進》,其中裡面有一張圖片給予筆者一點啟發:

使用Redis實現延時任務

裡面剛好用到了調度框架和Redis進行短間隔輪詢實現延時任務的方案,不過為了分攤應用的壓力,圖中的方案還做了分片處理。鑑於筆者當前業務緊迫,所以在第一期的方案暫時不考慮分片,只做了一個簡化版的實現。

由於PPT中沒有任何的代碼或者框架貼出,有些需要解決的技術點需要自行思考,下面會重現一次整個方案實現的詳細過程。

場景設計

實際的生產場景是筆者負責的某個系統需要對接一個外部的資金方,每一筆資金下單後需要延時30分鐘推送對應的附件。這裡簡化為一個訂單信息數據延遲處理的場景,就是每一筆下單記錄一條訂單消息(暫時叫做OrderMessage),訂單消息需要延遲5到15秒後進行異步處理。

使用Redis實現延時任務

否決的候選方案實現思路

下面介紹一下其它四個不選用的候選方案,結合一些偽代碼和流程分析一下實現過程。

JDK內置延遲隊列

DelayQueue是一個阻塞隊列的實現,它的隊列元素必須是Delayed的子類,這裡做個簡單的例子:

public class DelayQueueMain {
private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueMain.class);
public static void main(String[] args) throws Exception {
DelayQueue<OrderMessage> queue = new DelayQueue<>();
// 默認延遲5秒
OrderMessage message = new OrderMessage("ORDER_ID_10086");
queue.add(message);
// 延遲6秒
message = new OrderMessage("ORDER_ID_10087", 6);
queue.add(message);
// 延遲10秒
message = new OrderMessage("ORDER_ID_10088", 10);
queue.add(message);
ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setName("DelayWorker");
thread.setDaemon(true);
return thread;
});
LOGGER.info("開始執行調度線程...");
executorService.execute(() -> {
while (true) {
try {
OrderMessage task = queue.take();
LOGGER.info("延遲處理訂單消息,{}", task.getDescription());
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
});
Thread.sleep(Integer.MAX_VALUE);
}
private static class OrderMessage implements Delayed {
private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
/**
* 默認延遲5000毫秒
*/
private static final long DELAY_MS = 1000L * 5;
/**
* 訂單ID
*/
private final String orderId;
/**
* 創建時間戳
*/
private final long timestamp;
/**
* 過期時間
*/
private final long expire;
/**
* 描述
*/
private final String description;
public OrderMessage(String orderId, long expireSeconds) {
this.orderId = orderId;
this.timestamp = System.currentTimeMillis();
this.expire = this.timestamp + expireSeconds * 1000L;
this.description = String.format("訂單[%s]-創建時間為:%s,超時時間為:%s", orderId,
LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
}
public OrderMessage(String orderId) {
this.orderId = orderId;
this.timestamp = System.currentTimeMillis();
this.expire = this.timestamp + DELAY_MS;
this.description = String.format("訂單[%s]-創建時間為:%s,超時時間為:%s", orderId,
LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
}
public String getOrderId() {
return orderId;
}
public long getTimestamp() {
return timestamp;
}
public long getExpire() {
return expire;
}
public String getDescription() {
return description;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}
}

注意一下,OrderMessage實現Delayed接口,關鍵是需要實現Delayed#getDelay()和Delayed#compareTo()。運行一下main()方法:

10:16:08.240 [main] INFO club.throwable.delay.DelayQueueMain - 開始執行調度線程...
10:16:13.224 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延遲處理訂單消息,訂單[ORDER_ID_10086]-創建時間為:2019-08-20 10:16:08,超時時間為:2019-08-20 10:16:13
10:16:14.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延遲處理訂單消息,訂單[ORDER_ID_10087]-創建時間為:2019-08-20 10:16:08,超時時間為:2019-08-20 10:16:14
10:16:18.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延遲處理訂單消息,訂單[ORDER_ID_10088]-創建時間為:2019-08-20 10:16:08,超時時間為:2019-08-20 10:16:18

調度框架 + MySQL

使用調度框架對MySQL表進行短間隔輪詢是實現難度比較低的方案,通常服務剛上線,表數據不多並且實時性不高的情況下應該首選這個方案。不過要注意以下幾點:

  • 注意輪詢間隔不能太短,否則會對MySQL實例產生影響。
  • 注意每次查詢的數量,結果集數量太多有可能會導致調度阻塞和佔用應用大量內存,從而影響時效性。
  • 注意要設計狀態值和最大重試次數,這樣才能儘量避免大量數據積壓和重複查詢的問題。
  • 最好通過時間列做索引,查詢指定時間範圍內的數據。

引入Quartz、MySQL的Java驅動包和spring-boot-starter-jdbc(這裡只是為了方便用相對輕量級的框架實現,生產中可以按場景按需選擇其他更合理的框架):

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.48</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<version>2.1.7.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.1</version>
<scope>test</scope>
</dependency>

假設表設計如下:

CREATE DATABASE `delayTask` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci;
USE `delayTask`;
CREATE TABLE `t_order_message`
(
id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT,
order_id VARCHAR(50) NOT NULL COMMENT '訂單ID',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建日期時間',
edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改日期時間',
retry_times TINYINT NOT NULL DEFAULT 0 COMMENT '重試次數',
order_status TINYINT NOT NULL DEFAULT 0 COMMENT '訂單狀態',
INDEX idx_order_id (order_id),
INDEX idx_create_time (create_time)
) COMMENT '訂單信息表';
# 寫入兩條測試數據
INSERT INTO t_order_message(order_id) VALUES ('10086'),('10087');

編寫代碼:

// 常量
public class OrderConstants {
public static final int MAX_RETRY_TIMES = 5;
public static final int PENDING = 0;
public static final int SUCCESS = 1;
public static final int FAIL = -1;
public static final int LIMIT = 10;
}
// 實體
@Builder
@Data
public class OrderMessage {
private Long id;
private String orderId;
private LocalDateTime createTime;
private LocalDateTime editTime;
private Integer retryTimes;
private Integer orderStatus;
}
// DAO
@RequiredArgsConstructor
public class OrderMessageDao {
private final JdbcTemplate jdbcTemplate;
private static final ResultSetExtractor<List<OrderMessage>> M = r -> {
List<OrderMessage> list = Lists.newArrayList();
while (r.next()) {
list.add(OrderMessage.builder()
.id(r.getLong("id"))
.orderId(r.getString("order_id"))
.createTime(r.getTimestamp("create_time").toLocalDateTime())
.editTime(r.getTimestamp("edit_time").toLocalDateTime())
.retryTimes(r.getInt("retry_times"))
.orderStatus(r.getInt("order_status"))
.build());
}
return list;
};
public List<OrderMessage> selectPendingRecords(LocalDateTime start,
LocalDateTime end,
List<Integer> statusList,
int maxRetryTimes,
int limit) {
StringJoiner joiner = new StringJoiner(",");
statusList.forEach(s -> joiner.add(String.valueOf(s)));
return jdbcTemplate.query("SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? " +
"AND order_status IN (?) AND retry_times < ? LIMIT ?",
p -> {
p.setTimestamp(1, Timestamp.valueOf(start));
p.setTimestamp(2, Timestamp.valueOf(end));
p.setString(3, joiner.toString());
p.setInt(4, maxRetryTimes);
p.setInt(5, limit);
}, M);
}
public int updateOrderStatus(Long id, int status) {
return jdbcTemplate.update("UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?",
p -> {
p.setInt(1, status);
p.setTimestamp(2, Timestamp.valueOf(LocalDateTime.now()));
p.setLong(3, id);
});
}
}
// Service
@RequiredArgsConstructor
public class OrderMessageService {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageService.class);
private final OrderMessageDao orderMessageDao;
private static final List<Integer> STATUS = Lists.newArrayList();
static {
STATUS.add(OrderConstants.PENDING);
STATUS.add(OrderConstants.FAIL);
}
public void executeDelayJob() {
LOGGER.info("訂單處理定時任務開始執行......");
LocalDateTime end = LocalDateTime.now();
// 一天前
LocalDateTime start = end.minusDays(1);
List<OrderMessage> list = orderMessageDao.selectPendingRecords(start, end, STATUS, OrderConstants.MAX_RETRY_TIMES, OrderConstants.LIMIT);
if (!list.isEmpty()) {
for (OrderMessage m : list) {
LOGGER.info("處理訂單[{}],狀態由{}更新為{}", m.getOrderId(), m.getOrderStatus(), OrderConstants.SUCCESS);
// 這裡其實可以優化為批量更新
orderMessageDao.updateOrderStatus(m.getId(), OrderConstants.SUCCESS);
}
}
LOGGER.info("訂單處理定時任務開始完畢......");
}
}
// Job
@DisallowConcurrentExecution
public class OrderMessageDelayJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
OrderMessageService service = (OrderMessageService) jobExecutionContext.getMergedJobDataMap().get("orderMessageService");
service.executeDelayJob();
}
public static void main(String[] args) throws Exception {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/delayTask?useSSL=false&characterEncoding=utf8");
config.setDriverClassName(Driver.class.getName());
config.setUsername("root");
config.setPassword("root");
HikariDataSource dataSource = new HikariDataSource(config);
OrderMessageDao orderMessageDao = new OrderMessageDao(new JdbcTemplate(dataSource));
OrderMessageService service = new OrderMessageService(orderMessageDao);
// 內存模式的調度器
StdSchedulerFactory factory = new StdSchedulerFactory();
Scheduler scheduler = factory.getScheduler();
// 這裡沒有用到IOC容器,直接用Quartz數據集合傳遞服務引用
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("orderMessageService", service);
// 新建Job
JobDetail job = JobBuilder.newJob(OrderMessageDelayJob.class)
.withIdentity("orderMessageDelayJob", "delayJob")
.usingJobData(jobDataMap)
.build();
// 新建觸發器,10秒執行一次
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("orderMessageDelayTrigger", "delayJob")
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
.build();
scheduler.scheduleJob(job, trigger);
// 啟動調度器
scheduler.start();
Thread.sleep(Integer.MAX_VALUE);
}
}

這個例子裡面用了create_time做輪詢,實際上可以添加一個調度時間schedule_time列做輪詢,這樣子才能更容易定製空閒時和忙碌時候的調度策略。上面的示例的運行效果如下:

11:58:27.202 [main] INFO org.quartz.core.QuartzScheduler - Scheduler meta-data: Quartz Scheduler (v2.3.1) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED'
Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
NOT STARTED.
Currently in standby mode.
Number of jobs executed: 0
Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.
11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler version: 2.3.1
11:58:27.209 [main] INFO org.quartz.core.QuartzScheduler - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
11:58:27.212 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:27.217 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:27.219 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@10eb8c53
11:58:27.220 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers
11:58:27.221 [DefaultQuartzScheduler_Worker-1] DEBUG org.quartz.core.JobRunShell - Calling execute on job delayJob.orderMessageDelayJob
11:58:34.440 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 訂單處理定時任務開始執行......
11:58:34.451 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@3d27ece4
11:58:34.459 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@64e808af
11:58:34.470 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@79c8c2b7
11:58:34.477 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@19a62369
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@1673d017
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - After adding stats (total=10, active=0, idle=10, waiting=0)
11:58:34.559 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL query
11:58:34.565 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? AND order_status IN (?) AND retry_times < ? LIMIT ?]
11:58:34.645 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.210 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - SQLWarning ignored: SQL state '22007', error code '1292', message [Truncated incorrect DOUBLE value: '0,-1']
11:58:35.335 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 處理訂單[10086],狀態由0更新為1
11:58:35.342 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.346 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.347 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.354 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 處理訂單[10087],狀態由0更新為1
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.361 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 訂單處理定時任務開始完畢......
11:58:35.363 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers

RabbitMQ死信隊列

使用RabbitMQ死信隊列依賴於RabbitMQ的兩個特性:TTL和DLX。

  • TTL:Time To Live,消息存活時間,包括兩個維度:隊列消息存活時間和消息本身的存活時間。
  • DLX:Dead Letter Exchange,死信交換器。

畫個圖描述一下這兩個特性:

使用Redis實現延時任務

下面為了簡單起見,TTL使用了針對隊列的維度。引入RabbitMQ的Java驅動:

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
<scope>test</scope>
</dependency>

代碼如下:

public class DlxMain {
private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final Logger LOGGER = LoggerFactory.getLogger(DlxMain.class);
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel producerChannel = connection.createChannel();
Channel consumerChannel = connection.createChannel();
// dlx交換器名稱為dlx.exchange,類型是direct,綁定鍵為dlx.key,隊列名為dlx.queue
producerChannel.exchangeDeclare("dlx.exchange", "direct");
producerChannel.queueDeclare("dlx.queue", false, false, false, null);
producerChannel.queueBind("dlx.queue", "dlx.exchange", "dlx.key");
Map<String, Object> queueArgs = new HashMap<>();
// 設置隊列消息過期時間,5秒
queueArgs.put("x-message-ttl", 5000);
// 指定DLX相關參數
queueArgs.put("x-dead-letter-exchange", "dlx.exchange");
queueArgs.put("x-dead-letter-routing-key", "dlx.key");
// 聲明業務隊列
producerChannel.queueDeclare("business.queue", false, false, false, queueArgs);
ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("DlxConsumer");
return thread;
});
// 啟動消費者
executorService.execute(() -> {
try {
consumerChannel.basicConsume("dlx.queue", true, new DlxConsumer(consumerChannel));
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
});
OrderMessage message = new OrderMessage("10086");
producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
message.getDescription().getBytes(StandardCharsets.UTF_8));
LOGGER.info("發送消息成功,訂單ID:{}", message.getOrderId());
message = new OrderMessage("10087");
producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
message.getDescription().getBytes(StandardCharsets.UTF_8));
LOGGER.info("發送消息成功,訂單ID:{}", message.getOrderId());
message = new OrderMessage("10088");
producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
message.getDescription().getBytes(StandardCharsets.UTF_8));
LOGGER.info("發送消息成功,訂單ID:{}", message.getOrderId());
Thread.sleep(Integer.MAX_VALUE);
}
private static class DlxConsumer extends DefaultConsumer {
DlxConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
LOGGER.info("處理消息成功:{}", new String(body, StandardCharsets.UTF_8));
}
}
private static class OrderMessage {
private final String orderId;
private final long timestamp;
private final String description;
OrderMessage(String orderId) {
this.orderId = orderId;
this.timestamp = System.currentTimeMillis();
this.description = String.format("訂單[%s],訂單創建時間為:%s", orderId,
LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F));
}
public String getOrderId() {
return orderId;
}
public long getTimestamp() {
return timestamp;
}
public String getDescription() {
return description;
}
}
}

運行main()方法結果如下:

16:35:58.638 [main] INFO club.throwable.dlx.DlxMain - 發送消息成功,訂單ID:10086
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 發送消息成功,訂單ID:10087
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 發送消息成功,訂單ID:10088
16:36:03.646 [pool-1-thread-4] INFO club.throwable.dlx.DlxMain - 處理消息成功:訂單[10086],訂單創建時間為:2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-5] INFO club.throwable.dlx.DlxMain - 處理消息成功:訂單[10087],訂單創建時間為:2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-6] INFO club.throwable.dlx.DlxMain - 處理消息成功:訂單[10088],訂單創建時間為:2019-08-20 16:35:58

時間輪

時間輪TimingWheel是一種高效、低延遲的調度數據結構,底層採用數組實現存儲任務列表的環形隊列,示意圖如下:

"

前提

最近在生產環境剛好遇到了延時任務的場景,調研了一下目前主流的方案,分析了一下優劣並且敲定了最終的方案。這篇文章記錄了調研的過程,以及初步方案的實現。

候選方案對比

下面是想到的幾種實現延時任務的方案,總結了一下相應的優勢和劣勢。

使用Redis實現延時任務

如果應用的數據量不高,實時性要求比較低,選用調度框架和MySQL進行短間隔輪詢這個方案是最優的方案。但是筆者遇到的場景數據量相對比較大,實時性並不高,採用掃庫的方案一定會對MySQL實例造成比較大的壓力。記得很早之前,看過一個PPT叫《盒子科技聚合支付系統演進》,其中裡面有一張圖片給予筆者一點啟發:

使用Redis實現延時任務

裡面剛好用到了調度框架和Redis進行短間隔輪詢實現延時任務的方案,不過為了分攤應用的壓力,圖中的方案還做了分片處理。鑑於筆者當前業務緊迫,所以在第一期的方案暫時不考慮分片,只做了一個簡化版的實現。

由於PPT中沒有任何的代碼或者框架貼出,有些需要解決的技術點需要自行思考,下面會重現一次整個方案實現的詳細過程。

場景設計

實際的生產場景是筆者負責的某個系統需要對接一個外部的資金方,每一筆資金下單後需要延時30分鐘推送對應的附件。這裡簡化為一個訂單信息數據延遲處理的場景,就是每一筆下單記錄一條訂單消息(暫時叫做OrderMessage),訂單消息需要延遲5到15秒後進行異步處理。

使用Redis實現延時任務

否決的候選方案實現思路

下面介紹一下其它四個不選用的候選方案,結合一些偽代碼和流程分析一下實現過程。

JDK內置延遲隊列

DelayQueue是一個阻塞隊列的實現,它的隊列元素必須是Delayed的子類,這裡做個簡單的例子:

public class DelayQueueMain {
private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueMain.class);
public static void main(String[] args) throws Exception {
DelayQueue<OrderMessage> queue = new DelayQueue<>();
// 默認延遲5秒
OrderMessage message = new OrderMessage("ORDER_ID_10086");
queue.add(message);
// 延遲6秒
message = new OrderMessage("ORDER_ID_10087", 6);
queue.add(message);
// 延遲10秒
message = new OrderMessage("ORDER_ID_10088", 10);
queue.add(message);
ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setName("DelayWorker");
thread.setDaemon(true);
return thread;
});
LOGGER.info("開始執行調度線程...");
executorService.execute(() -> {
while (true) {
try {
OrderMessage task = queue.take();
LOGGER.info("延遲處理訂單消息,{}", task.getDescription());
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
});
Thread.sleep(Integer.MAX_VALUE);
}
private static class OrderMessage implements Delayed {
private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
/**
* 默認延遲5000毫秒
*/
private static final long DELAY_MS = 1000L * 5;
/**
* 訂單ID
*/
private final String orderId;
/**
* 創建時間戳
*/
private final long timestamp;
/**
* 過期時間
*/
private final long expire;
/**
* 描述
*/
private final String description;
public OrderMessage(String orderId, long expireSeconds) {
this.orderId = orderId;
this.timestamp = System.currentTimeMillis();
this.expire = this.timestamp + expireSeconds * 1000L;
this.description = String.format("訂單[%s]-創建時間為:%s,超時時間為:%s", orderId,
LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
}
public OrderMessage(String orderId) {
this.orderId = orderId;
this.timestamp = System.currentTimeMillis();
this.expire = this.timestamp + DELAY_MS;
this.description = String.format("訂單[%s]-創建時間為:%s,超時時間為:%s", orderId,
LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
}
public String getOrderId() {
return orderId;
}
public long getTimestamp() {
return timestamp;
}
public long getExpire() {
return expire;
}
public String getDescription() {
return description;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}
}

注意一下,OrderMessage實現Delayed接口,關鍵是需要實現Delayed#getDelay()和Delayed#compareTo()。運行一下main()方法:

10:16:08.240 [main] INFO club.throwable.delay.DelayQueueMain - 開始執行調度線程...
10:16:13.224 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延遲處理訂單消息,訂單[ORDER_ID_10086]-創建時間為:2019-08-20 10:16:08,超時時間為:2019-08-20 10:16:13
10:16:14.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延遲處理訂單消息,訂單[ORDER_ID_10087]-創建時間為:2019-08-20 10:16:08,超時時間為:2019-08-20 10:16:14
10:16:18.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延遲處理訂單消息,訂單[ORDER_ID_10088]-創建時間為:2019-08-20 10:16:08,超時時間為:2019-08-20 10:16:18

調度框架 + MySQL

使用調度框架對MySQL表進行短間隔輪詢是實現難度比較低的方案,通常服務剛上線,表數據不多並且實時性不高的情況下應該首選這個方案。不過要注意以下幾點:

  • 注意輪詢間隔不能太短,否則會對MySQL實例產生影響。
  • 注意每次查詢的數量,結果集數量太多有可能會導致調度阻塞和佔用應用大量內存,從而影響時效性。
  • 注意要設計狀態值和最大重試次數,這樣才能儘量避免大量數據積壓和重複查詢的問題。
  • 最好通過時間列做索引,查詢指定時間範圍內的數據。

引入Quartz、MySQL的Java驅動包和spring-boot-starter-jdbc(這裡只是為了方便用相對輕量級的框架實現,生產中可以按場景按需選擇其他更合理的框架):

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.48</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<version>2.1.7.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.1</version>
<scope>test</scope>
</dependency>

假設表設計如下:

CREATE DATABASE `delayTask` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci;
USE `delayTask`;
CREATE TABLE `t_order_message`
(
id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT,
order_id VARCHAR(50) NOT NULL COMMENT '訂單ID',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建日期時間',
edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改日期時間',
retry_times TINYINT NOT NULL DEFAULT 0 COMMENT '重試次數',
order_status TINYINT NOT NULL DEFAULT 0 COMMENT '訂單狀態',
INDEX idx_order_id (order_id),
INDEX idx_create_time (create_time)
) COMMENT '訂單信息表';
# 寫入兩條測試數據
INSERT INTO t_order_message(order_id) VALUES ('10086'),('10087');

編寫代碼:

// 常量
public class OrderConstants {
public static final int MAX_RETRY_TIMES = 5;
public static final int PENDING = 0;
public static final int SUCCESS = 1;
public static final int FAIL = -1;
public static final int LIMIT = 10;
}
// 實體
@Builder
@Data
public class OrderMessage {
private Long id;
private String orderId;
private LocalDateTime createTime;
private LocalDateTime editTime;
private Integer retryTimes;
private Integer orderStatus;
}
// DAO
@RequiredArgsConstructor
public class OrderMessageDao {
private final JdbcTemplate jdbcTemplate;
private static final ResultSetExtractor<List<OrderMessage>> M = r -> {
List<OrderMessage> list = Lists.newArrayList();
while (r.next()) {
list.add(OrderMessage.builder()
.id(r.getLong("id"))
.orderId(r.getString("order_id"))
.createTime(r.getTimestamp("create_time").toLocalDateTime())
.editTime(r.getTimestamp("edit_time").toLocalDateTime())
.retryTimes(r.getInt("retry_times"))
.orderStatus(r.getInt("order_status"))
.build());
}
return list;
};
public List<OrderMessage> selectPendingRecords(LocalDateTime start,
LocalDateTime end,
List<Integer> statusList,
int maxRetryTimes,
int limit) {
StringJoiner joiner = new StringJoiner(",");
statusList.forEach(s -> joiner.add(String.valueOf(s)));
return jdbcTemplate.query("SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? " +
"AND order_status IN (?) AND retry_times < ? LIMIT ?",
p -> {
p.setTimestamp(1, Timestamp.valueOf(start));
p.setTimestamp(2, Timestamp.valueOf(end));
p.setString(3, joiner.toString());
p.setInt(4, maxRetryTimes);
p.setInt(5, limit);
}, M);
}
public int updateOrderStatus(Long id, int status) {
return jdbcTemplate.update("UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?",
p -> {
p.setInt(1, status);
p.setTimestamp(2, Timestamp.valueOf(LocalDateTime.now()));
p.setLong(3, id);
});
}
}
// Service
@RequiredArgsConstructor
public class OrderMessageService {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageService.class);
private final OrderMessageDao orderMessageDao;
private static final List<Integer> STATUS = Lists.newArrayList();
static {
STATUS.add(OrderConstants.PENDING);
STATUS.add(OrderConstants.FAIL);
}
public void executeDelayJob() {
LOGGER.info("訂單處理定時任務開始執行......");
LocalDateTime end = LocalDateTime.now();
// 一天前
LocalDateTime start = end.minusDays(1);
List<OrderMessage> list = orderMessageDao.selectPendingRecords(start, end, STATUS, OrderConstants.MAX_RETRY_TIMES, OrderConstants.LIMIT);
if (!list.isEmpty()) {
for (OrderMessage m : list) {
LOGGER.info("處理訂單[{}],狀態由{}更新為{}", m.getOrderId(), m.getOrderStatus(), OrderConstants.SUCCESS);
// 這裡其實可以優化為批量更新
orderMessageDao.updateOrderStatus(m.getId(), OrderConstants.SUCCESS);
}
}
LOGGER.info("訂單處理定時任務開始完畢......");
}
}
// Job
@DisallowConcurrentExecution
public class OrderMessageDelayJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
OrderMessageService service = (OrderMessageService) jobExecutionContext.getMergedJobDataMap().get("orderMessageService");
service.executeDelayJob();
}
public static void main(String[] args) throws Exception {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/delayTask?useSSL=false&characterEncoding=utf8");
config.setDriverClassName(Driver.class.getName());
config.setUsername("root");
config.setPassword("root");
HikariDataSource dataSource = new HikariDataSource(config);
OrderMessageDao orderMessageDao = new OrderMessageDao(new JdbcTemplate(dataSource));
OrderMessageService service = new OrderMessageService(orderMessageDao);
// 內存模式的調度器
StdSchedulerFactory factory = new StdSchedulerFactory();
Scheduler scheduler = factory.getScheduler();
// 這裡沒有用到IOC容器,直接用Quartz數據集合傳遞服務引用
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("orderMessageService", service);
// 新建Job
JobDetail job = JobBuilder.newJob(OrderMessageDelayJob.class)
.withIdentity("orderMessageDelayJob", "delayJob")
.usingJobData(jobDataMap)
.build();
// 新建觸發器,10秒執行一次
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("orderMessageDelayTrigger", "delayJob")
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
.build();
scheduler.scheduleJob(job, trigger);
// 啟動調度器
scheduler.start();
Thread.sleep(Integer.MAX_VALUE);
}
}

這個例子裡面用了create_time做輪詢,實際上可以添加一個調度時間schedule_time列做輪詢,這樣子才能更容易定製空閒時和忙碌時候的調度策略。上面的示例的運行效果如下:

11:58:27.202 [main] INFO org.quartz.core.QuartzScheduler - Scheduler meta-data: Quartz Scheduler (v2.3.1) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED'
Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
NOT STARTED.
Currently in standby mode.
Number of jobs executed: 0
Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.
11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler version: 2.3.1
11:58:27.209 [main] INFO org.quartz.core.QuartzScheduler - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
11:58:27.212 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:27.217 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:27.219 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@10eb8c53
11:58:27.220 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers
11:58:27.221 [DefaultQuartzScheduler_Worker-1] DEBUG org.quartz.core.JobRunShell - Calling execute on job delayJob.orderMessageDelayJob
11:58:34.440 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 訂單處理定時任務開始執行......
11:58:34.451 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@3d27ece4
11:58:34.459 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@64e808af
11:58:34.470 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@79c8c2b7
11:58:34.477 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@19a62369
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@1673d017
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - After adding stats (total=10, active=0, idle=10, waiting=0)
11:58:34.559 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL query
11:58:34.565 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? AND order_status IN (?) AND retry_times < ? LIMIT ?]
11:58:34.645 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.210 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - SQLWarning ignored: SQL state '22007', error code '1292', message [Truncated incorrect DOUBLE value: '0,-1']
11:58:35.335 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 處理訂單[10086],狀態由0更新為1
11:58:35.342 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.346 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.347 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.354 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 處理訂單[10087],狀態由0更新為1
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.361 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 訂單處理定時任務開始完畢......
11:58:35.363 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers

RabbitMQ死信隊列

使用RabbitMQ死信隊列依賴於RabbitMQ的兩個特性:TTL和DLX。

  • TTL:Time To Live,消息存活時間,包括兩個維度:隊列消息存活時間和消息本身的存活時間。
  • DLX:Dead Letter Exchange,死信交換器。

畫個圖描述一下這兩個特性:

使用Redis實現延時任務

下面為了簡單起見,TTL使用了針對隊列的維度。引入RabbitMQ的Java驅動:

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
<scope>test</scope>
</dependency>

代碼如下:

public class DlxMain {
private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final Logger LOGGER = LoggerFactory.getLogger(DlxMain.class);
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel producerChannel = connection.createChannel();
Channel consumerChannel = connection.createChannel();
// dlx交換器名稱為dlx.exchange,類型是direct,綁定鍵為dlx.key,隊列名為dlx.queue
producerChannel.exchangeDeclare("dlx.exchange", "direct");
producerChannel.queueDeclare("dlx.queue", false, false, false, null);
producerChannel.queueBind("dlx.queue", "dlx.exchange", "dlx.key");
Map<String, Object> queueArgs = new HashMap<>();
// 設置隊列消息過期時間,5秒
queueArgs.put("x-message-ttl", 5000);
// 指定DLX相關參數
queueArgs.put("x-dead-letter-exchange", "dlx.exchange");
queueArgs.put("x-dead-letter-routing-key", "dlx.key");
// 聲明業務隊列
producerChannel.queueDeclare("business.queue", false, false, false, queueArgs);
ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("DlxConsumer");
return thread;
});
// 啟動消費者
executorService.execute(() -> {
try {
consumerChannel.basicConsume("dlx.queue", true, new DlxConsumer(consumerChannel));
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
});
OrderMessage message = new OrderMessage("10086");
producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
message.getDescription().getBytes(StandardCharsets.UTF_8));
LOGGER.info("發送消息成功,訂單ID:{}", message.getOrderId());
message = new OrderMessage("10087");
producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
message.getDescription().getBytes(StandardCharsets.UTF_8));
LOGGER.info("發送消息成功,訂單ID:{}", message.getOrderId());
message = new OrderMessage("10088");
producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
message.getDescription().getBytes(StandardCharsets.UTF_8));
LOGGER.info("發送消息成功,訂單ID:{}", message.getOrderId());
Thread.sleep(Integer.MAX_VALUE);
}
private static class DlxConsumer extends DefaultConsumer {
DlxConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
LOGGER.info("處理消息成功:{}", new String(body, StandardCharsets.UTF_8));
}
}
private static class OrderMessage {
private final String orderId;
private final long timestamp;
private final String description;
OrderMessage(String orderId) {
this.orderId = orderId;
this.timestamp = System.currentTimeMillis();
this.description = String.format("訂單[%s],訂單創建時間為:%s", orderId,
LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F));
}
public String getOrderId() {
return orderId;
}
public long getTimestamp() {
return timestamp;
}
public String getDescription() {
return description;
}
}
}

運行main()方法結果如下:

16:35:58.638 [main] INFO club.throwable.dlx.DlxMain - 發送消息成功,訂單ID:10086
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 發送消息成功,訂單ID:10087
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 發送消息成功,訂單ID:10088
16:36:03.646 [pool-1-thread-4] INFO club.throwable.dlx.DlxMain - 處理消息成功:訂單[10086],訂單創建時間為:2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-5] INFO club.throwable.dlx.DlxMain - 處理消息成功:訂單[10087],訂單創建時間為:2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-6] INFO club.throwable.dlx.DlxMain - 處理消息成功:訂單[10088],訂單創建時間為:2019-08-20 16:35:58

時間輪

時間輪TimingWheel是一種高效、低延遲的調度數據結構,底層採用數組實現存儲任務列表的環形隊列,示意圖如下:

使用Redis實現延時任務

這裡暫時不對時間輪和其實現作分析,只簡單舉例說明怎麼使用時間輪實現延時任務。這裡使用Netty提供的HashedWheelTimer,引入依賴:

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.1.39.Final</version>
</dependency>

代碼如下:

public class HashedWheelTimerMain {
private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
public static void main(String[] args) throws Exception {
AtomicInteger counter = new AtomicInteger();
ThreadFactory factory = r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("HashedWheelTimerWorker-" + counter.getAndIncrement());
return thread;
};
// tickDuration - 每tick一次的時間間隔, 每tick一次就會到達下一個槽位
// unit - tickDuration的時間單位
// ticksPerWhee - 時間輪中的槽位數
Timer timer = new HashedWheelTimer(factory, 1, TimeUnit.SECONDS, 60);
TimerTask timerTask = new DefaultTimerTask("10086");
timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);
timerTask = new DefaultTimerTask("10087");
timer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
timerTask = new DefaultTimerTask("10088");
timer.newTimeout(timerTask, 15, TimeUnit.SECONDS);
Thread.sleep(Integer.MAX_VALUE);
}
private static class DefaultTimerTask implements TimerTask {
private final String orderId;
private final long timestamp;
public DefaultTimerTask(String orderId) {
this.orderId = orderId;
this.timestamp = System.currentTimeMillis();
}
@Override
public void run(Timeout timeout) throws Exception {
System.out.println(String.format("任務執行時間:%s,訂單創建時間:%s,訂單ID:%s",
LocalDateTime.now().format(F), LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F), orderId));
}
}
}

運行結果:

任務執行時間:2019-08-20 17:19:49.310,訂單創建時間:2019-08-20 17:19:43.294,訂單ID:10086
任務執行時間:2019-08-20 17:19:54.297,訂單創建時間:2019-08-20 17:19:43.301,訂單ID:10087
任務執行時間:2019-08-20 17:19:59.297,訂單創建時間:2019-08-20 17:19:43.301,訂單ID:10088

一般來說,任務執行的時候應該使用另外的業務線程池,以免阻塞時間輪本身的運動。

選用的方案實現過程

最終選用了基於Redis的有序集合Sorted Set和Quartz短輪詢進行實現。具體方案是:

  1. 訂單創建的時候,訂單ID和當前時間戳分別作為Sorted Set的member和score添加到訂單隊列Sorted Set中。
  2. 訂單創建的時候,訂單ID和推送內容JSON字符串分別作為field和value添加到訂單隊列內容Hash中。
  3. 第1步和第2步操作的時候用Lua腳本保證原子性。
  4. 使用一個異步線程通過Sorted Set的命令ZREVRANGEBYSCORE彈出指定數量的訂單ID對應的訂單隊列內容Hash中的訂單推送內容數據進行處理。

對於第4點處理有兩種方案:

  • 方案一:彈出訂單內容數據的同時進行數據刪除,也就是ZREVRANGEBYSCORE、ZREM和HDEL命令要在同一個Lua腳本中執行,這樣的話Lua腳本的編寫難度大,並且由於彈出數據已經在Redis中刪除,如果數據處理失敗則可能需要從數據庫重新查詢補償。
  • 方案二:彈出訂單內容數據之後,在數據處理完成的時候再主動刪除訂單隊列Sorted Set和訂單隊列內容Hash中對應的數據,這樣的話需要控制併發,有重複執行的可能性。

最終暫時選用了方案一,也就是從Sorted Set彈出訂單ID並且從Hash中獲取完推送數據之後馬上刪除這兩個集合中對應的數據。方案的流程圖大概是這樣:

"

前提

最近在生產環境剛好遇到了延時任務的場景,調研了一下目前主流的方案,分析了一下優劣並且敲定了最終的方案。這篇文章記錄了調研的過程,以及初步方案的實現。

候選方案對比

下面是想到的幾種實現延時任務的方案,總結了一下相應的優勢和劣勢。

使用Redis實現延時任務

如果應用的數據量不高,實時性要求比較低,選用調度框架和MySQL進行短間隔輪詢這個方案是最優的方案。但是筆者遇到的場景數據量相對比較大,實時性並不高,採用掃庫的方案一定會對MySQL實例造成比較大的壓力。記得很早之前,看過一個PPT叫《盒子科技聚合支付系統演進》,其中裡面有一張圖片給予筆者一點啟發:

使用Redis實現延時任務

裡面剛好用到了調度框架和Redis進行短間隔輪詢實現延時任務的方案,不過為了分攤應用的壓力,圖中的方案還做了分片處理。鑑於筆者當前業務緊迫,所以在第一期的方案暫時不考慮分片,只做了一個簡化版的實現。

由於PPT中沒有任何的代碼或者框架貼出,有些需要解決的技術點需要自行思考,下面會重現一次整個方案實現的詳細過程。

場景設計

實際的生產場景是筆者負責的某個系統需要對接一個外部的資金方,每一筆資金下單後需要延時30分鐘推送對應的附件。這裡簡化為一個訂單信息數據延遲處理的場景,就是每一筆下單記錄一條訂單消息(暫時叫做OrderMessage),訂單消息需要延遲5到15秒後進行異步處理。

使用Redis實現延時任務

否決的候選方案實現思路

下面介紹一下其它四個不選用的候選方案,結合一些偽代碼和流程分析一下實現過程。

JDK內置延遲隊列

DelayQueue是一個阻塞隊列的實現,它的隊列元素必須是Delayed的子類,這裡做個簡單的例子:

public class DelayQueueMain {
private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueMain.class);
public static void main(String[] args) throws Exception {
DelayQueue<OrderMessage> queue = new DelayQueue<>();
// 默認延遲5秒
OrderMessage message = new OrderMessage("ORDER_ID_10086");
queue.add(message);
// 延遲6秒
message = new OrderMessage("ORDER_ID_10087", 6);
queue.add(message);
// 延遲10秒
message = new OrderMessage("ORDER_ID_10088", 10);
queue.add(message);
ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setName("DelayWorker");
thread.setDaemon(true);
return thread;
});
LOGGER.info("開始執行調度線程...");
executorService.execute(() -> {
while (true) {
try {
OrderMessage task = queue.take();
LOGGER.info("延遲處理訂單消息,{}", task.getDescription());
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
});
Thread.sleep(Integer.MAX_VALUE);
}
private static class OrderMessage implements Delayed {
private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
/**
* 默認延遲5000毫秒
*/
private static final long DELAY_MS = 1000L * 5;
/**
* 訂單ID
*/
private final String orderId;
/**
* 創建時間戳
*/
private final long timestamp;
/**
* 過期時間
*/
private final long expire;
/**
* 描述
*/
private final String description;
public OrderMessage(String orderId, long expireSeconds) {
this.orderId = orderId;
this.timestamp = System.currentTimeMillis();
this.expire = this.timestamp + expireSeconds * 1000L;
this.description = String.format("訂單[%s]-創建時間為:%s,超時時間為:%s", orderId,
LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
}
public OrderMessage(String orderId) {
this.orderId = orderId;
this.timestamp = System.currentTimeMillis();
this.expire = this.timestamp + DELAY_MS;
this.description = String.format("訂單[%s]-創建時間為:%s,超時時間為:%s", orderId,
LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
}
public String getOrderId() {
return orderId;
}
public long getTimestamp() {
return timestamp;
}
public long getExpire() {
return expire;
}
public String getDescription() {
return description;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}
}

注意一下,OrderMessage實現Delayed接口,關鍵是需要實現Delayed#getDelay()和Delayed#compareTo()。運行一下main()方法:

10:16:08.240 [main] INFO club.throwable.delay.DelayQueueMain - 開始執行調度線程...
10:16:13.224 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延遲處理訂單消息,訂單[ORDER_ID_10086]-創建時間為:2019-08-20 10:16:08,超時時間為:2019-08-20 10:16:13
10:16:14.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延遲處理訂單消息,訂單[ORDER_ID_10087]-創建時間為:2019-08-20 10:16:08,超時時間為:2019-08-20 10:16:14
10:16:18.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延遲處理訂單消息,訂單[ORDER_ID_10088]-創建時間為:2019-08-20 10:16:08,超時時間為:2019-08-20 10:16:18

調度框架 + MySQL

使用調度框架對MySQL表進行短間隔輪詢是實現難度比較低的方案,通常服務剛上線,表數據不多並且實時性不高的情況下應該首選這個方案。不過要注意以下幾點:

  • 注意輪詢間隔不能太短,否則會對MySQL實例產生影響。
  • 注意每次查詢的數量,結果集數量太多有可能會導致調度阻塞和佔用應用大量內存,從而影響時效性。
  • 注意要設計狀態值和最大重試次數,這樣才能儘量避免大量數據積壓和重複查詢的問題。
  • 最好通過時間列做索引,查詢指定時間範圍內的數據。

引入Quartz、MySQL的Java驅動包和spring-boot-starter-jdbc(這裡只是為了方便用相對輕量級的框架實現,生產中可以按場景按需選擇其他更合理的框架):

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.48</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<version>2.1.7.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.1</version>
<scope>test</scope>
</dependency>

假設表設計如下:

CREATE DATABASE `delayTask` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci;
USE `delayTask`;
CREATE TABLE `t_order_message`
(
id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT,
order_id VARCHAR(50) NOT NULL COMMENT '訂單ID',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建日期時間',
edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改日期時間',
retry_times TINYINT NOT NULL DEFAULT 0 COMMENT '重試次數',
order_status TINYINT NOT NULL DEFAULT 0 COMMENT '訂單狀態',
INDEX idx_order_id (order_id),
INDEX idx_create_time (create_time)
) COMMENT '訂單信息表';
# 寫入兩條測試數據
INSERT INTO t_order_message(order_id) VALUES ('10086'),('10087');

編寫代碼:

// 常量
public class OrderConstants {
public static final int MAX_RETRY_TIMES = 5;
public static final int PENDING = 0;
public static final int SUCCESS = 1;
public static final int FAIL = -1;
public static final int LIMIT = 10;
}
// 實體
@Builder
@Data
public class OrderMessage {
private Long id;
private String orderId;
private LocalDateTime createTime;
private LocalDateTime editTime;
private Integer retryTimes;
private Integer orderStatus;
}
// DAO
@RequiredArgsConstructor
public class OrderMessageDao {
private final JdbcTemplate jdbcTemplate;
private static final ResultSetExtractor<List<OrderMessage>> M = r -> {
List<OrderMessage> list = Lists.newArrayList();
while (r.next()) {
list.add(OrderMessage.builder()
.id(r.getLong("id"))
.orderId(r.getString("order_id"))
.createTime(r.getTimestamp("create_time").toLocalDateTime())
.editTime(r.getTimestamp("edit_time").toLocalDateTime())
.retryTimes(r.getInt("retry_times"))
.orderStatus(r.getInt("order_status"))
.build());
}
return list;
};
public List<OrderMessage> selectPendingRecords(LocalDateTime start,
LocalDateTime end,
List<Integer> statusList,
int maxRetryTimes,
int limit) {
StringJoiner joiner = new StringJoiner(",");
statusList.forEach(s -> joiner.add(String.valueOf(s)));
return jdbcTemplate.query("SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? " +
"AND order_status IN (?) AND retry_times < ? LIMIT ?",
p -> {
p.setTimestamp(1, Timestamp.valueOf(start));
p.setTimestamp(2, Timestamp.valueOf(end));
p.setString(3, joiner.toString());
p.setInt(4, maxRetryTimes);
p.setInt(5, limit);
}, M);
}
public int updateOrderStatus(Long id, int status) {
return jdbcTemplate.update("UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?",
p -> {
p.setInt(1, status);
p.setTimestamp(2, Timestamp.valueOf(LocalDateTime.now()));
p.setLong(3, id);
});
}
}
// Service
@RequiredArgsConstructor
public class OrderMessageService {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageService.class);
private final OrderMessageDao orderMessageDao;
private static final List<Integer> STATUS = Lists.newArrayList();
static {
STATUS.add(OrderConstants.PENDING);
STATUS.add(OrderConstants.FAIL);
}
public void executeDelayJob() {
LOGGER.info("訂單處理定時任務開始執行......");
LocalDateTime end = LocalDateTime.now();
// 一天前
LocalDateTime start = end.minusDays(1);
List<OrderMessage> list = orderMessageDao.selectPendingRecords(start, end, STATUS, OrderConstants.MAX_RETRY_TIMES, OrderConstants.LIMIT);
if (!list.isEmpty()) {
for (OrderMessage m : list) {
LOGGER.info("處理訂單[{}],狀態由{}更新為{}", m.getOrderId(), m.getOrderStatus(), OrderConstants.SUCCESS);
// 這裡其實可以優化為批量更新
orderMessageDao.updateOrderStatus(m.getId(), OrderConstants.SUCCESS);
}
}
LOGGER.info("訂單處理定時任務開始完畢......");
}
}
// Job
@DisallowConcurrentExecution
public class OrderMessageDelayJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
OrderMessageService service = (OrderMessageService) jobExecutionContext.getMergedJobDataMap().get("orderMessageService");
service.executeDelayJob();
}
public static void main(String[] args) throws Exception {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/delayTask?useSSL=false&characterEncoding=utf8");
config.setDriverClassName(Driver.class.getName());
config.setUsername("root");
config.setPassword("root");
HikariDataSource dataSource = new HikariDataSource(config);
OrderMessageDao orderMessageDao = new OrderMessageDao(new JdbcTemplate(dataSource));
OrderMessageService service = new OrderMessageService(orderMessageDao);
// 內存模式的調度器
StdSchedulerFactory factory = new StdSchedulerFactory();
Scheduler scheduler = factory.getScheduler();
// 這裡沒有用到IOC容器,直接用Quartz數據集合傳遞服務引用
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("orderMessageService", service);
// 新建Job
JobDetail job = JobBuilder.newJob(OrderMessageDelayJob.class)
.withIdentity("orderMessageDelayJob", "delayJob")
.usingJobData(jobDataMap)
.build();
// 新建觸發器,10秒執行一次
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("orderMessageDelayTrigger", "delayJob")
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
.build();
scheduler.scheduleJob(job, trigger);
// 啟動調度器
scheduler.start();
Thread.sleep(Integer.MAX_VALUE);
}
}

這個例子裡面用了create_time做輪詢,實際上可以添加一個調度時間schedule_time列做輪詢,這樣子才能更容易定製空閒時和忙碌時候的調度策略。上面的示例的運行效果如下:

11:58:27.202 [main] INFO org.quartz.core.QuartzScheduler - Scheduler meta-data: Quartz Scheduler (v2.3.1) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED'
Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
NOT STARTED.
Currently in standby mode.
Number of jobs executed: 0
Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.
11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler version: 2.3.1
11:58:27.209 [main] INFO org.quartz.core.QuartzScheduler - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
11:58:27.212 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:27.217 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:27.219 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@10eb8c53
11:58:27.220 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers
11:58:27.221 [DefaultQuartzScheduler_Worker-1] DEBUG org.quartz.core.JobRunShell - Calling execute on job delayJob.orderMessageDelayJob
11:58:34.440 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 訂單處理定時任務開始執行......
11:58:34.451 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@3d27ece4
11:58:34.459 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@64e808af
11:58:34.470 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@79c8c2b7
11:58:34.477 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@19a62369
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@1673d017
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - After adding stats (total=10, active=0, idle=10, waiting=0)
11:58:34.559 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL query
11:58:34.565 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? AND order_status IN (?) AND retry_times < ? LIMIT ?]
11:58:34.645 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.210 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - SQLWarning ignored: SQL state '22007', error code '1292', message [Truncated incorrect DOUBLE value: '0,-1']
11:58:35.335 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 處理訂單[10086],狀態由0更新為1
11:58:35.342 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.346 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.347 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.354 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 處理訂單[10087],狀態由0更新為1
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.361 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 訂單處理定時任務開始完畢......
11:58:35.363 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers

RabbitMQ死信隊列

使用RabbitMQ死信隊列依賴於RabbitMQ的兩個特性:TTL和DLX。

  • TTL:Time To Live,消息存活時間,包括兩個維度:隊列消息存活時間和消息本身的存活時間。
  • DLX:Dead Letter Exchange,死信交換器。

畫個圖描述一下這兩個特性:

使用Redis實現延時任務

下面為了簡單起見,TTL使用了針對隊列的維度。引入RabbitMQ的Java驅動:

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
<scope>test</scope>
</dependency>

代碼如下:

public class DlxMain {
private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final Logger LOGGER = LoggerFactory.getLogger(DlxMain.class);
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel producerChannel = connection.createChannel();
Channel consumerChannel = connection.createChannel();
// dlx交換器名稱為dlx.exchange,類型是direct,綁定鍵為dlx.key,隊列名為dlx.queue
producerChannel.exchangeDeclare("dlx.exchange", "direct");
producerChannel.queueDeclare("dlx.queue", false, false, false, null);
producerChannel.queueBind("dlx.queue", "dlx.exchange", "dlx.key");
Map<String, Object> queueArgs = new HashMap<>();
// 設置隊列消息過期時間,5秒
queueArgs.put("x-message-ttl", 5000);
// 指定DLX相關參數
queueArgs.put("x-dead-letter-exchange", "dlx.exchange");
queueArgs.put("x-dead-letter-routing-key", "dlx.key");
// 聲明業務隊列
producerChannel.queueDeclare("business.queue", false, false, false, queueArgs);
ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("DlxConsumer");
return thread;
});
// 啟動消費者
executorService.execute(() -> {
try {
consumerChannel.basicConsume("dlx.queue", true, new DlxConsumer(consumerChannel));
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
});
OrderMessage message = new OrderMessage("10086");
producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
message.getDescription().getBytes(StandardCharsets.UTF_8));
LOGGER.info("發送消息成功,訂單ID:{}", message.getOrderId());
message = new OrderMessage("10087");
producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
message.getDescription().getBytes(StandardCharsets.UTF_8));
LOGGER.info("發送消息成功,訂單ID:{}", message.getOrderId());
message = new OrderMessage("10088");
producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
message.getDescription().getBytes(StandardCharsets.UTF_8));
LOGGER.info("發送消息成功,訂單ID:{}", message.getOrderId());
Thread.sleep(Integer.MAX_VALUE);
}
private static class DlxConsumer extends DefaultConsumer {
DlxConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
LOGGER.info("處理消息成功:{}", new String(body, StandardCharsets.UTF_8));
}
}
private static class OrderMessage {
private final String orderId;
private final long timestamp;
private final String description;
OrderMessage(String orderId) {
this.orderId = orderId;
this.timestamp = System.currentTimeMillis();
this.description = String.format("訂單[%s],訂單創建時間為:%s", orderId,
LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F));
}
public String getOrderId() {
return orderId;
}
public long getTimestamp() {
return timestamp;
}
public String getDescription() {
return description;
}
}
}

運行main()方法結果如下:

16:35:58.638 [main] INFO club.throwable.dlx.DlxMain - 發送消息成功,訂單ID:10086
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 發送消息成功,訂單ID:10087
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 發送消息成功,訂單ID:10088
16:36:03.646 [pool-1-thread-4] INFO club.throwable.dlx.DlxMain - 處理消息成功:訂單[10086],訂單創建時間為:2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-5] INFO club.throwable.dlx.DlxMain - 處理消息成功:訂單[10087],訂單創建時間為:2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-6] INFO club.throwable.dlx.DlxMain - 處理消息成功:訂單[10088],訂單創建時間為:2019-08-20 16:35:58

時間輪

時間輪TimingWheel是一種高效、低延遲的調度數據結構,底層採用數組實現存儲任務列表的環形隊列,示意圖如下:

使用Redis實現延時任務

這裡暫時不對時間輪和其實現作分析,只簡單舉例說明怎麼使用時間輪實現延時任務。這裡使用Netty提供的HashedWheelTimer,引入依賴:

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.1.39.Final</version>
</dependency>

代碼如下:

public class HashedWheelTimerMain {
private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
public static void main(String[] args) throws Exception {
AtomicInteger counter = new AtomicInteger();
ThreadFactory factory = r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("HashedWheelTimerWorker-" + counter.getAndIncrement());
return thread;
};
// tickDuration - 每tick一次的時間間隔, 每tick一次就會到達下一個槽位
// unit - tickDuration的時間單位
// ticksPerWhee - 時間輪中的槽位數
Timer timer = new HashedWheelTimer(factory, 1, TimeUnit.SECONDS, 60);
TimerTask timerTask = new DefaultTimerTask("10086");
timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);
timerTask = new DefaultTimerTask("10087");
timer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
timerTask = new DefaultTimerTask("10088");
timer.newTimeout(timerTask, 15, TimeUnit.SECONDS);
Thread.sleep(Integer.MAX_VALUE);
}
private static class DefaultTimerTask implements TimerTask {
private final String orderId;
private final long timestamp;
public DefaultTimerTask(String orderId) {
this.orderId = orderId;
this.timestamp = System.currentTimeMillis();
}
@Override
public void run(Timeout timeout) throws Exception {
System.out.println(String.format("任務執行時間:%s,訂單創建時間:%s,訂單ID:%s",
LocalDateTime.now().format(F), LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F), orderId));
}
}
}

運行結果:

任務執行時間:2019-08-20 17:19:49.310,訂單創建時間:2019-08-20 17:19:43.294,訂單ID:10086
任務執行時間:2019-08-20 17:19:54.297,訂單創建時間:2019-08-20 17:19:43.301,訂單ID:10087
任務執行時間:2019-08-20 17:19:59.297,訂單創建時間:2019-08-20 17:19:43.301,訂單ID:10088

一般來說,任務執行的時候應該使用另外的業務線程池,以免阻塞時間輪本身的運動。

選用的方案實現過程

最終選用了基於Redis的有序集合Sorted Set和Quartz短輪詢進行實現。具體方案是:

  1. 訂單創建的時候,訂單ID和當前時間戳分別作為Sorted Set的member和score添加到訂單隊列Sorted Set中。
  2. 訂單創建的時候,訂單ID和推送內容JSON字符串分別作為field和value添加到訂單隊列內容Hash中。
  3. 第1步和第2步操作的時候用Lua腳本保證原子性。
  4. 使用一個異步線程通過Sorted Set的命令ZREVRANGEBYSCORE彈出指定數量的訂單ID對應的訂單隊列內容Hash中的訂單推送內容數據進行處理。

對於第4點處理有兩種方案:

  • 方案一:彈出訂單內容數據的同時進行數據刪除,也就是ZREVRANGEBYSCORE、ZREM和HDEL命令要在同一個Lua腳本中執行,這樣的話Lua腳本的編寫難度大,並且由於彈出數據已經在Redis中刪除,如果數據處理失敗則可能需要從數據庫重新查詢補償。
  • 方案二:彈出訂單內容數據之後,在數據處理完成的時候再主動刪除訂單隊列Sorted Set和訂單隊列內容Hash中對應的數據,這樣的話需要控制併發,有重複執行的可能性。

最終暫時選用了方案一,也就是從Sorted Set彈出訂單ID並且從Hash中獲取完推送數據之後馬上刪除這兩個集合中對應的數據。方案的流程圖大概是這樣:

使用Redis實現延時任務

這裡先詳細說明一下用到的Redis命令。

Sorted Set相關命令

  • ZADD命令 - 將一個或多個成員元素及其分數值加入到有序集當中。

ZADD KEY SCORE1 VALUE1.. SCOREN VALUEN


  • ZREVRANGEBYSCORE命令 - 返回有序集中指定分數區間內的所有的成員。有序集成員按分數值遞減(從大到小)的次序排列。

ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]

  • max:分數區間 - 最大分數。
  • min:分數區間 - 最小分數。
  • WITHSCORES:可選參數,是否返回分數值,指定則會返回得分值。
  • LIMIT:可選參數,offset和count原理和MySQL的LIMIT offset,size一致,如果不指定此參數則返回整個集合的數據。

  • ZREM命令 - 用於移除有序集中的一個或多個成員,不存在的成員將被忽略。

ZREM key member [member ...]

Hash相關命令

  • HMSET命令 - 同時將多個field-value(字段-值)對設置到哈希表中。

HMSET KEY_NAME FIELD1 VALUE1 ...FIELDN VALUEN


  • HDEL命令 - 刪除哈希表key中的一個或多個指定字段,不存在的字段將被忽略。

HDEL KEY_NAME FIELD1.. FIELDN

Lua相關

  • 加載Lua腳本並且返回腳本的SHA-1字符串:SCRIPT LOAD script。
  • 執行已經加載的Lua腳本:EVALSHA sha1 numkeys key [key ...] arg [arg ...]。
  • unpack函數可以把table類型的參數轉化為可變參數,不過需要注意的是unpack函數必須使用在非變量定義的函數調用的最後一個參數,否則會失效,詳細見Stackoverflow的提問table.unpack() only returns the first element。

PS:如果不熟悉Lua語言,建議系統學習一下,因為想用好Redis,一定離不開Lua。

引入依賴:

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.1.7.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>5.1.9.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.59</version>
</dependency>
</dependencies>

編寫Lua腳本/lua/enqueue.lua和/lua/dequeue.lua:

-- /lua/enqueue.lua
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local zset_value = ARGV[1]
local zset_score = ARGV[2]
local hash_field = ARGV[3]
local hash_value = ARGV[4]
redis.call('ZADD', zset_key, zset_score, zset_value)
redis.call('HSET', hash_key, hash_field, hash_value)
return nil
-- /lua/dequeue.lua
-- 參考jesque的部分Lua腳本實現
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- TYPE命令的返回結果是{'ok':'zset'}這樣子,這裡利用next做一輪迭代
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
if type == 'zset' then
local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
if list ~= nil and #list > 0 then
-- unpack函數能把table轉化為可變參數
redis.call('ZREM', zset_key, unpack(list))
local result = redis.call('HMGET', hash_key, unpack(list))
redis.call('HDEL', hash_key, unpack(list))
return result
end
end
end
return nil

編寫核心API代碼:

// Jedis提供者
@Component
public class JedisProvider implements InitializingBean {
private JedisPool jedisPool;
@Override
public void afterPropertiesSet() throws Exception {
jedisPool = new JedisPool();
}
public Jedis provide(){
return jedisPool.getResource();
}
}
// OrderMessage
@Data
public class OrderMessage {
private String orderId;
private BigDecimal amount;
private Long userId;
}
// 延遲隊列接口
public interface OrderDelayQueue {
void enqueue(OrderMessage message);
List<OrderMessage> dequeue(String min, String max, String offset, String limit);
List<OrderMessage> dequeue();
String enqueueSha();
String dequeueSha();
}
// 延遲隊列實現類
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {
private static final String MIN_SCORE = "0";
private static final String OFFSET = "0";
private static final String LIMIT = "10";
private static final String ORDER_QUEUE = "ORDER_QUEUE";
private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE";
private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>();
private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();
private static final List<String> KEYS = Lists.newArrayList();
private final JedisProvider jedisProvider;
static {
KEYS.add(ORDER_QUEUE);
KEYS.add(ORDER_DETAIL_QUEUE);
}
@Override
public void enqueue(OrderMessage message) {
List<String> args = Lists.newArrayList();
args.add(message.getOrderId());
args.add(String.valueOf(System.currentTimeMillis()));
args.add(message.getOrderId());
args.add(JSON.toJSONString(message));
try (Jedis jedis = jedisProvider.provide()) {
jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args);
}
}
@Override
public List<OrderMessage> dequeue() {
// 30分鐘之前
String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT);
}
@SuppressWarnings("unchecked")
@Override
public List<OrderMessage> dequeue(String min, String max, String offset, String limit) {
List<String> args = new ArrayList<>();
args.add(max);
args.add(min);
args.add(offset);
args.add(limit);
List<OrderMessage> result = Lists.newArrayList();
try (Jedis jedis = jedisProvider.provide()) {
List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), KEYS, args);
if (null != eval) {
for (String e : eval) {
result.add(JSON.parseObject(e, OrderMessage.class));
}
}
}
return result;
}
@Override
public String enqueueSha() {
return ENQUEUE_LUA_SHA.get();
}
@Override
public String dequeueSha() {
return DEQUEUE_LUA_SHA.get();
}
@Override
public void afterPropertiesSet() throws Exception {
// 加載Lua腳本
loadLuaScript();
}
private void loadLuaScript() throws Exception {
try (Jedis jedis = jedisProvider.provide()) {
ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
String sha = jedis.scriptLoad(luaContent);
ENQUEUE_LUA_SHA.compareAndSet(null, sha);
resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
sha = jedis.scriptLoad(luaContent);
DEQUEUE_LUA_SHA.compareAndSet(null, sha);
}
}
public static void main(String[] as) throws Exception {
DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
JedisProvider jedisProvider = new JedisProvider();
jedisProvider.afterPropertiesSet();
RedisOrderDelayQueue queue = new RedisOrderDelayQueue(jedisProvider);
queue.afterPropertiesSet();
// 寫入測試數據
OrderMessage message = new OrderMessage();
message.setAmount(BigDecimal.valueOf(10086));
message.setOrderId("ORDER_ID_10086");
message.setUserId(10086L);
message.setTimestamp(LocalDateTime.now().format(f));
List<String> args = Lists.newArrayList();
args.add(message.getOrderId());
// 測試需要,score設置為30分鐘之前
args.add(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
args.add(message.getOrderId());
args.add(JSON.toJSONString(message));
try (Jedis jedis = jedisProvider.provide()) {
jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args);
}
List<OrderMessage> dequeue = queue.dequeue();
System.out.println(dequeue);
}
}

這裡先執行一次main()方法驗證一下延遲隊列是否生效:

[OrderMessage(orderId=ORDER_ID_10086, amount=10086, userId=10086, timestamp=2019-08-21 08:32:22.885)]

確定延遲隊列的代碼沒有問題,接著編寫一個Quartz的Job類型的消費者OrderMessageConsumer:

@DisallowConcurrentExecution
@Component
public class OrderMessageConsumer implements Job {
private static final AtomicInteger COUNTER = new AtomicInteger();
private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
return thread;
});
private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
@Autowired
private OrderDelayQueue orderDelayQueue;
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
LOGGER.info("訂單消息處理定時任務開始執行......");
List<OrderMessage> messages = orderDelayQueue.dequeue();
if (!messages.isEmpty()) {
// 簡單的列表等分放到線程池中執行
List<List<OrderMessage>> partition = Lists.partition(messages, 2);
int size = partition.size();
final CountDownLatch latch = new CountDownLatch(size);
for (List<OrderMessage> p : partition) {
BUSINESS_WORKER_POOL.execute(new ConsumeTask(p, latch));
}
try {
latch.await();
} catch (InterruptedException ignore) {
//ignore
}
}
stopWatch.stop();
LOGGER.info("訂單消息處理定時任務執行完畢,耗時:{} ms......", stopWatch.getTotalTimeMillis());
}
@RequiredArgsConstructor
private static class ConsumeTask implements Runnable {
private final List<OrderMessage> messages;
private final CountDownLatch latch;
@Override
public void run() {
try {
// 實際上這裡應該單條捕獲異常
for (OrderMessage message : messages) {
LOGGER.info("處理訂單信息,內容:{}", message);
}
} finally {
latch.countDown();
}
}
}
}

上面的消費者設計的時候需要有以下考量:

  • 使用@DisallowConcurrentExecution註解不允許Job併發執行,其實多個Job併發執行意義不大,因為我們採用的是短間隔的輪詢,而Redis是單線程處理命令,在客戶端做多線程其實效果不佳。
  • 線程池BUSINESS_WORKER_POOL的線程容量或者隊列應該綜合LIMIT值、等分訂單信息列表中使用的size值以及ConsumeTask裡面具體的執行時間進行考慮,這裡只是為了方便使用了固定容量的線程池。
  • ConsumeTask中應該對每一條訂單信息的處理單獨捕獲異常和吞併異常,或者把處理單個訂單信息的邏輯封裝成一個不拋出異常的方法。

其他Quartz相關的代碼:

// Quartz配置類
@Configuration
public class QuartzAutoConfiguration {
@Bean
public SchedulerFactoryBean schedulerFactoryBean(QuartzAutowiredJobFactory quartzAutowiredJobFactory) {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setAutoStartup(true);
factory.setJobFactory(quartzAutowiredJobFactory);
return factory;
}
@Bean
public QuartzAutowiredJobFactory quartzAutowiredJobFactory() {
return new QuartzAutowiredJobFactory();
}
public static class QuartzAutowiredJobFactory extends AdaptableJobFactory implements BeanFactoryAware {
private AutowireCapableBeanFactory autowireCapableBeanFactory;
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory;
}
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
Object jobInstance = super.createJobInstance(bundle);
// 這裡利用AutowireCapableBeanFactory從新建的Job實例做一次自動裝配,得到一個原型(prototype)的JobBean實例
autowireCapableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
}

這裡暫時使用了內存態的RAMJobStore去存放任務和觸發器的相關信息,如果在生產環境最好替換成基於MySQL也就是JobStoreTX進行集群化,最後是啟動函數和CommandLineRunner的實現:

@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, TransactionAutoConfiguration.class})
public class Application implements CommandLineRunner {
@Autowired
private Scheduler scheduler;
@Autowired
private JedisProvider jedisProvider;
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Override
public void run(String... args) throws Exception {
// 準備一些測試數據
prepareOrderMessageData();
JobDetail job = JobBuilder.newJob(OrderMessageConsumer.class)
.withIdentity("OrderMessageConsumer", "DelayTask")
.build();
// 觸發器5秒觸發一次
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("OrderMessageConsumerTrigger", "DelayTask")
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever())
.build();
scheduler.scheduleJob(job, trigger);
}
private void prepareOrderMessageData() throws Exception {
DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
try (Jedis jedis = jedisProvider.provide()) {
List<OrderMessage> messages = Lists.newArrayList();
for (int i = 0; i < 100; i++) {
OrderMessage message = new OrderMessage();
message.setAmount(BigDecimal.valueOf(i));
message.setOrderId("ORDER_ID_" + i);
message.setUserId((long) i);
message.setTimestamp(LocalDateTime.now().format(f));
messages.add(message);
}
// 這裡暫時不使用Lua
Map<String, Double> map = Maps.newHashMap();
Map<String, String> hash = Maps.newHashMap();
for (OrderMessage message : messages) {
// 故意把score設計成30分鐘前
map.put(message.getOrderId(), Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)));
hash.put(message.getOrderId(), JSON.toJSONString(message));
}
jedis.zadd("ORDER_QUEUE", map);
jedis.hmset("ORDER_DETAIL_QUEUE", hash);
}
}
}

輸出結果如下:

2019-08-21 22:45:59.518 INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer : 訂單消息處理定時任務開始執行......
2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_91, amount=91, userId=91, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_95, amount=95, userId=95, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_97, amount=97, userId=97, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_99, amount=99, userId=99, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_93, amount=93, userId=93, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_94, amount=94, userId=94, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_96, amount=96, userId=96, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_92, amount=92, userId=92, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_98, amount=98, userId=98, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_90, amount=90, userId=90, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.540 INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer : 訂單消息處理定時任務執行完畢,耗時:22 ms......
2019-08-21 22:46:04.515 INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer : 訂單消息處理定時任務開始執行......
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_89, amount=89, userId=89, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_87, amount=87, userId=87, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_85, amount=85, userId=85, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_88, amount=88, userId=88, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_83, amount=83, userId=83, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_81, amount=81, userId=81, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_86, amount=86, userId=86, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_82, amount=82, userId=82, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_84, amount=84, userId=84, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_80, amount=80, userId=80, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer : 訂單消息處理定時任務執行完畢,耗時:1 ms......
......

首次執行的時候涉及到一些組件的初始化,會比較慢,後面看到由於我們只是簡單打印訂單信息,所以定時任務執行比較快。如果在不調整當前架構的情況下,生產中需要注意:

  • 切換JobStore為JDBC模式,Quartz官方有完整教程,或者看筆者之前翻譯的Quartz文檔。
  • 需要監控或者收集任務的執行狀態,添加預警等等。

這裡其實有一個性能隱患,命令ZREVRANGEBYSCORE的時間複雜度可以視為為O(N),N是集合的元素個數,由於這裡把所有的訂單信息都放進了同一個Sorted Set(ORDER_QUEUE)中,所以在一直有新增數據的時候,dequeue腳本的時間複雜度一直比較高,後續訂單量升高之後會此處一定會成為性能瓶頸,後面會給出解決的方案。

小結

這篇文章主要從一個實際生產案例的仿真例子入手,分析了當前延時任務的一些實現方案,還基於Redis和Quartz給出了一個完整的示例。當前的示例只是處於可運行的狀態,有些問題尚未解決。下一篇文章會著眼於解決兩個方面的問題:

  1. 分片。
  2. 監控。

還有一點,架構是基於業務形態演進出來的,很多東西需要結合場景進行方案設計和改進,思路僅供參考,切勿照搬代碼


免費分享Java技術資料,需要的可以在關注後私信我

"

前提

最近在生產環境剛好遇到了延時任務的場景,調研了一下目前主流的方案,分析了一下優劣並且敲定了最終的方案。這篇文章記錄了調研的過程,以及初步方案的實現。

候選方案對比

下面是想到的幾種實現延時任務的方案,總結了一下相應的優勢和劣勢。

使用Redis實現延時任務

如果應用的數據量不高,實時性要求比較低,選用調度框架和MySQL進行短間隔輪詢這個方案是最優的方案。但是筆者遇到的場景數據量相對比較大,實時性並不高,採用掃庫的方案一定會對MySQL實例造成比較大的壓力。記得很早之前,看過一個PPT叫《盒子科技聚合支付系統演進》,其中裡面有一張圖片給予筆者一點啟發:

使用Redis實現延時任務

裡面剛好用到了調度框架和Redis進行短間隔輪詢實現延時任務的方案,不過為了分攤應用的壓力,圖中的方案還做了分片處理。鑑於筆者當前業務緊迫,所以在第一期的方案暫時不考慮分片,只做了一個簡化版的實現。

由於PPT中沒有任何的代碼或者框架貼出,有些需要解決的技術點需要自行思考,下面會重現一次整個方案實現的詳細過程。

場景設計

實際的生產場景是筆者負責的某個系統需要對接一個外部的資金方,每一筆資金下單後需要延時30分鐘推送對應的附件。這裡簡化為一個訂單信息數據延遲處理的場景,就是每一筆下單記錄一條訂單消息(暫時叫做OrderMessage),訂單消息需要延遲5到15秒後進行異步處理。

使用Redis實現延時任務

否決的候選方案實現思路

下面介紹一下其它四個不選用的候選方案,結合一些偽代碼和流程分析一下實現過程。

JDK內置延遲隊列

DelayQueue是一個阻塞隊列的實現,它的隊列元素必須是Delayed的子類,這裡做個簡單的例子:

public class DelayQueueMain {
private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueMain.class);
public static void main(String[] args) throws Exception {
DelayQueue<OrderMessage> queue = new DelayQueue<>();
// 默認延遲5秒
OrderMessage message = new OrderMessage("ORDER_ID_10086");
queue.add(message);
// 延遲6秒
message = new OrderMessage("ORDER_ID_10087", 6);
queue.add(message);
// 延遲10秒
message = new OrderMessage("ORDER_ID_10088", 10);
queue.add(message);
ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setName("DelayWorker");
thread.setDaemon(true);
return thread;
});
LOGGER.info("開始執行調度線程...");
executorService.execute(() -> {
while (true) {
try {
OrderMessage task = queue.take();
LOGGER.info("延遲處理訂單消息,{}", task.getDescription());
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
});
Thread.sleep(Integer.MAX_VALUE);
}
private static class OrderMessage implements Delayed {
private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
/**
* 默認延遲5000毫秒
*/
private static final long DELAY_MS = 1000L * 5;
/**
* 訂單ID
*/
private final String orderId;
/**
* 創建時間戳
*/
private final long timestamp;
/**
* 過期時間
*/
private final long expire;
/**
* 描述
*/
private final String description;
public OrderMessage(String orderId, long expireSeconds) {
this.orderId = orderId;
this.timestamp = System.currentTimeMillis();
this.expire = this.timestamp + expireSeconds * 1000L;
this.description = String.format("訂單[%s]-創建時間為:%s,超時時間為:%s", orderId,
LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
}
public OrderMessage(String orderId) {
this.orderId = orderId;
this.timestamp = System.currentTimeMillis();
this.expire = this.timestamp + DELAY_MS;
this.description = String.format("訂單[%s]-創建時間為:%s,超時時間為:%s", orderId,
LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
}
public String getOrderId() {
return orderId;
}
public long getTimestamp() {
return timestamp;
}
public long getExpire() {
return expire;
}
public String getDescription() {
return description;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}
}

注意一下,OrderMessage實現Delayed接口,關鍵是需要實現Delayed#getDelay()和Delayed#compareTo()。運行一下main()方法:

10:16:08.240 [main] INFO club.throwable.delay.DelayQueueMain - 開始執行調度線程...
10:16:13.224 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延遲處理訂單消息,訂單[ORDER_ID_10086]-創建時間為:2019-08-20 10:16:08,超時時間為:2019-08-20 10:16:13
10:16:14.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延遲處理訂單消息,訂單[ORDER_ID_10087]-創建時間為:2019-08-20 10:16:08,超時時間為:2019-08-20 10:16:14
10:16:18.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延遲處理訂單消息,訂單[ORDER_ID_10088]-創建時間為:2019-08-20 10:16:08,超時時間為:2019-08-20 10:16:18

調度框架 + MySQL

使用調度框架對MySQL表進行短間隔輪詢是實現難度比較低的方案,通常服務剛上線,表數據不多並且實時性不高的情況下應該首選這個方案。不過要注意以下幾點:

  • 注意輪詢間隔不能太短,否則會對MySQL實例產生影響。
  • 注意每次查詢的數量,結果集數量太多有可能會導致調度阻塞和佔用應用大量內存,從而影響時效性。
  • 注意要設計狀態值和最大重試次數,這樣才能儘量避免大量數據積壓和重複查詢的問題。
  • 最好通過時間列做索引,查詢指定時間範圍內的數據。

引入Quartz、MySQL的Java驅動包和spring-boot-starter-jdbc(這裡只是為了方便用相對輕量級的框架實現,生產中可以按場景按需選擇其他更合理的框架):

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.48</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<version>2.1.7.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.1</version>
<scope>test</scope>
</dependency>

假設表設計如下:

CREATE DATABASE `delayTask` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci;
USE `delayTask`;
CREATE TABLE `t_order_message`
(
id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT,
order_id VARCHAR(50) NOT NULL COMMENT '訂單ID',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建日期時間',
edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改日期時間',
retry_times TINYINT NOT NULL DEFAULT 0 COMMENT '重試次數',
order_status TINYINT NOT NULL DEFAULT 0 COMMENT '訂單狀態',
INDEX idx_order_id (order_id),
INDEX idx_create_time (create_time)
) COMMENT '訂單信息表';
# 寫入兩條測試數據
INSERT INTO t_order_message(order_id) VALUES ('10086'),('10087');

編寫代碼:

// 常量
public class OrderConstants {
public static final int MAX_RETRY_TIMES = 5;
public static final int PENDING = 0;
public static final int SUCCESS = 1;
public static final int FAIL = -1;
public static final int LIMIT = 10;
}
// 實體
@Builder
@Data
public class OrderMessage {
private Long id;
private String orderId;
private LocalDateTime createTime;
private LocalDateTime editTime;
private Integer retryTimes;
private Integer orderStatus;
}
// DAO
@RequiredArgsConstructor
public class OrderMessageDao {
private final JdbcTemplate jdbcTemplate;
private static final ResultSetExtractor<List<OrderMessage>> M = r -> {
List<OrderMessage> list = Lists.newArrayList();
while (r.next()) {
list.add(OrderMessage.builder()
.id(r.getLong("id"))
.orderId(r.getString("order_id"))
.createTime(r.getTimestamp("create_time").toLocalDateTime())
.editTime(r.getTimestamp("edit_time").toLocalDateTime())
.retryTimes(r.getInt("retry_times"))
.orderStatus(r.getInt("order_status"))
.build());
}
return list;
};
public List<OrderMessage> selectPendingRecords(LocalDateTime start,
LocalDateTime end,
List<Integer> statusList,
int maxRetryTimes,
int limit) {
StringJoiner joiner = new StringJoiner(",");
statusList.forEach(s -> joiner.add(String.valueOf(s)));
return jdbcTemplate.query("SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? " +
"AND order_status IN (?) AND retry_times < ? LIMIT ?",
p -> {
p.setTimestamp(1, Timestamp.valueOf(start));
p.setTimestamp(2, Timestamp.valueOf(end));
p.setString(3, joiner.toString());
p.setInt(4, maxRetryTimes);
p.setInt(5, limit);
}, M);
}
public int updateOrderStatus(Long id, int status) {
return jdbcTemplate.update("UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?",
p -> {
p.setInt(1, status);
p.setTimestamp(2, Timestamp.valueOf(LocalDateTime.now()));
p.setLong(3, id);
});
}
}
// Service
@RequiredArgsConstructor
public class OrderMessageService {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageService.class);
private final OrderMessageDao orderMessageDao;
private static final List<Integer> STATUS = Lists.newArrayList();
static {
STATUS.add(OrderConstants.PENDING);
STATUS.add(OrderConstants.FAIL);
}
public void executeDelayJob() {
LOGGER.info("訂單處理定時任務開始執行......");
LocalDateTime end = LocalDateTime.now();
// 一天前
LocalDateTime start = end.minusDays(1);
List<OrderMessage> list = orderMessageDao.selectPendingRecords(start, end, STATUS, OrderConstants.MAX_RETRY_TIMES, OrderConstants.LIMIT);
if (!list.isEmpty()) {
for (OrderMessage m : list) {
LOGGER.info("處理訂單[{}],狀態由{}更新為{}", m.getOrderId(), m.getOrderStatus(), OrderConstants.SUCCESS);
// 這裡其實可以優化為批量更新
orderMessageDao.updateOrderStatus(m.getId(), OrderConstants.SUCCESS);
}
}
LOGGER.info("訂單處理定時任務開始完畢......");
}
}
// Job
@DisallowConcurrentExecution
public class OrderMessageDelayJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
OrderMessageService service = (OrderMessageService) jobExecutionContext.getMergedJobDataMap().get("orderMessageService");
service.executeDelayJob();
}
public static void main(String[] args) throws Exception {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/delayTask?useSSL=false&characterEncoding=utf8");
config.setDriverClassName(Driver.class.getName());
config.setUsername("root");
config.setPassword("root");
HikariDataSource dataSource = new HikariDataSource(config);
OrderMessageDao orderMessageDao = new OrderMessageDao(new JdbcTemplate(dataSource));
OrderMessageService service = new OrderMessageService(orderMessageDao);
// 內存模式的調度器
StdSchedulerFactory factory = new StdSchedulerFactory();
Scheduler scheduler = factory.getScheduler();
// 這裡沒有用到IOC容器,直接用Quartz數據集合傳遞服務引用
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("orderMessageService", service);
// 新建Job
JobDetail job = JobBuilder.newJob(OrderMessageDelayJob.class)
.withIdentity("orderMessageDelayJob", "delayJob")
.usingJobData(jobDataMap)
.build();
// 新建觸發器,10秒執行一次
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("orderMessageDelayTrigger", "delayJob")
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
.build();
scheduler.scheduleJob(job, trigger);
// 啟動調度器
scheduler.start();
Thread.sleep(Integer.MAX_VALUE);
}
}

這個例子裡面用了create_time做輪詢,實際上可以添加一個調度時間schedule_time列做輪詢,這樣子才能更容易定製空閒時和忙碌時候的調度策略。上面的示例的運行效果如下:

11:58:27.202 [main] INFO org.quartz.core.QuartzScheduler - Scheduler meta-data: Quartz Scheduler (v2.3.1) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED'
Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
NOT STARTED.
Currently in standby mode.
Number of jobs executed: 0
Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.
11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler version: 2.3.1
11:58:27.209 [main] INFO org.quartz.core.QuartzScheduler - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
11:58:27.212 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:27.217 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:27.219 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@10eb8c53
11:58:27.220 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers
11:58:27.221 [DefaultQuartzScheduler_Worker-1] DEBUG org.quartz.core.JobRunShell - Calling execute on job delayJob.orderMessageDelayJob
11:58:34.440 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 訂單處理定時任務開始執行......
11:58:34.451 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@3d27ece4
11:58:34.459 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@64e808af
11:58:34.470 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@79c8c2b7
11:58:34.477 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@19a62369
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@1673d017
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - After adding stats (total=10, active=0, idle=10, waiting=0)
11:58:34.559 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL query
11:58:34.565 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? AND order_status IN (?) AND retry_times < ? LIMIT ?]
11:58:34.645 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.210 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - SQLWarning ignored: SQL state '22007', error code '1292', message [Truncated incorrect DOUBLE value: '0,-1']
11:58:35.335 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 處理訂單[10086],狀態由0更新為1
11:58:35.342 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.346 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.347 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.354 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 處理訂單[10087],狀態由0更新為1
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.361 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 訂單處理定時任務開始完畢......
11:58:35.363 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers

RabbitMQ死信隊列

使用RabbitMQ死信隊列依賴於RabbitMQ的兩個特性:TTL和DLX。

  • TTL:Time To Live,消息存活時間,包括兩個維度:隊列消息存活時間和消息本身的存活時間。
  • DLX:Dead Letter Exchange,死信交換器。

畫個圖描述一下這兩個特性:

使用Redis實現延時任務

下面為了簡單起見,TTL使用了針對隊列的維度。引入RabbitMQ的Java驅動:

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
<scope>test</scope>
</dependency>

代碼如下:

public class DlxMain {
private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final Logger LOGGER = LoggerFactory.getLogger(DlxMain.class);
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel producerChannel = connection.createChannel();
Channel consumerChannel = connection.createChannel();
// dlx交換器名稱為dlx.exchange,類型是direct,綁定鍵為dlx.key,隊列名為dlx.queue
producerChannel.exchangeDeclare("dlx.exchange", "direct");
producerChannel.queueDeclare("dlx.queue", false, false, false, null);
producerChannel.queueBind("dlx.queue", "dlx.exchange", "dlx.key");
Map<String, Object> queueArgs = new HashMap<>();
// 設置隊列消息過期時間,5秒
queueArgs.put("x-message-ttl", 5000);
// 指定DLX相關參數
queueArgs.put("x-dead-letter-exchange", "dlx.exchange");
queueArgs.put("x-dead-letter-routing-key", "dlx.key");
// 聲明業務隊列
producerChannel.queueDeclare("business.queue", false, false, false, queueArgs);
ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("DlxConsumer");
return thread;
});
// 啟動消費者
executorService.execute(() -> {
try {
consumerChannel.basicConsume("dlx.queue", true, new DlxConsumer(consumerChannel));
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
});
OrderMessage message = new OrderMessage("10086");
producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
message.getDescription().getBytes(StandardCharsets.UTF_8));
LOGGER.info("發送消息成功,訂單ID:{}", message.getOrderId());
message = new OrderMessage("10087");
producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
message.getDescription().getBytes(StandardCharsets.UTF_8));
LOGGER.info("發送消息成功,訂單ID:{}", message.getOrderId());
message = new OrderMessage("10088");
producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
message.getDescription().getBytes(StandardCharsets.UTF_8));
LOGGER.info("發送消息成功,訂單ID:{}", message.getOrderId());
Thread.sleep(Integer.MAX_VALUE);
}
private static class DlxConsumer extends DefaultConsumer {
DlxConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
LOGGER.info("處理消息成功:{}", new String(body, StandardCharsets.UTF_8));
}
}
private static class OrderMessage {
private final String orderId;
private final long timestamp;
private final String description;
OrderMessage(String orderId) {
this.orderId = orderId;
this.timestamp = System.currentTimeMillis();
this.description = String.format("訂單[%s],訂單創建時間為:%s", orderId,
LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F));
}
public String getOrderId() {
return orderId;
}
public long getTimestamp() {
return timestamp;
}
public String getDescription() {
return description;
}
}
}

運行main()方法結果如下:

16:35:58.638 [main] INFO club.throwable.dlx.DlxMain - 發送消息成功,訂單ID:10086
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 發送消息成功,訂單ID:10087
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 發送消息成功,訂單ID:10088
16:36:03.646 [pool-1-thread-4] INFO club.throwable.dlx.DlxMain - 處理消息成功:訂單[10086],訂單創建時間為:2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-5] INFO club.throwable.dlx.DlxMain - 處理消息成功:訂單[10087],訂單創建時間為:2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-6] INFO club.throwable.dlx.DlxMain - 處理消息成功:訂單[10088],訂單創建時間為:2019-08-20 16:35:58

時間輪

時間輪TimingWheel是一種高效、低延遲的調度數據結構,底層採用數組實現存儲任務列表的環形隊列,示意圖如下:

使用Redis實現延時任務

這裡暫時不對時間輪和其實現作分析,只簡單舉例說明怎麼使用時間輪實現延時任務。這裡使用Netty提供的HashedWheelTimer,引入依賴:

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.1.39.Final</version>
</dependency>

代碼如下:

public class HashedWheelTimerMain {
private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
public static void main(String[] args) throws Exception {
AtomicInteger counter = new AtomicInteger();
ThreadFactory factory = r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("HashedWheelTimerWorker-" + counter.getAndIncrement());
return thread;
};
// tickDuration - 每tick一次的時間間隔, 每tick一次就會到達下一個槽位
// unit - tickDuration的時間單位
// ticksPerWhee - 時間輪中的槽位數
Timer timer = new HashedWheelTimer(factory, 1, TimeUnit.SECONDS, 60);
TimerTask timerTask = new DefaultTimerTask("10086");
timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);
timerTask = new DefaultTimerTask("10087");
timer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
timerTask = new DefaultTimerTask("10088");
timer.newTimeout(timerTask, 15, TimeUnit.SECONDS);
Thread.sleep(Integer.MAX_VALUE);
}
private static class DefaultTimerTask implements TimerTask {
private final String orderId;
private final long timestamp;
public DefaultTimerTask(String orderId) {
this.orderId = orderId;
this.timestamp = System.currentTimeMillis();
}
@Override
public void run(Timeout timeout) throws Exception {
System.out.println(String.format("任務執行時間:%s,訂單創建時間:%s,訂單ID:%s",
LocalDateTime.now().format(F), LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F), orderId));
}
}
}

運行結果:

任務執行時間:2019-08-20 17:19:49.310,訂單創建時間:2019-08-20 17:19:43.294,訂單ID:10086
任務執行時間:2019-08-20 17:19:54.297,訂單創建時間:2019-08-20 17:19:43.301,訂單ID:10087
任務執行時間:2019-08-20 17:19:59.297,訂單創建時間:2019-08-20 17:19:43.301,訂單ID:10088

一般來說,任務執行的時候應該使用另外的業務線程池,以免阻塞時間輪本身的運動。

選用的方案實現過程

最終選用了基於Redis的有序集合Sorted Set和Quartz短輪詢進行實現。具體方案是:

  1. 訂單創建的時候,訂單ID和當前時間戳分別作為Sorted Set的member和score添加到訂單隊列Sorted Set中。
  2. 訂單創建的時候,訂單ID和推送內容JSON字符串分別作為field和value添加到訂單隊列內容Hash中。
  3. 第1步和第2步操作的時候用Lua腳本保證原子性。
  4. 使用一個異步線程通過Sorted Set的命令ZREVRANGEBYSCORE彈出指定數量的訂單ID對應的訂單隊列內容Hash中的訂單推送內容數據進行處理。

對於第4點處理有兩種方案:

  • 方案一:彈出訂單內容數據的同時進行數據刪除,也就是ZREVRANGEBYSCORE、ZREM和HDEL命令要在同一個Lua腳本中執行,這樣的話Lua腳本的編寫難度大,並且由於彈出數據已經在Redis中刪除,如果數據處理失敗則可能需要從數據庫重新查詢補償。
  • 方案二:彈出訂單內容數據之後,在數據處理完成的時候再主動刪除訂單隊列Sorted Set和訂單隊列內容Hash中對應的數據,這樣的話需要控制併發,有重複執行的可能性。

最終暫時選用了方案一,也就是從Sorted Set彈出訂單ID並且從Hash中獲取完推送數據之後馬上刪除這兩個集合中對應的數據。方案的流程圖大概是這樣:

使用Redis實現延時任務

這裡先詳細說明一下用到的Redis命令。

Sorted Set相關命令

  • ZADD命令 - 將一個或多個成員元素及其分數值加入到有序集當中。

ZADD KEY SCORE1 VALUE1.. SCOREN VALUEN


  • ZREVRANGEBYSCORE命令 - 返回有序集中指定分數區間內的所有的成員。有序集成員按分數值遞減(從大到小)的次序排列。

ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]

  • max:分數區間 - 最大分數。
  • min:分數區間 - 最小分數。
  • WITHSCORES:可選參數,是否返回分數值,指定則會返回得分值。
  • LIMIT:可選參數,offset和count原理和MySQL的LIMIT offset,size一致,如果不指定此參數則返回整個集合的數據。

  • ZREM命令 - 用於移除有序集中的一個或多個成員,不存在的成員將被忽略。

ZREM key member [member ...]

Hash相關命令

  • HMSET命令 - 同時將多個field-value(字段-值)對設置到哈希表中。

HMSET KEY_NAME FIELD1 VALUE1 ...FIELDN VALUEN


  • HDEL命令 - 刪除哈希表key中的一個或多個指定字段,不存在的字段將被忽略。

HDEL KEY_NAME FIELD1.. FIELDN

Lua相關

  • 加載Lua腳本並且返回腳本的SHA-1字符串:SCRIPT LOAD script。
  • 執行已經加載的Lua腳本:EVALSHA sha1 numkeys key [key ...] arg [arg ...]。
  • unpack函數可以把table類型的參數轉化為可變參數,不過需要注意的是unpack函數必須使用在非變量定義的函數調用的最後一個參數,否則會失效,詳細見Stackoverflow的提問table.unpack() only returns the first element。

PS:如果不熟悉Lua語言,建議系統學習一下,因為想用好Redis,一定離不開Lua。

引入依賴:

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.1.7.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>5.1.9.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.59</version>
</dependency>
</dependencies>

編寫Lua腳本/lua/enqueue.lua和/lua/dequeue.lua:

-- /lua/enqueue.lua
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local zset_value = ARGV[1]
local zset_score = ARGV[2]
local hash_field = ARGV[3]
local hash_value = ARGV[4]
redis.call('ZADD', zset_key, zset_score, zset_value)
redis.call('HSET', hash_key, hash_field, hash_value)
return nil
-- /lua/dequeue.lua
-- 參考jesque的部分Lua腳本實現
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- TYPE命令的返回結果是{'ok':'zset'}這樣子,這裡利用next做一輪迭代
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
if type == 'zset' then
local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
if list ~= nil and #list > 0 then
-- unpack函數能把table轉化為可變參數
redis.call('ZREM', zset_key, unpack(list))
local result = redis.call('HMGET', hash_key, unpack(list))
redis.call('HDEL', hash_key, unpack(list))
return result
end
end
end
return nil

編寫核心API代碼:

// Jedis提供者
@Component
public class JedisProvider implements InitializingBean {
private JedisPool jedisPool;
@Override
public void afterPropertiesSet() throws Exception {
jedisPool = new JedisPool();
}
public Jedis provide(){
return jedisPool.getResource();
}
}
// OrderMessage
@Data
public class OrderMessage {
private String orderId;
private BigDecimal amount;
private Long userId;
}
// 延遲隊列接口
public interface OrderDelayQueue {
void enqueue(OrderMessage message);
List<OrderMessage> dequeue(String min, String max, String offset, String limit);
List<OrderMessage> dequeue();
String enqueueSha();
String dequeueSha();
}
// 延遲隊列實現類
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {
private static final String MIN_SCORE = "0";
private static final String OFFSET = "0";
private static final String LIMIT = "10";
private static final String ORDER_QUEUE = "ORDER_QUEUE";
private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE";
private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>();
private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();
private static final List<String> KEYS = Lists.newArrayList();
private final JedisProvider jedisProvider;
static {
KEYS.add(ORDER_QUEUE);
KEYS.add(ORDER_DETAIL_QUEUE);
}
@Override
public void enqueue(OrderMessage message) {
List<String> args = Lists.newArrayList();
args.add(message.getOrderId());
args.add(String.valueOf(System.currentTimeMillis()));
args.add(message.getOrderId());
args.add(JSON.toJSONString(message));
try (Jedis jedis = jedisProvider.provide()) {
jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args);
}
}
@Override
public List<OrderMessage> dequeue() {
// 30分鐘之前
String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT);
}
@SuppressWarnings("unchecked")
@Override
public List<OrderMessage> dequeue(String min, String max, String offset, String limit) {
List<String> args = new ArrayList<>();
args.add(max);
args.add(min);
args.add(offset);
args.add(limit);
List<OrderMessage> result = Lists.newArrayList();
try (Jedis jedis = jedisProvider.provide()) {
List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), KEYS, args);
if (null != eval) {
for (String e : eval) {
result.add(JSON.parseObject(e, OrderMessage.class));
}
}
}
return result;
}
@Override
public String enqueueSha() {
return ENQUEUE_LUA_SHA.get();
}
@Override
public String dequeueSha() {
return DEQUEUE_LUA_SHA.get();
}
@Override
public void afterPropertiesSet() throws Exception {
// 加載Lua腳本
loadLuaScript();
}
private void loadLuaScript() throws Exception {
try (Jedis jedis = jedisProvider.provide()) {
ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
String sha = jedis.scriptLoad(luaContent);
ENQUEUE_LUA_SHA.compareAndSet(null, sha);
resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
sha = jedis.scriptLoad(luaContent);
DEQUEUE_LUA_SHA.compareAndSet(null, sha);
}
}
public static void main(String[] as) throws Exception {
DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
JedisProvider jedisProvider = new JedisProvider();
jedisProvider.afterPropertiesSet();
RedisOrderDelayQueue queue = new RedisOrderDelayQueue(jedisProvider);
queue.afterPropertiesSet();
// 寫入測試數據
OrderMessage message = new OrderMessage();
message.setAmount(BigDecimal.valueOf(10086));
message.setOrderId("ORDER_ID_10086");
message.setUserId(10086L);
message.setTimestamp(LocalDateTime.now().format(f));
List<String> args = Lists.newArrayList();
args.add(message.getOrderId());
// 測試需要,score設置為30分鐘之前
args.add(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
args.add(message.getOrderId());
args.add(JSON.toJSONString(message));
try (Jedis jedis = jedisProvider.provide()) {
jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args);
}
List<OrderMessage> dequeue = queue.dequeue();
System.out.println(dequeue);
}
}

這裡先執行一次main()方法驗證一下延遲隊列是否生效:

[OrderMessage(orderId=ORDER_ID_10086, amount=10086, userId=10086, timestamp=2019-08-21 08:32:22.885)]

確定延遲隊列的代碼沒有問題,接著編寫一個Quartz的Job類型的消費者OrderMessageConsumer:

@DisallowConcurrentExecution
@Component
public class OrderMessageConsumer implements Job {
private static final AtomicInteger COUNTER = new AtomicInteger();
private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
return thread;
});
private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
@Autowired
private OrderDelayQueue orderDelayQueue;
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
LOGGER.info("訂單消息處理定時任務開始執行......");
List<OrderMessage> messages = orderDelayQueue.dequeue();
if (!messages.isEmpty()) {
// 簡單的列表等分放到線程池中執行
List<List<OrderMessage>> partition = Lists.partition(messages, 2);
int size = partition.size();
final CountDownLatch latch = new CountDownLatch(size);
for (List<OrderMessage> p : partition) {
BUSINESS_WORKER_POOL.execute(new ConsumeTask(p, latch));
}
try {
latch.await();
} catch (InterruptedException ignore) {
//ignore
}
}
stopWatch.stop();
LOGGER.info("訂單消息處理定時任務執行完畢,耗時:{} ms......", stopWatch.getTotalTimeMillis());
}
@RequiredArgsConstructor
private static class ConsumeTask implements Runnable {
private final List<OrderMessage> messages;
private final CountDownLatch latch;
@Override
public void run() {
try {
// 實際上這裡應該單條捕獲異常
for (OrderMessage message : messages) {
LOGGER.info("處理訂單信息,內容:{}", message);
}
} finally {
latch.countDown();
}
}
}
}

上面的消費者設計的時候需要有以下考量:

  • 使用@DisallowConcurrentExecution註解不允許Job併發執行,其實多個Job併發執行意義不大,因為我們採用的是短間隔的輪詢,而Redis是單線程處理命令,在客戶端做多線程其實效果不佳。
  • 線程池BUSINESS_WORKER_POOL的線程容量或者隊列應該綜合LIMIT值、等分訂單信息列表中使用的size值以及ConsumeTask裡面具體的執行時間進行考慮,這裡只是為了方便使用了固定容量的線程池。
  • ConsumeTask中應該對每一條訂單信息的處理單獨捕獲異常和吞併異常,或者把處理單個訂單信息的邏輯封裝成一個不拋出異常的方法。

其他Quartz相關的代碼:

// Quartz配置類
@Configuration
public class QuartzAutoConfiguration {
@Bean
public SchedulerFactoryBean schedulerFactoryBean(QuartzAutowiredJobFactory quartzAutowiredJobFactory) {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setAutoStartup(true);
factory.setJobFactory(quartzAutowiredJobFactory);
return factory;
}
@Bean
public QuartzAutowiredJobFactory quartzAutowiredJobFactory() {
return new QuartzAutowiredJobFactory();
}
public static class QuartzAutowiredJobFactory extends AdaptableJobFactory implements BeanFactoryAware {
private AutowireCapableBeanFactory autowireCapableBeanFactory;
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory;
}
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
Object jobInstance = super.createJobInstance(bundle);
// 這裡利用AutowireCapableBeanFactory從新建的Job實例做一次自動裝配,得到一個原型(prototype)的JobBean實例
autowireCapableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
}

這裡暫時使用了內存態的RAMJobStore去存放任務和觸發器的相關信息,如果在生產環境最好替換成基於MySQL也就是JobStoreTX進行集群化,最後是啟動函數和CommandLineRunner的實現:

@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, TransactionAutoConfiguration.class})
public class Application implements CommandLineRunner {
@Autowired
private Scheduler scheduler;
@Autowired
private JedisProvider jedisProvider;
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Override
public void run(String... args) throws Exception {
// 準備一些測試數據
prepareOrderMessageData();
JobDetail job = JobBuilder.newJob(OrderMessageConsumer.class)
.withIdentity("OrderMessageConsumer", "DelayTask")
.build();
// 觸發器5秒觸發一次
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("OrderMessageConsumerTrigger", "DelayTask")
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever())
.build();
scheduler.scheduleJob(job, trigger);
}
private void prepareOrderMessageData() throws Exception {
DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
try (Jedis jedis = jedisProvider.provide()) {
List<OrderMessage> messages = Lists.newArrayList();
for (int i = 0; i < 100; i++) {
OrderMessage message = new OrderMessage();
message.setAmount(BigDecimal.valueOf(i));
message.setOrderId("ORDER_ID_" + i);
message.setUserId((long) i);
message.setTimestamp(LocalDateTime.now().format(f));
messages.add(message);
}
// 這裡暫時不使用Lua
Map<String, Double> map = Maps.newHashMap();
Map<String, String> hash = Maps.newHashMap();
for (OrderMessage message : messages) {
// 故意把score設計成30分鐘前
map.put(message.getOrderId(), Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)));
hash.put(message.getOrderId(), JSON.toJSONString(message));
}
jedis.zadd("ORDER_QUEUE", map);
jedis.hmset("ORDER_DETAIL_QUEUE", hash);
}
}
}

輸出結果如下:

2019-08-21 22:45:59.518 INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer : 訂單消息處理定時任務開始執行......
2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_91, amount=91, userId=91, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_95, amount=95, userId=95, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_97, amount=97, userId=97, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_99, amount=99, userId=99, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_93, amount=93, userId=93, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_94, amount=94, userId=94, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_96, amount=96, userId=96, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_92, amount=92, userId=92, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_98, amount=98, userId=98, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_90, amount=90, userId=90, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.540 INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer : 訂單消息處理定時任務執行完畢,耗時:22 ms......
2019-08-21 22:46:04.515 INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer : 訂單消息處理定時任務開始執行......
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_89, amount=89, userId=89, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_87, amount=87, userId=87, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_85, amount=85, userId=85, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_88, amount=88, userId=88, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_83, amount=83, userId=83, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_81, amount=81, userId=81, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_86, amount=86, userId=86, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_82, amount=82, userId=82, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_84, amount=84, userId=84, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : 處理訂單信息,內容:OrderMessage(orderId=ORDER_ID_80, amount=80, userId=80, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer : 訂單消息處理定時任務執行完畢,耗時:1 ms......
......

首次執行的時候涉及到一些組件的初始化,會比較慢,後面看到由於我們只是簡單打印訂單信息,所以定時任務執行比較快。如果在不調整當前架構的情況下,生產中需要注意:

  • 切換JobStore為JDBC模式,Quartz官方有完整教程,或者看筆者之前翻譯的Quartz文檔。
  • 需要監控或者收集任務的執行狀態,添加預警等等。

這裡其實有一個性能隱患,命令ZREVRANGEBYSCORE的時間複雜度可以視為為O(N),N是集合的元素個數,由於這裡把所有的訂單信息都放進了同一個Sorted Set(ORDER_QUEUE)中,所以在一直有新增數據的時候,dequeue腳本的時間複雜度一直比較高,後續訂單量升高之後會此處一定會成為性能瓶頸,後面會給出解決的方案。

小結

這篇文章主要從一個實際生產案例的仿真例子入手,分析了當前延時任務的一些實現方案,還基於Redis和Quartz給出了一個完整的示例。當前的示例只是處於可運行的狀態,有些問題尚未解決。下一篇文章會著眼於解決兩個方面的問題:

  1. 分片。
  2. 監控。

還有一點,架構是基於業務形態演進出來的,很多東西需要結合場景進行方案設計和改進,思路僅供參考,切勿照搬代碼


免費分享Java技術資料,需要的可以在關注後私信我

使用Redis實現延時任務

作者:Throwable

鏈接:https://juejin.im/post/5d5d675de51d4562043f571a

來源:掘金

"

相關推薦

推薦中...