November 2020 MIIT Exam — Hadoop (Data Application Technology) Intermediate Certification, Part 2
Outline
280. Create the /tmp/demo1 directory under the HDFS root directory using the Java API
282. Partition employee records by department number
283. Below is the Main class from the word-count experiment; complete the code as hinted
295. Access HBase using the Java API; complete the code as hinted
280. Create the /tmp/demo1 directory under the HDFS root directory using the Java API
package com.myhdfs.mypro;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class CreateDir {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // NameNode address
        URI uri = new URI("hdfs://192.168.229.141:8020");
        // Specify the user name and obtain a FileSystem object
        FileSystem fs = FileSystem.get(uri, conf, "dy");
        // Path to list: the HDFS root directory
        Path dfs = new Path("/");
        // List the directories and files under that path
        FileStatus[] fileStatuses = fs.listStatus(dfs);
        for (FileStatus fileStatus : fileStatuses) {
            if (fileStatus.isDirectory()) {
                System.out.println("dir:" + fileStatus.getPath());
            } else {
                System.out.println("file:" + fileStatus.getPath());
            }
        }
        // Create the directory, including any missing parent directories
        fs.mkdirs(new Path("/tmp/demo1"));
        System.out.println("Mkdirs Successfully");
        fs.close();
    }
}
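To confirm the directory was actually created, the client can check for it before closing the FileSystem. A small illustrative addition (not part of the exam answer) that could go right before fs.close():

        // Verify that /tmp/demo1 now exists and is a directory
        Path demo1 = new Path("/tmp/demo1");
        if (fs.exists(demo1)) {
            System.out.println("Verified: " + demo1 + " isDirectory=" + fs.getFileStatus(demo1).isDirectory());
        }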
282. Partition employee records by department number
There is an emp.csv file with the following contents (an empty field means the value is absent, e.g. an employee with no manager or no commission):
7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,,20
7900,JAMES,CLERK,7698,1981/12/3,950,,30
7902,FORD,ANALYST,7566,1981/12/3,3000,,20
7934,MILLER,CLERK,7782,1982/1/23,1300,,10
| Field (in order) | Meaning |
|---|---|
| EMPNO | Employee ID |
| ENAME | Employee name |
| JOB | Job title |
| MGR | Employee ID of the direct manager |
| HIREDATE | Hire date |
| SAL | Salary |
| COMM | Commission (bonus) |
| DEPTNO | Department number |
Serialization: define the Employee type (employee record)
package com.mytest.myMapReduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class Employee implements Writable {
    // Fields of the employee record
    private int empno;
    private String ename;
    private String job;
    private int mgr;
    private String hiredate;
    private int sal;
    private int comm;
    private int deptno;

    // Override toString() to produce the output line the Reducer needs
    @Override
    public String toString() {
        return empno + "," + ename + "," + job + "," + mgr + "," + hiredate + "," + sal + "," + comm + "," + deptno;
    }

    // Deserialization: read the fields back from the byte stream into the object
    public void readFields(DataInput input) throws IOException {
        this.empno = input.readInt();
        this.ename = input.readUTF();
        this.job = input.readUTF();
        this.mgr = input.readInt();
        this.hiredate = input.readUTF();
        this.sal = input.readInt();
        this.comm = input.readInt();
        this.deptno = input.readInt();
    }

    // Serialization: write the in-memory fields into the byte stream
    // Note: the field order and types must match between write() and readFields()
    public void write(DataOutput output) throws IOException {
        output.writeInt(this.empno);
        output.writeUTF(this.ename);
        output.writeUTF(this.job);
        output.writeInt(this.mgr);
        output.writeUTF(this.hiredate);
        output.writeInt(this.sal);
        output.writeInt(this.comm);
        output.writeInt(this.deptno);
    }

    // Getters and setters used by other classes (Source --> Generate Getters and Setters)
    public int getEmpno() { return empno; }
    public void setEmpno(int empno) { this.empno = empno; }
    public String getEname() { return ename; }
    public void setEname(String ename) { this.ename = ename; }
    public String getJob() { return job; }
    public void setJob(String job) { this.job = job; }
    public int getMgr() { return mgr; }
    public void setMgr(int mgr) { this.mgr = mgr; }
    public String getHiredate() { return hiredate; }
    public void setHiredate(String hiredate) { this.hiredate = hiredate; }
    public int getSal() { return sal; }
    public void setSal(int sal) { this.sal = sal; }
    public int getComm() { return comm; }
    public void setComm(int comm) { this.comm = comm; }
    public int getDeptno() { return deptno; }
    public void setDeptno(int deptno) { this.deptno = deptno; }
}
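Because write() and readFields() must agree on field order and types, a quick local sanity check is to serialize one Employee into a byte buffer and read it back. The round-trip demo below is a hypothetical helper (not part of the exam answer), assumed to live in the same package as Employee:

package com.mytest.myMapReduce;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class EmployeeSerDeDemo {
    public static void main(String[] args) throws IOException {
        Employee in = new Employee();
        in.setEmpno(7369); in.setEname("SMITH"); in.setJob("CLERK");
        in.setMgr(7902);   in.setHiredate("1980/12/17");
        in.setSal(800);    in.setComm(0); in.setDeptno(20);

        // Serialize into an in-memory byte buffer
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buf));

        // Deserialize the same bytes into a fresh object
        Employee out = new Employee();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        // Both lines should print: 7369,SMITH,CLERK,7902,1980/12/17,800,0,20
        System.out.println(in);
        System.out.println(out);
    }
}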
The Mapper class
package com.mytest.myMapReduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Type parameters:                         k1            v1    k2           v2
public class EmployeeMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Input line, e.g.: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String data = value1.toString();
        // Split into fields
        String[] words = data.split(",");
        // Create an employee object and set its fields
        Employee e = new Employee();
        e.setEmpno(Integer.parseInt(words[0]));      // employee ID
        e.setEname(words[1]);                        // employee name
        e.setJob(words[2]);                          // job title
        try {
            e.setMgr(Integer.parseInt(words[3]));    // manager's employee ID (may be empty)
        } catch (Exception ex) {
            e.setMgr(-1);                            // no manager
        }
        e.setHiredate(words[4]);                     // hire date
        e.setSal(Integer.parseInt(words[5]));        // salary
        try {
            e.setComm(Integer.parseInt(words[6]));   // commission (may be empty)
        } catch (Exception ex) {
            e.setComm(0);                            // no commission
        }
        e.setDeptno(Integer.parseInt(words[7]));     // department number
        // Output: k2 = department number, v2 = employee object
        context.write(new IntWritable(e.getDeptno()), e);
    }
}
The Reducer class
package com.mytest.myMapReduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// Type parameters:                          k3           v3        k4           v4
public class EmployeeReducer extends Reducer<IntWritable, Employee, IntWritable, Text> {
    @Override
    protected void reduce(IntWritable k3, Iterable<Employee> v3, Context context)
            throws IOException, InterruptedException {
        // k3: department number
        // v3: the employee records (Employee objects) of that department
        String line = null;
        for (Employee e : v3) {
            line = e.toString();
            context.write(k3, new Text(line));
        }
    }
}
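The Partitioner class is referenced by the driver below but is not included in the original write-up. A minimal sketch, assuming departments 10, 20 and 30 are routed to reduce partitions 0, 1 and 2 to match the three reduce tasks configured in the driver:

package com.mytest.myMapReduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Route each employee record to a partition according to its department number (k2).
public class EmployeePartitioner extends Partitioner<IntWritable, Employee> {
    @Override
    public int getPartition(IntWritable k2, Employee v2, int numPartitions) {
        if (k2.get() == 10) {
            return 0;        // department 10 -> partition 0
        } else if (k2.get() == 20) {
            return 1;        // department 20 -> partition 1
        } else {
            return 2;        // everything else (department 30) -> partition 2
        }
    }
}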
The main entry (driver) class
package com.mytest.myMapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmployeeMain {
    public static void main(String[] args) throws Exception {
        // Create a job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(EmployeeMain.class);

        // Specify the job's mapper and its output types (k2, v2)
        job.setMapperClass(EmployeeMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);   // department number
        job.setMapOutputValueClass(Employee.class);    // employee record (Employee type)

        // Specify the partitioning rule and the number of partitions (reduce tasks)
        job.setPartitionerClass(EmployeePartitioner.class);
        job.setNumReduceTasks(3);

        // Specify the job's reducer and its output types (k4, v4)
        job.setReducerClass(EmployeeReducer.class);
        job.setOutputKeyClass(IntWritable.class);      // department number
        job.setOutputValueClass(Text.class);           // employee record rendered as text by the reducer

        // If the output directory already exists, delete it
        Path path = new Path(args[1]);                 // args[1] is the output directory (args[0] is the input)
        FileSystem fileSystem = path.getFileSystem(conf);
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);
        }

        // Specify the job's input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job and wait for it to finish
        job.waitForCompletion(true);
    }
}
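Assuming the classes above are packaged into a jar (the jar name and HDFS paths below are illustrative, not taken from the exam), the job could be submitted roughly as: hadoop jar employee.jar com.mytest.myMapReduce.EmployeeMain /input/emp.csv /output/emp_part. With three reduce tasks and the partitioner above, the output directory should contain part-r-00000, part-r-00001 and part-r-00002, holding the employees of departments 10, 20 and 30 respectively.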
290. Below is the Main class from the word-count experiment; complete the code as hinted
package com.app;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Set up the job
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(WordCountMain.class);

        // Set up the map stage
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Set up the reduce stage
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the input and output directories
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Launch the job
        job.waitForCompletion(true);
    }
}
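WordCountMapper and WordCountReducer are referenced by this Main class but are not reproduced in the write-up. A minimal sketch of the two classes, assuming words in the input are separated by single spaces:

// WordCountMapper.java
package com.app;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emit (word, 1) for every word in the input line.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String word : value.toString().split(" ")) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

// WordCountReducer.java
package com.app;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sum the counts collected for each word.
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum));
    }
}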
295. Access HBase using the Java API; complete the code as hinted
package com.company;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class Main {
    private static Configuration conf;
    private static Connection conn;

    public static void main(String[] args) throws IOException {
        // write your code here
        // Connect to the ZooKeeper ensemble used by HBase
        getConnect();
        // Create the table
        createTable();
        System.out.println("Complete!");
    }

    public static void getConnect() throws IOException {
        // Set the address and port of the ZooKeeper ensemble that HBase uses
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        // Connect via ZooKeeper
        conn = ConnectionFactory.createConnection(conf);
    }

    public static void createTable() throws IOException {
        // Create an Admin object
        Admin admin = conn.getAdmin();
        // Check whether the table already exists
        TableName tableName = TableName.valueOf("stu0316");
        if (admin.tableExists(tableName)) {
            System.out.println("Table exists!");
            admin.close();
            return;
        }
        // Describe the table: table name and column families
        HTableDescriptor tableDesc = new HTableDescriptor(tableName);
        HColumnDescriptor infoDesc = new HColumnDescriptor("info");
        HColumnDescriptor gradesDesc = new HColumnDescriptor("grades");
        tableDesc.addFamily(infoDesc);
        tableDesc.addFamily(gradesDesc);
        // Create the table
        admin.createTable(tableDesc);
        // Close the Admin object
        admin.close();
        System.out.println("create table!");
    }
}
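As a usage sketch (not part of the exam answer), the method below could be added to the Main class above to write one cell into the stu0316 table and read it back; the row key 1001 and the value Tom are made-up examples. Note that HTableDescriptor and HColumnDescriptor are deprecated in the HBase 2.x client API in favor of TableDescriptorBuilder and ColumnFamilyDescriptorBuilder, although they still work for this exercise.

    // Hypothetical helper: assumes getConnect() has already been called.
    public static void putAndGet() throws IOException {
        Table table = conn.getTable(TableName.valueOf("stu0316"));
        // Write: row key "1001", column info:name = "Tom"
        Put put = new Put(Bytes.toBytes("1001"));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Tom"));
        table.put(put);
        // Read the same cell back
        Get get = new Get(Bytes.toBytes("1001"));
        Result result = table.get(get);
        byte[] name = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"));
        System.out.println("info:name = " + Bytes.toString(name));
        table.close();
    }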
