我正在用 Java 为 Spark 1.6.0 编写一个程序(因此,请不要在您的答案中提供 Scala 或 Python 代码),这是我想要实现的代码:
double i = 0d;
JavaRDD<Vector> ideas = objects.map(
new Function<BSONObject, Vector>()
{
@Override public Vector call(final BSONObject t) throws Exception
{
double[] xy = new double[2];
xy[0] = i++;
xy[1] = ((Long)((Date)t.get("date")).toInstant().toEpochMilli()).doubleValue();
return Vectors.dense(xy);
}
}
);
但 NetBeans 显示错误:“从内部类引用的局部变量必须是最终的或实际上是最终的”。
我还尝试使用 Spark Accumulators,但是如果我从我定义的 call() 方法调用 value() 方法,则会在作业期间引发 SparkException,告诉我“任务不可序列化”,然后作业失败。
那么,我怎样才能实现我的目标呢?
如果我的英语不完美(这不是我的母语),并且我的问题可能显得很菜鸟,我提前表示歉意,但我在网上找不到任何解决方案。
最佳答案
即使编译成功,它也不会按您的预期工作。每个执行器都有自己的闭包内引用的变量副本,并且任何修改都是严格本地的,不会传播回原始源。 Spark支持可写accumulators可以按如下方式使用:
Accumulator<Double> accum = sc.accumulator(0d);
objects.map(
...
accum.add(1d)
...
)
但是当在转换内部使用时,它们提供了非常弱的保证(称为至少一次),并且正如您已经意识到的那样,从工作人员的角度来看,它们是只写的。
关于你的代码,看起来你需要的只是 zipWithIndex
:
objects.zipWithIndex().map(...)
关于java - Java Spark 中 "for"循环中某种类型的计数器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35851978/