fpasync.pp 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831
  1. {
  2. $Id$
  3. fpAsync: Asynchronous event management for Free Pascal
  4. Copyright (C) 2001-2003 by
  5. Areca Systems GmbH / Sebastian Guenther, [email protected]
  6. See the file COPYING.FPC, included in this distribution,
  7. for details about the copyright.
  8. This program is distributed in the hope that it will be useful,
  9. but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  11. }
  12. unit fpAsync;
  13. {$MODE objfpc}
  14. {$H+}
  15. interface
  16. uses SysUtils, Classes, libasync;
  type
    { Method-pointer signature used for all "notify" style callbacks of this
      unit: the handler receives the sender object that was registered with it. }
    TNotifyEvent = procedure(Sender: TObject) of object;

    { Exception carrying the TAsyncResult code it was created with. }
    EAsyncError = class(Exception)
    private
      FErrorCode: TAsyncResult;
    public
      { Builds the exception message from the numeric value of AErrorCode and
        stores the code for later inspection. }
      constructor Create(AErrorCode: TAsyncResult);
      { The result code passed to the constructor. }
      property ErrorCode: TAsyncResult read FErrorCode;
    end;
  26. TEventLoop = class
  27. private
  28. FData: TAsyncData;
  29. FFirstNotifyData: Pointer;
  30. function GetIsRunning: Boolean;
  31. procedure SetIsRunning(AIsRunning: Boolean);
  32. protected
  33. procedure CheckResult(AResultCode: TAsyncResult);
  34. public
  35. constructor Create;
  36. destructor Destroy; override;
  37. function Handle: TAsyncHandle;
  38. // Main loop control
  39. procedure Run;
  40. procedure Break;
  41. // Timer support
  42. function AddTimerCallback(AMSec: LongInt; APeriodic: Boolean;
  43. ACallback: TAsyncCallback; AUserData: Pointer): TAsyncTimer;
  44. procedure RemoveTimerCallback(ATimer: TAsyncTimer);
  45. function AddTimerNotify(AMSec: LongInt; APeriodic: Boolean;
  46. ANotify: TNotifyEvent; ASender: TObject): Pointer;
  47. procedure RemoveTimerNotify(AHandle: Pointer);
  48. // I/O notification support (for files, sockets etc.)
  49. procedure SetIOCallback(AHandle: Integer; ACallback: TAsyncCallback;
  50. AUserData: Pointer);
  51. procedure ClearIOCallback(AHandle: Integer);
  52. function SetIONotify(AHandle: Integer; ANotify: TNotifyEvent;
  53. ASender: TObject): Pointer;
  54. procedure ClearIONotify(AHandle: Pointer);
  55. procedure SetDataAvailableCallback(AHandle: Integer;
  56. ACallback: TAsyncCallback; AUserData: Pointer);
  57. procedure ClearDataAvailableCallback(AHandle: Integer);
  58. function SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent;
  59. ASender: TObject): Pointer;
  60. procedure ClearDataAvailableNotify(AHandle: Pointer);
  61. procedure SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback;
  62. AUserData: Pointer);
  63. procedure ClearCanWriteCallback(AHandle: Integer);
  64. function SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent;
  65. ASender: TObject): Pointer;
  66. procedure ClearCanWriteNotify(AHandle: Pointer);
  67. class function TimerTicks: Int64;
  68. // Properties
  69. property IsRunning: Boolean read GetIsRunning write SetIsRunning;
  70. end;
  71. // -------------------------------------------------------------------
  72. // Asynchronous line reader
  73. // -------------------------------------------------------------------
  74. TLineNotify = procedure(const ALine: String) of object;
  75. TGenericLineReader = class
  76. protected
  77. RealBuffer, FBuffer: PChar;
  78. FBytesInBuffer: Integer;
  79. FOnLine: TLineNotify;
  80. function Read(var ABuffer; count: Integer): Integer; virtual; abstract;
  81. procedure NoData; virtual; abstract;
  82. public
  83. destructor Destroy; override;
  84. procedure Run; // Process as many lines as possible
  85. property Buffer: PChar read FBuffer;
  86. property BytesInBuffer: Integer read FBytesInBuffer;
  87. property OnLine: TLineNotify read FOnLine write FOnLine;
  88. end;
  89. TAsyncStreamLineReader = class(TGenericLineReader)
  90. protected
  91. FEventLoop: TEventLoop;
  92. FDataStream: TStream;
  93. FBlockingStream: THandleStream;
  94. FOnEOF: TNotifyEvent;
  95. NotifyHandle: Pointer;
  96. DoStopAndFree: Boolean;
  97. function Read(var ABuffer; count: Integer): Integer; override;
  98. procedure NoData; override;
  99. procedure StreamDataAvailable(UserData: TObject);
  100. public
  101. constructor Create(AEventLoop: TEventLoop; AStream: THandleStream);
  102. constructor Create(AEventLoop: TEventLoop; ADataStream: TStream;
  103. ABlockingStream: THandleStream);
  104. destructor Destroy; override;
  105. procedure StopAndFree; // Destroy instance after run
  106. property EventLoop: TEventLoop read FEventLoop;
  107. property DataStream: TStream read FDataStream;
  108. property BlockingStream: THandleStream read FBlockingStream;
  109. property OnEOF: TNotifyEvent read FOnEOF write FOnEOF;
  110. end;
  111. // -------------------------------------------------------------------
  112. // Asynchronous write buffers
  113. // -------------------------------------------------------------------
  114. TWriteBuffer = class(TStream)
  115. protected
  116. FBuffer: PChar;
  117. FBytesInBuffer: Integer;
  118. FOnBufferEmpty: TNotifyEvent;
  119. function Seek(Offset: LongInt; Origin: Word): LongInt; override;
  120. function Write(const ABuffer; Count: LongInt): LongInt; override;
  121. function DoRealWrite(const ABuffer; Count: Integer): Integer; virtual; abstract;
  122. procedure WritingFailed; virtual; abstract;
  123. procedure WantWrite; virtual; abstract;
  124. procedure BufferEmpty; virtual;
  125. public
  126. EndOfLineMarker: String;
  127. constructor Create;
  128. destructor Destroy; override;
  129. procedure WriteLine(const line: String);
  130. procedure Run; // Write as many data as possible
  131. property BytesInBuffer: Integer read FBytesInBuffer;
  132. property OnBufferEmpty: TNotifyEvent read FOnBufferEmpty write FOnBufferEmpty;
  133. end;
  134. TAsyncWriteStream = class(TWriteBuffer)
  135. protected
  136. FEventLoop: TEventLoop;
  137. FDataStream: TStream;
  138. FBlockingStream: THandleStream;
  139. NotifyHandle: Pointer;
  140. DoStopAndFree: Boolean;
  141. function DoRealWrite(const ABuffer; Count: Integer): Integer; override;
  142. procedure WritingFailed; override;
  143. procedure WantWrite; override;
  144. procedure BufferEmpty; override;
  145. procedure CanWrite(UserData: TObject);
  146. public
  147. constructor Create(AEventLoop: TEventLoop; AStream: THandleStream);
  148. constructor Create(AEventLoop: TEventLoop;
  149. ADataStream: TStream; ABlockingStream: THandleStream);
  150. destructor Destroy; override;
  151. procedure StopAndFree; // Destroy instance after run
  152. property EventLoop: TEventLoop read FEventLoop;
  153. property DataStream: TStream read FDataStream;
  154. property BlockingStream: THandleStream read FBlockingStream;
  155. end;
  var
    { Optional debugging tap: while this variable is assigned,
      TWriteBuffer.Write copies every block it receives into this stream in
      addition to queueing it. }
    fpAsyncWriteBufferDebugStream: TStream;
  160. implementation
  type
    PNotifyData = ^TNotifyData;

    { Bookkeeping record behind every *Notify registration of TEventLoop.
      The records form a singly linked list rooted in
      TEventLoop.FFirstNotifyData. }
    TNotifyData = record
      Next: PNotifyData;          // next record in the owning loop's list
      Notify: TNotifyEvent;       // method to invoke
      Sender: TObject;            // argument passed to Notify
      // A record belongs either to a timer or to a file handle registration
      case Boolean of
        False: (TimerHandle: TAsyncTimer);
        True:  (FileHandle: LongInt);
    end;
  { cdecl trampoline registered with libasync for all *Notify registrations:
    interprets Data as a PNotifyData and invokes the stored method pointer
    with the stored sender. }
  procedure EventHandler(Data: Pointer); cdecl;
  var
    Entry: PNotifyData;
  begin
    Entry := PNotifyData(Data);
    Entry^.Notify(Entry^.Sender);
  end;
  { Allocates a new notify record and pushes it onto the front of Obj's
    list. Only the Next field is initialised here; the caller fills in the
    remaining fields. }
  function AddNotifyData(Obj: TEventLoop): PNotifyData;
  var
    Fresh: PNotifyData;
  begin
    New(Fresh);
    Fresh^.Next := PNotifyData(Obj.FFirstNotifyData);
    Obj.FFirstNotifyData := Fresh;
    Result := Fresh;
  end;
  { Unlinks Data from Obj's notify list (if it is found there) and disposes
    it. Data is disposed even when it was not part of the list. }
  procedure FreeNotifyData(Obj: TEventLoop; Data: PNotifyData);
  var
    Before, Walker: PNotifyData;
  begin
    // Locate Data and remember its predecessor
    Before := nil;
    Walker := PNotifyData(Obj.FFirstNotifyData);
    while Assigned(Walker) and (Walker <> Data) do
    begin
      Before := Walker;
      Walker := Walker^.Next;
    end;

    // Walker is either Data (found) or nil (not in the list)
    if Assigned(Walker) then
    begin
      if Assigned(Before) then
        Before^.Next := Walker^.Next
      else
        Obj.FFirstNotifyData := Walker^.Next;
    end;

    Dispose(Data);
  end;
  { Creates the exception with the message 'Async I/O error <n>', where <n>
    is the ordinal value of AErrorCode, and remembers the code. }
  constructor EAsyncError.Create(AErrorCode: TAsyncResult);
  begin
    inherited CreateFmt('Async I/O error %d', [Ord(AErrorCode)]);
    FErrorCode := AErrorCode;
  end;
  { Initialises the underlying libasync state for this loop. }
  constructor TEventLoop.Create;
  begin
    asyncInit(Self.Handle);
  end;
  { Releases the libasync state first, then disposes every notify record
    that is still in this loop's list. }
  destructor TEventLoop.Destroy;
  var
    Entry, Following: PNotifyData;
  begin
    asyncFree(Handle);

    Entry := FFirstNotifyData;
    while Assigned(Entry) do
    begin
      // Fetch the successor before the record is released
      Following := Entry^.Next;
      Dispose(Entry);
      Entry := Following;
    end;
  end;
  { Returns the handle passed to all libasync calls: the instance reference
    itself, cast to TAsyncHandle. }
  function TEventLoop.Handle: TAsyncHandle;
  begin
    Handle := TAsyncHandle(Self);
  end;
  { Forwards to asyncRun for this loop. }
  procedure TEventLoop.Run;
  begin
    asyncRun(Self.Handle);
  end;
  { Forwards to asyncBreak for this loop. }
  procedure TEventLoop.Break;
  begin
    asyncBreak(Self.Handle);
  end;
  { Registers a plain callback timer via asyncAddTimer and returns the timer
    handle libasync hands back. All arguments are passed through unchanged. }
  function TEventLoop.AddTimerCallback(AMSec: LongInt; APeriodic: Boolean;
    ACallback: TAsyncCallback; AUserData: Pointer): TAsyncTimer;
  var
    Timer: TAsyncTimer;
  begin
    Timer := asyncAddTimer(Handle, AMSec, APeriodic, ACallback, AUserData);
    Result := Timer;
  end;
  { Removes a timer previously returned by AddTimerCallback. }
  procedure TEventLoop.RemoveTimerCallback(ATimer: TAsyncTimer);
  begin
    asyncRemoveTimer(Self.Handle, ATimer);
  end;
  { Registers a timer that invokes the method pointer ANotify(ASender).

    AMSec, APeriodic: passed through unchanged to asyncAddTimer.
    ANotify, ASender: handler and the argument it is called with.

    Returns the notify record as an opaque pointer; pass it to
    RemoveTimerNotify to cancel the timer and release the record.

    Fix: the function previously never assigned Result, so callers received
    an undefined pointer and could not remove the timer again. }
  function TEventLoop.AddTimerNotify(AMSec: LongInt; APeriodic: Boolean;
    ANotify: TNotifyEvent; ASender: TObject): Pointer;
  var
    UserData: PNotifyData;
  begin
    UserData := AddNotifyData(Self);
    UserData^.Notify := ANotify;
    UserData^.Sender := ASender;
    // EventHandler unpacks the record and calls Notify(Sender)
    UserData^.TimerHandle :=
      asyncAddTimer(Handle, AMSec, APeriodic, @EventHandler, UserData);
    Result := UserData;
  end;
  { Cancels a timer created with AddTimerNotify: AHandle is treated as the
    notify record, its timer is removed from libasync and the record is
    unlinked and disposed. }
  procedure TEventLoop.RemoveTimerNotify(AHandle: Pointer);
  begin
    asyncRemoveTimer(Handle, PNotifyData(AHandle)^.TimerHandle);
    FreeNotifyData(Self, PNotifyData(AHandle));
  end;
  { Installs a plain I/O callback for AHandle via asyncSetIOCallback.
    Raises EAsyncError when libasync does not report asyncOK. }
  procedure TEventLoop.SetIOCallback(AHandle: Integer; ACallback: TAsyncCallback;
    AUserData: Pointer);
  var
    Status: TAsyncResult;
  begin
    Status := asyncSetIOCallback(Handle, AHandle, ACallback, AUserData);
    CheckResult(Status);
  end;
  { Removes the I/O callback registered for AHandle. }
  procedure TEventLoop.ClearIOCallback(AHandle: Integer);
  begin
    asyncClearIOCallback(Self.Handle, AHandle);
  end;
  { Registers the method pointer ANotify(ASender) as I/O handler for the
    file handle AHandle.

    Returns the notify record as an opaque pointer for ClearIONotify.
    Raises EAsyncError (after releasing the record again) when libasync does
    not report asyncOK. }
  function TEventLoop.SetIONotify(AHandle: Integer; ANotify: TNotifyEvent;
    ASender: TObject): Pointer;
  var
    Entry: PNotifyData;
    Status: TAsyncResult;
  begin
    Entry := AddNotifyData(Self);
    Entry^.Notify := ANotify;
    Entry^.Sender := ASender;
    Entry^.FileHandle := AHandle;

    Status := asyncSetIOCallback(Handle, AHandle, @EventHandler, Entry);
    if Status <> asyncOK then
    begin
      // Registration failed: do not leave the record in the list
      FreeNotifyData(Self, Entry);
      raise EAsyncError.Create(Status);
    end;

    Result := Entry;
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetIONotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
  end;
  { Undoes SetIONotify: clears the I/O callback of the file handle stored in
    the notify record AHandle, then unlinks and disposes the record. }
  procedure TEventLoop.ClearIONotify(AHandle: Pointer);
  var
    Entry: PNotifyData;
  begin
    Entry := PNotifyData(AHandle);
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearIONotify: Filehandle=', Entry^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
    asyncClearIOCallback(Handle, Entry^.FileHandle);
    FreeNotifyData(Self, Entry);
  end;
  { Installs a plain "data available" callback for AHandle via
    asyncSetDataAvailableCallback. Raises EAsyncError when libasync does not
    report asyncOK. }
  procedure TEventLoop.SetDataAvailableCallback(AHandle: Integer; ACallback: TAsyncCallback;
    AUserData: Pointer);
  var
    Status: TAsyncResult;
  begin
    Status := asyncSetDataAvailableCallback(Handle, AHandle, ACallback, AUserData);
    CheckResult(Status);
  end;
  { Removes the "data available" callback registered for AHandle. }
  procedure TEventLoop.ClearDataAvailableCallback(AHandle: Integer);
  begin
    asyncClearDataAvailableCallback(Self.Handle, AHandle);
  end;
  { Registers the method pointer ANotify(ASender) as "data available"
    handler for the file handle AHandle.

    Returns the notify record as an opaque pointer for
    ClearDataAvailableNotify. Raises EAsyncError (after releasing the record
    again) when libasync does not report asyncOK. }
  function TEventLoop.SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent;
    ASender: TObject): Pointer;
  var
    Entry: PNotifyData;
    Status: TAsyncResult;
  begin
    Entry := AddNotifyData(Self);
    Entry^.Notify := ANotify;
    Entry^.Sender := ASender;
    Entry^.FileHandle := AHandle;

    Status := asyncSetDataAvailableCallback(Handle, AHandle, @EventHandler, Entry);
    if Status <> asyncOK then
    begin
      // Registration failed: do not leave the record in the list
      FreeNotifyData(Self, Entry);
      raise EAsyncError.Create(Status);
    end;

    Result := Entry;
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetDataAvailableNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
  end;
  { Undoes SetDataAvailableNotify: clears the "data available" callback of
    the file handle stored in the notify record AHandle, then unlinks and
    disposes the record. }
  procedure TEventLoop.ClearDataAvailableNotify(AHandle: Pointer);
  var
    Entry: PNotifyData;
  begin
    Entry := PNotifyData(AHandle);
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearDataAvailableNotify: Filehandle=', Entry^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
    asyncClearDataAvailableCallback(Handle, Entry^.FileHandle);
    FreeNotifyData(Self, Entry);
  end;
  { Installs a plain "can write" callback for AHandle via
    asyncSetCanWriteCallback. Raises EAsyncError when libasync does not
    report asyncOK. }
  procedure TEventLoop.SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback;
    AUserData: Pointer);
  var
    Status: TAsyncResult;
  begin
    Status := asyncSetCanWriteCallback(Handle, AHandle, ACallback, AUserData);
    CheckResult(Status);
  end;
  { Removes the "can write" callback registered for AHandle. }
  procedure TEventLoop.ClearCanWriteCallback(AHandle: Integer);
  begin
    asyncClearCanWriteCallback(Self.Handle, AHandle);
  end;
  { Registers the method pointer ANotify(ASender) as "can write" handler for
    the file handle AHandle.

    Returns the notify record as an opaque pointer for ClearCanWriteNotify.
    Raises EAsyncError (after releasing the record again) when libasync does
    not report asyncOK. }
  function TEventLoop.SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent;
    ASender: TObject): Pointer;
  var
    Entry: PNotifyData;
    Status: TAsyncResult;
  begin
    Entry := AddNotifyData(Self);
    Entry^.Notify := ANotify;
    Entry^.Sender := ASender;
    Entry^.FileHandle := AHandle;

    Status := asyncSetCanWriteCallback(Handle, AHandle, @EventHandler, Entry);
    if Status <> asyncOK then
    begin
      // Registration failed: do not leave the record in the list
      FreeNotifyData(Self, Entry);
      raise EAsyncError.Create(Status);
    end;

    Result := Entry;
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetCanWriteNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
  end;
  { Undoes SetCanWriteNotify: clears the "can write" callback of the file
    handle stored in the notify record AHandle, then unlinks and disposes
    the record. }
  procedure TEventLoop.ClearCanWriteNotify(AHandle: Pointer);
  var
    Entry: PNotifyData;
  begin
    Entry := PNotifyData(AHandle);
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearCanWriteNotify: Filehandle=', Entry^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
    asyncClearCanWriteCallback(Handle, Entry^.FileHandle);
    FreeNotifyData(Self, Entry);
  end;
  { Returns the current value of asyncGetTicks. }
  class function TEventLoop.TimerTicks: Int64;
  begin
    TimerTicks := asyncGetTicks;
  end;
  { Converts a libasync result code into an exception: does nothing for
    asyncOK, raises EAsyncError carrying the code for everything else. }
  procedure TEventLoop.CheckResult(AResultCode: TAsyncResult);
  begin
    if AResultCode = asyncOK then
      exit;
    raise EAsyncError.Create(AResultCode);
  end;
  { Getter of the IsRunning property: reports what asyncIsRunning returns
    for this loop. }
  function TEventLoop.GetIsRunning: Boolean;
  begin
    GetIsRunning := asyncIsRunning(Self.Handle);
  end;
  { Setter of the IsRunning property.

    Setting the property to True while the loop is not running calls Run;
    setting it to False while the loop is running calls Break. Assigning the
    state the loop already has does nothing.

    Fix: the two actions were swapped (Run was called to stop a running loop
    and Break to start a stopped one). }
  procedure TEventLoop.SetIsRunning(AIsRunning: Boolean);
  begin
    // Nothing to do when the requested state is already in effect
    if AIsRunning = IsRunning then
      exit;

    if AIsRunning then
      Run
    else
      Break;
  end;
  400. // -------------------------------------------------------------------
  401. // TGenericLineReader
  402. // -------------------------------------------------------------------
  { Releases the receive buffer and clears all buffer-related fields.

    FBuffer is cleared as well because Run tests "not Assigned(FBuffer)"
    after every OnLine call to find out whether the handler destroyed the
    reader; previously only RealBuffer was cleared, so that test could never
    succeed.
    NOTE(review): that test in Run still reads a field of an already freed
    instance; it relies on the instance memory staying readable. }
  destructor TGenericLineReader.Destroy;
  begin
    if Assigned(RealBuffer) then
    begin
      FreeMem(RealBuffer);
      RealBuffer := nil;
    end;
    // FBuffer pointed into RealBuffer and must not outlive it
    FBuffer := nil;
    FBytesInBuffer := 0;
    inherited Destroy;
  end;
  { Reads data in chunks of up to 1024 bytes until Read returns <= 0 and
    reports every complete line through OnLine.

    A line ends at #13 or #10; a #13#10 or #10#13 pair counts as a single
    terminator. The last byte of the buffer is never treated as a terminator
    (the byte after it is needed to recognise a pair), so it stays buffered
    until more data arrives. If the very first Read of this call returns no
    data, NoData is invoked. }
  procedure TGenericLineReader.Run;
  var
    Chunk: array[0..1023] of Byte;
    Trimmed: PChar;
    Got, PrevSize, Total, Consumed, Idx, LineLen: Integer;
    CurLine: String;
    NothingReadYet: Boolean;
  begin
    NothingReadYet := True;
    repeat
      Got := Read(Chunk, SizeOf(Chunk));
      //WriteLn('Linereader: ', Got, ' bytes read');
      if Got <= 0 then
      begin
        if NothingReadYet then
          NoData;
        exit;
      end;
      NothingReadYet := False;

      // Append the newly received data to the read buffer
      PrevSize := FBytesInBuffer;
      Inc(FBytesInBuffer, Got);
      ReallocMem(RealBuffer, FBytesInBuffer);
      Move(Chunk, RealBuffer[PrevSize], Got);

      { Process all potential lines in the current buffer. Attention: FBuffer
        and FBytesInBuffer MUST be updated for each line, as they can be
        accessed from within the FOnLine handler! }
      Consumed := 0;
      // Resume at the last old byte: it was skipped as a possible terminator
      if PrevSize > 0 then
        Idx := PrevSize - 1
      else
        Idx := 0;
      Total := FBytesInBuffer;

      while Idx <= Total - 2 do
      begin
        if RealBuffer[Idx] in [#10, #13] then
        begin
          LineLen := Idx - Consumed;
          SetLength(CurLine, LineLen);
          if LineLen > 0 then
            Move(RealBuffer[Consumed], CurLine[1], LineLen);

          // A CR/LF or LF/CR pair is swallowed as one terminator
          if (RealBuffer[Idx + 1] in [#10, #13]) and
             (RealBuffer[Idx + 1] <> RealBuffer[Idx]) then
            Inc(Idx);
          Consumed := Idx + 1;

          if Assigned(FOnLine) then
          begin
            FBuffer := RealBuffer + Consumed;
            FBytesInBuffer := Total - Consumed;
            FOnLine(CurLine);
            // Check if <this> has been destroyed by FOnLine:
            if not Assigned(FBuffer) then
              exit;
          end;
        end;
        Inc(Idx);
      end;

      FBytesInBuffer := Total;
      if Consumed > 0 then
      begin
        // Remove all processed lines from the buffer
        Dec(FBytesInBuffer, Consumed);
        GetMem(Trimmed, FBytesInBuffer);
        Move(RealBuffer[Consumed], Trimmed^, FBytesInBuffer);
        if Assigned(RealBuffer) then
          FreeMem(RealBuffer);
        RealBuffer := Trimmed;
      end;
      FBuffer := RealBuffer;
    until False;
  end;
  481. // -------------------------------------------------------------------
  482. // TAsyncStreamLineReader
  483. // -------------------------------------------------------------------
  { Convenience constructor: AStream serves both as the stream the data is
    read from and as the stream whose handle is watched by the event loop. }
  constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop;
    AStream: THandleStream);
  begin
    // Delegate to the three-argument constructor
    Self.Create(AEventLoop, AStream, AStream);
  end;
  { Creates a reader that takes its data from ADataStream and asks
    AEventLoop to call StreamDataAvailable whenever the handle of
    ABlockingStream has data available. Both streams must be assigned. }
  constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop;
    ADataStream: TStream; ABlockingStream: THandleStream);
  begin
    ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));

    inherited Create;
    FEventLoop := AEventLoop;
    FDataStream := ADataStream;
    FBlockingStream := ABlockingStream;

    // The registration is kept so that it can be cleared again later
    NotifyHandle := EventLoop.SetDataAvailableNotify(
      FBlockingStream.Handle, @StreamDataAvailable, nil);
  end;
  { Unregisters a still active "data available" notification before the
    instance goes away, mirroring what TAsyncWriteStream.Destroy does for its
    "can write" notification.

    Fix: previously a reader destroyed directly (without StopAndFree and
    before end of data) left its registration in the event loop, which would
    then call a method of a freed object.
    NOTE(review): like TAsyncWriteStream.Destroy this assumes the event loop
    is still alive when the reader is destroyed. }
  destructor TAsyncStreamLineReader.Destroy;
  begin
    if Assigned(NotifyHandle) then
    begin
      EventLoop.ClearDataAvailableNotify(NotifyHandle);
      NotifyHandle := nil;
    end;
    inherited Destroy;
  end;
  { Stops listening for data immediately and marks the reader for
    destruction; StreamDataAvailable frees the instance after its Run call
    when this mark is set. }
  procedure TAsyncStreamLineReader.StopAndFree;
  var
    Registration: Pointer;
  begin
    Registration := NotifyHandle;
    if Assigned(Registration) then
    begin
      EventLoop.ClearDataAvailableNotify(Registration);
      NotifyHandle := nil;
    end;
    DoStopAndFree := True;
  end;
  { Data source for TGenericLineReader.Run: reads up to count bytes from
    DataStream and returns what the stream reports. }
  function TAsyncStreamLineReader.Read(var ABuffer; count: Integer): Integer;
  begin
    Read := FDataStream.Read(ABuffer, count);
  end;
  { Called by Run when a read attempt returned nothing at all.

    This is treated as end of data when the data stream is the blocking
    stream itself, or when a separate data stream stands at its end. In that
    case any unterminated rest in the buffer is reported as a final line (a
    single trailing #13 or #10 is stripped), the "data available"
    notification is removed and OnEOF is fired.

    Fixes:
    - The notification is only cleared while it is still registered; it may
      already be gone (e.g. StopAndFree called from the OnLine handler, or
      NoData reached a second time), and clearing a nil handle dereferences
      nil.
    - The rest is only copied when it is non-empty, so s[1] is never
      evaluated on an empty string. }
  procedure TAsyncStreamLineReader.NoData;
  var
    s: String;
  begin
    if (FDataStream = FBlockingStream) or
       (FDataStream.Position = FDataStream.Size) then
    begin
      if (FBytesInBuffer > 0) and Assigned(FOnLine) then
      begin
        if FBuffer[FBytesInBuffer - 1] in [#13, #10] then
          Dec(FBytesInBuffer);
        SetLength(s, FBytesInBuffer);
        if FBytesInBuffer > 0 then
          Move(FBuffer^, s[1], FBytesInBuffer);
        FOnLine(s);
      end;

      if Assigned(NotifyHandle) then
      begin
        EventLoop.ClearDataAvailableNotify(NotifyHandle);
        NotifyHandle := nil;
      end;

      if Assigned(FOnEOF) then
        FOnEOF(Self);
    end;
  end;
  { Handler registered with the event loop: processes the available data
    and afterwards destroys the reader if StopAndFree was requested. }
  procedure TAsyncStreamLineReader.StreamDataAvailable(UserData: TObject);
  begin
    Run;
    if not DoStopAndFree then
      exit;
    Free;
  end;
  541. // -------------------------------------------------------------------
  542. // TWriteBuffer
  543. // -------------------------------------------------------------------
  { Fires OnBufferEmpty (with Self as sender) if a handler is assigned. }
  procedure TWriteBuffer.BufferEmpty;
  begin
    if not Assigned(FOnBufferEmpty) then
      exit;
    FOnBufferEmpty(Self);
  end;
  { Starts with an empty buffer and a single line feed as end-of-line
    marker. }
  constructor TWriteBuffer.Create;
  begin
    inherited Create;
    EndOfLineMarker := #10;
    FBytesInBuffer := 0;
    FBuffer := nil;
  end;
  { Releases the queue memory (if any) before the inherited destructor
    runs. Data that is still queued is discarded. }
  destructor TWriteBuffer.Destroy;
  begin
    if FBuffer <> nil then
      FreeMem(FBuffer);
    inherited Destroy;
  end;
  { The buffer cannot be repositioned. The only accepted requests are those
    that leave the position at the end of the queued data: offset 0 from the
    current position or from the end, or offset BytesInBuffer from the
    beginning. These return BytesInBuffer; anything else raises
    EStreamError. }
  function TWriteBuffer.Seek(Offset: LongInt; Origin: Word): LongInt;
  var
    StaysAtEnd: Boolean;
  begin
    if Origin = soFromBeginning then
      StaysAtEnd := Offset = FBytesInBuffer
    else
      StaysAtEnd := (Offset = 0) and
        ((Origin = soFromCurrent) or (Origin = soFromEnd));

    if not StaysAtEnd then
      // !!!: No i18n for this string - solve this problem in the FCL?!?
      raise EStreamError.Create('Invalid stream operation');

    Result := FBytesInBuffer;
  end;
  { Appends Count bytes from ABuffer to the queue, mirrors them to
    fpAsyncWriteBufferDebugStream when that is assigned, and signals through
    WantWrite that there is data to send. Always reports Count bytes as
    written. }
  function TWriteBuffer.Write(const ABuffer; Count: LongInt): LongInt;
  var
    OldSize: Integer;
  begin
    OldSize := FBytesInBuffer;
    ReallocMem(FBuffer, OldSize + Count);
    Move(ABuffer, FBuffer[OldSize], Count);
    FBytesInBuffer := OldSize + Count;

    if Assigned(fpAsyncWriteBufferDebugStream) then
      fpAsyncWriteBufferDebugStream.Write(ABuffer, Count);

    WantWrite;
    Result := Count;
  end;
  { Queues the given text followed by EndOfLineMarker. }
  procedure TWriteBuffer.WriteLine(const line: String);
  var
    Payload: String;
  begin
    Payload := line + EndOfLineMarker;
    WriteBuffer(Payload[1], Length(Payload));
  end;
  { Writes as much queued data as DoRealWrite accepts.

    Each pass offers the complete remaining queue to DoRealWrite and removes
    the bytes it reports as written from the front of the queue. The loop
    ends when DoRealWrite returns <= 0, or, once the queue is empty, after
    calling BufferEmpty. WritingFailed is called when not a single byte could
    be written during this call.

    Fix: the previous version kept a running offset (CurStart) across passes
    although the queue had already been shifted to its start after each
    successful write. From the second pass on it therefore skipped unsent
    bytes, passed wrong (possibly negative) sizes to DoRealWrite/GetMem/Move
    and subtracted too much from FBytesInBuffer. }
  procedure TWriteBuffer.Run;
  var
    Written, Remaining: Integer;
    Failed: Boolean;
  begin
    Failed := True;
    repeat
      if FBytesInBuffer = 0 then
      begin
        BufferEmpty;
        exit;
      end;

      // Unsent data always starts at the beginning of the buffer
      Written := DoRealWrite(FBuffer[0], FBytesInBuffer);
      if Written > 0 then
      begin
        Failed := False;
        Remaining := FBytesInBuffer - Written;
        // Guard against a DoRealWrite that reports more than it was given
        if Remaining < 0 then
          Remaining := 0;
        // Move copes with overlapping ranges, so shift in place
        if Remaining > 0 then
          Move(FBuffer[Written], FBuffer[0], Remaining);
        // Shrinks the block; a size of 0 frees it and sets FBuffer to nil
        ReallocMem(FBuffer, Remaining);
        FBytesInBuffer := Remaining;
      end;
    until Written <= 0;

    if Failed then
      WritingFailed;
  end;
  617. // -------------------------------------------------------------------
  618. // TAsyncWriteStream
  619. // -------------------------------------------------------------------
  { Output primitive for TWriteBuffer.Run: writes Count bytes from ABuffer
    to DataStream and returns what the stream reports. }
  function TAsyncWriteStream.DoRealWrite(const ABuffer; Count: Integer): Integer;
  begin
    DoRealWrite := FDataStream.Write(ABuffer, Count);
  end;
  { Called by Run when nothing could be written. If data and blocking stream
    are different objects, a still active "can write" notification is
    removed; when both are the same stream the registration is left
    untouched. }
  procedure TAsyncWriteStream.WritingFailed;
  begin
    if FDataStream = FBlockingStream then
      exit;
    if not Assigned(NotifyHandle) then
      exit;

    EventLoop.ClearCanWriteNotify(NotifyHandle);
    NotifyHandle := nil;
  end;
  { Called after data has been queued: makes sure a "can write"
    notification for the blocking stream's handle is registered, so that
    CanWrite gets called by the event loop. Does nothing if one is already
    active. }
  procedure TAsyncWriteStream.WantWrite;
  begin
    if Assigned(NotifyHandle) then
      exit;
    NotifyHandle := EventLoop.SetCanWriteNotify(FBlockingStream.Handle,
      @CanWrite, nil);
  end;
  { Called by Run once everything has been written: the "can write"
    notification is no longer needed and is removed before the inherited
    implementation fires OnBufferEmpty. }
  procedure TAsyncWriteStream.BufferEmpty;
  var
    Registration: Pointer;
  begin
    Registration := NotifyHandle;
    if Assigned(Registration) then
    begin
      EventLoop.ClearCanWriteNotify(Registration);
      NotifyHandle := nil;
    end;
    inherited BufferEmpty;
  end;
  { Handler registered with the event loop: flushes as much as possible and
    afterwards destroys the stream if StopAndFree was requested. }
  procedure TAsyncWriteStream.CanWrite(UserData: TObject);
  begin
    Run;
    if not DoStopAndFree then
      exit;
    Free;
  end;
  { Convenience constructor: AStream serves both as the stream the data is
    written to and as the stream whose handle is watched by the event loop. }
  constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop;
    AStream: THandleStream);
  begin
    // Delegate to the three-argument constructor
    Self.Create(AEventLoop, AStream, AStream);
  end;
  { Creates a write stream that sends its data to ADataStream and uses the
    handle of ABlockingStream for "can write" notifications from AEventLoop.
    Both streams must be assigned. Nothing is registered with the event loop
    here; that happens in WantWrite. }
  constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop;
    ADataStream: TStream; ABlockingStream: THandleStream);
  begin
    ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));

    inherited Create;
    FBlockingStream := ABlockingStream;
    FDataStream := ADataStream;
    FEventLoop := AEventLoop;
  end;
  { Removes a still active "can write" notification from the event loop
    before the inherited destructor releases the buffer. }
  destructor TAsyncWriteStream.Destroy;
  begin
    if NotifyHandle <> nil then
      EventLoop.ClearCanWriteNotify(NotifyHandle);
    inherited Destroy;
  end;
  { Stops listening for "can write" events immediately and marks the stream
    for destruction; CanWrite frees the instance after its Run call when
    this mark is set. }
  procedure TAsyncWriteStream.StopAndFree;
  var
    Registration: Pointer;
  begin
    Registration := NotifyHandle;
    if Assigned(Registration) then
    begin
      EventLoop.ClearCanWriteNotify(Registration);
      NotifyHandle := nil;
    end;
    DoStopAndFree := True;
  end;
  682. end.
  683. {
  684. $Log$
  685. Revision 1.2 2002-04-25 19:12:27 sg
  686. * Added ability to write all write buffer data to a debug stream
  687. * Added TAsyncWriteStream.StopAndFree
  688. Revision 1.1 2003/03/17 22:25:32 michael
  689. + Async moved from package to FCL
  690. Revision 1.3 2002/09/15 15:45:38 sg
  691. * Added stream line reader classes
  692. Revision 1.2 2002/09/07 15:42:57 peter
  693. * old logs removed and tabs fixed
  694. Revision 1.1 2002/01/29 17:55:02 peter
  695. * splitted to base and extra
  696. }