Browse Source

GL, update version. Fix memory allocation. (#5071)

* [ci fw-only Java/greenlightning]

swap versions

* [ci fw-only Java/greenlightning]

go back to simple file

* [ci fw-only Java/greenlightning]

update version to add loging and track issue

* [ci fw-only Java/greenlightning]

use old update design

bump version number

* [ci fw-only Java/greenlightning]

reduce poll rate for greater volume

* [ci fw-only Java/greenlightning]

update comment

* [ci fw-only Java/greenlightning]

update for larger pipes

* [ci fw-only Java/greenlightning]

update memory limit

update to next version

better request read

* [ci fw-only Java/greenlightning]

next version and buffer size adjustments

cleaned up object creation for JSON request

* [ci fw-only Java/greenlightning]

update to next version

cache int values for epoll

* update launcher

* reduce to 28G

* reduce memory

* reduce memory usage 24G

* set new size

* back up to 28G

* reduce newSize

* update memory

* reduce size

* remove fixed pipe size, test server can not support this.

* bump up new sizes

* [ci fw-only Java/greenlightning]

update to next version with more socket readers

* [ci fw-only Java/greenlightning]

version upgrade

* remove comment

* [ci fw-only Java/greenlightning]

Update version

Clear input pipes
Nathan Tippy 6 years ago
parent
commit
2f47a77977

+ 1 - 1
frameworks/Java/greenlightning/pom.xml

@@ -4,7 +4,7 @@
 
 	<groupId>com.javanut.gl.benchmark</groupId>
 	<artifactId>benchmark-test</artifactId>
-	<version>1.1.11</version> 
+	<version>1.1.12</version> 
 
 	<properties>
 		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

+ 1 - 3
frameworks/Java/greenlightning/src/main/java/com/javanut/gl/benchmark/DBRest.java

@@ -20,9 +20,7 @@ public class DBRest implements RestMethodListener, PubSubMethodListener, TickLis
 			      int maxResponseCount, int maxResponseSize) {
 		
 		pm = new PoolManager(options);
-				
-		maxResponseCount = Math.max(maxResponseCount, ((1<<pipelineBits)/20));//match response count to expected db calls
-		
+			
 		HTTPResponseService service = runtime.newCommandChannel().newHTTPResponseService(
 				                maxResponseCount, 
 				                maxResponseSize);

+ 5 - 17
frameworks/Java/greenlightning/src/main/java/com/javanut/gl/benchmark/FrameworkTest.java

@@ -48,7 +48,7 @@ public class FrameworkTest implements GreenApp {
 	public static String connectionPassword = "postgres";
 	
 	//TODO: add utility to compute this based on need.
-	static final int c = 148;//592; // to reach 16K simultaneous calls
+	static final int c = 148;//293;//592; // to reach 16K simultaneous calls
 
 	private final long defaultRate = Long.parseLong(System.getProperty("xx.rate", "180000")); //2.5K cycles per second
 	                                                                                          // at 512 requests is 1.28M/sec
@@ -57,11 +57,6 @@ public class FrameworkTest implements GreenApp {
 	static {
 		System.setProperty("java.lang.Integer.IntegerCache.high", ""+Integer.MAX_VALUE);
 
-		//TODO: test with normal polll and a very fast reader, vs epoll and slower reader.
-		
-	//	System.setProperty("java.nio.channels.spi.SelectorProvider","sun.nio.ch.PollSelectorProvider");
-		//System.setProperty("java.nio.channels.spi.SelectorProvider","com.javanut.gl.CustomEPollSelectorProvider");//
-		
 		ServerSocketWriterStage.BASE_ADJUST = Float.parseFloat(System.getProperty("xx.ratio", "1"));
 		ServerSocketWriterStage.HARD_LIMIT_NS = Long.parseLong(System.getProperty("xx.limitns", "180000"));		
 	}
@@ -75,8 +70,7 @@ public class FrameworkTest implements GreenApp {
     	//this server works best with  -XX:+UseNUMA    	
     	this(System.getProperty("host","0.0.0.0"), 
     		 Integer.parseInt(System.getProperty("port","8080")),    	//default port for test 
-    		 c,         //pipes per track    			 
-    		 1<<14,     // default total size of network buffer used by blocks  
+    		 c,         //pipes per track 
     		 Integer.parseInt(System.getProperty("telemetry.port", "-1")),
     		 "tfb-database", // jdbc:postgresql://tfb-database:5432/hello_world
     		 "hello_world",
@@ -94,8 +88,6 @@ public class FrameworkTest implements GreenApp {
         
     public FrameworkTest(String host, int port, 
     		             int concurrentWritesPerChannel, 
-    		            
-    		             int minMemoryOfInputPipes,
     		             int telemetryPort,
     		             String dbHost,
     		             String dbName,
@@ -112,7 +104,6 @@ public class FrameworkTest implements GreenApp {
     	this.host = host;
     	this.concurrentWritesPerChannel = concurrentWritesPerChannel;
 
-    	this.minMemoryOfInputPipes = minMemoryOfInputPipes;
     	this.telemetryPort = telemetryPort;
     	this.pipelineBits = 15;//max concurrent in flight database requests 1<<pipelineBits
     	            
@@ -125,7 +116,8 @@ public class FrameworkTest implements GreenApp {
     	this.maxQueueOut = 8*20;   	
     	this.maxConnectionBits = 15;//16K connections, for test plus overhead MUST be 32K
     	
-    	this.maxRequestSize = 1<<13;
+    	//do not make much larger than what is required to hold 16 in flight requests
+    	this.maxRequestSize = 1<<11;//between ServerSocketBulkReader and ServerSocketBulkRouter
     	    	
     	if (!"127.0.0.1".equals(System.getProperty("host",null))) { 
     		    		
@@ -193,14 +185,10 @@ public class FrameworkTest implements GreenApp {
     			 //TODO: neeed to allow for multiple writes one pipe! big dif.
     			// .setConcurrentChannelsPerEncryptUnit(Math.max(1,concurrentWritesPerChannel/2))  //8K    
     			 .setConcurrentChannelsPerEncryptUnit(concurrentWritesPerChannel/25)  ///80) ///16) // /8)//4)
-    			 //TODO: we need smaller count of connections but MORE writers.
-    			 
-    			// .disableEPoll() //provides advantage in JSON test.... 
- 						 
+
     			 .setMaxRequestSize(maxRequestSize)
     			 .setMaxQueueIn(c*16)
     	
-    			 .setMinimumInputPipeMemory(minMemoryOfInputPipes)
     			 .setMaxQueueOut(maxQueueOut)
     			 .setMaxResponseSize(dbCallMaxResponseSize) //big enough for large mult db response
     	         .useInsecureServer(); //turn off TLS

+ 1 - 0
frameworks/Java/greenlightning/src/main/java/com/javanut/gl/benchmark/GreenLightning.java

@@ -10,6 +10,7 @@ public class GreenLightning {
 		
 		GraphManager.showThreadIdOnTelemetry = true;		
 		GraphManager.showScheduledRateOnTelemetry = true;
+		GraphManager.showMessageCountRangeOnTelemetry = true;
 		
 		GreenRuntime.run(new FrameworkTest(),args);
 	

+ 34 - 21
frameworks/Java/greenlightning/src/main/java/com/javanut/gl/benchmark/ProcessFortune.java

@@ -6,8 +6,8 @@ import com.javanut.pronghorn.network.config.HTTPContentTypeDefaults;
 import com.javanut.pronghorn.pipe.ObjectPipe;
 import com.javanut.pronghorn.util.AppendableBuilder;
 
-import io.reactiverse.pgclient.PgIterator;
-import io.reactiverse.pgclient.Row;
+import io.reactiverse.pgclient.PgPool;
+import io.reactiverse.pgclient.impl.RowImpl;
 
 public class ProcessFortune {
 
@@ -24,6 +24,7 @@ public class ProcessFortune {
 		this.service = service;
 	}
 	
+	private PgPool myPool = null;
 	
 	
 	public void tickEvent() { 
@@ -37,12 +38,15 @@ public class ProcessFortune {
 				} else {
 					break;
 				}
-			}		
+			}	
+			if (null==temp && myPool!=null) { //new test
+				//myPool.close();
+				myPool = null;
+			}
 		}
 		
 	}
 	
-
 	
 	public boolean restFortuneRequest(HTTPRequestReader request) {
 	
@@ -54,30 +58,39 @@ public class ProcessFortune {
 			target.setStatus(-2);//out for work	
 			target.clear();
 
-			pm.pool().preparedQuery( "SELECT id, message FROM fortune", r -> {
-				    //NOTE: we want to do as little work here a s possible since
-				    //      we want this thread to get back to work on other calls.
-					if (r.succeeded()) {
-						PgIterator resultSet = r.result().iterator();						
-						while (	resultSet.hasNext() ) {
-					        Row next = resultSet.next();
-							target.addFortune(next.getInteger(0), next.getString(1));						
-						}
-						target.setStatus(200);
-					} else {
-						System.out.println("fail: "+r.cause().getLocalizedMessage());
-						target.setStatus(500);
-					}		
-					
-				});
+			if (null==myPool) {
+				myPool = pm.pool();
+			}
 			
-			fortuneInFlight.moveHeadForward(); //always move to ensure this can be read.  //TODO: remove and combined with above
+			gatherData(target);
+			
+			fortuneInFlight.moveHeadForward(); //always move to ensure this can be read.  
 			return true;
 		} else {
 			return false;//can not pick up new work now			
 		}		
 	}
 
+
+    //TODO: generate non DB version for tight local testing.
+	private void gatherData(final FortunesObject target) {
+		myPool.preparedQuery( "SELECT id, message FROM fortune", r -> {
+			    //NOTE: we want to do as little work here a s possible since
+			    //      we want this thread to get back to work on other calls.
+				if (r.succeeded()) {
+					r.result().forEach((row)-> {						
+						RowImpl next = (RowImpl)row;
+						target.addFortune((Integer)next.get(0), (String)next.get(1));						
+					});
+					target.setStatus(200);
+				} else {
+					System.out.println("fail: "+r.cause().getLocalizedMessage());
+					target.setStatus(500);
+				}		
+				
+			});
+	}
+
 	private boolean isReadyFortune(FortunesObject temp) {
 		return null!=temp && temp.getStatus()>=0;
 	}

+ 4 - 3
frameworks/Java/greenlightning/src/main/java/com/javanut/gl/benchmark/ProcessQuery.java

@@ -13,6 +13,7 @@ import io.reactiverse.pgclient.PgIterator;
 import io.reactiverse.pgclient.PgPool;
 import io.reactiverse.pgclient.PgRowSet;
 import io.reactiverse.pgclient.Tuple;
+import io.reactiverse.pgclient.impl.RowImpl;
 
 public class ProcessQuery {
 	
@@ -128,10 +129,10 @@ public class ProcessQuery {
 					if (r.succeeded()) {
 						
 						PgIterator resultSet = r.result().iterator();
-				        Tuple row = resultSet.next();			        
+				        RowImpl row = (RowImpl)resultSet.next();			        
 				        
-				        target.setId(row.getInteger(0));
-				        target.setResult(row.getInteger(1));					
+				        target.setId((Integer)row.get(0));
+				        target.setResult((Integer)row.get(1));					
 						target.setStatus(200);
 						
 					} else {

+ 5 - 3
frameworks/Java/greenlightning/src/main/java/com/javanut/gl/benchmark/ProcessUpdate.java

@@ -12,6 +12,7 @@ import com.javanut.pronghorn.network.config.HTTPContentTypeDefaults;
 import com.javanut.pronghorn.pipe.ObjectPipe;
 
 import io.reactiverse.pgclient.PgIterator;
+import io.reactiverse.pgclient.PgPool;
 import io.reactiverse.pgclient.Tuple;
 
 public class ProcessUpdate {
@@ -59,6 +60,7 @@ public class ProcessUpdate {
 		
 		if ((pause.get()<20) && DBUpdateInFlight.hasRoomFor(queries) && service.hasRoomFor(temp) ) {		
 			    
+			    PgPool outerPool = pm.pool();
 				//NEW List<Tuple> args = new ArrayList<Tuple>(queries);
 				List<ResultObject> objs = new ArrayList<ResultObject>(queries);
 				int q = queries;
@@ -75,7 +77,7 @@ public class ProcessUpdate {
 						worldObject.setId(randomValue());
 						objs.add(worldObject);					
 						
-						pm.pool().preparedQuery("SELECT * FROM world WHERE id=$1", Tuple.of(worldObject.getId()), r -> {
+						outerPool.preparedQuery("SELECT * FROM world WHERE id=$1", Tuple.of(worldObject.getId()), r -> {
 								if (r.succeeded()) {
 																		
 									PgIterator resultSet = r.result().iterator();
@@ -92,7 +94,7 @@ public class ProcessUpdate {
 							        worldObject.setResult(randomValue());							        
 							        
 							        
-							        pm.pool().preparedQuery("UPDATE world SET randomnumber=$1 WHERE id=$2", 							        		
+							        outerPool.preparedQuery("UPDATE world SET randomnumber=$1 WHERE id=$2", 							        		
 						        			Tuple.of(worldObject.getResult(), worldObject.getId()), ar -> {							        	
 													if (ar.succeeded()) {														
 											        	worldObject.setStatus(200);			
@@ -148,7 +150,7 @@ public class ProcessUpdate {
 						DBUpdateInFlight.moveHeadForward(); //always move to ensure this can be read.
 				
 				}
-				
+				//outerPool.close();
 			return true;
 		} else {
 			requestsInFlight.decrementAndGet();