java - 如何使用 Java 在 Dataflow Apache Beam 中将 PCollection<Row> 转换为整数

标签 java google-cloud-platform apache-beam dataflow

创建 pcollection

PCollection<Row> count = pt.apply(SqlTransform.query(Constants.total_count));
        PCollectionView<Long> outputCount = detail_count
                .apply("Row to long",
                        ParDo.of(new RowToLong())).apply(View.asSingleton());

查询

String total_count  = select sum(cast(col1 as INT)) as total_count from <table>

转换 RowToInteger 方法

public class RowToLong extends DoFn<Row, Integer> {
    public static final Logger LOG = LoggerFactory.getLogger(RowToLong.class.getName());    

    private PCollectionView<Integer> outputCount;      

    @ProcessElement
    public void processElement(ProcessContext context) {    
        

        Integer total_count= Long.valueOf(context.element().getInt64("total_count"));          
          context.output(total_count);    
    }

}

错误

java.lang.Integer cannot be cast to java.lang.Long

最佳答案

String total_count  = select sum(cast(col1 as bigint)) as total_count from <table>
Integer total_count = context.element().getInt64("total_count");

关于java - 如何使用 Java 在 Dataflow Apache Beam 中将 PCollection<Row> 转换为整数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71142185/

相关文章:

python-3.x - 数据流作业卡在从 Pub/Sub 读取

python - Apache Beam DataFlow 运行程序抛出设置错误

java - 使用 Jersey 的 JAX RS 的内存问题

java - 使用 Java 8 的 SonarQube 和 squidS2095

java - 我的关于Mysql和JAVA的代码有什么问题

python - 如何将谷歌云存储中的文件打开到云函数中

load-balancing - 如何使用 GCP 负载均衡器将所有 HTTP 请求重定向到 HTTPS

java - Spring的存储过程,解析结果Map

python-3.x - 从 Google Cloud Storage 中的 csv 读取 n 行以与 Python csv 模块一起使用

apache-beam - Apache Beam CloudBigtableIO 读/写错误处理