fpasync.pp 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895
  1. {
  2. $Id$
  3. fpAsync: Asynchronous event management for Free Pascal
  4. Copyright (C) 2001-2003 by
  5. Areca Systems GmbH / Sebastian Guenther, [email protected]
  6. See the file COPYING.FPC, included in this distribution,
  7. for details about the copyright.
  8. This program is distributed in the hope that it will be useful,
  9. but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  11. }
  12. unit fpAsync;
  13. {$MODE objfpc}
  14. {$H+}
  15. interface
  16. uses SysUtils, Classes, libasync;
  17. type
  18. TNotifyEvent = procedure(Sender: TObject) of object;
  19. EAsyncError = class(Exception)
  20. private
  21. FErrorCode: TAsyncResult;
  22. public
  23. constructor Create(AErrorCode: TAsyncResult);
  24. property ErrorCode: TAsyncResult read FErrorCode;
  25. end;
  26. TEventLoop = class
  27. private
  28. FData: TAsyncData;
  29. FFirstNotifyData: Pointer;
  30. function GetIsRunning: Boolean;
  31. procedure SetIsRunning(AIsRunning: Boolean);
  32. protected
  33. procedure CheckResult(AResultCode: TAsyncResult);
  34. public
  35. constructor Create;
  36. destructor Destroy; override;
  37. function Handle: TAsyncHandle;
  38. // Main loop control
  39. procedure Run;
  40. procedure Break;
  41. // Timer support
  42. function AddTimerCallback(AMSec: LongInt; APeriodic: Boolean;
  43. ACallback: TAsyncCallback; AUserData: Pointer): TAsyncTimer;
  44. procedure RemoveTimerCallback(ATimer: TAsyncTimer);
  45. function AddTimerNotify(AMSec: LongInt; APeriodic: Boolean;
  46. ANotify: TNotifyEvent; ASender: TObject): Pointer;
  47. procedure RemoveTimerNotify(AHandle: Pointer);
  48. // I/O notification support (for files, sockets etc.)
  49. procedure SetIOCallback(AHandle: Integer; ACallback: TAsyncCallback;
  50. AUserData: Pointer);
  51. procedure ClearIOCallback(AHandle: Integer);
  52. function SetIONotify(AHandle: Integer; ANotify: TNotifyEvent;
  53. ASender: TObject): Pointer;
  54. procedure ClearIONotify(AHandle: Pointer);
  55. procedure SetDataAvailableCallback(AHandle: Integer;
  56. ACallback: TAsyncCallback; AUserData: Pointer);
  57. procedure ClearDataAvailableCallback(AHandle: Integer);
  58. function SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent;
  59. ASender: TObject): Pointer;
  60. procedure ClearDataAvailableNotify(AHandle: Pointer);
  61. procedure SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback;
  62. AUserData: Pointer);
  63. procedure ClearCanWriteCallback(AHandle: Integer);
  64. function SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent;
  65. ASender: TObject): Pointer;
  66. procedure ClearCanWriteNotify(AHandle: Pointer);
  67. class function TimerTicks: Int64;
  68. // Properties
  69. property IsRunning: Boolean read GetIsRunning write SetIsRunning;
  70. end;
  71. // -------------------------------------------------------------------
  72. // Asynchronous line reader
  73. // -------------------------------------------------------------------
  74. TLineNotify = procedure(const ALine: String) of object;
  75. TGenericLineReader = class
  76. protected
  77. RealBuffer, FBuffer: PChar;
  78. FBytesInBuffer: Integer;
  79. FOnLine: TLineNotify;
  80. InCallback, DoStopAndFree: Boolean;
  81. function Read(var ABuffer; count: Integer): Integer; virtual; abstract;
  82. procedure NoData; virtual; abstract;
  83. public
  84. destructor Destroy; override;
  85. procedure Run; // Process as many lines as possible
  86. property Buffer: PChar read FBuffer;
  87. property BytesInBuffer: Integer read FBytesInBuffer;
  88. property OnLine: TLineNotify read FOnLine write FOnLine;
  89. end;
  90. TAsyncStreamLineReader = class(TGenericLineReader)
  91. protected
  92. FEventLoop: TEventLoop;
  93. FDataStream: TStream;
  94. FBlockingStream: THandleStream;
  95. FOnEOF: TNotifyEvent;
  96. NotifyHandle: Pointer;
  97. function Read(var ABuffer; count: Integer): Integer; override;
  98. procedure NoData; override;
  99. procedure StreamDataAvailable(UserData: TObject);
  100. public
  101. constructor Create(AEventLoop: TEventLoop; AStream: THandleStream);
  102. constructor Create(AEventLoop: TEventLoop; ADataStream: TStream;
  103. ABlockingStream: THandleStream);
  104. destructor Destroy; override;
  105. procedure StopAndFree; // Destroy instance after run
  106. property EventLoop: TEventLoop read FEventLoop;
  107. property DataStream: TStream read FDataStream;
  108. property BlockingStream: THandleStream read FBlockingStream;
  109. property OnEOF: TNotifyEvent read FOnEOF write FOnEOF;
  110. end;
  111. // -------------------------------------------------------------------
  112. // Asynchronous write buffers
  113. // -------------------------------------------------------------------
  114. TWriteBuffer = class(TStream)
  115. protected
  116. FBuffer: PChar;
  117. FBytesInBuffer: Integer;
  118. FBufferSent: Boolean;
  119. FOnBufferEmpty: TNotifyEvent;
  120. FOnBufferSent: TNotifyEvent;
  121. InCallback: Boolean;
  122. function Seek(Offset: LongInt; Origin: Word): LongInt; override;
  123. function Write(const ABuffer; Count: LongInt): LongInt; override;
  124. function DoRealWrite(const ABuffer; Count: Integer): Integer; virtual; abstract;
  125. procedure WritingFailed; virtual; abstract;
  126. procedure WantWrite; virtual; abstract;
  127. procedure BufferEmpty; virtual;
  128. public
  129. EndOfLineMarker: String;
  130. constructor Create;
  131. destructor Destroy; override;
  132. procedure WriteLine(const line: String);
  133. procedure Run; // Write as many data as possible
  134. property BytesInBuffer: Integer read FBytesInBuffer;
  135. property BufferSent: Boolean read FBufferSent;
  136. property OnBufferEmpty: TNotifyEvent read FOnBufferEmpty write FOnBufferEmpty;
  137. property OnBufferSent: TNotifyEvent read FOnBufferSent write FOnBufferSent;
  138. end;
  139. TAsyncWriteStream = class(TWriteBuffer)
  140. protected
  141. FEventLoop: TEventLoop;
  142. FDataStream: TStream;
  143. FBlockingStream: THandleStream;
  144. NotifyHandle: Pointer;
  145. DoStopAndFree: Boolean;
  146. function DoRealWrite(const ABuffer; Count: Integer): Integer; override;
  147. procedure WritingFailed; override;
  148. procedure WantWrite; override;
  149. procedure CanWrite(UserData: TObject);
  150. public
  151. constructor Create(AEventLoop: TEventLoop; AStream: THandleStream);
  152. constructor Create(AEventLoop: TEventLoop;
  153. ADataStream: TStream; ABlockingStream: THandleStream);
  154. destructor Destroy; override;
  155. procedure StopAndFree; // Destroy instance after run
  156. property EventLoop: TEventLoop read FEventLoop;
  157. property DataStream: TStream read FDataStream;
  158. property BlockingStream: THandleStream read FBlockingStream;
  159. end;
  160. var
  161. { All data written to a TWriteBuffer or descendant class will be written to
  162. this stream as well: }
  163. fpAsyncWriteBufferDebugStream: TStream;
  164. implementation
  165. type
  166. PNotifyData = ^TNotifyData;
  167. TNotifyData = record
  168. Next: PNotifyData;
  169. Notify: TNotifyEvent;
  170. Sender: TObject;
  171. case Boolean of
  172. False: (TimerHandle: TAsyncTimer);
  173. True: (FileHandle: LongInt);
  174. end;
  175. procedure EventHandler(Data: Pointer); cdecl;
  176. begin
  177. with PNotifyData(Data)^ do
  178. Notify(Sender);
  179. end;
  180. function AddNotifyData(Obj: TEventLoop): PNotifyData;
  181. begin
  182. New(Result);
  183. Result^.Next := PNotifyData(Obj.FFirstNotifyData);
  184. Obj.FFirstNotifyData := Result;
  185. end;
  186. procedure FreeNotifyData(Obj: TEventLoop; Data: PNotifyData);
  187. var
  188. CurData, PrevData, NextData: PNotifyData;
  189. begin
  190. PrevData := nil;
  191. CurData := Obj.FFirstNotifyData;
  192. while Assigned(CurData) do
  193. begin
  194. NextData := CurData^.Next;
  195. if CurData = Data then
  196. if Assigned(PrevData) then
  197. PrevData^.Next := NextData
  198. else
  199. Obj.FFirstNotifyData := NextData;
  200. PrevData := CurData;
  201. CurData := NextData;
  202. end;
  203. Dispose(Data);
  204. end;
  205. constructor EAsyncError.Create(AErrorCode: TAsyncResult);
  206. begin
  207. inherited Create(Format('Async I/O error %d', [Ord(AErrorCode)]));
  208. FErrorCode := AErrorCode;
  209. end;
  210. constructor TEventLoop.Create;
  211. begin
  212. asyncInit(Handle);
  213. end;
  214. destructor TEventLoop.Destroy;
  215. var
  216. NotifyData, NextNotifyData: PNotifyData;
  217. begin
  218. asyncFree(Handle);
  219. NotifyData := FFirstNotifyData;
  220. while Assigned(NotifyData) do
  221. begin
  222. NextNotifyData := NotifyData^.Next;
  223. Dispose(NotifyData);
  224. NotifyData := NextNotifyData;
  225. end;
  226. end;
  227. function TEventLoop.Handle: TAsyncHandle;
  228. begin
  229. Result := TAsyncHandle(Self);
  230. end;
  231. procedure TEventLoop.Run;
  232. begin
  233. asyncRun(Handle);
  234. end;
  235. procedure TEventLoop.Break;
  236. begin
  237. asyncBreak(Handle);
  238. end;
  239. function TEventLoop.AddTimerCallback(AMSec: LongInt; APeriodic: Boolean;
  240. ACallback: TAsyncCallback; AUserData: Pointer): TAsyncTimer;
  241. begin
  242. Result := asyncAddTimer(Handle, AMSec, APeriodic, ACallback, AUserData);
  243. end;
  244. procedure TEventLoop.RemoveTimerCallback(ATimer: TAsyncTimer);
  245. begin
  246. asyncRemoveTimer(Handle, ATimer);
  247. end;
  248. function TEventLoop.AddTimerNotify(AMSec: LongInt; APeriodic: Boolean;
  249. ANotify: TNotifyEvent; ASender: TObject): Pointer;
  250. var
  251. UserData: PNotifyData;
  252. begin
  253. UserData := AddNotifyData(Self);
  254. UserData^.Notify := ANotify;
  255. UserData^.Sender := ASender;
  256. UserData^.TimerHandle :=
  257. asyncAddTimer(Handle, AMSec, APeriodic, @EventHandler, UserData);
  258. Result := UserData;
  259. end;
  260. procedure TEventLoop.RemoveTimerNotify(AHandle: Pointer);
  261. var
  262. Data: PNotifyData;
  263. begin
  264. Data := PNotifyData(AHandle);
  265. asyncRemoveTimer(Handle, Data^.TimerHandle);
  266. FreeNotifyData(Self, Data);
  267. end;
  268. procedure TEventLoop.SetIOCallback(AHandle: Integer; ACallback: TAsyncCallback;
  269. AUserData: Pointer);
  270. begin
  271. CheckResult(asyncSetIOCallback(Handle, AHandle, ACallback, AUserData));
  272. end;
  273. procedure TEventLoop.ClearIOCallback(AHandle: Integer);
  274. begin
  275. asyncClearIOCallback(Handle, AHandle);
  276. end;
  277. function TEventLoop.SetIONotify(AHandle: Integer; ANotify: TNotifyEvent;
  278. ASender: TObject): Pointer;
  279. var
  280. UserData: PNotifyData;
  281. ResultCode: TAsyncResult;
  282. begin
  283. UserData := AddNotifyData(Self);
  284. UserData^.Notify := ANotify;
  285. UserData^.Sender := ASender;
  286. UserData^.FileHandle := AHandle;
  287. ResultCode := asyncSetIOCallback(Handle, AHandle, @EventHandler, UserData);
  288. if ResultCode <> asyncOK then
  289. begin
  290. FreeNotifyData(Self, UserData);
  291. raise EAsyncError.Create(ResultCode);
  292. end else
  293. Result := UserData;
  294. {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetIONotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
  295. end;
  296. procedure TEventLoop.ClearIONotify(AHandle: Pointer);
  297. var
  298. Data: PNotifyData;
  299. begin
  300. Data := PNotifyData(AHandle);
  301. {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearIONotify: Filehandle=', Data^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
  302. asyncClearIOCallback(Handle, Data^.FileHandle);
  303. FreeNotifyData(Self, Data);
  304. end;
  305. procedure TEventLoop.SetDataAvailableCallback(AHandle: Integer; ACallback: TAsyncCallback;
  306. AUserData: Pointer);
  307. begin
  308. CheckResult(asyncSetDataAvailableCallback(Handle, AHandle,
  309. ACallback, AUserData));
  310. end;
  311. procedure TEventLoop.ClearDataAvailableCallback(AHandle: Integer);
  312. begin
  313. asyncClearDataAvailableCallback(Handle, AHandle);
  314. end;
  315. function TEventLoop.SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent;
  316. ASender: TObject): Pointer;
  317. var
  318. UserData: PNotifyData;
  319. ResultCode: TAsyncResult;
  320. begin
  321. UserData := AddNotifyData(Self);
  322. UserData^.Notify := ANotify;
  323. UserData^.Sender := ASender;
  324. UserData^.FileHandle := AHandle;
  325. ResultCode := asyncSetDataAvailableCallback(Handle, AHandle,
  326. @EventHandler, UserData);
  327. if ResultCode <> asyncOK then
  328. begin
  329. FreeNotifyData(Self, UserData);
  330. raise EAsyncError.Create(ResultCode);
  331. end else
  332. Result := UserData;
  333. {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetDataAvailableNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
  334. end;
  335. procedure TEventLoop.ClearDataAvailableNotify(AHandle: Pointer);
  336. var
  337. Data: PNotifyData;
  338. begin
  339. Data := PNotifyData(AHandle);
  340. {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearDataAvailableNotify: Filehandle=', Data^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
  341. asyncClearDataAvailableCallback(Handle, Data^.FileHandle);
  342. FreeNotifyData(Self, Data);
  343. end;
  344. procedure TEventLoop.SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback;
  345. AUserData: Pointer);
  346. begin
  347. CheckResult(asyncSetCanWriteCallback(Handle, AHandle, ACallback, AUserData));
  348. end;
  349. procedure TEventLoop.ClearCanWriteCallback(AHandle: Integer);
  350. begin
  351. asyncClearCanWriteCallback(Handle, AHandle);
  352. end;
  353. function TEventLoop.SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent;
  354. ASender: TObject): Pointer;
  355. var
  356. UserData: PNotifyData;
  357. ResultCode: TAsyncResult;
  358. begin
  359. UserData := AddNotifyData(Self);
  360. UserData^.Notify := ANotify;
  361. UserData^.Sender := ASender;
  362. UserData^.FileHandle := AHandle;
  363. ResultCode := asyncSetCanWriteCallback(Handle, AHandle,
  364. @EventHandler, UserData);
  365. if ResultCode <> asyncOK then
  366. begin
  367. FreeNotifyData(Self, UserData);
  368. raise EAsyncError.Create(ResultCode);
  369. end else
  370. Result := UserData;
  371. {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetCanWriteNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
  372. end;
  373. procedure TEventLoop.ClearCanWriteNotify(AHandle: Pointer);
  374. var
  375. Data: PNotifyData;
  376. begin
  377. Data := PNotifyData(AHandle);
  378. {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearCanWriteNotify: Filehandle=', Data^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
  379. asyncClearCanWriteCallback(Handle, Data^.FileHandle);
  380. FreeNotifyData(Self, Data);
  381. end;
  382. class function TEventLoop.TimerTicks: Int64;
  383. begin
  384. Result := asyncGetTicks;
  385. end;
  386. procedure TEventLoop.CheckResult(AResultCode: TAsyncResult);
  387. begin
  388. if AResultCode <> asyncOK then
  389. raise EAsyncError.Create(AResultCode);
  390. end;
  391. function TEventLoop.GetIsRunning: Boolean;
  392. begin
  393. Result := asyncIsRunning(Handle);
  394. end;
  395. procedure TEventLoop.SetIsRunning(AIsRunning: Boolean);
  396. begin
  397. if IsRunning then
  398. begin
  399. if not AIsRunning then
  400. Run;
  401. end else
  402. if AIsRunning then
  403. Break;
  404. end;
  405. // -------------------------------------------------------------------
  406. // TGenericLineReader
  407. // -------------------------------------------------------------------
  408. destructor TGenericLineReader.Destroy;
  409. begin
  410. if Assigned(RealBuffer) then
  411. begin
  412. FreeMem(RealBuffer);
  413. RealBuffer := nil;
  414. end;
  415. inherited Destroy;
  416. end;
  417. procedure TGenericLineReader.Run;
  418. var
  419. NewData: array[0..1023] of Byte;
  420. p: PChar;
  421. BytesRead, OldBufSize, CurBytesInBuffer, LastEndOfLine, i, LineLength: Integer;
  422. line: String;
  423. FirstRun: Boolean;
  424. begin
  425. FirstRun := True;
  426. while True do
  427. begin
  428. BytesRead := Read(NewData, SizeOf(NewData));
  429. //WriteLn('Linereader: ', BytesRead, ' bytes read');
  430. if BytesRead <= 0 then
  431. begin
  432. if FirstRun then
  433. NoData;
  434. break;
  435. end;
  436. FirstRun := False;
  437. OldBufSize := FBytesInBuffer;
  438. // Append the new received data to the read buffer
  439. Inc(FBytesInBuffer, BytesRead);
  440. ReallocMem(RealBuffer, FBytesInBuffer);
  441. Move(NewData, RealBuffer[OldBufSize], BytesRead);
  442. {Process all potential lines in the current buffer. Attention: FBuffer and
  443. FBytesInBuffer MUST be updated for each line, as they can be accessed from
  444. within the FOnLine handler!}
  445. LastEndOfLine := 0;
  446. if OldBufSize > 0 then
  447. i := OldBufSize - 1
  448. else
  449. i := 0;
  450. CurBytesInBuffer := FBytesInBuffer;
  451. while i <= CurBytesInBuffer - 2 do
  452. begin
  453. if (RealBuffer[i] = #13) or (RealBuffer[i] = #10) then
  454. begin
  455. LineLength := i - LastEndOfLine;
  456. SetLength(line, LineLength);
  457. if LineLength > 0 then
  458. Move(RealBuffer[LastEndOfLine], line[1], LineLength);
  459. if ((RealBuffer[i] = #13) and (RealBuffer[i + 1] = #10)) or
  460. ((RealBuffer[i] = #10) and (RealBuffer[i + 1] = #13)) then
  461. Inc(i);
  462. LastEndOfLine := i + 1;
  463. if Assigned(FOnLine) then
  464. begin
  465. FBuffer := RealBuffer + LastEndOfLine;
  466. FBytesInBuffer := CurBytesInBuffer - LastEndOfLine;
  467. InCallback := True;
  468. try
  469. FOnLine(line);
  470. finally
  471. InCallback := False;
  472. end;
  473. // Check if <this> has been destroyed by FOnLine:
  474. if DoStopAndFree then
  475. exit;
  476. end;
  477. end;
  478. Inc(i);
  479. end;
  480. FBytesInBuffer := CurBytesInBuffer;
  481. if LastEndOfLine > 0 then
  482. begin
  483. // Remove all processed lines from the buffer
  484. Dec(FBytesInBuffer, LastEndOfLine);
  485. GetMem(p, FBytesInBuffer);
  486. Move(RealBuffer[LastEndOfLine], p^, FBytesInBuffer);
  487. if Assigned(RealBuffer) then
  488. FreeMem(RealBuffer);
  489. RealBuffer := p;
  490. end;
  491. FBuffer := RealBuffer;
  492. end;
  493. end;
  494. // -------------------------------------------------------------------
  495. // TAsyncStreamLineReader
  496. // -------------------------------------------------------------------
  497. constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop;
  498. AStream: THandleStream);
  499. begin
  500. Self.Create(AEventLoop, AStream, AStream);
  501. end;
  502. constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop;
  503. ADataStream: TStream; ABlockingStream: THandleStream);
  504. begin
  505. ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));
  506. inherited Create;
  507. FEventLoop := AEventLoop;
  508. FDataStream := ADataStream;
  509. FBlockingStream := ABlockingStream;
  510. NotifyHandle := EventLoop.SetDataAvailableNotify(
  511. FBlockingStream.Handle, @StreamDataAvailable, nil);
  512. end;
  513. destructor TAsyncStreamLineReader.Destroy;
  514. begin
  515. inherited Destroy;
  516. end;
  517. procedure TAsyncStreamLineReader.StopAndFree;
  518. begin
  519. if InCallback then
  520. begin
  521. if Assigned(NotifyHandle) then
  522. begin
  523. EventLoop.ClearDataAvailableNotify(NotifyHandle);
  524. NotifyHandle := nil;
  525. end;
  526. DoStopAndFree := True;
  527. end else
  528. Self.Free;
  529. end;
  530. function TAsyncStreamLineReader.Read(var ABuffer; count: Integer): Integer;
  531. begin
  532. Result := FDataStream.Read(ABuffer, count);
  533. end;
  534. procedure TAsyncStreamLineReader.NoData;
  535. var
  536. s: String;
  537. begin
  538. if (FDataStream = FBlockingStream) or (FDataStream.Position = FDataStream.Size) then
  539. begin
  540. if (FBytesInBuffer > 0) and Assigned(FOnLine) then
  541. begin
  542. if FBuffer[FBytesInBuffer - 1] in [#13, #10] then
  543. Dec(FBytesInBuffer);
  544. SetLength(s, FBytesInBuffer);
  545. Move(FBuffer^, s[1], FBytesInBuffer);
  546. FOnLine(s);
  547. end;
  548. EventLoop.ClearDataAvailableNotify(NotifyHandle);
  549. NotifyHandle := nil;
  550. if Assigned(FOnEOF) then
  551. begin
  552. InCallback := True;
  553. try
  554. FOnEOF(Self);
  555. finally
  556. InCallback := False;
  557. end;
  558. end;
  559. end;
  560. end;
  561. procedure TAsyncStreamLineReader.StreamDataAvailable(UserData: TObject);
  562. begin
  563. Run;
  564. if DoStopAndFree then
  565. Free;
  566. end;
  567. // -------------------------------------------------------------------
  568. // TWriteBuffer
  569. // -------------------------------------------------------------------
  570. procedure TWriteBuffer.BufferEmpty;
  571. begin
  572. if Assigned(FOnBufferEmpty) then
  573. begin
  574. InCallback := True;
  575. FOnBufferEmpty(Self);
  576. InCallback := False;
  577. end;
  578. end;
  579. constructor TWriteBuffer.Create;
  580. begin
  581. inherited Create;
  582. FBuffer := nil;
  583. FBytesInBuffer := 0;
  584. EndOfLineMarker := #10;
  585. end;
  586. destructor TWriteBuffer.Destroy;
  587. begin
  588. if Assigned(FBuffer) then
  589. FreeMem(FBuffer);
  590. inherited Destroy;
  591. end;
  592. function TWriteBuffer.Seek(Offset: LongInt; Origin: Word): LongInt;
  593. begin
  594. if ((Offset = 0) and ((Origin = soFromCurrent) or (Origin = soFromEnd))) or
  595. ((Offset = FBytesInBuffer) and (Origin = soFromBeginning)) then
  596. Result := FBytesInBuffer
  597. else
  598. // !!!: No i18n for this string - solve this problem in the FCL?!?
  599. raise EStreamError.Create('Invalid stream operation');
  600. end;
  601. function TWriteBuffer.Write(const ABuffer; Count: LongInt): LongInt;
  602. begin
  603. if Count > 0 then
  604. begin
  605. FBufferSent := False;
  606. ReallocMem(FBuffer, FBytesInBuffer + Count);
  607. Move(ABuffer, FBuffer[FBytesInBuffer], Count);
  608. Inc(FBytesInBuffer, Count);
  609. if Assigned(fpAsyncWriteBufferDebugStream) then
  610. fpAsyncWriteBufferDebugStream.Write(ABuffer, Count);
  611. WantWrite;
  612. end;
  613. Result := Count;
  614. end;
  615. procedure TWriteBuffer.WriteLine(const line: String);
  616. var
  617. s: String;
  618. begin
  619. s := line + EndOfLineMarker;
  620. WriteBuffer(s[1], Length(s));
  621. end;
  622. procedure TWriteBuffer.Run;
  623. var
  624. Written: Integer;
  625. NewBuf: PChar;
  626. Failed: Boolean;
  627. begin
  628. Failed := True;
  629. repeat
  630. if FBytesInBuffer = 0 then
  631. begin
  632. BufferEmpty;
  633. if FBufferSent then
  634. exit;
  635. WantWrite;
  636. exit;
  637. end;
  638. Written := DoRealWrite(FBuffer[0], FBytesInBuffer);
  639. if Written > 0 then
  640. begin
  641. Failed := False;
  642. Dec(FBytesInBuffer, Written);
  643. GetMem(NewBuf, FBytesInBuffer);
  644. Move(FBuffer[Written], NewBuf[0], FBytesInBuffer);
  645. FreeMem(FBuffer);
  646. FBuffer := NewBuf;
  647. end;
  648. until Written <= 0;
  649. if Failed then
  650. WritingFailed;
  651. end;
  652. // -------------------------------------------------------------------
  653. // TAsyncWriteStream
  654. // -------------------------------------------------------------------
  655. function TAsyncWriteStream.DoRealWrite(const ABuffer; Count: Integer): Integer;
  656. begin
  657. Result := FDataStream.Write(ABuffer, count);
  658. end;
  659. procedure TAsyncWriteStream.WritingFailed;
  660. begin
  661. if (FDataStream <> FBlockingStream) and Assigned(NotifyHandle) then
  662. begin
  663. EventLoop.ClearCanWriteNotify(NotifyHandle);
  664. NotifyHandle := nil;
  665. end;
  666. end;
  667. procedure TAsyncWriteStream.WantWrite;
  668. begin
  669. if not Assigned(NotifyHandle) then
  670. NotifyHandle := EventLoop.SetCanWriteNotify(FBlockingStream.Handle,
  671. @CanWrite, nil);
  672. end;
  673. procedure TAsyncWriteStream.CanWrite(UserData: TObject);
  674. begin
  675. if FBytesInBuffer = 0 then
  676. begin
  677. if Assigned(NotifyHandle) then
  678. begin
  679. EventLoop.ClearCanWriteNotify(NotifyHandle);
  680. NotifyHandle := nil;
  681. end;
  682. FBufferSent := True;
  683. if Assigned(FOnBufferSent) then
  684. begin
  685. InCallback := True;
  686. try
  687. FOnBufferSent(Self);
  688. finally
  689. InCallback := False;
  690. end;
  691. end;
  692. end else
  693. Run;
  694. if DoStopAndFree then
  695. Free;
  696. end;
  697. constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop;
  698. AStream: THandleStream);
  699. begin
  700. Self.Create(AEventLoop, AStream, AStream);
  701. end;
  702. constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop;
  703. ADataStream: TStream; ABlockingStream: THandleStream);
  704. begin
  705. ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));
  706. inherited Create;
  707. FEventLoop := AEventLoop;
  708. FDataStream := ADataStream;
  709. FBlockingStream := ABlockingStream;
  710. end;
  711. destructor TAsyncWriteStream.Destroy;
  712. begin
  713. if Assigned(NotifyHandle) then
  714. EventLoop.ClearCanWriteNotify(NotifyHandle);
  715. inherited Destroy;
  716. end;
  717. procedure TAsyncWriteStream.StopAndFree;
  718. begin
  719. if InCallback then
  720. begin
  721. if Assigned(NotifyHandle) then
  722. begin
  723. EventLoop.ClearCanWriteNotify(NotifyHandle);
  724. NotifyHandle := nil;
  725. end;
  726. DoStopAndFree := True;
  727. end else
  728. Self.Free;
  729. end;
  730. end.
  731. {
  732. $Log$
  733. Revision 1.6 2004-02-02 16:50:44 sg
  734. * Destroying a line reader or write buffer now works within an event
  735. handler of these classes
  736. Revision 1.5 2003/11/22 11:46:40 sg
  737. * Removed TAsyncWriteStream.BufferEmpty (not needed anymore)
  738. Revision 1.4 2003/08/03 21:18:40 sg
  739. * Added TWriteBuffer.OnBufferSent and made this and OnBufferEmpty
  740. working correctly
  741. Revision 1.3 2003/06/25 08:41:01 sg
  742. * Fixed serious bug in TGenericLineReader: When the reader gets killed
  743. via StopAndFree during an OnLine callback, the reader now will
  744. immediately stop reading, so that the owner of the reader can process
  745. the remaining buffer
  746. Revision 1.2 2002/04/25 19:12:27 sg
  747. * Added ability to write all write buffer data to an debug stream
  748. * Added TAsyncWriteStream.StopAndFree
  749. Revision 1.1 2003/03/17 22:25:32 michael
  750. + Async moved from package to FCL
  751. Revision 1.3 2002/09/15 15:45:38 sg
  752. * Added stream line reader classes
  753. Revision 1.2 2002/09/07 15:42:57 peter
  754. * old logs removed and tabs fixed
  755. Revision 1.1 2002/01/29 17:55:02 peter
  756. * splitted to base and extra
  757. }