国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 服務器 > Web服務器 > 正文

Spark自定義累加器的使用實例詳解

2024-09-01 13:53:05
字體:
來源:轉載
供稿:網友

累加器(accumulator)是Spark中提供的一種分布式的變量機制,其原理類似于mapreduce,即分布式的改變,然后聚合這些改變。累加器的一個常見用途是在調試時對作業執行過程中的事件進行計數。

累加器簡單使用

Spark內置的提供了Long和Double類型的累加器。下面是一個簡單的使用示例,在這個例子中我們在過濾掉RDD中奇數的同時進行計數,最后計算剩下整數的和。

val sparkConf = new SparkConf().setAppName("Test").setMaster("local[2]") val sc = new SparkContext(sparkConf) val accum = sc.longAccumulator("longAccum") //統計奇數的個數 val sum = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).filter(n=>{  if(n%2!=0) accum.add(1L)   n%2==0 }).reduce(_+_) println("sum: "+sum) println("accum: "+accum.value) sc.stop() 

結果為:

sum: 20
accum: 5

這是結果正常的情況,但是在使用累加器的過程中如果對于spark的執行過程理解的不夠深入就會遇到兩類典型的錯誤:少加(或者沒加)、多加。

自定義累加器

自定義累加器類型的功能在1.X版本中就已經提供了,但是使用起來比較麻煩,在2.0版本后,累加器的易用性有了較大的改進,而且官方還提供了一個新的抽象類:AccumulatorV2來提供更加友好的自定義類型累加器的實現方式。官方同時給出了一個實現的示例:CollectionAccumulator類,這個類允許以集合的形式收集spark應用執行過程中的一些信息。例如,我們可以用這個類收集Spark處理數據時的一些細節,當然,由于累加器的值最終要匯聚到driver端,為了避免 driver端的outofmemory問題,需要對收集的信息的規模要加以控制,不宜過大。

繼承AccumulatorV2類,并復寫它的所有方法

package sparkimport constant.Constantimport org.apache.spark.util.AccumulatorV2import util.getFieldFromConcatStringimport util.setFieldFromConcatStringopen class SessionAccmulator : AccumulatorV2<String, String>() {  private var result = Constant.SESSION_COUNT + "=0|"+      Constant.TIME_PERIOD_1s_3s + "=0|"+      Constant.TIME_PERIOD_4s_6s + "=0|"+      Constant.TIME_PERIOD_7s_9s + "=0|"+      Constant.TIME_PERIOD_10s_30s + "=0|"+      Constant.TIME_PERIOD_30s_60s + "=0|"+      Constant.TIME_PERIOD_1m_3m + "=0|"+      Constant.TIME_PERIOD_3m_10m + "=0|"+      Constant.TIME_PERIOD_10m_30m + "=0|"+      Constant.TIME_PERIOD_30m + "=0|"+      Constant.STEP_PERIOD_1_3 + "=0|"+      Constant.STEP_PERIOD_4_6 + "=0|"+      Constant.STEP_PERIOD_7_9 + "=0|"+      Constant.STEP_PERIOD_10_30 + "=0|"+      Constant.STEP_PERIOD_30_60 + "=0|"+      Constant.STEP_PERIOD_60 + "=0"  override fun value(): String {    return this.result  }  /**   * 合并數據   */  override fun merge(other: AccumulatorV2<String, String>?) {    if (other == null) return else {      if (other is SessionAccmulator) {        var newResult = ""        val resultArray = arrayOf(Constant.SESSION_COUNT,Constant.TIME_PERIOD_1s_3s, Constant.TIME_PERIOD_4s_6s, Constant.TIME_PERIOD_7s_9s,            Constant.TIME_PERIOD_10s_30s, Constant.TIME_PERIOD_30s_60s, Constant.TIME_PERIOD_1m_3m,            Constant.TIME_PERIOD_3m_10m, Constant.TIME_PERIOD_10m_30m, Constant.TIME_PERIOD_30m,            Constant.STEP_PERIOD_1_3, Constant.STEP_PERIOD_4_6, Constant.STEP_PERIOD_7_9,            Constant.STEP_PERIOD_10_30, Constant.STEP_PERIOD_30_60, Constant.STEP_PERIOD_60)        resultArray.forEach {          val oldValue = other.result.getFieldFromConcatString("|", it)          if (oldValue.isNotEmpty()) {            val newValue = oldValue.toInt() + 1            //找到原因,一直在循環賦予值,debug30分鐘 很煩            if (newResult.isEmpty()){              newResult = result.setFieldFromConcatString("|", it, newValue.toString())            }            //問題就在于這里,自定義沒有寫錯,合并錯了            newResult = newResult.setFieldFromConcatString("|", it, newValue.toString())          }        }        result = newResult      }    }  }  override fun copy(): AccumulatorV2<String, String> {    val sessionAccmulator = SessionAccmulator()    sessionAccmulator.result = this.result    return sessionAccmulator  }  override fun add(p0: String?) {    val v1 = this.result    val v2 = p0    if (v2.isNullOrEmpty()){      return    }else{      var newResult = ""      val oldValue = v1.getFieldFromConcatString("|", v2!!)      if (oldValue.isNotEmpty()){        val newValue = oldValue.toInt() + 1        newResult = result.setFieldFromConcatString("|", v2, newValue.toString())      }      result = newResult    }  }  override fun reset() {    val newResult = Constant.SESSION_COUNT + "=0|"+        Constant.TIME_PERIOD_1s_3s + "=0|"+        Constant.TIME_PERIOD_4s_6s + "=0|"+        Constant.TIME_PERIOD_7s_9s + "=0|"+        Constant.TIME_PERIOD_10s_30s + "=0|"+        Constant.TIME_PERIOD_30s_60s + "=0|"+        Constant.TIME_PERIOD_1m_3m + "=0|"+        Constant.TIME_PERIOD_3m_10m + "=0|"+        Constant.TIME_PERIOD_10m_30m + "=0|"+        Constant.TIME_PERIOD_30m + "=0|"+        Constant.STEP_PERIOD_1_3 + "=0|"+        Constant.STEP_PERIOD_4_6 + "=0|"+        Constant.STEP_PERIOD_7_9 + "=0|"+        Constant.STEP_PERIOD_10_30 + "=0|"+        Constant.STEP_PERIOD_30_60 + "=0|"+        Constant.STEP_PERIOD_60 + "=0"    result = newResult  }  override fun isZero(): Boolean {    val newResult = Constant.SESSION_COUNT + "=0|"+        Constant.TIME_PERIOD_1s_3s + "=0|"+        Constant.TIME_PERIOD_4s_6s + "=0|"+        Constant.TIME_PERIOD_7s_9s + "=0|"+        Constant.TIME_PERIOD_10s_30s + "=0|"+        Constant.TIME_PERIOD_30s_60s + "=0|"+        Constant.TIME_PERIOD_1m_3m + "=0|"+        Constant.TIME_PERIOD_3m_10m + "=0|"+        Constant.TIME_PERIOD_10m_30m + "=0|"+        Constant.TIME_PERIOD_30m + "=0|"+        Constant.STEP_PERIOD_1_3 + "=0|"+        Constant.STEP_PERIOD_4_6 + "=0|"+        Constant.STEP_PERIOD_7_9 + "=0|"+        Constant.STEP_PERIOD_10_30 + "=0|"+        Constant.STEP_PERIOD_30_60 + "=0|"+        Constant.STEP_PERIOD_60 + "=0"    return this.result == newResult  }}

方法介紹

value方法:獲取累加器中的值

       merge方法:該方法特別重要,一定要寫對,這個方法是各個task的累加器進行合并的方法(下面介紹執行流程中將要用到)

        iszero方法:判斷是否為初始值

        reset方法:重置累加器中的值

        copy方法:拷貝累加器

spark中累加器的執行流程:

          首先有幾個task,spark engine就調用copy方法拷貝幾個累加器(不注冊的),然后在各個task中進行累加(注意在此過程中,被最初注冊的累加器的值是不變的),執行最后將調用merge方法和各個task的結果累計器進行合并(此時被注冊的累加器是初始值)

總結

以上就是本文關于Spark自定義累加器的使用實例詳解的全部內容,希望對大家有所幫助。有什么問題可以隨時留言,小編會及時回復大家的。


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 昌江| 芮城县| 来安县| 三台县| 汾阳市| 庆城县| 平顺县| 正镶白旗| 新昌县| 富源县| 始兴县| 鄂托克前旗| 扎赉特旗| 莆田市| 康定县| 阿瓦提县| 夏邑县| 眉山市| 革吉县| 阳城县| 韶关市| 祁阳县| 德阳市| 遵义县| 曲松县| 平武县| 奉贤区| 读书| 昂仁县| 广西| 陕西省| 阜宁县| 永寿县| 温泉县| 九江县| 灵山县| 宜川县| 江门市| 河间市| 呼伦贝尔市| 邓州市|