/**
 * Builds the list of input entries for the given job.
 *
 * <p>Each configured input path is expanded with
 * {@code FileSystem.globStatus} using a filter composed of
 * {@code hiddenFileFilter} and, when one is configured, the job's own
 * input path filter. Glob matches that are not directories are added
 * as-is. Glob matches that are directories are listed one level deep;
 * each accepted child is added directly, unless recursive listing is
 * enabled and the child is itself a directory, in which case it is handed
 * to {@code addInputPathRecursively}.
 *
 * <p>Problems with individual input paths do not abort the scan; they are
 * collected and reported together once every path has been examined.
 *
 * @param job the job whose input paths are listed
 * @return the collected {@code FileStatus} entries
 * @throws IOException if the job has no input paths; an
 *         {@code InvalidInputException} wrapping all collected errors is
 *         thrown if any input path yields a {@code null} glob result or
 *         matches zero files
 */
protected List<FileStatus> listStatus(JobContext job) throws IOException {
  Path[] inputDirs = getInputPaths(job);
  if (inputDirs.length == 0) {
    throw new IOException("No input paths specified in job");
  }

  // Acquire tokens for every namenode involved before touching any file system.
  TokenCache.obtainTokensForNamenodes(job.getCredentials(), inputDirs,
      job.getConfiguration());

  boolean descend = getInputDirRecursive(job);

  // Compose the filter: hiddenFileFilter always, the job's filter only if set.
  List<PathFilter> filterChain = new ArrayList<PathFilter>();
  filterChain.add(hiddenFileFilter);
  PathFilter userFilter = getInputPathFilter(job);
  if (userFilter != null) {
    filterChain.add(userFilter);
  }
  PathFilter combinedFilter = new MultiPathFilter(filterChain);

  List<FileStatus> collected = new ArrayList<FileStatus>();
  List<IOException> problems = new ArrayList<IOException>();

  for (Path inputDir : inputDirs) {
    FileSystem fileSystem = inputDir.getFileSystem(job.getConfiguration());
    FileStatus[] globbed = fileSystem.globStatus(inputDir, combinedFilter);

    // Record the failure and keep going so that all bad paths are reported at once.
    if (globbed == null) {
      problems.add(new IOException("Input path does not exist: " + inputDir));
      continue;
    }
    if (globbed.length == 0) {
      problems.add(
          new IOException("Input Pattern " + inputDir + " matches 0 files"));
      continue;
    }

    for (FileStatus match : globbed) {
      if (!match.isDirectory()) {
        collected.add(match);
        continue;
      }

      // Directory match: list its immediate children.
      RemoteIterator<LocatedFileStatus> children =
          fileSystem.listLocatedStatus(match.getPath());
      while (children.hasNext()) {
        LocatedFileStatus child = children.next();
        if (!combinedFilter.accept(child.getPath())) {
          continue;
        }
        if (descend && child.isDirectory()) {
          addInputPathRecursively(collected, fileSystem, child.getPath(),
              combinedFilter);
        } else {
          collected.add(child);
        }
      }
    }
  }

  if (!problems.isEmpty()) {
    throw new InvalidInputException(problems);
  }

  LOG.info("Total input paths to process : " + collected.size());
  return collected;
}