diff options
| author | Samuel Tardieu <sam@rfc1149.net> | 2014-02-28 23:17:00 +0100 |
|---|---|---|
| committer | Samuel Tardieu <sam@rfc1149.net> | 2014-02-28 23:17:00 +0100 |
| commit | ae8b55b72846248256639aebd1ededc6a79e81df (patch) | |
| tree | c36b932e12e05a6d6102578e91e8a80f80247a2d /main/src/cgeo/geocaching/SearchResult.java | |
| parent | 3b84568a8c300c6993928c5e8dfe5d038f94ce10 (diff) | |
| download | cgeo-ae8b55b72846248256639aebd1ededc6a79e81df.zip cgeo-ae8b55b72846248256639aebd1ededc6a79e81df.tar.gz cgeo-ae8b55b72846248256639aebd1ededc6a79e81df.tar.bz2 | |
Query various connectors in parallel
Diffstat (limited to 'main/src/cgeo/geocaching/SearchResult.java')
| -rw-r--r-- | main/src/cgeo/geocaching/SearchResult.java | 26 |
1 files changed, 26 insertions, 0 deletions
diff --git a/main/src/cgeo/geocaching/SearchResult.java b/main/src/cgeo/geocaching/SearchResult.java index 46ac38e..12a2522 100644 --- a/main/src/cgeo/geocaching/SearchResult.java +++ b/main/src/cgeo/geocaching/SearchResult.java @@ -1,5 +1,6 @@ package cgeo.geocaching; +import cgeo.geocaching.connector.IConnector; import cgeo.geocaching.connector.gc.GCLogin; import cgeo.geocaching.enumerations.CacheType; import cgeo.geocaching.enumerations.LoadFlags; @@ -11,6 +12,10 @@ import cgeo.geocaching.gcvote.GCVote; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.eclipse.jdt.annotation.Nullable; +import rx.Observable; +import rx.functions.Func1; +import rx.functions.Func2; +import rx.schedulers.Schedulers; import android.os.Parcel; import android.os.Parcelable; @@ -296,4 +301,25 @@ public class SearchResult implements Parcelable { } } + public static <C extends IConnector> SearchResult parallelCombineActive(final Collection<C> connectors, + final Func1<C, SearchResult> func) { + return Observable.from(connectors).parallel(new Func1<Observable<C>, Observable<SearchResult>>() { + @Override + public Observable<SearchResult> call(final Observable<C> cObservable) { + return cObservable.flatMap(new Func1<C, Observable<? extends SearchResult>>() { + @Override + public Observable<? extends SearchResult> call(final C c) { + return c.isActive() ? Observable.from(func.call(c)) : Observable.<SearchResult>empty(); + } + }); + } + }, Schedulers.io()).reduce(new SearchResult(), new Func2<SearchResult, SearchResult, SearchResult>() { + @Override + public SearchResult call(final SearchResult searchResult, final SearchResult searchResult2) { + searchResult.addSearchResult(searchResult2); + return searchResult; + } + }).toBlockingObservable().first(); + } + } |
