libev 时间处理

2017-03-20 linux network

在之前的文章中介绍了 libev 库的基本概念以及常见的使用方法,其中与时间相关的内容只是简单介绍,不过这一部分涉及到的内容会比较多,而且杂,所以单独摘出来介绍下。

简介

默认 2-heap ,只有定义了 EV_USE_4HEAP 宏之后才会使用 4-heap ,后者通常用于数据量大时;具体来说,对于前者,任一节点,其父节点的位置为 floor(k/2),子节点位置为 2*k2*k+1

根据定理,对于 4 叉堆而言,下标为 x 的元素,其孩子节点的下标范围是 [4x+1, 4x+4],父节点的下标是 (x-1)/4,而在 libev 的代码中,使用数组存储堆时,4 叉堆的第一个元素存放在 a[3],2 叉堆的第一个元素存放在 a[1]

定时器管理

在管理定时器时,使用了堆这种数据结构,而且除了常见的最小 2 叉堆之外,它还实现了更高效的 4 叉堆,其主要目的是为了提高 CPU 的缓存利用率。

说白了,就是违背了局部性原理,因为在 2 叉堆中,对元素的操作通常在 NN/2 之间进行,所以对于含有大量元素的堆来说,两个操作数之间间隔比较远,对 CPU 缓存利用不太好。

libev 中的注释说明,对于元素个数为 50000+ 的堆来说,4 叉堆的效率要提高 5% 左右。

这个 heap 结构存储在数组中,可以参看静态二叉树、最小堆等概念,可以查看 Binary Heaps

代码实现

接下来看看 libev 中的实现。

堆元素

ANHE 就是堆元素,它要么就是一个指向时间监视器结构 ev_watcher_time 的指针 WT,要么除了包含该指针之外,还缓存了 ev_watcher_time 中的成员 at,堆中元素就是根据 at 的值进行组织的,具有最小 at 值得节点就是根节点。

//----- 宏EV_HEAP_CACHE_AT是为了提高在堆中的缓存利用率,主要是为了对at进行缓存
#if EV_HEAP_CACHE_AT
typedef struct {
    ev_tstamp at;
    WT w;
} ANHE;
#else
typedef WT ANHE;
#endif

在 libev 中,为了提高缓存命中率,在堆中可以选择缓存元素 at,文档中的原文是:

Heaps are not very cache-efficient. To improve the cache-efficiency of the timer and
periodics heaps, libev can cache the timestamp (at) within the heap structure(selected
by defining EV_HEAP_CACHE_AT to 1), which uses 8-12 bytes more per watcher and a few
hundred bytes more code, but avoids random read accesses on heap changes. This improves
performance noticeably with many (hundreds) ofwatchers.

宏定义

#if EV_USE_4HEAP

#define DHEAP 4
#define HEAP0 (DHEAP - 1) /* index of first element in heap */
#define HPARENT(k) ((((k) - HEAP0 - 1) / DHEAP) + HEAP0)
#define UPHEAP_DONE(p,k) ((p) == (k))
...
#else

#define HEAP0 1
#define HPARENT(k) ((k) >> 1)
#define UPHEAP_DONE(p,k) (!(p))
...

其中的宏 HEAP0 表示堆中第一个元素的下标;HPARENT(k) 是求下标为 k 的节点的父节点下标;UPHEAP_DONE 宏用于向上调整堆时,判断是否已经到达了根节点,对于4叉堆而言,根节点下标为3,其父节点的下标根据公式得出,也是3,所以结束的条件 ((p) == (k)),对于2叉堆而言,根节点下标为1,其父节点根据公式得出下标为0,所以结束的条件是 (!(p))

实现时它是一个最小堆,权值为 “即将触发的时刻”,所以其根节点总是最近要触发的 timer;对此堆有两个基本操作,upheap()downheap()

downheap()

downheap() 操作会与子节点比较,如果子节点中有小于当前节点的权,则选择最小的节点进行交换,并一直重复。

#define HEAP0                   1
#define HPARENT(k)              ((k) >> 1)
#define UPHEAP_DONE(p,k)        (!(p))

/* away from the root */
inline_speed void downheap(ANHE *heap, int N, int k)
{
        ANHE he = heap[k];

        for (;;) {
                int c = k << 1;          // 获取左子节点

                if (c >= N + HEAP0)      // 超过了范围
                        break;

                //c += c + 1 < N + HEAP0 && ANHE_at (heap [c]) > ANHE_at(heap[c + 1]) ? 1 : 0;
                // 找到子结点中较小的值
                c += ((c + 1) < (N + HEAP0)) && ANHE_at(heap[c]) > ANHE_at(heap[c + 1]) ? 1 : 0;

                if (ANHE_at(he) <= ANHE_at(heap[c])) // 满足最小栈的条件,则直接退出
                        break;

                heap[k] = heap[c]; // 子结点中的较小值C,赋值给K
                ev_active(ANHE_w(heap[k])) = k;  // 并更新原C在heap中的序号,也就是active字段

                k = c; // 继续准备下一轮的迭代
        }

        heap[k] = he;
        ev_active(ANHE_w(he)) = k;
}

对于 4 叉堆的实现如下。

// 从根向下调整,N为堆的元素个数,k表示要调整元素的索引
inline_speed void downheap (ANHE *heap, int N, int k)
{
	ANHE he = heap [k];//先获得调整
	ANHE *E = heap + N + HEAP0;//结束的指针
	for (;;) {
		ev_tstamp minat; //最小的元素
		ANHE *minpos; //最小元素的指针
		ANHE *pos = heap + DHEAP * (k - HEAP0) + HEAP0 + 1;//k的第一个孩子的指针
		//查找k的最小孩子
		if (expect_true (pos + DHEAP - 1 < E)) { //最后一个孩子没有越界,有四个孩子
			//在四个孩子中找最小的
			(minpos = pos + 0), (minat = ANHE_at (*minpos));//设置初值
			if (ANHE_at (pos [1]) < minat)
				(minpos = pos + 1), (minat = ANHE_at (*minpos));
			if (ANHE_at (pos [2]) < minat)
				(minpos = pos + 2), (minat = ANHE_at (*minpos));
			if (ANHE_at (pos [3]) < minat)
				(minpos = pos + 3), (minat = ANHE_at (*minpos));
		} else if (pos < E) { //有孩子,但是不是4个孩子
			(minpos = pos + 0), (minat = ANHE_at (*minpos));
			if (pos + 1 < E && ANHE_at (pos [1]) < minat)
				(minpos = pos + 1), (minat = ANHE_at (*minpos));
			if (pos + 2 < E && ANHE_at (pos [2]) < minat)
				(minpos = pos + 2), (minat = ANHE_at (*minpos));
			if (pos + 3 < E && ANHE_at (pos [3]) < minat)
				(minpos = pos + 3), (minat = ANHE_at (*minpos));
		} else { //其他情况,没孩子,不用调整退出循环
			break;
		}
		//当前节点小于最小孩子,已经是最小堆,不用调整退出
		if (ANHE_at (he) <= minat)
			break;
		//否则将最小元素调到k的位置
		heap [k] = *minpos;
		ev_active (ANHE_w (*minpos)) = k;//将时间监测器设置为索引k
		k = minpos - heap;//设置下一次调整的根节点
	}
	heap [k] = he;//将元素填充到k中
	ev_active (ANHE_w (he)) = k;
}

upheap

当对某一节点执行 upheap() 时,就是与其父节点进行比较,如果其值比父节点小,则交换,然后在对这个父节点重复 upheap() ,直到顶层。

libev timer watcher

ev_timer_start() 函数中,会将定时器监控器注册到事件驱动器上。

inline_speed void upheap(ANHE *heap, int k)
{
	ANHE he = heap[k]; // 保存需要调整的结点

	for (;;) {
		int p = HPARENT(k);  // 找到父结点所在的下标

		// 已经到了头或者满足最小栈的条件(子结点大于父结点),则直接退出
		if (UPHEAP_DONE(p, k) || ANHE_at(heap[p]) <= ANHE_at(he))
			break;

		heap[k] = heap[p]; // 与上述类似,进行交换
		ev_active(ANHE_w(heap[k])) = k;

		k = p;
	}

	heap [k] = he;
	ev_active (ANHE_w (he)) = k;
}

Timer Watcher

ev_timer_init() 中,分别设置 afterrepeat 参数,表示 after 秒后执行一次回调函数,之后每隔 repeat 秒执行一次。

//----- 结构体定义,对于时间主要是at+repeat参数
#define EV_WATCHER_TIME(type)      \
  EV_WATCHER (type)                \
  ev_tstamp at;     /* private, 函数初始化对应的after参数 */
typedef struct ev_timer {
  EV_WATCHER_TIME (ev_timer)       // 通用
  ev_tstamp repeat;  /* rw, 函数初始化对应的repeat参数 */
} ev_timer;

//----- 扩展后如下,其中前五个成员是监视器的公共成员
typedef struct ev_timer
{
    int active;        // 标明该监视器在堆数组timers中的下标
    int pending;
    int priority;
    void *data;
    void (*cb)(struct ev_loop *loop, struct ev_timer *w, int revents);

    ev_tstamp at;      // 定时器第一次触发的时间点,根据mn_now设置
    ev_tstamp repeat;  // 必须>=0,当大于0时表示每隔repeat秒该定时器再次触发;0表示只触发一次
} ev_timer;

//----- 初始化,意味着在after秒后执行,设置为0则会立即执行一次;然后每隔repeat秒执行一次
#define ev_timer_init(ev,cb,after,repeat)        \
    do { ev_init ((ev), (cb)); ev_timer_set ((ev),(after),(repeat)); } while (0)

//----- 如下为一个示例程序
void cb (EV_P_ ev_timer *w, int revents) {
    ev_break(EV_P_ EVBREAK_ONE);         // 实际是设置loop_done的值,也即退出主循环
}
ev_timer watcher;
ev_timer_init (&watcher, cb, 2.5, 1.0);  // 初始化,分别表示多长时间开始执行第一次,后面为时间间隔
ev_timer_start (loop, &watcher);

示例程序

#include <ev.h>       // a single header file is required
#include <time.h>     // for time()
#include <stdio.h>    // for printf()
#include <stdint.h>   // for uintmax_t

// every watcher type has its own typedef'd struct with the name ev_TYPE
ev_timer timeout_watcher;
ev_timer repeate_watcher;
ev_timer oneshot_watcher;

// another callback, this time for a time-out
static void timeout_cb (EV_P_ ev_timer *w, int revents)
{
        (void) w;
        (void) revents;
        printf("timeout at %ju\n", (uintmax_t)time(NULL));
        // this causes the innermost ev_run to stop iterating
        ev_break (EV_A_ EVBREAK_ONE);
}
static void repeate_cb (EV_P_ ev_timer *w, int revents)
{
        (void) w;
        (void) revents;
        printf("repeate at %ju\n", (uintmax_t)time(NULL));
}
static void oneshot_cb (EV_P_ ev_timer *w, int revents)
{
        (void) w;
        (void) revents;
        printf("oneshot at %ju\n", (uintmax_t)time(NULL));
        ev_timer_stop(EV_A_ w);
}

int main (void)
{
        time_t result;
        EV_P EV_DEFAULT; /* OR ev_default_loop(0) */

        result = time(NULL);
        printf("  start at %ju\n", (uintmax_t)result);

        // 2秒后执行函数
        //ev_timer_init (&oneshot_watcher, oneshot_cb, 2.0, 0.);
        ev_timer_init (&oneshot_watcher, oneshot_cb, 2.0, 1.);
        ev_timer_start (EV_A_ &oneshot_watcher);

        // 5秒后开始循环,每次间隔1秒,如果最后一个参数为0,则只执行一次
        ev_timer_init (&repeate_watcher, repeate_cb, 5., 1.);
        ev_timer_start (EV_A_ &repeate_watcher);

        // 10秒后执行超时,设置为-1表示不退出
        ev_timer_init (&timeout_watcher, timeout_cb, 10., 0.);
        ev_timer_start (EV_A_ &timeout_watcher);

        // now wait for events to arrive
        ev_run (EV_A_ 0);

        // break was called, so exit
        return 0;
}

函数执行流程

函数的初始化过程就是对各个成员变量设置初始值,一般都是设置为 0 ,详细内容可以直接查看下源码。

对于 timer 来说,需要注意的是,其中的 active 成员有特殊的含义,实际上就是该对象在 heap 数组中的序号,而其序号是从 1 开始。

void noinline ev_timer_start (EV_P_ ev_timer *w) EV_THROW
{
	// 通过init()初始化后active为0,也就是说还没有初始化过,如果已经初始化则不会重新设置
	if (expect_false (ev_is_active (w)))
		return;

   // 设置距离当前多久之后触发,注意,这里设置的是距离当前时间,该值会作为栈的权重
	ev_at (w) += mn_now;

	EV_FREQUENT_CHECK;

	++timercnt;
	// 启动定时器,如上所述,active作为栈的序号,也就是(timercnt+HEAP0-1)
	ev_start (EV_A_ (W)w, timercnt + HEAP0 - 1);
	// 调整timers内存的大小
	array_needsize (ANHE, timers, timermax, ev_active (w) + 1, EMPTY2);
	// 将上述设置的timer保存,如上ev_active()返回的是序号
	ANHE_w (timers [ev_active (w)]) = (WT)w;
	// 从watcher中的at值更新到缓存中,注意这里可以通过宏进行配置
	ANHE_at_cache (timers [ev_active (w)]);
	// 因为是从最后插入的值,所以一定是向上更新栈
	upheap (timers, ev_active (w));

	EV_FREQUENT_CHECK;
}

代码比较简单,首先设置监视器的 at 成员,表明在 at 时间点超时事件会触发,注意 at 是根据 mn_now 设置的,也就是相对于系统启动时间而言的(或者是日历时间)。

之后,就是将该监视器加入到堆 timers 中,首先将该监视器加到堆中的最后一个元素,然后调用 upheap 调整堆。注意监视器的 active 成员,表明该监视器在堆数组中的下标。

定时器触发

接着,看下在一个事件驱动器循环中是如何处理定时器监控器的,这里我们依然抛开其他的部分,只找定时器相关的看。

/* calculate blocking time */ 块里面,可以看到计算 blocking time 的时候会先:

if (timercnt) {
	ev_tstamp to = ANHE_at (timers [HEAP0]) - mn_now;
	if (waittime > to) waittime = to;
}

如果有定时器,那么就从定时器堆 timers 中取得堆顶上最小的一个时间,这样就保证了在这个时间前可以从 backend_poll() 中超时出来;跳出 poll() 等待后执行 timers_reify() 处理 pengding 的定时器。

inline_size void timers_reify (EV_P)
{
	// 依次取最小堆的堆顶,如果ANHE.at小于当前时间,表示该定时器watcher超时
	if (timercnt && ANHE_at(timers[HEAP0]) < mn_now) {
		do {
			ev_timer *w = (ev_timer *)ANHE_w(timers[HEAP0]);

			if (w->repeat) { /* first reschedule or stop timer */
				ev_at(w) += w->repeat;
				if (ev_at(w) < mn_now)
					ev_at(w) = mn_now;

				ANHE_at_cache(timers[HEAP0]);
				downheap(timers, timercnt, HEAP0);
			} else {
				ev_timer_stop(EV_A_ w); /* nonrepeating: stop timer */
			}

			EV_FREQUENT_CHECK;
			feed_reverse(EV_A_ (W)w);
		} while (timercnt && ANHE_at(timers[HEAP0]) < mn_now);

		feed_reverse_done(EV_A_ EV_TIMER);
	}
}
#include <ev.h>       // a single header file is required
#include <time.h>     // a single header file is required
#include <stdio.h>    // for puts

ev_periodic periodic_timer;

void periodic_action(EV_P_ ev_periodic *w, int revents)
{
	(void) w;
	(void) revents;
	time_t now = time(NULL);
	printf("current time is %s", ctime(&now));
}

static ev_tstamp rescheduler(ev_periodic *w, ev_tstamp now)
{
	(void) w;
	return now + 60;
}

int main(void)
{
	time_t now = time(NULL);
	EV_P EV_DEFAULT; /* OR ev_default_loop(0) */

	// 调用rescheduler()返回下次执行的时间,如果存在回调函数,则会忽略其它参数
	// 包括offset+interval,其输出示例如下:
	//     begin time is Fri Apr 14 21:51:47 2016
	//   current time is Fri Apr 14 21:52:47 2016
	//   current time is Fri Apr 14 21:53:47 2016
	//   current time is Fri Apr 14 21:54:47 2016
	ev_periodic_init(&periodic_timer, periodic_action, 0, 1, rescheduler);

	// 一般offset在[0, insterval]范围内,如下,也就是在最近的一个5秒整触发第一
	// 次回调函数,其输出示例如下:
	//     begin time is Fri Apr 21 23:24:18 2016
	//   current time is Fri Apr 21 23:24:25 2016
	//   current time is Fri Apr 21 23:24:35 2016
	//   current time is Fri Apr 21 23:24:45 2016
	//ev_periodic_init(&periodic_timer, periodic_action, 5, 10, NULL);

	// 如下只执行一次,也就是在20秒后触发
	//     begin time is Fri Apr 21 23:27:04 2016
	//   current time is Fri Apr 21 23:27:24 2016
	//ev_periodic_init(&periodic_timer, periodic_action, now+20, 0, NULL);      //3

	ev_periodic_start(EV_A_ &periodic_timer);

	printf("  begin time is %s", ctime(&now));

	ev_run (EV_A_ 0);

	return 0;
}

Periodic Wather

这是绝对时间定时器,不同于 ev_timer,它是基于日历时间的;例如,指定一个 ev_periodic 在 10 秒之后触发 (ev_now()+10),然后在 10 秒内将系统时间调整为去年,则该定时器会在一年后才触发超时事件,而 ev_timer 依然会在 10 秒之后触发。

首先看下 libev 中定义的结构体。

#define EV_WATCHER(type)              \
  int active; /* private */           \
  int pending; /* private */          \
  EV_DECL_PRIORITY /* private */      \
  EV_COMMON /* rw */                  \
  EV_CB_DECLARE (type) /* private */

#define EV_WATCHER_TIME(type)         \
  EV_WATCHER (type)                   \
  ev_tstamp at;     /* private */

typedef struct ev_periodic
{
  EV_WATCHER_TIME (ev_periodic)

  ev_tstamp offset; /* rw */
  ev_tstamp interval; /* rw */
  ev_tstamp (*reschedule_cb)(struct ev_periodic *w, ev_tstamp now) EV_THROW; /* rw */
} ev_periodic;

// 上述结构体实际上等价于如下
typedef struct ev_periodic
{
    int active;
    int pending;
    int priority;
    void *data;
    void (*cb)(struct ev_loop *loop, struct ev_periodic *w, int revents);
    ev_tstamp at;

    ev_tstamp offset; /* rw */
    ev_tstamp interval; /* rw */
    ev_tstamp (*reschedule_cb)(struct ev_periodic *w, ev_tstamp now) EV_THROW; /* rw */
} ev_periodic;

如上结构体,其中前六个成员与 ev_timer 一样,而且offset、interval和reschedule_cb都是用来设置触发时间的,这个会在下面说明。

初始化

与其它 Watcher 相似,初始化可通过 ev_init()+ev_periodic_set() 或直接通过 ev_periodic_init() 初始化,可以查看如下内容。

#define ev_init(ev,cb_) do {                      \
  ((ev_watcher *)(void *)(ev))->active  =         \
  ((ev_watcher *)(void *)(ev))->pending = 0;      \
  ev_set_priority ((ev), 0);                      \
  ev_set_cb ((ev), cb_);                          \
} while (0)

#define ev_periodic_set(ev,ofs_,ival_,rcb_)  do { \
  (ev)->offset = (ofs_);                          \
  (ev)->interval = (ival_);                       \
  (ev)->reschedule_cb = (rcb_);                   \
} while (0)

#define ev_periodic_init(ev,cb,ofs,ival,rcb) do { \
  ev_init ((ev), (cb));                           \
  ev_periodic_set ((ev),(ofs),(ival),(rcb));      \
} while (0)

启动定时器

其中启动函数如下。

void noinline ev_periodic_start (EV_P_ ev_periodic *w) EV_THROW
{
  if (expect_false (ev_is_active (w)))
    return;

  if (w->reschedule_cb)
    ev_at (w) = w->reschedule_cb (w, ev_rt_now);
  else if (w->interval)
    {
      assert (("libev: ev_periodic_start called with negative interval value", w->interval >= 0.));
      periodic_recalc (EV_A_ w);
    }
  else
    ev_at (w) = w->offset;

  EV_FREQUENT_CHECK;

  ++periodiccnt;
  ev_start (EV_A_ (W)w, periodiccnt + HEAP0 - 1);
  array_needsize (ANHE, periodics, periodicmax, ev_active (w) + 1, EMPTY2);
  ANHE_w (periodics [ev_active (w)]) = (WT)w;
  ANHE_at_cache (periodics [ev_active (w)]);
  upheap (periodics, ev_active (w));

  EV_FREQUENT_CHECK;
}

共有三种设置超时时间 at 的方法,也就是: