首页 存档 技术 查看内容

kafra技术内幕:消费者初始化(scala)(一)

2018-3-30 13:00 |来自: 互联网 401 0

摘要: 架构师(JiaGouX)我们都是架构师! high-level consumer 有时候应用程序从Kafka读取数据,并不太关心消息offset的处理. 所以Hight Level Consumer提供了一个从Kafka消费数据的高层抽象. High Level Consumer将从 ...

架构师(JiaGouX)
我们都是架构师!


high-level consumer


有时候应用程序从Kafka读取数据,并不太关心消息offset的处理. 所以Hight Level Consumer提供了一个从Kafka消费数据的高层抽象.


High Level Consumer将从某个Partition读取的最后一条消息的offset存于Zookeeper中. 这个offset基于Consumer Group的名称.


Consumer Group是整个Kafka集群全局的. 所以要特别小心在新的逻辑启动之前要关闭所有的旧的逻辑(消费者进程).


当新的消费者加入同一个消费组时,Kafka会添加这个消费者的线程到要消费的topic的可用线程集合中,并且触发re-balance.


在re-balance时,kafka会分配可用的partition给可用的线程,可能移动一个partition到其他的线程中.


如果你的消费者逻辑混合了新的和旧的处理逻辑,很可能有些消息会被分配到旧的处理逻辑中.


High Level Consumer可以(应该)是多线程的应用程序.线程模型是以topic的partitions数量为中心的,不过有些规则:


  • ●如果线程数量多于partition的数量,有部分线程无法消费该topic下任何一条消息

  • ●如果线程数量少于partition的数量,有一些线程会消费多个partition的数据

  • ●如果线程数量等于partition的数量,则正好一个线程消费一个partition的数据

  • ●当添加更多的消费者进程/线程会触发re-balance,导致partition的分配发生了变化


如果一个线程消费多个partitions,并不会保证收到的消息的有序性,不过在同一个partition里的offset则是有序的.


比如某个消费者线程从partition-10接收5条消息,然后从partition-11接收了6条消息,接着再从partition-10接收了5条消息,


然后还是从partition-10又接收了5条消息,即使这个时候partition-11也有数据. 但是并不保证partition之间的顺序.

High-level的实现依赖于Consumer Group.Kafka保证同一consumer group中只有一个consumer会消费某条消息.


即每一个consumer实例只会消费某一个或多个特定partition的数据,而一个partition的数据只会被一个特定的consumer所消费.


  • ●同一条消息会被多个ConsumerGroup消费,所以有多个ConsumerGroup,每个ConsumerGroup只有一个Consumer,实现了广播.
    通常不同的ConsumerGroup的消费处理逻辑是不同的,这样同一份数据源(消息)交给不同的处理逻辑.

  • ●一个ConsumerGroup有多个Consumer,一条消息只会被这个ConsumerGroup的一个消费者所消费.实现了单播.


不过通常的设计如下图,有多个ConsumerGroup, 并且每个ConsumerGroup中也有多个Consumer.


high-level Consumer Example


消费者示例, 指定要消费的topic和线程数,返回每个topic对应的KafkaStream列表,每个线程对应一个KafkaStream.


下面的示例中只使用了一个线程,所以通过streams.get(0)获取到该线程对应的KafkaStream.然后从流中读取出消息.


topicCountMap表示客户端可以同时消费多个topic,那为什么要设置线程数呢? 因为一个topic有多个partition分布在
多个broker节点上.即使是同一个broker,也可能有这个topic的多个partition. 用不同的线程来隔离不同的partition.


  • ●ConsumerConnector: Consumer的连接器,这里基于ZK实现,是ZookeeperConsumerConnector

  • ●KafkaStream: 消息流,每个消费者线程都对应了一个消息流,消息会放入消息流的阻塞队列中

  • ●ConsumerIterator: 消费者迭代器,只有迭代器开始迭代获取数据时,才会返回给消费者

ConsumerConfig conf = new ConsumerConfig(props);

ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(conf);

Map

声明:文章版权归原作者所有 部分文章转自互联网 如有侵权请联系 [邮箱地址] 删除

路过

雷人

握手

鲜花

鸡蛋

相关分类

返回顶部