使用Apache Beam BigQueryIO将数据插入BigQuery时数据丢失

问题描述

我正在使用以下代码使用apache Beam BigQueryIO将数据插入到BQ。我从kafka(Beam KafkaIO)中读取数据并进行处理,并创建String的Pcollection,然后将其流式传输到BQ。将数据写入BQ时,并不是将所有记录都写入Table。它也不会引发任何异常。

public class ConvertToTableRow extends DoFn<String,TableRow> {

/**
 * 
 */
private static final long serialVersionUID = 1L;

private StatsDClient statsdClient;
private String statsDHost;
private int statsDPort = 9125;

public ConvertToTableRow(String statsDHost) {
    this.statsDHost = statsDHost;
}

@Setup
public void startup() {
    this.statsdClient = new NonBlockingStatsDClient("Metric",statsDHost,statsDPort);
}

@ProcessElement
public void processElement(@Element String record,ProcessContext context) {
    try {
        statsdClient.incrementCounter("bq.message");
        TableRow row = new TableRow();
        row.set("name","Value");
        Long timestamp = System.currentTimeMillis();

        DateFormat dateFormater = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        Date date = new Date(timestamp);
        String insertDate = dateFormater.format(date);
        row.set("insert_date",insertDate);
        context.output(row);
    } catch (Exception e) {
        statsdClient.incrementCounter("exception.bq.message");
    }
}

@Teardown
public void teardown() {
    this.statsdClient.close();
}

}

private void streamWriteOutputToBQ(PCollection<TableRow> bqTableRows) {
    String tableSchema = //tableSchema;
    bqTableRows
    .apply((BigQueryIO.writeTableRows().skipInvalidRows().withMethod(Method.STREAMING_INSERTS)
            .to("myTable").withJsonSchema(tableSchema)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)));
        

}

我不确定是否缺少BigQueryIO的任何配置

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)