python - Airflow 异常: DataFlow failed with return code 1

标签 python jar google-cloud-dataflow airflow

我正在尝试通过 Airflow 脚本执行数据流jar。为此,我使用 DataFlowJavaOperator。在 param jar 中,我传递本地系统中存在的可执行 jar 文件的路径。但是当我尝试运行此作业时,出现错误:

{gcp_dataflow_hook.py:108} INFO - Start waiting for DataFlow process to complete.
[2017-09-12 16:59:38,225] {models.py:1417} ERROR - DataFlow failed with return code 1
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1374, in run
    result = task_copy.execute(context=context)
  File "/usr/lib/python2.7/site-packages/airflow/contrib/operators/dataflow_operator.py", line 116, in execute
    hook.start_java_dataflow(self.task_id, dataflow_options, self.jar)
  File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 146, in start_java_dataflow
    task_id, variables, dataflow, name, ["java", "-jar"])
  File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 138, in _start_dataflow
    _Dataflow(cmd).wait_for_done()
  File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 119, in wait_for_done
    self._proc.returncode))
Exception: DataFlow failed with return code 1`

我的 Airflow 脚本是:

from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from datetime import datetime, timedelta


default_args = {
'owner': 'airflow',
'start_date': datetime(2017, 03, 16),
'email': [<EmailID>],

'dataflow_default_options': {
        'project': '<ProjectId>',
       # 'zone': 'europe-west1-d', (i am not sure what should i pass here)
        'stagingLocation': 'gs://spark_3/staging/'
    }
 }

 dag = DAG('Dataflow',schedule_interval=timedelta(minutes=2), 
 default_args=default_args)

 dataflow1 = DataFlowJavaOperator(
 task_id='dataflow_example',
 jar ='/root/airflow_scripts/csvwriter.jar',
 gcp_conn_id  = 'GCP_smoke', 
 dag=dag)

我不确定我犯了什么错误,有人可以帮助我摆脱这个

注意:我通过打包所有外部依赖项来创建此 jar,同时选择选项作为可运行 JAR 文件。

最佳答案

问题出在我使用的 jar 上。在使用 jar 之前,请确保 jar 按预期执行。

示例: 如果您的 jar 是 dataflow_job1.jar,请使用

执行该 jar
java -jar dataflow_job_1.jar --parameters_if_any

一旦您的 jar 成功运行,请继续在 Airflow DataflowJavaOperator jar 中使用该 jar。

此外, 如果您遇到与编码器相关的错误,您可能必须让自己的编码器来执行代码。 例如,我遇到了 TableRow 类的问题,因为它没有默认编码器,因此我必须弥补这一点:

表格行编码器:

public class TableRowCoder extends Coder<TableRow> {
private static final long serialVersionUID = 1L;
private static final Coder<TableRow> tableRow = TableRowJsonCoder.of();
@Override
public void encode(TableRow value, OutputStream outStream) throws CoderException, IOException {
    tableRow.encode(value, outStream);

}
@Override
public TableRow decode(InputStream inStream) throws CoderException, IOException {
    return new TableRow().set("F1", tableRow.decode(inStream));
}
@Override
public List<? extends Coder<?>> getCoderArguments() {
    // TODO Auto-generated method stub
    return null;
}
@Override
public void verifyDeterministic() throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {


}
}

然后在您的代码中使用注册此编码器

pipeline.getCoderRegistry().registerCoderForClass(TableRow.class, new TableRowCoder())

如果仍然存在错误(与编码人员无关),请导航至:

*.jar\META-INF\services\FileSystemRegistrar 

并添加可能出现的任何依赖项。

例如,可能会出现以下暂存错误:

Unable to find registrar for gs

我必须添加以下行才能使其正常工作。

org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystemRegistrar

关于python - Airflow 异常: DataFlow failed with return code 1,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46175755/

相关文章:

python - 在 python 中的 CSV 文件的 2 列中写入 2 个列表

java - 如何在操作书中的 mahout 中运行示例

android - 在 JAR 文件中使用 R?

Python/Apache-光束 : How to Parse Text File To CSV?

查找未包含在另一个 python 列表中的 python 列表元素的 Pythonic 方法

python - 无法让 PyTest 在 VSCode 或终端中运行。未识别任何测试

python - GTK3:听主题变化

java - 如何在运行时加载 jar 文件

java - 如何为 Dataflow 作业指定多个输入路径

google-cloud-dataflow - 通过 Apache Beam 使用 ParquetIO 读写 parquet 文件的示例