bl双性强迫侵犯h_国产在线观看人成激情视频_蜜芽188_被诱拐的少孩全彩啪啪漫画

什么是go-stash組件

本篇內容主要講解“什么是go-stash組件”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“什么是go-stash組件”吧!

創新互聯公司:于2013年開始為各行業開拓出企業自己的“網站建設”服務,為上1000家公司企業提供了專業的成都網站制作、做網站、網頁設計和網站推廣服務, 按需求定制網站由設計師親自精心設計,設計的效果完全按照客戶的要求,并適當的提出合理的建議,擁有的視覺效果,策劃師分析客戶的同行競爭對手,根據客戶的實際情況給出合理的網站構架,制作客戶同行業具有領先地位的。

前言

今天來介紹 go-zero 生態的另一個組件 go-stash。這是一個 logstash 的 Go 語言替代版,我們用 go-stash 相比原先的 logstash 節省了2/3的服務器資源。如果你在用 logstash,不妨試試,也可以看看基于 go-zero 實現這樣的工具是多么的容易,這個工具作者僅用了兩天時間。

整體架構

先從它的配置中,我們來看看設計架構。

Clusters:
  - Input:
      Kafka:
        # Kafka 配置 --> 聯動 go-queue
    Filters:
    	# filter action
      - Action: drop            
      - Action: remove_field
      - Action: transfer      
    Output:
      ElasticSearch:
        # es 配置 {host, index}

看配置名:kafka 是數據輸出端,es 是數據輸入端,filter 抽象了數據處理過程。

對,整個 go-stash 就是如 config 配置中顯示的,所見即所得。

什么是go-stash組件

啟動

stash.go 的啟動流程大致分為幾個部分。因為可以配置多個 cluster,那從一個 cluster 分析:

  1. 建立與 es 的連接【傳入 es 配置】

  2. 構建 filter processorses 前置處理器,做數據過濾以及處理,可以設置多個】

  3. 完善對 es 中 索引配置,啟動 handle ,同時將 filter 加入handle【處理輸入輸出】

  4. 連接下游的 kafka,將上面創建的 handle 傳入,完成 kafkaes 之間的數據消費和數據寫入

MessageHandler

在上面架構圖中,中間的 filter 只是從 config 中看到,其實更詳細是 MessageHandler 的一部分,做數據過濾和轉換,下面來說說這塊。

> 以下代碼:https://github.com/tal-tech/go-stash/tree/master/stash/handler/handler.go

type MessageHandler struct {
	writer  *es.Writer
	indexer *es.Index
	filters []filter.FilterFunc
}

這個就對應上面說的,filter 只是其中一部分,在結構上 MessageHandler 是對接下游 es ,但是沒有看到對 kafka 的操作。

別急,從接口設計上 MessageHandler 實現了 go-queueConsumeHandler 接口。

這里,上下游就串聯了:

  1. MessageHandler 接管了 es 的操作,負責數據處理到數據寫入

  2. 對上實現了 kafkaConsume 操作。這樣在消費過程中執行 handler 的操作,從而寫入 es

實際上,Consume() 也是這么處理的:

func (mh *MessageHandler) Consume(_, val string) error {
	var m map[string]interface{}
  // 反序列化從 kafka 中的消息
	if err := jsoniter.Unmarshal([]byte(val), &m); err != nil {
		return err
	}
	// es 寫入index配置
	index := mh.indexer.GetIndex(m)
  // filter 鏈式處理【因為沒有泛型,整個處理都是 `map進map出`】
	for _, proc := range mh.filters {
		if m = proc(m); m == nil {
			return nil
		}
	}
	bs, err := jsoniter.Marshal(m)
	if err != nil {
		return err
	}
	// es 寫入
	return mh.writer.Write(index, string(bs))
}

數據流

說完了數據處理,以及上下游的連接點。但是數據要從 kafka -> es ,數據流出這個動作從 kafka 角度看,應該是由開發者主動 pull data from kafka。

那么數據流是怎么動起來?我們回到主程序 https://github.com/tal-tech/go-stash/blob/master/stash/stash.go

其實 啟動 整個流程中,其實就是一個組合模式:

func main() {
	// 解析命令行參數,啟動優雅退出
	...
  // service 組合模式
	group := service.NewServiceGroup()
	defer group.Stop()

	for _, processor := range c.Clusters {
		// 連接es
    ...
		// filter processors 構建
    ...
    // 準備es的寫入操作 {寫入的index, 寫入器writer}
		handle := handler.NewHandler(writer, indexer)
		handle.AddFilters(filters...)
		handle.AddFilters(filter.AddUriFieldFilter("url", "uri"))
    // 按照配置啟動kafka,并將消費操作傳入,同時加入組合器
		for _, k := range toKqConf(processor.Input.Kafka) {
			group.Add(kq.MustNewQueue(k, handle))
		}
	}
	// 啟動這個組合器
	group.Start()
}

整個數據流,就和這個 group 組合器有關了。

group.Start()
	|- group.doStart()
		|- [service.Start() for service in group.services]

那么說明加入 groupservice 都是實現 Start()。也就是說 kafka 端的啟動邏輯在 Start()

func (q *kafkaQueue) Start() {
	q.startConsumers()
	q.startProducers()

	q.producerRoutines.Wait()
	close(q.channel)
	q.consumerRoutines.Wait()
}
  1. 啟動 kafka 消費程序

  2. 啟動 kafka 消費拉取端【可能會被名字迷惑,實際上是從 kafka 拉取消息到 q.channel

  3. 消費程序終止,收尾工作

而我們傳入 kafka 中的 handler,上文說過其實是 Consume,而這個方法就是在 q.startConsumers() 中執行的:

q.startConsumers()
	|- [q.consumeOne(key, value) for msg in q.channel]
		|- q.handler.Consume(key, value)

這樣整個數據流就徹底串起來了:

什么是go-stash組件

到此,相信大家對“什么是go-stash組件”有了更深的了解,不妨來實際操作一番吧!這里是創新互聯網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

新聞標題:什么是go-stash組件
標題鏈接:http://vcdvsql.cn/article8/pegjip.html

成都網站建設公司_創新互聯,為您提供做網站網站設計定制開發、品牌網站制作、面包屑導航、品牌網站建設

廣告

聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯

外貿網站制作