RxJS表现不正常

我正在使用RxJS 6开发Angular 8应用程序。我以为我在RxJS方面做得很好,但是今天遇到了一些使我觉得对它的工作原理有基本误解的东西。

我有以下订阅:

this.nudgePauseConnectivity$.pipe(
    tap(() => console.log(1)),
    withLatestFrom(this.hotspotPausedState$()),
    tap(() => console.log(2)),
    filter(([nudgePause, connectionPaused]) => connectionPaused === enums.HotspotPausedStateEnum.FALSE),
    tap(() => console.log(3)),
    switchMapTo(this._store.pipe(select(this._hotspotSelectors.selectUsername))),
    tap(() => console.log(4)),
    switchMap(username =>
        this._cabinApiV2Service.subscriberService.getRemainingConnectivityAllowance(username)
    ),
    tap(() => console.log(5)),
    map(remainingConnectivityAllowance => SubscriberConnectivityAllowanceTransformer.transform(
        remainingConnectivityAllowance
    ))
).subscribe(() => {/* Do something */});

nudgePauseConnectivity$ is a subject I use to cause the body of the observable to execute, like this:

pauseConnectivity() {
    this.nudgePauseConnectivity$.next({});
}

this._hotspotSelectors.selectUsername is an NgRx selector, which if you haven't used NgRx in an observable that returns values from a store.

My problem is this: I thought that the expression would execute once for each value emitted from this.nudgePauseConnectivity$. I would expect to see in the console:

1个      2      3      4      5

However, it seems that even if this.nudgePauseConnectivity$ does not emit a value, the execution of the expression begins if this._store.pipe(select(this._hotspotSelectors.selectUsername)) emits a value, so I am seeing this sort of output in the console:

1个      2      3      4      5      4      5

I am thoroughly confused by this! I do not want the expression to ever begin executing in the middle: I would like the expression to execute only if this.nudgePauseConnectivity$ emits a value, irrespective of the values of any expressions in the switchMaps or other high order observable operators later in the chain.

我该如何实现?

评论
  • 夜伴歌笙
    夜伴歌笙 回复

    在第一次发射后,switchMap不会取消订阅。你应该那样做。我们通常会忘记这一点,因为大多数时候我们都在处理1个可观察到的发射,例如http客户端。当您的应用程序处于运行状态时,流将一直存在,而不是:

    switchMapTo(this._store.pipe(select(this._hotspotSelectors.selectUsername))),
    

    您可以这样做(不无限期中断所有链接):

    switchMap(() => this._store.pipe(
     select(this._hotspotSelectors.selectUsername), 
     take(1)
    )
    

    Or just put the take(1) in the chain after switchMapTo if you want to interrupt all the emissions after the first one:

    ...
    switchMapTo(this._store.pipe(select(this._hotspotSelectors.selectUsername))),
    take(1),
    ...
    

    值得一看的是代码的其他部分,以了解您是否正确地取消了对正在使用的所有可观察对象(包括角度形式的对象)的订阅。