tcp_main.c 133 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
7427842794280428142824283428442854286428742884289429042914292429342944295429642974298429943004301430243034304430543064307430843094310431143124313431443154316431743184319432043214322432343244325432643274328432943304331433243334334433543364337433843394340434143424343434443454346434743484349435043514352435343544355435643574358435943604361436243634364436543664367436843694370437143724373437443754376437743784379438043814382438343844385438643874388438943904391439243934394439543964397439843994400440144024403440444054406440744084409441044114412441344144415441644174418441944204421442244234424442544264427442844294430443144324433443444354436443744384439444044414442444344444445444644474448444944504451445244534454445544564457445844594460446144624463446444654466446744684469447044714472447344744475447644774478447944804481448244834484448544864487448844894490449144924493449444954496
  1. /*
  2. * $Id$
  3. *
  4. * Copyright (C) 2001-2003 FhG Fokus
  5. *
  6. * This file is part of ser, a free SIP server.
  7. *
  8. * ser is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU General Public License as published by
  10. * the Free Software Foundation; either version 2 of the License, or
  11. * (at your option) any later version
  12. *
  13. * For a license to use the ser software under conditions
  14. * other than those described here, or to purchase support for this
  15. * software, please contact iptel.org by e-mail at the following addresses:
  16. * [email protected]
  17. *
  18. * ser is distributed in the hope that it will be useful,
  19. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  20. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  21. * GNU General Public License for more details.
  22. *
  23. * You should have received a copy of the GNU General Public License
  24. * along with this program; if not, write to the Free Software
  25. * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  26. */
  27. /*
  28. * History:
  29. * --------
  30. * 2002-11-29 created by andrei
  31. * 2002-12-11 added tcp_send (andrei)
  32. * 2003-01-20 locking fixes, hashtables (andrei)
  33. * 2003-02-20 s/lock_t/gen_lock_t/ to avoid a conflict on solaris (andrei)
  34. * 2003-02-25 Nagle is disabled if -DDISABLE_NAGLE (andrei)
  35. * 2003-03-29 SO_REUSEADDR before calling bind to allow
  36. * server restart, Nagle set on the (hopefully)
  37. * correct socket (jiri)
  38. * 2003-03-31 always try to find the corresponding tcp listen socket for
  39. * a temp. socket and store it in *->bind_address: added
  40. * find_tcp_si, modified tcpconn_connect (andrei)
  41. * 2003-04-14 set sockopts to TOS low delay (andrei)
  42. * 2003-06-30 moved tcp new connect checking & handling to
  43. * handle_new_connect (andrei)
  44. * 2003-07-09 tls_close called before closing the tcp connection (andrei)
  45. * 2003-10-24 converted to the new socket_info lists (andrei)
  46. * 2003-10-27 tcp port aliases support added (andrei)
  47. * 2003-11-04 always lock before manipulating refcnt; sendchild
  48. * does not inc refcnt by itself anymore (andrei)
  49. * 2003-11-07 different unix sockets are used for fd passing
  50. * to/from readers/writers (andrei)
  51. * 2003-11-17 handle_new_connect & tcp_connect will close the
  52. * new socket if tcpconn_new return 0 (e.g. out of mem) (andrei)
  53. * 2003-11-28 tcp_blocking_write & tcp_blocking_connect added (andrei)
  54. * 2004-11-08 dropped find_tcp_si and replaced with find_si (andrei)
  55. * 2005-06-07 new tcp optimized code, supports epoll (LT), sigio + real time
  56. * signals, poll & select (andrei)
  57. * 2005-06-26 *bsd kqueue support (andrei)
  58. * 2005-07-04 solaris /dev/poll support (andrei)
  59. * 2005-07-08 tcp_max_connections, tcp_connection_lifetime, don't accept
  60. * more connections if tcp_max_connections is exceeded (andrei)
  61. * 2005-10-21 cleanup all the open connections on exit
  62. * decrement the no. of open connections on timeout too (andrei)
 * 2006-01-30 queue send_fd request and execute them at the end of the
  63. * poll loop (#ifdef) (andrei)
  64. * process all children requests, before attempting to send
  65. * them new stuff (fixes some deadlocks) (andrei)
  66. * 2006-02-03 timers are run only once per s (andrei)
  67. * tcp children fds can be non-blocking; send fds are queued on
  68. * EAGAIN; lots of bug fixes (andrei)
  69. * 2006-02-06 better tcp_max_connections checks, tcp_connections_no moved to
  70. * shm (andrei)
  71. * 2006-04-12 tcp_send() changed to use struct dest_info (andrei)
  72. * 2006-11-02 switched to atomic ops for refcnt, locking improvements
  73. * (andrei)
  74. * 2006-11-04 switched to raw ticks (to fix conversion errors which could
  75. * result in inf. lifetime) (andrei)
  76. * 2007-07-25 tcpconn_connect can now bind the socket on a specified
  77. * source addr/port (andrei)
  78. * 2007-07-26 tcp_send() and tcpconn_get() can now use a specified source
  79. * addr./port (andrei)
  80. * 2007-08-23 getsockname() for INADDR_ANY(SI_IS_ANY) sockets (andrei)
  81. * 2007-08-27 split init_sock_opt into a lightweight init_sock_opt_accept()
  82. * used when accepting connections and init_sock_opt used for
  83. * connect/ new sockets (andrei)
  84. * 2007-11-22 always add the connection & clear the corresponding flags before
  85. * io_watch_add-ing its fd - it's safer this way (andrei)
  86. * 2007-11-26 improved tcp timers: switched to local_timer (andrei)
  87. * 2007-11-27 added send fd cache and reader fd reuse (andrei)
  88. * 2007-11-28 added support for TCP_DEFER_ACCEPT, KEEPALIVE, KEEPINTVL,
  89. * KEEPCNT, QUICKACK, SYNCNT, LINGER2 (andrei)
  90. * 2007-12-04 support for queueing write requests (andrei)
  91. * 2007-12-12 destroy connection asap on wbuf. timeout (andrei)
  92. * 2007-12-13 changed the refcnt and destroy scheme, now refcnt is 1 if
  93. * linked into the hash tables (was 0) (andrei)
  94. * 2007-12-21 support for pending connects (connections are added to the
  95. * hash immediately and writes on them are buffered) (andrei)
  96. * 2008-02-05 handle POLLRDHUP (if supported), POLLERR and
  97. * POLLHUP (andrei)
  98. * on write error check if there's still data in the socket
  99. * read buffer and process it first (andrei)
  100. * 2009-02-26 direct blacklist support (andrei)
  101. * 2009-03-20 s/wq_timeout/send_timeout ; send_timeout is now in ticks
  102. * (andrei)
  103. * 2009-04-09 tcp ev and tcp stats macros added (andrei)
  104. * 2009-09-15 support for force connection reuse and close after send
  105. * send flags (andrei)
  106. * 2010-03-23 tcp_send() split in 3 smaller functions (andrei)
  107. */
  108. /** tcp main/dispatcher and tcp send functions.
  109. * @file tcp_main.c
  110. * @ingroup core
  111. * Module: @ref core
  112. */
  113. #ifdef USE_TCP
  114. #ifndef SHM_MEM
  115. #error "shared memory support needed (add -DSHM_MEM to Makefile.defs)"
  116. #endif
  117. #define HANDLE_IO_INLINE
  118. #include "io_wait.h" /* include first to make sure the needed features are
  119. turned on (e.g. _GNU_SOURCE for POLLRDHUP) */
  120. #include <sys/time.h>
  121. #include <sys/types.h>
  122. #include <sys/select.h>
  123. #include <sys/socket.h>
  124. #ifdef HAVE_FILIO_H
  125. #include <sys/filio.h> /* needed on solaris 2.x for FIONREAD */
  126. #elif defined __OS_solaris
  127. #define BSD_COMP /* needed on older solaris for FIONREAD */
  128. #endif /* HAVE_FILIO_H / __OS_solaris */
  129. #include <sys/ioctl.h> /* ioctl() used on write error */
  130. #include <netinet/in.h>
  131. #include <netinet/in_systm.h>
  132. #include <netinet/ip.h>
  133. #include <netinet/tcp.h>
  134. #include <sys/uio.h> /* writev*/
  135. #include <netdb.h>
  136. #include <stdlib.h> /*exit() */
  137. #include <unistd.h>
  138. #include <errno.h>
  139. #include <string.h>
  140. #ifdef HAVE_SELECT
  141. #include <sys/select.h>
  142. #endif
  143. #include <sys/poll.h>
  144. #include "ip_addr.h"
  145. #include "pass_fd.h"
  146. #include "tcp_conn.h"
  147. #include "globals.h"
  148. #include "pt.h"
  149. #include "locking.h"
  150. #include "mem/mem.h"
  151. #include "mem/shm_mem.h"
  152. #include "timer.h"
  153. #include "sr_module.h"
  154. #include "tcp_server.h"
  155. #include "tcp_init.h"
  156. #include "tcp_int_send.h"
  157. #include "tcp_stats.h"
  158. #include "tcp_ev.h"
  159. #include "tsend.h"
  160. #include "timer_ticks.h"
  161. #include "local_timer.h"
  162. #ifdef CORE_TLS
  163. #include "tls/tls_server.h"
  164. #define tls_loaded() 1
  165. #else
  166. #include "tls_hooks_init.h"
  167. #include "tls_hooks.h"
  168. #endif /* CORE_TLS*/
  169. #ifdef USE_DST_BLACKLIST
  170. #include "dst_blacklist.h"
  171. #endif /* USE_DST_BLACKLIST */
  172. #include "tcp_info.h"
  173. #include "tcp_options.h"
  174. #include "ut.h"
  175. #include "cfg/cfg_struct.h"
  176. #define local_malloc pkg_malloc
  177. #define local_free pkg_free
  178. #include <fcntl.h> /* must be included after io_wait.h if SIGIO_RT is used */
  179. #ifdef NO_MSG_DONTWAIT
  180. #ifndef MSG_DONTWAIT
  181. /* should work inside tcp_main */
  182. #define MSG_DONTWAIT 0
  183. #endif
  184. #endif /*NO_MSG_DONTWAIT */
  185. #define TCP_PASS_NEW_CONNECTION_ON_DATA /* don't pass a new connection
  186. immediately to a child, wait for
  187. some data on it first */
  188. #define TCP_LISTEN_BACKLOG 1024
  189. #define SEND_FD_QUEUE /* queue send fd requests on EAGAIN, instead of sending
  190. them immediately */
  191. #define TCP_CHILD_NON_BLOCKING
  192. #ifdef SEND_FD_QUEUE
  193. #ifndef TCP_CHILD_NON_BLOCKING
  194. #define TCP_CHILD_NON_BLOCKING
  195. #endif
  196. #define MAX_SEND_FD_QUEUE_SIZE tcp_main_max_fd_no
  197. #define SEND_FD_QUEUE_SIZE 128 /* initial size */
  198. #define MAX_SEND_FD_RETRIES 96 /* FIXME: not used for now */
  199. #define SEND_FD_QUEUE_TIMEOUT MS_TO_TICKS(2000) /* 2 s */
  200. #endif
  201. /* minimum interval local_timer_run() is allowed to run, in ticks */
  202. #define TCPCONN_TIMEOUT_MIN_RUN 1 /* once per tick */
  203. #define TCPCONN_WAIT_TIMEOUT 1 /* 1 tick */
  204. #ifdef TCP_ASYNC
  205. static unsigned int* tcp_total_wq=0;
  206. #endif
  207. enum fd_types { F_NONE, F_SOCKINFO /* a tcp_listen fd */,
  208. F_TCPCONN, F_TCPCHILD, F_PROC };
  209. #ifdef TCP_FD_CACHE
  210. #define TCP_FD_CACHE_SIZE 8
  211. struct fd_cache_entry{
  212. struct tcp_connection* con;
  213. int id;
  214. int fd;
  215. };
  216. static struct fd_cache_entry fd_cache[TCP_FD_CACHE_SIZE];
  217. #endif /* TCP_FD_CACHE */
  218. static int is_tcp_main=0;
  219. enum poll_types tcp_poll_method=0; /* by default choose the best method */
  220. int tcp_main_max_fd_no=0;
  221. int tcp_max_connections=DEFAULT_TCP_MAX_CONNECTIONS;
  222. static union sockaddr_union tcp_source_ipv4_addr; /* saved bind/srv v4 addr. */
  223. static union sockaddr_union* tcp_source_ipv4=0;
  224. #ifdef USE_IPV6
  225. static union sockaddr_union tcp_source_ipv6_addr; /* saved bind/src v6 addr. */
  226. static union sockaddr_union* tcp_source_ipv6=0;
  227. #endif
  228. static int* tcp_connections_no=0; /* current open connections */
  229. /* connection hash table (after ip&port) , includes also aliases */
  230. struct tcp_conn_alias** tcpconn_aliases_hash=0;
  231. /* connection hash table (after connection id) */
  232. struct tcp_connection** tcpconn_id_hash=0;
  233. gen_lock_t* tcpconn_lock=0;
  234. struct tcp_child* tcp_children;
  235. static int* connection_id=0; /* unique for each connection, used for
  236. quickly finding the corresponding connection
  237. for a reply */
  238. int unix_tcp_sock;
  239. static int tcp_proto_no=-1; /* tcp protocol number as returned by
  240. getprotobyname */
  241. static io_wait_h io_h;
  242. static struct local_timer tcp_main_ltimer;
  243. static ticks_t tcp_main_prev_ticks;
  244. static ticks_t tcpconn_main_timeout(ticks_t , struct timer_ln* , void* );
  245. inline static int _tcpconn_add_alias_unsafe(struct tcp_connection* c, int port,
  246. struct ip_addr* l_ip, int l_port,
  247. int flags);
  248. /* sets source address used when opening new sockets and no source is specified
  249. * (by default the address is choosen by the kernel)
  250. * Should be used only on init.
  251. * returns -1 on error */
  252. int tcp_set_src_addr(struct ip_addr* ip)
  253. {
  254. switch (ip->af){
  255. case AF_INET:
  256. ip_addr2su(&tcp_source_ipv4_addr, ip, 0);
  257. tcp_source_ipv4=&tcp_source_ipv4_addr;
  258. break;
  259. #ifdef USE_IPV6
  260. case AF_INET6:
  261. ip_addr2su(&tcp_source_ipv6_addr, ip, 0);
  262. tcp_source_ipv6=&tcp_source_ipv6_addr;
  263. break;
  264. #endif
  265. default:
  266. return -1;
  267. }
  268. return 0;
  269. }
  270. static inline int init_sock_keepalive(int s)
  271. {
  272. int optval;
  273. #ifdef HAVE_SO_KEEPALIVE
  274. if (cfg_get(tcp, tcp_cfg, keepalive)){
  275. optval=1;
  276. if (setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &optval,
  277. sizeof(optval))<0){
  278. LOG(L_WARN, "WARNING: init_sock_keepalive: failed to enable"
  279. " SO_KEEPALIVE: %s\n", strerror(errno));
  280. return -1;
  281. }
  282. }
  283. #endif
  284. #ifdef HAVE_TCP_KEEPINTVL
  285. if ((optval=cfg_get(tcp, tcp_cfg, keepintvl))){
  286. if (setsockopt(s, IPPROTO_TCP, TCP_KEEPINTVL, &optval,
  287. sizeof(optval))<0){
  288. LOG(L_WARN, "WARNING: init_sock_keepalive: failed to set"
  289. " keepalive probes interval: %s\n", strerror(errno));
  290. }
  291. }
  292. #endif
  293. #ifdef HAVE_TCP_KEEPIDLE
  294. if ((optval=cfg_get(tcp, tcp_cfg, keepidle))){
  295. if (setsockopt(s, IPPROTO_TCP, TCP_KEEPIDLE, &optval,
  296. sizeof(optval))<0){
  297. LOG(L_WARN, "WARNING: init_sock_keepalive: failed to set"
  298. " keepalive idle interval: %s\n", strerror(errno));
  299. }
  300. }
  301. #endif
  302. #ifdef HAVE_TCP_KEEPCNT
  303. if ((optval=cfg_get(tcp, tcp_cfg, keepcnt))){
  304. if (setsockopt(s, IPPROTO_TCP, TCP_KEEPCNT, &optval,
  305. sizeof(optval))<0){
  306. LOG(L_WARN, "WARNING: init_sock_keepalive: failed to set"
  307. " maximum keepalive count: %s\n", strerror(errno));
  308. }
  309. }
  310. #endif
  311. return 0;
  312. }
  313. /* set all socket/fd options for new sockets (e.g. before connect):
  314. * disable nagle, tos lowdelay, reuseaddr, non-blocking
  315. *
  316. * return -1 on error */
  317. static int init_sock_opt(int s)
  318. {
  319. int flags;
  320. int optval;
  321. #ifdef DISABLE_NAGLE
  322. flags=1;
  323. if ( (tcp_proto_no!=-1) && (setsockopt(s, tcp_proto_no , TCP_NODELAY,
  324. &flags, sizeof(flags))<0) ){
  325. LOG(L_WARN, "WARNING: init_sock_opt: could not disable Nagle: %s\n",
  326. strerror(errno));
  327. }
  328. #endif
  329. /* tos*/
  330. optval = tos;
  331. if (setsockopt(s, IPPROTO_IP, IP_TOS, (void*)&optval,sizeof(optval)) ==-1){
  332. LOG(L_WARN, "WARNING: init_sock_opt: setsockopt tos: %s\n",
  333. strerror(errno));
  334. /* continue since this is not critical */
  335. }
  336. #if !defined(TCP_DONT_REUSEADDR)
  337. optval=1;
  338. if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
  339. (void*)&optval, sizeof(optval))==-1){
  340. LOG(L_ERR, "ERROR: setsockopt SO_REUSEADDR %s\n",
  341. strerror(errno));
  342. /* continue, not critical */
  343. }
  344. #endif /* !TCP_DONT_REUSEADDR */
  345. #ifdef HAVE_TCP_SYNCNT
  346. if ((optval=cfg_get(tcp, tcp_cfg, syncnt))){
  347. if (setsockopt(s, IPPROTO_TCP, TCP_SYNCNT, &optval,
  348. sizeof(optval))<0){
  349. LOG(L_WARN, "WARNING: init_sock_opt: failed to set"
  350. " maximum SYN retr. count: %s\n", strerror(errno));
  351. }
  352. }
  353. #endif
  354. #ifdef HAVE_TCP_LINGER2
  355. if ((optval=cfg_get(tcp, tcp_cfg, linger2))){
  356. if (setsockopt(s, IPPROTO_TCP, TCP_LINGER2, &optval,
  357. sizeof(optval))<0){
  358. LOG(L_WARN, "WARNING: init_sock_opt: failed to set"
  359. " maximum LINGER2 timeout: %s\n", strerror(errno));
  360. }
  361. }
  362. #endif
  363. #ifdef HAVE_TCP_QUICKACK
  364. if (cfg_get(tcp, tcp_cfg, delayed_ack)){
  365. optval=0; /* reset quick ack => delayed ack */
  366. if (setsockopt(s, IPPROTO_TCP, TCP_QUICKACK, &optval,
  367. sizeof(optval))<0){
  368. LOG(L_WARN, "WARNING: init_sock_opt: failed to reset"
  369. " TCP_QUICKACK: %s\n", strerror(errno));
  370. }
  371. }
  372. #endif /* HAVE_TCP_QUICKACK */
  373. init_sock_keepalive(s);
  374. /* non-blocking */
  375. flags=fcntl(s, F_GETFL);
  376. if (flags==-1){
  377. LOG(L_ERR, "ERROR: init_sock_opt: fnctl failed: (%d) %s\n",
  378. errno, strerror(errno));
  379. goto error;
  380. }
  381. if (fcntl(s, F_SETFL, flags|O_NONBLOCK)==-1){
  382. LOG(L_ERR, "ERROR: init_sock_opt: fcntl: set non-blocking failed:"
  383. " (%d) %s\n", errno, strerror(errno));
  384. goto error;
  385. }
  386. return 0;
  387. error:
  388. return -1;
  389. }
  390. /* set all socket/fd options for "accepted" sockets
  391. * only nonblocking is set since the rest is inherited from the
  392. * "parent" (listening) socket
  393. * Note: setting O_NONBLOCK is required on linux but it's not needed on
  394. * BSD and possibly solaris (where the flag is inherited from the
  395. * parent socket). However since there is no standard document
  396. * requiring a specific behaviour in this case it's safer to always set
  397. * it (at least for now) --andrei
  398. * TODO: check on which OSes O_NONBLOCK is inherited and make this
  399. * function a nop.
  400. *
  401. * return -1 on error */
  402. static int init_sock_opt_accept(int s)
  403. {
  404. int flags;
  405. /* non-blocking */
  406. flags=fcntl(s, F_GETFL);
  407. if (flags==-1){
  408. LOG(L_ERR, "ERROR: init_sock_opt_accept: fnctl failed: (%d) %s\n",
  409. errno, strerror(errno));
  410. goto error;
  411. }
  412. if (fcntl(s, F_SETFL, flags|O_NONBLOCK)==-1){
  413. LOG(L_ERR, "ERROR: init_sock_opt_accept: "
  414. "fcntl: set non-blocking failed: (%d) %s\n",
  415. errno, strerror(errno));
  416. goto error;
  417. }
  418. return 0;
  419. error:
  420. return -1;
  421. }
/* blocking connect on a non-blocking fd; it will timeout after
 * tcp_connect_timeout
 * if BLOCKING_USE_SELECT and HAVE_SELECT are defined it will internally
 * use select() instead of poll (bad if fd > FD_SET_SIZE, poll is preferred)
 * On failure it blacklists the destination (where compiled in), emits the
 * corresponding TCP event/stats and returns -1; returns 0 on success.
 */
static int tcp_blocking_connect(int fd, int type, snd_flags_t* send_flags,
								const struct sockaddr *servaddr,
								socklen_t addrlen)
{
	int n;
#if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
	fd_set sel_set;
	fd_set orig_set;
	struct timeval timeout;
#else
	struct pollfd pf;
#endif
	int elapsed;	/* seconds elapsed since the connect started */
	int to;			/* configured connect timeout, in seconds */
	int ticks;		/* tick count at the start of the attempt */
	int err;		/* deferred connect error, read via SO_ERROR */
	unsigned int err_len;
	int poll_err;

	poll_err=0;
	to=cfg_get(tcp, tcp_cfg, connect_timeout_s);
	ticks=get_ticks();
again:
	n=connect(fd, servaddr, addrlen);
	if (n==-1){
		if (errno==EINTR){
			/* interrupted by a signal: retry while within the timeout */
			elapsed=(get_ticks()-ticks)*TIMER_TICK;
			if (elapsed<to) goto again;
			else goto error_timeout;
		}
		/* EINPROGRESS/EALREADY are the expected results on a non-blocking
		 * fd: fall through into the wait loop below */
		if (errno!=EINPROGRESS && errno!=EALREADY){
			goto error_errno;
		}
	}else goto end; /* immediate success */
	/* poll/select loop: wait until the fd becomes writable (connect done
	 * or failed) or the timeout expires */
#if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
	FD_ZERO(&orig_set);
	FD_SET(fd, &orig_set);
#else
	pf.fd=fd;
	pf.events=POLLOUT;
#endif
	while(1){
		elapsed=(get_ticks()-ticks)*TIMER_TICK;
		if (elapsed>=to)
			goto error_timeout;
#if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
		sel_set=orig_set; /* select() modifies its fd set => use a copy */
		timeout.tv_sec=to-elapsed;
		timeout.tv_usec=0;
		n=select(fd+1, 0, &sel_set, 0, &timeout);
#else
		n=poll(&pf, 1, (to-elapsed)*1000);
#endif
		if (n<0){
			if (errno==EINTR) continue;
			LOG(L_ERR, "ERROR: tcp_blocking_connect %s: poll/select failed:"
					" (%d) %s\n",
					su2a((union sockaddr_union*)servaddr, addrlen),
					errno, strerror(errno));
			goto error;
		}else if (n==0) /* timeout */ continue;
#if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
		if (FD_ISSET(fd, &sel_set))
#else
		if (pf.revents&(POLLERR|POLLHUP|POLLNVAL)){
			LOG(L_ERR, "ERROR: tcp_blocking_connect %s: poll error: "
					"flags %x\n",
					su2a((union sockaddr_union*)servaddr, addrlen),
					pf.revents);
			poll_err=1;
		}
#endif
		/* read the deferred connect() result; note that in the poll
		 * variant this block runs unconditionally, in the select variant
		 * only when the fd is set */
		{
			err_len=sizeof(err);
			getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &err_len);
			if ((err==0) && (poll_err==0)) goto end;
			if (err!=EINPROGRESS && err!=EALREADY){
				LOG(L_ERR, "ERROR: tcp_blocking_connect %s: SO_ERROR (%d) "
						"%s\n",
						su2a((union sockaddr_union*)servaddr, addrlen),
						err, strerror(err));
				errno=err;
				goto error_errno;
			}
		}
	}
error_errno:
	/* map errno to the matching blacklist entry / TCP event */
	switch(errno){
		case ENETUNREACH:
		case EHOSTUNREACH:
#ifdef USE_DST_BLACKLIST
			dst_blacklist_su(BLST_ERR_CONNECT, type,
							(union sockaddr_union*)servaddr, send_flags, 0);
#endif /* USE_DST_BLACKLIST */
			TCP_EV_CONNECT_UNREACHABLE(errno, 0, 0,
							(union sockaddr_union*)servaddr, type);
			break;
		case ETIMEDOUT:
#ifdef USE_DST_BLACKLIST
			dst_blacklist_su(BLST_ERR_CONNECT, type,
							(union sockaddr_union*)servaddr, send_flags, 0);
#endif /* USE_DST_BLACKLIST */
			TCP_EV_CONNECT_TIMEOUT(errno, 0, 0,
							(union sockaddr_union*)servaddr, type);
			break;
		case ECONNREFUSED:
		case ECONNRESET:
#ifdef USE_DST_BLACKLIST
			dst_blacklist_su(BLST_ERR_CONNECT, type,
							(union sockaddr_union*)servaddr, send_flags, 0);
#endif /* USE_DST_BLACKLIST */
			TCP_EV_CONNECT_RST(errno, 0, 0,
							(union sockaddr_union*)servaddr, type);
			break;
		case EAGAIN: /* not posix, but supported on linux and bsd */
			TCP_EV_CONNECT_NO_MORE_PORTS(errno, 0, 0,
							(union sockaddr_union*)servaddr, type);
			break;
		default:
			TCP_EV_CONNECT_ERR(errno, 0, 0,
							(union sockaddr_union*)servaddr, type);
	}
	LOG(L_ERR, "ERROR: tcp_blocking_connect %s: (%d) %s\n",
			su2a((union sockaddr_union*)servaddr, addrlen),
			errno, strerror(errno));
	goto error;
error_timeout:
	/* timeout */
#ifdef USE_DST_BLACKLIST
	dst_blacklist_su(BLST_ERR_CONNECT, type,
						(union sockaddr_union*)servaddr, send_flags, 0);
#endif /* USE_DST_BLACKLIST */
	TCP_EV_CONNECT_TIMEOUT(0, 0, 0, (union sockaddr_union*)servaddr, type);
	LOG(L_ERR, "ERROR: tcp_blocking_connect %s: timeout %d s elapsed "
				"from %d s\n", su2a((union sockaddr_union*)servaddr, addrlen),
				elapsed, cfg_get(tcp, tcp_cfg, connect_timeout_s));
error:
	TCP_STATS_CONNECT_FAILED();
	return -1;
end:
	return 0;
}
  569. #ifdef TCP_ASYNC
/* true if the connection's write queue is empty
 * (unsafe version, no locking done here) */
#define _wbufq_empty(con) ((con)->wbuf_q.first==0)
/* true if the connection's write queue has pending data
 * (unsafe version, no locking done here) */
#define _wbufq_non_empty(con) ((con)->wbuf_q.first!=0)
/* appends size bytes from data to the end of c's write queue.
 * unsafe version, call while holding the connection write lock
 * returns 0 on success, -1 on error (queue limits exceeded, write timeout
 * already expired, or out of shm memory) */
inline static int _wbufq_add(struct tcp_connection* c, char* data,
							unsigned int size)
{
	struct tcp_wbuffer_queue* q;
	struct tcp_wbuffer* wb;
	unsigned int last_free;	/* free bytes left in the last queue block */
	unsigned int wb_size;	/* size of a newly allocated block */
	unsigned int crt_size;	/* bytes copied in the current iteration */
	ticks_t t;

	q=&c->wbuf_q;
	t=get_ticks_raw();
	/* refuse the add if it would exceed the per-connection or global
	 * queued-bytes limits, or if the queue is non-empty and its write
	 * timeout has already expired */
	if (unlikely( ((q->queued+size)>cfg_get(tcp, tcp_cfg, tcpconn_wq_max)) ||
				((*tcp_total_wq+size)>cfg_get(tcp, tcp_cfg, tcp_wq_max)) ||
				(q->first &&
				TICKS_LT(q->wr_timeout, t)) )){
		LOG(L_ERR, "ERROR: wbufq_add(%d bytes): write queue full or timeout "
					" (%d, total %d, last write %d s ago)\n",
					size, q->queued, *tcp_total_wq,
					TICKS_TO_S(t-q->wr_timeout-
						cfg_get(tcp, tcp_cfg, send_timeout)));
		if (q->first && TICKS_LT(q->wr_timeout, t)){
			/* write timeout: blacklist & account as a connect or send
			 * failure, depending on the connection state */
			if (unlikely(c->state==S_CONN_CONNECT)){
#ifdef USE_DST_BLACKLIST
				dst_blacklist_su( BLST_ERR_CONNECT, c->rcv.proto,
									&c->rcv.src_su, &c->send_flags, 0);
#endif /* USE_DST_BLACKLIST */
				TCP_EV_CONNECT_TIMEOUT(0, TCP_LADDR(c), TCP_LPORT(c),
										TCP_PSU(c), TCP_PROTO(c));
				TCP_STATS_CONNECT_FAILED();
			}else{
#ifdef USE_DST_BLACKLIST
				dst_blacklist_su( BLST_ERR_SEND, c->rcv.proto,
									&c->rcv.src_su, &c->send_flags, 0);
#endif /* USE_DST_BLACKLIST */
				TCP_EV_SEND_TIMEOUT(0, &c->rcv);
				TCP_STATS_SEND_TIMEOUT();
			}
		}else{
			/* if it's not a timeout => queue full */
			TCP_EV_SENDQ_FULL(0, &c->rcv);
			TCP_STATS_SENDQ_FULL();
		}
		goto error;
	}
	if (unlikely(q->last==0)){
		/* empty queue: allocate the first block and arm the write timeout
		 * (longer timeout while the socket is still connecting) */
		wb_size=MAX_unsigned(cfg_get(tcp, tcp_cfg, wq_blk_size), size);
		wb=shm_malloc(sizeof(*wb)+wb_size-1);
		if (unlikely(wb==0))
			goto error;
		wb->b_size=wb_size;
		wb->next=0;
		q->last=wb;
		q->first=wb;
		q->last_used=0;
		q->offset=0;
		q->wr_timeout=get_ticks_raw()+
			((c->state==S_CONN_CONNECT)?
					S_TO_TICKS(cfg_get(tcp, tcp_cfg, connect_timeout_s)):
					cfg_get(tcp, tcp_cfg, send_timeout));
	}else{
		wb=q->last;
	}

	/* copy the data, growing the queue block by block as needed */
	while(size){
		last_free=wb->b_size-q->last_used;
		if (last_free==0){
			/* last block full => append a new one */
			wb_size=MAX_unsigned(cfg_get(tcp, tcp_cfg, wq_blk_size), size);
			wb=shm_malloc(sizeof(*wb)+wb_size-1);
			if (unlikely(wb==0))
				goto error;
			wb->b_size=wb_size;
			wb->next=0;
			q->last->next=wb;
			q->last=wb;
			q->last_used=0;
			last_free=wb->b_size;
		}
		crt_size=MIN_unsigned(last_free, size);
		memcpy(wb->buf+q->last_used, data, crt_size);
		q->last_used+=crt_size;
		size-=crt_size;
		data+=crt_size;
		q->queued+=crt_size;
		/* keep the global queued-bytes counter in sync */
		atomic_add_int((int*)tcp_total_wq, crt_size);
	}
	return 0;
error:
	return -1;
}
  663. /* unsafe version, call while holding the connection write lock
  664. * inserts data at the beginning, it ignores the max queue size checks and
  665. * the timeout (use sparingly)
  666. * Note: it should never be called on a write buffer after wbufq_run() */
  667. inline static int _wbufq_insert(struct tcp_connection* c, char* data,
  668. unsigned int size)
  669. {
  670. struct tcp_wbuffer_queue* q;
  671. struct tcp_wbuffer* wb;
  672. q=&c->wbuf_q;
  673. if (likely(q->first==0)) /* if empty, use wbufq_add */
  674. return _wbufq_add(c, data, size);
  675. if (unlikely((*tcp_total_wq+size)>cfg_get(tcp, tcp_cfg, tcp_wq_max))){
  676. LOG(L_ERR, "ERROR: wbufq_insert(%d bytes): write queue full"
  677. " (%d, total %d, last write %d s ago)\n",
  678. size, q->queued, *tcp_total_wq,
  679. TICKS_TO_S(get_ticks_raw()-q->wr_timeout-
  680. cfg_get(tcp, tcp_cfg, send_timeout)));
  681. goto error;
  682. }
  683. if (unlikely(q->offset)){
  684. LOG(L_CRIT, "BUG: wbufq_insert: non-null offset %d (bad call, should"
  685. "never be called after the wbufq_run())\n", q->offset);
  686. goto error;
  687. }
  688. if ((q->first==q->last) && ((q->last->b_size-q->last_used)>=size)){
  689. /* one block with enough space in it for size bytes */
  690. memmove(q->first->buf+size, q->first->buf, size);
  691. memcpy(q->first->buf, data, size);
  692. q->last_used+=size;
  693. }else{
  694. /* create a size bytes block directly */
  695. wb=shm_malloc(sizeof(*wb)+size-1);
  696. if (unlikely(wb==0))
  697. goto error;
  698. wb->b_size=size;
  699. /* insert it */
  700. wb->next=q->first;
  701. q->first=wb;
  702. memcpy(wb->buf, data, size);
  703. }
  704. q->queued+=size;
  705. atomic_add_int((int*)tcp_total_wq, size);
  706. return 0;
  707. error:
  708. return -1;
  709. }
  710. /* unsafe version, call while holding the connection write lock */
  711. inline static void _wbufq_destroy( struct tcp_wbuffer_queue* q)
  712. {
  713. struct tcp_wbuffer* wb;
  714. struct tcp_wbuffer* next_wb;
  715. int unqueued;
  716. unqueued=0;
  717. if (likely(q->first)){
  718. wb=q->first;
  719. do{
  720. next_wb=wb->next;
  721. unqueued+=(wb==q->last)?q->last_used:wb->b_size;
  722. if (wb==q->first)
  723. unqueued-=q->offset;
  724. shm_free(wb);
  725. wb=next_wb;
  726. }while(wb);
  727. }
  728. memset(q, 0, sizeof(*q));
  729. atomic_add_int((int*)tcp_total_wq, -unqueued);
  730. }
  731. /* tries to empty the queue (safe version, c->write_lock must not be hold)
  732. * returns -1 on error, bytes written on success (>=0)
  733. * if the whole queue is emptied => sets *empty*/
  734. inline static int wbufq_run(int fd, struct tcp_connection* c, int* empty)
  735. {
  736. struct tcp_wbuffer_queue* q;
  737. struct tcp_wbuffer* wb;
  738. int n;
  739. int ret;
  740. int block_size;
  741. char* buf;
  742. *empty=0;
  743. ret=0;
  744. lock_get(&c->write_lock);
  745. q=&c->wbuf_q;
  746. while(q->first){
  747. block_size=((q->first==q->last)?q->last_used:q->first->b_size)-
  748. q->offset;
  749. buf=q->first->buf+q->offset;
  750. n=_tcpconn_write_nb(fd, c, buf, block_size);
  751. if (likely(n>0)){
  752. ret+=n;
  753. if (likely(n==block_size)){
  754. wb=q->first;
  755. q->first=q->first->next;
  756. shm_free(wb);
  757. q->offset=0;
  758. q->queued-=block_size;
  759. atomic_add_int((int*)tcp_total_wq, -block_size);
  760. }else{
  761. q->offset+=n;
  762. q->queued-=n;
  763. atomic_add_int((int*)tcp_total_wq, -n);
  764. break;
  765. }
  766. }else{
  767. if (n<0){
  768. /* EINTR is handled inside _tcpconn_write_nb */
  769. if (!(errno==EAGAIN || errno==EWOULDBLOCK)){
  770. if (unlikely(c->state==S_CONN_CONNECT)){
  771. switch(errno){
  772. case ENETUNREACH:
  773. case EHOSTUNREACH: /* not posix for send() */
  774. #ifdef USE_DST_BLACKLIST
  775. dst_blacklist_su(BLST_ERR_CONNECT,
  776. c->rcv.proto,
  777. &c->rcv.src_su,
  778. &c->send_flags, 0);
  779. #endif /* USE_DST_BLACKLIST */
  780. TCP_EV_CONNECT_UNREACHABLE(errno, TCP_LADDR(c),
  781. TCP_LPORT(c), TCP_PSU(c),
  782. TCP_PROTO(c));
  783. break;
  784. case ECONNREFUSED:
  785. case ECONNRESET:
  786. #ifdef USE_DST_BLACKLIST
  787. dst_blacklist_su(BLST_ERR_CONNECT,
  788. c->rcv.proto,
  789. &c->rcv.src_su,
  790. &c->send_flags, 0);
  791. #endif /* USE_DST_BLACKLIST */
  792. TCP_EV_CONNECT_RST(0, TCP_LADDR(c),
  793. TCP_LPORT(c), TCP_PSU(c),
  794. TCP_PROTO(c));
  795. break;
  796. default:
  797. TCP_EV_CONNECT_ERR(errno, TCP_LADDR(c),
  798. TCP_LPORT(c), TCP_PSU(c),
  799. TCP_PROTO(c));
  800. }
  801. TCP_STATS_CONNECT_FAILED();
  802. }else{
  803. switch(errno){
  804. case ECONNREFUSED:
  805. case ECONNRESET:
  806. TCP_STATS_CON_RESET();
  807. /* no break */
  808. case ENETUNREACH:
  809. case EHOSTUNREACH: /* not posix for send() */
  810. #ifdef USE_DST_BLACKLIST
  811. dst_blacklist_su(BLST_ERR_SEND,
  812. c->rcv.proto,
  813. &c->rcv.src_su,
  814. &c->send_flags, 0);
  815. #endif /* USE_DST_BLACKLIST */
  816. break;
  817. }
  818. }
  819. ret=-1;
  820. LOG(L_ERR, "ERROR: wbuf_runq: %s [%d]\n",
  821. strerror(errno), errno);
  822. }
  823. }
  824. break;
  825. }
  826. }
  827. if (likely(q->first==0)){
  828. q->last=0;
  829. q->last_used=0;
  830. q->offset=0;
  831. *empty=1;
  832. }
  833. lock_release(&c->write_lock);
  834. if (likely(ret>0)){
  835. q->wr_timeout=get_ticks_raw()+cfg_get(tcp, tcp_cfg, send_timeout);
  836. if (unlikely(c->state==S_CONN_CONNECT || c->state==S_CONN_ACCEPT)){
  837. TCP_STATS_ESTABLISHED(c->state);
  838. c->state=S_CONN_OK;
  839. }
  840. }
  841. return ret;
  842. }
  843. #endif /* TCP_ASYNC */
#if 0
/* blocking write even on non-blocking sockets
 * if TCP_TIMEOUT will return with error
 * NOTE(review): dead code, compiled out via #if 0; kept for reference only.
 * returns initial_len on success, -1 on error/timeout */
static int tcp_blocking_write(struct tcp_connection* c, int fd, char* buf,
								unsigned int len)
{
	int n;
	fd_set sel_set;
	struct timeval timeout;
	int ticks;
	int initial_len;

	initial_len=len;
again:
	n=send(fd, buf, len,
#ifdef HAVE_MSG_NOSIGNAL
			MSG_NOSIGNAL
#else
			0
#endif
		);
	if (n<0){
		if (errno==EINTR) goto again;
		else if (errno!=EAGAIN && errno!=EWOULDBLOCK){
			LOG(L_ERR, "tcp_blocking_write: failed to send: (%d) %s\n",
					errno, strerror(errno));
			TCP_EV_SEND_TIMEOUT(errno, &c->rcv);
			TCP_STATS_SEND_TIMEOUT();
			goto error;
		}
	}else if (n<len){
		/* partial write */
		buf+=n;
		len-=n;
	}else{
		/* success: full write */
		goto end;
	}
	/* wait (via select) until the fd is writable again, then retry */
	while(1){
		FD_ZERO(&sel_set);
		FD_SET(fd, &sel_set);
		timeout.tv_sec=tcp_send_timeout;
		timeout.tv_usec=0;
		ticks=get_ticks();
		n=select(fd+1, 0, &sel_set, 0, &timeout);
		if (n<0){
			if (errno==EINTR) continue; /* signal, ignore */
			LOG(L_ERR, "ERROR: tcp_blocking_write: select failed: "
					" (%d) %s\n", errno, strerror(errno));
			goto error;
		}else if (n==0){
			/* timeout */
			if (get_ticks()-ticks>=tcp_send_timeout){
				LOG(L_ERR, "ERROR: tcp_blocking_write: send timeout (%d)\n",
						tcp_send_timeout);
				goto error;
			}
			continue;
		}
		if (FD_ISSET(fd, &sel_set)){
			/* we can write again */
			goto again;
		}
	}
error:
	return -1;
end:
	return initial_len;
}
#endif
/* allocates (in shm) and initializes a tcp_connection structure for the
 * already-open socket "sock"; the read buffer is placed in the same shm
 * block, right after the structure.
 * su         - peer address
 * local_addr - local address (may be 0 => taken from ba, if set)
 * ba         - bind address (listening socket info)
 * type       - protocol (PROTO_TCP / PROTO_TLS)
 * state      - initial connection state
 * returns the new connection (refcnt 0, fd not set) or 0 on error
 * (allocation, lock init or tls init failure) */
struct tcp_connection* tcpconn_new(int sock, union sockaddr_union* su,
									union sockaddr_union* local_addr,
									struct socket_info* ba, int type,
									int state)
{
	struct tcp_connection *c;
	int rd_b_size;

	rd_b_size=cfg_get(tcp, tcp_cfg, rd_buf_size);
	c=shm_malloc(sizeof(struct tcp_connection) + rd_b_size);
	if (c==0){
		LOG(L_ERR, "ERROR: tcpconn_new: mem. allocation failure\n");
		goto error;
	}
	memset(c, 0, sizeof(struct tcp_connection)); /* zero init (skip rd buf)*/
	c->s=sock;
	c->fd=-1; /* not initialized */
	if (lock_init(&c->write_lock)==0){
		LOG(L_ERR, "ERROR: tcpconn_new: init lock failed\n");
		goto error;
	}
	c->rcv.src_su=*su;
	atomic_set(&c->refcnt, 0);
	local_timer_init(&c->timer, tcpconn_main_timeout, c, 0);
	su2ip_addr(&c->rcv.src_ip, su);
	c->rcv.src_port=su_getport(su);
	c->rcv.bind_address=ba;
	if (likely(local_addr)){
		su2ip_addr(&c->rcv.dst_ip, local_addr);
		c->rcv.dst_port=su_getport(local_addr);
	}else if (ba){
		/* no explicit local address => use the listening socket's one */
		c->rcv.dst_ip=ba->address;
		c->rcv.dst_port=ba->port_no;
	}
	print_ip("tcpconn_new: new tcp connection: ", &c->rcv.src_ip, "\n");
	DBG( "tcpconn_new: on port %d, type %d\n", c->rcv.src_port, type);
	init_tcp_req(&c->req, (char*)c+sizeof(struct tcp_connection), rd_b_size);
	c->id=(*connection_id)++;
	c->rcv.proto_reserved1=0; /* this will be filled before receive_message*/
	c->rcv.proto_reserved2=0;
	c->state=state;
	c->extra_data=0;
#ifdef USE_TLS
	if (type==PROTO_TLS){
		/* tls sets its own type/proto/timeout */
		if (tls_tcpconn_init(c, sock)==-1) goto error;
	}else
#endif /* USE_TLS*/
	{
		c->type=PROTO_TCP;
		c->rcv.proto=PROTO_TCP;
		c->timeout=get_ticks_raw()+cfg_get(tcp, tcp_cfg, con_lifetime);
	}
	return c;
error:
	if (c) shm_free(c);
	return 0;
}
/* do the actual connect, set sock. options a.s.o
 * returns socket on success, -1 on error
 * sets also *res_local_addr, res_si and state (S_CONN_CONNECT for an
 * unfinished connect and S_CONN_OK for a finished one)
 * server     - destination address
 * from       - optional source address to bind to (may be 0)
 * send_flags - passed through to the blacklist on failure */
inline static int tcp_do_connect( union sockaddr_union* server,
									union sockaddr_union* from,
									int type,
									snd_flags_t* send_flags,
									union sockaddr_union* res_local_addr,
									struct socket_info** res_si,
									enum tcp_conn_states *state
									)
{
	int s;
	union sockaddr_union my_name;
	socklen_t my_name_len;
	struct ip_addr ip;
#ifdef TCP_ASYNC
	int n;
#endif /* TCP_ASYNC */

	s=socket(AF2PF(server->s.sa_family), SOCK_STREAM, 0);
	if (unlikely(s==-1)){
		LOG(L_ERR, "ERROR: tcp_do_connect %s: socket: (%d) %s\n",
				su2a(server, sizeof(*server)), errno, strerror(errno));
		goto error;
	}
	/* set all socket options (incl. non-blocking mode) */
	if (init_sock_opt(s)<0){
		LOG(L_ERR, "ERROR: tcp_do_connect %s: init_sock_opt failed\n",
				su2a(server, sizeof(*server)));
		goto error;
	}
	/* optional bind to the requested source address; failure here is only
	 * a warning, the connect proceeds with a kernel-chosen source */
	if (unlikely(from && bind(s, &from->s, sockaddru_len(*from)) != 0)){
		LOG(L_WARN, "WARNING: tcp_do_connect: binding to source address"
					" %s failed: %s [%d]\n", su2a(from, sizeof(*from)),
					strerror(errno), errno);
	}
	*state=S_CONN_OK;
#ifdef TCP_ASYNC
	if (likely(cfg_get(tcp, tcp_cfg, async))){
again:
		n=connect(s, &server->s, sockaddru_len(*server));
		if (likely(n==-1)){ /*non-blocking => most probable EINPROGRESS*/
			if (likely(errno==EINPROGRESS))
				*state=S_CONN_CONNECT; /* will finish asynchronously */
			else if (errno==EINTR) goto again;
			else if (errno!=EALREADY){
				/* hard failure: blacklist / emit event by error type */
				switch(errno){
					case ENETUNREACH:
					case EHOSTUNREACH:
#ifdef USE_DST_BLACKLIST
						dst_blacklist_su(BLST_ERR_CONNECT, type, server,
											send_flags, 0);
#endif /* USE_DST_BLACKLIST */
						TCP_EV_CONNECT_UNREACHABLE(errno, 0, 0, server, type);
						break;
					case ETIMEDOUT:
#ifdef USE_DST_BLACKLIST
						dst_blacklist_su(BLST_ERR_CONNECT, type, server,
											send_flags, 0);
#endif /* USE_DST_BLACKLIST */
						TCP_EV_CONNECT_TIMEOUT(errno, 0, 0, server, type);
						break;
					case ECONNREFUSED:
					case ECONNRESET:
#ifdef USE_DST_BLACKLIST
						dst_blacklist_su(BLST_ERR_CONNECT, type, server,
											send_flags, 0);
#endif /* USE_DST_BLACKLIST */
						TCP_EV_CONNECT_RST(errno, 0, 0, server, type);
						break;
					case EAGAIN:/* not posix, but supported on linux and bsd */
						TCP_EV_CONNECT_NO_MORE_PORTS(errno, 0, 0, server,type);
						break;
					default:
						TCP_EV_CONNECT_ERR(errno, 0, 0, server, type);
				}
				TCP_STATS_CONNECT_FAILED();
				LOG(L_ERR, "ERROR: tcp_do_connect: connect %s: (%d) %s\n",
							su2a(server, sizeof(*server)),
							errno, strerror(errno));
				goto error;
			}
		}
	}else{
#endif /* TCP_ASYNC */
		/* synchronous mode: wait for the connect to finish (or time out) */
		if (tcp_blocking_connect(s, type, send_flags, &server->s,
									sockaddru_len(*server))<0){
			LOG(L_ERR, "ERROR: tcp_do_connect: tcp_blocking_connect %s"
						" failed\n", su2a(server, sizeof(*server)));
			goto error;
		}
#ifdef TCP_ASYNC
	}
#endif /* TCP_ASYNC */
	if (from){
		su2ip_addr(&ip, from);
		if (!ip_addr_any(&ip))
			/* we already know the source ip, skip the sys. call */
			goto find_socket;
	}
	/* find out which local address the kernel picked */
	my_name_len=sizeof(my_name);
	if (unlikely(getsockname(s, &my_name.s, &my_name_len)!=0)){
		LOG(L_ERR, "ERROR: tcp_do_connect: getsockname failed: %s(%d)\n",
				strerror(errno), errno);
		*res_si=0;
		goto error;
	}
	from=&my_name; /* update from with the real "from" address */
	su2ip_addr(&ip, &my_name);
find_socket:
	/* map the local ip back to one of our listening sockets */
#ifdef USE_TLS
	if (unlikely(type==PROTO_TLS))
		*res_si=find_si(&ip, 0, PROTO_TLS);
	else
#endif
		*res_si=find_si(&ip, 0, PROTO_TCP);
	if (unlikely(*res_si==0)){
		LOG(L_WARN, "WARNING: tcp_do_connect %s: could not find corresponding"
				" listening socket for %s, using default...\n",
					su2a(server, sizeof(*server)), ip_addr2a(&ip));
		if (server->s.sa_family==AF_INET) *res_si=sendipv4_tcp;
#ifdef USE_IPV6
		else *res_si=sendipv6_tcp;
#endif
	}
	*res_local_addr=*from;
	return s;
error:
	if (s!=-1) close(s);
	return -1;
}
  1100. struct tcp_connection* tcpconn_connect( union sockaddr_union* server,
  1101. union sockaddr_union* from,
  1102. int type, snd_flags_t* send_flags)
  1103. {
  1104. int s;
  1105. struct socket_info* si;
  1106. union sockaddr_union my_name;
  1107. struct tcp_connection* con;
  1108. enum tcp_conn_states state;
  1109. s=-1;
  1110. if (*tcp_connections_no >= cfg_get(tcp, tcp_cfg, max_connections)){
  1111. LOG(L_ERR, "ERROR: tcpconn_connect: maximum number of connections"
  1112. " exceeded (%d/%d)\n",
  1113. *tcp_connections_no,
  1114. cfg_get(tcp, tcp_cfg, max_connections));
  1115. goto error;
  1116. }
  1117. s=tcp_do_connect(server, from, type, send_flags, &my_name, &si, &state);
  1118. if (s==-1){
  1119. LOG(L_ERR, "ERROR: tcp_do_connect %s: failed (%d) %s\n",
  1120. su2a(server, sizeof(*server)), errno, strerror(errno));
  1121. goto error;
  1122. }
  1123. con=tcpconn_new(s, server, &my_name, si, type, state);
  1124. if (con==0){
  1125. LOG(L_ERR, "ERROR: tcp_connect %s: tcpconn_new failed, closing the "
  1126. " socket\n", su2a(server, sizeof(*server)));
  1127. goto error;
  1128. }
  1129. tcpconn_set_send_flags(con, *send_flags);
  1130. return con;
  1131. /*FIXME: set sock idx! */
  1132. error:
  1133. if (s!=-1) close(s); /* close the opened socket */
  1134. return 0;
  1135. }
  1136. #ifdef TCP_CONNECT_WAIT
/* performs the (delayed) connect for connection c: opens and connects a
 * new socket via tcp_do_connect() and updates c's bind_address, local
 * address/port and aliases to match the real local end.
 * from - optional source address to bind to (0 => kernel-chosen)
 * returns the new socket on success, -1 on error */
int tcpconn_finish_connect( struct tcp_connection* c,
							union sockaddr_union* from)
{
	int s;
	int r;
	union sockaddr_union local_addr;
	struct socket_info* si;
	enum tcp_conn_states state;
	struct tcp_conn_alias* a;
	int new_conn_alias_flags;

	s=tcp_do_connect(&c->rcv.src_su, from, c->type, &c->send_flags,
						&local_addr, &si, &state);
	if (unlikely(s==-1)){
		LOG(L_ERR, "ERROR: tcpconn_finish_connect %s: tcp_do_connect for %p"
				" failed\n", su2a(&c->rcv.src_su, sizeof(c->rcv.src_su)),
				c);
		return -1;
	}
	c->rcv.bind_address=si;
	su2ip_addr(&c->rcv.dst_ip, &local_addr);
	c->rcv.dst_port=su_getport(&local_addr);
	/* update aliases if needed */
	if (likely(from==0)){
		/* no source was requested => just add the local-address aliases */
		new_conn_alias_flags=cfg_get(tcp, tcp_cfg, new_conn_alias_flags);
		/* add aliases */
		TCPCONN_LOCK;
		_tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip, 0,
													new_conn_alias_flags);
		_tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip,
									c->rcv.dst_port, new_conn_alias_flags);
		TCPCONN_UNLOCK;
	}else if (su_cmp(from, &local_addr)!=1){
		/* the real local address differs from the requested one => the
		 * address-based aliases are stale and must be re-created */
		new_conn_alias_flags=cfg_get(tcp, tcp_cfg, new_conn_alias_flags);
		TCPCONN_LOCK;
		/* remove all the aliases except the first one and re-add them
		 * (there shouldn't be more then the 3 default aliases at this
		 * stage) */
		for (r=1; r<c->aliases; r++){
			a=&c->con_aliases[r];
			tcpconn_listrm(tcpconn_aliases_hash[a->hash], a, next, prev);
		}
		c->aliases=1;
		/* add the local_ip:0 and local_ip:local_port aliases */
		_tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip,
											0, new_conn_alias_flags);
		_tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip,
									c->rcv.dst_port, new_conn_alias_flags);
		TCPCONN_UNLOCK;
	}
	return s;
}
  1188. #endif /* TCP_CONNECT_WAIT */
/* adds a tcp connection to the tcpconn hashes (the id hash plus up to
 * three address aliases); returns c on success, 0 if c is null.
 * Note: it's called _only_ from the tcp_main process */
inline static struct tcp_connection* tcpconn_add(struct tcp_connection *c)
{
	struct ip_addr zero_ip;	/* wildcard local ip for the first alias */
	int new_conn_alias_flags;

	if (likely(c)){
		ip_addr_mk_any(c->rcv.src_ip.af, &zero_ip);
		c->id_hash=tcp_id_hash(c->id);
		c->aliases=0;
		new_conn_alias_flags=cfg_get(tcp, tcp_cfg, new_conn_alias_flags);
		TCPCONN_LOCK;
		c->flags|=F_CONN_HASHED;
		/* add it at the begining of the list*/
		tcpconn_listadd(tcpconn_id_hash[c->id_hash], c, id_next, id_prev);
		/* set the aliases */
		/* first alias is for (peer_ip, peer_port, 0 ,0) -- for finding
		 *  any connection to peer_ip, peer_port
		 * the second alias is for (peer_ip, peer_port, local_addr, 0) -- for
		 *  finding any conenction to peer_ip, peer_port from local_addr
		 * the third alias is for (peer_ip, peer_port, local_addr, local_port)
		 *   -- for finding if a fully specified connection exists */
		_tcpconn_add_alias_unsafe(c, c->rcv.src_port, &zero_ip, 0,
													new_conn_alias_flags);
		if (likely(c->rcv.dst_ip.af && ! ip_addr_any(&c->rcv.dst_ip))){
			/* local address is known => add the two more specific aliases */
			_tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip, 0,
													new_conn_alias_flags);
			_tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip,
									c->rcv.dst_port, new_conn_alias_flags);
		}
		/* ignore add_alias errors, there are some valid cases when one
		 * of the add_alias would fail (e.g. first add_alias for 2 connections
		 * with the same destination but different src. ip*/
		TCPCONN_UNLOCK;
		DBG("tcpconn_add: hashes: %d:%d:%d, %d\n",
												c->con_aliases[0].hash,
												c->con_aliases[1].hash,
												c->con_aliases[2].hash,
												c->id_hash);
		return c;
	}else{
		LOG(L_CRIT, "tcpconn_add: BUG: null connection pointer\n");
		return 0;
	}
}
  1234. static inline void _tcpconn_detach(struct tcp_connection *c)
  1235. {
  1236. int r;
  1237. tcpconn_listrm(tcpconn_id_hash[c->id_hash], c, id_next, id_prev);
  1238. /* remove all the aliases */
  1239. for (r=0; r<c->aliases; r++)
  1240. tcpconn_listrm(tcpconn_aliases_hash[c->con_aliases[r].hash],
  1241. &c->con_aliases[r], next, prev);
  1242. }
/* releases all resources owned by an (already detached) connection:
 * pending write queue, write lock, TLS data and the shm block itself.
 * NOTE(review): unlike tcpconn_rm(), the TLS cleanup here is not guarded
 * by a c->extra_data check -- presumably tls_tcpconn_clean() copes with
 * a null extra_data; verify against the TLS module. */
static inline void _tcpconn_free(struct tcp_connection* c)
{
#ifdef TCP_ASYNC
	/* free any data still queued for write */
	if (unlikely(_wbufq_non_empty(c)))
		_wbufq_destroy(&c->wbuf_q);
#endif
	lock_destroy(&c->write_lock);
#ifdef USE_TLS
	if (unlikely(c->type==PROTO_TLS)) tls_tcpconn_clean(c);
#endif
	shm_free(c);
}
/* unsafe tcpconn_rm version (nolocks):
 * unhashes the connection and frees all its resources; the caller is
 * responsible for any needed locking */
void _tcpconn_rm(struct tcp_connection* c)
{
	_tcpconn_detach(c);
	_tcpconn_free(c);
}
  1261. void tcpconn_rm(struct tcp_connection* c)
  1262. {
  1263. int r;
  1264. TCPCONN_LOCK;
  1265. tcpconn_listrm(tcpconn_id_hash[c->id_hash], c, id_next, id_prev);
  1266. /* remove all the aliases */
  1267. for (r=0; r<c->aliases; r++)
  1268. tcpconn_listrm(tcpconn_aliases_hash[c->con_aliases[r].hash],
  1269. &c->con_aliases[r], next, prev);
  1270. TCPCONN_UNLOCK;
  1271. lock_destroy(&c->write_lock);
  1272. #ifdef USE_TLS
  1273. if ((c->type==PROTO_TLS)&&(c->extra_data)) tls_tcpconn_clean(c);
  1274. #endif
  1275. shm_free(c);
  1276. }
/* finds a connection, if id=0 uses the ip addr, port, local_ip and local port
 * (host byte order) and tries to find the connection that matches all of
 * them. Wild cards can be used for local_ip and local_port (a 0 filled
 * ip address and/or a 0 local port).
 * Connections in the S_CONN_BAD state are skipped.
 * returns the matching connection or 0 if none is found.
 * WARNING: unprotected (locks) use tcpconn_get unless you really
 * know what you are doing */
struct tcp_connection* _tcpconn_find(int id, struct ip_addr* ip, int port,
										struct ip_addr* l_ip, int l_port)
{
	struct tcp_connection *c;
	struct tcp_conn_alias* a;
	unsigned hash;
	int is_local_ip_any;	/* true => local ip is a wildcard */

#ifdef EXTRA_DEBUG
	DBG("tcpconn_find: %d port %d\n",id, port);
	if (ip) print_ip("tcpconn_find: ip ", ip, "\n");
#endif
	if (likely(id)){
		/* lookup by connection id */
		hash=tcp_id_hash(id);
		for (c=tcpconn_id_hash[hash]; c; c=c->id_next){
#ifdef EXTRA_DEBUG
			DBG("c=%p, c->id=%d, port=%d\n",c, c->id, c->rcv.src_port);
			print_ip("ip=", &c->rcv.src_ip, "\n");
#endif
			if ((id==c->id)&&(c->state!=S_CONN_BAD)) return c;
		}
	}else if (likely(ip)){
		/* lookup by address via the alias hash */
		hash=tcp_addr_hash(ip, port, l_ip, l_port);
		is_local_ip_any=ip_addr_any(l_ip);
		for (a=tcpconn_aliases_hash[hash]; a; a=a->next){
#ifdef EXTRA_DEBUG
			DBG("a=%p, c=%p, c->id=%d, alias port= %d port=%d\n", a, a->parent,
					a->parent->id, a->port, a->parent->rcv.src_port);
			print_ip("ip=",&a->parent->rcv.src_ip,"\n");
#endif
			/* match peer port+ip and (unless wildcarded) local port+ip */
			if ( (a->parent->state!=S_CONN_BAD) && (port==a->port) &&
					((l_port==0) || (l_port==a->parent->rcv.dst_port)) &&
					(ip_addr_cmp(ip, &a->parent->rcv.src_ip)) &&
					(is_local_ip_any ||
						ip_addr_cmp(l_ip, &a->parent->rcv.dst_ip))
				)
				return a->parent;
		}
	}
	return 0;
}
/* _tcpconn_find with locks and timeout
 * local_addr contains the desired local ip:port. If null any local address
 * will be used. IN*ADDR_ANY or 0 port are wild cards.
 * On success the connection's refcnt is incremented (the caller owns a
 * reference and must release it) and, unless a tcp reader process holds
 * the connection, its timeout is extended by @timeout ticks.
 * Returns the referenced connection or 0 if none matches.
 */
struct tcp_connection* tcpconn_get(int id, struct ip_addr* ip, int port,
									union sockaddr_union* local_addr,
									ticks_t timeout)
{
	struct tcp_connection* c;
	struct ip_addr local_ip;
	int local_port;

	local_port=0;
	if (likely(ip)){
		if (unlikely(local_addr)){
			/* a specific local address was requested */
			su2ip_addr(&local_ip, local_addr);
			local_port=su_getport(local_addr);
		}else{
			/* no preference => wildcard local address and port */
			ip_addr_mk_any(ip->af, &local_ip);
			local_port=0;
		}
	}
	TCPCONN_LOCK;
	c=_tcpconn_find(id, ip, port, &local_ip, local_port);
	if (likely(c)){
		atomic_inc(&c->refcnt);
		/* update the timeout only if the connection is not handled
		 * by a tcp reader (the tcp reader process uses c->timeout for
		 * its own internal timeout and c->timeout will be overwritten
		 * anyway on return to tcp_main) */
		if (likely(c->reader_pid==0))
			c->timeout=get_ticks_raw()+timeout;
	}
	TCPCONN_UNLOCK;
	return c;
}
/* add c->dst:port, local_addr as an alias for the "id" connection,
 * flags: TCP_ALIAS_FORCE_ADD - add an alias even if a previous one exists
 *        TCP_ALIAS_REPLACE   - if a prev. alias exists, replace it with the
 *                              new one
 * returns 0 on success, <0 on failure ( -1 - null c, -2 too many aliases,
 *  -3 alias already present and pointing to another connection)
 * WARNING: must be called with TCPCONN_LOCK held */
inline static int _tcpconn_add_alias_unsafe(struct tcp_connection* c, int port,
										struct ip_addr* l_ip, int l_port,
										int flags)
{
	unsigned hash;
	struct tcp_conn_alias* a;
	struct tcp_conn_alias* nxt;
	struct tcp_connection* p;
	int is_local_ip_any;
	int i;
	int r;

	a=0;
	is_local_ip_any=ip_addr_any(l_ip);
	if (likely(c)){
		hash=tcp_addr_hash(&c->rcv.src_ip, port, l_ip, l_port);
		/* search the aliases for an already existing one */
		for (a=tcpconn_aliases_hash[hash], nxt=0; a; a=nxt){
			nxt=a->next; /* a may be removed from the list below */
			if ( (a->parent->state!=S_CONN_BAD) && (port==a->port) &&
					( (l_port==0) || (l_port==a->parent->rcv.dst_port)) &&
					(ip_addr_cmp(&c->rcv.src_ip, &a->parent->rcv.src_ip)) &&
					( is_local_ip_any ||
						ip_addr_cmp(&a->parent->rcv.dst_ip, l_ip))
					){
				/* found */
				if (unlikely(a->parent!=c)){
					/* alias points to a different connection */
					if (flags & TCP_ALIAS_FORCE_ADD)
						/* still have to walk the whole list to check if
						 * the alias was not already added */
						continue;
					else if (flags & TCP_ALIAS_REPLACE){
						/* remove the alias =>
						 * remove the current alias and all the following
						 *  ones from the corresponding connection, shift the
						 *  connection aliases array and re-add the other
						 *  aliases (!= current one) */
						p=a->parent;
						/* locate a's index inside p's alias array */
						for (i=0; (i<p->aliases) && (&(p->con_aliases[i])!=a);
								i++);
						if (unlikely(i==p->aliases)){
							LOG(L_CRIT, "BUG: _tcpconn_add_alias_unsafe: "
									" alias %p not found in con %p (id %d)\n",
									a, p, p->id);
							goto error_not_found;
						}
						/* unhook alias i and everything after it */
						for (r=i; r<p->aliases; r++){
							tcpconn_listrm(
								tcpconn_aliases_hash[p->con_aliases[r].hash],
								&p->con_aliases[r], next, prev);
						}
						/* close the gap left by the removed alias */
						if (likely((i+1)<p->aliases)){
							memmove(&p->con_aliases[i], &p->con_aliases[i+1],
											(p->aliases-i-1)*
												sizeof(p->con_aliases[0]));
						}
						p->aliases--;
						/* re-add the remaining aliases */
						for (r=i; r<p->aliases; r++){
							tcpconn_listadd(
								tcpconn_aliases_hash[p->con_aliases[r].hash],
									&p->con_aliases[r], next, prev);
						}
					}else
						goto error_sec;
				}else goto ok; /* alias already belongs to c */
			}
		}
		/* no conflicting alias (left) => append a new one to c */
		if (unlikely(c->aliases>=TCP_CON_MAX_ALIASES)) goto error_aliases;
		c->con_aliases[c->aliases].parent=c;
		c->con_aliases[c->aliases].port=port;
		c->con_aliases[c->aliases].hash=hash;
		tcpconn_listadd(tcpconn_aliases_hash[hash],
								&c->con_aliases[c->aliases], next, prev);
		c->aliases++;
	}else goto error_not_found;
ok:
#ifdef EXTRA_DEBUG
	if (a) DBG("_tcpconn_add_alias_unsafe: alias already present\n");
	else   DBG("_tcpconn_add_alias_unsafe: alias port %d for hash %d, id %d\n",
			port, hash, c->id);
#endif
	return 0;
error_aliases:
	/* too many aliases */
	return -2;
error_not_found:
	/* null connection */
	return -1;
error_sec:
	/* alias already present and pointing to a different connection
	 * (hijack attempt?) */
	return -3;
}
  1458. /* add port as an alias for the "id" connection,
  1459. * returns 0 on success,-1 on failure */
  1460. int tcpconn_add_alias(int id, int port, int proto)
  1461. {
  1462. struct tcp_connection* c;
  1463. int ret;
  1464. struct ip_addr zero_ip;
  1465. int r;
  1466. int alias_flags;
  1467. /* fix the port */
  1468. port=port?port:((proto==PROTO_TLS)?SIPS_PORT:SIP_PORT);
  1469. TCPCONN_LOCK;
  1470. /* check if alias already exists */
  1471. c=_tcpconn_find(id, 0, 0, 0, 0);
  1472. if (likely(c)){
  1473. ip_addr_mk_any(c->rcv.src_ip.af, &zero_ip);
  1474. alias_flags=cfg_get(tcp, tcp_cfg, alias_flags);
  1475. /* alias src_ip:port, 0, 0 */
  1476. ret=_tcpconn_add_alias_unsafe(c, port, &zero_ip, 0,
  1477. alias_flags);
  1478. if (ret<0 && ret!=-3) goto error;
  1479. /* alias src_ip:port, local_ip, 0 */
  1480. ret=_tcpconn_add_alias_unsafe(c, port, &c->rcv.dst_ip, 0,
  1481. alias_flags);
  1482. if (ret<0 && ret!=-3) goto error;
  1483. /* alias src_ip:port, local_ip, local_port */
  1484. ret=_tcpconn_add_alias_unsafe(c, port, &c->rcv.dst_ip, c->rcv.dst_port,
  1485. alias_flags);
  1486. if (unlikely(ret<0)) goto error;
  1487. }else goto error_not_found;
  1488. TCPCONN_UNLOCK;
  1489. return 0;
  1490. error_not_found:
  1491. TCPCONN_UNLOCK;
  1492. LOG(L_ERR, "ERROR: tcpconn_add_alias: no connection found for id %d\n",id);
  1493. return -1;
  1494. error:
  1495. TCPCONN_UNLOCK;
  1496. switch(ret){
  1497. case -2:
  1498. LOG(L_ERR, "ERROR: tcpconn_add_alias: too many aliases (%d)"
  1499. " for connection %p (id %d) %s:%d <- %d\n",
  1500. c->aliases, c, c->id, ip_addr2a(&c->rcv.src_ip),
  1501. c->rcv.src_port, port);
  1502. for (r=0; r<c->aliases; r++){
  1503. LOG(L_ERR, "ERROR: tcpconn_add_alias: alias %d: for %p (%d)"
  1504. " %s:%d <-%d hash %x\n", r, c, c->id,
  1505. ip_addr2a(&c->rcv.src_ip), c->rcv.src_port,
  1506. c->con_aliases[r].port, c->con_aliases[r].hash);
  1507. }
  1508. break;
  1509. case -3:
  1510. LOG(L_ERR, "ERROR: tcpconn_add_alias: possible port"
  1511. " hijack attempt\n");
  1512. LOG(L_ERR, "ERROR: tcpconn_add_alias: alias for %d port %d already"
  1513. " present and points to another connection \n",
  1514. c->id, port);
  1515. break;
  1516. default:
  1517. LOG(L_ERR, "ERROR: tcpconn_add_alias: unkown error %d\n", ret);
  1518. }
  1519. return -1;
  1520. }
  1521. #ifdef TCP_FD_CACHE
  1522. static void tcp_fd_cache_init()
  1523. {
  1524. int r;
  1525. for (r=0; r<TCP_FD_CACHE_SIZE; r++)
  1526. fd_cache[r].fd=-1;
  1527. }
  1528. inline static struct fd_cache_entry* tcp_fd_cache_get(struct tcp_connection *c)
  1529. {
  1530. int h;
  1531. h=c->id%TCP_FD_CACHE_SIZE;
  1532. if ((fd_cache[h].fd>0) && (fd_cache[h].id==c->id) && (fd_cache[h].con==c))
  1533. return &fd_cache[h];
  1534. return 0;
  1535. }
/* Invalidate a cache entry. The caller is responsible for closing the
 * cached fd if needed (this only marks the slot as empty). */
inline static void tcp_fd_cache_rm(struct fd_cache_entry* e)
{
	e->fd=-1;
}
  1540. inline static void tcp_fd_cache_add(struct tcp_connection *c, int fd)
  1541. {
  1542. int h;
  1543. h=c->id%TCP_FD_CACHE_SIZE;
  1544. if (likely(fd_cache[h].fd>0))
  1545. close(fd_cache[h].fd);
  1546. fd_cache[h].fd=fd;
  1547. fd_cache[h].id=c->id;
  1548. fd_cache[h].con=c;
  1549. }
  1550. #endif /* TCP_FD_CACHE */
  1551. inline static int tcpconn_chld_put(struct tcp_connection* tcpconn);
  1552. static int tcpconn_send_put(struct tcp_connection* c, char* buf, unsigned len,
  1553. snd_flags_t send_flags);
  1554. /* finds a tcpconn & sends on it
  1555. * uses the dst members to, proto (TCP|TLS) and id and tries to send
  1556. * from the "from" address (if non null and id==0)
  1557. * returns: number of bytes written (>=0) on success
  1558. * <0 on error */
  1559. int tcp_send(struct dest_info* dst, union sockaddr_union* from,
  1560. char* buf, unsigned len)
  1561. {
  1562. struct tcp_connection *c;
  1563. struct ip_addr ip;
  1564. int port;
  1565. int fd;
  1566. long response[2];
  1567. int n;
  1568. int do_close_fd;
  1569. ticks_t con_lifetime;
  1570. do_close_fd=1; /* close the fd on exit */
  1571. port=su_getport(&dst->to);
  1572. con_lifetime=cfg_get(tcp, tcp_cfg, con_lifetime);
  1573. if (likely(port)){
  1574. su2ip_addr(&ip, &dst->to);
  1575. c=tcpconn_get(dst->id, &ip, port, from, con_lifetime);
  1576. }else if (likely(dst->id)){
  1577. c=tcpconn_get(dst->id, 0, 0, 0, con_lifetime);
  1578. }else{
  1579. LOG(L_CRIT, "BUG: tcp_send called with null id & to\n");
  1580. return -1;
  1581. }
  1582. if (likely(dst->id)){
  1583. if (unlikely(c==0)) {
  1584. if (likely(port)){
  1585. /* try again w/o id */
  1586. c=tcpconn_get(0, &ip, port, from, con_lifetime);
  1587. }else{
  1588. LOG(L_ERR, "ERROR: tcp_send: id %d not found, dropping\n",
  1589. dst->id);
  1590. return -1;
  1591. }
  1592. }
  1593. }
  1594. /* connection not found or unusable => open a new one and send on it */
  1595. if (unlikely((c==0) || tcpconn_close_after_send(c))){
  1596. if (unlikely(c)){
  1597. /* can't use c if it's marked as close-after-send =>
  1598. release it and try opening new one */
  1599. tcpconn_chld_put(c); /* release c (dec refcnt & free on 0) */
  1600. c=0;
  1601. }
  1602. /* check if connect() is disabled */
  1603. if (unlikely((dst->send_flags.f & SND_F_FORCE_CON_REUSE) ||
  1604. cfg_get(tcp, tcp_cfg, no_connect)))
  1605. return -1;
  1606. DBG("tcp_send: no open tcp connection found, opening new one\n");
  1607. /* create tcp connection */
  1608. if (likely(from==0)){
  1609. /* check to see if we have to use a specific source addr. */
  1610. switch (dst->to.s.sa_family) {
  1611. case AF_INET:
  1612. from = tcp_source_ipv4;
  1613. break;
  1614. #ifdef USE_IPV6
  1615. case AF_INET6:
  1616. from = tcp_source_ipv6;
  1617. break;
  1618. #endif
  1619. default:
  1620. /* error, bad af, ignore ... */
  1621. break;
  1622. }
  1623. }
  1624. #if defined(TCP_CONNECT_WAIT) && defined(TCP_ASYNC)
  1625. if (likely(cfg_get(tcp, tcp_cfg, tcp_connect_wait) &&
  1626. cfg_get(tcp, tcp_cfg, async) )){
  1627. if (unlikely(*tcp_connections_no >=
  1628. cfg_get(tcp, tcp_cfg, max_connections))){
  1629. LOG(L_ERR, "ERROR: tcp_send %s: maximum number of"
  1630. " connections exceeded (%d/%d)\n",
  1631. su2a(&dst->to, sizeof(dst->to)),
  1632. *tcp_connections_no,
  1633. cfg_get(tcp, tcp_cfg, max_connections));
  1634. return -1;
  1635. }
  1636. c=tcpconn_new(-1, &dst->to, from, 0, dst->proto,
  1637. S_CONN_CONNECT);
  1638. if (unlikely(c==0)){
  1639. LOG(L_ERR, "ERROR: tcp_send %s: could not create new"
  1640. " connection\n",
  1641. su2a(&dst->to, sizeof(dst->to)));
  1642. return -1;
  1643. }
  1644. c->flags|=F_CONN_PENDING|F_CONN_FD_CLOSED;
  1645. tcpconn_set_send_flags(c, dst->send_flags);
  1646. atomic_set(&c->refcnt, 2); /* ref from here and from main hash
  1647. table */
  1648. /* add it to id hash and aliases */
  1649. if (unlikely(tcpconn_add(c)==0)){
  1650. LOG(L_ERR, "ERROR: tcp_send %s: could not add "
  1651. "connection %p\n",
  1652. su2a(&dst->to, sizeof(dst->to)),
  1653. c);
  1654. _tcpconn_free(c);
  1655. n=-1;
  1656. goto end_no_conn;
  1657. }
  1658. /* do connect and if src ip or port changed, update the
  1659. * aliases */
  1660. if (unlikely((fd=tcpconn_finish_connect(c, from))<0)){
  1661. /* tcpconn_finish_connect will automatically blacklist
  1662. on error => no need to do it here */
  1663. LOG(L_ERR, "ERROR: tcp_send %s: tcpconn_finish_connect(%p)"
  1664. " failed\n", su2a(&dst->to, sizeof(dst->to)),
  1665. c);
  1666. goto conn_wait_error;
  1667. }
  1668. /* ? TODO: it might be faster just to queue the write directly
  1669. * and send to main CONN_NEW_PENDING_WRITE */
  1670. /* delay sending the fd to main after the send */
  1671. /* NOTE: no lock here, because the connection is marked as
  1672. * pending and nobody else will try to write on it. However
  1673. * this might produce out-of-order writes. If this is not
  1674. * desired either lock before the write or use
  1675. * _wbufq_insert(...) */
  1676. #ifdef USE_TLS
  1677. if (unlikely(c->type==PROTO_TLS))
  1678. n=tls_1st_send(fd, c, buf, len, dst->send_flags,
  1679. &response[1]);
  1680. else
  1681. #endif /* USE_TLS */
  1682. n=tcpconn_1st_send(fd, c, buf, len, dst->send_flags,
  1683. &response[1], 0);
  1684. if (unlikely(n<0))
  1685. goto conn_wait_error;
  1686. if (unlikely(response[1]==CONN_EOF)){
  1687. /* if close-after-send requested, don't bother
  1688. sending the fd back to tcp_main, try closing it
  1689. immediately (no other tcp_send should use it,
  1690. because it is marked as close-after-send before
  1691. being added to the hash */
  1692. goto conn_wait_close;
  1693. }
  1694. /* send to tcp_main */
  1695. response[0]=(long)c;
  1696. if (unlikely(response[1]!=CONN_NOP &&
  1697. (send_fd(unix_tcp_sock, response,
  1698. sizeof(response), fd) <= 0))){
  1699. LOG(L_ERR, "BUG: tcp_send %s: %ld for %p"
  1700. " failed:" " %s (%d)\n",
  1701. su2a(&dst->to, sizeof(dst->to)),
  1702. response[1], c, strerror(errno), errno);
  1703. goto conn_wait_error;
  1704. }
  1705. goto conn_wait_success;
  1706. }
  1707. #endif /* TCP_CONNECT_WAIT && TCP_ASYNC */
  1708. if (unlikely((c=tcpconn_connect(&dst->to, from, dst->proto,
  1709. &dst->send_flags))==0)){
  1710. LOG(L_ERR, "ERROR: tcp_send %s: connect failed\n",
  1711. su2a(&dst->to, sizeof(dst->to)));
  1712. return -1;
  1713. }
  1714. tcpconn_set_send_flags(c, dst->send_flags);
  1715. if (likely(c->state==S_CONN_OK))
  1716. TCP_STATS_ESTABLISHED(S_CONN_CONNECT);
  1717. atomic_set(&c->refcnt, 2); /* ref. from here and it will also
  1718. be added in the tcp_main hash */
  1719. fd=c->s;
  1720. c->flags|=F_CONN_FD_CLOSED; /* not yet opened in main */
  1721. /* ? TODO: it might be faster just to queue the write and
  1722. * send to main a CONN_NEW_PENDING_WRITE */
  1723. /* send the new tcpconn to "tcp main" */
  1724. response[0]=(long)c;
  1725. response[1]=CONN_NEW;
  1726. n=send_fd(unix_tcp_sock, response, sizeof(response), c->s);
  1727. if (unlikely(n<=0)){
  1728. LOG(L_ERR, "BUG: tcp_send %s: failed send_fd: %s (%d)\n",
  1729. su2a(&dst->to, sizeof(dst->to)),
  1730. strerror(errno), errno);
  1731. /* we can safely delete it, it's not referenced by anybody */
  1732. _tcpconn_free(c);
  1733. n=-1;
  1734. goto end_no_conn;
  1735. }
  1736. /* new connection => send on it directly */
  1737. #ifdef USE_TLS
  1738. if (unlikely(c->type==PROTO_TLS)) {
  1739. response[1] = CONN_ERROR; /* in case tls is not loaded */
  1740. n = tls_do_send(fd, c, buf, len, dst->send_flags,
  1741. &response[1]);
  1742. } else
  1743. #endif /* USE_TLS */
  1744. n = tcpconn_do_send(fd, c, buf, len, dst->send_flags,
  1745. &response[1], 0);
  1746. if (unlikely(response[1] != CONN_NOP)) {
  1747. response[0]=(long)c;
  1748. if (send_all(unix_tcp_sock, response, sizeof(response)) <= 0) {
  1749. BUG("tcp_main command %ld sending failed (write):"
  1750. "%s (%d)\n", response[1], strerror(errno), errno);
  1751. /* all commands != CONN_NOP returned by tcpconn_do_send()
  1752. (CONN_EOF, CONN_ERROR, CONN_QUEUED_WRITE) will auto-dec
  1753. refcnt => if sending the command fails we have to
  1754. dec. refcnt by hand */
  1755. tcpconn_chld_put(c); /* deref. it manually */
  1756. n=-1;
  1757. }
  1758. /* here refcnt for c is already decremented => c contents can
  1759. no longer be used and refcnt _must_ _not_ be decremented
  1760. again on exit */
  1761. if (unlikely(n < 0 || response[1] == CONN_EOF)) {
  1762. /* on error or eof, close fd */
  1763. close(fd);
  1764. } else if (response[1] == CONN_QUEUED_WRITE) {
  1765. #ifdef TCP_FD_CACHE
  1766. if (cfg_get(tcp, tcp_cfg, fd_cache)) {
  1767. tcp_fd_cache_add(c, fd);
  1768. } else
  1769. #endif /* TCP_FD_CACHE */
  1770. close(fd);
  1771. } else {
  1772. BUG("unexpected tcpconn_do_send() return & response:"
  1773. " %d, %ld\n", n, response[1]);
  1774. }
  1775. goto end_no_deref;
  1776. }
  1777. #ifdef TCP_FD_CACHE
  1778. if (cfg_get(tcp, tcp_cfg, fd_cache)) {
  1779. tcp_fd_cache_add(c, fd);
  1780. }else
  1781. #endif /* TCP_FD_CACHE */
  1782. close(fd);
  1783. /* here we can have only commands that _do_ _not_ dec refcnt.
  1784. (CONN_EOF, CON_ERROR, CON_QUEUED_WRITE are all treated above) */
  1785. goto release_c;
  1786. } /* if (c==0 or unusable) new connection */
  1787. /* existing connection, send on it */
  1788. n = tcpconn_send_put(c, buf, len, dst->send_flags);
  1789. /* no deref needed (automatically done inside tcpconn_send_put() */
  1790. return n;
  1791. #ifdef TCP_CONNECT_WAIT
  1792. conn_wait_success:
  1793. #ifdef TCP_FD_CACHE
  1794. if (cfg_get(tcp, tcp_cfg, fd_cache)) {
  1795. tcp_fd_cache_add(c, fd);
  1796. } else
  1797. #endif /* TCP_FD_CACHE */
  1798. close(fd);
  1799. tcpconn_chld_put(c); /* release c (dec refcnt & free on 0) */
  1800. return n;
  1801. conn_wait_error:
  1802. n=-1;
  1803. conn_wait_close:
  1804. /* connect or send failed or immediate close-after-send was requested on
  1805. * newly created connection which was not yet sent to tcp_main (but was
  1806. * already hashed) => don't send to main, unhash and destroy directly
  1807. * (if refcnt>2 it will be destroyed when the last sender releases the
  1808. * connection (tcpconn_chld_put(c))) or when tcp_main receives a
  1809. * CONN_ERROR it*/
  1810. c->state=S_CONN_BAD;
  1811. /* we are here only if we opened a new fd (and not reused a cached or
  1812. a reader one) => if the connect was successful close the fd */
  1813. if (fd>=0) close(fd);
  1814. TCPCONN_LOCK;
  1815. if (c->flags & F_CONN_HASHED){
  1816. /* if some other parallel tcp_send did send CONN_ERROR to
  1817. * tcp_main, the connection might be already detached */
  1818. _tcpconn_detach(c);
  1819. c->flags&=~F_CONN_HASHED;
  1820. TCPCONN_UNLOCK;
  1821. tcpconn_put(c);
  1822. }else
  1823. TCPCONN_UNLOCK;
  1824. /* dec refcnt -> mark it for destruction */
  1825. tcpconn_chld_put(c);
  1826. return n;
  1827. #endif /* TCP_CONNET_WAIT */
  1828. release_c:
  1829. tcpconn_chld_put(c); /* release c (dec refcnt & free on 0) */
  1830. end_no_deref:
  1831. end_no_conn:
  1832. return n;
  1833. }
/** sends on an existing tcpconn and auto-dec. con. ref counter.
 * As opposed to tcp_send(), this function requires an existing
 * tcp connection.
 * WARNING: the tcp_connection will be de-referenced.
 * @param c - existing tcp connection pointer.
 * @param buf - data to be sent.
 * @param len - data length,
 * @return >=0 on success, -1 on error.
 */
static int tcpconn_send_put(struct tcp_connection* c, char* buf, unsigned len,
							snd_flags_t send_flags)
{
	struct tcp_connection *tmp;
	int fd;
	long response[2];
	int n;
	int do_close_fd;
#ifdef TCP_FD_CACHE
	struct fd_cache_entry* fd_cache_e;
	int use_fd_cache;

	use_fd_cache=cfg_get(tcp, tcp_cfg, fd_cache);
	fd_cache_e=0;
#endif /* TCP_FD_CACHE */
	do_close_fd=1; /* close the fd on exit */
	response[1] = CONN_NOP;
#ifdef TCP_ASYNC
	/* if data is already queued, we don't need the fd */
#ifdef TCP_CONNECT_WAIT
	if (unlikely(cfg_get(tcp, tcp_cfg, async) &&
					(_wbufq_non_empty(c) || (c->flags&F_CONN_PENDING)) ))
#else /* ! TCP_CONNECT_WAIT */
	if (unlikely(cfg_get(tcp, tcp_cfg, async) && (_wbufq_non_empty(c)) ))
#endif /* TCP_CONNECT_WAIT */
	{
		lock_get(&c->write_lock);
		/* re-check under the write lock (the queue/pending state might
		 * have changed in the meantime) */
#ifdef TCP_CONNECT_WAIT
		if (likely(_wbufq_non_empty(c) || (c->flags&F_CONN_PENDING)))
#else /* ! TCP_CONNECT_WAIT */
		if (likely(_wbufq_non_empty(c)))
#endif /* TCP_CONNECT_WAIT */
		{
			do_close_fd=0;
			/* append to the write queue; it will be flushed later */
			if (unlikely(_wbufq_add(c, buf, len)<0)){
				lock_release(&c->write_lock);
				n=-1;
				response[1] = CONN_ERROR;
				c->state=S_CONN_BAD;
				c->timeout=get_ticks_raw(); /* force timeout */
				goto error;
			}
			n=len;
			lock_release(&c->write_lock);
			goto release_c;
		}
		lock_release(&c->write_lock);
	}
#endif /* TCP_ASYNC */
	/* check if this is not the same reader process holding
	 *  c  and if so send directly on c->fd */
	if (c->reader_pid==my_pid()){
		DBG("tcp_send: send from reader (%d (%d)), reusing fd\n",
				my_pid(), process_no);
		fd=c->fd;
		do_close_fd=0; /* don't close the fd on exit, it's in use */
#ifdef TCP_FD_CACHE
		use_fd_cache=0; /* don't cache: problems would arise due to the
						   close() on cache eviction (if the fd is still
						   used). If it has to be cached then dup() _must_
						   be used */
	}else if (likely(use_fd_cache &&
						((fd_cache_e=tcp_fd_cache_get(c))!=0))){
		/* cached fd hit: reuse it without asking tcp_main */
		fd=fd_cache_e->fd;
		do_close_fd=0;
		DBG("tcp_send: found fd in cache ( %d, %p, %d)\n",
				fd, c, fd_cache_e->id);
#endif /* TCP_FD_CACHE */
	}else{
		DBG("tcp_send: tcp connection found (%p), acquiring fd\n", c);
		/* get the fd: ask tcp_main to pass it over the unix socket */
		response[0]=(long)c;
		response[1]=CONN_GET_FD;
		n=send_all(unix_tcp_sock, response, sizeof(response));
		if (unlikely(n<=0)){
			LOG(L_ERR, "BUG: tcp_send: failed to get fd(write):%s (%d)\n",
					strerror(errno), errno);
			n=-1;
			goto release_c;
		}
		DBG("tcp_send, c= %p, n=%d\n", c, n);
		n=receive_fd(unix_tcp_sock, &tmp, sizeof(tmp), &fd, MSG_WAITALL);
		if (unlikely(n<=0)){
			LOG(L_ERR, "BUG: tcp_send: failed to get fd(receive_fd):"
						" %s (%d)\n", strerror(errno), errno);
			n=-1;
			do_close_fd=0;
			goto release_c;
		}
		/* sanity check: the fd we received must belong to c */
		if (unlikely(c!=tmp)){
			LOG(L_CRIT, "BUG: tcp_send: get_fd: got different connection:"
					" %p (id= %d, refcnt=%d state=%d) != "
					" %p (n=%d)\n",
					c, c->id, atomic_get(&c->refcnt), c->state,
					tmp, n
				);
			n=-1; /* fail */
			/* don't cache fd & close it */
			do_close_fd = 1;
#ifdef TCP_FD_CACHE
			use_fd_cache = 0;
#endif /* TCP_FD_CACHE */
			goto end;
		}
		DBG("tcp_send: after receive_fd: c= %p n=%d fd=%d\n",c, n, fd);
	}

#ifdef USE_TLS
	if (unlikely(c->type==PROTO_TLS)) {
		response[1] = CONN_ERROR; /* in case tls is not loaded */
		n = tls_do_send(fd, c, buf, len, send_flags, &response[1]);
	} else
#endif
		n = tcpconn_do_send(fd, c, buf, len, send_flags, &response[1], 0);
	if (unlikely(response[1] != CONN_NOP)) {
error:
		/* report the command (CONN_ERROR/CONN_EOF/CONN_QUEUED_WRITE)
		 * back to tcp_main */
		response[0]=(long)c;
		if (send_all(unix_tcp_sock, response, sizeof(response)) <= 0) {
			BUG("tcp_main command %ld sending failed (write):%s (%d)\n",
					response[1], strerror(errno), errno);
			/* all commands != CONN_NOP returned by tcpconn_do_send()
			   (CONN_EOF, CONN_ERROR, CONN_QUEUED_WRITE) will auto-dec refcnt
			   => if sending the command fails we have to dec. refcnt by hand
			 */
			tcpconn_chld_put(c); /* deref. it manually */
			n=-1;
		}
		/* here refcnt for c is already decremented => c contents can no
		   longer be used and refcnt _must_ _not_ be decremented again
		   on exit */
		if (unlikely(n < 0 || response[1] == CONN_EOF)) {
			/* on error or eof, remove from cache or close fd */
#ifdef TCP_FD_CACHE
			if (unlikely(fd_cache_e)){
				tcp_fd_cache_rm(fd_cache_e);
				fd_cache_e = 0;
				close(fd);
			}else
#endif /* TCP_FD_CACHE */
				if (do_close_fd) close(fd);
		} else if (response[1] == CONN_QUEUED_WRITE) {
#ifdef TCP_FD_CACHE
			if (unlikely((fd_cache_e==0) && use_fd_cache)){
				tcp_fd_cache_add(c, fd);
			}else
#endif /* TCP_FD_CACHE */
				if (do_close_fd) close(fd);
		} else {
			BUG("unexpected tcpconn_do_send() return & response: %d, %ld\n",
					n, response[1]);
		}
		return n; /* no tcpconn_put */
	}
end:
#ifdef TCP_FD_CACHE
	if (unlikely((fd_cache_e==0) && use_fd_cache)){
		tcp_fd_cache_add(c, fd);
	}else
#endif /* TCP_FD_CACHE */
		if (do_close_fd) close(fd);
	/* here we can have only commands that _do_ _not_ dec refcnt.
	   (CONN_EOF, CON_ERROR, CON_QUEUED_WRITE are all treated above) */
release_c:
	tcpconn_chld_put(c); /* release c (dec refcnt & free on 0) */
	return n;
}
  2007. /* unsafe send on a known tcp connection.
  2008. * Directly send on a known tcp connection with a given fd.
  2009. * It is assumed that the connection locks are already held.
  2010. * Side effects: if needed it will send state update commands to
  2011. * tcp_main (e.g. CON_EOF, CON_ERROR, CON_QUEUED_WRITE).
  2012. * @param fd - fd used for sending.
  2013. * @param c - existing tcp connection pointer (state and flags might be
  2014. * changed).
  2015. * @param buf - data to be sent.
  2016. * @param len - data length.
  2017. * @param send_flags
  2018. * @return <0 on error, number of bytes sent on success.
  2019. */
  2020. int tcpconn_send_unsafe(int fd, struct tcp_connection *c,
  2021. char* buf, unsigned len, snd_flags_t send_flags)
  2022. {
  2023. int n;
  2024. long response[2];
  2025. n = tcpconn_do_send(fd, c, buf, len, send_flags, &response[1], 1);
  2026. if (unlikely(response[1] != CONN_NOP)) {
  2027. /* all commands != CONN_NOP returned by tcpconn_do_send()
  2028. (CONN_EOF, CONN_ERROR, CONN_QUEUED_WRITE) will auto-dec refcnt
  2029. => increment it (we don't want the connection to be destroyed
  2030. from under us)
  2031. */
  2032. atomic_inc(&c->refcnt);
  2033. response[0]=(long)c;
  2034. if (send_all(unix_tcp_sock, response, sizeof(response)) <= 0) {
  2035. BUG("connection %p command %ld sending failed (write):%s (%d)\n",
  2036. c, response[1], strerror(errno), errno);
  2037. /* send failed => deref. it back by hand */
  2038. tcpconn_chld_put(c);
  2039. n=-1;
  2040. }
  2041. /* here refcnt for c is already decremented => c contents can no
  2042. longer be used and refcnt _must_ _not_ be decremented again
  2043. on exit */
  2044. return n;
  2045. }
  2046. return n;
  2047. }
  2048. /** lower level send (connection and fd should be known).
  2049. * It takes care of possible write-queueing, blacklisting a.s.o.
  2050. * It expects a valid tcp connection. It doesn't touch the ref. cnts.
  2051. * It will also set the connection flags from send_flags (it's better
  2052. * to do it here, because it's guaranteed to be under lock).
  2053. * @param fd - fd used for sending.
  2054. * @param c - existing tcp connection pointer (state and flags might be
  2055. * changed).
  2056. * @param buf - data to be sent.
  2057. * @param len - data length.
  2058. * @param send_flags
  2059. * @param resp - filled with a cmd. for tcp_main:
  2060. * CONN_NOP - nothing needs to be done (do not send
  2061. * anything to tcp_main).
  2062. * CONN_ERROR - error, connection should be closed.
  2063. * CONN_EOF - no error, but connection should be closed.
  2064. * CONN_QUEUED_WRITE - new write queue (connection
  2065. * should be watched for write and the wr.
  2066. * queue flushed).
  2067. * @param locked - if set assume the connection is already locked (call from
  2068. * tls) and do not lock/unlock the connection.
  2069. * @return >=0 on success, < 0 on error && *resp == CON_ERROR.
  2070. *
  2071. */
/** Send data on an (already established or connecting) tcp connection.
 * In async mode the data is appended to the connection write queue whenever
 * it cannot be written immediately (queue non-empty, pending connect or
 * partial/EAGAIN write); in sync mode a blocking-with-timeout write is used.
 * @param fd - fd to write on (might be a cached fd, not tcp_main's).
 * @param c  - tcp connection (state, flags and timeout might be changed).
 * @param buf, len - data to send.
 * @param send_flags - current send flags (copied into the connection).
 * @param resp - filled with a command for tcp_main: CONN_NOP (nothing to do),
 *               CONN_QUEUED_WRITE (watch fd for write), CONN_EOF (close after
 *               write) or CONN_ERROR.
 * @param locked - if set, c->write_lock is already held by the caller (tls).
 * @return number of bytes "accepted" (written or queued) or -1 on error.
 */
int tcpconn_do_send(int fd, struct tcp_connection* c,
					char* buf, unsigned len,
					snd_flags_t send_flags, long* resp,
					int locked)
{
	int n;
#ifdef TCP_ASYNC
	int enable_write_watch;
#endif /* TCP_ASYNC */

	DBG("tcp_send: sending...\n");
	*resp = CONN_NOP;
	if (likely(!locked)) lock_get(&c->write_lock);
	/* update connection send flags with the current ones */
	tcpconn_set_send_flags(c, send_flags);
#ifdef TCP_ASYNC
	if (likely(cfg_get(tcp, tcp_cfg, async))){
		/* if something is already queued (or the connect did not finish
		 * yet) we must queue behind it to preserve ordering */
		if (_wbufq_non_empty(c)
#ifdef TCP_CONNECT_WAIT
			|| (c->flags&F_CONN_PENDING)
#endif /* TCP_CONNECT_WAIT */
			){
			if (unlikely(_wbufq_add(c, buf, len)<0)){
				if (likely(!locked)) lock_release(&c->write_lock);
				n=-1;
				goto error;
			}
			if (likely(!locked)) lock_release(&c->write_lock);
			n=len;
			goto end;
		}
		/* queue empty => try a direct non-blocking write */
		n=_tcpconn_write_nb(fd, c, buf, len);
	}else{
#endif /* TCP_ASYNC */
		/* sync mode: blocking write with timeout */
		/* n=tcp_blocking_write(c, fd, buf, len); */
		n=tsend_stream(fd, buf, len,
						TICKS_TO_S(cfg_get(tcp, tcp_cfg, send_timeout)) *
						1000);
#ifdef TCP_ASYNC
	}
#else /* ! TCP_ASYNC */
	/* in sync-only builds the lock is not needed past the write */
	if (likely(!locked)) lock_release(&c->write_lock);
#endif /* TCP_ASYNC */

	DBG("tcp_send: after real write: c= %p n=%d fd=%d\n",c, n, fd);
	DBG("tcp_send: buf=\n%.*s\n", (int)len, buf);
	if (unlikely(n<(int)len)){
		/* short write, EAGAIN or hard error */
#ifdef TCP_ASYNC
		if (cfg_get(tcp, tcp_cfg, async) &&
			((n>=0) || errno==EAGAIN || errno==EWOULDBLOCK)){
			/* queue the unsent tail; ask tcp_main to watch for write only
			 * if the queue was empty before (it is not watched yet) */
			enable_write_watch=_wbufq_empty(c);
			if (n<0) n=0;
			else if (unlikely(c->state==S_CONN_CONNECT ||
						c->state==S_CONN_ACCEPT)){
				TCP_STATS_ESTABLISHED(c->state);
				c->state=S_CONN_OK; /* something was written */
			}
			if (unlikely(_wbufq_add(c, buf+n, len-n)<0)){
				if (likely(!locked)) lock_release(&c->write_lock);
				n=-1;
				goto error;
			}
			if (likely(!locked)) lock_release(&c->write_lock);
			n=len;
			if (likely(enable_write_watch))
				*resp=CONN_QUEUED_WRITE;
			goto end;
		}else{
			if (likely(!locked)) lock_release(&c->write_lock);
		}
#endif /* TCP_ASYNC */
		/* hard error: blacklist / account depending on whether the
		 * connection was still connecting or already established */
		if (unlikely(c->state==S_CONN_CONNECT)){
			switch(errno){
				case ENETUNREACH:
				case EHOSTUNREACH: /* not posix for send() */
#ifdef USE_DST_BLACKLIST
					dst_blacklist_su(BLST_ERR_CONNECT, c->rcv.proto,
										&c->rcv.src_su, &c->send_flags, 0);
#endif /* USE_DST_BLACKLIST */
					TCP_EV_CONNECT_UNREACHABLE(errno, TCP_LADDR(c),
									TCP_LPORT(c), TCP_PSU(c), TCP_PROTO(c));
					break;
				case ECONNREFUSED:
				case ECONNRESET:
#ifdef USE_DST_BLACKLIST
					dst_blacklist_su(BLST_ERR_CONNECT, c->rcv.proto,
										&c->rcv.src_su, &c->send_flags, 0);
#endif /* USE_DST_BLACKLIST */
					TCP_EV_CONNECT_RST(errno, TCP_LADDR(c), TCP_LPORT(c),
										TCP_PSU(c), TCP_PROTO(c));
					break;
				default:
					TCP_EV_CONNECT_ERR(errno, TCP_LADDR(c), TCP_LPORT(c),
										TCP_PSU(c), TCP_PROTO(c));
			}
			TCP_STATS_CONNECT_FAILED();
		}else{
			switch(errno){
				case ECONNREFUSED:
				case ECONNRESET:
					TCP_STATS_CON_RESET();
					/* no break */
				case ENETUNREACH:
				/*case EHOSTUNREACH: -- not posix */
#ifdef USE_DST_BLACKLIST
					dst_blacklist_su(BLST_ERR_SEND, c->rcv.proto,
										&c->rcv.src_su, &c->send_flags, 0);
#endif /* USE_DST_BLACKLIST */
					break;
			}
		}
		LOG(L_ERR, "ERROR: tcp_send: failed to send on %p (%s:%d->%s): %s (%d)"
					"\n", c, ip_addr2a(&c->rcv.dst_ip), c->rcv.dst_port,
					su2a(&c->rcv.src_su, sizeof(c->rcv.src_su)),
					strerror(errno), errno);
		n = -1;
#ifdef TCP_ASYNC
error:
#endif /* TCP_ASYNC */
		/* error on the connection , mark it as bad and set 0 timeout */
		c->state=S_CONN_BAD;
		c->timeout=get_ticks_raw();
		/* tell "main" it should drop this (optional it will t/o anyway?)*/
		*resp=CONN_ERROR;
		return n; /* error return, no tcpconn_put */
	}

#ifdef TCP_ASYNC
	/* full write succeeded; in async builds the lock is still held here */
	if (likely(!locked)) lock_release(&c->write_lock);
#endif /* TCP_ASYNC */
	/* in non-async mode here we're either in S_CONN_OK or S_CONN_ACCEPT*/
	if (unlikely(c->state==S_CONN_CONNECT || c->state==S_CONN_ACCEPT)){
		TCP_STATS_ESTABLISHED(c->state);
		c->state=S_CONN_OK;
	}
	if (unlikely(send_flags.f & SND_F_CON_CLOSE)){
		/* close after write => send EOF request to tcp_main */
		c->state=S_CONN_BAD;
		c->timeout=get_ticks_raw();
		/* tell "main" it should drop this*/
		*resp=CONN_EOF;
		return n;
	}
end:
	return n;
}
  2215. /** low level 1st send on a new connection.
  2216. * It takes care of possible write-queueing, blacklisting a.s.o.
  2217. * It expects a valid just-opened tcp connection. It doesn't touch the
  2218. * ref. counters. It's used only in the async first send case.
  2219. * @param fd - fd used for sending.
  2220. * @param c - existing tcp connection pointer (state and flags might be
  2221. * changed). The connection must be new (no previous send on it).
  2222. * @param buf - data to be sent.
  2223. * @param len - data length.
  2224. * @param send_flags
  2225. * @param resp - filled with a fd sending cmd. for tcp_main on success:
  2226. * CONN_NOP - nothing needs to be done (unused right now).
  2227. * CONN_NEW_PENDING_WRITE - new connection, first write
  2228. * was partially successful (or EAGAIN) and
  2229. * was queued (connection should be watched
  2230. * for write and the write queue flushed).
  2231. * The fd should be sent to tcp_main.
  2232. * CONN_NEW_COMPLETE - new connection, first write
  2233. * completed successfuly and no data is queued.
  2234. * The fd should be sent to tcp_main.
  2235. * CONN_EOF - no error, but the connection should be
  2236. * closed (e.g. SND_F_CON_CLOSE send flag).
  2237. * @param locked - if set assume the connection is already locked (call from
  2238. * tls) and do not lock/unlock the connection.
  2239. * @return >=0 on success, < 0 on error (on error *resp is undefined).
  2240. *
  2241. */
int tcpconn_1st_send(int fd, struct tcp_connection* c,
					char* buf, unsigned len,
					snd_flags_t send_flags, long* resp,
					int locked)
{
	int n;

	/* first non-blocking write attempt on the fresh connection */
	n=_tcpconn_write_nb(fd, c, buf, len);
	if (unlikely(n<(int)len)){
		/* partial write or would-block: queue the remainder and let
		 * tcp_main watch the fd for write */
		if ((n>=0) || errno==EAGAIN || errno==EWOULDBLOCK){
			DBG("pending write on new connection %p "
				" (%d/%d bytes written)\n", c, n, len);
			if (unlikely(n<0)) n=0;
			else{
				/* some bytes went through => the connect() finished */
				TCP_STATS_ESTABLISHED(S_CONN_CONNECT);
				c->state=S_CONN_OK; /* partial write => connect()
											ended */
			}
			/* add to the write queue */
			if (likely(!locked)) lock_get(&c->write_lock);
			if (unlikely(_wbufq_insert(c, buf+n, len-n)<0)){
				if (likely(!locked)) lock_release(&c->write_lock);
				n=-1;
				LOG(L_ERR, "%s: EAGAIN and"
						" write queue full or failed for %p\n",
						su2a(&c->rcv.src_su, sizeof(c->rcv.src_su)), c);
				goto error;
			}
			if (likely(!locked)) lock_release(&c->write_lock);
			/* send to tcp_main */
			*resp=CONN_NEW_PENDING_WRITE;
			n=len;
			goto end;
		}
		/* n < 0 and not EAGAIN => write error */
		/* if first write failed it's most likely a
		   connect error */
		switch(errno){
			case ENETUNREACH:
			case EHOSTUNREACH: /* not posix for send() */
#ifdef USE_DST_BLACKLIST
				dst_blacklist_su( BLST_ERR_CONNECT, c->rcv.proto,
									&c->rcv.src_su, &c->send_flags, 0);
#endif /* USE_DST_BLACKLIST */
				TCP_EV_CONNECT_UNREACHABLE(errno, TCP_LADDR(c),
									TCP_LPORT(c), TCP_PSU(c), TCP_PROTO(c));
				break;
			case ECONNREFUSED:
			case ECONNRESET:
#ifdef USE_DST_BLACKLIST
				dst_blacklist_su( BLST_ERR_CONNECT, c->rcv.proto,
									&c->rcv.src_su, &c->send_flags, 0);
#endif /* USE_DST_BLACKLIST */
				TCP_EV_CONNECT_RST(errno, TCP_LADDR(c),
									TCP_LPORT(c), TCP_PSU(c), TCP_PROTO(c));
				break;
			default:
				TCP_EV_CONNECT_ERR(errno, TCP_LADDR(c),
									TCP_LPORT(c), TCP_PSU(c), TCP_PROTO(c));
		}
		/* error: destroy it directly */
		TCP_STATS_CONNECT_FAILED();
		LOG(L_ERR, "%s: connect & send for %p failed:" " %s (%d)\n",
					su2a(&c->rcv.src_su, sizeof(c->rcv.src_su)),
					c, strerror(errno), errno);
		goto error;
	}
	/* whole message written in one shot */
	LOG(L_INFO, "quick connect for %p\n", c);
	TCP_STATS_ESTABLISHED(S_CONN_CONNECT);
	if (unlikely(send_flags.f & SND_F_CON_CLOSE)){
		/* close after write => EOF => close immediately */
		c->state=S_CONN_BAD;
		/* tell our caller that it should drop this*/
		*resp=CONN_EOF;
	}else{
		c->state=S_CONN_OK;
		/* send to tcp_main */
		*resp=CONN_NEW_COMPLETE;
	}
end:
	return n; /* >= 0 */
error:
	*resp=CONN_ERROR;
	return -1;
}
  2326. int tcp_init(struct socket_info* sock_info)
  2327. {
  2328. union sockaddr_union* addr;
  2329. int optval;
  2330. #ifdef HAVE_TCP_ACCEPT_FILTER
  2331. struct accept_filter_arg afa;
  2332. #endif /* HAVE_TCP_ACCEPT_FILTER */
  2333. #ifdef DISABLE_NAGLE
  2334. int flag;
  2335. struct protoent* pe;
  2336. if (tcp_proto_no==-1){ /* if not already set */
  2337. pe=getprotobyname("tcp");
  2338. if (pe==0){
  2339. LOG(L_ERR, "ERROR: tcp_init: could not get TCP protocol number\n");
  2340. tcp_proto_no=-1;
  2341. }else{
  2342. tcp_proto_no=pe->p_proto;
  2343. }
  2344. }
  2345. #endif
  2346. addr=&sock_info->su;
  2347. /* sock_info->proto=PROTO_TCP; */
  2348. if (init_su(addr, &sock_info->address, sock_info->port_no)<0){
  2349. LOG(L_ERR, "ERROR: tcp_init: could no init sockaddr_union\n");
  2350. goto error;
  2351. }
  2352. DBG("tcp_init: added %s\n", su2a(addr, sizeof(*addr)));
  2353. sock_info->socket=socket(AF2PF(addr->s.sa_family), SOCK_STREAM, 0);
  2354. if (sock_info->socket==-1){
  2355. LOG(L_ERR, "ERROR: tcp_init: socket: %s\n", strerror(errno));
  2356. goto error;
  2357. }
  2358. #ifdef DISABLE_NAGLE
  2359. flag=1;
  2360. if ( (tcp_proto_no!=-1) &&
  2361. (setsockopt(sock_info->socket, tcp_proto_no , TCP_NODELAY,
  2362. &flag, sizeof(flag))<0) ){
  2363. LOG(L_ERR, "ERROR: tcp_init: could not disable Nagle: %s\n",
  2364. strerror(errno));
  2365. }
  2366. #endif
  2367. #if !defined(TCP_DONT_REUSEADDR)
  2368. /* Stevens, "Network Programming", Section 7.5, "Generic Socket
  2369. * Options": "...server started,..a child continues..on existing
  2370. * connection..listening server is restarted...call to bind fails
  2371. * ... ALL TCP servers should specify the SO_REUSEADDRE option
  2372. * to allow the server to be restarted in this situation
  2373. *
  2374. * Indeed, without this option, the server can't restart.
  2375. * -jiri
  2376. */
  2377. optval=1;
  2378. if (setsockopt(sock_info->socket, SOL_SOCKET, SO_REUSEADDR,
  2379. (void*)&optval, sizeof(optval))==-1) {
  2380. LOG(L_ERR, "ERROR: tcp_init: setsockopt %s\n",
  2381. strerror(errno));
  2382. goto error;
  2383. }
  2384. #endif
  2385. /* tos */
  2386. optval = tos;
  2387. if (setsockopt(sock_info->socket, IPPROTO_IP, IP_TOS, (void*)&optval,
  2388. sizeof(optval)) ==-1){
  2389. LOG(L_WARN, "WARNING: tcp_init: setsockopt tos: %s\n", strerror(errno));
  2390. /* continue since this is not critical */
  2391. }
  2392. #ifdef HAVE_TCP_DEFER_ACCEPT
  2393. /* linux only */
  2394. if ((optval=cfg_get(tcp, tcp_cfg, defer_accept))){
  2395. if (setsockopt(sock_info->socket, IPPROTO_TCP, TCP_DEFER_ACCEPT,
  2396. (void*)&optval, sizeof(optval)) ==-1){
  2397. LOG(L_WARN, "WARNING: tcp_init: setsockopt TCP_DEFER_ACCEPT %s\n",
  2398. strerror(errno));
  2399. /* continue since this is not critical */
  2400. }
  2401. }
  2402. #endif /* HAVE_TCP_DEFFER_ACCEPT */
  2403. #ifdef HAVE_TCP_SYNCNT
  2404. if ((optval=cfg_get(tcp, tcp_cfg, syncnt))){
  2405. if (setsockopt(sock_info->socket, IPPROTO_TCP, TCP_SYNCNT, &optval,
  2406. sizeof(optval))<0){
  2407. LOG(L_WARN, "WARNING: tcp_init: failed to set"
  2408. " maximum SYN retr. count: %s\n", strerror(errno));
  2409. }
  2410. }
  2411. #endif
  2412. #ifdef HAVE_TCP_LINGER2
  2413. if ((optval=cfg_get(tcp, tcp_cfg, linger2))){
  2414. if (setsockopt(sock_info->socket, IPPROTO_TCP, TCP_LINGER2, &optval,
  2415. sizeof(optval))<0){
  2416. LOG(L_WARN, "WARNING: tcp_init: failed to set"
  2417. " maximum LINGER2 timeout: %s\n", strerror(errno));
  2418. }
  2419. }
  2420. #endif
  2421. init_sock_keepalive(sock_info->socket);
  2422. if (bind(sock_info->socket, &addr->s, sockaddru_len(*addr))==-1){
  2423. LOG(L_ERR, "ERROR: tcp_init: bind(%x, %p, %d) on %s:%d : %s\n",
  2424. sock_info->socket, &addr->s,
  2425. (unsigned)sockaddru_len(*addr),
  2426. sock_info->address_str.s,
  2427. sock_info->port_no,
  2428. strerror(errno));
  2429. goto error;
  2430. }
  2431. if (listen(sock_info->socket, TCP_LISTEN_BACKLOG)==-1){
  2432. LOG(L_ERR, "ERROR: tcp_init: listen(%x, %p, %d) on %s: %s\n",
  2433. sock_info->socket, &addr->s,
  2434. (unsigned)sockaddru_len(*addr),
  2435. sock_info->address_str.s,
  2436. strerror(errno));
  2437. goto error;
  2438. }
  2439. #ifdef HAVE_TCP_ACCEPT_FILTER
  2440. /* freebsd */
  2441. if (cfg_get(tcp, tcp_cfg, defer_accept)){
  2442. memset(&afa, 0, sizeof(afa));
  2443. strcpy(afa.af_name, "dataready");
  2444. if (setsockopt(sock_info->socket, SOL_SOCKET, SO_ACCEPTFILTER,
  2445. (void*)&afa, sizeof(afa)) ==-1){
  2446. LOG(L_WARN, "WARNING: tcp_init: setsockopt SO_ACCEPTFILTER %s\n",
  2447. strerror(errno));
  2448. /* continue since this is not critical */
  2449. }
  2450. }
  2451. #endif /* HAVE_TCP_ACCEPT_FILTER */
  2452. return 0;
  2453. error:
  2454. if (sock_info->socket!=-1){
  2455. close(sock_info->socket);
  2456. sock_info->socket=-1;
  2457. }
  2458. return -1;
  2459. }
  2460. /* close tcp_main's fd from a tcpconn
  2461. * WARNING: call only in tcp_main context */
  2462. inline static void tcpconn_close_main_fd(struct tcp_connection* tcpconn)
  2463. {
  2464. int fd;
  2465. fd=tcpconn->s;
  2466. #ifdef USE_TLS
  2467. if (tcpconn->type==PROTO_TLS)
  2468. tls_close(tcpconn, fd);
  2469. #endif
  2470. #ifdef TCP_FD_CACHE
  2471. if (likely(cfg_get(tcp, tcp_cfg, fd_cache))) shutdown(fd, SHUT_RDWR);
  2472. #endif /* TCP_FD_CACHE */
  2473. close_again:
  2474. if (unlikely(close(fd)<0)){
  2475. if (errno==EINTR)
  2476. goto close_again;
  2477. LOG(L_ERR, "ERROR: tcpconn_put_destroy; close() failed: %s (%d)\n",
  2478. strerror(errno), errno);
  2479. }
  2480. }
/* dec refcnt & frees the connection if refcnt==0
 * returns 1 if the connection is freed, 0 otherwise
 *
 * WARNING: use only from child processes */
inline static int tcpconn_chld_put(struct tcp_connection* tcpconn)
{
	if (unlikely(atomic_dec_and_test(&tcpconn->refcnt))){
		DBG("tcpconn_chld_put: destroying connection %p (%d, %d) "
				"flags %04x\n", tcpconn, tcpconn->id,
				tcpconn->s, tcpconn->flags);
		/* sanity checks: by the time the last ref is dropped the fd must
		 * already be closed and the conn out of hash/timer/io watch lists */
		membar_read_atomic_op(); /* make sure we see the current flags */
		if (unlikely(!(tcpconn->flags & F_CONN_FD_CLOSED) ||
					(tcpconn->flags &
						(F_CONN_HASHED|F_CONN_MAIN_TIMER|
						 F_CONN_READ_W|F_CONN_WRITE_W)) )){
			LOG(L_CRIT, "BUG: tcpconn_chld_put: %p bad flags = %0x\n",
					tcpconn, tcpconn->flags);
			abort();
		}
		_tcpconn_free(tcpconn); /* destroys also the wbuf_q if still present*/
		return 1;
	}
	return 0;
}
/* simple destroy function (the connection should be already removed
 * from the hashes. refcnt 0 and the fds should not be watched anymore for IO)
 */
inline static void tcpconn_destroy(struct tcp_connection* tcpconn)
{
	DBG("tcpconn_destroy: destroying connection %p (%d, %d) "
			"flags %04x\n", tcpconn, tcpconn->id,
			tcpconn->s, tcpconn->flags);
	if (unlikely(tcpconn->flags & F_CONN_HASHED)){
		/* should not happen: caller must have unhashed it first */
		LOG(L_CRIT, "BUG: tcpconn_destroy: called with hashed"
				" connection (%p)\n", tcpconn);
		/* try to continue */
		if (likely(tcpconn->flags & F_CONN_MAIN_TIMER))
			local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
		TCPCONN_LOCK;
		_tcpconn_detach(tcpconn);
		TCPCONN_UNLOCK;
	}
	if (likely(!(tcpconn->flags & F_CONN_FD_CLOSED))){
		/* still holds the fd => close it and account the connection */
		tcpconn_close_main_fd(tcpconn);
		(*tcp_connections_no)--;
	}
	_tcpconn_free(tcpconn); /* destroys also the wbuf_q if still present*/
}
  2530. /* tries to destroy the connection: dec. refcnt and if 0 destroys the
  2531. * connection, else it will mark it as BAD and close the main fds
  2532. *
  2533. * returns 1 if the connection was destroyed, 0 otherwise
  2534. *
  2535. * WARNING: - the connection _has_ to be removed from the hash and timer
  2536. * first (use tcpconn_try_unhash() for this )
  2537. * - the fd should not be watched anymore (io_watch_del()...)
  2538. * - must be called _only_ from the tcp_main process context
  2539. * (or else the fd will remain open)
  2540. */
  2541. inline static int tcpconn_put_destroy(struct tcp_connection* tcpconn)
  2542. {
  2543. if (unlikely((tcpconn->flags &
  2544. (F_CONN_WRITE_W|F_CONN_HASHED|F_CONN_MAIN_TIMER|F_CONN_READ_W)) )){
  2545. /* sanity check */
  2546. if (unlikely(tcpconn->flags & F_CONN_HASHED)){
  2547. LOG(L_CRIT, "BUG: tcpconn_destroy: called with hashed and/or"
  2548. "on timer connection (%p), flags = %0x\n",
  2549. tcpconn, tcpconn->flags);
  2550. /* try to continue */
  2551. if (likely(tcpconn->flags & F_CONN_MAIN_TIMER))
  2552. local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
  2553. TCPCONN_LOCK;
  2554. _tcpconn_detach(tcpconn);
  2555. TCPCONN_UNLOCK;
  2556. }else{
  2557. LOG(L_CRIT, "BUG: tcpconn_put_destroy: %p flags = %0x\n",
  2558. tcpconn, tcpconn->flags);
  2559. }
  2560. }
  2561. tcpconn->state=S_CONN_BAD;
  2562. /* in case it's still in a reader timer */
  2563. tcpconn->timeout=get_ticks_raw();
  2564. /* fast close: close fds now */
  2565. if (likely(!(tcpconn->flags & F_CONN_FD_CLOSED))){
  2566. tcpconn_close_main_fd(tcpconn);
  2567. tcpconn->flags|=F_CONN_FD_CLOSED;
  2568. (*tcp_connections_no)--;
  2569. }
  2570. /* all the flags / ops on the tcpconn must be done prior to decrementing
  2571. * the refcnt. and at least a membar_write_atomic_op() mem. barrier or
  2572. * a mb_atomic_* op must * be used to make sure all the changed flags are
  2573. * written into memory prior to the new refcnt value */
  2574. if (unlikely(mb_atomic_dec_and_test(&tcpconn->refcnt))){
  2575. _tcpconn_free(tcpconn);
  2576. return 1;
  2577. }
  2578. return 0;
  2579. }
/* try to remove a connection from the hashes and timer.
 * returns 1 if the connection was removed, 0 if not (connection not in
 * hash)
 *
 * WARNING: call it only in the tcp_main process context or else the
 *  timer removal won't work.
 */
inline static int tcpconn_try_unhash(struct tcp_connection* tcpconn)
{
	if (likely(tcpconn->flags & F_CONN_HASHED)){
		/* mark it bad so readers holding a ref stop using it */
		tcpconn->state=S_CONN_BAD;
		if (likely(tcpconn->flags & F_CONN_MAIN_TIMER)){
			local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
			tcpconn->flags&=~F_CONN_MAIN_TIMER;
		}else
			/* in case it's still in a reader timer */
			tcpconn->timeout=get_ticks_raw();
		TCPCONN_LOCK;
		/* re-check the flag under the lock: another process might have
		 * unhashed it in the meantime */
		if (tcpconn->flags & F_CONN_HASHED){
			tcpconn->flags&=~F_CONN_HASHED;
			_tcpconn_detach(tcpconn);
			TCPCONN_UNLOCK;
		}else{
			/* tcp_send was faster and did unhash it itself */
			TCPCONN_UNLOCK;
			return 0;
		}
#ifdef TCP_ASYNC
		/* empty possible write buffers (optional) */
		if (unlikely(_wbufq_non_empty(tcpconn))){
			lock_get(&tcpconn->write_lock);
			/* check again, while holding the lock */
			if (likely(_wbufq_non_empty(tcpconn)))
				_wbufq_destroy(&tcpconn->wbuf_q);
			lock_release(&tcpconn->write_lock);
		}
#endif /* TCP_ASYNC */
		return 1;
	}
	return 0;
}
  2621. #ifdef SEND_FD_QUEUE
/* one queued "send this connection's fd to a tcp child" request */
struct send_fd_info{
	struct tcp_connection* tcp_conn; /* connection whose fd is to be sent */
	ticks_t expire;                  /* give up retrying after this tick */
	int unix_sock;                   /* destination unix socket (tcp child) */
	unsigned int retries; /* debugging */
};

/* dynamically grown FIFO of pending fd sends */
struct tcp_send_fd_q{
	struct send_fd_info* data; /* buffer */
	struct send_fd_info* crt;  /* pointer inside the buffer */
	struct send_fd_info* end;  /* points after the last valid position */
};

/* queue of fds that could not be sent immediately to the tcp children */
static struct tcp_send_fd_q send2child_q;
  2634. static int send_fd_queue_init(struct tcp_send_fd_q *q, unsigned int size)
  2635. {
  2636. q->data=pkg_malloc(size*sizeof(struct send_fd_info));
  2637. if (q->data==0){
  2638. LOG(L_ERR, "ERROR: send_fd_queue_init: out of memory\n");
  2639. return -1;
  2640. }
  2641. q->crt=&q->data[0];
  2642. q->end=&q->data[size];
  2643. return 0;
  2644. }
  2645. static void send_fd_queue_destroy(struct tcp_send_fd_q *q)
  2646. {
  2647. if (q->data){
  2648. pkg_free(q->data);
  2649. q->data=0;
  2650. q->crt=q->end=0;
  2651. }
  2652. }
  2653. static int init_send_fd_queues()
  2654. {
  2655. if (send_fd_queue_init(&send2child_q, SEND_FD_QUEUE_SIZE)!=0)
  2656. goto error;
  2657. return 0;
  2658. error:
  2659. LOG(L_ERR, "ERROR: init_send_fd_queues: init failed\n");
  2660. return -1;
  2661. }
/* free the global send-to-child fd queue */
static void destroy_send_fd_queues()
{
	send_fd_queue_destroy(&send2child_q);
}
/* append a pending fd-send for connection t towards unix_sock.
 * Grows the queue (doubling, capped at MAX_SEND_FD_QUEUE_SIZE) when full.
 * returns 0 on success, -1 on error (queue at max size or out of memory) */
inline static int send_fd_queue_add( struct tcp_send_fd_q* q,
									int unix_sock,
									struct tcp_connection *t)
{
	struct send_fd_info* tmp;
	unsigned long new_size;

	if (q->crt>=q->end){
		/* queue full => compute the grown size */
		new_size=q->end-&q->data[0];
		if (new_size< MAX_SEND_FD_QUEUE_SIZE/2){
			new_size*=2;
		}else new_size=MAX_SEND_FD_QUEUE_SIZE;
		if (unlikely(q->crt>=&q->data[new_size])){
			/* already at (or beyond) the maximum size => give up */
			LOG(L_ERR, "ERROR: send_fd_queue_add: queue full: %ld/%ld\n",
					(long)(q->crt-&q->data[0]-1), new_size);
			goto error;
		}
		LOG(L_CRIT, "INFO: send_fd_queue: queue full: %ld, extending to %ld\n",
				(long)(q->end-&q->data[0]), new_size);
		tmp=pkg_realloc(q->data, new_size*sizeof(struct send_fd_info));
		if (unlikely(tmp==0)){
			LOG(L_ERR, "ERROR: send_fd_queue_add: out of memory\n");
			goto error;
		}
		/* re-base crt on the possibly moved buffer before updating data */
		q->crt=(q->crt-&q->data[0])+tmp;
		q->data=tmp;
		q->end=&q->data[new_size];
	}
	q->crt->tcp_conn=t;
	q->crt->unix_sock=unix_sock;
	q->crt->expire=get_ticks_raw()+SEND_FD_QUEUE_TIMEOUT;
	q->crt->retries=0;
	q->crt++;
	return 0;
error:
	return -1;
}
/* retry all the queued fd sends.
 * Walks the queue with p while compacting the still-pending entries
 * towards the front with t; successfully sent or permanently failed
 * entries are dropped. Permanently failed connections are unhashed and
 * destroyed. */
inline static void send_fd_queue_run(struct tcp_send_fd_q* q)
{
	struct send_fd_info* p;
	struct send_fd_info* t;

	for (p=t=&q->data[0]; p<q->crt; p++){
		if (unlikely(send_fd(p->unix_sock, &(p->tcp_conn),
					sizeof(struct tcp_connection*), p->tcp_conn->s)<=0)){
			if ( ((errno==EAGAIN)||(errno==EWOULDBLOCK)) &&
							((s_ticks_t)(p->expire-get_ticks_raw())>0)){
				/* leave in queue for a future try */
				*t=*p;
				t->retries++;
				t++;
			}else{
				/* hard error or entry expired => drop the connection */
				LOG(L_ERR, "ERROR: run_send_fd_queue: send_fd failed"
						" on socket %d , queue entry %ld, retries %d,"
						" connection %p, tcp socket %d, errno=%d (%s) \n",
						p->unix_sock, (long)(p-&q->data[0]), p->retries,
						p->tcp_conn, p->tcp_conn->s, errno,
						strerror(errno));
#ifdef TCP_ASYNC
				if (p->tcp_conn->flags & F_CONN_WRITE_W){
					io_watch_del(&io_h, p->tcp_conn->s, -1, IO_FD_CLOSING);
					p->tcp_conn->flags &=~F_CONN_WRITE_W;
				}
#endif
				p->tcp_conn->flags &= ~F_CONN_READER;
				if (likely(tcpconn_try_unhash(p->tcp_conn)))
					tcpconn_put(p->tcp_conn);
				tcpconn_put_destroy(p->tcp_conn); /* dec refcnt & destroy */
			}
		}
	}
	/* new end of queue = last retained (to-be-retried) entry */
	q->crt=t;
}
  2737. #else
  2738. #define send_fd_queue_run(q)
  2739. #endif
  2740. /* non blocking write() on a tcpconnection, unsafe version (should be called
  2741. * while holding c->write_lock). The fd should be non-blocking.
  2742. * returns number of bytes written on success, -1 on error (and sets errno)
  2743. */
  2744. int _tcpconn_write_nb(int fd, struct tcp_connection* c,
  2745. char* buf, int len)
  2746. {
  2747. int n;
  2748. again:
  2749. n=send(fd, buf, len,
  2750. #ifdef HAVE_MSG_NOSIGNAL
  2751. MSG_NOSIGNAL
  2752. #else
  2753. 0
  2754. #endif /* HAVE_MSG_NOSIGNAL */
  2755. );
  2756. if (unlikely(n<0)){
  2757. if (errno==EINTR) goto again;
  2758. }
  2759. return n;
  2760. }
/* handles io from a tcp child process
 * params: tcp_c - pointer in the tcp_children array, to the entry for
 *                 which an io event was detected
 *         fd_i  - fd index in the fd_array (usefull for optimizing
 *                 io_watch_deletes)
 * returns: handle_* return convention: -1 on error, 0 on EAGAIN (no more
 *           io events queued), >0 on success. success/error refer only to
 *           the reads from the fd.
 */
inline static int handle_tcp_child(struct tcp_child* tcp_c, int fd_i)
{
	struct tcp_connection* tcpconn;
	long response[2];
	int cmd;
	int bytes;
	int n;
	ticks_t t;
	ticks_t crt_timeout;
	ticks_t con_lifetime;

	if (unlikely(tcp_c->unix_sock<=0)){
		/* (we can't have a fd==0, 0 is never closed )*/
		LOG(L_CRIT, "BUG: handle_tcp_child: fd %d for %d "
				"(pid %ld, ser no %d)\n", tcp_c->unix_sock,
				(int)(tcp_c-&tcp_children[0]), (long)tcp_c->pid,
				tcp_c->proc_no);
		goto error;
	}
	/* read until sizeof(response)
	 * (this is a SOCK_STREAM so read is not atomic) */
	bytes=recv_all(tcp_c->unix_sock, response, sizeof(response), MSG_DONTWAIT);
	if (unlikely(bytes<(int)sizeof(response))){
		if (bytes==0){
			/* EOF -> bad, child has died */
			DBG("DBG: handle_tcp_child: dead tcp child %d (pid %ld, no %d)"
					" (shutting down?)\n", (int)(tcp_c-&tcp_children[0]),
					(long)tcp_c->pid, tcp_c->proc_no );
			/* don't listen on it any more */
			io_watch_del(&io_h, tcp_c->unix_sock, fd_i, 0);
			goto error; /* eof. so no more io here, it's ok to return error */
		}else if (bytes<0){
			/* EAGAIN is ok if we try to empty the buffer
			 * e.g.: SIGIO_RT overflow mode or EPOLL ET */
			if ((errno!=EAGAIN) && (errno!=EWOULDBLOCK)){
				LOG(L_CRIT, "ERROR: handle_tcp_child: read from tcp child %ld "
						" (pid %ld, no %d) %s [%d]\n",
						(long)(tcp_c-&tcp_children[0]), (long)tcp_c->pid,
						tcp_c->proc_no, strerror(errno), errno );
			}else{
				bytes=0;
			}
			/* try to ignore ? */
			goto end;
		}else{
			/* should never happen */
			LOG(L_CRIT, "BUG: handle_tcp_child: too few bytes received (%d)\n",
					bytes );
			bytes=0; /* something was read so there is no error; otoh if
					  receive_fd returned less then requested => the receive
					  buffer is empty => no more io queued on this fd */
			goto end;
		}
	}
	DBG("handle_tcp_child: reader response= %lx, %ld from %d \n",
			response[0], response[1], (int)(tcp_c-&tcp_children[0]));
	/* response[0] = connection pointer, response[1] = command */
	cmd=response[1];
	tcpconn=(struct tcp_connection*)response[0];
	if (unlikely(tcpconn==0)){
		/* should never happen */
		LOG(L_CRIT, "BUG: handle_tcp_child: null tcpconn pointer received"
				" from tcp child %d (pid %ld): %lx, %lx\n",
				(int)(tcp_c-&tcp_children[0]), (long)tcp_c->pid,
				response[0], response[1]) ;
		goto end;
	}
	switch(cmd){
		case CONN_RELEASE:
			/* reader finished with the connection => take it back */
			tcp_c->busy--;
			if (unlikely(tcpconn_put(tcpconn))){
				/* last reference dropped => free it */
				tcpconn_destroy(tcpconn);
				break;
			}
			if (unlikely(tcpconn->state==S_CONN_BAD)){
				/* connection marked bad meanwhile => tear it down */
#ifdef TCP_ASYNC
				if (unlikely(tcpconn->flags & F_CONN_WRITE_W)){
					io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
					tcpconn->flags &= ~F_CONN_WRITE_W;
				}
#endif /* TCP_ASYNC */
				if (tcpconn_try_unhash(tcpconn))
					tcpconn_put_destroy(tcpconn);
				break;
			}
			/* update the timeout*/
			t=get_ticks_raw();
			con_lifetime=cfg_get(tcp, tcp_cfg, con_lifetime);
			tcpconn->timeout=t+con_lifetime;
			crt_timeout=con_lifetime;
#ifdef TCP_ASYNC
			/* if data is still queued for writing, the effective timer
			 * is the minimum of the lifetime and the write timeout */
			if (unlikely(cfg_get(tcp, tcp_cfg, async) &&
							_wbufq_non_empty(tcpconn) )){
				if (unlikely(TICKS_GE(t, tcpconn->wbuf_q.wr_timeout))){
					DBG("handle_tcp_child: wr. timeout on CONN_RELEASE for %p "
							"refcnt= %d\n", tcpconn,
							atomic_get(&tcpconn->refcnt));
					/* timeout */
					if (unlikely(tcpconn->state==S_CONN_CONNECT)){
#ifdef USE_DST_BLACKLIST
						dst_blacklist_su( BLST_ERR_CONNECT,
											tcpconn->rcv.proto,
											&tcpconn->rcv.src_su,
											&tcpconn->send_flags, 0);
#endif /* USE_DST_BLACKLIST */
						TCP_EV_CONNECT_TIMEOUT(0, TCP_LADDR(tcpconn),
										TCP_LPORT(tcpconn), TCP_PSU(tcpconn),
										TCP_PROTO(tcpconn));
						TCP_STATS_CONNECT_FAILED();
					}else{
#ifdef USE_DST_BLACKLIST
						dst_blacklist_su( BLST_ERR_SEND,
											tcpconn->rcv.proto,
											&tcpconn->rcv.src_su,
											&tcpconn->send_flags, 0);
#endif /* USE_DST_BLACKLIST */
						TCP_EV_SEND_TIMEOUT(0, &tcpconn->rcv);
						TCP_STATS_SEND_TIMEOUT();
					}
					if (unlikely(tcpconn->flags & F_CONN_WRITE_W)){
						io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
						tcpconn->flags&=~F_CONN_WRITE_W;
					}
					if (tcpconn_try_unhash(tcpconn))
						tcpconn_put_destroy(tcpconn);
					break;
				}else{
					crt_timeout=MIN_unsigned(con_lifetime,
											tcpconn->wbuf_q.wr_timeout-t);
				}
			}
#endif /* TCP_ASYNC */
			/* re-activate the timer */
			tcpconn->timer.f=tcpconn_main_timeout;
			local_timer_reinit(&tcpconn->timer);
			local_timer_add(&tcp_main_ltimer, &tcpconn->timer, crt_timeout, t);
			/* must be after the de-ref*/
			tcpconn->flags|=(F_CONN_MAIN_TIMER|F_CONN_READ_W|F_CONN_WANTS_RD);
			tcpconn->flags&=~(F_CONN_READER|F_CONN_OOB_DATA);
#ifdef TCP_ASYNC
			/* fd might already be watched for write => just change the mask */
			if (unlikely(tcpconn->flags & F_CONN_WRITE_W))
				n=io_watch_chg(&io_h, tcpconn->s, POLLIN| POLLOUT, -1);
			else
#endif /* TCP_ASYNC */
				n=io_watch_add(&io_h, tcpconn->s, POLLIN, F_TCPCONN, tcpconn);
			if (unlikely(n<0)){
				LOG(L_CRIT, "ERROR: tcp_main: handle_tcp_child: failed to add"
						" new socket to the fd list\n");
				tcpconn->flags&=~F_CONN_READ_W;
#ifdef TCP_ASYNC
				if (unlikely(tcpconn->flags & F_CONN_WRITE_W)){
					io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
					tcpconn->flags&=~F_CONN_WRITE_W;
				}
#endif /* TCP_ASYNC */
				if (tcpconn_try_unhash(tcpconn))
					tcpconn_put_destroy(tcpconn);
				break;
			}
			DBG("handle_tcp_child: CONN_RELEASE %p refcnt= %d\n",
					tcpconn, atomic_get(&tcpconn->refcnt));
			break;
		case CONN_ERROR:
		case CONN_DESTROY:
		case CONN_EOF:
			/* WARNING: this will auto-dec. refcnt! */
			tcp_c->busy--;
			/* main doesn't listen on it => we don't have to delete it
			 if (tcpconn->s!=-1)
				io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
			*/
#ifdef TCP_ASYNC
			if ((tcpconn->flags & F_CONN_WRITE_W) && (tcpconn->s!=-1)){
				io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
				tcpconn->flags&=~F_CONN_WRITE_W;
			}
#endif /* TCP_ASYNC */
			if (tcpconn_try_unhash(tcpconn))
				tcpconn_put(tcpconn);
			tcpconn_put_destroy(tcpconn); /* deref & delete if refcnt==0 */
			break;
		default:
			LOG(L_CRIT, "BUG: handle_tcp_child: unknown cmd %d"
					" from tcp reader %d\n",
					cmd, (int)(tcp_c-&tcp_children[0]));
	}
end:
	return bytes;
error:
	return -1;
}
  2959. /* handles io from a "generic" ser process (get fd or new_fd from a tcp_send)
  2960. *
  2961. * params: p - pointer in the ser processes array (pt[]), to the entry for
  2962. * which an io event was detected
  2963. * fd_i - fd index in the fd_array (usefull for optimizing
  2964. * io_watch_deletes)
  2965. * returns: handle_* return convention:
  2966. * -1 on error reading from the fd,
  2967. * 0 on EAGAIN or when no more io events are queued
  2968. * (receive buffer empty),
  2969. * >0 on successfull reads from the fd (the receive buffer might
  2970. * be non-empty).
  2971. */
inline static int handle_ser_child(struct process_table* p, int fd_i)
{
	struct tcp_connection* tcpconn;
	long response[2];     /* wire format: [0]=tcpconn pointer, [1]=command */
	int cmd;              /* CONN_* command decoded from response[1] */
	int bytes;            /* result of receive_fd() */
	int ret;              /* handle_* return convention value */
	int fd;               /* fd received over the unix socket (or -1) */
	int flags;            /* POLLIN/POLLOUT flags for io_watch_add() */
	ticks_t t;
	ticks_t con_lifetime; /* configured connection lifetime (ticks) */
#ifdef TCP_ASYNC
	ticks_t nxt_timeout;  /* next timer expiry (min of lifetime/write t.o.) */
#endif /* TCP_ASYNC */

	ret=-1;
	if (unlikely(p->unix_sock<=0)){
		/* (we can't have a fd==0, 0 is never closed )*/
		LOG(L_CRIT, "BUG: handle_ser_child: fd %d for %d "
				"(pid %d)\n", p->unix_sock, (int)(p-&pt[0]), p->pid);
		goto error;
	}
	/* get all bytes and the fd (if transmitted)
	 * (this is a SOCK_STREAM so read is not atomic) */
	bytes=receive_fd(p->unix_sock, response, sizeof(response), &fd,
						MSG_DONTWAIT);
	if (unlikely(bytes<(int)sizeof(response))){
		/* too few bytes read */
		if (bytes==0){
			/* EOF -> bad, child has died */
			DBG("DBG: handle_ser_child: dead child %d, pid %d"
					" (shutting down?)\n", (int)(p-&pt[0]), p->pid);
			/* don't listen on it any more */
			io_watch_del(&io_h, p->unix_sock, fd_i, 0);
			goto error; /* child dead => no further io events from it */
		}else if (bytes<0){
			/* EAGAIN is ok if we try to empty the buffer
			 * e.g: SIGIO_RT overflow mode or EPOLL ET */
			if ((errno!=EAGAIN) && (errno!=EWOULDBLOCK)){
				LOG(L_CRIT, "ERROR: handle_ser_child: read from child %d "
						"(pid %d):  %s [%d]\n", (int)(p-&pt[0]), p->pid,
						strerror(errno), errno);
				ret=-1;
			}else{
				ret=0;
			}
			/* try to ignore ? */
			goto end;
		}else{
			/* should never happen */
			LOG(L_CRIT, "BUG: handle_ser_child: too few bytes received (%d)\n",
					bytes );
			ret=0; /* something was read so there is no error; otoh if
					  receive_fd returned less then requested => the receive
					  buffer is empty => no more io queued on this fd */
			goto end;
		}
	}
	ret=1; /* something was received, there might be more queued */
	DBG("handle_ser_child: read response= %lx, %ld, fd %d from %d (%d)\n",
			response[0], response[1], fd, (int)(p-&pt[0]), p->pid);
	cmd=response[1];
	tcpconn=(struct tcp_connection*)response[0];
	if (unlikely(tcpconn==0)){
		LOG(L_CRIT, "BUG: handle_ser_child: null tcpconn pointer received"
				" from child  %d (pid %d): %lx, %lx\n",
				(int)(p-&pt[0]), p->pid, response[0], response[1]) ;
		goto end;
	}
	switch(cmd){
		case CONN_ERROR:
			LOG(L_ERR, "handle_ser_child: ERROR: received CON_ERROR for %p"
					" (id %d), refcnt %d\n",
					tcpconn, tcpconn->id, atomic_get(&tcpconn->refcnt));
			/* fallthrough: error teardown is identical to forced EOF */
		case CONN_EOF: /* forced EOF after full send, due to send flags */
#ifdef TCP_CONNECT_WAIT
			/* if the connection is pending => it might be on the way of
			 * reaching tcp_main (e.g. CONN_NEW_COMPLETE or
			 *  CONN_NEW_PENDING_WRITE) =>  it cannot be destroyed here */
			if ( !(tcpconn->flags & F_CONN_PENDING) &&
					tcpconn_try_unhash(tcpconn) )
				tcpconn_put(tcpconn);
#else /* ! TCP_CONNECT_WAIT */
			if ( tcpconn_try_unhash(tcpconn) )
				tcpconn_put(tcpconn);
#endif /* TCP_CONNECT_WAIT */
			/* stop watching the fd before dropping the last reference */
			if ( ((tcpconn->flags & (F_CONN_WRITE_W|F_CONN_READ_W)) ) &&
					(tcpconn->s!=-1)){
				io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
				tcpconn->flags&=~(F_CONN_WRITE_W|F_CONN_READ_W);
			}
			tcpconn_put_destroy(tcpconn); /* dec refcnt & destroy on 0 */
			break;
		case CONN_GET_FD:
			/* send the requested FD */
			/* WARNING: take care of setting refcnt properly to
			 * avoid race conditions */
			if (unlikely(send_fd(p->unix_sock, &tcpconn, sizeof(tcpconn),
							tcpconn->s)<=0)){
				LOG(L_ERR, "ERROR: handle_ser_child: send_fd failed\n");
			}
			break;
		case CONN_NEW:
			/* update the fd in the requested tcpconn*/
			/* WARNING: take care of setting refcnt properly to
			 * avoid race conditions */
			if (unlikely(fd==-1)){
				LOG(L_CRIT, "BUG: handle_ser_child: CONN_NEW:"
							" no fd received\n");
				tcpconn->flags|=F_CONN_FD_CLOSED;
				tcpconn_put_destroy(tcpconn);
				break;
			}
			(*tcp_connections_no)++;
			tcpconn->s=fd;
			/* add tcpconn to the list*/
			tcpconn_add(tcpconn);
			/* update the timeout*/
			t=get_ticks_raw();
			con_lifetime=cfg_get(tcp, tcp_cfg, con_lifetime);
			tcpconn->timeout=t+con_lifetime;
			/* activate the timer (already properly init. in tcpconn_new())
			 * no need for reinit */
			local_timer_add(&tcp_main_ltimer, &tcpconn->timer,
								con_lifetime, t);
			/* branchless: ((int)!cond - 1) is all-ones when cond is true,
			 * 0 otherwise => F_CONN_WRITE_W is or-ed in only if
			 * F_CONN_WANTS_WR is set */
			tcpconn->flags|=(F_CONN_MAIN_TIMER|F_CONN_READ_W|F_CONN_WANTS_RD)
#ifdef TCP_ASYNC
							/* not used for now, the connection is sent to
							 * tcp_main before knowing whether we can write
							 * on it or we should wait */
							| (((int)!(tcpconn->flags & F_CONN_WANTS_WR)-1)&
								F_CONN_WRITE_W)
#endif /* TCP_ASYNC */
							;
			tcpconn->flags&=~F_CONN_FD_CLOSED;
			flags=POLLIN
#ifdef TCP_ASYNC
					/* not used for now, the connection is sent to tcp_main
					 * before knowing if we can write on it or we should
					 * wait */
					| (((int)!(tcpconn->flags & F_CONN_WANTS_WR)-1) & POLLOUT)
#endif /* TCP_ASYNC */
					;
			if (unlikely(
					io_watch_add(&io_h, tcpconn->s, flags,
								F_TCPCONN, tcpconn)<0)){
				LOG(L_CRIT, "ERROR: tcp_main: handle_ser_child: failed to add"
						" new socket to the fd list\n");
				tcpconn->flags&=~(F_CONN_WRITE_W|F_CONN_READ_W);
				tcpconn_try_unhash(tcpconn); /* unhash & dec refcnt */
				tcpconn_put_destroy(tcpconn);
			}
			break;
#ifdef TCP_ASYNC
		case CONN_QUEUED_WRITE:
			/* received only if the wr. queue is empty and a write finishes
			 * with EAGAIN (common after connect())
			 * it should only enable write watching on the fd. The connection
			 * should be  already in the hash. The refcnt is automatically
			 * decremented.
			 */
			/* auto-dec refcnt */
			if (unlikely(tcpconn_put(tcpconn))){
				tcpconn_destroy(tcpconn);
				break;
			}
			if (unlikely((tcpconn->state==S_CONN_BAD) ||
							!(tcpconn->flags & F_CONN_HASHED) ))
				/* in the process of being destroyed => do nothing */
				break;
			if (!(tcpconn->flags & F_CONN_WANTS_WR)){
				tcpconn->flags|=F_CONN_WANTS_WR;
				t=get_ticks_raw();
				/* shorten the main timer if the write timeout expires
				 * before the current connection lifetime */
				if (likely((tcpconn->flags & F_CONN_MAIN_TIMER) &&
					(TICKS_LT(tcpconn->wbuf_q.wr_timeout, tcpconn->timeout)) &&
						TICKS_LT(t, tcpconn->wbuf_q.wr_timeout) )){
					/* _wbufq_nonempty() is guaranteed here */
					/* update the timer */
					local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
					local_timer_reinit(&tcpconn->timer);
					local_timer_add(&tcp_main_ltimer, &tcpconn->timer,
										tcpconn->wbuf_q.wr_timeout-t, t);
					DBG("tcp_main: handle_ser_child: CONN_QUEUED_WRITE; %p "
							"timeout adjusted to %d s\n", tcpconn,
							TICKS_TO_S(tcpconn->wbuf_q.wr_timeout-t));
				}
				if (!(tcpconn->flags & F_CONN_WRITE_W)){
					tcpconn->flags|=F_CONN_WRITE_W;
					if (!(tcpconn->flags & F_CONN_READ_W)){
						/* not yet watched at all => add the fd */
						if (unlikely(io_watch_add(&io_h, tcpconn->s, POLLOUT,
												F_TCPCONN, tcpconn)<0)){
							LOG(L_CRIT, "ERROR: tcp_main: handle_ser_child:"
										" failed to enable write watch on"
										" socket\n");
							if (tcpconn_try_unhash(tcpconn))
								tcpconn_put_destroy(tcpconn);
							break;
						}
					}else{
						/* already watched for read => just change events */
						if (unlikely(io_watch_chg(&io_h, tcpconn->s,
													POLLIN|POLLOUT, -1)<0)){
							LOG(L_CRIT, "ERROR: tcp_main: handle_ser_child:"
									" failed to change socket watch events\n");
							io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
							tcpconn->flags&=~F_CONN_READ_W;
							if (tcpconn_try_unhash(tcpconn))
								tcpconn_put_destroy(tcpconn);
							break;
						}
					}
				}
			}else{
				LOG(L_WARN, "tcp_main: handler_ser_child: connection %p"
							" already watched for write\n", tcpconn);
			}
			break;
#ifdef TCP_CONNECT_WAIT
		case CONN_NEW_COMPLETE:
		case CONN_NEW_PENDING_WRITE:
			/* received when a pending connect completes in the same
			 * tcp_send() that initiated it
			 * the connection is already in the hash with F_CONN_PENDING
			 * flag (added by tcp_send()) and refcnt at least 1 (for the
			 * hash)*/
			tcpconn->flags&=~(F_CONN_PENDING|F_CONN_FD_CLOSED);
			if (unlikely((tcpconn->state==S_CONN_BAD) || (fd==-1))){
				if (unlikely(fd==-1))
					LOG(L_CRIT, "BUG: handle_ser_child: CONN_NEW_COMPLETE:"
								" no fd received\n");
				else
					LOG(L_WARN, "WARNING: handle_ser_child: CONN_NEW_COMPLETE:"
								" received connection with error\n");
				tcpconn->flags|=F_CONN_FD_CLOSED;
				tcpconn->state=S_CONN_BAD;
				tcpconn_try_unhash(tcpconn);
				tcpconn_put_destroy(tcpconn);
				break;
			}
			(*tcp_connections_no)++;
			tcpconn->s=fd;
			/* update the timeout*/
			t=get_ticks_raw();
			con_lifetime=cfg_get(tcp, tcp_cfg, con_lifetime);
			tcpconn->timeout=t+con_lifetime;
			nxt_timeout=con_lifetime;
			if (unlikely(cmd==CONN_NEW_COMPLETE)){
				/* check if needs to be watched for write */
				lock_get(&tcpconn->write_lock);
					/* if queue non empty watch it for write */
					flags=(_wbufq_empty(tcpconn)-1)&POLLOUT;
				lock_release(&tcpconn->write_lock);
				if (flags){
					/* write queued => possibly shorten the timer to the
					 * write timeout */
					if (TICKS_LT(tcpconn->wbuf_q.wr_timeout, tcpconn->timeout)
							&& TICKS_LT(t, tcpconn->wbuf_q.wr_timeout))
						nxt_timeout=tcpconn->wbuf_q.wr_timeout-t;
					tcpconn->flags|=F_CONN_WRITE_W|F_CONN_WANTS_WR;
				}
				/* activate the timer (already properly init. in
				   tcpconn_new())  no need for reinit */
				local_timer_add(&tcp_main_ltimer, &tcpconn->timer, nxt_timeout,
									t);
				tcpconn->flags|=F_CONN_MAIN_TIMER|F_CONN_READ_W|
								F_CONN_WANTS_RD;
			}else{
				/* CONN_NEW_PENDING_WRITE */
				/* no need to check, we have something queued for write */
				flags=POLLOUT;
				if (TICKS_LT(tcpconn->wbuf_q.wr_timeout, tcpconn->timeout)
						&& TICKS_LT(t, tcpconn->wbuf_q.wr_timeout))
					nxt_timeout=tcpconn->wbuf_q.wr_timeout-t;
				/* activate the timer (already properly init. in
				   tcpconn_new())  no need for reinit */
				local_timer_add(&tcp_main_ltimer, &tcpconn->timer, nxt_timeout,
									t);
				tcpconn->flags|=F_CONN_MAIN_TIMER|F_CONN_READ_W|
								F_CONN_WANTS_RD |
								F_CONN_WRITE_W|F_CONN_WANTS_WR;
			}
			flags|=POLLIN;
			if (unlikely(
					io_watch_add(&io_h, tcpconn->s, flags,
								F_TCPCONN, tcpconn)<0)){
				LOG(L_CRIT, "ERROR: tcp_main: handle_ser_child: failed to add"
						" new socket to the fd list\n");
				tcpconn->flags&=~(F_CONN_WRITE_W|F_CONN_READ_W);
				tcpconn_try_unhash(tcpconn); /* unhash & dec refcnt */
				tcpconn_put_destroy(tcpconn);
			}
			break;
#endif /* TCP_CONNECT_WAIT */
#endif /* TCP_ASYNC */
		default:
			LOG(L_CRIT, "BUG: handle_ser_child: unknown cmd %d\n", cmd);
	}
end:
	return ret;
error:
	return -1;
}
  3270. /* sends a tcpconn + fd to a choosen child */
  3271. inline static int send2child(struct tcp_connection* tcpconn)
  3272. {
  3273. int i;
  3274. int min_busy;
  3275. int idx;
  3276. static int crt=0; /* current child */
  3277. int last;
  3278. min_busy=tcp_children[0].busy;
  3279. idx=0;
  3280. last=crt+tcp_children_no;
  3281. for (; crt<last; crt++){
  3282. i=crt%tcp_children_no;
  3283. if (!tcp_children[i].busy){
  3284. idx=i;
  3285. min_busy=0;
  3286. break;
  3287. }else if (min_busy>tcp_children[i].busy){
  3288. min_busy=tcp_children[i].busy;
  3289. idx=i;
  3290. }
  3291. }
  3292. crt=idx+1; /* next time we start with crt%tcp_children_no */
  3293. tcp_children[idx].busy++;
  3294. tcp_children[idx].n_reqs++;
  3295. if (unlikely(min_busy)){
  3296. DBG("WARNING: send2child: no free tcp receiver, "
  3297. " connection passed to the least busy one (%d)\n",
  3298. min_busy);
  3299. }
  3300. DBG("send2child: to tcp child %d %d(%ld), %p\n", idx,
  3301. tcp_children[idx].proc_no,
  3302. (long)tcp_children[idx].pid, tcpconn);
  3303. /* first make sure this child doesn't have pending request for
  3304. * tcp_main (to avoid a possible deadlock: e.g. child wants to
  3305. * send a release command, but the master fills its socket buffer
  3306. * with new connection commands => deadlock) */
  3307. /* answer tcp_send requests first */
  3308. while(handle_ser_child(&pt[tcp_children[idx].proc_no], -1)>0);
  3309. /* process tcp readers requests */
  3310. while(handle_tcp_child(&tcp_children[idx], -1)>0);
  3311. #ifdef SEND_FD_QUEUE
  3312. /* if queue full, try to queue the io */
  3313. if (unlikely(send_fd(tcp_children[idx].unix_sock, &tcpconn,
  3314. sizeof(tcpconn), tcpconn->s)<=0)){
  3315. if ((errno==EAGAIN)||(errno==EWOULDBLOCK)){
  3316. /* FIXME: remove after debugging */
  3317. LOG(L_CRIT, "INFO: tcp child %d, socket %d: queue full,"
  3318. " %d requests queued (total handled %d)\n",
  3319. idx, tcp_children[idx].unix_sock, min_busy,
  3320. tcp_children[idx].n_reqs-1);
  3321. if (send_fd_queue_add(&send2child_q, tcp_children[idx].unix_sock,
  3322. tcpconn)!=0){
  3323. LOG(L_ERR, "ERROR: send2child: queue send op. failed\n");
  3324. return -1;
  3325. }
  3326. }else{
  3327. LOG(L_ERR, "ERROR: send2child: send_fd failed\n");
  3328. return -1;
  3329. }
  3330. }
  3331. #else
  3332. if (unlikely(send_fd(tcp_children[idx].unix_sock, &tcpconn,
  3333. sizeof(tcpconn), tcpconn->s)<=0)){
  3334. LOG(L_ERR, "ERROR: send2child: send_fd failed\n");
  3335. return -1;
  3336. }
  3337. #endif
  3338. return 0;
  3339. }
  3340. /* handles a new connection, called internally by tcp_main_loop/handle_io.
  3341. * params: si - pointer to one of the tcp socket_info structures on which
  3342. * an io event was detected (connection attempt)
  3343. * returns: handle_* return convention: -1 on error, 0 on EAGAIN (no more
  3344. * io events queued), >0 on success. success/error refer only to
  3345. * the accept.
  3346. */
static inline int handle_new_connect(struct socket_info* si)
{
	union sockaddr_union su;        /* peer address filled in by accept() */
	union sockaddr_union sock_name; /* local address (INADDR_ANY case) */
	unsigned sock_name_len;
	union sockaddr_union* dst_su;   /* local destination address to record */
	struct tcp_connection* tcpconn;
	socklen_t su_len;
	int new_sock;

	/* got a connection on r */
	su_len=sizeof(su);
	new_sock=accept(si->socket, &(su.s), &su_len);
	if (unlikely(new_sock==-1)){
		if ((errno==EAGAIN)||(errno==EWOULDBLOCK))
			return 0;
		LOG(L_ERR,  "WARNING: handle_new_connect: error while accepting"
				" connection(%d): %s\n", errno, strerror(errno));
		return -1;
	}
	/* enforce the configured connection limit */
	if (unlikely(*tcp_connections_no>=cfg_get(tcp, tcp_cfg, max_connections))){
		LOG(L_ERR, "ERROR: maximum number of connections exceeded: %d/%d\n",
					*tcp_connections_no,
					cfg_get(tcp, tcp_cfg, max_connections));
		close(new_sock);
		TCP_STATS_LOCAL_REJECT();
		return 1; /* success, because the accept was succesfull */
	}
	if (unlikely(init_sock_opt_accept(new_sock)<0)){
		LOG(L_ERR, "ERROR: handle_new_connect: init_sock_opt failed\n");
		close(new_sock);
		return 1; /* success, because the accept was succesfull */
	}
	(*tcp_connections_no)++;
	TCP_STATS_ESTABLISHED(S_CONN_ACCEPT);

	dst_su=&si->su;
	if (unlikely(si->flags & SI_IS_ANY)){
		/* INADDR_ANY => get local dst */
		sock_name_len=sizeof(sock_name);
		if (getsockname(new_sock, &sock_name.s, &sock_name_len)!=0){
			LOG(L_ERR, "ERROR: handle_new_connect:"
						" getsockname failed: %s(%d)\n",
						strerror(errno), errno);
			/* go on with the 0.0.0.0 dst from the sock_info */
		}else{
			dst_su=&sock_name;
		}
	}
	/* add socket to list */
	tcpconn=tcpconn_new(new_sock, &su, dst_su, si, si->proto, S_CONN_ACCEPT);
	if (likely(tcpconn)){
		tcpconn->flags|=F_CONN_PASSIVE;
#ifdef TCP_PASS_NEW_CONNECTION_ON_DATA
		/* keep the connection in tcp_main and watch it for POLLIN;
		 * only the hash holds a reference => refcnt 1 */
		atomic_set(&tcpconn->refcnt, 1); /* safe, not yet available to the
											outside world */
		tcpconn_add(tcpconn);
		/* activate the timer */
		local_timer_add(&tcp_main_ltimer, &tcpconn->timer,
								cfg_get(tcp, tcp_cfg, con_lifetime),
								get_ticks_raw());
		tcpconn->flags|=(F_CONN_MAIN_TIMER|F_CONN_READ_W|F_CONN_WANTS_RD);
		if (unlikely(io_watch_add(&io_h, tcpconn->s, POLLIN,
												F_TCPCONN, tcpconn)<0)){
			LOG(L_CRIT, "ERROR: tcp_main: handle_new_connect: failed to add"
						" new socket to the fd list\n");
			tcpconn->flags&=~F_CONN_READ_W;
			if (tcpconn_try_unhash(tcpconn))
				tcpconn_put_destroy(tcpconn);
		}
#else
		/* pass the connection to a reader child immediately; two
		 * references: one for the hash, one for the reader */
		atomic_set(&tcpconn->refcnt, 2); /* safe, not yet available to the
											outside world */
		/* prepare it for passing to a child */
		tcpconn->flags|=F_CONN_READER;
		tcpconn_add(tcpconn);
		DBG("handle_new_connect: new connection from %s: %p %d flags: %04x\n",
			su2a(&su, sizeof(su)), tcpconn, tcpconn->s, tcpconn->flags);
		if(unlikely(send2child(tcpconn)<0)){
			LOG(L_ERR,"ERROR: handle_new_connect: no children "
					"available\n");
			/* undo both references and unhash */
			tcpconn->flags&=~F_CONN_READER;
			tcpconn_put(tcpconn);
			tcpconn_try_unhash(tcpconn);
			tcpconn_put_destroy(tcpconn);
		}
#endif
	}else{ /*tcpconn==0 */
		LOG(L_ERR, "ERROR: handle_new_connect: tcpconn_new failed, "
				"closing socket\n");
		close(new_sock);
		(*tcp_connections_no)--;
	}
	return 1; /* accept() was succesfull */
}
  3440. /* handles an io event on one of the watched tcp connections
  3441. *
  3442. * params: tcpconn - pointer to the tcp_connection for which we have an io ev.
  3443. * fd_i - index in the fd_array table (needed for delete)
  3444. * returns: handle_* return convention, but on success it always returns 0
  3445. * (because it's one-shot, after a succesful execution the fd is
  3446. * removed from tcp_main's watch fd list and passed to a child =>
  3447. * tcp_main is not interested in further io events that might be
  3448. * queued for this fd)
  3449. */
inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, short ev,
										int fd_i)
{
#ifdef TCP_ASYNC
	int empty_q; /* set by wbufq_run(): write queue drained completely */
	int bytes;   /* bytes pending in the kernel read buffer (FIONREAD) */
#endif /* TCP_ASYNC */
	/* is refcnt!=0 really necessary?
	 * No, in fact it's a bug: I can have the following situation: a send only
	 * tcp connection used by n processes simultaneously => refcnt = n. In
	 * the same time I can have a read event and this situation is perfectly
	 * valid. -- andrei
	 */
#if 0
	if ((tcpconn->refcnt!=0)){
		/* FIXME: might be valid for sigio_rt iff fd flags are not cleared
		 *        (there is a short window in which it could generate a sig
		 *         that would be catched by tcp_main) */
		LOG(L_CRIT, "BUG: handle_tcpconn_ev: io event on referenced"
					" tcpconn (%p), refcnt=%d, fd=%d\n",
					tcpconn, tcpconn->refcnt, tcpconn->s);
		return -1;
	}
#endif
	/* pass it to child, so remove it from the io watch list  and the local
	 *  timer */
#ifdef TCP_ASYNC
	empty_q=0; /* warning fix */
	/* write-side handling: a POLLOUT (or error/hangup on a write-watched
	 * fd) means the async write queue may be flushed now */
	if (unlikely((ev & (POLLOUT|POLLERR|POLLHUP)) &&
					(tcpconn->flags & F_CONN_WRITE_W))){
		if (unlikely((ev & (POLLERR|POLLHUP)) ||
						(wbufq_run(tcpconn->s, tcpconn, &empty_q)<0) ||
						(empty_q && tcpconn_close_after_send(tcpconn))
					)){
			/* write error / hangup / deliberate close-after-send =>
			 * tear the connection down */
			if (unlikely(io_watch_del(&io_h, tcpconn->s, fd_i, 0)<0)){
				LOG(L_ERR, "ERROR: handle_tcpconn_ev: io_watch_del(1) failed:"
							" for %p, fd %d\n", tcpconn, tcpconn->s);
			}
			if ((tcpconn->flags & F_CONN_READ_W) && (ev & POLLIN)){
				/* connection is watched for read and there is a read event
				 * (unfortunately if we have POLLIN here we don't know if
				 * there's really any data in the read buffer or the POLLIN
				 * was generated by the error or EOF => to avoid loosing
				 * data it's safer to either directly check the read buffer
				 * or try a read)*/
				/* in most cases the read buffer will be empty, so in general
				 * is cheaper to check it here and then send the
				 * conn.  to a a child only if needed (another syscall + at
				 * least 2 * syscalls in the reader + ...) */
				if ((ioctl(tcpconn->s, FIONREAD, &bytes)>=0) && (bytes>0)){
					tcpconn->flags&=~(F_CONN_WRITE_W|F_CONN_READ_W|
										F_CONN_WANTS_RD|F_CONN_WANTS_WR);
					tcpconn->flags|=F_CONN_FORCE_EOF|F_CONN_WR_ERROR;
					goto send_to_child;
				}
				/* if bytes==0 or ioctl failed, destroy the connection now */
			}
			tcpconn->flags&=~(F_CONN_WRITE_W|F_CONN_READ_W|
								F_CONN_WANTS_RD|F_CONN_WANTS_WR);
			if (unlikely(ev & POLLERR)){
				/* record the failure: connect error vs. send error */
				if (unlikely(tcpconn->state==S_CONN_CONNECT)){
#ifdef USE_DST_BLACKLIST
					dst_blacklist_su(BLST_ERR_CONNECT, tcpconn->rcv.proto,
										&tcpconn->rcv.src_su,
										&tcpconn->send_flags, 0);
#endif /* USE_DST_BLACKLIST */
					TCP_EV_CONNECT_ERR(0, TCP_LADDR(tcpconn),
										TCP_LPORT(tcpconn), TCP_PSU(tcpconn),
										TCP_PROTO(tcpconn));
					TCP_STATS_CONNECT_FAILED();
				}else{
#ifdef USE_DST_BLACKLIST
					dst_blacklist_su(BLST_ERR_SEND, tcpconn->rcv.proto,
										&tcpconn->rcv.src_su,
										&tcpconn->send_flags, 0);
#endif /* USE_DST_BLACKLIST */
					TCP_STATS_CON_RESET(); /* FIXME: it could be != RST */
				}
			}
			if (unlikely(!tcpconn_try_unhash(tcpconn))){
				LOG(L_CRIT, "BUG: tcpconn_ev: unhashed connection %p\n",
							tcpconn);
			}
			tcpconn_put_destroy(tcpconn);
			goto error;
		}
		if (empty_q){
			/* write queue fully flushed => stop watching for write */
			tcpconn->flags&=~F_CONN_WANTS_WR;
			if (!(tcpconn->flags & F_CONN_READ_W)){
				if (unlikely(io_watch_del(&io_h, tcpconn->s, fd_i, 0)==-1)){
					LOG(L_ERR, "ERROR: handle_tcpconn_ev: io_watch_del(2)"
								" failed:" " for %p, fd %d\n",
								tcpconn, tcpconn->s);
					goto error;
				}
			}else{
				if (unlikely(io_watch_chg(&io_h, tcpconn->s,
											POLLIN, fd_i)==-1)){
					LOG(L_ERR, "ERROR: handle_tcpconn_ev: io_watch_chg(1)"
								" failed:" " for %p, fd %d\n",
								tcpconn, tcpconn->s);
					goto error;
				}
			}
			tcpconn->flags&=~F_CONN_WRITE_W;
		}
		ev&=~POLLOUT; /* clear POLLOUT */
	}
	if (likely(ev && (tcpconn->flags & F_CONN_READ_W))){
		/* if still some other IO event (POLLIN|POLLHUP|POLLERR) and
		 * connection is still watched in tcp_main for reads, send it to a
		 * child and stop watching it for input (but continue watching for
		 * writes if needed): */
		if (unlikely(tcpconn->flags & F_CONN_WRITE_W)){
			if (unlikely(io_watch_chg(&io_h, tcpconn->s, POLLOUT, fd_i)==-1)){
				LOG(L_ERR, "ERROR: handle_tcpconn_ev: io_watch_chg(2)"
							" failed:" " for %p, fd %d\n",
							tcpconn, tcpconn->s);
				goto error;
			}
		}else
#else
	{
#endif /* TCP_ASYNC */
			/* NOTE: with TCP_ASYNC this is the else-branch of the
			 * io_watch_chg(2) above; without it, it runs unconditionally */
			if (unlikely(io_watch_del(&io_h, tcpconn->s, fd_i, 0)==-1)){
				LOG(L_ERR, "ERROR: handle_tcpconn_ev: io_watch_del(3)"
							" failed:" " for %p, fd %d\n",
							tcpconn, tcpconn->s);
				goto error;
			}
#ifdef TCP_ASYNC
send_to_child:
#endif
		DBG("tcp: DBG: sending to child, events %x\n", ev);
		/* branchless: ((int)!cond - 1) is all-ones when cond is true,
		 * 0 otherwise => the flag is or-ed in only if the event was seen */
#ifdef POLLRDHUP
		tcpconn->flags|=((int)!(ev & (POLLRDHUP|POLLHUP|POLLERR)) -1) &
							F_CONN_EOF_SEEN;
#else /* POLLRDHUP */
		tcpconn->flags|=((int)!(ev & (POLLHUP|POLLERR)) -1) & F_CONN_EOF_SEEN;
#endif /* POLLRDHUP */
		tcpconn->flags|= ((int)!(ev & POLLPRI) -1) & F_CONN_OOB_DATA;
		tcpconn->flags|=F_CONN_READER;
		local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
		tcpconn->flags&=~(F_CONN_MAIN_TIMER|F_CONN_READ_W|F_CONN_WANTS_RD);
		tcpconn_ref(tcpconn); /* refcnt ++ */
		if (unlikely(send2child(tcpconn)<0)){
			LOG(L_ERR,"ERROR: handle_tcpconn_ev: no children available\n");
			tcpconn->flags&=~F_CONN_READER;
#ifdef TCP_ASYNC
			if (tcpconn->flags & F_CONN_WRITE_W){
				if (unlikely(io_watch_del(&io_h, tcpconn->s, fd_i, 0)<0)){
					LOG(L_ERR, "ERROR: handle_tcpconn_ev: io_watch_del(4)"
								" failed:" " for %p, fd %d\n",
								tcpconn, tcpconn->s);
				}
				tcpconn->flags&=~F_CONN_WRITE_W;
			}
#endif /* TCP_ASYNC */
			tcpconn_put(tcpconn);
			tcpconn_try_unhash(tcpconn);
			tcpconn_put_destroy(tcpconn); /* because of the tcpconn_ref() */
		}
	}
	return 0; /* we are not interested in possibly queued io events,
				 the fd was either passed to a child, closed, or for writes,
				 everything possible was already written */
error:
	return -1;
}
  3619. /* generic handle io routine, it will call the appropiate
  3620. * handle_xxx() based on the fd_map type
  3621. *
  3622. * params: fm - pointer to a fd hash entry
  3623. * idx - index in the fd_array (or -1 if not known)
  3624. * return: -1 on error
  3625. * 0 on EAGAIN or when by some other way it is known that no more
  3626. * io events are queued on the fd (the receive buffer is empty).
  3627. * Usefull to detect when there are no more io events queued for
  3628. * sigio_rt, epoll_et, kqueue.
  3629. * >0 on successfull read from the fd (when there might be more io
  3630. * queued -- the receive buffer might still be non-empty)
  3631. */
  3632. inline static int handle_io(struct fd_map* fm, short ev, int idx)
  3633. {
  3634. int ret;
  3635. /* update the local config */
  3636. cfg_update();
  3637. switch(fm->type){
  3638. case F_SOCKINFO:
  3639. ret=handle_new_connect((struct socket_info*)fm->data);
  3640. break;
  3641. case F_TCPCONN:
  3642. ret=handle_tcpconn_ev((struct tcp_connection*)fm->data, ev, idx);
  3643. break;
  3644. case F_TCPCHILD:
  3645. ret=handle_tcp_child((struct tcp_child*)fm->data, idx);
  3646. break;
  3647. case F_PROC:
  3648. ret=handle_ser_child((struct process_table*)fm->data, idx);
  3649. break;
  3650. case F_NONE:
  3651. LOG(L_CRIT, "BUG: handle_io: empty fd map: %p {%d, %d, %p},"
  3652. " idx %d\n", fm, fm->fd, fm->type, fm->data, idx);
  3653. goto error;
  3654. default:
  3655. LOG(L_CRIT, "BUG: handle_io: uknown fd type %d\n", fm->type);
  3656. goto error;
  3657. }
  3658. return ret;
  3659. error:
  3660. return -1;
  3661. }
  3662. /* timer handler for tcpconnection handled by tcp_main */
  3663. static ticks_t tcpconn_main_timeout(ticks_t t, struct timer_ln* tl, void* data)
  3664. {
  3665. struct tcp_connection *c;
  3666. int fd;
  3667. int tcp_async;
  3668. c=(struct tcp_connection*)data;
  3669. /* or (struct tcp...*)(tl-offset(c->timer)) */
  3670. #ifdef TCP_ASYNC
  3671. DBG( "tcp_main: entering timer for %p (ticks=%d, timeout=%d (%d s), "
  3672. "wr_timeout=%d (%d s)), write queue: %d bytes\n",
  3673. c, t, c->timeout, TICKS_TO_S(c->timeout-t),
  3674. c->wbuf_q.wr_timeout, TICKS_TO_S(c->wbuf_q.wr_timeout-t),
  3675. c->wbuf_q.queued);
  3676. tcp_async=cfg_get(tcp, tcp_cfg, async);
  3677. if (likely(TICKS_LT(t, c->timeout) && ( !tcp_async | _wbufq_empty(c) |
  3678. TICKS_LT(t, c->wbuf_q.wr_timeout)) )){
  3679. if (unlikely(tcp_async && _wbufq_non_empty(c)))
  3680. return (ticks_t)MIN_unsigned(c->timeout-t, c->wbuf_q.wr_timeout-t);
  3681. else
  3682. return (ticks_t)(c->timeout - t);
  3683. }
  3684. /* if time out due to write, add it to the blacklist */
  3685. if (tcp_async && _wbufq_non_empty(c) && TICKS_GE(t, c->wbuf_q.wr_timeout)){
  3686. if (unlikely(c->state==S_CONN_CONNECT)){
  3687. #ifdef USE_DST_BLACKLIST
  3688. dst_blacklist_su(BLST_ERR_CONNECT, c->rcv.proto, &c->rcv.src_su,
  3689. &c->send_flags, 0);
  3690. #endif /* USE_DST_BLACKLIST */
  3691. TCP_EV_CONNECT_TIMEOUT(0, TCP_LADDR(c), TCP_LPORT(c), TCP_PSU(c),
  3692. TCP_PROTO(c));
  3693. TCP_STATS_CONNECT_FAILED();
  3694. }else{
  3695. #ifdef USE_DST_BLACKLIST
  3696. dst_blacklist_su(BLST_ERR_SEND, c->rcv.proto, &c->rcv.src_su,
  3697. &c->send_flags, 0);
  3698. #endif /* USE_DST_BLACKLIST */
  3699. TCP_EV_SEND_TIMEOUT(0, &c->rcv);
  3700. TCP_STATS_SEND_TIMEOUT();
  3701. }
  3702. }else{
  3703. /* idle timeout */
  3704. TCP_EV_IDLE_CONN_CLOSED(0, &c->rcv);
  3705. TCP_STATS_CON_TIMEOUT();
  3706. }
  3707. #else /* ! TCP_ASYNC */
  3708. if (TICKS_LT(t, c->timeout)){
  3709. /* timeout extended, exit */
  3710. return (ticks_t)(c->timeout - t);
  3711. }
  3712. /* idle timeout */
  3713. TCP_EV_IDLE_CONN_CLOSED(0, &c->rcv);
  3714. TCP_STATS_CON_TIMEOUT();
  3715. #endif /* TCP_ASYNC */
  3716. DBG("tcp_main: timeout for %p\n", c);
  3717. if (likely(c->flags & F_CONN_HASHED)){
  3718. c->flags&=~(F_CONN_HASHED|F_CONN_MAIN_TIMER);
  3719. c->state=S_CONN_BAD;
  3720. TCPCONN_LOCK;
  3721. _tcpconn_detach(c);
  3722. TCPCONN_UNLOCK;
  3723. }else{
  3724. c->flags&=~F_CONN_MAIN_TIMER;
  3725. LOG(L_CRIT, "BUG: tcp_main: timer: called with unhashed connection %p"
  3726. "\n", c);
  3727. tcpconn_ref(c); /* ugly hack to try to go on */
  3728. }
  3729. fd=c->s;
  3730. if (likely(fd>0)){
  3731. if (likely(c->flags & (F_CONN_READ_W|F_CONN_WRITE_W))){
  3732. io_watch_del(&io_h, fd, -1, IO_FD_CLOSING);
  3733. c->flags&=~(F_CONN_READ_W|F_CONN_WRITE_W);
  3734. }
  3735. }
  3736. tcpconn_put_destroy(c);
  3737. return 0;
  3738. }
  3739. static inline void tcp_timer_run()
  3740. {
  3741. ticks_t ticks;
  3742. ticks=get_ticks_raw();
  3743. if (unlikely((ticks-tcp_main_prev_ticks)<TCPCONN_TIMEOUT_MIN_RUN)) return;
  3744. tcp_main_prev_ticks=ticks;
  3745. local_timer_run(&tcp_main_ltimer, ticks);
  3746. }
  3747. /* keep in sync with tcpconn_destroy, the "delete" part should be
  3748. * the same except for io_watch_del..
  3749. * Note: this function is called only on shutdown by the main ser process via
  3750. * cleanup(). However it's also safe to call it from the tcp_main process.
  3751. * => with the ser shutdown exception, it cannot execute in parallel
  3752. * with tcpconn_add() or tcpconn_destroy()*/
static inline void tcpconn_destroy_all()
{
	struct tcp_connection *c, *next;
	unsigned h;  /* hash bucket index */
	int fd;      /* connection fd, or -1 when not in the tcp_main process */

	TCPCONN_LOCK;
	/* walk every bucket of the id hash and tear down each connection */
	for(h=0; h<TCP_ID_HASH_SIZE; h++){
		c=tcpconn_id_hash[h];
		while(c){
			next=c->id_next; /* save before _tcpconn_rm() frees the links */
			if (is_tcp_main){
				/* we cannot close or remove the fd if we are not in the
				 * tcp main proc.*/
				if ((c->flags & F_CONN_MAIN_TIMER)){
					local_timer_del(&tcp_main_ltimer, &c->timer);
					c->flags&=~F_CONN_MAIN_TIMER;
				} /* else still in some reader */
				fd=c->s;
				if (fd>0 && (c->flags & (F_CONN_READ_W|F_CONN_WRITE_W))){
					io_watch_del(&io_h, fd, -1, IO_FD_CLOSING);
					c->flags&=~(F_CONN_READ_W|F_CONN_WRITE_W);
				}
			}else{
				fd=-1;
			}
#ifdef USE_TLS
			if (fd>0 && c->type==PROTO_TLS)
				tls_close(c, fd);
#endif
			_tcpconn_rm(c);
			if (fd>0) {
#ifdef TCP_FD_CACHE
				/* the fd might still be cached in some other process =>
				 * force a full shutdown before closing */
				if (likely(cfg_get(tcp, tcp_cfg, fd_cache)))
					shutdown(fd, SHUT_RDWR);
#endif /* TCP_FD_CACHE */
				close(fd);
			}
			(*tcp_connections_no)--;
			c=next;
		}
	}
	TCPCONN_UNLOCK;
}
/* tcp main loop */
/* Entry point of the tcp_main process; does not return on success.
 * Init phase: allocate the send-fd queues and the io_wait state (done here
 * so the memory belongs to tcp_main only), start the local timer, then
 * register with io_wait every fd of interest: tcp (and tls) listen sockets,
 * the per-process unix sockets and the tcp reader children sockets.
 * Run phase: dispatch on the configured poll method and loop forever:
 * wait/process io, flush queued send-fd requests, expire connection timers.
 * On any init failure: tear down the queues/io state and exit(-1). */
void tcp_main_loop()
{
	struct socket_info* si;
	int r;

	is_tcp_main=1; /* mark this process as tcp main */

	tcp_main_max_fd_no=get_max_open_fds();
	/* init send fd queues (here because we want mem. alloc only in the tcp
	 * process) */
#ifdef SEND_FD_QUEUE
	if (init_send_fd_queues()<0){
		LOG(L_CRIT, "ERROR: init_tcp: could not init send fd queues\n");
		goto error;
	}
#endif
	/* init io_wait (here because we want the memory allocated only in
	 * the tcp_main process) */
	if (init_io_wait(&io_h, tcp_main_max_fd_no, tcp_poll_method)<0)
		goto error;
	/* init: start watching all the fds*/

	/* init local timer */
	tcp_main_prev_ticks=get_ticks_raw();
	if (init_local_timer(&tcp_main_ltimer, get_ticks_raw())!=0){
		LOG(L_ERR, "ERROR: init_tcp: failed to init local timer\n");
		goto error;
	}
#ifdef TCP_FD_CACHE
	if (cfg_get(tcp, tcp_cfg, fd_cache)) tcp_fd_cache_init();
#endif /* TCP_FD_CACHE */

	/* add all the sockets we listen on for connections */
	for (si=tcp_listen; si; si=si->next){
		if ((si->proto==PROTO_TCP) &&(si->socket!=-1)){
			if (io_watch_add(&io_h, si->socket, POLLIN, F_SOCKINFO, si)<0){
				LOG(L_CRIT, "ERROR: tcp_main_loop: init: failed to add "
						"listen socket to the fd list\n");
				goto error;
			}
		}else{
			LOG(L_CRIT, "BUG: tcp_main_loop: non tcp address in tcp_listen\n");
		}
	}
#ifdef USE_TLS
	if (!tls_disable && tls_loaded()){
		for (si=tls_listen; si; si=si->next){
			if ((si->proto==PROTO_TLS) && (si->socket!=-1)){
				if (io_watch_add(&io_h, si->socket, POLLIN, F_SOCKINFO, si)<0){
					LOG(L_CRIT, "ERROR: tcp_main_loop: init: failed to add "
							"tls listen socket to the fd list\n");
					goto error;
				}
			}else{
				LOG(L_CRIT, "BUG: tcp_main_loop: non tls address"
						" in tls_listen\n");
			}
		}
	}
#endif
	/* add all the unix sockets used for communication with other ser
	 * processes (get fd, new connection a.s.o);
	 * r starts at 1: index 0 is this process (tcp_main) itself */
	for (r=1; r<process_no; r++){
		if (pt[r].unix_sock>0) /* we can't have 0, we never close it!*/
			if (io_watch_add(&io_h, pt[r].unix_sock, POLLIN,F_PROC, &pt[r])<0){
				LOG(L_CRIT, "ERROR: tcp_main_loop: init: failed to add "
						"process %d unix socket to the fd list\n", r);
				goto error;
			}
	}
	/* add all the unix sockets used for communication with the tcp
	 * reader children */
	for (r=0; r<tcp_children_no; r++){
		if (tcp_children[r].unix_sock>0)/*we can't have 0, we never close it!*/
			if (io_watch_add(&io_h, tcp_children[r].unix_sock, POLLIN,
									F_TCPCHILD, &tcp_children[r]) <0){
				LOG(L_CRIT, "ERROR: tcp_main_loop: init: failed to add "
						"tcp child %d unix socket to the fd list\n", r);
				goto error;
			}
	}

	/* initialize the cfg framework */
	if (cfg_child_init()) goto error;

	/* main loop: one endless loop per poll method; each iteration waits for
	 * and processes io, then handles newly queued send-fd requests, then
	 * runs the (rate-limited) connection timers */
	switch(io_h.poll_method){
		case POLL_POLL:
			while(1){
				/* wait and process IO */
				io_wait_loop_poll(&io_h, TCP_MAIN_SELECT_TIMEOUT, 0);
				send_fd_queue_run(&send2child_q); /* then new io */
				/* remove old connections */
				tcp_timer_run();
			}
			break;
#ifdef HAVE_SELECT
		case POLL_SELECT:
			while(1){
				io_wait_loop_select(&io_h, TCP_MAIN_SELECT_TIMEOUT, 0);
				send_fd_queue_run(&send2child_q); /* then new io */
				tcp_timer_run();
			}
			break;
#endif
#ifdef HAVE_SIGIO_RT
		case POLL_SIGIO_RT:
			while(1){
				io_wait_loop_sigio_rt(&io_h, TCP_MAIN_SELECT_TIMEOUT);
				send_fd_queue_run(&send2child_q); /* then new io */
				tcp_timer_run();
			}
			break;
#endif
#ifdef HAVE_EPOLL
		case POLL_EPOLL_LT:
			while(1){
				io_wait_loop_epoll(&io_h, TCP_MAIN_SELECT_TIMEOUT, 0);
				send_fd_queue_run(&send2child_q); /* then new io */
				tcp_timer_run();
			}
			break;
		case POLL_EPOLL_ET:
			while(1){
				io_wait_loop_epoll(&io_h, TCP_MAIN_SELECT_TIMEOUT, 1);
				send_fd_queue_run(&send2child_q); /* then new io */
				tcp_timer_run();
			}
			break;
#endif
#ifdef HAVE_KQUEUE
		case POLL_KQUEUE:
			while(1){
				io_wait_loop_kqueue(&io_h, TCP_MAIN_SELECT_TIMEOUT, 0);
				send_fd_queue_run(&send2child_q); /* then new io */
				tcp_timer_run();
			}
			break;
#endif
#ifdef HAVE_DEVPOLL
		case POLL_DEVPOLL:
			while(1){
				io_wait_loop_devpoll(&io_h, TCP_MAIN_SELECT_TIMEOUT, 0);
				send_fd_queue_run(&send2child_q); /* then new io */
				tcp_timer_run();
			}
			break;
#endif
		default:
			LOG(L_CRIT, "BUG: tcp_main_loop: no support for poll method "
					" %s (%d)\n",
					poll_method_name(io_h.poll_method), io_h.poll_method);
			goto error;
	}
	/* only reachable via goto error (the loops above never end) */
error:
#ifdef SEND_FD_QUEUE
	destroy_send_fd_queues();
#endif
	destroy_io_wait(&io_h);
	LOG(L_CRIT, "ERROR: tcp_main_loop: exiting...");
	exit(-1);
}
/* cleanup before exit */
/* Frees all the tcp shared state set up by init_tcp() plus the pkg-alloc'd
 * tcp children array. The order is significant: the connection hash is
 * walked (tcpconn_destroy_all()) before being freed, and the lock is
 * destroyed only after its last use. Safe on partially-initialized state:
 * every pointer is checked and reset to 0 (guards against double calls). */
void destroy_tcp()
{
	if (tcpconn_id_hash){
		if (tcpconn_lock)
			TCPCONN_UNLOCK; /* hack: force-unlock the tcp lock in case
							 some process was terminated while holding
							 it; this will allow an almost gracious
							 shutdown */
		tcpconn_destroy_all();
		shm_free(tcpconn_id_hash);
		tcpconn_id_hash=0;
	}
	DESTROY_TCP_STATS();
	if (tcp_connections_no){
		shm_free(tcp_connections_no);
		tcp_connections_no=0;
	}
#ifdef TCP_ASYNC
	if (tcp_total_wq){
		shm_free(tcp_total_wq);
		tcp_total_wq=0;
	}
#endif /* TCP_ASYNC */
	if (connection_id){
		shm_free(connection_id);
		connection_id=0;
	}
	if (tcpconn_aliases_hash){
		shm_free(tcpconn_aliases_hash);
		tcpconn_aliases_hash=0;
	}
	if (tcpconn_lock){
		lock_destroy(tcpconn_lock);
		lock_dealloc((void*)tcpconn_lock);
		tcpconn_lock=0;
	}
	if (tcp_children){
		pkg_free(tcp_children);
		tcp_children=0;
	}
	destroy_local_timer(&tcp_main_ltimer);
}
  3995. int init_tcp()
  3996. {
  3997. char* poll_err;
  3998. tcp_options_check();
  3999. if (tcp_cfg==0){
  4000. BUG("tcp_cfg not initialized\n");
  4001. goto error;
  4002. }
  4003. /* init lock */
  4004. tcpconn_lock=lock_alloc();
  4005. if (tcpconn_lock==0){
  4006. LOG(L_CRIT, "ERROR: init_tcp: could not alloc lock\n");
  4007. goto error;
  4008. }
  4009. if (lock_init(tcpconn_lock)==0){
  4010. LOG(L_CRIT, "ERROR: init_tcp: could not init lock\n");
  4011. lock_dealloc((void*)tcpconn_lock);
  4012. tcpconn_lock=0;
  4013. goto error;
  4014. }
  4015. /* init globals */
  4016. tcp_connections_no=shm_malloc(sizeof(int));
  4017. if (tcp_connections_no==0){
  4018. LOG(L_CRIT, "ERROR: init_tcp: could not alloc globals\n");
  4019. goto error;
  4020. }
  4021. *tcp_connections_no=0;
  4022. if (INIT_TCP_STATS()!=0) goto error;
  4023. connection_id=shm_malloc(sizeof(int));
  4024. if (connection_id==0){
  4025. LOG(L_CRIT, "ERROR: init_tcp: could not alloc globals\n");
  4026. goto error;
  4027. }
  4028. *connection_id=1;
  4029. #ifdef TCP_ASYNC
  4030. tcp_total_wq=shm_malloc(sizeof(*tcp_total_wq));
  4031. if (tcp_total_wq==0){
  4032. LOG(L_CRIT, "ERROR: init_tcp: could not alloc globals\n");
  4033. goto error;
  4034. }
  4035. #endif /* TCP_ASYNC */
  4036. /* alloc hashtables*/
  4037. tcpconn_aliases_hash=(struct tcp_conn_alias**)
  4038. shm_malloc(TCP_ALIAS_HASH_SIZE* sizeof(struct tcp_conn_alias*));
  4039. if (tcpconn_aliases_hash==0){
  4040. LOG(L_CRIT, "ERROR: init_tcp: could not alloc address hashtable\n");
  4041. goto error;
  4042. }
  4043. tcpconn_id_hash=(struct tcp_connection**)shm_malloc(TCP_ID_HASH_SIZE*
  4044. sizeof(struct tcp_connection*));
  4045. if (tcpconn_id_hash==0){
  4046. LOG(L_CRIT, "ERROR: init_tcp: could not alloc id hashtable\n");
  4047. goto error;
  4048. }
  4049. /* init hashtables*/
  4050. memset((void*)tcpconn_aliases_hash, 0,
  4051. TCP_ALIAS_HASH_SIZE * sizeof(struct tcp_conn_alias*));
  4052. memset((void*)tcpconn_id_hash, 0,
  4053. TCP_ID_HASH_SIZE * sizeof(struct tcp_connection*));
  4054. /* fix config variables */
  4055. poll_err=check_poll_method(tcp_poll_method);
  4056. /* set an appropriate poll method */
  4057. if (poll_err || (tcp_poll_method==0)){
  4058. tcp_poll_method=choose_poll_method();
  4059. if (poll_err){
  4060. LOG(L_ERR, "ERROR: init_tcp: %s, using %s instead\n",
  4061. poll_err, poll_method_name(tcp_poll_method));
  4062. }else{
  4063. LOG(L_INFO, "init_tcp: using %s as the io watch method"
  4064. " (auto detected)\n", poll_method_name(tcp_poll_method));
  4065. }
  4066. }else{
  4067. LOG(L_INFO, "init_tcp: using %s io watch method (config)\n",
  4068. poll_method_name(tcp_poll_method));
  4069. }
  4070. return 0;
  4071. error:
  4072. /* clean-up */
  4073. destroy_tcp();
  4074. return -1;
  4075. }
  4076. #ifdef TCP_CHILD_NON_BLOCKING
  4077. /* returns -1 on error */
  4078. static int set_non_blocking(int s)
  4079. {
  4080. int flags;
  4081. /* non-blocking */
  4082. flags=fcntl(s, F_GETFL);
  4083. if (flags==-1){
  4084. LOG(L_ERR, "ERROR: set_non_blocking: fnctl failed: (%d) %s\n",
  4085. errno, strerror(errno));
  4086. goto error;
  4087. }
  4088. if (fcntl(s, F_SETFL, flags|O_NONBLOCK)==-1){
  4089. LOG(L_ERR, "ERROR: set_non_blocking: fcntl: set non-blocking failed:"
  4090. " (%d) %s\n", errno, strerror(errno));
  4091. goto error;
  4092. }
  4093. return 0;
  4094. error:
  4095. return -1;
  4096. }
  4097. #endif
  4098. /* returns -1 on error, 0 on success */
  4099. int tcp_fix_child_sockets(int* fd)
  4100. {
  4101. #ifdef TCP_CHILD_NON_BLOCKING
  4102. if ((set_non_blocking(fd[0])<0) ||
  4103. (set_non_blocking(fd[1])<0)){
  4104. return -1;
  4105. }
  4106. #endif
  4107. return 0;
  4108. }
/* starts the tcp processes */
/* Registers the expected extra fd usage with the core fd accounting, then
 * forks tcp_children_no "tcp receiver" processes via fork_tcp_process().
 * Returns 0 in the parent on success, -1 on error; the forked children
 * never return (they enter tcp_receive_loop()). */
int tcp_init_children()
{
	int r;
	int reader_fd_1; /* for comm. with the tcp children read */
	pid_t pid;
	struct socket_info *si;

	/* estimate max fd. no:
	 * 1 tcp send unix socket/all_proc,
	 *  + 1 udp sock/udp proc + 1 tcp_child sock/tcp child*
	 *  + no_listen_tcp */
	/* r = number of tcp (and tls) listen sockets */
	for(r=0, si=tcp_listen; si; si=si->next, r++);
#ifdef USE_TLS
	if (! tls_disable)
		for (si=tls_listen; si; si=si->next, r++);
#endif

	register_fds(r+tcp_max_connections+get_max_procs()-1 /* tcp main */);
#if 0
	tcp_max_fd_no=get_max_procs()*2 +r-1 /* timer */ +3; /* stdin/out/err*/
	/* max connections can be temporarily exceeded with estimated_process_count
	 * - tcp_main (tcpconn_connect called simultaneously in all all the
	 *  processes) */
	tcp_max_fd_no+=tcp_max_connections+get_max_procs()-1 /* tcp main */;
#endif
	/* alloc the children array (pkg mem: each process has its own copy) */
	tcp_children=pkg_malloc(sizeof(struct tcp_child)*tcp_children_no);
	if (tcp_children==0){
		LOG(L_ERR, "ERROR: tcp_init_children: out of memory\n");
		goto error;
	}
	/* create the tcp sock_info structures */
	/* copy the sockets --moved to main_loop*/

	/* fork children & create the socket pairs*/
	for(r=0; r<tcp_children_no; r++){
		child_rank++;
		pid=fork_tcp_process(child_rank, "tcp receiver", r, &reader_fd_1);
		if (pid<0){
			LOG(L_ERR, "ERROR: tcp_main: fork failed: %s\n",
					strerror(errno));
			goto error;
		}else if (pid>0){
			/* parent: nothing to do, continue forking */
		}else{
			/* child */
			bind_address=0; /* force a SEGFAULT if someone uses a non-init.
							   bind address on tcp */
			tcp_receive_loop(reader_fd_1); /* does not return */
		}
	}
	return 0;
error:
	return -1;
}
  4162. void tcp_get_info(struct tcp_gen_info *ti)
  4163. {
  4164. ti->tcp_readers=tcp_children_no;
  4165. ti->tcp_max_connections=tcp_max_connections;
  4166. ti->tcp_connections_no=*tcp_connections_no;
  4167. #ifdef TCP_ASYNC
  4168. ti->tcp_write_queued=*tcp_total_wq;
  4169. #else
  4170. ti->tcp_write_queued=0;
  4171. #endif /* TCP_ASYNC */
  4172. }
  4173. #endif