问题描述
我想修改 bus scheduling problem from ortools 以便每个司机的班次在插槽方面是连续的,如果需要,司机可以同时共享一个班次。
例如,假设我们有以下半小时班次(格式类似于来自ortools的原始bus_scheduling_problem):
shifts = [
[0,'07:00','07:30',420,450,30],[1,'08:00',480,[2,'08:30',510,[3,'09:00',540,[4,'09:30',570,[5,'10:00',600,[6,'10:30',630,[7,'11:00',660,[8,'11:30',690,[9,'12:00',720,[10,'12:30',750,[11,'13:00',780,[12,'13:30',810,[13,'14:00',840,[14,'14:30',870,[15,'15:00',900,[16,'15:30',930,[17,'16:00',960,[18,'16:30',990,[19,'17:00',1020,[20,'17:30',1050,[21,'18:00',1080,[22,'18:30',1110,[23,'19:00',1140,[24,'19:30',1170,[25,'20:00',1200,[26,'20:30',1230,[27,'21:00',1260,[28,'21:30',1290,[29,'22:00',1320,[30,'22:30',1350,[31,'23:00',1380,[32,'23:30',1410,[33,'24:00',1440,30]
]
我成功执行了 this version of the bus_scheduling code 并且我发现我需要 2 个驱动程序来满足上述时间表的需求。工作时间范围从07:00 am to 24:00 (midnight)
开始。
因此,如果我们有 2 名巴士司机用于此时间表,我更愿意根据 12 小时司机班次分配涵盖每日值班的分配如下:
Driver 1: 07:00 - 19:00 with a break at 13:00
Driver 2: 12:00 - 24:00 with a break at 14:00 (basically no overlap with Driver 1's break)
我所说的连续小时的意思是,满足07:00-11:00 + 14:00-15:00 + 17:00-24:00
形式的12 小时司机 轮班解决方案的解决方案应该不被接受。具有更多驱动程序的解决方案还应确保休息时间不会重叠,以便并非所有驱动程序都在休息。此外,由于工作量大,休息槽可能会被堵塞。
我在 or-tools 讨论 中得到一个 答案 说我需要在每个节点维护自轮班开始以来的总时间,但我编码有困难,假设它解决了我的问题。
解决方法
对我来说,bus scheduling problem from ortools 对您的任务来说太过分了,因为您提到轮班持续时间总是 30
分钟,并且不需要设置/清理时间。此外,司机必须准确地工作 11
小时并有连续的休息时间。相反,我们可以编写一个类似于 nurse scheduling problem 的脚本,它可能更容易理解(对我来说,这是第一次用 or-tools 写东西,很清楚)。 >
准备
首先,总班次可以计算如下:
num_shifts = len(shifts)
所需的驱动程序数量:
num_drivers = ceil(float(num_shifts) / working_time)
在您的情况下,司机必须准确驾驶 11
小时,因此是 22
个班次(每个班次固定为 30
分钟):
working_time = 22
休息时间为 1
小时,所以:
break_time = 2
正如您在评论中提到的,每位司机在驾驶 4
小时后必须休息,但不得迟于 8
小时后:
break_interval = [8,16]
司机可以开始工作的最新班次:
latest_start_shift = num_shifts - working_time - break_time
真的,如果他/她晚点开始工作,那么司机就不会在整个工作时间内工作。
构建模型
让我们为司机定义一个班次数组:
driver_shifts = {}
for driver_id in range(num_drivers):
for shift_id in range(num_shifts):
driver_shifts[(driver_id,shift_id)] = model.NewBoolVar('driver%ishift%i' % (driver_id,shift_id))
driver_shifts[(d,s)]
等于 1
,如果班次 s
被分配给司机 d
,否则 0
。
另外,为司机创建一个开始班次的数组:
start_time = {}
for driver_id in range(num_drivers):
for shift_id in range(latest_start_shift + 1):
start_time[(driver_id,shift_id)] = model.NewBoolVar('driver%istart%i' % (driver_id,shift_id))
start_time[(d,s)]
等于 1
,如果司机 d
在班次 s
开始工作日,否则 0
。
司机每天开车正好 11 小时
每位司机必须在一天内准确驾驶所需的驾驶时间:
for driver_id in range(num_drivers):
model.Add(sum(driver_shifts[(driver_id,shift_id)] for shift_id in range(num_shifts)) == working_time)
然而,这还不够,因为驱动程序必须连续进行,中间有一个休息时间。我们稍后会看到如何做到这一点。
所有班次均由司机负责
每个班次必须由至少一名司机负责:
for shift_id in range(num_shifts):
model.Add(sum(driver_shifts[(driver_id,shift_id)] for driver_id in range(num_drivers)) >= 1)
驱动程序连续驱动
在这里 start_time
发挥作用。基本思想是,对于驱动程序的每个可能的开始时间,我们强制驱动程序在非工作时间工作(实际上,驱动程序每天只能开始工作一次!)。
因此,驱动程序每天只能开始工作一次:
for driver_id in range(num_drivers):
model.Add(sum(start_time[(driver_id,start_shift_id)] for start_shift_id in range(latest_start_shift + 1)) == 1)
对于驱动程序的每个开始时间,连续working_time + break_time
内的工作时间为working_time
for driver_id in range(num_drivers):
for start_shift_id in range(latest_start_shift + 1):
model.Add(sum(driver_shifts[(driver_id,shift_id)] for shift_id in
range(start_shift_id,start_shift_id + working_time + break_time)) == working_time) \
.OnlyEnforceIf(start_time[(driver_id,start_shift_id)])
中断是连续的
为此,我们需要一个额外的数组 break_ind[(d,s,b)]
来表示具有给定工作班次开始 d
的给定司机 s
是否在班次 b
处休息。因此,在这种情况下,休息时间的 driver_shifts
值应为 0
:
l = start_shift_id + break_interval[0]
r = start_shift_id + break_interval[1]
for s in range(l,r):
break_ind[(driver_id,start_shift_id,s)] = model.NewBoolVar("d%is%is%i"%(driver_id,s))
model.Add(sum(driver_shifts[(driver_id,s1)] for s1 in range(s,s + break_time)) == 0)\
.OnlyEnforceIf(start_time[(driver_id,start_shift_id)])\
.OnlyEnforceIf(break_ind[(driver_id,s)])
此外,司机每天只能休息一次:
model.Add(sum(break_ind[(driver_id,s)] for s in range(l,r)) == 1)
完整代码
您可以查看下面的完整代码或here(我添加了它以供将来参考)。您还可以在那里找到司机不休息的情况的简化版本。
from ortools.sat.python import cp_model
from math import ceil
shifts = [
[0,'07:00','07:30',420,450,30],[1,'08:00',480,[2,'08:30',510,[3,'09:00',540,[4,'09:30',570,[5,'10:00',600,[6,'10:30',630,[7,'11:00',660,[8,'11:30',690,[9,'12:00',720,[10,'12:30',750,[11,'13:00',780,[12,'13:30',810,[13,'14:00',840,[14,'14:30',870,[15,'15:00',900,[16,'15:30',930,[17,'16:00',960,[18,'16:30',990,[19,'17:00',1020,[20,'17:30',1050,[21,'18:00',1080,[22,'18:30',1110,[23,'19:00',1140,[24,'19:30',1170,[25,'20:00',1200,[26,'20:30',1230,[27,'21:00',1260,[28,'21:30',1290,[29,'22:00',1320,[30,'22:30',1350,[31,'23:00',1380,[32,'23:30',1410,[33,'24:00',1440,30]
]
class VarArraySolutionPrinter(cp_model.CpSolverSolutionCallback):
def __init__(self,driver_shifts,num_drivers,num_shifts,solutions):
cp_model.CpSolverSolutionCallback.__init__(self)
self.driver_shifts = driver_shifts
self.num_drivers = num_drivers
self.num_shifts = num_shifts
self.solutions = solutions
self.solution_id = 0
def on_solution_callback(self):
if self.solution_id in self.solutions:
self.solution_id += 1
print ("Solution found!")
for driver_id in range(self.num_drivers):
print ("*************Driver#%s*************" % driver_id)
for shift_id in range(self.num_shifts):
if (self.Value(self.driver_shifts[(driver_id,shift_id)])):
print('Shift from %s to %s' %
(shifts[shift_id][1],shifts[shift_id][2]))
print()
def solution_count(self):
return self.solution_id
solver = cp_model.CpSolver()
model = cp_model.CpModel()
num_shifts = len(shifts)
working_time = 22
break_time = 2
# when take a break within the working time
break_interval = [8,16]
latest_start_shift = num_shifts - working_time - break_time
num_drivers = ceil(float(num_shifts) / working_time)
# create an array of assignments of drivers
driver_shifts = {}
for driver_id in range(num_drivers):
for shift_id in range(num_shifts):
driver_shifts[(driver_id,shift_id))
# driver must work exactly {working_time} shifts
for driver_id in range(num_drivers):
model.Add(sum(driver_shifts[(driver_id,shift_id)] for shift_id in range(num_shifts)) == working_time)
# each shift must be covered by at least one driver
for shift_id in range(num_shifts):
model.Add(sum(driver_shifts[(driver_id,shift_id)] for driver_id in range(num_drivers)) >= 1)
# create an array of start times for drivers
start_time = {}
for driver_id in range(num_drivers):
for shift_id in range(latest_start_shift + 1):
start_time[(driver_id,shift_id))
break_ind = {}
for driver_id in range(num_drivers):
for start_shift_id in range(latest_start_shift + 1):
model.Add(sum(driver_shifts[(driver_id,start_shift_id)])
l = start_shift_id + break_interval[0]
r = start_shift_id + break_interval[1]
for s in range(l,r):
break_ind[(driver_id,s))
model.Add(sum(driver_shifts[(driver_id,s + break_time)) == 0)\
.OnlyEnforceIf(start_time[(driver_id,start_shift_id)])\
.OnlyEnforceIf(break_ind[(driver_id,s)])
model.Add(sum(break_ind[(driver_id,r)) == 1)
for driver_id in range(num_drivers):
model.Add(sum(start_time[(driver_id,start_shift_id)] for start_shift_id in range(latest_start_shift + 1)) == 1)
solution_printer = VarArraySolutionPrinter(driver_shifts,range(2))
status = solver.SearchForAllSolutions(model,solution_printer)