卡夫卡融合云ElasticsearchSink连接器:索引中的映射冲突

问题描述

我让Kafka在融合云上运行,在那里我可以使用Node.js客户端生成数据,数据以字符串形式发送,并且在融合云中获得以下字段。

enter image description here

然后,我创建了一个ElasticsearchSink Connector并将其连接到弹性搜索云。如果我没有在弹性搜索中创建任何映射,则数据传输会成功,但例外是这种格式。

"_source" : {
          "booked" : false,"phone_number" : "919191919191","location" : {
            "lon" : 60.23,"lat" : 78.233
          }
        }

现在的问题是,如果我想运行任何geo queries,它将不允许我输入以下错误

"root_cause" : [
      {
        "type" : "query_shard_exception","reason" : "Failed to find geo_point field [location]","index_uuid" : "C8Xxu9QlTMKN4Lk1LjpOmQ","index" : "locations"
      }

原因是动态映射不支持geo_field。因此,现在当我尝试创建弹性索引的自定义映射同时创建索引时,如下所示:

PUT /locations
{
  "mappings": {
    "properties": {
      "phone_number": {
        "type": "text"
      },"booked": {
        "type": "boolean"
      },"location": {
        "type": "geo_point"
        }
      }
  }
}

然后融合连接器失败,并显示以下错误

There is a mapping collision in your index: Can't merge a non object mapping with an object mapping.

我也尝试将booked作为text字段,但似乎无法正常工作。我尚未在Confluent云上实施任何架构。 这是来自融合云的一些基本配置。

enter image description here

如何执行映射,以便可以在Elastic Search中运行geo queries

更新:此问题仍然存在,主要是因为要发送到Kafka的数据格式

{
    "phone_number": "919191919191","location": {
            "lat": 78.233,"lon": 60.23
    },"booked": false,}

{
    "phone_number": "+919191919190","location": " 78.233,60.23",}

两种格式都无法映射到ElasticSearch中的上述映射,并且connector sink显示以下错误

Received Illegal Argument Exception from Elasticsearch: One of your fields' type does not match the mapped type in Elasticsearch

解决方法

Confluent Cloud在找出与架构相关的事情并将发现的内容存储在其内置的Schema Registry中做了一项偷偷摸摸的工作。恐怕您的映射无法正常工作,因为:

  1. 连接器仍在尝试发送以前存储的数据。
  2. 以前存储的数据仍附加到旧模式。
  3. Confluent Cloud并未意识到该架构已经发展。

尝试通过在Confluent Cloud中创建新环境(这将强制创建新的SR实例)来重置设置,或者使用全新的Kafka主题。无论哪种方式,都从新数据开始。连接器始终尝试保持乐观,并确保没有数据丢失,但是在此过程中这可能是错误的,因为架构已发展。

先在Elasticsearch上设置映射。完成此操作后,连接器将映射到正确的架构。另外,由于某种原因,它仅在我为Elasticsearch索引的映射使用动态模板时有效。