java - BigQueryIO: Query configured via options, but "Value only available at runtime"

Tags: java google-cloud-dataflow apache-beam

Apache Beam 2.9.0

I've built a pipeline that pulls data from BigQuery and runs a series of transforms on it. The options carry the start date as a ValueProvider:

ValueProvider<String> getStartTime();

void setStartTime(ValueProvider<String> startTime);
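For reference, the two option methods above normally live in an interface that extends PipelineOptions. The sketch below assumes Beam 2.9.0's beam-sdks-java-core is on the classpath; the interface name StartTimeOptions, the wrapper class, the @Description text and the parse helper are hypothetical. It shows that a --startTime flag given on the command line comes back as a provider whose value can be read immediately, and guards get() with isAccessible().

```java
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;

public class StartTimeOptionsExample {

  // Hypothetical options interface; only getStartTime/setStartTime come from the question.
  // PipelineOptionsFactory requires options interfaces to be public.
  public interface StartTimeOptions extends PipelineOptions {
    @Description("Start of the range to read, as an ISO-8601 instant, e.g. 2019-02-24T00:00:00Z")
    ValueProvider<String> getStartTime();

    void setStartTime(ValueProvider<String> startTime);
  }

  // Parses command-line flags such as --startTime=... into the options interface.
  public static StartTimeOptions parse(String... args) {
    return PipelineOptionsFactory.fromArgs(args).as(StartTimeOptions.class);
  }

  public static void main(String[] args) {
    ValueProvider<String> startTime = parse(args).getStartTime();
    // Only call get() when the value is readable at this point.
    if (startTime.isAccessible()) {
      System.out.println("startTime = " + startTime.get());
    } else {
      System.out.println("startTime is not available at construction time");
    }
  }
}
```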

I then use BigQueryIO to pull the data (changed slightly to make it clearer what is going on):

BigQueryIO.read(
            (SerializableFunction<SchemaAndRecord, AggregatedRowRecord>)
                input -> new BigQueryParser().apply(input.getRecord()))
        .withoutValidation()
        .withTemplateCompatibility()
        .fromQuery(
            ValueProvider.NestedValueProvider.of(
                opts.getStartTime(),
                (SerializableFunction<String, String>)
                    input -> {
                      // java.time.Instant has no YEAR/MONTH/DAY fields, so view it in UTC first
                      ZonedDateTime date = Instant.parse(input).atZone(ZoneOffset.UTC);

                      return String.format(
                          <large SQL statement with a %s in it>,
                          String.format(
                              "%d_%d_%d",
                              date.getYear(),
                              date.getMonthValue(),
                              date.getDayOfMonth()));
                    }))
        .withCoder(<coder for AggregatedRowRecords>)
        .usingStandardSql()
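The translation function inside the NestedValueProvider can be exercised on its own, without Beam. Note that java.time.Instant does not support calendar fields such as ChronoField.YEAR (Instant.get throws UnsupportedTemporalTypeException for them), so this sketch converts the instant to a UTC ZonedDateTime first. SQL_TEMPLATE is a hypothetical stand-in for the elided SQL statement, and the class and method names are illustrative only.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

public class QueryTemplate {

  // Hypothetical stand-in for the elided "<large SQL statement with a %s in it>".
  static final String SQL_TEMPLATE = "SELECT * FROM `my_dataset.events_%s`";

  // Turns an ISO-8601 instant such as "2019-02-24T10:15:30Z" into "2019_2_24".
  static String dateSuffix(String startTime) {
    // Place the instant in UTC so year/month/day can be read.
    ZonedDateTime date = Instant.parse(startTime).atZone(ZoneOffset.UTC);
    // %d does not zero-pad, matching the question's format string.
    return String.format("%d_%d_%d", date.getYear(), date.getMonthValue(), date.getDayOfMonth());
  }

  // Same shape as the lambda in the question: the date suffix fills the %s in the template.
  static String buildQuery(String startTime) {
    return String.format(SQL_TEMPLATE, dateSuffix(startTime));
  }

  public static void main(String[] args) {
    // Prints: SELECT * FROM `my_dataset.events_2019_2_24`
    System.out.println(buildQuery("2019-02-24T10:15:30Z"));
  }
}
```

Because %d does not zero-pad, February becomes 2 rather than 02; if the real table suffixes are zero-padded, %02d would be needed for month and day.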

This is then added to the pipeline as usual (p.apply(<above>)).

Now I run it with:

--project=<project> \
--tempLocation=<directory> \
--stagingLocation=<directory> \
--network=dataflow \
--subnetwork=<subnetwork> \
--defaultWorkerLogLevel=DEBUG \
--appName=<name> \
--runner=DirectRunner

This causes the following error:

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=startTime, default=null}
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
        at <class>.main(<class>.java:<>)
Caused by: java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=startTime, default=null}
        at org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:228)
        at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryQuerySource.createBasicQueryConfig(BigQueryQuerySource.java:230)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryQuerySource.dryRunQueryIfNeeded(BigQueryQuerySource.java:175)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryQuerySource.getTableToExtract(BigQueryQuerySource.java:115)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase.extractFiles(BigQuerySourceBase.java:102)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead$2.processElement(BigQueryIO.java:783)

The NestedValueProvider usage comes from this example on setting up templates:

The user provides a substring for a BigQuery query, such as a specific date. The transform uses the substring to create the full query. Calling .get() returns the full query.
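The quoted pattern can be sketched with a StaticValueProvider standing in for the user's substring. This assumes Beam 2.9.0's beam-sdks-java-core on the classpath; the class name, method name and SQL text are hypothetical.

```java
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;

public class NestedQueryExample {

  // Wraps a provider of a date substring so that get() yields the full query.
  // The SQL text is a hypothetical placeholder.
  static ValueProvider<String> queryFor(ValueProvider<String> date) {
    return NestedValueProvider.of(
        date,
        (SerializableFunction<String, String>)
            d -> "SELECT * FROM `my_dataset.events` WHERE event_date = '" + d + "'");
  }

  public static void main(String[] args) {
    // StaticValueProvider stands in for a value that is already known.
    ValueProvider<String> query = queryFor(StaticValueProvider.of("2019-02-24"));
    // A NestedValueProvider is accessible only if the provider it wraps is;
    // the translation function runs when get() is called.
    if (query.isAccessible()) {
      System.out.println(query.get());
    }
  }
}
```

In the stack trace above, the same NestedValueProvider.get() call fails because the RuntimeValueProvider for startTime that it wraps throws.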

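However, removing the value provider logic doesn't seem to help. Removing the ValueProvider from the fromQuery part entirely works fine, but it defeats the purpose of being able to set the query via options.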

Best answer

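The exception explains the problem: Apache Beam first builds the pipeline and its classes, and only then starts running data through the pipeline. At that stage you cannot access the options; they are only metadata used to build the pipeline.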

The way around this is to create a ParDo function or PTransform that takes the options it needs as constructor arguments; its logic can then use them.

See the example (my use case; I ran into the same problem a few days ago):

Pipeline:

HistoryProcessingOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(HistoryProcessingOptions.class);
Pipeline pipeline = Pipeline.create(options);

pipeline.apply(SourceRead.of(
        options.getSourceBigQueryTable().get(),
        options.getSourceBigQueryDataset().get(),
        options.getSourceBigQueryProject().get(),
        options.getFromDate().get(),
        options.getToDate().get()));

The transform itself:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SourceRead extends PTransform<PBegin, PCollection<TableRow>> {

    private static final Logger logger = LoggerFactory.getLogger(SourceRead.class);

    // Plain values resolved from the options before the transform is built.
    private final String sourceBigQueryTable;
    private final String sourceBigQueryDataset;
    private final String sourceBigQueryProject;
    private final String fromDate;
    private final String toDate;

    public SourceRead(String sourceBigQueryTable, String sourceBigQueryDataset, String sourceBigQueryProject, String fromDate, String toDate) {
        this.sourceBigQueryTable = sourceBigQueryTable;
        this.sourceBigQueryDataset = sourceBigQueryDataset;
        this.sourceBigQueryProject = sourceBigQueryProject;
        this.fromDate = fromDate;
        this.toDate = toDate;
    }

    public static SourceRead of(String sourceBigQueryTable, String sourceBigQueryDataset, String sourceBigQueryProject, String fromDate, String toDate) {
        return new SourceRead(sourceBigQueryTable, sourceBigQueryDataset, sourceBigQueryProject, fromDate, toDate);
    }

    @Override
    public PCollection<TableRow> expand(PBegin input) {
        // Legacy SQL: TABLE_DATE_RANGE is not available in standard SQL.
        String query = "SELECT * FROM TABLE_DATE_RANGE([" + sourceBigQueryProject + ":" + sourceBigQueryDataset + "." + sourceBigQueryTable + "],"
                + "TIMESTAMP('" + fromDate + "'),"
                + "TIMESTAMP('" + toDate + "'))";
        logger.info("query is: " + query);
        return input.apply(BigQueryIO.readTableRows()
                .fromQuery(query));
    }
}
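The query string built in expand() can be factored into a pure helper and checked without running a pipeline. This is a JDK-only sketch; the class name LegacyDateRangeQuery, the method name build and the sample arguments are hypothetical, and the format string reproduces the concatenation in expand() exactly.

```java
public class LegacyDateRangeQuery {

  // Builds the same legacy-SQL TABLE_DATE_RANGE query string as SourceRead.expand().
  static String build(String project, String dataset, String tablePrefix,
                      String fromDate, String toDate) {
    return String.format(
        "SELECT * FROM TABLE_DATE_RANGE([%s:%s.%s],TIMESTAMP('%s'),TIMESTAMP('%s'))",
        project, dataset, tablePrefix, fromDate, toDate);
  }

  public static void main(String[] args) {
    // Prints: SELECT * FROM TABLE_DATE_RANGE([my-project:my_dataset.events_],TIMESTAMP('2019-01-01'),TIMESTAMP('2019-01-31'))
    System.out.println(build("my-project", "my_dataset", "events_", "2019-01-01", "2019-01-31"));
  }
}
```

BigQueryIO.readTableRows().fromQuery(String) runs the query as legacy SQL unless usingStandardSql() is called, which is why TABLE_DATE_RANGE works here. The option values are inserted without escaping, so this only suits trusted inputs.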

Original question on Stack Overflow: https://stackoverflow.com/questions/54848451/
