Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


I have been looking at the new RxJava 2 and I'm not quite sure I understand the idea of backpressure anymore...

I'm aware that we have Observable, which does not have backpressure support, and Flowable, which does.

So, as an example, let's say I have a Flowable with interval:

        Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

This is going to crash after around 128 values, and the reason is pretty obvious: I am consuming items more slowly than I am receiving them.

But then we have the same with Observable:

        Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

This will not crash at all; even when I put some delay on consuming, it still works. To make the Flowable work, let's say I add the onBackpressureDrop operator: the crash is gone, but not all values are emitted either.

So the basic question I currently cannot answer is: why should I care about backpressure when I can use a plain Observable and still receive all values without managing the buffer? Or, from the other side, what advantages does backpressure give me when it comes to managing and handling consumption?


1 Answer

In practice, backpressure manifests as bounded buffers: Flowable.observeOn has a buffer of 128 elements that gets drained as fast as the downstream can take them. You can increase this buffer size individually to handle a bursty source, and all the backpressure-management practices from 1.x still apply. Observable.observeOn has an unbounded buffer that keeps collecting the elements, and your app may run out of memory.
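To make the bounded-versus-unbounded difference concrete, here is a minimal plain-JDK sketch. It is a model, not RxJava code: the class and method names are made up for illustration, and it assumes one item is produced per tick while the consumer takes one item every `consumeEvery` ticks.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model, not RxJava's implementation: one item is produced per
// tick and one item is consumed every `consumeEvery` ticks.
class BufferModel {

    /** Bounded buffer: returns the tick at which an item no longer fits, or -1 if it never overflows. */
    static int firstOverflowTick(int capacity, int ticks, int consumeEvery) {
        Queue<Integer> buffer = new ArrayDeque<>();
        for (int tick = 1; tick <= ticks; tick++) {
            if (buffer.size() == capacity) {
                return tick; // the point where a Flowable would signal an error or drop the item
            }
            buffer.add(tick);
            if (tick % consumeEvery == 0) {
                buffer.poll(); // the slow consumer takes one item
            }
        }
        return -1;
    }

    /** Unbounded buffer: never overflows; returns the backlog left after all ticks. */
    static int backlogAfter(int ticks, int consumeEvery) {
        Queue<Integer> buffer = new ArrayDeque<>();
        for (int tick = 1; tick <= ticks; tick++) {
            buffer.add(tick);
            if (tick % consumeEvery == 0) {
                buffer.poll();
            }
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        System.out.println(firstOverflowTick(128, 10_000, 2)); // prints 256
        System.out.println(backlogAfter(10_000, 2));           // prints 5000
    }
}
```

The bounded variant fails fast and visibly, while the unbounded one keeps working but its backlog grows with the length of the run. In RxJava 2 itself, the buffer size is the third argument of the observeOn(Scheduler, boolean, int) overload.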

You may use Observable, for example, for:

  • handling GUI events
  • working with short sequences (fewer than 1000 elements in total)

You may use Flowable, for example, for:

  • cold and non-timed sources
  • generator-like sources
  • network and database accessors
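Cold and generator-like sources suit Flowable because they can simply wait until the consumer asks for more. The sketch below illustrates that idea with the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams interfaces that Flowable implements. It is not RxJava code, the class names are hypothetical, and it skips the thread-safety and argument checks a real publisher needs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Simplified, single-threaded sketch of a demand-driven source (hypothetical names).
class DemandDrivenSource {

    /** Cold, generator-like publisher: the next number is produced only when it has been requested. */
    static final class Counter implements Flow.Publisher<Long> {
        @Override
        public void subscribe(Flow.Subscriber<? super Long> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                long next = 0;      // each subscriber gets its own sequence
                boolean cancelled;

                @Override
                public void request(long n) {
                    // Emit exactly as many items as the subscriber asked for.
                    for (long i = 0; i < n && !cancelled; i++) {
                        subscriber.onNext(next++);
                    }
                }

                @Override
                public void cancel() {
                    cancelled = true;
                }
            });
        }
    }

    /** Subscriber that only stores what it receives; demand is issued from outside. */
    static final class Collector implements Flow.Subscriber<Long> {
        final List<Long> received = new ArrayList<>();
        Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Long item) { received.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { }
    }

    /** Subscribes, then requests two batches; returns everything that was delivered. */
    static List<Long> pull(int firstBatch, int secondBatch) {
        Collector sink = new Collector();
        new Counter().subscribe(sink);          // nothing is generated yet
        sink.subscription.request(firstBatch);  // generates exactly firstBatch items
        sink.subscription.request(secondBatch); // continues where it left off
        return sink.received;
    }

    public static void main(String[] args) {
        System.out.println(pull(3, 2)); // prints [0, 1, 2, 3, 4]
    }
}
```

Because the source never runs ahead of demand, no buffer can overflow. A timer or a stream of GUI events cannot be paused this way, which is why such sources need either an unbounded buffer or an explicit overflow strategy.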
