diff options
Diffstat (limited to 'main/src/cgeo/geocaching/utils/RxUtils.java')
| -rw-r--r-- | main/src/cgeo/geocaching/utils/RxUtils.java | 16 |
1 files changed, 8 insertions, 8 deletions
diff --git a/main/src/cgeo/geocaching/utils/RxUtils.java b/main/src/cgeo/geocaching/utils/RxUtils.java index beb5e18..af58214 100644 --- a/main/src/cgeo/geocaching/utils/RxUtils.java +++ b/main/src/cgeo/geocaching/utils/RxUtils.java @@ -11,6 +11,7 @@ import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Func0; import rx.functions.Func1; +import rx.internal.util.RxThreadFactory; import rx.observables.BlockingObservable; import rx.observers.Subscribers; import rx.schedulers.Schedulers; @@ -25,8 +26,7 @@ import android.os.Process; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -39,12 +39,12 @@ public class RxUtils { public final static Scheduler computationScheduler = Schedulers.computation(); - public static final Scheduler networkScheduler = Schedulers.from(new ThreadPoolExecutor(10, 10, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>())); + public static final Scheduler networkScheduler = Schedulers.from(Executors.newFixedThreadPool(10, new RxThreadFactory("network-"))); - public static final Scheduler refreshScheduler = Schedulers.from(new ThreadPoolExecutor(3, 3, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>())); + public static final Scheduler refreshScheduler = Schedulers.from(Executors.newFixedThreadPool(3, new RxThreadFactory("refresh-"))); private static final HandlerThread looperCallbacksThread = - new HandlerThread("Looper callbacks thread", Process.THREAD_PRIORITY_DEFAULT); + new HandlerThread("looper callbacks", Process.THREAD_PRIORITY_DEFAULT); static { looperCallbacksThread.start(); @@ -158,8 +158,8 @@ public class RxUtils { }; } - public static<T> Observable<T> rememberLast(final Observable<T> observable) { - final AtomicReference<T> lastValue = new AtomicReference<>(null); + public static<T> Observable<T> rememberLast(final Observable<T> observable, final T initialValue) { + final AtomicReference<T> lastValue = new AtomicReference<>(initialValue); return observable.doOnNext(new Action1<T>() { @Override public void call(final T value) { @@ -224,7 +224,7 @@ public class RxUtils { if (cached.containsKey(key)) { return cached.get(key); } - final Observable<V> value = func.call(key).replay().refCount(); + final Observable<V> value = func.call(key).share(); cached.put(key, value); return value; } |
