Виды Observable

Какого вида бывают Observable? Когда их использовать?

Какие бывают Observable — Observer?

Observable бывает разных видов, в зависимости от того сколько и как эмитятся данные. Выбирать Observable надо внимательно. Итак, в RxJava у нас есть следующие виды Observable: 

  • Observable
  • Single
  • Maybe
  • Flowable
  • Completable

В противовес к каждому Observable мы соответсвующий ему Observer:

  • Observer
  • SingleObservable
  • MaybeObservable
  • CompletableObserver

В таблице ниже представлены Observables в соответствии с тем как и сколько они эмитят данные

ObservableObserverКол-во эмиссий
ObservableObserverМного или ничего
SingleSingleObserverОдна эмиссия
MaybeMaybeObserverОдна эмиссия или ничего
FlowableObserverМного или ничего
CompletableCompletableObserverНичего

Observable & Observer

Observable пожалуй самый часто используемый observable(простите за тавтологию). Observable эмитит один и более элементов. В примере ниже, мы можем видеть как observable эмитит элементы один за другим. Мы также можем заэмитить список элементов сразу. Но если вы хотите использовать оператор к каждому элементу(другими словами, трансформировать эмитированные данные), то лучше эмитить по одному.

// эмитим один Note
Observable<Note>
 
// эмиссия всех элементов сразу, но лучше получать данные по одному
Observable<List<Note>>
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
 
import java.util.ArrayList;
import java.util.List;
 
import info.androidhive.rxandroidexamples.R;
import info.androidhive.rxandroidexamples.observers.model.Note;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
 
public class ObserverActivity extends AppCompatActivity {
 
    private static final String TAG = ObserverActivity.class.getSimpleName();
    private Disposable disposable;
 
    /**
     * Simple Observable emitting multiple Notes
     * -
     * Observable : Observer
     */
 
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_observer);
 
        Observable<Note> notesObservable = getNotesObservable();
 
        Observer<Note> notesObserver = getNotesObserver();
 
        notesObservable.observeOn(Schedulers.io())
                .subscribeOn(AndroidSchedulers.mainThread())
                .subscribeWith(notesObserver);
    }
 
    private Observer<Note> getNotesObserver() {
        return new Observer<Note>() {
 
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
                disposable = d;
            }
 
            @Override
            public void onNext(Note note) {
                Log.d(TAG, "onNext: " + note.getNote());
            }
 
            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }
 
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };
    }
 
    private Observable<Note> getNotesObservable() {
        final List<Note> notes = prepareNotes();
 
        return Observable.create(new ObservableOnSubscribe<Note>() {
            @Override
            public void subscribe(ObservableEmitter<Note> emitter) throws Exception {
                for (Note note : notes) {
                    if (!emitter.isDisposed()) {
                        emitter.onNext(note);
                    }
                }
 
                // all notes are emitted
                if (!emitter.isDisposed()) {
                    emitter.onComplete();
                }
            }
        });
    }
 
    private List<Note> prepareNotes() {
        List<Note> notes = new ArrayList<>();
        notes.add(new Note(1, "Buy tooth paste!"));
        notes.add(new Note(2, "Call brother!"));
        notes.add(new Note(3, "Watch Narcos tonight!"));
        notes.add(new Note(4, "Pay power bill!"));
        return notes;
    }
 
    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposable.dispose();
    }
}
public class Note {
    int id;
    String note;
 
    // getters an setters
}

Результат получаемый при исполнении кода

onSubscribe
onNext: Buy tooth paste!
onNext: Call brother!
onNext: Watch Narcos tonight!
onNext: Pay power bill!
onComplete

 

Single & SingleObserver

Single эмитит один объект или выдает ошибку. То же самое можно сделать используя Observable с единой эмиссией, но Single что всегда будет одна эмиссия. Single можно, например, использовать в случае выполнения сетевого запроса, который будет выполнен успешно или получим ошибку.

Пример ниже всегда эмитит один объект Note. Еще одним примером использования Single может быть получение объекта Note по его id. 
Также нам нужно убедиться, что Note присутствует в базе данных, так как Single всегда должен эмитить значение.

Обратите внимание, что SingleObserver не имеет метода onNext(), чтобы эмитить  данные, вместо этого данные будут получены методом onSuccess (Note note).

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
 
import info.androidhive.rxandroidexamples.R;
import info.androidhive.rxandroidexamples.observers.model.Note;
import io.reactivex.Single;
import io.reactivex.SingleEmitter;
import io.reactivex.SingleObserver;
import io.reactivex.SingleOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
 
public class SingleObserverActivity extends AppCompatActivity {
 
    private static final String TAG = SingleObserverActivity.class.getSimpleName();
    private Disposable disposable;
 
    /**
     * Single Observable emitting single Note
     * Single Observable is more useful in making network calls
     * where you expect a single response object to be emitted
     * -
     * Single : SingleObserver
     */
 
    // TODO - link to Retrofit  tutorial
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_single_observer);
 
        Single<Note> noteObservable = getNoteObservable();
 
        SingleObserver<Note> singleObserver = getSingleObserver();
 
        noteObservable
                .observeOn(Schedulers.io())
                .subscribeOn(AndroidSchedulers.mainThread())
                .subscribe(singleObserver);
 
    }
 
    private SingleObserver<Note> getSingleObserver() {
        return new SingleObserver<Note>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
                disposable = d;
            }
 
            @Override
            public void onSuccess(Note note) {
                Log.e(TAG, "onSuccess: " + note.getNote());
            }
 
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.getMessage());
            }
        };
    }
 
    private Single<Note> getNoteObservable() {
        return Single.create(new SingleOnSubscribe<Note>() {
            @Override
            public void subscribe(SingleEmitter<Note> emitter) throws Exception {
                Note note = new Note(1, "Buy milk!");
                emitter.onSuccess(note);
            }
        });
    }
 
    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposable.dispose();
    }
}

Результат

onSubscribe
onSuccess: Buy milk!

Maybe & MaybeObserver

Maybe может эмитить данные, а может и не эмитить. Он может либо содержать элемент, либо выдать ошибку, либо не содержать данных — этакий реактивный Optional.

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
 
import info.androidhive.rxandroidexamples.R;
import info.androidhive.rxandroidexamples.observers.model.Note;
import io.reactivex.Maybe;
import io.reactivex.MaybeEmitter;
import io.reactivex.MaybeObserver;
import io.reactivex.MaybeOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
 
public class MaybeObserverActivity extends AppCompatActivity {
 
    private static final String TAG = MaybeObserverActivity.class.getSimpleName();
    private Disposable disposable;
 
    /**
     * Consider an example getting a note from db using ID
     * There is possibility of not finding the note by ID in the db
     * In this situation, MayBe can be used
     * -
     * Maybe : MaybeObserver
     */
 
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_maybe_observer);
 
        Maybe<Note> noteObservable = getNoteObservable();
 
        MaybeObserver<Note> noteObserver = getNoteObserver();
 
        noteObservable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(noteObserver);
    }
 
    private MaybeObserver<Note> getNoteObserver() {
        return new MaybeObserver<Note>() {
            @Override
            public void onSubscribe(Disposable d) {
                disposable = d;
            }
 
            @Override
            public void onSuccess(Note note) {
                Log.d(TAG, "onSuccess: " + note.getNote());
            }
 
            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }
 
            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete");
            }
        };
    }
 
    /**
     * Emits optional data (0 or 1 emission)
     * But for now it emits 1 Note always
     */
    private Maybe<Note> getNoteObservable() {
        return Maybe.create(new MaybeOnSubscribe<Note>() {
            @Override
            public void subscribe(MaybeEmitter<Note> emitter) throws Exception {
                Note note = new Note(1, "Call brother!");
                if (!emitter.isDisposed()) {
                    emitter.onSuccess(note);
                }
            }
        });
    }
 
    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposable.dispose();
    }
}

Completable & CompletableObserver

Completable  похож на void-метод. Он либо успешно завершает свою работу без каких-либо данных, либо бросает исключение. То есть это некий кусок кода, который можно запустить, и он либо успешно выполнится, либо завершится сбоем.

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
 
import info.androidhive.rxandroidexamples.R;
import info.androidhive.rxandroidexamples.observers.model.Note;
import io.reactivex.Completable;
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableObserver;
import io.reactivex.CompletableOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
 
public class CompletableObserverActivity extends AppCompatActivity {
 
    private static final String TAG = CompletableObserverActivity.class.getSimpleName();
    private Disposable disposable;
 
    /**
     * Completable won't emit any item, instead it returns
     * Success or failure state
     * Consider an example of making a PUT request to server to update
     * something where you are not expecting any response but the
     * success status
     * -
     * Completable : CompletableObserver
     */
    // TODO - link to Retrofit  tutorial
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_completable_observer);
 
        Note note = new Note(1, "Home work!");
 
        Completable completableObservable = updateNote(note);
 
        CompletableObserver completableObserver = completableObserver();
 
        completableObservable
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(completableObserver);
    }
 
 
    /**
     * Assume this making PUT request to server to update the Note
     */
    private Completable updateNote(Note note) {
        return Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(CompletableEmitter emitter) throws Exception {
                if (!emitter.isDisposed()) {
                    Thread.sleep(1000);
                    emitter.onComplete();
                }
            }
        });
    }
 
    private CompletableObserver completableObserver() {
        return new CompletableObserver() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
                disposable = d;
            }
 
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: Note updated successfully!");
            }
 
            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }
        };
    }
 
    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposable.dispose();
    }
}

Flowable & Observer

Flowable должен использоваться, когда observable генерирует огромное количество событий / данных, которые может обрабатывать observer.  В соответствии с документацией, Flowable может использоваться, когда генерируются более 10000 событий, и observer не может принять все это.

В приведенном ниже примере Flowable эмитит числа от 1 до 100, и оператор Reduce() используется для сложения всех чисел и эмитинга конечной суммы.

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
 
import info.androidhive.rxandroidexamples.R;
import io.reactivex.Flowable;
import io.reactivex.SingleObserver;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BiFunction;
import io.reactivex.schedulers.Schedulers;
 
public class FlowableObserverActivity extends AppCompatActivity {
 
    private static final String TAG = FlowableObserverActivity.class.getSimpleName();
    private Disposable disposable;
 
    /**
     * Simple example of Flowable just to show the syntax
     * the use of Flowable is best explained when used with BackPressure
     * Read the below link to know the best use cases to use Flowable operator
     * https://github.com/ReactiveX/RxJava/wiki/What%27s-different-in-2.0#when-to-use-flowable
     * -
     * Flowable : SingleObserver
     */
 
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_flowable_observer);
 
        Flowable<Integer> flowableObservable = getFlowableObservable();
 
        SingleObserver<Integer> observer = getFlowableObserver();
 
        flowableObservable
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .reduce(0, new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer result, Integer number) {
                        //Log.e(TAG, "Result: " + result + ", new number: " + number);
                        return result + number;
                    }
                })
                .subscribe(observer);
    }
 
    private SingleObserver<Integer> getFlowableObserver() {
        return new SingleObserver<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
                disposable = d;
            }
 
            @Override
            public void onSuccess(Integer integer) {
                Log.d(TAG, "onSuccess: " + integer);
            }
 
            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }
        };
    }
 
    private Flowable<Integer> getFlowableObservable() {
        return Flowable.range(1, 100);
    }
 
    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposable.dispose();
    }
}

Результат

onSubscribe
onSuccess: 5050

Observable бывает разных видов, в зависимости от того сколько и как эмитятся данные. Выбирать Observable надо внимательно. Итак, в RxJava у нас есть следующие виды Observable: 

  • Observable
  • Single
  • Maybe
  • Flowable
  • Completable

В противовес к каждому Observable мы соответсвующий ему Observer:

  • Observer
  • SingleObservable
  • MaybeObservable
  • CompletableObserver

Опубликовано

в

от

Метки: