|
@@ -4,7 +4,6 @@ import static hello.Helper.getQueries;
|
|
|
import static hello.Helper.randomWorld;
|
|
|
import static hello.Helper.sendException;
|
|
|
import static hello.Helper.sendJson;
|
|
|
-import static hello.Helper.toCompletableFuture;
|
|
|
|
|
|
import com.mongodb.async.client.MongoCollection;
|
|
|
import com.mongodb.async.client.MongoDatabase;
|
|
@@ -17,7 +16,6 @@ import io.undertow.server.HttpServerExchange;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
-import java.util.stream.IntStream;
|
|
|
import org.bson.Document;
|
|
|
import org.bson.conversions.Bson;
|
|
|
|
|
@@ -33,53 +31,70 @@ final class UpdatesMongoAsyncHandler implements HttpHandler {
|
|
|
|
|
|
@Override
|
|
|
public void handleRequest(HttpServerExchange exchange) {
|
|
|
- IntStream
|
|
|
- .range(0, getQueries(exchange))
|
|
|
- .mapToObj(
|
|
|
- i -> {
|
|
|
- CompletableFuture<World> future = new CompletableFuture<>();
|
|
|
- worldCollection
|
|
|
- .find(Filters.eq(randomWorld()))
|
|
|
- .map(Helper::mongoDocumentToWorld)
|
|
|
- .first(
|
|
|
- (world, exception) -> {
|
|
|
- if (exception != null) {
|
|
|
- future.completeExceptionally(exception);
|
|
|
- } else {
|
|
|
- future.complete(world);
|
|
|
- }
|
|
|
- });
|
|
|
- return future;
|
|
|
- })
|
|
|
- .collect(toCompletableFuture())
|
|
|
- .thenCompose(
|
|
|
- worlds -> {
|
|
|
- List<WriteModel<Document>> writes = new ArrayList<>(worlds.size());
|
|
|
- for (World world : worlds) {
|
|
|
- world.randomNumber = randomWorld();
|
|
|
- Bson filter = Filters.eq(world.id);
|
|
|
- Bson update = Updates.set("randomNumber", world.randomNumber);
|
|
|
- writes.add(new UpdateOneModel<>(filter, update));
|
|
|
- }
|
|
|
- CompletableFuture<List<World>> next = new CompletableFuture<>();
|
|
|
- worldCollection.bulkWrite(
|
|
|
- writes,
|
|
|
- (result, exception) -> {
|
|
|
- if (exception != null) {
|
|
|
- next.completeExceptionally(exception);
|
|
|
- } else {
|
|
|
- next.complete(worlds);
|
|
|
- }
|
|
|
- });
|
|
|
- return next;
|
|
|
- })
|
|
|
- .whenComplete(
|
|
|
- (worlds, exception) -> {
|
|
|
+ int queries = getQueries(exchange);
|
|
|
+ nUpdatedWorlds(queries).whenComplete(
|
|
|
+ (worlds, exception) -> {
|
|
|
+ if (exception != null) {
|
|
|
+ sendException(exchange, exception);
|
|
|
+ } else {
|
|
|
+ sendJson(exchange, worlds);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private CompletableFuture<World[]> nUpdatedWorlds(int n) {
|
|
|
+ return nWorlds(n).thenCompose(
|
|
|
+ worlds -> {
|
|
|
+ List<WriteModel<Document>> writes = new ArrayList<>(worlds.length);
|
|
|
+ for (World world : worlds) {
|
|
|
+ world.randomNumber = randomWorld();
|
|
|
+ Bson filter = Filters.eq(world.id);
|
|
|
+ Bson update = Updates.set("randomNumber", world.randomNumber);
|
|
|
+ writes.add(new UpdateOneModel<>(filter, update));
|
|
|
+ }
|
|
|
+ CompletableFuture<World[]> next = new CompletableFuture<>();
|
|
|
+ worldCollection.bulkWrite(
|
|
|
+ writes,
|
|
|
+ (result, exception) -> {
|
|
|
+ if (exception != null) {
|
|
|
+ next.completeExceptionally(exception);
|
|
|
+ } else {
|
|
|
+ next.complete(worlds);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ return next;
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private CompletableFuture<World[]> nWorlds(int n) {
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ CompletableFuture<World>[] futures = new CompletableFuture[n];
|
|
|
+ for (int i = 0; i < futures.length; i++) {
|
|
|
+ futures[i] = oneWorld();
|
|
|
+ }
|
|
|
+ return CompletableFuture.allOf(futures).thenApply(
|
|
|
+ nil -> {
|
|
|
+ World[] worlds = new World[futures.length];
|
|
|
+ for (int i = 0; i < futures.length; i++) {
|
|
|
+ worlds[i] = futures[i].join();
|
|
|
+ }
|
|
|
+ return worlds;
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private CompletableFuture<World> oneWorld() {
|
|
|
+ CompletableFuture<World> future = new CompletableFuture<>();
|
|
|
+ worldCollection
|
|
|
+ .find(Filters.eq(randomWorld()))
|
|
|
+ .map(Helper::mongoDocumentToWorld)
|
|
|
+ .first(
|
|
|
+ (world, exception) -> {
|
|
|
if (exception != null) {
|
|
|
- sendException(exchange, exception);
|
|
|
+ future.completeExceptionally(exception);
|
|
|
} else {
|
|
|
- sendJson(exchange, worlds);
|
|
|
+ future.complete(world);
|
|
|
}
|
|
|
});
|
|
|
+ return future;
|
|
|
}
|
|
|
}
|