Quick.Data.InfluxDB.pas 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. { ***************************************************************************
  2. Copyright (c) 2016-2020 Kike Pérez
  3. Unit : Quick.Data.InfluxDB
  4. Description : InfluxDB data provider
  5. Author : Kike Pérez
  6. Version : 1.0
  7. Created : 05/04/2019
  8. Modified : 21/04/2020
  9. This file is part of QuickLogger: https://github.com/exilon/QuickLogger
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Data.InfluxDB;
  22. {$i QuickLib.inc}
  23. interface
  24. uses
  25. Classes,
  26. SysUtils,
  27. DateUtils,
  28. Quick.Collections,
  29. Quick.HttpClient,
  30. Quick.Commons,
  31. Quick.Value,
  32. Quick.Arrays,
  33. Quick.Data.Custom;
  34. type
  35. TInfluxDBData = class(TDataProvider)
  36. private
  37. fHTTPClient : TJsonHTTPClient;
  38. fURL : string;
  39. fFullURL : string;
  40. fDataBase : string;
  41. fUserName : string;
  42. fPassword : string;
  43. fUserAgent : string;
  44. fTags : TPairArray;
  45. fCreateDataBaseIfNotExists : Boolean;
  46. procedure CreateDataBase;
  47. function GenerateWriteQuery(const aMeasurement : string; aTagPairs : IList<TPair>; aFieldPairs : IList<TFlexPair>; aTime : TDateTime): string;
  48. procedure SetWriteURL;
  49. procedure SetPassword(const Value: string);
  50. procedure SetUserName(const Value: string);
  51. procedure Write(const aLine: string); overload;
  52. public
  53. constructor Create; override;
  54. destructor Destroy; override;
  55. property URL : string read fURL write fURL;
  56. property DataBase : string read fDataBase write fDataBase;
  57. property UserName : string read fUserName write SetUserName;
  58. property Password : string read fPassword write SetPassword;
  59. property CreateDataBaseIfNotExists : Boolean read fCreateDataBaseIfNotExists write fCreateDataBaseIfNotExists;
  60. property UserAgent : string read fUserAgent write fUserAgent;
  61. property Tags : TPairArray read fTags write fTags;
  62. procedure Init; override;
  63. procedure Restart; override;
  64. procedure Stop; override;
  65. procedure Write(const aMeasurement : string; aFieldPairs : IList<TFlexPair>; aTime : TDateTime = 0); overload;
  66. procedure Write(const aMeasurement: string; aTagPairs : IList<TPair>; aFieldPairs: IList<TFlexPair>; aTime: TDateTime); overload;
  67. procedure Write(const aMeasurement: string; const aFieldKey : string; aFieldValue : TFlexValue; aTime: TDateTime); overload;
  68. end;
  69. EInfluxDBData = class(Exception);
  70. implementation
  71. constructor TInfluxDBData.Create;
  72. begin
  73. inherited;
  74. fURL := 'http://localhost:8086';
  75. fDataBase := 'db';
  76. fUserName := '';
  77. fPassword := '';
  78. fCreateDataBaseIfNotExists := True;
  79. OutputOptions.UseUTCTime := True;
  80. fUserAgent := DEF_USER_AGENT;
  81. end;
  82. destructor TInfluxDBData.Destroy;
  83. begin
  84. if Assigned(fHTTPClient) then fHTTPClient.Free;
  85. inherited;
  86. end;
  87. procedure TInfluxDBData.Init;
  88. begin
  89. if fInitiated then Stop;
  90. SetWriteURL;
  91. fHTTPClient := TJsonHTTPClient.Create;
  92. fHTTPClient.ContentType := 'application/json';
  93. fHTTPClient.UserAgent := fUserAgent;
  94. fHTTPClient.HandleRedirects := True;
  95. if fCreateDataBaseIfNotExists then CreateDataBase;
  96. inherited;
  97. end;
  98. procedure TInfluxDBData.Restart;
  99. begin
  100. Stop;
  101. if Assigned(fHTTPClient) then FreeAndNil(fHTTPClient);
  102. Init;
  103. end;
  104. procedure TInfluxDBData.SetPassword(const Value: string);
  105. begin
  106. if fPassword <> Value then
  107. begin
  108. fPassword := Value;
  109. SetWriteURL;
  110. end;
  111. end;
  112. procedure TInfluxDBData.SetWriteURL;
  113. begin
  114. if fUserName+fPassword <> '' then fFullURL := Format('%s/write?db=%s&u=%s&p=%s&precision=ms',[fURL,fDataBase,fUserName,fPassword])
  115. else fFullURL := Format('%s/write?db=%s&precision=ms',[fURL,fDataBase]);
  116. end;
  117. procedure TInfluxDBData.Stop;
  118. begin
  119. inherited;
  120. if Assigned(fHTTPClient) then FreeAndNil(fHTTPClient);
  121. end;
  122. procedure TInfluxDBData.Write(const aMeasurement: string; const aFieldKey : string; aFieldValue : TFlexValue; aTime: TDateTime);
  123. var
  124. fields : IList<TFlexPair>;
  125. begin
  126. fields.Add(TFlexPair.Create(aFieldKey,aFieldValue));
  127. if atime <> 0 then Write(GenerateWriteQuery(aMeasurement,nil,fields,aTime))
  128. else Write(GenerateWriteQuery(aMeasurement,nil,fields,Now()));
  129. end;
  130. procedure TInfluxDBData.Write(const aMeasurement: string; aTagPairs : IList<TPair>; aFieldPairs: IList<TFlexPair>; aTime: TDateTime);
  131. begin
  132. if atime <> 0 then Write(GenerateWriteQuery(aMeasurement,aTagPairs,aFieldPairs,aTime))
  133. else Write(GenerateWriteQuery(aMeasurement,aTagPairs,aFieldPairs,Now()));
  134. end;
  135. procedure TInfluxDBData.Write(const aMeasurement: string; aFieldPairs: IList<TFlexPair>; aTime: TDateTime);
  136. begin
  137. if atime <> 0 then Write(GenerateWriteQuery(aMeasurement,nil,aFieldPairs,aTime))
  138. else Write(GenerateWriteQuery(aMeasurement,nil,aFieldPairs,Now()));
  139. end;
  140. procedure TInfluxDBData.SetUserName(const Value: string);
  141. begin
  142. if fUserName <> Value then
  143. begin
  144. fUserName := Value;
  145. SetWriteURL;
  146. end;
  147. end;
  148. procedure TInfluxDBData.CreateDataBase;
  149. var
  150. resp : IHttpRequestResponse;
  151. begin
  152. try
  153. resp := fHTTPClient.Post(Format('%s/query?q=CREATE DATABASE %s',[fURL,fDatabase]),'');
  154. except
  155. on E : Exception do raise EInfluxDBData.CreateFmt('[TInfluxDBData] Creating DB: %s',[e.Message]);
  156. end;
  157. if not (resp.StatusCode in [200,204]) then
  158. raise EInfluxDBData.Create(Format('[TInfluxDBData] : Response %d : %s trying to create database',[resp.StatusCode,resp.StatusText]));
  159. end;
  160. function TInfluxDBData.GenerateWriteQuery(const aMeasurement : string; aTagPairs : IList<TPair>; aFieldPairs : IList<TFlexPair>; aTime : TDateTime): string;
  161. var
  162. incinfo : TStringList;
  163. tags : string;
  164. fields : string;
  165. tagpair : TPair;
  166. flexpair : TFlexPair;
  167. begin
  168. incinfo := TStringList.Create;
  169. try
  170. //add global tags
  171. for tagpair in fTags do
  172. begin
  173. incinfo.Add(Format('%s=%s',[tagpair.Name,tagpair.Value]));
  174. end;
  175. //add current query tags
  176. for tagpair in aTagPairs do
  177. begin
  178. incinfo.Add(Format('%s=%s',[tagpair.Name,tagpair.Value]));
  179. end;
  180. tags := CommaText(incinfo);
  181. incinfo.Clear;
  182. for flexpair in aFieldPairs do
  183. begin
  184. if flexpair.Value.IsInteger then incinfo.Add(Format('%s=%d',[flexpair.Name,flexpair.Value.AsInt64]))
  185. else if flexpair.Value.IsFloating then incinfo.Add(Format('%s=%f',[flexpair.Name,flexpair.Value.AsExtended]))
  186. else incinfo.Add(Format('%s="%s"',[flexpair.Name,flexpair.Value.AsString]));
  187. end;
  188. fields := CommaText(incinfo);
  189. Result := Format('%s,%s %s %d',[aMeasurement,tags,fields,DateTimeToUnix(LocalTimeToUTC(aTime){$IFNDEF FPC},True{$ENDIF})*1000]);
  190. finally
  191. incinfo.Free;
  192. end;
  193. end;
  194. procedure TInfluxDBData.Write(const aLine : string);
  195. var
  196. resp : IHttpRequestResponse;
  197. stream : TStringStream;
  198. begin
  199. if not fInitiated then Init;
  200. stream := TStringStream.Create(aLine);
  201. var a := aline;
  202. try
  203. try
  204. resp := fHTTPClient.Post(fFullURL,stream);
  205. except
  206. on E : Exception do raise EInfluxDBData.CreateFmt('[TInfluxDBData] Write Error: %s',[e.Message]);
  207. end;
  208. finally
  209. stream.Free;
  210. end;
  211. if not (resp.StatusCode in [200,204]) then
  212. raise EInfluxDBData.Create(Format('[TInfluxDBData] : Response %d : %s trying to post event',[resp.StatusCode,resp.StatusText]));
  213. end;
  214. end.