在Malwarebytes 我們經歷了顯著的增長,自從我一年前加入了硅谷的公司,一個主要的職責成了設計架構和開發一些系統來支持一個快速增長的信息安全公司和所有需要的設施來支持一個每天百萬用戶使用的產品。我在反病毒和反惡意軟件行業的不同公司工作了12年,從而我知道由于我們每天處理大量的數據,這些系統是多么復雜。
成都創新互聯公司網站建設公司是一家服務多年做網站建設策劃設計制作的公司,為廣大用戶提供了成都網站設計、網站建設、外貿網站建設,成都網站設計,廣告投放平臺,成都做網站選成都創新互聯公司,貼合企業需求,高性價比,滿足客戶不同層次的需求一站式服務歡迎致電。
有趣的是,在過去的大約9年間,我參與的所有的web后端的開發通常是通過Ruby on Rails技術實現的。不要錯怪我。我喜歡Ruby on Rails,并且我相信它是個令人驚訝的環境。但是一段時間后,你會開始以ruby的方式開始思考和設計系統,你會忘記,如果你可以利用多線程、并行、快速執行和小內存開銷,軟件架構本來應該是多么高效和簡單。很多年期間,我是一個c/c++、Delphi和c#開發者,我剛開始意識到使用正確的工具可以把復雜的事情變得簡單些。
作為首席架構師,我不會很關心在互聯網上的語言和框架戰爭。我相信效率、生產力。代碼可維護性主要依賴于你如何把解決方案設計得很簡單。
問題
當工作在我們的匿名遙測和分析系統中,我們的目標是可以處理來自于百萬級別的終端的大量的POST請求。web處理服務可以接收包含了很多payload的集合的JSON數據,這些數據需要寫入Amazon S3中。接下來,map-reduce系統可以操作這些數據。
按照習慣,我們會調研服務層級架構,涉及的軟件如下:
Sidekiq
Resque
DelayedJob
Elasticbeanstalk Worker Tier
RabbitMQ
and so on…
搭建了2個不同的集群,一個提供web前端,另外一個提供后端處理,這樣我們可以橫向擴展后端服務的數量。
但是,從剛開始,在 討論階段我們的團隊就知道我們應該使用Go,因為我們看到這會潛在性地成為一個非常龐大( large traffic)的系統。我已經使用了Go語言大約2年時間,我們開發了幾個系統,但是很少會達到這樣的負載(amount of load)。
我們開始創建一些結構,定義從POST調用得到的web請求負載,還有一個上傳到S3 budket的函數。
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}
type Payload struct {
// [redacted]
}
func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())
bucket := S3Bucket
b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}
// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"
return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
本地Go routines方法
剛開始,我們采用了一個非常本地化的POST處理實現,僅僅嘗試把發到簡單go routine的job并行化:
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
go payload.UploadToS3() // ----- DON'T DO THIS
}
w.WriteHeader(http.StatusOK)
}
對于中小負載,這會對大多數的人適用,但是大規模下,這個方案會很快被證明不是很好用。我們期望的請求數,不在我們剛開始計劃的數量級,當我們把第一個版本部署到生產環境上。我們完全低估了流量。
上面的方案在很多地方很不好。沒有辦法控制我們產生的go routine的數量。由于我們收到了每分鐘1百萬的POST請求,這段代碼很快就崩潰了。
再次嘗試
我們需要找一個不同的方式。自開始我們就討論過, 我們需要保持請求處理程序的生命周期很短,并且進程在后臺產生。當然,這是你在Ruby on Rails的世界里必須要做的事情,否則你會阻塞在所有可用的工作 web處理器上,不管你是使用puma、unicore還是passenger(我們不要討論JRuby這個話題)。然后我們需要利用常用的處理方案來做這些,比如Resque、 Sidekiq、 SQS等。這個列表會繼續保留,因為有很多的方案可以實現這些。
所以,第二次迭代,我們創建了一個緩沖channel,我們可以把job排隊,然后把它們上傳到S3。因為我們可以控制我們隊列中的item最大值,我們有大量的內存來排列job,我們認為只要把job在channel里面緩沖就可以了。
var Queue chan Payload
func init() {
Queue = make(chan Payload, MAX_QUEUE)
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
Queue - payload
}
...
}
接下來,我們再從隊列中取job,然后處理它們。我們使用類似于下面的代碼:
func StartProcessor() {
for {
select {
case job := -Queue:
job.payload.UploadToS3() // -- STILL NOT GOOD
}
}
}
說實話,我不知道我們在想什么。這肯定是一個滿是Red-Bulls的夜晚。這個方法不會帶來什么改善,我們用了一個 有缺陷的緩沖隊列并發,僅僅是把問題推遲了。我們的同步處理器同時僅僅會上傳一個數據到S3,因為來到的請求遠遠大于單核處理器上傳到S3的能力,我們的帶緩沖channel很快達到了它的極限,然后阻塞了請求處理邏輯的queue更多item的能力。
我們僅僅避免了問題,同時開始了我們的系統掛掉的倒計時。當部署了這個有缺陷的版本后,我們的延時保持在每分鐘以常量增長。
最好的解決方案
我們討論過在使用用Go channel時利用一種常用的模式,來創建一個二級channel系統,一個來queue job,另外一個來控制使用多少個worker來并發操作JobQueue。
想法是,以一個恒定速率并行上傳到S3,既不會導致機器崩潰也不好產生S3的連接錯誤。這樣我們選擇了創建一個Job/Worker模式。對于那些熟悉Java、C#等語言的開發者,可以把這種模式想象成利用channel以golang的方式來實現了一個worker線程池,作為一種替代。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
// Job represents the job to be run
type Job struct {
Payload Payload
}
// A buffered channel that we can send work requests on.
var JobQueue chan Job
// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}
// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool - w.JobChannel
select {
case job := -w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}
case -w.quit:
// we have received a signal to stop
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit - true
}()
}
我們已經修改了我們的web請求handler,用payload創建一個Job實例,然后發到JobQueue channel,以便于worker來獲取。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
// let's create a job with the payload
work := Job{Payload: payload}
// Push the work onto the queue.
JobQueue - work
}
w.WriteHeader(http.StatusOK)
}
在web server初始化時,我們創建一個Dispatcher,然后調用Run()函數創建一個worker池子,然后開始監聽JobQueue中的job。
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
下面是dispatcher的實現代碼:
type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i d.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := -JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := -d.WorkerPool
// dispatch the job to the worker job channel
jobChannel - job
}(job)
}
}
}
注意到,我們提供了初始化并加入到池子的worker的最大數量。因為這個工程我們利用了Amazon Elasticbeanstalk帶有的docker化的Go環境,所以我們常常會遵守12-factor方法論來配置我們的生成環境中的系統,我們從環境變了讀取這些值。這種方式,我們控制worker的數量和JobQueue的大小,所以我們可以很快的改變這些值,而不需要重新部署集群。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
直接結果
我們部署了之后,立馬看到了延時降到微乎其微的數值,并未我們處理請求的能力提升很大。
Elastic Load Balancers完全啟動后,我們看到ElasticBeanstalk 應用服務于每分鐘1百萬請求。通常情況下在上午時間有幾個小時,流量峰值超過每分鐘一百萬次。
我們一旦部署了新的代碼,服務器的數量從100臺大幅 下降到大約20臺。
我們合理配置了我們的集群和自動均衡配置之后,我們可以把服務器的數量降至4x EC2 c4.Large實例,并且Elastic Auto-Scaling設置為如果CPU達到5分鐘的90%利用率,我們就會產生新的實例。
總結
在我的書中,簡單總是獲勝。我們可以使用多隊列、后臺worker、復雜的部署設計一個復雜的系統,但是我們決定利用Elasticbeanstalk 的auto-scaling的能力和Go語言開箱即用的特性簡化并發。
我們僅僅用了4臺機器,這并不是什么新鮮事了。可能它們還不如我的MacBook能力強大,但是卻處理了每分鐘1百萬的寫入到S3的請求。
處理問題有正確的工具。當你的 Ruby on Rails 系統需要更強大的web handler時,可以考慮下ruby生態系統之外的技術,或許可以得到更簡單但更強大的替代方案。
GO是編譯性語言,所以函數的順序是無關緊要的,為了方便閱讀,建議入口函數 main 寫在最前面,其余函數按照功能需要進行排列
GO的函數 不支持嵌套,重載和默認參數
GO的函數 支持 無需聲明變量,可變長度,多返回值,匿名,閉包等
GO的函數用 func 來聲明,且左大括號 { 不能另起一行
一個簡單的示例:
輸出為:
參數:可以傳0個或多個值來供自己用
返回:通過用 return 來進行返回
輸出為:
上面就是一個典型的多參數傳遞與多返回值
對例子的說明:
按值傳遞:是對某個變量進行復制,不能更改原變量的值
引用傳遞:相當于按指針傳遞,可以同時改變原來的值,并且消耗的內存會更少,只有4或8個字節的消耗
在上例中,返回值 (d int, e int, f int) { 是進行了命名,如果不想命名可以寫成 (int,int,int){ ,返回的結果都是一樣的,但要注意:
當返回了多個值,我們某些變量不想要,或實際用不到,我們可以使用 _ 來補位,例如上例的返回我們可以寫成 d,_,f := test(a,b,c) ,我們不想要中間的返回值,可以以這種形式來舍棄掉
在參數后面以 變量 ... type 這種形式的,我們就要以判斷出這是一個可變長度的參數
輸出為:
在上例中, strs ...string 中, strs 的實際值是b,c,d,e,這就是一個最簡單的傳遞可變長度的參數的例子,更多一些演變的形式,都非常類似
在GO中 defer 關鍵字非常重要,相當于面相對像中的析構函數,也就是在某個函數執行完成后,GO會自動這個;
如果在多層循環中函數里,都定義了 defer ,那么它的執行順序是先進后出;
當某個函數出現嚴重錯誤時, defer 也會被調用
輸出為
這是一個最簡單的測試了,當然還有更復雜的調用,比如調試程序時,判斷是哪個函數出了問題,完全可以根據 defer 打印出來的內容來進行判斷,非常快速,這種留給你們去實現
一個函數在函數體內自己調用自己我們稱之為遞歸函數,在做遞歸調用時,經常會將內存給占滿,這是非常要注意的,常用的比如,快速排序就是用的遞歸調用
本篇重點介紹了GO函數(func)的聲明與使用,下一篇將介紹GO的結構 struct
按值傳遞函數參數,是拷貝參數的實際值到函數的形式參數的方法調用。在這種情況下,參數在函數內變化對參數不會有影響。
默認情況下,Go編程語言使用調用通過值的方法來傳遞參數。在一般情況下,這意味著,在函數內碼不能改變用來調用所述函數的參數。考慮函數swap()的定義如下。
代碼如下:
/* function definition to swap the values */
func swap(int x, int y) int {
var temp int
temp = x /* save the value of x */
x = y /* put y into x */
y = temp /* put temp into y */
return temp;
}
現在,讓我們通過使實際值作為在以下示例調用函數swap():
代碼如下:
package main
import "fmt"
func main() {
/* local variable definition */
var a int = 100
var b int = 200
fmt.Printf("Before swap, value of a : %d\n", a )
fmt.Printf("Before swap, value of b : %d\n", b )
/* calling a function to swap the values */
swap(a, b)
fmt.Printf("After swap, value of a : %d\n", a )
fmt.Printf("After swap, value of b : %d\n", b )
}
func swap(x, y int) int {
var temp int
temp = x /* save the value of x */
x = y /* put y into x */
y = temp /* put temp into y */
return temp;
}
讓我們把上面的代碼放在一個C文件,編譯并執行它,它會產生以下結果:
Before swap, value of a :100
Before swap, value of b :200
After swap, value of a :100
After swap, value of b :200
這表明,參數值沒有被改變,雖然它們已經在函數內部改變。
通過傳遞函數參數,即是拷貝參數的地址到形式參數的參考方法調用。在函數內部,地址是訪問調用中使用的實際參數。這意味著,對參數的更改會影響傳遞的參數。
要通過引用傳遞的值,參數的指針被傳遞給函數就像任何其他的值。所以,相應的,需要聲明函數的參數為指針類型如下面的函數swap(),它的交換兩個整型變量的值指向它的參數。
代碼如下:
/* function definition to swap the values */
func swap(x *int, y *int) {
var temp int
temp = *x /* save the value at address x */
*x = *y /* put y into x */
*y = temp /* put temp into y */
}
現在,讓我們調用函數swap()通過引用作為在下面的示例中傳遞數值:
代碼如下:
package main
import "fmt"
func main() {
/* local variable definition */
var a int = 100
var b int= 200
fmt.Printf("Before swap, value of a : %d\n", a )
fmt.Printf("Before swap, value of b : %d\n", b )
/* calling a function to swap the values.
* a indicates pointer to a ie. address of variable a and
* b indicates pointer to b ie. address of variable b.
*/
swap(a, b)
fmt.Printf("After swap, value of a : %d\n", a )
fmt.Printf("After swap, value of b : %d\n", b )
}
func swap(x *int, y *int) {
var temp int
temp = *x /* save the value at address x */
*x = *y /* put y into x */
*y = temp /* put temp into y */
}
讓我們把上面的代碼放在一個C文件,編譯并執行它,它會產生以下結果:
Before swap, value of a :100
Before swap, value of b :200
After swap, value of a :200
After swap, value of b :100
這表明變化的功能以及不同于通過值調用的外部體現的改變不能反映函數之外。
本文介紹一些Go語言的基礎語法。
先來看一個簡單的go語言代碼:
go語言的注釋方法:
代碼執行結果:
下面來進一步介紹go的基礎語法。
go語言中格式化輸出可以使用 fmt 和 log 這兩個標準庫,
常用方法:
示例代碼:
執行結果:
更多格式化方法可以訪問中的fmt包。
log包實現了簡單的日志服務,也提供了一些格式化輸出的方法。
執行結果:
下面來介紹一下go的數據類型
下表列出了go語言的數據類型:
int、float、bool、string、數組和struct屬于值類型,這些類型的變量直接指向存在內存中的值;slice、map、chan、pointer等是引用類型,存儲的是一個地址,這個地址存儲最終的值。
常量是在程序編譯時就確定下來的值,程序運行時無法改變。
執行結果:
執行結果:
Go 語言的運算符主要包括算術運算符、關系運算符、邏輯運算符、位運算符、賦值運算符以及指針相關運算符。
算術運算符:
關系運算符:
邏輯運算符:
位運算符:
賦值運算符:
指針相關運算符:
下面介紹一下go語言中的if語句和switch語句。另外還有一種控制語句叫select語句,通常與通道聯用,這里不做介紹。
if語法格式如下:
if ... else :
else if:
示例代碼:
語法格式:
另外,添加 fallthrough 會強制執行后面的 case 語句,不管下一條case語句是否為true。
示例代碼:
執行結果:
下面介紹幾種循環語句:
執行結果:
執行結果:
也可以通過標記退出循環:
--THE END--
分享名稱:go語言100個常用方法,go語言簡單
網站URL:http://vcdvsql.cn/article32/dsigepc.html
成都網站建設公司_創新互聯,為您提供營銷型網站建設、全網營銷推廣、小程序開發、網站內鏈、品牌網站設計、網站建設
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯