具有阻塞队列java的生产者-消费者设计模式

问题描述

我必须创建消息发送应用程序,向我们的客户发送短信。由于生成并向客户发送消息需要相当长的时间,因此我决定使用生产者和消费者模式来实现它。所以,这不会影响到原来的执行流程。

我将原始数据作为对象放入队列,这将由线程池中的一个消费者线程选择,然后生成消息并发送 SMS。一旦应用程序启动并运行,此流程应继续。

我的应用程序运行良好。但我发现消费者和生产者线程池创建的每个线程即使在完成发送 SMS 任务后仍处于等待状态。这是长时间运行的应用程序的问题还是我可以一直使用消费者和生产者线程池而不是每次调用 initializingSendMessage(RawData trxn) 方法时创建新的线程池?

MessageSendUtil 类用于创建公共队列和初始化任务。

public class SendMessageUtil {
    public static void initializingSendMessage(RawData trxn) {

        BlockingQueue<Message> sharedQueue = new LinkedBlockingQueue<>();
        ExecutorService produceMessagePool = Executors.newFixedThreadPool(1);
        ExecutorService consumerMessagePool = Executors.newFixedThreadPool(5);
        try {
            produceMessagePool.submit(new Producer(sharedQueue));
            int i = 0;
            while (i++<5) {
                consumerMessagePool.submit(new Consumer(sharedQueue));
            }
            produceMessagePool.shutdown();
            consumerMessagePool.shutdown();
            
        } catch (Exception ex){
            System.out.println(ex.getMessage());
        }
    }

我的消费者和生产者类看起来像这样。

  public class Producer implements Runnable {
    private final BlockingQueue<Message> sharedQueue;
    public Producer(BlockingQueue<Message> sharedQueue) {
        this.sharedQueue = sharedQueue;
    }
    @Override
    public void run() {
        try {
            Message message = new Message();
            message.setMessage("Test message sending");
            sharedQueue.add(message);
        } catch (Exception err) {
            err.printStackTrace();
        }
    }
}

/

public class Consumer implements Runnable {
    private final BlockingQueue<Message> sharedQueue;
    private MessageBroadcaster messageBroadcaster;
    public Consumer(BlockingQueue<Message> sharedQueue) {
        this.sharedQueue = sharedQueue;
    }
    @Override
    public void run() {
        initializeMessageBroadcaster();

        //Send messages to costumer
        while(true){
            try {
                Message message = sharedQueue.take();
                messageBroadcaster.sendMessage(message);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }

    private void initializeMessageBroadcaster() {
        if(Objects.isNull(messageBroadcaster)){
            messageBroadcaster = new MessageBroadcasterImpl();
        }
    }
}

多次调用initializingSendMessage(RawData trxn)后,活动线程显示如下。

live thread count

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)