|
@@ -1,10 +1,7 @@
|
|
package com.ociweb.gl.benchmark;
|
|
package com.ociweb.gl.benchmark;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
-import java.util.Collections;
|
|
|
|
import java.util.List;
|
|
import java.util.List;
|
|
-import java.util.concurrent.CompletableFuture;
|
|
|
|
-import java.util.concurrent.ExecutionException;
|
|
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
@@ -13,9 +10,7 @@ import com.ociweb.gl.api.HTTPResponseService;
|
|
import com.ociweb.pronghorn.network.config.HTTPContentTypeDefaults;
|
|
import com.ociweb.pronghorn.network.config.HTTPContentTypeDefaults;
|
|
import com.ociweb.pronghorn.pipe.ObjectPipe;
|
|
import com.ociweb.pronghorn.pipe.ObjectPipe;
|
|
|
|
|
|
-import io.reactiverse.pgclient.PgConnection;
|
|
|
|
import io.reactiverse.pgclient.PgIterator;
|
|
import io.reactiverse.pgclient.PgIterator;
|
|
-import io.reactiverse.pgclient.PgPreparedQuery;
|
|
|
|
import io.reactiverse.pgclient.Tuple;
|
|
import io.reactiverse.pgclient.Tuple;
|
|
|
|
|
|
public class ProcessUpdate {
|
|
public class ProcessUpdate {
|
|
@@ -31,12 +26,11 @@ public class ProcessUpdate {
|
|
this.DBUpdateInFlight = new ObjectPipe<ResultObject>(pipelineBits, ResultObject.class, ResultObject::new);
|
|
this.DBUpdateInFlight = new ObjectPipe<ResultObject>(pipelineBits, ResultObject.class, ResultObject::new);
|
|
this.service = service;
|
|
this.service = service;
|
|
this.pm = pm;
|
|
this.pm = pm;
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
}
|
|
}
|
|
|
|
|
|
- private int randomValue() {
|
|
|
|
- return 1+localRandom.nextInt(10000);
|
|
|
|
- }
|
|
|
|
|
|
|
|
public void tickEvent() {
|
|
public void tickEvent() {
|
|
|
|
|
|
@@ -48,7 +42,10 @@ public class ProcessUpdate {
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- //AtomicBoolean run = new AtomicBoolean(true);
|
|
|
|
|
|
+
|
|
|
|
+ private int randomValue() {
|
|
|
|
+ return 1+localRandom.nextInt(10000);
|
|
|
|
+ }
|
|
|
|
|
|
public boolean updateRestRequest(HTTPRequestReader request) {
|
|
public boolean updateRestRequest(HTTPRequestReader request) {
|
|
int queries;
|
|
int queries;
|
|
@@ -57,39 +54,12 @@ public class ProcessUpdate {
|
|
} else {
|
|
} else {
|
|
queries = 1;
|
|
queries = 1;
|
|
}
|
|
}
|
|
-
|
|
|
|
-
|
|
|
|
long conId = request.getConnectionId();
|
|
long conId = request.getConnectionId();
|
|
- long seqCode = request.getSequenceCode();
|
|
|
|
-
|
|
|
|
- int temp = requestsInFlight.get();
|
|
|
|
|
|
+ long seqCode = request.getSequenceCode();
|
|
|
|
+ int temp = requestsInFlight.incrementAndGet();
|
|
|
|
|
|
- if ( DBUpdateInFlight.hasRoomFor(queries) || service.hasRoomFor(1+temp) ) {
|
|
|
|
-
|
|
|
|
- PgConnection connection = null;
|
|
|
|
- PgPreparedQuery pQuery = null;
|
|
|
|
- try {
|
|
|
|
- connection = findConnection();
|
|
|
|
- pQuery = prepQuery(connection, "SELECT * FROM world WHERE id=$1");
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- if (null!=connection) {
|
|
|
|
- connection.close();
|
|
|
|
- }
|
|
|
|
- return false;
|
|
|
|
- } catch (ExecutionException e) {
|
|
|
|
- if (null!=connection) {
|
|
|
|
- connection.close();
|
|
|
|
- }
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
|
|
+ if (DBUpdateInFlight.hasRoomFor(queries) || service.hasRoomFor(temp)) {
|
|
|
|
|
|
- final PgConnection con = connection;
|
|
|
|
- final PgPreparedQuery pu = pQuery;
|
|
|
|
-
|
|
|
|
- final AtomicInteger outstanding = new AtomicInteger(queries);
|
|
|
|
- final List<ResultObject> toUpdate = new ArrayList<ResultObject>();
|
|
|
|
- requestsInFlight.incrementAndGet();
|
|
|
|
-
|
|
|
|
int q = queries;
|
|
int q = queries;
|
|
while (--q >= 0) {
|
|
while (--q >= 0) {
|
|
|
|
|
|
@@ -102,8 +72,56 @@ public class ProcessUpdate {
|
|
worldObject.setGroupSize(queries);
|
|
worldObject.setGroupSize(queries);
|
|
|
|
|
|
worldObject.setId(randomValue());
|
|
worldObject.setId(randomValue());
|
|
-
|
|
|
|
- exeQuery(pQuery, con, pu, outstanding, toUpdate, worldObject);
|
|
|
|
|
|
+
|
|
|
|
+ pm.pool().preparedQuery("SELECT * FROM world WHERE id=$1", Tuple.of(worldObject.getId()), r -> {
|
|
|
|
+ if (r.succeeded()) {
|
|
|
|
+
|
|
|
|
+ PgIterator resultSet = r.result().iterator();
|
|
|
|
+ Tuple row = resultSet.next();
|
|
|
|
+
|
|
|
|
+ assert(worldObject.getId()==row.getInteger(0));
|
|
|
|
+
|
|
|
|
+ //read the existing random value and store it in the world object
|
|
|
|
+ worldObject.setResult(row.getInteger(1));
|
|
|
|
+ ///////////////////////////////////
|
|
|
|
+ //the object can be used here with the old value
|
|
|
|
+ ///////////////////////////////////
|
|
|
|
+ //set the new random value in this object
|
|
|
|
+ worldObject.setResult(randomValue());
|
|
|
|
+
|
|
|
|
+ //TODO: can we prep this only once and hold it?
|
|
|
|
+
|
|
|
|
+ pm.pool().preparedQuery("UPDATE world SET randomnumber=$1 WHERE id=$2",
|
|
|
|
+ Tuple.of(worldObject.getResult(), worldObject.getId()), ar -> {
|
|
|
|
+ if (ar.succeeded()) {
|
|
|
|
+ worldObject.setStatus(200);
|
|
|
|
+ } else {
|
|
|
|
+ System.out.println("unable to update");
|
|
|
|
+ if (ar.cause()!=null) {
|
|
|
|
+ ar.cause().printStackTrace();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ worldObject.setStatus(500);
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ } else {
|
|
|
|
+ System.out.println("unable to query");
|
|
|
|
+ if (r.cause()!=null) {
|
|
|
|
+ r.cause().printStackTrace();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ worldObject.setStatus(500);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //on all N responses.....
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ });
|
|
|
|
|
|
DBUpdateInFlight.moveHeadForward(); //always move to ensure this can be read.
|
|
DBUpdateInFlight.moveHeadForward(); //always move to ensure this can be read.
|
|
|
|
|
|
@@ -111,117 +129,11 @@ public class ProcessUpdate {
|
|
|
|
|
|
return true;
|
|
return true;
|
|
} else {
|
|
} else {
|
|
|
|
+ requestsInFlight.decrementAndGet();
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private void exeQuery(PgPreparedQuery pQuery, final PgConnection con, final PgPreparedQuery pu,
|
|
|
|
- final AtomicInteger outstanding, final List<ResultObject> toUpdate, final ResultObject worldObject) {
|
|
|
|
-
|
|
|
|
- pQuery.execute(
|
|
|
|
- Tuple.of(worldObject.getId()), r -> {
|
|
|
|
- if (r.succeeded()) {
|
|
|
|
-
|
|
|
|
- PgIterator resultSet = r.result().iterator();
|
|
|
|
- Tuple row = resultSet.next();
|
|
|
|
-
|
|
|
|
- assert(worldObject.getId()==row.getInteger(0));
|
|
|
|
-
|
|
|
|
- //read the existing random value and store it in the world object
|
|
|
|
- worldObject.setResult(row.getInteger(1));
|
|
|
|
- ///////////////////////////////////
|
|
|
|
- //the object can be used here with the old value
|
|
|
|
- ///////////////////////////////////
|
|
|
|
- //set the new random value in this object
|
|
|
|
- worldObject.setResult(randomValue());
|
|
|
|
- toUpdate.add(worldObject);
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- } else {
|
|
|
|
- //TODO: urgent, unable to call so we must back off and try again!!!!
|
|
|
|
- exeQuery(pQuery, con, pu, outstanding, toUpdate, worldObject);
|
|
|
|
- return;
|
|
|
|
-
|
|
|
|
-// System.out.println("unable to query");
|
|
|
|
-// if (r.cause()!=null) {
|
|
|
|
-// r.cause().printStackTrace();
|
|
|
|
-// }
|
|
|
|
-//
|
|
|
|
-// worldObject.setStatus(500);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (0 == outstanding.decrementAndGet()) {
|
|
|
|
- //call update for all the query updates...
|
|
|
|
-
|
|
|
|
- List<Tuple> args = new ArrayList<Tuple>();
|
|
|
|
- toUpdate.forEach(w-> {
|
|
|
|
- args.add(Tuple.of(w.getResult(), w.getId()));
|
|
|
|
- });
|
|
|
|
- Collections.sort(args, (a,b) -> {
|
|
|
|
- return Integer.compare( ((Tuple)a).getInteger(0),
|
|
|
|
- ((Tuple)b).getInteger(0));
|
|
|
|
-
|
|
|
|
- });
|
|
|
|
-
|
|
|
|
- execUpdate(con, pu, toUpdate, args);
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private void execUpdate(final PgConnection con, final PgPreparedQuery pu, final List<ResultObject> toUpdate,
|
|
|
|
- List<Tuple> args) {
|
|
|
|
- con.preparedBatch("UPDATE world SET randomnumber=$1 WHERE id=$2",
|
|
|
|
- args, ar -> {
|
|
|
|
-
|
|
|
|
- int status;
|
|
|
|
- if (ar.succeeded()) {
|
|
|
|
- status = 200;
|
|
|
|
- } else {
|
|
|
|
- execUpdate(con, pu, toUpdate, args);
|
|
|
|
- return;
|
|
|
|
-// System.out.println("unable to update");
|
|
|
|
-// if (ar.cause()!=null) {
|
|
|
|
-// ar.cause().printStackTrace();
|
|
|
|
-// }
|
|
|
|
-// status = 500;
|
|
|
|
- }
|
|
|
|
- toUpdate.forEach(w->{
|
|
|
|
- w.setStatus(status);
|
|
|
|
- });
|
|
|
|
- pu.close();
|
|
|
|
- con.close();
|
|
|
|
-
|
|
|
|
- });
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private PgConnection findConnection() throws InterruptedException, ExecutionException {
|
|
|
|
- PgConnection connection;
|
|
|
|
- CompletableFuture<PgConnection> conFu = new CompletableFuture<PgConnection>();
|
|
|
|
- pm.pool().getConnection(h-> {
|
|
|
|
- if (h.succeeded()) {
|
|
|
|
- conFu.complete(h.result());
|
|
|
|
- } else {
|
|
|
|
- conFu.completeExceptionally(h.cause());
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
-
|
|
|
|
- return conFu.get();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- private PgPreparedQuery prepQuery(PgConnection con, String sql) throws InterruptedException, ExecutionException {
|
|
|
|
-
|
|
|
|
- CompletableFuture<PgPreparedQuery> prepFu = new CompletableFuture<PgPreparedQuery>();
|
|
|
|
- con.prepare(sql, h->{
|
|
|
|
- if (h.succeeded()) {
|
|
|
|
- prepFu.complete(h.result());
|
|
|
|
- } else {
|
|
|
|
- prepFu.completeExceptionally(h.cause());
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
- return prepFu.get();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
private void consumeResultObjectDBUpdate(final ResultObject t) {
|
|
private void consumeResultObjectDBUpdate(final ResultObject t) {
|
|
|
|
|
|
//collect all the objects
|
|
//collect all the objects
|