[Java, Разработка под Android] Инициализация Rx цепочки
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Всем привет, меня зовут Иван, я Android-разработчик. Сегодня хочу поделится своим опытом работы с RxJava2 и рассказать, как происходит инициализация цепочки. Почему я вообще решил поднять эту тему? Пообщавшись со знакомыми разработчиками, я понял, что не каждый кто использует этот инструмент понимает, как он работает. И тогда я решил разобраться как же устроены подписки в RxJava2 и в какой последовательности вся работа инициализируется. Я не нашел ни одной статьи, поясняющей это. В свете этого я полез в исходники, чтобы посмотреть, как же все работает и набросал для себя небольшую шпаргалку, которая выросла в данную статью.В этой статье я не буду описывать что такое Observable, Observer и все остальные сущности, которые используются в RxJava2. Если вы решили прочитать данную статью, то я предполагаю, что вы уже знакомы с этой информацией. А если вы всё же не знакомы с этими понятия, то я рекомендую перед началом чтения ознакомиться с ними.Вот с чего можно начать:Грокаем* RxJavaИсследуем RxJava 2 для AndroidДавайте посмотрим, как работает простейшая цепочка:
Observable.just (1, 2, 3, 4, 5)
.map {…}
.filter {…}
.subscribe();
По верхамСначала опишу вкратце каждый шаг, через который пройдем в этой цепочке (шаги начинаются сверху вниз):
- Создается объект в операторе just ObservableFromArray.
- Создается объект в операторе map ObservableMap, который принимает в конструктор ссылку на ранее созданный объект в операторе just.
- Создается объект в операторе filter ObservableFilter, который принимает в конструктор ссылку на ранее созданный объект в map, в котором уже хранится ссылка на just.
- После создания всех Observable’ов у последнего Observable в цепочки вызывается метод subscribe() (в нашем случае это ObservableFilter созданный в операторе filter) в котором создается новый Observer, который и будет обрабатывать все полученные события.
- В методе ObservableFilter.subscribe() вызывается следующий метод ObservableFilter.subscribeActual(), в котором создается внутренний Observer, в случае с оператором filter, это FilterObserver. В этот внутренний Observer передается ссылка на первый созданный Observer в ObservableFilter.subscribe().
- Вызывается ObservableMap.subscribe() в котором так же вызывается ObservableMap.subscribeActual()и создается внутренний Observer, в случае с оператором map, это MapObserver, в который передается ссылка на FilterObserver.
- Вызывается ObservableFromArray.subscribe() и после ObservableFromArray.subscribeActual(), и уже там вызывается метод onSubscribe()у переданного в ObservableFromArray.subscribeActual() Observer’а.
- onSubscribe() вызывается у каждого нижележащего Observer’а в цепочке.
- ObservableFromArray начинает излучать все события в метод onNext() нижележащего Observer’а.
Визуальное представление описанной выше схемы.Создание источников данныхТеперь давайте рассмотрим описанные выше шаги подробнее, сначала попадаем в метод just() где происходит проверка каждого значения на null, далее идет вызов метода fromArray(), который возвращает Observable.
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5) {
ObjectHelper.requireNonNull(item1, "item1 is null");
ObjectHelper.requireNonNull(item2, "item2 is null");
ObjectHelper.requireNonNull(item3, "item3 is null");
ObjectHelper.requireNonNull(item4, "item4 is null");
ObjectHelper.requireNonNull(item5, "item5 is null");
return fromArray(item1, item2, item3, item4, item5);
}
В fromArray() проверяется, что метод принимает в себя не пустой массив и имеет больше одного элемента.
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
}
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
После прохода через все эти шаги создается новый экземпляр класса ObservableFromArray, который на вход принимает массив с данными.После этого новый объект передается в метод onAssembly(), так как нет необходимости как-то еще модифицировать Observable, то возвращается тот же источник, который поступал на вход.
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
onAssembly() проверяет хотим ли перехватить текущий Observable и как-то модифицировать его, например таким образом:
RxJavaPlugins.setOnObservableAssembly(o -> {
if (o instanceof ObservableFromArray) {
return new ObservableFromArray<>(new Integer[] { 4, 5, 6 });
}
return o;
});
Observable.just(1, 2, 3)
.filter(v -> v > 3)
.test()
.assertResult(4, 5, 6);
Только что созданный ObservableFromArrayВызывается следующий оператор в цепочке map(). Оператор проходит почти через те же шаги, что были описаны выше. Сначала делается проверка на null, потом создается новый экземпляр класса ObservableMap.
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
Вот тут появляется небольшое отличие, в конструктор ObservableMap передается не только mapper, который будет преобразовывать одно значение в другое, но также принимает в конструктор this (source). В данном случае this это ссылка на экземпляр класса ObservableFromArray созданный на предыдущем шаге. ObservableMap расширяет абстрактный класс AbstractObservableWithUpstream, в котором и храниться source.AbstractObservableWithUpstream абстрактный класс, который реализуют Observable операторы и в котором хранится ссылка на вышележащий источник данных.Далее происходит вызов метода onAssembly() и возвращение созданного Observable.
Обновленная схема с созданным ObservableMapПереходим к следующему оператору в цепочки filter(). В нем не происходит ничего нового, за исключением того, что создается объект ObservableFilter и в его конструктор в this передается ссылка на экземпляр ObservableMap (у которого уже есть ссылка на ObservableFromArray, как показано на схеме выше) созданный на предыдущем шаге.
public final Observable<T> filter(Predicate<? super T> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}
Обновленная схема с созданным ObservableFilterНачало подпискиПоследний оператор в цепочке subscribe(), который вызывает перегруженную версию метода. В нашем случае обрабатывается только onNext(). Метод subscribe() вызывается у ObservableFilter, который был последним созданным Observable в цепочке.
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
В перегруженном методе сначала проверяются все переданные параметры на null, далее создается объект класса LambdaObserver и происходит подписка.
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
А вот и сам метод в котором и происходит подписка.
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) {
......
}
}
В методе subscribeActual() производится подписка на источник данных и в него же передается созданный ранее LambdaObserver. subscribeActual() вызывается в классе ObservableFilter. И вот что там происходит.
public void subscribeActual(Observer<? super T> observer) {
source.subscribe(new FilterObserver<T>(observer, predicate));
}
Создается новый объект класса FilterObserver, который принимает в конструктор LambdaObserver созданный ранее и предикат для фильтрации, которые хранится в ObservableFilter в виде поля класса.Класс FilterObserver расширяет класс BasicFuseableObserver, в котором уже реализован метод onSubscribe(). BasicFuseableObserver это абстрактный класс, который реализуют промежуточные Observer’ы. Если посмотреть исходники, то его реализуют только 6 классов, два из которых это FilterObserver и MapObserver. В методе BasicFuseableObserver.onSubscribe() также вызывается метод onSubscribe() у нижележащего Observer’а, который передавался в конструктор этого класса. А выглядит это вот так:
public final void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
this.qd = (QueueDisposable<T>)d;
}
if (beforeDownstream()) {
downstream.onSubscribe(this);
afterDownstream();
}
}
}
После того как подписались на ObservableFilter и создали объект FilterObserver, он передается в source.subscribe(). Хочу напомнить, что source это объект класса ObservableMap, переданный ранее в цепочке. У объекта ObservableMap вызывается метод subscribe().
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) {
......
}
}
Далее происходит те же шаги, в методе subscribe() вызывается subscribeActual(), оба этих метода вызываются у ObservableMap. В subscribeActual() создается новый MapObserver с переданным в качестве параметра экземпляром FilterObserver и функцией mapper’а.
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
public void subscribeActual(Observer<? super T> observer) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
observer.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
Все рассмотренные Observer’ы расширяли абстрактный класс BasicFuseableObserver, в котором уже реализован метод onSubscribe() и так же есть ссылка на нижележащий Observer, у которого так же вызывается метод onSubscribe().
В конце метода subscribeActual() вызывается метод run(), в котором и начинается излучение всех данных в нижележащие Observer’ы.
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
downstream.onError(new NullPointerException("The element at index " + i + " is null"));
return;
}
downstream.onNext(value);
}
if (!isDisposed()) {
downstream.onComplete();
}
}
Соответственно вызываются onNext() для передачи значений в нижележащие Observer’ы и потом onComplete() при завершении излучения данных, либо может произойти ошибка и вызовется onError(), который завершит всю цепочку.
Визуальное представление процесса создания и подписокВыводObservable’ы вкладываются друг в друга и вызывают callback’и для создания Observer’ов, которые и будут обрабатывать получаемые данные и передавать их дальше по цепочки.Метод onSubscribe() вызывается до начала отправки данных и это надо иметь ввиду если вы пользуетесь такими оператора как doOnSubscribe().На каждый оператор создается как минимум 3 объекта:
- Анонимный класс передаваемый в оператор
- Observable создаваемый внутри оператора
- Observer обрабатывающий получаемые данные
Потому при использовании операторов стоит иметь ввиду, что каждый оператор аллоцирует память для несколько объектов и не стоит добавлять операторы в цепочку, только потому что “можно”.RxJava мощный инструмент, но нужно понимать как он работает и для каких задач его использовать. Если вам нужно просто выполнение сетевого запроса в фоновом потоке и последующее выполнение результата на главном потоке, то это как “стрелять из пушки по воробьям”, попасть можно, а вот последствия могут быть серьезными.
===========
Источник:
habr.com
===========
Похожие новости:
- [Веб-дизайн, JavaScript, jQuery, HTML] EasyUI: действительно easy?
- [JavaScript, Расширения для браузеров, Медийная реклама] AdBlock: особенности работы и продвинутые методы блокировки
- Microsoft опубликовал собственный дистрибутив OpenJDK
- [Информационная безопасность, Разработка под Android, Геоинформационные сервисы, Смартфоны] Google собирает данные геолокации со смартфонов, даже если запретить отслеживание
- [JavaScript, Программирование, Алгоритмы, Функциональное программирование] Решаем вопрос сортировки в JavaScript раз и навсегда
- [Разработка под Android, Dart, Flutter] Основы Flutter для начинающих (Часть I)
- [JavaScript, API, Тестирование веб-сервисов] Настраиваем нагрузочное тестирование с Artillery.io
- [Разработка веб-сайтов, Python, JavaScript, Программирование] Как создавать предметы генеративного искусства с помощью L-систем на языке Python (перевод)
- [Высокая производительность, Java, Анализ и проектирование систем, Алгоритмы] Развеиваем мифы об управлении памятью в JVM (перевод)
- [Java] Новые функции языка, начиная с Java 8 до 16 (перевод)
Теги для поиска: #_java, #_razrabotka_pod_android (Разработка под Android), #_rxjava, #_rx, #_rxjava2, #_rxjava_2, #_java, #_razrabotka_pod_android (
Разработка под Android
)
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 07:57
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Всем привет, меня зовут Иван, я Android-разработчик. Сегодня хочу поделится своим опытом работы с RxJava2 и рассказать, как происходит инициализация цепочки. Почему я вообще решил поднять эту тему? Пообщавшись со знакомыми разработчиками, я понял, что не каждый кто использует этот инструмент понимает, как он работает. И тогда я решил разобраться как же устроены подписки в RxJava2 и в какой последовательности вся работа инициализируется. Я не нашел ни одной статьи, поясняющей это. В свете этого я полез в исходники, чтобы посмотреть, как же все работает и набросал для себя небольшую шпаргалку, которая выросла в данную статью.В этой статье я не буду описывать что такое Observable, Observer и все остальные сущности, которые используются в RxJava2. Если вы решили прочитать данную статью, то я предполагаю, что вы уже знакомы с этой информацией. А если вы всё же не знакомы с этими понятия, то я рекомендую перед началом чтения ознакомиться с ними.Вот с чего можно начать:Грокаем* RxJavaИсследуем RxJava 2 для AndroidДавайте посмотрим, как работает простейшая цепочка: Observable.just (1, 2, 3, 4, 5)
.map {…} .filter {…} .subscribe();
Визуальное представление описанной выше схемы.Создание источников данныхТеперь давайте рассмотрим описанные выше шаги подробнее, сначала попадаем в метод just() где происходит проверка каждого значения на null, далее идет вызов метода fromArray(), который возвращает Observable. public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5) {
ObjectHelper.requireNonNull(item1, "item1 is null"); ObjectHelper.requireNonNull(item2, "item2 is null"); ObjectHelper.requireNonNull(item3, "item3 is null"); ObjectHelper.requireNonNull(item4, "item4 is null"); ObjectHelper.requireNonNull(item5, "item5 is null"); return fromArray(item1, item2, item3, item4, item5); } public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null"); if (items.length == 0) { return empty(); } if (items.length == 1) { return just(items[0]); } return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items)); } public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; } RxJavaPlugins.setOnObservableAssembly(o -> {
if (o instanceof ObservableFromArray) { return new ObservableFromArray<>(new Integer[] { 4, 5, 6 }); } return o; }); Observable.just(1, 2, 3) .filter(v -> v > 3) .test() .assertResult(4, 5, 6); Только что созданный ObservableFromArrayВызывается следующий оператор в цепочке map(). Оператор проходит почти через те же шаги, что были описаны выше. Сначала делается проверка на null, потом создается новый экземпляр класса ObservableMap. public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); } Обновленная схема с созданным ObservableMapПереходим к следующему оператору в цепочки filter(). В нем не происходит ничего нового, за исключением того, что создается объект ObservableFilter и в его конструктор в this передается ссылка на экземпляр ObservableMap (у которого уже есть ссылка на ObservableFromArray, как показано на схеме выше) созданный на предыдущем шаге. public final Observable<T> filter(Predicate<? super T> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate is null"); return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate)); } Обновленная схема с созданным ObservableFilterНачало подпискиПоследний оператор в цепочке subscribe(), который вызывает перегруженную версию метода. В нашем случае обрабатывается только onNext(). Метод subscribe() вызывается у ObservableFilter, который был последним созданным Observable в цепочке. public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer()); } public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) { ObjectHelper.requireNonNull(onNext, "onNext is null"); ObjectHelper.requireNonNull(onError, "onError is null"); ObjectHelper.requireNonNull(onComplete, "onComplete is null"); ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null"); LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe); subscribe(ls); return ls; } public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); subscribeActual(observer); } catch (NullPointerException e) { ...... } } public void subscribeActual(Observer<? super T> observer) {
source.subscribe(new FilterObserver<T>(observer, predicate)); } public final void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) { this.upstream = d; if (d instanceof QueueDisposable) { this.qd = (QueueDisposable<T>)d; } if (beforeDownstream()) { downstream.onSubscribe(this); afterDownstream(); } } } После того как подписались на ObservableFilter и создали объект FilterObserver, он передается в source.subscribe(). Хочу напомнить, что source это объект класса ObservableMap, переданный ранее в цепочке. У объекта ObservableMap вызывается метод subscribe(). public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); subscribeActual(observer); } catch (NullPointerException e) { ...... } } public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function)); } public void subscribeActual(Observer<? super T> observer) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array); observer.onSubscribe(d); if (d.fusionMode) { return; } d.run(); } В конце метода subscribeActual() вызывается метод run(), в котором и начинается излучение всех данных в нижележащие Observer’ы. void run() {
T[] a = array; int n = a.length; for (int i = 0; i < n && !isDisposed(); i++) { T value = a[i]; if (value == null) { downstream.onError(new NullPointerException("The element at index " + i + " is null")); return; } downstream.onNext(value); } if (!isDisposed()) { downstream.onComplete(); } } Визуальное представление процесса создания и подписокВыводObservable’ы вкладываются друг в друга и вызывают callback’и для создания Observer’ов, которые и будут обрабатывать получаемые данные и передавать их дальше по цепочки.Метод onSubscribe() вызывается до начала отправки данных и это надо иметь ввиду если вы пользуетесь такими оператора как doOnSubscribe().На каждый оператор создается как минимум 3 объекта:
=========== Источник: habr.com =========== Похожие новости:
Разработка под Android ) |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 07:57
Часовой пояс: UTC + 5