傳說中的Hadoop,我終於來對著你唱"征服"了,好可愛的小象,! J
總的來說,hadoop的思路比較簡單(map-reduce),就是將任務分開進行,最後彙總。但這個思路實現起來,比較複雜,但相對於幾年前Intel等硬件公司提出的網格運算等方式,顯得更加開放。
你難任你難,哥就是頭鐵!
Tip:實踐應用是核心,本文概念為主,有些部分可能會有些晦澀,直接跳過就好(不是特別重要)。
本文代碼實踐在: https://github.com/wanliwang/cayman/tree/master/cm-web 的test->backupcode->hadoop部分。
提到列式(Column Family)數據庫,就不得不提Google的BigTable,其開源版本就是我們熟知的HBASE。BigTable建立在谷歌的另兩個系統GFS和Chubby之上,這三個系統和分佈式計算編程模型MapReduce共同構成Google雲計算的基礎,Chubby解決主從自動切換的基礎。接下來通過一個表格對比來引入Hadoop。
Google雲計算 | Hadoop中的對應 |
分佈式文件系統GFS | HDFS,負責數據物理存儲 |
分佈式管理服務Chubby | Zookeeper,負責管理服務器 |
分佈式計算框架MapReduce | MapReduce,負責計算 |
分佈式數據庫BigTable | HBase,負責存取數據 |
Hadoop是有Apache Lucene的作者Boug Cutting開發的,其主體結構如下圖所示。
HDFS(Hadoop File System)
NameNode:整個文件系統的大腦,提供整個系統的目錄信息並管理各個數據服務器。
DataNode:分佈式文件系統中每一個文件被切割為若干數據塊,每個數據塊存儲在不同服務器,這些就是數據服務器。
Block:每個被切分的數據塊就是一段文件內容,其是基本的存儲單位,被稱為數據塊,典型大小為64MB。
Tip:由於硬件錯誤是常態,HDFS是很多臺Server的集合,因而錯誤檢測和恢復是核心功能;其以流式讀為主,做批量操作,關注數據訪問的高吞吐量。
HDFS採用 master/slave架構,一個HDFS集群由一個NameNode和若干DataNode組成,中心服務器NameNode負責管理文件系統的namespace和客戶端對文件的訪問。DataNode一般一個節點一個,負責管理節點上附帶的存儲。在內部,一個文件被分成一個或多個block,這些block存儲在DataNode集合中。NameNode和DataNode均可運行在廉價的linux機器上,HDFS由java語言開發,跨平臺好,總體結構示意圖如下所示。
複製:採用rack-aware策略改進數據可靠性和網絡帶寬的利用;NameNode決定每個DataNode的Rack id;大多數情況, replication因子是3,簡單來說就是將一個副本放在本地機架節點,一個副本放在同一機架另一個節點,最後一個放在不同機架;在讀取時,會選擇最近的副本;NameNode啟動時會進入SafeMode狀態,該狀態時,NameNode不會進行數據塊的複製,這是會檢測DataNode的副本數量,如果滿足要求則認為安全。
NameNode用於存儲元數據,任何修改均被Editlog記錄,通訊協議基於TCP/IP,可以通過java API調用。
安裝Hadoop,步驟如下所示
1 1.安裝jdk 2 2.安裝hadoop集群情況(創建對應的hadoop應用,用於統一管理, useradd Hadoop, passwd hadoop) 3 node0: 192.168.181.136(NameNode/JobTracker) 4 node1: 192.168.181.132(DataNode/TaskTracker) 5 node2: 192.168.181.133(DataNode / TaskTracker) 6 node3: 192.168.181.134(DataNode / TaskTracker) 7 etc/hosts和hostname設置, 如192.168.181.136 node0, #hostname node0 8 下載hadoop-1.2.1.tar包,放在/home/hadoop,入後修改權限 9 #wget http://mirror.esocc.com/apache/hadoop/common/hadoop-1.2.1/hadoop-1.2.1.tar.gz 10 #tar –zxvf Hadoop-1.2.1.tar 11 #chown –R Hadoop:Hadoop Hadoop-1.2.1 12 配置ssh無密碼登錄,在hadoop啟動後,NameNode通過SSH(sSecureShell)來啟動和停止各個Datanode上的各個守護進程,這就需要節點間執行指令無需密碼,因此需要配置SSH運用無密碼公鑰認證的方法。 13 在本例中,node0為主節點,需要連接node1,2,3,需要確認每臺機器安裝ssh,並且datanode上的sshd服務啟動。 14 #ssh-keygen –t rsa, 默認保存在/home/Hadoop/.ssh, 然後將其複製到每個機器的/home/Hadoop/.ssh/authorized_keys,命令如下,4臺機器都需要(顯得比較複雜,到時看看docker或者.sh腳本文件) 15 #su Hadoop 16 #cd /home/Hadoop 17 # ssh-keygen –t rsa 18 #cd .ssh 19 #cp id_rsa.pub authorized_keys 20 #ssh localhost 21 #ssh node0 22 在node0,1,2,3上交換公鑰 23 #scp authorized_keys [email protected]:/tmp, 複製keys到node1的/tmp目錄 24 #cat /tmp/authorized_keys>>/home/Hadoop/.ssh/authorized_keys 25 在node0上有了所有公鑰後,在複製node0上key到其他機器 26 #scp /home/Hadoop/.ssh/authorized_key[email protected]:/home/hadoop/.ssh 27 #chmod 644 authorized_keys,設置文件權限並測試 28 將當前用戶切換到hadoop,如果集群內機器環境一直,可以在一臺機器配置好後,用scp命令將master上的hadoop複製到每一個slave 29 修改hadoop-1.2.1/conf,配置hadoop-env.sh文件,添加JAVA_HOME路徑 30 配置conf/core-site.xml 31 <configuration> 32 <!-- NameNode的URI --> 33 <property> 34 <name>fs.default.name</name> 35 <value>hdfs://node0:49000</value> 36 </property> 37 <!-- hadoop時默認臨時路徑,如果在新增節點時DataNode無法啟動,就刪除此文件 --> 38 <property> 39 <name>hadoop.tmp.dir</name> 40 <value>/home/hadoop/hadoop-1.2.1/var</value> 41 </property> 42 <property> 43 <name>dfs.support.append</name> 44 <value>true</value> 45 </property> 46 <!-- 關閉權限檢查,方便之後使用hadoop-eclipse插件訪問hdfs --> 47 <property> 48 <name>dfs.permissions</name> 49 <value>false</value> 50 </property> 51 </configuration> 52 53 配置conf/mapred-site.xml 54 <configuration> 55 <!-- JobTracker的主機和端口 --> 56 <property> 57 <name>mapred.job.tracker</name> 58 <value>/node0:49001</value> 59 </property> 60 <!-- 目錄需要提前創建,注意使用chown -R來修改權限,為0777 --> 61 <property> 62 <name>mapred.local.dir</name> 63 <value>/home/hadoop/hadoop-1.2.1/var</value> 64 </property> 65 </configuration> 66 67 配置hdfs-site.xml 68 <configuration> 69 <!-- dir是NameNode持久存儲名字空間和事務日誌的本地文件系統路徑,當該值是一個逗號分隔的目錄列表是,nametable數據將會被複制到所有目錄中做冗餘備份 --> 70 <property> 71 <name>dfs.name.dir</name> 72 <value>/home/hadoop/name1</value> 73 </property> 74 <property> 75 <name>dfs.data.dir</name> 76 <value>/home/hadoop/data1</value> 77 </property> 78 <!-- 數據備份數量 --> 79 <property> 80 <name>dfs.replication</name> 81 <value>3</value> 82 </property> 83 </configuration> 84 85 配置主從節點 86 conf/masters: node0 87 conf/slaves: node1,2,3 88 啟動與測試 89 #hadoop namenode –format 90 #/home/hadoop/xxx/bin/start-all.sh View Code
在分佈式模式下,hadoop配置文件中不能使用ip,必須使用主機名,安裝hadoop必須在所有節點上使用相同配置和安裝路徑,並用相同用戶啟動。Hadoop中的HDFS和Map-Reduce可以分別啟動,NameNode和JobTracker可以部署到不同節點,但小集群一般在一起,注意元數據安全即可。
Hdfs常見操作,請見下表所示,在實踐中,一般都是通過API調用,瞭解下就好
命令 | 詮釋 | 命令 | 詮釋 |
#cat | Hadoop fs –cat uri輸出內容 | #chgrp | 修改文件所屬組 |
#chmod | 修改文件去哪先 | #chown | 修改文件擁有者 |
#put#copyFromLocal | 從本地文件系統複製到目標系統 | #get #getmerge#copToLocal | 複製文件到本地系統 Hadoop fs –get hdfs://host:port/user/Hadoop/file local file |
#cp | 複製文件 | #du,#dus | 顯示目錄、文件大小 |
#expunge | 清空回收站 | #ls, lsr | 顯示文件信息 |
#mv#movefromLocal | 移動文件 | #rm #rmr | 刪除文件 |
#mkdir | 創建目錄 | #setrep | 改變文件副本系數 |
#stat | 返回統計信息 hadoop fs –stat path | 其他 | #tail #touchz |
#test | #text |
通過Java調用hdfs的示例如下所示,其實就是一個文件系統
1 public void hdfsOpertion throws Exception { 2 Configuration conf = new Configuration; 3 FileSystem hdfs = FileSystem.get(conf); // 獲得HDFS文件系統對象 4 FileSystem local = FileSystem.getLocal(conf);// 獲得本地文件系統 5 Path inputDir = new Path("in_xxx"); 6 Path hdfsFile = new Path("fs_xxx"); 7 8 try { 9 FileStatus inputFiles = local.listStatus(inputDir);// 獲得目錄文件列表 10 FSDataOutputStream out = hdfs.create(hdfsFile);// 生成hdfs輸出流 11 for (int i = 0; i < inputFiles.length; i++) { 12 System.out.println(inputFiles[i].getPath.getName); 13 FSDataInputStream in = local.open(inputFiles[i].getPath);// 打開本地輸入流 14 byte buffer = new byte[256]; 15 int bytesRead = 0; 16 while ((bytesRead = in.read(buffer)) > 0) { 17 out.write(buffer, 0, bytesRead); 18 } 19 in.close; 20 } 21 out.close; 22 } catch (Exception ex) { 23 ex.printStackTrace; 24 } 25 } View Code
-
Map Reduce核心概念
Job: 用戶的每一個計算請求就是一個作業
JobTracker:用戶提交作業的服務器,同時它還負責各個作業任務的分配,管理所有的任務服務器。
Task:一個都需要拆分,交個多個服務器完成,拆分出來的執行單位就是任務
TaskTracker:就是任勞任怨的工人,負責執行具體的任務。
-
Map Reduce計算模型
在hadoop中,每一個MapReduce任務被初始化為一個Job,每個Job又被分為兩個階段:Map階段、Reduce階段。這兩個階段分別用兩個函數表示,Map函數接受一個<key,value>輸入,然後產生一個<key,value>的中間輸出;之後hadoop會將具有相同中間key的 value集合傳給Reduce函數,之後Reduce處理後得到<key,value>形式輸出。
在Java中接入Hadoop的配置與代碼如下所示。
1 Maven: 2 <dependency> 3 <groupId>org.apache.hadoop</groupId> 4 <artifactId>hadoop-common</artifactId> 5 <version>${hadoop.version}</version> 6 </dependency> 7 <dependency> 8 <groupId>org.apache.hadoop</groupId> 9 <artifactId>hadoop-hdfs</artifactId> 10 <version>${hadoop.version}</version> 11 </dependency> 12 <dependency> 13 <groupId>org.apache.hadoop</groupId> 14 <artifactId>hadoop-mapreduce-client-core</artifactId> 15 <version>${hadoop.version}</version> 16 </dependency> 17 18 Code: 19 public class WordCountNew extends Configured implements Tool { 20 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> { 21 private final static IntWritable one = new IntWritable(1); 22 private Text word = new Text; 23 24 @Override 25 protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) 26 throws IOException, InterruptedException { 27 String line = value.toString; 28 // 字符串分解器 29 StringTokenizer tokenizer = new StringTokenizer(line); 30 while (tokenizer.hasMoreTokens) { 31 word.set(tokenizer.nextToken); 32 context.write(word, one); 33 } 34 } 35 } 36 37 public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> { 38 @Override 39 protected void reduce(Text key, Iterable<IntWritable> values, 40 Reducer<Text, IntWritable, Text, IntWritable>.Context context) 41 throws IOException, InterruptedException { 42 int sum = 0; 43 for (IntWritable val : values) { 44 sum += val.get; 45 } 46 context.write(key, new IntWritable(sum)); 47 } 48 } 49 50 @Override 51 public int run(String[] arg0) throws Exception { 52 Job job = new Job(getConf); 53 job.setJarByClass(WordCountNew.class); 54 job.setJobName("wordcount"); 55 job.setOutputKeyClass(Text.class); 56 job.setOutputValueClass(IntWritable.class); 57 job.setMapperClass(Map.class); 58 job.setReducerClass(Reduce.class); 59 job.setInputFormatClass(TextInputFormat.class); 60 job.setOutputFormatClass(TextOutputFormat.class); 61 FileInputFormat.setInputPaths(job, new Path("xxx01.txt")); 62 FileOutputFormat.setOutputPath(job, new Path("xxx02.txt")); 63 boolean success = job.waitForCompletion(true); 64 return success ? 0 : 1; 65 } View Code
MapReduce的數據流和控制流
zookeeper主要用來解決分佈式應用中經常遇到的數據管理的問題,如 統一命名服務、狀態同步服務、集群管理和分佈式應用配置項 的管理, Zookeeper典型的應用場景(配置文件的管理、集群管理、同步鎖、Leader選舉和隊列管理等)。
Zookeeper配置安裝的步驟如下所示
下載zookeeper包 修改zoo.cfg配置文件 initLimit=5 #Zookeeper中鏈接到Leader的Follower服務器初始化鏈接能忍耐的心跳間隔數:5*2000=10秒 syncLimit=2 #請求應答的時間長度2*2000=4秒 dataDir=/home/Hadoop/zookeeper server.1=node1:2888:3888 #用於選舉時服務器相互通信的端口 server.2=node2:2888:3888 server.3=node3:2888:3888 配置myid文件,在dataDir目錄,判斷時哪個server 啟動zookeeper,bin/zkServer.sh start View Code
ZooKeeper數據模型,其會維護一個層次關係的數據結構,非常類似標準文件系統
ZooKeeper的基礎使用,其作為一個分佈式服務框架,主要用於解決分佈式集群的一致性問題,它提供類似文件系統目錄節點樹方式的數據存儲,並會維護和監控數據的狀態變化,其常見方法如下所示。
方法 | 詮釋 |
Stringcreate | 創建一個給點的目錄節點path並設置數據 |
Statexists | 判斷某個path是否存在,並設置監控這個目錄節點 |
Delete | 參數path對應目錄節點 |
StatsetData,getData | 設置數據,獲取數據 |
addAuthInfo | 將自己授權信息發送給服務器 |
StatsetACL,getACL | 設置目錄節點訪問權限,獲取權限列表 |
java調用zookeeper的API示例如下
1 public void testName throws Exception { 2 // 1.創建一個和服務器的鏈接 3 ZooKeeper zk = new ZooKeeper("localhost:" + CLIENT_PORT, CONNECT_TIMEOUT, new Watcher { 4 @Override 5 public void process(WatchedEvent event) { 6 System.out.println(String.format("已經觸發了%s事件", event.getType)); 7 } 8 }); 9 10 // 2.創建一個目錄節點 11 zk.create("/testRootPath", "testRootData".getBytes, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 12 // 3.創建子節點 13 zk.create("/testRootPath/testChildrenPathOne", "testChildrenPathOne".getBytes, Ids.OPEN_ACL_UNSAFE, 14 CreateMode.PERSISTENT); 15 System.out.println(new String(zk.getData("/testRootPath", false, null))); 16 // 4.查找子目錄列表 17 System.out.println(zk.getChildren("/testRootPath", true)); 18 // 5.修改子目錄節點數據 19 zk.setData("/testRootPath/testChildrenPathOne", "modifyChildrenPathOne".getBytes, -1); 20 System.out.println("目錄節點狀態:[" + zk.exists("/testRootPath", true) + "]"); 21 // 6.刪除節點 22 zk.delete("/testRootPath/testChildrenPathOne", -1); 23 // 7.關閉鏈接 24 zk.close; 25 } View Code
ZooKeeper的典型應用場景
統一命名服務(Name Service):分佈式應用,通常需要一整套的命名規則,一般使用樹形命名,這兒和JNDI很相似。
配置管理:ZooKeeper統一管理配置信息,保存在對應目錄,一旦變化,對應機器就會收到通知(觀察者)。
集群管理:ZooKeeper不僅能維護當前集群中及其的服務狀態,並能選出一個總管(Leader Election),從而避免單點故障,示例代碼如下。
共享鎖(Locks):共享鎖在同一個進程容易實現,但再不同Server見不好實現,但Zookeeper卻很容易實現,方式就是需要獲取鎖的Servere創建一個EPHEMERAL_SEQUENTIAL目錄節點,然後調用getChildren方法獲得當前目錄節點列表中最小的目錄節點,並判斷,如果未自己建立,則獲得鎖,如果不是就調用exist方法監控節點變化,一直到自己創建的節點時最小,從而獲得鎖,釋放很賤,只要刪除前面自己創建的目錄節點就OK。
隊列管理(Queue Management):可以處理兩類隊列,一種是當成員齊聚時,隊列才可用,否則一直等待,被稱為同步隊列;一種是按照FIFO方式進行入隊和出隊,例如實現生產-消費者模型。
HBase(邏輯結構)是BigTable的開源版,其建立在HDFS(物理結構)之上,提供高可靠性、高性能、 列存儲和可伸縮、實時讀寫的數據庫系統。它結餘NOSQL和RDBMS之間,僅能通過主鍵和主鍵range來檢索數據,支持單行事務(可通過hive支持來實現多表join等複雜操作),主要用於存儲非結構和半結構化的鬆散數據。與Hadoop一樣,Hbase主要依靠橫向擴展來提高計算和存儲能力。
Hbase的表具有以下特點:
大:一個表可以有上億行
面向列:面向列族的存儲和權限控制,列族獨立檢索。
稀疏:對於空的列,並不佔用空間,因此表可以設計的非常稀疏。
-
邏輯視圖:HBase以表的形式存儲數據,表由行和列組成,列劃分為若干個列族row family,如下表所示。
Row Key | Column-family1 | Column-family2 | |
Column1 | Column1 | Column1 | Column2 |
Key1 | t2:abc t1:bcd | t1:354 | |
Key2 | t3:efy t1:uio | t2:tyi t1:456 |
Row Key:檢索數據的主鍵,訪問HBase中的行,可以通過單個row key(字典序,數值型數據需要補0)訪問;通過row key的range的訪問;全表掃描。
列族: 表中的每一列,都歸屬於列族,列族是表schema的一部分,必須在使用前定義,而列不是,關鍵理解 。列名都以列族作為前綴,例如courses:history和courses:math都數據courses列族。
時間戳:通過row和column確定一個存儲單元cell,每個cell保存同一份數據的多個版本,通過時間戳來索引。時間戳為64位證書,精確到毫秒,按時間倒序排列。為了避免版本過多,一般通過個數或時間來回收。
Cell:由{row key, column(=<family>+<label>),version}唯一確定的單元,cell中數據沒有類型,以字節碼存儲。
-
物理存儲:指如何將大表分佈的存儲在多臺服務器。
特點:Table上所有行使用row key排列;Table在行方向上分割為多個HRegion;HRegion按大小分割,每個表已開始只有一個region,隨著數據不斷插入,region增大,當超過閾值是,會分裂成連個新的HRegion;HRegion是HBase中分佈式存儲和負載均衡最小單元,表示不同Region可以分佈在不同RegionServer上;HRegion是分佈式存儲的最小單元,但不是最小存儲單元,實際上,一個Region由多個Store組成,一個Store保存一個columns family, 一個Store又由一個memStore和0-多個StoreFile( 重點是StoreFile就是一個Hdfs中文件,通過壓縮存儲減少通信消耗,這兒就找到了對應關係 ,還可以細分,就不介紹了)組成。(腦海裡有了大體的印象)
-
系統架構
Client:包含訪問HBase接口,client維護一些cahce來加快訪問,比如region未知信息。
ZooKeeper:保證任何時候集群只有一個master;存儲所有region尋址接口;實施監控Region Server狀態,將其上下線消息實時通知給master;存儲Hbase的schema,包含哪些table,每個table的column family;為region server分配region;負責Region server的負載均衡;發現失效的Region Server並重新分配其上Region,GFS上的垃圾文件回收;處理schema更新請求。
Region Server:維護Master分配給它的Region,處理這些Region的IO請求;切分在運行中變得過大的Region。
Tip:可以看到client訪問HBase數據的過程並不需要master參與,尋址訪問zookeeper和Region Server,數據讀寫訪問Region Server,master只維護table和Region的元數據,負載低。
-
關鍵算法和流程
Region定位:大表使用三層類似B+樹的結構來存儲Region位置,第一次保存zookeeper中數據,持有RootRegion位置;第二層RootRegion是.META表的第一個Region,其中保存了其他Region的位置;第三層是個特殊的表,存儲HBase中所有數據表的Region位置信息。
讀寫過程:HBase使用MemStore和StoreFile存儲對錶的更新。數據在更新時首先寫入Log和MemStore,MemStore中的數據是排序的,當MemStore累計到一定閾值,會創建新MemStore,並將老MemStore添加到Flush隊列,有單獨線程寫到磁盤,稱為一個StoreFile,同時系統會在zookeeper記錄一個Redo point,表示更新已經持久化。系統出現問題是,可以使用log來恢復check point之後的數據。(思路和傳統數據庫一致)
Region分配:任何時刻,一個region只能分配給一個server,master記錄了當前可用的Server以及當前region的分配情況,當存在未分配region且有server有可用空間時,master就給這個server發送一個裝載請求,分配該region。
Region Server的上下線:master通過zookeeper來跟蹤region server狀態,當某個server啟動時,會在zookeeper的server目錄建立代表自己的文件,並獲得該文件獨佔鎖,由於master訂閱了該目錄的變更小心,因此當文件出現增刪時,可以接到通知。下線時,斷開與zookeeper會話,釋放獨佔鎖,這時master會發現並刪除對應目錄文件,並將原有region分配給其他server。
master的上下線:從zookeeper獲取唯一master鎖,阻止其他人稱為master;掃描zookeeper上server目錄,獲得region server列表;與每個server通信,獲得Region分配的情況;掃描META.region集合,計算得到當前未分配的region,放入待分配列表。
-
安裝與配置
1 下載文件wget xxxx/hbase.tar.gz, tar zxvf habse.xxx 2 修改配置文件hbase-env.sh 3 #export JAVA_HOME=user/local/java 4 #export HBASE_CLASSPATH=/home/Hadoop/Hadoop.xx/conf 5 #export HBASE_MANAGES_ZK=false 6 修改hbase-site.xml 7 <configuration> 8 <property> 9 <name>hbase.rootdir</name> 10 <value>hdfs://node0:49000/hbase</value> 11 </property> 12 <property> 13 <name>hbase.cluster.distributed</name> 14 <value>true</value> 15 </property> 16 <property> 17 <name>hbase.tmp.dir</name> 18 <value>/home/hadoop/hbase</value> 19 </property> 20 </configuration> 21 指定Hbase的regionServers 22 最後在系統上設置最大文件數限制和進程數限制/etc/security/limits/conf 23 #hadoop-nofile 3268 24 #hadoop soft/hard nproc 32000 View Code
-
常見操作
比如創建一個如下表格
#name | #grad | #course:math | #course:art |
Xionger | 1 | 62 | 60 |
xiongda | 2 | 100 | 98 |
1 create'scores', 'grade', 'course' #創建scores表,列族為grade,course 2 list #查看有哪些表 3 describe 'scores' #查看錶構造 4 put 'scores', 'xionger','course:math;, '62' #插入一條數據 5 get 'score' , 'xionger' #獲得熊二成績 6 scan 'scores'#掃描表中所有數據 7 scan 'scores', {columns=>['course:']} #獲得courses列族數據 View Code
Tip:
終於完成了,帥,這部分內容之後重點在於既有的集成解決方案,包括docker上的部署等。
此外,有空考慮區塊鏈方面的學習,同時把數據結構好好再學習下,感覺還是不太OK。,比如B+樹。