Kafka入门教程(二)

转自:https://blog.csdn.net/yuan_xw/article/details/79188061



Kafka集群环境安装

相关下载

  1. 相关下载:

JDK要求1.8版本以上。

JDK安装教程:http://blog.csdn.net/yuan_xw/article/details/49948285

Zookeeper安装教程:http://blog.csdn.net/yuan_xw/article/details/47148401

Kafka下载地址:http://mirrors.shu.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
这里写图片描述

Kafka集群规划

主机名IP安装软件
Kafka1192.168.1.221Jdk、Zookeeper、Kafka
Kafka2192.168.1.222Jdk、Zookeeper、Kafka
Kafka3192.168.1.223Jdk、Zookeeper、Kafka

1. 配置ssh免密码登录:

产生密钥,执行命令:ssh-keygen -t rsa,按4回车,密钥文件位于\~/.ssh文件

在192.168.1.221上生产一对钥匙,将公钥拷贝到其他节点,包括自己,执行命令:

ssh-copy-id 192.168.1.221

ssh-copy-id 192.168.1.222

ssh-copy-id 192.168.1.223

在192.168.1.222上生产一对钥匙,将公钥拷贝到其他节点,包括自己,执行命令:

ssh-copy-id 192.168.1.221

ssh-copy-id 192.168.1.222

ssh-copy-id 192.168.1.223

在192.168.1.223上生产一对钥匙,将公钥拷贝到其他节点,包括自己,执行命令:

ssh-copy-id 192.168.1.221

ssh-copy-id 192.168.1.222

ssh-copy-id 192.168.1.223

  1. 在所有的服务器上设置环境变量:
export JAVA_HOME=/usr/local/software/jdk1.8.0_66
export CLASSPATH=.:$JAVA_HOME</span>/lib/dt.jar:>class="hljs-variable">$JAVA_HOME/lib/tools.jar
export KAFKA_HOME=/usr/local/software/kafka_2.11-1.0.0
export ZOOKEEPER_HOME=/usr/local/software/zookeeper-3.4.11
export PATH=.:$JAVA_HOME</span>/class="hljs-symbol">bin:</span><span class="hljs-variable">$KAFKA_HOME/bin:$ZOOKEEPER_HOME</span>/<span class="hljs-symbol">bin:</span><span class="hljs-variable">$PATH
  • 1
  • 2
  • 3
  • 4
  • 5

这里写图片描述

刷新环境变量:source /etc/profile

  1. 关闭所有服务器上的防火墙:

systemctl stop firewalld.service

systemctl disable firewalld.service

安装Kafka

  1. 分别在[Kafka1、Kafka2、Kafka3]服务器下载:

tar -zxvf kafka_2.11-1.0.0.tgz
这里写图片描述
2. 修改配置文件:

Kafka1服务器配置文件

broker.id=0

listeners=PLAINTEXT://:9092

log.dir= /usr/local/software/kafka_2.11-1.0.0/kafka-logs

Kafka2服务器配置文件

broker.id=1

listeners=PLAINTEXT://:9092

log.dir= /usr/local/software/kafka_2.11-1.0.0/kafka-logs

Kafka3服务器配置文件

broker.id=2

listeners=PLAINTEXT://:9092

log.dir= /usr/local/software/kafka_2.11-1.0.0/kafka-logs

  1. 分别启动[Kafka1、Kafka2、Kafka3]服务器的kafka服务

启动命令:./bin/kafka-server-start.sh -daemon config/server.properties

查看启动日志:tail -100f logs/server.log

查看启动端口:lsof -i:9092
这里写图片描述

创建主题Topic

  1. 创建主题topic

执行命令:bin/kafka-topics.sh - -create - -zookeeper 192.168.1.221:2181
- -replication-factor 2 - -partitions 2 - -topic test

  1. 查看主题topic描述

执行命令:bin/kafka-topics.sh –list –zookeeper 192.168.1.223:2181
这里写图片描述
1. 查看创建的所有主题

执行命令:bin/kafka-topics.sh –list –zookeeper 192.168.1.223:2181

  1. 生产者生产消息

执行命令:bin/kafka-console-producer.sh –broker-list 192.168.1.221:9092 –topic test

  1. 消费者消费消息

执行命令:bin/kafka-console-consumer.sh –bootstrap-server 192.168.1.223:9092 –topic test –from-beginning

JAVA客户端

  1. 新建一个maven项目,项目名称:kafka-demo:

代码结构如下:
这里写图片描述
1. Maven的pom.xml文件:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0modelVersion><groupId>com.kafkagroupId><artifactId>kafka-demoartifactId><version>1.0-SNAPSHOTversion><build><plugins><plugin><groupId>org.apache.maven.pluginsgroupId><artifactId>maven-compiler-pluginartifactId><configuration><source>1.8source><target>1.8target>configuration>plugin>plugins>build><packaging>jarpackaging><name>kafka-demoname><url>http://maven.apache.orgurl><properties><project.build.sourceEncoding>UTF-8project.build.sourceEncoding>properties><dependencies><dependency><groupId>junitgroupId><artifactId>junitartifactId><version>4.12version><scope>testscope>dependency><dependency><groupId>org.apache.kafkagroupId><artifactId>kafka-clientsartifactId><version>1.0.0version>dependency>dependencies>
project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  1. 生产者代码实现:
package com.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
/*** 生产者代码*/
public class DemoProducer{public static void main( String[] args ){Properties props = new Properties();props.put("bootstrap.servers", "192.168.1.221:9092,192.168.1.222:9092,192.168.1.223:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer producer = new KafkaProducer<>(props);for(int i = 0; i < 10; i++) {producer.send(new ProducerRecord<>("test", Integer.toString(i), "message_value=====>" + i));}producer.close();}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  1. 消费者代码实现:
package com.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;
import java.util.Properties;
/*** 消费者代码*/
public class DemoConsumer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "192.168.1.221:9092,192.168.1.222:9092,192.168.1.223:9092");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("test"));while(true) {ConsumerRecords records = consumer.poll(100);for(ConsumerRecord record : records) {System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());}}}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

消息结果:

message_value=====>1
message_value=====>5
message_value=====>7
message_value=====>8
message_value=====>4
message_value=====>6
message_value=====>0
message_value=====>2
message_value=====>3
message_value=====>9

源代码下载地址:http://download.csdn.net/download/yuan_xw/10228246

学习Kafka推荐书籍:

  1. 《Kafka入门与实践》

  2. 《Kafka技术内幕 图文详解Kafka源码设计与实现》

  3. 《流式架构:Kafka与MapR Streams数据流处理》

  4. 《Scala语言基础与开发实战》

  5. 《Kafka权威指南》

  6. 《Kafka源码解析与实战》

–以上为《Kafka集群环境安装》,如有不当之处请指出,我后续逐步完善更正,大家共同提高。谢谢大家对我的关注。——厚积薄发(yuanxw)


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部