Java - Infinite observable from another observable
I have an observable that represents a sequence of rows selected from a DB table, so it is finite.
    Observable<Item> selectResults() { ... }
I want to implement polling at a specified interval, so that in the end I have an observable that wraps the original one and pulls from it indefinitely.
I don't know how to do that :(
OK, here is my idea of how to do it, modeled on the interval observable. It still needs error handling and unsubscribe logic.
    public class OnSubscribePeriodicObservable implements OnSubscribe<Item> {
        ...
        @Override
        public void call(final Subscriber<? super Item> subscriber) {
            final Worker worker = scheduler.createWorker();
            // Unsubscribing the downstream subscriber also cancels the periodic task.
            subscriber.add(worker);
            worker.schedulePeriodically(new Action0() {
                @Override
                public void call() {
                    selectResults().subscribe(new Observer<Item>() {
                        @Override
                        public void onCompleted() {
                            // continue: the next tick runs the query again
                        }

                        @Override
                        public void onError(Throwable e) {
                            subscriber.onError(e);
                        }

                        @Override
                        public void onNext(Item t) {
                            subscriber.onNext(t);
                        }
                    });
                }
            }, initialDelay, period, unit);
        }
    }
You can accomplish this with standard operators, which give you error propagation, unsubscription, and backpressure for cheap:
    Observable<Integer> databaseQuery = Observable
            .just(1, 2, 3, 4)
            .delay(500, TimeUnit.MILLISECONDS);

    Observable<Integer> result = Observable
            .timer(1, 2, TimeUnit.SECONDS)
            .onBackpressureDrop()
            .concatMap(t -> databaseQuery);

    result.subscribe(System.out::println);

    Thread.sleep(10000);
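To make concrete what "error propagation" and "unsubscription" mean for a polling loop, here is a minimal sketch that uses only `java.util.concurrent` rather than RxJava. It is not how the operators are implemented. The names `PollingSketch`, `poll`, `demo`, and `fakeQuery` are hypothetical, and the finite query is assumed to be a plain `Supplier<List<T>>` instead of an `Observable`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper, not part of RxJava: re-runs a finite query on a fixed period.
final class PollingSketch {

    // Returns a Runnable that plays the role of "unsubscribe".
    static <T> Runnable poll(Supplier<List<T>> query,
                             Consumer<? super T> onNext,
                             Consumer<? super Throwable> onError,
                             long initialDelay, long period, TimeUnit unit) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> {
            try {
                // One finite pass over the query results. Reaching the end of the
                // list is the "onCompleted" case: do nothing and wait for the next tick.
                for (T item : query.get()) {
                    onNext.accept(item);
                }
            } catch (RuntimeException e) {
                // Error propagation: report once, then stop polling.
                onError.accept(e);
                executor.shutdown();
            }
        }, initialDelay, period, unit);
        // Unsubscription: cancel future ticks and interrupt a running pass.
        return executor::shutdownNow;
    }

    // Demo with a fake in-memory "query"; returns the first six items received.
    static List<Integer> demo() {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch sixItems = new CountDownLatch(6);
        Supplier<List<Integer>> fakeQuery = () -> Arrays.asList(1, 2, 3);
        Runnable unsubscribe = poll(fakeQuery,
                item -> { received.add(item); sixItems.countDown(); },
                error -> { },
                0, 50, TimeUnit.MILLISECONDS);
        try {
            sixItems.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            unsubscribe.run();
        }
        return new ArrayList<>(received).subList(0, Math.min(6, received.size()));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running `demo()` should collect two full passes over the fake query before the handle returned by `poll` is invoked. The sketch has no notion of backpressure, which is one more thing the operator-based version handles for you.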