Updating Elasticsearch entities with a bulk update

Problem description

I have the following data in an Elasticsearch index (version 7.x).


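The documents look like this:

{ "id":"1234","expirationDate":"17343234234","paths":"http:localhost:9090","work":"software dev","family":{ "baba":"jams","mother":"ela" } },
{ "id":"00021","expirationDate":"0123234","paths":"http:localhost:8080","work":"software engi","family":{ "baba":"stev","mother":"hela" } }

How can I update the entities whose expirationDate is smaller than the current time, so that it is set to the current time?

For example, the document with ID 00021 has expired because its expiration date is earlier than today, so it should be updated to the current time.

I am thinking of something like void updateExpiredEntity(List<ids> ids, Long currentTime).

Please provide some code for the implementation.

Is this the right method to use?

void bulkUpdate(List<UpdateQuery> queries, BulkOptions bulkOptions, IndexCoordinates index);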

Solution

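Since you are on Elasticsearch 7.x, I will assume you are using Spring Data Elasticsearch 4.0.x, which ships with Spring Boot 2.3.x, because that is the version that supports Elasticsearch 7.x.

A lot has changed in this version of Spring Data Elasticsearch, and updating documents by query is one of those changes. Where we used to autowire ElasticsearchTemplate, we now have to use ElasticsearchRestTemplate and RestHighLevelClient instead.

In your case, you can update the documents by query with RestHighLevelClient. Assuming expirationDate is mapped as a numeric type and holds a value in seconds, the code you are asking for would look like this.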

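import java.io.IOException;
import java.time.Instant;
import java.util.Collections;

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.stereotype.Service;

@Service
public class ElasticsearchService {

    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;

    @Autowired
    private RestHighLevelClient highLevelClient;

    public void updateExpireDateDemo() throws IOException {
        String indexName = "test";
        // Current time in epoch seconds, matching how expirationDate is assumed to be stored.
        long seconds = Instant.now().getEpochSecond();
        UpdateByQueryRequest request = new UpdateByQueryRequest(indexName);
        // Select every document whose expirationDate is at or before the current time.
        request.setQuery(new RangeQueryBuilder("expirationDate").lte(seconds));
        // Pass the timestamp as a script parameter so the script source stays constant.
        Script updateScript = new Script(
                ScriptType.INLINE, "painless", "ctx._source.expirationDate = params.now",
                Collections.<String, Object>singletonMap("now", seconds));
        request.setScript(updateScript);
        highLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
    }
}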
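
To make it easier to see what the update-by-query call sends, here is a minimal sketch that builds the equivalent JSON body for POST /test/_update_by_query by hand, using only the JDK. The class name UpdateByQueryBody and the script parameter name now are illustrative choices, not part of any library, and the field name is assumed to need no JSON escaping.

```java
import java.time.Instant;

// Sketch only: hand-built JSON body for the _update_by_query REST endpoint.
final class UpdateByQueryBody {

    // field is assumed to be a plain field name that needs no JSON escaping.
    static String build(String field, long nowSeconds) {
        return "{"
            // Range query: match documents whose field value is <= nowSeconds.
            + "\"query\":{\"range\":{\"" + field + "\":{\"lte\":" + nowSeconds + "}}},"
            // Painless script: overwrite the field with the "now" parameter.
            + "\"script\":{\"lang\":\"painless\","
            + "\"source\":\"ctx._source." + field + " = params.now\","
            + "\"params\":{\"now\":" + nowSeconds + "}}"
            + "}";
    }

    public static void main(String[] args) {
        // Prints the body for the current time in epoch seconds.
        System.out.println(build("expirationDate", Instant.now().getEpochSecond()));
    }
}
```

The printed body can be pasted into Kibana Dev Tools or sent with curl to check which documents the query matches before wiring it into the service.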

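I don't quite see why you need bulkUpdate here, but if you do, you first have to query Elasticsearch for the records that need updating so that you have the ID of each document. You can then update them with a list of UpdateQuery objects. Your code would look like this.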

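import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

import org.elasticsearch.index.query.RangeQueryBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.stereotype.Service;

@Service
public class ElasticsearchService {
    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;

    public void updateExpireDateByBulkDemo() {
        IndexCoordinates index = IndexCoordinates.of("test");
        // Current time in epoch seconds, matching how expirationDate is assumed to be stored.
        long seconds = Instant.now().getEpochSecond();
        List<UpdateQuery> updateList = new ArrayList<>();
        // Find the expired documents first; Data stands for your own entity class.
        RangeQueryBuilder expireQuery = new RangeQueryBuilder("expirationDate").lte(seconds);
        NativeSearchQuery query = new NativeSearchQueryBuilder().withQuery(expireQuery).build();
        SearchHits<Data> searchResult = elasticsearchRestTemplate.search(query, Data.class, index);
        for (SearchHit<Data> hit : searchResult.getSearchHits()) {
            // One scripted update per Elasticsearch document ID.
            updateList.add(UpdateQuery.builder(hit.getId())
                    .withScript("ctx._source.expirationDate=" + seconds + ";").build());
        }
        if (!updateList.isEmpty()) {
            elasticsearchRestTemplate.bulkUpdate(updateList, index);
        }
    }
}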
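
For comparison, a bulk update ends up as a newline-delimited JSON body sent to the _bulk endpoint: one action line plus one script line per document. The sketch below builds such a body with plain JDK code. BulkUpdateBody and the parameter name now are hypothetical names used for illustration, and the index name and IDs are assumed to need no JSON escaping.

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: hand-built NDJSON body for POST /_bulk with scripted updates.
final class BulkUpdateBody {

    // index and ids are assumed to contain no characters that need JSON escaping.
    static String build(String index, List<String> ids, long nowSeconds) {
        StringBuilder sb = new StringBuilder();
        for (String id : ids) {
            // Action line: which document to update.
            sb.append("{\"update\":{\"_index\":\"").append(index)
              .append("\",\"_id\":\"").append(id).append("\"}}\n");
            // Payload line: the script that sets expirationDate to the "now" parameter.
            sb.append("{\"script\":{\"lang\":\"painless\",")
              .append("\"source\":\"ctx._source.expirationDate = params.now\",")
              .append("\"params\":{\"now\":").append(nowSeconds).append("}}}\n");
        }
        // The bulk API requires the body to end with a newline, which the loop guarantees.
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(build("test", Arrays.asList("1234", "00021"), 1700000000L));
    }
}
```

This also shows why the IDs have to be fetched first: unlike update by query, every action line in a bulk request names a specific document.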

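However, this only updates the first page of search results. If you need to update every record that matches the query, you have to use the searchScroll method in order to collect every document ID.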
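
The paging pattern itself can be sketched without any Elasticsearch dependency: keep requesting pages until an empty one comes back, and accumulate the IDs along the way. PageSource below is a hypothetical stand-in for whatever scroll call you use, not a Spring Data Elasticsearch type, and the in-memory implementation exists only so the sketch runs on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch only: collect IDs from every page of a paged result, not just the first.
final class ScrollSketch {

    /** Hypothetical stand-in for a scroll API: returns the next page, or an empty list when done. */
    interface PageSource {
        List<String> nextPage();
    }

    static List<String> collectAllIds(PageSource source) {
        List<String> ids = new ArrayList<>();
        List<String> page = source.nextPage();
        // Keep fetching until the source reports that no results are left.
        while (!page.isEmpty()) {
            ids.addAll(page);
            page = source.nextPage();
        }
        return ids;
    }

    /** In-memory PageSource used only for demonstration. */
    static PageSource inMemory(List<String> all, int pageSize) {
        int[] offset = {0};
        return () -> {
            if (offset[0] >= all.size()) {
                return Collections.emptyList();
            }
            int end = Math.min(offset[0] + pageSize, all.size());
            List<String> page = new ArrayList<>(all.subList(offset[0], end));
            offset[0] = end;
            return page;
        };
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(collectAllIds(inMemory(all, 2)));
    }
}
```

In the real service, the PageSource would wrap the scroll calls, and the collected IDs would feed the UpdateQuery list, ideally in batches rather than one very large bulk request.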