python-2.7 - Apache Beam 2.7.0 crashes on UTF-8 encoded French characters

Tags: python-2.7 google-cloud-platform google-cloud-datastore apache-beam

I am trying to write a CSV from a Google Cloud Platform storage bucket into Datastore. The file contains French characters/accents, but I get an error message about decoding.

I tried encoding and decoding from "latin-1" to "utf-8" without success (using unicode, unicodedata, and codecs), and I also tried changing things manually...

My operating system defaults to "ascii" encoding, which I changed manually to "utf-8" in "Anaconda3/envs/py27/lib/site.py":

def setencoding():
    """Set the string encoding used by the Unicode implementation.  The
    default is 'ascii', but if you're willing to experiment, you can
    change this."""
    encoding = "utf-8" # Default value set by _PyUnicode_Init()
    sys.setdefaultencoding("utf-8")

I have already tried this locally with a test file, printing and then writing a string with accents to a file, and it worked!

string='naïve café'
test_decode=codecs.utf_8_decode(string, "strict", True)[0]
print(test_decode)

with open('./test.txt', 'w') as outfile:
    outfile.write(test_decode)

But no luck with apache_beam...

Then I tried manually editing "/usr/lib/python2.7/encodings/utf_8.py", putting "ignore" instead of "strict" in codecs.utf_8_decode:

def decode(input, errors='ignore'):
    return codecs.utf_8_decode(input, errors, True)

But I realized that apache_beam does not use this file, or at least does not take any changes to it into account.

Any ideas how to handle this?

See the error message below:

Traceback (most recent call last):
  File "etablissementsFiness.py", line 146, in <module>
    dataflow(run_locally)
  File "etablissementsFiness.py", line 140, in dataflow
    | 'Write entities into Datastore' >> WriteToDatastore(PROJECT)
  File "C:\Users\Georges\Anaconda3\envs\py27\lib\site-packages\apache_beam\pipeline.py", line 414, in __exit__
    self.run().wait_until_finish()
  File "C:\Users\Georges\Anaconda3\envs\py27\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 1148, in wait_until_finish
    (self.state, getattr(self._runner, 'last_error_msg', None)), self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 642, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 156, in execute
    op.start()
  File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
    def start(self):
  File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
    with self.scoped_start_state:
  File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
    with self.spec.source.reader() as reader:
  File "dataflow_worker/native_operations.py", line 48, in dataflow_worker.native_operations.NativeReadOperation.start
    for value in reader:
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/textio.py", line 201, in read_records
    yield self._coder.decode(record)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/coders/coders.py", line 307, in decode
    return value.decode('utf-8')
  File "/usr/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0xe9 in position 190: invalid continuation byte
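A note on the failing byte: 0xE9 is "é" in Latin-1 (ISO-8859-1), whereas UTF-8 encodes "é" as the two-byte sequence 0xC3 0xA9. A lone 0xE9 followed by an ordinary character triggers exactly this "invalid continuation byte" error, which suggests the CSV may actually be Latin-1 encoded rather than UTF-8. A minimal reproduction (my sketch, not from the question):

```python
# -*- coding: utf-8 -*-
raw = b"caf\xe9 au lait"  # "café au lait" as Latin-1: 'é' is the single byte 0xE9

print(raw.decode("latin-1"))  # decodes cleanly: café au lait

try:
    # 0xE9 starts a 3-byte UTF-8 sequence, but the next byte (a space)
    # is not a valid continuation byte
    raw.decode("utf-8")
except UnicodeDecodeError as e:
    print(e.reason)  # invalid continuation byte
```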

Best answer

Try writing a CustomCoder class that "ignores" any errors while decoding:

from apache_beam.coders.coders import Coder

class CustomCoder(Coder):
    """A custom coder used for reading and writing strings as UTF-8."""

    def encode(self, value):
        return value.encode("utf-8", "replace")

    def decode(self, value):
        return value.decode("utf-8", "ignore")

    def is_deterministic(self):
        return True

Then, read and write your files with coder=CustomCoder():

lines = p | "Read" >> ReadFromText("files/path/*.txt", coder=CustomCoder())

# More processing code here...

output | WriteToText("output/file/path", file_name_suffix=".txt", coder=CustomCoder())
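One caveat (my note, not part of the original answer): errors="ignore" silently drops the undecodable bytes, so the accented characters disappear from the output. If the input file is in fact Latin-1, as the 0xE9 byte in the traceback hints, decoding it as Latin-1 preserves them. A hypothetical Latin1Coder sketch with the same interface as the CustomCoder above (it would subclass apache_beam.coders.coders.Coder in a real pipeline):

```python
class Latin1Coder(object):
    """Hypothetical coder: decode Latin-1 input, re-encode output as UTF-8."""

    def encode(self, value):
        return value.encode("utf-8")

    def decode(self, value):
        return value.decode("latin-1")

    def is_deterministic(self):
        return True

line = b"H\xf4pital re\xe7u"           # Latin-1 bytes for "Hôpital reçu"
print(line.decode("utf-8", "ignore"))  # "Hpital reu" -- accents silently dropped
print(Latin1Coder().decode(line))      # "Hôpital reçu" -- accents preserved
```

Whether to ignore or transcode depends on the data: "ignore" keeps the pipeline alive at the cost of losing characters, while the Latin-1 variant is lossless but wrong if some input files really are UTF-8.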

Regarding "python-2.7 - Apache Beam 2.7.0 crashes on UTF-8 encoded French characters", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/52853497/
