问题描述
当我在不同的函数上调用 getJob 时,我需要一种方法来阻止工作人员处理作业。我环顾四周,但找不到解决方案。
我有以下设置。 在带有 express 的 nodeJS 中,我有工作节点。
- 以延迟状态创建的作业。
- 正在不同功能中访问作业
async function jobReader(id) {
const job = await queue.getJob(id);
/* do some stuff */
await job.remove();
}
- 独立处理作业的工作节点。仅在延迟时间结束后才会处理作业。
queue.process(async (job) => {
/* do some stuff */
})
queue.getJob(id)
不会阻止工作人员处理作业。所以在处理工作的工人和处理工作的 jobReader 上存在竞争。我正在根据工作状态将一些结果写入数据库。所以竞争条件是不可接受的。
显然,getJob 不会阻止工作人员处理作业。如果作业被其他具有 getJob 函数的函数读取,是否有任何方法可以锁定或阻止工作人员的工作。
任何帮助或文档将不胜感激。
谢谢
解决方法
我想你应该稍微改变一下你的架构。 Worker Node
完全按照它的意图执行,它需要 jobs
并运行它们。因此,不要以某种方式阻止 queue
,您应该只在用户批准/取消/失败(或在 120 秒后未发送响应)时将 job
添加到 queue
).
如果我理解正确的话,这应该能让您了解如何控制不同请求之间的作业:
// this is YOUR queueu object. I don't now implentation but think
// of it like this..
const queue = new Queue()
// a variable holding the pending jobs which are not timeouted
// or explicitly approved/canceled/failed by user
const waitingJobs = {
}
// This could be your location where the user calls the api for creating a job.
app.post('/job',(req,res) => {
// create the job as the user requested it
const job = createJob(req)
// Add a timeout for 120 seconds into your waitingJobs array.
// So if the user does not respond after that time,the job will
// be added to queue! .
const timeout = setTimeout(() => {
queue.add(job)
// remove the reference after adding,garbage collection..
waitingJobs[job.id] = null
// job is added to queue automatically after 120 seconds
},120 * 1000)
// store the timeout in the job object!
job.timeout = timeout
// store the waiting job!
waitingJobs[job.id] = job
// respond to user,send back id so client can do another
// request if wanted.
req.status(200).json({ message: 'Job created!',id: job.id })
})
app.post('/job/:id',res) => {
const id = req.params.id
if (!id) {
req.status(400).json('bad job id provided')
return
}
// get the queued job:
const job = waitingJobs[id]
if (!job) {
req.status(400).json('Job nod found OR job already processed. Job id: ' + id)
return
}
// now the user responded to a specific job,clean the
// timeout first,so it won't be added to queue!
if (job.timeout) {
clearTimeout(job.timeout)
}
// Now the job won't be processed somewhere else!
// you can do whatever you want...
// example:
// get the action
const action = req.query.action
if(!action) {
res.status(400).json('Bad action provided: ' + action)
return
}
if(action === 'APPROVE') {
// job approved!,add it to queue so worker node
// can process it..
queue.add(job)
}
if(action === 'CANCEL') {
// do something else...
}
/// etc..
// ofc clear the job reference after you did something..
waitingJobs[job.id] = null
// since everything worked,inform user the job will now be processed!
res.status(200).json('Job ' + job.id + 'Will now be processed')
})
如果这对您有帮助或者您有任何疑问,请告诉我。