python - 如何将 csv 转换为 apache beam 数据流中的字典

标签 python csv google-bigquery google-cloud-dataflow apache-beam

我想读取一个 csv 文件并使用 apache beam 数据流将其写入 BigQuery。为此,我需要以字典的形式将数据呈现给 BigQuery。我如何才能使用 Apache Beam 转换数据以执行此操作?

我的输入 csv 文件有两列,我想在 BigQuery 中创建一个后续的两列表。我知道如何在 BigQuery 中创建数据,这很简单,我不知道如何将 csv 转换为字典。下面的代码不正确,但应该让我了解我正在尝试做什么。

# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectPipelineRunner')
# Create a PCollection with names and write it to a file.
(p
| 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv'))
# How do you do this??
| 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v})
| 'save' >> beam.Write(
   beam.io.BigQuerySink(
   output_table,
   schema='month:INTEGER, tornado_count:INTEGER',
   create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
   write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()

最佳答案

编辑:从版本 2.12.0 开始,Beam 带有新的 fileio 转换,允许您从 CSV 中读取而无需重新实现源。你可以这样做:

def get_csv_reader(readable_file):
  # You can return whichever kind of reader you want here
  # a DictReader, or a normal csv.reader.
  if sys.version_info >= (3, 0):
    return csv.reader(io.TextIOWrapper(readable_file.open()))
  else:
    return csv.reader(readable_file.open())

with Pipeline(...) as p:
  content_pc = (p
                | beam.io.fileio.MatchFiles("/my/file/name")
                | beam.io.fileio.ReadMatches()
                | beam.Reshuffle()  # Useful if you expect many matches
                | beam.FlatMap(get_csv_reader))

我最近为此为 Apache Beam 编写了一个测试。你可以上the Github repository看看.


旧答案 依赖于重新实现源。这不再是这样做的主要推荐方式:)

想法是有一个源返回解析的 CSV 行。您可以通过将 FileBasedSource 类子类化以包含 CSV 解析来实现这一点。特别是,read_records 函数看起来像这样:

class MyCsvFileSource(apache_beam.io.filebasedsource.FileBasedSource):
  def read_records(self, file_name, range_tracker):
    self._file = self.open_file(file_name)

    reader = csv.reader(self._file)

    for rec in reader:
      yield rec

关于python - 如何将 csv 转换为 apache beam 数据流中的字典,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41170997/

相关文章:

python - BaseSpider 和 CrawlSpider 在一起

python - selenium、chromedriver_autoinstaller 和 pyinstaller 在一个文件中

android - 如何将我的 sqlite 数据库导出到 android 中的 csv?

google-bigquery - 在 BigQuery 中使用参数查看

python - 如何从过程中将项目 append 到全局列表

python - 从多个表中提取第一行并添加一列(Python)

c# - 当我尝试在 C# 中转义双引号时,我得到多个而不是一个

python - 如果行中的指定值匹配条件,则从 CSV 返回行

sql - BigQuery 中的 EXP() 返回浮点错误

sql - 谷歌 BigQuery : Add new column to a table with constant value