Kafka——Kafka安装(kafka_2.13-4.1.1)
标签搜索

Kafka——Kafka安装(kafka_2.13-4.1.1)

mrui
2025-11-27 / 0 评论 / 2 阅读 / 正在检测是否收录...

kafka2.8.0版本引入了基于Raft共识协议的新特性,它允许kafka集群在没有ZooKeeper的情况下运行。

为了剥离和去除ZooKeeper,Kafka引入了自己的KRaft(Kafka Raft Metadata Mode)。

KRaft是一个新的元数据管理架构,基于Raft一致性算法实现的一种内置元数据管理方式,旨在替代ZooKeeper的元数据管理功能。

KRaft的优势有以下几点:

简化部署:

Kafka 集群不再依赖外部的 ZooKeeper 集群,简化了部署和运维的复杂性。

KRaft 将所有协调服务嵌入 Kafka 自身,不再依赖外部系统,这样大大简化了部署和管理,因为管理员只需关注 Kafka 集群。

高效的一致性协议:

Raft 是一种简洁且易于理解的一致性算法,易于调试和实现。KRaft 利用 Raft 协议实现了强一致性的元数据管理,优化了复制机制。

提高性能:

由于元数据管理不再依赖 ZooKeeper,Kafka 集群的性能得到了提升,尤其是在元数据读写方面。

增强可扩展性:

KRaft 模式支持更大的集群规模,可以有效地扩展到数百万个分区。

提高元数据操作的扩展性:新的架构允许更多的并发操作,并减少了因为扩展性问题导致的瓶颈,特别是在高负载场景中。

更快的控制器故障转移:

控制器(Controller)的选举和故障转移速度更快,提高了集群的稳定性。

消除 ZooKeeper 作为中间层之后,Kafka 的延迟性能有望得到改善,特别是在涉及选主和元数据更新的场景中。

KRaft模式下,kafka集群中的一些节点被指定为控制器(Controller),它们负责集群的元数据管理和共识服务,所有的元数据都存储在kafka内部的主题中,

而不是ZooKeeper,控制器通过KRaft协议来确保元数据在集群中的准确复制,这种模式使用了基于时间的存储模型,通过定期快照来保证元数据日志不会无限增长。

完全自主:

因为是自家产品,所以产品的架构设计,代码开发都可以自己说了算,未来架构走向完全控制在自己手上。

控制器(Controller)节点的去中心化:

KRaft 模式中,控制器节点由一组 Kafka 服务进程代替,而不是一个独立的 ZooKeeper 集群。

这些节点共同负责管理集群的元数据,通过 Raft 实现数据的一致性。

日志复制和恢复机制:

利用 Raft 的日志复制和状态机应用机制,KRaft 实现了对元数据变更的强一致性支持,这意味着所有控制器节点都能够就集群状态达成共识。

动态集群管理:

KRaft允许动态地向集群中添加或移除节点,而无需手动去ZooKeeper中更新配置,这使得集群管理更为便捷。

下载

kakfa官网 下载最新的安装包,推荐下载已经编译好的二进制包。

wget https://dlcdn.apache.org/kafka/4.1.1/kafka_2.13-4.1.1.tgz
tar -xzf kafka_2.13-4.1.1.tgz
mv kafka_2.13-4.1.1 /opt/kafka

修改配置文件

修改位于config目录下的server.properties

############################# Server Basics #############################

#定义节点角色。broker,处理消息存储、生产者/消费者请求;controller,管理集群元数据(分区状态、副本分配)。
process.roles=broker,controller
# 节点id,要求集群内唯一,每个节点的id都要不同(如 1、2、3)。
node.id=1
#Controller节点启动时发现的初始服务器列表(用于加入集群)。
controller.quorum.bootstrap.servers=kf1:9093,kf2:9093,kf3:9093
#controller集群成员列表,集群内部选举和同步元数据,所有 Controller 节点必须在此列表中,且数量需满足多数派(如 3 节点集群需至少 2 个在线)。
controller.quorum.voters=1@kf1:9093,2@kf2:9093,3@kf3:9093

############################# Socket Server Settings #####################

#节点绑定的网络接口和端口。PLAINTEXT:Broker服务端口(接收客户端请求),CONTROLLER:Controller间通信端口。
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
#Broker 之间通信使用的监听器名称
inter.broker.listener.name=PLAINTEXT
#客户端实际连接的地址(Broker 会返回此地址给客户端)。
advertised.listeners=PLAINTEXT://192.168.88.51:9092,CONTROLLER://192.168.88.51:9093
#Controller 节点间通信使用的监听器名称
controller.listener.names=CONTROLLER
#监听器名称与安全协议的映射关系。
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
#处理网络请求的线程数(如接收请求、发送响应)。
num.network.threads=3
#处理磁盘 I/O 的线程数(如读写日志文件)。
num.io.threads=8
#Socket 缓冲区
socket.send.buffer.bytes=102400  #发送数据缓冲区大小(100KB)。
socket.receive.buffer.bytes=102400 #接收数据缓冲区大小(100KB)。
socket.request.max.bytes=104857600 #请求最大长度(100MB),防止 OOM。

############################# Log Basics #############################

#消息日志存储目录(可配置多个路径,逗号分隔)。
log.dirs=/opt/kafka_data
# 日志目录(可选)
kafka_logs_dir=/var/log/kafka
#分区与恢复
num.partitions=3 #新建 Topic 的默认分区数。
num.recovery.threads.per.data.dir=1 #每个日志目录用于崩溃恢复的线程数。

############################# Internal Topic Settings  ####################

#副本因子与 ISR
offsets.topic.replication.factor=3 # __consumer_offsets 副本数(生产环境建议 ≥3)
share.coordinator.state.topic.replication.factor=2 # Share Group 状态主题副本数
share.coordinator.state.topic.min.isr=2 
#share.coordinator.state.topic:Kafka Share Group 协调器的状态主题(内部主题)。min.isr:最小同步副本数(Minimum In-Sync Replicas),即写入操作需确认的最小同步副本数量(含 Leader)。配置值 1:表示向 share.coordinator.state.topic写入数据时,只需 Leader 副本确认即可认为成功(无需等待 Follower 副本同步)。逻辑:ISR(In-Sync Replicas,同步副本集)是 Leader 副本及与其保持同步的 Follower 副本的集合。min.isr=1意味着只要 Leader 副本存活且可写,写入就成功(即使没有其他同步副本)。
transaction.state.log.replication.factor=3 
#transaction.state.log:Kafka 事务协调器的状态日志主题。事务协调器负责管理分布式事务(如跨分区/主题的事务提交/中止),该主题存储事务元数据(如事务 ID、参与者列表、状态(进行中/已提交/已中止)、超时时间等)。replication.factor:副本因子,即主题每个分区的副本总数(含 Leader 副本)。配置值 1:表示 transaction.state.log的每个分区仅有 1 个副本(仅 Leader 副本,无 Follower 副本)。逻辑:副本因子决定数据冗余度。replication.factor=1意味着数据仅存储在单个 Broker 上,无任何备份。
transaction.state.log.min.isr=3
#transaction.state.log:指定该参数作用于 事务状态日志(而非普通业务 Topic)。min.isr:即“最小同步副本数”,表示事务状态日志的每个分区,必须至少有 N 个副本(包括 Leader 副本)与 Leader 保持同步,否则该分区会进入“不可用”状态(无法写入新事务记录)。当事务状态日志的某个分区进行写入时,Kafka 会确保该分区的 同步副本集(ISR)大小 ≥ 3。只有当至少 3 个副本(Leader + 2 个 Follower)都成功复制了事务日志条目后,才会向生产者返回“事务提交成功”的响应。
default.replication.factor=3
#普通主题默认的副本因子数量
############################# Log Flush Policy ############################

#日志保留策略
log.retention.hours=168 #日志保留时间(168 小时 = 7 天)。
log.segment.bytes=1073741824 #单个日志分段大小(1GB),达到后滚动生成新文件。
log.retention.check.interval.ms=300000 #检查日志保留的间隔(5 分钟)。

在节点2上修改

node.id=2
advertised.listeners=PLAINTEXT://192.168.88.52:9092,CONTROLLER://192.168.88.52:9093

在节点3上修改

node.id=3
advertised.listeners=PLAINTEXT://192.168.88.53:9092,CONTROLLER://192.168.88.53:9093

初始化集群并启动

首次启动时执行
#在任一节点执行,仅执行一次,生成唯一集群ID
bin/kafka-storage.sh random-uuid
0jQIB8NvQ3muEJg2vXIRRA
#在每个节点执行,格式化存储
bin/kafka-storage.sh format -t 0jQIB8NvQ3muEJg2vXIRRA -c config/server.propertie
Formatting metadata directory /opt/kafka_data with metadata.version 4.1-IV1.
启动服务
配置开机自启

创建service文件

vim /etc/systemd/system/kafka.service

[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
RestartSec=10
KillMode=mixed
TimeoutStopSec=30
Environment="LOG_DIR=/var/log/kafka"
Environment="KAFKA_HEAP_OPTS=-Xmx1G -Xms1G" # 显式设置堆内存

# 日志重定向
StandardOutput=file:/var/log/kafka/service.log
StandardError=file:/var/log/kafka/service-error.log

[Install]
WantedBy=multi-user.target
# 重载 systemd 配置
sudo systemctl daemon-reload

创建 Kafka 专用用户和目录

# 创建系统用户
sudo useradd -r -s /sbin/nologin kafka

# 创建数据/日志目录
sudo mkdir -p /opt/kafka_data
sudo mkdir -p /var/log/kafka
sudo chown -R kafka:kafka /opt/kafka_data /var/log/kafka

# 授权 Kafka 安装目录
sudo chown -R kafka:kafka /opt/kafka
按顺序启动节点

可以使用systemctl命令或者手动的方式启动服务,

#systemctl启动
systemctl enable --now kafka
#前台启动
bin/kafka-server-start.sh config/server.properties 
#后台启动
nohup bin/kafka-server-start.sh config/server.properties  &
查看集群状态
bin/kafka-broker-api-versions.sh --bootstrap-server 127.0.0.1:9092

# 192.168.88.51:9092 Broker 对外提供服务的 IP:端口(客户端实际连接的地址)。
#id: 1  Broker 的唯一标识 broker.id(集群中必须唯一,此处为 1、2、3)。
# rack: null 机架信息(未配置,故为 null,用于副本跨机架分布策略)。
#isFenced: false Broker 未被“隔离”(正常工作状态,若为 true则无法处理请求)。

192.168.88.51:9092 (id: 1 rack: null isFenced: false) -> (
    Produce(0): 0 to 13 [usable: 13],
#Produce(0) API 名称(Produce,消息生产 API),括号内 0是该 API 的 固定 key(Kafka 定义)。
#0 to 13 该 API 支持的 版本范围(从 v0 到 v13,共 14 个版本)。
#usable: 13 客户端与 Broker 协商后 实际使用的版本(取双方都支持的最高版本,性能最优)。
    Fetch(1): 4 to 18 [usable: 18],
    ListOffsets(2): 1 to 10 [usable: 10],
    Metadata(3): 0 to 13 [usable: 13],
    OffsetCommit(8): 2 to 9 [usable: 9],
    OffsetFetch(9): 1 to 9 [usable: 9],
    FindCoordinator(10): 0 to 6 [usable: 6],
    JoinGroup(11): 0 to 9 [usable: 9],
    Heartbeat(12): 0 to 4 [usable: 4],
    LeaveGroup(13): 0 to 5 [usable: 5],
    SyncGroup(14): 0 to 5 [usable: 5],
    DescribeGroups(15): 0 to 6 [usable: 6],
    ListGroups(16): 0 to 5 [usable: 5],
    SaslHandshake(17): 0 to 1 [usable: 1],
    ApiVersions(18): 0 to 4 [usable: 4],
    CreateTopics(19): 2 to 7 [usable: 7],
    DeleteTopics(20): 1 to 6 [usable: 6],
    DeleteRecords(21): 0 to 2 [usable: 2],
    InitProducerId(22): 0 to 5 [usable: 5],
    OffsetForLeaderEpoch(23): 2 to 4 [usable: 4],
    AddPartitionsToTxn(24): 0 to 5 [usable: 5],
    AddOffsetsToTxn(25): 0 to 4 [usable: 4],
    EndTxn(26): 0 to 5 [usable: 5],
    WriteTxnMarkers(27): 1 [usable: 1],
    TxnOffsetCommit(28): 0 to 5 [usable: 5],
    DescribeAcls(29): 1 to 3 [usable: 3],
    CreateAcls(30): 1 to 3 [usable: 3],
    DeleteAcls(31): 1 to 3 [usable: 3],
    DescribeConfigs(32): 1 to 4 [usable: 4],
    AlterConfigs(33): 0 to 2 [usable: 2],
    AlterReplicaLogDirs(34): 1 to 2 [usable: 2],
    DescribeLogDirs(35): 1 to 4 [usable: 4],
    SaslAuthenticate(36): 0 to 2 [usable: 2],
    CreatePartitions(37): 0 to 3 [usable: 3],
    CreateDelegationToken(38): 1 to 3 [usable: 3],
    RenewDelegationToken(39): 1 to 2 [usable: 2],
    ExpireDelegationToken(40): 1 to 2 [usable: 2],
    DescribeDelegationToken(41): 1 to 3 [usable: 3],
    DeleteGroups(42): 0 to 2 [usable: 2],
    ElectLeaders(43): 0 to 2 [usable: 2],
    IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
    AlterPartitionReassignments(45): 0 to 1 [usable: 1],
    ListPartitionReassignments(46): 0 [usable: 0],
    OffsetDelete(47): 0 [usable: 0],
    DescribeClientQuotas(48): 0 to 1 [usable: 1],
    AlterClientQuotas(49): 0 to 1 [usable: 1],
    DescribeUserScramCredentials(50): 0 [usable: 0],
    AlterUserScramCredentials(51): 0 [usable: 0],
    DescribeQuorum(55): 0 to 2 [usable: 2],
    UpdateFeatures(57): 0 to 2 [usable: 2],
    DescribeCluster(60): 0 to 2 [usable: 2],
    DescribeProducers(61): 0 [usable: 0],
    UnregisterBroker(64): 0 [usable: 0],
    DescribeTransactions(65): 0 [usable: 0],
    ListTransactions(66): 0 to 2 [usable: 2],
    ConsumerGroupHeartbeat(68): 0 to 1 [usable: 1],
    ConsumerGroupDescribe(69): 0 to 1 [usable: 1],
    GetTelemetrySubscriptions(71): UNSUPPORTED,
    PushTelemetry(72): UNSUPPORTED,
    ListConfigResources(74): 0 to 1 [usable: 1],
    DescribeTopicPartitions(75): 0 [usable: 0],
    ShareGroupHeartbeat(76): 1 [usable: 1],
    ShareGroupDescribe(77): 1 [usable: 1],
    ShareFetch(78): 1 [usable: 1],
    ShareAcknowledge(79): 1 [usable: 1],
    AddRaftVoter(80): 0 [usable: 0],
    RemoveRaftVoter(81): 0 [usable: 0],
    InitializeShareGroupState(83): 0 [usable: 0],
    ReadShareGroupState(84): 0 [usable: 0],
    WriteShareGroupState(85): 0 [usable: 0],
    DeleteShareGroupState(86): 0 [usable: 0],
    ReadShareGroupStateSummary(87): 0 [usable: 0],
    StreamsGroupHeartbeat(88): UNSUPPORTED,
    StreamsGroupDescribe(89): UNSUPPORTED,
    DescribeShareGroupOffsets(90): 0 [usable: 0],
    AlterShareGroupOffsets(91): 0 [usable: 0],
    DeleteShareGroupOffsets(92): 0 [usable: 0]
)
192.168.88.52:9092 (id: 2 rack: null isFenced: false) -> (
……
)
192.168.88.53:9092 (id: 3 rack: null isFenced: false) -> (
……
)
0

评论 (0)

取消