文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Nodejs v14源码分析之Event模块

2024-12-03 11:20

关注

本文转载自微信公众号「编程杂技」,作者theanarkh。转载本文请联系编程杂技公众号。     

events模块是Node.js中比较简单但是却非常核心的模块,Node.js中,很多模块都继承于events模块,events模块是发布、订阅模式的实现。我们首先看一下如何使用events模块。

  1. const { EventEmitter } = require('events');   
  2. class Events extends EventEmitter {}   
  3. const events = new Events();   
  4. events.on('demo', () => {   
  5.     console.log('emit demo event');   
  6. });   
  7. events.emit('demo'); 

接下来我们看一下events模块的具体实现。

1 初始化 当new一个EventEmitter或者它的子类时,就会进入EventEmitter的逻辑。

  1. function EventEmitter(opts) {   
  2.   EventEmitter.init.call(this, opts);   
  3. }   
  4.    
  5. EventEmitter.init = function(opts) {   
  6.   // 如果是未初始化或者没有自定义_events,则初始化   
  7.   if (this._events === undefined ||   
  8.       this._events === ObjectGetPrototypeOf(this)._events) {   
  9.      this._events = ObjectCreate(null);   
  10.      this._eventsCount = 0;   
  11.    }   
  12.       
  13.    this._maxListeners = this._maxListeners || undefined;   
  14.     
  15.    // 是否开启捕获promise reject,默认false   
  16.    if (opts && opts.captureRejections) {   
  17.      this[kCapture] = Boolean(opts.captureRejections);   
  18.    } else {   
  19.      this[kCapture] = EventEmitter.prototype[kCapture];   
  20.    }   
  21.  }; 

EventEmitter的初始化主要是初始化了一些数据结构和属性。唯一支持的一个参数就是captureRejections,captureRejections表示当触发事件,执行处理函数时,EventEmitter是否捕获处理函数中的异常。后面我们会详细讲解。

2 订阅事件 初始化完EventEmitter之后,我们就可以开始使用订阅、发布的功能。我们可以通过addListener、prependListener、on、once订阅事件。addListener和on是等价的,prependListener的区别在于处理函数会被插入到队首,而默认是追加到队尾。once注册的处理函数,最多被执行一次。四个api都是通过_addListener函数实现的。下面我们看一下具体实现。

  1. function _addListener(target, type, listener, prepend) {   
  2.   let m;   
  3.   let events;   
  4.   let existing;   
  5.   events = target._events;   
  6.   // 还没有初始化_events则初始化,_eventsCount为事件类型个数   
  7.   if (events === undefined) {   
  8.     events = target._events = ObjectCreate(null);   
  9.     target._eventsCount = 0;   
  10.    } else {   
  11.         
  12.      if (events.newListener !== undefined) {   
  13.        target.emit('newListener',  
  14.                      type,   
  15.                    listener.listener ?  
  16.                      listener.listener :  
  17.                      listener);   
  18.        // newListener处理函数可能会修改_events,这里重新赋值   
  19.        events = target._events;   
  20.      }   
  21.      // 判断是否已经存在处理函数   
  22.      existing = events[type];   
  23.    }   
  24.    // 不存在则以函数的形式存储,否则以数组形式存储   
  25.    if (existing === undefined) {   
  26.      events[type] = listener;   
  27.      // 新增一个事件类型,事件类型个数加一 
  28.      ++target._eventsCount;   
  29.    } else {   
  30.       
  31.      if (typeof existing === 'function') {   
  32.        existing = events[type] =   
  33.          prepend ? [listener, existing] : [existing, listener];   
  34.      } else if (prepend) {   
  35.        existing.unshift(listener);   
  36.      } else {   
  37.        existing.push(listener);   
  38.      }   
  39.     
  40.      // 处理告警,处理函数过多可能是因为之前的没有删除,造成内存泄漏   
  41.      m = _getMaxListeners(target);   
  42.      // 该事件处理函数达到阈值并且还没有提示过警告信息则提示 
  43.      if (m > 0 && existing.length > m && !existing.warned) {   
  44.        existing.warned = true;   
  45.        const w = new Error('错误信息…');   
  46.        w.name = 'MaxListenersExceededWarning';   
  47.        w.emitter = target;   
  48.        w.type = type;   
  49.        w.count = existing.length;   
  50.        process.emitWarning(w);   
  51.      }   
  52.    }   
  53.     
  54.    return target;   
  55.  } 

接下来我们看一下once的实现,对比其它几种api,once的实现相对比较复杂,因为我们要控制处理函数最多执行一次,所以我们需要保证在事件触发的时候,执行用户定义函数的同时,还需要删除注册的事件。

  1. ventEmitter.prototype.once = function once(type, listener) {   
  2.  this.on(type, _onceWrap(this, type, listener));   
  3.  return this;   
  4. ;   
  5.   
  6. unction onceWrapper() {   
  7.  // 还没有触发过   
  8.  if (!this.fired) {   
  9.    // 删除它   
  10.     this.target.removeListener(this.type, this.wrapFn);   
  11.     // 触发了   
  12.     this.fired = true;   
  13.     // 执行   
  14.     if (arguments.length === 0)   
  15.       return this.listener.call(this.target);   
  16.     return this.listener.apply(this.target, arguments);   
  17.   }   
  18. }   
  19. // 支持once api   
  20. function _onceWrap(target, type, listener) {   
  21.   // fired是否已执行处理函数,wrapFn包裹listener的函数   
  22.   const state = { fired: false, wrapFn: undefined, target, type, listener };   
  23.   // 生成一个包裹listener的函数   
  24.   const wrapped = onceWrapper.bind(state);   
  25.    
  26.   wrapped.listener = listener;   
  27.   // 保存包裹函数,用于执行完后删除,见onceWrapper   
  28.   state.wrapFn = wrapped;   
  29.   return wrapped;   

Once函数构造一个上下文(state)保存用户处理函数和执行状态等信息,然后通过bind返回一个带有该上下文(state)的函数wrapped注册到事件系统。当事件触发时,在wrapped函数中首先移除wrapped,然后执行用户的函数。Wrapped起到了劫持的作用。另外还需要在wrapped上保存用户传进来的函数,当用户在事件触发前删除该事件时或解除该函数时,在遍历该类事件的处理函数过程中,可以通过wrapped.listener找到对应的项进行删除。

3 触发事件 分析完事件的订阅,接着我们看一下事件的触发。

  1. EventEmitter.prototype.emit = function emit(type, ...args) {   
  2.   // 触发的事件是否是error,error事件需要特殊处理   
  3.   let doError = (type === 'error');   
  4.    
  5.   const events = this._events;   
  6.   // 定义了处理函数(不一定是type事件的处理函数)   
  7.   if (events !== undefined) {   
  8.         
  9.      if (doError && events[kErrorMonitor] !== undefined)   
  10.        this.emit(kErrorMonitor, ...args);   
  11.      // 触发的是error事件但是没有定义处理函数   
  12.      doError = (doError && events.error === undefined);   
  13.    } else if (!doError)  
  14.      // 没有定义处理函数并且触发的不是error事件则不需要处理,   
  15.      return false;   
  16.     
  17.    // If there is no 'error' event listener then throw.   
  18.    // 触发的是error事件,但是没有定义处理error事件的函数,则报错   
  19.    if (doError) {   
  20.      let er;   
  21.      if (args.length > 0)   
  22.        er = args[0];   
  23.      // 第一个入参是Error的实例   
  24.      if (er instanceof Error) {   
  25.        try {   
  26.          const capture = {};   
  27.             
  28.          Error.captureStackTrace(capture, EventEmitter.prototype.emit);   
  29.          ObjectDefineProperty(er, kEnhanceStackBeforeInspector, {   
  30.            value: enhanceStackTrace.bind(this, er, capture),   
  31.            configurable: true   
  32.          });   
  33.        } catch {}   
  34.        throw er; // Unhandled 'error' event   
  35.      }   
  36.     
  37.      let stringifiedEr;   
  38.      const { inspect } = require('internal/util/inspect');   
  39.      try {   
  40.        stringifiedEr = inspect(er);   
  41.      } catch {   
  42.        stringifiedEr = er;   
  43.      }   
  44.      const err = new ERR_UNHANDLED_ERROR(stringifiedEr);   
  45.      err.context = er;   
  46.      throw err; // Unhandled 'error' event   
  47.    }   
  48.    // 获取type事件对应的处理函数   
  49.    const handler = events[type];   
  50.    // 没有则不处理   
  51.    if (handler === undefined)   
  52.      return false;   
  53.    // 等于函数说明只有一个   
  54.    if (typeof handler === 'function') {   
  55.      // 直接执行   
  56.      const result = ReflectApply(handler, this, args);   
  57.      // 非空判断是不是promise并且是否需要处理,见addCatch   
  58.      if (result !== undefined && result !== null) {   
  59.        addCatch(this, result, type, args);   
  60.      }   
  61.    } else {   
  62.      // 多个处理函数,同上   
  63.      const len = handler.length;   
  64.      const listeners = arrayClone(handler, len);   
  65.      for (let i = 0; i < len; ++i) {   
  66.        const result = ReflectApply(listeners[i], this, args);   
  67.        if (result !== undefined && result !== null) {   
  68.          addCatch(this, result, type, args);   
  69.        }   
  70.      }   
  71.    }   
  72.     
  73.    return true;   
  74.  } 

我们看到在Node.js中,对于error事件是特殊处理的,如果用户没有注册error事件的处理函数,可能会导致程序挂掉,另外我们看到有一个addCatch的逻辑,addCatch是为了支持事件处理函数为异步模式的情况,比如async函数或者返回Promise的函数。

  1. function addCatch(that, promise, type, args) {   
  2.   // 没有开启捕获则不需要处理   
  3.   if (!that[kCapture]) {   
  4.     return;   
  5.   }   
  6.   // that throws on second use.   
  7.   try {   
  8.     const then = promise.then;   
  9.    
  10.      if (typeof then === 'function') {   
  11.        // 注册reject的处理函数   
  12.        then.call(promise, undefined, function(err) {   
  13.          process.nextTick(emitUnhandledRejectionOrErr, that, err, type, args);   
  14.        });   
  15.      }   
  16.    } catch (err) {   
  17.      that.emit('error', err);   
  18.    }   
  19.  }   
  20.     
  21.  function emitUnhandledRejectionOrErr(ee, err, type, args) {   
  22.    // 用户实现了kRejection则执行   
  23.    if (typeof ee[kRejection] === 'function') {   
  24.      ee[kRejection](err, type, ...args);   
  25.    } else {   
  26.      // 保存当前值   
  27.      const prev = ee[kCapture];   
  28.      try {   
  29.           
  30.        ee[kCapture] = false;   
  31.        ee.emit('error', err);   
  32.      } finally {   
  33.        ee[kCapture] = prev;   
  34.      }   
  35.    }   
  36.  } 

4 取消订阅 我们接着看一下删除事件处理函数的逻辑。

  1. function removeAllListeners(type) {   
  2.       const events = this._events;   
  3.       if (events === undefined)   
  4.         return this;   
  5.    
  6.        
  7.        if (events.removeListener === undefined) {   
  8.          // 等于0说明是删除全部   
  9.          if (arguments.length === 0) {   
  10.            this._events = ObjectCreate(null);   
  11.            this._eventsCount = 0;   
  12.          } else if (events[type] !== undefined) {  
  13.              
  14.            if (--this._eventsCount === 0)   
  15.              this._events = ObjectCreate(null);   
  16.            else   
  17.              delete events[type];   
  18.          }   
  19.          return this;   
  20.        }   
  21.     
  22.         
  23.        if (arguments.length === 0) {   
  24.             
  25.          for (const key of ObjectKeys(events)) {   
  26.            if (key === 'removeListener'continue;   
  27.            this.removeAllListeners(key);   
  28.          }   
  29.          // 这里删除removeListener事件,见下面的逻辑   
  30.          this.removeAllListeners('removeListener');   
  31.          // 重置数据结构   
  32.          this._events = ObjectCreate(null);   
  33.          this._eventsCount = 0;   
  34.          return this;   
  35.        }   
  36.        // 删除某类型事件   
  37.        const listeners = events[type];   
  38.     
  39.        if (typeof listeners === 'function') {   
  40.          this.removeListener(type, listeners);   
  41.        } else if (listeners !== undefined) {   
  42.          // LIFO order   
  43.          for (let i = listeners.length - 1; i >= 0; i--) {   
  44.            this.removeListener(type, listeners[i]);   
  45.          }   
  46.        }   
  47.     
  48.        return this;   
  49.      } 

removeAllListeners函数主要的逻辑有两点,第一个是removeListener事件需要特殊处理,这类似一个钩子,每次用户删除事件处理函数的时候都会触发该事件。第二是removeListener函数。removeListener是真正删除事件处理函数的实现。removeAllListeners是封装了removeListener的逻辑。

  1. function removeListener(type, listener) {   
  2.    let originalListener;   
  3.    const events = this._events;   
  4.    // 没有东西可删除   
  5.    if (events === undefined)   
  6.      return this;   
  7.    
  8.    const list = events[type];   
  9.    // 同上   
  10.     if (list === undefined)   
  11.       return this;   
  12.     // list是函数说明只有一个处理函数,否则是数组,如果list.listener === listener说明是once注册的   
  13.     if (list === listener || list.listener === listener) {   
  14.       // type类型的处理函数就一个,并且也没有注册其它类型的事件,则初始化_events   
  15.       if (--this._eventsCount === 0)   
  16.         this._events = ObjectCreate(null);   
  17.       else {   
  18.         // 就一个执行完删除type对应的属性   
  19.         delete events[type];   
  20.         // 注册了removeListener事件,则先注册removeListener事件   
  21.         if (events.removeListener)   
  22.           this.emit('removeListener'
  23.                       type, 
  24.                       list.listener || listener);   
  25.       }   
  26.     } else if (typeof list !== 'function') {   
  27.       // 多个处理函数   
  28.       let position = -1;   
  29.       // 找出需要删除的函数   
  30.       for (let i = list.length - 1; i >= 0; i--) {   
  31.         if (list[i] === listener ||  
  32.              list[i].listener === listener) {   
  33.           // 保存原处理函数,如果有的话   
  34.           originalListener = list[i].listener;   
  35.           position = i;   
  36.           break;   
  37.         }   
  38.       }   
  39.     
  40.       if (position < 0)   
  41.         return this;   
  42.       // 第一个则出队,否则删除一个   
  43.       if (position === 0)   
  44.         list.shift();   
  45.       else {   
  46.         if (spliceOne === undefined)   
  47.           spliceOne = require('internal/util').spliceOne;   
  48.         spliceOne(list, position);   
  49.       }   
  50.       // 如果只剩下一个,则值改成函数类型   
  51.       if (list.length === 1)   
  52.         events[type] = list[0];   
  53.       // 触发removeListener   
  54.       if (events.removeListener !== undefined)   
  55.         this.emit('removeListener',  
  56.                     type, 
  57.                     originalListener || listener);   
  58.     }   
  59.     
  60.     return this;   
  61.   }; 

 

以上就是events模块的核心逻辑,另外还有一些工具函数就不一一分析。

 

来源:编程杂技内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯