前言
今天我们来学习下生产者与消费者模式。
生产者和消费者模式可以解决绝大多数并发问题,一般由生产者、数据缓冲区、消费者构成。
如下图,其原理是将原来的直接调用(消费者->生产者)变为了生产者生产数据放入缓存区,消费者从缓存区获取数据并消费这种模式。
可以知道MQ就是生产者与消费者模式的典型代表。
我们可以举例比如一个定时任务,每天要批处理数据,比如上传文件,每天如果要上传1000个文件或者更多,这时候我们使用平常的循环上传方法,明显大部分时间均浪费在了上传的时间上。
如果按照每个文件处理需要3s,1000个文件则至少需要3000s时间。
如果我们引入生产者和消费者模式,生产者部分负责查询组装数据并把它们放入数据缓存区,消费者部分负责处理数据并上传,可以大大提高并发性能。
使用生产者与消费者模式的典型优点如下:
并发支持
可以看到,如果消费者处理比较耗时,我们可以使用多个生产者生产数据或者消费者去处理队列数据,从而提高系统并发性能。即消费者和生产者可以为两个独立的并发主体。
解耦
我们将生产者和消费者分开后,即使生产者部分处理数据的逻辑有变化,也不会影响到消费者部分,而相比之前在一起的逻辑,我们可能需要改动整个业务部分以完成数据处理。即生产者和消费者没有过分的依赖关系,只要保证传输数据格式的正确性即可。
解决忙闲不均问题
可以看到生产者和消费者模式可以完美解决忙闲不均的问题,当生产者数据过多时,进入数据缓存区等待消费者慢慢处理,生产者数据少时,由于缓存区的数据,也不至于消费者无事可做。即无论生产者或者消费者谁快谁慢,我们总可以通过对他们的数量控制来均衡资源的分配。
正文
我们通过上面的例子来实践下消费者和生产者模式。
我们正常逻辑可能如下:
1 | public static void main(String[] args) throws Exception{ |
可以看到这个过程是非常耗时的,我们使用生产者和消费者模式来设计下这个业务场景。
我们数据缓存区使用队列来暂存数据,生产者组生产数据时会将数据放入队列,消费者消费数据时会从队列中获取数据。
我们用阻塞队列LinkedBlockingQueue
来作为数据缓存区,写一个生产者放入数据和消费者取出数据的方法。
如下:
1 | public class Context<E> { |
线程状态枚举:新线程(NEW)、可运行的(RUNNABLE)、运行中(RUNNING)、死亡(DEAD)、阻塞(BLOCKED)。
1 | enum ThreadState { |
然后我们构造两个模板接口,一个生产者模板接口一个消费者模板接口,分别提供生产者产生数据的方法和消费者消费数据的方法。具体实现有各自的业务实现类实现即可。
1 | /** |
创建一个生产者与消费者的协调者类,用来启动生产者或者消费者。
1 | /** |
生产者和消费者的线程单元如下:
1 | /** |
可以看到我们使用反射获取了production和consumption方法,并执行它们。启动了两个线程,生产者线程和消费者线程去处理业务,其中消费者线程利用了线程池,可以放置concurrencyMaxTotal个子线程去消费任务。
我们创建一个测试类进行测试,如下:
1 | public class CourrentTest implements ProducerTemplate<String>, ConsumerTemplate<String>{ |
运行后可以看到输出的结果。
这儿我们可以看到对比较耗时的上传方法(消费者端)进行了并发处理以提高效率,生产端如果保证了数据的安全性,我们可以使用并行流等放入数据以提高放入数据的效率。
其实我们看到这儿,可以理解线程池也是一个类似于生产者消费者模式的东西。线程池里面有任务就会去执行,相当于消费者,线程池里的队列相当于缓存区,而生产者就是我们一个个放入线程的Runable方法。
上述代码的运行原理图大致如下:
PS: 上述代码可以在我的GitHub项目里找到。
https://github.com/JavaZWT/framework-base
另外提供了一个简易模板SimpleTemplate可以适用生产者方法和消费者方法在一个类里的情况,只继承这一个方法即可。不用分别继承ConsumerTemplate和ProducerTemplate接口了。
总结
通过对上面一个列子使用生产者和消费者模式,我们了解了这种模式的一些适用情形和优点。
当然也了解了它的一些缺点,对于解决并发问题的方案,最要重视的应该就是数据安全问题了。
我们在平时工作中也可以考虑什么样的场景下可以使用这种模式,其实这种模式的适用场景还是蛮多的,对于一些处理较耗时的操作,文件上传、图片生成转换等都可以考虑这种模式。