Kafka Partition
Partition(分区)是Kafka的核心角色,对于 Kafka 的存储结构、消息的生产消费方式都至关重要。
Kafka 中 Topic 被分成多个 Partition 分区。Topic 是一个逻辑概念,Partition 是最小的存储单元,掌握着一个 Topic 的部分数据。
一个 Kafka 集群由多个 Broker(就是 Server) 构成,每个 Broker 中含有集群的部分数据。
Kafka 把 Topic 的多个 Partition 分布在多个 Broker 中。
这样会有多种好处:
如果把 Topic 的所有 Partition 都放在一个 Broker 上,那么这个 Topic 的可扩展性就大大降低了,会受限于这个 Broker 的 IO 能力。把 Partition 分散开之后,Topic 就可以水平扩展 。
一个 Topic 可以被多个 Consumer 并行消费。如果 Topic 的所有 Partition 都在一个 Broker,那么支持的 Consumer 数量就有限,而分散之后,可以支持更多的 Consumer。
一个 Consumer 可以有多个实例,Partition 分布在多个 Broker 的话,Consumer 的多个实例就可以连接不同的 Broker,大大提升了消息处理能力。可以让一个 Consumer 实例负责一个 Partition,这样消息处理既清晰又高效。
参考:https://blog.csdn.net/duysh/article/details/116481414
安装Kafka
Kafka依赖于Zookeeper,所以前置环境是Zookeeper已经安装好或者使用Kafka安装包中附带的ZooKeeper
下载Kafka
https://kafka.apache.org/downloads
示例安装的是kafka_2.13-3.1.0.tgz版本,使用JDK1.8
下载好之后放入/opt目录,并进行解压
tar -zvxf kafka_2.13-3.1.0.tgz
则解压之后Kafka的工作路径为/opt/kafka_2.13-3.1.0
启动ZooKeeper
设置配置文件
配置文件路径,此处运用了默认配置
/opt/kafka_2.13-3.1.0/config/zookeeper.properties
启动ZooKeeper
在工作路径下使用以下命令
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
*后台启动命令
./bin/zookeeper-server-start.sh ./config/zookeeper.properties 1>/dev/null 2>&1 &
启动Kafka
配文件路径,此处使用默认配置
/opt/kafka_2.13-3.1.0/config/server.properties
启动Kafka
在工作路径下使用以下命令
./bin/kafka-server-start.sh ./config/server.properties
*后台启动命令
./bin/kafka-server-start.sh ./config/server.properties 1>/dev/null 2>&1 &
配置Topic
创建Topic为alpha
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic alpha
带参数创建Topic示例,创建Topic gamma设置partitions分区数量为2,replication-factor为1,max.message.bytes为128000
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic gamma --partitions 2 --replication-factor 1 --config max.message.bytes=128000
配置Topic示例
# 配置max.message.bytes为128000 bytes
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic beta --config max.message.bytes=128000
# 删除max.message.bytes配置
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic beta --delete-config max.message.bytes
# 配置partitions分区
# Kafka partitions分区数量只允许增加,不允许减少
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic beta --partitions 10
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic beta --partitions 3
删除Topic
注意:Topic不能直接删除,默认情况下,删除的是标记,没有实际删除这个Topic,需要通过以下方式进行操作
方式一:配置server.properties文件,给定参数delete.topic.enable=true,然后重启Kafka服务,之后即可执行原来的删除操作
方式二:通过delete命令删除后,手动将本地磁盘以及ZooKeeper上的相关Topic的信息删除
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test
查看Topic列表
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
查看Topic描述
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic alpha
启动Consumer消费者,读取alpha Topic
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic alpha --from-beginning
启动Producer生产者
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic alpha
*安装JDK
Kafka和ZooKeeper依赖Java,所以如果系统环境还没有Java则先需要安装Java
下载Java
cd /usr/
mkdir java
cd java
wget https://download.oracle.com/otn/java/jdk/8u321-b07/df5ad55fdd604472a86a45a217032c7d/jdk-8u321-linux-x64.tar.gz?AuthParam=1645601010_474b98b6db80e40e79ff4005aa9e6dae
tar -zxvf jdk-8u321-linux-x64.tar.gz
配置环境变量
vi /etc/profile
最后加入以下内容
#java
export JAVA_HOME=/usr/java/jdk1.8.0_321
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib
更新环境变量
source /etc/profile
检查效果
java -version
java version "1.8.0_321"
Java(TM) SE Runtime Environment (build 1.8.0_321-b07)
Java HotSpot(TM) 64-Bit Server VM (build 25.321-b07, mixed mode)
*配置防火墙
可选操作,端口不可达的时候注意检查防火墙配置
1.关闭SELINUX
vi /etc/selinux/config
注释两条信息,增加disabled
#SELINUX=enforcing
#SELINUXTYPE=targeted
SELINUX=disabled
保存退出
:wq!
使配置立即生效
setenforce 0
2.配置iptables
编辑iptables防火墙配置文件,主要增加-A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT
vi /etc/sysconfig/iptables
iptables配置参考如下
# Firewall configuration written by system-config-firewall
# Manual customization of this file is not recommended.
*filter
:INPUT ACCEPT [0:0]
:FORWARD ACCEPT [0:0]
:OUTPUT ACCEPT [0:0]
-A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT
-A INPUT -p icmp -j ACCEPT
-A INPUT -i lo -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 22 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT
-A INPUT -j REJECT --reject-with icmp-host-prohibited
-A FORWARD -j REJECT --reject-with icmp-host-prohibited
COMMIT
保存退出
:wq!
最后重启防火墙使配置生效
service iptables restart
其它说明
1.在较新版本(2.2 及更高版本)的Kafka不再需要ZooKeeper连接字符串命令
参考链接
配置文件说明比较详细
https://www.cnblogs.com/sandea/p/12078442.html
https://www.jb51.net/article/76397.htm
有集群相关的说明
https://blog.csdn.net/panchang199266/article/details/82113453