Apache Beam ReadFromSpanner 解码问题

问题描述

我正在尝试在 GCP 数据流管道中运行以下脚本。

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from typing import NamedTuple,Optional
from apache_beam.io.gcp.spanner import *
from past.builtins import unicode
import logging

class ItemRow(NamedTuple):
    item_id: unicode


class LogResults(beam.DoFn):
  """Just log the results"""
  def process(self,element):
    logging.info("row: %s",element)
    yield element

class SpannerToSpannerAndBigQueryPipelineOptions(PipelineOptions):
    """
    Runtime Parameters given during template execution
    path parameter is necessary for execution of pipeline
    """
    @classmethod
    def _add_argparse_args(cls,parser):
        parser.add_argument(
            '--SOURCE_SPANNER_PROJECT_ID',type=str,help='Source Spanner project ID',default='project_id')
        parser.add_argument(
            '--SOURCE_SPANNER_DATASET_ID',help='Source Spanner dataset ID',default='dataset_id')
        parser.add_argument(
            '--SOURCE_SPANNER_INSTANCE_ID',help='Source Spanner instance ID',default='instance_id')
        parser.add_argument(
            '--SOURCE_QUERY',help='sql to run in Source Spanner Instance',required=True)


# Setup pipeline

def run():

    beam.coders.registry.register_coder(ItemRow,beam.coders.RowCoder)
    pipeline_options = PipelineOptions()
    p = beam.Pipeline(options=pipeline_options)
    importer_options = pipeline_options.view_as(
        SpannerToSpannerAndBigQueryPipelineOptions)

    
    rows = (
        p
        | "Read from source Spanner" >> ReadFromSpanner(
            project_id=importer_options.soURCE_SPANNER_PROJECT_ID,instance_id=importer_options.soURCE_SPANNER_INSTANCE_ID,database_id=importer_options.soURCE_SPANNER_DATASET_ID,row_type=ItemRow,sql='Select item_id from Items WHERE created_ts BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(),INTERVAL 5 SECOND) AND CURRENT_TIMESTAMP()',timestamp_bound_mode=TimestampBoundMode.MAX_STALEnesS,staleness=3,time_unit=TimeUnit.HOURS,).with_output_types(ItemRow)
    )

    rows | 'Log results' >> beam.ParDo(LogResults())

    result = p.run()
    result.wait_until_finish()



if __name__ == "__main__":
    run()

但是,我在解码从 Spanner 获得的结果时遇到了问题。这些是我的 Dataflow 作业的输出日志:


"An exception was raised when trying to execute the workitem 6665479626992209510 : Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py",line 649,in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py",line 179,in execute
    op.start()
  File "dataflow_worker/native_operations.py",line 38,in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py",line 39,line 44,line 48,in dataflow_worker.native_operations.NativeReadOperation.start
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/inmemory.py",line 108,in __iter__
    yield self._source.coder.decode(value)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py",line 468,in decode
    return self.get_impl().decode(encoded)
  File "apache_beam/coders/coder_impl.py",line 226,in apache_beam.coders.coder_impl.StreamCoderImpl.decode
  File "apache_beam/coders/coder_impl.py",line 228,line 123,in apache_beam.coders.coder_impl.CoderImpl.decode_from_stream
  File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/row_coder.py",line 215,in decode_from_stream
    is_null in zip(self.components,nulls)))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/row_coder.py",in <genexpr>
    is_null in zip(self.components,nulls)))
  File "apache_beam/coders/coder_impl.py",line 259,in apache_beam.coders.coder_impl.CallbackCoderImpl.decode_from_stream
  File "apache_beam/coders/coder_impl.py",line 261,in apache_beam.coders.coder_impl.CallbackCoderImpl.decode_from_stream
  File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py",line 414,in decode
    return value.decode('utf-8')
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x83 in position 9: invalid start byte
"

我不确定如何解决这个问题。我使用这个 https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.gcp.spanner.html?highlight=spanner#module-apache_beam.io.gcp.spanner 示例作为起点。问题似乎在于解码从 Spanner 获得的结果。几乎没有关于如何为我尝试查询的 Spanner 表指定架构的文档。

还有一个用于 Spanner 的实验性 IO 模块,它不使用 Java 扩展模块。是否建议切换到实验版?

谢谢

解决方法

我无法使用 apache_beam.io.gcp.spanner 模块运行管道,因此我最终使用了 apache_beam.io.gcp.experimental.spannerio 模块。