我想创建自定义累加器,但在使用它们时我感觉不安全,因为我现在只能在本地测试它们。
我的问题是:
在创建累加器时,不变性是“必须”还是“应该”?
虽然我现在找不到链接/引用,但我已经读到累加器只允许不可变对象(immutable对象)。 但是在spark的api(1.6)中,AccumulableParam和AccumulatorParam的addInPlace方法有如下描述: “将两个累加值合并在一起。允许修改并返回第一个值以提高效率(避免分配对象)。”
哪个是正确的?如果允许可变对象,如何使用它们安全地创建累加器?
比方说,我有一个带有一个字段的可变类,并让该字段成为一个整数数组。当我们有一个可变类时如何覆盖 addInPlace 方法?
我应该写(选项 1):
public MyClass addInPlace(MyClass c1, MyClass c2){
c1.update(c2); //Where int array of c1 is updated(let's say we add two arrays) and c1 itself is returned.
return c1;
}
或者我应该写(Option2):
public MyClass addInPlace(MyClass c1, MyClass c2){
return update2(c1,c2); //Where a new MyClass object is returned with an array(created by adding arrays of c1 and c2)
}
选项 2 似乎更安全,但需要额外分配。但是,上面引用自 API 表示允许修改以避免分配。
此外,如果我有一个对象数组(比方说 MyClass2)而不是整数数组,我应该克隆对象还是使用对象本身。 假设我想为 MyClass2 的 PriorityQueue 创建一个累加器(也许我应该为这个问题输入另一个条目?)。
我将不胜感激关于累加器/Spark 的任何答案和高级引用/文档,尤其是在 Java 中。
编辑:
感谢 zero323 的回答。
我希望我能找到让我困惑的链接,但现在一切都清楚了。 但是,我还有两个问题。
1) 我遇到了以下累加器实现来跟踪在日志文件中看到的浏览器类型的次数。您可以从 ( https://brosinski.com/post/extending-spark-accumulators/) 查看详细信息。
实现如下:
public class MapAccumulator implements AccumulatorParam<Map<String, Long>>, Serializable {
@Override
public Map<String, Long> addAccumulator(Map<String, Long> t1, Map<String, Long> t2) {
return mergeMap(t1, t2);
}
@Override
public Map<String, Long> addInPlace(Map<String, Long> r1, Map<String, Long> r2) {
return mergeMap(r1, r2);
}
@Override
public Map<String, Long> zero(final Map<String, Long> initialValue) {
return new HashMap<>();
}
private Map<String, Long> mergeMap( Map<String, Long> map1, Map<String, Long> map2) {
Map<String, Long> result = new HashMap<>(map1);
map2.forEach((k, v) -> result.merge(k, v, (a, b) -> a + b));
return result;
}
}
我的问题是:
为什么我们没有
map2.forEach((k, v) -> map1.merge(k, v, (a, b) -> a + b));
另外,假设我想要一个
Map<Integer, ArrayList<MyClass>> or ArrayList<ArrayList<MyClass>>
我可以有类似的东西吗(选项 1):
public ArrayList<ArrayList<MyClass>> addInPlace(ArrayList<ArrayList<MyClass>> a1, ArrayList<ArrayList<MyClass>> a2) {
//For now, assume that a1 and a2 have the same size
for(int i=0;i<a2.size();i++){
a1.get(i).addAll(a2.get(i))
}
return a1;
}
或者我应该写(选项 2):
public ArrayList<ArrayList<MyClass>> addInPlace(ArrayList<ArrayList<MyClass>> a1, ArrayList<ArrayList<MyClass>> a2) {
//For now, assume that a1 and a2 have the same size
ArrayList<ArrayList<MyClass>> result= new ArrayList<ArrayList<MyClass>>();
for(int i=0;i<a1.size();i++){
result.add(new ArrayList<MyClass>());
result.get(i).addAll(a1.get(i));
result.get(i).addAll(a2.get(i));
}
return result;
}
那么就蓄能器安全性而言,这两种选择之间有区别吗?
2) 说累加器不是线程安全的,是指一个 rdd 元素可以多次更新累加器吗?还是您的意思是在该过程中使用的对象可以由另一个线程从代码中的其他地方更改?
或者只有在将蓄电池运送给驱动程序时才会出现问题,如链接 zero323 shared ( https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulable.scala#L43 ) 中所写:
"如果这个 [[Accumulable]] 是内部的。内部 [[Accumulable]]s 将通过心跳报告给驱动程序。对于内部 [[Accumulable]]s,R
必须是线程安全,以便正确报告它们。”
对于冗长的条目,我深表歉意,但我希望它对社区也有帮助。
最佳答案
创建自定义累加器时是否需要不变性?不它不是。你已经发现 AccumulableParam.addAccumulator
和 AccumulableParam.addInPlace
明确允许修改第一个参数。如果你深入研究,你会发现这个场景实际上是在 AccumulatorSuite
中测试的。其中使用了以下参数:
new AccumulableParam[mutable.Set[A], A] {
def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = {
t1 ++= t2
t1
}
def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = {
t1 += t2
t1
}
def zero(t: mutable.Set[A]) : mutable.Set[A] = {
new mutable.HashSet[A]()
}
}
直觉上,由于每个任务都有自己的累加器并以顺序方式对分区进行操作,因此不应该出现可变性成为问题的情况。
尽管如此,as stated somewhere else累加器不是线程安全的。因此,您可能应该忘记将累加器与分区级别的并行处理相结合。
关于java - 自定义累加器的不变性是 "must"还是 "should"?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36188617/