电商指标项目-HBaseUtil工具类(完整源码)

Flink整合Kafka,可以从Kafka中获取数据进行分析,分析之后要把结果存入HBase中

编写一个操作HBase的工具类。HBase作为一个数据库,写一个工具类,实现数据的增删改查
在这里插入图片描述

1. API介绍

方法名用途参数说明返回值
getTable创建/获取表tableNameStr:表名
columnFamily:列族名
HBase Table对象
putData插入/更新一列数据tableNameStr: 表名
rowkey:String rowkey
columnFamily:列族名
column:String 列名
data:String 列值
putMapData插入/更新多个列数据tableNameStr: 表名
rowkey:String rowkey
columnFamily:列族名
mapData:列名/列值
getData根据rowkey,列族+列名
获取单列数据
tableNameStr: 表名
rowkey:String rowkey
columnFamily:列族名
column:列名
列对应的数据
String类型
getMapData根据rowkey,列族+列名集合
获取多列数据
tableNameStr: 表名
rowkey:String rowkey
columnFamily:列族名
column:列名集合
列对应的数据
Map[列名, 列值]
deleteData根据rowkey删除一条数据tableNameStr: 表名
rowkey:rowkey
columnFamily: 列族名

HBase操作基本类

类名用途获取方式
ConfigurationHBase的配置类HBaseConfiguration.create
Connection连接ConnectionFactory.createConnection(conf)
AdminHBase的操作APIConnection.getAdmin
Table用来链接HBase的单表Connection.getTable()
Get用来查询HBase的单行数据new Get(rowkey.getBytes())
Put保存单行数据new Put(rowkey.getBytes())
Delete删除单行数据new Delete(rowkey.getBytes())

2. 获取表

开发步骤:

  1. 将导入hbase-site.xml配置文件到resources目录
  2. util包中添加HBaseUtil
    • 使用HBaseConfiguration.create获取配置对象Configuration,该配置对象会自动加载hbase-site.xml
    • 使用ConnectionFactory.createConnection获取hbase连接
    • 使用Connection.getAdmin获取与master的连接
  3. 创建getTable方法
    • 构建TableName
    • 构建TableDescriptorBuilder
    • 构建ColumnFamilyDescriptor
    • 添加列族
    • 检查表是否存在,若不存在,则创建表

源码解析:
在这里插入图片描述

3. 存储数据

创建putData方法

  • 调用getTable获取表
  • 构建Put对象
  • 添加列、列值
  • 对table执行put操作
  • 启动编写main进行测试

4. 获取数据

  1. 使用Connection获取表
  2. 创建getData方法
    • 调用getTable获取表
    • 构建Get对象
    • 对table执行get操作,获取result
    • 使用Result.getValue获取列族列对应的值
    • 捕获异常
    • 关闭表
  3. 启动hhbase
  4. 启动编写main进行测试
# 5.  批量存储数据
创建`putMapData`方法- 调用`getTable`获取表
- 构建`Put`对象
- 添加Map中的列、列值
- 对table执行put操作
- 捕获异常
- 关闭表
- 启动编写main进行测试# 6. 批量获取数据
创建`putMapData`方法- 调用`getTable`获取表
- 构建`Get`对象
- 根据Get对象查询表
- 构建可变Map
- 遍历查询各个列的列值
- 过滤掉不符合的结果
- 把结果转换为Map返回
- 捕获异常
- 关闭表
- 启动编写main进行测试# 7.  删除数据
创建`deleteData`方法- 调用`getTable`获取表
- 构建`Delete`对象
- 对table执行delete操作
- 捕获异常
- 关闭表
- 启动编写main进行测试完整代码:```java
package com.xu.realprocess.utilimport org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptor, _}
import org.apache.hadoop.hbase.util.Bytes/*** HBase的工具类** 获取Table* 保存单列数据* 查询单列数据* 保存多列数据* 查询多列数据* 删除数据*/
object HBaseUtil {// HBase的配置类, 不需要指定配置文件名,文件名要求是hbase-site.xmlval conf: Configuration = HBaseConfiguration.create()// HBase的连接val conn: Connection = ConnectionFactory.createConnection(conf)// HBase的操作APIval admin: Admin = conn.getAdmin/*** 返回table,如果不存在,则创建表*/def getTable(tableNameStr: String, columnFamilyName: String): Table = {// 获取TableNameval tableName: TableName = TableName.valueOf(tableNameStr)// 如果表不存在,则创建表if (!admin.tableExists(tableName)) {// 构建出 表的描述的建造者val descBuilder: TableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName)val familyDescriptor: ColumnFamilyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(columnFamilyName.getBytes).build()// 给表去添加列族descBuilder.setColumnFamily(familyDescriptor)// 创建表admin.createTable(descBuilder.build())}conn.getTable(tableName)}/*** 存储单列数据** @param tableNameStr     表名* @param rowkey           rowkey* @param columnFamilyName 列族名* @param columnName       列名* @param columnValue      列值*/def putData(tableNameStr: String, rowkey: String, columnFamilyName: String, columnName: String, columnValue: String) = {// 获取表val table: Table = getTable(tableNameStr, columnFamilyName)try {// Putval put: Put = new Put(rowkey.getBytes)put.addColumn(columnFamilyName.getBytes, columnName.getBytes, columnValue.getBytes)// 保存数据table.put(put)} catch {case ex: Exception => {ex.printStackTrace()}} finally {table.close()}}/*** 通过单列名获取列值* @param tableNameStr     表名* @param rowkey           rowkey* @param columnFamilyName 列族名* @param columnName       列名* @return 列值*/def getData(tableNameStr: String, rowkey: String, columnFamilyName: String, columnName: String): String = {// 1.获取Table对象val table = getTable(tableNameStr, columnFamilyName)try {// 2. 构建Get对象val get = new Get(rowkey.getBytes)// 3. 进行查询val result: Result = table.get(get)// 4. 判断查询结果是否为空,并且包含我们要查询的列if (result != null && result.containsColumn(columnFamilyName.getBytes, columnName.getBytes)) {val bytes: Array[Byte] = result.getValue(columnFamilyName.getBytes(), columnName.getBytes)Bytes.toString(bytes)} else {""}} catch {case ex: Exception => {ex.printStackTrace()""}} finally {// 5. 关闭表table.close()}}/*** 存储多列数据* @param tableNameStr     表名* @param rowkey           rowkey* @param columnFamilyName 列族名* @param map              多个列名和列值集合*/def putMapData(tableNameStr: String, rowkey: String, columnFamilyName: String, map: Map[String, Any]) = {// 1. 获取Tableval table = getTable(tableNameStr, columnFamilyName)try {// 2. 创建Putval put = new Put(rowkey.getBytes)// 3. 在Put中添加多个列名和列值for ((colName, colValue) <- map) {put.addColumn(columnFamilyName.getBytes, colName.getBytes, colValue.toString.getBytes)}// 4. 保存Puttable.put(put)} catch {case ex: Exception => {ex.printStackTrace()}} finally {// 5. 关闭表table.close()}}/*** 获取多列数据的值* @param tableNameStr     表名* @param rowkey           rowkey* @param columnFamilyName 列族名* @param columnNameList   多个列名* @return 多个列名和多个列值的Map集合*/def getMapData(tableNameStr: String, rowkey: String, columnFamilyName: String, columnNameList: List[String]): Map[String, String] = {// 1. 获取Tableval table = getTable(tableNameStr, columnFamilyName)try{// 2. 构建Getval get = new Get(rowkey.getBytes)// 3. 执行查询val result: Result = table.get(get)// 4. 遍历列名集合,取出列值,构建成Map返回columnNameList.map {col =>val bytes: Array[Byte] = result.getValue(columnFamilyName.getBytes(), col.getBytes)if (bytes != null && bytes.size > 0) {col -> Bytes.toString(bytes)}else{""->""}}.filter(_._1!="").toMap}catch{case ex:Exception=>{ex.printStackTrace()Map[String,String]()}}finally {// 5. 关闭Tabletable.close()}}/*** 删除数据*/def deleteData(tableNameStr:String,rowkey:String,columnFamilyName:String)={// 1. 获取Tableval table:Table = getTable(tableNameStr,columnFamilyName)try{// 2. 构建Delete对象val delete:Delete = new Delete(rowkey.getBytes)// 3. 执行删除table.delete(delete)}catch {case ex:Exception=>ex.printStackTrace()}finally {// 4. 关闭tabletable.close()}}}

main方法测试代码

object HBaseUtil {def main(args: Array[String]): Unit = {// 测试存入单列数据putData("test", "123", "info", "t1", "hello world")// 测试存入多列数据val map = Map("t2" -> "scala","t3" -> "hive","t4" -> "sqoop")putMapData("test", "123", "info", map)println(getData("test", "123", "info", "t1"))println(getData("test", "123", "info", "t2"))println(getData("test", "123", "info", "t3"))println(getData("test", "123", "info", "t4"))println(getMapData("test", "123", "info", List("t1", "t2","t3","t4")))deleteData("test", "123", "info")}}

查看页面:

http://node01:16010/master-status#userTables

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部