| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
77727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763
277327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349335033513352335333543355335633573358335933603361336233633364336533663367336833693370337133723373337433753376337733783379338033813382338333843385338633873388338933903391339233933394339533963397339833993400340134023403340434053406340734083409341034113412341334143415341634173418341934203421 |
- //
- // Copyright (c) 2017-2025, Manticore Software LTD (https://manticoresearch.com)
- // Copyright (c) 2001-2016, Andrew Aksyonoff
- // Copyright (c) 2008-2016, Sphinx Technologies Inc
- // All rights reserved
- //
- // This program is free software; you can redistribute it and/or modify
- // it under the terms of the GNU General Public License. You should have
- // received a copy of the GPL license along with this program; if you
- // did not, you can find it at http://www.gnu.org/
- //
- #include "searchdhttp.h"
- #include "jsonqueryfilter.h"
- #include "attribute.h"
- #include "sphinxpq.h"
- #include "http/http_parser.h"
- #include "searchdaemon.h"
- #include "searchdha.h"
- #include "searchdreplication.h"
- #include "accumulator.h"
- #include "networking_daemon.h"
- #include "client_session.h"
- #include "tracer.h"
- #include "searchdbuddy.h"
- #include "compressed_http.h"
- #include "daemon/logger.h"
- #include "daemon/search_handler.h"
- #include "sphinxquery/xqparser.h"
- static bool g_bLogBadHttpReq = val_from_env ( "MANTICORE_LOG_HTTP_BAD_REQ", false ); // log content of bad http requests, ruled by this env variable
- static int g_iLogHttpData = val_from_env ( "MANTICORE_LOG_HTTP_DATA", 0 ); // verbose logging of http data, ruled by this env variable
- static bool LOG_LEVEL_HTTP = val_from_env ( "MANTICORE_LOG_HTTP", false ); // verbose logging http processing events, ruled by this env variable
- #define LOG_COMPONENT_HTTP ""
- #define HTTPINFO LOGMSG ( VERBOSE_DEBUG, HTTP, HTTP )
- extern CSphString g_sStatusVersion;
- static const Str_t g_sDataDisabled = FROMS("-");
- Str_t Data2Log ( Str_t tMsg ) { return ( g_iLogHttpData ? Str_t ( tMsg.first, Min ( tMsg.second, g_iLogHttpData ) ) : g_sDataDisabled ); }
- Str_t Data2Log ( ByteBlob_t tMsg ) { return ( g_iLogHttpData ? Str_t ( (const char *)tMsg.first, Min ( tMsg.second, g_iLogHttpData ) ) : g_sDataDisabled ); }
- int HttpGetStatusCodes ( EHTTP_STATUS eStatus ) noexcept
- {
- switch ( eStatus )
- {
- case EHTTP_STATUS::_100: return 100;
- case EHTTP_STATUS::_200: return 200;
- case EHTTP_STATUS::_206: return 206;
- case EHTTP_STATUS::_400: return 400;
- case EHTTP_STATUS::_403: return 403;
- case EHTTP_STATUS::_404: return 404;
- case EHTTP_STATUS::_405: return 405;
- case EHTTP_STATUS::_409: return 409;
- case EHTTP_STATUS::_413: return 413;
- case EHTTP_STATUS::_415: return 415;
- case EHTTP_STATUS::_500: return 500;
- case EHTTP_STATUS::_501: return 501;
- case EHTTP_STATUS::_503: return 503;
- case EHTTP_STATUS::_526: return 526;
- default: return 503;
- };
- };
- EHTTP_STATUS HttpGetStatusCodes ( int iStatus ) noexcept
- {
- switch ( iStatus )
- {
- case 100: return EHTTP_STATUS::_100;
- case 200: return EHTTP_STATUS::_200;
- case 206: return EHTTP_STATUS::_206;
- case 400: return EHTTP_STATUS::_400;
- case 403: return EHTTP_STATUS::_403;
- case 404: return EHTTP_STATUS::_404;
- case 405: return EHTTP_STATUS::_405;
- case 409: return EHTTP_STATUS::_409;
- case 413: return EHTTP_STATUS::_413;
- case 415: return EHTTP_STATUS::_415;
- case 500: return EHTTP_STATUS::_500;
- case 501: return EHTTP_STATUS::_501;
- case 503: return EHTTP_STATUS::_503;
- case 526: return EHTTP_STATUS::_526;
- default: return EHTTP_STATUS::_503;
- };
- }
- inline constexpr const char* HttpGetStatusName ( EHTTP_STATUS eStatus ) noexcept
- {
- switch ( eStatus )
- {
- case EHTTP_STATUS::_100: return "100 Continue";
- case EHTTP_STATUS::_200: return "200 OK";
- case EHTTP_STATUS::_206: return "206 Partial Content";
- case EHTTP_STATUS::_400: return "400 Bad Request";
- case EHTTP_STATUS::_403: return "403 Forbidden";
- case EHTTP_STATUS::_404: return "404 Not Found";
- case EHTTP_STATUS::_405: return "405 Method Not Allowed";
- case EHTTP_STATUS::_409: return "409 Conflict";
- case EHTTP_STATUS::_413: return "413 Request Entity Too Large";
- case EHTTP_STATUS::_415: return "415 Unsupported Media Type";
- case EHTTP_STATUS::_500: return "500 Internal Server Error";
- case EHTTP_STATUS::_501: return "501 Not Implemented";
- case EHTTP_STATUS::_503: return "503 Service Unavailable";
- case EHTTP_STATUS::_526: return "526 Invalid SSL Certificate";
- default: return "503 Service Unavailable";
- };
- }
- extern CSphString g_sMySQLVersion;
- static void HttpBuildReply ( CSphVector<BYTE> & dData, EHTTP_STATUS eCode, const char * sBody, int iBodyLen, bool bHtml, bool bHeadReply )
- {
- assert ( sBody && iBodyLen );
- const char * sContent = ( bHtml ? "text/html" : "application/json" );
- CSphString sHttp;
- sHttp.SetSprintf ( "HTTP/1.1 %s\r\nServer: %s\r\nContent-Type: %s; charset=UTF-8\r\nContent-Length:%d\r\n\r\n", HttpGetStatusName(eCode), g_sMySQLVersion.cstr(), sContent, iBodyLen );
- int iHeaderLen = sHttp.Length();
- int iBufLen = iHeaderLen;
- if ( !bHeadReply )
- iBufLen += iBodyLen;
- dData.Resize ( iBufLen );
- memcpy ( dData.Begin(), sHttp.cstr(), iHeaderLen );
- if ( !bHeadReply )
- memcpy ( dData.Begin() + iHeaderLen, sBody, iBodyLen );
- }
- void HttpBuildReply ( CSphVector<BYTE> & dData, EHTTP_STATUS eCode, const char * sBody, int iBodyLen, bool bHtml )
- {
- HttpBuildReply ( dData, eCode, sBody, iBodyLen, bHtml, false );
- }
- void HttpBuildReplyHead ( CSphVector<BYTE> & dData, EHTTP_STATUS eCode, const char * sBody, int iBodyLen, bool bHeadReply )
- {
- HttpBuildReply ( dData, eCode, sBody, iBodyLen, false, bHeadReply );
- }
- void HttpErrorReply ( CSphVector<BYTE> & dData, EHTTP_STATUS eCode, const char * szError )
- {
- JsonObj_c tErr;
- tErr.AddStr ( "error", szError );
- CSphString sJsonError = tErr.AsString();
- HttpBuildReply ( dData, eCode, sJsonError.cstr(), sJsonError.Length(), false );
- }
/// Names under which a single http endpoint is reachable.
struct Endpoint_t
{
	const char* m_szName1;	///< primary name; always set in g_dEndpoints
	const char* m_szName2;	///< alternative name, or nullptr when there is none
};
- static Endpoint_t g_dEndpoints[(size_t)EHTTP_ENDPOINT::TOTAL] =
- {
- { "index.html", nullptr },
- { "sql", nullptr },
- { "search", "json/search" },
- { "index", "json/index" },
- { "create", "json/create" },
- { "insert", "json/insert" },
- { "replace", "json/replace" },
- { "update", "json/update" },
- { "delete", "json/delete" },
- { "bulk", "json/bulk" },
- { "pq", "json/pq" },
- { "cli", nullptr },
- { "cli_json", nullptr },
- { "_bulk", nullptr }
- };
- EHTTP_ENDPOINT StrToHttpEndpoint ( const CSphString& sEndpoint ) noexcept
- {
- if ( sEndpoint.Begins ( g_dEndpoints[(int)EHTTP_ENDPOINT::PQ].m_szName1 ) || sEndpoint.Begins ( g_dEndpoints[(int)EHTTP_ENDPOINT::PQ].m_szName2 ) )
- return EHTTP_ENDPOINT::PQ;
- for ( int i = 0; i < (int)EHTTP_ENDPOINT::TOTAL; ++i )
- if ( sEndpoint == g_dEndpoints[i].m_szName1 || ( g_dEndpoints[i].m_szName2 && sEndpoint == g_dEndpoints[i].m_szName2 ) )
- return EHTTP_ENDPOINT ( i );
- return EHTTP_ENDPOINT::TOTAL;
- }
- ///////////////////////////////////////////////////////////////////////
- /// Stream reader
- bool CharStream_c::GetError() const
- {
- return ( ( m_pIn && m_pIn->GetError() ) || !m_sError.IsEmpty() );
- }
- const CSphString & CharStream_c::GetErrorMessage() const
- {
- if ( m_pIn && m_pIn->GetError() )
- return m_pIn->GetErrorMessage();
- else
- return m_sError;
- }
- /// stub - returns feed string
- class BlobStream_c final: public CharStream_c
- {
- Str_t m_sData;
- public:
- BlobStream_c ( const CSphString & sData )
- : CharStream_c ( nullptr )
- , m_sData { FromStr ( sData ) }
- {}
- Str_t Read() final
- {
- if ( m_bDone )
- return dEmptyStr;
- m_bDone = true;
- return m_sData;
- }
- Str_t ReadAll() final
- {
- auto sData = Read();
- Str_t sDescr = { sData.first, Min ( sData.second, 100 ) };
- myinfo::SetDescription ( sDescr, sDescr.second );
- return sData;
- }
- };
- /// stream with known content length and no special massage over socket
- class RawSocketStream_c final : public CharStream_c
- {
- int m_iContentLength;
- bool m_bTerminated = false;
- BYTE m_uOldTerminator = 0;
- CSphVector<BYTE> m_dUnpacked;
- bool m_bCompressed = false;
- public:
- RawSocketStream_c ( AsyncNetInputBuffer_c * pIn, int iContentLength, bool bCompressed )
- : CharStream_c ( pIn )
- , m_iContentLength ( iContentLength )
- , m_bCompressed ( bCompressed )
- {
- assert ( pIn );
- m_bDone = !m_iContentLength;
- }
- ~RawSocketStream_c() final
- {
- if ( m_bTerminated )
- m_pIn->Terminate ( 0, m_uOldTerminator );
- m_pIn->DiscardProcessed ( 0 );
- }
- Str_t Read() final
- {
- if ( m_bDone )
- return dEmptyStr;
- m_pIn->DiscardProcessed ( 0 );
- if ( !m_pIn->HasBytes() && m_pIn->ReadAny()<0 )
- {
- if ( !m_pIn->GetError() )
- m_sError.SetSprintf ( "failed to receive HTTP request (error='%s')", sphSockError() );
- m_bDone = true;
- return dEmptyStr;
- }
- auto iChunk = Min ( m_iContentLength, m_pIn->HasBytes() );
- m_iContentLength -= iChunk;
- m_bDone = !m_iContentLength;
- // Temporary write \0 at the end, since parser wants z-terminated buf
- if ( m_bDone )
- {
- m_uOldTerminator = m_pIn->Terminate ( iChunk, '\0' );
- m_bTerminated = true;
- }
- return Decompress ( m_pIn->PopTail ( iChunk ) );
- }
- Str_t ReadAll() final
- {
- if ( m_bDone )
- return dEmptyStr;
- // that is oneshot read - we sure, we're done
- m_bDone = true;
- if ( m_iContentLength && !m_pIn->ReadFrom ( m_iContentLength ) )
- {
- if ( !m_pIn->GetError() )
- m_sError.SetSprintf ( "failed to receive HTTP request (error='%s')", sphSockError() );
- return dEmptyStr;
- }
- m_uOldTerminator = m_pIn->Terminate ( m_iContentLength, '\0' );
- return Decompress ( m_pIn->PopTail ( m_iContentLength ) );
- }
- Str_t Decompress ( const ByteBlob_t & tIn )
- {
- if ( !m_bCompressed )
- return B2S ( tIn );
- m_dUnpacked.Resize ( 0 );
- if ( !GzipDecompress ( tIn, m_dUnpacked, m_sError ) )
- {
- m_bDone = true;
- return dEmptyStr;
- }
- return Str_t ( m_dUnpacked );
- }
- };
- /// chunked stream - i.e. total content length is unknown
- class ChunkedSocketStream_c final: public CharStream_c
- {
- CSphVector<BYTE> m_dData; // used only in ReadAll() call
- int m_iLastParsed;
- bool m_bBodyDone;
- CSphVector<Str_t> m_dBodies;
- http_parser_settings m_tParserSettings;
- http_parser* m_pParser;
- private:
- // callbacks
- static int cbParserBody ( http_parser* pParser, const char* sAt, size_t iLen )
- {
- assert ( pParser->data );
- auto pThis = static_cast<ChunkedSocketStream_c*> ( pParser->data );
- return pThis->ParserBody ( { sAt, (int)iLen } );
- }
- static int cbMessageComplete ( http_parser* pParser )
- {
- assert ( pParser->data );
- auto pThis = static_cast<ChunkedSocketStream_c*> ( pParser->data );
- return pThis->MessageComplete();
- }
- inline int MessageComplete()
- {
- HTTPINFO << "ChunkedSocketStream_c::MessageComplete";
- m_bBodyDone = true;
- return 0;
- }
- inline int ParserBody ( Str_t sData )
- {
- HTTPINFO << "ParserBody chunked str with " << sData.second << " bytes '" << Data2Log ( sData ) << "'";;
- if ( !IsEmpty ( sData ) )
- m_dBodies.Add ( sData );
- return 0;
- }
- void ParseBody ( ByteBlob_t sData )
- {
- HTTPINFO << "ParseBody chunked blob with " << sData.second << " bytes '" << Data2Log ( sData ) << "'";;
- m_iLastParsed = (int)http_parser_execute ( m_pParser, &m_tParserSettings, (const char*)sData.first, sData.second );
- if ( m_iLastParsed != sData.second )
- {
- HTTPINFO << "ParseBody error: parsed " << m_iLastParsed << ", chunk " << sData.second;
- if ( !m_pIn->GetError() )
- m_sError = http_errno_description ( (http_errno)m_pParser->http_errno );
- }
- }
- public:
- ChunkedSocketStream_c ( AsyncNetInputBuffer_c * pIn, http_parser * pParser, bool bBodyDone, CSphVector<Str_t> dBodies, int iLastParsed )
- : CharStream_c ( pIn )
- , m_iLastParsed ( iLastParsed )
- , m_bBodyDone ( bBodyDone )
- , m_pParser ( pParser )
- {
- assert ( pIn );
- m_dBodies = std::move ( dBodies );
- http_parser_settings_init ( &m_tParserSettings );
- m_tParserSettings.on_body = cbParserBody;
- m_tParserSettings.on_message_complete = cbMessageComplete;
- m_pParser->data = this;
- }
- void DiscardLast()
- {
- m_pIn->PopTail ( std::exchange ( m_iLastParsed, 0 ) );
- m_pIn->DiscardProcessed ( 0 );
- }
- ~ChunkedSocketStream_c() final
- {
- DiscardLast();
- }
- Str_t Read() final
- {
- if ( m_bDone )
- return dEmptyStr;
- while ( m_dBodies.IsEmpty() )
- {
- if ( m_bBodyDone )
- {
- m_bDone = true;
- return dEmptyStr;
- }
- DiscardLast();
- if ( !m_pIn->HasBytes() )
- {
- switch ( m_pIn->ReadAny() )
- {
- case -1:
- case 0:
- m_bDone = true;
- return dEmptyStr;
- default:
- break;
- }
- }
- ParseBody ( m_pIn->Tail() );
- }
- auto sResult = m_dBodies.First();
- m_dBodies.Remove ( 0 );
- if ( m_bBodyDone && m_dBodies.IsEmpty() )
- {
- m_bDone = true;
- const_cast<char&> ( sResult.first[sResult.second] ) = '\0';
- }
- return sResult;
- }
- Str_t ReadAll() final
- {
- auto sFirst = Read();
- if ( m_bDone )
- return sFirst;
- m_dData.Append ( sFirst );
- do
- m_dData.Append ( Read() );
- while ( !m_bDone );
- m_dData.Add ( '\0' );
- m_dData.Resize ( m_dData.GetLength() - 1 );
- return m_dData;
- }
- };
- ///////////////////////////////////////////////////////////////////////
- CSphString HttpEndpointToStr ( EHTTP_ENDPOINT eEndpoint )
- {
- assert ( eEndpoint < EHTTP_ENDPOINT::TOTAL );
- return g_dEndpoints[(int)eEndpoint].m_szName1;
- }
- void HttpBuildReply ( CSphVector<BYTE> & dData, EHTTP_STATUS eCode, Str_t sReply, bool bHtml )
- {
- const char * sContent = ( bHtml ? "text/html" : "application/json" );
- StringBuilder_c sHttp;
- sHttp.Sprintf ( "HTTP/1.1 %s\r\nServer: %s\r\nContent-Type: %s; charset=UTF-8\r\nContent-Length: %d\r\n\r\n", HttpGetStatusName ( eCode ), g_sStatusVersion.cstr(), sContent, sReply.second );
- dData.Reserve ( sHttp.GetLength() + sReply.second );
- dData.Append ( (Str_t)sHttp );
- dData.Append ( sReply );
- }
- HttpRequestParser_c::HttpRequestParser_c()
- {
- http_parser_settings_init ( &m_tParserSettings );
- m_tParserSettings.on_url = cbParserUrl;
- m_tParserSettings.on_header_field = cbParserHeaderField;
- m_tParserSettings.on_header_value = cbParserHeaderValue;
- m_tParserSettings.on_headers_complete = cbParseHeaderCompleted;
- m_tParserSettings.on_body = cbParserBody;
- m_tParserSettings.on_message_begin = cbMessageBegin;
- m_tParserSettings.on_message_complete = cbMessageComplete;
- m_tParserSettings.on_status = cbMessageStatus;
- Reinit();
- }
- void HttpRequestParser_c::Reinit()
- {
- HTTPINFO << "HttpRequestParser_c::Reinit()";
- http_parser_init ( &m_tParser, HTTP_REQUEST );
- m_sEndpoint = "";
- m_sCurField.Clear();
- m_sCurValue.Clear();
- m_hOptions.Reset();
- m_eType = HTTP_GET;
- m_sUrl.Clear();
- m_bHeaderDone = false;
- m_bBodyDone = false;
- m_dParsedBodies.Reset();
- m_iParsedBodyLength = 0;
- m_iLastParsed = 0;
- m_szError = nullptr;
- m_tParser.data = this;
- }
- bool HttpRequestParser_c::ParseHeader ( ByteBlob_t sData )
- {
- HTTPINFO << "ParseChunk with " << sData.second << " bytes '" << Data2Log ( sData ) << "'";
- m_iLastParsed = (int) http_parser_execute ( &m_tParser, &m_tParserSettings, (const char *)sData.first, sData.second );
- if ( m_iLastParsed != sData.second )
- {
- if ( g_bLogBadHttpReq )
- {
- sphWarning ( "ParseChunk error: parsed %d, chunk %d, conn %d, %.*s", m_iLastParsed, sData.second, session::GetConnID(), sData.second, sData.first );
- } else
- {
- HTTPINFO << "ParseChunk error: parsed " << m_iLastParsed << ", chunk " << sData.second;
- }
- m_szError = http_errno_description ( (http_errno)m_tParser.http_errno );
- return true;
- }
- return m_bHeaderDone;
- }
- int HttpRequestParser_c::ParsedBodyLength() const
- {
- return m_iParsedBodyLength;
- }
- bool HttpRequestParser_c::Expect100() const
- {
- return m_hOptions.Exists ( "expect" ) && m_hOptions["expect"] == "100-continue";
- }
- bool HttpRequestParser_c::KeepAlive() const
- {
- return m_bKeepAlive;
- }
- const char* HttpRequestParser_c::Error() const
- {
- return m_szError;
- }
- bool HttpRequestParser_c::IsBuddyQuery () const
- {
- return ::IsBuddyQuery ( m_hOptions );
- }
- inline int Char2Hex ( BYTE uChar )
- {
- switch (uChar)
- {
- case '0': return 0;
- case '1': return 1;
- case '2': return 2;
- case '3': return 3;
- case '4': return 4;
- case '5': return 5;
- case '6': return 6;
- case '7': return 7;
- case '8': return 8;
- case '9': return 9;
- case 'a': case 'A': return 10;
- case 'b': case 'B': return 11;
- case 'c': case 'C': return 12;
- case 'd': case 'D': return 13;
- case 'e': case 'E': return 14;
- case 'f': case 'F': return 15;
- default: break;
- }
- return -1;
- }
- inline int Chars2Hex ( const char* pSrc )
- {
- int iRes = Char2Hex ( *( pSrc + 1 ) );
- return iRes < 0 ? iRes : iRes + Char2Hex ( *pSrc ) * 16;
- }
- void UriPercentReplace ( Str_t & sEntity, Replace_e ePlus )
- {
- if ( IsEmpty ( sEntity ) )
- return;
- const char* pSrc = sEntity.first;
- auto* pDst = const_cast<char*> ( pSrc );
- char cPlus = ((bool)ePlus) ? ' ' : '+';
- auto* pEnd = pSrc + sEntity.second;
- while ( pSrc < pEnd )
- {
- if ( *pSrc=='%' && *(pSrc+1) && *(pSrc+2) )
- {
- auto iCode = Chars2Hex ( pSrc + 1 );
- if ( iCode<0 )
- {
- *pDst++ = *pSrc++;
- continue;
- }
- pSrc += 3;
- *pDst++ = (char) iCode;
- } else
- {
- *pDst++ = ( *pSrc=='+' ? cPlus : *pSrc );
- pSrc++;
- }
- }
- sEntity.second = int ( pDst - sEntity.first );
- }
- void StoreRawQuery ( OptionsHash_t& hOptions, CSphString sRawBody )
- {
- if ( sRawBody.IsEmpty() )
- return;
- hOptions.Add ( std::move ( sRawBody ), "raw_query" );
- }
- void DecodeAndStoreRawQuery ( OptionsHash_t& hOptions, const Str_t& sWholeData )
- {
- if ( IsEmpty ( sWholeData ) )
- return;
- // store raw query
- CSphString sRawBody ( sWholeData ); // copy raw data, important!
- Str_t sRaw { sRawBody.cstr(), sWholeData.second }; // FromStr implies strlen(), but we don't need it
- UriPercentReplace ( sRaw, Replace_e::NoPlus ); // avoid +-decoding
- *const_cast<char*> ( sRaw.first + sRaw.second ) = '\0';
- StoreRawQuery ( hOptions, std::move ( sRawBody ));
- }
- void HttpRequestParser_c::ParseList ( Str_t sData, OptionsHash_t & hOptions )
- {
- HTTPINFO << "ParseList with " << sData.second << " bytes '" << Data2Log ( sData ) << "'";
- CSphString sBuf ( sData );
- const char * sCur = sBuf.cstr();
- const char * sLast = sCur;
- const char * sEnd = sCur + sData.second;
- Str_t sName = dEmptyStr;
- for ( ; sCur<sEnd; ++sCur )
- {
- switch (*sCur)
- {
- case '=':
- {
- sName = { sLast, int ( sCur - sLast ) };
- UriPercentReplace ( sName );
- sLast = sCur + 1;
- break;
- }
- case '&':
- {
- Str_t sVal { sLast, int ( sCur - sLast ) };
- UriPercentReplace ( sVal );
- ToLower ( sName );
- hOptions.Add ( sVal, sName );
- sLast = sCur + 1;
- sName = dEmptyStr;
- break;
- }
- default:
- break;
- }
- }
- if ( IsEmpty ( sName ) )
- return;
- Str_t sVal { sLast, int ( sCur - sLast ) };
- UriPercentReplace ( sVal );
- ToLower ( sName );
- hOptions.Add ( sVal, sName );
- }
- inline int HttpRequestParser_c::ParserUrl ( Str_t sData )
- {
- HTTPINFO << "ParseUrl with " << sData.second << " bytes '" << sData << "'";
- m_sUrl << sData;
- return 0;
- }
- inline void HttpRequestParser_c::FinishParserUrl ()
- {
- if ( m_sUrl.IsEmpty() )
- return;
- auto _ = AtScopeExit ( [this] { m_sUrl.Clear(); } );
- auto sData = (Str_t) m_sUrl;
- http_parser_url tUri;
- if ( http_parser_parse_url ( sData.first, sData.second, 0, &tUri ) )
- return;
- DWORD uPath = ( 1UL<<UF_PATH );
- DWORD uQuery = ( 1UL<<UF_QUERY );
- if ( ( tUri.field_set & uPath )!=0 )
- {
- const char * sPath = sData.first + tUri.field_data[UF_PATH].off;
- int iPathLen = tUri.field_data[UF_PATH].len;
- if ( *sPath=='/' )
- {
- ++sPath;
- --iPathLen;
- }
- // URL should be split fully to point to proper endpoint
- m_sEndpoint.SetBinary ( sPath, iPathLen );
- // transfer endpoint for further parse
- m_hOptions.Add ( m_sEndpoint, "endpoint" );
- }
- if ( ( tUri.field_set & uQuery )!=0 )
- {
- Str_t sRawGetQuery { sData.first + tUri.field_data[UF_QUERY].off, tUri.field_data[UF_QUERY].len };
- if ( m_eType==HTTP_GET )
- DecodeAndStoreRawQuery ( m_hOptions, sRawGetQuery );
- ParseList ( sRawGetQuery, m_hOptions );
- }
- CSphString sFullURL;
- if ( ( tUri.field_set & uPath )!=0 && ( tUri.field_set & uQuery )!=0 )
- {
- const char * sStart = sData.first + tUri.field_data[UF_PATH].off;
- const char * sEnd = sData.first + tUri.field_data[UF_QUERY].off + tUri.field_data[UF_QUERY].len;
- sFullURL.SetBinary ( sStart, sEnd-sStart );
- m_hOptions.Add ( sFullURL, "full_url" );
- } else if ( ( tUri.field_set & uPath )!=0 )
- {
- const char * sPath = sData.first + tUri.field_data[UF_PATH].off;
- int iPathLen = tUri.field_data[UF_PATH].len;
- // URL should be split fully to point to proper endpoint
- sFullURL.SetBinary ( sPath, iPathLen );
- m_hOptions.Add ( sFullURL, "full_url" );
- }
- }
- inline int HttpRequestParser_c::ParserHeaderField ( Str_t sData )
- {
- FinishParserKeyVal();
- m_sCurField << sData;
- return 0;
- }
- inline int HttpRequestParser_c::ParserHeaderValue ( Str_t sData )
- {
- m_sCurValue << sData;
- return 0;
- }
- inline void HttpRequestParser_c::FinishParserKeyVal()
- {
- if ( m_sCurValue.IsEmpty() )
- return;
- CSphString sField = (CSphString)m_sCurField;
- sField.ToLower();
- m_hOptions.Add ( (CSphString)m_sCurValue, sField );
- m_sCurField.Clear();
- m_sCurValue.Clear();
- }
- inline int HttpRequestParser_c::ParserBody ( Str_t sData )
- {
- HTTPINFO << "ParserBody parser with " << sData.second << " bytes '" << Data2Log ( sData ) << "'";
- if ( !m_dParsedBodies.IsEmpty() )
- {
- auto& sLast = m_dParsedBodies.Last();
- if ( sLast.first + sLast.second == sData.first )
- sLast.second += sData.second;
- else
- m_dParsedBodies.Add ( sData );
- } else
- m_dParsedBodies.Add ( sData );
- m_iParsedBodyLength += sData.second;
- return 0;
- }
- inline int HttpRequestParser_c::MessageComplete ()
- {
- HTTPINFO << "MessageComplete";
- m_bBodyDone = true;
- return 0;
- }
- inline int HttpRequestParser_c::ParseHeaderCompleted ()
- {
- HTTPINFO << "ParseHeaderCompleted. Upgrade=" << (unsigned int)m_tParser.upgrade << ", length=" << (int64_t) m_tParser.content_length;
- // we're not support connection upgrade - so just reset upgrade flag, if detected.
- // rfc7540 section-3.2 (for http/2) says, we just should continue as if no 'upgrade' header was found
- m_tParser.upgrade = 0;
- // connection wide http options
- m_bKeepAlive = ( ( m_tParser.flags & F_CONNECTION_KEEP_ALIVE ) != 0 );
- m_eType = (http_method)m_tParser.method;
- FinishParserKeyVal();
- FinishParserUrl();
- m_bHeaderDone = true;
- return 0;
- }
- int HttpRequestParser_c::cbParserUrl ( http_parser* pParser, const char* sAt, size_t iLen )
- {
- assert ( pParser->data );
- auto pThis = static_cast<HttpRequestParser_c*> ( pParser->data );
- return pThis->ParserUrl ( { sAt, (int)iLen } );
- }
- int HttpRequestParser_c::cbParserHeaderField ( http_parser* pParser, const char* sAt, size_t iLen )
- {
- assert ( pParser->data );
- auto pThis = static_cast<HttpRequestParser_c*> ( pParser->data );
- return pThis->ParserHeaderField ( { sAt, (int)iLen } );
- }
- int HttpRequestParser_c::cbParserHeaderValue ( http_parser* pParser, const char* sAt, size_t iLen )
- {
- assert ( pParser->data );
- auto pThis = static_cast<HttpRequestParser_c*> ( pParser->data );
- return pThis->ParserHeaderValue ( { sAt, (int)iLen } );
- }
- int HttpRequestParser_c::cbParseHeaderCompleted ( http_parser* pParser )
- {
- assert ( pParser->data );
- auto pThis = static_cast<HttpRequestParser_c*> ( pParser->data );
- return pThis->ParseHeaderCompleted ();
- }
- int HttpRequestParser_c::cbMessageBegin ( http_parser* pParser )
- {
- HTTPINFO << "cbMessageBegin";
- return 0;
- }
- int HttpRequestParser_c::cbMessageComplete ( http_parser* pParser )
- {
- assert ( pParser->data );
- auto pThis = static_cast<HttpRequestParser_c*> ( pParser->data );
- return pThis->MessageComplete();
- }
- int HttpRequestParser_c::cbMessageStatus ( http_parser* pParser, const char* sAt, size_t iLen )
- {
- HTTPINFO << "cbMessageStatus with '" << Str_t { sAt, (int)iLen } << "'";
- return 0;
- }
- int HttpRequestParser_c::cbParserBody ( http_parser* pParser, const char* sAt, size_t iLen )
- {
- assert ( pParser->data );
- auto pThis = static_cast<HttpRequestParser_c*> ( pParser->data );
- return pThis->ParserBody ( { sAt, (int)iLen } );
- }
// printf-style HTML template of the daemon index page.
// The single %s placeholder is filled by HttpHandlerIndexPage() with the status/version string.
- static const char * g_sIndexPage =
- R"index(<!DOCTYPE html>
- <html>
- <head>
- <title>Manticore</title>
- </head>
- <body>
- <h1>Manticore daemon</h1>
- <p>%s</p>
- </body>
- </html>)index";
- static void HttpHandlerIndexPage ( CSphVector<BYTE> & dData )
- {
- StringBuilder_c sIndexPage;
- sIndexPage.Appendf ( g_sIndexPage, g_sStatusVersion.cstr() );
- HttpBuildReply ( dData, EHTTP_STATUS::_200, (Str_t)sIndexPage, true );
- }
- //////////////////////////////////////////////////////////////////////////
- class JsonRequestBuilder_c : public RequestBuilder_i
- {
- public:
- JsonRequestBuilder_c ( const char* szQuery, CSphString sEndpoint )
- : m_sEndpoint ( std::move ( sEndpoint ) )
- , m_tQuery ( szQuery )
- {
- // fixme: we can implement replacing indexes in a string (without parsing) if it becomes a performance issue
- }
- void BuildRequest ( const AgentConn_t & tAgent, ISphOutputBuffer & tOut ) const final
- {
- // replace "table" value in the json query
- m_tQuery.DelItem ( "table" );
- m_tQuery.AddStr ( "table", tAgent.m_tDesc.m_sIndexes.cstr() );
- CSphString sRequest = m_tQuery.AsString();
- auto tWr = APIHeader ( tOut, SEARCHD_COMMAND_JSON, VER_COMMAND_JSON ); // API header
- tOut.SendString ( m_sEndpoint.cstr() );
- tOut.SendString ( sRequest.cstr() );
- }
- private:
- CSphString m_sEndpoint;
- mutable JsonObj_c m_tQuery;
- };
- class JsonReplyParser_c : public ReplyParser_i
- {
- public:
- JsonReplyParser_c ( int & iAffected, int & iWarnings )
- : m_iAffected ( iAffected )
- , m_iWarnings ( iWarnings )
- {}
- bool ParseReply ( MemInputBuffer_c & tReq, AgentConn_t & ) const final
- {
- CSphString sEndpoint = tReq.GetString();
- EHTTP_ENDPOINT eEndpoint = StrToHttpEndpoint ( sEndpoint );
- if ( eEndpoint!=EHTTP_ENDPOINT::JSON_UPDATE && eEndpoint!=EHTTP_ENDPOINT::JSON_DELETE )
- return false;
- DWORD uLength = tReq.GetDword();
- CSphFixedVector<BYTE> dResult ( uLength+1 );
- tReq.GetBytes ( dResult.Begin(), (int)uLength );
- dResult[uLength] = '\0';
- return sphGetResultStats ( (const char *)dResult.Begin(), m_iAffected, m_iWarnings, eEndpoint==EHTTP_ENDPOINT::JSON_UPDATE );
- }
- protected:
- int & m_iAffected;
- int & m_iWarnings;
- };
- std::unique_ptr<QueryParser_i> CreateQueryParser ( bool bJson ) noexcept
- {
- return bJson ? sphCreateJsonQueryParser() : sphCreatePlainQueryParser();
- }
- std::unique_ptr<RequestBuilder_i> CreateRequestBuilder ( Str_t sQuery, const SqlStmt_t & tStmt )
- {
- if ( tStmt.m_bJson )
- {
- assert ( !tStmt.m_sEndpoint.IsEmpty() );
- return std::make_unique<JsonRequestBuilder_c> ( sQuery.first, tStmt.m_sEndpoint );
- } else
- {
- return std::make_unique<SphinxqlRequestBuilder_c> ( sQuery, tStmt );
- }
- }
- std::unique_ptr<ReplyParser_i> CreateReplyParser ( bool bJson, int & iUpdated, int & iWarnings )
- {
- if ( bJson )
- return std::make_unique<JsonReplyParser_c> ( iUpdated, iWarnings );
- else
- return std::make_unique<SphinxqlReplyParser_c> ( &iUpdated, &iWarnings );
- }
- //////////////////////////////////////////////////////////////////////////
- class HttpErrorReporter_c final : public StmtErrorReporter_i
- {
- public:
- void Ok ( int iAffectedRows, const CSphString & /*sWarning*/, int64_t /*iLastInsertId*/ ) final { m_iAffected = iAffectedRows; }
- void Ok ( int iAffectedRows, int /*nWarnings*/ ) final { m_iAffected = iAffectedRows; }
- void ErrorEx ( EMYSQL_ERR eErr, const char * sError ) final;
- RowBuffer_i * GetBuffer() final { return nullptr; }
- bool IsError() const { return m_bError; }
- const char * GetError() const { return m_sError.cstr(); }
- int GetAffectedRows() const { return m_iAffected; }
- private:
- bool m_bError {false};
- CSphString m_sError;
- int m_iAffected {0};
- };
- void HttpErrorReporter_c::ErrorEx ( EMYSQL_ERR /*iErr*/, const char * sError )
- {
- m_bError = true;
- m_sError = sError;
- }
- StmtErrorReporter_i * CreateHttpErrorReporter()
- {
- return new HttpErrorReporter_c();
- }
- //////////////////////////////////////////////////////////////////////////
- // all the handlers for http queries
- void ReplyBuf ( Str_t sResult, EHTTP_STATUS eStatus, bool bNeedHttpResponse, CSphVector<BYTE> & dData )
- {
- if ( bNeedHttpResponse )
- HttpBuildReply ( dData, eStatus, sResult, false );
- else
- {
- dData.Resize ( 0 );
- dData.Append ( sResult );
- }
- }
- void HttpHandler_c::SetErrorFormat ( bool bNeedHttpResponse )
- {
- m_bNeedHttpResponse = bNeedHttpResponse;
- }
-
- CSphVector<BYTE> & HttpHandler_c::GetResult()
- {
- return m_dData;
- }
- const CSphString & HttpHandler_c::GetError () const
- {
- return m_sError;
- }
- EHTTP_STATUS HttpHandler_c::GetStatusCode() const
- {
- return m_eHttpCode;
- }
- void HttpHandler_c::ReportError ( const char * szError, EHTTP_STATUS eStatus )
- {
- m_sError = szError;
- ReportError ( eStatus );
- }
- void HttpHandler_c::ReportError ( EHTTP_STATUS eStatus )
- {
- m_eHttpCode = eStatus;
- if ( m_bNeedHttpResponse )
- sphHttpErrorReply ( m_dData, eStatus, m_sError.cstr() );
- else
- {
- m_dData.Resize ( m_sError.Length() );
- memcpy ( m_dData.Begin(), m_sError.cstr(), m_dData.GetLength() );
- }
- }
- void HttpHandler_c::FormatError ( EHTTP_STATUS eStatus, const char * sError, ... )
- {
- va_list ap;
- va_start ( ap, sError );
- m_sError.SetSprintfVa ( sError, ap );
- va_end ( ap );
- m_eHttpCode = eStatus;
- if ( m_bNeedHttpResponse )
- sphHttpErrorReply ( m_dData, eStatus, m_sError.cstr() );
- else
- {
- int iLen = m_sError.Length();
- m_dData.Resize ( iLen+1 );
- memcpy ( m_dData.Begin(), m_sError.cstr(), iLen );
- m_dData[iLen] = '\0';
- }
- }
- void HttpHandler_c::ReportError ( const char * sError, HttpErrorType_e eType, EHTTP_STATUS eStatus, const char * sIndex )
- {
- if ( sError )
- m_sError = sError;
- m_eHttpCode = eStatus;
- const char * sErrorType = GetErrorTypeName ( eType );
- int iStatus = HttpGetStatusCodes ( eStatus );
- CSphString sReply = ( sErrorType ? JsonEncodeResultError ( m_sError, sErrorType, iStatus, sIndex ) : JsonEncodeResultError ( m_sError, iStatus ) );
- HttpBuildReplyHead ( GetResult(), eStatus, sReply.cstr(), sReply.Length(), false );
- }
- void HttpHandler_c::BuildReply ( const CSphString & sResult, EHTTP_STATUS eStatus )
- {
- m_eHttpCode = eStatus;
- ReplyBuf ( FromStr ( sResult ), eStatus, m_bNeedHttpResponse, m_dData );
- }
- void HttpHandler_c::BuildReply ( const char* szResult, EHTTP_STATUS eStatus )
- {
- m_eHttpCode = eStatus;
- ReplyBuf ( FromSz( szResult ), eStatus, m_bNeedHttpResponse, m_dData );
- }
- void HttpHandler_c::BuildReply ( const StringBuilder_c & sResult, EHTTP_STATUS eStatus )
- {
- m_eHttpCode = eStatus;
- ReplyBuf ( (Str_t)sResult, eStatus, m_bNeedHttpResponse, m_dData );
- }
- void HttpHandler_c::BuildReply ( Str_t sResult, EHTTP_STATUS eStatus )
- {
- m_eHttpCode = eStatus;
- ReplyBuf ( sResult, eStatus, m_bNeedHttpResponse, m_dData );
- }
- // check whether given served index is exist and has requested type
- bool HttpHandler_c::CheckValid ( const ServedIndex_c* pServed, const CSphString& sIndex, IndexType_e eType )
- {
- if ( !pServed )
- {
- FormatError ( EHTTP_STATUS::_500, "no such table '%s'", sIndex.cstr () );
- return false;
- }
- if ( pServed->m_eType!=eType )
- {
- FormatError ( EHTTP_STATUS::_500, "table '%s' is not %s", sIndex.cstr(), GetIndexTypeName ( eType ) );
- return false;
- }
- return true;
- }
- struct HttpOptionTrait_t
- {
- const OptionsHash_t & m_tOptions;
- explicit HttpOptionTrait_t ( const OptionsHash_t & tOptions )
- : m_tOptions ( tOptions )
- {}
- };
- class HttpSearchHandler_c : public HttpHandler_c, public HttpOptionTrait_t
- {
- public:
- bool Process () final
- {
- TRACE_CONN ( "conn", "HttpSearchHandler_c::Process" );
- CSphString sWarning;
- std::unique_ptr<QueryParser_i> pQueryParser = PreParseQuery();
- if ( !pQueryParser )
- return false;
- if ( IsBuddyQuery ( m_tOptions ) )
- m_tParsed.m_tQuery.m_uDebugFlags |= QUERY_DEBUG_NO_LOG;
- auto tHandler = CreateMsearchHandler ( std::move ( pQueryParser ), m_eQueryType, m_tParsed );
- SetStmt ( tHandler );
- QueryProfile_c tProfile;
- tProfile.m_eNeedPlan = (PLAN_FLAVOUR)m_tParsed.m_iPlan;
- tProfile.m_bNeedProfile = m_tParsed.m_bProfile;
- bool bNeedProfile = m_tParsed.m_bProfile || ( m_tParsed.m_iPlan != 0 );
- if ( bNeedProfile )
- tHandler.SetProfile ( &tProfile );
- // search
- tHandler.RunQueries();
- if ( bNeedProfile )
- tProfile.Stop();
- AggrResult_t & tRes = tHandler.m_dAggrResults.First();
- if ( !tRes.m_sError.IsEmpty() )
- {
- ReportError ( tRes.m_sError.cstr(), EHTTP_STATUS::_500 );
- return false;
- }
- // fixme: handle more than one warning at once?
- if ( tRes.m_sWarning.IsEmpty() && !m_tParsed.m_sWarning.IsEmpty() )
- tRes.m_sWarning = m_tParsed.m_sWarning;
- CSphString sResult = EncodeResult ( tHandler.m_dAggrResults, bNeedProfile ? &tProfile : nullptr );
- BuildReply ( sResult, EHTTP_STATUS::_200 );
- return true;
- }
- explicit HttpSearchHandler_c ( const OptionsHash_t & tOptions )
- : HttpOptionTrait_t ( tOptions )
- {}
- protected:
- QueryType_e m_eQueryType {QUERY_SQL};
- ParsedJsonQuery_t m_tParsed;
- virtual std::unique_ptr<QueryParser_i> PreParseQuery() = 0;
- CSphString EncodeResult ( const VecTraits_T<AggrResult_t> & dRes, QueryProfile_c * pProfile )
- {
- return sphEncodeResultJson ( dRes, m_tParsed.m_tQuery, pProfile, ResultSetFormat_e::MntSearch );
- }
- virtual void SetStmt ( SearchHandler_c & tHandler ) {};
- };
- static void AddAggs ( const VecTraits_T<SqlStmt_t> & dStmt, JsonQuery_c & tQuery )
- {
- assert ( dStmt.GetLength()>1 && dStmt[0].m_tQuery.m_bFacetHead );
- tQuery.m_dAggs.Reserve ( dStmt.GetLength()-1 );
- for ( int i=1; i<dStmt.GetLength(); i++ )
- {
- const CSphQuery & tRef = dStmt[i].m_tQuery;
- assert ( tRef.m_dRefItems.GetLength() );
- JsonAggr_t & tBucket = tQuery.m_dAggs.Add();
- tBucket.m_sBucketName = tRef.m_dRefItems[0].m_sExpr;
- tBucket.m_sCol = tRef.m_dRefItems[0].m_sAlias;
- }
- }
- class HttpSearchHandler_SQL_c final: public HttpSearchHandler_c
- {
- public:
- explicit HttpSearchHandler_SQL_c ( const OptionsHash_t & tOptions )
- : HttpSearchHandler_c ( tOptions )
- {}
- protected:
- CSphVector<SqlStmt_t> m_dStmt;
- std::unique_ptr<QueryParser_i> PreParseQuery() final
- {
- const CSphString * pRawQl = m_tOptions ( "query" );
- if ( !pRawQl || pRawQl->IsEmpty() )
- {
- ReportError ( "query missing", EHTTP_STATUS::_400 );
- return nullptr;
- }
- if ( !sphParseSqlQuery ( FromStr ( *pRawQl ), m_dStmt, m_sError, SPH_COLLATION_DEFAULT ) )
- {
- ReportError ( EHTTP_STATUS::_400 );
- return nullptr;
- }
- ( (CSphQuery &) m_tParsed.m_tQuery ) = m_dStmt[0].m_tQuery;
- bool bFacet = ( m_dStmt.GetLength()>1 );
- for ( const auto & tStmt : m_dStmt )
- {
- // should be all FACET in case of multiple queries
- bFacet &= ( tStmt.m_tQuery.m_bFacet || tStmt.m_tQuery.m_bFacetHead );
- if ( tStmt.m_eStmt!=STMT_SELECT )
- {
- ReportError ( "only SELECT queries are supported", EHTTP_STATUS::_501 );
- return nullptr;
- }
- }
- if ( m_dStmt.GetLength()>1 && !bFacet )
- {
- ReportError ( "only FACET multiple queries supported", EHTTP_STATUS::_501 );
- return nullptr;
- }
- if ( bFacet )
- AddAggs ( m_dStmt, m_tParsed.m_tQuery );
- m_eQueryType = QUERY_SQL;
- return sphCreatePlainQueryParser();
- }
- void SetStmt ( SearchHandler_c & tHandler )
- {
- tHandler.m_pStmt = &m_dStmt[0];
- for ( int i=1; i<m_dStmt.GetLength(); ++i )
- tHandler.SetQuery ( i, m_dStmt[i].m_tQuery, nullptr );
- }
- };
- typedef std::pair<CSphString,MysqlColumnType_e> ColumnNameType_t;
- static const char * GetMysqlTypeName ( MysqlColumnType_e eType )
- {
- switch ( eType )
- {
- case MYSQL_COL_DECIMAL: return "decimal";
- case MYSQL_COL_LONG: return "long";
- case MYSQL_COL_FLOAT: return "float";
- case MYSQL_COL_DOUBLE: return "double";
- case MYSQL_COL_LONGLONG: return "long long";
- case MYSQL_COL_STRING: return "string";
- default: return "unknown";
- };
- }
- static MysqlColumnType_e GetMysqlTypeByName ( const CSphString& sType )
- {
- if ( sType=="decimal")
- return MYSQL_COL_DECIMAL;
- if ( sType == "long" )
- return MYSQL_COL_LONG;
- if ( sType == "float" )
- return MYSQL_COL_FLOAT;
- if ( sType == "double" )
- return MYSQL_COL_DOUBLE;
- if ( sType == "long long" )
- return MYSQL_COL_LONGLONG;
- if ( sType == "string" )
- return MYSQL_COL_STRING;
- assert (false && "Unknown column");
- return MYSQL_COL_STRING;
- }
- JsonEscapedBuilder& operator<< ( JsonEscapedBuilder& tOut, MysqlColumnType_e eType )
- {
- tOut.FixupSpacedAndAppendEscaped ( GetMysqlTypeName ( eType ) );
- return tOut;
- }
// Block formatting with which JsonRowBuffer_c opens its outermost block.
// NOTE(review): the three parts look like (delimiter, prefix, suffix), i.e. items joined by ",\n" inside '[' and ']' - confirm against StrBlock_t.
- const StrBlock_t dJsonObjCustom { { ",\n", 2 }, { "[", 1 }, { "]", 1 } }; // json object with custom formatting
- class JsonRowBuffer_c : public RowBuffer_i
- {
- public:
- JsonRowBuffer_c()
- {
- m_dBuf.StartBlock ( dJsonObjCustom );
- }
- void PutFloatAsString ( float fVal, const char * ) override
- {
- AddDataColumn();
- m_dBuf << fVal;
- }
- void PutDoubleAsString ( double fVal, const char * ) override
- {
- AddDataColumn();
- m_dBuf << fVal;
- }
- void PutNumAsString ( int64_t iVal ) override
- {
- AddDataColumn();
- m_dBuf << iVal;
- }
- void PutNumAsString ( uint64_t uVal ) override
- {
- AddDataColumn();
- m_dBuf << uVal;
- }
- void PutNumAsString ( int iVal ) override
- {
- AddDataColumn();
- m_dBuf << iVal;
- }
-
- void PutNumAsString ( DWORD uVal ) override
- {
- AddDataColumn();
- m_dBuf << uVal;
- }
- void PutArray ( const ByteBlob_t& dBlob, bool ) override
- {
- AddDataColumn();
- m_dBuf.FixupSpacedAndAppendEscaped ( (const char*)dBlob.first, dBlob.second );
- }
- void PutString ( Str_t sMsg ) override
- {
- PutArray ( S2B ( sMsg ), false );
- }
- void PutMicrosec ( int64_t iUsec ) override
- {
- PutNumAsString ( iUsec );
- }
- void PutNULL() override
- {
- AddDataColumn();
- m_dBuf << "null";
- }
- bool Commit() override
- {
- m_dBuf.FinishBlock ( false ); // finish previous item
- m_dBuf.ObjectBlock(); // start new item
- ++m_iTotalRows;
- m_iCol = 0;
- return true;
- }
- void Eof ( bool bMoreResults, int iWarns, const char* ) override
- {
- m_dBuf.FinishBlock ( true ); // last doc, allow empty
- m_dBuf.FinishBlock ( false ); // docs section
- DataFinish ( m_iTotalRows, nullptr, nullptr );
- m_dBuf.FinishBlock ( false ); // root object
- }
- void Error ( const char * szError, EMYSQL_ERR ) override
- {
- auto _ = m_dBuf.Object ( false );
- DataFinish ( 0, szError, nullptr );
- m_bError = true;
- m_sError = szError;
- }
- void Ok ( int iAffectedRows, int iWarns, const char * sMessage, bool bMoreResults, int64_t iLastInsertId ) override
- {
- auto _ = m_dBuf.Object ( false );
- DataFinish ( iAffectedRows, nullptr, sMessage );
- }
- void HeadBegin () override
- {
- m_iTotalRows = 0;
- m_dBuf.ObjectWBlock();
- m_dBuf.Named ( "columns" );
- m_dBuf.ArrayBlock();
- }
- bool HeadEnd ( bool , int ) override
- {
- m_dBuf.FinishBlock(false);
- m_dBuf.Named ( "data" );
- m_dBuf.ArrayWBlock();
- m_dBuf.ObjectBlock();
- return true;
- }
- void HeadColumn ( const char * szName, MysqlColumnType_e eType ) override
- {
- JsonEscapedBuilder sEscapedName;
- sEscapedName.FixupSpacedAndAppendEscaped ( szName );
- ColumnNameType_t tCol { (CSphString)sEscapedName, eType };
- auto _ = m_dBuf.Object(false);
- m_dBuf.AppendName ( tCol.first.cstr(), false );
- auto tTypeBlock = m_dBuf.Object(false);
- m_dBuf.NamedVal ( "type", eType );
- m_dColumns.Add ( tCol );
- }
- void Add ( BYTE ) override {}
- const JsonEscapedBuilder & Finish()
- {
- m_dBuf.FinishBlocks();
- return m_dBuf;
- }
- private:
- JsonEscapedBuilder m_dBuf;
- CSphVector<ColumnNameType_t> m_dColumns;
- int m_iTotalRows = 0;
- int m_iCol = 0;
- void AddDataColumn()
- {
- m_dBuf.AppendName ( m_dColumns[m_iCol].first.cstr(), false );
- ++m_iCol;
- }
- void DataFinish ( int iTotal, const char* szError, const char* szWarning )
- {
- m_dBuf.NamedVal ( "total", iTotal );
- m_dBuf.NamedString ( "error", szError );
- m_dBuf.NamedString ( "warning", szWarning );
- m_iCol = 0;
- m_dColumns.Reset();
- }
- };
- /* Below is a typical answer sent back by the sql endpoint for a query with mode=raw
- [{
- "columns":[{"id":{"type":"long long"}},{"proto":{"type":"string"}},{"state":{"type":"string"}},{"host":{"type":"string"}},{"connid":{"type":"long long"}},{"killed":{"type":"string"}},{"last cmd":{"type":"string"}}],
- "data":[
- {"id":2,"proto":"http","state":"query","host":"127.0.0.1:50787","connid":9,"killed":"0","last cmd":"select"},
- {"id":1,"proto":"mysql,ssl","state":"query","host":"127.0.0.1:50514","connid":1,"killed":"0","last cmd":"show queries"}
- ],
- "total":2,
- "error":"",
- "warning":""
- }]
- */
- void ConvertJsonDataset ( const JsonObj_c & tRoot, const char * sStmt, RowBuffer_i & tOut )
- {
- assert ( tRoot.IsArray() );
- int iItem = 0;
- int iItemsCount = tRoot.Size();
- CSphString sParseError;
- for ( const auto & tItem : tRoot )
- {
- int iTotal = 0;
- CSphString sError, sWarning;
- if ( !tItem.FetchIntItem ( iTotal, "total", sParseError, true ) )
- {
- tOut.Error ( sParseError.cstr() );
- break;
- }
- if ( !tItem.FetchStrItem ( sError, "error", sParseError, true ) )
- {
- tOut.Error ( sParseError.cstr() );
- break;
- }
- if ( !tItem.FetchStrItem ( sWarning, "warning", sParseError, true ) )
- {
- tOut.Error ( sParseError.cstr() );
- break;
- }
- if ( !sError.IsEmpty() )
- {
- LogSphinxqlError ( sStmt, FromStr ( sError ) );
- session::GetClientSession()->m_sError = sError;
- session::GetClientSession()->m_tLastMeta.m_sError = sError;
- tOut.Error ( sError.cstr() );
- break;
- }
- if ( !iItem ) // only zero result set sets meta
- {
- session::GetClientSession()->m_tLastMeta.m_iTotalMatches = iTotal;
- session::GetClientSession()->m_tLastMeta.m_sWarning = sWarning;
- }
- using ColType_t = std::pair<CSphString, MysqlColumnType_e>;
- CSphVector<ColType_t> dSqlColumns;
- assert ( tItem.IsObj() );
- JsonObj_c tColumnsNode = tItem.GetArrayItem ( "columns", sParseError, true );
- for ( const auto & tColumnNode : tColumnsNode )
- {
- assert ( tColumnNode.IsObj() ); // like {"id":{"type":"long long"}}
- for ( const auto & tColumn : tColumnNode )
- {
- CSphString sType;
- if ( !tColumn.FetchStrItem ( sType, "type", sParseError, false ) )
- return;
- auto eType = GetMysqlTypeByName ( sType );
- dSqlColumns.Add ( { tColumn.Name(), eType } );
- }
- }
- // fill headers
- if ( !dSqlColumns.IsEmpty() )
- {
- tOut.HeadBegin ();
- dSqlColumns.for_each ( [&] ( const auto& tColumn ) { tOut.HeadColumn ( tColumn.first.cstr(), tColumn.second ); } );
- tOut.HeadEnd();
- } else
- {
- // just simple OK reply without table
- tOut.Ok ( iTotal, ( sWarning.IsEmpty() ? 0 : 1 ) );
- break;
- }
- JsonObj_c tDataNodes = tItem.GetItem ( "data" );
- for ( const auto & tDataRow : tDataNodes )
- {
- assert ( tDataRow.IsObj() ); // like {"id":2,"proto":"http","state":"query","host":"127.0.0.1:50787","connid":9,"killed":"0","last cmd":"select"}
- for ( const auto & tDataCol : tDataRow )
- {
- if ( tDataCol.IsInt () )
- tOut.PutNumAsString ( tDataCol.IntVal() );
- else if ( tDataCol.IsDbl () )
- tOut.PutDoubleAsString ( tDataCol.DblVal() );
- else
- tOut.PutString ( tDataCol.StrVal() );
- }
- if ( !tOut.Commit() )
- return;
- }
- tOut.Eof ( iItem+1!=iItemsCount, ( sWarning.IsEmpty() ? 0 : 1 ) );
- iItem++;
- }
- }
- class HttpRawSqlHandler_c final: public HttpHandler_c, public HttpOptionTrait_t
- {
- Str_t m_sQuery;
- public:
- explicit HttpRawSqlHandler_c ( Str_t sQuery, const OptionsHash_t & tOptions )
- : HttpOptionTrait_t ( tOptions )
- , m_sQuery ( sQuery )
- {}
- bool Process () final
- {
- TRACE_CONN ( "conn", "HttpRawSqlHandler_c::Process" );
- if ( IsEmpty ( m_sQuery ) )
- {
- ReportError ( "query missing", EHTTP_STATUS::_400 );
- return false;
- }
- if ( IsBuddyQuery ( m_tOptions ) )
- session::SetQueryDisableLog();
- JsonRowBuffer_c tOut;
- session::Execute ( m_sQuery, tOut );
- if ( tOut.IsError() )
- {
- ReportError ( tOut.GetError().scstr(), EHTTP_STATUS::_500 );
- return false;
- }
- BuildReply ( tOut.Finish(), EHTTP_STATUS::_200 );
- return true;
- }
- };
- class HttpHandler_JsonSearch_c : public HttpSearchHandler_c
- {
- Str_t m_sQuery;
- public:
- explicit HttpHandler_JsonSearch_c ( Str_t sQuery, const OptionsHash_t & tOptions )
- : HttpSearchHandler_c ( tOptions )
- , m_sQuery ( sQuery )
- {}
- std::unique_ptr<QueryParser_i> PreParseQuery() override
- {
- // TODO!!! add parsing collation from the query
- m_tParsed.m_tQuery.m_eCollation = session::GetCollation();
- if ( !sphParseJsonQuery ( m_sQuery, m_tParsed ) )
- {
- ReportError ( TlsMsg::szError(), EHTTP_STATUS::_400 );
- return nullptr;
- }
- m_eQueryType = QUERY_JSON;
- return sphCreateJsonQueryParser();
- }
- };
- class HttpJsonTxnTraits_c
- {
- protected:
- HttpJsonTxnTraits_c() = default;
- explicit HttpJsonTxnTraits_c ( ResultSetFormat_e eFormat )
- : m_eFormat ( eFormat )
- {}
- void ProcessBegin ( const CSphString& sIndex )
- {
- // for now - only local mutable indexes are suitable
- {
- auto pIndex = GetServed ( sIndex );
- if ( !ServedDesc_t::IsMutable ( pIndex ) )
- return;
- }
- HttpErrorReporter_c tReporter;
- sphHandleMysqlBegin ( tReporter, FromStr (sIndex) );
- m_iInserts = 0;
- m_iUpdates = 0;
- }
- bool ProcessCommitRollback ( Str_t sIndex, DocID_t & tDocId, JsonObj_c & tResult, CSphString & sError ) const
- {
- HttpErrorReporter_c tReporter;
- sphHandleMysqlCommitRollback ( tReporter, sIndex, true );
- if ( tReporter.IsError() )
- {
- sError = tReporter.GetError();
- tResult = sphEncodeInsertErrorJson ( sIndex.first, sError.cstr(), m_eFormat );
- } else
- {
- auto iDeletes = tReporter.GetAffectedRows();
- auto dLastIds = session::LastIds();
- if ( !dLastIds.IsEmpty() )
- tDocId = dLastIds[0];
- tResult = sphEncodeTxnResultJson ( sIndex.first, tDocId, m_iInserts, iDeletes, m_iUpdates, m_eFormat );
- }
- return !tReporter.IsError();
- }
- int m_iInserts = 0;
- int m_iUpdates = 0;
- const ResultSetFormat_e m_eFormat = ResultSetFormat_e::MntSearch;
- };
- static bool ProcessInsert ( SqlStmt_t & tStmt, DocID_t & tDocId, JsonObj_c & tResult, CSphString & sError, ResultSetFormat_e eFormat )
- {
- HttpErrorReporter_c tReporter;
- sphHandleMysqlInsert ( tReporter, tStmt );
- if ( tReporter.IsError() )
- {
- sError = tReporter.GetError();
- tResult = sphEncodeInsertErrorJson ( tStmt.m_sIndex.cstr(), sError.cstr(), eFormat );
- } else
- {
- auto dLastIds = session::LastIds();
- if ( !dLastIds.IsEmpty() )
- tDocId = dLastIds[0];
- tResult = sphEncodeInsertResultJson ( tStmt.m_sIndex.cstr(), tStmt.m_eStmt == STMT_REPLACE, tDocId, eFormat );
- }
- return !tReporter.IsError();
- }
- static bool ProcessDelete ( Str_t sRawRequest, const SqlStmt_t& tStmt, DocID_t tDocId, JsonObj_c & tResult, CSphString & sError, ResultSetFormat_e eFormat )
- {
- HttpErrorReporter_c tReporter;
- sphHandleMysqlDelete ( tReporter, tStmt, std::move ( sRawRequest ) );
- if ( tReporter.IsError() )
- {
- sError = tReporter.GetError();
- tResult = sphEncodeInsertErrorJson ( tStmt.m_sIndex.cstr(), sError.cstr(), eFormat );
- } else
- {
- tResult = sphEncodeDeleteResultJson ( tStmt.m_sIndex.cstr(), tDocId, tReporter.GetAffectedRows(), eFormat );
- }
- return !tReporter.IsError();
- }
- class HttpHandler_JsonInsert_c final : public HttpHandler_c
- {
- Str_t m_sQuery;
- bool m_bReplace;
- public:
- HttpHandler_JsonInsert_c ( Str_t sQuery, bool bReplace )
- : m_sQuery ( sQuery )
- , m_bReplace ( bReplace )
- {}
- bool Process () final
- {
- TRACE_CONN ( "conn", "HttpHandler_JsonInsert_c::Process" );
- SqlStmt_t tStmt;
- DocID_t tDocId = 0;
- if ( !sphParseJsonInsert ( m_sQuery, tStmt, tDocId, m_bReplace, m_sError ) )
- {
- ReportError ( nullptr, HttpErrorType_e::Parse, EHTTP_STATUS::_400, tStmt.m_sIndex.cstr() );
- return false;
- }
- tStmt.m_sEndpoint = HttpEndpointToStr ( m_bReplace ? EHTTP_ENDPOINT::JSON_REPLACE : EHTTP_ENDPOINT::JSON_INSERT );
- JsonObj_c tResult = JsonNull;
- bool bResult = ProcessInsert ( tStmt, tDocId, tResult, m_sError, ResultSetFormat_e::MntSearch );
- if ( bResult )
- BuildReply ( tResult.AsString(), bResult ? EHTTP_STATUS::_200 : EHTTP_STATUS::_409 );
- else
- ReportError ( nullptr, HttpErrorType_e::ActionRequestValidation, EHTTP_STATUS::_409, tStmt.m_sIndex.cstr() );
- return bResult;
- }
- };
- class HttpJsonUpdateTraits_c
- {
- int m_iLastUpdated = 0;
- protected:
- HttpJsonUpdateTraits_c() = default;
- explicit HttpJsonUpdateTraits_c ( ResultSetFormat_e eFormat )
- : m_eFormat ( eFormat )
- {}
- bool ProcessUpdate ( Str_t sRawRequest, const SqlStmt_t & tStmt, DocID_t tDocId, JsonObj_c & tResult, CSphString & sError )
- {
- HttpErrorReporter_c tReporter;
- sphHandleMysqlUpdate ( tReporter, tStmt, sRawRequest );
- if ( tReporter.IsError() )
- {
- sError = tReporter.GetError();
- tResult = sphEncodeInsertErrorJson ( tStmt.m_sIndex.cstr(), sError.cstr(), m_eFormat );
- } else
- {
- tResult = sphEncodeUpdateResultJson ( tStmt.m_sIndex.cstr(), tDocId, tReporter.GetAffectedRows(), m_eFormat );
- }
- m_iLastUpdated = tReporter.GetAffectedRows();
- return !tReporter.IsError();
- }
- int GetLastUpdated() const
- {
- return m_iLastUpdated;
- }
- const ResultSetFormat_e m_eFormat = ResultSetFormat_e::MntSearch;
- };
- class HttpHandler_JsonUpdate_c : public HttpHandler_c, HttpJsonUpdateTraits_c
- {
- protected:
- Str_t m_sQuery;
- public:
- explicit HttpHandler_JsonUpdate_c ( Str_t sQuery )
- : m_sQuery ( sQuery )
- {}
- bool Process () final
- {
- TRACE_CONN ( "conn", "HttpHandler_JsonUpdate_c::Process" );
- SqlStmt_t tStmt;
- tStmt.m_bJson = true;
- tStmt.m_tQuery.m_eQueryType = QUERY_JSON;
- tStmt.m_sEndpoint = HttpEndpointToStr ( EHTTP_ENDPOINT::JSON_UPDATE );
- DocID_t tDocId = 0;
- if ( !ParseQuery ( tStmt, tDocId ) )
- {
- ReportError ( nullptr, HttpErrorType_e::Parse, EHTTP_STATUS::_400, tStmt.m_sIndex.cstr() );
- return false;
- }
- JsonObj_c tResult = JsonNull;
- bool bResult = ProcessQuery ( tStmt, tDocId, tResult );
- if ( bResult )
- BuildReply ( tResult.AsString(), bResult ? EHTTP_STATUS::_200 : EHTTP_STATUS::_409 );
- else
- ReportError ( nullptr, HttpErrorType_e::ActionRequestValidation, EHTTP_STATUS::_409, tStmt.m_sIndex.cstr() );
- return bResult;
- }
- protected:
- virtual bool ParseQuery ( SqlStmt_t & tStmt, DocID_t & tDocId )
- {
- return sphParseJsonUpdate ( m_sQuery, tStmt, tDocId, m_sError );
- }
- virtual bool ProcessQuery ( const SqlStmt_t & tStmt, DocID_t tDocId, JsonObj_c & tResult )
- {
- return ProcessUpdate ( m_sQuery, tStmt, tDocId, tResult, m_sError );
- }
- };
- class HttpHandler_JsonDelete_c final : public HttpHandler_JsonUpdate_c
- {
- public:
- explicit HttpHandler_JsonDelete_c ( Str_t sQuery )
- : HttpHandler_JsonUpdate_c ( sQuery )
- {}
- protected:
- bool ParseQuery ( SqlStmt_t & tStmt, DocID_t & tDocId ) final
- {
- tStmt.m_sEndpoint = HttpEndpointToStr ( EHTTP_ENDPOINT::JSON_DELETE );
- return sphParseJsonDelete ( m_sQuery, tStmt, tDocId, m_sError );
- }
- bool ProcessQuery ( const SqlStmt_t & tStmt, DocID_t tDocId, JsonObj_c & tResult ) final
- {
- return ProcessDelete ( m_sQuery, tStmt, tDocId, tResult, m_sError, ResultSetFormat_e::MntSearch );
- }
- };
- // stream of lines - each 'ReadLine()' returns a single line (lines are split by \r or \n)
// The input is pulled chunk by chunk from the underlying CharStream_c. A line which fits into one chunk
// is returned as a pointer right into that chunk; a line spread over several chunks is glued in m_dLastChunk.
- class NDJsonStream_c
- {
- CharStream_c & m_tIn;
- CSphVector<char> m_dLastChunk; // collects the parts of a line which continues over the end of a chunk
- Str_t m_sCurChunk { dEmptyStr }; // not yet consumed rest of the chunk last read from m_tIn
- bool m_bDone; // set once the input is exhausted and the final line is returned
- int m_iJsons = 0; // number of lines returned so far; used in the log messages
- public:
- explicit NDJsonStream_c ( CharStream_c& tIn )
- : m_tIn { tIn }
- , m_bDone { m_tIn.Eof() }
- {}
- inline bool Eof() const { return m_bDone;}
// Returns the next line, without the separator. Must not be called once Eof() is true.
// When the input ends (or fails), returns whatever is collected for the last, unterminated line (maybe empty)
// and raises the done flag.
- Str_t ReadLine()
- {
- assert ( !m_bDone );
- while (true)
- {
- if ( IsEmpty ( m_sCurChunk ) )
- {
- if ( m_tIn.Eof() || m_tIn.GetError() )
- break;
- m_sCurChunk = m_tIn.Read();
- }
- const char* szLine = m_sCurChunk.first;
- const char* pEnd = szLine + m_sCurChunk.second;
- const char* p = szLine;
// look for the first line separator in the chunk
- while ( p<pEnd && *p!='\r' && *p!='\n' )
- ++p;
- if ( p==pEnd )
- {
// no separator here: stash the whole chunk and go for the next one
- m_dLastChunk.Append ( szLine, p-szLine );
- m_sCurChunk = dEmptyStr;
- continue;
- }
// the separator is overwritten by zero right in the source chunk, so the line becomes zero-terminated
- *( const_cast<char*> ( p ) ) = '\0';
- ++p;
- m_sCurChunk = { p, pEnd - p };
- Str_t sResult;
- if ( m_dLastChunk.IsEmpty () )
- {
// the whole line sits in the current chunk: return it in place, no copy
- sResult = { szLine, p - szLine - 1 };
- // that is commented out, as we better will deal with empty strings on parser level instead.
- // if ( IsEmpty ( sResult ) )
- // continue;
- ++m_iJsons;
- HTTPINFO << "chunk " << m_iJsons << " '" << Data2Log ( sResult ) << "'";;
- } else
- {
// the line started in previous chunk(s): append its tail; p is already past the separator,
// so the zero written above is appended too
- m_dLastChunk.Append ( szLine, p - szLine );
- sResult = m_dLastChunk;
- --sResult.second; // exclude terminating \0
// NOTE(review): sResult still points into m_dLastChunk; presumably Resize(0) keeps the storage,
// so the line stays valid until the next append - confirm
- m_dLastChunk.Resize ( 0 );
- ++m_iJsons;
- HTTPINFO << "chunk last " << m_iJsons << " '" << Data2Log ( sResult ) << "'";;
- }
- return sResult;
- }
// input is exhausted (or failed): return what is collected for the last line
- m_bDone = true;
// add the terminating zero, then shrink back by one to keep it out of the reported length
- m_dLastChunk.Add ( '\0' );
- m_dLastChunk.Resize ( m_dLastChunk.GetLength() - 1 );
- Str_t sResult = m_dLastChunk;
- ++m_iJsons;
- HTTPINFO << "chunk termination " << m_iJsons << " '" << Data2Log ( sResult ) << "'";
- return sResult;
- }
// error state and message are those of the underlying stream
- bool GetError() const { return m_tIn.GetError(); }
- const CSphString & GetErrorMessage() const { return m_tIn.GetErrorMessage(); }
- };
- static Str_t TrimHeadSpace ( Str_t tLine )
- {
- if ( IsEmpty ( tLine ) )
- return tLine;
- const char * sCur = tLine.first;
- const char * sEnd = sCur + tLine.second;
- while ( sCur<sEnd && sphIsSpace ( *sCur ) )
- sCur++;
- return Str_t { sCur, sEnd-sCur };
- }
// Handler of the JSON bulk endpoint: reads the body line by line (one JSON statement per line)
// and executes insert/replace/update/delete statements, grouping consecutive statements
// for the same index into one transaction.
- class HttpHandler_JsonBulk_c : public HttpHandler_c, public HttpJsonUpdateTraits_c, public HttpJsonTxnTraits_c
- {
- protected:
// line-by-line view over the request body
- NDJsonStream_c m_tSource;
// request options; only "content-type" is read here (see CheckNDJson)
- const OptionsHash_t& m_tOptions;
- public:
- HttpHandler_JsonBulk_c ( CharStream_c& tSource, const OptionsHash_t & tOptions )
- : m_tSource ( tSource )
- , m_tOptions ( tOptions )
- {}
// Processes the whole bulk. Returns true only when every statement and every commit succeeded.
// Processing stops at the first failure. The reply is a JSON object with "items", "current_line",
// "skipped_lines", "errors" and "error".
- bool Process ()
- {
- TRACE_CONN ( "conn", "HttpHandler_JsonBulk_c::Process" );
- if ( !CheckNDJson() )
- return false;
- JsonObj_c tResults ( true );
- bool bResult = false;
// number of lines read so far (empty lines included)
- int iCurLine = 0;
// value of iCurLine at the last point where nothing was left uncommitted; used for "skipped_lines"
- int iLastTxStartLine = 0;
// builds the final reply from the state captured by reference and returns bResult;
// a default (200) status is turned into 500 when bResult is false
- auto FinishBulk = [&, this] ( EHTTP_STATUS eStatus = EHTTP_STATUS::_200 ) {
- JsonObj_c tRoot;
- tRoot.AddItem ( "items", tResults );
- tRoot.AddInt ( "current_line", iCurLine );
- tRoot.AddInt ( "skipped_lines", iCurLine - iLastTxStartLine );
- tRoot.AddBool ( "errors", !bResult );
- tRoot.AddStr ( "error", m_sError.IsEmpty() ? "" : m_sError );
- if ( eStatus == EHTTP_STATUS::_200 && !bResult )
- eStatus = EHTTP_STATUS::_500;
- BuildReply ( tRoot.AsString(), eStatus );
- HTTPINFO << "inserted " << iCurLine;
- return bResult;
- };
// appends { szStmt: tResult } to the "items" array
- auto AddResult = [&tResults] ( const char* szStmt, JsonObj_c& tResult ) {
- JsonObj_c tItem;
- tItem.AddItem ( szStmt, tResult );
- tResults.AddItem ( tItem );
- };
// empty body: reply right away (bResult is still false at this point)
- if ( m_tSource.Eof() )
- return FinishBulk();
- // originally we execute txn for single index
- // if there is combo, we fall back to query-by-query commits
// index of the transaction in progress; empty until the first statement is parsed or after a commit
- CSphString sTxnIdx;
- CSphString sStmt;
- while ( !m_tSource.Eof() )
- {
- auto tQuery = m_tSource.ReadLine();
- tQuery = TrimHeadSpace ( tQuery ); // could be a line with only whitespace chars
- ++iCurLine;
- DocID_t tDocId = 0;
- JsonObj_c tResult = JsonNull;
- if ( IsEmpty ( tQuery ) )
- {
- if ( session::IsInTrans() )
- {
- assert ( !sTxnIdx.IsEmpty() );
- // empty query finishes current txn
- bResult = ProcessCommitRollback ( FromStr ( sTxnIdx ), tDocId, tResult, m_sError );
- AddResult ( "bulk", tResult );
- if ( !bResult )
- break;
- sTxnIdx = "";
- iLastTxStartLine = iCurLine;
- }
- continue;
- }
- bResult = false;
// publish the current line to the global crash-query record
- auto& tCrashQuery = GlobalCrashQueryGetRef();
- tCrashQuery.m_dQuery = { (const BYTE*) tQuery.first, tQuery.second };
- const char* szStmt = tQuery.first;
- SqlStmt_t tStmt;
- tStmt.m_bJson = true;
- CSphString sQuery;
- if ( !sphParseJsonStatement ( szStmt, tStmt, sStmt, sQuery, tDocId, m_sError ) )
- {
- HTTPINFO << "inserted " << iCurLine << ", error: " << m_sError;
// NOTE(review): this early return does not reach the session::SetInTrans ( false ) call
// at the end of Process() - confirm that a transaction left open here is cleaned up elsewhere
- return FinishBulk ( EHTTP_STATUS::_400 );
- }
- if ( sTxnIdx.IsEmpty() )
- {
// no transaction index yet: begin one for the index of this statement
- sTxnIdx = tStmt.m_sIndex;
- ProcessBegin ( sTxnIdx );
- }
- else if ( session::IsInTrans() && sTxnIdx!=tStmt.m_sIndex )
- {
- assert ( !sTxnIdx.IsEmpty() );
- // we should finish current txn, as we got another index
- bResult = ProcessCommitRollback ( FromStr ( sTxnIdx ), tDocId, tResult, m_sError );
- AddResult ( "bulk", tResult );
- if ( !bResult )
- break;
- sTxnIdx = tStmt.m_sIndex;
- ProcessBegin ( sTxnIdx );
- iLastTxStartLine = iCurLine;
- }
- switch ( tStmt.m_eStmt )
- {
- case STMT_INSERT:
- case STMT_REPLACE:
- bResult = ProcessInsert ( tStmt, tDocId, tResult, m_sError, ResultSetFormat_e::MntSearch );
- if ( bResult )
- ++m_iInserts;
- break;
- case STMT_UPDATE:
- tStmt.m_sEndpoint = HttpEndpointToStr ( EHTTP_ENDPOINT::JSON_UPDATE );
- bResult = ProcessUpdate ( FromStr ( sQuery ), tStmt, tDocId, tResult, m_sError );
- if ( bResult )
- m_iUpdates += GetLastUpdated();
- break;
- case STMT_DELETE:
- tStmt.m_sEndpoint = HttpEndpointToStr ( EHTTP_ENDPOINT::JSON_DELETE );
- bResult = ProcessDelete ( FromStr ( sQuery ), tStmt, tDocId, tResult, m_sError, ResultSetFormat_e::MntSearch );
- break;
- default:
- HTTPINFO << "inserted " << iCurLine << ", got unknown statement:" << (int)tStmt.m_eStmt;
// NOTE(review): same as the parse-error return above - leaves Process() before SetInTrans ( false )
- return FinishBulk ( EHTTP_STATUS::_400 );
- }
// a per-statement item is reported only on failure or when the statement ran outside a transaction;
// statements inside a transaction are covered by the "bulk" item added at commit
- if ( !bResult || !session::IsInTrans() )
- AddResult ( sStmt.cstr(), tResult );
- // no further than the first error
- if ( !bResult )
- break;
- if ( !session::IsInTrans() )
- iLastTxStartLine = iCurLine;
- }
- if ( bResult && session::IsInTrans() )
- {
- assert ( !sTxnIdx.IsEmpty() );
- // We're in txn - that is, nothing committed, and we should do it right now
- JsonObj_c tResult;
- DocID_t tDocId = 0;
- bResult = ProcessCommitRollback ( FromStr ( sTxnIdx ), tDocId, tResult, m_sError );
- AddResult ( "bulk", tResult );
- if ( bResult )
- iLastTxStartLine = iCurLine;
- }
// the in-transaction flag of the session is cleared unconditionally on this exit path
- session::SetInTrans ( false );
- HTTPINFO << "inserted " << iCurLine << " result: " << (int)bResult << ", error:" << m_sError;
- return FinishBulk();
- }
- private:
// Verifies the Content-Type option: it must be present and, lowercased, its first ';'-separated part
// must be exactly "application/x-ndjson". Otherwise a 400 error is reported and false is returned.
- bool CheckNDJson()
- {
- if ( !m_tOptions.Exists ( "content-type" ) )
- {
- ReportError ( "Content-Type must be set", HttpErrorType_e::Parse, EHTTP_STATUS::_400 );
- return false;
- }
- auto sContentType = m_tOptions["content-type"].ToLower();
- auto dParts = sphSplit ( sContentType.cstr(), ";" );
- if ( dParts.IsEmpty() || dParts[0] != "application/x-ndjson" )
- {
- ReportError ( "Content-Type must be application/x-ndjson", HttpErrorType_e::Parse, EHTTP_STATUS::_400 );
- return false;
- }
- return true;
- }
- };
// Handler of the percolate ("pq/<index>/<operation>") endpoint.
// Process() decodes the endpoint path and the JSON body and dispatches to one of the private workers below.
- class HttpHandlerPQ_c final : public HttpHandler_c, public HttpOptionTrait_t
- {
// raw request body
- Str_t m_sQuery;
- public:
- HttpHandlerPQ_c ( Str_t sQuery, const OptionsHash_t & tOptions )
- : HttpOptionTrait_t ( tOptions )
- , m_sQuery ( sQuery )
- {}
- bool Process () final;
- private:
- // FIXME!!! handle replication for InsertOrReplaceQuery and Delete
// matches the document(s) of tPercolate against the stored queries of sIndex ("search" operation)
- bool DoCallPQ ( const CSphString & sIndex, const JsonObj_c & tPercolate, bool bVerbose );
// stores a query in sIndex ("doc" operation); pUID optionally carries the query id from the URL
- bool InsertOrReplaceQuery ( const CSphString& sIndex, const JsonObj_c& tJsonQuery, const JsonObj_c& tRoot, CSphString* pUID, bool bReplace );
// lists the stored queries by running a JSON search over sIndex
- bool ListQueries ( const CSphString & sIndex );
// deletes stored queries selected by the "tags" and/or "id" arrays of tRoot ("_delete_by_query" operation)
- bool Delete ( const CSphString & sIndex, const JsonObj_c & tRoot );
- };
// One entry of an ES-compatible bulk request; the first three fields are filled by ParseMetaLine().
- struct BulkDoc_t
- {
// name of the action object found in the meta line
- CSphString m_sAction;
// value of the "_index" property of the action
- CSphString m_sIndex;
// value of the "_id" property; stays 0 when "_id" is absent or null
- DocID_t m_tDocid { 0 };
// presumably the document source line that follows the meta line - TODO confirm against HttpHandlerEsBulk_c::Process
- Str_t m_tDocLine;
- };
// NOTE(review): usage is outside of this part of the file; presumably describes one transaction
// of a bulk as a run of m_iCount documents starting at position m_iFrom - confirm against ProcessTnx().
- struct BulkTnx_t
- {
// defaults to -1
- int m_iFrom { -1 };
// defaults to 0
- int m_iCount { 0 };
- };
// Handler of the ES-compatible bulk endpoint.
// Both the update traits and the transaction traits are constructed with the ES result set format.
- class HttpHandlerEsBulk_c : public HttpCompatBaseHandler_c, public HttpJsonUpdateTraits_c, public HttpJsonTxnTraits_c
- {
- public:
- HttpHandlerEsBulk_c ( Str_t sBody, int iReqType, const SmallStringHash_T<CSphString> & hOpts )
- : HttpCompatBaseHandler_c ( sBody, iReqType, hOpts )
- , HttpJsonUpdateTraits_c ( ResultSetFormat_e::ES )
- , HttpJsonTxnTraits_c ( ResultSetFormat_e::ES )
- {}
- bool Process () override;
- private:
// NOTE(review): defined outside of this part of the file; presumably executes the documents of dDocs
// grouped by the transactions in dTnx and collects per-item results into tItems - confirm
- bool ProcessTnx ( const VecTraits_T<BulkTnx_t> & dTnx, VecTraits_T<BulkDoc_t> & dDocs, JsonObj_c & tItems );
// checks the Content-Type option and that the request body is not empty
- bool Validate();
// logs sError along with the request URL and body; also reports the error to the client unless bLogOnly is set
- void ReportLogError ( const char * sError, HttpErrorType_e eType , EHTTP_STATUS eStatus, bool bLogOnly );
- };
- static std::unique_ptr<HttpHandler_c> CreateHttpHandler ( EHTTP_ENDPOINT eEndpoint, CharStream_c & tSource, Str_t & sQuery, OptionsHash_t & tOptions, http_method eRequestType )
- {
- const CSphString * pOption = nullptr;
- sQuery = dEmptyStr;
- auto SetQuery = [&sQuery] ( Str_t&& sData ) {
- auto& tCrashQuery = GlobalCrashQueryGetRef();
- tCrashQuery.m_dQuery = { (const BYTE*)sData.first, sData.second };
- sQuery = sData;
- };
- // SPH_HTTP_ENDPOINT_SQL SPH_HTTP_ENDPOINT_CLI SPH_HTTP_ENDPOINT_CLI_JSON these endpoints url-encoded, all others are plain json, and we don't want to waste time pre-parsing them
- if ( eEndpoint== EHTTP_ENDPOINT::SQL || eEndpoint== EHTTP_ENDPOINT::CLI || eEndpoint== EHTTP_ENDPOINT::CLI_JSON )
- {
- auto sWholeData = tSource.ReadAll();
- if ( tSource.GetError() )
- return nullptr;
- if ( eEndpoint == EHTTP_ENDPOINT::SQL )
- {
- const std::array<Str_t, 3> sQueries { FROMS ( "query=" ), FROMS ( "mode=raw&query=" ), FROMS ( "raw_response=true&query=" ) };
- if ( std::any_of ( sQueries.cbegin(), sQueries.cend(), [&sWholeData] ( const Str_t& S ) { return sWholeData.second >= S.second && 0 == memcmp ( sWholeData.first, S.first, S.second ); } ) )
- {
- DecodeAndStoreRawQuery ( tOptions, sWholeData );
- HttpRequestParser_c::ParseList ( sWholeData, tOptions );
- } else
- {
- StoreRawQuery ( tOptions, { sWholeData } );
- tOptions.Add ( sWholeData, "query" );
- }
- } else
- StoreRawQuery ( tOptions, { sWholeData } );
- }
- switch ( eEndpoint )
- {
- case EHTTP_ENDPOINT::SQL:
- {
- bool bRawMode = false;
- pOption = tOptions ( "mode" );
- if ( pOption )
- bRawMode = *pOption == "raw";
- else
- {
- pOption = tOptions ( "raw_response" );
- if ( pOption )
- bRawMode = *pOption == "true";
- }
- if ( bRawMode )
- {
- auto pQuery = tOptions ( "query" );
- if ( pQuery )
- SetQuery ( FromStr ( *pQuery ) );
- return std::make_unique<HttpRawSqlHandler_c> ( sQuery, tOptions ); // non-json
- }
- else
- {
- pOption = tOptions ( "raw_query" );
- if ( pOption )
- SetQuery ( FromStr (*pOption) );
- return std::make_unique<HttpSearchHandler_SQL_c> ( tOptions ); // non-json
- }
- }
- case EHTTP_ENDPOINT::CLI:
- case EHTTP_ENDPOINT::CLI_JSON:
- {
- pOption = tOptions ( "raw_query" );
- auto tQuery = pOption ? FromStr ( *pOption ) : dEmptyStr;
- SetQuery ( std::move ( tQuery ) );
- return std::make_unique<HttpRawSqlHandler_c> ( sQuery, tOptions ); // non-json
- }
- case EHTTP_ENDPOINT::JSON_SEARCH:
- SetQuery ( tSource.ReadAll() );
- return std::make_unique<HttpHandler_JsonSearch_c> ( sQuery, tOptions ); // json
- case EHTTP_ENDPOINT::JSON_INDEX:
- case EHTTP_ENDPOINT::JSON_CREATE:
- case EHTTP_ENDPOINT::JSON_INSERT:
- case EHTTP_ENDPOINT::JSON_REPLACE:
- SetQuery ( tSource.ReadAll() );
- if ( tSource.GetError() )
- return nullptr;
- else
- return std::make_unique<HttpHandler_JsonInsert_c> ( sQuery, eEndpoint==EHTTP_ENDPOINT::JSON_INDEX || eEndpoint==EHTTP_ENDPOINT::JSON_REPLACE ); // json
- case EHTTP_ENDPOINT::JSON_UPDATE:
- SetQuery ( tSource.ReadAll() );
- if ( tSource.GetError() )
- return nullptr;
- else
- return std::make_unique<HttpHandler_JsonUpdate_c> ( sQuery ); // json
- case EHTTP_ENDPOINT::JSON_DELETE:
- SetQuery ( tSource.ReadAll() );
- if ( tSource.GetError() )
- return nullptr;
- else
- return std::make_unique<HttpHandler_JsonDelete_c> ( sQuery ); // json
- case EHTTP_ENDPOINT::JSON_BULK:
- return std::make_unique<HttpHandler_JsonBulk_c> ( tSource, tOptions ); // json
- case EHTTP_ENDPOINT::PQ:
- SetQuery ( tSource.ReadAll() );
- if ( tSource.GetError() )
- return nullptr;
- else
- return std::make_unique<HttpHandlerPQ_c> ( sQuery, tOptions ); // json
- case EHTTP_ENDPOINT::ES_BULK:
- SetQuery ( tSource.ReadAll() );
- if ( tSource.GetError() )
- return nullptr;
- else
- return std::make_unique<HttpHandlerEsBulk_c> ( sQuery, eRequestType, tOptions );
- case EHTTP_ENDPOINT::TOTAL:
- SetQuery ( tSource.ReadAll() );
- if ( tSource.GetError() )
- return nullptr;
- else
- return CreateCompatHandler ( sQuery, eRequestType, tOptions );
- default:
- break;
- }
- return nullptr;
- }
- HttpProcessResult_t ProcessHttpQuery ( CharStream_c & tSource, Str_t & sSrcQuery, OptionsHash_t & hOptions, CSphVector<BYTE> & dResult, bool bNeedHttpResponse, http_method eRequestType )
- {
- TRACE_CONN ( "conn", "ProcessHttpQuery" );
- HttpProcessResult_t tRes;
- const CSphString & sEndpoint = hOptions["endpoint"];
- tRes.m_eEndpoint = StrToHttpEndpoint ( sEndpoint );
- std::unique_ptr<HttpHandler_c> pHandler = CreateHttpHandler ( tRes.m_eEndpoint, tSource, sSrcQuery, hOptions, eRequestType );
- if ( !pHandler )
- {
- if ( tRes.m_eEndpoint == EHTTP_ENDPOINT::INDEX )
- {
- HttpHandlerIndexPage ( dResult );
- } else
- {
- DumpHttp ( eRequestType, sEndpoint, tSource.ReadAll() );
- tRes.m_eReplyHttpCode = EHTTP_STATUS::_501;
- if ( tSource.GetError() )
- {
- tRes.m_sError = tSource.GetErrorMessage();
- if ( tRes.m_sError.Begins ( "length out of bounds" ) )
- tRes.m_eReplyHttpCode = EHTTP_STATUS::_413;
- } else
- {
- tRes.m_sError.SetSprintf ( "/%s - unsupported endpoint", sEndpoint.cstr() );
- }
- sphHttpErrorReply ( dResult, tRes.m_eReplyHttpCode, tRes.m_sError.cstr() );
- }
- return tRes;
- }
- // will be processed by buddy right after source data got parsed
- if ( tRes.m_eEndpoint == EHTTP_ENDPOINT::CLI )
- return tRes;
- pHandler->SetErrorFormat ( bNeedHttpResponse );
- tRes.m_bOk = pHandler->Process();
- tRes.m_sError = pHandler->GetError();
- tRes.m_eReplyHttpCode = pHandler->GetStatusCode();
- dResult = std::move ( pHandler->GetResult() );
- return tRes;
- }
- void sphProcessHttpQueryNoResponce ( const CSphString & sEndpoint, const CSphString & sQuery, CSphVector<BYTE> & dResult )
- {
- OptionsHash_t hOptions;
- hOptions.Add ( sEndpoint, "endpoint" );
- BlobStream_c tQuery ( sQuery );
- Str_t sSrcQuery;
- HttpProcessResult_t tRes = ProcessHttpQuery ( tQuery, sSrcQuery, hOptions, dResult, false, HTTP_GET );
- ProcessHttpQueryBuddy ( tRes, sSrcQuery, hOptions, dResult, false, HTTP_GET );
- }
- static bool IsCompressed ( const OptionsHash_t & hOptions )
- {
- const CSphString * pEncoding = hOptions ( "content-encoding" );
- if ( !pEncoding )
- return false;
- return ( *pEncoding=="gzip" );
- }
// Serves the request whose header has already been parsed: wraps the rest of the input into a body stream,
// runs the query and passes the outcome through ProcessHttpQueryBuddy(), whose result is returned.
// A gzip-encoded body is rejected with 415 when gzip is not available or when the body is also chunked.
- bool HttpRequestParser_c::ProcessClientHttp ( AsyncNetInputBuffer_c& tIn, CSphVector<BYTE>& dResult )
- {
- assert ( !m_szError );
- std::unique_ptr<CharStream_c> pSource;
- bool bCompressed = IsCompressed ( m_hOptions );
- if ( m_tParser.flags & F_CHUNKED )
- {
// chunked transfer: the stream takes over the body parts parsed so far
- pSource = std::make_unique<ChunkedSocketStream_c> ( &tIn, &m_tParser, m_bBodyDone, std::move ( m_dParsedBodies ), m_iLastParsed );
- } else
- {
- // for non-chunked - need to throw out beginning of the packet (with header). Only body rest in the buffer.
- tIn.PopTail ( m_iLastParsed - ParsedBodyLength() );
// total body size = already parsed part + the remaining content_length (a non-positive value counts as 0)
- int iFullLength = ParsedBodyLength() + ( (int)m_tParser.content_length > 0 ? (int)m_tParser.content_length : 0 );
- pSource = std::make_unique<RawSocketStream_c> ( &tIn, iFullLength, bCompressed );
- }
// NOTE(review): eEndpoint is computed (and possibly switched to ES_BULK) but is not used anywhere
// below in this function - confirm whether it was meant to be passed on
- EHTTP_ENDPOINT eEndpoint = StrToHttpEndpoint ( m_sEndpoint );
- if ( IsLogManagementEnabled() && eEndpoint==EHTTP_ENDPOINT::TOTAL && m_sEndpoint.Ends ( "_bulk" ) )
- eEndpoint = EHTTP_ENDPOINT::ES_BULK;
- HttpProcessResult_t tRes;
- Str_t sSrcQuery;
- if ( bCompressed && !HasGzip() )
- {
- // 14.11 Content-Encoding
- // If the content-coding of an entity in a request message is not acceptable to the origin server, the server SHOULD respond with a status code of 415 (Unsupported Media Type)
- tRes.m_eReplyHttpCode = EHTTP_STATUS::_415;
- tRes.m_bOk = false;
- tRes.m_sError = "gzip error: unpack is not supported, rebuild with zlib";
- sphHttpErrorReply ( dResult, tRes.m_eReplyHttpCode, tRes.m_sError.cstr() );
- } else if ( bCompressed && ( m_tParser.flags & F_CHUNKED ) )
- {
// gzip together with chunked transfer is rejected as well
- tRes.m_eReplyHttpCode = EHTTP_STATUS::_415;
- tRes.m_bOk = false;
- tRes.m_sError = "can not process chunked transfer-coding along with gzip";
- sphHttpErrorReply ( dResult, tRes.m_eReplyHttpCode, tRes.m_sError.cstr() );
- } else
- {
- tRes = ProcessHttpQuery ( *pSource, sSrcQuery, m_hOptions, dResult, true, m_eType );
- }
// called on every path, including the 415 rejections above
- return ProcessHttpQueryBuddy ( tRes, sSrcQuery, m_hOptions, dResult, true, m_eType );
- }
- void sphHttpErrorReply ( CSphVector<BYTE> & dData, EHTTP_STATUS eCode, const char * szError )
- {
- JsonObj_c tErr;
- tErr.AddStr ( "error", szError );
- CSphString sJsonError = tErr.AsString();
- HttpBuildReply ( dData, eCode, FromStr (sJsonError), false );
- }
// Serializes a percolate match result into tOut as JSON:
// {"took":..,"timed_out":false,"hits":{"total":..,"max_score":1,[verbose counters,]"hits":[ {matched query}, ... ]}}
// The blocks opened through ScopedComma_c are closed by the destructors of those objects.
// NOTE(review): index name and tags are emitted through "%s"; assuming Sprintf does not JSON-escape
// its arguments, a quote inside them would break the output - confirm
- static void EncodePercolateMatchResult ( const PercolateMatchResult_t & tRes, const CSphFixedVector<int64_t> & dDocids, const CSphString & sIndex, JsonEscapedBuilder & tOut )
- {
- ScopedComma_c sRootBlock ( tOut, ",", "{", "}" );
- // column names
// m_tmTotal is divided by 1000 for the "took" value
- tOut.Sprintf ( R"("took":%d,"timed_out":false)", ( int ) ( tRes.m_tmTotal / 1000 ));
- // hits {
- ScopedComma_c sHitsBlock ( tOut, ",", R"("hits":{)", "}");
- tOut.Sprintf ( R"("total":%d,"max_score":1)", tRes.m_dQueryDesc.GetLength()); // FIXME!!! track and provide weight
- if ( tRes.m_bVerbose )
- tOut.Sprintf ( R"("early_out_queries":%d,"matched_queries":%d,"matched_docs":%d,"only_terms_queries":%d,"total_queries":%d)",
- tRes.m_iEarlyOutQueries, tRes.m_iQueriesMatched, tRes.m_iDocsMatched, tRes.m_iOnlyTerms, tRes.m_iTotalQueries );
- // documents
- tOut.StartBlock ( ",", R"("hits":[)", "]");
// read position in tRes.m_dDocs; per query that array holds a count followed by that many entries
- int iDocOff = 0;
- for ( const auto& tDesc : tRes.m_dQueryDesc )
- {
- ScopedComma_c sQueryComma ( tOut, ",","{"," }");
- tOut.Sprintf ( R"("table":"%s","_type":"doc","_id":"%U","_score":"1")", sIndex.cstr(), tDesc.m_iQUID );
- {
- ScopedComma_c sBrackets ( tOut, ",", R"("_source":{)", "}");
- if ( !tDesc.m_bQL )
- {
// non-QL query: the stored text is emitted verbatim as the value of "query"
- tOut.Sprintf ( R"("query":%s)", tDesc.m_sQuery.cstr() );
- } else
- {
// QL query: wrapped as {"ql": <escaped string>}
- ScopedComma_c sBrackets ( tOut, nullptr, R"("query": {"ql":)", "}");
- tOut.AppendEscapedWithComma ( tDesc.m_sQuery.cstr() );
- }
- if ( !tDesc.m_sTags.IsEmpty() )
- tOut.Sprintf ( R"("tags":"%s")", tDesc.m_sTags.cstr() );
- }
- // document count + document id(s)
- if ( tRes.m_bGetDocs )
- {
- ScopedComma_c sFields ( tOut, ",",R"("fields":{"_percolator_document_slot": [)", "] }");
- int iDocs = tRes.m_dDocs[iDocOff];
- for ( int iDoc = 1; iDoc<=iDocs; ++iDoc )
- {
- auto iRow = tRes.m_dDocs[iDocOff + iDoc];
// with an empty dDocids the entry itself is printed, otherwise it is used as an index into dDocids
- tOut.Sprintf ("%l", DocID_t ( dDocids.IsEmpty () ? iRow : dDocids[iRow] ) );
- }
// skip the count and the entries of this query
- iDocOff += iDocs + 1;
- }
- }
- tOut.FinishBlock ( false ); // hits[]
- // all the rest blocks (root, hits) will be auto-closed here.
- }
- bool HttpHandlerPQ_c::DoCallPQ ( const CSphString & sIndex, const JsonObj_c & tPercolate, bool bVerbose )
- {
- CSphString sWarning, sTmp;
- BlobVec_t dDocs;
- // single document
- JsonObj_c tJsonDoc = tPercolate.GetObjItem ( "document", sTmp );
- if ( tJsonDoc )
- {
- auto & tDoc = dDocs.Add();
- if ( !bson::JsonObjToBson ( tJsonDoc, tDoc, g_bJsonAutoconvNumbers, g_bJsonKeynamesToLowercase ) )
- {
- ReportError ( "Bad cjson", EHTTP_STATUS::_400 );
- return false;
- }
- }
- // multiple documents
- JsonObj_c tJsonDocs = tPercolate.GetArrayItem ( "documents", m_sError, true );
- if ( !m_sError.IsEmpty() )
- {
- ReportError ( EHTTP_STATUS::_400 );
- return false;
- }
- for ( auto i : tJsonDocs )
- {
- auto & tDoc = dDocs.Add();
- if ( !bson::JsonObjToBson ( i, tDoc, g_bJsonAutoconvNumbers, g_bJsonKeynamesToLowercase ) )
- {
- ReportError ( "Bad cjson", EHTTP_STATUS::_400 );
- return false;
- }
- }
- if ( dDocs.IsEmpty() )
- {
- ReportError ( "no documents found", EHTTP_STATUS::_400 );
- return false;
- }
- PercolateOptions_t tOpts;
- tOpts.m_sIndex = sIndex;
- tOpts.m_bGetDocs = true;
- tOpts.m_bVerbose = bVerbose;
- tOpts.m_bGetQuery = true;
- // fixme! id alias here is 'id' or 'uid'. Process it!
- CSphSessionAccum tAcc;
- CPqResult tResult;
- tResult.m_dResult.m_bGetFilters = false;
- PercolateMatchDocuments ( dDocs, tOpts, tAcc, tResult );
- JsonEscapedBuilder sRes;
- EncodePercolateMatchResult ( tResult.m_dResult, tResult.m_dDocids, sIndex, sRes );
- BuildReply ( sRes, EHTTP_STATUS::_200 );
- return true;
- }
- static void EncodePercolateQueryResult ( bool bReplace, const CSphString & sIndex, int64_t iID, StringBuilder_c & tOut )
- {
- if ( bReplace )
- tOut.Sprintf (R"({"table":"%s","type":"doc","_id":"%U","result":"updated","forced_refresh":true})", sIndex.cstr(), iID);
- else
- tOut.Sprintf ( R"({"table":"%s","type":"doc","_id":"%U","result":"created"})", sIndex.cstr (), iID );
- }
// Stores a percolate query in sIndex.
//   tJsonQuery - the "query" object of the request: either {"ql": "<query text>"} or a JSON query with filters
//   tRoot      - the whole request object; its "tags" array and "filters" string are read here
//   pUID       - optional query id taken from the URL; an auto-generated id is reported back when it is absent
//   bReplace   - passed to the index as the replace flag and selects the "updated"/"created" wording of the reply
// Returns true and builds a 200 reply on success; otherwise reports an error (400/501/500) and returns false.
- bool HttpHandlerPQ_c::InsertOrReplaceQuery ( const CSphString & sIndex, const JsonObj_c & tJsonQuery, const JsonObj_c & tRoot, CSphString * pUID, bool bReplace )
- {
- CSphString sTmp, sWarning;
- bool bQueryQL = true;
- CSphQuery tQuery;
- const char * sQuery = nullptr;
// prefer the "ql" string; without it the object is parsed as a JSON query with filters
- JsonObj_c tQueryQL = tJsonQuery.GetStrItem ( "ql", sTmp );
- if ( tQueryQL )
- sQuery = tQueryQL.SzVal();
- else
- {
- bQueryQL = false;
- if ( !ParseJsonQueryFilters ( tJsonQuery, tQuery, m_sError, sWarning ) )
- {
- ReportError ( EHTTP_STATUS::_400 );
- return false;
- }
- if ( NonEmptyQuery ( tJsonQuery ) )
- sQuery = tQuery.m_sQuery.cstr();
- }
- if ( !sQuery || *sQuery=='\0' )
- {
- ReportError ( "no query found", EHTTP_STATUS::_400 );
- return false;
- }
// query id from the URL, parsed as a decimal number; 0 when not given
- int64_t iID = 0;
- if ( pUID && !pUID->IsEmpty() )
- iID = strtoll ( pUID->cstr(), nullptr, 10 );
- JsonObj_c tTagsArray = tRoot.GetArrayItem ( "tags", m_sError, true );
- if ( !m_sError.IsEmpty() )
- {
- ReportError ( EHTTP_STATUS::_400 );
- return false;
- }
// tags are joined into one string with ", " as the delimiter
- StringBuilder_c sTags (", ");
- for ( const auto & i : tTagsArray )
- sTags << i.SzVal();
- JsonObj_c tFilters = tRoot.GetStrItem ( "filters", m_sError, true );
- if ( !m_sError.IsEmpty() )
- {
- ReportError ( EHTTP_STATUS::_400 );
- return false;
- }
// a "filters" string cannot be combined with filters that came from a JSON (non-QL) query
- if ( tFilters && !bQueryQL && tQuery.m_dFilters.GetLength() )
- {
- ReportError ( "invalid combination of SphinxQL and query filter provided", EHTTP_STATUS::_501 );
- return false;
- }
- CSphVector<CSphFilterSettings> dFilters;
- CSphVector<FilterTreeItem_t> dFilterTree;
- if ( tFilters )
- {
// the "filters" string is parsed against the schema of the percolate index
- auto pServed = GetServed ( sIndex );
- if ( !CheckValid ( pServed, sIndex, IndexType_e::PERCOLATE ) )
- return false;
- RIdx_T<const PercolateIndex_i*> pIndex { pServed };
- if ( !PercolateParseFilters ( tFilters.SzVal(), SPH_COLLATION_UTF8_GENERAL_CI, pIndex->GetInternalSchema (), dFilters, dFilterTree, m_sError ) )
- {
- ReportError ( EHTTP_STATUS::_400 );
- return false;
- }
- } else
- {
// no "filters" string: take over the filters parsed from the JSON query (empty for a QL query)
- dFilters.SwapData ( tQuery.m_dFilters );
- dFilterTree.SwapData ( tQuery.m_dFilterTree );
- }
- // scope for index lock
- bool bOk = false;
- {
- auto pServed = GetServed ( sIndex );
- if ( !CheckValid ( pServed, sIndex, IndexType_e::PERCOLATE ))
- return false;
- RIdx_T<PercolateIndex_i*> pIndex { pServed };
- PercolateQueryArgs_t tArgs ( dFilters, dFilterTree );
- tArgs.m_sQuery = sQuery;
- tArgs.m_sTags = sTags.cstr();
- tArgs.m_iQUID = iID;
- tArgs.m_bReplace = bReplace;
- tArgs.m_bQL = bQueryQL;
- // add query
- auto pStored = pIndex->CreateQuery ( tArgs, m_sError );
- if ( pStored )
- {
- auto* pSession = session::GetClientSession();
- auto& tAcc = pSession->m_tAcc;
// NOTE(review): pAccum is dereferenced without a null check although GetAcc() takes an error
// out-parameter - confirm that it cannot return nullptr here
- auto* pAccum = tAcc.GetAcc( pIndex, m_sError );
- ReplicationCommand_t * pCmd = pAccum->AddCommand ( ReplCmd_e::PQUERY_ADD, sIndex );
- // refresh query's UID for reply as it might be auto-generated
- iID = pStored->m_iQUID;
- pCmd->m_pStored = std::move ( pStored );
- bOk = HandleCmdReplicate ( *pAccum );
// pick up the error text left in the thread-local message, if any
- TlsMsg::MoveError ( m_sError );
- }
- }
- if ( !bOk )
- {
- ReportError ( EHTTP_STATUS::_500 );
- } else
- {
- StringBuilder_c sRes;
- EncodePercolateQueryResult ( bReplace, sIndex, iID, sRes );
- BuildReply ( sRes, EHTTP_STATUS::_200 );
- }
- return bOk;
- }
- // for now - forcibly route query as /json/search POST {"table":"<idx>"}. Later matter of deprecate/delete
- bool HttpHandlerPQ_c::ListQueries ( const CSphString & sIndex )
- {
- StringBuilder_c sQuery;
- sQuery.Sprintf(R"({"table":"%s"})", sIndex.scstr());
- auto pHandler = std::make_unique<HttpHandler_JsonSearch_c> ( (Str_t)sQuery, m_tOptions ) ;
- if ( !pHandler )
- return false;
- pHandler->SetErrorFormat (m_bNeedHttpResponse);
- pHandler->Process ();
- m_dData = std::move ( pHandler->GetResult ());
- return true;
- }
- bool HttpHandlerPQ_c::Delete ( const CSphString & sIndex, const JsonObj_c & tRoot )
- {
- auto* pSession = session::GetClientSession();
- auto& tAcc = pSession->m_tAcc;
- auto* pAccum = tAcc.GetAcc ();
- ReplicationCommand_t * pCmd = pAccum->AddCommand ( ReplCmd_e::PQUERY_DELETE, sIndex );
- JsonObj_c tTagsArray = tRoot.GetArrayItem ( "tags", m_sError, true );
- if ( !m_sError.IsEmpty() )
- {
- ReportError ( EHTTP_STATUS::_400 );
- return false;
- }
- StringBuilder_c sTags ( ", " );
- for ( const auto & i : tTagsArray )
- sTags << i.SzVal();
- JsonObj_c tUidsArray = tRoot.GetArrayItem ( "id", m_sError, true );
- if ( !m_sError.IsEmpty() )
- {
- ReportError ( EHTTP_STATUS::_400 );
- return false;
- }
- for ( const auto & i : tUidsArray )
- pCmd->m_dDeleteQueries.Add ( i.IntVal() );
- if ( !sTags.GetLength() && !pCmd->m_dDeleteQueries.GetLength() )
- {
- ReportError ( "no tags or id field arrays found", EHTTP_STATUS::_400 );
- return false;
- }
- pCmd->m_sDeleteTags = sTags.cstr();
- uint64_t tmStart = sphMicroTimer();
- int iDeleted = 0;
- bool bOk = HandleCmdReplicateDelete ( *pAccum, iDeleted );
- TlsMsg::MoveError ( m_sError );
- uint64_t tmTotal = sphMicroTimer() - tmStart;
- if ( !bOk )
- {
- FormatError ( EHTTP_STATUS::_400, "%s", m_sError.cstr() );
- return false;
- }
- StringBuilder_c tOut;
- tOut.Sprintf (R"({"took":%d,"timed_out":false,"deleted":%d,"total":%d,"failures":[]})",
- ( int ) ( tmTotal / 1000 ), iDeleted, iDeleted );
- BuildReply ( tOut, EHTTP_STATUS::_200 );
- return true;
- }
// Entry point of the percolate endpoint. The endpoint option has the form
// "[json/]pq/<index>/<operation>[/<uid>]", where operation is "search", "doc" or "_delete_by_query".
// An empty body or an empty JSON object lists the stored queries, whatever the operation is.
- bool HttpHandlerPQ_c::Process()
- {
- TRACE_CONN ( "conn", "HttpHandlerPQ_c::Process" );
- CSphString * sEndpoint = m_tOptions ( "endpoint" );
- if ( !sEndpoint || sEndpoint->IsEmpty() )
- {
- FormatError ( EHTTP_STATUS::_400, "invalid empty endpoint, should be pq/index_name/operation");
- return false;
- }
- assert ( sEndpoint->Begins ( "json/pq/" ) || sEndpoint->Begins ( "pq/" ) );
// skip the "pq/" or "json/pq/" prefix
- const char * sEndpointMethod = sEndpoint->cstr() + sizeof("pq/") - 1;
- if ( sEndpoint->Begins ( "json/pq/" ) )
- sEndpointMethod = sEndpoint->cstr() + sizeof("json/pq/") - 1;
// split the rest by '/': [0] index, [1] operation, [2] optional uid
- StrVec_t dPoints;
- sphSplit ( dPoints, sEndpointMethod, "/" );
- if ( dPoints.GetLength()<2 )
- {
- FormatError ( EHTTP_STATUS::_400, "invalid endpoint '%s', should be pq/index_name/operation", sEndpoint->scstr() );
- return false;
- }
- const CSphString & sIndex = dPoints[0];
- const CSphString & sOp = dPoints[1];
- CSphString * pUID = nullptr;
- if ( dPoints.GetLength()>2 )
- pUID = dPoints.Begin() + 2;
- enum class PercolateOp_e
- {
- UNKNOWN,
- ADD,
- DEL,
- SEARCH
- } eOp = PercolateOp_e::UNKNOWN;
- if ( sOp=="_delete_by_query" )
- eOp = PercolateOp_e::DEL;
- else if ( sOp=="doc" )
- eOp = PercolateOp_e::ADD;
- else if ( sOp=="search" )
- eOp = PercolateOp_e::SEARCH;
// no body: list the stored queries (done before the operation is validated)
- if ( IsEmpty ( m_sQuery ) )
- return ListQueries ( sIndex );
- const JsonObj_c tRoot ( m_sQuery );
- if ( !tRoot )
- {
- ReportError ( "bad JSON object", EHTTP_STATUS::_400 );
- return false;
- }
// empty JSON object: list the stored queries as well
- if ( !tRoot.Size() )
- return ListQueries ( sIndex );
- if ( eOp==PercolateOp_e::UNKNOWN )
- {
- m_sError.SetSprintf ( "invalid percolate operation '%s', should be one of 'search' or 'doc' or '_delete_by_query'", sOp.cstr() );
- ReportError ( EHTTP_STATUS::_400 );
- return false;
- }
// "query" object; a missing one is an error for every operation except delete
- JsonObj_c tQuery = tRoot.GetObjItem ( "query", m_sError, ( eOp==PercolateOp_e::DEL ) );
- if ( !tQuery && ( eOp!=PercolateOp_e::DEL ) )
- {
- ReportError ( EHTTP_STATUS::_400 );
- return false;
- }
// search additionally needs the "percolate" object inside "query"
- JsonObj_c tPerc = ( ( eOp==PercolateOp_e::SEARCH ) ? tQuery.GetObjItem ( "percolate", m_sError ) : JsonNull );
- if ( ( eOp==PercolateOp_e::SEARCH ) && !tPerc )
- {
- ReportError ( EHTTP_STATUS::_400 );
- return false;
- }
// "verbose" may be given as a double, an integer or a boolean; any other type leaves it off
- bool bVerbose = false;
- JsonObj_c tVerbose = tRoot.GetItem ( "verbose" );
- if ( tVerbose )
- {
- if ( tVerbose.IsDbl() )
- bVerbose = tVerbose.DblVal()!=0.0;
- else if ( tVerbose.IsInt() )
- bVerbose = tVerbose.IntVal()!=0;
- else if ( tVerbose.IsBool() )
- bVerbose = tVerbose.BoolVal();
- }
- if ( eOp==PercolateOp_e::SEARCH )
- return DoCallPQ ( sIndex, tPerc, bVerbose );
- else if ( eOp==PercolateOp_e::DEL )
- return Delete ( sIndex, tRoot );
- else
- {
// the "refresh" request option ("1"/"0") is what gets passed on as the bReplace argument;
// any other value leaves it false
- bool bRefresh = false;
- CSphString * pRefresh = m_tOptions ( "refresh" );
- if ( pRefresh && !pRefresh->IsEmpty() )
- {
- if ( *pRefresh=="0" )
- bRefresh = false;
- else if ( *pRefresh=="1" )
- bRefresh = true;
- }
- return InsertOrReplaceQuery ( sIndex, tQuery, tRoot, pUID, bRefresh );
- }
- }
- static bool ParseMetaLine ( const char * sLine, BulkDoc_t & tDoc, CSphString & sError )
- {
- JsonObj_c tLineMeta ( sLine );
- JsonObj_c tAction = tLineMeta[0];
- if ( !tAction )
- {
- sError = "no statement found";
- return false;
- }
- tDoc.m_sAction = tAction.Name();
- if ( !tAction.IsObj() )
- {
- sError.SetSprintf ( "statement %s should be an object", tDoc.m_sAction.cstr() );
- return false;
- }
- JsonObj_c tIndex = tAction.GetStrItem ( "_index", sError );
- if ( !tIndex )
- return false;
- tDoc.m_sIndex = tIndex.StrVal();
-
- JsonObj_c tId = tAction.GetItem ( "_id" );
- if ( tId )
- {
- if ( tId.IsNum() )
- tDoc.m_tDocid = tId.IntVal();
- else if ( tId.IsStr() )
- tDoc.m_tDocid = GetDocID ( tId.SzVal() );
- else if ( tId.IsNull() )
- tDoc.m_tDocid = 0;
- else
- {
- sError.SetSprintf ( "_id should be an int or string" );
- return false;
- }
- }
- return true;
- }
- static bool AddDocid ( SqlStmt_t & tStmt, DocID_t & tDocId, CSphString & sError )
- {
- int iDocidPos = tStmt.m_dInsertSchema.GetFirst ( [&] ( const CSphString & sName ) { return sName=="id"; } );
- if ( iDocidPos!=-1 )
- {
- SqlInsert_t & tVal = tStmt.m_dInsertValues[iDocidPos];
- // check and convert to int
- if ( tVal.m_iType!=SqlInsert_t::CONST_INT )
- {
- tVal.SetValueInt ( GetDocID ( tVal.m_sVal.cstr() ), false );
- tVal.m_iType = SqlInsert_t::CONST_INT;
- }
- DocID_t tSrcDocid = (int64_t)tVal.GetValueUint();
- // can not set id at the same time via es meta and via document id property
- if ( tDocId && tDocId!=tSrcDocid )
- {
- sError = "id has already been specified";
- return false;
- }
-
- tDocId = tSrcDocid;
- return true;
- }
- if ( !tDocId )
- return true;
- tStmt.m_dInsertSchema.Add ( sphGetDocidName() );
- SqlInsert_t & tId = tStmt.m_dInsertValues.Add();
- tId.m_iType = SqlInsert_t::CONST_INT;
- tId.SetValueInt(tDocId);
- tStmt.m_iSchemaSz = tStmt.m_dInsertSchema.GetLength();
- return true;
- }
- static bool ParseSourceLine ( const char * sLine, const CSphString & sAction, SqlStmt_t & tStmt, DocID_t & tDocId, CSphString & sError )
- {
- // FIXME!!! update and delete ES compat endpoints
- if ( sAction=="index" )
- {
- JsonObj_c tRoot ( sLine );
- if ( !ParseJsonInsertSource ( tRoot, tStmt, true, sError ) )
- return false;
- if ( !AddDocid ( tStmt, tDocId, sError ) )
- return false;
- } else if ( sAction=="create" )
- {
- JsonObj_c tRoot ( sLine );
- if ( !ParseJsonInsertSource ( tRoot, tStmt, false, sError ) )
- return false;
- if ( !AddDocid ( tStmt, tDocId, sError ) )
- return false;
- } else if ( sAction=="update" )
- {
- JsonObj_c tUpd ( FromSz ( sLine ) );
- tUpd.AddStr ( "table", tStmt.m_sIndex );
- tUpd.AddInt ( "id", tDocId );
- if ( !ParseJsonUpdate ( tUpd, tStmt, tDocId, sError ) )
- return false;
- } else if ( sAction=="delete" )
- {
- tStmt.m_eStmt = STMT_DELETE;
- tStmt.m_tQuery.m_sSelect = "id";
- CSphFilterSettings & tFilter = tStmt.m_tQuery.m_dFilters.Add();
- tFilter.m_eType = SPH_FILTER_VALUES;
- tFilter.m_dValues.Add ( tDocId );
- tFilter.m_sAttrName = "id";
- } else
- {
- sError.SetSprintf ( "unknown action: %s", sAction.cstr() );
- return false;
- }
- // _bulk could have cluster:index format
- SqlParser_SplitClusterIndex ( tStmt.m_sIndex, &tStmt.m_sCluster );
- return true;
- }
- char * SkipSpace ( char * p )
- {
- while ( sphIsSpace ( *p ) )
- p++;
- return p;
- }
- static CSphString sphHttpEndpointToStr ( EHTTP_ENDPOINT eEndpoint )
- {
- assert ( eEndpoint < EHTTP_ENDPOINT::TOTAL );
- return g_dEndpoints[(int)eEndpoint].m_szName1;
- }
- bool Ends ( const Str_t tVal, const char * sSuffix )
- {
- if ( IsEmpty ( tVal ) || !sSuffix )
- return false;
- auto iSuffix = (int) strlen ( sSuffix );
- if ( tVal.second<iSuffix )
- return false;
- return strncmp ( tVal.first + tVal.second - iSuffix, sSuffix, iSuffix )==0;
- }
- void HttpHandlerEsBulk_c::ReportLogError ( const char * sError, HttpErrorType_e eType, EHTTP_STATUS eStatus, bool bLogOnly )
- {
- if ( !bLogOnly )
- ReportError ( sError, eType, eStatus );
- for ( char * sCur = (char *)GetBody().first; sCur<GetBody().first+GetBody().second; sCur++ )
- {
- if ( *sCur=='\0' )
- *sCur = '\n';
- }
- const CSphString * pUrl = GetOptions() ( "full_url" );
- HTTPINFO << sError << "\n" << ( pUrl ? pUrl->scstr() : "" ) << "\n" << GetBody().first;
- }
- bool HttpHandlerEsBulk_c::Validate()
- {
- CSphString sError;
- CSphString * pOptContentType = GetOptions() ( "content-type" );
- if ( !pOptContentType )
- {
- ReportLogError ( "Content-Type must be set", HttpErrorType_e::IllegalArgument, EHTTP_STATUS::_400, false );
- return false;
- }
- // HTTP field could have multiple values
- StrVec_t dOptContentType = sphSplit ( pOptContentType->cstr(), ",; " );
- if ( !dOptContentType.Contains ( "application/x-ndjson" ) && !dOptContentType.Contains ( "application/json" ) )
- {
- sError.SetSprintf ( "Content-Type header [%s] is not supported", pOptContentType->cstr() );
- ReportLogError ( sError.cstr(), HttpErrorType_e::IllegalArgument, EHTTP_STATUS::_400, false );
- return false;
- }
- if ( IsEmpty ( GetBody() ) )
- {
- ReportLogError ( "request body is required", HttpErrorType_e::Parse, EHTTP_STATUS::_400, false );
- return false;
- }
- if ( !Ends ( GetBody(), "\n" ) )
- {
- ReportLogError ( "The bulk request must be terminated by a newline [\n]", HttpErrorType_e::IllegalArgument, EHTTP_STATUS::_400, false );
- return false;
- }
- return true;
- }
- bool HttpHandlerEsBulk_c::Process()
- {
- if ( !Validate() )
- return false;
- auto & tCrashQuery = GlobalCrashQueryGetRef();
- tCrashQuery.m_dQuery = S2B ( GetBody() );
- CSphVector<Str_t> dLines;
- SplitNdJson ( GetBody(), [&] ( const char * sLine, int iLen ) { dLines.Add ( Str_t ( sLine, iLen ) ); } );
-
- CSphString sError;
- CSphVector<BulkDoc_t> dDocs;
- dDocs.Reserve ( dLines.GetLength() / 2 );
- bool bNextLineMeta = true;
- for ( const Str_t & tLine : dLines )
- {
- if ( !bNextLineMeta )
- {
- dDocs.Last().m_tDocLine = tLine;
- bNextLineMeta = true;
- } else
- {
- // skip empty lines if they are meta information
- if ( IsEmpty ( tLine ) )
- continue;
- // any bad meta result in general error
- BulkDoc_t & tDoc = dDocs.Add();
- if ( !ParseMetaLine ( tLine.first, tDoc, sError ) )
- {
- ReportLogError ( sError.cstr(), HttpErrorType_e::ActionRequestValidation, EHTTP_STATUS::_400, false );
- return false;
- }
- if ( tDoc.m_sAction=="delete" )
- {
- tDoc.m_tDocLine = tLine;
- bNextLineMeta = true;
- } else
- {
- bNextLineMeta = false;
- }
- }
- }
- CSphVector<BulkTnx_t> dTnx;
- const BulkDoc_t * pLastDoc = dDocs.Begin();
- for ( const BulkDoc_t * pCurDoc = pLastDoc + 1; pCurDoc<dDocs.End(); pCurDoc++ )
- {
- // chain the same statements to the same index but not the updates
- if ( pLastDoc->m_sIndex==pCurDoc->m_sIndex && pLastDoc->m_sAction==pCurDoc->m_sAction && pCurDoc->m_sAction!="update" )
- continue;
- BulkTnx_t & tTnx = dTnx.Add();
- tTnx.m_iFrom = pLastDoc - dDocs.Begin();
- tTnx.m_iCount = pCurDoc - pLastDoc;
- pLastDoc = pCurDoc;
- }
- if ( pLastDoc )
- {
- BulkTnx_t & tTnx = dTnx.Add();
- tTnx.m_iFrom = pLastDoc - dDocs.Begin();
- tTnx.m_iCount = dDocs.GetLength() - tTnx.m_iFrom;
- }
- JsonObj_c tItems ( true );
- bool bOk = ProcessTnx ( dTnx, dDocs, tItems );
- JsonObj_c tRoot;
- tRoot.AddItem ( "items", tItems );
- tRoot.AddBool ( "errors", !bOk );
- tRoot.AddInt ( "took", 1 ); // FIXME!!! add delta
- BuildReply ( tRoot.AsString(), ( bOk ? EHTTP_STATUS::_200 : EHTTP_STATUS::_409 ) );
- if ( !bOk )
- ReportLogError ( "failed to commit", HttpErrorType_e::Unknown, EHTTP_STATUS::_400, true );
- return bOk;
- }
- static void AddEsReply ( const BulkDoc_t & tDoc, JsonObj_c & tRoot )
- {
- const JsonObj_c tRefShards ( "{ \"total\": 1, \"successful\": 1, \"failed\": 0 }" );
- char sBuf[70];
- snprintf ( sBuf, sizeof(sBuf), UINT64_FMT, (uint64_t)tDoc.m_tDocid );
- const char * sActionRes = "created";
- if ( tDoc.m_sAction=="delete" )
- sActionRes = "deleted";
- else if ( tDoc.m_sAction=="update" )
- sActionRes = "updated";
- JsonObj_c tShard ( tRefShards.Clone() );
- JsonObj_c tRes;
- tRes.AddStr ( "_index", tDoc.m_sIndex.cstr() );
- tRes.AddStr ( "_type", "doc" );
- tRes.AddStr ( "_id", sBuf );
- tRes.AddInt ( "_version", 1 );
- tRes.AddStr ( "result", sActionRes );
- tRes.AddItem ( "_shards", tShard );
- tRes.AddInt ( "_seq_no", 0 );
- tRes.AddInt ( "_primary_term", 1 );
- tRes.AddInt ( "status", 201 );
- JsonObj_c tAction;
- tAction.AddItem ( tDoc.m_sAction.cstr(), tRes );
- tRoot.AddItem ( tAction );
- }
- static void AddEsError ( int iReply, const CSphString & sError, const char * sErrorType, const BulkDoc_t & tDoc, JsonObj_c & tRoot )
- {
- char sBuf[70];
- snprintf ( sBuf, sizeof(sBuf), UINT64_FMT, (uint64_t)tDoc.m_tDocid );
- JsonObj_c tErrorObj;
- tErrorObj.AddStr ( "type", sErrorType );
- tErrorObj.AddStr ( "reason", sError.cstr() );
- JsonObj_c tRes;
- tRes.AddStr ( "_index", tDoc.m_sIndex.cstr() );
- tRes.AddStr ( "_type", "doc" );
- tRes.AddStr ( "_id", sBuf );
- tRes.AddInt ( "status", 400 );
- tRes.AddItem ( "error", tErrorObj );
- JsonObj_c tAction;
- tAction.AddItem ( tDoc.m_sAction.cstr(), tRes );
- if ( iReply!=-1 )
- tRoot.ReplaceItem ( iReply, tAction );
- else
- tRoot.AddItem ( tAction );
- }
- bool HttpHandlerEsBulk_c::ProcessTnx ( const VecTraits_T<BulkTnx_t> & dTnx, VecTraits_T<BulkDoc_t> & dDocs, JsonObj_c & tItems )
- {
- bool bOk = true;
- CSphVector<std::pair<int, CSphString>> dErrors;
- for ( const BulkTnx_t & tTnx : dTnx )
- {
- const CSphString & sIdx = dDocs[tTnx.m_iFrom].m_sIndex;
- assert ( !sIdx.IsEmpty() );
- ProcessBegin ( sIdx );
- bool bUpdate = false;
- bOk &= dErrors.IsEmpty();
- dErrors.Resize ( 0 );
- for ( int i = 0; i<tTnx.m_iCount; i++ )
- {
- int iDoc = tTnx.m_iFrom + i;
- BulkDoc_t & tDoc = dDocs[iDoc];
- if ( IsEmpty ( tDoc.m_tDocLine ) )
- {
- dErrors.Add ( { iDoc, "failed to parse, document is empty" } );
- continue;
- }
- SqlStmt_t tStmt;
- tStmt.m_tQuery.m_sIndexes = tDoc.m_sIndex;
- tStmt.m_sIndex = tDoc.m_sIndex;
- tStmt.m_sStmt = tDoc.m_tDocLine.first;
- bool bParsed = ParseSourceLine ( tDoc.m_tDocLine.first, tDoc.m_sAction, tStmt, tDoc.m_tDocid, m_sError );
- if ( !bParsed )
- {
- dErrors.Add ( { iDoc, m_sError } );
- continue;
- }
- bool bAction = false;
- JsonObj_c tResult = JsonNull;
- switch ( tStmt.m_eStmt )
- {
- case STMT_INSERT:
- case STMT_REPLACE:
- bAction = ProcessInsert ( tStmt, tDoc.m_tDocid, tResult, m_sError, ResultSetFormat_e::ES );
- break;
- case STMT_UPDATE:
- tStmt.m_sEndpoint = sphHttpEndpointToStr ( EHTTP_ENDPOINT::JSON_UPDATE );
- bAction = ProcessUpdate ( tDoc.m_tDocLine, tStmt, tDoc.m_tDocid, tResult, m_sError );
- bUpdate = true;
- break;
- case STMT_DELETE:
- tStmt.m_sEndpoint = sphHttpEndpointToStr ( EHTTP_ENDPOINT::JSON_DELETE );
- bAction = ProcessDelete ( tDoc.m_tDocLine, tStmt, tDoc.m_tDocid, tResult, m_sError, ResultSetFormat_e::ES );
- break;
- default:
- sphWarning ( "unknown statement \"%s\":%s", tStmt.m_sStmt, tDoc.m_tDocLine.first );
- break; // ignore statement as ES does
- }
- if ( !bAction )
- {
- JsonObj_c tError = JsonNull;
- JsonObj_c tErrorType = JsonNull;
- if ( !tResult.Empty() && tResult.HasItem( "error" ) )
- tError = tResult.GetItem ( "error" );
- if ( !tError.Empty() && tError.HasItem ( "type" ))
- tErrorType = tError.GetItem ( "type" );
- if ( !tErrorType.Empty() )
- dErrors.Add ( { iDoc, tErrorType.StrVal() } );
- else
- {
- dErrors.Add ( { iDoc, CSphString() } );
- dErrors.Last().second.SetSprintf ( "unknown statement \"%s\":%s", tStmt.m_sStmt, tDoc.m_tDocLine.first );
- }
- }
- }
- // FIXME!!! check commit of empty accum
- JsonObj_c tResult;
- DocID_t tDocId = 0;
- bool bCommited = ProcessCommitRollback ( FromStr ( sIdx ), tDocId, tResult, m_sError );
- if ( bCommited )
- {
- if ( bUpdate && !GetLastUpdated() )
- {
- assert ( tTnx.m_iCount==1 );
- const BulkDoc_t & tUpdDoc = dDocs[tTnx.m_iFrom];
- CSphString sUpdError;
- sUpdError.SetSprintf ( "[_doc][" INT64_FMT "]: document missing", tUpdDoc.m_tDocid );
- AddEsError ( -1, sUpdError, "document_missing_exception", tUpdDoc, tItems );
- } else
- {
- for ( int i=0; i<tTnx.m_iCount; i++ )
- AddEsReply ( dDocs[tTnx.m_iFrom+i], tItems );
- }
- } else
- {
- for ( int i=0; i<tTnx.m_iCount; i++ )
- {
- AddEsError ( -1, tResult.GetStrItem ( "error", m_sError, false ).StrVal(), "mapper_parsing_exception", dDocs[tTnx.m_iFrom+i], tItems );
- }
- }
- for ( const auto & tErr : dErrors )
- AddEsError ( tErr.first, tErr.second, "mapper_parsing_exception", dDocs[tErr.first], tItems );
- }
- bOk &= dErrors.IsEmpty();
- session::SetInTrans ( false );
- return bOk;
- }
- void SplitNdJson ( Str_t sBody, SplitAction_fn && fnAction )
- {
- const char * sBodyEnd = sBody.first + sBody.second;
- while ( sBody.first<sBodyEnd )
- {
- const char * sNext = sBody.first;
- // break on CR or LF
- while ( sNext<sBodyEnd && *sNext != '\r' && *sNext != '\n' )
- sNext++;
- if ( sNext==sBodyEnd )
- break;
- *(const_cast<char*>(sNext)) = '\0';
- fnAction ( sBody.first, sNext-sBody.first );
- sBody.first = sNext + 1;
- // skip new lines
- while ( sBody.first<sBodyEnd && *sBody.first == '\n' )
- sBody.first++;
- }
- }
- bool HttpSetLogVerbosity ( const CSphString & sVal )
- {
- if ( !sVal.Begins( "http_" ) )
- return false;
- bool bOn = ( sVal.Ends ( "_1" ) || sVal.Ends ( "_on" ) );
-
- if ( sVal.Begins ( "http_bad_req" ) )
- g_bLogBadHttpReq = bOn;
- else
- LOG_LEVEL_HTTP = bOn;
- return true;
- }
- void LogReplyStatus100()
- {
- HTTPINFO << "100 Continue sent";
- }
- const char * GetErrorTypeName ( HttpErrorType_e eType )
- {
- switch ( eType )
- {
- case HttpErrorType_e::Parse: return "parse_exception";
- case HttpErrorType_e::IllegalArgument: return "illegal_argument_exception";
- case HttpErrorType_e::ActionRequestValidation: return "action_request_validation_exception";
- case HttpErrorType_e::IndexNotFound: return "index_not_found_exception";
- case HttpErrorType_e::ContentParse: return "x_content_parse_exception";
- case HttpErrorType_e::VersionConflictEngine: return "version_conflict_engine_exception";
- case HttpErrorType_e::DocumentMissing: return "document_missing_exception";
- case HttpErrorType_e::ResourceAlreadyExists: return "resource_already_exists_exception";
- case HttpErrorType_e::AliasesNotFound: return "aliases_not_found_exception";
- default:
- return nullptr;;
- }
- }
|