'基於MySQL Binlog的Elasticsearch數據同步實踐'

"

作者介紹

張坤,馬蜂窩電商研發團隊度假業務高級研發工程師。

一、為什麼要做

隨著馬蜂窩的逐漸發展,我們的業務數據越來越多,單純使用 MySQL 已經不能滿足我們的數據查詢需求,例如對於商品、訂單等數據的多維度檢索。

使用 Elasticsearch 存儲業務數據可以很好的解決我們業務中的搜索需求。而數據進行異構存儲後,隨之而來的就是數據同步的問題。

二、現有方法及問題

對於數據同步,我們目前的解決方案是建立數據中間表。把需要檢索的業務數據,統一放到一張MySQL 表中,這張中間表對應了業務需要的Elasticsearch 索引,每一列對應索引中的一個Mapping 字段。通過腳本以 Crontab 的方式,讀取 MySQL 中間表中 UTime 大於上一次讀取時間的所有數據,即該段時間內的增量,寫入Elasticsearch。

所以,一旦業務邏輯中有相應字段的數據變更,需要同時顧及 MySQL 中間表的變更;如果需要 Elasticsearch 中的數據即時性較高,還需要同時寫入 Elasticsearch。

隨著業務數據越來越多,MySQL 中間表的數據量越來越大。當需要在 Elasticsearch 的索引中新增 Mapping 字段時,相應的 MySQL 中間表也需要新增列,在數據量龐大的表中,擴展列的耗時是難以忍受的。

而且 Elasticsearch 索引中的 Mapping 字段隨著業務發展增多,需要由業務方增加相應的寫入 MySQL 中間表方法,這也帶來一部分開發成本。

三、方案設計

1、整體思路

目前現有的一些開源數據同步工具,如阿里的 DataX 等,主要是基於查詢來獲取數據源,這會存在如何確定增量(比如使用utime字段解決等)和輪詢頻率的問題,而我們一些業務場景對於數據同步的實時性要求比較高。

為了解決上述問題,我們提出了一種基於 MySQL Binlog 來進行 MySQL 數據同步到 Elasticsearch 的思路。Binlog 是 MySQL 通過 Replication 協議用來做主從數據同步的數據,所以它有我們需要寫入 Elasticsearch 的數據,並符合對數據同步時效性的要求。

使用 Binlog 數據同步 Elasticsearch,業務方就可以專注於業務邏輯對 MySQL 的操作,不用再關心數據向 Elasticsearch 同步的問題,減少了不必要的同步代碼,避免了擴展中間表列的長耗時問題。

經過調研後,我們採用開源項目 go-mysql-elasticsearch 實現數據同步,並針對馬蜂窩技術棧和實際的業務環境進行了一些定製化開發。

2、數據同步正確性保證

公司的所有表的 Binlog 數據屬於機密數據,不能直接獲取,為了滿足各業務線的使用需求,採用接入 Kafka 的形式提供給使用方,並且需要使用方申請相應的 Binlog 數據使用權限。獲取使用權限後,使用方以 Consumer Group 的形式讀取。

這種方式保證了 Binglog 數據的安全性,但是對保證數據同步的正確性帶來了挑戰。因此我們設計了一些機制,來保證數據源的獲取有序、完整。

1)順序性

通過 Kafka 獲取 Binlog 數據,首先需要保證獲取數據的順序性。嚴格說,Kafka 是無法保證全局消息有序的,只能局部有序,所以無法保證所有 Binlog 數據都可以有序到達 Consumer。

但是每個 Partition 上的數據是有序的。為了可以按順序拿到每一行 MySQL 記錄的 Binglog,我們把每條 Binlog 按照其 Primary Key,Hash 到各個 Partition 上,保證同一條 MySQL 記錄的所有 Binlog 數據都發送到同一個 Partition。

如果是多 Consumer 的情況,一個 Partition 只會分配給一個 Consumer,同樣可以保證 Partition 內的數據可以有序的 Update 到 Elasticsearch 中。

"

作者介紹

張坤,馬蜂窩電商研發團隊度假業務高級研發工程師。

一、為什麼要做

隨著馬蜂窩的逐漸發展,我們的業務數據越來越多,單純使用 MySQL 已經不能滿足我們的數據查詢需求,例如對於商品、訂單等數據的多維度檢索。

使用 Elasticsearch 存儲業務數據可以很好的解決我們業務中的搜索需求。而數據進行異構存儲後,隨之而來的就是數據同步的問題。

二、現有方法及問題

對於數據同步,我們目前的解決方案是建立數據中間表。把需要檢索的業務數據,統一放到一張MySQL 表中,這張中間表對應了業務需要的Elasticsearch 索引,每一列對應索引中的一個Mapping 字段。通過腳本以 Crontab 的方式,讀取 MySQL 中間表中 UTime 大於上一次讀取時間的所有數據,即該段時間內的增量,寫入Elasticsearch。

所以,一旦業務邏輯中有相應字段的數據變更,需要同時顧及 MySQL 中間表的變更;如果需要 Elasticsearch 中的數據即時性較高,還需要同時寫入 Elasticsearch。

隨著業務數據越來越多,MySQL 中間表的數據量越來越大。當需要在 Elasticsearch 的索引中新增 Mapping 字段時,相應的 MySQL 中間表也需要新增列,在數據量龐大的表中,擴展列的耗時是難以忍受的。

而且 Elasticsearch 索引中的 Mapping 字段隨著業務發展增多,需要由業務方增加相應的寫入 MySQL 中間表方法,這也帶來一部分開發成本。

三、方案設計

1、整體思路

目前現有的一些開源數據同步工具,如阿里的 DataX 等,主要是基於查詢來獲取數據源,這會存在如何確定增量(比如使用utime字段解決等)和輪詢頻率的問題,而我們一些業務場景對於數據同步的實時性要求比較高。

為了解決上述問題,我們提出了一種基於 MySQL Binlog 來進行 MySQL 數據同步到 Elasticsearch 的思路。Binlog 是 MySQL 通過 Replication 協議用來做主從數據同步的數據,所以它有我們需要寫入 Elasticsearch 的數據,並符合對數據同步時效性的要求。

使用 Binlog 數據同步 Elasticsearch,業務方就可以專注於業務邏輯對 MySQL 的操作,不用再關心數據向 Elasticsearch 同步的問題,減少了不必要的同步代碼,避免了擴展中間表列的長耗時問題。

經過調研後,我們採用開源項目 go-mysql-elasticsearch 實現數據同步,並針對馬蜂窩技術棧和實際的業務環境進行了一些定製化開發。

2、數據同步正確性保證

公司的所有表的 Binlog 數據屬於機密數據,不能直接獲取,為了滿足各業務線的使用需求,採用接入 Kafka 的形式提供給使用方,並且需要使用方申請相應的 Binlog 數據使用權限。獲取使用權限後,使用方以 Consumer Group 的形式讀取。

這種方式保證了 Binglog 數據的安全性,但是對保證數據同步的正確性帶來了挑戰。因此我們設計了一些機制,來保證數據源的獲取有序、完整。

1)順序性

通過 Kafka 獲取 Binlog 數據,首先需要保證獲取數據的順序性。嚴格說,Kafka 是無法保證全局消息有序的,只能局部有序,所以無法保證所有 Binlog 數據都可以有序到達 Consumer。

但是每個 Partition 上的數據是有序的。為了可以按順序拿到每一行 MySQL 記錄的 Binglog,我們把每條 Binlog 按照其 Primary Key,Hash 到各個 Partition 上,保證同一條 MySQL 記錄的所有 Binlog 數據都發送到同一個 Partition。

如果是多 Consumer 的情況,一個 Partition 只會分配給一個 Consumer,同樣可以保證 Partition 內的數據可以有序的 Update 到 Elasticsearch 中。

基於MySQL Binlog的Elasticsearch數據同步實踐

2)完整性

考慮到同步程序可能面臨各種正常或異常的退出,以及 Consumer 數量變化時的 Rebalance,我們需要保證在任何情況下不能丟失 Binlog 數據。

利用 Kafka 的 Offset 機制,在確認一條 Message 數據成功寫入 Elasticsearch 後,才 Commit 該條 Message 的 Offset,這樣就保證了數據的完整性。而對於數據同步的使用場景,在保證了數據順序性和完整性的情況下,重複消費是不會有影響的。

"

作者介紹

張坤,馬蜂窩電商研發團隊度假業務高級研發工程師。

一、為什麼要做

隨著馬蜂窩的逐漸發展,我們的業務數據越來越多,單純使用 MySQL 已經不能滿足我們的數據查詢需求,例如對於商品、訂單等數據的多維度檢索。

使用 Elasticsearch 存儲業務數據可以很好的解決我們業務中的搜索需求。而數據進行異構存儲後,隨之而來的就是數據同步的問題。

二、現有方法及問題

對於數據同步,我們目前的解決方案是建立數據中間表。把需要檢索的業務數據,統一放到一張MySQL 表中,這張中間表對應了業務需要的Elasticsearch 索引,每一列對應索引中的一個Mapping 字段。通過腳本以 Crontab 的方式,讀取 MySQL 中間表中 UTime 大於上一次讀取時間的所有數據,即該段時間內的增量,寫入Elasticsearch。

所以,一旦業務邏輯中有相應字段的數據變更,需要同時顧及 MySQL 中間表的變更;如果需要 Elasticsearch 中的數據即時性較高,還需要同時寫入 Elasticsearch。

隨著業務數據越來越多,MySQL 中間表的數據量越來越大。當需要在 Elasticsearch 的索引中新增 Mapping 字段時,相應的 MySQL 中間表也需要新增列,在數據量龐大的表中,擴展列的耗時是難以忍受的。

而且 Elasticsearch 索引中的 Mapping 字段隨著業務發展增多,需要由業務方增加相應的寫入 MySQL 中間表方法,這也帶來一部分開發成本。

三、方案設計

1、整體思路

目前現有的一些開源數據同步工具,如阿里的 DataX 等,主要是基於查詢來獲取數據源,這會存在如何確定增量(比如使用utime字段解決等)和輪詢頻率的問題,而我們一些業務場景對於數據同步的實時性要求比較高。

為了解決上述問題,我們提出了一種基於 MySQL Binlog 來進行 MySQL 數據同步到 Elasticsearch 的思路。Binlog 是 MySQL 通過 Replication 協議用來做主從數據同步的數據,所以它有我們需要寫入 Elasticsearch 的數據,並符合對數據同步時效性的要求。

使用 Binlog 數據同步 Elasticsearch,業務方就可以專注於業務邏輯對 MySQL 的操作,不用再關心數據向 Elasticsearch 同步的問題,減少了不必要的同步代碼,避免了擴展中間表列的長耗時問題。

經過調研後,我們採用開源項目 go-mysql-elasticsearch 實現數據同步,並針對馬蜂窩技術棧和實際的業務環境進行了一些定製化開發。

2、數據同步正確性保證

公司的所有表的 Binlog 數據屬於機密數據,不能直接獲取,為了滿足各業務線的使用需求,採用接入 Kafka 的形式提供給使用方,並且需要使用方申請相應的 Binlog 數據使用權限。獲取使用權限後,使用方以 Consumer Group 的形式讀取。

這種方式保證了 Binglog 數據的安全性,但是對保證數據同步的正確性帶來了挑戰。因此我們設計了一些機制,來保證數據源的獲取有序、完整。

1)順序性

通過 Kafka 獲取 Binlog 數據,首先需要保證獲取數據的順序性。嚴格說,Kafka 是無法保證全局消息有序的,只能局部有序,所以無法保證所有 Binlog 數據都可以有序到達 Consumer。

但是每個 Partition 上的數據是有序的。為了可以按順序拿到每一行 MySQL 記錄的 Binglog,我們把每條 Binlog 按照其 Primary Key,Hash 到各個 Partition 上,保證同一條 MySQL 記錄的所有 Binlog 數據都發送到同一個 Partition。

如果是多 Consumer 的情況,一個 Partition 只會分配給一個 Consumer,同樣可以保證 Partition 內的數據可以有序的 Update 到 Elasticsearch 中。

基於MySQL Binlog的Elasticsearch數據同步實踐

2)完整性

考慮到同步程序可能面臨各種正常或異常的退出,以及 Consumer 數量變化時的 Rebalance,我們需要保證在任何情況下不能丟失 Binlog 數據。

利用 Kafka 的 Offset 機制,在確認一條 Message 數據成功寫入 Elasticsearch 後,才 Commit 該條 Message 的 Offset,這樣就保證了數據的完整性。而對於數據同步的使用場景,在保證了數據順序性和完整性的情況下,重複消費是不會有影響的。

基於MySQL Binlog的Elasticsearch數據同步實踐

四、技術實現

"

作者介紹

張坤,馬蜂窩電商研發團隊度假業務高級研發工程師。

一、為什麼要做

隨著馬蜂窩的逐漸發展,我們的業務數據越來越多,單純使用 MySQL 已經不能滿足我們的數據查詢需求,例如對於商品、訂單等數據的多維度檢索。

使用 Elasticsearch 存儲業務數據可以很好的解決我們業務中的搜索需求。而數據進行異構存儲後,隨之而來的就是數據同步的問題。

二、現有方法及問題

對於數據同步,我們目前的解決方案是建立數據中間表。把需要檢索的業務數據,統一放到一張MySQL 表中,這張中間表對應了業務需要的Elasticsearch 索引,每一列對應索引中的一個Mapping 字段。通過腳本以 Crontab 的方式,讀取 MySQL 中間表中 UTime 大於上一次讀取時間的所有數據,即該段時間內的增量,寫入Elasticsearch。

所以,一旦業務邏輯中有相應字段的數據變更,需要同時顧及 MySQL 中間表的變更;如果需要 Elasticsearch 中的數據即時性較高,還需要同時寫入 Elasticsearch。

隨著業務數據越來越多,MySQL 中間表的數據量越來越大。當需要在 Elasticsearch 的索引中新增 Mapping 字段時,相應的 MySQL 中間表也需要新增列,在數據量龐大的表中,擴展列的耗時是難以忍受的。

而且 Elasticsearch 索引中的 Mapping 字段隨著業務發展增多,需要由業務方增加相應的寫入 MySQL 中間表方法,這也帶來一部分開發成本。

三、方案設計

1、整體思路

目前現有的一些開源數據同步工具,如阿里的 DataX 等,主要是基於查詢來獲取數據源,這會存在如何確定增量(比如使用utime字段解決等)和輪詢頻率的問題,而我們一些業務場景對於數據同步的實時性要求比較高。

為了解決上述問題,我們提出了一種基於 MySQL Binlog 來進行 MySQL 數據同步到 Elasticsearch 的思路。Binlog 是 MySQL 通過 Replication 協議用來做主從數據同步的數據,所以它有我們需要寫入 Elasticsearch 的數據,並符合對數據同步時效性的要求。

使用 Binlog 數據同步 Elasticsearch,業務方就可以專注於業務邏輯對 MySQL 的操作,不用再關心數據向 Elasticsearch 同步的問題,減少了不必要的同步代碼,避免了擴展中間表列的長耗時問題。

經過調研後,我們採用開源項目 go-mysql-elasticsearch 實現數據同步,並針對馬蜂窩技術棧和實際的業務環境進行了一些定製化開發。

2、數據同步正確性保證

公司的所有表的 Binlog 數據屬於機密數據,不能直接獲取,為了滿足各業務線的使用需求,採用接入 Kafka 的形式提供給使用方,並且需要使用方申請相應的 Binlog 數據使用權限。獲取使用權限後,使用方以 Consumer Group 的形式讀取。

這種方式保證了 Binglog 數據的安全性,但是對保證數據同步的正確性帶來了挑戰。因此我們設計了一些機制,來保證數據源的獲取有序、完整。

1)順序性

通過 Kafka 獲取 Binlog 數據,首先需要保證獲取數據的順序性。嚴格說,Kafka 是無法保證全局消息有序的,只能局部有序,所以無法保證所有 Binlog 數據都可以有序到達 Consumer。

但是每個 Partition 上的數據是有序的。為了可以按順序拿到每一行 MySQL 記錄的 Binglog,我們把每條 Binlog 按照其 Primary Key,Hash 到各個 Partition 上,保證同一條 MySQL 記錄的所有 Binlog 數據都發送到同一個 Partition。

如果是多 Consumer 的情況,一個 Partition 只會分配給一個 Consumer,同樣可以保證 Partition 內的數據可以有序的 Update 到 Elasticsearch 中。

基於MySQL Binlog的Elasticsearch數據同步實踐

2)完整性

考慮到同步程序可能面臨各種正常或異常的退出,以及 Consumer 數量變化時的 Rebalance,我們需要保證在任何情況下不能丟失 Binlog 數據。

利用 Kafka 的 Offset 機制,在確認一條 Message 數據成功寫入 Elasticsearch 後,才 Commit 該條 Message 的 Offset,這樣就保證了數據的完整性。而對於數據同步的使用場景,在保證了數據順序性和完整性的情況下,重複消費是不會有影響的。

基於MySQL Binlog的Elasticsearch數據同步實踐

四、技術實現

基於MySQL Binlog的Elasticsearch數據同步實踐

黃色箭頭表示依賴,藍色箭頭表示數據流向

1、功能模塊

1)配置解析模塊

負責解析配置文件(toml 或 json 格式),或在配置中心(Skipper)配置的 json 字符串。包括 Kafka 集群配置、Elasticsearch 地址配置、日誌記錄方式配置、MySQL 庫表及字段與 Elasticsearch 的 Index 和 Mapping 對應關係配置等。

2)規則模塊

規則模塊決定了一條 Binlog 數據應該寫入到哪個 Elasticsearch 索引、文檔_id 對應的 MySQL 字段、Binlog 中的各個 MySQL 字段與索引 Mapping 的對應關係和寫入類型等。

在本地化過程中,根據我們的業務場景,增加了對 MySQL 表各字段的 where 條件判斷,來過濾掉不需要的 Binlog 數據。

3)Kafka 相關模塊

該模塊負責連接 Kafka 集群,獲取 Binlog 數據。

在本地化過程中,該模塊的大部分功能已經封裝成了一個通用的 Golang Kafka Consumer Client。包括 Dba Binlog 訂閱平臺要求的 SASL 認證,以及從指定時間點的 Offset 開始消費數據。

4)Binlog 數據解析模塊

原項目中的 Binlog 數據解析針對的是原始的 Binlog 數據,包含了解析 Replication 協議的實現。在我們的使用場景中,Binlog 數據已經是由 canal 解析成的 json 字符串,所以對該模塊的功能進行了簡化。

binlog json字符串示例:

"

作者介紹

張坤,馬蜂窩電商研發團隊度假業務高級研發工程師。

一、為什麼要做

隨著馬蜂窩的逐漸發展,我們的業務數據越來越多,單純使用 MySQL 已經不能滿足我們的數據查詢需求,例如對於商品、訂單等數據的多維度檢索。

使用 Elasticsearch 存儲業務數據可以很好的解決我們業務中的搜索需求。而數據進行異構存儲後,隨之而來的就是數據同步的問題。

二、現有方法及問題

對於數據同步,我們目前的解決方案是建立數據中間表。把需要檢索的業務數據,統一放到一張MySQL 表中,這張中間表對應了業務需要的Elasticsearch 索引,每一列對應索引中的一個Mapping 字段。通過腳本以 Crontab 的方式,讀取 MySQL 中間表中 UTime 大於上一次讀取時間的所有數據,即該段時間內的增量,寫入Elasticsearch。

所以,一旦業務邏輯中有相應字段的數據變更,需要同時顧及 MySQL 中間表的變更;如果需要 Elasticsearch 中的數據即時性較高,還需要同時寫入 Elasticsearch。

隨著業務數據越來越多,MySQL 中間表的數據量越來越大。當需要在 Elasticsearch 的索引中新增 Mapping 字段時,相應的 MySQL 中間表也需要新增列,在數據量龐大的表中,擴展列的耗時是難以忍受的。

而且 Elasticsearch 索引中的 Mapping 字段隨著業務發展增多,需要由業務方增加相應的寫入 MySQL 中間表方法,這也帶來一部分開發成本。

三、方案設計

1、整體思路

目前現有的一些開源數據同步工具,如阿里的 DataX 等,主要是基於查詢來獲取數據源,這會存在如何確定增量(比如使用utime字段解決等)和輪詢頻率的問題,而我們一些業務場景對於數據同步的實時性要求比較高。

為了解決上述問題,我們提出了一種基於 MySQL Binlog 來進行 MySQL 數據同步到 Elasticsearch 的思路。Binlog 是 MySQL 通過 Replication 協議用來做主從數據同步的數據,所以它有我們需要寫入 Elasticsearch 的數據,並符合對數據同步時效性的要求。

使用 Binlog 數據同步 Elasticsearch,業務方就可以專注於業務邏輯對 MySQL 的操作,不用再關心數據向 Elasticsearch 同步的問題,減少了不必要的同步代碼,避免了擴展中間表列的長耗時問題。

經過調研後,我們採用開源項目 go-mysql-elasticsearch 實現數據同步,並針對馬蜂窩技術棧和實際的業務環境進行了一些定製化開發。

2、數據同步正確性保證

公司的所有表的 Binlog 數據屬於機密數據,不能直接獲取,為了滿足各業務線的使用需求,採用接入 Kafka 的形式提供給使用方,並且需要使用方申請相應的 Binlog 數據使用權限。獲取使用權限後,使用方以 Consumer Group 的形式讀取。

這種方式保證了 Binglog 數據的安全性,但是對保證數據同步的正確性帶來了挑戰。因此我們設計了一些機制,來保證數據源的獲取有序、完整。

1)順序性

通過 Kafka 獲取 Binlog 數據,首先需要保證獲取數據的順序性。嚴格說,Kafka 是無法保證全局消息有序的,只能局部有序,所以無法保證所有 Binlog 數據都可以有序到達 Consumer。

但是每個 Partition 上的數據是有序的。為了可以按順序拿到每一行 MySQL 記錄的 Binglog,我們把每條 Binlog 按照其 Primary Key,Hash 到各個 Partition 上,保證同一條 MySQL 記錄的所有 Binlog 數據都發送到同一個 Partition。

如果是多 Consumer 的情況,一個 Partition 只會分配給一個 Consumer,同樣可以保證 Partition 內的數據可以有序的 Update 到 Elasticsearch 中。

基於MySQL Binlog的Elasticsearch數據同步實踐

2)完整性

考慮到同步程序可能面臨各種正常或異常的退出,以及 Consumer 數量變化時的 Rebalance,我們需要保證在任何情況下不能丟失 Binlog 數據。

利用 Kafka 的 Offset 機制,在確認一條 Message 數據成功寫入 Elasticsearch 後,才 Commit 該條 Message 的 Offset,這樣就保證了數據的完整性。而對於數據同步的使用場景,在保證了數據順序性和完整性的情況下,重複消費是不會有影響的。

基於MySQL Binlog的Elasticsearch數據同步實踐

四、技術實現

基於MySQL Binlog的Elasticsearch數據同步實踐

黃色箭頭表示依賴,藍色箭頭表示數據流向

1、功能模塊

1)配置解析模塊

負責解析配置文件(toml 或 json 格式),或在配置中心(Skipper)配置的 json 字符串。包括 Kafka 集群配置、Elasticsearch 地址配置、日誌記錄方式配置、MySQL 庫表及字段與 Elasticsearch 的 Index 和 Mapping 對應關係配置等。

2)規則模塊

規則模塊決定了一條 Binlog 數據應該寫入到哪個 Elasticsearch 索引、文檔_id 對應的 MySQL 字段、Binlog 中的各個 MySQL 字段與索引 Mapping 的對應關係和寫入類型等。

在本地化過程中,根據我們的業務場景,增加了對 MySQL 表各字段的 where 條件判斷,來過濾掉不需要的 Binlog 數據。

3)Kafka 相關模塊

該模塊負責連接 Kafka 集群,獲取 Binlog 數據。

在本地化過程中,該模塊的大部分功能已經封裝成了一個通用的 Golang Kafka Consumer Client。包括 Dba Binlog 訂閱平臺要求的 SASL 認證,以及從指定時間點的 Offset 開始消費數據。

4)Binlog 數據解析模塊

原項目中的 Binlog 數據解析針對的是原始的 Binlog 數據,包含了解析 Replication 協議的實現。在我們的使用場景中,Binlog 數據已經是由 canal 解析成的 json 字符串,所以對該模塊的功能進行了簡化。

binlog json字符串示例:

基於MySQL Binlog的Elasticsearch數據同步實踐

上面是一個簡化的 binlog json 字符串,通過該條 binlog 的 database 和 table 可以命中一條配置規則,根據該配置規則,把 Data 中的 key-value 構造成一個與對應 Elasticsearch 索引相匹配的 key-value map,同時包括一些數據類型的轉換:

"

作者介紹

張坤,馬蜂窩電商研發團隊度假業務高級研發工程師。

一、為什麼要做

隨著馬蜂窩的逐漸發展,我們的業務數據越來越多,單純使用 MySQL 已經不能滿足我們的數據查詢需求,例如對於商品、訂單等數據的多維度檢索。

使用 Elasticsearch 存儲業務數據可以很好的解決我們業務中的搜索需求。而數據進行異構存儲後,隨之而來的就是數據同步的問題。

二、現有方法及問題

對於數據同步,我們目前的解決方案是建立數據中間表。把需要檢索的業務數據,統一放到一張MySQL 表中,這張中間表對應了業務需要的Elasticsearch 索引,每一列對應索引中的一個Mapping 字段。通過腳本以 Crontab 的方式,讀取 MySQL 中間表中 UTime 大於上一次讀取時間的所有數據,即該段時間內的增量,寫入Elasticsearch。

所以,一旦業務邏輯中有相應字段的數據變更,需要同時顧及 MySQL 中間表的變更;如果需要 Elasticsearch 中的數據即時性較高,還需要同時寫入 Elasticsearch。

隨著業務數據越來越多,MySQL 中間表的數據量越來越大。當需要在 Elasticsearch 的索引中新增 Mapping 字段時,相應的 MySQL 中間表也需要新增列,在數據量龐大的表中,擴展列的耗時是難以忍受的。

而且 Elasticsearch 索引中的 Mapping 字段隨著業務發展增多,需要由業務方增加相應的寫入 MySQL 中間表方法,這也帶來一部分開發成本。

三、方案設計

1、整體思路

目前現有的一些開源數據同步工具,如阿里的 DataX 等,主要是基於查詢來獲取數據源,這會存在如何確定增量(比如使用utime字段解決等)和輪詢頻率的問題,而我們一些業務場景對於數據同步的實時性要求比較高。

為了解決上述問題,我們提出了一種基於 MySQL Binlog 來進行 MySQL 數據同步到 Elasticsearch 的思路。Binlog 是 MySQL 通過 Replication 協議用來做主從數據同步的數據,所以它有我們需要寫入 Elasticsearch 的數據,並符合對數據同步時效性的要求。

使用 Binlog 數據同步 Elasticsearch,業務方就可以專注於業務邏輯對 MySQL 的操作,不用再關心數據向 Elasticsearch 同步的問題,減少了不必要的同步代碼,避免了擴展中間表列的長耗時問題。

經過調研後,我們採用開源項目 go-mysql-elasticsearch 實現數據同步,並針對馬蜂窩技術棧和實際的業務環境進行了一些定製化開發。

2、數據同步正確性保證

公司的所有表的 Binlog 數據屬於機密數據,不能直接獲取,為了滿足各業務線的使用需求,採用接入 Kafka 的形式提供給使用方,並且需要使用方申請相應的 Binlog 數據使用權限。獲取使用權限後,使用方以 Consumer Group 的形式讀取。

這種方式保證了 Binglog 數據的安全性,但是對保證數據同步的正確性帶來了挑戰。因此我們設計了一些機制,來保證數據源的獲取有序、完整。

1)順序性

通過 Kafka 獲取 Binlog 數據,首先需要保證獲取數據的順序性。嚴格說,Kafka 是無法保證全局消息有序的,只能局部有序,所以無法保證所有 Binlog 數據都可以有序到達 Consumer。

但是每個 Partition 上的數據是有序的。為了可以按順序拿到每一行 MySQL 記錄的 Binglog,我們把每條 Binlog 按照其 Primary Key,Hash 到各個 Partition 上,保證同一條 MySQL 記錄的所有 Binlog 數據都發送到同一個 Partition。

如果是多 Consumer 的情況,一個 Partition 只會分配給一個 Consumer,同樣可以保證 Partition 內的數據可以有序的 Update 到 Elasticsearch 中。

基於MySQL Binlog的Elasticsearch數據同步實踐

2)完整性

考慮到同步程序可能面臨各種正常或異常的退出,以及 Consumer 數量變化時的 Rebalance,我們需要保證在任何情況下不能丟失 Binlog 數據。

利用 Kafka 的 Offset 機制,在確認一條 Message 數據成功寫入 Elasticsearch 後,才 Commit 該條 Message 的 Offset,這樣就保證了數據的完整性。而對於數據同步的使用場景,在保證了數據順序性和完整性的情況下,重複消費是不會有影響的。

基於MySQL Binlog的Elasticsearch數據同步實踐

四、技術實現

基於MySQL Binlog的Elasticsearch數據同步實踐

黃色箭頭表示依賴,藍色箭頭表示數據流向

1、功能模塊

1)配置解析模塊

負責解析配置文件(toml 或 json 格式),或在配置中心(Skipper)配置的 json 字符串。包括 Kafka 集群配置、Elasticsearch 地址配置、日誌記錄方式配置、MySQL 庫表及字段與 Elasticsearch 的 Index 和 Mapping 對應關係配置等。

2)規則模塊

規則模塊決定了一條 Binlog 數據應該寫入到哪個 Elasticsearch 索引、文檔_id 對應的 MySQL 字段、Binlog 中的各個 MySQL 字段與索引 Mapping 的對應關係和寫入類型等。

在本地化過程中,根據我們的業務場景,增加了對 MySQL 表各字段的 where 條件判斷,來過濾掉不需要的 Binlog 數據。

3)Kafka 相關模塊

該模塊負責連接 Kafka 集群,獲取 Binlog 數據。

在本地化過程中,該模塊的大部分功能已經封裝成了一個通用的 Golang Kafka Consumer Client。包括 Dba Binlog 訂閱平臺要求的 SASL 認證,以及從指定時間點的 Offset 開始消費數據。

4)Binlog 數據解析模塊

原項目中的 Binlog 數據解析針對的是原始的 Binlog 數據,包含了解析 Replication 協議的實現。在我們的使用場景中,Binlog 數據已經是由 canal 解析成的 json 字符串,所以對該模塊的功能進行了簡化。

binlog json字符串示例:

基於MySQL Binlog的Elasticsearch數據同步實踐

上面是一個簡化的 binlog json 字符串,通過該條 binlog 的 database 和 table 可以命中一條配置規則,根據該配置規則,把 Data 中的 key-value 構造成一個與對應 Elasticsearch 索引相匹配的 key-value map,同時包括一些數據類型的轉換:

基於MySQL Binlog的Elasticsearch數據同步實踐

5)Elasticsearch相關模塊

Binlog 數據解析模塊生成的 key-value map,由該模塊拼裝成請求_bulk 接口的 update payload,寫入 Elasticsearch。考慮到 MySQL 頻繁更新時對 Elasticsearch 的寫入壓力,key-value map 會暫存到一個 slice 中,每 200ms 或 slice 長度達到一定長度時(可以通過配置調整),才會調用 Elasticsearch 的_bulk 接口,寫入數據。

2、定製化開發

1)適應業務需求

① upsert

業務中使用的索引數據可能是來自多個不同的表,同一個文檔的數據來自不同表的時候,先到的數據是一條 index,後到的數據是一條 update,在我們無法控制先後順序時,需要實現 upsert 功能。在_bulk 參數中加入:

{

"doc_as_upsert" : true

}

② Filter

實際業務場景中,可能業務需要的數據只是某張表中的部分數據,比如用 type 字段標識該條數據來源,只需要把 type=1或2的數據同步到 Elasticsearch 中。我們擴展了規則配置,可以支持對 Binlog 指定字段的過濾需求,類似:

select * from sometable where type in (1,2)

2)快速增量

數據同步一般分為全量和增量。接入一個業務時,首先需要把業務現有的歷史 MySQL 數據導入到 Elasticsearch 中,這部分為全量同步。在全量同步過程中以及後續增加的數據為增量數據。

在全量數據同步完成後,如果從最舊開始消費 Kafka,隊列數據量很大的情況下,需要很長時間增量數據才能追上當前進度。為了更快的拿到所需的增量 Binlog,在 Consumer Group 消費 Kafka 之前,先獲取各個 Topic 的 Partition 在指定時間的 offset 值,並 commit 這些 offset,這樣在 Consumer Group 連接 Kafka 集群時,會從剛才提交的 offset 開始消費,可以立即拿到所需的增量 Binlog。

3)微服務和配置中心

項目使用馬蜂窩微服務部署,為新接入業務提供了快速上線支持,並且在業務 Binlog 數據突增時可以方便快速的擴容 Consumer。

馬蜂窩配置中心支持了各個接入業務的配置管理,相比於開源項目中的 toml 格式配置文件,使用配置中心可以更方便的管理不同業務不同環境的配置。

五、日誌與監控

馬蜂窩微服務的日誌 ELK 提供了記錄和查詢日誌的途徑,我們對於接入業務的每一條 Binlog 的數據流轉都記錄了日誌。如針對一個訂單,拿到的 Kafka Message 是什麼,調用_bulk 接口時的 Post Payload 是什麼,_bulk 接口的 Response有沒有錯誤信息等。

除了方便於排查問題,日誌也是監控的一部分。目前監控的指標有兩個,一個是數據同步延時,同步延時計算的是該條 Binlog 從產生到寫入 Elasticsearch 的時間差。

"

作者介紹

張坤,馬蜂窩電商研發團隊度假業務高級研發工程師。

一、為什麼要做

隨著馬蜂窩的逐漸發展,我們的業務數據越來越多,單純使用 MySQL 已經不能滿足我們的數據查詢需求,例如對於商品、訂單等數據的多維度檢索。

使用 Elasticsearch 存儲業務數據可以很好的解決我們業務中的搜索需求。而數據進行異構存儲後,隨之而來的就是數據同步的問題。

二、現有方法及問題

對於數據同步,我們目前的解決方案是建立數據中間表。把需要檢索的業務數據,統一放到一張MySQL 表中,這張中間表對應了業務需要的Elasticsearch 索引,每一列對應索引中的一個Mapping 字段。通過腳本以 Crontab 的方式,讀取 MySQL 中間表中 UTime 大於上一次讀取時間的所有數據,即該段時間內的增量,寫入Elasticsearch。

所以,一旦業務邏輯中有相應字段的數據變更,需要同時顧及 MySQL 中間表的變更;如果需要 Elasticsearch 中的數據即時性較高,還需要同時寫入 Elasticsearch。

隨著業務數據越來越多,MySQL 中間表的數據量越來越大。當需要在 Elasticsearch 的索引中新增 Mapping 字段時,相應的 MySQL 中間表也需要新增列,在數據量龐大的表中,擴展列的耗時是難以忍受的。

而且 Elasticsearch 索引中的 Mapping 字段隨著業務發展增多,需要由業務方增加相應的寫入 MySQL 中間表方法,這也帶來一部分開發成本。

三、方案設計

1、整體思路

目前現有的一些開源數據同步工具,如阿里的 DataX 等,主要是基於查詢來獲取數據源,這會存在如何確定增量(比如使用utime字段解決等)和輪詢頻率的問題,而我們一些業務場景對於數據同步的實時性要求比較高。

為了解決上述問題,我們提出了一種基於 MySQL Binlog 來進行 MySQL 數據同步到 Elasticsearch 的思路。Binlog 是 MySQL 通過 Replication 協議用來做主從數據同步的數據,所以它有我們需要寫入 Elasticsearch 的數據,並符合對數據同步時效性的要求。

使用 Binlog 數據同步 Elasticsearch,業務方就可以專注於業務邏輯對 MySQL 的操作,不用再關心數據向 Elasticsearch 同步的問題,減少了不必要的同步代碼,避免了擴展中間表列的長耗時問題。

經過調研後,我們採用開源項目 go-mysql-elasticsearch 實現數據同步,並針對馬蜂窩技術棧和實際的業務環境進行了一些定製化開發。

2、數據同步正確性保證

公司的所有表的 Binlog 數據屬於機密數據,不能直接獲取,為了滿足各業務線的使用需求,採用接入 Kafka 的形式提供給使用方,並且需要使用方申請相應的 Binlog 數據使用權限。獲取使用權限後,使用方以 Consumer Group 的形式讀取。

這種方式保證了 Binglog 數據的安全性,但是對保證數據同步的正確性帶來了挑戰。因此我們設計了一些機制,來保證數據源的獲取有序、完整。

1)順序性

通過 Kafka 獲取 Binlog 數據,首先需要保證獲取數據的順序性。嚴格說,Kafka 是無法保證全局消息有序的,只能局部有序,所以無法保證所有 Binlog 數據都可以有序到達 Consumer。

但是每個 Partition 上的數據是有序的。為了可以按順序拿到每一行 MySQL 記錄的 Binglog,我們把每條 Binlog 按照其 Primary Key,Hash 到各個 Partition 上,保證同一條 MySQL 記錄的所有 Binlog 數據都發送到同一個 Partition。

如果是多 Consumer 的情況,一個 Partition 只會分配給一個 Consumer,同樣可以保證 Partition 內的數據可以有序的 Update 到 Elasticsearch 中。

基於MySQL Binlog的Elasticsearch數據同步實踐

2)完整性

考慮到同步程序可能面臨各種正常或異常的退出,以及 Consumer 數量變化時的 Rebalance,我們需要保證在任何情況下不能丟失 Binlog 數據。

利用 Kafka 的 Offset 機制,在確認一條 Message 數據成功寫入 Elasticsearch 後,才 Commit 該條 Message 的 Offset,這樣就保證了數據的完整性。而對於數據同步的使用場景,在保證了數據順序性和完整性的情況下,重複消費是不會有影響的。

基於MySQL Binlog的Elasticsearch數據同步實踐

四、技術實現

基於MySQL Binlog的Elasticsearch數據同步實踐

黃色箭頭表示依賴,藍色箭頭表示數據流向

1、功能模塊

1)配置解析模塊

負責解析配置文件(toml 或 json 格式),或在配置中心(Skipper)配置的 json 字符串。包括 Kafka 集群配置、Elasticsearch 地址配置、日誌記錄方式配置、MySQL 庫表及字段與 Elasticsearch 的 Index 和 Mapping 對應關係配置等。

2)規則模塊

規則模塊決定了一條 Binlog 數據應該寫入到哪個 Elasticsearch 索引、文檔_id 對應的 MySQL 字段、Binlog 中的各個 MySQL 字段與索引 Mapping 的對應關係和寫入類型等。

在本地化過程中,根據我們的業務場景,增加了對 MySQL 表各字段的 where 條件判斷,來過濾掉不需要的 Binlog 數據。

3)Kafka 相關模塊

該模塊負責連接 Kafka 集群,獲取 Binlog 數據。

在本地化過程中,該模塊的大部分功能已經封裝成了一個通用的 Golang Kafka Consumer Client。包括 Dba Binlog 訂閱平臺要求的 SASL 認證,以及從指定時間點的 Offset 開始消費數據。

4)Binlog 數據解析模塊

原項目中的 Binlog 數據解析針對的是原始的 Binlog 數據,包含了解析 Replication 協議的實現。在我們的使用場景中,Binlog 數據已經是由 canal 解析成的 json 字符串,所以對該模塊的功能進行了簡化。

binlog json字符串示例:

基於MySQL Binlog的Elasticsearch數據同步實踐

上面是一個簡化的 binlog json 字符串,通過該條 binlog 的 database 和 table 可以命中一條配置規則,根據該配置規則,把 Data 中的 key-value 構造成一個與對應 Elasticsearch 索引相匹配的 key-value map,同時包括一些數據類型的轉換:

基於MySQL Binlog的Elasticsearch數據同步實踐

5)Elasticsearch相關模塊

Binlog 數據解析模塊生成的 key-value map,由該模塊拼裝成請求_bulk 接口的 update payload,寫入 Elasticsearch。考慮到 MySQL 頻繁更新時對 Elasticsearch 的寫入壓力,key-value map 會暫存到一個 slice 中,每 200ms 或 slice 長度達到一定長度時(可以通過配置調整),才會調用 Elasticsearch 的_bulk 接口,寫入數據。

2、定製化開發

1)適應業務需求

① upsert

業務中使用的索引數據可能是來自多個不同的表,同一個文檔的數據來自不同表的時候,先到的數據是一條 index,後到的數據是一條 update,在我們無法控制先後順序時,需要實現 upsert 功能。在_bulk 參數中加入:

{

"doc_as_upsert" : true

}

② Filter

實際業務場景中,可能業務需要的數據只是某張表中的部分數據,比如用 type 字段標識該條數據來源,只需要把 type=1或2的數據同步到 Elasticsearch 中。我們擴展了規則配置,可以支持對 Binlog 指定字段的過濾需求,類似:

select * from sometable where type in (1,2)

2)快速增量

數據同步一般分為全量和增量。接入一個業務時,首先需要把業務現有的歷史 MySQL 數據導入到 Elasticsearch 中,這部分為全量同步。在全量同步過程中以及後續增加的數據為增量數據。

在全量數據同步完成後,如果從最舊開始消費 Kafka,隊列數據量很大的情況下,需要很長時間增量數據才能追上當前進度。為了更快的拿到所需的增量 Binlog,在 Consumer Group 消費 Kafka 之前,先獲取各個 Topic 的 Partition 在指定時間的 offset 值,並 commit 這些 offset,這樣在 Consumer Group 連接 Kafka 集群時,會從剛才提交的 offset 開始消費,可以立即拿到所需的增量 Binlog。

3)微服務和配置中心

項目使用馬蜂窩微服務部署,為新接入業務提供了快速上線支持,並且在業務 Binlog 數據突增時可以方便快速的擴容 Consumer。

馬蜂窩配置中心支持了各個接入業務的配置管理,相比於開源項目中的 toml 格式配置文件,使用配置中心可以更方便的管理不同業務不同環境的配置。

五、日誌與監控

馬蜂窩微服務的日誌 ELK 提供了記錄和查詢日誌的途徑,我們對於接入業務的每一條 Binlog 的數據流轉都記錄了日誌。如針對一個訂單,拿到的 Kafka Message 是什麼,調用_bulk 接口時的 Post Payload 是什麼,_bulk 接口的 Response有沒有錯誤信息等。

除了方便於排查問題,日誌也是監控的一部分。目前監控的指標有兩個,一個是數據同步延時,同步延時計算的是該條 Binlog 從產生到寫入 Elasticsearch 的時間差。

基於MySQL Binlog的Elasticsearch數據同步實踐

從上圖中可以看出,訂單各個表的數據同步延時平均在 1s 左右。把延時數據接入 ElastAlert,在延時數據過多時發送報警通知。

另一個監控指標是心跳檢測,單獨建立一張獨立於業務的表,crontab 腳本每分鐘修改一次該表,同時檢查上一次修改是否同步到了指定的索引,如果沒有,則發送報警通知。該心跳檢測,監控了整個流程上的 Kafka、微服務和 ES,任何一個會導致數據不同步的環節出問題,都會第一個接到通知。

六、結語

目前接入的最重要業務方是電商的訂單索引,數據同步延時穩定在 1s 左右。這次的開源項目本地化實踐,希望能為一些有 Elasticsearch 數據同步需求的業務場景提供幫助。

作者丨張坤

來源丨馬蜂窩技術(ID:mfwtech)

dbaplus社群歡迎廣大技術人員投稿,投稿郵箱:[email protected]

>>>>

活動推薦

Gdevops全球敏捷運維峰會

"

作者介紹

張坤,馬蜂窩電商研發團隊度假業務高級研發工程師。

一、為什麼要做

隨著馬蜂窩的逐漸發展,我們的業務數據越來越多,單純使用 MySQL 已經不能滿足我們的數據查詢需求,例如對於商品、訂單等數據的多維度檢索。

使用 Elasticsearch 存儲業務數據可以很好的解決我們業務中的搜索需求。而數據進行異構存儲後,隨之而來的就是數據同步的問題。

二、現有方法及問題

對於數據同步,我們目前的解決方案是建立數據中間表。把需要檢索的業務數據,統一放到一張MySQL 表中,這張中間表對應了業務需要的Elasticsearch 索引,每一列對應索引中的一個Mapping 字段。通過腳本以 Crontab 的方式,讀取 MySQL 中間表中 UTime 大於上一次讀取時間的所有數據,即該段時間內的增量,寫入Elasticsearch。

所以,一旦業務邏輯中有相應字段的數據變更,需要同時顧及 MySQL 中間表的變更;如果需要 Elasticsearch 中的數據即時性較高,還需要同時寫入 Elasticsearch。

隨著業務數據越來越多,MySQL 中間表的數據量越來越大。當需要在 Elasticsearch 的索引中新增 Mapping 字段時,相應的 MySQL 中間表也需要新增列,在數據量龐大的表中,擴展列的耗時是難以忍受的。

而且 Elasticsearch 索引中的 Mapping 字段隨著業務發展增多,需要由業務方增加相應的寫入 MySQL 中間表方法,這也帶來一部分開發成本。

三、方案設計

1、整體思路

目前現有的一些開源數據同步工具,如阿里的 DataX 等,主要是基於查詢來獲取數據源,這會存在如何確定增量(比如使用utime字段解決等)和輪詢頻率的問題,而我們一些業務場景對於數據同步的實時性要求比較高。

為了解決上述問題,我們提出了一種基於 MySQL Binlog 來進行 MySQL 數據同步到 Elasticsearch 的思路。Binlog 是 MySQL 通過 Replication 協議用來做主從數據同步的數據,所以它有我們需要寫入 Elasticsearch 的數據,並符合對數據同步時效性的要求。

使用 Binlog 數據同步 Elasticsearch,業務方就可以專注於業務邏輯對 MySQL 的操作,不用再關心數據向 Elasticsearch 同步的問題,減少了不必要的同步代碼,避免了擴展中間表列的長耗時問題。

經過調研後,我們採用開源項目 go-mysql-elasticsearch 實現數據同步,並針對馬蜂窩技術棧和實際的業務環境進行了一些定製化開發。

2、數據同步正確性保證

公司的所有表的 Binlog 數據屬於機密數據,不能直接獲取,為了滿足各業務線的使用需求,採用接入 Kafka 的形式提供給使用方,並且需要使用方申請相應的 Binlog 數據使用權限。獲取使用權限後,使用方以 Consumer Group 的形式讀取。

這種方式保證了 Binglog 數據的安全性,但是對保證數據同步的正確性帶來了挑戰。因此我們設計了一些機制,來保證數據源的獲取有序、完整。

1)順序性

通過 Kafka 獲取 Binlog 數據,首先需要保證獲取數據的順序性。嚴格說,Kafka 是無法保證全局消息有序的,只能局部有序,所以無法保證所有 Binlog 數據都可以有序到達 Consumer。

但是每個 Partition 上的數據是有序的。為了可以按順序拿到每一行 MySQL 記錄的 Binglog,我們把每條 Binlog 按照其 Primary Key,Hash 到各個 Partition 上,保證同一條 MySQL 記錄的所有 Binlog 數據都發送到同一個 Partition。

如果是多 Consumer 的情況,一個 Partition 只會分配給一個 Consumer,同樣可以保證 Partition 內的數據可以有序的 Update 到 Elasticsearch 中。

基於MySQL Binlog的Elasticsearch數據同步實踐

2)完整性

考慮到同步程序可能面臨各種正常或異常的退出,以及 Consumer 數量變化時的 Rebalance,我們需要保證在任何情況下不能丟失 Binlog 數據。

利用 Kafka 的 Offset 機制,在確認一條 Message 數據成功寫入 Elasticsearch 後,才 Commit 該條 Message 的 Offset,這樣就保證了數據的完整性。而對於數據同步的使用場景,在保證了數據順序性和完整性的情況下,重複消費是不會有影響的。

基於MySQL Binlog的Elasticsearch數據同步實踐

四、技術實現

基於MySQL Binlog的Elasticsearch數據同步實踐

黃色箭頭表示依賴,藍色箭頭表示數據流向

1、功能模塊

1)配置解析模塊

負責解析配置文件(toml 或 json 格式),或在配置中心(Skipper)配置的 json 字符串。包括 Kafka 集群配置、Elasticsearch 地址配置、日誌記錄方式配置、MySQL 庫表及字段與 Elasticsearch 的 Index 和 Mapping 對應關係配置等。

2)規則模塊

規則模塊決定了一條 Binlog 數據應該寫入到哪個 Elasticsearch 索引、文檔_id 對應的 MySQL 字段、Binlog 中的各個 MySQL 字段與索引 Mapping 的對應關係和寫入類型等。

在本地化過程中,根據我們的業務場景,增加了對 MySQL 表各字段的 where 條件判斷,來過濾掉不需要的 Binlog 數據。

3)Kafka 相關模塊

該模塊負責連接 Kafka 集群,獲取 Binlog 數據。

在本地化過程中,該模塊的大部分功能已經封裝成了一個通用的 Golang Kafka Consumer Client。包括 Dba Binlog 訂閱平臺要求的 SASL 認證,以及從指定時間點的 Offset 開始消費數據。

4)Binlog 數據解析模塊

原項目中的 Binlog 數據解析針對的是原始的 Binlog 數據,包含了解析 Replication 協議的實現。在我們的使用場景中,Binlog 數據已經是由 canal 解析成的 json 字符串,所以對該模塊的功能進行了簡化。

binlog json字符串示例:

基於MySQL Binlog的Elasticsearch數據同步實踐

上面是一個簡化的 binlog json 字符串,通過該條 binlog 的 database 和 table 可以命中一條配置規則,根據該配置規則,把 Data 中的 key-value 構造成一個與對應 Elasticsearch 索引相匹配的 key-value map,同時包括一些數據類型的轉換:

基於MySQL Binlog的Elasticsearch數據同步實踐

5)Elasticsearch相關模塊

Binlog 數據解析模塊生成的 key-value map,由該模塊拼裝成請求_bulk 接口的 update payload,寫入 Elasticsearch。考慮到 MySQL 頻繁更新時對 Elasticsearch 的寫入壓力,key-value map 會暫存到一個 slice 中,每 200ms 或 slice 長度達到一定長度時(可以通過配置調整),才會調用 Elasticsearch 的_bulk 接口,寫入數據。

2、定製化開發

1)適應業務需求

① upsert

業務中使用的索引數據可能是來自多個不同的表,同一個文檔的數據來自不同表的時候,先到的數據是一條 index,後到的數據是一條 update,在我們無法控制先後順序時,需要實現 upsert 功能。在_bulk 參數中加入:

{

"doc_as_upsert" : true

}

② Filter

實際業務場景中,可能業務需要的數據只是某張表中的部分數據,比如用 type 字段標識該條數據來源,只需要把 type=1或2的數據同步到 Elasticsearch 中。我們擴展了規則配置,可以支持對 Binlog 指定字段的過濾需求,類似:

select * from sometable where type in (1,2)

2)快速增量

數據同步一般分為全量和增量。接入一個業務時,首先需要把業務現有的歷史 MySQL 數據導入到 Elasticsearch 中,這部分為全量同步。在全量同步過程中以及後續增加的數據為增量數據。

在全量數據同步完成後,如果從最舊開始消費 Kafka,隊列數據量很大的情況下,需要很長時間增量數據才能追上當前進度。為了更快的拿到所需的增量 Binlog,在 Consumer Group 消費 Kafka 之前,先獲取各個 Topic 的 Partition 在指定時間的 offset 值,並 commit 這些 offset,這樣在 Consumer Group 連接 Kafka 集群時,會從剛才提交的 offset 開始消費,可以立即拿到所需的增量 Binlog。

3)微服務和配置中心

項目使用馬蜂窩微服務部署,為新接入業務提供了快速上線支持,並且在業務 Binlog 數據突增時可以方便快速的擴容 Consumer。

馬蜂窩配置中心支持了各個接入業務的配置管理,相比於開源項目中的 toml 格式配置文件,使用配置中心可以更方便的管理不同業務不同環境的配置。

五、日誌與監控

馬蜂窩微服務的日誌 ELK 提供了記錄和查詢日誌的途徑,我們對於接入業務的每一條 Binlog 的數據流轉都記錄了日誌。如針對一個訂單,拿到的 Kafka Message 是什麼,調用_bulk 接口時的 Post Payload 是什麼,_bulk 接口的 Response有沒有錯誤信息等。

除了方便於排查問題,日誌也是監控的一部分。目前監控的指標有兩個,一個是數據同步延時,同步延時計算的是該條 Binlog 從產生到寫入 Elasticsearch 的時間差。

基於MySQL Binlog的Elasticsearch數據同步實踐

從上圖中可以看出,訂單各個表的數據同步延時平均在 1s 左右。把延時數據接入 ElastAlert,在延時數據過多時發送報警通知。

另一個監控指標是心跳檢測,單獨建立一張獨立於業務的表,crontab 腳本每分鐘修改一次該表,同時檢查上一次修改是否同步到了指定的索引,如果沒有,則發送報警通知。該心跳檢測,監控了整個流程上的 Kafka、微服務和 ES,任何一個會導致數據不同步的環節出問題,都會第一個接到通知。

六、結語

目前接入的最重要業務方是電商的訂單索引,數據同步延時穩定在 1s 左右。這次的開源項目本地化實踐,希望能為一些有 Elasticsearch 數據同步需求的業務場景提供幫助。

作者丨張坤

來源丨馬蜂窩技術(ID:mfwtech)

dbaplus社群歡迎廣大技術人員投稿,投稿郵箱:[email protected]

>>>>

活動推薦

Gdevops全球敏捷運維峰會

基於MySQL Binlog的Elasticsearch數據同步實踐

另外,dbaplus社群即將在9月21日舉辦Fintech上海沙龍,邀請到多位深耕金融科技的技術專家,一起從不同視角探討金融級數據庫與運維實踐,同樣不容錯過。

"

作者介紹

張坤,馬蜂窩電商研發團隊度假業務高級研發工程師。

一、為什麼要做

隨著馬蜂窩的逐漸發展,我們的業務數據越來越多,單純使用 MySQL 已經不能滿足我們的數據查詢需求,例如對於商品、訂單等數據的多維度檢索。

使用 Elasticsearch 存儲業務數據可以很好的解決我們業務中的搜索需求。而數據進行異構存儲後,隨之而來的就是數據同步的問題。

二、現有方法及問題

對於數據同步,我們目前的解決方案是建立數據中間表。把需要檢索的業務數據,統一放到一張MySQL 表中,這張中間表對應了業務需要的Elasticsearch 索引,每一列對應索引中的一個Mapping 字段。通過腳本以 Crontab 的方式,讀取 MySQL 中間表中 UTime 大於上一次讀取時間的所有數據,即該段時間內的增量,寫入Elasticsearch。

所以,一旦業務邏輯中有相應字段的數據變更,需要同時顧及 MySQL 中間表的變更;如果需要 Elasticsearch 中的數據即時性較高,還需要同時寫入 Elasticsearch。

隨著業務數據越來越多,MySQL 中間表的數據量越來越大。當需要在 Elasticsearch 的索引中新增 Mapping 字段時,相應的 MySQL 中間表也需要新增列,在數據量龐大的表中,擴展列的耗時是難以忍受的。

而且 Elasticsearch 索引中的 Mapping 字段隨著業務發展增多,需要由業務方增加相應的寫入 MySQL 中間表方法,這也帶來一部分開發成本。

三、方案設計

1、整體思路

目前現有的一些開源數據同步工具,如阿里的 DataX 等,主要是基於查詢來獲取數據源,這會存在如何確定增量(比如使用utime字段解決等)和輪詢頻率的問題,而我們一些業務場景對於數據同步的實時性要求比較高。

為了解決上述問題,我們提出了一種基於 MySQL Binlog 來進行 MySQL 數據同步到 Elasticsearch 的思路。Binlog 是 MySQL 通過 Replication 協議用來做主從數據同步的數據,所以它有我們需要寫入 Elasticsearch 的數據,並符合對數據同步時效性的要求。

使用 Binlog 數據同步 Elasticsearch,業務方就可以專注於業務邏輯對 MySQL 的操作,不用再關心數據向 Elasticsearch 同步的問題,減少了不必要的同步代碼,避免了擴展中間表列的長耗時問題。

經過調研後,我們採用開源項目 go-mysql-elasticsearch 實現數據同步,並針對馬蜂窩技術棧和實際的業務環境進行了一些定製化開發。

2、數據同步正確性保證

公司的所有表的 Binlog 數據屬於機密數據,不能直接獲取,為了滿足各業務線的使用需求,採用接入 Kafka 的形式提供給使用方,並且需要使用方申請相應的 Binlog 數據使用權限。獲取使用權限後,使用方以 Consumer Group 的形式讀取。

這種方式保證了 Binglog 數據的安全性,但是對保證數據同步的正確性帶來了挑戰。因此我們設計了一些機制,來保證數據源的獲取有序、完整。

1)順序性

通過 Kafka 獲取 Binlog 數據,首先需要保證獲取數據的順序性。嚴格說,Kafka 是無法保證全局消息有序的,只能局部有序,所以無法保證所有 Binlog 數據都可以有序到達 Consumer。

但是每個 Partition 上的數據是有序的。為了可以按順序拿到每一行 MySQL 記錄的 Binglog,我們把每條 Binlog 按照其 Primary Key,Hash 到各個 Partition 上,保證同一條 MySQL 記錄的所有 Binlog 數據都發送到同一個 Partition。

如果是多 Consumer 的情況,一個 Partition 只會分配給一個 Consumer,同樣可以保證 Partition 內的數據可以有序的 Update 到 Elasticsearch 中。

基於MySQL Binlog的Elasticsearch數據同步實踐

2)完整性

考慮到同步程序可能面臨各種正常或異常的退出,以及 Consumer 數量變化時的 Rebalance,我們需要保證在任何情況下不能丟失 Binlog 數據。

利用 Kafka 的 Offset 機制,在確認一條 Message 數據成功寫入 Elasticsearch 後,才 Commit 該條 Message 的 Offset,這樣就保證了數據的完整性。而對於數據同步的使用場景,在保證了數據順序性和完整性的情況下,重複消費是不會有影響的。

基於MySQL Binlog的Elasticsearch數據同步實踐

四、技術實現

基於MySQL Binlog的Elasticsearch數據同步實踐

黃色箭頭表示依賴,藍色箭頭表示數據流向

1、功能模塊

1)配置解析模塊

負責解析配置文件(toml 或 json 格式),或在配置中心(Skipper)配置的 json 字符串。包括 Kafka 集群配置、Elasticsearch 地址配置、日誌記錄方式配置、MySQL 庫表及字段與 Elasticsearch 的 Index 和 Mapping 對應關係配置等。

2)規則模塊

規則模塊決定了一條 Binlog 數據應該寫入到哪個 Elasticsearch 索引、文檔_id 對應的 MySQL 字段、Binlog 中的各個 MySQL 字段與索引 Mapping 的對應關係和寫入類型等。

在本地化過程中,根據我們的業務場景,增加了對 MySQL 表各字段的 where 條件判斷,來過濾掉不需要的 Binlog 數據。

3)Kafka 相關模塊

該模塊負責連接 Kafka 集群,獲取 Binlog 數據。

在本地化過程中,該模塊的大部分功能已經封裝成了一個通用的 Golang Kafka Consumer Client。包括 Dba Binlog 訂閱平臺要求的 SASL 認證,以及從指定時間點的 Offset 開始消費數據。

4)Binlog 數據解析模塊

原項目中的 Binlog 數據解析針對的是原始的 Binlog 數據,包含了解析 Replication 協議的實現。在我們的使用場景中,Binlog 數據已經是由 canal 解析成的 json 字符串,所以對該模塊的功能進行了簡化。

binlog json字符串示例:

基於MySQL Binlog的Elasticsearch數據同步實踐

上面是一個簡化的 binlog json 字符串,通過該條 binlog 的 database 和 table 可以命中一條配置規則,根據該配置規則,把 Data 中的 key-value 構造成一個與對應 Elasticsearch 索引相匹配的 key-value map,同時包括一些數據類型的轉換:

基於MySQL Binlog的Elasticsearch數據同步實踐

5)Elasticsearch相關模塊

Binlog 數據解析模塊生成的 key-value map,由該模塊拼裝成請求_bulk 接口的 update payload,寫入 Elasticsearch。考慮到 MySQL 頻繁更新時對 Elasticsearch 的寫入壓力,key-value map 會暫存到一個 slice 中,每 200ms 或 slice 長度達到一定長度時(可以通過配置調整),才會調用 Elasticsearch 的_bulk 接口,寫入數據。

2、定製化開發

1)適應業務需求

① upsert

業務中使用的索引數據可能是來自多個不同的表,同一個文檔的數據來自不同表的時候,先到的數據是一條 index,後到的數據是一條 update,在我們無法控制先後順序時,需要實現 upsert 功能。在_bulk 參數中加入:

{

"doc_as_upsert" : true

}

② Filter

實際業務場景中,可能業務需要的數據只是某張表中的部分數據,比如用 type 字段標識該條數據來源,只需要把 type=1或2的數據同步到 Elasticsearch 中。我們擴展了規則配置,可以支持對 Binlog 指定字段的過濾需求,類似:

select * from sometable where type in (1,2)

2)快速增量

數據同步一般分為全量和增量。接入一個業務時,首先需要把業務現有的歷史 MySQL 數據導入到 Elasticsearch 中,這部分為全量同步。在全量同步過程中以及後續增加的數據為增量數據。

在全量數據同步完成後,如果從最舊開始消費 Kafka,隊列數據量很大的情況下,需要很長時間增量數據才能追上當前進度。為了更快的拿到所需的增量 Binlog,在 Consumer Group 消費 Kafka 之前,先獲取各個 Topic 的 Partition 在指定時間的 offset 值,並 commit 這些 offset,這樣在 Consumer Group 連接 Kafka 集群時,會從剛才提交的 offset 開始消費,可以立即拿到所需的增量 Binlog。

3)微服務和配置中心

項目使用馬蜂窩微服務部署,為新接入業務提供了快速上線支持,並且在業務 Binlog 數據突增時可以方便快速的擴容 Consumer。

馬蜂窩配置中心支持了各個接入業務的配置管理,相比於開源項目中的 toml 格式配置文件,使用配置中心可以更方便的管理不同業務不同環境的配置。

五、日誌與監控

馬蜂窩微服務的日誌 ELK 提供了記錄和查詢日誌的途徑,我們對於接入業務的每一條 Binlog 的數據流轉都記錄了日誌。如針對一個訂單,拿到的 Kafka Message 是什麼,調用_bulk 接口時的 Post Payload 是什麼,_bulk 接口的 Response有沒有錯誤信息等。

除了方便於排查問題,日誌也是監控的一部分。目前監控的指標有兩個,一個是數據同步延時,同步延時計算的是該條 Binlog 從產生到寫入 Elasticsearch 的時間差。

基於MySQL Binlog的Elasticsearch數據同步實踐

從上圖中可以看出,訂單各個表的數據同步延時平均在 1s 左右。把延時數據接入 ElastAlert,在延時數據過多時發送報警通知。

另一個監控指標是心跳檢測,單獨建立一張獨立於業務的表,crontab 腳本每分鐘修改一次該表,同時檢查上一次修改是否同步到了指定的索引,如果沒有,則發送報警通知。該心跳檢測,監控了整個流程上的 Kafka、微服務和 ES,任何一個會導致數據不同步的環節出問題,都會第一個接到通知。

六、結語

目前接入的最重要業務方是電商的訂單索引,數據同步延時穩定在 1s 左右。這次的開源項目本地化實踐,希望能為一些有 Elasticsearch 數據同步需求的業務場景提供幫助。

作者丨張坤

來源丨馬蜂窩技術(ID:mfwtech)

dbaplus社群歡迎廣大技術人員投稿,投稿郵箱:[email protected]

>>>>

活動推薦

Gdevops全球敏捷運維峰會

基於MySQL Binlog的Elasticsearch數據同步實踐

另外,dbaplus社群即將在9月21日舉辦Fintech上海沙龍,邀請到多位深耕金融科技的技術專家,一起從不同視角探討金融級數據庫與運維實踐,同樣不容錯過。

基於MySQL Binlog的Elasticsearch數據同步實踐"

相關推薦

推薦中...