kafka生产者的缓存机制介绍

2021-09-16 大数据 1294
ForFuture
ForFuture VIP8 2021-09-16 01:05:019
所需:null积分

afka生产者的缓存机制介绍

前言

从上篇文章我们讲到的《kafka生产端拦截器、序列化器、分区器的详细介绍》一文知道生产者客户端发送消息,会先经过拦截器、序列化器和分区器等一系列模块,之后消息便会被写到缓存。那么今天我们来看下kafka生产者客户端缓存架构的设计是怎样的?我们先整体看下发送消息到服务端的完整过程和架构:

消息累加器

其实,整个生产者客户端是由两个线程协调运行的,一个是主线程Producer线程,一个是Sender线程。由主线程生产消息,然后缓存到消息累加器(RecordAccumulator);而Sender线程则负责从消息累加器中不断获取消息,然后发送到kakfa broker。这个过程大致如下图:

那为什么需要消息累加器呢?直接发送不是更直接?其实主要是用来缓存消息以便Sender线程可以每次批量发送,从而减少网络传输的资源消耗;而数据到服务端,服务端也可以批量写操作,从而减少磁盘I/O资源消耗,可以提升性能。这个设计,也可以运用于我们平时的业务需求场景开发中。

那么消息累加器中的结构是怎样的呢?消息累加器的内部为每个区分维护一个双端队列Deque,队列的内容是ProducerBatch;而ProducerBatch包含了一个或者多个ProducerRecord消息。

ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches; 复制代码

具体结构如下图:

消息Record写入到消息累加器时,会被追加到双端队列的尾部。而尾部的ProducerBatch剩余可用空间也意味着能否再写入本次的这条消息ProducerRecord。ProducerBatch的大小和batch.size参数有着密切的关系。batch.size的参数大小控制着ProducerBatch的大小,默认是16KB大小。

整个过程是这样的:

  1. 消息写入到消息累加器时,先寻找其对应分区的双端队列(如果不存在则新建队列)

  2. 从双端队列的尾部获取一个ProducerBatch对象(如果不存在则新建Batch对象)

  3. 判断ProducerBatch是否还有剩余空间写入本次的消息Record,如果可以则写入(如果不可以则需要创建一个新的ProducerBatch对象)

  4. 新建ProducerBatch对象时会评估这条消息的大小是否超过batch.size参数的大小,如果不超过则以batch.size参数大小创建Batch对象(如果超过,就以消息的评估大小创建新Batch对象)。而用batch.size创建的内存空间是会被BufferPool管理的,可以进行空间复用;而超过batch.size创建的内存空间则不会被复用,使用完成则被回收。

而整个消息累加器的缓存空间与buffer.memory参数有关。默认是32MB大小。如果生产者客户端需要向很多区分发送比较多的消息,则可以根据实际情况将此参数适当调大以增加整体的吞吐量。

Sender线程

Sender线程则异步从消息累加器中获取缓存的消息,然后将其转为指定格式的ProducerRequest对象,将Request对象请求发往各个broker了。不过,请求在从Sender线程发往broker之前还会被保存到InFlightRequests中,其主要作用是缓存了已经发出去但还没有收到服务端响应的请求。

InFlightRequests相关的有一种重要的配置参数是max.in.flight.requests.per.connection。该参数表示每个连接(客户端与broker node节点之间的网络连接)的最多缓存请求数,默认值为5个。当超过该数值之后,消费者客户端便不能再向这个连接发送更多的请求了。另外也得注意,当该参数配置大于1时,由于因为失败重试原因,可能会存在消息乱序的风险。

总结

本次我们了解了生产者客户端的两个线程,主线程Producer线程和Sender线程的各自的职责和作用;同时也了解了kafka的Producer客户端发送缓存机制。对于优秀的设计机制,我们也可以思考借鉴运用于其他类似的业务开发中。


评论
    热搜排行🔥
    新版-Go开发工程师(完结) 757
    2021年最新使用Java开发企业级秒杀系统实战视频教程 1348
    Vue3 系统入门与项目实战 1193
    新冠疫情防控指挥作战平台入门到项目实战 1641
    Java 源码剖析 34 讲 921
    容器实战高手课(PDF+MP3+HTML完结) 1490
    架构师训练营(完整) 1316
    协程原理从入门到精通(完结) 1471
    Java高级架构师三大阶段 1134
    SpringBoot+Mysql实现的校园在线兼职实习招聘管理系统源码+运行视频教程 1435
    相似资源🌱
    kafka生产者的缓存机制介绍 1094
    使用JavaEE技术开发企业级实战项目之物流系统视频教程 1154
    2021年最新阿里P7 Java 升级教程 2020
    2021年价值1万元的Java企业级架构师视频教程(图灵4期完整) 1099
    大数据开发工程师(完结) 1056
    Spring Cloud分布式微服务实战,养成应对复杂业务的技术能力 1093
    最新图灵5期架构师视频课程 841
    Java就业急训营(完结) 1173
    JAVA互联网架构师专题/分布式/高并发/微服务【第三期】 1177
    Java工程师高薪训练营(全) 1189
    取消

    『FORFUTURE RESOURCES. JUST FOR YOU!』

    扫码支持
    ㊟:感谢您的支持🌹,我会继续努力的

    打开支付宝扫一扫,即可进行扫码打赏哦!!

    Powered by JavaCoderForFuture Group™💖

    ForFuture Group