设计模式之(二)生产者-消费者

本文首发于 我的个人博客

前言

维基百科中,这么描述 生产者消费者问题

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多进程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个进程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题,常用的方法有信号灯法[1]等。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。

场景

我们公司自己项目中,有个场景,就是IM消息,当我们收到消息时候,进行一些业务逻辑的处理,还有数据库的操作,然后刷新列表。存在的问题是,如果消息接收的特别快,例如离线消息,可能登陆的是,有几百条消息拉取下来,如果每一条每一条的处理,将会导致两个问题:

  • 上次刷新还没完成,下次就进来了。导致界面闪的问题
  • 每条消息进行一次写入数据库操作,IO操作耗时,所以导致,性能问题严重

解决方案

上述问题,使用生产者-消费者就能解决这个问题

为了简单高效。我们用计时器,间隔0.1秒接收一条消息,刷新列表,假设需要2秒。

代码

定义变量

1
2
@property (nonatomic,strong) NSMutableArray *array;//存放数据
@property (nonatomic,strong) dispatch_semaphore_t semaphore;

开启定时器

1
2
3
NSTimer *curTimer =[NSTimer timerWithTimeInterval:0.1 target:self selector:@selector(producerFuncWithNumber:) userInfo:nil repeats:YES];
[[NSRunLoop currentRunLoop] addTimer:curTimer forMode:NSDefaultRunLoopMode];
[curTimer fire];

假设0.1秒收到一条数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//生产者
- (void)producerFuncWithNumber:(NSInteger )number{

number = random()%10;
//生产者生成数据
dispatch_queue_t t = dispatch_queue_create("222222", DISPATCH_QUEUE_CONCURRENT);

dispatch_async(t, ^{
dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER);

[self.array addObject:[NSString stringWithFormat:@"%ld",(long)number]];
NSLog(@"生产了%lu 个",(unsigned long)self.array.count);
dispatch_semaphore_signal(self.semaphore);

});
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//消费者
- (void)consumerFunc{

dispatch_queue_t t1 = dispatch_queue_create("11111", DISPATCH_QUEUE_CONCURRENT);

dispatch_async(t1, ^{

while (YES) {
if (self.array.count > 0) {
dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER);
NSLog(@"消费了%lu 个",(unsigned long)self.array.count);
[self.array removeAllObjects];
[self reload];
dispatch_semaphore_signal(self.semaphore);

}
}
});
}

每次刷新的时候,假设用时2秒

1
2
3
4
-(void)reload{
NSLog(@"休眠2秒");
sleep(2);
}

完整代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#import "ViewController.h"

@interface ViewController ()
@property (nonatomic,strong) NSMutableArray *array;//存放数据
@property (nonatomic,strong) dispatch_semaphore_t semaphore;
@end

@implementation ViewController

- (void)viewDidLoad {
[super viewDidLoad];
// Do any additional setup after loading the view.
//开启计时器
NSTimer *curTimer =[NSTimer timerWithTimeInterval:0.1 target:self selector:@selector(producerFuncWithNumber:) userInfo:nil repeats:YES];
[[NSRunLoop currentRunLoop] addTimer:curTimer forMode:NSDefaultRunLoopMode];
[curTimer fire];

[self consumerFunc];
}

-(void)reload{
NSLog(@"休眠2秒");
sleep(2);
}
- (NSMutableArray *)array{
if (!_array) {
_array = [NSMutableArray array];
}
return _array;
}

- (dispatch_semaphore_t)semaphore{
if (!_semaphore) {
_semaphore = dispatch_semaphore_create(1);
}
return _semaphore;
}

//生产者
- (void)producerFuncWithNumber:(NSInteger )number{

number = random()%10;
//生产者生成数据
dispatch_queue_t t = dispatch_queue_create("222222", DISPATCH_QUEUE_CONCURRENT);

dispatch_async(t, ^{
dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER);

[self.array addObject:[NSString stringWithFormat:@"%ld",(long)number]];
NSLog(@"生产了%lu 个",(unsigned long)self.array.count);
dispatch_semaphore_signal(self.semaphore);

});
}

//消费者
- (void)consumerFunc{

dispatch_queue_t t1 = dispatch_queue_create("11111", DISPATCH_QUEUE_CONCURRENT);

dispatch_async(t1, ^{

while (YES) {
if (self.array.count > 0) {
dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER);
NSLog(@"消费了%lu 个",(unsigned long)self.array.count);
[self.array removeAllObjects];
[self reload];
dispatch_semaphore_signal(self.semaphore);

}
}
});
}
@end

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
iOS-生产者消费者[5508:75404] 生产了1
iOS-生产者消费者[5508:75407] 生产了2
iOS-生产者消费者[5508:75406] 生产了3
iOS-生产者消费者[5508:75411] 生产了4
iOS-生产者消费者[5508:75440] 生产了5
iOS-生产者消费者[5508:75443] 生产了6
iOS-生产者消费者[5508:75450] 生产了7
iOS-生产者消费者[5508:75458] 生产了8
iOS-生产者消费者[5508:75463] 生产了9
iOS-生产者消费者[5508:75472] 生产了10
iOS-生产者消费者[5508:75480] 生产了11
iOS-生产者消费者[5508:75481] 生产了12
iOS-生产者消费者[5508:75482] 生产了13
iOS-生产者消费者[5508:75494] 生产了14
iOS-生产者消费者[5508:75518] 生产了15
iOS-生产者消费者[5508:75521] 生产了16
iOS-生产者消费者[5508:75526] 生产了17
iOS-生产者消费者[5508:75528] 生产了18
iOS-生产者消费者[5508:75531] 生产了19
iOS-生产者消费者[5508:75545] 生产了20
iOS-生产者消费者[5508:75405] 消费了20
iOS-生产者消费者[5508:75405] 休眠2
iOS-生产者消费者[5508:75545] 生产了1
iOS-生产者消费者[5508:75531] 生产了2
iOS-生产者消费者[5508:75528] 生产了3
iOS-生产者消费者[5508:75526] 生产了4
iOS-生产者消费者[5508:75521] 生产了5
iOS-生产者消费者[5508:75518] 生产了6
iOS-生产者消费者[5508:75494] 生产了7
iOS-生产者消费者[5508:75482] 生产了8
iOS-生产者消费者[5508:75481] 生产了9
iOS-生产者消费者[5508:75480] 生产了10
iOS-生产者消费者[5508:75472] 生产了11
iOS-生产者消费者[5508:75463] 生产了12
iOS-生产者消费者[5508:75458] 生产了13
iOS-生产者消费者[5508:75450] 生产了14
iOS-生产者消费者[5508:75443] 生产了15
iOS-生产者消费者[5508:75440] 生产了16
iOS-生产者消费者[5508:75406] 生产了17
iOS-生产者消费者[5508:75407] 生产了18
iOS-生产者消费者[5508:75404] 生产了19
iOS-生产者消费者[5508:75411] 生产了20
iOS-生产者消费者[5508:75405] 消费了20
iOS-生产者消费者[5508:75405] 休眠2
iOS-生产者消费者[5508:75411] 生产了1
iOS-生产者消费者[5508:75404] 生产了2

。。。

由输出结果可知,每次完成业务逻辑需要2秒的话,可以等待上次完成,再进行下次取数据,此时,已经有了20条数据,可以一次性处理,对性能是个挺大的提升。

注意点

生产者和消费者各自在信号量处理,为了保证数据的唯一性,需要用信号量 dispatch_semaphore_t semaphore 来保证多条线程不拥挤,不抢数据。

总结

以上就是对生产者消费者的简单实用,实际使用的时候,可以灵活实用,有时候能有挺大的优化空间。

Demo地址