Browse Source

IPC events

Josh Engebretson 10 years ago
parent
commit
41c534dc54

+ 63 - 2
Source/Atomic/IPC/IPC.cpp

@@ -1,21 +1,82 @@
 
+#ifdef __APPLE__
+#include <unistd.h>
+#endif
+
+#include "../Core/CoreEvents.h"
+#include "../IO/Log.h"
+
 #include "IPCBroker.h"
 #include "IPCWorker.h"
 #include "IPC.h"
+#include "IPCEvents.h"
 
 namespace Atomic
 {
 
-IPC::IPC(Context* context) : Object(context)
+IPC::IPC(Context* context, int fd1, int fd2) : Object(context)
 {
+    SubscribeToEvent(E_UPDATE, HANDLER(IPC, HandleUpdate));
 
-}
+    if (fd1 != -1 && fd2 != -1)
+    {
+        // close server fd
+        close(fd1);
 
+        worker_ = new IPCWorker(fd2, context_);
+        worker_->Run();
+
+        SendEventToBroker(E_IPCWORKERSTART);
+    }
+
+}
 
 IPC::~IPC()
 {
 
 }
 
+void IPC::SendEventToBroker(StringHash eventType)
+{
+    SendEventToBroker(eventType, GetEventDataMap());
+}
+
+void IPC::SendEventToBroker(StringHash eventType, VariantMap& eventData)
+{
+    if (worker_.NotNull())
+    {
+        worker_->PostMessage(eventType, eventData);
+    }
+}
+
+
+void IPC::HandleUpdate(StringHash eventType, VariantMap& eventData)
+{
+    eventMutex_.Acquire();
+
+    for (List<QueuedEvent>::Iterator itr = queuedEvents_.Begin(); itr != queuedEvents_.End(); itr++)
+    {
+        StringHash qeventType =  (*itr).eventType_;
+        VariantMap& qeventData =  (*itr).eventData_;
+        SendEvent(qeventType, qeventData);
+    }
+
+    queuedEvents_.Clear();
+
+    eventMutex_.Release();
+}
+
+void IPC::QueueEvent(StringHash eventType, VariantMap& eventData)
+{
+    eventMutex_.Acquire();
+
+    QueuedEvent event;
+    event.eventType_ = eventType;
+    event.eventData_ = eventData;
+    queuedEvents_.Push(event);
+
+    eventMutex_.Release();
+}
+
 
 }

+ 24 - 2
Source/Atomic/IPC/IPC.h

@@ -1,6 +1,9 @@
 #pragma once
 
 #include "../Core/Object.h"
+#include "../Core/Mutex.h"
+
+#include "../Container/List.h"
 
 namespace Atomic
 {
@@ -8,6 +11,12 @@ namespace Atomic
 class IPCBroker;
 class IPCWorker;
 
+struct QueuedEvent
+{
+    StringHash eventType_;
+    VariantMap eventData_;
+};
+
 class IPC : public Object
 {
     OBJECT(IPC);
@@ -15,14 +24,27 @@ class IPC : public Object
 public:
 
     /// Construct.
-    IPC(Context* context);
+    IPC(Context* context, int fd1 = -1, int fd2 = -1);
     /// Destruct.
     virtual ~IPC();
 
+    void QueueEvent(StringHash eventType, VariantMap& eventData);
+
+    void SendEventToBroker(StringHash eventType);
+    void SendEventToBroker(StringHash eventType, VariantMap& eventData);
+
 private:
 
+    void HandleUpdate(StringHash eventType, VariantMap& eventData);
+
+    mutable Mutex eventMutex_;
+
+    List<QueuedEvent> queuedEvents_;
+
     Vector<SharedPtr<IPCBroker> > brokers_;
-    Vector<SharedPtr<IPCWorker> > workers_;
+
+    // valid on child
+    SharedPtr<IPCWorker> worker_;
 
 };
 

+ 5 - 64
Source/Atomic/IPC/IPCBroker.cpp

@@ -6,88 +6,29 @@
 
 #include "IPCUnix.h"
 #include "IPCBroker.h"
-#include "IPCMessage.h"
+
 
 namespace Atomic
 {
 
-IPCBroker::IPCBroker(Context* context) : Object(context)
-{
+IPCBroker::IPCBroker(Context* context) : IPCChannel(context)
+{    
 
 }
 
 IPCBroker::~IPCBroker()
 {
-
 }
 
 void IPCBroker::ThreadFunction()
 {
-    unsigned count = 0;
-
-    IPCMessageHeader header;
-    header.messageType_ = IPC_MESSAGE_UNDEFINED;
-    VectorBuffer dataBuffer;
-
     while (shouldRun_)
     {
-        size_t sz = 0;
-
-        const char* data = transport_.Receive(&sz);
-
-        if (!data)
+        if (!Receive())
         {
-            shouldRun_ = false;
+            Stop();
             break;
         }
-
-        if (!sz)
-            continue;
-
-        dataBuffer.Seek(dataBuffer.GetSize());
-        dataBuffer.Write(data, sz);
-        dataBuffer.Seek(0);
-
-        while (true)
-        {
-            if (header.messageType_ == IPC_MESSAGE_UNDEFINED &&
-                    dataBuffer.GetSize() - dataBuffer.GetPosition() >= sizeof(IPCMessageHeader))
-            {
-                dataBuffer.Read(&header, sizeof(IPCMessageHeader));
-            }
-
-            if (header.messageType_ == IPC_MESSAGE_UNDEFINED)
-                break;
-
-            if (header.messageType_ != IPC_MESSAGE_UNDEFINED &&
-                     header.messageSize_ <= dataBuffer.GetSize() - dataBuffer.GetPosition())
-            {
-                MemoryBuffer buffer(dataBuffer.GetData() + dataBuffer.GetPosition(), header.messageSize_);
-                dataBuffer.Seek( dataBuffer.GetPosition() + header.messageSize_);
-                header.messageType_ = IPC_MESSAGE_UNDEFINED;
-
-                IPCMessageEvent event;
-                StringHash eventType;
-                VariantMap eventData;
-                event.DoRead(buffer, eventType, eventData);
-
-                // LOGINFOF("Message: %s %i", eventData[eventType].ToString().CString(), (int) count++);
-            }
-
-            if (dataBuffer.IsEof())
-            {
-                dataBuffer.Clear();
-            }
-
-            if (dataBuffer.GetPosition() == 0)
-                break;
-
-            VectorBuffer newBuffer;
-            newBuffer.Write(dataBuffer.GetData() + dataBuffer.GetPosition(), dataBuffer.GetSize() - dataBuffer.GetPosition());
-            newBuffer.Seek(0);
-            dataBuffer = newBuffer;
-        }
-
     }
 
     shouldRun_ = false;

+ 3 - 7
Source/Atomic/IPC/IPCBroker.h

@@ -1,18 +1,14 @@
 
 #pragma once
 
-#include "../Core/Mutex.h"
-#include "../Core/Thread.h"
-#include "../Core/Object.h"
-
-#include "IPCUnix.h"
+#include "IPCChannel.h"
 
 namespace Atomic
 {
 
 class IPCProcess;
 
-class IPCBroker : public Object, public Thread
+class IPCBroker : public IPCChannel
 {
     OBJECT(IPCBroker);
 
@@ -29,7 +25,7 @@ public:
 private:
 
     SharedPtr<IPCProcess> workerProcess_;
-    PipeTransport transport_;
+
 
 };
 

+ 91 - 0
Source/Atomic/IPC/IPCChannel.cpp

@@ -0,0 +1,91 @@
+
+#include "../IO/Log.h"
+
+#include "IPCChannel.h"
+
+namespace Atomic
+{
+
+IPCChannel::IPCChannel(Context* context) : Object(context)
+{
+    ipc_ = GetSubsystem<IPC>();
+    currentHeader_.messageType_ = IPC_MESSAGE_UNDEFINED;
+}
+
+IPCChannel::~IPCChannel()
+{
+
+}
+
+void IPCChannel::PostMessage(StringHash eventType, VariantMap &eventData)
+{
+    IPCMessageEvent msgEvent;
+    msgEvent.DoSend(transport_, eventType, eventData);
+
+    LOGERRORF("Posting Message");
+}
+
+bool IPCChannel::Receive()
+{
+    size_t sz = 0;
+    const char* data = transport_.Receive(&sz);
+
+    if (!data)
+    {
+        // error
+        return false;
+    }
+
+    if (!sz)
+        return true;
+
+    dataBuffer_.Seek(dataBuffer_.GetSize());
+    dataBuffer_.Write(data, sz);
+    dataBuffer_.Seek(0);
+
+    while (true)
+    {
+        if (currentHeader_.messageType_ == IPC_MESSAGE_UNDEFINED &&
+                dataBuffer_.GetSize() - dataBuffer_.GetPosition() < sizeof(IPCMessageHeader))
+        {
+            return true;
+        }
+
+        if (currentHeader_.messageType_ == IPC_MESSAGE_UNDEFINED)
+        {
+            dataBuffer_.Read(&currentHeader_, sizeof(IPCMessageHeader));
+        }
+
+        if (currentHeader_.messageSize_ <= dataBuffer_.GetSize() - dataBuffer_.GetPosition())
+        {
+            MemoryBuffer buffer(dataBuffer_.GetData() + dataBuffer_.GetPosition(), currentHeader_.messageSize_);
+            dataBuffer_.Seek( dataBuffer_.GetPosition() + currentHeader_.messageSize_);
+            currentHeader_.messageType_ = IPC_MESSAGE_UNDEFINED;
+
+            IPCMessageEvent event;
+            StringHash eventType;
+            VariantMap eventData;
+            event.DoRead(buffer, eventType, eventData);
+            ipc_->QueueEvent(eventType, eventData);
+        }
+
+        if (dataBuffer_.IsEof())
+        {
+            dataBuffer_.Clear();
+        }
+
+        if (dataBuffer_.GetPosition() == 0)
+            break;
+
+        VectorBuffer newBuffer;
+        newBuffer.Write(dataBuffer_.GetData() + dataBuffer_.GetPosition(), dataBuffer_.GetSize() - dataBuffer_.GetPosition());
+        newBuffer.Seek(0);
+        dataBuffer_ = newBuffer;
+    }
+
+    return true;
+
+}
+
+
+}

+ 42 - 0
Source/Atomic/IPC/IPCChannel.h

@@ -0,0 +1,42 @@
+
+
+#pragma once
+
+#include "../Core/Mutex.h"
+#include "../Core/Thread.h"
+#include "../Core/Object.h"
+
+#include "IPC.h"
+#include "IPCMessage.h"
+#include "IPCUnix.h"
+
+namespace Atomic
+{
+
+class IPCChannel : public Object, public Thread
+{
+    OBJECT(IPCChannel)
+
+public:
+
+    IPCChannel(Context* context);
+    virtual ~IPCChannel();
+
+    virtual void ThreadFunction() {}
+
+    bool Receive();
+
+    void PostMessage(StringHash eventType, VariantMap& eventData);
+
+protected:
+
+    // for access from thread
+    WeakPtr<IPC> ipc_;
+    PipeTransport transport_;
+
+    IPCMessageHeader currentHeader_;
+    VectorBuffer dataBuffer_;
+
+};
+
+}

+ 14 - 0
Source/Atomic/IPC/IPCEvents.h

@@ -0,0 +1,14 @@
+#pragma once
+
+#include "../Core/Object.h"
+
+namespace Atomic
+{
+
+/// Worker start
+EVENT(E_IPCWORKERSTART, WorkerStart)
+{
+
+}
+
+}

+ 16 - 19
Source/Atomic/IPC/IPCWorker.cpp

@@ -8,9 +8,18 @@
 namespace Atomic
 {
 
-IPCWorker::IPCWorker(int fd, Context* context) : Object(context),
+IPCWorker::IPCWorker(int fd, Context* context) : IPCChannel(context),
     fd_(fd)
 {
+    if (!transport_.OpenClient(fd_))
+    {
+        LOGERRORF("Unable to open IPC transport fd = %i", fd_);
+        shouldRun_ = false;
+        return;
+    }
+
+    LOGERRORF("Opened IPC transport fd = %i", fd_);
+
 
 }
 
@@ -22,27 +31,15 @@ IPCWorker::~IPCWorker()
 void IPCWorker::ThreadFunction()
 {
 
-    PipeTransport transport;
-
-    if (!transport.OpenClient(fd_))
+    while(shouldRun_)
     {
-        LOGERRORF("Unable to open IPC transport fd = %i", fd_);
-        shouldRun_ = false;
-        return;
-    }
-
-    IPCMessageEvent msgEvent;
-
-    StringHash eventType(42);
-    VariantMap eventData;
-    eventData[eventType] = "MyMy";
-
-    for (unsigned i = 0; i < 380; i++)
-    {
-        msgEvent.DoSend(transport, eventType, eventData);
+        if (!Receive())
+        {
+            Stop();
+            break;
+        }
     }
 
-
 }
 
 }

+ 2 - 4
Source/Atomic/IPC/IPCWorker.h

@@ -1,14 +1,12 @@
 
 #pragma once
 
-#include "../Core/Mutex.h"
-#include "../Core/Thread.h"
-#include "../Core/Object.h"
+#include "IPCChannel.h"
 
 namespace Atomic
 {
 
-class IPCWorker : public Object, public Thread
+class IPCWorker : public IPCChannel
 {
     OBJECT(IPCWorker);
 

+ 5 - 0
Source/AtomicEditor/Source/AEApplication.cpp

@@ -24,6 +24,8 @@
 #include <Atomic/Environment/Environment.h>
 #include <Atomic/Graphics/Renderer.h>
 
+#include <Atomic/IPC/IPC.h>
+
 #include "AEEditorStrings.h"
 #include "AEEditorShortcuts.h"
 #include "Project/ProjectUtils.h"
@@ -70,6 +72,9 @@ void AEApplication::Start()
     Engine* engine = GetSubsystem<Engine>();
     engine->SetAutoExit(false);
 
+    // Register IPC system
+    context_->RegisterSubsystem(new IPC(context_));
+
     // Get default style
     ResourceCache* cache = GetSubsystem<ResourceCache>();
     cache->SetAutoReloadResources(true);

+ 12 - 1
Source/AtomicEditor/Source/Player/AEPlayer.cpp

@@ -9,10 +9,13 @@
 #include <Atomic/Core/Context.h>
 #include <Atomic/Core/StringUtils.h>
 #include <Atomic/IO/FileSystem.h>
+#include <Atomic/IO/Log.h>
 #include <Atomic/Input/Input.h>
 #include <Atomic/Resource/ResourceCache.h>
 #include <Atomic/UI/UI.h>
 
+#include <Atomic/IPC/IPC.h>
+#include <Atomic/IPC/IPCEvents.h>
 #include <Atomic/IPC/IPCBroker.h>
 
 #include <ToolCore/ToolEnvironment.h>
@@ -34,6 +37,7 @@ AEPlayer::AEPlayer(Context* context) :
     mode_(AE_PLAYERMODE_WIDGET)
 {
     SubscribeToEvent(E_EDITORSHUTDOWN, HANDLER(AEPlayer, HandleEditorShutdown));
+    SubscribeToEvent(E_IPCWORKERSTART, HANDLER(AEPlayer, HandleIPCWorkerStarted));
 
     assert(!context->GetSubsystem<AEPlayer>());
     context->RegisterSubsystem(this);
@@ -57,6 +61,13 @@ void AEPlayer::HandleJSError(StringHash eventType, VariantMap& eventData)
 
 }
 
+void AEPlayer::HandleIPCWorkerStarted(StringHash eventType, VariantMap& eventData)
+{
+
+    LOGINFOF("Yay");
+
+}
+
 bool AEPlayer::Play(AEPlayerMode mode, const IntRect &rect)
 {
     ToolCore::ToolEnvironment* env = GetSubsystem<ToolCore::ToolEnvironment>();
@@ -88,7 +99,7 @@ bool AEPlayer::Play(AEPlayerMode mode, const IntRect &rect)
     system->Launch(playerBinary, vargs);
     */
 
-    return false;
+    return true;
 }
 
 void AEPlayer::SetUIPlayer(UIPlayer* uiPlayer)

+ 2 - 0
Source/AtomicEditor/Source/Player/AEPlayer.h

@@ -63,6 +63,8 @@ private:
     void HandleJSError(StringHash eventType, VariantMap& eventData);
     void HandleEditorShutdown(StringHash eventType, VariantMap& eventData);
 
+    void HandleIPCWorkerStarted(StringHash eventType, VariantMap& eventData);
+
     SharedPtr<UIPlayer> uiPlayer_;    
 
 };

+ 7 - 9
Source/AtomicEditor/Source/Player/AEPlayerApplication.cpp

@@ -29,6 +29,7 @@
 #include <Atomic/Resource/ResourceCache.h>
 #include <Atomic/Resource/ResourceEvents.h>
 
+#include <Atomic/IPC/IPC.h>
 #include <Atomic/IPC/IPCWorker.h>
 
 // Move me
@@ -56,6 +57,9 @@ static Javascript* javascript = NULL;
 AEPlayerApplication::AEPlayerApplication(Context* context) :
     Application(context)
 {
+    fd_[0] = -1;
+    fd_[1] = -1;
+
 #ifdef ATOMIC_3D
     RegisterEnvironmentLibrary(context_);
 #endif
@@ -96,9 +100,6 @@ void AEPlayerApplication::Setup()
 
     const Vector<String>& arguments = GetArguments();
 
-    int ipc_server_fd = -1;
-    int ipc_client_fd = -1;
-
     for (unsigned i = 0; i < arguments.Size(); ++i)
     {
         if (arguments[i].Length() > 1)
@@ -115,14 +116,10 @@ void AEPlayerApplication::Setup()
                 {
                     int fd = ToInt(ipc[1].CString());
                     if (argument.StartsWith("--ipc-server="))
-                        ipc_server_fd = fd;
+                        fd_[0] = fd;
                     else
                     {
-
-                        ipc_client_fd = fd;
-                        close(ipc_server_fd);
-                        IPCWorker* worker = new IPCWorker(ipc_client_fd, context_);
-                        worker->Run();
+                        fd_[1] = fd;
                     }
 
                 }
@@ -143,6 +140,7 @@ void AEPlayerApplication::Setup()
 
 void AEPlayerApplication::Start()
 {
+    context_->RegisterSubsystem(new IPC(context_, fd_[0], fd_[1]));
 
     // Instantiate and register the Javascript subsystem
     javascript = new Javascript(context_);

+ 2 - 0
Source/AtomicEditor/Source/Player/AEPlayerApplication.h

@@ -52,6 +52,8 @@ private:
     /// Handle reload failure of the script file.
     void HandleScriptReloadFailed(StringHash eventType, VariantMap& eventData);
 
+    int fd_[2];
+
 };
 
 }