Apache 光束 KvCoder(StringUtf8Coder,TableRowJsonCoder) 不是确定性的,因为

问题描述

public static class BQDestinationImp
        extends DynamicDestinations<KV<String,TableRow>,KV<String,TableRow>> {

    private String datasetName;
    private String projectId;

    public BQDestinationImp(String datasetName,String projectId) {
        this.datasetName = datasetName;
        this.projectId = projectId;
    }

    @Override
    public KV<String,TableRow> getDestination(ValueInSingleWindow<KV<String,TableRow>> element) {
        String key = element.getValue().getKey();
        String tableName = String.format("%s:%s.%s",projectId,datasetName,key);
        LOG.debug("Table Name {}",tableName);
        return KV.of(tableName,element.getValue().getValue());
    }

    @Override
    public TableDestination getTable(KV<String,TableRow> destination) {
        TableDestination dest =
                new TableDestination(destination.getKey(),"File Data output data from dataflow");
        LOG.debug("Table Destination {}",dest.getTableSpec());
        return dest;
    }

    @Override
    public TableSchema getSchema(KV<String,TableRow> destination) {

        TableRow bqRow = destination.getValue();
        TableSchema schema = new TableSchema();
        List<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
        List<TableCell> cells = bqRow.getF();

        for (int i = 0; i < cells.size(); i++) {
            Map<String,Object> object = cells.get(i);
            String header = object.keySet().iterator().next();
            /** currently all BQ data types are set to String */
            fields.add(new TableFieldSchema().setName(checkHeaderName(header)).setType("STRING"));
        }

        schema.setFields(fields);
        return schema;
    }
    
}             

bqDataMap.apply(
                "Write To BQ",BigQueryIO.<KV<String,TableRow>>write().to(new BQDestinationImp(options.getDatasetName(),options.getProjectId()))
                    .withFormatFunction(
                            element -> {
                                return element.getValue();
                            })
                 .withWritedisposition(BigQueryIO.Write.Writedisposition.WRITE_APPEND)
                 .withCreatedisposition(BigQueryIO.Write.Createdisposition.CREATE_IF_NEEDED)
                 .withoutValidation());

我真的不明白为什么我在使用 Java SDK 2.29 的 Apache Beam 中出现上述代码的以下错误

执行 Java 类时发生异常。 GroupByKey 的 keyCoder 必须是确定性的:KvCoder(StringUtf8Coder,TableRowJsonCoder) 不是确定性的,因为: [错误] 值编码器必须是确定性的:TableRowJsonCoder 不是确定性的,因为: [ERROR] TableCell 可以容纳任意实例,这可能是不确定的。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)