aboutsummaryrefslogtreecommitdiffstats
path: root/main/src/cgeo/geocaching/utils/RxUtils.java
diff options
context:
space:
mode:
Diffstat (limited to 'main/src/cgeo/geocaching/utils/RxUtils.java')
-rw-r--r--main/src/cgeo/geocaching/utils/RxUtils.java16
1 files changed, 8 insertions, 8 deletions
diff --git a/main/src/cgeo/geocaching/utils/RxUtils.java b/main/src/cgeo/geocaching/utils/RxUtils.java
index beb5e18..af58214 100644
--- a/main/src/cgeo/geocaching/utils/RxUtils.java
+++ b/main/src/cgeo/geocaching/utils/RxUtils.java
@@ -11,6 +11,7 @@ import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
+import rx.internal.util.RxThreadFactory;
import rx.observables.BlockingObservable;
import rx.observers.Subscribers;
import rx.schedulers.Schedulers;
@@ -25,8 +26,7 @@ import android.os.Process;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -39,12 +39,12 @@ public class RxUtils {
public final static Scheduler computationScheduler = Schedulers.computation();
- public static final Scheduler networkScheduler = Schedulers.from(new ThreadPoolExecutor(10, 10, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()));
+ public static final Scheduler networkScheduler = Schedulers.from(Executors.newFixedThreadPool(10, new RxThreadFactory("network-")));
- public static final Scheduler refreshScheduler = Schedulers.from(new ThreadPoolExecutor(3, 3, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()));
+ public static final Scheduler refreshScheduler = Schedulers.from(Executors.newFixedThreadPool(3, new RxThreadFactory("refresh-")));
private static final HandlerThread looperCallbacksThread =
- new HandlerThread("Looper callbacks thread", Process.THREAD_PRIORITY_DEFAULT);
+ new HandlerThread("looper callbacks", Process.THREAD_PRIORITY_DEFAULT);
static {
looperCallbacksThread.start();
@@ -158,8 +158,8 @@ public class RxUtils {
};
}
- public static<T> Observable<T> rememberLast(final Observable<T> observable) {
- final AtomicReference<T> lastValue = new AtomicReference<>(null);
+ public static<T> Observable<T> rememberLast(final Observable<T> observable, final T initialValue) {
+ final AtomicReference<T> lastValue = new AtomicReference<>(initialValue);
return observable.doOnNext(new Action1<T>() {
@Override
public void call(final T value) {
@@ -224,7 +224,7 @@ public class RxUtils {
if (cached.containsKey(key)) {
return cached.get(key);
}
- final Observable<V> value = func.call(key).replay().refCount();
+ final Observable<V> value = func.call(key).share();
cached.put(key, value);
return value;
}