Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share

I have an async chain in my Java code that I want to stop after a certain timeout, so I created a thread pool with some threads and called the CompletableFuture like this:

ExecutorService pool = Executors.newFixedThreadPool(10);

Then I have a cyclic method that loads data from the DB and executes some tasks on it; once all the CompletableFutures are completed, it does it again:

CompletableFuture<MyObject> futureTask =
                CompletableFuture.supplyAsync(() -> candidate, pool)
                .thenApply(Task1::doWork).thenApply(Task2::doWork).thenApply(Task3::doWork)
                .thenApply(Task4::doWork).thenApply(Task5::doWork).orTimeout(30,TimeUnit.SECONDS)
                .thenApply(Task6::doWork).orTimeout(30,TimeUnit.SECONDS)
                .exceptionally(ExceptionHandlerService::handle);

My problem is in Task6, which is a very intensive task (a network connection task that sometimes hangs forever). I noticed that my orTimeout fires correctly after 30 seconds, but the thread running Task6 keeps running.

After a few cycles like this, all my threads are used up and my app dies.

How can I cancel the running threads in the pool once the timeout has been reached (without calling pool.shutdown())?

UPDATE: inside the main thread I did a simple check, as shown here:

for (int i = TIME_OUT_SECONDS; i >= 0; i--) {
                unfinishedTasks = handleFutureTasks(unfinishedTasks, totalBatchSize);
                if(unfinishedTasks.isEmpty()) {
                    break;
                }
                if(i==0) {
                    // handle cancellation of the tasks
                    for(CompletableFuture<ComplianceCandidate> task: unfinishedTasks) {
                        task.cancel(true);
                        log.error("Reached timeout on task, is canceled: {}", task.isCancelled());
                    }
                    }
                    break;
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
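                } catch (InterruptedException ex) {
                    // restore the interrupt flag and stop waiting instead of swallowing it
                    Thread.currentThread().interrupt();
                    break;
                }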
            }

What I see is that after a few cycles, all the tasks complain about the timeout. In the first 1-2 cycles I still get the expected responses (while there are threads to process them).

I still feel that the thread pool is exhausted.


1 Answer

I know you said without calling pool.shutdown(), but there is simply no other way. Look at your stages, though: each one runs either in the thread that "appends" it (the one calling thenApply) or in a thread from the pool that you define. Maybe an example will make more sense.

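import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SO64743332 {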

    static ExecutorService pool = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {

        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> dbCall(), pool);

        //simulateWork(4);

        CompletableFuture<String> f2 = f1.thenApply(x -> {
            System.out.println(Thread.currentThread().getName());
            return transformationOne(x);
        });

        CompletableFuture<String> f3 = f2.thenApply(x -> {
            System.out.println(Thread.currentThread().getName());
            return transformationTwo(x);
        });

        f3.join();
        pool.shutdown(); // the pool threads are non-daemon; without this the JVM does not exit
    }

    private static String dbCall() {
        simulateWork(2);
        return "a";
    }

    private static String transformationOne(String input) {
        return input + "b";
    }

    private static String transformationTwo(String input) {
        return input + "b";
    }

    private static void simulateWork(int seconds) {
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
        } catch (InterruptedException e) {
            System.out.println("Interrupted!");
            e.printStackTrace();
        }
    }
}

The key point of the above code is the commented-out line simulateWork(4);. Run the code with it commented out, then uncomment it and run it again, and see which thread actually executes those thenApply stages. It is either main or the same pool thread that ran dbCall, meaning that although you have a pool defined, only a single thread from that pool executes all those stages.
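To make the thread choice visible without relying on timing, here is a minimal sketch. The class name StageThreadDemo and the helper stageThread are made up for illustration. It attaches a stage to an already completed future, so thenApply runs on the calling thread, and contrasts that with thenApplyAsync given an explicit executor, which hands the stage to the pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StageThreadDemo {

    // Returns the name of the thread that executed the dependent stage.
    static String stageThread(boolean async) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // The source future is already complete before any stage is attached.
            CompletableFuture<String> done = CompletableFuture.completedFuture("a");
            CompletableFuture<String> stage = async
                    ? done.thenApplyAsync(x -> Thread.currentThread().getName(), pool)
                    : done.thenApply(x -> Thread.currentThread().getName());
            return stage.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("thenApply ran on:      " + stageThread(false));
        System.out.println("thenApplyAsync ran on: " + stageThread(true));
    }
}
```

The non-async variant reports the caller's thread, while the async variant reports a pool thread (named pool-N-thread-M by the default thread factory).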

In this context, you could define a single-thread executor (inside a method, let's say) that runs all those stages. This way you control when to call shutdownNow and can potentially interrupt the running task (if your code responds to interrupts). Here is a made-up example that simulates that:

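import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SO64743332 {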

    public static void main(String[] args) {
        execute();
    }


    public static void execute() {

        ExecutorService pool = Executors.newSingleThreadExecutor();

        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> dbCall(), pool);
        CompletableFuture<String> cf2 = cf1.thenApply(x -> transformationOne(x));

        // give enough time for transformationOne to start, but not finish
        simulateWork(2);

        try {
            CompletableFuture<String> cf3 = cf2.thenApply(x -> transformationTwo(x))
                                               .orTimeout(4, TimeUnit.SECONDS);
            cf3.get(10, TimeUnit.SECONDS);
        } catch (ExecutionException | InterruptedException | TimeoutException e) {
            pool.shutdownNow();
        }

    }

    private static String dbCall() {
        System.out.println("Started DB call");
        simulateWork(1);
        System.out.println("Done with DB call");
        return "a";
    }

    private static String transformationOne(String input) {
        System.out.println("Started work");
        simulateWork(10);
        System.out.println("Done work");
        return input + "b";
    }

    private static String transformationTwo(String input) {
        System.out.println("Started transformation two");
        return input + "b";
    }

    private static void simulateWork(int seconds) {
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
        } catch (InterruptedException e) {
            System.out.println("Interrupted!");
            e.printStackTrace();
        }
    }
}

Running this, you should notice that transformationOne starts, but it is interrupted because of the shutdownNow call.
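Why shutdownNow and not just cancel(true) on the future? The Javadoc for CompletableFuture.cancel states that the mayInterruptIfRunning argument has no effect, so the worker keeps running after the future is cancelled (or timed out). Below is a rough sketch of that difference. The class name CancelVsShutdownNow is made up, and the 30-second sleep only stands in for a hanging call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelVsShutdownNow {

    // Returns {interrupted after cancel(true), interrupted after shutdownNow()}.
    static boolean[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            started.countDown();
            try {
                Thread.sleep(TimeUnit.SECONDS.toMillis(30)); // stand-in for a hanging call
            } catch (InterruptedException e) {
                interrupted.set(true);
            } finally {
                finished.countDown();
            }
            return "a";
        }, pool);

        try {
            started.await();
            cf.cancel(true); // completes the future exceptionally, the worker keeps sleeping
            boolean afterCancel =
                    finished.await(500, TimeUnit.MILLISECONDS) && interrupted.get();

            pool.shutdownNow(); // interrupts the worker thread
            boolean afterShutdownNow =
                    finished.await(5, TimeUnit.SECONDS) && interrupted.get();
            return new boolean[] {afterCancel, afterShutdownNow};
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("interrupted after cancel(true):  " + r[0]);
        System.out.println("interrupted after shutdownNow(): " + r[1]);
    }
}
```

Only the second check should see the interrupt. This also only helps when the blocked code responds to interrupts, which not every blocking network call does.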

The drawback should be obvious: every invocation of execute creates a new thread pool.
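If a pool per call is too expensive, one possible alternative (not what the code above does, and only a sketch) is to submit the blocking step to a shared pool with ExecutorService.submit. That returns a plain Future backed by FutureTask, and its cancel(true) does interrupt the worker. The names SharedPoolCancel, POOL and runWithTimeout are hypothetical, and the sleep again stands in for the hanging call, which must respond to interrupts for this to free the thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class SharedPoolCancel {

    // One long-lived pool shared by every call (the size is arbitrary here).
    static final ExecutorService POOL = Executors.newFixedThreadPool(2);

    // Runs a stand-in blocking call with a timeout; returns true if the
    // worker thread saw the interrupt triggered by cancel(true).
    static boolean runWithTimeout(long timeoutMillis) {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        Future<String> blocking = POOL.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(TimeUnit.SECONDS.toMillis(30)); // hypothetical hanging call
                return "a";
            } catch (InterruptedException e) {
                interrupted.set(true);
                throw e;
            } finally {
                finished.countDown();
            }
        });

        try {
            started.await(); // make sure the task is running before the clock starts
            blocking.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            blocking.cancel(true); // unlike CompletableFuture, this interrupts the worker
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }

        try {
            finished.await(5, TimeUnit.SECONDS); // give the worker time to unwind
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("worker interrupted: " + runWithTimeout(200));
        System.out.println("pool still usable:  " + !POOL.isShutdown());
        POOL.shutdown(); // only so this demo JVM can exit
    }
}
```

The trade-off is that the caller blocks in get here; the remaining transformations could still be chained as CompletableFuture stages once the blocking step has returned.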
