Quick.Data.InfluxDB.pas 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. { ***************************************************************************
  2. Copyright (c) 2016-2020 Kike Pérez
  3. Unit : Quick.Data.InfluxDB
  4. Description : InfluxDB data provider
  5. Author : Kike Pérez
  6. Version : 1.0
  7. Created : 05/04/2019
  8. Modified : 21/04/2020
  9. This file is part of QuickLogger: https://github.com/exilon/QuickLogger
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Data.InfluxDB;
  22. {$i QuickLib.inc}
  23. interface
  24. uses
  25. Classes,
  26. SysUtils,
  27. DateUtils,
  28. Quick.Collections,
  29. Quick.HttpClient,
  30. Quick.Commons,
  31. Quick.Value,
  32. Quick.Arrays,
  33. Quick.Data.Custom;
  34. type
  // Data provider that posts measurements to an InfluxDB server through its
  // HTTP "/write" endpoint.
  TInfluxDBData = class(TDataProvider)
  private
    // HTTP client; created in Init and released in Stop/Destroy
    fHTTPClient : TJsonHTTPClient;
    // server base url (scheme, host and port)
    fURL : string;
    // complete write url (base url + database + credentials), built by SetWriteURL
    fFullURL : string;
    fDataBase : string;
    fUserName : string;
    fPassword : string;
    fUserAgent : string;
    // tags added to every generated line
    fTags : TPairArray;
    // when True, Init asks the server to create the database
    fCreateDataBaseIfNotExists : Boolean;
    procedure CreateDataBase;
    function GenerateWriteQuery(const aMeasurement : string; aTagPairs : IList<TPair>; aFieldPairs : IList<TFlexPair>; aTime : TDateTime): string;
    procedure EscapeData(var aTags : string);
    procedure SetWriteURL;
    procedure SetPassword(const Value: string);
    procedure SetUserName(const Value: string);
    // posts an already formatted line to the write url
    procedure Write(const aLine: string); overload;
  public
    constructor Create; override;
    destructor Destroy; override;
    // connection settings
    property URL : string read fURL write fURL;
    property DataBase : string read fDataBase write fDataBase;
    // changing UserName or Password rebuilds the write url
    property UserName : string read fUserName write SetUserName;
    property Password : string read fPassword write SetPassword;
    property CreateDataBaseIfNotExists : Boolean read fCreateDataBaseIfNotExists write fCreateDataBaseIfNotExists;
    property UserAgent : string read fUserAgent write fUserAgent;
    property Tags : TPairArray read fTags write fTags;
    // lifecycle
    procedure Init; override;
    procedure Restart; override;
    procedure Stop; override;
    // writers: a zero aTime means "use the current time"
    procedure Write(const aMeasurement : string; aFieldPairs : IList<TFlexPair>; aTime : TDateTime = 0); overload;
    procedure Write(const aMeasurement: string; aTagPairs : IList<TPair>; aFieldPairs: IList<TFlexPair>; aTime: TDateTime); overload;
    procedure Write(const aMeasurement: string; const aFieldKey : string; aFieldValue : TFlexValue; aTime: TDateTime); overload;
  end;

  // Raised when a request to the InfluxDB server fails or returns an unexpected status
  EInfluxDBData = class(Exception);
  71. implementation
  // Creates the provider with default settings: local server on port 8086,
  // database "db", no credentials and automatic database creation.
  constructor TInfluxDBData.Create;
  begin
    inherited;
    // connection defaults
    fURL := 'http://localhost:8086';
    fDataBase := 'db';
    fUserName := '';
    fPassword := '';
    fUserAgent := DEF_USER_AGENT;
    // behaviour defaults
    fCreateDataBaseIfNotExists := True;
    OutputOptions.UseUTCTime := True;
  end;
  // Releases the HTTP client (if one was created) before destroying the provider.
  destructor TInfluxDBData.Destroy;
  begin
    // Free is a no-op on a nil reference, so no Assigned check is needed
    fHTTPClient.Free;
    inherited;
  end;
  // Prepares the provider for writing: rebuilds the write url, creates the
  // HTTP client and, when CreateDataBaseIfNotExists is set, asks the server to
  // create the database.
  // Raises EInfluxDBData (from CreateDataBase) if the database request fails.
  procedure TInfluxDBData.Init;
  begin
    if fInitiated then Stop;
    // CreateDataBase can raise after the client was created but before the
    // inherited Init runs; a later Init (e.g. triggered by Write) would then
    // overwrite the reference and leak the old client, so release it first.
    // NOTE(review): assumes fInitiated is only set by the inherited Init - confirm
    FreeAndNil(fHTTPClient);
    SetWriteURL;
    fHTTPClient := TJsonHTTPClient.Create;
    fHTTPClient.ContentType := 'application/json';
    fHTTPClient.UserAgent := fUserAgent;
    fHTTPClient.HandleRedirects := True;
    if fCreateDataBaseIfNotExists then CreateDataBase;
    inherited;
  end;
  // Stops the provider, drops any remaining HTTP client and initializes again.
  procedure TInfluxDBData.Restart;
  begin
    Stop;
    // FreeAndNil is safe on a nil reference
    FreeAndNil(fHTTPClient);
    Init;
  end;
  // Stores a new password and rebuilds the write url; does nothing when the
  // value is unchanged.
  procedure TInfluxDBData.SetPassword(const Value: string);
  begin
    if fPassword = Value then Exit;
    fPassword := Value;
    SetWriteURL;
  end;
  // Builds the complete write url (millisecond precision) from the base url,
  // the database name and, when a user name or password is set, the credentials.
  procedure TInfluxDBData.SetWriteURL;
  var
    hascredentials : Boolean;
  begin
    hascredentials := (fUserName <> '') or (fPassword <> '');
    if hascredentials then
    begin
      fFullURL := Format('%s/write?db=%s&u=%s&p=%s&precision=ms',[fURL,fDataBase,fUserName,fPassword]);
    end
    else
    begin
      fFullURL := Format('%s/write?db=%s&precision=ms',[fURL,fDataBase]);
    end;
  end;
  // Runs the inherited stop logic and then releases the HTTP client.
  procedure TInfluxDBData.Stop;
  begin
    inherited;
    // FreeAndNil is safe on a nil reference
    FreeAndNil(fHTTPClient);
  end;
  // Writes a single field value to a measurement.
  // aTime = 0 means "use the current time".
  procedure TInfluxDBData.Write(const aMeasurement: string; const aFieldKey : string; aFieldValue : TFlexValue; aTime: TDateTime);
  var
    fieldlist : IList<TFlexPair>;
    stamp : TDateTime;
  begin
    // wrap the single key/value in a list so the common generator can be used
    fieldlist := TxList<TFlexPair>.Create;
    fieldlist.Add(TFlexPair.Create(aFieldKey,aFieldValue));
    if aTime = 0 then stamp := Now()
      else stamp := aTime;
    Write(GenerateWriteQuery(aMeasurement,nil,fieldlist,stamp));
  end;
  // Writes a set of fields, with extra per-call tags, to a measurement.
  // aTime = 0 means "use the current time".
  procedure TInfluxDBData.Write(const aMeasurement: string; aTagPairs : IList<TPair>; aFieldPairs: IList<TFlexPair>; aTime: TDateTime);
  var
    stamp : TDateTime;
  begin
    if aTime = 0 then stamp := Now()
      else stamp := aTime;
    Write(GenerateWriteQuery(aMeasurement,aTagPairs,aFieldPairs,stamp));
  end;
  // Writes a set of fields to a measurement using only the global tags.
  // aTime = 0 means "use the current time".
  procedure TInfluxDBData.Write(const aMeasurement: string; aFieldPairs: IList<TFlexPair>; aTime: TDateTime);
  var
    stamp : TDateTime;
  begin
    if aTime = 0 then stamp := Now()
      else stamp := aTime;
    Write(GenerateWriteQuery(aMeasurement,nil,aFieldPairs,stamp));
  end;
  // Stores a new user name and rebuilds the write url; does nothing when the
  // value is unchanged.
  procedure TInfluxDBData.SetUserName(const Value: string);
  begin
    if fUserName = Value then Exit;
    fUserName := Value;
    SetWriteURL;
  end;
  // Asks the server to create the configured database through the "/query"
  // endpoint.
  // Raises EInfluxDBData when the request itself fails or when the server
  // answers with a status other than 200 or 204.
  procedure TInfluxDBData.CreateDataBase;
  var
    resp : IHttpRequestResponse;
    queryurl : string;
  begin
    queryurl := Format('%s/query?q=CREATE DATABASE %s',[fURL,fDatabase]);
    // send the credentials under the same rule SetWriteURL uses for the write
    // url; without them the request is rejected by servers with authentication
    // enabled even though the later writes would be accepted
    if fUserName+fPassword <> '' then queryurl := queryurl + Format('&u=%s&p=%s',[fUserName,fPassword]);
    try
      resp := fHTTPClient.Post(queryurl,'');
    except
      on E : Exception do raise EInfluxDBData.CreateFmt('[TInfluxDBData] Creating DB: %s',[e.Message]);
    end;
    if not (resp.StatusCode in [200,204]) then
      raise EInfluxDBData.Create(Format('[TInfluxDBData] : Response %d : %s trying to create database',[resp.StatusCode,resp.StatusText]));
  end;
  // Escapes, in place, every space in aTags by prefixing it with a backslash.
  procedure TInfluxDBData.EscapeData(var aTags : string);
  const
    PLAIN_SPACE = ' ';
    ESCAPED_SPACE = '\ ';
  begin
    aTags := StringReplace(aTags,PLAIN_SPACE,ESCAPED_SPACE,[rfReplaceAll]);
  end;
  // Builds one line for the write endpoint in the form
  //   measurement[,tag=value,...] field=value[,...] timestamp
  // aMeasurement : measurement name
  // aTagPairs    : optional per-call tags (may be nil); the global Tags are always added first
  // aFieldPairs  : fields to write; must contain at least one entry
  // aTime        : point time; passed through LocalTimeToUTC and emitted as unix seconds * 1000
  // Tags with an empty value are skipped.
  // Raises EInfluxDBData when no field is supplied.
  function TInfluxDBData.GenerateWriteQuery(const aMeasurement : string; aTagPairs : IList<TPair>; aFieldPairs : IList<TFlexPair>; aTime : TDateTime): string;
  var
    incinfo : TStringList;
    series : string;
    tags : string;
    fields : string;
    tagpair : TPair;
    flexpair : TFlexPair;
    fs : TFormatSettings;

    // escapes commas, equals signs and spaces (tag keys, tag values and field keys)
    function EscapeKey(const aText : string) : string;
    begin
      Result := StringReplace(aText,',','\,',[rfReplaceAll]);
      Result := StringReplace(Result,'=','\=',[rfReplaceAll]);
      EscapeData(Result);
    end;

    // escapes commas and spaces (measurement name)
    function EscapeMeasurement(const aText : string) : string;
    begin
      Result := StringReplace(aText,',','\,',[rfReplaceAll]);
      EscapeData(Result);
    end;

    // escapes backslashes and double quotes (quoted string field values)
    function EscapeFieldString(const aText : string) : string;
    begin
      Result := StringReplace(aText,'\','\\',[rfReplaceAll]);
      Result := StringReplace(Result,'"','\"',[rfReplaceAll]);
    end;

  begin
    // a nil list would fail in the for-in loop below; report it explicitly
    if aFieldPairs = nil then raise EInfluxDBData.Create('[TInfluxDBData] Write needs at least one field');
    // floats must always use '.' as decimal separator, whatever the system locale is
    fs := FormatSettings;
    fs.DecimalSeparator := '.';
    incinfo := TStringList.Create;
    try
      //add global tags
      for tagpair in fTags do
      begin
        if not tagpair.Value.IsEmpty then incinfo.Add(Format('%s=%s',[EscapeKey(tagpair.Name),EscapeKey(tagpair.Value)]));
      end;
      //add current query tags
      if aTagPairs <> nil then
      begin
        for tagpair in aTagPairs do
        begin
          if not tagpair.Value.IsEmpty then incinfo.Add(Format('%s=%s',[EscapeKey(tagpair.Name),EscapeKey(tagpair.Value)]));
        end;
      end;
      //each key and value is already escaped, so the joined text is used as is
      tags := CommaText(incinfo);
      incinfo.Clear;
      for flexpair in aFieldPairs do
      begin
        if flexpair.Value.IsInteger then incinfo.Add(Format('%s=%d',[EscapeKey(flexpair.Name),flexpair.Value.AsInt64]))
        else if flexpair.Value.IsFloating then incinfo.Add(Format('%s=%f',[EscapeKey(flexpair.Name),flexpair.Value.AsExtended],fs))
        else incinfo.Add(Format('%s="%s"',[EscapeKey(flexpair.Name),EscapeFieldString(flexpair.Value.AsString)]));
      end;
      if incinfo.Count = 0 then raise EInfluxDBData.Create('[TInfluxDBData] Write needs at least one field');
      fields := CommaText(incinfo);
      //only add the tag separator when there are tags: "measurement," followed by a space is not a valid line
      series := EscapeMeasurement(aMeasurement);
      if tags <> '' then series := series + ',' + tags;
      Result := Format('%s %s %d',[series,fields,DateTimeToUnix(LocalTimeToUTC(aTime){$IFNDEF FPC},True{$ENDIF})*1000]);
    finally
      incinfo.Free;
    end;
  end;
  // Posts an already formatted line to the write url as UTF-8 text.
  // Initializes the provider first if that has not been done yet.
  // Raises EInfluxDBData when the request itself fails or when the server
  // answers with a status other than 200 or 204.
  procedure TInfluxDBData.Write(const aLine : string);
  var
    resp : IHttpRequestResponse;
    stream : TStringStream;
  begin
    if not fInitiated then Init;
    stream := TStringStream.Create(aLine,TEncoding.UTF8);
    try
      try
        resp := fHTTPClient.Post(fFullURL,stream);
      except
        on E : Exception do raise EInfluxDBData.CreateFmt('[TInfluxDBData] Write Error: %s',[e.Message]);
      end;
    finally
      // the stream is released whether or not the request succeeded
      stream.Free;
    end;
    if not (resp.StatusCode in [200,204]) then
      raise EInfluxDBData.Create(Format('[TInfluxDBData] : Response %d : %s trying to post event',[resp.StatusCode,resp.StatusText]));
  end;
  224. end.