有四組文本數(shù)據(jù):
“the weather is good”, “today is good”, “good weather is good”, “today has good weather”
對(duì)這些文本數(shù)據(jù)進(jìn)行詞頻統(tǒng)計(jì):
import java.util.Hashtable;import java.util.Iterator;import java.util.StringTokenizer;/** * 傳統(tǒng)的串行計(jì)算方式詞頻統(tǒng)計(jì) * * @version 2017年1月12日 下午4:05:33 */public class WordCount { public static void main(String[] args) { String[] text = new String[]{ "the weather is good","today is good", "good weather is good","today has good weather" }; //同步、線程安全 Hashtable ht = new Hashtable(); //HashMap ht = new HashMap(); for(int i=0;i<=3;i++){ //字符串根據(jù)分隔符解析 StringTokenizer st = new StringTokenizer(text[i]); while (st.hasMoreTokens()) { String world = st.nextToken(); if(!ht.containsKey(world)){ ht.put(world, new Integer(1)); }else{ int wc = ((Integer)ht.get(world)).intValue()+1; ht.put(world, new Integer(wc)); } }//end of while }//end of for //輸出統(tǒng)計(jì)結(jié)果 for(Iterator itr = ht.keySet().iterator();itr.hasNext();){ String world = (String) itr.next(); System.out.一個(gè)MR分布式程序求出每個(gè)年份的最高氣溫:
MaxTemperatureMapper.Java:import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class MaxTemperatureMapper extends Mapper<LongWritable, Text,Text, IntWritable>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { //解析字段 String line =value.toString(); try{ String year = line.substring(0,4); int airTemperature =Integer.parseInt(line.substring(5)); context.write(new Text(year),new IntWritable(airTemperature)); }catch(Exception e){ System.out.println("error in line:" + line); } }} MaxTemperatureReducer.java:import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;/** * reducer 比較每年度溫度最高值 * */public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int MaxValue = Integer.MIN_VALUE; for(IntWritable value:values){ MaxValue = Math.max(MaxValue, value.get()); } context.write(key , new IntWritable(MaxValue)); }}MaxTemperatureDriver.java:import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class MaxTemperatureDriver extends Configured implements Tool{ @Override public int run(String[] args) throws Exception { // 對(duì) 參數(shù)進(jìn)行判斷:參數(shù)個(gè)數(shù)不為2,打印錯(cuò)誤信息 if (args.length != 2){ System.err.printf("Usage: %s <input><output>",getClass().getSimpleName()); ToolRunner.printGenericCommandUsage(System.err); return -1; } Configuration conf =getConf(); @SuppressWarnings("deprecation") //不檢測(cè)過期的方法 Job job = new Job(conf); job.setJobName("Max Temperature"); job.setJarByClass(getClass()); FileInputFormat.addInputPath(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); job.setMapperClass(MaxTemperatureMapper.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); return job.waitForCompletion(true)?0:1; } public static void main(String[] args)throws Exception{ int exitcode = ToolRunner.run(new MaxTemperatureDriver(), args); System.exit(exitcode); }}上傳數(shù)據(jù)至hadoop集群:
原始數(shù)據(jù): Temperature1:
1990 211990 181991 211992 301990 21Temperature2:
1991 211990 181991 241992 301993 21將程序打包上傳至主節(jié)點(diǎn)某個(gè)目錄下,執(zhí)行
hadoop jar /data/jar/maxtemperature.jar hdfs://192.168.75.128:9000/input hdfs://192.168.75.128:9000/output/temperature執(zhí)行結(jié)果:
結(jié)果數(shù)據(jù):
1990 211991 241992 301993 21Combiner:進(jìn)行中間結(jié)果數(shù)據(jù)網(wǎng)絡(luò)傳輸優(yōu)化的工作。Combiner程序的執(zhí)行是在Map節(jié)點(diǎn)完成計(jì)算之后、輸出結(jié)果之前。
Partitioner:將所有主鍵相同的鍵值對(duì)傳輸給同一個(gè)Reduce節(jié)點(diǎn)。分區(qū)的過程在Map節(jié)點(diǎn)輸出后、傳入Reduce節(jié)點(diǎn)之前完成的。
下面是針對(duì)四組數(shù)據(jù)的MapReduce完整的并行編程模型:
“the weather is good”, “today is good”, “good weather is good”, “today has good weather”
(1)用戶程序會(huì)分成三個(gè)部分:Mapper,Reducer,Driver (2)Mapper的輸入數(shù)據(jù)是KV對(duì)的形式,KV的類型可以設(shè)置 (3)Mapper的輸出數(shù)據(jù)是KV對(duì)的形式,KV的類型可以設(shè)置 (4)Mapper中的業(yè)務(wù)邏輯寫在map方法中 (5)map方法是每進(jìn)來一個(gè)KV對(duì)調(diào)用一次 (6)Reducer的輸入數(shù)據(jù)應(yīng)該對(duì)應(yīng)Mapper的輸出數(shù)據(jù),也是KV (7)Reducer的業(yè)務(wù)邏輯寫在reduce方法中 (8)reduce方法是對(duì)每一個(gè)< key,valueList> 調(diào)用一次 (9)用戶的Mapper和Reducer都要繼承各自的父類 (10)整個(gè)程序需要一個(gè)Drvier來進(jìn)行提交,提交的是一個(gè)描述了各種必要信息的job對(duì)象。
為了實(shí)現(xiàn)Hadoop系統(tǒng)設(shè)計(jì)中本地化計(jì)算的原則,數(shù)據(jù)存儲(chǔ)節(jié)點(diǎn)DataNode與計(jì)算節(jié)點(diǎn)TaskTracker將合并設(shè)置,讓每個(gè)從節(jié)點(diǎn)同時(shí)運(yùn)行作為DataNode和TaskTracker,以此讓每個(gè)Tasktracker盡量處理存儲(chǔ)在本地DataNode上的數(shù)據(jù)。
而數(shù)據(jù)存儲(chǔ)主控節(jié)點(diǎn)NameNode與作業(yè)執(zhí)行主控節(jié)點(diǎn)JobTracker既可以設(shè)置在同一個(gè)主控節(jié)點(diǎn)上,在集群規(guī)模較大或者這兩個(gè)主控節(jié)點(diǎn)負(fù)載都很高以至于互相影響時(shí),也可以分開設(shè)置在兩個(gè)不同的節(jié)點(diǎn)上。
MapReduce程序的執(zhí)行流程:
MapReduce執(zhí)行一個(gè)用戶提交的MapReduce程序的基本過程。
1) 首先,用戶程序客戶端通過作業(yè)客戶端接口程序JobClient提交一個(gè)用戶程序。 2) 然后JobClient向JobTracker提交作業(yè)執(zhí)行請(qǐng)求并獲得一個(gè)Job ID。 3) JobClient同時(shí)也會(huì)將用戶程序作業(yè)和待處理的數(shù)據(jù)文件信息準(zhǔn)備好并存儲(chǔ)在HDFS中。 4) JobClient正式向JobTracker提交和執(zhí)行該作業(yè)。 5) JobTracker接受并調(diào)度該作業(yè),進(jìn)行作業(yè)的初始化準(zhǔn)備工作,根據(jù)待處理數(shù)據(jù)的實(shí)際分片情況,調(diào)度和分配一定的Map節(jié)點(diǎn)來完成作業(yè)。 6) JobTracker 查詢作業(yè)中的數(shù)據(jù)分片信息,構(gòu)建并準(zhǔn)備相應(yīng)的任務(wù)。 7) JobTracker 啟動(dòng)TaskTracker節(jié)點(diǎn)開始執(zhí)行具體的任務(wù)。 8) TaskTracker根據(jù)所分配的具體任務(wù),獲取相應(yīng)的作業(yè)數(shù)據(jù)。 9) TaskTracker節(jié)點(diǎn)創(chuàng)建所需要的Java虛擬機(jī),并啟動(dòng)相應(yīng)的Map任務(wù)(或Reduce任務(wù))的執(zhí)行。 10) TaskTracker執(zhí)行完所分配的任務(wù)之后,若是Map任務(wù),則把中間結(jié)果數(shù)據(jù)輸出到HDFS中;若是Reduce任務(wù),則輸出最終結(jié)果。 11) TaskTracker向JobTracker報(bào)告所分配的任務(wù)的完成。若是Map任務(wù)完成并且后續(xù)還有Reduce任務(wù),則JobTracker會(huì)分配和啟動(dòng)Reduce節(jié)點(diǎn)繼續(xù)處理中間結(jié)果并輸出最終結(jié)果。
1.HashMap和Hashtable的區(qū)別: http://www.importnew.com/7010.html 2.StringTokenizer類的使用方法: http://yacole.iteye.com/blog/41512
新聞熱點(diǎn)
疑難解答
圖片精選
網(wǎng)友關(guān)注