Rx for .NET – What happened to Scheduler.Dispatcher?
The DispatcherScheduler has been moved to the System.Reactive.Windows.Threading assembly. If you are using NuGet, it’s in Rx-WPF
The DispatcherScheduler has been moved to the System.Reactive.Windows.Threading assembly. If you are using NuGet, it’s in Rx-WPF
In the world of functional reactive programming, a behavior is a value that changes over time. This is exactly what a BehaviorSubject represents: when you subscribe you get the current value, and then you can continue to observe the changes. See http://en.wikipedia.org/wiki/Functional_reactive_programming.
I’ve written a framework that represents my explorations in this question called ReactiveUI It implements both an Observable ICommand, as well as ViewModel objects who signal changes via an IObservable, as well as the ability to “assign” an IObservable to a property, who will then fire INotifyPropertyChange whenever its IObservable changes. It also encapsulates a … Read more
Ana Betts’ answer works in most scenarios, but if you want to block the stream while waiting for the async function to finish you need something like this: Observable.Interval(TimeSpan.FromSeconds(1)) .Select(l => Observable.FromAsync(asyncMethod)) .Concat() .Subscribe(); Or: Observable.Interval(TimeSpan.FromSeconds(1)) .Select(_ => Observable.Defer(() => asyncMethod().ToObservable())) .Concat() .Subscribe();
I think a constructive discussion on this topic has been had on the Rx Forums in 2009. Instead of adding adhoc Do operators into your queries, you would add a custom Log/Trace operator. This operator would capture the Subscription, Disposal, OnNext, OnError and OnCompleted events. Depending on your implementation it could just write to the … Read more
You have probably not added the necessary assembly references for Rx to your project. (Referencing an assembly is not the same thing as importing a namespace! You already know what a namespace is; an assembly is something similar to a JAR; the smallest unit of code deployment/distribution. Your project must reference it before the namespaces … Read more
Please in a normal and human way explain to me the difference OK, let’s make some toast. That’s the most normal human thing I do. Pull based: // I want some toast. I will pull it out of the toaster when it is done. // I will do nothing until the toast is available. Toast … Read more
The disposable returned by the Subscribe extension methods is returned solely to allow you to manually unsubscribe from the observable before the observable naturally ends. If the observable completes – with either OnCompleted or OnError – then the subscription is already disposed for you. Try this code: var xs = Observable.Create<int>(o => { var d … Read more
Here is a method that is similar to Dave’s but uses Sample instead (which is more appropriate than buffer). I’ve included a similar extension method to the one I added to Dave’s answer. The extension: public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action) { var sampler = new Subject<Unit>(); var sub = source. Sample(sampler). ObserveOn(Scheduler.ThreadPool). … Read more
The namespace System.Reactive.Linq contains the static class Observable which defines all the extension methods for common reactive combinators. It resides in System.Reactive.dll The extension methods for IObservable<T>.Subscribe such as Subscribe(onNext), Subscribe(onNext, onError) are however defined in mscorlib in the static class System.ObservableExtensions. tl;dr: For Rx/Observable extension methods you need to import System.Reactive.Linq = using System.Reactive.Linq; … Read more