java - Infinite observable from another observable


I have an Observable that represents a sequence of rows selected from a DB table, so it is finite.

Observable<Item> selectResults() { ... }

I want to implement polling at a specified interval, so that in the end I have an Observable that wraps the original one and pulls from it indefinitely.

I don't know how to do it :(


OK, here is an idea of how to do it, modeled on the interval Observable. It still needs error handling and unsubscribe logic.

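import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Scheduler.Worker;
import rx.Subscriber;
import rx.functions.Action0;

public class OnSubscribePeriodicObservable implements OnSubscribe<Item> {
    ...
    @Override
    public void call(final Subscriber<? super Item> subscriber) {
        final Worker worker = scheduler.createWorker();
        // Unsubscribing downstream cancels the periodic task.
        subscriber.add(worker);

        worker.schedulePeriodically(new Action0() {
            @Override
            public void call() {
                // Re-run the finite query on every tick. The inner subscription
                // is not tracked, so an in-flight query is not cancelled.
                selectResults().subscribe(new Observer<Item>() {
                    @Override
                    public void onCompleted() {
                        // continue: swallow completion so the outer stream stays open
                    }

                    @Override
                    public void onError(Throwable e) {
                        subscriber.onError(e);
                    }

                    @Override
                    public void onNext(Item t) {
                        subscriber.onNext(t);
                    }
                });
            }
        }, initialDelay, period, unit);
    }
}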

You can accomplish this with standard operators, which give you error propagation, unsubscription, and backpressure for cheap:

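// Stand-in for the finite database query.
Observable<Integer> databaseQuery = Observable
        .just(1, 2, 3, 4)
        .delay(500, TimeUnit.MILLISECONDS);

Observable<Integer> result = Observable
        .timer(1, 2, TimeUnit.SECONDS)   // first tick after 1 s, then every 2 s
        .onBackpressureDrop()            // drop ticks that arrive while a query is still running
        .concatMap(t -> databaseQuery);  // run one query per tick, one at a time

result.subscribe(System.out::println);

// Keep the JVM alive long enough to see a few rounds.
Thread.sleep(10000);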
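To make the semantics of that pipeline concrete, here is a minimal plain-JDK sketch of the same idea: re-run a finite query on a schedule, never letting two runs overlap. It is only an analogue, not RxJava, and the names `PollingSketch`, `poll`, and the stubbed `selectResults` are hypothetical. It also stops after a fixed number of rounds so that it terminates, whereas the Observable above keeps going until unsubscribed. Error handling is left out: a query that throws would cancel the schedule and leave `poll` waiting.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class PollingSketch {

    // Hypothetical stand-in for selectResults(): one finite batch of rows.
    static List<Integer> selectResults() {
        return Arrays.asList(1, 2, 3, 4);
    }

    // Runs `query` `rounds` times, one run at a time, waiting `periodMillis`
    // between the end of one run and the start of the next, and returns
    // everything the runs produced, in order.
    static <T> List<T> poll(Supplier<List<T>> query, int rounds, long periodMillis) {
        List<T> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(rounds);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // scheduleWithFixedDelay never starts a run before the previous one
        // has finished, so slow queries cannot pile up.
        scheduler.scheduleWithFixedDelay(() -> {
            if (done.getCount() == 0) {
                return; // requested rounds already finished; ignore late runs
            }
            received.addAll(query.get());
            done.countDown();
        }, 0, periodMillis, TimeUnit.MILLISECONDS);

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        } finally {
            scheduler.shutdownNow(); // the counterpart of unsubscribing
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        // prints [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4]
        System.out.println(poll(PollingSketch::selectResults, 3, 100));
    }
}
```

The operator version does the same bookkeeping for you: `concatMap` keeps the queries sequential, `onBackpressureDrop` discards ticks that arrive while a query is still running, and unsubscribing takes the place of `shutdownNow()`.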
