java - BlockingQueue.size() 在发布者-订阅者中返回错误的大小

标签 java multithreading concurrency publish-subscribe blockingqueue

我在实现“一个发布者 - 多个订阅者”模式时遇到问题。发布者使用固定大小的缓冲区并对消息进行排队。这些消息将发送给所有订阅者。订阅者获取消息的顺序必须与发布消息的顺序相同。

我使用 BlockingQueue 来保存发布者消息 (publisherQueue) 并将它们传递给每个订阅者 BlockingQueue (subscriberQueue)。

问题在于缓冲区和订阅者工作正常,但缓冲区大小 (publisherQueue.size()) 始终返回 1。

System.out.println("Actual number of messages in buffer: " + publisherQueue.size());

这是我的完整代码:

PublisherSubscriberService.java

package program;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;


public class PublisherSubscriberService {
    private int buffer;
    private int subscribersNumber;
    static Set<subscriber> subscribers = new HashSet<subscriber>();

    public PublisherSubscriberService(int buffer, int subscribersNumber) {
        this.buffer = buffer;
        this.subscribersNumber = subscribersNumber;
    }

    public void addsubscriber(subscriber subscriber) {
        subscribers.add(subscriber);
    }

    public void start() {
        publisher publisher = new publisher(buffer);
        System.out.println("publisher started the job");

        for (int i = 0; i < subscribersNumber; i++) {
            subscriber subscriber = new subscriber(buffer);
            subscriber.setName(Integer.toString(i + 1));
            subscribers.add(subscriber);
            new Thread(subscriber).start();
            System.out.println("Subscriber " + subscriber.getName() + " started the job");
        }
        new Thread(publisher).start();
    }

    public class Publisher implements Runnable {
        private int buffer;
        final BlockingQueue<Message> publisherQueue;

        public Publisher(int buffer) {
            this.buffer = buffer;
            publisherQueue = new LinkedBlockingQueue<>(buffer);
        }

        @Override
        public void run() {
            for (int i = 1; i < 100; i++) {
                Message messageObject = new Message("" + i);
                try {
                    Thread.sleep(50);
                    publisherQueue.put(messageObject);
                    System.out.println("Queued message no " +         messageObject.getMessage());
                    System.out.println("Actual number of messages in buffer:     " + publisherQueue.size());
                    for (subscriber subscriber : subscribers) {
                        subscriber.subscriberQueue.put(messageObject);
                    }
                    publisherQueue.take();
                } catch (InterruptedException e) {
                    System.out.println("Some error");
                    e.printStackTrace();
                }
            }
        }
    }

    class Subscriber implements Runnable {
        private String name;
        private int buffer;
        final BlockingQueue<Message> subscriberQueue;

        public Subscriber(int buffer) {
            this.buffer = buffer;
            subscriberQueue = new LinkedBlockingQueue<>(buffer);
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

        @Override
        public void run() {
            try {
                Message messageObject;
                while (true) {
                    Thread.sleep(100);
                    messageObject = subscriberQueue.take();
                    System.out.println(this.getName() + " got message: " + messageObject.getMessage());
                }
            } catch (InterruptedException e) {
                System.out.println("Some error");
                e.printStackTrace();
            }
        }
    }
class Message {
    private String message;

    public Message(String str) {
        this.message = str;
    }

    public String getMessage() {
        return message;
    }

}
}

PublisherSubscriberProgram.java

    package program;

public class ProducerConsumerProgram {

    public static void main(String[] args) {
        ProducerConsumerService service = new ProducerConsumerService(10, 3);
        service.start();
    }
}

最佳答案

您的发布商队列中的项目永远不会超过 1 个。每次循环时,您都会放置和取出一个项目:

                **publisherQueue.put(messageObject);**
                System.out.println("Queued message no " +         messageObject.getMessage());
                System.out.println("Actual number of messages in buffer:     " + publisherQueue.size());
                for (subscriber subscriber : subscribers) {
                    subscriber.subscriberQueue.put(messageObject);
                }
                **publisherQueue.take();**

根据您提供的代码,甚至拥有发布者队列也是有意义的。

关于java - BlockingQueue.size() 在发布者-订阅者中返回错误的大小,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28255559/

相关文章:

java - 无法在 SQL Server 上使用 Java 和 JDBC 执行存储过程

Java 从字符串中获取子字符串值

java - 将每行逗号分隔的文件拆分为数组

java - 如何防止hadoop流关闭?

c# - 字典在后台加载

java - 这里ThreadLocal的目的是什么?

C++ 条件等待停止执行

java - 并发和按顺序从 2 个服务器下载 block - Android

java - play sbt 无法在 scala docker 镜像上启动

iphone - 如何让一段代码在单独的线程中运行?