The Road to Learning Big Data (8): MapReduce in Practice (hot weather top2)

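The earlier WordCount example was simple enough that I only posted the code. This project is a little more complex, so I will record the reasoning behind it as I write it.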

Project: for every month of every year, find the temperatures of the two hottest days.

Data:

1949-10-01 14:21:02 34c
1949-10-02 14:01:02 36c
1950-01-01 11:21:02 32c
1950-10-01 12:21:02 37c
1951-12-01 12:21:02 23c
1950-10-02 12:21:02 41c
1950-10-03 12:21:02 27c
1951-07-01 12:21:02 45c
1951-07-02 12:21:02 46c
1951-07-03 12:21:03 47c

Expected output:

1949-10-02:36   36
1949-10-01:34   34
1950-10-02:41   41
1950-10-01:37   37
1951-07-03:47   47
1951-07-02:46   46

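Approach:

  1. We want the highest temperatures for each year and month, so the value we care about in each source record is the temperature.

  2. Next we change the sort order: year and month ascending (descending would work too) and temperature descending. That way the first two records of each month are exactly the result we want.

  3. We override the grouping used by MapReduce so that records with the same year and month are handed to the same reduce() call and processed together.

  4. Our data is no longer a simple Text as in WordCount, so we write our own Java bean that implements WritableComparable to hold it.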
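
The four steps above can be tried out without a cluster. The following is a minimal plain-Java sketch, not the Hadoop job itself: `TopTwoSketch`, `Reading` and `topTwoPerMonth` are names invented here for illustration. It sorts a handful of readings with the order from step 2, treats consecutive readings with the same year and month as one group (step 3), and keeps at most two per group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java sketch of "sort, group, take two"; no Hadoop classes are involved.
class TopTwoSketch {

    // Hypothetical stand-in for the Weather key used later in the article.
    static final class Reading {
        final int year, month, day, temperature;

        Reading(int year, int month, int day, int temperature) {
            this.year = year;
            this.month = month;
            this.day = day;
            this.temperature = temperature;
        }
    }

    // Step 2: year ascending, month ascending, temperature descending.
    static final Comparator<Reading> SORT_ORDER = (a, b) -> {
        int byYear = Integer.compare(a.year, b.year);
        if (byYear != 0) {
            return byYear;
        }
        int byMonth = Integer.compare(a.month, b.month);
        if (byMonth != 0) {
            return byMonth;
        }
        return Integer.compare(b.temperature, a.temperature); // arguments swapped: descending
    };

    // Step 3: two readings belong to the same group when year and month match.
    static boolean sameGroup(Reading a, Reading b) {
        return a.year == b.year && a.month == b.month;
    }

    // Sorts a copy of the input, then keeps at most the first two readings of each group.
    static List<Reading> topTwoPerMonth(List<Reading> input) {
        List<Reading> sorted = new ArrayList<>(input);
        sorted.sort(SORT_ORDER);

        List<Reading> result = new ArrayList<>();
        Reading groupHead = null;
        int taken = 0;
        for (Reading r : sorted) {
            if (groupHead == null || !sameGroup(groupHead, r)) {
                groupHead = r; // a new (year, month) group starts here
                taken = 0;
            }
            if (taken < 2) {
                result.add(r);
                taken++;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Reading> data = Arrays.asList(
                new Reading(1950, 10, 1, 37),
                new Reading(1950, 10, 2, 41),
                new Reading(1950, 10, 3, 27),
                new Reading(1949, 10, 1, 34),
                new Reading(1949, 10, 2, 36));
        for (Reading r : topTwoPerMonth(data)) {
            System.out.println(r.year + "-" + r.month + "-" + r.day + "\t" + r.temperature);
        }
    }
}
```

In the real job the framework does the sorting and grouping between map and reduce, so the reducer only has to do the "take two" part.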

Code:

  • First, write a Java bean named Weather

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Weather implements WritableComparable<Weather> {
    // Year
    private Integer year;
    // Month
    private Integer month;
    // Day
    private Integer day;
    // Temperature
    private Integer temperature;

    public Integer getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public Integer getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public Integer getDay() {
        return day;
    }

    public void setDay(int day) {
        this.day = day;
    }

    public Integer getTemperature() {
        return temperature;
    }

    public void setTemperature(int temperature) {
        this.temperature = temperature;
    }

    // Year ascending, then month ascending, then temperature descending
    @Override
    public int compareTo(Weather w) {
        int res1 = Integer.compare(year, w.getYear());
        if (res1 == 0) {
            int res2 = Integer.compare(month, w.getMonth());
            if (res2 == 0) {
                return Integer.compare(w.getTemperature(), temperature);
            }
            return res2;
        }
        return res1;
    }

    // The fields must be written and read back in the same order
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(year);
        dataOutput.writeInt(month);
        dataOutput.writeInt(day);
        dataOutput.writeInt(temperature);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        year = dataInput.readInt();
        month = dataInput.readInt();
        day = dataInput.readInt();
        temperature = dataInput.readInt();
    }
}
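
The one rule that is easy to get wrong in a bean like this is that write and readFields must handle the fields in the same order. Here is a small sketch of that round trip using only the JDK. `WeatherRoundTrip` and `Fields` are hypothetical stand-ins written for this example (they do not implement the Hadoop interface), but the two methods have the same shape as the ones in Weather.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// JDK-only sketch of the write/readFields round trip; not the Hadoop Weather class.
class WeatherRoundTrip {

    // Hypothetical stand-in holding the same four int values as Weather.
    static final class Fields {
        int year, month, day, temperature;

        void write(DataOutput out) throws IOException {
            out.writeInt(year);
            out.writeInt(month);
            out.writeInt(day);
            out.writeInt(temperature);
        }

        // Reads the values back in exactly the order write() emitted them.
        void readFields(DataInput in) throws IOException {
            year = in.readInt();
            month = in.readInt();
            day = in.readInt();
            temperature = in.readInt();
        }
    }

    // Serializes the four fields to a byte array (four ints, so 16 bytes).
    static byte[] toBytes(Fields f) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            f.write(new DataOutputStream(bytes));
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Rebuilds a Fields object from the bytes produced by toBytes().
    static Fields fromBytes(byte[] data) {
        try {
            Fields copy = new Fields();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(data)));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Fields original = new Fields();
        original.year = 1949;
        original.month = 10;
        original.day = 1;
        original.temperature = 34;

        Fields copy = fromBytes(toBytes(original));
        System.out.println(copy.year + "-" + copy.month + "-" + copy.day + " " + copy.temperature);
    }
}
```

If the order in readFields were different from the order in write, the copy would silently come back with its values swapped, which is why it is worth checking once in isolation.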
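  • Next is our Mapper class
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, Weather, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // HH is the 24-hour clock; the sample times go up to 14:21:02
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Calendar c = Calendar.getInstance();
        /*
         * A line looks like: 1949-10-01 14:21:02   34c
         * The date and the temperature are separated by a tab, so we split on \t.
         */
        String line = value.toString();
        String[] list = line.split("\t");
        if (list.length == 2) {
            // 1949-10-01 14:21:02
            String arg = list[0];
            // 34c
            String temp = list[1];
            Weather w = new Weather();
            try {
                Date date = dateFormat.parse(arg);
                c.setTime(date);
                // 1949
                w.setYear(c.get(Calendar.YEAR));
                // 10 (Calendar.MONTH is zero-based, hence the + 1)
                w.setMonth(c.get(Calendar.MONTH) + 1);
                // 01 -> 1
                w.setDay(c.get(Calendar.DATE));
                // 34c -> 34. Cold weather gives negative values, so cut at the
                // position of the c instead of hard-coding a length of 2.
                int t = Integer.parseInt(temp.substring(0, temp.lastIndexOf("c")));
                w.setTemperature(t);
                context.write(w, new IntWritable(t));
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
    }
}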
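
The parsing inside map can be checked on its own before running a job. The sketch below is an illustration under a few assumptions: it swaps SimpleDateFormat and Calendar for java.time, and `LineParseSketch` and `parse` are names made up for this example. It splits a line on the tab, reads the date, and cuts the temperature at the position of the c so that negative values also work.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;

// Stand-alone sketch of the line parsing done in the mapper, using java.time.
class LineParseSketch {

    // HH means the 24-hour clock, which the sample data needs (14:21:02).
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Returns {year, month, day, temperature}, or null when the line is malformed.
    static int[] parse(String line) {
        String[] parts = line.split("\t");
        if (parts.length != 2) {
            return null;
        }
        String temp = parts[1].trim();
        int unit = temp.lastIndexOf('c');
        if (unit < 0) {
            return null; // no trailing unit, so we cannot tell where the number ends
        }
        try {
            LocalDateTime time = LocalDateTime.parse(parts[0].trim(), FORMAT);
            int degrees = Integer.parseInt(temp.substring(0, unit));
            return new int[] {
                time.getYear(), time.getMonthValue(), time.getDayOfMonth(), degrees
            };
        } catch (RuntimeException e) {
            // DateTimeParseException or NumberFormatException: treat the line as bad input
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(parse("1949-10-01 14:21:02\t34c")));
        System.out.println(Arrays.toString(parse("1950-01-01 11:21:02\t-5c")));
        System.out.println(Arrays.toString(parse("not a weather line")));
    }
}
```

Unlike Calendar.MONTH, getMonthValue() is already 1-based, so no + 1 is needed here.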
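  • After map, the records are partitioned across the reducers. The default is hash partitioning, which uses the key's hashCode(). Weather does not override hashCode(), so the default could send records of the same month to different reducers; here we partition by year ourselves (any rule works as long as a given year and month always lands on one reducer and the load is spread reasonably evenly)
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class MyPartitioner extends HashPartitioner<Weather, IntWritable> {
    @Override
    public int getPartition(Weather key, IntWritable value, int numReduceTasks) {
        // Take the year modulo the task count so each year goes to one task;
        // numReduceTasks is the number of reduce tasks. floorMod keeps the
        // result non-negative even for years before 1949.
        return Math.floorMod(key.getYear() - 1949, numReduceTasks);
    }
}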
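
To see where each year ends up, the year-modulo rule can be run by hand. This is a small sketch with made-up names (`PartitionSketch`, `partitionFor`). It uses Math.floorMod rather than the plain % operator, which is an assumption added here so that a year before 1949 still maps to a valid partition number.

```java
// Stand-alone sketch of the "year modulo number of reducers" rule.
class PartitionSketch {

    // Maps a year to a partition in the range [0, numReduceTasks).
    static int partitionFor(int year, int numReduceTasks) {
        return Math.floorMod(year - 1949, numReduceTasks);
    }

    public static void main(String[] args) {
        int numReduceTasks = 3; // same value the job is configured with later
        for (int year = 1948; year <= 1952; year++) {
            System.out.println(year + " -> partition " + partitionFor(year, numReduceTasks));
        }
        // The plain % operator would give -1 for 1948, which is not a valid partition.
        System.out.println("(1948 - 1949) % 3 = " + ((1948 - 1949) % numReduceTasks));
    }
}
```

With three reduce tasks, 1949, 1950 and 1951 land on partitions 0, 1 and 2, which is why the sample data ends up in three separate part files.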
  • Next, override the sort comparator
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MySort extends WritableComparator {
    public MySort() {
        super(Weather.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Cast to Weather
        Weather w1 = (Weather) a;
        Weather w2 = (Weather) b;
        int res1 = Integer.compare(w1.getYear(), w2.getYear());
        if (res1 == 0) {
            int res2 = Integer.compare(w1.getMonth(), w2.getMonth());
            if (res2 == 0) {
                // Negating the ascending comparison would also work,
                // but I think that adds one more operation, so the
                // arguments are swapped instead to sort descending.
                return Integer.compare(w2.getTemperature(), w1.getTemperature());
            }
            return res2;
        }
        return res1;
    }
}
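  • Then the grouping comparator
// These comparators are all much alike; the group one only needs to check that year and month are equal
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyGroup extends WritableComparator {
    public MyGroup() {
        super(Weather.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Weather w1 = (Weather) a;
        Weather w2 = (Weather) b;
        int res1 = Integer.compare(w1.getYear(), w2.getYear());
        if (res1 == 0) {
            return Integer.compare(w1.getMonth(), w2.getMonth());
        }
        return res1;
    }
}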
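  • Next is our reducer
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Weather, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Weather key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int i = 0;
        for (IntWritable t : values) {
            if (i++ == 2) {
                // Only the first two are needed
                break;
            }
            // The key object is refreshed as we iterate, so getDay()
            // belongs to the value currently being processed
            String val = key.getYear() + "-" + key.getMonth() + "-" + key.getDay();
            context.write(new Text(val), t);
        }
    }
}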
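  • Finally, write the job configuration and run it

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {
    static Configuration conf;

    public static void main(String[] args) {
        try {
            // Load the configuration
            conf = new Configuration();
            // Settings for the local test environment
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            conf.set("yarn.resourcemanager.hostname", "localhost");
            // Get a handle on HDFS
            FileSystem fs = FileSystem.get(conf);
            // Create the job instance
            Job job = Job.getInstance(conf);
            job.setJarByClass(RunJob.class);
            job.setMapperClass(MyMapper.class);
            job.setPartitionerClass(MyPartitioner.class);
            job.setSortComparatorClass(MySort.class);
            job.setGroupingComparatorClass(MyGroup.class);
            job.setReducerClass(MyReducer.class);
            job.setMapOutputKeyClass(Weather.class);
            job.setMapOutputValueClass(IntWritable.class);
            // Types written by the reducer
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setNumReduceTasks(3);
            // Upload the data first, then add it as the input path:
            // hdfs dfs -put data /weather/input/data
            Path input = new Path("/weather/input/data");
            if (!fs.exists(input)) {
                System.out.println("Input file does not exist!");
                System.exit(1);
            }
            FileInputFormat.addInputPath(job, input);
            Path output = new Path("/weather/output");
            // Make sure the output path does not exist yet
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            // Set the output path
            FileOutputFormat.setOutputPath(job, output);
            boolean res = job.waitForCompletion(true);
            if (res) {
                System.out.println("Job completed successfully");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}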

To test, run main and check the job at [ http://localhost:8088/cluster/apps/RUNNING ]

List the output files

hdfs dfs -ls /weather/output 

View the output data

hdfs dfs -cat /weather/output/part-r-00000
hdfs dfs -cat /weather/output/part-r-00001
hdfs dfs -cat /weather/output/part-r-00002

Compare it with the expected data, and we are done.

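Code download: [ https://github.com/qn9301/bigdata-learn ]. If you like it, a star is welcome. I will keep recording my whole learning process, so anyone who wants to learn along can use it as a reference.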

