UdpDuplexChannel.cs 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. //
  2. // Author: Atsushi Enomoto <[email protected]>
  3. //
  4. // Copyright (C) 2010 Novell, Inc (http://www.novell.com)
  5. //
  6. // Permission is hereby granted, free of charge, to any person obtaining
  7. // a copy of this software and associated documentation files (the
  8. // "Software"), to deal in the Software without restriction, including
  9. // without limitation the rights to use, copy, modify, merge, publish,
  10. // distribute, sublicense, and/or sell copies of the Software, and to
  11. // permit persons to whom the Software is furnished to do so, subject to
  12. // the following conditions:
  13. //
  14. // The above copyright notice and this permission notice shall be
  15. // included in all copies or substantial portions of the Software.
  16. //
  17. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  18. // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  19. // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  20. // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
  21. // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
  22. // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  23. // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  24. //
  25. using System;
  26. using System.IO;
  27. using System.Linq;
  28. using System.Net;
  29. using System.Net.NetworkInformation;
  30. using System.Net.Sockets;
  31. using System.ServiceModel;
  32. using System.ServiceModel.Channels;
  33. using System.Threading;
  34. namespace System.ServiceModel.Discovery
  35. {
  36. internal class UdpDuplexChannel : ChannelBase, IDuplexChannel
  37. {
  38. // channel factory
  39. public UdpDuplexChannel (UdpChannelFactory factory, BindingContext context, EndpointAddress address, Uri via)
  40. : base (factory)
  41. {
  42. if (factory == null)
  43. throw new ArgumentNullException ("factory");
  44. if (context == null)
  45. throw new ArgumentNullException ("context");
  46. if (address == null)
  47. throw new ArgumentNullException ("address");
  48. binding_element = factory.Source;
  49. RemoteAddress = address;
  50. Via = via;
  51. FillMessageEncoder (context);
  52. }
  53. public UdpDuplexChannel (UdpChannelListener listener)
  54. : base (listener)
  55. {
  56. binding_element = listener.Source;
  57. LocalAddress = new EndpointAddress (listener.Uri);
  58. FillMessageEncoder (listener.Context);
  59. }
  60. MessageEncoder message_encoder;
  61. UdpClient client;
  62. IPAddress multicast_address;
  63. UdpTransportBindingElement binding_element;
  64. // for servers
  65. public EndpointAddress LocalAddress { get; private set; }
  66. // for clients
  67. public EndpointAddress RemoteAddress { get; private set; }
  68. public Uri Via { get; private set; }
  69. void FillMessageEncoder (BindingContext ctx)
  70. {
  71. var mbe = (MessageEncodingBindingElement) ctx.RemainingBindingElements.FirstOrDefault (be => be is MessageEncodingBindingElement);
  72. if (mbe == null)
  73. mbe = new TextMessageEncodingBindingElement ();
  74. message_encoder = mbe.CreateMessageEncoderFactory ().Encoder;
  75. }
  76. public void Send (Message message)
  77. {
  78. Send (message, DefaultSendTimeout);
  79. }
  80. static readonly Random rnd = new Random ();
  81. public void Send (Message message, TimeSpan timeout)
  82. {
  83. if (State != CommunicationState.Opened)
  84. throw new InvalidOperationException ("The UDP channel must be opened before sending a message.");
  85. var ms = new MemoryStream ();
  86. message_encoder.WriteMessage (message, ms);
  87. client.Send (ms.GetBuffer (), (int) ms.Length);
  88. // FIXME: use MaxAnnouncementDelay. It is fixed now.
  89. Thread.Sleep (rnd.Next (500, 500));
  90. }
  91. public bool WaitForMessage (TimeSpan timeout)
  92. {
  93. throw new NotImplementedException ();
  94. }
  95. public Message Receive ()
  96. {
  97. return Receive (DefaultReceiveTimeout);
  98. }
  99. public Message Receive (TimeSpan timeout)
  100. {
  101. Message msg;
  102. if (!TryReceive (timeout, out msg))
  103. throw new TimeoutException ();
  104. return msg;
  105. }
  106. public bool TryReceive (TimeSpan timeout, out Message msg)
  107. {
  108. ThrowIfDisposedOrNotOpen ();
  109. msg = null;
  110. byte [] bytes = null;
  111. IPEndPoint ip = new IPEndPoint (IPAddress.Any, 0);
  112. var ar = client.BeginReceive (delegate (IAsyncResult result) {
  113. bytes = client.EndReceive (result, ref ip);
  114. }, null);
  115. if (!ar.IsCompleted && !ar.AsyncWaitHandle.WaitOne (timeout))
  116. return false;
  117. if (bytes == null || bytes.Length == 0)
  118. return false;
  119. // FIXME: give maxSizeOfHeaders
  120. msg = message_encoder.ReadMessage (new MemoryStream (bytes), int.MaxValue);
  121. return true;
  122. }
  123. protected override void OnAbort ()
  124. {
  125. if (client != null)
  126. client.Close ();
  127. client = null;
  128. }
  129. Action<TimeSpan> open_delegate, close_delegate;
  130. protected override IAsyncResult OnBeginClose (TimeSpan timeout, AsyncCallback callback, object state)
  131. {
  132. if (close_delegate == null)
  133. close_delegate = new Action<TimeSpan> (OnClose);
  134. return close_delegate.BeginInvoke (timeout, callback, state);
  135. }
  136. protected override void OnEndClose (IAsyncResult result)
  137. {
  138. close_delegate.EndInvoke (result);
  139. }
  140. protected override IAsyncResult OnBeginOpen (TimeSpan timeout, AsyncCallback callback, object state)
  141. {
  142. if (open_delegate == null)
  143. open_delegate = new Action<TimeSpan> (OnOpen);
  144. return open_delegate.BeginInvoke (timeout, callback, state);
  145. }
  146. protected override void OnEndOpen (IAsyncResult result)
  147. {
  148. open_delegate.EndInvoke (result);
  149. }
  150. protected override void OnClose (TimeSpan timeout)
  151. {
  152. if (client != null) {
  153. if (multicast_address != null) {
  154. client.DropMulticastGroup (multicast_address, LocalAddress.Uri.Port);
  155. multicast_address = null;
  156. }
  157. client.Close ();
  158. }
  159. client = null;
  160. }
  161. protected override void OnOpen (TimeSpan timeout)
  162. {
  163. if (RemoteAddress != null) {
  164. client = new UdpClient ();
  165. var uri = Via ?? RemoteAddress.Uri;
  166. client.Connect (uri.Host, uri.Port);
  167. } else {
  168. var ip = IPAddress.Parse (LocalAddress.Uri.Host);
  169. bool isMulticast = NetworkInterface.GetAllNetworkInterfaces ().Any (nic => nic.SupportsMulticast && nic.GetIPProperties ().MulticastAddresses.Any (mca => mca.Address.Equals (ip)));
  170. int port = LocalAddress.Uri.Port;
  171. if (isMulticast) {
  172. multicast_address = ip;
  173. client = new UdpClient (new IPEndPoint (IPAddress.Any, port));
  174. client.JoinMulticastGroup (ip, binding_element.TransportSettings.TimeToLive);
  175. }
  176. else
  177. client = new UdpClient (new IPEndPoint (ip, port));
  178. }
  179. client.EnableBroadcast = true;
  180. // FIXME: apply UdpTransportSetting here.
  181. }
  182. Func<TimeSpan,Message> receive_delegate;
  183. public IAsyncResult BeginReceive (AsyncCallback callback, object state)
  184. {
  185. return BeginReceive (DefaultReceiveTimeout, callback, state);
  186. }
  187. public IAsyncResult BeginReceive (TimeSpan timeout, AsyncCallback callback, object state)
  188. {
  189. if (receive_delegate == null)
  190. receive_delegate = new Func<TimeSpan,Message> (Receive);
  191. return receive_delegate.BeginInvoke (timeout, callback, state);
  192. }
  193. public Message EndReceive (IAsyncResult result)
  194. {
  195. return receive_delegate.EndInvoke (result);
  196. }
  197. delegate bool TryReceiveDelegate (TimeSpan timeout, out Message msg);
  198. TryReceiveDelegate try_receive_delegate;
  199. public IAsyncResult BeginTryReceive (TimeSpan timeout, AsyncCallback callback, object state)
  200. {
  201. if (try_receive_delegate == null)
  202. try_receive_delegate = new TryReceiveDelegate (TryReceive);
  203. Message dummy;
  204. return try_receive_delegate.BeginInvoke (timeout, out dummy, callback, state);
  205. }
  206. public bool EndTryReceive (IAsyncResult result, out Message msg)
  207. {
  208. return try_receive_delegate.EndInvoke (out msg, result);
  209. }
  210. Func<TimeSpan,bool> wait_delegate;
  211. public IAsyncResult BeginWaitForMessage (TimeSpan timeout, AsyncCallback callback, object state)
  212. {
  213. if (wait_delegate == null)
  214. wait_delegate = new Func<TimeSpan,bool> (WaitForMessage);
  215. return wait_delegate.BeginInvoke (timeout, callback, state);
  216. }
  217. public bool EndWaitForMessage (IAsyncResult result)
  218. {
  219. return wait_delegate.EndInvoke (result);
  220. }
  221. Action<Message,TimeSpan> send_delegate;
  222. public IAsyncResult BeginSend (Message message, AsyncCallback callback, object state)
  223. {
  224. return BeginSend (message, DefaultSendTimeout, callback, state);
  225. }
  226. public IAsyncResult BeginSend (Message message, TimeSpan timeout, AsyncCallback callback, object state)
  227. {
  228. if (send_delegate == null)
  229. send_delegate = new Action<Message,TimeSpan> (Send);
  230. return send_delegate.BeginInvoke (message, timeout, callback, state);
  231. }
  232. public void EndSend (IAsyncResult result)
  233. {
  234. send_delegate.EndInvoke (result);
  235. }
  236. }
  237. }