首页 存档 技术 查看内容

kafra技术内幕:消费者初始化(scala)(二)

2018-3-30 13:00 |来自: 互联网 329 0

摘要: 架构师(JiaGouX)我们都是架构师! ZKRebalancerListener watcherclass ZKRebalancerListener(val group: String, val consumerIdString: String, val kafkaMessageAndMetadataStreams: mutable.Map]]) extends ...

架构师(JiaGouX)
我们都是架构师!


ZKRebalancerListener watcher

class ZKRebalancerListener(val group: String, val consumerIdString: String,

val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])

extends IZkChildListener {

private var isWatcherTriggered = false

private val lock = new ReentrantLock

private val cond = lock.newCondition()


private val watcherExecutorThread = new Thread(consumerIdString "_watcher_executor") {

override def run() {

var doRebalance = false

while (!isShuttingDown.get) {

lock.lock()

try {

// 如果isWatcherTriggered=false,则不会触发syncedRebalance. 等待1秒后,继续判断

if (!isWatcherTriggered)

cond.await(1000, TimeUnit.MILLISECONDS) // wake up periodically so that it can check the shutdown flag

} finally {

// 不管isWatcherTriggered值是多少,在每次循环时,都会执行. 如果isWatcherTriggered=true,则会执行syncedRebalance

doRebalance = isWatcherTriggered

// 重新设置isWatcherTriggered=false, 因为其他线程触发一次后就失效了,想要再次触发,必须再次设置isWatcherTriggered=true

isWatcherTriggered = false

lock.unlock()

}

if (doRebalance) syncedRebalance // 只有每次rebalanceEventTriggered时,才会调用一次syncedRebalance

}

}

}

watcherExecutorThread.start()


// 触发rebalance开始进行, 修改isWatcherTriggered标志位,触发cond条件运行

def rebalanceEventTriggered() {

inLock(lock) {

isWatcherTriggered = true

cond.signalAll()

}

}

watcherExecutorThread线程通过锁的方式判断何时需要进行syncedRebalance操作.

reinitializeConsumer的topicStreamsMap是从(topic,thread)-

声明:文章版权归原作者所有 部分文章转自互联网 如有侵权请联系 [邮箱地址] 删除

路过

雷人

握手

鲜花

鸡蛋

相关分类

返回顶部