|
@@ -22,14 +22,12 @@ public class ProcessUpdate {
|
|
|
private final HTTPResponseService service;
|
|
|
private final transient PoolManager pm;
|
|
|
private final AtomicInteger requestsInFlight = new AtomicInteger();
|
|
|
-
|
|
|
+
|
|
|
public ProcessUpdate(int pipelineBits, HTTPResponseService service, PoolManager pm) {
|
|
|
this.DBUpdateInFlight = new ObjectPipe<ResultObject>(pipelineBits, ResultObject.class, ResultObject::new);
|
|
|
this.service = service;
|
|
|
this.pm = pm;
|
|
|
|
|
|
-
|
|
|
-
|
|
|
}
|
|
|
|
|
|
|
|
@@ -59,7 +57,7 @@ public class ProcessUpdate {
|
|
|
long seqCode = request.getSequenceCode();
|
|
|
int temp = requestsInFlight.incrementAndGet();
|
|
|
|
|
|
- if (DBUpdateInFlight.hasRoomFor(queries) || service.hasRoomFor(temp)) {
|
|
|
+ if ((pause.get()==0) && DBUpdateInFlight.hasRoomFor(queries) && service.hasRoomFor(temp) ) {
|
|
|
|
|
|
List<Tuple> args = new ArrayList<Tuple>(queries);
|
|
|
List<ResultObject> objs = new ArrayList<ResultObject>(queries);
|
|
@@ -112,11 +110,12 @@ public class ProcessUpdate {
|
|
|
|
|
|
});
|
|
|
|
|
|
- execUpdate(objs,args);
|
|
|
+ execUpdate(objs, args, 1);
|
|
|
|
|
|
}
|
|
|
|
|
|
} else {
|
|
|
+
|
|
|
System.out.println("unable to query");
|
|
|
if (r.cause()!=null) {
|
|
|
r.cause().printStackTrace();
|
|
@@ -126,12 +125,7 @@ public class ProcessUpdate {
|
|
|
}
|
|
|
|
|
|
//on all N responses.....
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
});
|
|
|
|
|
|
DBUpdateInFlight.moveHeadForward(); //always move to ensure this can be read.
|
|
@@ -145,17 +139,22 @@ public class ProcessUpdate {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private final AtomicInteger pause = new AtomicInteger(0);
|
|
|
+
|
|
|
|
|
|
- private void execUpdate(List<ResultObject> toUpdate, List<Tuple> args) {
|
|
|
+ private void execUpdate(List<ResultObject> toUpdate, List<Tuple> args, int i) {
|
|
|
|
|
|
pm.pool().preparedBatch("UPDATE world SET randomnumber=$1 WHERE id=$2",
|
|
|
args, ar -> {
|
|
|
|
|
|
+ pause.addAndGet(i);
|
|
|
int status;
|
|
|
if (ar.succeeded()) {
|
|
|
- status = 200;
|
|
|
+ status = 200;
|
|
|
+ pause.decrementAndGet();
|
|
|
} else {
|
|
|
- execUpdate(toUpdate, args);
|
|
|
+
|
|
|
+ execUpdate(toUpdate, args, 0);
|
|
|
return;
|
|
|
// System.out.println("unable to update");
|
|
|
// if (ar.cause()!=null) {
|