Java线程间通信不传递消息对象

标签 java multithreading listener communication

为了在线程之间进行通信,我遵循了Oracle Guarded Blocks example ,它很容易编译并运行。我的架构略有不同,因为我的消费者生成了生产者任务,尽管我在示例中尝试了这种变体并且它工作得很好。

我的主程序中的相关代码;

public static void main(String[] args) {
...
    FrameMsg frameMsg = new FrameMsg();
    AwarenessAnalytics awarenessAnalytic = new AwarenessAnalytics(frameMsg);
    awarenessAnalytic.start();

来自消费者线程的相关代码;

public class AwarenessAnalytics extends Thread implements MotionEventListener{
    FrameMsg frameMsg;
    FrameWithMotionDetection frameWithMotionDetection;

      public AwarenessAnalytics(FrameMsg frameMsg) {
        this.frameMsg = frameMsg;
        System.out.println("AwarenessAnalytic frameMsg = " + this.frameMsg.hashCode());
        }
 AdvancedVideoAnalytics tempIntermediateVA;
 tempIntermediateVA = new AdvancedVideoAnalytics(frameMsg);

public void run() {

    tempIntermediateVA.start();

    while (true) {
        // TODO: create loop to process frames from each video stream
        frameWithMotionDetection = new FrameWithMotionDetection();
        // interthread message from AdvancedAnalytic
        System.out.println("Waiting for FrameMsg");
        frameWithMotionDetection = frameMsg.take();
        System.out.println("FrameMsg received");
}

生产者任务的相关代码;

public class AdvancedVideoAnalytics extends Thread {
  FrameMsg frameMsg;
  FrameWithMotionDetection frameWithMotionDetection;

public AdvancedVideoAnalytics (FrameMsg frameMsg) {
    this.frameMsg = frameMsg;
    System.out.println("AdvancedVideoAnalytic frameMsg = " + this.frameMsg.hashCode());
 }

// the run method includes;

// Send frame and any clusters detected
// as frameMsg
frameWithMotionDetection = new FrameWithMotionDetection();

frameWithMotionDetection.setMotionData(contourAnalysisResults);

frameWithMotionDetection.setCurrentFrame(frameToExamine);
System.out.println("Preparing to send message to AwarenessAnalytics thread");
frameMsg.put(frameWithMotionDetection);

FrameMsg 类;

public class FrameMsg {
// Message sent from video stream monitors to analytic fusion engine

private FrameWithMotionDetection frameWithMotionData;

//private String message;
// True if consumer should wait
// for producer to send message,
// false if producer should wait for
// consumer to retrieve message.
private boolean empty = true;

public synchronized FrameWithMotionDetection take() {
    // Wait until message is
    // available.
    System.out.println("Getting ready to take frameWithMotionData");
    while (empty) {
        try {
            wait(10);
            System.out.println("Waiting to take frameWithMotionData because empty = true");
        } catch (InterruptedException e) {}
    }
    // Toggle status.
    empty = true;
    System.out.println("Successfully took frameWithMotionData, empty = " + empty);
    // Notify producer that
    // status has changed.
    notifyAll();
    return frameWithMotionData;
}

public synchronized void put(FrameWithMotionDetection frameWithMotionData) {
    // Wait until message has
    // been retrieved.
    System.out.println("Getting ready to put frameWithMotionData");
    while (!empty) {
        try { 
            System.out.println("Waiting to put frameWithMotionData because empty = false");
            wait();
        } catch (InterruptedException e) {}
    }
    // Toggle status.
    empty = false;
    // Store message.
    this.frameWithMotionData = frameWithMotionData;
    System.out.println("Successfully put frameWithMotionData, empty = " + empty);
    // Notify consumer that status
    // has changed.
    notifyAll();
}

}

有趣的是,所有的frameMsg对象id都是相同的,我能够“放置”一个frameMsg并将生产者的空设置为false。然而,消费者看到的frameMsg对象总是为空返回'true'。

输出摘录如下所示;

VideoAnalyticsUnitTest frameMsg = 1704856573
AwarenessAnalytic frameMsg = 1704856573
AdvancedVideoAnalytic frameMsg = 1704856573

Waiting to take frameWithMotionData because empty = true
Waiting to take frameWithMotionData because empty = true
(many of these)...
Preparing to send message to AwarenessAnalytics thread
Getting ready to put frameWithMotionData
Successfully put frameWithMotionData, empty = false
Waiting to take frameWithMotionData because empty = true
Preparing to send message to AwarenessAnalytics thread
Getting ready to put frameWithMotionData
Waiting to put frameWithMotionData because empty = false
Waiting to take frameWithMotionData because empty = true
Waiting to take frameWithMotionData because empty = true
Waiting to take frameWithMotionData because empty = true

它会像最后三行一样继续,直到我终止程序。

我很困惑,因为; 1.我按照这个例子 2. 对象ID匹配

然而,消费者永远不会看到非空的frameMsg(这是一个复杂的对象)。

我错过了一些明显的事情吗?

我最初使用监听器发送消息,但我不希望巨大的应用程序占用监听器空间。现在阅读更多评论,似乎我可以使用监听器并使用阻塞队列将消息传递给消费者的运行部分。

如果是您,您会采用上述通信方法,还是恢复为具有阻塞队列的监听器?

最佳答案

正如@Bhargav Modi 指出的那样,代码遇到了编写多线程应用程序的更精细问题(同步块(synchronized block)与方法,在关键变量上使用 volatile 声明)。这些问题在测试过程中经常被遗漏,因为出现这些问题需要一定的机会(最臭名昭著的问题之一是 double checked locking )。

这是使用 Java concurrent 的充分理由类:编写非线程安全或存在多线程问题的代码的可能性较小。在您的情况下,SynchronousQueue看起来是一个很好的替代品。使用 SynchronousQueue,无需使用 empty 变量、this.frameWithMotionData 变量或 wait/notifyAll 机制。

关于Java线程间通信不传递消息对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29089699/

相关文章:

java - 如何在使用 Java 读取和处理数据库记录时进行异步消息传递

java - 编译后的 Eclipse maven 在 Eclipse 中看不到目标/类文件夹

java - DatePicker:点击 Button 旁边的监听器 react

android - 在 android 中更改应用程序的监听器

java - py4J 在多线程java上获取新的通信 channel 时出错

database - 是否可以监听关系数据库更新?

java - <foreach> 下的 <param> 不支持嵌套文件集元素

java - 为什么 Guava 不为小型 ImmutableLists 使用专门的类?

java - 这段代码是线程安全单例设计模式的正确示例吗?

multithreading - 在一个内核中的Spark worker上启动多个处理器线程