|
@@ -1,16 +1,16 @@
|
|
|
package models;
|
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
import io.reactiverse.reactivex.pgclient.PgClient;
|
|
|
import io.reactiverse.reactivex.pgclient.PgIterator;
|
|
|
import io.reactiverse.reactivex.pgclient.Row;
|
|
|
import io.reactiverse.reactivex.pgclient.Tuple;
|
|
|
+import io.reactivex.Observable;
|
|
|
import module.PgClients;
|
|
|
import ratpack.exec.Promise;
|
|
|
import ratpack.rx2.RxRatpack;
|
|
|
-import io.reactivex.Observable;
|
|
|
-
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.List;
|
|
|
|
|
|
public class PgClientRepository implements DbRepository {
|
|
|
private final PgClients pgClients;
|
|
@@ -21,48 +21,45 @@ public class PgClientRepository implements DbRepository {
|
|
|
|
|
|
@Override
|
|
|
public Promise<World> getWorld(int id) {
|
|
|
- return getWorlds(new int[]{id}).map(worlds -> worlds.get(0));
|
|
|
+ return getWorlds(new int[] { id }).map(worlds -> worlds.get(0));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public Promise<List<World>> getWorlds(int[] ids) {
|
|
|
|
|
|
- Observable<World>[] observables = new Observable[ids.length];
|
|
|
-
|
|
|
PgClient pgClient = pgClients.getOne();
|
|
|
|
|
|
- for (int i = 0; i < ids.length; i++) {
|
|
|
- observables[i] = pgClient.rxPreparedQuery("SELECT * FROM world WHERE id = $1", Tuple.of(ids[i])).map(rowset -> {
|
|
|
- final Row row = rowset.iterator().next();
|
|
|
+ Observable<World> observable = Observable.range(0, ids.length)
|
|
|
+ .flatMap(i -> pgClient.rxPreparedQuery("SELECT * FROM world WHERE id = $1", Tuple.of(ids[i]))
|
|
|
+ .map(rowset -> {
|
|
|
+ final Row row = rowset.iterator().next();
|
|
|
|
|
|
- return new World(row.getInteger(0), row.getInteger(1));
|
|
|
- }).toObservable();
|
|
|
- }
|
|
|
+ return new World(row.getInteger(0), row.getInteger(1));
|
|
|
+ })
|
|
|
+ .toObservable());
|
|
|
|
|
|
- return getPromise(observables);
|
|
|
+ return RxRatpack.promiseAll(observable);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public Promise<List<World>> findAndUpdateWorlds(int[] ids, int[] randomNumbers) {
|
|
|
return getWorlds(ids).flatMap(worlds -> {
|
|
|
- Observable<World>[] observables = new Observable[worlds.size()];
|
|
|
-
|
|
|
PgClient pgClient = pgClients.getOne();
|
|
|
|
|
|
- for (int i = 0; i < worlds.size(); i++) {
|
|
|
- World world = worlds.get(i);
|
|
|
- world.randomNumber = randomNumbers[i];
|
|
|
- observables[i] = pgClient.rxPreparedQuery("UPDATE world SET randomnumber = $1 WHERE id = $2", Tuple.of(world.randomNumber, world.id)).map(rowset -> world).toObservable();
|
|
|
- }
|
|
|
-
|
|
|
- return getPromise(observables);
|
|
|
+ Observable<World> observable = Observable.range(0, worlds.size())
|
|
|
+ .flatMap(i -> {
|
|
|
+ World world = worlds.get(i);
|
|
|
+ world.randomNumber = randomNumbers[i];
|
|
|
+ return pgClient
|
|
|
+ .rxPreparedQuery("UPDATE world SET randomnumber = $1 WHERE id = $2", Tuple.of(world.randomNumber, world.id))
|
|
|
+ .map(rowset -> world)
|
|
|
+ .toObservable();
|
|
|
+ });
|
|
|
+
|
|
|
+ return RxRatpack.promiseAll(observable);
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- private Promise<List<World>> getPromise(Observable<World>[] observables) {
|
|
|
- return RxRatpack.promiseAll(Observable.mergeArray(observables));
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
public Promise<List<Fortune>> fortunes() {
|
|
|
return RxRatpack.promiseAll(pgClients.getOne().rxPreparedQuery("SELECT * FROM fortune").flatMapObservable(pgRowSet -> {
|