November 2020 MIIT Exam — Hadoop (Data Application Technology) Intermediate Certification, Part 2

Outline
280. Use the Java API to create the /tmp/demo1 directory under the root of HDFS
282. Partition employee records by department number
283. Below is the Main class from the word-count experiment; complete the code as prompted
295. Access HBase with the Java API; complete the code as prompted

280. Use the Java API to create the /tmp/demo1 directory under the root of HDFS
package com.myhdfs.mypro;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class CreateDir {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Configure the NameNode address
        URI uri = new URI("hdfs://192.168.229.141:8020");
        // Specify the user name and obtain a FileSystem object
        FileSystem fs = FileSystem.get(uri, conf, "dy");
        // Set the path
        Path dfs = new Path("/");
        // List the directories and files under the given path
        FileStatus[] fileStatuses = fs.listStatus(dfs);
        for (FileStatus fileStatus : fileStatuses) {
            //System.out.println(fileStatus);
            if (fileStatus.isDirectory()) {
                System.out.println("dir:" + fileStatus.getPath());
            } else {
                System.out.println("file:" + fileStatus.getPath());
            }
        }
        // Create the directory hierarchy
        fs.mkdirs(new Path("/tmp/demo1"));
        System.out.println("Mkdirs Successfully");
    }
}
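A small optional check, not part of the original answer and added here only for illustration, can be appended at the end of main() to confirm that the directory was created and to release the FileSystem handle:

        // Hypothetical verification step: confirm /tmp/demo1 now exists, then close the handle
        if (fs.exists(new Path("/tmp/demo1"))) {
            System.out.println("/tmp/demo1 exists");
        }
        fs.close();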
282. Partition employee records by department number

Given an emp.csv file with the following content (note that some rows omit the MGR or COMM field):
7369,SMITH,CLERK,7902,1980/12/17,800,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,20
7839,KING,PRESIDENT,1981/11/17,5000,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,20
7900,JAMES,CLERK,7698,1981/12/3,950,30
7902,FORD,ANALYST,7566,1981/12/3,3000,20
7934,MILLER,CLERK,7782,1982/1/23,1300,10

The fields, from left to right, are:
EMPNO: employee ID
ENAME: employee name
JOB: job title
MGR: employee ID of the direct manager
HIREDATE: hire date
SAL: salary
COMM: commission
DEPTNO: department number

Serialization: define the Employee type (employee record)

package com.mytest.myMapReduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class Employee implements Writable {
    // Fields of the employee record
    private int empno;
    private String ename;
    private String job;
    private int mgr;
    private String hiredate;
    private int sal;
    private int comm;
    private int deptno;

    // Override toString() so the Reducer can emit the employee record as a single line
    @Override
    public String toString() {
        return empno + "," + ename + "," + job + "," + mgr + "," + hiredate + "," + sal + "," + comm + "," + deptno;
    }

    // Deserialization: read the fields back from the byte stream into this object
    public void readFields(DataInput input) throws IOException {
        this.empno = input.readInt();
        this.ename = input.readUTF();
        this.job = input.readUTF();
        this.mgr = input.readInt();
        this.hiredate = input.readUTF();
        this.sal = input.readInt();
        this.comm = input.readInt();
        this.deptno = input.readInt();
    }

    // Serialization: write the in-memory fields to the byte stream
    // Note: the field order and types must match between write() and readFields()
    public void write(DataOutput output) throws IOException {
        output.writeInt(this.empno);
        output.writeUTF(this.ename);
        output.writeUTF(this.job);
        output.writeInt(this.mgr);
        output.writeUTF(this.hiredate);
        output.writeInt(this.sal);
        output.writeInt(this.comm);
        output.writeInt(this.deptno);
    }

    // Getters and setters so other classes can access the fields (Source --> Generate Getters and Setters)
    public int getEmpno() { return empno; }
    public void setEmpno(int empno) { this.empno = empno; }
    public String getEname() { return ename; }
    public void setEname(String ename) { this.ename = ename; }
    public String getJob() { return job; }
    public void setJob(String job) { this.job = job; }
    public int getMgr() { return mgr; }
    public void setMgr(int mgr) { this.mgr = mgr; }
    public String getHiredate() { return hiredate; }
    public void setHiredate(String hiredate) { this.hiredate = hiredate; }
    public int getSal() { return sal; }
    public void setSal(int sal) { this.sal = sal; }
    public int getComm() { return comm; }
    public void setComm(int comm) { this.comm = comm; }
    public int getDeptno() { return deptno; }
    public void setDeptno(int deptno) { this.deptno = deptno; }
}

Mapper class

package com.mytest.myMapReduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// k1 v1 --> k2 v2
public class EmployeeMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Input line, e.g.: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String data = value1.toString();
        // Split the line into fields
        String[] words = data.split(",");
        // Create the employee object and set its fields
        Employee e = new Employee();
        e.setEmpno(Integer.parseInt(words[0]));       // employee ID
        e.setEname(words[1]);                         // employee name
        e.setJob(words[2]);                           // job title
        try {
            e.setMgr(Integer.parseInt(words[3]));     // manager's employee ID (may be missing)
        } catch (Exception ex) {
            e.setMgr(-1);                             // no manager ID
        }
        e.setHiredate(words[4]);                      // hire date
        e.setSal(Integer.parseInt(words[5]));         // salary
        try {
            e.setComm(Integer.parseInt(words[6]));    // commission (may be missing)
        } catch (Exception ex) {
            e.setComm(0);                             // no commission
        }
        e.setDeptno(Integer.parseInt(words[7]));      // department number
        // Output: k2 = department number, v2 = employee object
        context.write(new IntWritable(e.getDeptno()), e);
    }
}

Reducer class

package com.mytest.myMapReduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// k3 v3 --> k4 v4
public class EmployeeReducer extends Reducer<IntWritable, Employee, IntWritable, Text> {
    @Override
    protected void reduce(IntWritable k3, Iterable<Employee> v3, Context context)
            throws IOException, InterruptedException {
        // k3: department number
        // v3: the employee records belonging to that department
        String line = null;
        for (Employee e : v3) {
            line = e.toString();
            context.write(k3, new Text(line));
        }
    }
}

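Partitioner class

The driver class below registers a partitioner named EmployeeParitioner, but its source is not included in this section. A minimal sketch of what such a partitioner could look like, assuming the three department numbers that appear in emp.csv (10, 20, 30) are routed to reduce tasks 0, 1 and 2 respectively:

package com.mytest.myMapReduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Route each employee record to a reduce task based on its department number (k2, v2 from the Mapper)
public class EmployeeParitioner extends Partitioner<IntWritable, Employee> {
    @Override
    public int getPartition(IntWritable key, Employee value, int numPartitions) {
        int deptno = key.get();        // department number emitted by the Mapper
        if (deptno == 10) {
            return 0;                  // department 10 -> partition 0
        } else if (deptno == 20) {
            return 1;                  // department 20 -> partition 1
        } else {
            return 2;                  // department 30 (and anything else) -> partition 2
        }
    }
}

With three reduce tasks and a partitioner like this, the output directory contains three files (part-r-00000 to part-r-00002), one per department.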
Program entry point (driver) class

package com.mytest.myMapReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmployeeMain {
    public static void main(String[] args) throws Exception {
        // Create a job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(EmployeeMain.class);

        // Set the job's mapper and its output types (k2, v2)
        job.setMapperClass(EmployeeMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);    // department number
        job.setMapOutputValueClass(Employee.class);     // employee record (Employee type)

        // Set the partitioning rule for the job
        job.setPartitionerClass(EmployeeParitioner.class);
        job.setNumReduceTasks(3);                       // number of partitions / reduce tasks

        // Set the job's reducer and its output types (k4, v4)
        job.setReducerClass(EmployeeReducer.class);
        job.setOutputKeyClass(IntWritable.class);       // department number
        job.setOutputValueClass(Text.class);            // employee record, formatted as Text by the Reducer

        // Delete the output directory if it already exists
        Path path = new Path(args[1]);                  // args[1] is the output directory (args[0] is the input directory)
        FileSystem fileSystem = path.getFileSystem(conf);
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);
        }

        // Set the job's input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job
        job.waitForCompletion(true);
    }
}
290. Below is the Main class from the word-count experiment; complete the code as prompted
package com.app;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class WordCountMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Set up the job
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(WordCountMain.class);

        // Set the mapper and its output types
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Set the reducer and its output types
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the input and output directories
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job
        job.waitForCompletion(true);
    }
}
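The Main class above references WordCountMapper and WordCountReducer, which are not included in this section. A minimal sketch of what those two classes typically look like; the package name matches the driver, while splitting on whitespace is an assumption, not taken from the original:

package com.app;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emit (word, 1) for every word on the input line
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, ONE);
            }
        }
    }
}

and the matching reducer, which sums the counts for each word:

package com.app;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sum the per-word counts emitted by the mapper
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum));
    }
}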
295. Access HBase with the Java API; complete the code as prompted
package com.company;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;

public class Main {
    private static Configuration conf;
    private static Connection conn;

    public static void main(String[] args) throws IOException {
        // write your code here
        // Connect to the Zookeeper ensemble used by HBase
        getConnect();
        // Create the table
        createTable();
        System.out.println("Complete!");
    }

    public static void getConnect() throws IOException {
        // Set the address and port of the Zookeeper ensemble used by HBase
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        // Connect to Zookeeper
        conn = ConnectionFactory.createConnection(conf);
    }

    public static void createTable() throws IOException {
        // Create an Admin object
        Admin admin = conn.getAdmin();
        // Check whether the table already exists
        TableName tableName = TableName.valueOf("stu0316");
        if (admin.tableExists(tableName)) {
            System.out.println("Table exists!");
            admin.close();
            return;
        }
        // Describe the table: table name and column families
        HTableDescriptor tableDesc = new HTableDescriptor(tableName);
        HColumnDescriptor infoDesc = new HColumnDescriptor("info");
        HColumnDescriptor gradesDesc = new HColumnDescriptor("grades");
        tableDesc.addFamily(infoDesc);
        tableDesc.addFamily(gradesDesc);
        // Create the table
        admin.createTable(tableDesc);
        // Close the Admin object
        admin.close();
        System.out.println("create table!");
    }
}
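The imports above pull in several HBase filter classes (Filter, ValueFilter, CompareFilter, BinaryComparator) that the code shown never uses, so the original exam item presumably also queried the table. A minimal sketch, not part of the original answer, of a method that could be added to the Main class above to scan the stu0316 table for cells with a specific value (the value "90" here is a hypothetical example):

    public static void scanWithFilter() throws IOException {
        Table table = conn.getTable(TableName.valueOf("stu0316"));
        // Keep only cells whose value equals "90"
        Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL,
                new BinaryComparator(Bytes.toBytes("90")));
        Scan scan = new Scan();
        scan.setFilter(filter);
        ResultScanner scanner = table.getScanner(scan);
        for (Result result : scanner) {
            System.out.println(result);    // print each matching row
        }
        scanner.close();
        table.close();
    }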

