Java BlockingQueue 似乎在传输过程中损坏了数据

标签 java multithreading corruption blockingqueue

我有 n 个生产者线程通过 BlockingQueue 为 1 个消费者线程提供数据。我正在使用 .put 和 .take (后者当 .peek != null 时)。除了明显在传输过程中不可避免的数据损坏之外,这对于至少十几条消息来说工作正常。目前我只实例化一个生产者线程。

例如,生产者线程将识别一个矩形并设置对象值,然后通过该对象的获取的调试行显示这些值。损坏之前设置的值的示例;

22:13:36.797 [Thread-1] DEBUG a.i.AdvancedVideoAnalytics - ROI = {574, 88, 42x110}

然后,消费者获取消息,这里是一个调试行,其中提取的值与前一个线程中的方式完全相同。显示了“损坏”值集的示例;

22:13:36.887 [Thread-0] DEBUG a.i.AwarenessAnalytics - ROI = {574, -1, 42x89}

相关生产者代码;

FrameWithMotionDetection frameWithMotionDetection;
private final BlockingQueue<FrameWithMotionDetection> queue;
... 
frameWithMotionDetection = new FrameWithMotionDetection();
frameWithMotionDetection.setMotionData(contourAnalysisResults);
frameWithMotionDetection.setCurrentFrame(frameToExamine);
frameWithMotionDetection.setCamera(camera);
logger.debug("FrameWithMotionDetection.CameraID = {}", frameWithMotionDetection.getCamera().getCameraId());
System.out.println("Preparing to send message to AwarenessAnalytics thread");
try {
    queue.put(frameWithMotionDetection);
    }catch (InterruptedException ex) { 
       System.out.println("Exception in queue.put: " + ex );
    }

主应用程序线程生成消费者线程;

FrameWithMotionDetection frameWithMotionDetection = new FrameWithMotionDetection();
BlockingQueue<FrameWithMotionDetection> q = new ArrayBlockingQueue<FrameWithMotionDetection>(1024);
AwarenessAnalytics awarenessAnalytic = new AwarenessAnalytics(q);

相关消费者代码;

public AwarenessAnalytics(BlockingQueue<FrameWithMotionDetection> q) {
          this.queue = q;
}
...
FrameWithMotionDetection frameWithMotionDetection;
private final BlockingQueue<FrameWithMotionDetection> queue;
...
while (queue.peek() != null){
    frameWithMotionDetection = new FrameWithMotionDetection();
    try {

        frameWithMotionDetection = queue.take();
        frameWithMotionDetectionFromQueue.add(frameWithMotionDetection);
        framesToEvaluate = true;
        }catch (InterruptedException ex) { 
           logger.error("Exception in queue.take: {}", ex );
        }

    logger.debug("FrameMsg received");
    }

生产者线程(AdvancedVideoAnalytics)由消费者线程产生;

tempIntermediateVA = new AdvancedVideoAnalytics(queue);

鉴于大多数数据传输的成功性质,BlockingQueue 是潜在问题还是我应该寻找其他地方?

更新:

正在努力在通过 BlockingQueue 发送之前最终确定某些变量。这需要一个定义为的构造函数;

public FrameWithMotionDetection(
    ContourAnalysisResults motionData,
    Mat currentFrame,
    Camera camera) {
    this.motionData = motionData;
    this.currentFrame = currentFrame;
    this.camera = camera;
}

现在我正在努力定义一个构造函数,它允许我简单地从queue.take调用中实例化对象;

frameWithMotionDetection = new FrameWithMotionDetection(queue.take());

或者这是错误的做法吗?

更新2:在.take()之后直接插入调试语句,很明显问题不是BlockingQueue,因此将检查其他方面。感谢大家的帮助。

更新3:事实证明,我传递的复杂对象并未在使用者中实例化为新对象。我以为我创建了一个新实例,甚至将对象中的一些变量设置为最终的。一旦我停止在生产者线程中重置和重用复杂对象(现在每次都创建一个新对象),问题就消失了。有几个人非常乐于助人,并向@markspace 致敬。

最佳答案

如果没有所有代码,很难准确说出问题所在。但根据您提供给我们的信息,您正在为所有线程使用共享的 FrameWithMotionDetection 对象。

如果您在与 BlockingQueue 相同的级别和范围内定义 FrameWithMotionDetection,那么您就做错了。

在方法中定义FrameWithMotionDetection,并且不要让其转义该方法。

<小时/>

这当然与BlockingQueue无关。

关于Java BlockingQueue 似乎在传输过程中损坏了数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30312095/

相关文章:

java - 原型(prototype) spring bean 只给我一个类的实例

java - 通过包装 LinkedHashSet 实现 IdentityLinkedHashSet

python - 在 Django 中,如何为每个线程创建一个数据库连接

eclipse - 如何修复/重新创建 Eclipse 工作区和/或 Maven 缓存/存储库?

java - 程序的 MySQL boolean 输入未按预期工作

java - Spring Boot - 依赖项的初始化顺序

java - java中这个线程安全高效吗

c++ - 多线程 - 系统创建的附加线程?

复制 Char * 数组会损坏数据

c++ - wchar_t* 奇怪的行为