玩了分布式这么久,你不会连Kafka都不清楚吧
Kafka 现在在企业和互联网项目中的应用越来越多了,本篇文章就从 Kafka 的基础开始带你一展 Kafka 的宏图。
图片来自 Pexels
什么是 Kafka
Kafka 是一个分布式流式平台,它有三个关键能力:
订阅发布记录流,它类似于企业中的消息队列或企业消息传递系统。
以容错的方式存储记录流。
实时记录流。
Kafka 的应用:
作为消息系统。
作为存储系统。
作为流处理器。
Kafka 作为消息系统
Kafka 作为消息系统,它有三个基本组件:
Producer : 发布消息的客户端
Broker:一个从生产者接受并存储消息的客户端
Consumer : 消费者从 Broker 中读取消息
为了在这样的消息系统中传输数据,你需要有合适的数据管道:
这种数据的交互看起来就很混乱,如果我们使用消息传递系统,那么系统就会变得更加简单和整洁。
Kafka 集群存储消息记录的目录被称为 Topics。
每一条消息记录包含三个要素:键(Key)、值(Value)、时间戳(Timestamp)。
核心 API
Producer API,它允许应用程序向一个或多个 Topics 上发送消息记录。
Consumer API,允许应用程序订阅一个或多个 Topics 并处理为其生成的记录流。
Streams API,它允许应用程序作为流处理器,从一个或多个主题中消费输入流并为其生成输出流,有效的将输入流转换为输出流。
Connector API,它允许构建和运行将 Kafka 主题连接到现有应用程序或数据系统的可用生产者和消费者。例如,关系数据库的连接器可能会捕获对表的所有更改。
Kafka 基本概念
Topic
Partition
Partition 译为分区,Topic 中的消息被分割为一个或多个的 Partition,它是一个物理概念,对应到系统上就是一个或若干个目录,一个分区就是一个提交日志。消息以追加的形式写入分区,先后以顺序的方式读取。
Segment
Broker
这种复制的机制为分区提供了消息冗余,如果一个 Broker 失效,那么其他活跃用户会重新选举一个 Leader 接管。
Producer
Consumer
消费者,即消息的使用者,一个消费者可以消费多个 Topic 的消息,对于某一个 Topic 的消息,其只会消费同一个 Partition 中的消息。
确保安装环境
安装 Java 环境
如果没有安装 Java 环境的话,可以按照这篇文章进行安装:
https://www.cnblogs.com/zs-notes/p/8535275.html
安装 Zookeeper 环境
Zookeeper 单机搭建
Zookeeper 单机搭建比较简单,直接从官网下载一个稳定版本的 Zookeeper:
https://www.apache.org/dyn/closer.cgi/zookeeper/
进入 bin 目录,启动服务输入命令 ./zkServer.sh start 输出下面内容表示搭建成功:
Zookeeper 集群搭建
①准备条件
②设置集群
新建完成后,需要编辑 conf/zoo.cfg 文件,三个文件的内容如下:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/zookeeper-3.4.10/data
dataLogDir=/usr/local/zookeeper/zookeeper-3.4.10/log
clientPort=12181
server.1=192.168.1.7:12888:13888
server.2=192.168.1.8:12888:13888
server.3=192.168.1.9:12888:13888
tickTime:这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
initLimit:这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。
当已经超过 5 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒。
syncLimit:这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 5*2000=10 秒。
dataDir:快照日志的存储路径。
dataLogDir:事务日志的存储路径,如果不配置这个那么事务日志会默认存储到 dataDir 指定的目录,这样会严重影响 ZK 的性能,当 ZK 吞吐量较大的时候,产生的事务日志、快照日志太多。
clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。
③创建 myid 文件
在了解完其配置文件后,现在来创建每个集群节点的 myid ,我们上面说过,这个 myid 就是 server.1 的这个 1 ,类似的,需要为集群中的每个服务都指定标识,使用 echo 命令进行创建:
# server.1
echo "1" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid
# server.2
echo "2" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid
# server.3
echo "3" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid
④启动服务并测试
启动服务(每台都需要执行):
cd /usr/local/zookeeper/zookeeper-3.4.10/bin
./zkServer.sh start
192.168.1.7 --- follower:
192.168.1.8 --- leader:
192.168.1.9 --- follower:
Kafka 集群搭建
准备条件
准备条件如下:
搭建好的 Zookeeper 集群
Kafka 压缩包
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.3.0/kafka_2.12-2.3.0.tgz
我们可以看到有很多 properties 配置文件,这里主要关注 server.properties 这个文件即可。
一种是使用 Kafka 自带的 Zookeeper 配置文件来启动(可以按照官网来进行启动,并使用单个服务多个节点来模拟集群http://kafka.apache.org/quickstart#quickstart_multibroker)。
一种是通过使用独立的 ZK 集群来启动,这里推荐使用第二种方式,使用 ZK 集群来启动。
②修改配置项
需要为每个服务都修改一下配置项,也就是 server.properties, 需要更新和添加的内容有:
broker.id=0 //初始是0,每个 server 的broker.id 都应该设置为不一样的,就和 myid 一样 我的三个服务分别设置的是 1,2,3
log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log
#在log.retention.hours=168 下面新增下面三项
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
#设置zookeeper的连接端口
zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181
配置项的含义:
broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=9092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.1.7 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880 #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181 #设置zookeeper的连接端口
③启动 Kafka 集群并测试
# 启动后台进程
./kafka-server-start.sh -daemon ../config/server.properties
# 执行命令 jps
6201 QuorumPeerMain
7035 Jps
6972 Kafka
# cd .. 往回退一层 到 /usr/local/kafka/kafka_2.12-2.3.0 目录下
bin/kafka-topics.sh --create --zookeeper 192.168.1.7:2181 --replication-factor 2 --partitions 1 --topic cxuan
Replication-factor 2:复制两份
Partitions 1:创建1个分区
Topic:创建主题
查看我们的主题是否创建成功:
bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181
在一台机器上创建一个发布者:
# 创建一个broker,发布者
./kafka-console-producer.sh --broker-list 192.168.1.7:9092 --topic cxuantopic
在一台服务器上创建一个订阅者:
# 创建一个consumer, 消费者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning
注意:这里使用 --zookeeper 的话可能出现 zookeeper is not a recognized option 的错误,这是因为 Kafka 版本太高,需要使用 --bootstrap-server 指令。
发布:
消费:
④其他命令
显示 topic:
bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181
# 显示
cxuantopic
查看 topic 状态:
bin/kafka-topics.sh --describe --zookeeper 192.168.1.7:2181 --topic cxuantopic
# 下面是显示的详细信息
Topic:cxuantopic PartitionCount:1 ReplicationFactor:2 Configs:
Topic: cxuantopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
# 分区为为1 复制因子为2 主题 cxuantopic 的分区为0
# Replicas: 0,1 复制的为1,2
⑤验证多节点接收数据
在另外两个节点上使用:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning
配置详解
常规配置
port:如果使用配置样本来启动 Kafka ,它会监听 9092 端口,修改 port 配置参数可以把它设置成其他任意可用的端口。
zookeeper.connect:用于保存 Broker 元数据的地址是通过 zookeeper.connect 来指定。
localhost:2181:表示运行在本地 2181 端口。该配置参数是用逗号分隔的一组 hostname:port/path 列表。
每一部分含义如下:
hostname 是 Zookeeper 服务器的服务名或 IP 地址。
port 是 Zookeeper 连接的端口。
/path 是可选的 Zookeeper 路径,作为 Kafka 集群的 chroot 环境。如果不指定,默认使用跟路径。
log.dirs:Kafka 把消息都保存在磁盘上,存放这些日志片段的目录都是通过 log.dirs 来指定的。它是一组用逗号分隔的本地文件系统路径。
如果指定了多个路径,那么 Broker 会根据 "最少使用" 原则,把同一分区的日志片段保存到同一路径下。
要注意,Broker 会向拥有最少数目分区的路径新增分区,而不是向拥有最小磁盘空间的路径新增分区。
num.recovery.threads.per.data.dir:对于如下三种情况,Kafka 会使用可配置的线程池来处理日志片段:
服务器正常启动,用于打开每个分区的日志片段。
服务器崩溃后启动,用于检查和截断每个分区的日志片段。
服务器正常关闭,用于关闭日志片段。
auto.create.topics.enable:默认情况下,Kafka 会在如下 3 种情况下创建主题:
当一个生产者开始往主题写入消息时
当一个消费者开始从主题读取消息时
当任意一个客户向主题发送元数据请求时
delete.topic.enable:如果你想要删除一个主题,你可以使用主题管理工具。
默认情况下,是不允许删除主题的,delete.topic.enable 的默认值是 false 因此你不能随意删除主题。
这是对生产环境的合理性保护,但是在开发环境和测试环境,是可以允许你删除主题的,所以,如果你想要删除主题,需要把 delete.topic.enable 设为 true。
主题默认配置
num.partitions:num.partitions 参数指定了新创建的主题需要包含多少个分区。
如果启用了主题自动创建功能(该功能是默认启用的),主题分区的个数就是该参数指定的值。该参数的默认值是 1。要注意,我们可以增加主题分区的个数,但不能减少分区的个数。
log.retention.ms:Kafka 通常根据时间来决定数据可以保留多久。默认使用 log.retention.hours 参数来配置时间,默认是 168 个小时,也就是一周。
除此之外,还有两个参数 log.retention.minutes 和 log.retentiion.ms 。这三个参数作用是一样的,都是决定消息多久以后被删除,推荐使用 log.retention.ms。
log.retention.bytes:另一种保留消息的方式是判断消息是否过期。它的值通过参数 log.retention.bytes 来指定,作用在每一个分区上。
也就是说,如果有一个包含 8 个分区的主题,并且 log.retention.bytes 被设置为 1GB,那么这个主题最多可以保留 8GB 数据。
所以,当主题的分区个数增加时,整个主题可以保留的数据也随之增加。
log.segment.bytes:上述的日志都是作用在日志片段上,而不是作用在单个消息上。
当消息到达 Broker 时,它们被追加到分区的当前日志片段上,当日志片段大小到达 log.segment.bytes 指定上限(默认为 1GB)时,当前日志片段就会被关闭,一个新的日志片段被打开。
如果一个日志片段被关闭,就开始等待过期。这个参数的值越小,就越会频繁的关闭和分配新文件,从而降低磁盘写入的整体效率。
log.segment.ms:上面提到日志片段经关闭后需等待过期,那么 log.segment.ms 这个参数就是指定日志多长时间被关闭的参数。
log.segment.ms 和 log.retention.bytes 也不存在互斥问题。日志片段会在大小或时间到达上限时被关闭,就看哪个条件先得到满足。
message.max.bytes:Broker 通过设置 message.max.bytes 参数来限制单个消息的大小,默认是 1000 000, 也就是 1MB。
如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到 Broker 返回的错误消息。
跟其他与字节相关的配置参数一样,该参数指的是压缩后的消息大小,也就是说,只要压缩后的消息小于 mesage.max.bytes,那么消息的实际大小可以大于这个值。
Kafka【第一篇】Kafka 集群搭建
https://juejin.im/post/5ba792f5e51d450e9e44184d
https://blog.csdn.net/k393393/article/details/93099276
《Kafka 权威指南》
https://www.learningjournal.guru/courses/kafka/kafka-foundation-training/broker-configurations/
作者:cxuan
编辑:陶家龙、孙淑娟
出处:转载自微信公众号 Java 极客技术(ID:Javageektech)
精彩文章推荐:
关注公众号:拾黑(shiheibook)了解更多
[广告]赞助链接:
四季很好,只要有你,文娱排行榜:https://www.yaopaiming.com/
让资讯触达的更精准有趣:https://www.0xu.cn/
随时掌握互联网精彩
- 1 奋力打开改革发展新天地 7971491
- 2 中国黄金原董事长家搜出大量黄金 7979404
- 3 空调英文不会男生盯着考场空调看 7807672
- 4 “冷资源”里的“热经济” 7760532
- 5 被铁路售票员的手速惊到了 7634993
- 6 网红赤木刚宪爆改赵露思 7513374
- 7 特朗普想拿下世界第一大岛 7473391
- 8 山姆代购在厕所分装蛋糕 7336230
- 9 女演员陈丽君回应获最佳男主角奖 7272946
- 10 刘强东提前发年终奖 7166580