I am running an Apache Beam pipeline (deployed with Google Dataflow) that is orchestrated with Apache Airflow.
The DAG file looks like the following:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonVirtualenvOperator
import custom_py_file  # beam job in this file

default_args = {
    'owner': 'name',
    'depends_on_past': False,
    'start_date': datetime(2016, 1, 1),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=1),
}

CONNECTION_ID = 'proj'

with DAG('dag_pipeline', schedule_interval='@once', template_searchpath=['/home/airflow/gcs/dags/'],
         max_active_runs=15, catchup=True, default_args=default_args) as dag:

    lines = PythonVirtualenvOperator(
        task_id='lines',
        python_callable=custom_py_file.main,  # this file has a function main() where the beam job is declared
        requirements=['apache-beam[gcp]', 'pandas'],
        python_version=3,
        dag=dag
    )

    lines
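One detail worth noting about the operator above: PythonVirtualenvOperator ships only the callable's source into a freshly created virtualenv subprocess, so the function may not reference module-level names and must do all of its imports inside its own body, which is presumably why custom_py_file.main below imports everything at the top of the function. A minimal sketch of that constraint (the task_id and callable here are illustrative, not part of the original DAG):

from airflow.operators.python_operator import PythonVirtualenvOperator

def isolated_callable():
    # Imports must live inside the function: the operator serializes only
    # the function's source and runs it in a separate virtualenv process,
    # so module-level names from the DAG file are not available there.
    import apache_beam as beam
    print(beam.__version__)

# Hypothetical task, assuming this sits inside the `with DAG(...)` block above:
check_beam = PythonVirtualenvOperator(
    task_id='check_beam_version',
    python_callable=isolated_callable,
    requirements=['apache-beam[gcp]'],
    python_version=3,
)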
The Beam pipeline file (custom_py_file.py) looks like the following:
def main():
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    import argparse
    import logging
    import time

    class ETL(beam.DoFn):
        def process(self, row):
            # process data
            pass

    def run(argv=None):
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--input',
            dest='input',
            default='gs://bucket/input/input.txt',
            help='Input file to process.'
        )
        known_args, pipeline_args = parser.parse_known_args(argv)
        pipeline_args.extend([
            '--runner=DataflowRunner',
            '--project=proj',
            '--region=region',
            '--staging_location=gs://bucket/staging/',
            '--temp_location=gs://bucket/temp/',
            '--job_name=name-{}'.format(time.strftime("%Y%m%d%h%M%s").lower()),
            '--setup_file=/home/airflow/gcs/dags/setup.py',
            '--disk_size_gb=350',
            '--machine_type=n1-highmem-96',
            '--num_workers=24',
            '--autoscaling_algorithm=NONE'
        ])
        pipeline_options = PipelineOptions(pipeline_args)
        pipeline_options.view_as(SetupOptions).save_main_session = True

        with beam.Pipeline(options=pipeline_options) as p:
            rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
            etl = (rows | 'process data' >> beam.ParDo(ETL()))
            p.run().wait_until_finish()

    logging.getLogger().setLevel(logging.DEBUG)
    run()
I am using the PythonVirtualenvOperator because I cannot use Python 3 with the BashOperator on my current version of Airflow (version: 1.10.2-composer), and I need Python 3 to run this pipeline.
The problem is that even though the run succeeds, Airflow submits another Dataflow job. Note that this is not a retry, as the logs show it is all "one" task run. However, the Dataflow logs show it running the exact same job again after it has already run once successfully.
What is going on here? Is the successful Dataflow job not returning a 0 exit value? How do I get it to move on to the next task if it ran correctly? Thanks!
Accepted answer
The fact that it is not considered a retry, and that a second job executes after the first one finishes, made me suspect something similar to this. Checking your Python code, I see that you are calling both with beam.Pipeline() and p.run():

with beam.Pipeline(options=pipeline_options) as p:
    rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
    etl = (rows | 'process data' >> beam.ParDo(ETL()))
    p.run().wait_until_finish()
This triggers two back-to-back executions. Pick one of the following options, but not both at the same time:
Either:

with beam.Pipeline(options=pipeline_options) as p:
    rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
    etl = (rows | 'process data' >> beam.ParDo(ETL()))

Or:

p = beam.Pipeline(options=pipeline_options)
rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
etl = (rows | 'process data' >> beam.ParDo(ETL()))
p.run().wait_until_finish()
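The underlying reason is that Beam's Pipeline context manager already executes the pipeline: on exit, Pipeline.__exit__ calls run() and then wait_until_finish() on the result, so an explicit p.run() in the same function submits the very same job a second time. A minimal self-contained sketch of the corrected run() using only the context-manager form (the input path and DoFn body here are placeholders, not the asker's real logic):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class ETL(beam.DoFn):
    def process(self, row):
        yield row  # placeholder; the real transform logic goes here

def run(argv=None):
    pipeline_options = PipelineOptions(argv)
    # Pipeline.__exit__ calls run() and waits for the result, so the
    # with-block alone both submits the job and blocks until it finishes.
    with beam.Pipeline(options=pipeline_options) as p:
        rows = p | 'read rows' >> beam.io.ReadFromText('gs://bucket/input/input.txt')
        etl = rows | 'process data' >> beam.ParDo(ETL())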
Regarding "python - Dataflow pipeline runs successfully multiple times via PythonVirtualenvOperator in Airflow", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/58718045/