Browse Source

Process management improvements

Josh Engebretson 10 years ago
parent
commit
ca3e17cbc1

+ 37 - 0
Source/Atomic/IPC/IPC.cpp

@@ -4,6 +4,7 @@
 #endif
 #endif
 
 
 #include "../Core/CoreEvents.h"
 #include "../Core/CoreEvents.h"
+#include "../Engine/Engine.h"
 #include "../IO/Log.h"
 #include "../IO/Log.h"
 
 
 #include "IPCBroker.h"
 #include "IPCBroker.h"
@@ -21,7 +22,15 @@ IPC::IPC(Context* context) : Object(context)
 
 
 IPC::~IPC()
 IPC::~IPC()
 {
 {
+    for (unsigned i = 0; i < brokers_.Size(); i++)
+        brokers_[i]->Stop();
 
 
+    brokers_.Clear();
+
+    if (worker_.NotNull())
+        worker_->Stop();
+
+    worker_ = 0;
 }
 }
 
 
 bool IPC::InitWorker(int fd1, int fd2)
 bool IPC::InitWorker(int fd1, int fd2)
@@ -65,6 +74,34 @@ void IPC::SendEventToBroker(StringHash eventType, VariantMap& eventData)
 
 
 void IPC::HandleUpdate(StringHash eventType, VariantMap& eventData)
 void IPC::HandleUpdate(StringHash eventType, VariantMap& eventData)
 {
 {
+    // If we're a worker, if update fails, time to exit
+    if (worker_.NotNull())
+    {
+        if (!worker_->Update())
+        {
+            worker_ = 0;
+            GetSubsystem<Engine>()->Exit();
+            return;
+        }
+    }
+
+    // Update brokers
+    Vector<IPCBroker*> remove;
+
+    for (unsigned i = 0; i < brokers_.Size(); i++)
+    {
+        SharedPtr<IPCBroker>& broker = brokers_[i];
+        if (!broker->Update())
+        {
+            remove.Push(broker);
+        }
+    }
+
+    for (unsigned i = 0; i < remove.Size(); i++)
+    {
+        brokers_.Remove(SharedPtr<IPCBroker>(remove[i]));
+    }
+
     eventMutex_.Acquire();
     eventMutex_.Acquire();
 
 
     for (List<QueuedEvent>::Iterator itr = queuedEvents_.Begin(); itr != queuedEvents_.End(); itr++)
     for (List<QueuedEvent>::Iterator itr = queuedEvents_.Begin(); itr != queuedEvents_.End(); itr++)

+ 4 - 0
Source/Atomic/IPC/IPC.h

@@ -28,10 +28,13 @@ public:
     /// Destruct.
     /// Destruct.
     virtual ~IPC();
     virtual ~IPC();
 
 
+    // queues an event from a worker or broker receiving thread
     void QueueEvent(StringHash eventType, VariantMap& eventData);
     void QueueEvent(StringHash eventType, VariantMap& eventData);
 
 
+    // for a child worker process
     bool InitWorker(int fd1, int fd2);
     bool InitWorker(int fd1, int fd2);
 
 
+    // spawn a worker process
     IPCBroker* SpawnWorker(const String& command, const Vector<String>& args, const String& initialDirectory = "");
     IPCBroker* SpawnWorker(const String& command, const Vector<String>& args, const String& initialDirectory = "");
 
 
     // worker -> broker
     // worker -> broker
@@ -40,6 +43,7 @@ public:
 
 
 private:
 private:
 
 
+    // processes queued events
     void HandleUpdate(StringHash eventType, VariantMap& eventData);
     void HandleUpdate(StringHash eventType, VariantMap& eventData);
 
 
     mutable Mutex eventMutex_;
     mutable Mutex eventMutex_;

+ 28 - 8
Source/Atomic/IPC/IPCBroker.cpp

@@ -18,15 +18,20 @@ IPCBroker::IPCBroker(Context* context) : IPCChannel(context)
 
 
 IPCBroker::~IPCBroker()
 IPCBroker::~IPCBroker()
 {
 {
+
 }
 }
 
 
 void IPCBroker::ThreadFunction()
 void IPCBroker::ThreadFunction()
 {
 {
     while (shouldRun_)
     while (shouldRun_)
     {
     {
-        if (!Receive())
+        if (!otherProcess_->IsRunning())
         {
         {
-            Stop();
+            break;
+        }
+
+        if (!Receive())
+        {            
             break;
             break;
         }
         }
     }
     }
@@ -35,13 +40,28 @@ void IPCBroker::ThreadFunction()
 
 
 }
 }
 
 
+bool IPCBroker::Update()
+{
+
+    if (otherProcess_.Null())
+        return false;
+
+    if (!shouldRun_)
+    {
+        Stop();
+        return false;
+    }
+
+    return true;
+}
+
 bool IPCBroker::SpawnWorker(const String& command, const Vector<String>& args, const String& initialDirectory)
 bool IPCBroker::SpawnWorker(const String& command, const Vector<String>& args, const String& initialDirectory)
 {
 {
     Vector<String> pargs;
     Vector<String> pargs;
 
 
-    workerProcess_ = new IPCProcess(context_);
+    otherProcess_ = new IPCProcess(context_, pp_.fd1(), pp_.fd2());
 
 
-    transport_.OpenServer(workerProcess_->fd1());
+    transport_.OpenServer(otherProcess_->fd1());
 
 
     // copy args
     // copy args
     for (unsigned i = 0; i < args.Size(); i++)
     for (unsigned i = 0; i < args.Size(); i++)
@@ -55,14 +75,14 @@ bool IPCBroker::SpawnWorker(const String& command, const Vector<String>& args, c
         writable_cmdline += kCmdLinePipeEq + std::wstring(pipe_num);
         writable_cmdline += kCmdLinePipeEq + std::wstring(pipe_num);
     */
     */
 #else
 #else
-    pargs.Push(ToString("--ipc-server=%i", workerProcess_->fd1()));
-    pargs.Push(ToString("--ipc-client=%i", workerProcess_->fd2()));
+    pargs.Push(ToString("--ipc-server=%i", pp_.fd1()));
+    pargs.Push(ToString("--ipc-client=%i", pp_.fd2()));
 #endif
 #endif
 
 
-    if (!workerProcess_->Launch(command, pargs, initialDirectory))
+    if (!otherProcess_->Launch(command, pargs, initialDirectory))
         return false;
         return false;
 
 
-    close(workerProcess_->fd2());
+    close(pp_.fd2());
 
 
     return Run();
     return Run();
 
 

+ 4 - 1
Source/Atomic/IPC/IPCBroker.h

@@ -22,11 +22,14 @@ public:
 
 
     void ThreadFunction();
     void ThreadFunction();
 
 
+    bool Update();
+
 private:
 private:
 
 
     bool SpawnWorker(const String& command, const Vector<String>& args, const String& initialDirectory = "");
     bool SpawnWorker(const String& command, const Vector<String>& args, const String& initialDirectory = "");
 
 
-    SharedPtr<IPCProcess> workerProcess_;
+    // broker instantiates the pipe pair
+    PipePair pp_;
 
 
 };
 };
 
 

+ 5 - 1
Source/Atomic/IPC/IPCChannel.h

@@ -32,8 +32,12 @@ protected:
 
 
     // for access from thread
     // for access from thread
     WeakPtr<IPC> ipc_;
     WeakPtr<IPC> ipc_;
-    PipeTransport transport_;
 
 
+    // for brokers this is the worker process
+    // for workers, the broker process
+    SharedPtr<IPCProcess> otherProcess_;
+
+    PipeTransport transport_;
     IPCMessageHeader currentHeader_;
     IPCMessageHeader currentHeader_;
     VectorBuffer dataBuffer_;
     VectorBuffer dataBuffer_;
 
 

+ 7 - 0
Source/Atomic/IPC/IPCEvents.h

@@ -11,6 +11,13 @@ EVENT(E_IPCWORKERSTART, WorkerStart)
 
 
 }
 }
 
 
+/// Worker exited
+EVENT(E_IPCWORKEREXIT, WorkerExit)
+{
+    PARAM(P_BROKER, Broker);   // Broker*
+    PARAM(P_EXITCODE, ExitCode);   // int
+}
+
 /// Worker start
 /// Worker start
 EVENT(E_IPCHELLOFROMBROKER, HelloFromBroker)
 EVENT(E_IPCHELLOFROMBROKER, HelloFromBroker)
 {
 {

+ 112 - 64
Source/Atomic/IPC/IPCUnix.cpp

@@ -7,112 +7,135 @@ namespace Atomic
 {
 {
 
 
 #include <unistd.h>
 #include <unistd.h>
+#include <signal.h>
 #include <sys/socket.h>
 #include <sys/socket.h>
+#include <libproc.h>
 #include <errno.h>
 #include <errno.h>
 
 
 #define HANDLE_EINTR(x) ({ \
 #define HANDLE_EINTR(x) ({ \
-typeof(x) __eintr_result__; \
-do { \
-__eintr_result__ = x; \
+    typeof(x) __eintr_result__; \
+    do { \
+    __eintr_result__ = x; \
 } while (__eintr_result__ == -1 && errno == EINTR); \
 } while (__eintr_result__ == -1 && errno == EINTR); \
-__eintr_result__;\
+    __eintr_result__;\
 })
 })
 
 
 bool SilenceSocket(int fd) {
 bool SilenceSocket(int fd) {
-  int nosigpipe = 1;
-  // On OSX an attempt to read or write to a closed socket may generate a
-  // SIGPIPE rather than returning -1.  setsockopt will shut this off.
-  if (0 != setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE,
-                      &nosigpipe, sizeof nosigpipe)) {
-    return false;
-  }
-  return true;
+    int nosigpipe = 1;
+    // On OSX an attempt to read or write to a closed socket may generate a
+    // SIGPIPE rather than returning -1.  setsockopt will shut this off.
+    if (0 != setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE,
+                        &nosigpipe, sizeof nosigpipe)) {
+        return false;
+    }
+    return true;
 }
 }
 
 
 size_t ReadFromFD(int fd, char* buffer, size_t bytes) {
 size_t ReadFromFD(int fd, char* buffer, size_t bytes) {
-  ssize_t bytes_read =
-  HANDLE_EINTR(read(fd, buffer, bytes));
-  if (bytes_read < 0) {
-    return -1;
-  }
-  return bytes_read;
+
+    fd_set set;
+    struct timeval timeout;
+    FD_ZERO(&set);
+    FD_SET(fd, &set);
+
+    timeout.tv_sec = 0;
+    // 100ms
+    timeout.tv_usec = 100000;
+
+    // check if there is anything to read
+    int rv = select(fd + 1, &set, NULL, NULL, &timeout);
+
+    if (rv < 0)
+        return -1;
+
+    if (!rv)
+        return 0;
+
+    ssize_t bytes_read =
+            HANDLE_EINTR(read(fd, buffer, bytes));
+    if (bytes_read < 0) {
+        return -1;
+    }
+    return bytes_read;
 }
 }
 
 
 size_t WriteToFD(int fd, const char* data, size_t size) {
 size_t WriteToFD(int fd, const char* data, size_t size) {
-  // Allow for partial writes.
-  ssize_t written_total = 0;
-  for (ssize_t written_partial = 0; written_total < size; written_total += written_partial) {
-    written_partial =
-    HANDLE_EINTR(write(fd, data + written_total, size - written_total));
-    if (written_partial < 0) {
-      return -1;
+    // Allow for partial writes.
+    ssize_t written_total = 0;
+    for (ssize_t written_partial = 0; written_total < size; written_total += written_partial) {
+        written_partial =
+                HANDLE_EINTR(write(fd, data + written_total, size - written_total));
+        if (written_partial < 0) {
+            return -1;
+        }
     }
     }
-  }
-  return written_total;
+    return written_total;
 }
 }
 
 
 PipePair::PipePair() {
 PipePair::PipePair() {
-  fd_[0] = -1;
-  fd_[1] = -1;
+    fd_[0] = -1;
+    fd_[1] = -1;
 
 
-  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd_) !=0) {
-    return;
-  }
+    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd_) !=0) {
+        return;
+    }
 };
 };
 
 
 PipeUnix::PipeUnix() : fd_(-1) {
 PipeUnix::PipeUnix() : fd_(-1) {
 }
 }
 
 
 bool PipeUnix::OpenClient(int fd) {
 bool PipeUnix::OpenClient(int fd) {
-  if (!SilenceSocket(fd)) {
-    return false;
-  }
-  fd_ = fd;
-  return true;
+    if (!SilenceSocket(fd)) {
+        return false;
+    }
+    fd_ = fd;
+    return true;
 }
 }
 
 
 bool PipeUnix::OpenServer(int fd) {
 bool PipeUnix::OpenServer(int fd) {
-  if (!SilenceSocket(fd)) {
-    return false;
-  }
-  fd_ = fd;
-  return true;
+    if (!SilenceSocket(fd)) {
+        return false;
+    }
+    fd_ = fd;
+    return true;
 }
 }
 
 
 
 
 bool PipeUnix::Write(const void* buf, size_t sz) {
 bool PipeUnix::Write(const void* buf, size_t sz) {
-  if (sz == -1) {
-    return false;
-  }
-  size_t written = WriteToFD(fd_, static_cast<const char*> (buf), sz);
-  return (sz == written);
+    if (sz == -1) {
+        return false;
+    }
+    size_t written = WriteToFD(fd_, static_cast<const char*> (buf), sz);
+    return (sz == written);
 }
 }
 
 
 bool PipeUnix::Read(void* buf, size_t* sz) {
 bool PipeUnix::Read(void* buf, size_t* sz) {
-  size_t read = ReadFromFD(fd_, static_cast<char*> (buf), *sz);
-  if (read == -1) {
-    return false;
-  }
-  *sz = read;
-  return true;
+    size_t read = ReadFromFD(fd_, static_cast<char*> (buf), *sz);
+    if (read == -1) {
+        return false;
+    }
+    *sz = read;
+    return true;
 }
 }
 
 
 
 
 char* PipeTransport::Receive(size_t* size) {
 char* PipeTransport::Receive(size_t* size) {
-  if (buf_.Size() < kBufferSz) {
-    buf_.Resize(kBufferSz);
-  }
-
-  *size = kBufferSz;
-  if (!Read(&buf_[0], size)) {
-    return NULL;
-  }
-  return &buf_[0];
+    if (buf_.Size() < kBufferSz) {
+        buf_.Resize(kBufferSz);
+    }
+
+    *size = kBufferSz;
+    if (!Read(&buf_[0], size)) {
+        return NULL;
+    }
+    return &buf_[0];
 }
 }
 
 
 
 
-IPCProcess::IPCProcess(Context* context) : Object(context),
-    pid_(-1)
+IPCProcess::IPCProcess(Context* context, int fd1, int fd2, int pid) : Object(context),
+    pid_(pid),
+    fd1_(fd1),
+    fd2_(fd2)
 {
 {
 }
 }
 
 
@@ -121,9 +144,34 @@ IPCProcess::~IPCProcess()
 
 
 }
 }
 
 
+bool IPCProcess::IsRunning()
+{
+    if (pid_ == -1)
+        return false;
+
+#ifdef __APPLE__
+    char pathbuf[PROC_PIDPATHINFO_MAXSIZE];
+    int ret = proc_pidpath (pid_, pathbuf, sizeof(pathbuf));
+    if ( ret > 0 )
+    {
+        return true;
+    }
+#else
+
+    // this doesn't seem to work on OSX?
+    if (kill(pid_, 0) == 0)
+        return true;
+#endif
+
+    return false;
+
+}
+
 bool IPCProcess::Launch(const String& command, const Vector<String>& args, const String& initialDirectory)
 bool IPCProcess::Launch(const String& command, const Vector<String>& args, const String& initialDirectory)
 {
 {
 
 
+    assert(pid_ == -1);
+
     // We must not allocated memory after fork(),
     // We must not allocated memory after fork(),
     // therefore allocate all required buffers first.
     // therefore allocate all required buffers first.
 
 

+ 8 - 6
Source/Atomic/IPC/IPCUnix.h

@@ -51,26 +51,28 @@ private:
   PODVector<char> buf_;
   PODVector<char> buf_;
 };
 };
 
 
-// held by broker
 class IPCProcess : public Object
 class IPCProcess : public Object
 {
 {
     OBJECT(IPCProcess)
     OBJECT(IPCProcess)
 
 
 public:
 public:
 
 
-    IPCProcess(Context* context);
+    IPCProcess(Context* context, int fd1, int fd2, int pid = -1);
+
     virtual ~IPCProcess();
     virtual ~IPCProcess();
 
 
-    int fd1() const { return pp_.fd1(); }
-    int fd2() const { return pp_.fd2(); }
+    bool IsRunning();
+
+    int fd1() const { return fd1_; }
+    int fd2() const { return fd2_; }
 
 
     bool Launch(const String& command, const Vector<String>& args, const String& initialDirectory);
     bool Launch(const String& command, const Vector<String>& args, const String& initialDirectory);
 
 
 private:
 private:
 
 
     int pid_;
     int pid_;
-    PipePair pp_;
-
+    int fd1_;
+    int fd2_;
 };
 };
 
 
 
 

+ 29 - 4
Source/Atomic/IPC/IPCWorker.cpp

@@ -5,12 +5,20 @@
 #include "IPCMessage.h"
 #include "IPCMessage.h"
 #include "IPCUnix.h"
 #include "IPCUnix.h"
 
 
+#ifdef ATOMIC_PLATFORM_WINDOWS
+#else
+    #include <unistd.h>
+#endif
+
 namespace Atomic
 namespace Atomic
 {
 {
 
 
 IPCWorker::IPCWorker(int fd, Context* context) : IPCChannel(context),
 IPCWorker::IPCWorker(int fd, Context* context) : IPCChannel(context),
     fd_(fd)
     fd_(fd)
 {
 {
+
+    otherProcess_ = new IPCProcess(context_, -1, fd, getppid());
+
     if (!transport_.OpenClient(fd_))
     if (!transport_.OpenClient(fd_))
     {
     {
         LOGERRORF("Unable to open IPC transport fd = %i", fd_);
         LOGERRORF("Unable to open IPC transport fd = %i", fd_);
@@ -20,7 +28,6 @@ IPCWorker::IPCWorker(int fd, Context* context) : IPCChannel(context),
 
 
     LOGERRORF("Opened IPC transport fd = %i", fd_);
     LOGERRORF("Opened IPC transport fd = %i", fd_);
 
 
-
 }
 }
 
 
 IPCWorker::~IPCWorker()
 IPCWorker::~IPCWorker()
@@ -28,18 +35,36 @@ IPCWorker::~IPCWorker()
 
 
 }
 }
 
 
-void IPCWorker::ThreadFunction()
+bool IPCWorker::Update()
 {
 {
+    if (otherProcess_.Null())
+        return false;
+
+    if (!shouldRun_)
+    {
+        Stop();
+        return false;
+    }
 
 
-    while(shouldRun_)
+    return true;
+}
+
+void IPCWorker::ThreadFunction()
+{
+    while (shouldRun_)
     {
     {
+        if (!otherProcess_->IsRunning())
+        {
+            break;
+        }
+
         if (!Receive())
         if (!Receive())
         {
         {
-            Stop();
             break;
             break;
         }
         }
     }
     }
 
 
+    shouldRun_ = false;
 }
 }
 
 
 }
 }

+ 2 - 0
Source/Atomic/IPC/IPCWorker.h

@@ -19,6 +19,8 @@ public:
 
 
     void ThreadFunction();
     void ThreadFunction();
 
 
+    bool Update();
+
 private:
 private:
 
 
     int fd_;
     int fd_;