fpasync.pp 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856
  1. {
  2. fpAsync: Asynchronous event management for Free Pascal
  3. Copyright (C) 2001-2005 by
  4. Areca Systems GmbH / Sebastian Guenther, [email protected]
  5. See the file COPYING.FPC, included in this distribution,
  6. for details about the copyright.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  10. }
  11. unit fpAsync;
  12. {$MODE objfpc}
  13. {$H+}
  14. interface
  15. uses SysUtils, Classes, libasync;
  type

    { Method-pointer type used for all "Notify" style callbacks of this unit }
    TNotifyEvent = procedure(Sender: TObject) of object;

    { Exception carrying a libasync result code; raised by this unit whenever
      a libasync call returns something other than asyncOK }
    EAsyncError = class(Exception)
    private
      FErrorCode: TAsyncResult;
    public
      constructor Create(AErrorCode: TAsyncResult);
      { Result code that caused the exception }
      property ErrorCode: TAsyncResult read FErrorCode;
    end;
  25. TEventLoop = class
  26. private
  27. FData: TAsyncData;
  28. FFirstNotifyData: Pointer;
  29. function GetIsRunning: Boolean;
  30. procedure SetIsRunning(AIsRunning: Boolean);
  31. protected
  32. procedure CheckResult(AResultCode: TAsyncResult);
  33. public
  34. constructor Create;
  35. destructor Destroy; override;
  36. function Handle: TAsyncHandle;
  37. // Main loop control
  38. procedure Run;
  39. procedure Break;
  40. // Timer support
  41. function AddTimerCallback(AMSec: LongInt; APeriodic: Boolean;
  42. ACallback: TAsyncCallback; AUserData: Pointer): TAsyncTimer;
  43. procedure RemoveTimerCallback(ATimer: TAsyncTimer);
  44. function AddTimerNotify(AMSec: LongInt; APeriodic: Boolean;
  45. ANotify: TNotifyEvent; ASender: TObject): Pointer;
  46. procedure RemoveTimerNotify(AHandle: Pointer);
  47. // I/O notification support (for files, sockets etc.)
  48. procedure SetIOCallback(AHandle: Integer; ACallback: TAsyncCallback;
  49. AUserData: Pointer);
  50. procedure ClearIOCallback(AHandle: Integer);
  51. function SetIONotify(AHandle: Integer; ANotify: TNotifyEvent;
  52. ASender: TObject): Pointer;
  53. procedure ClearIONotify(AHandle: Pointer);
  54. procedure SetDataAvailableCallback(AHandle: Integer;
  55. ACallback: TAsyncCallback; AUserData: Pointer);
  56. procedure ClearDataAvailableCallback(AHandle: Integer);
  57. function SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent;
  58. ASender: TObject): Pointer;
  59. procedure ClearDataAvailableNotify(AHandle: Pointer);
  60. procedure SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback;
  61. AUserData: Pointer);
  62. procedure ClearCanWriteCallback(AHandle: Integer);
  63. function SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent;
  64. ASender: TObject): Pointer;
  65. procedure ClearCanWriteNotify(AHandle: Pointer);
  66. class function TimerTicks: Int64;
  67. // Properties
  68. property IsRunning: Boolean read GetIsRunning write SetIsRunning;
  69. end;
  70. // -------------------------------------------------------------------
  71. // Asynchronous line reader
  72. // -------------------------------------------------------------------
  73. TLineNotify = procedure(const ALine: String) of object;
  74. TGenericLineReader = class
  75. protected
  76. RealBuffer, FBuffer: PChar;
  77. FBytesInBuffer: Integer;
  78. FOnLine: TLineNotify;
  79. InCallback, DoStopAndFree: Boolean;
  80. function Read(var ABuffer; count: Integer): Integer; virtual; abstract;
  81. procedure NoData; virtual; abstract;
  82. public
  83. destructor Destroy; override;
  84. procedure Run; // Process as many lines as possible
  85. property Buffer: PChar read FBuffer;
  86. property BytesInBuffer: Integer read FBytesInBuffer;
  87. property OnLine: TLineNotify read FOnLine write FOnLine;
  88. end;
  89. TAsyncStreamLineReader = class(TGenericLineReader)
  90. protected
  91. FEventLoop: TEventLoop;
  92. FDataStream: TStream;
  93. FBlockingStream: THandleStream;
  94. FOnEOF: TNotifyEvent;
  95. NotifyHandle: Pointer;
  96. function Read(var ABuffer; count: Integer): Integer; override;
  97. procedure NoData; override;
  98. procedure StreamDataAvailable(UserData: TObject);
  99. public
  100. constructor Create(AEventLoop: TEventLoop; AStream: THandleStream);
  101. constructor Create(AEventLoop: TEventLoop; ADataStream: TStream;
  102. ABlockingStream: THandleStream);
  103. destructor Destroy; override;
  104. procedure StopAndFree; // Destroy instance after run
  105. property EventLoop: TEventLoop read FEventLoop;
  106. property DataStream: TStream read FDataStream;
  107. property BlockingStream: THandleStream read FBlockingStream;
  108. property OnEOF: TNotifyEvent read FOnEOF write FOnEOF;
  109. end;
  110. // -------------------------------------------------------------------
  111. // Asynchronous write buffers
  112. // -------------------------------------------------------------------
  113. TWriteBuffer = class(TStream)
  114. protected
  115. FBuffer: PChar;
  116. FBytesInBuffer: Integer;
  117. FBufferSent: Boolean;
  118. FOnBufferEmpty: TNotifyEvent;
  119. FOnBufferSent: TNotifyEvent;
  120. InCallback: Boolean;
  121. function Seek(Offset: LongInt; Origin: Word): LongInt; override;
  122. function Write(const ABuffer; Count: LongInt): LongInt; override;
  123. function DoRealWrite(const ABuffer; Count: Integer): Integer; virtual; abstract;
  124. procedure WritingFailed; virtual; abstract;
  125. procedure WantWrite; virtual; abstract;
  126. procedure BufferEmpty; virtual;
  127. public
  128. EndOfLineMarker: String;
  129. constructor Create;
  130. destructor Destroy; override;
  131. procedure WriteLine(const line: String);
  132. procedure Run; // Write as many data as possible
  133. property BytesInBuffer: Integer read FBytesInBuffer;
  134. property BufferSent: Boolean read FBufferSent;
  135. property OnBufferEmpty: TNotifyEvent read FOnBufferEmpty write FOnBufferEmpty;
  136. property OnBufferSent: TNotifyEvent read FOnBufferSent write FOnBufferSent;
  137. end;
  138. TAsyncWriteStream = class(TWriteBuffer)
  139. protected
  140. FEventLoop: TEventLoop;
  141. FDataStream: TStream;
  142. FBlockingStream: THandleStream;
  143. NotifyHandle: Pointer;
  144. DoStopAndFree: Boolean;
  145. function DoRealWrite(const ABuffer; Count: Integer): Integer; override;
  146. procedure WritingFailed; override;
  147. procedure WantWrite; override;
  148. procedure CanWrite(UserData: TObject);
  149. public
  150. constructor Create(AEventLoop: TEventLoop; AStream: THandleStream);
  151. constructor Create(AEventLoop: TEventLoop;
  152. ADataStream: TStream; ABlockingStream: THandleStream);
  153. destructor Destroy; override;
  154. procedure StopAndFree; // Destroy instance after run
  155. property EventLoop: TEventLoop read FEventLoop;
  156. property DataStream: TStream read FDataStream;
  157. property BlockingStream: THandleStream read FBlockingStream;
  158. end;
  var
    { Optional debugging tap: when assigned, TWriteBuffer.Write copies every
      block of data it receives to this stream as well. }
    fpAsyncWriteBufferDebugStream: TStream;
  163. implementation
  type
    PNotifyData = ^TNotifyData;

    { Bookkeeping record for one "Notify" registration of a TEventLoop.  The
      records form a singly linked list via Next.  The variant part stores
      either the timer handle or the file handle of the registration. }
    TNotifyData = record
      Next: PNotifyData;
      Notify: TNotifyEvent;
      Sender: TObject;
      case Boolean of
        False: (TimerHandle: TAsyncTimer);
        True:  (FileHandle: LongInt);
    end;
  { cdecl callback handed to libasync for all "Notify" registrations.
    Data is the PNotifyData record of the registration; its Notify method is
    invoked with the stored Sender. }
  procedure EventHandler(Data: Pointer); cdecl;
  var
    Info: PNotifyData;
  begin
    Info := PNotifyData(Data);
    Info^.Notify(Info^.Sender);
  end;
  { Allocates a new notify record and pushes it onto the front of Obj's
    notify list.  Only the Next field is initialised here; the caller fills
    in the rest. }
  function AddNotifyData(Obj: TEventLoop): PNotifyData;
  var
    Node: PNotifyData;
  begin
    New(Node);
    Node^.Next := PNotifyData(Obj.FFirstNotifyData);
    Obj.FFirstNotifyData := Node;
    Result := Node;
  end;
  { Unlinks Data from Obj's notify list (if it is in there) and disposes it.
    A nil Data is ignored.  The search stops at the first match instead of
    walking the rest of the list with a predecessor pointer that refers to
    the node just removed. }
  procedure FreeNotifyData(Obj: TEventLoop; Data: PNotifyData);
  var
    Cur, Prev: PNotifyData;
  begin
    if not Assigned(Data) then
      Exit;

    // Locate Data and remember its predecessor
    Prev := nil;
    Cur := PNotifyData(Obj.FFirstNotifyData);
    while Assigned(Cur) and (Cur <> Data) do
    begin
      Prev := Cur;
      Cur := Cur^.Next;
    end;

    if Assigned(Cur) then
    begin
      if Assigned(Prev) then
        Prev^.Next := Cur^.Next
      else
        Obj.FFirstNotifyData := Cur^.Next;   // Data was the list head
    end;

    // As before, the record is released even if it was not found in the list
    Dispose(Data);
  end;
  { Builds the exception message from the numeric value of AErrorCode and
    stores the code for later retrieval through ErrorCode. }
  constructor EAsyncError.Create(AErrorCode: TAsyncResult);
  var
    Msg: String;
  begin
    Msg := Format('Async I/O error %d', [Ord(AErrorCode)]);
    inherited Create(Msg);
    FErrorCode := AErrorCode;
  end;
  { Initialises the libasync state belonging to this instance. }
  constructor TEventLoop.Create;
  begin
    asyncInit(Self.Handle);
  end;
  { Releases the libasync state, disposes every notify record that is still
    registered and finally chains to the inherited destructor (which the
    previous version never called). }
  destructor TEventLoop.Destroy;
  var
    Node, Following: PNotifyData;
  begin
    asyncFree(Handle);

    // Detach the list first so the head never points at disposed memory
    Node := PNotifyData(FFirstNotifyData);
    FFirstNotifyData := nil;
    while Assigned(Node) do
    begin
      Following := Node^.Next;
      Dispose(Node);
      Node := Following;
    end;

    inherited Destroy;
  end;
  { The handle given to libasync is simply this instance, cast to
    TAsyncHandle. }
  function TEventLoop.Handle: TAsyncHandle;
  begin
    Handle := TAsyncHandle(Self);
  end;
  { Enters the libasync main loop (asyncRun) for this event loop. }
  procedure TEventLoop.Run;
  begin
    asyncRun(Self.Handle);
  end;
  { Asks libasync to leave the main loop (asyncBreak). }
  procedure TEventLoop.Break;
  begin
    asyncBreak(Self.Handle);
  end;
  { Registers a raw timer callback with libasync and returns the timer
    handle produced by asyncAddTimer. }
  function TEventLoop.AddTimerCallback(AMSec: LongInt; APeriodic: Boolean;
    ACallback: TAsyncCallback; AUserData: Pointer): TAsyncTimer;
  var
    Timer: TAsyncTimer;
  begin
    Timer := asyncAddTimer(Handle, AMSec, APeriodic, ACallback, AUserData);
    Result := Timer;
  end;
  { Removes a timer previously created with AddTimerCallback. }
  procedure TEventLoop.RemoveTimerCallback(ATimer: TAsyncTimer);
  begin
    asyncRemoveTimer(Self.Handle, ATimer);
  end;
  { Registers a timer that calls ANotify(ASender) through EventHandler.
    Returns the notify record as an opaque pointer; pass it to
    RemoveTimerNotify to cancel the timer. }
  function TEventLoop.AddTimerNotify(AMSec: LongInt; APeriodic: Boolean;
    ANotify: TNotifyEvent; ASender: TObject): Pointer;
  var
    Entry: PNotifyData;
    Timer: TAsyncTimer;
  begin
    Entry := AddNotifyData(Self);
    Entry^.Notify := ANotify;
    Entry^.Sender := ASender;
    Timer := asyncAddTimer(Handle, AMSec, APeriodic, @EventHandler, Entry);
    Entry^.TimerHandle := Timer;
    Result := Entry;
  end;
  { Cancels a timer created by AddTimerNotify and releases its notify
    record.  AHandle is the pointer returned by AddTimerNotify. }
  procedure TEventLoop.RemoveTimerNotify(AHandle: Pointer);
  var
    Entry: PNotifyData;
  begin
    Entry := PNotifyData(AHandle);
    asyncRemoveTimer(Handle, Entry^.TimerHandle);
    FreeNotifyData(Self, Entry);
  end;
  { Installs a raw I/O callback for file handle AHandle; raises EAsyncError
    (via CheckResult) if libasync reports a failure. }
  procedure TEventLoop.SetIOCallback(AHandle: Integer; ACallback: TAsyncCallback;
    AUserData: Pointer);
  var
    Code: TAsyncResult;
  begin
    Code := asyncSetIOCallback(Handle, AHandle, ACallback, AUserData);
    CheckResult(Code);
  end;
  { Removes the I/O callback registered for file handle AHandle. }
  procedure TEventLoop.ClearIOCallback(AHandle: Integer);
  begin
    asyncClearIOCallback(Self.Handle, AHandle);
  end;
  { Installs an I/O notification for file handle AHandle that calls
    ANotify(ASender).  Returns the notify record as an opaque pointer for
    ClearIONotify.  On failure the record is released again and EAsyncError
    is raised. }
  function TEventLoop.SetIONotify(AHandle: Integer; ANotify: TNotifyEvent;
    ASender: TObject): Pointer;
  var
    Entry: PNotifyData;
    Code: TAsyncResult;
  begin
    Entry := AddNotifyData(Self);
    Entry^.Notify := ANotify;
    Entry^.Sender := ASender;
    Entry^.FileHandle := AHandle;

    Code := asyncSetIOCallback(Handle, AHandle, @EventHandler, Entry);
    if Code <> asyncOK then
    begin
      // Registration failed: drop the record before reporting the error
      FreeNotifyData(Self, Entry);
      raise EAsyncError.Create(Code);
    end;

    Result := Entry;
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetIONotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
  end;
  { Removes an I/O notification installed by SetIONotify and releases its
    notify record.  AHandle is the pointer returned by SetIONotify. }
  procedure TEventLoop.ClearIONotify(AHandle: Pointer);
  var
    Entry: PNotifyData;
  begin
    Entry := PNotifyData(AHandle);
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearIONotify: Filehandle=', Entry^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
    asyncClearIOCallback(Handle, Entry^.FileHandle);
    FreeNotifyData(Self, Entry);
  end;
  { Installs a raw "data available" callback for file handle AHandle; raises
    EAsyncError (via CheckResult) if libasync reports a failure. }
  procedure TEventLoop.SetDataAvailableCallback(AHandle: Integer; ACallback: TAsyncCallback;
    AUserData: Pointer);
  var
    Code: TAsyncResult;
  begin
    Code := asyncSetDataAvailableCallback(Handle, AHandle, ACallback, AUserData);
    CheckResult(Code);
  end;
  { Removes the "data available" callback registered for AHandle. }
  procedure TEventLoop.ClearDataAvailableCallback(AHandle: Integer);
  begin
    asyncClearDataAvailableCallback(Self.Handle, AHandle);
  end;
  { Installs a "data available" notification for file handle AHandle that
    calls ANotify(ASender).  Returns the notify record as an opaque pointer
    for ClearDataAvailableNotify.  On failure the record is released again
    and EAsyncError is raised. }
  function TEventLoop.SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent;
    ASender: TObject): Pointer;
  var
    Entry: PNotifyData;
    Code: TAsyncResult;
  begin
    Entry := AddNotifyData(Self);
    Entry^.Notify := ANotify;
    Entry^.Sender := ASender;
    Entry^.FileHandle := AHandle;

    Code := asyncSetDataAvailableCallback(Handle, AHandle, @EventHandler, Entry);
    if Code <> asyncOK then
    begin
      // Registration failed: drop the record before reporting the error
      FreeNotifyData(Self, Entry);
      raise EAsyncError.Create(Code);
    end;

    Result := Entry;
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetDataAvailableNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
  end;
  { Removes a notification installed by SetDataAvailableNotify and releases
    its notify record.  AHandle is the pointer returned by that call. }
  procedure TEventLoop.ClearDataAvailableNotify(AHandle: Pointer);
  var
    Entry: PNotifyData;
  begin
    Entry := PNotifyData(AHandle);
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearDataAvailableNotify: Filehandle=', Entry^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
    asyncClearDataAvailableCallback(Handle, Entry^.FileHandle);
    FreeNotifyData(Self, Entry);
  end;
  { Installs a raw "can write" callback for file handle AHandle; raises
    EAsyncError (via CheckResult) if libasync reports a failure. }
  procedure TEventLoop.SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback;
    AUserData: Pointer);
  var
    Code: TAsyncResult;
  begin
    Code := asyncSetCanWriteCallback(Handle, AHandle, ACallback, AUserData);
    CheckResult(Code);
  end;
  { Removes the "can write" callback registered for AHandle. }
  procedure TEventLoop.ClearCanWriteCallback(AHandle: Integer);
  begin
    asyncClearCanWriteCallback(Self.Handle, AHandle);
  end;
  { Installs a "can write" notification for file handle AHandle that calls
    ANotify(ASender).  Returns the notify record as an opaque pointer for
    ClearCanWriteNotify.  On failure the record is released again and
    EAsyncError is raised. }
  function TEventLoop.SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent;
    ASender: TObject): Pointer;
  var
    Entry: PNotifyData;
    Code: TAsyncResult;
  begin
    Entry := AddNotifyData(Self);
    Entry^.Notify := ANotify;
    Entry^.Sender := ASender;
    Entry^.FileHandle := AHandle;

    Code := asyncSetCanWriteCallback(Handle, AHandle, @EventHandler, Entry);
    if Code <> asyncOK then
    begin
      // Registration failed: drop the record before reporting the error
      FreeNotifyData(Self, Entry);
      raise EAsyncError.Create(Code);
    end;

    Result := Entry;
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetCanWriteNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
  end;
  { Removes a notification installed by SetCanWriteNotify and releases its
    notify record.  AHandle is the pointer returned by that call. }
  procedure TEventLoop.ClearCanWriteNotify(AHandle: Pointer);
  var
    Entry: PNotifyData;
  begin
    Entry := PNotifyData(AHandle);
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearCanWriteNotify: Filehandle=', Entry^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
    asyncClearCanWriteCallback(Handle, Entry^.FileHandle);
    FreeNotifyData(Self, Entry);
  end;
  { Returns the value of libasync's asyncGetTicks. }
  class function TEventLoop.TimerTicks: Int64;
  begin
    TimerTicks := asyncGetTicks;
  end;
  { Turns a libasync failure code into an EAsyncError exception; asyncOK
    passes silently. }
  procedure TEventLoop.CheckResult(AResultCode: TAsyncResult);
  begin
    if AResultCode = asyncOK then
      Exit;
    raise EAsyncError.Create(AResultCode);
  end;
  { Getter of IsRunning: forwards to asyncIsRunning. }
  function TEventLoop.GetIsRunning: Boolean;
  begin
    GetIsRunning := asyncIsRunning(Self.Handle);
  end;
  { Setter of IsRunning.  Does nothing when the requested state equals the
    current one; otherwise starts the loop with Run or stops it with Break.
    The previous version had the two actions swapped: it called Run when
    asked to stop a running loop and Break when asked to start a stopped
    one. }
  procedure TEventLoop.SetIsRunning(AIsRunning: Boolean);
  begin
    if AIsRunning = IsRunning then
      Exit;

    if AIsRunning then
      Run       // NOTE(review): presumably returns only once the loop is left - confirm against libasync
    else
      Break;    // the TEventLoop.Break method, i.e. asyncBreak
  end;
  404. // -------------------------------------------------------------------
  405. // TGenericLineReader
  406. // -------------------------------------------------------------------
  { Releases the line buffer, if one was ever allocated. }
  destructor TGenericLineReader.Destroy;
  begin
    if RealBuffer <> nil then
    begin
      FreeMem(RealBuffer);
      RealBuffer := nil;
    end;
    inherited Destroy;
  end;
  { Reads chunks of up to 1024 bytes through Read until nothing more is
    delivered, appends them to the internal buffer and fires OnLine for
    every complete line found.  A line ends at #13 or #10; a #13#10 or
    #10#13 pair counts as a single terminator.  The last byte of the buffer
    is never treated as a terminator, because its partner byte may not have
    arrived yet.  If the very first Read returns no data, NoData is called. }
  procedure TGenericLineReader.Run;
  var
    Chunk: array[0..1023] of Byte;
    Tail: PChar;
    Got, PrevSize, Total, LineStart, Idx, Len: Integer;
    CurLine: String;
    GotData: Boolean;
  begin
    GotData := False;
    repeat
      Got := Read(Chunk, SizeOf(Chunk));
      if Got <= 0 then
      begin
        if not GotData then
          NoData;
        Break;
      end;
      GotData := True;

      // Append the freshly read bytes to the buffer
      PrevSize := FBytesInBuffer;
      Inc(FBytesInBuffer, Got);
      ReallocMem(RealBuffer, FBytesInBuffer);
      Move(Chunk, RealBuffer[PrevSize], Got);

      { Scan for line ends.  FBuffer and FBytesInBuffer are brought up to
        date before each OnLine call, since the handler may read them. }
      LineStart := 0;
      // Resume at the last old byte: it may be a terminator left pending
      Idx := PrevSize - 1;
      if Idx < 0 then
        Idx := 0;
      Total := FBytesInBuffer;
      while Idx <= Total - 2 do
      begin
        if RealBuffer[Idx] in [#13, #10] then
        begin
          Len := Idx - LineStart;
          SetLength(CurLine, Len);
          if Len > 0 then
            Move(RealBuffer[LineStart], CurLine[1], Len);

          // Swallow the second half of a CR/LF or LF/CR pair
          if (RealBuffer[Idx + 1] in [#13, #10]) and
             (RealBuffer[Idx + 1] <> RealBuffer[Idx]) then
            Inc(Idx);
          LineStart := Idx + 1;

          if Assigned(FOnLine) then
          begin
            FBuffer := RealBuffer + LineStart;
            FBytesInBuffer := Total - LineStart;
            InCallback := True;
            try
              FOnLine(CurLine);
            finally
              InCallback := False;
            end;
            // The handler may have requested destruction of this object
            if DoStopAndFree then
              Exit;
          end;
        end;
        Inc(Idx);
      end;

      FBytesInBuffer := Total;
      if LineStart > 0 then
      begin
        // Drop the consumed lines; keep only the unfinished remainder
        Dec(FBytesInBuffer, LineStart);
        GetMem(Tail, FBytesInBuffer);
        Move(RealBuffer[LineStart], Tail^, FBytesInBuffer);
        if Assigned(RealBuffer) then
          FreeMem(RealBuffer);
        RealBuffer := Tail;
      end;
      FBuffer := RealBuffer;
    until False;
  end;
  493. // -------------------------------------------------------------------
  494. // TAsyncStreamLineReader
  495. // -------------------------------------------------------------------
  { Convenience constructor: AStream serves as both the data stream and the
    stream whose handle is watched. }
  constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop;
    AStream: THandleStream);
  begin
    // Delegate to the two-stream variant
    Self.Create(AEventLoop, TStream(AStream), AStream);
  end;
  { Creates a reader that takes its data from ADataStream and registers
    StreamDataAvailable as "data available" notification for the handle of
    ABlockingStream.  Both streams must be assigned. }
  constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop;
    ADataStream: TStream; ABlockingStream: THandleStream);
  begin
    ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));
    inherited Create;

    FBlockingStream := ABlockingStream;
    FDataStream := ADataStream;
    FEventLoop := AEventLoop;

    NotifyHandle := FEventLoop.SetDataAvailableNotify(
      ABlockingStream.Handle, @StreamDataAvailable, nil);
  end;
  { Unregisters the "data available" notification if it is still installed
    and then releases the reader.  Without this, freeing the reader while
    the notification is active leaves the event loop with a callback into a
    destroyed object.  Mirrors what TAsyncWriteStream.Destroy does for its
    "can write" notification. }
  destructor TAsyncStreamLineReader.Destroy;
  begin
    if Assigned(NotifyHandle) then
    begin
      EventLoop.ClearDataAvailableNotify(NotifyHandle);
      NotifyHandle := nil;
    end;
    inherited Destroy;
  end;
  { Destroys the reader.  Outside of a callback the object is freed at once.
    Inside a callback the notification is removed and DoStopAndFree is set,
    so that the code which invoked the callback can free the object. }
  procedure TAsyncStreamLineReader.StopAndFree;
  begin
    if not InCallback then
    begin
      Self.Free;
      Exit;       // nothing of Self may be touched any more
    end;

    if Assigned(NotifyHandle) then
    begin
      EventLoop.ClearDataAvailableNotify(NotifyHandle);
      NotifyHandle := nil;
    end;
    DoStopAndFree := True;
  end;
  { Data source of the line reader: reads up to count bytes from the data
    stream and returns what TStream.Read returns. }
  function TAsyncStreamLineReader.Read(var ABuffer; count: Integer): Integer;
  begin
    Read := FDataStream.Read(ABuffer, count);
  end;
  { Called by Run when the first read delivered nothing.  This is treated as
    end of input if the data stream is the watched stream itself, or if the
    data stream's position has reached its size.  In that case a pending
    unterminated line is delivered through OnLine, the notification is
    removed and OnEOF is fired.

    Changes against the previous version:
    - OnLine is invoked with InCallback set, so a StopAndFree issued by the
      handler is deferred instead of destroying Self in the middle of this
      method; if that happened, the method returns immediately.
    - No Move into s[1] when the remaining line is empty.
    - The notification is cleared only if it is still installed. }
  procedure TAsyncStreamLineReader.NoData;
  var
    s: String;
  begin
    if (FDataStream <> FBlockingStream) and
       (FDataStream.Position <> FDataStream.Size) then
      Exit;

    if (FBytesInBuffer > 0) and Assigned(FOnLine) then
    begin
      // Run leaves at most a lone trailing terminator byte unprocessed
      if FBuffer[FBytesInBuffer - 1] in [#13, #10] then
        Dec(FBytesInBuffer);
      SetLength(s, FBytesInBuffer);
      if FBytesInBuffer > 0 then
        Move(FBuffer^, s[1], FBytesInBuffer);

      InCallback := True;
      try
        FOnLine(s);
      finally
        InCallback := False;
      end;
      // StopAndFree already removed the notification; the caller frees us
      if DoStopAndFree then
        Exit;
    end;

    if Assigned(NotifyHandle) then
    begin
      EventLoop.ClearDataAvailableNotify(NotifyHandle);
      NotifyHandle := nil;
    end;

    if Assigned(FOnEOF) then
    begin
      InCallback := True;
      try
        FOnEOF(Self);
      finally
        InCallback := False;
      end;
    end;
  end;
  { Notification target registered with the event loop: processes the
    available data and afterwards carries out a destruction that was
    requested from within a callback. }
  procedure TAsyncStreamLineReader.StreamDataAvailable(UserData: TObject);
  begin
    Self.Run;
    if not DoStopAndFree then
      Exit;
    Free;
  end;
  566. // -------------------------------------------------------------------
  567. // TWriteBuffer
  568. // -------------------------------------------------------------------
  { Fires OnBufferEmpty, if assigned.  InCallback is now reset in a finally
    block, as at the other callback sites of this unit, so that an exception
    raised by the handler cannot leave the flag stuck at True. }
  procedure TWriteBuffer.BufferEmpty;
  begin
    if not Assigned(FOnBufferEmpty) then
      Exit;

    InCallback := True;
    try
      FOnBufferEmpty(Self);
    finally
      InCallback := False;
    end;
  end;
  { Starts with an empty buffer and a single line feed as end-of-line
    marker. }
  constructor TWriteBuffer.Create;
  begin
    inherited Create;
    EndOfLineMarker := #10;
    FBytesInBuffer := 0;
    FBuffer := nil;
  end;
  { Releases the data that is still queued, if any. }
  destructor TWriteBuffer.Destroy;
  begin
    if FBuffer <> nil then
      FreeMem(FBuffer);
    inherited Destroy;
  end;
  { The buffer is not seekable.  Only requests that address the end of the
    queued data are accepted (offset 0 from the current position or from the
    end, or offset FBytesInBuffer from the beginning); they return
    FBytesInBuffer.  Everything else raises EStreamError. }
  function TWriteBuffer.Seek(Offset: LongInt; Origin: Word): LongInt;
  var
    AtEnd: Boolean;
  begin
    case Origin of
      soFromBeginning:
        AtEnd := Offset = FBytesInBuffer;
      soFromCurrent, soFromEnd:
        AtEnd := Offset = 0;
    else
      AtEnd := False;
    end;

    if not AtEnd then
      // !!!: No i18n for this string - solve this problem in the FCL?!?
      raise EStreamError.Create('Invalid stream operation');

    Result := FBytesInBuffer;
  end;
  { Appends Count bytes to the queue, mirrors them to the debug stream if
    one is assigned and signals through WantWrite that data is waiting.
    Always reports Count as written; a non-positive Count queues nothing. }
  function TWriteBuffer.Write(const ABuffer; Count: LongInt): LongInt;
  var
    NewSize: Integer;
  begin
    Result := Count;
    if Count <= 0 then
      Exit;

    FBufferSent := False;
    NewSize := FBytesInBuffer + Count;
    ReallocMem(FBuffer, NewSize);
    Move(ABuffer, FBuffer[FBytesInBuffer], Count);
    FBytesInBuffer := NewSize;

    if Assigned(fpAsyncWriteBufferDebugStream) then
      fpAsyncWriteBufferDebugStream.Write(ABuffer, Count);
    WantWrite;
  end;
  { Queues line followed by EndOfLineMarker.  If both are empty there is
    nothing to write; the previous version referenced s[1] of an empty
    string in that case. }
  procedure TWriteBuffer.WriteLine(const line: String);
  var
    s: String;
  begin
    s := line + EndOfLineMarker;
    if Length(s) > 0 then
      WriteBuffer(s[1], Length(s));
  end;
  { Pushes queued data to DoRealWrite until the queue is empty or
    DoRealWrite accepts nothing more.  When the queue is empty, BufferEmpty
    is called and, unless the buffer is already flagged as sent, WantWrite is
    called.  If not a single byte could be written, WritingFailed is
    called. }
  procedure TWriteBuffer.Run;
  var
    Sent: Integer;
    Rest: PChar;
    AnyWritten: Boolean;
  begin
    AnyWritten := False;
    repeat
      if FBytesInBuffer = 0 then
      begin
        BufferEmpty;
        if not FBufferSent then
          WantWrite;
        Exit;
      end;

      Sent := DoRealWrite(FBuffer[0], FBytesInBuffer);
      if Sent > 0 then
      begin
        AnyWritten := True;
        // Keep only the part that has not been written yet
        Dec(FBytesInBuffer, Sent);
        GetMem(Rest, FBytesInBuffer);
        Move(FBuffer[Sent], Rest[0], FBytesInBuffer);
        FreeMem(FBuffer);
        FBuffer := Rest;
      end;
    until Sent <= 0;

    if not AnyWritten then
      WritingFailed;
  end;
  651. // -------------------------------------------------------------------
  652. // TAsyncWriteStream
  653. // -------------------------------------------------------------------
  { Writes to the data stream and returns what TStream.Write returns. }
  function TAsyncWriteStream.DoRealWrite(const ABuffer; Count: Integer): Integer;
  begin
    DoRealWrite := FDataStream.Write(ABuffer, Count);
  end;
  { Called when Run could not write anything.  If data stream and watched
    stream differ and a "can write" notification is installed, that
    notification is removed. }
  procedure TAsyncWriteStream.WritingFailed;
  begin
    if FDataStream = FBlockingStream then
      Exit;
    if not Assigned(NotifyHandle) then
      Exit;

    EventLoop.ClearCanWriteNotify(NotifyHandle);
    NotifyHandle := nil;
  end;
  { Makes sure a "can write" notification for the watched stream's handle is
    installed; an existing registration is left alone. }
  procedure TAsyncWriteStream.WantWrite;
  begin
    if Assigned(NotifyHandle) then
      Exit;
    NotifyHandle := EventLoop.SetCanWriteNotify(FBlockingStream.Handle,
      @CanWrite, nil);
  end;
  { Notification target for "can write".  With data queued it runs the write
    loop.  With an empty queue it removes the notification, marks the buffer
    as sent and fires OnBufferSent.  Afterwards a destruction requested from
    within a callback is carried out. }
  procedure TAsyncWriteStream.CanWrite(UserData: TObject);
  begin
    if FBytesInBuffer <> 0 then
      Run
    else
    begin
      if Assigned(NotifyHandle) then
      begin
        EventLoop.ClearCanWriteNotify(NotifyHandle);
        NotifyHandle := nil;
      end;
      FBufferSent := True;
      if Assigned(FOnBufferSent) then
      begin
        InCallback := True;
        try
          FOnBufferSent(Self);
        finally
          InCallback := False;
        end;
      end;
    end;

    if DoStopAndFree then
      Free;
  end;
  { Convenience constructor: AStream serves as both the data stream and the
    stream whose handle is watched. }
  constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop;
    AStream: THandleStream);
  begin
    // Delegate to the two-stream variant
    Self.Create(AEventLoop, TStream(AStream), AStream);
  end;
  { Creates a write stream that sends its data to ADataStream and uses the
    handle of ABlockingStream for "can write" notifications.  Both streams
    must be assigned.  No notification is registered here. }
  constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop;
    ADataStream: TStream; ABlockingStream: THandleStream);
  begin
    ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));
    inherited Create;

    FBlockingStream := ABlockingStream;
    FDataStream := ADataStream;
    FEventLoop := AEventLoop;
  end;
  { Removes a still installed "can write" notification before the object
    goes away. }
  destructor TAsyncWriteStream.Destroy;
  begin
    if NotifyHandle <> nil then
      EventLoop.ClearCanWriteNotify(NotifyHandle);
    inherited Destroy;
  end;
  { Destroys the stream.  Outside of a callback the object is freed at once.
    Inside a callback the notification is removed and DoStopAndFree is set,
    so that CanWrite frees the object once the callback has returned. }
  procedure TAsyncWriteStream.StopAndFree;
  begin
    if not InCallback then
    begin
      Self.Free;
      Exit;       // nothing of Self may be touched any more
    end;

    if Assigned(NotifyHandle) then
    begin
      EventLoop.ClearCanWriteNotify(NotifyHandle);
      NotifyHandle := nil;
    end;
    DoStopAndFree := True;
  end;
  729. end.