公牛队列阻塞作业,同时获得不同功能的作业

问题描述

当我在不同的函数调用 getJob 时,我需要一种方法来阻止工作人员处理作业。我环顾四周,但找不到解决方案。

我有以下设置。 在带有 express 的 nodeJS 中,我有工作节点。

  1. 以延迟状态创建的作业。
  2. 正在不同功能中访问作业
async function jobReader(id) {
   const job = await queue.getJob(id);
   /* do some stuff */
   await job.remove();
}
  1. 独立处理作业的工作节点。仅在延迟时间结束后才会处理作业。
queue.process(async (job) => {
   /* do some stuff */
})

queue.getJob(id) 不会阻止工作人员处理作业。所以在处理工作的工人和处理工作的 jobReader 上存在竞争。我正在根据工作状态将一些结果写入数据库。所以竞争条件是不可接受的。

显然,getJob 不会阻止工作人员处理作业。如果作业被其他具有 getJob 函数函数读取,是否有任何方法可以锁定或阻止工作人员的工作。

任何帮助或文档将不胜感激。

谢谢

解决方法

我想你应该稍微改变一下你的架构。 Worker Node 完全按照它的意图执行,它需要 jobs 并运行它们。因此,不要以某种方式阻止 queue,您应该只在用户批准/取消/失败(或在 120 秒后未发送响应)时将 job 添加到 queue ).

如果我理解正确的话,这应该能让您了解如何控制不同请求之间的作业:

// this is YOUR queueu object. I don't now implentation but think 
// of it like this..
const queue = new Queue()

// a variable holding the pending jobs which are not timeouted
// or explicitly approved/canceled/failed by user
const waitingJobs = {

}


// This could be your location where the user calls the api for creating a job.
app.post('/job',(req,res) => {

    // create the job as the user requested it
    const job = createJob(req)

    // Add a timeout for 120 seconds into your waitingJobs array. 
    // So if the user does not respond after that time,the job will 
    // be added to queue! .
    const timeout = setTimeout(() => {

        queue.add(job)

        // remove the reference after adding,garbage collection..
        waitingJobs[job.id] = null

    // job is added to queue automatically after 120 seconds
    },120 * 1000)

    // store the timeout in the job object!
    job.timeout = timeout

    // store the waiting job!
    waitingJobs[job.id] = job

    // respond to user,send back id so client can do another 
    // request if wanted.
    req.status(200).json({ message: 'Job created!',id: job.id })
})


app.post('/job/:id',res) => {

    const id = req.params.id

    if (!id) {
        req.status(400).json('bad job id provided')
        return
    }

    // get the queued job:
    const job = waitingJobs[id]

    if (!job) {
        req.status(400).json('Job nod found OR job already processed. Job id: ' + id)
        return
    }

    // now the user responded to a specific job,clean the 
    // timeout first,so it won't be added to queue!
    if (job.timeout) {
        clearTimeout(job.timeout)
    }

    // Now the job won't be processed somewhere else!
    // you can do whatever you want...
    // example:

    // get the action
    const action = req.query.action
    
    if(!action) {
        res.status(400).json('Bad action provided: ' + action)
        return
    }

    if(action === 'APPROVE') {
        // job approved!,add it to queue so worker node 
        // can process it..
        queue.add(job)
    }

    if(action === 'CANCEL') {
        // do something else...
    }
    /// etc..

    
    // ofc clear the job reference after you did something..
    waitingJobs[job.id] = null
    

    // since everything worked,inform user the job will now be processed!
    res.status(200).json('Job ' + job.id + 'Will now be processed')
})

如果这对您有帮助或者您有任何疑问,请告诉我。