如何在多线程中使用 node-addon-api 保存回调以备后用

问题描述

我尝试在多线程模式下调用回调,但应用程序崩溃了。我想知道如何在多线程环境中正确调用回调。其中node_event_deliver接口是在另一个线程中调用的。

我的系统是macos,发现在macos下不能使用ThreadSafeFunction,怎么办。

系统完整性保护:已启用

Crashed Thread:        0  CrbrowserMain  dispatch queue: com.apple.main-thread

Exception Type:        EXC_BAD_ACCESS (SIGSEGV)
Exception Codes:       KERN_INVALID_ADDRESS at 0x0000000008257065
Exception Note:        EXC_CORPSE_NOTIFY

Termination Signal:    Segmentation fault: 11
Termination Reason:    Namespace SIGNAL,Code 0xb
Terminating Process:   exc handler [98763]

VM Regions Near 0x8257065:
--> 
    __TEXT                      102f57000-102f7f000    [  160K] r-x/r-x SM=COW  /Users/*/移动办公.app/Contents/MacOS/移动办公

Thread 0 Crashed:: CrbrowserMain  dispatch queue: com.apple.main-thread
0   com.github.Electron.framework   0x0000000109bbecb6 napi_is_detached_arraybuffer + 502
1   com.github.Electron.framework   0x0000000109bb6deb napi_create_function + 1035
2   com.github.Electron.framework   0x00000001078863ad v8::internal::ClassScope::ResolvePrivateNamesPartially() + 14509
3   com.github.Electron.framework   0x00000001078860a2 v8::internal::ClassScope::ResolvePrivateNamesPartially() + 13730
4   com.github.Electron.framework   0x00000001078856a1 v8::internal::ClassScope::ResolvePrivateNamesPartially() + 11169
5   com.github.Electron.framework   0x0000000107e65f78 v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::HeaP*) + 484488
6   com.github.Electron.framework   0x0000000107df7ff8 v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::HeaP*) + 34056
7   com.github.Electron.framework   0x0000000107df7ff8 v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::HeaP*) + 34056
8   com.github.Electron.framework   0x0000000107df0458 v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::HeaP*) + 2408
9   com.github.Electron.framework   0x0000000107df7ff8 v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::HeaP*) + 34056
10  com.github.Electron.framework   0x0000000107df0458 v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::HeaP*) + 2408
11  com.github.Electron.framework   0x0000000107df7ff8 v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::HeaP*) + 34056
12  com.github.Electron.framework   0x0000000107df7ff8 v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::HeaP*) + 34056
13  com.github.Electron.framework   0x0000000107df0458 v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::HeaP*) + 2408
14  com.github.Electron.framework   0x0000000107df5b1b v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::HeaP*) + 24619
15  com.github.Electron.framework   0x0000000107df58f8 v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::HeaP*) + 24072
16  com.github.Electron.framework   0x00000001078d74a1 v8::internal::Execution::Call(v8::internal::Isolate*,v8::internal::Handle<v8::internal::Object>,int,v8::internal::Handle<v8::internal::Object>*) + 897
17  com.github.Electron.framework   0x00000001078d71ba v8::internal::Execution::Call(v8::internal::Isolate*,v8::internal::Handle<v8::internal::Object>*) + 154
18  com.github.Electron.framework   0x000000010786338c v8::Function::Call(v8::Local<v8::Context>,v8::Local<v8::Value>,v8::Local<v8::Value>*) + 332
19  com.github.Electron.framework   0x0000000109b6c1f6 node::InternalCallbackScope::Close() + 1734
20  com.github.Electron.framework   0x0000000109b6c465 node::MakeCallback(v8::Isolate*,v8::Local<v8::Object>,v8::Local<v8::Function>,v8::Local<v8::Value>*,node::async_context) + 181
21  com.github.Electron.framework   0x00000001073b908c ElectronInitializeIcuandStartNode + 1290636



//
//  node_v6_api.h
//  rcsnodejs
//
//  Created by liuchao on 16/11/14.
//  copyright © 2016年 rcsnodejs. All rights reserved.
//

#ifndef node_v6_api_h
#define node_v6_api_h

#include "v6_api.h"
#include <map>
#include "uv.h"
#include <queue>
#include <mutex>
#include "ClipboardHelper.h"
#include "triesearch_cpp.hpp"
#include "utils/md5/md5.h"
#include <napi.h>

using namespace trie;

// CALL_NODE_USE_ASYNC_QUEUE 用户定义SDK线程和NodeJS线程交互的两种工作模式开关
// 1. 异步:使用Queue执行
// 2. 同步:使用condition_variable通知执行 (关闭宏定义,认)
//
// 前提: NodeJS 的V8线程使用libuv的main_loop执行,也只能由main_loop执行,不允许其他线程访问V8实例
//       同时,libuv 唯一线程安全的API为uv_async_send,是一个类似于我们lua stack interrupter的唤醒机制
//
// 那么线程交互自然也就有两种: 同步和异步两种
//
// 问题: LuaSDK 部分业务如消息的listener callback 后会更新本地的sync_version,使用异步可能会造成消息还在队列
//       LuaSDK 更新sync_version后进程退出,造成客户端丢失消息
//
// 所以: `认`必须采用同步模式
//
// 同步模式优势:
// - Mac下测试:消息发送->回调->消息发送->回调 cpu 102% 速度:1.7W/s; 性能够用
// - 回调可以返回Javascript的返回值
// - 同步执行,保证强时序
//
// 异步模式优势:
// - SDK执行不受主线程执行的干扰
// - 更强的吞吐量


#define CALL_NODE_USE_ASYNC_QUEUE

typedef uint64_t (*p_v6_getTickCount)();
typedef int(*pInitSdk)(int a,v6_logger_func b,v6_node_callback c,const char* d,const char* e,const char* f,const char* g,int h,const char* i);
typedef int(*p_v6_sdk_exec)(int cid,char * bizMethod,char *jsonParamStr,void* response_callback);
typedef char* (*p_v6_queryRegPath)(const char* szPath,const char* szKey,bool bA2u);
typedef void (*p_createUuid2)(uuidString buf);
struct EventItem {
    const char *event;
    char *data;
    int cid;
};

void node_event_deliver(const char* event,const char* json,int cid);

class RcsSdk : public Napi::ObjectWrap<RcsSdk> {
 public:
  static Napi::Object Init(Napi::Env env,Napi::Object exports);
  RcsSdk(const Napi::CallbackInfo& info);

    void addCallback(int cid,Napi::Function callback);

    int getCid()
    {
        return cid++;
    }
    int getinitV6sdkRst()
    {
        return m_initV6sdkRst;
    }

    void notifyMainLoop();
    void destoryObject();

#ifdef CALL_NODE_USE_ASYNC_QUEUE
    std::queue<EventItem*> asyncEventQueue;
#else
    void waitForEventProcess(EventItem* item);
    void notifyEventComplete();
    EventItem* syncEventItem;
    char* syncEventResult;
#endif
    Napi::FunctionReference listener_callback;
    std::mutex eventSyncMutex;

    p_v6_getTickCount v6_getTickCount;
    p_v6_sdk_exec v6_exec;
    p_v6_queryRegPath v6_queryRegKey;
    p_createUuid2 v6_createUuid2;

    std::string m_proxyInfo;
    std::map<int,Napi::FunctionReference> callbacks;
 private:
  void sdk_call_method(const Napi::CallbackInfo& info,const char* bizMethod);

    Napi::Value bind(const Napi::CallbackInfo& info);
    Napi::Value isValid(const Napi::CallbackInfo& info);

    Napi::Value getsmscode(const Napi::CallbackInfo& info);
    Napi::Value doLogin(const Napi::CallbackInfo& info);
    Napi::Value setProxyInfo(const Napi::CallbackInfo& info);
    Napi::Value createQrCode(const Napi::CallbackInfo& info);

    Napi::Value checkExclusive(const Napi::CallbackInfo& info);

    std::condition_variable eventCondition;
    uv_async_t uv_async;
    int cid;
    int m_initV6sdkRst;
    std::string m_cachePath;
    // searchtrees_manager m_treeMgr;
    // cclipboardHelper m_clipBoardHelper;
#ifndef _WIN32
    // imageHelper_mac m_imageHelper_mac;
#endif
    // CipcstubMgr m_ipcstubMgr;

};

#endif /* node_v6_api_h */

//
//  node_v6_api.c
//  rcsnodejs
//
//  Created by liuchao on 16/11/14.
//  copyright © 2016年 rcsnodejs. All rights reserved.
//

#include "node_v6_api.h"
#include <stdio.h>
#include "uv.h"
#include <mutex>


// 所有的 Callbacks & Listeners,先放到RcsSDK实例字段上等唤醒的主线程来获取执行
// 线程安全:sdkMap、asyncEventQueue/syncEventItem 字段会被SDK线程和主线程同时访问,使用mutex锁
//typedef void(*v6_logger_func)(int level,const char *msg);
//typedef void(*v6_node_callback)(const char *action,const char *jsonStr);//call back to NodeJS proxy.


RcsSdk* g_rcsSdkV6 = NULL;
static std::mutex globalMutex;
int cid11 = 2;

void node_event_deliver(const char* event,int cid){

    //LuaSdk 后台线程
    LOGDEBUG("[<<V8 ENGINE>>]:node_event_deliver begin,event:%s json:%s,cid:%d\n",event,json,cid);
    if(NULL == g_rcsSdkV6)
    {
        LOGDEBUG("[<<V8 ENGINE>>]:node_event_deliver sdk NULL\n");
    }
#ifdef CALL_NODE_USE_ASYNC_QUEUE
    // 异步执行
    EventItem *item = (EventItem*)malloc(sizeof(EventItem));
    assert(item);
    item->data = (char*)malloc(strlen(json)+1);
    assert(item->data);
    item->event = event;
    item->cid = cid;
    strcpy(item->data,json);
    
    g_rcsSdkV6->eventSyncMutex.lock();
    g_rcsSdkV6->asyncEventQueue.push(item);
    g_rcsSdkV6->eventSyncMutex.unlock();
    g_rcsSdkV6->notifyMainLoop();
#else
    // 同步执行
    char* result = NULL;
    EventItem item;
    item.event = event;
    item.data = (char *)json;
    g_rcsSdkV6->waitForEventProcess(&item);
    g_rcsSdkV6->eventSyncMutex.lock();
    result = g_rcsSdkV6->syncEventResult;
    g_rcsSdkV6->eventSyncMutex.unlock();
#endif
    //LOGDEBUG("node_event_deliver end,event:%s json:%s\n",json);
}

static void node_do_event_process(RcsSdk* sdk,EventItem *item){
   LOGDEBUG("enter here.");
    if(!std::strcmp(item->event,"callback"))
    {
        int cid = item->cid;
        if(NULL ==  g_rcsSdkV6->callbacks[cid])
        {
            LOGDEBUG("[<<V8 ENGINE>>]:callback is null,cid is %d",cid);
        }
        else
        {
           LOGDEBUG("[<<V8 ENGINE>>]:callback is null,cid);
              g_rcsSdkV6->callbacks[cid].Call({  
                Napi::String::New(g_rcsSdkV6->callbacks[cid].Env(),item->data)});
        }

    }
    else
    {

        if ( NULL == g_rcsSdkV6->listener_callback)
        {
            LOGDEBUG("[<<V8 ENGINE>>]:listener_callback is null");
        }
        else
        {
            g_rcsSdkV6->listener_callback.Call({ Napi::String::New(g_rcsSdkV6->listener_callback.Env(),item->event),Napi::String::New(g_rcsSdkV6->listener_callback.Env(),item->data)});
        }
    }
    //LOGDEBUG("[<<V8 ENGINE>>]:node_do_event_process:%s %s\n",item->event,item->data);
    return;
}
// SDK线程调用 uv_async_send 后,主线程被唤醒后执行
// uv_async_send 是libuv唯一线程安全的API,但不保证每次调用通知到,只是能确保至少一次的唤醒主线程
// 参见: http://docs.libuv.org/en/v1.x/async.html
// int uv_async_send(uv_async_t* async)
// Wakeup the event loop and call the async handle’s callback.
//
// Note It’s safe to call this function from any thread. The callback will be
// called on the loop thread.
// Warning libuv will coalesce calls to uv_async_send(),that is,not every call to
// it will yield an execution of the callback. For example: if uv_async_send() is
// called 5 times in a row before the callback is called,the callback will only be
// called once. If uv_async_send() is called again after the callback was called,it
// will be called again.
static void node_event_process(uv_async_t *handle){
    //nodejs 主线程
   LOGDEBUG("[<<V8 ENGINE>>]:node_event_process begin");
    // Nan::HandleScope scope;
    RcsSdk *sdk = (RcsSdk*)handle->data;
    
#ifdef CALL_NODE_USE_ASYNC_QUEUE
    while(true){
        EventItem *item = NULL;
        sdk->eventSyncMutex.lock();
        if(!sdk->asyncEventQueue.empty()){
            item = sdk->asyncEventQueue.front();
            sdk->asyncEventQueue.pop();
        }
        sdk->eventSyncMutex.unlock();
        if(NULL == item){
            break;
        }
        node_do_event_process(sdk,item);
        free(item->data);
        free(item);
    }
#else
    sdk->eventSyncMutex.lock();
    EventItem *item = sdk->syncEventItem;
    sdk->syncEventItem = NULL;
    sdk->eventSyncMutex.unlock();
    if(NULL == item)
    {
        LOGERROR("[<<V8 ENGINE>>]:node_event_process sync syncEventItem=NULL");
    }
    else
    {
        v8::Local<v8::Value> ret = node_do_event_process(sdk,item);
        char *result = NULL;
        // 暂只支持返回String类型(必须在主线程解析v8对象)
        if(!ret.IsEmpty() && ret->Isstring()){
            result = *Nan::Utf8String(v8::Local<v8::String>::Cast(ret));
        }
        // 此处分配内存,由下次调用前释放,最多保留上一次的结果占用的内存
        if(result != NULL){
            char* cpy = (char*)malloc(strlen(result)+1);
            assert(cpy);
            strcpy(cpy,result);
            result = cpy;
        }
        sdk->eventSyncMutex.lock();
        sdk->syncEventResult = result;
        sdk->eventSyncMutex.unlock();
    }
    sdk->notifyEventComplete();
#endif
//    LOGDEBUG("[<<V8 ENGINE>>]:node_event_process end");
}

Napi::Object RcsSdk::Init(Napi::Env env,Napi::Object exports) {
  Napi::Function func =
      DefineClass(env,"RcsSdk",{ InstanceMethod("bind",&RcsSdk::bind),InstanceMethod("isValid",&RcsSdk::isValid),InstanceMethod("getsmscode",&RcsSdk::getsmscode),InstanceMethod("doLogin",&RcsSdk::doLogin),InstanceMethod("createQrCode",&RcsSdk::createQrCode),InstanceMethod("checkExclusive",&RcsSdk::checkExclusive),InstanceMethod("setProxyInfo",&RcsSdk::setProxyInfo)});                   

  Napi::String name = Napi::String::New(env,"RcsSdk");
  exports.Set(name,func);
  return exports;
}


RcsSdk::RcsSdk(const Napi::CallbackInfo& info)
    : Napi::ObjectWrap<RcsSdk>(info)
#ifndef CALL_NODE_USE_ASYNC_QUEUE
:syncEventItem(NULL),syncEventResult(NULL),m_initV6sdkRst(-1)
#endif
{
    cid = 1;
    // LOGDEBUG("[<<V8 ENGINE>>]:begin initSdk: level:%d scriptPath:%s productversion:%s mode:%d storage:%s portraitpath:%s",log_level,scriptPath,productversion,mode,storagePath,portraitPath);
  
    Napi::Number arg1 = info[0].As<Napi::Number>();
    Napi::String arg2 = info[1].As<Napi::String>();
    Napi::String arg3 = info[2].As<Napi::String>();
    Napi::Number arg4 = info[3].As<Napi::Number>();
    Napi::String arg5 = info[4].As<Napi::String>();
    Napi::String arg6 = info[5].As<Napi::String>();
    Napi::String arg7 = info[6].As<Napi::String>();

    int level = arg1.Int32Value();
    auto path = arg2.As<Napi::String>().Utf8Value();
    auto productversion = arg3.Utf8Value();
    int mode = arg4.Int32Value();
    auto sdkLogPath = arg5.Utf8Value();
    auto storagePath = arg6.Utf8Value();
    auto proxyInfo = arg7.Utf8Value();

    v6_getTickCount = v6_getTickCount2;
    v6_exec = v6_execute_from_Node;
    v6_createUuid2 = v6_createUuid;
    m_initV6sdkRst = v6_init_sdk2(level,NULL,node_event_deliver,path.c_str(),storagePath.c_str(),sdkLogPath.c_str(),productversion.c_str(),proxyInfo.c_str());
    
    // 每个State上有且只有一个用于中断主线程的 uv_async_t
    uv_async_init(uv_default_loop(),&uv_async,node_event_process);
    uv_async.data = this;
        
    globalMutex.lock();
    g_rcsSdkV6 = this;
    globalMutex.unlock();
}

void RcsSdk::addCallback(int cid,Napi::Function callback){
    callbacks[cid] = Napi::Reference<Napi::Function>::New(callback,1);;
    callbacks[cid].SuppressDestruct();
}

void RcsSdk::notifyMainLoop(){
    uv_async_send(&uv_async);
}
void RcsSdk::destoryObject()
{
    g_rcsSdkV6 = NULL;
}
#ifndef CALL_NODE_USE_ASYNC_QUEUE
void RcsSdk::waitForEventProcess(EventItem* item){
    std::unique_lock<std::mutex> lock(eventSyncMutex);
    syncEventItem = item;
    if(syncEventResult != NULL){
        free(syncEventResult);
        syncEventResult = NULL;
    }
    notifyMainLoop();
    eventCondition.wait(lock);
}
void RcsSdk::notifyEventComplete(){
    std::unique_lock<std::mutex> lock(eventSyncMutex);
    eventCondition.notify_one();
}
#endif

Napi::Value RcsSdk::bind(const Napi::CallbackInfo& info) {
    Napi::Function callback = info[0].As<Napi::Function>();
    g_rcsSdkV6->listener_callback = Napi::Reference<Napi::Function>::New(callback,1);
    g_rcsSdkV6->listener_callback.SuppressDestruct();
  return Napi::Number::New(info.Env(),0);
}

Napi::Value RcsSdk::isValid(const Napi::CallbackInfo& info) {
  return Napi::Number::New(info.Env(),0);
}

#define ADD_CALLBACK(__index) \
if(info[__index].IsFunction() && cid > 0){ \
    Napi::Function callback = info[__index].As<Napi::Function>();\
    addCallback(cid,callback); \
}

void RcsSdk::sdk_call_method(const Napi::CallbackInfo& info,const char* bizMethod) {
    if (info.Length() == 1) {
        if (!info[0].IsFunction()) {
            LOGERROR("sdk_call_method: %s Wrong arguments",bizMethod);
          Napi::TypeError::New(info.Env(),"Wrong arguments").ThrowAsJavaScriptException();
            return;
        }
        int cid = getCid();
        cid = v6_exec(cid,(char *)bizMethod,NULL);
        ADD_CALLBACK(0);
        return;
    } else if (info.Length() == 2) {
        if (!info[0].Isstring() || !info[1].IsFunction()) {
            LOGERROR("sdk_call_method: %s Wrong arguments",bizMethod);
            Napi::TypeError::New(info.Env(),"Wrong arguments").ThrowAsJavaScriptException();
            return;
        }
    Napi::String json = info[0].As<Napi::String>();
        int cid = getCid();
        cid = v6_exec(cid,(char *)json.Utf8Value().c_str(),NULL);
        ADD_CALLBACK(1);
        return;
    }
    else {
        Napi::TypeError::New(info.Env(),"Wrong arguments").ThrowAsJavaScriptException();
        return;
    }
}

Napi::Value RcsSdk::getsmscode(const Napi::CallbackInfo& info){
  sdk_call_method(info,"getSmsCode");
}

Napi::Value RcsSdk::doLogin(const Napi::CallbackInfo& info){
  sdk_call_method(info,"doLogin");
}

Napi::Value RcsSdk::setProxyInfo(const Napi::CallbackInfo& info){
  sdk_call_method(info,"setProxyInfo");
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)