- 消息重復概率比較高時(shí),需要對重復消息進(jìn)行合并處理避免浪費有限的資源,減少消費延遲;
- 需要根據業(yè)務(wù)自定義優(yōu)先級進(jìn)行消息處理,高優(yōu)先級的消息比低優(yōu)先級的消息先處理;
- 消息需要定時(shí)消費的場(chǎng)景,消息只有在設定的消費時(shí)間到了之后立馬被消費。
本文將介紹一種基于Redis實(shí)現的消息隊列(Redis message queue, RMQ),RMQ可以作為傳統消息隊列的互補選擇,在傳統消息隊列沒(méi)有涉及的場(chǎng)景中使用RMQ。
功能介紹
RMQ設計為一個(gè)二方庫,可以幫助用戶(hù)基于Redis快速實(shí)現消息隊列的功能,RMQ消息隊列具有消息合并、區分優(yōu)先級、支持定時(shí)消息等特性。RMQ消息隊列可以用于異步解耦、削峰填谷,支持億級數據堆積。RMQ消息隊列目前支持三種類(lèi)型的消息,分別是RangeMergeMessage(區間重復合并消息)、PriorityMessage(優(yōu)先級消息)、FixedTimeMessage(任意定時(shí)消息)。
區間重復合并消息
RangeMergeMessage支持區間重復消息合并,發(fā)送消息時(shí)需要設置時(shí)間區間,消息延遲該時(shí)間區間長(cháng)度后被消費,在該時(shí)間區間內如果發(fā)送重復的消息,重復消息將會(huì )被合并。如果消息在Redis服務(wù)端發(fā)生堆積,重復到來(lái)的消息依然會(huì )被合并處理。該類(lèi)型消息適用于消息重復率較高且希望重復消息合并處理的場(chǎng)景,對重復消息進(jìn)行合并可以減少下游消費系統的壓力,減少不必要的資源消耗,將有限的資源最大化的利用,提升消費效率。
優(yōu)先級消息
PriorityMessage支持給消息設置任意等級的優(yōu)先級,優(yōu)先級高的消息會(huì )被優(yōu)先消費,相同優(yōu)先級的消息被隨機消費。如果消息在Redis服務(wù)端發(fā)生堆積,重復的消息將被合并處理,合并后消息的優(yōu)先級等于最后存儲的消息的優(yōu)先級。該類(lèi)型消息適用于希望重復消息合并處理且需要設置優(yōu)先級的場(chǎng)景,下游消費者資源有限時(shí),合并重復消息且優(yōu)先處理優(yōu)先級高的消息將可以合理利用有限的資源。
任意定時(shí)消息
FixedTimeMessage支持給消息設置任意消費時(shí)間,只有消費時(shí)間到了之后消息才被消費,消費時(shí)間可精確到秒。消息到期后沒(méi)有及時(shí)被消費時(shí),消費者將按照時(shí)間由遠及近進(jìn)行消費。如果消息在Redis服務(wù)端發(fā)生堆積,重復的消息將被合并處理,合并后消息的消費時(shí)間等于最后存儲的消息的消費時(shí)間。該類(lèi)型消息適用于希望重復消息合并處理且需要定時(shí)消費的場(chǎng)景,定時(shí)消息應用場(chǎng)景非常豐富,比如定時(shí)打標去標、活動(dòng)結束后清理動(dòng)作、訂單超時(shí)關(guān)閉等。
并發(fā)消費控制
使用傳統消息中間件進(jìn)行集群消費的時(shí)候,為了避免并發(fā)處理同一元數據導致不一致問(wèn)題,通常需要對元數據加分布式鎖,頻繁的鎖沖突會(huì )導致消費效率低下。加分布式鎖的最終目的其實(shí)就是保障屬于同一元數據的消息被串行消費。加分布式鎖并不是最好的方案,最好的方案應該是從根上解決并發(fā)問(wèn)題,讓屬于同一元數據的消息串行消費。RMQ消息隊列具有并發(fā)消費控制能力,屬于同一元數據的消息只會(huì )被分配給全局唯一一個(gè)線(xiàn)程進(jìn)行消費,因此屬于同一元數據的消息將被串行消費。使用方如果需要該能力,除了需要提供Redis,還需要提供ZooKeeper。
重試次數控制
RMQ消息隊列支持失敗重試消費16次,業(yè)務(wù)返回消費失敗后,消息會(huì )被回滾并等待重試消費,重試16次后消息進(jìn)入死信隊列,消息不再被消費,除非人工干預。
技術(shù)原理
總體框架

RMQ消息隊列由三部分組成,分別為ZooKeeper、RMQ二方庫、Redis。ZooKeeper負責維護集群worker的信息,將topic的所有slot分配給全局的woker。Redis負責存儲消息,采用Sorted Set結構存儲,Store Queue是消息存放的隊列,Prepare Queue是采用二階段消費方式正在消費的消息存放隊列,Dead Queue是死信隊列。RMQ二方庫由RmqClient、Consumer、Producer三部分組成。RmqClient負責RMQ的啟動(dòng)工作,包括上報TopicDef、Worker給ZooKeeper,分配Slot給Worker,掃描業(yè)務(wù)定義的MessageListener Bean。Producer負責根據不用消息類(lèi)型將消息按照指定的方式存儲到Redis。Consumer負責根據不用消息類(lèi)型按照指定方式從Redis彈出消息并調用業(yè)務(wù)的MessageListener。
消息存儲

Topic的設計
Topic的定義有三部分組成,topic表示主題名稱(chēng),slotAmount表示消息存儲劃分的槽數量,topicType表示消息的類(lèi)型。主題名稱(chēng)是一個(gè)Topic的唯一標示,相同主題名稱(chēng)Topic的slotAmount和topicType一定是一樣的。消息存儲采用Redis的Sorted Set結構,為了支持大量消息的堆積,需要把消息分散存儲到很多個(gè)槽中,slotAmount表示該Topic消息存儲共使用的槽數量,槽數量一定需要是2的n次冪。在消息存儲的時(shí)候,采用對指定數據或者消息體哈希求余得到槽位置。
StoreQueue 的設計
上圖中topic劃分了8個(gè)槽位,編號0-7。如果發(fā)送方指定了消息的slotBasis,則計算slotBasis的CRC32值,CRC32值對槽數量進(jìn)行取模得到槽序號,SlotKey設計為#{topic}_#{index}(也即Redis的鍵),其中#{}表示占位符。發(fā)送方需要保證相同內容的消息的slotBasis相同,如果沒(méi)有指定slotBasis則采用消息內容計算SlotKey,這樣內容相同的消息體就會(huì )落在同一個(gè)Sorted Set里面,所以?xún)热菹嗤南?huì )進(jìn)行合并。Redis的Sorted Set中的數據按照分數排序,實(shí)現不同類(lèi)型的消息的關(guān)鍵就在于如何利用分數、如何添加消息到Sorted Set、如何從Sorted Set中彈出消息。優(yōu)先級消息將優(yōu)先級作為分數,消費時(shí)每次彈出分數最大的消息。任意定時(shí)消息將時(shí)間戳作為分數,消費時(shí)每次彈出分數大于當前時(shí)間戳的一個(gè)消息。區間重復合并消息將時(shí)間戳作為分數,添加消息時(shí)將(當前時(shí)間戳+時(shí)間區間)作為分數,消費時(shí)每次彈出分數大于當前時(shí)間戳的一個(gè)消息。
PrepareQueue 的設計
為了保障RMQ消息隊列的可用性,做到每條消息至少消費一次,消費者不是直接pop有序集合中的元素,而是將元素從StoreQueue移動(dòng)到PrepareQueue并返回消息給消費者,等消費成功后再從PrepareQueue從刪除,或者消費失敗后從PreapreQueue重新移動(dòng)到StoreQueue,這便是根據二階段提交的思想實(shí)現的二階段消費。在后面將會(huì )詳細介紹二階段消費的實(shí)現思路,這里重點(diǎn)介紹下PrepareQueue的存儲設計。StoreQueue中每一個(gè)Slot對應PrepareQueue中的Slot,PrepareQueue的SlotKey設計為prepare{#{topic}#{index}}。PrepareQueue采用Sorted Set作為存儲,消息移動(dòng)到PrepareQueue時(shí)刻對應的(秒級時(shí)間戳*1000+重試次數)作為分數,字符串存儲的是消息體內容。這里分數的設計與重試次數的設計密切相關(guān),所以在重試次數設計章節詳細介紹。PrepareQueue的SlotKey設計中需要注意的一點(diǎn),由于消息從StoreQueue移動(dòng)到PrepareQueue是通過(guò)Lua腳本操作的,因此需要保證Lua腳本操作的Slot在同一個(gè)Redis節點(diǎn)上,如何保證PrepareQueue的SlotKey和對應的StoreQueue的SlotKey被hash到同一個(gè)Redis槽中呢。Redis的hash tag功能可以指定SlotKey中只有某一部分參與計算hash,這一部分采用{}包括,因此PrepareQueue的SlotKey中采用{}包括了StoreQueue的SlotKey。
DeadQueue 的設計
消息重試消費16次后,消息將進(jìn)入DeadQueue。DeadQueue的SlotKey設計為prepare{#{topic}#{index}},這里同樣采用hash tag功能保證DeadQueue的SlotKey與對應StoreQueue的SlotKey存儲在同一Redis節點(diǎn)。
生產(chǎn)者
生產(chǎn)者的任務(wù)就是將消息添加到Redis的Sorted Set中。首先,需要計算出消息添加到Redis的SlotKey,如果發(fā)送方指定了消息的slotBasis(否則采用content代替),則計算slotBasis的CRC32值,CRC32值對槽數量進(jìn)行取模得到槽序號,SlotKey設計為#{topic}_#{index},其中#{}表示占位符。然后,不同類(lèi)型的消息有不同的添加方式,因此分布講述三種類(lèi)型消息的添加過(guò)程。
區間重復合并消息
發(fā)送該消息時(shí)需要設置timeRange,timeRange必須大于0,單位為毫秒,表示消息將延遲timeRange毫秒后被消費,期間到來(lái)的重復消息將被合并,合并后的消息依然維持原來(lái)的消費時(shí)間。因此在存儲該類(lèi)型消息的時(shí)候,采用(當前時(shí)間戳+timeRange)作為分數,添加消息采用Lua腳本執行,保證操作的原子性,Lua腳本首先采用zscore命令檢查消息是否已經(jīng)存在,如果已經(jīng)存在則直接返回,如果不存在則執行zadd命令添加。
優(yōu)先級消息
發(fā)送該消息時(shí)需要設置priority,priority必須大于16,表示消息的優(yōu)先級,數值越大表示優(yōu)先級越高。因此在存儲該類(lèi)型消息的時(shí)候,采用priority作為分數,采用zadd命令直接添加。
任意定時(shí)消息
發(fā)送該類(lèi)型消息時(shí)需要設置fixedTime,fixedTime必須大于當前時(shí)間,表示消費時(shí)間戳,當前時(shí)間大于該消費時(shí)間戳的時(shí)候,消息才會(huì )被消費。因此在存儲該類(lèi)型消息的時(shí)候,采用fixedTime作為分數,采用命令zadd直接添加。
消費者
二階段消費方式
三種消費模式
一般消息隊列存在三種消費模式,分別是:最多消費一次、至少消費一次、只消費一次。最多消費一次模式消息可能丟失,一般不怎么使用。至少消費一次模式消息不會(huì )丟失,但是可能存在重復消費,比較常用。只消費一次模式消息被精確只消費一次,實(shí)現較困難,一般需要業(yè)務(wù)記錄冪等ID來(lái)實(shí)現。RMQ實(shí)現了至少消費一次的模式,那么如何保證消息至少被消費一次呢?
至少消費一次模式實(shí)現的難點(diǎn)
從最簡(jiǎn)單的消費模式——最多消費一次說(shuō)起,消費者端只需要從消息隊列服務(wù)中取出消息就行,即執行Redis的zpopmax命令,不倫消費者是否接收到該消息并成功消費,消息隊列服務(wù)都認為消息消費成功。最多一次消費模式導致消息丟失的因素可能有:網(wǎng)絡(luò )丟包導致消費者沒(méi)有接收到消息,消費者接收到消息但在消費的時(shí)候宕機了,消費者接收到消息但消費失敗。針對消費失敗導致消息丟失的情況比較好解決,只需要把消費失敗的消息重新放入消息隊列服務(wù)就行,但是網(wǎng)絡(luò )丟包和消費系統異常導致的消息丟失問(wèn)題不好解決。可能有人會(huì )想到,我們不把元素從有序集合中pop出來(lái),我們先查詢(xún)優(yōu)先級最高的元素,然后消費,再刪除消費成功的元素,但是這樣消息服務(wù)隊列就變成了同步阻塞隊列,性能會(huì )很差。
至少消費一次模式的實(shí)現
至少消費一次的問(wèn)題比較類(lèi)似銀行轉賬問(wèn)題,A向B賬戶(hù)轉賬100元,如何保障A賬戶(hù)扣減100同時(shí)B賬戶(hù)增加100,因此我們可以想到二階段提交的思想。第一個(gè)準備階段,A、B分別進(jìn)行資源凍結并持久化undo和redo日志,A、B分別告訴協(xié)調者已經(jīng)準備好;第二個(gè)提交階段,協(xié)調者告訴A、B進(jìn)行提交,A、B分別提交事務(wù)。RMQ基于二階段提交的思想來(lái)實(shí)現至少消費一次的模式。RMQ存儲設計中PrepareQueue的作用就是用來(lái)凍結資源并記錄事務(wù)日志,消費者端即是參與者也是協(xié)調者。第一個(gè)準備階段,消費者端通過(guò)執行Lua腳本從StoreQueue中Pop消息并存儲到PrepareQueue,同時(shí)消息傳輸到消費者端,消費者端消費該消息;第二個(gè)提交階段,消費者端根據消費結果是否成功協(xié)調消息隊列服務(wù)是提交還是回滾,如果消費成功則提交事務(wù),該消息從PrepareQueue中刪除,如果消費失敗則回滾事務(wù),消費者端將該消息從PrepareQueue移動(dòng)到StoreQueue,如果因為各種異常導致PrepareQueue中消息滯留超時(shí),超時(shí)后將自動(dòng)執行回滾操作。二階段消費的流程圖如下所示。

我們來(lái)分析下采用二階段消費方案可能存在的異常情況,從以下分析來(lái)看二階段消費方案可以保障消息至少被消費一次。
- 網(wǎng)絡(luò )丟包導致消費者沒(méi)有接收到消息,這時(shí)消息已經(jīng)記錄到PrepareQueue,如果到了超時(shí)時(shí)間,消息被回滾放回StoreQueue,等待下次被消費,消息不丟失。
- 消費者接收到了消息,但是消費者還沒(méi)來(lái)得及消費完成系統就宕機了,消息消費超時(shí)到了后,消息會(huì )被重新放入StoreQueue,等待下次被消費,消息不丟失。
- 消費者接收到了消息并消費成功,消費者端在協(xié)調事務(wù)提交的時(shí)候宕機了,消息消費超時(shí)到了后,消息會(huì )被重新放入StoreQueue,等待下次被消費,消息被重復消費。
- 消費者接收到了消息但消費失敗,消費者端在協(xié)調事務(wù)提交的時(shí)候宕機了,消息消費超時(shí)到了后,消息會(huì )被重新放入StoreQueue,等待下次被消費,消息不丟失。
- 消費者接收到了消息并消費成功,但是由于fullgc等原因使消費時(shí)間太長(cháng),PrepareQueue中的消息由于超時(shí)已經(jīng)回滾到StoreQueue,等待下次被消費,消息被重復消費。
重試次數控制的實(shí)現
采用二階段消費方式,需要將消息在StoreQueue和PrepareQueue之間移動(dòng),如何實(shí)現重試次數控制呢,其關(guān)鍵在StoreQueue和PrepareQueue的分數設計。PrepareQueue的分數需要與時(shí)間相關(guān),正常情況下,消費者不管消費失敗還是消費成功,都會(huì )從PrepareQueue刪除消息,當消費者系統發(fā)生異常或者宕機的時(shí)候,消息就無(wú)法從PrepareQueue中刪除,我們也不知道消費者是否消費成功,為保障消息至少被消費一次,我們需要做到超時(shí)回滾,因此分數需要與消費時(shí)間相關(guān)。當PrepareQueue中的消息發(fā)生超時(shí)的時(shí)候,將消息從PrepareQueue移動(dòng)到StoreQueue。因此PrepareQueue的分數設計為:秒級時(shí)間戳*1000+重試次數。不同類(lèi)型的消息首次存儲到StoreQueue中的分數表示的含義不盡相同,區間重復合并消息和任意定時(shí)消息存儲時(shí)的分數表示消費時(shí)間戳,優(yōu)先級消息存儲時(shí)的分數表示優(yōu)先級。如果消息消費失敗,消息從PrepareQueue回滾到StoreQueue,所有類(lèi)型的消息存儲時(shí)的分數都表示剩余重試次數,剩余重試次數從16次不斷降低最后為0,消息進(jìn)入死信隊列。消息在StoreQueue和PrepareQueue之間移動(dòng)流程如下:

Pop 消息
不同類(lèi)型的消息在消費的時(shí)候Pop消息的方式不一樣,因此接下來(lái)分別講述三種類(lèi)型消息的Pop方式。
區間重復合并消息
該消息存儲的分數設計為消費時(shí)間戳,當前時(shí)間大于消息的消費時(shí)間戳時(shí),該消息應該被消費。因此采用Redis命令ZRANGEBYSCORE彈出分數小于當前時(shí)間戳的一條消息。
優(yōu)先級消息
該消息存儲的分數設計為優(yōu)先級,優(yōu)先級越高分數越大,因此采用Redis命令ZPOPMAX彈出分數最大的一條消息。
任意定時(shí)消息該消息存儲的分數設計為消費時(shí)間戳,當前時(shí)間大于消息的消費時(shí)間戳時(shí),該消息應該被消費。因此采用Redis命令ZRANGEBYSCORE彈出分數小于當前時(shí)間戳的一條消息。
相關(guān)應用
主圖價(jià)格表達項目

在主圖價(jià)格表達中需要實(shí)現一個(gè)功能,商品價(jià)格發(fā)生變化時(shí)將商品價(jià)格打印在商品主圖上面,那么需要在價(jià)格發(fā)生變動(dòng)的時(shí)候觸發(fā)合成一張帶價(jià)格的圖片,每一次觸發(fā)合圖時(shí)計算價(jià)格都是獲取當前最新的價(jià)格。上游價(jià)格變化的因素很多,變化很頻繁,下游合圖消耗GPU資源較大,處理容量較低。因此需要盡可能合并觸發(fā)合圖消息,減輕下游處理壓力,于是使用了RMQ作為消息隊列來(lái)進(jìn)行削峰填谷、消息合并。不僅如此,還可以根據商家等級劃分觸發(fā)合圖消息的等級,使KA商家能夠優(yōu)先得到處理,縮短價(jià)格變化的延遲。
在線(xiàn)上實(shí)際環(huán)境中,集群共130臺機器,RMQ消息隊列的發(fā)送消息能力和消費消息能力均可以達到5w tps,而且這并不是峰值,理論上可以達到10w tps。
在線(xiàn)數據圈選引擎
在線(xiàn)數據圈選引擎需要處理各種來(lái)源的大量動(dòng)態(tài)數據,需要將一段時(shí)間區間內的消息合并處理,減少處理壓力,并且在對同一元數據進(jìn)行并發(fā)處理需要加分布式鎖,鎖沖突導致消費效率下降。RMQ的區間重復合并消息和并發(fā)消費控制能力可以幫助解決這些問(wèn)題。目前,在線(xiàn)數據圈選引擎已經(jīng)采用了RMQ消息隊列作為核心組件,RMQ消息隊列發(fā)揮了很大的作用。
總結
本文提出了一種可實(shí)現的基于Redis的消息隊列,充分利用Sorted Set結構設計了消息合并、優(yōu)先級、定時(shí)等特性,與傳統消息隊列形成互補,彌補傳統消息隊列這方面特性的缺失。為了實(shí)現高可用,本文在二階段提交的思想上進(jìn)行改進(jìn)設計了二階段消費方式,保障消息至少被消費一次。未來(lái)將基于Redis的特性打造更多獨特的功能,與傳統消息中間件形成互補。在消費控制方面會(huì )增加流量自動(dòng)調控能力,根據消息類(lèi)型調控消費速度,減少因為某種類(lèi)型消息消費瓶頸導致整體消費性能下降。