Big Data Project: Counting Total Site Visits (Part 1)
1. Package the web project as a jar, upload it to the Linux cluster, and run it
[root@hdp-1 apps]# java -jar demo-0.0.1-SNAPSHOT.jar
To check that it is running, open a browser and go to <server name>:<web project port>, e.g. hdp-1:1997
2. Start HDFS
[root@hdp-1 ~]# start-all.sh
3. Start Flume
[root@hdp-1 bin]# ./flume-ng agent -c conf -n a1 -f ../conf/demo.logger.properties -Dflume.root.logger=DEBUG,console
Path of the access logs Flume collects (written by Nginx): /usr/local/nginx/logs
4. Nginx configuration file
[root@hdp-1 conf]# vi nginx.conf

#user  nobody;
worker_processes  1;

#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;

#pid        logs/nginx.pid;

events {
    worker_connections  1024;
}

http {
    include       mime.types;
    default_type  application/octet-stream;

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" ';

    #access_log  logs/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    #keepalive_timeout  0;
    keepalive_timeout  65;

    #gzip  on;

    upstream frame-tomcat {
        # backend server being proxied
        server hdp-4:8180;
    }

    server {
        listen       80;
        # this machine's server name
        server_name  hdp-1;

        #charset koi8-r;

        access_log  logs/host.access.log  main;

        location / {
            # root   html;
            # index  index.html index.htm;
            proxy_pass http://frame-tomcat;
        }

        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }
}
Note: Nginx is started here to produce access logs and to handle request forwarding (reverse proxying) and load balancing.
Start Nginx:
[root@hdp-1 ~]# cd /usr/local/nginx/sbin
[root@hdp-1 sbin]# ./nginx
5. Flume collects the data and sinks it into Kafka. Kafka acts as a message queue: it buffers the log events between producer and consumer so that downstream processing is not blocked by bursts of incoming data.
Flume-to-Kafka configuration file:
[root@hdp-1 apache-flume-1.6.0-bin]# vi tail-kafka.conf
a1.sources = source1
a1.sinks = k1
a1.channels = c1
# the source is defined as an executable command
a1.sources.source1.type = exec
# follow (tail) the access log file
a1.sources.source1.command = tail -F /usr/local/nginx/logs/log.demo.access.log
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = hdp-1:9092, hdp-2:9092, hdp-4:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.source1.channels = c1
a1.sinks.k1.channel = c1
Start ZooKeeper (zkmanager.sh here is a custom startup script, not part of the standard ZooKeeper distribution):
[root@hdp-1 ~]# ./zkmanager.sh
Start Kafka:
[root@hdp-1 ~]# cd apps/kafka_2.12-2.2.0/bin/
[root@hdp-1 bin]# ./kafka-server-start.sh ../config/server.properties
Note: ZooKeeper must be running before Kafka is started.
Start a console producer:
./kafka-console-producer.sh --broker-list hdp-1:9092,hdp-2:9092,hdp-3:9092 --topic test
Start a console consumer:
./kafka-console-consumer.sh --bootstrap-server hdp-1:9092,hdp-2:9092,hdp-3:9092 --topic test --from-beginning
The consumer code was covered in an earlier post, so it is not repeated in full here.
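For reference, below is a minimal sketch of what such a consumer might look like; it is not the original code. The class name KafkaToFileTest, the consumer group demo-group, and the choice of appending records to d:/testlog/access.log (the local file that HdfsTest uploads in the next step) are assumptions made here for illustration; the brokers and topic come from the Flume sink configuration above.

package com.zpark.onekafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaToFileTest {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // brokers and topic taken from the Flume Kafka sink configuration above
        props.put("bootstrap.servers", "hdp-1:9092,hdp-2:9092,hdp-4:9092");
        props.put("group.id", "demo-group"); // hypothetical consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test"));

        // append each consumed log line to the local temp file that HdfsTest uploads later
        // (runs until the process is killed)
        try (PrintWriter out = new PrintWriter(new FileWriter("d:/testlog/access.log", true))) {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    out.println(record.value());
                }
                out.flush();
            }
        }
    }
}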
Upload the temporary file built from the Kafka data to HDFS, into the Hive table's location
Create the table:
create external table flume_table (ip string ) row format delimited location '/usr/';
Code:
package com.zpark.onekafka;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class HdfsTest {
    public static void main(String[] args) {
        String user = "root";
        FileSystem fs = null;
        try {
            URI uri = new URI("hdfs://hdp-1:9000");
            Configuration conf = new Configuration();
            // dfs.replication: number of replicas the distributed file system keeps
            conf.set("dfs.replication", "2");
            // dfs.blocksize: block size of the distributed file system
            conf.set("dfs.blocksize", "64m");
            fs = FileSystem.get(uri, conf, user);
            // upload the local temp file into the Hive table's HDFS location
            fs.copyFromLocalFile(new Path("d:/testlog/access.log"), new Path("/usr/a.txt"));
            fs.close();
        } catch (URISyntaxException | InterruptedException | IOException e) {
            e.printStackTrace();
        }
    }
}
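Once the upload finishes, the file sits under /usr/, the location of the external table flume_table, so its lines are immediately visible to Hive queries. The total number of visits can then be obtained with a simple count, for example select count(*) from flume_table; (this query is not in the original post and assumes each log line corresponds to one request).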
