MRJob - 在 hadoop 作业中使用外部库

问题描述

我在本地机器上设置了一个 2 节点的 hadoop 集群。我使用 MRJob 创建了一个作业类。我已经按照文档中的说明使用 MRJob 类中的 Dirs 属性包含了 numpy 包的路径。但是当我尝试在 hadoop 上运行作业时,它给了我一个错误。我观察到数据节点的临时目录中存在一个 numpy.tar.gz 文件,但它没有解压。

我的工作班级:

from mrjob.job import MRJob

class AttributeSplitter(MRJob):

    Dirs = ['~/.local/lib/python3.6/site-packages/numpy#my_numpy']

    def mapper(self,_,line):
        from my_numpy import log
        yield int(line),(0,log(0.5))

    def reducer(self,key,values):
        values = list(values)
        values.sort(key=lambda x: x[1])
        yield key,(values[0][0],values[0][1])


if __name__ == '__main__':
    AttributeSplitter.run()

产生的错误是:

Probable cause of failure:

Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess Failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupinformation.doAs(UserGroupinformation.java:1730)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)


Traceback (most recent call last):
  File "train_model.py",line 40,in <module>
    train_model(args.input,args.output)    
  File "train_model.py",line 27,in train_model
    model.fit(X_train,y_train)
  File "/home/hadoopuser/hadoop-movie-rating-prediction/random_forest_classifier.py",line 39,in fit
    model.fit(X_[sample],y_[sample])
  File "/home/hadoopuser/hadoop-movie-rating-prediction/decision_tree.py",line 171,in fit
    self.__generate_tree(self.tree_,X_,y_,weights,feature_types)
  File "/home/hadoopuser/hadoop-movie-rating-prediction/decision_tree.py",line 186,in __generate_tree
    best_feature_split = self.__split_attribute(tree,X,y,feature_types)        
  File "/home/hadoopuser/hadoop-movie-rating-prediction/decision_tree.py",line 241,in __split_attribute
    runner.run()
  File "/home/hadoopuser/.local/lib/python3.6/site-packages/mrjob/runner.py",line 503,in run
    self._run()
  File "/home/hadoopuser/.local/lib/python3.6/site-packages/mrjob/hadoop.py",line 329,in _run
    self._run_job_in_hadoop()
  File "/home/hadoopuser/.local/lib/python3.6/site-packages/mrjob/hadoop.py",line 407,in _run_job_in_hadoop
    num_steps=self._num_steps())
mrjob.step.StepFailedException: Step 1 of 1 Failed: Command '['/usr/local/hadoop/bin/hadoop','jar','/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar','-files','hdfs:///user/hadoopuser/tmp/mrjob/attribute_splitter.hadoopuser.20210531.204013.120457/files/wd/attribute_splitter.py#attribute_splitter.py,hdfs:///user/hadoopuser/tmp/mrjob/attribute_splitter.hadoopuser.20210531.204013.120457/files/wd/mrjob.zip#mrjob.zip,hdfs:///user/hadoopuser/tmp/mrjob/attribute_splitter.hadoopuser.20210531.204013.120457/files/wd/setup-wrapper.sh#setup-wrapper.sh','-archives','hdfs:///user/hadoopuser/tmp/mrjob/attribute_splitter.hadoopuser.20210531.204013.120457/files/wd/numpy.tar.gz#my_numpy','-input','hdfs:///user/hadoopuser/tmp/mrjob/attribute_splitter.hadoopuser.20210531.204013.120457/files/split_data_bbdac699-2663-4fcc-ae51-7b5783d8fe2c.txt','-output','hdfs:///user/hadoopuser/tmp/mrjob/attribute_splitter.hadoopuser.20210531.204013.120457/output','-mapper','/bin/sh -ex setup-wrapper.sh python3 attribute_splitter.py --step-num=0 --mapper','-reducer','/bin/sh -ex setup-wrapper.sh python3 attribute_splitter.py --step-num=0 --reducer']' returned non-zero exit status 256.

此外,我尝试通过配置 mrjob.conf 文件来显式安装 numpy(这也是 MRJob 文档中推荐的):

runners:
  hadoop:
    setup:
    - VENV=/tmp/$mapreduce_job_id
    - if [ ! -e $VENV ]; then virtualenv $VENV; fi
    - . $VENV/bin/activate
    - pip3 install numpy

不幸的是,它们都不起作用,我的想法也用完了。知道可能是什么问题吗?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)