Browse Source

Announce that we have peers on the cluster when we first see them to improve startup times, and add a result crunching script to tests/http.

Adam Ierymenko 9 years ago
parent
commit
32ec378e3b
5 changed files with 84 additions and 1 deletions
  1. 11 0
      node/Cluster.cpp
  2. 9 0
      node/Cluster.hpp
  3. 5 0
      node/Peer.cpp
  4. 58 0
      tests/http/crunch-results.js
  5. 1 1
      tests/http/server.js

+ 11 - 0
node/Cluster.cpp

@@ -363,6 +363,17 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 	}
 }
 
+void Cluster::broadcastHavePeer(const Identity &id)
+{
+	Buffer<1024> buf;
+	id.serialize(buf);
+	Mutex::Lock _l(_memberIds_m);
+	for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+		Mutex::Lock _l2(_members[*mid].lock);
+		_send(*mid,CLUSTER_MESSAGE_HAVE_PEER,buf.data(),buf.size());
+	}
+}
+
 void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite)
 {
 	if (len > ZT_PROTO_MAX_PACKET_LENGTH) // sanity check

+ 9 - 0
node/Cluster.hpp

@@ -244,6 +244,15 @@ public:
 	 */
 	void handleIncomingStateMessage(const void *msg,unsigned int len);
 
+	/**
+	 * Broadcast that we have a given peer
+	 *
+	 * This should be done when new peers are first contacted.
+	 *
+	 * @param id Identity of peer
+	 */
+	void broadcastHavePeer(const Identity &id);
+
 	/**
 	 * Send this packet via another node in this cluster if another node has this peer
 	 *

+ 5 - 0
node/Peer.cpp

@@ -187,6 +187,11 @@ void Peer::received(
 						_sortPaths(now);
 					}
 
+#ifdef ZT_ENABLE_CLUSTER
+					if ((RR->cluster)&&(!suboptimalPath))
+						RR->cluster->broadcastHavePeer(_id);
+#endif
+
 				} else {
 
 					/* If this path is not known, send a HELLO. We don't learn

+ 58 - 0
tests/http/crunch-results.js

@@ -0,0 +1,58 @@
+//
+// Pipe the output of server.js into this to convert raw test results into bracketed statistics
+// suitable for graphing.
+//
+
+// Average over this interval of time
+var GRAPH_INTERVAL = 60000;
+
+// Number of bytes expected from each test
+var EXPECTED_BYTES = 5000;
+
+var readline = require('readline');
+var rl = readline.createInterface({
+  input: process.stdin,
+  output: process.stdout,
+  terminal: false
+});
+
+var startTS = 0;
+
+var count = 0.0;
+var totalFailures = 0;
+var totalPartialFailures = 0;
+var totalMs = 0;
+var totalData = 0;
+
+rl.on('line',function(line) {
+  line = line.trim();
+  var ls = line.split(',');
+  if (ls.length == 7) {
+    var ts = parseInt(ls[0]);
+    var from = ls[1];
+    var to = ls[2];
+    var ms = parseFloat(ls[3]);
+    var bytes = parseInt(ls[4]);
+    var timedOut = (ls[5] == 'true') ? true : false;
+    var errMsg = ls[6];
+
+    count += 1.0;
+    if ((bytes <= 0)||(timedOut))
+      ++totalFailures;
+    if (bytes !== EXPECTED_BYTES)
+      ++totalPartialFailures;
+    totalMs += ms;
+    totalData += bytes;
+
+    if (startTS === 0) {
+      startTS = ts;
+    } else if (((ts - startTS) >= GRAPH_INTERVAL)&&(count > 0.0)) {
+      console.log(count.toString()+','+(totalMs / count)+','+totalFailures+','+totalPartialFailures+','+totalData);
+
+      count = 0.0;
+      totalFailures = 0;
+      totalPartialFailures = 0;
+      totalMs = 0;
+    }
+  } // else ignore junk
+});

+ 1 - 1
tests/http/server.js

@@ -30,7 +30,7 @@ app.post('/:agentId',function(req,res) {
 		var resultData = null;
 		try {
 			resultData = JSON.parse(req.rawBody);
-			console.log(resultData.source+','+resultData.target+','+resultData.time+','+resultData.bytes+','+resultData.timedOut+',"'+((resultData.error) ? resultData.error : '')+'"');
+			console.log(Date.now()+','+resultData.source+','+resultData.target+','+resultData.time+','+resultData.bytes+','+resultData.timedOut+',"'+((resultData.error) ? resultData.error : '')+'"');
 		} catch (e) {}
 	}