Browse Source

Duplex client has its own listener loop, so special care on reply is needed.

Atsushi Eno 15 years ago
parent
commit
cb09118840

+ 10 - 5
mcs/class/System.ServiceModel/System.ServiceModel/ClientRuntimeChannel.cs

@@ -26,6 +26,7 @@
 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 //
 using System;
+using System.Collections.Generic;
 using System.Reflection;
 using System.Runtime.Serialization;
 using System.ServiceModel.Channels;
@@ -576,11 +577,15 @@ namespace System.ServiceModel.MonoInternal
 		{
 			if (RequestChannel != null)
 				return RequestChannel.Request (msg, timeout);
-			else {
-				DateTime startTime = DateTime.Now;
-				OutputChannel.Send (msg, timeout);
-				return ((IDuplexChannel) OutputChannel).Receive (timeout - (DateTime.Now - startTime));
-			}
+			else
+				return RequestCorrelated (msg, timeout, OutputChannel);
+		}
+
+		internal virtual Message RequestCorrelated (Message msg, TimeSpan timeout, IOutputChannel channel)
+		{
+			DateTime startTime = DateTime.Now;
+			OutputChannel.Send (msg, timeout);
+			return ((IDuplexChannel) channel).Receive (timeout - (DateTime.Now - startTime));
 		}
 
 		internal IAsyncResult BeginRequest (Message msg, TimeSpan timeout, AsyncCallback callback, object state)

+ 39 - 2
mcs/class/System.ServiceModel/System.ServiceModel/DuplexClientRuntimeChannel.cs

@@ -26,6 +26,8 @@
 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 //
 using System;
+using System.Collections.Generic;
+using System.Linq;
 using System.Reflection;
 using System.ServiceModel.Channels;
 using System.ServiceModel.Description;
@@ -93,6 +95,7 @@ namespace System.ServiceModel.MonoInternal
 		IAsyncResult loop_result;
 		AutoResetEvent loop_handle = new AutoResetEvent (false);
 		AutoResetEvent finish_handle = new AutoResetEvent (false);
+		AutoResetEvent receive_reply_handle = new AutoResetEvent (false);
 
 		protected override void OnOpen (TimeSpan timeout)
 		{
@@ -152,12 +155,28 @@ namespace System.ServiceModel.MonoInternal
 			}
 		}
 
-		void ProcessInput (IInputChannel input, Message message)
+		void ProcessInputCore (IInputChannel input, Message message)
 		{
-			try {
+				bool isReply = message != null && Contract.Operations.Any (od => !od.InCallbackContract && od.Messages.Any (md => md.Action == message.Headers.Action));
+				if (isReply) {
+					if (ReplyHandlerQueue.Count > 0) {
+						if (isReply) {
+							var h = ReplyHandlerQueue.Dequeue ();
+							h (message);
+							return;
+						}
+					}
+				}
+				
 				if (!MessageMatchesEndpointDispatcher (message, Runtime.CallbackDispatchRuntime.EndpointDispatcher))
 					throw new EndpointNotFoundException (String.Format ("The request message has the target '{0}' with action '{1}' which is not reachable in this service contract", message.Headers.To, message.Headers.Action));
 				new InputOrReplyRequestProcessor (Runtime.CallbackDispatchRuntime, input).ProcessInput (message);
+		}
+
+		void ProcessInput (IInputChannel input, Message message)
+		{
+			try {
+				ProcessInputCore (input, message);
 			} catch (Exception ex) {
 				// FIXME: log it.
 				Console.WriteLine (ex);
@@ -174,5 +193,23 @@ namespace System.ServiceModel.MonoInternal
 
 			return endpoint.ContractFilter.Match (req);
 		}
+		
+		internal override Message RequestCorrelated (Message msg, TimeSpan timeout, IOutputChannel channel)
+		{
+			DateTime startTime = DateTime.Now;
+			Message ret = null;
+			ManualResetEvent wait = new ManualResetEvent (false);
+			Action<Message> handler = delegate (Message reply) {
+				ret = reply;
+				wait.Set ();
+			};
+			ReplyHandlerQueue.Enqueue (handler);
+			channel.Send (msg, timeout);
+			if (ret == null && !wait.WaitOne (timeout - (DateTime.Now - startTime)))
+				throw new TimeoutException ();
+			return ret;
+		}
+		
+		internal Queue<Action<Message>> ReplyHandlerQueue = new Queue<Action<Message>> ();
 	}
 }