python - 修改单个 BigQuery 列并写入新表

标签 python google-cloud-dataflow apache-beam

我想修改 BigQuery 中的单个列并将更新的数据写入新表,而无需手动保留所有其他列。我可以使用以下代码完成我想做的事情:

row = p | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query))    
new_row = row | beam.Map(lambda x: (x["col1"], x["col2"], preprocess(x["text_col"]))
output = new_row | beam.Map(lambda (col1, col2, processed_text): {"col1": col1, "col2": col2, "text": processed_text}

output | beam.io.WriteToBigQuery(path_to_new_table)

但是,这要求我基本上手动编写并保留每一列 - 如果我有 100 多个列(或者实际上甚至 10 多个列),这会很快变得非常困惑和麻烦。是否有更简单的方法来在一行上运行某些函数(本例中为 preprocess() )并仅更新该列并仍然保留其他列?

最佳答案

感谢@jkff,我已经知道如何做到这一点。该函数应该接受并接收一个字典,然后您可以只修改该字典的单个元素。像这样的东西:

new_row = row | beam.Map(lambda x: preprocess_text(x, col_to_transform='text_column')`

其中 preprocess_text() 类似于:

def preprocess_text(row, col_to_transform):
  row_copy = row.copy()
  line = row_copy[col_to_transform]
  line = ... # preprocessing transform goes here
  row_copy[col_to_transform] = line

  return row_copy

关于python - 修改单个 BigQuery 列并写入新表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46690531/

相关文章:

python - 解释 Apache Beam python 语法

google-cloud-platform - 如何在使用 Google Cloud Dataflow 清除 Cloud Memorystore 中的缓存后插入数据?

python - 使用 docker 入口点和 docker 外部的文件

google-cloud-platform - 启动 Dataflow Flex 模板时图像引用不正确

hadoop - Google Dataflow 的工作流程编排

python - 通过 Airflow 中的 PythonVirtualenvOperator 成功运行多次数据流管道

python - 在考虑 Pandas DataFrame 中的多列的同时对组进行迭代操作

python - 在 exe 中使用从 DLL 导出的 Python 对象

python - virtualenv 上的 Redhat Python 2.7.6 安装

python - 数据流未显示流管道的输出收集计数?