bl双性强迫侵犯h_国产在线观看人成激情视频_蜜芽188_被诱拐的少孩全彩啪啪漫画

如何使用Redis的streams

這篇文章主要介紹“如何使用redis的streams”,在日常操作中,相信很多人在如何使用Redis的streams問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”如何使用Redis的streams”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

目前創新互聯建站已為上千多家的企業提供了網站建設、域名、雅安服務器托管綿陽服務器托管、企業網站設計、洪洞網站維護等服務,公司將堅持客戶導向、應用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協力一起成長,共同發展。

起源

在 Redis 4.0 中引入模塊之后,用戶開始考慮他們自己怎么去修復這些問題。其中一個用戶 Timothy Downs 通過 IRC 和我說道:

\<forkfork> 我計劃給這個模塊增加一個事務日志式的數據類型 &mdash;&mdash; 這意味著大量的訂閱者可以在不導致 redis 內存激增的情況下做一些像發布/訂閱那樣的事情\<forkfork> 訂閱者持有他們在消息隊列中的位置,而不是讓 Redis 必須維護每個消費者的位置和為每個訂閱者復制消息

他的思路啟發了我。我想了幾天,并且意識到這可能是我們馬上同時解決上面所有問題的契機。我需要去重新構思 “日志”  的概念是什么。日志是個基本的編程元素,每個人都使用過它,因為它只是簡單地以追加模式打開一個文件,并以一定的格式寫入數據。然而 Redis  數據結構必須是抽象的。它們在內存中,并且我們使用內存并不是因為我們懶,而是因為使用一些指針,我們可以概念化數據結構并把它們抽象,以使它們擺脫明確的限制。例如,一般來說日志有幾個問題:偏移不是邏輯化的,而是真實的字節偏移,如果你想要與條目插入的時間相關的邏輯偏移應該怎么辦?我們有范圍查詢可用。同樣,日志通常很難進行垃圾回收:在一個只能進行追加操作的數據結構中怎么去刪除舊的元素?好吧,在我們理想的日志中,我們只需要說,我想要數字***的那個條目,而舊的元素一個也不要,等等。

當我從 Timothy 的想法中受到啟發,去嘗試著寫一個規范的時候,我使用了 Redis 集群中的 radix 樹去實現,優化了它內部的某些部分。這為實現一個有效利用空間的日志提供了基礎,而且仍然有可能在對數時間logarithmic time內訪問范圍。同時,我開始去讀關于 Kafka 的流相關的內容以獲得另外的靈感,它也非常適合我的設計,***借鑒了 Kafka消費組consumer groups的概念,并且再次針對  Redis 進行優化,以適用于 Redis  在內存中使用的情況。然而,該規范僅停留在紙面上,在一段時間后我幾乎把它從頭到尾重寫了一遍,以便將我與別人討論的所得到的許多建議一起增加到  Redis 升級中。我希望 Redis 流能成為對于時間序列有用的特性,而不僅是一個常見的事件和消息類的應用程序。

讓我們寫一些代碼吧

從 Redis 大會回來后,整個夏天我都在實現一個叫 listpack 的庫。這個庫是 ziplist.c 的繼任者,那是一個表示在單個分配中的字符串元素列表的數據結構。它是一個非常特殊的序列化格式,其特點在于也能夠以逆序(從右到左)解析:以便在各種用例中替代 ziplists。

結合 radix 樹和 listpacks 的特性,它可以很容易地去構建一個空間高效的日志,并且還是可索引的,這意味著允許通過 ID  和時間進行隨機訪問。自從這些就緒后,我開始去寫一些代碼以實現流數據結構。我還在完成這個實現,不管怎樣,現在在 Github 上的 Redis 的  streams 分支里它已經可以跑起來了。我并沒有聲稱那個 API 是 100%  的最終版本,但是,這有兩個有意思的事實:一,在那時只有消費群組是缺失的,加上一些不太重要的操作流的命令,但是,所有的大的方面都已經實現了。二,一旦各個方面比較穩定了之后,我決定大概用兩個月的時間將所有的流的特性向后移植backport到  4.0 分支。這意味著 Redis 用戶想要使用流,不用等待 Redis 4.2  發布,它們在生產環境馬上就可用了。這是可能的,因為作為一個新的數據結構,幾乎所有的代碼改變都出現在新的代碼里面。除了阻塞列表操作之外:該代碼被重構了,我們對于流和列表阻塞操作共享了相同的代碼,而極大地簡化了  Redis 內部實現。

教程:歡迎使用 Redis 的 streams

在某些方面,你可以認為流是 Redis 列表的一個增強版本。流元素不再是一個單一的字符串,而是一個字段fieldvalue組成的對象。范圍查詢更適用而且更快。在流中,每個條目都有一個 ID,它是一個邏輯偏移量。不同的客戶端可以阻塞等待blocking-wait比指定的 ID 更大的元素。Redis 流的一個基本的命令是 XADD。是的,所有的 Redis 流命令都是以一個 X 為前綴的。

> XADD mystream * sensor-id 1234 temperature 10.51506871964177.0

這個 XADD 命令將追加指定的條目作為一個指定的流 &mdash;&mdash; “mystream” 的新元素。上面示例中的這個條目有兩個字段:sensor-idtemperature,每個條目在同一個流中可以有不同的字段。使用相同的字段名可以更好地利用內存。有意思的是,字段的排序是可以保證順序的。XADD 僅返回插入的條目的 ID,因為在第三個參數中是星號(*),表示由命令自動生成 ID。通常這樣做就夠了,但是也可以去強制指定一個 ID,這種情況用于復制這個命令到服務器slave serverAOFappend-only file文件。

這個 ID 是由兩部分組成的:一個毫秒時間和一個序列號。1506871964177 是毫秒時間,它只是一個毫秒級的 UNIX 時間戳。圓點(.)后面的數字 0  是一個序號,它是為了區分相同毫秒數的條目增加上去的。這兩個數字都是 64  位的無符號整數。這意味著,我們可以在流中增加所有想要的條目,即使是在同一毫秒中。ID 的毫秒部分使用 Redis 服務器的當前本地時間生成的  ID 和流中的***一個條目 ID 兩者間的***的一個。因此,舉例來說,即使是計算機時間回跳,這個 ID  仍然是增加的。在某些情況下,你可以認為流條目的 ID 是完整的 128  位數字。然而,事實上它們與被添加到的實例的本地時間有關,這意味著我們可以在毫秒級的精度的范圍隨意查詢。

正如你想的那樣,快速添加兩個條目后,結果是僅一個序號遞增了。我們可以用一個 MULTI/EXEC 塊來簡單模擬“快速插入”:

> MULTIOK> XADD mystream * foo 10QUEUED> XADD mystream * bar 20QUEUED> EXEC1) 1506872463535.02) 1506872463535.1

在上面的示例中,也展示了無需指定任何初始模式schema的情況下,對不同的條目使用不同的字段。會發生什么呢?就像前面提到的一樣,只有每個塊(它通常包含  50-150  個消息內容)的***個消息被使用。并且,相同字段的連續條目都使用了一個標志進行了壓縮,這個標志表示與“它們與這個塊中的***個條目的字段相同”。因此,使用相同字段的連續消息可以節省許多內存,即使是字段集隨著時間發生緩慢變化的情況下也很節省內存。

為了從流中檢索數據,這里有兩種方法:范圍查詢,它是通過 XRANGE 命令實現的;流播streaming,它是通過 XREAD 命令實現的。XRANGE 命令僅取得包括從開始到停止范圍內的全部條目。因此,舉例來說,如果我知道它的 ID,我可以使用如下的命名取得單個條目:

> XRANGE mystream 1506871964177.0 1506871964177.01) 1) 1506871964177.0   2) 1) "sensor-id"      2) "1234"      3) "temperature"      4) "10.5"

不管怎樣,你都可以使用指定的開始符號 - 和停止符號 + 表示最小和***的 ID。為了限制返回條目的數量,也可以使用 COUNT 選項。下面是一個更復雜的 XRANGE 示例:

> XRANGE mystream - + COUNT 21) 1) 1506871964177.0   2) 1) "sensor-id"      2) "1234"      3) "temperature"      4) "10.5"2) 1) 1506872463535.0   2) 1) "foo"      2) "10"

這里我們講的是 ID 的范圍,然后,為了取得在一個給定時間范圍內的特定范圍的元素,你可以使用 XRANGE,因為 ID 的“序號” 部分可以省略。因此,你可以只指定“毫秒”時間即可,下面的命令的意思是:“從 UNIX 時間 1506872463 開始給我 10 個條目”:

127.0.0.1:6379> XRANGE mystream 1506872463000 + COUNT 101) 1) 1506872463535.0   2) 1) "foo"      2) "10"2) 1) 1506872463535.1   2) 1) "bar"      2) "20"

關于 XRANGE 需要注意的最重要的事情是,假設我們在回復中收到 ID,隨后連續的 ID 只是增加了序號部分,所以可以使用 XRANGE 遍歷整個流,接收每個調用的指定個數的元素。Redis 中的*SCAN 系列命令允許迭代 Redis 數據結構,盡管事實上它們不是為迭代設計的,但這樣可以避免再犯相同的錯誤。

使用 XREAD 處理流播:阻塞新的數據

當我們想通過 ID 或時間去訪問流中的一個范圍或者是通過 ID 去獲取單個元素時,使用 XRANGE 是非常***的。然而,在使用流的案例中,當數據到達時,它必須由不同的客戶端來消費時,這就不是一個很好的解決方案,這需要某種形式的匯聚池pooling。(對于 某些 應用程序來說,這可能是個好主意,因為它們僅是偶爾連接查詢的)

XREAD 命令是為讀取設計的,在同一個時間,從多個流中僅指定我們從該流中得到的***條目的  ID。此外,如果沒有數據可用,我們可以要求阻塞,當數據到達時,就解除阻塞。類似于阻塞列表操作產生的效果,但是這里并沒有消費從流中得到的數據,并且多個客戶端可以同時訪問同一份數據。

這里有一個典型的 XREAD 調用示例:

> XREAD BLOCK 5000 STREAMS mystream otherstream $ $

它的意思是:從 mystreamotherstream 取得數據。如果沒有數據可用,阻塞客戶端 5000 毫秒。在 STREAMS 選項之后指定我們想要監聽的關鍵字,***的是指定想要監聽的 ID,指定的 ID 為 $ 的意思是:假設我現在需要流中的所有元素,因此,只需要從下一個到達的元素開始給我。

如果我從另一個客戶端發送這樣的命令:

> XADD otherstream * message “Hi There”

XREAD 側會出現什么情況呢?

1) 1) "otherstream"   2) 1) 1) 1506935385635.0         2) 1) "message"            2) "Hi There"

與收到的數據一起,我們也得到了數據的關鍵字。在下次調用中,我們將使用接收到的***消息的 ID:

> XREAD BLOCK 5000 STREAMS mystream otherstream $ 1506935385635.0

依次類推。然而需要注意的是使用方式,客戶端有可能在一個非常大的延遲之后再次連接(因為它處理消息需要時間,或者其它什么原因)。在這種情況下,期間會有很多消息堆積,為了確保客戶端不被消息淹沒,以及服務器不會因為給單個客戶端提供大量消息而浪費太多的時間,使用 XREADCOUNT 選項是非常明智的。

流封頂

目前看起來還不錯&hellip;&hellip;然而,有些時候,流需要刪除一些舊的消息。幸運的是,這可以使用 XADD 命令的 MAXLEN 選項去做:

> XADD mystream MAXLEN 1000000 * field1 value1 field2 value2

它是基本意思是,如果在流中添加新元素后發現消息數量超過了 1000000 個,那么就刪除舊的消息,以便于元素總量重新回到 1000000 以內。它很像是在列表中使用的 RPUSH + LTRIM,但是,這次我們是使用了一個內置機制去完成的。然而,需要注意的是,上面的意思是每次我們增加一個新的消息時,我們還需要另外的工作去從流中刪除舊的消息。這將消耗一些 CPU 資源,所以在計算 MAXLEN 之前,盡可能使用 ~ 符號,以表明我們不要求非常 精確 的 1000000 個消息,就是稍微多一些也不是大問題:

> XADD mystream MAXLEN ~ 1000000 * foo bar

這種方式的 XADD 僅當它可以刪除整個節點的時候才會刪除消息。相比普通的 XADD,這種方式幾乎可以自由地對流進行封頂。

消費組(開發中)

這是***個 Redis 中尚未實現而在開發中的特性。靈感也是來自 Kafka,盡管在這里是以不同的方式實現的。重點是使用了 XREAD,客戶端也可以增加一個 GROUP <name> 選項。相同組的所有客戶端將自動得到 不同的 消息。當然,同一個流可以被多個組讀取。在這種情況下,所有的組將收到流中到達的消息的相同副本。但是,在每個組內,消息是不會重復的。

當指定組時,能夠指定一個 RETRY <milliseconds> 選項去擴展組:在這種情況下,如果消息沒有通過 XACK 進行確認,它將在指定的毫秒數后進行再次投遞。這將為消息投遞提供更佳的可靠性,這種情況下,客戶端沒有私有的方法將消息標記為已處理。這一部分也正在開發中。

內存使用和節省加載時間

因為用來建模 Redis 流的設計,內存使用率是非常低的。這取決于它們的字段、值的數量和長度,對于簡單的消息,每使用 100MB  內存可以有幾百萬條消息。此外,該格式設想為需要極少的序列化:listpack 塊以 radix  樹節點方式存儲,在磁盤上和內存中都以相同方式表示的,因此它們可以很輕松地存儲和讀取。例如,Redis 可以在 0.3 秒內從 RDB 文件中讀取  500 萬個條目。這使流的復制和持久存儲非常高效。

我還計劃允許從條目中間進行部分刪除。現在僅實現了一部分,策略是在條目在標記中標識條目為已刪除,并且,當已刪除條目占全部條目的比例達到指定值時,這個塊將被回收重寫,如果需要,它將被連到相鄰的另一個塊上,以避免碎片化。

到此,關于“如何使用Redis的streams”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注創新互聯網站,小編會繼續努力為大家帶來更多實用的文章!

當前題目:如何使用Redis的streams
標題URL:http://vcdvsql.cn/article46/jhjjhg.html

成都網站建設公司_創新互聯,為您提供服務器托管商城網站動態網站網站策劃微信小程序企業網站制作

廣告

聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯

微信小程序開發