Browse Source

Fix IPC issue on Windows which was causing reads to hang and a problem at exit

JoshEngebretson 10 years ago
parent
commit
83cf8f05c0

+ 1 - 1
Source/Atomic/IPC/IPCChannel.cpp

@@ -60,7 +60,7 @@ bool IPCChannel::Receive()
         return true;
 
     dataBuffer_.Seek(dataBuffer_.GetSize());
-    dataBuffer_.Write(data, sz);
+    dataBuffer_.Write(data, (unsigned) sz);
     dataBuffer_.Seek(0);
 
     while (true)

+ 188 - 147
Source/Atomic/IPC/IPCWindows.cpp

@@ -35,225 +35,266 @@ typedef std::wstring IPCWString;
 namespace Atomic
 {
 
-    static const wchar_t kPipePrefix[] = L"\\\\.\\pipe\\";
-    static const int kPipeBufferSz = 4 * 1024;
-    static LONG g_pipe_seq = 0;
+static const wchar_t kPipePrefix[] = L"\\\\.\\pipe\\";
+static const int kPipeBufferSz = 4 * 1024;
+static LONG g_pipe_seq = 0;
 
-    HANDLE PipePair::OpenPipeServer(const wchar_t* name, bool read)
-	{
-        IPCWString pipename(kPipePrefix);
-        pipename.append(name);
+HANDLE PipePair::OpenPipeServer(const wchar_t* name, bool read)
+{
+    IPCWString pipename(kPipePrefix);
+    pipename.append(name);
 
-		DWORD openMode = read ? PIPE_ACCESS_INBOUND : PIPE_ACCESS_OUTBOUND;
+    DWORD openMode = read ? PIPE_ACCESS_INBOUND : PIPE_ACCESS_OUTBOUND;
 
-        return ::CreateNamedPipeW(pipename.c_str(), openMode,
-            PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
-            1, kPipeBufferSz, kPipeBufferSz, 200, NULL);
-    }
+    return ::CreateNamedPipeW(pipename.c_str(), openMode,
+                              PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
+                              1, kPipeBufferSz, kPipeBufferSz, 200, NULL);
+}
 
-    HANDLE PipePair::OpenPipeClient(const wchar_t* name, bool read)
-	{
-        IPCWString pipename(kPipePrefix);
-        pipename.append(name);
+HANDLE PipePair::OpenPipeClient(const wchar_t* name, bool read)
+{
+    IPCWString pipename(kPipePrefix);
+    pipename.append(name);
 
-		SECURITY_ATTRIBUTES sa;
-		sa.bInheritHandle = TRUE;
-		sa.lpSecurityDescriptor = NULL;
-		sa.nLength = sizeof(SECURITY_ATTRIBUTES);
+    SECURITY_ATTRIBUTES sa;
+    sa.bInheritHandle = TRUE;
+    sa.lpSecurityDescriptor = NULL;
+    sa.nLength = sizeof(SECURITY_ATTRIBUTES);
 
-		DWORD accessMode = read ? GENERIC_READ : GENERIC_WRITE;
+    DWORD accessMode = read ? GENERIC_READ : GENERIC_WRITE;
 
-        for (;;) {
+    for (;;) {
 
-			HANDLE pipe = ::CreateFileW(pipename.c_str(), accessMode, 0, &sa,
-                OPEN_EXISTING, 0, NULL);
+        HANDLE pipe = ::CreateFileW(pipename.c_str(), accessMode, 0, &sa,
+                                    OPEN_EXISTING, 0, NULL);
 
-            if (INVALID_HANDLE_VALUE == pipe) {
-                if (ERROR_PIPE_BUSY != ::GetLastError()) {
-                    return pipe;
-                }
-                // wait and retry.
-                ::Sleep(25);
-            }
-            else {
-                // success.
+        if (INVALID_HANDLE_VALUE == pipe) {
+            if (ERROR_PIPE_BUSY != ::GetLastError()) {
                 return pipe;
             }
+            // wait and retry.
+            ::Sleep(25);
+        }
+        else {
+            // success.
+            return pipe;
         }
     }
+}
 
-    PipePair::PipePair() :
-        srvRead_(INVALID_IPCHANDLE_VALUE),
-		srvWrite_(INVALID_IPCHANDLE_VALUE),
-		clnRead_(INVALID_IPCHANDLE_VALUE),
-		clnWrite_(INVALID_IPCHANDLE_VALUE)
-    {
-        // Come up with a reasonable unique name.
-        const wchar_t kPipePattern[] = L"ko.%x.%x.%x";
+PipePair::PipePair() :
+    srvRead_(INVALID_IPCHANDLE_VALUE),
+    srvWrite_(INVALID_IPCHANDLE_VALUE),
+    clnRead_(INVALID_IPCHANDLE_VALUE),
+    clnWrite_(INVALID_IPCHANDLE_VALUE)
+{
+    // Come up with a reasonable unique name.
+    const wchar_t kPipePattern[] = L"ko.%x.%x.%x";
 
-        wchar_t serverReadName[8 * 3 + sizeof(kPipePattern)];
-        ::wsprintfW(serverReadName, kPipePattern, ::GetCurrentProcessId(), ::GetTickCount(),
-            ::InterlockedIncrement(&g_pipe_seq));
+    wchar_t serverReadName[8 * 3 + sizeof(kPipePattern)];
+    ::wsprintfW(serverReadName, kPipePattern, ::GetCurrentProcessId(), ::GetTickCount(),
+                ::InterlockedIncrement(&g_pipe_seq));
 
-		wchar_t serverWriteName[8 * 3 + sizeof(kPipePattern)];
-		::wsprintfW(serverWriteName, kPipePattern, ::GetCurrentProcessId(), ::GetTickCount() + 1,
-			::InterlockedIncrement(&g_pipe_seq));
+    wchar_t serverWriteName[8 * 3 + sizeof(kPipePattern)];
+    ::wsprintfW(serverWriteName, kPipePattern, ::GetCurrentProcessId(), ::GetTickCount() + 1,
+                ::InterlockedIncrement(&g_pipe_seq));
 
-        srvRead_ = OpenPipeServer(serverReadName, true);
-		srvWrite_ = OpenPipeServer(serverWriteName, false);
+    srvRead_ = OpenPipeServer(serverReadName, true);
+    srvWrite_ = OpenPipeServer(serverWriteName, false);
 
 
-        // Don't allow client impersonation.
-        clnRead_ = OpenPipeClient(serverWriteName, true);
-		clnWrite_ = OpenPipeClient(serverReadName, false);
+    // Don't allow client impersonation.
+    clnRead_ = OpenPipeClient(serverWriteName, true);
+    clnWrite_ = OpenPipeClient(serverReadName, false);
 
-		/*
+    /*
         if (INVALID_HANDLE_VALUE == client)
         {
             ::CloseHandle(server);
             return;
         }
-		*/
+        */
 
-		if (!::ConnectNamedPipe(srvRead_, NULL))
+    if (!::ConnectNamedPipe(srvRead_, NULL))
+    {
+        if (ERROR_PIPE_CONNECTED != ::GetLastError())
         {
-            if (ERROR_PIPE_CONNECTED != ::GetLastError())
-            {
-               // ::CloseHandle(server);
-                //::CloseHandle(client);
-                return;
-            }
+            // ::CloseHandle(server);
+            //::CloseHandle(client);
+            return;
         }
-
-		if (!::ConnectNamedPipe(srvWrite_, NULL))
-		{
-			if (ERROR_PIPE_CONNECTED != ::GetLastError())
-			{
-				// ::CloseHandle(server);
-				//::CloseHandle(client);
-				return;
-			}
-		}
-
     }
 
-    PipeWin::PipeWin() : pipeRead_(INVALID_IPCHANDLE_VALUE), pipeWrite_(INVALID_IPCHANDLE_VALUE)
+    if (!::ConnectNamedPipe(srvWrite_, NULL))
     {
-
-    }
-
-    PipeWin::~PipeWin()
-    {
-        if (pipeRead_ != INVALID_HANDLE_VALUE)
+        if (ERROR_PIPE_CONNECTED != ::GetLastError())
         {
-            ::DisconnectNamedPipe(pipeRead_);  // $$$ disconect is valid on the server side.
-            ::CloseHandle(pipeRead_);
+            // ::CloseHandle(server);
+            //::CloseHandle(client);
+            return;
         }
+    }
 
-		if (pipeWrite_ != INVALID_HANDLE_VALUE)
-		{
-			::DisconnectNamedPipe(pipeWrite_);  // $$$ disconect is valid on the server side.
-			::CloseHandle(pipeWrite_);
-		}
+}
 
-    }
+PipeWin::PipeWin() : pipeRead_(INVALID_IPCHANDLE_VALUE), pipeWrite_(INVALID_IPCHANDLE_VALUE), readerThread_(this)
+{
 
-    bool PipeWin::OpenClient(IPCHandle pipeRead, IPCHandle pipeWrite)
-    {
-        pipeRead_ = pipeRead;
-		pipeWrite_ = pipeWrite;
-        return true;
+}
+
+PipeWin::~PipeWin()
+{
+    readerThread_.Kill();
 
+    if (pipeRead_ != INVALID_HANDLE_VALUE)
+    {
+        ::DisconnectNamedPipe(pipeRead_);  // $$$ disconect is valid on the server side.
+        ::CloseHandle(pipeRead_);
     }
 
-    bool PipeWin::OpenServer(IPCHandle pipeRead, IPCHandle pipeWrite)
+    if (pipeWrite_ != INVALID_HANDLE_VALUE)
     {
-		pipeRead_ = pipeRead;
-		pipeWrite_ = pipeWrite;
-		return true;
-	}
+        ::DisconnectNamedPipe(pipeWrite_);  // $$$ disconect is valid on the server side.
+        ::CloseHandle(pipeWrite_);
+    }
 
+}
 
-    bool PipeWin::Write(const void* buf, size_t sz)
-    {
-        DWORD written = 0;
-        if (TRUE == ::WriteFile(pipeWrite_, buf, sz, &written, NULL))
-            return true;
+bool PipeWin::OpenClient(IPCHandle pipeRead, IPCHandle pipeWrite)
+{
+    pipeRead_ = pipeRead;
+    pipeWrite_ = pipeWrite;
 
-        return false;
+    readerThread_.Run();
+
+    return true;
+
+}
+
+bool PipeWin::OpenServer(IPCHandle pipeRead, IPCHandle pipeWrite)
+{
+    pipeRead_ = pipeRead;
+    pipeWrite_ = pipeWrite;
+
+    readerThread_.Run();
+
+    return true;
+}
+
+
+bool PipeWin::Write(const void* buf, size_t sz)
+{
+    DWORD written = 0;
+    if (TRUE == ::WriteFile(pipeWrite_, buf, (DWORD) sz, &written, NULL))
+        return true;
+
+    return false;
+}
+
+void PipeWin::ReaderThread::Kill()
+{
+    if (handle_)
+    {
+        BOOL result = TerminateThread((HANDLE)handle_, 0);
+        result = CloseHandle((HANDLE)handle_);
+        handle_ = 0;
     }
+}
 
-    bool PipeWin::Read(void* buf, size_t* sz)
+void PipeWin::ReaderThread::ThreadFunction()
+{
+    while(shouldRun_)
     {
-        if (TRUE == ::ReadFile(pipeRead_, buf, *sz, reinterpret_cast<DWORD*>(sz), NULL))
+        if (readSize_)
+            continue;
+
+        DWORD bytesRead = 0;
+        if (TRUE == ::ReadFile(pipeWin_->pipeRead_, &buf_[0], 4096, &bytesRead, NULL))
         {
-            return true;
+            readSize_ = (unsigned) bytesRead;
         }
 
-        return false;
     }
+}
+
+bool PipeWin::Read(void* buf, size_t* sz)
+{
 
-    char* PipeTransport::Receive(size_t* size)
+    *sz = 0;
+    if (readerThread_.readSize_)
     {
-        if (buf_.Size() < kBufferSz)
-        {
-            buf_.Resize(kBufferSz);
-        }
+        memcpy(buf, &readerThread_.buf_[0], readerThread_.readSize_);
+        *sz = readerThread_.readSize_;
+        readerThread_.readSize_ = 0;
+    }
 
-        *size = kBufferSz;
+    return true;
+}
 
-        if (!Read(&buf_[0], size))
-        {
-            return NULL;
-        }
-        return &buf_[0];
+char* PipeTransport::Receive(size_t* size)
+{
+    if (buf_.Size() < kBufferSz)
+    {
+        buf_.Resize(kBufferSz);
     }
 
+    *size = kBufferSz;
 
-    IPCProcess::IPCProcess(Context* context, IPCHandle clientRead, IPCHandle clientWrite, IPCHandle pid) : Object(context),
-        pid_(pid),
-        clientRead_(clientRead),
-        clientWrite_(clientWrite)
+    if (!Read(&buf_[0], size))
     {
+        return NULL;
     }
+    return &buf_[0];
+}
 
-    IPCProcess::~IPCProcess()
-    {
 
-    }
+IPCProcess::IPCProcess(Context* context, IPCHandle clientRead, IPCHandle clientWrite, IPCHandle pid) : Object(context),
+    pid_(pid),
+    clientRead_(clientRead),
+    clientWrite_(clientWrite)
+{
+}
 
-    bool IPCProcess::IsRunning()
-    {
-        return true;
+IPCProcess::~IPCProcess()
+{
 
-    }
+}
 
-    bool IPCProcess::Launch(const String& command, const Vector<String>& args, const String& initialDirectory)
-    {
-        STARTUPINFOW si = { sizeof(si) };
-        PROCESS_INFORMATION pi = { 0 };
+bool IPCProcess::IsRunning()
+{
+    DWORD exitCode;
+    if (!GetExitCodeProcess(pid_, &exitCode))
+        return false;
 
-        // CreateProcess wants a single string
-        String sargs;
-        sargs.Join(args, " ");
+    return exitCode == STILL_ACTIVE;
+}
 
-        // convert to wide
-        WString wcommand(command);
+bool IPCProcess::Launch(const String& command, const Vector<String>& args, const String& initialDirectory)
+{
+    STARTUPINFOW si = { sizeof(si) };
+    PROCESS_INFORMATION pi = { 0 };
 
-        // prepend the command and convert to wide
-        WString wargs("\"" + command + "\" " + sargs);
+    // CreateProcess wants a single string
+    String sargs;
+    sargs.Join(args, " ");
 
-        // The child process inherits the pipe handle.
-        if (!::CreateProcessW(wcommand.CString(), (LPWSTR) wargs.CString(), NULL, NULL, TRUE, 0, NULL, NULL, &si, &pi)) {
-            return false;
-        }
+    // convert to wide
+    WString wcommand(command);
 
-        pid_ = pi.hProcess;
-        ::CloseHandle(pi.hThread);
+    // prepend the command and convert to wide
+    WString wargs("\"" + command + "\" " + sargs);
 
-        return true;
+    // The child process inherits the pipe handle.
+    if (!::CreateProcessW(wcommand.CString(), (LPWSTR) wargs.CString(), NULL, NULL, TRUE, 0, NULL, NULL, &si, &pi)) {
+        return false;
     }
 
+    pid_ = pi.hProcess;
+    ::CloseHandle(pi.hThread);
+
+    return true;
+}
+
 }
 
 #endif

+ 31 - 7
Source/Atomic/IPC/IPCWindows.h

@@ -24,6 +24,8 @@
 
 #pragma once
 
+#include "../Core/Mutex.h"
+#include "../Core/Thread.h"
 #include "../Core/Object.h"
 #include "IPCTypes.h"
 
@@ -38,18 +40,18 @@ public:
     IPCHandle serverRead() const { return srvRead_; }
     IPCHandle serverWrite() const { return srvWrite_; }
 
-	IPCHandle clientRead() const { return clnRead_; }
-	IPCHandle clientWrite() const { return clnWrite_; }
+    IPCHandle clientRead() const { return clnRead_; }
+    IPCHandle clientWrite() const { return clnWrite_; }
 
     static IPCHandle OpenPipeServer(const wchar_t* name, bool read);
     static IPCHandle OpenPipeClient(const wchar_t* name, bool read);
 
 private:
     IPCHandle srvRead_;
-	IPCHandle srvWrite_;
+    IPCHandle srvWrite_;
 
-	IPCHandle clnRead_;
-	IPCHandle clnWrite_;
+    IPCHandle clnRead_;
+    IPCHandle clnWrite_;
 };
 
 class PipeWin {
@@ -66,8 +68,30 @@ public:
     bool IsConnected() const { return pipeRead_ != INVALID_IPCHANDLE_VALUE && pipeWrite_ != INVALID_IPCHANDLE_VALUE; }
 
 private:
+
+    class ReaderThread : public Thread
+    {
+    public:
+
+        ReaderThread(PipeWin* pipeWin) : pipeWin_(pipeWin), readSize_(0)
+        {
+            buf_.Resize(4096);
+        }
+
+        void Kill();
+        void ThreadFunction();
+
+        Mutex mutex_;
+        PipeWin* pipeWin_;
+        PODVector<char> buf_;
+        unsigned readSize_;
+    };
+
     IPCHandle pipeRead_;
-	IPCHandle pipeWrite_;
+    IPCHandle pipeWrite_;
+
+    ReaderThread readerThread_;
+
 };
 
 
@@ -91,7 +115,7 @@ class IPCProcess : public Object
 
     public:
 
-    IPCProcess(Context* context, IPCHandle clientRead, IPCHandle clientWrite, IPCHandle pid = INVALID_IPCHANDLE_VALUE);
+        IPCProcess(Context* context, IPCHandle clientRead, IPCHandle clientWrite, IPCHandle pid = INVALID_IPCHANDLE_VALUE);
 
     virtual ~IPCProcess();
 

+ 3 - 0
Source/AtomicEditor/Application/AEEditorCommon.cpp

@@ -54,6 +54,9 @@ void AEEditorCommon::Setup()
 
 void AEEditorCommon::Stop()
 {
+
+    context_->RemoveSubsystem<IPC>();
+
     vm_ = 0;
     context_->RemoveSubsystem<Javascript>();
     // make sure JSVM is really down and no outstanding refs

+ 3 - 0
Source/AtomicEditor/EditorMode/AEEditorMode.cpp

@@ -62,6 +62,9 @@ void EditorMode::HandleIPCWorkerStarted(StringHash eventType, VariantMap& eventD
 void EditorMode::HandleIPCWorkerExit(StringHash eventType, VariantMap& eventData)
 {
     //SendEvent(E_EDITORPLAYSTOP);
+
+    if ( eventData[IPCWorkerExit::P_BROKER] == playerBroker_)
+        playerBroker_ = 0;
 }
 
 void EditorMode::HandleIPCWorkerLog(StringHash eventType, VariantMap& eventData)