java - java.util 上的并发任务。从 BlockingQueue 轮询的多线程问题

标签 java multithreading

我收到了一个关于java.util.concurrent包的任务。我几乎完全成功了,但是有一些错误或错误。当队列为空并且运算符(operator)等待 5 秒时,poll 方法应该检索 null 并将其传递给运算符(operator),然后运算符(operator)回家。但这并没有发生。它检索 null 但不将其传递给运算符。对不起我的英语。)

public class Client extends Thread {

public CountDownLatch latch=new CountDownLatch(1);

private boolean waiting;

private final Random random=new Random();


public boolean isWaiting() {
    return waiting;
}

public void setWaiting(boolean isWaiting) {
    this.waiting = isWaiting;
}

private static final Logger LOGGER;


static {
    LOGGER = Logger.getLogger(Client.class);
    new DOMConfigurator().doConfigure("log4j.xml",
            LogManager.getLoggerRepository());
    LOGGER.setLevel(Level.INFO);
}

private int limitTime=new Random().nextInt(5000);

public void run(){
    ClientQueue.enqueueClient(this);
    while(waiting){
        if (random.nextBoolean()){
            try {
                latch.await(5, TimeUnit.SECONDS);
                if (!waiting) return;
                ClientQueue.removeFromQueue(this);
                reportTiredToWait();
                sleep(random.nextInt(1000)+500);
                ClientQueue.enqueueClient(this);
                reportDecidedToCallAgain();
            } catch (InterruptedException e) {
                LOGGER.info("Exception");
            }
        }
    }
}

public Client(String name) {
    super(name);
    this.waiting=true;
}

private void reportTiredToWait(){
    LOGGER.info("Client "+getName()+" was tired to wait and decided to hang up");
}

private void reportDecidedToCallAgain(){
    LOGGER.info("Client "+getName()+" decided to call again");
}

@Override
public String toString() {
    return "Client "+getName();
}

}

    public class ClientQueue {

    private static final Logger LOGGER;


    static {
        LOGGER = Logger.getLogger(ClientQueue.class);
        new DOMConfigurator().doConfigure("log4j.xml",
                LogManager.getLoggerRepository());
        LOGGER.setLevel(Level.INFO);
    }

    private static ClientQueue instance;

    private BlockingQueue<Client> queue;

    public static void printQueue(){
        System.out.println("LIST OF CLIENTS:");
        for (Client client :ClientQueue.getInstance().queue){
            System.out.println("CLIENT "+client.getName());
        }
        System.out.println("END OF LIST OF CLIENTS:");

    }

    private static ClientQueue getInstance()
    {
        if ( instance == null )
        {
            instance = new ClientQueue();
        }
        return instance;
    }

    private ClientQueue()
    {
        this.queue = new LinkedBlockingQueue<Client>();
    }

    public static void enqueueClient(Client client){
        getInstance().queue.add(client);
        reportClientEnqueued(client.getName());
    }

    public static void removeFromQueue(Client client){
        ClientQueue.getInstance().queue.remove(client);
        reportClientDeletedFromQueue(client.getName());
    }

    public static Client pollFirst(long time, TimeUnit timeUnit) throws InterruptedException{
        Client client=null;
            client = getInstance().queue.poll(time, timeUnit);

        if (client!=null){
            reportClientRetrievedFromQueue(client.getName());
        }
        return client;
    }

    private static  void reportClientEnqueued(String name){
        LOGGER.info("Client "+name+" was put on the waiting list");
    }

    private static void reportClientDeletedFromQueue(String name){
        LOGGER.info("Client " +name+" was deleted from waiting list");
    }

    private static void reportClientRetrievedFromQueue(String name){
        LOGGER.info("Client " +name+" was retrieved from waiting list");
    }

}

    public class Operator extends Thread{

    private static final Logger LOGGER;

    static {
        LOGGER = Logger.getLogger(Operator.class);
        new DOMConfigurator().doConfigure("log4j.xml",
                LogManager.getLoggerRepository());
        LOGGER.setLevel(Level.INFO);
    }

    private boolean running;

    public Operator(String name){
        super(name);
        running= true;
    }

    @Override
    public void run() {
        while (running){
                Client client=null;
                try {
                    client = ClientQueue.pollFirst(5, TimeUnit.SECONDS);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                if (client!=null){
                    String clientName=client.getName();
                    reportOperatorReceivedCall(clientName);
                    try {
                        client.setWaiting(false);
                        client.latch.countDown();
                        sleep(10000);
                        reportOperatorFinishedConversation(clientName);
                    } catch (InterruptedException e) {
                        LOGGER.error(e);
                    }
                } else{
                    reportOperatorFinishedHisWork();
                    running=false;
                }
        }
    }

    private void reportOperatorReceivedCall(String name){
        LOGGER.info("Operator "+getName()+" received call from Client "+name);
    }

    private void reportOperatorFinishedConversation(String name){
        LOGGER.info("Operator "+getName()+" finished conversation with Client "+name);
    }

    private void reportOperatorFinishedHisWork(){
        LOGGER.info("Operator "+getName()+" finished his work for today, he is too tired and decided to go home.");
    }


}

    public class Main {

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);


        LinkedList<Client> clientList = new LinkedList<Client>();
        clientList.add(new Client("Vasya"));
        clientList.add(new Client("Tanya"));
        clientList.add(new Client("Petya"));
        clientList.add(new Client("Kolya"));
        clientList.add(new Client("Elena"));
        clientList.add(new Client("Anna"));
        for(int i = 0; i < clientList.size(); i++) {
            executor.schedule(clientList.get(i),  i, TimeUnit.SECONDS);
        }

        LinkedList<Operator> operatorList = new LinkedList<Operator>();
        operatorList.add(new Operator("Bob"));
        operatorList.add(new Operator("Sandra"));
        operatorList.add(new Operator("John"));
        for(int i = 0; i < operatorList.size(); i++) {
            executor.schedule(operatorList.get(i), 500, TimeUnit.MILLISECONDS);
        }
    }
}

最佳答案

ClientQueue.pollFirst 中有一个额外的分号。这里更正一下:

public static Client pollFirst(long time, TimeUnit timeUnit) throws InterruptedException{
    Client client=null;
        client = getInstance().queue.poll(time, timeUnit);

    if (client!=null) { // removed semicolon from this line
        reportClientRetrievedFromQueue(client.getName());
    }
    return client;
}

关于java - java.util 上的并发任务。从 BlockingQueue 轮询的多线程问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29176947/

相关文章:

multithreading - Delphi指针内存和释放

java - "Error creating bean with name ' scopedTarget.get '"接收 RabbitMQ 消息时

java - 所有类中都有变量,但仅在运行时才有值(value)

java - 为什么我的 JFrame 窗口会这样做?(检查描述)

java - 在 Ubuntu 14.04 的 chrome 中安装 java 插件

java - 在 RxJava 流程中添加线程池

c# - 线程同步 XNA 网络

java - 使用springfox创建对象json

java - spring ThreadPoolTask​​Executor 在每个方法调用上创建新线程

c# - 如何确定哪些逻辑核心共享同一个物理核心?