google-bigquery - 数据流在本地返回正确的类型,但在云中执行时返回正确的类型

标签 google-bigquery google-cloud-dataflow

鉴于 BigQuery 中的下表:

enter image description here

具有以下 5 个值:

enter image description here

还有一个简单的 ParDo 来读取它并打印类型:

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;

public class FloatBug {
    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
        options.setRunner(BlockingDataflowPipelineRunner.class);
        options.setProject("<project_id>");
        options.setWorkerMachineType("n1-standard-1");
        options.setZone("us-central1-a");
        options.setStagingLocation("<gcs_bucket>");
        options.setNumWorkers(1);
        options.setMaxNumWorkers(1);
        options.setAutoscalingAlgorithm(DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.NONE);
        Pipeline pipeline = Pipeline.create(options);

        pipeline.apply(BigQueryIO.Read.from("FLOAT_BUG.float_bug")).apply(ParDo.of(new DoFn<TableRow, TableRow>() {
            @Override
            public void processElement(ProcessContext c) throws Exception {
                Object o = c.element().get("VHH");
                if (o instanceof Double) {
                    System.out.println("Awesome. Got expected Double: " + o);
                } else if (o instanceof Integer) {
                    System.out.println("Bummer. Got an Integer: " + o);
                } else {
                    assert false;
                }
            }
        }));
        pipeline.run();
    }
}

本地运行会为每个值返回一个 Double。这就是我所期望的:

Awesome. Got expected Double: 2.0
Awesome. Got expected Double: 2.245
Awesome. Got expected Double: 1.773
Awesome. Got expected Double: 4.567
Awesome. Got expected Double: 1.342

但是,使用 Dataflow 服务在中运行会返回值 2.0Integer:

Awesome. Got expected Double: 2.245
Awesome. Got expected Double: 1.342
Awesome. Got expected Double: 1.773
Awesome. Got expected Double: 4.567
Bummer. Got an Integer: 2

它应该返回一个 Double,而不是 2.0 的 Integer

最佳答案

观察结果是正确的。从 BigQuery 读取输入的管道可能会输出与 BigQuery 架构中的基础数据类型不同类型的数据。正如所观察到的,不同元素的类型也可能有所不同。

这是一个不幸的后果,因为 Dataflow Service 首先将数据从 BigQuery 导出到 Google Cloud Storage 中的 JSON 编码文件,然后从这些文件中读取数据。显然,JSON 不保留类型。例如, float 2.0 将被编码为字符串 "2",在 Java 中将被读取为 Integer。使用 DirectPipelineRunner 执行管道时不会发生这种情况,因为该运行程序直接从 BigQuery 读取。

现在,避免此类问题的最简单方法是通过 Java 中的 Number 抽象类。这是 DoubleInteger 等类的父类(super class)。将结果解释为 Number ,然后对其调用 doubleValue() 方法应该是安全的。


也就是说,展望 future ,我预计这种行为会改变。确切的时间表尚不清楚,但数据流服务的行为应该很快就会与本地执行相匹配。通过 Number 类的解决方法应该是正确的。

关于google-bigquery - 数据流在本地返回正确的类型,但在云中执行时返回正确的类型,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33937607/

相关文章:

google-analytics - 来自 Google 分析的 BigQuery 设置

mysql - Bigquery : search multiple tables and aggregate with first_seen and last_seen

sql - 如何使用 SQL 在 BigQuery 中查询 BYTES 字段?

java - Apache-beam Bigquery .fromQuery ClassCastException

java - 有没有办法在 Beam 的 ParDo 转换中创建 SpecificRecord 列表以写入 Parquet 文件?

python-3.x - 在 Dataflow Python 中从 PubSub 读取 AVRO 消息

java - 在 Java SDK 中排序 BigQuery 结果

sql - Google Big Query SQL - 获取最新的列值

python - 在 Python 中从 Dataflow 连接到 CloudSQL

google-cloud-dataflow - 通过 Dataflow 从 Google Cloud Storage 读取大型 gzip JSON 文件到 BigQuery