多线程案例-阻塞式队列
创始人
2024-05-07 01:18:15
0

1.什么是阻塞队列

阻塞队列是一种特殊的队列,在"先进先出"的原则下又引入了"阻塞"功能

阻塞队列能是一种线程安全的数据结构,具有以下特性:

当队列满的时候,继续入队列就会阻塞,直到其它线程从队列中取走元素
当队列空的时候,继续出队列就会阻塞,直到其它队列向队列中插入元素

阻塞式队列的典型应用场景是"生产者消费者模型"

2.生产者消费者模型

生产者消费者模型通过一个容器来解决生产者和消费者的强耦合问题,生产者与消费者之间不直接通讯,通过阻塞队列来通讯,生产数据后放入阻塞队列,消费者可以直接从阻塞队列获取数据

生产者消费者模型能带来两个非常重要的好处:

1.阻塞队列能使生产者和消费者之间解耦
2.阻塞队列起到缓冲的作用,平衡了生产者消费者的处理能力

1.开发中典型的场景:服务器之间的相互调用

当客户端程序向A服务器发起一个请求后,A服务器将请求转发给B服务器处理,然后B服务器处理完成后将结果返回,此时可以视为:A调用了B

这种场景下,AB两个服务器的耦合程度是比较高的!!如果B服务器出现问题,也会引起A的bug,不仅如此,如果A再需要调用C服务器,还需要修改A的代码,非常麻烦..

针对这种场景使用生产者消费者模型,能有效降低耦合

此时,AB之间的耦合就降低很多了,AB都只知道队列的存在,A的代码不与B相关,B的代码也不与A相关,AB任何一方出现问题不会影响到另一方

2.服务器开发中,用户发送的请求的数量是不可控的,如果没有充分的准备并且请求量超过了服务器的承受范围,服务器有可能直接被大量的请求冲垮.例如"秒杀"这种场景,服务器就会同一时刻受到大量请求,这个时候可以把这些请求放到阻塞队列中,让线程慢慢处理,能有效防止服务器被大量的请求冲垮

在Java标准中内置有阻塞队列BlockingQueue,是一个接口,实现类是LinkedBlockingQueue,put方法是入队列,take是出队列.这两个方法具有阻塞特性

下面使用标准库中的阻塞队列实现生产者消费者模型

public class ThreadDemo1 {public static void main(String[] args) {BlockingQueue queue = new LinkedBlockingQueue<>();//消费者Thread customer = new Thread(()->{while(true){try {int value = queue.take();System.out.println("消费元素: "+value);} catch (InterruptedException e) {throw new RuntimeException(e);}}},"消费者");customer.start();//生产者Thread producer = new Thread(()->{Random random = new Random();while(true){try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}int num = random.nextInt(1000);System.out.println("生产元素: "+num);try {queue.put(num);} catch (InterruptedException e) {throw new RuntimeException(e);}}},"生产者");producer.start();try {customer.join();producer.join();} catch (InterruptedException e) {throw new RuntimeException(e);}}
}

结果

我们可以发现,生产和消费是成对出现的,程序开始运行,因为在生产者线程中让线程休眠500ms后再执行,此时阻塞队列中为空,而消费者线程要再阻塞队列中使用take方法取元素,就会陷入阻塞状态,等到阻塞队列中有生产者插入元素后才继续执行取元素!!

接下来通过"循环队列"来实现一个阻塞队列

3.阻塞队列实现

实现阻塞队列是要实现一个普通队列然后加上"阻塞功能"

这里我们使用循环队列实现阻塞队列,下面是循环队列的三种状态

我们先写一个普通的循环队列的入队和出队操作

class MyBlockingQueue{private int[] items = new int[1000];private int head = 0;private int tail = 0;private int size = 0;//入队列public void put(int value){if(size == items.length){//队列满了.不能插入return;}items[tail] = value;tail++;//针对tail的处理//1)这个写法非常常见//tail = (tail+1)%items.length;//2)可读性好并且比求余的代码效率高if(tail >= items.length){tail = 0;}//插入成功size++;}//出队列public Integer take(){if(size == 0){//队列为空,不能出队return null;}int result = items[head];head++;if(head >= items.length){head = 0;}size--;return result;}
}

我们在队列中插入几个元素并取出

 public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue();queue.put(1);queue.put(2);queue.put(3);queue.put(4);int result = 0;result = queue.take();System.out.println(result);result = queue.take();System.out.println(result);result = queue.take();System.out.println(result);result = queue.take();System.out.println(result);}

现在我们在普通队列的基础上加上阻塞功能

阻塞功能意味着该队列是要在多线程环境下使用的

多线程环境下要保证线程安全,需要给方法加锁
使用wait()notify()方法添加阻塞功能
当队列为空时和队列满时都需要阻塞
sleep()方法是指定休眠的时间后唤醒,但是我们不能确定指定的时间是多少,需要看程序运行情况

修改后:

public void put(int value){synchronized (this){if(size == items.length){//队列满了.不能插入//return;//阻塞try {this.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}items[tail] = value;tail++;//针对tail的处理//1)这个写法非常常见//tail = (tail+1)%items.length;//2)可读性好并且比求余的代码效率高if(tail >= items.length){tail = 0;}//插入成功size++;//唤醒队列为空处的wait()this.notify();}}//出队列public Integer take(){int result = 0;synchronized (this){if(size == 0){//队列为空,不能出队//return null;//阻塞try {this.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}result = items[head];head++;if(head >= items.length){head = 0;}size--;////唤醒队列满的wait()this.notify();}return result;}

上述代码还有个问题

如果notifyAll()了,这里的wait()一定会被唤醒!但是该线程还没有抢占到锁,当锁被这个线程抢占到时,队列的状态可能会是满的,因此我们最好用while循环,然后继续判断队列状态

这样是比较稳妥的方法

我们使用MyBlockingQueue再写一个生产者消费者模型,看是否能达到效果

MyBlockingQueue queue = new MyBlockingQueue();Thread customer = new Thread(()->{while(true){int result = queue.take();System.out.println("消费: "+result);}});customer.start();Thread producer = new Thread(()->{int count = 0;while(true){System.out.println("生产者: "+count);queue.put(count);count++;try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}}});producer.start();

也是成对出现的,只有生产者向队中插入了,消费者才能获取

我们调整代码,让消费者速度降低

再来看结果

生产者生产的数据将循环队列充满后开始阻塞,消费者线程休眠结束后开始获取数据,获取一个后,阻塞队列便出现一个空位置,唤醒put的wait()后,继续生产数据,然后又阻塞等待队列不满.....

至此就用循环队列实现了阻塞队列

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
客厅放八骏马摆件可以吗(家里摆... 今天给各位分享客厅放八骏马摆件可以吗的知识,其中也会对家里摆八骏马摆件好吗进行解释,如果能碰巧解决你...
苏州离哪个飞机场近(苏州离哪个... 本篇文章极速百科小编给大家谈谈苏州离哪个飞机场近,以及苏州离哪个飞机场近点对应的知识点,希望对各位有...