问题描述
考虑以下蛇形工作流程(在此gist中完成):
我有一组预定义的参数来定义我的工作流通道:
PAR={
"id_list": range(1,10),}
我需要暂存数据,这里通过创建具有随机数的文件来模拟:
rule stage:
output: "in/{id}.old"
shell: "echo $RANDOM > {output}"
我有一个收集所有暂存文件名的功能和一个汇总暂存步骤的附带规则:
def get_all_dat(wildcards):
out=[]
for i in PAR["id_list"]:
dat=rules.stage.output[0].format(id=i)
out.append(dat)
return out
rule stage_all:
input: get_all_dat
output: "in/staged.list"
shell: "for i in {input}; do echo $i; done > {output}"
我绝对不需要get_all_dat
函数来完成本例中的简单操作(在expand
的输入上输入stage_all
就可以了),但是我决定将其包含在此处因为它与我的实际工作流程相匹配,在该工作流程中有一些通配符,它们都需要对齐,此功能可以确保这些通配符。
然后进入处理步骤:
rule process:
input:
list="in/staged.list",util="process.sh"
output: "out/{id}.new",shell: "./{input.util} $(cat {input.list})"
它获取来自stage_all
规则的文件列表,并将内容传递到process.sh
脚本。该脚本实质上对in/{id}.old
进行了一些虚拟更改,并写入了out/{id}.new
,有关确切代码,请参考gist。
关键,此过程将读取所有 in/{id}.old
个文件并创建所有 out/{id}.new
个文件。在这里,工作流通道变得混乱。与get_all_dat
函数一样,此“处理”是一个示例;我的实际工作流程中的实际处理无法分为单独的{id}
通道。
下一步是“绘图”:
rule plot:
input: "out/{id}.new"
output: "out/{id}.plot"
shell: "echo \"plot of $(cat {input})\" > {output}"
...拥有自己的聚合器(就像分段步骤一样):
def get_all_plot(wildcards):
out=[]
for i in PAR["id_list"]:
dat=rules.plot.output[0].format(id=i)
out.append(dat)
return out
rule plot_all:
input: get_all_plot
output: "out/plotted.list"
shell: "for i in {input}; do echo $i; done > {output}"
规则process
的主要问题在于,每个out/{id}.new
文件将发起一个对process.sh
的新调用,并发读取所有in/{id}.old
文件,并发同时写入所有{{1 }},这不好。我向out/{id}.new
添加了一些代码,以计算该脚本被调用的次数,请参见gist。
我尝试过的事情:
- 使用bash和lock文件以及flock强制额外的调用,以等待幸运的第一个
process.sh
线程完成,然后继续进行而没有错误; - 在规则
process.sh
的{{1}}中使用directory("out")
; - 添加将
output:
连接到process
的附加规则:
out/{id}.new
后果:
- 种族条件充裕,实际上没有很好的方法来确保仅执行一个
directory("out")
并snakemake删除rule connector: input: "out",output: "out/{id}.new",
文件(应如此),因为当相应的{{1 }}process.sh
规则首次被调用; - 工作流程中断,因为没有
out/{id}.new
与{id}
之间的连接; -
process
我的意图是使用out/{id}.new
作为目标,使用任意数量的内核(要等一个directory("out")
线程完成)来运行完整的工作流。原因是ChildIOException: File/directory is a child to another output:
步骤很便宜,而out/plotted.list
步骤很昂贵,并且process.sh
可以有很多值。
感谢您与我保持联系。
解决方法
那呢:
rule process:
input:
list="in/staged.list",util="process.sh",output:
touch('out/staged.list'),shell:
"./{input.util} $(cat {input.list})"
rule plot:
input:
list= 'out/staged.list ',output:
"out/{id}.plot"
params:
id= lambda wc: "out/%s.new" % wc.id,shell:
r"""
echo "plot of $(cat {params.id})" > {output}
"""
规则process
会输出一个伪文件,因此它只能执行一次,但会产生所有必需的文件。
规则plot
从上方接受虚拟文件的输入,因此它的实际输入也必须存在。实际的输入文件将作为参数传递。
接下来的规则应该能够正常使用"out/{id}.plot"
。