Browse Source

[spring] Improve database tests (#9313)

* [spring-webflux] Improve database tests

- Improve the test reliability with a better random number
  generation mechanism
- Improve the scalability of fortune test

* [spring] Fix postgres deadlock

The now fixed deadlock issue was only visible
under high concurrency and was generating errors
and leading to lower than expected performance.

Sorting the worlds before updating them allows
to avoid this issue.
Sébastien Deleuze 10 months ago
parent
commit
808b21e688

+ 19 - 0
frameworks/Java/spring-webflux/src/main/java/benchmark/Utils.java

@@ -0,0 +1,19 @@
+package benchmark;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.IntStream;
+
+abstract public class Utils {
+
+	private static final int MIN_WORLD_NUMBER = 1;
+	private static final int MAX_WORLD_NUMBER_PLUS_ONE = 10_001;
+
+	public static int randomWorldNumber() {
+		return ThreadLocalRandom.current().nextInt(MIN_WORLD_NUMBER, MAX_WORLD_NUMBER_PLUS_ONE);
+	}
+
+	public static IntStream randomWorldNumbers() {
+		return ThreadLocalRandom.current().ints(MIN_WORLD_NUMBER, MAX_WORLD_NUMBER_PLUS_ONE).distinct();
+	}
+
+}

+ 23 - 8
frameworks/Java/spring-webflux/src/main/java/benchmark/repository/R2dbcDbRepository.java

@@ -1,20 +1,27 @@
 package benchmark.repository;
 
-import benchmark.model.Fortune;
-import benchmark.model.World;
 import org.springframework.context.annotation.Profile;
 import org.springframework.r2dbc.core.DatabaseClient;
 import org.springframework.stereotype.Component;
+
+import benchmark.model.Fortune;
+import benchmark.model.World;
+import io.r2dbc.spi.Connection;
+import io.r2dbc.spi.ConnectionFactory;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 @Component
 @Profile("r2dbc")
 public class R2dbcDbRepository implements DbRepository {
+
     private final DatabaseClient databaseClient;
+    private final ConnectionFactory connectionFactory;
+    private final ThreadLocal<Mono<? extends Connection>> conn = new ThreadLocal<>();
 
     public R2dbcDbRepository(DatabaseClient databaseClient) {
         this.databaseClient = databaseClient;
+        this.connectionFactory = databaseClient.getConnectionFactory();
     }
 
     @Override
@@ -24,10 +31,9 @@ public class R2dbcDbRepository implements DbRepository {
                 .bind("$1", id)
                 .mapProperties(World.class)
                 .first();
-
     }
 
-    public Mono<World> updateWorld(World world) {
+    private Mono<World> updateWorld(World world) {
         return databaseClient
                 .sql("UPDATE world SET randomnumber=$2 WHERE id = $1")
                 .bind("$1", world.id)
@@ -37,6 +43,8 @@ public class R2dbcDbRepository implements DbRepository {
                 .map(count -> world);
     }
 
+
+    @Override
     public Mono<World> findAndUpdateWorld(int id, int randomNumber) {
         return getWorld(id).flatMap(world -> {
             world.randomnumber = randomNumber;
@@ -46,9 +54,16 @@ public class R2dbcDbRepository implements DbRepository {
 
     @Override
     public Flux<Fortune> fortunes() {
-        return databaseClient
-                .sql("SELECT id, message FROM fortune")
-                .mapProperties(Fortune.class)
-                .all();
+        return getConnection()
+                .flatMapMany(conn -> conn.createStatement("SELECT id, message FROM " + "fortune").execute())
+                .flatMap(result -> result.map(r -> new Fortune(r.get(0, Integer.class), r.get(1, String.class))));
     }
+
+    private Mono<? extends Connection> getConnection() {
+        if (this.conn.get() == null) {
+            this.conn.set(Mono.from(connectionFactory.create()).cache());
+        }
+        return this.conn.get();
+    }
+
 }

+ 6 - 9
frameworks/Java/spring-webflux/src/main/java/benchmark/web/DbHandler.java

@@ -1,8 +1,8 @@
 package benchmark.web;
 
 import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
 
+import benchmark.Utils;
 import benchmark.model.Fortune;
 import benchmark.model.World;
 import benchmark.repository.DbRepository;
@@ -29,7 +29,7 @@ public class DbHandler {
     }
 
     public Mono<ServerResponse> db(ServerRequest request) {
-        int id = randomWorldNumber();
+        int id = Utils.randomWorldNumber();
         Mono<World> world = dbRepository.getWorld(id)
                 .switchIfEmpty(Mono.error(new Exception("No World found with Id: " + id)));
 
@@ -41,8 +41,8 @@ public class DbHandler {
     public Mono<ServerResponse> queries(ServerRequest request) {
         int queries = parseQueryCount(request.queryParams().getFirst("queries"));
 
-        Mono<List<World>> worlds = Flux.range(0, queries)
-                .flatMap(i -> dbRepository.getWorld(randomWorldNumber()))
+        Mono<List<World>> worlds = Flux.fromStream(Utils.randomWorldNumbers().limit(queries).boxed())
+                .flatMap(dbRepository::getWorld)
                 .collectList();
 
         return ServerResponse.ok()
@@ -67,8 +67,8 @@ public class DbHandler {
     public Mono<ServerResponse> updates(ServerRequest request) {
         int queries = parseQueryCount(request.queryParams().getFirst("queries"));
 
-        Mono<List<World>> worlds = Flux.range(0, queries)
-                .flatMap(i -> dbRepository.findAndUpdateWorld(randomWorldNumber(), randomWorldNumber()))
+        Mono<List<World>> worlds = Flux.fromStream(Utils.randomWorldNumbers().limit(queries).boxed())
+                .flatMap(i -> dbRepository.findAndUpdateWorld(i, Utils.randomWorldNumber()))
                 .collectList();
 
         return ServerResponse.ok()
@@ -87,7 +87,4 @@ public class DbHandler {
                                 .bodyValue(JStachio.render(new Fortunes(fortunes))));
     }
 
-    private static int randomWorldNumber() {
-        return 1 + ThreadLocalRandom.current().nextInt(10000);
-    }
 }

+ 2 - 0
frameworks/Java/spring/src/main/java/hello/web/DbHandler.java

@@ -1,6 +1,7 @@
 package hello.web;
 
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 
 import hello.Utils;
@@ -52,6 +53,7 @@ public class DbHandler {
 					world.randomNumber = randomNumber;
 					return world;
 				}).limit(queries)
+				.sorted(Comparator.comparingInt(w -> w.id))
 				.toList();
 		dbRepository.updateWorlds(worlds);
 		return ServerResponse.ok()