如何为TFX时间序列创建窗口序列

问题描述

我正在使用uci开源数据https://archive.ics.uci.edu/ml/datasets/Appliances+energy+prediction构建用于时间序列的TFX管道。由于时间序列的训练和评估需要窗口数据序列,因此我在Trainer.py模块文件的input_fn函数中使用了tf.keras.preprocessing.timeseries_dataset_from_array API,以创建用于训练和评估的窗口,但我得到了TypeError: dataset length is infinite. >

Trainer.py模块文件包含以下代码

from typing import List,Text

import os
import absl
import datetime
import tensorflow as tf
import tensorflow_transform as tft

from tfx.components.trainer.executor import TrainerFnArgs
from tfx.components.trainer.fn_args_utils import DataAccessor
from tfx_bsl.tfxio import dataset_options

LABEL_KEY = 'Appliances'

_DENSE_FLOAT_FEATURE_KEYS = ['lights','T1','RH_1','T2','RH_2','T3','RH_3','T4','RH_4','T5','RH_5','T6','RH_6','T7','RH_7','T8','RH_8','T9','RH_9','T_out','Press_mm_hg','RH_out','Windspeed','Visibility','Tdewpoint','rv1','rv2']


def _transformed_name(key):
    return key + '_xf'


def _transformed_names(keys):
    return [_transformed_name(key) for key in keys]


def _get_serve_tf_examples_fn(model,tf_transform_output):
    """Returns a function that parses a serialized tf.Example and applies TFT."""

    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        """Returns the output to be used in the serving signature."""
        feature_spec = tf_transform_output.raw_feature_spec()
        feature_spec.pop(LABEL_KEY)
        parsed_features = tf.io.parse_example(serialized_tf_examples,feature_spec)
        transformed_features = model.tft_layer(parsed_features)
        return model(transformed_features)

    return serve_tf_examples_fn


**def _input_fn(file_pattern: List[Text],data_accessor: DataAccessor,tf_transform_output: tft.TFTransformOutput,batch_size: int = 200) -> tf.data.Dataset:
    import numpy as np
    """Generates features and label for tuning/training.

    Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    tf_transform_output: A TFTransformOutput.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

    Returns:
    A dataset that contains (features,indices) tuple where features is a
      dictionary of Tensors,and indices is a single Tensor of label indices.
    """
    dataset = data_accessor.tf_dataset_factory(
      file_pattern,dataset_options.TensorFlowDatasetoptions(
          batch_size=batch_size,label_key=_transformed_name(LABEL_KEY)),tf_transform_output.transformed_Metadata.schema)
  
    sequence_generator = tf.keras.preprocessing.timeseries_dataset_from_array(
    dataset,dataset,sequence_length=144,batch_size=1)
      
    
    return sequence_generator**


def _build_keras_model(hidden_units: List[int] = None) -> tf.keras.Model:
    """Creates a DNN Keras model for classifying taxi data.

    Args:
    hidden_units: [int],the layer sizes of the DNN (input layer first).

    Returns:
    A keras Model.
    """
    real_valued_columns = [
      tf.feature_column.numeric_column(key,shape=())
      for key in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
    ]
    
    model = regressor_model(
      deep_columns=real_valued_columns,dnn_hidden_units=hidden_units or [100,70,50,25])
    
    return model


def regressor_model(deep_columns,dnn_hidden_units):
    """Build a simple keras regressor model.

    Args:
    deep_columns: Feature columns for deep part of the model.
    dnn_hidden_units: [int],the layer sizes of the hidden DNN.

    Returns:
    A Deep Keras model
    """
    # Following values are hard coded for simplicity in this example,# However prefarably they should be passsed in as hparams.

    # Keras needs the feature deFinitions at compile time.
    input_layers = {
      colname: tf.keras.layers.Input(name=colname,shape=(),dtype=tf.float32)
      for colname in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
    }

    deep = tf.keras.layers.DenseFeatures(deep_columns)(input_layers)
    for numnodes in dnn_hidden_units:
        deep = tf.keras.layers.Dense(numnodes)(deep)


        output = tf.keras.layers.Dense(1)(deep)

        model = tf.keras.Model(input_layers,output)
        model.compile(
          loss='mean_absolute_error',optimizer=tf.keras.optimizers.Adam(lr=0.001),metrics=[tf.keras.metrics.MeanAbsoluteError()])
        model.summary(print_fn=absl.logging.info)
    return model


# TFX Trainer will call this function.
def run_fn(fn_args: TrainerFnArgs):
    """Train the model based on given args.

    Args:
    fn_args: Holds args used to train the model as name/value pairs.
    """
    # Number of nodes in the first layer of the DNN
    first_dnn_layer_size = 100
    num_dnn_layers = 4
    dnn_decay_factor = 0.7

    sequence_length = 144
    batch_size = 1

    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
    
    train_dataset = _input_fn(fn_args.train_files,fn_args.data_accessor,tf_transform_output,40)
    
    print('fn_args.train_files:',fn_args.train_files)
    print('fn_args.data_accessor:',fn_args.data_accessor)
    print('tf_transform_output:',tf_transform_output)
    print('train_dataset:')
    #for example in train_dataset.take(1):
    #  print(example)

    eval_dataset = _input_fn(fn_args.eval_files,40)
    
    print('fn_args.eval_files:',fn_args.eval_files)
    print('fn_args.data_accessor:',tf_transform_output)
    print('eval_dataset:',eval_dataset)

    model = _build_keras_model(
      # Construct layers sizes with exponetial decay
      hidden_units=[
          max(2,int(first_dnn_layer_size * dnn_decay_factor**i))
          for i in range(num_dnn_layers)
      ])

    tensorboard_callback = tf.keras.callbacks.TensorBoard(
      log_dir=fn_args.model_run_dir,update_freq='batch')
    model.fit(
      train_dataset,steps_per_epoch=fn_args.train_steps,validation_data=eval_dataset,validation_steps=fn_args.eval_steps,callbacks=[tensorboard_callback])

    signatures = {
      'serving_default':
          _get_serve_tf_examples_fn(model,tf_transform_output).get_concrete_function(
                                        tf.TensorSpec(
                                            shape=[None],dtype=tf.string,name='examples')),}
    model.save(fn_args.serving_model_dir,save_format='tf',signatures=signatures)

教练配置:

trainer = Trainer(
    module_file=os.path.abspath(trainer_module_file),custom_executor_spec=executor_spec.ExecutorClassspec(GenericExecutor),examples=transform.outputs['transformed_examples'],transform_graph=transform.outputs['transform_graph'],schema=schema_gen.outputs['schema'],train_args=trainer_pb2.TrainArgs(num_steps=10000),eval_args=trainer_pb2.EvalArgs(num_steps=5000))
context.run(trainer)

错误

--------------------------------------------------------------------------- 
TypeError Traceback (most recent call last) <ipython-input-53-cc5cfdc341eb> in <module>() 
7 train_args=trainer_pb2.TrainArgs(num_steps=10000),8 eval_args=trainer_pb2.EvalArgs(num_steps=5000))
 ----> 9 context.run(trainer) 
/usr/local/lib/python3.6/dist-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run_if_ipython(*args,**kwargs) 
65 # __IPYTHON__ variable is set by IPython,see 
66 # https://ipython.org/ipython-doc/rel-0.10.2/html/interactive/reference.html#embedding-ipython. ---> 67 return fn(*args,**kwargs) 
68 else: 
69 absl.logging.warning(
 /usr/local/lib/python3.6/dist-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run(self,component,enable_cache,beam_pipeline_args) 
180 telemetry_utils.LABEL_TFX_RUNNER: runner_label,181 }): 
--> 182 execution_id = launcher.launch().execution_id 
183 
184 return execution_result.ExecutionResult( 
/usr/local/lib/python3.6/dist-packages/tfx/orchestration/launcher/base_component_launcher.py in launch(self) 
203 execution_decision.input_dict,204 execution_decision.output_dict,--> 205 execution_decision.exec_properties) 
206 
207 absl.logging.info('Running publisher for %s',/usr/local/lib/python3.6/dist-packages/tfx/orchestration/launcher/in_process_component_launcher.py in _run_executor(self,execution_id,input_dict,output_dict,exec_properties) 
65 executor_context) # type: ignore 
66 
---> 67 executor.Do(input_dict,exec_properties) 

/usr/local/lib/python3.6/dist-packages/tfx/components/trainer/executor.py in Do(self,exec_properties) 
217 # Train the model 
218 absl.logging.info('Training model.') 
--> 219 run_fn(fn_args) 
220 
221 # Note: If trained with multi-node distribution workers,it is the user /content/trainer.py in run_fn(fn_args) 
160 
161 train_dataset = _input_fn(fn_args.train_files,--> 162 tf_transform_output,40) 
163 
164 print('fn_args.train_files:',fn_args.train_files) /content/trainer.py in _input_fn(file_pattern,data_accessor,batch_size) 
70 
71 dataset = tf.keras.preprocessing.timeseries_dataset_from_array(
 ---> 72 dataset,batch_size=1) 
73 
74 #label_key=_transformed_name(LABEL_KEY) 
/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/preprocessing/timeseries.py in timeseries_dataset_from_array(data,targets,sequence_length,sequence_stride,sampling_rate,batch_size,shuffle,seed,start_index,end_index) 
117 """ 
118 # Validate the shape of data and targets --> 
119 if targets is not None and len(targets) != len(data): 
120 raise ValueError('Expected data and targets to have the same number of ' 
121 'time steps (axis 0) but got ' 

/usr/local/lib/python3.6/dist-packages/tensorflow/python/data/ops/dataset_ops.py in __len__(self) 
443 length = self.cardinality() 
444 if length.numpy() == INFINITE: --> 
445 raise TypeError("dataset length is infinite.") 
446 if length.numpy() == UNKNowN: 
447 raise TypeError("dataset length is unkNown.") 

TypeError: dataset length is infinite.

   

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)