IpcUnserializer.hx 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package asys.io;
  2. import haxe.Error;
  3. import haxe.NoData;
  4. import haxe.async.*;
  5. import haxe.io.*;
  6. import asys.net.Socket;
  /**
      Class used internally to receive messages and handles over an IPC channel.
      See `CurrentProcess.initIpc` for initialising IPC for a process.

      Incoming data is parsed as a sequence of chunks of the form
      `<s...><size>:<payload>`:

      - zero or more leading `s` characters, one per handle that is read from
        the pipe (`pipe.readHandle()`) and delivered together with the message;
      - `<size>`, a positive decimal integer giving the payload length as
        measured by `String.length`;
      - `<payload>`, which is passed to `haxe.Unserializer.run`.

      Each complete chunk is emitted on `messageSignal`. Any failure is emitted
      on `errorSignal`, after which the parser returns to its idle state.
  **/
  class IpcUnserializer {
      /**
          The instance that is currently unserialising and dispatching a
          message, or `null` when no message is being processed.
          NOTE(review): presumably read by custom unserialisation code to find
          the channel a value arrived on - confirm against the users of this
          field.
      **/
      static var activeUnserializer:IpcUnserializer = null;

      /** Emitted once for every complete message, together with its handles. **/
      public final messageSignal:Signal<IpcMessage> = new ArraySignal();

      /** Emitted with the thrown value whenever receiving a message fails. **/
      public final errorSignal:Signal<Dynamic> = new ArraySignal();

      final pipe:Socket;

      // Header characters received so far while waiting for the ":" separator.
      var chunkLenbuf:String = "";
      // Payload received so far; only non-null while a chunk is being collected.
      var chunkBuf:StringBuf;
      // Payload length of the chunk being collected; 0 means "waiting for a header".
      var chunkSize:Null<Int> = 0;
      // Number of handles announced by the header of the chunk being collected.
      var chunkSocketCount:Int = 0;

      function new(pipe:Socket) {
          this.pipe = pipe;
          pipe.dataSignal.on(handleData);
      }

      /**
          Returns the parser to its idle state, discarding any partially
          received header or payload.
      **/
      function resetState():Void {
          chunkLenbuf = "";
          chunkBuf = null;
          chunkSize = 0;
          chunkSocketCount = 0;
          activeUnserializer = null;
      }

      /**
          Feeds data received from the pipe into the chunk parser.

          A single call may complete zero, one or several messages; each one is
          emitted on `messageSignal`. Errors are emitted on `errorSignal` and
          never leave the parser in a half-updated state. Data that follows a
          failed but correctly framed message is still processed.

          @param data Bytes received from the pipe; an empty buffer is ignored.
      **/
      function handleData(data:Bytes):Void {
          if (data.length == 0)
              return;

          var pending:String = null;
          try {
              pending = data.toString();
          } catch (e:Dynamic) {
              errorSignal.emit(e);
              return;
          }

          while (pending != null) {
              var input = pending;
              pending = null;
              try {
                  if (chunkSize == 0) {
                      // Still waiting for a complete "<s...><size>:" header.
                      chunkLenbuf += input;
                      var colonPos = chunkLenbuf.indexOf(":");
                      if (colonPos != -1) {
                          var announcedHandles = 0;
                          while (chunkLenbuf.charAt(announcedHandles) == "s")
                              announcedHandles++;
                          // `substring` takes an end index, so exactly the
                          // characters between the "s" prefix and the colon
                          // are parsed.
                          var size = Std.parseInt(chunkLenbuf.substring(announcedHandles, colonPos));
                          if (size == null || size <= 0)
                              throw "invalid chunk size received";
                          chunkSocketCount = announcedHandles;
                          chunkSize = size;
                          chunkBuf = new StringBuf();
                          chunkBuf.add(chunkLenbuf.substr(colonPos + 1));
                          chunkLenbuf = "";
                      }
                  } else {
                      chunkBuf.add(input);
                  }

                  if (chunkSize != 0 && chunkBuf.length >= chunkSize) {
                      var serial = chunkBuf.toString();
                      if (serial.length > chunkSize) {
                          // Anything past the payload belongs to the next chunk.
                          pending = serial.substr(chunkSize);
                          serial = serial.substr(0, chunkSize);
                      }

                      // Go back to the idle state before running anything that
                      // may throw, so a failure here cannot corrupt the
                      // handling of the data that follows.
                      var expectedHandles = chunkSocketCount;
                      chunkBuf = null;
                      chunkSize = 0;
                      chunkSocketCount = 0;

                      if (expectedHandles > pipe.handlesPending)
                          throw "not enough handles received";
                      var chunkSockets = [];
                      for (i in 0...expectedHandles)
                          chunkSockets.push(pipe.readHandle());

                      activeUnserializer = this;
                      // NOTE(review): the payload comes from the peer process
                      // and is unserialised without further validation.
                      var message = haxe.Unserializer.run(serial);
                      messageSignal.emit({message: message, sockets: chunkSockets});
                      activeUnserializer = null;
                  }
              } catch (e:Dynamic) {
                  // Drop whatever was being parsed (including an invalid
                  // header) so the same error is not raised again by the
                  // next packet, then report the failure.
                  resetState();
                  errorSignal.emit(e);
              }
          }
      }
  }