问题描述
public static class BQDestinationImp
extends DynamicDestinations<KV<String,TableRow>,KV<String,TableRow>> {
private String datasetName;
private String projectId;
public BQDestinationImp(String datasetName,String projectId) {
this.datasetName = datasetName;
this.projectId = projectId;
}
@Override
public KV<String,TableRow> getDestination(ValueInSingleWindow<KV<String,TableRow>> element) {
String key = element.getValue().getKey();
String tableName = String.format("%s:%s.%s",projectId,datasetName,key);
LOG.debug("Table Name {}",tableName);
return KV.of(tableName,element.getValue().getValue());
}
@Override
public TableDestination getTable(KV<String,TableRow> destination) {
TableDestination dest =
new TableDestination(destination.getKey(),"File Data output data from dataflow");
LOG.debug("Table Destination {}",dest.getTableSpec());
return dest;
}
@Override
public TableSchema getSchema(KV<String,TableRow> destination) {
TableRow bqRow = destination.getValue();
TableSchema schema = new TableSchema();
List<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
List<TableCell> cells = bqRow.getF();
for (int i = 0; i < cells.size(); i++) {
Map<String,Object> object = cells.get(i);
String header = object.keySet().iterator().next();
/** currently all BQ data types are set to String */
fields.add(new TableFieldSchema().setName(checkHeaderName(header)).setType("STRING"));
}
schema.setFields(fields);
return schema;
}
}
bqDataMap.apply(
"Write To BQ",BigQueryIO.<KV<String,TableRow>>write().to(new BQDestinationImp(options.getDatasetName(),options.getProjectId()))
.withFormatFunction(
element -> {
return element.getValue();
})
.withWritedisposition(BigQueryIO.Write.Writedisposition.WRITE_APPEND)
.withCreatedisposition(BigQueryIO.Write.Createdisposition.CREATE_IF_NEEDED)
.withoutValidation());
我真的不明白为什么我在使用 Java SDK 2.29 的 Apache Beam 中出现上述代码的以下错误
执行 Java 类时发生异常。 GroupByKey 的 keyCoder 必须是确定性的:KvCoder(StringUtf8Coder,TableRowJsonCoder) 不是确定性的,因为: [错误] 值编码器必须是确定性的:TableRowJsonCoder 不是确定性的,因为: [ERROR] TableCell 可以容纳任意实例,这可能是不确定的。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)