// Peer.cs
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Linq;
  5. using System.Net.Sockets;
  6. using System.Reflection;
  7. using System.Text;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. using GodotTools.IdeMessaging.Requests;
  11. using GodotTools.IdeMessaging.Utils;
  12. namespace GodotTools.IdeMessaging
  13. {
  /// <summary>
  /// One endpoint of an IDE messaging connection running over a <see cref="TcpClient"/>.
  /// It performs the handshake, runs the receive loop that dispatches incoming requests and
  /// responses, and sends outgoing requests.
  /// </summary>
  public sealed class Peer : IDisposable
  {
      /// <summary>
      /// Major version.
      /// There is neither forward nor backward compatibility between different major versions.
      /// Connection is refused if client and server have different major versions.
      /// </summary>
      public static readonly int ProtocolVersionMajor = Assembly.GetAssembly(typeof(Peer)).GetName().Version.Major;

      /// <summary>
      /// Minor version, which clients must be backward compatible with.
      /// Connection is refused if the client's minor version is lower than the server's.
      /// </summary>
      public static readonly int ProtocolVersionMinor = Assembly.GetAssembly(typeof(Peer)).GetName().Version.Minor;

      /// <summary>
      /// Revision, which doesn't affect compatibility.
      /// </summary>
      public static readonly int ProtocolVersionRevision = Assembly.GetAssembly(typeof(Peer)).GetName().Version.Revision;

      /// <summary>
      /// Handshake name constant for the client side. Not referenced inside this class.
      /// </summary>
      public const string ClientHandshakeName = "GodotIdeClient";

      /// <summary>
      /// Handshake name constant for the server side. Not referenced inside this class.
      /// </summary>
      public const string ServerHandshakeName = "GodotIdeServer";

      /// <summary>
      /// Value assigned to the network stream's <c>WriteTimeout</c>, in milliseconds.
      /// </summary>
      /// <remarks>
      /// NOTE(review): <c>NetworkStream.WriteTimeout</c> is documented as affecting synchronous
      /// writes only, while this class writes through <c>WriteLineAsync</c>/<c>FlushAsync</c>.
      /// Confirm that the timeout has the intended effect.
      /// </remarks>
      private const int ClientWriteTimeout = 8000;

      /// <summary>
      /// How long <see cref="DoHandshake"/> waits for the remote handshake line, in milliseconds.
      /// </summary>
      private const int HandshakeTimeout = 8000;

      /// <summary>
      /// Signature of an asynchronous request handler that receives the peer and the request
      /// content and produces a <see cref="Response"/>. Not referenced inside this class.
      /// </summary>
      public delegate Task<Response> RequestHandler(Peer peer, MessageContent content);

      private readonly TcpClient tcpClient;

      private readonly TextReader clientReader;
      private readonly TextWriter clientWriter;

      // Serializes writes so that lines of different messages never interleave on the stream.
      private readonly SemaphoreSlim writeSem = new SemaphoreSlim(1);

      private string? remoteIdentity;

      /// <summary>
      /// Identity reported by the remote side during the handshake, or an empty string if none
      /// has been stored yet.
      /// </summary>
      public string RemoteIdentity => remoteIdentity ??= string.Empty;

      /// <summary>Raised by <see cref="DoHandshake"/> once the handshake has succeeded.</summary>
      public event Action? Connected;

      /// <summary>
      /// Raised by <see cref="Dispose"/> when the handshake had succeeded and the socket still
      /// reports itself as connected.
      /// </summary>
      public event Action? Disconnected;

      private ILogger Logger { get; }

      /// <summary>Whether <see cref="Dispose"/> has been called.</summary>
      public bool IsDisposed { get; private set; }

      /// <summary>Whether the underlying socket exists and reports itself as connected.</summary>
      public bool IsTcpClientConnected => tcpClient.Client != null && tcpClient.Client.Connected;

      // Set once the handshake has succeeded; only consulted when disposing.
      private bool IsConnected { get; set; }

      private readonly IHandshake handshake;
      private readonly IMessageHandler messageHandler;

      // Pending response awaiters, keyed by message id. Responses with the same id are matched
      // to awaiters in FIFO order. Guarded by requestsSem.
      private readonly Dictionary<string, Queue<ResponseAwaiter>> requestAwaiterQueues = new Dictionary<string, Queue<ResponseAwaiter>>();
      private readonly SemaphoreSlim requestsSem = new SemaphoreSlim(1);

      /// <summary>
      /// Creates a peer on top of an already connected <paramref name="tcpClient"/>.
      /// The client's network stream is wrapped in a UTF-8 reader and a UTF-8 writer that
      /// terminates lines with <c>"\n"</c>.
      /// </summary>
      /// <param name="tcpClient">TCP client whose stream is used for all communication.</param>
      /// <param name="handshake">Produces the local handshake line and validates the remote one.</param>
      /// <param name="messageHandler">Handles incoming requests.</param>
      /// <param name="logger">Logger used for diagnostics.</param>
      public Peer(TcpClient tcpClient, IHandshake handshake, IMessageHandler messageHandler, ILogger logger)
      {
          this.tcpClient = tcpClient;
          this.handshake = handshake;
          this.messageHandler = messageHandler;

          Logger = logger;

          NetworkStream clientStream = tcpClient.GetStream();
          clientStream.WriteTimeout = ClientWriteTimeout;

          clientReader = new StreamReader(clientStream, Encoding.UTF8);
          clientWriter = new StreamWriter(clientStream, Encoding.UTF8) { NewLine = "\n" };
      }

      /// <summary>
      /// Runs the receive loop until no more lines can be read.
      /// Each decoded request is passed to the message handler and its result is sent back as a
      /// response with the same id; each decoded response completes the oldest pending awaiter
      /// registered for its id by <see cref="SendRequest{TResponse}"/>.
      /// </summary>
      /// <returns>A task that completes when the loop ends. Exceptions are logged, not thrown.</returns>
      public async Task Process()
      {
          try
          {
              var decoder = new MessageDecoder();

              string? messageLine;
              while ((messageLine = await ReadLine()) != null)
              {
                  var state = decoder.Decode(messageLine, out var msg);

                  if (state == MessageDecoder.State.Decoding)
                      continue; // Not finished decoding yet

                  if (state == MessageDecoder.State.Errored)
                  {
                      Logger.LogError($"Received message line with invalid format: {messageLine}");
                      continue;
                  }

                  Logger.LogDebug($"Received message: {msg}");

                  try
                  {
                      if (msg!.Kind == MessageKind.Request)
                      {
                          var responseContent = await messageHandler.HandleRequest(this, msg.Id, msg.Content, Logger);
                          await WriteMessage(new Message(MessageKind.Response, msg.Id, responseContent));
                      }
                      else if (msg.Kind == MessageKind.Response)
                      {
                          ResponseAwaiter responseAwaiter;

                          using (await requestsSem.UseAsync())
                          {
                              if (!requestAwaiterQueues.TryGetValue(msg.Id, out var queue) || queue.Count <= 0)
                              {
                                  Logger.LogError($"Received unexpected response: {msg.Id}");
                                  // NOTE(review): this ends the whole receive loop, unlike the other
                                  // per-message errors, which are logged and skipped. Confirm that an
                                  // unexpected response is really meant to stop processing.
                                  return;
                              }

                              responseAwaiter = queue.Dequeue();
                          }

                          // Complete the awaiter outside the semaphore so it is not held while
                          // the awaiting code resumes.
                          responseAwaiter.SetResult(msg.Content);
                      }
                      else
                      {
                          throw new IndexOutOfRangeException($"Invalid message kind {msg.Kind}");
                      }
                  }
                  catch (Exception e)
                  {
                      // A failure while handling one message must not stop the loop.
                      Logger.LogError($"Message handler for '{msg}' failed with exception", e);
                  }
              }
          }
          catch (Exception e)
          {
              // Socket errors after disposal are expected; everything else is reported.
              if (!IsDisposed || !(e is SocketException || e.InnerException is SocketException))
              {
                  Logger.LogError("Unhandled exception in the peer loop", e);
              }
          }
      }

      /// <summary>
      /// Sends the local handshake line and waits for a valid handshake line from the remote side.
      /// On success the remote identity is stored and <see cref="Connected"/> is raised.
      /// </summary>
      /// <param name="identity">Identity passed to the handshake when building the local handshake line.</param>
      /// <returns>
      /// <c>true</c> if the handshake succeeded; <c>false</c> if the handshake line could not be
      /// written, no reply arrived within the timeout, or the reply was missing or invalid.
      /// </returns>
      public async Task<bool> DoHandshake(string identity)
      {
          if (!await WriteLine(handshake.GetHandshakeLine(identity)))
          {
              Logger.LogError("Could not write handshake");
              return false;
          }

          var readHandshakeTask = ReadLine();

          if (await Task.WhenAny(readHandshakeTask, Task.Delay(HandshakeTimeout)) != readHandshakeTask)
          {
              Logger.LogError("Timeout waiting for the client handshake");
              return false;
          }

          string? peerHandshake = await readHandshakeTask;

          if (peerHandshake == null || !handshake.IsValidPeerHandshake(peerHandshake, out remoteIdentity, Logger))
          {
              Logger.LogError("Received invalid handshake: " + peerHandshake);
              return false;
          }

          IsConnected = true;
          Connected?.Invoke();

          Logger.LogInfo("Peer connection started");

          return true;
      }

      /// <summary>
      /// Reads the next line from the remote side.
      /// </summary>
      /// <returns>
      /// The line read, or <c>null</c> when the reader has no more lines or when the peer has
      /// been disposed and the read failed with an interrupted-socket error.
      /// </returns>
      private async Task<string?> ReadLine()
      {
          try
          {
              return await clientReader.ReadLineAsync();
          }
          catch (Exception e)
          {
              if (IsDisposed)
              {
                  // Disposing the peer interrupts a pending read; treat that as end of stream.
                  var se = e as SocketException ?? e.InnerException as SocketException;
                  if (se != null && se.SocketErrorCode == SocketError.Interrupted)
                      return null;
              }

              throw;
          }
      }

      /// <summary>
      /// Serializes <paramref name="message"/> as kind, id, status, body line count and body
      /// (one item per line) and writes it to the remote side.
      /// </summary>
      /// <param name="message">Message to send.</param>
      /// <returns><c>true</c> if the message was written; otherwise <c>false</c>.</returns>
      private Task<bool> WriteMessage(Message message)
      {
          Logger.LogDebug($"Sending message: {message}");

          int bodyLineCount = message.Content.Body.Count(c => c == '\n');
          bodyLineCount += 1; // Extra line break at the end

          var builder = new StringBuilder();

          builder.AppendLine(message.Kind.ToString());
          builder.AppendLine(message.Id);
          builder.AppendLine(message.Content.Status.ToString());
          builder.AppendLine(bodyLineCount.ToString());
          builder.AppendLine(message.Content.Body);

          return WriteLine(builder.ToString());
      }

      /// <summary>
      /// Sends a request and waits for the matching response.
      /// </summary>
      /// <typeparam name="TResponse">Expected response type.</typeparam>
      /// <param name="id">Message id; the response is matched to the request by this id.</param>
      /// <param name="body">Request body.</param>
      /// <returns>The response, or <c>null</c> if the request could not be written.</returns>
      public async Task<TResponse?> SendRequest<TResponse>(string id, string body)
          where TResponse : Response, new()
      {
          ResponseAwaiter responseAwaiter;

          // The semaphore is held across both the write and the registration of the awaiter.
          // The receive loop takes the same semaphore before looking up an awaiter, so a response
          // cannot be matched before its awaiter has been enqueued.
          using (await requestsSem.UseAsync())
          {
              bool written = await WriteMessage(new Message(MessageKind.Request, id, new MessageContent(body)));

              if (!written)
                  return null;

              if (!requestAwaiterQueues.TryGetValue(id, out var queue))
              {
                  queue = new Queue<ResponseAwaiter>();
                  requestAwaiterQueues.Add(id, queue);
              }

              responseAwaiter = new ResponseAwaiter<TResponse>();
              queue.Enqueue(responseAwaiter);
          }

          return (TResponse)await responseAwaiter;
      }

      /// <summary>
      /// Writes <paramref name="text"/> followed by a line terminator and flushes the writer.
      /// A write failure on a peer that is not yet disposed is logged and disposes the peer.
      /// </summary>
      /// <param name="text">Text to write.</param>
      /// <returns>
      /// <c>true</c> only if the text was written and flushed; <c>false</c> if the peer is
      /// disposed or disconnected, or if the write failed.
      /// </returns>
      private async Task<bool> WriteLine(string text)
      {
          if (IsDisposed || !IsTcpClientConnected)
              return false;

          using (await writeSem.UseAsync())
          {
              try
              {
                  await clientWriter.WriteLineAsync(text);
                  await clientWriter.FlushAsync();
                  return true;
              }
              catch (Exception e)
              {
                  if (!IsDisposed)
                  {
                      var se = e as SocketException ?? e.InnerException as SocketException;
                      if (se != null && se.SocketErrorCode == SocketError.Shutdown)
                          Logger.LogInfo("Client disconnected ungracefully");
                      else
                          Logger.LogError("Exception thrown when trying to write to client", e);

                      Dispose();
                  }

                  // Report the failure. Otherwise SendRequest would register an awaiter for a
                  // message that was never sent and its caller would wait forever.
                  return false;
              }
          }
      }

      /// <summary>
      /// Shuts down the sending side of the underlying socket.
      /// </summary>
      // ReSharper disable once UnusedMember.Global
      public void ShutdownSocketSend()
      {
          tcpClient.Client.Shutdown(SocketShutdown.Send);
      }

      /// <summary>
      /// Disposes the reader, the writer and the TCP client. <see cref="Disconnected"/> is raised
      /// first if the handshake had succeeded and the socket still reports itself as connected.
      /// Calls after the first one do nothing.
      /// </summary>
      public void Dispose()
      {
          if (IsDisposed)
              return;

          IsDisposed = true;

          if (IsTcpClientConnected && IsConnected)
              Disconnected?.Invoke();

          clientReader.Dispose();
          clientWriter.Dispose();
          ((IDisposable)tcpClient).Dispose();
      }
  }
  238. }