Kafka学习笔记(二)-生产者

一个生产者发送消息的核心代码简略如下:

KafkaProducer<String, String> producer = new KafkaProduce<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
producer.send(record);

构建 Kafka 生产者对象,构建消息对象,调用发送接口。

生产者的消息结构

基本概念一文中我们知道:

生产者和消费者之间通过 Topic 传递消息,每个 Topic 又由多个 partition 组成

而 Topic 和 partition 的概念就定义在 ProducerRecord 类中。我们一起看下 ProducerRecord.java 的定义。

public class ProducerRecord<K, V> {
/** 主题 **/
private final String topic;
/** 分区 **/
private final Integer partition;
/** 消息头部,可以自定义传一些业务相关的数据,也可以不传 **/
private final Headers headers;
/** 可以用于计算分区,或者作为附加信息 **/
private final K key;
/** 消息内容,为空则表示墓碑消息,用于日志相关 **/
private final V value;
/** 消息创建的时间(CreateTime)或者追加到日志(LogAppendTime)的时间 **/
private final Long timestamp;
}

生产者的配置

生产者的配置都在 ProducerConfig.java 类中,主要记录下三个必填的:

  • BOOTSTRAP_SERVERS_CONFIG: 用来指定生产者客户端连接 Kafka 集群所需的 broker 地址清单,具体的内容格式为host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为“”。注意这里并非需要所有的 broker 地址,因为生产者会从给定的 broker 里查找到其他 broker 的信息。不过建议至少要设置两个以上的 broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka 集群上。
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

生产者的消息发送

三种发送模式:发后即忘(fire-and-forget)、同步(sync)及异步(async)。

同步和异步很好理解,send 方法本来返回的是个 Future 对象,即默认是异步的。而发后即忘,其实就是调用完 send 即不捕捉异常也不回调或者取结果(个人觉得算不上一种发送模式)。

同步和异步

我们看看 send 的代码:

/**
* Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
* See {@link #send(ProducerRecord, Callback)} for details.
*/
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}

想要同步的话,采用send(message).get()方式即可。通过 get 阻塞住主流程即可。此时会返回 RecordMetadata 对象,这个对象中包含了 topic、partition、offset 等值。

异步的话,则需要传一个回调函数 callback。因为 Kafka 能保证单分区的有序性,因此,单分区的回调函数也会是有序的

序列化

在生产时指定序列化器,在消费时指定反序列化器。

这个没什么好说的,我们只要明确 Kafka 是以字节的形式通过 ?协议传输数据即可。

分区器

消息将发往的分区和我们指定的 partition 、key 有关。

指定 partition 不指定 partition
指定 key partition murmur2(key)
不指定 key partition random

我们稍微看下源码:

在前面 KafkaProducer.send 源码中我们发现,实际的发送消息的主流程函数是 doSend。在 doSend 发送消息的过程中,需要计算出消息发送到哪个分区,调用逻辑简略如下:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
...;
int partition = partition(record, serializedKey, serializedValue, cluster);
...;
}

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

如果指定了分区,则直接使用该指定的分区,否则通过分区器计算。

我们看一下他的 DefaultPartition 的实现。

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
int numPartitions) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

当 key 不为 null 时,做一个 murmur2 哈希算法,计算分区。否则就随机分区。

注:我们可以通过ProducerConfig.PARTITIONER_CLASS_CONFIG指定分区器

拦截器

拦截器的作用时期在发送消息前以及发送回调前

实现方式是实现org.apache.kafka.clients.producer.ProducerInterceptor接口即可。

多个拦截器可以通过ProducerConfig.INTERCEPTOR_CLASSES_CONFIG实现链路调用。

生产者的整体架构

CSDN某博客图片)

可以看到,消息发送的处理的顺序依次为:拦截器-序列化器-分区器。(拦截器的时候可以修改消息,此时还没有被序列化,可以印证这个顺序)

整个生产者客户端由两个线程组成,消息在主线程经过三步处理后,缓存到消息累加器中。Sender 线程负责从累加器中拿消息,创建 Request 请求,发送给 Kafka集群。

在 RecordAccumulator 的源码中可以看,它有这么一个成员变量:

public final class RecordAccumulator {
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
}

一个 key 为分区,value 为双端队列的并发 map。

在计算机科学中,batch 的含义指的是 批处理,在 Kafka 的这段代码里,这的就是一批要被发送到同一分区的消息。

在逻辑上,这些 batch 的内存是紧密相关的,但在物理内存中,Kafka 的 RecordAccumular 中有一个 BufferPool,用来复用内存,可以用来存放这些 batch。具体的存放规则就不细聊了。

参数配置

这个就不多聊了,自己看书吧。

文章作者: yPhantom
文章链接: https://guoyuxiang.cn/2021/03/27/kafka-note-2-producer/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Life Note