基于任何平臺(tái)實(shí)現(xiàn)的云盤系統(tǒng),面臨的首要的技術(shù)問(wèn)題就是客戶端上傳和下載效率優(yōu)化問(wèn)題。基于Hadoop實(shí)現(xiàn)的云盤系統(tǒng),受到Hadoop文件讀寫機(jī)制的影響,采用Hadoop提供的API進(jìn)行HDFS文件系統(tǒng)訪問(wèn),文件讀取時(shí)默認(rèn)是順序、逐block讀取;寫入時(shí)是順序?qū)懭搿?/p>
一、讀寫機(jī)制
首先來(lái)看文件讀取機(jī)制:盡管DataNode實(shí)現(xiàn)了文件存儲(chǔ)空間的水平擴(kuò)展和多副本機(jī)制,但是針對(duì)單個(gè)具體文件的讀取,Hadoop默認(rèn)的API接口并沒(méi)有提供多DataNode的并行讀取機(jī)制;贖adoop提供的API接口實(shí)現(xiàn)的云盤客戶端也自然面臨同樣的問(wèn)題。Hadoop的文件讀取流程如下圖所示:
使用HDFS提供的客戶端開(kāi)發(fā)庫(kù),向遠(yuǎn)程的Namenode發(fā)起RPC請(qǐng)求;
Namenode會(huì)視情況返回文件的部分或者全部block列表,對(duì)于每個(gè)block,Namenode都會(huì)返回有該block拷貝的datanode地址;
客戶端開(kāi)發(fā)庫(kù)會(huì)選取離客戶端最接近的datanode來(lái)讀取block;
讀取完當(dāng)前block的數(shù)據(jù)后,關(guān)閉與當(dāng)前的datanode連接,并為讀取下一個(gè)block尋找最佳的datanode;
當(dāng)讀完列表的block后,且文件讀取還沒(méi)有結(jié)束,客戶端開(kāi)發(fā)庫(kù)會(huì)繼續(xù)向Namenode獲取下一批的block列表。
讀取完一個(gè)block都會(huì)進(jìn)行checksum驗(yàn)證,如果讀取datanode時(shí)出現(xiàn)錯(cuò)誤,客戶端會(huì)通知Namenode,然后再?gòu)南乱粋(gè)擁有該block拷貝的datanode繼續(xù)讀取。
這里需要注意的關(guān)鍵點(diǎn)是:多個(gè)Datanode順序讀取。
其次再看文件的寫入機(jī)制:
使用HDFS提供的客戶端開(kāi)發(fā)庫(kù),向遠(yuǎn)程的Namenode發(fā)起RPC請(qǐng)求;
Namenode會(huì)檢查要?jiǎng)?chuàng)建的文件是否已經(jīng)存在,創(chuàng)建者是否有權(quán)限進(jìn)行操作,成功則會(huì)為文件創(chuàng)建一個(gè)記錄,否則會(huì)讓客戶端拋出異常;
當(dāng)客戶端開(kāi)始寫入文件的時(shí)候,開(kāi)發(fā)庫(kù)會(huì)將文件切分成多個(gè)packets,并在內(nèi)部以"data queue"的形式管理這些packets,并向Namenode申請(qǐng)新的blocks,獲取用來(lái)存儲(chǔ)replicas的合適的datanodes列表, 列表的大小根據(jù)在Namenode中對(duì)replication的設(shè)置而定。開(kāi)始以pipeline(管道)的形式將packet寫入所有的replicas中。開(kāi)發(fā)庫(kù)把packet以流的方式寫入第一個(gè) datanode,該datanode把該packet存儲(chǔ)之后,再將其傳遞給在此pipeline中的下一個(gè)datanode,直到最后一個(gè) datanode,這種寫數(shù)據(jù)的方式呈流水線的形式。
最后一個(gè)datanode成功存儲(chǔ)之后會(huì)返回一個(gè)ack packet,在pipeline里傳遞至客戶端,在客戶端的開(kāi)發(fā)庫(kù)內(nèi)部維護(hù)著"ack queue",成功收到datanode返回的ack packet后會(huì)從"ack queue"移除相應(yīng)的packet。
如果傳輸過(guò)程中,有某個(gè)datanode出現(xiàn)了故障,那么當(dāng)前的pipeline會(huì)被關(guān)閉,出現(xiàn)故障的datanode會(huì)從當(dāng)前的 pipeline中移除,剩余的block會(huì)繼續(xù)剩下的datanode中繼續(xù)以pipeline的形式傳輸,同時(shí)Namenode會(huì)分配一個(gè)新的 datanode,保持replicas設(shè)定的數(shù)量。
關(guān)鍵詞:開(kāi)發(fā)庫(kù)把packet以流的方式寫入第一個(gè)datanode,該datanode將其傳遞給pipeline中的下一個(gè)datanode,知道最后一個(gè)Datanode,這種寫數(shù)據(jù)的方式呈流水線方式。
二、解決方案
1.下載效率優(yōu)化
通過(guò)以上讀寫機(jī)制的分析,我們可以發(fā)現(xiàn)基于Hadoop實(shí)現(xiàn)的云盤客戶段下載效率的優(yōu)化可以從兩個(gè)層級(jí)著手:
1.文件整體層面:采用并行訪問(wèn)多線程(多進(jìn)程)份多文件并行讀取。
2.Block塊讀取:改寫Hadoop接口擴(kuò)展,多Block并行讀取。
2.上傳效率優(yōu)化
上傳效率優(yōu)化只能采用文件整體層面的并行處理,不支持分Block機(jī)制的多Block并行讀取。
HDFS處理大量小文件時(shí)的問(wèn)題
小文件指的是那些size比HDFS 的block size(默認(rèn)64M)小的多的文件。如果在HDFS中存儲(chǔ)小文件,那么在HDFS中肯定會(huì)含有許許多多這樣的小文件(不然就不會(huì)用hadoop了)。
而HDFS的問(wèn)題在于無(wú)法很有效的處理大量小文件。
任何一個(gè)文件,目錄和block,在HDFS中都會(huì)被表示為一個(gè)object存儲(chǔ)在namenode的內(nèi)存中,沒(méi)一個(gè)object占用150 bytes的內(nèi)存空間。所以,如果有10million個(gè)文件,
沒(méi)一個(gè)文件對(duì)應(yīng)一個(gè)block,那么就將要消耗namenode 3G的內(nèi)存來(lái)保存這些block的信息。如果規(guī)模再大一些,那么將會(huì)超出現(xiàn)階段計(jì)算機(jī)硬件所能滿足的極限。
不僅如此,HDFS并不是為了有效的處理大量小文件而存在的。它主要是為了流式的訪問(wèn)大文件而設(shè)計(jì)的。對(duì)小文件的讀取通常會(huì)造成大量從
datanode到datanode的seeks和hopping來(lái)retrieve文件,而這樣是非常的低效的一種訪問(wèn)方式。
大量小文件在mapreduce中的問(wèn)題
Map tasks通常是每次處理一個(gè)block的input(默認(rèn)使用FileInputFormat)。如果文件非常的小,并且擁有大量的這種小文件,那么每一個(gè)map task都僅僅處理了非常小的input數(shù)據(jù),
并且會(huì)產(chǎn)生大量的map tasks,每一個(gè)map task都會(huì)消耗一定量的bookkeeping的資源。比較一個(gè)1GB的文件,默認(rèn)block size為64M,和1Gb的文件,沒(méi)一個(gè)文件100KB,
那么后者沒(méi)一個(gè)小文件使用一個(gè)map task,那么job的時(shí)間將會(huì)十倍甚至百倍慢于前者。
hadoop中有一些特性可以用來(lái)減輕這種問(wèn)題:可以在一個(gè)JVM中允許task reuse,以支持在一個(gè)JVM中運(yùn)行多個(gè)map task,以此來(lái)減少一些JVM的啟動(dòng)消耗
(通過(guò)設(shè)置mapred.job.reuse.jvm.num.tasks屬性,默認(rèn)為1,-1為無(wú)限制)。另一種方法為使用MultiFileInputSplit,它可以使得一個(gè)map中能夠處理多個(gè)split。
為什么會(huì)產(chǎn)生大量的小文件?
至少有兩種情況下會(huì)產(chǎn)生大量的小文件
1. 這些小文件都是一個(gè)大的邏輯文件的pieces。由于HDFS僅僅在不久前才剛剛支持對(duì)文件的append,因此以前用來(lái)向unbounde files(例如log文件)添加內(nèi)容的方式都是通過(guò)將這些數(shù)據(jù)用許多chunks的方式寫入HDFS中。
2. 文件本身就是很小。例如許許多多的小圖片文件。每一個(gè)圖片都是一個(gè)獨(dú)立的文件。并且沒(méi)有一種很有效的方法來(lái)將這些文件合并為一個(gè)大的文件
這兩種情況需要有不同的解決方 式。對(duì)于第一種情況,文件是由許許多多的records組成的,那么可以通過(guò)件邪行的調(diào)用HDFS的sync()方法(和append方法結(jié)合使用)來(lái)解 決;蛘,可以通過(guò)些一個(gè)程序來(lái)專門合并這些小文件(see Nathan Marz’s post about a tool called the Consolidator which does exactly this).
對(duì)于第二種情況,就需要某種形式的容器來(lái)通過(guò)某種方式來(lái)group這些file。hadoop提供了一些選擇:
* HAR files
Hadoop Archives (HAR files)是在0.18.0版本中引入的,它的出現(xiàn)就是為了緩解大量小文件消耗namenode內(nèi)存的問(wèn)題。HAR文件是通過(guò)在HDFS上構(gòu)建一個(gè)層次化的文件系統(tǒng)來(lái)工作。一個(gè)HAR文件是通過(guò)hadoop的archive命令來(lái)創(chuàng)建,而這個(gè)命令實(shí) 際上也是運(yùn)行了一個(gè)MapReduce任務(wù)來(lái)將小文件打包成HAR。對(duì)于client端來(lái)說(shuō),使用HAR文件沒(méi)有任何影響。所有的原始文件都 visible && accessible(using har://URL)。但在HDFS端它內(nèi)部的文件數(shù)減少了。
通 過(guò)HAR來(lái)讀取一個(gè)文件并不會(huì)比直接從HDFS中讀取文件高效,而且實(shí)際上可能還會(huì)稍微低效一點(diǎn),因?yàn)閷?duì)每一個(gè)HAR文件的訪問(wèn)都需要完成兩層index 文件的讀取和文件本身數(shù)據(jù)的讀取(見(jiàn)上圖)。并且盡管HAR文件可以被用來(lái)作為MapReduce job的input,但是并沒(méi)有特殊的方法來(lái)使maps將HAR文件中打包的文件當(dāng)作一個(gè)HDFS文件處理。 可以考慮通過(guò)創(chuàng)建一種input format,利用HAR文件的優(yōu)勢(shì)來(lái)提高M(jìn)apReduce的效率,但是目前還沒(méi)有人作這種input format。 需要注意的是:MultiFileInputSplit,即使在HADOOP-4565的改進(jìn)(choose files in a split that are node local),但始終還是需要seek per small file。
* Sequence Files
通 常對(duì)于“the small files problem”的回應(yīng)會(huì)是:使用SequenceFile。這種方法是說(shuō),使用filename作為key,并且file contents作為value。實(shí)踐中這種方式非常管用;氐10000個(gè)100KB的文件,可以寫一個(gè)程序來(lái)將這些小文件寫入到一個(gè)單獨(dú)的 SequenceFile中去,然后就可以在一個(gè)streaming fashion(directly or using mapreduce)中來(lái)使用這個(gè)sequenceFile。不僅如此,SequenceFiles也是splittable的,所以mapreduce 可以break them into chunks,并且分別的被獨(dú)立的處理。和HAR不同的是,這種方式還支持壓縮。block的壓縮在許多情況下都是最好的選擇,因?yàn)樗鼘⒍鄠(gè) records壓縮到一起,而不是一個(gè)record一個(gè)壓縮。
將已有的許多小文件轉(zhuǎn)換成一個(gè)SequenceFiles可能會(huì)比較慢。但 是,完全有可能通過(guò)并行的方式來(lái)創(chuàng)建一個(gè)一系列的SequenceFiles。(Stuart Sierra has written a very useful post about converting a tar file into a SequenceFile — tools like this are very useful).更進(jìn)一步,如果有可能最好設(shè)計(jì)自己的數(shù)據(jù)pipeline來(lái)將數(shù)據(jù)直接寫入一個(gè)SequenceFile。