国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 操作系統 > 正文

Spark經典案例2-數據去重

2024-06-28 16:00:59
字體:
來源:轉載
供稿:網友

/** * 業務場景:數據去重問題 * Created by YJ on 2017/2/7. * 統計數據,盡量用reduceByKey,不要用groupByKey,優化點 * reduceByKey,在本機suffle后,再發送一個總map,發送到一個總機器上匯總,(匯總要壓力小) * groupByKey,發送本機所有的map,在一個機器上匯總(匯總壓力大) */ /*

數據格式 flie1: 2012-3-1 a 2012-3-2 b 2012-3-3 c 2012-3-4 d 2012-3-5 a 2012-3-6 b 2012-3-7 c 2012-3-3 c flie2: 2012-3-1 b 2012-3-2 a 2012-3-3 b 2012-3-4 d 2012-3-5 a 2012-3-6 c 2012-3-7 d 2012-3-3 c */

package ClassicCaseimport org.apache.spark.{SparkConf, SparkContext}object case2 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("reduce") val sc = new SparkContext(conf) sc.setLogLevel("ERROR") //獲取數據 val two = sc.textFile("hdfs://192.168.109.130:8020//user/flume/ClassicCase/case2/*") two.filter(_.trim().length>0) //需要有空格。 .map(line=>(line.trim,""))//全部值當key,(key value,"") .groupByKey()//groupByKey,過濾重復的key value ,發送到總機器上匯總 .sortByKey() //按key value的自然順序排序 .keys.collect().foreach(PRintln) //所有的keys變成數組再輸出 //第二種有風險 two.filter(_.trim().length>0) .map(line=>(line.trim,"1")) .distinct() .reduceByKey(_+_) .sortByKey() .foreach(println) //reduceByKey,在本機suffle后,再發送一個總map,發送到一個總機器上匯總,(匯總要壓力小) //groupByKey,發送本機所有的map,在一個機器上匯總(匯總壓力大) //如果數據在不同的機器上,則會出現先重復數據,distinct,reduceBykey,只是在本機上去重,謹慎一點的話,在reduceByKey后面需要加多一個distinct }}

輸出結果 2012-3-1 a 2012-3-1 b 2012-3-2 a 2012-3-2 b 2012-3-3 b 2012-3-3 c 2012-3-4 d 2012-3-5 a 2012-3-6 b 2012-3-6 c 2012-3-7 c 2012-3-7 d (2012-3-1 a,1) (2012-3-1 b,1) (2012-3-2 a,1) (2012-3-2 b,1) (2012-3-3 b,1) (2012-3-3 c,1) (2012-3-4 d,1) (2012-3-5 a,1) (2012-3-6 b,1) (2012-3-6 c,1) (2012-3-7 c,1) (2012-3-7 d,1)

reduceByKey和groupByKey區別與用法

(1)當采用reduceByKeyt時,Spark可以在每個分區移動數據之前將待輸出數據與一個共用的key結合。借助下圖可以理解在reduceByKey里究竟發生了什么。 注意在數據對被搬移前同一機器上同樣的key是怎樣被組合的(reduceByKey中的lamdba函數)。然后lamdba函數在每個區上被再次調用來將所有值reduce成一個最終結果。整個過程如下: 這里寫圖片描述

(2)當采用groupByKey時,由于它不接收函數,spark只能先將所有的鍵值對(key-value pair)都移動,這樣的后果是集群節點之間的開銷很大,導致傳輸延時。整個過程如下: 這里寫圖片描述

( 3 )區別 reduceByKey,在本機suffle后,再發送一個總map,發送到一個總機器上suffle匯總map,(匯總要壓力小) groupByKey,發送本機所有的map,在一個機器上suffle匯總map(匯總壓力大)

因此,在對大數據進行復雜計算時,reduceByKey優于groupByKey。 另外,如果僅僅是group處理,那么以下函數應該優先于 groupByKey :   (1)、combineByKey 組合數據,但是組合之后的數據類型與輸入時值的類型不一樣。   (2)、foldByKey合并每一個 key 的所有值,在級聯函數和“零值”中使用。


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 高尔夫| 剑河县| 岱山县| 渭南市| 明星| 迭部县| 娱乐| 大宁县| 肇源县| 平利县| 龙山县| 永定县| 策勒县| 洛阳市| 新源县| 仪陇县| 专栏| 闻喜县| 霍州市| 年辖:市辖区| 个旧市| 太仆寺旗| 江华| 赞皇县| 拜城县| 赣榆县| 西宁市| 兖州市| 翼城县| 辉南县| 尼玛县| 应城市| 玉树县| 乾安县| 饶阳县| 灵石县| 徐闻县| 甘德县| 瓦房店市| 乌拉特后旗| 榆林市|