多个消费者要消费kafka相同数据怎么办

发布网友 发布时间:2022-04-22 06:42

我来回答

2个回答

热心网友 时间:2022-04-10 14:17

在Kafak中国社区的qq群中,这个问题被提及的比例是相当高的,这也是Kafka用户最常碰到的问题之一。本文结合Kafka源码试图对该问题相关的因素进行探讨。希望对大家有所帮助。怎么确定分区数?“我应该选择几个分区?”——如果你在Kafka中国社区的群里,这样的问题你会经常碰到的。不过有些遗憾的是,我们似乎并没有很权威的答案能够解答这样的问题。其实这也不奇怪,毕竟这样的问题通常都是没有固定答案的。Kafka官网上标榜自己是"high-throughput distributed messaging system",即一个高吞吐量的分布式消息引擎。那么怎么达到高吞吐量呢?Kafka在底层摒弃了Java堆缓存机制,采用了操作系统级别的页缓存,同时将随机写操作改为顺序写,再结合Zero-Copy的特性极大地改善了IO性能。但是,这只是一个方面,毕竟单机优化的能力是有上限的。如何通过水平扩展甚至是线性扩展来进一步提升吞吐量呢? Kafka就是使用了分区partition,通过将topic的消息打散到多个分区并分布保存在不同的broker上实现了消息处理不管是procer还是consumer的高吞吐量。Kafka的生产者和消费者都可以多线程地并行操作,而每个线程处理的是一个分区的数据。因此分区实际上是调优Kafka并行度的最小单元。对于procer而言,它实际上是用多个线程并发地向不同分区所在的broker发起Socket连接同时给这些分区发送消息;而consumer呢,同一个消费组内的所有consumer线程都被指定topic的某一个分区进行消费具体如何确定consumer线程数目我们后面会详细说明。所以说,如果一个topic分区越多,理论上整个集群所能达到的吞吐量就越大。但分区是否越多越好呢?显然也不是,因为每个分区都有自己的开销:一、客户端/服务器端需要使用的内存就越多先说说客户端的情况。Kafka 082之后推出了Java版的全新的procer,这个procer有个参数batchsize,默认是16KB。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个能够提升性能的设计。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会更多。假设你有10000个分区,按照默认设置,这部分缓存需要占用约157MB的内存。而consumer端呢?我们抛开获取数据所需的内存不说,只说线程的开销。如果还是假设有10000个分区,同时consumer线程数要匹配分区数大部分情况下是最佳的消费吞吐量配置的话,那么在consumer client就要创建10000个线程,也需要创建大约10000个Socket去获取分区数据。这里面的线程切换的开销本身已经不容小觑了。服务器端的开销也不小,如果阅读Kafka源码的话可以发现,服务器端的很多组件都在内存中维护了分区级别的缓存,比如controller,FetcherManager等,因此分区数越多,这种缓存的成本越久越大。二、文件句柄的开销每个分区在底层文件系统都有属于自己的一个目录。该目录下通常会有两个文件: base_offsetlog和base_offsetindex。Kafak的controller和ReplicaManager会为每个broker都保存这两个文件句柄file handler。很明显,如果分区数越多,所需要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit -n的*。三、降低高可用性Kafka通过副本replica机制来保证高可用。具体做法就是为每个分区保存若干个副本replica_factor指定副本数。每个副本保存在不同的broker上。期中的一个副本充当leader 副本,负责处理procer和consumer请求。其他副本充当follower角色,由Kafka controller负责保证与leader的同步。如果leader所在的broker挂掉了,contorller会检测到然后在zookeeper的帮助下重选出新的leader——这中间会有短暂的不可用时间窗口,虽然大部分情况下可能只是几毫秒级别。但如果你有10000个分区,10个broker,也就是说平均每个broker上有1000个分区。此时这个broker挂掉了,那么zookeeper和controller需要立即对这1000个分区进行leader选举。比起很少的分区leader选举而言,这必然要花更长的时间,并且通常不是线性累加的。如果这个broker还同时是controller情况就更糟了。说了这么多“废话”,很多人肯定已经不耐烦了。那你说到底要怎么确定分区数呢?答案就是:视情况而定。基本上你还是需要通过一系列实验和测试来确定。当然测试的依据应该是吞吐量。虽然LinkedIn这篇文章做了Kafka的基准测试,但它的结果其实对你意义不大,因为不同的硬件、软件、负载情况测试出来的结果必然不一样。我经常碰到的问题类似于,官网说每秒能到10MB,为什么我的procer每秒才1MB? —— 且不说硬件条件,最后发现他使用的消息体有1KB,而官网的基准测试是用100B测出来的,因此根本没有可比性。不过你依然可以遵循一定的步骤来尝试确定分区数:创建一个只有1个分区的topic,然后测试这个topic的procer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么分区数 = Tt / maxTp, TcTp表示procer的吞吐量。测试procer通常是很容易的,因为它的逻辑非常简单,就是直接发送消息到Kafka就好了。Tc表示consumer的吞吐量。测试Tc通常与应用的关系更大, 因为Tc的值取决于你拿到消息之后执行什么操作,因此Tc的测试通常也要麻烦一些。另外,Kafka并不能真正地做到线性扩展其实任何系统都不能,所以你在规划你的分区数的时候最好多规划一下,这样未来扩展时候也更加方便。消息-分区的分配默认情况下,Kafka根据传递消息的key来进行分区的分配,即hashkey

热心网友 时间:2022-04-10 15:35

将消费者放在不同的group

声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com