Fork me on GitHub

生产者和消费者模式

前言

今天我们来学习下生产者与消费者模式。

生产者和消费者模式可以解决绝大多数并发问题,一般由生产者、数据缓冲区、消费者构成。

如下图,其原理是将原来的直接调用(消费者->生产者)变为了生产者生产数据放入缓存区,消费者从缓存区获取数据并消费这种模式。

upload successful

可以知道MQ就是生产者与消费者模式的典型代表。

我们可以举例比如一个定时任务,每天要批处理数据,比如上传文件,每天如果要上传1000个文件或者更多,这时候我们使用平常的循环上传方法,明显大部分时间均浪费在了上传的时间上。

如果按照每个文件处理需要3s,1000个文件则至少需要3000s时间。

如果我们引入生产者和消费者模式,生产者部分负责查询组装数据并把它们放入数据缓存区,消费者部分负责处理数据并上传,可以大大提高并发性能。

使用生产者与消费者模式的典型优点如下:

  1. 并发支持

    可以看到,如果消费者处理比较耗时,我们可以使用多个生产者生产数据或者消费者去处理队列数据,从而提高系统并发性能。即消费者和生产者可以为两个独立的并发主体。

  2. 解耦

    我们将生产者和消费者分开后,即使生产者部分处理数据的逻辑有变化,也不会影响到消费者部分,而相比之前在一起的逻辑,我们可能需要改动整个业务部分以完成数据处理。即生产者和消费者没有过分的依赖关系,只要保证传输数据格式的正确性即可。

  3. 解决忙闲不均问题

    可以看到生产者和消费者模式可以完美解决忙闲不均的问题,当生产者数据过多时,进入数据缓存区等待消费者慢慢处理,生产者数据少时,由于缓存区的数据,也不至于消费者无事可做。即无论生产者或者消费者谁快谁慢,我们总可以通过对他们的数量控制来均衡资源的分配。

正文

我们通过上面的例子来实践下消费者和生产者模式。

我们正常逻辑可能如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) throws Exception{
//1. 组装数据
//数据库查询、组装数据过程略,由for循环插入数据代替
List<String> list = new ArrayList<>();
for(int i = 0;i<1000;i++){
//假设处理每条数据花费平均10ms时间
Thread.sleep(10);
list.add(i+"");
}

//2. 上传数据/文件
//上传过程略
for (int i =0;i<list.size();i++){
//假设每个文件平均耗时1s
Thread.sleep(1000);
System.out.println(i);
}
}

可以看到这个过程是非常耗时的,我们使用生产者和消费者模式来设计下这个业务场景。

我们数据缓存区使用队列来暂存数据,生产者组生产数据时会将数据放入队列,消费者消费数据时会从队列中获取数据。

我们用阻塞队列LinkedBlockingQueue来作为数据缓存区,写一个生产者放入数据和消费者取出数据的方法。

如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
public class Context<E> {
private static final Logger log = LoggerFactory.getLogger(Context.class);
//阻塞队列用来暂存数据
private final LinkedBlockingQueue<E> consumptionQueue = new LinkedBlockingQueue<E>(2500);
// 生产线程的状态
private volatile ThreadState producersThreadState;
// 消费线程的状态
private volatile ThreadState consumersThreadState;
/**
* 获取队列大小
* @return
* @throws Exception
*/
int getConsumptionQueueSize() {
return consumptionQueue.size();
}
/**
* 将指定元素插入到此队列的尾部,如有必要(队列空间已满且消费线程未停止运行),则等待空间变得可用。
* @param e
* @return boolean true:插入成功;false:插入失败(消费线程已停止运行)
* @throws Exception
*/
public boolean offerDataToConsumptionQueue(E e) throws Exception {
//设置生产者线程为运行
setProducersThreadState(ThreadState.RUNNING);
// 如果消费线程停止了,不再生产数据
if (ThreadState.DEAD == this.getConsumersThreadState()){
return false;
}
//一直尝试将数据放入队列
while (true) {
//将数据放入队列,如果成功返回成功
if (consumptionQueue.offer(e, 2, TimeUnit.SECONDS)){
return true;
}
// 添加元素失败,很有可能是队列已满,再次检查消费线程是否工作中
// 如果消费线程停止了,不再生产数据
if (ThreadState.DEAD == this.getConsumersThreadState()) {
return false;
}
}
}
/**
* 获取并移除此队列的头,如果此队列为空且生产线程已停止,则返回 null
* @return E 队列的头元素,如果队列为空且生产线程已停止则返回null
* @throws Exception
*/
public E pollDataFromConsumptionQueue() throws Exception {
//设置消费者线程为运行
setConsumersThreadState(ThreadState.RUNNING);
//一直尝试从队列里获取数据
while (true) {
//尝试从队列里获取数据
E e = consumptionQueue.poll(20, TimeUnit.MILLISECONDS);
if (e != null){
return e;
}
// 没有从队列里获取到元素,并且生产线程已停止,则返回null
if (ThreadState.DEAD == this.getProducersThreadState()){
return null;
}
log.debug("demand exceeds supply(供不应求,需生产数据)...");
Thread.sleep(50);
}
}

/**
* 获取 producersThreadState
* @return producersThreadState
*/
ThreadState getProducersThreadState() {
return producersThreadState;
}

/**
* 设置 producersThreadState
* @param producersThreadState
*/
void setProducersThreadState(ThreadState producersThreadState) {
this.producersThreadState = producersThreadState;
}

/**
* 获取 consumersThreadState
* @return consumersThreadState
*/
ThreadState getConsumersThreadState() {
return consumersThreadState;
}

/**
* 设置 consumersThreadState
* @param consumersThreadState
*/
void setConsumersThreadState(ThreadState consumersThreadState) {
this.consumersThreadState = consumersThreadState;
}

}

线程状态枚举:新线程(NEW)、可运行的(RUNNABLE)、运行中(RUNNING)、死亡(DEAD)、阻塞(BLOCKED)。

1
2
3
enum ThreadState {
NEW, RUNNABLE, RUNNING, DEAD, BLOCKED;
}

然后我们构造两个模板接口,一个生产者模板接口一个消费者模板接口,分别提供生产者产生数据的方法和消费者消费数据的方法。具体实现有各自的业务实现类实现即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 生产者模板
* @param <C_E>
*/
public interface ProducerTemplate<C_E> {
/**
* 生产数据
* @param context
* @throws Exception
*/
void production(Context<C_E> context) throws Exception;
}
/**
* 消费者模板
* @param <C_E>
*/
public interface ConsumerTemplate<C_E> {
/**
* 消费数据
* @param context
* @throws Exception
*/
void consumption(Context<C_E> context) throws Exception;
}

创建一个生产者与消费者的协调者类,用来启动生产者或者消费者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/**
* 生产与消费协调者
*/
public class Coordinator {

private static final Logger log = LoggerFactory.getLogger(Coordinator.class);
private final Lock lock = new ReentrantLock();
private final Condition enabledConsumers = lock.newCondition();
private volatile boolean isEnabledForConsumers;
private final Context<?> context;
// 是否等待生产及消费完成
private boolean isWaitingToFinish;
// 最大消费线程数
private int consumersMaxTotal;

public Coordinator(Context<?> context, int consumersMaxTotal) {
this(context, consumersMaxTotal, true);
}

public Coordinator(Context<?> context, int consumersMaxTotal, boolean isWaitingToFinish) {
this.context = context;
this.consumersMaxTotal = consumersMaxTotal;
this.isWaitingToFinish = isWaitingToFinish;
}

/**
*启动生产、消费
* @param producerTemplate 生产者模板
* @param consumerTemplate 消费者模板
*/
public void start(ProducerTemplate<?> producerTemplate,ConsumerTemplate<?> consumerTemplate) throws Exception {
if (context.getConsumersThreadState() != null || context.getProducersThreadState() != null){
return;
}
ProducersThreadUnit producersThreadUnit = new ProducersThreadUnit(producerTemplate, "production", context);
ConsumersThreadUnit consumersThreadUnit = new ConsumersThreadUnit(consumerTemplate, "consumption", context);
this.start(producersThreadUnit, consumersThreadUnit);
}

/**
* 启动生产、消费(适用于生产函数、消费函数不在一个类里实现,或者一个类里有多对生产、消费组合,或者方法入参列表复杂)
* @param producersThreadUnit
* @param consumersThreadUnit
*/
public void start(ProducersThreadUnit producersThreadUnit, ConsumersThreadUnit consumersThreadUnit) throws Exception {
if (context.getConsumersThreadState() != ThreadState.NEW || context.getProducersThreadState() != ThreadState.NEW){
return;
}
long time = System.currentTimeMillis();
try {
//启动生产者
Thread startProducersThread = this.startProducers(producersThreadUnit);
//启动消费者
Thread startConsumersThread = this.startConsumers(consumersThreadUnit);
if (!this.isWaitingToFinish){
return;
}
startProducersThread.join();
if (startConsumersThread != null){
startConsumersThread.join();
}
} catch (Exception e) {
log.error("start worker error...", e);
throw e;
}
log.info(String.format("processing is completed... man-hour(millisecond)=[%s]", System.currentTimeMillis() - time));
}

/**
* 启动生产
* @param producersThreadUnit
* @return
*/
private Thread startProducers(ProducersThreadUnit producersThreadUnit) throws Exception {
Thread thread = new Thread(producersThreadUnit);
thread.start();
return thread;
}

/**
* 启动消费
* @param consumersThreadUnit
* @return
*/
private Thread startConsumers(ConsumersThreadUnit consumersThreadUnit) throws Exception {
lock.lock();
try {
log.info("wating for producers...");
while (!isEnabledForConsumers){
// 等待生产(造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态),假定可能发生虚假唤醒(这并非是因为等待超时),因此总是在一个循环中等待
// 间隔检查,防止意外情况下线程没能被成功唤醒(机率小之又小,导致线程无限挂起)
enabledConsumers.await(5, TimeUnit.SECONDS);
}
if (context.getConsumptionQueueSize() == 0){
return null;
}
log.info("start consumers before...");
Thread thread = new Thread(consumersThreadUnit);
thread.start();
return thread;
} catch (Exception e) {
log.error("start consumers error...", e);
throw e;
} finally {
lock.unlock();
}
}
}

生产者和消费者的线程单元如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
	/**
* 生产线程
*/
public class ProducersThreadUnit implements Runnable {

private Object targetObject;
private String targetMethodName;
private Object[] targetMethodParameters;
private ExecutorService executorService = Executors.newFixedThreadPool(1);

public ProducersThreadUnit(Object targetObject, String targetMethodName, Object... targetMethodParameters) {
this.targetObject = targetObject;
this.targetMethodName = targetMethodName;
this.targetMethodParameters = targetMethodParameters;
context.setProducersThreadState(ThreadState.NEW);
}

@Override
public void run() {
try {
executorService.execute(new RunnableThreadUnit(targetObject, targetMethodName, targetMethodParameters));
context.setProducersThreadState(ThreadState.RUNNABLE);
executorService.shutdown();
// 阻塞线程,直到生产中(消费队列不为空)或者停止生产
while (!executorService.isTerminated() && context.getConsumptionQueueSize() == 0){
Thread.sleep(20);
}
log.info("production the end or products have been delivered,ready to inform consumers...");
this.wakeConsumers();
log.info("wait until the production is complete...");
while (!executorService.isTerminated()){
// 等待生产完毕
Thread.sleep(200);
}
} catch (Exception e) {
log.error(String.format("production error... targetObject=[%s],targetMethodName=[%s],targetMethodParameters=[%s]", targetObject, targetMethodName, targetMethodParameters), e);
if (!executorService.isShutdown()){
executorService.shutdown();
}
} finally {
log.info("production the end...");
context.setProducersThreadState(ThreadState.DEAD);
// 无论在何种情况下,必须确保能够结束挂起中的消费者线程
isEnabledForConsumers = true;
}
}

/**
* 向消费者发送信号
*/
private void wakeConsumers() {
// 即使唤醒消费者线程失败,也可以使用该句柄结束挂起中的消费者线程
isEnabledForConsumers = true;
lock.lock();
try {
enabledConsumers.signal();
} catch (Exception e) {
log.error("inform to consumers error...", e);
} finally {
lock.unlock();
}
}

}

/**
* 消费线程
*/
public class ConsumersThreadUnit implements Runnable {

private Object targetObject;
private String targetMethodName;
private Object[] targetMethodParameters;

public ConsumersThreadUnit(Object targetObject, String targetMethodName, Object... targetMethodParameters) {
this.targetObject = targetObject;
this.targetMethodName = targetMethodName;
this.targetMethodParameters = targetMethodParameters;
context.setConsumersThreadState(ThreadState.NEW);
}

@Override
public void run() {
ThreadPoolExecutor threadPoolExecutor = null;
int concurrencyMaxTotal = Coordinator.this.consumersMaxTotal;
try {
threadPoolExecutor = new ThreadPoolExecutor(0, concurrencyMaxTotal, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
while (concurrencyMaxTotal > 0) {
if (threadPoolExecutor.getPoolSize() > context.getConsumptionQueueSize()) {
if (ThreadState.DEAD == context.getProducersThreadState()) {
// 无须再提交新任务
break;
}else {
Thread.sleep(50);
// 再次检查是否有必要提交新任务
continue;
}
}
RunnableThreadUnit consumers = new RunnableThreadUnit(targetObject, targetMethodName, targetMethodParameters);
threadPoolExecutor.execute(consumers);
context.setConsumersThreadState(ThreadState.RUNNABLE);
log.info("submit consumption task...");
concurrencyMaxTotal--;
}
threadPoolExecutor.shutdown();
while (!threadPoolExecutor.isTerminated()) {
// 等待消费完毕
Thread.sleep(100);
}
} catch (Exception e) {
log.error(String.format("consumption error... targetObject=[%s],targetMethodName=[%s],targetMethodParameters=[%s]", targetObject, targetMethodName, targetMethodParameters), e);
if (threadPoolExecutor != null && !threadPoolExecutor.isShutdown()) {
threadPoolExecutor.shutdown();
}
} finally {
log.info("consumption the end...");
context.setConsumersThreadState(ThreadState.DEAD);
}
}
}

/**
*线程单元(无返回值)
*/
public class RunnableThreadUnit implements Runnable {

private final static Logger logger = LoggerFactory.getLogger(RunnableThreadUnit.class);

private Object object;
private String methodName;
private Object[] methodParameters;

public RunnableThreadUnit(Object object, String methodName, Object... methodParameters) {
if (object == null || StringUtils.isBlank(methodName) || methodParameters == null) {
throw new RuntimeException("init runnable thread unit error...");
}
this.object = object;
this.methodName = methodName;
this.methodParameters = methodParameters;
}

@Override
public void run() {
try {
Class<?>[] classes = new Class[methodParameters.length];
for (int i = 0; i < methodParameters.length; i++) {
classes[i] = methodParameters[i].getClass();
}
Method method = object.getClass().getMethod(methodName, classes);
method.invoke(object, methodParameters);
} catch (Exception e) {
logger.error(String.format("execute runnable thread unit error... service=[%s],invokeMethodName=[%s]", object, methodName), e);
}
}

}

可以看到我们使用反射获取了production和consumption方法,并执行它们。启动了两个线程,生产者线程和消费者线程去处理业务,其中消费者线程利用了线程池,可以放置concurrencyMaxTotal个子线程去消费任务。

我们创建一个测试类进行测试,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class CourrentTest implements ProducerTemplate<String>, ConsumerTemplate<String>{
@Override
public void production(Context<String> context) throws Exception {
//1. 组装数据
//数据库查询、组装数据过程略,由for循环插入数据代替
for(int i = 0;i<1000;i++){
Thread.sleep(10);
//插入不成功,说明可能是消费者线程死亡或者队列已满
if(!context.offerDataToConsumptionQueue(i+"")){
return;
}
}
}
@Override
public void consumption(Context<String> context) throws Exception {
//2. 上传数据/文件
//消费者消费数据
while (true) {
String str = context.pollDataFromConsumptionQueue();
if (str == null) {
break;
}
//假设每个文件上传消耗1s时间
Thread.sleep(1000);
System.out.println(str);
}
}
//测试
public static void main(String[] args) throws Exception{
CourrentTest courrentTest = new CourrentTest();
new Coordinator(new Context<String>(),10).start(courrentTest,courrentTest);
}
}

运行后可以看到输出的结果。

这儿我们可以看到对比较耗时的上传方法(消费者端)进行了并发处理以提高效率,生产端如果保证了数据的安全性,我们可以使用并行流等放入数据以提高放入数据的效率。

其实我们看到这儿,可以理解线程池也是一个类似于生产者消费者模式的东西。线程池里面有任务就会去执行,相当于消费者,线程池里的队列相当于缓存区,而生产者就是我们一个个放入线程的Runable方法。

上述代码的运行原理图大致如下:

upload successful

PS: 上述代码可以在我的GitHub项目里找到。

https://github.com/JavaZWT/framework-base

另外提供了一个简易模板SimpleTemplate可以适用生产者方法和消费者方法在一个类里的情况,只继承这一个方法即可。不用分别继承ConsumerTemplate和ProducerTemplate接口了。

总结

通过对上面一个列子使用生产者和消费者模式,我们了解了这种模式的一些适用情形和优点。

当然也了解了它的一些缺点,对于解决并发问题的方案,最要重视的应该就是数据安全问题了。

我们在平时工作中也可以考虑什么样的场景下可以使用这种模式,其实这种模式的适用场景还是蛮多的,对于一些处理较耗时的操作,文件上传、图片生成转换等都可以考虑这种模式。




-------------文章结束啦 ~\(≧▽≦)/~ 感谢您的阅读-------------

SakuraTears wechat
扫一扫关注我的公众号
您的支持就是我创作的动力!
0%