我想对多组输入值进行并行计算。我需要同步 calculate(a, b, inputIndex)
方法吗?
private static final String FORMULA = "(#{a} + #{b}) * (#{a} + #{b} * #{b} - #{a})";
private List<Pair<Integer, Integer>> input = Arrays.asList(
new ImmutablePair<>(1, 2),
new ImmutablePair<>(2, 2),
new ImmutablePair<>(3, 1),
new ImmutablePair<>(4, 2),
new ImmutablePair<>(1, 5)
);
private List<String> output = new ArrayList<>(Arrays.asList("", "", "", "", ""));
public void calculate() {
IntStream.range(0, input.size()).forEach(idx -> {
Pair<Integer, Integer> pair = input.get(idx);
Thread threadWrapper = new Thread(
() -> this.calculate(pair.getLeft(), pair.getRight(), idx)
);
threadWrapper.start();
});
try {
Thread.sleep(4000); // waiting for threads to finish execution just in case
System.out.println("Calculation result => " + output);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void calculate(Integer a, Integer b, int inputIndex) {
System.out.println("Thread with index " + inputIndex + " started calculation.");
Evaluator eval = new Evaluator();
eval.putVariable("a", a.toString());
eval.putVariable("b", b.toString());
try {
String result = eval.evaluate(FORMULA);
Thread.sleep(3000);
output.set(inputIndex, result);
System.out.println("Thread with index " + inputIndex + " done.");
} catch (EvaluationException | InterruptedException e) {
e.printStackTrace();
}
}
因为如果 calculate
方法的代码在 Runnable
的 run
方法中,我就不需要这样做了。 (而且我认为我在那里不需要同步集合,因为对于 input
我只通过索引获取数据,对于 output
我将元素放入特定位置)
最佳答案
重要的是要强调,尝试代码并获得正确的输出不足以证明程序的正确性,尤其是在涉及多线程时。在您的情况下,它可能会偶然工作,原因有两个:
您的代码中有调试输出语句,即
System.out.println(…)
,它引入了线程同步,如引用实现PrintStream
内部同步您的代码很简单,运行时间不够长,无法通过 JVM 进行深度优化
显然,如果您在生产环境中使用类似的代码,这两个原因可能都不存在。
为了获得正确的程序,即使将 calculate(Integer a, Integer b, int inputIndex)
更改为 synchronized
方法也是不够的。同步仅足以建立关于线程在同一对象上同步的happens-before关系。
您的启动方法 calculate()
不会在 this
实例上同步,它也不会执行足以建立 的任何其他操作happens-before 与计算线程的关系(比如调用Thread.join()
。它只调用Thread.sleep(4000)
,显然不会甚至保证其他线程在该时间内完成。此外,Java Language Specification states explicitly:
It is important to note that neither
Thread.sleep
norThread.yield
have any synchronization semantics. In particular, the compiler does not have to flush writes cached in registers out to shared memory before a call toThread.sleep
orThread.yield
, nor does the compiler have to reload values cached in registers after a call toThread.sleep
orThread.yield
.For example, in the following (broken) code fragment, assume that
this.done
is a non-volatile
boolean
field:while (!this.done) Thread.sleep(1000);
The compiler is free to read the field
this.done
just once, and reuse the cached value in each execution of the loop. This would mean that the loop would never terminate, even if another thread changed the value ofthis.done
.
请注意,示例中关于 this.done
的内容也适用于列表后备数组的数组元素。如果您不使用不可变的 String
实例,效果可能会更糟。
但是没有必要让整个方法同步
,只是数据交换必须是线程安全的。最干净的解决方案是使整个方法没有副作用,即将签名转为 String calculate(Integer a, Integer b)
并让方法返回结果而不是操作共享数据结构。如果该方法没有副作用,则不需要任何同步。
然后,调用者必须将结果值组装到一个 List
中,但由于您已经在使用 Stream API,因此此操作是免费的:
private static final String FORMULA = "(#{a} + #{b}) * (#{a} + #{b} * #{b} - #{a})";
private List<Pair<Integer, Integer>> input = Arrays.asList(
new ImmutablePair<>(1, 2),
new ImmutablePair<>(2, 2),
new ImmutablePair<>(3, 1),
new ImmutablePair<>(4, 2),
new ImmutablePair<>(1, 5)
);
public void calculate() {
List<String> output = input.parallelStream()
.map(pair -> this.calculate(pair.getLeft(), pair.getRight()))
.collect(Collectors.toList());
System.out.println("Calculation result => " + output);
}
private String calculate(Integer a, Integer b) {
System.out.println(Thread.currentThread()+" does calculation of ("+a+","+b+")");
Evaluator eval = new Evaluator();
eval.putVariable("a", a.toString());
eval.putVariable("b", b.toString());
try {
String result = eval.evaluate(FORMULA);
Thread.sleep(3000);
System.out.println(Thread.currentThread()+" with ("+a+","+b+") done.");
return result;
} catch (EvaluationException | InterruptedException e) {
throw new RuntimeException(e);
}
}
关于java - 线程运行方法内部使用的方法的同步,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44421378/