国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學(xué)院 > 開發(fā)設(shè)計 > 正文

跟天齊老師學(xué)Spark(7)--關(guān)于Spark的RDD

2019-11-11 03:16:30
字體:
供稿:網(wǎng)友
關(guān)于spark的RDD:關(guān)于RDD,可以查看官方文檔,可以看作者的論文,也可以看spark源碼中關(guān)于RDD的注釋。按Ctrl+N快捷鍵,搜索RDD,進入源碼,如果沒有關(guān)聯(lián)源碼,在IDEA中右上角會有一個提示:"Attach Sources".在IDEA中關(guān)聯(lián)spark的源碼,首先解壓下載好的spark源碼包(spark-1.6.2.tgz),然后在IDEA中選擇右上角的Attach Sources,在彈出的窗口中選擇自己解壓后的spark的源碼目錄即可。RDD:一個彈性、可復(fù)原的、分布式的數(shù)據(jù)集。它是spark的一個最基本的抽象。不可變的(一旦創(chuàng)建好了,在計算的時候是不可變,對它進行各種操作都只能生成新的RDD),被分區(qū)的(一個分區(qū)只能屬于一臺機器,但是一臺機器上可能有很多很多的分區(qū)),的集合,它可以被并行的計算。只有key-value格式的數(shù)據(jù)才可以使用groupByKey或者join。RDD的5個特點:數(shù)據(jù)是存放在多個分區(qū)里面的。(1)RDD中有很多的分區(qū),分區(qū)List是有序的(意味著如果你的數(shù)據(jù)很少,而分區(qū)很多,那么就可能有的分區(qū)中有數(shù)據(jù),有的可能沒有數(shù)據(jù));(2)一個函數(shù)會作用到每一臺機器上的每一個分區(qū)上面(split);(3)RDD和RDD之間是存在依賴關(guān)系的(為了容錯),最終的那一個RDD會觸發(fā)Action提交任務(wù),它會向前依次推斷出前面的RDD,然后一點點計算;可復(fù)原;這些RDD之間是有序的;(4)如果你的RDD里面是key-value類型的數(shù)據(jù),它一定會有一個Partitioner(分區(qū)器,它決定了這條數(shù)據(jù)屬于哪一個分區(qū),它默認(rèn)使用的分區(qū)器是hash分區(qū)器);如果不是key-value類型,他就沒有分區(qū)器;(在spark中沒有reduce概念,但它有一個partition概念,二者是相似的)(5)RDD里面會保存著一個最優(yōu)位置。也就是數(shù)據(jù)在哪,以后它的任務(wù)就啟動在哪;把計算調(diào)度到數(shù)據(jù)所在的機器上,位置感知,實現(xiàn)數(shù)據(jù)本地化;寧愿移動計算,而不會移動數(shù)據(jù);因為移動數(shù)據(jù)的代價很大,數(shù)據(jù)要消耗大量的網(wǎng)絡(luò)帶寬和磁盤io;(說明:如果我們是從hdfs這種分布式系統(tǒng)里面讀數(shù)據(jù),它會有一個最佳位置。它會在有數(shù)據(jù)的那臺機器上創(chuàng)建分區(qū),它在啟動Executor的時候,它還不知道這個數(shù)據(jù)在哪,它在創(chuàng)建分區(qū)的時候才會向我們的namenode進行交互,知道這個分區(qū)在哪臺機器上,然后在那臺機器上創(chuàng)建分區(qū)。后面看源碼的時候再介紹)--------------------------------------------------------------RDD的特點說明通過從hdfs中讀取數(shù)據(jù)來驗證RDD的一些特點首先textFile("")生成的RDD會有幾個分區(qū)呢?hdfs中的每一個block(每一個輸入切片)就會對應(yīng)spark中的RDD的一個分區(qū);通過rdd.partition.length來查看一個RDD的分區(qū)數(shù)。注意:它在分區(qū)的時候,會讓每一個分區(qū)中的數(shù)據(jù)量盡量被均勻分配。在從HDFS中讀取數(shù)據(jù)的時候,假如我們的hdfs中有兩個小文件,它會用一個RDD來讀,第一個分區(qū)是partition0,第二個分區(qū)是partition1,這樣hdfs中的文件和RDD中的分區(qū)是一一對應(yīng)的,分區(qū)位于Worker中的Executor進程中。這兩個分區(qū)可能在一個Executor上,也可能在不同機器的Executor上,但是一個分區(qū)里面的數(shù)據(jù)不可能在兩臺機器上。注意textFile方法并不會觸發(fā)Action,所以現(xiàn)在還不會真的去讀數(shù)據(jù),所以此時的分區(qū)中還沒有數(shù)據(jù)。但是它會記錄住每一個分區(qū)將要從哪個目錄下的哪個block中讀取數(shù)據(jù)。val rdd2 = rdd1.map((_, 1))如果沒有改變RDD分區(qū)的數(shù)量,那么新生成的子RDD中分區(qū)的數(shù)量會和父RDD的分區(qū)的數(shù)量一樣。RDD之間會記錄住它們之間的血緣關(guān)系。雖然現(xiàn)在還沒有出發(fā)Action,但是這些RDD會記住你調(diào)了什么方法,傳入了什么函數(shù)。rdd2.saveAsTextFile("")調(diào)動saveAsTextFile方法之后。觸發(fā)Action,開始提交任務(wù),它會從最后一個RDD往前推知道推到最前面的rdd1,才開始讀數(shù)據(jù),它好比從一個迭代器中讀數(shù)據(jù),好比一個流水線,讀一條處理一條。我們調(diào)rdd上的map方法(這個rdd上的map方法是針對多臺機器的一個抽象方法),其實它最終會調(diào)每一個分區(qū)上的那個map方法(MapPartitionsRDD),然后這個分區(qū)上的map方法會調(diào)scala的map方法。在hdfs中就會產(chǎn)生兩個結(jié)果文件,因為它有兩個分區(qū)。打印RDD之間的依賴關(guān)系的方法:rdd.toDebugString驗證分區(qū)器:有一個getPartition方法,用key的哈希code值對分區(qū)的數(shù)量求模。val rdd3 = rdd2.repartition(2)這樣就給rdd2重新分區(qū)。rdd3.saveAsTextFile("")發(fā)現(xiàn)結(jié)果不一樣了。因為重新分區(qū)會有一個shuffer的過程;import org.apache.spark.HashPartitionerrdd.partitonBy(new HashPartitioner(2))rdd3.saveAsTextFile("")再看生成的結(jié)果文件中的數(shù)據(jù),發(fā)現(xiàn)數(shù)據(jù)被分類了,key相同的數(shù)據(jù)在一個文件中。說明我們設(shè)置的分區(qū)器發(fā)揮作用了。調(diào)用partitonBy方法會生成一個新的RDD,叫做ShuffleRDD。它會用key的hashcode對分區(qū)數(shù)求模。這樣key相同的數(shù)據(jù)就會進入同一個分區(qū)。對每一個分區(qū)的數(shù)據(jù)進行局部求和,最后再匯總。val rdd3 = rdd2.reduceByKey(_+_)它里面也有一個默認(rèn)的分區(qū)器。rdd3.saveAsTextFile("")結(jié)果文件還是兩個。說明它把key相同的數(shù)據(jù)Shuffle到同一個分區(qū)中然后再分別聚合。//在使用reduceByKey的時候,指定分區(qū)數(shù)量val rdd3 = rdd2.reduceByKey(_+_, 1)rdd3.saveAsTextFile("")此時hdfs中只有一個結(jié)果文件。因為指定了只有一個分區(qū),所以說有的數(shù)據(jù)都被Shuffle到同一個分區(qū)中。

上面的一些實驗測試,是為了驗證RDD的一些特征。

RDD上的一些復(fù)雜的方法:將老師發(fā)的文件spark_rdd_api.txt文件中的練習(xí)一下即可。查看分區(qū)中的數(shù)據(jù)是什么:val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)rdd1.mapPartitionsWithIndex(func).collect常用函數(shù):math.max(_,_)和math.min(_, _)RDD并沒有定義reduceByKey方法,但是它能調(diào)這個方法,其實它是在PairRDDFunction中定義的。它將普通的RDD轉(zhuǎn)換成了PairRDDFunction,在RDD單例對象中就定義了一個rddToPairRDDFunction方法。在SparkContext(注意是object)中定義了很多implicit,其中就包含rddToPairRDDFunction方法,它已經(jīng)被廢棄,它調(diào)用的就是RDD單例對象中定義的那個rddToPairRDDFunction方法。foreachPartition:這個方法在以后開發(fā)中用的非常非常多!!!它可以將每一個分區(qū)中的數(shù)據(jù)拿出來進行處理,在Spark中計算好的數(shù)據(jù)不需要sqoop工具,定義一個函數(shù)就可以直接往關(guān)系型數(shù)據(jù)庫中寫(后面會專門有例子講)。注意coalesce(分區(qū)數(shù), Boolean)方法和repartition方法的關(guān)系,其實repartition方法底層調(diào)的就是coalesce(分區(qū)數(shù), shuffle=true),只不過給它傳了一個shuffle=true,表示分區(qū)中的數(shù)據(jù)一定要shuffle,也就是說數(shù)據(jù)一定要在網(wǎng)絡(luò)中傳遞,以數(shù)據(jù)為單位重新分配到新的RDD中。如果我們直接調(diào)用coalesce(分區(qū)數(shù),false)方法給它傳一個false的話,就不會有shuffle。它只會以分區(qū)為單位分配給新的RDD中的分區(qū)。而分區(qū)中的數(shù)據(jù)是不會重新洗牌的。


發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 石泉县| 察雅县| 桦川县| 富裕县| 渝中区| 德州市| 巧家县| 潞城市| 五峰| 辉县市| 盐边县| 鲁甸县| 天全县| 堆龙德庆县| 常德市| 贵南县| 泽普县| 津市市| 科技| 罗平县| 邻水| 绩溪县| 江华| 元氏县| 措勤县| 淅川县| 崇左市| 南华县| 秦皇岛市| 日喀则市| 阿拉善右旗| 舞钢市| 镇巴县| 且末县| 牙克石市| 长汀县| 明星| 修文县| 神农架林区| 清苑县| 盐亭县|