python - How to read CSV files from a Google Storage bucket in Dataflow, merge them, do some transformations on the dataframe in Dataflow, and then dump the result into BigQuery?

Tags: python google-cloud-platform google-cloud-dataflow apache-beam

I have to write a Dataflow job in Python that reads two different .csv files from GCS, performs a join, applies transformations to the result of the joined dataframe, and finally sends it to a BigQuery table.
I am very new to this; I know we can do all the pipeline operations with apache.beam.
After a lot of research I finally found a template, but I still have a lot of confusion about the points below.

import argparse
import logging
import os

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import PipelineOptions


os.environ["GOOGLE_APPLICATION_CREDENTIALS"]='auth_file.json'


class DataTransformation:
    """A helper class that translates a CSV into a format BigQuery will accept."""

    def __init__(self):
        dir_path = os.path.dirname(os.path.realpath(__file__))
        # Here we read the output schema from a json file.  This is used to specify the types
        # of data we are writing to BigQuery.
        self.schema = os.path.join(dir_path, 'resources',
                                   'gs://wahtch_dog_dataflow/schema.json')


# Parse the input csv and convert it into a BigQuery-savable dictionary.
class read_all_from_url(beam.DoFn):
    def process(self, url):
        with FileSystems.open(url) as f:
            yield f.read()
    

def run(argv=None):
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to read. This can be a local file or '
        'a file in a Google Storage Bucket.',
        default='gs://wahtch_dog_dataflow/demo.csv')

    parser.add_argument('--output',
                        dest='output',
                        required=False,
                        help='Output BQ table to write results to.',
                        default='watchdog_output.transformed')

    # Parse arguments from the command line.
    known_args, pipeline_args = parser.parse_known_args(argv)

    # DataTransformation is a class we built in this script to hold the logic
    # for transforming the file into a BigQuery table.
    data_ingestion = DataTransformation()
    urls = ["gs://smart-ivr-dl-pushed-data"]
    # Initiate the pipeline using the pipeline arguments passed in from the
    # command line. This includes information such as the project ID and
    # where Dataflow should store temp files.
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))

    (
     p | beam.Create(urls)
       | 'Reading latest file' >> beam.ParDo(read_all_from_url())
    
     # This stage of the pipeline translates from a CSV file single row
     # input as a string, to a dictionary object consumable by BigQuery.
     # It refers to a function we have written. This function will
     # be run in parallel on different workers using input from the
     # previous stage of the pipeline.
     | 'String To BigQuery Row' >>
         beam.Map(lambda s: data_ingestion.parse_method(s))
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.BigQuerySink(
             # The table name is a required argument for the BigQuery sink.
             # In this case we use the value passed in from the command line.
             known_args.output,
             # Here we use the simplest way of defining a schema:
             # fieldName:fieldType

             # Schema of the IVR data.
             schema=schema,

             # Creates the table in BigQuery if it does not yet exist.
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             # Deletes all data in the BigQuery table before writing.
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
    p.run().wait_until_finish()

        
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
My questions:
  • read_all_from_url: how do I read multiple files here, and what exactly is url? Is it the bucket name or the full storage path?
  • How do I read the schema from the bucket? (I found the approach above somewhere, but I doubt it can really read the schema like that; a possible approach is sketched below.)
  • How do I do the transformations? For example, I want to perform a groupby, etc.
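For the schema question in particular, one option is to read schema.json at pipeline-construction time with Beam's FileSystems, which understands gs:// paths. The sketch below is only an illustration: it assumes the file contains a JSON list of {"name": ..., "type": ...} objects, so adjust the parsing to whatever the schema.json actually holds.

    import json

    from apache_beam.io.filesystems import FileSystems

    def read_schema(schema_path):
        """Read a BigQuery schema from a JSON file on GCS and return it as
        the 'name:type,name:type' string accepted by the BigQuery sink."""
        with FileSystems.open(schema_path) as f:
            fields = json.loads(f.read().decode('utf-8'))
        return ','.join('{}:{}'.format(field['name'], field['type'])
                        for field in fields)

    # For example (path taken from the code above):
    # schema = read_schema('gs://wahtch_dog_dataflow/schema.json')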

  • Update:
    I was able to read the two files from the directory, but each file ends up as its own PCollection. My logic is: 1) read the two files from the local directory; 2) join the two dataframes with a join operation (this is where I am stuck); 3) perform some transformations on the joined dataframe.
    class ReadOrc(beam.DoFn):
        def process(self, element):
            df = pd.read_csv(element)
            yield df
            
    
    csv_lines = (p | beam.Create(urls)
                   | 'Reading latest file' >> beam.ParDo(ReadOrc())
                   | 'transform' >> beam.ParDo(transform()))
    
    The code above reads the two files from the directory, and the PCollection holds values like (df1, df2).
    Now, in the transform, I want to join the dataframes and do the preprocessing steps (one possible approach is sketched below).
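    One way to get past step 2 is to avoid joining whole dataframes inside a single DoFn and instead key each row, letting Beam do the join with CoGroupByKey. The sketch below only illustrates the idea: the file paths and the join column 'id' are placeholders, not names from the actual data.

    import apache_beam as beam
    import pandas as pd
    from apache_beam.io.filesystems import FileSystems


    class ReadCsvAsKeyedRows(beam.DoFn):
        """Read one CSV file and emit (join_key, row_dict) pairs."""

        def __init__(self, key_column):
            self.key_column = key_column

        def process(self, path):
            with FileSystems.open(path) as f:
                df = pd.read_csv(f)
            for _, row in df.iterrows():
                yield (row[self.key_column], row.to_dict())


    with beam.Pipeline() as p:
        left = (p | 'Create left' >> beam.Create(['gs://my-bucket/file1.csv'])
                  | 'Read left' >> beam.ParDo(ReadCsvAsKeyedRows('id')))
        right = (p | 'Create right' >> beam.Create(['gs://my-bucket/file2.csv'])
                   | 'Read right' >> beam.ParDo(ReadCsvAsKeyedRows('id')))

        joined = (
            {'left': left, 'right': right}
            | 'Join on id' >> beam.CoGroupByKey()
            # Each element is (id, {'left': [rows], 'right': [rows]});
            # emit one merged dict per matching left/right pair (an inner join).
            | 'Merge rows' >> beam.FlatMap(
                lambda kv: [{**l, **r}
                            for l in kv[1]['left']
                            for r in kv[1]['right']]))

    A groupby-style aggregation follows the same pattern: key the rows by the grouping column and use beam.GroupByKey() or beam.CombinePerKey(...) instead of CoGroupByKey.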

    Best Answer

    I have removed all the extra setup and configuration, and I am sharing a small pipeline that should, more or less, do what you need.
    But consider that BigQuery can import a single CSV file on its own, without a Dataflow job. Would that be more helpful?
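    For reference, loading the CSV straight into BigQuery without Dataflow could look roughly like this with the google-cloud-bigquery client; the project part of the table id is a placeholder, and the GCS path is the default input from the script above.

    from google.cloud import bigquery

    client = bigquery.Client()

    # Placeholder: replace with your own project.dataset.table.
    table_id = 'my-project.watchdog_output.transformed'

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,   # skip the CSV header row
        autodetect=True,       # let BigQuery infer the schema
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )

    load_job = client.load_table_from_uri(
        'gs://wahtch_dog_dataflow/demo.csv', table_id, job_config=job_config)
    load_job.result()  # wait for the load to finish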

    If you still want to use Dataflow to import into BQ, this pipeline should, more or less, do it.
    Based on your comments, I suggest you try the following:

    import csv
    import io
    import logging

    import apache_beam as beam
    from apache_beam.io import fileio
    from apache_beam.options.pipeline_options import PipelineOptions
    
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    
    (
     p 
     | beam.Create(urls)
     | 'Finding latest file' >> fileio.MatchAll()
     | 'Get file handlers' >> fileio.ReadMatches()
     | 'Read each file handler' >> beam.FlatMap(
           lambda rf: csv.reader(io.TextIOWrapper(rf.open())))
     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
             known_args.output,
             schema=schema ,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
    
    p.run().wait_until_finish()
    

    If you are reading dataframes from the CSVs, you can yield the rows from df.iterrows(), which breaks the dataframe down into individual rows, and then you can join them.
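    Concretely, that "yield the rows of df.iterrows()" idea could replace the csv.reader step in the pipeline above, so that each element becomes a dictionary WriteToBigQuery can consume. This is only a sketch (the class name is made up for illustration); the column names come from whatever header the CSVs have.

    import io

    import apache_beam as beam
    import pandas as pd


    class FileToRows(beam.DoFn):
        """Turn a fileio.ReadableFile from ReadMatches into one dict per CSV row."""

        def process(self, readable_file):
            # read_csv accepts a file-like object; ReadableFile.open() returns one.
            df = pd.read_csv(io.TextIOWrapper(readable_file.open()))
            for _, row in df.iterrows():
                yield row.to_dict()

    # In the pipeline above this would replace the 'Read each file handler' step:
    #   | 'Read each file handler' >> beam.ParDo(FileToRows())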

    Regarding python - how to read CSV files from a Google Storage bucket in Dataflow, merge them, do some transformations on the dataframe in Dataflow, and then dump the result into BigQuery, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/62503816/
