大数据案例 -- App数据分析

文章目录

  • 1. 项目需求
  • 2. flume采集数据
  • 3. 预处理
  • 4. 导入数据到hive
  • 5. hive数据仓库etl(各种hive sql编写)
  • 6. 数据迁移sqoop
  • 7. web展示系统开发

1. 项目需求

网站、app的运营者需要知道自己的产品或服务的运营状况,就需要对使用自己产品的用户进行各种角度的数据分析,比如:

  • 用户数量
  • 新增用户
  • 留存用户
  • 活跃用户
  • 地域分析
  • 渠道分析

要做这样的分析,数据来源应该是用户的产品使用的行为日志,行为日志是由app或者网站的页面获取用户相关信息后,发送给后台服务器记录下来的:
在这里插入图片描述

需求:
1、检查每条日志的必选字段是否完整,不完整的日志应该滤除
必选字段:
cid_sn ;
mac ;
commit_time ;
sdk_ver ;
device_id_type ;
city ;
device_model ;
app_ver_name ;
app_ver_code ;
device_id ;
release_channel ; ## 用户下载该app时所用的app应用市场:360,安智市场,
country ;
time_zone ;
os_name ; ## 操作系统名称
commit_id ; ## 提交请求的序号
app_token ; ## app名称:
app_id ; ## app的id标识(所属的公司事业部)
language ; ## 用户的操作系统语言()

2、为每条日志添加一个用户唯一标识字段:user_id
user_id的取值逻辑:
如果是ios设备,user_id=device_id
如果是android设备, user_id = android_id
如果android_id为空,则user_id = device_id

3、将event字段抛弃,将header中的各字段解析成普通文本行

4、需要将清洗后的结果数据,分ios和android两种类别,输出到2个不同的文件夹:

/app_clean_log/2017-09-20/ios/part-r-00000/ios/part-r-00001/ios/part-r-00002/android/part-r-00000/android/part-r-00001

2. flume采集数据

把 /home/hadoop/logs/access_log 目录下的数据采集到HDFS /app-log-data/data/%y-%m-%d/ 路径下

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1# Describe/configure tail -F source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /home/hadoop/logs/access_log
agent1.sources.source1.channels = channel1#configure host for source
#agent1.sources.source1.interceptors = i1
#agent1.sources.source1.interceptors.i1.type = host
#agent1.sources.source1.interceptors.i1.hostHeader = hostname# Describe sink1
agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
agent1.sinks.sink1.hdfs.path =hdfs://hadoop100:9000/app-log-data/data/%y-%m-%d/
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 24
agent1.sinks.sink1.hdfs.roundUnit = hour
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

假设已经从日志服务器中采集数据到HDFS中,将这些数据存储到HDFS下/app-log-data/data/对应日期 下即可作为预处理程序的输入:
数据:https://pan.baidu.com/s/1KOYsAmHYJ_yEGReSu_cJmQ
提取码:sdba

3. 预处理

预处理包括:

  • mapreduce程序(数据清洗、解析)
  • 开发shell脚本启动
  • 把脚本配置成crontab定时任务

1. mapreduce程序
AppDataClean类:

package app;import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;public class AppDataClean {public static class AppDataCleanMapper extends Mapper<LongWritable, Text, Text, NullWritable>{Text k = null;NullWritable v = null;MultipleOutputs<Text, NullWritable> mos = null;@Overrideprotected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)throws IOException, InterruptedException {k = new Text();v = NullWritable.get();// 多路输出器mos = new MultipleOutputs<Text, NullWritable>(context); }@Overrideprotected void map(LongWritable key, Text value,Mapper<LongWritable, Text, Text, NullWritable>.Context context)throws IOException, InterruptedException {JSONObject jsonObj = JSON.parseObject(value.toString());JSONObject headObj = jsonObj.getJSONObject("header");/*** 滤除没有以下字段的数据*/if(StringUtils.isBlank(headObj.getString("cid_sn"))){return;}if(StringUtils.isBlank(headObj.getString("mac"))){return;}if(StringUtils.isBlank(headObj.getString("commit_time"))){return;}if(StringUtils.isBlank(headObj.getString("sdk_ver"))){return;}if(StringUtils.isBlank(headObj.getString("device_id_type"))){return;}if(StringUtils.isBlank(headObj.getString("city"))){return;}if(StringUtils.isBlank(headObj.getString("device_model"))){return;}if(StringUtils.isBlank(headObj.getString("app_ver_name"))){return;}if(StringUtils.isBlank(headObj.getString("app_ver_code"))){return;}// device_id标准长度是19if(StringUtils.isBlank(headObj.getString("device_id")) || headObj.getString("device_id").length()<19){return;}if(StringUtils.isBlank(headObj.getString("release_channel"))){return;}if(StringUtils.isBlank(headObj.getString("country"))){return;}if(StringUtils.isBlank(headObj.getString("time_zone"))){return;}if(StringUtils.isBlank(headObj.getString("os_name"))){return;}if(StringUtils.isBlank(headObj.getString("commit_id"))){return;}if(StringUtils.isBlank(headObj.getString("app_token"))){return;}if(StringUtils.isBlank(headObj.getString("app_id"))){return;}if(StringUtils.isBlank(headObj.getString("language"))){return;}/*** 生成user_id*/String user_id = "";if("android".equals(headObj.getString("os_name").trim())){user_id = StringUtils.isNotBlank(headObj.getString("android_id"))?headObj.getString("android_id"):headObj.getString("device_id");}else{user_id = headObj.getString("device_id");}headObj.put("user_id", user_id);k.set(JsonToString.toString(headObj));/*** 分目录输出*/if("android".equals(headObj.getString("os_name"))){mos.write(k, v, "android/android");}else{mos.write(k, v, "ios/ios");}}// 用完记得关@Overrideprotected void cleanup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)throws IOException, InterruptedException {mos.close();}}public static void main(String args[]) throws Exception{Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(AppDataClean.class);job.setMapperClass(AppDataCleanMapper.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);job.setNumReduceTasks(0);// 避免生成part-m-00000等文件,因为文件已交给mos生成LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));job.waitForCompletion(true);}
}

JsonToString类

package app;import com.alibaba.fastjson.JSONObject;public class JsonToString {public static String toString(JSONObject jsonObj){StringBuilder sb = new StringBuilder();// 分隔符用"\001"sb.append(jsonObj.get("cid_sn")).append("\001").append(jsonObj.get("mac")).append("\001").append(jsonObj.get("commit_time")).append("\001").append(jsonObj.get("sdk_ver")).append("\001").append(jsonObj.get("device_id_type")).append("\001").append(jsonObj.get("city")).append("\001").append(jsonObj.get("device_model")).append("\001").append(jsonObj.get("app_ver_name")).append("\001").append(jsonObj.get("imei")).append("\001").append(jsonObj.get("app_ver_code")).append("\001").append(jsonObj.get("device_id")).append("\001").append(jsonObj.get("release_channel")).append("\001").append(jsonObj.get("country")).append("\001").append(jsonObj.get("time_zone")).append("\001").append(jsonObj.get("os_name")).append("\001").append(jsonObj.get("commit_id")).append("\001").append(jsonObj.get("app_token")).append("\001").append(jsonObj.get("app_id ")).append("\001").append(jsonObj.get("language")).append("\001").append(jsonObj.get("user_id"));return sb.toString();}
}

2. 开发shell脚本启动
将数据清洗程序导出成data-clean.jar存储在windows本地,并导入到linux目录/root/data-clean.jar,接下来就是用一个shell脚本(命名为data-clean.sh)去运行data-clean.jar。

注意:该脚本可以设置在当天00:30时启动,因此清洗的是昨天的数据

#!/bin/bash
day_str=`date -d '-1 day' + '%Y-%m-%d'`inpath=/app-log-data/data/$day_str
outpath=/app-log-data/clean/$day_strecho "准备清洗$day_str 的数据......"/root/hadoop/hadoop-2.9.2/bin/hadoop jar /root/data-clean.jar app.AppDataClean $inpath $outpath

其中

#hadoop安装目录/bin/hadoop 运行jar包
/root/hadoop/hadoop-2.9.2/bin/hadoop jar
#哪个jar包: 
/root/data-clean.jar
#运行jar包中哪个类:
app.AppDataClean
#这个类中的main方法的输入参数:
$inpath $outpath

3. 把脚本配置成crontab定时任务
在linux中输入crontab -e
再输入30 0 * * * /root/data-clean.sh

4. 导入数据到hive

建分区表(按日期和os_name分区)并插入数据

CREATE EXTERNAL TABLE ods_app_log (cid_sn string,mac string,         commit_time string,    sdk_ver string,device_id_type string,city string,device_model string,app_ver_name string,        app_ver_code string,device_id string,release_channel string,country string,time_zone string,os_name string,commit_id string,app_token string,app_id string,language string,user_id string) partitioned BY (day string,os string) row format delimited fields terminated BY '\001' location '/app-log-data/clean';ALTER TABLE ods_app_log ADD PARTITION (day = '2017-08-14',os = 'android') location '/app-log-data/clean/2017-08-14/android';
ALTER TABLE ods_app_log ADD PARTITION (day = '2017-08-14',os = 'ios') location '/app-log-data/clean/2017-08-14/ios';

5. hive数据仓库etl(各种hive sql编写)

统计日活用户:
– 1. 把当天的活跃用户信息抽取出来,存入一个日活用户信息表
– 1.1/ 建日活用户信息表

CREATE TABLE etl_user_active_day (cid_sn string,mac string,         commit_time string,    sdk_ver string,device_id_type string,city string,device_model string,app_ver_name string,        app_ver_code string,device_id string,release_channel string,country string,time_zone string,os_name string,commit_id string,app_token string,app_id string,language string,user_id string) partitioned BY (day string) row format delimited fields terminated BY '\001';

– 1.2 从ods_app_log原始数据表的当天分区中,抽取当日的日活用户信息插入日活用户信息表etl_user_active_day
– 注意点:每个活跃用户抽取他当天所有记录中时间最早的一条;

INSERT INTO TABLE etl_user_active_day PARTITION (day = '2017-08-14')
SELECT 
cid_sn,
mac,         
commit_time,    
sdk_ver,
device_id_type,
city,
device_model,
app_ver_name,        
imei,
app_ver_code,
device_id,
release_channel,
country,
time_zone,
os_name,
commit_id,
app_token,
app_id,
language,
user_id
FROM (SELECT *,row_number() OVER (PARTITION BY user_id ORDER BY commit_time) AS rnFROM ods_app_logWHERE day = '2017-08-14'
) tmp
WHERE rn = 1;

各种维度统计:

  • 是否区分操作系统os_name
  • 是否区分城市city
  • 是否区分渠道release_channel
  • 是否区分版本app_ver_name

– 1. 建维度统计结果表 dim_user_active_day

CREATE TABLE dim_user_active_day (os_name string,city string,release_channel string,app_ver_name string,cnts INT) partitioned BY (day string,dim string);

– 2. 利用多重insert语法来统计各种维度组合的日活用户数,并插入到日活维度统计表的各分区中;

FROM etl_user_active_dayINSERT INTO TABLE dim_user_active_day PARTITION (day = '2017-08-14',dim = '0000' )
SELECT 'all' ,'all','all','all',count(1) WHERE day = '2017-08-14'INSERT INTO TABLE dim_user_active_day PARTITION (day = '2017-08-14',dim = '1000' )
SELECT os_name,'all','all','all' ,count(1) WHERE day = '2017-08-14' GROUP BY (os_name)INSERT INTO TABLE dim_user_active_day PARTITION (day = '2017-08-14',dim = '0100')
SELECT 'all',city,'all','all' ,count(1) WHERE day = '2017-08-14' GROUP BY (city)INSERT INTO TABLE dim_user_active_day PARTITION (day = '2017-08-14' ,dim = '0010')
SELECT 'all','all' ,release_channel,'all' ,count(1) WHERE day = '2017-08-14' GROUP BY (release_channel)INSERT INTO TABLE dim_user_active_day PARTITION (day = '2017-08-14',dim = '0001')
SELECT 'all','all' ,'all',app_ver_name,count(1) WHERE day = '2017-08-14' GROUP BY (app_ver_name)INSERT INTO TABLE dim_user_active_day PARTITION (day = '2017-08-14',dim = '0011')
SELECT 'all','all',release_channel,app_ver_name,count(1) WHERE day = '2017-08-14' GROUP BY release_channel,app_ver_nameINSERT INTO TABLE dim_user_active_day PARTITION (day = '2017-08-14',dim = '0110')
SELECT 'all',city,release_channel,'all',count(1) WHERE day = '2017-08-14' GROUP BY city,release_channelINSERT INTO TABLE dim_user_active_day PARTITION (day = '2017-08-14',dim = '0111')
SELECT 'all',city,release_channel,app_ver_name,count(1) WHERE day = '2017-08-14' GROUP BY city,release_channel,app_ver_nameINSERT INTO TABLE dim_user_active_day PARTITION (day = '2017-08-14',dim = '1111')
SELECT os_name,city,release_channel,app_ver_name,count(1) WHERE day = '2017-08-14' GROUP BY os_name,city,release_channel,app_ver_name
;

日新用户统计:


日新:当日第一次出现的用户–当日的新增用户

思路:
a、应该建立一个历史用户表(只存user_id)

b、将当日的活跃用户去 比对 历史用户表, 就知道哪些人是今天新出现的用户 --> 当日新增用户

c、将当日新增用户追加到历史用户表


********************************* 建表 *********************************
– 1 历史用户表

create table etl_user_history(user_id string);

– 2 当日新增用户表:存所有字段(每个人时间最早的一条),带有一个分区字段:day string;

create table etl_user_new_day like etl_user_active_day;

********************************* 统计实现 *********************************

– 1 当日活跃-历史用户表 --> 新增用户表的当日分区

insert into etl_user_new_day partition(day='2017-08-14')
SELECTcid_sn,mac,         commit_time,    sdk_ver,device_id_type,city,device_model,app_ver_name,        imei,app_ver_code,device_id,release_channel,country,time_zone,os_name,commit_id,app_token,app_id,language,a.user_id
from  etl_user_active_day a left join  etl_user_history b on a.user_id = b.user_id
where a.day='2017-08-14' and b.user_id is null;

– 2 将当日新增用户的user_id追加到历史表

insert into table etl_user_history
select user_id from etl_user_new_day where day='2017-08-14';

6. 数据迁移sqoop

例如将app数据仓库中的 日新用户维度统计报表:dim_user_new_day 导出到mysql的表中去
– 1 在mysql中建库建表

create database app;
create table dim_user_new_day(
os_name varchar(20),city varchar(20),release_channel varchar(20),app_ver_name varchar(20),cnts int,dt varchar(20)
);

–注意:将库和表的编码集改成utf8,命令如下:
修改库的编码:
mysql> alter database db_name character set utf8;
修改表的编码:
mysql> ALTER TABLE table_name CONVERT TO CHARACTER SET utf8 COLLATE utf8_general_ci;

– 2 用sqoop将hive中的 dim_user_new_day 中的指定日分区的数据导出到mysql 的dim_user_new_day

这里有坑:因为dim_user_new_day表有二级分区(day,dim),如果
--export-dir /user/hive/warehouse/app.db/dim_user_new_day_1p/day=2017-08-14/dim=0000
会报错,这里应该是个bug,所以只能把dim_user_new_day变成一级分区(partition=day)表dim_user_new_day_1p

大致思路:创建新分区表dim_user_new_day_1p,然后from dim_user_new_day 表中select除了dim的字段,where day = ‘2017-08-14’

#!/bin/bash
day=`date -d '-1 day' +'%Y-%m-%d'`/root/apps/sqoop/bin/sqoop export \
--connect "jdbc:mysql://hadoop101:3306/app?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password root \
--input-fields-terminated-by '\001' \
--table dim_user_new_day \
--export-dir /user/hive/warehouse/app.db/dim_user_new_day_1p/day=${day}

这里写成脚本,其实以上所有sql统计都可以写成脚本,然后定时执行。

7. web展示系统开发

echarts + springmvc+spring+mybatis ==> mysql


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部