MapReduce 初识+案例(词频统计)

1. MapReduce

1.1 MapReduce 是什么

MapReduce:是 Hadoop 中的一个分布式计算框架,基于 MapReduce 写出的应用程序能够运行在大型集群上,并以一种可靠容错的方式并行处理上 T 级别的数据集。

一个 MapReduce 作业(Job)通常会把输入的数据切分为若干个独立的数据块,由 Map 任务(Task)以完全并行的方式处理它们。框架会对 Map 的输出先进行排序,然后把结果输入给 Reduce 任务,通常作业的输入和输出都会被存储在文件系统中。

整个框架负责任务的调度和监控,以及重新执行已经失败的任务。

1.2 优点/缺点

1.2.1 优点

  • 易于编程

    MapReduce 将所有的计算抽象为 Map(映射)和 Reduce(聚合)两个阶段,只需要继承并实现 Mapper 和 Reduce 类,就可以完成高性能的分布式程序

  • 高扩展性

    MapReduce 通过将多台机器的计算能力(CPU、内存),提供海量级的计算

  • 高容错性

    高并发(多线程)的分布式程序运行过程中,如果一些线程出现错误或机器故障,MapReduce 可以自动的启动错误重试机制,或将任务转移到其他机器运行,能够保证程序最终正确执行

  • 适用于海量级数据

    HDFS 可以存储的数据量级,MapReduce 可以使用应用程序对其完成计算

1.2.2 缺点

MapReduce 不擅长做实际计算、流式计算、DAG(有向图)计算。

MapReduce 的任务表达能力有限,一个 MapReduce 只能完成一次映射和聚合,像 DAG 任务就需要多次聚合,那就需要将任务拆成多个 MapReduce ,每个MapReduce 任务都需要大量的磁盘 IO,将导致性能低下。

1.3 MapReduce 运行阶段

第一阶段: Map Task 并发实例,完全并行运行,不互相干

第二阶段: Reduce Task 并发实例,获取上一阶段的输出作为本阶段的输入

1.4 MapReduce 进程

MrAppMaster:负责整个程序的过程调度及状态调度

MapTask:负责 Map 阶段的整个数据处理流程

ReduceTask:负责 Reduce 阶段的整个数据处理流程

2. Java 词频统计

在理解 MapReduce 之前,不如先用 Java 实现一个词频统计的实例。

public static void main(String[] args) throws IOException {// 1. 创建容器存储结果HashMap<String, Integer> map = new HashMap<>();// 2. 读取文件File file = new File("...");String encoding = "utf8";List<String> lines = FileUtils.readLines(file, encoding);// 3. 遍历每一行for (String line : lines) {// 4. 切分出每个单词String[] words = line.split("\\s+");for (String word : words) {// 5. 替换掉特殊字符String w = word.toLowerCase().replace("\\W", "");// 6. 每出现一个单词进行数量 + 1if (!w.isEmpty()){map.put(w, map.getOrDefault(w,0) + 1 );}}}// 7. 将统计结果进行排序ArrayList<Map.Entry<String, Integer>> entries = new ArrayList<>(map.entrySet());entries.sort(new Comparator<Map.Entry<String, Integer>>() {@Overridepublic int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {return o2.getValue() - o1.getValue();}});for (Map.Entry<String, Integer> entry : entries) {System.out.printf("单词:%s \t出现的个数为 %d\n", entry.getKey(), entry.getValue());}}

3. MapReduce 编程规范

利用 MapReduce 实现词频统计之前还需要了解 MapReduce 的编程规范。

通常我们编写一个 MapReduce 程序,会将其分解为三个部分:Mapper、Reduce、Driver。

**Mapper: **

  1. 自定义一个类,并继承 Mapper,定义输入输出键值对的泛型
  2. 实现父类的 map() 方法(MapTask进程),定义键值对的参数类型及上下文对象 context
  3. 编写 map 的具体实现,最后通过 context 对象将映射结果写入 MapReduce 框架

**Reduce: **

  1. 自定义一个类,并继承 Reduce,定义输入输出键值对的泛型
  2. 实现父类的 reduce() 方法(ReduceTask进程),定义键值对的参数类型及上下文对象 context
  3. 编写 reduce的具体实现,最后通过 context 对象将聚合结果写入 MapReduce 框架

Driver:

  1. 这是一个包含 main 方法的 MapReduce 任务的入口
  2. 实例化 Job 对象,可选择性的添加各种配置
  3. 将 Job 任务提交到集群

4. MapReduce 词频统计

4.1 Mapper

我们需要继承 Mapper 类,来自于:org.apache.hadoop.mapreduce.Mapper

4.1.1 窥见源码

4.1.1.1 Mapper 类

Mapper 类提供了四个泛型,分别是输入参数的键值对类型、输出参数的键值对类型

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {...}

⚠️ 注意:这里我们不适用 Java 提供的类型,而是使用 Hadoop 实现的序列化类型,公共接口为 Writable

🌈 如果读文本,通常默认的 KEYIN 类型为LongWritable(可表示当前行文本的位置、字节偏移量offset),VALUEIN 的类型为 Text(表示当前行文本内容),输出的 KEYOUT 类型为 Text(表示输出值的键),输出的 VALUEOUT 的类型为 IntWritable(可表示为数量)。

4.1.1.2 Mapper 方法
// 在任务开始时调用一次,通常用作创建连接、打开流等获取资源的操作
protected void setup(Context context) throws IOException, InterruptedException {// NOTHING
}
// 对输入拆分中的每个键/值对调用一次,通常需要重写该方法,这是一个默认的核心方法
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {// map 处理完后将数据写出去context.write((KEYOUT) key, (VALUEOUT) value);
}
// 在任务结束时调用一次,用作关闭资源等
protected void cleanup(Context context) throws IOException, InterruptedException {// NOTHING
}
// 该方法相当于整合了上面三个方法
public void run(Context context) throws IOException, InterruptedException {setup(context);try {while (context.nextKeyValue()) {map(context.getCurrentKey(), context.getCurrentValue(), context);}} finally {cleanup(context);}
}

4.1.2 Job_WordCountMapper

词频统计 Mapper 如下:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class Job_WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {// 输出Text k = new Text();IntWritable v = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 将读取的文本每行的数据,分隔成单词String[] words = value.toString().split("\\s+");// 对单词进行处理for (String word : words) {// 转小写 去除特殊字符String w = word.toLowerCase().replace("\\W", "");// 将单词作为输出的 keyk.set(w);// 使用上下文对象 将 mapper 处理的结果以  的方式写到 MapReduce 框架context.write(k, v);}}
}

4.2 Reduce

4.2.1 窥见源码

4.2.1.1 Reduce 类

Reduce 类提供了四个泛型,分别是输入参数的键值对类型、输出参数的键值对类型

public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {...}

🌈 Reduce 负责接收 Mapper 输出的内容,所以 KEYINVALUEIN 就对应 mapper 的输出键值对的类型,KEYOUT 使用 TextVALUEOUT 使用 LongWritable(防止结果量级太大)

4.2.1.2 Reduce 方法
protected void setup(...){...}
protected void cleanup(...){...}
public void run(...){...}
// 这个方法对每个键调用一次,通常需要重写该方法,这是一个默认的核心方法
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {for(VALUEIN value: values) {context.write((KEYOUT) key, (VALUEOUT) value);}
}

4.2.2 Job_WordCountReducer

词频统计 Reduce 如下:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class Job_WordCountReducer extends Reducer<Text, IntWritable, Text, LongWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,  Context context) throws IOException, InterruptedException {// 声明变量 用于存储聚合完的结果long count = 0;// 遍历相同的 key 获取对应的所有 valuefor (IntWritable value : values) {count += value.get();}// 将聚合完的结果写到 MapReduce 框架context.write(key, new LongWritable(count));}
}

4.3 Driver

我们需要实例化 Job 对象,并配置相关类(相当于整合了 Mapper、Reduce)。

🌈 配置成类,而不是配置为具体的对象,是因为方便后期通过反射获取多个实例

4.3.1 相关方法

编写 Driver 不需要继承某个类,但需要注意需要使用的几个方法:

1️⃣ : getInstance

通过 Job.getInstance() 获取 Job 实例,该方法实则传了一个配置信息,再向下延申,可以发现先实例化了 JobConf 类,再实例化 Job

public static Job getInstance() throws IOException {// create with a null Clusterreturn getInstance(new Configuration());
}
public static Job getInstance(Configuration conf) throws IOException {// create with a null ClusterJobConf jobConf = new JobConf(conf);return new Job(jobConf);
}

2️⃣ : waitForCompletion

意为等待事务完成,其中参数表示是否打印程序的运行过程

public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException,ClassNotFoundException {if (state == JobState.DEFINE) {submit();}if (verbose) {monitorAndPrintJob();} else {// get the completion poll interval from the client.int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf());while (!isComplete()) {try {Thread.sleep(completionPollIntervalMillis);} catch (InterruptedException ie) {}}}return isSuccessful();
}

4.3.2 Job_WordCountDriver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class Job_WordCountDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 0. 自定义配置对象Configuration conf = new Configuration();// 1. 创建 Job 对象,参数可取消Job job = Job.getInstance(conf);// 2. 给 Job 对象添加 Mapper 类的 Classjob.setMapperClass(Job_WordCountMapper.class);// 3. 给 Job 对象添加 Reduce 类的 Classjob.setReducerClass(Job_WordCountReducer.class);// 4. 给 Job 对象添加 Driver 类的 Classjob.setJarByClass(Job_WordCountDriver.class);// 5. 设置 Mapper 输出的数据的 key 类型job.setMapOutputKeyClass(Text.class);// 6. 设置 Mapper 输出的数据的 value 类型job.setMapOutputValueClass(IntWritable.class);// 7. 设置 Reduce 输出的数据的 key 类型job.setOutputKeyClass(Text.class);// 8. 设置 Reduce 输出的数据的 value 类型job.setOutputValueClass(LongWritable.class);// 9. 设置 MapReduce 任务的输入路径FileInputFormat.setInputPaths(job, new Path("..."));// 10.设置 MapReduce 任务的输出路径FileOutputFormat.setOutputPath(job, new Path("..."));// 11.提交任务boolean b = job.waitForCompletion(true);System.exit( b ? 0 : 1 );}
}

⚠️ 注意:本地测试可以直接写输入输出路径,但集群上不能写死,所以需要以参数的形式让用户输入,注意 main 方法的参数是个数组类型,所以可将其修改为:

    // 9. 设置 MapReduce 任务的输入路径FileInputFormat.setInputPaths(job, new Path(args[0]));// 10.设置 MapReduce 任务的输出路径FileOutputFormat.setOutputPath(job, new Path(args[1]));

4.4 运行测试

11231

注:文本处理的不太干净,先这样了…

5. 集群测试

MayReduce 写完最主要的还是要放到集群上测试。

测试环境:

在开发时使用较小的样例数据,MapReduce 程序是被提交给 LocalJobRunner 在本地以单进程的形式循行,这种方式称为 local 测试

集群测试:

本地运行测试逻辑正确后,将程序提交至 Yarn 集群,分发到很多节点上并发执行,处理的数据和输出的结果存放于 HDFS 系统

1️⃣ Step1:通过 IDEA 将程序打包为 jar 包

image-20211123010631545

2️⃣ Step2:(可省略此步骤)使用解压缩工具打开该 jar 包(不要解压),在 MayReduce_WordCount-1.0-SNAPSHOT.jar\META-INF 目录下修改 MANIFEST.MF 文件,添加主类(Reference 路径),并保存

Main-Class: WordCount_API.Job_WordCountDriver

3️⃣ Step3:将该 jar 包上传至服务器,开启 HDFS,提前上传一个待统计的英文词频文件

image-20211123011452064

4️⃣ Step4:启动该程序(若运行后找不到类,可添加 main-class 参数以配置启动类,确保文件输出路径为空)

hadoop jar xxx.jar  fileIn fileOut

5️⃣ Step5:运行测试,到这算成功了

11232

6️⃣ Step6:查看输出,如下,与 IDEA 测试的输出一致

image-20211123012538117

6. MapReduce 运行流程

1️⃣ Step1:MapReduce 程序读取文件的输入目录上存放的相应文件

2️⃣ Step2:获取待处理的数据信息,根据集群中的配置形成一个任务分配规划

3️⃣ Step3:客户端提交 job.split、jar.xml 等文件给 yarn, yarn 中的 resourcemanager 启动 MapReduce AppMaster

4️⃣ Step4:MapReduce AppMaster 启动后根据本次 Job 的描述信息,计算存储需要的 maptask 实例数量,然后向集群申请节点,启动相应数量的 maptask 进程

5️⃣ Step5:maptask 利用客户指定的 inputformat 来读取数据,形成输入的键值对

6️⃣ Step6:maptask 将输入的键值对传递给客户定义的 map 方法,做逻辑运算

7️⃣ Step7:map 方法运算完后将键值对收集到 maptask 缓存

8️⃣ Step8:maptask 缓存中的键值对按照 K 分区排序后不断写到磁盘文件

9️⃣ Step9:MapReduce AppMaster 监控到所有 maptask 进程完成后,会根据客户指定的参数启动相应数量的 reducetask 进程,并通知 reducetask 进程要处理的数据分区

1️⃣0️⃣ Step10:reducetask 进程启动后,根据 MapReduce AppMaster 告知的待处理数据的位置,从若干台 maptask 运行所在节点上获取到若干个 maptask 输出结果文件,并在本地进行重新归并排序,然后按照 相同键的键值对为一组,调用客户定义的 reduce 方法进行逻辑运算

1️⃣1️⃣ Step11:reducetask 运算完成后,调用客户指定的 outputformat 将结果数据输出到外部存储

7. 写在最后

梳理下 MapReduce 词频统计的流程

Mapper:

  1. 程序启动后将 maptask 传递的文本转换为 String
  2. 根据空格切分出单词,并处理大小写、特殊符号
  3. 将单词输出为 <单词, 1>

Reducer

  1. 汇总相同 key 的个数
  2. 输出该 key 的总数

Driver

  1. 获取配置信息,实例化 Job 对象
  2. 关联 mapper/reduce 类
  3. 指定 mapper 输出数据的 key-value 类型
  4. 指定 reducer 输出数据的 key-value 类型
  5. 指定 Job 输入输出的文件路径
  6. 提交 Job


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部