我现在正在学习并发性,并且我尝试编写一个程序,该程序应该在使用并发集合时演示先行发生关系。 如 java.concurrent 包中所述:
The methods of all classes in java.util.concurrent and its subpackages extend these guarantees to higher-level synchronization. In particular: Actions in a thread prior to placing an object into any concurrent collection happen-before actions subsequent to the access or removal of that element from the collection in another thread.
下课我写了:
import java.util.Deque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
public class HappensBefore {
static int checks = 0;
static int shouldNotHappen = 0;
static Deque<Integer> syncList = new LinkedBlockingDeque<>();
public static boolean varToHappenBefore = false; //this var must be false when new element added to collection
static AtomicBoolean flag = new AtomicBoolean();
static class SyncTask implements Runnable {
int localTemp = -1;
private final Thread t = new Thread(new Counter());
@Override
public void run() {
t.start();
while (syncList.isEmpty()) { //just skip
}
while (true) {
if (!Thread.interrupted()) {
if (flag.get()) {
int r = syncList.peekLast();
if (r != localTemp) {
if (varToHappenBefore) {
shouldNotHappen++;
}
varToHappenBefore = true;
localTemp = r;
checks++;
}
flag.set(false);
}
} else {
t.interrupt();
break;
}
}
}
}
static class Counter implements Runnable {
int ctr = 0;
@Override
public void run() {
while (!Thread.interrupted()) {
if (!flag.get()) {
flag.set(true);
varToHappenBefore = false;
syncList.add(ctr++);
}
}
}
}
public static void main(String[] args) throws InterruptedException {
SyncTask st = new SyncTask();
Thread s = new Thread(st);
s.start();
Thread.sleep(10000);//runtime ms
s.interrupt();
// s1.interrupt();
System.out.println("Elems times added: " + checks);
System.out
.println("Happens-before violated times: " + shouldNotHappen);
}
}
我所做的是启动线程 1,线程 1 将启动线程 2。 Thread1 检查最初设置为 false 的公共(public) boolean 值 varToHappenBefore。当 thread1 从集合中保存新元素时,它将此 boolean 值设置为 true。在下一个新元素上。如果此 boolean 值仍然为真,发生之前发生的违规,并且 shouldNotHappen 递增,则接收。
Thread1 检查并发集合是否有新元素,如果有,将其保存在临时变量中并递增总体计数器checks。 然后它切换原子 boolean 值让 thread2 添加新元素。在添加新元素之前,varToHappenBefore 设置为 false。由于原子 boolean 标志,thread2 不会在 thread1 之前运行代码。但是在线程 2 中切换标志是在添加元素和检查 varToHappenBefore 之前完成的,因为这两个操作(elem add 和 boolean toggle)是通过 atomic boolean 同步的。我确保 thread2 在 thread1 运行后只运行一次。如果 varToHappenBefore 发生在添加 elem 之前。到线程 2 中的集合,然后在线程 1 中读取(线程 1 检查 varToHappenBefore 仅当从集合中读取新元素时),然后 varToHappenBefore 在程序结束后必须保持为 0。 但我得到下一个结果:
元素添加次数:~10 000 000
happens-before 违规次数:0-10
可能我做错了什么,多线程是微妙而复杂的。希望得到您的帮助。
编辑:
我需要在 thread1 setFlag(false)
为真之后和获取 elem 之前逃避这种情况。从集合中,因为 thread2 然后可以在从集合中获取元素和在线程 1 中设置 varToHappenBefore = true;
之间工作。如果我制作 AtomicBoolean.compareAndSet()
检查 block ,那么我每 8mils 有 80k 次失败。而且它是可预测和明确的。但是,当我没有为 thread2 创建这样的窗口以在从集合读取和设置 boolean 值之间添加其他元素时,当 boolean 值为真并且新元素出现时,我仍然很少进行顺序读取。
最佳答案
您的示例分析和修复起来有点复杂。但是,如果您这样做只是为了了解并发收集情况下的 happens-before 契约,那么试试这个程序 -
public class HappensBeforeTest {
private static boolean producerStopped;
private static final Queue<String> queue = new LinkedBlockingQueue<>();
//private static final Queue<String> queue = new LinkedList<>();
public static void main(String[] args) {
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
}
private static class Producer implements Runnable {
public void run() {
int count = 0;
while (count++ < 10) {
producerStopped = (count == 10);
queue.add(String.valueOf(count));
}
System.out.println("Producer finished");
}
}
private static class Consumer implements Runnable {
public void run() {
while (!producerStopped) {
queue.poll();
}
System.out.println("Consumer finished");
}
}
当 queue
为 LinkedBlockingQueue
时,Producer
和 Consumer
线程都完成。
但是当 queue
是 LinkedList
时,有时 Consumer
没有完成并继续运行,因为 producerStopped
标志仍然存在对于 Consumer
可见为 false,即使 Producer
已将其更新为 true。
关于java - 并发收集先发生关系,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32428891/