Explorar o código

GL, update logic modifed to match driver documentation practices (#5101)

* [ci fw-only Java/greenlightning]

swap versions

* [ci fw-only Java/greenlightning]

go back to simple file

* [ci fw-only Java/greenlightning]

update version to add loging and track issue

* [ci fw-only Java/greenlightning]

use old update design

bump version number

* [ci fw-only Java/greenlightning]

reduce poll rate for greater volume

* [ci fw-only Java/greenlightning]

update comment

* [ci fw-only Java/greenlightning]

update for larger pipes

* [ci fw-only Java/greenlightning]

update memory limit

update to next version

better request read

* [ci fw-only Java/greenlightning]

next version and buffer size adjustments

cleaned up object creation for JSON request

* [ci fw-only Java/greenlightning]

update to next version

cache int values for epoll

* update launcher

* reduce to 28G

* reduce memory

* reduce memory usage 24G

* set new size

* back up to 28G

* reduce newSize

* update memory

* reduce size

* remove fixed pipe size, test server can not support this.

* bump up new sizes

* [ci fw-only Java/greenlightning]

update to next version with more socket readers

* [ci fw-only Java/greenlightning]

version upgrade

* remove comment

* [ci fw-only Java/greenlightning]

Update version

Clear input pipes

* [ci fw-only Java/greenlightning]

larger connection count for update test

* [ci fw-only Java/greenlightning]

DB driver changed management so we switched to new one with support

* [ci fw-only Java/greenlightning]

fix upate to match the document streaming appraoch from driver
Nathan Tippy %!s(int64=5) %!d(string=hai) anos
pai
achega
c4ce36f350

+ 134 - 59
frameworks/Java/greenlightning/src/main/java/com/javanut/gl/benchmark/ProcessUpdate.java

@@ -14,6 +14,7 @@ import com.javanut.pronghorn.pipe.ObjectPipe;
 
 import io.vertx.core.AsyncResult;
 import io.vertx.pgclient.PgPool;
+import io.vertx.sqlclient.PreparedQuery;
 import io.vertx.sqlclient.Row;
 import io.vertx.sqlclient.RowIterator;
 import io.vertx.sqlclient.RowSet;
@@ -29,12 +30,17 @@ public class ProcessUpdate {
 	private final HTTPResponseService service;
 	private final transient PoolManager pm;
 	private final AtomicInteger requestsInFlight = new AtomicInteger();
+
+	private PreparedQuery selectQuery;
+	private boolean building = false;
+	private PreparedQuery updateQuery;
 		
 	public ProcessUpdate(int pipelineBits, HTTPResponseService service, PoolManager pm) {
 		this.DBUpdateInFlight = new ObjectPipe<ResultObject>(pipelineBits, ResultObject.class,	ResultObject::new);
 		this.service = service;
 		this.pm = pm;
-		
+		selectableQuery();
+		updateQuery();
 	}
 	
 	
@@ -65,47 +71,106 @@ public class ProcessUpdate {
 		int temp = requestsInFlight.incrementAndGet();
 		
 		if (DBUpdateInFlight.hasRoomFor(queries) && service.hasRoomFor(temp) ) {		
+						
+			processConnection(queries, conId, seqCode);
 	
-		   // final AtomicBoolean ok = new AtomicBoolean();
-		    
-			//pm.pool().getConnection(result -> {
-				
-				//if (result.succeeded()) {
-				
-					//SqlConnection connection = result.result();					
-					processConnection(queries, conId, seqCode);
-					//connection.close();
-					
-				//	ok.set(true);
-					
-				//} else {
-				//	requestsInFlight.decrementAndGet();			
-				//	ok.set(false);
-				//}
-				
-			//});
-				
-			//return ok.get();
 			return true;
 		} else {
 			requestsInFlight.decrementAndGet();
 			return false;
 		}
 	}
+	
+	
+	private PreparedQuery selectableQuery() {
+		
+		if (null!=selectQuery || building) {
+			return selectQuery;		
+		} else {
+			building = true;
+			pm.pool().getConnection(h -> {
+				
+				if (h.succeeded()) {
+					SqlConnection connection = h.result();
+					
+					connection.prepare("SELECT * FROM world WHERE id=$1", ph -> {
+						if (ph.succeeded()) {							
+							selectQuery = ph.result();														
+							building = false;
+							if (updateQuery==null) {
+								updateQuery();
+							}
+							
+						} else {							
+							ph.cause().printStackTrace();
+						}
+					});
+					
+					connection.close();
+				} else {
+					h.cause().printStackTrace();
+				}
+			});
+			return null;
+		}
+	}
 
+	private PreparedQuery updateQuery() {
+		
+		if (null!=updateQuery || building) {
+			return updateQuery;		
+		} else {
+			building = true;
+			pm.pool().getConnection(h -> {
+				
+				if (h.succeeded()) {
+					SqlConnection connection = h.result();
+					
+					connection.prepare("UPDATE world SET randomnumber=$1 WHERE id=$2", ph -> {
+						if (ph.succeeded()) {							
+							updateQuery = ph.result();	
+							building = false;
+							if (selectQuery == null) {
+								selectableQuery();
+							}
+						} 
+					});
+					
+					connection.close();
+				} 
+				
+				
+			});
+			return null;
+		}
+	}	
+	
 
 	private void processConnection(int queries, long conId, long seqCode) {
 		
+		//only process after we have the prepared statements built.
+		PreparedQuery query = selectableQuery();
+		if (query==null) {
+			return;
+		}
+		
+		PreparedQuery update= updateQuery();
+		if (update==null) {
+			return;
+		}
+		
 		List<ResultObject> objs = new ArrayList<ResultObject>(queries);
 		int q = queries;
 		while (--q >= 0) {
-				processSingleUpdate(queries, conId, seqCode, objs);
+				processSingleUpdate(queries, conId, seqCode, objs, query, update);
 		
 		}
 	}
 
 
-	private void processSingleUpdate(int queries, long conId, long seqCode, List<ResultObject> objs) {
+	private void processSingleUpdate(int queries, long conId, long seqCode,
+			                         List<ResultObject> objs, 
+			                         PreparedQuery query, PreparedQuery update) {
 		//testing one per query 
 
 		final ResultObject worldObject = DBUpdateInFlight.headObject();
@@ -119,44 +184,54 @@ public class ProcessUpdate {
 		worldObject.setId(randomValue());
 		objs.add(worldObject);					
 		
-		pm.pool().preparedQuery("SELECT * FROM world WHERE id=$1", Tuple.of(worldObject.getId()), r -> {
-				if (r.succeeded()) {
-														
-					RowIterator<Row> resultSet = r.result().iterator();
-			        Tuple row = resultSet.next();			        
-			        
-			        assert(worldObject.getId()==row.getInteger(0));
-			        
-			        //read the existing random value and store it in the world object
-			        worldObject.setResult(row.getInteger(1));
-			        ///////////////////////////////////
-			        //the object can be used here with the old value
-			        ///////////////////////////////////
-			        //set the new random value in this object
-			        worldObject.setResult(randomValue());							        
-			        
-			        
-			        pm.pool().preparedQuery("UPDATE world SET randomnumber=$1 WHERE id=$2", 							        		
-		        			Tuple.of(worldObject.getResult(), worldObject.getId()), ar -> {							        	
-									setStatus(worldObject, ar);																												
-		        			}
-		        			);					        
-			        
-				} else {	
-				
-					System.out.println("unable to query");
-					if (r.cause()!=null) {
-						r.cause().printStackTrace();
-					}
+		try {
+			query.execute(Tuple.of(worldObject.getId()), r -> {
+					if (r.succeeded()) {
+															
+						RowIterator<Row> resultSet = r.result().iterator();
+				        Tuple row = resultSet.next();			        
+				        
+				        assert(worldObject.getId()==row.getInteger(0));
+				        
+				        //read the existing random value and store it in the world object
+				        worldObject.setResult(row.getInteger(1));
+				        ///////////////////////////////////
+				        //the object can be used here with the old value
+				        ///////////////////////////////////
+				        //set the new random value in this object
+				        worldObject.setResult(randomValue());							        
+				        
+				        try {
+					        update.execute( 							        		
+					        			Tuple.of(worldObject.getResult(), worldObject.getId()), ar -> {							        	
+												setStatus(worldObject, ar);																												
+					        			}
+				        			);					        
+				        } catch (Throwable t) {
+				        	t.printStackTrace();
+				        	this.updateQuery = null; //TODO: need to try again.
+				        }
+				        
+					} else {	
 					
-					worldObject.setStatus(500);
-				}		
-				
-				//on all N responses.....
-												
-			});	
+						System.out.println("unable to query");
+						if (r.cause()!=null) {
+							r.cause().printStackTrace();
+						}
+						
+						worldObject.setStatus(500);
+					}		
 					
-		DBUpdateInFlight.moveHeadForward(); //always move to ensure this can be read.
+					//on all N responses.....
+													
+				});	
+						
+			DBUpdateInFlight.moveHeadForward(); //always move to ensure this can be read.
+		} catch (Throwable t) {
+			t.printStackTrace();
+			this.selectQuery = null;
+			//TODO: rollabck??
+		}
 	}