Vertx:Verticle的多个实例中的多个使用者

问题描述

在我的应用程序中,我有两个顶点(标准顶点和非工人顶点),其中一个顶点VerticleA产生一些消息,而另一个顶点VerticleB消耗它们。在VerticleB中,我创建了几个使用者来使用这些消息。

class VerticleA : AbstractVerticle() {
  val publisher: MessageProducer<String>
  
  fun start(promise: Promise<Void>) {

    publisher = vertx.eventBus().sender<String>("address")
               .setWriteQueueMaxSize(queueSize)

    vertx.setPeriodic(timeout) {
      if(publisher.writeQueueFull())
        return

      getAsyncMessages() { messages ->
         messages.forEach { publisher.wirte(it) }
      }
    }
  }
}

class VerticleB : AbstractVerticle() {
  
  val consumers: List<MyConsumers>
  
  override fun start(promise: Promise<Void>) {
    // Some initialization
    
    consumers = (1..count).map { createConsumer() }
  }

  fun createConsumer(): MessageConsumer<String> {
    val consumer = vertx.eventBus().consumer<String>("address")
    
    consumer.handler { message -> 
      consumer.pause()
      
      asyncProcess(message) { consumer.resume() }
    }
    return consumer
  }
}

当我仅使用VerticleB的单个实例(即使用DeploymentOptions.setInstances(1))部署应用程序时,一切正常。但是,当我将数字实例设置为不止一个时,使用方将在处理少量初始消息后停止处理消息。

从日志中,我可以看到每个使用者都消耗了几条消息,然后停止消耗。暂停和恢复日志也成对出现,即每个pause()调用都有一个resume()调用,因此即使发生错误,asyncProcess()也会调用回调。消费者还没有结束。

我在这里https://vertx.io/docs/vertx-core/java/阅读了核心手册,但没有发现任何可以解决此问题的方法。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)