问题描述
在我的应用程序中,我有两个顶点(标准顶点和非工人顶点),其中一个顶点VerticleA
产生一些消息,而另一个顶点VerticleB
消耗它们。在VerticleB
中,我创建了几个使用者来使用这些消息。
class VerticleA : AbstractVerticle() {
val publisher: MessageProducer<String>
fun start(promise: Promise<Void>) {
publisher = vertx.eventBus().sender<String>("address")
.setWriteQueueMaxSize(queueSize)
vertx.setPeriodic(timeout) {
if(publisher.writeQueueFull())
return
getAsyncMessages() { messages ->
messages.forEach { publisher.wirte(it) }
}
}
}
}
class VerticleB : AbstractVerticle() {
val consumers: List<MyConsumers>
override fun start(promise: Promise<Void>) {
// Some initialization
consumers = (1..count).map { createConsumer() }
}
fun createConsumer(): MessageConsumer<String> {
val consumer = vertx.eventBus().consumer<String>("address")
consumer.handler { message ->
consumer.pause()
asyncProcess(message) { consumer.resume() }
}
return consumer
}
}
当我仅使用VerticleB
的单个实例(即使用DeploymentOptions.setInstances(1)
)部署应用程序时,一切正常。但是,当我将数字实例设置为不止一个时,使用方将在处理少量初始消息后停止处理消息。
从日志中,我可以看到每个使用者都消耗了几条消息,然后停止消耗。暂停和恢复日志也成对出现,即每个pause()
调用都有一个resume()
调用,因此即使发生错误,asyncProcess()
也会调用回调。消费者还没有结束。
我在这里https://vertx.io/docs/vertx-core/java/阅读了核心手册,但没有发现任何可以解决此问题的方法。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)