Browse Source

Add pgclient to benchmark the asynchronous aspect of ratpack when accessing a database (#4106)

jringuette 6 years ago
parent
commit
ff68751c68
22 changed files with 494 additions and 94 deletions
  1. 4 1
      frameworks/Java/ratpack/README.md
  2. 40 2
      frameworks/Java/ratpack/benchmark_config.json
  3. 4 1
      frameworks/Java/ratpack/build.gradle
  4. 11 0
      frameworks/Java/ratpack/ratpack-jdbc.dockerfile
  5. 23 0
      frameworks/Java/ratpack/ratpack-pgclient.dockerfile
  6. 47 0
      frameworks/Java/ratpack/src/main/java/DatabaseConfig.java
  7. 41 9
      frameworks/Java/ratpack/src/main/java/Main.java
  8. 11 0
      frameworks/Java/ratpack/src/main/java/ProfileConfig.java
  9. 6 12
      frameworks/Java/ratpack/src/main/java/handlers/BaseWorldHandler.java
  10. 4 6
      frameworks/Java/ratpack/src/main/java/handlers/DbHandler.java
  11. 3 22
      frameworks/Java/ratpack/src/main/java/handlers/FortuneHandler.java
  12. 1 1
      frameworks/Java/ratpack/src/main/java/handlers/JsonHandler.java
  13. 4 8
      frameworks/Java/ratpack/src/main/java/handlers/QueryHandler.java
  14. 4 32
      frameworks/Java/ratpack/src/main/java/handlers/UpdateHandler.java
  15. 15 0
      frameworks/Java/ratpack/src/main/java/models/DbRepository.java
  16. 87 0
      frameworks/Java/ratpack/src/main/java/models/JdbcRepository.java
  17. 81 0
      frameworks/Java/ratpack/src/main/java/models/PgClientRepository.java
  18. 23 0
      frameworks/Java/ratpack/src/main/java/module/JdbcRepositoryModule.java
  19. 38 0
      frameworks/Java/ratpack/src/main/java/module/PgClientModule.java
  20. 22 0
      frameworks/Java/ratpack/src/main/java/module/PgClientRepositoryModule.java
  21. 19 0
      frameworks/Java/ratpack/src/main/java/module/PgClients.java
  22. 6 0
      frameworks/Java/ratpack/src/main/resources/application.yml

+ 4 - 1
frameworks/Java/ratpack/README.md

@@ -6,7 +6,9 @@ Ratpack's [hikari module](https://github.com/ratpack/ratpack/tree/master/ratpack
 
 Ratpack's [handlebars module](https://github.com/ratpack/ratpack/tree/master/ratpack-handlebars) is used to render the fortune template.
 
-All accesses to the database are done through plain JDBC using an unbounded thread pool to prevent blocking the main event loop.
+There are two repository implementations.
+* [JdbcRepository](src/main/java/models/JdbcRepository.java) is using JDBC and an unbounded thread pool to prevent blocking the main event loop.
+* [PgClientRepository](src/main/java/models/PgClientRepository.java) is using an asynchronous driver to query the database.
 
 ### Plaintext Test
 
@@ -36,6 +38,7 @@ All accesses to the database are done through plain JDBC using an unbounded thre
 
 * [Java OpenJDK 1.8](http://openjdk.java.net/)
 * [Ratpack 1.5.4](http://ratpack.io/)
+* [reactive-pg-client 0.10.5](https://github.com/reactiverse/reactive-pg-client)
 
 ## Test URLs
 

+ 40 - 2
frameworks/Java/ratpack/benchmark_config.json

@@ -5,6 +5,23 @@
       "default": {
         "json_url": "/json",
         "plaintext_url": "/plaintext",
+        "port": 5050,
+        "approach": "Realistic",
+        "classification": "Micro",
+        "database": "None",
+        "framework": "Ratpack",
+        "language": "Java",
+        "flavor": "None",
+        "orm": "Raw",
+        "platform": "Netty",
+        "webserver": "None",
+        "os": "Linux",
+        "database_os": "Linux",
+        "display_name": "Ratpack",
+        "notes": "",
+        "versus": "Netty"
+      },
+      "jdbc": {
         "db_url": "/db",
         "query_url": "/queries?queries=",
         "fortune_url": "/fortunes",
@@ -12,7 +29,7 @@
         "port": 5050,
         "approach": "Realistic",
         "classification": "Micro",
-        "database": "MySQL",
+        "database": "Postgres",
         "framework": "Ratpack",
         "language": "Java",
         "flavor": "None",
@@ -21,7 +38,28 @@
         "webserver": "None",
         "os": "Linux",
         "database_os": "Linux",
-        "display_name": "Ratpack",
+        "display_name": "Ratpack-jdbc",
+        "notes": "",
+        "versus": "Netty"
+      },
+      "pgclient": {
+        "db_url": "/db",
+        "query_url": "/queries?queries=",
+        "fortune_url": "/fortunes",
+        "update_url": "/updates?queries=",
+        "port": 5050,
+        "approach": "Realistic",
+        "classification": "Micro",
+        "database": "Postgres",
+        "framework": "Ratpack",
+        "language": "Java",
+        "flavor": "None",
+        "orm": "Raw",
+        "platform": "Netty",
+        "webserver": "None",
+        "os": "Linux",
+        "database_os": "Linux",
+        "display_name": "Ratpack-pgclient",
         "notes": "",
         "versus": "Netty"
       }

+ 4 - 1
frameworks/Java/ratpack/build.gradle

@@ -21,11 +21,14 @@ dependencies {
     compile 'io.ratpack:ratpack-guice:1.5.4'
     compile 'io.ratpack:ratpack-hikari:1.5.4'
     compile 'io.ratpack:ratpack-handlebars:1.5.4'
+    compile 'io.ratpack:ratpack-rx:1.5.4'
+    compile 'io.reactiverse:reactive-pg-client:0.10.5'
+    compile 'io.vertx:vertx-rx-java:3.5.3'
 
     // Default SLF4J binding.  Note that this is a blocking implementation.
     // See here for a non blocking appender http://logging.apache.org/log4j/2.x/manual/async.html
     runtime 'org.slf4j:slf4j-simple:1.7.25'
-    runtime 'mysql:mysql-connector-java:5.1.47'
+    runtime 'org.postgresql:postgresql:42.2.5'
 
 }
 

+ 11 - 0
frameworks/Java/ratpack/ratpack-jdbc.dockerfile

@@ -0,0 +1,11 @@
+FROM gradle:4.7.0-jdk8 as gradle
+USER root
+WORKDIR /ratpack
+COPY build.gradle build.gradle
+COPY src src
+RUN gradle shadowJar
+
+FROM openjdk:8-jre-slim
+WORKDIR /ratpack
+COPY --from=gradle /ratpack/build/libs/ratpack-all.jar app.jar
+CMD ["java", "-server", "-XX:+UseNUMA", "-XX:+UseParallelGC", "-jar", "app.jar", "profile.name=jdbc"]

+ 23 - 0
frameworks/Java/ratpack/ratpack-pgclient.dockerfile

@@ -0,0 +1,23 @@
+FROM gradle:4.7.0-jdk8 as gradle
+USER root
+WORKDIR /ratpack
+COPY build.gradle build.gradle
+COPY src src
+RUN gradle shadowJar
+
+FROM openjdk:8-jre-slim
+WORKDIR /ratpack
+COPY --from=gradle /ratpack/build/libs/ratpack-all.jar app.jar
+CMD export DBIP=`getent hosts tfb-database | awk '{ print $1 }'` && \
+    java \
+        -server \
+        -XX:+UseNUMA \
+        -XX:+UseParallelGC \
+        -Dvertx.disableMetrics=true \
+        -Dvertx.disableH2c=true \
+        -Dvertx.disableWebsockets=true \
+        -Dvertx.flashPolicyHandler=false \
+        -Dvertx.threadChecks=false \
+        -Dvertx.disableContextTimings=true \
+        -Dvertx.disableTCCL=true \
+        -jar app.jar profile.name=pgclient database.host=$DBIP

+ 47 - 0
frameworks/Java/ratpack/src/main/java/DatabaseConfig.java

@@ -0,0 +1,47 @@
+public class DatabaseConfig {
+    private String schema;
+    private String host;
+    private int port;
+    private String username;
+    private String password;
+
+    public String getSchema() {
+        return schema;
+    }
+
+    public void setSchema(String schema) {
+        this.schema = schema;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+}

+ 41 - 9
frameworks/Java/ratpack/src/main/java/Main.java

@@ -5,23 +5,55 @@ import handlers.JsonHandler;
 import handlers.PlainTextHandler;
 import handlers.QueryHandler;
 import handlers.UpdateHandler;
+import module.JdbcRepositoryModule;
+import module.PgClientModule;
+import module.PgClientRepositoryModule;
 import ratpack.guice.Guice;
 import ratpack.handlebars.HandlebarsModule;
 import ratpack.hikari.HikariModule;
+import ratpack.rx.RxRatpack;
 import ratpack.server.RatpackServer;
 
 public class Main {
     public static void main(String... args) throws Exception {
+        RxRatpack.initialize();
+
         RatpackServer.start(server -> server
-                .serverConfig(c -> c.findBaseDir("handlebars"))
-                .registry(Guice.registry(b -> b
-                        .module(HikariModule.class, hikariConfig -> {
-                            hikariConfig.setJdbcUrl("jdbc:mysql://tfb-database:3306/hello_world?elideSetAutoCommits=true&useLocalSessionState=true&cachePrepStmts=true&cacheCallableStmts=true&alwaysSendSetIsolation=false&cacheServerConfiguration=true&zeroDateTimeBehavior=convertToNull&traceProtocol=false&maintainTimeStats=false");
-                            hikariConfig.setUsername("benchmarkdbuser");
-                            hikariConfig.setPassword("benchmarkdbpass");
-                            hikariConfig.setMaximumPoolSize(Runtime.getRuntime().availableProcessors()*2);
-                        })
-                        .module(HandlebarsModule.class)
+                .serverConfig(c -> {
+                    c
+                            .findBaseDir("application.yml")
+                            .yaml("application.yml")
+                            .args(args)
+                            .require("/database", DatabaseConfig.class);
+                })
+                .registry(Guice.registry(b -> {
+                            DatabaseConfig databaseConfig = b.getServerConfig().get("/database", DatabaseConfig.class);
+                            ProfileConfig profileConfig = b.getServerConfig().get("/profile", ProfileConfig.class);
+
+                            b.module(HandlebarsModule.class);
+
+                            if (profileConfig != null) {
+                                if ("pgclient".equals(profileConfig.getName())) {
+                                    b.module(PgClientModule.class, options -> {
+                                        options.setDatabase(databaseConfig.getSchema());
+                                        options.setHost(databaseConfig.getHost());
+                                        options.setPort(databaseConfig.getPort());
+                                        options.setUser(databaseConfig.getUsername());
+                                        options.setPassword(databaseConfig.getPassword());
+                                        options.setCachePreparedStatements(true);
+                                        options.setMaxSize(1);
+                                    }).module(PgClientRepositoryModule.class);
+                                } else if ("jdbc".equals(profileConfig.getName())) {
+                                    b.module(HikariModule.class, hikariConfig -> {
+                                        hikariConfig.setJdbcUrl(String.format("jdbc:postgresql://%s:%s/%s", databaseConfig.getHost(), databaseConfig.getPort(),
+                                                databaseConfig.getSchema()));
+                                        hikariConfig.setUsername(databaseConfig.getUsername());
+                                        hikariConfig.setPassword(databaseConfig.getPassword());
+                                        hikariConfig.setMaximumPoolSize(Runtime.getRuntime().availableProcessors() * 2);
+                                    }).module(JdbcRepositoryModule.class);
+                                }
+                            }
+                        }
                 ))
                 .handlers(chain -> chain
                         .all(new HeaderHandler())

+ 11 - 0
frameworks/Java/ratpack/src/main/java/ProfileConfig.java

@@ -0,0 +1,11 @@
+public class ProfileConfig {
+    private String name;
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+}

+ 6 - 12
frameworks/Java/ratpack/src/main/java/handlers/BaseWorldHandler.java

@@ -8,6 +8,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.concurrent.ThreadLocalRandom;
 
 public abstract class BaseWorldHandler extends InjectionHandler {
@@ -30,17 +31,10 @@ public abstract class BaseWorldHandler extends InjectionHandler {
         return Math.min(500, Math.max(1, parsedValue));
     }
 
-    protected World getWorld(Connection connection) {
-        try {
-            PreparedStatement statement = connection.prepareStatement("SELECT id, randomnumber FROM world WHERE id = ?");
-            statement.setInt(1, randomWorldNumber());
-            ResultSet rs = statement.executeQuery();
-            rs.next();
-            World world = new World(rs.getInt(1), rs.getInt(2));
-            statement.close();
-            return world;
-        } catch (SQLException e) {
-            throw new IllegalStateException(e);
-        }
+    protected int[] getNumbers(int count) {
+        int[] ids = new int[count];
+        Arrays.setAll(ids, value -> randomWorldNumber());
+
+        return ids;
     }
 }

+ 4 - 6
frameworks/Java/ratpack/src/main/java/handlers/DbHandler.java

@@ -1,5 +1,7 @@
 package handlers;
 
+import models.DbRepository;
+import models.JdbcRepository;
 import ratpack.exec.Blocking;
 import ratpack.handling.Context;
 
@@ -9,11 +11,7 @@ import java.sql.Connection;
 import static ratpack.jackson.Jackson.json;
 
 public class DbHandler extends BaseWorldHandler {
-    public void handle(Context ctx, DataSource datasource) {
-        Blocking.get(() -> {
-            try (Connection connection = datasource.getConnection()) {
-                return getWorld(connection);
-            }
-        }).then(result -> ctx.render(json(result)));
+    public void handle(Context ctx, DbRepository repository) {
+        repository.getWorld(randomWorldNumber()).then(result -> ctx.render(json(result)));
     }
 }

+ 3 - 22
frameworks/Java/ratpack/src/main/java/handlers/FortuneHandler.java

@@ -1,5 +1,6 @@
 package handlers;
 
+import models.DbRepository;
 import models.Fortune;
 import ratpack.exec.Blocking;
 import ratpack.handlebars.Template;
@@ -17,31 +18,11 @@ import java.util.Comparator;
 import java.util.List;
 
 public class FortuneHandler extends InjectionHandler {
-    public void handle(Context ctx, DataSource datasource) {
-        Blocking.get(() -> {
-            try (Connection connection = datasource.getConnection()) {
-                return getFortunes(connection);
-            }
-        }).then(fortunes -> {
+    public void handle(Context ctx, DbRepository repository) {
+        repository.fortunes().then(fortunes -> {
             fortunes.add(new Fortune(0, "Additional fortune added at request time."));
             fortunes.sort(Comparator.comparing(fortune -> fortune.message));
             ctx.render(Template.handlebarsTemplate("fortunes", Collections.singletonMap("fortunes", fortunes), "text/html; charset=UTF-8"));
         });
     }
-
-    private List<Fortune> getFortunes(Connection connection) {
-        try {
-            PreparedStatement statement = connection.prepareStatement("SELECT id, message FROM fortune");
-            ResultSet rs = statement.executeQuery();
-
-            List<Fortune> fortunes = new ArrayList<>();
-            while (rs.next()) {
-                fortunes.add(new Fortune(rs.getInt(1), rs.getString(2)));
-            }
-            statement.close();
-            return fortunes;
-        } catch (SQLException e) {
-            throw new IllegalStateException(e);
-        }
-    }
 }

+ 1 - 1
frameworks/Java/ratpack/src/main/java/handlers/JsonHandler.java

@@ -12,7 +12,7 @@ public class JsonHandler implements Handler {
     private static final String HELLO = "Hello, World!";
 
     @Override
-    public void handle(Context ctx) throws Exception {
+    public void handle(Context ctx) {
         ctx.render(json(Collections.singletonMap(MESSAGE, HELLO)));
     }
 }

+ 4 - 8
frameworks/Java/ratpack/src/main/java/handlers/QueryHandler.java

@@ -1,5 +1,7 @@
 package handlers;
 
+import models.DbRepository;
+import models.JdbcRepository;
 import models.World;
 import ratpack.exec.Blocking;
 import ratpack.handling.Context;
@@ -11,15 +13,9 @@ import java.util.Arrays;
 import static ratpack.jackson.Jackson.json;
 
 public class QueryHandler extends BaseWorldHandler {
-    public void handle(Context ctx, DataSource datasource) {
+    public void handle(Context ctx, DbRepository repository) {
         int queries = parseQueryCount(ctx);
 
-        Blocking.get(() -> {
-            try (Connection connection = datasource.getConnection()) {
-                World[] worlds = new World[queries];
-                Arrays.setAll(worlds, i -> getWorld(connection));
-                return worlds;
-            }
-        }).then(result -> ctx.render(json(result)));
+        repository.getWorlds(getNumbers(queries)).then(result -> ctx.render(json(result)));
     }
 }

+ 4 - 32
frameworks/Java/ratpack/src/main/java/handlers/UpdateHandler.java

@@ -1,43 +1,15 @@
 package handlers;
 
-import models.World;
-import ratpack.exec.Blocking;
+import models.DbRepository;
+import models.JdbcRepository;
 import ratpack.handling.Context;
 
-import javax.sql.DataSource;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Arrays;
-
 import static ratpack.jackson.Jackson.json;
 
 public class UpdateHandler extends BaseWorldHandler {
-    public void handle(Context ctx, DataSource datasource) throws Exception {
+    public void handle(Context ctx, DbRepository repository) {
         int queries = parseQueryCount(ctx);
 
-        Blocking.get(() -> {
-            try (Connection connection = datasource.getConnection()) {
-                World[] worlds = new World[queries];
-                Arrays.setAll(worlds, i -> getWorld(connection));
-                update(worlds, connection);
-                return worlds;
-            }
-        }).then(result -> ctx.render(json(result)));
-    }
-
-    private void update(World[] worlds, Connection connection) {
-        try {
-            for (World world : worlds) {
-                PreparedStatement statement = connection.prepareStatement("UPDATE world SET randomnumber = ? WHERE id = ?");
-                world.randomNumber = randomWorldNumber();
-                statement.setInt(1, world.randomNumber);
-                statement.setInt(2, world.id);
-                statement.executeUpdate();
-                statement.close();
-            }
-        } catch (SQLException e) {
-            throw new IllegalStateException(e);
-        }
+        repository.findAndUpdateWorlds(getNumbers(queries), getNumbers(queries)).then(result -> ctx.render(json(result)));
     }
 }

+ 15 - 0
frameworks/Java/ratpack/src/main/java/models/DbRepository.java

@@ -0,0 +1,15 @@
+package models;
+
+import ratpack.exec.Promise;
+
+import java.util.List;
+
+public interface DbRepository {
+    Promise<World> getWorld(int id);
+
+    Promise<List<World>> getWorlds(int[] ids);
+
+    Promise<List<World>> findAndUpdateWorlds(int[] ids, int[] randomNumbers);
+
+    Promise<List<Fortune>> fortunes();
+}

+ 87 - 0
frameworks/Java/ratpack/src/main/java/models/JdbcRepository.java

@@ -0,0 +1,87 @@
+package models;
+
+import ratpack.exec.Blocking;
+import ratpack.exec.Promise;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class JdbcRepository implements DbRepository {
+    private final DataSource dataSource;
+
+    public JdbcRepository(DataSource dataSource) {
+        this.dataSource = dataSource;
+    }
+
+    @Override
+    public Promise<World> getWorld(int id) {
+        return getWorlds(new int[]{id}).map(worlds -> worlds.get(0));
+    }
+
+    @Override
+    public Promise<List<World>> getWorlds(int[] ids) {
+        return Blocking.get(() -> {
+            World[] worlds = new World[ids.length];
+            try (Connection connection = dataSource.getConnection()) {
+                Arrays.setAll(worlds, value -> {
+                    try {
+                        PreparedStatement statement = connection.prepareStatement("SELECT id, randomnumber FROM world WHERE id = ?");
+                        statement.setInt(1, ids[value]);
+                        ResultSet rs = statement.executeQuery();
+                        rs.next();
+                        World world = new World(rs.getInt(1), rs.getInt(2));
+                        statement.close();
+                        return world;
+                    } catch (SQLException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            }
+
+            return Arrays.asList(worlds);
+        });
+    }
+
+    @Override
+    public Promise<List<World>> findAndUpdateWorlds(int[] ids, int[] randomNumbers) {
+        return getWorlds(ids).flatMap(worlds -> {
+            return Blocking.get(() -> {
+                try (Connection connection = dataSource.getConnection()) {
+                    int i = 0;
+                    for(World world: worlds) {
+                        PreparedStatement statement = connection.prepareStatement("UPDATE world SET randomnumber = ? WHERE id = ?");
+                        world.randomNumber = randomNumbers[i++];
+                        statement.setInt(1, world.randomNumber);
+                        statement.setInt(2, world.id);
+                        statement.executeUpdate();
+                        statement.close();
+                    }
+                    return worlds;
+                }
+            });
+        });
+    }
+
+    @Override
+    public Promise<List<Fortune>> fortunes() {
+        return Blocking.get(() -> {
+            try (Connection connection = dataSource.getConnection()) {
+                PreparedStatement statement = connection.prepareStatement("SELECT id, message FROM fortune");
+                ResultSet rs = statement.executeQuery();
+
+                List<Fortune> fortunes = new ArrayList<>();
+                while (rs.next()) {
+                    fortunes.add(new Fortune(rs.getInt(1), rs.getString(2)));
+                }
+                statement.close();
+                return fortunes;
+            }
+        });
+    }
+}

+ 81 - 0
frameworks/Java/ratpack/src/main/java/models/PgClientRepository.java

@@ -0,0 +1,81 @@
+package models;
+
+import io.reactiverse.rxjava.pgclient.PgClient;
+import io.reactiverse.rxjava.pgclient.PgIterator;
+import io.reactiverse.rxjava.pgclient.Row;
+import io.reactiverse.rxjava.pgclient.Tuple;
+import module.PgClients;
+import ratpack.exec.Promise;
+import ratpack.rx.RxRatpack;
+import rx.Observable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class PgClientRepository implements DbRepository {
+    private final PgClients pgClients;
+
+    public PgClientRepository(PgClients pgClients) {
+        this.pgClients = pgClients;
+    }
+
+    @Override
+    public Promise<World> getWorld(int id) {
+        return getWorlds(new int[]{id}).map(worlds -> worlds.get(0));
+    }
+
+    @Override
+    public Promise<List<World>> getWorlds(int[] ids) {
+
+        Observable<World>[] observables = new Observable[ids.length];
+
+        PgClient pgClient = pgClients.getOne();
+
+        for (int i = 0; i < ids.length; i++) {
+            observables[i] = pgClient.rxPreparedQuery("SELECT * FROM world WHERE id = $1", Tuple.of(ids[i])).map(rowset -> {
+                final Row row = rowset.iterator().next();
+
+                return new World(row.getInteger(0), row.getInteger(1));
+            }).toObservable();
+        }
+
+        return getPromise(observables);
+    }
+
+    @Override
+    public Promise<List<World>> findAndUpdateWorlds(int[] ids, int[] randomNumbers) {
+        return getWorlds(ids).flatMap(worlds -> {
+            Observable<World>[] observables = new Observable[worlds.size()];
+
+            PgClient pgClient = pgClients.getOne();
+
+            for (int i = 0; i < worlds.size(); i++) {
+                World world = worlds.get(i);
+                world.randomNumber = randomNumbers[i];
+                observables[i] = pgClient.rxPreparedQuery("UPDATE world SET randomnumber = $1 WHERE id = $2", Tuple.of(world.randomNumber, world.id)).map(rowset -> world).toObservable();
+            }
+
+            return getPromise(observables);
+        });
+    }
+
+    private Promise<List<World>> getPromise(Observable<World>[] observables) {
+        return RxRatpack.promiseSingle(
+                Observable.merge(observables)
+                        .collect(() -> new ArrayList<World>(), (worlds, world) -> worlds.add(world)));
+    }
+
+    @Override
+    public Promise<List<Fortune>> fortunes() {
+        return RxRatpack.promise(pgClients.getOne().rxPreparedQuery("SELECT * FROM fortune").flatMapObservable(pgRowSet -> {
+            PgIterator resultSet = pgRowSet.iterator();
+            List<Fortune> fortunes = new ArrayList<>(pgRowSet.size());
+            while (resultSet.hasNext()) {
+                Tuple row = resultSet.next();
+                fortunes.add(new Fortune(row.getInteger(0), row.getString(1)));
+            }
+
+            return Observable.from(fortunes);
+        }));
+    }
+}

+ 23 - 0
frameworks/Java/ratpack/src/main/java/module/JdbcRepositoryModule.java

@@ -0,0 +1,23 @@
+package module;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import io.reactiverse.pgclient.PgPoolOptions;
+import io.reactiverse.rxjava.pgclient.PgClient;
+import models.JdbcRepository;
+import ratpack.guice.ConfigurableModule;
+
+import javax.sql.DataSource;
+
+public class JdbcRepositoryModule extends AbstractModule {
+    @Provides
+    @Singleton
+    public JdbcRepository jdbcRepository(DataSource dataSource) {
+        return new JdbcRepository(dataSource);
+    }
+
+    @Override
+    protected void configure() {
+    }
+}

+ 38 - 0
frameworks/Java/ratpack/src/main/java/module/PgClientModule.java

@@ -0,0 +1,38 @@
+package module;
+
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import io.reactiverse.pgclient.PgPoolOptions;
+import io.reactiverse.rxjava.pgclient.PgClient;
+import io.vertx.rxjava.core.Vertx;
+import ratpack.guice.ConfigurableModule;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class PgClientModule extends ConfigurableModule<PgPoolOptions> {
+
+    @Provides
+    @Singleton
+    public Vertx vertx() {
+        return Vertx.vertx();
+    }
+
+    @Provides
+    @Singleton
+    public PgClients pgClients(PgPoolOptions options, Vertx vertx) {
+        List<PgClient> clients = new ArrayList<>();
+
+        for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
+            clients.add(PgClient.pool(vertx, options));
+        }
+
+        return new PgClients(clients);
+    }
+
+    @Override
+    protected void configure() {
+
+    }
+
+}

+ 22 - 0
frameworks/Java/ratpack/src/main/java/module/PgClientRepositoryModule.java

@@ -0,0 +1,22 @@
+package module;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import io.reactiverse.rxjava.pgclient.PgClient;
+import models.JdbcRepository;
+import models.PgClientRepository;
+
+import javax.sql.DataSource;
+
+public class PgClientRepositoryModule extends AbstractModule {
+    @Provides
+    @Singleton
+    public PgClientRepository pgClientRepository(PgClients pgClients) {
+        return new PgClientRepository(pgClients);
+    }
+
+    @Override
+    protected void configure() {
+    }
+}

+ 19 - 0
frameworks/Java/ratpack/src/main/java/module/PgClients.java

@@ -0,0 +1,19 @@
+package module;
+
+import io.reactiverse.rxjava.pgclient.PgClient;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.stream.Stream;
+
+public class PgClients {
+    private final Iterator<PgClient> iterator;
+
+    public PgClients(Collection<PgClient> clients) {
+        this.iterator = Stream.generate(() -> clients).flatMap(Collection::stream).iterator();
+    }
+
+    public synchronized PgClient getOne() {
+        return iterator.next();
+    }
+}

+ 6 - 0
frameworks/Java/ratpack/src/main/resources/application.yml

@@ -0,0 +1,6 @@
+database:
+  schema: hello_world
+  host: tfb-database
+  port: 5432
+  username: benchmarkdbuser
+  password: benchmarkdbpass