python - Dataflow fails when adding requirements.txt [Python]

Tags: python google-cloud-dataflow dataflow requirements

So when I try to run a Dataflow pipeline with the DataflowRunner and include a requirements.txt like the one below

google-cloud-storage==1.28.1
pandas==1.0.3
smart-open==2.0.0

it fails every time at this line:
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://..../beamapp-.../numpy-1.18.2.zip...
Traceback (most recent call last):
  File "Database.py", line 107, in <module>
    run()
  File "Database.py", line 101, in run
    | 'Write CSV' >> beam.ParDo(WriteCSVFIle(options.output_bucket, pandora_options.output_folder))
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/pipeline.py", line 503, in __exit__
    self.run().wait_until_finish()
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/pipeline.py", line 483, in run
    self._options).run(False)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/pipeline.py", line 496, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 548, in run_pipeline
    self.dataflow_client.create_job(self.job), self)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 234, in wrapper
    return fun(*args, **kwargs)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 624, in create_job
    self.create_job_description(job)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 680, in create_job_description
    resources = self._stage_resources(job.options)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 577, in _stage_resources
    staging_location=google_cloud_options.staging_location)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/portability/stager.py", line 182, in stage_job_resources
    pkg, FileSystems.join(staging_location, os.path.basename(pkg)))
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 942, in stage_artifact
    local_path_to_artifact, artifact_name)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 234, in wrapper
    return fun(*args, **kwargs)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 564, in _gcs_file_copy
    self.stage_file(to_folder, to_name, f, total_size=total_size)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 602, in stage_file
    response = self._storage_client.objects.Insert(request, upload=upload)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1156, in Insert
    upload=upload, upload_config=upload_config)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/base_api.py", line 715, in _RunMethod
    http_request, client=self.client)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 908, in InitializeUpload
    return self.StreamInChunks()
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 1020, in StreamInChunks
    additional_headers=additional_headers)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 957, in __StreamMedia
    response = send_func(self.stream.tell())
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 943, in CallSendChunk
    start, additional_headers=additional_headers)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 1120, in __SendChunk
    return self.__SendMediaRequest(request, end)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 1033, in __SendMediaRequest
    retries=self.num_retries, check_response_func=CheckResponse)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 356, in MakeRequest
    max_retry_wait, total_wait_sec))
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 304, in HandleExceptionsAndRebuildHttpConnections
    raise retry_args.exc
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 346, in MakeRequest
    check_response_func=check_response_func)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 396, in _MakeRequestNoRetry
    redirections=redirections, connection_type=connection_type)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/oauth2client/transport.py", line 169, in new_request
    redirections, connection_type)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/oauth2client/transport.py", line 169, in new_request
    redirections, connection_type)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/httplib2/__init__.py", line 1991, in request
    cachekey,
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/httplib2/__init__.py", line 1690, in _request
    content,
httplib2.RedirectMissingLocation: Redirected but the response is missing a Location: header.

This is the command I'm running:
python Database.py \
    --runner DataflowRunner \
    --project XXX \
    --staging_location gs://.../staging \
    --temp_location gs://.../temp \
    --template_location gs://.../Template \
    --requirements_file requirements.txt
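For context on what this flag triggers: with `--requirements_file`, Beam's stager shells out to pip to download source distributions into a local cache before uploading them to the staging bucket (the exact command shows up in the INFO log further down). Here is a sketch that builds that same command so the download step can be run by hand, assuming `requirements.txt` sits in the current directory:

```python
import os
import sys
import tempfile

# Beam's stager downloads sdists into <tempdir>/dataflow-requirements-cache,
# skipping files that are already there ("--exists-action i") and forcing
# source-only downloads ("--no-binary :all:").
cache = os.path.join(tempfile.gettempdir(), "dataflow-requirements-cache")

cmd = [
    sys.executable, "-m", "pip", "download",
    "--dest", cache,
    "-r", "requirements.txt",
    "--exists-action", "i",
    "--no-binary", ":all:",
]

print(" ".join(cmd))
# subprocess.check_call(cmd)  # uncomment to actually run it; needs network
```

Running the download manually separates "pip can't fetch the packages" failures from "the upload to GCS is broken" failures like the one in the traceback above.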

If I remove --requirements_file requirements.txt the upload completes, but the job then fails at run time because it can't find the packages.
  • I'm using Cloud Storage to list all the files in the bucket, so if you have another solution that doesn't involve Cloud Storage, that would also be appreciated.

  • This is my dataflow-requirements-cache folder. Before cleaning it I had multiple different versions of some files, for example
    botocore-1.16.16.tar.gz
    botocore-1.16.17.tar.gz
    botocore-1.16.18.tar.gz
    

    After I cleaned it, it looks like this (it still fails while trying to upload numpy):
    numpy-1.18.4.zip
    urllib3-1.25.9.tar.gz
    smart_open-2.0.0.tar.gz
    six-1.15.0.tar.gz
    setuptools-47.1.0.zip
    s3transfer-0.3.3.tar.gz
    rsa-4.0.tar.gz
    requests-2.23.0.tar.gz
    pytz-2020.1.tar.gz
    python-dateutil-2.8.1.tar.gz
    pyasn1-modules-0.2.8.tar.gz
    pyasn1-0.4.8.tar.gz
    protobuf-3.12.2.tar.gz
    pandas-1.0.3.tar.gz
    jmespath-0.10.0.tar.gz
    idna-2.9.tar.gz
    googleapis-common-protos-1.51.0.tar.gz
    google-resumable-media-0.5.0.tar.gz
    google-cloud-storage-1.28.1.tar.gz
    google-cloud-core-1.3.0.tar.gz
    google-auth-1.15.0.tar.gz
    google-api-core-1.17.0.tar.gz
    docutils-0.15.2.tar.gz
    chardet-3.0.4.tar.gz
    certifi-2020.4.5.1.tar.gz
    cachetools-4.1.0.tar.gz
    botocore-1.16.18.tar.gz
    boto3-1.13.18.tar.gz
    boto-2.49.0.tar.gz
    

    ---- EDIT ----
    Full output:
    (airflow) afragotsis-mac:pandora_database afragotsis$ python PandoraDatabase.py \
    >     --runner DataflowRunner \
    >     --project XXX \
    >     --staging_location gs://.../dataflow-template/PandoraDatabase/staging \
    >     --temp_location gs://.../dataflow-template/PandoraDatabase/temp \
    >     --template_location gs://.../dataflow-template/PandoraDatabase/pandoraTemplate \
    >     --requirements_file requirements.txt \
    >     --save_main_session True
    WARNING:apache_beam.options.pipeline_options:--region not set; will default to us-central1. Future releases of Beam will require the user to set --region explicitly, or else have a default set via the gcloud tool. https://cloud.google.com/compute/docs/regions-zones
    INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
    INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
    INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/pipeline.pb...
    INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
    INFO:oauth2client.client:Refreshing access_token
    INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
    INFO:oauth2client.client:Refreshing access_token
    INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/pipeline.pb in 0 seconds.
    INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/requirements.txt...
    INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/requirements.txt in 0 seconds.
    INFO:apache_beam.runners.portability.stager:Executing command: ['/Users/afragotsis/opt/anaconda3/envs/airflow/bin/python', '-m', 'pip', 'download', '--dest', '/var/folders/zj/dqg766ks0cx663lg7brll7b80000gn/T/dataflow-requirements-cache', '-r', 'requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']
    INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/rsa-4.0.tar.gz...
    INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/rsa-4.0.tar.gz in 0 seconds.
    INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/urllib3-1.25.9.tar.gz...
    INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/urllib3-1.25.9.tar.gz in 0 seconds.
    INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/boto3-1.13.19.tar.gz...
    INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/boto3-1.13.19.tar.gz in 0 seconds.
    INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/pyasn1-modules-0.2.8.tar.gz...
    INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/pyasn1-modules-0.2.8.tar.gz in 0 seconds.
    INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/numpy-1.18.4.zip...
    Traceback (most recent call last):
      File "PandoraDatabase.py", line 125, in <module>
        run()
      File "PandoraDatabase.py", line 119, in run
        | 'Write CSV' >> beam.ParDo(WriteCSVFIle(pandora_options.output_bucket, pandora_options.output_folder))
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/pipeline.py", line 503, in __exit__
        self.run().wait_until_finish()
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/pipeline.py", line 483, in run
        self._options).run(False)
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/pipeline.py", line 496, in run
        return self.runner.run_pipeline(self, self._options)
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 548, in run_pipeline
        self.dataflow_client.create_job(self.job), self)
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 234, in wrapper
        return fun(*args, **kwargs)
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 624, in create_job
        self.create_job_description(job)
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 680, in create_job_description
        resources = self._stage_resources(job.options)
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 577, in _stage_resources
        staging_location=google_cloud_options.staging_location)
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/portability/stager.py", line 182, in stage_job_resources
        pkg, FileSystems.join(staging_location, os.path.basename(pkg)))
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 942, in stage_artifact
        local_path_to_artifact, artifact_name)
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 234, in wrapper
        return fun(*args, **kwargs)
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 564, in _gcs_file_copy
        self.stage_file(to_folder, to_name, f, total_size=total_size)
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 602, in stage_file
        response = self._storage_client.objects.Insert(request, upload=upload)
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1156, in Insert
        upload=upload, upload_config=upload_config)
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/base_api.py", line 715, in _RunMethod
        http_request, client=self.client)
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 908, in InitializeUpload
        return self.StreamInChunks()
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 1020, in StreamInChunks
        additional_headers=additional_headers)
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 957, in __StreamMedia
        response = send_func(self.stream.tell())
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 943, in CallSendChunk
        start, additional_headers=additional_headers)
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 1120, in __SendChunk
        return self.__SendMediaRequest(request, end)
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 1033, in __SendMediaRequest
        retries=self.num_retries, check_response_func=CheckResponse)
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 356, in MakeRequest
        max_retry_wait, total_wait_sec))
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 304, in HandleExceptionsAndRebuildHttpConnections
        raise retry_args.exc
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 346, in MakeRequest
        check_response_func=check_response_func)
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 396, in _MakeRequestNoRetry
        redirections=redirections, connection_type=connection_type)
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/oauth2client/transport.py", line 169, in new_request
        redirections, connection_type)
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/oauth2client/transport.py", line 169, in new_request
        redirections, connection_type)
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/httplib2/__init__.py", line 1991, in request
        cachekey,
      File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/httplib2/__init__.py", line 1690, in _request
        content,
    httplib2.RedirectMissingLocation: Redirected but the response is missing a Location: header.
    

    Full path of the dataflow-requirements-cache folder:
    /private/var/folders/zj/dqg766ks0cx663lg7brll7b80000gn/T/dataflow-requirements-cache
    

    It always fails while trying to upload numpy.
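Since the stager reuses whatever is already in that temp folder, it may be worth wiping the cache between runs programmatically rather than by hand. A minimal sketch (the cache location is the platform temp dir plus the folder name seen in the log; nothing here is specific to this pipeline):

```python
import os
import shutil
import tempfile

# Beam's stager downloads sdists into <tempdir>/dataflow-requirements-cache
# and reuses anything already there ("--exists-action i" in the log above).
cache_dir = os.path.join(tempfile.gettempdir(), "dataflow-requirements-cache")

# Remove the whole cache so the next run re-downloads exactly the versions
# pinned in requirements.txt instead of mixing in stale artifacts.
shutil.rmtree(cache_dir, ignore_errors=True)

print(os.path.exists(cache_dir))  # → False
```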

    Best Answer

    OK, so no matter what I tried, I couldn't get this to work with the requirements file. So I tried a setup file instead, and the command now looks like this:

    python Database.py \
    --runner DataflowRunner \
    --project XXX \
    --staging_location gs://.../staging \
    --temp_location gs://.../temp \
    --template_location gs://.../Template \
    --setup_file /Users/.../setup.py \
    --save_main_session True
    

    and the setup file is this:
    import setuptools
    
    REQUIRED_PACKAGES = [
        'google-cloud-storage==1.28.1',
        'pandas==1.0.3',
        'smart-open==2.0.0',
    ]
    
    PACKAGE_NAME = 'my_package'
    PACKAGE_VERSION = '0.0.1'
    
    setuptools.setup(
        name=PACKAGE_NAME,
        version=PACKAGE_VERSION,
        description='Example project',
        install_requires=REQUIRED_PACKAGES,
        packages=setuptools.find_packages(),
    )
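One note on this approach (my addition, not part of the original answer): setuptools.find_packages() only discovers directories that contain an __init__.py, which is a common reason workers still fail to import local modules even when --setup_file is passed. A quick sanity check in a throwaway directory, rather than the real project:

```python
import os
import tempfile

import setuptools

with tempfile.TemporaryDirectory() as root:
    pkg_dir = os.path.join(root, "my_package")
    os.makedirs(pkg_dir)

    # Without __init__.py the directory is invisible to find_packages():
    print(setuptools.find_packages(where=root))  # → []

    # Adding __init__.py makes it a proper package:
    open(os.path.join(pkg_dir, "__init__.py"), "w").close()
    print(setuptools.find_packages(where=root))  # → ['my_package']
```

If the second call still comes back empty for your real project layout, the workers won't see your code either, regardless of how the dependencies are staged.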
    

    Regarding "python - Dataflow fails when adding requirements.txt [Python]", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/62032382/
