How to stop and resume Observable.interval emitting ticks

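Here’s one possible solution:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    import rx.Observable;
    import rx.Subscription;
    import rx.schedulers.Schedulers;

    class TickHandler {
        // Survives stop()/resume(), so numbering continues where it left off.
        private AtomicLong lastTick = new AtomicLong(0L);
        private Subscription subscription;

        void resume() {
            System.out.println("resumed");
            // Each resume() starts a fresh interval; the interval's own counter is
            // replaced by lastTick.
            subscription = Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
                .map(tick -> lastTick.getAndIncrement())
                .subscribe(tick -> System.out.println("tick = " + tick));
        }

        void stop() {
            if (subscription != null && !subscription.isUnsubscribed()) {
                System.out.println("stopped");
                subscription.unsubscribe();
            }
        }
    }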

Deliver the first item immediately, ‘debounce’ following items

Update: From @lopar’s comments, a better way would be:

    Observable.from(items)
        .publish(publishedItems -> publishedItems.limit(1)
            .concatWith(publishedItems.skip(1).debounce(1, TimeUnit.SECONDS)))

Would something like this work:

    String[] items = {"one", "two", "three", "four", "five"};
    Observable<String> myObservable = Observable.from(items);
    Observable.concat(
        myObservable.first(),
        myObservable.skip(1).debounce(1, TimeUnit.SECONDS)
    ).subscribe(s -> System.out.println(s));

RxJava: merge Observables of different types

It’s hard to say without knowing exactly what you need, but possibly zip() or combineLatest(). zip will take both Observable<Milk> and Observable<Cereals> and let you combine them into CerealsWithMilk via a provided function. This emits a new CerealsWithMilk each time you get both a Milk and a Cereals. combineLatest is similar to zip except …
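The pairing rule described for zip can be illustrated without RxJava. The following is only a plain-Java sketch over lists, not RxJava's implementation: the class `ZipSketch`, its `zip` helper, and the sample strings are made up for illustration, and strings stand in for the Milk, Cereals and CerealsWithMilk types. It shows that one combined value is produced per pair, and that leftover items with no partner are not emitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical helper class, for illustration only (not part of RxJava).
class ZipSketch {

    // Pairs the i-th element of `left` with the i-th element of `right`
    // and stops as soon as the shorter input runs out.
    static <A, B, R> List<R> zip(List<A> left, List<B> right,
                                 BiFunction<? super A, ? super B, ? extends R> combiner) {
        int pairs = Math.min(left.size(), right.size());
        List<R> combined = new ArrayList<>(pairs);
        for (int i = 0; i < pairs; i++) {
            combined.add(combiner.apply(left.get(i), right.get(i)));
        }
        return combined;
    }

    public static void main(String[] args) {
        // Strings stand in for the Milk and Cereals types from the answer.
        List<String> milks = Arrays.asList("whole milk", "oat milk", "soy milk");
        List<String> cereals = Arrays.asList("granola", "muesli");

        // The third milk has no cereal to pair with, so it produces nothing.
        List<String> bowls = zip(milks, cereals, (milk, cereal) -> cereal + " with " + milk);
        System.out.println(bowls);
    }
}
```

With real Observables the pairing happens as items arrive over time rather than over finished lists, but the one-output-per-pair rule is the same.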

How to ignore error and continue infinite stream?

You may want to use one of the error handling operators.

onErrorResumeNext( ): instructs an Observable to emit a sequence of items if it encounters an error
onErrorReturn( ): instructs an Observable to emit a particular item when it encounters an error
onExceptionResumeNext( ): instructs an Observable to continue emitting items after it encounters an …

How to use CompositeDisposable of RxJava 2?

    private final CompositeDisposable disposables = new CompositeDisposable();

    // adding an Observable to the disposable
    disposables.add(sampleObservable()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(new DisposableObserver<String>() {
            @Override
            public void onComplete() {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(String value) {
            }
        }));

    static Observable<String> sampleObservable() {
        return Observable.defer(new Callable<ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends …

Retrofit with Rxjava Schedulers.newThread() vs Schedulers.io()

You are correct that the benefit of using Schedulers.io() lies in the fact that it uses a thread pool, whereas Schedulers.newThread() does not. The primary reason that you should consider using thread pools is that they maintain a number of pre-created threads that are idle and waiting for work. This means that when you have …
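The difference between reusing pooled threads and creating a thread per task can be seen with the JDK's own executors. This is a minimal sketch that uses `java.util.concurrent` directly rather than RxJava's schedulers, so it says nothing about how Schedulers.io() is implemented internally. The class `PoolVsNewThread`, its method names, and the pool size of 2 are assumptions chosen for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only (not part of RxJava).
class PoolVsNewThread {

    // Runs each task on a brand-new thread and counts how many threads were used.
    static int threadsUsedWithoutPool(int tasks) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            Thread worker = new Thread(() -> seen.add(Thread.currentThread()));
            worker.start();
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return seen.size();
    }

    // Runs the same tasks on a fixed-size pool and counts how many threads were used.
    static int threadsUsedWithPool(int tasks, int poolSize) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> seen.add(Thread.currentThread()));
            }
        } finally {
            pool.shutdown(); // already-submitted tasks still run to completion
        }
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        // One thread per task versus a pool that never exceeds its size.
        System.out.println("threads without pool: " + threadsUsedWithoutPool(5));
        System.out.println("threads with pool of 2: " + threadsUsedWithPool(5, 2));
    }
}
```

The pooled variant never uses more threads than the pool size, however many tasks are submitted; the thread-per-task variant pays the thread creation cost for every task.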

RxAndroid: What’s the difference between SubscribeOn and ObserveOn?

SubscribeOn specifies the Scheduler on which an Observable will operate. ObserveOn specifies the Scheduler on which an observer will observe this Observable. So basically SubscribeOn is mostly subscribed (executed) on a background thread (you do not want to block the UI thread while waiting for the observable), and in ObserveOn you want to …

Combine a list of Observables and wait until all completed

You can use flatMap in case you have dynamic task composition. Something like this:

    public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) {
        return Observable.from(tasks)
            // execute in parallel
            .flatMap(task -> task.observeOn(Schedulers.computation()))
            // wait until all tasks are executed
            // be aware, all your observables should emit the onComplete event,
            // otherwise you will wait forever
            .toList()
            // could implement more intelligent logic, e.g. check …
