fpasync.pp 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860
  1. {
  2. $Id$
  3. fpAsync: Asynchronous event management for Free Pascal
  4. Copyright (C) 2001-2003 by
  5. Areca Systems GmbH / Sebastian Guenther, [email protected]
  6. See the file COPYING.FPC, included in this distribution,
  7. for details about the copyright.
  8. This program is distributed in the hope that it will be useful,
  9. but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  11. }
  12. unit fpAsync;
  13. {$MODE objfpc}
  14. {$H+}
  15. interface
  16. uses SysUtils, Classes, libasync;
  17. type
  { Method pointer used for all "notify" style callbacks of this unit; the
    handler receives only the Sender object it was registered with. }
  TNotifyEvent = procedure(Sender: TObject) of object;
  { Exception carrying a libasync result code. It is raised by TEventLoop
    whenever a libasync call returns something other than asyncOK. }
  EAsyncError = class(Exception)
  private
    FErrorCode: TAsyncResult;
  public
    // Builds the message 'Async I/O error <n>' from the ordinal of AErrorCode
    constructor Create(AErrorCode: TAsyncResult);
    // The result code passed to the constructor
    property ErrorCode: TAsyncResult read FErrorCode;
  end;
  { Object-oriented front end for the libasync event loop. Every method
    forwards to the matching async* routine, passing Handle as loop handle.
    The *Callback methods take plain libasync callbacks; the *Notify methods
    take TNotifyEvent method pointers and return an opaque handle that has to
    be handed back to the corresponding Remove/Clear method. }
  TEventLoop = class
  private
    FData: TAsyncData;
    // Head of the linked list of records allocated by the *Notify methods
    FFirstNotifyData: Pointer;
    function GetIsRunning: Boolean;
    procedure SetIsRunning(AIsRunning: Boolean);
  protected
    // Raises EAsyncError if AResultCode is not asyncOK
    procedure CheckResult(AResultCode: TAsyncResult);
  public
    constructor Create;
    destructor Destroy; override;
    // The value passed to libasync as loop handle (the instance itself)
    function Handle: TAsyncHandle;

    // Main loop control
    procedure Run;
    procedure Break;

    // Timer support
    function AddTimerCallback(AMSec: LongInt; APeriodic: Boolean;
      ACallback: TAsyncCallback; AUserData: Pointer): TAsyncTimer;
    procedure RemoveTimerCallback(ATimer: TAsyncTimer);
    function AddTimerNotify(AMSec: LongInt; APeriodic: Boolean;
      ANotify: TNotifyEvent; ASender: TObject): Pointer;
    procedure RemoveTimerNotify(AHandle: Pointer);

    // I/O notification support (for files, sockets etc.)
    procedure SetIOCallback(AHandle: Integer; ACallback: TAsyncCallback;
      AUserData: Pointer);
    procedure ClearIOCallback(AHandle: Integer);
    function SetIONotify(AHandle: Integer; ANotify: TNotifyEvent;
      ASender: TObject): Pointer;
    procedure ClearIONotify(AHandle: Pointer);

    // "Data available" notifications
    procedure SetDataAvailableCallback(AHandle: Integer;
      ACallback: TAsyncCallback; AUserData: Pointer);
    procedure ClearDataAvailableCallback(AHandle: Integer);
    function SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent;
      ASender: TObject): Pointer;
    procedure ClearDataAvailableNotify(AHandle: Pointer);

    // "Can write" notifications
    procedure SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback;
      AUserData: Pointer);
    procedure ClearCanWriteCallback(AHandle: Integer);
    function SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent;
      ASender: TObject): Pointer;
    procedure ClearCanWriteNotify(AHandle: Pointer);

    // Current value of asyncGetTicks
    class function TimerTicks: Int64;

    // Properties
    property IsRunning: Boolean read GetIsRunning write SetIsRunning;
  end;
  71. // -------------------------------------------------------------------
  72. // Asynchronous line reader
  73. // -------------------------------------------------------------------
  { Handler type for complete text lines; ALine is passed without its line
    terminator. }
  TLineNotify = procedure(const ALine: String) of object;
  { Splits an incoming byte stream into lines terminated by CR, LF, CR/LF or
    LF/CR and reports each line through OnLine. Descendants provide the data
    source by implementing Read and NoData. }
  TGenericLineReader = class
  protected
    // RealBuffer owns the allocation; FBuffer points at the unprocessed part
    RealBuffer, FBuffer: PChar;
    FBytesInBuffer: Integer;
    FOnLine: TLineNotify;
    // When set, Run returns right after the current OnLine call
    DoStopAndFree: Boolean;
    // Fetches up to count bytes into ABuffer; a result <= 0 ends Run
    function Read(var ABuffer; count: Integer): Integer; virtual; abstract;
    // Called by Run when the very first Read of that call delivered nothing
    procedure NoData; virtual; abstract;
  public
    destructor Destroy; override;
    procedure Run; // Process as many lines as possible
    property Buffer: PChar read FBuffer;
    property BytesInBuffer: Integer read FBytesInBuffer;
    property OnLine: TLineNotify read FOnLine write FOnLine;
  end;
  { Line reader fed by a stream. The constructor registers a "data
    available" notification for the handle of BlockingStream with the event
    loop; each notification runs the reader on DataStream. }
  TAsyncStreamLineReader = class(TGenericLineReader)
  protected
    FEventLoop: TEventLoop;
    FDataStream: TStream;
    FBlockingStream: THandleStream;
    FOnEOF: TNotifyEvent;
    // Handle returned by TEventLoop.SetDataAvailableNotify, nil when cleared
    NotifyHandle: Pointer;
    function Read(var ABuffer; count: Integer): Integer; override;
    procedure NoData; override;
    // Event loop callback: runs the reader, then frees it if requested
    procedure StreamDataAvailable(UserData: TObject);
  public
    // Uses AStream both as data stream and as blocking stream
    constructor Create(AEventLoop: TEventLoop; AStream: THandleStream);
    constructor Create(AEventLoop: TEventLoop; ADataStream: TStream;
      ABlockingStream: THandleStream);
    destructor Destroy; override;
    procedure StopAndFree; // Destroy instance after run
    property EventLoop: TEventLoop read FEventLoop;
    property DataStream: TStream read FDataStream;
    property BlockingStream: THandleStream read FBlockingStream;
    property OnEOF: TNotifyEvent read FOnEOF write FOnEOF;
  end;
  111. // -------------------------------------------------------------------
  112. // Asynchronous write buffers
  113. // -------------------------------------------------------------------
  { Write-only stream that collects everything written to it in a memory
    buffer; Run hands the buffered bytes to DoRealWrite. Descendants supply
    DoRealWrite, WritingFailed and WantWrite. }
  TWriteBuffer = class(TStream)
  protected
    FBuffer: PChar;
    FBytesInBuffer: Integer;
    FBufferSent: Boolean;
    FOnBufferEmpty: TNotifyEvent;
    FOnBufferSent: TNotifyEvent;
    // Only "seeks" that stay at the end of the buffered data are accepted
    function Seek(Offset: LongInt; Origin: Word): LongInt; override;
    // Appends to the buffer and calls WantWrite
    function Write(const ABuffer; Count: LongInt): LongInt; override;
    // Performs the actual output; Run stops looping on a result <= 0
    function DoRealWrite(const ABuffer; Count: Integer): Integer; virtual; abstract;
    // Called by Run when not a single DoRealWrite call wrote anything
    procedure WritingFailed; virtual; abstract;
    // Called whenever there may be data waiting to be written
    procedure WantWrite; virtual; abstract;
    // Called by Run when it finds the buffer empty; fires OnBufferEmpty
    procedure BufferEmpty; virtual;
  public
    // Appended to every line by WriteLine; the constructor sets it to #10
    EndOfLineMarker: String;
    constructor Create;
    destructor Destroy; override;
    procedure WriteLine(const line: String);
    procedure Run; // Write as much data as possible
    property BytesInBuffer: Integer read FBytesInBuffer;
    property BufferSent: Boolean read FBufferSent;
    property OnBufferEmpty: TNotifyEvent read FOnBufferEmpty write FOnBufferEmpty;
    property OnBufferSent: TNotifyEvent read FOnBufferSent write FOnBufferSent;
  end;
  { Write buffer whose content is written to DataStream whenever the event
    loop reports that the handle of BlockingStream can be written to. }
  TAsyncWriteStream = class(TWriteBuffer)
  protected
    FEventLoop: TEventLoop;
    FDataStream: TStream;
    FBlockingStream: THandleStream;
    // Handle returned by TEventLoop.SetCanWriteNotify, nil when not registered
    NotifyHandle: Pointer;
    // When set, CanWrite frees the instance after its current pass
    DoStopAndFree: Boolean;
    function DoRealWrite(const ABuffer; Count: Integer): Integer; override;
    procedure WritingFailed; override;
    procedure WantWrite; override;
    procedure BufferEmpty; override;
    // Event loop callback for the "can write" notification
    procedure CanWrite(UserData: TObject);
  public
    // Uses AStream both as data stream and as blocking stream
    constructor Create(AEventLoop: TEventLoop; AStream: THandleStream);
    constructor Create(AEventLoop: TEventLoop;
      ADataStream: TStream; ABlockingStream: THandleStream);
    destructor Destroy; override;
    procedure StopAndFree; // Destroy instance after run
    property EventLoop: TEventLoop read FEventLoop;
    property DataStream: TStream read FDataStream;
    property BlockingStream: THandleStream read FBlockingStream;
  end;
  var
    { Optional debugging tap: while this variable is assigned, every
      non-empty block written to a TWriteBuffer (or descendant) is written
      to this stream as well. }
    fpAsyncWriteBufferDebugStream: TStream;
  164. implementation
  type
    PNotifyData = ^TNotifyData;

    { Bookkeeping record behind every handle returned by the *Notify methods
      of TEventLoop. The records of one loop form a singly linked list whose
      head is TEventLoop.FFirstNotifyData. }
    TNotifyData = record
      Next: PNotifyData;       // following list entry, nil at the end
      Notify: TNotifyEvent;    // method to invoke
      Sender: TObject;         // argument passed to Notify
      // Timer entries store the timer, I/O entries the file handle
      case Boolean of
        False: (TimerHandle: TAsyncTimer);
        True: (FileHandle: LongInt);
    end;
  { cdecl callback registered with libasync by the *Notify methods. Data is
    the PNotifyData record of the registration; the stored method is invoked
    with the stored sender. }
  procedure EventHandler(Data: Pointer); cdecl;
  var
    Entry: PNotifyData;
  begin
    Entry := PNotifyData(Data);
    Entry^.Notify(Entry^.Sender);
  end;
  { Allocates a new notify record and inserts it at the head of Obj's list.
    Only the Next field is initialised; the caller fills in the rest. }
  function AddNotifyData(Obj: TEventLoop): PNotifyData;
  var
    Node: PNotifyData;
  begin
    New(Node);
    Node^.Next := PNotifyData(Obj.FFirstNotifyData);
    Obj.FFirstNotifyData := Node;
    Result := Node;
  end;
  { Unlinks Data from Obj's list of notify records (if it is part of the
    list) and releases its memory. Data is disposed in either case. }
  procedure FreeNotifyData(Obj: TEventLoop; Data: PNotifyData);
  var
    Node, Predecessor: PNotifyData;
  begin
    // Walk the list until Data (or the end of the list) is reached
    Predecessor := nil;
    Node := PNotifyData(Obj.FFirstNotifyData);
    while Assigned(Node) and (Node <> Data) do
    begin
      Predecessor := Node;
      Node := Node^.Next;
    end;

    // Found: bypass the entry, adjusting the list head if it was the first
    if Assigned(Node) then
      if Assigned(Predecessor) then
        Predecessor^.Next := Node^.Next
      else
        Obj.FFirstNotifyData := Node^.Next;

    Dispose(Data);
  end;
  { Creates the exception with the message 'Async I/O error <n>', where <n>
    is the ordinal value of AErrorCode, and remembers the code. }
  constructor EAsyncError.Create(AErrorCode: TAsyncResult);
  var
    Msg: String;
  begin
    Msg := Format('Async I/O error %d', [Ord(AErrorCode)]);
    inherited Create(Msg);
    FErrorCode := AErrorCode;
  end;
  { Initialises the underlying libasync loop by passing this instance's
    handle to asyncInit. }
  constructor TEventLoop.Create;
  begin
    asyncInit(Self.Handle);
  end;
  { Shuts the libasync loop down (asyncFree) and releases every notify
    record that is still registered with this loop. Handles obtained from
    the *Notify methods are invalid afterwards. }
  destructor TEventLoop.Destroy;
  var
    Node, Following: PNotifyData;
  begin
    asyncFree(Handle);

    // Detach the list first so the field never points at freed records
    Node := PNotifyData(FFirstNotifyData);
    FFirstNotifyData := nil;
    while Assigned(Node) do
    begin
      Following := Node^.Next;
      Dispose(Node);
      Node := Following;
    end;

    // The original omitted the inherited call
    inherited Destroy;
  end;
  { Returns the handle used for all libasync calls: the instance reference
    itself, reinterpreted as TAsyncHandle.
    NOTE(review): how libasync interprets this pointer is not visible in
    this unit - confirm against libasync before changing the class layout. }
  function TEventLoop.Handle: TAsyncHandle;
  begin
    Result := TAsyncHandle(Self);
  end;
  { Runs the event loop by calling asyncRun for this loop's handle. }
  procedure TEventLoop.Run;
  begin
    asyncRun(Self.Handle);
  end;
  { Asks the event loop to stop by calling asyncBreak for this loop's
    handle. }
  procedure TEventLoop.Break;
  begin
    asyncBreak(Self.Handle);
  end;
  { Registers a plain libasync timer callback. All arguments are forwarded
    unchanged to asyncAddTimer; its result is returned and must be passed to
    RemoveTimerCallback to remove the timer again. }
  function TEventLoop.AddTimerCallback(AMSec: LongInt; APeriodic: Boolean;
    ACallback: TAsyncCallback; AUserData: Pointer): TAsyncTimer;
  var
    Timer: TAsyncTimer;
  begin
    Timer := asyncAddTimer(Handle, AMSec, APeriodic, ACallback, AUserData);
    Result := Timer;
  end;
  { Removes a timer created with AddTimerCallback (forwards to
    asyncRemoveTimer). }
  procedure TEventLoop.RemoveTimerCallback(ATimer: TAsyncTimer);
  begin
    asyncRemoveTimer(Self.Handle, ATimer);
  end;
  { Registers a timer that calls ANotify(ASender) through EventHandler.
    Returns an opaque handle (the notify record) which has to be passed to
    RemoveTimerNotify to remove the timer again. }
  function TEventLoop.AddTimerNotify(AMSec: LongInt; APeriodic: Boolean;
    ANotify: TNotifyEvent; ASender: TObject): Pointer;
  var
    Entry: PNotifyData;
  begin
    Entry := AddNotifyData(Self);
    Entry^.Notify := ANotify;
    Entry^.Sender := ASender;
    // The record itself travels through libasync as the callback user data
    Entry^.TimerHandle :=
      asyncAddTimer(Handle, AMSec, APeriodic, @EventHandler, Entry);
    Result := Entry;
  end;
  { Removes a timer created with AddTimerNotify and frees its notify record.
    AHandle is the value returned by AddTimerNotify; a nil handle is ignored
    instead of being dereferenced. }
  procedure TEventLoop.RemoveTimerNotify(AHandle: Pointer);
  var
    Data: PNotifyData;
  begin
    if not Assigned(AHandle) then
      exit;
    Data := PNotifyData(AHandle);
    asyncRemoveTimer(Handle, Data^.TimerHandle);
    FreeNotifyData(Self, Data);
  end;
  { Installs a plain libasync I/O callback for file handle AHandle.
    Raises EAsyncError if asyncSetIOCallback does not return asyncOK. }
  procedure TEventLoop.SetIOCallback(AHandle: Integer; ACallback: TAsyncCallback;
    AUserData: Pointer);
  var
    Status: TAsyncResult;
  begin
    Status := asyncSetIOCallback(Handle, AHandle, ACallback, AUserData);
    CheckResult(Status);
  end;
  { Removes the I/O callback of file handle AHandle (forwards to
    asyncClearIOCallback). }
  procedure TEventLoop.ClearIOCallback(AHandle: Integer);
  begin
    asyncClearIOCallback(Self.Handle, AHandle);
  end;
  { Installs an I/O notification for file handle AHandle which calls
    ANotify(ASender). Returns an opaque handle for ClearIONotify.
    Raises EAsyncError (after releasing the notify record) if
    asyncSetIOCallback does not return asyncOK. }
  function TEventLoop.SetIONotify(AHandle: Integer; ANotify: TNotifyEvent;
    ASender: TObject): Pointer;
  var
    Entry: PNotifyData;
    Status: TAsyncResult;
  begin
    Entry := AddNotifyData(Self);
    Entry^.Notify := ANotify;
    Entry^.Sender := ASender;
    Entry^.FileHandle := AHandle;

    Status := asyncSetIOCallback(Handle, AHandle, @EventHandler, Entry);
    if Status <> asyncOK then
    begin
      // Registration failed: do not leave an orphaned record in the list
      FreeNotifyData(Self, Entry);
      raise EAsyncError.Create(Status);
    end;

    Result := Entry;
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetIONotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
  end;
  { Removes an I/O notification installed with SetIONotify and frees its
    notify record. AHandle is the value returned by SetIONotify; a nil
    handle is ignored instead of being dereferenced. }
  procedure TEventLoop.ClearIONotify(AHandle: Pointer);
  var
    Data: PNotifyData;
  begin
    if not Assigned(AHandle) then
      exit;
    Data := PNotifyData(AHandle);
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearIONotify: Filehandle=', Data^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
    asyncClearIOCallback(Handle, Data^.FileHandle);
    FreeNotifyData(Self, Data);
  end;
  { Installs a plain libasync "data available" callback for file handle
    AHandle. Raises EAsyncError if asyncSetDataAvailableCallback does not
    return asyncOK. }
  procedure TEventLoop.SetDataAvailableCallback(AHandle: Integer; ACallback: TAsyncCallback;
    AUserData: Pointer);
  var
    Status: TAsyncResult;
  begin
    Status := asyncSetDataAvailableCallback(Handle, AHandle,
      ACallback, AUserData);
    CheckResult(Status);
  end;
  { Removes the "data available" callback of file handle AHandle (forwards
    to asyncClearDataAvailableCallback). }
  procedure TEventLoop.ClearDataAvailableCallback(AHandle: Integer);
  begin
    asyncClearDataAvailableCallback(Self.Handle, AHandle);
  end;
  { Installs a "data available" notification for file handle AHandle which
    calls ANotify(ASender). Returns an opaque handle for
    ClearDataAvailableNotify. Raises EAsyncError (after releasing the notify
    record) if asyncSetDataAvailableCallback does not return asyncOK. }
  function TEventLoop.SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent;
    ASender: TObject): Pointer;
  var
    Entry: PNotifyData;
    Status: TAsyncResult;
  begin
    Entry := AddNotifyData(Self);
    Entry^.Notify := ANotify;
    Entry^.Sender := ASender;
    Entry^.FileHandle := AHandle;

    Status := asyncSetDataAvailableCallback(Handle, AHandle,
      @EventHandler, Entry);
    if Status <> asyncOK then
    begin
      // Registration failed: do not leave an orphaned record in the list
      FreeNotifyData(Self, Entry);
      raise EAsyncError.Create(Status);
    end;

    Result := Entry;
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetDataAvailableNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
  end;
  { Removes a "data available" notification installed with
    SetDataAvailableNotify and frees its notify record. AHandle is the value
    returned by SetDataAvailableNotify; a nil handle is ignored instead of
    being dereferenced. }
  procedure TEventLoop.ClearDataAvailableNotify(AHandle: Pointer);
  var
    Data: PNotifyData;
  begin
    if not Assigned(AHandle) then
      exit;
    Data := PNotifyData(AHandle);
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearDataAvailableNotify: Filehandle=', Data^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
    asyncClearDataAvailableCallback(Handle, Data^.FileHandle);
    FreeNotifyData(Self, Data);
  end;
  { Installs a plain libasync "can write" callback for file handle AHandle.
    Raises EAsyncError if asyncSetCanWriteCallback does not return asyncOK. }
  procedure TEventLoop.SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback;
    AUserData: Pointer);
  var
    Status: TAsyncResult;
  begin
    Status := asyncSetCanWriteCallback(Handle, AHandle, ACallback, AUserData);
    CheckResult(Status);
  end;
  { Removes the "can write" callback of file handle AHandle (forwards to
    asyncClearCanWriteCallback). }
  procedure TEventLoop.ClearCanWriteCallback(AHandle: Integer);
  begin
    asyncClearCanWriteCallback(Self.Handle, AHandle);
  end;
  { Installs a "can write" notification for file handle AHandle which calls
    ANotify(ASender). Returns an opaque handle for ClearCanWriteNotify.
    Raises EAsyncError (after releasing the notify record) if
    asyncSetCanWriteCallback does not return asyncOK. }
  function TEventLoop.SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent;
    ASender: TObject): Pointer;
  var
    Entry: PNotifyData;
    Status: TAsyncResult;
  begin
    Entry := AddNotifyData(Self);
    Entry^.Notify := ANotify;
    Entry^.Sender := ASender;
    Entry^.FileHandle := AHandle;

    Status := asyncSetCanWriteCallback(Handle, AHandle,
      @EventHandler, Entry);
    if Status <> asyncOK then
    begin
      // Registration failed: do not leave an orphaned record in the list
      FreeNotifyData(Self, Entry);
      raise EAsyncError.Create(Status);
    end;

    Result := Entry;
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetCanWriteNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
  end;
  { Removes a "can write" notification installed with SetCanWriteNotify and
    frees its notify record. AHandle is the value returned by
    SetCanWriteNotify; a nil handle is ignored instead of being
    dereferenced. }
  procedure TEventLoop.ClearCanWriteNotify(AHandle: Pointer);
  var
    Data: PNotifyData;
  begin
    if not Assigned(AHandle) then
      exit;
    Data := PNotifyData(AHandle);
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearCanWriteNotify: Filehandle=', Data^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
    asyncClearCanWriteCallback(Handle, Data^.FileHandle);
    FreeNotifyData(Self, Data);
  end;
  { Returns the current tick value reported by asyncGetTicks. }
  class function TEventLoop.TimerTicks: Int64;
  var
    Ticks: Int64;
  begin
    Ticks := asyncGetTicks;
    Result := Ticks;
  end;
  { Turns a libasync result code into an exception: does nothing for
    asyncOK and raises EAsyncError carrying the code otherwise. }
  procedure TEventLoop.CheckResult(AResultCode: TAsyncResult);
  begin
    if AResultCode = asyncOK then
      exit;
    raise EAsyncError.Create(AResultCode);
  end;
  { Getter of IsRunning: reports what asyncIsRunning returns for this
    loop. }
  function TEventLoop.GetIsRunning: Boolean;
  begin
    Result := asyncIsRunning(Self.Handle);
  end;
  { Setter of IsRunning. Does nothing if the loop is already in the
    requested state; otherwise starts it with Run (AIsRunning = True) or
    stops it with Break (AIsRunning = False).
    The original implementation had the two actions swapped: it called Run
    when asked to stop a running loop and Break when asked to start a
    stopped one. }
  procedure TEventLoop.SetIsRunning(AIsRunning: Boolean);
  begin
    if AIsRunning = IsRunning then
      exit;
    if AIsRunning then
      Run
    else
      Break; // inside TEventLoop this is the Break method, not System.Break
  end;
  405. // -------------------------------------------------------------------
  406. // TGenericLineReader
  407. // -------------------------------------------------------------------
  { Releases the line buffer, if one was allocated. }
  destructor TGenericLineReader.Destroy;
  begin
    if RealBuffer <> nil then
    begin
      FreeMem(RealBuffer);
      RealBuffer := nil;
    end;
    inherited Destroy;
  end;
  { Reads data in chunks of up to 1024 bytes until Read returns no more
    data, appends each chunk to the buffer and fires OnLine for every
    complete line found. A line ends at CR, LF, CR/LF or LF/CR. The last
    byte of the buffer is never treated as a terminator during a pass,
    because the byte following it is needed to recognise two-byte
    terminators. NoData is called if the very first Read delivers nothing.
    If the OnLine handler sets DoStopAndFree, Run returns immediately and
    Buffer/BytesInBuffer describe the data after the line just reported. }
  procedure TGenericLineReader.Run;
  var
    Chunk: array[0..1023] of Byte;
    Remainder: PChar;
    Received, PrevSize, TotalBytes, LineStart, ScanPos, LineLen: Integer;
    CurLine: String;
    NothingReadYet: Boolean;
  begin
    NothingReadYet := True;
    repeat
      Received := Read(Chunk, SizeOf(Chunk));
      //WriteLn('Linereader: ', Received, ' bytes read');
      if Received <= 0 then
      begin
        if NothingReadYet then
          NoData;
        exit;
      end;
      NothingReadYet := False;

      // Append the newly received data to the read buffer
      PrevSize := FBytesInBuffer;
      Inc(FBytesInBuffer, Received);
      ReallocMem(RealBuffer, FBytesInBuffer);
      Move(Chunk, RealBuffer[PrevSize], Received);

      { Process all potential lines in the current buffer. Attention: FBuffer
        and FBytesInBuffer MUST be updated for each line, as they can be
        accessed from within the FOnLine handler! }
      LineStart := 0;
      // Resume at the last byte of the old data: it was skipped last time
      if PrevSize > 0 then
        ScanPos := PrevSize - 1
      else
        ScanPos := 0;
      TotalBytes := FBytesInBuffer;
      while ScanPos < TotalBytes - 1 do
      begin
        if RealBuffer[ScanPos] in [#13, #10] then
        begin
          LineLen := ScanPos - LineStart;
          SetLength(CurLine, LineLen);
          if LineLen > 0 then
            Move(RealBuffer[LineStart], CurLine[1], LineLen);
          // CR/LF or LF/CR: the partner character belongs to this terminator
          if (RealBuffer[ScanPos + 1] in [#13, #10]) and
            (RealBuffer[ScanPos + 1] <> RealBuffer[ScanPos]) then
            Inc(ScanPos);
          LineStart := ScanPos + 1;
          if Assigned(FOnLine) then
          begin
            FBuffer := RealBuffer + LineStart;
            FBytesInBuffer := TotalBytes - LineStart;
            FOnLine(CurLine);
            // Stop at once if the handler requested StopAndFree
            if DoStopAndFree then
              exit;
          end;
        end;
        Inc(ScanPos);
      end;

      FBytesInBuffer := TotalBytes;
      if LineStart > 0 then
      begin
        // Remove all processed lines from the buffer
        Dec(FBytesInBuffer, LineStart);
        GetMem(Remainder, FBytesInBuffer);
        Move(RealBuffer[LineStart], Remainder^, FBytesInBuffer);
        if Assigned(RealBuffer) then
          FreeMem(RealBuffer);
        RealBuffer := Remainder;
      end;
      FBuffer := RealBuffer;
    until False;
  end;
  487. // -------------------------------------------------------------------
  488. // TAsyncStreamLineReader
  489. // -------------------------------------------------------------------
  { Convenience constructor: AStream serves both as the stream the data is
    read from and as the stream whose handle is watched by the event loop. }
  constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop;
    AStream: THandleStream);
  begin
    // Delegate to the two-stream constructor
    Self.Create(AEventLoop, AStream, AStream);
  end;
  { Creates a reader that takes its data from ADataStream and is triggered
    whenever AEventLoop reports data available on the handle of
    ABlockingStream. Both streams must be assigned (checked by assertion). }
  constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop;
    ADataStream: TStream; ABlockingStream: THandleStream);
  begin
    ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));
    inherited Create;

    FEventLoop := AEventLoop;
    FDataStream := ADataStream;
    FBlockingStream := ABlockingStream;

    // From now on the loop calls StreamDataAvailable when data arrives
    NotifyHandle := EventLoop.SetDataAvailableNotify(
      FBlockingStream.Handle, @StreamDataAvailable, nil);
  end;
  { Unregisters the "data available" notification if it is still installed
    and then destroys the reader. Without this the event loop would keep a
    callback pointing at the freed instance when the reader is freed
    directly; TAsyncWriteStream.Destroy does the same for its notification.
    Like that destructor, this assumes the event loop still exists. }
  destructor TAsyncStreamLineReader.Destroy;
  begin
    if Assigned(NotifyHandle) then
    begin
      EventLoop.ClearDataAvailableNotify(NotifyHandle);
      NotifyHandle := nil;
    end;
    inherited Destroy;
  end;
  { Unregisters the reader from the event loop and flags it for
    destruction: Run stops after the current OnLine call and
    StreamDataAvailable frees the instance once Run has returned. }
  procedure TAsyncStreamLineReader.StopAndFree;
  begin
    if NotifyHandle <> nil then
    begin
      EventLoop.ClearDataAvailableNotify(NotifyHandle);
      NotifyHandle := nil;
    end;
    DoStopAndFree := True;
  end;
  { Data source for Run: reads up to count bytes from DataStream and
    returns what TStream.Read reports. }
  function TAsyncStreamLineReader.Read(var ABuffer; count: Integer): Integer;
  var
    Got: Integer;
  begin
    Got := FDataStream.Read(ABuffer, count);
    Result := Got;
  end;
  { Called by Run when a notification arrived but nothing could be read.
    This is treated as end of input if the data stream is the watched
    stream itself, or if the data stream is positioned at its end. In that
    case any unterminated rest of the buffer is reported as a final line
    (minus one trailing CR or LF), the notification is removed and OnEOF is
    fired.
    Fixes over the original: s[1] is no longer indexed when the final line
    is empty, and the notification is only cleared while NotifyHandle is
    still assigned (the OnLine handler may have called StopAndFree, which
    already cleared it). }
  procedure TAsyncStreamLineReader.NoData;
  var
    s: String;
  begin
    // Not at the end of the input yet: nothing to do
    if (FDataStream <> FBlockingStream) and
      (FDataStream.Position <> FDataStream.Size) then
      exit;

    if (FBytesInBuffer > 0) and Assigned(FOnLine) then
    begin
      if FBuffer[FBytesInBuffer - 1] in [#13, #10] then
        Dec(FBytesInBuffer);
      SetLength(s, FBytesInBuffer);
      if FBytesInBuffer > 0 then
        Move(FBuffer^, s[1], FBytesInBuffer);
      FOnLine(s);
    end;

    if Assigned(NotifyHandle) then
    begin
      EventLoop.ClearDataAvailableNotify(NotifyHandle);
      NotifyHandle := nil;
    end;

    if Assigned(FOnEOF) then
      FOnEOF(Self);
  end;
  { Event loop callback for the "data available" notification: processes
    the pending input and frees the reader afterwards if StopAndFree was
    requested. UserData is not used. }
  procedure TAsyncStreamLineReader.StreamDataAvailable(UserData: TObject);
  begin
    Run;
    if not DoStopAndFree then
      exit;
    Free;
  end;
  549. // -------------------------------------------------------------------
  550. // TWriteBuffer
  551. // -------------------------------------------------------------------
  { Called by Run when it finds the buffer empty; fires OnBufferEmpty if a
    handler is assigned. }
  procedure TWriteBuffer.BufferEmpty;
  begin
    if not Assigned(FOnBufferEmpty) then
      exit;
    FOnBufferEmpty(Self);
  end;
  { Creates an empty write buffer whose lines end with a single LF (#10). }
  constructor TWriteBuffer.Create;
  begin
    inherited Create;
    EndOfLineMarker := #10;
    FBytesInBuffer := 0;
    FBuffer := nil;
  end;
  { Releases the buffer memory; data that has not been written yet is
    discarded. }
  destructor TWriteBuffer.Destroy;
  begin
    if FBuffer <> nil then
      FreeMem(FBuffer);
    inherited Destroy;
  end;
  { The buffer cannot really be repositioned. Only requests that resolve to
    "stay at the end of the buffered data" are accepted: offset 0 relative
    to the current position or the end, or the absolute offset
    BytesInBuffer. They return BytesInBuffer. Everything else raises
    EStreamError. }
  function TWriteBuffer.Seek(Offset: LongInt; Origin: Word): LongInt;
  var
    StaysAtEnd: Boolean;
  begin
    case Origin of
      soFromBeginning:
        StaysAtEnd := Offset = FBytesInBuffer;
      soFromCurrent, soFromEnd:
        StaysAtEnd := Offset = 0;
    else
      StaysAtEnd := False;
    end;

    if not StaysAtEnd then
      // !!!: No i18n for this string - solve this problem in the FCL?!?
      raise EStreamError.Create('Invalid stream operation');
    Result := FBytesInBuffer;
  end;
  { Appends Count bytes from ABuffer to the internal buffer, mirrors them to
    fpAsyncWriteBufferDebugStream when that is assigned, and calls WantWrite
    to signal that data is pending. Always returns Count; a Count <= 0 is a
    no-op. }
  function TWriteBuffer.Write(const ABuffer; Count: LongInt): LongInt;
  begin
    Result := Count;
    if Count <= 0 then
      exit;

    FBufferSent := False;
    // Grow the buffer and copy the new data behind the existing content
    ReallocMem(FBuffer, FBytesInBuffer + Count);
    Move(ABuffer, FBuffer[FBytesInBuffer], Count);
    Inc(FBytesInBuffer, Count);

    if Assigned(fpAsyncWriteBufferDebugStream) then
      fpAsyncWriteBufferDebugStream.Write(ABuffer, Count);
    WantWrite;
  end;
  { Buffers line followed by EndOfLineMarker. If both are empty there is
    nothing to write; the original indexed s[1] on the empty string in that
    case. }
  procedure TWriteBuffer.WriteLine(const line: String);
  var
    s: String;
  begin
    s := line + EndOfLineMarker;
    if Length(s) > 0 then
      WriteBuffer(s[1], Length(s));
  end;
  { Writes as much buffered data as possible: DoRealWrite is called
    repeatedly with the whole remaining buffer until it reports that nothing
    was written (result <= 0). When the buffer is found empty, BufferEmpty
    is called and, unless the buffer has already been marked as sent,
    WantWrite as well. WritingFailed is called if not a single byte could
    be written during this call. }
  procedure TWriteBuffer.Run;
  var
    Sent: Integer;
    Rest: PChar;
    AnythingSent: Boolean;
  begin
    AnythingSent := False;
    repeat
      if FBytesInBuffer = 0 then
      begin
        BufferEmpty;
        if not FBufferSent then
          WantWrite;
        exit;
      end;

      Sent := DoRealWrite(FBuffer[0], FBytesInBuffer);
      if Sent > 0 then
      begin
        AnythingSent := True;
        // Keep only the bytes that have not been written yet
        Dec(FBytesInBuffer, Sent);
        GetMem(Rest, FBytesInBuffer);
        Move(FBuffer[Sent], Rest[0], FBytesInBuffer);
        FreeMem(FBuffer);
        FBuffer := Rest;
      end;
    until Sent <= 0;

    if not AnythingSent then
      WritingFailed;
  end;
  630. // -------------------------------------------------------------------
  631. // TAsyncWriteStream
  632. // -------------------------------------------------------------------
  { Output routine used by Run: writes Count bytes of ABuffer to DataStream
    and returns what TStream.Write reports. }
  function TAsyncWriteStream.DoRealWrite(const ABuffer; Count: Integer): Integer;
  var
    Done: Integer;
  begin
    Done := FDataStream.Write(ABuffer, count);
    Result := Done;
  end;
  { Called by Run when nothing could be written. If the data stream differs
    from the watched stream, the "can write" notification is removed (when
    one is installed); otherwise nothing happens. }
  procedure TAsyncWriteStream.WritingFailed;
  begin
    if FDataStream = FBlockingStream then
      exit;
    if not Assigned(NotifyHandle) then
      exit;
    EventLoop.ClearCanWriteNotify(NotifyHandle);
    NotifyHandle := nil;
  end;
  { Makes sure a "can write" notification for the handle of BlockingStream
    is installed; does nothing if one is already registered. }
  procedure TAsyncWriteStream.WantWrite;
  begin
    if Assigned(NotifyHandle) then
      exit;
    NotifyHandle := EventLoop.SetCanWriteNotify(FBlockingStream.Handle,
      @CanWrite, nil);
  end;
  { Adds nothing of its own: simply runs the inherited handling, which
    fires OnBufferEmpty. }
  procedure TAsyncWriteStream.BufferEmpty;
  begin
    inherited BufferEmpty;
  end;
  { Event loop callback for the "can write" notification. While data is
    buffered it is written via Run. Once the buffer is empty the
    notification is removed, the buffer is marked as sent and OnBufferSent
    is fired. Finally the instance frees itself if StopAndFree was
    requested. UserData is not used. }
  procedure TAsyncWriteStream.CanWrite(UserData: TObject);
  begin
    if FBytesInBuffer <> 0 then
      Run
    else
    begin
      if Assigned(NotifyHandle) then
      begin
        EventLoop.ClearCanWriteNotify(NotifyHandle);
        NotifyHandle := nil;
      end;
      FBufferSent := True;
      if Assigned(FOnBufferSent) then
        FOnBufferSent(Self);
    end;

    if DoStopAndFree then
      Free;
  end;
  { Convenience constructor: AStream serves both as the stream the data is
    written to and as the stream whose handle is watched by the event
    loop. }
  constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop;
    AStream: THandleStream);
  begin
    // Delegate to the two-stream constructor
    Self.Create(AEventLoop, AStream, AStream);
  end;
  { Creates a write stream that writes to ADataStream and waits on the
    handle of ABlockingStream. Both streams must be assigned (checked by
    assertion). No notification is installed here; that happens in
    WantWrite. }
  constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop;
    ADataStream: TStream; ABlockingStream: THandleStream);
  begin
    ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));
    inherited Create;

    FBlockingStream := ABlockingStream;
    FDataStream := ADataStream;
    FEventLoop := AEventLoop;
  end;
  { Removes the "can write" notification if one is still installed, then
    destroys the buffer. }
  destructor TAsyncWriteStream.Destroy;
  begin
    if NotifyHandle <> nil then
      EventLoop.ClearCanWriteNotify(NotifyHandle);
    inherited Destroy;
  end;
  { Unregisters the stream from the event loop and flags it for
    destruction; CanWrite frees the instance when it sees the flag. }
  procedure TAsyncWriteStream.StopAndFree;
  begin
    if NotifyHandle <> nil then
    begin
      EventLoop.ClearCanWriteNotify(NotifyHandle);
      NotifyHandle := nil;
    end;
    DoStopAndFree := True;
  end;
  701. end.
  702. {
  703. $Log$
  704. Revision 1.4 2003-08-03 21:18:40 sg
  705. * Added TWriteBuffer.OnBufferSent and made this and OnBufferEmpty
  706. working correctly
  707. Revision 1.3 2003/06/25 08:41:01 sg
  708. * Fixed serious bug in TGenericLineReader: When the reader gets killed
  709. via StopAndFree during an OnLine callback, the reader now will
  710. immediately stop reading, so that the owner of the reader can process
  711. the remaining buffer
  712. Revision 1.2 2002/04/25 19:12:27 sg
  713. * Added ability to write all write buffer data to a debug stream
  714. * Added TAsyncWriteStream.StopAndFree
  715. Revision 1.1 2003/03/17 22:25:32 michael
  716. + Async moved from package to FCL
  717. Revision 1.3 2002/09/15 15:45:38 sg
  718. * Added stream line reader classes
  719. Revision 1.2 2002/09/07 15:42:57 peter
  720. * old logs removed and tabs fixed
  721. Revision 1.1 2002/01/29 17:55:02 peter
  722. * split into base and extra
  723. }