asyncio.pp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491
  1. {
  2. $Id$
  3. Async_IO: Mananging class for asynchronous input/output
  4. Copyright (C) 2000 by Sebastian Guenther ([email protected])
  5. See the file COPYING.FPC, included in this distribution,
  6. for details about the copyright.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  10. }
  11. {$MODE objfpc}
  12. {$H+}
  13. unit AsyncIO;
  14. interface
  15. {$i asyncioh.inc}
  16. type
  17. TAsyncIOManager = class;
  18. TAsyncIONotify = procedure(UserData: TObject) of object;
  19. TAsyncIONotifyInfo = record
  20. Method: TAsyncIONotify;
  21. UserData: TObject;
  22. end;
  23. // -------------------------------------------------------------------
  24. // TAsyncIOManager
  25. // -------------------------------------------------------------------
  26. TAsyncIOManager = class
  27. protected
  28. DoBreak: Boolean;
  29. IOdata : TIOdata;
  30. ReadNotifies, WriteNotifies: array[0..MaxHandle] of TAsyncIONotifyInfo;
  31. HighestHandle: Integer;
  32. FTimeout: Integer;
  33. TimeoutNotify: TAsyncIONotifyInfo;
  34. procedure CalcHighestHandle(max: Integer);
  35. procedure ExecuteNotify(const Notify: TAsyncIONotifyInfo);
  36. function GetHandleAsync(AHandle: Integer): Boolean;
  37. procedure SetHandleAsync(AHandle: Integer; AValue: Boolean);
  38. public
  39. constructor Create;
  40. procedure Run;
  41. procedure BreakRun;
  42. procedure SetReadHandler(AHandle: Integer; AMethod: TAsyncIONotify; AUserData: TObject);
  43. procedure ClearReadHandler(AHandle: Integer);
  44. procedure SetWriteHandler(AHandle: Integer; AMethod: TAsyncIONotify; AUserData: TObject);
  45. procedure ClearWriteHandler(AHandle: Integer);
  46. procedure SetTimeoutHandler(AMethod: TAsyncIONotify; AUserData: TObject);
  47. procedure ClearTimeoutHandler;
  48. property Timeout: Integer read FTimeout write FTimeout;
  49. property HandleAsync[AHandle: Integer]: Boolean read GetHandleAsync write SetHandleAsync;
  50. end;
  51. // -------------------------------------------------------------------
  52. // Line reader classes
  53. // -------------------------------------------------------------------
  54. TLineNotify = procedure(const line: String) of object;
  55. TGenericLineReader = class
  56. protected
  57. RealBuffer, FBuffer: PChar;
  58. FBytesInBuffer: Integer;
  59. FOnLine: TLineNotify;
  60. function Read(var ABuffer; count: Integer): Integer; virtual; abstract;
  61. procedure NoData; virtual; abstract;
  62. public
  63. destructor Destroy; override;
  64. procedure Run; // Process as many lines as possible
  65. property Buffer: PChar read FBuffer;
  66. property BytesInBuffer: Integer read FBytesInBuffer;
  67. property OnLine: TLineNotify read FOnLine write FOnLine;
  68. end;
  69. TAsyncStreamLineReader = class(TGenericLineReader)
  70. protected
  71. FManager: TAsyncIOManager;
  72. FDataStream: TStream;
  73. FBlockingStream: THandleStream;
  74. FOnEOF: TNotifyEvent;
  75. function Read(var ABuffer; count: Integer): Integer; override;
  76. procedure NoData; override;
  77. procedure StreamDataAvailable(UserData: TObject);
  78. public
  79. constructor Create(AManager: TAsyncIOManager; AStream: THandleStream);
  80. constructor Create(AManager: TAsyncIOManager;
  81. ADataStream: TStream; ABlockingStream: THandleStream);
  82. destructor Destroy; override;
  83. property DataStream: TStream read FDataStream;
  84. property BlockingStream: THandleStream read FBlockingStream;
  85. property OnEOF: TNotifyEvent read FOnEOF write FOnEOF;
  86. end;
  87. // -------------------------------------------------------------------
  88. // TWriteBuffer
  89. // -------------------------------------------------------------------
  90. TWriteBuffer = class(TStream)
  91. protected
  92. FBuffer: PChar;
  93. FBytesInBuffer: Integer;
  94. FOnBufferEmpty: TNotifyEvent;
  95. function Seek(Offset: LongInt; Origin: Word): LongInt; override;
  96. function Write(const ABuffer; Count: LongInt): LongInt; override;
  97. function DoRealWrite(const ABuffer; Count: Integer): Integer; virtual; abstract;
  98. procedure WritingFailed; virtual; abstract;
  99. procedure WantWrite; virtual; abstract;
  100. procedure BufferEmpty; virtual;
  101. constructor Create;
  102. public
  103. EndOfLineMarker: String;
  104. destructor Destroy; override;
  105. procedure WriteLine(const line: String);
  106. procedure Run; // Write as many data as possible
  107. property BytesInBuffer: Integer read FBytesInBuffer;
  108. property OnBufferEmpty: TNotifyEvent read FOnBufferEmpty write FOnBufferEmpty;
  109. end;
  110. TAsyncWriteStream = class(TWriteBuffer)
  111. protected
  112. FManager: TAsyncIOManager;
  113. FDataStream: TStream;
  114. FBlockingStream: THandleStream;
  115. function DoRealWrite(const ABuffer; Count: Integer): Integer; override;
  116. procedure WritingFailed; override;
  117. procedure WantWrite; override;
  118. procedure BufferEmpty; override;
  119. procedure CanWrite(UserData: TObject);
  120. public
  121. constructor Create(AManager: TAsyncIOManager; AStream: THandleStream);
  122. constructor Create(AManager: TAsyncIOManager;
  123. ADataStream: TStream; ABlockingStream: THandleStream);
  124. destructor Destroy; override;
  125. property DataStream: TStream read FDataStream;
  126. property BlockingStream: THandleStream read FBlockingStream;
  127. end;
  128. // ===================================================================
  129. // ===================================================================
  130. implementation
  131. {$i asyncio.inc}
  132. // -------------------------------------------------------------------
  133. // TAsyncIOManager
  134. // -------------------------------------------------------------------
  135. procedure TAsyncIOManager.ExecuteNotify(const Notify: TAsyncIONotifyInfo);
  136. begin
  137. if Assigned(Notify.Method) then
  138. Notify.Method(Notify.UserData);
  139. end;
  140. procedure TAsyncIOManager.SetTimeoutHandler(AMethod: TAsyncIONotify; AUserData: TObject);
  141. begin
  142. TimeoutNotify.Method := AMethod;
  143. TimeoutNotify.UserData := AUserData;
  144. end;
  145. procedure TAsyncIOManager.ClearTimeoutHandler;
  146. begin
  147. TimeoutNotify.Method := nil;
  148. end;
  149. // -------------------------------------------------------------------
  150. // TGenericLineReader
  151. // -------------------------------------------------------------------
  152. destructor TGenericLineReader.Destroy;
  153. begin
  154. if Assigned(RealBuffer) then begin
  155. FreeMem(RealBuffer);
  156. RealBuffer := nil;
  157. end;
  158. inherited Destroy;
  159. end;
  160. procedure TGenericLineReader.Run;
  161. var
  162. NewData: array[0..1023] of Byte;
  163. p: PChar;
  164. BytesRead, OldBufSize, CurBytesInBuffer, LastEndOfLine, i, LineLength: Integer;
  165. line: String;
  166. FirstRun: Boolean;
  167. begin
  168. FirstRun := True;
  169. while True do begin
  170. BytesRead := Read(NewData, SizeOf(NewData));
  171. //WriteLn('Linereader: ', BytesRead, ' bytes read');
  172. if BytesRead <= 0 then begin
  173. if FirstRun then
  174. NoData;
  175. break;
  176. end;
  177. FirstRun := False;
  178. OldBufSize := FBytesInBuffer;
  179. // Append the new received data to the read buffer
  180. Inc(FBytesInBuffer, BytesRead);
  181. ReallocMem(RealBuffer, FBytesInBuffer);
  182. Move(NewData, RealBuffer[OldBufSize], BytesRead);
  183. {Process all potential lines in the current buffer. Attention: FBuffer and
  184. FBytesInBuffer MUST be updated for each line, as they can be accessed from
  185. within the FOnLine handler!}
  186. LastEndOfLine := 0;
  187. if OldBufSize > 0 then
  188. i := OldBufSize - 1
  189. else
  190. i := 0;
  191. CurBytesInBuffer := FBytesInBuffer;
  192. while i <= CurBytesInBuffer - 2 do begin
  193. if (RealBuffer[i] = #13) or (RealBuffer[i] = #10) then begin
  194. LineLength := i - LastEndOfLine;
  195. SetLength(line, LineLength);
  196. if LineLength > 0 then
  197. Move(RealBuffer[LastEndOfLine], line[1], LineLength);
  198. if ((RealBuffer[i] = #13) and (RealBuffer[i + 1] = #10)) or
  199. ((RealBuffer[i] = #10) and (RealBuffer[i + 1] = #13)) then
  200. Inc(i);
  201. LastEndOfLine := i + 1;
  202. if Assigned(FOnLine) then begin
  203. FBuffer := RealBuffer + LastEndOfLine;
  204. FBytesInBuffer := CurBytesInBuffer - LastEndOfLine;
  205. FOnLine(line);
  206. // Check if <this> has been destroyed by FOnLine:
  207. if not Assigned(FBuffer) then exit;
  208. end;
  209. end;
  210. Inc(i);
  211. end;
  212. FBytesInBuffer := CurBytesInBuffer;
  213. if LastEndOfLine > 0 then begin
  214. // Remove all processed lines from the buffer
  215. Dec(FBytesInBuffer, LastEndOfLine);
  216. GetMem(p, FBytesInBuffer);
  217. Move(RealBuffer[LastEndOfLine], p^, FBytesInBuffer);
  218. FreeMem(RealBuffer);
  219. RealBuffer := p;
  220. end;
  221. FBuffer := RealBuffer;
  222. end;
  223. end;
  224. // -------------------------------------------------------------------
  225. // TAsyncStreamLineReader
  226. // -------------------------------------------------------------------
  227. function TAsyncStreamLineReader.Read(var ABuffer; count: Integer): Integer;
  228. begin
  229. Result := FDataStream.Read(ABuffer, count);
  230. end;
  231. procedure TAsyncStreamLineReader.NoData;
  232. var
  233. s: String;
  234. begin
  235. if (FDataStream = FBlockingStream) or (FDataStream.Position = FDataStream.Size) then begin
  236. if (FBytesInBuffer > 0) and Assigned(FOnLine) then begin
  237. if FBuffer[FBytesInBuffer - 1] in [#13, #10] then
  238. Dec(FBytesInBuffer);
  239. SetLength(s, FBytesInBuffer);
  240. Move(FBuffer^, s[1], FBytesInBuffer);
  241. FOnLine(s);
  242. end;
  243. FManager.ClearReadHandler(FBlockingStream.Handle);
  244. if Assigned(FOnEOF) then
  245. FOnEOF(Self);
  246. end;
  247. end;
  248. procedure TAsyncStreamLineReader.StreamDataAvailable(UserData: TObject);
  249. begin
  250. Run;
  251. end;
  252. constructor TAsyncStreamLineReader.Create(AManager: TAsyncIOManager; AStream: THandleStream);
  253. begin
  254. Self.Create(AManager, AStream, AStream);
  255. end;
  256. constructor TAsyncStreamLineReader.Create(AManager: TAsyncIOManager;
  257. ADataStream: TStream; ABlockingStream: THandleStream);
  258. begin
  259. ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));
  260. inherited Create;
  261. FManager := AManager;
  262. FDataStream := ADataStream;
  263. FBlockingStream := ABlockingStream;
  264. AManager.SetReadHandler(FBlockingStream.Handle, @StreamDataAvailable, nil);
  265. AManager.HandleAsync[FBlockingStream.Handle] := True;
  266. end;
  267. destructor TAsyncStreamLineReader.Destroy;
  268. begin
  269. FManager.ClearReadHandler(FBlockingStream.Handle);
  270. inherited Destroy;
  271. end;
  272. // -------------------------------------------------------------------
  273. // TWriteBuffer
  274. // -------------------------------------------------------------------
  275. procedure TWriteBuffer.BufferEmpty;
  276. begin
  277. if Assigned(FOnBufferEmpty) then
  278. FOnBufferEmpty(Self);
  279. end;
  280. constructor TWriteBuffer.Create;
  281. begin
  282. inherited Create;
  283. FBuffer := nil;
  284. FBytesInBuffer := 0;
  285. EndOfLineMarker := #10;
  286. end;
  287. destructor TWriteBuffer.Destroy;
  288. begin
  289. if Assigned(FBuffer) then
  290. FreeMem(FBuffer);
  291. inherited Destroy;
  292. end;
  293. function TWriteBuffer.Seek(Offset: LongInt; Origin: Word): LongInt;
  294. begin
  295. if ((Offset = 0) and ((Origin = soFromCurrent) or (Origin = soFromEnd))) or
  296. ((Offset = FBytesInBuffer) and (Origin = soFromBeginning)) then
  297. Result := FBytesInBuffer
  298. else
  299. raise EStreamError.Create('Invalid stream operation');
  300. end;
  301. function TWriteBuffer.Write(const ABuffer; Count: LongInt): LongInt;
  302. begin
  303. ReallocMem(FBuffer, FBytesInBuffer + Count);
  304. Move(ABuffer, FBuffer[FBytesInBuffer], Count);
  305. Inc(FBytesInBuffer, Count);
  306. WantWrite;
  307. Result := Count;
  308. end;
  309. procedure TWriteBuffer.WriteLine(const line: String);
  310. var
  311. s: String;
  312. begin
  313. s := line + EndOfLineMarker;
  314. WriteBuffer(s[1], Length(s));
  315. end;
  316. procedure TWriteBuffer.Run;
  317. var
  318. CurStart, written: Integer;
  319. NewBuf: PChar;
  320. failed: Boolean;
  321. begin
  322. CurStart := 0;
  323. failed := True;
  324. repeat
  325. if FBytesInBuffer = 0 then begin
  326. BufferEmpty;
  327. exit;
  328. end;
  329. written := DoRealWrite(FBuffer[CurStart], FBytesInBuffer - CurStart);
  330. if written > 0 then begin
  331. Inc(CurStart, written);
  332. failed := False;
  333. GetMem(NewBuf, FBytesInBuffer - CurStart);
  334. Move(FBuffer[CurStart], NewBuf[0], FBytesInBuffer - CurStart);
  335. FreeMem(FBuffer);
  336. FBuffer := NewBuf;
  337. Dec(FBytesInBuffer, CurStart);
  338. end;
  339. until written <= 0;
  340. if failed then
  341. WritingFailed;
  342. end;
  343. // -------------------------------------------------------------------
  344. // TAsyncWriteStream
  345. // -------------------------------------------------------------------
  346. function TAsyncWriteStream.DoRealWrite(const ABuffer; Count: Integer): Integer;
  347. begin
  348. Result := FDataStream.Write(ABuffer, count);
  349. end;
  350. procedure TAsyncWriteStream.WritingFailed;
  351. begin
  352. if FDataStream <> FBlockingStream then
  353. FManager.ClearWriteHandler(FBlockingStream.Handle);
  354. end;
  355. procedure TAsyncWriteStream.WantWrite;
  356. begin
  357. FManager.SetWriteHandler(FBlockingStream.Handle, @CanWrite, nil);
  358. end;
  359. procedure TAsyncWriteStream.BufferEmpty;
  360. begin
  361. FManager.ClearWriteHandler(FBlockingStream.Handle);
  362. inherited BufferEmpty;
  363. end;
  364. procedure TAsyncWriteStream.CanWrite(UserData: TObject);
  365. begin
  366. Run;
  367. end;
  368. constructor TAsyncWriteStream.Create(AManager: TAsyncIOManager; AStream: THandleStream);
  369. begin
  370. Self.Create(AManager, AStream, AStream);
  371. end;
  372. constructor TAsyncWriteStream.Create(AManager: TAsyncIOManager;
  373. ADataStream: TStream; ABlockingStream: THandleStream);
  374. begin
  375. ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));
  376. inherited Create;
  377. FManager := AManager;
  378. FDataStream := ADataStream;
  379. FBlockingStream := ABlockingStream;
  380. AManager.HandleAsync[FBlockingStream.Handle] := True;
  381. end;
  382. destructor TAsyncWriteStream.Destroy;
  383. begin
  384. FManager.ClearWriteHandler(FBlockingStream.Handle);
  385. inherited Destroy;
  386. end;
  387. end.
  388. {
  389. $Log$
  390. Revision 1.1 2000-02-18 23:14:48 michael
  391. + Initial implementation
  392. Revision 1.1 2000/02/17 22:40:05 sg
  393. * First version. This unit should go into FCL soon...
  394. }