'TalkingData的Spark On Kubernetes實踐'

Spark Linux Hadoop MapReduce 路由器 硬件 Calico 大數據 阿里云云棲社區 2019-07-28
"

眾所周知,Spark是一個快速、通用的大規模數據處理平臺,和Hadoop的MapReduce計算框架類似。但是相對於MapReduce,Spark憑藉其可伸縮、基於內存計算等特點,以及可以直接讀寫Hadoop上任何格式數據的優勢,使批處理更加高效,並有更低的延遲。實際上,Spark已經成為輕量級大數據快速處理的統一平臺。

Spark作為一個數據計算平臺和框架,更多的是關注Spark Application的管理,而底層實際的資源調度和管理更多的是依靠外部平臺的支持:

"

眾所周知,Spark是一個快速、通用的大規模數據處理平臺,和Hadoop的MapReduce計算框架類似。但是相對於MapReduce,Spark憑藉其可伸縮、基於內存計算等特點,以及可以直接讀寫Hadoop上任何格式數據的優勢,使批處理更加高效,並有更低的延遲。實際上,Spark已經成為輕量級大數據快速處理的統一平臺。

Spark作為一個數據計算平臺和框架,更多的是關注Spark Application的管理,而底層實際的資源調度和管理更多的是依靠外部平臺的支持:

TalkingData的Spark On Kubernetes實踐

Spark官方支持四種Cluster Manager:Spark standalone cluster manager、Mesos、YARN和Kubernetes。由於我們TalkingData是使用Kubernetes作為資源的調度和管理平臺,所以Spark On Kubernetes對於我們是最好的解決方案。

如何搭建生產可用的Kubernetes集群

部署

目前市面上有很多搭建Kubernetes的方法,比如Scratch、Kubeadm、Minikube或者各種託管方案。因為我們需要簡單快速地搭建功能驗證集群,所以選擇了Kubeadm作為集群的部署工具。部署步驟很簡單,在master上執行:

kubeadm init

在node上執行:

kubeadm join --token : --discovery-token-ca-cert-hash sha256:

具體配置可見官方文檔:https://kubernetes.io/docs/setup/independent/create-cluster-kubeadm/。

需要注意的是由於國內網絡限制,很多鏡像無法從k8s.gcr.io獲取,我們需要將之替換為第三方提供的鏡像,比如:https://hub.docker.com/u/mirrorgooglecontainers/。

網絡

Kubernetes網絡默認是通過CNI實現,主流的CNI plugin有:Linux Bridge、MACVLAN、Flannel、Calico、Kube-router、Weave Net等。Flannel主要是使用VXLAN tunnel來解決pod間的網絡通信,Calico和Kube-router則是使用BGP。由於軟VXLAN對宿主機的性能和網絡有不小的損耗,BGP則對硬件交換機有一定的要求,且我們的基礎網絡是VXLAN實現的大二層,所以我們最終選擇了MACVLAN。

CNI MACVLAN的配置示例如下:

{
"name": "mynet",
"type": "macvlan",
"master": "eth0",
"ipam": {
"type": "host-local",
"subnet": "10.0.0.0/17",
"rangeStart": "10.0.64.1",
"rangeEnd": "10.0.64.126",
"gateway": "10.0.127.254",
"routes": [
{
"dst": "0.0.0.0/0"
},
{
"dst": "10.0.80.0/24",
"gw": "10.0.0.61"
}
]
}
}

Pod subnet是10.0.0.0/17,實際pod ip pool是10.0.64.0/20。cluster cidr是10.0.80.0/24。我們使用的IPAM是host-local,規則是在每個Kubernetes node上建立/25的子網,可以提供126個IP。我們還配置了一條到cluster cidr的靜態路由10.0.80.0/24,網關是宿主機。這是因為容器在macvlan配置下egress並不會通過宿主機的iptables,這點和Linux Bridge有較大區別。在Linux Bridge模式下,只要指定內核參數net.bridge.bridge-nf-call-iptables = 1,所有進入bridge的流量都會通過宿主機的iptables。經過分析kube-proxy,我們發現可以使用KUBE-FORWARD這個chain來進行pod到service的網絡轉發:

-A FORWARD -m comment --comment "kubernetes forward rules" -j KUBE-FORWARD
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -s 10.0.0.0/17 -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A KUBE-FORWARD -d 10.0.0.0/17 -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT

最後通過KUBE-SERVICES使用DNAT到後端的pod。pod訪問其他網段的話,就通過物理網關10.0.127.254。

還有一個需要注意的地方是出於kernel security的考慮,link物理接口的macvlan是無法直接和物理接口通信的,這就導致容器並不能將宿主機作為網關。我們採用了一個小技巧,避開了這個限制。我們從物理接口又創建了一個macvlan,將物理IP移到了這個接口上,物理接口只作為網絡入口:

$ cat /etc/sysconfig/network-scripts/ifcfg-eth0
DEVICE=eth0
IPV6INIT=no
BOOTPROTO=none
$ cat /etc/sysconfig/network-scripts/ifcfg-macvlan
DEVICE=macvlan
NAME=macvlan
BOOTPROTO=none
ONBOOT=yes
TYPE=macvlan
DEVICETYPE=macvlan
DEFROUTE=yes
PEERDNS=yes
PEERROUTES=yes
IPV4_FAILURE_FATAL=no
IPADDR=10.0.0.61
PREFIX=17
GATEWAY=10.0.127.254
MACVLAN_PARENT=eth0
MACVLAN_MODE=bridge

這樣兩個macvlan是可以互相通信的。

Kube-dns

默認配置下,Kubernetes使用kube-dns進行DNS解析和服務發現。但在實際使用時,我們發現在pod上通過service domain訪問service總是有5秒的延遲。使用tcpdump抓包,發現延遲出現在DNS AAAA。進一步排查,發現問題是由於netfilter在conntrack和SNAT時的Race Condition導致。簡言之,DNS A和AAAA記錄請求報文是並行發出的,這會導致netfilter在_nf_conntrack_confirm時認為第二個包是重複的(因為有相同的五元組),從而丟包。具體可看我提的issue:https://github.com/kubernetes/kubernetes/issues/62628。一個簡單的解決方案是在/etc/resolv.conf中增加options single-request-reopen,使DNS A和AAAA記錄請求報文使用不同的源端口。我提的PR在:https://github.com/kubernetes/kubernetes/issues/62628,大家可以參考。我們的解決方法是不使用Kubernetes service,設置hostNetwork=true使用宿主機網絡提供DNS服務。因為我們的基礎網絡是大二層,所以pod和node可以直接通信,這就避免了conntrack和SNAT。

Spark與Kubernetes集成

由於Spark的抽象設計,我們可以使用第三方資源管理平臺調度和管理Spark作業,比如Yarn、Mesos和Kubernetes。目前官方有一個experimental項目,可以將Spark運行在Kubernetes之上:https://spark.apache.org/docs/latest/running-on-kubernetes.html。

基本原理

"

眾所周知,Spark是一個快速、通用的大規模數據處理平臺,和Hadoop的MapReduce計算框架類似。但是相對於MapReduce,Spark憑藉其可伸縮、基於內存計算等特點,以及可以直接讀寫Hadoop上任何格式數據的優勢,使批處理更加高效,並有更低的延遲。實際上,Spark已經成為輕量級大數據快速處理的統一平臺。

Spark作為一個數據計算平臺和框架,更多的是關注Spark Application的管理,而底層實際的資源調度和管理更多的是依靠外部平臺的支持:

TalkingData的Spark On Kubernetes實踐

Spark官方支持四種Cluster Manager:Spark standalone cluster manager、Mesos、YARN和Kubernetes。由於我們TalkingData是使用Kubernetes作為資源的調度和管理平臺,所以Spark On Kubernetes對於我們是最好的解決方案。

如何搭建生產可用的Kubernetes集群

部署

目前市面上有很多搭建Kubernetes的方法,比如Scratch、Kubeadm、Minikube或者各種託管方案。因為我們需要簡單快速地搭建功能驗證集群,所以選擇了Kubeadm作為集群的部署工具。部署步驟很簡單,在master上執行:

kubeadm init

在node上執行:

kubeadm join --token : --discovery-token-ca-cert-hash sha256:

具體配置可見官方文檔:https://kubernetes.io/docs/setup/independent/create-cluster-kubeadm/。

需要注意的是由於國內網絡限制,很多鏡像無法從k8s.gcr.io獲取,我們需要將之替換為第三方提供的鏡像,比如:https://hub.docker.com/u/mirrorgooglecontainers/。

網絡

Kubernetes網絡默認是通過CNI實現,主流的CNI plugin有:Linux Bridge、MACVLAN、Flannel、Calico、Kube-router、Weave Net等。Flannel主要是使用VXLAN tunnel來解決pod間的網絡通信,Calico和Kube-router則是使用BGP。由於軟VXLAN對宿主機的性能和網絡有不小的損耗,BGP則對硬件交換機有一定的要求,且我們的基礎網絡是VXLAN實現的大二層,所以我們最終選擇了MACVLAN。

CNI MACVLAN的配置示例如下:

{
"name": "mynet",
"type": "macvlan",
"master": "eth0",
"ipam": {
"type": "host-local",
"subnet": "10.0.0.0/17",
"rangeStart": "10.0.64.1",
"rangeEnd": "10.0.64.126",
"gateway": "10.0.127.254",
"routes": [
{
"dst": "0.0.0.0/0"
},
{
"dst": "10.0.80.0/24",
"gw": "10.0.0.61"
}
]
}
}

Pod subnet是10.0.0.0/17,實際pod ip pool是10.0.64.0/20。cluster cidr是10.0.80.0/24。我們使用的IPAM是host-local,規則是在每個Kubernetes node上建立/25的子網,可以提供126個IP。我們還配置了一條到cluster cidr的靜態路由10.0.80.0/24,網關是宿主機。這是因為容器在macvlan配置下egress並不會通過宿主機的iptables,這點和Linux Bridge有較大區別。在Linux Bridge模式下,只要指定內核參數net.bridge.bridge-nf-call-iptables = 1,所有進入bridge的流量都會通過宿主機的iptables。經過分析kube-proxy,我們發現可以使用KUBE-FORWARD這個chain來進行pod到service的網絡轉發:

-A FORWARD -m comment --comment "kubernetes forward rules" -j KUBE-FORWARD
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -s 10.0.0.0/17 -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A KUBE-FORWARD -d 10.0.0.0/17 -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT

最後通過KUBE-SERVICES使用DNAT到後端的pod。pod訪問其他網段的話,就通過物理網關10.0.127.254。

還有一個需要注意的地方是出於kernel security的考慮,link物理接口的macvlan是無法直接和物理接口通信的,這就導致容器並不能將宿主機作為網關。我們採用了一個小技巧,避開了這個限制。我們從物理接口又創建了一個macvlan,將物理IP移到了這個接口上,物理接口只作為網絡入口:

$ cat /etc/sysconfig/network-scripts/ifcfg-eth0
DEVICE=eth0
IPV6INIT=no
BOOTPROTO=none
$ cat /etc/sysconfig/network-scripts/ifcfg-macvlan
DEVICE=macvlan
NAME=macvlan
BOOTPROTO=none
ONBOOT=yes
TYPE=macvlan
DEVICETYPE=macvlan
DEFROUTE=yes
PEERDNS=yes
PEERROUTES=yes
IPV4_FAILURE_FATAL=no
IPADDR=10.0.0.61
PREFIX=17
GATEWAY=10.0.127.254
MACVLAN_PARENT=eth0
MACVLAN_MODE=bridge

這樣兩個macvlan是可以互相通信的。

Kube-dns

默認配置下,Kubernetes使用kube-dns進行DNS解析和服務發現。但在實際使用時,我們發現在pod上通過service domain訪問service總是有5秒的延遲。使用tcpdump抓包,發現延遲出現在DNS AAAA。進一步排查,發現問題是由於netfilter在conntrack和SNAT時的Race Condition導致。簡言之,DNS A和AAAA記錄請求報文是並行發出的,這會導致netfilter在_nf_conntrack_confirm時認為第二個包是重複的(因為有相同的五元組),從而丟包。具體可看我提的issue:https://github.com/kubernetes/kubernetes/issues/62628。一個簡單的解決方案是在/etc/resolv.conf中增加options single-request-reopen,使DNS A和AAAA記錄請求報文使用不同的源端口。我提的PR在:https://github.com/kubernetes/kubernetes/issues/62628,大家可以參考。我們的解決方法是不使用Kubernetes service,設置hostNetwork=true使用宿主機網絡提供DNS服務。因為我們的基礎網絡是大二層,所以pod和node可以直接通信,這就避免了conntrack和SNAT。

Spark與Kubernetes集成

由於Spark的抽象設計,我們可以使用第三方資源管理平臺調度和管理Spark作業,比如Yarn、Mesos和Kubernetes。目前官方有一個experimental項目,可以將Spark運行在Kubernetes之上:https://spark.apache.org/docs/latest/running-on-kubernetes.html。

基本原理

TalkingData的Spark On Kubernetes實踐

當我們通過spark-submit將Spark作業提交到Kubernetes集群時,會執行以下流程:

  • Spark在Kubernetes pod中創建Spark driver
  • Driver調用Kubernetes API創建executor pods,executor pods執行作業代碼
  • 計算作業結束,executor pods回收並清理
  • driver pod處於completed狀態,保留日誌,直到Kubernetes GC或者手動清理

先決條件

  • Spark 2.3+
  • Kubernetes 1.6+
  • 具有Kubernetes pods的list, create, edit和delete權限
  • Kubernetes集群必須正確配置Kubernetes DNS[1]

如何集成

Docker鏡像

由於Spark driver和executor都運行在Kubernetes pod中,並且我們使用Docker作為container runtime enviroment,所以首先我們需要建立Spark的Docker鏡像。

在Spark distribution中已包含相應腳本和Dockerfile,可以通過以下命令構建鏡像:

$ ./bin/docker-image-tool.sh -r <repo> -t my-tag build
$ ./bin/docker-image-tool.sh -r <repo> -t my-tag push

提交作業

在構建Spark鏡像後,我們可以通過以下命令提交作業:

$ bin/spark-submit \\
--master k8s://https://: \\
--deploy-mode cluster \\
--name spark-pi \\
--class org.apache.spark.examples.SparkPi \\
--jars https://path/to/dependency1.jar,https://path/to/dependency2.jar
--files hdfs://host:port/path/to/file1,hdfs://host:port/path/to/file2
--conf spark.executor.instances=5 \\
--conf spark.kubernetes.container.image= \\
https://path/to/examples.jar

其中,Spark master是Kubernetes api server的地址,可以通過以下命令獲取:

$ kubectl cluster-info
Kubernetes master is running at http://127.0.0.1:6443

Spark的作業代碼和依賴,我們可以在--jars、--files和最後位置指定,協議支持http、https和HDFS。

執行提交命令後,會有以下輸出:

"

眾所周知,Spark是一個快速、通用的大規模數據處理平臺,和Hadoop的MapReduce計算框架類似。但是相對於MapReduce,Spark憑藉其可伸縮、基於內存計算等特點,以及可以直接讀寫Hadoop上任何格式數據的優勢,使批處理更加高效,並有更低的延遲。實際上,Spark已經成為輕量級大數據快速處理的統一平臺。

Spark作為一個數據計算平臺和框架,更多的是關注Spark Application的管理,而底層實際的資源調度和管理更多的是依靠外部平臺的支持:

TalkingData的Spark On Kubernetes實踐

Spark官方支持四種Cluster Manager:Spark standalone cluster manager、Mesos、YARN和Kubernetes。由於我們TalkingData是使用Kubernetes作為資源的調度和管理平臺,所以Spark On Kubernetes對於我們是最好的解決方案。

如何搭建生產可用的Kubernetes集群

部署

目前市面上有很多搭建Kubernetes的方法,比如Scratch、Kubeadm、Minikube或者各種託管方案。因為我們需要簡單快速地搭建功能驗證集群,所以選擇了Kubeadm作為集群的部署工具。部署步驟很簡單,在master上執行:

kubeadm init

在node上執行:

kubeadm join --token : --discovery-token-ca-cert-hash sha256:

具體配置可見官方文檔:https://kubernetes.io/docs/setup/independent/create-cluster-kubeadm/。

需要注意的是由於國內網絡限制,很多鏡像無法從k8s.gcr.io獲取,我們需要將之替換為第三方提供的鏡像,比如:https://hub.docker.com/u/mirrorgooglecontainers/。

網絡

Kubernetes網絡默認是通過CNI實現,主流的CNI plugin有:Linux Bridge、MACVLAN、Flannel、Calico、Kube-router、Weave Net等。Flannel主要是使用VXLAN tunnel來解決pod間的網絡通信,Calico和Kube-router則是使用BGP。由於軟VXLAN對宿主機的性能和網絡有不小的損耗,BGP則對硬件交換機有一定的要求,且我們的基礎網絡是VXLAN實現的大二層,所以我們最終選擇了MACVLAN。

CNI MACVLAN的配置示例如下:

{
"name": "mynet",
"type": "macvlan",
"master": "eth0",
"ipam": {
"type": "host-local",
"subnet": "10.0.0.0/17",
"rangeStart": "10.0.64.1",
"rangeEnd": "10.0.64.126",
"gateway": "10.0.127.254",
"routes": [
{
"dst": "0.0.0.0/0"
},
{
"dst": "10.0.80.0/24",
"gw": "10.0.0.61"
}
]
}
}

Pod subnet是10.0.0.0/17,實際pod ip pool是10.0.64.0/20。cluster cidr是10.0.80.0/24。我們使用的IPAM是host-local,規則是在每個Kubernetes node上建立/25的子網,可以提供126個IP。我們還配置了一條到cluster cidr的靜態路由10.0.80.0/24,網關是宿主機。這是因為容器在macvlan配置下egress並不會通過宿主機的iptables,這點和Linux Bridge有較大區別。在Linux Bridge模式下,只要指定內核參數net.bridge.bridge-nf-call-iptables = 1,所有進入bridge的流量都會通過宿主機的iptables。經過分析kube-proxy,我們發現可以使用KUBE-FORWARD這個chain來進行pod到service的網絡轉發:

-A FORWARD -m comment --comment "kubernetes forward rules" -j KUBE-FORWARD
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -s 10.0.0.0/17 -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A KUBE-FORWARD -d 10.0.0.0/17 -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT

最後通過KUBE-SERVICES使用DNAT到後端的pod。pod訪問其他網段的話,就通過物理網關10.0.127.254。

還有一個需要注意的地方是出於kernel security的考慮,link物理接口的macvlan是無法直接和物理接口通信的,這就導致容器並不能將宿主機作為網關。我們採用了一個小技巧,避開了這個限制。我們從物理接口又創建了一個macvlan,將物理IP移到了這個接口上,物理接口只作為網絡入口:

$ cat /etc/sysconfig/network-scripts/ifcfg-eth0
DEVICE=eth0
IPV6INIT=no
BOOTPROTO=none
$ cat /etc/sysconfig/network-scripts/ifcfg-macvlan
DEVICE=macvlan
NAME=macvlan
BOOTPROTO=none
ONBOOT=yes
TYPE=macvlan
DEVICETYPE=macvlan
DEFROUTE=yes
PEERDNS=yes
PEERROUTES=yes
IPV4_FAILURE_FATAL=no
IPADDR=10.0.0.61
PREFIX=17
GATEWAY=10.0.127.254
MACVLAN_PARENT=eth0
MACVLAN_MODE=bridge

這樣兩個macvlan是可以互相通信的。

Kube-dns

默認配置下,Kubernetes使用kube-dns進行DNS解析和服務發現。但在實際使用時,我們發現在pod上通過service domain訪問service總是有5秒的延遲。使用tcpdump抓包,發現延遲出現在DNS AAAA。進一步排查,發現問題是由於netfilter在conntrack和SNAT時的Race Condition導致。簡言之,DNS A和AAAA記錄請求報文是並行發出的,這會導致netfilter在_nf_conntrack_confirm時認為第二個包是重複的(因為有相同的五元組),從而丟包。具體可看我提的issue:https://github.com/kubernetes/kubernetes/issues/62628。一個簡單的解決方案是在/etc/resolv.conf中增加options single-request-reopen,使DNS A和AAAA記錄請求報文使用不同的源端口。我提的PR在:https://github.com/kubernetes/kubernetes/issues/62628,大家可以參考。我們的解決方法是不使用Kubernetes service,設置hostNetwork=true使用宿主機網絡提供DNS服務。因為我們的基礎網絡是大二層,所以pod和node可以直接通信,這就避免了conntrack和SNAT。

Spark與Kubernetes集成

由於Spark的抽象設計,我們可以使用第三方資源管理平臺調度和管理Spark作業,比如Yarn、Mesos和Kubernetes。目前官方有一個experimental項目,可以將Spark運行在Kubernetes之上:https://spark.apache.org/docs/latest/running-on-kubernetes.html。

基本原理

TalkingData的Spark On Kubernetes實踐

當我們通過spark-submit將Spark作業提交到Kubernetes集群時,會執行以下流程:

  • Spark在Kubernetes pod中創建Spark driver
  • Driver調用Kubernetes API創建executor pods,executor pods執行作業代碼
  • 計算作業結束,executor pods回收並清理
  • driver pod處於completed狀態,保留日誌,直到Kubernetes GC或者手動清理

先決條件

  • Spark 2.3+
  • Kubernetes 1.6+
  • 具有Kubernetes pods的list, create, edit和delete權限
  • Kubernetes集群必須正確配置Kubernetes DNS[1]

如何集成

Docker鏡像

由於Spark driver和executor都運行在Kubernetes pod中,並且我們使用Docker作為container runtime enviroment,所以首先我們需要建立Spark的Docker鏡像。

在Spark distribution中已包含相應腳本和Dockerfile,可以通過以下命令構建鏡像:

$ ./bin/docker-image-tool.sh -r <repo> -t my-tag build
$ ./bin/docker-image-tool.sh -r <repo> -t my-tag push

提交作業

在構建Spark鏡像後,我們可以通過以下命令提交作業:

$ bin/spark-submit \\
--master k8s://https://: \\
--deploy-mode cluster \\
--name spark-pi \\
--class org.apache.spark.examples.SparkPi \\
--jars https://path/to/dependency1.jar,https://path/to/dependency2.jar
--files hdfs://host:port/path/to/file1,hdfs://host:port/path/to/file2
--conf spark.executor.instances=5 \\
--conf spark.kubernetes.container.image= \\
https://path/to/examples.jar

其中,Spark master是Kubernetes api server的地址,可以通過以下命令獲取:

$ kubectl cluster-info
Kubernetes master is running at http://127.0.0.1:6443

Spark的作業代碼和依賴,我們可以在--jars、--files和最後位置指定,協議支持http、https和HDFS。

執行提交命令後,會有以下輸出:

TalkingData的Spark On Kubernetes實踐

任務結束,會輸出:

"

眾所周知,Spark是一個快速、通用的大規模數據處理平臺,和Hadoop的MapReduce計算框架類似。但是相對於MapReduce,Spark憑藉其可伸縮、基於內存計算等特點,以及可以直接讀寫Hadoop上任何格式數據的優勢,使批處理更加高效,並有更低的延遲。實際上,Spark已經成為輕量級大數據快速處理的統一平臺。

Spark作為一個數據計算平臺和框架,更多的是關注Spark Application的管理,而底層實際的資源調度和管理更多的是依靠外部平臺的支持:

TalkingData的Spark On Kubernetes實踐

Spark官方支持四種Cluster Manager:Spark standalone cluster manager、Mesos、YARN和Kubernetes。由於我們TalkingData是使用Kubernetes作為資源的調度和管理平臺,所以Spark On Kubernetes對於我們是最好的解決方案。

如何搭建生產可用的Kubernetes集群

部署

目前市面上有很多搭建Kubernetes的方法,比如Scratch、Kubeadm、Minikube或者各種託管方案。因為我們需要簡單快速地搭建功能驗證集群,所以選擇了Kubeadm作為集群的部署工具。部署步驟很簡單,在master上執行:

kubeadm init

在node上執行:

kubeadm join --token : --discovery-token-ca-cert-hash sha256:

具體配置可見官方文檔:https://kubernetes.io/docs/setup/independent/create-cluster-kubeadm/。

需要注意的是由於國內網絡限制,很多鏡像無法從k8s.gcr.io獲取,我們需要將之替換為第三方提供的鏡像,比如:https://hub.docker.com/u/mirrorgooglecontainers/。

網絡

Kubernetes網絡默認是通過CNI實現,主流的CNI plugin有:Linux Bridge、MACVLAN、Flannel、Calico、Kube-router、Weave Net等。Flannel主要是使用VXLAN tunnel來解決pod間的網絡通信,Calico和Kube-router則是使用BGP。由於軟VXLAN對宿主機的性能和網絡有不小的損耗,BGP則對硬件交換機有一定的要求,且我們的基礎網絡是VXLAN實現的大二層,所以我們最終選擇了MACVLAN。

CNI MACVLAN的配置示例如下:

{
"name": "mynet",
"type": "macvlan",
"master": "eth0",
"ipam": {
"type": "host-local",
"subnet": "10.0.0.0/17",
"rangeStart": "10.0.64.1",
"rangeEnd": "10.0.64.126",
"gateway": "10.0.127.254",
"routes": [
{
"dst": "0.0.0.0/0"
},
{
"dst": "10.0.80.0/24",
"gw": "10.0.0.61"
}
]
}
}

Pod subnet是10.0.0.0/17,實際pod ip pool是10.0.64.0/20。cluster cidr是10.0.80.0/24。我們使用的IPAM是host-local,規則是在每個Kubernetes node上建立/25的子網,可以提供126個IP。我們還配置了一條到cluster cidr的靜態路由10.0.80.0/24,網關是宿主機。這是因為容器在macvlan配置下egress並不會通過宿主機的iptables,這點和Linux Bridge有較大區別。在Linux Bridge模式下,只要指定內核參數net.bridge.bridge-nf-call-iptables = 1,所有進入bridge的流量都會通過宿主機的iptables。經過分析kube-proxy,我們發現可以使用KUBE-FORWARD這個chain來進行pod到service的網絡轉發:

-A FORWARD -m comment --comment "kubernetes forward rules" -j KUBE-FORWARD
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -s 10.0.0.0/17 -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A KUBE-FORWARD -d 10.0.0.0/17 -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT

最後通過KUBE-SERVICES使用DNAT到後端的pod。pod訪問其他網段的話,就通過物理網關10.0.127.254。

還有一個需要注意的地方是出於kernel security的考慮,link物理接口的macvlan是無法直接和物理接口通信的,這就導致容器並不能將宿主機作為網關。我們採用了一個小技巧,避開了這個限制。我們從物理接口又創建了一個macvlan,將物理IP移到了這個接口上,物理接口只作為網絡入口:

$ cat /etc/sysconfig/network-scripts/ifcfg-eth0
DEVICE=eth0
IPV6INIT=no
BOOTPROTO=none
$ cat /etc/sysconfig/network-scripts/ifcfg-macvlan
DEVICE=macvlan
NAME=macvlan
BOOTPROTO=none
ONBOOT=yes
TYPE=macvlan
DEVICETYPE=macvlan
DEFROUTE=yes
PEERDNS=yes
PEERROUTES=yes
IPV4_FAILURE_FATAL=no
IPADDR=10.0.0.61
PREFIX=17
GATEWAY=10.0.127.254
MACVLAN_PARENT=eth0
MACVLAN_MODE=bridge

這樣兩個macvlan是可以互相通信的。

Kube-dns

默認配置下,Kubernetes使用kube-dns進行DNS解析和服務發現。但在實際使用時,我們發現在pod上通過service domain訪問service總是有5秒的延遲。使用tcpdump抓包,發現延遲出現在DNS AAAA。進一步排查,發現問題是由於netfilter在conntrack和SNAT時的Race Condition導致。簡言之,DNS A和AAAA記錄請求報文是並行發出的,這會導致netfilter在_nf_conntrack_confirm時認為第二個包是重複的(因為有相同的五元組),從而丟包。具體可看我提的issue:https://github.com/kubernetes/kubernetes/issues/62628。一個簡單的解決方案是在/etc/resolv.conf中增加options single-request-reopen,使DNS A和AAAA記錄請求報文使用不同的源端口。我提的PR在:https://github.com/kubernetes/kubernetes/issues/62628,大家可以參考。我們的解決方法是不使用Kubernetes service,設置hostNetwork=true使用宿主機網絡提供DNS服務。因為我們的基礎網絡是大二層,所以pod和node可以直接通信,這就避免了conntrack和SNAT。

Spark與Kubernetes集成

由於Spark的抽象設計,我們可以使用第三方資源管理平臺調度和管理Spark作業,比如Yarn、Mesos和Kubernetes。目前官方有一個experimental項目,可以將Spark運行在Kubernetes之上:https://spark.apache.org/docs/latest/running-on-kubernetes.html。

基本原理

TalkingData的Spark On Kubernetes實踐

當我們通過spark-submit將Spark作業提交到Kubernetes集群時,會執行以下流程:

  • Spark在Kubernetes pod中創建Spark driver
  • Driver調用Kubernetes API創建executor pods,executor pods執行作業代碼
  • 計算作業結束,executor pods回收並清理
  • driver pod處於completed狀態,保留日誌,直到Kubernetes GC或者手動清理

先決條件

  • Spark 2.3+
  • Kubernetes 1.6+
  • 具有Kubernetes pods的list, create, edit和delete權限
  • Kubernetes集群必須正確配置Kubernetes DNS[1]

如何集成

Docker鏡像

由於Spark driver和executor都運行在Kubernetes pod中,並且我們使用Docker作為container runtime enviroment,所以首先我們需要建立Spark的Docker鏡像。

在Spark distribution中已包含相應腳本和Dockerfile,可以通過以下命令構建鏡像:

$ ./bin/docker-image-tool.sh -r <repo> -t my-tag build
$ ./bin/docker-image-tool.sh -r <repo> -t my-tag push

提交作業

在構建Spark鏡像後,我們可以通過以下命令提交作業:

$ bin/spark-submit \\
--master k8s://https://: \\
--deploy-mode cluster \\
--name spark-pi \\
--class org.apache.spark.examples.SparkPi \\
--jars https://path/to/dependency1.jar,https://path/to/dependency2.jar
--files hdfs://host:port/path/to/file1,hdfs://host:port/path/to/file2
--conf spark.executor.instances=5 \\
--conf spark.kubernetes.container.image= \\
https://path/to/examples.jar

其中,Spark master是Kubernetes api server的地址,可以通過以下命令獲取:

$ kubectl cluster-info
Kubernetes master is running at http://127.0.0.1:6443

Spark的作業代碼和依賴,我們可以在--jars、--files和最後位置指定,協議支持http、https和HDFS。

執行提交命令後,會有以下輸出:

TalkingData的Spark On Kubernetes實踐

任務結束,會輸出:

TalkingData的Spark On Kubernetes實踐

訪問Spark Driver UI

我們可以在本地使用kubectl port-forward訪問Driver UI:

$ kubectl port-forward <driver-pod-name> 4040:4040

執行完後通過http://localhost:4040訪問。

訪問日誌

Spark的所有日誌都可以通過Kubernetes API和kubectl CLI進行訪問:

$ kubectl -n=<namespace> logs -f <driver-pod-name>

如何實現租戶和資源隔離

Kubernetes Namespace

在Kubernetes中,我們可以使用namespace在多用戶間實現資源分配、隔離和配額。Spark On Kubernetes同樣支持配置namespace創建Spark作業。

首先,創建一個Kubernetes namespace:

$ kubectl create namespace spark

由於我們的Kubernetes集群使用了RBAC,所以還需創建serviceaccount和綁定role:

$ kubectl create serviceaccount spark -n spark
$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=spark:spark --namespace=spark

並在spark-submit中新增以下配置:

$ bin/spark-submit \\
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \\
--conf spark.kubernetes.namespace=spark \\
...

資源隔離

考慮到我們Spark作業的一些特點和計算資源隔離,前期我們還是選擇了較穩妥的物理隔離方案。具體做法是為每個組提供單獨的Kubernetes namespace,計算任務都在各自namespace裡提交。計算資源以物理機為單位,折算成cpu和內存,納入Kubernetes統一管理。在Kubernetes集群裡,通過node label和PodNodeSelector將計算資源和namespace關聯。從而實現在提交Spark作業時,計算資源總是選擇namespace關聯的node。

具體做法如下:

1、創建node label

$ kubectl label nodes <node_name> spark:spark

2、開啟Kubernetes admission controller

我們是使用kubeadm安裝Kubernetes集群,所以修改/etc/kubernetes/manifests/kube-apiserver.yaml,在--admission-control後添加PodNodeSelector。

$ cat /etc/kubernetes/manifests/kube-apiserver.yaml
apiVersion: v1
kind: Pod
metadata:
annotations:
scheduler.alpha.kubernetes.io/critical-pod: ""
creationTimestamp: null
labels:
component: kube-apiserver
tier: control-plane
name: kube-apiserver
namespace: kube-system
spec:
containers:
- command:
- kube-apiserver
- --secure-port=6443
- --proxy-client-cert-file=/etc/kubernetes/pki/front-proxy-client.crt
- --admission-control=Initializers,NamespaceLifecycle,LimitRanger,ServiceAccount,DefaultStorageClass,DefaultTolerationSeconds,NodeRestriction,ResourceQuota,MutatingAdmissionWebhook,ValidatingAdmissionWebhook,PodNodeSelector
...

3、配置PodNodeSelector

在namespace的annotations中添加scheduler.alpha.kubernetes.io/node-selector: spark=spark。

apiVersion: v1
kind: Namespace
metadata:
annotations:
scheduler.alpha.kubernetes.io/node-selector: spark=spark
name: spark

完成以上配置後,可以通過spark-submit測試結果:

$ spark-submit
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark
--conf spark.kubernetes.namespace=spark
--master k8s://https://xxxx:6443
--deploy-mode cluster
--name spark-pi
--class org.apache.spark.examples.SparkPi
--conf spark.executor.instances=5
--conf spark.kubernetes.container.image=xxxx/library/spark:v2.3
http://xxxx:81/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar
"

眾所周知,Spark是一個快速、通用的大規模數據處理平臺,和Hadoop的MapReduce計算框架類似。但是相對於MapReduce,Spark憑藉其可伸縮、基於內存計算等特點,以及可以直接讀寫Hadoop上任何格式數據的優勢,使批處理更加高效,並有更低的延遲。實際上,Spark已經成為輕量級大數據快速處理的統一平臺。

Spark作為一個數據計算平臺和框架,更多的是關注Spark Application的管理,而底層實際的資源調度和管理更多的是依靠外部平臺的支持:

TalkingData的Spark On Kubernetes實踐

Spark官方支持四種Cluster Manager:Spark standalone cluster manager、Mesos、YARN和Kubernetes。由於我們TalkingData是使用Kubernetes作為資源的調度和管理平臺,所以Spark On Kubernetes對於我們是最好的解決方案。

如何搭建生產可用的Kubernetes集群

部署

目前市面上有很多搭建Kubernetes的方法,比如Scratch、Kubeadm、Minikube或者各種託管方案。因為我們需要簡單快速地搭建功能驗證集群,所以選擇了Kubeadm作為集群的部署工具。部署步驟很簡單,在master上執行:

kubeadm init

在node上執行:

kubeadm join --token : --discovery-token-ca-cert-hash sha256:

具體配置可見官方文檔:https://kubernetes.io/docs/setup/independent/create-cluster-kubeadm/。

需要注意的是由於國內網絡限制,很多鏡像無法從k8s.gcr.io獲取,我們需要將之替換為第三方提供的鏡像,比如:https://hub.docker.com/u/mirrorgooglecontainers/。

網絡

Kubernetes網絡默認是通過CNI實現,主流的CNI plugin有:Linux Bridge、MACVLAN、Flannel、Calico、Kube-router、Weave Net等。Flannel主要是使用VXLAN tunnel來解決pod間的網絡通信,Calico和Kube-router則是使用BGP。由於軟VXLAN對宿主機的性能和網絡有不小的損耗,BGP則對硬件交換機有一定的要求,且我們的基礎網絡是VXLAN實現的大二層,所以我們最終選擇了MACVLAN。

CNI MACVLAN的配置示例如下:

{
"name": "mynet",
"type": "macvlan",
"master": "eth0",
"ipam": {
"type": "host-local",
"subnet": "10.0.0.0/17",
"rangeStart": "10.0.64.1",
"rangeEnd": "10.0.64.126",
"gateway": "10.0.127.254",
"routes": [
{
"dst": "0.0.0.0/0"
},
{
"dst": "10.0.80.0/24",
"gw": "10.0.0.61"
}
]
}
}

Pod subnet是10.0.0.0/17,實際pod ip pool是10.0.64.0/20。cluster cidr是10.0.80.0/24。我們使用的IPAM是host-local,規則是在每個Kubernetes node上建立/25的子網,可以提供126個IP。我們還配置了一條到cluster cidr的靜態路由10.0.80.0/24,網關是宿主機。這是因為容器在macvlan配置下egress並不會通過宿主機的iptables,這點和Linux Bridge有較大區別。在Linux Bridge模式下,只要指定內核參數net.bridge.bridge-nf-call-iptables = 1,所有進入bridge的流量都會通過宿主機的iptables。經過分析kube-proxy,我們發現可以使用KUBE-FORWARD這個chain來進行pod到service的網絡轉發:

-A FORWARD -m comment --comment "kubernetes forward rules" -j KUBE-FORWARD
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -s 10.0.0.0/17 -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A KUBE-FORWARD -d 10.0.0.0/17 -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT

最後通過KUBE-SERVICES使用DNAT到後端的pod。pod訪問其他網段的話,就通過物理網關10.0.127.254。

還有一個需要注意的地方是出於kernel security的考慮,link物理接口的macvlan是無法直接和物理接口通信的,這就導致容器並不能將宿主機作為網關。我們採用了一個小技巧,避開了這個限制。我們從物理接口又創建了一個macvlan,將物理IP移到了這個接口上,物理接口只作為網絡入口:

$ cat /etc/sysconfig/network-scripts/ifcfg-eth0
DEVICE=eth0
IPV6INIT=no
BOOTPROTO=none
$ cat /etc/sysconfig/network-scripts/ifcfg-macvlan
DEVICE=macvlan
NAME=macvlan
BOOTPROTO=none
ONBOOT=yes
TYPE=macvlan
DEVICETYPE=macvlan
DEFROUTE=yes
PEERDNS=yes
PEERROUTES=yes
IPV4_FAILURE_FATAL=no
IPADDR=10.0.0.61
PREFIX=17
GATEWAY=10.0.127.254
MACVLAN_PARENT=eth0
MACVLAN_MODE=bridge

這樣兩個macvlan是可以互相通信的。

Kube-dns

默認配置下,Kubernetes使用kube-dns進行DNS解析和服務發現。但在實際使用時,我們發現在pod上通過service domain訪問service總是有5秒的延遲。使用tcpdump抓包,發現延遲出現在DNS AAAA。進一步排查,發現問題是由於netfilter在conntrack和SNAT時的Race Condition導致。簡言之,DNS A和AAAA記錄請求報文是並行發出的,這會導致netfilter在_nf_conntrack_confirm時認為第二個包是重複的(因為有相同的五元組),從而丟包。具體可看我提的issue:https://github.com/kubernetes/kubernetes/issues/62628。一個簡單的解決方案是在/etc/resolv.conf中增加options single-request-reopen,使DNS A和AAAA記錄請求報文使用不同的源端口。我提的PR在:https://github.com/kubernetes/kubernetes/issues/62628,大家可以參考。我們的解決方法是不使用Kubernetes service,設置hostNetwork=true使用宿主機網絡提供DNS服務。因為我們的基礎網絡是大二層,所以pod和node可以直接通信,這就避免了conntrack和SNAT。

Spark與Kubernetes集成

由於Spark的抽象設計,我們可以使用第三方資源管理平臺調度和管理Spark作業,比如Yarn、Mesos和Kubernetes。目前官方有一個experimental項目,可以將Spark運行在Kubernetes之上:https://spark.apache.org/docs/latest/running-on-kubernetes.html。

基本原理

TalkingData的Spark On Kubernetes實踐

當我們通過spark-submit將Spark作業提交到Kubernetes集群時,會執行以下流程:

  • Spark在Kubernetes pod中創建Spark driver
  • Driver調用Kubernetes API創建executor pods,executor pods執行作業代碼
  • 計算作業結束,executor pods回收並清理
  • driver pod處於completed狀態,保留日誌,直到Kubernetes GC或者手動清理

先決條件

  • Spark 2.3+
  • Kubernetes 1.6+
  • 具有Kubernetes pods的list, create, edit和delete權限
  • Kubernetes集群必須正確配置Kubernetes DNS[1]

如何集成

Docker鏡像

由於Spark driver和executor都運行在Kubernetes pod中,並且我們使用Docker作為container runtime enviroment,所以首先我們需要建立Spark的Docker鏡像。

在Spark distribution中已包含相應腳本和Dockerfile,可以通過以下命令構建鏡像:

$ ./bin/docker-image-tool.sh -r <repo> -t my-tag build
$ ./bin/docker-image-tool.sh -r <repo> -t my-tag push

提交作業

在構建Spark鏡像後,我們可以通過以下命令提交作業:

$ bin/spark-submit \\
--master k8s://https://: \\
--deploy-mode cluster \\
--name spark-pi \\
--class org.apache.spark.examples.SparkPi \\
--jars https://path/to/dependency1.jar,https://path/to/dependency2.jar
--files hdfs://host:port/path/to/file1,hdfs://host:port/path/to/file2
--conf spark.executor.instances=5 \\
--conf spark.kubernetes.container.image= \\
https://path/to/examples.jar

其中,Spark master是Kubernetes api server的地址,可以通過以下命令獲取:

$ kubectl cluster-info
Kubernetes master is running at http://127.0.0.1:6443

Spark的作業代碼和依賴,我們可以在--jars、--files和最後位置指定,協議支持http、https和HDFS。

執行提交命令後,會有以下輸出:

TalkingData的Spark On Kubernetes實踐

任務結束,會輸出:

TalkingData的Spark On Kubernetes實踐

訪問Spark Driver UI

我們可以在本地使用kubectl port-forward訪問Driver UI:

$ kubectl port-forward <driver-pod-name> 4040:4040

執行完後通過http://localhost:4040訪問。

訪問日誌

Spark的所有日誌都可以通過Kubernetes API和kubectl CLI進行訪問:

$ kubectl -n=<namespace> logs -f <driver-pod-name>

如何實現租戶和資源隔離

Kubernetes Namespace

在Kubernetes中,我們可以使用namespace在多用戶間實現資源分配、隔離和配額。Spark On Kubernetes同樣支持配置namespace創建Spark作業。

首先,創建一個Kubernetes namespace:

$ kubectl create namespace spark

由於我們的Kubernetes集群使用了RBAC,所以還需創建serviceaccount和綁定role:

$ kubectl create serviceaccount spark -n spark
$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=spark:spark --namespace=spark

並在spark-submit中新增以下配置:

$ bin/spark-submit \\
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \\
--conf spark.kubernetes.namespace=spark \\
...

資源隔離

考慮到我們Spark作業的一些特點和計算資源隔離,前期我們還是選擇了較穩妥的物理隔離方案。具體做法是為每個組提供單獨的Kubernetes namespace,計算任務都在各自namespace裡提交。計算資源以物理機為單位,折算成cpu和內存,納入Kubernetes統一管理。在Kubernetes集群裡,通過node label和PodNodeSelector將計算資源和namespace關聯。從而實現在提交Spark作業時,計算資源總是選擇namespace關聯的node。

具體做法如下:

1、創建node label

$ kubectl label nodes <node_name> spark:spark

2、開啟Kubernetes admission controller

我們是使用kubeadm安裝Kubernetes集群,所以修改/etc/kubernetes/manifests/kube-apiserver.yaml,在--admission-control後添加PodNodeSelector。

$ cat /etc/kubernetes/manifests/kube-apiserver.yaml
apiVersion: v1
kind: Pod
metadata:
annotations:
scheduler.alpha.kubernetes.io/critical-pod: ""
creationTimestamp: null
labels:
component: kube-apiserver
tier: control-plane
name: kube-apiserver
namespace: kube-system
spec:
containers:
- command:
- kube-apiserver
- --secure-port=6443
- --proxy-client-cert-file=/etc/kubernetes/pki/front-proxy-client.crt
- --admission-control=Initializers,NamespaceLifecycle,LimitRanger,ServiceAccount,DefaultStorageClass,DefaultTolerationSeconds,NodeRestriction,ResourceQuota,MutatingAdmissionWebhook,ValidatingAdmissionWebhook,PodNodeSelector
...

3、配置PodNodeSelector

在namespace的annotations中添加scheduler.alpha.kubernetes.io/node-selector: spark=spark。

apiVersion: v1
kind: Namespace
metadata:
annotations:
scheduler.alpha.kubernetes.io/node-selector: spark=spark
name: spark

完成以上配置後,可以通過spark-submit測試結果:

$ spark-submit
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark
--conf spark.kubernetes.namespace=spark
--master k8s://https://xxxx:6443
--deploy-mode cluster
--name spark-pi
--class org.apache.spark.examples.SparkPi
--conf spark.executor.instances=5
--conf spark.kubernetes.container.image=xxxx/library/spark:v2.3
http://xxxx:81/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar
TalkingData的Spark On Kubernetes實踐

"

眾所周知,Spark是一個快速、通用的大規模數據處理平臺,和Hadoop的MapReduce計算框架類似。但是相對於MapReduce,Spark憑藉其可伸縮、基於內存計算等特點,以及可以直接讀寫Hadoop上任何格式數據的優勢,使批處理更加高效,並有更低的延遲。實際上,Spark已經成為輕量級大數據快速處理的統一平臺。

Spark作為一個數據計算平臺和框架,更多的是關注Spark Application的管理,而底層實際的資源調度和管理更多的是依靠外部平臺的支持:

TalkingData的Spark On Kubernetes實踐

Spark官方支持四種Cluster Manager:Spark standalone cluster manager、Mesos、YARN和Kubernetes。由於我們TalkingData是使用Kubernetes作為資源的調度和管理平臺,所以Spark On Kubernetes對於我們是最好的解決方案。

如何搭建生產可用的Kubernetes集群

部署

目前市面上有很多搭建Kubernetes的方法,比如Scratch、Kubeadm、Minikube或者各種託管方案。因為我們需要簡單快速地搭建功能驗證集群,所以選擇了Kubeadm作為集群的部署工具。部署步驟很簡單,在master上執行:

kubeadm init

在node上執行:

kubeadm join --token : --discovery-token-ca-cert-hash sha256:

具體配置可見官方文檔:https://kubernetes.io/docs/setup/independent/create-cluster-kubeadm/。

需要注意的是由於國內網絡限制,很多鏡像無法從k8s.gcr.io獲取,我們需要將之替換為第三方提供的鏡像,比如:https://hub.docker.com/u/mirrorgooglecontainers/。

網絡

Kubernetes網絡默認是通過CNI實現,主流的CNI plugin有:Linux Bridge、MACVLAN、Flannel、Calico、Kube-router、Weave Net等。Flannel主要是使用VXLAN tunnel來解決pod間的網絡通信,Calico和Kube-router則是使用BGP。由於軟VXLAN對宿主機的性能和網絡有不小的損耗,BGP則對硬件交換機有一定的要求,且我們的基礎網絡是VXLAN實現的大二層,所以我們最終選擇了MACVLAN。

CNI MACVLAN的配置示例如下:

{
"name": "mynet",
"type": "macvlan",
"master": "eth0",
"ipam": {
"type": "host-local",
"subnet": "10.0.0.0/17",
"rangeStart": "10.0.64.1",
"rangeEnd": "10.0.64.126",
"gateway": "10.0.127.254",
"routes": [
{
"dst": "0.0.0.0/0"
},
{
"dst": "10.0.80.0/24",
"gw": "10.0.0.61"
}
]
}
}

Pod subnet是10.0.0.0/17,實際pod ip pool是10.0.64.0/20。cluster cidr是10.0.80.0/24。我們使用的IPAM是host-local,規則是在每個Kubernetes node上建立/25的子網,可以提供126個IP。我們還配置了一條到cluster cidr的靜態路由10.0.80.0/24,網關是宿主機。這是因為容器在macvlan配置下egress並不會通過宿主機的iptables,這點和Linux Bridge有較大區別。在Linux Bridge模式下,只要指定內核參數net.bridge.bridge-nf-call-iptables = 1,所有進入bridge的流量都會通過宿主機的iptables。經過分析kube-proxy,我們發現可以使用KUBE-FORWARD這個chain來進行pod到service的網絡轉發:

-A FORWARD -m comment --comment "kubernetes forward rules" -j KUBE-FORWARD
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -s 10.0.0.0/17 -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A KUBE-FORWARD -d 10.0.0.0/17 -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT

最後通過KUBE-SERVICES使用DNAT到後端的pod。pod訪問其他網段的話,就通過物理網關10.0.127.254。

還有一個需要注意的地方是出於kernel security的考慮,link物理接口的macvlan是無法直接和物理接口通信的,這就導致容器並不能將宿主機作為網關。我們採用了一個小技巧,避開了這個限制。我們從物理接口又創建了一個macvlan,將物理IP移到了這個接口上,物理接口只作為網絡入口:

$ cat /etc/sysconfig/network-scripts/ifcfg-eth0
DEVICE=eth0
IPV6INIT=no
BOOTPROTO=none
$ cat /etc/sysconfig/network-scripts/ifcfg-macvlan
DEVICE=macvlan
NAME=macvlan
BOOTPROTO=none
ONBOOT=yes
TYPE=macvlan
DEVICETYPE=macvlan
DEFROUTE=yes
PEERDNS=yes
PEERROUTES=yes
IPV4_FAILURE_FATAL=no
IPADDR=10.0.0.61
PREFIX=17
GATEWAY=10.0.127.254
MACVLAN_PARENT=eth0
MACVLAN_MODE=bridge

這樣兩個macvlan是可以互相通信的。

Kube-dns

默認配置下,Kubernetes使用kube-dns進行DNS解析和服務發現。但在實際使用時,我們發現在pod上通過service domain訪問service總是有5秒的延遲。使用tcpdump抓包,發現延遲出現在DNS AAAA。進一步排查,發現問題是由於netfilter在conntrack和SNAT時的Race Condition導致。簡言之,DNS A和AAAA記錄請求報文是並行發出的,這會導致netfilter在_nf_conntrack_confirm時認為第二個包是重複的(因為有相同的五元組),從而丟包。具體可看我提的issue:https://github.com/kubernetes/kubernetes/issues/62628。一個簡單的解決方案是在/etc/resolv.conf中增加options single-request-reopen,使DNS A和AAAA記錄請求報文使用不同的源端口。我提的PR在:https://github.com/kubernetes/kubernetes/issues/62628,大家可以參考。我們的解決方法是不使用Kubernetes service,設置hostNetwork=true使用宿主機網絡提供DNS服務。因為我們的基礎網絡是大二層,所以pod和node可以直接通信,這就避免了conntrack和SNAT。

Spark與Kubernetes集成

由於Spark的抽象設計,我們可以使用第三方資源管理平臺調度和管理Spark作業,比如Yarn、Mesos和Kubernetes。目前官方有一個experimental項目,可以將Spark運行在Kubernetes之上:https://spark.apache.org/docs/latest/running-on-kubernetes.html。

基本原理

TalkingData的Spark On Kubernetes實踐

當我們通過spark-submit將Spark作業提交到Kubernetes集群時,會執行以下流程:

  • Spark在Kubernetes pod中創建Spark driver
  • Driver調用Kubernetes API創建executor pods,executor pods執行作業代碼
  • 計算作業結束,executor pods回收並清理
  • driver pod處於completed狀態,保留日誌,直到Kubernetes GC或者手動清理

先決條件

  • Spark 2.3+
  • Kubernetes 1.6+
  • 具有Kubernetes pods的list, create, edit和delete權限
  • Kubernetes集群必須正確配置Kubernetes DNS[1]

如何集成

Docker鏡像

由於Spark driver和executor都運行在Kubernetes pod中,並且我們使用Docker作為container runtime enviroment,所以首先我們需要建立Spark的Docker鏡像。

在Spark distribution中已包含相應腳本和Dockerfile,可以通過以下命令構建鏡像:

$ ./bin/docker-image-tool.sh -r <repo> -t my-tag build
$ ./bin/docker-image-tool.sh -r <repo> -t my-tag push

提交作業

在構建Spark鏡像後,我們可以通過以下命令提交作業:

$ bin/spark-submit \\
--master k8s://https://: \\
--deploy-mode cluster \\
--name spark-pi \\
--class org.apache.spark.examples.SparkPi \\
--jars https://path/to/dependency1.jar,https://path/to/dependency2.jar
--files hdfs://host:port/path/to/file1,hdfs://host:port/path/to/file2
--conf spark.executor.instances=5 \\
--conf spark.kubernetes.container.image= \\
https://path/to/examples.jar

其中,Spark master是Kubernetes api server的地址,可以通過以下命令獲取:

$ kubectl cluster-info
Kubernetes master is running at http://127.0.0.1:6443

Spark的作業代碼和依賴,我們可以在--jars、--files和最後位置指定,協議支持http、https和HDFS。

執行提交命令後,會有以下輸出:

TalkingData的Spark On Kubernetes實踐

任務結束,會輸出:

TalkingData的Spark On Kubernetes實踐

訪問Spark Driver UI

我們可以在本地使用kubectl port-forward訪問Driver UI:

$ kubectl port-forward <driver-pod-name> 4040:4040

執行完後通過http://localhost:4040訪問。

訪問日誌

Spark的所有日誌都可以通過Kubernetes API和kubectl CLI進行訪問:

$ kubectl -n=<namespace> logs -f <driver-pod-name>

如何實現租戶和資源隔離

Kubernetes Namespace

在Kubernetes中,我們可以使用namespace在多用戶間實現資源分配、隔離和配額。Spark On Kubernetes同樣支持配置namespace創建Spark作業。

首先,創建一個Kubernetes namespace:

$ kubectl create namespace spark

由於我們的Kubernetes集群使用了RBAC,所以還需創建serviceaccount和綁定role:

$ kubectl create serviceaccount spark -n spark
$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=spark:spark --namespace=spark

並在spark-submit中新增以下配置:

$ bin/spark-submit \\
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \\
--conf spark.kubernetes.namespace=spark \\
...

資源隔離

考慮到我們Spark作業的一些特點和計算資源隔離,前期我們還是選擇了較穩妥的物理隔離方案。具體做法是為每個組提供單獨的Kubernetes namespace,計算任務都在各自namespace裡提交。計算資源以物理機為單位,折算成cpu和內存,納入Kubernetes統一管理。在Kubernetes集群裡,通過node label和PodNodeSelector將計算資源和namespace關聯。從而實現在提交Spark作業時,計算資源總是選擇namespace關聯的node。

具體做法如下:

1、創建node label

$ kubectl label nodes <node_name> spark:spark

2、開啟Kubernetes admission controller

我們是使用kubeadm安裝Kubernetes集群,所以修改/etc/kubernetes/manifests/kube-apiserver.yaml,在--admission-control後添加PodNodeSelector。

$ cat /etc/kubernetes/manifests/kube-apiserver.yaml
apiVersion: v1
kind: Pod
metadata:
annotations:
scheduler.alpha.kubernetes.io/critical-pod: ""
creationTimestamp: null
labels:
component: kube-apiserver
tier: control-plane
name: kube-apiserver
namespace: kube-system
spec:
containers:
- command:
- kube-apiserver
- --secure-port=6443
- --proxy-client-cert-file=/etc/kubernetes/pki/front-proxy-client.crt
- --admission-control=Initializers,NamespaceLifecycle,LimitRanger,ServiceAccount,DefaultStorageClass,DefaultTolerationSeconds,NodeRestriction,ResourceQuota,MutatingAdmissionWebhook,ValidatingAdmissionWebhook,PodNodeSelector
...

3、配置PodNodeSelector

在namespace的annotations中添加scheduler.alpha.kubernetes.io/node-selector: spark=spark。

apiVersion: v1
kind: Namespace
metadata:
annotations:
scheduler.alpha.kubernetes.io/node-selector: spark=spark
name: spark

完成以上配置後,可以通過spark-submit測試結果:

$ spark-submit
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark
--conf spark.kubernetes.namespace=spark
--master k8s://https://xxxx:6443
--deploy-mode cluster
--name spark-pi
--class org.apache.spark.examples.SparkPi
--conf spark.executor.instances=5
--conf spark.kubernetes.container.image=xxxx/library/spark:v2.3
http://xxxx:81/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar
TalkingData的Spark On Kubernetes實踐

TalkingData的Spark On Kubernetes實踐

我們可以看到,Spark作業全分配到了關聯的hadooptest-001到003三個node上。

待解決問題

Kubernetes HA

Kubernetes的集群狀態基本都保存在etcd中,所以etcd是HA的關鍵所在。由於我們目前還處在半生產狀態,HA這方面未過多考慮。有興趣的同學可以查看:https://kubernetes.io/docs/setup/independent/high-availability/。

日誌

在Spark On Yarn下,可以開啟yarn.log-aggregation-enable將日誌收集聚合到HDFS中,以供查看。但是在Spark On Kubernetes中,則缺少這種日誌收集機制,我們只能通過Kubernetes pod的日誌輸出,來查看Spark的日誌:

$ kubectl -n=<namespace> logs -f <driver-pod-name>

收集和聚合日誌,我們後面會和ES結合。

監控

我們TalkingData內部有自己的監控平臺OWL[2](已開源),未來我們計劃編寫metric plugin,將Kubernetes接入OWL中。

混合部署

為了保證Spark作業時刻有可用的計算資源,我們前期採用了物理隔離的方案。顯而易見,這種方式大幅降低了物理資源的使用率。下一步我們計劃採用混部方案,通過以下三種方式實現:

  • 將HDFS和Kubernetes混合部署
  • 為Spark作業和Kubernetes node劃分優先級,在低優先級的node上同時運行一些無狀態的其他生產服務
  • 利用雲實現資源水平擴展,以防止資源突增

資源擴展

在採用以下兩種方法增加資源使用率時,集群可能會面臨資源短缺和可用性的問題:

  • 混合部署
  • 資源超賣

這會導致運行資源大於實際物理資源的情況(我稱之為資源擠兌)。一種做法是給資源劃分等級,優先保證部分等級的資源供給。另一種做法是實現資源的水平擴展,動態補充可用資源,並在峰值過後自動釋放。我在另一篇文章中闡述了這種設計理念:https://xiaoxubeii.github.io/articles/k8s-on-cloud/。

TalkingData有自研的多雲管理平臺,我們的解決方法是實現單獨的Kubernetes tdcloud-controller-manager作為資源的provider和manager,通過TalkingData OWL監控告警,實現資源的水平擴展。

原文鏈接:http://www.ipshop.xyz/7889.html

作者:開源大數據EMR

"

相關推薦

推薦中...