下载安装包
wget https://archive.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz
# 解压
tar -zxf kafka_2.12-2.2.0.tgz
# 复制kafka
cp -rp kafka_2.12-2.2.0 kafka-broker-1
新建kafka目录
cd /usr/local/kafka
mkdir -p kafka/001/log
mkdir -p kafka/002/log
mkdir -p kafka/003/log
新建zookeeper目录
cd /usr/local/kafka
mkdir -p zookeeper/001/log
mkdir -p zookeeper/002/log
mkdir -p zookeeper/003/log
mkdir -p zookeeper/001/data
mkdir -p zookeeper/002/data
mkdir -p zookeeper/003/data
# 新建myid(myid的内容必须与zookeeper.properties中server.N的编号N一致)
echo 1 > zookeeper/001/data/myid
echo 2 > zookeeper/002/data/myid
echo 3 > zookeeper/003/data/myid
第一个broker(路径:/usr/local/kafka/kafka-broker-1/config)
新增jaas.conf
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin";
};
新增zk_jaas.conf
Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="admin"
    user_admin="admin";
};
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="admin";
};
修改zookeeper配置文件
zookeeper.properties
# 数据文件与日志文件
dataDir=/usr/local/kafka/zookeeper/001/data
dataLogDir=/usr/local/kafka/zookeeper/001/log
# 端口
clientPort=2181
# 集群节点配置,格式为 server.N=主机:端口1:端口2;端口1用于follower与leader之间的通讯,端口2用于选举leader
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
# SASL认证
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
requireClientAuthScheme=sasl
zookeeper.sasl.client=true
修改zookeeper启动命令
zookeeper-server-start.sh
# 加入export这一行
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/kafka-broker-1/config/zk_jaas.conf ${KAFKA_OPTS}"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"
修改kafka配置文件
server.properties
broker.id=0
listeners=SASL_PLAINTEXT://:9092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
log.dirs=/usr/local/kafka/kafka/001/log
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
修改kafka启动命令
kafka-run-class.sh
# Generic jvm settings you want to add
if [ -z "$KAFKA_OPTS" ]; then
  #KAFKA_OPTS=""
  # 加入下面这一行
  KAFKA_OPTS="-Djava.security.auth.login.config=$base_dir/config/jaas.conf"
fi
配置第二个broker(路径:/usr/local/kafka/kafka-broker-2)
cd /usr/local/kafka
cp -rp kafka-broker-1 kafka-broker-2
修改zookeeper配置文件
zookeeper.properties
# 修改
# 数据文件与日志文件
dataDir=/usr/local/kafka/zookeeper/002/data
dataLogDir=/usr/local/kafka/zookeeper/002/log
# 修改
# 端口
clientPort=2182
# 集群节点配置,格式为 server.N=主机:端口1:端口2;端口1用于follower与leader之间的通讯,端口2用于选举leader
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
# SASL认证
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
requireClientAuthScheme=sasl
zookeeper.sasl.client=true
修改zookeeper启动命令
zookeeper-server-start.sh
# 修改
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/kafka-broker-2/config/zk_jaas.conf ${KAFKA_OPTS}"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"
修改kafka配置文件
server.properties
# 修改
broker.id=1
# 修改
listeners=SASL_PLAINTEXT://:9093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# 修改
log.dirs=/usr/local/kafka/kafka/002/log
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
配置第三个broker(路径:/usr/local/kafka/kafka-broker-3)
cd /usr/local/kafka
cp -rp kafka-broker-1 kafka-broker-3
修改zookeeper配置文件
zookeeper.properties
# 修改
# 数据文件与日志文件
dataDir=/usr/local/kafka/zookeeper/003/data
dataLogDir=/usr/local/kafka/zookeeper/003/log
# 修改
# 端口
clientPort=2183
# 集群节点配置,格式为 server.N=主机:端口1:端口2;端口1用于follower与leader之间的通讯,端口2用于选举leader
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
# SASL认证
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
requireClientAuthScheme=sasl
zookeeper.sasl.client=true
修改zookeeper启动命令
zookeeper-server-start.sh
# 修改
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/kafka-broker-3/config/zk_jaas.conf ${KAFKA_OPTS}"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"
修改kafka配置文件
server.properties
# 修改
broker.id=2
# 修改
listeners=SASL_PLAINTEXT://:9094
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# 修改
log.dirs=/usr/local/kafka/kafka/003/log
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
zookeeper启动
# 启动命令
nohup /usr/local/kafka/kafka-broker-1/bin/zookeeper-server-start.sh /usr/local/kafka/kafka-broker-1/config/zookeeper.properties &
nohup /usr/local/kafka/kafka-broker-2/bin/zookeeper-server-start.sh /usr/local/kafka/kafka-broker-2/config/zookeeper.properties &
nohup /usr/local/kafka/kafka-broker-3/bin/zookeeper-server-start.sh /usr/local/kafka/kafka-broker-3/config/zookeeper.properties &
# 查看端口(正常应该有2181、3887、3888、3889和2887|2888|2889中的一个)
ss -lnput | egrep '2181|2887|3887|2888|3888|2889|3889'
kafka启动
# 启动命令
/usr/local/kafka/kafka-broker-1/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka-broker-1/config/server.properties
/usr/local/kafka/kafka-broker-2/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka-broker-2/config/server.properties
/usr/local/kafka/kafka-broker-3/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka-broker-3/config/server.properties
# 查看端口
ss -lnput | egrep '9092|9093|9094'
安装kafka-manager
# 安装yum-utils软件包
yum install -y yum-utils
# 设置docker仓库
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# 安装docker
yum install docker-ce docker-ce-cli containerd.io
# 启动docker
systemctl start docker
# 安装kafka-manager
docker run -itd --rm -p 9000:9000 -e ZK_HOSTS="172.30.129.14:2181" -e APPLICATION_SECRET=letmein sheepkiller/kafka-manager
访问http://xxx:9000/即可
Springboot项目集成
新建springboot项目,添加依赖
pom.xml
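<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.2</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>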
添加配置
kafka-jaas.conf(放在src/main/resources下,文件名需与启动类中加载的classpath:kafka-jaas.conf一致)
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin";
};
application.properties
# 端口
server.port=8899
# 应用名称
spring.application.name=spring-boot-test
# kafka配置
# 生产者配置
spring.kafka.bootstrap-servers=xxx:9092,xxx:9093,xxx:9094
#发送失败后,重试次数,0表示不重试
spring.kafka.producer.retries=3
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 开启sasl认证
spring.kafka.producer.properties.sasl.mechanism=PLAIN
spring.kafka.producer.properties.security.protocol=SASL_PLAINTEXT
# 消费者配置
# 默认的消费组ID
spring.kafka.consumer.group-id=test-topic
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset,单位:毫秒)
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.auto-offset-reset=earliest
# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 开启sasl认证
spring.kafka.consumer.properties.sasl.mechanism=PLAIN
spring.kafka.consumer.properties.security.protocol=SASL_PLAINTEXT
添加kafka消费类
KafkaConsumer
/**
 * Listener bean that prints every record received from the {@code test-topic} topic.
 */
@Component
public class KafkaConsumer {

    /**
     * Handles one record delivered by the listener container registered under
     * the id {@code test-topic-consumer}.
     *
     * @param consumerRecord the received record; its value and offset are printed to stdout
     */
    @KafkaListener(id = "test-topic-consumer", topics = "test-topic")
    public void recieveMsg(ConsumerRecord<?, ?> consumerRecord) {
        // Wildcard generics replace the raw type; only value() and offset() are read,
        // so no concrete key/value type is required here.
        System.out.println("接收到消息:消息值:" + consumerRecord.value() + ",消息偏移量:" + consumerRecord.offset());
    }
}
添加kafka生产者
KafkaController
/**
 * REST endpoint that publishes messages to the {@code test-topic} topic.
 * Requests are mapped under the {@code kafka} path.
 */
@RestController
@RequestMapping(value = "kafka")
public class KafkaController {

    // Injected by Spring. The annotation and the modifier must be separate tokens
    // ("@Autowired private"); fused together they do not compile.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends {@code msg} to {@code test-topic}.
     *
     * @param msg the message payload taken from the request
     * @return the literal string {@code "success"} once the send has been handed to the template;
     *         the result of the send itself is not checked
     */
    @PostMapping("sendMsg")
    public String sendMsg(String msg) {
        kafkaTemplate.send("test-topic", msg);
        return "success";
    }
}
SpringBoot启动类
SpringBootTestApplication
@SpringBootApplication
public class SpringBootTestApplication {public static void main(String[] args) throws FileNotFoundException {// 启动时配置sasl认证final File file = ResourceUtils.getFile("classpath:kafka-jaas.conf");System.setProperty("java.security.auth.login.config", file.getAbsolutePath());SpringApplication.run(SpringBootTestApplication.class, args);}}
使用postman发送请求测试即可。