|
@@ -3,6 +3,8 @@ package com.ociweb.gl.benchmark;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
@@ -11,7 +13,9 @@ import com.ociweb.gl.api.HTTPResponseService;
|
|
import com.ociweb.pronghorn.network.config.HTTPContentTypeDefaults;
|
|
import com.ociweb.pronghorn.network.config.HTTPContentTypeDefaults;
|
|
import com.ociweb.pronghorn.pipe.ObjectPipe;
|
|
import com.ociweb.pronghorn.pipe.ObjectPipe;
|
|
|
|
|
|
|
|
+import io.reactiverse.pgclient.PgConnection;
|
|
import io.reactiverse.pgclient.PgIterator;
|
|
import io.reactiverse.pgclient.PgIterator;
|
|
|
|
+import io.reactiverse.pgclient.PgPreparedQuery;
|
|
import io.reactiverse.pgclient.Tuple;
|
|
import io.reactiverse.pgclient.Tuple;
|
|
|
|
|
|
public class ProcessUpdate {
|
|
public class ProcessUpdate {
|
|
@@ -44,8 +48,7 @@ public class ProcessUpdate {
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
-
|
|
|
|
|
|
+ //AtomicBoolean run = new AtomicBoolean(true);
|
|
|
|
|
|
public boolean updateRestRequest(HTTPRequestReader request) {
|
|
public boolean updateRestRequest(HTTPRequestReader request) {
|
|
int queries;
|
|
int queries;
|
|
@@ -54,14 +57,38 @@ public class ProcessUpdate {
|
|
} else {
|
|
} else {
|
|
queries = 1;
|
|
queries = 1;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+
|
|
long conId = request.getConnectionId();
|
|
long conId = request.getConnectionId();
|
|
- long seqCode = request.getSequenceCode();
|
|
|
|
- int temp = requestsInFlight.incrementAndGet();
|
|
|
|
|
|
+ long seqCode = request.getSequenceCode();
|
|
|
|
|
|
- if (DBUpdateInFlight.hasRoomFor(queries) || service.hasRoomFor(temp)) {
|
|
|
|
|
|
+ int temp = requestsInFlight.get();
|
|
|
|
+
|
|
|
|
+ if ( DBUpdateInFlight.hasRoomFor(queries) || service.hasRoomFor(1+temp) ) {
|
|
|
|
+
|
|
|
|
+ PgConnection connection = null;
|
|
|
|
+ PgPreparedQuery pQuery = null;
|
|
|
|
+ try {
|
|
|
|
+ connection = findConnection();
|
|
|
|
+ pQuery = prepQuery(connection, "SELECT * FROM world WHERE id=$1");
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ if (null!=connection) {
|
|
|
|
+ connection.close();
|
|
|
|
+ }
|
|
|
|
+ return false;
|
|
|
|
+ } catch (ExecutionException e) {
|
|
|
|
+ if (null!=connection) {
|
|
|
|
+ connection.close();
|
|
|
|
+ }
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+ final PgConnection con = connection;
|
|
|
|
+ final PgPreparedQuery pu = pQuery;
|
|
|
|
+
|
|
final AtomicInteger outstanding = new AtomicInteger(queries);
|
|
final AtomicInteger outstanding = new AtomicInteger(queries);
|
|
final List<ResultObject> toUpdate = new ArrayList<ResultObject>();
|
|
final List<ResultObject> toUpdate = new ArrayList<ResultObject>();
|
|
|
|
+ requestsInFlight.incrementAndGet();
|
|
|
|
|
|
int q = queries;
|
|
int q = queries;
|
|
while (--q >= 0) {
|
|
while (--q >= 0) {
|
|
@@ -75,69 +102,8 @@ public class ProcessUpdate {
|
|
worldObject.setGroupSize(queries);
|
|
worldObject.setGroupSize(queries);
|
|
|
|
|
|
worldObject.setId(randomValue());
|
|
worldObject.setId(randomValue());
|
|
-
|
|
|
|
- pm.pool().preparedQuery("SELECT * FROM world WHERE id=$1", Tuple.of(worldObject.getId()), r -> {
|
|
|
|
- if (r.succeeded()) {
|
|
|
|
-
|
|
|
|
- PgIterator resultSet = r.result().iterator();
|
|
|
|
- Tuple row = resultSet.next();
|
|
|
|
-
|
|
|
|
- assert(worldObject.getId()==row.getInteger(0));
|
|
|
|
-
|
|
|
|
- //read the existing random value and store it in the world object
|
|
|
|
- worldObject.setResult(row.getInteger(1));
|
|
|
|
- ///////////////////////////////////
|
|
|
|
- //the object can be used here with the old value
|
|
|
|
- ///////////////////////////////////
|
|
|
|
- //set the new random value in this object
|
|
|
|
- worldObject.setResult(randomValue());
|
|
|
|
- toUpdate.add(worldObject);
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- } else {
|
|
|
|
- System.out.println("unable to query");
|
|
|
|
- if (r.cause()!=null) {
|
|
|
|
- r.cause().printStackTrace();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- worldObject.setStatus(500);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (0 == outstanding.decrementAndGet()) {
|
|
|
|
- //call update for all the query updates...
|
|
|
|
-
|
|
|
|
- List<Tuple> args = new ArrayList<Tuple>();
|
|
|
|
- toUpdate.forEach(w-> {
|
|
|
|
- args.add(Tuple.of(w.getResult(), w.getId()));
|
|
|
|
- });
|
|
|
|
- Collections.sort(args, (a,b) -> {
|
|
|
|
- return Integer.compare( ((Tuple)a).getInteger(0),
|
|
|
|
- ((Tuple)b).getInteger(0));
|
|
|
|
-
|
|
|
|
- });
|
|
|
|
-
|
|
|
|
- System.out.println("call for update to "+args.size());
|
|
|
|
- pm.pool().preparedBatch("UPDATE world SET randomnumber=$1 WHERE id=$2",
|
|
|
|
- args, ar -> {
|
|
|
|
-
|
|
|
|
- int status;
|
|
|
|
- if (ar.succeeded()) {
|
|
|
|
- status = 200;
|
|
|
|
- } else {
|
|
|
|
- System.out.println("unable to update");
|
|
|
|
- if (ar.cause()!=null) {
|
|
|
|
- ar.cause().printStackTrace();
|
|
|
|
- }
|
|
|
|
- status = 500;
|
|
|
|
- }
|
|
|
|
- toUpdate.forEach(w->{
|
|
|
|
- w.setStatus(status);
|
|
|
|
- });
|
|
|
|
- System.out.println("finished update for "+toUpdate.size()+" status "+status);
|
|
|
|
-
|
|
|
|
- });
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
|
|
+
|
|
|
|
+ exeQuery(pQuery, con, pu, outstanding, toUpdate, worldObject);
|
|
|
|
|
|
DBUpdateInFlight.moveHeadForward(); //always move to ensure this can be read.
|
|
DBUpdateInFlight.moveHeadForward(); //always move to ensure this can be read.
|
|
|
|
|
|
@@ -145,11 +111,117 @@ public class ProcessUpdate {
|
|
|
|
|
|
return true;
|
|
return true;
|
|
} else {
|
|
} else {
|
|
- requestsInFlight.decrementAndGet();
|
|
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private void exeQuery(PgPreparedQuery pQuery, final PgConnection con, final PgPreparedQuery pu,
|
|
|
|
+ final AtomicInteger outstanding, final List<ResultObject> toUpdate, final ResultObject worldObject) {
|
|
|
|
+
|
|
|
|
+ pQuery.execute(
|
|
|
|
+ Tuple.of(worldObject.getId()), r -> {
|
|
|
|
+ if (r.succeeded()) {
|
|
|
|
+
|
|
|
|
+ PgIterator resultSet = r.result().iterator();
|
|
|
|
+ Tuple row = resultSet.next();
|
|
|
|
+
|
|
|
|
+ assert(worldObject.getId()==row.getInteger(0));
|
|
|
|
+
|
|
|
|
+ //read the existing random value and store it in the world object
|
|
|
|
+ worldObject.setResult(row.getInteger(1));
|
|
|
|
+ ///////////////////////////////////
|
|
|
|
+ //the object can be used here with the old value
|
|
|
|
+ ///////////////////////////////////
|
|
|
|
+ //set the new random value in this object
|
|
|
|
+ worldObject.setResult(randomValue());
|
|
|
|
+ toUpdate.add(worldObject);
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ } else {
|
|
|
|
+ //TODO: urgent, unable to call so we must back off and try again!!!!
|
|
|
|
+ exeQuery(pQuery, con, pu, outstanding, toUpdate, worldObject);
|
|
|
|
+ return;
|
|
|
|
+
|
|
|
|
+// System.out.println("unable to query");
|
|
|
|
+// if (r.cause()!=null) {
|
|
|
|
+// r.cause().printStackTrace();
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// worldObject.setStatus(500);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (0 == outstanding.decrementAndGet()) {
|
|
|
|
+ //call update for all the query updates...
|
|
|
|
+
|
|
|
|
+ List<Tuple> args = new ArrayList<Tuple>();
|
|
|
|
+ toUpdate.forEach(w-> {
|
|
|
|
+ args.add(Tuple.of(w.getResult(), w.getId()));
|
|
|
|
+ });
|
|
|
|
+ Collections.sort(args, (a,b) -> {
|
|
|
|
+ return Integer.compare( ((Tuple)a).getInteger(0),
|
|
|
|
+ ((Tuple)b).getInteger(0));
|
|
|
|
+
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ execUpdate(con, pu, toUpdate, args);
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void execUpdate(final PgConnection con, final PgPreparedQuery pu, final List<ResultObject> toUpdate,
|
|
|
|
+ List<Tuple> args) {
|
|
|
|
+ con.preparedBatch("UPDATE world SET randomnumber=$1 WHERE id=$2",
|
|
|
|
+ args, ar -> {
|
|
|
|
+
|
|
|
|
+ int status;
|
|
|
|
+ if (ar.succeeded()) {
|
|
|
|
+ status = 200;
|
|
|
|
+ } else {
|
|
|
|
+ execUpdate(con, pu, toUpdate, args);
|
|
|
|
+ return;
|
|
|
|
+// System.out.println("unable to update");
|
|
|
|
+// if (ar.cause()!=null) {
|
|
|
|
+// ar.cause().printStackTrace();
|
|
|
|
+// }
|
|
|
|
+// status = 500;
|
|
|
|
+ }
|
|
|
|
+ toUpdate.forEach(w->{
|
|
|
|
+ w.setStatus(status);
|
|
|
|
+ });
|
|
|
|
+ pu.close();
|
|
|
|
+ con.close();
|
|
|
|
+
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private PgConnection findConnection() throws InterruptedException, ExecutionException {
|
|
|
|
+ PgConnection connection;
|
|
|
|
+ CompletableFuture<PgConnection> conFu = new CompletableFuture<PgConnection>();
|
|
|
|
+ pm.pool().getConnection(h-> {
|
|
|
|
+ if (h.succeeded()) {
|
|
|
|
+ conFu.complete(h.result());
|
|
|
|
+ } else {
|
|
|
|
+ conFu.completeExceptionally(h.cause());
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ return conFu.get();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ private PgPreparedQuery prepQuery(PgConnection con, String sql) throws InterruptedException, ExecutionException {
|
|
|
|
+
|
|
|
|
+ CompletableFuture<PgPreparedQuery> prepFu = new CompletableFuture<PgPreparedQuery>();
|
|
|
|
+ con.prepare(sql, h->{
|
|
|
|
+ if (h.succeeded()) {
|
|
|
|
+ prepFu.complete(h.result());
|
|
|
|
+ } else {
|
|
|
|
+ prepFu.completeExceptionally(h.cause());
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ return prepFu.get();
|
|
|
|
+ }
|
|
|
|
+
|
|
private void consumeResultObjectDBUpdate(final ResultObject t) {
|
|
private void consumeResultObjectDBUpdate(final ResultObject t) {
|
|
|
|
|
|
//collect all the objects
|
|
//collect all the objects
|