Quick.Data.InfluxDB.pas 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. { ***************************************************************************
  2. Copyright (c) 2016-2019 Kike Pérez
  3. Unit : Quick.Data.InfluxDB
  4. Description : InfluxDB data provider
  5. Author : Kike Pérez
  6. Version : 1.0
  7. Created : 05/04/2019
  8. Modified : 10/04/2019
  9. This file is part of QuickLogger: https://github.com/exilon/QuickLogger
  10. ***************************************************************************
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. *************************************************************************** }
  21. unit Quick.Data.InfluxDB;
  22. {$i QuickLib.inc}
  23. interface
  24. uses
  25. Classes,
  26. SysUtils,
  27. DateUtils,
  28. Quick.HttpClient,
  29. Quick.Commons,
  30. Quick.Value,
  31. Quick.Arrays,
  32. Quick.Data.Custom;
  type

    { Data provider that posts measurements to an InfluxDB server over HTTP.
      Points are rendered as text lines by GenerateWriteQuery and posted to the
      write URL assembled by SetWriteURL. }
    TInfluxDBData = class(TDataProvider)
    private
      //http transport, created in Init and released in Destroy/Restart
      fHTTPClient : TJsonHTTPClient;
      //server base url and the full write url derived from it by SetWriteURL
      fURL : string;
      fFullURL : string;
      fDataBase : string;
      //credentials; when both are empty they are left out of the write url
      fUserName : string;
      fPassword : string;
      fUserAgent : string;
      //tags added to every point in addition to the per-call tags
      fTags : TPairArray;
      fCreateDataBaseIfNotExists : Boolean;
      procedure CreateDataBase;
      //renders one point (measurement, tags, fields, timestamp) as a text line
      function GenerateWriteQuery(const aMeasurement : string; aTagPairs : TPairArray; aFieldPairs : TFlexPairArray; aTime : TDateTime): string;
      //rebuilds fFullURL from url, database and credentials
      procedure SetWriteURL;
      procedure SetPassword(const Value: string);
      procedure SetUserName(const Value: string);
      //posts an already rendered line to the write url
      procedure Write(const aLine: string); overload;
    public
      constructor Create; override;
      destructor Destroy; override;
      property URL : string read fURL write fURL;
      property DataBase : string read fDataBase write fDataBase;
      //changing UserName or Password rebuilds the write url immediately
      property UserName : string read fUserName write SetUserName;
      property Password : string read fPassword write SetPassword;
      property CreateDataBaseIfNotExists : Boolean read fCreateDataBaseIfNotExists write fCreateDataBaseIfNotExists;
      property UserAgent : string read fUserAgent write fUserAgent;
      property Tags : TPairArray read fTags write fTags;
      procedure Init; override;
      procedure Restart; override;
      //in the overloads with a default, aTime = 0 means "use the current time"
      procedure Write(const aMeasurement : string; aFieldPairs : TFlexPairArray; aTime : TDateTime = 0); overload;
      procedure Write(const aMeasurement : string; aTagPairs : TPairArray; aFieldPairs : TFlexPairArray; aTime : TDateTime = 0); overload;
      procedure Write(const aMeasurement: string; const aFieldKey : string; aFieldValue : TFlexValue; aTime: TDateTime); overload;
    end;

    { Raised when a request to the server fails or returns a status other than 200/204. }
    EInfluxDBData = class(Exception);
  68. implementation
  { Creates the provider with its defaults: endpoint http://localhost:8086,
    database 'db', empty credentials and the default user agent. }
  constructor TInfluxDBData.Create;
  begin
    inherited;
    //connection defaults
    fURL := 'http://localhost:8086';
    fDataBase := 'db';
    fUserAgent := DEF_USER_AGENT;
    //empty credentials keep the u/p parameters out of the write url
    fUserName := '';
    fPassword := '';
    //behaviour flags
    fCreateDataBaseIfNotExists := True;
    OutputOptions.UseUTCTime := True;
  end;
  { Releases the http client (if one was created) before the inherited teardown. }
  destructor TInfluxDBData.Destroy;
  begin
    //FreeAndNil does nothing harmful on a nil reference, so no Assigned guard is required
    FreeAndNil(fHTTPClient);
    inherited;
  end;
  { Prepares the provider for writing: stops it first when it was already
    initiated, rebuilds the write url, creates and configures the http client,
    optionally runs CreateDataBase and finally calls the inherited Init. }
  procedure TInfluxDBData.Init;
  begin
    if fInitiated then Stop;
    SetWriteURL;
    //a previous Init may have left a client behind; free it so that
    //re-initializing does not leak the old instance
    FreeAndNil(fHTTPClient);
    fHTTPClient := TJsonHTTPClient.Create;
    fHTTPClient.ContentType := 'application/json';
    fHTTPClient.UserAgent := fUserAgent;
    fHTTPClient.HandleRedirects := True;
    if fCreateDataBaseIfNotExists then CreateDataBase;
    inherited;
  end;
  { Stops the provider, discards the current http client and initializes again. }
  procedure TInfluxDBData.Restart;
  begin
    Stop;
    //FreeAndNil tolerates a nil reference, so the client is released unconditionally
    FreeAndNil(fHTTPClient);
    Init;
  end;
  { Property setter for Password: stores the new value and rebuilds the write
    url, but only when the value actually changes. }
  procedure TInfluxDBData.SetPassword(const Value: string);
  begin
    //unchanged value: nothing to store, nothing to rebuild
    if fPassword = Value then Exit;
    fPassword := Value;
    SetWriteURL;
  end;
  { Rebuilds fFullURL, the endpoint used for writes, from the base url, the
    database and (when any is set) the credentials. Precision is always ms.
    NOTE(review): values are inserted as they are, without url-encoding. }
  procedure TInfluxDBData.SetWriteURL;
  var
    anonymous : Boolean;
  begin
    //credentials are only sent when at least one of user name / password is set
    anonymous := (fUserName = '') and (fPassword = '');
    if anonymous then
      fFullURL := Format('%s/write?db=%s&precision=ms',[fURL,fDataBase])
    else
      fFullURL := Format('%s/write?db=%s&u=%s&p=%s&precision=ms',[fURL,fDataBase,fUserName,fPassword]);
  end;
  { Writes a point with a single field and no per-call tags.
    aTime = 0 means "use the current time". }
  procedure TInfluxDBData.Write(const aMeasurement: string; const aFieldKey : string; aFieldValue : TFlexValue; aTime: TDateTime);
  var
    singlefield : TFlexPairArray;
    pointtime : TDateTime;
  begin
    //wrap the lone key/value into a field array so the generic renderer can be used
    singlefield.Add(aFieldKey,aFieldValue);
    if aTime = 0 then pointtime := Now()
      else pointtime := aTime;
    Write(GenerateWriteQuery(aMeasurement,nil,singlefield,pointtime));
  end;
  { Writes a point with per-call tags and a set of fields.
    aTime = 0 means "use the current time". }
  procedure TInfluxDBData.Write(const aMeasurement: string; aTagPairs : TPairArray; aFieldPairs: TFlexPairArray; aTime: TDateTime);
  var
    pointtime : TDateTime;
  begin
    if aTime = 0 then pointtime := Now()
      else pointtime := aTime;
    Write(GenerateWriteQuery(aMeasurement,aTagPairs,aFieldPairs,pointtime));
  end;
  { Writes a point with a set of fields and no per-call tags.
    aTime = 0 means "use the current time". }
  procedure TInfluxDBData.Write(const aMeasurement: string; aFieldPairs: TFlexPairArray; aTime: TDateTime);
  var
    pointtime : TDateTime;
  begin
    if aTime = 0 then pointtime := Now()
      else pointtime := aTime;
    Write(GenerateWriteQuery(aMeasurement,nil,aFieldPairs,pointtime));
  end;
  { Property setter for UserName: stores the new value and rebuilds the write
    url, but only when the value actually changes. }
  procedure TInfluxDBData.SetUserName(const Value: string);
  begin
    //unchanged value: nothing to store, nothing to rebuild
    if fUserName = Value then Exit;
    fUserName := Value;
    SetWriteURL;
  end;
  { Intended to issue a CREATE DATABASE query for fDataBase.
    NOTE(review): the unconditional Exit below returns before any request is
    made, so the procedure currently does nothing and
    CreateDataBaseIfNotExists has no effect. Confirm whether that is deliberate
    before removing it. }
  procedure TInfluxDBData.CreateDataBase;
  var
    response : IHttpRequestResponse;
  begin
    Exit;
    //unreachable while the Exit above is in place
    try
      response := fHTTPClient.Post(Format('%s/query?q=CREATE DATABASE %s',[fURL,fDatabase]),'');
    except
      on E : Exception do raise EInfluxDBData.CreateFmt('[TInfluxDBData] Creating DB: %s',[E.Message]);
    end;
    //200 and 204 are the only statuses treated as success
    if response.StatusCode in [200,204] then Exit;
    raise EInfluxDBData.CreateFmt('[TInfluxDBData] : Response %d : %s trying to create database',[response.StatusCode,response.StatusText]);
  end;
  { Renders one point as a text line: measurement[,tags] fields timestamp.
    aMeasurement : measurement name
    aTagPairs    : per-call tags, appended after the global Tags
    aFieldPairs  : fields; integers are written with %d, floating values as
                   decimal text, anything else as a double-quoted string
    aTime        : local time of the point, converted to UTC and written as
                   unix milliseconds
    NOTE(review): names and values are written as they are, without escaping
    commas, spaces, equals signs or quotes. }
  function TInfluxDBData.GenerateWriteQuery(const aMeasurement : string; aTagPairs : TPairArray; aFieldPairs : TFlexPairArray; aTime : TDateTime): string;
  var
    incinfo : TStringList;
    tags : string;
    fields : string;
    tagpair : TPair;
    flexpair : TFlexPair;
    numformat : TFormatSettings;
    timestamp : Int64;
  begin
    //floats must always use '.' as decimal separator, whatever the system locale is
    {$IFDEF FPC}
    numformat := DefaultFormatSettings;
    {$ELSE}
    numformat := TFormatSettings.Create;
    {$ENDIF}
    numformat.DecimalSeparator := '.';
    incinfo := TStringList.Create;
    try
      //add global tags
      for tagpair in fTags do
      begin
        incinfo.Add(Format('%s=%s',[tagpair.Name,tagpair.Value]));
      end;
      //add current query tags
      for tagpair in aTagPairs do
      begin
        incinfo.Add(Format('%s=%s',[tagpair.Name,tagpair.Value]));
      end;
      if incinfo.Count > 0 then tags := CommaText(incinfo)
        else tags := '';
      incinfo.Clear;
      for flexpair in aFieldPairs do
      begin
        if flexpair.Value.IsInteger then incinfo.Add(Format('%s=%d',[flexpair.Name,flexpair.Value.AsInt64]))
        //FloatToStr keeps the significant digits that '%f' would cut to two decimals
        else if flexpair.Value.IsFloating then incinfo.Add(Format('%s=%s',[flexpair.Name,FloatToStr(flexpair.Value.AsExtended,numformat)]))
        else incinfo.Add(Format('%s="%s"',[flexpair.Name,flexpair.Value.AsString]));
      end;
      fields := CommaText(incinfo);
      //DateTimeToUnix has second resolution; add the millisecond part back because
      //the write url declares precision=ms
      timestamp := DateTimeToUnix(LocalTimeToUTC(aTime){$IFNDEF FPC},True{$ENDIF})*1000 + MilliSecondOf(aTime);
      //without tags the measurement must not be followed by a dangling comma
      if tags <> '' then Result := Format('%s,%s %s %d',[aMeasurement,tags,fields,timestamp])
        else Result := Format('%s %s %d',[aMeasurement,fields,timestamp]);
    finally
      incinfo.Free;
    end;
  end;
  { Posts an already rendered line to the write url, initializing the provider
    first when needed. Raises EInfluxDBData when the request fails or the
    response status is neither 200 nor 204. }
  procedure TInfluxDBData.Write(const aLine : string);
  var
    response : IHttpRequestResponse;
    payload : TStringStream;
  begin
    if not fInitiated then Init;
    payload := TStringStream.Create(aLine);
    try
      try
        response := fHTTPClient.Post(fFullURL,payload);
      except
        //any transport failure is reported as a provider exception
        on E : Exception do raise EInfluxDBData.CreateFmt('[TInfluxDBData] Write Error: %s',[E.Message]);
      end;
    finally
      payload.Free;
    end;
    if response.StatusCode in [200,204] then Exit;
    raise EInfluxDBData.CreateFmt('[TInfluxDBData] : Response %d : %s trying to post event',[response.StatusCode,response.StatusText]);
  end;
  207. end.