Quick.Threads.pas 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. { ***************************************************************************
  2. Copyright (c) 2016-2018 Kike Pérez
  3. Unit : Quick.Threads
  4. Description : Thread safe collections
  5. Author : Kike Pérez
  6. Version : 1.0
  7. Created : 09/03/2018
  8. Modified : 12/03/2018
  9. This file is part of QuickLib: https://github.com/exilon/QuickLib
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Threads;
  22. interface
  23. uses
  24. Classes,
  25. Types,
  26. System.RTLConsts,
  27. System.Generics.Collections,
  28. System.SyncObjs;
  29. type
  { Fixed-capacity FIFO queue shared between threads.
    Items are kept in a circular buffer (FQueue). Every access is serialised
    through FQueueLock, and a single condition variable (FQueueCondVar) is used
    both by producers waiting for a free slot and by consumers waiting for an
    item. }
  TThreadedQueueCS<T> = class
  private
    FQueue: array of T;                 // circular storage; Length(FQueue) is the capacity
    FQueueSize: Integer;                // number of items currently stored
    FQueueOffset: Integer;              // index of the oldest stored item
    FQueueLock: TCriticalSection;       // guards all of the fields in this section
    FQueueCondVar: TConditionVariableCS;
    FShutDown: Boolean;                 // set by DoShutDown; makes pending waits give up
    FPushTimeout: Cardinal;             // timeout handed to WaitFor by PushItem
    FPopTimeout: Cardinal;              // timeout handed to WaitFor by PopItem
    FTotalItemsPushed: Cardinal;        // running count of successful pushes
    FTotalItemsPopped: Cardinal;        // running count of successful pops
  public
    constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
    destructor Destroy; override;

    // Changes the capacity by ADelta slots and wakes all waiting threads.
    procedure Grow(ADelta: Integer);

    // Appends an item; the two-argument form also reports the resulting size.
    function PushItem(const AItem: T): TWaitResult; overload;
    function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;

    // Removes the oldest item. The function-result forms return Default(T)
    // when nothing could be popped; the TWaitResult forms report why.
    function PopItem: T; overload;
    function PopItem(var AQueueSize: Integer): T; overload;
    function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
    function PopItem(var AItem: T): TWaitResult; overload;

    // Flags the queue as shut down and releases every waiting thread.
    procedure DoShutDown;

    property QueueSize: Integer read FQueueSize;
    property ShutDown: Boolean read FShutDown;
    property TotalItemsPushed: Cardinal read FTotalItemsPushed;
    property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  end;
  { FIFO queue shared between threads, backed by a TQueue<T>.
    Access is serialised through fQueueLock; consumers wait on fQueueCondVar
    while the queue is empty. fQueueSize is maintained alongside the inner
    queue by PushItem and PopItem. }
  TThreadedQueueList<T> = class
  private
    fQueue: TQueue<T>;                  // underlying storage
    fQueueSize: Integer;                // item count tracked by PushItem/PopItem
    fQueueLock: TCriticalSection;       // guards all of the fields in this section
    fQueueCondVar: TConditionVariableCS;
    fShutDown: Boolean;                 // set by DoShutDown
    fPushTimeout: Cardinal;             // stored by the constructor
    fPopTimeout: Cardinal;              // timeout handed to WaitFor by PopItem
    fTotalItemsPushed: Cardinal;        // running count of successful pushes
    fTotalItemsPopped: Cardinal;        // running count of successful pops
  public
    constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
    destructor Destroy; override;

    // Changes the inner queue's Capacity by ADelta and wakes all waiting threads.
    procedure Grow(ADelta: Integer);

    // Appends an item; the two-argument form also reports the resulting size.
    function PushItem(const AItem: T): TWaitResult; overload;
    function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;

    // Removes the oldest item. The function-result forms return Default(T)
    // when nothing could be popped; the TWaitResult forms report why.
    function PopItem: T; overload;
    function PopItem(var AQueueSize: Integer): T; overload;
    function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
    function PopItem(var AItem: T): TWaitResult; overload;

    // Flags the queue as shut down and releases every waiting thread.
    procedure DoShutDown;

    property QueueSize: Integer read fQueueSize;
    property ShutDown: Boolean read fShutDown;
    property TotalItemsPushed: Cardinal read fTotalItemsPushed;
    property TotalItemsPopped: Cardinal read fTotalItemsPopped;
  end;
  { Thread that owns a TThreadedQueueCS<T> of pending tasks.
    The queue is not created by the constructor; Start builds it from the
    MaxQueue / InsertTimeout / ExtractTimeout settings in effect at that
    moment. }
  TThreadTask<T> = class(TThread)
  private
    fMaxQueue: Integer;                 // queue depth passed to the task queue
    fInsertTimeout: Integer;            // push timeout passed to the task queue
    fExtractTimeout: Integer;           // pop timeout passed to the task queue
    fTaskQueue: TThreadedQueueCS<T>;    // nil until Start has been called
    function GetTaskQueue: Cardinal;
  public
    constructor Create;
    destructor Destroy; override;

    property MaxQueue: Integer read fMaxQueue write fMaxQueue;
    property InsertTimeout: Integer read fInsertTimeout write fInsertTimeout;
    property ExtractTimeout: Integer read fExtractTimeout write fExtractTimeout;
    // Number of tasks currently queued (0 while no queue exists).
    property TaskQueue: Cardinal read GetTaskQueue;

    procedure Execute; override;
    // Queues a task; True when the push was signalled as successful.
    function AddTask(Task: T): Boolean;
    // Creates the task queue.
    // NOTE(review): this appears to hide TThread.Start rather than extend it - confirm.
    procedure Start;
  end;
  { Object list whose operations are serialised with a monitor lock.
    The items are held in an inner TObjectList<T> (fList); every method
    declared here takes the lock on fLock around its access to fList. }
  TThreadObjectList<T: class> = class(TList<T>)
  private
    fList: TObjectList<T>;              // the storage actually used by the methods below
    fLock: TObject;                     // monitor target for LockList / UnlockList
    fDuplicates: TDuplicates;           // policy applied by Add
    function GetItem(aIndex: Integer): T;
    procedure SetItem(aIndex: Integer; aValue: T);
  public
    constructor Create(OwnedObjects: Boolean);
    destructor Destroy; override;

    property Items[Index: Integer]: T read GetItem write SetItem; default;

    // Adds Item subject to the Duplicates policy.
    procedure Add(const Item: T);
    // Empties the inner list.
    procedure Clear;
    // Acquires the lock and returns the inner list; pair with UnlockList.
    function LockList: TObjectList<T>;
    // Removes Item, searching from the beginning.
    procedure Remove(const Item: T); inline;
    // Removes Item, searching in the given direction.
    procedure RemoveItem(const Item: T; Direction: TDirection);
    // Releases the lock taken by LockList.
    procedure UnlockList; inline;

    property Duplicates: TDuplicates read fDuplicates write fDuplicates;
  end;
  119. implementation
  120. { TThreadedQueueCS<T> }
  { Builds an empty queue with room for AQueueDepth items.
    PushTimeout and PopTimeout are stored and later handed to the condition
    variable's WaitFor by PushItem and PopItem respectively. }
  constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  begin
    inherited Create;
    // Plain field assignments first; they cannot fail.
    FPopTimeout := PopTimeout;
    FPushTimeout := PushTimeout;
    // Allocate the circular buffer, then the synchronisation objects.
    SetLength(FQueue, AQueueDepth);
    FQueueLock := TCriticalSection.Create;
    FQueueCondVar := TConditionVariableCS.Create;
  end;
  { Shuts the queue down (releasing any waiting threads) and then frees the
    lock and the condition variable. }
  destructor TThreadedQueueCS<T>.Destroy;
  begin
    // Wake waiters before the objects they are waiting on disappear.
    DoShutDown;
    FQueueLock.Free;
    FQueueCondVar.Free;
    inherited Destroy;
  end;
  { Changes the capacity of the queue by ADelta slots and wakes every waiting
    thread so that blocked producers can re-check for free room.

    The buffer is circular: when the stored items wrap past the end of the
    array, simply appending slots would insert empty cells in the middle of
    the logical sequence and break FIFO order. In that case the segment that
    runs from FQueueOffset to the old end is moved to the end of the enlarged
    array and FQueueOffset is advanced accordingly.

    ADelta: number of slots to add. Negative values are passed to SetLength
    unchanged, as before; no attempt is made here to make shrinking safe for
    items that are still stored. }
  procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
  var
    LOldLength: Integer;
    LIndex: Integer;
  begin
    FQueueLock.Enter;
    try
      LOldLength := Length(FQueue);
      SetLength(FQueue, LOldLength + ADelta);
      // Wrapped contents: items occupy [FQueueOffset..LOldLength-1] followed
      // by [0..k]. Slide the first segment up by ADelta so it again ends at
      // the last slot and the wrap point stays at the end of the array.
      if (ADelta > 0) and (FQueueOffset + FQueueSize > LOldLength) then
      begin
        // Walk downwards so a source slot is never overwritten before it
        // has been copied.
        for LIndex := LOldLength - 1 downto FQueueOffset do
        begin
          FQueue[LIndex + ADelta] := FQueue[LIndex];
          FQueue[LIndex] := Default(T);
        end;
        Inc(FQueueOffset, ADelta);
      end;
    finally
      FQueueLock.Leave;
    end;
    FQueueCondVar.ReleaseAll;
  end;
  { Pops the oldest item and returns it, discarding both the wait result and
    the remaining queue size. Result is Default(T) when nothing was popped. }
  function TThreadedQueueCS<T>.PopItem: T;
  var
    LIgnoredSize: Integer;
  begin
    PopItem(LIgnoredSize, Result);
  end;
  { Removes the oldest item from the queue.

    While the queue is empty and not shut down, the caller waits on the
    condition variable with FPopTimeout.

    AQueueSize: receives the number of items left in the queue on exit,
                whether or not an item was popped.
    AItem:      receives the popped item, or Default(T) when none was popped.
    Returns wrSignaled when the wait (if any) was signalled; otherwise the
    value returned by WaitFor. After a shutdown with an empty queue the result
    is still wrSignaled but AItem is Default(T). }
  function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  begin
    AItem := Default(T);
    FQueueLock.Enter;
    try
      Result := wrSignaled;
      // Wait for an item, a shutdown, or a failed/timed-out wait.
      while (FQueueSize = 0) and (not FShutDown) and (Result = wrSignaled) do
        Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);

      // Nothing to hand out: the wait failed, or the queue was shut down empty.
      if (Result <> wrSignaled) or (FShutDown and (FQueueSize = 0)) then
        Exit;

      // Take the item and clear its slot so no reference is kept in the buffer.
      AItem := FQueue[FQueueOffset];
      FQueue[FQueueOffset] := Default(T);

      // The buffer was full, so producers may be waiting for this slot.
      if Length(FQueue) = FQueueSize then
        FQueueCondVar.ReleaseAll;

      Dec(FQueueSize);
      Inc(FQueueOffset);
      Inc(FTotalItemsPopped);
      // Wrap the read position at the end of the buffer.
      if Length(FQueue) = FQueueOffset then
        FQueueOffset := 0;
    finally
      AQueueSize := FQueueSize;
      FQueueLock.Leave;
    end;
  end;
  { Pops the oldest item into AItem and returns the wait result; the
    remaining queue size is discarded. }
  function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
  var
    LIgnoredSize: Integer;
  begin
    Result := PopItem(LIgnoredSize, AItem);
  end;
  { Pops the oldest item and returns it, reporting the remaining queue size
    through AQueueSize. The wait result is discarded; Result is Default(T)
    when nothing was popped. }
  function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
  begin
    PopItem(AQueueSize, Result);
  end;
  { Appends AItem to the queue and returns the wait result; the resulting
    queue size is discarded. }
  function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
  var
    LIgnoredSize: Integer;
  begin
    Result := PushItem(AItem, LIgnoredSize);
  end;
  { Appends AItem to the queue.

    While the buffer is full and the queue is not shut down, the caller waits
    on the condition variable with FPushTimeout.

    AQueueSize: receives the number of items in the queue on exit, whether or
                not the item was stored.
    Returns wrSignaled when the wait (if any) was signalled; otherwise the
    value returned by WaitFor. Nothing is stored once the queue has been shut
    down, although the result is then still wrSignaled. }
  function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  var
    LSlot: Integer;
  begin
    FQueueLock.Enter;
    try
      Result := wrSignaled;
      // Wait for a free slot, a shutdown, or a failed/timed-out wait.
      while (Length(FQueue) = FQueueSize) and (not FShutDown) and (Result = wrSignaled) do
        Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);

      if (Result <> wrSignaled) or FShutDown then
        Exit;

      // The queue was empty, so consumers may be waiting for this item.
      if FQueueSize = 0 then
        FQueueCondVar.ReleaseAll;

      // Write position = read position + item count, wrapped to the buffer.
      LSlot := (FQueueOffset + FQueueSize) mod Length(FQueue);
      FQueue[LSlot] := AItem;
      Inc(FQueueSize);
      Inc(FTotalItemsPushed);
    finally
      AQueueSize := FQueueSize;
      FQueueLock.Leave;
    end;
  end;
  { Marks the queue as shut down and wakes every thread waiting in PushItem
    or PopItem.

    The flag is set while holding FQueueLock. PushItem and PopItem test
    FShutDown under that lock immediately before waiting; without the lock
    here, the flag could be set and ReleaseAll issued between a waiter's test
    and its wait, leaving that waiter blocked for its whole timeout. }
  procedure TThreadedQueueCS<T>.DoShutDown;
  begin
    FQueueLock.Enter;
    try
      FShutDown := True;
    finally
      FQueueLock.Leave;
    end;
    // Release outside the lock so woken threads can re-acquire it at once.
    FQueueCondVar.ReleaseAll;
  end;
  216. { TThreadedQueueList<T> }
  { Builds an empty queue whose inner TQueue<T> is pre-sized to AQueueDepth.
    PushTimeout and PopTimeout are stored in the corresponding fields. }
  constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  begin
    inherited Create;
    // Plain field assignments first; they cannot fail.
    fPopTimeout := PopTimeout;
    fPushTimeout := PushTimeout;
    fQueueSize := 0;
    // Storage, then the synchronisation objects.
    fQueue := TQueue<T>.Create;
    fQueue.Capacity := AQueueDepth;
    fQueueLock := TCriticalSection.Create;
    fQueueCondVar := TConditionVariableCS.Create;
  end;
  { Shuts the queue down (releasing any waiting threads) and then frees the
    lock, the condition variable and the inner queue. }
  destructor TThreadedQueueList<T>.Destroy;
  begin
    // Wake waiters before the objects they are waiting on disappear.
    DoShutDown;
    fQueueLock.Free;
    fQueueCondVar.Free;
    fQueue.Free;
    inherited Destroy;
  end;
  { Changes the Capacity of the inner queue by ADelta under the lock, then
    wakes every thread waiting on the condition variable. }
  procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
  var
    LNewCapacity: Integer;
  begin
    fQueueLock.Enter;
    try
      LNewCapacity := fQueue.Capacity + ADelta;
      fQueue.Capacity := LNewCapacity;
    finally
      fQueueLock.Leave;
    end;
    fQueueCondVar.ReleaseAll;
  end;
  { Pops the oldest item and returns it, discarding both the wait result and
    the remaining queue size. Result is Default(T) when nothing was popped. }
  function TThreadedQueueList<T>.PopItem: T;
  var
    LIgnoredSize: Integer;
  begin
    PopItem(LIgnoredSize, Result);
  end;
  { Removes the oldest item from the queue.

    While the queue is empty and not shut down, the caller waits on the
    condition variable with fPopTimeout.

    AQueueSize: receives the tracked item count on exit, whether or not an
                item was popped.
    AItem:      receives the popped item, or Default(T) when none was popped.
    Returns wrSignaled when the wait (if any) was signalled; otherwise the
    value returned by WaitFor. After a shutdown with an empty queue the result
    is still wrSignaled but AItem is Default(T). }
  function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  begin
    AItem := Default(T);
    fQueueLock.Enter;
    try
      Result := wrSignaled;
      // Wait for an item, a shutdown, or a failed/timed-out wait.
      while (fQueueSize = 0) and (not fShutDown) and (Result = wrSignaled) do
        Result := fQueueCondVar.WaitFor(fQueueLock, fPopTimeout);

      // Nothing to hand out: the wait failed, or the queue was shut down empty.
      if (Result <> wrSignaled) or (fShutDown and (fQueueSize = 0)) then
        Exit;

      AItem := fQueue.Extract;
      // Compared after Extract, i.e. against the already reduced Count.
      if fQueue.Count = fQueueSize then
        fQueueCondVar.ReleaseAll;
      Dec(fQueueSize);
      Inc(fTotalItemsPopped);
    finally
      AQueueSize := fQueueSize;
      fQueueLock.Leave;
    end;
  end;
  { Pops the oldest item into AItem and returns the wait result; the
    remaining queue size is discarded. }
  function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
  var
    LIgnoredSize: Integer;
  begin
    Result := PopItem(LIgnoredSize, AItem);
  end;
  { Pops the oldest item and returns it, reporting the remaining queue size
    through AQueueSize. The wait result is discarded; Result is Default(T)
    when nothing was popped. }
  function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
  begin
    PopItem(AQueueSize, Result);
  end;
  { Appends AItem to the queue and returns the wait result; the resulting
    queue size is discarded. }
  function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
  var
    LIgnoredSize: Integer;
  begin
    Result := PushItem(AItem, LIgnoredSize);
  end;
  { Appends AItem to the queue without ever waiting for room.

    AQueueSize: receives the tracked item count on exit, whether or not the
                item was stored.
    Always returns wrSignaled. Nothing is stored once the queue has been shut
    down. }
  function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  begin
    fQueueLock.Enter;
    try
      // No wait-for-room loop is active in this class, so fPushTimeout is not
      // consulted and the result can only ever be wrSignaled.
      Result := wrSignaled;
      if fShutDown then
        Exit;

      // The queue was empty, so consumers may be waiting for this item.
      if fQueueSize = 0 then
        fQueueCondVar.ReleaseAll;

      fQueue.Enqueue(AItem);
      Inc(fQueueSize);
      Inc(fTotalItemsPushed);
    finally
      AQueueSize := fQueueSize;
      fQueueLock.Leave;
    end;
  end;
  { Marks the queue as shut down and wakes every thread waiting in PopItem.

    The flag is set while holding fQueueLock. PopItem tests fShutDown under
    that lock immediately before waiting; without the lock here, the flag
    could be set and ReleaseAll issued between a waiter's test and its wait,
    leaving that waiter blocked for its whole timeout. }
  procedure TThreadedQueueList<T>.DoShutDown;
  begin
    fQueueLock.Enter;
    try
      fShutDown := True;
    finally
      fQueueLock.Leave;
    end;
    // Release outside the lock so woken threads can re-acquire it at once.
    fQueueCondVar.ReleaseAll;
  end;
  312. { TThreadTask<T> }
  { Queues Task for processing.

    Returns True when the push completed with wrSignaled. Returns False when
    the push did not (for example on timeout) or when no task queue exists
    yet, i.e. Start has not been called. The latter case previously
    dereferenced a nil queue; GetTaskQueue and Destroy already guard against
    it in the same way. }
  function TThreadTask<T>.AddTask(Task: T): Boolean;
  begin
    if not Assigned(fTaskQueue) then
    begin
      Result := False;
      Exit;
    end;
    Result := fTaskQueue.PushItem(Task) = TWaitResult.wrSignaled;
  end;
  { Creates the thread, passing True to the inherited TThread constructor,
    and sets the default queue settings: depth 10 and INFINITE for both
    timeouts. The task queue itself is not created here. }
  constructor TThreadTask<T>.Create;
  begin
    inherited Create(True);
    fExtractTimeout := INFINITE;
    fInsertTimeout := INFINITE;
    fMaxQueue := 10;
  end;
  { Releases the task queue, if one was created, and then the thread. }
  destructor TThreadTask<T>.Destroy;
  begin
    // Free is a no-op on a nil reference, so no explicit check is needed.
    fTaskQueue.Free;
    inherited Destroy;
  end;
  { Thread body. It contains no work of its own and only defers to the
    inherited implementation. }
  procedure TThreadTask<T>.Execute;
  begin
    inherited;
  end;
  { Getter for TaskQueue: the number of tasks currently queued, or 0 while
    the task queue has not been created. }
  function TThreadTask<T>.GetTaskQueue: Cardinal;
  begin
    Result := 0;
    if Assigned(fTaskQueue) then
      Result := fTaskQueue.QueueSize;
  end;
  { Creates the task queue from the current MaxQueue, InsertTimeout and
    ExtractTimeout settings.

    The queue is created only once. A repeated call used to overwrite
    fTaskQueue, leaking the previous queue together with any tasks still in
    it; now it leaves the existing queue untouched.

    NOTE(review): this method does not call the inherited TThread.Start, so
    the thread is not resumed here - confirm that this is intended. }
  procedure TThreadTask<T>.Start;
  begin
    if Assigned(fTaskQueue) then
      Exit;
    fTaskQueue := TThreadedQueueCS<T>.Create(fMaxQueue, fInsertTimeout, fExtractTimeout);
  end;
  342. { TThreadObjectList<T> }
  { Adds Item to the inner list under the lock, honouring Duplicates:
    dupAccept always adds; otherwise the item is added only when it is not
    already present. For an item that is already present, dupError raises
    EListError and any other setting silently ignores the call. }
  procedure TThreadObjectList<T>.Add(const Item: T);
  var
    LMayAdd: Boolean;
  begin
    LockList;
    try
      // The search is skipped entirely when duplicates are accepted.
      LMayAdd := (Duplicates = dupAccept) or (fList.IndexOf(Item) = -1);
      if LMayAdd then
        fList.Add(Item)
      else
      begin
        if Duplicates = dupError then
          raise EListError.CreateFmt(SDuplicateItem, [fList.ItemValue(Item)]);
      end;
    finally
      UnlockList;
    end;
  end;
  { Empties the inner list while holding the lock. }
  procedure TThreadObjectList<T>.Clear;
  var
    LItems: TObjectList<T>;
  begin
    LItems := LockList;
    try
      LItems.Clear;
    finally
      UnlockList;
    end;
  end;
  { Creates the lock object and the inner object list.

    OwnedObjects: passed to the inner TObjectList<T> as its ownership flag.
                  When True the list frees items that are removed from it or
                  still held when it is destroyed; when False the caller
                  keeps ownership. The argument used to be ignored, so the
                  list always took ownership.
    Duplicates starts as dupIgnore. }
  constructor TThreadObjectList<T>.Create(OwnedObjects : Boolean);
  begin
    inherited Create;
    fLock := TObject.Create;
    fList := TObjectList<T>.Create(OwnedObjects);
    fDuplicates := dupIgnore;
  end;
  { Frees the inner list while holding the lock, runs the inherited
    destructor, and finally releases and frees the lock object. }
  destructor TThreadObjectList<T>.Destroy;
  begin
    // Take the lock for the duration of the teardown.
    LockList;
    try
      fList.Free;
      inherited;
    finally
      // The lock object must outlive the unlock call.
      UnlockList;
      fLock.Free;
    end;
  end;
  { Getter for Items: reads the element at aIndex from the inner list while
    holding the lock. }
  function TThreadObjectList<T>.GetItem(aIndex: Integer): T;
  var
    LItems: TObjectList<T>;
  begin
    LItems := LockList;
    try
      Result := LItems[aIndex];
    finally
      UnlockList;
    end;
  end;
  { Enters the monitor on fLock and returns the inner list. Every call must
    be balanced by a call to UnlockList. }
  function TThreadObjectList<T>.LockList: TObjectList<T>;
  begin
    // Acquire first, so the list is only handed out once the lock is held.
    System.TMonitor.Enter(fLock);
    Result := fList;
  end;
  { Removes Item from the inner list, searching from the beginning, while
    holding the lock. }
  procedure TThreadObjectList<T>.Remove(const Item: T);
  begin
    LockList;
    try
      fList.RemoveItem(Item, TDirection.FromBeginning);
    finally
      UnlockList;
    end;
  end;
  { Removes Item from the inner list, searching in the given Direction,
    while holding the lock. }
  procedure TThreadObjectList<T>.RemoveItem(const Item: T; Direction: TDirection);
  var
    LItems: TObjectList<T>;
  begin
    LItems := LockList;
    try
      LItems.RemoveItem(Item, Direction);
    finally
      UnlockList;
    end;
  end;
  { Setter for Items: stores aValue at aIndex in the inner list while
    holding the lock. }
  procedure TThreadObjectList<T>.SetItem(aIndex: Integer; aValue: T);
  var
    LItems: TObjectList<T>;
  begin
    LItems := LockList;
    try
      LItems[aIndex] := aValue;
    finally
      UnlockList;
    end;
  end;
  { Leaves the monitor on fLock that was entered by LockList. }
  procedure TThreadObjectList<T>.UnlockList;
  begin
    System.TMonitor.Exit(fLock);
  end;
  423. end.