Browse Source

BufferQueue and less-copies WebSocket.

Jay Sistar 10 years ago
parent
commit
e7e6f0ef55

+ 1 - 1
Script/Packages/Atomic/IO.json

@@ -1,7 +1,7 @@
 {
 	"name" : "IO",
 	"sources" : ["Source/Atomic/IO"],
-	"classes" : ["Log", "File", "FileSystem", "FileWatcher"],
+	"classes" : ["Log", "File", "FileSystem", "FileWatcher", "BufferQueue"],
 	"overloads" : {
 		"File" : {
 			"File" : ["Context", "String", "FileMode"]

+ 1 - 1
Source/Atomic/Container/Vector.h

@@ -922,10 +922,10 @@ public:
     /// Return whether vector is empty.
     bool Empty() const { return size_ == 0; }
 
-private:
     /// Return the buffer with right type.
     T* Buffer() const { return reinterpret_cast<T*>(buffer_); }
 
+private:
     /// Move a range of elements within the vector.
     void MoveRange(unsigned dest, unsigned src, unsigned count)
     {

+ 127 - 0
Source/Atomic/IO/BufferQueue.cpp

@@ -0,0 +1,127 @@
+//
+// Copyright (c) 2014-2015, THUNDERBEAST GAMES LLC All rights reserved
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+
+#include "../Precompiled.h"
+
+#include "../IO/BufferQueue.h"
+
+#include "../DebugNew.h"
+
+// This BufferQueue implementation is based on the BufferedSoundStream.
+//
+// It is a Deserializer and a Serializer. However, it should only be written
+// to in large chunks because some writes cause allocation. Reads should
+// perform well reguardless of the read chunk sizes.
+
+namespace Atomic
+{
+
+BufferQueue::BufferQueue(Context* context) :
+    Object(context),
+    position_(0)
+{
+}
+
+BufferQueue::~BufferQueue()
+{
+}
+
+unsigned BufferQueue::Read(void* dest_, unsigned numBytes)
+{
+    char* dest((char*)dest_);
+    MutexLock lock(bufferMutex_);
+
+    unsigned outBytes = 0;
+
+    while (numBytes && buffers_.Size())
+    {
+        // Copy as much from the front buffer as possible, then discard it and move to the next
+        List<Pair<SharedArrayPtr<signed char>, unsigned> >::Iterator front = buffers_.Begin();
+
+        unsigned copySize = front->second_ - position_;
+        if (copySize > numBytes)
+            copySize = numBytes;
+
+        memcpy(dest, front->first_.Get() + position_, copySize);
+        position_ += copySize;
+        if (position_ >= front->second_)
+        {
+            buffers_.PopFront();
+            position_ = 0;
+        }
+
+        dest += copySize;
+        outBytes += copySize;
+        numBytes -= copySize;
+    }
+
+    size_ -= outBytes;
+    return outBytes;
+}
+
+unsigned BufferQueue::Write(const void* data, unsigned numBytes)
+{
+    if (data && numBytes)
+    {
+        MutexLock lock(bufferMutex_);
+
+        SharedArrayPtr<signed char> newBuffer(new signed char[numBytes]);
+        memcpy(newBuffer.Get(), data, numBytes);
+        buffers_.Push(MakePair(newBuffer, numBytes));
+        size_ += numBytes;
+        return numBytes;
+    }
+    return 0;
+}
+
+void BufferQueue::Write(SharedArrayPtr<signed char> data, unsigned numBytes)
+{
+    if (data && numBytes)
+    {
+        MutexLock lock(bufferMutex_);
+
+        buffers_.Push(MakePair(data, numBytes));
+        size_ += numBytes;
+    }
+}
+
+void BufferQueue::Write(SharedArrayPtr<unsigned char> data, unsigned numBytes)
+{
+    if (data && numBytes)
+    {
+        MutexLock lock(bufferMutex_);
+
+        buffers_.Push(MakePair(ReinterpretCast<signed char>(data), numBytes));
+        size_ += numBytes;
+    }
+}
+
+void BufferQueue::Clear()
+{
+    MutexLock lock(bufferMutex_);
+
+    buffers_.Clear();
+    position_ = 0;
+    size_ = 0;
+}
+
+}

+ 72 - 0
Source/Atomic/IO/BufferQueue.h

@@ -0,0 +1,72 @@
+//
+// Copyright (c) 2014-2015, THUNDERBEAST GAMES LLC All rights reserved
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+
+#pragma once
+
+#include "../Core/Object.h"
+#include "../Core/Mutex.h"
+#include "../Container/ArrayPtr.h"
+#include "../Container/List.h"
+#include "../Container/Pair.h"
+#include "../IO/Deserializer.h"
+#include "../IO/Serializer.h"
+
+namespace Atomic
+{
+
+/// %Act as a continuous byte buffer which can be appened indefinatly.
+class ATOMIC_API BufferQueue : public Object, public Deserializer, public Serializer
+{
+    OBJECT(BufferQueue)
+
+public:
+    /// Construct.
+    BufferQueue(Context* context);
+    /// Destruct.
+    virtual ~BufferQueue();
+
+    /// Seek operation is not supported for a BufferQueue.
+    virtual unsigned Seek(unsigned position) { return 0; }
+
+    /// Produce data into destination. Return number of bytes produced.
+    virtual unsigned Read(void* dest, unsigned numBytes);
+
+    /// Buffer data. Makes a copy of it. Returns size passed in.
+    virtual unsigned Write(const void* data, unsigned size);
+    /// Buffer data by taking ownership of it.
+    void Write(SharedArrayPtr<signed char> data, unsigned numBytes);
+    /// Buffer data by taking ownership of it.
+    void Write(SharedArrayPtr<unsigned char> data, unsigned numBytes);
+
+    /// Remove all buffered data.
+    void Clear();
+
+private:
+    /// Buffers and their sizes.
+    List<Pair<SharedArrayPtr<signed char>, unsigned> > buffers_;
+    /// Byte position in the front most buffer.
+    unsigned position_;
+    /// Mutex for buffer data.
+    mutable Mutex bufferMutex_;
+};
+
+}

+ 1 - 0
Source/Atomic/Web/Web.h

@@ -23,6 +23,7 @@
 #pragma once
 
 #include "../Core/Object.h"
+#include "../Container/ArrayPtr.h"
 #include "../IO/VectorBuffer.h"
 
 namespace Atomic

+ 135 - 120
Source/Atomic/Web/WebSocket.cpp

@@ -23,6 +23,7 @@
 #include "../Precompiled.h"
 
 #include "../Core/Profiler.h"
+#include "../IO/BufferQueue.h"
 #include "../IO/Log.h"
 #include "../Web/WebSocket.h"
 
@@ -38,9 +39,6 @@
 #include <websocketpp/config/asio_no_tls_client.hpp>
 #include <websocketpp/client.hpp>
 #include <iostream>
-
-#include "../Core/Thread.h"
-
 #include "../DebugNew.h"
 
 typedef websocketpp::client<websocketpp::config::asio_client> client;
@@ -56,164 +54,181 @@ namespace Atomic
 
 struct WebSocketInternalState
 {
-  /// The work queue.
-  asio::io_service service;
-  /// The WebSocket external state.
-  WebSocket &es;
-  /// URL.
-  String url;
-  /// Error string. Empty if no error.
-  String error;
-  /// Connection state.
-  WebSocketState state;
-  /// WebSocket client.
-  client c;
-  /// WebSocket connection.
-  client::connection_ptr con;
-
-  WebSocketInternalState(WebSocket &es_)
-    : es(es_)
-  {
-  }
-
-  ~WebSocketInternalState()
-  {
-  }
-
-  void OnOpen(websocketpp::connection_hdl hdl)
-  {
-    state = WS_OPEN;
-    LOGDEBUG("WebSocket CONNECTED to: " + url);
-    es.SendEvent("open");
-  }
-
-  void OnClose(websocketpp::connection_hdl hdl)
-  {
-    state = WS_CLOSED;
-    LOGDEBUG("WebSocket DISCONNECTED from: " + url);
-    es.SendEvent("close");
-  }
-
-  void OnFail(websocketpp::connection_hdl hdl)
-  {
-    state = WS_FAIL_TO_CONNECT;
-    LOGDEBUG("WebSocket FAILED to connect to: " + url);
-    es.SendEvent("fail_to_connect");
-  }
-
-  void OnMessage(websocketpp::connection_hdl hdl, message_ptr msg)
-  {
-    VariantMap eventData;
-    const std::string &payload(msg->get_payload());
-
-    eventData.Insert(MakePair(StringHash("type"), Variant(int(msg->get_opcode()))));
-
-    switch (msg->get_opcode())
+    /// The WebSocket external state.
+    WebSocket *es;
+    /// URL.
+    String url;
+    /// Error string. Empty if no error.
+    String error;
+    /// Connection state.
+    WebSocketState state;
+    /// WebSocket client.
+    client c;
+    /// WebSocket connection.
+    client::connection_ptr con;
+
+    WebSocketInternalState(WebSocket *es_)
+        : es(es_)
     {
-      case websocketpp::frame::opcode::text:
-        eventData.Insert(MakePair(StringHash("data"), Variant(String(payload.data(), payload.length()))));
-        es.SendEvent("message", eventData);
-        break;
-
-      case websocketpp::frame::opcode::binary:
-        eventData.Insert(MakePair(StringHash("data"), Variant(PODVector<unsigned char>((const unsigned char *)payload.data(), payload.length()))));
-        es.SendEvent("message", eventData);
-        break;
-
-      default:
-        eventData.Insert(MakePair(StringHash("data"), Variant(String(payload.data(), payload.length()))));
-        LOGWARNING("Unsupported WebSocket message type: " + String(int(msg->get_opcode())));
-        break;
+        LOGDEBUG("Create WebSocketInternalState");
     }
-  }
 
-  void MakeConnection()
-  {
-    websocketpp::lib::error_code ec;
-    con = c.get_connection(url.CString(), ec);
-    if (ec)
+    ~WebSocketInternalState()
     {
-      state = WS_INVALID;
-      error = ec.message().c_str();
-      LOGDEBUG("WebSocket error: " + error);
-      es.SendEvent("invalid");
-      return;
+        LOGDEBUG("Destroy WebSocketInternalState");
+    }
+
+    void OnOpen(websocketpp::connection_hdl hdl)
+    {
+        state = WS_OPEN;
+        LOGDEBUG("WebSocket CONNECTED to: " + url);
+        if (es)
+        {
+            es->SendEvent("open");
+        }
+    }
+
+    void OnClose(websocketpp::connection_hdl hdl)
+    {
+        state = WS_CLOSED;
+        LOGDEBUG("WebSocket DISCONNECTED from: " + url);
+        if (es)
+        {
+            es->SendEvent("close");
+        }
+    }
+
+    void OnFail(websocketpp::connection_hdl hdl)
+    {
+        state = WS_FAIL_TO_CONNECT;
+        LOGDEBUG("WebSocket FAILED to connect to: " + url);
+        if (es)
+        {
+            es->SendEvent("fail_to_connect");
+        }
+    }
+
+    void OnMessage(websocketpp::connection_hdl hdl, message_ptr msg)
+    {
+        if (!es)
+        {
+            return;
+        }
+        VariantMap eventData;
+        const std::string& payload(msg->get_payload());
+        SharedPtr<BufferQueue> message(new BufferQueue(es->GetContext()));
+        message->Write((const void*)payload.data(), (unsigned)payload.size());
+
+        eventData.Insert(MakePair(StringHash("type"), Variant(int(msg->get_opcode()))));
+        eventData.Insert(MakePair(StringHash("message"), Variant(message)));
+        es->SendEvent("message", eventData);
+    }
+
+    void MakeConnection()
+    {
+        websocketpp::lib::error_code ec;
+        con = c.get_connection(url.CString(), ec);
+        if (ec)
+        {
+            state = WS_INVALID;
+            error = ec.message().c_str();
+            LOGDEBUG("WebSocket error: " + error);
+            if (es)
+            {
+                es->SendEvent("invalid");
+            }
+            return;
+        }
+        c.connect(con);
     }
-    c.connect(con);
-  }
 };
 
 WebSocket::WebSocket(Context* context, const String& url) :
-  Object(context),
-  is_(new WebSocketInternalState(*this))
+    Object(context),
+    is_(new WebSocketInternalState(this))
 {
-  is_->url = url.Trimmed();
-  is_->state = WS_CONNECTING;
+    is_->url = url.Trimmed();
+    is_->state = WS_CONNECTING;
 
-  is_->c.clear_access_channels(websocketpp::log::alevel::all);
-  is_->c.clear_error_channels(websocketpp::log::elevel::all);
+    is_->c.clear_access_channels(websocketpp::log::alevel::all);
+    is_->c.clear_error_channels(websocketpp::log::elevel::all);
 }
 
 void WebSocket::setup(asio::io_service *service)
 {
-  websocketpp::lib::error_code ec;
-  is_->c.init_asio(service, ec);
-  if (ec)
-  {
-    is_->state = WS_INVALID;
-    is_->error = ec.message().c_str();
-    LOGDEBUG("WebSocket error: " + is_->error);
-    SendEvent("invalid");
-    return;
-  }
-  is_->c.set_open_handler(bind(&WebSocketInternalState::OnOpen, is_, ::_1));
-  is_->c.set_close_handler(bind(&WebSocketInternalState::OnClose, is_, ::_1));
-  is_->c.set_fail_handler(bind(&WebSocketInternalState::OnFail, is_, ::_1));
-  is_->c.set_message_handler(bind(&WebSocketInternalState::OnMessage, is_, ::_1, ::_2));
-
-  LOGDEBUG("WebSocket request to URL " + is_->url);
-
-  is_->MakeConnection();
+    LOGDEBUG("Create WebSocket");
+    websocketpp::lib::error_code ec;
+    is_->c.init_asio(service, ec);
+    if (ec)
+    {
+        is_->state = WS_INVALID;
+        is_->error = ec.message().c_str();
+        LOGDEBUG("WebSocket error: " + is_->error);
+        SendEvent("invalid");
+        return;
+    }
+    is_->c.set_open_handler(bind(&WebSocketInternalState::OnOpen, is_, ::_1));
+    is_->c.set_close_handler(bind(&WebSocketInternalState::OnClose, is_, ::_1));
+    is_->c.set_fail_handler(bind(&WebSocketInternalState::OnFail, is_, ::_1));
+    is_->c.set_message_handler(bind(&WebSocketInternalState::OnMessage, is_, ::_1, ::_2));
+
+    LOGDEBUG("WebSocket request to URL " + is_->url);
+
+    is_->MakeConnection();
 }
 
 WebSocket::~WebSocket()
 {
-  delete is_;
+    std::error_code ec;
+    is_->es = nullptr;
+    is_->con->terminate(ec);
+    is_->c.set_open_handler(nullptr);
+    is_->c.set_close_handler(nullptr);
+    is_->c.set_fail_handler(nullptr);
+    is_->c.set_message_handler(nullptr);
+    is_->con.reset();
+    is_.reset();
+    LOGDEBUG("Destroy WebSocket");
 }
 
 const String& WebSocket::GetURL() const
 {
-  return is_->url;
+    return is_->url;
 }
 
 String WebSocket::GetError() const
 {
-  return is_->error;
+    return is_->error;
 }
 
 WebSocketState WebSocket::GetState() const
 {
-  return is_->state;
+    return is_->state;
 }
 
-void WebSocket::Send(String message)
+void WebSocket::Send(const String& message)
 {
-  is_->c.send(is_->con, message.CString(), message.Length(), websocketpp::frame::opcode::text);
+    is_->c.send(is_->con, message.CString(), message.Length(), websocketpp::frame::opcode::text);
+}
+
+void WebSocket::SendBinary(const PODVector<unsigned char>& message)
+{
+    is_->c.send(is_->con, message.Buffer(), message.Size(), websocketpp::frame::opcode::binary);
 }
 
 void  WebSocket::Close()
 {
-  is_->state = WS_CLOSING;
-  websocketpp::lib::error_code ec;
-  LOGDEBUG("WebSocket atempting to close URL " + is_->url);
-  is_->con->terminate(ec);
+    is_->state = WS_CLOSING;
+    websocketpp::lib::error_code ec;
+    LOGDEBUG("WebSocket atempting to close URL " + is_->url);
+    is_->con->terminate(ec);
 }
 
 void WebSocket::OpenAgain()
 {
-  is_->state = WS_CONNECTING;
-  LOGDEBUG("WebSocket request (again) to URL " + is_->url);
-  is_->MakeConnection();
+    is_->state = WS_CONNECTING;
+    LOGDEBUG("WebSocket request (again) to URL " + is_->url);
+    is_->MakeConnection();
 }
 
 }

+ 28 - 3
Source/Atomic/Web/WebSocket.h

@@ -24,7 +24,9 @@
 
 #include "../Core/Object.h"
 #include "../Container/Str.h"
-#include "../IO/Deserializer.h"
+#include "../Container/Vector.h"
+
+#include <memory>
 
 namespace asio
 {
@@ -45,6 +47,26 @@ enum WebSocketState
     WS_FAIL_TO_CONNECT   // WebSocket attempted to open, but the server refused
 };
 
+enum WebSocketMessageType
+{
+    WSMT_CONTINUATION = 0x0,
+    WSMT_TEXT = 0x1,
+    WSMT_BINARY = 0x2,
+    WSMT_RSV3 = 0x3,
+    WSMT_RSV4 = 0x4,
+    WSMT_RSV5 = 0x5,
+    WSMT_RSV6 = 0x6,
+    WSMT_RSV7 = 0x7,
+    WSMT_CLOSE = 0x8,
+    WSMT_PING = 0x9,
+    WSMT_PONG = 0xA,
+    WSMT_CONTROL_RSVB = 0xB,
+    WSMT_CONTROL_RSVC = 0xC,
+    WSMT_CONTROL_RSVD = 0xD,
+    WSMT_CONTROL_RSVE = 0xE,
+    WSMT_CONTROL_RSVF = 0xF
+};
+
 struct WebSocketInternalState;
 
 /// A WebSocket connection.
@@ -69,7 +91,10 @@ public:
     WebSocketState GetState() const;
 
     /// Send a message.
-    void Send(String message);
+    void Send(const String& message);
+
+    /// Send a binary message.
+    void SendBinary(const PODVector<unsigned char>& message);
 
     /// Disconnect the WebSocket.
     void Close();
@@ -82,7 +107,7 @@ public:
 
 private:
     void setup(asio::io_service *service);
-    WebSocketInternalState* is_;
+    std::shared_ptr<WebSocketInternalState> is_;
 };
 
 }

+ 69 - 12
Source/AtomicJS/Javascript/JSIO.cpp

@@ -22,6 +22,7 @@
 
 #include "JSIO.h"
 #include "JSVM.h"
+#include <Atomic/IO/BufferQueue.h>
 #include <Atomic/IO/File.h>
 
 namespace Atomic
@@ -34,7 +35,8 @@ enum IO_MAGIC_TYPE
     IO_MAGIC_BOOL,
     IO_MAGIC_FLOAT,
     IO_MAGIC_STRING,
-    IO_MAGIC_ZEROSTRING
+    IO_MAGIC_ZEROSTRING,
+    IO_MAGIC_BINARY
 };
 
 
@@ -46,11 +48,17 @@ static Serializer* CastToSerializer(duk_context* ctx, int index)
     if (!o)
         return NULL;
 
-    // type check! file supported for now
-    if (o->GetType() == File::GetTypeStatic())
+    // Type check! Only File and BufferQueue are supported for now.
+    StringHash type(o->GetType());
+    if (type == File::GetTypeStatic())
     {
-        // cast it!
-        serial = (Serializer*) ((File*)o);
+        // Cast it!
+        serial = (Serializer*)((File*)o);
+    }
+    else if (type == BufferQueue::GetTypeStatic())
+    {
+        // Cast it!
+        serial = (Serializer*)((BufferQueue*)o);
     }
 
     return serial;
@@ -65,11 +73,17 @@ static Deserializer* CastToDeserializer(duk_context* ctx, int index)
     if (!o)
         return NULL;
 
-    // type check! file supported for now
-    if (o->GetType() == File::GetTypeStatic())
+    // Type check! Only File and BufferQueue are supported for now.
+    StringHash type(o->GetType());
+    if (type == File::GetTypeStatic())
+    {
+        // Cast it!
+        deserial = (Deserializer*)((File*)o);
+    }
+    else if (type == BufferQueue::GetTypeStatic())
     {
-        // cast it!
-        deserial = (Deserializer*) ((File*)o);
+        // Cast it!
+        deserial = (Deserializer*)((BufferQueue*)o);
     }
 
     return deserial;
@@ -125,6 +139,10 @@ static int Serializer_Write(duk_context* ctx)
         case IO_MAGIC_ZEROSTRING:
             success = serial->WriteString(duk_require_string(ctx, 0));
             break;
+        case IO_MAGIC_BINARY:
+            str = (const char*) duk_require_buffer_data(ctx, 0, &length);
+            success = serial->Write(str, length);
+            break;
         default:
             break;
     }
@@ -152,7 +170,7 @@ static int Deserializer_Read(duk_context* ctx)
         return 1;
     }
 
-    PODVector<unsigned char> buffer;
+    char* data;
     String str;
     size_t length;
 
@@ -175,6 +193,14 @@ static int Deserializer_Read(duk_context* ctx)
         case IO_MAGIC_ZEROSTRING:
             success = duk_push_string(ctx, deserial->ReadString().CString());
             return 1;
+        case IO_MAGIC_BINARY:
+            length = deserial->GetSize() - deserial->GetPosition();
+            duk_push_fixed_buffer(ctx, length);
+            duk_push_buffer_object(ctx, -1, 0, 8, DUK_BUFOBJ_UINT8ARRAY);
+            duk_replace(ctx, -2);
+            data = (char*) duk_require_buffer_data(ctx, 0, &length);
+            success = deserial->Read(data, length);
+            return 1;
         default:
             break;
     }
@@ -185,6 +211,25 @@ static int Deserializer_Read(duk_context* ctx)
 
 }
 
+static int Deserializer_GetSize(duk_context* ctx)
+{
+    duk_push_this(ctx);
+
+    // safe cast based on type check above
+    Deserializer* deserial = CastToDeserializer(ctx, duk_get_top_index(ctx));
+
+    duk_pop(ctx);
+
+    if (!deserial)
+    {
+        duk_push_boolean(ctx, 0);
+        return 1;
+    }
+
+    duk_push_number(ctx, (double)deserial->GetSize());
+    return 1;
+}
+
 static void AddSerializerMixin(duk_context* ctx, const String& package, const String& classname)
 {
     js_class_get_prototype(ctx, package.CString(), classname.CString());
@@ -201,8 +246,11 @@ static void AddSerializerMixin(duk_context* ctx, const String& package, const St
     duk_set_magic(ctx, -1, (unsigned) IO_MAGIC_ZEROSTRING);
     duk_put_prop_string(ctx, -2, "writeZeroString");
 
-    duk_pop(ctx);
+    duk_push_c_function(ctx, Serializer_Write, 1);
+    duk_set_magic(ctx, -1, (unsigned)IO_MAGIC_BINARY);
+    duk_put_prop_string(ctx, -2, "writeBinary");
 
+    duk_pop(ctx);
 }
 
 static void AddDeserializerMixin(duk_context* ctx, const String& package, const String& classname)
@@ -221,8 +269,14 @@ static void AddDeserializerMixin(duk_context* ctx, const String& package, const
     duk_set_magic(ctx, -1, (unsigned) IO_MAGIC_ZEROSTRING);
     duk_put_prop_string(ctx, -2, "readZeroString");
 
-    duk_pop(ctx);
+    duk_push_c_function(ctx, Deserializer_Read, 0);
+    duk_set_magic(ctx, -1, (unsigned)IO_MAGIC_BINARY);
+    duk_put_prop_string(ctx, -2, "readBinary");
+
+    duk_push_c_function(ctx, Deserializer_GetSize, 0);
+    duk_put_prop_string(ctx, -2, "getSize");
 
+    duk_pop(ctx);
 }
 
 
@@ -251,6 +305,9 @@ void jsapi_init_io(JSVM* vm)
     AddSerializerMixin(ctx, "Atomic", "File");
     AddDeserializerMixin(ctx, "Atomic", "File");
 
+    AddSerializerMixin(ctx, "Atomic", "BufferQueue");
+    AddDeserializerMixin(ctx, "Atomic", "BufferQueue");
+
 }
 
 }