apache-beam - Error while using tensorflow transform's writeTransform function

Tags: apache-beam tensorflow-serving tensorflow-transform

I am currently using the TensorFlow Transform library to transform data and save the resulting transform. It used to work fine, but at the moment I am facing issues like the one below.

I keep getting the same error, for example -

```
'BeamDatasetMetadata' object has no attribute 'schema' [while running 'AnalyzeAndTransformDataset/TransformDataset/ConvertAndUnbatch']
```

Is anyone familiar with the above error, and how can we resolve it?

My transform function looks like this -

```
# ### Transformation Function
def transform_data(train_data_file, test_data_file, working_dir):
  """Transform the data and write out as a TFRecord of Example protos.
  Read in the data using the CSV reader, and transform it using a
  preprocessing pipeline that scales numeric data and converts categorical data
  from strings to int64 value indices, by creating a vocabulary for each
  category.
  Args:
    train_data_file: File containing training data
    test_data_file: File containing test data
    working_dir: Directory to write transformed data and metadata to
  """

  def preprocessing_fn(inputs):

    """Preprocess input columns into transformed columns."""
    outputs = {}

    # Scale numeric columns to have range [0, 1].
    for key in NUMERIC_FEATURE_KEYS:
      outputs[key] = tft.scale_to_0_1(inputs[key])

    # For each categorical column except the label column, tft.uniques
    # computes the set of unique values and writes it out as a vocabulary
    # file named after the column; the strings are not converted to indices here.
    for key in CATEGORICAL_FEATURE_KEYS:
      tft.uniques(inputs[key], vocab_filename=key)

    """ We would use the lookup table when the label is a string value
        In our case here Creative_id = 0/1 so we can direclty assign output as is
    """
    outputs[LABEL_KEY] = inputs[LABEL_KEY]

    return outputs

  # The "with" block will create a pipeline, and run that pipeline at the exit
  # of the block.
  with beam.Pipeline() as pipeline:
    with beam_impl.Context(temp_dir=tempfile.mkdtemp()):
      # Create a coder to read the data with the schema.  To do this we
      # need to list all columns in order since the schema doesn't specify the
      # order of columns in the csv.

      ordered_columns = [
          'app_category', 'connection_type', 'creative_id', 'day_of_week',
          'device_size', 'geo', 'hour_of_day', 'num_of_connects',
          'num_of_conversions', 'opt_bid', 'os_version'
      ]
      converter = csv_coder.CsvCoder(ordered_columns, RAW_DATA_METADATA.schema)

      # Read in raw data and convert using CSV converter.  Note that we apply
      # some Beam transformations here, which will not be encoded in the TF
      # graph since we don't do them from within tf.Transform's methods
      # (AnalyzeDataset, TransformDataset etc.).  These transformations are just
      # to get data into a format that the CSV converter can read, in particular
      # removing empty lines and removing spaces after commas.
      raw_data = (
          pipeline
          | 'ReadTrainData' >> textio.ReadFromText(train_data_file)
          | 'FilterTrainData' >> beam.Filter(
              lambda line: line and line != 'app_category,connection_type,creative_id,day_of_week,device_size,geo,hour_of_day,num_of_connects,num_of_conversions,opt_bid,os_version')
          | 'FixCommasTrainData' >> beam.Map(
              lambda line: line.replace(', ', ','))
          | 'DecodeTrainData' >> MapAndFilterErrors(converter.decode))

      # Combine data and schema into a dataset tuple.  Note that we already used
      # the schema to read the CSV data, but we also need it to interpret
      # raw_data.

      raw_dataset = (raw_data, RAW_DATA_METADATA)
      transformed_dataset, transform_fn = (
          raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocessing_fn))

      transformed_data, transformed_metadata = transformed_dataset

      transformed_data_coder = example_proto_coder.ExampleProtoCoder(transformed_metadata.schema)

      _ = (
          transformed_data
          | 'EncodeTrainData' >> beam.Map(transformed_data_coder.encode)
          | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(
              os.path.join(working_dir, TRANSFORMED_TRAIN_DATA_FILEBASE)))

      # Now apply the transform function to the test data.  In this case we
      # also skip the header line of the CSV file.
      raw_test_data = (
          pipeline
          | 'ReadTestData' >> textio.ReadFromText(test_data_file, skip_header_lines=1)
          | 'FixCommasTestData' >> beam.Map(
              lambda line: line.replace(', ', ','))
          | 'DecodeTestData' >> beam.Map(converter.decode))

      raw_test_dataset = (raw_test_data, RAW_DATA_METADATA)

      transformed_test_dataset = ((raw_test_dataset, transform_fn) | beam_impl.TransformDataset())
      # Don't need transformed data schema, it's the same as before.
      transformed_test_data, _ = transformed_test_dataset

      _ = (
          transformed_test_data
          | 'EncodeTestData' >> beam.Map(transformed_data_coder.encode)
          | 'WriteTestData' >> tfrecordio.WriteToTFRecord(
              os.path.join(working_dir, TRANSFORMED_TEST_DATA_FILEBASE)))

      _ = (
          transform_fn
          | 'WriteTransformFn' >>
          transform_fn_io.WriteTransformFn(working_dir))
```

Installed package versions -

```
pip show tensorflow-transform apache-beam

Name: tensorflow-transform
Version: 0.4.0
Summary: A library for data preprocessing with TensorFlow
Home-page: UNKNOWN
Author: Google Inc.
Author-email: tf-transform-feedback@google.com
License: Apache 2.0
Location: /usr/local/lib/python2.7/dist-packages
Requires: six, apache-beam, protobuf
---
Name: apache-beam
Version: 2.4.0
Summary: Apache Beam SDK for Python
Home-page: https://beam.apache.org
Author: Apache Software Foundation
Author-email: dev@beam.apache.org
License: Apache License, Version 2.0
Location: /usr/local/lib/python2.7/dist-packages
Requires: oauth2client, httplib2, mock, crcmod, grpcio, futures, pyvcf, avro, typing, pyyaml, dill, six, hdfs, protobuf
You are using pip version 9.0.1, however version 10.0.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
```

The problem above does not seem to happen every time, though! It looks like there is some conflict with other packages.
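
For what it's worth, pip itself can flag conflicting requirements between installed packages (a quick check, available since pip 9):

```
pip check
```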

Best Answer

I can't tell for sure, but this line looks incomplete:

```
converter = csv_coder.CsvCoder(ordered_columns, RAW_DATA_METADATA.schema)
```
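
For `.schema` to exist there, `RAW_DATA_METADATA` has to be an actual `DatasetMetadata` object. As a point of reference, in the census-style examples this code follows it is built roughly like the sketch below (the dtypes are assumptions; fill in all the columns from `ordered_columns`):

```
import tensorflow as tf
from tensorflow_transform.tf_metadata import dataset_metadata, dataset_schema

# Sketch only: one entry per CSV column; the dtypes here are assumed.
RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec({
        'app_category': tf.FixedLenFeature(shape=[], dtype=tf.string),
        'opt_bid': tf.FixedLenFeature(shape=[], dtype=tf.string),
        # ... the remaining columns from ordered_columns ...
    }))
```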

One approach that works:

```
import tensorflow as tf
from tensorflow_transform.tf_metadata import dataset_schema

INPUT_SCHEMA = dataset_schema.from_feature_spec({
    'label': tf.FixedLenFeature(shape=[], dtype=tf.float32),
    'id': tf.FixedLenFeature(shape=[], dtype=tf.float32),
    'date': tf.FixedLenFeature(shape=[], dtype=tf.string),
    'random': tf.FixedLenFeature(shape=[], dtype=tf.string),
    'name': tf.FixedLenFeature(shape=[], dtype=tf.string),
    'tweet': tf.FixedLenFeature(shape=[], dtype=tf.string),
})
```

```
from tensorflow_transform import coders

converter_input = coders.CsvCoder(
    ['label', 'id', 'date', 'random', 'name', 'tweet'],
    INPUT_SCHEMA,
    delimiter=delimiter)  # delimiter: whatever separator your CSV uses, e.g. ','
```
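
Wired into the pipeline, it is used the same way as the converter in your code, for example (a sketch; the step names and `train_data_file` are placeholders):

```
raw_data = (
    pipeline
    | 'ReadData' >> textio.ReadFromText(train_data_file, skip_header_lines=1)
    | 'DecodeData' >> beam.Map(converter_input.decode))
```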

Then for the transform step, which seems to be your actual problem, here is an example as well:

```
import tensorflow as tf
from tensorflow_transform.tf_metadata import dataset_metadata, dataset_schema

TRANSFORM_INPUT_SCHEMA = dataset_schema.from_feature_spec({
    'id': tf.FixedLenFeature(shape=[], dtype=tf.float32),
    'label': tf.FixedLenFeature(shape=[], dtype=tf.float32),
    'tweet': tf.FixedLenFeature(shape=[], dtype=tf.string),
    'answer_to_nbr': tf.FixedLenFeature(shape=[], dtype=tf.float32),
    'nbr_of_tags': tf.FixedLenFeature(shape=[], dtype=tf.float32),
})
input_metadata = dataset_metadata.DatasetMetadata(schema=TRANSFORM_INPUT_SCHEMA)

train_dataset = (train_dataset, input_metadata)
transformed_dataset, transform_fn = (
    train_dataset
    | 'AnalyzeAndTransform' >> beam_impl.AnalyzeAndTransformDataset(
        preprocessing_fn))
```
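
From there, writing out the transform function is the same `WriteTransformFn` step you already have; with the 0.4.0 APIs that would look like this sketch:

```
from tensorflow_transform.beam.tft_beam_io import transform_fn_io

_ = (
    transform_fn
    | 'WriteTransformFn' >> transform_fn_io.WriteTransformFn(working_dir))
```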

Hope this helps :) If you push the code to your github repository I can take a look at the full thing and see whether I can help! Good luck!

Check out this repository for reference: https://github.com/Fematich/tftransform-demo

Regarding "apache-beam - Error while using tensorflow transform's writeTransform function", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/50183295/
