本篇内容介绍了“Websocket库Ws的原理是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
ws服务器逻辑由websocket-server.js的WebSocketServer类实现。该类初始化了一些参数后就执行以下代码
if (this._server) { // 给server注册下面事件,返回一个注销函数(用于注销下面注册的事件) this._removeListeners = addListeners(this._server, { // listen成功的回调 listening: this.emit.bind(this, 'listening'), error: this.emit.bind(this, 'error'), // 收到协议升级请求的回调 upgrade: (req, socket, head) => { this.handleUpgrade(req, socket, head, (ws) => { // 处理成功,触发链接成功事件 this.emit('connection', ws, req); }); } });
我们看到ws监听了upgrade事件,当有websocket请求到来时就会执行handleUpgrade处理升级请求,升级成功后触发connection事件。我们先看handleUpgrade。handleUpgrade逻辑不多,主要是处理和校验升级请求的一些http头。ws提供了一个校验的钩子。处理完http头后,会调verifyClient校验是否允许升级请求。如果成功则执行completeUpgrade。顾名思义,completeUpgrade是完成升级请求的函数,该函数返回同意协议升级并且设置一些http响应头。另外还有一些重要的逻辑处理。
const ws = new WebSocket(null); // 设置管理socket的数据 ws.setSocket(socket, head, this.options.maxPayload); // cb就是this.emit('connection', ws, req); cb(ws);
我们看到这里新建了一个WebSocket对象并且调用了他的setSocket函数。我们来看看他做了什么。setSocket的逻辑非常多,我们慢慢分析。
数据接收者
class Receiver extends Writable {}
我们看到数据接收者是一个可写流。这就意味着我们可以往里面写数据。
const receiver = new Receiver(); receiver.write('hello');
我们看一下这时候Receiver的逻辑。
_write(chunk, encoding, cb) { if (this._opcode === 0x08 && this._state == GET_INFO) return cb(); this._bufferedBytes += chunk.length; this._buffers.push(chunk); this.startLoop(cb); }
首先记录当前数据的大小,然后把数据存起来,最后执行startLoop。
startLoop(cb) { let err; this._loop = true; do { switch (this._state) { // 忽略其他case case GET_DATA: err = this.getData(cb); break; default: // `INFLATING` this._loop = false; return; } } while (this._loop); cb(err); }
我们知道websocket是基于tcp上层的应用层协议,所以我们收到数据时,需要解析出一个个数据包(粘包问题),所以Receiver其实就是一个状态机,每次收到数据的时候,都会根据当前的状态进行状态流转。比如当前处于GET_DATA状态,那么就会进行数据的处理。我们接着看一下数据处理的逻辑。
getData(cb) { let data = EMPTY_BUFFER; // 提取数据部分 if (this._payloadLength) { data = this.consume(this._payloadLength); if (this._masked) unmask(data, this._mask); } // 是控制报文则执行controlMessage if (this._opcode > 0x07) return this.controlMessage(data); // 做了压缩,则先解压 if (this._compressed) { this._state = INFLATING; this.decompress(data, cb); return; } // 没有压缩则直接处理(先存到_fragments,然后执行dataMessage) if (data.length) { this._messageLength = this._totalPayloadLength; this._fragments.push(data); } return this.dataMessage(); }
我们执行websocket协议定义了报文的类型,比如控制报文,数据报文。我们分别看一下这两个的逻辑。
controlMessage(data) { // 连接关闭 if (this._opcode === 0x08) { this._loop = false; if (data.length === 0) { this.emit('conclude', 1005, ''); this.end(); } } else if (this._opcode === 0x09) { this.emit('ping', data); } else { this.emit('pong', data); } this._state = GET_INFO; }
我们看到控制报文包括三种(conclude、ping、pong)。而数据报文只有this.emit('message', data);一种。这个就是接收者的整体逻辑。
2 数据发送者
数据发送者是对websocket协议的封装,当用户调研数据发送者的send接口发送数据时,数据发送者会组装成一个websocket协议的包再发送出去。
send(data, options, cb) { const buf = toBuffer(data); const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName]; let opcode = options.binary ? 2 : 1; let rsv1 = options.compress; if (this._firstFragment) { this._firstFragment = false; if (rsv1 && perMessageDeflate) { rsv1 = buf.length >= perMessageDeflate._threshold; } this._compress = rsv1; } else { rsv1 = false; opcode = 0; } if (options.fin) this._firstFragment = true; // 需要压缩 if (perMessageDeflate) { const opts = { fin: options.fin, rsv1, opcode, mask: options.mask, readOnly: toBuffer.readOnly }; // 正在压缩,则排队等待,否则执行压缩 if (this._deflating) { this.enqueue([this.dispatch, buf, this._compress, opts, cb]); } else { this.dispatch(buf, this._compress, opts, cb); } } else { // 不需要压缩,直接发送 this.sendFrame( Sender.frame(buf, { fin: options.fin, rsv1: false, opcode, mask: options.mask, readOnly: toBuffer.readOnly }), cb ); } }
send函数做了一些参数的处理后发送数据,但是如果需要压缩的话,要压缩后才能发送。数据处理完成后调用真正的发送函数
sendFrame(list, cb) { if (list.length === 2) { this._socket.cork(); this._socket.write(list[0]); this._socket.write(list[1], cb); this._socket.uncork(); } else { this._socket.write(list[0], cb); } }
了解了数据接收者和发送者的逻辑后,我们看一下websocket对象和setSocket函数做了什么事情,websocket对象本质是对TCP socket的封装。它接收来自底层的数据,然后透传给数据接收者,数据接收者处理完后,触发websocket对应的对应的事件,比如message事件。发送数据的时候,websocket会调用数据发送者的接口,数据发送者组装成websocket协议的数据包后再发送出去,架构如下图所示。
接下来我们看看setSocket的逻辑
setSocket(socket, head, maxPayload) { // 数据接收者,负责处理tcp上收到的数据(socket是tcp层的socket) const receiver = new Receiver(...); // 数据发送者,负责发送数据给对端 this._sender = new Sender(socket, this._extensions); // 数据接收者,负责解析数据 this._receiver = receiver; // net模块的tcp socket this._socket = socket; // 关联起来 receiver[kWebSocket] = this; socket[kWebSocket] = this; // 监听接收者的事件,解析数据的时候会回调 receiver.on('conclude', receiverOnConclude); // 下面两个事件由Writable触发 receiver.on('drain', receiverOnDrain); receiver.on('error', receiverOnError); receiver.on('message', receiverOnMessage); receiver.on('ping', receiverOnPing); receiver.on('pong', receiverOnPong); // 清除定时器 socket.setTimeout(0); // 关闭nagle算法 socket.setNoDelay(); // 升级请求中,携带的http body,通常是空 if (head.length > 0) socket.unshift(head); // 监听tcp底层的事件 socket.on('close', socketOnClose); socket.on('data', socketOnData); socket.on('end', socketOnEnd); socket.on('error', socketOnError); this.readyState = WebSocket.OPEN; this.emit('open'); }
我们看到里面监听了各种事件,下面以data事件为例,看一下处理过程。当tcp socket收到数据的时候会执行socketOnData函数。
function socketOnData(chunk) { // 会调用receiver里的_write函数,其实就是换成到receiver对象上,如果数据解析出错,会触发socket error事件 if (!this[kWebSocket]._receiver.write(chunk)) { this.pause(); } }
socketOnData通过接收者的接口把数据传给接收者,接收者会解析数据,然后触发对应的事件,比如message。
receiver.on('message', receiverOnMessage); function receiverOnMessage(data) { this[kWebSocket].emit('message', data); }
然后ws的socket对象继续往上层触发message事件。this[kWebSocket]的值是ws提供的socket对象本身。架构图如下。
这就是ws实现websocket协议的基本原理,具体细节可以参考源码。
“Websocket库Ws的原理是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注编程网网站,小编将为大家输出更多高质量的实用文章!