IdIOHandlerChain.pas 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865
  1. {
  2. $Project$
  3. $Workfile$
  4. $Revision$
  5. $DateUTC$
  6. $Id$
  7. This file is part of the Indy (Internet Direct) project, and is offered
  8. under the dual-licensing agreement described on the Indy website.
  9. (http://www.indyproject.org/)
  10. Copyright:
  11. (c) 1993-2005, Chad Z. Hower and the Indy Pit Crew. All rights reserved.
  12. }
  13. {
  14. $Log$
  15. }
  16. {
  17. Rev 1.6 9/16/2004 8:11:40 PM JPMugaas
  18. Should compile again.
  19. Rev 1.5 6/11/2004 8:39:58 AM DSiders
  20. Added "Do not Localize" comments.
  21. Rev 1.4 2004.05.06 1:47:26 PM czhower
  22. Now uses IndexOf
  23. Rev 1.3 2004.04.13 10:37:56 PM czhower
  24. Updates
  25. Rev 1.2 2004.03.07 11:46:08 AM czhower
  26. Flushbuffer fix + other minor ones found
  27. Rev 1.1 2004.02.09 9:16:44 PM czhower
  28. Updated to compile and match lib changes.
  29. Rev 1.0 2004.02.03 12:38:56 AM czhower
  30. Move
  31. Rev 1.6 2003.10.24 10:37:38 AM czhower
  32. IdStream
  33. Rev 1.5 2003.10.19 4:38:32 PM czhower
  34. Updates
  35. Rev 1.4 2003.10.19 2:50:40 PM czhower
  36. Fiber cleanup
  37. Rev 1.3 2003.10.14 11:17:02 PM czhower
  38. Updates to match core changes.
  39. Rev 1.2 2003.10.11 5:43:30 PM czhower
  40. Chained servers now functional.
  41. Rev 1.1 2003.09.19 10:09:40 PM czhower
  42. Next stage of fiber support in servers.
  43. Rev 1.0 8/16/2003 11:09:08 AM JPMugaas
  44. Moved from Indy Core dir as part of package reorg
  45. Rev 1.49 2003.07.17 4:42:06 PM czhower
  46. More IOCP improvements.
  47. Rev 1.45 2003.07.14 11:46:46 PM czhower
  48. IOCP now passes all bubbles.
  49. Rev 1.43 2003.07.14 1:10:52 AM czhower
  50. Now passes all bubble tests for chained stack.
  51. Rev 1.41 7/7/2003 1:34:06 PM BGooijen
  52. Added WriteFile(...)
  53. Rev 1.40 7/3/2003 2:03:52 PM BGooijen
  54. IOCP works server-side now
  55. Rev 1.39 2003.06.30 5:41:54 PM czhower
  56. -Fixed AV that occurred sometimes when sockets were closed with chains
  57. -Consolidated code that was marked by a todo for merging as it no longer
  58. needed to be separate
  59. -Removed some older code that was no longer necessary
  60. Passes bubble tests.
  61. Rev 1.38 6/29/2003 10:56:26 PM BGooijen
  62. Removed .Memory from the buffer, and added some extra methods
  63. Rev 1.37 2003.06.25 4:30:02 PM czhower
  64. Temp hack fix for AV problem. Working on real solution now.
  65. Rev 1.36 6/24/2003 11:17:44 PM BGooijen
  66. change in TIdIOHandlerChain.ReadLn, LTermPos= 0 is now handled differently
  67. Rev 1.35 23/6/2003 22:33:18 GGrieve
  68. fix CheckForDataOnSource - specify timeout
  69. Rev 1.34 6/22/2003 11:22:22 PM JPMugaas
  70. Should now compile.
  71. Rev 1.33 6/4/2003 1:08:40 AM BGooijen
  72. Added CheckForDataOnSource and removed some (duplicate) code
  73. Rev 1.32 6/3/2003 8:07:20 PM BGooijen
  74. Added TIdIOHandlerChain.AllData
  75. Rev 1.31 5/11/2003 2:37:58 PM BGooijen
  76. Bindings are updated now
  77. Rev 1.30 5/11/2003 12:00:08 PM BGooijen
  78. Rev 1.29 5/11/2003 12:03:16 AM BGooijen
  79. Rev 1.28 2003.05.09 10:59:24 PM czhower
  80. Rev 1.27 2003.04.22 9:48:50 PM czhower
  81. Rev 1.25 2003.04.17 11:01:14 PM czhower
  82. Rev 1.19 2003.04.10 10:51:04 PM czhower
  83. Rev 1.18 4/2/2003 3:39:26 PM BGooijen
  84. Added Intercepts
  85. Rev 1.17 3/29/2003 5:53:52 PM BGooijen
  86. added AfterAccept
  87. Rev 1.16 3/27/2003 2:57:58 PM BGooijen
  88. Added a RawWrite for streams, implemented WriteStream, changed
  89. WriteToDestination to use TIdWorkOpUnitWriteBuffer
  90. Rev 1.15 2003.03.26 12:20:28 AM czhower
  91. Moved visibility of execute to protected.
  92. Rev 1.14 3/25/2003 11:07:58 PM BGooijen
  93. ChainEngine descends now from TIdBaseComponent
  94. Rev 1.13 3/25/2003 01:33:48 AM JPMugaas
  95. Fixed compiler warnings.
  96. Rev 1.12 3/24/2003 11:03:50 PM BGooijen
  97. Various fixes to readln:
  98. - uses connection default now
  99. - doesn't raise an exception on timeout any more
  100. Rev 1.11 2003.03.13 1:22:58 PM czhower
  101. Typo fixed. lenth --> Length
  102. Rev 1.10 3/13/2003 10:18:20 AM BGooijen
  103. Server side fibers, bug fixes
  104. Rev 1.9 3/2/2003 12:36:22 AM BGooijen
  105. Added woReadBuffer and TIdWorkOpUnitReadBuffer to read a buffer. Now
  106. ReadBuffer doesn't use ReadStream any more.
  107. TIdIOHandlerChain.ReadLn now supports MaxLineLength (splitting, and
  108. exceptions).
  109. woReadLn doesn't check the entire buffer any more, but continues where it
  110. stopped the last time.
  111. Added basic support for timeouts (probably only on read operations, and maybe
  112. connect); the accuracy of the timeout is currently 500 msec.
  113. Rev 1.8 2/28/2003 10:15:16 PM BGooijen
  114. bugfix: changed some occurrences of FRecvBuffer to FInputBuffer
  115. Rev 1.7 2/27/2003 10:11:12 PM BGooijen
  116. Rev 1.6 2/26/2003 1:08:52 PM BGooijen
  117. Rev 1.5 2/25/2003 10:36:28 PM BGooijen
  118. Added more opcodes, methods, and moved opcodes to separate files.
  119. Rev 1.4 2003.02.25 9:02:32 PM czhower
  120. Hand off to Bas
  121. Rev 1.3 2003.02.25 1:36:04 AM czhower
  122. Rev 1.2 2002.12.11 11:00:58 AM czhower
  123. Rev 1.1 2002.12.07 12:26:06 AM czhower
  124. Rev 1.0 11/13/2002 08:45:00 AM JPMugaas
  125. }
  126. unit IdIOHandlerChain;
  127. interface
  128. uses
  129. Classes,
  130. IdBaseComponent, IdBuffer, IdGlobal, IdIOHandler, IdIOHandlerSocket,
  131. IdFiber, IdThreadSafe, IdWorkOpUnit, IdStackConsts, IdWinsock2, IdThread,
  132. IdFiberWeaver,
  133. Windows;
  type
    // Selects how TIdIOHandlerChain.ConnectClient establishes the connection:
    //   cmNonBlock - the socket is switched to non-blocking before Connect.
    //   cmIOCP     - Connect is issued first, then the socket is switched to
    //                non-blocking; the binding is flagged as overlapped.
    TIdConnectMode = (cmNonBlock, cmIOCP);

    // Forward declarations: the engine, the IOHandler and the engine thread
    // reference each other in their declarations below.
    TIdIOHandlerChain = class;
    TIdChainEngineThread = class;
  // Owns the I/O completion port and the worker thread that services it.
  // IOHandlers hand their work units to the engine through AddWork.
  TIdChainEngine = class(TIdBaseComponent)
  protected
    // Handle returned by CreateIoCompletionPort in the constructor.
    FCompletionPort: THandle;
    // Thread whose Run method calls Execute; only created at run time.
    FThread: TIdChainEngineThread;
    //
    // Waits for one completion packet and dispatches it to its work unit.
    procedure Execute;
    // Returns the input buffer of a chain IOHandler.
    function GetInputBuffer(const AIOHandler: TIdIOHandler): TIdBuffer;
    // Applies engine specific settings to a newly created IOHandler.
    procedure SetIOHandlerOptions(AIOHandler: TIdIOHandlerChain);
    // Posts the terminate key into the completion port.
    procedure Terminating;
  public
    constructor Create(AOwner: TComponent); override;
    destructor Destroy; override;
    // Starts a work unit (and associates its socket for connect units).
    procedure AddWork(AWorkOpUnit: TIdWorkOpUnit);
    // Stops and frees the engine thread before the destructor chain runs.
    procedure BeforeDestruction; override;
    // Called by an IOHandler from its destructor.
    procedure RemoveSocket(AIOHandler: TIdIOHandlerChain);
    // Associates an accepted socket with the completion port.
    procedure SocketAccepted(AIOHandler: TIdIOHandlerChain);
  end;
  // Socket IOHandler whose reads and writes are expressed as work units that
  // are queued on a TIdChainEngine; the calling fiber is rescheduled through
  // the fiber weaver once a unit completes.
  TIdIOHandlerChain = class(TIdIOHandlerSocket)
  protected
    FChainEngine: TIdChainEngine;
    FConnectMode: TIdConnectMode;
    FFiber: TIdFiber;
    FFiberWeaver: TIdFiberWeaver;
    // Allocated in Create, released in Destroy.
    FOverlapped: PIdOverlapped;
    //
    procedure ConnectClient; override;
    // Queues AWorkOpUnit on the engine, then re-raises any stored exception,
    // raises EIdReadTimeout on timeout and checks for disconnection.
    procedure QueueAndWait(AWorkOpUnit: TIdWorkOpUnit;
      ATimeout: Integer = IdTimeoutDefault;
      AFreeWorkOpUnit: Boolean = True;
      AAllowGracefulException: Boolean = True);
    // OnCompleted handler installed on every queued work unit.
    procedure WorkOpUnitCompleted(AWorkOpUnit: TIdWorkOpUnit);
  public
    procedure AfterAccept; override;
    function AllData: string; override;
    procedure CheckForDataOnSource(ATimeout : Integer = 0); override;
    procedure CheckForDisconnect(ARaiseExceptionIfDisconnected: Boolean = True;
      AIgnoreBuffer: Boolean = False); override;
    constructor Create(AOwner: TComponent; AChainEngine: TIdChainEngine;
      AFiberWeaver: TIdFiberWeaver; AFiber: TIdFiber); reintroduce; virtual;
    destructor Destroy; override;
    procedure Open; override;
    function ReadFromSource(ARaiseExceptionIfDisconnected: Boolean = True;
      ATimeout: Integer = IdTimeoutDefault;
      ARaiseExceptionOnTimeout: Boolean = True): Integer; override;
    procedure ReadStream(AStream: TStream; AByteCount: Int64;
      AReadUntilDisconnect: Boolean); override;
    // TODO: Allow ReadBuffer to bypass the internal buffer. Will it really
    // help? Only ReadBuffer would be able to use this optimization in most
    // cases and it is not used by many. Most calls are to stream (disk) based
    // or strings as ReadLn.
    procedure ReadBytes(var VBuffer: TIdBytes; AByteCount: Integer;
      AAppend: Boolean = True); override;
    function ReadLn(ATerminator: string = LF;
      ATimeout: Integer = IdTimeoutDefault;
      AMaxLineLength: Integer = -1): string; override;
    function WriteFile(const AFile: String;
      AEnableTransferFile: Boolean): Int64; override;
    procedure Write(AStream: TStream; ASize: Int64 = 0;
      AWriteByteCount: Boolean = False); override;
    procedure WriteDirect(ABuffer: TIdBytes); override;
    //
    property ConnectMode: TIdConnectMode read FConnectMode write FConnectMode;
    property Overlapped: PIdOverlapped read FOverlapped;
  end;
  // Thread that drives a TIdChainEngine: each Run call forwards to the
  // engine's Execute method.
  TIdChainEngineThread = class(TIdThread)
  protected
    // Engine serviced by this thread; assigned in Create.
    FChainEngine: TIdChainEngine;
  public
    constructor Create(AOwner: TIdChainEngine; const AName: string); reintroduce;
    procedure Run; override;
    // Raises the visibility of the inherited Terminated property.
    property Terminated;
  end;
  243. implementation
  244. uses
  245. IdComponent, IdException, IdExceptionCore, IdStack, IdResourceStrings, IdWorkOpUnits,
  246. IdStackWindows,
  247. SysUtils;
  const
    // Completion key posted by TIdChainEngine.Terminating; TIdChainEngine.Execute
    // skips packet processing when it dequeues a packet carrying this key.
    GCompletionKeyTerminate = $F0F0F0F0;
  250. { TIdIOHandlerChain }
  // Queues a "read whatever is available" work unit and waits up to ATimeout
  // for it. A read timeout is not an error here and is silently absorbed.
  procedure TIdIOHandlerChain.CheckForDataOnSource(ATimeout: Integer = 0);
  begin
    // TODO: Change this so we dont have to rely on an exception trap
    try
      QueueAndWait(TIdWorkOpUnitReadAvailable.Create, ATimeout, True, False);
    except
      // Only the timeout is handled; any other exception class is not matched
      // by this handler and therefore propagates to the caller unchanged.
      on EIdReadTimeout do begin
        // Nothing
      end;
    end;
  end;
  // Connects the binding according to ConnectMode, waits for the connection
  // work unit to complete and then refreshes the local and peer binding info.
  // Raises EIdException for a ConnectMode value that is not handled.
  procedure TIdIOHandlerChain.ConnectClient;
  begin
    // TODO: Non blocking does not support Socks
    Binding.OverLapped := ConnectMode = cmIOCP;
    inherited;
    if ConnectMode = cmNonBlock then begin
      //TODO: Non blocking DNS resolution too?
      Binding.SetPeer(GWindowsStack.ResolveHost(Host), Port);
      // Switch to non-blocking first so that Connect itself does not block
      GWindowsStack.SetBlocking(Binding.Handle, False);
      Binding.Connect;
    end else if ConnectMode = cmIOCP then begin
      //TODO: For now we are doing blocking, just to get it to work. fix later
      // IOCP was not designed for connects, so we'll have to do some monkeying
      // maybe even create an engine thread just to watch for connect events.
      //TODO: Resolution too?
      Binding.SetPeer(GStack.ResolveHost(Host), Port);
      Binding.Connect;
      // Only after the blocking connect is the socket made non-blocking
      GWindowsStack.SetBlocking(Binding.Handle, False);
    end else begin
      raise EIdException.Create('Unrecognized ConnectMode'); {do not localize} // TODO: add a resource string, and create a new Exception class for this
    end;
    QueueAndWait(TIdWorkOpUnitWaitConnected.Create);
    // Update the bindings
    Binding.UpdateBindingLocal;
    //TODO: Could Peer binding ever be other than what we specified above? Need to reread it?
    Binding.UpdateBindingPeer;
  end;
  // Hands this freshly accepted IOHandler to the chain engine, which
  // associates its socket with the completion port.
  procedure TIdIOHandlerChain.AfterAccept;
  begin
    FChainEngine.SocketAccepted(Self);
  end;
  // Currently a pure pass-through to the inherited Open.
  procedure TIdIOHandlerChain.Open;
  begin
    // Anything ConnectClient depends on has to be prepared before this call:
    // the inherited Open is what actually connects.
    inherited;
  end;
  // Detects a closed connection and optionally raises the "closed gracefully"
  // exception.
  //   ARaiseExceptionIfDisconnected - raise when disconnected (subject to the
  //                                   buffer rule below).
  //   AIgnoreBuffer                 - raise even if unread data is buffered.
  procedure TIdIOHandlerChain.CheckForDisconnect(
    ARaiseExceptionIfDisconnected: Boolean; AIgnoreBuffer: Boolean);
  var
    LGone: Boolean;
  begin
    // ClosedGracefully // Server disconnected
    // IOHandler = nil // Client disconnected
    LGone := ClosedGracefully;
    if LGone then begin
      // Peer closed: release our side as well if it is still allocated
      if BindingAllocated then begin
        Close;
      end;
    end else begin
      LGone := not BindingAllocated;
    end;
    if not LGone then begin
      Exit;
    end;
    // Without an input buffer nothing is raised at all
    if not Assigned(FInputBuffer) then begin
      Exit;
    end;
    // Do not raise unless all data has been read by the user
    if ((FInputBuffer.Size = 0) or AIgnoreBuffer)
      and ARaiseExceptionIfDisconnected then begin
      RaiseConnClosedGracefully;
    end;
  end;
  // This IOHandler reads through queued work units, never through
  // ReadFromSource. Any call that ends up here is treated as a programming
  // error and always raises EIdException.
  function TIdIOHandlerChain.ReadFromSource(
    ARaiseExceptionIfDisconnected: Boolean; ATimeout: Integer;
    ARaiseExceptionOnTimeout: Boolean): Integer;
  begin
    // Assigned only so that the function result is defined
    Result := 0;
    raise EIdException.CreateFmt('Fall through error in %s.ReadFromSource', [ClassName]); {do not localize} // TODO: add a resource string, and create a new Exception class for this
  end;
  // Reads into AStream, either exactly AByteCount bytes or, when
  // AReadUntilDisconnect is set, everything until the connection closes.
  procedure TIdIOHandlerChain.ReadStream(AStream: TStream; AByteCount: Int64;
    AReadUntilDisconnect: Boolean);
  begin
    if not AReadUntilDisconnect then begin
      QueueAndWait(TIdWorkOpUnitReadSizedStream.Create(AStream, AByteCount));
      Exit;
    end;
    // A disconnect is the expected end of this read, so the graceful-close
    // exception is suppressed (last argument False).
    QueueAndWait(TIdWorkOpUnitReadUntilDisconnect.Create(AStream), -1, True,
      False);
  end;
  // Extracts AByteCount bytes from the input buffer into VBuffer, first
  // queueing a sized read for whatever the buffer is still missing.
  // AAppend is passed through to the buffer's ExtractToBytes.
  procedure TIdIOHandlerChain.ReadBytes(var VBuffer: TIdBytes;
    AByteCount: Integer; AAppend: Boolean = True);
  begin
    EIdException.IfFalse(AByteCount >= 0);
    if not (AByteCount > 0) then begin
      Exit;
    end;
    if AByteCount > FInputBuffer.Size then begin
      // Ask only for the part that is not buffered yet
      QueueAndWait(TIdWorkOpUnitReadSized.Create(AByteCount - FInputBuffer.Size));
    end;
    Assert(FInputBuffer.Size >= AByteCount);
    FInputBuffer.ExtractToBytes(VBuffer, AByteCount, AAppend);
  end;
  // Reads one line terminated by ATerminator from the connection.
  //   ATerminator    - line terminator; '' is treated as LF.
  //   ATimeout       - passed to QueueAndWait when more data is needed.
  //   AMaxLineLength - -1 uses the MaxLineLength property, 0 means no limit.
  // Returns the line without its terminator (and without a trailing CR when
  // the terminator is LF). On timeout the result is '' and FReadLnTimedOut is
  // set. When the line exceeds the limit, FMaxLineAction decides between
  // raising EIdException and returning the first AMaxLineLength characters
  // with FReadLnSplit set.
  function TIdIOHandlerChain.ReadLn(ATerminator: string = LF;
    ATimeout: Integer = IdTimeoutDefault; AMaxLineLength: Integer = -1): string;
  var
    LTermPos: Integer;
    LOverLength: Boolean;
  begin
    // String results are not cleared by the compiler; without this a timeout
    // would hand back whatever the result variable happened to contain.
    Result := '';
    if AMaxLineLength = -1 then begin
      AMaxLineLength := MaxLineLength;
    end;
    // User may pass '' if they need to pass arguments beyond the first.
    if ATerminator = '' then begin
      ATerminator := LF;
    end;
    FReadLnSplit := False;
    FReadLnTimedOut := False;
    try
      LTermPos := FInputBuffer.IndexOf(ATerminator) + 1;
      if (LTermPos = 0) and ((AMaxLineLength = 0)
        or (FInputBuffer.Size < AMaxLineLength)) then begin
        QueueAndWait(TIdWorkOpUnitReadLn.Create(ATerminator, AMaxLineLength)
          , ATimeout);
        LTermPos := FInputBuffer.IndexOf(ATerminator) + 1;
      end;
      // The line is over length when the terminator lies beyond the limit, or
      // when no terminator was found although the buffer already holds at
      // least AMaxLineLength bytes.
      LOverLength := (AMaxLineLength <> 0)
        and ((LTermPos > AMaxLineLength)
          or ((LTermPos = 0) and (FInputBuffer.Size >= AMaxLineLength)));
      if LOverLength then begin
        case FMaxLineAction of
          // TODO: find the right exception class here
          maException: raise EIdException.Create('MaxLineLength exceded'); {do not localize} // TODO: add a resource string, and create a new Exception class for this
          maSplit: begin
            // Tell the caller that only part of the line was returned
            FReadLnSplit := True;
            Result := FInputBuffer.Extract(AMaxLineLength);
          end;
        end;
      end else begin
        // LTermPos cannot be 0 here, and the code below can't handle it
        Assert(LTermPos > 0);
        Result := FInputBuffer.Extract(LTermPos - 1);
        if (ATerminator = LF) and TextEndsWith(Result, CR) then begin
          SetLength(Result, Length(Result) - 1);
        end;
        FInputBuffer.Extract(Length(ATerminator)); // remove the terminator
      end;
    except
      on EIdReadTimeout do begin
        FReadLnTimedOut := True;
      end;
    end;
  end;
  // Reads until the peer disconnects and returns everything received as one
  // string. The read is bracketed by BeginWork/EndWork(wmRead).
  function TIdIOHandlerChain.AllData: string;
  var
    LSink: TStringStream;
  begin
    BeginWork(wmRead);
    try
      Result := '';
      LSink := TStringStream.Create('');
      try
        // Disconnection ends the read normally, so the graceful-close
        // exception is suppressed (last argument False).
        QueueAndWait(TIdWorkOpUnitReadUntilDisconnect.Create(LSink), -1, True,
          False);
        Result := LSink.DataString;
      finally
        LSink.Free;
      end;
    finally
      EndWork(wmRead);
    end;
  end;
  // Sends the file named AFile through a write-file work unit.
  // AEnableTransferFile is currently ignored and the result is always 0
  // (the BytesSent read-back is disabled).
  function TIdIOHandlerChain.WriteFile(
    const AFile: String;
    AEnableTransferFile: Boolean): Int64;
  var
    LFileOp: TIdWorkOpUnitWriteFile;
  begin
    //BGO: we ignore AEnableTransferFile for now
    Result := 0;
    LFileOp := TIdWorkOpUnitWriteFile.Create(AFile);
    try
      // AFreeWorkOpUnit is False: the unit is owned and released here
      QueueAndWait(LFileOp, IdTimeoutDefault, False);
    finally
      // Result := LFileOp.BytesSent;
      LFileOp.Free;
    end;
  end;
  // Sends stream content in blocks of at most 128K.
  //   ASize > 0 : send ASize bytes starting at the current stream position.
  //   ASize = 0 : send the whole stream from its beginning.
  //   ASize < 0 : send everything from the current position to the end.
  //   AWriteByteCount : write the resolved byte count before the data.
  procedure TIdIOHandlerChain.Write(
    AStream: TStream;
    ASize: Int64 = 0;
    AWriteByteCount: Boolean = False
    );
  var
    LOffset: Integer;
    LChunk: Integer;
  begin
    if ASize > 0 then begin
      // ">0" ACount bytes from where the stream currently stands
      LOffset := AStream.Seek(0, soFromCurrent);
    end else if ASize = 0 then begin
      // "0" ALL: rewind and take the complete stream
      LOffset := 0;
      ASize := AStream.Seek(0, soFromEnd);
      AStream.Seek(0, soFromBeginning);
    end else begin
      // "-1" all from the current position; the position is restored after
      // seeking to the end to measure the remainder
      LOffset := AStream.Seek(0, soFromCurrent);
      ASize := AStream.Seek(0, soFromEnd) - LOffset;
      AStream.Seek(LOffset, soFromBeginning);
    end;
    if AWriteByteCount then begin
      Write(ASize);
    end;
    // BeginWork(wmWrite, ASize) / EndWork(wmWrite) are currently disabled
    while ASize > 0 do begin
      LChunk := Min(128 * 1024, ASize); // 128K blocks
      QueueAndWait(TIdWorkOpUnitWriteStream.Create(AStream, LOffset, LChunk
        , False));
      Dec(ASize, LChunk);
      Inc(LOffset, LChunk);
    end;
  end;
  // Queues the raw bytes of ABuffer as a single write-buffer work unit and
  // waits for it to finish.
  procedure TIdIOHandlerChain.WriteDirect(ABuffer: TIdBytes);
  begin
    QueueAndWait(
      TIdWorkOpUnitWriteBuffer.Create(PByte(ABuffer), Length(ABuffer), False)
    );
  end;
  // Prepares AWorkOpUnit, hands it to the chain engine and evaluates the
  // outcome once AddWork returns.
  //   ATimeout                - IdTimeoutInfinite, IdTimeoutDefault (use
  //                             FReadTimeout) or an explicit value that is
  //                             added to GetTickCount.
  //   AFreeWorkOpUnit         - free the unit on exit, also on exceptions.
  //   AAllowGracefulException - forwarded to CheckForDisconnect.
  // Raises whatever the unit re-raises, EIdReadTimeout when the unit reports
  // a timeout, and whatever CheckForDisconnect raises.
  procedure TIdIOHandlerChain.QueueAndWait(
    AWorkOpUnit: TIdWorkOpUnit;
    ATimeout: Integer = IdTimeoutDefault;
    AFreeWorkOpUnit: Boolean = True;
    AAllowGracefulException: Boolean = True
    );
  begin
    try
      CheckForDisconnect(AAllowGracefulException);
      // Work out the deadline. TimeOutAt = 0 is used for "infinite" and for a
      // non-positive default read timeout (presumably meaning "no deadline").
      // The Cardinal casts keep the addition from being widened to Int64,
      // which can incur a performance penalty.
      if ATimeout = IdTimeoutInfinite then begin
        AWorkOpUnit.TimeOutAt := 0;
      end else if ATimeout <> IdTimeoutDefault then begin
        AWorkOpUnit.TimeOutAt := GetTickCount + Cardinal(ATimeout);
      end else if FReadTimeout > 0 then begin
        AWorkOpUnit.TimeOutAt := GetTickCount + Cardinal(FReadTimeout);
      end else begin
        AWorkOpUnit.TimeOutAt := 0;
      end;
      //
      AWorkOpUnit.Fiber := FFiber;
      AWorkOpUnit.IOHandler := Self;
      AWorkOpUnit.OnCompleted := WorkOpUnitCompleted;
      AWorkOpUnit.SocketHandle := Binding.Handle;
      // Add to queue and wait to be rescheduled when work is completed
      FChainEngine.AddWork(AWorkOpUnit);
      // Check to see if we need to reraise an exception
      AWorkOpUnit.RaiseException;
      // Check for timeout
      if AWorkOpUnit.TimedOut then begin
        raise EIdReadTimeout.Create('Timed out'); {do not localize}
      end;
      // Check to see if it was closed during this operation
      CheckForDisconnect(AAllowGracefulException);
    finally
      if AFreeWorkOpUnit then begin
        AWorkOpUnit.Free;
      end;
    end;
  end;
  // Creates an IOHandler bound to the given engine, fiber weaver and fiber.
  // All three are mandatory; each is validated with EIdException.IfNotAssigned
  // before it is stored. The overlapped record and its buffer are allocated
  // last.
  constructor TIdIOHandlerChain.Create(
    AOwner: TComponent;
    AChainEngine: TIdChainEngine;
    AFiberWeaver: TIdFiberWeaver;
    AFiber: TIdFiber
    );
  begin
    inherited Create(AOwner);
    // Engine: store it and let it apply its options to this handler
    EIdException.IfNotAssigned(AChainEngine, 'No chain engine specified.'); {do not localize}
    FChainEngine := AChainEngine;
    FChainEngine.SetIOHandlerOptions(Self);
    // Fiber weaver
    EIdException.IfNotAssigned(AFiberWeaver, 'No fiber weaver specified.'); {do not localize}
    FFiberWeaver := AFiberWeaver;
    // Fiber
    EIdException.IfNotAssigned(AFiber, 'No fiber specified.'); {do not localize}
    FFiber := AFiber;
    // Overlapped structure: allocate, clear, then allocate its buffer
    New(FOverlapped);
    FillChar(FOverlapped^, SizeOf(TIdOverLapped), 0);
    New(FOverlapped.Buffer);
  end;
  // Completion callback for queued work units: passes the fiber attached to
  // the finished unit to the fiber weaver.
  procedure TIdIOHandlerChain.WorkOpUnitCompleted(AWorkOpUnit: TIdWorkOpUnit);
  begin
    FFiberWeaver.Add(AWorkOpUnit.Fiber);
  end;
  // Detaches from the chain engine and releases the overlapped record.
  // The destructor also runs for partially constructed instances (Delphi
  // calls Destroy when the constructor raises, e.g. on a missing engine,
  // weaver or fiber), so every field is checked before it is used.
  destructor TIdIOHandlerChain.Destroy;
  begin
    // Tell the chain engine that we are closing and to remove any references to
    // us and cease any usage.
    // Do not do this in close, it can cause deadlocks because the engine can
    // call close while in its Execute.
    if Assigned(FChainEngine) then begin
      FChainEngine.RemoveSocket(Self);
    end;
    if Assigned(FOverlapped) then begin
      // The record is zeroed right after allocation, so Buffer is nil if its
      // own allocation never happened.
      if Assigned(FOverlapped.Buffer) then begin
        Dispose(FOverlapped.Buffer);
      end;
      Dispose(FOverlapped);
      FOverlapped := nil;
    end;
    inherited;
  end;
  573. { TIdChainEngine }
  // Creates the I/O completion port and, at run time only, the engine thread
  // that services it. Raises the last OS error if the port cannot be created.
  constructor TIdChainEngine.Create(AOwner: TComponent);
  begin
    {
    Notes kept from the original author discussion (nothing here is
    implemented yet):
    - GetSystemInfo(SysInfo) returns SysInfo.dwNumberOfProcessors and a valid
      ActiveProcessorMask (a DWORD read as a bit array of the processors).
    - Affinity can be set with SetProcessAffinityMask or
      SetThreadAffinityMask, depending on whether the whole process or a
      single thread is targeted. SetThreadIdealProcessor only gives the
      scheduler a hint without forcing it, which is good for keeping two
      threads that do IO with each other on the same processor.
    }
    inherited Create(AOwner);
    // The port must exist before the engine thread is created: the thread's
    // Run calls Execute, which waits on FCompletionPort. TIdChainEngineThread
    // passes False as the first argument of the inherited thread constructor
    // (presumably "create suspended"), so it may start running at once.
    //MS says destruction is automatic, but Google seems to say that this initial
    //one is not auto managed as MS says, and that CloseHandle should be called.
    FCompletionPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
    if FCompletionPort = 0 then begin
      RaiseLastOSError;
    end;
    if not IsDesignTime then begin
      // Cant use .Name, its not initialized yet in Create
      FThread := TIdChainEngineThread.Create(Self, 'Chain Engine'); {do not localize}
    end;
  end;
  // Closes the completion port and runs the inherited destructor.
  // A failing CloseHandle still raises the last OS error, but the inherited
  // destructor is now executed in every case.
  destructor TIdChainEngine.Destroy;
  begin
    try
      // FCompletionPort is 0 when the constructor failed before or while
      // creating the port; there is nothing to close in that case.
      if FCompletionPort <> 0 then begin
        if not CloseHandle(FCompletionPort) then begin
          RaiseLastOSError;
        end;
      end;
    finally
      inherited;
    end;
  end;
  // Shuts the engine thread down before destruction proceeds. The thread is
  // flagged for termination first; the terminate packet posted afterwards
  // releases it from its wait on the completion port.
  procedure TIdChainEngine.BeforeDestruction;
  begin
    if Assigned(FThread) then begin
      FThread.Terminate;   // signal thread for termination
      Terminating;         // tell the engine we are attempting termination
      FThread.WaitFor;     // wait for the thread to terminate
      FreeAndNil(FThread); // free thread
    end;
    inherited;
  end;
  // Returns the FInputBuffer of AIOHandler. The handler is hard cast to
  // TIdIOHandlerChain without a type check.
  function TIdChainEngine.GetInputBuffer(const AIOHandler: TIdIOHandler): TIdBuffer;
  var
    LChainHandler: TIdIOHandlerChain;
  begin
    LChainHandler := TIdIOHandlerChain(AIOHandler);
    Result := LChainHandler.FInputBuffer;
  end;
  // Called from the IOHandler constructor: this engine always selects the
  // IOCP connect mode.
  procedure TIdChainEngine.SetIOHandlerOptions(AIOHandler: TIdIOHandlerChain);
  begin
    AIOHandler.ConnectMode := cmIOCP;
  end;
  // Associates the socket of an accepted IOHandler with the completion port.
  // Raises the last OS error when the association fails.
  procedure TIdChainEngine.SocketAccepted(AIOHandler: TIdIOHandlerChain);
  var
    LPort: THandle;
  begin
    LPort := CreateIoCompletionPort(AIOHandler.Binding.Handle, FCompletionPort,
      0, 0);
    if LPort = 0 then begin
      RaiseLastOSError;
    end;
  end;
  // Posts a packet carrying GCompletionKeyTerminate into the completion port
  // so that a wait in Execute returns. Raises the last OS error on failure.
  procedure TIdChainEngine.Terminating;
  var
    LPosted: Boolean;
  begin
    LPosted := PostQueuedCompletionStatus(FCompletionPort, 0,
      GCompletionKeyTerminate, nil);
    if not LPosted then begin
      RaiseLastOSError;
    end;
  end;
  // Handles a single completion packet: waits without a time limit on the
  // completion port and passes the packet to the work unit referenced by its
  // overlapped record. A failed wait or the terminate key ends the call
  // without processing anything.
  procedure TIdChainEngine.Execute;
  var
    LByteCount: DWord;
    LKey: DWord;
    LPacket: PIdOverlapped;
  begin
    // Wait forever on the completion port. If we are terminating, a terminate
    // signal is sent into the queue.
    if not GetQueuedCompletionStatus(FCompletionPort, LByteCount
      , LKey, POverLapped(LPacket), INFINITE) then begin
      Exit;
    end;
    if LKey = GCompletionKeyTerminate then begin
      Exit;
    end;
    // Zero bytes transferred: socket has been closed
    if LByteCount = 0 then begin
      LPacket.WorkOpUnit.IOHandler.CloseGracefully;
    end;
    // The work unit is processed in either case
    LPacket.WorkOpUnit.Process(LPacket, LByteCount);
  end;
  // Invoked from the IOHandler destructor. Intentionally does nothing at the
  // moment; the former fall-through exception is disabled.
  procedure TIdChainEngine.RemoveSocket(AIOHandler: TIdIOHandlerChain);
  begin
    // raise EIdException.CreateFmt('Fall through error in %s.RemoveSocket', [ClassName]);
    // TODO: add a resource string, and create a new Exception class for this
  end;
  // Starts a work unit. A wait-for-connect unit additionally gets its socket
  // associated with the completion port and is completed right away; Start
  // is called for every unit, connect units included.
  procedure TIdChainEngine.AddWork(AWorkOpUnit: TIdWorkOpUnit);
  var
    LIsConnectUnit: Boolean;
  begin
    LIsConnectUnit := AWorkOpUnit is TIdWorkOpUnitWaitConnected;
    if LIsConnectUnit then begin
      // Associate the socket with the completion port.
      if CreateIOCompletionPort(AWorkOpUnit.SocketHandle, FCompletionPort, 0, 0)
        = 0 then begin
        RaiseLastOSError;
      end;
      AWorkOpUnit.Complete;
    end;
    AWorkOpUnit.Start;
  end;
  685. { TIdChainEngineThread }
  // Creates the thread that services AOwner. The engine reference is stored
  // before the inherited constructor runs, so it is already in place when the
  // thread object is set up.
  constructor TIdChainEngineThread.Create(AOwner: TIdChainEngine;
    const AName: string);
  begin
    FChainEngine := AOwner;
    // NOTE(review): the two Boolean arguments are presumably "create
    // suspended" and "loop" - confirm against TIdThread.Create.
    inherited Create(False, True, AName);
  end;
  694. (*procedure TIdChainEngineIOCP.TransmitFileIOCP(const AWorkOpUnit:TIdWorkOpUnitWriteFile;const AFilename:string);
  695. var
  696. LPOverlapped: PIdOverlapped;
  697. LHFile:THandle;
  698. begin
  699. New(LPOverlapped);
  700. ZeroMemory(LPOverlapped,sizeof(TIdOverLapped));
  701. New(LPOverlapped^.Buffer);
  702. LPOverlapped^.IOhandler:=TIdIOHandlerChain(AWorkOpUnit.IOhandler);
  703. LPOverlapped^.WorkOpUnit:=AWorkOpUnit;
  704. LHFile:=CreateFile(pchar(AFilename),GENERIC_READ,FILE_SHARE_READ,nil,OPEN_EXISTING,FILE_FLAG_SEQUENTIAL_SCAN,0);
  705. if LHFile=INVALID_HANDLE_VALUE then begin
  706. RaiseLastOSError;
  707. end;
  708. try
  709. if ServiceQueryTransmitFile(AWorkOpUnit.IOHandler.Binding.Handle,LHFile,0,0,POverlapped(LPOverlapped),nil,0) then begin
  710. AWorkOpUnit.Fiber.Relinquish;
  711. end else begin
  712. raise EIdException.Create('error in ServiceQueryTransmitFile'); // TODO: add a resource string, and create a new Exception class for this
  713. end;
  714. finally
  715. CloseHandle(LHFile);
  716. end;
  717. end;
  718. *)
  719. (*procedure TIdChainEngineIOCP.TransmitFileAsStream(const AWorkOpUnit:TIdWorkOpUnitWriteFile;const AFilename:string);
  720. procedure CopyWorkUnit(ASrc,ADst: TIdWorkOpUnit);
  721. begin
  722. ADst.IOHandler := ASrc.IOHandler;
  723. ADst.Fiber := ASrc.Fiber;
  724. ADst.OnCompleted := ASrc.OnCompleted;
  725. ADst.SocketHandle:= ASrc.SocketHandle;
  726. end;
  727. var
  728. LStream:TfileStream;
  729. LWorkOpUnit : TIdWorkOpUnitWriteStream;
  730. LBuf:pointer;
  731. LBufLen:integer;
  732. begin
  733. Assert(False, 'to do');
  734. LStream := TFileStream.Create(AFilename,fmOpenRead or fmShareDenyWrite);
  735. try
  736. LWorkOpUnit := TIdWorkOpUnitWriteStream.Create(LStream,0,LStream.size,false);
  737. try
  738. CopyWorkUnit(AWorkOpUnit,LWorkOpUnit);
  739. LBufLen:=Min(LStream.size,128*1024);
  740. getmem(LBuf,LBufLen);
  741. LWorkOpUnit.Stream.Position:=LWorkOpUnit.StartPos;
  742. LWorkOpUnit.Stream.Read(LBuf^,LBufLen);
  743. IssueWriteBuffer(LWorkOpUnit,LBuf,LBufLen);
  744. finally
  745. AWorkOpUnit.BytesSent := LStream.Size;
  746. LWorkOpUnit.free;
  747. end;
  748. finally
  749. LStream.free;
  750. end;
  751. end;
  752. *)
  // Thread body: each call services the engine once via its Execute method.
  procedure TIdChainEngineThread.Run;
  begin
    FChainEngine.Execute;
  end;
  757. end.