Browse Source

GL, Reactive DB driver was moved to Vert.x for ongoing patches and future support (#5090)

* [ci fw-only Java/greenlightning]

swap versions

* [ci fw-only Java/greenlightning]

go back to simple file

* [ci fw-only Java/greenlightning]

update version to add loging and track issue

* [ci fw-only Java/greenlightning]

use old update design

bump version number

* [ci fw-only Java/greenlightning]

reduce poll rate for greater volume

* [ci fw-only Java/greenlightning]

update comment

* [ci fw-only Java/greenlightning]

update for larger pipes

* [ci fw-only Java/greenlightning]

update memory limit

update to next version

better request read

* [ci fw-only Java/greenlightning]

next version and buffer size adjustments

cleaned up object creation for JSON request

* [ci fw-only Java/greenlightning]

update to next version

cache int values for epoll

* update launcher

* reduce to 28G

* reduce memory

* reduce memory usage 24G

* set new size

* back up to 28G

* reduce newSize

* update memory

* reduce size

* remove fixed pipe size, test server can not support this.

* bump up new sizes

* [ci fw-only Java/greenlightning]

update to next version with more socket readers

* [ci fw-only Java/greenlightning]

version upgrade

* remove comment

* [ci fw-only Java/greenlightning]

Update version

Clear input pipes

* [ci fw-only Java/greenlightning]

larger connection count for update test

* [ci fw-only Java/greenlightning]

DB driver changed management so we switched to new one with support
Nathan Tippy 5 years ago
parent
commit
a05701bb16

+ 9 - 3
frameworks/Java/greenlightning/pom.xml

@@ -35,9 +35,15 @@
 			<scope>test</scope>
 			<scope>test</scope>
 		</dependency>
 		</dependency>
 		<dependency>
 		<dependency>
-		    <groupId>io.reactiverse</groupId>
-		    <artifactId>reactive-pg-client</artifactId>
-		    <version>0.11.2</version>
+			 <groupId>io.netty</groupId>
+			 <artifactId>netty-transport-native-epoll</artifactId>
+			 <version>4.1.15.Final</version>
+			 <classifier>linux-x86_64</classifier>
+		</dependency>
+		<dependency>
+			 <groupId>io.vertx</groupId>
+			 <artifactId>vertx-pg-client</artifactId>
+			 <version>4.0.0-milestone3</version>
 		</dependency>
 		</dependency>
 
 
 	</dependencies>
 	</dependencies>

+ 4 - 3
frameworks/Java/greenlightning/src/main/java/com/javanut/gl/benchmark/DBRest.java

@@ -7,7 +7,8 @@ import com.javanut.gl.api.PubSubMethodListener;
 import com.javanut.gl.api.RestMethodListener;
 import com.javanut.gl.api.RestMethodListener;
 import com.javanut.gl.api.TickListener;
 import com.javanut.gl.api.TickListener;
 
 
-import io.reactiverse.pgclient.PgPoolOptions;
+import io.vertx.pgclient.PgConnectOptions;
+import io.vertx.sqlclient.PoolOptions;
 
 
 public class DBRest implements RestMethodListener, PubSubMethodListener, TickListener {
 public class DBRest implements RestMethodListener, PubSubMethodListener, TickListener {
 
 
@@ -16,10 +17,10 @@ public class DBRest implements RestMethodListener, PubSubMethodListener, TickLis
 	private final ProcessQuery processQuery;
 	private final ProcessQuery processQuery;
 	private static transient PoolManager pm;
 	private static transient PoolManager pm;
 	
 	
-	public DBRest(GreenRuntime runtime, PgPoolOptions options, int pipelineBits, 
+	public DBRest(GreenRuntime runtime, PgConnectOptions options, PoolOptions poolOptions, int pipelineBits, 
 			      int maxResponseCount, int maxResponseSize) {
 			      int maxResponseCount, int maxResponseSize) {
 		
 		
-		pm = new PoolManager(options);
+		pm = new PoolManager(options, poolOptions);
 			
 			
 		HTTPResponseService service = runtime.newCommandChannel().newHTTPResponseService(
 		HTTPResponseService service = runtime.newCommandChannel().newHTTPResponseService(
 				                maxResponseCount, 
 				                maxResponseCount, 

+ 0 - 1
frameworks/Java/greenlightning/src/main/java/com/javanut/gl/benchmark/FortunesObject.java

@@ -4,7 +4,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Collections;
 import java.util.List;
 import java.util.List;
 
 
-import io.reactiverse.pgclient.Tuple;
 
 
 public class FortunesObject {
 public class FortunesObject {
 
 

+ 15 - 12
frameworks/Java/greenlightning/src/main/java/com/javanut/gl/benchmark/FrameworkTest.java

@@ -8,9 +8,10 @@ import com.javanut.gl.api.GreenFramework;
 import com.javanut.gl.api.GreenRuntime;
 import com.javanut.gl.api.GreenRuntime;
 import com.javanut.pronghorn.network.ServerSocketWriterStage;
 import com.javanut.pronghorn.network.ServerSocketWriterStage;
 
 
-import io.reactiverse.pgclient.PgClient;
-import io.reactiverse.pgclient.PgPool;
-import io.reactiverse.pgclient.PgPoolOptions;
+import io.vertx.pgclient.PgConnectOptions;
+import io.vertx.pgclient.PgPool;
+import io.vertx.sqlclient.PoolOptions;
+
 
 
 public class FrameworkTest implements GreenApp {
 public class FrameworkTest implements GreenApp {
 
 
@@ -30,8 +31,8 @@ public class FrameworkTest implements GreenApp {
     private final int jsonMaxResponseCount;
     private final int jsonMaxResponseCount;
     private final int jsonMaxResponseSize;
     private final int jsonMaxResponseSize;
     
     
-    private PgPoolOptions options;
-    
+    private PgConnectOptions options;
+    private PoolOptions poolOptions;
     
     
     private int maxQueueOut;
     private int maxQueueOut;
     private int maxConnectionBits;
     private int maxConnectionBits;
@@ -98,7 +99,7 @@ public class FrameworkTest implements GreenApp {
     	this.payloadText = payloadResponse;
     	this.payloadText = payloadResponse;
     	this.payload = payloadText.getBytes();
     	this.payload = payloadText.getBytes();
     	
     	
-    	this.connectionsPerTrack = 20;
+    	this.connectionsPerTrack = 2;
     	this.connectionPort = 5432;
     	this.connectionPort = 5432;
     	this.bindPort = port;
     	this.bindPort = port;
     	this.host = host;
     	this.host = host;
@@ -137,10 +138,9 @@ public class FrameworkTest implements GreenApp {
     	
     	
 	    		
 	    		
     	try {
     	try {
-    		options = new PgPoolOptions()
+    		options = new PgConnectOptions()
     				.setPort(connectionPort)
     				.setPort(connectionPort)
     				.setPipeliningLimit(1<<pipelineBits)
     				.setPipeliningLimit(1<<pipelineBits)
-    				.setMaxWaitQueueSize(1<<pipelineBits)
     				.setHost(connectionHost)
     				.setHost(connectionHost)
     				.setDatabase(connectionDB)
     				.setDatabase(connectionDB)
     				.setUser(connectionUser)
     				.setUser(connectionUser)
@@ -148,12 +148,15 @@ public class FrameworkTest implements GreenApp {
     				.setCachePreparedStatements(true)
     				.setCachePreparedStatements(true)
     				.setTcpNoDelay(true)
     				.setTcpNoDelay(true)
     				.setTcpKeepAlive(true)
     				.setTcpKeepAlive(true)
-    				.setUsePooledBuffers(false)
-    				.setMaxSize(connectionsPerTrack);	    	
+    				
+    				;	    	
 
 
+    		poolOptions = new PoolOptions()
+    				  .setMaxSize(connectionsPerTrack);
+    		
     		///early check to know if we have a database or not,
     		///early check to know if we have a database or not,
 	    	///this helps testing to know which tests should be run on different boxes.
 	    	///this helps testing to know which tests should be run on different boxes.
-	    	PgPool pool = PgClient.pool(options);
+	    	PgPool pool = PgPool.pool(options, poolOptions);
 			pool.getConnection(a->{
 			pool.getConnection(a->{
 	    		foundDB.set(a.succeeded());
 	    		foundDB.set(a.succeeded());
 	    		if (null!=a.result()) {
 	    		if (null!=a.result()) {
@@ -246,7 +249,7 @@ public class FrameworkTest implements GreenApp {
 		       .includeRoutes(Struct.PLAINTEXT_ROUTE, restTest::plainRestRequest)
 		       .includeRoutes(Struct.PLAINTEXT_ROUTE, restTest::plainRestRequest)
 		       .includeRoutes(Struct.JSON_ROUTE, restTest::jsonRestRequest);		 
 		       .includeRoutes(Struct.JSON_ROUTE, restTest::jsonRestRequest);		 
 
 
-		DBRest dbRestInstance = new DBRest(runtime, options, pipelineBits, 
+		DBRest dbRestInstance = new DBRest(runtime, options, poolOptions, pipelineBits, 
 				                           dbCallMaxResponseCount, dbCallMaxResponseSize);		
 				                           dbCallMaxResponseCount, dbCallMaxResponseSize);		
 		
 		
 		runtime.registerListener("DBReadWrite", dbRestInstance)
 		runtime.registerListener("DBReadWrite", dbRestInstance)

+ 19 - 7
frameworks/Java/greenlightning/src/main/java/com/javanut/gl/benchmark/PoolManager.java

@@ -1,22 +1,34 @@
 package com.javanut.gl.benchmark;
 package com.javanut.gl.benchmark;
 
 
-import io.reactiverse.pgclient.PgClient;
-import io.reactiverse.pgclient.PgPool;
-import io.reactiverse.pgclient.PgPoolOptions;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.VertxOptions;
+import io.vertx.pgclient.PgConnectOptions;
+import io.vertx.pgclient.PgPool;
+import io.vertx.sqlclient.PoolOptions;
 
 
 public class PoolManager {
 public class PoolManager {
 
 
-	private final transient PgPoolOptions options;
-	private transient PgPool pool;
+	private final transient PgConnectOptions options;
+	private transient PoolOptions poolOptions;
+	private PgPool pool;
+	private Vertx vertx;
 	
 	
-	public PoolManager(PgPoolOptions options) {
+	public PoolManager(PgConnectOptions options, PoolOptions poolOptions) {
 		this.options = options;
 		this.options = options;
+		this.poolOptions = poolOptions;
+		
+		this.vertx = Vertx.vertx(new VertxOptions().
+				  setPreferNativeTransport(true)
+				);
 		
 		
+		boolean usingNative = vertx.isNativeTransportEnabled();
+		System.out.println("Running with native: " + usingNative);
 	}
 	}
 		
 		
 	public PgPool pool() {
 	public PgPool pool() {
 		if (null==pool) {			
 		if (null==pool) {			
-			pool = PgClient.pool(options);			
+			pool = PgPool.pool(vertx, options, poolOptions);			
 		}
 		}
 		return pool;
 		return pool;
 	}
 	}

+ 4 - 5
frameworks/Java/greenlightning/src/main/java/com/javanut/gl/benchmark/ProcessFortune.java

@@ -6,8 +6,8 @@ import com.javanut.pronghorn.network.config.HTTPContentTypeDefaults;
 import com.javanut.pronghorn.pipe.ObjectPipe;
 import com.javanut.pronghorn.pipe.ObjectPipe;
 import com.javanut.pronghorn.util.AppendableBuilder;
 import com.javanut.pronghorn.util.AppendableBuilder;
 
 
-import io.reactiverse.pgclient.PgPool;
-import io.reactiverse.pgclient.impl.RowImpl;
+import io.vertx.pgclient.PgPool;
+
 
 
 public class ProcessFortune {
 public class ProcessFortune {
 
 
@@ -78,9 +78,8 @@ public class ProcessFortune {
 			    //NOTE: we want to do as little work here a s possible since
 			    //NOTE: we want to do as little work here a s possible since
 			    //      we want this thread to get back to work on other calls.
 			    //      we want this thread to get back to work on other calls.
 				if (r.succeeded()) {
 				if (r.succeeded()) {
-					r.result().forEach((row)-> {						
-						RowImpl next = (RowImpl)row;
-						target.addFortune((Integer)next.get(0), (String)next.get(1));						
+					r.result().forEach((row)-> {
+						target.addFortune((Integer)row.getInteger(0), (String)row.getString(1));						
 					});
 					});
 					target.setStatus(200);
 					target.setStatus(200);
 				} else {
 				} else {

+ 10 - 10
frameworks/Java/greenlightning/src/main/java/com/javanut/gl/benchmark/ProcessQuery.java

@@ -9,11 +9,11 @@ import com.javanut.gl.api.HTTPResponseService;
 import com.javanut.pronghorn.network.config.HTTPContentTypeDefaults;
 import com.javanut.pronghorn.network.config.HTTPContentTypeDefaults;
 import com.javanut.pronghorn.pipe.ObjectPipe;
 import com.javanut.pronghorn.pipe.ObjectPipe;
 
 
-import io.reactiverse.pgclient.PgIterator;
-import io.reactiverse.pgclient.PgPool;
-import io.reactiverse.pgclient.PgRowSet;
-import io.reactiverse.pgclient.Tuple;
-import io.reactiverse.pgclient.impl.RowImpl;
+import io.vertx.pgclient.PgPool;
+import io.vertx.sqlclient.Row;
+import io.vertx.sqlclient.RowIterator;
+import io.vertx.sqlclient.Tuple;
+
 
 
 public class ProcessQuery {
 public class ProcessQuery {
 	
 	
@@ -95,7 +95,7 @@ public class ProcessQuery {
 				p.preparedQuery("SELECT * FROM world WHERE id=$1", Tuple.of(randomValue()), r -> {
 				p.preparedQuery("SELECT * FROM world WHERE id=$1", Tuple.of(randomValue()), r -> {
 						if (r.succeeded()) {
 						if (r.succeeded()) {
 
 
-							PgIterator resultSet = r.result().iterator();
+							RowIterator<Row> resultSet = r.result().iterator();
 							Tuple row = resultSet.next();			        
 							Tuple row = resultSet.next();			        
 							
 							
 							target.setId(row.getInteger(0));
 							target.setId(row.getInteger(0));
@@ -128,11 +128,11 @@ public class ProcessQuery {
 			pm.pool().preparedQuery("SELECT * FROM world WHERE id=$1", Tuple.of(randomValue()), r -> {
 			pm.pool().preparedQuery("SELECT * FROM world WHERE id=$1", Tuple.of(randomValue()), r -> {
 					if (r.succeeded()) {
 					if (r.succeeded()) {
 						
 						
-						PgIterator resultSet = r.result().iterator();
-				        RowImpl row = (RowImpl)resultSet.next();			        
+						RowIterator<Row> resultSet = r.result().iterator();
+				        Row row = resultSet.next();			        
 				        
 				        
-				        target.setId((Integer)row.get(0));
-				        target.setResult((Integer)row.get(1));					
+				        target.setId((Integer)row.getInteger(0));
+				        target.setResult((Integer)row.getInteger(1));					
 						target.setStatus(200);
 						target.setStatus(200);
 						
 						
 					} else {
 					} else {

+ 109 - 94
frameworks/Java/greenlightning/src/main/java/com/javanut/gl/benchmark/ProcessUpdate.java

@@ -4,6 +4,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Collections;
 import java.util.List;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicInteger;
 
 
 import com.javanut.gl.api.HTTPRequestReader;
 import com.javanut.gl.api.HTTPRequestReader;
@@ -11,9 +12,14 @@ import com.javanut.gl.api.HTTPResponseService;
 import com.javanut.pronghorn.network.config.HTTPContentTypeDefaults;
 import com.javanut.pronghorn.network.config.HTTPContentTypeDefaults;
 import com.javanut.pronghorn.pipe.ObjectPipe;
 import com.javanut.pronghorn.pipe.ObjectPipe;
 
 
-import io.reactiverse.pgclient.PgIterator;
-import io.reactiverse.pgclient.PgPool;
-import io.reactiverse.pgclient.Tuple;
+import io.vertx.core.AsyncResult;
+import io.vertx.pgclient.PgPool;
+import io.vertx.sqlclient.Row;
+import io.vertx.sqlclient.RowIterator;
+import io.vertx.sqlclient.RowSet;
+import io.vertx.sqlclient.SqlConnection;
+import io.vertx.sqlclient.Tuple;
+
 
 
 public class ProcessUpdate {
 public class ProcessUpdate {
 	
 	
@@ -59,99 +65,27 @@ public class ProcessUpdate {
 		int temp = requestsInFlight.incrementAndGet();
 		int temp = requestsInFlight.incrementAndGet();
 		
 		
 		if (DBUpdateInFlight.hasRoomFor(queries) && service.hasRoomFor(temp) ) {		
 		if (DBUpdateInFlight.hasRoomFor(queries) && service.hasRoomFor(temp) ) {		
-			    
-				//NEW List<Tuple> args = new ArrayList<Tuple>(queries);
-				List<ResultObject> objs = new ArrayList<ResultObject>(queries);
-				int q = queries;
-				while (--q >= 0) {
-						//testing one per query 
-					    PgPool outerPool = pm.pool();
+	
+		   // final AtomicBoolean ok = new AtomicBoolean();
+		    
+			//pm.pool().getConnection(result -> {
 				
 				
-						final ResultObject worldObject = DBUpdateInFlight.headObject();
-						assert(null!=worldObject);
-											
-						worldObject.setConnectionId(conId);
-						worldObject.setSequenceId(seqCode);
-						worldObject.setStatus(-2);//out for work	
-						worldObject.setGroupSize(queries);
-						
-						worldObject.setId(randomValue());
-						objs.add(worldObject);					
-						
-						outerPool.preparedQuery("SELECT * FROM world WHERE id=$1", Tuple.of(worldObject.getId()), r -> {
-								if (r.succeeded()) {
-																		
-									PgIterator resultSet = r.result().iterator();
-							        Tuple row = resultSet.next();			        
-							        
-							        assert(worldObject.getId()==row.getInteger(0));
-							        
-							        //read the existing random value and store it in the world object
-							        worldObject.setResult(row.getInteger(1));
-							        ///////////////////////////////////
-							        //the object can be used here with the old value
-							        ///////////////////////////////////
-							        //set the new random value in this object
-							        worldObject.setResult(randomValue());							        
-							        
-							        
-							        outerPool.preparedQuery("UPDATE world SET randomnumber=$1 WHERE id=$2", 							        		
-						        			Tuple.of(worldObject.getResult(), worldObject.getId()), ar -> {							        	
-													if (ar.succeeded()) {														
-											        	worldObject.setStatus(200);			
-											        	
-													} else {	
-														System.out.println("unable to update");
-														if (ar.cause()!=null) {
-															ar.cause().printStackTrace();
-														}
-														
-														worldObject.setStatus(500);
-													}																												
-						        			});
-							        
-							        
-//							      //  
-//							      //  Use of batch updates is acceptable but not required. 
-//							      //   To be clear: batches are not permissible for selecting/reading the rows,
-//							      //   but batches are acceptable for writing the updates.
-//							      //  
-//							        
-//							        //TODO: can we prep this only once and hold it?
-//							        
-//							        Tuple of = Tuple.of(worldObject.getResult(), worldObject.getId());
-//							        args.add(of);
-//							        
-//							        //only call for update when we have each of the args
-//							        if (args.size()==queries) {
-//							        	Collections.sort(args, (a,b) -> {
-//											return Integer.compare( ((Tuple)a).getInteger(0),
-//															        ((Tuple)b).getInteger(0));
-//										
-//										});
-//							        	
-//							        	execUpdate(objs, args, 1);							        	
-//							        	
-//							        }							        
-							        
-								} else {	
-								
-									System.out.println("unable to query");
-									if (r.cause()!=null) {
-										r.cause().printStackTrace();
-									}
-									
-									worldObject.setStatus(500);
-								}		
-								
-								//on all N responses.....
-																
-							});	
-									
-						DBUpdateInFlight.moveHeadForward(); //always move to ensure this can be read.
+				//if (result.succeeded()) {
 				
 				
-				}
-			
+					//SqlConnection connection = result.result();					
+					processConnection(queries, conId, seqCode);
+					//connection.close();
+					
+				//	ok.set(true);
+					
+				//} else {
+				//	requestsInFlight.decrementAndGet();			
+				//	ok.set(false);
+				//}
+				
+			//});
+				
+			//return ok.get();
 			return true;
 			return true;
 		} else {
 		} else {
 			requestsInFlight.decrementAndGet();
 			requestsInFlight.decrementAndGet();
@@ -159,6 +93,87 @@ public class ProcessUpdate {
 		}
 		}
 	}
 	}
 
 
+
+	private void processConnection(int queries, long conId, long seqCode) {
+		
+		List<ResultObject> objs = new ArrayList<ResultObject>(queries);
+		int q = queries;
+		while (--q >= 0) {
+				processSingleUpdate(queries, conId, seqCode, objs);
+		
+		}
+	}
+
+
+	private void processSingleUpdate(int queries, long conId, long seqCode, List<ResultObject> objs) {
+		//testing one per query 
+
+		final ResultObject worldObject = DBUpdateInFlight.headObject();
+		assert(null!=worldObject);
+							
+		worldObject.setConnectionId(conId);
+		worldObject.setSequenceId(seqCode);
+		worldObject.setStatus(-2);//out for work	
+		worldObject.setGroupSize(queries);
+		
+		worldObject.setId(randomValue());
+		objs.add(worldObject);					
+		
+		pm.pool().preparedQuery("SELECT * FROM world WHERE id=$1", Tuple.of(worldObject.getId()), r -> {
+				if (r.succeeded()) {
+														
+					RowIterator<Row> resultSet = r.result().iterator();
+			        Tuple row = resultSet.next();			        
+			        
+			        assert(worldObject.getId()==row.getInteger(0));
+			        
+			        //read the existing random value and store it in the world object
+			        worldObject.setResult(row.getInteger(1));
+			        ///////////////////////////////////
+			        //the object can be used here with the old value
+			        ///////////////////////////////////
+			        //set the new random value in this object
+			        worldObject.setResult(randomValue());							        
+			        
+			        
+			        pm.pool().preparedQuery("UPDATE world SET randomnumber=$1 WHERE id=$2", 							        		
+		        			Tuple.of(worldObject.getResult(), worldObject.getId()), ar -> {							        	
+									setStatus(worldObject, ar);																												
+		        			}
+		        			);					        
+			        
+				} else {	
+				
+					System.out.println("unable to query");
+					if (r.cause()!=null) {
+						r.cause().printStackTrace();
+					}
+					
+					worldObject.setStatus(500);
+				}		
+				
+				//on all N responses.....
+												
+			});	
+					
+		DBUpdateInFlight.moveHeadForward(); //always move to ensure this can be read.
+	}
+
+
+	private static void setStatus(final ResultObject worldObject, AsyncResult<RowSet<Row>> ar) {
+		if (ar.succeeded()) {														
+			worldObject.setStatus(200);			
+			
+		} else {	
+			System.out.println("unable to update");
+			if (ar.cause()!=null) {
+				ar.cause().printStackTrace();
+			}
+			
+			worldObject.setStatus(500);
+		}
+	}
+
 	
 	
 //	private void execUpdate(List<ResultObject> toUpdate, List<Tuple> args, int i) {
 //	private void execUpdate(List<ResultObject> toUpdate, List<Tuple> args, int i) {
 //				
 //