前言
最近在工作中遇到了一个问题,开启多个进程处理队列会重复读取 Redis 中队列吗?是否因此导致重复执行任务?下面就来通过示例代码详细介绍下。
使用 Supervisor 监听 Laravel 队列任务,其中 Supervisor 的配置如下:
注意: numprocs = 8
,代表开启 8 个进程来执行 command 中的命令。
如下:
docker-compose exec php-worker sh
/etc/supervisor/conf.d # ps -ef | grep php
7 root 0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
8 root 0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
9 root 0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
10 root 0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
11 root 0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
12 root 0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
13 root 0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
14 root 0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
44 root 0:00 grep php
Laravel 多进程读取队列内容是否会重复
在 Laravel 的某个控制器方法,一次放入多个任务队列:
dispatch((new SendFile3())->onQueue('sendfile'));
$this->dispatch((new SendFile3())->onQueue('sendfile'));
$this->dispatch((new SendFile3())->onQueue('sendfile'));
$this->dispatch((new SendFile3())->onQueue('sendfile'));
$this->dispatch((new SendFile3())->onQueue('sendfile'));
$this->dispatch((new SendFile3())->onQueue('sendfile'));
$this->dispatch((new SendFile3())->onQueue('sendfile'));
$this->dispatch((new SendFile3())->onQueue('sendfile'));
$this->dispatch((new SendFile3())->onQueue('sendfile'));
$this->dispatch((new SendFile3())->onQueue('sendfile'));
$this->dispatch((new SendFile3())->onQueue('sendfile'));
$this->dispatch((new SendFile3())->onQueue('sendfile'));
$this->dispatch((new SendFile3())->onQueue('sendfile'));
$this->dispatch((new SendFile3())->onQueue('sendfile'));
$this->dispatch((new SendFile3())->onQueue('sendfile'));
$this->dispatch((new SendFile3())->onQueue('sendfile'));
$this->dispatch((new SendFile3())->onQueue('sendfile'));
$this->dispatch((new SendFile3())->onQueue('sendfile'));
}
在队列处理的方法打印日志,打印处理的队列的 ID:
app/Jobs/SendFile3.php
job->getRawBody();
$info = json_decode($rawbody,true);
info('queue id:' . $info['id']);
}
Laravel 使用 Redis 的 list 作为队列的数据结构,并会为每个队列分配一个 ID,数据结构如下:
请求这个控制器路由(或者命令行方式),就可以看到 Redis 中多了很多队列任务了,如图:
这个时候开启 Supervisor 处理队列任务,并查看日志:
这 8 个进程并发处理队列,但从打印的日志看,没有出现同样的 ID. 我们再看一下 Laravel 如何使用 Redis 处理队列的。
分析一下 Laravel 队列的处理
Laravel 中入队列方法
getConnection()->rpush($this->getQueue($queue),$payload);
return Arr::get(json_decode($payload,true),'id');
}
用的是 Redis 的 rpush 命令。
Laravel 中取队列方法
default;
$queue = $this->getQueue($queue);
$this->migrateExpiredJobs($queue.':delayed',$queue);
if (! is_null($this->expire)) {
$this->migrateExpiredJobs($queue.':reserved',$queue);
}
list($job,$reserved) = $this->getConnection()->eval(
LuaScripts::pop(),2,$queue,$queue.':reserved',$this->getTime() + $this->expire
);
if ($reserved) {
return new RedisJob($this->container,$this,$job,$reserved,$original);
}
}
这里用的是 lua 脚本取队列,如下:
那么结论是:从 Laravel 的处理方式和打印的日志结果看,即使多个进程读取同一个队列,也不会读取到一样的数据。
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对编程之家的支持。