searchdhttp.cpp 92 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
7327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349335033513352335333543355335633573358335933603361336233633364336533663367336833693370337133723373337433753376337733783379338033813382338333843385338633873388338933903391339233933394339533963397339833993400340134023403340434053406340734083409341034113412341334143415341634173418341934203421
  1. //
  2. // Copyright (c) 2017-2025, Manticore Software LTD (https://manticoresearch.com)
  3. // Copyright (c) 2001-2016, Andrew Aksyonoff
  4. // Copyright (c) 2008-2016, Sphinx Technologies Inc
  5. // All rights reserved
  6. //
  7. // This program is free software; you can redistribute it and/or modify
  8. // it under the terms of the GNU General Public License. You should have
  9. // received a copy of the GPL license along with this program; if you
  10. // did not, you can find it at http://www.gnu.org/
  11. //
  12. #include "searchdhttp.h"
  13. #include "jsonqueryfilter.h"
  14. #include "attribute.h"
  15. #include "sphinxpq.h"
  16. #include "http/http_parser.h"
  17. #include "searchdaemon.h"
  18. #include "searchdha.h"
  19. #include "searchdreplication.h"
  20. #include "accumulator.h"
  21. #include "networking_daemon.h"
  22. #include "client_session.h"
  23. #include "tracer.h"
  24. #include "searchdbuddy.h"
  25. #include "compressed_http.h"
  26. #include "daemon/logger.h"
  27. #include "daemon/search_handler.h"
  28. #include "sphinxquery/xqparser.h"
  // Diagnostic switches for the HTTP layer. Each one is computed once, during dynamic initialization
  // of this translation unit, from the named environment variable via val_from_env().
  29. static bool g_bLogBadHttpReq = val_from_env ( "MANTICORE_LOG_HTTP_BAD_REQ", false ); // when set, HttpRequestParser_c::ParseHeader() logs the content of unparsable requests via sphWarning; ruled by this env variable
  30. static int g_iLogHttpData = val_from_env ( "MANTICORE_LOG_HTTP_DATA", 0 ); // max number of payload bytes Data2Log() lets through; 0 disables data logging. Ruled by this env variable
  31. static bool LOG_LEVEL_HTTP = val_from_env ( "MANTICORE_LOG_HTTP", false ); // verbose logging http processing events, ruled by this env variable
  // LOG_LEVEL_HTTP above and LOG_COMPONENT_HTTP below look like the per-component level flag and prefix
  // that LOGMSG ( ..., HTTP, HTTP ) picks up by name — NOTE(review): confirm against LOGMSG in daemon/logger.h
  // before renaming either of them.
  32. #define LOG_COMPONENT_HTTP ""
  // Usage: HTTPINFO << "text" << value; — a VERBOSE_DEBUG message for the HTTP component.
  // Wrap it in braces when used as the body of an if/else.
  33. #define HTTPINFO LOGMSG ( VERBOSE_DEBUG, HTTP, HTTP )
  // Version string sent in the "Server:" header by HttpBuildReply ( ..., Str_t sReply, ... ); defined elsewhere.
  34. extern CSphString g_sStatusVersion;
  // Placeholder that Data2Log() returns instead of the payload when g_iLogHttpData is 0.
  35. static const Str_t g_sDataDisabled = FROMS("-");
  36. Str_t Data2Log ( Str_t tMsg ) { return ( g_iLogHttpData ? Str_t ( tMsg.first, Min ( tMsg.second, g_iLogHttpData ) ) : g_sDataDisabled ); }
  37. Str_t Data2Log ( ByteBlob_t tMsg ) { return ( g_iLogHttpData ? Str_t ( (const char *)tMsg.first, Min ( tMsg.second, g_iLogHttpData ) ) : g_sDataDisabled ); }
  38. int HttpGetStatusCodes ( EHTTP_STATUS eStatus ) noexcept
  39. {
  40. switch ( eStatus )
  41. {
  42. case EHTTP_STATUS::_100: return 100;
  43. case EHTTP_STATUS::_200: return 200;
  44. case EHTTP_STATUS::_206: return 206;
  45. case EHTTP_STATUS::_400: return 400;
  46. case EHTTP_STATUS::_403: return 403;
  47. case EHTTP_STATUS::_404: return 404;
  48. case EHTTP_STATUS::_405: return 405;
  49. case EHTTP_STATUS::_409: return 409;
  50. case EHTTP_STATUS::_413: return 413;
  51. case EHTTP_STATUS::_415: return 415;
  52. case EHTTP_STATUS::_500: return 500;
  53. case EHTTP_STATUS::_501: return 501;
  54. case EHTTP_STATUS::_503: return 503;
  55. case EHTTP_STATUS::_526: return 526;
  56. default: return 503;
  57. };
  58. };
  59. EHTTP_STATUS HttpGetStatusCodes ( int iStatus ) noexcept
  60. {
  61. switch ( iStatus )
  62. {
  63. case 100: return EHTTP_STATUS::_100;
  64. case 200: return EHTTP_STATUS::_200;
  65. case 206: return EHTTP_STATUS::_206;
  66. case 400: return EHTTP_STATUS::_400;
  67. case 403: return EHTTP_STATUS::_403;
  68. case 404: return EHTTP_STATUS::_404;
  69. case 405: return EHTTP_STATUS::_405;
  70. case 409: return EHTTP_STATUS::_409;
  71. case 413: return EHTTP_STATUS::_413;
  72. case 415: return EHTTP_STATUS::_415;
  73. case 500: return EHTTP_STATUS::_500;
  74. case 501: return EHTTP_STATUS::_501;
  75. case 503: return EHTTP_STATUS::_503;
  76. case 526: return EHTTP_STATUS::_526;
  77. default: return EHTTP_STATUS::_503;
  78. };
  79. }
  80. inline constexpr const char* HttpGetStatusName ( EHTTP_STATUS eStatus ) noexcept
  81. {
  82. switch ( eStatus )
  83. {
  84. case EHTTP_STATUS::_100: return "100 Continue";
  85. case EHTTP_STATUS::_200: return "200 OK";
  86. case EHTTP_STATUS::_206: return "206 Partial Content";
  87. case EHTTP_STATUS::_400: return "400 Bad Request";
  88. case EHTTP_STATUS::_403: return "403 Forbidden";
  89. case EHTTP_STATUS::_404: return "404 Not Found";
  90. case EHTTP_STATUS::_405: return "405 Method Not Allowed";
  91. case EHTTP_STATUS::_409: return "409 Conflict";
  92. case EHTTP_STATUS::_413: return "413 Request Entity Too Large";
  93. case EHTTP_STATUS::_415: return "415 Unsupported Media Type";
  94. case EHTTP_STATUS::_500: return "500 Internal Server Error";
  95. case EHTTP_STATUS::_501: return "501 Not Implemented";
  96. case EHTTP_STATUS::_503: return "503 Service Unavailable";
  97. case EHTTP_STATUS::_526: return "526 Invalid SSL Certificate";
  98. default: return "503 Service Unavailable";
  99. };
  100. }
  101. extern CSphString g_sMySQLVersion;
  102. static void HttpBuildReply ( CSphVector<BYTE> & dData, EHTTP_STATUS eCode, const char * sBody, int iBodyLen, bool bHtml, bool bHeadReply )
  103. {
  104. assert ( sBody && iBodyLen );
  105. const char * sContent = ( bHtml ? "text/html" : "application/json" );
  106. CSphString sHttp;
  107. sHttp.SetSprintf ( "HTTP/1.1 %s\r\nServer: %s\r\nContent-Type: %s; charset=UTF-8\r\nContent-Length:%d\r\n\r\n", HttpGetStatusName(eCode), g_sMySQLVersion.cstr(), sContent, iBodyLen );
  108. int iHeaderLen = sHttp.Length();
  109. int iBufLen = iHeaderLen;
  110. if ( !bHeadReply )
  111. iBufLen += iBodyLen;
  112. dData.Resize ( iBufLen );
  113. memcpy ( dData.Begin(), sHttp.cstr(), iHeaderLen );
  114. if ( !bHeadReply )
  115. memcpy ( dData.Begin() + iHeaderLen, sBody, iBodyLen );
  116. }
  117. void HttpBuildReply ( CSphVector<BYTE> & dData, EHTTP_STATUS eCode, const char * sBody, int iBodyLen, bool bHtml )
  118. {
  119. HttpBuildReply ( dData, eCode, sBody, iBodyLen, bHtml, false );
  120. }
  121. void HttpBuildReplyHead ( CSphVector<BYTE> & dData, EHTTP_STATUS eCode, const char * sBody, int iBodyLen, bool bHeadReply )
  122. {
  123. HttpBuildReply ( dData, eCode, sBody, iBodyLen, false, bHeadReply );
  124. }
  125. void HttpErrorReply ( CSphVector<BYTE> & dData, EHTTP_STATUS eCode, const char * szError )
  126. {
  127. JsonObj_c tErr;
  128. tErr.AddStr ( "error", szError );
  129. CSphString sJsonError = tErr.AsString();
  130. HttpBuildReply ( dData, eCode, sJsonError.cstr(), sJsonError.Length(), false );
  131. }
  /// URL names under which one HTTP endpoint is reachable.
  struct Endpoint_t
  {
      const char * m_szName1 = nullptr; ///< primary name; HttpEndpointToStr() returns this one
      const char * m_szName2 = nullptr; ///< optional alias, nullptr when the endpoint has none
  };
  // URL names of all HTTP endpoints. Row i describes EHTTP_ENDPOINT(i): StrToHttpEndpoint() returns
  // EHTTP_ENDPOINT ( i ) for a match in row i, and HttpEndpointToStr() indexes the array by the enum value.
  // Row order must therefore follow the EHTTP_ENDPOINT declaration (presumably in searchdhttp.h — keep both in sync).
  // Column 1 is the primary name, column 2 an optional "json/..." alias (nullptr when absent).
  137. static Endpoint_t g_dEndpoints[(size_t)EHTTP_ENDPOINT::TOTAL] =
  138. {
  139. { "index.html", nullptr },
  140. { "sql", nullptr },
  141. { "search", "json/search" },
  142. { "index", "json/index" },
  143. { "create", "json/create" },
  144. { "insert", "json/insert" },
  145. { "replace", "json/replace" },
  146. { "update", "json/update" },
  147. { "delete", "json/delete" },
  148. { "bulk", "json/bulk" },
  // pq names are matched by prefix in StrToHttpEndpoint(), not by exact equality
  149. { "pq", "json/pq" },
  150. { "cli", nullptr },
  151. { "cli_json", nullptr },
  152. { "_bulk", nullptr }
  153. };
  154. EHTTP_ENDPOINT StrToHttpEndpoint ( const CSphString& sEndpoint ) noexcept
  155. {
  156. if ( sEndpoint.Begins ( g_dEndpoints[(int)EHTTP_ENDPOINT::PQ].m_szName1 ) || sEndpoint.Begins ( g_dEndpoints[(int)EHTTP_ENDPOINT::PQ].m_szName2 ) )
  157. return EHTTP_ENDPOINT::PQ;
  158. for ( int i = 0; i < (int)EHTTP_ENDPOINT::TOTAL; ++i )
  159. if ( sEndpoint == g_dEndpoints[i].m_szName1 || ( g_dEndpoints[i].m_szName2 && sEndpoint == g_dEndpoints[i].m_szName2 ) )
  160. return EHTTP_ENDPOINT ( i );
  161. return EHTTP_ENDPOINT::TOTAL;
  162. }
  163. ///////////////////////////////////////////////////////////////////////
  164. /// Stream reader
  165. bool CharStream_c::GetError() const
  166. {
  167. return ( ( m_pIn && m_pIn->GetError() ) || !m_sError.IsEmpty() );
  168. }
  169. const CSphString & CharStream_c::GetErrorMessage() const
  170. {
  171. if ( m_pIn && m_pIn->GetError() )
  172. return m_pIn->GetErrorMessage();
  173. else
  174. return m_sError;
  175. }
  176. /// stub - returns feed string
  177. class BlobStream_c final: public CharStream_c
  178. {
  179. Str_t m_sData;
  180. public:
  181. BlobStream_c ( const CSphString & sData )
  182. : CharStream_c ( nullptr )
  183. , m_sData { FromStr ( sData ) }
  184. {}
  185. Str_t Read() final
  186. {
  187. if ( m_bDone )
  188. return dEmptyStr;
  189. m_bDone = true;
  190. return m_sData;
  191. }
  192. Str_t ReadAll() final
  193. {
  194. auto sData = Read();
  195. Str_t sDescr = { sData.first, Min ( sData.second, 100 ) };
  196. myinfo::SetDescription ( sDescr, sDescr.second );
  197. return sData;
  198. }
  199. };
  200. /// stream with known content length and no special massage over socket
  201. class RawSocketStream_c final : public CharStream_c
  202. {
  203. int m_iContentLength;
  204. bool m_bTerminated = false;
  205. BYTE m_uOldTerminator = 0;
  206. CSphVector<BYTE> m_dUnpacked;
  207. bool m_bCompressed = false;
  208. public:
  209. RawSocketStream_c ( AsyncNetInputBuffer_c * pIn, int iContentLength, bool bCompressed )
  210. : CharStream_c ( pIn )
  211. , m_iContentLength ( iContentLength )
  212. , m_bCompressed ( bCompressed )
  213. {
  214. assert ( pIn );
  215. m_bDone = !m_iContentLength;
  216. }
  217. ~RawSocketStream_c() final
  218. {
  219. if ( m_bTerminated )
  220. m_pIn->Terminate ( 0, m_uOldTerminator );
  221. m_pIn->DiscardProcessed ( 0 );
  222. }
  223. Str_t Read() final
  224. {
  225. if ( m_bDone )
  226. return dEmptyStr;
  227. m_pIn->DiscardProcessed ( 0 );
  228. if ( !m_pIn->HasBytes() && m_pIn->ReadAny()<0 )
  229. {
  230. if ( !m_pIn->GetError() )
  231. m_sError.SetSprintf ( "failed to receive HTTP request (error='%s')", sphSockError() );
  232. m_bDone = true;
  233. return dEmptyStr;
  234. }
  235. auto iChunk = Min ( m_iContentLength, m_pIn->HasBytes() );
  236. m_iContentLength -= iChunk;
  237. m_bDone = !m_iContentLength;
  238. // Temporary write \0 at the end, since parser wants z-terminated buf
  239. if ( m_bDone )
  240. {
  241. m_uOldTerminator = m_pIn->Terminate ( iChunk, '\0' );
  242. m_bTerminated = true;
  243. }
  244. return Decompress ( m_pIn->PopTail ( iChunk ) );
  245. }
  246. Str_t ReadAll() final
  247. {
  248. if ( m_bDone )
  249. return dEmptyStr;
  250. // that is oneshot read - we sure, we're done
  251. m_bDone = true;
  252. if ( m_iContentLength && !m_pIn->ReadFrom ( m_iContentLength ) )
  253. {
  254. if ( !m_pIn->GetError() )
  255. m_sError.SetSprintf ( "failed to receive HTTP request (error='%s')", sphSockError() );
  256. return dEmptyStr;
  257. }
  258. m_uOldTerminator = m_pIn->Terminate ( m_iContentLength, '\0' );
  259. return Decompress ( m_pIn->PopTail ( m_iContentLength ) );
  260. }
  261. Str_t Decompress ( const ByteBlob_t & tIn )
  262. {
  263. if ( !m_bCompressed )
  264. return B2S ( tIn );
  265. m_dUnpacked.Resize ( 0 );
  266. if ( !GzipDecompress ( tIn, m_dUnpacked, m_sError ) )
  267. {
  268. m_bDone = true;
  269. return dEmptyStr;
  270. }
  271. return Str_t ( m_dUnpacked );
  272. }
  273. };
  274. /// chunked stream - i.e. total content length is unknown
  275. class ChunkedSocketStream_c final: public CharStream_c
  276. {
  277. CSphVector<BYTE> m_dData; // used only in ReadAll() call
  278. int m_iLastParsed;
  279. bool m_bBodyDone;
  280. CSphVector<Str_t> m_dBodies;
  281. http_parser_settings m_tParserSettings;
  282. http_parser* m_pParser;
  283. private:
  284. // callbacks
  285. static int cbParserBody ( http_parser* pParser, const char* sAt, size_t iLen )
  286. {
  287. assert ( pParser->data );
  288. auto pThis = static_cast<ChunkedSocketStream_c*> ( pParser->data );
  289. return pThis->ParserBody ( { sAt, (int)iLen } );
  290. }
  291. static int cbMessageComplete ( http_parser* pParser )
  292. {
  293. assert ( pParser->data );
  294. auto pThis = static_cast<ChunkedSocketStream_c*> ( pParser->data );
  295. return pThis->MessageComplete();
  296. }
  297. inline int MessageComplete()
  298. {
  299. HTTPINFO << "ChunkedSocketStream_c::MessageComplete";
  300. m_bBodyDone = true;
  301. return 0;
  302. }
  303. inline int ParserBody ( Str_t sData )
  304. {
  305. HTTPINFO << "ParserBody chunked str with " << sData.second << " bytes '" << Data2Log ( sData ) << "'";;
  306. if ( !IsEmpty ( sData ) )
  307. m_dBodies.Add ( sData );
  308. return 0;
  309. }
  310. void ParseBody ( ByteBlob_t sData )
  311. {
  312. HTTPINFO << "ParseBody chunked blob with " << sData.second << " bytes '" << Data2Log ( sData ) << "'";;
  313. m_iLastParsed = (int)http_parser_execute ( m_pParser, &m_tParserSettings, (const char*)sData.first, sData.second );
  314. if ( m_iLastParsed != sData.second )
  315. {
  316. HTTPINFO << "ParseBody error: parsed " << m_iLastParsed << ", chunk " << sData.second;
  317. if ( !m_pIn->GetError() )
  318. m_sError = http_errno_description ( (http_errno)m_pParser->http_errno );
  319. }
  320. }
  321. public:
  322. ChunkedSocketStream_c ( AsyncNetInputBuffer_c * pIn, http_parser * pParser, bool bBodyDone, CSphVector<Str_t> dBodies, int iLastParsed )
  323. : CharStream_c ( pIn )
  324. , m_iLastParsed ( iLastParsed )
  325. , m_bBodyDone ( bBodyDone )
  326. , m_pParser ( pParser )
  327. {
  328. assert ( pIn );
  329. m_dBodies = std::move ( dBodies );
  330. http_parser_settings_init ( &m_tParserSettings );
  331. m_tParserSettings.on_body = cbParserBody;
  332. m_tParserSettings.on_message_complete = cbMessageComplete;
  333. m_pParser->data = this;
  334. }
  335. void DiscardLast()
  336. {
  337. m_pIn->PopTail ( std::exchange ( m_iLastParsed, 0 ) );
  338. m_pIn->DiscardProcessed ( 0 );
  339. }
  340. ~ChunkedSocketStream_c() final
  341. {
  342. DiscardLast();
  343. }
  344. Str_t Read() final
  345. {
  346. if ( m_bDone )
  347. return dEmptyStr;
  348. while ( m_dBodies.IsEmpty() )
  349. {
  350. if ( m_bBodyDone )
  351. {
  352. m_bDone = true;
  353. return dEmptyStr;
  354. }
  355. DiscardLast();
  356. if ( !m_pIn->HasBytes() )
  357. {
  358. switch ( m_pIn->ReadAny() )
  359. {
  360. case -1:
  361. case 0:
  362. m_bDone = true;
  363. return dEmptyStr;
  364. default:
  365. break;
  366. }
  367. }
  368. ParseBody ( m_pIn->Tail() );
  369. }
  370. auto sResult = m_dBodies.First();
  371. m_dBodies.Remove ( 0 );
  372. if ( m_bBodyDone && m_dBodies.IsEmpty() )
  373. {
  374. m_bDone = true;
  375. const_cast<char&> ( sResult.first[sResult.second] ) = '\0';
  376. }
  377. return sResult;
  378. }
  379. Str_t ReadAll() final
  380. {
  381. auto sFirst = Read();
  382. if ( m_bDone )
  383. return sFirst;
  384. m_dData.Append ( sFirst );
  385. do
  386. m_dData.Append ( Read() );
  387. while ( !m_bDone );
  388. m_dData.Add ( '\0' );
  389. m_dData.Resize ( m_dData.GetLength() - 1 );
  390. return m_dData;
  391. }
  392. };
  393. ///////////////////////////////////////////////////////////////////////
  394. CSphString HttpEndpointToStr ( EHTTP_ENDPOINT eEndpoint )
  395. {
  396. assert ( eEndpoint < EHTTP_ENDPOINT::TOTAL );
  397. return g_dEndpoints[(int)eEndpoint].m_szName1;
  398. }
  399. void HttpBuildReply ( CSphVector<BYTE> & dData, EHTTP_STATUS eCode, Str_t sReply, bool bHtml )
  400. {
  401. const char * sContent = ( bHtml ? "text/html" : "application/json" );
  402. StringBuilder_c sHttp;
  403. sHttp.Sprintf ( "HTTP/1.1 %s\r\nServer: %s\r\nContent-Type: %s; charset=UTF-8\r\nContent-Length: %d\r\n\r\n", HttpGetStatusName ( eCode ), g_sStatusVersion.cstr(), sContent, sReply.second );
  404. dData.Reserve ( sHttp.GetLength() + sReply.second );
  405. dData.Append ( (Str_t)sHttp );
  406. dData.Append ( sReply );
  407. }
  408. HttpRequestParser_c::HttpRequestParser_c()
  409. {
  410. http_parser_settings_init ( &m_tParserSettings );
  411. m_tParserSettings.on_url = cbParserUrl;
  412. m_tParserSettings.on_header_field = cbParserHeaderField;
  413. m_tParserSettings.on_header_value = cbParserHeaderValue;
  414. m_tParserSettings.on_headers_complete = cbParseHeaderCompleted;
  415. m_tParserSettings.on_body = cbParserBody;
  416. m_tParserSettings.on_message_begin = cbMessageBegin;
  417. m_tParserSettings.on_message_complete = cbMessageComplete;
  418. m_tParserSettings.on_status = cbMessageStatus;
  419. Reinit();
  420. }
  421. void HttpRequestParser_c::Reinit()
  422. {
  423. HTTPINFO << "HttpRequestParser_c::Reinit()";
  424. http_parser_init ( &m_tParser, HTTP_REQUEST );
  425. m_sEndpoint = "";
  426. m_sCurField.Clear();
  427. m_sCurValue.Clear();
  428. m_hOptions.Reset();
  429. m_eType = HTTP_GET;
  430. m_sUrl.Clear();
  431. m_bHeaderDone = false;
  432. m_bBodyDone = false;
  433. m_dParsedBodies.Reset();
  434. m_iParsedBodyLength = 0;
  435. m_iLastParsed = 0;
  436. m_szError = nullptr;
  437. m_tParser.data = this;
  438. }
  439. bool HttpRequestParser_c::ParseHeader ( ByteBlob_t sData )
  440. {
  441. HTTPINFO << "ParseChunk with " << sData.second << " bytes '" << Data2Log ( sData ) << "'";
  442. m_iLastParsed = (int) http_parser_execute ( &m_tParser, &m_tParserSettings, (const char *)sData.first, sData.second );
  443. if ( m_iLastParsed != sData.second )
  444. {
  445. if ( g_bLogBadHttpReq )
  446. {
  447. sphWarning ( "ParseChunk error: parsed %d, chunk %d, conn %d, %.*s", m_iLastParsed, sData.second, session::GetConnID(), sData.second, sData.first );
  448. } else
  449. {
  450. HTTPINFO << "ParseChunk error: parsed " << m_iLastParsed << ", chunk " << sData.second;
  451. }
  452. m_szError = http_errno_description ( (http_errno)m_tParser.http_errno );
  453. return true;
  454. }
  455. return m_bHeaderDone;
  456. }
  457. int HttpRequestParser_c::ParsedBodyLength() const
  458. {
  459. return m_iParsedBodyLength;
  460. }
  461. bool HttpRequestParser_c::Expect100() const
  462. {
  463. return m_hOptions.Exists ( "expect" ) && m_hOptions["expect"] == "100-continue";
  464. }
  465. bool HttpRequestParser_c::KeepAlive() const
  466. {
  467. return m_bKeepAlive;
  468. }
  469. const char* HttpRequestParser_c::Error() const
  470. {
  471. return m_szError;
  472. }
  473. bool HttpRequestParser_c::IsBuddyQuery () const
  474. {
  475. return ::IsBuddyQuery ( m_hOptions );
  476. }
  477. inline int Char2Hex ( BYTE uChar )
  478. {
  479. switch (uChar)
  480. {
  481. case '0': return 0;
  482. case '1': return 1;
  483. case '2': return 2;
  484. case '3': return 3;
  485. case '4': return 4;
  486. case '5': return 5;
  487. case '6': return 6;
  488. case '7': return 7;
  489. case '8': return 8;
  490. case '9': return 9;
  491. case 'a': case 'A': return 10;
  492. case 'b': case 'B': return 11;
  493. case 'c': case 'C': return 12;
  494. case 'd': case 'D': return 13;
  495. case 'e': case 'E': return 14;
  496. case 'f': case 'F': return 15;
  497. default: break;
  498. }
  499. return -1;
  500. }
  501. inline int Chars2Hex ( const char* pSrc )
  502. {
  503. int iRes = Char2Hex ( *( pSrc + 1 ) );
  504. return iRes < 0 ? iRes : iRes + Char2Hex ( *pSrc ) * 16;
  505. }
  506. void UriPercentReplace ( Str_t & sEntity, Replace_e ePlus )
  507. {
  508. if ( IsEmpty ( sEntity ) )
  509. return;
  510. const char* pSrc = sEntity.first;
  511. auto* pDst = const_cast<char*> ( pSrc );
  512. char cPlus = ((bool)ePlus) ? ' ' : '+';
  513. auto* pEnd = pSrc + sEntity.second;
  514. while ( pSrc < pEnd )
  515. {
  516. if ( *pSrc=='%' && *(pSrc+1) && *(pSrc+2) )
  517. {
  518. auto iCode = Chars2Hex ( pSrc + 1 );
  519. if ( iCode<0 )
  520. {
  521. *pDst++ = *pSrc++;
  522. continue;
  523. }
  524. pSrc += 3;
  525. *pDst++ = (char) iCode;
  526. } else
  527. {
  528. *pDst++ = ( *pSrc=='+' ? cPlus : *pSrc );
  529. pSrc++;
  530. }
  531. }
  532. sEntity.second = int ( pDst - sEntity.first );
  533. }
  534. void StoreRawQuery ( OptionsHash_t& hOptions, CSphString sRawBody )
  535. {
  536. if ( sRawBody.IsEmpty() )
  537. return;
  538. hOptions.Add ( std::move ( sRawBody ), "raw_query" );
  539. }
  540. void DecodeAndStoreRawQuery ( OptionsHash_t& hOptions, const Str_t& sWholeData )
  541. {
  542. if ( IsEmpty ( sWholeData ) )
  543. return;
  544. // store raw query
  545. CSphString sRawBody ( sWholeData ); // copy raw data, important!
  546. Str_t sRaw { sRawBody.cstr(), sWholeData.second }; // FromStr implies strlen(), but we don't need it
  547. UriPercentReplace ( sRaw, Replace_e::NoPlus ); // avoid +-decoding
  548. *const_cast<char*> ( sRaw.first + sRaw.second ) = '\0';
  549. StoreRawQuery ( hOptions, std::move ( sRawBody ));
  550. }
  551. void HttpRequestParser_c::ParseList ( Str_t sData, OptionsHash_t & hOptions )
  552. {
  553. HTTPINFO << "ParseList with " << sData.second << " bytes '" << Data2Log ( sData ) << "'";
  554. CSphString sBuf ( sData );
  555. const char * sCur = sBuf.cstr();
  556. const char * sLast = sCur;
  557. const char * sEnd = sCur + sData.second;
  558. Str_t sName = dEmptyStr;
  559. for ( ; sCur<sEnd; ++sCur )
  560. {
  561. switch (*sCur)
  562. {
  563. case '=':
  564. {
  565. sName = { sLast, int ( sCur - sLast ) };
  566. UriPercentReplace ( sName );
  567. sLast = sCur + 1;
  568. break;
  569. }
  570. case '&':
  571. {
  572. Str_t sVal { sLast, int ( sCur - sLast ) };
  573. UriPercentReplace ( sVal );
  574. ToLower ( sName );
  575. hOptions.Add ( sVal, sName );
  576. sLast = sCur + 1;
  577. sName = dEmptyStr;
  578. break;
  579. }
  580. default:
  581. break;
  582. }
  583. }
  584. if ( IsEmpty ( sName ) )
  585. return;
  586. Str_t sVal { sLast, int ( sCur - sLast ) };
  587. UriPercentReplace ( sVal );
  588. ToLower ( sName );
  589. hOptions.Add ( sVal, sName );
  590. }
  591. inline int HttpRequestParser_c::ParserUrl ( Str_t sData )
  592. {
  593. HTTPINFO << "ParseUrl with " << sData.second << " bytes '" << sData << "'";
  594. m_sUrl << sData;
  595. return 0;
  596. }
  597. inline void HttpRequestParser_c::FinishParserUrl ()
  598. {
  599. if ( m_sUrl.IsEmpty() )
  600. return;
  601. auto _ = AtScopeExit ( [this] { m_sUrl.Clear(); } );
  602. auto sData = (Str_t) m_sUrl;
  603. http_parser_url tUri;
  604. if ( http_parser_parse_url ( sData.first, sData.second, 0, &tUri ) )
  605. return;
  606. DWORD uPath = ( 1UL<<UF_PATH );
  607. DWORD uQuery = ( 1UL<<UF_QUERY );
  608. if ( ( tUri.field_set & uPath )!=0 )
  609. {
  610. const char * sPath = sData.first + tUri.field_data[UF_PATH].off;
  611. int iPathLen = tUri.field_data[UF_PATH].len;
  612. if ( *sPath=='/' )
  613. {
  614. ++sPath;
  615. --iPathLen;
  616. }
  617. // URL should be split fully to point to proper endpoint
  618. m_sEndpoint.SetBinary ( sPath, iPathLen );
  619. // transfer endpoint for further parse
  620. m_hOptions.Add ( m_sEndpoint, "endpoint" );
  621. }
  622. if ( ( tUri.field_set & uQuery )!=0 )
  623. {
  624. Str_t sRawGetQuery { sData.first + tUri.field_data[UF_QUERY].off, tUri.field_data[UF_QUERY].len };
  625. if ( m_eType==HTTP_GET )
  626. DecodeAndStoreRawQuery ( m_hOptions, sRawGetQuery );
  627. ParseList ( sRawGetQuery, m_hOptions );
  628. }
  629. CSphString sFullURL;
  630. if ( ( tUri.field_set & uPath )!=0 && ( tUri.field_set & uQuery )!=0 )
  631. {
  632. const char * sStart = sData.first + tUri.field_data[UF_PATH].off;
  633. const char * sEnd = sData.first + tUri.field_data[UF_QUERY].off + tUri.field_data[UF_QUERY].len;
  634. sFullURL.SetBinary ( sStart, sEnd-sStart );
  635. m_hOptions.Add ( sFullURL, "full_url" );
  636. } else if ( ( tUri.field_set & uPath )!=0 )
  637. {
  638. const char * sPath = sData.first + tUri.field_data[UF_PATH].off;
  639. int iPathLen = tUri.field_data[UF_PATH].len;
  640. // URL should be split fully to point to proper endpoint
  641. sFullURL.SetBinary ( sPath, iPathLen );
  642. m_hOptions.Add ( sFullURL, "full_url" );
  643. }
  644. }
  645. inline int HttpRequestParser_c::ParserHeaderField ( Str_t sData )
  646. {
  647. FinishParserKeyVal();
  648. m_sCurField << sData;
  649. return 0;
  650. }
  651. inline int HttpRequestParser_c::ParserHeaderValue ( Str_t sData )
  652. {
  653. m_sCurValue << sData;
  654. return 0;
  655. }
  656. inline void HttpRequestParser_c::FinishParserKeyVal()
  657. {
  658. if ( m_sCurValue.IsEmpty() )
  659. return;
  660. CSphString sField = (CSphString)m_sCurField;
  661. sField.ToLower();
  662. m_hOptions.Add ( (CSphString)m_sCurValue, sField );
  663. m_sCurField.Clear();
  664. m_sCurValue.Clear();
  665. }
  666. inline int HttpRequestParser_c::ParserBody ( Str_t sData )
  667. {
  668. HTTPINFO << "ParserBody parser with " << sData.second << " bytes '" << Data2Log ( sData ) << "'";
  669. if ( !m_dParsedBodies.IsEmpty() )
  670. {
  671. auto& sLast = m_dParsedBodies.Last();
  672. if ( sLast.first + sLast.second == sData.first )
  673. sLast.second += sData.second;
  674. else
  675. m_dParsedBodies.Add ( sData );
  676. } else
  677. m_dParsedBodies.Add ( sData );
  678. m_iParsedBodyLength += sData.second;
  679. return 0;
  680. }
  681. inline int HttpRequestParser_c::MessageComplete ()
  682. {
  683. HTTPINFO << "MessageComplete";
  684. m_bBodyDone = true;
  685. return 0;
  686. }
// Headers-complete callback body: latch connection-wide properties (keep-alive, method),
// flush the last pending header and the accumulated url into m_hOptions, and mark headers as done.
// Always returns 0.
inline int HttpRequestParser_c::ParseHeaderCompleted ()
{
	HTTPINFO << "ParseHeaderCompleted. Upgrade=" << (unsigned int)m_tParser.upgrade << ", length=" << (int64_t) m_tParser.content_length;
	// we don't support connection upgrade, so just reset the upgrade flag if it was detected.
	// rfc7540 section-3.2 (for http/2) says we should just continue as if no 'upgrade' header was found
	m_tParser.upgrade = 0;
	// connection wide http options
	m_bKeepAlive = ( ( m_tParser.flags & F_CONNECTION_KEEP_ALIVE ) != 0 );
	// must be set before FinishParserUrl(), which checks it to decide whether to store "raw_query"
	m_eType = (http_method)m_tParser.method;
	// the last header is not followed by another field callback, so flush it explicitly
	FinishParserKeyVal();
	FinishParserUrl();
	m_bHeaderDone = true;
	return 0;
}
  701. int HttpRequestParser_c::cbParserUrl ( http_parser* pParser, const char* sAt, size_t iLen )
  702. {
  703. assert ( pParser->data );
  704. auto pThis = static_cast<HttpRequestParser_c*> ( pParser->data );
  705. return pThis->ParserUrl ( { sAt, (int)iLen } );
  706. }
  707. int HttpRequestParser_c::cbParserHeaderField ( http_parser* pParser, const char* sAt, size_t iLen )
  708. {
  709. assert ( pParser->data );
  710. auto pThis = static_cast<HttpRequestParser_c*> ( pParser->data );
  711. return pThis->ParserHeaderField ( { sAt, (int)iLen } );
  712. }
  713. int HttpRequestParser_c::cbParserHeaderValue ( http_parser* pParser, const char* sAt, size_t iLen )
  714. {
  715. assert ( pParser->data );
  716. auto pThis = static_cast<HttpRequestParser_c*> ( pParser->data );
  717. return pThis->ParserHeaderValue ( { sAt, (int)iLen } );
  718. }
  719. int HttpRequestParser_c::cbParseHeaderCompleted ( http_parser* pParser )
  720. {
  721. assert ( pParser->data );
  722. auto pThis = static_cast<HttpRequestParser_c*> ( pParser->data );
  723. return pThis->ParseHeaderCompleted ();
  724. }
  725. int HttpRequestParser_c::cbMessageBegin ( http_parser* pParser )
  726. {
  727. HTTPINFO << "cbMessageBegin";
  728. return 0;
  729. }
  730. int HttpRequestParser_c::cbMessageComplete ( http_parser* pParser )
  731. {
  732. assert ( pParser->data );
  733. auto pThis = static_cast<HttpRequestParser_c*> ( pParser->data );
  734. return pThis->MessageComplete();
  735. }
  736. int HttpRequestParser_c::cbMessageStatus ( http_parser* pParser, const char* sAt, size_t iLen )
  737. {
  738. HTTPINFO << "cbMessageStatus with '" << Str_t { sAt, (int)iLen } << "'";
  739. return 0;
  740. }
  741. int HttpRequestParser_c::cbParserBody ( http_parser* pParser, const char* sAt, size_t iLen )
  742. {
  743. assert ( pParser->data );
  744. auto pThis = static_cast<HttpRequestParser_c*> ( pParser->data );
  745. return pThis->ParserBody ( { sAt, (int)iLen } );
  746. }
// printf-style template of the daemon's HTML index page.
// The single %s is filled with the daemon version string by HttpHandlerIndexPage().
static const char * g_sIndexPage =
R"index(<!DOCTYPE html>
<html>
<head>
<title>Manticore</title>
</head>
<body>
<h1>Manticore daemon</h1>
<p>%s</p>
</body>
</html>)index";
  758. static void HttpHandlerIndexPage ( CSphVector<BYTE> & dData )
  759. {
  760. StringBuilder_c sIndexPage;
  761. sIndexPage.Appendf ( g_sIndexPage, g_sStatusVersion.cstr() );
  762. HttpBuildReply ( dData, EHTTP_STATUS::_200, (Str_t)sIndexPage, true );
  763. }
  764. //////////////////////////////////////////////////////////////////////////
  765. class JsonRequestBuilder_c : public RequestBuilder_i
  766. {
  767. public:
  768. JsonRequestBuilder_c ( const char* szQuery, CSphString sEndpoint )
  769. : m_sEndpoint ( std::move ( sEndpoint ) )
  770. , m_tQuery ( szQuery )
  771. {
  772. // fixme: we can implement replacing indexes in a string (without parsing) if it becomes a performance issue
  773. }
  774. void BuildRequest ( const AgentConn_t & tAgent, ISphOutputBuffer & tOut ) const final
  775. {
  776. // replace "table" value in the json query
  777. m_tQuery.DelItem ( "table" );
  778. m_tQuery.AddStr ( "table", tAgent.m_tDesc.m_sIndexes.cstr() );
  779. CSphString sRequest = m_tQuery.AsString();
  780. auto tWr = APIHeader ( tOut, SEARCHD_COMMAND_JSON, VER_COMMAND_JSON ); // API header
  781. tOut.SendString ( m_sEndpoint.cstr() );
  782. tOut.SendString ( sRequest.cstr() );
  783. }
  784. private:
  785. CSphString m_sEndpoint;
  786. mutable JsonObj_c m_tQuery;
  787. };
  788. class JsonReplyParser_c : public ReplyParser_i
  789. {
  790. public:
  791. JsonReplyParser_c ( int & iAffected, int & iWarnings )
  792. : m_iAffected ( iAffected )
  793. , m_iWarnings ( iWarnings )
  794. {}
  795. bool ParseReply ( MemInputBuffer_c & tReq, AgentConn_t & ) const final
  796. {
  797. CSphString sEndpoint = tReq.GetString();
  798. EHTTP_ENDPOINT eEndpoint = StrToHttpEndpoint ( sEndpoint );
  799. if ( eEndpoint!=EHTTP_ENDPOINT::JSON_UPDATE && eEndpoint!=EHTTP_ENDPOINT::JSON_DELETE )
  800. return false;
  801. DWORD uLength = tReq.GetDword();
  802. CSphFixedVector<BYTE> dResult ( uLength+1 );
  803. tReq.GetBytes ( dResult.Begin(), (int)uLength );
  804. dResult[uLength] = '\0';
  805. return sphGetResultStats ( (const char *)dResult.Begin(), m_iAffected, m_iWarnings, eEndpoint==EHTTP_ENDPOINT::JSON_UPDATE );
  806. }
  807. protected:
  808. int & m_iAffected;
  809. int & m_iWarnings;
  810. };
  811. std::unique_ptr<QueryParser_i> CreateQueryParser ( bool bJson ) noexcept
  812. {
  813. return bJson ? sphCreateJsonQueryParser() : sphCreatePlainQueryParser();
  814. }
  815. std::unique_ptr<RequestBuilder_i> CreateRequestBuilder ( Str_t sQuery, const SqlStmt_t & tStmt )
  816. {
  817. if ( tStmt.m_bJson )
  818. {
  819. assert ( !tStmt.m_sEndpoint.IsEmpty() );
  820. return std::make_unique<JsonRequestBuilder_c> ( sQuery.first, tStmt.m_sEndpoint );
  821. } else
  822. {
  823. return std::make_unique<SphinxqlRequestBuilder_c> ( sQuery, tStmt );
  824. }
  825. }
  826. std::unique_ptr<ReplyParser_i> CreateReplyParser ( bool bJson, int & iUpdated, int & iWarnings )
  827. {
  828. if ( bJson )
  829. return std::make_unique<JsonReplyParser_c> ( iUpdated, iWarnings );
  830. else
  831. return std::make_unique<SphinxqlReplyParser_c> ( &iUpdated, &iWarnings );
  832. }
  833. //////////////////////////////////////////////////////////////////////////
  834. class HttpErrorReporter_c final : public StmtErrorReporter_i
  835. {
  836. public:
  837. void Ok ( int iAffectedRows, const CSphString & /*sWarning*/, int64_t /*iLastInsertId*/ ) final { m_iAffected = iAffectedRows; }
  838. void Ok ( int iAffectedRows, int /*nWarnings*/ ) final { m_iAffected = iAffectedRows; }
  839. void ErrorEx ( EMYSQL_ERR eErr, const char * sError ) final;
  840. RowBuffer_i * GetBuffer() final { return nullptr; }
  841. bool IsError() const { return m_bError; }
  842. const char * GetError() const { return m_sError.cstr(); }
  843. int GetAffectedRows() const { return m_iAffected; }
  844. private:
  845. bool m_bError {false};
  846. CSphString m_sError;
  847. int m_iAffected {0};
  848. };
  849. void HttpErrorReporter_c::ErrorEx ( EMYSQL_ERR /*iErr*/, const char * sError )
  850. {
  851. m_bError = true;
  852. m_sError = sError;
  853. }
  854. StmtErrorReporter_i * CreateHttpErrorReporter()
  855. {
  856. return new HttpErrorReporter_c();
  857. }
  858. //////////////////////////////////////////////////////////////////////////
  859. // all the handlers for http queries
  860. void ReplyBuf ( Str_t sResult, EHTTP_STATUS eStatus, bool bNeedHttpResponse, CSphVector<BYTE> & dData )
  861. {
  862. if ( bNeedHttpResponse )
  863. HttpBuildReply ( dData, eStatus, sResult, false );
  864. else
  865. {
  866. dData.Resize ( 0 );
  867. dData.Append ( sResult );
  868. }
  869. }
  870. void HttpHandler_c::SetErrorFormat ( bool bNeedHttpResponse )
  871. {
  872. m_bNeedHttpResponse = bNeedHttpResponse;
  873. }
  874. CSphVector<BYTE> & HttpHandler_c::GetResult()
  875. {
  876. return m_dData;
  877. }
  878. const CSphString & HttpHandler_c::GetError () const
  879. {
  880. return m_sError;
  881. }
  882. EHTTP_STATUS HttpHandler_c::GetStatusCode() const
  883. {
  884. return m_eHttpCode;
  885. }
  886. void HttpHandler_c::ReportError ( const char * szError, EHTTP_STATUS eStatus )
  887. {
  888. m_sError = szError;
  889. ReportError ( eStatus );
  890. }
  891. void HttpHandler_c::ReportError ( EHTTP_STATUS eStatus )
  892. {
  893. m_eHttpCode = eStatus;
  894. if ( m_bNeedHttpResponse )
  895. sphHttpErrorReply ( m_dData, eStatus, m_sError.cstr() );
  896. else
  897. {
  898. m_dData.Resize ( m_sError.Length() );
  899. memcpy ( m_dData.Begin(), m_sError.cstr(), m_dData.GetLength() );
  900. }
  901. }
  902. void HttpHandler_c::FormatError ( EHTTP_STATUS eStatus, const char * sError, ... )
  903. {
  904. va_list ap;
  905. va_start ( ap, sError );
  906. m_sError.SetSprintfVa ( sError, ap );
  907. va_end ( ap );
  908. m_eHttpCode = eStatus;
  909. if ( m_bNeedHttpResponse )
  910. sphHttpErrorReply ( m_dData, eStatus, m_sError.cstr() );
  911. else
  912. {
  913. int iLen = m_sError.Length();
  914. m_dData.Resize ( iLen+1 );
  915. memcpy ( m_dData.Begin(), m_sError.cstr(), iLen );
  916. m_dData[iLen] = '\0';
  917. }
  918. }
  919. void HttpHandler_c::ReportError ( const char * sError, HttpErrorType_e eType, EHTTP_STATUS eStatus, const char * sIndex )
  920. {
  921. if ( sError )
  922. m_sError = sError;
  923. m_eHttpCode = eStatus;
  924. const char * sErrorType = GetErrorTypeName ( eType );
  925. int iStatus = HttpGetStatusCodes ( eStatus );
  926. CSphString sReply = ( sErrorType ? JsonEncodeResultError ( m_sError, sErrorType, iStatus, sIndex ) : JsonEncodeResultError ( m_sError, iStatus ) );
  927. HttpBuildReplyHead ( GetResult(), eStatus, sReply.cstr(), sReply.Length(), false );
  928. }
  929. void HttpHandler_c::BuildReply ( const CSphString & sResult, EHTTP_STATUS eStatus )
  930. {
  931. m_eHttpCode = eStatus;
  932. ReplyBuf ( FromStr ( sResult ), eStatus, m_bNeedHttpResponse, m_dData );
  933. }
  934. void HttpHandler_c::BuildReply ( const char* szResult, EHTTP_STATUS eStatus )
  935. {
  936. m_eHttpCode = eStatus;
  937. ReplyBuf ( FromSz( szResult ), eStatus, m_bNeedHttpResponse, m_dData );
  938. }
  939. void HttpHandler_c::BuildReply ( const StringBuilder_c & sResult, EHTTP_STATUS eStatus )
  940. {
  941. m_eHttpCode = eStatus;
  942. ReplyBuf ( (Str_t)sResult, eStatus, m_bNeedHttpResponse, m_dData );
  943. }
  944. void HttpHandler_c::BuildReply ( Str_t sResult, EHTTP_STATUS eStatus )
  945. {
  946. m_eHttpCode = eStatus;
  947. ReplyBuf ( sResult, eStatus, m_bNeedHttpResponse, m_dData );
  948. }
  949. // check whether given served index is exist and has requested type
  950. bool HttpHandler_c::CheckValid ( const ServedIndex_c* pServed, const CSphString& sIndex, IndexType_e eType )
  951. {
  952. if ( !pServed )
  953. {
  954. FormatError ( EHTTP_STATUS::_500, "no such table '%s'", sIndex.cstr () );
  955. return false;
  956. }
  957. if ( pServed->m_eType!=eType )
  958. {
  959. FormatError ( EHTTP_STATUS::_500, "table '%s' is not %s", sIndex.cstr(), GetIndexTypeName ( eType ) );
  960. return false;
  961. }
  962. return true;
  963. }
  964. struct HttpOptionTrait_t
  965. {
  966. const OptionsHash_t & m_tOptions;
  967. explicit HttpOptionTrait_t ( const OptionsHash_t & tOptions )
  968. : m_tOptions ( tOptions )
  969. {}
  970. };
  971. class HttpSearchHandler_c : public HttpHandler_c, public HttpOptionTrait_t
  972. {
  973. public:
  974. bool Process () final
  975. {
  976. TRACE_CONN ( "conn", "HttpSearchHandler_c::Process" );
  977. CSphString sWarning;
  978. std::unique_ptr<QueryParser_i> pQueryParser = PreParseQuery();
  979. if ( !pQueryParser )
  980. return false;
  981. if ( IsBuddyQuery ( m_tOptions ) )
  982. m_tParsed.m_tQuery.m_uDebugFlags |= QUERY_DEBUG_NO_LOG;
  983. auto tHandler = CreateMsearchHandler ( std::move ( pQueryParser ), m_eQueryType, m_tParsed );
  984. SetStmt ( tHandler );
  985. QueryProfile_c tProfile;
  986. tProfile.m_eNeedPlan = (PLAN_FLAVOUR)m_tParsed.m_iPlan;
  987. tProfile.m_bNeedProfile = m_tParsed.m_bProfile;
  988. bool bNeedProfile = m_tParsed.m_bProfile || ( m_tParsed.m_iPlan != 0 );
  989. if ( bNeedProfile )
  990. tHandler.SetProfile ( &tProfile );
  991. // search
  992. tHandler.RunQueries();
  993. if ( bNeedProfile )
  994. tProfile.Stop();
  995. AggrResult_t & tRes = tHandler.m_dAggrResults.First();
  996. if ( !tRes.m_sError.IsEmpty() )
  997. {
  998. ReportError ( tRes.m_sError.cstr(), EHTTP_STATUS::_500 );
  999. return false;
  1000. }
  1001. // fixme: handle more than one warning at once?
  1002. if ( tRes.m_sWarning.IsEmpty() && !m_tParsed.m_sWarning.IsEmpty() )
  1003. tRes.m_sWarning = m_tParsed.m_sWarning;
  1004. CSphString sResult = EncodeResult ( tHandler.m_dAggrResults, bNeedProfile ? &tProfile : nullptr );
  1005. BuildReply ( sResult, EHTTP_STATUS::_200 );
  1006. return true;
  1007. }
  1008. explicit HttpSearchHandler_c ( const OptionsHash_t & tOptions )
  1009. : HttpOptionTrait_t ( tOptions )
  1010. {}
  1011. protected:
  1012. QueryType_e m_eQueryType {QUERY_SQL};
  1013. ParsedJsonQuery_t m_tParsed;
  1014. virtual std::unique_ptr<QueryParser_i> PreParseQuery() = 0;
  1015. CSphString EncodeResult ( const VecTraits_T<AggrResult_t> & dRes, QueryProfile_c * pProfile )
  1016. {
  1017. return sphEncodeResultJson ( dRes, m_tParsed.m_tQuery, pProfile, ResultSetFormat_e::MntSearch );
  1018. }
  1019. virtual void SetStmt ( SearchHandler_c & tHandler ) {};
  1020. };
  1021. static void AddAggs ( const VecTraits_T<SqlStmt_t> & dStmt, JsonQuery_c & tQuery )
  1022. {
  1023. assert ( dStmt.GetLength()>1 && dStmt[0].m_tQuery.m_bFacetHead );
  1024. tQuery.m_dAggs.Reserve ( dStmt.GetLength()-1 );
  1025. for ( int i=1; i<dStmt.GetLength(); i++ )
  1026. {
  1027. const CSphQuery & tRef = dStmt[i].m_tQuery;
  1028. assert ( tRef.m_dRefItems.GetLength() );
  1029. JsonAggr_t & tBucket = tQuery.m_dAggs.Add();
  1030. tBucket.m_sBucketName = tRef.m_dRefItems[0].m_sExpr;
  1031. tBucket.m_sCol = tRef.m_dRefItems[0].m_sAlias;
  1032. }
  1033. }
  1034. class HttpSearchHandler_SQL_c final: public HttpSearchHandler_c
  1035. {
  1036. public:
  1037. explicit HttpSearchHandler_SQL_c ( const OptionsHash_t & tOptions )
  1038. : HttpSearchHandler_c ( tOptions )
  1039. {}
  1040. protected:
  1041. CSphVector<SqlStmt_t> m_dStmt;
  1042. std::unique_ptr<QueryParser_i> PreParseQuery() final
  1043. {
  1044. const CSphString * pRawQl = m_tOptions ( "query" );
  1045. if ( !pRawQl || pRawQl->IsEmpty() )
  1046. {
  1047. ReportError ( "query missing", EHTTP_STATUS::_400 );
  1048. return nullptr;
  1049. }
  1050. if ( !sphParseSqlQuery ( FromStr ( *pRawQl ), m_dStmt, m_sError, SPH_COLLATION_DEFAULT ) )
  1051. {
  1052. ReportError ( EHTTP_STATUS::_400 );
  1053. return nullptr;
  1054. }
  1055. ( (CSphQuery &) m_tParsed.m_tQuery ) = m_dStmt[0].m_tQuery;
  1056. bool bFacet = ( m_dStmt.GetLength()>1 );
  1057. for ( const auto & tStmt : m_dStmt )
  1058. {
  1059. // should be all FACET in case of multiple queries
  1060. bFacet &= ( tStmt.m_tQuery.m_bFacet || tStmt.m_tQuery.m_bFacetHead );
  1061. if ( tStmt.m_eStmt!=STMT_SELECT )
  1062. {
  1063. ReportError ( "only SELECT queries are supported", EHTTP_STATUS::_501 );
  1064. return nullptr;
  1065. }
  1066. }
  1067. if ( m_dStmt.GetLength()>1 && !bFacet )
  1068. {
  1069. ReportError ( "only FACET multiple queries supported", EHTTP_STATUS::_501 );
  1070. return nullptr;
  1071. }
  1072. if ( bFacet )
  1073. AddAggs ( m_dStmt, m_tParsed.m_tQuery );
  1074. m_eQueryType = QUERY_SQL;
  1075. return sphCreatePlainQueryParser();
  1076. }
  1077. void SetStmt ( SearchHandler_c & tHandler )
  1078. {
  1079. tHandler.m_pStmt = &m_dStmt[0];
  1080. for ( int i=1; i<m_dStmt.GetLength(); ++i )
  1081. tHandler.SetQuery ( i, m_dStmt[i].m_tQuery, nullptr );
  1082. }
  1083. };
  1084. typedef std::pair<CSphString,MysqlColumnType_e> ColumnNameType_t;
  1085. static const char * GetMysqlTypeName ( MysqlColumnType_e eType )
  1086. {
  1087. switch ( eType )
  1088. {
  1089. case MYSQL_COL_DECIMAL: return "decimal";
  1090. case MYSQL_COL_LONG: return "long";
  1091. case MYSQL_COL_FLOAT: return "float";
  1092. case MYSQL_COL_DOUBLE: return "double";
  1093. case MYSQL_COL_LONGLONG: return "long long";
  1094. case MYSQL_COL_STRING: return "string";
  1095. default: return "unknown";
  1096. };
  1097. }
  1098. static MysqlColumnType_e GetMysqlTypeByName ( const CSphString& sType )
  1099. {
  1100. if ( sType=="decimal")
  1101. return MYSQL_COL_DECIMAL;
  1102. if ( sType == "long" )
  1103. return MYSQL_COL_LONG;
  1104. if ( sType == "float" )
  1105. return MYSQL_COL_FLOAT;
  1106. if ( sType == "double" )
  1107. return MYSQL_COL_DOUBLE;
  1108. if ( sType == "long long" )
  1109. return MYSQL_COL_LONGLONG;
  1110. if ( sType == "string" )
  1111. return MYSQL_COL_STRING;
  1112. assert (false && "Unknown column");
  1113. return MYSQL_COL_STRING;
  1114. }
  1115. JsonEscapedBuilder& operator<< ( JsonEscapedBuilder& tOut, MysqlColumnType_e eType )
  1116. {
  1117. tOut.FixupSpacedAndAppendEscaped ( GetMysqlTypeName ( eType ) );
  1118. return tOut;
  1119. }
// Outermost block layout used by JsonRowBuffer_c. Each entry pairs a literal with its length,
// presumably in the order separator (",\n"), opener ("[") and closer ("]"), so result-set objects
// come out as a bracketed list with one item per line.
const StrBlock_t dJsonObjCustom { { ",\n", 2 }, { "[", 1 }, { "]", 1 } }; // json list of objects with custom formatting
// RowBuffer_i that renders result sets as JSON text instead of the MySQL wire protocol.
// Each result set becomes one object:
//   {"columns":[{"<name>":{"type":"<type>"}},...],"data":[{"<name>":<value>,...},...],"total":N,"error":"...","warning":"..."}
// All result-set objects are wrapped into the outer block opened with dJsonObjCustom.
// Expected call order for a table: HeadBegin, HeadColumn per column, HeadEnd, then per row the Put* calls
// (one per column, in column order) followed by Commit, and finally Eof.
// Ok() or Error() produce a table-less object instead.
class JsonRowBuffer_c : public RowBuffer_i
{
public:
	JsonRowBuffer_c()
	{
		m_dBuf.StartBlock ( dJsonObjCustom );
	}
	// Each Put* emits "<next column name>":<value>; the format hints of the float/double variants are ignored.
	void PutFloatAsString ( float fVal, const char * ) override
	{
		AddDataColumn();
		m_dBuf << fVal;
	}
	void PutDoubleAsString ( double fVal, const char * ) override
	{
		AddDataColumn();
		m_dBuf << fVal;
	}
	void PutNumAsString ( int64_t iVal ) override
	{
		AddDataColumn();
		m_dBuf << iVal;
	}
	void PutNumAsString ( uint64_t uVal ) override
	{
		AddDataColumn();
		m_dBuf << uVal;
	}
	void PutNumAsString ( int iVal ) override
	{
		AddDataColumn();
		m_dBuf << iVal;
	}
	void PutNumAsString ( DWORD uVal ) override
	{
		AddDataColumn();
		m_dBuf << uVal;
	}
	// Blobs and strings are emitted as escaped JSON strings.
	void PutArray ( const ByteBlob_t& dBlob, bool ) override
	{
		AddDataColumn();
		m_dBuf.FixupSpacedAndAppendEscaped ( (const char*)dBlob.first, dBlob.second );
	}
	void PutString ( Str_t sMsg ) override
	{
		PutArray ( S2B ( sMsg ), false );
	}
	void PutMicrosec ( int64_t iUsec ) override
	{
		PutNumAsString ( iUsec );
	}
	void PutNULL() override
	{
		AddDataColumn();
		m_dBuf << "null";
	}
	// Close the current row object and open the next one. The object opened by the last Commit
	// stays empty and is closed by Eof().
	bool Commit() override
	{
		m_dBuf.FinishBlock ( false ); // finish previous item
		m_dBuf.ObjectBlock(); // start new item
		++m_iTotalRows;
		m_iCol = 0;
		return true;
	}
	// Close the trailing (empty) row object and the "data" array, append total/error/warning, then close the result-set object.
	// bMoreResults and iWarns are not used here.
	void Eof ( bool bMoreResults, int iWarns, const char* ) override
	{
		m_dBuf.FinishBlock ( true ); // last doc, allow empty
		m_dBuf.FinishBlock ( false ); // docs section
		DataFinish ( m_iTotalRows, nullptr, nullptr );
		m_dBuf.FinishBlock ( false ); // root object
	}
	// Emit a stand-alone {"total":0,"error":...,"warning":...} object and remember the error.
	// m_bError/m_sError are presumably RowBuffer_i members, read back via IsError()/GetError().
	void Error ( const char * szError, EMYSQL_ERR ) override
	{
		auto _ = m_dBuf.Object ( false );
		DataFinish ( 0, szError, nullptr );
		m_bError = true;
		m_sError = szError;
	}
	// Emit a stand-alone object with total=iAffectedRows; note that sMessage lands in the "warning" field.
	void Ok ( int iAffectedRows, int iWarns, const char * sMessage, bool bMoreResults, int64_t iLastInsertId ) override
	{
		auto _ = m_dBuf.Object ( false );
		DataFinish ( iAffectedRows, nullptr, sMessage );
	}
	// Open the result-set object and its "columns" array.
	void HeadBegin () override
	{
		m_iTotalRows = 0;
		m_dBuf.ObjectWBlock();
		m_dBuf.Named ( "columns" );
		m_dBuf.ArrayBlock();
	}
	// Close "columns", open the "data" array and the first row object.
	bool HeadEnd ( bool , int ) override
	{
		m_dBuf.FinishBlock(false);
		m_dBuf.Named ( "data" );
		m_dBuf.ArrayWBlock();
		m_dBuf.ObjectBlock();
		return true;
	}
	// Emit {"<name>":{"type":"<type>"}}. The name is escaped once here; the escaped form is stored
	// for the data rows, which is why it is later appended with escaping disabled.
	void HeadColumn ( const char * szName, MysqlColumnType_e eType ) override
	{
		JsonEscapedBuilder sEscapedName;
		sEscapedName.FixupSpacedAndAppendEscaped ( szName );
		ColumnNameType_t tCol { (CSphString)sEscapedName, eType };
		auto _ = m_dBuf.Object(false);
		m_dBuf.AppendName ( tCol.first.cstr(), false );
		auto tTypeBlock = m_dBuf.Object(false);
		m_dBuf.NamedVal ( "type", eType );
		m_dColumns.Add ( tCol );
	}
	void Add ( BYTE ) override {}
	// Close every still-open block and return the finished JSON text.
	const JsonEscapedBuilder & Finish()
	{
		m_dBuf.FinishBlocks();
		return m_dBuf;
	}
private:
	JsonEscapedBuilder m_dBuf;
	CSphVector<ColumnNameType_t> m_dColumns;
	int m_iTotalRows = 0;
	int m_iCol = 0;
	// Emit the name of the next column of the current row.
	// NOTE(review): no bounds check - a row with more values than declared columns would index past m_dColumns.
	void AddDataColumn()
	{
		m_dBuf.AppendName ( m_dColumns[m_iCol].first.cstr(), false );
		++m_iCol;
	}
	// Append the total/error/warning trailer and forget the columns, so the next result set starts fresh.
	void DataFinish ( int iTotal, const char* szError, const char* szWarning )
	{
		m_dBuf.NamedVal ( "total", iTotal );
		m_dBuf.NamedString ( "error", szError );
		m_dBuf.NamedString ( "warning", szWarning );
		m_iCol = 0;
		m_dColumns.Reset();
	}
};
/* Below is a typical answer sent back by the sql endpoint with mode=raw
  1255. [{
  1256. "columns":[{"id":{"type":"long long"}},{"proto":{"type":"string"}},{"state":{"type":"string"}},{"host":{"type":"string"}},{"connid":{"type":"long long"}},{"killed":{"type":"string"}},{"last cmd":{"type":"string"}}],
  1257. "data":[
  1258. {"id":2,"proto":"http","state":"query","host":"127.0.0.1:50787","connid":9,"killed":"0","last cmd":"select"},
  1259. {"id":1,"proto":"mysql,ssl","state":"query","host":"127.0.0.1:50514","connid":1,"killed":"0","last cmd":"show queries"}
  1260. ],
  1261. "total":2,
  1262. "error":"",
  1263. "warning":""
  1264. }]
  1265. */
  1266. void ConvertJsonDataset ( const JsonObj_c & tRoot, const char * sStmt, RowBuffer_i & tOut )
  1267. {
  1268. assert ( tRoot.IsArray() );
  1269. int iItem = 0;
  1270. int iItemsCount = tRoot.Size();
  1271. CSphString sParseError;
  1272. for ( const auto & tItem : tRoot )
  1273. {
  1274. int iTotal = 0;
  1275. CSphString sError, sWarning;
  1276. if ( !tItem.FetchIntItem ( iTotal, "total", sParseError, true ) )
  1277. {
  1278. tOut.Error ( sParseError.cstr() );
  1279. break;
  1280. }
  1281. if ( !tItem.FetchStrItem ( sError, "error", sParseError, true ) )
  1282. {
  1283. tOut.Error ( sParseError.cstr() );
  1284. break;
  1285. }
  1286. if ( !tItem.FetchStrItem ( sWarning, "warning", sParseError, true ) )
  1287. {
  1288. tOut.Error ( sParseError.cstr() );
  1289. break;
  1290. }
  1291. if ( !sError.IsEmpty() )
  1292. {
  1293. LogSphinxqlError ( sStmt, FromStr ( sError ) );
  1294. session::GetClientSession()->m_sError = sError;
  1295. session::GetClientSession()->m_tLastMeta.m_sError = sError;
  1296. tOut.Error ( sError.cstr() );
  1297. break;
  1298. }
  1299. if ( !iItem ) // only zero result set sets meta
  1300. {
  1301. session::GetClientSession()->m_tLastMeta.m_iTotalMatches = iTotal;
  1302. session::GetClientSession()->m_tLastMeta.m_sWarning = sWarning;
  1303. }
  1304. using ColType_t = std::pair<CSphString, MysqlColumnType_e>;
  1305. CSphVector<ColType_t> dSqlColumns;
  1306. assert ( tItem.IsObj() );
  1307. JsonObj_c tColumnsNode = tItem.GetArrayItem ( "columns", sParseError, true );
  1308. for ( const auto & tColumnNode : tColumnsNode )
  1309. {
  1310. assert ( tColumnNode.IsObj() ); // like {"id":{"type":"long long"}}
  1311. for ( const auto & tColumn : tColumnNode )
  1312. {
  1313. CSphString sType;
  1314. if ( !tColumn.FetchStrItem ( sType, "type", sParseError, false ) )
  1315. return;
  1316. auto eType = GetMysqlTypeByName ( sType );
  1317. dSqlColumns.Add ( { tColumn.Name(), eType } );
  1318. }
  1319. }
  1320. // fill headers
  1321. if ( !dSqlColumns.IsEmpty() )
  1322. {
  1323. tOut.HeadBegin ();
  1324. dSqlColumns.for_each ( [&] ( const auto& tColumn ) { tOut.HeadColumn ( tColumn.first.cstr(), tColumn.second ); } );
  1325. tOut.HeadEnd();
  1326. } else
  1327. {
  1328. // just simple OK reply without table
  1329. tOut.Ok ( iTotal, ( sWarning.IsEmpty() ? 0 : 1 ) );
  1330. break;
  1331. }
  1332. JsonObj_c tDataNodes = tItem.GetItem ( "data" );
  1333. for ( const auto & tDataRow : tDataNodes )
  1334. {
  1335. assert ( tDataRow.IsObj() ); // like {"id":2,"proto":"http","state":"query","host":"127.0.0.1:50787","connid":9,"killed":"0","last cmd":"select"}
  1336. for ( const auto & tDataCol : tDataRow )
  1337. {
  1338. if ( tDataCol.IsInt () )
  1339. tOut.PutNumAsString ( tDataCol.IntVal() );
  1340. else if ( tDataCol.IsDbl () )
  1341. tOut.PutDoubleAsString ( tDataCol.DblVal() );
  1342. else
  1343. tOut.PutString ( tDataCol.StrVal() );
  1344. }
  1345. if ( !tOut.Commit() )
  1346. return;
  1347. }
  1348. tOut.Eof ( iItem+1!=iItemsCount, ( sWarning.IsEmpty() ? 0 : 1 ) );
  1349. iItem++;
  1350. }
  1351. }
  1352. class HttpRawSqlHandler_c final: public HttpHandler_c, public HttpOptionTrait_t
  1353. {
  1354. Str_t m_sQuery;
  1355. public:
  1356. explicit HttpRawSqlHandler_c ( Str_t sQuery, const OptionsHash_t & tOptions )
  1357. : HttpOptionTrait_t ( tOptions )
  1358. , m_sQuery ( sQuery )
  1359. {}
  1360. bool Process () final
  1361. {
  1362. TRACE_CONN ( "conn", "HttpRawSqlHandler_c::Process" );
  1363. if ( IsEmpty ( m_sQuery ) )
  1364. {
  1365. ReportError ( "query missing", EHTTP_STATUS::_400 );
  1366. return false;
  1367. }
  1368. if ( IsBuddyQuery ( m_tOptions ) )
  1369. session::SetQueryDisableLog();
  1370. JsonRowBuffer_c tOut;
  1371. session::Execute ( m_sQuery, tOut );
  1372. if ( tOut.IsError() )
  1373. {
  1374. ReportError ( tOut.GetError().scstr(), EHTTP_STATUS::_500 );
  1375. return false;
  1376. }
  1377. BuildReply ( tOut.Finish(), EHTTP_STATUS::_200 );
  1378. return true;
  1379. }
  1380. };
  1381. class HttpHandler_JsonSearch_c : public HttpSearchHandler_c
  1382. {
  1383. Str_t m_sQuery;
  1384. public:
  1385. explicit HttpHandler_JsonSearch_c ( Str_t sQuery, const OptionsHash_t & tOptions )
  1386. : HttpSearchHandler_c ( tOptions )
  1387. , m_sQuery ( sQuery )
  1388. {}
  1389. std::unique_ptr<QueryParser_i> PreParseQuery() override
  1390. {
  1391. // TODO!!! add parsing collation from the query
  1392. m_tParsed.m_tQuery.m_eCollation = session::GetCollation();
  1393. if ( !sphParseJsonQuery ( m_sQuery, m_tParsed ) )
  1394. {
  1395. ReportError ( TlsMsg::szError(), EHTTP_STATUS::_400 );
  1396. return nullptr;
  1397. }
  1398. m_eQueryType = QUERY_JSON;
  1399. return sphCreateJsonQueryParser();
  1400. }
  1401. };
  1402. class HttpJsonTxnTraits_c
  1403. {
  1404. protected:
  1405. HttpJsonTxnTraits_c() = default;
  1406. explicit HttpJsonTxnTraits_c ( ResultSetFormat_e eFormat )
  1407. : m_eFormat ( eFormat )
  1408. {}
  1409. void ProcessBegin ( const CSphString& sIndex )
  1410. {
  1411. // for now - only local mutable indexes are suitable
  1412. {
  1413. auto pIndex = GetServed ( sIndex );
  1414. if ( !ServedDesc_t::IsMutable ( pIndex ) )
  1415. return;
  1416. }
  1417. HttpErrorReporter_c tReporter;
  1418. sphHandleMysqlBegin ( tReporter, FromStr (sIndex) );
  1419. m_iInserts = 0;
  1420. m_iUpdates = 0;
  1421. }
  1422. bool ProcessCommitRollback ( Str_t sIndex, DocID_t & tDocId, JsonObj_c & tResult, CSphString & sError ) const
  1423. {
  1424. HttpErrorReporter_c tReporter;
  1425. sphHandleMysqlCommitRollback ( tReporter, sIndex, true );
  1426. if ( tReporter.IsError() )
  1427. {
  1428. sError = tReporter.GetError();
  1429. tResult = sphEncodeInsertErrorJson ( sIndex.first, sError.cstr(), m_eFormat );
  1430. } else
  1431. {
  1432. auto iDeletes = tReporter.GetAffectedRows();
  1433. auto dLastIds = session::LastIds();
  1434. if ( !dLastIds.IsEmpty() )
  1435. tDocId = dLastIds[0];
  1436. tResult = sphEncodeTxnResultJson ( sIndex.first, tDocId, m_iInserts, iDeletes, m_iUpdates, m_eFormat );
  1437. }
  1438. return !tReporter.IsError();
  1439. }
  1440. int m_iInserts = 0;
  1441. int m_iUpdates = 0;
  1442. const ResultSetFormat_e m_eFormat = ResultSetFormat_e::MntSearch;
  1443. };
  1444. static bool ProcessInsert ( SqlStmt_t & tStmt, DocID_t & tDocId, JsonObj_c & tResult, CSphString & sError, ResultSetFormat_e eFormat )
  1445. {
  1446. HttpErrorReporter_c tReporter;
  1447. sphHandleMysqlInsert ( tReporter, tStmt );
  1448. if ( tReporter.IsError() )
  1449. {
  1450. sError = tReporter.GetError();
  1451. tResult = sphEncodeInsertErrorJson ( tStmt.m_sIndex.cstr(), sError.cstr(), eFormat );
  1452. } else
  1453. {
  1454. auto dLastIds = session::LastIds();
  1455. if ( !dLastIds.IsEmpty() )
  1456. tDocId = dLastIds[0];
  1457. tResult = sphEncodeInsertResultJson ( tStmt.m_sIndex.cstr(), tStmt.m_eStmt == STMT_REPLACE, tDocId, eFormat );
  1458. }
  1459. return !tReporter.IsError();
  1460. }
  1461. static bool ProcessDelete ( Str_t sRawRequest, const SqlStmt_t& tStmt, DocID_t tDocId, JsonObj_c & tResult, CSphString & sError, ResultSetFormat_e eFormat )
  1462. {
  1463. HttpErrorReporter_c tReporter;
  1464. sphHandleMysqlDelete ( tReporter, tStmt, std::move ( sRawRequest ) );
  1465. if ( tReporter.IsError() )
  1466. {
  1467. sError = tReporter.GetError();
  1468. tResult = sphEncodeInsertErrorJson ( tStmt.m_sIndex.cstr(), sError.cstr(), eFormat );
  1469. } else
  1470. {
  1471. tResult = sphEncodeDeleteResultJson ( tStmt.m_sIndex.cstr(), tDocId, tReporter.GetAffectedRows(), eFormat );
  1472. }
  1473. return !tReporter.IsError();
  1474. }
  1475. class HttpHandler_JsonInsert_c final : public HttpHandler_c
  1476. {
  1477. Str_t m_sQuery;
  1478. bool m_bReplace;
  1479. public:
  1480. HttpHandler_JsonInsert_c ( Str_t sQuery, bool bReplace )
  1481. : m_sQuery ( sQuery )
  1482. , m_bReplace ( bReplace )
  1483. {}
  1484. bool Process () final
  1485. {
  1486. TRACE_CONN ( "conn", "HttpHandler_JsonInsert_c::Process" );
  1487. SqlStmt_t tStmt;
  1488. DocID_t tDocId = 0;
  1489. if ( !sphParseJsonInsert ( m_sQuery, tStmt, tDocId, m_bReplace, m_sError ) )
  1490. {
  1491. ReportError ( nullptr, HttpErrorType_e::Parse, EHTTP_STATUS::_400, tStmt.m_sIndex.cstr() );
  1492. return false;
  1493. }
  1494. tStmt.m_sEndpoint = HttpEndpointToStr ( m_bReplace ? EHTTP_ENDPOINT::JSON_REPLACE : EHTTP_ENDPOINT::JSON_INSERT );
  1495. JsonObj_c tResult = JsonNull;
  1496. bool bResult = ProcessInsert ( tStmt, tDocId, tResult, m_sError, ResultSetFormat_e::MntSearch );
  1497. if ( bResult )
  1498. BuildReply ( tResult.AsString(), bResult ? EHTTP_STATUS::_200 : EHTTP_STATUS::_409 );
  1499. else
  1500. ReportError ( nullptr, HttpErrorType_e::ActionRequestValidation, EHTTP_STATUS::_409, tStmt.m_sIndex.cstr() );
  1501. return bResult;
  1502. }
  1503. };
  1504. class HttpJsonUpdateTraits_c
  1505. {
  1506. int m_iLastUpdated = 0;
  1507. protected:
  1508. HttpJsonUpdateTraits_c() = default;
  1509. explicit HttpJsonUpdateTraits_c ( ResultSetFormat_e eFormat )
  1510. : m_eFormat ( eFormat )
  1511. {}
  1512. bool ProcessUpdate ( Str_t sRawRequest, const SqlStmt_t & tStmt, DocID_t tDocId, JsonObj_c & tResult, CSphString & sError )
  1513. {
  1514. HttpErrorReporter_c tReporter;
  1515. sphHandleMysqlUpdate ( tReporter, tStmt, sRawRequest );
  1516. if ( tReporter.IsError() )
  1517. {
  1518. sError = tReporter.GetError();
  1519. tResult = sphEncodeInsertErrorJson ( tStmt.m_sIndex.cstr(), sError.cstr(), m_eFormat );
  1520. } else
  1521. {
  1522. tResult = sphEncodeUpdateResultJson ( tStmt.m_sIndex.cstr(), tDocId, tReporter.GetAffectedRows(), m_eFormat );
  1523. }
  1524. m_iLastUpdated = tReporter.GetAffectedRows();
  1525. return !tReporter.IsError();
  1526. }
  1527. int GetLastUpdated() const
  1528. {
  1529. return m_iLastUpdated;
  1530. }
  1531. const ResultSetFormat_e m_eFormat = ResultSetFormat_e::MntSearch;
  1532. };
  1533. class HttpHandler_JsonUpdate_c : public HttpHandler_c, HttpJsonUpdateTraits_c
  1534. {
  1535. protected:
  1536. Str_t m_sQuery;
  1537. public:
  1538. explicit HttpHandler_JsonUpdate_c ( Str_t sQuery )
  1539. : m_sQuery ( sQuery )
  1540. {}
  1541. bool Process () final
  1542. {
  1543. TRACE_CONN ( "conn", "HttpHandler_JsonUpdate_c::Process" );
  1544. SqlStmt_t tStmt;
  1545. tStmt.m_bJson = true;
  1546. tStmt.m_tQuery.m_eQueryType = QUERY_JSON;
  1547. tStmt.m_sEndpoint = HttpEndpointToStr ( EHTTP_ENDPOINT::JSON_UPDATE );
  1548. DocID_t tDocId = 0;
  1549. if ( !ParseQuery ( tStmt, tDocId ) )
  1550. {
  1551. ReportError ( nullptr, HttpErrorType_e::Parse, EHTTP_STATUS::_400, tStmt.m_sIndex.cstr() );
  1552. return false;
  1553. }
  1554. JsonObj_c tResult = JsonNull;
  1555. bool bResult = ProcessQuery ( tStmt, tDocId, tResult );
  1556. if ( bResult )
  1557. BuildReply ( tResult.AsString(), bResult ? EHTTP_STATUS::_200 : EHTTP_STATUS::_409 );
  1558. else
  1559. ReportError ( nullptr, HttpErrorType_e::ActionRequestValidation, EHTTP_STATUS::_409, tStmt.m_sIndex.cstr() );
  1560. return bResult;
  1561. }
  1562. protected:
  1563. virtual bool ParseQuery ( SqlStmt_t & tStmt, DocID_t & tDocId )
  1564. {
  1565. return sphParseJsonUpdate ( m_sQuery, tStmt, tDocId, m_sError );
  1566. }
  1567. virtual bool ProcessQuery ( const SqlStmt_t & tStmt, DocID_t tDocId, JsonObj_c & tResult )
  1568. {
  1569. return ProcessUpdate ( m_sQuery, tStmt, tDocId, tResult, m_sError );
  1570. }
  1571. };
  1572. class HttpHandler_JsonDelete_c final : public HttpHandler_JsonUpdate_c
  1573. {
  1574. public:
  1575. explicit HttpHandler_JsonDelete_c ( Str_t sQuery )
  1576. : HttpHandler_JsonUpdate_c ( sQuery )
  1577. {}
  1578. protected:
  1579. bool ParseQuery ( SqlStmt_t & tStmt, DocID_t & tDocId ) final
  1580. {
  1581. tStmt.m_sEndpoint = HttpEndpointToStr ( EHTTP_ENDPOINT::JSON_DELETE );
  1582. return sphParseJsonDelete ( m_sQuery, tStmt, tDocId, m_sError );
  1583. }
  1584. bool ProcessQuery ( const SqlStmt_t & tStmt, DocID_t tDocId, JsonObj_c & tResult ) final
  1585. {
  1586. return ProcessDelete ( m_sQuery, tStmt, tDocId, tResult, m_sError, ResultSetFormat_e::MntSearch );
  1587. }
  1588. };
// stream for lines - each 'ReadLine()' returns single line (lines split by \r or \n)
// Both '\r' and '\n' are separators on their own, so a "\r\n" pair yields an additional empty line.
// The separator is overwritten in place with '\0' inside the chunk obtained from m_tIn.Read().
// A returned Str_t points either into that chunk or into m_dLastChunk (a line spanning several
// chunks); NOTE(review): presumably valid only until the next ReadLine() - callers should consume it first.
class NDJsonStream_c
{
	CharStream_c & m_tIn;
	CSphVector<char> m_dLastChunk;		// accumulates a line whose end has not been seen yet
	Str_t m_sCurChunk { dEmptyStr };	// unconsumed remainder of the current input chunk
	bool m_bDone;						// set once the final (unterminated) line was returned
	int m_iJsons = 0;					// number of lines returned so far (for logging only)
public:
	explicit NDJsonStream_c ( CharStream_c& tIn )
		: m_tIn { tIn }
		, m_bDone { m_tIn.Eof() }
	{}
	inline bool Eof() const { return m_bDone;}
	// Returns the next line without its separator. Must not be called once Eof() is true.
	// When the input ends (or reports an error) the accumulated tail is returned as the last
	// line - possibly empty - and Eof() becomes true.
	Str_t ReadLine()
	{
		assert ( !m_bDone );
		while (true)
		{
			// fetch the next input chunk when the current one is used up
			if ( IsEmpty ( m_sCurChunk ) )
			{
				if ( m_tIn.Eof() || m_tIn.GetError() )
					break;
				m_sCurChunk = m_tIn.Read();
			}
			const char* szLine = m_sCurChunk.first;
			const char* pEnd = szLine + m_sCurChunk.second;
			const char* p = szLine;
			while ( p<pEnd && *p!='\r' && *p!='\n' )
				++p;
			// no separator in this chunk: stash the piece and continue with the next chunk
			if ( p==pEnd )
			{
				m_dLastChunk.Append ( szLine, p-szLine );
				m_sCurChunk = dEmptyStr;
				continue;
			}
			// terminate the line in place and keep the rest of the chunk for later calls
			*( const_cast<char*> ( p ) ) = '\0';
			++p;
			m_sCurChunk = { p, pEnd - p };
			Str_t sResult;
			if ( m_dLastChunk.IsEmpty () )
			{
				// whole line lies inside the current chunk - return it without copying
				sResult = { szLine, p - szLine - 1 };
				// that is commented out, as we better will deal with empty strings on parser level instead.
				// if ( IsEmpty ( sResult ) )
				// continue;
				++m_iJsons;
				HTTPINFO << "chunk " << m_iJsons << " '" << Data2Log ( sResult ) << "'";;
			} else
			{
				// line spans chunks: append the tail together with the '\0' just written,
				// so the assembled buffer is NUL-terminated
				m_dLastChunk.Append ( szLine, p - szLine );
				sResult = m_dLastChunk;
				--sResult.second; // exclude terminating \0
				m_dLastChunk.Resize ( 0 );
				++m_iJsons;
				HTTPINFO << "chunk last " << m_iJsons << " '" << Data2Log ( sResult ) << "'";;
			}
			return sResult;
		}
		// input exhausted: return the unterminated tail (if any) as the final line
		m_bDone = true;
		// add a '\0' past the logical end so the returned data is NUL-terminated, then drop it from the length
		m_dLastChunk.Add ( '\0' );
		m_dLastChunk.Resize ( m_dLastChunk.GetLength() - 1 );
		Str_t sResult = m_dLastChunk;
		++m_iJsons;
		HTTPINFO << "chunk termination " << m_iJsons << " '" << Data2Log ( sResult ) << "'";
		return sResult;
	}
	bool GetError() const { return m_tIn.GetError(); }
	const CSphString & GetErrorMessage() const { return m_tIn.GetErrorMessage(); }
};
  1659. static Str_t TrimHeadSpace ( Str_t tLine )
  1660. {
  1661. if ( IsEmpty ( tLine ) )
  1662. return tLine;
  1663. const char * sCur = tLine.first;
  1664. const char * sEnd = sCur + tLine.second;
  1665. while ( sCur<sEnd && sphIsSpace ( *sCur ) )
  1666. sCur++;
  1667. return Str_t { sCur, sEnd-sCur };
  1668. }
  1669. class HttpHandler_JsonBulk_c : public HttpHandler_c, public HttpJsonUpdateTraits_c, public HttpJsonTxnTraits_c
  1670. {
  1671. protected:
  1672. NDJsonStream_c m_tSource;
  1673. const OptionsHash_t& m_tOptions;
  1674. public:
  1675. HttpHandler_JsonBulk_c ( CharStream_c& tSource, const OptionsHash_t & tOptions )
  1676. : m_tSource ( tSource )
  1677. , m_tOptions ( tOptions )
  1678. {}
  1679. bool Process ()
  1680. {
  1681. TRACE_CONN ( "conn", "HttpHandler_JsonBulk_c::Process" );
  1682. if ( !CheckNDJson() )
  1683. return false;
  1684. JsonObj_c tResults ( true );
  1685. bool bResult = false;
  1686. int iCurLine = 0;
  1687. int iLastTxStartLine = 0;
  1688. auto FinishBulk = [&, this] ( EHTTP_STATUS eStatus = EHTTP_STATUS::_200 ) {
  1689. JsonObj_c tRoot;
  1690. tRoot.AddItem ( "items", tResults );
  1691. tRoot.AddInt ( "current_line", iCurLine );
  1692. tRoot.AddInt ( "skipped_lines", iCurLine - iLastTxStartLine );
  1693. tRoot.AddBool ( "errors", !bResult );
  1694. tRoot.AddStr ( "error", m_sError.IsEmpty() ? "" : m_sError );
  1695. if ( eStatus == EHTTP_STATUS::_200 && !bResult )
  1696. eStatus = EHTTP_STATUS::_500;
  1697. BuildReply ( tRoot.AsString(), eStatus );
  1698. HTTPINFO << "inserted " << iCurLine;
  1699. return bResult;
  1700. };
  1701. auto AddResult = [&tResults] ( const char* szStmt, JsonObj_c& tResult ) {
  1702. JsonObj_c tItem;
  1703. tItem.AddItem ( szStmt, tResult );
  1704. tResults.AddItem ( tItem );
  1705. };
  1706. if ( m_tSource.Eof() )
  1707. return FinishBulk();
  1708. // originally we execute txn for single index
  1709. // if there is combo, we fall back to query-by-query commits
  1710. CSphString sTxnIdx;
  1711. CSphString sStmt;
  1712. while ( !m_tSource.Eof() )
  1713. {
  1714. auto tQuery = m_tSource.ReadLine();
  1715. tQuery = TrimHeadSpace ( tQuery ); // could be a line with only whitespace chars
  1716. ++iCurLine;
  1717. DocID_t tDocId = 0;
  1718. JsonObj_c tResult = JsonNull;
  1719. if ( IsEmpty ( tQuery ) )
  1720. {
  1721. if ( session::IsInTrans() )
  1722. {
  1723. assert ( !sTxnIdx.IsEmpty() );
  1724. // empty query finishes current txn
  1725. bResult = ProcessCommitRollback ( FromStr ( sTxnIdx ), tDocId, tResult, m_sError );
  1726. AddResult ( "bulk", tResult );
  1727. if ( !bResult )
  1728. break;
  1729. sTxnIdx = "";
  1730. iLastTxStartLine = iCurLine;
  1731. }
  1732. continue;
  1733. }
  1734. bResult = false;
  1735. auto& tCrashQuery = GlobalCrashQueryGetRef();
  1736. tCrashQuery.m_dQuery = { (const BYTE*) tQuery.first, tQuery.second };
  1737. const char* szStmt = tQuery.first;
  1738. SqlStmt_t tStmt;
  1739. tStmt.m_bJson = true;
  1740. CSphString sQuery;
  1741. if ( !sphParseJsonStatement ( szStmt, tStmt, sStmt, sQuery, tDocId, m_sError ) )
  1742. {
  1743. HTTPINFO << "inserted " << iCurLine << ", error: " << m_sError;
  1744. return FinishBulk ( EHTTP_STATUS::_400 );
  1745. }
  1746. if ( sTxnIdx.IsEmpty() )
  1747. {
  1748. sTxnIdx = tStmt.m_sIndex;
  1749. ProcessBegin ( sTxnIdx );
  1750. }
  1751. else if ( session::IsInTrans() && sTxnIdx!=tStmt.m_sIndex )
  1752. {
  1753. assert ( !sTxnIdx.IsEmpty() );
  1754. // we should finish current txn, as we got another index
  1755. bResult = ProcessCommitRollback ( FromStr ( sTxnIdx ), tDocId, tResult, m_sError );
  1756. AddResult ( "bulk", tResult );
  1757. if ( !bResult )
  1758. break;
  1759. sTxnIdx = tStmt.m_sIndex;
  1760. ProcessBegin ( sTxnIdx );
  1761. iLastTxStartLine = iCurLine;
  1762. }
  1763. switch ( tStmt.m_eStmt )
  1764. {
  1765. case STMT_INSERT:
  1766. case STMT_REPLACE:
  1767. bResult = ProcessInsert ( tStmt, tDocId, tResult, m_sError, ResultSetFormat_e::MntSearch );
  1768. if ( bResult )
  1769. ++m_iInserts;
  1770. break;
  1771. case STMT_UPDATE:
  1772. tStmt.m_sEndpoint = HttpEndpointToStr ( EHTTP_ENDPOINT::JSON_UPDATE );
  1773. bResult = ProcessUpdate ( FromStr ( sQuery ), tStmt, tDocId, tResult, m_sError );
  1774. if ( bResult )
  1775. m_iUpdates += GetLastUpdated();
  1776. break;
  1777. case STMT_DELETE:
  1778. tStmt.m_sEndpoint = HttpEndpointToStr ( EHTTP_ENDPOINT::JSON_DELETE );
  1779. bResult = ProcessDelete ( FromStr ( sQuery ), tStmt, tDocId, tResult, m_sError, ResultSetFormat_e::MntSearch );
  1780. break;
  1781. default:
  1782. HTTPINFO << "inserted " << iCurLine << ", got unknown statement:" << (int)tStmt.m_eStmt;
  1783. return FinishBulk ( EHTTP_STATUS::_400 );
  1784. }
  1785. if ( !bResult || !session::IsInTrans() )
  1786. AddResult ( sStmt.cstr(), tResult );
  1787. // no further than the first error
  1788. if ( !bResult )
  1789. break;
  1790. if ( !session::IsInTrans() )
  1791. iLastTxStartLine = iCurLine;
  1792. }
  1793. if ( bResult && session::IsInTrans() )
  1794. {
  1795. assert ( !sTxnIdx.IsEmpty() );
  1796. // We're in txn - that is, nothing committed, and we should do it right now
  1797. JsonObj_c tResult;
  1798. DocID_t tDocId = 0;
  1799. bResult = ProcessCommitRollback ( FromStr ( sTxnIdx ), tDocId, tResult, m_sError );
  1800. AddResult ( "bulk", tResult );
  1801. if ( bResult )
  1802. iLastTxStartLine = iCurLine;
  1803. }
  1804. session::SetInTrans ( false );
  1805. HTTPINFO << "inserted " << iCurLine << " result: " << (int)bResult << ", error:" << m_sError;
  1806. return FinishBulk();
  1807. }
  1808. private:
  1809. bool CheckNDJson()
  1810. {
  1811. if ( !m_tOptions.Exists ( "content-type" ) )
  1812. {
  1813. ReportError ( "Content-Type must be set", HttpErrorType_e::Parse, EHTTP_STATUS::_400 );
  1814. return false;
  1815. }
  1816. auto sContentType = m_tOptions["content-type"].ToLower();
  1817. auto dParts = sphSplit ( sContentType.cstr(), ";" );
  1818. if ( dParts.IsEmpty() || dParts[0] != "application/x-ndjson" )
  1819. {
  1820. ReportError ( "Content-Type must be application/x-ndjson", HttpErrorType_e::Parse, EHTTP_STATUS::_400 );
  1821. return false;
  1822. }
  1823. return true;
  1824. }
  1825. };
  1826. class HttpHandlerPQ_c final : public HttpHandler_c, public HttpOptionTrait_t
  1827. {
  1828. Str_t m_sQuery;
  1829. public:
  1830. HttpHandlerPQ_c ( Str_t sQuery, const OptionsHash_t & tOptions )
  1831. : HttpOptionTrait_t ( tOptions )
  1832. , m_sQuery ( sQuery )
  1833. {}
  1834. bool Process () final;
  1835. private:
  1836. // FIXME!!! handle replication for InsertOrReplaceQuery and Delete
  1837. bool DoCallPQ ( const CSphString & sIndex, const JsonObj_c & tPercolate, bool bVerbose );
  1838. bool InsertOrReplaceQuery ( const CSphString& sIndex, const JsonObj_c& tJsonQuery, const JsonObj_c& tRoot, CSphString* pUID, bool bReplace );
  1839. bool ListQueries ( const CSphString & sIndex );
  1840. bool Delete ( const CSphString & sIndex, const JsonObj_c & tRoot );
  1841. };
  1842. struct BulkDoc_t
  1843. {
  1844. CSphString m_sAction;
  1845. CSphString m_sIndex;
  1846. DocID_t m_tDocid { 0 };
  1847. Str_t m_tDocLine;
  1848. };
  1849. struct BulkTnx_t
  1850. {
  1851. int m_iFrom { -1 };
  1852. int m_iCount { 0 };
  1853. };
  1854. class HttpHandlerEsBulk_c : public HttpCompatBaseHandler_c, public HttpJsonUpdateTraits_c, public HttpJsonTxnTraits_c
  1855. {
  1856. public:
  1857. HttpHandlerEsBulk_c ( Str_t sBody, int iReqType, const SmallStringHash_T<CSphString> & hOpts )
  1858. : HttpCompatBaseHandler_c ( sBody, iReqType, hOpts )
  1859. , HttpJsonUpdateTraits_c ( ResultSetFormat_e::ES )
  1860. , HttpJsonTxnTraits_c ( ResultSetFormat_e::ES )
  1861. {}
  1862. bool Process () override;
  1863. private:
  1864. bool ProcessTnx ( const VecTraits_T<BulkTnx_t> & dTnx, VecTraits_T<BulkDoc_t> & dDocs, JsonObj_c & tItems );
  1865. bool Validate();
  1866. void ReportLogError ( const char * sError, HttpErrorType_e eType , EHTTP_STATUS eStatus, bool bLogOnly );
  1867. };
  1868. static std::unique_ptr<HttpHandler_c> CreateHttpHandler ( EHTTP_ENDPOINT eEndpoint, CharStream_c & tSource, Str_t & sQuery, OptionsHash_t & tOptions, http_method eRequestType )
  1869. {
  1870. const CSphString * pOption = nullptr;
  1871. sQuery = dEmptyStr;
  1872. auto SetQuery = [&sQuery] ( Str_t&& sData ) {
  1873. auto& tCrashQuery = GlobalCrashQueryGetRef();
  1874. tCrashQuery.m_dQuery = { (const BYTE*)sData.first, sData.second };
  1875. sQuery = sData;
  1876. };
  1877. // SPH_HTTP_ENDPOINT_SQL SPH_HTTP_ENDPOINT_CLI SPH_HTTP_ENDPOINT_CLI_JSON these endpoints url-encoded, all others are plain json, and we don't want to waste time pre-parsing them
  1878. if ( eEndpoint== EHTTP_ENDPOINT::SQL || eEndpoint== EHTTP_ENDPOINT::CLI || eEndpoint== EHTTP_ENDPOINT::CLI_JSON )
  1879. {
  1880. auto sWholeData = tSource.ReadAll();
  1881. if ( tSource.GetError() )
  1882. return nullptr;
  1883. if ( eEndpoint == EHTTP_ENDPOINT::SQL )
  1884. {
  1885. const std::array<Str_t, 3> sQueries { FROMS ( "query=" ), FROMS ( "mode=raw&query=" ), FROMS ( "raw_response=true&query=" ) };
  1886. if ( std::any_of ( sQueries.cbegin(), sQueries.cend(), [&sWholeData] ( const Str_t& S ) { return sWholeData.second >= S.second && 0 == memcmp ( sWholeData.first, S.first, S.second ); } ) )
  1887. {
  1888. DecodeAndStoreRawQuery ( tOptions, sWholeData );
  1889. HttpRequestParser_c::ParseList ( sWholeData, tOptions );
  1890. } else
  1891. {
  1892. StoreRawQuery ( tOptions, { sWholeData } );
  1893. tOptions.Add ( sWholeData, "query" );
  1894. }
  1895. } else
  1896. StoreRawQuery ( tOptions, { sWholeData } );
  1897. }
  1898. switch ( eEndpoint )
  1899. {
  1900. case EHTTP_ENDPOINT::SQL:
  1901. {
  1902. bool bRawMode = false;
  1903. pOption = tOptions ( "mode" );
  1904. if ( pOption )
  1905. bRawMode = *pOption == "raw";
  1906. else
  1907. {
  1908. pOption = tOptions ( "raw_response" );
  1909. if ( pOption )
  1910. bRawMode = *pOption == "true";
  1911. }
  1912. if ( bRawMode )
  1913. {
  1914. auto pQuery = tOptions ( "query" );
  1915. if ( pQuery )
  1916. SetQuery ( FromStr ( *pQuery ) );
  1917. return std::make_unique<HttpRawSqlHandler_c> ( sQuery, tOptions ); // non-json
  1918. }
  1919. else
  1920. {
  1921. pOption = tOptions ( "raw_query" );
  1922. if ( pOption )
  1923. SetQuery ( FromStr (*pOption) );
  1924. return std::make_unique<HttpSearchHandler_SQL_c> ( tOptions ); // non-json
  1925. }
  1926. }
  1927. case EHTTP_ENDPOINT::CLI:
  1928. case EHTTP_ENDPOINT::CLI_JSON:
  1929. {
  1930. pOption = tOptions ( "raw_query" );
  1931. auto tQuery = pOption ? FromStr ( *pOption ) : dEmptyStr;
  1932. SetQuery ( std::move ( tQuery ) );
  1933. return std::make_unique<HttpRawSqlHandler_c> ( sQuery, tOptions ); // non-json
  1934. }
  1935. case EHTTP_ENDPOINT::JSON_SEARCH:
  1936. SetQuery ( tSource.ReadAll() );
  1937. return std::make_unique<HttpHandler_JsonSearch_c> ( sQuery, tOptions ); // json
  1938. case EHTTP_ENDPOINT::JSON_INDEX:
  1939. case EHTTP_ENDPOINT::JSON_CREATE:
  1940. case EHTTP_ENDPOINT::JSON_INSERT:
  1941. case EHTTP_ENDPOINT::JSON_REPLACE:
  1942. SetQuery ( tSource.ReadAll() );
  1943. if ( tSource.GetError() )
  1944. return nullptr;
  1945. else
  1946. return std::make_unique<HttpHandler_JsonInsert_c> ( sQuery, eEndpoint==EHTTP_ENDPOINT::JSON_INDEX || eEndpoint==EHTTP_ENDPOINT::JSON_REPLACE ); // json
  1947. case EHTTP_ENDPOINT::JSON_UPDATE:
  1948. SetQuery ( tSource.ReadAll() );
  1949. if ( tSource.GetError() )
  1950. return nullptr;
  1951. else
  1952. return std::make_unique<HttpHandler_JsonUpdate_c> ( sQuery ); // json
  1953. case EHTTP_ENDPOINT::JSON_DELETE:
  1954. SetQuery ( tSource.ReadAll() );
  1955. if ( tSource.GetError() )
  1956. return nullptr;
  1957. else
  1958. return std::make_unique<HttpHandler_JsonDelete_c> ( sQuery ); // json
  1959. case EHTTP_ENDPOINT::JSON_BULK:
  1960. return std::make_unique<HttpHandler_JsonBulk_c> ( tSource, tOptions ); // json
  1961. case EHTTP_ENDPOINT::PQ:
  1962. SetQuery ( tSource.ReadAll() );
  1963. if ( tSource.GetError() )
  1964. return nullptr;
  1965. else
  1966. return std::make_unique<HttpHandlerPQ_c> ( sQuery, tOptions ); // json
  1967. case EHTTP_ENDPOINT::ES_BULK:
  1968. SetQuery ( tSource.ReadAll() );
  1969. if ( tSource.GetError() )
  1970. return nullptr;
  1971. else
  1972. return std::make_unique<HttpHandlerEsBulk_c> ( sQuery, eRequestType, tOptions );
  1973. case EHTTP_ENDPOINT::TOTAL:
  1974. SetQuery ( tSource.ReadAll() );
  1975. if ( tSource.GetError() )
  1976. return nullptr;
  1977. else
  1978. return CreateCompatHandler ( sQuery, eRequestType, tOptions );
  1979. default:
  1980. break;
  1981. }
  1982. return nullptr;
  1983. }
  1984. HttpProcessResult_t ProcessHttpQuery ( CharStream_c & tSource, Str_t & sSrcQuery, OptionsHash_t & hOptions, CSphVector<BYTE> & dResult, bool bNeedHttpResponse, http_method eRequestType )
  1985. {
  1986. TRACE_CONN ( "conn", "ProcessHttpQuery" );
  1987. HttpProcessResult_t tRes;
  1988. const CSphString & sEndpoint = hOptions["endpoint"];
  1989. tRes.m_eEndpoint = StrToHttpEndpoint ( sEndpoint );
  1990. std::unique_ptr<HttpHandler_c> pHandler = CreateHttpHandler ( tRes.m_eEndpoint, tSource, sSrcQuery, hOptions, eRequestType );
  1991. if ( !pHandler )
  1992. {
  1993. if ( tRes.m_eEndpoint == EHTTP_ENDPOINT::INDEX )
  1994. {
  1995. HttpHandlerIndexPage ( dResult );
  1996. } else
  1997. {
  1998. DumpHttp ( eRequestType, sEndpoint, tSource.ReadAll() );
  1999. tRes.m_eReplyHttpCode = EHTTP_STATUS::_501;
  2000. if ( tSource.GetError() )
  2001. {
  2002. tRes.m_sError = tSource.GetErrorMessage();
  2003. if ( tRes.m_sError.Begins ( "length out of bounds" ) )
  2004. tRes.m_eReplyHttpCode = EHTTP_STATUS::_413;
  2005. } else
  2006. {
  2007. tRes.m_sError.SetSprintf ( "/%s - unsupported endpoint", sEndpoint.cstr() );
  2008. }
  2009. sphHttpErrorReply ( dResult, tRes.m_eReplyHttpCode, tRes.m_sError.cstr() );
  2010. }
  2011. return tRes;
  2012. }
  2013. // will be processed by buddy right after source data got parsed
  2014. if ( tRes.m_eEndpoint == EHTTP_ENDPOINT::CLI )
  2015. return tRes;
  2016. pHandler->SetErrorFormat ( bNeedHttpResponse );
  2017. tRes.m_bOk = pHandler->Process();
  2018. tRes.m_sError = pHandler->GetError();
  2019. tRes.m_eReplyHttpCode = pHandler->GetStatusCode();
  2020. dResult = std::move ( pHandler->GetResult() );
  2021. return tRes;
  2022. }
  2023. void sphProcessHttpQueryNoResponce ( const CSphString & sEndpoint, const CSphString & sQuery, CSphVector<BYTE> & dResult )
  2024. {
  2025. OptionsHash_t hOptions;
  2026. hOptions.Add ( sEndpoint, "endpoint" );
  2027. BlobStream_c tQuery ( sQuery );
  2028. Str_t sSrcQuery;
  2029. HttpProcessResult_t tRes = ProcessHttpQuery ( tQuery, sSrcQuery, hOptions, dResult, false, HTTP_GET );
  2030. ProcessHttpQueryBuddy ( tRes, sSrcQuery, hOptions, dResult, false, HTTP_GET );
  2031. }
  2032. static bool IsCompressed ( const OptionsHash_t & hOptions )
  2033. {
  2034. const CSphString * pEncoding = hOptions ( "content-encoding" );
  2035. if ( !pEncoding )
  2036. return false;
  2037. return ( *pEncoding=="gzip" );
  2038. }
// Handles a request whose header is already parsed. Wraps the body into a CharStream_c (chunked
// or raw with the known length, the raw one optionally gzip-decoding) and rejects unsupported gzip
// combinations with 415. Otherwise it runs ProcessHttpQuery. ProcessHttpQueryBuddy is called on
// every path, including the 415 rejections. Reply bytes are written to dResult.
bool HttpRequestParser_c::ProcessClientHttp ( AsyncNetInputBuffer_c& tIn, CSphVector<BYTE>& dResult )
{
	assert ( !m_szError );
	std::unique_ptr<CharStream_c> pSource;
	bool bCompressed = IsCompressed ( m_hOptions );
	if ( m_tParser.flags & F_CHUNKED )
	{
		// chunked transfer-coding: the stream takes over the bodies parsed so far (moved out of m_dParsedBodies).
		// Note it gets no compression flag - see the gzip+chunked rejection below.
		pSource = std::make_unique<ChunkedSocketStream_c> ( &tIn, &m_tParser, m_bBodyDone, std::move ( m_dParsedBodies ), m_iLastParsed );
	} else
	{
		// for non-chunked - need to throw out beginning of the packet (with header). Only body rest in the buffer.
		tIn.PopTail ( m_iLastParsed - ParsedBodyLength() );
		// NOTE(review): content_length is narrowed to int; if it is a 64-bit field (as in http_parser),
		// a huge Content-Length wraps here. Confirm an upper bound is enforced before this point.
		int iFullLength = ParsedBodyLength() + ( (int)m_tParser.content_length > 0 ? (int)m_tParser.content_length : 0 );
		pSource = std::make_unique<RawSocketStream_c> ( &tIn, iFullLength, bCompressed );
	}
	// with log management enabled, unknown endpoints ending in "_bulk" map to the ES bulk endpoint.
	// NOTE(review): eEndpoint is not read anywhere below in this function (ProcessHttpQuery resolves
	// the endpoint again from m_hOptions), so this remap has no effect here - confirm intent.
	EHTTP_ENDPOINT eEndpoint = StrToHttpEndpoint ( m_sEndpoint );
	if ( IsLogManagementEnabled() && eEndpoint==EHTTP_ENDPOINT::TOTAL && m_sEndpoint.Ends ( "_bulk" ) )
		eEndpoint = EHTTP_ENDPOINT::ES_BULK;
	HttpProcessResult_t tRes;
	Str_t sSrcQuery;
	if ( bCompressed && !HasGzip() )
	{
		// 14.11 Content-Encoding
		// If the content-coding of an entity in a request message is not acceptable to the origin server, the server SHOULD respond with a status code of 415 (Unsupported Media Type)
		tRes.m_eReplyHttpCode = EHTTP_STATUS::_415;
		tRes.m_bOk = false;
		tRes.m_sError = "gzip error: unpack is not supported, rebuild with zlib";
		sphHttpErrorReply ( dResult, tRes.m_eReplyHttpCode, tRes.m_sError.cstr() );
	} else if ( bCompressed && ( m_tParser.flags & F_CHUNKED ) )
	{
		// only the raw (non-chunked) stream is created with the compression flag
		tRes.m_eReplyHttpCode = EHTTP_STATUS::_415;
		tRes.m_bOk = false;
		tRes.m_sError = "can not process chunked transfer-coding along with gzip";
		sphHttpErrorReply ( dResult, tRes.m_eReplyHttpCode, tRes.m_sError.cstr() );
	} else
	{
		tRes = ProcessHttpQuery ( *pSource, sSrcQuery, m_hOptions, dResult, true, m_eType );
	}
	return ProcessHttpQueryBuddy ( tRes, sSrcQuery, m_hOptions, dResult, true, m_eType );
}
  2079. void sphHttpErrorReply ( CSphVector<BYTE> & dData, EHTTP_STATUS eCode, const char * szError )
  2080. {
  2081. JsonObj_c tErr;
  2082. tErr.AddStr ( "error", szError );
  2083. CSphString sJsonError = tErr.AsString();
  2084. HttpBuildReply ( dData, eCode, FromStr (sJsonError), false );
  2085. }
  2086. static void EncodePercolateMatchResult ( const PercolateMatchResult_t & tRes, const CSphFixedVector<int64_t> & dDocids, const CSphString & sIndex, JsonEscapedBuilder & tOut )
  2087. {
  2088. ScopedComma_c sRootBlock ( tOut, ",", "{", "}" );
  2089. // column names
  2090. tOut.Sprintf ( R"("took":%d,"timed_out":false)", ( int ) ( tRes.m_tmTotal / 1000 ));
  2091. // hits {
  2092. ScopedComma_c sHitsBlock ( tOut, ",", R"("hits":{)", "}");
  2093. tOut.Sprintf ( R"("total":%d,"max_score":1)", tRes.m_dQueryDesc.GetLength()); // FIXME!!! track and provide weight
  2094. if ( tRes.m_bVerbose )
  2095. tOut.Sprintf ( R"("early_out_queries":%d,"matched_queries":%d,"matched_docs":%d,"only_terms_queries":%d,"total_queries":%d)",
  2096. tRes.m_iEarlyOutQueries, tRes.m_iQueriesMatched, tRes.m_iDocsMatched, tRes.m_iOnlyTerms, tRes.m_iTotalQueries );
  2097. // documents
  2098. tOut.StartBlock ( ",", R"("hits":[)", "]");
  2099. int iDocOff = 0;
  2100. for ( const auto& tDesc : tRes.m_dQueryDesc )
  2101. {
  2102. ScopedComma_c sQueryComma ( tOut, ",","{"," }");
  2103. tOut.Sprintf ( R"("table":"%s","_type":"doc","_id":"%U","_score":"1")", sIndex.cstr(), tDesc.m_iQUID );
  2104. {
  2105. ScopedComma_c sBrackets ( tOut, ",", R"("_source":{)", "}");
  2106. if ( !tDesc.m_bQL )
  2107. {
  2108. tOut.Sprintf ( R"("query":%s)", tDesc.m_sQuery.cstr() );
  2109. } else
  2110. {
  2111. ScopedComma_c sBrackets ( tOut, nullptr, R"("query": {"ql":)", "}");
  2112. tOut.AppendEscapedWithComma ( tDesc.m_sQuery.cstr() );
  2113. }
  2114. if ( !tDesc.m_sTags.IsEmpty() )
  2115. tOut.Sprintf ( R"("tags":"%s")", tDesc.m_sTags.cstr() );
  2116. }
  2117. // document count + document id(s)
  2118. if ( tRes.m_bGetDocs )
  2119. {
  2120. ScopedComma_c sFields ( tOut, ",",R"("fields":{"_percolator_document_slot": [)", "] }");
  2121. int iDocs = tRes.m_dDocs[iDocOff];
  2122. for ( int iDoc = 1; iDoc<=iDocs; ++iDoc )
  2123. {
  2124. auto iRow = tRes.m_dDocs[iDocOff + iDoc];
  2125. tOut.Sprintf ("%l", DocID_t ( dDocids.IsEmpty () ? iRow : dDocids[iRow] ) );
  2126. }
  2127. iDocOff += iDocs + 1;
  2128. }
  2129. }
  2130. tOut.FinishBlock ( false ); // hits[]
  2131. // all the rest blocks (root, hits) will be auto-closed here.
  2132. }
  2133. bool HttpHandlerPQ_c::DoCallPQ ( const CSphString & sIndex, const JsonObj_c & tPercolate, bool bVerbose )
  2134. {
  2135. CSphString sWarning, sTmp;
  2136. BlobVec_t dDocs;
  2137. // single document
  2138. JsonObj_c tJsonDoc = tPercolate.GetObjItem ( "document", sTmp );
  2139. if ( tJsonDoc )
  2140. {
  2141. auto & tDoc = dDocs.Add();
  2142. if ( !bson::JsonObjToBson ( tJsonDoc, tDoc, g_bJsonAutoconvNumbers, g_bJsonKeynamesToLowercase ) )
  2143. {
  2144. ReportError ( "Bad cjson", EHTTP_STATUS::_400 );
  2145. return false;
  2146. }
  2147. }
  2148. // multiple documents
  2149. JsonObj_c tJsonDocs = tPercolate.GetArrayItem ( "documents", m_sError, true );
  2150. if ( !m_sError.IsEmpty() )
  2151. {
  2152. ReportError ( EHTTP_STATUS::_400 );
  2153. return false;
  2154. }
  2155. for ( auto i : tJsonDocs )
  2156. {
  2157. auto & tDoc = dDocs.Add();
  2158. if ( !bson::JsonObjToBson ( i, tDoc, g_bJsonAutoconvNumbers, g_bJsonKeynamesToLowercase ) )
  2159. {
  2160. ReportError ( "Bad cjson", EHTTP_STATUS::_400 );
  2161. return false;
  2162. }
  2163. }
  2164. if ( dDocs.IsEmpty() )
  2165. {
  2166. ReportError ( "no documents found", EHTTP_STATUS::_400 );
  2167. return false;
  2168. }
  2169. PercolateOptions_t tOpts;
  2170. tOpts.m_sIndex = sIndex;
  2171. tOpts.m_bGetDocs = true;
  2172. tOpts.m_bVerbose = bVerbose;
  2173. tOpts.m_bGetQuery = true;
  2174. // fixme! id alias here is 'id' or 'uid'. Process it!
  2175. CSphSessionAccum tAcc;
  2176. CPqResult tResult;
  2177. tResult.m_dResult.m_bGetFilters = false;
  2178. PercolateMatchDocuments ( dDocs, tOpts, tAcc, tResult );
  2179. JsonEscapedBuilder sRes;
  2180. EncodePercolateMatchResult ( tResult.m_dResult, tResult.m_dDocids, sIndex, sRes );
  2181. BuildReply ( sRes, EHTTP_STATUS::_200 );
  2182. return true;
  2183. }
  2184. static void EncodePercolateQueryResult ( bool bReplace, const CSphString & sIndex, int64_t iID, StringBuilder_c & tOut )
  2185. {
  2186. if ( bReplace )
  2187. tOut.Sprintf (R"({"table":"%s","type":"doc","_id":"%U","result":"updated","forced_refresh":true})", sIndex.cstr(), iID);
  2188. else
  2189. tOut.Sprintf ( R"({"table":"%s","type":"doc","_id":"%U","result":"created"})", sIndex.cstr (), iID );
  2190. }
  2191. bool HttpHandlerPQ_c::InsertOrReplaceQuery ( const CSphString & sIndex, const JsonObj_c & tJsonQuery, const JsonObj_c & tRoot, CSphString * pUID, bool bReplace )
  2192. {
  2193. CSphString sTmp, sWarning;
  2194. bool bQueryQL = true;
  2195. CSphQuery tQuery;
  2196. const char * sQuery = nullptr;
  2197. JsonObj_c tQueryQL = tJsonQuery.GetStrItem ( "ql", sTmp );
  2198. if ( tQueryQL )
  2199. sQuery = tQueryQL.SzVal();
  2200. else
  2201. {
  2202. bQueryQL = false;
  2203. if ( !ParseJsonQueryFilters ( tJsonQuery, tQuery, m_sError, sWarning ) )
  2204. {
  2205. ReportError ( EHTTP_STATUS::_400 );
  2206. return false;
  2207. }
  2208. if ( NonEmptyQuery ( tJsonQuery ) )
  2209. sQuery = tQuery.m_sQuery.cstr();
  2210. }
  2211. if ( !sQuery || *sQuery=='\0' )
  2212. {
  2213. ReportError ( "no query found", EHTTP_STATUS::_400 );
  2214. return false;
  2215. }
  2216. int64_t iID = 0;
  2217. if ( pUID && !pUID->IsEmpty() )
  2218. iID = strtoll ( pUID->cstr(), nullptr, 10 );
  2219. JsonObj_c tTagsArray = tRoot.GetArrayItem ( "tags", m_sError, true );
  2220. if ( !m_sError.IsEmpty() )
  2221. {
  2222. ReportError ( EHTTP_STATUS::_400 );
  2223. return false;
  2224. }
  2225. StringBuilder_c sTags (", ");
  2226. for ( const auto & i : tTagsArray )
  2227. sTags << i.SzVal();
  2228. JsonObj_c tFilters = tRoot.GetStrItem ( "filters", m_sError, true );
  2229. if ( !m_sError.IsEmpty() )
  2230. {
  2231. ReportError ( EHTTP_STATUS::_400 );
  2232. return false;
  2233. }
  2234. if ( tFilters && !bQueryQL && tQuery.m_dFilters.GetLength() )
  2235. {
  2236. ReportError ( "invalid combination of SphinxQL and query filter provided", EHTTP_STATUS::_501 );
  2237. return false;
  2238. }
  2239. CSphVector<CSphFilterSettings> dFilters;
  2240. CSphVector<FilterTreeItem_t> dFilterTree;
  2241. if ( tFilters )
  2242. {
  2243. auto pServed = GetServed ( sIndex );
  2244. if ( !CheckValid ( pServed, sIndex, IndexType_e::PERCOLATE ) )
  2245. return false;
  2246. RIdx_T<const PercolateIndex_i*> pIndex { pServed };
  2247. if ( !PercolateParseFilters ( tFilters.SzVal(), SPH_COLLATION_UTF8_GENERAL_CI, pIndex->GetInternalSchema (), dFilters, dFilterTree, m_sError ) )
  2248. {
  2249. ReportError ( EHTTP_STATUS::_400 );
  2250. return false;
  2251. }
  2252. } else
  2253. {
  2254. dFilters.SwapData ( tQuery.m_dFilters );
  2255. dFilterTree.SwapData ( tQuery.m_dFilterTree );
  2256. }
  2257. // scope for index lock
  2258. bool bOk = false;
  2259. {
  2260. auto pServed = GetServed ( sIndex );
  2261. if ( !CheckValid ( pServed, sIndex, IndexType_e::PERCOLATE ))
  2262. return false;
  2263. RIdx_T<PercolateIndex_i*> pIndex { pServed };
  2264. PercolateQueryArgs_t tArgs ( dFilters, dFilterTree );
  2265. tArgs.m_sQuery = sQuery;
  2266. tArgs.m_sTags = sTags.cstr();
  2267. tArgs.m_iQUID = iID;
  2268. tArgs.m_bReplace = bReplace;
  2269. tArgs.m_bQL = bQueryQL;
  2270. // add query
  2271. auto pStored = pIndex->CreateQuery ( tArgs, m_sError );
  2272. if ( pStored )
  2273. {
  2274. auto* pSession = session::GetClientSession();
  2275. auto& tAcc = pSession->m_tAcc;
  2276. auto* pAccum = tAcc.GetAcc( pIndex, m_sError );
  2277. ReplicationCommand_t * pCmd = pAccum->AddCommand ( ReplCmd_e::PQUERY_ADD, sIndex );
  2278. // refresh query's UID for reply as it might be auto-generated
  2279. iID = pStored->m_iQUID;
  2280. pCmd->m_pStored = std::move ( pStored );
  2281. bOk = HandleCmdReplicate ( *pAccum );
  2282. TlsMsg::MoveError ( m_sError );
  2283. }
  2284. }
  2285. if ( !bOk )
  2286. {
  2287. ReportError ( EHTTP_STATUS::_500 );
  2288. } else
  2289. {
  2290. StringBuilder_c sRes;
  2291. EncodePercolateQueryResult ( bReplace, sIndex, iID, sRes );
  2292. BuildReply ( sRes, EHTTP_STATUS::_200 );
  2293. }
  2294. return bOk;
  2295. }
  2296. // for now - forcibly route query as /json/search POST {"table":"<idx>"}. Later matter of deprecate/delete
  2297. bool HttpHandlerPQ_c::ListQueries ( const CSphString & sIndex )
  2298. {
  2299. StringBuilder_c sQuery;
  2300. sQuery.Sprintf(R"({"table":"%s"})", sIndex.scstr());
  2301. auto pHandler = std::make_unique<HttpHandler_JsonSearch_c> ( (Str_t)sQuery, m_tOptions ) ;
  2302. if ( !pHandler )
  2303. return false;
  2304. pHandler->SetErrorFormat (m_bNeedHttpResponse);
  2305. pHandler->Process ();
  2306. m_dData = std::move ( pHandler->GetResult ());
  2307. return true;
  2308. }
  2309. bool HttpHandlerPQ_c::Delete ( const CSphString & sIndex, const JsonObj_c & tRoot )
  2310. {
  2311. auto* pSession = session::GetClientSession();
  2312. auto& tAcc = pSession->m_tAcc;
  2313. auto* pAccum = tAcc.GetAcc ();
  2314. ReplicationCommand_t * pCmd = pAccum->AddCommand ( ReplCmd_e::PQUERY_DELETE, sIndex );
  2315. JsonObj_c tTagsArray = tRoot.GetArrayItem ( "tags", m_sError, true );
  2316. if ( !m_sError.IsEmpty() )
  2317. {
  2318. ReportError ( EHTTP_STATUS::_400 );
  2319. return false;
  2320. }
  2321. StringBuilder_c sTags ( ", " );
  2322. for ( const auto & i : tTagsArray )
  2323. sTags << i.SzVal();
  2324. JsonObj_c tUidsArray = tRoot.GetArrayItem ( "id", m_sError, true );
  2325. if ( !m_sError.IsEmpty() )
  2326. {
  2327. ReportError ( EHTTP_STATUS::_400 );
  2328. return false;
  2329. }
  2330. for ( const auto & i : tUidsArray )
  2331. pCmd->m_dDeleteQueries.Add ( i.IntVal() );
  2332. if ( !sTags.GetLength() && !pCmd->m_dDeleteQueries.GetLength() )
  2333. {
  2334. ReportError ( "no tags or id field arrays found", EHTTP_STATUS::_400 );
  2335. return false;
  2336. }
  2337. pCmd->m_sDeleteTags = sTags.cstr();
  2338. uint64_t tmStart = sphMicroTimer();
  2339. int iDeleted = 0;
  2340. bool bOk = HandleCmdReplicateDelete ( *pAccum, iDeleted );
  2341. TlsMsg::MoveError ( m_sError );
  2342. uint64_t tmTotal = sphMicroTimer() - tmStart;
  2343. if ( !bOk )
  2344. {
  2345. FormatError ( EHTTP_STATUS::_400, "%s", m_sError.cstr() );
  2346. return false;
  2347. }
  2348. StringBuilder_c tOut;
  2349. tOut.Sprintf (R"({"took":%d,"timed_out":false,"deleted":%d,"total":%d,"failures":[]})",
  2350. ( int ) ( tmTotal / 1000 ), iDeleted, iDeleted );
  2351. BuildReply ( tOut, EHTTP_STATUS::_200 );
  2352. return true;
  2353. }
  2354. bool HttpHandlerPQ_c::Process()
  2355. {
  2356. TRACE_CONN ( "conn", "HttpHandlerPQ_c::Process" );
  2357. CSphString * sEndpoint = m_tOptions ( "endpoint" );
  2358. if ( !sEndpoint || sEndpoint->IsEmpty() )
  2359. {
  2360. FormatError ( EHTTP_STATUS::_400, "invalid empty endpoint, should be pq/index_name/operation");
  2361. return false;
  2362. }
  2363. assert ( sEndpoint->Begins ( "json/pq/" ) || sEndpoint->Begins ( "pq/" ) );
  2364. const char * sEndpointMethod = sEndpoint->cstr() + sizeof("pq/") - 1;
  2365. if ( sEndpoint->Begins ( "json/pq/" ) )
  2366. sEndpointMethod = sEndpoint->cstr() + sizeof("json/pq/") - 1;
  2367. StrVec_t dPoints;
  2368. sphSplit ( dPoints, sEndpointMethod, "/" );
  2369. if ( dPoints.GetLength()<2 )
  2370. {
  2371. FormatError ( EHTTP_STATUS::_400, "invalid endpoint '%s', should be pq/index_name/operation", sEndpoint->scstr() );
  2372. return false;
  2373. }
  2374. const CSphString & sIndex = dPoints[0];
  2375. const CSphString & sOp = dPoints[1];
  2376. CSphString * pUID = nullptr;
  2377. if ( dPoints.GetLength()>2 )
  2378. pUID = dPoints.Begin() + 2;
  2379. enum class PercolateOp_e
  2380. {
  2381. UNKNOWN,
  2382. ADD,
  2383. DEL,
  2384. SEARCH
  2385. } eOp = PercolateOp_e::UNKNOWN;
  2386. if ( sOp=="_delete_by_query" )
  2387. eOp = PercolateOp_e::DEL;
  2388. else if ( sOp=="doc" )
  2389. eOp = PercolateOp_e::ADD;
  2390. else if ( sOp=="search" )
  2391. eOp = PercolateOp_e::SEARCH;
  2392. if ( IsEmpty ( m_sQuery ) )
  2393. return ListQueries ( sIndex );
  2394. const JsonObj_c tRoot ( m_sQuery );
  2395. if ( !tRoot )
  2396. {
  2397. ReportError ( "bad JSON object", EHTTP_STATUS::_400 );
  2398. return false;
  2399. }
  2400. if ( !tRoot.Size() )
  2401. return ListQueries ( sIndex );
  2402. if ( eOp==PercolateOp_e::UNKNOWN )
  2403. {
  2404. m_sError.SetSprintf ( "invalid percolate operation '%s', should be one of 'search' or 'doc' or '_delete_by_query'", sOp.cstr() );
  2405. ReportError ( EHTTP_STATUS::_400 );
  2406. return false;
  2407. }
  2408. JsonObj_c tQuery = tRoot.GetObjItem ( "query", m_sError, ( eOp==PercolateOp_e::DEL ) );
  2409. if ( !tQuery && ( eOp!=PercolateOp_e::DEL ) )
  2410. {
  2411. ReportError ( EHTTP_STATUS::_400 );
  2412. return false;
  2413. }
  2414. JsonObj_c tPerc = ( ( eOp==PercolateOp_e::SEARCH ) ? tQuery.GetObjItem ( "percolate", m_sError ) : JsonNull );
  2415. if ( ( eOp==PercolateOp_e::SEARCH ) && !tPerc )
  2416. {
  2417. ReportError ( EHTTP_STATUS::_400 );
  2418. return false;
  2419. }
  2420. bool bVerbose = false;
  2421. JsonObj_c tVerbose = tRoot.GetItem ( "verbose" );
  2422. if ( tVerbose )
  2423. {
  2424. if ( tVerbose.IsDbl() )
  2425. bVerbose = tVerbose.DblVal()!=0.0;
  2426. else if ( tVerbose.IsInt() )
  2427. bVerbose = tVerbose.IntVal()!=0;
  2428. else if ( tVerbose.IsBool() )
  2429. bVerbose = tVerbose.BoolVal();
  2430. }
  2431. if ( eOp==PercolateOp_e::SEARCH )
  2432. return DoCallPQ ( sIndex, tPerc, bVerbose );
  2433. else if ( eOp==PercolateOp_e::DEL )
  2434. return Delete ( sIndex, tRoot );
  2435. else
  2436. {
  2437. bool bRefresh = false;
  2438. CSphString * pRefresh = m_tOptions ( "refresh" );
  2439. if ( pRefresh && !pRefresh->IsEmpty() )
  2440. {
  2441. if ( *pRefresh=="0" )
  2442. bRefresh = false;
  2443. else if ( *pRefresh=="1" )
  2444. bRefresh = true;
  2445. }
  2446. return InsertOrReplaceQuery ( sIndex, tQuery, tRoot, pUID, bRefresh );
  2447. }
  2448. }
  2449. static bool ParseMetaLine ( const char * sLine, BulkDoc_t & tDoc, CSphString & sError )
  2450. {
  2451. JsonObj_c tLineMeta ( sLine );
  2452. JsonObj_c tAction = tLineMeta[0];
  2453. if ( !tAction )
  2454. {
  2455. sError = "no statement found";
  2456. return false;
  2457. }
  2458. tDoc.m_sAction = tAction.Name();
  2459. if ( !tAction.IsObj() )
  2460. {
  2461. sError.SetSprintf ( "statement %s should be an object", tDoc.m_sAction.cstr() );
  2462. return false;
  2463. }
  2464. JsonObj_c tIndex = tAction.GetStrItem ( "_index", sError );
  2465. if ( !tIndex )
  2466. return false;
  2467. tDoc.m_sIndex = tIndex.StrVal();
  2468. JsonObj_c tId = tAction.GetItem ( "_id" );
  2469. if ( tId )
  2470. {
  2471. if ( tId.IsNum() )
  2472. tDoc.m_tDocid = tId.IntVal();
  2473. else if ( tId.IsStr() )
  2474. tDoc.m_tDocid = GetDocID ( tId.SzVal() );
  2475. else if ( tId.IsNull() )
  2476. tDoc.m_tDocid = 0;
  2477. else
  2478. {
  2479. sError.SetSprintf ( "_id should be an int or string" );
  2480. return false;
  2481. }
  2482. }
  2483. return true;
  2484. }
  2485. static bool AddDocid ( SqlStmt_t & tStmt, DocID_t & tDocId, CSphString & sError )
  2486. {
  2487. int iDocidPos = tStmt.m_dInsertSchema.GetFirst ( [&] ( const CSphString & sName ) { return sName=="id"; } );
  2488. if ( iDocidPos!=-1 )
  2489. {
  2490. SqlInsert_t & tVal = tStmt.m_dInsertValues[iDocidPos];
  2491. // check and convert to int
  2492. if ( tVal.m_iType!=SqlInsert_t::CONST_INT )
  2493. {
  2494. tVal.SetValueInt ( GetDocID ( tVal.m_sVal.cstr() ), false );
  2495. tVal.m_iType = SqlInsert_t::CONST_INT;
  2496. }
  2497. DocID_t tSrcDocid = (int64_t)tVal.GetValueUint();
  2498. // can not set id at the same time via es meta and via document id property
  2499. if ( tDocId && tDocId!=tSrcDocid )
  2500. {
  2501. sError = "id has already been specified";
  2502. return false;
  2503. }
  2504. tDocId = tSrcDocid;
  2505. return true;
  2506. }
  2507. if ( !tDocId )
  2508. return true;
  2509. tStmt.m_dInsertSchema.Add ( sphGetDocidName() );
  2510. SqlInsert_t & tId = tStmt.m_dInsertValues.Add();
  2511. tId.m_iType = SqlInsert_t::CONST_INT;
  2512. tId.SetValueInt(tDocId);
  2513. tStmt.m_iSchemaSz = tStmt.m_dInsertSchema.GetLength();
  2514. return true;
  2515. }
  2516. static bool ParseSourceLine ( const char * sLine, const CSphString & sAction, SqlStmt_t & tStmt, DocID_t & tDocId, CSphString & sError )
  2517. {
  2518. // FIXME!!! update and delete ES compat endpoints
  2519. if ( sAction=="index" )
  2520. {
  2521. JsonObj_c tRoot ( sLine );
  2522. if ( !ParseJsonInsertSource ( tRoot, tStmt, true, sError ) )
  2523. return false;
  2524. if ( !AddDocid ( tStmt, tDocId, sError ) )
  2525. return false;
  2526. } else if ( sAction=="create" )
  2527. {
  2528. JsonObj_c tRoot ( sLine );
  2529. if ( !ParseJsonInsertSource ( tRoot, tStmt, false, sError ) )
  2530. return false;
  2531. if ( !AddDocid ( tStmt, tDocId, sError ) )
  2532. return false;
  2533. } else if ( sAction=="update" )
  2534. {
  2535. JsonObj_c tUpd ( FromSz ( sLine ) );
  2536. tUpd.AddStr ( "table", tStmt.m_sIndex );
  2537. tUpd.AddInt ( "id", tDocId );
  2538. if ( !ParseJsonUpdate ( tUpd, tStmt, tDocId, sError ) )
  2539. return false;
  2540. } else if ( sAction=="delete" )
  2541. {
  2542. tStmt.m_eStmt = STMT_DELETE;
  2543. tStmt.m_tQuery.m_sSelect = "id";
  2544. CSphFilterSettings & tFilter = tStmt.m_tQuery.m_dFilters.Add();
  2545. tFilter.m_eType = SPH_FILTER_VALUES;
  2546. tFilter.m_dValues.Add ( tDocId );
  2547. tFilter.m_sAttrName = "id";
  2548. } else
  2549. {
  2550. sError.SetSprintf ( "unknown action: %s", sAction.cstr() );
  2551. return false;
  2552. }
  2553. // _bulk could have cluster:index format
  2554. SqlParser_SplitClusterIndex ( tStmt.m_sIndex, &tStmt.m_sCluster );
  2555. return true;
  2556. }
  2557. char * SkipSpace ( char * p )
  2558. {
  2559. while ( sphIsSpace ( *p ) )
  2560. p++;
  2561. return p;
  2562. }
  2563. static CSphString sphHttpEndpointToStr ( EHTTP_ENDPOINT eEndpoint )
  2564. {
  2565. assert ( eEndpoint < EHTTP_ENDPOINT::TOTAL );
  2566. return g_dEndpoints[(int)eEndpoint].m_szName1;
  2567. }
  2568. bool Ends ( const Str_t tVal, const char * sSuffix )
  2569. {
  2570. if ( IsEmpty ( tVal ) || !sSuffix )
  2571. return false;
  2572. auto iSuffix = (int) strlen ( sSuffix );
  2573. if ( tVal.second<iSuffix )
  2574. return false;
  2575. return strncmp ( tVal.first + tVal.second - iSuffix, sSuffix, iSuffix )==0;
  2576. }
  2577. void HttpHandlerEsBulk_c::ReportLogError ( const char * sError, HttpErrorType_e eType, EHTTP_STATUS eStatus, bool bLogOnly )
  2578. {
  2579. if ( !bLogOnly )
  2580. ReportError ( sError, eType, eStatus );
  2581. for ( char * sCur = (char *)GetBody().first; sCur<GetBody().first+GetBody().second; sCur++ )
  2582. {
  2583. if ( *sCur=='\0' )
  2584. *sCur = '\n';
  2585. }
  2586. const CSphString * pUrl = GetOptions() ( "full_url" );
  2587. HTTPINFO << sError << "\n" << ( pUrl ? pUrl->scstr() : "" ) << "\n" << GetBody().first;
  2588. }
  2589. bool HttpHandlerEsBulk_c::Validate()
  2590. {
  2591. CSphString sError;
  2592. CSphString * pOptContentType = GetOptions() ( "content-type" );
  2593. if ( !pOptContentType )
  2594. {
  2595. ReportLogError ( "Content-Type must be set", HttpErrorType_e::IllegalArgument, EHTTP_STATUS::_400, false );
  2596. return false;
  2597. }
  2598. // HTTP field could have multiple values
  2599. StrVec_t dOptContentType = sphSplit ( pOptContentType->cstr(), ",; " );
  2600. if ( !dOptContentType.Contains ( "application/x-ndjson" ) && !dOptContentType.Contains ( "application/json" ) )
  2601. {
  2602. sError.SetSprintf ( "Content-Type header [%s] is not supported", pOptContentType->cstr() );
  2603. ReportLogError ( sError.cstr(), HttpErrorType_e::IllegalArgument, EHTTP_STATUS::_400, false );
  2604. return false;
  2605. }
  2606. if ( IsEmpty ( GetBody() ) )
  2607. {
  2608. ReportLogError ( "request body is required", HttpErrorType_e::Parse, EHTTP_STATUS::_400, false );
  2609. return false;
  2610. }
  2611. if ( !Ends ( GetBody(), "\n" ) )
  2612. {
  2613. ReportLogError ( "The bulk request must be terminated by a newline [\n]", HttpErrorType_e::IllegalArgument, EHTTP_STATUS::_400, false );
  2614. return false;
  2615. }
  2616. return true;
  2617. }
  2618. bool HttpHandlerEsBulk_c::Process()
  2619. {
  2620. if ( !Validate() )
  2621. return false;
  2622. auto & tCrashQuery = GlobalCrashQueryGetRef();
  2623. tCrashQuery.m_dQuery = S2B ( GetBody() );
  2624. CSphVector<Str_t> dLines;
  2625. SplitNdJson ( GetBody(), [&] ( const char * sLine, int iLen ) { dLines.Add ( Str_t ( sLine, iLen ) ); } );
  2626. CSphString sError;
  2627. CSphVector<BulkDoc_t> dDocs;
  2628. dDocs.Reserve ( dLines.GetLength() / 2 );
  2629. bool bNextLineMeta = true;
  2630. for ( const Str_t & tLine : dLines )
  2631. {
  2632. if ( !bNextLineMeta )
  2633. {
  2634. dDocs.Last().m_tDocLine = tLine;
  2635. bNextLineMeta = true;
  2636. } else
  2637. {
  2638. // skip empty lines if they are meta information
  2639. if ( IsEmpty ( tLine ) )
  2640. continue;
  2641. // any bad meta result in general error
  2642. BulkDoc_t & tDoc = dDocs.Add();
  2643. if ( !ParseMetaLine ( tLine.first, tDoc, sError ) )
  2644. {
  2645. ReportLogError ( sError.cstr(), HttpErrorType_e::ActionRequestValidation, EHTTP_STATUS::_400, false );
  2646. return false;
  2647. }
  2648. if ( tDoc.m_sAction=="delete" )
  2649. {
  2650. tDoc.m_tDocLine = tLine;
  2651. bNextLineMeta = true;
  2652. } else
  2653. {
  2654. bNextLineMeta = false;
  2655. }
  2656. }
  2657. }
  2658. CSphVector<BulkTnx_t> dTnx;
  2659. const BulkDoc_t * pLastDoc = dDocs.Begin();
  2660. for ( const BulkDoc_t * pCurDoc = pLastDoc + 1; pCurDoc<dDocs.End(); pCurDoc++ )
  2661. {
  2662. // chain the same statements to the same index but not the updates
  2663. if ( pLastDoc->m_sIndex==pCurDoc->m_sIndex && pLastDoc->m_sAction==pCurDoc->m_sAction && pCurDoc->m_sAction!="update" )
  2664. continue;
  2665. BulkTnx_t & tTnx = dTnx.Add();
  2666. tTnx.m_iFrom = pLastDoc - dDocs.Begin();
  2667. tTnx.m_iCount = pCurDoc - pLastDoc;
  2668. pLastDoc = pCurDoc;
  2669. }
  2670. if ( pLastDoc )
  2671. {
  2672. BulkTnx_t & tTnx = dTnx.Add();
  2673. tTnx.m_iFrom = pLastDoc - dDocs.Begin();
  2674. tTnx.m_iCount = dDocs.GetLength() - tTnx.m_iFrom;
  2675. }
  2676. JsonObj_c tItems ( true );
  2677. bool bOk = ProcessTnx ( dTnx, dDocs, tItems );
  2678. JsonObj_c tRoot;
  2679. tRoot.AddItem ( "items", tItems );
  2680. tRoot.AddBool ( "errors", !bOk );
  2681. tRoot.AddInt ( "took", 1 ); // FIXME!!! add delta
  2682. BuildReply ( tRoot.AsString(), ( bOk ? EHTTP_STATUS::_200 : EHTTP_STATUS::_409 ) );
  2683. if ( !bOk )
  2684. ReportLogError ( "failed to commit", HttpErrorType_e::Unknown, EHTTP_STATUS::_400, true );
  2685. return bOk;
  2686. }
  2687. static void AddEsReply ( const BulkDoc_t & tDoc, JsonObj_c & tRoot )
  2688. {
  2689. const JsonObj_c tRefShards ( "{ \"total\": 1, \"successful\": 1, \"failed\": 0 }" );
  2690. char sBuf[70];
  2691. snprintf ( sBuf, sizeof(sBuf), UINT64_FMT, (uint64_t)tDoc.m_tDocid );
  2692. const char * sActionRes = "created";
  2693. if ( tDoc.m_sAction=="delete" )
  2694. sActionRes = "deleted";
  2695. else if ( tDoc.m_sAction=="update" )
  2696. sActionRes = "updated";
  2697. JsonObj_c tShard ( tRefShards.Clone() );
  2698. JsonObj_c tRes;
  2699. tRes.AddStr ( "_index", tDoc.m_sIndex.cstr() );
  2700. tRes.AddStr ( "_type", "doc" );
  2701. tRes.AddStr ( "_id", sBuf );
  2702. tRes.AddInt ( "_version", 1 );
  2703. tRes.AddStr ( "result", sActionRes );
  2704. tRes.AddItem ( "_shards", tShard );
  2705. tRes.AddInt ( "_seq_no", 0 );
  2706. tRes.AddInt ( "_primary_term", 1 );
  2707. tRes.AddInt ( "status", 201 );
  2708. JsonObj_c tAction;
  2709. tAction.AddItem ( tDoc.m_sAction.cstr(), tRes );
  2710. tRoot.AddItem ( tAction );
  2711. }
  2712. static void AddEsError ( int iReply, const CSphString & sError, const char * sErrorType, const BulkDoc_t & tDoc, JsonObj_c & tRoot )
  2713. {
  2714. char sBuf[70];
  2715. snprintf ( sBuf, sizeof(sBuf), UINT64_FMT, (uint64_t)tDoc.m_tDocid );
  2716. JsonObj_c tErrorObj;
  2717. tErrorObj.AddStr ( "type", sErrorType );
  2718. tErrorObj.AddStr ( "reason", sError.cstr() );
  2719. JsonObj_c tRes;
  2720. tRes.AddStr ( "_index", tDoc.m_sIndex.cstr() );
  2721. tRes.AddStr ( "_type", "doc" );
  2722. tRes.AddStr ( "_id", sBuf );
  2723. tRes.AddInt ( "status", 400 );
  2724. tRes.AddItem ( "error", tErrorObj );
  2725. JsonObj_c tAction;
  2726. tAction.AddItem ( tDoc.m_sAction.cstr(), tRes );
  2727. if ( iReply!=-1 )
  2728. tRoot.ReplaceItem ( iReply, tAction );
  2729. else
  2730. tRoot.AddItem ( tAction );
  2731. }
  2732. bool HttpHandlerEsBulk_c::ProcessTnx ( const VecTraits_T<BulkTnx_t> & dTnx, VecTraits_T<BulkDoc_t> & dDocs, JsonObj_c & tItems )
  2733. {
  2734. bool bOk = true;
  2735. CSphVector<std::pair<int, CSphString>> dErrors;
  2736. for ( const BulkTnx_t & tTnx : dTnx )
  2737. {
  2738. const CSphString & sIdx = dDocs[tTnx.m_iFrom].m_sIndex;
  2739. assert ( !sIdx.IsEmpty() );
  2740. ProcessBegin ( sIdx );
  2741. bool bUpdate = false;
  2742. bOk &= dErrors.IsEmpty();
  2743. dErrors.Resize ( 0 );
  2744. for ( int i = 0; i<tTnx.m_iCount; i++ )
  2745. {
  2746. int iDoc = tTnx.m_iFrom + i;
  2747. BulkDoc_t & tDoc = dDocs[iDoc];
  2748. if ( IsEmpty ( tDoc.m_tDocLine ) )
  2749. {
  2750. dErrors.Add ( { iDoc, "failed to parse, document is empty" } );
  2751. continue;
  2752. }
  2753. SqlStmt_t tStmt;
  2754. tStmt.m_tQuery.m_sIndexes = tDoc.m_sIndex;
  2755. tStmt.m_sIndex = tDoc.m_sIndex;
  2756. tStmt.m_sStmt = tDoc.m_tDocLine.first;
  2757. bool bParsed = ParseSourceLine ( tDoc.m_tDocLine.first, tDoc.m_sAction, tStmt, tDoc.m_tDocid, m_sError );
  2758. if ( !bParsed )
  2759. {
  2760. dErrors.Add ( { iDoc, m_sError } );
  2761. continue;
  2762. }
  2763. bool bAction = false;
  2764. JsonObj_c tResult = JsonNull;
  2765. switch ( tStmt.m_eStmt )
  2766. {
  2767. case STMT_INSERT:
  2768. case STMT_REPLACE:
  2769. bAction = ProcessInsert ( tStmt, tDoc.m_tDocid, tResult, m_sError, ResultSetFormat_e::ES );
  2770. break;
  2771. case STMT_UPDATE:
  2772. tStmt.m_sEndpoint = sphHttpEndpointToStr ( EHTTP_ENDPOINT::JSON_UPDATE );
  2773. bAction = ProcessUpdate ( tDoc.m_tDocLine, tStmt, tDoc.m_tDocid, tResult, m_sError );
  2774. bUpdate = true;
  2775. break;
  2776. case STMT_DELETE:
  2777. tStmt.m_sEndpoint = sphHttpEndpointToStr ( EHTTP_ENDPOINT::JSON_DELETE );
  2778. bAction = ProcessDelete ( tDoc.m_tDocLine, tStmt, tDoc.m_tDocid, tResult, m_sError, ResultSetFormat_e::ES );
  2779. break;
  2780. default:
  2781. sphWarning ( "unknown statement \"%s\":%s", tStmt.m_sStmt, tDoc.m_tDocLine.first );
  2782. break; // ignore statement as ES does
  2783. }
  2784. if ( !bAction )
  2785. {
  2786. JsonObj_c tError = JsonNull;
  2787. JsonObj_c tErrorType = JsonNull;
  2788. if ( !tResult.Empty() && tResult.HasItem( "error" ) )
  2789. tError = tResult.GetItem ( "error" );
  2790. if ( !tError.Empty() && tError.HasItem ( "type" ))
  2791. tErrorType = tError.GetItem ( "type" );
  2792. if ( !tErrorType.Empty() )
  2793. dErrors.Add ( { iDoc, tErrorType.StrVal() } );
  2794. else
  2795. {
  2796. dErrors.Add ( { iDoc, CSphString() } );
  2797. dErrors.Last().second.SetSprintf ( "unknown statement \"%s\":%s", tStmt.m_sStmt, tDoc.m_tDocLine.first );
  2798. }
  2799. }
  2800. }
  2801. // FIXME!!! check commit of empty accum
  2802. JsonObj_c tResult;
  2803. DocID_t tDocId = 0;
  2804. bool bCommited = ProcessCommitRollback ( FromStr ( sIdx ), tDocId, tResult, m_sError );
  2805. if ( bCommited )
  2806. {
  2807. if ( bUpdate && !GetLastUpdated() )
  2808. {
  2809. assert ( tTnx.m_iCount==1 );
  2810. const BulkDoc_t & tUpdDoc = dDocs[tTnx.m_iFrom];
  2811. CSphString sUpdError;
  2812. sUpdError.SetSprintf ( "[_doc][" INT64_FMT "]: document missing", tUpdDoc.m_tDocid );
  2813. AddEsError ( -1, sUpdError, "document_missing_exception", tUpdDoc, tItems );
  2814. } else
  2815. {
  2816. for ( int i=0; i<tTnx.m_iCount; i++ )
  2817. AddEsReply ( dDocs[tTnx.m_iFrom+i], tItems );
  2818. }
  2819. } else
  2820. {
  2821. for ( int i=0; i<tTnx.m_iCount; i++ )
  2822. {
  2823. AddEsError ( -1, tResult.GetStrItem ( "error", m_sError, false ).StrVal(), "mapper_parsing_exception", dDocs[tTnx.m_iFrom+i], tItems );
  2824. }
  2825. }
  2826. for ( const auto & tErr : dErrors )
  2827. AddEsError ( tErr.first, tErr.second, "mapper_parsing_exception", dDocs[tErr.first], tItems );
  2828. }
  2829. bOk &= dErrors.IsEmpty();
  2830. session::SetInTrans ( false );
  2831. return bOk;
  2832. }
  2833. void SplitNdJson ( Str_t sBody, SplitAction_fn && fnAction )
  2834. {
  2835. const char * sBodyEnd = sBody.first + sBody.second;
  2836. while ( sBody.first<sBodyEnd )
  2837. {
  2838. const char * sNext = sBody.first;
  2839. // break on CR or LF
  2840. while ( sNext<sBodyEnd && *sNext != '\r' && *sNext != '\n' )
  2841. sNext++;
  2842. if ( sNext==sBodyEnd )
  2843. break;
  2844. *(const_cast<char*>(sNext)) = '\0';
  2845. fnAction ( sBody.first, sNext-sBody.first );
  2846. sBody.first = sNext + 1;
  2847. // skip new lines
  2848. while ( sBody.first<sBodyEnd && *sBody.first == '\n' )
  2849. sBody.first++;
  2850. }
  2851. }
  2852. bool HttpSetLogVerbosity ( const CSphString & sVal )
  2853. {
  2854. if ( !sVal.Begins( "http_" ) )
  2855. return false;
  2856. bool bOn = ( sVal.Ends ( "_1" ) || sVal.Ends ( "_on" ) );
  2857. if ( sVal.Begins ( "http_bad_req" ) )
  2858. g_bLogBadHttpReq = bOn;
  2859. else
  2860. LOG_LEVEL_HTTP = bOn;
  2861. return true;
  2862. }
  2863. void LogReplyStatus100()
  2864. {
  2865. HTTPINFO << "100 Continue sent";
  2866. }
  2867. const char * GetErrorTypeName ( HttpErrorType_e eType )
  2868. {
  2869. switch ( eType )
  2870. {
  2871. case HttpErrorType_e::Parse: return "parse_exception";
  2872. case HttpErrorType_e::IllegalArgument: return "illegal_argument_exception";
  2873. case HttpErrorType_e::ActionRequestValidation: return "action_request_validation_exception";
  2874. case HttpErrorType_e::IndexNotFound: return "index_not_found_exception";
  2875. case HttpErrorType_e::ContentParse: return "x_content_parse_exception";
  2876. case HttpErrorType_e::VersionConflictEngine: return "version_conflict_engine_exception";
  2877. case HttpErrorType_e::DocumentMissing: return "document_missing_exception";
  2878. case HttpErrorType_e::ResourceAlreadyExists: return "resource_already_exists_exception";
  2879. case HttpErrorType_e::AliasesNotFound: return "aliases_not_found_exception";
  2880. default:
  2881. return nullptr;;
  2882. }
  2883. }