A case where MapReduce could not find its input files
The input files on HDFS are spread over two directories:
/20170503/shoplast/
/20170503/shop/
I wanted to filter out shop and feed only shoplast to the job, so I implemented the following filter:
```java
public static class FileNameFilter implements PathFilter {
    @Override
    public boolean accept(Path path) {
        if (path.getName().endsWith("last")) {
            return true;
        } else {
            return false;
        }
    }
}
```
Then I set the MapReduce input path to /20170503/* and ran the job...
Result: `Total input paths to process : 0` — zero input files! What is going on?
-----------------------------
After reading the source, I felt like an idiot. Here it is (`FileInputFormat.listStatus`):
```java
protected List<FileStatus> listStatus(JobContext job) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
        throw new IOException("No input paths specified in job");
    }

    // get tokens for all the required FileSystems..
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
                                        job.getConfiguration());

    // Whether we need to recursive look into the directory structure
    boolean recursive = getInputDirRecursive(job);

    List<IOException> errors = new ArrayList<IOException>();

    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
        filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    for (int i = 0; i < dirs.length; ++i) {
        Path p = dirs[i];
        FileSystem fs = p.getFileSystem(job.getConfiguration());
        FileStatus[] matches = fs.globStatus(p, inputFilter);
        if (matches == null) {
            errors.add(new IOException("Input path does not exist: " + p));
        } else if (matches.length == 0) {
            errors.add(new IOException("Input Pattern " + p +
                                       " matches 0 files"));
        } else {
            for (FileStatus globStat : matches) {
                if (globStat.isDirectory()) {
                    RemoteIterator<LocatedFileStatus> iter =
                        fs.listLocatedStatus(globStat.getPath());
                    while (iter.hasNext()) {
                        LocatedFileStatus stat = iter.next();
                        if (inputFilter.accept(stat.getPath())) {
                            if (recursive && stat.isDirectory()) {
                                addInputPathRecursively(result, fs,
                                    stat.getPath(), inputFilter);
                            } else {
                                result.add(stat);
                            }
                        }
                    }
                } else {
                    result.add(globStat);
                }
            }
        }
    }

    if (!errors.isEmpty()) {
        throw new InvalidInputException(errors);
    }
    LOG.info("Total input paths to process : " + result.size());
    return result;
}
```
Look closely at this part:
```java
for (FileStatus globStat : matches) {
    if (globStat.isDirectory()) {
        RemoteIterator<LocatedFileStatus> iter =
            fs.listLocatedStatus(globStat.getPath());
        while (iter.hasNext()) {
            LocatedFileStatus stat = iter.next();
            if (inputFilter.accept(stat.getPath())) {
                if (recursive && stat.isDirectory()) {
                    addInputPathRecursively(result, fs, stat.getPath(),
                                            inputFilter);
                } else {
                    result.add(stat);
                }
            }
        }
    } else {
        result.add(globStat);
    }
}
```
I had assumed the filter was applied only to the final input file names. In fact, when a matched path is a directory, the framework descends into it and applies the filter to every file inside — and file names like part-r-00000 do not end with "last".
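This two-phase behavior is easy to reproduce without a cluster. Below is a plain-Java sketch using `java.nio.file.Path` as a stand-in for `org.apache.hadoop.fs.Path` (an assumption: `getFileName()` here plays the role of Hadoop's `getName()`, and `part-r-00000` is a typical, hypothetical output file name). It shows the first filter letting the shoplast directory through the glob phase but rejecting every file inside it during the listing phase:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class FirstFilterDemo {
    // Same predicate as the first FileNameFilter, on java.nio.file.Path.
    static boolean accept(Path path) {
        return path.getFileName().toString().endsWith("last");
    }

    public static void main(String[] args) {
        // Glob phase: the filter is applied to the directories matched by /20170503/*.
        System.out.println(accept(Paths.get("/20170503/shoplast"))); // true  -> directory survives
        System.out.println(accept(Paths.get("/20170503/shop")));     // false -> directory filtered out

        // Listing phase: the SAME filter is then applied to each file inside
        // the surviving directory -- and rejects all of them.
        System.out.println(accept(Paths.get("/20170503/shoplast/part-r-00000"))); // false -> 0 inputs
    }
}
```

So the glob succeeds, no `InvalidInputException` is thrown, and the job quietly ends up with zero input paths — exactly the symptom above.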
OK — time to modify the filter.
```java
public static class FileNameFilter implements PathFilter {
    @Override
    public boolean accept(Path path) {
        if (path.getParent().getName().endsWith("last")) {
            return true;
        } else {
            return false;
        }
    }
}
```
Run it again, and... ta-da:
```
cause:org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input Pattern hdfs://20170503/* matches 0 files
Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input Pattern hdfs://20170503/* matches 0 files
```
Still no input files — why? Back to the source, the same method quoted above:
```java
Path p = dirs[i];
FileSystem fs = p.getFileSystem(job.getConfiguration());
FileStatus[] matches = fs.globStatus(p, inputFilter);
if (matches == null) {
    errors.add(new IOException("Input path does not exist: " + p));
} else if (matches.length == 0) {
    errors.add(new IOException("Input Pattern " + p +
                               " matches 0 files"));
}
```
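The same `java.nio.file` stand-in shows why the parent-name filter now dies even earlier, in the glob phase: the paths being tested there are the directories matched by /20170503/*, whose parent is /20170503 (again a Hadoop-free sketch of the predicate, not the real `PathFilter`):

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class SecondFilterDemo {
    // Same predicate as the modified FileNameFilter, with a null guard
    // for paths at the filesystem root.
    static boolean accept(Path path) {
        Path parent = path.getParent();
        return parent != null && parent.getFileName() != null
                && parent.getFileName().toString().endsWith("last");
    }

    public static void main(String[] args) {
        // Glob phase: the filter checks the PARENT of each matched directory,
        // i.e. "20170503" -- which does not end with "last".
        System.out.println(accept(Paths.get("/20170503/shoplast"))); // false
        System.out.println(accept(Paths.get("/20170503/shop")));     // false
        // Both rejected => matches.length == 0 => InvalidInputException.
    }
}
```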
I won't trace into `globStatus` here — read the source yourself if you are curious. The point is that the filter is applied to the parent directories matched by the glob as well.
Conclusion: the path filter is applied not only to the final input files but also to the parent directories along the input path, so `accept()` must return true at both levels.
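Putting the conclusion into code: a filter that survives both phases has to accept the shoplast directory itself during globbing *and* the files underneath it during listing. A minimal sketch of that combined predicate, again on `java.nio.file.Path` as a stand-in — in the real job the same logic would go into `PathFilter.accept(org.apache.hadoop.fs.Path)`:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class CombinedFilterDemo {
    // Accept a path if either the path itself (glob phase: directory names)
    // or its parent (listing phase: files inside the directory) ends with "last".
    static boolean accept(Path path) {
        if (path.getFileName().toString().endsWith("last")) {
            return true;
        }
        Path parent = path.getParent();
        return parent != null && parent.getFileName() != null
                && parent.getFileName().toString().endsWith("last");
    }

    public static void main(String[] args) {
        System.out.println(accept(Paths.get("/20170503/shoplast")));              // true:  passes the glob phase
        System.out.println(accept(Paths.get("/20170503/shop")));                  // false: unwanted directory rejected
        System.out.println(accept(Paths.get("/20170503/shoplast/part-r-00000"))); // true:  files inside pass the listing phase
    }
}
```

With both checks in place the glob keeps /20170503/shoplast, the listing keeps its files, and the job finally sees a non-zero input path count.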