java - 如何处理RxJava中观察者的onNext引发的异常?

请考虑以下示例:

Observable.range(1, 10).subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);

它输出1到5之间的数字,然后打印异常。
我要实现的是让观察者保持订阅状态,并在抛出异常后继续运行,即打印1到10之间的所有数字。
我尝试过使用retry()other various error handling operators,但是,正如文档中所述,它们的目的是处理由可观察到的本身所发出的错误。
最直接的解决方案是将onNext的整个主体封装到一个try-catch块中,但这听起来并不是一个好的解决方案。在similar Rx.NET question中,建议的解决方案是生成一个扩展方法,通过创建一个可观察的代理来进行包装。我试着重新制作它:
Observable<Integer> origin = Observable.range(1, 10);
Observable<Integer> proxy = Observable.create((Observable.OnSubscribe<Integer>) s ->
        origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted));

proxy.subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);

这不会改变任何东西,因为rxjava本身将订户包装成一个SafeSubscriber。使用unsafeSubscribe来绕过它似乎也不是一个好的解决方案。
我能做些什么来解决这个问题?


最佳答案:

这是学习Rx时出现的一个常见问题。
DR
建议将异常处理逻辑放到订阅服务器中,而不是创建通用的可观察包装器。
解释
记住,RX是向订户推送事件。
从Observable界面上,很明显,除了处理一个事件花费了多长时间,或者抛出异常中包含的信息之外,Observable对它的订户没有真正的了解。
处理订阅服务器异常并继续向该订阅服务器发送事件的通用包装器是一个坏主意。
为什么?好吧,观察员应该只知道订阅者现在处于未知的失败状态。在这种情况下继续发送事件是不明智的——例如,订阅服务器处于这样一种情况,从这一点向前发送的每个事件都将引发异常并需要一段时间来完成它。
一旦订阅者抛出了一个异常,对于可观测的数据,只有两个可行的操作过程:
重新引发异常
实现一般性处理以记录失败并停止发送IT事件(任何类型),并清除由于该订阅服务器而导致的任何资源,并继续进行任何剩余订阅。
用户异常的具体处理将是一个糟糕的设计选择;它将在用户和可观察的之间产生不适当的行为耦合。因此,如果你想对糟糕的用户有弹性,上面的两个选择实际上是可观察对象本身责任的合理限度。
如果您希望您的订阅服务器具有弹性并继续运行,那么您应该完全将其包装在异常处理逻辑中,该逻辑设计用于处理您知道如何从中恢复的特定异常(并且可能处理瞬时异常、日志记录、重试逻辑、电路断开等)。
只有订阅服务器本身才有上下文来了解它是否适合在出现故障时接收进一步的事件。
如果您的情况需要开发可重用的错误处理逻辑,那么请将自己置于包装观察者的事件处理程序而不是可观察事件的心态中,并注意不要在出现故障时盲目地继续传输事件。Release It!虽然不是关于RX的,但这是一个有趣的软件工程经典,在这最后一点上有很多话要说。如果你没有读过,我强烈建议你读。