java - 使用 PriorityBlockingQueue 提供记录的对象进行处理

标签 java multithreading priority-queue

我有一个应用程序,它从多个序列化对象日志中读取对象并将它们交给另一个类进行处理。我的问题集中在如何高效、干净地读取对象并将其发送出去。

代码是从应用程序的旧版本中提取的,但我们最终保持原样。直到上周才真正使用它,但我最近开始更仔细地查看代码以尝试改进它。

打开N ObjectInputStream s,并从每个流中读取一个对象并将其存储在一个数组中(假设下面的 inputStreams 只是与每个日志文件对应的 ObjectInputStream 对象的数组):

for (int i = 0; i < logObjects.length; i++) {
    if (inputStreams[i] == null) {
        continue;
    }
    try {
        if (logObjects[i] == null) {
            logObjects[i] = (LogObject) inputStreams[i].readObject();
        }
    } catch (final InvalidClassException e) {
        LOGGER.warn("Invalid object read from " + logFileList.get(i).getAbsolutePath(), e);
    } catch (final EOFException e) {
        inputStreams[i] = null;
    }
}

序列化到文件的对象是 LogObject对象。这是LogObject类:

public class LogObject implements Serializable {

    private static final long serialVersionUID = -5686286252863178498L;

    private Object logObject;
    private long logTime;

    public LogObject(Object logObject) {
        this.logObject = logObject;
        this.logTime = System.currentTimeMillis();
    }

    public Object getLogObject() {
        return logObject;
    }

    public long getLogTime() {
        return logTime;
    }
}

一旦对象进入数组,它就会比较日志时间并发送最早时间的对象:

// handle the LogObject with the earliest log time
minTime = Long.MAX_VALUE;
for (int i = 0; i < logObjects.length; i++) {
    logObject = logObjects[i];
    if (logObject == null) {
        continue;
    }
    if (logObject.getLogTime() < minTime) {
        index = i;
        minTime = logObject.getLogTime();
    }
}

handler.handleOutput(logObjects[index].getLogObject());

我的第一个想法是为每个文件创建一个线程,读取对象并将其放入 PriorityBlockingQueue 中。 (使用自定义比较器,该比较器使用 LogObject 日志时间进行比较)。然后另一个线程可以取出这些值并将它们发送出去。

这里唯一的问题是,一个线程可以将一个对象放入队列中,并在另一个线程将一个可能较早时间的对象放入队列之前将其取出。这就是为什么在检查日志时间之前首先将对象读入并存储在数组中的原因。

这个约束是否会阻止我实现多线程设计?或者有什么方法可以调整我的解决方案以使其更加高效?

最佳答案

据我了解您的问题,您需要严格按顺序处理LogObjects。在这种情况下,您的代码的初始部分是完全正确的。这段代码的作用是merge sort多个输入流。您需要为每个流读取一个对象(这就是需要临时数组的原因),然后采用适当的(最小/最大)LogObject 并将句柄传递给处理器。

根据您的上下文,您也许可以在多个线程中进行处理。您唯一需要更改的是将 LogObjects 放入 ArrayBlockingQueue 中,并且处理器可能在多个独立线程上运行。另一种选择是发送 LogObjects 以在 ThreadPoolExecutor 中进行处理。最后一个选项更加简单明了。

但请注意其中的几个陷阱:

  • 为了使该算法正常工作,各个流必须已经排序。否则你的程序就会被破坏;
  • 当您进行并行处理时,严格来说消息处理顺序是没有定义的。因此所提出的算法仅保证消息处理开始顺序(调度顺序)。这可能不是你想要的。

所以现在你应该面临几个问题:

  1. 确实需要处理订单吗?
  2. 如果是,是否需要全局顺序(针对所有消息)或本地顺序(针对独立的消息组)?

这些问题的答案将对您进行并行处理的能力产生重大影响。

如果第一个问题的答案是,遗憾的是,并行处理不是一个选项。

关于java - 使用 PriorityBlockingQueue 提供记录的对象进行处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33160082/

相关文章:

java - 什么是NullPointerException,我该如何解决?

java - Spring Boot Thymeleaf 字符编码为 UTF-8

java - 如何在java中使用同一个套接字连接进行多个http请求?

windows - 如何在Windows中找到特定线程的入口点?

c++ - 反转优先级队列中元素的顺序

java - elasticsearch 和 searchguard ssl 的 keystore 和信任库问题

java - 使用 synchronized 顺序执行线程

ios - Swift - 在 vi​​ewDidLoad() 中完成 http 请求之前查看负载

java - Java 中具有基于级别的通知的优先级队列

haskell - 如何实现一个最优的、纯功能性的、双端优先级队列?