IdIOHandlerChain.pas 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859
  1. {
  2. $Project$
  3. $Workfile$
  4. $Revision$
  5. $DateUTC$
  6. $Id$
  7. This file is part of the Indy (Internet Direct) project, and is offered
  8. under the dual-licensing agreement described on the Indy website.
  9. (http://www.indyproject.org/)
  10. Copyright:
  11. (c) 1993-2005, Chad Z. Hower and the Indy Pit Crew. All rights reserved.
  12. }
  13. {
  14. $Log$
  15. }
  16. {
  17. Rev 1.6 9/16/2004 8:11:40 PM JPMugaas
  18. Should compile again.
  19. Rev 1.5 6/11/2004 8:39:58 AM DSiders
  20. Added "Do not Localize" comments.
  21. Rev 1.4 2004.05.06 1:47:26 PM czhower
  22. Now uses IndexOf
  23. Rev 1.3 2004.04.13 10:37:56 PM czhower
  24. Updates
  25. Rev 1.2 2004.03.07 11:46:08 AM czhower
  26. Flushbuffer fix + other minor ones found
  27. Rev 1.1 2004.02.09 9:16:44 PM czhower
  28. Updated to compile and match lib changes.
  29. Rev 1.0 2004.02.03 12:38:56 AM czhower
  30. Move
  31. Rev 1.6 2003.10.24 10:37:38 AM czhower
  32. IdStream
  33. Rev 1.5 2003.10.19 4:38:32 PM czhower
  34. Updates
  35. Rev 1.4 2003.10.19 2:50:40 PM czhower
  36. Fiber cleanup
  37. Rev 1.3 2003.10.14 11:17:02 PM czhower
  38. Updates to match core changes.
  39. Rev 1.2 2003.10.11 5:43:30 PM czhower
  40. Chained servers now functional.
  41. Rev 1.1 2003.09.19 10:09:40 PM czhower
  42. Next stage of fiber support in servers.
  43. Rev 1.0 8/16/2003 11:09:08 AM JPMugaas
  44. Moved from Indy Core dir as part of package reorg
  45. Rev 1.49 2003.07.17 4:42:06 PM czhower
  46. More IOCP improvements.
  47. Rev 1.45 2003.07.14 11:46:46 PM czhower
  48. IOCP now passes all bubbles.
  49. Rev 1.43 2003.07.14 1:10:52 AM czhower
  50. Now passes all bubble tests for chained stack.
  51. Rev 1.41 7/7/2003 1:34:06 PM BGooijen
  52. Added WriteFile(...)
  53. Rev 1.40 7/3/2003 2:03:52 PM BGooijen
  54. IOCP works server-side now
  55. Rev 1.39 2003.06.30 5:41:54 PM czhower
  56. -Fixed AV that occurred sometimes when sockets were closed with chains
  57. -Consolidated code that was marked by a todo for merging as it no longer
  58. needed to be separate
  59. -Removed some older code that was no longer necessary
  60. Passes bubble tests.
  61. Rev 1.38 6/29/2003 10:56:26 PM BGooijen
  62. Removed .Memory from the buffer, and added some extra methods
  63. Rev 1.37 2003.06.25 4:30:02 PM czhower
  64. Temp hack fix for AV problem. Working on real solution now.
  65. Rev 1.36 6/24/2003 11:17:44 PM BGooijen
  66. change in TIdIOHandlerChain.ReadLn, LTermPos= 0 is now handled differently
  67. Rev 1.35 23/6/2003 22:33:18 GGrieve
  68. fix CheckForDataOnSource - specify timeout
  69. Rev 1.34 6/22/2003 11:22:22 PM JPMugaas
  70. Should now compile.
  71. Rev 1.33 6/4/2003 1:08:40 AM BGooijen
  72. Added CheckForDataOnSource and removed some (duplicate) code
  73. Rev 1.32 6/3/2003 8:07:20 PM BGooijen
  74. Added TIdIOHandlerChain.AllData
  75. Rev 1.31 5/11/2003 2:37:58 PM BGooijen
  76. Bindings are updated now
  77. Rev 1.30 5/11/2003 12:00:08 PM BGooijen
  78. Rev 1.29 5/11/2003 12:03:16 AM BGooijen
  79. Rev 1.28 2003.05.09 10:59:24 PM czhower
  80. Rev 1.27 2003.04.22 9:48:50 PM czhower
  81. Rev 1.25 2003.04.17 11:01:14 PM czhower
  82. Rev 1.19 2003.04.10 10:51:04 PM czhower
  83. Rev 1.18 4/2/2003 3:39:26 PM BGooijen
  84. Added Intercepts
  85. Rev 1.17 3/29/2003 5:53:52 PM BGooijen
  86. added AfterAccept
  87. Rev 1.16 3/27/2003 2:57:58 PM BGooijen
  88. Added a RawWrite for streams, implemented WriteStream, changed
  89. WriteToDestination to use TIdWorkOpUnitWriteBuffer
  90. Rev 1.15 2003.03.26 12:20:28 AM czhower
  91. Moved visibility of execute to protected.
  92. Rev 1.14 3/25/2003 11:07:58 PM BGooijen
  93. ChainEngine descends now from TIdBaseComponent
  94. Rev 1.13 3/25/2003 01:33:48 AM JPMugaas
  95. Fixed compiler warnings.
  96. Rev 1.12 3/24/2003 11:03:50 PM BGooijen
  97. Various fixes to readln:
  98. - uses connection default now
  99. - doesn't raise an exception on timeout any more
  100. Rev 1.11 2003.03.13 1:22:58 PM czhower
  101. Typo fixed. lenth --> Length
  102. Rev 1.10 3/13/2003 10:18:20 AM BGooijen
  103. Server side fibers, bug fixes
  104. Rev 1.9 3/2/2003 12:36:22 AM BGooijen
  105. Added woReadBuffer and TIdWorkOpUnitReadBuffer to read a buffer. Now
  106. ReadBuffer doesn't use ReadStream any more.
  107. TIdIOHandlerChain.ReadLn now supports MaxLineLength (splitting, and
  108. exceptions).
  109. woReadLn doesn't check the intire buffer any more, but continued where it
  110. stopped the last time.
  111. Added basic support for timeouts (probably only on read operations, and maybe
  112. connect), accuratie of timeout is currently 500msec.
  113. Rev 1.8 2/28/2003 10:15:16 PM BGooijen
  114. bugfix: changed some occurrences of FRecvBuffer to FInputBuffer
  115. Rev 1.7 2/27/2003 10:11:12 PM BGooijen
  116. Rev 1.6 2/26/2003 1:08:52 PM BGooijen
  117. Rev 1.5 2/25/2003 10:36:28 PM BGooijen
  118. Added more opcodes, methods, and moved opcodes to separate files.
  119. Rev 1.4 2003.02.25 9:02:32 PM czhower
  120. Hand off to Bas
  121. Rev 1.3 2003.02.25 1:36:04 AM czhower
  122. Rev 1.2 2002.12.11 11:00:58 AM czhower
  123. Rev 1.1 2002.12.07 12:26:06 AM czhower
  124. Rev 1.0 11/13/2002 08:45:00 AM JPMugaas
  125. }
  126. unit IdIOHandlerChain;
  127. interface
  128. uses
  129. Classes,
  130. IdBaseComponent, IdBuffer, IdGlobal, IdIOHandler, IdIOHandlerSocket,
  131. IdFiber, IdThreadSafe, IdWorkOpUnit, IdStackConsts, IdWinsock2, IdThread,
  132. IdFiberWeaver, IdStream, IdStreamVCL,
  133. Windows;
  134. type
  135. TIdConnectMode = (cmNonBlock, cmIOCP);
  136. TIdIOHandlerChain = class;
  137. TIdChainEngineThread = class;
  138. TIdChainEngine = class(TIdBaseComponent)
  139. protected
  140. FCompletionPort: THandle;
  141. FThread: TIdChainEngineThread;
  142. //
  143. procedure Execute;
  144. function GetInputBuffer(const AIOHandler: TIdIOHandler): TIdBuffer;
  145. procedure InitComponent; override;
  146. procedure SetIOHandlerOptions(AIOHandler: TIdIOHandlerChain);
  147. procedure Terminating;
  148. public
  149. procedure AddWork(AWorkOpUnit: TIdWorkOpUnit);
  150. procedure BeforeDestruction; override;
  151. destructor Destroy; override;
  152. procedure RemoveSocket(AIOHandler: TIdIOHandlerChain);
  153. procedure SocketAccepted(AIOHandler: TIdIOHandlerChain);
  154. end;
  155. TIdIOHandlerChain = class(TIdIOHandlerSocket)
  156. protected
  157. FChainEngine: TIdChainEngine;
  158. FConnectMode: TIdConnectMode;
  159. FFiber: TIdFiber;
  160. FFiberWeaver: TIdFiberWeaver;
  161. FOverlapped: PIdOverlapped;
  162. //
  163. procedure ConnectClient; override;
  164. procedure QueueAndWait(
  165. AWorkOpUnit: TIdWorkOpUnit;
  166. ATimeout: Integer = IdTimeoutDefault;
  167. AFreeWorkOpUnit: Boolean = True;
  168. AAllowGracefulException: Boolean = True
  169. );
  170. procedure WorkOpUnitCompleted(
  171. AWorkOpUnit: TIdWorkOpUnit
  172. );
  173. public
  174. procedure AfterAccept; override;
  175. function AllData: string; override;
  176. procedure CheckForDataOnSource(
  177. ATimeout : Integer = 0
  178. ); override;
  179. procedure CheckForDisconnect(
  180. ARaiseExceptionIfDisconnected: Boolean = True;
  181. AIgnoreBuffer: Boolean = False
  182. ); override;
  183. constructor Create(
  184. AOwner: TComponent;
  185. AChainEngine: TIdChainEngine;
  186. AFiberWeaver: TIdFiberWeaver;
  187. AFiber: TIdFiber
  188. ); reintroduce; virtual;
  189. destructor Destroy; override;
  190. procedure Open; override;
  191. function ReadFromSource(ARaiseExceptionIfDisconnected: Boolean = True;
  192. ATimeout: Integer = IdTimeoutDefault;
  193. ARaiseExceptionOnTimeout: Boolean = True): Integer; override;
  194. procedure ReadStream(AStream: TIdStreamVCL; AByteCount: Int64;
  195. AReadUntilDisconnect: Boolean); override;
  196. // TODO: Allow ReadBuffer to by pass the internal buffer. Will it really
  197. // help? Only ReadBuffer would be able to use this optimiztion in most
  198. // cases and it is not used by many. Most calls are to stream (disk) based
  199. // or strings as ReadLn.
  200. procedure ReadBytes(var VBuffer: TIdBytes; AByteCount: Integer; AAppend: Boolean = True);
  201. override;
  202. function ReadLn(
  203. ATerminator: string = LF;
  204. ATimeout: Integer = IdTimeoutDefault;
  205. AMaxLineLength: Integer = -1
  206. ): string;
  207. override;
  208. // function WriteFile(
  209. // AFile: string;
  210. // AEnableTransferFile: Boolean
  211. // ): Cardinal; override;
  212. function WriteFile(
  213. const AFile: String;
  214. AEnableTransferFile: Boolean): Int64; override;
  215. { procedure Write(
  216. AStream: TIdStream;
  217. ASize: Integer = 0;
  218. AWriteByteCount: Boolean = False);
  219. override; }
  220. procedure Write(
  221. AStream: TIdStreamVCL;
  222. ASize: Int64 = 0;
  223. AWriteByteCount: Boolean = False
  224. ); override;
  225. procedure WriteDirect(
  226. ABuffer: TIdBytes
  227. ); override;
  228. //
  229. property ConnectMode: TIdConnectMode read FConnectMode write FConnectMode;
  230. property Overlapped: PIdOverlapped read FOverlapped;
  231. end;
  232. TIdChainEngineThread = class(TIdThread)
  233. protected
  234. FChainEngine: TIdChainEngine;
  235. public
  236. constructor Create(
  237. AOwner: TIdChainEngine;
  238. const AName: string
  239. ); reintroduce;
  240. procedure Run; override;
  241. property Terminated;
  242. end;
  243. implementation
  244. uses
  245. IdComponent, IdException, IdExceptionCore, IdStack, IdResourceStrings, IdWorkOpUnits,
  246. IdStackBSDBase, IdStackWindows,
  247. SysUtils;
  248. const
  249. GCompletionKeyTerminate = $F0F0F0F0;
  250. { TIdIOHandlerChain }
  251. procedure TIdIOHandlerChain.CheckForDataOnSource(ATimeout: Integer = 0);
  252. begin
  253. // TODO: Change this so we dont have to rely on an exception trap
  254. try
  255. QueueAndWait(TIdWorkOpUnitReadAvailable.Create, ATimeout, True, False);
  256. except
  257. on E: EIdReadTimeout do begin
  258. // Nothing
  259. end else begin
  260. raise;
  261. end;
  262. end;
  263. end;
  264. procedure TIdIOHandlerChain.ConnectClient;
  265. begin
  266. // TODO: Non blocking does not support Socks
  267. Binding.OverLapped := (ConnectMode = cmIOCP);
  268. inherited;
  269. case ConnectMode of
  270. cmNonBlock: begin
  271. //TODO: Non blocking DNS resolution too?
  272. Binding.SetPeer(GStack.ResolveHost(Host), Port);
  273. GBSDStack.SetBlocking(Binding.Handle, False);
  274. // Does not block
  275. Binding.Connect;
  276. end;
  277. cmIOCP: begin
  278. //TODO: For now we are doing blocking, just to get it to work. fix later
  279. // IOCP was not designed for connects, so we'll have to do some monkeying
  280. // maybe even create an engine thread just to watch for connect events.
  281. //TODO: Resolution too?
  282. Binding.SetPeer(GStack.ResolveHost(Host), Port);
  283. Binding.Connect;
  284. GBSDStack.SetBlocking(Binding.Handle, False);
  285. end;
  286. else begin
  287. raise EIdException.Create('Unrecognized ConnectMode'); {do not localize} // TODO: add a resource string, and create a new Exception class for this
  288. end;
  289. end;
  290. QueueAndWait(TIdWorkOpUnitWaitConnected.Create);
  291. //Update the bindings
  292. Binding.UpdateBindingLocal;
  293. //TODO: Could Peer binding ever be other than what we specified above? Need to reread it?
  294. Binding.UpdateBindingPeer;
  295. end;
  296. procedure TIdIOHandlerChain.AfterAccept;
  297. begin
  298. FChainEngine.SocketAccepted(self);
  299. end;
  300. procedure TIdIOHandlerChain.Open;
  301. begin
  302. // Things before inherited, inherited actually connects and ConnectClient
  303. // needs these things
  304. inherited;
  305. end;
  306. procedure TIdIOHandlerChain.CheckForDisconnect(
  307. ARaiseExceptionIfDisconnected: Boolean; AIgnoreBuffer: Boolean);
  308. var
  309. LDisconnected: Boolean;
  310. begin
  311. // ClosedGracefully // Server disconnected
  312. // IOHandler = nil // Client disconnected
  313. if ClosedGracefully then begin
  314. if BindingAllocated then begin
  315. Close;
  316. // Call event handlers to inform the user program that we were disconnected
  317. // DoStatus(hsDisconnected);
  318. //DoOnDisconnected;
  319. end;
  320. LDisconnected := True;
  321. end else begin
  322. LDisconnected := not BindingAllocated;
  323. end;
  324. if LDisconnected then begin
  325. // Do not raise unless all data has been read by the user
  326. if Assigned(FInputBuffer) then begin
  327. if ((FInputBuffer.Size = 0) or AIgnoreBuffer)
  328. and ARaiseExceptionIfDisconnected then begin
  329. RaiseConnClosedGracefully;
  330. end;
  331. end;
  332. end;
  333. end;
  334. function TIdIOHandlerChain.ReadFromSource(
  335. ARaiseExceptionIfDisconnected: Boolean; ATimeout: Integer;
  336. ARaiseExceptionOnTimeout: Boolean): Integer;
  337. begin
  338. Result := 0;
  339. raise EIdException.Create('Fall through error in ' + ClassName); {do not localize} // TODO: add a resource string, and create a new Exception class for this
  340. end;
  341. procedure TIdIOHandlerChain.ReadStream(AStream: TIdStreamVCL; AByteCount: Int64;
  342. AReadUntilDisconnect: Boolean);
  343. begin
  344. if AReadUntilDisconnect then begin
  345. QueueAndWait(TIdWorkOpUnitReadUntilDisconnect.Create(AStream.VCLStream), -1
  346. , True, False);
  347. end else begin
  348. QueueAndWait(TIdWorkOpUnitReadSizedStream.Create(AStream.VCLStream, AByteCount));
  349. end;
  350. end;
  351. procedure TIdIOHandlerChain.ReadBytes(var VBuffer: TIdBytes;
  352. AByteCount: Integer; AAppend: Boolean = True);
  353. begin
  354. EIdException.IfFalse(AByteCount >= 0);
  355. if AByteCount > 0 then begin
  356. if FInputBuffer.Size < AByteCount then begin
  357. QueueAndWait(TIdWorkOpUnitReadSized.Create(AByteCount- FInputBuffer.Size));
  358. end;
  359. Assert(FInputBuffer.Size >= AByteCount);
  360. FInputBuffer.ExtractToBytes(VBuffer, AByteCount, AAppend);
  361. end;
  362. end;
  363. function TIdIOHandlerChain.ReadLn(ATerminator: string = LF;
  364. ATimeout: Integer = IdTimeoutDefault; AMaxLineLength: Integer = -1): string;
  365. var
  366. LTermPos: Integer;
  367. begin
  368. if AMaxLineLength = -1 then begin
  369. AMaxLineLength := MaxLineLength;
  370. end;
  371. // User may pass '' if they need to pass arguments beyond the first.
  372. if ATerminator = '' then begin
  373. ATerminator := LF;
  374. end;
  375. FReadLnSplit := False;
  376. FReadLnTimedOut := False;
  377. try
  378. LTermPos := FInputBuffer.IndexOf(ATerminator) + 1;
  379. if (LTermPos = 0) and ((AMaxLineLength = 0)
  380. or (FInputBuffer.Size < AMaxLineLength)) then begin
  381. QueueAndWait(TIdWorkOpUnitReadLn.Create(ATerminator, AMaxLineLength)
  382. , ATimeout);
  383. LTermPos := FInputBuffer.IndexOf(ATerminator) + 1;
  384. end;
  385. // LTermPos cannot be 0, and the code below can't handle it properly
  386. Assert(LTermPos > 0);
  387. if (AMaxLineLength <> 0) and (LTermPos > AMaxLineLength) then begin
  388. case FMaxLineAction of
  389. // TODO: find the right exception class here
  390. maException: raise EIdException.Create('MaxLineLength exceded'); {do not localize} // TODO: add a resource string, and create a new Exception class for this
  391. maSplit: Result := FInputBuffer.Extract(AMaxLineLength);
  392. end;
  393. end else begin
  394. Result := FInputBuffer.Extract(LTermPos - 1);
  395. if (ATerminator = LF) and (Copy(Result, Length(Result), 1) = CR) then begin
  396. Delete(Result, Length(Result), 1);
  397. end;
  398. FInputBuffer.Extract(Length(ATerminator));// remove the terminator
  399. end;
  400. except on E: EIdReadTimeout do
  401. FReadLnTimedOut := True;
  402. end;
  403. end;
  404. function TIdIOHandlerChain.AllData: string;
  405. var
  406. LStream: TStringStream;
  407. begin
  408. BeginWork(wmRead); try
  409. Result := '';
  410. LStream := TStringStream.Create(''); try
  411. QueueAndWait(TIdWorkOpUnitReadUntilDisconnect.Create(LStream), -1
  412. , True, False);
  413. Result := LStream.DataString;
  414. finally FreeAndNil(LStream); end;
  415. finally EndWork(wmRead); end;
  416. end;
  417. function TIdIOHandlerChain.WriteFile(
  418. const AFile: String;
  419. AEnableTransferFile: Boolean): Int64;
  420. var
  421. LWO:TIdWorkOpUnitWriteFile;
  422. begin
  423. //BGO: we ignore AEnableTransferFile for now
  424. Result := 0;
  425. // if not Assigned(Intercept) then begin
  426. LWO := TIdWorkOpUnitWriteFile.Create(AFile);
  427. try
  428. QueueAndWait(LWO,IdTimeoutDefault, false);
  429. finally
  430. // Result := LWO.BytesSent;
  431. FreeAndNil(LWO);
  432. end;
  433. // end else begin
  434. // inherited WriteFile(AFile, AEnableTransferFile);
  435. // end;
  436. end;
  437. procedure TIdIOHandlerChain.Write(
  438. AStream: TIdStreamVCL;
  439. ASize: Int64 = 0;
  440. AWriteByteCount: Boolean = False
  441. );
  442. var
  443. LStart: Integer;
  444. LThisSize: Integer;
  445. begin
  446. if ASize < 0 then begin //"-1" All form current position
  447. LStart := AStream.VCLStream.Seek(0, soFromCurrent);
  448. ASize := AStream.VCLStream.Seek(0, soFromEnd) - LStart;
  449. AStream.VCLStream.Seek(LStart, soFromBeginning);
  450. end else if ASize = 0 then begin //"0" ALL
  451. LStart := 0;
  452. ASize := AStream.VCLStream.Seek(0, soFromEnd);
  453. AStream.VCLStream.Seek(0, soFromBeginning);
  454. end else begin //else ">0" ACount bytes
  455. LStart := AStream.VCLStream.Seek(0, soFromCurrent);
  456. end;
  457. if AWriteByteCount then begin
  458. Write(ASize);
  459. end;
  460. // BeginWork(wmWrite, ASize);
  461. try
  462. while ASize > 0 do begin
  463. LThisSize := Min(128 * 1024, ASize); // 128K blocks
  464. QueueAndWait(TIdWorkOpUnitWriteStream.Create(AStream.VCLStream, LStart, LThisSize
  465. , False));
  466. Dec(ASize, LThisSize);
  467. Inc(LStart, LThisSize);
  468. end;
  469. finally
  470. // EndWork(wmWrite);
  471. end;
  472. end;
  473. procedure TIdIOHandlerChain.WriteDirect(
  474. ABuffer: TIdBytes
  475. );
  476. begin
  477. QueueAndWait(TIdWorkOpUnitWriteBuffer.Create(@ABuffer[0], Length(ABuffer), False));
  478. end;
  479. procedure TIdIOHandlerChain.QueueAndWait(
  480. AWorkOpUnit: TIdWorkOpUnit;
  481. ATimeout: Integer = IdTimeoutDefault;
  482. AFreeWorkOpUnit: Boolean = True;
  483. AAllowGracefulException: Boolean = True
  484. );
  485. var
  486. LWorkOpUnit: TIdWorkOpUnit;
  487. begin
  488. try
  489. CheckForDisconnect(AAllowGracefulException);
  490. LWorkOpUnit := AWorkOpUnit;
  491. //
  492. if ATimeout = IdTimeoutInfinite then begin
  493. LWorkOpUnit.TimeOutAt := 0;
  494. end else begin
  495. if ATimeout = IdTimeoutDefault then begin
  496. if FReadTimeout <= 0 then begin
  497. LWorkOpUnit.TimeOutAt := 0;
  498. end else begin
  499. //we type cast FReadTimeOut as a cardinal to prevent the compiler from
  500. //expanding vars to an Int64 type. That can incur a performance penalty.
  501. LWorkOpUnit.TimeOutAt := GetTickCount + Cardinal(FReadTimeout);
  502. end
  503. end else begin
  504. //FReadTimeOut is typecase as a cardinal to prevent the compiler from
  505. //expanding vars to an Int64 type which can incur a performance penalty.
  506. LWorkOpUnit.TimeOutAt := GetTickCount + Cardinal(ATimeout);
  507. end
  508. end;
  509. //
  510. LWorkOpUnit.Fiber := FFiber;
  511. LWorkOpUnit.IOHandler := Self;
  512. LWorkOpUnit.OnCompleted := WorkOpUnitCompleted;
  513. LWorkOpUnit.SocketHandle := Binding.Handle;
  514. // Add to queue and wait to be rescheduled when work is completed
  515. FChainEngine.AddWork(LWorkOpUnit);
  516. // Check to see if we need to reraise an exception
  517. LWorkOpUnit.RaiseException;
  518. // Check for timeout
  519. if LWorkOpUnit.TimedOut then begin
  520. raise EIdReadTimeout.Create('Timed out'); {do not localize}
  521. end;
  522. // Check to see if it was closed during this operation
  523. CheckForDisconnect(AAllowGracefulException);
  524. finally
  525. if AFreeWorkOpUnit then begin
  526. AWorkOpUnit.Free;
  527. end;
  528. end;
  529. end;
  530. constructor TIdIOHandlerChain.Create(
  531. AOwner: TComponent;
  532. AChainEngine: TIdChainEngine;
  533. AFiberWeaver: TIdFiberWeaver;
  534. AFiber: TIdFiber
  535. );
  536. begin
  537. inherited Create(AOwner);
  538. //
  539. EIdException.IfNotAssigned(AChainEngine, 'No chain engine specified.'); {do not localize}
  540. FChainEngine := AChainEngine;
  541. FChainEngine.SetIOHandlerOptions(Self);
  542. //
  543. EIdException.IfNotAssigned(AFiberWeaver, 'No fiber weaver specified.'); {do not localize}
  544. FFiberWeaver := AFiberWeaver;
  545. //
  546. EIdException.IfNotAssigned(AFiber, 'No fiber specified.'); {do not localize}
  547. FFiber := AFiber;
  548. // Initialize Overlapped structure
  549. New(FOverlapped);
  550. ZeroMemory(FOverlapped, SizeOf(TIdOverLapped));
  551. New(FOverlapped.Buffer);
  552. end;
  553. procedure TIdIOHandlerChain.WorkOpUnitCompleted(AWorkOpUnit: TIdWorkOpUnit);
  554. begin
  555. FFiberWeaver.Add(AWorkOpUnit.Fiber);
  556. end;
  557. destructor TIdIOHandlerChain.Destroy;
  558. begin
  559. // Tell the chain engine that we are closing and to remove any references to
  560. // us and cease any usage.
  561. // Do not do this in close, it can cause deadlocks because the engine can
  562. // call close while in its Execute.
  563. FChainEngine.RemoveSocket(Self);
  564. Dispose(FOverlapped.Buffer);
  565. Dispose(FOverlapped);
  566. inherited;
  567. end;
  568. { TIdChainEngine }
  569. procedure TIdChainEngine.BeforeDestruction;
  570. begin
  571. if FThread <> nil then begin
  572. // Signal thread for termination
  573. FThread.Terminate;
  574. // Tell the engine we are attempting termination
  575. Terminating;
  576. // Wait for the thread to terminate
  577. FThread.WaitFor;
  578. // Free thread
  579. FreeAndNil(FThread);
  580. end;
  581. inherited;
  582. end;
  583. function TIdChainEngine.GetInputBuffer(const AIOHandler:TIdIOHandler):TidBuffer;
  584. begin
  585. Result := TIdIOHandlerChain(AIOHandler).FInputBuffer;
  586. end;
  587. procedure TIdChainEngine.SetIOHandlerOptions(AIOHandler: TIdIOHandlerChain);
  588. begin
  589. AIOHandler.ConnectMode := cmIOCP;
  590. end;
  591. procedure TIdChainEngine.SocketAccepted(AIOHandler: TIdIOHandlerChain);
  592. begin
  593. // Associate the socket with the completion port.
  594. if CreateIoCompletionPort(AIOHandler.Binding.Handle, FCompletionPort, 0, 0)
  595. = 0 then begin
  596. RaiseLastOSError;
  597. end;
  598. end;
  599. procedure TIdChainEngine.Terminating;
  600. begin
  601. if not PostQueuedCompletionStatus(FCompletionPort, 0, GCompletionKeyTerminate
  602. , nil) then begin
  603. RaiseLastOSError;
  604. end;
  605. end;
  606. procedure TIdChainEngine.Execute;
  607. var
  608. LBytesTransferred: DWord;
  609. LCompletionKey: DWord;
  610. LOverlapped: PIdOverlapped;
  611. begin
  612. // Wait forever on the completion port. If we are terminating, a terminate
  613. // signal is sent into the queue.
  614. if GetQueuedCompletionStatus(FCompletionPort, LBytesTransferred
  615. , LCompletionKey, POverLapped(LOverlapped), INFINITE) then begin
  616. if LCompletionKey <> GCompletionKeyTerminate then begin
  617. // Socket has been closed
  618. if LBytesTransferred = 0 then begin
  619. LOverlapped.WorkOpUnit.IOHandler.CloseGracefully;
  620. end;
  621. LOverlapped.WorkOpUnit.Process(LOverlapped, LBytesTransferred);
  622. end;
  623. end;
  624. end;
  625. procedure TIdChainEngine.RemoveSocket(AIOHandler: TIdIOHandlerChain);
  626. begin
  627. // raise EIdException.Create('Fall through error in ' + Self.ClassName+'.RemoveSocket'); // TODO: add a resource string, and create a new Exception class for this
  628. end;
  629. procedure TIdChainEngine.AddWork(AWorkOpUnit: TIdWorkOpUnit);
  630. begin
  631. if AWorkOpUnit is TIdWorkOpUnitWaitConnected then begin
  632. // Associate the socket with the completion port.
  633. if CreateIOCompletionPort(AWorkOpUnit.SocketHandle, FCompletionPort, 0, 0)
  634. = 0 then begin
  635. RaiseLastOSError;
  636. end;
  637. AWorkOpUnit.Complete;
  638. end;
  639. AWorkOpUnit.Start;
  640. end;
  641. destructor TIdChainEngine.Destroy;
  642. begin
  643. if CloseHandle(FCompletionPort) = False then begin
  644. RaiseLastOSError;
  645. end;
  646. inherited;
  647. end;
  648. procedure TIdChainEngine.InitComponent;
  649. begin
  650. {
  651. var SysInfo: TSystemInfo;
  652. GetSystemInfo(SysInfo);
  653. SysInfo.dwNumberOfProcessors
  654. Use GetSystemInfo instead. It will return the all info on the local
  655. system's architecture and will also return a valid ActiveProcessorMask
  656. which is a DWORD to be read as a bit array of the processor on the
  657. system...
  658. CZH> And next
  659. CZH> question - any one know off hand how to set affinity? :)
  660. Use the SetProcessAffinityMask or SetThreadAffinityMask API depending
  661. on wether you want to act on the whole process or just a single
  662. thread (SetThreadIdealProcessor is another way to do it: it just gives
  663. the scheduler a hint about where to run a thread without forcing it:
  664. good for keeping two threads doing IO one with each other on the same
  665. processor).
  666. }
  667. inherited;
  668. if not (csDesigning in ComponentState) then begin
  669. // Cant use .Name, its not initialized yet in Create
  670. FThread := TIdChainEngineThread.Create(Self, 'Chain Engine'); {do not localize}
  671. end;
  672. //MS says destruction is automatic, but Google seems to say that this initial
  673. //one is not auto managed as MS says, and that CloseHandle should be called.
  674. FCompletionPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
  675. if FCompletionPort = 0 then begin
  676. RaiseLastOSError;
  677. end;
  678. end;
  679. { TIdChainEngineThread }
  680. constructor TIdChainEngineThread.Create(
  681. AOwner: TIdChainEngine;
  682. const AName: string
  683. );
  684. begin
  685. FChainEngine := AOwner;
  686. inherited Create(False, True, AName);
  687. end;
  688. (*procedure TIdChainEngineIOCP.TransmitFileIOCP(const AWorkOpUnit:TIdWorkOpUnitWriteFile;const AFilename:string);
  689. var
  690. LPOverlapped: PIdOverlapped;
  691. LHFile:THandle;
  692. begin
  693. New(LPOverlapped);
  694. ZeroMemory(LPOverlapped,sizeof(TIdOverLapped));
  695. New(LPOverlapped^.Buffer);
  696. LPOverlapped^.IOhandler:=TIdIOHandlerChain(AWorkOpUnit.IOhandler);
  697. LPOverlapped^.WorkOpUnit:=AWorkOpUnit;
  698. LHFile:=CreateFile(pchar(AFilename),GENERIC_READ,FILE_SHARE_READ,nil,OPEN_EXISTING,FILE_FLAG_SEQUENTIAL_SCAN,0);
  699. if LHFile=INVALID_HANDLE_VALUE then begin
  700. RaiseLastOSError;
  701. end;
  702. try
  703. if ServiceQueryTransmitFile(AWorkOpUnit.IOHandler.Binding.Handle,LHFile,0,0,POverlapped(LPOverlapped),nil,0) then begin
  704. AWorkOpUnit.Fiber.Relinquish;
  705. end else begin
  706. raise EIdException.Create('error in ServiceQueryTransmitFile'); // TODO: add a resource string, and create a new Exception class for this
  707. end;
  708. finally
  709. CloseHandle(LHFile);
  710. end;
  711. end;
  712. *)
  713. (*procedure TIdChainEngineIOCP.TransmitFileAsStream(const AWorkOpUnit:TIdWorkOpUnitWriteFile;const AFilename:string);
  714. procedure CopyWorkUnit(ASrc,ADst: TIdWorkOpUnit);
  715. begin
  716. ADst.IOHandler := ASrc.IOHandler;
  717. ADst.Fiber := ASrc.Fiber;
  718. ADst.OnCompleted := ASrc.OnCompleted;
  719. ADst.SocketHandle:= ASrc.SocketHandle;
  720. end;
  721. var
  722. LStream:TfileStream;
  723. LWorkOpUnit : TIdWorkOpUnitWriteStream;
  724. LBuf:pointer;
  725. LBufLen:integer;
  726. begin
  727. Assert(False, 'to do');
  728. LStream := TFileStream.Create(AFilename,fmOpenRead or fmShareDenyWrite);
  729. try
  730. LWorkOpUnit := TIdWorkOpUnitWriteStream.Create(LStream,0,LStream.size,false);
  731. try
  732. CopyWorkUnit(AWorkOpUnit,LWorkOpUnit);
  733. LBufLen:=Min(LStream.size,128*1024);
  734. getmem(LBuf,LBufLen);
  735. LWorkOpUnit.Stream.Position:=LWorkOpUnit.StartPos;
  736. LWorkOpUnit.Stream.Read(LBuf^,LBufLen);
  737. IssueWriteBuffer(LWorkOpUnit,LBuf,LBufLen);
  738. finally
  739. AWorkOpUnit.BytesSent := LStream.Size;
  740. LWorkOpUnit.free;
  741. end;
  742. finally
  743. LStream.free;
  744. end;
  745. end;
  746. *)
  747. procedure TIdChainEngineThread.Run;
  748. begin
  749. FChainEngine.Execute;
  750. end;
  751. end.