使用Flume消费Kafka数据并落盘到HDFS

目录

    • 1.大体流程
    • 2.具体配置
    • 3.配置流程
      • 1.配置Flume Agent
      • 2.Flume启动停止脚本
    • 4.Flume内存优化
      • 1.抛出异常
      • 2.内存参数设置及优化
    • 5.采集通道启动停止脚本
    • 6.数据传输测试
      • 1.启动集群
      • 2.生成日志数据
      • 3.进入HDFS的Web页面查看落盘情况

1.大体流程

在这里插入图片描述

2.具体配置

在这里插入图片描述

3.配置流程

1.配置Flume Agent

在hadoop104的/opt/module/flume/conf目录下创建kafka-flume-hdfs.conf文件

[lili@hadoop104 conf]$ vim kafka-flume-hdfs.conf

文件配置内容如下:

#定义组件
#由于要分别从Kafka的两个分区中获得数据,因此我们定义两个source
#r1获取topic_start的数据,r2获取topic_event的数据
a1.sources=r1 r2
#同理定义创建两个传输通道
a1.channels=c1 c2
#定义两个Sink。
#k1将数据发送到到HDFS的opic_start中
#k2将数据发送到到HDFS的opic_event中
a1.sinks=k1 k2#配置source1
#定义source的类型为KafkaSource
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
#每次写入Channel的最大消息数
a1.sources.r1.batchSize = 5000
#延迟时间。如果一批消息数不够5000,则会在延迟时间到达时,将该批次消息写入Channel
a1.sources.r1.batchDurationMillis = 2000
#kafka集群
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
#指定消费的哪个topic。
a1.sources.r1.kafka.topics=topic_start## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics=topic_event#配置channel1
#定义channel的类型为FileChannel
a1.channels.c1.type = file
#磁盘索引存储路径
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
#磁盘数据存放路径
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
#将source的数据通过putlist传输进入channel
#如果channel已经满的,putlist需要等待**一定时间**然后继续写
#putlist如果反复写入,channel一直都是满的,则将putlist清除。
#在清除之后,在重新拉取刚刚读的数据。
#因此我们希望putlist的等待时间稍微长一点,尽可能的让channel里的数据消化一部分
#在这里我们设置6秒
a1.channels.c1.keep-alive = 6## channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c2.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c2.keep-alive = 6#配置sink1
#定义sink的属性为HDFS Sink
a1.sinks.k1.type = hdfs
#定义数据上到的HDFS地址
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
#HDFS文件名称加前缀
a1.sinks.k1.hdfs.filePrefix = logstart-##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-#不要产生大量小文件,生产环境rollInterval配置为3600
#配置每次滚动生成新文件的时间(单位秒)
a1.sinks.k1.hdfs.rollInterval = 10
#文件在达到128M时会滚动生成新文件
a1.sinks.k1.hdfs.rollSize = 134217728
#0表示禁止使用,文件在达到一定event的数量时会滚动生成新文件
a1.sinks.k1.hdfs.rollCount = 0a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0# 控制输出文件是原生文件。
#定义文件的类型为压缩流
a1.sinks.k1.hdfs.fileType = CompressedStream 
a1.sinks.k2.hdfs.fileType = CompressedStream 
#压缩方式lzop支持索引
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop# 拼装
#将数据从source放入channel中
a1.sources.r1.channels = c1
#channel将数据传入到sink中
a1.sinks.k1.channel= c1a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2

2.Flume启动停止脚本

  1. 创建脚本f2.sh

    [lili@hadoop102 bin]$ vim f2.sh 
    #! /bin/bash
    case $1 in
    "start"){for i in hadoop104doecho " --------启动 $i 消费flume-------"ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/job/data-warehouse/flume2.log   2>&1 &"done
    };;
    "stop"){for i in hadoop104doecho " --------停止 $i 消费flume-------"ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill"done};;
    esac
    
  2. 增加权限

    [lili@hadoop102 bin]$ chmod 777 f2.sh
    
  3. 启动脚本

    [lili@hadoop102 bin]$ f2.sh start
    
  4. 停止脚本

    [lili@hadoop102 bin]$ f1.sh stop
    

4.Flume内存优化

1.抛出异常

  1. 问题描述:如果启动消费Flume抛出如下异常

    ERROR hdfs.HDFSEventSink: process failed

    java.lang.OutOfMemoryError: GC overhead limit exceeded

  2. 解决方法:

    1. 在hadoop102服务器的/opt/module/flume/conf/flume-env.sh文件中增加如下配置

      [lili@hadoop102 ~]$ vim /opt/module/flume/conf/flume-env.sh
      export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
      
    2. 同步配置到hadoop103、hadoop104服务器

2.内存参数设置及优化

  1. JVM heap一般设置为4G或更高,部署在单独的服务器上(4核8线程16G内存)
  2. -Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁Full GC。
  3. -Xms表示JVM Heap(堆内存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆内存)最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发Full GC。

5.采集通道启动停止脚本

  1. 创建脚本

    [lili@hadoop102 ~]$ vim /home/lili/bin/cluster.sh
    #! /bin/bashcase $1 in
    "start"){echo "================    开始启动集群    ================"echo "================    正在启动HDFS    ================"/opt/module/hadoop-2.7.2/sbin/start-dfs.sh echo "================    正在启动YARN    ================"ssh hadoop103 "/opt/module/hadoop-2.7.2/sbin/start-yarn.sh"#启动 Zookeeper集群echo "==============    正在启动zookeeper   =============="zk.sh startsleep 4s;#启动 Flume采集集群f1.sh startsleep 2s;#启动 Kafka采集集群kf.sh startsleep 6s;#启动 Flume消费集群f2.sh start};;
    "stop"){echo "================    开始停止集群    ================"#停止 Flume消费集群f2.sh stopsleep 2s; #停止 Kafka采集集群kf.sh stopsleep 6s;#停止 Flume采集集群f1.sh stopsleep 2s;echo "==============    正在停止zookeeper   =============="#停止 Zookeeper集群zk.sh stopecho "================    正在停止YARN    ================"ssh hadoop103 "/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh"echo "================    正在停止HDFS    ================"/opt/module/hadoop-2.7.2/sbin/stop-dfs.sh };;
    esac
    
  2. 增加权限

    [lili@hadoop102 bin]$ chmod 777 cluster.sh
    
  3. 启动脚本

    [lili@hadoop102 bin]$ cluster.sh start
    
  4. 停止脚本

    [lili@hadoop102 bin]$ cluster.sh stop
    

6.数据传输测试

1.启动集群

[lili@hadoop102 bin]$ cluster.sh start
================    开始启动集群    ================
================    正在启动HDFS    ================
Starting namenodes on [hadoop102]
hadoop102: starting namenode, logging to /opt/module/hadoop-2.7.2/logs/hadoop-lili-namenode-hadoop102.out
hadoop103: starting datanode, logging to /opt/module/hadoop-2.7.2/logs/hadoop-lili-datanode-hadoop103.out
hadoop104: starting datanode, logging to /opt/module/hadoop-2.7.2/logs/hadoop-lili-datanode-hadoop104.out
hadoop102: starting datanode, logging to /opt/module/hadoop-2.7.2/logs/hadoop-lili-datanode-hadoop102.out
Starting secondary namenodes [hadoop104]
hadoop104: starting secondarynamenode, logging to /opt/module/hadoop-2.7.2/logs/hadoop-lili-secondarynamenode-hadoop104.out
================    正在启动YARN    ================
starting yarn daemons
starting resourcemanager, logging to /opt/module/hadoop-2.7.2/logs/yarn-lili-resourcemanager-hadoop103.out
hadoop102: starting nodemanager, logging to /opt/module/hadoop-2.7.2/logs/yarn-lili-nodemanager-hadoop102.out
hadoop104: starting nodemanager, logging to /opt/module/hadoop-2.7.2/logs/yarn-lili-nodemanager-hadoop104.out
hadoop103: starting nodemanager, logging to /opt/module/hadoop-2.7.2/logs/yarn-lili-nodemanager-hadoop103.out
==============    正在启动zookeeper   ==============
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED--------启动 hadoop102 采集flume---------------启动 hadoop103 采集flume---------------启动 hadoop102 Kafka---------------启动 hadoop103 Kafka---------------启动 hadoop104 Kafka---------------启动 hadoop104 消费flume-------
[lili@hadoop102 bin]$ 

2.生成日志数据

[lili@hadoop102 bin]$ lg.sh 
------hadoop102 生成日志-------
------hadoop103 生成日志-------
[lili@hadoop102 bin]$

3.进入HDFS的Web页面查看落盘情况

在这里插入图片描述


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部