离线数仓(四)1、网站流量日志数据采集Flume采集

网站流量日志数据采集Flume采集

在网站流量日志分析场景中,对数据采集部分的可靠性、容错能力要求通常不会非常严苛,需要注意结合语境分析是何种含义的数据采集:

  • 对于数据从无到有的过程

    • 结合使用web服务器自带的日志功能、自定义埋点JavaScript采集
    • 收集用户访问网站的行为数据
  • 对于数据需要做搬运的操作

    • 使用Flume定制相关的采集方案满足数据采集传输

1、Flume版本选择

针对nginx日志生成场景

  • Flume 1.8+
    • 提供了一个非常好用的TaildirSource
    • 使用这个source,可以监控一个目录,并且使用正则表达式匹配该目录中的文件名进行实时收集

Flume采集系统的搭建

  1. 在服务器上部署agent节点,修改配置文件

  2. 启动agent节点,将采集到的数据汇聚到指定的HDFS目录中

2、Flume核心配置

(1)tailDirSource配置

核心配置如下

a1.sources = r1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*

配置说明

  • filegroups

    • 指定filegroups,可以有多个,以空格分隔;(TailSource可以同时监控tail多个目录中的文件)
  • positionFile

    • 配置检查点文件的路径,检查点文件会以json格式保存已经tail文件的位置,解决了断点不能续传的缺陷。
  • filegroups.

    • 配置每个filegroup的文件绝对路径,文件名可以用正则表达式匹配

通过以上配置,即可监控文件内容的增加和文件的增加。产生和所配置的文件名正则表达式不匹配的文件,则不会被tail。

(2)sink配置

本次将日志采集到HDFS中,需要使用HDFSSink文件。HDFSSink需要配置滚动属性。

  • 基于hdfs文件副本数
    • 配置项:hdfs.minBlockReplicas
    • 默认值:和hdfs的副本数一致
    • 说明
      • hdfs.minBlockReplicas是为了让flume感知不到hdfs的块复制,这样滚动方式配置(比如时间间隔、文件大小、events数量等)才不会受影响

示例说明:

假如hdfs的副本为3,配置的滚动时间为10秒,那么在第二秒的时候,flume检测到hdfs在复制块,这时候flume就会滚动,这样导致flume的滚动方式受到影响。所以通常hdfs.minBlockReplicas配置为1,就检测不到副本的复制了。但是hdfs的实际副本还是3

(3)完整版配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*# Describe the sink
#指定hdfs sink
a1.sinks.k1.type = hdfs
#hdfs目录,带有时间信息
a1.sinks.k1.hdfs.path = /flume/tailout/%Y-%m-%d/
#生成的hdfs文件名的前缀
a1.sinks.k1.hdfs.filePrefix = events-
#指定滚动时间,默认是30秒,设置为0表示禁用该策略
a1.sinks.k1.hdfs.rollInterval = 0
#指定滚动大小,设置为0表示禁用该策略
a1.sinks.k1.hdfs.rollSize = 200000000
#指定滚动条数
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#副本策略
a1.sinks.k1.hdfs.minBlockReplicas=1
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(4)启动flume agent采集数据

创建目录

mkdir -p /var/log/test1/
mkdir -p /var/log/test2/

上传测试数据到上面创建的目录!!

flume启动命令

bin/flume-ng agent --conf-file job/log2hdfs.conf -name a1  -Dflume.root.logger=INFO,console

(5)问题 hdfs路径是否正确?

问题

按照上面flume agent的配置文件会出现一种情况,数据存放的路径信息不正确,需要按照日志时间存储。

3、flume自定义拦截器

实现步骤

  1. 创建maven工程

  2. 新建class实现flume提供的Interceptor接口

    • 实现相关方法

      interceptor方法

      定义静态内部类实现Interceptor.Builder接口

  3. 打成jar包上传到flume安装目录下lib文件夹中

  4. 开发flume agent配置文件引用header中的日期信息

具体实现

创建maven java工程,导入jar包

<dependencies><dependency><groupId>org.apache.flumegroupId><artifactId>flume-ng-coreartifactId><version>1.8.0version><scope>providedscope>dependency>
dependencies>
<build><plugins><plugin><groupId>org.apache.maven.pluginsgroupId><artifactId>maven-compiler-pluginartifactId><version>3.0version><configuration><source>1.8source><target>1.8target><encoding>UTF-8encoding>configuration>plugin>plugins>
build>

自定义flume的拦截器

package com.yyds.interceptor;import org.apache.commons.compress.utils.Charsets;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class CustomerInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {//获得body的内容String eventBody = new String(event.getBody(), Charsets.UTF_8);final String[] bodyArr = eventBody.split(" ");String time_local = "";if (bodyArr.length > 11) {time_local = bodyArr[4] ;}final Map<String, String> headers = event.getHeaders();// 添加时间信息 到event的headerif (StringUtils.isNotBlank(time_local)) {headers.put("event_time", time_local);} else {headers.put("event_time", "unkown");}event.setHeaders(headers);return event;}@Overridepublic List<Event> intercept(List<Event> events) {List<Event> out = new ArrayList<Event>();for (Event event : events) {Event outEvent = intercept(event);if (outEvent != null) {out.add(outEvent);}}return out;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new CustomerInterceptor();}@Overridepublic void configure(Context context) {}}
}

打包上传服务器

将我们的拦截器打成jar包放到flume的lib目录下

开发flume的配置文件

开发flume的配置文件

# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = TAILDIRa1.sources.r1.positionFile = /var/log/flume/taildir_position.jsona1.sources.r1.filegroups = f1 f2a1.sources.r1.filegroups.f1 = /var/log/test1/example.loga1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*#interceptora1.sources.r1.interceptors =i1 a1.sources.r1.interceptors.i1.type =com.yyds.interceptor.CustomTimeInterceptor$Builder# Describe the sink#指定hdfs sinka1.sinks.k1.type = hdfs#hdfs目录,带有时间信息a1.sinks.k1.hdfs.path = /flume/tailout/%{event_time}/#生成的hdfs文件名的前缀a1.sinks.k1.hdfs.filePrefix = events-#指定滚动时间,默认是30秒,设置为0表示禁用该策略a1.sinks.k1.hdfs.rollInterval = 0#指定滚动大小,设置为0表示禁用该策略a1.sinks.k1.hdfs.rollSize = 200000000#指定滚动条数a1.sinks.k1.hdfs.rollCount = 0a1.sinks.k1.hdfs.batchSize = 100#副本策略a1.sinks.k1.hdfs.minBlockReplicas=1#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本a1.sinks.k1.hdfs.fileType = DataStream# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部