asyncio.pp 14 KB


  1. {
  2. $Id$
  3. Async_IO: Mananging class for asynchronous input/output
  4. Copyright (C) 2000 by Sebastian Guenther ([email protected])
  5. See the file COPYING.FPC, included in this distribution,
  6. for details about the copyright.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  10. }
  11. {$MODE objfpc}
  12. {$H+}
  13. unit AsyncIO;
  14. interface
  15. {$i asyncioh.inc}
  16. type
  17. TAsyncIOManager = class;
  18. TAsyncIONotify = procedure(UserData: TObject) of object;
  19. TAsyncIONotifyInfo = record
  20. Method: TAsyncIONotify;
  21. UserData: TObject;
  22. end;
  23. // -------------------------------------------------------------------
  24. // TAsyncIOManager
  25. // -------------------------------------------------------------------
  26. TAsyncIOManager = class
  27. protected
  28. DoBreak: Boolean;
  29. IOdata : TIOdata;
  30. ReadNotifies, WriteNotifies: array[0..MaxHandle] of TAsyncIONotifyInfo;
  31. HighestHandle: Integer;
  32. FTimeout: Integer;
  33. TimeoutNotify: TAsyncIONotifyInfo;
  34. procedure CalcHighestHandle(max: Integer);
  35. procedure ExecuteNotify(const Notify: TAsyncIONotifyInfo);
  36. function GetHandleAsync(AHandle: Integer): Boolean;
  37. procedure SetHandleAsync(AHandle: Integer; AValue: Boolean);
  38. public
  39. constructor Create;
  40. procedure Run;
  41. procedure BreakRun;
  42. procedure SetReadHandler(AHandle: Integer; AMethod: TAsyncIONotify; AUserData: TObject);
  43. procedure ClearReadHandler(AHandle: Integer);
  44. function GetReadHandler(AHandle: Integer): TAsyncIONotify;
  45. procedure SetWriteHandler(AHandle: Integer; AMethod: TAsyncIONotify; AUserData: TObject);
  46. procedure ClearWriteHandler(AHandle: Integer);
  47. function GetWriteHandler(AHandle: Integer): TAsyncIONotify;
  48. procedure SetTimeoutHandler(AMethod: TAsyncIONotify; AUserData: TObject);
  49. procedure ClearTimeoutHandler;
  50. function GetTimeoutHandler: TAsyncIONotify;
  51. property Timeout: Integer read FTimeout write FTimeout;
  52. property HandleAsync[AHandle: Integer]: Boolean read GetHandleAsync write SetHandleAsync;
  53. end;
  54. // -------------------------------------------------------------------
  55. // Line reader classes
  56. // -------------------------------------------------------------------
  57. TLineNotify = procedure(const line: String) of object;
  58. PBoolean = ^Boolean;
  59. TGenericLineReader = class
  60. protected
  61. FDestroyedFlag: PBoolean;
  62. RealBuffer, FBuffer: PChar;
  63. FBytesInBuffer: Integer;
  64. FOnLine: TLineNotify;
  65. function Read(var ABuffer; count: Integer): Integer; virtual; abstract;
  66. procedure NoData; virtual; abstract;
  67. public
  68. destructor Destroy; override;
  69. procedure Run; // Process as many lines as possible
  70. property Buffer: PChar read FBuffer;
  71. property BytesInBuffer: Integer read FBytesInBuffer;
  72. property OnLine: TLineNotify read FOnLine write FOnLine;
  73. end;
  74. TAsyncStreamLineReader = class(TGenericLineReader)
  75. protected
  76. FManager: TAsyncIOManager;
  77. FDataStream: TStream;
  78. FBlockingStream: THandleStream;
  79. FOnEOF: TNotifyEvent;
  80. function Read(var ABuffer; count: Integer): Integer; override;
  81. procedure NoData; override;
  82. procedure StreamDataAvailable(UserData: TObject);
  83. public
  84. constructor Create(AManager: TAsyncIOManager; AStream: THandleStream);
  85. constructor Create(AManager: TAsyncIOManager;
  86. ADataStream: TStream; ABlockingStream: THandleStream);
  87. destructor Destroy; override;
  88. property DataStream: TStream read FDataStream;
  89. property BlockingStream: THandleStream read FBlockingStream;
  90. property OnEOF: TNotifyEvent read FOnEOF write FOnEOF;
  91. end;
  92. // -------------------------------------------------------------------
  93. // TWriteBuffer
  94. // -------------------------------------------------------------------
  95. TWriteBuffer = class(TStream)
  96. protected
  97. FBuffer: PChar;
  98. FBytesInBuffer: Integer;
  99. FOnBufferEmpty: TNotifyEvent;
  100. function Seek(Offset: LongInt; Origin: Word): LongInt; override;
  101. function Write(const ABuffer; Count: LongInt): LongInt; override;
  102. function DoRealWrite(const ABuffer; Count: Integer): Integer; virtual; abstract;
  103. procedure WritingFailed; virtual; abstract;
  104. procedure WantWrite; virtual; abstract;
  105. procedure BufferEmpty; virtual;
  106. constructor Create;
  107. public
  108. EndOfLineMarker: String;
  109. destructor Destroy; override;
  110. procedure WriteLine(const line: String);
  111. procedure Run; // Write as many data as possible
  112. property BytesInBuffer: Integer read FBytesInBuffer;
  113. property OnBufferEmpty: TNotifyEvent read FOnBufferEmpty write FOnBufferEmpty;
  114. end;
  115. TAsyncWriteStream = class(TWriteBuffer)
  116. protected
  117. FManager: TAsyncIOManager;
  118. FDataStream: TStream;
  119. FBlockingStream: THandleStream;
  120. function DoRealWrite(const ABuffer; Count: Integer): Integer; override;
  121. procedure WritingFailed; override;
  122. procedure WantWrite; override;
  123. procedure BufferEmpty; override;
  124. procedure CanWrite(UserData: TObject);
  125. public
  126. constructor Create(AManager: TAsyncIOManager; AStream: THandleStream);
  127. constructor Create(AManager: TAsyncIOManager;
  128. ADataStream: TStream; ABlockingStream: THandleStream);
  129. destructor Destroy; override;
  130. property DataStream: TStream read FDataStream;
  131. property BlockingStream: THandleStream read FBlockingStream;
  132. end;
  133. // ===================================================================
  134. // ===================================================================
  135. implementation
  136. uses SysUtils;
  137. {$i asyncio.inc}
  138. // -------------------------------------------------------------------
  139. // TAsyncIOManager
  140. // -------------------------------------------------------------------
  141. procedure TAsyncIOManager.ExecuteNotify(const Notify: TAsyncIONotifyInfo);
  142. begin
  143. if Assigned(Notify.Method) then
  144. Notify.Method(Notify.UserData);
  145. end;
  146. procedure TAsyncIOManager.SetTimeoutHandler(AMethod: TAsyncIONotify; AUserData: TObject);
  147. begin
  148. TimeoutNotify.Method := AMethod;
  149. TimeoutNotify.UserData := AUserData;
  150. end;
  151. procedure TAsyncIOManager.ClearTimeoutHandler;
  152. begin
  153. TimeoutNotify.Method := nil;
  154. end;
  155. function TAsyncIOManager.GetTimeoutHandler: TAsyncIONotify;
  156. begin
  157. Result := TimeoutNotify.Method;
  158. end;
  159. // -------------------------------------------------------------------
  160. // TGenericLineReader
  161. // -------------------------------------------------------------------
  162. destructor TGenericLineReader.Destroy;
  163. begin
  164. if Assigned(FDestroyedFlag) then
  165. FDestroyedFlag^ := True;
  166. if Assigned(RealBuffer) then
  167. begin
  168. FreeMem(RealBuffer);
  169. RealBuffer := nil;
  170. end;
  171. inherited Destroy;
  172. end;
  173. procedure TGenericLineReader.Run;
  174. var
  175. NewData: array[0..1023] of Byte;
  176. p: PChar;
  177. BytesRead, OldBufSize, CurBytesInBuffer, LastEndOfLine, i, LineLength: Integer;
  178. line: String;
  179. FirstRun, DestroyedFlag: Boolean;
  180. begin
  181. FirstRun := True;
  182. DestroyedFlag := False;
  183. while True do
  184. begin
  185. BytesRead := Read(NewData, SizeOf(NewData));
  186. //WriteLn('Linereader: ', BytesRead, ' bytes read');
  187. if BytesRead <= 0 then begin
  188. if FirstRun then
  189. NoData;
  190. break;
  191. end;
  192. FirstRun := False;
  193. OldBufSize := FBytesInBuffer;
  194. // Append the new received data to the read buffer
  195. Inc(FBytesInBuffer, BytesRead);
  196. ReallocMem(RealBuffer, FBytesInBuffer);
  197. Move(NewData, RealBuffer[OldBufSize], BytesRead);
  198. {Process all potential lines in the current buffer. Attention: FBuffer and
  199. FBytesInBuffer MUST be updated for each line, as they can be accessed from
  200. within the FOnLine handler!}
  201. LastEndOfLine := 0;
  202. if OldBufSize > 0 then
  203. i := OldBufSize - 1
  204. else
  205. i := 0;
  206. CurBytesInBuffer := FBytesInBuffer;
  207. while i <= CurBytesInBuffer - 1 do
  208. begin
  209. if (RealBuffer[i] = #13) or (RealBuffer[i] = #10) then
  210. begin
  211. LineLength := i - LastEndOfLine;
  212. SetLength(line, LineLength);
  213. if LineLength > 0 then
  214. Move(RealBuffer[LastEndOfLine], line[1], LineLength);
  215. if (i < CurBytesInBuffer - 1) and (RealBuffer[i] = #13) and
  216. (RealBuffer[i + 1] = #10) then
  217. Inc(i);
  218. LastEndOfLine := i + 1;
  219. if Assigned(FOnLine) then
  220. begin
  221. FBuffer := RealBuffer + LastEndOfLine;
  222. FBytesInBuffer := CurBytesInBuffer - LastEndOfLine;
  223. FDestroyedFlag := @DestroyedFlag;
  224. FOnLine(line);
  225. FDestroyedFlag := nil;
  226. if DestroyedFlag then
  227. exit;
  228. end;
  229. end;
  230. Inc(i);
  231. end;
  232. FBytesInBuffer := CurBytesInBuffer;
  233. if LastEndOfLine > 0 then
  234. begin
  235. // Remove all processed lines from the buffer
  236. Dec(FBytesInBuffer, LastEndOfLine);
  237. GetMem(p, FBytesInBuffer);
  238. Move(RealBuffer[LastEndOfLine], p^, FBytesInBuffer);
  239. FreeMem(RealBuffer);
  240. RealBuffer := p;
  241. end;
  242. FBuffer := RealBuffer;
  243. end;
  244. end;
  245. // -------------------------------------------------------------------
  246. // TAsyncStreamLineReader
  247. // -------------------------------------------------------------------
  248. function TAsyncStreamLineReader.Read(var ABuffer; count: Integer): Integer;
  249. begin
  250. Result := FDataStream.Read(ABuffer, count);
  251. end;
  252. procedure TAsyncStreamLineReader.NoData;
  253. var
  254. s: String;
  255. begin
  256. if (FDataStream = FBlockingStream) or (FDataStream.Position = FDataStream.Size) then
  257. begin
  258. if (FBytesInBuffer > 0) and Assigned(FOnLine) then
  259. begin
  260. if FBuffer[FBytesInBuffer - 1] in [#13, #10] then
  261. Dec(FBytesInBuffer);
  262. SetLength(s, FBytesInBuffer);
  263. Move(FBuffer^, s[1], FBytesInBuffer);
  264. FOnLine(s);
  265. end;
  266. FManager.ClearReadHandler(FBlockingStream.Handle);
  267. if Assigned(FOnEOF) then
  268. FOnEOF(Self);
  269. end;
  270. end;
  271. procedure TAsyncStreamLineReader.StreamDataAvailable(UserData: TObject);
  272. begin
  273. Run;
  274. end;
  275. constructor TAsyncStreamLineReader.Create(AManager: TAsyncIOManager; AStream: THandleStream);
  276. begin
  277. Self.Create(AManager, AStream, AStream);
  278. end;
  279. constructor TAsyncStreamLineReader.Create(AManager: TAsyncIOManager;
  280. ADataStream: TStream; ABlockingStream: THandleStream);
  281. begin
  282. ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));
  283. inherited Create;
  284. FManager := AManager;
  285. FDataStream := ADataStream;
  286. FBlockingStream := ABlockingStream;
  287. AManager.SetReadHandler(FBlockingStream.Handle, @StreamDataAvailable, nil);
  288. AManager.HandleAsync[FBlockingStream.Handle] := True;
  289. end;
  290. destructor TAsyncStreamLineReader.Destroy;
  291. var
  292. Handler: TMethod;
  293. begin
  294. Handler := TMethod(FManager.GetReadHandler(FBlockingStream.Handle));
  295. if (Handler.Code = Pointer(@StreamDataAvailable)) and
  296. (Handler.Data = Pointer(Self)) then
  297. FManager.ClearReadHandler(FBlockingStream.Handle);
  298. inherited Destroy;
  299. end;
  300. // -------------------------------------------------------------------
  301. // TWriteBuffer
  302. // -------------------------------------------------------------------
  303. procedure TWriteBuffer.BufferEmpty;
  304. begin
  305. if Assigned(FOnBufferEmpty) then
  306. FOnBufferEmpty(Self);
  307. end;
  308. constructor TWriteBuffer.Create;
  309. begin
  310. inherited Create;
  311. FBuffer := nil;
  312. FBytesInBuffer := 0;
  313. EndOfLineMarker := #10;
  314. end;
  315. destructor TWriteBuffer.Destroy;
  316. begin
  317. if Assigned(FBuffer) then
  318. FreeMem(FBuffer);
  319. inherited Destroy;
  320. end;
  321. function TWriteBuffer.Seek(Offset: LongInt; Origin: Word): LongInt;
  322. begin
  323. if ((Offset = 0) and ((Origin = soFromCurrent) or (Origin = soFromEnd))) or
  324. ((Offset = FBytesInBuffer) and (Origin = soFromBeginning)) then
  325. Result := FBytesInBuffer
  326. else
  327. raise EStreamError.Create('Invalid stream operation');
  328. end;
  329. function TWriteBuffer.Write(const ABuffer; Count: LongInt): LongInt;
  330. begin
  331. ReallocMem(FBuffer, FBytesInBuffer + Count);
  332. Move(ABuffer, FBuffer[FBytesInBuffer], Count);
  333. Inc(FBytesInBuffer, Count);
  334. WantWrite;
  335. Result := Count;
  336. end;
  337. procedure TWriteBuffer.WriteLine(const line: String);
  338. var
  339. s: String;
  340. begin
  341. s := line + EndOfLineMarker;
  342. WriteBuffer(s[1], Length(s));
  343. end;
  344. procedure TWriteBuffer.Run;
  345. var
  346. CurStart, written: Integer;
  347. NewBuf: PChar;
  348. failed: Boolean;
  349. begin
  350. CurStart := 0;
  351. failed := True;
  352. repeat
  353. if FBytesInBuffer = 0 then begin
  354. BufferEmpty;
  355. exit;
  356. end;
  357. written := DoRealWrite(FBuffer[CurStart], FBytesInBuffer - CurStart);
  358. if written > 0 then begin
  359. Inc(CurStart, written);
  360. failed := False;
  361. GetMem(NewBuf, FBytesInBuffer - CurStart);
  362. Move(FBuffer[CurStart], NewBuf[0], FBytesInBuffer - CurStart);
  363. FreeMem(FBuffer);
  364. FBuffer := NewBuf;
  365. Dec(FBytesInBuffer, CurStart);
  366. end;
  367. until written <= 0;
  368. if failed then
  369. WritingFailed;
  370. end;
  371. // -------------------------------------------------------------------
  372. // TAsyncWriteStream
  373. // -------------------------------------------------------------------
  374. function TAsyncWriteStream.DoRealWrite(const ABuffer; Count: Integer): Integer;
  375. begin
  376. Result := FDataStream.Write(ABuffer, count);
  377. end;
  378. procedure TAsyncWriteStream.WritingFailed;
  379. begin
  380. if FDataStream <> FBlockingStream then
  381. FManager.ClearWriteHandler(FBlockingStream.Handle);
  382. end;
  383. procedure TAsyncWriteStream.WantWrite;
  384. begin
  385. FManager.SetWriteHandler(FBlockingStream.Handle, @CanWrite, nil);
  386. end;
  387. procedure TAsyncWriteStream.BufferEmpty;
  388. begin
  389. FManager.ClearWriteHandler(FBlockingStream.Handle);
  390. inherited BufferEmpty;
  391. end;
  392. procedure TAsyncWriteStream.CanWrite(UserData: TObject);
  393. begin
  394. Run;
  395. end;
  396. constructor TAsyncWriteStream.Create(AManager: TAsyncIOManager; AStream: THandleStream);
  397. begin
  398. Self.Create(AManager, AStream, AStream);
  399. end;
  400. constructor TAsyncWriteStream.Create(AManager: TAsyncIOManager;
  401. ADataStream: TStream; ABlockingStream: THandleStream);
  402. begin
  403. ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));
  404. inherited Create;
  405. FManager := AManager;
  406. FDataStream := ADataStream;
  407. FBlockingStream := ABlockingStream;
  408. AManager.HandleAsync[FBlockingStream.Handle] := True;
  409. end;
  410. destructor TAsyncWriteStream.Destroy;
  411. begin
  412. FManager.ClearWriteHandler(FBlockingStream.Handle);
  413. inherited Destroy;
  414. end;
  415. end.
  416. {
  417. $Log$
  418. Revision 1.4 2002-09-07 15:15:24 peter
  419. * old logs removed and tabs fixed
  420. }