uv

Assorted notes scribbled down while reading the libuv source a long time ago (orz). I strongly recommend just reading the reference links to understand how file I/O and network I/O are implemented; these notes are probably decipherable only by me.

1 Design Philosophy

libuv uses a thread pool to make asynchronous file I/O operations possible, but network I/O is always performed in a single thread, each loop’s thread.

Threads are used internally to fake the asynchronous nature of all of the system calls.

2 Event Loop

The event loop is an endless loop that steps through several phases, each of which checks whether its events have fired. The most important phase is uv__io_poll, which calls a multiplexing API such as epoll to wait for events. When an event fires, the corresponding I/O is performed and then the callback runs (e.g. epoll -> read -> read_cb; remember that epoll performs no actual I/O, it only watches). A callback may in turn register new events, which keeps the loop going. The main flow is: perform I/O -> run callbacks -> perform I/O -> run callbacks ...
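
As a rough orientation, uv_run() in src/unix/core.c steps through the phases in roughly the order sketched below. This is a paraphrase, not the literal source; it omits stop_flag handling and the UV_RUN_ONCE/UV_RUN_NOWAIT modes, and assumes loop and timeout are in scope.

/* Paraphrased phase order of uv_run() (not the literal source). */
while (uv__loop_alive(loop)) {
  uv__update_time(loop);           /* refresh the cached loop->time      */
  uv__run_timers(loop);            /* run expired uv_timer_t callbacks   */
  uv__run_pending(loop);           /* run deferred I/O callbacks         */
  uv__run_idle(loop);              /* idle handles                       */
  uv__run_prepare(loop);           /* prepare handles                    */
  uv__io_poll(loop, timeout);      /* epoll_pwait + dispatch to w->cb    */
  uv__run_check(loop);             /* check handles                      */
  uv__run_closing_handles(loop);   /* close callbacks                    */
}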

Note that network I/O and file I/O are handled differently: the latter goes through the thread pool.

For network I/O, the task that performs the I/O is the cb in uv__io_t, and it runs inside uv__io_poll.
For file I/O, the task is uv__work->work, and it runs on the thread pool.
The callbacks of both kinds of task run in uv__io_poll.

Note: epoll_pwait installs a signal mask and then waits, so it may be woken by a signal rather than by an actual event.
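
The snippet below illustrates that semantics with a small hypothetical helper (plain Linux APIs, not libuv code):

#include <errno.h>
#include <signal.h>
#include <sys/epoll.h>

/* epoll_pwait() installs `sigmask` for the duration of the wait, so it can
 * return -1 with errno == EINTR because a signal arrived rather than
 * because an fd became ready; pollers must treat that as "retry". */
int wait_once(int epfd, int timeout_ms) {
  struct epoll_event events[64];
  sigset_t sigmask;
  sigemptyset(&sigmask);  /* leave all signals deliverable while blocked */

  int n = epoll_pwait(epfd, events, 64, timeout_ms, &sigmask);
  if (n == -1 && errno == EINTR)
    return 0;  /* woken by a signal, not by I/O: caller should retry */
  return n;    /* number of ready fds, or -1 on a real error */
}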

2.1 uv__io_poll

The function that actually polls for events. With epoll as the backend: when events fire, it simply invokes the cb of the corresponding io_watcher.
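
A fragment paraphrasing that dispatch step, assuming the Linux/epoll backend and that loop and timeout are in scope (the real function also flushes watcher_queue to epoll_ctl, handles EINTR, and more):

struct epoll_event events[1024];
int i;
int nfds = epoll_pwait(loop->backend_fd, events, 1024, timeout, NULL);

for (i = 0; i < nfds; i++) {
  struct epoll_event* pe = events + i;
  uv__io_t* w = loop->watchers[pe->data.fd];  /* registered by uv__io_start() */
  if (w != NULL)
    w->cb(loop, w, pe->events);  /* e.g. uv__stream_io or uv__async_io */
}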

2.2 Network I/O Flow

https://blog.butonly.com/posts/node.js/libuv/6-libuv-stream/

uv_tcp_init
-> uv__stream_init (initializes the stream)
-> uv__io_init (initializes the uv__io_t: sets the cb of the stream's uv__io_t to uv__stream_io, the actual I/O task)
-> uv_listen / uv_tcp_listen
-> uv__io_start (adds the I/O watcher uv__io_t to loop->watchers)
-> when uv__io_poll sees an event it runs uv__stream_io, which performs the corresponding read/connect/write and then invokes the matching callback, e.g. read_cb
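
To see where those internal steps surface in the public API, here is a minimal read-only TCP server sketch (untested; port 7000 and printing to stdout are arbitrary choices, and error handling is mostly omitted):

#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <uv.h>

static void alloc_cb(uv_handle_t* handle, size_t suggested, uv_buf_t* buf) {
  buf->base = malloc(suggested);  /* handed to uv__read() via stream->alloc_cb */
  buf->len = suggested;
}

static void read_cb(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
  if (nread > 0)
    fwrite(buf->base, 1, nread, stdout);    /* invoked from uv__read() */
  else if (nread < 0)
    uv_close((uv_handle_t*) stream, NULL);  /* UV_EOF or an error */
  free(buf->base);
}

static void connection_cb(uv_stream_t* server, int status) {
  if (status < 0)
    return;
  uv_tcp_t* client = malloc(sizeof(*client));
  uv_tcp_init(server->loop, client);
  if (uv_accept(server, (uv_stream_t*) client) == 0)
    uv_read_start((uv_stream_t*) client, alloc_cb, read_cb);
}

int main(void) {
  uv_tcp_t server;
  struct sockaddr_in addr;

  uv_tcp_init(uv_default_loop(), &server);  /* uv__stream_init / uv__io_init */
  uv_ip4_addr("0.0.0.0", 7000, &addr);
  uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
  uv_listen((uv_stream_t*) &server, 128, connection_cb);  /* uv__io_start */
  return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}

The relevant internals (uv__stream_io, uv__io_start, uv__read):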

// The io_watcher's cb: dispatches to the actual I/O work (e.g. uv__read) based on the stream's current state.
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;

  stream = container_of(w, uv_stream_t, io_watcher);

  assert(stream->type == UV_TCP ||
         stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  if (stream->connect_req) {
    uv__stream_connect(stream);
    return;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
  if (events & (POLLIN | POLLERR | POLLHUP))
    uv__read(stream);

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  /* Short-circuit iff POLLHUP is set, the user is still interested in read
   * events and uv__read() reported a partial read but not EOF. If the EOF
   * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
   * have to do anything. If the partial read flag is not set, we can't
   * report the EOF yet because there is still data to read.
   */
  if ((events & POLLHUP) &&
      (stream->flags & UV_HANDLE_READING) &&
      (stream->flags & UV_HANDLE_READ_PARTIAL) &&
      !(stream->flags & UV_HANDLE_READ_EOF)) {
    uv_buf_t buf = { NULL, 0 };
    uv__stream_eof(stream, &buf);
  }

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  if (events & (POLLOUT | POLLERR | POLLHUP)) {
    uv__write(stream);
    uv__write_callbacks(stream);

    /* Write queue drained. */
    if (QUEUE_EMPTY(&stream->write_queue))
      uv__drain(stream);
  }
}
// Register an I/O watcher with the loop.
void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  // omitted
  if (QUEUE_EMPTY(&w->watcher_queue))
    QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);

  // Add to the loop's watchers, indexed by fd.
  if (loop->watchers[w->fd] == NULL) {
    loop->watchers[w->fd] = w;
    loop->nfds++;
  }
}

// The actual I/O work.
static void uv__read(uv_stream_t* stream) {
  uv_buf_t buf;
  ssize_t nread;
  struct msghdr msg;
  char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
  int count;
  int err;
  int is_ipc;

  stream->flags &= ~UV_HANDLE_READ_PARTIAL;

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  count = 32;

  is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;

  /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
   * tcp->read_cb is NULL or not?
   */
  while (stream->read_cb
      && (stream->flags & UV_HANDLE_READING)
      && (count-- > 0)) {
    assert(stream->alloc_cb != NULL);

    buf = uv_buf_init(NULL, 0);
    stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
    if (buf.base == NULL || buf.len == 0) {
      /* User indicates it can't or won't handle the read. */
      stream->read_cb(stream, UV_ENOBUFS, &buf);
      return;
    }

    assert(buf.base != NULL);
    assert(uv__stream_fd(stream) >= 0);

    if (!is_ipc) {
      do {
        nread = read(uv__stream_fd(stream), buf.base, buf.len);
      }
      while (nread < 0 && errno == EINTR);
    } else {
      /* ipc uses recvmsg */
      msg.msg_flags = 0;
      msg.msg_iov = (struct iovec*) &buf;
      msg.msg_iovlen = 1;
      msg.msg_name = NULL;
      msg.msg_namelen = 0;
      /* Set up to receive a descriptor even if one isn't in the message */
      msg.msg_controllen = sizeof(cmsg_space);
      msg.msg_control = cmsg_space;

      do {
        nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
      }
      while (nread < 0 && errno == EINTR);
    }

    if (nread < 0) {
      /* Error */
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        /* Wait for the next one. */
        if (stream->flags & UV_HANDLE_READING) {
          uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
          uv__stream_osx_interrupt_select(stream);
        }
        stream->read_cb(stream, 0, &buf);
      } else {
        /* Error. User should call uv_close(). */
        stream->read_cb(stream, UV__ERR(errno), &buf);
        if (stream->flags & UV_HANDLE_READING) {
          stream->flags &= ~UV_HANDLE_READING;
          uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
          if (!uv__io_active(&stream->io_watcher, POLLOUT))
            uv__handle_stop(stream);
          uv__stream_osx_interrupt_select(stream);
        }
      }
      return;
    } else if (nread == 0) {
      uv__stream_eof(stream, &buf);
      return;
    } else {
      /* Successful read */
      ssize_t buflen = buf.len;

      if (is_ipc) {
        err = uv__stream_recv_cmsg(stream, &msg);
        if (err != 0) {
          stream->read_cb(stream, err, &buf);
          return;
        }
      }
      stream->read_cb(stream, nread, &buf);  // invoke the user's read callback

      /* Return if we didn't fill the buffer, there is no more data to read. */
      if (nread < buflen) {
        stream->flags |= UV_HANDLE_READ_PARTIAL;
        return;
      }
    }
  }
}

2.3 File I/O Flow

2.3.1 Thread Pool

Four threads are created at startup (the default pool size), all running the worker function.
worker sleeps on a condition variable and is woken when a task is submitted; once awake it pops a task off the work queue and runs it. Users submit tasks to the work queue through the API, which also wakes a sleeping worker, as in the sketch below.
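
A condensed paraphrase of worker() from src/threadpool.c (the real code also handles an exit message and throttles slow-I/O work):

static void worker(void* arg) {
  struct uv__work* w;
  QUEUE* q;

  for (;;) {
    uv_mutex_lock(&mutex);
    while (QUEUE_EMPTY(&wq))
      uv_cond_wait(&cond, &mutex);      /* sleep until a task is posted */

    q = QUEUE_HEAD(&wq);                /* pop one task */
    QUEUE_REMOVE(q);
    uv_mutex_unlock(&mutex);

    w = QUEUE_DATA(q, struct uv__work, wq);
    w->work(w);                         /* the blocking I/O runs here */

    uv_mutex_lock(&w->loop->wq_mutex);  /* hand the result back to the loop */
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
    uv_async_send(&w->loop->wq_async);  /* wake uv__io_poll */
    uv_mutex_unlock(&w->loop->wq_mutex);
  }
}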

2.3.2 Submitting Tasks

https://github.com/libuv/help/issues/62

uv_fs_t is a file-system request. Internal APIs such as uv_fs_read end up calling uv__work_submit, so the request is handled by the thread pool. uv__work represents a task for the thread pool, and uv__work->work is the function to run. The flow:

uv_fs_open (initializes the uv_fs_t, then submits the task: the INIT, PATH and POST macros)
-> uv__work_submit (initializes the uv__work, setting its work (uv__fs_work) and done (uv__fs_done) fields, and enqueues the task)
-> uv_cond_signal (wakes a worker thread)
-> w->work(w) performs the actual I/O (uv__fs_work maps the fs_type in the uv_fs_t to the underlying OS API)
-> uv_async_send -> uv__async_send (writes to async_wfd; uv__io_poll watches all registered file descriptors via epoll_wait, so the loop learns that the file operation has finished)
-> uv__io_poll runs the callback w->cb (here uv__async_io, registered ahead of time just as uv__stream_io is for streams)
-> (uv__work_done -> uv__fs_done)
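
From the user's side the same chain looks like this minimal sketch (untested; /etc/hostname is an arbitrary example path):

#include <fcntl.h>
#include <stdio.h>
#include <uv.h>

static void open_cb(uv_fs_t* req) {  /* runs via uv__fs_done() on the loop thread */
  if (req->result >= 0) {            /* result is the fd on success */
    printf("opened fd %d\n", (int) req->result);
    uv_fs_t close_req;
    uv_fs_close(req->loop, &close_req, (uv_file) req->result, NULL);  /* NULL cb = synchronous */
    uv_fs_req_cleanup(&close_req);
  }
  uv_fs_req_cleanup(req);
}

int main(void) {
  uv_fs_t open_req;
  /* Passing a callback makes the call asynchronous (thread pool);
   * passing NULL would run it synchronously on this thread. */
  uv_fs_open(uv_default_loop(), &open_req, "/etc/hostname", O_RDONLY, 0, open_cb);
  return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}

The worker-side dispatch, uv__fs_work: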

static void uv__fs_work(struct uv__work* w) {
  int retry_on_eintr;
  uv_fs_t* req;
  ssize_t r;

  req = container_of(w, uv_fs_t, work_req);
  retry_on_eintr = !(req->fs_type == UV_FS_CLOSE ||
                     req->fs_type == UV_FS_READ);

  do {
    errno = 0;

#define X(type, action) \
  case UV_FS_ ## type: \
    r = action; \
    break;

    switch (req->fs_type) {
    X(ACCESS, access(req->path, req->flags));
    X(CHMOD, chmod(req->path, req->mode));
    // omitted
    X(UTIME, uv__fs_utime(req));
    X(WRITE, uv__fs_write_all(req));
    default: abort();
    }
#undef X
  } while (r == -1 && errno == EINTR && retry_on_eintr);
}
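
For completeness, the done half of the work/done pair is short; paraphrased from src/unix/fs.c:

static void uv__fs_done(struct uv__work* w, int status) {
  uv_fs_t* req;

  req = container_of(w, uv_fs_t, work_req);
  uv__req_unregister(req->loop, req);

  if (status == UV_ECANCELED) {
    assert(req->result == 0);
    req->result = UV_ECANCELED;
  }

  req->cb(req);  /* the user's uv_fs_cb, e.g. open_cb above */
}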

3 Data Structures

struct uv_loop_s {
  /* User data - use this for whatever. */
  void* data;
  /* Loop reference counting. */
  unsigned int active_handles;
  void* handle_queue[2];
  void* active_reqs[2];
  /* Internal flag to signal loop stop. */
  unsigned int stop_flag;
  UV_LOOP_PRIVATE_FIELDS
};

#define UV_LOOP_PRIVATE_FIELDS \
  unsigned long flags; \
  int backend_fd; /* the epoll fd on Linux */ \
  void* pending_queue[2]; \
  void* watcher_queue[2]; \
  uv__io_t** watchers; /* fd -> uv__io_t* map */ \
  unsigned int nwatchers; \
  unsigned int nfds; \
  void* wq[2]; /* completed thread-pool work items */ \
  uv_mutex_t wq_mutex; \
  uv_async_t wq_async; /* signalled by workers when work finishes */ \
  uv_rwlock_t cloexec_lock; \
  uv_handle_t* closing_handles; \
  void* process_handles[2]; \
  void* prepare_handles[2]; \
  void* check_handles[2]; \
  void* idle_handles[2]; \
  void* async_handles[2]; \
  void (*async_unused)(void); /* TODO(bnoordhuis) Remove in libuv v2. */ \
  uv__io_t async_io_watcher; /* its cb is uv__async_io */ \
  int async_wfd; /* fd that uv__async_send() writes to */ \
  struct { \
    void* min; \
    unsigned int nelts; \
  } timer_heap; \
  uint64_t timer_counter; \
  uint64_t time; \
  int signal_pipefd[2]; \
  uv__io_t signal_io_watcher; \
  uv_signal_t child_watcher; \
  int emfile_fd; \
  UV_PLATFORM_LOOP_FIELDS \

The two kinds of I/O tasks:

struct uv__io_s {
  uv__io_cb cb; /* invoked after epoll reports events; runs the actual I/O task, e.g. uv__read (which then calls the user callback) */
  void* pending_queue[2];
  void* watcher_queue[2];
  unsigned int pevents; /* Pending event mask i.e. mask at next tick. */
  unsigned int events; /* Current event mask. */
  int fd;
  UV_IO_PRIVATE_PLATFORM_FIELDS
};
struct uv__work {
  void (*work)(struct uv__work *w); /* the actual I/O task, run on the thread pool */
  void (*done)(struct uv__work *w, int status); /* the follow-up callback, run on the loop thread */
  struct uv_loop_s* loop;
  void* wq[2];
};
typedef void* QUEUE[2];
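
QUEUE is libuv's intrusive circular doubly-linked list: every participating struct embeds a two-pointer array, slot [0] pointing at the next link and slot [1] at the previous one. The core accessors in src/queue.h look roughly like this (simplified):

#define QUEUE_NEXT(q)  (*(QUEUE**) &((*(q))[0]))
#define QUEUE_PREV(q)  (*(QUEUE**) &((*(q))[1]))
#define QUEUE_EMPTY(q) ((const QUEUE*) (q) == (const QUEUE*) QUEUE_NEXT(q))

/* Recover the containing struct from an embedded QUEUE link; the same
 * container_of trick lets worker() turn &w->wq back into the uv__work. */
#define QUEUE_DATA(ptr, type, field) \
  ((type*) ((char*) (ptr) - offsetof(type, field)))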

References

https://zhuanlan.zhihu.com/p/83765851
https://blog.butonly.com/posts/node.js/libuv/6-libuv-stream/
https://github.com/libuv/help/issues/62
https://blog.butonly.com/posts/node.js/libuv/7-libuv-async/