一文了解 Apache Flink 核心技術

Apache 技術 Java 算法 大數據 SQL Ververica 2019-06-02

作者:伍翀

Apache Flink 介紹

Apache Flink (以下簡稱Flink)是近年來越來越流行的一款開源大數據計算引擎,它同時支持了批處理和流處理,也能用來做一些基於事件的應用。使用官網的語句來介紹 Flink 就是 "Stateful Computations Over Streams"。

首先 Flink 是一個純流式的計算引擎,它的基本數據模型是數據流。流可以是無邊界的無限流,即一般意義上的流處理。也可以是有邊界的有限流,這樣就是批處理。因此 Flink 用一套架構同時支持了流處理和批處理。其次,Flink 的一個優勢是支持有狀態的計算。如果處理一個事件(或一條數據)的結果只跟事件本身的內容有關,稱為無狀態處理;反之結果還和之前處理過的事件有關,稱為有狀態處理。稍微複雜一點的數據處理,比如說基本的聚合,數據流之間的關聯都是有狀態處理。

一文了解 Apache Flink 核心技術

Apache Flink 基石

Apache Flink 之所以能越來越受歡迎,我們認為離不開它最重要的四個基石:Checkpoint、State、Time、Window。

首先是Checkpoint機制,這是 Flink 最重要的一個特性。Flink 基於 Chandy-Lamport 算法實現了分佈式一致性的快照,從而提供了 exactly-once 的語義。在 Flink 之前的流計算系統(如 Strom,Samza)都沒有很好地解決 exactly-once 的問題。提供了一致性的語義之後,Flink 為了讓用戶在編程時能夠更輕鬆、更容易地去管理狀態,引入了託管狀態(managed state)並提供了 API 接口,讓用戶使用起來感覺就像在用 Java 的集合類一樣。除此之外,Flink 還實現了 watermark 的機制,解決了基於事件時間處理時的數據亂序和數據遲到的問題。最後,流計算中的計算一般都會基於窗口來計算,所以 Flink 提供了一套開箱即用的窗口操作,包括滾動窗口、滑動窗口、會話窗口,還支持非常靈活的自定義窗口以滿足特殊業務的需求。

Flink API 歷史變遷

一文了解 Apache Flink 核心技術

在 Flink 1.0.0 時期,加入了 State API,即 ValueState、ReducingState、ListState 等等。State API 可以認為是 Flink 里程碑式的創新,它能夠讓用戶像使用 Java 集合一樣地使用 Flink State,卻能夠自動享受到狀態的一致性保證,不會因為故障而丟失狀態。包括後來 Apache Beam 的 State API 也從中借鑑了很多。

在 Flink 1.1.0 時期,支持了 Session Window 並且能夠正確的處理亂序的遲到數據,使得最終結果是正確的

在 Flink 1.2.0 時期,提供了 ProcessFunction,這是一個 Lower-level 的API,用於實現更高級更復雜的功能。它除了能夠註冊各種類型的 State 外,還支持註冊定時器(支持 EventTime 和 ProcessingTime),常用於開發一些基於事件、基於時間的應用程序。

在 Flink 1.3.0 時期,提供了 Side Output 功能。算子的輸出一般只有一種輸出類型,但是有些時候可能需要輸出另外的類型,比如除了輸出主流外,還希望把一些異常數據、遲到數據以側邊流的形式進行輸出,並分別交給下游不同節點進行處理。簡而言之,Side Output 支持了多路輸出的功能。

在 Flink 1.5.0 時期,加入了BroadcastState。BroadcastState是對 State API 的一個擴展。它用來存儲上游被廣播過來的數據,這個 operator 的每個併發上存的BroadcastState裡面的數據都是一模一樣的,因為它是從上游廣播而來的。基於這種State可以比較好地去解決 CEP 中的動態規則的功能,以及 SQL 中不等值Join的場景。

在 Flink 1.6.0 時期,提供了State TTL功能、DataStream Interval Join功能。State

TTL實現了在申請某個State時候可以在指定一個生命週期參數(TTL),指定該state

過了多久之後需要被系統自動清除。在這個版本之前,如果用戶想要實現這種狀態清理操作需要使用ProcessFunction註冊一個Timer,然後利用Timer的回調手動把這個State

清除。從該版本開始,Flink框架可以基於TTL原生地解決這件事情。DataStream Interval Join 使得

區間Join成為可能。例如左流的每一條數據去Join右流前後5分鐘之內的數據,這種就是5分鐘的區間Join。

Flink High-Level API 歷史變遷

一文了解 Apache Flink 核心技術

在 Flink 1.0.0 時期,Table API (結構化數據處理API)和 CEP(複雜事件處理API)這兩個框架被首次加入到倉庫中。Table API 是一種結構化的高級

API,支持 Java 語言和 Scala 語言,類似於 Spark 的 DataFrame API。Table API 和 SQL非常相近,他們都是一種處理結構化數據的語言,實現上可以共用很多內容。所以在 Flink 1.1.0 裡面,社區基於Apache Calcite對整個 Table 模塊做了重構,使得同時支持了 Table API 和 SQL 並共用了大部分代碼。

在 Flink 1.2.0 時期,社區在Table API和SQL上支持豐富的內置窗口操作,包括Tumbling Window、Sliding Window、Session Window。

在 Flink 1.3.0 時期,社區首次提出了Dynamic Table這個概念,藉助Dynamic

Table,流和批之間可以相互進行轉換。流可以是一張表,表也可以是一張流,這是流批統一的基礎之一。其中Retraction機制是實現Dynamic

Table的基礎之一,基於Retraction才能夠正確地實現多級Aggregate、多級Join,才能夠保證流式 SQL 的語義與結果的正確性。另外,在該版本中還支持了 CEP

算子的可伸縮容(即改變併發)。

在 Flink 1.5.0 時期,在 Table API 和 SQL 上支持了Join操作,包括無限流的 Join 和帶窗口的 Join。還添加了 SQL CLI 支持。SQL CLI 提供了一個類似Shell命令的對話框,可以交互式執行查詢。

Flink Checkpoint & Recovery 歷史變遷

一文了解 Apache Flink 核心技術

Checkpoint機制在Flink很早期的時候就已經支持,是Flink一個很核心的功能,Flink 社區也一直努力提升 Checkpoint 和 Recovery 的效率。

在 Flink 1.0.0 時期,提供了 RocksDB 狀態後端的支持,在這個版本之前所有的狀態數據只能存在進程的內存裡面,JVM 內存是固定大小的,隨著數據越來越多總會發生 FullGC 和 OOM 的問題,所以在生產環境中很難應用起來。如果想要存更多數據、更大的State就要用到 RocksDB。RocksDB是一款基於文件的嵌入式數據庫,它會把數據存到磁盤,同時又提供高效的讀寫性能。所以使用RocksDB不會發生OOM這種事情。

在 Flink 1.1.0 時期,支持了 RocksDB Snapshot 的異步化。在之前的版本,RocksDB 的 Snapshot 過程是同步的,它會阻塞主數據流的處理,很影響吞吐量。在支持異步化之後,吞吐量得到了極大的提升。

在 Flink 1.2.0 時期,通過引入KeyGroup的機制,支持了 KeyedState 和 OperatorState 的可擴縮容。也就是支持了對帶狀態的流計算任務改變併發的功能。

在 Flink 1.3.0 時期,支持了 Incremental Checkpoint (增量檢查點)機制。Incemental Checkpoint 的支持標誌著 Flink 流計算任務正式達到了生產就緒狀態。增量檢查點是每次只將本次 checkpoint 期間新增的狀態快照並持久化存儲起來。一般流計算任務,GB 級別的狀態,甚至 TB 級別的狀態是非常常見的,如果每次都把全量的狀態都刷到分佈式存儲中,這個效率和網絡代價是很大的。如果每次只刷新增的數據,效率就會高很多。在這個版本里面還引入了細粒度的recovery的功能,細粒度的recovery在做恢復的時候,只需要恢復失敗節點的聯通子圖,不用對整個 Job 進行恢復,這樣便能夠提高恢復效率。

在 Flink 1.5.0 時期,引入了本地狀態恢復的機制。因為基於checkpoint機制,會把State持久化地存儲到某個分佈式存儲,比如HDFS,當發生 failover 的時候需要重新把數據從遠程HDFS再下載下來,如果這個狀態特別大那麼下載耗時就會較長,failover 恢復所花的時間也會拉長。本地狀態恢復機制會提前將狀態文件在本地也備份一份,當Job發生failover之後,恢復時可以在本地直接恢復,不需從遠程HDFS重新下載狀態文件,從而提升了恢復的效率。

Flink Runtime 歷史變遷

一文了解 Apache Flink 核心技術

在 Flink 1.2.0 時期,提供了Async I/O功能。Async I/O 是阿里巴巴貢獻給社區的一個呼聲非常高的特性,主要目的是為了解決與外部系統交互時網絡延遲成為了系統瓶頸的問題。例如,為了關聯某些字段需要查詢外部 HBase 表,同步的方式是每次查詢的操作都是阻塞的,數據流會被頻繁的I/O請求卡住。當使用異步I/O之後就可以同時地發起N個異步查詢的請求,不會阻塞主數據流,這樣便提升了整個job的吞吐量,提升CPU利用率。

在 Flink 1.3.0 時期,引入了HistoryServer的模塊。HistoryServer主要功能是當job結束以後,會把job的狀態以及信息都進行歸檔,方便後續開發人員做一些深入排查。

在 Flink 1.4.0 時期,提供了端到端的 exactly-once 的語義保證。Exactly-once 是指每條輸入的數據只會作用在最終結果上有且只有一次,即使發生軟件或硬件的故障,不會有丟數據或者重複計算髮生。而在該版本之前,exactly-once 保證的範圍只是 Flink 應用本身,並不包括輸出給外部系統的部分。在 failover 時,這就有可能寫了重複的數據到外部系統,所以一般會使用冪等的外部系統來解決這個問題。在 Flink 1.4 的版本中,Flink 基於兩階段提交協議,實現了端到端的 exactly-once 語義保證。內置支持了 Kafka 的端到端保證,並提供了 TwoPhaseCommitSinkFunction 供用於實現自定義外部存儲的端到端 exactly-once 保證。

在 Flink 1.5.0 時期,Flink 發佈了新的部署模型和處理模型(FLIP6)。新部署模型的開發工作已經持續了很久,該模型的實現對Flink核心代碼改動特別大,可以說是自 Flink 項目創建以來,Runtime 改動最大的一次。簡而言之,新的模型可以在YARN, MESOS調度系統上更好地動態分配資源、動態釋放資源,並實現更高的資源利用率,還有提供更好的作業之間的隔離。

除了 FLIP6 的改進,在該版本中,還對網站棧做了重構。重構的原因是在老版本中,上下游多個 task 之間的通信會共享同一個 TCP connection,導致某一個 task 發生反壓時,所有共享該連接的 task 都會被阻塞,反壓的粒度是 TCP connection 級別的。為了改進反壓機制,Flink應用了在解決網絡擁塞時一種經典的流控方法——基於Credit的流量控制。使得流控的粒度精細到具體某個 task 級別,有效緩解了反壓對吞吐量的影響。

總結

Flink 同時支持了流處理和批處理,目前流計算的模型已經相對比較成熟和領先,也經歷了各個公司大規模生產的驗證。社區在接下來將繼續加強流計算方面的性能和功能,包括對 Flink SQL 擴展更豐富的功能和引入更多的優化。另一方面也將加大力量提升批處理、機器學習等生態上的能力。

相關推薦

推薦中...