Почему subscribeOn() не отправляет предыдущее значение из Behaviour subject?
Изучаю нюансы работы методов subscribeOn()/observeOn() для Subjects. Не могу понять почему в приведенном коде subscribeOn() - запрещает отправку события str 0 ?
val observer: Observer<String> = object : Observer<String> {
override fun onComplete() {
println("Consumption onComplete [thread] - ${Thread.currentThread().name}\"")
}
override fun onNext(item: String) {
println("Consumption onNext $item [thread] - ${Thread.currentThread().name}")
}
override fun onError(e: Throwable) {
println("Consumption onError ${e.message} [thread] - ${Thread.currentThread().name}\"")
}
override fun onSubscribe(d: Disposable) {
println("Consumption onSubscribe [thread] - ${Thread.currentThread().name}\"")
}
}
val subject = BehaviorSubject.create<String>()
subject.onNext("str 0 ")
subject
.doOnNext { println("doOnNext $it [thread] - ${Thread.currentThread().name}\"") }
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.io())
.subscribe(observer)
subject.onNext("str 1")
Thread.sleep(4000)
Вывод будет такой:
Consumption onSubscribe [thread] - main"
doOnNext str 1 [thread] - RxComputationThreadPool-1"
Consumption onNext str 1 [thread] - RxCachedThreadScheduler-1
Если закомментировать subscribeOn() то событие str 0 веже придет при подписке.
Consumption onSubscribe [thread] - main"
doOnNext str 0 [thread] - main"
doOnNext str 1 [thread] - main"
Consumption onNext str 0 [thread] - RxCachedThreadScheduler-1
Consumption onNext str 1 [thread] - RxCachedThreadScheduler-1
Почему это происходит?
Ответы (1 шт):
Потому что из-за .subscribeOn(Schedulers.computation()) подписка на upstream планируется, собственно, на Schedulers.computation(). К тому времени как произойдёт подписка, строка subject.onNext("str 1") будет уже выполнена. Поэтому первым значением, которое увидит подписчик будет "str 1"
ObservableSubscribeOn::subscribeActual:
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
Тут мы видим, как планируется задача подписки scheduler.scheduleDirect(new SubscribeTask(parent))
В SubscribeTask::run происходит подписка на source (upstream)
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}