rx java - rxJava. Help to understand how publish and unsubscribe work -


i observe strange behavior when using publish() in conjunction observeon , subscribeon. please take @ folowing examples.

code:

connectableobservable<string> observable = observable.create(     new observable.onsubscribe<string>() {         @override         public void call(subscriber<? super string> subscriber) {             int i=0;             log.d("testtag:", "start call");             while (!subscriber.isunsubscribed()) {                 subscriber.onnext("item "+i);                 i++;             }             log.d("testtag:", "completed");             subscriber.oncompleted();         }     } ).publish();  observable     .take(10)     .subscribe(         new action1<string>() {             @override             public void call(string s) {                 log.d("testtag:", "item received 1 : " + string.valueof(s));             }         });  observable.connect(); 

output:

start call item received 1 : item 0 item received 1 : item 1 item received 1 : item 2 item received 1 : item 3 item received 1 : item 4 item received 1 : item 5 item received 1 : item 6 item received 1 : item 7 item received 1 : item 8 item received 1 : item 9 

condition !subscriber.isunsubscribed() never happens. totally fine. strange thing happens when add observeon(schedulers.newthread()) , subscribeon(schedulers.newthread()). take look:

connectableobservable<string> observable = observable.create(     new observable.onsubscribe<string>() {         @override         public void call(subscriber<? super string> subscriber) {             int i=0;             log.d("testtag:", "start call");             while (!subscriber.isunsubscribed()) {                 subscriber.onnext("item "+i);                 i++;             }             log.d("testtag:", "completed");             subscriber.oncompleted();         }     } ) .observeon(schedulers.newthread()) .subscribeon(schedulers.newthread()) .publish();  observable     .take(10)     .subscribe(         new action1<string>() {             @override             public void call(string s) {                 log.d("testtag:", "item received 1 : " + string.valueof(s));             }         });  observable.connect(); 

output:

start call item received 1 : item 0 item received 1 : item 1 item received 1 : item 2 item received 1 : item 3 item received 1 : item 4 item received 1 : item 5 item received 1 : item 6 item received 1 : item 7 item received 1 : item 8 item received 1 : item 9 completed 

please me understand why condition !subscriber.isunsubscribed() become true.

ps. understand if want check !subscriber.isunsubscribed() should use .publish().refcount() instead of connect(). goal understand current behavior.

publish() reacts 0 subscribers consuming source stream until completion (which never happens here). once took 10 sole subscribe(), source keep running forever or until call unsubscribe on subscription returned connect() call.

in second case, !subscriber.isunsubscribed() true because overflow observeon's or publish's internal queue , whole chain dies missingbackpressureexception, triggers unsubscription reaches observable eventually.

what need share() instead of publish() source gets terminated if subscribers go away, note since observable doesn't respect backpressure, still prone missingbackpressureexception.

generally, suggest looking @ standard operators , observable factories first instead of rolling observable, or looking @ abstractonsubscribe , examples in javadoc see how implement backpressure-aware custom observable.


Comments

Popular posts from this blog

c++ - Difference between pre and post decrement in recursive function argument -

php - Nothing but 'run(); ' when browsing to my local project, how do I fix this? -

php - How can I echo out this array? -