如何从 TFX BulkInferrer 获取数据帧或数据库写入?

问题描述

我对 TFX 非常陌生,但有一个显然可以正常工作的 ML 管道,该管道将通过 BulkInferrer 使用。这似乎只以 Protobuf 格式生成输出,但由于我正在运行批量推理,我想将结果通过管道传输到数据库。 (DB 输出似乎应该是批量推理的认值,因为批量推理和 DB 访问都利用了并行化……但 Protobuf 是按记录的序列化格式。)

我假设我可以使用诸如 Parquet-Avro-Protobuf 之类的东西来进行转换(尽管这是在 Java 中,而管道的其余部分在 Python 中),或者我可以自己编写一些东西来逐一使用所有 protobuf 消息-一个,将它们转换为 JSON,将 JSON 反序列化为一个 dicts 列表,然后将 dict 加载到 Pandas DataFrame 中,或者将其存储为一堆键值对,我将其视为一次性数据库......但那对于一个非常常见的用例,听起来像是涉及并行化和优化的大量工作和痛苦。顶级 Protobuf 消息定义是 Tensorflow 的 PredictionLog

必须一个常见的用例,因为像 this one 这样的 TensorFlowModelAnalytics 函数使用 Pandas DataFrame。我宁愿能够直接写入数据库(最好是 Google BigQuery)或 Parquet 文件(因为 Parquet / Spark 似乎比 Pandas 并行化得更好),而且,这些似乎应该是常见用例,但我没有找到任何例子。也许我使用了错误搜索词?

我还查看了 PredictExtractor,因为“提取预测”听起来接近我想要的……但官方文档似乎没有说明应该如何使用该类。我认为 TFTransformOutput 听起来像是一个有前途的动词,但实际上它是一个名词。

我显然在这里遗漏了一些基本的东西。有没有人想将 BulkInferrer 结果存储在数据库中的原因?是否有配置选项允许我将结果写入数据库?也许我想向 TFX 管道添加一个 ParquetIOBigQueryIO 实例? (TFX 文档说它使用 Beam "under the hood" 但这并没有说明我应该如何将它们一起使用。)但是这些文档中的语法看起来与我的 TFX 代码完全不同,我不确定它们兼容吗?

帮助?

解决方法

(从相关问题复制以提高知名度)

经过一番挖掘,这里有一种替代方法,它假设事先不知道 feature_spec。执行以下操作:

  • 通过向组件构造添加 output_example_spec,将 BulkInferrer 设置为写入 output_examples 而不是 inference_result
  • 在主管道中的 StatisticsGen 之后添加一个 SchemaGen 和一个 BulkInferrer 组件,以生成上述 output_examples 的架构
  • 使用来自 SchemaGenBulkInferrer 的工件读取 TFRecords 并执行任何必要的操作。
bulk_inferrer = BulkInferrer(
     ....
     output_example_spec=bulk_inferrer_pb2.OutputExampleSpec(
         output_columns_spec=[bulk_inferrer_pb2.OutputColumnsSpec(
             predict_output=bulk_inferrer_pb2.PredictOutput(
                 output_columns=[bulk_inferrer_pb2.PredictOutputCol(
                     output_key='original_label_name',output_column='output_label_column_name',)]))]
     ))

 statistics = StatisticsGen(
     examples=bulk_inferrer.outputs.output_examples
 )

 schema = SchemaGen(
     statistics=statistics.outputs.output,)

之后,您可以执行以下操作:

import tensorflow as tf
from tfx.utils import io_utils
from tensorflow_transform.tf_metadata import schema_utils

# read schema from SchemaGen
schema_path = '/path/to/schemagen/schema.pbtxt'
schema_proto = io_utils.SchemaReader().read(schema_path)
spec = schema_utils.schema_as_feature_spec(schema_proto).feature_spec

# read inferred results
data_files = ['/path/to/bulkinferrer/output_examples/examples/examples-00000-of-00001.gz']
dataset = tf.data.TFRecordDataset(data_files,compression_type='GZIP')

# parse dataset with spec
def parse(raw_record):
    return tf.io.parse_example(raw_record,spec)

dataset = dataset.map(parse)

此时,数据集就像任何其他已解析的数据集一样,因此写入 CSV 或 BigQuery 表或其他任何内容都很简单。它确实帮助我们在 ZenML 使用我们的 BatchInferencePipeline

,

在这里回答我自己的问题以记录我们所做的事情,尽管我认为下面@Hamza Tahir 的回答客观上更好。这可能会为需要更改开箱即用 TFX 组件操作的其他情况提供一个选项。虽然它很hacky:

我们复制并编辑了文件 tfx/components/bulk_inferrer/executor.py,在 _run_model_inference() 方法的内部管道中替换了此转换:

| 'WritePredictionLogs' >> beam.io.WriteToTFRecord(
             os.path.join(inference_result.uri,_PREDICTION_LOGS_FILE_NAME),file_name_suffix='.gz',coder=beam.coders.ProtoCoder(prediction_log_pb2.PredictionLog)))

有了这个:

| 'WritePredictionLogsBigquery' >> beam.io.WriteToBigQuery(
           'our_project:namespace.TableName',schema='SCHEMA_AUTODETECT',write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,custom_gcs_temp_location='gs://our-storage-bucket/tmp',temp_file_format='NEWLINE_DELIMITED_JSON',ignore_insert_ids=True,)

(这是有效的,因为当您导入 BulkInferrer 组件时,每个节点的工作被分配给在工作节点上运行的这些执行程序,并且 TFX 将其自己的库复制到这些节点上。它不会复制所有内容 来自用户空间库,这就是为什么我们不能只是继承 BulkInferrer 并导入我们的自定义版本。)

我们必须确保 'our_project:namespace.TableName' 处的表具有与模型输出兼容的架构,但不必将该架构转换为 JSON/AVRO。

理论上,我的团队希望使用围绕此构建的 TFX 发出拉取请求,但目前我们正在硬编码几个关键参数,并且没有时间将其提供给真正的公众/生产状态。