java Queue队列实现生产消费模式

位置:首页>文章>详情   分类: 教程分享 > Java教程   阅读(899)   2023-03-28 11:29:14

一、什么是Queue

什么是Java编程中的Queue(队列)详细介绍请参考上一篇:Java Queue队列使用入门详解 该文章详细讲解了Java编程中的queue是什么,基本的操作等。

二、Java中Queue如何实现生产消费模式

以下为队列服务代码:
$title(QueneService.java)
package com.example.demospringbootblockingqueue.test;

import java.util.concurrent.LinkedBlockingQueue;

/***
 * 队列服务
 */
public class QueneService {
    private static LinkedBlockingQueue<Persion> queue = new LinkedBlockingQueue<>();

    /**
     * 获取队列信息
     *
     * @return
     */
    LinkedBlockingQueue<Persion> getQueue() {
        return queue;
    }
}

可以看到上面的代码非常简单就是一个Java的list集合对象
接下来就是生产者了:

$title(Producer.java)
package com.example.demospringbootblockingqueue.test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 生产数据
 */
public class Producer {
    QueneService queneService;
    public Producer(QueneService queneService){
        this.queneService=queneService;
    }

    public void createPersion(Persion persion){
        try {
            queneService.getQueue().add(persion);
        }catch (Exception e){
            System.out.println("数据添加到队列错误。");
        }
    }
}

生产者主要就是创建模拟的数据对象,然后放入队列中。

消费者:

$title(Consumer.java)
package com.example.demospringbootblockingqueue.test;

import com.alibaba.fastjson.JSON;

import java.text.SimpleDateFormat;
import java.util.Date;

/***
 * 消费数据
 */
public class Consumer {

    public Consumer(QueneService queneService) {
        init(queneService);
    }

    /***
     * 初始化
     */
    public void init(QueneService queneService) {
        try {
            while (true) {
                Persion persion = queneService.getQueue().take();//
                dealData(persion);
            }
        } catch (Exception e) {
            System.out.println("队列消费处理发生错误");
            e.printStackTrace();
        }
    }

    /**
     * 消费数据
     **/
    public void dealData(Persion persion) {
        try {
            //架设处理需要一定的时间
            System.out.println("开始消费---》》》");
            Thread.sleep(3000);
            System.out.println("["+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"]成功消费:" + JSON.toJSONString(persion));
        } catch (Exception e) {
            System.out.println("处理消费发生错误");
        }
    }

}

以上为消费者代码。
 

三、运行演示

演示代码
$title(Test.java)
package com.example.demospringbootblockingqueue.test;

import com.alibaba.fastjson.JSON;

import java.text.SimpleDateFormat;
import java.util.Date;

public class Test {
    public static void main(String[] args) {
        try {
            //  队列创建
            QueneService queneService=new QueneService();
            //创建消费者
           new Thread(new Runnable() {//由于消费者会阻塞线程所以启动一个子线程进行处理
               @Override
               public void run() {
                   Consumer consumer=new Consumer(queneService);
               }
           }).start();
            //创建生产者
            Producer producer=new Producer(queneService);

            //循环生产
            for (int i=0;i<10;i++){
                if (i<5){
                    //
                    Thread.sleep(5000);//5秒生产一个
                    Persion persion=new Persion("name+"+i,"男");
                    producer.createPersion(persion);
                    System.out.println("["+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) +"]成功生产:"+ JSON.toJSONString(persion));
                }else {
                    //
                    Thread.sleep(1000);//2秒生产一个
                    Persion persion=new Persion("name+"+i,"女");
                    producer.createPersion(persion);
                    System.out.println("["+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) +"]成功生产:"+ JSON.toJSONString(persion));
                }
            }
            System.out.println("生产完毕。");

        } catch (Exception e) {
            System.out.println("测试发生了错误");
            e.printStackTrace();
        }
    }
}
运行测试代码,输出结果为:
com.example.demospringbootblockingqueue.test.Test
Connected to the target VM, address: '127.0.0.1:61742', transport: 'socket'
开始消费---》》》
[2019-04-24 13:36:48]成功生产:{"name":"name+0","sex":"男"}
[2019-04-24 13:36:51]成功消费:{"name":"name+0","sex":"男"}
开始消费---》》》
[2019-04-24 13:36:53]成功生产:{"name":"name+1","sex":"男"}
[2019-04-24 13:36:56]成功消费:{"name":"name+1","sex":"男"}
开始消费---》》》
[2019-04-24 13:36:58]成功生产:{"name":"name+2","sex":"男"}
[2019-04-24 13:37:01]成功消费:{"name":"name+2","sex":"男"}
开始消费---》》》
[2019-04-24 13:37:03]成功生产:{"name":"name+3","sex":"男"}
[2019-04-24 13:37:06]成功消费:{"name":"name+3","sex":"男"}
开始消费---》》》
[2019-04-24 13:37:08]成功生产:{"name":"name+4","sex":"男"}
[2019-04-24 13:37:09]成功生产:{"name":"name+5","sex":"女"}
[2019-04-24 13:37:10]成功生产:{"name":"name+6","sex":"女"}
[2019-04-24 13:37:11]成功消费:{"name":"name+4","sex":"男"}
[2019-04-24 13:37:11]成功生产:{"name":"name+7","sex":"女"}
开始消费---》》》
[2019-04-24 13:37:12]成功生产:{"name":"name+8","sex":"女"}
[2019-04-24 13:37:13]成功生产:{"name":"name+9","sex":"女"}
生产完毕。
[2019-04-24 13:37:14]成功消费:{"name":"name+5","sex":"女"}
开始消费---》》》
[2019-04-24 13:37:17]成功消费:{"name":"name+6","sex":"女"}
开始消费---》》》
[2019-04-24 13:37:20]成功消费:{"name":"name+7","sex":"女"}
开始消费---》》》
[2019-04-24 13:37:23]成功消费:{"name":"name+8","sex":"女"}
开始消费---》》》
[2019-04-24 13:37:26]成功消费:{"name":"name+9","sex":"女"}
通过输出结果我们可以看到,最初的时候生产一个数据的时间比消耗一个数据时间长,所以生产成功后就立即开始消费了。但是后面由于生产速度快了,消费就出现排队了,最终结果是以先进先出的顺序消费完毕的。
 
标签: quene 队列 Java
地址:https://www.leftso.com/article/584.html

相关阅读

java Queue队列实现生产消费模式,什么是Java编程中的Queue(队列),手动实现生产者消费者模式
Java Queue队列使用入门详解
简述在本博客中,我们将会创建一个reids的消息队列,Redis可以被当成消息队列使用
LinkedBlockingQueue 阻塞队列实现生产/消费模型package com.example.demospringbootqueueasynctask; import org.sl...
Java编程中Spring Boot整合RabbitMQ实现消息中间件RabbitMQ的使用
本文将讲述CLH锁的使用场景,什么情况适合使用CLH锁?Java 怎么使用CLH锁?
线程池创建 /** * 队列用线程 * @return */ @Bean(name = "queuePool") public Thread...
Java MongoDB驱动程序,下载/升级,Java驱动程序兼容性,第三方框架和库
Spring Boot MQTT协议通过spring boot整合apache artemis实现Java语言MQTT协议通信,搭建MQTT服务器可以参考上一篇 MQTT Java入门-搭建MQ...
1. 线程的安全性问题:线程安全和非线程安全: 一个类在单线程环境下能够正常运行,并且在多线程环境下,使用方不做特别处理也能运行正常,我们就称其实线程安全的