java - 读取线程中的解析消息

标签 java multithreading parsing

如果标题有点模糊,请原谅我。我会尽力更好地解释我想要实现的目标。

有一个名为 parsebytes 的函数,它是我实现的外部接口(interface)的一部分。它需要一个字节数组和一个长度。这个特定程序中的所有解析都在单个线程上运行,因此我希望尽快从解析字节中获取数据,以便它可以返回以获取更多数据。我的伪代码方法是这样的: 创建一个外部运行的线程(ParserThreadClass)。 每次调用 parsebytes 时,通过循环遍历所有字节并执行 byteQueue.add(bytes[i]) 将字节放入 ParserThreadClass 中的队列中。这段代码被一个synchronized(byteQueue)包围 实际上,这应该释放解析字节以返回并获取更多数据。

当这种情况发生时,我的 ParserThreadClass 也在运行。这是run()函数中的代码

while (!shutdown) //while the thread is still running
    {
        synchronized (byteQueue) 
        {
            bytes.addAll(byteQueue);  //an arraylist
            byteQueue.clear();
        }

        parseMessage();   //this will take the bytes arraylist and build an xml message.
    }

我在这里效率太低了吗?如果是这样,有人可以告诉我应该如何解决这个问题吗?

最佳答案

这就是我之前尝试解决问题的方法。基本上,您有一个生产者线程,就像这里一样,它读取文件并将项目放入队列中。然后你有一个工作线程从队列中读取内容并处理它们。代码如下,但它看起来与您正在做的基本相同。我发现这几乎没有给我带来任何加速,因为相对于磁盘读取,我需要对每行进行的处理非常快。如果您必须执行的解析非常密集,或者 block 非常大,那么您可能会发现通过这种方式可以加快一些速度。但如果它非常小,就不要期望看到太多性能改进,因为该过程受 IO 限制。在这些情况下,您需要并行化磁盘访问,而这在单台计算机上实际上无法做到。

public static LinkedBlockingQueue<Pair<String, String>> mappings;
public static final Pair<String, String> end =
    new Pair<String, String>("END", "END");
public static AtomicBoolean done;
public static NpToEntityMapping mapping;
public static Set<String> attested_nps;
public static Set<Entity> possible_entities;

public static class ProducerThread implements Runnable {
    private File f;

    public ProducerThread(File f) {
        this.f = f;
    }

    public void run() {
        try {
            BufferedReader reader = new BufferedReader(new FileReader(f));
            String line;
            while ((line = reader.readLine()) != null) {
                String entities = reader.readLine();
                String np = line.trim();
                mappings.put(new Pair<String, String>(np, entities));
            }
            reader.close();
            for (int i=0; i<num_threads; i++) {
                mappings.put(end);
            }
        } catch (InterruptedException e) {
            System.out.println("Producer thread interrupted");
        } catch (IOException e) {
            System.out.println("Producer thread threw IOException");
        }
    }
}

public static class WorkerThread implements Runnable {
    private Dictionary dict;
    private EntityFactory factory;

    public WorkerThread(Dictionary dict, EntityFactory factory) {
        this.dict = dict;
        this.factory = factory;
    }

    public void run() {
        try {
            while (!done.get()) {
                Pair<String, String> np_ent = mappings.take();
                if (np_ent == end) {
                    done.set(false);
                    continue;
                }
                String entities = np_ent.getRight();
                String np = np_ent.getLeft().toLowerCase();
                if (attested_nps == null || attested_nps.contains(np)) {
                    int np_index = dict.getIndex(np);
                    HashSet<Entity> entity_set = new HashSet<Entity>();
                    for (String entity : entities.split(", ")) {
                        Entity e = factory.createEntity(entity.trim());
                        if (possible_entities != null) {
                            possible_entities.add(e);
                        }
                        entity_set.add(e);
                    }
                    mapping.put(np_index, entity_set);
                }
            }
        } catch (InterruptedException e) {
            System.out.println("Worker thread interrupted");
        }
    }
}

编辑:

以下是启动生产者线程和工作线程的主线程的代码:

    Thread producer = new Thread(new ProducerThread(f), "Producer");
    producer.start();
    ArrayList<Thread> workers = new ArrayList<Thread>();
    for (int i=0; i<num_threads; i++) {
        workers.add(new Thread(new WorkerThread(dict, factory), "Worker"));
    }
    for (Thread t : workers) {
        t.start();
    }
    try {
        producer.join();
        for (Thread t : workers) {
            t.join();
        }
    } catch (InterruptedException e) {
        System.out.println("Main thread interrupted...");
    }

将生产者线程中完成的工作直接在主线程中完成也应该没问题,从而无需在主代码中启动和加入另一个线程。不过,请务必在浏览文件之前启动工作线程,并在完成工作后加入它们。不过,我不确定这种方式和我这里的方式之间的性能差异。

关于java - 读取线程中的解析消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13456056/

相关文章:

java - 类有两个同名属性 "actionsList"

java - 更改特定像素周围 24 个像素的最佳方法是什么?

c# - WPF:使用计时器在线程中构建队列

c# - 无法解析 DNS(有时?)

C++如何读取文件并解析逗号分隔值

python迭代一个非常大的文件流

java - 在 Java 中清理错误的 XML

Java ThreadPool 重用可运行对象,而不是为每个任务创建新对象

c++ - 基于生产者-消费者的多线程图像处理

java - 在 Java 中解析日期格式