ZKRebalancerListener watcher class ZKRebalancerListener(val group: String, val consumerIdString: String, val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]]) extends IZkChildListener { private var isWatcherTriggered = false private val lock = new ReentrantLock private val cond = lock.newCondition() private val watcherExecutorThread = new Thread(consumerIdString "_watcher_executor") { override def run() { var doRebalance = false while (!isShuttingDown.get) { lock.lock() try { // 如果isWatcherTriggered=false,则不会触发syncedRebalance. 等待1秒后,继续判断 if (!isWatcherTriggered) cond.await(1000, TimeUnit.MILLISECONDS) // wake up periodically so that it can check the shutdown flag } finally { // 不管isWatcherTriggered值是多少,在每次循环时,都会执行. 如果isWatcherTriggered=true,则会执行syncedRebalance doRebalance = isWatcherTriggered // 重新设置isWatcherTriggered=false, 因为其他线程触发一次后就失效了,想要再次触发,必须再次设置isWatcherTriggered=true isWatcherTriggered = false lock.unlock() } if (doRebalance) syncedRebalance // 只有每次rebalanceEventTriggered时,才会调用一次syncedRebalance } } } watcherExecutorThread.start() // 触发rebalance开始进行, 修改isWatcherTriggered标志位,触发cond条件运行 def rebalanceEventTriggered() { inLock(lock) { isWatcherTriggered = true cond.signalAll() } } watcherExecutorThread线程通过锁的方式判断何时需要进行syncedRebalance操作. reinitializeConsumer的topicStreamsMap是从(topic,thread)- |
|
声明:文章版权归原作者所有 部分文章转自互联网 如有侵权请联系
[邮箱地址] 删除
|