java - 观察者-BlockingQueue

标签 java observer-pattern blockingqueue

我正在使用观察者模式和 BlockingQueue 来添加一些实例。现在在另一种方法中,我正在使用队列,但似乎 take() 永远在等待,即使我这样做:

/** {@inheritDoc} */
@Override
public void diffListener(final EDiff paramDiff, final IStructuralItem paramNewNode,
    final IStructuralItem paramOldNode, final DiffDepth paramDepth) {
    final Diff diff =
        new Diff(paramDiff, paramNewNode.getNodeKey(), paramOldNode.getNodeKey(), paramDepth);
    mDiffs.add(diff);
    try {
        mDiffQueue.put(diff);
    } catch (final InterruptedException e) {
        LOGWRAPPER.error(e.getMessage(), e);
    }
    mEntries++;

    if (mEntries == AFTER_COUNT_DIFFS) {
        try {
            mRunner.run(new PopulateDatabase(mDiffDatabase, mDiffs));
        } catch (final Exception e) {
            LOGWRAPPER.error(e.getMessage(), e);
        }
        mEntries = 0;
        mDiffs = new LinkedList<>();
    }
}

/** {@inheritDoc} */
@Override
public void diffDone() {
    try {
        mRunner.run(new PopulateDatabase(mDiffDatabase, mDiffs));
    } catch (final Exception e) {
        LOGWRAPPER.error(e.getMessage(), e);
    }
    mDone = true;
}

而 mDiffQueue 是一个 LinkedBlockingQueue,我这样使用它:

while (!(mDiffQueue.isEmpty() && mDone) || mDiffQueue.take().getDiff() == EDiff.INSERTED) {}

但我认为第一个表达式被检查,而 mDone 不为 true,那么也许 mDone 设置为 true (观察者总是多线程的?),但它已经在调用 mDiffQueue.take() 了? :-/

编辑:我现在真的不明白。我最近将其更改为:

synchronized (mDiffQueue) {
    while (!(mDiffQueue.isEmpty() && mDone)) {
        if (mDiffQueue.take().getDiff() != EDiff.INSERTED) {
            break;
        }
    }
}

如果我在调试器中等待一段时间,它就会工作,但它也应该“实时”工作,因为 mDone 被初始化为 false,因此 while 条件应该为 true 并且正文应该被执行。

如果 mDiffQueue 为空且 mDone 为 true,则应跳过 while 循环体(这意味着队列不再被填充)。

编辑:似乎是:

synchronized (mDiffQueue) {
    while (!(mDiffQueue.isEmpty() && mDone)) {
         if (mDiffQueue.peek() != null) {
             if (mDiffQueue.take().getDiff() != EDiff.INSERTED) {
                 break;
             }
         }
    }
}

尽管我不明白为什么 peek() 是强制性的。

编辑:

我正在做的是迭代一棵树,我想跳过所有插入的节点:

for (final AbsAxis axis = new DescendantAxis(paramRtx, true); axis.hasNext(); axis.next()) {
    skipInserts();
    final IStructuralItem node = paramRtx.getStructuralNode();
    if (node.hasFirstChild()) {
        depth++;
        skipInserts();
        ...

基本上计算树中的最大深度或级别,而不考虑在树的另一个版本中已删除的节点(用于比较旭日可视化),但是好吧,这可能超出了范围。只是为了说明我正在对尚未插入的节点执行某些操作,即使它只是调整最大深度。

问候,

约翰内斯

最佳答案

take() 是一个“阻塞调用”。这意味着它将阻塞(永远等待),直到队列中有东西为止,然后它将返回添加的内容。当然,如果队列中有东西,它会立即返回。

您可以使用 peek() 返回 take() 返回的内容 - 即 peek() 返回下一个项目,而不将其从队列中删除,或者如果队列中没有任何内容,则返回null。尝试在测试中使用 peek() (但也要检查 null)。

关于java - 观察者-BlockingQueue,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7333146/

相关文章:

cocoa - KVO问题 "Cannot remove an observer"

java - 表达式 user-> { some code } 怎么可能是一个 Observer 对象呢?

java - java中的双观察者?

java - JPA/hibernate java.lang.NoSuchMethodError

java - 在方法和类中使用泛型和接口(interface)有什么区别

java - 如何立即释放等待 BlockingQueue 的线程

java - ArrayBlockingQueue 有两个条件 : notFull and notEmpty. 如何理解这两个条件?它是一种锁 split ?

c++ - 阻塞队列实现 : Where's the race condition?

java - Java代码中亲和传播的实现

Java 异常 - TransactionRolledbackLocalException