
知道數據傾斜發生在哪一個stage之后,接著我們就需要根據stage劃分原理,推算出來發生傾斜的那個stage對應代碼中的哪一部分,這部分代碼中肯定會有一個shuffle類算子。精準推算stage與代碼的對應關系,需要對Spark的源碼有深入的理解,這里我們可以介紹一個相對簡單實用的推算方法:只要看到Spark代碼中出現了一個shuffle類算子或者是Spark SQL的SQL語句中出現了會導致shuffle的語句(比如group by語句),那么就可以判定,以那個地方為界限劃分出了前后兩個stage。 這里我們就以Spark最基礎的入門程序——單詞計數來舉例,如何用最簡單的方法大致推算出一個stage對應的代碼。如下示例,在整個代碼中,只有一個reduceByKey是會發生shuffle的算子,因此就可以認為,以這個算子為界限,會劃分出前后兩個stage。 1、stage0,主要是執行從textFile到map操作,以及執行shuffle write操作。shuffle write操作,我們可以簡單理解為對pairs RDD中的數據進行分區操作,每個task處理的數據中,相同的key會寫入同一個磁盤文件內。 2、stage1,主要是執行從reduceByKey到collect操作,stage1的各個task一開始運行,就會首先執行shuffle read操作。執行shuffle read操作的task,會從stage0的各個task所在節點拉取屬于自己處理的那些key,然后對同一個key進行全局性的聚合或join等操作,在這里就是對key的value值進行累加。stage1在執行完reduceByKey算子之后,就計算出了最終的WordCounts RDD,然后會執行collect算子,將所有數據拉取到Driver上,供我們遍歷和打印輸出。[python] view plain copy
[python] view plain copy 新聞熱點
疑難解答