国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

MapReduce并行編程模型和框架

2019-11-11 04:55:34
字體:
來源:轉載
供稿:網友

傳統的串行處理方式

有四組文本數據:

“the weather is good”, “today is good”, “good weather is good”, “today has good weather”

對這些文本數據進行詞頻統計:

import java.util.Hashtable;import java.util.Iterator;import java.util.StringTokenizer;/** * 傳統的串行計算方式詞頻統計 * * @version 2017年1月12日 下午4:05:33 */public class WordCount { public static void main(String[] args) { String[] text = new String[]{ "the weather is good","today is good", "good weather is good","today has good weather" }; //同步、線程安全 Hashtable ht = new Hashtable(); //HashMap ht = new HashMap(); for(int i=0;i<=3;i++){ //字符串根據分隔符解析 StringTokenizer st = new StringTokenizer(text[i]); while (st.hasMoreTokens()) { String world = st.nextToken(); if(!ht.containsKey(world)){ ht.put(world, new Integer(1)); }else{ int wc = ((Integer)ht.get(world)).intValue()+1; ht.put(world, new Integer(wc)); } }//end of while }//end of for //輸出統計結果 for(Iterator itr = ht.keySet().iterator();itr.hasNext();){ String world = (String) itr.next(); System.out.一個MR分布式程序

求出每個年份的最高氣溫:

MaxTemperatureMapper.Java:import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class MaxTemperatureMapper extends Mapper<LongWritable, Text,Text, IntWritable>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { //解析字段 String line =value.toString(); try{ String year = line.substring(0,4); int airTemperature =Integer.parseInt(line.substring(5)); context.write(new Text(year),new IntWritable(airTemperature)); }catch(Exception e){ System.out.println("error in line:" + line); } }} MaxTemperatureReducer.java:import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;/** * reducer 比較每年度溫度最高值 * */public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int MaxValue = Integer.MIN_VALUE; for(IntWritable value:values){ MaxValue = Math.max(MaxValue, value.get()); } context.write(key , new IntWritable(MaxValue)); }}MaxTemperatureDriver.java:import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class MaxTemperatureDriver extends Configured implements Tool{ @Override public int run(String[] args) throws Exception { // 對 參數進行判斷:參數個數不為2,打印錯誤信息 if (args.length != 2){ System.err.printf("Usage: %s <input><output>",getClass().getSimpleName()); ToolRunner.printGenericCommandUsage(System.err); return -1; } Configuration conf =getConf(); @SuppressWarnings("deprecation") //不檢測過期的方法 Job job = new Job(conf); job.setJobName("Max Temperature"); job.setJarByClass(getClass()); FileInputFormat.addInputPath(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); job.setMapperClass(MaxTemperatureMapper.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); return job.waitForCompletion(true)?0:1; } public static void main(String[] args)throws Exception{ int exitcode = ToolRunner.run(new MaxTemperatureDriver(), args); System.exit(exitcode); }}

上傳數據至hadoop集群:

這里寫圖片描述

原始數據: Temperature1:

1990 211990 181991 211992 301990 21

Temperature2:

1991 211990 181991 241992 301993 21

將程序打包上傳至主節點某個目錄下,執行

hadoop jar /data/jar/maxtemperature.jar hdfs://192.168.75.128:9000/input hdfs://192.168.75.128:9000/output/temperature

執行結果:

結果數據:

1990 211991 241992 301993 21

完整的MapReduce編程模型

Combiner:進行中間結果數據網絡傳輸優化的工作。Combiner程序的執行是在Map節點完成計算之后、輸出結果之前。

Partitioner:將所有主鍵相同的鍵值對傳輸給同一個Reduce節點。分區的過程在Map節點輸出后、傳入Reduce節點之前完成的。

下面是針對四組數據的MapReduce完整的并行編程模型:

“the weather is good”, “today is good”, “good weather is good”, “today has good weather”

這里寫圖片描述

完整的MapReduce編程模型

(1)用戶程序會分成三個部分:Mapper,Reducer,Driver (2)Mapper的輸入數據是KV對的形式,KV的類型可以設置 (3)Mapper的輸出數據是KV對的形式,KV的類型可以設置 (4)Mapper中的業務邏輯寫在map方法中 (5)map方法是每進來一個KV對調用一次 (6)Reducer的輸入數據應該對應Mapper的輸出數據,也是KV (7)Reducer的業務邏輯寫在reduce方法中 (8)reduce方法是對每一個< key,valueList> 調用一次 (9)用戶的Mapper和Reducer都要繼承各自的父類 (10)整個程序需要一個Drvier來進行提交,提交的是一個描述了各種必要信息的job對象。

Hadoop系統架構和MapReduce執行流程

為了實現Hadoop系統設計中本地化計算的原則,數據存儲節點DataNode與計算節點TaskTracker將合并設置,讓每個從節點同時運行作為DataNode和TaskTracker,以此讓每個Tasktracker盡量處理存儲在本地DataNode上的數據。

而數據存儲主控節點NameNode與作業執行主控節點JobTracker既可以設置在同一個主控節點上,在集群規模較大或者這兩個主控節點負載都很高以至于互相影響時,也可以分開設置在兩個不同的節點上。

這里寫圖片描述

Hadoop系統的基本組成構架

MapReduce程序的執行流程:

MapReduce執行一個用戶提交的MapReduce程序的基本過程。

這里寫圖片描述

Hadoop MapReduce 程序執行流程

1) 首先,用戶程序客戶端通過作業客戶端接口程序JobClient提交一個用戶程序。 2) 然后JobClient向JobTracker提交作業執行請求并獲得一個Job ID。 3) JobClient同時也會將用戶程序作業和待處理的數據文件信息準備好并存儲在HDFS中。 4) JobClient正式向JobTracker提交和執行該作業。 5) JobTracker接受并調度該作業,進行作業的初始化準備工作,根據待處理數據的實際分片情況,調度和分配一定的Map節點來完成作業。 6) JobTracker 查詢作業中的數據分片信息,構建并準備相應的任務。 7) JobTracker 啟動TaskTracker節點開始執行具體的任務。 8) TaskTracker根據所分配的具體任務,獲取相應的作業數據。 9) TaskTracker節點創建所需要的Java虛擬機,并啟動相應的Map任務(或Reduce任務)的執行。 10) TaskTracker執行完所分配的任務之后,若是Map任務,則把中間結果數據輸出到HDFS中;若是Reduce任務,則輸出最終結果。 11) TaskTracker向JobTracker報告所分配的任務的完成。若是Map任務完成并且后續還有Reduce任務,則JobTracker會分配和啟動Reduce節點繼續處理中間結果并輸出最終結果。

參考學習資料:

1.HashMap和Hashtable的區別: http://www.importnew.com/7010.html 2.StringTokenizer類的使用方法: http://yacole.iteye.com/blog/41512


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 当涂县| 南宫市| 疏勒县| 云霄县| 堆龙德庆县| 衡阳市| 宁河县| 泸溪县| 临潭县| 吉林省| 延长县| 酒泉市| 株洲市| 尖扎县| 上蔡县| 东光县| 梅河口市| 武冈市| 山东| 开封县| 介休市| 临武县| 图片| 屏东市| 德阳市| 邹平县| 延吉市| 叶城县| 龙川县| 含山县| 迁西县| 铁力市| 富蕴县| 日土县| 全南县| 保山市| 长白| 娄底市| 榆树市| 古交市| 清新县|