根据输入路径产生输出路径和清除HDFS目录

##一、实验内容
对HDFS输入目录的路径解析出来,组成输出路径,这在业务上是十分常用的。这里其实是没有多文件名输出,仅仅是调用了MultipleOutputs的addNamedOutput方法一次,设置文件名为result.

同时为了保证计算的可重入性,每次都需要将已经存在的输出目录删除。

复制一份MapReduce1工程,取名为MapReduce3,对LogJob.java做些修改,主要用Path, FileSystem和Configuration三个类配合,删除HDFS已经存在的目录。
并且只设置了一个NamedOutput,名为result.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.shiyanlou.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class LogJob {

public static void main(String[] args) throws Exception {
// 输入路径
String inputPath = args[0];
if (inputPath.endsWith("/")) {
inputPath = inputPath.substring(0, inputPath.length() - 1);
}
// 输出路径
String outputPath = inputPath + "/output";

Configuration conf = new Configuration();
Job job = new Job(conf, "sum_did_from_log_file");
job.setJarByClass(LogJob.class);

job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

Path path1 = new Path(inputPath);
Path path2 = new Path(outputPath);

recreateFolder(path2, conf);

MultipleOutputs.addNamedOutput(job, "result", TextOutputFormat.class,
Text.class, IntWritable.class);

FileInputFormat.addInputPath(job, path1);
FileOutputFormat.setOutputPath(job, path2);

System.exit(job.waitForCompletion(true) ? 0 : 1);
}

/**
* 清除目录
*
* @param path
* @param conf
* @throws IOException
*/

private static void recreateFolder(Path path, Configuration conf)
throws IOException {

FileSystem fs = path.getFileSystem(conf);
if (fs.exists(path)) {
fs.delete(path);
}
}
}

LogReducer代码也需要修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.shiyanlou.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class LogReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

private MultipleOutputs outputs;

@Override
public void setup(Context context) throws IOException, InterruptedException {
System.out.println("enter LogReducer:::setup method");
outputs = new MultipleOutputs(context);
}

@Override
public void cleanup(Context context) throws IOException,
InterruptedException {

System.out.println("enter LogReducer:::cleanup method");
outputs.close();
}

public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {

System.out.println("enter LogReducer::reduce method");
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
System.out.println("key: " + key.toString() + " sum: " + sum);
outputs.write("result", key, sum);
}
}

get下来input/output文件夹,进入本地output文件夹查看:

$ ls
_logs  part-r-00000  result-r-00000  _SUCCESS

查看result-r-00000内容:

536dbacc4700aab274729cca    91
536dbae74700aab274729ccb    91
536dbb284700aab274729ccd    91
536dbb864700aab274729ccf    91
536dbba04700aab274729cd3    91
536dbba04700aab274729cd4    91
536dbba04700aab274729cd5    91
536dbba04700aab274729cd7    91
536dbba04700aab274729cd8    91
536dbba04700aab274729cd9    1
536dbba04700aab274729cdc    1
536dbba04700aab274729cdd    91
536dbba04700aab274729cde    152
536dbba04700aab274729ce0    87
536dbba04700aab274729ce1    87
536dbba04700aab274729ce2    87
536dbba04700aab274729ce3    87
536dbba04700aab274729ce4    91
536dbba04700aab274729ce5    91
536dbba04700aab274729ce8    152
536dbba04700aab274729ce9    91
536dbba14700aab274729cec    87
536dbba14700aab274729cee    87
536dbba14700aab274729cef    138
536dbba14700aab274729cf1    91
536dbba14700aab274729cf5    91
536dbba14700aab274729cf6    87
536dbba14700aab274729cf7    87
536dbba14700aab274729d02    87
536dbba14700aab274729d0a    87
536dbba14700aab274729d0d    87
536dbba14700aab274729d0f    1
536dbba14700aab274729d10    87
536dbba14700aab274729d12    87
536dbba14700aab274729d1c    152
536dbba14700aab274729d27    152

代码比之前的例子简单很多,仅仅是往一个named output “result” 写出结果。

二、小结

我们可以根据输入路径来生成输出路径,使用Path, FileSystem和Configuration三个类可以删除HDFS中的文件,相应地也可以创建文件。

Comments