賊好用,這款精準(zhǔn)定時任務(wù)和延時隊列框架
項目簡述
Mykit體系中提供的簡單、穩(wěn)定、可擴展的延遲消息隊列框架,提供精準(zhǔn)的定時任務(wù)和延遲隊列處理功能
項目模塊說明
mykit-delay-common: mykit-delay 延遲消息隊列框架通用工具模塊,提供全局通用的工具類
mykit-delay-config: mykit-delay 延遲消息隊列框架通用配置模塊,提供全局配置
mykit-delay-queue: mykit-delay 延遲消息隊列框架核心實現(xiàn)模塊,目前所有主要的功能都在此模塊實現(xiàn)
mykit-delay-controller: mykit-delay 延遲消息隊列框架Restful接口實現(xiàn)模塊,對外提供Restful接口訪問,兼容各種語言調(diào)用
mykit-delay-core: mykit-delay 延遲消息隊列框架的入口,整個框架的啟動程序在此模塊實現(xiàn)
mykit-delay-rpc:mykit-delay延時消息隊列的RPC模塊,支持Dubbo、brpc、grpc、Motan、Sofa、SpringCloud、SpringCloud Alibaba等主流RPC的實現(xiàn)
mykit-delay-test: mykit-delay 延遲消息隊列框架通用測試模塊,主要提供Junit單元測試用例
需求背景
用戶下訂單后未支付,30分鐘后支付超時
在某個時間點通知用戶參加系統(tǒng)活動
業(yè)務(wù)執(zhí)行失敗之后隔10分鐘重試一次
類似的場景比較多 簡單的處理方式就是使用定時任務(wù) 假如數(shù)據(jù)比較多的時候 有的數(shù)據(jù)可能延遲比較嚴(yán)重,而且越來越多的定時業(yè)務(wù)導(dǎo)致任務(wù)調(diào)度很繁瑣不好管理。
隊列設(shè)計
整體架構(gòu)設(shè)計如下圖所示。
開發(fā)前需要考慮的問題
及時性 消費端能按時收到
同一時間消息的消費權(quán)重
可靠性 消息不能出現(xiàn)沒有被消費掉的情況
可恢復(fù) 假如有其他情況 導(dǎo)致消息系統(tǒng)不可用了 至少能保證數(shù)據(jù)可以恢復(fù)
可撤回 因為是延遲消息 沒有到執(zhí)行時間的消息支持可以取消消費
高可用 多實例 這里指HA/主備模式并不是多實例同時一起工作
消費端如何消費
當(dāng)然初步選用Redis作為數(shù)據(jù)緩存的主要原因是因為redis自身支持zset的數(shù)據(jù)結(jié)構(gòu)(score 延遲時間毫秒) 這樣就少了排序的煩惱而且性能還很高,正好我們的需求就是按時間維度去判定執(zhí)行的順序 同時也支持Map list數(shù)據(jù)結(jié)構(gòu)。
簡單定義一個消息數(shù)據(jù)結(jié)構(gòu)
private String topic;/***topic**/private String id;/***自動生成 全局惟一 snowflake**/private String bizKey;private long delay;/***延時毫秒數(shù)**/private int priority;//優(yōu)先級private long ttl;/**消費端消費的ttl**/private String body;/***消息體**/private long createTime=System.currentTimeMillis();private int status= Status.WaitPut.ordinal();
運行原理
用Map來存儲元數(shù)據(jù)。id作為key,整個消息結(jié)構(gòu)序列化(JSON/…)之后作為value,放入元消息池中。
將id放入其中(有N個)一個zset有序列表中,以createTime delay priority作為score。修改狀態(tài)為正在延遲中
使用timer實時監(jiān)控zset有序列表中top 10的數(shù)據(jù) 。 如果數(shù)據(jù)score<=當(dāng)前時間毫秒就取出來,根據(jù)topic重新放入一個新的可消費列表(list)中,在zset中刪除已經(jīng)取出來的數(shù)據(jù),并修改狀態(tài)為待消費
客戶端獲取數(shù)據(jù)只需要從可消費隊列中獲取就可以了。并且狀態(tài)必須為待消費 運行時間需要<=當(dāng)前時間的 如果不滿足 重新放入zset列表中,修改狀態(tài)為正在延遲。如果滿足修改狀態(tài)為已消費。或者直接刪除元數(shù)據(jù)。
客戶端
因為涉及到不同程序語言的問題,所以當(dāng)前默認(rèn)支持http訪問方式。
添加延時消息添加成功之后返回消費唯一ID POST /push {……消息體}
刪除延時消息 需要傳遞消息ID GET /delete?id=
恢復(fù)延時消息 GET /reStore?expire=true|false expire是否恢復(fù)已過期未執(zhí)行的消息。
恢復(fù)單個延時消息 需要傳遞消息ID GET /reStore/id
獲取消息 需要長連接 GET /get/topic
用Nginx暴露服務(wù),配置為輪詢 在添加延遲消息的時候就可以流量平均分配。
目前系統(tǒng)中客戶端并沒有采用HTTP長連接的方式來消費消息,而是采用MQ的方式來消費數(shù)據(jù)這樣客戶端就可以不用關(guān)心延遲消息隊列。只需要在發(fā)送MQ的時候攔截一下 如果是延遲消息就用延遲消息系統(tǒng)處理。
消息可恢復(fù)
實現(xiàn)恢復(fù)的原理 正常情況下一般都是記錄日志,比如mysql的binlog等。
這里我們直接采用mysql數(shù)據(jù)庫作為記錄日志。
目前創(chuàng)建以下2張表:
消息表 字段包括整個消息體
消息流轉(zhuǎn)表 字段包括消息ID、變更狀態(tài)、變更時間、zset掃描線程Name、host/ip
定義zset掃描線程Name是為了更清楚的看到消息被分發(fā)到具體哪個zset中。前提是zset的key和監(jiān)控zset的線程名稱要有點關(guān)系 這里也可以是zset key。
支持消息恢復(fù)
假如redis服務(wù)器宕機了,重啟之后發(fā)現(xiàn)數(shù)據(jù)也沒有了。所以這個恢復(fù)是很有必要的,只需要從表1也就是消息表中把消息狀態(tài)不等于已消費的數(shù)據(jù)全部重新分發(fā)到延遲隊列中去,然后同步一下狀態(tài)就可以了。
當(dāng)然恢復(fù)單個任務(wù)也可以這么干。
數(shù)據(jù)表設(shè)計
這里,我就直接給出創(chuàng)建數(shù)據(jù)表的SQL語句。
DROP TABLE IF EXISTS `mykit_delay_queue_job`;CREATE TABLE `mykit_delay_queue_job` ( `id` varchar(128) NOT NULL, `bizkey` varchar(128) DEFAULT NULL, `topic` varchar(128) DEFAULT NULL, `subtopic` varchar(250) DEFAULT NULL, `delay` bigint(20) DEFAULT NULL, `create_time` bigint(20) DEFAULT NULL, `body` text, `status` int(11) DEFAULT NULL, `ttl` int(11) DEFAULT NULL, `update_time` datetime(6)(6)(3) DEFAULT NULL, PRIMARY KEY (`id`), KEY `mykit_delay_queue_job_ID_STATUS` (`id`,`status`), KEY `mykit_delay_queue_job_STATUS` (`status`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- ------------------------------ Table structure for mykit_delay_queue_job_log-- ----------------------------DROP TABLE IF EXISTS `mykit_delay_queue_job_log`;CREATE TABLE `mykit_delay_queue_job_log` ( `id` varchar(128) NOT NULL, `status` int(11) DEFAULT NULL, `thread` varchar(60) DEFAULT NULL, `update_time` datetime(6)(6)(3) DEFAULT NULL, `host` varchar(128) DEFAULT NULL, KEY `mykit_delay_queue_job_LOG_ID_STATUS` (`id`,`status`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
關(guān)于高可用
分布式協(xié)調(diào)還是選用zookeeper。
如果有多個實例最多同時只能有1個實例工作 這樣就避免了分布式競爭鎖帶來的壞處,當(dāng)然如果業(yè)務(wù)需要多個實例同時工作也是支持的,也就是一個消息最多只能有1個實例處理,可以選用zookeeper或者redis就能實現(xiàn)分布式鎖了。
最終做了一下測試多實例同時運行,可能因為會涉及到鎖的問題性能有所下降,反而單機效果很好。所以比較推薦基于docker的主備部署模式。
運行模式
支持 master,slave (HA)需要配置mykit.delay.registry.serverList zk集群地址列表
支持 cluster 會涉及到分布式鎖競爭 效果不是很明顯 分布式鎖采用redis的 setNx實現(xiàn)
StandAlone
目前,經(jīng)過測試,推薦使用master slave的模式,并且,在升級版本中,進一步增強了Master Slave模式。后期會優(yōu)化Cluster模式。
如何接入
為了提供一個統(tǒng)一的精準(zhǔn)定時任務(wù)和延時隊列框架,mykit-delay提供了HTTP Rest接口和RPC方式供其他業(yè)務(wù)系統(tǒng)調(diào)用,接口使用簡單方便,只需要簡單的調(diào)用接口,傳遞相應(yīng)的參數(shù)即可。
RPC方式調(diào)用,后續(xù)支持的方式有:
Dubbo(已實現(xiàn))
brpc(預(yù)留支持)
grpc(預(yù)留支持)
Motan(預(yù)留支持)
Sofa(預(yù)留支持)
SpringCloud(預(yù)留支持)
SpringCloud Alibaba(預(yù)留支持)
HTTP方式接入
消息體
以JSON數(shù)據(jù)格式參數(shù) 目前提供了http 協(xié)議。
body 業(yè)務(wù)消息體
delay 延時毫秒 距createTime的間隔毫秒數(shù)
id 任務(wù)ID 系統(tǒng)自動生成 任務(wù)創(chuàng)建成功返回
status 狀態(tài) 默認(rèn)不填寫
topic 標(biāo)題
subtopic 保留字段
ttl 保留字段
createTime 創(chuàng)建任務(wù)時間 非必填 系統(tǒng)默認(rèn)
啟動HTTP Rest服務(wù)
首先,從gitHub Clone項目到本地
git clone https://github.com/sunshinelyz/mykit-delay.git
然后進入mykit-delay框架目錄。
cd mykit-delay
執(zhí)行Maven命令
mvn clean package -Dmaven.test.skip=true
接下來,進入 mykit-delay-core 的 target 目錄下,運行如下命令。
java -jar mykit-delay-core-xxx.jar
其中,xxx是版本號,以實際下載的版本號為準(zhǔn)。
接下來,就可以調(diào)用HTTP Restful接口來使用mykit-delay框架了。
添加任務(wù)
/push POST application/json{"body":"{hello world}","delay":10000,"id":"20","status":0,"topic":"ces","subtopic":"",ttl":12}
刪除任務(wù)
刪除任務(wù) 需要記錄一個JobId
/delete?jobId=xxx GET
恢復(fù)單個任務(wù)
用于任務(wù)錯亂 腦裂情況 根據(jù)日志恢復(fù)任務(wù)
/reStore?expire=true GET
參數(shù)expire 表示是否需要恢復(fù)已過期還未執(zhí)行的數(shù)據(jù)
清空隊列數(shù)據(jù)
根據(jù)日志中未完成的數(shù)據(jù)清空隊列中全部數(shù)據(jù)。清空之后 會刪除緩存中的所有任務(wù)
/clearAll GET
Dubbo方式接入
消息體
以JSON數(shù)據(jù)格式參數(shù) 目前提供了http 協(xié)議。
body 業(yè)務(wù)消息體
delay 延時毫秒 距createTime的間隔毫秒數(shù)
id 任務(wù)ID 系統(tǒng)自動生成 任務(wù)創(chuàng)建成功返回
status 狀態(tài) 默認(rèn)不填寫
topic 標(biāo)題
subtopic 保留字段
ttl 保留字段
createTime 創(chuàng)建任務(wù)時間 非必填 系統(tǒng)默認(rèn)
啟動Dubbo服務(wù)
首先,從GitHub Clone項目到本地
git clone https://github.com/sunshinelyz/mykit-delay.git
然后進入mykit-delay框架目錄。
cd mykit-delay
執(zhí)行Maven命令
mvn clean package -Dmaven.test.skip=true
接下來,進入 mykit-rpc-dubbo模塊下的 mykit-rpc-dubbo-server服務(wù) 的 target 目錄下,運行如下命令。
mykit-rpc-dubbo-server-xxx.jar
其中,xxx是版本號,以實際下載的版本號為準(zhǔn)。
引入mykit-delay依賴
以Dubbo方式接入mykit-delay,需要引入mykit-delay的依賴,如下所示。
<dependency> <groupId>io.mykit.delay</groupId> <artifactId>mykit-rpc-dubbo-common</artifactId> <version>1.0-SNAPSHOT</version></dependency>
然后,在需要調(diào)用Dubbo服務(wù)的類中以如下方式注入MykitDelayDubboInterface。
@DubboReference(version = "1.0.0")private MykitDelayDubboInterface mykitDelayDubboInterface;
其中,MykitDelayDubboInterface接口的定義如下所示。
/** * @author binghe * @version 1.0.0 * @description 發(fā)布的Dubbo接口 */public interface MykitDelayDubboInterface { /** * 推送消息 */ ResponseMessage push(JobWrapp jobMsg); /** * 刪除任務(wù) */ ResponseMessage delete(String jobId); /** * 完成任務(wù) */ ResponseMessage finish(String jobId); /** * 恢復(fù)單個任務(wù) */ ResponseMessage reStoreJob(String jobId); /** * 提供一個方法 假設(shè)緩存中間件出現(xiàn)異常 以及數(shù)據(jù)錯亂的情況 提供恢復(fù)功能 * @param expire 過期的數(shù)據(jù)是否需要重發(fā) true需要, false不需要 默認(rèn)為true */ ResponseMessage reStore(Boolean expire); /** * 清除所有的任務(wù) */ ResponseMessage clearAll();}
接下來,就可以以Dubbo方式接入mykit-delay框架了。
注意:無論是以HTTP方式,還是以RPC方式啟動mykit-delay服務(wù),都需要通過如下方式加載基本配置信息。
StartGetReady.ready(ConsumeQueueProvider.class.getName());
客戶端獲取隊列方式
目前默認(rèn)實現(xiàn)了RocketMQ與ActiveMQ的推送方式。依賴MQ的方式來實現(xiàn)延時框架與具體業(yè)務(wù)系統(tǒng)的解耦。同時,框架已SPI的形式加載相應(yīng)的MQ,也就是說,集成MQ的方式是可擴展的。
消息體中消息與RocketMQ和 ActiveMQ 消息字段對應(yīng)關(guān)系
mykit-delay RocketMQ ActiveMQ 備注
topic topic topic 點對點發(fā)送隊列名稱或者主題名稱
subtopic subtopic subtopic 點對點發(fā)送隊列子名稱或者主題子名稱
body 消息內(nèi)容 消息內(nèi)容 消息內(nèi)容
關(guān)于系統(tǒng)配置
延遲框架與具體執(zhí)行業(yè)務(wù)系統(tǒng)的交互方式通過延遲框架配置實現(xiàn),具體配置文件位置為mykit-delay-config項目下的resources/properties/starter.properties文件中。
測試
需要配置好數(shù)據(jù)庫地址和Redis的地址 如果不是單機模式 也需要配置好Zookeeper
運行mykit-delay-test模塊下的測試類io.mykit.delay.test.PushTest添加任務(wù)到隊列中
啟動mykit-delay-test模塊下的io.mykit.delay.TestDelayQueue消費前面添加數(shù)據(jù) 為了方便查詢效果 默認(rèn)的消費方式是consoleCQ 控制臺輸出
擴展
支持zset隊列個數(shù)可配置,避免大數(shù)據(jù)帶來高延遲的問題。進一步增強框架的高可用。
近期規(guī)劃
brpc、grpc、Motan、Sofa、SpringCloud、SpringCloud Alibaba等RPC擴展
支持RabbitMQ、Kafka等消息中間件
分區(qū)(buck)支持動態(tài)設(shè)置
redis與數(shù)據(jù)庫數(shù)據(jù)一致性的問題 (重要)
實現(xiàn)自己的推拉機制
支持可切換實現(xiàn)方式,目前只是依賴Redis實現(xiàn),后續(xù)待優(yōu)化,支持更多的可配置選項
支持Web控制臺管理隊列
實現(xiàn)消息消費TTL機制
增加對框架和定時任務(wù)的監(jiān)控