这篇文章给大家分享的是有关Node.js中网络与流的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
涉及的知识点
libuv 中网络的实现
libuv 解决 accept (EMFILE错误)
BSD 套接字
SOCKADDR_IN
UNIX 域协议使用! 在进程间传递“文件描述符”
例子 tcp-echo-server/main.c
libuv 异步使用 BSD 套接字 的例子
libuv 中的网络和直接使用 BSD 套接字接口没有什么不同,有些事情更简单,都是无阻塞的,但概念都是一样的。此外,libuv 还提供了一些实用的函数来抽象出那些烦人的、重复的、低级的任务,比如使用BSD套接字结构设置套接字、DNS查询以及调整各种套接字参数。
int main() {
loop = uv_default_loop();
uv_tcp_t server;
uv_tcp_init(loop, &server);
uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr);
uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
int r = uv_listen((uv_stream_t*) &server, DEFAULT_BACKLOG, on_new_connection);
if (r) {
fprintf(stderr, "Listen error %s\n", uv_strerror(r));
return 1;
}
return uv_run(loop, UV_RUN_DEFAULT);
}
void on_new_connection(uv_stream_t *server, int status) {
if (status < 0) {
fprintf(stderr, "New connection error %s\n", uv_strerror(status));
// error!
return;
}
uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
uv_tcp_init(loop, client);
if (uv_accept(server, (uv_stream_t*) client) == 0) {
uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
}
同步的例子
这是一个正常同步使用 BSD 套接字 的例子。
作为参照可以发现主要有如下几步
首先调用 socket() 为通讯创建一个端点,为套接字返回一个文件描述符。
接着调用 bind() 为一个套接字分配地址。当使用 socket() 创建套接字后,只赋予其所使用的协议,并未分配地址。在接受其它主机的连接前,必须先调用 bind() 为套接字分配一个地址。
当 socket 和一个地址绑定之后,再调用 listen() 函数会开始监听可能的连接请求。
最后调用 accept, 当应用程序监听来自其他主机的面对数据流的连接时,通过事件(比如Unix select()系统调用)通知它。必须用 accept()函数初始化连接。 accept() 为每个连接创立新的套接字并从监听队列中移除这个连接。
int main(void)
{
struct sockaddr_in stSockAddr;
int SocketFD = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if(-1 == SocketFD)
{
perror("can not create socket");
exit(EXIT_FAILURE);
}
memset(&stSockAddr, 0, sizeof(struct sockaddr_in));
stSockAddr.sin_family = AF_INET;
stSockAddr.sin_port = htons(1100);
stSockAddr.sin_addr.s_addr = INADDR_ANY;
if(-1 == bind(SocketFD,(const struct sockaddr *)&stSockAddr, sizeof(struct sockaddr_in)))
{
perror("error bind failed");
close(SocketFD);
exit(EXIT_FAILURE);
}
if(-1 == listen(SocketFD, 10))
{
perror("error listen failed");
close(SocketFD);
exit(EXIT_FAILURE);
}
for(;;)
{
int ConnectFD = accept(SocketFD, NULL, NULL);
if(0 > ConnectFD)
{
perror("error accept failed");
close(SocketFD);
exit(EXIT_FAILURE);
}
shutdown(ConnectFD, SHUT_RDWR);
close(ConnectFD);
}
close(SocketFD);
return 0;
}
uv_tcp_init
main > uv_tcp_init
1、对 domain 进行了验证, 需要是下面3种的一种
AF_INET 表示 IPv4 网络协议
AF_INET6 表示 IPv6
AF_UNSPEC 表示适用于指定主机名和服务名且适合任何协议族的地址
2、tcp 也是一种流, 调用 uv__stream_init 对流数据进行初始化
int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) {
return uv_tcp_init_ex(loop, tcp, AF_UNSPEC);
}
int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* tcp, unsigned int flags) {
int domain;
domain = flags & 0xFF;
if (domain != AF_INET && domain != AF_INET6 && domain != AF_UNSPEC)
return UV_EINVAL;
if (flags & ~0xFF)
return UV_EINVAL;
uv__stream_init(loop, (uv_stream_t*)tcp, UV_TCP);
...
return 0;
}
uv__stream_init
main > uv_tcp_init > uv__stream_init
流的初始化函数使用的地方还是特别多的, 也特别重要。下述 i/o 的完整实现参考 【libuv 源码学习笔记】线程池与i/o
1、对流会被调用的回调函数等进行一个初始化
如 read_cb 函数, 在本例子中 on_new_connection > uv_read_start 函数就会真实的设置该 read_cb 为用户传入的参数 echo_read, 其被调用时机是该 stream 上设置的 io_watcher.fd 有数据写入时, 在事件循环阶段被 epoll 捕获后。
alloc_cb 函数的调用过程同 read_cb, alloc 类型函数一般是设置当前需要读取的内容长度, 在流数据传输时通常首先会写入本次传输数据的长度, 然后是具体的内容, 主要是为了接收方能够合理的申请内存进行存储。如 grpc, thread-loader 中都有详细的应用。
close_cb 函数被调用在 stream 数据结束时或者出错时。
connection_cb 函数如本例子 tcp 流, 当 accept 接收到新连接时被调用。本例子中即为 on_new_connection
connect_req 结构主要用于 tcp 客户端相关连接回调等数据的挂载使用。
shutdown_req 结构主要用于流 destroy 时回调等数据的挂载使用。
accepted_fd 当 accept 接收到新连接时, 存储 accept(SocketFD, NULL, NULL) 返回的 ConnectFD。
queued_fds 用于保存等待处理的连接, 其主要用于 node cluster 集群 的实现。
// queued_fds
1. 当收到其他进程通过 ipc 写入的数据时, 调用 uv__stream_recv_cmsg 函数
2. uv__stream_recv_cmsg 函数读取到进程传递过来的 fd 引用, 调用 uv__stream_queue_fd 函数保存。
3. queued_fds 被消费主要在 src/stream_wrap.cc LibuvStreamWrap::OnUvRead > AcceptHandle 函数中。
2、其中专门为 loop->emfile_fd 通过 uv__open_cloexec 方法创建一个指向空文件(/dev/null)的 idlefd 文件描述符, 追踪发现原来是解决 accept (EMFILE错误), 下面我们讲 uv__accept 的时候再细说这个 loop->emfile_fd 的妙用。
accept处理连接时,若出现 EMFILE 错误不进行处理,则内核间隔性尝试连接,导致整个网络设计程序崩溃
3、调用 uv__io_init 初始化的该 stream 的 i/o 观察者的回调函数为 uv__stream_io
void uv__stream_init(uv_loop_t* loop,
uv_stream_t* stream,
uv_handle_type type) {
int err;
uv__handle_init(loop, (uv_handle_t*)stream, type);
stream->read_cb = NULL;
stream->alloc_cb = NULL;
stream->close_cb = NULL;
stream->connection_cb = NULL;
stream->connect_req = NULL;
stream->shutdown_req = NULL;
stream->accepted_fd = -1;
stream->queued_fds = NULL;
stream->delayed_error = 0;
QUEUE_INIT(&stream->write_queue);
QUEUE_INIT(&stream->write_completed_queue);
stream->write_queue_size = 0;
if (loop->emfile_fd == -1) {
err = uv__open_cloexec("/dev/null", O_RDONLY);
if (err < 0)
err = uv__open_cloexec("/", O_RDONLY);
if (err >= 0)
loop->emfile_fd = err;
}
#if defined(__APPLE__)
stream->select = NULL;
#endif
uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}
uv__open_cloexec
main > uv_tcp_init > uv__stream_init > uv__open_cloexec
同步调用 open 方法拿到了 fd, 也许你会问为啥不像 【libuv 源码学习笔记】线程池与i/o 中调用 uv_fs_open 异步获取 fd, 其实 libuv 中并不全部是异步的实现, 比如当前的例子启动 tcp 服务前的一些初始化, 而不是用户请求过程中发生的任务, 同步也是能接受的。
int uv__open_cloexec(const char* path, int flags) {
#if defined(O_CLOEXEC)
int fd;
fd = open(path, flags | O_CLOEXEC);
if (fd == -1)
return UV__ERR(errno);
return fd;
#else
int err;
int fd;
fd = open(path, flags);
if (fd == -1)
return UV__ERR(errno);
err = uv__cloexec(fd, 1);
if (err) {
uv__close(fd);
return err;
}
return fd;
#endif
}
uv__stream_io
main > uv_tcp_init > uv__stream_init > uv__stream_io
双工流的 i/o 观察者回调函数, 如调用的 stream->connect_req 函数, 其值是例子中 uv_listen 函数的最后一个参数 on_new_connection。
当发生 POLLIN | POLLERR | POLLHUP 事件时: 该 fd 有可读数据时调用 uv__read 函数
当发生 POLLOUT | POLLERR | POLLHUP 事件时: 该 fd 有可读数据时调用 uv__write 函数
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv_stream_t* stream;
stream = container_of(w, uv_stream_t, io_watcher);
assert(stream->type == UV_TCP ||
stream->type == UV_NAMED_PIPE ||
stream->type == UV_TTY);
assert(!(stream->flags & UV_HANDLE_CLOSING));
if (stream->connect_req) {
uv__stream_connect(stream);
return;
}
assert(uv__stream_fd(stream) >= 0);
if (events & (POLLIN | POLLERR | POLLHUP))
uv__read(stream);
if (uv__stream_fd(stream) == -1)
return;
if ((events & POLLHUP) &&
(stream->flags & UV_HANDLE_READING) &&
(stream->flags & UV_HANDLE_READ_PARTIAL) &&
!(stream->flags & UV_HANDLE_READ_EOF)) {
uv_buf_t buf = { NULL, 0 };
uv__stream_eof(stream, &buf);
}
if (uv__stream_fd(stream) == -1)
return;
if (events & (POLLOUT | POLLERR | POLLHUP)) {
uv__write(stream);
uv__write_callbacks(stream);
if (QUEUE_EMPTY(&stream->write_queue))
uv__drain(stream);
}
}
uv_ip4_addr
main > uv_ip4_addr
uv_ip4_addr 用于将人类可读的 IP 地址、端口对转换为 BSD 套接字 API 所需的 sockaddr_in 结构。
int uv_ip4_addr(const char* ip, int port, struct sockaddr_in* addr) {
memset(addr, 0, sizeof(*addr));
addr->sin_family = AF_INET;
addr->sin_port = htons(port);
#ifdef SIN6_LEN
addr->sin_len = sizeof(*addr);
#endif
return uv_inet_pton(AF_INET, ip, &(addr->sin_addr.s_addr));
}
uv_tcp_bind
main > uv_tcp_bind
从 uv_ip4_addr 函数的实现, 其实是在 addr 的 sin_family 上面设置值为 AF_INET, 但在 uv_tcp_bind 函数里面却是从 addr 的 sa_family属性上面取的值, 这让 c 初学者的我又陷入了一阵思考 ...
sockaddr_in 和 sockaddr 是并列的结构,指向 sockaddr_in 的结构体的指针也可以指向 sockaddr 的结构体,并代替它。也就是说,你可以使用 sockaddr_in 建立你所需要的信息,然后用 memset 函数初始化就可以了memset((char*)&mysock,0,sizeof(mysock));//初始化
原来是这样, 这里通过强制指针类型转换 const struct sockaddr* addr 达到的目的, 函数的最后调用了 uv__tcp_bind 函数。
int uv_tcp_bind(uv_tcp_t* handle,
const struct sockaddr* addr,
unsigned int flags) {
unsigned int addrlen;
if (handle->type != UV_TCP)
return UV_EINVAL;
if (addr->sa_family == AF_INET)
addrlen = sizeof(struct sockaddr_in);
else if (addr->sa_family == AF_INET6)
addrlen = sizeof(struct sockaddr_in6);
else
return UV_EINVAL;
return uv__tcp_bind(handle, addr, addrlen, flags);
}
uv__tcp_bind
main > uv_tcp_bind > uv__tcp_bind
调用 maybe_new_socket, 如果当前未设置 socketfd, 则调用 new_socket 获取
调用 setsockopt 用于为指定的套接字设定一个特定的套接字选项
调用 bind 为一个套接字分配地址。当使用socket()创建套接字后,只赋予其所使用的协议,并未分配地址。
int uv__tcp_bind(uv_tcp_t* tcp,
const struct sockaddr* addr,
unsigned int addrlen,
unsigned int flags) {
int err;
int on;
if ((flags & UV_TCP_IPV6ONLY) && addr->sa_family != AF_INET6)
return UV_EINVAL;
err = maybe_new_socket(tcp, addr->sa_family, 0);
if (err)
return err;
on = 1;
if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
return UV__ERR(errno);
...
errno = 0;
if (bind(tcp->io_watcher.fd, addr, addrlen) && errno != EADDRINUSE) {
if (errno == EAFNOSUPPORT)
return UV_EINVAL;
return UV__ERR(errno);
}
...
}
new_socket
main > uv_tcp_bind > uv__tcp_bind > maybe_new_socket > new_socket
通过 uv__socket 其本质调用 socket 获取到 sockfd
调用 uv__stream_open 设置 stream i/o 观察的 fd 为步骤1 拿到的 sockfd
static int new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {
struct sockaddr_storage saddr;
socklen_t slen;
int sockfd;
int err;
err = uv__socket(domain, SOCK_STREAM, 0);
if (err < 0)
return err;
sockfd = err;
err = uv__stream_open((uv_stream_t*) handle, sockfd, flags);
...
return 0;
}
uv__stream_open
main > uv_tcp_bind > uv__tcp_bind > maybe_new_socket > new_socket > uv__stream_open
主要用于设置 stream->io_watcher.fd 为参数传入的 fd。
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
#if defined(__APPLE__)
int enable;
#endif
if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
return UV_EBUSY;
assert(fd >= 0);
stream->flags |= flags;
if (stream->type == UV_TCP) {
if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
return UV__ERR(errno);
if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
uv__tcp_keepalive(fd, 1, 60)) {
return UV__ERR(errno);
}
}
#if defined(__APPLE__)
enable = 1;
if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
errno != ENOTSOCK &&
errno != EINVAL) {
return UV__ERR(errno);
}
#endif
stream->io_watcher.fd = fd;
return 0;
}
uv_listen
main > uv_listen
主要调用了 uv_tcp_listen 函数。
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
int err;
err = ERROR_INVALID_PARAMETER;
switch (stream->type) {
case UV_TCP:
err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
break;
case UV_NAMED_PIPE:
err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
break;
default:
assert(0);
}
return uv_translate_sys_error(err);
}
uv_tcp_listen
main > uv_listen > uv_tcp_listen
调用 listen 开始监听可能的连接请求
挂载例子中传入的回调 on_new_connection
暴力改写 i/o 观察者的回调, 在上面的 uv__stream_init 函数中, 通过 uv__io_init 设置了 i/o 观察者的回调为 uv__stream_io, 作为普通的双工流是适用的, 这里 tcp 流直接通过 tcp->io_watcher.cb = uv__server_io 赋值语句设置 i/o 观察者回调为 uv__server_io
调用 uv__io_start 注册 i/o 观察者, 开始监听工作。
int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
...
if (listen(tcp->io_watcher.fd, backlog))
return UV__ERR(errno);
tcp->connection_cb = cb;
tcp->flags |= UV_HANDLE_BOUND;
tcp->io_watcher.cb = uv__server_io;
uv__io_start(tcp->loop, &tcp->io_watcher, POLLIN);
return 0;
}
uv__server_io
main > uv_listen > uv_tcp_listen > uv__server_io
tcp 流的 i/o 观察者回调函数
调用 uv__accept, 拿到该连接的 ConnectFD
此时如果出现了上面 uv__stream_init 时说的 accept (EMFILE错误), 则调用 uv__emfile_trick 函数
把步骤1拿到的 ConnectFD 挂载在了 stream->accepted_fd 上面
调用例子中传入的回调 on_new_connection
void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
...
while (uv__stream_fd(stream) != -1) {
assert(stream->accepted_fd == -1);
err = uv__accept(uv__stream_fd(stream));
if (err < 0) {
if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
return;
if (err == UV_ECONNABORTED)
continue;
if (err == UV_EMFILE || err == UV_ENFILE) {
err = uv__emfile_trick(loop, uv__stream_fd(stream));
if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
break;
}
stream->connection_cb(stream, err);
continue;
}
UV_DEC_BACKLOG(w)
stream->accepted_fd = err;
stream->connection_cb(stream, 0);
...
}
uv__emfile_trick
main > uv_listen > uv_tcp_listen > uv__server_io > uv__emfile_trick
在上面的 uv__stream_init 函数中, 我们发现 loop 的 emfile_fd 属性上通过 uv__open_cloexec 方法创建一个指向空文件(/dev/null)的 idlefd 文件描述符。
当出现 accept (EMFILE错误)即文件描述符用尽时的错误时
首先将 loop->emfile_fd 文件描述符, 使其能 accept 新连接, 然后我们新连接将其关闭,以使其低于EMFILE的限制。接下来,我们接受所有等待的连接并关闭它们以向客户发出信号,告诉他们我们已经超载了--我们确实超载了,但是我们仍在继续工作。
static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
int err;
int emfile_fd;
if (loop->emfile_fd == -1)
return UV_EMFILE;
uv__close(loop->emfile_fd);
loop->emfile_fd = -1;
do {
err = uv__accept(accept_fd);
if (err >= 0)
uv__close(err);
} while (err >= 0 || err == UV_EINTR);
emfile_fd = uv__open_cloexec("/", O_RDONLY);
if (emfile_fd >= 0)
loop->emfile_fd = emfile_fd;
return err;
}
on_new_connection
当收到一个新连接, 例子中的 on_new_connection 函数被调用
通过 uv_tcp_init 初始化了一个 tcp 客户端流
调用 uv_accept 函数
void on_new_connection(uv_stream_t *server, int status) {
if (status < 0) {
fprintf(stderr, "New connection error %s\n", uv_strerror(status));
// error!
return;
}
uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
uv_tcp_init(loop, client);
if (uv_accept(server, (uv_stream_t*) client) == 0) {
uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
}
uv_accept
on_new_connection > uv_accept
根据不同的协议调用不同的方法, 该例子 tcp 调用 uv__stream_open 方法
uv__stream_open 设置给初始化完成的 client 流设置了 i/o 观察者的 fd。该 fd 即是 uv__server_io 中提到的 ConnectFD 。
int uv_accept(uv_stream_t* server, uv_stream_t* client) {
int err;
assert(server->loop == client->loop);
if (server->accepted_fd == -1)
return UV_EAGAIN;
switch (client->type) {
case UV_NAMED_PIPE:
case UV_TCP:
err = uv__stream_open(client,
server->accepted_fd,
UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
if (err) {
uv__close(server->accepted_fd);
goto done;
}
break;
case UV_UDP:
err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
if (err) {
uv__close(server->accepted_fd);
goto done;
}
break;
default:
return UV_EINVAL;
}
client->flags |= UV_HANDLE_BOUND;
done:
if (server->queued_fds != NULL) {
uv__stream_queued_fds_t* queued_fds;
queued_fds = server->queued_fds;
server->accepted_fd = queued_fds->fds[0];
assert(queued_fds->offset > 0);
if (--queued_fds->offset == 0) {
uv__free(queued_fds);
server->queued_fds = NULL;
} else {
memmove(queued_fds->fds,
queued_fds->fds + 1,
queued_fds->offset * sizeof(*queued_fds->fds));
}
} else {
server->accepted_fd = -1;
if (err == 0)
uv__io_start(server->loop, &server->io_watcher, POLLIN);
}
return err;
}
uv_read_start
on_new_connection > uv_read_start
开启一个流的监听工作
挂载回调函数 read_cb 为例子中的 echo_read, 当流有数据写入时被调用
挂载回调函数 alloc_cb 为例子中的 alloc_buffer
调用 uv__io_start 函数, 这可是老朋友了, 通常用在 uv__io_init 初始化 i/o 观察者后面, 用于注册 i/o 观察者。
uv_read_start 主要是调用了 uv__read_start 函数。开始了普通流的 i/o 过程。
初始化 i/o 观察者在 uv_tcp_init > uv_tcp_init_ex > uv__stream_init > uv__io_init 设置其观察者回调函数为 uv__stream_io
注册 i/o 观察者为 uv__io_start 开始监听工作。
int uv__read_start(uv_stream_t* stream,
uv_alloc_cb alloc_cb,
uv_read_cb read_cb) {
assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
stream->type == UV_TTY);
stream->flags |= UV_HANDLE_READING;
assert(uv__stream_fd(stream) >= 0);
assert(alloc_cb);
stream->read_cb = read_cb;
stream->alloc_cb = alloc_cb;
uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
uv__handle_start(stream);
uv__stream_osx_interrupt_select(stream);
return 0;
}
感谢各位的阅读!关于“Node.js中网络与流的示例分析”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!