Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

This is a design question regarding the implementation of a Pipeline. The following is my naive implementation.

Interface for individual steps/stages in the pipeline:

public interface Step<T, U> {
    public U execute(T input);
}

Concrete implementations of steps/stages in pipeline:

public class StepOne implements Step<Integer, Integer> {
    @Override
    public Integer execute(Integer input) {
        return input + 100;
    }
}

public class StepTwo implements Step<Integer, Integer> {
    @Override
    public Integer execute(Integer input) {
        return input + 500;
    }
}

public class StepThree implements Step<Integer, String> {
    @Override
    public String execute(Integer input) {
        return "The final amount is " + input;
    }
}

The pipeline class will hold/register the steps in the pipeline and execute them one after the other:

public class Pipeline {
    private List<Step> pipelineSteps = new ArrayList<>();
    private Object firstStepInput = 100;

    public void addStep(Step step) {
        pipelineSteps.add(step);
    }

    public void execute() {
        for (Step step : pipelineSteps) {
            Object out = step.execute(firstStepInput);
            firstStepInput = out;
        }
   }
}

Diver program to execute the pipeline:

public class Main {
    public static void main(String[] args) {
        Pipeline pipeline = new Pipeline();
        pipeline.addStep(new StepOne());
        pipeline.addStep(new StepTwo());
        pipeline.addStep(new StepThree());

        pipeline.execute();
    } 
}

However, as you can see the naive implementation has many limitations.

One of the major ones is that since the requirement is that the output of each step could be of any type, the naive implementation is not type-safe (the execute method in the Pipeline class). If I happen to wire the steps in the pipeline incorrectly, the app will fail.

Can anyone help me design the solution by adding to what I have coded, or point me towards an already existing pattern to solve this?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
135 views
Welcome To Ask or Share your Answers For Others

1 Answer

why do you need an additional Pipeline class ? I think you can remove the middle man. this will make your api simpler, for example:

Step<Integer, String> source = Step.of(Object::toString);
Step<Integer, Integer> toHex = source.pipe(it -> Integer.parseInt(it, 16));

toHex.execute(11/*0x11*/);// return 17;

you can implement your pipeline pattern simply in as below :

interface Step<I, O> {

    O execute(I value);

    default <R> Step<I, R> pipe(Step<O, R> source) {
        return value -> source.execute(execute(value));
    }

    static <I, O> Step<I, O> of(Step<I, O> source) {
        return source;
    }
}

in prior java version you can use an abstract class instead:

abstract static class Step<I, O> {

    public abstract O execute(I value);

    public <R> Step<I, R> pipe(Step<O, R> source) {
        return new Step<I, R>() {
            @Override
            public R execute(I value) {
                return source.execute(Step.this.execute(value));
            }
        };
    }

    public static <I, O> Step<I, O> of(Step<I, O> source) {
        return source;
    }
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
...