前言
在构建微服务时,为了追求极致的效率,服务间一般会使用 RPC(Remote Procedure Call)来进行通信。本文通过 Node.js 来实践一下。
Node.js 朴素 RPC
首先我们来构建一下 server
:
// server.js
const net = require('net')
const {msgBuffer} = require('../utils')
const server = net.createServer((clientSocket) => {
clientSocket.on('data', (data) => {
msgBuffer.push(data)
while (!msgBuffer.isFinished()) {
const message = JSON.parse(msgBuffer.handleData())
clientSocket.write(
JSON.stringify(fnMap[message.cmd].apply(null, message.params)) + '\n'
)
}
})
})
server.listen(9999, () => console.log('Listening on 9999'))
const fnMap = {
add: (...args) => {
let s = 0
for (let i = 0; i < args.length; i++) {
s += args[i]
}
return s
},
multiply: (...args) => {
let p = 1
for (let i = 0; i < args.length; i++) {
p *= args[i]
}
return p
},
}
// MessageBuffer
class MessageBuffer {
constructor(delimiter) {
this.delimiter = delimiter
this.buffer = ''
}
isFinished() {
if (
this.buffer.length === 0 ||
this.buffer.indexOf(this.delimiter) === -1
) {
return true
}
return false
}
push(data) {
this.buffer += data
}
getMessage() {
const delimiterIndex = this.buffer.indexOf(this.delimiter)
if (delimiterIndex !== -1) {
const message = this.buffer.slice(0, delimiterIndex)
this.buffer = this.buffer.replace(message + this.delimiter, '')
return message
}
return null
}
handleData() {
const message = this.getMessage()
return message
}
}
exports.msgBuffer = new MessageBuffer('\n')
我们新建了一个 TCP 的服务,并监听来自客户端的数据,注意这里我们通过一个 MessageBuffer
类来对数据进行解析(至于为什么这么做可参考考文末补充内容:关于 TCP “粘包”问题说明),将 TCP 数据流解析成我们的消息体。然后调用服务端预先配置好的方法,最后将返回值返回给客户端。
客户端相对比较简单,将函数调用相关数据按照事先规定好的格式发送给服务端即可:
const net = require('net')
const {msgBuffer} = require('../utils')
const client = net.connect({port: 9999}, () => {
client.write(JSON.stringify({cmd: 'add', params: [1, 2, 3]}) + '\n')
client.write(JSON.stringify({cmd: 'multiply', params: [1, 2, 3]}) + '\n')
})
client.on('data', (data) => {
msgBuffer.push(data)
while (!msgBuffer.isFinished()) {
const message = JSON.parse(msgBuffer.handleData())
console.log(message)
}
})
这样,一个非常简单的 RPC 雏形就出来了,不过目前这种方式还不是 RPC。所谓的 RPC,就是客户端必须像调用本地方法一样来调用远端的方法,而不是还需要自己组装消息体,并监听事件获取返回值。理想中的方式应该像这样:
const result = await client.add(1, 2, 3)
我们来改造一下。首先,我们定义一份配置文件,用来描述我们的 services
:
// services/index.js
class Calculator {
add(arr) {
let s = 0
for (let i = 0; i < arr.length; i++) {
s += arr[i]
}
return s
}
multiply(arr) {
let p = 1
for (let i = 0; i < arr.length; i++) {
p *= arr[i]
}
return p
}
}
module.exports = {
calculator: {
cls: Calculator,
methods: {
add: {
params: [{type: 'number[]', optional: false}],
return: {
type: 'number',
},
},
multiply: {
params: [{type: 'number[]', optional: false}],
return: {
type: 'number',
},
},
},
},
}
services
描述文件中包含了类以及它拥有的方法,方法参数(类型,是否可选),返回值类型等信息。为了简单一点,我们先不校验参数和返回值的类型。
然后就是我们的 server
:
const net = require('net')
const {msgBuffer} = require('../utils')
const services = require('../services')
class Server {
constructor(services) {
this.tcpServer = net.createServer((clientSocket) => {
const serviceMap = this.createServiceMap(services)
clientSocket.on('data', (data) => {
msgBuffer.push(data)
while (!msgBuffer.isFinished()) {
const {seqId, service, method, params} = JSON.parse(
msgBuffer.handleData()
)
clientSocket.write(
JSON.stringify({
seqId,
result: serviceMap[service][method].apply(null, params),
}) + '\n'
)
}
})
})
}
createServiceMap(services) {
const serviceMap = {}
Object.keys(services).forEach((serviceKey) => {
serviceMap[serviceKey] = new services[serviceKey].cls()
})
return serviceMap
}
listen(...args) {
this.tcpServer.listen(...args)
}
}
new Server(services).listen(9999)
server
中会监听 client
的连接,一旦有 client
进来,就根据 services
配置文件为其实例化所有 services
。之后开始接受 client
的数据,并根据 client
的消息调用相应的 service
中的方法,并返回结果。
注意到消息体中有个 seqId
,用来标识包的序号,必须将其返回给 client
,这样 client
才能知道返回的结果是跟哪个请求对应的。
最后就是我们的 client
:
const net = require('net')
const EventEmitter = require('events')
const {msgBuffer} = require('../utils')
const services = require('../services')
class Client {
constructor({port, services}) {
this.rspResolve = {}
this.seqId = 0
this.port = port
this.parseServices(services)
}
init() {
return new Promise((resolve, reject) => {
this.client = net.connect({port: this.port}, () => {
resolve()
})
this.client.on('data', (data) => {
msgBuffer.push(data)
while (!msgBuffer.isFinished()) {
const {seqId, result} = JSON.parse(msgBuffer.handleData())
this.rspResolve[seqId](result)
}
})
})
}
parseServices(services) {
for (const serviceKey in services) {
const service = services[serviceKey]
this[serviceKey] = {}
for (const method in service.methods) {
this[serviceKey][method] = (...params) => {
this.client.write(
JSON.stringify({
seqId: this.seqId,
service: serviceKey,
method,
params,
}) + '\n'
)
return new Promise((resolve, reject) => {
this.rspResolve[this.seqId++] = resolve
})
}
}
}
}
}
const client = new Client({port: 9999, services})
client.init().then(async () => {
console.log(await client.calculator.add([1, 2, 3, 4, 5]))
console.log(await client.calculator.multiply([1, 2, 3, 4, 5]))
})
初始化一个 client
时,会解析 services
,并在当前 client
实例上添加 services
的方法。方法中会将函数调用封装成消息发送给服务端并返回 Promise
对象,同时将 Promise
对象的 resolve
方法缓存在 resResolve
这个 Map
中,此时 Promise
对象还处于 pending
状态。
当 server
返回相应的 seqId
的结果时,resResolve
中对应的 resolve
方法会调用,从而将 Promise
对象状态设为 fulfilled
,此时 client
则可以获取到结果。
这样我们就实现了一个非常朴素的 RPC 框架。接下来我们简单看看业界常用的 RPC 框架是怎么做的吧,这里以 Thrift 为例。
Thrift RPC Demo
我们先准备一个 calculator.thrift
文件,用来描述 service
:
service Calculator {
i32 add(1:list<i32> arr),
i32 multiply(1:list<i32> arr)
}
由于 thrift
文件是语言无关的,所以我们需要通过它生成对应 Calculator.js
文件:
thrift -r --gen js:node calculator.thrift
这个文件包含 server
端和 client
相关的代码,在 client
端负责将函数调用转为消息发送给 server
,在 server
端负责读取消息,调用方法,返回结果给 client
。
然后 server
和 client
分别按照如下方式进行使用即可:
// server.js
var thrift = require('thrift')
var Calculator = require('./gen-nodejs/Calculator')
var server = thrift.createServer(Calculator, {
add(arr, result) {
let s = 0
for (let i = 0; i < arr.length; i++) {
s += arr[i]
}
result(null, s)
},
multiply(arr, result) {
let p = 1
for (let i = 0; i < arr.length; i++) {
p *= arr[i]
}
result(p)
},
})
server.listen(9090)
// client.js
var thrift = require('thrift')
var Calculator = require('./gen-nodejs/Calculator')
var transport = thrift.TBufferedTransport
var protocol = thrift.TBinaryProtocol
var connection = thrift.createConnection('localhost', 9090, {
transport: transport,
protocol: protocol,
})
var client = thrift.createClient(Calculator, connection)
client.add([1, 2], function (err, response) {
console.log(response)
})
下面,我们通过 Wireshark
来看看 thrift
通信的过程。
打开 Wireshark
,选择 Capturing from Loopback: lo0
,然后在 filter 中输入 tcp.port == 9090
。分别运行上面的 server
和 client
,则可抓包到如下内容:
我们先来看看第五行,可以看到 Wireshark
自动识别了 thrift
协议,并解析出这是一个 CALL
类型的消息,调用的方法为 add
。接下来我们再仔细看看 thrift
协议:
thrift
协议格式如上图所示,这里是一个参数的场景,如果有多个参数的话则可以在 Data -> List
后面继续添加,比如我们给 add
方法增加第二个参数,表示是否打印日志:
i32 add(1:list<i32> arr, 2:bool printLog)
抓包得到的内容如下:
返回的消息格式也类似,这里就不赘述了。
关于 RPC 的内容就先介绍到这,后面计划基于 Nest.js 再实战一下。
补充内容
关于 TCP “粘包”问题说明
首先声明一下,所谓的 TCP “粘包问题”其实并不是一个问题。
先看一个简单的例子:
// server.js
const net = require('net')
const server = net.createServer((clientSocket) => {
console.log('Client connected')
clientSocket.on('data', (data) => {
console.log('-------------------')
console.log(data.toString())
})
})
server.listen(9999, () => console.log('Listening on 9999'))
// client.js
const net = require('net')
const client = net.connect({port: 9999}, () => {
client.write(JSON.stringify({cmd: 'add', params: [1, 2]}))
client.write(JSON.stringify({cmd: 'multiply', params: [1, 2, 3]}))
})
启动 server
后再运行 client
,则 server
有可能会打印如下日志:
-------------------
{"cmd":"add","params":[1,2]}{"cmd":"multiply","params":[1,2,3]}
如上所示,客户端调用了两次 write
,但是服务端却只打印了一次。也就是说,两次发送的数据在服务端被一次性取出来了。即,使用方层面的两个包“粘在”了一起。原因在于 TCP 是面向字节流的,并没有包的概念,所以开发者需要对 data
事件获取到的数据进行解析。
以上就是Node.js高级编程使用RPC通信示例详解的详细内容,更多关于Node.js高级编程RPC通信的资料请关注编程网其它相关文章!