通常 Web 应用的数据都会持久化到数据库中,随着应用数据量以及访问量的增加,就会导致数据库负担加重,从而导致网站的延迟。
Memcached 是一个高性能的分布式内存缓存服务器,用于缓存数据库的查询结果,从而减少数据库的访问次数,提高动态 Web 应用的速度、提高扩展性。
简介
分布式缓存在设计时需要考虑几点常见的原则:A) 缓存本身的水平线性扩展;B) 缓存大并发下本身的性能问题,包括内存的管理问题,如内存的分配、管理、回收机制;C) 避免缓存的单点故障问题,以及分布式系统常见的多副本和副本一致性问题。
memcached 采用基于文本的简单通讯协议,可以直接通过 telnet 进行操作;其事件处理是基于 libevent;虽然,我们说 memcached 是分布式缓存服务器,但实际上各个服务器之间并不通讯,其分布式主要是通过客户端实现。
安装、使用
在 CentOS 上可以通过 yum 简单安装,或者直接通过源码安装。
# yum install memcached # 直接安装
# systemctl start memcached # 启动
# ./configure && make && make test && make install # 通过源码编译
$ memcached -p 11211 -m 64m -d # 后台启动
$ telnet 127.1 11211 # 通过telnet链接
add foobar 0 60 5 # 添加,由重复则失败
get foobar # 获取foobar的值
set foobar 0 60 5 # 存在则更新,否则添加
delete foobar # 删除
stats # 查看状态,也可以使用memcached-tool
对于 add 和 set 命令,其中 0 为标志,60 表示数据存放 60s,5 表示放入多大数据。
上述是简单的启动和使用方式,常见的启动选项可以参考如下,详细内容查看 man 1 memcached 。
memcached [options]
-p [num]
指定 TCP 的监听端口,默认使用 11211。
-c [num]
可以同时处理的最大链接数,默认是 1024 。
-b [num]
设置 backlog 队列的大小,默认是 1024 。
-m [num]
用户可以使用多少MB的内存,默认是 64M 。
-d
后台运行。
-P [filename]
后台启动时指定 pidfile 的路径。
链接、线程处理
线程通过 LIBEVENT_THREAD *threads 表示,
main()
|-... ... # 设置进程打开文件个数限制等
|-daemonize() # 如果需要后台运行
| |-fork()
|-mlockall() # 锁定内存,保证内存不会换出
|
|-event_init() # 1. 初始化主线程的event_base,也就是libevent实例
|-stats_init() # 统计状态的初始化
|-assoc_init() # hash表的初始化
|-conn_init() # 连接初始化
|-slabs_init() # slabs结构的初始化
|-sigignore() # 忽略SIGPIPE信号
|-memcached_thread_init() # 2. 创建工作线程,默认是4个,可以通过pstree -p PID查看
| |
| |-pipe() # 3. 创建pipe用于主线程和工作线程通讯
| |
| |-setup_thread() # 创建每个线程自己的libevent的event_base
| | |-event_init() # 每个独立的线程都包含独立的event_base
| | |-event_set() # 创建pipe的读事件监听,回调thread_libevent_process方法
| | |-event_base_set()
| | |-event_add() # 添加事件操作
| | |-cq_init() # 4. 初始化每个工作线程的队列
| | |-pthread_mutex_init() # 初始化线程池
| | |-cache_create()
| |
| |-create_worker() # 循环创建线程,真正的创建工作线程,实际是worker_libevent()
| |-pthread_attr_init() # 属性初始化
| |-pthread_create() # 5. 创建线程worker_libevent(),直接启动
|
|-start_assoc_maintenance_thread() # 启动
|-start_item_crawler_thread()
|-start_lru_maintainer_thread()
|-start_slab_maintenance_thread()
|
|-clock_handler() # 初始化时间设置
|
|-server_sockets() # 6. 创建socket,绑定地址,设置非阻塞模式,包括TCP/UDP
| |-server_socket() # 实际的绑定函数
| |-conn_new() # 7. 将监听socket添加到main_base的libevent的事件队列中
|
|-save_pid() # 保存PID到一个文件中
|-event_base_loop() # 8. 启动主线程的libevetn循环
worker_libevent() # 新建的子线程,会初始化子线程的libevent
|-register_thread_initialized()
|-event_base_loop() # 开启事件循环,每个线程有自己的事件处理机制,启动libevent
event_handler() # Master/Worker线程libevent回调函数,此时有网络事件到达
|-drive_machine() # 进入业务处理状态机
thread_libevent_process() # pipe接收回调函数
|-read() # 读取pipe中的信息
主线程中如果有新的连接,会向其中一个线程的 pipe 中写入 1,子线程读取 pipe 中的数据,如果为 1,则说明从 pipe 中获取的数据是正确的。
在初始化完成之后,会忽略 SIGPIPE 信号,其中 PIPE 信号是当网络连接一端已经断开,这时发送数据,会发送 RST 包,当再次发送数据,会触发 PIPE 信号,而 PIPE 信号的默认动作是退出进程,因此忽略。
数据结构
CQ_ITEM 是主线程 accept() 后返回的已建立连接的 fd 的封装,而 CQ 是一个管理 CQ_ITEM 的单向链表。
/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
int sfd;
enum conn_states init_state;
int event_flags;
int read_buffer_size;
enum network_transport transport;
CQ_ITEM *next;
};
/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
CQ_ITEM *head;
CQ_ITEM *tail;
pthread_mutex_t lock;
};
typedef struct {
pthread_t thread_id; /* unique ID of this thread */
struct event_base *base; /* libevent handle this thread uses */
struct event notify_event; /* listen event for notify pipe */
int notify_receive_fd; /* receiving end of notify pipe */
int notify_send_fd; /* sending end of notify pipe */
struct thread_stats stats; /* Stats generated by this thread */
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
cache_t *suffix_cache; /* suffix cache */
} LIBEVENT_THREAD;
typedef struct conn conn;
struct conn {
int sfd;
sasl_conn_t *sasl_conn;
bool authenticated;
enum conn_states state;
enum bin_substates substate;
rel_time_t last_cmd_time;
struct event event;
short ev_flags;
... ...
};
LIBEVENT_THREAD 是 memcached 里的线程结构封装,如上所示,每个线程都包含一个 CQ 队列,一条通知管道 pipe,一个 libevent 的实例 event_base 以及线程的状态。
另外一个重要的结构是对每个网络连接的封装 conn,这个结构体的成员变量比较多,暂时只列举了部分。
线程交互
memcached 采用典型的 Master-Worker 线程模型,其模型很简单,Master 监听网络链接,接受链接请求之后通过线程间通讯来唤醒 Worker 线程,后续的读写操作都是通过 Worker 完成。
主线程和各个线程的处理都是通过 libevent 处理,通过实例化多个 libevent 实例实现,分别对应了一个主线程和 N 个 workers 线程。Master 线程和 Workers 线程全部都是通过 libevent 管理网络事件,也就是说实际上每个线程都是一个单独的 libevent 实例。
主线程的 libevent 实例在主线程初始化时设置,工作线程则在 setup_thread() 中建立 libevent 实例。
Master 线程负责监听客户端的建立连接请求,并 accept 连接,Workers 线程负责处理已经建立好的连接的读写等事件。
Master Thread
首先看看主线程是如何通知 workers 线程处理新连接的,在初始化时会添加监听事件。
static struct event_base *main_base; // 主libevent
static int server_socket(...) {
... ...
for (next= ai; next; next= next->ai_next) {
... ...
if (!(listen_conn_add = conn_new(sfd, conn_listening,
EV_READ | EV_PERSIST, 1,
transport, main_base))) {
fprintf(stderr, "failed to create listening connection\n");
exit(EXIT_FAILURE);
}
... ...
}
}
主线程的 libevent 注册的是监听 socket 描述字的可读事件,就是说当有建立连接请求时,主线程会处理,新建监听端口是通过 conn_new() 初始化,其回调的函数是 event_handler() 。
对于主线程来说,会进入到 conn_listening 分支,也就是调用 dispatch_conn_new() 函数;在该函数中,会创建一个新的 CQ_ITEM,然后通过 round robin 策略选择了一个 thread,并通过 cq_push 将这个 CQ_ITEM 放入了该线程的 CQ 队列里。
最后通过 write() 向该线程管道写了 1 字节数据,则该线程的 libevent 立即回调 thread_libevent_process() 函数进行处理。
Worker Thread
当 memcached 刚启动时,也就是当刚初始化完成之后,每个 workers 线程只有在自己线程管道的读端有数据时触发调用 thread_libevent_process() 方法,而主线程在有链接时会写入数据,该函数处理的就是主线程新建链接的请求。
static void thread_libevent_process(int fd, short which, void *arg) {
... ...
if (read(fd, buf, 1) != 1)
if (settings.verbose > 0)
fprintf(stderr, "Can't read from libevent pipe\n");
switch (buf[0]) {
case 'c':
item = cq_pop(me->new_conn_queue);
if (NULL != item) {
conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport, me->base);
... ...
}
break;
/* we were told to pause and report in */
case 'p':
register_thread_initialized();
break;
}
}
入参的 fd 是这个线程的管道读端的描述符,在上述的函数中,首先将管道的 1 个字节通知信号读出。需要注意的是,在水平触发模式下如果不处理该事件,则会被循环通知,直到该事件被处理。
cq_pop() 会从该线程的 CQ 队列中取队列头的一个 CQ_ITEM,这个 CQ_ITEM 是被主线程丢到这个队列里的,item->sfd 是已建立的连接的描述符,然后会调用 conn_new() 函数。
conn *conn_new(...) {
... ...
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;
if (event_add(&c->event, 0) == -1) {
perror("event_add");
return NULL;
}
}
在 conn_new() 中,为 sfd 描述符注册 libevent 的读事件,接着会调用 event_add() 函数,也就是对该描述符的事件处理交给当前这个 workers 线程处理。
可以看到新的连接被注册了一个事件,实际上是 EV_READ|EV_PERSIST 事件,当该连接有可读数据时会调用函数 event_handler(),实际上 event_handler() 里主要是调用 memcached 的核心方法 drive_machine() 。
而 Worker 线程最终会走到 conn_read 分支,可以参考如下从网上找的工作流程。
也就是说,实际上,Master/Worker 线程的 libevent 实例都会调用 event_handler() 函数,而在该函数中的核心处理为其状态机的处理。
状态机
Worker 线程在处理 libevent 的事件时,会进入状态机处理不同的逻辑,详细的调用逻辑如下:
event_handler() # libevent回调函数,此时有网络事件到达
|-drive_machine() # 进入业务处理状态机
drive_machine() 就是通过当前连接的 state 来判断该进行何种处理,Master 和 Workers 的 libevent 事件处理都会调用该函数,其链接的状态是通过一个 enum 声明。
enum conn_states {
conn_listening, // 主线程listen的主要状态,其工作就是把接到连接分发到worker子线程
conn_new_cmd, // 将链接设置为初始态,准备接收下个命令,会清空读写buffer
conn_waiting, // 等待读取socket,实际上就是挂起该链接,等待新的链接到来
conn_read, // 从客户端读取信息,也就是命令信息
conn_parse_cmd, // 尝试从读取的buffer中解析命令
conn_write, // 主要是out_string()函数,一般都是提示信息和返回的状态信息
conn_nread, // 读取固定大小的数据,会更新到item、hash、lru中,并调转到conn_write
conn_swallow, // 忽略不需要的数据
conn_closing, // 服务端主动调用close()关闭链接,并把conn结构体放到空闲队列
conn_mwrite, // 将这个链接中的msglist返回给客户端,可能会返回多个
conn_closed, // 链接已经被关闭
conn_max_state // 由于assert判断
};
首先着重说一下 conn_swallow 状态,对于像 update 操作,如果分配 item 失败,显然后面读取的数据是无效的,不过客户端是不知道的,客户端会继续发送特定的数量的数据,就需要把读到的这些数据忽略掉。
接下来详细查看 drive_machine() 函数的处理流程,首先查看下主线程 conn_listening 状态的处理。
static void drive_machine(conn *c) {
... ...
while (!stop) {
switch(c->state) {
case conn_listening:
addrlen = sizeof(addr);
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
if (sfd == -1) {
... ... // 多种的错误处理
break;
}
if (!use_accept4) { // 设置为非阻塞
if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) {
perror("setting O_NONBLOCK");
close(sfd);
break;
}
}
if (settings.maxconns_fast &&
stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
... .. // 超过了设置的最大链接数
} else { // OK,分发给Worker线程,初始化状态为conn_new_cmd
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
DATA_BUFFER_SIZE, tcp_transport);
}
stop = true;
break;
}
}
}
conn_new_cmd
子线程最初进入的状态就是 conn_new_cmd 状态,这个状态主要是做一些清理。
static void drive_machine(conn *c) {
... ...
while (!stop) {
switch(c->state) {
... ...
case conn_new_cmd:
--nreqs; // 记录每个工作线程处理,记录每个libevent实例处理的事件,通过初始启动参数配置
if (nreqs >= 0) { // 还可以处理请求
reset_cmd_handler(c); // 清空缓冲区
} else { // 拒绝请求
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.conn_yields++; // 更新统计数据
pthread_mutex_unlock(&c->thread->stats.mutex);
if (c->rbytes > 0) {
// 已经将数据读取到了buffer中,因此需要发送一个event
if (!update_event(c, EV_WRITE | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
conn_set_state(c, conn_closing);
break;
}
}
stop = true;
}
break;
}
... ...
}
}
static void reset_cmd_handler(conn *c) {
c->cmd = -1;
c->substate = bin_no_state;
if(c->item != NULL) {
item_remove(c->item); // 如果有item,则直接删除
c->item = NULL;
}
conn_shrink(c); // 整理缓冲区,暂不分析
if (c->rbytes > 0) { // 如果缓冲区还有数据,则直接到conn_parse_cmd中继续处理
conn_set_state(c, conn_parse_cmd);
} else { // 否则进入等待状态,状态机没有数据要处理,就进入这个状态
conn_set_state(c, conn_waiting);
}
}
根据是否有数据,分别会进入不同的状态,如果没有数据则会进入 conn_waiting 等待接收数据。
conn_waiting、conn_read
接下来主要是处理数据报文的接收,包括了 conn_waiting、conn_read 两个状态。
static void drive_machine(conn *c) {
... ...
while (!stop) {
switch(c->state) {
... ...
case conn_waiting:
// 更新libevent状态,也就是删除libevent事件后,重新注册libevent事件
if (!update_event(c, EV_READ | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
conn_set_state(c, conn_closing);
break;
}
conn_set_state(c, conn_read); // 进入读数据状态
stop = true;
break;
case conn_read:
// 判断采用UDP协议还是TCP协议
res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);
switch (res) {
case READ_NO_DATA_RECEIVED: // 未读取到数据
conn_set_state(c, conn_waiting); // 则继续等待
break;
case READ_DATA_RECEIVED: // 已经读取到数据
conn_set_state(c, conn_parse_cmd); // 开始解析命令
break;
case READ_ERROR: // 读取发生错误
conn_set_state(c, conn_closing); // 直接关闭链接
break;
case READ_MEMORY_ERROR: // 申请内存发生错误,则继续尝试
break;
}
break;
... ...
}
}
}
如果采用 TCP 协议则会调用 try_read_network() 从网络读取数据;对于 UDP 则比较简单,因为是数据报,读取到一个,就是一个完整的数据报,所以其处理过程简单。
conn_parse_cmd
从网络读取了数据之后,则会进入该状态,按协议来解析读取到的网络数据。
static void drive_machine(conn *c) {
... ...
while (!stop) {
switch(c->state) {
... ...
case conn_parse_cmd :
if (try_read_command(c) == 0) { // 尝试解析命令
// 如果读取到的数据不够,则继续等待
conn_set_state(c, conn_waiting);
}
break;
... ...
}
}
}
static int try_read_command(conn *c) {
... ...
if (c->protocol == binary_prot) {
... ...
dispatch_bin_command(c);
} else {
... ...
process_command(c, c->rcurr);
}
}
命令解析包括了文本协议和二进制协议,其命令的处理分别调用不同的命令处理函数。
内存管理
对于 memcached 内存高效管理是其最重要的任务之一,为了减小内存碎片的产生,采用 slab 管理其内存。简单来说,就是分配一块大内存,然后按照不同的块切分这些内存,存储业务数据时,按需选择合适的内存空间存储数据。
默认分配 64M 内存,之后所有的数据都是在这 64M 空间进行存储,在启动之后,不会释放这些内存,直到进程退出。
在 memcached 中,与内存相关的配置参数包括了设置内存大小、内存的增长因子,会在 slabs_init() 初始化的时候作为入参传入函数。
static slabclass_t slabclass[MAX_NUMBER_OF_SLAB_CLASSES]; // 定义slab class最大值,64个
static size_t mem_limit = 0; // 总的内存大小
static size_t mem_malloced = 0; // 初始化内存的大小
static void *mem_base = NULL; // 指向总的内存的首地址
static void *mem_current = NULL; // 当前分配到的内存地址
static size_t mem_avail = 0; // 当前可用的内存大小
void slabs_init(const size_t limit, const double factor, const bool prealloc) {
int i = POWER_SMALLEST - 1;
//size表示申请空间的大小,其值由配置的chunk_size和单个item的大小来指定
unsigned int size = sizeof(item) + settings.chunk_size;
mem_limit = limit; // 设置入参设置的内存上限
if (prealloc) { // 是否要预先分配设置的内存
mem_base = malloc(mem_limit); // 通过malloc()申请内存,并将内存基址指向mem_base
if (mem_base != NULL) {
mem_current = mem_base; // mem_current指向当前地址
mem_avail = mem_limit; // 可用内存大小为mem_limit
} else { // 预分配失败
fprintf(stderr, "Warning: Failed to allocate requested memory in"
" one large chunk.\nWill allocate in smaller chunks\n");
}
}
memset(slabclass, 0, sizeof(slabclass)); // 置空slab class数组
// 开始分配,i<200 && 单个chunk的size<单个item最大大小/内存增长因子
while (++i < POWER_LARGEST && size <= settings.item_size_max / factor) {
if (size % CHUNK_ALIGN_BYTES) // 将size按照8byte对齐
size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);
slabclass[i].size = size; // slab对应chunk的大小
slabclass[i].perslab = settings.item_size_max / slabclass[i].size; // slab对应的chunk的个数
size *= factor; // size下一个值为按增长因子的倍数增长
if (settings.verbose > 1) { // 如果有打开调试信息,则输出调试信息
fprintf(stderr, "slab class %3d: chunk size %9u perslab %7u\n",
i, slabclass[i].size, slabclass[i].perslab);
}
}
power_largest = i; // size已经增长到1M,再增加一个slab
slabclass[power_largest].size = settings.item_size_max; // slab的size为item_size_max
slabclass[power_largest].perslab = 1; // chunk个数为1
if (settings.verbose > 1) {
fprintf(stderr, "slab class %3d: chunk size %9u perslab %7u\n",
i, slabclass[i].size, slabclass[i].perslab);
}
... ...
if (prealloc) {
// 在每个slab class中都先预分配一个1M的内存
slabs_preallocate(power_largest);
}
}
Hash表
在 memcached 中,其采用链接法来处理 Hash 冲突,如果冲突太高,则会导致链表过长,从而会导致查找时的耗时变长。
为次当表中元素的个数超过 Hash 容量的 1.5 倍时就进行扩容,扩容过程由独立的线程来完成,扩容过程中会采用 2 个 Hash 表,将老表中的数据通过 Hash 算法映射到新表中,每次移动的桶的数目可以配置,默认是每次移动老表中的 1 个桶。
在启动时,会通过 start_assoc_maintenance_thread() 函数启动一个线程,正常如果不需要扩容,则实际会调用 pthread 的 cond_wait() 函数一直等待。
首先看下如何在 hash 表中添加元素。
// 在hash表中增加元素,不同于assoc_update,需要先确保没有相应的数据
int assoc_insert(item *it, const uint32_t hv) {
unsigned int oldbucket;
// 如果正在扩容且目前进行扩容还没到需要插入元素的桶,则将元素添加到旧桶中
if (expanding && (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
it->h_next = old_hashtable[oldbucket]; // 添加元素
old_hashtable[oldbucket] = it;
} else { // 如果没扩容,或者扩容已经到了新的桶中,则添加元素到新表中
it->h_next = primary_hashtable[hv & hashmask(hashpower)];
primary_hashtable[hv & hashmask(hashpower)] = it;
}
pthread_mutex_lock(&hash_items_counter_lock);
hash_items++;
// 还没开始扩容,但是表中元素个数已经超过Hash表容量的1.5倍,触发开始扩容
if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
assoc_start_expand(); // 唤醒扩容线程
}
pthread_mutex_unlock(&hash_items_counter_lock);
MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
return 1;
}
static void assoc_start_expand(void) { // 唤醒对应的扩容线程
if (started_expanding)
return;
started_expanding = true;
pthread_cond_signal(&maintenance_cond);
}
CAS
CAS 也即 Compare And Set 或 Compare And Swap,实现的是一中无锁方法,或者说是乐观锁的一种技术,过程中会使用 CPU 提供的原子操作指令,可以提高系统的并发性能,在 Memcached 中用来保证数据的一致性,不是为了实现严格的锁。
当多个应用尝试修改同一个数据时,会出现相互覆盖的情况,此时使用 CAS 版本号验证,可以有效的保证数据的一致性。每次存储数据时,都会将生成的 CAS 值和 item 一起存储,后续的 get 会返回对应的 CAS 值,执行 set 等操作时,需要将 CAS 值传入。
其处理函数为 do_store_item() 。
enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) {
... ...
} else if (comm == NREAD_CAS) {
/* validate cas operation */
if(old_it == NULL) {
// LRU expired
stored = NOT_FOUND;
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.cas_misses++;
pthread_mutex_unlock(&c->thread->stats.mutex);
} else if (ITEM_get_cas(it) == ITEM_get_cas(old_it)) { // CAS值一致
// cas validates
// it and old_it may belong to different classes.
// I'm updating the stats for the one that's getting pushed out
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_hits++;
pthread_mutex_unlock(&c->thread->stats.mutex);
item_replace(old_it, it, hv); // 执行存储逻辑
stored = STORED;
} else { // CAS值不一致,不进行实际的存储
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_badval++;
pthread_mutex_unlock(&c->thread->stats.mutex);
if(settings.verbose > 1) { // 打印错误日志
fprintf(stderr, "CAS: failure: expected %llu, got %llu\n",
(unsigned long long)ITEM_get_cas(old_it),
(unsigned long long)ITEM_get_cas(it));
}
stored = EXISTS;
}
}
}
uint64_t get_cas_id(void) // 为新的item生成cas值
{
static uint64_t cas_id = 0;
return ++cas_id;
}
参考
详细内容以及最新的版本可以查看官方网站 www.memcached.org 。