aboutsummaryrefslogtreecommitdiffstats
path: root/main/src
diff options
context:
space:
mode:
authorSamuel Tardieu <sam@rfc1149.net>2014-07-06 17:37:01 +0200
committerSamuel Tardieu <sam@rfc1149.net>2014-07-06 17:37:01 +0200
commit0bf25c427d933172a683c080ff6325413db36142 (patch)
treef6343faea80650d97098ff51def6a2013a5bd901 /main/src
parent548e5bf9d56a95304e9140ba22c969d77b52f169 (diff)
downloadcgeo-0bf25c427d933172a683c080ff6325413db36142.zip
cgeo-0bf25c427d933172a683c080ff6325413db36142.tar.gz
cgeo-0bf25c427d933172a683c080ff6325413db36142.tar.bz2
Use a shared download scheduler for all network access
Diffstat (limited to 'main/src')
-rw-r--r--main/src/cgeo/geocaching/AbstractDialogFragment.java4
-rw-r--r--main/src/cgeo/geocaching/CacheDetailActivity.java13
-rw-r--r--main/src/cgeo/geocaching/CachePopupFragment.java3
-rw-r--r--main/src/cgeo/geocaching/MainActivity.java4
-rw-r--r--main/src/cgeo/geocaching/PocketQueryList.java5
-rw-r--r--main/src/cgeo/geocaching/SearchResult.java4
-rw-r--r--main/src/cgeo/geocaching/StaticMapsProvider.java4
-rw-r--r--main/src/cgeo/geocaching/connector/gc/GCParser.java6
-rw-r--r--main/src/cgeo/geocaching/connector/gc/RecaptchaHandler.java4
-rw-r--r--main/src/cgeo/geocaching/network/HtmlImage.java13
-rw-r--r--main/src/cgeo/geocaching/network/StatusUpdater.java5
-rw-r--r--main/src/cgeo/geocaching/settings/AbstractCheckCredentialsPreference.java5
-rw-r--r--main/src/cgeo/geocaching/settings/RegisterSend2CgeoPreference.java6
-rw-r--r--main/src/cgeo/geocaching/utils/RxUtils.java7
14 files changed, 43 insertions, 40 deletions
diff --git a/main/src/cgeo/geocaching/AbstractDialogFragment.java b/main/src/cgeo/geocaching/AbstractDialogFragment.java
index 6de8fec..f242fc9 100644
--- a/main/src/cgeo/geocaching/AbstractDialogFragment.java
+++ b/main/src/cgeo/geocaching/AbstractDialogFragment.java
@@ -14,13 +14,13 @@ import cgeo.geocaching.settings.Settings;
import cgeo.geocaching.ui.CacheDetailsCreator;
import cgeo.geocaching.ui.LoggingUI;
import cgeo.geocaching.utils.Log;
+import cgeo.geocaching.utils.RxUtils;
import rx.Observable;
import rx.Subscription;
import rx.android.observables.AndroidObservable;
import rx.functions.Action1;
import rx.functions.Func0;
-import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;
import android.annotation.TargetApi;
@@ -194,7 +194,7 @@ public abstract class AbstractDialogFragment extends DialogFragment implements C
final GCVoteRating rating = GCVote.getRating(cache.getGuid(), geocode);
return rating != null ? Observable.just(rating) : Observable.<GCVoteRating>empty();
}
- })).subscribeOn(Schedulers.io()).subscribe(new Action1<GCVoteRating>() {
+ })).subscribeOn(RxUtils.networkScheduler).subscribe(new Action1<GCVoteRating>() {
@Override
public void call(final GCVoteRating rating) {
cache.setRating(rating.getRating());
diff --git a/main/src/cgeo/geocaching/CacheDetailActivity.java b/main/src/cgeo/geocaching/CacheDetailActivity.java
index 8b852cf..7933c79 100644
--- a/main/src/cgeo/geocaching/CacheDetailActivity.java
+++ b/main/src/cgeo/geocaching/CacheDetailActivity.java
@@ -65,7 +65,6 @@ import rx.Subscriber;
import rx.android.observables.AndroidObservable;
import rx.functions.Action0;
import rx.functions.Action1;
-import rx.schedulers.Schedulers;
import rx.subscriptions.CompositeSubscription;
import android.R.color;
@@ -293,7 +292,7 @@ public class CacheDetailActivity extends AbstractViewPagerActivity<CacheDetailAc
final String realGeocode = geocode;
final String realGuid = guid;
- Schedulers.io().createWorker().schedule(new Action0() {
+ RxUtils.networkScheduler.createWorker().schedule(new Action0() {
@Override
public void call() {
search = Geocache.searchByGeocode(realGeocode, StringUtils.isBlank(realGeocode) ? realGuid : null, 0, false, loadCacheHandler);
@@ -859,7 +858,7 @@ public class CacheDetailActivity extends AbstractViewPagerActivity<CacheDetailAc
progress.show(CacheDetailActivity.this, res.getString(R.string.cache_dialog_refresh_title), res.getString(R.string.cache_dialog_refresh_message), true, refreshCacheHandler.cancelMessage());
- cache.refresh(refreshCacheHandler, Schedulers.io());
+ cache.refresh(refreshCacheHandler, RxUtils.networkScheduler);
}
private void dropCache() {
@@ -869,7 +868,7 @@ public class CacheDetailActivity extends AbstractViewPagerActivity<CacheDetailAc
}
progress.show(CacheDetailActivity.this, res.getString(R.string.cache_dialog_offline_drop_title), res.getString(R.string.cache_dialog_offline_drop_message), true, null);
- cache.drop(new ChangeNotificationHandler(CacheDetailActivity.this, progress), Schedulers.io());
+ cache.drop(new ChangeNotificationHandler(CacheDetailActivity.this, progress), RxUtils.networkScheduler);
}
private void storeCache() {
@@ -913,7 +912,7 @@ public class CacheDetailActivity extends AbstractViewPagerActivity<CacheDetailAc
view = (ScrollView) getLayoutInflater().inflate(R.layout.cachedetail_details_page, parentView, false);
// Start loading preview map
- AndroidObservable.bindActivity(CacheDetailActivity.this, previewMap).subscribeOn(Schedulers.io())
+ AndroidObservable.bindActivity(CacheDetailActivity.this, previewMap).subscribeOn(RxUtils.networkScheduler)
.subscribe(new Action1<BitmapDrawable>() {
@Override
public void call(final BitmapDrawable image) {
@@ -1578,7 +1577,7 @@ public class CacheDetailActivity extends AbstractViewPagerActivity<CacheDetailAc
}
});
- AndroidObservable.bindActivity(this, producer).subscribeOn(Schedulers.io()).subscribe(new Observer<Spanned>() {
+ AndroidObservable.bindActivity(this, producer).subscribeOn(RxUtils.networkScheduler).subscribe(new Observer<Spanned>() {
@Override
public void onCompleted() {
if (null != loadingIndicatorView) {
@@ -2297,7 +2296,7 @@ public class CacheDetailActivity extends AbstractViewPagerActivity<CacheDetailAc
protected void storeCache(final int listId, final StoreCacheHandler storeCacheHandler) {
progress.show(this, res.getString(R.string.cache_dialog_offline_save_title), res.getString(R.string.cache_dialog_offline_save_message), true, storeCacheHandler.cancelMessage());
- Schedulers.io().createWorker().schedule(new Action0() {
+ RxUtils.networkScheduler.createWorker().schedule(new Action0() {
@Override
public void call() {
cache.store(listId, storeCacheHandler);
diff --git a/main/src/cgeo/geocaching/CachePopupFragment.java b/main/src/cgeo/geocaching/CachePopupFragment.java
index 010d701..48f3bf1 100644
--- a/main/src/cgeo/geocaching/CachePopupFragment.java
+++ b/main/src/cgeo/geocaching/CachePopupFragment.java
@@ -9,6 +9,7 @@ import cgeo.geocaching.settings.Settings;
import cgeo.geocaching.ui.CacheDetailsCreator;
import cgeo.geocaching.utils.CancellableHandler;
import cgeo.geocaching.utils.Log;
+import cgeo.geocaching.utils.RxUtils;
import org.apache.commons.lang3.StringUtils;
@@ -168,7 +169,7 @@ public class CachePopupFragment extends AbstractDialogFragment {
final StoreCacheHandler refreshCacheHandler = new StoreCacheHandler(R.string.cache_dialog_offline_save_message);
progress.show(getActivity(), res.getString(R.string.cache_dialog_refresh_title), res.getString(R.string.cache_dialog_refresh_message), true, refreshCacheHandler.cancelMessage());
- cache.refresh(refreshCacheHandler, Schedulers.io());
+ cache.refresh(refreshCacheHandler, RxUtils.networkScheduler);
}
}
diff --git a/main/src/cgeo/geocaching/MainActivity.java b/main/src/cgeo/geocaching/MainActivity.java
index 84bc960..b2fc96a 100644
--- a/main/src/cgeo/geocaching/MainActivity.java
+++ b/main/src/cgeo/geocaching/MainActivity.java
@@ -23,6 +23,7 @@ import cgeo.geocaching.ui.Formatter;
import cgeo.geocaching.ui.dialog.Dialogs;
import cgeo.geocaching.utils.DatabaseBackupUtils;
import cgeo.geocaching.utils.Log;
+import cgeo.geocaching.utils.RxUtils;
import cgeo.geocaching.utils.TextUtils;
import cgeo.geocaching.utils.Version;
@@ -36,7 +37,6 @@ import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.android.observables.AndroidObservable;
import rx.functions.Action1;
-import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;
import android.app.AlertDialog;
@@ -579,7 +579,7 @@ public class MainActivity extends AbstractActionBarActivity {
}
});
AndroidObservable.bindActivity(MainActivity.this, address.onErrorResumeNext(Observable.from(geo.getCoords().toString())))
- .subscribeOn(Schedulers.io())
+ .subscribeOn(RxUtils.networkScheduler)
.subscribe(new Action1<String>() {
@Override
public void call(final String address) {
diff --git a/main/src/cgeo/geocaching/PocketQueryList.java b/main/src/cgeo/geocaching/PocketQueryList.java
index 4e84881..21f306e 100644
--- a/main/src/cgeo/geocaching/PocketQueryList.java
+++ b/main/src/cgeo/geocaching/PocketQueryList.java
@@ -2,14 +2,15 @@ package cgeo.geocaching;
import cgeo.geocaching.activity.ActivityMixin;
import cgeo.geocaching.connector.gc.GCParser;
+import cgeo.geocaching.utils.RxUtils;
import org.apache.commons.collections4.CollectionUtils;
+
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.android.observables.AndroidObservable;
import rx.functions.Action1;
-import rx.schedulers.Schedulers;
import android.app.Activity;
import android.app.AlertDialog;
@@ -52,7 +53,7 @@ public final class PocketQueryList {
subscriber.onNext(GCParser.searchPocketQueryList());
subscriber.onCompleted();
}
- })).subscribeOn(Schedulers.io()).subscribe(new Action1<List<PocketQueryList>>() {
+ })).subscribeOn(RxUtils.networkScheduler).subscribe(new Action1<List<PocketQueryList>>() {
@Override
public void call(final List<PocketQueryList> pocketQueryLists) {
waitDialog.dismiss();
diff --git a/main/src/cgeo/geocaching/SearchResult.java b/main/src/cgeo/geocaching/SearchResult.java
index c56e29e..088b682 100644
--- a/main/src/cgeo/geocaching/SearchResult.java
+++ b/main/src/cgeo/geocaching/SearchResult.java
@@ -8,6 +8,7 @@ import cgeo.geocaching.enumerations.LoadFlags.LoadFlag;
import cgeo.geocaching.enumerations.LoadFlags.SaveFlag;
import cgeo.geocaching.enumerations.StatusCode;
import cgeo.geocaching.gcvote.GCVote;
+import cgeo.geocaching.utils.RxUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -17,7 +18,6 @@ import org.eclipse.jdt.annotation.Nullable;
import rx.Observable;
import rx.functions.Func1;
import rx.functions.Func2;
-import rx.schedulers.Schedulers;
import android.os.Parcel;
import android.os.Parcelable;
@@ -324,7 +324,7 @@ public class SearchResult implements Parcelable {
}
});
}
- }, Schedulers.io()).reduce(new SearchResult(), new Func2<SearchResult, SearchResult, SearchResult>() {
+ }, RxUtils.networkScheduler).reduce(new SearchResult(), new Func2<SearchResult, SearchResult, SearchResult>() {
@Override
public SearchResult call(final SearchResult searchResult, final SearchResult searchResult2) {
searchResult.addSearchResult(searchResult2);
diff --git a/main/src/cgeo/geocaching/StaticMapsProvider.java b/main/src/cgeo/geocaching/StaticMapsProvider.java
index 0551fc0..0ea2d8a 100644
--- a/main/src/cgeo/geocaching/StaticMapsProvider.java
+++ b/main/src/cgeo/geocaching/StaticMapsProvider.java
@@ -8,6 +8,7 @@ import cgeo.geocaching.network.Parameters;
import cgeo.geocaching.settings.Settings;
import cgeo.geocaching.utils.FileUtils;
import cgeo.geocaching.utils.Log;
+import cgeo.geocaching.utils.RxUtils;
import ch.boye.httpclientandroidlib.HttpResponse;
@@ -16,7 +17,6 @@ import org.eclipse.jdt.annotation.NonNull;
import rx.Observable;
import rx.functions.Action0;
-import rx.schedulers.Schedulers;
import rx.util.async.Async;
import android.graphics.Bitmap;
@@ -94,7 +94,7 @@ public final class StaticMapsProvider {
}
}
}
- }, prefix, Schedulers.io());
+ }, prefix, RxUtils.networkScheduler);
}
private static int limitSize(final int imageSize) {
diff --git a/main/src/cgeo/geocaching/connector/gc/GCParser.java b/main/src/cgeo/geocaching/connector/gc/GCParser.java
index 93cb3a2..26c8175 100644
--- a/main/src/cgeo/geocaching/connector/gc/GCParser.java
+++ b/main/src/cgeo/geocaching/connector/gc/GCParser.java
@@ -32,6 +32,7 @@ import cgeo.geocaching.ui.DirectionImage;
import cgeo.geocaching.utils.CancellableHandler;
import cgeo.geocaching.utils.Log;
import cgeo.geocaching.utils.MatcherWrapper;
+import cgeo.geocaching.utils.RxUtils;
import cgeo.geocaching.utils.SynchronizedDateFormat;
import cgeo.geocaching.utils.TextUtils;
@@ -53,7 +54,6 @@ import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func2;
-import rx.schedulers.Schedulers;
import android.net.Uri;
import android.text.Html;
@@ -1703,7 +1703,7 @@ public abstract class GCParser {
}
return parseLogs(true, rawResponse);
}
- }).subscribeOn(Schedulers.io());
+ }).subscribeOn(RxUtils.networkScheduler);
}
private static Observable<LogEntry> parseLogs(final boolean markAsFriendsLog, final String rawResponse) {
@@ -1870,7 +1870,7 @@ public abstract class GCParser {
return;
}
- final Observable<LogEntry> logs = getLogsFromDetails(page).subscribeOn(Schedulers.computation());
+ final Observable<LogEntry> logs = getLogsFromDetails(page).subscribeOn(RxUtils.computationScheduler);
Observable<LogEntry> specialLogs;
if (Settings.isFriendLogsWanted()) {
CancellableHandler.sendLoadProgressDetail(handler, R.string.cache_dialog_loading_details_status_logs);
diff --git a/main/src/cgeo/geocaching/connector/gc/RecaptchaHandler.java b/main/src/cgeo/geocaching/connector/gc/RecaptchaHandler.java
index c6d8dfe..1c6f5e0 100644
--- a/main/src/cgeo/geocaching/connector/gc/RecaptchaHandler.java
+++ b/main/src/cgeo/geocaching/connector/gc/RecaptchaHandler.java
@@ -4,6 +4,7 @@ import cgeo.geocaching.R;
import cgeo.geocaching.loaders.RecaptchaReceiver;
import cgeo.geocaching.network.Network;
import cgeo.geocaching.utils.Log;
+import cgeo.geocaching.utils.RxUtils;
import org.apache.commons.io.IOUtils;
@@ -11,7 +12,6 @@ import rx.Observable;
import rx.android.observables.AndroidObservable;
import rx.functions.Action1;
import rx.functions.Func0;
-import rx.schedulers.Schedulers;
import android.annotation.SuppressLint;
import android.app.Activity;
@@ -59,7 +59,7 @@ public class RecaptchaHandler extends Handler {
return Observable.empty();
}
});
- AndroidObservable.bindActivity(activity, captcha).subscribeOn(Schedulers.io()).subscribe(new Action1<Bitmap>() {
+ AndroidObservable.bindActivity(activity, captcha).subscribeOn(RxUtils.networkScheduler).subscribe(new Action1<Bitmap>() {
@Override
public void call(final Bitmap bitmap) {
imageView.setImageBitmap(bitmap);
diff --git a/main/src/cgeo/geocaching/network/HtmlImage.java b/main/src/cgeo/geocaching/network/HtmlImage.java
index ce832fb..7a5851a 100644
--- a/main/src/cgeo/geocaching/network/HtmlImage.java
+++ b/main/src/cgeo/geocaching/network/HtmlImage.java
@@ -43,10 +43,6 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.util.Date;
-import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
public class HtmlImage implements Html.ImageGetter {
@@ -94,7 +90,6 @@ public class HtmlImage implements Html.ImageGetter {
final private PublishSubject<Observable<String>> loading = PublishSubject.create();
final private Observable<String> waitForEnd = Observable.merge(loading).publish().refCount();
final CompositeSubscription subscription = new CompositeSubscription(waitForEnd.subscribe());
- final private Executor downloadExecutor = new ThreadPoolExecutor(10, 10, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
public HtmlImage(final String geocode, final boolean returnErrorImage, final int listId, final boolean onlySave) {
this.geocode = geocode;
@@ -164,8 +159,8 @@ public class HtmlImage implements Html.ImageGetter {
if (bitmap != null && !onlySave) {
subscriber.onNext(bitmap);
}
- downloadExecutor.execute(new Runnable() {
- @Override public void run() {
+ RxUtils.networkScheduler.createWorker().schedule(new Action0() {
+ @Override public void call() {
downloadAndSave(subscriber);
}
});
@@ -193,12 +188,10 @@ public class HtmlImage implements Html.ImageGetter {
subscriber.onCompleted();
return;
}
- } else {
- if (subscriber.isUnsubscribed() || downloadOrRefreshCopy(url, file)) {
+ } else if (subscriber.isUnsubscribed() || downloadOrRefreshCopy(url, file)) {
// The existing copy was fresh enough or we were unsubscribed earlier.
subscriber.onCompleted();
return;
- }
}
if (onlySave) {
subscriber.onCompleted();
diff --git a/main/src/cgeo/geocaching/network/StatusUpdater.java b/main/src/cgeo/geocaching/network/StatusUpdater.java
index bf9ebdf..82650d1 100644
--- a/main/src/cgeo/geocaching/network/StatusUpdater.java
+++ b/main/src/cgeo/geocaching/network/StatusUpdater.java
@@ -1,12 +1,13 @@
package cgeo.geocaching.network;
import cgeo.geocaching.CgeoApplication;
+import cgeo.geocaching.utils.RxUtils;
import cgeo.geocaching.utils.Version;
import org.json.JSONException;
import org.json.JSONObject;
+
import rx.functions.Action0;
-import rx.schedulers.Schedulers;
import rx.subjects.BehaviorSubject;
import android.os.Build.VERSION;
@@ -51,7 +52,7 @@ public class StatusUpdater {
final static public BehaviorSubject<Status> latestStatus = BehaviorSubject.create(Status.defaultStatus(null));
static {
- Schedulers.io().createWorker().schedulePeriodically(new Action0() {
+ RxUtils.networkScheduler.createWorker().schedulePeriodically(new Action0() {
@Override
public void call() {
final JSONObject response =
diff --git a/main/src/cgeo/geocaching/settings/AbstractCheckCredentialsPreference.java b/main/src/cgeo/geocaching/settings/AbstractCheckCredentialsPreference.java
index 1a230c1..8a9f279 100644
--- a/main/src/cgeo/geocaching/settings/AbstractCheckCredentialsPreference.java
+++ b/main/src/cgeo/geocaching/settings/AbstractCheckCredentialsPreference.java
@@ -5,13 +5,14 @@ import cgeo.geocaching.activity.ActivityMixin;
import cgeo.geocaching.enumerations.StatusCode;
import cgeo.geocaching.network.Cookies;
import cgeo.geocaching.ui.dialog.Dialogs;
+import cgeo.geocaching.utils.RxUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
+
import rx.android.observables.AndroidObservable;
import rx.functions.Action1;
import rx.functions.Func0;
-import rx.schedulers.Schedulers;
import rx.util.async.Async;
import android.app.ProgressDialog;
@@ -70,7 +71,7 @@ public abstract class AbstractCheckCredentialsPreference extends AbstractClickab
public ImmutablePair<StatusCode, ? extends Drawable> call() {
return login();
}
- })).subscribeOn(Schedulers.io()).subscribe(new Action1<ImmutablePair<StatusCode, ? extends Drawable>>() {
+ })).subscribeOn(RxUtils.networkScheduler).subscribe(new Action1<ImmutablePair<StatusCode, ? extends Drawable>>() {
@Override
public void call(final ImmutablePair<StatusCode, ? extends Drawable> loginInfo) {
loginDialog.dismiss();
diff --git a/main/src/cgeo/geocaching/settings/RegisterSend2CgeoPreference.java b/main/src/cgeo/geocaching/settings/RegisterSend2CgeoPreference.java
index e899be9..84c343a 100644
--- a/main/src/cgeo/geocaching/settings/RegisterSend2CgeoPreference.java
+++ b/main/src/cgeo/geocaching/settings/RegisterSend2CgeoPreference.java
@@ -6,14 +6,16 @@ import cgeo.geocaching.network.Network;
import cgeo.geocaching.network.Parameters;
import cgeo.geocaching.ui.dialog.Dialogs;
import cgeo.geocaching.utils.Log;
+import cgeo.geocaching.utils.RxUtils;
import ch.boye.httpclientandroidlib.HttpResponse;
+
import org.apache.commons.lang3.StringUtils;
+
import rx.Observable;
import rx.android.observables.AndroidObservable;
import rx.functions.Action1;
import rx.functions.Func0;
-import rx.schedulers.Schedulers;
import android.app.ProgressDialog;
import android.content.Context;
@@ -75,7 +77,7 @@ public class RegisterSend2CgeoPreference extends AbstractClickablePreference {
return Observable.empty();
}
- }).firstOrDefault(0)).subscribeOn(Schedulers.io()).subscribe(new Action1<Integer>() {
+ }).firstOrDefault(0)).subscribeOn(RxUtils.networkScheduler).subscribe(new Action1<Integer>() {
@Override
public void call(final Integer pin) {
progressDialog.dismiss();
diff --git a/main/src/cgeo/geocaching/utils/RxUtils.java b/main/src/cgeo/geocaching/utils/RxUtils.java
index b865f0b..241ba78 100644
--- a/main/src/cgeo/geocaching/utils/RxUtils.java
+++ b/main/src/cgeo/geocaching/utils/RxUtils.java
@@ -5,6 +5,10 @@ import rx.Scheduler;
import rx.observables.BlockingObservable;
import rx.schedulers.Schedulers;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
public class RxUtils {
// Utility class, not to be instanciated
@@ -12,9 +16,10 @@ public class RxUtils {
public final static Scheduler computationScheduler = Schedulers.computation();
+ public static final Scheduler networkScheduler = Schedulers.from(new ThreadPoolExecutor(10, 10, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()));
+
public static <T> void waitForCompletion(final BlockingObservable<T> observable) {
observable.lastOrDefault(null);
- return;
}
public static void waitForCompletion(final Observable<?>... observables) {