如何使用 Java 将我们的 CSV 文件索引到 Elastic Search 中?

问题描述

有没有办法使用 Java 将 CSV 文件索引到 Elasticsearch 中?我之前是用 Logstash 完成的,但现在需要用 Java 编写代码并动态运行。我尝试过 Index API,但它不适用于我的场景。有人能帮帮我吗?下面的 CSV 数据只是一个示例,实际上我有大批量这样的记录。

示例 CSV 数据如下:

 id  profile_id  hier_name       attri_name     item
  1   1          CUSTOMER        CUSTOMER        C001
  2   1          CUSTOMER        CUSTOMER        C002
  3   1          CUSTOMER        CUSTOMER        C003

下面是我尝试进行批量插入的代码,但它似乎不适用于我当前使用的 Elasticsearch 7.12.0:

     package com.javadeveloperzone;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

/**
 * The asker's attempt at bulk-indexing a CSV file into Elasticsearch through the transport
 * client.
 *
 * <p>NOTE(review): this is the code that produces the "no requests added" validation error
 * quoted after the listing. Only transcription-level compile errors are corrected here
 * (method-name casing and a call with a missing argument); the logic that triggers the error
 * is intentionally left as posted and is flagged inline in {@code CSVbulkImport}.
 */
public class ESBulkIndexingExample {

    String indexName,indexTypeName;
    TransportClient client = null;

    /**
     * Connects, imports the CSV file, refreshes the index and runs a match-all search.
     * The transport client is closed in all cases.
     */
    public static void main(String[] args) {
        ESBulkIndexingExample esExample = new ESBulkIndexingExample();
        try {
            esExample.initEStransportClinet();
            System.out.println("init done");
            esExample.CSVbulkImport(true);
            System.out.println("bulkimport done");
            esExample.refreshindices();

            esExample.search();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            esExample.closeTransportClient(); // close transport client
        }
    }

    /** Sets the hard-coded index name and mapping type name used by this example. */
    public ESBulkIndexingExample() {
        indexName = "document";
        indexTypeName = "bulkindexing";
    }

    /**
     * Creates the transport client and points it at localhost:9300.
     *
     * @return {@code true} when the client was created, {@code false} when any exception
     *         occurred (the stack trace is printed)
     */
    public boolean initEStransportClinet() {
        try {
            client = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"),9300));

            return true;
        } catch (Exception ex) {
            ex.printStackTrace();
            return false;
        }
    }

    /**
     * Reads the hard-coded CSV file line by line and adds one index action per accepted row
     * to a single bulk request.
     *
     * @param isHeaderIncluded when {@code true} the first line of the file is skipped
     * @throws IOException if the file cannot be opened or read
     */
    public void CSVbulkImport(boolean isHeaderIncluded) throws IOException,ExecutionException,InterruptedException {

        BulkRequestBuilder bulkRequest = client.prepareBulk();

        File file = new File(
                "/home/niteshb/Documents/workspace-spring-tool-suite-4-4.10.0.RELEASE/ElasticSearchService/src/main/resources/elasticdata.csv");
        BufferedReader bufferedReader = new BufferedReader(new FileReader(file));

        String line = null;
        int count = 0,noOfBatch = 1;
        if (bufferedReader != null && isHeaderIncluded) {
            bufferedReader.readLine();
        }
        while ((line = bufferedReader.readLine()) != null) {

            // Blank lines are skipped without being counted.
            if (line.trim().length() == 0) {
                continue;
            }
            String data[] = line.split(",");
            // NOTE(review): only rows with exactly 3 fields are accepted, yet data[3] is read
            // below and the sample CSV has 5 columns. With such input every row takes the
            // "Invalid data" branch and nothing is ever added to bulkRequest.
            if (data.length == 3) {

                try {
                    XContentBuilder xContentBuilder = jsonBuilder().startObject().field("tenant_id",data[0])
                            .field("hierarchy_name",data[1]).field("attribute_name",data[2]).field("item_pk",data[3])
                            .endObject();

                    BulkRequestBuilder add = bulkRequest
                            .add(client.prepareIndex(indexName,indexTypeName,data[0]).setSource(xContentBuilder));

                    System.out.println(add);
                    if ((count + 1) % 500 == 0) {
                        // NOTE(review): count is reset before the call, so the callee's
                        // count == 0 guard returns immediately and this flush never executes.
                        count = 0;
                        addDocumentToESCluser(bulkRequest,noOfBatch,count);
                        noOfBatch++;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } else {
                System.out.println("Invalid data : " + line);
            }
            // Counts every non-blank line, including the rejected ones.
            count++;
        }
        bufferedReader.close();
        // Final flush. It runs whenever count > 0, even if bulkRequest holds no actions.
        addDocumentToESCluser(bulkRequest,noOfBatch,count);

    }

    /**
     * Executes the bulk request and prints a per-batch success or failure summary.
     *
     * @param bulkRequest the bulk request to execute
     * @param noOfBatch   batch number, used only in the printed messages
     * @param count       document count used in the failure message; when it is 0 the request
     *                    is not executed
     */
    public void addDocumentToESCluser(BulkRequestBuilder bulkRequest,int noOfBatch,int count) {

        if (count == 0) {
            return;
        }
        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        if (bulkResponse.hasFailures()) {
            System.out.println("Bulk Indexing Failed for Batch : " + noOfBatch);

            int numberOfDocFailed = 0;
            Iterator<BulkItemResponse> iterator = bulkResponse.iterator();
            while (iterator.hasNext()) {
                BulkItemResponse response = iterator.next();
                if (response.isFailed()) {
                    numberOfDocFailed++;
                }
            }
            System.out.println("Out of " + count + " documents," + numberOfDocFailed + " documents Failed");
            System.out.println(bulkResponse.buildFailureMessage());
        } else {
            System.out.println("Bulk Indexing Completed for batch : " + noOfBatch);
        }
    }

    /** Issues a refresh request for the example index. */
    public void refreshindices() {
        client.admin().indices().prepareRefresh(indexName).get();
    }

    /** Runs a search with no query against the example index and type, then prints the result. */
    public void search() {

        SearchResponse response = client.prepareSearch(indexName).setTypes(indexTypeName).get();
        System.out.println("Total Hits : " + response.getHits().getTotalHits());
        System.out.println(response);
    }

    /** Closes the transport client if it was created. */
    public void closeTransportClient() {
        if (client != null) {
            client.close();
        }
    }
}

运行后出现如下错误:

org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: no requests added;
at org.elasticsearch.action.ValidateActions.addValidationError(ValidateActions.java:15)
at org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:425)
at org.elasticsearch.action.TransportActionNodeProxy.execute(TransportActionNodeProxy.java:31)
at org.elasticsearch.client.transport.TransportProxyClient.lambda$execute$0(TransportProxyClient.java:44)

有人可以帮我吗?

解决方法

我的 CSV 文件中的数据如下所示:

 id  profile_id  hier_name        attri_name     item
  1   1          CUSTOMER         CUSTOMER        C001
  2   1          CUSTOMER         CUSTOMER        C002
  3   1           CUSTOMER        CUSTOMER        C003

需要添加的依赖如下:

<!-- Transport client artifact; presumably the source of the TransportClient /
     PreBuiltTransportClient classes imported by the Java code in this post. -->
<!-- NOTE(review): the question mentions Elasticsearch 7.12.0 while every artifact here is
     pinned to 7.12.1 - confirm the client version matches the cluster version. -->
<dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>7.12.1</version>
        </dependency>
        
        <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
        <!-- Elasticsearch core library, kept at the same version as the transport client. -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.12.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.elasticsearch.plugin/transport-netty4-client -->
        <!-- Netty 4 transport module, kept at the same version as the transport client. -->
        <dependency>
            <groupId>org.elasticsearch.plugin</groupId>
            <artifactId>transport-netty4-client</artifactId>
            <version>7.12.1</version>
        </dependency>




import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.springframework.stereotype.Component;

/**
 * Spring component that bulk-indexes the rows of a CSV file into Elasticsearch through the
 * transport client.
 *
 * <p>Expected call order: {@link #initEStransportClinet()}, then
 * {@link #CSVbulkImport(boolean, String, String)}, optionally {@link #refreshIndices(String)},
 * and finally {@link #closeTransportClient()}.
 *
 * <p>Each CSV row must contain at least five comma-separated fields in the order
 * {@code id, profile_id, hier_name, attri_name, item}. The first field becomes the document
 * id; the remaining four are stored as {@code tenant_id}, {@code hierarchy_name},
 * {@code attribute_name} and {@code item_pk}.
 */
@Component
public class ESBulkIndexing {

    /** Maximum number of index actions sent to the cluster in one bulk request. */
    private static final int BATCH_SIZE = 500;

    /** Minimum number of comma-separated fields a row needs to be indexed. */
    private static final int EXPECTED_COLUMNS = 5;

    String indexName,indexTypeName;
    TransportClient client = null;

    /** Sets the mapping type name used for every indexed document. */
    public ESBulkIndexing() {

        indexTypeName = "bulkindexing";
    }

    /**
     * Creates the transport client and points it at localhost:9300.
     *
     * @return always {@code true}; failures are reported through the exception
     * @throws UnknownHostException if "localhost" cannot be resolved
     */
    public boolean initEStransportClinet() throws UnknownHostException {
        client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"),9300));
        return true;
    }

    /**
     * Reads {@code <filename>/elastic.csv} and indexes its rows into {@code index} in batches
     * of at most {@value #BATCH_SIZE} documents.
     *
     * <p>Blank lines are skipped. Rows with fewer than {@value #EXPECTED_COLUMNS} fields are
     * reported on standard output and skipped. A row whose JSON document cannot be built is
     * logged and skipped as well. A fresh bulk request is started after every flush so that no
     * row is sent more than once, and an empty request is never executed (executing one fails
     * with "Validation Failed: 1: no requests added").
     *
     * @param isHeaderIncluded when {@code true} the first line of the file is skipped
     * @param index            name of the target index
     * @param filename         directory that contains {@code elastic.csv} (despite its name,
     *                         this is used as the parent path, not as the file itself)
     * @throws IOException           if the file cannot be opened or read
     * @throws IllegalStateException if the transport client has not been initialised
     */
    public void CSVbulkImport(boolean isHeaderIncluded,String index,String filename)
            throws IOException,ExecutionException,InterruptedException {
        if (client == null) {
            throw new IllegalStateException(
                    "Transport client is not initialised; call initEStransportClinet() first");
        }

        File file = new File(filename + "/elastic.csv");
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        int noOfBatch = 1;

        // try-with-resources closes the reader even when reading or indexing throws.
        try (BufferedReader bufferedReader = new BufferedReader(new FileReader(file))) {
            if (isHeaderIncluded) {
                bufferedReader.readLine();
            }

            String line;
            while ((line = bufferedReader.readLine()) != null) {
                if (line.trim().length() == 0) {
                    continue;
                }

                String[] data = line.split(",");
                if (data.length < EXPECTED_COLUMNS) {
                    System.out.println("Invalid data : " + line);
                    continue;
                }

                try {
                    XContentBuilder xContentBuilder = jsonBuilder().startObject().field("tenant_id",data[1])
                            .field("hierarchy_name",data[2]).field("attribute_name",data[3]).field("item_pk",data[4])
                            .endObject();
                    bulkRequest.add(client.prepareIndex(index,indexTypeName,data[0]).setSource(xContentBuilder));
                } catch (IOException e) {
                    // Best effort per row: a row that cannot be serialised must not abort the import.
                    e.printStackTrace();
                    continue;
                }

                if (bulkRequest.numberOfActions() >= BATCH_SIZE) {
                    addDocumentToESCluser(bulkRequest,noOfBatch,bulkRequest.numberOfActions());
                    noOfBatch++;
                    // The builder keeps its actions after execution, so start a new one.
                    bulkRequest = client.prepareBulk();
                }
            }
        }

        // Send whatever is left over from the last, partially filled batch.
        addDocumentToESCluser(bulkRequest,noOfBatch,bulkRequest.numberOfActions());

    }

    /**
     * Executes one bulk request and prints a per-batch success or failure summary.
     *
     * <p>Nothing is sent when {@code count} is 0 or when the request holds no actions.
     *
     * @param bulkRequest the bulk request to execute
     * @param noOfBatch   batch number, used only in the printed messages
     * @param count       number of documents in the batch, used in the failure summary
     */
    public void addDocumentToESCluser(BulkRequestBuilder bulkRequest,int noOfBatch,int count) {

        if (count == 0 || bulkRequest.numberOfActions() == 0) {
            return;
        }
        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        if (bulkResponse.hasFailures()) {
            System.out.println("Bulk Indexing Failed for Batch : " + noOfBatch);

            int numberOfDocFailed = 0;
            for (BulkItemResponse response : bulkResponse) {
                if (response.isFailed()) {
                    numberOfDocFailed++;
                }
            }
            System.out.println("Out of " + count + " documents," + numberOfDocFailed + " documents Failed");
            System.out.println(bulkResponse.buildFailureMessage());
        } else {
            System.out.println("Bulk Indexing Completed for batch : " + noOfBatch);
        }
    }

    /**
     * Issues a refresh request for the given index.
     *
     * @param index name of the index to refresh
     */
    public void refreshIndices(String index) {
        client.admin().indices().prepareRefresh(index).get();
    }

    /** Closes the transport client if it was created; safe to call more than once. */
    public void closeTransportClient() {
        if (client != null) {
            client.close();
            // Drop the reference so a closed client is not reused by a later import.
            client = null;
        }
    }
}