鉴于 BigQuery 中的下表:
具有以下 5 个值:
还有一个简单的 ParDo 来读取它并打印类型:
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
public class FloatBug {
public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("<project_id>");
options.setWorkerMachineType("n1-standard-1");
options.setZone("us-central1-a");
options.setStagingLocation("<gcs_bucket>");
options.setNumWorkers(1);
options.setMaxNumWorkers(1);
options.setAutoscalingAlgorithm(DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.NONE);
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(BigQueryIO.Read.from("FLOAT_BUG.float_bug")).apply(ParDo.of(new DoFn<TableRow, TableRow>() {
@Override
public void processElement(ProcessContext c) throws Exception {
Object o = c.element().get("VHH");
if (o instanceof Double) {
System.out.println("Awesome. Got expected Double: " + o);
} else if (o instanceof Integer) {
System.out.println("Bummer. Got an Integer: " + o);
} else {
assert false;
}
}
}));
pipeline.run();
}
}
本地运行会为每个值返回一个 Double。这就是我所期望的:
Awesome. Got expected Double: 2.0
Awesome. Got expected Double: 2.245
Awesome. Got expected Double: 1.773
Awesome. Got expected Double: 4.567
Awesome. Got expected Double: 1.342
但是,使用 Dataflow 服务在云中运行会返回值 2.0
的 Integer
:
Awesome. Got expected Double: 2.245
Awesome. Got expected Double: 1.342
Awesome. Got expected Double: 1.773
Awesome. Got expected Double: 4.567
Bummer. Got an Integer: 2
它应该返回一个 Double
,而不是 2.0 的 Integer
最佳答案
观察结果是正确的。从 BigQuery 读取输入的管道可能会输出与 BigQuery 架构中的基础数据类型不同类型的数据。正如所观察到的,不同元素的类型也可能有所不同。
这是一个不幸的后果,因为 Dataflow Service 首先将数据从 BigQuery 导出到 Google Cloud Storage 中的 JSON 编码文件,然后从这些文件中读取数据。显然,JSON 不保留类型。例如, float 2.0
将被编码为字符串 "2"
,在 Java 中将被读取为 Integer
。使用 DirectPipelineRunner
执行管道时不会发生这种情况,因为该运行程序直接从 BigQuery 读取。
现在,避免此类问题的最简单方法是通过 Java 中的 Number
抽象类。这是 Double
和 Integer
等类的父类(super class)。将结果解释为 Number
,然后对其调用 doubleValue()
方法应该是安全的。
也就是说,展望 future ,我预计这种行为会改变。确切的时间表尚不清楚,但数据流服务的行为应该很快就会与本地执行相匹配。通过 Number
类的解决方法应该是正确的。
关于google-bigquery - 数据流在本地返回正确的类型,但在云中执行时返回正确的类型,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33937607/