package
whut.onetomany;
import
java.io.IOException;
import
java.util.Iterator;
import
org.apache.hadoop.conf.Configuration;
import
org.apache.hadoop.conf.Configured;
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.io.LongWritable;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.Mapper;
import
org.apache.hadoop.mapreduce.Mapper.Context;
import
org.apache.hadoop.mapreduce.Reducer;
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.input.FileSplit;
import
org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import
org.apache.hadoop.util.GenericOptionsParser;
import
org.apache.hadoop.util.Tool;
import
org.apache.hadoop.util.ToolRunner;
/**
 * MapReduce driver that joins two kinds of input records on a shared key.
 *
 * <p>Each input line is split on the double-quote character into a join key
 * (text before the first quote) and a payload (text after it). Records read
 * from a path containing {@code "trade"} are tagged {@code 0}; records read
 * from a path containing {@code "pay"} are tagged {@code 1}. The reducer emits
 * one {@code (first value, other value)} pair for every value after the first
 * in a group.
 *
 * <p>{@code TextIntPair}, {@code PartitionByText}, {@code TextComparator} and
 * {@code TextIntComparator} are declared outside this file. The join presumably
 * relies on them to partition/group by the text key and to sort tag 0 ahead of
 * tag 1 -- NOTE(review): confirm against those classes.
 */
public class JoinMain extends Configured implements Tool {

    /** Exit code returned by {@link #run(String[])} when too few paths are given. */
    private static final int EXIT_USAGE = 2;

    /**
     * Tags every input line with its source (trade or pay) and emits it under a
     * composite {@code (joinKey, tag)} key.
     */
    public static class JoinMapper extends Mapper<LongWritable, Text, TextIntPair, Text> {

        /** Separator between the join key and the payload on an input line. */
        private static final String FIELD_DELIMITER = "\"";

        /** Secondary-key tag for records read from a "trade" path. */
        private static final int TRADE_TAG = 0;

        /** Secondary-key tag for records read from a "pay" path. */
        private static final int PAY_TAG = 1;

        /** Counter group used to report skipped records. */
        private static final String COUNTER_GROUP = "JoinMapper";

        // Reused across map() calls to avoid allocating per record.
        private final TextIntPair tp = new TextIntPair();
        private final Text val = new Text();

        /**
         * Emits {@code ((joinKey, tag), payload)} for one input line.
         *
         * <p>Lines that do not contain at least a key and a payload, and lines
         * coming from a path that is neither a trade nor a pay input, are
         * skipped and counted instead of failing the task or emitting a record
         * with a stale tag/value.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   the raw input line
         * @param context task context used to read the split and write output
         * @throws IOException          if writing the output fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            // The full path (not just the file name) is matched, as before, so
            // inputs identified by their directory name keep working. "trade"
            // is checked first and wins if both substrings occur.
            String fileName = split.getPath().toString();

            final int tag;
            if (fileName.contains("trade")) {
                tag = TRADE_TAG;
            } else if (fileName.contains("pay")) {
                tag = PAY_TAG;
            } else {
                // Unknown source: writing here would reuse whatever tag and
                // value the shared objects currently hold.
                context.getCounter(COUNTER_GROUP, "UNKNOWN_SOURCE_RECORDS").increment(1);
                return;
            }

            String[] fields = value.toString().split(FIELD_DELIMITER);
            if (fields.length < 2) {
                // No delimiter, or nothing after it: there is no payload to join.
                context.getCounter(COUNTER_GROUP, "MALFORMED_RECORDS").increment(1);
                return;
            }

            tp.setFirstKey(fields[0]);
            tp.setSecondKey(tag);
            val.set(fields[1]);
            context.write(tp, val);
        }
    }

    /**
     * Pairs the first value of each group with every following value of that
     * group.
     */
    public static class JoinReducer extends Reducer<TextIntPair, Text, Text, Text> {

        /**
         * Writes {@code (first value, v)} for each value {@code v} after the
         * first. A group holding a single value produces no output.
         *
         * <p>NOTE(review): the first value is treated as the trade name, which
         * assumes the trade record sorts ahead of the pay records and that
         * every group has one. A group with only pay records would use a pay
         * value as the name -- confirm whether that can occur.
         *
         * @param key     composite key of the group (not read here)
         * @param values  values of the group, in the order the framework
         *                delivers them
         * @param context task context used to write output
         * @throws IOException          if writing the output fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(TextIntPair key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            Iterator<Text> valList = values.iterator();
            if (!valList.hasNext()) {
                return;
            }
            // Copy the first value: the framework may reuse the Text instance
            // handed out by the iterator on the next call to next().
            Text tradeName = new Text(valList.next());
            while (valList.hasNext()) {
                context.write(tradeName, valList.next());
            }
        }
    }

    /**
     * Configures and runs the join job, blocking until it finishes.
     *
     * @param args {@code args[0]} and {@code args[1]} are the two input paths,
     *             {@code args[2]} is the output path
     * @return {@code 0} if the job succeeded, {@code 1} if it failed, or
     *         {@code 2} if fewer than three paths were supplied
     * @throws Exception if job setup or execution fails
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 3) {
            System.err.println("Usage: JoinMain [generic options] <input1> <input2> <output>");
            ToolRunner.printGenericCommandUsage(System.err);
            return EXIT_USAGE;
        }

        Configuration conf = getConf();
        // NOTE(review): this constructor is deprecated on Hadoop 2+, where
        // Job.getInstance(conf, name) replaces it. It is kept because the
        // targeted Hadoop version is not visible from this file.
        Job job = new Job(conf, "JoinJob");
        job.setJarByClass(JoinMain.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);

        // Partitioner and comparators for the composite map output key.
        job.setPartitionerClass(PartitionByText.class);
        job.setGroupingComparatorClass(TextComparator.class);
        job.setSortComparatorClass(TextIntComparator.class);

        job.setMapOutputKeyClass(TextIntPair.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * Command-line entry point: runs the job through {@link ToolRunner} and
     * exits the JVM with the job's status code.
     *
     * @param args command-line arguments, passed to {@link ToolRunner}
     * @throws Exception if the job cannot be run
     */
    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new JoinMain(), args);
        System.exit(code);
    }
}