文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Node.js子线程调试和诊断指南

2024-12-14 01:30

关注

调试、诊断子线程最直接的方式就是像调试、诊断主线程一样,但是无论是动态开启还是静态开启,子线程都不可避免地需要内置一些相关的非业务代码,本文介绍另外一种对子线程代码无侵入的调试方式,另外也介绍一下通过子线程调试主线程的方式。

1.初始化子线程的Inspector

在Node.js启动子线程的时候,会初始化Inspector。

  1. env_->InitializeInspector(std::move(inspector_parent_handle_)); 

在分析InitializeInspector之前,我们先看一下inspector_parent_handle_。

  1. std::unique_ptr inspector_parent_handle_; 

inspector_parent_handle_是一个ParentInspectorHandle对象,这个对象是子线程和主线程通信的桥梁。我们看一下他的初始化逻辑(在主线程里执行)。

  1. inspector_parent_handle_ = env->inspector_agent()->GetParentHandle(thread_id_, url); 

调用agent的GetParentHandle获取一个ParentInspectorHandle对象。

  1. std::unique_ptr Agent::GetParentHandle(int thread_id, const std::string& url) { 
  2.  return client_->getWorkerManager()->NewParentHandle(thread_id, url); 

内部其实是通过client_->getWorkerManager()对象的NewParentHandle方法获取ParentInspectorHandle对象,接下来我们看一下WorkerManager的NewParentHandle。

  1. std::unique_ptr WorkerManager::NewParentHandle(int thread_id, const std::string& url) { 
  2.   bool wait = !delegates_waiting_on_start_.empty(); 
  3.   return std::make_unique(thread_id, url, thread_, wait); 
  4.  
  5. ParentInspectorHandle::ParentInspectorHandle( 
  6.     int id, const std::string& url, 
  7.     std::shared_ptr parent_thread,  
  8.     bool wait_for_connect 
  9.     : id_(id),  
  10.       url_(url),  
  11.       parent_thread_(parent_thread), 
  12.       wait_(wait_for_connect) {} 

最终的架构图如下入所示。

分析完ParentInspectorHandle后继续看一下env_->InitializeInspector(std::move(inspector_parent_handle_))的逻辑(在子线程里执行)。

  1. int Environment::InitializeInspector( 
  2.     std::unique_ptr parent_handle) { 
  3.  
  4.   std::string inspector_path; 
  5.   inspector_path = parent_handle->url(); 
  6.   inspector_agent_->SetParentHandle(std::move(parent_handle)); 
  7.   inspector_agent_->Start(inspector_path, 
  8.                           options_->debug_options(), 
  9.                           inspector_host_port(), 
  10.                           is_main_thread()); 

首先把ParentInspectorHandle对象保存到agent中,然后调用agent的Start方法。

  1. bool Agent::Start(...) { 
  2.     // 新建client对象 
  3.    client_ = std::make_shared(parent_env_, is_main); 
  4.    // 调用agent中保存的ParentInspectorHandle对象的WorkerStarted 
  5.    parent_handle_->WorkerStarted(client_->getThreadHandle(), ...); 

Agent::Start创建了一个client对象,然后调用ParentInspectorHandle对象的WorkerStarted方法(刚才SetParentHandle的时候保存的),我们看一下这时候的架构图。

接着看parent_handle_->WorkerStarted。

  1. void ParentInspectorHandle::WorkerStarted( 
  2.     std::shared_ptr worker_thread, bool waiting) { 
  3.   std::unique_ptr request( 
  4.       new WorkerStartedRequest(id_, url_, worker_thread, waiting)); 
  5.   parent_thread_->Post(std::move(request)); 

WorkerStarted创建了一个WorkerStartedRequest请求,然后通过parent_thread_->Post提交,parent_thread_是MainThreadInterface对象。

  1. void MainThreadInterface::Post(std::unique_ptr request) { 
  2.   Mutex::ScopedLock scoped_lock(requests_lock_); 
  3.   // 之前是空则需要唤醒消费者 
  4.   bool needs_notify = requests_.empty(); 
  5.   // 消息入队 
  6.   requests_.push_back(std::move(request)); 
  7.   if (needs_notify) { 
  8.        // 获取当前对象的一个弱引用 
  9.        std::weak_ptr* interface_ptr = new std::weak_ptr(shared_from_this()); 
  10.       // 请求V8执行RequestInterrupt入参对应的回调 
  11.       isolate_->RequestInterrupt([](v8::Isolate* isolate, void* opaque) { 
  12.         // 把执行时传入的参数转成MainThreadInterface 
  13.         std::unique_ptr> interface_ptr { 
  14.           static_cast*>(opaque)  
  15.         }; 
  16.         // 判断对象是否还有效,是则调用DispatchMessages 
  17.         if (auto iface = interface_ptr->lock()) iface->DispatchMessages(); 
  18.  
  19.       }, static_cast(interface_ptr)); 
  20.   } 
  21.   // 唤醒消费者 
  22.   incoming_message_cond_.Broadcast(scoped_lock); 

我们看看这时候的架构图。

接着看回调里执行MainThreadInterface对象DispatchMessages方法的逻辑。

  1. void MainThreadInterface::DispatchMessages() { 
  2.   // 遍历请求队列 
  3.   requests_.swap(dispatching_message_queue_); 
  4.   while (!dispatching_message_queue_.empty()) { 
  5.     MessageQueue::value_type task; 
  6.     std::swap(dispatching_message_queue_.front(), task); 
  7.     dispatching_message_queue_.pop_front(); 
  8.     // 执行任务函数 
  9.     task->Call(this); 
  10.   } 

task是WorkerStartedRequest对象,看一下Call方法的代码。

  1. void Call(MainThreadInterface* thread) override { 
  2.   auto manager = thread->inspector_agent()->GetWorkerManager(); 
  3.   manager->WorkerStarted(id_, info_, waiting_); 

接着调用agent的WorkerManager的WorkerStarted。

  1. void WorkerManager::WorkerStarted(int session_id, 
  2.                                   const WorkerInfo& info, 
  3.                                   bool waiting) { 
  4.   children_.emplace(session_id, info); 
  5.   for (const auto& delegate : delegates_) { 
  6.     Report(delegate.second, info, waiting); 
  7.   } 

WorkerStarted记录了一个id和上下文,因为delegates_初始化的时候是空的,所以不会执行。至此,子线程Inspector初始化的逻辑就分析完了,结构图如下。

我们发现,和主线程不一样,主线程会启动一个WebSocket服务器接收客户端的连接请求,而子线程只是初始化了一些数据结构。下面我们看一下基于这些数据结构,主线程是如何动态开启调试子线程的。

2.主线程开启调试子线程的能力

我们可以以以下方式开启对子线程的调试。

  1. const { Worker, workerData } = require('worker_threads'); 
  2. const { Session } = require('inspector'); 
  3. // 新建一个新的通信通道 
  4. const session = new Session(); 
  5. session.connect(); 
  6. // 创建子线程 
  7. const worker = new Worker('./httpServer.js', {workerData: {port: 80}});  
  8. // 子线程启动成功后开启调试子线程的能力 
  9. worker.on('online', () => { 
  10.     session.post("NodeWorker.enable"
  11.                  {waitForDebuggerOnStart: false},   
  12.                  (err) => {   
  13.                     err && console.log("NodeWorker.enable", err); 
  14.                  }); 
  15.     }); 
  16. // 防止主线程退出 
  17. setInterval(() => {}, 100000); 

我们先来分析一下connect函数的逻辑。

  1. connect() { 
  2.     this[connectionSymbol] = new Connection((message) => this[onMessageSymbol](message)); 

新建了一个Connection对象并传入一个回调函数,该回调函数在收到消息时被回调。Connection是C++层导出的对象,由模版类JSBindingsConnection实现。

  1. template  
  2. class JSBindingsConnection {} 

我们看看导出的路逻辑。

  1. JSBindingsConnection<Connection>::Bind(env, target); 

接着看Bind。

  1. static void Bind(Environment* env, Local target) { 
  2.     // class_name是Connection 
  3.     Local class_name = ConnectionType::GetClassName(env); 
  4.     Local tmpl = env->NewFunctionTemplate(JSBindingsConnection::New); 
  5.     tmpl->InstanceTemplate()->SetInternalFieldCount(1); 
  6.     tmpl->SetClassName(class_name); 
  7.     tmpl->Inherit(AsyncWrap::GetConstructorTemplate(env)); 
  8.     env->SetProtoMethod(tmpl, "dispatch", JSBindingsConnection::Dispatch); 
  9.     env->SetProtoMethod(tmpl, "disconnect", JSBindingsConnection::Disconnect); 
  10.     target->Set(env->context(), 
  11.                 class_name, 
  12.                 tmpl->GetFunction(env->context()).ToLocalChecked()) 
  13.         .ToChecked(); 
  14.   } 
  15. 当我们在JS层执行new Connection的时候,就会执行JSBindingsConnection::New。

    1. static void New(const FunctionCallbackInfo& info) { 
    2.    Environment* env = Environment::GetCurrent(info); 
    3.    Local<Function> callback = info[0].As<Function>(); 
    4.    new JSBindingsConnection(env, info.This(), callback); 

    我们看看新建一个JSBindingsConnection对象时的逻辑。

    1. JSBindingsConnection(Environment* env, 
    2.                        Local wrap, 
    3.                        Local<Function> callback) 
    4.                        : AsyncWrap(env, wrap, PROVIDER_INSPECTORJSBINDING), 
    5.                          callback_(env->isolate(), callback) { 
    6.     Agent* inspector = env->inspector_agent(); 
    7.     session_ = LocalConnection::Connect
    8.         inspector, std::make_unique(env, this) 
    9.     );}static std::unique_ptr Connect
    10.       Agent* inspector,  
    11.       std::unique_ptr delegate 
    12. ) { 
    13.     return inspector->Connect(std::move(delegate), false); 
    14. 最终是传入了一个JSBindingsSessionDelegate对象调用Agent的Connect方法。

      1. std::unique_ptr Agent::Connect
      2.     std::unique_ptr delegate, 
      3.     bool prevent_shutdown) { 
      4.   int session_id = client_->connectFrontend(std::move(delegate), 
      5.                                             prevent_shutdown); 
      6.   // JSBindingsConnection对象的session_字段指向的对象                                          
      7.   return std::unique_ptr
      8.       new SameThreadInspectorSession(session_id, client_) 
      9.   ); 

      Agent的Connect方法继续调用client_->connectFrontend。

      1. int connectFrontend(std::unique_ptr delegate, 
      2.                       bool prevent_shutdown) { 
      3.     int session_id = next_session_id_++; 
      4.     channels_[session_id] = std::make_unique(env_, 
      5.                                                           client_, 
      6.                                                           getWorkerManager(), 
      7.                                                           std::move(delegate), 
      8.                                                           getThreadHandle(), 
      9.                                                           prevent_shutdown); 
      10.     return session_id; 

      connectFrontend新建了一个ChannelImpl对象,在新建ChannelImpl时,会初始化子线程处理的逻辑。

      1. explicit ChannelImpl(Environment* env, 
      2.                        const std::unique_ptr& inspector, 
      3.                        std::shared_ptr worker_manager, 
      4.                        std::unique_ptr delegate, 
      5.                        std::shared_ptr main_thread_, 
      6.                        bool prevent_shutdown) 
      7.       : delegate_(std::move(delegate)), prevent_shutdown_(prevent_shutdown), 
      8.         retaining_context_(false) { 
      9.     session_ = inspector->connect(CONTEXT_GROUP_ID, this, StringView()); 
      10.     // Node.js拓展命令的处理分发器 
      11.     node_dispatcher_ = std::make_unique(this); 
      12.     // trace相关 
      13.     tracing_agent_ = std::make_unique(env, main_thread_); 
      14.     tracing_agent_->Wire(node_dispatcher_.get()); 
      15.     // 处理子线程相关 
      16.     if (worker_manager) { 
      17.       worker_agent_ = std::make_unique(worker_manager); 
      18.       worker_agent_->Wire(node_dispatcher_.get()); 
      19.     } 
      20.     // 处理runtime 
      21.     runtime_agent_ = std::make_unique(); 
      22.     runtime_agent_->Wire(node_dispatcher_.get()); 

      我们这里只关注处理子线程相关的逻辑。看一下 worker_agent_->Wire。

      1. void WorkerAgent::Wire(UberDispatcher* dispatcher) { 
      2.   frontend_.reset(new NodeWorker::Frontend(dispatcher->channel())); 
      3.   NodeWorker::Dispatcher::wire(dispatcher, this); 
      4.   auto manager = manager_.lock(); 
      5.   workers_ = std::make_shared(frontend_, manager->MainThread()); 

      这时候的架构图如下

      接着看一下NodeWorker::Dispatcher::wire(dispatcher, this)的逻辑。

      1. void Dispatcher::wire(UberDispatcher* uber, Backend* backend){ 
      2.     std::unique_ptr dispatcher(new DispatcherImpl(uber->channel(), backend)); 
      3.     uber->setupRedirects(dispatcher->redirects()); 
      4.     uber->registerBackend("NodeWorker", std::move(dispatcher)); 

      首先新建了一个DispatcherImpl对象。

      1. DispatcherImpl(FrontendChannel* frontendChannel, Backend* backend) 
      2.         : DispatcherBase(frontendChannel) 
      3.         , m_backend(backend) { 
      4.         m_dispatchMap["NodeWorker.sendMessageToWorker"] = &DispatcherImpl::sendMessageToWorker; 
      5.         m_dispatchMap["NodeWorker.enable"] = &DispatcherImpl::enable; 
      6.         m_dispatchMap["NodeWorker.disable"] = &DispatcherImpl::disable; 
      7.         m_dispatchMap["NodeWorker.detach"] = &DispatcherImpl::detach; 

      除了初始化一些字段,另外了一个kv数据结构,这个是一个路由配置,后面我们会看到它的作用。新建完DispatcherImpl后又调用了uber->registerBackend("NodeWorker", std::move(dispatcher))注册该对象。

      1. void UberDispatcher::registerBackend(const String& name, std::unique_ptr dispatcher){ 
      2.     m_dispatchers[name] = std::move(dispatcher); 

      这时候的架构图如下。

      我们看到这里其实是建立了一个路由体系,后面收到命令时就会根据这些路由配置进行转发,类似Node.js Express框架路由机制。这时候可以通过session的post给主线程发送NodeWorker.enable命令来开启子线程的调试。我们分析这个过程。

      1. post(method, params, callback) { 
      2.     // 忽略参数处理 
      3.     // 保存请求对应的回调 
      4.     if (callback) { 
      5.       this[messageCallbacksSymbol].set(id, callback); 
      6.     } 
      7.     // 调用C++的dispatch 
      8.     this[connectionSymbol].dispatch(JSONStringify(message)); 

      this[connectionSymbol]对应的是JSBindingsConnection对象。

      1. static void Dispatch(const FunctionCallbackInfo& info) { 
      2.     Environment* env = Environment::GetCurrent(info); 
      3.     JSBindingsConnection* session; 
      4.     ASSIGN_OR_RETURN_UNWRAP(&session, info.Holder()); 
      5.     if (session->session_) { 
      6.       session->session_->Dispatch( 
      7.           ToProtocolString(env->isolate(), info[0])->string()); 
      8.     } 

      session_是一个SameThreadInspectorSession对象。

      1. void SameThreadInspectorSession::Dispatch( 
      2.     const v8_inspector::StringView& message) { 
      3.   auto client = client_.lock(); 
      4.   client->dispatchMessageFromFrontend(session_id_, message);}void dispatchMessageFromFrontend(int session_id, const StringView& message) { 
      5.     channels_[session_id]->dispatchProtocolMessage(message); 

      最终调用了ChannelImpl的dispatchProtocolMessage。

      1. void dispatchProtocolMessage(const StringView& message) { 
      2.     std::string raw_message = protocol::StringUtil::StringViewToUtf8(message); 
      3.     std::unique_ptr value = 
      4.         protocol::DictionaryValue::cast(protocol::StringUtil::parseMessage( 
      5.             raw_message, false)); 
      6.     int call_id; 
      7.     std::string method; 
      8.     // 解析命令 
      9.     node_dispatcher_->parseCommand(value.get(), &call_id, &method); 
      10.     // 判断命令是V8内置命令还是Node.js拓展的命令 
      11.     if (v8_inspector::V8InspectorSession::canDispatchMethod( 
      12.             Utf8ToStringView(method)->string())) { 
      13.       session_->dispatchProtocolMessage(message); 
      14.     } else { 
      15.       node_dispatcher_->dispatch(call_id, method, std::move(value), 
      16.                                  raw_message); 
      17.     } 

      因为NodeWorker.enable是Node.js拓展的命令,所以会走到else里面的逻辑。根据路由配置找到该命令对应的处理逻辑(NodeWorker.enable以.切分,对应两级路由)。

      1. void UberDispatcher::dispatch(int callId, const String& in_method, std::unique_ptr parsedMessage, const ProtocolMessage& rawMessage){ 
      2.     // 找到一级路由配置 
      3.     protocol::DispatcherBase* dispatcher = findDispatcher(method); 
      4.     std::unique_ptr messageObject = DictionaryValue::cast(std::move(parsedMessage)); 
      5.     // 交给一级路由处理器处理 
      6.     dispatcher->dispatch(callId, method, rawMessage, std::move(messageObject)); 

      NodeWorker.enable对应的路由处理器代码如下

      1. void DispatcherImpl::dispatch(int callId, const String& method, const ProtocolMessage& message, std::unique_ptr messageObject){ 
      2.     // 查找二级路由 
      3.     std::unordered_map::iterator it = m_dispatchMap.find(method); 
      4.     protocol::ErrorSupport errors; 
      5.     // 找到处理函数 
      6.     (this->*(it->second))(callId, method, message, std::move(messageObject), &errors); 

      dispatch继续寻找命令对应的处理函数,最终找到NodeWorker.enable命令的处理函数为DispatcherImpl::enable。

      1. void DispatcherImpl::enable(...){ 
      2.     std::unique_ptr weak = weakPtr(); 
      3.     DispatchResponse response = m_backend->enable(...); 
      4.     // 返回响应给命令(类似请求/响应模式) 
      5.     weak->get()->sendResponse(callId, response); 

      根据架构图可以知道m_backend是WorkerAgent对象。

      1. DispatchResponse WorkerAgent::enable(bool waitForDebuggerOnStart) { 
      2.   auto manager = manager_.lock(); 
      3.   std::unique_ptr delegate(new AgentWorkerInspectorDelegate(workers_)); 
      4.   event_handle_ = manager->SetAutoAttach(std::move(delegate)); 
      5.   return DispatchResponse::OK(); 

      继续调用WorkerManager的SetAutoAttach方法。

      1. std::unique_ptr WorkerManager::SetAutoAttach( 
      2.     std::unique_ptr attach_delegate) { 
      3.   int id = ++next_delegate_id_; 
      4.   // 保存delegate 
      5.   delegates_[id] = std::move(attach_delegate); 
      6.   const auto& delegate = delegates_[id]; 
      7.   // 通知子线程 
      8.   for (const auto& worker : children_) { 
      9.     Report(delegate, worker.secondfalse); 
      10.   } 
      11.   ... 

      SetAutoAttach遍历子线程。

      1. void Report(const std::unique_ptr& delegate, 
      2.             const WorkerInfo& info, bool waiting) { 
      3.   if (info.worker_thread) 
      4.     delegate->WorkerCreated(info.title, info.url, waiting, info.worker_thread); 

      info是一个WorkerInfo对象,该对象是子线程初始化和主线程建立关系的数据结构。delegate是AgentWorkerInspectorDelegate对象。

      1. void WorkerCreated(const std::string& title, 
      2.                      const std::string& url, 
      3.                      bool waiting, 
      4.                      std::shared_ptr target) override { 
      5.     workers_->WorkerCreated(title, url, waiting, target); 

      workers_是一个NodeWorkers对象。

      1. void NodeWorkers::WorkerCreated(const std::string& title, 
      2.                                 const std::string& url, 
      3.                                 bool waiting, 
      4.                                 std::shared_ptr target) { 
      5.   auto frontend = frontend_.lock(); 
      6.   std::string id = std::to_string(++next_target_id_); 
      7.   // 处理数据通信的delegate 
      8.   auto delegate = thread_->MakeDelegateThreadSafe( 
      9.       std::unique_ptr
      10.           new ParentInspectorSessionDelegate(id, shared_from_this()) 
      11.       ) 
      12.   ); 
      13.   // 建立和子线程V8 Inspector的通信通道 
      14.   sessions_[id] = target->Connect(std::move(delegate), true); 
      15.   frontend->attachedToWorker(id, WorkerInfo(id, title, url), waiting); 

      WorkerCreated建立了一条和子线程通信的通道,然后通知命令的发送方通道建立成功。这时候架构图如下。

      接着看attachedToWorker。

      1. void Frontend::attachedToWorker(const String& sessionId, std::unique_ptr workerInfo, bool waitingForDebugger){ 
      2.     std::unique_ptr messageData = AttachedToWorkerNotification::create() 
      3.         .setSessionId(sessionId) 
      4.         .setWorkerInfo(std::move(workerInfo)) 
      5.         .setWaitingForDebugger(waitingForDebugger) 
      6.         .build(); 
      7.     // 触发NodeWorker.attachedToWorker 
      8.     m_frontendChannel->sendProtocolNotification(InternalResponse::createNotification("NodeWorker.attachedToWorker", std::move(messageData))); 

      继续看sendProtocolNotification

      1. void sendProtocolNotification( 
      2.       std::unique_ptr<Serializable> message) override { 
      3.     sendMessageToFrontend(message->serializeToJSON()); 
      4.  } 
      5.  
      6.  void sendMessageToFrontend(const StringView& message) { 
      7.     delegate_->SendMessageToFrontend(message); 
      8.  } 

      这里的delegate_是一个JSBindingsSessionDelegate对象。

      1. void SendMessageToFrontend(const v8_inspector::StringView& message) 
      2.         override { 
      3.       Isolate* isolate = env_->isolate(); 
      4.       HandleScope handle_scope(isolate); 
      5.       Context::Scope context_scope(env_->context()); 
      6.       MaybeLocal v8string = String::NewFromTwoByte(isolate, 
      7.                                                            message.characters16(), 
      8.                                                            NewStringType::kNormal, message.length() 
      9.       ); 
      10.       Local argument = v8string.ToLocalChecked().As(); 
      11.       // 收到消息执行回调 
      12.       connection_->OnMessage(argument); 
      13.  
      14. // 执行JS层回调 
      15. void OnMessage(Local value) { 
      16.    MakeCallback(callback_.Get(env()->isolate()), 1, &value); 

      JS层回调逻辑如下。

      1. [onMessageSymbol](message) { 
      2.     const parsed = JSONParse(message); 
      3.     // 收到的消息如果是某个请求的响应,则有个id字段记录了请求对应的id,否则则触发事件 
      4.     if (parsed.id) { 
      5.        const callback = this[messageCallbacksSymbol].get(parsed.id); 
      6.        this[messageCallbacksSymbol].delete(parsed.id); 
      7.        if (callback) { 
      8.          callback(null, parsed.result); 
      9.        } 
      10.      } else { 
      11.        this.emit(parsed.method, parsed); 
      12.        this.emit('inspectorNotification', parsed); 
      13.      } 
      14.   } 

      主线程拿到Worker Session对一个的id,后续就可以通过命令NodeWorker.sendMessageToWorker加上该id和子线程通信。大致原理如下,主线程通过自己的channel和子线程的channel进行通信,从而达到控制子线程的目的。

      我们分析一下NodeWorker.sendMessageToWorker命令的逻辑,对应处理函数为DispatcherImpl::sendMessageToWorker。

      1. void DispatcherImpl::sendMessageToWorker(...){ 
      2.     std::unique_ptr weak = weakPtr(); 
      3.     DispatchResponse response = m_backend->sendMessageToWorker(in_message, in_sessionId); 
      4.     // 响应 
      5.     weak->get()->sendResponse(callId, response); 
      6.     return

      继续分析m_backend->sendMessageToWorker。

      1. DispatchResponse WorkerAgent::sendMessageToWorker(const String& message, 
      2.                                                   const String& sessionId) { 
      3.   workers_->Receive(sessionId, message); 
      4.   return DispatchResponse::OK(); 
      5.  
      6. void NodeWorkers::Receive(const std::string& id, const std::string& message) { 
      7.   auto it = sessions_.find(id); 
      8.   it->second->Dispatch(Utf8ToStringView(message)->string()); 

      sessions_对应的是和子线程的通信的数据结构CrossThreadInspectorSession。看一下该对象的Dispatch方法。

      1. void Dispatch(const StringView& message) override { 
      2.     state_.Call(&MainThreadSessionState::Dispatch, 
      3.                 StringBuffer::create(message)); 

      再次调了MainThreadSessionState::Dispatch

      1. void Dispatch(std::unique_ptr message) { 
      2.     session_->Dispatch(message->string()); 

      session_是SameThreadInspectorSession对象。继续看它的Dispatch方法。

      1. void SameThreadInspectorSession::Dispatch( 
      2.     const v8_inspector::StringView& message) { 
      3.   auto client = client_.lock(); 
      4.   client->dispatchMessageFromFrontend(session_id_, message);}void dispatchMessageFromFrontend(int session_id, const StringView& message) { 
      5.     channels_[session_id]->dispatchProtocolMessage(message); 

      通过层层调用,最终拿到了一个合子线程通信的channel,dispatchProtocolMessage方法刚才已经分析过,该方法会根据命令做不同的处理,因为我们这里发送的是V8内置的命令,所以会交给V8 Inspector处理。当V8 Inspector处理完后,会通过ChannelImpl的sendResponse返回结果。

      1. void sendResponse( 
      2.       int callId, 
      3.       std::unique_ptr message) override { 
      4.     sendMessageToFrontend(message->string()); 
      5.  
      6.  void sendMessageToFrontend(const StringView& message) { 
      7.     delegate_->SendMessageToFrontend(message); 
      8.  } 

      这里的delegate_是ParentInspectorSessionDelegate对象。

      1. void SendMessageToFrontend(const v8_inspector::StringView& msg) override { 
      2.   std::string message = protocol::StringUtil::StringViewToUtf8(msg); 
      3.   workers_->Send(id_, message); 
      4.  
      5. void NodeWorkers::Send(const std::string& id, const std::string& message) { 
      6.   auto frontend = frontend_.lock(); 
      7.   if (frontend) 
      8.     frontend->receivedMessageFromWorker(id, message); 
      9.  
      10. void Frontend::receivedMessageFromWorker(const String& sessionId, const String& message){ 
      11.     std::unique_ptr messageData = ReceivedMessageFromWorkerNotification::create() 
      12.         .setSessionId(sessionId) 
      13.         .setMessage(message) 
      14.         .build(); 
      15.     // 触发NodeWorker.receivedMessageFromWorker        
      16.     m_frontendChannel->sendProtocolNotification(InternalResponse::createNotification("NodeWorker.receivedMessageFromWorker", std::move(messageData))); 

      m_frontendChannel是主线程的ChannelImpl对象。

      1. void sendProtocolNotification( 
      2.     std::unique_ptr<Serializable> message) override { 
      3.     sendMessageToFrontend(message->serializeToJSON()); 
      4.  
      5. void sendMessageToFrontend(const StringView& message) { 
      6.     delegate_->SendMessageToFrontend(message); 

      delegate_是C++层传入的JSBindingsSessionDelegate对象。最终通过JSBindingsSessionDelegate对象回调JS层,之前已经分析过就不再赘述。至此,主线程就具备了控制子线程的能力,但是控制方式有很多种。

      2.1 使用通用的V8命令

      通过下面代码收集子线程的CPU Profile信息。

      1. const { Worker, workerData } = require('worker_threads'); 
      2. const { Session } = require('inspector'); 
      3. const session = new Session(); 
      4. session.connect(); 
      5. let id = 1; 
      6. function post(sessionId, method, params, callback) { 
      7.     session.post('NodeWorker.sendMessageToWorker', { 
      8.         sessionId, 
      9.         message: JSON.stringify({ id: id++, method, params }) 
      10.     }, callback); 
      11.  
      12. session.on('NodeWorker.attachedToWorker', (data) => { 
      13.     post(data.params.sessionId, 'Profiler.enable'); 
      14.     post(data.params.sessionId, 'Profiler.start'); 
      15.     // 收集一段时间后提交停止收集命令 
      16.     setTimeout(() => { 
      17.         post(data.params.sessionId, 'Profiler.stop'); 
      18.     }, 10000) 
      19. }); 
      20.  
      21. session.on('NodeWorker.receivedMessageFromWorker', ({ params: { message }}) => {  
      22.     const data = JSON.parse(message); 
      23.     console.log(data); 
      24. }); 
      25.  
      26. const worker = new Worker('./httpServer.js', {workerData: {port: 80}});  
      27. worker.on('online', () => { 
      28.     session.post("NodeWorker.enable",{waitForDebuggerOnStart: false},  (err) => {  console.log(err, "NodeWorker.enable");}); 
      29. }); 
      30. setInterval(() => {}, 100000); 

      通过这种方式可以通过命令控制子线程的调试和数据收集。

      2.2 在子线程中动态执行脚本

      可以通过执行脚本开启子线程的WebSocket服务,像调试主线程一样。

      1. const { Worker, workerData } = require('worker_threads'); 
      2. const { Session } = require('inspector'); 
      3. const session = new Session(); 
      4. session.connect(); 
      5. let workerSessionId; 
      6. let id = 1; 
      7. function post(method, params) { 
      8.     session.post('NodeWorker.sendMessageToWorker', { 
      9.         sessionId: workerSessionId, 
      10.         message: JSON.stringify({ id: id++, method, params }) 
      11.     }); 
      12.  
      13. session.on('NodeWorker.receivedMessageFromWorker', ({ params: { message }}) => {  
      14.     const data = JSON.parse(message); 
      15.     console.log(data); 
      16. }); 
      17.  
      18. session.on('NodeWorker.attachedToWorker', (data) => { 
      19.     workerSessionId = data.params.sessionId; 
      20.     post("Runtime.evaluate", { 
      21.         includeCommandLineAPI: true,  
      22.         expression: `const inspector = process.binding('inspector'); 
      23.                     inspector.open(); 
      24.                     inspector.url(); 
      25.                     ` 
      26.         }  
      27.     ); 
      28. }); 
      29.  
      30. const worker = new Worker('./httpServer.js', {workerData: {port: 80}});  
      31. worker.on('online', () => { 
      32.     session.post("NodeWorker.enable",{waitForDebuggerOnStart: false},  (err) => {  err && console.log("NodeWorker.enable", err);}); 
      33. }); 
      34. setInterval(() => {}, 100000); 

      执行上面的代码就拿到以下输出

      1.   id: 1, 
      2.   result: { 
      3.     result: { 
      4.       type: 'string'
      5.       value: 'ws://127.0.0.1:9229/c0ca16c8-55aa-4651-9776-fca1b27fc718' 
      6.     } 
      7.   } 

      通过该地址,客户端就可以对子线程进行调试了。上面代码里使用process.binding而不是require加载inspector,因为刚才通过NodeWorker.enable命令为子线程创建了一个到子线程Inspector的channel,而JS模块里判断如果channel非空则报错Inspector已经打开。所以这里需要绕过这个限制,直接加载C++模块开启WebSocket服务器。

      3.子线程调试主线程

      不仅可以通过主线程调试子线程,还可以通过子线程调试主线程。Node.js在子线程暴露了connectToMainThread方法连接到主线程的Inspector(只能在work_threads中使用),实现的原理和之前分析的类似,主要是子线程连接到主线程的V8 Inspector,通过和该Inspector完成对主线程的控制。看下面一个例子。主线程代码

      1. const { Worker, workerData } = require('worker_threads');const http = require('http');const worker = new Worker('./worker.js', {workerData: {port: 80}}); 
      2.  
      3. http.createServer((_, res) => { 
      4.     res.end('main'); 
      5. }).listen(8000); 

      worker.js代码如下:

      1. const fs = require('fs'); 
      2. const { workerData: { port } } = require('worker_threads'); 
      3. const { Session } = require('inspector'); 
      4. const session = new Session(); 
      5. session.connectToMainThread(); 
      6. session.post('Profiler.enable'); 
      7. session.post('Profiler.start'); 
      8. setTimeout(() => { 
      9.     session.post('Profiler.stop', (err, data) => { 
      10.         if (data.profile) { 
      11.             fs.writeFileSync('./profile.cpuprofile', JSON.stringify(data.profile)); 
      12.         } 
      13.     }); 
      14. }, 5000) 

       

      免责声明:

      ① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

      ② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

      软考中级精品资料免费领

      • 历年真题答案解析
      • 备考技巧名师总结
      • 高频考点精准押题
      • 资料下载
      • 历年真题
      • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

        难度     813人已做
        查看
      • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

        难度     354人已做
        查看
      • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

        难度     318人已做
        查看
      • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

        难度     435人已做
        查看
      • 2024年上半年系统架构设计师考试综合知识真题

        难度     224人已做
        查看

      相关文章

      发现更多好内容
      咦!没有更多了?去看看其它编程学习网 内容吧