How to monitor the number of RxJS subscriptions?

You could achieve this using `defer` to track subscriptions and `finalize` to track terminations (completion, error, or unsubscription), e.g. as an operator:

/**
 * Custom pipeable operator that reports how many subscriptions the
 * resulting observable currently has.
 *
 * The tally is kept per application of the operator: each call of the
 * returned operator function starts its own count at zero.
 *
 * @param {function(number): void} [onCountUpdate=noop] - called with the new
 *   count every time it goes up or down.
 * @returns {function} operator function that takes the source observable and
 *   returns the tracked observable.
 */
function customOperator(onCountUpdate = noop) {
  return function refCountOperatorFunction(source$) {
    let activeSubscriptions = 0;

    // Single place that both adjusts the tally and notifies the listener.
    const report = (delta) => {
      activeSubscriptions += delta;
      onCountUpdate(activeSubscriptions);
    };

    // The `defer` factory is where a new subscription gets counted.
    const trackedSource$ = defer(() => {
      report(+1);
      return source$;
    });

    // The `finalize` callback is where a subscription is counted as gone.
    return trackedSource$.pipe(finalize(() => report(-1)));
  };
}

/**
 * Do-nothing default for `onCountUpdate`: ignores its arguments and
 * returns `undefined`.
 */
function noop() {
  return undefined;
}

And then use it like:

// Subject acting as the source to be observed.
const source$ = new Subject();

// Listener that prints every change of the subscription count.
const logCount = (n) => console.log('Count updated: ', n);

// Same stream as `source$`, but with subscription counting attached.
const result$ = source$.pipe(customOperator(logCount));

Here's a code snippet illustrating this:

const { Subject, of, timer, pipe, defer } = rxjs;
const { finalize, takeUntil } = rxjs.operators;


// Subject that the demo pushes values into by hand.
const source$ = new Subject();

// Wrap the subject so that every change of the subscription count is logged.
const logCount = (n) => console.log('Count updated: ', n);
const result$ = source$.pipe(customOperator(logCount));

// Builds a value handler that logs each value under the given label.
const logAs = (label) => (value) => console.log(label, value);

// emit events: [delay in ms, value]
const emissions = [
  [250, 'one'],
  [1000, 'two'],
  [1250, 'three'],
  [1750, 'four'],
];
for (const [delay, value] of emissions) {
  setTimeout(() => source$.next(value), delay);
}

// subscribe and unsubscribe
const subscriptionA = result$.subscribe(logAs('A'));

setTimeout(() => result$.subscribe(logAs('B')), 500);

// Shares the 1000 ms delay with the 'two' emission, whose timer is
// registered earlier in this script.
setTimeout(() => result$.subscribe(logAs('C')), 1000);

setTimeout(() => subscriptionA.unsubscribe(), 1500);

// complete source
setTimeout(() => source$.complete(), 2000);


/**
 * Custom pipeable operator that reports how many subscriptions the
 * resulting observable currently has.
 *
 * The tally is kept per application of the operator: each call of the
 * returned operator function starts its own count at zero.
 *
 * @param {function(number): void} [onCountUpdate=noop] - called with the new
 *   count every time it goes up or down.
 * @returns {function} operator function that takes the source observable and
 *   returns the tracked observable.
 */
function customOperator(onCountUpdate = noop) {
  return function refCountOperatorFunction(source$) {
    let activeSubscriptions = 0;

    // Single place that both adjusts the tally and notifies the listener.
    const report = (delta) => {
      activeSubscriptions += delta;
      onCountUpdate(activeSubscriptions);
    };

    // The `defer` factory is where a new subscription gets counted.
    const trackedSource$ = defer(() => {
      report(+1);
      return source$;
    });

    // The `finalize` callback is where a subscription is counted as gone.
    return trackedSource$.pipe(finalize(() => report(-1)));
  };
}

/**
 * Do-nothing default for `onCountUpdate`: ignores its arguments and
 * returns `undefined`.
 */
function noop() {
  return undefined;
}
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>

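* NOTE: if your source$ is cold, every subscriber starts its own independent execution of the source — you might need to share it (e.g. with the `share()` operator).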

Hope it helps

Leave a Comment

Hata!: SQLSTATE[HY000] [1045] Access denied for user 'divattrend_liink'@'localhost' (using password: YES)