在hadoop中执行另一项工作

问题描述

| 我不明白如何使工作使用相同的输出目录 目录以在其中写入其他文件。我尝试过通勤 并注释这条线,但仍然无法正常工作。我得到以下 当我评论它时例外。无论如何,我在代码中尝试运行两个 使用相同的reducer但使用不同的mapper的单独作业。 编辑:不,一个工作的输出不是另一个工作的输入,原因 我希望它们在同一文件夹中是因为它们是另一张地图的输入 减少我想做的工作。
FileOutputFormat.setOutputPath(job,new Path(args[1]));

11/04/14 13:33:11 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
Exception in thread \"main\" org.apache.hadoop.mapred.InvalidJobConfException: Output directory not set.
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:120)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:770)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:432)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:447)
    at org.myorg.WordCount.main(WordCount.java:123)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
        // if (otherArgs.length != 2) {
        //   System.err.println(\"Usage: wordcount <in> <out>\");
        //   System.exit(2);
        // }
        Job job = new Job(conf,\"Job1\");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(Mapper1.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        job.waitForCompletion(true);

        FileSystem hdfs = FileSystem.get(conf);
        Path fromPath = new Path(\"/user/hadoop/output/part-r-00000\");
        Path toPath = new Path(\"/user/hadoop/output/output1\");

        // renaming to output1
        boolean isRenamed = hdfs.rename(fromPath,toPath);
        if (isRenamed)
        {
            System.out.println(\"Renamed to /user/hadoop/output/output1!\");
        }
        else
        {
            System.out.println(\"Not Renamed!\");
        }

        job = new Job(conf,\"Job2\");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(Mapper2.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // FileInputFormat.addInputPath(job,new Path(args[0]));
        // FileOutputFormat.setOutputPath(job,new Path(args[1]));


        System.exit( job.waitForCompletion(true) ? 0 : 1);
    }
将以下内容添加到我的代码中会导致其他错误:
    job.setInputFormatClass(FileInputFormat.class);
    job.setOutputFormatClass(FileOutputFormat.class);

    FileInputFormat.addInputPath(job,new Path(args[0]));
    FileOutputFormat.setOutputPath(job,new Path(args[1]));
        System.exit(job.waitForCompletion(true)?0:1);
Exception in thread \"main\" java.lang.RuntimeException: java.lang.InstantiationException
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:115)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:768)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:432)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:447)
    at org.myorg.WordCount.main(WordCount.java:135)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Caused by: java.lang.InstantiationException
    at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:30)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:113)
    ... 9 more
    

解决方法

所有文件都不必位于同一目录中。您的第三项作业可以具有多个输入路径(目录或文件)。 看到
FileInputFormat.addInputPaths(JobConf conf,String commaSeparatedPaths)
和朋友...     ,您必须为第二个作业提供一个新的Configuration对象。顺便说一句,为什么不对输出格式使用这些方法?   job.setInputFormatClass(FileInputFormat.class);      job.setOutputFormatClass(FileOutputFormat.class); 这是一篇关于递归工作的博客文章,这与您正在做的事情完全相同。 http://codingwiththomas.blogspot.com/2011/04/controlling-hadoop-job-recursion.html 编辑: 顺便说一句,您打算将一个文件夹写入到哪个文件夹中,该文件夹既是前一个作业的输出又是新作业的输入?这只会导致另一个异常,例如:\“输出路径已存在\”。     

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...