package
whut;
import
java.io.IOException;
import
java.util.ArrayList;
import
java.util.List;
import
org.apache.commons.logging.Log;
import
org.apache.commons.logging.LogFactory;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.InputFormat;
import
org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.RecordReader;
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import
org.apache.pig.LoadFunc;
import
org.apache.pig.backend.executionengine.ExecException;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import
org.apache.pig.data.DataByteArray;
import
org.apache.pig.data.Tuple;
import
org.apache.pig.data.TupleFactory;
/**
 * A character range used to cut one field out of a line.
 *
 * <p>The range is half-open: {@link #getSubString(String)} returns the characters from
 * index {@code start} (inclusive) up to index {@code end} (exclusive), exactly as
 * {@link String#substring(int, int)} does.
 *
 * <p>Ranges are written as {@code start~end}; several ranges are joined with commas,
 * for example {@code 2~7,10~19}.
 */
class Range {
    /** Accepted pattern syntax: one or more comma-separated {@code number~number} pairs. */
    private static final String RANGE_LIST_REGEX = "\\d+~\\d+(,\\d+~\\d+)*";

    /** Index of the first character of the field (inclusive). */
    private int start;

    /** Index just past the last character of the field (exclusive). */
    private int end;

    /**
     * Parses a cut pattern such as {@code 2~7,10~19} into a list of ranges, in the order
     * they appear in the pattern.
     *
     * @param cutStr the cut pattern; must consist only of comma-separated
     *               {@code number~number} pairs
     * @return one {@code Range} per pair, never {@code null}
     * @throws Exception if {@code cutStr} is {@code null}, does not match the expected
     *                   syntax, or contains a pair whose first number is greater than
     *                   its second number
     * @throws NumberFormatException if a number is syntactically valid but does not fit
     *                               in an {@code int}
     */
    public static List<Range> parse(String cutStr) throws Exception {
        // A null pattern is reported with the same usage message as any other malformed
        // pattern instead of surfacing as a bare NullPointerException.
        if (cutStr == null || !cutStr.matches(RANGE_LIST_REGEX)) {
            throw new Exception(
                    "InputFormat Error:\n"
                            + "Usage:number~number,number~number;Such 2~7,10~19");
        }

        List<Range> rangeList = new ArrayList<Range>();
        for (String sub : cutStr.split(",")) {
            // The regex above guarantees exactly two numeric parts around '~'.
            String[] bounds = sub.split("~");
            int subStart = Integer.parseInt(bounds[0]);
            int subEnd = Integer.parseInt(bounds[1]);
            // Equal bounds are allowed and describe an empty field.
            if (subStart > subEnd) {
                throw new Exception(
                        "InputFormat Error:\n"
                                + "Detail:first number must not be greater than second number");
            }
            Range range = new Range();
            range.setStart(subStart);
            range.setEnd(subEnd);
            rangeList.add(range);
        }
        return rangeList;
    }

    /** @return the inclusive start index */
    public int getStart() {
        return start;
    }

    /** @param start the inclusive start index */
    public void setStart(int start) {
        this.start = start;
    }

    /** @return the exclusive end index */
    public int getEnd() {
        return end;
    }

    /** @param end the exclusive end index */
    public void setEnd(int end) {
        this.end = end;
    }

    /**
     * Extracts this range from the given string.
     *
     * @param inStr the string to cut from
     * @return {@code inStr.substring(start, end)}
     * @throws IndexOutOfBoundsException if the range does not fit inside {@code inStr}
     *                                   (as raised by {@link String#substring(int, int)})
     */
    public String getSubString(String inStr) {
        return inStr.substring(start, end);
    }
}
/**
 * A Pig {@link LoadFunc} that reads text lines and cuts each line into fields at fixed
 * character positions.
 *
 * <p>The positions are given to the constructor as a cut pattern such as
 * {@code 2~7,10~19} (see {@link Range#parse(String)}). Each produced tuple holds one
 * {@link DataByteArray} per range, in pattern order.
 */
public class LineLoadFunc extends LoadFunc {
    private static final Log LOG = LogFactory.getLog(LineLoadFunc.class);

    private final TupleFactory tupleFactory = TupleFactory.getInstance();

    /** Reader handed over in {@link #prepareToRead(RecordReader, PigSplit)}. */
    private RecordReader reader;

    /** Field ranges parsed once from the constructor's cut pattern. */
    private final List<Range> ranges;

    /**
     * Creates a loader for the given cut pattern.
     *
     * @param cutPattern comma-separated {@code start~end} pairs, e.g. {@code 2~7,10~19}
     * @throws Exception if the pattern is rejected by {@link Range#parse(String)}
     */
    public LineLoadFunc(String cutPattern) throws Exception {
        ranges = Range.parse(cutPattern);
    }

    /** Registers {@code location} as the input path(s) of {@code job}. */
    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }

    /** @return a new {@link TextInputFormat}; its record values are cast to {@link Text} in {@link #getNext()} */
    @Override
    public InputFormat getInputFormat() throws IOException {
        return new TextInputFormat();
    }

    /** Stores the record reader that {@link #getNext()} will pull lines from. */
    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
        this.reader = reader;
    }

    /**
     * Reads the next line and cuts it into one field per configured range.
     *
     * @return a tuple with one {@link DataByteArray} per range, or {@code null} when the
     *         reader has no more records
     * @throws ExecException if a range ends beyond the end of the line, or if the thread
     *                       is interrupted while reading
     * @throws IOException   if the underlying reader fails
     */
    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!reader.nextKeyValue()) {
                return null;
            }
            Text value = (Text) reader.getCurrentValue();
            String line = value.toString();
            Tuple tuple = tupleFactory.newTuple(ranges.size());
            for (int i = 0; i < ranges.size(); i++) {
                Range range = ranges.get(i);
                // Reject short lines explicitly rather than letting substring() fail
                // with an index error.
                if (range.getEnd() > line.length()) {
                    throw new ExecException(
                            "InputFormat:Error\n"
                                    + "field length more than total length");
                }
                tuple.set(i, new DataByteArray(range.getSubString(line)));
            }
            return tuple;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still see it,
            // and keep the original exception as the cause instead of discarding it.
            Thread.currentThread().interrupt();
            throw new ExecException("Interrupted while reading input", e);
        }
    }
}