根据输入路径产生输出路径和清除HDFS目录
根据输入路径产生输出路径和清除HDFS目录
##一、实验内容
对HDFS输入目录的路径解析出来,组成输出路径,这在业务上是十分常用的。这里其实是没有多文件名输出,仅仅是调用了MultipleOutputs的addNamedOutput方法一次,设置文件名为result.
同时为了保证计算的可重入性,每次都需要将已经存在的输出目录删除。
复制一份MapReduce1工程,取名为MapReduce3,对LogJob.java做些修改,主要用Path, FileSystem和Configuration三个类配合,删除HDFS已经存在的目录。
并且只设置了一个NamedOutput,名为result.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65package com.shiyanlou.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class LogJob {
public static void main(String[] args) throws Exception {
// 输入路径
String inputPath = args[0];
if (inputPath.endsWith("/")) {
inputPath = inputPath.substring(0, inputPath.length() - 1);
}
// 输出路径
String outputPath = inputPath + "/output";
Configuration conf = new Configuration();
Job job = new Job(conf, "sum_did_from_log_file");
job.setJarByClass(LogJob.class);
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
Path path1 = new Path(inputPath);
Path path2 = new Path(outputPath);
recreateFolder(path2, conf);
MultipleOutputs.addNamedOutput(job, "result", TextOutputFormat.class,
Text.class, IntWritable.class);
FileInputFormat.addInputPath(job, path1);
FileOutputFormat.setOutputPath(job, path2);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
/**
* 清除目录
*
* @param path
* @param conf
* @throws IOException
*/
private static void recreateFolder(Path path, Configuration conf)
throws IOException {
FileSystem fs = path.getFileSystem(conf);
if (fs.exists(path)) {
fs.delete(path);
}
}
}
LogReducer代码也需要修改:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37package com.shiyanlou.mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class LogReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private MultipleOutputs outputs;
public void setup(Context context) throws IOException, InterruptedException {
System.out.println("enter LogReducer:::setup method");
outputs = new MultipleOutputs(context);
}
public void cleanup(Context context) throws IOException,
InterruptedException {
System.out.println("enter LogReducer:::cleanup method");
outputs.close();
}
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
System.out.println("enter LogReducer::reduce method");
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
System.out.println("key: " + key.toString() + " sum: " + sum);
outputs.write("result", key, sum);
}
}
get下来input/output文件夹,进入本地output文件夹查看:
$ ls
_logs part-r-00000 result-r-00000 _SUCCESS
查看result-r-00000内容:
536dbacc4700aab274729cca 91
536dbae74700aab274729ccb 91
536dbb284700aab274729ccd 91
536dbb864700aab274729ccf 91
536dbba04700aab274729cd3 91
536dbba04700aab274729cd4 91
536dbba04700aab274729cd5 91
536dbba04700aab274729cd7 91
536dbba04700aab274729cd8 91
536dbba04700aab274729cd9 1
536dbba04700aab274729cdc 1
536dbba04700aab274729cdd 91
536dbba04700aab274729cde 152
536dbba04700aab274729ce0 87
536dbba04700aab274729ce1 87
536dbba04700aab274729ce2 87
536dbba04700aab274729ce3 87
536dbba04700aab274729ce4 91
536dbba04700aab274729ce5 91
536dbba04700aab274729ce8 152
536dbba04700aab274729ce9 91
536dbba14700aab274729cec 87
536dbba14700aab274729cee 87
536dbba14700aab274729cef 138
536dbba14700aab274729cf1 91
536dbba14700aab274729cf5 91
536dbba14700aab274729cf6 87
536dbba14700aab274729cf7 87
536dbba14700aab274729d02 87
536dbba14700aab274729d0a 87
536dbba14700aab274729d0d 87
536dbba14700aab274729d0f 1
536dbba14700aab274729d10 87
536dbba14700aab274729d12 87
536dbba14700aab274729d1c 152
536dbba14700aab274729d27 152
代码比之前的例子简单很多,仅仅是往一个named output “result” 写出结果。
二、小结
我们可以根据输入路径来生成输出路径,使用Path, FileSystem和Configuration三个类可以删除HDFS中的文件,相应地也可以创建文件。