问题描述
我编写了以下代码来查找最高温度,但是当我尝试检索输出时,文件已创建但为空。我不太明白为什么会这样...有人可以帮忙吗?
我的驱动程序(runner)代码:
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
/**
 * Driver for the max-temperature job, written against the classic
 * {@code org.apache.hadoop.mapred} API.
 *
 * <p>Usage: {@code MaxTemp <input path> <output path>}.
 */
public class MaxTemp {
    /**
     * Configures the job and submits it through {@link JobClient#runJob}.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws IOException propagated from {@code JobClient.runJob}
     */
    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(MaxTemp.class);
        conf.setJobName("MaxTemp1");

        // Plain text in, plain text out.
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // The job emits (Text, IntWritable) pairs.
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        // The reducer class is also registered as the combiner.
        conf.setMapperClass(MaxTempMapper.class);
        conf.setCombinerClass(MaxTempReducer.class);
        conf.setReducerClass(MaxTempReducer.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
映射器代码:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class MaxTempMapper extends MapReduceBase implements Mapper<LongWritable,Text,IntWritable> {
public void map(LongWritable key,Text value,OutputCollector<Text,IntWritable> output,Reporter reporter) throws IOException {
String record = value.toString();
String[] parts = record.split(",");
output.collect(new Text(parts[0]),new IntWritable(Integer.parseInt(parts[1])));
}
}
我的 Reducer 代码:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class MaxTempReducer extends MapReduceBase implements Reducer<Text,IntWritable,IntWritable> {
public void reduce1(Text key,Iterator<IntWritable> values,Reporter reporter) throws IOException {
int maxValue = 0;
while (values.hasNext()) {
maxValue=Math.max(maxValue,values.next().get());
}
output.collect(key,new IntWritable(maxValue));
}
@Override
public void reduce(Text arg0,Iterator<IntWritable> arg1,IntWritable> arg2,Reporter arg3) throws IOException {
// Todo Auto-generated method stub
}
}
我将附加输出屏幕截图
解决方法
我注意到,在 MaxTempReducer 类内部,您把真正的处理逻辑写在了 reduce1 函数里,同时又重写(override)了 Reducer 接口中真正应该使用的 reduce 函数,但它的函数体是空的。
这就是您在 HDFS 中得不到任何输出的原因:程序能找到 Reducer 类,却找不到描述如何处理数据(也就是求温度最大值)的 reduce 函数实现。
还有一个问题:您使用的是旧版 Hadoop 中已弃用的类(org.apache.hadoop.mapred 包)。建议改用较新的 org.apache.hadoop.mapreduce API,因为新 API 会随着框架及其组件的更新而持续得到测试(您可以自行查阅相关文档)。
因此,通过解决这两个问题,您的程序可能看起来像这样:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.Counters;
import java.io.*;
import java.io.IOException;
import java.util.*;
import java.nio.charset.StandardCharsets;
public class MaxTemp
{
/* input: <byte_offset,line_of_dataset>
* output: <City,Temperature>
*/
public static class Map extends Mapper<Object,Text,IntWritable>
{
public void map(Object key,Text value,Context context) throws IOException,InterruptedException
{
String record = value.toString();
String[] parts = record.split(",");
context.write(new Text(parts[0]),new IntWritable(Integer.parseInt(parts[1])));
}
}
/* input: <City,Temperature>
* output: <City,Max Temperature>
*/
public static class Reduce extends Reducer<Text,IntWritable,IntWritable>
{
public void reduce(Text key,Iterable<IntWritable> values,InterruptedException
{
int max_value = 0;
for(IntWritable value : values)
{
if(value.get() > max_value)
max_value = value.get();
}
context.write(key,new IntWritable(max_value));
}
}
public static void main(String[] args) throws Exception
{
// set the paths of the input and output directories in the HDFS
Path input_dir = new Path("temperatures");
Path output_dir = new Path("temp_out");
// in case the output directory already exists,delete it
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
if(fs.exists(output_dir))
fs.delete(output_dir,true);
// configure the MapReduce job
Job maxtemp_job = Job.getInstance(conf,"Max Temperature");
maxtemp_job.setJarByClass(MaxTemp.class);
maxtemp_job.setMapperClass(Map.class);
maxtemp_job.setCombinerClass(Reduce.class);
maxtemp_job.setReducerClass(Reduce.class);
maxtemp_job.setMapOutputKeyClass(Text.class);
maxtemp_job.setMapOutputValueClass(IntWritable.class);
maxtemp_job.setOutputKeyClass(Text.class);
maxtemp_job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(maxtemp_job,input_dir);
FileOutputFormat.setOutputPath(maxtemp_job,output_dir);
maxtemp_job.waitForCompletion(true);
}
}
HDFS 中 temperatures 目录下的输入数据如下所示:
Boston,3
Athens,15
Tokyo,20
Tokyo,10
Athens,32
Boston,9