开始学 CSAPP的第十二章

最后一章了,不知不觉这本书已经看完了!!

梗概

并发有很多用处,比如在进行 IO 访问,服务访问的时候,如果不能合理利用并发,那么有很多时钟周期会被浪费掉。

最简单的并发方式是使用多进程,让内核进行调度的方法来并发。

基于进程的并发编程

父进程接受客户端连接请求,然后 fork 一个子进程,让子进程处理主要服务逻辑。

而在父进程中,需要及时的关闭连接描述的副本,否则会因为内存泄漏导致系统崩溃。

基于进程的并发服务器

因为服务器通常会运行很长的时间,这意味着操作系统不会帮助进程回收内存,我们需要做到以下几点:

  • 包括一个 SIGCHLD 处理程序来回收僵死进程,因为 Linux 的信号是不排队的,所以说一次要处理完所有的子进程。
  • 对于父进程,需要在 fork 之后马上 close 建立连接的描述符,对于子进程来说,也要关闭所有的文件描述符。

进程的优劣

进程共享文件表,但是不共享用户地址空间(这既是优点也是缺点),不会发生内存覆盖的情况,但是也造成了通信困难。

为了通信,需要使用 IPC(进程间通信)机制。

基于 I/O 多路复用的并发编程

提到了一个困境,就是说如果希望它能同时相应连接,并且能接受我的命令行 read 读入。但是两个时间应该先等待哪一个呢?这里我们可以用一个 I/O 多路复用的技术。基本的思路就是使用 select 函数,要求内核挂起进程,只有在一个或多个I/O 事件发生后,才将控制返回给应用程序,就像在下面的示例中一样:

1
2
3
4
5
6
7
8
9
10
#include <sys/select.h>

int select(int n, fd_set *fdset, NULL, NULL, NULL);
// 返回已准备好的描述符的非零的个数,若出错则为 -1。

FD_ZERO(fd_set *fdset); /* Clear all bits in fdset */
FD_CLR(int fd, fd_set *fdset); /* Clear bit fd in fdset */
FD_SET(int fd, fd_set *fdset); /* Turn on bit fd in fdset */
FD_ISSET(int fd, fd_set *fdset); /* Is bit fd in fdset on? */
// 处理描述符集合的宏。

比如我现在有两个文件描述符:fd1,fd2

我要编写一个程序,表示当某个描述符准备时进行读取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include<sys/select.h>
#include<unistd.h>
fd_set readset,ready_set;
FD_CLR(&readset);
FD_SET(fd1,&readset);
FD_SET(fd2,&readset);
while(1){
ready_set=readset;//每次需要重新赋值一遍
select(listenfd + 1, &readset, NULL, NULL, NULL);
if(FD_ISSET(fd1,&readset)){
read(fd1,buffer,0x100);
}
if(FD_ISSET(fd2),&readset){
read(fd2,buffer,0x100);
}

}

差不多就是这样的一个形式,如果两个都没有准备好,那么阻塞。

书上给了个之前网络编程的例子。

IO多路复用技术的优劣

总结下来就是:

优点:共享内存空间,调试方便,可以事件驱动,并且因为不需要上下文切换,所以高效。

缺点:如果面对网络延迟较高的用户,那么这个用户会阻塞整个服务,甚至会有恶意用户故意只发送部分文本行,导致整个服务被阻塞。

基于线程的并发编程

基于线程的并发编程是之前两种方法的混合。线程一部分上来说是跟进程概念一样的,会参与调度,每个线程都有自己的线程上下文,包括:

  • 线程 ID
  • 栈指针
  • 程序计数器
  • 通用目的寄存器
  • 状态码

共享整个进程的虚拟空间,其中包括:

  • 代码
  • 数据
  • 共享库
  • 打开的文件描述符

线程执行的模型

每个进程都有一个主线程,在某一时刻,主线程创建一个对等线程(peer thread),从这个时间点开始,两个线程就并发地运行。最后,因为主线程执行一个慢速系统调用,例如 read 或者 sleep,或者因为被系统的间隔计时器中断,控制就会通过上下文切换传递到对等线程。对等线程会执行一段时间,然后控制传递回主线程,依次类推。

因为线程大部分数据是共享的,所以切换线程的上下文就比进程快多了。

POSIX线程

也就是我们熟知的 pthread,它定义了大概有60个函数,运行创建,杀死,回收线程,还可以像进程发送信号那样通知对等线程。

创建线程

我们可以使用 pthread_create 函数来创建一个线程,并让线程执行一个线程例程(pthread routin)。

1
2
3
4
5
6
7
#include <pthread.h>
typedef void *(func)(void *);

int pthread_create(pthread_t *tid, pthread_attr_t *attr,
func *f, void *arg);

// 若成功则返回 0,若出错则为非零。

第一个参数用于接收返回的 tid,第二个参数传递线程属性,第三个参数为线程回调函数,第四个参数为回调函数的参数。

对于被创建的线程,可以通过调用 pthread_self() 来获取自己的线程id。

终止线程

pthread_exit(void *thread_return) 函数用于退出本线程,并且从不返回。

如果任意一个线程调用了 exit() 那么会导致所有线程都要被迫退出。pthread_cancel(pthread_t tid) 可以根据线程 id 杀死一个线程。

回收已终止的线程资源

线程通过调用 pthread_join 函数等待其他线程终止。

1
2
3
4
5
#include <pthread.h>

int pthread_join(pthread_t tid, void **thread_return);

// 若成功则返回 0,若出错则为非零。

pthread_join 函数会阻塞,直到线程 tid 终止,将线程例程返回的通用 (void*) 指针赋值为 thread_return 指向的位置,然后回收已终止线程占用的所有内存资源。

注意,和 Linux 的 wait 函数不同,pthread_join 函数只能等待一个指定的线程终止。没有办法让 pthread_wait 等待任意一个线程终止。这使得代码更加复杂,因为它迫使我们去使用其他一些不那么直观的机制来检测进程的终止。

分离线程

在任何一个时间点上,线程是可结合的(joinable)或者是分离的(detached)。一个可结合的线程能够被其他线程收回和杀死。在被其他线程回收之前,它的内存资源(例如栈)是不释放的。一个分离的线程是不能被其他线程回收或杀死的。它的内存资源在它终止时由系统自动释放。

为了避免内存泄露,线程创建就是 可结合 的,可以使用 pthread_detach 函数去分离一个线程。

对于一些 web 服务器,我们就可以分离线程让主线程不必等待,等到线程自己执行完毕后等待系统回收内存资源。

初始化线程

pthread_once 函数允许你初始化与线程例程相关的状态。

1
2
3
4
5
6
7
8
#include <pthread.h>

pthread_once_t once_control = PTHREAD_ONCE_INIT;

int pthread_once(pthread_once_t *once_control,
void (*init_routine)(void));

// 总是返回 0。

这个函数后面会有大作用,现在先不讨论具体用法了。

基于线程的并发服务器

这里主要用到了一个指针传值,在 accept 请求时会返回一个连接描述符,假如在线程中还没有拿这个值,此时主线程下一个 accept 已经到来。就会导致这个线程取出了一个错误的描述符。

这就导致了一个竞争的危险。

多线程中共享变量

这一节主要研究哪些变量是线程中所共享的。

线程内存模型

这里前面也说过了,寄存器,PC这些肯定是独立的,因为虚拟内存是共享的,所以事实上,它们的栈虽然说是独立的但是也可以是共享,只要知道内存地址,便可以轻松访问其它线程的栈,在上一小节最后一个例子中也看到了,直接使用堆上的指针传进去取出。

将变量映射到内存

一般有三种类型的变量:

  • 局部变量,在栈或者是寄存器上。
  • 局部静态变量,在bss或者data段上。
  • 全局变量,在 bss 或者 data 段上。

共享变量

事实上,局部变量也能共享,只要我取地址拿到值,别的线程一样可以访问。但是前面编译也讲过,一旦某个参数或者变量有取地址的行为,那么它就不可能再长时间存储在寄存器当中。

用信号量同步线程

共享变量的方便引入了另一个问题:同步错误。

日常提到的,多个线程做加法,从零开始,每次加一,加的结果不等于加的次数。

主要是因为对一个变量加法分了三步操作

  • LOAD
  • ADD
  • STORE

即便是在 x86 下面汇编也是这样的三个步骤:

1
2
3
movq 	cnt(%rip),%rdx
addq $1,%rdx
movq %rdx,cnt(%rip)

所以对于 cnt++ 这一句话来说,可见自增操作非原子的,是可被打断的。如果我在取数的时候,另一个线程也刚好取数,那样就会导致同步错误,因为两个线程都加一仅仅是对它们取得的数加的,这样导致一个直观的问题就是加了两次但是值只多了1,这就是没有同步的后果。

信号量

Edsger Dijkstra,并发编程领域的先锋人物,提出了一种经典的解决同步不同执行线程问题的方法,这种方法是基于一种叫做信号量(semaphore)的特殊类型变量的。信号量 s 是具有非负整数值的全局变量,只能由两种特殊的操作来处理,这两种操作称为 P 和 V:

  • P(s):如果 s 是非零的,那么 P 将 s 减 1,并且立即返回。如果 s 为零,那么就挂起这个线程,直到 s 变为非零,而一个 V 操作会重启这个线程。在重启之后,P 操作将 s 减 1,并将控制返回给调用者。
  • V(s):V 操作将 s 加 1。如果有任何线程阻塞在 P 操作等待 s 变成非零,那么 V 操作会重启这些线程中的一个,然后该线程将 s 减 1,完成它的 P 操作。

使用信号量来实现互斥

特别的,二元信号量被称为锁,P操作被称为加锁,V操作被称为解锁。

我们只需要在访问变量之前加锁,在写完变量之后解锁就可以保证调度不会在加锁与解锁之间的程序流被打断。

使用信号量来调度共享资源

这里有两个很经典的例子:生产者消费者问题,读写者问题。

生产者和消费者问题

背景引入

这个其实就字面意思了

思路

其实这个写起来也很简单,我只需要构建一个环形队列,然后设置一个信号量,初值为环形队列的长度。

生产者和消费者共用一个信号量,同时也共用锁。生产者生产时,对信号量 -1 (P 操作),消费者消费时,对信号量+1 (V 操作)。

但是这样产生了一个问题:就是 V 操作永远不会被阻塞,因此会导致生产者吃掉没有生产的东西。

那换个角度,让吃会被阻塞,生产不会被阻塞,先给信号量减到 0,然后吃的话是 P 操作,生产的话是 V 操作,这样也出现一个问题就是生产者如果比消费者快很多,那么会导致缓冲区溢出。

好像鱼和熊掌不可兼得,其实算下来的话还是下面个方案比较划算,毕竟上面那个方案会出很大的纰漏嘛。

例程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
//32位编译
#include<stdio.h>
#include<signal.h>
#include<pthread.h>
#include<semaphore.h>
#include<stdlib.h>
#include<unistd.h>
#include<time.h>
#define P(m) sem_wait(m)
#define V(m) sem_post(m)
#define LEN 0x10
int k=1;

typedef struct AAA{
int *buffer;
int len;
int front;
int rear;
sem_t mutex;
sem_t s;
sem_t smutex;
}SBUF;
SBUF test;
void handler(int sig){
k=0;
}
void InitSBUF(SBUF* s){//初始化结构体
s->buffer=(int*)malloc(sizeof(int)*LEN);
s->len=LEN;
s->front=0;
s->rear=0;
sem_init(&(s->mutex),0,1);
sem_init(&(s->smutex),0,1);
sem_init(&(s->s),0,LEN);
for(int i=0;i<s->len;i++){
P(&(s->s));//把信号量减到0
}
}
void add(pthread_t id){
P(&test.mutex);

V(&test.s);//增加食物对信号量 V

test.buffer[test.front]=id;
test.front++;
test.front%=test.len;
printf("grass %u add one food\n",id);
V(&test.mutex);
}
void sub(pthread_t id){//全局上锁确保不冲突
P(&test.smutex);

P(&test.s);//减少食物对信号量 P

int food=test.buffer[test.rear];
test.rear++;
test.rear%=test.len;
printf("animal %u eat food by %u\n",id,food);
V(&test.smutex);
}
void grass(){
while(k){
int t=1+rand()%4;
sleep(t);
add(pthread_self());
}
}
void animal(){
while(k){
int t=1+rand()%4;
sleep(t);
sub(pthread_self());
}
}
int main(int argc,char *argv[]){
srand(time(NULL));
setbuf(stdin,NULL);
setbuf(stdout,NULL);
InitSBUF(&test);
signal(SIGINT,handler);
pthread_t an[4],gr[4];
for(int i=0;i<4;i++){
pthread_create(gr+i,NULL,grass,NULL);
}
for(int i=0;i<4;i++){
pthread_create(an+i,NULL,animal,NULL);
}
for(int i=0;i<4;i++){
pthread_join(gr[i],NULL);
}
for(int i=0;i<4;i++){
pthread_join(an[i],NULL);
}
}

读-写者问题

背景引入

这个问题在生活中很常见,比12306铁路售票系统,同一时间,可以允许很多个用户一起查,因为它们之间并没有干扰,你看你的我看我的,很和谐。但是一旦其中一个人要买一张车票了(对数据库写),那么写的时候必须清除所有的读用户,因为很可能他们会读到一个错误的结果——显示有票结果一点卖完了。

并且写的时候也不能同时写,虽然说可能我们俩写的数据不一样,可以一起进行,但是保不准它们买的是同一张车票,就会导致卖出去两张车票而车票数只减了 1。

思路

因此在读的时候我们必须先占据写锁,并且只占一次,剩下来的读者不必占用写锁,但是解锁不能因为第一个读者获取了写锁就让第一个人走的时候把写锁放开了,而应该等所有的读者都读完才能放开写锁。

所以我们维护一个变量 cnt,记录读者的数量,当读者为 0 时,进入时我需要占据写锁,出来时,若我是最后一个人我,我需要放开写锁。

为了维护这个变量,我们同样还需要再设置一个锁,保证同一时间只有一个读者对 cnt 进行加减操作。

例程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#include<stdio.h>
#include<signal.h>
#include<pthread.h>
#include<semaphore.h>
#include<stdlib.h>
#include<unistd.h>
#include<time.h>
#define P(m) sem_wait(m)
#define V(m) sem_post(m)
typedef struct AAA{
sem_t writelock;
sem_t readlock;
size_t cnt;
int value;
}RDWR;
RDWR test;
int k=1;
void handler(int sig){
k=0;
}
void Init(RDWR* s){
s->cnt=0;
sem_init(&s->readlock,0,1);
sem_init(&s->writelock,0,1);
s->value=0;
}
int readInt(){
return test.value;
}
void writeInt(int value){
test.value=value;
}
void reader(){
while(k){
int t=1+rand()%4;
sleep(t);
P(&test.readlock);//访问 cnt之前先上锁
if(test.cnt==0){
P(&test.writelock);
}
test.cnt++;
V(&test.readlock);
int value=readInt();
printf("reader %u read value is %u\n",pthread_self(),value);

P(&test.readlock);//写cnt之前也先上锁
test.cnt--;
if(test.cnt==0){
V(&test.writelock);
}
V(&test.readlock);
}
}

void writer(){
while(k){
int t=1+rand()%4;
sleep(t);
P(&test.writelock);//管好写锁即可
int value=rand();
writeInt(value);
printf("writer %u write value changed %u\n",pthread_self(),value);
V(&test.writelock);
}
}
int main(){
srand(time(NULL));
setbuf(stdin,NULL);
setbuf(stdout,NULL);
Init(&test);
signal(SIGINT,handler);
pthread_t rd[4],wr[4];
for(int i=0;i<4;i++){
pthread_create(rd+i,NULL,reader,NULL);
}
for(int i=0;i<4;i++){
pthread_create(wr+i,NULL,writer,NULL);
}
for(int i=0;i<4;i++){
pthread_join(rd[i],NULL);
}
for(int i=0;i<4;i++){
pthread_join(wr[i],NULL);
}

}

这个程序中,保证每个读者读到的值刚好是最近一个修改者修改的值。

符合同时读的时候可以有很多读者,不能有写者;写的时候不仅只能有一个写者,还不能有读者存在。

小结

并发感觉学到这差不多了应该,已经把主要的都学完了,我也算是吃完这本神书了,希望祝我考研一臂之力吧,加油!