文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

探索异步迭代器在 Node.js 中的使用

2024-12-03 16:12

关注

目录

在 Events 中使用 asyncIterator

在 Stream 中使用 asyncIterator

在 MongoDB 中使用 asyncIterator

在 Events 中使用 asyncIterator

Node.js v12.16.0 中新增了 events.on(emitter, eventName) 方法,返回一个迭代 eventName 事件的异步迭代器。

events.on() 示例 1

如下例所示, for await...of 循环只会输出 Hello 当触发 error 事件时会被 try catch 所捕获。

  1. const { on, EventEmitter } = require('events'); 
  2.  
  3. (async () => { 
  4.   const ee = new EventEmitter(); 
  5.   const ite = on(ee, 'foo'); 
  6.  
  7.   process.nextTick(() => { 
  8.     ee.emit('foo''Hello'); 
  9.     ee.emit('error', new Error('unknown mistake.')) 
  10.     ee.emit('foo''Node.js'); 
  11.   }); 
  12.  
  13.   try { 
  14.     for await (const event of ite) { 
  15.       console.log(event); // prints ['Hello'
  16.     } 
  17.   } catch (err) { 
  18.     console.log(err.message); // unknown mistake. 
  19.   } 
  20. })(); 

上述示例,如果 EventEmitter 对象实例 ee 触发了 error 事件,错误信息会被抛出并且退出循环,该实例注册的所有事件侦听器也会一并移除。

events.on() 示例 2

for await...of 内部块的执行是同步的,每次只能处理一个事件,即使你接下来还有会立即执行的事件,也是如此。如果是需要并发执行的则不建议使用,这个原因会在下面解析 events.on() 源码时给出答案。

如下所示,虽然事件是按顺序同时触发了两次,但是在内部块模拟了 2s 的延迟,下一次事件的处理也会得到延迟。

  1. const ite = on(ee, 'foo'); 
  2.  
  3. process.nextTick(() => { 
  4.   ee.emit('foo''Hello'); 
  5.   ee.emit('foo''Node.js'); 
  6.   // ite.return(); // 调用后可以结束 for await...of 的遍历 
  7.   // ite.throw() // 迭代器对象抛出一个错误 
  8. }); 
  9.  
  10. try { 
  11.   for await (const event of ite) { 
  12.     console.log(event); // prints ['Hello'] ['Node.js'
  13.     await sleep(2000); 
  14.   } 
  15. } catch (err) { 
  16.   console.log(err.message); 
  17.  
  18. // Unreachable here 
  19. console.log('这里将不会被执行'); 

上例中最后一句代码是不会执行的,此时的迭代器会一直处于遍历中,虽然上面两个事件 emit 都触发了,但是迭代器并没有终止,什么时候终止呢?也就是当内部出现一些错误或我们手动调用可迭代对象的 return() 或 throw() 方法时迭代器才会终止。

events.on() 开启一个 Node.js 服务器

之前一篇文章《“Hello Node.js” 这一次是你没见过的写法》写过一段使用 events.on() 开启一个 HTTP 服务器的代码,在留言中当时有小伙伴对此提出疑惑,基于本章对异步迭代器在 events.on() 中使用的学习,可以很好的解释。

相关代码如下所示:

  1. import { createServer as server } from 'http'
  2. import { on } from 'events'
  3. const ee = on(server().listen(3000), 'request'); 
  4. for await (const [{ url }, res] of ee) 
  5.   if (url === '/hello'
  6.     res.end('Hello Node.js!'); 
  7.   else 
  8.     res.end('OK!'); 

以上代码看似新颖,其核心实现就是使用 events.on() 返回 createServer() 对象 request 事件的异步可迭代对象,之后用 for await...of 语句遍历,客户端每一次请求,就相当于做了一次 ee.emit('request', Req, Res)。

由于内部块的执行是同步的,下一次事件处理需要依赖上次事件完成才可以执行,对于一个 HTTP 服务器需要考虑并发的,请不要使用上面这种方式!

解析 Node.js 源码对 events.on 异步迭代器的实现

events 模块直接导出了 on() 方法,这个 on() 方法主要是将异步迭代器与事件的 EventEmitter 类的实例对象做了结合,实现还是很巧妙的,以下对核心源码做下解释,理解之后你完全也可以自己实现一个 events.on()。

下面继续看 unconsumedPromises 从何而来。

  1. module.exports = EventEmitter; 
  2. module.exports.on = on
  3.  
  4. function on(emitter, event) { 
  5.   const unconsumedEvents = []; 
  6.   const unconsumedPromises = []; 
  7.   const iterator = ObjectSetPrototypeOf({ // {1} 
  8.     next() { .... }, 
  9.     return() { ... }, 
  10.     throw(err) { ... }, 
  11.     [SymbolAsyncIterator]() { // {2} 
  12.       return this; 
  13.     } 
  14.   }, AsyncIteratorPrototype); // {3} 
  15.   eventTargetAgnosticAddListener(emitter, event, eventHandler); // {4} 
  16.   if (event !== 'error') { 
  17.     addErrorHandlerIfEventEmitter(emitter, errorHandler); // {5} 
  18.   } 
  19.   return iterator; 
  20.                
  21.   function eventHandler(...args) { // {6} 
  22.     const promise =  .shift(); 
  23.     if (promise) { 
  24.       // 以下等价于 promise.resolve({ value: args, done: false }); 
  25.       PromiseResolve(createIterResult(args, false)); 
  26.     } else { 
  27.       // for await...of 遍历器内部块的执行是同步的,所以每次只能处理 1 个事件,如果同时触发多个事件,上次事件未完成剩下的事件会被保存至 unconsumedEvents 中,待上次事件完成后,遍历器会自动调用 iterator 对象的 next() 方法,消费所有未处理的事件。 
  28.       unconsumedEvents.push(args); 
  29.     } 
  30.   } 
  31.  
  32. function eventTargetAgnosticAddListener(emitter, name, listener, flags) { 
  33.   ... 
  34.   emitter.on(name, listener); 

以下是 iterator 对象的 next() 方法实现:

  1. const iterator = ObjectSetPrototypeOf({ 
  2.   next() { 
  3.     // {1} 首先,我们会消费所有未读消息 
  4.     const value = unconsumedEvents.shift(); 
  5.     if (value) { 
  6.       return PromiseResolve(createIterResult(value, false)); 
  7.     } 
  8.  
  9.     // {2} 如果发生一次 error 就会执行 Promise.reject 抛出一个错误,在这个错误发生后也会停止事件监听。 
  10.     if (error) { 
  11.       const p = PromiseReject(error); 
  12.       // Only the first element errors 
  13.       error = null
  14.       return p; 
  15.     } 
  16.  
  17.     // {3} 如果迭代器对象完成,Promise.resolve done 设置为 true 
  18.     if (finished) { 
  19.       return PromiseResolve(createIterResult(undefined, true)); 
  20.     } 
  21.  
  22.     // {4} 等待直到一个事件发生 
  23.     return new Promise(function(resolve, reject) { 
  24.       unconsumedPromises.push({ resolve, reject }); 
  25.     }); 
  26.   } 
  27.   ... 

在 Stream 中使用 asyncIterator

Node.js Stream 模块的可读流对象在 v10.0.0 版本试验性的支持了 [Symbol.asyncIterator] 属性,可以使用 for await...of 语句遍历可读流对象,在 v11.14.0 版本以上已 LTS 支持。

异步迭代器 与 Readable

借助 fs 模块创建一个可读流对象 readable。

  1. const fs = require('fs'); 
  2. const readable = fs.createReadStream('./hello.txt', { 
  3.   encoding: 'utf-8'
  4.   highWaterMark: 1 
  5. }); 

以往当我们读取一个文件时,需要监听 data 事件,拼接数据,在 end 事件里判断完成,如下所示:

  1. function readText(readable) { 
  2.   let data = ''
  3.   return new Promise((resolve, reject) => { 
  4.     readable.on('data', chunk => { 
  5.       data += chunk; 
  6.     }) 
  7.     readable.on('end', () => { 
  8.       resolve(data); 
  9.     }); 
  10.     readable.on('error', err => { 
  11.       reject(err); 
  12.     }); 
  13.   }) 

现在通过异步迭代器能以一种更简单的方式实现,如下所示:

  1. async function readText(readable) { 
  2.   let data = ''
  3.   for await (const chunk of readable) { 
  4.     data += chunk; 
  5.   } 
  6.   return data; 

现在我们可以调用 readText 做测试。

  1. (async () => { 
  2.   try { 
  3.     const res = await readText(readable); 
  4.     console.log(res); // Hello Node.js 
  5.   } catch (err) { 
  6.     console.log(err.message); 
  7.   } 
  8. })(); 

使用 for await...of 语句遍历 readable,如果循环中因为 break 或 throw 一个错误而终止,则这个 Stream 也将被销毁。

上述示例中 chunk 每次接收的值是根据创建可读流时 highWaterMark 这个属性决定的,为了能清晰的看到效果,在创建 readable 对象时我们指定了 highWaterMark 属性为 1 每次只会读取一个字符。

从 Node.js 源码看 readable 是如何实现的 asyncIterator

与同步的迭代器遍历语句 for...of 类似,用于 asyncIterator 异步迭代器遍历的 for await...of 语句在循环内部会默认调用可迭代对象 readable 的 Symbol.asyncIterator() 方法得到一个异步迭代器对象,之后调用迭代器对象的 next() 方法获取结果。

本文以 Node.js 源码 v14.x 为例来看看源码是如何实现的。当我们调用 fs.createReadStream() 创建一个可读流对象时,对应的该方法内部会调用 ReadStream 构造函数

  1. // https://github.com/nodejs/node/blob/v14.x/lib/fs.js#L2001 
  2. function createReadStream(path, options) { 
  3.   lazyLoadStreams(); 
  4.   return new ReadStream(path, options); 

其实在 ReadStream 这个构造函数里没有我们要找的,重点是它通过原型的方式继承了 Stream 模块的 Readable 构造函数。

  1. function ReadStream(path, options) { 
  2.   ... 
  3.   Readable.call(this, options); 

那么现在我们重点来看看 Readable 这个构造函数的实现。

Readable 原型上定义了 SymbolAsyncIterator 属性,该方法返回了一个由生成器函数创建的迭代器对象。

  1. // for await...of 循环会调用 
  2. Readable.prototype[SymbolAsyncIterator] = function() { 
  3.   let stream = this; 
  4.   ... 
  5.   const iter = createAsyncIterator(stream); 
  6.   iter.stream = stream; 
  7.   return iter; 
  8. }; 
  9.  
  10. // 声明一个创建异步迭代器对象的生成器函数 
  11. async function* createAsyncIterator(stream) { 
  12.   let callback = nop; 
  13.  
  14.   function next(resolve) { 
  15.     if (this === stream) { 
  16.       callback(); 
  17.       callback = nop; 
  18.     } else { 
  19.       callback = resolve; 
  20.     } 
  21.   } 
  22.  
  23.   const state = stream._readableState; 
  24.  
  25.   let error = state.errored; 
  26.   let errorEmitted = state.errorEmitted; 
  27.   let endEmitted = state.endEmitted; 
  28.   let closeEmitted = state.closeEmitted; 
  29.   
  30.   // error、endclose 事件控制了什么时候结束迭代器遍历。 
  31.   stream 
  32.     .on('readable'next
  33.     .on('error'function(err) { 
  34.       error = err; 
  35.       errorEmitted = true
  36.       next.call(this); 
  37.     }) 
  38.     .on('end'function() { 
  39.       endEmitted = true
  40.       next.call(this); 
  41.     }) 
  42.     .on('close'function() { 
  43.       closeEmitted = true
  44.       next.call(this); 
  45.     }); 
  46.  
  47.   try { 
  48.     while (true) { 
  49.       // stream.read() 从内部缓冲拉取并返回数据。如果没有可读的数据,则返回 null 
  50.       // readable 的 destroy() 方法被调用后 readable.destroyed 为 true,readable 即为下面的 stream 对象 
  51.       const chunk = stream.destroyed ? null : stream.read(); 
  52.       if (chunk !== null) { 
  53.         yield chunk; // 这里是关键,根据迭代器协议定义,迭代器对象要返回一个 next() 方法,使用 yield 返回了每一次的值 
  54.       } else if (errorEmitted) { 
  55.         throw error; 
  56.       } else if (endEmitted) { 
  57.         break; 
  58.       } else if (closeEmitted) { 
  59.         break; 
  60.       } else { 
  61.         await new Promise(next); 
  62.       } 
  63.     } 
  64.   } catch (err) { 
  65.     destroyImpl.destroyer(stream, err); 
  66.     throw err; 
  67.   } finally { 
  68.     if (state.autoDestroy || !endEmitted) { 
  69.       // TODO(ronag): ERR_PREMATURE_CLOSE? 
  70.       destroyImpl.destroyer(stream, null); 
  71.     } 
  72.   } 

通过上面源码可以看到可读流的异步迭代器实现使用了生成器函数 Generator yield,那么对于 readable 对象遍历除了 for await...of 遍历之外,其实也是可以直接使用调用生成器函数的 next() 方法也是可以的。

  1. const ret = readable[Symbol.asyncIterator]() 
  2. console.log(await ret.next()); // { value: 'H', done: false } 
  3. console.log(await ret.next()); // { value: 'e', done: false } 

异步迭代器与 Writeable

通过上面讲解,我们知道了如何遍历异步迭代器从 readable 对象获取数据,但是你有没有想过如何将一个异步迭代器对象传送给可写流?正是此处要讲的。

从迭代器中创建可读流

Node.js 流对象提供了一个实用方法 stream.Readable.from(),对于符合 Symbol.asyncIterator 或 Symbol.iterator 协议的可迭代对象(Iterable)会先创建一个可读流对象 readable 之后从迭代器中构建 Node.js 可读流。

以下是 从理解到实现轻松掌握 ES6 中的迭代器 一文中曾讲解过的例子,r1 就是我们创建的可迭代对象。使用 stream.Readable.from() 方法则可以将可迭代对象构造为一个可读流对象 readable。

  1. function Range(start, end) { 
  2.   this.id = start; 
  3.   this.end = end
  4. Range.prototype[Symbol.asyncIterator] = async function* () { 
  5.   while (this.id <= this.end) { 
  6.     yield this.id++; 
  7.   } 
  8. const r1 = new Range(0, 3); 
  9. const readable = stream.Readable.from(r1); 
  10. readable.on('data', chunk => { 
  11.   console.log(chunk); // 0 1 2 3 
  12. }); 

传送异步迭代器到可写流

使用 pipeline 可以将一系列的流和生成器函数通过管道一起传送,并在管道完成时获取通知。

使用 util.promisify 将 pipeline 转化为 promise 形式。

  1. const util = require('util'); 
  2. const pipeline = util.promisify(stream.pipeline); // 转为 promise 形式 
  3.  
  4. (async () => { 
  5.   try { 
  6.     const readable = stream.Readable.from(r1); 
  7.     const writeable = fs.createWriteStream('range.txt'); 
  8.     await pipeline( 
  9.       readable, 
  10.       async function* (source) { 
  11.         for await (const chunk of source) { 
  12.           yield chunk.toString(); 
  13.         } 
  14.       }, 
  15.       writeable 
  16.     ); 
  17.     console.log('Pipeline 成功'); 
  18.   } catch (err) { 
  19.     console.log(err.message); 
  20.   } 
  21. })() 

在写入数据时,传入的 chunk 需是 String、Buffer、Uint8Array 类型,否则 writeable 对象在写入数据时会报错。由于我们自定义的可迭代对象 r1 里最终返回的值类型为 Number 在这里需要做次转换,管道中间的生成器函数就是将每次接收到的值转为字符串。

在 MongoDB 中使用 asyncIterator

除了上面我们讲解的 Node.js 官方提供的几个模块之外,在 MongoDB 中也是支持异步迭代的,不过介绍这点的点资料很少,MongoDB 是通过一个游标的概念来实现的。

MongoDB 中的 cursor

本处以 Node.js 驱动 mongodb 模块来介绍,当我们调用 db.collection.find() 这个方法返回的是一个 cursor(游标),如果想要访问文档那么我们需要迭代这个游标对象来完成,但是通常我们会直接使用 toArray() 这个方法来完成。

下面让我们通过一段示例来看,现在我们有一个数据库 example,一个集合 books,表里面有两条记录,如下所示:

image.png

查询 books 集合的所有数据,以下代码中定义的 myCursor 变量就是游标对象,它不会自动进行迭代,可以使用游标对象的 hasNext() 方法检测是否还有下一个,如果有则可以使用 next() 方法访问数据。

通过以下日志记录可以看到在第三次调用 hasNext() 时返回了 false,如果此时在调用 next() 就会报错,游标已关闭,也就是已经没有数据可遍历了。

  1. const MongoClient = require('mongodb').MongoClient; 
  2. const dbConnectionUrl = 'mongodb://127.0.0.1:27017/example'
  3.  
  4. (async () => { 
  5.   const client = await MongoClient.connect(dbConnectionUrl, { useUnifiedTopology: true }); 
  6.   const bookColl = client.db('example').collection('books'); 
  7.   const myCursor = await bookColl.find(); 
  8.   
  9.   console.log(await myCursor.hasNext()); // true 
  10.   console.log((await myCursor.next()).name); // 深入浅出Node.js 
  11.   console.log(await myCursor.hasNext()); // true 
  12.   console.log((await myCursor.next()).name); // Node.js实战 
  13.   console.log(await myCursor.hasNext()); // false 
  14.   console.log((await myCursor.next()).name); // MongoError: Cursor is closed 
  15. })() 

直接调用 next() 也可检测,如果还有值则返回该条记录,否则 next() 方法返回 null。

  1. console.log((await myCursor.next()).name); 
  2. console.log((await myCursor.next()).name); 
  3. console.log((await myCursor.next())); 

MongoDB 异步迭代器实现源码分析

MongoDB 中游标是以 hasNext() 返回 false 或 next() 返回为 null 来判断是否达到游标尾部,与之不同的是在我们的 JavaScript 可迭代协议定义中是要有一个 Symbol.asyncIterator 属性的迭代器对象,且迭代器对象是 { done, value } 的形式。

幸运的是 MongoDB Node.js 驱动已经帮助我们实现了这一功能,通过一段源码来看在 MongoDB 中的实现。

find 方法返回的是一个可迭代游标对象。

  1. // https://github.com/mongodb/node-mongodb-native/blob/3.6/lib/collection.js#L470 
  2.  
  3. Collection.prototype.find = deprecateOptions( 
  4.   { 
  5.     name'collection.find'
  6.     deprecatedOptions: DEPRECATED_FIND_OPTIONS, 
  7.     optionsIndex: 1 
  8.   }, 
  9.   function(query, options, callback) { 
  10.     const cursor = this.s.topology.cursor
  11.       new FindOperation(this, this.s.namespace, findCommand, newOptions), 
  12.       newOptions 
  13.     ); 
  14.  
  15.     return cursor
  16.   } 
  17. ); 

核心实现就在这里,这是一个游标的核心类,MongoDB Node.js 驱动程序中所有游标都是基于此,如果当前支持异步迭代器,则在 CoreCursor 的原型上设置 Symbol.asyncIterator 属性,返回基于 Promise 实现的异步迭代器对象,这符合 JavaScript 中关于异步可迭代对象的标准定义。

  1. // https://github.com/mongodb/node-mongodb-native/blob/3.6/lib/core/cursor.js#L610 
  2.  
  3. if (SUPPORTS.ASYNC_ITERATOR) { 
  4.   CoreCursor.prototype[Symbol.asyncIterator] = require('../async/async_iterator').asyncIterator; 
  1. // https://github.com/mongodb/node-mongodb-native/blob/3.6/lib/async/async_iterator.js#L16 
  2.  
  3. // async function* asyncIterator() { 
  4. //   while (true) { 
  5. //     const value = await this.next(); 
  6. //     if (!value) { 
  7. //       await this.close(); 
  8. //       return
  9. //     } 
  10.  
  11. //     yield value; 
  12. //   } 
  13. // } 
  14.  
  15. // TODO: change this to the async generator function above 
  16. function asyncIterator() { 
  17.   const cursor = this; 
  18.  
  19.   return { 
  20.     nextfunction() { 
  21.       return Promise.resolve() 
  22.         .then(() => cursor.next()) 
  23.         .then(value => { 
  24.           if (!value) { 
  25.             return cursor.close().then(() => ({ value, done: true })); 
  26.           } 
  27.           return { value, done: false }; 
  28.         }); 
  29.     } 
  30.   }; 

目前是默认使用的 Promise 的形式实现的,上面代码中有段 TODO, Node.js 驱动关于异步迭代实现这块可能后期会改为基于生成器函数的实现,这对我们使用是没变化的.

使用 for await...of 遍历可迭代对象 cursor

还是基于我们上面的示例,如果换成 for await...of 语句遍历就简单的多了。

  1. const myCursor = await bookColl.find(); 
  2. for await (val of myCursor) { 
  3.   console.log(val.name); 

在 MongoDB 中的聚合管道中使用也是如此,就不再做过多分析了,如下所示:

  1. const myCursor = await bookColl.aggregate(); 
  2. for await (val of myCursor) { 
  3.   console.log(val.name); 

对于遍历庞大的数据集时,使用游标它会批量加载 MongoDB 中的数据,我们也不必担心一次将所有的数据存在于服务器的内存中,造成内存压力过大。

传送 cursor 到可写流

MongoDB 游标对象本身也是一个可迭代对象(Iterable),结合流模块的 Readable.from() 则可转化为可读流对象,是可以通过流的方式进行写入文件。

但是要注意 MongoDB 中的游标每次返回的是单条文档记录,是一个 Object 类型的,如果直接写入,可写流是会报参数类型错误的,因为可写流默认是一个非对象模式(仅接受 String、Buffer、Unit8Array),所以才会看到在 pipeline 传输的中间又使用了生成器函数,将每次接收的数据块处理为可写流 Buffer 类型。

  1. const myCursor = await bookColl.find(); 
  2. const readable = stream.Readable.from(myCursor); 
  3. await pipeline( 
  4.   readable, 
  5.   async function* (source) { 
  6.     for await (const chunk of source) { 
  7.       yield Buffer.from(JSON.stringify(chunk)); 
  8.     } 
  9.   }, 
  10.   fs.createWriteStream('books.txt'
  11. ); 

Reference

本文转载自微信公众号「Nodejs技术栈」,可以通过以下二维码关注。转载本文请联系Nodejs技术栈公众号。

 

来源:Nodejs技术栈内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯