RxJava: help understanding how publish and unsubscribe work
I observe strange behavior when using publish() in conjunction with observeOn and subscribeOn. Please take a look at the following examples.
Code:
ConnectableObservable<String> observable = Observable.create(
        new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                int i = 0;
                Log.d("testtag:", "start call");
                // Emit until the downstream signals unsubscription.
                while (!subscriber.isUnsubscribed()) {
                    subscriber.onNext("item " + i);
                    i++;
                }
                Log.d("testtag:", "completed");
                subscriber.onCompleted();
            }
        }
).publish();

observable
        .take(10)
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d("testtag:", "item received 1 : " + String.valueOf(s));
            }
        });

observable.connect();
Output:
start call
item received 1 : item 0
item received 1 : item 1
item received 1 : item 2
item received 1 : item 3
item received 1 : item 4
item received 1 : item 5
item received 1 : item 6
item received 1 : item 7
item received 1 : item 8
item received 1 : item 9
So subscriber.isUnsubscribed() never becomes true: the loop never exits and "completed" is never logged. That is totally fine. The strange thing happens when I add observeOn(Schedulers.newThread()) and subscribeOn(Schedulers.newThread()). Take a look:
ConnectableObservable<String> observable = Observable.create(
        new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                int i = 0;
                Log.d("testtag:", "start call");
                // Emit until the downstream signals unsubscription.
                while (!subscriber.isUnsubscribed()) {
                    subscriber.onNext("item " + i);
                    i++;
                }
                Log.d("testtag:", "completed");
                subscriber.onCompleted();
            }
        }
)
        .observeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.newThread())
        .publish();

observable
        .take(10)
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d("testtag:", "item received 1 : " + String.valueOf(s));
            }
        });

observable.connect();
Output:
start call
item received 1 : item 0
item received 1 : item 1
item received 1 : item 2
item received 1 : item 3
item received 1 : item 4
item received 1 : item 5
item received 1 : item 6
item received 1 : item 7
item received 1 : item 8
item received 1 : item 9
completed
Please help me understand why subscriber.isUnsubscribed() becomes true in this case, so that the loop exits.
PS. I understand that if I want the !subscriber.isUnsubscribed() check to work, I should use .publish().refCount() instead of connect(). My goal is to understand the current behavior.
publish() reacts to having 0 subscribers by consuming the source stream until completion (which never happens here). Once you have taken 10 items with the sole subscribe(), the source will keep running forever, or until you call unsubscribe() on the Subscription returned by the connect() call.
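To make the first case concrete, here is a minimal plain-Java sketch of that rule. It does not use RxJava: the class `ConnectionModel`, the method `run`, and the `safetyCap` parameter are hypothetical names invented for this illustration, and the single-threaded loop is only a simplified model of the real operator. It shows that the downstream stopping after `take(10)` does not stop the source; only the connection flag does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model (assumption: NOT RxJava code) of a published source whose
// loop only stops when the connection itself is unsubscribed.
final class ConnectionModel {

    /**
     * Runs the source loop and returns how many items the source emitted.
     *
     * @param takeLimit             how many items the sole subscriber wants (like take(10))
     * @param safetyCap             hypothetical cap so the model terminates instead of looping forever
     * @param cancelConnectionAfter if true, unsubscribe the connection once takeLimit items arrived
     * @param received              collects the items the subscriber actually saw
     */
    static int run(int takeLimit, int safetyCap, boolean cancelConnectionAfter, List<String> received) {
        // Stands in for the Subscription returned by connect().
        AtomicBoolean connectionUnsubscribed = new AtomicBoolean(false);
        int emitted = 0;
        while (!connectionUnsubscribed.get() && emitted < safetyCap) {
            String item = "item " + emitted;
            emitted++;
            if (received.size() < takeLimit) {
                received.add(item);
                if (received.size() == takeLimit && cancelConnectionAfter) {
                    connectionUnsubscribed.set(true);
                }
            }
            // Otherwise nobody is listening and the item is simply dropped,
            // but the source loop keeps going.
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        int emitted = run(10, 1000, false, seen);
        System.out.println("received=" + seen.size() + ", emitted=" + emitted);
    }
}
```

With `cancelConnectionAfter` set to `false` the source runs all the way to the cap even though the subscriber only ever saw 10 items, which mirrors the first example where "completed" is never logged.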
In the second case, subscriber.isUnsubscribed() becomes true because you overflow the internal queue of observeOn or publish, and the whole chain dies with a MissingBackpressureException, which triggers an unsubscription that eventually reaches your Observable.
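The overflow mechanism can be sketched in plain Java as well. This is a model under stated assumptions, not RxJava's implementation: `OverflowModel` and `emitUntilOverflow` are hypothetical names, the capacity is an arbitrary illustration value, and the slow consumer is modeled as not draining the queue at all. A producer that ignores backpressure fills a bounded queue, the first rejected item stands in for the MissingBackpressureException, and that failure flips the unsubscribed flag the loop is checking.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Simplified model (assumption: NOT RxJava code) of a producer that ignores
// backpressure and overflows a bounded internal queue.
final class OverflowModel {

    /** Returns how many items were accepted before the queue overflowed. */
    static int emitUntilOverflow(int capacity) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        boolean unsubscribed = false;
        int accepted = 0;
        while (!unsubscribed) {
            // offer() returns false instead of blocking when the queue is full.
            if (queue.offer("item " + accepted)) {
                accepted++;
            } else {
                // Stands in for the chain failing and unsubscribing upstream.
                unsubscribed = true;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("accepted before overflow: " + emitUntilOverflow(16));
    }
}
```

The loop exits because of the failure, not because the subscriber asked for fewer items, which matches the "completed" line appearing only in the second output.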
What you need is share() instead of publish(), so that the source gets terminated when the subscribers go away. Note, however, that since your Observable doesn't respect backpressure, it is still prone to MissingBackpressureException.
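The reference-counting idea behind share() (the publish().refCount() combination mentioned in the question) can be modeled in a few lines of plain Java. Again, this is only a sketch: `RefCountModel` and its methods are hypothetical names, and the real operator also has to deal with threading and reconnection, which are left out here. The source is started by the first subscriber and stopped when the last one leaves.

```java
// Simplified model (assumption: NOT RxJava code) of reference counting:
// the source runs only while at least one subscriber is attached.
final class RefCountModel {
    private int subscriberCount = 0;
    private boolean sourceRunning = false;

    /** The first subscriber connects to the source. */
    public void subscribe() {
        if (subscriberCount++ == 0) {
            sourceRunning = true;
        }
    }

    /** The last subscriber to leave disconnects from the source. */
    public void unsubscribe() {
        if (subscriberCount > 0 && --subscriberCount == 0) {
            sourceRunning = false;
        }
    }

    public boolean isSourceRunning() {
        return sourceRunning;
    }
}
```

In this model, the unsubscription issued by take(10) from the only subscriber would drop the count to zero and stop the source, which is the behavior the plain connect() call does not give you.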
Generally, I suggest looking at the standard operators and Observable factories first instead of rolling your own Observable, or looking at AbstractOnSubscribe and the examples in its javadoc to see how to implement a backpressure-aware custom Observable.