package IT01;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
public class HbaseApp {

    public static String path1 = "hdfs://hadoop80:9000/FlowData.txt";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "hbase.rootdir" (the original listing had a typo, "hbaser.rootdir") and
        // the ZooKeeper quorum tell the HBase client where the cluster lives.
        conf.set("hbase.rootdir", "hdfs://hadoop80:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "hadoop80");
        // Target table for TableOutputFormat.
        conf.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");
        conf.set("dfs.socket.timeout", "2000");

        Job job = new Job(conf, "HbaseApp");
        job.setJarByClass(HbaseApp.class); // ship the job jar to the cluster

        FileInputFormat.setInputPaths(job, new Path(path1));
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setNumReduceTasks(1);
        job.setPartitionerClass(HashPartitioner.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputFormatClass(TableOutputFormat.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
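
    /*
     * MyMapper below expects each line of FlowData.txt to carry 11 tab-separated
     * fields: reportTime, msisdn, apmac1, apmac2, host, sitetype, upPackNum,
     * downPackNum, upPayLoad, downPayLoad, httpstatus (the same order in which
     * MyReducer writes them out as column qualifiers). The first two fields are
     * combined into the row key "msisdn:yyyyMMddHHmmss".
     */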
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            String[] splited = v1.toString().split("\t");
            String reportTime = splited[0];
            String msisdn = splited[1];
            // Convert the epoch-millisecond reportTime into a sortable timestamp
            // and build the row key as "msisdn:yyyyMMddHHmmss".
            Date date = new Date(Long.parseLong(reportTime));
            String time = DateConvert.dateParse(date);
            String rowkey = msisdn + ":" + time;
            context.write(new Text(rowkey), new Text(v1.toString()));
        }
    }
    public static class MyReducer extends TableReducer<Text, Text, NullWritable> {
        /** Column qualifiers, in the same order as the tab-separated input fields. */
        private static final String[] COLUMNS = {
                "reportTime", "msisdn", "apmac1", "apmac2", "host", "sitetype",
                "upPackNum", "downPackNum", "upPayLoad", "downPayLoad", "httpstatus"
        };

        @Override
        protected void reduce(Text k2, Iterable<Text> v2s, Context context)
                throws IOException, InterruptedException {
            for (Text v2 : v2s) {
                String[] splited = v2.toString().split("\t");
                // When inserting a record, specify the row key, column family,
                // column qualifier, and value.
                Put put = new Put(k2.toString().getBytes());
                for (int i = 0; i < COLUMNS.length; i++) {
                    put.add("cf".getBytes(), COLUMNS[i].getBytes(), splited[i].getBytes());
                }
                context.write(NullWritable.get(), put);
            }
        }
    }
}
class DateConvert {
    public static String dateParse(Date date) {
        // Use HH (24-hour clock) rather than hh, so row keys sort chronologically.
        SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmmss");
        return df.format(date);
    }
}
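
/*
 * The job above assumes the HBase table "wlan_log" with column family "cf"
 * already exists. A minimal sketch for creating it with the same-era HBaseAdmin
 * API follows; the class name CreateWlanLogTable is illustrative, not part of
 * the original program, and it additionally needs imports for HBaseAdmin,
 * HTableDescriptor and HColumnDescriptor from the HBase client.
 */
class CreateWlanLogTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "hadoop80");
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists("wlan_log")) {
            HTableDescriptor desc = new HTableDescriptor("wlan_log");
            desc.addFamily(new HColumnDescriptor("cf")); // single family used by MyReducer
            admin.createTable(desc);
        }
        admin.close();
    }
}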