Problem description
I am inserting data into BigQuery with Apache Beam's BigQueryIO using the code below. I read records from Kafka (Beam KafkaIO), process them, build a PCollection of Strings, and then stream the rows to BigQuery. When the data is written, not all records end up in the table, and no exception is thrown.
public class ConvertToTableRow extends DoFn<String, TableRow> {

    private static final long serialVersionUID = 1L;

    private StatsDClient statsdClient;
    private String statsDHost;
    private int statsDPort = 9125;

    public ConvertToTableRow(String statsDHost) {
        this.statsDHost = statsDHost;
    }

    @Setup
    public void startup() {
        this.statsdClient = new NonBlockingStatsDClient("Metric", statsDHost, statsDPort);
    }

    @ProcessElement
    public void processElement(@Element String record, ProcessContext context) {
        try {
            statsdClient.incrementCounter("bq.message");
            TableRow row = new TableRow();
            row.set("name", "Value");
            long timestamp = System.currentTimeMillis();
            DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            String insertDate = dateFormatter.format(new Date(timestamp));
            row.set("insert_date", insertDate);
            context.output(row);
        } catch (Exception e) {
            // Note: the exception is only counted here; the failing record is dropped.
            statsdClient.incrementCounter("exception.bq.message");
        }
    }

    @Teardown
    public void teardown() {
        this.statsdClient.close();
    }
}
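A side note on the DoFn above: it builds a new SimpleDateFormat for every element, and SimpleDateFormat is also not thread-safe, so it cannot simply be hoisted into a shared field. A minimal sketch (class and method names are illustrative, not from the original post) of the same formatting with java.time, whose DateTimeFormatter is immutable and safe to share:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class InsertDateFormat {
    // DateTimeFormatter is immutable and thread-safe, so one shared instance
    // can be reused across elements, unlike SimpleDateFormat.
    private static final DateTimeFormatter BQ_TIMESTAMP =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneOffset.UTC);

    static String formatInsertDate(long epochMillis) {
        return BQ_TIMESTAMP.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        System.out.println(formatInsertDate(0L)); // prints 1970-01-01 00:00:00.000
    }
}
```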
private void streamWriteOutputToBQ(PCollection<TableRow> bqTableRows) {
    String tableSchema = //tableSchema;
    bqTableRows.apply(BigQueryIO.writeTableRows()
            .skipInvalidRows()
            .withMethod(Method.STREAMING_INSERTS)
            .to("myTable")
            .withJsonSchema(tableSchema)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
}
I am not sure whether I am missing some BigQueryIO configuration.
Workaround
No effective solution has been found for this problem yet.
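One pattern worth examining, given that records disappear without any exception: the catch block in ConvertToTableRow swallows every exception, so a record that fails conversion is lost with nothing but a counter increment. A hedged sketch (tag and variable names are illustrative) of routing such failures to a Beam side output so the loss becomes observable:

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// Fragment: a dead-letter side output for the conversion step.
final TupleTag<TableRow> mainTag = new TupleTag<TableRow>() {};
final TupleTag<String> deadLetterTag = new TupleTag<String>() {};

PCollectionTuple converted = input.apply("ConvertToTableRow",
        ParDo.of(new DoFn<String, TableRow>() {
            @ProcessElement
            public void processElement(@Element String record, MultiOutputReceiver out) {
                try {
                    TableRow row = new TableRow().set("name", "Value");
                    out.get(mainTag).output(row);
                } catch (Exception e) {
                    // Instead of silently dropping the record, emit it for inspection.
                    out.get(deadLetterTag).output(record);
                }
            }
        }).withOutputTags(mainTag, TupleTagList.of(deadLetterTag)));

PCollection<TableRow> rows = converted.get(mainTag);
PCollection<String> failures = converted.get(deadLetterTag);
```

Like the write fragment above, this only compiles inside a full Beam pipeline, so it is shown as a sketch rather than a runnable program.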