diff options
Diffstat (limited to 'main/src/cgeo/geocaching/utils/RxUtils.java')
| -rw-r--r-- | main/src/cgeo/geocaching/utils/RxUtils.java | 69 |
1 files changed, 15 insertions, 54 deletions
diff --git a/main/src/cgeo/geocaching/utils/RxUtils.java b/main/src/cgeo/geocaching/utils/RxUtils.java index 280575b..08cc3e7 100644 --- a/main/src/cgeo/geocaching/utils/RxUtils.java +++ b/main/src/cgeo/geocaching/utils/RxUtils.java @@ -2,7 +2,6 @@ package cgeo.geocaching.utils; import rx.Observable; import rx.Observable.OnSubscribe; -import rx.Observable.Operator; import rx.Scheduler; import rx.Scheduler.Worker; import rx.Subscriber; @@ -17,7 +16,6 @@ import rx.observers.Subscribers; import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; import rx.subscriptions.Subscriptions; -import rx.util.async.Async; import android.os.Handler; import android.os.HandlerThread; @@ -115,49 +113,6 @@ public class RxUtils { abstract protected void onStop(); } - public static <T> Operator<T, T> operatorTakeUntil(final Func1<? super T, Boolean> predicate) { - return new Operator<T, T>() { - @Override - public Subscriber<? super T> call(final Subscriber<? super T> subscriber) { - return new Subscriber<T>(subscriber) { - private boolean done = false; - - @Override - public void onCompleted() { - if (!done) { - subscriber.onCompleted(); - } - } - - @Override - public void onError(final Throwable throwable) { - if (!done) { - subscriber.onError(throwable); - } - } - - @Override - public void onNext(final T value) { - subscriber.onNext(value); - boolean shouldEnd = false; - try { - shouldEnd = predicate.call(value); - } catch (final Throwable e) { - done = true; - subscriber.onError(e); - unsubscribe(); - } - if (shouldEnd) { - done = true; - subscriber.onCompleted(); - unsubscribe(); - } - } - }; - } - }; - } - public static<T> Observable<T> rememberLast(final Observable<T> observable, final T initialValue) { final AtomicReference<T> lastValue = new AtomicReference<>(initialValue); return observable.doOnNext(new Action1<T>() { @@ -175,20 +130,26 @@ public class RxUtils { } public static <T> void andThenOnUi(final Scheduler scheduler, final Func0<T> background, final Action1<T> foreground) { - Async.fromCallable(background, scheduler).observeOn(AndroidSchedulers.mainThread()).subscribe(foreground); + scheduler.createWorker().schedule(new Action0() { + @Override + public void call() { + final T value = background.call(); + AndroidSchedulers.mainThread().createWorker().schedule(new Action0() { + @Override + public void call() { + foreground.call(value); + } + }); + } + }); } public static void andThenOnUi(final Scheduler scheduler, final Action0 background, final Action0 foreground) { - andThenOnUi(scheduler, new Func0<Void>() { + scheduler.createWorker().schedule(new Action0() { @Override - public Void call() { + public void call() { background.call(); - return null; - } - }, new Action1<Void>() { - @Override - public void call(final Void ignored) { - foreground.call(); + AndroidSchedulers.mainThread().createWorker().schedule(foreground); } }); } |
