fpasync.inc 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621
  1. {
  2. $Id$
  3. fpAsync: Asynchronous event management for Free Pascal
  4. Copyright (C) 2001-2002 by
  5. Areca Systems GmbH / Sebastian Guenther, [email protected]
  6. Common implementation
  7. See the file COPYING.FPC, included in this distribution,
  8. for details about the copyright.
  9. This program is distributed in the hope that it will be useful,
  10. but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  12. }
  type
    { Node of the singly linked list that a TEventLoop keeps in
      FFirstNotifyData. Each node describes one registered TNotifyEvent
      callback and is handed to the async layer as the user data pointer. }
    PNotifyData = ^TNotifyData;
    TNotifyData = record
      Next: PNotifyData;       // next node in the list, nil at the end
      Notify: TNotifyEvent;    // method invoked by EventHandler
      Sender: TObject;         // argument passed to Notify
      case Boolean of
        False: (TimerHandle: TAsyncTimer);  // filled in by AddTimerNotify
        True: (FileHandle: LongInt);        // filled in by the Set...Notify methods
    end;
  { cdecl trampoline passed to the async layer by the ...Notify methods.
    Data must point to a TNotifyData record; its Notify method is called
    with the stored Sender. }
  procedure EventHandler(Data: Pointer); cdecl;
  var
    Info: PNotifyData;
  begin
    Info := PNotifyData(Data);
    Info^.Notify(Info^.Sender);
  end;
  { Allocates a new TNotifyData record and pushes it onto the front of
    Obj's notify list. Only the Next field is initialised here; the caller
    fills in the remaining fields. Returns the new record. }
  function AddNotifyData(Obj: TEventLoop): PNotifyData;
  var
    Node: PNotifyData;
  begin
    New(Node);
    Node^.Next := PNotifyData(Obj.FFirstNotifyData);
    Obj.FFirstNotifyData := Node;
    Result := Node;
  end;
  { Unlinks Data from Obj's notify list (if it is present there) and then
    disposes it. Data is disposed even when it was not found in the list. }
  procedure FreeNotifyData(Obj: TEventLoop; Data: PNotifyData);
  var
    Node, Before: PNotifyData;
  begin
    // Walk the list until we hit Data or run off the end.
    Before := nil;
    Node := Obj.FFirstNotifyData;
    while Assigned(Node) and (Node <> Data) do
    begin
      Before := Node;
      Node := Node^.Next;
    end;

    // Found: splice the node out, fixing up the list head if necessary.
    if Assigned(Node) then
    begin
      if Assigned(Before) then
        Before^.Next := Node^.Next
      else
        Obj.FFirstNotifyData := Node^.Next;
    end;

    Dispose(Data);
  end;
  { Creates the exception with a message containing the ordinal value of
    AErrorCode and stores the code in FErrorCode. }
  constructor EAsyncError.Create(AErrorCode: TAsyncResult);
  var
    Msg: String;
  begin
    Msg := Format('Async I/O error %d', [Ord(AErrorCode)]);
    inherited Create(Msg);
    FErrorCode := AErrorCode;
  end;
  { Initialises the underlying async state by calling asyncInit on this
    loop's handle. }
  constructor TEventLoop.Create;
  var
    LoopHandle: TAsyncHandle;
  begin
    LoopHandle := Handle;
    asyncInit(LoopHandle);
  end;
  { Releases the async state via asyncFree, disposes every TNotifyData
    record still registered with this loop, and chains to the inherited
    destructor. }
  destructor TEventLoop.Destroy;
  var
    NotifyData, NextNotifyData: PNotifyData;
  begin
    asyncFree(Handle);

    // Detach the list first so the field never points at freed records.
    NotifyData := FFirstNotifyData;
    FFirstNotifyData := nil;
    while Assigned(NotifyData) do
    begin
      NextNotifyData := NotifyData^.Next;
      Dispose(NotifyData);
      NotifyData := NextNotifyData;
    end;

    inherited Destroy;
  end;
  { Returns this object reinterpreted as a TAsyncHandle, the value passed
    to all async... routines.
    NOTE(review): relies on the instance layout matching what the async
    layer expects - confirm against the class declaration. }
  function TEventLoop.Handle: TAsyncHandle;
  begin
    Handle := TAsyncHandle(Self);
  end;
  { Runs the event loop by calling asyncRun on this loop's handle. }
  procedure TEventLoop.Run;
  var
    LoopHandle: TAsyncHandle;
  begin
    LoopHandle := Handle;
    asyncRun(LoopHandle);
  end;
  { Asks the event loop to stop by calling asyncBreak on this loop's
    handle. }
  procedure TEventLoop.Break;
  var
    LoopHandle: TAsyncHandle;
  begin
    LoopHandle := Handle;
    asyncBreak(LoopHandle);
  end;
  { Registers a raw timer callback through asyncAddTimer and returns the
    timer handle it yields. All arguments are forwarded unchanged. }
  function TEventLoop.AddTimerCallback(AMSec: LongInt; APeriodic: Boolean;
    ACallback: TAsyncCallback; AUserData: Pointer): TAsyncTimer;
  var
    Timer: TAsyncTimer;
  begin
    Timer := asyncAddTimer(Handle, AMSec, APeriodic, ACallback, AUserData);
    Result := Timer;
  end;
  { Removes a timer previously returned by AddTimerCallback by forwarding
    it to asyncRemoveTimer. }
  procedure TEventLoop.RemoveTimerCallback(ATimer: TAsyncTimer);
  var
    LoopHandle: TAsyncHandle;
  begin
    LoopHandle := Handle;
    asyncRemoveTimer(LoopHandle, ATimer);
  end;
  { Registers a timer that calls ANotify(ASender) through EventHandler.
    AMSec and APeriodic are forwarded to asyncAddTimer.

    Returns the pointer that must later be passed to RemoveTimerNotify
    (it is the internal TNotifyData record). The original code never
    assigned Result, so callers received an undefined pointer. }
  function TEventLoop.AddTimerNotify(AMSec: LongInt; APeriodic: Boolean;
    ANotify: TNotifyEvent; ASender: TObject): Pointer;
  var
    UserData: PNotifyData;
  begin
    UserData := AddNotifyData(Self);
    UserData^.Notify := ANotify;
    UserData^.Sender := ASender;
    UserData^.TimerHandle :=
      asyncAddTimer(Handle, AMSec, APeriodic, @EventHandler, UserData);
    // RemoveTimerNotify casts its argument back to PNotifyData.
    Result := UserData;
  end;
  { Removes a timer notification. AHandle is treated as a PNotifyData:
    its TimerHandle is passed to asyncRemoveTimer and the record is then
    unlinked and freed. }
  procedure TEventLoop.RemoveTimerNotify(AHandle: Pointer);
  var
    Node: PNotifyData;
    Timer: TAsyncTimer;
  begin
    Node := PNotifyData(AHandle);
    Timer := Node^.TimerHandle;
    asyncRemoveTimer(Handle, Timer);
    FreeNotifyData(Self, Node);
  end;
  { Installs a raw I/O callback for AHandle via asyncSetIOCallback.
    Raises EAsyncError (through CheckResult) when the call does not
    return asyncOK. }
  procedure TEventLoop.SetIOCallback(AHandle: Integer; ACallback: TAsyncCallback;
    AUserData: Pointer);
  var
    Status: TAsyncResult;
  begin
    Status := asyncSetIOCallback(Handle, AHandle, ACallback, AUserData);
    CheckResult(Status);
  end;
  { Removes the I/O callback for AHandle by calling asyncClearIOCallback. }
  procedure TEventLoop.ClearIOCallback(AHandle: Integer);
  var
    LoopHandle: TAsyncHandle;
  begin
    LoopHandle := Handle;
    asyncClearIOCallback(LoopHandle, AHandle);
  end;
  { Registers ANotify(ASender) as the I/O notification for file handle
    AHandle, dispatched through EventHandler.

    Returns the pointer to pass to ClearIONotify. If asyncSetIOCallback
    does not return asyncOK, the freshly created record is freed again
    and EAsyncError is raised with that result code. }
  function TEventLoop.SetIONotify(AHandle: Integer; ANotify: TNotifyEvent;
    ASender: TObject): Pointer;
  var
    Node: PNotifyData;
    Status: TAsyncResult;
  begin
    Node := AddNotifyData(Self);
    Node^.FileHandle := AHandle;
    Node^.Sender := ASender;
    Node^.Notify := ANotify;

    Status := asyncSetIOCallback(Handle, AHandle, @EventHandler, Node);
    if Status <> asyncOK then
    begin
      // Registration failed: undo the list insertion before reporting.
      FreeNotifyData(Self, Node);
      raise EAsyncError.Create(Status);
    end;

    Result := Node;
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetIONotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
  end;
  { Removes an I/O notification. AHandle is treated as a PNotifyData: the
    callback for its FileHandle is cleared and the record is unlinked and
    freed. }
  procedure TEventLoop.ClearIONotify(AHandle: Pointer);
  var
    Node: PNotifyData;
  begin
    Node := PNotifyData(AHandle);
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearIONotify: Filehandle=', Node^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
    asyncClearIOCallback(Handle, Node^.FileHandle);
    FreeNotifyData(Self, Node);
  end;
  { Installs a raw data-available callback for AHandle via
    asyncSetDataAvailableCallback. Raises EAsyncError (through
    CheckResult) when the call does not return asyncOK. }
  procedure TEventLoop.SetDataAvailableCallback(AHandle: Integer; ACallback: TAsyncCallback;
    AUserData: Pointer);
  var
    Status: TAsyncResult;
  begin
    Status := asyncSetDataAvailableCallback(Handle, AHandle,
      ACallback, AUserData);
    CheckResult(Status);
  end;
  { Removes the data-available callback for AHandle by calling
    asyncClearDataAvailableCallback. }
  procedure TEventLoop.ClearDataAvailableCallback(AHandle: Integer);
  var
    LoopHandle: TAsyncHandle;
  begin
    LoopHandle := Handle;
    asyncClearDataAvailableCallback(LoopHandle, AHandle);
  end;
  { Registers ANotify(ASender) as the data-available notification for
    file handle AHandle, dispatched through EventHandler.

    Returns the pointer to pass to ClearDataAvailableNotify. If
    asyncSetDataAvailableCallback does not return asyncOK, the freshly
    created record is freed again and EAsyncError is raised with that
    result code. }
  function TEventLoop.SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent;
    ASender: TObject): Pointer;
  var
    Node: PNotifyData;
    Status: TAsyncResult;
  begin
    Node := AddNotifyData(Self);
    Node^.FileHandle := AHandle;
    Node^.Sender := ASender;
    Node^.Notify := ANotify;

    Status := asyncSetDataAvailableCallback(Handle, AHandle,
      @EventHandler, Node);
    if Status <> asyncOK then
    begin
      // Registration failed: undo the list insertion before reporting.
      FreeNotifyData(Self, Node);
      raise EAsyncError.Create(Status);
    end;

    Result := Node;
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetDataAvailableNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
  end;
  { Removes a data-available notification. AHandle is treated as a
    PNotifyData: the callback for its FileHandle is cleared and the
    record is unlinked and freed. }
  procedure TEventLoop.ClearDataAvailableNotify(AHandle: Pointer);
  var
    Node: PNotifyData;
  begin
    Node := PNotifyData(AHandle);
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearDataAvailableNotify: Filehandle=', Node^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
    asyncClearDataAvailableCallback(Handle, Node^.FileHandle);
    FreeNotifyData(Self, Node);
  end;
  { Installs a raw can-write callback for AHandle via
    asyncSetCanWriteCallback. Raises EAsyncError (through CheckResult)
    when the call does not return asyncOK. }
  procedure TEventLoop.SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback;
    AUserData: Pointer);
  var
    Status: TAsyncResult;
  begin
    Status := asyncSetCanWriteCallback(Handle, AHandle, ACallback, AUserData);
    CheckResult(Status);
  end;
  { Removes the can-write callback for AHandle by calling
    asyncClearCanWriteCallback. }
  procedure TEventLoop.ClearCanWriteCallback(AHandle: Integer);
  var
    LoopHandle: TAsyncHandle;
  begin
    LoopHandle := Handle;
    asyncClearCanWriteCallback(LoopHandle, AHandle);
  end;
  { Registers ANotify(ASender) as the can-write notification for file
    handle AHandle, dispatched through EventHandler.

    Returns the pointer to pass to ClearCanWriteNotify. If
    asyncSetCanWriteCallback does not return asyncOK, the freshly created
    record is freed again and EAsyncError is raised with that result
    code. }
  function TEventLoop.SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent;
    ASender: TObject): Pointer;
  var
    Node: PNotifyData;
    Status: TAsyncResult;
  begin
    Node := AddNotifyData(Self);
    Node^.FileHandle := AHandle;
    Node^.Sender := ASender;
    Node^.Notify := ANotify;

    Status := asyncSetCanWriteCallback(Handle, AHandle,
      @EventHandler, Node);
    if Status <> asyncOK then
    begin
      // Registration failed: undo the list insertion before reporting.
      FreeNotifyData(Self, Node);
      raise EAsyncError.Create(Status);
    end;

    Result := Node;
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetCanWriteNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
  end;
  { Removes a can-write notification. AHandle is treated as a
    PNotifyData: the callback for its FileHandle is cleared and the
    record is unlinked and freed. }
  procedure TEventLoop.ClearCanWriteNotify(AHandle: Pointer);
  var
    Node: PNotifyData;
  begin
    Node := PNotifyData(AHandle);
    {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearCanWriteNotify: Filehandle=', Node^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
    asyncClearCanWriteCallback(Handle, Node^.FileHandle);
    FreeNotifyData(Self, Node);
  end;
  { Returns the current value of asyncGetTicks. Being a class function,
    it can be called without an event loop instance. }
  class function TEventLoop.TimerTicks: Int64;
  var
    Ticks: Int64;
  begin
    Ticks := asyncGetTicks;
    Result := Ticks;
  end;
  { Raises EAsyncError carrying AResultCode unless it equals asyncOK. }
  procedure TEventLoop.CheckResult(AResultCode: TAsyncResult);
  begin
    if AResultCode = asyncOK then
      exit;
    raise EAsyncError.Create(AResultCode);
  end;
  { Getter for IsRunning: returns what asyncIsRunning reports for this
    loop's handle. }
  function TEventLoop.GetIsRunning: Boolean;
  var
    LoopHandle: TAsyncHandle;
  begin
    LoopHandle := Handle;
    Result := asyncIsRunning(LoopHandle);
  end;
  { Setter for IsRunning.

    Setting True while the loop is not running calls Run; setting False
    while it is running calls Break. Requests that match the current
    state do nothing.

    The original implementation had the two actions swapped (it called
    Run to stop a running loop and Break to start a stopped one). }
  procedure TEventLoop.SetIsRunning(AIsRunning: Boolean);
  begin
    if IsRunning then
    begin
      if not AIsRunning then
        Self.Break;   // running -> stop requested
    end else
      if AIsRunning then
        Self.Run;     // stopped -> start requested
  end;
  252. // -------------------------------------------------------------------
  253. // TGenericLineReader
  254. // -------------------------------------------------------------------
  { Frees the read buffer and chains to the inherited destructor.

    FBuffer is cleared as well as RealBuffer: TGenericLineReader.Run
    tests Assigned(FBuffer) after each FOnLine call to find out whether
    the handler destroyed the reader, and FBuffer points into RealBuffer,
    so leaving it set would both defeat that test and leave a dangling
    pointer.
    NOTE(review): that test in Run still reads a field of a freed
    instance; prefer a deferred-free scheme such as StopAndFree where
    possible. }
  destructor TGenericLineReader.Destroy;
  begin
    if Assigned(RealBuffer) then
    begin
      FreeMem(RealBuffer);
      RealBuffer := nil;
    end;
    FBuffer := nil;
    inherited Destroy;
  end;
  { Reads all currently available data in chunks of up to 1024 bytes,
    appends it to the internal buffer and reports each complete line to
    FOnLine.

    A line ends at #13 or #10; a directly following complementary
    character (#13#10 or #10#13) is consumed as part of the same line
    ending. The last byte of the buffer is never examined as a line
    ending here, because the byte after it is needed to recognise such
    pairs; it stays buffered until more data arrives.

    When the very first Read returns no data, NoData is called. The
    procedure returns as soon as a Read yields nothing, or when FBuffer
    is found unassigned after an FOnLine call. }
  procedure TGenericLineReader.Run;
  var
    Chunk: array[0..1023] of Byte;
    Remainder: PChar;
    Got, PrevSize, Total, LineStart, Idx, Len: Integer;
    CurLine: String;
    NothingReadYet: Boolean;
  begin
    NothingReadYet := True;
    repeat
      Got := Read(Chunk, SizeOf(Chunk));
      //WriteLn('Linereader: ', BytesRead, ' bytes read');
      if Got <= 0 then
      begin
        if NothingReadYet then
          NoData;
        exit;
      end;
      NothingReadYet := False;

      // Append the newly received data to the read buffer
      PrevSize := FBytesInBuffer;
      Inc(FBytesInBuffer, Got);
      ReallocMem(RealBuffer, FBytesInBuffer);
      Move(Chunk, RealBuffer[PrevSize], Got);

      { Process all potential lines in the current buffer. Attention:
        FBuffer and FBytesInBuffer MUST be updated for each line, as they
        can be accessed from within the FOnLine handler! }
      LineStart := 0;
      // Resume one byte before the new data: the previous last byte was
      // not examined yet (see the loop bound below).
      Idx := PrevSize - 1;
      if Idx < 0 then
        Idx := 0;
      Total := FBytesInBuffer;
      while Idx <= Total - 2 do
      begin
        if RealBuffer[Idx] in [#13, #10] then
        begin
          Len := Idx - LineStart;
          SetLength(CurLine, Len);
          if Len > 0 then
            Move(RealBuffer[LineStart], CurLine[1], Len);

          // Swallow the second character of a #13#10 or #10#13 pair
          if (RealBuffer[Idx + 1] in [#13, #10]) and
             (RealBuffer[Idx + 1] <> RealBuffer[Idx]) then
            Inc(Idx);
          LineStart := Idx + 1;

          if Assigned(FOnLine) then
          begin
            FBuffer := RealBuffer + LineStart;
            FBytesInBuffer := Total - LineStart;
            FOnLine(CurLine);
            // Check if <this> has been destroyed by FOnLine:
            if not Assigned(FBuffer) then
              exit;
          end;
        end;
        Inc(Idx);
      end;

      // Keep only the bytes following the last reported line
      FBytesInBuffer := Total - LineStart;
      if LineStart > 0 then
      begin
        GetMem(Remainder, FBytesInBuffer);
        Move(RealBuffer[LineStart], Remainder^, FBytesInBuffer);
        if Assigned(RealBuffer) then
          FreeMem(RealBuffer);
        RealBuffer := Remainder;
      end;
      FBuffer := RealBuffer;
    until False;
  end;
  333. // -------------------------------------------------------------------
  334. // TAsyncStreamLineReader
  335. // -------------------------------------------------------------------
  { Convenience constructor: uses AStream both as the data stream and as
    the stream whose handle is watched for available data. }
  constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop;
    AStream: THandleStream);
  begin
    Create(AEventLoop, AStream, AStream);
  end;
  { Creates a line reader that reads from ADataStream and registers
    StreamDataAvailable with the event loop for ABlockingStream's handle.
    Both streams must be assigned (checked with ASSERT). }
  constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop;
    ADataStream: TStream; ABlockingStream: THandleStream);
  begin
    ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));
    inherited Create;

    FBlockingStream := ABlockingStream;
    FDataStream := ADataStream;
    FEventLoop := AEventLoop;

    // The returned handle is needed later to remove the notification.
    NotifyHandle := EventLoop.SetDataAvailableNotify(
      FBlockingStream.Handle, @StreamDataAvailable, nil);
  end;
  { Removes the data-available notification, if one is still registered,
    and chains to the inherited destructor. }
  destructor TAsyncStreamLineReader.Destroy;
  begin
    if NotifyHandle <> nil then
      EventLoop.ClearDataAvailableNotify(NotifyHandle);
    inherited Destroy;
  end;
  { Requests deferred destruction: sets DoStopAndFree, which
    StreamDataAvailable checks after Run to free the reader. }
  procedure TAsyncStreamLineReader.StopAndFree;
  begin
    Self.DoStopAndFree := True;
  end;
  { Reads up to count bytes from the data stream into ABuffer and returns
    the value reported by the stream's Read. }
  function TAsyncStreamLineReader.Read(var ABuffer; count: Integer): Integer;
  var
    Got: Integer;
  begin
    Got := FDataStream.Read(ABuffer, count);
    Result := Got;
  end;
  { Called by Run when the first read attempt returned no data.

    If the data stream is the blocking stream, or the data stream's
    Position equals its Size, this is treated as end of input: any text
    still buffered is reported to FOnLine as a final line (minus one
    trailing #13 or #10), the data-available notification is removed and
    FOnEOF is fired. Otherwise nothing happens.

    Two guards were added compared to the original:
    - s[1] is no longer touched when the final line is empty (buffer held
      just a line terminator), matching the LineLength > 0 guard in Run;
    - the notification is only cleared while NotifyHandle is assigned, so
      reaching this code again after EOF no longer passes nil to
      ClearDataAvailableNotify. }
  procedure TAsyncStreamLineReader.NoData;
  var
    s: String;
  begin
    if (FDataStream = FBlockingStream) or (FDataStream.Position = FDataStream.Size) then
    begin
      if (FBytesInBuffer > 0) and Assigned(FOnLine) then
      begin
        if FBuffer[FBytesInBuffer - 1] in [#13, #10] then
          Dec(FBytesInBuffer);
        SetLength(s, FBytesInBuffer);
        if FBytesInBuffer > 0 then
          Move(FBuffer^, s[1], FBytesInBuffer);
        FOnLine(s);
      end;

      if Assigned(NotifyHandle) then
      begin
        EventLoop.ClearDataAvailableNotify(NotifyHandle);
        NotifyHandle := nil;
      end;

      if Assigned(FOnEOF) then
        FOnEOF(Self);
    end;
  end;
  { Notification handler registered in the constructor: processes the
    available data with Run, then frees the reader if StopAndFree was
    requested. UserData is not used. }
  procedure TAsyncStreamLineReader.StreamDataAvailable(UserData: TObject);
  begin
    Run;
    if not DoStopAndFree then
      exit;
    Free;
  end;
  390. // -------------------------------------------------------------------
  391. // TWriteBuffer
  392. // -------------------------------------------------------------------
  { Fires FOnBufferEmpty with Self as sender, if a handler is assigned. }
  procedure TWriteBuffer.BufferEmpty;
  begin
    if not Assigned(FOnBufferEmpty) then
      exit;
    FOnBufferEmpty(Self);
  end;
  { Creates an empty write buffer whose end-of-line marker is #10. }
  constructor TWriteBuffer.Create;
  begin
    inherited Create;
    EndOfLineMarker := #10;
    FBytesInBuffer := 0;
    FBuffer := nil;
  end;
  { Releases the buffer memory, if any, and chains to the inherited
    destructor. }
  destructor TWriteBuffer.Destroy;
  begin
    if FBuffer <> nil then
      FreeMem(FBuffer);
    inherited Destroy;
  end;
  { Accepts only seeks that resolve to the end of the buffered data:
    offset 0 from the current position or from the end, or offset
    FBytesInBuffer from the beginning. Returns FBytesInBuffer for those;
    any other request raises EStreamError. }
  function TWriteBuffer.Seek(Offset: LongInt; Origin: Word): LongInt;
  var
    SeeksToEnd: Boolean;
  begin
    if Origin = soFromBeginning then
      SeeksToEnd := Offset = FBytesInBuffer
    else if (Origin = soFromCurrent) or (Origin = soFromEnd) then
      SeeksToEnd := Offset = 0
    else
      SeeksToEnd := False;

    if not SeeksToEnd then
      // !!!: No i18n for this string - solve this problem in the FCL?!?
      raise EStreamError.Create('Invalid stream operation');

    Result := FBytesInBuffer;
  end;
  { Appends Count bytes from ABuffer to the internal buffer, then calls
    WantWrite. Always returns Count. }
  function TWriteBuffer.Write(const ABuffer; Count: LongInt): LongInt;
  var
    OldSize: Integer;
  begin
    OldSize := FBytesInBuffer;
    ReallocMem(FBuffer, OldSize + Count);
    Move(ABuffer, FBuffer[OldSize], Count);
    FBytesInBuffer := OldSize + Count;
    WantWrite;
    Result := Count;
  end;
  { Writes line followed by EndOfLineMarker using WriteBuffer. }
  procedure TWriteBuffer.WriteLine(const line: String);
  var
    Payload: String;
    PayloadLen: Integer;
  begin
    Payload := line + EndOfLineMarker;
    PayloadLen := Length(Payload);
    WriteBuffer(Payload[1], PayloadLen);
  end;
  { Flushes as much buffered data as DoRealWrite accepts.

    DoRealWrite is called repeatedly with the unwritten data; the bytes
    it reports as written are removed from the front of the buffer. When
    the buffer becomes empty, BufferEmpty is called and the procedure
    returns. The loop stops when DoRealWrite returns a value <= 0; if not
    a single byte could be written during this call, WritingFailed is
    invoked.

    Fix: the original kept a running CurStart offset but also compacted
    the buffer after every write without resetting that offset. From the
    second iteration on it therefore skipped unwritten bytes (losing
    data) or called DoRealWrite with a negative count. The unwritten data
    now always starts at FBuffer[0]. }
  procedure TWriteBuffer.Run;
  var
    Written, Remaining: Integer;
    Failed: Boolean;
  begin
    Failed := True;
    repeat
      if FBytesInBuffer = 0 then
      begin
        BufferEmpty;
        exit;
      end;

      Written := DoRealWrite(FBuffer[0], FBytesInBuffer);
      if Written > 0 then
      begin
        Failed := False;
        Remaining := FBytesInBuffer - Written;
        // Defensive: never trust a writer that reports more than it got.
        if Remaining < 0 then
          Remaining := 0;
        // Move handles overlapping regions, so shift the tail in place.
        if Remaining > 0 then
          Move(FBuffer[Written], FBuffer[0], Remaining);
        // Shrinks the block; with size 0 it releases it and nils FBuffer.
        ReallocMem(FBuffer, Remaining);
        FBytesInBuffer := Remaining;
      end;
    until Written <= 0;

    if Failed then
      WritingFailed;
  end;
  464. // -------------------------------------------------------------------
  465. // TAsyncWriteStream
  466. // -------------------------------------------------------------------
  { Writes Count bytes from ABuffer to the data stream and returns the
    value reported by the stream's Write. }
  function TAsyncWriteStream.DoRealWrite(const ABuffer; Count: Integer): Integer;
  var
    Done: Integer;
  begin
    Done := FDataStream.Write(ABuffer, count);
    Result := Done;
  end;
  { Called when Run could not write anything. If the data stream differs
    from the blocking stream and a can-write notification is registered,
    that notification is removed. }
  procedure TAsyncWriteStream.WritingFailed;
  begin
    if FDataStream = FBlockingStream then
      exit;
    if not Assigned(NotifyHandle) then
      exit;
    EventLoop.ClearCanWriteNotify(NotifyHandle);
    NotifyHandle := nil;
  end;
  { Makes sure a can-write notification for the blocking stream's handle
    is registered with the event loop; does nothing if one already is. }
  procedure TAsyncWriteStream.WantWrite;
  begin
    if Assigned(NotifyHandle) then
      exit;
    NotifyHandle := EventLoop.SetCanWriteNotify(FBlockingStream.Handle,
      @CanWrite, nil);
  end;
  { Removes the can-write notification, if registered, since there is
    nothing left to write, then runs the inherited BufferEmpty. }
  procedure TAsyncWriteStream.BufferEmpty;
  begin
    if NotifyHandle <> nil then
    begin
      EventLoop.ClearCanWriteNotify(NotifyHandle);
      NotifyHandle := nil;
    end;
    inherited BufferEmpty;
  end;
  { Notification handler registered by WantWrite: flushes buffered data
    by calling Run. UserData is not used. }
  procedure TAsyncWriteStream.CanWrite(UserData: TObject);
  begin
    Self.Run;
  end;
  { Convenience constructor: uses AStream both as the data stream and as
    the stream whose handle is watched for writability. }
  constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop;
    AStream: THandleStream);
  begin
    Create(AEventLoop, AStream, AStream);
  end;
  { Creates a write stream that writes to ADataStream and uses
    ABlockingStream's handle for can-write notifications. Both streams
    must be assigned (checked with ASSERT). No notification is registered
    here; WantWrite does that on demand. }
  constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop;
    ADataStream: TStream; ABlockingStream: THandleStream);
  begin
    ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));
    inherited Create;

    FBlockingStream := ABlockingStream;
    FDataStream := ADataStream;
    FEventLoop := AEventLoop;
  end;
  { Removes the can-write notification, if one is still registered, and
    chains to the inherited destructor. }
  destructor TAsyncWriteStream.Destroy;
  begin
    if NotifyHandle <> nil then
      EventLoop.ClearCanWriteNotify(NotifyHandle);
    inherited Destroy;
  end;
  518. {
  519. $Log$
  520. Revision 1.4 2002-12-27 15:46:15 peter
  521. * add class to timerticks
  522. Revision 1.3 2002/09/15 15:45:38 sg
  523. * Added stream line reader classes
  524. Revision 1.2 2002/09/07 15:42:57 peter
  525. * old logs removed and tabs fixed
  526. Revision 1.1 2002/01/29 17:55:02 peter
  527. * splitted to base and extra
  528. }