Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share

I have a file with millions of lines in it that I need to process. Each line of the file will result in an HTTP call. I'm trying to figure out the best way to attack the problem.

I obviously could just read the file and make the calls sequentially, but it would be incredibly slow. I'd like to parallelize the calls, but I'm not sure if I should read the entire file into memory (something I'm not a huge fan of) or try to parallelize the reading of the file as well (which I'm not sure would make sense).

Just looking for some thoughts here on the best way to attack the problem. If there is an existing framework or library that does something similar I'm happy to use that as well.

Thanks.

1 Answer

I'd like to parallelize the calls, but I'm not sure if I should read the entire file into memory

You should use an ExecutorService backed by a bounded BlockingQueue. As you read in your millions of lines, you submit jobs to the thread pool until the BlockingQueue is full, then wait for room before submitting more. This way you can run 100 (or whatever number is optimal) HTTP requests simultaneously without having to read all of the lines of the file beforehand.

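You'll need to set up a RejectedExecutionHandler that blocks if the queue is full. This is better than a caller-runs handler (ThreadPoolExecutor.CallerRunsPolicy): with caller-runs, the thread reading the file executes the rejected request itself and cannot submit new work until that request finishes, so pool threads can sit idle in the meantime.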

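import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// number of worker threads, i.e. simultaneous HTTP requests; tune for your workload
int nThreads = 100;
// bounded queue: at most 100 jobs wait here while all the workers are busy
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(100);
// NOTE: you want the min and max thread numbers here to be the same value
ThreadPoolExecutor threadPool =
    new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, queue);
// we need our RejectedExecutionHandler to block if the queue is full
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // this will block the producer until there's room in the queue
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            // restore the interrupt flag so the caller can see it
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException(
                "Unexpected InterruptedException", e);
        }
    }
});

// now read in the urls (urlReader is a BufferedReader opened on the input file)
String url;
while ((url = urlReader.readLine()) != null) {
    // submit them to the thread-pool.  this may block.
    threadPool.submit(new DownloadUrlRunnable(url));
}
// after we submit we have to shutdown the pool
threadPool.shutdown();
// wait for them to complete
threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);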

...
private class DownloadUrlRunnable implements Runnable {
    private final String url;
    public DownloadUrlRunnable(String url) {
       this.url = url;
    }
    @Override
    public void run() {
       // download the URL
    }
}
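
If you want to try the pattern without touching the network, here is a minimal self-contained sketch of the same idea. It is only an illustration under some assumptions: the class name BoundedSubmitDemo, the helper processAll, the simulateRequest placeholder (a short sleep standing in for the HTTP call), and the example.invalid URLs are all made up for the demo and are not part of any library.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedSubmitDemo {

    /**
     * Hypothetical helper: reads the input line by line, hands each line to a
     * fixed-size pool with a bounded queue, and returns how many tasks ran.
     */
    public static int processAll(BufferedReader reader, int nThreads, int queueCapacity)
            throws IOException, InterruptedException {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(queueCapacity);
        ThreadPoolExecutor pool =
            new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, queue);

        // When the queue is full, block the reading thread until a slot frees up.
        pool.setRejectedExecutionHandler((task, executor) -> {
            try {
                executor.getQueue().put(task);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException(
                    "Interrupted while waiting for queue space", e);
            }
        });

        AtomicInteger processed = new AtomicInteger();
        String line;
        while ((line = reader.readLine()) != null) {
            final String url = line;
            // May block here, so only a bounded number of lines is held in memory.
            pool.execute(() -> {
                simulateRequest(url);
                processed.incrementAndGet();
            });
        }

        // All lines are submitted: stop accepting work and wait for the rest to finish.
        pool.shutdown();
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        return processed.get();
    }

    /** Placeholder for the real HTTP call (assumption: a short sleep stands in for it). */
    static void simulateRequest(String url) {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        // Build a small in-memory "file" of made-up URLs for the demo.
        StringBuilder input = new StringBuilder();
        for (int i = 0; i < 500; i++) {
            input.append("http://example.invalid/item/").append(i).append('\n');
        }
        BufferedReader reader = new BufferedReader(new StringReader(input.toString()));
        int done = processAll(reader, 4, 8);
        System.out.println("processed " + done + " lines");
    }
}
```

For a real file you would pass a BufferedReader opened on that file instead of the StringReader, and replace simulateRequest with the actual request. Note that the blocking handler relies on shutdown() being called only after the last submit, from the same thread that does the submitting, as it is here.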
