Quick.Threads.pas 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637
  1. { ***************************************************************************
  2. Copyright (c) 2016-2018 Kike Pérez
  3. Unit : Quick.Threads
  4. Description : Thread safe collections
  5. Author : Kike Pérez
  6. Version : 1.2
  7. Created : 09/03/2018
  8. Modified : 07/04/2018
  9. This file is part of QuickLib: https://github.com/exilon/QuickLib
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Threads;
  22. {$i QuickLib.inc}
  23. interface
  24. uses
  25. Classes,
  26. Types,
  27. //Quick.Chrono,
  28. {$IFNDEF FPC}
  29. System.RTLConsts,
  30. System.Generics.Collections,
  31. System.SyncObjs;
  32. {$ELSE}
  33. RtlConsts,
  34. Generics.Collections,
  35. syncobjs,
  36. SysUtils;
  37. {$ENDIF}
  38. type
  39. TThreadedQueueCS<T> = class
  40. private
  41. FQueue: array of T;
  42. FQueueSize, FQueueOffset: Integer;
  43. FQueueLock: TCriticalSection;
  44. {$IFDEF FPC}
  45. FQueueCondVar : TEventObject;
  46. {$ELSE}
  47. FQueueCondVar: TConditionVariableCS;
  48. {$ENDIF}
  49. FShutDown: Boolean;
  50. FPushTimeout, FPopTimeout: Cardinal;
  51. FTotalItemsPushed, FTotalItemsPopped: Cardinal;
  52. public
  53. constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  54. destructor Destroy; override;
  55. procedure Grow(ADelta: Integer);
  56. function PushItem(const AItem: T): TWaitResult; overload;
  57. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  58. function PopItem: T; overload;
  59. function PopItem(var AQueueSize: Integer): T; overload;
  60. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  61. function PopItem(var AItem: T): TWaitResult; overload;
  62. procedure DoShutDown;
  63. property QueueSize: Integer read FQueueSize;
  64. property ShutDown: Boolean read FShutDown;
  65. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  66. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  67. end;
  68. TThreadedQueueList<T> = class
  69. private
  70. fQueue : TQueue<T>;
  71. fQueueSize : Integer;
  72. fQueueLock : TCriticalSection;
  73. {$IFDEF FPC}
  74. FQueueCondVar : TSimpleEvent;
  75. {$ELSE}
  76. FQueueCondVar: TConditionVariableCS;
  77. {$ENDIF}
  78. fShutDown : Boolean;
  79. fPushTimeout : Cardinal;
  80. fPopTimeout : Cardinal;
  81. fTotalItemsPushed : Cardinal;
  82. fTotalItemsPopped : Cardinal;
  83. public
  84. constructor Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  85. destructor Destroy; override;
  86. procedure Grow(ADelta: Integer);
  87. function PushItem(const AItem: T): TWaitResult; overload;
  88. function PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult; overload;
  89. function PopItem: T; overload;
  90. function PopItem(var AQueueSize: Integer): T; overload;
  91. function PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult; overload;
  92. function PopItem(var AItem: T): TWaitResult; overload;
  93. procedure DoShutDown;
  94. property QueueSize: Integer read FQueueSize;
  95. property ShutDown: Boolean read FShutDown;
  96. property TotalItemsPushed: Cardinal read FTotalItemsPushed;
  97. property TotalItemsPopped: Cardinal read FTotalItemsPopped;
  98. end;
  99. TThreadTask<T> = class(TThread)
  100. private
  101. fMaxQueue : Integer;
  102. fInsertTimeout : Integer;
  103. fExtractTimeout : Integer;
  104. fTaskQueue : TThreadedQueueCS<T>;
  105. function GetTaskQueue : Cardinal;
  106. public
  107. constructor Create;
  108. destructor Destroy; override;
  109. property MaxQueue : Integer read fMaxQueue write fMaxQueue;
  110. property InsertTimeout : Integer read fInsertTimeout write fInsertTimeout;
  111. property ExtractTimeout : Integer read fExtractTimeout write fExtractTimeout;
  112. property TaskQueue : Cardinal read GetTaskQueue;
  113. procedure Execute; override;
  114. function AddTask(Task : T) : Boolean;
  115. procedure Start;
  116. end;
  117. {$IFNDEF FPC}
  118. TThreadObjectList<T: class> = class(TList<T>)
  119. private
  120. fList: TObjectList<T>;
  121. fLock: TObject;
  122. fDuplicates: TDuplicates;
  123. function GetItem(aIndex : Integer) : T;
  124. procedure SetItem(aIndex : Integer; aValue : T);
  125. public
  126. constructor Create(OwnedObjects : Boolean);
  127. destructor Destroy; override;
  128. property Items[Index : Integer] : T read GetItem write SetItem ; default;
  129. procedure Add(const Item: T);
  130. procedure Clear;
  131. function LockList: TObjectList<T>;
  132. procedure Remove(const Item: T); inline;
  133. procedure RemoveItem(const Item: T; Direction: TDirection);
  134. procedure UnlockList; inline;
  135. property Duplicates: TDuplicates read fDuplicates write fDuplicates;
  136. end;
  137. {$ENDIF}
  138. implementation
  139. { TThreadedQueueCS<T> }
  140. constructor TThreadedQueueCS<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  141. begin
  142. inherited Create;
  143. SetLength(FQueue, AQueueDepth);
  144. FQueueLock := TCriticalSection.Create;
  145. {$IFDEF FPC}
  146. FQueueCondVar := TEventObject.Create(nil, True, False, 'TQCS');
  147. {$ELSE}
  148. FQueueCondVar := TConditionVariableCS.Create;
  149. {$ENDIF}
  150. FPushTimeout := PushTimeout;
  151. FPopTimeout := PopTimeout;
  152. end;
  153. destructor TThreadedQueueCS<T>.Destroy;
  154. begin
  155. DoShutDown;
  156. FQueueLock.Free;
  157. FQueueCondVar.Free;
  158. inherited;
  159. end;
  160. procedure TThreadedQueueCS<T>.Grow(ADelta: Integer);
  161. begin
  162. FQueueLock.Enter;
  163. try
  164. SetLength(FQueue, Length(FQueue) + ADelta);
  165. finally
  166. FQueueLock.Leave;
  167. end;
  168. {$IFDEF FPC}
  169. FQueueCondVar.SetEvent;
  170. {$ELSE}
  171. FQueueCondVar.ReleaseAll;
  172. {$ENDIF}
  173. end;
  174. function TThreadedQueueCS<T>.PopItem: T;
  175. var
  176. LQueueSize: Integer;
  177. begin
  178. PopItem(LQueueSize, Result);
  179. end;
  180. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  181. begin
  182. AItem := Default(T);
  183. FQueueLock.Enter;
  184. try
  185. Result := wrSignaled;
  186. while (Result = wrSignaled) and (FQueueSize = 0) and not FShutDown do
  187. begin
  188. {$IFDEF FPC}
  189. Result := FQueueCondVar.WaitFor(FPopTimeout);
  190. {$ELSE}
  191. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  192. {$ENDIF}
  193. end;
  194. if (FShutDown and (FQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  195. AItem := FQueue[FQueueOffset];
  196. FQueue[FQueueOffset] := Default(T);
  197. if FQueueSize = Length(FQueue) then
  198. begin
  199. {$IFDEF FPC}
  200. FQueueCondVar.SetEvent;
  201. {$ELSE}
  202. FQueueCondVar.ReleaseAll;
  203. {$ENDIF}
  204. end;
  205. Dec(FQueueSize);
  206. Inc(FQueueOffset);
  207. Inc(FTotalItemsPopped);
  208. if FQueueOffset = Length(FQueue) then FQueueOffset := 0;
  209. finally
  210. AQueueSize := FQueueSize;
  211. FQueueLock.Leave;
  212. end;
  213. end;
  214. function TThreadedQueueCS<T>.PopItem(var AItem: T): TWaitResult;
  215. var
  216. LQueueSize: Integer;
  217. begin
  218. Result := PopItem(LQueueSize, AItem);
  219. end;
  220. function TThreadedQueueCS<T>.PopItem(var AQueueSize: Integer): T;
  221. begin
  222. PopItem(AQueueSize, Result);
  223. end;
  224. function TThreadedQueueCS<T>.PushItem(const AItem: T): TWaitResult;
  225. var
  226. LQueueSize: Integer;
  227. begin
  228. Result := PushItem(AItem, LQueueSize);
  229. end;
  230. function TThreadedQueueCS<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  231. begin
  232. FQueueLock.Enter;
  233. try
  234. Result := wrSignaled;
  235. while (Result = wrSignaled) and (FQueueSize = Length(FQueue)) and not FShutDown do
  236. begin
  237. {$IFDEF FPC}
  238. Result := FQueueCondVar.WaitFor(FPushTimeout);
  239. {$ELSE}
  240. Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
  241. {$ENDIF}
  242. end;
  243. if FShutDown or (Result <> wrSignaled) then Exit;
  244. if FQueueSize = 0 then
  245. begin
  246. {$IFDEF FPC}
  247. FQueueCondVar.SetEvent;
  248. {$ELSE}
  249. FQueueCondVar.ReleaseAll;
  250. {$ENDIF}
  251. end;
  252. FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem;
  253. Inc(FQueueSize);
  254. Inc(FTotalItemsPushed);
  255. finally
  256. AQueueSize := FQueueSize;
  257. FQueueLock.Leave;
  258. end;
  259. end;
  260. procedure TThreadedQueueCS<T>.DoShutDown;
  261. begin
  262. FShutDown := True;
  263. {$IFDEF FPC}
  264. FQueueCondVar.SetEvent;
  265. {$ELSE}
  266. FQueueCondVar.ReleaseAll;
  267. {$ENDIF}
  268. end;
  269. { TThreadedQueueList<T> }
  270. constructor TThreadedQueueList<T>.Create(AQueueDepth: Integer = 10; PushTimeout: Cardinal = INFINITE; PopTimeout: Cardinal = INFINITE);
  271. begin
  272. inherited Create;
  273. fQueue := TQueue<T>.Create;
  274. fQueue.Capacity := AQueueDepth;
  275. fQueueSize := 0;
  276. fQueueLock := TCriticalSection.Create;
  277. {$IFDEF FPC}
  278. FQueueCondVar := TSimpleEvent.Create; //TEventObject.Create(nil, False, False, 'TQL');
  279. {$ELSE}
  280. fQueueCondVar := TConditionVariableCS.Create;
  281. {$ENDIF}
  282. fPushTimeout := PushTimeout;
  283. fPopTimeout := PopTimeout;
  284. end;
  285. destructor TThreadedQueueList<T>.Destroy;
  286. begin
  287. DoShutDown;
  288. fQueueLock.Free;
  289. fQueueCondVar.Free;
  290. fQueue.Free;
  291. inherited;
  292. end;
  293. procedure TThreadedQueueList<T>.Grow(ADelta: Integer);
  294. begin
  295. fQueueLock.Enter;
  296. try
  297. fQueue.Capacity := fQueue.Capacity + ADelta;
  298. finally
  299. fQueueLock.Leave;
  300. end;
  301. {$IFDEF FPC}
  302. FQueueCondVar.SetEvent;
  303. {$ELSE}
  304. FQueueCondVar.ReleaseAll;
  305. {$ENDIF}
  306. end;
  307. function TThreadedQueueList<T>.PopItem: T;
  308. var
  309. LQueueSize: Integer;
  310. begin
  311. PopItem(LQueueSize, Result);
  312. end;
  313. {$IFDEF FPC}
  314. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  315. //var
  316. //crono : TChronometer;
  317. begin
  318. AItem := Default(T);
  319. //crono := TChronometer.Create(False);
  320. try
  321. Result := wrSignaled;
  322. //writeln('popitem');
  323. //crono.Start;
  324. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  325. begin
  326. //crono.Start;
  327. Result := FQueueCondVar.WaitFor(FPopTimeout);
  328. //crono.Stop;
  329. //writeln('in: ' + crono.ElapsedTime);
  330. //if result = twaitresult.wrError then result := twaitresult.wrError;
  331. end;
  332. //crono.Stop;
  333. //writeln('out: ' + crono.ElapsedTime);
  334. fQueueLock.Enter;
  335. try
  336. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  337. AItem := fQueue.Extract;
  338. Dec(FQueueSize);
  339. Inc(fTotalItemsPopped);
  340. finally
  341. fQueueLock.Leave;
  342. end;
  343. finally
  344. AQueueSize := fQueueSize;
  345. end;
  346. end;
  347. {$ELSE}
  348. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
  349. begin
  350. AItem := Default(T);
  351. fQueueLock.Enter;
  352. try
  353. Result := wrSignaled;
  354. while (Result = wrSignaled) and (fQueueSize = 0) and not fShutDown do
  355. begin
  356. Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
  357. end;
  358. if (FShutDown and (fQueueSize = 0)) or (Result <> wrSignaled) then Exit;
  359. AItem := fQueue.Extract;
  360. if fQueueSize = fQueue.Count then
  361. begin
  362. FQueueCondVar.ReleaseAll;
  363. end;
  364. Dec(FQueueSize);
  365. Inc(fTotalItemsPopped);
  366. finally
  367. AQueueSize := fQueueSize;
  368. fQueueLock.Leave;
  369. end;
  370. end;
  371. {$ENDIF}
  372. function TThreadedQueueList<T>.PopItem(var AItem: T): TWaitResult;
  373. var
  374. LQueueSize: Integer;
  375. begin
  376. Result := PopItem(LQueueSize, AItem);
  377. end;
  378. function TThreadedQueueList<T>.PopItem(var AQueueSize: Integer): T;
  379. begin
  380. PopItem(AQueueSize, Result);
  381. end;
  382. function TThreadedQueueList<T>.PushItem(const AItem: T): TWaitResult;
  383. var
  384. LQueueSize: Integer;
  385. begin
  386. Result := PushItem(AItem, LQueueSize);
  387. end;
  388. {$IFDEF FPC}
  389. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  390. begin
  391. FQueueLock.Enter;
  392. try
  393. Result := wrSignaled;
  394. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  395. //begin
  396. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  397. //end;
  398. if fShutDown or (Result <> wrSignaled) then Exit;
  399. //if fQueueSize = 0 then
  400. //begin
  401. // FQueueCondVar.SetEvent;
  402. //end;
  403. fQueue.Enqueue(AItem);
  404. Inc(FQueueSize);
  405. Inc(fTotalItemsPushed);
  406. finally
  407. AQueueSize := fQueueSize;
  408. FQueueLock.Leave;
  409. //FQueueCondVar.SetEvent;
  410. end;
  411. end;
  412. {$ELSE}
  413. function TThreadedQueueList<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
  414. begin
  415. FQueueLock.Enter;
  416. try
  417. Result := wrSignaled;
  418. //while (Result = wrSignaled) and (fQueueSize = fQueue.Count) and not fShutDown do
  419. //begin
  420. // Result := fQueueCondVar.WaitFor(fQueueLock, fPushTimeout);
  421. //end;
  422. if fShutDown or (Result <> wrSignaled) then Exit;
  423. if fQueueSize = 0 then FQueueCondVar.ReleaseAll;
  424. fQueue.Enqueue(AItem);
  425. Inc(FQueueSize);
  426. Inc(fTotalItemsPushed);
  427. finally
  428. AQueueSize := fQueueSize;
  429. FQueueLock.Leave;
  430. end;
  431. end;
  432. {$ENDIF}
  433. procedure TThreadedQueueList<T>.DoShutDown;
  434. begin
  435. fShutDown := True;
  436. {$IFDEF FPC}
  437. FQueueCondVar.SetEvent;
  438. {$ELSE}
  439. FQueueCondVar.ReleaseAll;
  440. {$ENDIF}
  441. end;
  442. { TThreadTask<T> }
  443. function TThreadTask<T>.AddTask(Task: T): Boolean;
  444. begin
  445. Result := fTaskQueue.PushItem(Task) = TWaitResult.wrSignaled;
  446. end;
  447. constructor TThreadTask<T>.Create;
  448. begin
  449. inherited Create(True);
  450. fMaxQueue := 10;
  451. fInsertTimeout := INFINITE;
  452. fExtractTimeout := INFINITE;
  453. end;
  454. destructor TThreadTask<T>.Destroy;
  455. begin
  456. if Assigned(fTaskQueue) then fTaskQueue.Free;
  457. inherited;
  458. end;
  459. procedure TThreadTask<T>.Execute;
  460. begin
  461. inherited;
  462. end;
  463. function TThreadTask<T>.GetTaskQueue: Cardinal;
  464. begin
  465. if Assigned(fTaskQueue) then Result := fTaskQueue.QueueSize
  466. else Result := 0;
  467. end;
  468. procedure TThreadTask<T>.Start;
  469. begin
  470. fTaskQueue := TThreadedQueueCS<T>.Create(fMaxQueue,fInsertTimeout,fExtractTimeout);
  471. end;
  472. {$IFNDEF FPC}
  473. { TThreadObjectList<T> }
  474. procedure TThreadObjectList<T>.Add(const Item: T);
  475. begin
  476. LockList;
  477. try
  478. if (Duplicates = dupAccept) or (fList.IndexOf(Item) = -1) then fList.Add(Item)
  479. else if Duplicates = dupError then raise EListError.CreateFmt(SDuplicateItem, [fList.ItemValue(Item)]);
  480. finally
  481. UnlockList;
  482. end;
  483. end;
  484. procedure TThreadObjectList<T>.Clear;
  485. begin
  486. LockList;
  487. try
  488. fList.Clear;
  489. finally
  490. UnlockList;
  491. end;
  492. end;
  493. constructor TThreadObjectList<T>.Create(OwnedObjects : Boolean);
  494. begin
  495. inherited Create;
  496. fLock := TObject.Create;
  497. fList := TObjectList<T>.Create;
  498. fDuplicates := dupIgnore;
  499. end;
  500. destructor TThreadObjectList<T>.Destroy;
  501. begin
  502. LockList;
  503. try
  504. fList.Free;
  505. inherited Destroy;
  506. finally
  507. UnlockList;
  508. fLock.Free;
  509. end;
  510. end;
  511. function TThreadObjectList<T>.GetItem(aIndex: Integer): T;
  512. begin
  513. LockList;
  514. try
  515. Result := fList[aIndex];
  516. finally
  517. UnlockList;
  518. end;
  519. end;
  520. function TThreadObjectList<T>.LockList: TObjectList<T>;
  521. begin
  522. System.TMonitor.Enter(fLock);
  523. Result := fList;
  524. end;
  525. procedure TThreadObjectList<T>.Remove(const Item: T);
  526. begin
  527. RemoveItem(Item, TDirection.FromBeginning);
  528. end;
  529. procedure TThreadObjectList<T>.RemoveItem(const Item: T; Direction: TDirection);
  530. begin
  531. LockList;
  532. try
  533. fList.RemoveItem(Item, Direction);
  534. finally
  535. UnlockList;
  536. end;
  537. end;
  538. procedure TThreadObjectList<T>.SetItem(aIndex: Integer; aValue: T);
  539. begin
  540. LockList;
  541. try
  542. fList[aIndex] := aValue;
  543. finally
  544. UnlockList;
  545. end;
  546. end;
  547. procedure TThreadObjectList<T>.UnlockList;
  548. begin
  549. System.TMonitor.Exit(fLock);
  550. end;
  551. {$ENDIF}
  552. end.