消息隊列,既然是隊列就能保證消息在進入隊列,以及出隊列的時候保證消息的有序性,顯然這是在消息的生產端(Producer),但是往往在生產環境中有多個消息的消費端(Consumer),盡管消費端在拉取消息時是有序的,但各個消息由于網絡等方面原因無法保證在各個消費端中處理時有序。
創新互聯公司是一家專業提供伽師企業網站建設,專注與網站建設、成都網站制作、HTML5、小程序制作等業務。10年已為伽師眾多企業、政府機構等服務。創新互聯專業網站制作公司優惠進行中。先后兩次修改了商品信息,消息A和消息B先后同步寫入MySQL,接著異步寫入消息隊列中發送消息,此時消息隊列生產端(Producer)按時序先后發出了A和B兩條消息(消息A先發出,消息B后發出)。按業務邏輯,商品信息的最終狀態需要以消息A和消息B綜合為準。
看似一個比較常見的同步寫數據庫,異步發送消息的場景,但實際上需要保證消息的有序消費。
可見,你無法保證消息中包含什么信息,此時必須保證消息的有序消費。
下面通過偽代碼的方式描述:
生產端偽代碼
insertWare(ware); #插入數據到數據庫,通常在插入數據庫時我們只會update修改的字段,而不會全量插入
ware = selectWareById(ware.getId); #獲取商品的全量信息(此時是最新的),用于將它放入到消息隊列中
syncMq(ware); #異步發送mq消息A
消費端偽代碼
ware = fetchWare(); #獲取消息
if (isLasted(ware)) #通過商品的修改時間戳判斷是否是最新的修改
? TODO #執行下一步業務邏輯
else
? return #丟棄該消息
重點在于消費端如何判斷該消息是否是最新的修改也就是isLasted
方法。
isLasted方法
Long modified = getCacheById(ware.getId); #獲取緩存中該條商品的最新修改時間
If (ware.getModified > modified) { #如果消息中商品修改時間大于緩存中的時間,說明是最新操作
? setCacheById(ware); #將該條消息的商品修改時間戳寫入到緩存中
? return true;
} else #如果消息中的商品修改時間小于緩存中的時間,說明該條消息屬于“歷史操作”,不對其更新? return false;
以上就是通過偽代碼的方式,描述如何通過業務手段保證消息有序消費,重點在于全量發送信息和緩存時間戳。在其中還有一些技術實現細節。
例如:消費端消費消息B,執行到獲取時間戳緩存之后,并在重新設置新的緩存之前,此時另一個消費端恰好也正在消費B它也正執行到獲取時間戳緩存,由于消息A此時并沒有更新緩存,消息A拿到的緩存仍然是舊的緩存,這時就會存在兩個消費端都認為自己所消費的消息時最新的,造成該丟棄的消息沒丟。
顯然,這是分布式線程安全問題,分布式鎖通常使用Redis或者ZooKeeper,加鎖后的執行時序如下圖所示。
這是從業務角度保證消息在消費端有序消費。通過在消息發送端全量發送消息以及在消息消費端緩存時間戳就可以保證消息的有序消費。
在上述場景中是先同步寫入MySQL,再獲取商品全量數據,接著再異步發送消息。這一系列的步驟可以通過接MySQL的binlog實現,在同步寫入MySQL后,MySQL發送binlog變更,通過阿里巴巴Canal中間件接收MySQL的binlog變更再發送消息到消息隊列。
創新互聯www.cdcxhl.cn,專業提供香港、美國云服務器,動態BGP最優骨干路由自動選擇,持續穩定高效的網絡助力業務部署。公司持有工信部辦法的idc、isp許可證, 機房獨有T級流量清洗系統配攻擊溯源,準確進行流量調度,確保服務器高可用性。佳節活動現已開啟,新人活動云服務器買多久送多久。
網站名稱:消費端如何保證消息隊列MQ的有序消費-創新互聯
分享地址:http://vcdvsql.cn/article6/ccesig.html
成都網站建設公司_創新互聯,為您提供定制開發、電子商務、微信小程序、動態網站、軟件開發、外貿網站建設
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯