国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

跟天齊老師學Spark(7)--關于Spark的RDD

2019-11-11 05:19:37
字體:
來源:轉載
供稿:網友
關于spark的RDD:關于RDD,可以查看官方文檔,可以看作者的論文,也可以看spark源碼中關于RDD的注釋。按Ctrl+N快捷鍵,搜索RDD,進入源碼,如果沒有關聯源碼,在IDEA中右上角會有一個提示:"Attach Sources".在IDEA中關聯spark的源碼,首先解壓下載好的spark源碼包(spark-1.6.2.tgz),然后在IDEA中選擇右上角的Attach Sources,在彈出的窗口中選擇自己解壓后的spark的源碼目錄即可。RDD:一個彈性、可復原的、分布式的數據集。它是spark的一個最基本的抽象。不可變的(一旦創建好了,在計算的時候是不可變,對它進行各種操作都只能生成新的RDD),被分區的(一個分區只能屬于一臺機器,但是一臺機器上可能有很多很多的分區),的集合,它可以被并行的計算。只有key-value格式的數據才可以使用groupByKey或者join。RDD的5個特點:數據是存放在多個分區里面的。(1)RDD中有很多的分區,分區List是有序的(意味著如果你的數據很少,而分區很多,那么就可能有的分區中有數據,有的可能沒有數據);(2)一個函數會作用到每一臺機器上的每一個分區上面(split);(3)RDD和RDD之間是存在依賴關系的(為了容錯),最終的那一個RDD會觸發Action提交任務,它會向前依次推斷出前面的RDD,然后一點點計算;可復原;這些RDD之間是有序的;(4)如果你的RDD里面是key-value類型的數據,它一定會有一個Partitioner(分區器,它決定了這條數據屬于哪一個分區,它默認使用的分區器是hash分區器);如果不是key-value類型,他就沒有分區器;(在spark中沒有reduce概念,但它有一個partition概念,二者是相似的)(5)RDD里面會保存著一個最優位置。也就是數據在哪,以后它的任務就啟動在哪;把計算調度到數據所在的機器上,位置感知,實現數據本地化;寧愿移動計算,而不會移動數據;因為移動數據的代價很大,數據要消耗大量的網絡帶寬和磁盤io;(說明:如果我們是從hdfs這種分布式系統里面讀數據,它會有一個最佳位置。它會在有數據的那臺機器上創建分區,它在啟動Executor的時候,它還不知道這個數據在哪,它在創建分區的時候才會向我們的namenode進行交互,知道這個分區在哪臺機器上,然后在那臺機器上創建分區。后面看源碼的時候再介紹)--------------------------------------------------------------RDD的特點說明通過從hdfs中讀取數據來驗證RDD的一些特點首先textFile("")生成的RDD會有幾個分區呢?hdfs中的每一個block(每一個輸入切片)就會對應spark中的RDD的一個分區;通過rdd.partition.length來查看一個RDD的分區數。注意:它在分區的時候,會讓每一個分區中的數據量盡量被均勻分配。在從HDFS中讀取數據的時候,假如我們的hdfs中有兩個小文件,它會用一個RDD來讀,第一個分區是partition0,第二個分區是partition1,這樣hdfs中的文件和RDD中的分區是一一對應的,分區位于Worker中的Executor進程中。這兩個分區可能在一個Executor上,也可能在不同機器的Executor上,但是一個分區里面的數據不可能在兩臺機器上。注意textFile方法并不會觸發Action,所以現在還不會真的去讀數據,所以此時的分區中還沒有數據。但是它會記錄住每一個分區將要從哪個目錄下的哪個block中讀取數據。val rdd2 = rdd1.map((_, 1))如果沒有改變RDD分區的數量,那么新生成的子RDD中分區的數量會和父RDD的分區的數量一樣。RDD之間會記錄住它們之間的血緣關系。雖然現在還沒有出發Action,但是這些RDD會記住你調了什么方法,傳入了什么函數。rdd2.saveAsTextFile("")調動saveAsTextFile方法之后。觸發Action,開始提交任務,它會從最后一個RDD往前推知道推到最前面的rdd1,才開始讀數據,它好比從一個迭代器中讀數據,好比一個流水線,讀一條處理一條。我們調rdd上的map方法(這個rdd上的map方法是針對多臺機器的一個抽象方法),其實它最終會調每一個分區上的那個map方法(MapPartitionsRDD),然后這個分區上的map方法會調scala的map方法。在hdfs中就會產生兩個結果文件,因為它有兩個分區。打印RDD之間的依賴關系的方法:rdd.toDebugString驗證分區器:有一個getPartition方法,用key的哈希code值對分區的數量求模。val rdd3 = rdd2.repartition(2)這樣就給rdd2重新分區。rdd3.saveAsTextFile("")發現結果不一樣了。因為重新分區會有一個shuffer的過程;import org.apache.spark.HashPartitionerrdd.partitonBy(new HashPartitioner(2))rdd3.saveAsTextFile("")再看生成的結果文件中的數據,發現數據被分類了,key相同的數據在一個文件中。說明我們設置的分區器發揮作用了。調用partitonBy方法會生成一個新的RDD,叫做ShuffleRDD。它會用key的hashcode對分區數求模。這樣key相同的數據就會進入同一個分區。對每一個分區的數據進行局部求和,最后再匯總。val rdd3 = rdd2.reduceByKey(_+_)它里面也有一個默認的分區器。rdd3.saveAsTextFile("")結果文件還是兩個。說明它把key相同的數據Shuffle到同一個分區中然后再分別聚合。//在使用reduceByKey的時候,指定分區數量val rdd3 = rdd2.reduceByKey(_+_, 1)rdd3.saveAsTextFile("")此時hdfs中只有一個結果文件。因為指定了只有一個分區,所以說有的數據都被Shuffle到同一個分區中。

上面的一些實驗測試,是為了驗證RDD的一些特征。

RDD上的一些復雜的方法:將老師發的文件spark_rdd_api.txt文件中的練習一下即可。查看分區中的數據是什么:val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)rdd1.mapPartitionsWithIndex(func).collect常用函數:math.max(_,_)和math.min(_, _)RDD并沒有定義reduceByKey方法,但是它能調這個方法,其實它是在PairRDDFunction中定義的。它將普通的RDD轉換成了PairRDDFunction,在RDD單例對象中就定義了一個rddToPairRDDFunction方法。在SparkContext(注意是object)中定義了很多implicit,其中就包含rddToPairRDDFunction方法,它已經被廢棄,它調用的就是RDD單例對象中定義的那個rddToPairRDDFunction方法。foreachPartition:這個方法在以后開發中用的非常非常多!!!它可以將每一個分區中的數據拿出來進行處理,在Spark中計算好的數據不需要sqoop工具,定義一個函數就可以直接往關系型數據庫中寫(后面會專門有例子講)。注意coalesce(分區數, Boolean)方法和repartition方法的關系,其實repartition方法底層調的就是coalesce(分區數, shuffle=true),只不過給它傳了一個shuffle=true,表示分區中的數據一定要shuffle,也就是說數據一定要在網絡中傳遞,以數據為單位重新分配到新的RDD中。如果我們直接調用coalesce(分區數,false)方法給它傳一個false的話,就不會有shuffle。它只會以分區為單位分配給新的RDD中的分區。而分區中的數據是不會重新洗牌的。


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 融水| 淮阳县| 荔浦县| 广平县| 梅州市| 平安县| 富平县| 瓮安县| 五指山市| 东宁县| 枞阳县| 德惠市| 武平县| 资源县| 黑河市| 周至县| 博爱县| 崇左市| 肥西县| 托里县| 定日县| 杭州市| 青铜峡市| 长治县| 蓝山县| 信阳市| 广水市| 永靖县| 广宗县| 南宫市| 施秉县| 榆中县| 临洮县| 新邵县| 海南省| 巧家县| 乐山市| 抚州市| 昭通市| 南宁市| 九台市|