|
@@ -24,7 +24,7 @@ import io.vertx.sqlclient.Tuple;
|
|
|
|
|
|
public class ProcessUpdate {
|
|
|
|
|
|
- private transient ObjectPipe<ResultObject> DBUpdateInFlight;
|
|
|
+ private transient ObjectPipe<ResultObject> dbUpdateInFlight;
|
|
|
private final transient List<ResultObject> collectorDBUpdate = new ArrayList<ResultObject>();
|
|
|
private final transient ThreadLocalRandom localRandom = ThreadLocalRandom.current();
|
|
|
private final HTTPResponseService service;
|
|
@@ -36,7 +36,7 @@ public class ProcessUpdate {
|
|
|
private PreparedQuery updateQuery;
|
|
|
|
|
|
public ProcessUpdate(int pipelineBits, HTTPResponseService service, PoolManager pm) {
|
|
|
- this.DBUpdateInFlight = new ObjectPipe<ResultObject>(pipelineBits, ResultObject.class, ResultObject::new);
|
|
|
+ this.dbUpdateInFlight = new ObjectPipe<ResultObject>(pipelineBits, ResultObject.class, ResultObject::new);
|
|
|
this.service = service;
|
|
|
this.pm = pm;
|
|
|
selectableQuery();
|
|
@@ -46,10 +46,10 @@ public class ProcessUpdate {
|
|
|
|
|
|
public void tickEvent() {
|
|
|
|
|
|
- ResultObject temp = DBUpdateInFlight.tailObject();
|
|
|
+ ResultObject temp = dbUpdateInFlight.tailObject();
|
|
|
while (null!=temp && temp.getStatus()>=0) {
|
|
|
consumeResultObjectDBUpdate(temp);
|
|
|
- temp = DBUpdateInFlight.tailObject();
|
|
|
+ temp = dbUpdateInFlight.tailObject();
|
|
|
}
|
|
|
|
|
|
}
|
|
@@ -70,7 +70,7 @@ public class ProcessUpdate {
|
|
|
long seqCode = request.getSequenceCode();
|
|
|
int temp = requestsInFlight.incrementAndGet();
|
|
|
|
|
|
- if (DBUpdateInFlight.hasRoomFor(queries) && service.hasRoomFor(temp) ) {
|
|
|
+ if (dbUpdateInFlight.hasRoomFor(queries) && service.hasRoomFor(temp) ) {
|
|
|
|
|
|
processConnection(queries, conId, seqCode);
|
|
|
|
|
@@ -173,7 +173,7 @@ public class ProcessUpdate {
|
|
|
PreparedQuery query, PreparedQuery update) {
|
|
|
//testing one per query
|
|
|
|
|
|
- final ResultObject worldObject = DBUpdateInFlight.headObject();
|
|
|
+ final ResultObject worldObject = dbUpdateInFlight.headObject();
|
|
|
assert(null!=worldObject);
|
|
|
|
|
|
worldObject.setConnectionId(conId);
|
|
@@ -226,7 +226,7 @@ public class ProcessUpdate {
|
|
|
|
|
|
});
|
|
|
|
|
|
- DBUpdateInFlight.moveHeadForward(); //always move to ensure this can be read.
|
|
|
+ dbUpdateInFlight.moveHeadForward(); //always move to ensure this can be read.
|
|
|
} catch (Throwable t) {
|
|
|
t.printStackTrace();
|
|
|
this.selectQuery = null;
|
|
@@ -282,7 +282,7 @@ public class ProcessUpdate {
|
|
|
|
|
|
//collect all the objects
|
|
|
collectorDBUpdate.add(t);
|
|
|
- DBUpdateInFlight.moveTailForward();//only move forward when it is consumed.
|
|
|
+ dbUpdateInFlight.moveTailForward();//only move forward when it is consumed.
|
|
|
if (collectorDBUpdate.size() == t.getGroupSize()) {
|
|
|
//now ready to send, we have all the data
|
|
|
publishMultiResponseDBUpdate(t.getConnectionId(), t.getSequenceId());
|
|
@@ -301,7 +301,7 @@ public class ProcessUpdate {
|
|
|
collectorDBUpdate.get(c).setStatus(-1);
|
|
|
}
|
|
|
collectorDBUpdate.clear();
|
|
|
- DBUpdateInFlight.publishTailPosition();
|
|
|
+ dbUpdateInFlight.publishTailPosition();
|
|
|
});
|
|
|
assert(result) : "internal error, we should not pick up more work than we can send";
|
|
|
requestsInFlight.decrementAndGet();
|