有没有办法使用 Spark 结构化流读取 ActiveMQ 中先前排队的消息?

问题描述

我有一个生产者正在向 ActiveMQ 的主题和具有以下属性的客户端发送消息:

val optionsMap: Map[String,String] =
      Map[String,String]("brokerUrl" -> brokerUrl,"topic" -> topicName,"persistence" -> "memory","username" -> username,"password" -> password,"clientId" -> "something")

现在,当我的客户端应用程序未运行并且我向 ActiveMQ topic 发送消息时,Enqueued Messages数量增加 1,而 Dequeued Messages数量保持不变。 但是,一旦我启动我的客户端,Dequeued Messages数量就等于 Enqueued Messages数量,尽管我的客户端应用程序没有产生任何输出。如何解决这个问题?我希望我的客户端应用程序输出所有先前排队的消息。

解决方法

除非您发送到具有注册持久订阅的主题,否则代理将立即丢弃所有已发送的消息,因为这是主题的定义。只有活跃的 Topic 订阅者或持久订阅者才会收到发送到 Topic 的消息。