aboutsummaryrefslogtreecommitdiffstats
path: root/main/src/cgeo/geocaching/SearchResult.java
diff options
context:
space:
mode:
authorSamuel Tardieu <sam@rfc1149.net>2014-02-28 23:17:00 +0100
committerSamuel Tardieu <sam@rfc1149.net>2014-02-28 23:17:00 +0100
commitae8b55b72846248256639aebd1ededc6a79e81df (patch)
treec36b932e12e05a6d6102578e91e8a80f80247a2d /main/src/cgeo/geocaching/SearchResult.java
parent3b84568a8c300c6993928c5e8dfe5d038f94ce10 (diff)
downloadcgeo-ae8b55b72846248256639aebd1ededc6a79e81df.zip
cgeo-ae8b55b72846248256639aebd1ededc6a79e81df.tar.gz
cgeo-ae8b55b72846248256639aebd1ededc6a79e81df.tar.bz2
Query various connectors in parallel
Diffstat (limited to 'main/src/cgeo/geocaching/SearchResult.java')
-rw-r--r--main/src/cgeo/geocaching/SearchResult.java26
1 files changed, 26 insertions, 0 deletions
diff --git a/main/src/cgeo/geocaching/SearchResult.java b/main/src/cgeo/geocaching/SearchResult.java
index 46ac38e..12a2522 100644
--- a/main/src/cgeo/geocaching/SearchResult.java
+++ b/main/src/cgeo/geocaching/SearchResult.java
@@ -1,5 +1,6 @@
package cgeo.geocaching;
+import cgeo.geocaching.connector.IConnector;
import cgeo.geocaching.connector.gc.GCLogin;
import cgeo.geocaching.enumerations.CacheType;
import cgeo.geocaching.enumerations.LoadFlags;
@@ -11,6 +12,10 @@ import cgeo.geocaching.gcvote.GCVote;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.jdt.annotation.Nullable;
+import rx.Observable;
+import rx.functions.Func1;
+import rx.functions.Func2;
+import rx.schedulers.Schedulers;
import android.os.Parcel;
import android.os.Parcelable;
@@ -296,4 +301,25 @@ public class SearchResult implements Parcelable {
}
}
+ public static <C extends IConnector> SearchResult parallelCombineActive(final Collection<C> connectors,
+ final Func1<C, SearchResult> func) {
+ return Observable.from(connectors).parallel(new Func1<Observable<C>, Observable<SearchResult>>() {
+ @Override
+ public Observable<SearchResult> call(final Observable<C> cObservable) {
+ return cObservable.flatMap(new Func1<C, Observable<? extends SearchResult>>() {
+ @Override
+ public Observable<? extends SearchResult> call(final C c) {
+ return c.isActive() ? Observable.from(func.call(c)) : Observable.<SearchResult>empty();
+ }
+ });
+ }
+ }, Schedulers.io()).reduce(new SearchResult(), new Func2<SearchResult, SearchResult, SearchResult>() {
+ @Override
+ public SearchResult call(final SearchResult searchResult, final SearchResult searchResult2) {
+ searchResult.addSearchResult(searchResult2);
+ return searchResult;
+ }
+ }).toBlockingObservable().first();
+ }
+
}