How to reset a BehaviorSubject

I assume you want to clear the BehaviorSubject (because otherwise don’t call onComplete on it). That is not supported but you can achieve a similar effect by having a current value that is ignored by consumers: public static final Object EMPTY = new Object(); BehaviorSubject<Object> subject = BehaviorSubject.createDefault(EMPTY); Observable<YourType> obs = subject.filter(v -> v != … Read more

Spring boot Webclient’s retrieve vs exchange

Adding to @JArgente’s answer. According to the official documentation of the retrieve() method: Perform the HTTP request and retrieve the response body. … This method is a shortcut to using exchange() and decoding the response body through ClientResponse. and the exchange() method Perform the HTTP request and return a ClientResponse with the response status and … Read more

Split Rx Observable into multiple streams and process individually

Easy as pie, just use filter An example in scala import rx.lang.scala.Observable val o: Observable[String] = Observable.just(“a”, “b”, “c”, “a”, “b”, “b”, “b”, “a”) val hotO: Observable[String] = o.share val aSource: Observable[String] = hotO.filter(x ⇒ x == “a”) val bSource: Observable[String] = hotO.filter(x ⇒ x == “b”) val cSource: Observable[String] = hotO.filter(x ⇒ x == … Read more

RxJS takeWhile but include the last value

Since RxJS 6.4.0 this is now possible with takeWhile(predicate, true). There’s already an opened PR that adds an optional inclusive parameter to takeWhile: https://github.com/ReactiveX/rxjs/pull/4115 There’re at least two possible workarounds: using concatMap(): of(‘red’, ‘blue’, ‘green’, ‘orange’).pipe( concatMap(color => { if (color === ‘green’) { return of(color, null); } return of(color); }), takeWhile(color => color), ) … Read more

EventBus/PubSub vs (reactive extensions) RX with respect to code clarity in a single threaded application

The following is what I see as benefits of using reactive event streams in a single-threaded synchronous application. 1. More declarative, less side-effects and less mutable state. Event streams are capable of encapsulating logic and state, potentially leaving your code without side-effects and mutable variables. Consider an application that counts button clicks and displays the … Read more

How to create an Observable from OnClick Event Android?

You would do something like this: Observable<View> clickEventObservable = Observable.create(new Observable.OnSubscribe<View>() { @Override public void call(final Subscriber<? super View> subscriber) { viewIWantToMonitorForClickEvents.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { if (subscriber.isUnsubscribed()) return; subscriber.onNext(v); } }); } }); // You can then apply all sorts of operation here Subscription subscription = clickEventObservable.flatMap(/* */); // Unsubscribe … Read more

How to properly handle onError inside RxJava (Android)?

.doOnError() is an operator, and is not as such a part of the Subscriber. Therefore, having a .doOnError() does not count as an implemented onError(). About the question in one of the comments, of course it is possible to use lambdas. In this case simply replace .doOnError(throwable -> L.e(TAG, “Throwable ” + throwable.getMessage())) .subscribe(s -> … Read more

IConnectableObservables in Rx

Short answer: IConnectableObservable represents a pending hot observable that can be shared with multiple subscribers. Calling IConnectableObservable.Connect() causes the change to hot (subscribes to the cold source observable) Long answer: A cold observable (like Observable.Range) replays the sequence for each subscriber. It’s analagous to a stopwatch, where every subscriber is given their own stopwatch. The … Read more

Hata!: SQLSTATE[HY000] [1045] Access denied for user 'divattrend_liink'@'localhost' (using password: YES)