|
@@ -1,6 +1,7 @@
|
|
package com.ociweb.gl.benchmark;
|
|
package com.ociweb.gl.benchmark;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
|
|
+import java.util.Collections;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
@@ -26,8 +27,12 @@ public class ProcessUpdate {
|
|
this.DBUpdateInFlight = new ObjectPipe<ResultObject>(pipelineBits, ResultObject.class, ResultObject::new);
|
|
this.DBUpdateInFlight = new ObjectPipe<ResultObject>(pipelineBits, ResultObject.class, ResultObject::new);
|
|
this.service = service;
|
|
this.service = service;
|
|
this.pm = pm;
|
|
this.pm = pm;
|
|
|
|
+
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private int randomValue() {
|
|
|
|
+ return 1+localRandom.nextInt(10000);
|
|
|
|
+ }
|
|
|
|
|
|
public void tickEvent() {
|
|
public void tickEvent() {
|
|
|
|
|
|
@@ -40,9 +45,7 @@ public class ProcessUpdate {
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
- private int randomValue() {
|
|
|
|
- return 1+localRandom.nextInt(10000);
|
|
|
|
- }
|
|
|
|
|
|
+
|
|
|
|
|
|
public boolean updateRestRequest(HTTPRequestReader request) {
|
|
public boolean updateRestRequest(HTTPRequestReader request) {
|
|
int queries;
|
|
int queries;
|
|
@@ -57,6 +60,9 @@ public class ProcessUpdate {
|
|
|
|
|
|
if (DBUpdateInFlight.hasRoomFor(queries) || service.hasRoomFor(temp)) {
|
|
if (DBUpdateInFlight.hasRoomFor(queries) || service.hasRoomFor(temp)) {
|
|
|
|
|
|
|
|
+ final AtomicInteger outstanding = new AtomicInteger(queries);
|
|
|
|
+ final List<ResultObject> toUpdate = new ArrayList<ResultObject>();
|
|
|
|
+
|
|
int q = queries;
|
|
int q = queries;
|
|
while (--q >= 0) {
|
|
while (--q >= 0) {
|
|
|
|
|
|
@@ -85,20 +91,8 @@ public class ProcessUpdate {
|
|
///////////////////////////////////
|
|
///////////////////////////////////
|
|
//set the new random value in this object
|
|
//set the new random value in this object
|
|
worldObject.setResult(randomValue());
|
|
worldObject.setResult(randomValue());
|
|
|
|
+ toUpdate.add(worldObject);
|
|
|
|
|
|
- pm.pool().preparedQuery("UPDATE world SET randomnumber=$1 WHERE id=$2",
|
|
|
|
- Tuple.of(worldObject.getResult(), worldObject.getId()), ar -> {
|
|
|
|
- if (ar.succeeded()) {
|
|
|
|
- worldObject.setStatus(200);
|
|
|
|
- } else {
|
|
|
|
- System.out.println("unable to update");
|
|
|
|
- if (ar.cause()!=null) {
|
|
|
|
- ar.cause().printStackTrace();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- worldObject.setStatus(500);
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
|
|
|
|
} else {
|
|
} else {
|
|
System.out.println("unable to query");
|
|
System.out.println("unable to query");
|
|
@@ -109,13 +103,40 @@ public class ProcessUpdate {
|
|
worldObject.setStatus(500);
|
|
worldObject.setStatus(500);
|
|
}
|
|
}
|
|
|
|
|
|
- //on all N responses.....
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
|
|
+ if (0 == outstanding.decrementAndGet()) {
|
|
|
|
+ //call update for all the query updates...
|
|
|
|
+
|
|
|
|
+ List<Tuple> args = new ArrayList<Tuple>();
|
|
|
|
+ toUpdate.forEach(w-> {
|
|
|
|
+ args.add(Tuple.of(w.getResult(), w.getId()));
|
|
|
|
+ });
|
|
|
|
+ Collections.sort(args, (a,b) -> {
|
|
|
|
+ return Integer.compare( ((Tuple)a).getInteger(0),
|
|
|
|
+ ((Tuple)b).getInteger(0));
|
|
|
|
+
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ System.out.println("call for update to "+args.size());
|
|
|
|
+ pm.pool().preparedBatch("UPDATE world SET randomnumber=$1 WHERE id=$2",
|
|
|
|
+ args, ar -> {
|
|
|
|
+
|
|
|
|
+ int status;
|
|
|
|
+ if (ar.succeeded()) {
|
|
|
|
+ status = 200;
|
|
|
|
+ } else {
|
|
|
|
+ System.out.println("unable to update");
|
|
|
|
+ if (ar.cause()!=null) {
|
|
|
|
+ ar.cause().printStackTrace();
|
|
|
|
+ }
|
|
|
|
+ status = 500;
|
|
|
|
+ }
|
|
|
|
+ toUpdate.forEach(w->{
|
|
|
|
+ w.setStatus(status);
|
|
|
|
+ });
|
|
|
|
+ System.out.println("finished update for "+toUpdate.size()+" status "+status);
|
|
|
|
+
|
|
|
|
+ });
|
|
|
|
+ }
|
|
});
|
|
});
|
|
|
|
|
|
DBUpdateInFlight.moveHeadForward(); //always move to ensure this can be read.
|
|
DBUpdateInFlight.moveHeadForward(); //always move to ensure this can be read.
|