java - 如何使用 Hazelcast 制作多线程应用程序

标签 java multithreading queue pool hazelcast

我正在尝试为我的毕业项目开发一个项目。我正在使用免费版本的 Hazelcast,因此无法向支持人员寻求帮助。

我编写了一个在单台计算机上运行的java应用程序。我使用 LinkedList 作为队列,并有一个包含 5 个工作线程的池。工作线程只是从队列中取出一个作业并执行它。

作业代码:

package com.stackoverflow.multithread.app;

import java.util.Date;

/**
 * Job
 */
public class Job implements Runnable {

    private final Object req;
    private final long createTime = new Date().getTime();

    public Job(Object req) {
        this.req = req;
    }

    public boolean isPoison() {
        return req == null;
    }

    public long getWaitTime(){
        return new Date().getTime() - createTime;
    }

    @Override
    public void run() {
        try {
             //Do the job
        } catch (Exception e){
            e.printStackTrace();
        }
    }
}

工作队列代码

package com.stackoverflow.multithread.app;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

/**
 * WorkQueue
 */
public class WorkQueue {

    private static int minThreads;
    private static int maxThreads;
    private static final List<PoolWorker> threads = new ArrayList<PoolWorker>();
    private static final LinkedList queue = new LinkedList();
    private static WorkQueue instance = null;

    /**
     * WorkQueue
     */
    protected WorkQueue() {
        minThreads = 1;
        maxThreads = 5;

        for (int i = 0; i < minThreads; i++) {
            PoolWorker worker = new PoolWorker();
            threads.add(worker);
            worker.start();
        }
    }

    /**
     * getInstance
     *
     * @return Singleton WorkQueue instance
     */
    public static WorkQueue getInstance() {
        if (instance == null) {
            instance = new WorkQueue();
        }
        return instance;
    }

    /**
     * clone
     *
     * @return null
     * @throws CloneNotSupportedException: Singleton class can not be cloned.
     */
    @Override
    public WorkQueue clone() throws CloneNotSupportedException {
        throw new CloneNotSupportedException("Singleton class can not be cloned.");
    }

    public void execute(Job r) {
        synchronized (queue) {
            queue.addLast(r);
            manageWorkers();
            queue.notify();
        }
    }

    private void manageWorkers(){
        while ((queue.size() / 2 > threads.size() || (queue.size() > 0 && ((Job)queue.peekFirst()).getWaitTime() > 1000)) && threads.size() < maxThreads){
            PoolWorker worker = new PoolWorker();
            threads.add(worker);
            worker.start();
        }
        if (queue.size() < threads.size() && threads.size() > minThreads){
            execute(new Job(null)); //poison
        }
    }

    private class PoolWorker extends Thread {

        @Override
        public void run() {
            Job r;

            while (true) {
                synchronized (queue) {
                    while (queue.isEmpty()) {
                        try {
                            queue.wait();
                        } catch (InterruptedException ignored) {
                        }
                    }

                    r = (Job) queue.removeFirst();
                    manageWorkers();
                    if (r.isPoison()) {
                        break;
                    }
                }
                // If we don't catch RuntimeException, 
                // the pool could leak threads
                try {
                    r.run();
                } catch (RuntimeException e) {
                    e.printStackTrace();
                } catch (ExceptionInInitializerError e){
                    e.printStackTrace();
                } catch (Exception e){
                    e.printStackTrace();
                } 
            }
            threads.remove(this);
        }
    }
}

现在我想在主动-主动集群上进行这项工作,并希望使用 HazelCast (v3.3),以确保每个和所有作业都应该执行一次,即使其中一台计算机出现故障。

我检查了ExecutorService,但它一个接一个地执行作业(单线程)。我找不到一个好的解决方案来做到这一点。人们提到 ParallelExecutorService 但它要么在此版本中不可用,要么不属于免费版本。

请注意,我不必使用 Hazelcast 来执行此操作。任何免费的解决方案对我来说都很好。

有什么建议吗?

最佳答案

为什么不使用 IExecutorService,它实现了 Executor(Service) 接口(interface)。因此,您可以将一个任务放入其中(确保它是可序列化的),集群中的任何成员都可以接受该任务来处理它。

无需搞乱您自己的线程池。

以下是您可以查看的示例集: https://github.com/hazelcast/hazelcast-code-samples/tree/master/distributed-executor

关于java - 如何使用 Hazelcast 制作多线程应用程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27378444/

相关文章:

c++ - C++ 中多线程的 join() 和 detach() 有什么不同?

java - 在一个全局事务的范围内使用 JTA 同时调用对不同数据源的少量查询

c - 使用循环链表实现Queue

java - Windows-1252 编码 - 显示不正确的字符

java - 使用大型地理数据集在 ELKI 上运行 OPTICS 集群

Java服务器客户端绑定(bind)错误

java - Thread.sleep(0) 和 Thread.yield() 语句是否等效?

objective-c - "Block"主线程 (dispatch_get_main_queue()) 和(或不)定期运行 currentRunLoop - 有什么区别?

performance - 如何减慢或设置 Kafka 流消费者的给定速度?

java - 在任何标准库中是否有帮助程序类实现对 boolean 集合的逻辑操作?