Browse Source

IPC improvements

Josh Engebretson 10 years ago
parent
commit
c9dde12db7
3 changed files with 84 additions and 27 deletions
  1. 64 20
      Source/Atomic/IPC/IPCBroker.cpp
  2. 15 6
      Source/Atomic/IPC/IPCMessage.h
  3. 5 1
      Source/Atomic/IPC/IPCWorker.cpp

+ 64 - 20
Source/Atomic/IPC/IPCBroker.cpp

@@ -23,31 +23,75 @@ IPCBroker::~IPCBroker()
 
 void IPCBroker::ThreadFunction()
 {
-    while (shouldRun_)
-    {
-        unsigned msgType;
-        unsigned msgSize;
-
-        size_t sz = sizeof(unsigned);
-        transport_.Read(&msgType, &sz);
-
-        sz = sizeof(unsigned);
-        transport_.Read(&msgSize, &sz);
+    unsigned count = 0;
 
-        VectorBuffer buffer;
-        buffer.Resize(msgSize);
+    IPCMessageHeader header;
+    header.messageType_ = IPC_MESSAGE_UNDEFINED;
+    VectorBuffer dataBuffer;
 
-        sz = msgSize;
-        transport_.Read(buffer.GetModifiableData(), &sz);
-        assert(sz = msgSize);
-
-        IPCMessageEvent event;
+    while (shouldRun_)
+    {
+        size_t sz = 0;
+
+        const char* data = transport_.Receive(&sz);
+
+        if (!data)
+        {
+            shouldRun_ = false;
+            break;
+        }
+
+        if (!sz)
+            continue;
+
+        dataBuffer.Seek(dataBuffer.GetSize());
+        dataBuffer.Write(data, sz);
+        dataBuffer.Seek(0);
+
+        while (true)
+        {
+            if (header.messageType_ == IPC_MESSAGE_UNDEFINED &&
+                    dataBuffer.GetSize() - dataBuffer.GetPosition() >= sizeof(IPCMessageHeader))
+            {
+                dataBuffer.Read(&header, sizeof(IPCMessageHeader));
+            }
+
+            if (header.messageType_ == IPC_MESSAGE_UNDEFINED)
+                break;
+
+            if (header.messageType_ != IPC_MESSAGE_UNDEFINED &&
+                     header.messageSize_ <= dataBuffer.GetSize() - dataBuffer.GetPosition())
+            {
+                MemoryBuffer buffer(dataBuffer.GetData() + dataBuffer.GetPosition(), header.messageSize_);
+                dataBuffer.Seek( dataBuffer.GetPosition() + header.messageSize_);
+                header.messageType_ = IPC_MESSAGE_UNDEFINED;
+
+                IPCMessageEvent event;
+                StringHash eventType;
+                VariantMap eventData;
+                event.DoRead(buffer, eventType, eventData);
+
+                // LOGINFOF("Message: %s %i", eventData[eventType].ToString().CString(), (int) count++);
+            }
+
+            if (dataBuffer.IsEof())
+            {
+                dataBuffer.Clear();
+            }
+
+            if (dataBuffer.GetPosition() == 0)
+                break;
+
+            VectorBuffer newBuffer;
+            newBuffer.Write(dataBuffer.GetData() + dataBuffer.GetPosition(), dataBuffer.GetSize() - dataBuffer.GetPosition());
+            newBuffer.Seek(0);
+            dataBuffer = newBuffer;
+        }
 
-        StringHash eventType;
-        VariantMap eventData;
-        event.DoRead(buffer, eventType, eventData);
     }
 
+    shouldRun_ = false;
+
 }
 
 bool IPCBroker::SpawnWorker(const String& command, const Vector<String>& args, const String& initialDirectory)

+ 15 - 6
Source/Atomic/IPC/IPCMessage.h

@@ -10,14 +10,21 @@
 namespace Atomic
 {
 
-const unsigned IPC_MESSAGE_EVENT = 0;
+const unsigned IPC_MESSAGE_UNDEFINED = 0;
+const unsigned IPC_MESSAGE_EVENT = 1;
+
+struct IPCMessageHeader
+{
+    unsigned messageType_;
+    unsigned messageSize_;
+};
 
 class IPCMessageEvent
 {
 
 public:
 
-    bool DoRead(VectorBuffer& buffer, StringHash& eventType, VariantMap& eventData)
+    bool DoRead(MemoryBuffer& buffer, StringHash& eventType, VariantMap& eventData)
     {
         eventType = buffer.ReadStringHash();
         eventData.Clear();
@@ -31,10 +38,12 @@ public:
         buffer.WriteStringHash(eventType);
         buffer.WriteVariantMap(eventData);
 
-        unsigned sz = buffer.GetSize();
-        transport.Write(&IPC_MESSAGE_EVENT, sizeof(unsigned));
-        transport.Write(&sz, sizeof(unsigned));
-        transport.Write(buffer.GetData(), sz);
+        IPCMessageHeader header;
+        header.messageType_ = IPC_MESSAGE_EVENT;
+        header.messageSize_ = buffer.GetSize();
+
+        transport.Write(&header, sizeof(IPCMessageHeader));
+        transport.Write(buffer.GetData(), header.messageSize_);
 
         return true;
     }

+ 5 - 1
Source/Atomic/IPC/IPCWorker.cpp

@@ -36,7 +36,11 @@ void IPCWorker::ThreadFunction()
     StringHash eventType(42);
     VariantMap eventData;
     eventData[eventType] = "MyMy";
-    msgEvent.DoSend(transport, eventType, eventData);
+
+    for (unsigned i = 0; i < 380; i++)
+    {
+        msgEvent.DoSend(transport, eventType, eventData);
+    }
 
 
 }