我正在用Java编程,我有一个List<LogEntry> log
,它在不同线程之间共享。
这些“writer”线程已经在它们之间同步,因此一次只有一个线程可以在log
中添加或删除元素。
但是,由于我要实现的分布式算法,日志的某些部分是“安全的”,这意味着它们既不能由作者也不能由读者修改(我将在下面介绍)。 log
的这一部分由字段int committedIndex
指示,该字段被初始化为0并单调增加。
总之,作者修改了log
范围内(commitIndex,log.size())
中的元素,而读者则获得了log
范围内[0,commitIndex]
中的元素。读者开始从第一个条目开始读取,然后阅读下一个条目,直到到达log.get(commitIndex)
为止,然后停止并进入休眠状态,直到commitIndex
增加。它会更新一个字段lastApplied
,该字段被初始化为0,并单调递增,以便记住他在休眠前阅读的最后一个logEntry
。
如您所见,由于读取器和写入器访问log
的不同部分,因此无需同步读取器和写入器。
我的问题是:增加commitIndex
后,如何“唤醒”读者的线程?我需要这样的东西(由作家执行):
if(commitIndex is updated)
{
//wake up reader
}
和读者:
public void run() {
while(true){
//go to sleeep...
//now the reader is awaken!
while(lastApplied<commitIndex){
//do something with log.get(lastApplied)
lastApplied++;
}
}
显然,我非常简化了我的代码,以使您尽可能地理解我想要的东西,如果它不够清晰,我感到很抱歉(不要犹豫,向我询问任何有关此问题的信息)。谢谢!
最佳答案
使用共享的 LinkedBlockingQueue<Integer>
(在读者和所有作者之间),让每个作者向读者传达commitIndex
变量已被修改的信号:
作家:
if (commitIndex is updated) {
// wake up reader
this.queue.add(commitIndex);
}
读者:
public void run() {
while (true) {
// take() puts this thread to sleep until a writer calls add()
int commitIndex = this.queue.take();
// now the reader is awaken!
while (lastApplied < commitIndex) {
// do something with log.get(lastApplied)
lastApplied++;
}
}
}
在这里,我为读者和所有作者使用了
queue
属性,该属性应对应于LinkedBlockingQueue
的相同实例。注意:异常处理留作练习。
关于java - 许多作家一个读者没有并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28854953/