Producer并不是一层不变
前面说过Subscriber可以通过setProducer设置Producer,而且这个方法也是支持并发调用的,意味着我们的Producer是可能不断变化的————换个说法————我们的数据源可能会改变。关于Producer的逻辑看这个RxJava中的 Producer。但是那篇文章没有提到的是,如果我们的Producer在中途改变了,会发生什么情况。举个例子(这个例子实际来自RxJava开发者的一篇进阶博文)。这个系列的文章真的非常不错,能够看到很多我们不会注意到的问题,也可以窥见早期RxJava实现上的一些影子,对于进一步理解Rx帮助很大。
给定两个Observable,希望观察第一个Observable,当一个Observable结束之后,观察第二个Observable,第二个Observable结束了,那么才会结束,自定义了一个TheObserve操作符:
public static final class ThenObserve<T> implements Observable.Operator<T, T> {
final Observable<? extends T> other;
public ThenObserve(Observable<? extends T> other) {
this.other = other;
}
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
Subscriber<T> parent = new Subscriber<T>(child, false) {
@Override
public void onNext(T t) {
child.onNext(t);
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onCompleted() {
other.unsafeSubscribe(child);
}
};
child.add(parent);
return parent;
}
}
Observable<Integer> source = Observable
.range(1, 10)
.lift(new ThenObserve<>(Observable.range(11, 90)));
TestSubscriber<Integer> ts = new TestSubscriber<>();
ts.requestMore(20);
source.subscribe(ts);
ts.getOnNextEvents().forEach(System.out::println);
结果输出了1到30一共30个数字,并不像预想的先输出1-10 然后输出剩下的11-20 一共20个数字。
我们知道Subscriber可以向上游请求数据,如果没有设置Producer,内部有个requested计数器会将这个请求先保存起来,待到调用了setProducer的时候会把请求传递到上游。
而问题在于这个计数器只负责累计计数,并不会在请求已经到达的时候,减去已经完成的请求。那么在这个例子导致的问题就是第一个range(1,10)发出了10个数后,紧接着数据源变成了range(11,90),此时这个range(11,90)依然得到这个 数值为20的requested计数器 ,所以依然发射了 20个数字,所以最后导致一共产生了30个数据。那么应对这种数据源发生变化的场景我们需要用到ProducerArbiter。
先修改这个例子,让ProducerArbiter正确的发挥作用,首先,ProducerArbiter的作用是作为中间枢纽,充当上下游的Producer,Producer发生改变对于上下游是透明的。
修改后:
public static final class ThenObserve<T> implements Observable.Operator<T, T> {
final Observable<? extends T> other;
public ThenObserve(Observable<? extends T> other) {
this.other = other;
}
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
ProducerArbiter arbiter = new ProducerArbiter();
Subscriber<T> parent = new Subscriber<T>() {
@Override
public void onNext(T t) {
child.onNext(t);
arbiter.produced(1);
}
@Override
public void onError(Throwable e) {
arbiter.setProducer(null);
child.onError(e);
}
@Override
public void onCompleted() {
arbiter.setProducer(null);
//产生新的subscriber用来接收producer的改变
DelegatingSubscriber<T> subscriber = new DelegatingSubscriber<>(child, arbiter);
child.add(subscriber);
other.unsafeSubscribe(subscriber);
}
@Override
public void setProducer(Producer p) {
arbiter.setProducer(p);
}
};
child.add(parent);
child.setProducer(arbiter);
return parent;
}
}
static class DelegatingSubscriber<T> extends Subscriber<T>{
Subscriber<? super T> actual;
ProducerArbiter arbiter;
public DelegatingSubscriber(Subscriber<? super T> downstream,ProducerArbiter arbiter){
this.actual = downstream;
this.arbiter = arbiter;
}
@Override
public void onCompleted() {
arbiter.setProducer(null);
this.actual.onCompleted();
}
@Override
public void onError(Throwable e) {
arbiter.setProducer(null);
this.actual.onError(e);
}
@Override
public void onNext(T o) {
this.actual.onNext(o);
arbiter.produced(1);
}
@Override
public void setProducer(Producer p) {
arbiter.setProducer(p);
}
}
从最后的实现看,child Subscriber是消费者,需要从第一个range Observable和other Observable两处获取数据,由于数据源是不稳定的,所以child实际上只设置了ProducerArbiter一个Producer,实际Producer的切换交给了ProducerArbiter,这样数据源切换对于child 来说是透明的。
这里具体这做了三件事情:给child 设置ProducerArbiter;ProducerArbiter在数据源发生改变时切换到其他Producer;数据产生时调用producer(n)更新计数。
ProducerArbiter 实现原理
如果是我们来实现ProducerArbiter以此满足上面 例子中的需要,其实我们要做的就是每次请求到来的时候增加计数,当有事件被消耗的时候减少计数,同时支持串行访问,从这样的角度我们再去看ProducerArbiter的源码,可能就会比较容易理解。一共有四个方法:
java public void request(long n)
java public void setProducer(Producer newProducer)
java public void produced(long n)
java public void emitLoop()
request方法:
public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("n >= 0 required");
}
if (n == 0) {
return;
}
synchronized (this) {
if (emitting) {
missedRequested += n; //1
return;
}
emitting = true;
}
boolean skipFinal = false; //2
try {
long r = requested;
long u = r + n;
if (u < 0) {
u = Long.MAX_VALUE;
}
requested = u;
Producer p = currentProducer;
if (p != null) {
p.request(n);//3
}
emitLoop();//4
skipFinal = true;
} finally {
if (!skipFinal) {
synchronized (this) {
emitting = false;
}
}
}
}
1) 用missedRequested保存没有处理的请求,因为并发访问的原因,此时可能有其他线程在调用,所以先保存起来。
2) skipFinal 实际上的作用是决定是否跳过最后的finally代码块(当然实际并不能跳过,finally代码块的作用只是为了预防emitLoop方法发生错误而没有安全的将emitting 置为false,导致一直阻塞后续的请求)。
3) 这里必须详细解释为什么是请求n,而不是requested。因为下游发出数据请求的时候,这些请求积累是发生在每个Producer上的,此刻的n实际是下游在向currentProducer请求,而requested是之前积累的,可能是之前的missedProducer上积累的请求,更加详细的逻辑会在emitLoop中。
4) emitLoop的作用实际就是真正调用实际的Producer 请求数据的过程。代码能够走到这一步说明emitting = false,此时没有来自其他线程的数据请求,执行emitLoop漏循环处理已经积累的请求。
setProducer方法
public void setProducer(Producer newProducer) {
synchronized (this) {
if (emitting) {
missedProducer = newProducer == null ? NULL_PRODUCER : newProducer;//1
return;
}
emitting = true;
}
boolean skipFinal = false;
try {
currentProducer = newProducer;
if (newProducer != null) {
newProducer.request(requested);//2
}
emitLoop();
skipFinal = true;
} finally {
if (!skipFinal) {
synchronized (this) {
emitting = false;
}
}
}
}
1) 如果传入的newProducer是null则会把missedProducer设置成NULL_PRODUCER,可以说是一个标记,在emitLoop中作为判断条件使用。
2) 请求了requested个,貌似和前面request方法的//3处有点不同,这是因为如果切换了Producer,那么之前的Producer没有生产的数据,再也不会有机会产生,所以这些
积累下的数据就应该全部交给新的Prodcuer来产生。
emitLoop方法
public void emitLoop() {
for (;;) {
long localRequested;
long localProduced;
Producer localProducer;
synchronized (this) {
localRequested = missedRequested;//1
localProduced = missedProduced;
localProducer = missedProducer;
if (localRequested == 0L
&& localProduced == 0L
&& localProducer == null) {//2
emitting = false;
return;
}
missedRequested = 0L;//3
missedProduced = 0L;
missedProducer = null;
}
long r = requested;
if (r != Long.MAX_VALUE) {//1
long u = r + localRequested;
if (u < 0 || u == Long.MAX_VALUE) {
r = Long.MAX_VALUE;
requested = r;
} else {
long v = u - localProduced;
if (v < 0) {
throw new IllegalStateException("more produced than requested");
}
r = v;
requested = v;
}
}
if (localProducer != null) {//2
if (localProducer == NULL_PRODUCER) {
currentProducer = null;
} else {
currentProducer = localProducer;
localProducer.request(r);
}
} else {//3
Producer p = currentProducer;
if (p != null && localRequested != 0L) {
p.request(localRequested);
}
}
}
}
emitLoop方法比较长,直接看真的容易看不懂,我也是最开始卡在这里很久,后来发现最好是用单一Producer的角度分析,从简化的角度理解,如果此时一直都只有一个Producer,不存在Producer切换,那么可以认为此时的ProducerArbiter实际是一个普通的Producer,这里的emitLoop方法实际上处理的只是missedRequested,实际上和RangeProducer(以RangeProducer为例)的背压处理情况下的slowPath方法极度相似————不停的处理积累的请求直到把积累的请求完全处理完。
此时再来看emitLoop,其实只是多了Producer切换的情况,其实如果我们把Producer也看作是一种数据————没错!我们把Producer也看成是request一样,好比某处下游正在请求数据一样,只不过这个请求是请求切换Producer。对应请求数据,我们是寻找合适的Producer把请求发出去,对应Producer我们则应该是寻找合适的数据去发射。 此时我们再来看emitLoop的逻辑,退出漏循环的条件是所有的请求都处理完毕(准确说是:积累的请求处理完、该切换的Producer切换完毕以及累计处理完的事件计数完毕),这里很容易理解。
1) 整个if语句块实际是进行已经消费的事件进行减法计数,减去已经消费的事件数
2) 如果localProducer(实际是missedProducer)不为空,表示存在Producer切换,不过还得继续判断(因为切换的时候可能是传的一个为null的Producer),如果localProducer == NULL_PRODUCER实际上就是把当前的Producer设置成null了也就是不设置数据源,不然的话 切换数据源,并且把累计的请求处理完。
至此整个ProducerArbiter分析完毕,但是ProducerArbiter存在的问题是,没有保证事件是串行发射的,举个例子,此时前面的request正在被Producer A执行,正在发射事件给subscriber,但是紧接着切换了Producer B,此时Producer B也开始给Subscriber发射事件,这样导致的结果就是事件发射是并行的,必然会发生问题,这也显然不是我们想要的。其实我们这时候就需要ProducerObserverArbiter,看名字就是知道它还是个观察者。