我有一个基本类型 int 的输入数组,我想使用多个线程处理该数组并将结果存储在相同类型和大小的输出数组中。以下代码在内存可见性方面正确吗?
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ArraySynchronization2
{
final int width = 100;
final int height = 100;
final int[][] img = new int[width][height];
volatile int[][] avg = new int[width][height];
public static void main(String[] args) throws InterruptedException, ExecutionException
{
new ArraySynchronization2().doJob();;
}
private void doJob() throws InterruptedException, ExecutionException
{
final int threadNo = 8;
ExecutorService pool = Executors.newFixedThreadPool(threadNo);
final CountDownLatch countDownLatch = new CountDownLatch(width - 2);
for (int x = 1; x < width - 1; x++)
{
final int col = x;
pool.execute(new Runnable()
{
public void run()
{
for (int y = 0; y < height; y++)
{
avg[col][y] = (img[col - 1][y] + img[col][y] + img[col + 1][y]) / 3;
}
// how can I make the writes to the data in avg[][] visible to other threads? is this ok?
avg = avg;
countDownLatch.countDown();
};
});
}
try
{
// Does this make any memory visibility guarantees?
countDownLatch.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
// can I read avg here, will the results be correct?
for (int x = 0; x < width; x++)
{
for (int y = 0; y < height; y++)
{
System.out.println(avg[x][y]);
}
}
pool.shutdown();
pool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
// now I know tasks are completed and results synchronized (after thread death), but what if I plan to reuse the pool?
}
}
我不想在 CountDownLatch 上同步。我想知道如何使输出数组的写入对其他线程可见。假设我有一个想要处理的数组(例如图像),我可以在多个单独的任务中执行此操作,将输入数组的 block 处理到输出数组中,写入之间没有相互依赖关系输出。所有计算完成后,我希望输出数组中的所有结果可供读取。我怎样才能实现这种行为?我知道可以通过使用submit和Future.get()而不是execute来实现,我想知道如何正确实现这样的低级机制?另请参阅代码附近的评论中提出的问题。
最佳答案
嗯,只是想知道你是否真的需要一个闩锁。数组本身是内存中的保留 block ,每个单元都是专用的内存地址。 (顺便说一句,将其标记为 volatile 只会将数组的引用标记为 volatile ,而不是数组的单元格,请参阅 here )。因此,仅当多个线程写入访问同一单元时,您才需要协调对单元的访问。
问题是,你真的在这样做吗?或者目标应该是:如果可能的话避免协调访问,因为这是有代价的。
在您的算法中,您对行进行操作,那么为什么不对行进行并行化,以便每个线程仅读取和计算整个数组的行段的值并忽略其他行?
即
- 线程 0 -> 第 0、8、15 行...
- 线程 1 -> 第 1、9、16 行...
- ...
基本上是这样的(尚未测试):
for (int n = 0; n < threadNo; n++) { //each n relates to a thread
pool.execute(new Runnable() {
public void run() {
for (int row = n; row < height; row += threadNo) { //proceed to the next row for the thread
for (int col = 1; col < width-1; col++) {
avg[col][row] = (img[col - 1][row] + img[col][row] + img[col + 1][row]) / 3;
}
}
};
});
}
因此它们可以对整个数组进行操作而无需同步。通过在关闭池后放置循环打印结果将确保所有计算线程都已完成,唯一需要等待的线程是主线程。
此方法的另一种替代方法是为每个线程创建一个大小为 100/ThreadNo
的 avg 数组,以便每个线程在数组上对其进行写操作,然后使用 合并数组>System.arraycopy()
到一个数组中。
如果您打算重用该池,则应使用 submit
而不是在 Futures 上执行并调用 get()
您可以从提交中获得。
Set<Future> futures = new HashSet<>();
for(int n = 0; ...) {
futures.add(pool.submit(new Runnable() {...}));
}
for(Future f : futures) {
f.get(); //blocks until the task is completed
}
如果您想读取数组的中间状态,如果可以接受单个单元格上不一致的数据,则可以直接读取它,或者使用 AtomicIntegerArray正如尼古拉斯·菲洛托建议的那样。
-- 编辑--
在编辑使用闩锁宽度而不是原始线程号以及所有讨论之后,我想添加几句话。
正如 @jameslarge 指出的,它是关于如何建立“发生在之前”的关系,或者如何保证操作 A(即写入)发生在操作 B(即读取)之前。因此两个线程之间的访问需要协调。有多种选择
- volatile 关键字 - 不适用于数组,因为它仅将引用标记为 volatile ,而不将值标记为 volatile
- 同步 - 悲观锁定(
synchronized
修饰符或语句) - CAS - 乐观锁定,被相当多的并发实现使用
然而,每个同步点(悲观或乐观)都会建立一个发生之前的关系。选择哪一种,取决于您的要求。
您想要实现的是主线程的读操作和工作线程的写操作之间的协调。如何实现取决于您和您的要求。 CountDownLatch 对作业总数进行倒计数是一种方法(顺便说一句,锁存器使用一个 volatile int
状态属性)。一个CyclicBarrier也可能是一个值得考虑的构造,特别是如果您想读取一致的中间状态。或者 future.get(),或者...
所有这些都归结为工作线程必须发出信号表示它们已完成写入,以便读取器线程可以开始读取。
但是请注意使用 sleep 而不是同步。 Sleep 并没有建立“happens before”关系,使用 sleep 进行同步是一种典型的并发 bug 模式。 IE。在最坏的情况下, sleep 会在任何工作完成之前执行。
关于java - 如何使对数组的写入对其他线程可见,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39202361/