2
0

TransactedBatchContext.cs 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. //-----------------------------------------------------------------------------
  2. // Copyright (c) Microsoft Corporation. All rights reserved.
  3. //-----------------------------------------------------------------------------
  4. namespace System.ServiceModel.Dispatcher
  5. {
  6. using System.Diagnostics;
  7. using System.Runtime;
  8. using System.ServiceModel.Channels;
  9. using System.ServiceModel.Diagnostics;
  10. using System.Transactions;
  11. sealed class TransactedBatchContext : IEnlistmentNotification
  12. {
  13. SharedTransactedBatchContext shared;
  14. CommittableTransaction transaction;
  15. DateTime commitNotLaterThan;
  16. int commits;
  17. bool batchFinished;
  18. bool inDispatch;
  19. internal TransactedBatchContext(SharedTransactedBatchContext shared)
  20. {
  21. this.shared = shared;
  22. this.transaction = TransactionBehavior.CreateTransaction(shared.IsolationLevel, shared.TransactionTimeout);
  23. this.transaction.EnlistVolatile(this, EnlistmentOptions.None);
  24. if (shared.TransactionTimeout <= TimeSpan.Zero)
  25. this.commitNotLaterThan = DateTime.MaxValue;
  26. else
  27. this.commitNotLaterThan = DateTime.UtcNow + TimeSpan.FromMilliseconds(shared.TransactionTimeout.TotalMilliseconds * 4 / 5);
  28. this.commits = 0;
  29. this.batchFinished = false;
  30. this.inDispatch = false;
  31. }
  32. internal bool AboutToExpire
  33. {
  34. get
  35. {
  36. return DateTime.UtcNow > this.commitNotLaterThan;
  37. }
  38. }
  39. internal bool IsActive
  40. {
  41. get
  42. {
  43. if (this.batchFinished)
  44. return false;
  45. try
  46. {
  47. return TransactionStatus.Active == this.transaction.TransactionInformation.Status;
  48. }
  49. catch (ObjectDisposedException ex)
  50. {
  51. MsmqDiagnostics.ExpectedException(ex);
  52. return false;
  53. }
  54. }
  55. }
  56. internal bool InDispatch
  57. {
  58. get { return this.inDispatch; }
  59. set
  60. {
  61. if (this.inDispatch == value)
  62. {
  63. Fx.Assert("System.ServiceModel.Dispatcher.ChannelHandler.TransactedBatchContext.InDispatch: (inDispatch == value)");
  64. }
  65. this.inDispatch = value;
  66. if (this.inDispatch)
  67. this.shared.DispatchStarted();
  68. else
  69. this.shared.DispatchEnded();
  70. }
  71. }
  72. internal SharedTransactedBatchContext Shared
  73. {
  74. get { return this.shared; }
  75. }
  76. internal void ForceRollback()
  77. {
  78. try
  79. {
  80. this.transaction.Rollback();
  81. }
  82. catch (ObjectDisposedException ex)
  83. {
  84. MsmqDiagnostics.ExpectedException(ex);
  85. }
  86. catch (TransactionException ex)
  87. {
  88. MsmqDiagnostics.ExpectedException(ex);
  89. }
  90. this.batchFinished = true;
  91. }
  92. internal void ForceCommit()
  93. {
  94. try
  95. {
  96. this.transaction.Commit();
  97. }
  98. catch (ObjectDisposedException ex)
  99. {
  100. MsmqDiagnostics.ExpectedException(ex);
  101. }
  102. catch (TransactionException ex)
  103. {
  104. MsmqDiagnostics.ExpectedException(ex);
  105. }
  106. this.batchFinished = true;
  107. }
  108. internal void Complete()
  109. {
  110. ++this.commits;
  111. if (this.commits >= this.shared.CurrentBatchSize || DateTime.UtcNow >= this.commitNotLaterThan)
  112. {
  113. ForceCommit();
  114. }
  115. }
  116. void IEnlistmentNotification.Prepare(PreparingEnlistment preparingEnlistment)
  117. {
  118. preparingEnlistment.Prepared();
  119. }
  120. void IEnlistmentNotification.Commit(Enlistment enlistment)
  121. {
  122. this.shared.ReportCommit();
  123. this.shared.BatchDone();
  124. enlistment.Done();
  125. }
  126. void IEnlistmentNotification.Rollback(Enlistment enlistment)
  127. {
  128. this.shared.ReportAbort();
  129. this.shared.BatchDone();
  130. enlistment.Done();
  131. }
  132. void IEnlistmentNotification.InDoubt(Enlistment enlistment)
  133. {
  134. this.shared.ReportAbort();
  135. this.shared.BatchDone();
  136. enlistment.Done();
  137. }
  138. internal Transaction Transaction
  139. {
  140. get { return this.transaction; }
  141. }
  142. }
  143. sealed class SharedTransactedBatchContext
  144. {
  145. readonly int maxBatchSize;
  146. readonly int maxConcurrentBatches;
  147. readonly IsolationLevel isolationLevel;
  148. readonly TimeSpan txTimeout;
  149. int currentBatchSize;
  150. int currentConcurrentBatches;
  151. int currentConcurrentDispatches;
  152. int successfullCommits;
  153. object receiveLock = new object();
  154. object thisLock = new object();
  155. bool isBatching;
  156. ChannelHandler handler;
  157. internal SharedTransactedBatchContext(ChannelHandler handler, ChannelDispatcher dispatcher, int maxConcurrentBatches)
  158. {
  159. this.handler = handler;
  160. this.maxBatchSize = dispatcher.MaxTransactedBatchSize;
  161. this.maxConcurrentBatches = maxConcurrentBatches;
  162. this.currentBatchSize = dispatcher.MaxTransactedBatchSize;
  163. this.currentConcurrentBatches = 0;
  164. this.currentConcurrentDispatches = 0;
  165. this.successfullCommits = 0;
  166. this.isBatching = true;
  167. this.isolationLevel = dispatcher.TransactionIsolationLevel;
  168. this.txTimeout = TransactionBehavior.NormalizeTimeout(dispatcher.TransactionTimeout);
  169. BatchingStateChanged(this.isBatching);
  170. }
  171. internal TransactedBatchContext CreateTransactedBatchContext()
  172. {
  173. lock (thisLock)
  174. {
  175. TransactedBatchContext context = new TransactedBatchContext(this);
  176. ++this.currentConcurrentBatches;
  177. return context;
  178. }
  179. }
  180. internal void DispatchStarted()
  181. {
  182. lock (thisLock)
  183. {
  184. ++this.currentConcurrentDispatches;
  185. if (this.currentConcurrentDispatches == this.currentConcurrentBatches && this.currentConcurrentBatches < this.maxConcurrentBatches)
  186. {
  187. TransactedBatchContext context = new TransactedBatchContext(this);
  188. ++this.currentConcurrentBatches;
  189. ChannelHandler newHandler = new ChannelHandler(this.handler, context);
  190. ChannelHandler.Register(newHandler);
  191. }
  192. }
  193. }
  194. internal void DispatchEnded()
  195. {
  196. lock (thisLock)
  197. {
  198. --this.currentConcurrentDispatches;
  199. if (this.currentConcurrentDispatches < 0)
  200. {
  201. Fx.Assert("System.ServiceModel.Dispatcher.ChannelHandler.SharedTransactedBatchContext.BatchDone: (currentConcurrentDispatches < 0)");
  202. }
  203. }
  204. }
  205. internal void BatchDone()
  206. {
  207. lock (thisLock)
  208. {
  209. --this.currentConcurrentBatches;
  210. if (this.currentConcurrentBatches < 0)
  211. {
  212. Fx.Assert("System.ServiceModel.Dispatcher.ChannelHandler.SharedTransactedBatchContext.BatchDone: (currentConcurrentBatches < 0)");
  213. }
  214. }
  215. }
  216. internal int CurrentBatchSize
  217. {
  218. get
  219. {
  220. lock (thisLock)
  221. {
  222. return this.currentBatchSize;
  223. }
  224. }
  225. }
  226. internal IsolationLevel IsolationLevel
  227. {
  228. get
  229. {
  230. return this.isolationLevel;
  231. }
  232. }
  233. internal TimeSpan TransactionTimeout
  234. {
  235. get
  236. {
  237. return this.txTimeout;
  238. }
  239. }
  240. internal void ReportAbort()
  241. {
  242. lock (thisLock)
  243. {
  244. if (isBatching)
  245. {
  246. this.successfullCommits = 0;
  247. this.currentBatchSize = 1;
  248. this.isBatching = false;
  249. BatchingStateChanged(this.isBatching);
  250. }
  251. }
  252. }
  253. internal void ReportCommit()
  254. {
  255. lock (thisLock)
  256. {
  257. if (++this.successfullCommits >= this.maxBatchSize * 2)
  258. {
  259. this.successfullCommits = 0;
  260. if (!isBatching)
  261. {
  262. this.currentBatchSize = this.maxBatchSize;
  263. this.isBatching = true;
  264. BatchingStateChanged(this.isBatching);
  265. }
  266. }
  267. }
  268. }
  269. void BatchingStateChanged(bool batchingNow)
  270. {
  271. if (DiagnosticUtility.ShouldTraceVerbose)
  272. {
  273. TraceUtility.TraceEvent(
  274. TraceEventType.Verbose,
  275. batchingNow ? TraceCode.MsmqEnteredBatch : TraceCode.MsmqLeftBatch,
  276. batchingNow ? SR.GetString(SR.TraceCodeMsmqEnteredBatch) : SR.GetString(SR.TraceCodeMsmqLeftBatch),
  277. null,
  278. null,
  279. null);
  280. }
  281. }
  282. internal object ReceiveLock
  283. {
  284. get { return this.receiveLock; }
  285. }
  286. }
  287. }