aboutsummaryrefslogtreecommitdiffstats
path: root/main/src/cgeo/geocaching/utils/RxUtils.java
diff options
context:
space:
mode:
Diffstat (limited to 'main/src/cgeo/geocaching/utils/RxUtils.java')
-rw-r--r--main/src/cgeo/geocaching/utils/RxUtils.java179
1 files changed, 174 insertions, 5 deletions
diff --git a/main/src/cgeo/geocaching/utils/RxUtils.java b/main/src/cgeo/geocaching/utils/RxUtils.java
index 241ba78..08cc3e7 100644
--- a/main/src/cgeo/geocaching/utils/RxUtils.java
+++ b/main/src/cgeo/geocaching/utils/RxUtils.java
@@ -1,22 +1,56 @@
package cgeo.geocaching.utils;
import rx.Observable;
+import rx.Observable.OnSubscribe;
import rx.Scheduler;
+import rx.Scheduler.Worker;
+import rx.Subscriber;
+import rx.android.schedulers.AndroidSchedulers;
+import rx.functions.Action0;
+import rx.functions.Action1;
+import rx.functions.Func0;
+import rx.functions.Func1;
+import rx.internal.util.RxThreadFactory;
import rx.observables.BlockingObservable;
+import rx.observers.Subscribers;
import rx.schedulers.Schedulers;
+import rx.subjects.PublishSubject;
+import rx.subscriptions.Subscriptions;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import android.os.Handler;
+import android.os.HandlerThread;
+import android.os.Looper;
+import android.os.Process;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
public class RxUtils {
- // Utility class, not to be instanciated
- private RxUtils() {}
+ private RxUtils() {
+ // Utility class, not to be instantiated
+ }
public final static Scheduler computationScheduler = Schedulers.computation();
- public static final Scheduler networkScheduler = Schedulers.from(new ThreadPoolExecutor(10, 10, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()));
+ public static final Scheduler networkScheduler = Schedulers.from(Executors.newFixedThreadPool(10, new RxThreadFactory("network-")));
+
+ public static final Scheduler refreshScheduler = Schedulers.from(Executors.newFixedThreadPool(3, new RxThreadFactory("refresh-")));
+
+ private static final HandlerThread looperCallbacksThread =
+ new HandlerThread("looper callbacks", Process.THREAD_PRIORITY_DEFAULT);
+
+ static {
+ looperCallbacksThread.start();
+ }
+
+ public static final Looper looperCallbacksLooper = looperCallbacksThread.getLooper();
+ public static final Scheduler looperCallbacksScheduler = AndroidSchedulers.handlerThread(new Handler(looperCallbacksLooper));
+ public static final Worker looperCallbacksWorker = looperCallbacksScheduler.createWorker();
public static <T> void waitForCompletion(final BlockingObservable<T> observable) {
observable.lastOrDefault(null);
@@ -25,4 +59,139 @@ public class RxUtils {
public static void waitForCompletion(final Observable<?>... observables) {
waitForCompletion(Observable.merge(observables).toBlocking());
}
+
+ /**
+ * Subscribe function whose subscription and unsubscription take place on a looper thread.
+ *
+ * @param <T>
+ * the type of the observable
+ */
+ public static abstract class LooperCallbacks<T> implements OnSubscribe<T> {
+
+ final AtomicInteger counter = new AtomicInteger(0);
+ final long stopDelay;
+ final TimeUnit stopDelayUnit;
+ final protected PublishSubject<T> subject = PublishSubject.create();
+
+ public LooperCallbacks(final long stopDelay, final TimeUnit stopDelayUnit) {
+ this.stopDelay = stopDelay;
+ this.stopDelayUnit = stopDelayUnit;
+ }
+
+ public LooperCallbacks() {
+ this(0, TimeUnit.SECONDS);
+ }
+
+ @Override
+ final public void call(final Subscriber<? super T> subscriber) {
+ subscriber.add(subject.subscribe(Subscribers.from(subscriber)));
+ looperCallbacksWorker.schedule(new Action0() {
+ @Override
+ public void call() {
+ if (counter.getAndIncrement() == 0) {
+ onStart();
+ }
+ subscriber.add(Subscriptions.create(new Action0() {
+ @Override
+ public void call() {
+ looperCallbacksWorker.schedule(new Action0() {
+ @Override
+ public void call() {
+ if (counter.decrementAndGet() == 0) {
+ onStop();
+ }
+ }
+ }, stopDelay, stopDelayUnit);
+ }
+ }));
+ }
+ });
+ }
+
+ abstract protected void onStart();
+
+ abstract protected void onStop();
+ }
+
+ public static<T> Observable<T> rememberLast(final Observable<T> observable, final T initialValue) {
+ final AtomicReference<T> lastValue = new AtomicReference<>(initialValue);
+ return observable.doOnNext(new Action1<T>() {
+ @Override
+ public void call(final T value) {
+ lastValue.set(value);
+ }
+ }).startWith(Observable.defer(new Func0<Observable<T>>() {
+ @Override
+ public Observable<T> call() {
+ final T last = lastValue.get();
+ return last != null ? Observable.just(last) : Observable.<T>empty();
+ }
+ })).replay(1).refCount();
+ }
+
+ public static <T> void andThenOnUi(final Scheduler scheduler, final Func0<T> background, final Action1<T> foreground) {
+ scheduler.createWorker().schedule(new Action0() {
+ @Override
+ public void call() {
+ final T value = background.call();
+ AndroidSchedulers.mainThread().createWorker().schedule(new Action0() {
+ @Override
+ public void call() {
+ foreground.call(value);
+ }
+ });
+ }
+ });
+ }
+
+ public static void andThenOnUi(final Scheduler scheduler, final Action0 background, final Action0 foreground) {
+ scheduler.createWorker().schedule(new Action0() {
+ @Override
+ public void call() {
+ background.call();
+ AndroidSchedulers.mainThread().createWorker().schedule(foreground);
+ }
+ });
+ }
+
+ /**
+ * Cache the last value of observables so that every key is associated to only one of them.
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+ public static class ObservableCache<K, V> {
+
+ final private Func1<K, Observable<V>> func;
+ final private Map<K, Observable<V>> cached = new HashMap<>();
+
+ /**
+ * Create a new observables cache.
+ *
+ * @param func the function transforming a key into an observable
+ */
+ public ObservableCache(final Func1<K, Observable<V>> func) {
+ this.func = func;
+ }
+
+ /**
+ * Get the observable corresponding to a key. If the key has not already been
+ * seen, the function passed to the constructor will be called to build the observable
+ * <p/>
+ * If the observable has already emitted values, only the last one will be remembered.
+ *
+ * @param key the key
+ * @return the observable corresponding to the key
+ */
+ public synchronized Observable<V> get(final K key) {
+ if (cached.containsKey(key)) {
+ return cached.get(key);
+ }
+ final Observable<V> value = func.call(key).replay(1).refCount();
+ cached.put(key, value);
+ return value;
+ }
+
+ }
+
}