|
@@ -1,16 +1,25 @@
|
|
package net.officefloor.benchmark;
|
|
package net.officefloor.benchmark;
|
|
|
|
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
|
|
+import java.util.HashMap;
|
|
|
|
+import java.util.Map;
|
|
import java.util.concurrent.Executor;
|
|
import java.util.concurrent.Executor;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
|
|
|
|
import io.netty.channel.unix.Socket;
|
|
import io.netty.channel.unix.Socket;
|
|
import io.r2dbc.pool.PoolingConnectionFactoryProvider;
|
|
import io.r2dbc.pool.PoolingConnectionFactoryProvider;
|
|
|
|
+import io.r2dbc.postgresql.api.PostgresqlException;
|
|
import io.r2dbc.spi.Batch;
|
|
import io.r2dbc.spi.Batch;
|
|
import io.r2dbc.spi.Connection;
|
|
import io.r2dbc.spi.Connection;
|
|
import io.r2dbc.spi.ConnectionFactories;
|
|
import io.r2dbc.spi.ConnectionFactories;
|
|
import io.r2dbc.spi.ConnectionFactory;
|
|
import io.r2dbc.spi.ConnectionFactory;
|
|
import io.r2dbc.spi.ConnectionFactoryOptions;
|
|
import io.r2dbc.spi.ConnectionFactoryOptions;
|
|
|
|
+import net.officefloor.cache.Cache;
|
|
|
|
+import net.officefloor.cache.constant.ConstantCacheManagedObjectSource;
|
|
|
|
+import net.officefloor.frame.api.managedobject.ManagedObject;
|
|
|
|
+import net.officefloor.frame.util.ManagedObjectSourceStandAlone;
|
|
|
|
+import net.officefloor.frame.util.ManagedObjectUserStandAlone;
|
|
|
|
+import net.officefloor.plugin.managedobject.poll.StatePollContext;
|
|
import net.officefloor.server.RequestHandler;
|
|
import net.officefloor.server.RequestHandler;
|
|
import net.officefloor.server.http.parse.HttpRequestParser;
|
|
import net.officefloor.server.http.parse.HttpRequestParser;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Flux;
|
|
@@ -38,7 +47,7 @@ public class R2dbcOfficeFloorMain implements DatabaseOperations {
|
|
/**
|
|
/**
|
|
* Run application.
|
|
* Run application.
|
|
*/
|
|
*/
|
|
- public static void main(String[] args) throws Exception {
|
|
|
|
|
|
+ public static void main(String[] args) throws Throwable {
|
|
|
|
|
|
// Increase the buffer size (note: too high and cause OOM issues)
|
|
// Increase the buffer size (note: too high and cause OOM issues)
|
|
System.setProperty("reactor.bufferSize.small", String.valueOf(QUERY_BUFFER_SIZE));
|
|
System.setProperty("reactor.bufferSize.small", String.valueOf(QUERY_BUFFER_SIZE));
|
|
@@ -58,6 +67,11 @@ public class R2dbcOfficeFloorMain implements DatabaseOperations {
|
|
*/
|
|
*/
|
|
private final ThreadLocal<Connection[]> threadLocalConnections;
|
|
private final ThreadLocal<Connection[]> threadLocalConnections;
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * {@link Cache} of {@link CachedWorld}.
|
|
|
|
+ */
|
|
|
|
+ private final Cache<Integer, CachedWorld> cache;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Instantiate.
|
|
* Instantiate.
|
|
*
|
|
*
|
|
@@ -68,8 +82,9 @@ public class R2dbcOfficeFloorMain implements DatabaseOperations {
|
|
* @param username Username.
|
|
* @param username Username.
|
|
* @param password Password.
|
|
* @param password Password.
|
|
*/
|
|
*/
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
public R2dbcOfficeFloorMain(int socketCount, String server, int port, String database, String username,
|
|
public R2dbcOfficeFloorMain(int socketCount, String server, int port, String database, String username,
|
|
- String password) {
|
|
|
|
|
|
+ String password) throws Throwable {
|
|
|
|
|
|
// Must have enough connection capacity for initial load (+1 for rounding)
|
|
// Must have enough connection capacity for initial load (+1 for rounding)
|
|
int requiredConnectionsPerSocket = (QUERY_LOAD_CAPACITY / (socketCount * QUERY_BUFFER_SIZE)) + 1;
|
|
int requiredConnectionsPerSocket = (QUERY_LOAD_CAPACITY / (socketCount * QUERY_BUFFER_SIZE)) + 1;
|
|
@@ -99,6 +114,48 @@ public class R2dbcOfficeFloorMain implements DatabaseOperations {
|
|
return connections;
|
|
return connections;
|
|
}
|
|
}
|
|
};
|
|
};
|
|
|
|
+
|
|
|
|
+ // Provide the cache
|
|
|
|
+ ManagedObjectSourceStandAlone source = new ManagedObjectSourceStandAlone();
|
|
|
|
+ source.registerInvokeProcessServicer(0, (processIndex, parameter, managedObject) -> {
|
|
|
|
+
|
|
|
|
+ // Poll database for cached data
|
|
|
|
+ StatePollContext<Map<Integer, CachedWorld>> pollContext = (StatePollContext<Map<Integer, CachedWorld>>) parameter;
|
|
|
|
+ Map<Integer, CachedWorld> data = new HashMap<>();
|
|
|
|
+ try {
|
|
|
|
+ Flux.from(connectionFactory.create()).flatMap((connection) -> {
|
|
|
|
+ return Flux.from(connection.createStatement("SELECT ID, RANDOMNUMBER FROM WORLD").execute())
|
|
|
|
+ .flatMap(result -> Flux.from(result.map((row, metadata) -> {
|
|
|
|
+ Integer id = row.get(0, Integer.class);
|
|
|
|
+ Integer randomNumber = row.get(1, Integer.class);
|
|
|
|
+ CachedWorld cachedWorld = new CachedWorld(id, randomNumber);
|
|
|
|
+ data.put(id, cachedWorld);
|
|
|
|
+ return cachedWorld;
|
|
|
|
+ }))).last().flatMap(ignore -> Mono.from(connection.close()));
|
|
|
|
+ }).blockLast();
|
|
|
|
+ pollContext.setNextState(data, -1, null);
|
|
|
|
+ } catch (Exception ex) {
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ ManagedObject cacheMo = new ManagedObjectUserStandAlone()
|
|
|
|
+ .sourceManagedObject(source.loadManagedObjectSource(ConstantCacheManagedObjectSource.class));
|
|
|
|
+ this.cache = (Cache<Integer, CachedWorld>) cacheMo.getObject();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void sendDatabaseError(Throwable failure, AbstractSendResponse response) {
|
|
|
|
+
|
|
|
|
+ // Handle issue of prepared statement not found
|
|
|
|
+ // (seems unsafe memory issue in R2DBC that occurs during start then stops)
|
|
|
|
+ if (failure instanceof PostgresqlException) {
|
|
|
|
+ PostgresqlException postgresqlException = (PostgresqlException) failure;
|
|
|
|
+ if ("26000".equals(postgresqlException.getErrorDetails().getCode())) {
|
|
|
|
+ // Prepared statement not existing
|
|
|
|
+ response.sendError(503); // consider overloaded in connection setup during warm up
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Just send the failure
|
|
|
|
+ response.sendError(failure);
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
/*
|
|
@@ -137,7 +194,7 @@ public class R2dbcOfficeFloorMain implements DatabaseOperations {
|
|
}))).publishOn(conn.writeScheduler).subscribe(world -> {
|
|
}))).publishOn(conn.writeScheduler).subscribe(world -> {
|
|
sender.sendDb(world);
|
|
sender.sendDb(world);
|
|
}, error -> {
|
|
}, error -> {
|
|
- sender.sendError(error);
|
|
|
|
|
|
+ this.sendDatabaseError(error, sender);
|
|
}, () -> {
|
|
}, () -> {
|
|
conn.processed(1);
|
|
conn.processed(1);
|
|
});
|
|
});
|
|
@@ -164,7 +221,7 @@ public class R2dbcOfficeFloorMain implements DatabaseOperations {
|
|
}))).collectList().publishOn(conn.writeScheduler).subscribe(worlds -> {
|
|
}))).collectList().publishOn(conn.writeScheduler).subscribe(worlds -> {
|
|
sender.sendQueries(worlds.toArray(World[]::new));
|
|
sender.sendQueries(worlds.toArray(World[]::new));
|
|
}, error -> {
|
|
}, error -> {
|
|
- sender.sendError(error);
|
|
|
|
|
|
+ this.sendDatabaseError(error, sender);
|
|
}, () -> {
|
|
}, () -> {
|
|
conn.processed(queryCount);
|
|
conn.processed(queryCount);
|
|
});
|
|
});
|
|
@@ -189,7 +246,7 @@ public class R2dbcOfficeFloorMain implements DatabaseOperations {
|
|
}))).collectList().publishOn(conn.writeScheduler).subscribe(fortunes -> {
|
|
}))).collectList().publishOn(conn.writeScheduler).subscribe(fortunes -> {
|
|
sender.sendFortunes(fortunes);
|
|
sender.sendFortunes(fortunes);
|
|
}, error -> {
|
|
}, error -> {
|
|
- sender.sendError(error);
|
|
|
|
|
|
+ this.sendDatabaseError(error, sender);
|
|
}, () -> {
|
|
}, () -> {
|
|
conn.processed(1);
|
|
conn.processed(1);
|
|
});
|
|
});
|
|
@@ -233,12 +290,33 @@ public class R2dbcOfficeFloorMain implements DatabaseOperations {
|
|
}).publishOn(conn.writeScheduler).subscribe(worlds -> {
|
|
}).publishOn(conn.writeScheduler).subscribe(worlds -> {
|
|
sender.sendUpdate(worlds.toArray(World[]::new));
|
|
sender.sendUpdate(worlds.toArray(World[]::new));
|
|
}, error -> {
|
|
}, error -> {
|
|
- sender.sendError(error);
|
|
|
|
|
|
+ this.sendDatabaseError(error, sender);
|
|
}, () -> {
|
|
}, () -> {
|
|
conn.processed(executeQueryCount);
|
|
conn.processed(executeQueryCount);
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public void cached(int queryCount, CachedSendResponse sender) {
|
|
|
|
+
|
|
|
|
+ // Set up for unique numbers
|
|
|
|
+ ThreadLocalRandom random = ThreadLocalRandom.current();
|
|
|
|
+
|
|
|
|
+ // Obtain the list of cached worlds
|
|
|
|
+ CachedWorld[] cachedWorlds = new CachedWorld[queryCount];
|
|
|
|
+ for (int i = 0; i < cachedWorlds.length; i++) {
|
|
|
|
+
|
|
|
|
+ // Obtain unique identifier
|
|
|
|
+ int randomNumber = random.nextInt(1, 10001);
|
|
|
|
+
|
|
|
|
+ // Obtain the cached world
|
|
|
|
+ cachedWorlds[i] = cache.get(randomNumber);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Send cached worlds
|
|
|
|
+ sender.sendCached(cachedWorlds);
|
|
|
|
+ }
|
|
|
|
+
|
|
private static class RateLimit {
|
|
private static class RateLimit {
|
|
|
|
|
|
private final RateLimitedConnection[] rateLimitedConnections;
|
|
private final RateLimitedConnection[] rateLimitedConnections;
|