'Flink 1.9 實戰:使用 SQL 讀取 Kafka 並寫入 MySQL'

SQL MySQL 腳本語言 JSON JAVA柯尼塞克丶 2019-09-13
"

SqlSubmit 的實現

筆者一開始是想用 SQL Client 來貫穿整個演示環節,但可惜 1.9 版本 SQL CLI 還不支持處理 CREATE TABLE 語句。所以筆者就只好自己寫了個簡單的提交腳本。後來想想,也挺好的,可以讓聽眾同時瞭解如何通過 SQL 的方式,和編程的方式使用 Flink SQL。

SqlSubmit 的主要任務是執行和提交一個 SQL 文件,實現非常簡單,就是通過正則表達式匹配每個語句塊。如果是 CREATE TABLE 或 INSERT INTO 開頭,則會調用 tEnv.sqlUpdate(...) 。如果是 SET 開頭,則會將配置設置到 TableConfig 上。其核心代碼主要如下所示:

EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
// 創建一個使用 Blink Planner 的 TableEnvironment, 並工作在流模式
TableEnvironment tEnv = TableEnvironment.create(settings);
// 讀取 SQL 文件
List<String> sql = Files.readAllLines(path);
// 通過正則表達式匹配前綴,來區分不同的 SQL 語句
List<SqlCommandCall> calls = SqlCommandParser.parse(sql);
// 根據不同的 SQL 語句,調用 TableEnvironment 執行
for (SqlCommandCall call : calls) {
switch (call.command) {
case SET:
String key = call.operands[0];
String value = call.operands[1];
// 設置參數
tEnv.getConfig().getConfiguration().setString(key, value);
break;
case CREATE_TABLE:
String ddl = call.operands[0];
tEnv.sqlUpdate(ddl);
break;
case INSERT_INTO:
String dml = call.operands[0];
tEnv.sqlUpdate(dml);
break;
default:
throw new RuntimeException("Unsupported command: " + call.command);
}
}
// 提交作業
tEnv.execute("SQL Job");

使用 DDL 連接 Kafka 源表

在 flink-sql-submit 項目中,我們準備了一份測試數據集(來自 阿里雲天池公開數據集 ,特別鳴謝),位於 src/main/resources/user_behavior.log 。數據以 JSON 格式編碼,大概長這個樣子:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

為了模擬真實的 Kafka 數據源,筆者還特地寫了一個 source-generator.sh 腳本(感興趣的可以看下源碼),會自動讀取 user_behavior.log 的數據並以默認每毫秒1條的速率灌到 Kafka 的 user_behavior topic 中。

有了數據源後,我們就可以用 DDL 去創建並連接這個 Kafka 中的 topic(詳見 src/main/resources/q1.sql )。

CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP
) WITH (
'connector.type' = 'kafka', -- 使用 kafka connector
'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本
'connector.topic' = 'user_behavior', -- kafka topic
'connector.startup-mode' = 'earliest-offset', -- 從起始 offset 開始讀取
'connector.properties.0.key' = 'zookeeper.connect', -- 連接信息
'connector.properties.0.value' = 'localhost:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'update-mode' = 'append',
'format.type' = 'json', -- 數據源格式為 json
'format.derive-schema' = 'true' -- 從 DDL schema 確定 json 解析規則
)

注:可能有用戶會覺得其中的 connector.properties.0.key 等參數比較奇怪,社區計劃將在下一個版本中改進並簡化 connector 的參數配置。

使用 DDL 連接 MySQL 結果表

連接 MySQL 可以使用 Flink 提供的 JDBC connector。例如

CREATE TABLE pvuv_sink (
dt VARCHAR,
pv BIGINT,
uv BIGINT
) WITH (
'connector.type' = 'jdbc', -- 使用 jdbc connector
'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url
'connector.table' = 'pvuv_sink', -- 表名
'connector.username' = 'root', -- 用戶名
'connector.password' = '123456', -- 密碼
'connector.write.flush.max-rows' = '1' -- 默認5000條,為了演示改為1條
)

PV UV 計算

假設我們的需求是計算每小時全網的用戶訪問量,和獨立用戶數。很多用戶可能會想到使用滾動窗口來計算。但這裡我們介紹另一種方式。即 Group Aggregation 的方式。

INSERT INTO pvuv_sink
SELECT
DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
COUNT(*) AS pv,
COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')

它使用 DATE_FORMAT 這個內置函數,將日誌時間歸一化成“年月日小時”的字符串格式,並根據這個字符串進行分組,即根據每小時分組,然後通過 COUNT(*) 計算用戶訪問量(PV),通過 COUNT(DISTINCT user_id) 計算獨立用戶數(UV)。這種方式的執行模式是每收到一條數據,便會進行基於之前計算的值做增量計算(如+1),然後將最新結果輸出。所以實時性很高,但輸出量也大。

我們將這個查詢的結果,通過 INSERT INTO 語句,寫到了之前定義的 pvuv_sink MySQL 表中。

注:在深圳 Meetup 中,我們有對這種查詢的性能調優做了深度的介紹。

實戰演示

環境準備

本實戰演示環節需要安裝一些必須的服務,包括:

  • Flink 本地集群:用來運行 Flink SQL 任務。
  • Kafka 本地集群:用來作為數據源。
  • MySQL 數據庫:用來作為結果表。

Flink 本地集群安裝

  1. 下載 Flink 1.9.0 安裝包並解壓(解壓目錄 flink-1.9.0 ): https://www.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz
  2. 下載以下依賴 jar 包,並拷貝到 flink-1.9.0/lib/ 目錄下。因為我們運行時需要依賴各個 connector 實現。
  • flink-sql-connector-kafka_2.11-1.9.0.jar
  • flink-json-1.9.0-sql-jar.jar
  • flink-jdbc_2.11-1.9.0.jar
  • mysql-connector-java-5.1.48.jar
  1. 將 flink-1.9.0/conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改成 10,因為我們的演示任務可能會消耗多於1個的 slot。
  2. 執行 flink-1.9.0/bin/start-cluster.sh ,啟動集群。

運行成功的話,可以在 http://localhost:8081 訪問到 Flink Web UI。

"

SqlSubmit 的實現

筆者一開始是想用 SQL Client 來貫穿整個演示環節,但可惜 1.9 版本 SQL CLI 還不支持處理 CREATE TABLE 語句。所以筆者就只好自己寫了個簡單的提交腳本。後來想想,也挺好的,可以讓聽眾同時瞭解如何通過 SQL 的方式,和編程的方式使用 Flink SQL。

SqlSubmit 的主要任務是執行和提交一個 SQL 文件,實現非常簡單,就是通過正則表達式匹配每個語句塊。如果是 CREATE TABLE 或 INSERT INTO 開頭,則會調用 tEnv.sqlUpdate(...) 。如果是 SET 開頭,則會將配置設置到 TableConfig 上。其核心代碼主要如下所示:

EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
// 創建一個使用 Blink Planner 的 TableEnvironment, 並工作在流模式
TableEnvironment tEnv = TableEnvironment.create(settings);
// 讀取 SQL 文件
List<String> sql = Files.readAllLines(path);
// 通過正則表達式匹配前綴,來區分不同的 SQL 語句
List<SqlCommandCall> calls = SqlCommandParser.parse(sql);
// 根據不同的 SQL 語句,調用 TableEnvironment 執行
for (SqlCommandCall call : calls) {
switch (call.command) {
case SET:
String key = call.operands[0];
String value = call.operands[1];
// 設置參數
tEnv.getConfig().getConfiguration().setString(key, value);
break;
case CREATE_TABLE:
String ddl = call.operands[0];
tEnv.sqlUpdate(ddl);
break;
case INSERT_INTO:
String dml = call.operands[0];
tEnv.sqlUpdate(dml);
break;
default:
throw new RuntimeException("Unsupported command: " + call.command);
}
}
// 提交作業
tEnv.execute("SQL Job");

使用 DDL 連接 Kafka 源表

在 flink-sql-submit 項目中,我們準備了一份測試數據集(來自 阿里雲天池公開數據集 ,特別鳴謝),位於 src/main/resources/user_behavior.log 。數據以 JSON 格式編碼,大概長這個樣子:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

為了模擬真實的 Kafka 數據源,筆者還特地寫了一個 source-generator.sh 腳本(感興趣的可以看下源碼),會自動讀取 user_behavior.log 的數據並以默認每毫秒1條的速率灌到 Kafka 的 user_behavior topic 中。

有了數據源後,我們就可以用 DDL 去創建並連接這個 Kafka 中的 topic(詳見 src/main/resources/q1.sql )。

CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP
) WITH (
'connector.type' = 'kafka', -- 使用 kafka connector
'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本
'connector.topic' = 'user_behavior', -- kafka topic
'connector.startup-mode' = 'earliest-offset', -- 從起始 offset 開始讀取
'connector.properties.0.key' = 'zookeeper.connect', -- 連接信息
'connector.properties.0.value' = 'localhost:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'update-mode' = 'append',
'format.type' = 'json', -- 數據源格式為 json
'format.derive-schema' = 'true' -- 從 DDL schema 確定 json 解析規則
)

注:可能有用戶會覺得其中的 connector.properties.0.key 等參數比較奇怪,社區計劃將在下一個版本中改進並簡化 connector 的參數配置。

使用 DDL 連接 MySQL 結果表

連接 MySQL 可以使用 Flink 提供的 JDBC connector。例如

CREATE TABLE pvuv_sink (
dt VARCHAR,
pv BIGINT,
uv BIGINT
) WITH (
'connector.type' = 'jdbc', -- 使用 jdbc connector
'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url
'connector.table' = 'pvuv_sink', -- 表名
'connector.username' = 'root', -- 用戶名
'connector.password' = '123456', -- 密碼
'connector.write.flush.max-rows' = '1' -- 默認5000條,為了演示改為1條
)

PV UV 計算

假設我們的需求是計算每小時全網的用戶訪問量,和獨立用戶數。很多用戶可能會想到使用滾動窗口來計算。但這裡我們介紹另一種方式。即 Group Aggregation 的方式。

INSERT INTO pvuv_sink
SELECT
DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
COUNT(*) AS pv,
COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')

它使用 DATE_FORMAT 這個內置函數,將日誌時間歸一化成“年月日小時”的字符串格式,並根據這個字符串進行分組,即根據每小時分組,然後通過 COUNT(*) 計算用戶訪問量(PV),通過 COUNT(DISTINCT user_id) 計算獨立用戶數(UV)。這種方式的執行模式是每收到一條數據,便會進行基於之前計算的值做增量計算(如+1),然後將最新結果輸出。所以實時性很高,但輸出量也大。

我們將這個查詢的結果,通過 INSERT INTO 語句,寫到了之前定義的 pvuv_sink MySQL 表中。

注:在深圳 Meetup 中,我們有對這種查詢的性能調優做了深度的介紹。

實戰演示

環境準備

本實戰演示環節需要安裝一些必須的服務,包括:

  • Flink 本地集群:用來運行 Flink SQL 任務。
  • Kafka 本地集群:用來作為數據源。
  • MySQL 數據庫:用來作為結果表。

Flink 本地集群安裝

  1. 下載 Flink 1.9.0 安裝包並解壓(解壓目錄 flink-1.9.0 ): https://www.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz
  2. 下載以下依賴 jar 包,並拷貝到 flink-1.9.0/lib/ 目錄下。因為我們運行時需要依賴各個 connector 實現。
  • flink-sql-connector-kafka_2.11-1.9.0.jar
  • flink-json-1.9.0-sql-jar.jar
  • flink-jdbc_2.11-1.9.0.jar
  • mysql-connector-java-5.1.48.jar
  1. 將 flink-1.9.0/conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改成 10,因為我們的演示任務可能會消耗多於1個的 slot。
  2. 執行 flink-1.9.0/bin/start-cluster.sh ,啟動集群。

運行成功的話,可以在 http://localhost:8081 訪問到 Flink Web UI。

Flink 1.9 實戰:使用 SQL 讀取 Kafka 並寫入 MySQL

另外,還需要將 Flink 的安裝路徑填到 flink-sql-submit 項目的 env.sh 中,用於後面提交 SQL 任務,如我的路徑是

FLINK_DIR=/Users/wuchong/dev/install/flink-1.9.0

Kafka 本地集群安裝

  1. 下載 Kafka 2.2.0 安裝包並解壓: https://www.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz
  2. 將安裝路徑填到 flink-sql-submit 項目的 env.sh 中,如我的路徑是
KAFKA_DIR=/Users/wuchong/dev/install/kafka_2.11-2.2.0
  1. 在 flink-sql-submit 目錄下運行 ./start-kafka.sh 啟動 Kafka 集群。
  2. 在命令行執行 jps ,如果看到 Kafka 進程和 QuorumPeerMain 進程即表明啟動成功。

MySQL 安裝

  • 可以在 官方頁面 下載 MySQL 並安裝。
  • 如果有 Docker 環境的話,也可以直接通過 Docker 安裝
$ docker pull mysql
$ docker run --name mysqldb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql

在 MySQL 中創建一個 flink-test 的數據庫,並按照上文的 schema 創建 pvuv_sink 表。

提交 SQL 任務

  1. 在 flink-sql-submit 目錄下運行 ./source-generator.sh ,會自動創建 user_behavior topic,並實時往裡灌入數據。
"

SqlSubmit 的實現

筆者一開始是想用 SQL Client 來貫穿整個演示環節,但可惜 1.9 版本 SQL CLI 還不支持處理 CREATE TABLE 語句。所以筆者就只好自己寫了個簡單的提交腳本。後來想想,也挺好的,可以讓聽眾同時瞭解如何通過 SQL 的方式,和編程的方式使用 Flink SQL。

SqlSubmit 的主要任務是執行和提交一個 SQL 文件,實現非常簡單,就是通過正則表達式匹配每個語句塊。如果是 CREATE TABLE 或 INSERT INTO 開頭,則會調用 tEnv.sqlUpdate(...) 。如果是 SET 開頭,則會將配置設置到 TableConfig 上。其核心代碼主要如下所示:

EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
// 創建一個使用 Blink Planner 的 TableEnvironment, 並工作在流模式
TableEnvironment tEnv = TableEnvironment.create(settings);
// 讀取 SQL 文件
List<String> sql = Files.readAllLines(path);
// 通過正則表達式匹配前綴,來區分不同的 SQL 語句
List<SqlCommandCall> calls = SqlCommandParser.parse(sql);
// 根據不同的 SQL 語句,調用 TableEnvironment 執行
for (SqlCommandCall call : calls) {
switch (call.command) {
case SET:
String key = call.operands[0];
String value = call.operands[1];
// 設置參數
tEnv.getConfig().getConfiguration().setString(key, value);
break;
case CREATE_TABLE:
String ddl = call.operands[0];
tEnv.sqlUpdate(ddl);
break;
case INSERT_INTO:
String dml = call.operands[0];
tEnv.sqlUpdate(dml);
break;
default:
throw new RuntimeException("Unsupported command: " + call.command);
}
}
// 提交作業
tEnv.execute("SQL Job");

使用 DDL 連接 Kafka 源表

在 flink-sql-submit 項目中,我們準備了一份測試數據集(來自 阿里雲天池公開數據集 ,特別鳴謝),位於 src/main/resources/user_behavior.log 。數據以 JSON 格式編碼,大概長這個樣子:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

為了模擬真實的 Kafka 數據源,筆者還特地寫了一個 source-generator.sh 腳本(感興趣的可以看下源碼),會自動讀取 user_behavior.log 的數據並以默認每毫秒1條的速率灌到 Kafka 的 user_behavior topic 中。

有了數據源後,我們就可以用 DDL 去創建並連接這個 Kafka 中的 topic(詳見 src/main/resources/q1.sql )。

CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP
) WITH (
'connector.type' = 'kafka', -- 使用 kafka connector
'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本
'connector.topic' = 'user_behavior', -- kafka topic
'connector.startup-mode' = 'earliest-offset', -- 從起始 offset 開始讀取
'connector.properties.0.key' = 'zookeeper.connect', -- 連接信息
'connector.properties.0.value' = 'localhost:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'update-mode' = 'append',
'format.type' = 'json', -- 數據源格式為 json
'format.derive-schema' = 'true' -- 從 DDL schema 確定 json 解析規則
)

注:可能有用戶會覺得其中的 connector.properties.0.key 等參數比較奇怪,社區計劃將在下一個版本中改進並簡化 connector 的參數配置。

使用 DDL 連接 MySQL 結果表

連接 MySQL 可以使用 Flink 提供的 JDBC connector。例如

CREATE TABLE pvuv_sink (
dt VARCHAR,
pv BIGINT,
uv BIGINT
) WITH (
'connector.type' = 'jdbc', -- 使用 jdbc connector
'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url
'connector.table' = 'pvuv_sink', -- 表名
'connector.username' = 'root', -- 用戶名
'connector.password' = '123456', -- 密碼
'connector.write.flush.max-rows' = '1' -- 默認5000條,為了演示改為1條
)

PV UV 計算

假設我們的需求是計算每小時全網的用戶訪問量,和獨立用戶數。很多用戶可能會想到使用滾動窗口來計算。但這裡我們介紹另一種方式。即 Group Aggregation 的方式。

INSERT INTO pvuv_sink
SELECT
DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
COUNT(*) AS pv,
COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')

它使用 DATE_FORMAT 這個內置函數,將日誌時間歸一化成“年月日小時”的字符串格式,並根據這個字符串進行分組,即根據每小時分組,然後通過 COUNT(*) 計算用戶訪問量(PV),通過 COUNT(DISTINCT user_id) 計算獨立用戶數(UV)。這種方式的執行模式是每收到一條數據,便會進行基於之前計算的值做增量計算(如+1),然後將最新結果輸出。所以實時性很高,但輸出量也大。

我們將這個查詢的結果,通過 INSERT INTO 語句,寫到了之前定義的 pvuv_sink MySQL 表中。

注:在深圳 Meetup 中,我們有對這種查詢的性能調優做了深度的介紹。

實戰演示

環境準備

本實戰演示環節需要安裝一些必須的服務,包括:

  • Flink 本地集群:用來運行 Flink SQL 任務。
  • Kafka 本地集群:用來作為數據源。
  • MySQL 數據庫:用來作為結果表。

Flink 本地集群安裝

  1. 下載 Flink 1.9.0 安裝包並解壓(解壓目錄 flink-1.9.0 ): https://www.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz
  2. 下載以下依賴 jar 包,並拷貝到 flink-1.9.0/lib/ 目錄下。因為我們運行時需要依賴各個 connector 實現。
  • flink-sql-connector-kafka_2.11-1.9.0.jar
  • flink-json-1.9.0-sql-jar.jar
  • flink-jdbc_2.11-1.9.0.jar
  • mysql-connector-java-5.1.48.jar
  1. 將 flink-1.9.0/conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改成 10,因為我們的演示任務可能會消耗多於1個的 slot。
  2. 執行 flink-1.9.0/bin/start-cluster.sh ,啟動集群。

運行成功的話,可以在 http://localhost:8081 訪問到 Flink Web UI。

Flink 1.9 實戰:使用 SQL 讀取 Kafka 並寫入 MySQL

另外,還需要將 Flink 的安裝路徑填到 flink-sql-submit 項目的 env.sh 中,用於後面提交 SQL 任務,如我的路徑是

FLINK_DIR=/Users/wuchong/dev/install/flink-1.9.0

Kafka 本地集群安裝

  1. 下載 Kafka 2.2.0 安裝包並解壓: https://www.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz
  2. 將安裝路徑填到 flink-sql-submit 項目的 env.sh 中,如我的路徑是
KAFKA_DIR=/Users/wuchong/dev/install/kafka_2.11-2.2.0
  1. 在 flink-sql-submit 目錄下運行 ./start-kafka.sh 啟動 Kafka 集群。
  2. 在命令行執行 jps ,如果看到 Kafka 進程和 QuorumPeerMain 進程即表明啟動成功。

MySQL 安裝

  • 可以在 官方頁面 下載 MySQL 並安裝。
  • 如果有 Docker 環境的話,也可以直接通過 Docker 安裝
$ docker pull mysql
$ docker run --name mysqldb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql

在 MySQL 中創建一個 flink-test 的數據庫,並按照上文的 schema 創建 pvuv_sink 表。

提交 SQL 任務

  1. 在 flink-sql-submit 目錄下運行 ./source-generator.sh ,會自動創建 user_behavior topic,並實時往裡灌入數據。
Flink 1.9 實戰:使用 SQL 讀取 Kafka 並寫入 MySQL

  1. 在 flink-sql-submit 目錄下運行 ./run.sh q1 , 提交成功後,可以在 Web UI 中看到拓撲。
"

SqlSubmit 的實現

筆者一開始是想用 SQL Client 來貫穿整個演示環節,但可惜 1.9 版本 SQL CLI 還不支持處理 CREATE TABLE 語句。所以筆者就只好自己寫了個簡單的提交腳本。後來想想,也挺好的,可以讓聽眾同時瞭解如何通過 SQL 的方式,和編程的方式使用 Flink SQL。

SqlSubmit 的主要任務是執行和提交一個 SQL 文件,實現非常簡單,就是通過正則表達式匹配每個語句塊。如果是 CREATE TABLE 或 INSERT INTO 開頭,則會調用 tEnv.sqlUpdate(...) 。如果是 SET 開頭,則會將配置設置到 TableConfig 上。其核心代碼主要如下所示:

EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
// 創建一個使用 Blink Planner 的 TableEnvironment, 並工作在流模式
TableEnvironment tEnv = TableEnvironment.create(settings);
// 讀取 SQL 文件
List<String> sql = Files.readAllLines(path);
// 通過正則表達式匹配前綴,來區分不同的 SQL 語句
List<SqlCommandCall> calls = SqlCommandParser.parse(sql);
// 根據不同的 SQL 語句,調用 TableEnvironment 執行
for (SqlCommandCall call : calls) {
switch (call.command) {
case SET:
String key = call.operands[0];
String value = call.operands[1];
// 設置參數
tEnv.getConfig().getConfiguration().setString(key, value);
break;
case CREATE_TABLE:
String ddl = call.operands[0];
tEnv.sqlUpdate(ddl);
break;
case INSERT_INTO:
String dml = call.operands[0];
tEnv.sqlUpdate(dml);
break;
default:
throw new RuntimeException("Unsupported command: " + call.command);
}
}
// 提交作業
tEnv.execute("SQL Job");

使用 DDL 連接 Kafka 源表

在 flink-sql-submit 項目中,我們準備了一份測試數據集(來自 阿里雲天池公開數據集 ,特別鳴謝),位於 src/main/resources/user_behavior.log 。數據以 JSON 格式編碼,大概長這個樣子:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

為了模擬真實的 Kafka 數據源,筆者還特地寫了一個 source-generator.sh 腳本(感興趣的可以看下源碼),會自動讀取 user_behavior.log 的數據並以默認每毫秒1條的速率灌到 Kafka 的 user_behavior topic 中。

有了數據源後,我們就可以用 DDL 去創建並連接這個 Kafka 中的 topic(詳見 src/main/resources/q1.sql )。

CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP
) WITH (
'connector.type' = 'kafka', -- 使用 kafka connector
'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本
'connector.topic' = 'user_behavior', -- kafka topic
'connector.startup-mode' = 'earliest-offset', -- 從起始 offset 開始讀取
'connector.properties.0.key' = 'zookeeper.connect', -- 連接信息
'connector.properties.0.value' = 'localhost:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'update-mode' = 'append',
'format.type' = 'json', -- 數據源格式為 json
'format.derive-schema' = 'true' -- 從 DDL schema 確定 json 解析規則
)

注:可能有用戶會覺得其中的 connector.properties.0.key 等參數比較奇怪,社區計劃將在下一個版本中改進並簡化 connector 的參數配置。

使用 DDL 連接 MySQL 結果表

連接 MySQL 可以使用 Flink 提供的 JDBC connector。例如

CREATE TABLE pvuv_sink (
dt VARCHAR,
pv BIGINT,
uv BIGINT
) WITH (
'connector.type' = 'jdbc', -- 使用 jdbc connector
'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url
'connector.table' = 'pvuv_sink', -- 表名
'connector.username' = 'root', -- 用戶名
'connector.password' = '123456', -- 密碼
'connector.write.flush.max-rows' = '1' -- 默認5000條,為了演示改為1條
)

PV UV 計算

假設我們的需求是計算每小時全網的用戶訪問量,和獨立用戶數。很多用戶可能會想到使用滾動窗口來計算。但這裡我們介紹另一種方式。即 Group Aggregation 的方式。

INSERT INTO pvuv_sink
SELECT
DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
COUNT(*) AS pv,
COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')

它使用 DATE_FORMAT 這個內置函數,將日誌時間歸一化成“年月日小時”的字符串格式,並根據這個字符串進行分組,即根據每小時分組,然後通過 COUNT(*) 計算用戶訪問量(PV),通過 COUNT(DISTINCT user_id) 計算獨立用戶數(UV)。這種方式的執行模式是每收到一條數據,便會進行基於之前計算的值做增量計算(如+1),然後將最新結果輸出。所以實時性很高,但輸出量也大。

我們將這個查詢的結果,通過 INSERT INTO 語句,寫到了之前定義的 pvuv_sink MySQL 表中。

注:在深圳 Meetup 中,我們有對這種查詢的性能調優做了深度的介紹。

實戰演示

環境準備

本實戰演示環節需要安裝一些必須的服務,包括:

  • Flink 本地集群:用來運行 Flink SQL 任務。
  • Kafka 本地集群:用來作為數據源。
  • MySQL 數據庫:用來作為結果表。

Flink 本地集群安裝

  1. 下載 Flink 1.9.0 安裝包並解壓(解壓目錄 flink-1.9.0 ): https://www.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz
  2. 下載以下依賴 jar 包,並拷貝到 flink-1.9.0/lib/ 目錄下。因為我們運行時需要依賴各個 connector 實現。
  • flink-sql-connector-kafka_2.11-1.9.0.jar
  • flink-json-1.9.0-sql-jar.jar
  • flink-jdbc_2.11-1.9.0.jar
  • mysql-connector-java-5.1.48.jar
  1. 將 flink-1.9.0/conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改成 10,因為我們的演示任務可能會消耗多於1個的 slot。
  2. 執行 flink-1.9.0/bin/start-cluster.sh ,啟動集群。

運行成功的話,可以在 http://localhost:8081 訪問到 Flink Web UI。

Flink 1.9 實戰:使用 SQL 讀取 Kafka 並寫入 MySQL

另外,還需要將 Flink 的安裝路徑填到 flink-sql-submit 項目的 env.sh 中,用於後面提交 SQL 任務,如我的路徑是

FLINK_DIR=/Users/wuchong/dev/install/flink-1.9.0

Kafka 本地集群安裝

  1. 下載 Kafka 2.2.0 安裝包並解壓: https://www.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz
  2. 將安裝路徑填到 flink-sql-submit 項目的 env.sh 中,如我的路徑是
KAFKA_DIR=/Users/wuchong/dev/install/kafka_2.11-2.2.0
  1. 在 flink-sql-submit 目錄下運行 ./start-kafka.sh 啟動 Kafka 集群。
  2. 在命令行執行 jps ,如果看到 Kafka 進程和 QuorumPeerMain 進程即表明啟動成功。

MySQL 安裝

  • 可以在 官方頁面 下載 MySQL 並安裝。
  • 如果有 Docker 環境的話,也可以直接通過 Docker 安裝
$ docker pull mysql
$ docker run --name mysqldb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql

在 MySQL 中創建一個 flink-test 的數據庫,並按照上文的 schema 創建 pvuv_sink 表。

提交 SQL 任務

  1. 在 flink-sql-submit 目錄下運行 ./source-generator.sh ,會自動創建 user_behavior topic,並實時往裡灌入數據。
Flink 1.9 實戰:使用 SQL 讀取 Kafka 並寫入 MySQL

  1. 在 flink-sql-submit 目錄下運行 ./run.sh q1 , 提交成功後,可以在 Web UI 中看到拓撲。
Flink 1.9 實戰:使用 SQL 讀取 Kafka 並寫入 MySQL

在 MySQL 客戶端,我們也可以實時地看到每個小時的 pv uv 值在不斷地變化。

"

SqlSubmit 的實現

筆者一開始是想用 SQL Client 來貫穿整個演示環節,但可惜 1.9 版本 SQL CLI 還不支持處理 CREATE TABLE 語句。所以筆者就只好自己寫了個簡單的提交腳本。後來想想,也挺好的,可以讓聽眾同時瞭解如何通過 SQL 的方式,和編程的方式使用 Flink SQL。

SqlSubmit 的主要任務是執行和提交一個 SQL 文件,實現非常簡單,就是通過正則表達式匹配每個語句塊。如果是 CREATE TABLE 或 INSERT INTO 開頭,則會調用 tEnv.sqlUpdate(...) 。如果是 SET 開頭,則會將配置設置到 TableConfig 上。其核心代碼主要如下所示:

EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
// 創建一個使用 Blink Planner 的 TableEnvironment, 並工作在流模式
TableEnvironment tEnv = TableEnvironment.create(settings);
// 讀取 SQL 文件
List<String> sql = Files.readAllLines(path);
// 通過正則表達式匹配前綴,來區分不同的 SQL 語句
List<SqlCommandCall> calls = SqlCommandParser.parse(sql);
// 根據不同的 SQL 語句,調用 TableEnvironment 執行
for (SqlCommandCall call : calls) {
switch (call.command) {
case SET:
String key = call.operands[0];
String value = call.operands[1];
// 設置參數
tEnv.getConfig().getConfiguration().setString(key, value);
break;
case CREATE_TABLE:
String ddl = call.operands[0];
tEnv.sqlUpdate(ddl);
break;
case INSERT_INTO:
String dml = call.operands[0];
tEnv.sqlUpdate(dml);
break;
default:
throw new RuntimeException("Unsupported command: " + call.command);
}
}
// 提交作業
tEnv.execute("SQL Job");

使用 DDL 連接 Kafka 源表

在 flink-sql-submit 項目中,我們準備了一份測試數據集(來自 阿里雲天池公開數據集 ,特別鳴謝),位於 src/main/resources/user_behavior.log 。數據以 JSON 格式編碼,大概長這個樣子:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

為了模擬真實的 Kafka 數據源,筆者還特地寫了一個 source-generator.sh 腳本(感興趣的可以看下源碼),會自動讀取 user_behavior.log 的數據並以默認每毫秒1條的速率灌到 Kafka 的 user_behavior topic 中。

有了數據源後,我們就可以用 DDL 去創建並連接這個 Kafka 中的 topic(詳見 src/main/resources/q1.sql )。

CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP
) WITH (
'connector.type' = 'kafka', -- 使用 kafka connector
'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本
'connector.topic' = 'user_behavior', -- kafka topic
'connector.startup-mode' = 'earliest-offset', -- 從起始 offset 開始讀取
'connector.properties.0.key' = 'zookeeper.connect', -- 連接信息
'connector.properties.0.value' = 'localhost:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'update-mode' = 'append',
'format.type' = 'json', -- 數據源格式為 json
'format.derive-schema' = 'true' -- 從 DDL schema 確定 json 解析規則
)

注:可能有用戶會覺得其中的 connector.properties.0.key 等參數比較奇怪,社區計劃將在下一個版本中改進並簡化 connector 的參數配置。

使用 DDL 連接 MySQL 結果表

連接 MySQL 可以使用 Flink 提供的 JDBC connector。例如

CREATE TABLE pvuv_sink (
dt VARCHAR,
pv BIGINT,
uv BIGINT
) WITH (
'connector.type' = 'jdbc', -- 使用 jdbc connector
'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url
'connector.table' = 'pvuv_sink', -- 表名
'connector.username' = 'root', -- 用戶名
'connector.password' = '123456', -- 密碼
'connector.write.flush.max-rows' = '1' -- 默認5000條,為了演示改為1條
)

PV UV 計算

假設我們的需求是計算每小時全網的用戶訪問量,和獨立用戶數。很多用戶可能會想到使用滾動窗口來計算。但這裡我們介紹另一種方式。即 Group Aggregation 的方式。

INSERT INTO pvuv_sink
SELECT
DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
COUNT(*) AS pv,
COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')

它使用 DATE_FORMAT 這個內置函數,將日誌時間歸一化成“年月日小時”的字符串格式,並根據這個字符串進行分組,即根據每小時分組,然後通過 COUNT(*) 計算用戶訪問量(PV),通過 COUNT(DISTINCT user_id) 計算獨立用戶數(UV)。這種方式的執行模式是每收到一條數據,便會進行基於之前計算的值做增量計算(如+1),然後將最新結果輸出。所以實時性很高,但輸出量也大。

我們將這個查詢的結果,通過 INSERT INTO 語句,寫到了之前定義的 pvuv_sink MySQL 表中。

注:在深圳 Meetup 中,我們有對這種查詢的性能調優做了深度的介紹。

實戰演示

環境準備

本實戰演示環節需要安裝一些必須的服務,包括:

  • Flink 本地集群:用來運行 Flink SQL 任務。
  • Kafka 本地集群:用來作為數據源。
  • MySQL 數據庫:用來作為結果表。

Flink 本地集群安裝

  1. 下載 Flink 1.9.0 安裝包並解壓(解壓目錄 flink-1.9.0 ): https://www.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz
  2. 下載以下依賴 jar 包,並拷貝到 flink-1.9.0/lib/ 目錄下。因為我們運行時需要依賴各個 connector 實現。
  • flink-sql-connector-kafka_2.11-1.9.0.jar
  • flink-json-1.9.0-sql-jar.jar
  • flink-jdbc_2.11-1.9.0.jar
  • mysql-connector-java-5.1.48.jar
  1. 將 flink-1.9.0/conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改成 10,因為我們的演示任務可能會消耗多於1個的 slot。
  2. 執行 flink-1.9.0/bin/start-cluster.sh ,啟動集群。

運行成功的話,可以在 http://localhost:8081 訪問到 Flink Web UI。

Flink 1.9 實戰:使用 SQL 讀取 Kafka 並寫入 MySQL

另外,還需要將 Flink 的安裝路徑填到 flink-sql-submit 項目的 env.sh 中,用於後面提交 SQL 任務,如我的路徑是

FLINK_DIR=/Users/wuchong/dev/install/flink-1.9.0

Kafka 本地集群安裝

  1. 下載 Kafka 2.2.0 安裝包並解壓: https://www.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz
  2. 將安裝路徑填到 flink-sql-submit 項目的 env.sh 中,如我的路徑是
KAFKA_DIR=/Users/wuchong/dev/install/kafka_2.11-2.2.0
  1. 在 flink-sql-submit 目錄下運行 ./start-kafka.sh 啟動 Kafka 集群。
  2. 在命令行執行 jps ,如果看到 Kafka 進程和 QuorumPeerMain 進程即表明啟動成功。

MySQL 安裝

  • 可以在 官方頁面 下載 MySQL 並安裝。
  • 如果有 Docker 環境的話,也可以直接通過 Docker 安裝
$ docker pull mysql
$ docker run --name mysqldb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql

在 MySQL 中創建一個 flink-test 的數據庫,並按照上文的 schema 創建 pvuv_sink 表。

提交 SQL 任務

  1. 在 flink-sql-submit 目錄下運行 ./source-generator.sh ,會自動創建 user_behavior topic,並實時往裡灌入數據。
Flink 1.9 實戰:使用 SQL 讀取 Kafka 並寫入 MySQL

  1. 在 flink-sql-submit 目錄下運行 ./run.sh q1 , 提交成功後,可以在 Web UI 中看到拓撲。
Flink 1.9 實戰:使用 SQL 讀取 Kafka 並寫入 MySQL

在 MySQL 客戶端,我們也可以實時地看到每個小時的 pv uv 值在不斷地變化。

Flink 1.9 實戰:使用 SQL 讀取 Kafka 並寫入 MySQL

結尾

本文帶大家搭建基礎集群環境,並使用 SqlSubmit 提交純 SQL 任務來學習瞭解如何連接外部系統。 flink-sql-submit/src/main/resources/q1.sql 中還有一些註釋掉的調優參數,感興趣的同學可以將參數打開,觀察對作業的影響。

演示代碼已經開源到了 GitHub 上: https://github.com/wuchong/flink-sql-submit

原文:http://wuchong.me/blog/2019/09/02/flink-sql-1-9-read-from-kafka-write-into-mysql/?utm_source=tuicool&utm_medium=referral

"

相關推薦

推薦中...