123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298 |
- using System;
- using System.Collections.Generic;
- using System.IO;
- using System.Linq;
- using System.Net.Sockets;
- using System.Reflection;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- using GodotTools.IdeMessaging.Requests;
- using GodotTools.IdeMessaging.Utils;
- namespace GodotTools.IdeMessaging
- {
- public sealed class Peer : IDisposable
- {
/// <summary>
/// Major protocol version, taken from the version of the assembly that declares <see cref="Peer"/>.
/// Different major versions are neither forward nor backward compatible:
/// the connection is refused when client and server majors differ.
/// </summary>
public static readonly int ProtocolVersionMajor = typeof(Peer).Assembly.GetName().Version.Major;

/// <summary>
/// Minor protocol version, taken from the same assembly version.
/// Clients must be backward compatible with it: the connection is refused
/// when the client's minor version is lower than the server's.
/// </summary>
public static readonly int ProtocolVersionMinor = typeof(Peer).Assembly.GetName().Version.Minor;

/// <summary>
/// Protocol revision, taken from the same assembly version. It has no effect on compatibility.
/// </summary>
public static readonly int ProtocolVersionRevision = typeof(Peer).Assembly.GetName().Version.Revision;

/// <summary>
/// String constant naming the client side of the handshake.
/// NOTE(review): not referenced inside this class; presumably consumed by handshake implementations.
/// </summary>
public const string ClientHandshakeName = "GodotIdeClient";

/// <summary>
/// String constant naming the server side of the handshake.
/// NOTE(review): not referenced inside this class; presumably consumed by handshake implementations.
/// </summary>
public const string ServerHandshakeName = "GodotIdeServer";

/// <summary>
/// Value assigned to the network stream's <c>WriteTimeout</c> in the constructor (milliseconds).
/// </summary>
private const int ClientWriteTimeout = 8000;

/// <summary>
/// Callback shape for handling a request: receives the peer and the message content
/// and returns a task that produces the <see cref="Response"/>.
/// </summary>
public delegate Task<Response> RequestHandler(Peer peer, MessageContent content);

// Underlying connection, and the line-based reader/writer created over its stream.
private readonly TcpClient tcpClient;
private readonly TextReader clientReader;
private readonly TextWriter clientWriter;

// Taken around every write + flush so that concurrent writers do not interleave.
private readonly SemaphoreSlim writeSem = new SemaphoreSlim(initialCount: 1);

// Filled in through the 'out' parameter of the handshake validation in DoHandshake.
private string? remoteIdentity;

/// <summary>
/// Identity reported by the remote peer. Never null: when no identity has been stored yet,
/// an empty string is stored and returned.
/// </summary>
public string RemoteIdentity => remoteIdentity ??= string.Empty;

/// <summary>Raised by <see cref="DoHandshake"/> once the peer handshake has been accepted.</summary>
public event Action? Connected;

/// <summary>
/// Raised by <see cref="Dispose"/> when the socket is still connected and the handshake had completed.
/// </summary>
public event Action? Disconnected;

private ILogger Logger { get; }

/// <summary>True once <see cref="Dispose"/> has run.</summary>
public bool IsDisposed { get; private set; }

/// <summary>True when the TCP client has a socket and that socket reports itself as connected.</summary>
public bool IsTcpClientConnected => tcpClient.Client?.Connected == true;

// Set to true after a successful handshake; gates the Disconnected notification in Dispose.
private bool IsConnected { get; set; }

private readonly IHandshake handshake;
private readonly IMessageHandler messageHandler;

// Pending response awaiters, keyed by message id, in FIFO order per id. Accessed under requestsSem.
private readonly Dictionary<string, Queue<ResponseAwaiter>> requestAwaiterQueues = new Dictionary<string, Queue<ResponseAwaiter>>();
private readonly SemaphoreSlim requestsSem = new SemaphoreSlim(initialCount: 1);
/// <summary>
/// Wraps <paramref name="tcpClient"/> in a peer. The client's network stream is obtained here,
/// its write timeout is set to <c>ClientWriteTimeout</c>, and a UTF-8 reader and writer are
/// created over it. The writer terminates lines with "\n".
/// </summary>
/// <param name="tcpClient">TCP client whose stream is used for all reads and writes.</param>
/// <param name="handshake">Produces the outgoing handshake line and validates the peer's.</param>
/// <param name="messageHandler">Handles incoming request messages.</param>
/// <param name="logger">Destination for diagnostics emitted by this peer.</param>
public Peer(TcpClient tcpClient, IHandshake handshake, IMessageHandler messageHandler, ILogger logger)
{
    Logger = logger;
    this.messageHandler = messageHandler;
    this.handshake = handshake;
    this.tcpClient = tcpClient;

    NetworkStream stream = tcpClient.GetStream();
    stream.WriteTimeout = ClientWriteTimeout;

    clientReader = new StreamReader(stream, Encoding.UTF8);
    clientWriter = new StreamWriter(stream, Encoding.UTF8)
    {
        NewLine = "\n"
    };
}
/// <summary>
/// Receive loop of the peer. Reads lines until the reader reports end of stream (null line),
/// feeds them to a <c>MessageDecoder</c>, and dispatches each fully decoded message:
/// requests go to the message handler and the result is written back as a response with the
/// same id; responses complete the oldest pending awaiter registered for their id.
/// </summary>
/// <remarks>
/// A failure while handling a single message (malformed line, unknown kind, handler exception,
/// unexpected response) is logged and the loop moves on to the next line. Exceptions that escape
/// the loop itself are logged, unless the peer is already disposed and the exception is (or wraps)
/// a <see cref="SocketException"/>.
/// </remarks>
/// <returns>A task that completes when the loop ends.</returns>
public async Task Process()
{
    try
    {
        var decoder = new MessageDecoder();

        string? messageLine;
        while ((messageLine = await ReadLine()) != null)
        {
            var state = decoder.Decode(messageLine, out var msg);

            if (state == MessageDecoder.State.Decoding)
                continue; // Not finished decoding yet

            if (state == MessageDecoder.State.Errored)
            {
                Logger.LogError($"Received message line with invalid format: {messageLine}");
                continue;
            }

            Logger.LogDebug($"Received message: {msg}");

            try
            {
                if (msg!.Kind == MessageKind.Request)
                {
                    var responseContent = await messageHandler.HandleRequest(this, msg.Id, msg.Content, Logger);
                    await WriteMessage(new Message(MessageKind.Response, msg.Id, responseContent));
                }
                else if (msg.Kind == MessageKind.Response)
                {
                    ResponseAwaiter responseAwaiter;

                    // The queues are shared with SendRequest, so look up and dequeue under requestsSem.
                    using (await requestsSem.UseAsync())
                    {
                        if (!requestAwaiterQueues.TryGetValue(msg.Id, out var queue) || queue.Count <= 0)
                        {
                            Logger.LogError($"Received unexpected response: {msg.Id}");
                            // Drop only this message. Like every other per-message failure in this
                            // loop, a stray response must not stop processing of later messages.
                            continue;
                        }

                        responseAwaiter = queue.Dequeue();
                    }

                    // Complete the awaiter outside the semaphore.
                    responseAwaiter.SetResult(msg.Content);
                }
                else
                {
                    // Caught and logged by the handler just below.
                    throw new IndexOutOfRangeException($"Invalid message kind {msg.Kind}");
                }
            }
            catch (Exception e)
            {
                Logger.LogError($"Message handler for '{msg}' failed with exception", e);
            }
        }
    }
    catch (Exception e)
    {
        // Socket errors after disposal are expected during shutdown; everything else is reported.
        if (!IsDisposed || !(e is SocketException || e.InnerException is SocketException))
        {
            Logger.LogError("Unhandled exception in the peer loop", e);
        }
    }
}
/// <summary>
/// Performs the handshake with the remote peer: writes our handshake line for
/// <paramref name="identity"/>, waits up to 8000 ms for the peer's line, and validates it.
/// On success the peer is marked as connected and <see cref="Connected"/> is raised.
/// </summary>
/// <param name="identity">Identity passed to the handshake to build the outgoing handshake line.</param>
/// <returns>
/// <c>true</c> when the peer's handshake was received in time and accepted; <c>false</c> when the
/// handshake could not be written, timed out, or was rejected (each case is logged as an error).
/// </returns>
public async Task<bool> DoHandshake(string identity)
{
    const int handshakeTimeoutMs = 8000;

    bool handshakeSent = await WriteLine(handshake.GetHandshakeLine(identity));
    if (!handshakeSent)
    {
        Logger.LogError("Could not write handshake");
        return false;
    }

    // Race the read against a delay; whichever finishes first decides the outcome.
    var pendingHandshake = ReadLine();
    Task firstCompleted = await Task.WhenAny(pendingHandshake, Task.Delay(handshakeTimeoutMs));
    if (firstCompleted != pendingHandshake)
    {
        Logger.LogError("Timeout waiting for the client handshake");
        return false;
    }

    string? peerHandshake = await pendingHandshake;

    // Validation also stores the remote identity through the 'out' argument.
    bool accepted = peerHandshake != null
                    && handshake.IsValidPeerHandshake(peerHandshake, out remoteIdentity, Logger);
    if (!accepted)
    {
        Logger.LogError($"Received invalid handshake: {peerHandshake}");
        return false;
    }

    IsConnected = true;
    Connected?.Invoke();
    Logger.LogInfo("Peer connection started");

    return true;
}
/// <summary>
/// Reads the next line from the peer.
/// </summary>
/// <returns>
/// The line read, or <c>null</c> when the reader returns null. <c>null</c> is also returned when the
/// read fails after this peer was disposed and the failure is (or wraps) a
/// <see cref="SocketException"/> with <see cref="SocketError.Interrupted"/>.
/// Any other exception is rethrown unchanged.
/// </returns>
private async Task<string?> ReadLine()
{
    try
    {
        return await clientReader.ReadLineAsync();
    }
    catch (Exception e)
    {
        // Failures while the peer is still alive are always propagated.
        if (!IsDisposed)
            throw;

        var socketError = e as SocketException ?? e.InnerException as SocketException;
        if (socketError?.SocketErrorCode == SocketError.Interrupted)
            return null;

        throw;
    }
}
/// <summary>
/// Serializes <paramref name="message"/> and sends it through <c>WriteLine</c>.
/// The text consists of one line each for kind, id, status and body line count,
/// followed by the body itself.
/// </summary>
/// <param name="message">Message to send.</param>
/// <returns>The task returned by <c>WriteLine</c> for the serialized text.</returns>
private Task<bool> WriteMessage(Message message)
{
    Logger.LogDebug($"Sending message: {message}");

    // Number of line breaks in the body, plus one for the extra line break at the end.
    int bodyLineCount = 1 + message.Content.Body.Count(ch => ch == '\n');

    string serialized = new StringBuilder()
        .AppendLine(message.Kind.ToString())
        .AppendLine(message.Id)
        .AppendLine(message.Content.Status.ToString())
        .AppendLine(bodyLineCount.ToString())
        .AppendLine(message.Content.Body)
        .ToString();

    return WriteLine(serialized);
}
/// <summary>
/// Sends a request with the given id and body, then waits for the matching response.
/// </summary>
/// <remarks>
/// The request is written and its awaiter is queued while holding <c>requestsSem</c>. The receive
/// loop takes the same semaphore before looking up an awaiter, so it cannot dequeue for this id
/// until the awaiter has been registered.
/// </remarks>
/// <typeparam name="TResponse">Response type that the awaited result is cast to.</typeparam>
/// <param name="id">Message id; responses are matched to requests by this id, in FIFO order.</param>
/// <param name="body">Body of the request message.</param>
/// <returns>The response, or <c>null</c> when writing the request was reported as failed.</returns>
public async Task<TResponse?> SendRequest<TResponse>(string id, string body)
    where TResponse : Response, new()
{
    ResponseAwaiter pendingResponse;

    using (await requestsSem.UseAsync())
    {
        var request = new Message(MessageKind.Request, id, new MessageContent(body));
        if (!await WriteMessage(request))
            return null;

        if (!requestAwaiterQueues.TryGetValue(id, out var awaitersForId))
        {
            awaitersForId = new Queue<ResponseAwaiter>();
            requestAwaiterQueues.Add(id, awaitersForId);
        }

        pendingResponse = new ResponseAwaiter<TResponse>();
        awaitersForId.Enqueue(pendingResponse);
    }

    // Wait outside the semaphore so other requests and the receive loop can proceed.
    return (TResponse)await pendingResponse;
}
/// <summary>
/// Writes <paramref name="text"/> followed by a line terminator and flushes the writer.
/// Writes are serialized through <c>writeSem</c>.
/// </summary>
/// <remarks>
/// If the write or flush throws while the peer is not yet disposed, the failure is logged
/// (as info for a <see cref="SocketError.Shutdown"/> socket error, otherwise as an error)
/// and the peer is disposed. Exceptions are never propagated to the caller.
/// </remarks>
/// <param name="text">Text to send.</param>
/// <returns>
/// <c>true</c> only when the text was written and flushed; <c>false</c> when the peer is disposed,
/// the socket is not connected, or the write failed.
/// </returns>
private async Task<bool> WriteLine(string text)
{
    if (IsDisposed || !IsTcpClientConnected)
        return false;

    using (await writeSem.UseAsync())
    {
        try
        {
            await clientWriter.WriteLineAsync(text);
            await clientWriter.FlushAsync();
            return true;
        }
        catch (Exception e)
        {
            if (!IsDisposed)
            {
                var se = e as SocketException ?? e.InnerException as SocketException;
                if (se != null && se.SocketErrorCode == SocketError.Shutdown)
                    Logger.LogInfo("Client disconnected ungracefully");
                else
                    Logger.LogError("Exception thrown when trying to write to client", e);

                Dispose();
            }

            // Nothing was reliably sent. Reporting success here would make callers wait for a
            // reply (handshake line, request response) that can never arrive.
            return false;
        }
    }
}
/// <summary>
/// Shuts down the sending side of the underlying socket (<see cref="SocketShutdown.Send"/>).
/// </summary>
// ReSharper disable once UnusedMember.Global
public void ShutdownSocketSend() => tcpClient.Client.Shutdown(SocketShutdown.Send);
/// <summary>
/// Disposes the reader, the writer and the TCP client. Calls after the first one do nothing.
/// <see cref="Disconnected"/> is raised first, but only when the socket still reports itself as
/// connected and the handshake had completed.
/// </summary>
public void Dispose()
{
    if (IsDisposed)
        return;

    // Flag first, so code that checks IsDisposed during teardown sees the peer as disposed.
    IsDisposed = true;

    bool shouldNotify = IsTcpClientConnected && IsConnected;
    if (shouldNotify)
        Disconnected?.Invoke();

    clientReader.Dispose();
    clientWriter.Dispose();
    ((IDisposable)tcpClient).Dispose();
}
- }
- }
|