是否可以将参数传递给工作先生

问题描述

给出来自 mrJob 站点的字数统计程序的基本示例:

from mrjob.job import MRJob


class MRWordFrequencyCount(MRJob):

    def mapper(self,_,line):
        yield "chars",len(line)
        yield "words",len(line.split())
        yield "lines",1

    def reducer(self,key,values):
        yield key,sum(values)


if __name__ == '__main__':
    MRWordFrequencyCount.run()

从命令行,这个例子可以作为 python mrJobFilename.py mrJobFilename.py 运行。这应该自己运行程序并计算文件中的字数。

在这个例子中,如果我想传入一个参数怎么办,比如 minCount = 3。有了这个参数,reducer 只会返回计数超过 minCount 的单词。

from mrjob.job import MRJob


class MRWordFrequencyCount(MRJob):

    def mapper(self,values):
        X = sum(values)
        if X > minCount:
            yield key,sum(values)


if __name__ == '__main__':
    MRWordFrequencyCount.run()

我尝试将 minWord 作为参数传递:python mrJobFilename.py mrJobFilename.py 3,但出现错误 OSError: Input path 3 does not exist!

我也试过用 sysArg 设置一个变量:

if __name__ == '__main__':
    minWord = sys.argv[1]
    MRWordFrequencyCount.run()

使用 python mrJobFilename.py mrJobFilename.py < 3 运行时出现错误 bash: 3: No such file or directory。如果我不使用 <,我会收到之前的输入文件未找到错误

最后,我尝试输入第二个 csv 文件。 csv 文件有 2 行,如下所示:

minWord
3

这意味着将参数传递给 mrJobs,因为它不断给我错误,即第二个参数不是输入文件。我使用 mapper_raw 尝试加载它,但出现奇怪的错误UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8f in position 22: invalid start byte

from mrjob.job import MRJob


class MRWordFrequencyCount(MRJob):

    def mapper_raw(self,input_arg1,input_arg2):
        import csv
        f = open(input_path2)
        reader = csv.reader(f)
        next(reader) # skip header
        yield(next(reader))

    def steps(self):
          return [
              MRStep(mapper_raw=self.mapper_raw)
          ]


if __name__ == '__main__':
    MRWordFrequencyCount.run()

如何将参数传递给 mrJob?最终,我需要这样做来传递我想要并行求解的微分方程系统的参数。

解决方法

您可以按照 mrjob document 添加命令行参数,如 argparse

所以你的代码应该是这样的:

from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):

    def configure_args(self):
        super(MRWordFrequencyCount,self).configure_args()
        self.add_passthru_arg("-m","--minCount",help="your argument description")

    def mapper(self,_,line):
        yield "chars",len(line)
        yield "words",len(line.split())
        yield "lines",1

    def reducer(self,key,values):
        X = sum(values)
        if X > self.options.minCount:
            yield key,sum(values)


if __name__ == '__main__':
    MRWordFrequencyCount.run()

将您的论点与 self.options.minCount 一起使用。

运行命令:

python code.py input.txt --minCount 4