fpasync.pp 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840
  1. {
  2. $Id$
  3. fpAsync: Asynchronous event management for Free Pascal
  4. Copyright (C) 2001-2003 by
  5. Areca Systems GmbH / Sebastian Guenther, [email protected]
  6. See the file COPYING.FPC, included in this distribution,
  7. for details about the copyright.
  8. This program is distributed in the hope that it will be useful,
  9. but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  11. }
  12. unit fpAsync;
  13. {$MODE objfpc}
  14. {$H+}
  15. interface
  16. uses SysUtils, Classes, libasync;
  17. type
  18. TNotifyEvent = procedure(Sender: TObject) of object;
  19. EAsyncError = class(Exception)
  20. private
  21. FErrorCode: TAsyncResult;
  22. public
  23. constructor Create(AErrorCode: TAsyncResult);
  24. property ErrorCode: TAsyncResult read FErrorCode;
  25. end;
  26. TEventLoop = class
  27. private
  28. FData: TAsyncData;
  29. FFirstNotifyData: Pointer;
  30. function GetIsRunning: Boolean;
  31. procedure SetIsRunning(AIsRunning: Boolean);
  32. protected
  33. procedure CheckResult(AResultCode: TAsyncResult);
  34. public
  35. constructor Create;
  36. destructor Destroy; override;
  37. function Handle: TAsyncHandle;
  38. // Main loop control
  39. procedure Run;
  40. procedure Break;
  41. // Timer support
  42. function AddTimerCallback(AMSec: LongInt; APeriodic: Boolean;
  43. ACallback: TAsyncCallback; AUserData: Pointer): TAsyncTimer;
  44. procedure RemoveTimerCallback(ATimer: TAsyncTimer);
  45. function AddTimerNotify(AMSec: LongInt; APeriodic: Boolean;
  46. ANotify: TNotifyEvent; ASender: TObject): Pointer;
  47. procedure RemoveTimerNotify(AHandle: Pointer);
  48. // I/O notification support (for files, sockets etc.)
  49. procedure SetIOCallback(AHandle: Integer; ACallback: TAsyncCallback;
  50. AUserData: Pointer);
  51. procedure ClearIOCallback(AHandle: Integer);
  52. function SetIONotify(AHandle: Integer; ANotify: TNotifyEvent;
  53. ASender: TObject): Pointer;
  54. procedure ClearIONotify(AHandle: Pointer);
  55. procedure SetDataAvailableCallback(AHandle: Integer;
  56. ACallback: TAsyncCallback; AUserData: Pointer);
  57. procedure ClearDataAvailableCallback(AHandle: Integer);
  58. function SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent;
  59. ASender: TObject): Pointer;
  60. procedure ClearDataAvailableNotify(AHandle: Pointer);
  61. procedure SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback;
  62. AUserData: Pointer);
  63. procedure ClearCanWriteCallback(AHandle: Integer);
  64. function SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent;
  65. ASender: TObject): Pointer;
  66. procedure ClearCanWriteNotify(AHandle: Pointer);
  67. class function TimerTicks: Int64;
  68. // Properties
  69. property IsRunning: Boolean read GetIsRunning write SetIsRunning;
  70. end;
  71. // -------------------------------------------------------------------
  72. // Asynchronous line reader
  73. // -------------------------------------------------------------------
  74. TLineNotify = procedure(const ALine: String) of object;
  75. TGenericLineReader = class
  76. protected
  77. RealBuffer, FBuffer: PChar;
  78. FBytesInBuffer: Integer;
  79. FOnLine: TLineNotify;
  80. DoStopAndFree: Boolean;
  81. function Read(var ABuffer; count: Integer): Integer; virtual; abstract;
  82. procedure NoData; virtual; abstract;
  83. public
  84. destructor Destroy; override;
  85. procedure Run; // Process as many lines as possible
  86. property Buffer: PChar read FBuffer;
  87. property BytesInBuffer: Integer read FBytesInBuffer;
  88. property OnLine: TLineNotify read FOnLine write FOnLine;
  89. end;
  90. TAsyncStreamLineReader = class(TGenericLineReader)
  91. protected
  92. FEventLoop: TEventLoop;
  93. FDataStream: TStream;
  94. FBlockingStream: THandleStream;
  95. FOnEOF: TNotifyEvent;
  96. NotifyHandle: Pointer;
  97. function Read(var ABuffer; count: Integer): Integer; override;
  98. procedure NoData; override;
  99. procedure StreamDataAvailable(UserData: TObject);
  100. public
  101. constructor Create(AEventLoop: TEventLoop; AStream: THandleStream);
  102. constructor Create(AEventLoop: TEventLoop; ADataStream: TStream;
  103. ABlockingStream: THandleStream);
  104. destructor Destroy; override;
  105. procedure StopAndFree; // Destroy instance after run
  106. property EventLoop: TEventLoop read FEventLoop;
  107. property DataStream: TStream read FDataStream;
  108. property BlockingStream: THandleStream read FBlockingStream;
  109. property OnEOF: TNotifyEvent read FOnEOF write FOnEOF;
  110. end;
  111. // -------------------------------------------------------------------
  112. // Asynchronous write buffers
  113. // -------------------------------------------------------------------
  114. TWriteBuffer = class(TStream)
  115. protected
  116. FBuffer: PChar;
  117. FBytesInBuffer: Integer;
  118. FOnBufferEmpty: TNotifyEvent;
  119. function Seek(Offset: LongInt; Origin: Word): LongInt; override;
  120. function Write(const ABuffer; Count: LongInt): LongInt; override;
  121. function DoRealWrite(const ABuffer; Count: Integer): Integer; virtual; abstract;
  122. procedure WritingFailed; virtual; abstract;
  123. procedure WantWrite; virtual; abstract;
  124. procedure BufferEmpty; virtual;
  125. public
  126. EndOfLineMarker: String;
  127. constructor Create;
  128. destructor Destroy; override;
  129. procedure WriteLine(const line: String);
  130. procedure Run; // Write as many data as possible
  131. property BytesInBuffer: Integer read FBytesInBuffer;
  132. property OnBufferEmpty: TNotifyEvent read FOnBufferEmpty write FOnBufferEmpty;
  133. end;
  134. TAsyncWriteStream = class(TWriteBuffer)
  135. protected
  136. FEventLoop: TEventLoop;
  137. FDataStream: TStream;
  138. FBlockingStream: THandleStream;
  139. NotifyHandle: Pointer;
  140. DoStopAndFree: Boolean;
  141. function DoRealWrite(const ABuffer; Count: Integer): Integer; override;
  142. procedure WritingFailed; override;
  143. procedure WantWrite; override;
  144. procedure BufferEmpty; override;
  145. procedure CanWrite(UserData: TObject);
  146. public
  147. constructor Create(AEventLoop: TEventLoop; AStream: THandleStream);
  148. constructor Create(AEventLoop: TEventLoop;
  149. ADataStream: TStream; ABlockingStream: THandleStream);
  150. destructor Destroy; override;
  151. procedure StopAndFree; // Destroy instance after run
  152. property EventLoop: TEventLoop read FEventLoop;
  153. property DataStream: TStream read FDataStream;
  154. property BlockingStream: THandleStream read FBlockingStream;
  155. end;
  156. var
  157. { All data written to a TWriteBuffer or descendant class will be written to
  158. this stream as well: }
  159. fpAsyncWriteBufferDebugStream: TStream;
  160. implementation
  161. type
  162. PNotifyData = ^TNotifyData;
  163. TNotifyData = record
  164. Next: PNotifyData;
  165. Notify: TNotifyEvent;
  166. Sender: TObject;
  167. case Boolean of
  168. False: (TimerHandle: TAsyncTimer);
  169. True: (FileHandle: LongInt);
  170. end;
  171. procedure EventHandler(Data: Pointer); cdecl;
  172. begin
  173. with PNotifyData(Data)^ do
  174. Notify(Sender);
  175. end;
  176. function AddNotifyData(Obj: TEventLoop): PNotifyData;
  177. begin
  178. New(Result);
  179. Result^.Next := PNotifyData(Obj.FFirstNotifyData);
  180. Obj.FFirstNotifyData := Result;
  181. end;
  182. procedure FreeNotifyData(Obj: TEventLoop; Data: PNotifyData);
  183. var
  184. CurData, PrevData, NextData: PNotifyData;
  185. begin
  186. PrevData := nil;
  187. CurData := Obj.FFirstNotifyData;
  188. while Assigned(CurData) do
  189. begin
  190. NextData := CurData^.Next;
  191. if CurData = Data then
  192. if Assigned(PrevData) then
  193. PrevData^.Next := NextData
  194. else
  195. Obj.FFirstNotifyData := NextData;
  196. PrevData := CurData;
  197. CurData := NextData;
  198. end;
  199. Dispose(Data);
  200. end;
  201. constructor EAsyncError.Create(AErrorCode: TAsyncResult);
  202. begin
  203. inherited Create(Format('Async I/O error %d', [Ord(AErrorCode)]));
  204. FErrorCode := AErrorCode;
  205. end;
  206. constructor TEventLoop.Create;
  207. begin
  208. asyncInit(Handle);
  209. end;
  210. destructor TEventLoop.Destroy;
  211. var
  212. NotifyData, NextNotifyData: PNotifyData;
  213. begin
  214. asyncFree(Handle);
  215. NotifyData := FFirstNotifyData;
  216. while Assigned(NotifyData) do
  217. begin
  218. NextNotifyData := NotifyData^.Next;
  219. Dispose(NotifyData);
  220. NotifyData := NextNotifyData;
  221. end;
  222. end;
  223. function TEventLoop.Handle: TAsyncHandle;
  224. begin
  225. Result := TAsyncHandle(Self);
  226. end;
  227. procedure TEventLoop.Run;
  228. begin
  229. asyncRun(Handle);
  230. end;
  231. procedure TEventLoop.Break;
  232. begin
  233. asyncBreak(Handle);
  234. end;
  235. function TEventLoop.AddTimerCallback(AMSec: LongInt; APeriodic: Boolean;
  236. ACallback: TAsyncCallback; AUserData: Pointer): TAsyncTimer;
  237. begin
  238. Result := asyncAddTimer(Handle, AMSec, APeriodic, ACallback, AUserData);
  239. end;
  240. procedure TEventLoop.RemoveTimerCallback(ATimer: TAsyncTimer);
  241. begin
  242. asyncRemoveTimer(Handle, ATimer);
  243. end;
  244. function TEventLoop.AddTimerNotify(AMSec: LongInt; APeriodic: Boolean;
  245. ANotify: TNotifyEvent; ASender: TObject): Pointer;
  246. var
  247. UserData: PNotifyData;
  248. begin
  249. UserData := AddNotifyData(Self);
  250. UserData^.Notify := ANotify;
  251. UserData^.Sender := ASender;
  252. UserData^.TimerHandle :=
  253. asyncAddTimer(Handle, AMSec, APeriodic, @EventHandler, UserData);
  254. end;
  255. procedure TEventLoop.RemoveTimerNotify(AHandle: Pointer);
  256. var
  257. Data: PNotifyData;
  258. begin
  259. Data := PNotifyData(AHandle);
  260. asyncRemoveTimer(Handle, Data^.TimerHandle);
  261. FreeNotifyData(Self, Data);
  262. end;
  263. procedure TEventLoop.SetIOCallback(AHandle: Integer; ACallback: TAsyncCallback;
  264. AUserData: Pointer);
  265. begin
  266. CheckResult(asyncSetIOCallback(Handle, AHandle, ACallback, AUserData));
  267. end;
  268. procedure TEventLoop.ClearIOCallback(AHandle: Integer);
  269. begin
  270. asyncClearIOCallback(Handle, AHandle);
  271. end;
  272. function TEventLoop.SetIONotify(AHandle: Integer; ANotify: TNotifyEvent;
  273. ASender: TObject): Pointer;
  274. var
  275. UserData: PNotifyData;
  276. ResultCode: TAsyncResult;
  277. begin
  278. UserData := AddNotifyData(Self);
  279. UserData^.Notify := ANotify;
  280. UserData^.Sender := ASender;
  281. UserData^.FileHandle := AHandle;
  282. ResultCode := asyncSetIOCallback(Handle, AHandle, @EventHandler, UserData);
  283. if ResultCode <> asyncOK then
  284. begin
  285. FreeNotifyData(Self, UserData);
  286. raise EAsyncError.Create(ResultCode);
  287. end else
  288. Result := UserData;
  289. {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetIONotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
  290. end;
  291. procedure TEventLoop.ClearIONotify(AHandle: Pointer);
  292. var
  293. Data: PNotifyData;
  294. begin
  295. Data := PNotifyData(AHandle);
  296. {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearIONotify: Filehandle=', Data^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
  297. asyncClearIOCallback(Handle, Data^.FileHandle);
  298. FreeNotifyData(Self, Data);
  299. end;
  300. procedure TEventLoop.SetDataAvailableCallback(AHandle: Integer; ACallback: TAsyncCallback;
  301. AUserData: Pointer);
  302. begin
  303. CheckResult(asyncSetDataAvailableCallback(Handle, AHandle,
  304. ACallback, AUserData));
  305. end;
  306. procedure TEventLoop.ClearDataAvailableCallback(AHandle: Integer);
  307. begin
  308. asyncClearDataAvailableCallback(Handle, AHandle);
  309. end;
  310. function TEventLoop.SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent;
  311. ASender: TObject): Pointer;
  312. var
  313. UserData: PNotifyData;
  314. ResultCode: TAsyncResult;
  315. begin
  316. UserData := AddNotifyData(Self);
  317. UserData^.Notify := ANotify;
  318. UserData^.Sender := ASender;
  319. UserData^.FileHandle := AHandle;
  320. ResultCode := asyncSetDataAvailableCallback(Handle, AHandle,
  321. @EventHandler, UserData);
  322. if ResultCode <> asyncOK then
  323. begin
  324. FreeNotifyData(Self, UserData);
  325. raise EAsyncError.Create(ResultCode);
  326. end else
  327. Result := UserData;
  328. {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetDataAvailableNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
  329. end;
  330. procedure TEventLoop.ClearDataAvailableNotify(AHandle: Pointer);
  331. var
  332. Data: PNotifyData;
  333. begin
  334. Data := PNotifyData(AHandle);
  335. {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearDataAvailableNotify: Filehandle=', Data^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
  336. asyncClearDataAvailableCallback(Handle, Data^.FileHandle);
  337. FreeNotifyData(Self, Data);
  338. end;
  339. procedure TEventLoop.SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback;
  340. AUserData: Pointer);
  341. begin
  342. CheckResult(asyncSetCanWriteCallback(Handle, AHandle, ACallback, AUserData));
  343. end;
  344. procedure TEventLoop.ClearCanWriteCallback(AHandle: Integer);
  345. begin
  346. asyncClearCanWriteCallback(Handle, AHandle);
  347. end;
  348. function TEventLoop.SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent;
  349. ASender: TObject): Pointer;
  350. var
  351. UserData: PNotifyData;
  352. ResultCode: TAsyncResult;
  353. begin
  354. UserData := AddNotifyData(Self);
  355. UserData^.Notify := ANotify;
  356. UserData^.Sender := ASender;
  357. UserData^.FileHandle := AHandle;
  358. ResultCode := asyncSetCanWriteCallback(Handle, AHandle,
  359. @EventHandler, UserData);
  360. if ResultCode <> asyncOK then
  361. begin
  362. FreeNotifyData(Self, UserData);
  363. raise EAsyncError.Create(ResultCode);
  364. end else
  365. Result := UserData;
  366. {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetCanWriteNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
  367. end;
  368. procedure TEventLoop.ClearCanWriteNotify(AHandle: Pointer);
  369. var
  370. Data: PNotifyData;
  371. begin
  372. Data := PNotifyData(AHandle);
  373. {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearCanWriteNotify: Filehandle=', Data^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
  374. asyncClearCanWriteCallback(Handle, Data^.FileHandle);
  375. FreeNotifyData(Self, Data);
  376. end;
  377. class function TEventLoop.TimerTicks: Int64;
  378. begin
  379. Result := asyncGetTicks;
  380. end;
  381. procedure TEventLoop.CheckResult(AResultCode: TAsyncResult);
  382. begin
  383. if AResultCode <> asyncOK then
  384. raise EAsyncError.Create(AResultCode);
  385. end;
  386. function TEventLoop.GetIsRunning: Boolean;
  387. begin
  388. Result := asyncIsRunning(Handle);
  389. end;
  390. procedure TEventLoop.SetIsRunning(AIsRunning: Boolean);
  391. begin
  392. if IsRunning then
  393. begin
  394. if not AIsRunning then
  395. Run;
  396. end else
  397. if AIsRunning then
  398. Break;
  399. end;
  400. // -------------------------------------------------------------------
  401. // TGenericLineReader
  402. // -------------------------------------------------------------------
  403. destructor TGenericLineReader.Destroy;
  404. begin
  405. if Assigned(RealBuffer) then
  406. begin
  407. FreeMem(RealBuffer);
  408. RealBuffer := nil;
  409. end;
  410. inherited Destroy;
  411. end;
  412. procedure TGenericLineReader.Run;
  413. var
  414. NewData: array[0..1023] of Byte;
  415. p: PChar;
  416. BytesRead, OldBufSize, CurBytesInBuffer, LastEndOfLine, i, LineLength: Integer;
  417. line: String;
  418. FirstRun: Boolean;
  419. begin
  420. FirstRun := True;
  421. while True do
  422. begin
  423. BytesRead := Read(NewData, SizeOf(NewData));
  424. //WriteLn('Linereader: ', BytesRead, ' bytes read');
  425. if BytesRead <= 0 then begin
  426. if FirstRun then
  427. NoData;
  428. break;
  429. end;
  430. FirstRun := False;
  431. OldBufSize := FBytesInBuffer;
  432. // Append the new received data to the read buffer
  433. Inc(FBytesInBuffer, BytesRead);
  434. ReallocMem(RealBuffer, FBytesInBuffer);
  435. Move(NewData, RealBuffer[OldBufSize], BytesRead);
  436. {Process all potential lines in the current buffer. Attention: FBuffer and
  437. FBytesInBuffer MUST be updated for each line, as they can be accessed from
  438. within the FOnLine handler!}
  439. LastEndOfLine := 0;
  440. if OldBufSize > 0 then
  441. i := OldBufSize - 1
  442. else
  443. i := 0;
  444. CurBytesInBuffer := FBytesInBuffer;
  445. while i <= CurBytesInBuffer - 2 do
  446. begin
  447. if (RealBuffer[i] = #13) or (RealBuffer[i] = #10) then
  448. begin
  449. LineLength := i - LastEndOfLine;
  450. SetLength(line, LineLength);
  451. if LineLength > 0 then
  452. Move(RealBuffer[LastEndOfLine], line[1], LineLength);
  453. if ((RealBuffer[i] = #13) and (RealBuffer[i + 1] = #10)) or
  454. ((RealBuffer[i] = #10) and (RealBuffer[i + 1] = #13)) then
  455. Inc(i);
  456. LastEndOfLine := i + 1;
  457. if Assigned(FOnLine) then begin
  458. FBuffer := RealBuffer + LastEndOfLine;
  459. FBytesInBuffer := CurBytesInBuffer - LastEndOfLine;
  460. FOnLine(line);
  461. // Check if <this> has been destroyed by FOnLine:
  462. if DoStopAndFree then
  463. exit;
  464. end;
  465. end;
  466. Inc(i);
  467. end;
  468. FBytesInBuffer := CurBytesInBuffer;
  469. if LastEndOfLine > 0 then
  470. begin
  471. // Remove all processed lines from the buffer
  472. Dec(FBytesInBuffer, LastEndOfLine);
  473. GetMem(p, FBytesInBuffer);
  474. Move(RealBuffer[LastEndOfLine], p^, FBytesInBuffer);
  475. if Assigned(RealBuffer) then
  476. FreeMem(RealBuffer);
  477. RealBuffer := p;
  478. end;
  479. FBuffer := RealBuffer;
  480. end;
  481. end;
  482. // -------------------------------------------------------------------
  483. // TAsyncStreamLineReader
  484. // -------------------------------------------------------------------
  485. constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop;
  486. AStream: THandleStream);
  487. begin
  488. Self.Create(AEventLoop, AStream, AStream);
  489. end;
  490. constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop;
  491. ADataStream: TStream; ABlockingStream: THandleStream);
  492. begin
  493. ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));
  494. inherited Create;
  495. FEventLoop := AEventLoop;
  496. FDataStream := ADataStream;
  497. FBlockingStream := ABlockingStream;
  498. NotifyHandle := EventLoop.SetDataAvailableNotify(
  499. FBlockingStream.Handle, @StreamDataAvailable, nil);
  500. end;
  501. destructor TAsyncStreamLineReader.Destroy;
  502. begin
  503. inherited Destroy;
  504. end;
  505. procedure TAsyncStreamLineReader.StopAndFree;
  506. begin
  507. if Assigned(NotifyHandle) then
  508. begin
  509. EventLoop.ClearDataAvailableNotify(NotifyHandle);
  510. NotifyHandle := nil;
  511. end;
  512. DoStopAndFree := True;
  513. end;
  514. function TAsyncStreamLineReader.Read(var ABuffer; count: Integer): Integer;
  515. begin
  516. Result := FDataStream.Read(ABuffer, count);
  517. end;
  518. procedure TAsyncStreamLineReader.NoData;
  519. var
  520. s: String;
  521. begin
  522. if (FDataStream = FBlockingStream) or (FDataStream.Position = FDataStream.Size) then
  523. begin
  524. if (FBytesInBuffer > 0) and Assigned(FOnLine) then
  525. begin
  526. if FBuffer[FBytesInBuffer - 1] in [#13, #10] then
  527. Dec(FBytesInBuffer);
  528. SetLength(s, FBytesInBuffer);
  529. Move(FBuffer^, s[1], FBytesInBuffer);
  530. FOnLine(s);
  531. end;
  532. EventLoop.ClearDataAvailableNotify(NotifyHandle);
  533. NotifyHandle := nil;
  534. if Assigned(FOnEOF) then
  535. FOnEOF(Self);
  536. end;
  537. end;
  538. procedure TAsyncStreamLineReader.StreamDataAvailable(UserData: TObject);
  539. begin
  540. Run;
  541. if DoStopAndFree then
  542. Free;
  543. end;
  544. // -------------------------------------------------------------------
  545. // TWriteBuffer
  546. // -------------------------------------------------------------------
  547. procedure TWriteBuffer.BufferEmpty;
  548. begin
  549. if Assigned(FOnBufferEmpty) then
  550. FOnBufferEmpty(Self);
  551. end;
  552. constructor TWriteBuffer.Create;
  553. begin
  554. inherited Create;
  555. FBuffer := nil;
  556. FBytesInBuffer := 0;
  557. EndOfLineMarker := #10;
  558. end;
  559. destructor TWriteBuffer.Destroy;
  560. begin
  561. if Assigned(FBuffer) then
  562. FreeMem(FBuffer);
  563. inherited Destroy;
  564. end;
  565. function TWriteBuffer.Seek(Offset: LongInt; Origin: Word): LongInt;
  566. begin
  567. if ((Offset = 0) and ((Origin = soFromCurrent) or (Origin = soFromEnd))) or
  568. ((Offset = FBytesInBuffer) and (Origin = soFromBeginning)) then
  569. Result := FBytesInBuffer
  570. else
  571. // !!!: No i18n for this string - solve this problem in the FCL?!?
  572. raise EStreamError.Create('Invalid stream operation');
  573. end;
  574. function TWriteBuffer.Write(const ABuffer; Count: LongInt): LongInt;
  575. begin
  576. ReallocMem(FBuffer, FBytesInBuffer + Count);
  577. Move(ABuffer, FBuffer[FBytesInBuffer], Count);
  578. Inc(FBytesInBuffer, Count);
  579. if Assigned(fpAsyncWriteBufferDebugStream) then
  580. fpAsyncWriteBufferDebugStream.Write(ABuffer, Count);
  581. WantWrite;
  582. Result := Count;
  583. end;
  584. procedure TWriteBuffer.WriteLine(const line: String);
  585. var
  586. s: String;
  587. begin
  588. s := line + EndOfLineMarker;
  589. WriteBuffer(s[1], Length(s));
  590. end;
  591. procedure TWriteBuffer.Run;
  592. var
  593. CurStart, Written: Integer;
  594. NewBuf: PChar;
  595. Failed: Boolean;
  596. begin
  597. CurStart := 0;
  598. Failed := True;
  599. repeat
  600. if FBytesInBuffer = 0 then
  601. begin
  602. BufferEmpty;
  603. exit;
  604. end;
  605. Written := DoRealWrite(FBuffer[CurStart], FBytesInBuffer - CurStart);
  606. if Written > 0 then
  607. begin
  608. Inc(CurStart, Written);
  609. Failed := False;
  610. GetMem(NewBuf, FBytesInBuffer - CurStart);
  611. Move(FBuffer[CurStart], NewBuf[0], FBytesInBuffer - CurStart);
  612. FreeMem(FBuffer);
  613. FBuffer := NewBuf;
  614. Dec(FBytesInBuffer, CurStart);
  615. end;
  616. until Written <= 0;
  617. if Failed then
  618. WritingFailed;
  619. end;
  620. // -------------------------------------------------------------------
  621. // TAsyncWriteStream
  622. // -------------------------------------------------------------------
  623. function TAsyncWriteStream.DoRealWrite(const ABuffer; Count: Integer): Integer;
  624. begin
  625. Result := FDataStream.Write(ABuffer, count);
  626. end;
  627. procedure TAsyncWriteStream.WritingFailed;
  628. begin
  629. if (FDataStream <> FBlockingStream) and Assigned(NotifyHandle) then
  630. begin
  631. EventLoop.ClearCanWriteNotify(NotifyHandle);
  632. NotifyHandle := nil;
  633. end;
  634. end;
  635. procedure TAsyncWriteStream.WantWrite;
  636. begin
  637. if not Assigned(NotifyHandle) then
  638. NotifyHandle := EventLoop.SetCanWriteNotify(FBlockingStream.Handle,
  639. @CanWrite, nil);
  640. end;
  641. procedure TAsyncWriteStream.BufferEmpty;
  642. begin
  643. if Assigned(NotifyHandle) then
  644. begin
  645. EventLoop.ClearCanWriteNotify(NotifyHandle);
  646. NotifyHandle := nil;
  647. end;
  648. inherited BufferEmpty;
  649. end;
  650. procedure TAsyncWriteStream.CanWrite(UserData: TObject);
  651. begin
  652. Run;
  653. if DoStopAndFree then
  654. Free;
  655. end;
  656. constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop;
  657. AStream: THandleStream);
  658. begin
  659. Self.Create(AEventLoop, AStream, AStream);
  660. end;
  661. constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop;
  662. ADataStream: TStream; ABlockingStream: THandleStream);
  663. begin
  664. ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));
  665. inherited Create;
  666. FEventLoop := AEventLoop;
  667. FDataStream := ADataStream;
  668. FBlockingStream := ABlockingStream;
  669. end;
  670. destructor TAsyncWriteStream.Destroy;
  671. begin
  672. if Assigned(NotifyHandle) then
  673. EventLoop.ClearCanWriteNotify(NotifyHandle);
  674. inherited Destroy;
  675. end;
  676. procedure TAsyncWriteStream.StopAndFree;
  677. begin
  678. if Assigned(NotifyHandle) then
  679. begin
  680. EventLoop.ClearCanWriteNotify(NotifyHandle);
  681. NotifyHandle := nil;
  682. end;
  683. DoStopAndFree := True;
  684. end;
  685. end.
  686. {
  687. $Log$
  688. Revision 1.3 2003-06-25 08:41:01 sg
  689. * Fixed serious bug in TGenericLineReader: When the reader gets killed
  690. via StopAndFree during an OnLine callback, the reader now will
  691. immediately stop reading, so that the owner of the reader can process
  692. the remaining buffer
  693. Revision 1.2 2002/04/25 19:12:27 sg
  694. * Added ability to write all write buffer data to an debug stream
  695. * Added TAsyncWriteStream.StopAndFree
  696. Revision 1.1 2003/03/17 22:25:32 michael
  697. + Async moved from package to FCL
  698. Revision 1.3 2002/09/15 15:45:38 sg
  699. * Added stream line reader classes
  700. Revision 1.2 2002/09/07 15:42:57 peter
  701. * old logs removed and tabs fixed
  702. Revision 1.1 2002/01/29 17:55:02 peter
  703. * splitted to base and extra
  704. }