UpdatesMongoAsyncHandler.java 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package hello;
  2. import static hello.Helper.getQueries;
  3. import static hello.Helper.randomWorldNumber;
  4. import static hello.Helper.sendException;
  5. import static hello.Helper.sendJson;
  6. import com.mongodb.async.client.MongoCollection;
  7. import com.mongodb.async.client.MongoDatabase;
  8. import com.mongodb.client.model.Filters;
  9. import com.mongodb.client.model.UpdateOneModel;
  10. import com.mongodb.client.model.Updates;
  11. import com.mongodb.client.model.WriteModel;
  12. import io.undertow.server.HttpHandler;
  13. import io.undertow.server.HttpServerExchange;
  14. import java.util.ArrayList;
  15. import java.util.List;
  16. import java.util.concurrent.CompletableFuture;
  17. import org.bson.Document;
  18. import org.bson.conversions.Bson;
  19. /**
  20. * Handles the updates test using MongoDB with an asynchronous API.
  21. */
  22. final class UpdatesMongoAsyncHandler implements HttpHandler {
  23. private final MongoCollection<Document> worldCollection;
  24. UpdatesMongoAsyncHandler(MongoDatabase db) {
  25. worldCollection = db.getCollection("world");
  26. }
  27. @Override
  28. public void handleRequest(HttpServerExchange exchange) {
  29. int queries = getQueries(exchange);
  30. nUpdatedWorlds(queries).whenComplete(
  31. (worlds, exception) -> {
  32. if (exception != null) {
  33. sendException(exchange, exception);
  34. } else {
  35. sendJson(exchange, worlds);
  36. }
  37. });
  38. }
  39. private CompletableFuture<World[]> nUpdatedWorlds(int n) {
  40. return nWorlds(n).thenCompose(
  41. worlds -> {
  42. List<WriteModel<Document>> writes = new ArrayList<>(worlds.length);
  43. for (World world : worlds) {
  44. world.randomNumber = randomWorldNumber();
  45. Bson filter = Filters.eq(world.id);
  46. Bson update = Updates.set("randomNumber", world.randomNumber);
  47. writes.add(new UpdateOneModel<>(filter, update));
  48. }
  49. CompletableFuture<World[]> next = new CompletableFuture<>();
  50. worldCollection.bulkWrite(
  51. writes,
  52. (result, exception) -> {
  53. if (exception != null) {
  54. next.completeExceptionally(exception);
  55. } else {
  56. next.complete(worlds);
  57. }
  58. });
  59. return next;
  60. });
  61. }
  62. private CompletableFuture<World[]> nWorlds(int n) {
  63. @SuppressWarnings({ "unchecked", "rawtypes" })
  64. CompletableFuture<World>[] futures = new CompletableFuture[n];
  65. for (int i = 0; i < futures.length; i++) {
  66. futures[i] = oneWorld();
  67. }
  68. return CompletableFuture.allOf(futures).thenApply(
  69. nil -> {
  70. World[] worlds = new World[futures.length];
  71. for (int i = 0; i < futures.length; i++) {
  72. worlds[i] = futures[i].join();
  73. }
  74. return worlds;
  75. });
  76. }
  77. private CompletableFuture<World> oneWorld() {
  78. CompletableFuture<World> future = new CompletableFuture<>();
  79. worldCollection
  80. .find(Filters.eq(randomWorldNumber()))
  81. .map(Helper::mongoDocumentToWorld)
  82. .first(
  83. (world, exception) -> {
  84. if (exception != null) {
  85. future.completeExceptionally(exception);
  86. } else {
  87. future.complete(world);
  88. }
  89. });
  90. return future;
  91. }
  92. }