UdpDuplexChannel.cs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. //
  2. // Author: Atsushi Enomoto <[email protected]>
  3. //
  4. // Copyright (C) 2010 Novell, Inc (http://www.novell.com)
  5. //
  6. // Permission is hereby granted, free of charge, to any person obtaining
  7. // a copy of this software and associated documentation files (the
  8. // "Software"), to deal in the Software without restriction, including
  9. // without limitation the rights to use, copy, modify, merge, publish,
  10. // distribute, sublicense, and/or sell copies of the Software, and to
  11. // permit persons to whom the Software is furnished to do so, subject to
  12. // the following conditions:
  13. //
  14. // The above copyright notice and this permission notice shall be
  15. // included in all copies or substantial portions of the Software.
  16. //
  17. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  18. // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  19. // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  20. // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
  21. // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
  22. // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  23. // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  24. //
  25. using System;
  26. using System.Collections.Generic;
  27. using System.IO;
  28. using System.Linq;
  29. using System.Net;
  30. using System.Net.NetworkInformation;
  31. using System.Net.Sockets;
  32. using System.ServiceModel;
  33. using System.ServiceModel.Channels;
  34. using System.Threading;
  35. using System.Xml;
  36. namespace System.ServiceModel.Discovery
  37. {
  38. internal class UdpDuplexChannel : ChannelBase, IDuplexChannel
  39. {
  40. // channel factory
  41. public UdpDuplexChannel (UdpChannelFactory factory, BindingContext context, EndpointAddress address, Uri via)
  42. : base (factory)
  43. {
  44. if (factory == null)
  45. throw new ArgumentNullException ("factory");
  46. if (context == null)
  47. throw new ArgumentNullException ("context");
  48. if (address == null)
  49. throw new ArgumentNullException ("address");
  50. binding_element = factory.Source;
  51. RemoteAddress = address;
  52. Via = via;
  53. FillMessageEncoder (context);
  54. }
  55. public UdpDuplexChannel (UdpChannelListener listener)
  56. : base (listener)
  57. {
  58. binding_element = listener.Source;
  59. LocalAddress = new EndpointAddress (listener.Uri);
  60. FillMessageEncoder (listener.Context);
  61. }
  62. MessageEncoder message_encoder;
  63. UdpClient client;
  64. IPAddress multicast_address;
  65. UdpTransportBindingElement binding_element;
  66. // for servers
  67. public EndpointAddress LocalAddress { get; private set; }
  68. // for clients
  69. public EndpointAddress RemoteAddress { get; private set; }
  70. public Uri Via { get; private set; }
  71. void FillMessageEncoder (BindingContext ctx)
  72. {
  73. var mbe = (MessageEncodingBindingElement) ctx.RemainingBindingElements.FirstOrDefault (be => be is MessageEncodingBindingElement);
  74. if (mbe == null)
  75. mbe = new TextMessageEncodingBindingElement ();
  76. message_encoder = mbe.CreateMessageEncoderFactory ().Encoder;
  77. }
  78. public void Send (Message message)
  79. {
  80. Send (message, DefaultSendTimeout);
  81. }
  82. static readonly Random rnd = new Random ();
  83. UdpClient GetSenderClient (Message message)
  84. {
  85. if (RemoteAddress != null)
  86. return client;
  87. var rmp = message.Properties [RemoteEndpointMessageProperty.Name] as RemoteEndpointMessageProperty;
  88. if (rmp == null)
  89. throw new ArgumentException ("This duplex channel from the channel listener cannot send messages without RemoteEndpointMessageProperty");
  90. var cli = new UdpClient ();
  91. cli.Connect (IPAddress.Parse (rmp.Address), rmp.Port);
  92. return cli;
  93. }
  94. public void Send (Message message, TimeSpan timeout)
  95. {
  96. if (State != CommunicationState.Opened)
  97. throw new InvalidOperationException ("The UDP channel must be opened before sending a message.");
  98. var cli = GetSenderClient (message);
  99. try {
  100. SendCore (cli, message, timeout);
  101. } finally {
  102. if (cli != client)
  103. cli.Close ();
  104. }
  105. }
  106. void SendCore (UdpClient cli, Message message, TimeSpan timeout)
  107. {
  108. var ms = new MemoryStream ();
  109. message_encoder.WriteMessage (message, ms);
  110. // It seems .NET sends the same Message a couple of times so that the receivers don't miss it. So, do the same hack.
  111. for (int i = 0; i < 6; i++) {
  112. // FIXME: use MaxAnnouncementDelay. It is fixed now.
  113. Thread.Sleep (rnd.Next (50, 500));
  114. cli.Send (ms.GetBuffer (), (int) ms.Length);
  115. }
  116. }
  117. public bool WaitForMessage (TimeSpan timeout)
  118. {
  119. throw new NotImplementedException ();
  120. }
  121. public Message Receive ()
  122. {
  123. return Receive (DefaultReceiveTimeout);
  124. }
  125. public Message Receive (TimeSpan timeout)
  126. {
  127. Message msg;
  128. if (!TryReceive (timeout, out msg))
  129. throw new TimeoutException ();
  130. return msg;
  131. }
  132. public bool TryReceive (TimeSpan timeout, out Message msg)
  133. {
  134. DateTime start = DateTime.Now;
  135. ThrowIfDisposedOrNotOpen ();
  136. msg = null;
  137. if (client == null) // could be invoked while being closed.
  138. return false;
  139. byte [] bytes = null;
  140. IPEndPoint ip = new IPEndPoint (IPAddress.Any, 0);
  141. var ar = client.BeginReceive (delegate (IAsyncResult result) {
  142. bytes = client.EndReceive (result, ref ip);
  143. }, null);
  144. if (!ar.IsCompleted && !ar.AsyncWaitHandle.WaitOne (timeout))
  145. return false;
  146. if (bytes == null || bytes.Length == 0)
  147. return false;
  148. // Clients will send the same message many times, and this receiver has to
  149. // FIXME: give maxSizeOfHeaders
  150. msg = message_encoder.ReadMessage (new MemoryStream (bytes), int.MaxValue);
  151. var id = msg.Headers.MessageId;
  152. if (message_ids.Contains (id))
  153. return TryReceive (timeout - (DateTime.Now - start), out msg);
  154. if (id != null) {
  155. message_ids.Enqueue (id);
  156. if (message_ids.Count >= binding_element.TransportSettings.DuplicateMessageHistoryLength)
  157. message_ids.Dequeue ();
  158. }
  159. msg.Properties.Add ("Via", LocalAddress.Uri);
  160. msg.Properties.Add ("Encoder", message_encoder);
  161. msg.Properties.Add (RemoteEndpointMessageProperty.Name, new RemoteEndpointMessageProperty (ip.Address.ToString (), ip.Port));
  162. return true;
  163. }
  164. Queue<UniqueId> message_ids = new Queue<UniqueId> ();
  165. protected override void OnAbort ()
  166. {
  167. OnClose (TimeSpan.Zero);
  168. }
  169. Action<TimeSpan> open_delegate, close_delegate;
  170. protected override IAsyncResult OnBeginClose (TimeSpan timeout, AsyncCallback callback, object state)
  171. {
  172. if (close_delegate == null)
  173. close_delegate = new Action<TimeSpan> (OnClose);
  174. return close_delegate.BeginInvoke (timeout, callback, state);
  175. }
  176. protected override void OnEndClose (IAsyncResult result)
  177. {
  178. close_delegate.EndInvoke (result);
  179. }
  180. protected override IAsyncResult OnBeginOpen (TimeSpan timeout, AsyncCallback callback, object state)
  181. {
  182. if (open_delegate == null)
  183. open_delegate = new Action<TimeSpan> (OnOpen);
  184. return open_delegate.BeginInvoke (timeout, callback, state);
  185. }
  186. protected override void OnEndOpen (IAsyncResult result)
  187. {
  188. open_delegate.EndInvoke (result);
  189. }
  190. protected override void OnClose (TimeSpan timeout)
  191. {
  192. if (client != null) {
  193. if (multicast_address != null) {
  194. client.DropMulticastGroup (multicast_address, LocalAddress.Uri.Port);
  195. multicast_address = null;
  196. }
  197. client.Close ();
  198. }
  199. client = null;
  200. }
  201. protected override void OnOpen (TimeSpan timeout)
  202. {
  203. if (RemoteAddress != null) {
  204. client = new UdpClient ();
  205. var uri = Via ?? RemoteAddress.Uri;
  206. client.Connect (uri.Host, uri.Port);
  207. } else {
  208. var ip = IPAddress.Parse (LocalAddress.Uri.Host);
  209. bool isMulticast = NetworkInterface.GetAllNetworkInterfaces ().Any (nic => nic.SupportsMulticast && nic.GetIPProperties ().MulticastAddresses.Any (mca => mca.Address.Equals (ip)));
  210. int port = LocalAddress.Uri.Port;
  211. if (isMulticast) {
  212. multicast_address = ip;
  213. client = new UdpClient (new IPEndPoint (IPAddress.Any, port));
  214. client.JoinMulticastGroup (ip, binding_element.TransportSettings.TimeToLive);
  215. }
  216. else
  217. client = new UdpClient (new IPEndPoint (ip, port));
  218. }
  219. client.EnableBroadcast = true;
  220. // FIXME: apply UdpTransportSetting here.
  221. }
  222. Func<TimeSpan,Message> receive_delegate;
  223. public IAsyncResult BeginReceive (AsyncCallback callback, object state)
  224. {
  225. return BeginReceive (DefaultReceiveTimeout, callback, state);
  226. }
  227. public IAsyncResult BeginReceive (TimeSpan timeout, AsyncCallback callback, object state)
  228. {
  229. if (receive_delegate == null)
  230. receive_delegate = new Func<TimeSpan,Message> (Receive);
  231. return receive_delegate.BeginInvoke (timeout, callback, state);
  232. }
  233. public Message EndReceive (IAsyncResult result)
  234. {
  235. return receive_delegate.EndInvoke (result);
  236. }
  237. delegate bool TryReceiveDelegate (TimeSpan timeout, out Message msg);
  238. TryReceiveDelegate try_receive_delegate;
  239. public IAsyncResult BeginTryReceive (TimeSpan timeout, AsyncCallback callback, object state)
  240. {
  241. if (try_receive_delegate == null)
  242. try_receive_delegate = new TryReceiveDelegate (TryReceive);
  243. Message dummy;
  244. return try_receive_delegate.BeginInvoke (timeout, out dummy, callback, state);
  245. }
  246. public bool EndTryReceive (IAsyncResult result, out Message msg)
  247. {
  248. return try_receive_delegate.EndInvoke (out msg, result);
  249. }
  250. Func<TimeSpan,bool> wait_delegate;
  251. public IAsyncResult BeginWaitForMessage (TimeSpan timeout, AsyncCallback callback, object state)
  252. {
  253. if (wait_delegate == null)
  254. wait_delegate = new Func<TimeSpan,bool> (WaitForMessage);
  255. return wait_delegate.BeginInvoke (timeout, callback, state);
  256. }
  257. public bool EndWaitForMessage (IAsyncResult result)
  258. {
  259. return wait_delegate.EndInvoke (result);
  260. }
  261. Action<Message,TimeSpan> send_delegate;
  262. public IAsyncResult BeginSend (Message message, AsyncCallback callback, object state)
  263. {
  264. return BeginSend (message, DefaultSendTimeout, callback, state);
  265. }
  266. public IAsyncResult BeginSend (Message message, TimeSpan timeout, AsyncCallback callback, object state)
  267. {
  268. if (send_delegate == null)
  269. send_delegate = new Action<Message,TimeSpan> (Send);
  270. return send_delegate.BeginInvoke (message, timeout, callback, state);
  271. }
  272. public void EndSend (IAsyncResult result)
  273. {
  274. send_delegate.EndInvoke (result);
  275. }
  276. }
  277. }