问题
假设现在我们正处于这样一个情景:需要使用Node.js逐行读取文件。Node有个API叫做readLine,它是一个包装器,可用于逐行从输入流中读取数据,不需要分析输入缓冲区、也不需要将文本分解为小块。
你可以像这样监听:
const fs = require('fs')
const readline = require('readline')
const reader = readline.createInterface({
input: fs.createReadStream('./file.txt'),
crlfDelay: Infinity
})
reader.on('line', (line) => console.log(line))
假设有这样一个简单的文件:
line 1
line 2
line 3
如果我们在创建的文件上运行代码,那么就能在控制台上逐行输出。但是,使用事件并不是编写可维护代码的最佳方法之一,因为事件是完全异步的,可能会中断代码流——因为是无序触发的,并且只能通过侦听器分配操作。
解决方案
除了事件 API之外,readline还有async iterator。现在我们可以不通过line事件中的侦听器读取,而是通过for关键字来读取。
举几个使用for循环的例子。第一个是最常见的使用计数器和条件:
for (let x = 0; x < array.length; x++) {
// Code here
}
我们也可以使用for … in表示法读取数组索引:
const a = [1,2,3,4,5,6]
for (let index in a) {
console.log(a[index])
}
在前一种情况下,console.log输出从1到6的数字,但是如果我们使用console.log (index),那么记录的是数组的索引,从0到5的数字。
下面,我们使用for … of表示法,直接获取数组的可枚举属性,即它的直接值:
const a = [1,2,3,4,5,6]
for (let item of a) {
console.log(item)
}
注意,这些方法都是同步的。那么,如果我们有一系列promise,这时该如何按顺序读取呢?
假设我们还有另一个接口,它总是返回一个Promise。为了按顺序解析promise,我们需要这样做:
async function readLine (files) {
for (const file of files) {
const line = await readFile(file) // Imagine readFile is our cursor
console.log(line)
}
}
而现在,多亏异步可迭代对象(如readline)的魔力,我们可以执行以下操作:
const fs = require('fs')
const readline = require('readline')
const reader = readline.createInterface({
input: fs.createReadStream('./xpto.txt'),
crlfDelay: Infinity
})
async function read () {
for await (const line of reader) {
console.log(line)
}
}
read()
注意,我们现在使用的是for、for await (const x of y)的新定义。
对于await和node.js
从10.x版开始,Node.js运行时原生支持for await表示法。如果你使用的是8.x或9.x版本,则需要使用--harmony_async_iteration标志启动Javascript文件。遗憾的是,Node.js的版本6和版本7不支持异步迭代器。
为了理解异步迭代器的概念,首先我们需要知道迭代器的本质。简而言之,迭代器是一个对象,公开next()函数,此函数返回另一个对象,其中{value: any, done: boolean}表示当前迭代的值,done表示序列中是否还有其他值。
遍历数组中所有项的迭代器示例如下:
const array = [1,2,3]
let index = 0
const iterator = {
next: () => {
if (index >= array.length) return { done: true }
return {
value: array[index++],
done: false
}
}
}
就其本身而言,迭代器没有实际用途,那么怎么办呢?为此,我们需要iterable。iterable是一个对象,它有一个Symbol.iterator键,该键返回的函数返回迭代器:
// ... Iterator code here ...
const iterable = {
[Symbol.iterator]: () => iterator
}
现在我们可以正常使用迭代器了,通过for (const x of iterable),我们可以一个一个地迭代array中的所有值。
在后台,所有数组和对象都有Symbol.iterator,这样就可以执行for (let x of [1,2,3])并返回我们想要的值。
我们可以看到,异步迭代器与迭代器完全相同,不同之处在于iterable拥有的是Symbol.asyncIterator,而不是Symbol.iterator,拥有的是解析为具有相同签名对象的Promise,而不是返回{value, done}的对象。
让我们把上面的迭代器变成一个异步迭代器:
const array = [1,2,3]
let index = 0
const asyncIterator = {
next: () => {
if (index >= array.length) return Promise.resolve({done: true})
return Promise.resolve({value: array[index++], done: false})
}
}
const asyncIterable = {
[Symbol.asyncIterator]: () => asyncIterator
}
异步迭代
我们可以通过调用next()函数来手动迭代迭代器:
// ... Async iterator Code here ...
async function manual () {
const promise = asyncIterator.next() // Promise
await p // Object { value: 1, done: false }
await asyncIterator.next() // Object { value: 2, done: false }
await asyncIterator.next() // Object { value: 3, done: false }
await asyncIterator.next() // Object { done: true }
}
为了遍历异步迭代器,我们需要使用for await,但请记住,关键字await只能在异步函数中使用,所以我们需要有类似这样的代码:
// ... Code above ...
async function iterate () {
for await (const num of asyncIterable) console.log(num)
}
iterate() // 1, 2, 3
但是,由于Node 8.x和9.x这样的老版本不支持异步迭代器,为了在这些版本中使用异步迭代器,我们可以简单地从对象中提取next并手动遍历:
// ... Async Iterator Code here ...
async function iterate () {
const {next} = asyncIterable[Symbol.asyncIterator]() // we take the next iterator function
for (let {value, done} = await next(); !done; {value, done} = await next()) {
console.log(value)
}
}
注意,for await更干净、更简洁,因为它的行为类似于常规循环,而且,除了更易于理解之外,还可以通过done键自行检查迭代器的结束。
处理错误
如果promise在迭代器中被拒绝,会发生什么?好吧,和任何被拒绝的promise一样,通过简单的try/catch就可以来捕获错误(因为我们使用的是await):
const asyncIterator = { next: () => Promise.reject('Error') }
const asyncIterable = { [Symbol.asyncIterator]: () => asyncIterator
async function iterate () {
try {
for await (const num of asyncIterable) {}
} catch (e) {
console.log(e.message)
}
}
iterate()
回退
关于异步迭代器,非常有趣的一点是,它们有Symbol.iterator的回退,这意味着你也可以将它与常规迭代器一起使用,例如,有这样一个promise数组:
const promiseArray = [
fetch('https://lsantos.dev'),
fetch('https://lsantos.me')
]
async function iterate () {
for await (const response of promiseArray) console.log(response.status)
}
iterate() // 200, 200
异步生成器
在大多数情况下,迭代器和异步迭代器可以创建自生成器。
生成器是允许暂停和恢复执行的函数,因此可以操作执行,然后通过next()函数获取下一个值。
异步生成器的行为类似于异步迭代器,但你必须手动实现停止机制,例如,这里我们构建一个用于git提交的随机消息生成器:
async function* gitCommitMessageGenerator () {
const url = 'https://whatthecommit.com/index.txt'
while (true) {
const response = await fetch(url)
yield await response.text() // We return the value
}
}
注意,在任何时候都不会返回{value, done}对象,因此循环无法知道执行何时完成。这时我们可以实现这样的函数:
// Previous Code
async function getCommitMessages (times) {
let execution = 1
for await (const message of gitCommitMessageGenerator()) {
console.log(message)
if (execution++ >= times) break
}
}
getCommitMessages(5)
// I'll explain this when I'm sober .. or revert it
// Never before had a small typo like this one caused so much damage.
// For real, this time.
// Too lazy to write descriptive message
// Ugh. Bad rebase.
用例
再来一个更有趣的示例,为一个真实用例构建异步迭代器。目前,适用于Node.js的Oracle数据库驱动程序支持resultSet API,因此可以在数据库上执行查询并返回,而数据流则可以通过getRow()方法逐个读取。
要创建resultSet,我们需要在数据库中执行查询,如下所示:
const oracle = require('oracledb')
const options = {
user: 'example',
password: 'example123',
connectString: 'string'
}
async function start () {
const connection = await oracle.getConnection(options)
const { resultSet } = await connection.execute('query', [], { outFormat: oracle.OBJECT, resultSet: true })
return resultSet
}
start().then(console.log)
resultSet有一个名为getRow()的方法,这个方法从数据库中返回要获取的下一行的Promise。我们可以创建一个逐行返回此resultSet的光标。下面让我们创建Cursor类:
class Cursor {
constructor(resultSet) {
this.resultSet = resultSet
}
getIterable() {
return {
[Symbol.asyncIterator]: () => this._buildIterator()
}
}
_buildIterator() {
return {
next: () => this.resultSet.getRow().then((row) => ({ value: row, done: row === undefined }))
}
}
}
module.exports = Cursor
查看光标是否接收到它应该处理的resultSet,并将其存储在当前状态。因此,我们需要更改之前的方法,以便返回光标而不是resultSet:
const oracle = require('oracledb')
const options = {
user: 'example',
password: 'example123',
connectString: 'string'
}
async function getResultSet() {
const connection = await oracle.getConnection(options)
const { resultSet } = await connection.execute('query', [], { outFormat: oracle.OBJECT, resultSet: true })
return resultSet
}
async function start() {
const resultSet = await getResultSet()
const cursor = new Cursor(resultSet)
for await (const row of cursor.getIterable()) {
console.log(row)
}
}
start()
这样,我们就可以遍历所有返回的行,不需要单独的Promise解析。
结论
异步迭代器非常强大,尤其是在Javascript等动态和异步语言中。有了它们,我们就可以将复杂的执行变成简单的代码,从而向用户隐藏复杂性,增加友好的用户体验。