java - Java Spark 中 "for"循环中某种类型的计数器

标签 java apache-spark

我正在用 Java 为 Spark 1.6.0 编写一个程序(因此,请不要在您的答案中提供 Scala 或 Python 代码),这是我想要实现的代码:

double i = 0d;
JavaRDD<Vector> ideas = objects.map(
        new Function<BSONObject, Vector>()
        {
            @Override public Vector call(final BSONObject t) throws Exception
            {
                double[] xy = new double[2];
                xy[0] = i++;
                xy[1] = ((Long)((Date)t.get("date")).toInstant().toEpochMilli()).doubleValue();
                return Vectors.dense(xy);                    
            }
        }
);

但 NetBeans 显示错误:“从内部类引用的局部变量必须是最终的或实际上是最终的”。

我还尝试使用 Spark Accumulators,但是如果我从我定义的 call() 方法调用 value() 方法,则会在作业期间引发 SparkException,告诉我“任务不可序列化”,然后作业失败。

那么,我怎样才能实现我的目标呢?

如果我的英语不完美(这不是我的母语),并且我的问题可能显得很菜鸟,我提前表示歉意,但我在网上找不到任何解决方案。

最佳答案

即使编译成功,它也不会按您的预期工作。每个执行器都有自己的闭包内引用的变量副本,并且任何修改都是严格本地的,不会传播回原始源。 Spark支持可写accumulators可以按如下方式使用:

Accumulator<Double> accum = sc.accumulator(0d);

objects.map(
  ...
  accum.add(1d)
  ...
)

但是当在转换内部使用时,它们提供了非常弱的保证(称为至少一次),并且正如您已经意识到的那样,从工作人员的角度来看,它们是只写的。

关于你的代码,看起来你需要的只是 zipWithIndex :

objects.zipWithIndex().map(...)

关于java - Java Spark 中 "for"循环中某种类型的计数器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35851978/

相关文章:

apache-spark - 如何指定saveAsTable保存文件的路径?

java - 我无法按照 udacity 的教程生成 SHA1 key

java - 为什么 Guava 不为小型 ImmutableLists 使用专门的类?

java - Apache Spark Streaming 中的非关联聚合

apache-spark - google cloud dataproc 上的 java.lang.UnsatisfiedLinkError : jep. Jep.init(Ljava/lang/ClassLoader;ZZ)

apache-spark - VectorUDT 用法

scala - 如何基于列子集在 Apache Spark 中实现 `except`?

java - Java卷积

java - java中焦点丢失的问题

java - 使用 xml 为按钮充气以设置样式