Kafka Partition

Partition(分区)是Kafka的核心角色,对于 Kafka 的存储结构、消息的生产消费方式都至关重要。

Kafka 中 Topic 被分成多个 Partition 分区。Topic 是一个逻辑概念,Partition 是最小的存储单元,掌握着一个 Topic 的部分数据。

一个 Kafka 集群由多个 Broker(就是 Server) 构成,每个 Broker 中含有集群的部分数据。

Kafka 把 Topic 的多个 Partition 分布在多个 Broker 中。

这样会有多种好处:

如果把 Topic 的所有 Partition 都放在一个 Broker 上,那么这个 Topic 的可扩展性就大大降低了,会受限于这个 Broker 的 IO 能力。把 Partition 分散开之后,Topic 就可以水平扩展 。
一个 Topic 可以被多个 Consumer 并行消费。如果 Topic 的所有 Partition 都在一个 Broker,那么支持的 Consumer 数量就有限,而分散之后,可以支持更多的 Consumer。
一个 Consumer 可以有多个实例,Partition 分布在多个 Broker 的话,Consumer 的多个实例就可以连接不同的 Broker,大大提升了消息处理能力。可以让一个 Consumer 实例负责一个 Partition,这样消息处理既清晰又高效。

参考:https://blog.csdn.net/duysh/article/details/116481414

安装Kafka

Kafka依赖于Zookeeper,所以前置环境是Zookeeper已经安装好或者使用Kafka安装包中附带的ZooKeeper

下载Kafka

https://kafka.apache.org/downloads

示例安装的是kafka_2.13-3.1.0.tgz版本,使用JDK1.8

下载好之后放入/opt目录,并进行解压

tar -zvxf kafka_2.13-3.1.0.tgz

则解压之后Kafka的工作路径为/opt/kafka_2.13-3.1.0

启动ZooKeeper

设置配置文件

配置文件路径,此处运用了默认配置

/opt/kafka_2.13-3.1.0/config/zookeeper.properties

启动ZooKeeper

在工作路径下使用以下命令

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

*后台启动命令

./bin/zookeeper-server-start.sh ./config/zookeeper.properties 1>/dev/null 2>&1 &

启动Kafka

配文件路径,此处使用默认配置

/opt/kafka_2.13-3.1.0/config/server.properties

启动Kafka

在工作路径下使用以下命令

./bin/kafka-server-start.sh ./config/server.properties

*后台启动命令

./bin/kafka-server-start.sh ./config/server.properties 1>/dev/null 2>&1 &

配置Topic

创建Topic为alpha

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic alpha

带参数创建Topic示例,创建Topic gamma设置partitions分区数量为2,replication-factor为1,max.message.bytes为128000

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic gamma --partitions 2 --replication-factor 1 --config max.message.bytes=128000

配置Topic示例

# 配置max.message.bytes为128000 bytes
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic beta --config max.message.bytes=128000
# 删除max.message.bytes配置
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic beta --delete-config max.message.bytes
# 配置partitions分区
# Kafka partitions分区数量只允许增加,不允许减少
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic beta --partitions 10
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic beta --partitions 3

删除Topic

注意:Topic不能直接删除,默认情况下,删除的是标记,没有实际删除这个Topic,需要通过以下方式进行操作

方式一:配置server.properties文件,给定参数delete.topic.enable=true,然后重启Kafka服务,之后即可执行原来的删除操作
方式二:通过delete命令删除后,手动将本地磁盘以及ZooKeeper上的相关Topic的信息删除

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test

查看Topic列表

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

查看Topic描述

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic alpha

启动Consumer消费者,读取alpha Topic

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic alpha --from-beginning

启动Producer生产者

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic alpha

*安装JDK

Kafka和ZooKeeper依赖Java,所以如果系统环境还没有Java则先需要安装Java

下载Java

cd /usr/
mkdir java
cd java
wget https://download.oracle.com/otn/java/jdk/8u321-b07/df5ad55fdd604472a86a45a217032c7d/jdk-8u321-linux-x64.tar.gz?AuthParam=1645601010_474b98b6db80e40e79ff4005aa9e6dae
tar -zxvf jdk-8u321-linux-x64.tar.gz

配置环境变量

vi /etc/profile

最后加入以下内容

#java
export JAVA_HOME=/usr/java/jdk1.8.0_321
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib

更新环境变量

source /etc/profile

检查效果

java -version
java version "1.8.0_321"
Java(TM) SE Runtime Environment (build 1.8.0_321-b07)
Java HotSpot(TM) 64-Bit Server VM (build 25.321-b07, mixed mode)

*配置防火墙

可选操作,端口不可达的时候注意检查防火墙配置

1.关闭SELINUX

vi /etc/selinux/config

注释两条信息,增加disabled

#SELINUX=enforcing
#SELINUXTYPE=targeted
SELINUX=disabled

保存退出

:wq!

使配置立即生效

setenforce 0 

2.配置iptables

编辑iptables防火墙配置文件,主要增加-A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT

vi /etc/sysconfig/iptables

iptables配置参考如下

# Firewall configuration written by system-config-firewall

# Manual customization of this file is not recommended.
*filter
:INPUT ACCEPT [0:0]
:FORWARD ACCEPT [0:0]
:OUTPUT ACCEPT [0:0]
-A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT
-A INPUT -p icmp -j ACCEPT
-A INPUT -i lo -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 22 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT
-A INPUT -j REJECT --reject-with icmp-host-prohibited
-A FORWARD -j REJECT --reject-with icmp-host-prohibited
COMMIT

保存退出

:wq! 

最后重启防火墙使配置生效

service iptables restart 

其它说明

1.在较新版本(2.2 及更高版本)的Kafka不再需要ZooKeeper连接字符串命令

参考链接

配置文件说明比较详细

https://www.cnblogs.com/sandea/p/12078442.html

https://www.jb51.net/article/76397.htm

有集群相关的说明

https://blog.csdn.net/panchang199266/article/details/82113453

Last modification:April 22nd, 2022 at 02:42 pm
硬币投入口