package cgeo.geocaching.utils; import rx.Observable; import rx.Observer; import rx.Scheduler; import rx.Subscription; import rx.android.schedulers.AndroidSchedulers; import rx.functions.Action1; import rx.schedulers.Schedulers; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class RxUtils { // Utility class, not to be instanciated private RxUtils() {} final static private int cores = Runtime.getRuntime().availableProcessors(); public final static Scheduler computationScheduler = Schedulers.executor(new ThreadPoolExecutor(1, cores, 5, TimeUnit.SECONDS, new LinkedBlockingQueue())); public static Subscription subscribeThenUI(final Observable observable, final Observer observer) { return observable.observeOn(AndroidSchedulers.mainThread()).subscribe(observer); } public static Subscription subscribeThenUI(final Observable observable, final Action1 action) { return observable.observeOn(AndroidSchedulers.mainThread()).subscribe(action); } public static Subscription subscribeThenUI(final Observable observable, final Action1 action, final Action1 onError) { return observable.observeOn(AndroidSchedulers.mainThread()).subscribe(action, onError); } public static Observable onIO(final Observable observable) { return observable.subscribeOn(Schedulers.io()); } public static Subscription subscribeOnIOThenUI(final Observable observable, final Observer observer) { return subscribeThenUI(onIO(observable), observer); } public static Subscription subscribeOnIOThenUI(final Observable observable, final Action1 action) { return subscribeThenUI(onIO(observable), action); } public static Subscription subscribeOnIOThenUI(final Observable observable, final Action1 action, final Action1 onError) { return subscribeThenUI(onIO(observable), action, onError); } }