aboutsummaryrefslogtreecommitdiffstats
path: root/main/src/cgeo/geocaching/utils/RxUtils.java
diff options
context:
space:
mode:
Diffstat (limited to 'main/src/cgeo/geocaching/utils/RxUtils.java')
-rw-r--r--main/src/cgeo/geocaching/utils/RxUtils.java19
1 files changed, 19 insertions, 0 deletions
diff --git a/main/src/cgeo/geocaching/utils/RxUtils.java b/main/src/cgeo/geocaching/utils/RxUtils.java
index 670c38c..ec8f7b8 100644
--- a/main/src/cgeo/geocaching/utils/RxUtils.java
+++ b/main/src/cgeo/geocaching/utils/RxUtils.java
@@ -8,6 +8,8 @@ import rx.Scheduler.Worker;
import rx.Subscriber;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action0;
+import rx.functions.Action1;
+import rx.functions.Func0;
import rx.functions.Func1;
import rx.observables.BlockingObservable;
import rx.observers.Subscribers;
@@ -24,6 +26,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
public class RxUtils {
@@ -150,4 +153,20 @@ public class RxUtils {
};
}
+ public static<T> Observable<T> rememberLast(final Observable<T> observable) {
+ final AtomicReference<T> lastValue = new AtomicReference<>(null);
+ return observable.doOnNext(new Action1<T>() {
+ @Override
+ public void call(final T value) {
+ lastValue.set(value);
+ }
+ }).startWith(Observable.defer(new Func0<Observable<T>>() {
+ @Override
+ public Observable<T> call() {
+ final T last = lastValue.get();
+ return last != null ? Observable.just(last) : Observable.<T>empty();
+ }
+ })).replay(1).refCount();
+ }
+
}