Separate observable values by specific amount of time in RxJS

For your specific example, the idea is to map each value from the array to an observable that will yield its result after a delay, then concatenate the resulting stream of observables: var delayedStream = Rx.Observable .fromArray([1, 2, 3, 4, 5]) .map(function (value) { return Rx.Observable.return(value).delay(2000); }) .concatAll(); Other examples might indeed make use of … Read more

How to throttle event stream using RX?

Okay, you have 3 scenarios here: 1) I would like to get one value of the event stream every second. means: that if it produces more events per second, you will get a always bigger buffer. observableStream.Throttle(timeSpan) 2) I would like to get the latest event, that was produced before the second happens means: other … Read more

Implementing IObservable from scratch

The official documentation deprecates users implementing IObservable themselves. Instead, users are expected to use the factory method Observable.Create When possible, implement new operators by composing existing operators. Otherwise implement custom operators using Observable.Create It happens that Observable.Create is a trivial wrapper around Reactive’s internal class AnonymousObservable: public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe) { if (subscribe … Read more

C# 5.0 async/await feature and Rx – Reactive Extensions

Check also: TPL Dataflow Overview about TDF and Rx: Astute readers may notice some similarities between TPL Dataflow and Reactive Extensions (Rx), currently available as a download from the DevLabs prototypes site. Rx is predominantly focused on coordination and composition of event streams with a LINQ-based API, providing a rich set of combinators for manipulating … Read more

Rx: How can I respond immediately, and throttle subsequent requests

Here’s my approach. It’s similar to others that have gone before, but it doesn’t suffer the over-zealous window production problem. The desired function works a lot like Observable.Throttle but emits qualifying events as soon as they arrive rather than delaying for the duration of the throttle or sample period. For a given duration after a … Read more

EventBus/PubSub vs (reactive extensions) RX with respect to code clarity in a single threaded application

The following is what I see as benefits of using reactive event streams in a single-threaded synchronous application. 1. More declarative, less side-effects and less mutable state. Event streams are capable of encapsulating logic and state, potentially leaving your code without side-effects and mutable variables. Consider an application that counts button clicks and displays the … Read more

Rx – can/should I replace .NET events with Observables?

For #2, the most straightforward way is via a Subject: Subject<int> _Progress; IObservable<int> Progress { get { return _Progress; } } private void setProgress(int new_value) { _Progress.OnNext(new_value); } private void wereDoneWithWorking() { _Progress.OnCompleted(); } private void somethingBadHappened(Exception ex) { _Progress.OnError(ex); } With this, now your “Progress” can not only notify when the progress has changed, … Read more

IConnectableObservables in Rx

Short answer: IConnectableObservable represents a pending hot observable that can be shared with multiple subscribers. Calling IConnectableObservable.Connect() causes the change to hot (subscribes to the cold source observable) Long answer: A cold observable (like Observable.Range) replays the sequence for each subscriber. It’s analagous to a stopwatch, where every subscriber is given their own stopwatch. The … Read more

RxJava and parallel execution of observer code

RxJava is often misunderstood when it comes to the asynchronous/multithreaded aspects of it. The coding of multithreaded operations is simple, but understanding the abstraction is another thing. A common question about RxJava is how to achieve parallelization, or emitting multiple items concurrently from an Observable. Of course, this definition breaks the Observable Contract which states … Read more

Hata!: SQLSTATE[HY000] [1045] Access denied for user 'divattrend_liink'@'localhost' (using password: YES)