尚学堂 Big Data Study Notes (5) MapReduce Simple Case 1: Find the Two Days with the Highest Temperature in Each Month
1. The dataset format is as follows:
1949-10-01 14:21:02 34c
1949-10-01 19:21:02 38c
1949-10-02 14:01:02 36c
1950-01-01 11:21:02 32c
1950-10-01 12:21:02 37c
1951-12-01 12:21:02 23c
1950-10-02 12:21:02 41c
1950-10-03 12:21:02 27c
1951-07-01 12:21:02 45c
1951-07-02 12:21:02 46c
1951-07-03 12:21:03 47c
...
...
2. Requirement: find the two days with the highest temperature in each month
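Working through the sample rows above by hand (the two hottest distinct days per year-month; a month with only one recorded day yields a single line), the expected result is:
1949-10-01 38c, 1949-10-02 36c
1950-01-01 32c
1950-10-02 41c, 1950-10-01 37c
1951-07-03 47c, 1951-07-02 46c
1951-12-01 23c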
2.1 Preparation: generate the temperature data file (TianQi.txt) and upload it to the /data/ directory on HDFS
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class HDFSTest {

    private Configuration conf = null;
    private FileSystem fs = null;

    @Before
    public void conn() throws Exception {
        conf = new Configuration(true);
        fs = FileSystem.get(conf);
    }

    @After
    public void close() throws Exception {
        fs.close();
    }

    // Upload the local file to HDFS
    @Test
    public void upload() throws Exception {
        InputStream inputStream = new FileInputStream(new File("C:\\Users\\LGX\\Desktop\\TianQi.txt"));
        Path f = new Path("/data/TianQi.txt");
        FSDataOutputStream dataOutputStream = fs.create(f);
        IOUtils.copyBytes(inputStream, dataOutputStream, conf, true);
        System.out.println("upload OK...");
    }

    // Generate the temperature data, one record per line, e.g.
    // 1951-07-03 12:21:03	47c
    @Test
    public void writeTest() throws Exception {
        FileOutputStream fos = new FileOutputStream(new File("C:\\Users\\LGX\\Desktop\\TianQi.txt"));
        Random random = new Random();
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < 1000000; i++) {
            sb.append(randomDate());
            sb.append("\t");
            sb.append(random.nextInt(50) - 10);   // temperature in [-10, 39]
            sb.append("c");
            sb.append("\r\n");
            fos.write(sb.toString().getBytes());
            sb.delete(0, sb.length());
        }
        fos.close();
    }

    // Random timestamp between 1900 and 2017
    private String randomDate() {
        Random rnd = new Random();
        int year = rnd.nextInt(118) + 1900;
        int month = rnd.nextInt(12) + 1;
        int day = rnd.nextInt(28) + 1;
        int hour = rnd.nextInt(24);
        int minute = rnd.nextInt(60);
        int second = rnd.nextInt(60);
        return year + "-" + cp(month) + "-" + cp(day) + " "
                + cp(hour) + ":" + cp(minute) + ":" + cp(second);
    }

    // Zero-pad single-digit values
    private String cp(int num) {
        String s = String.valueOf(num);
        return s.length() == 1 ? "0" + s : s;
    }
}
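Once upload() has run, the file can be checked from the shell (standard HDFS commands, not part of the original notes):
hdfs dfs -ls /data
hdfs dfs -tail /data/TianQi.txt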
3. Full code
3.0 Project layout (the original screenshot did not survive; per the launch command in section 5, the eight classes below live in the package com.hadoop.mr):

3.1 MyTQ.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * TianQi job driver (main class)
 * @author LGX
 */
public class MyTQ {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(true);
        Job job = Job.getInstance(conf);
        job.setJobName("analyse-tq");
        job.setJarByClass(MyTQ.class);

        // Input / output paths
        Path inputPath = new Path("/data/TianQi.txt");
        FileInputFormat.addInputPath(job, inputPath);
        Path outputPath = new Path("/data/analyse/");
        if (outputPath.getFileSystem(conf).exists(outputPath)) {
            outputPath.getFileSystem(conf).delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        // Map task
        job.setMapperClass(TMapper.class);
        job.setMapOutputKeyClass(TQ.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setPartitionerClass(TPartitioner.class);
        job.setSortComparatorClass(TSortComparator.class);
        job.setCombinerClass(TCombiner.class);
        job.setCombinerKeyGroupingComparatorClass(TGroupingComparator.class);

        // Reduce task
        job.setGroupingComparatorClass(TGroupingComparator.class);
        job.setReducerClass(TReducer.class);

        job.waitForCompletion(true);
    }
}
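A note on the wiring above (my summary of how the pieces below cooperate): TPartitioner keeps every record of one year in the same partition, TSortComparator sorts map output by year and month ascending and temperature descending, and TGroupingComparator makes all records of one year-month arrive in a single reduce() call. TCombiner runs the same top-2 selection on the map side; setCombinerKeyGroupingComparatorClass is what makes the combiner group by year-month instead of by the full (year, month, day, wd) key.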
3.2 TQ.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * TianQi (weather) composite key: year / month / day / temperature
 * @author LGX
 */
public class TQ implements WritableComparable<TQ> {

    private int year;
    private int month;
    private int day;
    private int wd;   // temperature

    public int getYear() { return year; }
    public void setYear(int year) { this.year = year; }
    public int getMonth() { return month; }
    public void setMonth(int month) { this.month = month; }
    public int getDay() { return day; }
    public void setDay(int day) { this.day = day; }
    public int getWd() { return wd; }
    public void setWd(int wd) { this.wd = wd; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.year);
        out.writeInt(this.month);
        out.writeInt(this.day);
        out.writeInt(this.wd);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.month = in.readInt();
        this.day = in.readInt();
        this.wd = in.readInt();
    }

    // Default ordering: year, then month, then day (all ascending)
    @Override
    public int compareTo(TQ that) {
        int c1 = Integer.compare(this.year, that.getYear());
        if (c1 == 0) {
            int c2 = Integer.compare(this.month, that.getMonth());
            if (c2 == 0) {
                return Integer.compare(this.day, that.getDay());
            }
            return c2;
        }
        return c1;
    }
}
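As a quick sanity check of the Writable contract above, a round trip through Hadoop's in-memory buffers can be run locally (a sketch of mine, not part of the original notes; TQSerializationCheck is a made-up name):

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class TQSerializationCheck {
    public static void main(String[] args) throws Exception {
        TQ in = new TQ();
        in.setYear(1951);
        in.setMonth(7);
        in.setDay(3);
        in.setWd(47);

        // Serialize the four int fields...
        DataOutputBuffer out = new DataOutputBuffer();
        in.write(out);

        // ...and read them back in the same field order
        DataInputBuffer buf = new DataInputBuffer();
        buf.reset(out.getData(), out.getLength());
        TQ back = new TQ();
        back.readFields(buf);

        // Expected: 1951-7-3 47c
        System.out.println(back.getYear() + "-" + back.getMonth() + "-"
                + back.getDay() + " " + back.getWd() + "c");
    }
}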
3.3 TMapper.java
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

public class TMapper extends Mapper<LongWritable, Text, TQ, IntWritable> {

    private TQ Tkey = new TQ();
    private IntWritable TValue = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Input line format: 1949-10-01 14:21:02	34c
        try {
            String[] valueStrs = StringUtils.split(value.toString(), '\t');

            // Parse the date part
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date date = sdf.parse(valueStrs[0]);
            Calendar cal = Calendar.getInstance();
            cal.setTime(date);
            Tkey.setYear(cal.get(Calendar.YEAR));
            Tkey.setMonth(cal.get(Calendar.MONTH) + 1);
            Tkey.setDay(cal.get(Calendar.DAY_OF_MONTH));

            // Parse the temperature (strip the trailing 'c')
            int wd = Integer.parseInt(valueStrs[1].substring(0, valueStrs[1].length() - 1));
            Tkey.setWd(wd);
            TValue.set(wd);

            context.write(Tkey, TValue);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
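To make the mapper concrete: the input line 1949-10-01 14:21:02	34c is split on the tab; the first field is parsed into year/month/day via SimpleDateFormat and Calendar, the second has its trailing 'c' stripped, and the mapper emits the key TQ(1949, 10, 1, wd=34) with the value 34. The temperature is carried inside the key as well so that the sort comparator can order by it.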
3.4 TPartitioner.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitioner (K, V, P): assigns each record to a reduce partition
 * @author LGX
 */
public class TPartitioner extends Partitioner<TQ, IntWritable> {

    @Override
    public int getPartition(TQ key, IntWritable value, int numPartitions) {
        return key.getYear() % numPartitions;
    }
}
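Partitioning by key.getYear() % numPartitions means a whole year (and therefore every year-month group) stays inside one reduce partition, so no month is split across reducers; since the generated years are 1900-2017, the modulo is never negative.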
3.5 TSortComparator.java
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TSortComparator extends WritableComparator {

    public TSortComparator() {
        super(TQ.class, true);   // true: create key instances for deserialization
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TQ obj1 = (TQ) a;
        TQ obj2 = (TQ) b;
        int c1 = Integer.compare(obj1.getYear(), obj2.getYear());
        if (c1 == 0) {
            int c2 = Integer.compare(obj1.getMonth(), obj2.getMonth());
            if (c2 == 0) {
                // Descending by temperature within the same year-month
                return -Integer.compare(obj1.getWd(), obj2.getWd());
            }
            return c2;
        }
        return c1;
    }
}
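The leading minus sign flips the temperature comparison, so within one year-month the hottest record is sorted first. This descending order is the crux of the whole job: the combiner and reducer can then take the first record of a group, plus the first record from a different day, as the month's top two days.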
3.6 TCombiner.java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class TCombiner extends Reducer<TQ, IntWritable, TQ, IntWritable> {

    private TQ rkey = new TQ();
    private IntWritable rvalue = new IntWritable();

    @Override
    protected void reduce(TQ key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // MapReduce primitive: records with the same key form one group,
        // and reduce() is called once per group.
        int flg = 0;
        int day = 0;
        for (IntWritable intWritable : values) {
            if (flg == 0) {
                // First record of the group: the hottest reading of the month
                rkey = key;
                rvalue.set(key.getWd());
                context.write(rkey, rvalue);
                flg++;
                day = key.getDay();
            }
            if (flg != 0 && day != key.getDay()) {
                // First record from a different day: the second-hottest day
                rkey = key;
                rvalue.set(key.getWd());
                context.write(rkey, rvalue);
                break;
            }
        }
    }
}
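Because of that sort order, the combiner emits at most two records (two distinct days) per year-month per map-side spill, which is what collapses the shuffle. The counters in section 5 show the effect: Combine input records=1000000 shrinks to Combine output records=2832, about two records for each of the 1416 year-month groups.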
3.7 TGroupingComparator.java
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TGroupingComparator extends WritableComparator {

    public TGroupingComparator() {
        super(TQ.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TQ obj1 = (TQ) a;
        TQ obj2 = (TQ) b;
        // Group only by year and month: all records of one year-month
        // are handed to a single reduce() call.
        int c1 = Integer.compare(obj1.getYear(), obj2.getYear());
        if (c1 == 0) {
            return Integer.compare(obj1.getMonth(), obj2.getMonth());
        }
        return c1;
    }
}
3.8 TReducer.java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TReducer extends Reducer<TQ, IntWritable, Text, IntWritable> {

    private Text rkey = new Text();
    private IntWritable rvalue = new IntWritable();

    @Override
    protected void reduce(TQ key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // MapReduce primitive: records with the same key form one group,
        // and reduce() is called once per group.
        int flg = 0;
        int day = 0;
        for (IntWritable intWritable : values) {
            if (flg == 0) {
                // Hottest reading of the month
                rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay());
                rvalue.set(key.getWd());
                context.write(rkey, rvalue);
                flg++;
                day = key.getDay();
            }
            if (flg != 0 && day != key.getDay()) {
                // Hottest reading on a different day
                rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay());
                rvalue.set(key.getWd());
                context.write(rkey, rvalue);
                break;
            }
        }
    }
}
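The reducer receives one group per year-month, already sorted hottest-first. It writes the first record (the month's hottest day), then scans forward to the first record from a different day, which by the sort order is the hottest reading of any other day, writes it, and breaks. A month with only one recorded day yields a single line. One subtlety worth knowing: Hadoop deserializes each record of the group into the same key instance as the iterator advances, which is why key.getDay() changes inside the loop.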
4. Export the JAR
4.1 Export the current project as a JAR file to upload to the Hadoop cluster

4.2 Upload it to the cluster with Xftp

5. Run
Launch the job with:
hadoop jar <jar file> <fully qualified main class>
hadoop jar MyTQ.jar com.hadoop.mr.MyTQ
You should see something like:
[root@node3 ~]# hadoop jar MyTQ.jar com.hadoop.mr.MyTQ
19/02/04 07:26:44 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm2
19/02/04 07:26:44 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
19/02/04 07:26:45 INFO input.FileInputFormat: Total input paths to process : 1
19/02/04 07:26:45 INFO mapreduce.JobSubmitter: number of splits:1
19/02/04 07:26:45 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1549208398103_0008
19/02/04 07:26:45 INFO impl.YarnClientImpl: Submitted application application_1549208398103_0008
19/02/04 07:26:45 INFO mapreduce.Job: The url to track the job: http://node4:8088/proxy/application_1549208398103_0008/
19/02/04 07:26:45 INFO mapreduce.Job: Running job: job_1549208398103_0008
19/02/04 07:26:56 INFO mapreduce.Job: Job job_1549208398103_0008 running in uber mode : false
19/02/04 07:26:56 INFO mapreduce.Job: map 0% reduce 0%
19/02/04 07:27:09 INFO mapreduce.Job: map 57% reduce 0%
19/02/04 07:27:12 INFO mapreduce.Job: map 67% reduce 0%
19/02/04 07:27:14 INFO mapreduce.Job: map 100% reduce 0%
19/02/04 07:27:22 INFO mapreduce.Job: map 100% reduce 100%
19/02/04 07:27:22 INFO mapreduce.Job: Job job_1549208398103_0008 completed successfully
19/02/04 07:27:22 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=62310
        FILE: Number of bytes written=344919
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=24820451
        HDFS: Number of bytes written=36620
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=15905
        Total time spent by all reduces in occupied slots (ms)=4698
        Total time spent by all map tasks (ms)=15905
        Total time spent by all reduce tasks (ms)=4698
        Total vcore-milliseconds taken by all map tasks=15905
        Total vcore-milliseconds taken by all reduce tasks=4698
        Total megabyte-milliseconds taken by all map tasks=16286720
        Total megabyte-milliseconds taken by all reduce tasks=4810752
    Map-Reduce Framework
        Map input records=1000000
        Map output records=1000000
        Map output bytes=20000000
        Map output materialized bytes=62310
        Input split bytes=97
        Combine input records=1000000
        Combine output records=2832
        Reduce input groups=1416
        Reduce shuffle bytes=62310
        Reduce input records=2832
        Reduce output records=2832
        Spilled Records=5664
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=793
        CPU time spent (ms)=11390
        Physical memory (bytes) snapshot=299294720
        Virtual memory (bytes) snapshot=4133277696
        Total committed heap usage (bytes)=137498624
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=24820354
    File Output Format Counters
        Bytes Written=36620
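Incidentally, the WARN line in the log above ("Implement the Tool interface and execute your application with ToolRunner...") can be addressed by wrapping the driver. A minimal sketch (my illustration, not the original author's code; MyTQDriver is a hypothetical name):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyTQDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // getConf() already contains any -D options parsed by ToolRunner
        Job job = Job.getInstance(getConf());
        job.setJobName("analyse-tq");
        // ... the same input/output, mapper, comparator and reducer
        //     setup as in MyTQ.main() above ...
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner strips generic options (-D, -files, -libjars, ...) first
        System.exit(ToolRunner.run(new Configuration(), new MyTQDriver(), args));
    }
}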

After the job finishes, check the results:

Download the result files with the command hadoop dfs -get /data/analyse/* ./ and adjust it to your own situation: hadoop dfs -get <HDFS files to fetch> <local target directory>
[root@node3 MyTQ]# clear
[root@node3 MyTQ]# hadoop dfs -get /data/analyse/* ./
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.
[root@node3 MyTQ]# ls
part-r-00000 _SUCCESS
[root@node3 MyTQ]#
View the results with cat part-r-00000:
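Each line of part-r-00000 is the date and the temperature separated by a tab, with at most two lines per year-month. Note that the reducer builds the date string by concatenating ints, so months and days are not zero-padded (e.g. October 1st 1949 prints as 1949-10-1	38).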

