fpasync.pp 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804
  1. {
  2. $Id$
  3. fpAsync: Asynchronous event management for Free Pascal
  4. Copyright (C) 2001-2002 by
  5. Areca Systems GmbH / Sebastian Guenther, sg@freepascal.org
  6. Unix implementation
  7. See the file COPYING.FPC, included in this distribution,
  8. for details about the copyright.
  9. This program is distributed in the hope that it will be useful,
  10. but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  12. }
  13. unit fpAsync;
  14. {$MODE objfpc}
  15. {$H+}
  16. interface
  17. uses SysUtils, Classes, libasync;
  type
    // Method pointer invoked with the object that was registered as "sender"
    // (see EventHandler in the implementation section, which calls
    // Notify(Sender)).
    TNotifyEvent = procedure(Sender: TObject) of object;
  // Exception raised by this unit when a libasync call returns a result code
  // other than asyncOK. The failing code is kept in ErrorCode.
  EAsyncError = class(Exception)
  private
    FErrorCode: TAsyncResult;   // result code passed to the constructor
  public
    // Builds the message 'Async I/O error <n>' from Ord(AErrorCode).
    constructor Create(AErrorCode: TAsyncResult);
    property ErrorCode: TAsyncResult read FErrorCode;
  end;
  // Object wrapper around the procedural libasync API. Every method forwards
  // to the matching async* routine, passing Handle as the loop handle.
  //
  // Two flavours of registration exist for each event source:
  //   *Callback - takes a plain TAsyncCallback plus an untyped user pointer;
  //   *Notify   - takes a TNotifyEvent method pointer plus a sender object;
  //               the returned Pointer identifies the registration and is
  //               what the matching Remove*/Clear*Notify method expects.
  TEventLoop = class
  private
    // NOTE(review): not referenced by name in this unit; presumably the
    // storage libasync works on through Handle - confirm against libasync.
    FData: TAsyncData;
    // Head of a singly-linked list of TNotifyData records allocated by the
    // *Notify methods; whatever is left is disposed in Destroy.
    FFirstNotifyData: Pointer;
    function GetIsRunning: Boolean;
    procedure SetIsRunning(AIsRunning: Boolean);
  protected
    // Raises EAsyncError unless AResultCode is asyncOK.
    procedure CheckResult(AResultCode: TAsyncResult);
  public
    constructor Create;
    destructor Destroy; override;
    // Returns Self cast to TAsyncHandle.
    function Handle: TAsyncHandle;

    // Main loop control
    procedure Run;      // forwards to asyncRun
    procedure Break;    // forwards to asyncBreak

    // Timer support
    function AddTimerCallback(AMSec: LongInt; APeriodic: Boolean;
      ACallback: TAsyncCallback; AUserData: Pointer): TAsyncTimer;
    procedure RemoveTimerCallback(ATimer: TAsyncTimer);
    function AddTimerNotify(AMSec: LongInt; APeriodic: Boolean;
      ANotify: TNotifyEvent; ASender: TObject): Pointer;
    procedure RemoveTimerNotify(AHandle: Pointer);

    // I/O notification support (for files, sockets etc.)
    // In the *Callback methods AHandle is the integer file handle; in the
    // Clear*Notify methods AHandle is the pointer returned by Set*Notify.
    procedure SetIOCallback(AHandle: Integer; ACallback: TAsyncCallback;
      AUserData: Pointer);
    procedure ClearIOCallback(AHandle: Integer);
    function SetIONotify(AHandle: Integer; ANotify: TNotifyEvent;
      ASender: TObject): Pointer;
    procedure ClearIONotify(AHandle: Pointer);

    procedure SetDataAvailableCallback(AHandle: Integer;
      ACallback: TAsyncCallback; AUserData: Pointer);
    procedure ClearDataAvailableCallback(AHandle: Integer);
    function SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent;
      ASender: TObject): Pointer;
    procedure ClearDataAvailableNotify(AHandle: Pointer);

    procedure SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback;
      AUserData: Pointer);
    procedure ClearCanWriteCallback(AHandle: Integer);
    function SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent;
      ASender: TObject): Pointer;
    procedure ClearCanWriteNotify(AHandle: Pointer);

    // Returns asyncGetTicks.
    class function TimerTicks: Int64;

    // Properties
    // Reading queries asyncIsRunning; writing goes through SetIsRunning,
    // which calls Run or Break.
    property IsRunning: Boolean read GetIsRunning write SetIsRunning;
  end;
  72. // -------------------------------------------------------------------
  73. // Asynchronous line reader
  74. // -------------------------------------------------------------------
  // Handler type that receives one complete line; the line readers pass the
  // text without its end-of-line characters.
  TLineNotify = procedure(const ALine: String) of object;
  // Abstract line splitter: Run pulls raw bytes through the virtual Read
  // method, accumulates them in a heap buffer and fires OnLine once per
  // line terminated by CR, LF, CR LF or LF CR.
  TGenericLineReader = class
  protected
    // RealBuffer owns the heap block; FBuffer is the view handed out through
    // the Buffer property and points at the not-yet-consumed part while an
    // OnLine handler is running.
    RealBuffer, FBuffer: PChar;
    FBytesInBuffer: Integer;
    FOnLine: TLineNotify;

    // Fetches up to "count" bytes into ABuffer; Run treats a result <= 0 as
    // "nothing more to read right now".
    function Read(var ABuffer; count: Integer): Integer; virtual; abstract;
    // Called by Run when the very first Read of a Run call yields no data.
    procedure NoData; virtual; abstract;
  public
    destructor Destroy; override;
    procedure Run;              // Process as many lines as possible

    property Buffer: PChar read FBuffer;
    property BytesInBuffer: Integer read FBytesInBuffer;
    property OnLine: TLineNotify read FOnLine write FOnLine;
  end;
  // Line reader fed by a stream and driven by a TEventLoop: the constructor
  // registers a data-available notification on BlockingStream.Handle, and
  // every notification triggers Run.
  TAsyncStreamLineReader = class(TGenericLineReader)
  protected
    FEventLoop: TEventLoop;
    FDataStream: TStream;             // stream the bytes are read from
    FBlockingStream: THandleStream;   // stream whose handle is watched
    FOnEOF: TNotifyEvent;
    NotifyHandle: Pointer;            // registration returned by SetDataAvailableNotify; nil once cleared
    DoStopAndFree: Boolean;           // set by StopAndFree, checked after Run

    function Read(var ABuffer; count: Integer): Integer; override;
    procedure NoData; override;
    // Event-loop handler: calls Run, then frees the object if StopAndFree
    // was requested.
    procedure StreamDataAvailable(UserData: TObject);
  public
    // Uses AStream both as data stream and as the watched stream.
    constructor Create(AEventLoop: TEventLoop; AStream: THandleStream);
    constructor Create(AEventLoop: TEventLoop; ADataStream: TStream;
      ABlockingStream: THandleStream);
    destructor Destroy; override;
    procedure StopAndFree;      // Destroy instance after run

    property EventLoop: TEventLoop read FEventLoop;
    property DataStream: TStream read FDataStream;
    property BlockingStream: THandleStream read FBlockingStream;
    property OnEOF: TNotifyEvent read FOnEOF write FOnEOF;
  end;
  112. // -------------------------------------------------------------------
  113. // Asynchronous write buffers
  114. // -------------------------------------------------------------------
  // Write-only stream that queues everything written to it in a heap buffer.
  // Run hands the queued bytes to DoRealWrite until that returns <= 0 or the
  // buffer is empty.
  TWriteBuffer = class(TStream)
  protected
    FBuffer: PChar;                 // queued, not yet written bytes
    FBytesInBuffer: Integer;        // number of valid bytes in FBuffer
    FOnBufferEmpty: TNotifyEvent;

    // Only the "tell current size" forms of Seek are accepted; anything else
    // raises EStreamError.
    function Seek(Offset: LongInt; Origin: Word): LongInt; override;
    // Appends Count bytes to the queue and calls WantWrite.
    function Write(const ABuffer; Count: LongInt): LongInt; override;
    // Performs the actual output; returns the number of bytes accepted.
    function DoRealWrite(const ABuffer; Count: Integer): Integer; virtual; abstract;
    // Called by Run when not a single byte could be written.
    procedure WritingFailed; virtual; abstract;
    // Called by Write after new data has been queued.
    procedure WantWrite; virtual; abstract;
    // Called by Run when the queue is empty; fires OnBufferEmpty.
    procedure BufferEmpty; virtual;
  public
    EndOfLineMarker: String;        // appended by WriteLine; #10 by default

    constructor Create;
    destructor Destroy; override;
    procedure WriteLine(const line: String);
    procedure Run;                  // Write as much data as possible

    property BytesInBuffer: Integer read FBytesInBuffer;
    property OnBufferEmpty: TNotifyEvent read FOnBufferEmpty write FOnBufferEmpty;
  end;
  // TWriteBuffer that writes to a stream and uses a TEventLoop can-write
  // notification on BlockingStream.Handle to decide when to flush.
  TAsyncWriteStream = class(TWriteBuffer)
  protected
    FEventLoop: TEventLoop;
    FDataStream: TStream;             // stream the bytes are written to
    FBlockingStream: THandleStream;   // stream whose handle is watched
    NotifyHandle: Pointer;            // registration from SetCanWriteNotify; nil while none is active

    function DoRealWrite(const ABuffer; Count: Integer): Integer; override;
    procedure WritingFailed; override;
    procedure WantWrite; override;    // registers the can-write notification if none is active
    procedure BufferEmpty; override;  // drops the notification, then calls the inherited handler
    procedure CanWrite(UserData: TObject);  // event-loop handler: calls Run
  public
    // Uses AStream both as data stream and as the watched stream.
    constructor Create(AEventLoop: TEventLoop; AStream: THandleStream);
    constructor Create(AEventLoop: TEventLoop;
      ADataStream: TStream; ABlockingStream: THandleStream);
    destructor Destroy; override;

    property EventLoop: TEventLoop read FEventLoop;
    property DataStream: TStream read FDataStream;
    property BlockingStream: THandleStream read FBlockingStream;
  end;
  155. implementation
  type
    PNotifyData = ^TNotifyData;
    // Bookkeeping record behind every *Notify registration of TEventLoop.
    // It is passed to libasync as the user-data pointer, so that
    // EventHandler can translate the C-style callback into a method call.
    TNotifyData = record
      Next: PNotifyData;        // next record in TEventLoop.FFirstNotifyData list
      Notify: TNotifyEvent;     // method to invoke
      Sender: TObject;          // argument passed to Notify
      // Only one of the two is meaningful for a given registration:
      case Boolean of
        False: (TimerHandle: TAsyncTimer);  // set by AddTimerNotify
        True: (FileHandle: LongInt);        // set by the Set*Notify methods
    end;
  { C-callable trampoline registered with libasync for all *Notify
    registrations. Data is the PNotifyData that was supplied as user data;
    the stored method pointer is invoked with the stored sender. }
  procedure EventHandler(Data: Pointer); cdecl;
  var
    Entry: PNotifyData;
  begin
    Entry := PNotifyData(Data);
    Entry^.Notify(Entry^.Sender);
  end;
  { Allocates a new TNotifyData record and pushes it onto the front of
    Obj's notify-data list. Only the Next field is initialised; the caller
    fills in the remaining fields. Returns the new record. }
  function AddNotifyData(Obj: TEventLoop): PNotifyData;
  var
    Node: PNotifyData;
  begin
    New(Node);
    Node^.Next := PNotifyData(Obj.FFirstNotifyData);
    Obj.FFirstNotifyData := Node;
    Result := Node;
  end;
  { Unlinks Data from Obj's notify-data list (if it is found there) and
    disposes it. Data is disposed even when it was not part of the list. }
  procedure FreeNotifyData(Obj: TEventLoop; Data: PNotifyData);
  var
    Node, Before: PNotifyData;
  begin
    // Walk the list until we either hit Data or run off the end.
    Before := nil;
    Node := PNotifyData(Obj.FFirstNotifyData);
    while Assigned(Node) and (Node <> Data) do
    begin
      Before := Node;
      Node := Node^.Next;
    end;

    // Node is non-nil only when Data was found: splice it out.
    if Assigned(Node) then
    begin
      if Assigned(Before) then
        Before^.Next := Node^.Next
      else
        Obj.FFirstNotifyData := Node^.Next;
    end;

    Dispose(Data);
  end;
  { Creates the exception with the message 'Async I/O error <n>', where <n>
    is the ordinal value of AErrorCode, and remembers the code. }
  constructor EAsyncError.Create(AErrorCode: TAsyncResult);
  var
    Msg: String;
  begin
    Msg := Format('Async I/O error %d', [Ord(AErrorCode)]);
    inherited Create(Msg);
    FErrorCode := AErrorCode;
  end;
  { Creates the event loop and initialises the underlying libasync state
    via asyncInit. }
  constructor TEventLoop.Create;
  begin
    // Chain to the ancestor first, as the other constructors in this unit do.
    inherited Create;
    asyncInit(Handle);
  end;
  { Releases the libasync state and disposes every TNotifyData record that
    is still registered in the notify-data list. }
  destructor TEventLoop.Destroy;
  var
    NotifyData, NextNotifyData: PNotifyData;
  begin
    asyncFree(Handle);

    // Detach the list before disposing it so the field never points at
    // freed records.
    NotifyData := FFirstNotifyData;
    FFirstNotifyData := nil;
    while Assigned(NotifyData) do
    begin
      NextNotifyData := NotifyData^.Next;
      Dispose(NotifyData);
      NotifyData := NextNotifyData;
    end;

    // Chain to the ancestor, as the other destructors in this unit do.
    inherited Destroy;
  end;
  { Returns the handle that is passed to every libasync routine: the
    instance reference itself, cast to TAsyncHandle. }
  function TEventLoop.Handle: TAsyncHandle;
  begin
    Handle := TAsyncHandle(Self);
  end;
  { Enters the main loop by forwarding to asyncRun. }
  procedure TEventLoop.Run;
  begin
    asyncRun(Self.Handle);
  end;
  { Asks the main loop to stop by forwarding to asyncBreak. }
  procedure TEventLoop.Break;
  begin
    asyncBreak(Self.Handle);
  end;
  { Registers a timer with a plain callback. All arguments are forwarded
    unchanged to asyncAddTimer; its return value is the timer handle to pass
    to RemoveTimerCallback. }
  function TEventLoop.AddTimerCallback(AMSec: LongInt; APeriodic: Boolean;
    ACallback: TAsyncCallback; AUserData: Pointer): TAsyncTimer;
  var
    Timer: TAsyncTimer;
  begin
    Timer := asyncAddTimer(Handle, AMSec, APeriodic, ACallback, AUserData);
    Result := Timer;
  end;
  { Removes a timer previously returned by AddTimerCallback by forwarding
    to asyncRemoveTimer. }
  procedure TEventLoop.RemoveTimerCallback(ATimer: TAsyncTimer);
  begin
    asyncRemoveTimer(Self.Handle, ATimer);
  end;
  { Registers a timer that calls ANotify(ASender) when it fires.

    AMSec and APeriodic are forwarded unchanged to asyncAddTimer.
    Returns the registration handle (the internal PNotifyData) that must be
    passed to RemoveTimerNotify to cancel the timer. }
  function TEventLoop.AddTimerNotify(AMSec: LongInt; APeriodic: Boolean;
    ANotify: TNotifyEvent; ASender: TObject): Pointer;
  var
    UserData: PNotifyData;
  begin
    UserData := AddNotifyData(Self);
    UserData^.Notify := ANotify;
    UserData^.Sender := ASender;
    UserData^.TimerHandle :=
      asyncAddTimer(Handle, AMSec, APeriodic, @EventHandler, UserData);
    // The function result was never assigned before, so callers received an
    // undefined pointer; RemoveTimerNotify expects exactly this record.
    Result := UserData;
  end;
  { Cancels a timer registered with AddTimerNotify. AHandle is interpreted
    as the PNotifyData of that registration: its timer is removed from
    libasync and the record is unlinked and disposed. }
  procedure TEventLoop.RemoveTimerNotify(AHandle: Pointer);
  begin
    asyncRemoveTimer(Handle, PNotifyData(AHandle)^.TimerHandle);
    FreeNotifyData(Self, PNotifyData(AHandle));
  end;
  { Installs a plain I/O callback for file handle AHandle through
    asyncSetIOCallback. Raises EAsyncError if libasync reports anything
    other than asyncOK. }
  procedure TEventLoop.SetIOCallback(AHandle: Integer; ACallback: TAsyncCallback;
    AUserData: Pointer);
  var
    Status: TAsyncResult;
  begin
    Status := asyncSetIOCallback(Handle, AHandle, ACallback, AUserData);
    CheckResult(Status);
  end;
  { Removes the I/O callback of file handle AHandle by forwarding to
    asyncClearIOCallback. }
  procedure TEventLoop.ClearIOCallback(AHandle: Integer);
  begin
    asyncClearIOCallback(Self.Handle, AHandle);
  end;
  { Installs an I/O notification for file handle AHandle that calls
    ANotify(ASender). Returns the registration handle expected by
    ClearIONotify. If libasync rejects the registration, the bookkeeping
    record is released again and EAsyncError is raised. }
  function TEventLoop.SetIONotify(AHandle: Integer; ANotify: TNotifyEvent;
    ASender: TObject): Pointer;
  var
    Entry: PNotifyData;
    Status: TAsyncResult;
  begin
    Entry := AddNotifyData(Self);
    Entry^.FileHandle := AHandle;
    Entry^.Sender := ASender;
    Entry^.Notify := ANotify;

    Status := asyncSetIOCallback(Handle, AHandle, @EventHandler, Entry);
    if Status <> asyncOK then
    begin
      FreeNotifyData(Self, Entry);
      raise EAsyncError.Create(Status);
    end;

    Result := Entry;
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetIONotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
  end;
  { Cancels a registration made with SetIONotify. AHandle is interpreted as
    the PNotifyData of that registration: the I/O callback of its file
    handle is cleared and the record is unlinked and disposed. }
  procedure TEventLoop.ClearIONotify(AHandle: Pointer);
  var
    Entry: PNotifyData;
  begin
    Entry := PNotifyData(AHandle);
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearIONotify: Filehandle=', Entry^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
    asyncClearIOCallback(Handle, Entry^.FileHandle);
    FreeNotifyData(Self, Entry);
  end;
  { Installs a plain data-available callback for file handle AHandle through
    asyncSetDataAvailableCallback. Raises EAsyncError if libasync reports
    anything other than asyncOK. }
  procedure TEventLoop.SetDataAvailableCallback(AHandle: Integer; ACallback: TAsyncCallback;
    AUserData: Pointer);
  var
    Status: TAsyncResult;
  begin
    Status := asyncSetDataAvailableCallback(Handle, AHandle,
      ACallback, AUserData);
    CheckResult(Status);
  end;
  { Removes the data-available callback of file handle AHandle by
    forwarding to asyncClearDataAvailableCallback. }
  procedure TEventLoop.ClearDataAvailableCallback(AHandle: Integer);
  begin
    asyncClearDataAvailableCallback(Self.Handle, AHandle);
  end;
  { Installs a data-available notification for file handle AHandle that
    calls ANotify(ASender). Returns the registration handle expected by
    ClearDataAvailableNotify. If libasync rejects the registration, the
    bookkeeping record is released again and EAsyncError is raised. }
  function TEventLoop.SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent;
    ASender: TObject): Pointer;
  var
    Entry: PNotifyData;
    Status: TAsyncResult;
  begin
    Entry := AddNotifyData(Self);
    Entry^.FileHandle := AHandle;
    Entry^.Sender := ASender;
    Entry^.Notify := ANotify;

    Status := asyncSetDataAvailableCallback(Handle, AHandle,
      @EventHandler, Entry);
    if Status <> asyncOK then
    begin
      FreeNotifyData(Self, Entry);
      raise EAsyncError.Create(Status);
    end;

    Result := Entry;
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetDataAvailableNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
  end;
  { Cancels a registration made with SetDataAvailableNotify. AHandle is
    interpreted as the PNotifyData of that registration: the data-available
    callback of its file handle is cleared and the record is unlinked and
    disposed. }
  procedure TEventLoop.ClearDataAvailableNotify(AHandle: Pointer);
  var
    Entry: PNotifyData;
  begin
    Entry := PNotifyData(AHandle);
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearDataAvailableNotify: Filehandle=', Entry^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
    asyncClearDataAvailableCallback(Handle, Entry^.FileHandle);
    FreeNotifyData(Self, Entry);
  end;
  { Installs a plain can-write callback for file handle AHandle through
    asyncSetCanWriteCallback. Raises EAsyncError if libasync reports
    anything other than asyncOK. }
  procedure TEventLoop.SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback;
    AUserData: Pointer);
  var
    Status: TAsyncResult;
  begin
    Status := asyncSetCanWriteCallback(Handle, AHandle, ACallback, AUserData);
    CheckResult(Status);
  end;
  { Removes the can-write callback of file handle AHandle by forwarding to
    asyncClearCanWriteCallback. }
  procedure TEventLoop.ClearCanWriteCallback(AHandle: Integer);
  begin
    asyncClearCanWriteCallback(Self.Handle, AHandle);
  end;
  { Installs a can-write notification for file handle AHandle that calls
    ANotify(ASender). Returns the registration handle expected by
    ClearCanWriteNotify. If libasync rejects the registration, the
    bookkeeping record is released again and EAsyncError is raised. }
  function TEventLoop.SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent;
    ASender: TObject): Pointer;
  var
    Entry: PNotifyData;
    Status: TAsyncResult;
  begin
    Entry := AddNotifyData(Self);
    Entry^.FileHandle := AHandle;
    Entry^.Sender := ASender;
    Entry^.Notify := ANotify;

    Status := asyncSetCanWriteCallback(Handle, AHandle,
      @EventHandler, Entry);
    if Status <> asyncOK then
    begin
      FreeNotifyData(Self, Entry);
      raise EAsyncError.Create(Status);
    end;

    Result := Entry;
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetCanWriteNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
  end;
  { Cancels a registration made with SetCanWriteNotify. AHandle is
    interpreted as the PNotifyData of that registration: the can-write
    callback of its file handle is cleared and the record is unlinked and
    disposed. }
  procedure TEventLoop.ClearCanWriteNotify(AHandle: Pointer);
  var
    Entry: PNotifyData;
  begin
    Entry := PNotifyData(AHandle);
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearCanWriteNotify: Filehandle=', Entry^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
    asyncClearCanWriteCallback(Handle, Entry^.FileHandle);
    FreeNotifyData(Self, Entry);
  end;
  { Returns the current tick count as reported by asyncGetTicks. }
  class function TEventLoop.TimerTicks: Int64;
  begin
    TimerTicks := asyncGetTicks;
  end;
  { Turns a libasync result code into an exception: does nothing for
    asyncOK, raises EAsyncError carrying the code otherwise. }
  procedure TEventLoop.CheckResult(AResultCode: TAsyncResult);
  begin
    if AResultCode = asyncOK then
      exit;
    raise EAsyncError.Create(AResultCode);
  end;
  { Getter of the IsRunning property: returns asyncIsRunning for this
    loop's handle. }
  function TEventLoop.GetIsRunning: Boolean;
  begin
    GetIsRunning := asyncIsRunning(Self.Handle);
  end;
  { Setter of the IsRunning property.

    Setting True while the loop is stopped enters the loop (Run); setting
    False while it is running requests it to stop (Break). Assigning the
    state the loop is already in does nothing.

    The two actions were previously swapped: a running loop was re-entered
    when asked to stop, and Break was issued when asked to start. }
  procedure TEventLoop.SetIsRunning(AIsRunning: Boolean);
  begin
    if IsRunning then
    begin
      if not AIsRunning then
        Self.Break;       // the method TEventLoop.Break, not the loop statement
    end else
      if AIsRunning then
        Run;
  end;
  395. // -------------------------------------------------------------------
  396. // TGenericLineReader
  397. // -------------------------------------------------------------------
  { Releases the line buffer.

    FBuffer is only a view into the block owned by RealBuffer, so it is
    reset together with it (and the byte count is zeroed) instead of being
    left dangling. Run tests FBuffer after each OnLine call to detect that
    the handler destroyed the reader, which could never succeed while only
    RealBuffer was cleared.
    NOTE(review): that test still reads a field of a freed instance; this
    change only keeps the fields consistent, it does not make that pattern
    safe. }
  destructor TGenericLineReader.Destroy;
  begin
    if Assigned(RealBuffer) then
    begin
      FreeMem(RealBuffer);
      RealBuffer := nil;
    end;
    FBuffer := nil;
    FBytesInBuffer := 0;
    inherited Destroy;
  end;
  { Reads as much data as Read will deliver (in chunks of up to 1024 bytes),
    appends it to the internal buffer and calls FOnLine for every complete
    line found. A line ends at CR or LF; a CR LF or LF CR pair counts as one
    terminator. The last byte of the buffer is never examined on its own,
    because a terminator can only be classified once the byte after it is
    known; it stays in the buffer for the next call (or for NoData).

    If the very first Read returns <= 0, NoData is called. The loop ends as
    soon as any Read returns <= 0.

    Lines are discarded silently when no FOnLine handler is assigned. }
  procedure TGenericLineReader.Run;
  var
    NewData: array[0..1023] of Byte;
    p: PChar;
    BytesRead, OldBufSize, CurBytesInBuffer, LastEndOfLine, i, LineLength: Integer;
    line: String;
    FirstRun: Boolean;
  begin
    FirstRun := True;
    while True do
    begin
      BytesRead := Read(NewData, SizeOf(NewData));
      //WriteLn('Linereader: ', BytesRead, ' bytes read');
      if BytesRead <= 0 then begin
        // Only report "no data" if this Run call did not read anything at all
        if FirstRun then
          NoData;
        break;
      end;
      FirstRun := False;
      OldBufSize := FBytesInBuffer;

      // Append the newly received data to the read buffer
      Inc(FBytesInBuffer, BytesRead);
      ReallocMem(RealBuffer, FBytesInBuffer);
      Move(NewData, RealBuffer[OldBufSize], BytesRead);

      {Process all potential lines in the current buffer. Attention: FBuffer and
       FBytesInBuffer MUST be updated for each line, as they can be accessed from
       within the FOnLine handler!}
      LastEndOfLine := 0;
      // Resume scanning at the last byte of the old data: it was skipped by
      // the previous pass because its successor was not yet known.
      if OldBufSize > 0 then
        i := OldBufSize - 1
      else
        i := 0;

      CurBytesInBuffer := FBytesInBuffer;

      // Stop one byte early: RealBuffer[i + 1] is inspected below.
      while i <= CurBytesInBuffer - 2 do
      begin
        if (RealBuffer[i] = #13) or (RealBuffer[i] = #10) then
        begin
          // Copy the text between the previous terminator and this one
          LineLength := i - LastEndOfLine;
          SetLength(line, LineLength);
          if LineLength > 0 then
            Move(RealBuffer[LastEndOfLine], line[1], LineLength);

          // Swallow the second byte of a two-byte terminator
          if ((RealBuffer[i] = #13) and (RealBuffer[i + 1] = #10)) or
             ((RealBuffer[i] = #10) and (RealBuffer[i + 1] = #13)) then
            Inc(i);
          LastEndOfLine := i + 1;

          if Assigned(FOnLine) then begin
            // Expose only the unconsumed remainder to the handler
            FBuffer := RealBuffer + LastEndOfLine;
            FBytesInBuffer := CurBytesInBuffer - LastEndOfLine;
            FOnLine(line);
            // Check if <this> has been destroyed by FOnLine:
            // NOTE(review): this reads a field of a possibly freed instance;
            // verify that the destructor resets FBuffer.
            if not Assigned(FBuffer) then exit;
          end;
        end;
        Inc(i);
      end;

      // Restore the real byte count (it was narrowed for the handler above)
      FBytesInBuffer := CurBytesInBuffer;

      if LastEndOfLine > 0 then
      begin
        // Remove all processed lines from the buffer
        Dec(FBytesInBuffer, LastEndOfLine);
        GetMem(p, FBytesInBuffer);
        Move(RealBuffer[LastEndOfLine], p^, FBytesInBuffer);
        if Assigned(RealBuffer) then
          FreeMem(RealBuffer);
        RealBuffer := p;
      end;
      FBuffer := RealBuffer;
    end;
  end;
  476. // -------------------------------------------------------------------
  477. // TAsyncStreamLineReader
  478. // -------------------------------------------------------------------
  { Convenience constructor for the common case where the same handle stream
    is both read from and watched for incoming data: delegates to the
    three-argument constructor with AStream in both roles. }
  constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop;
    AStream: THandleStream);
  begin
    Self.Create(AEventLoop, AStream, AStream);
  end;
  { Creates a line reader that reads from ADataStream and is woken by the
    event loop whenever data becomes available on ABlockingStream's handle.
    Both streams must be assigned (checked with an assertion). The
    data-available notification is registered immediately. }
  constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop;
    ADataStream: TStream; ABlockingStream: THandleStream);
  begin
    ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));

    inherited Create;
    FBlockingStream := ABlockingStream;
    FDataStream := ADataStream;
    FEventLoop := AEventLoop;

    // Register last: the notification needs the fields set above.
    NotifyHandle := FEventLoop.SetDataAvailableNotify(
      FBlockingStream.Handle, @StreamDataAvailable, nil);
  end;
  { Unregisters the data-available notification, if one is still active,
    before the inherited destructor releases the line buffer. }
  destructor TAsyncStreamLineReader.Destroy;
  begin
    if NotifyHandle <> nil then
      FEventLoop.ClearDataAvailableNotify(NotifyHandle);
    inherited Destroy;
  end;
  { Requests deferred destruction: only sets a flag, which
    StreamDataAvailable checks after its Run call to free the object. }
  procedure TAsyncStreamLineReader.StopAndFree;
  begin
    Self.DoStopAndFree := True;
  end;
  { Data source for the inherited Run: reads up to "count" bytes from the
    data stream into ABuffer and returns what the stream's Read returns. }
  function TAsyncStreamLineReader.Read(var ABuffer; count: Integer): Integer;
  begin
    Read := FDataStream.Read(ABuffer, count);
  end;
  { Called by Run when a notification arrived but no data could be read.

    This is treated as end of input when the data stream is the watched
    stream itself, or when the data stream's position has reached its size.
    In that case:
      - whatever is left in the buffer is delivered to OnLine as a final
        line (a single trailing CR or LF is stripped first);
      - the data-available notification is removed;
      - OnEOF is fired.
    Otherwise nothing happens. }
  procedure TAsyncStreamLineReader.NoData;
  var
    s: String;
  begin
    // Not at end of input: a separate data stream still has bytes pending.
    if (FDataStream <> FBlockingStream) and
       (FDataStream.Position <> FDataStream.Size) then
      exit;

    if (FBytesInBuffer > 0) and Assigned(FOnLine) then
    begin
      if FBuffer[FBytesInBuffer - 1] in [#13, #10] then
        Dec(FBytesInBuffer);
      SetLength(s, FBytesInBuffer);
      // The residue may have been a lone terminator, leaving an empty line;
      // s[1] must not be touched then (same guard as in the inherited Run).
      if FBytesInBuffer > 0 then
        Move(FBuffer^, s[1], FBytesInBuffer);
      FOnLine(s);
    end;

    // Guard against a second end-of-input report after the notification has
    // already been removed; ClearDataAvailableNotify dereferences its handle.
    if Assigned(NotifyHandle) then
    begin
      EventLoop.ClearDataAvailableNotify(NotifyHandle);
      NotifyHandle := nil;
    end;

    if Assigned(FOnEOF) then
      FOnEOF(Self);
  end;
  { Event-loop handler for the data-available notification: processes the
    pending input and, if StopAndFree was requested in the meantime, frees
    this object. UserData is not used. }
  procedure TAsyncStreamLineReader.StreamDataAvailable(UserData: TObject);
  begin
    Run;
    if not DoStopAndFree then
      exit;
    Self.Free;
  end;
  533. // -------------------------------------------------------------------
  534. // TWriteBuffer
  535. // -------------------------------------------------------------------
  { Default reaction to an empty write queue: fires OnBufferEmpty with this
    object as sender, if a handler is assigned. }
  procedure TWriteBuffer.BufferEmpty;
  begin
    if not Assigned(FOnBufferEmpty) then
      exit;
    FOnBufferEmpty(Self);
  end;
  { Creates an empty write buffer; lines written with WriteLine are
    terminated with a single LF (#10) unless EndOfLineMarker is changed. }
  constructor TWriteBuffer.Create;
  begin
    inherited Create;
    EndOfLineMarker := #10;
    FBytesInBuffer := 0;
    FBuffer := nil;
  end;
  { Releases the queue memory, if any, then chains to the inherited
    destructor. Unwritten data is discarded. }
  destructor TWriteBuffer.Destroy;
  begin
    if FBuffer <> nil then
      FreeMem(FBuffer);
    inherited Destroy;
  end;
  { The buffer is append-only, so the only seeks accepted are those that
    leave the position at the end of the queued data:
      - offset 0 from the current position or from the end;
      - offset FBytesInBuffer from the beginning.
    These return FBytesInBuffer. Every other request raises EStreamError. }
  function TWriteBuffer.Seek(Offset: LongInt; Origin: Word): LongInt;
  var
    Acceptable: Boolean;
  begin
    case Origin of
      soFromBeginning:
        Acceptable := Offset = FBytesInBuffer;
      soFromCurrent, soFromEnd:
        Acceptable := Offset = 0;
    else
      Acceptable := False;
    end;

    if not Acceptable then
      // !!!: No i18n for this string - solve this problem in the FCL?!?
      raise EStreamError.Create('Invalid stream operation');

    Result := FBytesInBuffer;
  end;
  { Appends Count bytes from ABuffer to the queue, then calls WantWrite so
    that a descendant can arrange for the data to be flushed. Always reports
    all Count bytes as written. }
  function TWriteBuffer.Write(const ABuffer; Count: LongInt): LongInt;
  var
    AppendAt: Integer;
  begin
    AppendAt := FBytesInBuffer;
    ReallocMem(FBuffer, AppendAt + Count);
    Move(ABuffer, FBuffer[AppendAt], Count);
    FBytesInBuffer := AppendAt + Count;
    WantWrite;
    Write := Count;
  end;
  { Queues "line" followed by EndOfLineMarker. }
  procedure TWriteBuffer.WriteLine(const line: String);
  var
    Terminated: String;
  begin
    Terminated := line + EndOfLineMarker;
    WriteBuffer(Terminated[1], Length(Terminated));
  end;
  { Flushes as much of the queue as DoRealWrite will accept.

    DoRealWrite is called repeatedly with the start of the queued data; each
    time it accepts some bytes they are removed from the front of the queue.
    The loop ends when
      - the queue is empty: BufferEmpty is called, or
      - DoRealWrite returns <= 0: if not a single byte was written during
        this call, WritingFailed is called.

    The earlier implementation kept a running start offset across iterations
    although the queue had already been compacted to offset 0, so a second
    pass skipped unsent bytes and could pass a negative count. }
  procedure TWriteBuffer.Run;
  var
    Written, Remaining: Integer;
    Failed: Boolean;
  begin
    Failed := True;
    repeat
      if FBytesInBuffer = 0 then
      begin
        BufferEmpty;
        exit;
      end;

      // Unsent data always starts at offset 0 (see the compaction below).
      Written := DoRealWrite(FBuffer[0], FBytesInBuffer);
      if Written > 0 then
      begin
        Failed := False;
        Remaining := FBytesInBuffer - Written;
        if Remaining > 0 then
        begin
          // Shift the unsent tail to the front and shrink the block.
          // Move copes with overlapping source and destination.
          Move(FBuffer[Written], FBuffer[0], Remaining);
          ReallocMem(FBuffer, Remaining);
        end else
        begin
          // Everything was sent (also covers a DoRealWrite that reports
          // more than it was given).
          FreeMem(FBuffer);
          FBuffer := nil;
          Remaining := 0;
        end;
        FBytesInBuffer := Remaining;
      end;
    until Written <= 0;

    if Failed then
      WritingFailed;
  end;
  607. // -------------------------------------------------------------------
  608. // TAsyncWriteStream
  609. // -------------------------------------------------------------------
  { Output primitive for the inherited Run: writes Count bytes from ABuffer
    to the data stream and returns what the stream's Write returns. }
  function TAsyncWriteStream.DoRealWrite(const ABuffer; Count: Integer): Integer;
  begin
    DoRealWrite := FDataStream.Write(ABuffer, Count);
  end;
  { Called when Run could not write a single byte. When the data stream is
    a different object from the watched stream and a can-write notification
    is active, that notification is removed. In all other cases nothing is
    done. }
  procedure TAsyncWriteStream.WritingFailed;
  begin
    if FDataStream = FBlockingStream then
      exit;
    if NotifyHandle = nil then
      exit;
    FEventLoop.ClearCanWriteNotify(NotifyHandle);
    NotifyHandle := nil;
  end;
  { Called after data has been queued: makes sure a can-write notification
    on the watched stream's handle is registered, so that CanWrite gets
    called by the event loop. Does nothing if one is already active. }
  procedure TAsyncWriteStream.WantWrite;
  begin
    if Assigned(NotifyHandle) then
      exit;
    NotifyHandle := FEventLoop.SetCanWriteNotify(FBlockingStream.Handle,
      @CanWrite, nil);
  end;
  { Called when the queue has been drained: removes the can-write
    notification (there is nothing left to write), then lets the inherited
    implementation fire OnBufferEmpty. }
  procedure TAsyncWriteStream.BufferEmpty;
  begin
    if NotifyHandle <> nil then
    begin
      FEventLoop.ClearCanWriteNotify(NotifyHandle);
      NotifyHandle := nil;
    end;
    inherited BufferEmpty;
  end;
  { Event-loop handler for the can-write notification: flushes as much of
    the queue as possible. UserData is not used. }
  procedure TAsyncWriteStream.CanWrite(UserData: TObject);
  begin
    Self.Run;
  end;
  { Convenience constructor for the common case where the same handle stream
    is both written to and watched for writability: delegates to the
    three-argument constructor with AStream in both roles. }
  constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop;
    AStream: THandleStream);
  begin
    Self.Create(AEventLoop, AStream, AStream);
  end;
  { Creates a buffered writer that writes to ADataStream and uses the event
    loop to wait for writability of ABlockingStream's handle. Both streams
    must be assigned (checked with an assertion). No notification is
    registered here; that happens in WantWrite once data is queued. }
  constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop;
    ADataStream: TStream; ABlockingStream: THandleStream);
  begin
    ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));

    inherited Create;
    FBlockingStream := ABlockingStream;
    FDataStream := ADataStream;
    FEventLoop := AEventLoop;
  end;
  { Unregisters the can-write notification, if one is still active, before
    the inherited destructor releases the queue. }
  destructor TAsyncWriteStream.Destroy;
  begin
    if NotifyHandle <> nil then
      FEventLoop.ClearCanWriteNotify(NotifyHandle);
    inherited Destroy;
  end;
  661. end.
  662. {
  663. $Log$
  664. Revision 1.1 2003-03-17 22:25:32 michael
  665. + Async moved from package to FCL
  666. Revision 1.3 2002/09/15 15:45:38 sg
  667. * Added stream line reader classes
  668. Revision 1.2 2002/09/07 15:42:57 peter
  669. * old logs removed and tabs fixed
  670. Revision 1.1 2002/01/29 17:55:02 peter
  671. * splitted to base and extra
  672. }