博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
BlockingQueue队列生产者消费者示例
阅读量:6845 次
发布时间:2019-06-26

本文共 3081 字,大约阅读时间需要 10 分钟。

  hot3.png

package org.web.controller.queue;import java.util.Random;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;/** * 生产者线程 */public class Producer implements Runnable {	public Producer(BlockingQueue queue) {		this.queue = queue;	}	public void run() {		String data = null;		Random r = new Random();		System.out.println("启动生产者线程!");		try {			while (isRunning) {				System.out.println("正在生产数据...");				Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));				data = "data:" + count.incrementAndGet();				System.out.println("将数据:" + data + "放入队列...");				if (!queue.offer(data, 2, TimeUnit.SECONDS)) {					System.out.println("放入数据失败:" + data);				}			}		} catch (InterruptedException e) {			e.printStackTrace();			Thread.currentThread().interrupt();		} finally {			System.out.println("退出生产者线程!");		}	}	public void stop() {		isRunning = false;	}	private volatile boolean isRunning = true;	private BlockingQueue queue;	private static AtomicInteger count = new AtomicInteger();	private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;}package org.web.controller.queue;import java.util.Random;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;/** * 消费者线程 *  */public class Consumer implements Runnable {	public Consumer(BlockingQueue
 queue) { this.queue = queue; } public void run() { System.out.println("启动消费者线程!"); Random r = new Random(); boolean isRunning = true; try { while (isRunning) { System.out.println("正从队列获取数据..."); String data = queue.poll(2, TimeUnit.SECONDS); if (null != data) { System.out.println("拿到数据:" + data); System.out.println("正在消费数据:" + data); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); } else { // 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。 isRunning = false; } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("退出消费者线程!"); } } private BlockingQueue
 queue; private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;}package org.web.controller.queue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;public class BlockingQueueTest { public static void main(String[] args) throws InterruptedException { // 声明一个容量为10的缓存队列 BlockingQueue
 queue = new LinkedBlockingQueue
(10); Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); Consumer consumer = new Consumer(queue); // 借助Executors ExecutorService service = Executors.newCachedThreadPool(); // 启动线程 service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer); // 执行10s Thread.sleep(10 * 1000); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(2000); // 退出Executor service.shutdown(); }}

转载于:https://my.oschina.net/xiejunbo/blog/612985

你可能感兴趣的文章
mysql 实例下具体查看库,和表的大小
查看>>
老鸟谈生产场景删除文件及目录经验要领
查看>>
Redis原理介绍
查看>>
Linux网络管理及命令详解
查看>>
MVC -设计模式之王
查看>>
我的友情链接
查看>>
整理了一些Java软件工程师的基础知识点
查看>>
find命令和xargs命令详解
查看>>
关于nginx 代理转发 造成的流量突增
查看>>
MySql,MariaDB 存储过程和函数研究
查看>>
H3C S5820x 期望风道方向的注意事项
查看>>
BitKeeper姻缘了断
查看>>
MyEclipse安装Thymeleaf插件
查看>>
java
查看>>
网络安全之CMD下的网络安全配置
查看>>
开源 java CMS - FreeCMS2.8 积分规则管理
查看>>
我的友情链接
查看>>
我的友情链接
查看>>
warning C4273: ****.dll链接不一致
查看>>
汽车常识全面介绍 - 动力系统
查看>>