python - Dataflow batch job not scaling

标签 python google-cloud-platform google-compute-engine google-cloud-dataflow apache-beam

My Dataflow job (Job ID: 2020-08-18_07_55_15-14428306650890914471) is not scaling past 1 worker, despite Dataflow setting the target number of workers to 1000.

The job is configured to query the Google Patents BigQuery dataset, tokenize the text with a custom ParDo function and the transformers (huggingface) library, serialize the results, and write everything to one giant Parquet file.

I had assumed (after running the job yesterday, where it mapped a plain function instead of using a beam.DoFn class) that the problem was some non-parallelizable object preventing scaling; hence the refactoring of the tokenization step into a class.
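For reference, the idiomatic Beam pattern for heavy objects such as a tokenizer is to defer their construction to DoFn.setup(), which runs once per worker instead of the object being pickled along with the DoFn. A minimal sketch of that pattern (the script below still builds the tokenizer in __init__):

    import pickle

    import apache_beam as beam
    from transformers import AutoTokenizer


    class LazyTokDoFn(beam.DoFn):
        """Sketch: defer heavy initialization to setup(), run once per worker."""
        def __init__(self, tok_version, block_size=200):
            # Keep only cheap, picklable configuration in the constructor.
            self.tok_version = tok_version
            self.block_size = block_size
            self.tok = None

        def setup(self):
            # Called on each worker before processing, so the tokenizer itself is never pickled.
            self.tok = AutoTokenizer.from_pretrained(self.tok_version)

        def process(self, x):
            txt = ' '.join([x['abs_text'], x['desc_text'], x['claims_text']])
            enc = self.tok.encode(txt)
            for idx in range(len(enc)):
                yield pickle.dumps(enc[idx:idx + self.block_size])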

Here is the script, run from the command line with:

python bq_to_parquet_pipeline_w_class.py --extra_package transformers-3.0.2.tar.gz

The script:

    import os
    import re
    import argparse
    
    import google.auth
    import apache_beam as beam
    from apache_beam.options import pipeline_options
    from apache_beam.options.pipeline_options import GoogleCloudOptions
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    from apache_beam.runners import DataflowRunner
    
    
    from apache_beam.io.gcp.internal.clients import bigquery
    import pyarrow as pa
    import pickle
    from transformers import AutoTokenizer
    
    
    print('Defining TokDoFn')
    class TokDoFn(beam.DoFn):
        def __init__(self, tok_version, block_size=200):
            self.tok = AutoTokenizer.from_pretrained(tok_version)
            self.block_size = block_size
    
        def process(self, x):
            txt = x['abs_text'] + ' ' + x['desc_text'] + ' ' + x['claims_text']
            enc = self.tok.encode(txt)
    
            for idx, token in enumerate(enc):
                chunk = enc[idx:idx + self.block_size]
                serialized = pickle.dumps(chunk)
                yield serialized
    
    
    def run(argv=None, save_main_session=True):
        query_big = '''
        with data as (
          SELECT 
            (select text from unnest(abstract_localized) limit 1) abs_text,
            (select text from unnest(description_localized) limit 1) desc_text,
            (select text from unnest(claims_localized) limit 1) claims_text,
            publication_date,
            filing_date,
            grant_date,
            application_kind,
            ipc
          FROM `patents-public-data.patents.publications` 
        )
    
        select *
        FROM data
        WHERE
          abs_text is not null 
          AND desc_text is not null
          AND claims_text is not null
          AND ipc is not null
        '''
    
        query_sample = '''
        SELECT *
        FROM `client_name.patent_data.patent_samples`
        LIMIT 2;
        '''
    
        print('Start Run()')
        parser = argparse.ArgumentParser()
        known_args, pipeline_args = parser.parse_known_args(argv)
    
        '''
        Configure Options
        '''
        # Setting up the Apache Beam pipeline options.
        # We use the save_main_session option because one or more DoFn's in this
        # workflow rely on global context (e.g., a module imported at module level).
        options = PipelineOptions(pipeline_args)
        options.view_as(SetupOptions).save_main_session = save_main_session
    
        # Sets the project to the default project in your current Google Cloud environment.
        _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    
        # Sets the Google Cloud Region in which Cloud Dataflow runs.
        options.view_as(GoogleCloudOptions).region = 'us-central1'
    
    
        # IMPORTANT! Adjust the following to choose a Cloud Storage location.
        dataflow_gcs_location = 'gs://client_name/dataset_cleaned_pq_classTok'
        # Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.
        options.view_as(GoogleCloudOptions).staging_location = f'{dataflow_gcs_location}/staging'
    
        # Dataflow Temp Location. This location is used to store temporary files or intermediate results before finally outputting to the sink.
        options.view_as(GoogleCloudOptions).temp_location = f'{dataflow_gcs_location}/temp'
    
        # The directory to store the output files of the job.
        output_gcs_location = f'{dataflow_gcs_location}/output'
    
        print('Options configured per GCP Notebook Examples')
        print('Configuring BQ Table Schema for Beam')
    
    
        #Write Schema (to PQ):
        schema = pa.schema([
            ('block', pa.binary())
        ])
    
        print('Starting pipeline...')
        with beam.Pipeline(runner=DataflowRunner(), options=options) as p:
            res = (p
                   | 'QueryTable' >> beam.io.Read(beam.io.BigQuerySource(query=query_big, use_standard_sql=True))
                   | beam.ParDo(TokDoFn(tok_version='gpt2', block_size=200))
                   | beam.Map(lambda x: {'block': x})
                   | beam.io.WriteToParquet(os.path.join(output_gcs_location, f'pq_out'),
                                            schema,
                                            record_batch_size=1000)
                   )
            print('Pipeline built. Running...')
    
    if __name__ == '__main__':
        import logging
        logging.getLogger().setLevel(logging.INFO)
        logging.getLogger("transformers.tokenization_utils_base").setLevel(logging.ERROR)
        run()

Best Answer

The solution was twofold:

When I ran my job, I was exceeding the following quotas, all under the "Compute Engine API" (view your quotas here: https://console.cloud.google.com/iam-admin/quotas):

  • CPUs (I requested an increase to 50)
  • Persistent Disk Standard (GB) (I requested an increase to 12,500)
  • In_Use_IP_Address (I requested an increase to 50)

Note: if you read the console output while your job is running, any quota being exceeded should be printed as an info line.
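Current usage against each regional Compute Engine quota can also be checked from the command line (assuming the gcloud CLI is installed and pointed at your project), for example:

gcloud compute regions describe us-central1

which lists the limit and current usage for every Compute Engine quota in that region.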

Following Peter Kim's suggestion above, I passed the --max_num_workers flag as part of my command:

python bq_to_parquet_pipeline_w_class.py --extra_package transformers-3.0.2.tar.gz --max_num_workers 22

And then I started scaling!
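The same cap can also be set programmatically through Beam's WorkerOptions rather than the command-line flag; a short sketch, reusing the options object from the script above:

    from apache_beam.options.pipeline_options import WorkerOptions

    # Cap Dataflow autoscaling at 22 workers, equivalent to --max_num_workers 22.
    options.view_as(WorkerOptions).max_num_workers = 22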

To sum up, it would be great if there were a way for the Dataflow console to prompt users when a quota is being reached, with an easy way to request an increase to that quota (and the recommended complementary quotas), along with guidance on how large an increase to ask for.

This question, python - Dataflow batch job not scaling, was originally asked on Stack Overflow: https://stackoverflow.com/questions/63471735/
