Explorar o código

Clean up socket writing for greater volume (#4235)

Nathan Tippy %!s(int64=6) %!d(string=hai) anos
pai
achega
fbd50d25dc

+ 1 - 1
frameworks/Java/greenlightning/greenlightning.dockerfile

@@ -8,4 +8,4 @@ RUN mvn clean install -q
 FROM nimmis/java-centos:openjdk-8-jre-headless
 FROM nimmis/java-centos:openjdk-8-jre-headless
 WORKDIR /greenlightning
 WORKDIR /greenlightning
 COPY --from=maven /greenlightning/target/greenlightning-test.jar app.jar
 COPY --from=maven /greenlightning/target/greenlightning-test.jar app.jar
-CMD ["java", "-server", "-Xmx13g", "-XX:+UseNUMA", "-XX:+UseParallelGC", "-XX:+AggressiveOpts", "-jar", "app.jar"]
+CMD ["java", "-server", "-Xmx16g", "-XX:+UseNUMA", "-XX:+UseParallelGC", "-XX:+AggressiveOpts", "-jar", "app.jar"]

+ 2 - 2
frameworks/Java/greenlightning/pom.xml

@@ -4,7 +4,7 @@
 
 
 	<groupId>com.ociweb.gl.benchmark</groupId>
 	<groupId>com.ociweb.gl.benchmark</groupId>
 	<artifactId>benchmark-test</artifactId>
 	<artifactId>benchmark-test</artifactId>
-	<version>1.0.16</version>
+	<version>1.0.17</version>
 
 
 	<properties>
 	<properties>
 		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -14,7 +14,7 @@
 		<dependency>
 		<dependency>
 			<groupId>com.ociweb</groupId>
 			<groupId>com.ociweb</groupId>
 			<artifactId>greenlightning</artifactId>
 			<artifactId>greenlightning</artifactId>
-			<version>1.0.16</version>
+			<version>1.0.17</version>
 		</dependency>
 		</dependency>
 		<dependency>
 		<dependency>
 			<groupId>org.slf4j</groupId>
 			<groupId>org.slf4j</groupId>

+ 10 - 13
frameworks/Java/greenlightning/src/main/java/com/ociweb/gl/benchmark/FrameworkTest.java

@@ -47,14 +47,14 @@ public class FrameworkTest implements GreenApp {
 			    
 			    
     public FrameworkTest() {
     public FrameworkTest() {
     	// use this in commit messages to narrow travis testing to just this project
     	// use this in commit messages to narrow travis testing to just this project
-    	// [ci fw-only Java/greenlightning]
+    	// rebase before using this:  [ci fw-only Java/greenlightning]
     	
     	
     	//this server works best with  -XX:+UseNUMA    	
     	//this server works best with  -XX:+UseNUMA    	
     	this(System.getProperty("host","0.0.0.0"), 
     	this(System.getProperty("host","0.0.0.0"), 
     		 8080,    //default port for test 
     		 8080,    //default port for test 
-    		 10,      //default concurrency, 5 to support 140 channels on 14 core boxes
-    		 8*1024,  //default max rest requests allowed to queue in wait
-    		 1<<19,   //default network buffer per input socket connection
+    		 7,      //default concurrency per track
+    		 2*1024,  //default max rest requests allowed to queue in wait
+    		 1<<20,   //default network buffer per input socket connection
     		 Integer.parseInt(System.getProperty("telemetry.port", "-1")),
     		 Integer.parseInt(System.getProperty("telemetry.port", "-1")),
     		 "tfb-database", // jdbc:postgresql://tfb-database:5432/hello_world
     		 "tfb-database", // jdbc:postgresql://tfb-database:5432/hello_world
     		 "hello_world",
     		 "hello_world",
@@ -79,10 +79,10 @@ public class FrameworkTest implements GreenApp {
     	this.queueLengthOfPendingRequests = queueLengthOfPendingRequests;
     	this.queueLengthOfPendingRequests = queueLengthOfPendingRequests;
     	this.minMemoryOfInputPipes = minMemoryOfInputPipes;
     	this.minMemoryOfInputPipes = minMemoryOfInputPipes;
     	this.telemetryPort = telemetryPort;
     	this.telemetryPort = telemetryPort;
-    	this.pipelineBits = 17;//max concurrent in flight database requests 1<<pipelineBits
-
+    	this.pipelineBits = 14;//max concurrent in flight database requests 1<<pipelineBits
+    	
     	this.dbCallMaxResponseCount = 1<<4;
     	this.dbCallMaxResponseCount = 1<<4;
-    	this.jsonMaxResponseCount = 1<<16;
+    	this.jsonMaxResponseCount = 1<<13;
     	
     	
     	this.dbCallMaxResponseSize = 20_000; //for 500 mult db call in JSON format
     	this.dbCallMaxResponseSize = 20_000; //for 500 mult db call in JSON format
     	this.jsonMaxResponseSize = 1<<9;
     	this.jsonMaxResponseSize = 1<<9;
@@ -114,7 +114,7 @@ public class FrameworkTest implements GreenApp {
     				.setUser(connectionUser)
     				.setUser(connectionUser)
     				.setIdleTimeout(20)
     				.setIdleTimeout(20)
     				.setPassword(connectionPassword)
     				.setPassword(connectionPassword)
-    				.setCachePreparedStatements(true)
+    				.setCachePreparedStatements(true)    	
     				.setMaxSize(connectionsPerTrack);	    	
     				.setMaxSize(connectionsPerTrack);	    	
 
 
     		///early check to know if we have a database or not,
     		///early check to know if we have a database or not,
@@ -138,7 +138,7 @@ public class FrameworkTest implements GreenApp {
 		
 		
 		framework.setDefaultRate(20_000);
 		framework.setDefaultRate(20_000);
 		
 		
-		//for 14 cores this is expected to use less than 16G
+		//for 14 cores this is expected to use less than 16G, must use next largest prime to ensure smaller groups are not multiples.
 		framework.useHTTP1xServer(bindPort, this::parallelBehavior) //standard auto-scale
 		framework.useHTTP1xServer(bindPort, this::parallelBehavior) //standard auto-scale
     			 .setHost(host)
     			 .setHost(host)
     			 .setMaxConnectionBits(13) //8K max client connections.
     			 .setMaxConnectionBits(13) //8K max client connections.
@@ -147,7 +147,7 @@ public class FrameworkTest implements GreenApp {
     			 .setMaxQueueIn(queueLengthOfPendingRequests)
     			 .setMaxQueueIn(queueLengthOfPendingRequests)
     			 
     			 
     			 .setMinimumInputPipeMemory(minMemoryOfInputPipes)
     			 .setMinimumInputPipeMemory(minMemoryOfInputPipes)
-    			 .setMaxQueueOut(64)
+    			 .setMaxQueueOut(256)
     			 .setMaxResponseSize(dbCallMaxResponseSize) //big enough for large mult db response
     			 .setMaxResponseSize(dbCallMaxResponseSize) //big enough for large mult db response
     	         .useInsecureServer(); //turn off TLS
     	         .useInsecureServer(); //turn off TLS
 
 
@@ -194,9 +194,6 @@ public class FrameworkTest implements GreenApp {
 		}
 		}
 				
 				
 		framework.setTimerPulseRate(30 * 1000);//2x per minute
 		framework.setTimerPulseRate(30 * 1000);//2x per minute
-		
-		ServerSocketWriterStage.lowLatency = false; //turn on high volume mode, less concerned about low latency. 
-	
     }
     }
 
 
 
 

+ 1 - 1
frameworks/Java/greenlightning/src/main/java/com/ociweb/gl/benchmark/GreenLightning.java

@@ -5,7 +5,7 @@ import com.ociweb.gl.api.GreenRuntime;
 public class GreenLightning {
 public class GreenLightning {
 
 
 	public static void main(String[] args) {
 	public static void main(String[] args) {
-
+		                           
 		GreenRuntime.run(new FrameworkTest(),args);
 		GreenRuntime.run(new FrameworkTest(),args);
 	
 	
 	}
 	}

+ 3 - 2
frameworks/Java/greenlightning/src/main/java/com/ociweb/gl/benchmark/PoolManager.java

@@ -11,12 +11,13 @@ public class PoolManager {
 	private transient long lastUsed;
 	private transient long lastUsed;
 	
 	
 	public PoolManager(PgPoolOptions options) {
 	public PoolManager(PgPoolOptions options) {
-		this.options = options;		
+		this.options = options;
+		
 	}
 	}
 		
 		
 	public PgPool pool() {
 	public PgPool pool() {
 		if (null==pool) {
 		if (null==pool) {
-			pool = PgClient.pool(options);	//TODO: how to clear this when not in use?		
+			pool = PgClient.pool(options);
 		}
 		}
 		lastUsed = System.nanoTime();
 		lastUsed = System.nanoTime();
 		return pool;
 		return pool;

+ 39 - 36
frameworks/Java/greenlightning/src/main/java/com/ociweb/gl/benchmark/ProcessQuery.java

@@ -10,6 +10,7 @@ import com.ociweb.pronghorn.network.config.HTTPContentTypeDefaults;
 import com.ociweb.pronghorn.pipe.ObjectPipe;
 import com.ociweb.pronghorn.pipe.ObjectPipe;
 
 
 import io.reactiverse.pgclient.PgIterator;
 import io.reactiverse.pgclient.PgIterator;
+import io.reactiverse.pgclient.PgPool;
 import io.reactiverse.pgclient.Tuple;
 import io.reactiverse.pgclient.Tuple;
 
 
 public class ProcessQuery {
 public class ProcessQuery {
@@ -51,6 +52,7 @@ public class ProcessQuery {
 		return 1+localRandom.nextInt(10000);
 		return 1+localRandom.nextInt(10000);
 	}		
 	}		
 
 
+	
 	public boolean multiRestRequest(HTTPRequestReader request) { 
 	public boolean multiRestRequest(HTTPRequestReader request) { 
 
 
 		final int queries;
 		final int queries;
@@ -63,50 +65,51 @@ public class ProcessQuery {
 	
 	
 		if (DBRestInFlight.hasRoomFor(queries)) {
 		if (DBRestInFlight.hasRoomFor(queries)) {
 			
 			
-			
-			int q = queries;
-			while (--q >= 0) {
-				
-					final ResultObject target = DBRestInFlight.headObject();
-					
-					//already released but not published yet: TODO: we have a problem here!!!
-					assert(null!=target && -1==target.getStatus()) : "found status "+target.getStatus()+" on query "+q+" of "+queries ; //must block that this has been consumed?? should head/tail rsolve.
+			sendQueries(pm.pool(),queries,request.getConnectionId(),request.getSequenceCode());
 									
 									
-					target.setConnectionId(request.getConnectionId());
-					target.setSequenceId(request.getSequenceCode());
-					assert(target.getStatus()==-1);//waiting for work
-					target.setStatus(-2);//out for work	
-					target.setGroupSize(queries);
-				
-					pm.pool().preparedQuery("SELECT * FROM world WHERE id=$1", Tuple.of(randomValue()), r -> {
-							if (r.succeeded()) {
-								
-								PgIterator resultSet = r.result().iterator();
-						        Tuple row = resultSet.next();			        
-						        
-						        target.setId(row.getInteger(0));
-						        target.setResult(row.getInteger(1));					
-								target.setStatus(200);
-								
-							} else {
-								System.out.println("fail: "+r.cause().getLocalizedMessage());
-								target.setStatus(500); 
-							}				
-						});	
-								
-					DBRestInFlight.moveHeadForward(); //always move to ensure this can be read.
-			
-			}
-				
 			return true;
 			return true;
 		} else {
 		} else {
 			return false;
 			return false;
 		}	
 		}	
 	}
 	}
 
 
-	
+		
+	private void sendQueries(PgPool p, int queries, long con, long seq) {
+		int q = queries;
+		while (--q >= 0) {
+			
+				final ResultObject target = DBRestInFlight.headObject();
+			
+				assert(null!=target && -1==target.getStatus()) : "found status "+target.getStatus()+" on query "+q+" of "+queries ; //must block that this has been consumed?? should head/tail rsolve.
+								
+				target.setConnectionId(con);
+				target.setSequenceId(seq);
+				assert(target.getStatus()==-1);//waiting for work
+				target.setStatus(-2);//out for work	
+				target.setGroupSize(queries);
+			
+				p.preparedQuery("SELECT * FROM world WHERE id=$1", Tuple.of(randomValue()), r -> {
+						if (r.succeeded()) {
+							
+							PgIterator resultSet = r.result().iterator();
+					        Tuple row = resultSet.next();			        
+					        
+					        target.setId(row.getInteger(0));
+					        target.setResult(row.getInteger(1));					
+							target.setStatus(200);
+							
+						} else {
+							System.out.println("fail: "+r.cause().getLocalizedMessage());
+							target.setStatus(500); 
+						}		
+						
+					});	
+							
+				DBRestInFlight.moveHeadForward(); //always move to ensure this can be read.
+		
+		}
+	}
 
 
-	
 	public boolean singleRestRequest(HTTPRequestReader request) { 
 	public boolean singleRestRequest(HTTPRequestReader request) { 
 
 
 		final ResultObject target = DBRestInFlight.headObject();
 		final ResultObject target = DBRestInFlight.headObject();