kafka learning 02 生产者

1.kafka 生产者简介

1.1 生产者传输实体

kafka Producer 发送的数据对象叫做 ProducerRecord,它有四个关键参数:

  • Topic – 主题
  • Partition – 分区(非必填)
  • Key – 键(非必填)
  • Value – 值

1.2 生产者发送流程

kafka 生产者发送消息流程:

  1. 序列化 – 发送前,生产者要先把键和值序列化。
  2. 分区 – 数据被传给分区器。如果在 ProducerRecord 中已经指定了分区(Partition),那么分区器什么也不会做;否则,分区器会根据 ProducerRecord 的键(Key)来选择一个分区。选定分区后,生产者就知道该把消息发送给哪个主题的哪个分区。
  3. 批次传输 – 接着,这条记录会被添加到一个记录批次中。这个批次中的所有消息都会被发送到相同的主题和分区上。有一个独立的线程负责将这些记录批次发送到相应 Broker 上。
    • 批次,就是一组消息,这些消息属于同一个主题和分区
    • 发送时,会把消息分成批次传输,如果每次只发送一个消息,会占用大量的网路开销。
  4. 响应 – 服务器收到消息会返回一个响应。
    • 如果成功,则返回一个 RecordMetaData 对象,它包含了主题、分区、偏移量
    • 如果失败,则返回一个错误。生产者在收到错误后,可以进行重试,重试次数可以在配置中指定。失败一定次数后,就返回错误消息。

生产者向 Broker 发送消息时是怎么确定向哪一个 Broker 发送消息?

  • 生产者会向任意 broker 发送一个元数据请求(MetadataRequest),获取到每一个分区对应的 Leader 信息,并缓存到本地。
  • 生产者在发送消息时,会指定 Partition 或者通过 key 得到到一个 Partition,然后根据 Partition 从缓存中获取相应的 Leader 信息。

2.生产者 API

2.1.创建生产者

Kafka 生产者核心配置:

  • bootstrap.servers – 指定了 Producer 启动时要连接的 Broker 地址。注:如果你指定了 1000 个 Broker 连接信息,那么,Producer 启动时就会首先创建与这 1000 个 Broker 的 TCP 连接。在实际使用过程中,并不建议把集群中所有的 Broker 信息都配置到 bootstrap.servers 中,通常你指定 3 ~ 4 台就足以了。因为 Producer 一旦连接到集群中的任一台 Broker,就能拿到整个集群的 Broker 信息,故没必要为 bootstrap.servers 指定所有的 Broker。

2.2.异步发送

直接发送消息,不关心消息是否到达。

这种方式吞吐量最高,但有小概率会丢失消息。

2.3.同步发送

2.4.异步响应发送

2.5.关闭连接

3.生产者的连接

Apache Kafka 的所有通信都是基于 TCP 的。无论是生产者、消费者,还是 Broker 之间的通信都是如此。选用 TCP 连接是由于 TCP 本身提供的一些高级功能,如多路复用请求以及同时轮询多个连接的能力。

3.1. 何时创建 TCP 连接

Kafka 生产者创建连接有三个时机:

(1)在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时,首先会创建与 bootstrap.servers 中所有 Broker 的 TCP 连接。

(2)当 Producer 更新集群的元数据信息之后,如果发现与某些 Broker 当前没有连接,那么它就会创建一个 TCP 连接。

  • 场景一:当 Producer 尝试给一个不存在的主题发送消息时,Broker 会告诉 Producer 说这个主题不存在。此时 Producer 会发送 METADATA 请求给 Kafka 集群,去尝试获取最新的元数据信息。
  • 场景二:Producer 通过 metadata.max.age.ms 参数定期地去更新元数据信息。该参数的默认值是 300000,即 5 分钟,也就是说不管集群那边是否有变化,Producer 每 5 分钟都会强制刷新一次元数据以保证它是最及时的数据。

(3)当要发送消息时,Producer 发现尚不存在与目标 Broker 的连接,会创建一个 TCP 连接。

3.2.何时关闭 TCP 连接

Producer 端关闭 TCP 连接的方式有两种:一种是用户主动关闭;一种是 Kafka 自动关闭

主动关闭是指调用 producer.close() 方法来关闭生产者连接;甚至包括用户调用 kill -9 主动“杀掉”Producer 应用。

如果设置 Producer 端 connections.max.idle.ms 参数大于 0(默认为 9 分钟),意味着,在 connections.max.idle.ms 指定时间内,如果没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。如果设置该参数为 -1,TCP 连接将成为永久长连接。

值得注意的是,在第二种方式中,TCP 连接是在 Broker 端被关闭的,但其实这个 TCP 连接的发起方是客户端,因此在 TCP 看来,这属于被动关闭的场景,即 passive close。被动关闭的后果就是会产生大量的 CLOSE_WAIT 连接,因此 Producer 端或 Client 端没有机会显式地观测到此连接已被中断。

4.序列化

Go 本身不内置 kakfa 的序列化器,只能通过一些第三方库,比如 IBM/sarama、kafka-go 等,它们均提供了常见类型的序列化方式。

  • sarama:依赖于用户提供的序列化方法。通常,用户需要自己处理消息的序列化和反序列化。常见的做法是将消息数据编码为 JSON、Protobuf 或 Avro 格式(常用的消息格式)。在发送消息时,用户需要将消息对象转为字节流(byte slice),然后通过 sarama.ProducerMessage 发送到 Kafka。
  • kafka-go:kafka-go 支持 StringEncoderBytesEncoder,并且可以轻松扩展以支持其他格式(如 JSON 或 Protobuf)

5.分区

5.1.什么是分区

Kafka 的数据结构采用三级结构,即:主题(Topic)、分区(Partition)、消息(Record)。

在 Kafka 中,任意一个 Topic 维护了一组 Partition 日志,如下所示:

每个 Partition 都是一个单调递增的、不可变的日志记录,以不断追加的方式写入数据。Partition 中的每条记录会被分配一个单调递增的 id 号,称为偏移量(Offset),用于唯一标识 Partition 内的每条记录。

5.2.为什么要分区

为什么 Kafka 的数据结构采用三级结构?

分区的作用就是提供负载均衡的能力,以实现系统的高伸缩性(Scalability)。

不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的机器节点来增加整体系统的吞吐量。

5.3.分区策略

所谓分区策略是决定生产者将消息发送到哪个分区的算法,也就是负载均衡算法。

前文中已经提到,Kafka 生产者发送消息使用的对象 ProducerRecord ,可以选填 Partition 和 Key。不过,大多数应用会用到 key。key 有两个作用:作为消息的附加信息;也可以用来决定消息该被写到 Topic 的哪个 Partition,拥有相同 key 的消息将被写入同一个 Partition。

如果 ProducerRecord 指定了 Partition,则分区器什么也不做,否则分区器会根据 key 选择一个 Partition 。

  • 没有 key 时的分发逻辑:每隔 topic.metadata.refresh.interval.ms 的时间,随机选择一个 partition。这个时间窗口内的所有记录发送到这个 partition。发送数据出错后会重新选择一个 partition。
  • 根据 key 分发:Kafka 的选择分区策略是:根据 key 求 hash 值,然后将 hash 值对 partition 数量求模。这里的关键点在于,同一个 key 总是被映射到同一个 Partition 上。所以,在选择分区时,Kafka 会使用 Topic 的所有 Partition ,而不仅仅是可用的 Partition。这意味着,如果写入数据的 Partition 是不可用的,那么就会出错

5.4.自定义分区策略

如果 Kafka 的默认分区策略无法满足实际需要,可以自定义分区策略。需要显式地配置生产者端的参数 partitioner.class。这个参数该怎么设定呢?在 Java 中,可以按如下方式实现:

首先,要实现 org.apache.kafka.clients.producer.Partitioner 接口。这个接口定义了两个方法:partition 和 close,通常只需要实现最重要的 partition 方法。我们来看看这个方法的方法签名:

int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

这里的 topickeykeyBytesvalue和 valueBytes 都属于消息数据,cluster 则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等)。Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。

接着,设置 partitioner.class 参数为自定义类的全限定名,那么生产者程序就会按照你的代码逻辑对消息进行分区。

负载均衡算法常见的有:

  • 随机算法
  • 轮询算法
  • 最小活跃数算法
  • 源地址哈希算法

可以根据实际需要去实现

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇