无法使用 alpakka 从 elasticsearch 读取数据

问题描述

我试图使用 alpakka 从 elasticsearch 读取文档(存储在 ES 中的 json 数据)。 我得到了这个 alpakka-Elasticsearch。 这里说你可以使用 ElasticsearchSource,ElasticsearchFlow or the ElasticsearchSink。 我试图实现 ElasticsearchSource 方法。所以我的代码看起来像这样

  val url = "http://localhost:9200"
  val connectionSettings = ElasticsearchConnectionSettings(url)
  val sourceSettings = ElasticsearchSourceSettings(connectionSettings)
  val elasticsearchParamsV7 = ElasticsearchParams.V7("category_index")
        val copy = ElasticsearchSource
          .typed[CategoryData](
            elasticsearchParamsV7,query = query,sourceSettings
          ).map { message: ReadResult[CategoryData] =>
          println("Inside message==================>  "+message)
          WriteMessage.createIndexMessage(message.id,message.source)
        } .runWith(
          ElasticsearchSink.create[CategoryData](
            elasticsearchParamsV7,ElasticsearchWriteSettings(connectionSettings)
          )
        )
  println("Final data==============>.  "+copy)

最后,复制返回 Future[Done] 的值。 但是我无法从 ES 读取数据。
有什么我遗漏的吗?
还有其他方法可以使用 akka http 客户端 api 来做同样的事情吗?
在 akka 中使用 ES 的首选方式是什么?

解决方法

要从 Elasticsearch 中读取数据,像这样应该就足够了:

  val matchAllQuery = """{"match_all": {}}"""
  val result = ElasticsearchSource
    .typed[CategoryData](
      elasticsearchParamsV7,query = matchAllQuery,sourceSettings
    ).map { message: ReadResult[CategoryData] =>
    println("Read message==================>  "+message)
  }.runWith(Sink.seq)

  result.onComplete(res => res.foreach(col => println(s"Read: ${col.size} records")))

如果 CategoryData 类型与索引中存储的内容不正确匹配,则查询可能不会返回结果。

如果有疑问,可以读取原始 JSON:

  val elasticsearchSourceRaw = ElasticsearchSource
    .create(
      elasticsearchParamsV7,settings = sourceSettings
    )

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...