性能優化實戰:百萬級WebSockets和Go語言

Go語言 Linux 文章 Nginx Python學習 Python學習 2017-09-24

性能優化實戰:百萬級WebSockets和Go語言

大家好!我的名字叫 Sergey Kamardin。我是來自 Mail.Ru 的一名工程師。這篇文章將講述我們是如何用 Go 語言開發一個高負荷的 WebSocket 服務。即使你對 WebSockets 熟悉但對 Go 語言知之甚少,我還是希望這篇文章裡講到的性能優化的思路和技術對你有所啟發。

介紹

作為全文的鋪墊,我想先講一下我們為什麼要開發這個服務。

Mail.Ru 有許多包含狀態的系統。用戶的電子郵件存儲是其中之一。有很多辦法來跟蹤這些狀態的改變。不外乎通過定期的輪詢或者系統通知來得到狀態的變化。這兩種方法都有它們的優缺點。對郵件這個產品來說,讓用戶儘快收到新的郵件是一個考量指標。郵件的輪詢會產生大概每秒5萬個 HTTP 請求,其中60%的請求會返回304狀態(表示郵箱沒有變化)。因此,為了減少服務器的負荷並加速郵件的接收,我們決定重寫一個 publisher-subscriber 服務(這個服務通常也會稱作 bus,message broker 或者 event-channel)。這個服務負責接收狀態更新的通知,然後還處理對這些更新的訂閱。

重寫 publisher-subscriber 服務之前:

性能優化實戰:百萬級WebSockets和Go語言

現在:

性能優化實戰:百萬級WebSockets和Go語言

上面第一個圖為舊的架構。瀏覽器(Browser)會定期輪詢 API 服務來獲得郵件存儲服務(Storage)的更新。

第二張圖展示的是新的架構。瀏覽器(Browser)和通知 API 服務(notificcation API)建立一個 WebSocket 連接。通知 API 服務會發送相關的訂閱到 Bus 服務上。當收到新的電子郵件時,存儲服務(Storage)向Bus(1)發送一個通知,Bus 又將通知發送給相應的訂閱者(2)。API 服務為收到的通知找到相應的連接,然後把通知推送到用戶的瀏覽器(3)。

我們今天就來討論一下這個 API 服務(也可以叫做 WebSocket 服務)。在開始之前,我想提一下這個在線服務處理將近3百萬個連接。

慣用的做法(The idiomatic way )

首先,我們看一下不做任何優化會如何用 Go 來實現這個服務的部分功能。在使用 net/http 實現具體功能前,讓我們先討論下我們將如何發送和接收數據。這些數據是定義在 WebSocket 協議之上的(例如 JSON 對象)。我們在下文中會成他們為 packet。

我們先來實現 Channel 結構。它包含相應的邏輯來通過 WebScoket 連接發送和接收 packet。

◆ Channel 結構

// Packet represents application level data.type Packet struct { ...}// Channel wraps user connection.type Channel struct { conn net.Conn // WebSocket connection. send chan Packet // Outgoing packets queue.}func NewChannel(conn net.Conn) *Channel { c := &Channel{ conn: conn, send: make(chan Packet, N), } go c.reader() go c.writer() return c}

這裡我要強調的是讀和寫這兩個 goroutines。每個 goroutine 都需要各自的內存棧。棧的初始大小由操作系統和 Go 的版本決定,通常在 2KB 到 8KB 之間。我們之前提到有3百萬個在線連接,如果每個 goroutine 棧需要 4KB 的話,所有連接就需要 24GB 的內存。這還沒算上給 Channel 結構,發送 packet 用的 ch.send 和其它一些內部字段分配的內存空間。

◆ I/O goroutines

接下來看一下“reader”的實現:

func (c *Channel) reader() { // We make a buffered read to reduce read syscalls. buf := bufio.NewReader(c.conn) for { pkt, _ := readPacket(buf) c.handle(pkt) }}

這裡我們使用了 bufio.Reader。每次都會在 buf 大小允許的範圍內儘量讀取多的字節,從而減少 read() 系統調用的次數。在無限循環中,我們期望會接收到新的數據。請記住之前這句話:期望接收到新的數據。我們之後會討論到這一點。

我們把 packet 的解析和處理邏輯都忽略掉了,因為它們和我們要討論的優化不相關。不過 buf 值得我們的關注:它的缺省大小是4KB。這意味著所有連接將消耗掉額外的12 GB內存。“writer”也是類似的情況:

func (c *Channel) writer() { // We make buffered write to reduce write syscalls. buf := bufio.NewWriter(c.conn) for pkt := range c.send { _ := writePacket(buf, pkt) buf.Flush() }}

我們在待發送 packet 的 c.send channel 上循環將 packet 寫到緩存(buffer)裡。細心的讀者肯定已經發現,這又是額外的4KB內存。3百萬個連接會佔用12GB的內存。

◆ HTTP

我們已經有了一個簡單的 Channel 實現。現在我們需要一個 WebSocket 連接。因為還在通常做法(Idiomatic Way)的標題下,那麼就先來看看通常是如何實現的。

注:如果你不知道 WebSocket 是怎麼工作的,那麼這裡值得一提的是客戶端是通過一個叫升級(Upgrade)請求的特殊 HTTP 機制來建立 WebSocket的。在成功處理升級請求以後,服務端和客戶端使用 TCP 連接來交換二進制的 WebSocket 幀(frames)。這裡有關於幀結構的描述。

import ( "net/http" "some/websocket")http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) { conn, _ := websocket.Upgrade(r, w) ch := NewChannel(conn) //...})

請注意這裡的 http.ResponseWriter 結構包含 bufio.Reader 和bufio.Writer(各自分別包含4KB的緩存)。它們用於 \*http.Request 初始化和返回結果。

不管是哪個 WebSocket,在成功迴應一個升級請求之後,服務端在調用responseWriter.Hijack() 之後會接收到一個 I/O 緩存和對應的 TCP 連接。

注:有時候我們可以通過 net/http.putBufio{Reader,Writer} 調用把緩存釋放回 net/http 裡的 sync.Pool。

這樣,這3百萬個連接又需要額外的24 GB內存。

所以,為了這個什麼都不幹的程序,我們已經佔用了72 GB的內存!

優化

我們來回顧一下前面介紹的用戶連接的工作流程。在建立 WebSocket 之後,客戶端會發送請求訂閱相關事件(我們這裡忽略類似 ping/pong 的請求)。接下來,在整個連接的生命週期裡,客戶端可能就不會發送任何其它數據了。

連接的生命週期可能會持續幾秒鐘到幾天。

所以在大部分時間裡,Channel.reader() 和 Channel.writer() 都在等待接收和發送數據。與它們一起等待的是各自分配的4 KB的I/O緩存。

現在,我們發現有些地方是可以做進一步優化的,對吧?

◆ Netpoll

你還記得 Channel.reader() 的實現使用了 bufio.Reader.Read() 嗎?bufio.Reader.Read() 又會調用 conn.Read()。這個調用會被阻塞以等待接收連接上的新數據。如果連接上有新的數據,Go 的運行環境(runtime)就會喚醒相應的 goroutine 讓它去讀取下一個 packet。之後,goroutine 會被再次阻塞來等待新的數據。我們來研究下 Go 的運行環境是怎麼知道 goroutine需要被喚醒的。

如果我們看一下 conn.Read() 的實現,就會看到它調用 net.netFD.Read():

// net/fd_unix.gofunc (fd *netFD) Read(p []byte) (n int, err error) { //... for { n, err = syscall.Read(fd.sysfd, p) if err != nil { n = 0 if err == syscall.EAGAIN { if err = fd.pd.waitRead(); err == nil { continue } } } //... break } //...}

Go 使用了 sockets 的非阻塞模式。EAGAIN 表示 socket 裡沒有數據了但不會阻塞在空的 socket 上,OS 會把控制權返回給用戶進程。

這裡它首先對連接文件描述符進行 read() 系統調用。如果 read() 返回的是EAGAIN 錯誤,運行環境就是調用 pollDesc.waitRead():

// net/fd_poll_runtime.gofunc (pd *pollDesc) waitRead() error { return pd.wait('r')}func (pd *pollDesc) wait(mode int) error { res := runtime_pollWait(pd.runtimeCtx, mode) //...}

如果繼續深挖,我們可以看到 netpoll 的實現在 Linux 裡用的是 epoll 而在 BSD 裡用的是 kqueue。我們的這些連接為什麼不採用類似的方式呢?只有在 socket 上有可讀數據時,才分配緩存空間並啟用讀數據的 goroutine。

在 github.com/golang/go 上,有一個關於開放(exporting)netpoll 函數的問題。

◆ 幹掉 goroutines

假設我們用 Go 語言實現了 netpoll。我們現在可以避免創 Channel.reader() 的 goroutine,取而代之的是從訂閱連接裡收到新數據的事件。

ch := NewChannel(conn)// Make conn to be observed by netpoll instance.poller.Start(conn, netpoll.EventRead, func() { // We spawn goroutine here to prevent poller wait loop // to become locked during receiving packet from ch. go ch.Receive()})// Receive reads a packet from conn and handles it somehow.func (ch *Channel) Receive() { buf := bufio.NewReader(ch.conn) pkt := readPacket(buf) c.handle(pkt)}

Channel.writer() 相對容易一點,因為我們只需在發送 packet 的時候創建 goroutine 並分配緩存。

func (ch *Channel) Send(p Packet) { if c.noWriterYet() { go ch.writer() } ch.send <- p}

注意,這裡我們沒有處理 write() 系統調用時返回的 EAGAIN。我們依賴 Go 運行環境去處理它。這種情況很少發生。如果需要的話我們還是可以像之前那樣來處理。

從 ch.send 讀取待發送的 packets 之後,ch.writer() 會完成它的操作,最後釋放 goroutine 的棧和用於發送的緩存。

很不錯!通過避免這兩個連續運行的 goroutine 所佔用的 I/O 緩存和棧內存,我們已經節省了48 GB。

◆ 控制資源

大量的連接不僅僅會造成大量的內存消耗。在開發服務端的時候,我們還不停地遇到競爭條件(race conditions)和死鎖(deadlocks)。隨之而來的是所謂的自我分佈式阻斷攻擊(self-DDOS)。在這種情況下,客戶端會悍然地嘗試重新連接服務端而把情況搞得更加糟糕。

舉個例子,如果因為某種原因我們突然無法處理 ping/pong 消息,這些空閒連接就會不斷地被關閉(它們會以為這些連接已經無效因此不會收到數據)。然後客戶端每N秒就會以為失去了連接並嘗試重新建立連接,而不是繼續等待服務端發來的消息。

在這種情況下,比較好的辦法是讓負載過重的服務端停止接受新的連接,這樣負載均衡器(例如nginx)就可以把請求轉到其它的服務端上去。

撇開服務端的負載不說,如果所有的客戶端突然(很可能是因為某個bug)向服務端發送一個 packet,我們之前節省的 48 GB 內存又將會被消耗掉。因為這時我們又會和開始一樣給每個連接創建 goroutine 並分配緩存。

Goroutine 池

可以用一個 goroutine 池來限制同時處理 packets 的數目。下面的代碼是一個簡單的實現:

package gopoolfunc New(size int) *Pool { return &Pool{ work: make(chan func()), sem: make(chan struct{}, size), }}func (p *Pool) Schedule(task func()) error { select { case p.work <- task: case p.sem <- struct{}{}: go p.worker(task) }}func (p *Pool) worker(task func()) { defer func() { <-p.sem } for { task() task = <-p.work }}

我們使用 netpoll 的代碼就變成下面這樣:

pool := gopool.New(128)poller.Start(conn, netpoll.EventRead, func() { // We will block poller wait loop when // all pool workers are busy. pool.Schedule(func() { ch.Receive() })})

現在我們不僅要等可讀的數據出現在 socket 上才能讀 packet,還必須等到從池裡獲取到空閒的 goroutine。

同樣的,我們修改下 Send() 的代碼:

pool := gopool.New(128)func (ch *Channel) Send(p Packet) { if c.noWriterYet() { pool.Schedule(ch.writer) } ch.send <- p}

這裡我們沒有調用 go ch.writer(),而是想重複利用池裡 goroutine 來發送數據。 所以,如果一個池有 N 個 goroutines 的話,我們可以保證有 N 個請求被同時處理。而 N + 1 個請求不會分配 N + 1 個緩存。goroutine 池允許我們限制對新連接的 Accept() 和 Upgrade(),這樣就避免了大部分 DDoS 的情況。

◆ 零拷貝升級(Zero-copy upgrade)

之前已經提到,客戶端通過 HTTP 升級(Upgrade)請求切換到 WebSocket協議。下面顯示的是一個升級請求:

GET /ws HTTP/1.1Host: mail.ruConnection: UpgradeSec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==Sec-Websocket-Version: 13Upgrade: websocketHTTP/1.1 101 Switching ProtocolsConnection: UpgradeSec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=Upgrade: websocket

我們接收 HTTP 請求和它的頭部只是為了切換到 WebSocket 協議,而 http.Request 裡保存了所有頭部的數據。從這裡可以得到啟發,如果是為了優化,我們可以放棄使用標準的 net/http 服務並在處理 HTTP 請求的時候避免無用的內存分配和拷貝。

舉個例子,http.Request 包含了一個叫做 Header 的字段。標準 net/http 服務會將請求裡的所有頭部數據全部無條件地拷貝到 Header 字段裡。你可以想象這個字段會保存許多冗餘的數據,例如一個包含很長 cookie 的頭部。

我們如何來優化呢?

WebSocket 實現

不幸的是,在我們優化服務端的時候所有能找到的庫只支持對標準 net/http 服務做升級。而且沒有一個庫允許我們實現上面提到的讀和寫的優化。為了使這些優化成為可能,我們必須有一套底層的 API 來操作 WebSocket。為了重用緩存,我們需要類似下面這樣的協議函數:

func ReadFrame(io.Reader) (Frame, error)func WriteFrame(io.Writer, Frame) error

如果我們有一個包含這樣 API 的庫,我們就按照下面的方式從連接上讀取 packets:

// getReadBuf, putReadBuf are intended to// reuse *bufio.Reader (with sync.Pool for example).func getReadBuf(io.Reader) *bufio.Readerfunc putReadBuf(*bufio.Reader)// readPacket must be called when data could be read from conn.func readPacket(conn io.Reader) error { buf := getReadBuf() defer putReadBuf(buf) buf.Reset(conn) frame, _ := ReadFrame(buf) parsePacket(frame.Payload) //...}

簡而言之,我們需要自己寫一個庫。

github.com/gobwas/ws

ws 庫的主要設計思想是不將協議的操作邏輯暴露給用戶。所有讀寫函數都接受通用的 io.Reader 和 io.Writer 接口。因此它可以隨意搭配是否使用緩存以及其它 I/O 的庫。

除了標準庫 net/http 裡的升級請求,ws 還支持零拷貝升級。它能夠處理升級請求並切換到 WebSocket 模式而不產生任何內存分配或者拷貝。

ws.Upgrade() 接受 io.ReadWriter(net.Conn 實現了這個接口)。換句話說,我們可以使用標準的 net.Listen() 函數然後把從 ln.Accept() 收到的連接馬上交給 ws.Upgrade() 去處理。庫也允許拷貝任何請求數據來滿足將來應用的需求(舉個例子,拷貝 Cookie 來驗證一個 session)。

下面是處理升級請求的性能測試:標準 net/http 庫的實現和使用零拷貝升級的 net.Listen():

BenchmarkUpgradeHTTP 5156 ns/op 8576 B/op 9 allocs/opBenchmarkUpgradeTCP 973 ns/op 0 B/op 0 allocs/op

使用 ws 以及零拷貝升級為我們節省了24 GB的空間。這些空間原本被用做 net/http 裡處理請求的 I/O 緩存。

◆ 回顧

讓我們來回顧一下之前提到過的優化:

  • 一個包含緩存的讀 goroutine 會佔用很多內存。方案: netpoll(epoll, kqueue);重用緩存。

  • 一個包含緩存的寫 goroutine 會佔用很多內存。方案: 在需要的時候創建goroutine;重用緩存。

  • 存在大量連接請求的時候,netpoll 不能很好的限制連接數。方案: 重用 goroutines 並且限制它們的數目。

  • net/http 對升級到 WebSocket 請求的處理不是最高效的。方案: 在 TCP 連接上實現零拷貝升級。

下面是服務端的大致實現代碼:

import ( "net" "github.com/gobwas/ws")ln, _ := net.Listen("tcp", ":8080")for { // Try to accept incoming connection inside free pool worker. // If there no free workers for 1ms, do not accept anything and try later. // This will help us to prevent many self-ddos or out of resource limit cases. err := pool.ScheduleTimeout(time.Millisecond, func() { conn := ln.Accept() _ = ws.Upgrade(conn) // Wrap WebSocket connection with our Channel struct. // This will help us to handle/send our app's packets. ch := NewChannel(conn) // Wait for incoming bytes from connection. poller.Start(conn, netpoll.EventRead, func() { // Do not cross the resource limits. pool.Schedule(func() { // Read and handle incoming packet(s). ch.Recevie() }) }) }) if err != nil { time.Sleep(time.Millisecond) }}

結論

在程序設計時,過早優化是萬惡之源。Donald Knuth

上面的優化是有意義的,但不是所有情況都適用。舉個例子,如果空閒資源(內存,CPU)與在線連接數之間的比例很高的話,優化就沒有太多意義。當然,知道什麼地方可以優化以及如何優化總是有幫助的。

引用

https://github.com/mailru/easygo

https://github.com/gobwas/ws

https://github.com/gobwas/ws-...

https://github.com/gobwas/htt...

性能優化實戰:百萬級WebSockets和Go語言

相關推薦

推薦中...