fphttpclientasyncpool.pas 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355
  1. unit FPHTTPClientAsyncPool;
  2. {
  3. Default HTTP Client asynchronous pool.
  4. Events are not synchronized - the program has to synchronize within the events itself if necessary
  5. (e.g. with critical sections).
  6. If you are looking for a client pool that can be used in an LCL application and that does the synchronization for you,
  7. check (TODO: URL)
  8. }
  9. {$IF (FPC_FULLVERSION >= 30301)}
  10. {$define use_functionreferences}
  11. {$ENDIF}
  12. {$mode ObjFPC}{$H+}
  13. {$modeswitch advancedrecords}
  14. {$IFDEF use_functionreferences}
  15. {$modeswitch functionreferences}
  16. {$ENDIF}
  17. interface
  18. uses
  19. Classes, SysUtils, fphttpclient, httpprotocol, URIParser, syncobjs, ssockets, DateUtils, FPHTTPClientPool;
  20. type
  21. TFPHTTPClientPoolMethodResult = (mrSuccess, mrAbortedByClient, mrAbortedWithException);
  22. TFPHTTPClientAbstractAsyncPoolRequest = class;
  23. TFPHTTPClientPoolResult = class(TPersistent)
  24. private
  25. fExceptionClass: TClass;
  26. fExceptionMessage: string;
  27. fRequest: TFPHTTPClientAbstractAsyncPoolRequest;
  28. fMethodResult: TFPHTTPClientPoolMethodResult;
  29. fResponseHeaders: TStringList;
  30. fResponseStatusCode: Integer;
  31. fResponseStatusText: string;
  32. fResponseStream: TStream;
  33. fOwnsResponseStream: Boolean;
  34. function GetResponseContentType: string;
  35. function GetResponseEncoding: TEncoding;
  36. function GetResponseString: string;
  37. function GetResponseBytes: TBytes;
  38. protected
  39. procedure AssignTo(Dest: TPersistent); override;
  40. public
  41. property Request: TFPHTTPClientAbstractAsyncPoolRequest read fRequest;
  42. property MethodResult: TFPHTTPClientPoolMethodResult read fMethodResult write fMethodResult;
  43. property ResponseStatusCode: Integer read fResponseStatusCode write fResponseStatusCode;
  44. property ResponseStatusText: string read fResponseStatusText write fResponseStatusText;
  45. // ResponseEncoding - must be destroyed after use if it is not a standard encoding
  46. property ResponseEncoding: TEncoding read GetResponseEncoding;
  47. property ResponseHeaders: TStringList read fResponseHeaders;
  48. property ResponseStream: TStream read fResponseStream write fResponseStream;
  49. property OwnsResponseStream: Boolean read fOwnsResponseStream write fOwnsResponseStream;
  50. property ResponseString: string read GetResponseString;
  51. property ResponseBytes: TBytes read GetResponseBytes;
  52. property ResponseContentType: string read GetResponseContentType;
  53. property ExceptionClass: TClass read fExceptionClass write fExceptionClass;
  54. property ExceptionMessage: string read fExceptionMessage write fExceptionMessage;
  55. public
  56. constructor Create(const aRequest: TFPHTTPClientAbstractAsyncPoolRequest);
  57. destructor Destroy; override;
  58. end;
  59. TFPHTTPClientAsyncPoolRequestThread = class;
  60. TFPHTTPClientPoolProgressDirection = (pdDataSent, pdDataReceived);
  61. {$IFDEF use_functionreferences}
  62. TFPHTTPClientAsyncPoolRequestRef = class;
  63. TFPHTTPClientPoolInitRef = reference to procedure(const aRequest: TFPHTTPClientAsyncPoolRequestRef; const aClient: TFPHTTPClient);
  64. TFPHTTPClientPoolFinishRef = reference to procedure(const aResult: TFPHTTPClientPoolResult);
  65. TFPHTTPClientPoolProgressRef = reference to procedure(
  66. Sender: TFPHTTPClientAsyncPoolRequestThread;
  67. const aDirection: TFPHTTPClientPoolProgressDirection;
  68. const aPosition, aContentLength: Int64; var ioStop: Boolean);
  69. TFPHTTPClientPoolSimpleCallbackRef = reference to procedure;
  70. {$ENDIF}
  71. TFPHTTPClientAsyncPoolRequest = class;
  72. TFPHTTPClientPoolInit = procedure(const aRequest: TFPHTTPClientAsyncPoolRequest; const aClient: TFPHTTPClient) of object;
  73. TFPHTTPClientPoolFinish = procedure(const aResult: TFPHTTPClientPoolResult) of object;
  74. TFPHTTPClientPoolProgress = procedure(
  75. Sender: TFPHTTPClientAsyncPoolRequestThread;
  76. const aDirection: TFPHTTPClientPoolProgressDirection;
  77. const aPosition, aContentLength: Int64; var ioStop: Boolean) of object;
  78. TNotifyComponentEvent = procedure(AOwner: TComponent) of object;
  79. TFPCustomHTTPClientAsyncPool = class;
  80. TFPHTTPClientAbstractAsyncPoolRequest = class(TPersistent)
  81. public
  82. // if Owner gets destroyed, the request will be aborted (=rsAbortedByClient)
  83. // especially needed in an LCL application where e.g. the form can get closed while the request is still running
  84. Owner: TComponent;
  85. // if aBlocker <> nil, when sending the request, all running requests with the same aBlocker will be aborted (=rsAbortedByClient)
  86. Blocker: TObject;
  87. // parameters for TFPHTTPClient.HTTPMethod
  88. Method, URL: string;
  89. URLData: TBytes;
  90. ContentType: string;
  91. Headers: string;
  92. ResponseStream: TStream;
  93. OwnsResponseStream: Boolean;
  94. AllowedResponseCodes: array of Integer;
  95. // EVENTS
  96. // should OnInit be synchronized with the main thread?
  97. SynchronizeOnInit: Boolean;
  98. // should OnFinish be synchronized with the main thread?
  99. SynchronizeOnFinish: Boolean;
  100. // should OnFinish be executed when Owner is destroyed during the request
  101. ExecuteOnFinishOnOwnerDestroy: Boolean;
  102. // TIMEOUTS in ms
  103. // timeout to find a free client in the pool. Default=0 (infinite)
  104. ClientTimeout: Integer;
  105. // TFPHTTPClient.ConnectTimeout. Default=3000
  106. ConnectTimeout: Integer;
  107. // TFPHTTPClient.IOTimeout. Default=0 (infinite)
  108. IOTimeout: Integer;
  109. private
  110. function GetHost: string;
  111. function GetURLDataString: string;
  112. procedure SetURLDataString(const aURLDataString: string);
  113. protected
  114. procedure DoOnInit(const aClient: TFPHTTPClient); virtual; abstract;
  115. procedure DoOnFinish(const aResult: TFPHTTPClientPoolResult); virtual; abstract;
  116. procedure DoOnProgress(Sender: TFPHTTPClientAsyncPoolRequestThread; const aDirection: TFPHTTPClientPoolProgressDirection;
  117. const aPosition, aContentLength: Int64; var ioStop: Boolean); virtual; abstract;
  118. function HasProgress: Boolean; virtual; abstract;
  119. procedure OwnerDestroyed; virtual;
  120. public
  121. constructor Create;
  122. public
  123. property URLDataString: string read GetURLDataString write SetURLDataString;
  124. property Host: string read GetHost;
  125. end;
  126. TFPHTTPClientAsyncPoolRequest = class(TFPHTTPClientAbstractAsyncPoolRequest)
  127. protected
  128. procedure DoOnInit(const aClient: TFPHTTPClient); override;
  129. procedure DoOnFinish(const aResult: TFPHTTPClientPoolResult); override;
  130. procedure DoOnProgress(Sender: TFPHTTPClientAsyncPoolRequestThread; const aDirection: TFPHTTPClientPoolProgressDirection;
  131. const aPosition, aContentLength: Int64; var ioStop: Boolean); override;
  132. function HasProgress: Boolean; override;
  133. procedure OwnerDestroyed; override;
  134. public
  135. // EVENTS
  136. // setup custom client properties
  137. OnInit: TFPHTTPClientPoolInit;
  138. // read out the result
  139. OnFinish: TFPHTTPClientPoolFinish;
  140. // progress callback
  141. OnProgress: TFPHTTPClientPoolProgress;
  142. end;
  143. {$IFDEF use_functionreferences}
  144. TFPHTTPClientAsyncPoolRequestRef = class(TFPHTTPClientAbstractAsyncPoolRequest)
  145. protected
  146. procedure DoOnInit(const aClient: TFPHTTPClient); override;
  147. procedure DoOnFinish(const aResult: TFPHTTPClientPoolResult); override;
  148. procedure DoOnProgress(Sender: TFPHTTPClientAsyncPoolRequestThread; const aDirection: TFPHTTPClientPoolProgressDirection;
  149. const aPosition, aContentLength: Int64; var ioStop: Boolean); override;
  150. function HasProgress: Boolean; override;
  151. procedure OwnerDestroyed; override;
  152. public
  153. // EVENTS
  154. // setup custom client properties
  155. OnInit: TFPHTTPClientPoolInitRef;
  156. // read out the result
  157. OnFinish: TFPHTTPClientPoolFinishRef;
  158. // progress callback
  159. OnProgress: TFPHTTPClientPoolProgressRef;
  160. end;
  161. {$ENDIF}
  162. TFPHTTPClientAsyncPoolThread = class(TThread)
  163. private
  164. fPool: TFPCustomHTTPClientAsyncPool;
  165. fCSProperties: TCriticalSection;
  166. protected
  167. // access only through LockProperties
  168. procedure OwnerDestroyed; virtual;
  169. public
  170. property Pool: TFPCustomHTTPClientAsyncPool read fPool;
  171. // access only through LockProperties
  172. function GetOwner: TComponent; virtual; abstract;
  173. // lock&unlock read/write properties (properties are written currently only in OwnerDestroyed)
  174. procedure LockProperties;
  175. procedure UnlockProperties;
  176. public
  177. constructor Create(aPool: TFPCustomHTTPClientAsyncPool);
  178. destructor Destroy; override;
  179. end;
  180. TFPHTTPClientAsyncPoolCustomWaitForAllThread = class(TFPHTTPClientAsyncPoolThread)
  181. private
  182. fTimeoutMS: Integer;
  183. fOwner: TComponent;
  184. fSynchronizeOnAllDone: Boolean;
  185. procedure ExecOnAllDone;
  186. protected
  187. procedure DoOnAllDone; virtual; abstract;
  188. procedure Execute; override;
  189. // access only through LockProperties
  190. procedure OwnerDestroyed; override;
  191. public
  192. // access only through LockProperties
  193. function GetOwner: TComponent; override;
  194. public
  195. constructor Create(aPool: TFPCustomHTTPClientAsyncPool; const aSynchronizeOnAllDone: Boolean; const aOwner: TComponent; const aTimeoutMS: Integer);
  196. end;
  197. TFPHTTPClientAsyncPoolWaitForAllThread = class(TFPHTTPClientAsyncPoolCustomWaitForAllThread)
  198. private
  199. fOnAllDone: TNotifyEvent;
  200. protected
  201. procedure DoOnAllDone; override;
  202. procedure OwnerDestroyed; override;
  203. public
  204. constructor Create(aPool: TFPCustomHTTPClientAsyncPool; aOnAllDone: TNotifyEvent;
  205. const aSynchronizeOnAllDone: Boolean; const aOwner: TComponent; const aTimeoutMS: Integer);
  206. end;
  207. {$IFDEF use_functionreferences}
  208. TFPHTTPClientAsyncPoolWaitForAllThreadRef = class(TFPHTTPClientAsyncPoolCustomWaitForAllThread)
  209. private
  210. fOnAllDone: TFPHTTPClientPoolSimpleCallbackRef;
  211. protected
  212. procedure DoOnAllDone; override;
  213. procedure OwnerDestroyed; override;
  214. public
  215. constructor Create(aPool: TFPCustomHTTPClientAsyncPool; aOnAllDone: TFPHTTPClientPoolSimpleCallbackRef;
  216. const aSynchronizeOnAllDone: Boolean; const aOwner: TComponent; const aTimeoutMS: Integer);
  217. end;
  218. {$ENDIF}
  219. TFPHTTPClientAsyncPoolRequestThread = class(TFPHTTPClientAsyncPoolThread)
  220. private
  221. fRequest: TFPHTTPClientAbstractAsyncPoolRequest;
  222. fClient: TFPHTTPClient;
  223. fResult: TFPHTTPClientPoolResult;
  224. procedure OnDataReceived(Sender: TObject; const aContentLength, aCurrentPos: Int64);
  225. procedure OnDataSent(Sender: TObject; const aContentLength, aCurrentPos: Int64);
  226. // the ExecOn* methods call their DoOn* counterparts - do the synchronisation here
  227. procedure ExecOnInit;
  228. procedure ExecOnProgress(const aDirection: TFPHTTPClientPoolProgressDirection;
  229. const aCurrentPos, aContentLength: Integer; var ioStop: Boolean);
  230. procedure ExecOnFinish;
  231. procedure OnIdle(Sender: TObject; AOperation: TSocketOperationType; var AAbort: Boolean);
  232. protected
  233. // access only through LockProperties
  234. procedure OwnerDestroyed; override;
  235. protected
  236. procedure OnDataReceivedSend(Sender: TObject; const aDirection: TFPHTTPClientPoolProgressDirection; const aCurrentPos, aContentLength: Int64); virtual;
  237. protected
  238. property Client: TFPHTTPClient read fClient;
  239. property Result: TFPHTTPClientPoolResult read fResult;
  240. // the DoOn* methods do the actual work and can be synchronised by their ExecOn* counterparts
  241. // DoOnInit - executed when the request aquired a TFPHTTPClient to setup its extra properties
  242. // should not be synchronized with Synchronize() - it slows down the execution. Better to use CriticalSections
  243. procedure DoOnInit; virtual;
  244. // DoOnProgress - show progress during upload&download
  245. // should not be synchronized with Synchronize() - it slows down the execution. Better to use CriticalSections or Application.QueueAsyncCall in an LCL application
  246. procedure DoOnProgress(const aDirection: TFPHTTPClientPoolProgressDirection;
  247. const aCurrentPos, aContentLength: Integer; var ioStop: Boolean); virtual;
  248. // DoOnFinish - executed when the request is done
  249. // can happily be synchronized with Synchronize() because when called, the request connection is already released back to pool for reuse
  250. procedure DoOnFinish; virtual;
  251. protected
  252. procedure Execute; override;
  253. public
  254. constructor Create(aPool: TFPCustomHTTPClientAsyncPool;
  255. aRequest: TFPHTTPClientAbstractAsyncPoolRequest; aClient: TFPHTTPClient); virtual;
  256. destructor Destroy; override;
  257. public
  258. // access only through LockProperties
  259. property Request: TFPHTTPClientAbstractAsyncPoolRequest read fRequest;
  260. function GetOwner: TComponent; override;
  261. end;
  262. TFPHTTPClientAsyncPoolRequestQueueItem = class(TObject)
  263. public
  264. Pool: TFPCustomHTTPClientAsyncPool;
  265. Clients: TFPCustomHTTPClients;
  266. BreakUTC: TDateTime;
  267. Request: TFPHTTPClientAbstractAsyncPoolRequest;
  268. public
  269. destructor Destroy; override;
  270. end;
  271. TFPCustomHTTPClientAsyncPool = class(TComponent)
  272. private
  273. fHttpPool: TFPCustomHTTPClientPool;
  274. // do not access fWaitingQueue directly, use LockWorkingThreads() instead
  275. fWorkingThreads: TThreadList;
  276. fWaitingQueue: TList;
  277. fBlockRequestsCounter: Integer;
  278. function GetActiveAsyncMethodCount: Integer;
  279. function GetClientCount: Integer;
  280. function GetMaxClientsPerServer: Integer;
  281. function GetWaitingAsyncMethodCount: Integer;
  282. procedure SetMaxClientsPerServer(const aMaxClientsPerServer: Integer);
  283. private
  284. fDoOnAbortedFinishSynchronizedCS: TCriticalSection;
  285. fDoOnAbortedFinishSynchronizedRequest: TFPHTTPClientAbstractAsyncPoolRequest;
  286. procedure ExecOnAbortedFinish(var ioRequest: TFPHTTPClientAbstractAsyncPoolRequest);
  287. procedure DoOnAbortedFinishSynchronized;
  288. protected
  289. procedure Notification(AComponent: TComponent; Operation: TOperation); override;
  290. function CreatePool: TFPCustomHTTPClientPool; virtual;
  291. function CreateRequestThread(aRequest: TFPHTTPClientAbstractAsyncPoolRequest; aClient: TFPHTTPClient): TFPHTTPClientAsyncPoolRequestThread; virtual;
  292. function CreateWaitForAllRequestsThread(const aOnAllDone: TNotifyEvent; const aSynchronizeOnAllDone: Boolean;
  293. const aOwner: TComponent; const aTimeoutMS: Integer): TFPHTTPClientAsyncPoolWaitForAllThread; virtual;
  294. {$IFDEF use_functionreferences}
  295. function CreateWaitForAllRequestsThreadRef(const aOnAllDone: TFPHTTPClientPoolSimpleCallbackRef; const aSynchronizeOnAllDone: Boolean;
  296. const aOwner: TComponent; const aTimeoutMS: Integer): TFPHTTPClientAsyncPoolWaitForAllThreadRef; virtual;
  297. {$ENDIF}
  298. // support for MaxClientsPerServer (add requests that wait for a client to a queue)
  299. procedure AddToQueue(const aClients: TFPCustomHTTPClients; const aBreakUTC: TDateTime; const aRequest: TFPHTTPClientAbstractAsyncPoolRequest);
  300. procedure ReleaseClient(const aURL: string; const aClient: TFPHTTPClient);
  301. procedure DoOnAbortedFinish(var ioRequest: TFPHTTPClientAbstractAsyncPoolRequest); virtual;
  302. procedure LockWorkingThreads(out outWorkingThreads, outWaitingQueue: TList);
  303. procedure UnlockWorkingThreads;
  304. public
  305. // send an asynchronous HTTP request
  306. procedure AsyncMethod(aRequest: TFPHTTPClientAbstractAsyncPoolRequest); overload;
  307. // stop all requests with Blocker
  308. procedure StopRequests(const aBlocker: TObject);
  309. // stop all requests with Owner and don't send results to Owner
  310. procedure OwnerDestroyed(const aOwner: TObject);
  311. procedure BlockNewRequests;
  312. procedure UnblockNewRequests;
  313. // wait until all requests are finished
  314. // all new requests will be blocked in between
  315. procedure WaitForAllRequests(const aOnAllDone: TNotifyEvent; const aSynchronizeOnAllDone: Boolean;
  316. const aOwner: TComponent; const aTimeoutMS: Integer);
  317. {$IFDEF use_functionreferences}
  318. procedure WaitForAllRequests(const aOnAllDoneRef: TFPHTTPClientPoolSimpleCallbackRef; const aSynchronizeOnAllDone: Boolean;
  319. const aOwner: TComponent; const aTimeoutMS: Integer);
  320. {$ENDIF}
  321. public
  322. constructor Create(AOwner: TComponent); override;
  323. destructor Destroy; override;
  324. public
  325. property ClientCount: Integer read GetClientCount;
  326. property ActiveAsyncMethodCount: Integer read GetActiveAsyncMethodCount;
  327. property WaitingAsyncMethodCount: Integer read GetWaitingAsyncMethodCount;
  328. property MaxClientsPerServer: Integer read GetMaxClientsPerServer write SetMaxClientsPerServer;
  329. end;
  330. implementation
  331. { TFPHTTPClientAsyncPoolRequestRef }
  332. procedure TFPHTTPClientAsyncPoolRequestRef.DoOnFinish(const aResult: TFPHTTPClientPoolResult);
  333. begin
  334. if Assigned(OnFinish) then
  335. OnFinish(aResult);
  336. end;
  337. procedure TFPHTTPClientAsyncPoolRequestRef.DoOnInit(const aClient: TFPHTTPClient);
  338. begin
  339. if Assigned(OnInit) then
  340. OnInit(Self, aClient);
  341. end;
  342. procedure TFPHTTPClientAsyncPoolRequestRef.DoOnProgress(Sender: TFPHTTPClientAsyncPoolRequestThread;
  343. const aDirection: TFPHTTPClientPoolProgressDirection; const aPosition, aContentLength: Int64; var ioStop: Boolean);
  344. begin
  345. if Assigned(OnProgress) then
  346. OnProgress(Sender, aDirection, aPosition, aContentLength, ioStop);
  347. end;
  348. function TFPHTTPClientAsyncPoolRequestRef.HasProgress: Boolean;
  349. begin
  350. Result := Assigned(OnProgress);
  351. end;
  352. procedure TFPHTTPClientAsyncPoolRequestRef.OwnerDestroyed;
  353. begin
  354. inherited OwnerDestroyed;
  355. OnInit := nil;
  356. OnProgress := nil;
  357. if not ExecuteOnFinishOnOwnerDestroy then
  358. OnFinish := nil;
  359. end;
  360. { TFPHTTPClientAsyncPoolRequest }
  361. procedure TFPHTTPClientAsyncPoolRequest.DoOnFinish(const aResult: TFPHTTPClientPoolResult);
  362. begin
  363. if Assigned(OnFinish) then
  364. OnFinish(aResult);
  365. end;
  366. procedure TFPHTTPClientAsyncPoolRequest.DoOnInit(const aClient: TFPHTTPClient);
  367. begin
  368. if Assigned(OnInit) then
  369. OnInit(Self, aClient);
  370. end;
  371. procedure TFPHTTPClientAsyncPoolRequest.DoOnProgress(Sender: TFPHTTPClientAsyncPoolRequestThread;
  372. const aDirection: TFPHTTPClientPoolProgressDirection; const aPosition, aContentLength: Int64; var ioStop: Boolean);
  373. begin
  374. if Assigned(OnProgress) then
  375. OnProgress(Sender, aDirection, aPosition, aContentLength, ioStop);
  376. end;
  377. function TFPHTTPClientAsyncPoolRequest.HasProgress: Boolean;
  378. begin
  379. Result := Assigned(OnProgress);
  380. end;
  381. procedure TFPHTTPClientAsyncPoolRequest.OwnerDestroyed;
  382. begin
  383. inherited OwnerDestroyed;
  384. OnInit := nil;
  385. OnProgress := nil;
  386. if not ExecuteOnFinishOnOwnerDestroy then
  387. OnFinish := nil;
  388. end;
  389. { TFPHTTPClientAsyncPoolWaitForAllThreadRef }
  390. constructor TFPHTTPClientAsyncPoolWaitForAllThreadRef.Create(aPool: TFPCustomHTTPClientAsyncPool;
  391. aOnAllDone: TFPHTTPClientPoolSimpleCallbackRef; const aSynchronizeOnAllDone: Boolean; const aOwner: TComponent;
  392. const aTimeoutMS: Integer);
  393. begin
  394. end;
  395. procedure TFPHTTPClientAsyncPoolWaitForAllThreadRef.DoOnAllDone;
  396. begin
  397. if Assigned(fOnAllDone) then
  398. fOnAllDone;
  399. end;
  400. procedure TFPHTTPClientAsyncPoolWaitForAllThreadRef.OwnerDestroyed;
  401. begin
  402. inherited OwnerDestroyed;
  403. fOnAllDone := nil;
  404. end;
  405. { TFPHTTPClientAsyncPoolRequestQueueItem }
  406. destructor TFPHTTPClientAsyncPoolRequestQueueItem.Destroy;
  407. begin
  408. if Assigned(Request) then
  409. begin
  410. Pool.DoOnAbortedFinish(Request);
  411. Request.Free;
  412. end;
  413. inherited Destroy;
  414. end;
  415. { TFPHTTPClientAsyncPoolWaitForAllThread }
  416. constructor TFPHTTPClientAsyncPoolWaitForAllThread.Create(aPool: TFPCustomHTTPClientAsyncPool;
  417. aOnAllDone: TNotifyEvent; const aSynchronizeOnAllDone: Boolean; const aOwner: TComponent; const aTimeoutMS: Integer);
  418. begin
  419. fOnAllDone := aOnAllDone;
  420. inherited Create(aPool, aSynchronizeOnAllDone, aOwner, aTimeoutMS);
  421. end;
  422. procedure TFPHTTPClientAsyncPoolWaitForAllThread.DoOnAllDone;
  423. begin
  424. if Assigned(fOnAllDone) then
  425. fOnAllDone(Self);
  426. end;
  427. procedure TFPHTTPClientAsyncPoolWaitForAllThread.OwnerDestroyed;
  428. begin
  429. inherited OwnerDestroyed;
  430. fOnAllDone := nil;
  431. end;
  432. { TFPHTTPClientAsyncPoolCustomWaitForAllThread }
  433. constructor TFPHTTPClientAsyncPoolCustomWaitForAllThread.Create(aPool: TFPCustomHTTPClientAsyncPool;
  434. const aSynchronizeOnAllDone: Boolean; const aOwner: TComponent; const aTimeoutMS: Integer);
  435. begin
  436. inherited Create(aPool);
  437. fSynchronizeOnAllDone := aSynchronizeOnAllDone;
  438. fOwner := aOwner;
  439. fTimeoutMS := aTimeoutMS;
  440. end;
  441. procedure TFPHTTPClientAsyncPoolCustomWaitForAllThread.ExecOnAllDone;
  442. begin
  443. if fSynchronizeOnAllDone then
  444. Synchronize(@DoOnAllDone)
  445. else
  446. DoOnAllDone;
  447. end;
  448. procedure TFPHTTPClientAsyncPoolCustomWaitForAllThread.Execute;
  449. var
  450. xBreak: TDateTime;
  451. begin
  452. try
  453. Pool.BlockNewRequests;
  454. try
  455. if fTimeoutMS>0 then
  456. xBreak := IncMilliSecond(NowUTC, fTimeoutMS);
  457. while not Terminated and (Pool.ActiveAsyncMethodCount>0) and ((fTimeoutMS=0) or (NowUTC<xBreak)) do
  458. Sleep(10);
  459. finally
  460. Pool.UnblockNewRequests;
  461. end;
  462. if not Terminated then
  463. ExecOnAllDone;
  464. except
  465. end;
  466. end;
  467. function TFPHTTPClientAsyncPoolCustomWaitForAllThread.GetOwner: TComponent;
  468. begin
  469. Result := fOwner;
  470. end;
  471. procedure TFPHTTPClientAsyncPoolCustomWaitForAllThread.OwnerDestroyed;
  472. begin
  473. inherited OwnerDestroyed;
  474. fOwner := nil;
  475. end;
  476. { TFPHTTPClientAsyncPoolThread }
  477. constructor TFPHTTPClientAsyncPoolThread.Create(aPool: TFPCustomHTTPClientAsyncPool);
  478. begin
  479. fPool := aPool;
  480. fPool.fWorkingThreads.Add(Self);
  481. FreeOnTerminate := True;
  482. fCSProperties := TCriticalSection.Create;
  483. inherited Create(False);
  484. end;
  485. destructor TFPHTTPClientAsyncPoolThread.Destroy;
  486. begin
  487. fPool.fWorkingThreads.Remove(Self);
  488. fCSProperties.Free;
  489. inherited Destroy;
  490. end;
  491. procedure TFPHTTPClientAsyncPoolThread.LockProperties;
  492. begin
  493. fCSProperties.Enter;
  494. end;
  495. procedure TFPHTTPClientAsyncPoolThread.OwnerDestroyed;
  496. begin
  497. Terminate;
  498. end;
  499. procedure TFPHTTPClientAsyncPoolThread.UnlockProperties;
  500. begin
  501. fCSProperties.Leave;
  502. end;
  503. { TFPHTTPClientAbstractAsyncPoolRequest }
  504. constructor TFPHTTPClientAbstractAsyncPoolRequest.Create;
  505. begin
  506. inherited Create;
  507. ConnectTimeout := 3000;
  508. end;
  509. function TFPHTTPClientAbstractAsyncPoolRequest.GetHost: string;
  510. var
  511. xURI: TURI;
  512. begin
  513. xURI := ParseURI(URL, False);
  514. Result := xURI.Host;
  515. end;
  516. function TFPHTTPClientAbstractAsyncPoolRequest.GetURLDataString: string;
  517. begin
  518. Result := TEncoding.SystemEncoding.GetAnsiString(URLData);
  519. end;
  520. procedure TFPHTTPClientAbstractAsyncPoolRequest.OwnerDestroyed;
  521. begin
  522. Owner := nil;
  523. end;
  524. procedure TFPHTTPClientAbstractAsyncPoolRequest.SetURLDataString(const aURLDataString: string);
  525. begin
  526. URLData := TEncoding.SystemEncoding.GetAnsiBytes(aURLDataString);
  527. end;
  528. { TFPHTTPClientPoolResult }
  529. constructor TFPHTTPClientPoolResult.Create(const aRequest: TFPHTTPClientAbstractAsyncPoolRequest);
  530. begin
  531. inherited Create;
  532. fRequest := aRequest;
  533. fResponseHeaders := TStringList.Create;
  534. end;
  535. procedure TFPHTTPClientPoolResult.AssignTo(Dest: TPersistent);
  536. var
  537. xDest: TFPHTTPClientPoolResult;
  538. begin
  539. if Dest is TFPHTTPClientPoolResult then
  540. begin
  541. xDest := TFPHTTPClientPoolResult(Dest);
  542. xDest.fExceptionClass := fExceptionClass;
  543. xDest.fExceptionMessage := fExceptionMessage;
  544. xDest.fMethodResult := fMethodResult;
  545. xDest.fResponseHeaders.Assign(fResponseHeaders);
  546. xDest.fResponseStatusCode := fResponseStatusCode;
  547. xDest.fResponseStatusText := fResponseStatusText;
  548. end else
  549. inherited AssignTo(Dest);
  550. end;
  551. destructor TFPHTTPClientPoolResult.Destroy;
  552. begin
  553. fResponseHeaders.Free;
  554. if OwnsResponseStream then
  555. ResponseStream.Free;
  556. fRequest.Free;
  557. inherited Destroy;
  558. end;
  559. function TFPHTTPClientPoolResult.GetResponseBytes: TBytes;
  560. begin
  561. Result := nil;
  562. if ResponseStream.Size=0 then
  563. Exit;
  564. SetLength(Result, ResponseStream.Size);
  565. ResponseStream.Position := 0;
  566. if Length(Result)>0 then
  567. ResponseStream.ReadBuffer(Result, Length(Result));
  568. end;
  569. function TFPHTTPClientPoolResult.GetResponseContentType: string;
  570. begin
  571. Result := TFPCustomHTTPClient.GetHeader(fResponseHeaders, HeaderContentType);
  572. end;
  573. function TFPHTTPClientPoolResult.GetResponseEncoding: TEncoding;
  574. var
  575. xContentType, xEncodingName: string;
  576. xStrL: TStringList;
  577. I: Integer;
  578. begin
  579. xContentType := GetResponseContentType;
  580. xContentType := 'Content-Type: text/html; charset=utf-8';
  581. xStrL := TStringList.Create;
  582. try
  583. xStrL.Delimiter := ';';
  584. xStrL.NameValueSeparator := '=';
  585. xStrL.StrictDelimiter := False;
  586. xStrL.DelimitedText := xContentType;
  587. for I := 0 to xStrL.Count-1 do
  588. begin
  589. if SameText(xStrL.Names[I], 'charset') then
  590. begin
  591. xEncodingName := xStrL.ValueFromIndex[I];
  592. Exit(TEncoding.GetEncoding(UnicodeString(xEncodingName)));
  593. end;
  594. end;
  595. finally
  596. xStrL.Free;
  597. end;
  598. Result := nil;
  599. end;
  600. function TFPHTTPClientPoolResult.GetResponseString: string;
  601. var
  602. xEncoding: TEncoding;
  603. xBytes: TBytes;
  604. begin
  605. if not Assigned(ResponseStream) or (ResponseStream.Size=0) then
  606. Exit('');
  607. xEncoding := ResponseEncoding;
  608. try
  609. if Assigned(xEncoding) then
  610. begin
  611. if ResponseStream is TBytesStream then
  612. Result := xEncoding.GetAnsiString(TBytesStream(ResponseStream).Bytes, 0, TBytesStream(ResponseStream).Size)
  613. else
  614. begin
  615. xBytes := nil;
  616. SetLength(xBytes, ResponseStream.Size);
  617. ResponseStream.Position := 0;
  618. ResponseStream.ReadBuffer(xBytes[0], Length(xBytes));
  619. Result := xEncoding.GetAnsiString(xBytes);
  620. end;
  621. end else
  622. begin
  623. SetLength(Result, ResponseStream.Size);
  624. ResponseStream.Position := 0;
  625. if Length(Result)>0 then
  626. ResponseStream.ReadBuffer(Result[1], Length(Result));
  627. end;
  628. finally
  629. if Assigned(xEncoding) and not TEncoding.IsStandardEncoding(xEncoding) then
  630. xEncoding.Free;
  631. end;
  632. end;
  633. { TFPCustomHTTPClientAsyncPool }
  634. procedure TFPCustomHTTPClientAsyncPool.AsyncMethod(aRequest: TFPHTTPClientAbstractAsyncPoolRequest);
  635. var
  636. xClients: TFPCustomHTTPClients;
  637. xBreakUTC: TDateTime;
  638. xURI: TURI;
  639. xClient: TFPHTTPClient;
  640. begin
  641. fWorkingThreads.LockList;
  642. try
  643. if fBlockRequestsCounter<>0 then
  644. begin
  645. DoOnAbortedFinish(aRequest);
  646. Exit;
  647. end;
  648. if Assigned(aRequest.Blocker) then
  649. StopRequests(aRequest.Blocker);
  650. if Assigned(aRequest.Owner) then
  651. begin
  652. FreeNotification(aRequest.Owner);
  653. // We do not remove the notification with RemoveFreeNotification().
  654. // It would be unsafe if more requests are sent with the same owner.
  655. // That is fine - it will be removed automatically when the owner is destroyed.
  656. end;
  657. xURI := ParseURI(aRequest.URL, False);
  658. xClients := fHttpPool.GetCreateServerClients(xURI.Host, xURI.Port);
  659. if aRequest.ClientTimeout>0 then
  660. xBreakUTC := IncMilliSecond(NowUTC, aRequest.ClientTimeout)
  661. else
  662. xBreakUTC := 0;
  663. xClient := xClients.GetClient;
  664. if Assigned(xClient) then
  665. // client is available -> create request thread
  666. CreateRequestThread(aRequest, xClient)
  667. else
  668. // no client available -> add to queue
  669. AddToQueue(xClients, xBreakUTC, aRequest);
  670. aRequest := nil; // don't destroy aRequest
  671. finally
  672. fWorkingThreads.UnlockList;
  673. aRequest.Free;
  674. end;
  675. end;
  676. procedure TFPCustomHTTPClientAsyncPool.BlockNewRequests;
  677. begin
  678. fWorkingThreads.LockList;
  679. Inc(fBlockRequestsCounter);
  680. fWorkingThreads.UnlockList;
  681. end;
  682. function TFPCustomHTTPClientAsyncPool.CreatePool: TFPCustomHTTPClientPool;
  683. begin
  684. Result := TFPCustomHTTPClientPool.Create(Self);
  685. end;
  686. function TFPCustomHTTPClientAsyncPool.CreateRequestThread(aRequest: TFPHTTPClientAbstractAsyncPoolRequest;
  687. aClient: TFPHTTPClient): TFPHTTPClientAsyncPoolRequestThread;
  688. begin
  689. Result := TFPHTTPClientAsyncPoolRequestThread.Create(Self, aRequest, aClient);
  690. end;
  691. function TFPCustomHTTPClientAsyncPool.CreateWaitForAllRequestsThread(const aOnAllDone: TNotifyEvent;
  692. const aSynchronizeOnAllDone: Boolean; const aOwner: TComponent;
  693. const aTimeoutMS: Integer): TFPHTTPClientAsyncPoolWaitForAllThread;
  694. begin
  695. Result := TFPHTTPClientAsyncPoolWaitForAllThread.Create(Self, aOnAllDone, aSynchronizeOnAllDone, aOwner, aTimeoutMS);
  696. end;
  697. function TFPCustomHTTPClientAsyncPool.CreateWaitForAllRequestsThreadRef(
  698. const aOnAllDone: TFPHTTPClientPoolSimpleCallbackRef; const aSynchronizeOnAllDone: Boolean; const aOwner: TComponent;
  699. const aTimeoutMS: Integer): TFPHTTPClientAsyncPoolWaitForAllThreadRef;
  700. begin
  701. Result := TFPHTTPClientAsyncPoolWaitForAllThreadRef.Create(Self, aOnAllDone, aSynchronizeOnAllDone, aOwner, aTimeoutMS);
  702. end;
  703. constructor TFPCustomHTTPClientAsyncPool.Create(AOwner: TComponent);
  704. begin
  705. fWorkingThreads := TThreadList.Create;
  706. fWaitingQueue := TList.Create;
  707. fHttpPool := CreatePool;
  708. fDoOnAbortedFinishSynchronizedCS := TCriticalSection.Create;
  709. inherited Create(AOwner);
  710. end;
  711. procedure TFPCustomHTTPClientAsyncPool.AddToQueue(const aClients: TFPCustomHTTPClients; const aBreakUTC: TDateTime;
  712. const aRequest: TFPHTTPClientAbstractAsyncPoolRequest);
  713. var
  714. xNewItem: TFPHTTPClientAsyncPoolRequestQueueItem;
  715. xThreads, xQueue: TList;
  716. begin
  717. LockWorkingThreads(xThreads, xQueue);
  718. try
  719. xNewItem := TFPHTTPClientAsyncPoolRequestQueueItem.Create;
  720. xNewItem.Pool := Self;
  721. xNewItem.Clients := aClients;
  722. xNewItem.BreakUTC := aBreakUTC;
  723. xNewItem.Request := aRequest;
  724. xQueue.Add(xNewItem);
  725. finally
  726. UnlockWorkingThreads;
  727. end;
  728. end;
  729. procedure TFPCustomHTTPClientAsyncPool.ReleaseClient(const aURL: string; const aClient: TFPHTTPClient);
  730. var
  731. xURI: TURI;
  732. xClients: TFPCustomHTTPClients;
  733. xItem: TFPHTTPClientAsyncPoolRequestQueueItem;
  734. xRequest: TFPHTTPClientAbstractAsyncPoolRequest;
  735. I: Integer;
  736. xThreads, xQueue: TList;
  737. begin
  738. LockWorkingThreads(xThreads, xQueue);
  739. try
  740. xURI := ParseURI(aURL, False);
  741. xClients := fHttpPool.GetCreateServerClients(xURI.Host, xURI.Port);
  742. I := 0;
  743. while I<xQueue.Count do
  744. begin
  745. xItem := TFPHTTPClientAsyncPoolRequestQueueItem(xQueue[I]);
  746. if (CompareDateTime(xItem.BreakUTC, 0)<>0) and (CompareDateTime(xItem.BreakUTC, NowUTC)<0) then
  747. begin // timeout is over
  748. xItem.Free;
  749. xQueue.Delete(I);
  750. end else
  751. if xClients=xItem.Clients then
  752. begin // found a request waiting in queue
  753. xRequest := xItem.Request;
  754. xItem.Request := nil; // do not destroy/abort request
  755. xItem.Free;
  756. xQueue.Delete(I);
  757. CreateRequestThread(xRequest, aClient);
  758. Exit;
  759. end else
  760. Inc(I);
  761. end;
  762. // no waiting request found - release the client
  763. fHttpPool.ReleaseClient(xURI.Host, xURI.Port, aClient);
  764. finally
  765. UnlockWorkingThreads;
  766. end;
  767. end;
  768. destructor TFPCustomHTTPClientAsyncPool.Destroy;
  769. procedure _TerminateAll(_List: TList);
  770. var
  771. I: Integer;
  772. begin
  773. for I := 0 to _List.Count-1 do
  774. TThread(_List[I]).Terminate;
  775. end;
  776. procedure _ClearWaitingQueue(_List: TList);
  777. var
  778. I: Integer;
  779. begin
  780. for I := 0 to _List.Count-1 do
  781. TObject(_List[I]).Free;
  782. _List.Clear;
  783. end;
  784. var
  785. xThreads, xQueue: TList;
  786. begin
  787. LockWorkingThreads(xThreads, xQueue);
  788. try
  789. _TerminateAll(xThreads);
  790. _ClearWaitingQueue(xQueue);
  791. finally
  792. UnlockWorkingThreads;
  793. end;
  794. while ActiveAsyncMethodCount>0 do
  795. begin
  796. if (ThreadID=MainThreadID) then // we are synchronizing events - call CheckSynchronize to prevent deadlock in the main thread
  797. CheckSynchronize(10)
  798. else
  799. Sleep(10);
  800. end;
  801. fWorkingThreads.Free;
  802. fWaitingQueue.Free;
  803. fDoOnAbortedFinishSynchronizedCS.Free;
  804. inherited Destroy;
  805. end;
  806. procedure TFPCustomHTTPClientAsyncPool.DoOnAbortedFinish(var ioRequest: TFPHTTPClientAbstractAsyncPoolRequest);
  807. var
  808. xResult: TFPHTTPClientPoolResult;
  809. begin
  810. xResult := TFPHTTPClientPoolResult.Create(ioRequest);
  811. try
  812. xResult.MethodResult := mrAbortedByClient;
  813. ioRequest.DoOnFinish(xResult);
  814. ioRequest := nil; // ioRequest gets destroyed in xResult.Free
  815. finally
  816. xResult.Free;
  817. end;
  818. end;
  819. procedure TFPCustomHTTPClientAsyncPool.DoOnAbortedFinishSynchronized;
  820. begin
  821. DoOnAbortedFinish(fDoOnAbortedFinishSynchronizedRequest);
  822. end;
  823. procedure TFPCustomHTTPClientAsyncPool.ExecOnAbortedFinish(var ioRequest: TFPHTTPClientAbstractAsyncPoolRequest);
  824. begin
  825. // always synchronize - even if OnFinish is nil, so that ioRequest gets destroyed in the main thread
  826. // if somebody had the idea to do something with the LCL in a custom request destructor
  827. // -- don't do: if not Assigned(ioRequest.OnFinish) then Exit;
  828. if ioRequest.SynchronizeOnFinish and (ThreadID<>MainThreadID) then
  829. begin
  830. fDoOnAbortedFinishSynchronizedCS.Enter; // we need to protect fDoOnAbortedFinishSynchronizedRequest
  831. try
  832. fDoOnAbortedFinishSynchronizedRequest := ioRequest;
  833. TThread.Synchronize(nil, @DoOnAbortedFinishSynchronized);
  834. ioRequest := nil;
  835. finally
  836. fDoOnAbortedFinishSynchronizedCS.Free;
  837. end;
  838. end else
  839. DoOnAbortedFinish(ioRequest);
  840. end;
  841. function TFPCustomHTTPClientAsyncPool.GetActiveAsyncMethodCount: Integer;
  842. var
  843. xThreads, xQueue: TList;
  844. begin
  845. LockWorkingThreads(xThreads, xQueue);
  846. Result := xThreads.Count;
  847. UnlockWorkingThreads;
  848. end;
  849. function TFPCustomHTTPClientAsyncPool.GetClientCount: Integer;
  850. begin
  851. Result := fHttpPool.ClientCount;
  852. end;
  853. function TFPCustomHTTPClientAsyncPool.GetMaxClientsPerServer: Integer;
  854. begin
  855. Result := fHttpPool.MaxClientsPerServer;
  856. end;
  857. function TFPCustomHTTPClientAsyncPool.GetWaitingAsyncMethodCount: Integer;
  858. var
  859. xThreads, xQueue: TList;
  860. begin
  861. LockWorkingThreads(xThreads, xQueue);
  862. Result := xQueue.Count;
  863. UnlockWorkingThreads;
  864. end;
  865. procedure TFPCustomHTTPClientAsyncPool.LockWorkingThreads(out outWorkingThreads, outWaitingQueue: TList);
  866. begin
  867. outWorkingThreads := fWorkingThreads.LockList;
  868. outWaitingQueue := fWaitingQueue;
  869. end;
  870. procedure TFPCustomHTTPClientAsyncPool.Notification(AComponent: TComponent; Operation: TOperation);
  871. begin
  872. if Operation=opRemove then
  873. OwnerDestroyed(AComponent);
  874. inherited Notification(AComponent, Operation);
  875. end;
  876. procedure TFPCustomHTTPClientAsyncPool.SetMaxClientsPerServer(const aMaxClientsPerServer: Integer);
  877. begin
  878. fHttpPool.MaxClientsPerServer := aMaxClientsPerServer;
  879. end;
  880. procedure TFPCustomHTTPClientAsyncPool.StopRequests(const aBlocker: TObject);
  881. var
  882. I: Integer;
  883. xThreads, xQueue: TList;
  884. xThread: TFPHTTPClientAsyncPoolRequestThread;
  885. xItem: TFPHTTPClientAsyncPoolRequestQueueItem;
  886. begin
  887. LockWorkingThreads(xThreads, xQueue);
  888. try
  889. for I := 0 to xThreads.Count-1 do
  890. begin
  891. if TObject(xThreads[I]) is TFPHTTPClientAsyncPoolRequestThread then
  892. begin
  893. xThread := TFPHTTPClientAsyncPoolRequestThread(TObject(xThreads[I]));
  894. xThread.LockProperties;
  895. try
  896. if xThread.Request.Blocker=aBlocker then
  897. xThread.Terminate;
  898. finally
  899. xThread.UnlockProperties;
  900. end;
  901. end;
  902. end;
  903. for I := xQueue.Count-1 downto 0 do
  904. begin
  905. xItem := TFPHTTPClientAsyncPoolRequestQueueItem(xQueue[I]);
  906. if xItem.Request.Blocker=aBlocker then
  907. begin // found a request waiting in queue
  908. xItem.Free;
  909. xQueue.Delete(I);
  910. end;
  911. end;
  912. finally
  913. UnlockWorkingThreads;
  914. end;
  915. end;
  916. procedure TFPCustomHTTPClientAsyncPool.OwnerDestroyed(const aOwner: TObject);
  917. var
  918. I: Integer;
  919. xList, xQueue: TList;
  920. xThread: TFPHTTPClientAsyncPoolThread;
  921. xItem: TFPHTTPClientAsyncPoolRequestQueueItem;
  922. begin
  923. LockWorkingThreads(xList, xQueue);
  924. try
  925. for I := 0 to xList.Count-1 do
  926. begin
  927. if TObject(xList[I]) is TFPHTTPClientAsyncPoolThread then
  928. begin
  929. xThread := TFPHTTPClientAsyncPoolThread(TObject(xList[I]));
  930. xThread.LockProperties;
  931. try
  932. if xThread.GetOwner=aOwner then
  933. xThread.OwnerDestroyed;
  934. finally
  935. xThread.UnlockProperties;
  936. end;
  937. end;
  938. end;
  939. for I := xQueue.Count-1 downto 0 do
  940. begin
  941. xItem := TFPHTTPClientAsyncPoolRequestQueueItem(xQueue[I]);
  942. if xItem.Request.Owner=aOwner then
  943. begin // found a request waiting in queue
  944. xItem.Free;
  945. xQueue.Delete(I);
  946. end;
  947. end;
  948. finally
  949. UnlockWorkingThreads;
  950. end;
  951. end;
  952. procedure TFPCustomHTTPClientAsyncPool.UnblockNewRequests;
  953. begin
  954. fWorkingThreads.LockList;
  955. Dec(fBlockRequestsCounter);
  956. fWorkingThreads.UnlockList;
  957. end;
  958. procedure TFPCustomHTTPClientAsyncPool.UnlockWorkingThreads;
  959. begin
  960. fWorkingThreads.UnlockList;
  961. end;
  962. procedure TFPCustomHTTPClientAsyncPool.WaitForAllRequests(const aOnAllDoneRef: TFPHTTPClientPoolSimpleCallbackRef;
  963. const aSynchronizeOnAllDone: Boolean; const aOwner: TComponent; const aTimeoutMS: Integer);
  964. begin
  965. if ActiveAsyncMethodCount=0 then
  966. begin
  967. if Assigned(aOnAllDoneRef) then
  968. aOnAllDoneRef;
  969. Exit;
  970. end;
  971. if Assigned(aOwner) then
  972. begin
  973. FreeNotification(aOwner);
  974. // We do not remove the notification with RemoveFreeNotification().
  975. // It would be unsafe if more requests are sent with the same owner.
  976. // That is fine - it will be removed automatically when the owner is destroyed.
  977. end;
  978. CreateWaitForAllRequestsThreadRef(aOnAllDoneRef, aSynchronizeOnAllDone, aOwner, aTimeoutMS);
  979. end;
  980. procedure TFPCustomHTTPClientAsyncPool.WaitForAllRequests(const aOnAllDone: TNotifyEvent;
  981. const aSynchronizeOnAllDone: Boolean; const aOwner: TComponent; const aTimeoutMS: Integer);
  982. begin
  983. if ActiveAsyncMethodCount=0 then
  984. begin
  985. if Assigned(aOnAllDone) then
  986. aOnAllDone(Self);
  987. Exit;
  988. end;
  989. if Assigned(aOwner) then
  990. begin
  991. FreeNotification(aOwner);
  992. // We do not remove the notification with RemoveFreeNotification().
  993. // It would be unsafe if more requests are sent with the same owner.
  994. // That is fine - it will be removed automatically when the owner is destroyed.
  995. end;
  996. CreateWaitForAllRequestsThread(aOnAllDone, aSynchronizeOnAllDone, aOwner, aTimeoutMS);
  997. end;
  998. { TFPHTTPClientAsyncPoolRequestThread }
  999. constructor TFPHTTPClientAsyncPoolRequestThread.Create(aPool: TFPCustomHTTPClientAsyncPool;
  1000. aRequest: TFPHTTPClientAbstractAsyncPoolRequest; aClient: TFPHTTPClient);
  1001. begin
  1002. fRequest := aRequest;
  1003. fResult := TFPHTTPClientPoolResult.Create(fRequest);
  1004. fClient := aClient;
  1005. if Assigned(aRequest.ResponseStream) then
  1006. begin
  1007. fResult.ResponseStream := aRequest.ResponseStream;
  1008. fResult.OwnsResponseStream := aRequest.OwnsResponseStream;
  1009. end else
  1010. begin
  1011. fResult.ResponseStream := TBytesStream.Create;
  1012. fResult.OwnsResponseStream := True;
  1013. end;
  1014. inherited Create(aPool);
  1015. end;
  1016. destructor TFPHTTPClientAsyncPoolRequestThread.Destroy;
  1017. begin
  1018. fResult.Free;
  1019. inherited Destroy;
  1020. end;
  1021. procedure TFPHTTPClientAsyncPoolRequestThread.OnDataReceived(Sender: TObject; const aContentLength, aCurrentPos: Int64);
  1022. begin
  1023. OnDataReceivedSend(Sender, pdDataReceived, aCurrentPos, aContentLength);
  1024. end;
  1025. procedure TFPHTTPClientAsyncPoolRequestThread.OnDataReceivedSend(Sender: TObject;
  1026. const aDirection: TFPHTTPClientPoolProgressDirection; const aCurrentPos, aContentLength: Int64);
  1027. var
  1028. xStop: Boolean;
  1029. begin
  1030. LockProperties;
  1031. try
  1032. xStop := False;
  1033. if Request.HasProgress then
  1034. ExecOnProgress(aDirection, aCurrentPos, aContentLength, xStop);
  1035. if xStop or Terminated then
  1036. (Sender as TFPCustomHTTPClient).Terminate;
  1037. finally
  1038. UnlockProperties;
  1039. end;
  1040. end;
  1041. procedure TFPHTTPClientAsyncPoolRequestThread.OnDataSent(Sender: TObject; const aContentLength, aCurrentPos: Int64);
  1042. begin
  1043. OnDataReceivedSend(Sender, pdDataSent, aContentLength, aCurrentPos);
  1044. end;
  1045. procedure TFPHTTPClientAsyncPoolRequestThread.OnIdle(Sender: TObject; AOperation: TSocketOperationType;
  1046. var AAbort: Boolean);
  1047. begin
  1048. if Terminated then
  1049. AAbort := True;
  1050. end;
  1051. procedure TFPHTTPClientAsyncPoolRequestThread.OwnerDestroyed;
  1052. begin
  1053. inherited;
  1054. fRequest.OwnerDestroyed;
  1055. end;
  1056. procedure TFPHTTPClientAsyncPoolRequestThread.DoOnInit;
  1057. begin
  1058. LockProperties;
  1059. try
  1060. Request.DoOnInit(fClient);
  1061. finally
  1062. UnlockProperties;
  1063. end;
  1064. end;
  1065. procedure TFPHTTPClientAsyncPoolRequestThread.DoOnProgress(const aDirection: TFPHTTPClientPoolProgressDirection;
  1066. const aCurrentPos, aContentLength: Integer; var ioStop: Boolean);
  1067. begin
  1068. LockProperties;
  1069. try
  1070. if Request.HasProgress then
  1071. Request.DoOnProgress(Self, aDirection, aCurrentPos, aContentLength, ioStop);
  1072. finally
  1073. UnlockProperties;
  1074. end;
  1075. end;
  1076. procedure TFPHTTPClientAsyncPoolRequestThread.ExecOnFinish;
  1077. begin
  1078. if Request.SynchronizeOnFinish then
  1079. Synchronize(@DoOnFinish)
  1080. else
  1081. DoOnFinish;
  1082. end;
  1083. procedure TFPHTTPClientAsyncPoolRequestThread.ExecOnInit;
  1084. begin
  1085. if Request.SynchronizeOnInit then
  1086. Synchronize(@DoOnInit)
  1087. else
  1088. DoOnInit;
  1089. end;
  1090. procedure TFPHTTPClientAsyncPoolRequestThread.ExecOnProgress(const aDirection: TFPHTTPClientPoolProgressDirection;
  1091. const aCurrentPos, aContentLength: Integer; var ioStop: Boolean);
  1092. begin
  1093. DoOnProgress(aDirection, aCurrentPos, aContentLength, ioStop);
  1094. end;
  1095. procedure TFPHTTPClientAsyncPoolRequestThread.Execute;
  1096. begin
  1097. // don't LockProperties here - Request.Headers/ContentType/URLData/Method/URL/ResponseStream/AllowedResponseCodes are read-only
  1098. try
  1099. try
  1100. fClient.ConnectTimeout := Request.ConnectTimeout;
  1101. fClient.IOTimeout := Request.IOTimeout;
  1102. fClient.OnIdle := @OnIdle;
  1103. fClient.RequestHeaders.Text := Request.Headers;
  1104. if Request.ContentType<>'' then
  1105. fClient.AddHeader(fClient.RequestHeaders, HeaderContentType, Request.ContentType);
  1106. if Length(Request.URLData)>0 then
  1107. fClient.RequestBody := TBytesStream.Create(Request.URLData);
  1108. ExecOnInit;
  1109. fClient.OnDataReceived := @OnDataReceived;
  1110. fClient.OnDataSent := @OnDataSent;
  1111. if Terminated then
  1112. begin
  1113. fResult.MethodResult := mrAbortedByClient;
  1114. Exit;
  1115. end;
  1116. try
  1117. fClient.HTTPMethod(Request.Method, Request.URL, fResult.ResponseStream, Request.AllowedResponseCodes);
  1118. finally
  1119. fClient.RequestBody.Free;
  1120. fClient.RequestBody := nil;
  1121. end;
  1122. fResult.ResponseStream.Position := 0;
  1123. if Terminated then
  1124. begin
  1125. fResult.MethodResult := mrAbortedByClient;
  1126. end else
  1127. begin
  1128. fResult.MethodResult := mrSuccess;
  1129. fResult.ResponseStatusCode := fClient.ResponseStatusCode;
  1130. fResult.ResponseStatusText := fClient.ResponseStatusText;
  1131. fResult.ResponseHeaders.Assign(fClient.ResponseHeaders);
  1132. end;
  1133. except
  1134. on E: TObject do
  1135. begin
  1136. if Terminated then // client terminated the connection -> it has priority above mrAbortedWithException
  1137. fResult.MethodResult := mrAbortedByClient
  1138. else
  1139. fResult.MethodResult := mrAbortedWithException;
  1140. fResult.ExceptionClass := E.ClassType;
  1141. if E is Exception then
  1142. fResult.ExceptionMessage := Exception(E).Message;
  1143. end;
  1144. end;
  1145. finally
  1146. try
  1147. Pool.ReleaseClient(Request.URL, fClient);
  1148. fClient := nil; // do not use fClient - it doesn't belong here anymore
  1149. ExecOnFinish;
  1150. except
  1151. end;
  1152. end;
  1153. end;
  1154. function TFPHTTPClientAsyncPoolRequestThread.GetOwner: TComponent;
  1155. begin
  1156. Result := fRequest.Owner;
  1157. end;
  1158. procedure TFPHTTPClientAsyncPoolRequestThread.DoOnFinish;
  1159. begin
  1160. LockProperties;
  1161. try
  1162. Request.DoOnFinish(fResult);
  1163. // always destroy fResult so that the Request's destructor is synchronised if DoOnFinish is synchronised
  1164. fResult.Free;
  1165. fResult := nil;
  1166. finally
  1167. UnlockProperties;
  1168. end;
  1169. end;
  1170. end.