Quick.Threads.pas 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. { ***************************************************************************
  2. Copyright (c) 2016-2018 Kike Pérez
  3. Unit : Quick.Threads
  4. Description : Thread safe collections
  5. Author : Kike Pérez
  6. Version : 1.0
  7. Created : 09/03/2018
  8. Modified : 12/03/2018
  9. This file is part of QuickLib: https://github.com/exilon/QuickLib
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Threads;
  22. interface
  23. uses
  24. Classes,
  25. System.Generics.Collections,
  26. System.SyncObjs;
  27. type
  { Blocking FIFO queue for values of type T.
    Items live in a fixed-length array used as a ring buffer; access is
    serialised by a critical section and waiters share one condition variable. }
  TThreadedQueueCS<T> = class
  private
    FQueue: array of T;                   // ring-buffer storage
    FQueueSize: Integer;                  // number of items currently stored
    FQueueOffset: Integer;                // index of the oldest stored item
    FQueueLock: TCriticalSection;         // guards every field of the queue
    FQueueCondVar: TConditionVariableCS;  // shared by producers and consumers
    FShutDown: Boolean;                   // set by DoShutDown
    FPushTimeout: Cardinal;               // wait limit passed to WaitFor in PushItem
    FPopTimeout: Cardinal;                // wait limit passed to WaitFor in PopItem
    FTotalItemsPushed: Cardinal;          // running count of successful pushes
    FTotalItemsPopped: Cardinal;          // running count of successful pops
  public
    constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
    destructor Destroy; override;
    // Changes the capacity of the queue by ADelta slots.
    procedure Grow(ADelta: Integer);
    // Push overloads: the two-argument form also reports the resulting item count.
    function PushItem(const AItem: T): TWaitResult; overload;
    function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
    // Pop overloads: all of them delegate to PopItem(AQueueSize, AItem).
    function PopItem: T; overload;
    function PopItem(var AQueueSize: Integer): T; overload;
    function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
    function PopItem(var AItem: T): TWaitResult; overload;
    // Marks the queue as shut down and wakes every waiting thread.
    procedure DoShutDown;
    property QueueSize: Integer read FQueueSize;
    property ShutDown: Boolean read FShutDown;
    property TotalItemsPushed: Cardinal read FTotalItemsPushed;
    property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  end;
  { Blocking FIFO queue for values of type T built on TQueue<T>.
    Access is serialised by a critical section; consumers wait on a
    condition variable while the queue is empty. }
  TThreadedQueueList<T> = class
  private
    FQueue: TQueue<T>;                    // underlying storage
    FQueueSize: Integer;                  // item count maintained by PushItem/PopItem
    FQueueLock: TCriticalSection;         // guards every field of the queue
    FQueueCondVar: TConditionVariableCS;  // signalled when an item arrives or on shutdown
    FShutDown: Boolean;                   // set by DoShutDown
    FPushTimeout: Cardinal;               // stored by the constructor
    FPopTimeout: Cardinal;                // wait limit passed to WaitFor in PopItem
    FTotalItemsPushed: Cardinal;          // running count of successful pushes
    FTotalItemsPopped: Cardinal;          // running count of successful pops
  public
    constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
    destructor Destroy; override;
    // Changes the capacity of the underlying TQueue<T> by ADelta.
    procedure Grow(ADelta: Integer);
    // Push overloads: the two-argument form also reports the resulting item count.
    function PushItem(const AItem: T): TWaitResult; overload;
    function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
    // Pop overloads: all of them delegate to PopItem(AQueueSize, AItem).
    function PopItem: T; overload;
    function PopItem(var AQueueSize: Integer): T; overload;
    function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
    function PopItem(var AItem: T): TWaitResult; overload;
    // Marks the queue as shut down and wakes every waiting thread.
    procedure DoShutDown;
    property QueueSize: Integer read FQueueSize;
    property ShutDown: Boolean read FShutDown;
    property TotalItemsPushed: Cardinal read FTotalItemsPushed;
    property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  end;
  { Thread that owns a TThreadedQueueCS<T> of pending tasks.
    The queue is created by Start from the MaxQueue / InsertTimeout /
    ExtractTimeout settings in effect at that moment. }
  TThreadTask<T> = class(TThread)
  private
    fMaxQueue : Integer;               // queue depth handed to the task queue
    fInsertTimeout : Integer;          // push timeout handed to the task queue
    fExtractTimeout : Integer;         // pop timeout handed to the task queue
    fTaskQueue : TThreadedQueueCS<T>;  // nil until Start has been called
    function GetTaskQueue : Cardinal;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Execute; override;
    // Queues a task; True when the push reported wrSignaled.
    function AddTask(Task : T) : Boolean;
    // Creates the task queue from the current settings.
    procedure Start;
    property MaxQueue : Integer read fMaxQueue write fMaxQueue;
    property InsertTimeout : Integer read fInsertTimeout write fInsertTimeout;
    property ExtractTimeout : Integer read fExtractTimeout write fExtractTimeout;
    // Number of tasks currently waiting in the queue (0 when no queue exists).
    property TaskQueue : Cardinal read GetTaskQueue;
  end;
  98. implementation
  99. { TThreadedQueueCS<T> }
  { Builds an empty queue with AQueueDepth slots.
    PushTimeout / PopTimeout are the wait limits later handed to the
    condition variable by PushItem and PopItem. }
  constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  begin
    inherited Create;
    // Allocate the ring-buffer storage first, exactly as many slots as requested.
    SetLength(FQueue, AQueueDepth);
    // Remember the wait limits.
    FPopTimeout := PopTimeout;
    FPushTimeout := PushTimeout;
    // Synchronisation primitives shared by all operations.
    FQueueLock := TCriticalSection.Create;
    FQueueCondVar := TConditionVariableCS.Create;
  end;
  { Shuts the queue down so waiting threads wake up, then releases the
    synchronisation objects. }
  destructor TThreadedQueueCS<T>.Destroy;
  begin
    // Destroy also runs when the constructor raised before the lock and the
    // condition variable were created; in that case there is nothing to
    // signal and DoShutDown must not touch the nil references.
    if Assigned(FQueueLock) and Assigned(FQueueCondVar) then
      DoShutDown;
    FQueueLock.Free;
    FQueueCondVar.Free;
    inherited;
  end;
  { Changes the capacity of the queue by ADelta slots and wakes all waiters.
    The stored items are re-linearised so that the oldest one ends up at
    index 0; resizing the array in place would break the ring buffer whenever
    its contents wrap around the end of the array.
    A negative ADelta shrinks the queue, but never below the number of items
    currently stored. }
  procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
  var
    LItems: TArray<T>;
    LOldLen, LNewLen, LIndex: Integer;
  begin
    FQueueLock.Enter;
    try
      LOldLen := Length(FQueue);
      LNewLen := LOldLen + ADelta;
      // Never drop stored items when shrinking.
      if LNewLen < FQueueSize then
        LNewLen := FQueueSize;
      // Copy the live items out in FIFO order (the loop is empty for an
      // empty queue, so LOldLen = 0 never reaches the mod).
      SetLength(LItems, FQueueSize);
      for LIndex := 0 to FQueueSize - 1 do
        LItems[LIndex] := FQueue[(FQueueOffset + LIndex) mod LOldLen];
      // Start from fresh, default-initialised storage and put the items back
      // at the front.
      FQueue := nil;
      SetLength(FQueue, LNewLen);
      for LIndex := 0 to FQueueSize - 1 do
        FQueue[LIndex] := LItems[LIndex];
      FQueueOffset := 0;
    finally
      FQueueLock.Leave;
    end;
    // Producers blocked on a full queue may now have room.
    FQueueCondVar.ReleaseAll;
  end;
  { Pops the oldest item and returns it; the wait result and the remaining
    item count are discarded. }
  function TThreadedQueueCS<T>.PopItem: T;
  var
    LIgnoredSize: Integer;
  begin
    PopItem(LIgnoredSize, Result);
  end;
  { Removes the oldest item from the queue.
    Waits on the condition variable (FPopTimeout per wait) while the queue is
    empty and not shut down.
    AItem      receives the item, or Default(T) when nothing was popped.
    AQueueSize receives the item count at the time the lock is released.
    Returns the last wait result; wrSignaled is also returned when the queue
    is shut down and empty, in which case AItem stays Default(T). }
  function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  var
    LWasFull: Boolean;
  begin
    AItem := Default(T);
    FQueueLock.Enter;
    try
      Result := wrSignaled;
      while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do
        Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
      // Nothing to hand out: the wait failed, or the loop ended because of a
      // shutdown while the queue was still empty.
      if (Result <> wrSignaled) or (FShutDown and (FQueueSize = 0)) then
        Exit;
      LWasFull := FQueueSize = Length(FQueue);
      // Take the item and clear its slot so the array keeps no stale copy.
      AItem := FQueue[FQueueOffset];
      FQueue[FQueueOffset] := Default(T);
      // A full queue may have blocked producers; wake them now that a slot
      // is about to become free (they re-check once the lock is released).
      if LWasFull then
        FQueueCondVar.ReleaseAll;
      Dec(FQueueSize);
      Inc(FTotalItemsPopped);
      // Advance the read position, wrapping at the end of the array.
      if FQueueOffset + 1 = Length(FQueue) then
        FQueueOffset := 0
      else
        Inc(FQueueOffset);
    finally
      AQueueSize := FQueueSize;
      FQueueLock.Leave;
    end;
  end;
  { Pops the oldest item into AItem and returns the wait result; the
    remaining item count is discarded. }
  function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
  var
    LIgnoredSize: Integer;
  begin
    Result := PopItem(LIgnoredSize, AItem);
  end;
  { Pops the oldest item and returns it; AQueueSize receives the remaining
    item count. The wait result is discarded. }
  function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
  begin
    // Delegates to the full overload, using Result as the output item.
    PopItem(AQueueSize, Result);
  end;
  { Appends AItem to the queue and returns the wait result; the resulting
    item count is discarded. }
  function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
  var
    LIgnoredSize: Integer;
  begin
    Result := PushItem(AItem, LIgnoredSize);
  end;
  { Appends AItem to the queue.
    Waits on the condition variable (FPushTimeout per wait) while the queue is
    full and not shut down.
    AQueueSize receives the item count at the time the lock is released.
    Returns the last wait result. Note that wrSignaled is also returned when
    the queue has been shut down, although the item is not stored then. }
  function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  var
    LWasEmpty: Boolean;
    LSlot: Integer;
  begin
    FQueueLock.Enter;
    try
      Result := wrSignaled;
      while (Result = wrSignaled) and not FShutDown and (FQueueSize = Length(FQueue)) do
        Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
      // Give up on a failed wait or once the queue is shut down.
      if (Result <> wrSignaled) or FShutDown then
        Exit;
      LWasEmpty := FQueueSize = 0;
      // Write position: first free slot after the stored items, wrapping
      // around the end of the array.
      LSlot := (FQueueOffset + FQueueSize) mod Length(FQueue);
      FQueue[LSlot] := AItem;
      Inc(FQueueSize);
      Inc(FTotalItemsPushed);
      // Consumers may be blocked on an empty queue; wake them (they re-check
      // once the lock is released).
      if LWasEmpty then
        FQueueCondVar.ReleaseAll;
    finally
      AQueueSize := FQueueSize;
      FQueueLock.Leave;
    end;
  end;
  { Marks the queue as shut down and wakes every thread waiting in PushItem
    or PopItem. }
  procedure TThreadedQueueCS<T>.DoShutDown;
  begin
    // The flag must change under the same lock the waiters hold while they
    // test it. Otherwise a waiter could read FShutDown = False, lose the CPU,
    // miss the ReleaseAll below and then wait with nobody left to wake it.
    FQueueLock.Enter;
    try
      FShutDown := True;
    finally
      FQueueLock.Leave;
    end;
    FQueueCondVar.ReleaseAll;
  end;
  195. { TThreadedQueueList<T> }
  { Builds an empty queue whose underlying TQueue<T> starts with a capacity
    of AQueueDepth. PushTimeout / PopTimeout are stored for later use. }
  constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  begin
    inherited Create;
    // Storage, pre-sized to the requested depth.
    fQueue := TQueue<T>.Create;
    fQueue.Capacity := AQueueDepth;
    // Synchronisation primitives shared by all operations.
    fQueueLock := TCriticalSection.Create;
    fQueueCondVar := TConditionVariableCS.Create;
    // Bookkeeping and wait limits.
    fQueueSize := 0;
    fPopTimeout := PopTimeout;
    fPushTimeout := PushTimeout;
  end;
  { Shuts the queue down so waiting threads wake up, then releases the
    synchronisation objects and the underlying TQueue<T>. }
  destructor TThreadedQueueList<T>.Destroy;
  begin
    // Destroy also runs when the constructor raised before the lock and the
    // condition variable were created; in that case there is nothing to
    // signal and DoShutDown must not touch the nil references.
    if Assigned(fQueueLock) and Assigned(fQueueCondVar) then
      DoShutDown;
    fQueueLock.Free;
    fQueueCondVar.Free;
    fQueue.Free;
    inherited;
  end;
  { Changes the capacity of the underlying TQueue<T> by ADelta and then wakes
    every thread waiting on the condition variable. }
  procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
  var
    LNewCapacity: Integer;
  begin
    fQueueLock.Enter;
    try
      LNewCapacity := fQueue.Capacity + ADelta;
      fQueue.Capacity := LNewCapacity;
    finally
      fQueueLock.Leave;
    end;
    fQueueCondVar.ReleaseAll;
  end;
  { Pops the oldest item and returns it; the wait result and the remaining
    item count are discarded. }
  function TThreadedQueueList<T>.PopItem: T;
  var
    LIgnoredSize: Integer;
  begin
    PopItem(LIgnoredSize, Result);
  end;
  { Removes the oldest item from the queue.
    Waits on the condition variable (fPopTimeout per wait) while the queue is
    empty and not shut down.
    AItem      receives the item, or Default(T) when nothing was popped.
    AQueueSize receives the item count at the time the lock is released.
    Returns the last wait result; wrSignaled is also returned when the queue
    is shut down and empty, in which case AItem stays Default(T). }
  function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  begin
    AItem := Default(T);
    fQueueLock.Enter;
    try
      Result := wrSignaled;
      while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
      begin
        Result := fQueueCondVar.WaitFor(fQueueLock, fPopTimeout);
      end;
      if (fShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
      // Extract hands the item over to the caller.
      AItem := fQueue.Extract;
      // No wake-up is issued here: PushItem never waits for free space, and
      // the former "fQueueSize = fQueue.Count" test could not succeed once
      // Extract had already lowered Count by one.
      Dec(fQueueSize);
      Inc(fTotalItemsPopped);
    finally
      AQueueSize := fQueueSize;
      fQueueLock.Leave;
    end;
  end;
  { Pops the oldest item into AItem and returns the wait result; the
    remaining item count is discarded. }
  function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
  var
    LIgnoredSize: Integer;
  begin
    Result := PopItem(LIgnoredSize, AItem);
  end;
  { Pops the oldest item and returns it; AQueueSize receives the remaining
    item count. The wait result is discarded. }
  function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
  begin
    // Delegates to the full overload, using Result as the output item.
    PopItem(AQueueSize, Result);
  end;
  { Appends AItem to the queue and returns the wait result; the resulting
    item count is discarded. }
  function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
  var
    LIgnoredSize: Integer;
  begin
    Result := PushItem(AItem, LIgnoredSize);
  end;
  { Appends AItem to the queue.
    There is no wait-for-space loop here: the push never blocks, so
    fPushTimeout is not consulted and the result is always wrSignaled.
    After a shutdown the item is not stored, yet wrSignaled is still returned.
    AQueueSize receives the item count at the time the lock is released. }
  function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  begin
    fQueueLock.Enter;
    try
      Result := wrSignaled;
      if not fShutDown then
      begin
        // Consumers may be blocked on an empty queue; wake them (they
        // re-check once the lock is released).
        if fQueueSize = 0 then
          fQueueCondVar.ReleaseAll;
        fQueue.Enqueue(AItem);
        Inc(fQueueSize);
        Inc(fTotalItemsPushed);
      end;
    finally
      AQueueSize := fQueueSize;
      fQueueLock.Leave;
    end;
  end;
  { Marks the queue as shut down and wakes every thread waiting in PopItem. }
  procedure TThreadedQueueList<T>.DoShutDown;
  begin
    // The flag must change under the same lock the waiters hold while they
    // test it. Otherwise a waiter could read fShutDown = False, lose the CPU,
    // miss the ReleaseAll below and then wait with nobody left to wake it.
    fQueueLock.Enter;
    try
      fShutDown := True;
    finally
      fQueueLock.Leave;
    end;
    fQueueCondVar.ReleaseAll;
  end;
  291. { TThreadTask<T> }
  { Queues Task for processing.
    Returns True when the push reported wrSignaled, and False when it did not
    or when the task queue does not exist yet (Start has not been called). }
  function TThreadTask<T>.AddTask(Task: T): Boolean;
  begin
    // fTaskQueue is only created by Start; avoid dereferencing nil before that.
    if Assigned(fTaskQueue) then
      Result := fTaskQueue.PushItem(Task) = TWaitResult.wrSignaled
    else
      Result := False;
  end;
  { Creates the thread suspended with a queue depth of 10 and infinite
    insert / extract timeouts. The task queue itself is created by Start. }
  constructor TThreadTask<T>.Create;
  begin
    inherited Create(True);
    fMaxQueue := 10;
    // INFINITE is a Cardinal constant while the timeout fields are Integer:
    // cast explicitly so the same bit pattern is stored without relying on
    // an implicit out-of-range constant conversion.
    fInsertTimeout := Integer(INFINITE);
    fExtractTimeout := Integer(INFINITE);
  end;
  { Releases the task queue (if one was created) before the inherited
    thread clean-up runs. }
  destructor TThreadTask<T>.Destroy;
  begin
    // Free is a no-op on a nil reference, so no Assigned test is required.
    fTaskQueue.Free;
    inherited;
  end;
  { Thread body. It only invokes the inherited implementation; no task is
    taken from the queue or processed here. }
  procedure TThreadTask<T>.Execute;
  begin
    inherited;
  end;
  { Getter for the TaskQueue property: the number of items currently held by
    the task queue, or 0 while no queue exists. }
  function TThreadTask<T>.GetTaskQueue: Cardinal;
  begin
    if not Assigned(fTaskQueue) then
      Exit(0);
    Result := fTaskQueue.QueueSize;
  end;
  { Creates the task queue from the current MaxQueue, InsertTimeout and
    ExtractTimeout settings. Calling it again replaces the queue with a new
    one and frees the previous instance.
    NOTE(review): this method does not call the inherited TThread.Start, so
    the thread is not resumed here - confirm that this is intended. }
  procedure TThreadTask<T>.Start;
  var
    LPrevious: TThreadedQueueCS<T>;
  begin
    LPrevious := fTaskQueue;
    // The timeout fields are Integer while the queue expects Cardinal; the
    // explicit casts keep the bit pattern (e.g. -1 -> INFINITE) and avoid a
    // range-check error when range checking is enabled.
    fTaskQueue := TThreadedQueueCS<T>.Create(fMaxQueue, Cardinal(fInsertTimeout), Cardinal(fExtractTimeout));
    // Release the queue from an earlier call instead of leaking it.
    LPrevious.Free;
  end;
  321. end.