Kafka3.0.0版本不再支持Java8,Kafka2.8.0可替换不使用ZooKeeper
下载zookeeper:https://link.csdn.net/?target=https%3A%2F%2Farchive.apache.org%2Fdist%2Fzookeeper%2F
tar -zxvf apache-zookeeper-3.6.4-bin.tar.gz #解压
mv /data/zookeeper/apache-zookeeper-3.6.4-bin/conf/zoo_sample.cfg zoo.cfg 修改文件名
2888:zookeeper的leader和follower进行消息交换的端口
3888:zookeeper进行选举时信息交换的端口
dataDir=/opt/zkData #修改zookeeper数据保存位置
#在zoo.cfg中增加集群配置
server.1=192.168.236.137:2888:3888
server.2=192.168.236.138:2888:3888
server.3=192.168.236.139:2888:3888
myid中的内容即为当前IP对应的server后面的数字
cd /opt/
mkdir zkData
vi myid
#根据zoo.cfg中的ip填上前面的序号,然后保存
/data/zookeeper/apache-zookeeper-3.6.4-bin/bin/zkServer.sh start #启动zookeeper
/data/zookeeper/apache-zookeeper-3.6.4-bin/bin/zkServer.sh stop #停止zookeeper
/data/zookeeper/apache-zookeeper-3.6.4-bin/bin/zkServer.sh status #查看状态
状态信息,leader和follower
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/apache-zookeeper-3.6.4-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
下载kafka:https://kafka.apache.org/downloads
tar -zxvf kafka_2.13-2.8.1.tgz
vi server.properties
修改配置文件如下
broker.id=0 #服务器id,集群唯一
log.dirs=/opt/kafka/data #kafka数据存储位置
zookeeper.connect=192.168.236.137:2181,192.168.236.138:2181,192.168.236.139:2181
listeners=PLAINTEXT://{内网IP}:9092 # 允许外部端口连接
broker.id :该数值在集群服务中为唯一,不允许重复。
log.dirs用于存储kafka数据
配置参数为zookeepe集群的地址,可以是多个,用逗号分割,一般端口都为2181;master:2181,slave0:2181,slave1:2181
#启动kafka
/data/kafka_2.13-2.8.1/bin/kafka-server-start.sh -daemon /data/kafka_2.13-2.8.1/config/server.properties
#停止kafka
/data/kafka_2.13-2.8.1/bin/kafka-server-stop.sh -daemon /data/kafka_2.13-2.8.1/config/server.properties
#创建topic并定义partition和replication比
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic
#查看topic
./kafka-topics.sh --list --zookeeper localhost:2181
#删除topic
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic demo
cd /data/kafka_2.13-2.8.1/bin
#开启生产者
./kafka-console-producer.sh --broker-list 192.168.236.137:9092,192.168.236.138:9092,192.168.236.139:9092 --topic testTopic
#开启消费者
./kafka-console-consumer.sh --bootstrap-server 192.168.236.137:9092,192.168.236.138:9092,192.168.236.139:9092 --topic testTopic --from-beginning
开启生产者和消费者,生产者发送数据,消费者就可以收到了
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.236.137:9092,192.168.236.138:9092,192.168.236.139:9092 --topic testTopic --time -1
结果:testTopic:0:4
#查看的是历史数据和执行之后的数据./kafka-console-consumer.sh --bootstrap-server 192.168.236.137:9092,192.168.236.138:9092,192.168.236.139:9092 --topic testTopic --from-beginning #查看执行后推上来的数据
./kafka-console-consumer.sh --bootstrap-server 192.168.236.137:9092,192.168.236.138:9092,192.168.236.139:9092 --topic testTopic
注意:这里是滚动查看,只有有消息发送上来才能看到