java - 在Spark Streaming应用程序中,如何在一行lines.foreachRDD()完成执行后执行lines.map()函数

标签 java spark-streaming

在 Spark Streaming 应用程序中,如何在lines.foreachRDD() block 执行完成后执行lines.map() 函数。我提供了一个我想要的最小示例:

public class Stackoverflow implements Serializable {

    public static List<DummyClass> list = null;

    public void init(String str) throws Exception {
        if (list == null) {
            synchronized (Stackoverflow.class) {
                if (list == null) {
                    list = new ArrayList<>();
                    for (int i = 0; i < 3; i++) {
                        list.add(new DummyClass());
                    }
                }
            }
        }
    }

    public JavaDStream<DataTuple> initFunction(JavaDStream<DataTuple> lines, final String str) throws Exception {

        lines.foreachRDD(
                new VoidFunction<JavaRDD<DataTuple>>() {
                    @Override
                    public void call(JavaRDD<DataTuple> dataTupleJavaRDD) throws Exception {
                        init(str);
                    }
                }
        );

        lines.map(new FinalTransformation(list));
        return lines;
    }

}

我希望lines.map()部分代码在其上面的部分之后执行,以便列表在执行时不为空。

最佳答案

不要运行 foreachrdd,而是运行一个映射,将输入更改为您想要的字符串和另一个映射。 Foreachrdd 并不是用于 rdd 的转换,这才是 map 的用途。

input.map(init).map(whatever)

关于java - 在Spark Streaming应用程序中,如何在一行lines.foreachRDD()完成执行后执行lines.map()函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37404390/

相关文章:

apache-spark - Apache Spark应用程序部署最佳实践

java - 是什么导致 Spark Kafka 连接器中出现 "unknown resolver null"?

hadoop - Spark 流 : HDFS

apache-spark - 是否有一个变量来识别 Spark 流中的每批数据?

hadoop - 动态加载文件时的 Spark Streaming 和 Data Locality

java - 当主线程仍在运行时,线程如何通知主线程

java - 指南:注入(inject)与接线

java - 如何在命令行上将系统属性传递给独立的 Java 应用程序

java - Log4j Fileappender 不会在它应该创建的位置创建文件

java - Tika 返回空字符串