[Flink-Based Real-Time Urban Traffic Monitoring Platform] Requirement 4: Tracking Paths of Violating Vehicles, Written to HBase with FlinkSQL in Dlink
Requirement Analysis
Kafka sends simulated real-time vehicle JSON messages to Flink. FlinkSQL joins each incoming record against the violation vehicles already captured in the t_violation_list table, keeps only the records of violating vehicles passing through the current road segment, and writes them to HBase.
This case (Requirement 4) builds on the data in t_violation_list:
CREATE TABLE `t_violation_list` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `car` varchar(255) NOT NULL,
  `violation` varchar(1000) DEFAULT NULL,
  `create_time` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
INSERT INTO `t_violation_list` (`car`, `violation`, `create_time`) VALUES
('豫A99999', '嫌疑套牌车', 1686690777),
('豫DF09991', '排水道过弯', 1686609999);
Two vehicles are currently simulated in t_violation_list. When the Kafka topic topic-car is started later, only vehicles present in this table will be recorded to HBase.
mysql> select * from t_violation_list;
+----+------------+-----------------+-------------+
| id | car | violation | create_time |
+----+------------+-----------------+-------------+
| 2 | 豫A99999 | 嫌疑套牌车 | 1686690777 |
| 3 | 豫DF09991 | 排水道过弯 | 1686609999 |
+----+------------+-----------------+-------------+
2 rows in set (0.00 sec)
Implementation Code
CREATE TABLE table1 (
  `actionTime` BIGINT,
  `monitorId` STRING,
  `cameraId` STRING,
  `car` STRING,
  `speed` DOUBLE,
  `roadId` STRING,
  `areaId` STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic-car',
  'properties.bootstrap.servers' = 'hadoop10:9092',
  'properties.group.id' = 'c1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
CREATE TABLE table2 (
  id INT,
  car STRING,
  violation STRING,
  create_time BIGINT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://hadoop10:3306/yangyulin?useSSL=false&useUnicode=true&characterEncoding=utf8',
  'table-name' = 't_violation_list',
  'username' = 'root',
  'password' = '0000'
);
CREATE TABLE table3 (
  rowkey STRING,
  cf1 ROW<monitorId STRING, cameraId STRING, car STRING, roadId STRING, areaId STRING, speed DOUBLE, actionTime BIGINT>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 't_track_info',
  'zookeeper.quorum' = 'hadoop10:2181'
);
INSERT INTO table3
SELECT concat(car, '_', cast(actionTime AS STRING)),
       ROW(monitorId, cameraId, car, roadId, areaId, speed, actionTime)
FROM (
  SELECT t1.monitorId, t1.cameraId, t1.car, t1.roadId, t1.areaId, t1.speed, t1.actionTime
  FROM table1 t1
  INNER JOIN table2 FOR SYSTEM_TIME AS OF t1.proctime AS t2
    ON t1.car = t2.car
) t3;
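Conceptually, the lookup join keeps only those Kafka records whose plate appears in the violation table, then builds the rowkey and payload for HBase. A small Python sketch of that filtering logic (data hardcoded for illustration; this is not the actual connector code):

```python
# Violation plates as loaded from t_violation_list (hardcoded here).
violations = {"豫A99999", "豫DF09991"}

# Incoming Kafka records, same shape as the topic-car JSON messages.
records = [
    {"actionTime": 1686647525, "car": "豫A99999", "speed": 100},
    {"actionTime": 1686647526, "car": "LZ21740", "speed": 100},
]

# The lookup join keeps only records whose plate is in the violation set;
# the rowkey "<car>_<actionTime>" matches the SQL concat above.
tracked = [
    (f"{r['car']}_{r['actionTime']}", r)
    for r in records
    if r["car"] in violations
]
print([k for k, _ in tracked])
```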
Test Data
Send JSON messages to Flink through the topic-car topic.

[root@hadoop10 kafka0.11]# kafka-console-producer.sh --broker-list hadoop10:9092 --topic topic-car
>{"actionTime":1686647525,"monitorId":"0001","cameraId":"1","car":"豫A99999","speed":100,"roadId":"01","areaId":"20"}
>{"actionTime":1686647525,"monitorId":"0001","cameraId":"1","car":"豫DF09991","speed":100,"roadId":"01","areaId":"20"}
>{"actionTime":1686647526,"monitorId":"0001","cameraId":"1","car":"LZ21740","speed":100,"roadId":"01","areaId":"20"}
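Each producer line must be single-line JSON matching table1's schema; a misspelled field name or type mismatch causes the json format to fail on the record. A quick Python check that a message round-trips with the expected fields (field names taken from the DDL above):

```python
import json

# Build one test message; ensure_ascii=False keeps the plate 豫A99999 readable.
msg = json.dumps({"actionTime": 1686647525, "monitorId": "0001",
                  "cameraId": "1", "car": "豫A99999", "speed": 100,
                  "roadId": "01", "areaId": "20"}, ensure_ascii=False)

parsed = json.loads(msg)
# Field names must match the columns declared in table1.
expected = {"actionTime", "monitorId", "cameraId", "car",
            "speed", "roadId", "areaId"}
print(set(parsed) == expected, parsed["car"])
```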
HBase Shell Operations
This covers creating the t_track_info table with column family cf1, plus several scans of its contents. Note the row count after each scan: it equals the number of violation-vehicle track records inserted so far.

hbase:002:0> create 't_track_info','cf1'
Created table t_track_info
Took 0.8506 seconds
=> Hbase::Table - t_track_info
hbase:003:0> scan 't_track_info'
ROW COLUMN+CELL
0 row(s)
Took 0.0376 seconds
hbase:004:0> scan 't_track_info'
ROW                             COLUMN+CELL
 \xE8\xB1\xABA99999_1686647525  column=cf1:actionTime, timestamp=2023-06-16T00:06:36.355, value=\x00\x00\x00\x00d\x882\xE5
 \xE8\xB1\xABA99999_1686647525  column=cf1:areaId, timestamp=2023-06-16T00:06:36.355, value=20
 \xE8\xB1\xABA99999_1686647525  column=cf1:cameraId, timestamp=2023-06-16T00:06:36.355, value=1
 \xE8\xB1\xABA99999_1686647525  column=cf1:car, timestamp=2023-06-16T00:06:36.355, value=\xE8\xB1\xABA99999
 \xE8\xB1\xABA99999_1686647525  column=cf1:monitorId, timestamp=2023-06-16T00:06:36.355, value=0001
 \xE8\xB1\xABA99999_1686647525  column=cf1:roadId, timestamp=2023-06-16T00:06:36.355, value=01
 \xE8\xB1\xABA99999_1686647525  column=cf1:speed, timestamp=2023-06-16T00:06:36.355, value=@Y\x00\x00\x00\x00\x00\x00
1 row(s)
Took 0.0363 seconds
hbase:005:0> scan 't_track_info'
ROW                              COLUMN+CELL
 \xE8\xB1\xABA99999_1686647525   column=cf1:actionTime, timestamp=2023-06-16T00:06:36.355, value=\x00\x00\x00\x00d\x882\xE5
 \xE8\xB1\xABA99999_1686647525   column=cf1:areaId, timestamp=2023-06-16T00:06:36.355, value=20
 \xE8\xB1\xABA99999_1686647525   column=cf1:cameraId, timestamp=2023-06-16T00:06:36.355, value=1
 \xE8\xB1\xABA99999_1686647525   column=cf1:car, timestamp=2023-06-16T00:06:36.355, value=\xE8\xB1\xABA99999
 \xE8\xB1\xABA99999_1686647525   column=cf1:monitorId, timestamp=2023-06-16T00:06:36.355, value=0001
 \xE8\xB1\xABA99999_1686647525   column=cf1:roadId, timestamp=2023-06-16T00:06:36.355, value=01
 \xE8\xB1\xABA99999_1686647525   column=cf1:speed, timestamp=2023-06-16T00:06:36.355, value=@Y\x00\x00\x00\x00\x00\x00
 \xE8\xB1\xABDF09991_1686647525  column=cf1:actionTime, timestamp=2023-06-16T00:07:43.398, value=\x00\x00\x00\x00d\x882\xE5
 \xE8\xB1\xABDF09991_1686647525  column=cf1:areaId, timestamp=2023-06-16T00:07:43.398, value=20
 \xE8\xB1\xABDF09991_1686647525  column=cf1:cameraId, timestamp=2023-06-16T00:07:43.398, value=1
 \xE8\xB1\xABDF09991_1686647525  column=cf1:car, timestamp=2023-06-16T00:07:43.398, value=\xE8\xB1\xABDF09991
 \xE8\xB1\xABDF09991_1686647525  column=cf1:monitorId, timestamp=2023-06-16T00:07:43.398, value=0001
 \xE8\xB1\xABDF09991_1686647525  column=cf1:roadId, timestamp=2023-06-16T00:07:43.398, value=01
 \xE8\xB1\xABDF09991_1686647525  column=cf1:speed, timestamp=2023-06-16T00:07:43.398, value=@Y\x00\x00\x00\x00\x00\x00
2 row(s)
Took 0.0394 seconds
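The scan output shows HBase's raw byte encodings: the rowkey prefix \xE8\xB1\xAB is the UTF-8 encoding of 豫, cf1:actionTime is a BIGINT stored as a big-endian 8-byte integer, and cf1:speed (the @Y\x00... bytes) is a big-endian IEEE-754 double. A Python sketch decoding those exact bytes:

```python
import struct

# Rowkey bytes are UTF-8 text; \xE8\xB1\xAB decodes to the plate prefix 豫.
rowkey = b"\xe8\xb1\xabA99999_1686647525".decode("utf-8")

# cf1:actionTime: big-endian signed 8-byte integer
# ('d' is 0x64, '2' is 0x32 in the shell's ASCII rendering).
action_time = struct.unpack(">q", b"\x00\x00\x00\x00\x64\x88\x32\xe5")[0]

# cf1:speed: big-endian IEEE-754 double; '@Y' is bytes 0x40 0x59.
speed = struct.unpack(">d", b"\x40\x59\x00\x00\x00\x00\x00\x00")[0]

print(rowkey, action_time, speed)
```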
Installing and Using Dlink, a Web UI for FlinkSQL

This requirement used Dlink, a web-based visualization tool for FlinkSQL. Its installation involved many pitfalls: the official documentation and the official binaries do not quite match, the names "Dinky" and "Dlink" are used ambiguously, and many exceptions and errors came up, especially around importing the correct JARs. That aside, following the official Dlink documentation is largely a matter of copying its commands into Linux.
The most confusing JAR is flink-dist_2.11-1.13.6.jar: it must be placed inside Dlink's directory at "/opt/installs/dlink0.7.3/plugins/flink-dist_2.11-1.13.6.jar", but it must not go into Flink's directory. Flink's lib directory needs the flink-dist_2.12-1.13.6.jar version instead.
Also, flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar and flink-shaded-hadoop-2-uber-2.7.5-10.0.jar can conflict, although I have also seen successful setups where no conflict appeared. My Flink version is 1.13.6 and my Hadoop version is 3.4.6.
Finally, you also need to download the HBase connector dependency, the JSON format dependency, and so on.
The versions used in this example are listed below for reference; pay attention to the directory layout:



