Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

Can someone please point out where I could find an implementation for CombineFileInputFormat (org. using Hadoop 0.20.205? this is to create large splits from very small log files (text in lines) using EMR.

It is surprising that Hadoop does not have a default implementation for this class made specifically for this purpose and googling it looks like I'm not the only one confused by this. I need to compile the class and bundle it in a jar for hadoop-streaming, with a limited knowledge of Java this is some challenge.

Edit: I already tried the yetitrails example, with the necessary imports but I get a compiler error for the next method.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
163 views
Welcome To Ask or Share your Answers For Others

1 Answer

Here is an implementation I have for you:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

@SuppressWarnings("deprecation")
public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException {

        return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) myCombineFileRecordReader.class);
    }

    public static class myCombineFileRecordReader implements RecordReader<LongWritable, Text> {
        private final LineRecordReader linerecord;

        public myCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException {
            FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations());
            linerecord = new LineRecordReader(conf, filesplit);
        }

        @Override
        public void close() throws IOException {
            linerecord.close();

        }

        @Override
        public LongWritable createKey() {
            // TODO Auto-generated method stub
            return linerecord.createKey();
        }

        @Override
        public Text createValue() {
            // TODO Auto-generated method stub
            return linerecord.createValue();
        }

        @Override
        public long getPos() throws IOException {
            // TODO Auto-generated method stub
            return linerecord.getPos();
        }

        @Override
        public float getProgress() throws IOException {
            // TODO Auto-generated method stub
            return linerecord.getProgress();
        }

        @Override
        public boolean next(LongWritable key, Text value) throws IOException {

            // TODO Auto-generated method stub
            return linerecord.next(key, value);
        }

    }
}

In your job first set the parameter mapred.max.split.size according to the size you would like the input files to be combined into. Do something like follows in your run():

...
            if (argument != null) {
                conf.set("mapred.max.split.size", argument);
            } else {
                conf.set("mapred.max.split.size", "134217728"); // 128 MB
            }
...

            conf.setInputFormat(CombinedInputFormat.class);
...

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
...