大数据案例 -- App数据分析
文章目录
- 1. 项目需求
- 2. flume采集数据
- 3. 预处理
- 4. 导入数据到hive
- 5. hive数据仓库etl(各种hive sql编写)
- 6. 数据迁移sqoop
- 7. web展示系统开发
1. 项目需求
网站、app的运营者需要知道自己的产品或服务的运营状况,就需要对使用自己产品的用户进行各种角度的数据分析,比如:
- 用户数量
- 新增用户
- 留存用户
- 活跃用户
- 地域分析
- 渠道分析
…
要做这样的分析,数据来源应该是用户的产品使用的行为日志,行为日志是由app或者网站的页面获取用户相关信息后,发送给后台服务器记录下来的:

需求:
1、检查每条日志的必选字段是否完整,不完整的日志应该滤除
必选字段:
cid_sn ;
mac ;
commit_time ;
sdk_ver ;
device_id_type ;
city ;
device_model ;
app_ver_name ;
app_ver_code ;
device_id ;
release_channel ; ## 用户下载该app时所用的app应用市场:360,安智市场,
country ;
time_zone ;
os_name ; ## 操作系统名称
commit_id ; ## 提交请求的序号
app_token ; ## app名称:
app_id ; ## app的id标识(所属的公司事业部)
language ; ## 用户的操作系统语言()
2、为每条日志添加一个用户唯一标识字段:user_id
user_id的取值逻辑:
如果是ios设备,user_id=device_id
如果是android设备, user_id = android_id
如果android_id为空,则user_id = device_id
3、将event字段抛弃,将header中的各字段解析成普通文本行
4、需要将清洗后的结果数据,分ios和android两种类别,输出到2个不同的文件夹:
/app_clean_log/2017-09-20/ios/part-r-00000/ios/part-r-00001/ios/part-r-00002/android/part-r-00000/android/part-r-00001
2. flume采集数据
把 /home/hadoop/logs/access_log 目录下的数据采集到HDFS /app-log-data/data/%y-%m-%d/ 路径下
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1# Describe/configure tail -F source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /home/hadoop/logs/access_log
agent1.sources.source1.channels = channel1#configure host for source
#agent1.sources.source1.interceptors = i1
#agent1.sources.source1.interceptors.i1.type = host
#agent1.sources.source1.interceptors.i1.hostHeader = hostname# Describe sink1
agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
agent1.sinks.sink1.hdfs.path =hdfs://hadoop100:9000/app-log-data/data/%y-%m-%d/
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 24
agent1.sinks.sink1.hdfs.roundUnit = hour
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
假设已经从日志服务器中采集数据到HDFS中,将这些数据存储到HDFS下/app-log-data/data/对应日期 下即可作为预处理程序的输入:
数据:https://pan.baidu.com/s/1KOYsAmHYJ_yEGReSu_cJmQ
提取码:sdba
3. 预处理
预处理包括:
- mapreduce程序(数据清洗、解析)
- 开发shell脚本启动
- 把脚本配置成crontab定时任务
1. mapreduce程序
AppDataClean类:
package app;import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;public class AppDataClean {public static class AppDataCleanMapper extends Mapper<LongWritable, Text, Text, NullWritable>{Text k = null;NullWritable v = null;MultipleOutputs<Text, NullWritable> mos = null;@Overrideprotected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)throws IOException, InterruptedException {k = new Text();v = NullWritable.get();// 多路输出器mos = new MultipleOutputs<Text, NullWritable>(context); }@Overrideprotected void map(LongWritable key, Text value,Mapper<LongWritable, Text, Text, NullWritable>.Context context)throws IOException, InterruptedException {JSONObject jsonObj = JSON.parseObject(value.toString());JSONObject headObj = jsonObj.getJSONObject("header");/*** 滤除没有以下字段的数据*/if(StringUtils.isBlank(headObj.getString("cid_sn"))){return;}if(StringUtils.isBlank(headObj.getString("mac"))){return;}if(StringUtils.isBlank(headObj.getString("commit_time"))){return;}if(StringUtils.isBlank(headObj.getString("sdk_ver"))){return;}if(StringUtils.isBlank(headObj.getString("device_id_type"))){return;}if(StringUtils.isBlank(headObj.getString("city"))){return;}if(StringUtils.isBlank(headObj.getString("device_model"))){return;}if(StringUtils.isBlank(headObj.getString("app_ver_name"))){return;}if(StringUtils.isBlank(headObj.getString("app_ver_code"))){return;}// device_id标准长度是19if(StringUtils.isBlank(headObj.getString("device_id")) || headObj.getString("device_id").length()<19){return;}if(StringUtils.isBlank(headObj.getString("release_channel"))){return;}if(StringUtils.isBlank(headObj.getString("country"))){return;}if(StringUtils.isBlank(headObj.getString("time_zone"))){return;}if(StringUtils.isBlank(headObj.getString("os_name"))){return;}if(StringUtils.isBlank(headObj.getString("commit_id"))){return;}if(StringUtils.isBlank(headObj.getString("app_token"))){return;}if(StringUtils.isBlank(headObj.getString("app_id"))){return;}if(StringUtils.isBlank(headObj.getString("language"))){return;}/*** 生成user_id*/String user_id = "";if("android".equals(headObj.getString("os_name").trim())){user_id = StringUtils.isNotBlank(headObj.getString("android_id"))?headObj.getString("android_id"):headObj.getString("device_id");}else{user_id = headObj.getString("device_id");}headObj.put("user_id", user_id);k.set(JsonToString.toString(headObj));/*** 分目录输出*/if("android".equals(headObj.getString("os_name"))){mos.write(k, v, "android/android");}else{mos.write(k, v, "ios/ios");}}// 用完记得关@Overrideprotected void cleanup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)throws IOException, InterruptedException {mos.close();}}public static void main(String args[]) throws Exception{Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(AppDataClean.class);job.setMapperClass(AppDataCleanMapper.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);job.setNumReduceTasks(0);// 避免生成part-m-00000等文件,因为文件已交给mos生成LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));job.waitForCompletion(true);}
}
JsonToString类
package app;import com.alibaba.fastjson.JSONObject;public class JsonToString {public static String toString(JSONObject jsonObj){StringBuilder sb = new StringBuilder();// 分隔符用"\001"sb.append(jsonObj.get("cid_sn")).append("\001").append(jsonObj.get("mac")).append("\001").append(jsonObj.get("commit_time")).append("\001").append(jsonObj.get("sdk_ver")).append("\001").append(jsonObj.get("device_id_type")).append("\001").append(jsonObj.get("city")).append("\001").append(jsonObj.get("device_model")).append("\001").append(jsonObj.get("app_ver_name")).append("\001").append(jsonObj.get("imei")).append("\001").append(jsonObj.get("app_ver_code")).append("\001").append(jsonObj.get("device_id")).append("\001").append(jsonObj.get("release_channel")).append("\001").append(jsonObj.get("country")).append("\001").append(jsonObj.get("time_zone")).append("\001").append(jsonObj.get("os_name")).append("\001").append(jsonObj.get("commit_id")).append("\001").append(jsonObj.get("app_token")).append("\001").append(jsonObj.get("app_id ")).append("\001").append(jsonObj.get("language")).append("\001").append(jsonObj.get("user_id"));return sb.toString();}
}
2. 开发shell脚本启动
将数据清洗程序导出成data-clean.jar存储在windows本地,并导入到linux目录/root/data-clean.jar,接下来就是用一个shell脚本(命名为data-clean.sh)去运行data-clean.jar。
注意:该脚本可以设置在当天00:30时启动,因此清洗的是昨天的数据
#!/bin/bash
day_str=`date -d '-1 day' + '%Y-%m-%d'`inpath=/app-log-data/data/$day_str
outpath=/app-log-data/clean/$day_strecho "准备清洗$day_str 的数据......"/root/hadoop/hadoop-2.9.2/bin/hadoop jar /root/data-clean.jar app.AppDataClean $inpath $outpath
其中
#hadoop安装目录/bin/hadoop 运行jar包
/root/hadoop/hadoop-2.9.2/bin/hadoop jar
#哪个jar包:
/root/data-clean.jar
#运行jar包中哪个类:
app.AppDataClean
#这个类中的main方法的输入参数:
$inpath $outpath
3. 把脚本配置成crontab定时任务
在linux中输入crontab -e
再输入30 0 * * * /root/data-clean.sh
4. 导入数据到hive
建分区表(按日期和os_name分区)并插入数据
CREATE EXTERNAL TABLE ods_app_log (cid_sn string,mac string, commit_time string, sdk_ver string,device_id_type string,city string,device_model string,app_ver_name string, app_ver_code string,device_id string,release_channel string,country string,time_zone string,os_name string,commit_id string,app_token string,app_id string,language string,user_id string) partitioned BY (day string,os string) row format delimited fields terminated BY '\001' location '/app-log-data/clean';ALTER TABLE ods_app_log ADD PARTITION (day = '2017-08-14',os = 'android') location '/app-log-data/clean/2017-08-14/android';
ALTER TABLE ods_app_log ADD PARTITION (day = '2017-08-14',os = 'ios') location '/app-log-data/clean/2017-08-14/ios';
5. hive数据仓库etl(各种hive sql编写)
统计日活用户:
– 1. 把当天的活跃用户信息抽取出来,存入一个日活用户信息表
– 1.1/ 建日活用户信息表
CREATE TABLE etl_user_active_day (cid_sn string,mac string, commit_time string, sdk_ver string,device_id_type string,city string,device_model string,app_ver_name string, app_ver_code string,device_id string,release_channel string,country string,time_zone string,os_name string,commit_id string,app_token string,app_id string,language string,user_id string) partitioned BY (day string) row format delimited fields terminated BY '\001';
– 1.2 从ods_app_log原始数据表的当天分区中,抽取当日的日活用户信息插入日活用户信息表etl_user_active_day
– 注意点:每个活跃用户抽取他当天所有记录中时间最早的一条;
INSERT INTO TABLE etl_user_active_day PARTITION (day = '2017-08-14')
SELECT
cid_sn,
mac,
commit_time,
sdk_ver,
device_id_type,
city,
device_model,
app_ver_name,
imei,
app_ver_code,
device_id,
release_channel,
country,
time_zone,
os_name,
commit_id,
app_token,
app_id,
language,
user_id
FROM (SELECT *,row_number() OVER (PARTITION BY user_id ORDER BY commit_time) AS rnFROM ods_app_logWHERE day = '2017-08-14'
) tmp
WHERE rn = 1;
各种维度统计:
- 是否区分操作系统os_name
- 是否区分城市city
- 是否区分渠道release_channel
- 是否区分版本app_ver_name
– 1. 建维度统计结果表 dim_user_active_day
CREATE TABLE dim_user_active_day (os_name string,city string,release_channel string,app_ver_name string,cnts INT) partitioned BY (day string,dim string);
– 2. 利用多重insert语法来统计各种维度组合的日活用户数,并插入到日活维度统计表的各分区中;
FROM etl_user_active_dayINSERT INTO TABLE dim_user_active_day PARTITION (day = '2017-08-14',dim = '0000' )
SELECT 'all' ,'all','all','all',count(1) WHERE day = '2017-08-14'INSERT INTO TABLE dim_user_active_day PARTITION (day = '2017-08-14',dim = '1000' )
SELECT os_name,'all','all','all' ,count(1) WHERE day = '2017-08-14' GROUP BY (os_name)INSERT INTO TABLE dim_user_active_day PARTITION (day = '2017-08-14',dim = '0100')
SELECT 'all',city,'all','all' ,count(1) WHERE day = '2017-08-14' GROUP BY (city)INSERT INTO TABLE dim_user_active_day PARTITION (day = '2017-08-14' ,dim = '0010')
SELECT 'all','all' ,release_channel,'all' ,count(1) WHERE day = '2017-08-14' GROUP BY (release_channel)INSERT INTO TABLE dim_user_active_day PARTITION (day = '2017-08-14',dim = '0001')
SELECT 'all','all' ,'all',app_ver_name,count(1) WHERE day = '2017-08-14' GROUP BY (app_ver_name)INSERT INTO TABLE dim_user_active_day PARTITION (day = '2017-08-14',dim = '0011')
SELECT 'all','all',release_channel,app_ver_name,count(1) WHERE day = '2017-08-14' GROUP BY release_channel,app_ver_nameINSERT INTO TABLE dim_user_active_day PARTITION (day = '2017-08-14',dim = '0110')
SELECT 'all',city,release_channel,'all',count(1) WHERE day = '2017-08-14' GROUP BY city,release_channelINSERT INTO TABLE dim_user_active_day PARTITION (day = '2017-08-14',dim = '0111')
SELECT 'all',city,release_channel,app_ver_name,count(1) WHERE day = '2017-08-14' GROUP BY city,release_channel,app_ver_nameINSERT INTO TABLE dim_user_active_day PARTITION (day = '2017-08-14',dim = '1111')
SELECT os_name,city,release_channel,app_ver_name,count(1) WHERE day = '2017-08-14' GROUP BY os_name,city,release_channel,app_ver_name
;
日新用户统计:
日新:当日第一次出现的用户–当日的新增用户
思路:
a、应该建立一个历史用户表(只存user_id)
b、将当日的活跃用户去 比对 历史用户表, 就知道哪些人是今天新出现的用户 --> 当日新增用户
c、将当日新增用户追加到历史用户表
********************************* 建表 *********************************
– 1 历史用户表
create table etl_user_history(user_id string);
– 2 当日新增用户表:存所有字段(每个人时间最早的一条),带有一个分区字段:day string;
create table etl_user_new_day like etl_user_active_day;
********************************* 统计实现 *********************************
– 1 当日活跃-历史用户表 --> 新增用户表的当日分区
insert into etl_user_new_day partition(day='2017-08-14')
SELECTcid_sn,mac, commit_time, sdk_ver,device_id_type,city,device_model,app_ver_name, imei,app_ver_code,device_id,release_channel,country,time_zone,os_name,commit_id,app_token,app_id,language,a.user_id
from etl_user_active_day a left join etl_user_history b on a.user_id = b.user_id
where a.day='2017-08-14' and b.user_id is null;
– 2 将当日新增用户的user_id追加到历史表
insert into table etl_user_history
select user_id from etl_user_new_day where day='2017-08-14';
6. 数据迁移sqoop
例如将app数据仓库中的 日新用户维度统计报表:dim_user_new_day 导出到mysql的表中去
– 1 在mysql中建库建表
create database app;
create table dim_user_new_day(
os_name varchar(20),city varchar(20),release_channel varchar(20),app_ver_name varchar(20),cnts int,dt varchar(20)
);
–注意:将库和表的编码集改成utf8,命令如下:
修改库的编码:
mysql> alter database db_name character set utf8;
修改表的编码:
mysql> ALTER TABLE table_name CONVERT TO CHARACTER SET utf8 COLLATE utf8_general_ci;
– 2 用sqoop将hive中的 dim_user_new_day 中的指定日分区的数据导出到mysql 的dim_user_new_day
这里有坑:因为dim_user_new_day表有二级分区(day,dim),如果
--export-dir /user/hive/warehouse/app.db/dim_user_new_day_1p/day=2017-08-14/dim=0000
会报错,这里应该是个bug,所以只能把dim_user_new_day变成一级分区(partition=day)表dim_user_new_day_1p
大致思路:创建新分区表dim_user_new_day_1p,然后from dim_user_new_day 表中select除了dim的字段,where day = ‘2017-08-14’
#!/bin/bash
day=`date -d '-1 day' +'%Y-%m-%d'`/root/apps/sqoop/bin/sqoop export \
--connect "jdbc:mysql://hadoop101:3306/app?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password root \
--input-fields-terminated-by '\001' \
--table dim_user_new_day \
--export-dir /user/hive/warehouse/app.db/dim_user_new_day_1p/day=${day}
这里写成脚本,其实以上所有sql统计都可以写成脚本,然后定时执行。
7. web展示系统开发
echarts + springmvc+spring+mybatis ==> mysql
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
