简言之,惊群现象就是当多个进程或线程在同时阻塞等待同一个事件时,如果这个事件发生,会唤醒所有的进程,但最终只可能有一个进程/线程对该事件进行处理,其他进程/线程会在失败后重新休眠,这种性能浪费就是惊群。
这里对 Linux 中的惊群现象进行简单介绍。
惊群
关于惊群的解释可以查看 Wiki 的解释 Thundering herd problem 。
accept()
常见的场景如下。
主进程执行 socket()+bind()+listen()
后,fork()
多个子进程,每个子进程都通过 accept()
循环处理这个 socket;此时,每个进程都阻塞在 accpet()
调用上,当一个新连接到来时,所有的进程都会被唤醒,但其中只有一个进程会 accept()
成功,其余皆失败,重新休眠。这就是 accept 惊群。
如果只用一个进程去 accept 新连接,并通过消息队列等同步方式使其他子进程处理这些新建的连接,那么将会造成效率低下;因为这个进程只能用来 accept 连接,该进程可能会造成瓶颈。
而实际上,对于 Linux 来说,这只是历史上的问题,现在的内核都解决该问题,也即只会唤醒一个进程。可以通过如下程序进行测试,只会激活一个进程。
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/wait.h>
#include <netinet/in.h>
#define PROCESS_NUM 10
int main()
{
int fd = socket(PF_INET, SOCK_STREAM, 0);
int connfd;
int pid, i, status;
char sendbuff[1024];
struct sockaddr_in serveraddr;
printf("Listening 0.0.0.0:1234\n");
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(1234);
bind(fd, (struct sockaddr*)&serveraddr, sizeof(serveraddr));
listen(fd, 1024);
for(i = 0; i < PROCESS_NUM; i++) {
pid = fork();
if(pid == 0) {
while(1) {
connfd = accept(fd, (struct sockaddr*)NULL, NULL);
snprintf(sendbuff, sizeof(sendbuff), "accept PID is %d\n", getpid());
send(connfd, sendbuff, strlen(sendbuff) + 1, 0);
printf("process %d accept success!\n", getpid());
close(connfd);
}
}
}
wait(&status);
return 0;
}
然后,通过 telnet 127.1 1234
或者 nc 127.1 1234
测试链接即可。
epoll()
另外还有一个是关于 epoll_wait()
的,目前来仍然存在惊群现象。
主进程仍执行 socket()+bind()+listen()
后,将该 socket 加入到 epoll 中,然后 fork 出多个子进程,每个进程都阻塞在 epoll_wait()
上,如果有事件到来,则判断该事件是否是该 socket 上的事件,如果是,说明有新的连接到来了,则进行 accept 操作。
为了简化处理,忽略后续的读写以及对 accept 返回的新的套接字的处理,直接断开连接。
#include <netdb.h>
#include <stdio.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/wait.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#define PROCESS_NUM 10
#define MAXEVENTS 64
int main (int argc, char *argv[])
{
int sfd, efd;
int flags;
int n, i, k;
struct epoll_event event;
struct epoll_event *events;
struct sockaddr_in serveraddr;
sfd = socket(PF_INET, SOCK_STREAM, 0);
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(atoi("1234"));
bind(sfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr));
flags = fcntl (sfd, F_GETFL, 0);
flags |= O_NONBLOCK;
fcntl (sfd, F_SETFL, flags);
if (listen(sfd, SOMAXCONN) < 0) {
perror ("listen");
exit(EXIT_SUCCESS);
}
if ((efd = epoll_create(MAXEVENTS)) < 0) {
perror("epoll_create");
exit(EXIT_SUCCESS);
}
event.data.fd = sfd;
event.events = EPOLLIN; // | EPOLLET;
if (epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event) < 0) {
perror("epoll_ctl");
exit(EXIT_SUCCESS);
}
/* Buffer where events are returned */
events = (struct epoll_event*)calloc(MAXEVENTS, sizeof event);
for(k = 0; k < PROCESS_NUM; k++) {
if (fork() == 0) { /* children process */
while (1) { /* The event loop */
n = epoll_wait(efd, events, MAXEVENTS, -1);
printf("process #%d return from epoll_wait!\n", getpid());
sleep(2); /* sleep here is very important!*/
for (i = 0; i < n; i++) {
if ((events[i].events & EPOLLERR) ||
(events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN))) {
/* An error has occured on this fd, or the socket is not
* ready for reading (why were we notified then?)
*/
fprintf (stderr, "epoll error\n");
close (events[i].data.fd);
continue;
} else if (sfd == events[i].data.fd) {
/* We have a notification on the listening socket, which
* means one or more incoming connections.
*/
struct sockaddr in_addr;
socklen_t in_len;
int infd;
//char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
in_len = sizeof in_addr;
infd = accept(sfd, &in_addr, &in_len);
if (infd == -1) {
printf("process %d accept failed!\n", getpid());
break;
}
printf("process %d accept successed!\n", getpid());
/* Make the incoming socket non-blocking and add it to the
list of fds to monitor. */
close(infd);
}
}
}
}
}
int status;
wait(&status);
free (events);
close (sfd);
return EXIT_SUCCESS;
}
注意:上述的处理中添加了 sleep()
函数,实际上,如果很快处理完了这个 accept()
请求,那么其余进程可能还没有来得及被唤醒,内核队列上已经没有这个事件,无需唤醒其他进程。
那么,为什么只解决了 accept()
的惊群问题,而没有解决 epoll()
的?
当接收到一个报文后,显然只能由一个进程处理 (accept);而 epoll()
却不同,因为内核不知道对应的触发事件具体由哪些进程处理,那么只能是唤醒所有的进程,然后由不同的进程进行处理。
Nginx 解决
如上所述,如果采用 epoll,则仍然存在该问题,nginx 就是这种场景的一个典型,我们接下来看看其具体的处理方法。
nginx 的每个 worker 进程都会在函数 ngx_process_events_and_timers()
中处理不同的事件,然后通过 ngx_process_events()
封装了不同的事件处理机制,在 Linux 上默认采用 epoll_wait()
。
主要在 ngx_process_events_and_timers()
函数中解决惊群现象。
void ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
... ...
// 是否通过对accept加锁来解决惊群问题,需要工作线程数>1且配置文件打开accetp_mutex
if (ngx_use_accept_mutex) {
// 超过配置文件中最大连接数的7/8时,该值大于0,此时满负荷不会再处理新连接,简单负载均衡
if (ngx_accept_disabled > 0) {
ngx_accept_disabled--;
} else {
// 多个worker仅有一个可以得到这把锁。获取锁不会阻塞过程,而是立刻返回,获取成功的话
// ngx_accept_mutex_held被置为1。拿到锁意味着监听句柄被放到本进程的epoll中了,如果
// 没有拿到锁,则监听句柄会被从epoll中取出。
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}
if (ngx_accept_mutex_held) {
// 此时意味着ngx_process_events()函数中,任何事件都将延后处理,会把accept事件放到
// ngx_posted_accept_events链表中,epollin|epollout事件都放到ngx_posted_events链表中
flags |= NGX_POST_EVENTS;
} else {
// 拿不到锁,也就不会处理监听的句柄,这个timer实际是传给epoll_wait的超时时间,修改
// 为最大ngx_accept_mutex_delay意味着epoll_wait更短的超时返回,以免新连接长时间没有得到处理
if (timer == NGX_TIMER_INFINITE || timer > ngx_accept_mutex_delay) {
timer = ngx_accept_mutex_delay;
}
}
}
}
... ...
(void) ngx_process_events(cycle, timer, flags); // 实际调用ngx_epoll_process_events函数开始处理
... ...
if (ngx_posted_accept_events) { //如果ngx_posted_accept_events链表有数据,就开始accept建立新连接
ngx_event_process_posted(cycle, &ngx_posted_accept_events);
}
if (ngx_accept_mutex_held) { //释放锁后再处理下面的EPOLLIN EPOLLOUT请求
ngx_shmtx_unlock(&ngx_accept_mutex);
}
if (delta) {
ngx_event_expire_timers();
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "posted events %p", ngx_posted_events);
// 然后再处理正常的数据读写请求。因为这些请求耗时久,所以在ngx_process_events里NGX_POST_EVENTS标
// 志将事件都放入ngx_posted_events链表中,延迟到锁释放了再处理。
}
关于 ngx_use_accept_mutex
、ngx_accept_disabled
的修改可以直接 grep 查看。
SO_REUSEPORT
Linux 内核的 3.9 版本带来了 SO_REUSEPORT
特性,该特性支持多个进程或者线程绑定到同一端口,提高服务器程序的性能,允许多个套接字 bind()
以及 listen()
同一个 TCP 或者 UDP 端口,并且在内核层面实现负载均衡。
在未开启 SO_REUSEPORT
时,由一个监听 socket 将新接收的链接请求交给各个 worker 处理。
在使用 SO_REUSEPORT
后,多个进程可以同时监听同一个 IP:Port ,然后由内核决定将新链接发送给那个进程,显然会降低各个 worker 接收新链接时锁竞争。
参考
关于 Nginx 使用 SO_REUSEPORT
的介绍 Socket Sharding in NGINX Release 1.9.1 。