SocketClient.cs 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. using System.Collections.Concurrent;
  2. using System.Net;
  3. using System.Net.Sockets;
  4. using System.Text;
  5. namespace QuestPDF.Previewer;
  6. internal class SocketClient
  7. {
  8. private TcpClient Client { get; set; }
  9. private NetworkStream Stream;
  10. private ConcurrentQueue<string> OutgoingMessages { get; } = new();
  11. private ConcurrentQueue<string> IncomingMessages { get; } = new();
  12. public event Func<string, Task>? OnMessageReceived;
  13. public SocketClient(string ipAddress, int port)
  14. {
  15. Client = new TcpClient();
  16. Client.Connect(IPAddress.Parse(ipAddress), port);
  17. Stream = Client.GetStream();
  18. }
  19. public SocketClient(TcpClient client)
  20. {
  21. Client = client;
  22. Stream = client.GetStream();
  23. }
  24. public Task StartCommunication(CancellationToken cancellationToken = default)
  25. {
  26. var taskWorkers = Enumerable
  27. .Range(0, Environment.ProcessorCount)
  28. .Select(_ => Task.Run(() => HandleTaskReceivers(cancellationToken)))
  29. .ToList();
  30. var runningTasks = new List<Task>
  31. {
  32. Task.Run(() => HandleIncomingMessages(cancellationToken)),
  33. Task.Run(() => HandleOutgoingMessages(cancellationToken))
  34. };
  35. return Task.WhenAll(taskWorkers.Concat(runningTasks));
  36. }
  37. public void SendMessage(string message)
  38. {
  39. OutgoingMessages.Enqueue(message);
  40. }
  41. private async Task HandleIncomingMessages(CancellationToken cancellationToken = default)
  42. {
  43. using var binaryStream = new BinaryReader(Stream);
  44. while (Client.Connected)
  45. {
  46. var messageLength = binaryStream.ReadInt32();
  47. var messageBytes = new byte[messageLength];
  48. await Stream.ReadExactlyAsync(messageBytes, 0, messageBytes.Length, cancellationToken);
  49. var message = Encoding.UTF8.GetString(messageBytes);
  50. IncomingMessages.Enqueue(message);
  51. }
  52. }
  53. private async Task HandleOutgoingMessages(CancellationToken cancellationToken = default)
  54. {
  55. while (Client.Connected)
  56. {
  57. if (!OutgoingMessages.TryDequeue(out var message))
  58. {
  59. await Task.Delay(10, cancellationToken);
  60. continue;
  61. }
  62. Console.WriteLine($"Sending message: {message}");
  63. var data = Encoding.UTF8.GetBytes(message);
  64. var length = BitConverter.GetBytes(data.Length);
  65. await Stream.WriteAsync(length, 0, length.Length, cancellationToken);
  66. await Stream.WriteAsync(data, 0, data.Length, cancellationToken);
  67. }
  68. }
  69. private async Task HandleTaskReceivers(CancellationToken cancellationToken = default)
  70. {
  71. while (Client.Connected)
  72. {
  73. if (!IncomingMessages.TryDequeue(out var message))
  74. {
  75. await Task.Delay(10, cancellationToken);
  76. continue;
  77. }
  78. if (OnMessageReceived == null)
  79. continue;
  80. await OnMessageReceived.Invoke(message);
  81. }
  82. }
  83. public void Close()
  84. {
  85. OnMessageReceived = null;
  86. Stream.Close();
  87. Client.Close();
  88. }
  89. }