asyncio.pp 14 KB


  1. {
  2. $Id$
  3. Async_IO: Managing class for asynchronous input/output
  4. Copyright (C) 2000 by Sebastian Guenther ([email protected])
  5. See the file COPYING.FPC, included in this distribution,
  6. for details about the copyright.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  10. }
  11. {$MODE objfpc}
  12. {$H+}
  13. unit AsyncIO;
  14. interface
  15. {$i asyncioh.inc}
  16. type
  17. TAsyncIOManager = class;  // forward declaration; the full class is declared further down
  // Callback signature shared by the read, write and timeout notifications.
  // UserData is the value that was stored together with the handler.
  18. TAsyncIONotify = procedure(UserData: TObject) of object;
  // A registered callback together with the UserData value that is passed to it.
  19. TAsyncIONotifyInfo = record
  20. Method: TAsyncIONotify;  // handler to call; ExecuteNotify skips the call when it is unassigned
  21. UserData: TObject;  // handed to Method as its only argument
  22. end;
  23. // -------------------------------------------------------------------
  24. // TAsyncIOManager
  25. // -------------------------------------------------------------------
  // Keeps one read and one write notification per handle plus a single timeout
  // notification. Only ExecuteNotify and the timeout handler accessors are
  // implemented in the visible part of this unit.
  // NOTE(review): the remaining methods are presumably implemented in the
  // asyncio.inc include file - confirm.
  26. TAsyncIOManager = class
  27. protected
  28. DoBreak: Boolean;  // NOTE(review): presumably set by BreakRun to make Run return - confirm
  29. IOdata : TIOdata;  // NOTE(review): TIOdata is presumably declared in asyncioh.inc - confirm
  30. ReadNotifies, WriteNotifies: array[0..MaxHandle] of TAsyncIONotifyInfo;  // one slot per handle value
  31. HighestHandle: Integer;
  32. FTimeout: Integer;  // backing field of the Timeout property; unit not visible here
  33. TimeoutNotify: TAsyncIONotifyInfo;  // written by SetTimeoutHandler / ClearTimeoutHandler
  34. procedure CalcHighestHandle(max: Integer);
  35. procedure ExecuteNotify(const Notify: TAsyncIONotifyInfo);  // calls Notify.Method(Notify.UserData) if assigned
  36. function GetHandleAsync(AHandle: Integer): Boolean;
  37. procedure SetHandleAsync(AHandle: Integer; AValue: Boolean);
  38. public
  39. constructor Create;
  40. procedure Run;
  41. procedure BreakRun;
  42. procedure SetReadHandler(AHandle: Integer; AMethod: TAsyncIONotify; AUserData: TObject);
  43. procedure ClearReadHandler(AHandle: Integer);
  44. function GetReadHandler(AHandle: Integer): TAsyncIONotify;
  45. procedure SetWriteHandler(AHandle: Integer; AMethod: TAsyncIONotify; AUserData: TObject);
  46. procedure ClearWriteHandler(AHandle: Integer);
  47. function GetWriteHandler(AHandle: Integer): TAsyncIONotify;
  48. procedure SetTimeoutHandler(AMethod: TAsyncIONotify; AUserData: TObject);  // stores both values in TimeoutNotify
  49. procedure ClearTimeoutHandler;  // resets TimeoutNotify.Method to nil; UserData is left untouched
  50. function GetTimeoutHandler: TAsyncIONotify;  // returns TimeoutNotify.Method
  51. property Timeout: Integer read FTimeout write FTimeout;
  52. property HandleAsync[AHandle: Integer]: Boolean read GetHandleAsync write SetHandleAsync;
  53. end;
  54. // -------------------------------------------------------------------
  55. // Line reader classes
  56. // -------------------------------------------------------------------
  // Callback that receives one line of input without its end-of-line characters.
  57. TLineNotify = procedure(const line: String) of object;
  58. PBoolean = ^Boolean;
  // Abstract line splitter: Run pulls raw bytes through Read, collects them in
  // a heap buffer and reports every complete line (terminated by CR, LF or
  // CR+LF) through OnLine. Descendants supply Read and NoData.
  59. TGenericLineReader = class
  60. protected
  61. FDestroyedFlag: PBoolean;  // when assigned, Destroy stores True through it so Run can notice the destruction
  62. RealBuffer, FBuffer: PChar;  // RealBuffer: heap block freed by Destroy; FBuffer: start of the not yet processed data
  63. FBytesInBuffer: Integer;  // number of buffered bytes that have not been reported yet
  64. FOnLine: TLineNotify;
  65. function Read(var ABuffer; count: Integer): Integer; virtual; abstract;  // Run stops reading when the result is <= 0
  66. procedure NoData; virtual; abstract;  // called by Run when its very first Read call returns no data
  67. public
  68. destructor Destroy; override;
  69. procedure Run; // Read and process as many lines as possible
  70. property Buffer: PChar read FBuffer;
  71. property BytesInBuffer: Integer read FBytesInBuffer;
  72. property OnLine: TLineNotify read FOnLine write FOnLine;
  73. end;
  // Line reader that takes its data from a stream and registers itself as read
  // handler for the handle of a (possibly different) THandleStream.
  74. TAsyncStreamLineReader = class(TGenericLineReader)
  75. protected
  76. FManager: TAsyncIOManager;  // manager the read handler is registered with
  77. FDataStream: TStream;  // stream the data is read from
  78. FBlockingStream: THandleStream;  // stream whose Handle is registered with the manager
  79. FOnEOF: TNotifyEvent;  // fired by NoData after the read handler has been removed
  80. function Read(var ABuffer; count: Integer): Integer; override;  // forwards to FDataStream.Read
  81. procedure NoData; override;
  82. procedure StreamDataAvailable(UserData: TObject);  // read handler installed by the constructor; calls Run
  83. public
  // Two-argument form: the same stream is used for reading and for the handle.
  84. constructor Create(AManager: TAsyncIOManager; AStream: THandleStream);
  85. constructor Create(AManager: TAsyncIOManager;
  86. ADataStream: TStream; ABlockingStream: THandleStream);
  87. destructor Destroy; override;
  88. property DataStream: TStream read FDataStream;
  89. property BlockingStream: THandleStream read FBlockingStream;
  90. property OnEOF: TNotifyEvent read FOnEOF write FOnEOF;
  91. end;
  92. // -------------------------------------------------------------------
  93. // TWriteBuffer
  94. // -------------------------------------------------------------------
  // Write-only stream that collects everything written to it in a heap buffer;
  // Run later hands the buffered bytes to DoRealWrite. Descendants supply
  // DoRealWrite, WritingFailed and WantWrite.
  95. TWriteBuffer = class(TStream)
  96. protected
  97. FBuffer: PChar;  // heap block holding the pending bytes; freed by Destroy
  98. FBytesInBuffer: Integer;  // number of pending bytes in FBuffer
  99. FOnBufferEmpty: TNotifyEvent;
  100. function Seek(Offset: LongInt; Origin: Word): LongInt; override;  // only accepts seeks that stay at the end of the data
  101. function Write(const ABuffer; Count: LongInt): LongInt; override;  // appends to FBuffer, then calls WantWrite
  102. function DoRealWrite(const ABuffer; Count: Integer): Integer; virtual; abstract;  // Run treats a result <= 0 as "nothing written"
  103. procedure WritingFailed; virtual; abstract;  // called by Run when not a single byte could be written
  104. procedure WantWrite; virtual; abstract;  // called by Write after new data has been buffered
  105. procedure BufferEmpty; virtual;  // called by Run when no bytes are pending; fires OnBufferEmpty
  106. constructor Create;  // declared in the protected section, unlike the descendant's constructors
  107. public
  108. EndOfLineMarker: String;  // appended by WriteLine; Create sets it to #10
  109. destructor Destroy; override;
  110. procedure WriteLine(const line: String);
  111. procedure Run; // Write as much data as possible
  112. property BytesInBuffer: Integer read FBytesInBuffer;
  113. property OnBufferEmpty: TNotifyEvent read FOnBufferEmpty write FOnBufferEmpty;
  114. end;
  // Write buffer that sends its data to a stream and uses the write handler
  // slot of a (possibly different) THandleStream's handle to get called back.
  115. TAsyncWriteStream = class(TWriteBuffer)
  116. protected
  117. FManager: TAsyncIOManager;  // manager the write handler is registered with
  118. FDataStream: TStream;  // stream the buffered bytes are written to
  119. FBlockingStream: THandleStream;  // stream whose Handle is registered with the manager
  120. function DoRealWrite(const ABuffer; Count: Integer): Integer; override;  // forwards to FDataStream.Write
  121. procedure WritingFailed; override;  // removes the write handler unless both streams are the same object
  122. procedure WantWrite; override;  // installs CanWrite as write handler
  123. procedure BufferEmpty; override;  // removes the write handler, then calls the inherited method
  124. procedure CanWrite(UserData: TObject);  // write handler; calls Run
  125. public
  // Two-argument form: the same stream is used for writing and for the handle.
  126. constructor Create(AManager: TAsyncIOManager; AStream: THandleStream);
  127. constructor Create(AManager: TAsyncIOManager;
  128. ADataStream: TStream; ABlockingStream: THandleStream);
  129. destructor Destroy; override;
  130. property DataStream: TStream read FDataStream;
  131. property BlockingStream: THandleStream read FBlockingStream;
  132. end;
  133. // ===================================================================
  134. // ===================================================================
  135. implementation
  136. uses SysUtils;
  137. {$i asyncio.inc}
  138. // -------------------------------------------------------------------
  139. // TAsyncIOManager
  140. // -------------------------------------------------------------------
  { Calls the handler stored in Notify and passes the stored UserData to it.
    Nothing happens when no handler is assigned. }
  procedure TAsyncIOManager.ExecuteNotify(const Notify: TAsyncIONotifyInfo);
  begin
    if not Assigned(Notify.Method) then
      exit;
    Notify.Method(Notify.UserData);
  end;
  { Installs the timeout notification: AMethod is the handler, AUserData the
    value that is passed to it. }
  procedure TAsyncIOManager.SetTimeoutHandler(AMethod: TAsyncIONotify; AUserData: TObject);
  begin
    with TimeoutNotify do
    begin
      UserData := AUserData;
      Method := AMethod;
    end;
  end;
  { Removes the timeout handler. Only the method pointer is reset; the stored
    UserData value is left as it is. }
  procedure TAsyncIOManager.ClearTimeoutHandler;
  begin
    with TimeoutNotify do
      Method := nil;
  end;
  { Returns the currently installed timeout handler (unassigned if there is
    none). }
  function TAsyncIOManager.GetTimeoutHandler: TAsyncIONotify;
  begin
    GetTimeoutHandler := TimeoutNotify.Method;
  end;
  159. // -------------------------------------------------------------------
  160. // TGenericLineReader
  161. // -------------------------------------------------------------------
  { Releases the line buffer. If a destruction flag is currently registered,
    it is set to True first so that the code which registered it can tell
    that this instance no longer exists. }
  destructor TGenericLineReader.Destroy;
  begin
    if FDestroyedFlag <> nil then
      FDestroyedFlag^ := True;

    if RealBuffer <> nil then
    begin
      FreeMem(RealBuffer);
      RealBuffer := nil;
    end;

    inherited Destroy;
  end;
  { Reads data in chunks of up to 1024 bytes until Read returns no more data,
    appends it to the internal buffer and reports every complete line through
    OnLine. A line ends with CR, LF or CR+LF; the terminator is not part of the
    reported string. If the very first Read call delivers nothing, NoData is
    called instead.

    Notes:
    - The OnLine handler may destroy this reader. This is detected through
      FDestroyedFlag; once it happened, no field of the instance is touched
      any more.
    - An LF in the last buffered byte terminates its line immediately. A CR in
      the last byte is kept back until the following byte is known, because it
      may be the first half of a CR+LF pair.
    - If the handler raises an exception, the lines reported so far are removed
      from the buffer before the exception is passed on, so the buffer stays
      consistent. }
  procedure TGenericLineReader.Run;
  var
    NewData: array[0..1023] of Byte;
    BytesRead, OldBufSize, CurBytesInBuffer, LastEndOfLine, i, LineLength: Integer;
    line: String;
    FirstRun, DestroyedFlag: Boolean;
  begin
    FirstRun := True;
    DestroyedFlag := False;
    while True do
    begin
      BytesRead := Read(NewData, SizeOf(NewData));
      if BytesRead <= 0 then
      begin
        // NoData may destroy the instance; nothing below this point touches it
        if FirstRun then
          NoData;
        break;
      end;
      FirstRun := False;
      OldBufSize := FBytesInBuffer;

      // Append the newly received data to the read buffer
      Inc(FBytesInBuffer, BytesRead);
      ReallocMem(RealBuffer, FBytesInBuffer);
      Move(NewData, RealBuffer[OldBufSize], BytesRead);

      { Process all potential lines in the current buffer. Attention: FBuffer
        and FBytesInBuffer MUST be updated for each line, as they can be
        accessed from within the FOnLine handler! }
      LastEndOfLine := 0;
      // Restart at the last old byte: it may be a CR that had to be kept back
      if OldBufSize > 0 then
        i := OldBufSize - 1
      else
        i := 0;

      CurBytesInBuffer := FBytesInBuffer;
      try
        while i < CurBytesInBuffer do
        begin
          // A trailing CR cannot be classified before the next byte arrives
          if (RealBuffer[i] = #13) and (i = CurBytesInBuffer - 1) then
            break;

          if (RealBuffer[i] = #13) or (RealBuffer[i] = #10) then
          begin
            LineLength := i - LastEndOfLine;
            SetLength(line, LineLength);
            if LineLength > 0 then
              Move(RealBuffer[LastEndOfLine], line[1], LineLength);

            // Treat CR+LF as one terminator; i + 1 is valid here because a
            // CR in the last byte never gets this far
            if RealBuffer[i] = #13 then
              if RealBuffer[i + 1] = #10 then
                Inc(i);
            LastEndOfLine := i + 1;

            if Assigned(FOnLine) then
            begin
              FBuffer := RealBuffer + LastEndOfLine;
              FBytesInBuffer := CurBytesInBuffer - LastEndOfLine;
              FDestroyedFlag := @DestroyedFlag;
              FOnLine(line);
              // Check BEFORE touching any field: the handler may have
              // destroyed this instance
              if DestroyedFlag then
                exit;
              FDestroyedFlag := nil;
            end;
          end;
          Inc(i);
        end;
      finally
        // Runs on normal completion, on exit and on exceptions. The instance
        // may only be touched if it still exists.
        if not DestroyedFlag then
        begin
          FDestroyedFlag := nil;
          FBytesInBuffer := CurBytesInBuffer - LastEndOfLine;
          if LastEndOfLine > 0 then
          begin
            // Remove all processed lines from the buffer (Move copes with the
            // overlapping ranges)
            if FBytesInBuffer > 0 then
              Move(RealBuffer[LastEndOfLine], RealBuffer^, FBytesInBuffer);
            ReallocMem(RealBuffer, FBytesInBuffer);
          end;
          FBuffer := RealBuffer;
        end;
      end;
    end;
  end;
  244. // -------------------------------------------------------------------
  245. // TAsyncStreamLineReader
  246. // -------------------------------------------------------------------
  { Fetches up to count bytes from the data stream into ABuffer and returns
    the stream's result unchanged. }
  function TAsyncStreamLineReader.Read(var ABuffer; count: Integer): Integer;
  var
    Received: Integer;
  begin
    Received := FDataStream.Read(ABuffer, count);
    Result := Received;
  end;
  { Called by Run when no data could be read at all. If the data stream is the
    blocking stream itself, or the data stream is positioned at its end, this
    is treated as end of input:
    - a remaining unterminated line is reported through OnLine (one trailing
      CR or LF is stripped),
    - the read handler is removed from the manager,
    - OnEOF is fired.

    The buffered data is marked as consumed before OnLine is called, so the
    handler sees BytesInBuffer = 0 and the same data cannot be reported twice.
    The OnLine handler may destroy this reader; in that case the method returns
    without touching the instance again. }
  procedure TAsyncStreamLineReader.NoData;
  var
    s: String;
    Remaining: Integer;
    Destroyed: Boolean;
  begin
    if (FDataStream = FBlockingStream) or (FDataStream.Position = FDataStream.Size) then
    begin

      if (FBytesInBuffer > 0) and Assigned(FOnLine) then
      begin
        Remaining := FBytesInBuffer;
        if FBuffer[Remaining - 1] in [#13, #10] then
          Dec(Remaining);
        SetLength(s, Remaining);
        // s[1] must not be touched when the line is empty
        if Remaining > 0 then
          Move(FBuffer^, s[1], Remaining);
        FBytesInBuffer := 0;

        Destroyed := False;
        FDestroyedFlag := @Destroyed;
        try
          FOnLine(s);
        finally
          // Never leave a pointer to this stack frame behind
          if not Destroyed then
            FDestroyedFlag := nil;
        end;
        if Destroyed then
          exit;
      end;

      FManager.ClearReadHandler(FBlockingStream.Handle);

      if Assigned(FOnEOF) then
        FOnEOF(Self);
    end;
  end;
  { Read handler registered with the manager: processes whatever input is
    available. UserData is not used. }
  procedure TAsyncStreamLineReader.StreamDataAvailable(UserData: TObject);
  begin
    Self.Run;
  end;
  { Convenience constructor: AStream serves both as the source of the data and
    as the stream whose handle is registered with AManager. }
  constructor TAsyncStreamLineReader.Create(AManager: TAsyncIOManager; AStream: THandleStream);
  begin
    Create(AManager, AStream, AStream);
  end;
  { Creates a line reader that reads from ADataStream and registers
    StreamDataAvailable as read handler for the handle of ABlockingStream.
    The handle is also switched to asynchronous mode through the manager's
    HandleAsync property. All three arguments must be assigned; this is
    checked by an assertion. }
  constructor TAsyncStreamLineReader.Create(AManager: TAsyncIOManager;
    ADataStream: TStream; ABlockingStream: THandleStream);
  begin
    // AManager is dereferenced below, so it is validated like the streams
    ASSERT(Assigned(AManager) and Assigned(ADataStream) and Assigned(ABlockingStream));

    inherited Create;
    FManager := AManager;
    FDataStream := ADataStream;
    FBlockingStream := ABlockingStream;
    AManager.SetReadHandler(FBlockingStream.Handle, @StreamDataAvailable, nil);
    AManager.HandleAsync[FBlockingStream.Handle] := True;
  end;
  { Removes this reader's read handler from the manager, but only if the
    handler currently installed for the handle is still this instance's
    StreamDataAvailable method.

    The destructor also runs when the constructor raised an exception. In that
    case FManager and FBlockingStream may still be nil, so both are checked
    before they are used. }
  destructor TAsyncStreamLineReader.Destroy;
  var
    Handler: TMethod;
  begin
    if Assigned(FManager) and Assigned(FBlockingStream) then
    begin
      Handler := TMethod(FManager.GetReadHandler(FBlockingStream.Handle));
      if (Handler.Code = Pointer(@StreamDataAvailable)) and
        (Handler.Data = Pointer(Self)) then
        FManager.ClearReadHandler(FBlockingStream.Handle);
    end;

    inherited Destroy;
  end;
  299. // -------------------------------------------------------------------
  300. // TWriteBuffer
  301. // -------------------------------------------------------------------
  { Fires the OnBufferEmpty event, if a handler is assigned. }
  procedure TWriteBuffer.BufferEmpty;
  begin
    if not Assigned(FOnBufferEmpty) then
      exit;
    FOnBufferEmpty(Self);
  end;
  { Sets up an empty buffer and selects a single LF (#10) as end-of-line
    marker for WriteLine. }
  constructor TWriteBuffer.Create;
  begin
    inherited Create;

    EndOfLineMarker := #10;
    FBytesInBuffer := 0;
    FBuffer := nil;
  end;
  { Frees the buffer, if one has been allocated. Bytes that are still pending
    are discarded. }
  destructor TWriteBuffer.Destroy;
  begin
    if FBuffer <> nil then
      FreeMem(FBuffer);

    inherited Destroy;
  end;
  { The buffer can only be appended to, so the only seeks accepted are those
    that leave the position at the end of the buffered data: offset 0 relative
    to the current position or to the end, or offset FBytesInBuffer relative
    to the beginning. These return FBytesInBuffer; anything else raises
    EStreamError. }
  function TWriteBuffer.Seek(Offset: LongInt; Origin: Word): LongInt;
  var
    StaysAtEnd: Boolean;
  begin
    if Origin = soFromBeginning then
      StaysAtEnd := Offset = FBytesInBuffer
    else if (Origin = soFromCurrent) or (Origin = soFromEnd) then
      StaysAtEnd := Offset = 0
    else
      StaysAtEnd := False;

    if not StaysAtEnd then
      raise EStreamError.Create('Invalid stream operation');

    Result := FBytesInBuffer;
  end;
  { Appends Count bytes from ABuffer to the pending data, then signals through
    WantWrite that there is something to send. Always reports all Count bytes
    as written. }
  function TWriteBuffer.Write(const ABuffer; Count: LongInt): LongInt;
  var
    OldSize: Integer;
  begin
    OldSize := FBytesInBuffer;
    ReallocMem(FBuffer, OldSize + Count);
    Move(ABuffer, FBuffer[OldSize], Count);
    FBytesInBuffer := OldSize + Count;

    WantWrite;
    Result := Count;
  end;
  { Queues line followed by EndOfLineMarker.

    The data is passed as PChar(s)^ instead of s[1]: indexing the first
    character is invalid when both the line and the marker are empty, whereas
    PChar(s) is valid for an empty string as well. WriteBuffer is still called
    in that case, so the behaviour for empty input stays the same. }
  procedure TWriteBuffer.WriteLine(const line: String);
  var
    s: String;
  begin
    s := line + EndOfLineMarker;
    WriteBuffer(PChar(s)^, Length(s));
  end;
  { Hands the pending bytes to DoRealWrite until either everything has been
    written or DoRealWrite accepts nothing more (result <= 0).
    - When no bytes are pending, BufferEmpty is called and the method returns.
    - When not a single byte could be written during this call, WritingFailed
      is called.

    After each successful write the accepted bytes are removed from the front
    of the buffer, so every pass starts again at offset 0. The earlier version
    kept a running start offset across passes although the buffer had already
    been compacted, which skipped unsent bytes after a partial write. }
  procedure TWriteBuffer.Run;
  var
    written, remaining: Integer;
    failed: Boolean;
  begin
    failed := True;

    repeat
      if FBytesInBuffer = 0 then
      begin
        BufferEmpty;
        exit;
      end;

      written := DoRealWrite(FBuffer^, FBytesInBuffer);
      if written > 0 then
      begin
        failed := False;
        // Defensive: never account for more bytes than were offered
        if written > FBytesInBuffer then
          written := FBytesInBuffer;

        // Drop the bytes that were accepted; Move copes with the overlap
        remaining := FBytesInBuffer - written;
        if remaining > 0 then
          Move(FBuffer[written], FBuffer^, remaining);
        ReallocMem(FBuffer, remaining);
        FBytesInBuffer := remaining;
      end;
    until written <= 0;

    if failed then
      WritingFailed;
  end;
  370. // -------------------------------------------------------------------
  371. // TAsyncWriteStream
  372. // -------------------------------------------------------------------
  { Writes up to Count bytes from ABuffer to the data stream and returns the
    stream's result unchanged. }
  function TAsyncWriteStream.DoRealWrite(const ABuffer; Count: Integer): Integer;
  var
    Accepted: Integer;
  begin
    Accepted := FDataStream.Write(ABuffer, Count);
    Result := Accepted;
  end;
  { Called when nothing could be written. The write handler is removed only
    when data stream and blocking stream are different objects; when they are
    the same, the handler stays installed. }
  procedure TAsyncWriteStream.WritingFailed;
  begin
    if FDataStream = FBlockingStream then
      exit;
    FManager.ClearWriteHandler(FBlockingStream.Handle);
  end;
  { Installs CanWrite as write handler for the blocking stream's handle, with
    no user data. }
  procedure TAsyncWriteStream.WantWrite;
  var
    StreamHandle: Integer;
  begin
    StreamHandle := FBlockingStream.Handle;
    FManager.SetWriteHandler(StreamHandle, @CanWrite, nil);
  end;
  { Nothing is pending any more: remove the write handler first, then let the
    inherited method fire OnBufferEmpty. }
  procedure TAsyncWriteStream.BufferEmpty;
  var
    StreamHandle: Integer;
  begin
    StreamHandle := FBlockingStream.Handle;
    FManager.ClearWriteHandler(StreamHandle);

    inherited BufferEmpty;
  end;
  { Write handler registered with the manager: sends as much pending data as
    possible. UserData is not used. }
  procedure TAsyncWriteStream.CanWrite(UserData: TObject);
  begin
    Self.Run;
  end;
  { Convenience constructor: AStream serves both as the target of the data and
    as the stream whose handle is registered with AManager. }
  constructor TAsyncWriteStream.Create(AManager: TAsyncIOManager; AStream: THandleStream);
  begin
    Create(AManager, AStream, AStream);
  end;
  { Creates a write buffer that sends its data to ADataStream and uses the
    handle of ABlockingStream for write notifications. The handle is switched
    to asynchronous mode through the manager's HandleAsync property. All three
    arguments must be assigned; this is checked by an assertion. }
  constructor TAsyncWriteStream.Create(AManager: TAsyncIOManager;
    ADataStream: TStream; ABlockingStream: THandleStream);
  begin
    // AManager is dereferenced below, so it is validated like the streams
    ASSERT(Assigned(AManager) and Assigned(ADataStream) and Assigned(ABlockingStream));

    inherited Create;
    FManager := AManager;
    FDataStream := ADataStream;
    FBlockingStream := ABlockingStream;
    AManager.HandleAsync[FBlockingStream.Handle] := True;
  end;
  { Removes this stream's write handler from the manager, but only if the
    handler currently installed for the handle is this instance's CanWrite
    method. This mirrors the ownership check in
    TAsyncStreamLineReader.Destroy and avoids removing a handler that somebody
    else installed for the same handle.

    The destructor also runs when the constructor raised an exception. In that
    case FManager and FBlockingStream may still be nil, so both are checked
    before they are used. }
  destructor TAsyncWriteStream.Destroy;
  var
    Handler: TMethod;
  begin
    if Assigned(FManager) and Assigned(FBlockingStream) then
    begin
      Handler := TMethod(FManager.GetWriteHandler(FBlockingStream.Handle));
      if (Handler.Code = Pointer(@CanWrite)) and
        (Handler.Data = Pointer(Self)) then
        FManager.ClearWriteHandler(FBlockingStream.Handle);
    end;

    inherited Destroy;
  end;
  414. end.
  415. {
  416. $Log$
  417. Revision 1.1 2000-07-13 06:31:29 michael
  418. + Initial import
  419. Revision 1.2 2000/07/09 11:49:05 sg
  420. * Added methods for reading event handlers in TAsyncIOManager
  421. * Fixed problems when LineReader gets destroyed while it still was
  422. parsing input data.
  423. Revision 1.1 2000/02/18 23:14:48 michael
  424. + Initial implementation
  425. Revision 1.1 2000/02/17 22:40:05 sg
  426. * First version. This unit should go into FCL soon...
  427. }