【MapReduce】Case Study

COVID-19 Pandemic Data Statistics for the United States

Counting the Number of Cases in Each US State

Requirements Analysis

  • 1. Define a custom object, CovidCountBean, to encapsulate each county's confirmed-case count and death count.
  • 2. Note that the custom object must implement Hadoop's serialization mechanism (the Writable interface).
  • 3. Use the state as the key of the map-phase output and CovidCountBean as the value, so that records belonging to the same state are grouped together for reduce processing; accumulating them yields each state's cumulative confirmed cases and deaths.

Diagram Analysis

[Figure: MapReduce flow for this job — map emits <state, CovidCountBean> for each county record, the shuffle groups records by state, and reduce sums cases and deaths per state]

Code Implementation

CovidCountBean

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CovidCountBean implements Writable {

    private long cases;  // confirmed cases
    private long deaths; // deaths

    public CovidCountBean() {
    }

    public CovidCountBean(long cases, long deaths) {
        this.cases = cases;
        this.deaths = deaths;
    }

    public long getCases() {
        return cases;
    }

    public long getDeaths() {
        return deaths;
    }

    public void set(long cases, long deaths) {
        this.cases = cases;
        this.deaths = deaths;
    }

    @Override
    public String toString() {
        return cases + "\t" + deaths;
    }

    /**
     * Serialization method: controls which fields are written out.
     * @param dataOutput
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(cases);
        dataOutput.writeLong(deaths);
    }

    /**
     * Deserialization method. Note: the fields must be read back in the same
     * order they were written during serialization.
     * @param dataInput
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.cases = dataInput.readLong();
        this.deaths = dataInput.readLong();
    }
}
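The write/readFields pair defines the bean's byte-level wire format, and the read order must mirror the write order. As a quick sanity check, here is a minimal round-trip sketch (my addition, not from the original post; the class name CovidCountBeanRoundTrip is made up):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class CovidCountBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        CovidCountBean original = new CovidCountBean(100, 5);

        // Serialize: write() appends the two longs to a byte buffer
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize: readFields() reads the longs back in write order
        CovidCountBean copy = new CovidCountBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(copy); // prints: 100<TAB>5
    }
}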

CovidSumMapper

import cn.hwq.mapreduce.covid.bean.CovidCountBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class CovidSumMapper extends Mapper<LongWritable, Text, Text, CovidCountBean> {

    Text outKey = new Text();
    CovidCountBean outValue = new CovidCountBean();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, CovidCountBean>.Context context)
            throws IOException, InterruptedException {
        // Read one line of input and split it on commas
        String[] fields = value.toString().split(",");

        // Extract the state, confirmed cases, and deaths
        outKey.set(fields[2]);
        // outValue.set(Long.parseLong(fields[4]), Long.parseLong(fields[5]));
        // Written that way, an out-of-bounds access can occur because some records
        // have missing fields; indexing from the end of the array instead handles
        // those special cases.
        outValue.set(Long.parseLong(fields[fields.length - 2]), Long.parseLong(fields[fields.length - 1]));

        // Emit the result: <state, CovidCountBean>
        context.write(outKey, outValue);
    }
}
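Indexing from the end of the array tolerates a missing middle field, but a header row or a blank line would still make Long.parseLong throw and fail the task. A more defensive variant of the map method (a sketch of my own, not from the original post) could skip unparseable records instead:

// Inside CovidSumMapper: a hypothetical, more defensive map() body
@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String[] fields = value.toString().split(",");
    if (fields.length < 3) {
        return; // blank or malformed line: nothing usable to extract
    }
    try {
        outKey.set(fields[2]);
        outValue.set(Long.parseLong(fields[fields.length - 2]),
                Long.parseLong(fields[fields.length - 1]));
        context.write(outKey, outValue);
    } catch (NumberFormatException e) {
        // e.g. a header row such as "date,county,state,fips,cases,deaths": skip it
    }
}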

CovidSumReducer

import cn.hwq.mapreduce.covid.bean.CovidCountBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class CovidSumReducer extends Reducer<Text, CovidCountBean, Text, CovidCountBean> {

    CovidCountBean outValue = new CovidCountBean();

    @Override
    protected void reduce(Text key, Iterable<CovidCountBean> values, Reducer<Text, CovidCountBean, Text, CovidCountBean>.Context context)
            throws IOException, InterruptedException {
        // Accumulator variables
        long cases = 0;  // confirmed cases
        long deaths = 0; // deaths

        // Iterate over this state's county records and accumulate
        for (CovidCountBean value : values) {
            cases += value.getCases();
            deaths += value.getDeaths();
        }

        outValue.set(cases, deaths);
        context.write(key, outValue);
    }
}
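Since this reduce is a plain sum (associative and commutative) and the reducer's input types match its output types, CovidSumReducer could also be registered as a combiner so that partial sums happen on the map side before the shuffle. This is an optional optimization not used in the original post; it would be one extra line in the driver:

// Hypothetical optimization in CovidSumDriver: pre-aggregate on the map side
job.setCombinerClass(CovidSumReducer.class);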

CovidSumDriver

import cn.hwq.mapreduce.covid.bean.CovidCountBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @description: Driver (client) class for the US COVID-19 aggregation job
 */
public class CovidSumDriver {
    public static void main(String[] args) throws Exception {
        // Configuration object
        Configuration conf = new Configuration();

        // Create the job instance
        Job job = Job.getInstance(conf, CovidSumDriver.class.getSimpleName());

        // Set the job's driver class
        job.setJarByClass(CovidSumDriver.class);

        // Set the job's mapper and reducer classes
        job.setMapperClass(CovidSumMapper.class);
        job.setReducerClass(CovidSumReducer.class);

        // Set the key/value types of the map-phase output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CovidCountBean.class);

        // Set the key/value types of the reduce-phase output,
        // i.e. the job's final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CovidCountBean.class);

        // Configure the job's input path
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // Configure the job's output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // If the output path already exists, delete it
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
        }

        // Submit the job and wait for completion
        boolean resultFlag = job.waitForCompletion(true);

        // Exit with a status code reflecting the job result
        System.exit(resultFlag ? 0 : 1);
    }
}
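To run the job, the classes are typically packaged into a jar and submitted with the hadoop command. The jar name, package, and HDFS paths below are illustrative assumptions, not taken from the original post:

hadoop jar covid-mr.jar cn.hwq.mapreduce.covid.CovidSumDriver /covid/input /covid/output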

Sample Results

Input file

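Each input record is one county's CSV row: the third field is the state and the last two fields are cumulative confirmed cases and deaths, which matches the indexing in CovidSumMapper. Assuming the New York Times us-counties.csv layout (date,county,state,fips,cases,deaths) — an assumption, since the original screenshot is not available — a few illustrative rows (numbers invented for format only) would look like:

2021-01-28,Autauga,Alabama,01001,5554,69
2021-01-28,Baldwin,Alabama,01003,19333,225
2021-01-28,Bronx,New York,36005,123456,7890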

Output file part-r-00000

  • Totals of confirmed cases and deaths for each US state.

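Each line of part-r-00000 is the state key, a tab, and the bean's toString() output (cases, tab, deaths). Illustrative lines (the numbers are placeholders, not the job's actual results, since the original screenshot is unavailable):

Alabama	123456	2345
Alaska	45678	321
Arizona	234567	4567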

