自定义hadoop map/reduce输入文件切割InputFormat



那么，FileInputFormat是怎样将它们划分成splits的呢？FileInputFormat只划分比HDFS block大的文件，所以如果一个文件的大小比block小，将不会被划分，这也是Hadoop处理大文件的效率要比处理很多小文件的效率高的原因。Hadoop默认的InputFormat是TextInputFormat，它重写了FileInputFormat中的createRecordReader和isSplitable方法。该类使用的reader是LineRecordReader，即以回车符（CR = 13）或换行符（LF = 10）作为行分隔符。但在很多情况下，以回车符或换行符作为输入文件的记录分隔符并不能满足需求：用户输入的数据中本身就很可能包含回车符、换行符，所以通常我们会选用不可见字符（即用户无法输入的字符）作为记录分隔符，这种情况下就需要新写一个InputFormat。又或者，一条记录的分隔符不是单个字符，而是字符串，这种情况相对麻烦；还有一种情况，输入文件已经按主键key排好序，需要Hadoop做的只是把相同key的记录作为一个数据块进行逻辑处理，这种情况更麻烦，相当于省去了mapper的处理而直接进入reduce，InputFormat的逻辑也就相对复杂，但并不是不能实现。1、改变一条记录的分隔符：不使用默认的回车符或换行符作为记录分隔符，甚至可以采用字符串作为记录分隔符。1）自定义一个InputFormat，继承FileInputFormat，重写createRecordReader方法；如果不需要分片或者需要改变分片的方式，则重写isSplitable方法（新API中其签名为isSplitable(JobContext, Path)），具体代码如下：

public class FileInputFormatB extends FileInputFormat {


public RecordReader createRecordReader( InputSplit split, TaskAttemptContext context) {

 return new SearchRecordReader("\b"); } @Override protected boolean isSplitable(FileSystem fs, Path filename) { // 输入文件不分片 return false; }



public class IsearchRecordReader extends RecordReader {
private static final Log LOG = LogFactory.getLog(IsearchRecordReader.class);

private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader in;
private int maxLineLength;
private LongWritable key = null;
private Text value = null;
private byte[] separator = {'b'};
private int sepLength = 1;

‍ public IsearchRecordReader(){
public IsearchRecordReader(String seps){
this.separator = seps.getBytes();
sepLength = separator.length;

public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);

this.start = split.getStart();
this.end = (this.start + split.getLength());
Path file = split.getPath();
this.compressionCodecs = new CompressionCodecFactory(job);
CompressionCodec codec = this.compressionCodecs.getCodec(file);

// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
boolean skipFirstLine = false;
if (codec != null) {
this.in = new LineReader(codec.createInputStream(fileIn), job);
this.end = Long.MAX_VALUE;
} else {
if (this.start != 0L) {

skipFirstLine = true; this.start -= sepLength; fileIn.seek(this.start);

this.in = new LineReader(fileIn, job);
if (skipFirstLine) { // skip first line and re-establish "start".
int newSize = in.readLine(new Text(), 0, (int) Math.min( (long) Integer.MAX_VALUE, end - start));

if(newSize > 0){

start += newSize;


this.pos = this.start;

public boolean nextKeyValue() throws IOException {
if (this.key == null) {
this.key = new LongWritable();
if (this.value == null) {
this.value = new Text();
int newSize = 0;
while (this.pos < this.end) {
newSize = this.in.readLine(this.value, this.maxLineLength, Math.max(
(int) Math.min(Integer.MAX_VALUE, this.end - this.pos), this.maxLineLength));

if (newSize == 0) {


this.pos += newSize;
if (newSize < this.maxLineLength) {



LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - newSize));

if (newSize == 0) {
this.key = null;
this.value = null;
return false;
return true;

public LongWritable getCurrentKey() {
return this.key;

public Text getCurrentValue() {
return this.value;

public float getProgress() {
if (this.start == this.end) {
return 0.0F;
return Math.min(1.0F, (float) (this.pos - this.start) / (float) (this.end - this.start));

public synchronized void close() throws IOException {
if (this.in != null)



public class LineReader {
//private static final byte CR = 13;
//private static final byte LF = 10;

private static final int DEFAULT_BUFFER_SIZE = 32 1024 1024;
private int bufferSize = DEFAULT_BUFFER_SIZE;
private InputStream in;
private byte[] buffer;
private int bufferLength = 0;
private int bufferPosn = 0;

LineReader(InputStream in, int bufferSize) {
this.bufferLength = 0;

this.bufferPosn = 0; 

this.in = in;
this.bufferSize = bufferSize;
this.buffer = new byte[this.bufferSize];

public LineReader(InputStream in, Configuration conf) throws IOException {
this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));

public void close() throws IOException {

public int readLine(Text str, int maxLineLength) throws IOException {
return readLine(str, maxLineLength, Integer.MAX_VALUE);

public int readLine(Text str) throws IOException {
return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);


public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{
Text record = new Text();
int txtLength = 0;
long bytesConsumed = 0L;
boolean newline = false;
int sepPosn = 0;

do {

//已经读到buffer的末尾了,读下一个buffer if (this.bufferPosn >= this.bufferLength) { bufferPosn = 0; bufferLength = in.read(buffer); //读到文件末尾了,则跳出,进行下一个文件的读取 if (bufferLength <= 0) { break; } } int startPosn = this.bufferPosn; for (; bufferPosn < bufferLength; bufferPosn ++) { //处理上一个buffer的尾巴被切成了两半的分隔符(如果分隔符中重复字符过多在这里会有问题) if(sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]){ sepPosn = 0; } //遇到行分隔符的第一个字符 if (buffer[bufferPosn] == separator[sepPosn]) { bufferPosn ++; int i = 0; //判断接下来的字符是否也是行分隔符中的字符 for(++ sepPosn; sepPosn < sepLength; i ++, sepPosn ++){ //buffer的最后刚好是分隔符,且分隔符被不幸地切成了两半 if(bufferPosn + i >= bufferLength){ bufferPosn += i - 1; break; } //一旦其中有一个字符不相同,就判定为不是分隔符 if(this.buffer[this.bufferPosn + i] != separator[sepPosn]){ sepPosn = 0; break; } } //的确遇到了行分隔符 if(sepPosn == sepLength){ bufferPosn += i; newline = true; sepPosn = 0; break; } } } int readLength = this.bufferPosn - startPosn; bytesConsumed += readLength; //行分隔符不放入块中 //int appendLength = readLength - newlineLength; if (readLength > maxLineLength - txtLength) { readLength = maxLineLength - txtLength; } if (readLength > 0) { record.append(this.buffer, startPosn, readLength); txtLength += readLength; //去掉记录的分隔符 if(newline){ str.set(record.getBytes(), 0, record.getLength() - sepLength); } } 

} while (!newline && (bytesConsumed < maxBytesToConsume));

if (bytesConsumed > (long)Integer.MAX_VALUE) {

throw new IOException("Too many bytes before newline: " + bytesConsumed);


return (int) bytesConsumed;



public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{

str.clear(); int txtLength = 0; int newlineLength = 0; boolean prevCharCR = false; long bytesConsumed = 0L; do { int startPosn = this.bufferPosn; if (this.bufferPosn >= this.bufferLength) { startPosn = this.bufferPosn = 0; if (prevCharCR) bytesConsumed ++; this.bufferLength = this.in.read(this.buffer); if (this.bufferLength <= 0) break; } for (; this.bufferPosn < this.bufferLength; this.bufferPosn ++) { if (this.buffer[this.bufferPosn] == LF) { newlineLength = (prevCharCR) ? 2 : 1; this.bufferPosn ++; break; } if (prevCharCR) { newlineLength = 1; break; } prevCharCR = this.buffer[this.bufferPosn] == CR; } int readLength = this.bufferPosn - startPosn; if ((prevCharCR) && (newlineLength == 0)) --readLength; bytesConsumed += readLength; int appendLength = readLength - newlineLength; if (appendLength > maxLineLength - txtLength) { appendLength = maxLineLength - txtLength; } if (appendLength > 0) { str.append(this.buffer, startPosn, appendLength); txtLength += appendLength; } } while ((newlineLength == 0) && (bytesConsumed < maxBytesToConsume)); if (bytesConsumed > (long)Integer.MAX_VALUE) throw new IOException("Too many bytes before newline: " + bytesConsumed); return (int)bytesConsumed;














