這篇文章主要介紹“如何使用redis的streams”,在日常操作中,相信很多人在如何使用Redis的streams問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”如何使用Redis的streams”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
目前創新互聯建站已為上千多家的企業提供了網站建設、域名、雅安服務器托管、綿陽服務器托管、企業網站設計、洪洞網站維護等服務,公司將堅持客戶導向、應用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協力一起成長,共同發展。
在 Redis 4.0 中引入模塊之后,用戶開始考慮他們自己怎么去修復這些問題。其中一個用戶 Timothy Downs 通過 IRC 和我說道:
\<forkfork> 我計劃給這個模塊增加一個事務日志式的數據類型 —— 這意味著大量的訂閱者可以在不導致 redis 內存激增的情況下做一些像發布/訂閱那樣的事情\<forkfork> 訂閱者持有他們在消息隊列中的位置,而不是讓 Redis 必須維護每個消費者的位置和為每個訂閱者復制消息
他的思路啟發了我。我想了幾天,并且意識到這可能是我們馬上同時解決上面所有問題的契機。我需要去重新構思 “日志” 的概念是什么。日志是個基本的編程元素,每個人都使用過它,因為它只是簡單地以追加模式打開一個文件,并以一定的格式寫入數據。然而 Redis 數據結構必須是抽象的。它們在內存中,并且我們使用內存并不是因為我們懶,而是因為使用一些指針,我們可以概念化數據結構并把它們抽象,以使它們擺脫明確的限制。例如,一般來說日志有幾個問題:偏移不是邏輯化的,而是真實的字節偏移,如果你想要與條目插入的時間相關的邏輯偏移應該怎么辦?我們有范圍查詢可用。同樣,日志通常很難進行垃圾回收:在一個只能進行追加操作的數據結構中怎么去刪除舊的元素?好吧,在我們理想的日志中,我們只需要說,我想要數字***的那個條目,而舊的元素一個也不要,等等。
當我從 Timothy 的想法中受到啟發,去嘗試著寫一個規范的時候,我使用了 Redis 集群中的 radix 樹去實現,優化了它內部的某些部分。這為實現一個有效利用空間的日志提供了基礎,而且仍然有可能在對數時間內訪問范圍。同時,我開始去讀關于 Kafka 的流相關的內容以獲得另外的靈感,它也非常適合我的設計,***借鑒了 Kafka消費組的概念,并且再次針對 Redis 進行優化,以適用于 Redis 在內存中使用的情況。然而,該規范僅停留在紙面上,在一段時間后我幾乎把它從頭到尾重寫了一遍,以便將我與別人討論的所得到的許多建議一起增加到 Redis 升級中。我希望 Redis 流能成為對于時間序列有用的特性,而不僅是一個常見的事件和消息類的應用程序。
從 Redis 大會回來后,整個夏天我都在實現一個叫 listpack 的庫。這個庫是 ziplist.c
的繼任者,那是一個表示在單個分配中的字符串元素列表的數據結構。它是一個非常特殊的序列化格式,其特點在于也能夠以逆序(從右到左)解析:以便在各種用例中替代 ziplists。
結合 radix 樹和 listpacks 的特性,它可以很容易地去構建一個空間高效的日志,并且還是可索引的,這意味著允許通過 ID 和時間進行隨機訪問。自從這些就緒后,我開始去寫一些代碼以實現流數據結構。我還在完成這個實現,不管怎樣,現在在 Github 上的 Redis 的 streams 分支里它已經可以跑起來了。我并沒有聲稱那個 API 是 100% 的最終版本,但是,這有兩個有意思的事實:一,在那時只有消費群組是缺失的,加上一些不太重要的操作流的命令,但是,所有的大的方面都已經實現了。二,一旦各個方面比較穩定了之后,我決定大概用兩個月的時間將所有的流的特性向后移植到 4.0 分支。這意味著 Redis 用戶想要使用流,不用等待 Redis 4.2 發布,它們在生產環境馬上就可用了。這是可能的,因為作為一個新的數據結構,幾乎所有的代碼改變都出現在新的代碼里面。除了阻塞列表操作之外:該代碼被重構了,我們對于流和列表阻塞操作共享了相同的代碼,而極大地簡化了 Redis 內部實現。
在某些方面,你可以認為流是 Redis 列表的一個增強版本。流元素不再是一個單一的字符串,而是一個字段和值組成的對象。范圍查詢更適用而且更快。在流中,每個條目都有一個 ID,它是一個邏輯偏移量。不同的客戶端可以阻塞等待比指定的 ID 更大的元素。Redis 流的一個基本的命令是 XADD
。是的,所有的 Redis 流命令都是以一個 X
為前綴的。
> XADD mystream * sensor-id 1234 temperature 10.51506871964177.0
這個 XADD
命令將追加指定的條目作為一個指定的流 —— “mystream” 的新元素。上面示例中的這個條目有兩個字段:sensor-id
和 temperature
,每個條目在同一個流中可以有不同的字段。使用相同的字段名可以更好地利用內存。有意思的是,字段的排序是可以保證順序的。XADD
僅返回插入的條目的 ID,因為在第三個參數中是星號(*
),表示由命令自動生成 ID。通常這樣做就夠了,但是也可以去強制指定一個 ID,這種情況用于復制這個命令到從服務器和AOF文件。
這個 ID 是由兩部分組成的:一個毫秒時間和一個序列號。1506871964177
是毫秒時間,它只是一個毫秒級的 UNIX 時間戳。圓點(.
)后面的數字 0
是一個序號,它是為了區分相同毫秒數的條目增加上去的。這兩個數字都是 64 位的無符號整數。這意味著,我們可以在流中增加所有想要的條目,即使是在同一毫秒中。ID 的毫秒部分使用 Redis 服務器的當前本地時間生成的 ID 和流中的***一個條目 ID 兩者間的***的一個。因此,舉例來說,即使是計算機時間回跳,這個 ID 仍然是增加的。在某些情況下,你可以認為流條目的 ID 是完整的 128 位數字。然而,事實上它們與被添加到的實例的本地時間有關,這意味著我們可以在毫秒級的精度的范圍隨意查詢。
正如你想的那樣,快速添加兩個條目后,結果是僅一個序號遞增了。我們可以用一個 MULTI
/EXEC
塊來簡單模擬“快速插入”:
> MULTIOK> XADD mystream * foo 10QUEUED> XADD mystream * bar 20QUEUED> EXEC1) 1506872463535.02) 1506872463535.1
在上面的示例中,也展示了無需指定任何初始模式的情況下,對不同的條目使用不同的字段。會發生什么呢?就像前面提到的一樣,只有每個塊(它通常包含 50-150 個消息內容)的***個消息被使用。并且,相同字段的連續條目都使用了一個標志進行了壓縮,這個標志表示與“它們與這個塊中的***個條目的字段相同”。因此,使用相同字段的連續消息可以節省許多內存,即使是字段集隨著時間發生緩慢變化的情況下也很節省內存。
為了從流中檢索數據,這里有兩種方法:范圍查詢,它是通過 XRANGE
命令實現的;流播,它是通過 XREAD
命令實現的。XRANGE
命令僅取得包括從開始到停止范圍內的全部條目。因此,舉例來說,如果我知道它的 ID,我可以使用如下的命名取得單個條目:
> XRANGE mystream 1506871964177.0 1506871964177.01) 1) 1506871964177.0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "10.5"
不管怎樣,你都可以使用指定的開始符號 -
和停止符號 +
表示最小和***的 ID。為了限制返回條目的數量,也可以使用 COUNT
選項。下面是一個更復雜的 XRANGE
示例:
> XRANGE mystream - + COUNT 21) 1) 1506871964177.0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "10.5"2) 1) 1506872463535.0 2) 1) "foo" 2) "10"
這里我們講的是 ID 的范圍,然后,為了取得在一個給定時間范圍內的特定范圍的元素,你可以使用 XRANGE
,因為 ID 的“序號” 部分可以省略。因此,你可以只指定“毫秒”時間即可,下面的命令的意思是:“從 UNIX 時間 1506872463 開始給我 10 個條目”:
127.0.0.1:6379> XRANGE mystream 1506872463000 + COUNT 101) 1) 1506872463535.0 2) 1) "foo" 2) "10"2) 1) 1506872463535.1 2) 1) "bar" 2) "20"
關于 XRANGE
需要注意的最重要的事情是,假設我們在回復中收到 ID,隨后連續的 ID 只是增加了序號部分,所以可以使用 XRANGE
遍歷整個流,接收每個調用的指定個數的元素。Redis 中的*SCAN
系列命令允許迭代 Redis 數據結構,盡管事實上它們不是為迭代設計的,但這樣可以避免再犯相同的錯誤。
當我們想通過 ID 或時間去訪問流中的一個范圍或者是通過 ID 去獲取單個元素時,使用 XRANGE
是非常***的。然而,在使用流的案例中,當數據到達時,它必須由不同的客戶端來消費時,這就不是一個很好的解決方案,這需要某種形式的匯聚池。(對于 某些 應用程序來說,這可能是個好主意,因為它們僅是偶爾連接查詢的)
XREAD
命令是為讀取設計的,在同一個時間,從多個流中僅指定我們從該流中得到的***條目的 ID。此外,如果沒有數據可用,我們可以要求阻塞,當數據到達時,就解除阻塞。類似于阻塞列表操作產生的效果,但是這里并沒有消費從流中得到的數據,并且多個客戶端可以同時訪問同一份數據。
這里有一個典型的 XREAD
調用示例:
> XREAD BLOCK 5000 STREAMS mystream otherstream $ $
它的意思是:從 mystream
和 otherstream
取得數據。如果沒有數據可用,阻塞客戶端 5000 毫秒。在 STREAMS
選項之后指定我們想要監聽的關鍵字,***的是指定想要監聽的 ID,指定的 ID 為 $
的意思是:假設我現在需要流中的所有元素,因此,只需要從下一個到達的元素開始給我。
如果我從另一個客戶端發送這樣的命令:
> XADD otherstream * message “Hi There”
在 XREAD
側會出現什么情況呢?
1) 1) "otherstream" 2) 1) 1) 1506935385635.0 2) 1) "message" 2) "Hi There"
與收到的數據一起,我們也得到了數據的關鍵字。在下次調用中,我們將使用接收到的***消息的 ID:
> XREAD BLOCK 5000 STREAMS mystream otherstream $ 1506935385635.0
依次類推。然而需要注意的是使用方式,客戶端有可能在一個非常大的延遲之后再次連接(因為它處理消息需要時間,或者其它什么原因)。在這種情況下,期間會有很多消息堆積,為了確保客戶端不被消息淹沒,以及服務器不會因為給單個客戶端提供大量消息而浪費太多的時間,使用 XREAD
的 COUNT
選項是非常明智的。
目前看起來還不錯……然而,有些時候,流需要刪除一些舊的消息。幸運的是,這可以使用 XADD
命令的 MAXLEN
選項去做:
> XADD mystream MAXLEN 1000000 * field1 value1 field2 value2
它是基本意思是,如果在流中添加新元素后發現消息數量超過了 1000000
個,那么就刪除舊的消息,以便于元素總量重新回到 1000000
以內。它很像是在列表中使用的 RPUSH
+ LTRIM
,但是,這次我們是使用了一個內置機制去完成的。然而,需要注意的是,上面的意思是每次我們增加一個新的消息時,我們還需要另外的工作去從流中刪除舊的消息。這將消耗一些 CPU 資源,所以在計算 MAXLEN
之前,盡可能使用 ~
符號,以表明我們不要求非常 精確 的 1000000 個消息,就是稍微多一些也不是大問題:
> XADD mystream MAXLEN ~ 1000000 * foo bar
這種方式的 XADD 僅當它可以刪除整個節點的時候才會刪除消息。相比普通的 XADD
,這種方式幾乎可以自由地對流進行封頂。
這是***個 Redis 中尚未實現而在開發中的特性。靈感也是來自 Kafka,盡管在這里是以不同的方式實現的。重點是使用了 XREAD
,客戶端也可以增加一個 GROUP <name>
選項。相同組的所有客戶端將自動得到 不同的 消息。當然,同一個流可以被多個組讀取。在這種情況下,所有的組將收到流中到達的消息的相同副本。但是,在每個組內,消息是不會重復的。
當指定組時,能夠指定一個 RETRY <milliseconds>
選項去擴展組:在這種情況下,如果消息沒有通過 XACK
進行確認,它將在指定的毫秒數后進行再次投遞。這將為消息投遞提供更佳的可靠性,這種情況下,客戶端沒有私有的方法將消息標記為已處理。這一部分也正在開發中。
因為用來建模 Redis 流的設計,內存使用率是非常低的。這取決于它們的字段、值的數量和長度,對于簡單的消息,每使用 100MB 內存可以有幾百萬條消息。此外,該格式設想為需要極少的序列化:listpack 塊以 radix 樹節點方式存儲,在磁盤上和內存中都以相同方式表示的,因此它們可以很輕松地存儲和讀取。例如,Redis 可以在 0.3 秒內從 RDB 文件中讀取 500 萬個條目。這使流的復制和持久存儲非常高效。
我還計劃允許從條目中間進行部分刪除。現在僅實現了一部分,策略是在條目在標記中標識條目為已刪除,并且,當已刪除條目占全部條目的比例達到指定值時,這個塊將被回收重寫,如果需要,它將被連到相鄰的另一個塊上,以避免碎片化。
到此,關于“如何使用Redis的streams”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注創新互聯網站,小編會繼續努力為大家帶來更多實用的文章!
當前題目:如何使用Redis的streams
標題URL:http://vcdvsql.cn/article46/jhjjhg.html
成都網站建設公司_創新互聯,為您提供服務器托管、商城網站、動態網站、網站策劃、微信小程序、企業網站制作
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯