Linux 进程与线程(三)线程安全和高级I/O

线程同步

线程同步的必要性

  • 对共享资源的访问进行保护
    • 假设对于全局变量a,线程1和线程2都需要访问,a就是多个线程间的共享资源
  • 保护的目的是解决数据一致性问题
    • 当变量可以被多个线程修改和读取时,存在数据一致性问题,需要保证每个线程都能读取到有效且正确的值
  • 数据一致性问题的本质在于进程中的多个线程对共享资源的并发访问(同时访问)

互斥锁(Mutex)

互斥锁(mutex)是一种同步机制,用于在某一时刻只允许一个线程访问共享资源。其它线程在锁被占用时只能阻塞等待,直到锁被释放。

Linux中互斥锁用pthread_mutex_t类型表示。

当多个线程访问共享资源,并且至少有一个写入操作时,就需要上锁,比如:

  • 多个线程修改同一变量
  • 多个线程读写同一个链表或数组
  • 多个线程使用同一个文件描述符
  • 多个线程对同一硬件外设发起操作

互斥锁初始化

以下方法用于在定义mutex时就将其初始化:

1
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

PTHREAD_MUTEX_INITIALIZER这个宏携带了mutex的默认属性。

如果需要先定义互斥锁,再将其初始化,比如使用malloc()申请分配的互斥锁对象,可以使用:

1
2
3
#include <pthread.h>

int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);

示例:

1
2
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, NULL);

或:

1
2
pthread_mutex_t *mutex = malloc(sizeof(pthread_mutex_t));
pthread_mutex_init(mutex, NULL);

上锁和解锁

1
2
3
4
5
/* 上锁 */
int pthread_mutex_lock(pthread_mutex_t *mutex);

/* 解锁 */
int pthread_mutex_unlock(pthread_mutex_t *mutex);

互斥锁的属性

1
2
3
4
5
/* 销毁属性 */
int pthread_mutexattr_destroy(pthread_mutexattr_t *attr);

/* 初始化属性 */
int pthread_mutexattr_init(pthread_mutexattr_t *attr);

条件变量(Condition Variable)

条件变量允许一个线程(或多个)阻塞等待某个条件成立,而另一个线程可以在条件满足时调用函数去唤醒等待线程。和互斥锁不同,条件变量本身不保护数据,它需要和互斥锁配合使用。

条件变量初始化

静态方式

1
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

动态方式:

1
pthread_cond_init(&cond, NULL);

唤醒线程

1
2
pthread_cond_signal(&cond);  // 唤醒一个等待的线程
pthread_cond_broadcast(&cond); // 唤醒所有等待的线程

这两个函数都可以向指定的条件变量发送信号,通知一个或多个正在等待的线程。

例如在生产者-消费者模型中:

1
2
3
4
5
6
7
8
9
10
if (ret){
exit(-1);
}

for ( ; ; ){
pthread_mutex_lock(&mutex);
g_avail++;
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&cond); //唤醒处于等待状态的消费者线程
}

等待条件变量

pthread_cond_wait(&cond, &mutex)可用于当判断某个条件不满足时,将线程设置为阻塞等待状态。该函数形参包含一个互斥锁,因为条件的检测(通常需要访问共享资源)是在互斥锁的保护下进行的,即条件本身是由互斥锁保护的。

条件变量本身并不保存状态信息,只是一种通讯机制,并不需要去深究cond里到底是什么东西。

1
2
3
4
5
pthread_mutex_lock(&mutex);        // 必须先加锁
while (条件不满足) {
pthread_cond_wait(&cond, &mutex); // 自动释放 mutex,等条件满足后再自动加锁
}
pthread_mutex_unlock(&mutex);

自旋锁(Spinlock)

自旋锁是一种忙等(busy-wait)的锁:当一个线程想获取锁但锁被占用时,它不会休眠,而是死循环反复检查锁是否释放。而互斥锁在获取不到锁时会进行阻塞(休眠),频繁在唤醒和休眠状态间切换的对于CPU的开销很大,因此互斥锁的开销远大于自旋锁,自旋锁的效率远高于互斥锁。

由于忙等特性,自旋锁通常用于以下情况:

  • 锁的持有时间非常短(比如几个指令周期)
  • 高性能、多核环境(线程不会被调度出去)
  • 中断上下文、内核态(不能睡眠的地方)

自旋锁不适合锁操作时间长的场景,比如文件读写、网络操作等。

自旋锁初始化

1
2
3
4
5
/* 销毁自旋锁 */
int pthread_spin_destroy(pthread_spinlock_t *lock);

/* 初始化自旋锁 */
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);

初始化时pshared表示自旋锁的进程共享属性,可选:

  • PTHREAD_PROCESS_SHARED:共享自旋锁。该自旋锁可以在多个进程中的线程之间共享
  • PTHREAD_PROCESS_PRIVATE:私有自旋锁。只有本进程内的线程才能够使用该自旋锁

上锁和解锁

1
2
3
4
5
6
/* 加锁,未获取到锁则自旋 */
int pthread_spin_lock(pthread_spinlock_t *lock);
/* 加锁,未获取到锁则立即返回EBUSY错误 */
int pthread_spin_trylock(pthread_spinlock_t *lock);
/* 解锁 */
int pthread_spin_unlock(pthread_spinlock_t *lock);

读写锁

读写锁有三种状态:

  • 读模式下的加锁状态(读加锁)
  • 写模式下的加锁状态(写加锁)
  • 不加锁状态

一次只有一个线程可以占有写模式的读写锁,但是可以有多个线程同时占有读模式的读写锁:

  • 当读写锁处于写加锁状态时,在这个锁被解锁之前,所有试图对这个锁进行加锁操作(不管是以读模式加锁还是以写模式加锁)的线程都会被阻塞
  • 当读写锁处于读加锁状态时,所有试图以读模式对它进行加锁的线程都可以加锁成功;但是任何以写模式对它进行加锁的线程都会被阻塞,直到所有持有读模式锁的线程释放它们的锁为止

读写锁非常适合于对共享数据读的次数远大于写的次数的情况:

  • 当读写锁处于写模式加锁状态时,它所保护的数据可以被安全的修改,因为一次只有一个线程可以在写模式下拥有这个锁;
  • 当读写锁处于读模式加锁状态时,它所保护的数据就可以被多个获取读模式锁的线程读取。

所以在应用程序当中,使用读写锁实现线程同步,当线程需要对共享数据进行读操作时,需要先获取读模式锁(对读模式锁进行加锁),当读取操作完成之后再释放读模式锁(对读模式锁进行解锁);当线程需要对共享数据进行写操作时,需要先获取到写模式锁,当写操作完成之后再释放写模式锁。

高级I/O

阻塞与非阻塞I/O

对于普通文件,总是以非阻塞的方式进行I/O操作。对于非普通文件,需要在open()函数中为形参flags指定O_NONBLOCK标志,open()调用成功后后续的I/O操作将以非阻塞方式进行。

阻塞I/O下,如果文件无数据可读,I/O会将调用者的应用程序挂起,进入休眠阻塞状态,直到有数据可读时解除阻塞。阻塞时将会释放CPU资源。如果非阻塞I/O下读文件,将会不断地进行轮询,占用较高的CPU资源。

I/O多路复用(IO multiplexing)

I/O 多路复用的本质是:

使用一个线程或进程,同时监听多个文件描述符(fd),一旦某个 fd 就绪,就进行相应操作。

select()

调用select()会一直阻塞,直到一个或多个文件描述符成为就绪态:

1
2
3
#include <sys/select.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

形参(不关心可以设置对应为NULL):

  • readfds:用来是否可读的文件描述符集合
  • writefds:用来检测是否可写的文件描述符集合
  • exceptfds:用来检测异常情况是否发生的文件描述符集合
  • timeout:阻塞时间上限,为NULL则一直阻塞

select()将阻塞,直到:

  • readfds、writefds、exceptfds指定的文件描述符中至少有一个变为就绪态
  • 该调用被信号处理函数中断
  • 超时

文件描述符集合的所有操作都可以由以下四个宏完成:

1
2
3
4
5
6
#include <sys/select.h>

void FD_CLR(int fd, fd_set *set); //将fd从fdset中移除
int FD_ISSET(int fd, fd_set *set); //如果fd是fdset中的成员,则返回true,否则返回false
void FD_SET(int fd, fd_set *set); //将fd添加到fdset中
void FD_ZERO(fd_set *set); //将fdset初始化为空

select有三种可能的返回值:

  • 返回-1,表示有错误发生
  • 返回0, 表示调用超时
  • 返回一个正整数n,表示n个文件描述符已处于就绪态

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include <stdio.h>       
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/select.h>
#include <pthread.h>

#define MOUSE "/dev/input/mouse0" // 定义鼠标设备文件路径

int main(void)
{
char buf[100]; // 用于存放读取到的数据
int fd, ret = 0, flag;
fd_set rdfds; // 读文件描述符集合,用于 select 检测是否可读
int loops = 5; // 控制循环次数,最多轮询 5 次

// 打开鼠标设备为非阻塞模式
fd = open(MOUSE, O_RDONLY | O_NONBLOCK);
if (fd == -1){
perror("open file error\n"); // 打开失败就报错并退出
exit(-1);
}

// 将标准输入(fd=0,即键盘)设置为非阻塞模式
flag = fcntl(0, F_GETFL); // 获取当前文件描述符的标志位
flag |= O_NONBLOCK; // 设置非阻塞标志
fcntl(0, F_SETFL, flag); // 更新回去

while(loops--){
FD_ZERO(&rdfds); // 清空读文件描述符集合
FD_SET(0, &rdfds); // 把标准输入加入监听集合(键盘输入)
FD_SET(fd, &rdfds); // 把鼠标设备加入监听集合

// select 监听两个设备是否可读,阻塞直到有事件发生
// 注意:第一个参数要设置为最大 fd + 1
ret = select(fd + 1, &rdfds, NULL, NULL, NULL);
if (ret < 0){
perror("select error\n"); // 出错就退出
goto out;
}
else if (ret == 0){
fprintf(stderr, "select timeout\n"); // select 超时(本例不会超时,因为 timeout 是 NULL)
continue;
}

// 检查标准输入(键盘)是否就绪
if (FD_ISSET(0, &rdfds)){
ret = read(0, buf, sizeof(buf)); // 从键盘读取输入
if (ret > 0){
printf("keyboard: successfully read <%d> bytes\n", ret);
}
}

// 检查鼠标设备是否就绪
if (FD_ISSET(fd, &rdfds)){
ret = read(fd, buf, sizeof(buf)); // 从鼠标读取数据
if (ret > 0){
printf("mouse: successfully read <%d> bytes\n", ret);
}
}
}

out:
close(fd); // 关闭鼠标文件
exit(ret); // 正常退出
}

poll()

异步IO

异步IO能够在发起一个 I/O 请求后立即返回,等内核完成后再通知用户,不阻塞当前线程,非常适合高性能、响应敏感的场景。即“把事交给内核干,自己不等,等内核干完再来通知”。

示例(异步读取文件):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <aio.h>
#include <string.h>

int main() {
int fd;
struct aiocb cb; // AIO 控制块
char buffer[100];

// 打开文件
fd = open("test.txt", O_RDONLY);
if (fd < 0) {
perror("open");
exit(1);
}

// 清空控制块结构体
memset(&cb, 0, sizeof(struct aiocb));

cb.aio_fildes = fd; // 设置文件描述符
cb.aio_buf = buffer; // 指定接收数据的缓冲区
cb.aio_nbytes = sizeof(buffer); // 读取字节数
cb.aio_offset = 0; // 从文件开头读取

// 发起异步读请求
if (aio_read(&cb) < 0) {
perror("aio_read");
close(fd);
exit(1);
}

// 等待读取完成
while (aio_error(&cb) == EINPROGRESS) {
printf("Reading in progress...\n");
sleep(1);
}

// 检查结果
if (aio_error(&cb) != 0) {
perror("aio_error");
} else {
int ret = aio_return(&cb); // 获取实际读取的字节数
printf("Read %d bytes: %.*s\n", ret, ret, buffer);
}

close(fd);
return 0;
}

Linux 进程与线程(三)线程安全和高级I/O
http://akichen891.github.io/2025/04/22/Linux线程3/
作者
Aki
发布于
2025年4月22日
更新于
2025年4月22日
许可协议