Big Data Project: Counting the Total Number of Visits (Part 1)

1. Package the web project as a jar, upload it to the Linux cluster, and run it

[root@hdp-1 apps]# java -jar demo-0.0.1-SNAPSHOT.jar

To verify it is running: open a browser and enter server-name:web-project-port, e.g. hdp-1:1997
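If you would rather check from code than a browser, here is a minimal sketch of an HTTP reachability test, assuming the same host and port as the example above (hdp-1:1997); the class name PingWebApp is illustrative only:

import java.net.HttpURLConnection;
import java.net.URL;

// Minimal reachability check for the deployed web project (assumed host/port hdp-1:1997).
public class PingWebApp {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://hdp-1:1997/");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(3000);
        // Any HTTP status at all means the jar started and is listening on the port.
        System.out.println("HTTP status: " + conn.getResponseCode());
        conn.disconnect();
    }
}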

2. Start HDFS

[root@hdp-1 ~]# start-all.sh

3. Start Flume

[root@hdp-1 bin]# ./flume-ng agent -c conf -n a1 -f ../conf/demo.logger.properties -Dflume.root.logger=DEBUG,console

The logs that Flume collects are produced by nginx under: /usr/local/nginx/logs

4. nginx configuration file

[root@hdp-1 conf]# vi nginx.conf

#user  nobody;
worker_processes  1;

#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;

#pid        logs/nginx.pid;

events {
    worker_connections  1024;
}

http {
    include       mime.types;
    default_type  application/octet-stream;

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$remote_addr';

    #access_log  logs/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    #keepalive_timeout  0;
    keepalive_timeout  65;

    #gzip  on;

    upstream frame-tomcat {
        # proxied (backend) server
        server hdp-4:8180;
    }

    server {
        listen       80;
        # local server name
        server_name  hdp-1;

        #charset koi8-r;

        access_log  logs/host.access.log  main;

        location / {
            # root   html;
            # index  index.html index.htm;
            proxy_pass http://frame-tomcat;
        }

        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }
}

Note: nginx is started here to produce access logs, forward requests to the backend, and provide load balancing.

Start nginx:

[root@hdp-1 ~]# cd /usr/local/nginx/sbin
[root@hdp-1 sbin]# ./nginx

5. Flume sinks the collected data into Kafka. Kafka acts as a message queue; loosely speaking, it serializes the incoming data stream, and its purpose is to buffer events so the pipeline does not block.

Flume configuration file for sinking to Kafka:

[root@hdp-1 apache-flume-1.6.0-bin]# vi tail-kafka.conf 
a1.sources = source1
a1.sinks = k1
a1.channels = c1
# the source runs an executable command
a1.sources.source1.type = exec
# tail (follow) the log file
a1.sources.source1.command = tail -F /usr/local/nginx/logs/log.demo.access.log
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = hdp-1:9092,hdp-2:9092,hdp-4:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.source1.channels = c1
a1.sinks.k1.channel = c1
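Presumably this second agent is started the same way as in step 3, with -f pointing at tail-kafka.conf instead of demo.logger.properties (an assumption based on the step-3 command; adjust the path to wherever the file actually lives).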

Start ZooKeeper:

[root@hdp-1 ~]# ./zkmanager.sh

Start Kafka:

[root@hdp-1 ~]# cd apps/kafka_2.12-2.2.0/bin/
[root@hdp-1 bin]# ./kafka-server-start.sh ../config/server.properties

Note: ZooKeeper must be started before Kafka.

Start a console producer:

./kafka-console-producer.sh --broker-list hdp-1:9092,hdp-2:9092,hdp-3:9092 --topic test

Start a console consumer:

./kafka-console-consumer.sh --bootstrap-server hdp-1:9092,hdp-2:9092,hdp-3:9092 --topic test --from-beginning

The consumer code was written in an earlier part, so it is not repeated here.
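For reference, a minimal sketch of that consumer is shown below. It assumes the topic test and the brokers used in the console commands above, and simply appends each record to a local temp file (here d:/testlog/access.log, the same path the HDFS upload code below reads); the class name ConsumerToFile and the group id are illustrative only:

package com.zpark.onekafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.FileWriter;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// Illustrative sketch: pull nginx log lines from the "test" topic and
// append them to a local temp file for the later HDFS upload step.
public class ConsumerToFile {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hdp-1:9092,hdp-2:9092,hdp-3:9092");
        props.put("group.id", "visit-count-demo");   // illustrative group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             FileWriter writer = new FileWriter("d:/testlog/access.log", true)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // each record value is one nginx access-log line
                    writer.write(record.value() + System.lineSeparator());
                }
                writer.flush();
            }
        }
    }
}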

6. Upload the temporary file built from the Kafka data into the Hive table on HDFS

Create the table:

create external table flume_table (ip string ) row format delimited location '/usr/';

Code:

package com.zpark.onekafka;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class HdfsTest {
    public static void main(String[] args) {
        URI uri = null;
        Configuration conf = null;
        String user = "root";
        FileSystem fs = null;
        try {
            uri = new URI("hdfs://hdp-1:9000");
            conf = new Configuration();
            // dfs.replication: number of replicas kept by the distributed file system
            conf.set("dfs.replication", "2");
            // dfs.blocksize: block size of the distributed file system (a 100M file -> 64M + 36M blocks)
            conf.set("dfs.blocksize", "64m");
            fs = FileSystem.get(uri, conf, user);
            fs.copyFromLocalFile(new Path("d:/testlog/access.log"), new Path("/usr/a.txt"));
            fs.close();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
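Once this runs, a.txt sits under /usr/ on HDFS, which is exactly the location the external table above points at, so the total visit count can be read with select count(ip) from flume_table. As a convenience, here is a minimal sketch of running that query from Java over Hive JDBC; it assumes HiveServer2 is reachable at hdp-1:10000 and that the hive-jdbc driver is on the classpath (neither is covered in the steps above):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Illustrative sketch: count total visits from the flume_table created above.
// Assumes HiveServer2 at hdp-1:10000 (not part of the original steps).
public class VisitCount {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection("jdbc:hive2://hdp-1:10000/default", "root", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("select count(ip) from flume_table")) {
            if (rs.next()) {
                System.out.println("total visits: " + rs.getLong(1));
            }
        }
    }
}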

