aboutsummaryrefslogtreecommitdiffstats
path: root/main/src/cgeo/geocaching/utils/RxUtils.java
diff options
context:
space:
mode:
Diffstat (limited to 'main/src/cgeo/geocaching/utils/RxUtils.java')
-rw-r--r--main/src/cgeo/geocaching/utils/RxUtils.java69
1 files changed, 15 insertions, 54 deletions
diff --git a/main/src/cgeo/geocaching/utils/RxUtils.java b/main/src/cgeo/geocaching/utils/RxUtils.java
index 280575b..08cc3e7 100644
--- a/main/src/cgeo/geocaching/utils/RxUtils.java
+++ b/main/src/cgeo/geocaching/utils/RxUtils.java
@@ -2,7 +2,6 @@ package cgeo.geocaching.utils;
import rx.Observable;
import rx.Observable.OnSubscribe;
-import rx.Observable.Operator;
import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.Subscriber;
@@ -17,7 +16,6 @@ import rx.observers.Subscribers;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;
-import rx.util.async.Async;
import android.os.Handler;
import android.os.HandlerThread;
@@ -115,49 +113,6 @@ public class RxUtils {
abstract protected void onStop();
}
- public static <T> Operator<T, T> operatorTakeUntil(final Func1<? super T, Boolean> predicate) {
- return new Operator<T, T>() {
- @Override
- public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
- return new Subscriber<T>(subscriber) {
- private boolean done = false;
-
- @Override
- public void onCompleted() {
- if (!done) {
- subscriber.onCompleted();
- }
- }
-
- @Override
- public void onError(final Throwable throwable) {
- if (!done) {
- subscriber.onError(throwable);
- }
- }
-
- @Override
- public void onNext(final T value) {
- subscriber.onNext(value);
- boolean shouldEnd = false;
- try {
- shouldEnd = predicate.call(value);
- } catch (final Throwable e) {
- done = true;
- subscriber.onError(e);
- unsubscribe();
- }
- if (shouldEnd) {
- done = true;
- subscriber.onCompleted();
- unsubscribe();
- }
- }
- };
- }
- };
- }
-
public static<T> Observable<T> rememberLast(final Observable<T> observable, final T initialValue) {
final AtomicReference<T> lastValue = new AtomicReference<>(initialValue);
return observable.doOnNext(new Action1<T>() {
@@ -175,20 +130,26 @@ public class RxUtils {
}
public static <T> void andThenOnUi(final Scheduler scheduler, final Func0<T> background, final Action1<T> foreground) {
- Async.fromCallable(background, scheduler).observeOn(AndroidSchedulers.mainThread()).subscribe(foreground);
+ scheduler.createWorker().schedule(new Action0() {
+ @Override
+ public void call() {
+ final T value = background.call();
+ AndroidSchedulers.mainThread().createWorker().schedule(new Action0() {
+ @Override
+ public void call() {
+ foreground.call(value);
+ }
+ });
+ }
+ });
}
public static void andThenOnUi(final Scheduler scheduler, final Action0 background, final Action0 foreground) {
- andThenOnUi(scheduler, new Func0<Void>() {
+ scheduler.createWorker().schedule(new Action0() {
@Override
- public Void call() {
+ public void call() {
background.call();
- return null;
- }
- }, new Action1<Void>() {
- @Override
- public void call(final Void ignored) {
- foreground.call();
+ AndroidSchedulers.mainThread().createWorker().schedule(foreground);
}
});
}