|
@@ -3,6 +3,7 @@ package com.ociweb.gl.benchmark;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
import com.ociweb.gl.api.HTTPRequestReader;
|
|
|
import com.ociweb.gl.api.HTTPResponseService;
|
|
@@ -15,11 +16,11 @@ import io.reactiverse.pgclient.Tuple;
|
|
|
public class ProcessUpdate {
|
|
|
|
|
|
private transient ObjectPipe<ResultObject> DBUpdateInFlight;
|
|
|
- private boolean collectionPendingDBUpdate = false;
|
|
|
private final transient List<ResultObject> collectorDBUpdate = new ArrayList<ResultObject>();
|
|
|
private final transient ThreadLocalRandom localRandom = ThreadLocalRandom.current();
|
|
|
private final HTTPResponseService service;
|
|
|
private final transient PoolManager pm;
|
|
|
+ private final AtomicInteger requestsInFlight = new AtomicInteger();
|
|
|
|
|
|
public ProcessUpdate(int pipelineBits, HTTPResponseService service, PoolManager pm) {
|
|
|
this.DBUpdateInFlight = new ObjectPipe<ResultObject>(pipelineBits, ResultObject.class, ResultObject::new);
|
|
@@ -30,17 +31,12 @@ public class ProcessUpdate {
|
|
|
|
|
|
public void tickEvent() {
|
|
|
|
|
|
- {
|
|
|
ResultObject temp = DBUpdateInFlight.tailObject();
|
|
|
- while (isReadyDBUpdate(temp)) {
|
|
|
- if (consumeResultObjectDBUpdate(temp)) {
|
|
|
- temp = DBUpdateInFlight.tailObject();
|
|
|
- } else {
|
|
|
- break;
|
|
|
- }
|
|
|
+ while (null!=temp && temp.getStatus()>=0) {
|
|
|
+ consumeResultObjectDBUpdate(temp);
|
|
|
+ temp = DBUpdateInFlight.tailObject();
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
|
|
|
|
|
@@ -57,9 +53,10 @@ public class ProcessUpdate {
|
|
|
}
|
|
|
long conId = request.getConnectionId();
|
|
|
long seqCode = request.getSequenceCode();
|
|
|
-
|
|
|
- if (DBUpdateInFlight.hasRoomFor(queries)) {
|
|
|
-
|
|
|
+ int temp = requestsInFlight.incrementAndGet();
|
|
|
+
|
|
|
+ if (DBUpdateInFlight.hasRoomFor(queries) || service.hasRoomFor(temp)) {
|
|
|
+
|
|
|
int q = queries;
|
|
|
while (--q >= 0) {
|
|
|
|
|
@@ -83,17 +80,16 @@ public class ProcessUpdate {
|
|
|
|
|
|
//read the existing random value and store it in the world object
|
|
|
worldObject.setResult(row.getInteger(1));
|
|
|
-
|
|
|
+ ///////////////////////////////////
|
|
|
+ //the object can be used here with the old value
|
|
|
///////////////////////////////////
|
|
|
//set the new random value in this object
|
|
|
- worldObject.setResult(randomValue());
|
|
|
+ worldObject.setResult(randomValue());
|
|
|
|
|
|
pm.pool().preparedQuery("UPDATE world SET randomnumber=$1 WHERE id=$2",
|
|
|
Tuple.of(worldObject.getResult(), worldObject.getId()), ar -> {
|
|
|
if (ar.succeeded()) {
|
|
|
-
|
|
|
- worldObject.setStatus(200);
|
|
|
-
|
|
|
+ worldObject.setStatus(200);
|
|
|
} else {
|
|
|
System.out.println("unable to update");
|
|
|
if (ar.cause()!=null) {
|
|
@@ -101,9 +97,9 @@ public class ProcessUpdate {
|
|
|
}
|
|
|
|
|
|
worldObject.setStatus(500);
|
|
|
- }
|
|
|
-
|
|
|
+ }
|
|
|
});
|
|
|
+
|
|
|
} else {
|
|
|
System.out.println("unable to query");
|
|
|
if (r.cause()!=null) {
|
|
@@ -113,6 +109,12 @@ public class ProcessUpdate {
|
|
|
worldObject.setStatus(500);
|
|
|
}
|
|
|
|
|
|
+ //on all N responses.....
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
|
|
|
});
|
|
|
|
|
@@ -122,38 +124,23 @@ public class ProcessUpdate {
|
|
|
|
|
|
return true;
|
|
|
} else {
|
|
|
+ requestsInFlight.decrementAndGet();
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private boolean isReadyDBUpdate(ResultObject temp) {
|
|
|
-
|
|
|
- if (collectionPendingDBUpdate) {
|
|
|
- //now ready to send, we have all the data
|
|
|
- if (!publishMultiResponseDBUpdate(collectorDBUpdate.get(0).getConnectionId(), collectorDBUpdate.get(0).getSequenceId() )) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- return null!=temp && temp.getStatus()>=0;
|
|
|
- }
|
|
|
+ private void consumeResultObjectDBUpdate(final ResultObject t) {
|
|
|
|
|
|
- private boolean consumeResultObjectDBUpdate(final ResultObject t) {
|
|
|
- boolean ok;
|
|
|
//collect all the objects
|
|
|
collectorDBUpdate.add(t);
|
|
|
DBUpdateInFlight.moveTailForward();//only move forward when it is consumed.
|
|
|
if (collectorDBUpdate.size() == t.getGroupSize()) {
|
|
|
//now ready to send, we have all the data
|
|
|
- ok =publishMultiResponseDBUpdate(t.getConnectionId(), t.getSequenceId());
|
|
|
- } else {
|
|
|
- ok = true;//added to list
|
|
|
- }
|
|
|
-
|
|
|
- return ok;
|
|
|
+ publishMultiResponseDBUpdate(t.getConnectionId(), t.getSequenceId());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- private boolean publishMultiResponseDBUpdate(long conId, long seqCode) {
|
|
|
+ private void publishMultiResponseDBUpdate(long conId, long seqCode) {
|
|
|
boolean result = service.publishHTTPResponse(conId, seqCode, 200,
|
|
|
HTTPContentTypeDefaults.JSON,
|
|
|
w-> {
|
|
@@ -167,8 +154,8 @@ public class ProcessUpdate {
|
|
|
collectorDBUpdate.clear();
|
|
|
DBUpdateInFlight.publishTailPosition();
|
|
|
});
|
|
|
- collectionPendingDBUpdate = !result;
|
|
|
- return result;
|
|
|
+ assert(result) : "internal error, we should not pick up more work than we can send";
|
|
|
+ requestsInFlight.decrementAndGet();
|
|
|
}
|
|
|
|
|
|
|