I have to write a Dataflow job in Python that reads two different .csv files from GCS, performs a join, applies transformations to the resulting joined dataframe, and finally writes it to a BigQuery table.
I'm very new to this; I know that all of the pipeline operations can be done with apache_beam.
After a lot of research I finally found a template, but I'm still quite confused on the points below.
import argparse
import logging
import os

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.pipeline import PipelineOptions

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = 'auth_file.json'


class DataTransformation:
    """A helper class that translates a CSV into a format BigQuery will accept."""

    def __init__(self):
        dir_path = os.path.dirname(os.path.realpath(__file__))
        # Here we read the output schema from a json file. This is used to specify the types
        # of data we are writing to BigQuery.
        self.schema = os.path.join(dir_path, 'resources',
                                   'gs://wahtch_dog_dataflow/schema.json')


# Read each input csv so it can later be converted into a BigQuery-savable dictionary.
class ReadAllFromUrl(beam.DoFn):
    def process(self, url):
        with FileSystems.open(url) as f:
            yield f.read()
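Note that the pipeline below calls data_ingestion.parse_method(s), which this template never defines. A hypothetical sketch of such a method on DataTransformation, with placeholder column names, might look like:

    def parse_method(self, string_input):
        # Split one CSV line and map the values onto placeholder column names,
        # which would have to match the BigQuery schema.
        values = string_input.strip().split(',')
        return dict(zip(('col1', 'col2', 'col3'), values))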
def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to read. This can be a local file or '
             'a file in a Google Storage Bucket.',
        default='gs://wahtch_dog_dataflow/demo.csv')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output BQ table to write results to.',
        default='watchdog_output.transformed')

    # Parse arguments from the command line.
    known_args, pipeline_args = parser.parse_known_args(argv)

    # DataTransformation is a class we built in this script to hold the logic for
    # transforming the file into a BigQuery table.
    data_ingestion = DataTransformation()
    urls = ["gs://smart-ivr-dl-pushed-data"]

    # Initiate the pipeline using the pipeline arguments passed in from the
    # command line. This includes information such as the project ID and
    # where Dataflow should store temp files.
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))

    (p
     | beam.Create(urls)
     | 'Reading latest file' >> beam.ParDo(ReadAllFromUrl())
     # This stage of the pipeline translates from a CSV file single row
     # input as a string, to a dictionary object consumable by BigQuery.
     # It refers to a function we have written. This function will
     # be run in parallel on different workers using input from the
     # previous stage of the pipeline.
     | 'String To BigQuery Row' >>
         beam.Map(lambda s: data_ingestion.parse_method(s))
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.BigQuerySink(
             # The table name is a required argument for the BigQuery sink.
             # In this case we use the value passed in from the command line.
             known_args.output,
             # Here we use the simplest way of defining a schema:
             # fieldName:fieldType
             # Schema of the ivr data.
             schema=data_ingestion.schema,
             # Creates the table in BigQuery if it does not yet exist.
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             # Deletes all data in the BigQuery table before writing.
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))

    p.run().wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
My question (update):
I'm able to read both files from the directory, but each file ends up as its own PCollection. Here is my intended logic: 1) read the two files from the local directory; 2) join the two dataframes with a join operation: this is where I'm stuck; 3) perform some transformations on the joined dataframe.
class ReadOrc(beam.DoFn):
    def process(self, element):
        df = pd.read_csv(element)
        yield df


csv_lines = (p | beam.Create(urls) |
             'Reading latest file' >> beam.ParDo(ReadOrc())
             | 'transform' >> beam.ParDo(transform()))
The code above reads the two files from the directory, and the resulting PCollection holds values like (df1, df2). Now, inside the transform, I want to join the dataframes and run the preprocessing steps, as in the sketch below.
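For step 2, one way to express the join in Beam itself, rather than inside a single DoFn, is to key every row by the join column and use CoGroupByKey. A minimal sketch, assuming both CSVs share a key column named 'id' (the column name and file paths below are placeholders):

import apache_beam as beam
import pandas as pd

def read_rows(path):
    # Read one CSV and emit (key, row_dict) pairs so the two files can be joined.
    df = pd.read_csv(path)
    for _, row in df.iterrows():
        d = row.to_dict()
        yield d['id'], d

with beam.Pipeline() as p:
    left = (p | 'Create left' >> beam.Create(['gs://wahtch_dog_dataflow/file1.csv'])
              | 'Read left' >> beam.FlatMap(read_rows))
    right = (p | 'Create right' >> beam.Create(['gs://wahtch_dog_dataflow/file2.csv'])
               | 'Read right' >> beam.FlatMap(read_rows))

    joined = (
        {'left': left, 'right': right}
        | 'Join on id' >> beam.CoGroupByKey()
        # Keep only keys present in both files (an inner join) and merge the row dicts.
        | 'Merge rows' >> beam.FlatMap(
            lambda kv: [{**l, **r} for l in kv[1]['left'] for r in kv[1]['right']]))

After 'Merge rows' each element is a plain dict, so the preprocessing transform and the BigQuery write can follow it directly. Note that pd.read_csv on a gs:// path assumes gcsfs is installed on the workers; reading through beam.io.ReadFromText or fileio avoids that dependency.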
Best Answer
I've stripped out all the extra setup and configuration, and I'm sharing a small pipeline that should, more or less, do what you need.
But consider that BigQuery should be able to import a single CSV file on its own, without a Dataflow job. Would that be more helpful?
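For example, a minimal sketch using the BigQuery Python client, reusing the default input path and output table from your script (the load options are assumptions):

from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    autodetect=True)
# Load one CSV straight from GCS into a table, no Dataflow involved.
load_job = client.load_table_from_uri(
    'gs://wahtch_dog_dataflow/demo.csv',
    'watchdog_output.transformed',
    job_config=job_config)
load_job.result()  # Wait for the load job to finish.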
If you still want to use Dataflow to import into BQ, this pipeline should, more or less, do it.
Based on your comments, I'd suggest you try the following:
import csv
import io
import logging

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.pipeline import PipelineOptions

p = beam.Pipeline(options=PipelineOptions(pipeline_args))

(p
 | beam.Create(urls)
 | 'Finding latest file' >> fileio.MatchAll()
 | 'Get file handlers' >> fileio.ReadMatches()
 | 'Read each file handler' >> beam.FlatMap(
     lambda rf: csv.reader(io.TextIOWrapper(rf.open())))
 | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
     known_args.output,
     schema=schema,
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
     write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

p.run().wait_until_finish()
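The snippet reuses names from your script (pipeline_args, urls, known_args, schema) without defining them. A minimal, hypothetical way to set them up (the file pattern and schema fields are placeholders) would be:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--output', dest='output',
                    default='watchdog_output.transformed',
                    help='Output BQ table to write results to.')
known_args, pipeline_args = parser.parse_known_args()

# File patterns for fileio.MatchAll() to expand; the pattern is a placeholder.
urls = ['gs://wahtch_dog_dataflow/*.csv']

# fieldName:fieldType schema string; the field names are placeholders.
schema = 'col1:STRING,col2:STRING,col3:FLOAT'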
If you are reading dataframes from the CSV, you can do

yield from df.iterrows()

which breaks the dataframe into individual rows, and then you can join them.
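For instance, a hypothetical DoFn along those lines (the 'id' join key is an assumption):

import apache_beam as beam
import pandas as pd

class ReadCsvRows(beam.DoFn):
    def process(self, path):
        # Read the CSV and emit one (key, row_dict) pair per row, so the
        # outputs of the two files can later be joined with beam.CoGroupByKey().
        df = pd.read_csv(path)
        for _, row in df.iterrows():
            d = row.to_dict()
            yield d['id'], d

Keyed like this, the rows coming from the two files can be grouped with beam.CoGroupByKey(), merged into single dicts, transformed, and then passed to WriteToBigQuery.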
Regarding "python - How to read csv files from a Google bucket in Dataflow, merge them, apply some transformations to the dataframe in Dataflow, and then dump it into BigQuery?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/62503816/