character-encoding - Apache Beam/GCP 数据流编码问题

标签 character-encoding google-cloud-dataflow apache-beam

我正在数据实验室中“玩”apache beam/dataflow。 我正在尝试从 gcs 读取一个 csv 文件。 当我使用以下方法创建 pcollection 时:

lines = p | 'ReadMyFile' >> beam.io.ReadFromText('gs://' + BUCKET_NAME + '/' + input_file, coder='StrUtf8Coder')

我收到以下错误:

LookupError: unknown encoding: "THE","NAME","OF","COLUMNS"

似乎列的名称被解释为编码?

我不明白哪里出了问题。 如果我不指定我得到的“编码器”

UnicodeDecodeError: 'utf8' codec can't decode byte 0xe0 in position 1045: invalid continuation byte

在 apache beam 之外,我可以通过从 gcs 读取文件来处理这个错误:

blob = storage.Blob(gs_path, bucket)
data = blob.download_as_string()
data.decode('utf-8', 'ignore')

我读到 apache beam 只支持 utf8 并且文件不只包含 utf8。

我应该下载然后转换为 pcollection 吗?

有什么建议吗?

最佳答案

一个可能的 hack 是创建一个继承自 Coder 类 ( apache_beam.coders.coders.Coder ) 的类

from apache_beam.coders.coders import Coder

class ISOCoder(Coder):
    """A coder used for reading and writing strings as ISO-8859-1."""

    def encode(self, value):
        return value.encode('iso-8859-1')

    def decode(self, value):
        return value.decode('iso-8859-1')

    def is_deterministic(self):
        return True

并将其作为参数传递给 beam 提供的 ReadFromText IO 转换 (apache_beam.io.textio.ReadFromText) 像这样

from apache_beam.io import ReadFromText

with beam.Pipeline(options=pipeline_options) as p:  
    new_pcollection = (  p | 'Read From GCS' >>
               beam.io.ReadFromText('input_file', coder=ISOCoder())

这背后的逻辑在这里详述

https://medium.com/@khushboo_16578/cloud-dataflow-and-iso-8859-1-2bb8763cc7c8

关于character-encoding - Apache Beam/GCP 数据流编码问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51935581/

相关文章:

python - 谷歌数据流: insert + update in BigQuery in a streaming pipeline

java - LayoutWrappingEncoder 的 LogBack 默认字符集?

mysql - 使用 utf8 的 MySQL 中列的字符串值不正确

mysql - 带有 JdbcIO 编写器的 ApacheBeam/DataFlow runner 创建了太多连接

mockito - 如何在 DoFn 单元测试中使用模拟?

java - 使用 PubSub 在本地运行 java 数据流管道

node.js - 在nodejs上的post请求中设置字符集

unicode - 为什么当我使用 (Apple 标志)unicode 时 Julia 返回 "\uf8ff"?

google-cloud-datastore - 如何将 Google Datastore CompositeFilter 与 Dataflow 结合使用?

java - Google Dataflow 作业在 eclipse 上运行时工作正常,但当我使用 mvn 编译它并使用 java -cp 命令运行文件时出现错误