国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 服務器 > Web服務器 > 正文

Hadoop對文本文件的快速全局排序實現方法及分析

2024-09-01 13:53:12
字體:
來源:轉載
供稿:網友

一、背景

Hadoop中實現了用于全局排序的InputSampler類和TotalOrderPartitioner類,調用示例是org.apache.hadoop.examples.Sort。

但是當我們以Text文件作為輸入時,結果并非按Text中的string列排序,而且輸出結果是SequenceFile。

原因:

1) hadoop在處理Text文件時,key是行號LongWritable類型,InputSampler抽樣的是key,TotalOrderPartitioner也是用key去查找分區。這樣,抽樣得到的partition文件是對行號的抽樣,結果自然是根據行號來排序。

2)大數據量時,InputSampler抽樣速度會非常慢。比如,RandomSampler需要遍歷所有數據,IntervalSampler需要遍歷文件數與splits數一樣。SplitSampler效率比較高,但它只抽取每個文件前面的記錄,不適合應用于文件內有序的情況。

二、功能

1. 實現了一種局部抽樣方法PartialSampler,適用于輸入數據各文件是獨立同分布的情況

2. 使RandomSampler、IntervalSampler、SplitSampler支持對文本的抽樣

3. 實現了針對Text文件string列的TotalOrderPartitioner

三、實現

1. PartialSampler

PartialSampler從第一份輸入數據中隨機抽取第一列文本數據。PartialSampler有兩個屬性:freq(采樣頻率),numSamples(采樣總數)。

public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {   InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());   ArrayList<K> samples = new ArrayList<K>(numSamples);   Random r = new Random();   long seed = r.nextLong();   r.setSeed(seed);   LOG.debug("seed: " + seed);      // 對splits【0】抽樣   for (int i = 0; i < 1; i++) {    System.out.println("PartialSampler will getSample splits["+i+"]");    RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,      Reporter.NULL);    K key = reader.createKey();    V value = reader.createValue();    while (reader.next(key, value)) {     if (r.nextDouble() <= freq) {      if (samples.size() < numSamples) {        // 選擇value中的第一列抽樣        Text value0 = new Text(value.toString().split("/t")[0]);             samples.add((K) value0);              } else {       // When exceeding the maximum number of samples, replace a       // random element with this one, then adjust the frequency       // to reflect the possibility of existing elements being       // pushed out       int ind = r.nextInt(numSamples);       if (ind != numSamples) {        Text value0 = new Text(value.toString().split("/t")[0]);         samples.set(ind, (K) value0);       }       freq *= (numSamples - 1) / (double) numSamples;      }      key = reader.createKey();     }    }        reader.close();   }   return (K[])samples.toArray();  }

首先通過InputFormat的getSplits方法得到所有的輸入分區;

然后掃描第一個分區中的記錄進行采樣。

記錄采樣的具體過程如下:

從指定分區中取出一條記錄,判斷得到的隨機浮點數是否小于等于采樣頻率freq

  如果大于則放棄這條記錄;

  如果小于,則判斷當前的采樣數是否小于最大采樣數,

    如果小于則這條記錄被選中,被放進采樣集合中;

    否則從【0,numSamples】中選擇一個隨機數,如果這個隨機數不等于最大采樣數numSamples,則用這條記錄替換掉采樣集合隨機數對應位置的記錄,同時采樣頻率freq減小變為freq*(numSamples-1)/numSamples。

然后依次遍歷分區中的其它記錄。

note:

1)PartialSampler只適用于輸入數據各文件是獨立同分布的情況。

2)自帶的三種Sampler通過修改samples.add(key)為samples.add((K) value0); 也可以實現對第一列的抽樣。

2. TotalOrderPartitioner

TotalOrderPartitioner主要改進了兩點:

1)讀partition時指定keyClass為Text.class

因為partition文件中的key類型為Text

在configure函數中,修改:

//Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();Class<K> keyClass = (Class<K>)Text.class;

2)查找分區時,改用value查

public int getPartition(K key, V value, int numPartitions) {  Text value0 = new Text(value.toString().split("/t")[0]);   return partitions.findPartition((K) value0); }

3. Sort

1)設置InputFormat、OutputFormat、OutputKeyClass、OutputValueClass、MapOutputKeyClass

2)初始化InputSampler對象,抽樣

3)partitionFile通過CacheFile傳給TotalOrderPartitioner,執行MapReduce任務

 Class<? extends InputFormat> inputFormatClass = TextInputFormat.class;  Class<? extends OutputFormat> outputFormatClass = TextOutputFormat.class;  Class<? extends WritableComparable> outputKeyClass = Text.class;  Class<? extends Writable> outputValueClass = Text.class;  jobConf.setMapOutputKeyClass(LongWritable.class);  // Set user-supplied (possibly default) job configs  jobConf.setNumReduceTasks(num_reduces);  jobConf.setInputFormat(inputFormatClass);  jobConf.setOutputFormat(outputFormatClass);  jobConf.setOutputKeyClass(outputKeyClass);  jobConf.setOutputValueClass(outputValueClass);  if (sampler != null) {   System.out.println("Sampling input to effect total-order sort...");   jobConf.setPartitionerClass(TotalOrderPartitioner.class);   Path inputDir = FileInputFormat.getInputPaths(jobConf)[0];   inputDir = inputDir.makeQualified(inputDir.getFileSystem(jobConf));   //Path partitionFile = new Path(inputDir, "_sortPartitioning");   TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);   InputSampler.<K,V>writePartitionFile(jobConf, sampler);   URI partitionUri = new URI(partitionFile.toString() + "#" + "_sortPartitioning");   DistributedCache.addCacheFile(partitionUri, jobConf);   DistributedCache.createSymlink(jobConf);  }  FileSystem hdfs = FileSystem.get(jobConf);  hdfs.delete(outputpath);  hdfs.close();  System.out.println("Running on " +    cluster.getTaskTrackers() +    " nodes to sort from " +     FileInputFormat.getInputPaths(jobConf)[0] + " into " +    FileOutputFormat.getOutputPath(jobConf) +    " with " + num_reduces + " reduces.");  Date startTime = new Date();  System.out.println("Job started: " + startTime);  jobResult = JobClient.runJob(jobConf);

四、執行

usage:

hadoop jar yitengfei.jar com.yitengfei.Sort [-m <maps>] [-r <reduces>]
[-splitRandom <double pcnt> <numSamples> <maxsplits> | // Sample from random splits at random (general)
-splitSample <numSamples> <maxsplits> | // Sample from first records in splits (random data)
-splitInterval <double pcnt> <maxsplits>] // Sample from splits at intervals (sorted data)
-splitPartial <double pcnt> <numSamples> <maxsplits> | // Sample from partial splits at random (general) ]
<input> <output> <partitionfile>

Example:

hadoop jar yitengfei.jar com.yitengfei.Sort -r 10 -splitPartial 0.1 10000 10 /user/rp-rd/yitengfei/sample/input /user/rp-rd/yitengfei/sample/output /user/rp-rd/yitengfei/sample/partition

五、性能

200G輸入數據,15億條url,1000個分區,排序時間只用了6分鐘

總結

以上就是本文關于Hadoop對文本文件的快速全局排序實現方法及分析的全部內容,希望對大家有所幫助 ,如有不足之處,歡迎留言指出,感謝朋友們對本站的支持!


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 长子县| 遵义县| 永安市| 荣昌县| 云南省| 榆树市| 乐山市| 宁城县| 西乌珠穆沁旗| 广宁县| 多伦县| 峨山| 河源市| 深水埗区| 华池县| 伽师县| 通渭县| 平原县| 阳春市| 射洪县| 屏南县| 古交市| 仁怀市| 张家口市| 汉中市| 齐齐哈尔市| 尼木县| 郧西县| 林口县| 太康县| 郎溪县| 应城市| 庐江县| 宿松县| 清流县| 射洪县| 酉阳| 舟山市| 长顺县| 镇宁| 九龙城区|