Apache Beam - Writing BigQuery TableRow to Cassandra

Problem description

I am trying to read data from BigQuery (using TableRow) and write the output to Cassandra. How can I do this?

This is what I tried. This works:

/* Read BQ: parse each Avro record straight into the entity type */
PCollection<CxCpmMapProfile> data = p.apply(BigQueryIO.read(new SerializableFunction<SchemaAndRecord, CxCpmMapProfile>() {
    @Override
    public CxCpmMapProfile apply(SchemaAndRecord record) {
        GenericRecord r = record.getRecord();
        // toString() already yields a String, so no cast is needed
        return new CxCpmMapProfile(r.get("channel_no").toString(), r.get("channel_name").toString());
    }
}).fromQuery("SELECT channel_no,channel_name FROM `dataset_name.table_name`").usingStandardSql().withoutValidation());

/* Write to Cassandra */
data.apply(CassandraIO.<CxCpmMapProfile>write()
    .withHosts(Arrays.asList("<IP addr1>","<IP addr2>"))
    .withPort(9042)
    .withUsername("cassandra_user").withPassword("cassandra_password").withKeyspace("cassandra_keyspace")
    .withEntity(CxCpmMapProfile.class));

But when I change the BigQuery read to use TableRow, like this:

/* Read from BQ using readTableRows */
PCollection<TableRow> data = p.apply(BigQueryIO.readTableRows()
    .fromQuery("SELECT channel_no,channel_name FROM `dataset_name.table_name`")
    .usingStandardSql().withoutValidation());

then the write to Cassandra fails with the following error:

The method apply(PTransform<? super PCollection<TableRow>,OutputT>) in the type PCollection<TableRow> is not applicable for the arguments (CassandraIO.Write<CxCpmMacProfile>)

Solution

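The error occurs because the input PCollection contains TableRow elements, while the CassandraIO write is typed to accept CxCpmMapProfile elements, so the two types do not match. The elements read from BigQuery need to be CxCpmMapProfile elements before they reach the write. The BigQueryIO documentation gives an example of reading rows from a table and parsing them into a custom type with the read(SerializableFunction) method.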
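
If the read has to stay as readTableRows(), another option is to convert each TableRow into the entity type before the Cassandra write. The sketch below shows only the conversion step in plain Java, relying on the fact that TableRow implements Map<String, Object>. The CxCpmMapProfile class here is a simplified stand-in for the real entity (which would carry the Cassandra mapping annotations and be serializable), and the TableRowConversion helper and its method names are hypothetical. The Beam wiring in the trailing comment is an assumption about how it would be plugged in and is not compiled as part of this example.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the question's entity class (assumption: the real
// class has the Cassandra mapper annotations and implements Serializable).
class CxCpmMapProfile {
    final String channelNo;
    final String channelName;

    CxCpmMapProfile(String channelNo, String channelName) {
        this.channelNo = channelNo;
        this.channelName = channelName;
    }
}

// Hypothetical helper holding the row-to-entity conversion.
class TableRowConversion {
    // TableRow implements Map<String, Object>, so a TableRow can be passed in directly.
    static CxCpmMapProfile toProfile(Map<String, Object> row) {
        return new CxCpmMapProfile(
            asString(row.get("channel_no")),
            asString(row.get("channel_name")));
    }

    // Column values may arrive as String, Long, etc.; normalize to String and keep nulls.
    private static String asString(Object value) {
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("channel_no", 7L);
        row.put("channel_name", "news");
        CxCpmMapProfile profile = toProfile(row);
        System.out.println(profile.channelNo + " " + profile.channelName);
    }

    // Assumed wiring inside the pipeline (not compiled here):
    //   data.apply(MapElements.into(TypeDescriptor.of(CxCpmMapProfile.class))
    //           .via(row -> TableRowConversion.toProfile(row)))
    //       .apply(CassandraIO.<CxCpmMapProfile>write() /* hosts, port, keyspace, entity as before */);
}
```

With a conversion like this in between, the PCollection that reaches CassandraIO.write() has the element type the write expects, which is what the compiler error is complaining about.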