问题描述
我如何在 celery beat 中以不同的方式在工作日和周末安排我的任务?
时间表在我的settings.py
文件中设置如下
{
"task_weekday": {
"task": "tasks.my_regular_task","schedule": crontab(minute="0-30",hour="4,5",day_of_week="mon-fri"),"options": {"queue": "queue_name"},},"task_weekend": {
"task": "tasks.my_regular_task","schedule": crontab(minute="0-5",hour="10,12",day_of_week="sat,sun"),}
但是,当我设置它时,它在今天(2021 年 3 月 21 日,星期日)运行工作日计划,而不是采用周末计划。
我将应用时区设置为 'US/Pacific'
,CELERY_ENABLE_UTC
设置为 False
。
设置后,我看到以下日志条目,但它运行工作日任务计划。
[2021-03-21 17:57:50,082: DEBUG/MainProcess] Current schedule:
<ScheduleEntry: task_weekday tasks.my_regular_task() <crontab: 0-30 4,5 mon-fri * * (m/h/d/dM/MY)>
<ScheduleEntry: task_weekend tasks.my_regular_task() <crontab: 0-5 10,12 sat,sun * * (m/h/d/dM/MY)>
<ScheduleEntry: celery.backend_cleanup celery.backend_cleanup() <crontab: 0 4 * * * (m/h/d/dM/MY)>
我也尝试过每隔几分钟运行一次任务,以测试它采用哪个时间表并采用周末时间表:
{
"task_weekday": {
"task": "tasks.my_regular_task","schedule": crontab(minute="*/2",hour="*","schedule": crontab(minute="*/3",}
[2021-03-21 18:03:27,075: DEBUG/MainProcess] Current schedule:
<ScheduleEntry: task_weekend tasks.my_regular_task() <crontab: */3 * sat,sun * * (m/h/d/dM/MY)>
<ScheduleEntry: task_weekday tasks.my_regular_task() <crontab: */2 * mon-fri * * (m/h/d/dM/MY)>
<ScheduleEntry: celery.backend_cleanup celery.backend_cleanup() <crontab: 0 4 * * * (m/h/d/dM/MY)>
[2021-03-21 18:03:27,076: DEBUG/MainProcess] beat: Ticking with max interval->5.00 minutes
[2021-03-21 18:03:27,080: DEBUG/MainProcess] beat: Waking up in 32.91 seconds.
[2021-03-21 18:04:00,024: DEBUG/MainProcess] beat: Synchronizing schedule...
2021-03-21 18:04:00,041 | INFO | Self | 736a6a98bd5d456f8a253b5790f5a8e0 | 1 | celery.beat | beat:apply_entry | 271 | Scheduler: Sending due task task_weekday (tasks.my_regular_task)
[2021-03-21 18:04:00,041: INFO/MainProcess] Scheduler: Sending due task task_weekday (tasks.my_regular_task)
[2021-03-21 18:04:00,060: DEBUG/MainProcess] tasks.my_regular_task sent. id->bdffda9e-ab8a-41dd-a3b1-7ce62a1ab669
解决方法
task_name 是唯一索引。在表 periodtask 中,一个 task-name 只能出现一次。 所以你提交的两个作业意味着第二个会覆盖第一个。
您可以创建一个简单调用 abstract class Logger(vararg levels: LogLevel) {
private var nextLogger: Logger? = null
private val specifiedLevelsSet = EnumSet.copyOf(listOf(*levels))
fun appendNext(nextLogger: Logger): Logger {
this.nextLogger = nextLogger
return this
}
fun message(message: String,severity: LogLevel) {
if (specifiedLevelsSet.contains(severity)) {
log(message)
}
nextLogger?.message(message,severity)
}
abstract fun log(message: String)
}
class ConsoleLogger(vararg level: LogLevel) : Logger(*level) {
override fun log(message: String) {
System.err.println("Writing to console: $message")
}
}
class EmailLogger(vararg level: LogLevel) : Logger(*level) {
override fun log(message: String) {
System.err.println("Sending via email: $message")
}
}
class FileLogger(vararg level: LogLevel) : Logger(*level) {
override fun log(message: String) {
System.err.println("Writing to Log File: $message")
}
}
的任务函数 my_regualar_task_weekend()
,然后将周末计划应用于新任务。