Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


My MapReduce job processes data by date and needs to write its output to a specific folder structure. The expected output structure is:

2013
    01
    02
    ..

2012
    01
    02
    ..

etc.

At any time I get at most 12 months of data, so I am using the MultipleOutputs class to create 12 outputs with the following function in the driver:

public void createOutputs(){
    Calendar c = Calendar.getInstance();
    String monthStr, pathStr;

    // Create multiple outputs for last 12 months
    // TODO make 12 configurable
    for(int i = 0; i < 12; ++i ){
        // Get the month and add 1, because Calendar.MONTH is 0-based
        int month = c.get(Calendar.MONTH) + 1;
        // Add a leading 0 to single-digit months (>= so that month 10 is not padded)
        monthStr = month >= 10 ? "" + month : "0" + month;
        // Generate the output name in the format 201303etl
        pathStr = c.get(Calendar.YEAR) + "" + monthStr + "etl";
        // Add the named output
        MultipleOutputs.addNamedOutput(config, pathStr );  
        // Move to previous month
        c.add(Calendar.MONTH, -1); 
    }
}
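
For reference, the name generation in the function above can also be sketched with java.time instead of Calendar. This is only an illustration under assumptions: the class name NamedOutputNames and the method lastMonths are hypothetical, and the sketch produces the name strings only, without calling any Hadoop API.

```java
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

final class NamedOutputNames {
    // "uuuuMM" formats a YearMonth as e.g. 201303 (zero-padded month).
    private static final DateTimeFormatter YEAR_MONTH = DateTimeFormatter.ofPattern("uuuuMM");

    /** Hypothetical helper: names for `start` and the `count - 1` months before it. */
    static List<String> lastMonths(YearMonth start, int count) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            // minusMonths rolls over year boundaries (2013-01 -> 2012-12).
            names.add(start.minusMonths(i).format(YEAR_MONTH) + "etl");
        }
        return names;
    }

    public static void main(String[] args) {
        // Prints the 12 names ending at the current month, newest first.
        lastMonths(YearMonth.now(), 12).forEach(System.out::println);
    }
}
```

Each returned string would then be passed to MultipleOutputs.addNamedOutput in the driver, in the same way as pathStr above.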

In the reducer, I added a cleanup function to move the generated output to appropriate directories.

protected void cleanup(Context context) throws IOException, InterruptedException {
        // Custom function to recursively process data
        moveFiles (FileSystem.get(new Configuration()), new Path("/MyOutputPath"));
}

Problem: the reducer's cleanup function runs before the output is moved from the _temporary directory to the output directory. As a result, the function above sees no output when it executes, because all the data is still in the _temporary directory.

What is the best way to achieve the desired functionality? I would appreciate any insights.

I am thinking of the following:

  • Is there a way to use a custom OutputCommitter?
  • Is it better to chain another job, or is that overkill for this?
  • Is there a simpler alternative that I am just not aware of?

Here is a sample log of the file structure as seen from the cleanup function:

MyMapReduce: filepath:hdfs://localhost:8020/dev/test
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs/history/job_201310301015_0224_1383763613843_371979_HtmlEtl
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/201307etl-r-00000
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/part-r-00000

1 Answer

You should not need a second job. I am currently using MultipleOutputs to create a large number of output directories in one of my programs. Although there are upwards of 30 directories, I am able to use only a couple of MultipleOutputs objects. This is because you can set the output directory when you write, so it can be determined only when needed. You only need more than one named output if you want to write in different formats (e.g. one with key Text.class and value Text.class, and one with key Text.class and value IntWritable.class).

Setup in the driver:

MultipleOutputs.addNamedOutput(job, "Output", TextOutputFormat.class, Text.class, Text.class);

Setup of the reducer:

mout = new MultipleOutputs<Text, Text>(context);

Calling mout in the reducer:

String key; // set to whatever the output key will be
String value; // set to whatever the output value will be
String outputFileName; // set to the absolute path of the file this should write to

mout.write("Output",new Text(key),new Text(value),outputFileName);

You can have a piece of code determine the directory at run time. For example, say you want to specify the directory by year and month:

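int year; // extract the year from the data
int month; // extract the month from the data
String baseFileName; // parent directory of all outputs from this job
// Zero-pad the month so that the names are 01, 02, ... as in the question
String outputFileName = baseFileName + "/" + year + "/" + String.format("%02d", month);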

mout.write("Output",new Text(key),new Text(value),outputFileName);
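
The path construction can be pulled into a small helper so that it is easy to check on its own. The following is a minimal sketch, not part of Hadoop: the class MonthlyPaths and the method monthlyBasePath are hypothetical names, and it assumes the month should be zero-padded to match the 01, 02 layout asked for in the question.

```java
import java.util.Locale;

final class MonthlyPaths {
    /** Hypothetical helper: builds "<base>/<yyyy>/<MM>" for use as the last argument of mout.write. */
    static String monthlyBasePath(String base, int year, int month) {
        if (month < 1 || month > 12) {
            throw new IllegalArgumentException("month must be 1..12, got " + month);
        }
        // Drop a single trailing slash so the result never contains "//".
        String root = base.endsWith("/") ? base.substring(0, base.length() - 1) : base;
        // %02d pads single-digit months with a leading zero.
        return String.format(Locale.ROOT, "%s/%04d/%02d", root, year, month);
    }

    public static void main(String[] args) {
        System.out.println(monthlyBasePath("/dev/test", 2013, 3)); // prints /dev/test/2013/03
    }
}
```

In the reducer, the result would be used as outputFileName in the mout.write call above. Note that, as the log in the question shows (201307etl-r-00000), Hadoop appends a suffix such as -r-00000 to the name it is given, so the last path component ends up as a file-name prefix. If a real directory per month is wanted, appending something like "/part" to the path is one option to try, but treat that as an assumption to verify on your cluster.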

Hope this helps.

EDIT: Output file structure for the above example:

Base
    2013
        01
        02
        03
        ...
    2012
        01
        ...
    ...
