python - 通过 Airflow 中的 PythonVirtualenvOperator 成功运行多次数据流管道

标签 python google-cloud-dataflow airflow apache-beam

我正在运行一个 Apache Beam 管道(与 Google Dataflow 一起部署),该管道是通过 Apache Airflow 进行编排的。

DAG 文件如下所示:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonVirtualenvOperator

import custom_py_file #beam job in this file 


default_args = {
    'owner': 'name',
    'depends_on_past': False,
    'start_date': datetime(2016, 1, 1),
    'email': ['<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="92f7fff3fbfed2f5fff3fbfebcf1fdff" rel="noreferrer noopener nofollow">[email protected]</a>'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=1),
}

CONNECTION_ID = 'proj'

with DAG('dag_pipeline', schedule_interval='@once', template_searchpath=['/home/airflow/gcs/dags/'], max_active_runs=15, catchup=True, default_args=default_args) as dag:


    lines = PythonVirtualenvOperator(
        task_id='lines',
        python_callable=custom_py_file.main, #this file has a function main() where the beam job is declared 
        requirements=['apache-beam[gcp]', 'pandas'],
        python_version=3,
        dag=dag
    )

lines

梁管道文件(custom_py_file.py)如下:

def main():
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    import argparse
    import time


    class ETL(beam.DoFn):
        def process(self, row):
            #process data 

    def run(argv=None):
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--input',
            dest='input',
            default='gs://bucket/input/input.txt',
            help='Input file to process.'
            )
        known_args, pipeline_args = parser.parse_known_args(argv)
        pipeline_args.extend([
              '--runner=DataflowRunner',
              '--project=proj',
              '--region=region',
              '--staging_location=gs://bucket/staging/',
              '--temp_location=gs://bucket/temp/',
              '--job_name=name-{}'.format(time.strftime("%Y%m%d%h%M%s").lower()),
              '--setup_file=/home/airflow/gcs/dags/setup.py',
              '--disk_size_gb=350',
              '--machine_type=n1-highmem-96',
              '--num_workers=24',
              '--autoscaling_algorithm=NONE'
              ]) 

        pipeline_options = PipelineOptions(pipeline_args)
        pipeline_options.view_as(SetupOptions).save_main_session = True

        with beam.Pipeline(options=pipeline_options) as p:
            rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
            etl = (rows | 'process data' >> beam.ParDo(ETL()))

        p.run().wait_until_finish()

    logging.getLogger().setLevel(logging.DEBUG)
    run()

我正在使用 PythonVirtualenvOperator因为我无法将 Python3 和 BashOperator 与我当前版本的 Airflow (版本:1.10.2-composer)一起使用,并且我需要 Python3 来运行此管道。

问题是,尽管运行成功,Airflow 仍提交了另一个数据流作业。请注意,这不是重试,因为日志显示这都是“一个”任务运行。但是,数据流日志显示它在成功运行一次后再次运行完全相同的作业。

dataflow logs

这是怎么回事?成功的数据流作业是否未输出 0 值?如果运行正确,如何让它继续执行下一个任务?谢谢!

最佳答案

事实上,它不被视为重试,并且第一个作业结束后会执行一个作业,这一事实让我怀疑类似于 this 的内容。 。检查您的 Python 代码,我发现您同时调用了 with beam.Pipeline()p.run():

with beam.Pipeline(options=pipeline_options) as p:
    rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
    etl = (rows | 'process data' >> beam.ParDo(ETL()))

p.run().wait_until_finish()

这将触发两次连续的执行。您可以选择其中一个选项(但不能同时选择两个选项):

with beam.Pipeline(options=pipeline_options) as p:
    rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
    etl = (rows | 'process data' >> beam.ParDo(ETL()))

p = beam.Pipeline(options=pipeline_options)

rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
etl = (rows | 'process data' >> beam.ParDo(ETL()))

p.run().wait_until_finish()

关于python - 通过 Airflow 中的 PythonVirtualenvOperator 成功运行多次数据流管道,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58718045/

相关文章:

python - 将 n 个可区分的元素放入 k 个不可区分的盒子中

python - 使用 Pandas 将 JSON 转换为多个 DataFrame

python - pyglet:按下按钮时更改 Sprite 实例的图像

java - Apache Beam 中的嵌套前 N 个

python - 如何在虚拟环境中运行 Airflow PythonOperator

mysql - 如何在本地将 Airflow 连接到 SQLite?

Python如何在函数内部使用(函数F1内的变量)(F2在F1内)

java - 如何创建在固定时间间隔内触发一次且仅触发一次的流式Beam管道

google-cloud-dataflow - GroupByKey 转换的早期结果

redis - 如何在不使用mysql的情况下在airflow中使用CeleryExecutor