小白笔记 - 进程间通信
进程间通信(IPC)是面试常问的一个概念。曾经我也只会像报菜名一样,简单列举一下IPC的几种方式:管道、FIFO、消息队列、共享内存、socket、信号……但现在,我发觉光报菜名并不够,还是需要讲一些更深入的东西。能吹上半个钟头,把人给唬的一愣一愣的,才是本事😂。
因此我就以Linux系统和POSIX接口作为学习对象,重新了解了一下IPC的知识,主要参考资料为《UNIX环境高级编程第3版》(以下简称为APUE),Linux手册以及维基百科,并尝试寻找一些实际项目中IPC使用的例子。(有些没找到。。)本文算是我的学习笔记和总结。
信号(signal)
信号是发送给一个进程的异步的通知。信号有几十种不同类型,名字以SIG开头,定义为正整数常量,算是一种能力有限的IPC方式。具体介绍参见APUE第10章和Linux手册signal(7)。
信号产生
- 用户终端操作。如
Ctrl+C
会产生终端中断信号(SIGINT
),常用来终止前台进程。Ctrl+Z
会产生终端停止信号(SIGTSTP
),常用来挂起前台进程,然后可以用fg
/bg
命令恢复进程执行。 - 异常产生信号。如进程段错误会产生
SIGSEGV
信号。 - 系统调用
kill
(2)。 - 用户命令
kill
(1)。如kill -9 pid
发送SIGKILL
杀进程。 - 其他。如
alarm
(2)定时器超时会产生SIGALRM
信号。
信号处理
- 捕捉信号。系统调用
signal
(2)或sigaction
(2)注册信号处理函数。pause
(2)使进程挂起直到捕捉到信号。SIGKILL
和SIGSTOP
不能捕捉。 - 执行系统默认操作。把信号处理函数设为
SIG_DFL
。默认操作大多是终止进程。 - 忽略。把信号处理函数设为
SIG_IGN
。SIGKILL
和SIGSTOP
不能忽略。 - 阻塞。
sigprocmask
(2)。
C语言接口
int kill(pid_t pid, int signo);
int raise(int signo); // 等于 kill(getpid(), signo)
unsigned int alarm(unsigned int seconds);
int pause(void);
void (*signal(int signo, void (*func) (int)))(int);
int sigaction(int signo, const struct sigaction *restrict act,
struct sigaction *restrict oact);
例子
abort
(3),system
(3),sleep
(3)的实现均用到了信号。
glibc的abort()
实现就是先把SIGABRT
的阻塞去除:
sigset_t sigs;
sigemptyset (&sigs);
sigaddset (&sigs, SIGABRT);
sigprocmask (SIG_UNBLOCK, &sigs, 0);
然后执行raise(SIGABRT)
。如果用户注册了SIGABRT
信号处理函数,这会执行用户注册的函数,所以需要把用户注册的去除,换成默认的:
struct sigaction act;
memset (&act, '\0', sizeof (struct sigaction));
act.sa_handler = SIG_DFL;
sigfillset (&act.sa_mask);
act.sa_flags = 0;
sigaction (SIGABRT, &act, NULL);
然后再执行raise(SIGABRT)
。
- Chromium如何处理
SIGINT
等信号。
POSIX环境中,Chromium也是通过sigaction
来处理SIGINT
等信号,在信号处理函数中执行一些浏览器关闭所需的操作:
// We need to handle SIGTERM, because that is how many POSIX-based distros
// ask processes to quit gracefully at shutdown time.
struct sigaction action;
memset(&action, 0, sizeof(action));
action.sa_handler = SIGTERMHandler;
CHECK_EQ(0, sigaction(SIGTERM, &action, nullptr));
// Also handle SIGINT - when the user terminates the browser via Ctrl+C. If
// the browser process is being debugged, GDB will catch the SIGINT first.
action.sa_handler = SIGINTHandler;
CHECK_EQ(0, sigaction(SIGINT, &action, nullptr));
// And SIGHUP, for when the terminal disappears. On shutdown, many Linux
// distros send SIGHUP, SIGTERM, and then SIGKILL.
action.sa_handler = SIGHUPHandler;
CHECK_EQ(0, sigaction(SIGHUP, &action, nullptr));
管道(pipe)
管道,也就是匿名管道,是UNIX系统最古老的IPC形式,具体介绍参见APUE第15章2至3节和Linux手册pipe(7)。
管道有两个局限:1. 管道是半双工的,数据只能在一个方向上流动。2. 管道只能在具有公共祖先的两个进程之间使用。
创建管道
- 管道通过
pipe
(2)创建:int pipe(int fd[2]);
fd
返回两个文件描述符,fd[1]
为写,fd[0]
为读。通常进程会先调用pipe
再调用fork
,创建父进程和子进程之间的IPC通道。 popen
(3)可以创建管道,执行shell命令,并通过管道连接其标准输入或者输出:FILE *popen(const char *cmdstring, const char *type); int pclose(FILE* fp);
- Shell中可以用
|
来创建一个从左边的标准输入到右边的标准输出的管道
例子
管道在Shell中很常见。如简单的统计:cat xxx | sort | uniq -c | sort -rg | less
。
命名管道(FIFO)
FIFO是一种文件类型,与管道相比,FIFO可以在任意两个进程之间传送数据。具体介绍参见APUE第15章5节和Linux手册fifo(7)。
创建命名管道
- C接口
mkfifo
(3)int mkfifo(const char *path, mode_t mode);
- 用户命令
mkfifo
(1)
使用FIFO和使用普通文件一样,先open,再read/write。如通过多个客户进程向一个FIFO写,一个服务器进程从FIFO读来通信。
例子
没找到。
System V IPC / POSIX IPC
下面的三种IPC方式(消息队列,信号量,共享内存)在UNIX上有两套实现:System V IPC和POSIX IPC。System V IPC顾名思义来源于System V系统,比较古老,APUE中称之为XSI IPC,具体介绍参见第15章6到9节。POSIX IPC出现的时间更晚,接口更简单,目前网上也推荐用POSIX IPC,所以下面就以POSIX IPC接口为例来看这三种IPC方式。
消息队列(message queue)
消息队列是一种异步的IPC方式,发送方把消息存放到消息队列中,直到接收方取出消息。具体介绍参见Linux手册mq_overview(7)。
- 创建/打开消息队列:
mq_open
(3),消息队列的名字是以斜杠开头的字符串/somename
。 - 发送/接收消息:
mq_send
(3),mq_receive
(3) - 消息异步通知:
mq_notify
(3),注册回调函数,在有新的消息来的时候执行。
#include <mqueue.h>
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);
int mq_close(mqd_t mqdes);
int mq_unlink(const char *name); // 删除消息队列
例子
没找到。
信号量(semaphore)
信号量实际上是同步原语而不是IPC,用于共享资源的同步访问。只能用于进程/线程之间的同步,没法传递数据。具体介绍参见APUE第15章10节和Linux手册sem_overview(7)。
- 创建/打开信号量:
sem_open
(3),信号量的名字是以斜杠开头的字符串/somename
。sem_init
(3)创建未命名信号量。 - 信号量减1操作,wait(P):
sem_wait
(3),sem_trywait
(3) - 信号量加1操作,signal(V):
sem_post
(3)
#include <semaphore.h>
sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_post(sem_t *sem);
int sem_close(sem_t *sem);
int sem_unlink(const char *name); // 删除命名信号量
int sem_destroy(sem_t *sem); // 销毁未命名信号量
共享内存(shared memory)
共享内存能让多个进程同时访问同一块内存,是一种高效的数据传递方式。具体介绍参见Linux手册shm_overview(7)。
- 创建/打开共享内存:
shm_open
(3),共享内存的名字是以斜杠开头的字符串/somename
。用ftruncate
(2)设置共享内存的大小。 - 映射到内存空间:
mmap
(2)
#include <sys/mman.h>
int shm_open(const char *name, int oflag, mode_t mode);
int shm_unlink(const char *name);
void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
int munmap(void *addr, size_t length);
Linux还以tmpfs的形式在/dev/shm
提供了共享内存的接口,df -h
命令可以看到其使用情况。所以我们可以不用shm_*函数,直接在这个目录下创建文件,然后mmap
来使用共享内存。
例子
Chromium在POSIX环境下创建共享内存的方式就是在/dev/shm
目录创建文件,然后ftruncate
设置大小:
PlatformSharedMemoryRegion PlatformSharedMemoryRegion::Create(Mode mode, size_t size) {
if (size == 0)
return {};
if (size > static_cast<size_t>(std::numeric_limits<int>::max()))
return {};
CHECK_NE(mode, Mode::kReadOnly) << "Creating a region in read-only mode will "
"lead to this region being non-modifiable";
// This function theoretically can block on the disk, but realistically
// the temporary files we create will just go into the buffer cache
// and be deleted before they ever make it out to disk.
ThreadRestrictions::ScopedAllowIO allow_io;
// We don't use shm_open() API in order to support the --disable-dev-shm-usage
// flag.
FilePath directory;
if (!GetShmemTempDir(false /* executable */, &directory))
return {};
ScopedFD fd;
FilePath path;
fd.reset(CreateAndOpenFdForTemporaryFileInDir(directory, &path));
if (!fd.is_valid()) {
PLOG(ERROR) << "Creating shared memory in " << path.value() << " failed";
FilePath dir = path.DirName();
if (access(dir.value().c_str(), W_OK | X_OK) < 0) {
PLOG(ERROR) << "Unable to access(W_OK|X_OK) " << dir.value();
if (dir.value() == "/dev/shm") {
LOG(FATAL) << "This is frequently caused by incorrect permissions on "
<< "/dev/shm. Try 'sudo chmod 1777 /dev/shm' to fix.";
}
}
return {};
}
// Deleting the file prevents anyone else from mapping it in (making it
// private), and prevents the need for cleanup (once the last fd is
// closed, it is truly freed).
ScopedPathUnlinker path_unlinker(&path);
ScopedFD readonly_fd;
if (mode == Mode::kWritable) {
// Also open as readonly so that we can ConvertToReadOnly().
readonly_fd.reset(HANDLE_EINTR(open(path.value().c_str(), O_RDONLY)));
if (!readonly_fd.is_valid()) {
DPLOG(ERROR) << "open(\"" << path.value() << "\", O_RDONLY) failed";
return {};
}
}
// Get current size.
struct stat stat = {};
if (fstat(fd.get(), &stat) != 0)
return {};
const size_t current_size = stat.st_size;
if (current_size != size) {
if (HANDLE_EINTR(ftruncate(fd.get(), size)) != 0)
return {};
}
if (readonly_fd.is_valid()) {
struct stat readonly_stat = {};
if (fstat(readonly_fd.get(), &readonly_stat))
NOTREACHED();
if (stat.st_dev != readonly_stat.st_dev ||
stat.st_ino != readonly_stat.st_ino) {
LOG(ERROR) << "Writable and read-only inodes don't match; bailing";
return {};
}
}
return PlatformSharedMemoryRegion({std::move(fd), std::move(readonly_fd)},
mode, size, UnguessableToken::Create());
}
网络套接字(socket)
Socket是通过IP协议,可以在不同的计算机之间进行通信的方式。具体介绍参见APUE第16章和Linux手册socket(7)。
Unix网络编程是一个很大的领域,APUE的作者Richard Stevens还有另一部著作《UNIX网络编程》,第一卷专门讲的socket,第二卷专门讲的各种UNIX IPC。我没看过这两本书,也没怎么接触过Linux服务器后端这块,所以这里只简单写一下。
创建socket连接
#include <sys/socket.h>
int socket(int domain, int type, int protocol);
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
int listen(int sockfd, int backlog);
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
socket
(3)创建socket。参数domain
确定通信使用的协议族,如AF_INET
是IPv4协议,AF_INET6
是IPv6协议。参数type
确定socket的类型,如SOCK_DGRAM
(固定长度无连接不可靠报文)和SOCK_STREAM
(有序可靠双向面向连接的字节流)。参数protocol
确定具体协议,传入0
表示选择默认的协议。如AF_INET
下,SOCK_DGRAM
的默认协议是UDP(IPPROTO_UDP
),SOCK_STREAM
的默认协议是TCP(IPPROTO_TCP
)。函数返回值为socket描述符,本质上是一个文件描述符。
bind
(2)将socket与一个地址关联。IPv4协议地址就是IP加端口号了。
面向连接的协议(TCP)需要一个连接过程。客户端需要用connect
(2)建立客户端的socket与服务器端的地址的连接。服务器端用listen
(2)监听连接请求,并用accept
(2)获得连接请求并建立连接。accept
返回一个连接到客户端的新的socket。而原来的socket不受影响,还在监听请求。
数据传输
#include <sys/socket.h>
ssize_t send(int sockfd, const void *buf, size_t len, int flags);
ssize_t sendto(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen);
ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
ssize_t recv(int sockfd, void *buf, size_t len, int flags);
ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen);
ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
有三个系统调用用来发送数据:send
,sendto
和sendmsg
。有三个系统调用用来接收数据:recv
,recvfrom
和recvmsg
。
调用接收函数没有数据可用时会阻塞,而socket也支持非阻塞I/O模型。服务器为了高并发也有许多别的I/O模型,不过这就是另一个话题了。
例子
socket我就不举例了,所有服务器软件都要用这个。
Unix域套接字(Unix domain socket)
Unix domain socket是用于同一台计算机上的IPC。其API与socket相似,但不用使网络协议,效率更高。Unix domain socket也提供了字节流和数据报两种接口,而且其数据报服务是可靠有序的。
创建Unix domain socket
- 未命名Unix domain socket:
socketpair
(2)创建一对未命名的相互连接的Unix domain socket。参数domain
应设为AF_UNIX
。参数type
可以是SOCK_STREAM
或者SOCK_DGRAM
。#include <sys/socket.h> int socketpair(int domain, int type, int protocol, int sockfd[2]);
- 命名Unix domain socket:也是用
socket
(2)创建。参数domain
应设为AF_UNIX
。参数type
可以是SOCK_STREAM
或者SOCK_DGRAM
。然后bind
(2)将Unix domain socket与一个地址关联。但这个地址就不是socket中的IP加端口号了,而是一个路径名。bind
会在这个路径名创建一个S_IFSOCK
文件。
Unix domain socket的连接和数据传输是和socket是一样的。
传输文件描述符
Unix domain socket是一种高级的IPC方式,高级的一点就是可以传输文件描述符。每个进程都有自己的文件表项,所以发送文件描述符不只是传输一个int值那么简单,而是需要内核处理的。
例子
Chromium的IPC库mojo在POSIX环境下就主要使用Unix domain socket作为IPC的方式:
#elif defined(OS_POSIX)
void CreateChannel(PlatformHandle* local_endpoint, PlatformHandle* remote_endpoint) {
int fds[2];
PCHECK(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == 0);
// Set non-blocking on both ends.
PCHECK(fcntl(fds[0], F_SETFL, O_NONBLOCK) == 0);
PCHECK(fcntl(fds[1], F_SETFL, O_NONBLOCK) == 0);
*local_endpoint = PlatformHandle(base::ScopedFD(fds[0]));
*remote_endpoint = PlatformHandle(base::ScopedFD(fds[1]));
DCHECK(local_endpoint->is_valid());
DCHECK(remote_endpoint->is_valid());
}
#endif