high-level consumer
有时候应用程序从Kafka读取数据,并不太关心消息offset的处理. 所以Hight Level Consumer提供了一个从Kafka消费数据的高层抽象.
High Level Consumer将从某个Partition读取的最后一条消息的offset存于Zookeeper中. 这个offset基于Consumer Group的名称.
Consumer Group是整个Kafka集群全局的. 所以要特别小心在新的逻辑启动之前要关闭所有的旧的逻辑(消费者进程).
当新的消费者加入同一个消费组时,Kafka会添加这个消费者的线程到要消费的topic的可用线程集合中,并且触发re-balance.
在re-balance时,kafka会分配可用的partition给可用的线程,可能移动一个partition到其他的线程中.
如果你的消费者逻辑混合了新的和旧的处理逻辑,很可能有些消息会被分配到旧的处理逻辑中.
High Level Consumer可以(应该)是多线程的应用程序.线程模型是以topic的partitions数量为中心的,不过有些规则:
●如果线程数量多于partition的数量,有部分线程无法消费该topic下任何一条消息
●如果线程数量少于partition的数量,有一些线程会消费多个partition的数据
●如果线程数量等于partition的数量,则正好一个线程消费一个partition的数据
●当添加更多的消费者进程/线程会触发re-balance,导致partition的分配发生了变化
如果一个线程消费多个partitions,并不会保证收到的消息的有序性,不过在同一个partition里的offset则是有序的.
比如某个消费者线程从partition-10接收5条消息,然后从partition-11接收了6条消息,接着再从partition-10接收了5条消息,
然后还是从partition-10又接收了5条消息,即使这个时候partition-11也有数据. 但是并不保证partition之间的顺序.
High-level的实现依赖于Consumer Group.Kafka保证同一consumer group中只有一个consumer会消费某条消息.
即每一个consumer实例只会消费某一个或多个特定partition的数据,而一个partition的数据只会被一个特定的consumer所消费.
●同一条消息会被多个ConsumerGroup消费,所以有多个ConsumerGroup,每个ConsumerGroup只有一个Consumer,实现了广播. 通常不同的ConsumerGroup的消费处理逻辑是不同的,这样同一份数据源(消息)交给不同的处理逻辑.
●一个ConsumerGroup有多个Consumer,一条消息只会被这个ConsumerGroup的一个消费者所消费.实现了单播.
不过通常的设计如下图,有多个ConsumerGroup, 并且每个ConsumerGroup中也有多个Consumer.
high-level Consumer Example
消费者示例, 指定要消费的topic和线程数,返回每个topic对应的KafkaStream列表,每个线程对应一个KafkaStream.
下面的示例中只使用了一个线程,所以通过streams.get(0)获取到该线程对应的KafkaStream.然后从流中读取出消息.
topicCountMap表示客户端可以同时消费多个topic,那为什么要设置线程数呢? 因为一个topic有多个partition分布在 多个broker节点上.即使是同一个broker,也可能有这个topic的多个partition. 用不同的线程来隔离不同的partition.
●ConsumerConnector: Consumer的连接器,这里基于ZK实现,是ZookeeperConsumerConnector
●KafkaStream: 消息流,每个消费者线程都对应了一个消息流,消息会放入消息流的阻塞队列中
●ConsumerIterator: 消费者迭代器,只有迭代器开始迭代获取数据时,才会返回给消费者
ConsumerConfig conf = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(conf);
Map |