|
@@ -41,7 +41,8 @@ JSONDB::JSONDB(const std::string &basePath) :
|
|
_basePath(basePath),
|
|
_basePath(basePath),
|
|
_rawInput(-1),
|
|
_rawInput(-1),
|
|
_rawOutput(-1),
|
|
_rawOutput(-1),
|
|
- _summaryThreadRun(true)
|
|
|
|
|
|
+ _summaryThreadRun(true),
|
|
|
|
+ _dataReady(false)
|
|
{
|
|
{
|
|
if ((_basePath.length() > 7)&&(_basePath.substr(0,7) == "http://")) {
|
|
if ((_basePath.length() > 7)&&(_basePath.substr(0,7) == "http://")) {
|
|
// If base path is http:// we run in HTTP mode
|
|
// If base path is http:// we run in HTTP mode
|
|
@@ -75,6 +76,8 @@ JSONDB::JSONDB(const std::string &basePath) :
|
|
OSUtils::lockDownFile(_basePath.c_str(),true); // networks might contain auth tokens, etc., so restrict directory permissions
|
|
OSUtils::lockDownFile(_basePath.c_str(),true); // networks might contain auth tokens, etc., so restrict directory permissions
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ _networks_m.lock(); // locked until data is loaded, etc.
|
|
|
|
+
|
|
if (_rawInput < 0) {
|
|
if (_rawInput < 0) {
|
|
unsigned int cnt = 0;
|
|
unsigned int cnt = 0;
|
|
while (!_load(_basePath)) {
|
|
while (!_load(_basePath)) {
|
|
@@ -82,27 +85,25 @@ JSONDB::JSONDB(const std::string &basePath) :
|
|
fprintf(stderr,"WARNING: controller still waiting to read '%s'..." ZT_EOL_S,_basePath.c_str());
|
|
fprintf(stderr,"WARNING: controller still waiting to read '%s'..." ZT_EOL_S,_basePath.c_str());
|
|
Thread::sleep(250);
|
|
Thread::sleep(250);
|
|
}
|
|
}
|
|
- }
|
|
|
|
|
|
|
|
- for(std::unordered_map<uint64_t,_NW>::iterator n(_networks.begin());n!=_networks.end();++n)
|
|
|
|
- _recomputeSummaryInfo(n->first);
|
|
|
|
- for(;;) {
|
|
|
|
- _summaryThread_m.lock();
|
|
|
|
- if (_summaryThreadToDo.empty()) {
|
|
|
|
- _summaryThread_m.unlock();
|
|
|
|
- break;
|
|
|
|
|
|
+ for(std::unordered_map<uint64_t,_NW>::iterator n(_networks.begin());n!=_networks.end();++n)
|
|
|
|
+ _summaryThreadToDo.push_back(n->first);
|
|
|
|
+
|
|
|
|
+ if (_summaryThreadToDo.size() > 0) {
|
|
|
|
+ _summaryThread = Thread::start(this);
|
|
|
|
+ } else {
|
|
|
|
+ _dataReady = true;
|
|
|
|
+ _networks_m.unlock();
|
|
}
|
|
}
|
|
- _summaryThread_m.unlock();
|
|
|
|
- Thread::sleep(50);
|
|
|
|
|
|
+ } else {
|
|
|
|
+ // In IPC mode we wait for the first message to start, and we start
|
|
|
|
+ // this thread since this thread is responsible for reading from stdin.
|
|
|
|
+ _summaryThread = Thread::start(this);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
JSONDB::~JSONDB()
|
|
JSONDB::~JSONDB()
|
|
{
|
|
{
|
|
- {
|
|
|
|
- Mutex::Lock _l(_networks_m);
|
|
|
|
- _networks.clear();
|
|
|
|
- }
|
|
|
|
Thread t;
|
|
Thread t;
|
|
{
|
|
{
|
|
Mutex::Lock _l(_summaryThread_m);
|
|
Mutex::Lock _l(_summaryThread_m);
|
|
@@ -119,11 +120,11 @@ bool JSONDB::writeRaw(const std::string &n,const std::string &obj)
|
|
#ifndef __WINDOWS__
|
|
#ifndef __WINDOWS__
|
|
if (obj.length() > 0) {
|
|
if (obj.length() > 0) {
|
|
Mutex::Lock _l(_rawLock);
|
|
Mutex::Lock _l(_rawLock);
|
|
- if (write(_rawOutput,obj.c_str(),obj.length() + 1) > 0)
|
|
|
|
- return true;
|
|
|
|
- } else {
|
|
|
|
- return true;
|
|
|
|
- }
|
|
|
|
|
|
+ if ((long)write(_rawOutput,obj.data(),obj.length()) == (long)obj.length()) {
|
|
|
|
+ if (write(_rawOutput,"\n",1) == 1)
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
+ } else return true;
|
|
#endif
|
|
#endif
|
|
return false;
|
|
return false;
|
|
} else if (_httpAddr) {
|
|
} else if (_httpAddr) {
|
|
@@ -202,7 +203,7 @@ void JSONDB::saveNetwork(const uint64_t networkId,const nlohmann::json &networkC
|
|
{
|
|
{
|
|
char n[64];
|
|
char n[64];
|
|
OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx",(unsigned long long)networkId);
|
|
OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx",(unsigned long long)networkId);
|
|
- writeRaw(n,OSUtils::jsonDump(networkConfig));
|
|
|
|
|
|
+ writeRaw(n,OSUtils::jsonDump(networkConfig,-1));
|
|
{
|
|
{
|
|
Mutex::Lock _l(_networks_m);
|
|
Mutex::Lock _l(_networks_m);
|
|
_networks[networkId].config = nlohmann::json::to_msgpack(networkConfig);
|
|
_networks[networkId].config = nlohmann::json::to_msgpack(networkConfig);
|
|
@@ -214,7 +215,7 @@ void JSONDB::saveNetworkMember(const uint64_t networkId,const uint64_t nodeId,co
|
|
{
|
|
{
|
|
char n[256];
|
|
char n[256];
|
|
OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx/member/%.10llx",(unsigned long long)networkId,(unsigned long long)nodeId);
|
|
OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx/member/%.10llx",(unsigned long long)networkId,(unsigned long long)nodeId);
|
|
- writeRaw(n,OSUtils::jsonDump(memberConfig));
|
|
|
|
|
|
+ writeRaw(n,OSUtils::jsonDump(memberConfig,-1));
|
|
{
|
|
{
|
|
Mutex::Lock _l(_networks_m);
|
|
Mutex::Lock _l(_networks_m);
|
|
_networks[networkId].members[nodeId] = nlohmann::json::to_msgpack(memberConfig);
|
|
_networks[networkId].members[nodeId] = nlohmann::json::to_msgpack(memberConfig);
|
|
@@ -310,6 +311,7 @@ void JSONDB::threadMain()
|
|
std::string rawInputBuf;
|
|
std::string rawInputBuf;
|
|
FD_ZERO(&readfds);
|
|
FD_ZERO(&readfds);
|
|
FD_ZERO(&nullfds);
|
|
FD_ZERO(&nullfds);
|
|
|
|
+ struct timeval tv;
|
|
#endif
|
|
#endif
|
|
|
|
|
|
std::vector<uint64_t> todo;
|
|
std::vector<uint64_t> todo;
|
|
@@ -317,24 +319,42 @@ void JSONDB::threadMain()
|
|
while (_summaryThreadRun) {
|
|
while (_summaryThreadRun) {
|
|
#ifndef __WINDOWS__
|
|
#ifndef __WINDOWS__
|
|
if (_rawInput < 0) {
|
|
if (_rawInput < 0) {
|
|
|
|
+ // In HTTP and filesystem mode we just wait for summary to-do items
|
|
Thread::sleep(25);
|
|
Thread::sleep(25);
|
|
} else {
|
|
} else {
|
|
|
|
+ // In IPC mode we wait but also select() on STDIN to read database updates
|
|
FD_SET(_rawInput,&readfds);
|
|
FD_SET(_rawInput,&readfds);
|
|
- struct timeval tv;
|
|
|
|
tv.tv_sec = 0;
|
|
tv.tv_sec = 0;
|
|
tv.tv_usec = 25000;
|
|
tv.tv_usec = 25000;
|
|
select(_rawInput+1,&readfds,&nullfds,&nullfds,&tv);
|
|
select(_rawInput+1,&readfds,&nullfds,&nullfds,&tv);
|
|
if (FD_ISSET(_rawInput,&readfds)) {
|
|
if (FD_ISSET(_rawInput,&readfds)) {
|
|
const long rn = (long)read(_rawInput,readbuf,1048576);
|
|
const long rn = (long)read(_rawInput,readbuf,1048576);
|
|
|
|
+ bool gotMessage = false;
|
|
for(long i=0;i<rn;++i) {
|
|
for(long i=0;i<rn;++i) {
|
|
- if (readbuf[i]) {
|
|
|
|
|
|
+ if ((readbuf[i] != '\n')&&(readbuf[i] != '\r')&&(readbuf[i] != 0)) { // compatible with nodeJS IPC
|
|
rawInputBuf.push_back(readbuf[i]);
|
|
rawInputBuf.push_back(readbuf[i]);
|
|
} else if (rawInputBuf.length() > 0) {
|
|
} else if (rawInputBuf.length() > 0) {
|
|
- _add(OSUtils::jsonParse(rawInputBuf));
|
|
|
|
|
|
+ try {
|
|
|
|
+ const nlohmann::json obj(OSUtils::jsonParse(rawInputBuf));
|
|
|
|
+
|
|
|
|
+ gotMessage = true;
|
|
|
|
+ if (!_dataReady) {
|
|
|
|
+ _dataReady = true;
|
|
|
|
+ _networks_m.unlock();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (obj.is_array()) {
|
|
|
|
+ for(unsigned long i=0;i<obj.size();++i)
|
|
|
|
+ _add(obj[i]);
|
|
|
|
+ } else if (obj.is_object()) {
|
|
|
|
+ _add(obj);
|
|
|
|
+ }
|
|
|
|
+ } catch ( ... ) {} // ignore malformed JSON
|
|
rawInputBuf.clear();
|
|
rawInputBuf.clear();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- continue; // we only want to do the stuff below this every few dozen ms or so, so pause again
|
|
|
|
|
|
+ if (!gotMessage) // select() again immediately until we get at least one full message
|
|
|
|
+ continue;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
#else
|
|
#else
|
|
@@ -347,73 +367,83 @@ void JSONDB::threadMain()
|
|
continue;
|
|
continue;
|
|
else _summaryThreadToDo.swap(todo);
|
|
else _summaryThreadToDo.swap(todo);
|
|
}
|
|
}
|
|
- const uint64_t now = OSUtils::now();
|
|
|
|
- for(std::vector<uint64_t>::iterator ii(todo.begin());ii!=todo.end();++ii) {
|
|
|
|
- const uint64_t networkId = *ii;
|
|
|
|
|
|
|
|
|
|
+ if (!_dataReady) {
|
|
|
|
+ _dataReady = true;
|
|
|
|
+ _networks_m.unlock();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ const uint64_t now = OSUtils::now();
|
|
|
|
+ try {
|
|
Mutex::Lock _l(_networks_m);
|
|
Mutex::Lock _l(_networks_m);
|
|
- std::unordered_map<uint64_t,_NW>::iterator n(_networks.find(networkId));
|
|
|
|
- if (n != _networks.end()) {
|
|
|
|
- NetworkSummaryInfo &ns = n->second.summaryInfo;
|
|
|
|
- ns.activeBridges.clear();
|
|
|
|
- ns.allocatedIps.clear();
|
|
|
|
- ns.authorizedMemberCount = 0;
|
|
|
|
- ns.activeMemberCount = 0;
|
|
|
|
- ns.totalMemberCount = 0;
|
|
|
|
- ns.mostRecentDeauthTime = 0;
|
|
|
|
-
|
|
|
|
- for(std::unordered_map< uint64_t,std::vector<uint8_t> >::const_iterator m(n->second.members.begin());m!=n->second.members.end();++m) {
|
|
|
|
- try {
|
|
|
|
- nlohmann::json member(nlohmann::json::from_msgpack(m->second));
|
|
|
|
-
|
|
|
|
- if (OSUtils::jsonBool(member["authorized"],false)) {
|
|
|
|
- ++ns.authorizedMemberCount;
|
|
|
|
-
|
|
|
|
- try {
|
|
|
|
- const nlohmann::json &mlog = member["recentLog"];
|
|
|
|
- if ((mlog.is_array())&&(mlog.size() > 0)) {
|
|
|
|
- const nlohmann::json &mlog1 = mlog[0];
|
|
|
|
- if (mlog1.is_object()) {
|
|
|
|
- if ((now - OSUtils::jsonInt(mlog1["ts"],0ULL)) < (ZT_NETWORK_AUTOCONF_DELAY * 2))
|
|
|
|
- ++ns.activeMemberCount;
|
|
|
|
|
|
+ for(std::vector<uint64_t>::iterator ii(todo.begin());ii!=todo.end();++ii) {
|
|
|
|
+ const uint64_t networkId = *ii;
|
|
|
|
+ std::unordered_map<uint64_t,_NW>::iterator n(_networks.find(networkId));
|
|
|
|
+ if (n != _networks.end()) {
|
|
|
|
+ NetworkSummaryInfo &ns = n->second.summaryInfo;
|
|
|
|
+ ns.activeBridges.clear();
|
|
|
|
+ ns.allocatedIps.clear();
|
|
|
|
+ ns.authorizedMemberCount = 0;
|
|
|
|
+ ns.activeMemberCount = 0;
|
|
|
|
+ ns.totalMemberCount = 0;
|
|
|
|
+ ns.mostRecentDeauthTime = 0;
|
|
|
|
+
|
|
|
|
+ for(std::unordered_map< uint64_t,std::vector<uint8_t> >::const_iterator m(n->second.members.begin());m!=n->second.members.end();++m) {
|
|
|
|
+ try {
|
|
|
|
+ nlohmann::json member(nlohmann::json::from_msgpack(m->second));
|
|
|
|
+
|
|
|
|
+ if (OSUtils::jsonBool(member["authorized"],false)) {
|
|
|
|
+ ++ns.authorizedMemberCount;
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ const nlohmann::json &mlog = member["recentLog"];
|
|
|
|
+ if ((mlog.is_array())&&(mlog.size() > 0)) {
|
|
|
|
+ const nlohmann::json &mlog1 = mlog[0];
|
|
|
|
+ if (mlog1.is_object()) {
|
|
|
|
+ if ((now - OSUtils::jsonInt(mlog1["ts"],0ULL)) < (ZT_NETWORK_AUTOCONF_DELAY * 2))
|
|
|
|
+ ++ns.activeMemberCount;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- }
|
|
|
|
- } catch ( ... ) {}
|
|
|
|
-
|
|
|
|
- try {
|
|
|
|
- if (OSUtils::jsonBool(member["activeBridge"],false))
|
|
|
|
- ns.activeBridges.push_back(Address(m->first));
|
|
|
|
- } catch ( ... ) {}
|
|
|
|
-
|
|
|
|
- try {
|
|
|
|
- const nlohmann::json &mips = member["ipAssignments"];
|
|
|
|
- if (mips.is_array()) {
|
|
|
|
- for(unsigned long i=0;i<mips.size();++i) {
|
|
|
|
- InetAddress mip(OSUtils::jsonString(mips[i],"").c_str());
|
|
|
|
- if ((mip.ss_family == AF_INET)||(mip.ss_family == AF_INET6))
|
|
|
|
- ns.allocatedIps.push_back(mip);
|
|
|
|
|
|
+ } catch ( ... ) {}
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ if (OSUtils::jsonBool(member["activeBridge"],false))
|
|
|
|
+ ns.activeBridges.push_back(Address(m->first));
|
|
|
|
+ } catch ( ... ) {}
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ const nlohmann::json &mips = member["ipAssignments"];
|
|
|
|
+ if (mips.is_array()) {
|
|
|
|
+ for(unsigned long i=0;i<mips.size();++i) {
|
|
|
|
+ InetAddress mip(OSUtils::jsonString(mips[i],"").c_str());
|
|
|
|
+ if ((mip.ss_family == AF_INET)||(mip.ss_family == AF_INET6))
|
|
|
|
+ ns.allocatedIps.push_back(mip);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- }
|
|
|
|
- } catch ( ... ) {}
|
|
|
|
- } else {
|
|
|
|
- try {
|
|
|
|
- ns.mostRecentDeauthTime = std::max(ns.mostRecentDeauthTime,OSUtils::jsonInt(member["lastDeauthorizedTime"],0ULL));
|
|
|
|
- } catch ( ... ) {}
|
|
|
|
- }
|
|
|
|
- ++ns.totalMemberCount;
|
|
|
|
- } catch ( ... ) {}
|
|
|
|
- }
|
|
|
|
|
|
+ } catch ( ... ) {}
|
|
|
|
+ } else {
|
|
|
|
+ try {
|
|
|
|
+ ns.mostRecentDeauthTime = std::max(ns.mostRecentDeauthTime,OSUtils::jsonInt(member["lastDeauthorizedTime"],0ULL));
|
|
|
|
+ } catch ( ... ) {}
|
|
|
|
+ }
|
|
|
|
+ ++ns.totalMemberCount;
|
|
|
|
+ } catch ( ... ) {}
|
|
|
|
+ }
|
|
|
|
|
|
- std::sort(ns.activeBridges.begin(),ns.activeBridges.end());
|
|
|
|
- std::sort(ns.allocatedIps.begin(),ns.allocatedIps.end());
|
|
|
|
|
|
+ std::sort(ns.activeBridges.begin(),ns.activeBridges.end());
|
|
|
|
+ std::sort(ns.allocatedIps.begin(),ns.allocatedIps.end());
|
|
|
|
|
|
- n->second.summaryInfoLastComputed = now;
|
|
|
|
|
|
+ n->second.summaryInfoLastComputed = now;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- }
|
|
|
|
|
|
+ } catch ( ... ) {}
|
|
|
|
|
|
todo.clear();
|
|
todo.clear();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ if (!_dataReady) // sanity check
|
|
|
|
+ _networks_m.unlock();
|
|
|
|
+
|
|
#ifndef __WINDOWS__
|
|
#ifndef __WINDOWS__
|
|
delete [] readbuf;
|
|
delete [] readbuf;
|
|
#endif
|
|
#endif
|