interface.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157
  1. package nebula
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net/netip"
  8. "os"
  9. "runtime"
  10. "strings"
  11. "sync/atomic"
  12. "time"
  13. "github.com/gaissmai/bart"
  14. "github.com/rcrowley/go-metrics"
  15. "github.com/sirupsen/logrus"
  16. "github.com/slackhq/nebula/config"
  17. "github.com/slackhq/nebula/firewall"
  18. "github.com/slackhq/nebula/header"
  19. "github.com/slackhq/nebula/overlay"
  20. "github.com/slackhq/nebula/udp"
  21. )
// Sizing and timing defaults for packet buffers and the batching/GSO pipelines.
const (
	// mtu is the size, in bytes, of the per-routine packet and output scratch
	// buffers allocated by the read loops in this file.
	mtu = 9001
	// defaultGSOFlushInterval is the batch flush interval NewInterface applies
	// when GSO is enabled and no positive interval was configured.
	defaultGSOFlushInterval = 150 * time.Microsecond
	// defaultBatchQueueDepthFactor is multiplied by the routine count to derive
	// the batch queue depth when none was configured.
	defaultBatchQueueDepthFactor = 4
	// defaultGSOMaxSegments replaces a non-positive configured segment count.
	defaultGSOMaxSegments = 8
	// maxKernelGSOSegments is the upper bound the segment count is clamped to.
	maxKernelGSOSegments = 64
)
// InterfaceConfig bundles the dependencies and tunables that NewInterface
// consumes when building an Interface.
type InterfaceConfig struct {
	HostMap *HostMap
	// Outside, Inside, pki, Firewall and connectionManager are mandatory:
	// NewInterface returns an error when any of them is nil.
	Outside            udp.Conn
	Inside             overlay.Device
	pki                *PKI
	Cipher             string
	Firewall           *Firewall
	ServeDns           bool
	HandshakeManager   *HandshakeManager
	lightHouse         *LightHouse
	connectionManager  *connectionManager
	DropLocalBroadcast bool
	DropMulticast      bool
	EnableGSO          bool
	EnableGRO          bool
	// GSOMaxSegments is replaced by defaultGSOMaxSegments when not positive and
	// capped at maxKernelGSOSegments by NewInterface.
	GSOMaxSegments int
	// routines sizes the writers/readers slices and the number of per-queue
	// goroutines started by Interface.run.
	routines              int
	MessageMetrics        *MessageMetrics
	version               string
	relayManager          *relayManager
	punchy                *Punchy
	tryPromoteEvery       uint32
	reQueryEvery          uint32
	reQueryWait           time.Duration
	ConntrackCacheTimeout time.Duration
	// BatchFlushInterval is clamped to zero when negative; a zero value becomes
	// defaultGSOFlushInterval when EnableGSO is set.
	BatchFlushInterval time.Duration
	// BatchQueueDepth defaults to routines*defaultBatchQueueDepthFactor when
	// not positive.
	BatchQueueDepth int
	l               *logrus.Logger
}
// Interface ties the outside UDP connection(s) to the inside overlay device
// and holds the state shared by the per-routine read, write and batching
// goroutines started from run.
type Interface struct {
	hostMap               *HostMap
	outside               udp.Conn
	inside                overlay.Device
	pki                   *PKI
	firewall              *Firewall
	connectionManager     *connectionManager
	handshakeManager      *HandshakeManager
	serveDns              bool
	createTime            time.Time
	lightHouse            *LightHouse
	myBroadcastAddrsTable *bart.Lite
	myVpnAddrs            []netip.Addr // A list of addresses assigned to us via our certificate
	myVpnAddrsTable       *bart.Lite
	myVpnNetworks         []netip.Prefix // A list of networks assigned to us via our certificate
	myVpnNetworksTable    *bart.Lite
	dropLocalBroadcast    bool
	dropMulticast         bool
	routines              int
	disconnectInvalid     atomic.Bool
	closed                atomic.Bool
	relayManager          *relayManager
	tryPromoteEvery       atomic.Uint32
	reQueryEvery          atomic.Uint32
	reQueryWait           atomic.Int64
	sendRecvErrorConfig   sendRecvErrorConfig
	// rebindCount is used to decide if an active tunnel should trigger a punch notification through a lighthouse
	rebindCount           int8
	version               string
	conntrackCacheTimeout time.Duration
	batchQueueDepth       int
	// NOTE(review): enableGSO, enableGRO and gsoMaxSegments are plain fields
	// that applyOffloadConfig writes and the flush paths read; no
	// synchronization is visible in this file — confirm callers serialize this.
	enableGSO            bool
	enableGRO            bool
	gsoMaxSegments       int
	batchUDPQueueGauge   metrics.Gauge
	batchUDPFlushCounter metrics.Counter
	batchTunQueueGauge   metrics.Gauge
	batchTunFlushCounter metrics.Counter
	// batchFlushInterval stores a time.Duration as nanoseconds; read through
	// currentBatchFlushInterval.
	batchFlushInterval atomic.Int64
	sendSem            chan struct{}
	// writers and readers are indexed by routine/queue number.
	writers             []udp.Conn
	readers             []io.ReadWriteCloser
	batches             batchPipelines
	metricHandshakes    metrics.Histogram
	messageMetrics      *MessageMetrics
	cachedPacketMetrics *cachedPacketMetrics
	l                   *logrus.Logger
}
// EncWriter is the set of send/handshake/lookup operations exposed to
// components that need to emit messages to peers.
// NOTE(review): ad, nb and out look like caller-supplied scratch/payload
// buffers; their exact contract is defined by the implementations — confirm.
type EncWriter interface {
	SendVia(via *HostInfo,
		relay *Relay,
		ad,
		nb,
		out []byte,
		nocopy bool,
	)
	SendMessageToVpnAddr(t header.MessageType, st header.MessageSubType, vpnAddr netip.Addr, p, nb, out []byte)
	SendMessageToHostInfo(t header.MessageType, st header.MessageSubType, hostinfo *HostInfo, p, nb, out []byte)
	Handshake(vpnAddr netip.Addr)
	GetHostInfo(vpnAddr netip.Addr) *HostInfo
	GetCertState() *CertState
}
  120. type sendRecvErrorConfig uint8
  121. const (
  122. sendRecvErrorAlways sendRecvErrorConfig = iota
  123. sendRecvErrorNever
  124. sendRecvErrorPrivate
  125. )
  126. func (s sendRecvErrorConfig) ShouldSendRecvError(endpoint netip.AddrPort) bool {
  127. switch s {
  128. case sendRecvErrorPrivate:
  129. return endpoint.Addr().IsPrivate()
  130. case sendRecvErrorAlways:
  131. return true
  132. case sendRecvErrorNever:
  133. return false
  134. default:
  135. panic(fmt.Errorf("invalid sendRecvErrorConfig value: %d", s))
  136. }
  137. }
  138. func (s sendRecvErrorConfig) String() string {
  139. switch s {
  140. case sendRecvErrorAlways:
  141. return "always"
  142. case sendRecvErrorNever:
  143. return "never"
  144. case sendRecvErrorPrivate:
  145. return "private"
  146. default:
  147. return fmt.Sprintf("invalid(%d)", s)
  148. }
  149. }
  150. func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
  151. if c.Outside == nil {
  152. return nil, errors.New("no outside connection")
  153. }
  154. if c.Inside == nil {
  155. return nil, errors.New("no inside interface (tun)")
  156. }
  157. if c.pki == nil {
  158. return nil, errors.New("no certificate state")
  159. }
  160. if c.Firewall == nil {
  161. return nil, errors.New("no firewall rules")
  162. }
  163. if c.connectionManager == nil {
  164. return nil, errors.New("no connection manager")
  165. }
  166. if c.GSOMaxSegments <= 0 {
  167. c.GSOMaxSegments = defaultGSOMaxSegments
  168. }
  169. if c.GSOMaxSegments > maxKernelGSOSegments {
  170. c.GSOMaxSegments = maxKernelGSOSegments
  171. }
  172. if c.BatchQueueDepth <= 0 {
  173. c.BatchQueueDepth = c.routines * defaultBatchQueueDepthFactor
  174. }
  175. if c.BatchFlushInterval < 0 {
  176. c.BatchFlushInterval = 0
  177. }
  178. if c.BatchFlushInterval == 0 && c.EnableGSO {
  179. c.BatchFlushInterval = defaultGSOFlushInterval
  180. }
  181. cs := c.pki.getCertState()
  182. ifce := &Interface{
  183. pki: c.pki,
  184. hostMap: c.HostMap,
  185. outside: c.Outside,
  186. inside: c.Inside,
  187. firewall: c.Firewall,
  188. serveDns: c.ServeDns,
  189. handshakeManager: c.HandshakeManager,
  190. createTime: time.Now(),
  191. lightHouse: c.lightHouse,
  192. dropLocalBroadcast: c.DropLocalBroadcast,
  193. dropMulticast: c.DropMulticast,
  194. routines: c.routines,
  195. version: c.version,
  196. writers: make([]udp.Conn, c.routines),
  197. readers: make([]io.ReadWriteCloser, c.routines),
  198. myVpnNetworks: cs.myVpnNetworks,
  199. myVpnNetworksTable: cs.myVpnNetworksTable,
  200. myVpnAddrs: cs.myVpnAddrs,
  201. myVpnAddrsTable: cs.myVpnAddrsTable,
  202. myBroadcastAddrsTable: cs.myVpnBroadcastAddrsTable,
  203. relayManager: c.relayManager,
  204. connectionManager: c.connectionManager,
  205. conntrackCacheTimeout: c.ConntrackCacheTimeout,
  206. batchQueueDepth: c.BatchQueueDepth,
  207. enableGSO: c.EnableGSO,
  208. enableGRO: c.EnableGRO,
  209. gsoMaxSegments: c.GSOMaxSegments,
  210. metricHandshakes: metrics.GetOrRegisterHistogram("handshakes", nil, metrics.NewExpDecaySample(1028, 0.015)),
  211. messageMetrics: c.MessageMetrics,
  212. cachedPacketMetrics: &cachedPacketMetrics{
  213. sent: metrics.GetOrRegisterCounter("hostinfo.cached_packets.sent", nil),
  214. dropped: metrics.GetOrRegisterCounter("hostinfo.cached_packets.dropped", nil),
  215. },
  216. l: c.l,
  217. }
  218. ifce.tryPromoteEvery.Store(c.tryPromoteEvery)
  219. ifce.batchUDPQueueGauge = metrics.GetOrRegisterGauge("batch.udp.queue_depth", nil)
  220. ifce.batchUDPFlushCounter = metrics.GetOrRegisterCounter("batch.udp.flushes", nil)
  221. ifce.batchTunQueueGauge = metrics.GetOrRegisterGauge("batch.tun.queue_depth", nil)
  222. ifce.batchTunFlushCounter = metrics.GetOrRegisterCounter("batch.tun.flushes", nil)
  223. ifce.batchFlushInterval.Store(int64(c.BatchFlushInterval))
  224. ifce.sendSem = make(chan struct{}, c.routines)
  225. ifce.batches.init(c.Inside, c.routines, c.BatchQueueDepth, c.GSOMaxSegments)
  226. ifce.reQueryEvery.Store(c.reQueryEvery)
  227. ifce.reQueryWait.Store(int64(c.reQueryWait))
  228. if c.l.Level >= logrus.DebugLevel {
  229. c.l.WithFields(logrus.Fields{
  230. "enableGSO": c.EnableGSO,
  231. "enableGRO": c.EnableGRO,
  232. "gsoMaxSegments": c.GSOMaxSegments,
  233. "batchQueueDepth": c.BatchQueueDepth,
  234. "batchFlush": c.BatchFlushInterval,
  235. "batching": ifce.batches.Enabled(),
  236. }).Debug("initialized batch pipelines")
  237. }
  238. ifce.connectionManager.intf = ifce
  239. return ifce, nil
  240. }
  241. // activate creates the interface on the host. After the interface is created, any
  242. // other services that want to bind listeners to its IP may do so successfully. However,
  243. // the interface isn't going to process anything until run() is called.
  244. func (f *Interface) activate() {
  245. // actually turn on tun dev
  246. addr, err := f.outside.LocalAddr()
  247. if err != nil {
  248. f.l.WithError(err).Error("Failed to get udp listen address")
  249. }
  250. f.l.WithField("interface", f.inside.Name()).WithField("networks", f.myVpnNetworks).
  251. WithField("build", f.version).WithField("udpAddr", addr).
  252. WithField("boringcrypto", boringEnabled()).
  253. Info("Nebula interface is active")
  254. metrics.GetOrRegisterGauge("routines", nil).Update(int64(f.routines))
  255. // Prepare n tun queues
  256. var reader io.ReadWriteCloser = f.inside
  257. for i := 0; i < f.routines; i++ {
  258. if i > 0 {
  259. reader, err = f.inside.NewMultiQueueReader()
  260. if err != nil {
  261. f.l.Fatal(err)
  262. }
  263. }
  264. f.readers[i] = reader
  265. }
  266. if err := f.inside.Activate(); err != nil {
  267. f.inside.Close()
  268. f.l.Fatal(err)
  269. }
  270. }
  271. func (f *Interface) run() {
  272. // Launch n queues to read packets from udp
  273. for i := 0; i < f.routines; i++ {
  274. go f.listenOut(i)
  275. }
  276. if f.l.Level >= logrus.DebugLevel {
  277. f.l.WithField("batching", f.batches.Enabled()).Debug("starting interface run loops")
  278. }
  279. if f.batches.Enabled() {
  280. for i := 0; i < f.routines; i++ {
  281. go f.runInsideBatchWorker(i)
  282. go f.runTunWriteQueue(i)
  283. go f.runSendQueue(i)
  284. }
  285. }
  286. // Launch n queues to read packets from tun dev
  287. for i := 0; i < f.routines; i++ {
  288. go f.listenIn(f.readers[i], i)
  289. }
  290. }
  291. func (f *Interface) listenOut(i int) {
  292. runtime.LockOSThread()
  293. var li udp.Conn
  294. if i > 0 {
  295. li = f.writers[i]
  296. } else {
  297. li = f.outside
  298. }
  299. ctCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
  300. lhh := f.lightHouse.NewRequestHandler()
  301. plaintext := make([]byte, udp.MTU)
  302. h := &header.H{}
  303. fwPacket := &firewall.Packet{}
  304. nb := make([]byte, 12, 12)
  305. li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) {
  306. f.readOutsidePackets(fromUdpAddr, nil, plaintext[:0], payload, h, fwPacket, lhh, nb, i, ctCache.Get(f.l))
  307. })
  308. }
  309. func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
  310. runtime.LockOSThread()
  311. if f.batches.Enabled() {
  312. if br, ok := reader.(overlay.BatchReader); ok {
  313. f.listenInBatchLocked(reader, br, i)
  314. return
  315. }
  316. }
  317. f.listenInLegacyLocked(reader, i)
  318. }
  319. func (f *Interface) listenInLegacyLocked(reader io.ReadWriteCloser, i int) {
  320. packet := make([]byte, mtu)
  321. out := make([]byte, mtu)
  322. fwPacket := &firewall.Packet{}
  323. nb := make([]byte, 12, 12)
  324. conntrackCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
  325. for {
  326. n, err := reader.Read(packet)
  327. if err != nil {
  328. if errors.Is(err, os.ErrClosed) && f.closed.Load() {
  329. return
  330. }
  331. f.l.WithError(err).Error("Error while reading outbound packet")
  332. // This only seems to happen when something fatal happens to the fd, so exit.
  333. os.Exit(2)
  334. }
  335. f.consumeInsidePacket(packet[:n], fwPacket, nb, out, i, conntrackCache.Get(f.l))
  336. }
  337. }
  338. func (f *Interface) listenInBatchLocked(raw io.ReadWriteCloser, reader overlay.BatchReader, i int) {
  339. pool := f.batches.Pool()
  340. if pool == nil {
  341. f.l.Warn("batch pipeline enabled without an allocated pool; falling back to single-packet reads")
  342. f.listenInLegacyLocked(raw, i)
  343. return
  344. }
  345. for {
  346. packets, err := reader.ReadIntoBatch(pool)
  347. if err != nil {
  348. if errors.Is(err, os.ErrClosed) && f.closed.Load() {
  349. return
  350. }
  351. if isVirtioHeadroomError(err) {
  352. f.l.WithError(err).Warn("Batch reader fell back due to tun headroom issue")
  353. f.listenInLegacyLocked(raw, i)
  354. return
  355. }
  356. f.l.WithError(err).Error("Error while reading outbound packet batch")
  357. os.Exit(2)
  358. }
  359. if len(packets) == 0 {
  360. continue
  361. }
  362. for _, pkt := range packets {
  363. if pkt == nil {
  364. continue
  365. }
  366. if !f.batches.enqueueRx(i, pkt) {
  367. pkt.Release()
  368. }
  369. }
  370. }
  371. }
  372. func (f *Interface) runInsideBatchWorker(i int) {
  373. queue := f.batches.rxQueue(i)
  374. if queue == nil {
  375. return
  376. }
  377. out := make([]byte, mtu)
  378. fwPacket := &firewall.Packet{}
  379. nb := make([]byte, 12, 12)
  380. conntrackCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
  381. for pkt := range queue {
  382. if pkt == nil {
  383. continue
  384. }
  385. f.consumeInsidePacket(pkt.Payload(), fwPacket, nb, out, i, conntrackCache.Get(f.l))
  386. pkt.Release()
  387. }
  388. }
  389. func (f *Interface) runSendQueue(i int) {
  390. queue := f.batches.txQueue(i)
  391. if queue == nil {
  392. if f.l.Level >= logrus.DebugLevel {
  393. f.l.WithField("queue", i).Debug("tx queue not initialized; batching disabled for writer")
  394. }
  395. return
  396. }
  397. writer := f.writerForIndex(i)
  398. if writer == nil {
  399. if f.l.Level >= logrus.DebugLevel {
  400. f.l.WithField("queue", i).Debug("no UDP writer for batch queue")
  401. }
  402. return
  403. }
  404. if f.l.Level >= logrus.DebugLevel {
  405. f.l.WithField("queue", i).Debug("send queue worker started")
  406. }
  407. defer func() {
  408. if f.l.Level >= logrus.WarnLevel {
  409. f.l.WithField("queue", i).Warn("send queue worker exited")
  410. }
  411. }()
  412. batchCap := f.batches.batchSizeHint()
  413. if batchCap <= 0 {
  414. batchCap = 1
  415. }
  416. gsoLimit := f.effectiveGSOMaxSegments()
  417. if gsoLimit > batchCap {
  418. batchCap = gsoLimit
  419. }
  420. pending := make([]queuedDatagram, 0, batchCap)
  421. var (
  422. flushTimer *time.Timer
  423. flushC <-chan time.Time
  424. )
  425. dispatch := func(reason string, timerFired bool) {
  426. if len(pending) == 0 {
  427. return
  428. }
  429. batch := pending
  430. f.flushAndReleaseBatch(i, writer, batch, reason)
  431. for idx := range batch {
  432. batch[idx] = queuedDatagram{}
  433. }
  434. pending = pending[:0]
  435. if flushTimer != nil {
  436. if !timerFired {
  437. if !flushTimer.Stop() {
  438. select {
  439. case <-flushTimer.C:
  440. default:
  441. }
  442. }
  443. }
  444. flushTimer = nil
  445. flushC = nil
  446. }
  447. }
  448. armTimer := func() {
  449. delay := f.currentBatchFlushInterval()
  450. if delay <= 0 {
  451. dispatch("nogso", false)
  452. return
  453. }
  454. if flushTimer == nil {
  455. flushTimer = time.NewTimer(delay)
  456. flushC = flushTimer.C
  457. }
  458. }
  459. for {
  460. select {
  461. case d := <-queue:
  462. if d.packet == nil {
  463. continue
  464. }
  465. if f.l.Level >= logrus.DebugLevel {
  466. f.l.WithFields(logrus.Fields{
  467. "queue": i,
  468. "payload_len": d.packet.Len,
  469. "dest": d.addr,
  470. }).Debug("send queue received packet")
  471. }
  472. pending = append(pending, d)
  473. if gsoLimit > 0 && len(pending) >= gsoLimit {
  474. dispatch("gso", false)
  475. continue
  476. }
  477. if len(pending) >= cap(pending) {
  478. dispatch("cap", false)
  479. continue
  480. }
  481. armTimer()
  482. f.observeUDPQueueLen(i)
  483. case <-flushC:
  484. dispatch("timer", true)
  485. }
  486. }
  487. }
  488. func (f *Interface) runTunWriteQueue(i int) {
  489. queue := f.batches.tunQueue(i)
  490. if queue == nil {
  491. return
  492. }
  493. writer := f.batches.inside
  494. if writer == nil {
  495. return
  496. }
  497. requiredHeadroom := writer.BatchHeadroom()
  498. batchCap := f.batches.batchSizeHint()
  499. if batchCap <= 0 {
  500. batchCap = 1
  501. }
  502. pending := make([]*overlay.Packet, 0, batchCap)
  503. var (
  504. flushTimer *time.Timer
  505. flushC <-chan time.Time
  506. )
  507. flush := func(reason string, timerFired bool) {
  508. if len(pending) == 0 {
  509. return
  510. }
  511. valid := pending[:0]
  512. for idx := range pending {
  513. if !f.ensurePacketHeadroom(&pending[idx], requiredHeadroom, i, reason) {
  514. pending[idx] = nil
  515. continue
  516. }
  517. if pending[idx] != nil {
  518. valid = append(valid, pending[idx])
  519. }
  520. }
  521. if len(valid) > 0 {
  522. if _, err := writer.WriteBatch(valid); err != nil {
  523. f.l.WithError(err).
  524. WithField("queue", i).
  525. WithField("reason", reason).
  526. Warn("Failed to write tun batch")
  527. for _, pkt := range valid {
  528. if pkt != nil {
  529. f.writePacketToTun(i, pkt)
  530. }
  531. }
  532. }
  533. }
  534. pending = pending[:0]
  535. if flushTimer != nil {
  536. if !timerFired {
  537. if !flushTimer.Stop() {
  538. select {
  539. case <-flushTimer.C:
  540. default:
  541. }
  542. }
  543. }
  544. flushTimer = nil
  545. flushC = nil
  546. }
  547. }
  548. armTimer := func() {
  549. delay := f.currentBatchFlushInterval()
  550. if delay <= 0 {
  551. return
  552. }
  553. if flushTimer == nil {
  554. flushTimer = time.NewTimer(delay)
  555. flushC = flushTimer.C
  556. }
  557. }
  558. for {
  559. select {
  560. case pkt := <-queue:
  561. if pkt == nil {
  562. continue
  563. }
  564. if f.ensurePacketHeadroom(&pkt, requiredHeadroom, i, "queue") {
  565. pending = append(pending, pkt)
  566. }
  567. if len(pending) >= cap(pending) {
  568. flush("cap", false)
  569. continue
  570. }
  571. armTimer()
  572. f.observeTunQueueLen(i)
  573. case <-flushC:
  574. flush("timer", true)
  575. }
  576. }
  577. }
  578. func (f *Interface) flushAndReleaseBatch(index int, writer udp.Conn, batch []queuedDatagram, reason string) {
  579. if len(batch) == 0 {
  580. return
  581. }
  582. f.flushDatagrams(index, writer, batch, reason)
  583. for idx := range batch {
  584. if batch[idx].packet != nil {
  585. batch[idx].packet.Release()
  586. batch[idx].packet = nil
  587. }
  588. }
  589. if f.batchUDPFlushCounter != nil {
  590. f.batchUDPFlushCounter.Inc(int64(len(batch)))
  591. }
  592. }
  593. func (f *Interface) flushDatagrams(index int, writer udp.Conn, batch []queuedDatagram, reason string) {
  594. if len(batch) == 0 {
  595. return
  596. }
  597. if f.l.Level >= logrus.DebugLevel {
  598. f.l.WithFields(logrus.Fields{
  599. "writer": index,
  600. "reason": reason,
  601. "pending": len(batch),
  602. }).Debug("udp batch flush summary")
  603. }
  604. maxSeg := f.effectiveGSOMaxSegments()
  605. if bw, ok := writer.(udp.BatchConn); ok {
  606. chunkCap := maxSeg
  607. if chunkCap <= 0 {
  608. chunkCap = len(batch)
  609. }
  610. chunk := make([]udp.Datagram, 0, chunkCap)
  611. var (
  612. currentAddr netip.AddrPort
  613. segments int
  614. )
  615. flushChunk := func() {
  616. if len(chunk) == 0 {
  617. return
  618. }
  619. if f.l.Level >= logrus.DebugLevel {
  620. f.l.WithFields(logrus.Fields{
  621. "writer": index,
  622. "segments": len(chunk),
  623. "dest": chunk[0].Addr,
  624. "reason": reason,
  625. "pending_total": len(batch),
  626. }).Debug("flushing UDP batch")
  627. }
  628. if err := bw.WriteBatch(chunk); err != nil {
  629. f.l.WithError(err).
  630. WithField("writer", index).
  631. WithField("reason", reason).
  632. Warn("Failed to write UDP batch")
  633. }
  634. chunk = chunk[:0]
  635. segments = 0
  636. }
  637. for _, item := range batch {
  638. if item.packet == nil || !item.addr.IsValid() {
  639. continue
  640. }
  641. payload := item.packet.Payload()[:item.packet.Len]
  642. if segments == 0 {
  643. currentAddr = item.addr
  644. }
  645. if item.addr != currentAddr || (maxSeg > 0 && segments >= maxSeg) {
  646. flushChunk()
  647. currentAddr = item.addr
  648. }
  649. chunk = append(chunk, udp.Datagram{Payload: payload, Addr: item.addr})
  650. segments++
  651. }
  652. flushChunk()
  653. return
  654. }
  655. for _, item := range batch {
  656. if item.packet == nil || !item.addr.IsValid() {
  657. continue
  658. }
  659. if f.l.Level >= logrus.DebugLevel {
  660. f.l.WithFields(logrus.Fields{
  661. "writer": index,
  662. "reason": reason,
  663. "dest": item.addr,
  664. "segments": 1,
  665. }).Debug("flushing UDP batch")
  666. }
  667. if err := writer.WriteTo(item.packet.Payload()[:item.packet.Len], item.addr); err != nil {
  668. f.l.WithError(err).
  669. WithField("writer", index).
  670. WithField("udpAddr", item.addr).
  671. WithField("reason", reason).
  672. Warn("Failed to write UDP packet")
  673. }
  674. }
  675. }
  676. func (f *Interface) tryQueueDatagram(q int, buf []byte, addr netip.AddrPort) bool {
  677. if !addr.IsValid() || !f.batches.Enabled() {
  678. return false
  679. }
  680. pkt := f.batches.newPacket()
  681. if pkt == nil {
  682. return false
  683. }
  684. payload := pkt.Payload()
  685. if len(payload) < len(buf) {
  686. pkt.Release()
  687. return false
  688. }
  689. copy(payload, buf)
  690. pkt.Len = len(buf)
  691. if f.batches.enqueueTx(q, pkt, addr) {
  692. f.observeUDPQueueLen(q)
  693. return true
  694. }
  695. pkt.Release()
  696. return false
  697. }
  698. func (f *Interface) writerForIndex(i int) udp.Conn {
  699. if i < 0 || i >= len(f.writers) {
  700. return nil
  701. }
  702. return f.writers[i]
  703. }
  704. func (f *Interface) writeImmediate(q int, buf []byte, addr netip.AddrPort, hostinfo *HostInfo) {
  705. writer := f.writerForIndex(q)
  706. if writer == nil {
  707. f.l.WithField("udpAddr", addr).
  708. WithField("writer", q).
  709. Error("Failed to write outgoing packet: no writer available")
  710. return
  711. }
  712. if err := writer.WriteTo(buf, addr); err != nil {
  713. hostinfo.logger(f.l).
  714. WithError(err).
  715. WithField("udpAddr", addr).
  716. Error("Failed to write outgoing packet")
  717. }
  718. }
  719. func (f *Interface) tryQueuePacket(q int, pkt *overlay.Packet, addr netip.AddrPort) bool {
  720. if pkt == nil || !addr.IsValid() || !f.batches.Enabled() {
  721. return false
  722. }
  723. if f.batches.enqueueTx(q, pkt, addr) {
  724. f.observeUDPQueueLen(q)
  725. return true
  726. }
  727. return false
  728. }
  729. func (f *Interface) writeImmediatePacket(q int, pkt *overlay.Packet, addr netip.AddrPort, hostinfo *HostInfo) {
  730. if pkt == nil {
  731. return
  732. }
  733. writer := f.writerForIndex(q)
  734. if writer == nil {
  735. f.l.WithField("udpAddr", addr).
  736. WithField("writer", q).
  737. Error("Failed to write outgoing packet: no writer available")
  738. pkt.Release()
  739. return
  740. }
  741. if err := writer.WriteTo(pkt.Payload()[:pkt.Len], addr); err != nil {
  742. hostinfo.logger(f.l).
  743. WithError(err).
  744. WithField("udpAddr", addr).
  745. Error("Failed to write outgoing packet")
  746. }
  747. pkt.Release()
  748. }
  749. func (f *Interface) writePacketToTun(q int, pkt *overlay.Packet) {
  750. if pkt == nil {
  751. return
  752. }
  753. writer := f.readers[q]
  754. if writer == nil {
  755. pkt.Release()
  756. return
  757. }
  758. if bw, ok := writer.(interface {
  759. WriteBatch([]*overlay.Packet) (int, error)
  760. }); ok {
  761. if _, err := bw.WriteBatch([]*overlay.Packet{pkt}); err != nil {
  762. f.l.WithError(err).WithField("queue", q).Warn("Failed to write tun packet via batch writer")
  763. pkt.Release()
  764. }
  765. return
  766. }
  767. if _, err := writer.Write(pkt.Payload()[:pkt.Len]); err != nil {
  768. f.l.WithError(err).Error("Failed to write to tun")
  769. }
  770. pkt.Release()
  771. }
  772. func (f *Interface) clonePacketWithHeadroom(pkt *overlay.Packet, required int) *overlay.Packet {
  773. if pkt == nil {
  774. return nil
  775. }
  776. payload := pkt.Payload()[:pkt.Len]
  777. if len(payload) == 0 && required <= 0 {
  778. return pkt
  779. }
  780. pool := f.batches.Pool()
  781. if pool != nil {
  782. if clone := pool.Get(); clone != nil {
  783. if len(clone.Payload()) >= len(payload) {
  784. clone.Len = copy(clone.Payload(), payload)
  785. pkt.Release()
  786. return clone
  787. }
  788. clone.Release()
  789. }
  790. }
  791. if required < 0 {
  792. required = 0
  793. }
  794. buf := make([]byte, required+len(payload))
  795. n := copy(buf[required:], payload)
  796. pkt.Release()
  797. return &overlay.Packet{
  798. Buf: buf,
  799. Offset: required,
  800. Len: n,
  801. }
  802. }
  803. func (f *Interface) observeUDPQueueLen(i int) {
  804. if f.batchUDPQueueGauge == nil {
  805. return
  806. }
  807. f.batchUDPQueueGauge.Update(int64(f.batches.txQueueLen(i)))
  808. }
  809. func (f *Interface) observeTunQueueLen(i int) {
  810. if f.batchTunQueueGauge == nil {
  811. return
  812. }
  813. f.batchTunQueueGauge.Update(int64(f.batches.tunQueueLen(i)))
  814. }
  815. func (f *Interface) currentBatchFlushInterval() time.Duration {
  816. if v := f.batchFlushInterval.Load(); v > 0 {
  817. return time.Duration(v)
  818. }
  819. return 0
  820. }
  821. func (f *Interface) ensurePacketHeadroom(pkt **overlay.Packet, required int, queue int, reason string) bool {
  822. p := *pkt
  823. if p == nil {
  824. return false
  825. }
  826. if required <= 0 || p.Offset >= required {
  827. return true
  828. }
  829. clone := f.clonePacketWithHeadroom(p, required)
  830. if clone == nil {
  831. f.l.WithFields(logrus.Fields{
  832. "queue": queue,
  833. "reason": reason,
  834. }).Warn("dropping packet lacking tun headroom")
  835. return false
  836. }
  837. *pkt = clone
  838. return true
  839. }
  840. func isVirtioHeadroomError(err error) bool {
  841. if err == nil {
  842. return false
  843. }
  844. msg := err.Error()
  845. return strings.Contains(msg, "headroom") || strings.Contains(msg, "virtio")
  846. }
  847. func (f *Interface) effectiveGSOMaxSegments() int {
  848. max := f.gsoMaxSegments
  849. if max <= 0 {
  850. max = defaultGSOMaxSegments
  851. }
  852. if max > maxKernelGSOSegments {
  853. max = maxKernelGSOSegments
  854. }
  855. if !f.enableGSO {
  856. return 1
  857. }
  858. return max
  859. }
// udpOffloadConfigurator is the optional capability applyOffloadConfig probes
// for on each UDP writer; writers that implement it receive the GSO/GRO flags
// and the clamped segment limit.
type udpOffloadConfigurator interface {
	ConfigureOffload(enableGSO, enableGRO bool, maxSegments int)
}
  863. func (f *Interface) applyOffloadConfig(enableGSO, enableGRO bool, maxSegments int) {
  864. if maxSegments <= 0 {
  865. maxSegments = defaultGSOMaxSegments
  866. }
  867. if maxSegments > maxKernelGSOSegments {
  868. maxSegments = maxKernelGSOSegments
  869. }
  870. f.enableGSO = enableGSO
  871. f.enableGRO = enableGRO
  872. f.gsoMaxSegments = maxSegments
  873. for _, writer := range f.writers {
  874. if cfg, ok := writer.(udpOffloadConfigurator); ok {
  875. cfg.ConfigureOffload(enableGSO, enableGRO, maxSegments)
  876. }
  877. }
  878. }
  879. func (f *Interface) RegisterConfigChangeCallbacks(c *config.C) {
  880. c.RegisterReloadCallback(f.reloadFirewall)
  881. c.RegisterReloadCallback(f.reloadSendRecvError)
  882. c.RegisterReloadCallback(f.reloadDisconnectInvalid)
  883. c.RegisterReloadCallback(f.reloadMisc)
  884. for _, udpConn := range f.writers {
  885. c.RegisterReloadCallback(udpConn.ReloadConfig)
  886. }
  887. }
  888. func (f *Interface) reloadDisconnectInvalid(c *config.C) {
  889. initial := c.InitialLoad()
  890. if initial || c.HasChanged("pki.disconnect_invalid") {
  891. f.disconnectInvalid.Store(c.GetBool("pki.disconnect_invalid", true))
  892. if !initial {
  893. f.l.Infof("pki.disconnect_invalid changed to %v", f.disconnectInvalid.Load())
  894. }
  895. }
  896. }
// reloadFirewall rebuilds the firewall from c when the "firewall" section
// changed and installs it in place of the current one. If the new firewall
// cannot be built, the error is logged and the existing firewall stays active.
func (f *Interface) reloadFirewall(c *config.C) {
	//TODO: need to trigger/detect if the certificate changed too
	if c.HasChanged("firewall") == false {
		f.l.Debug("No firewall config change detected")
		return
	}
	fw, err := NewFirewallFromConfig(f.l, f.pki.getCertState(), c)
	if err != nil {
		f.l.WithError(err).Error("Error while creating firewall during reload")
		return
	}
	oldFw := f.firewall
	conntrack := oldFw.Conntrack
	// Hold the old conntrack lock for the rest of the function so the swap
	// below happens while it is locked.
	conntrack.Lock()
	defer conntrack.Unlock()
	fw.rulesVersion = oldFw.rulesVersion + 1
	// If rulesVersion is back to zero, we have wrapped all the way around. Be
	// safe and just reset conntrack in this case.
	if fw.rulesVersion == 0 {
		f.l.WithField("firewallHashes", fw.GetRuleHashes()).
			WithField("oldFirewallHashes", oldFw.GetRuleHashes()).
			WithField("rulesVersion", fw.rulesVersion).
			Warn("firewall rulesVersion has overflowed, resetting conntrack")
	} else {
		// Normal case: carry the existing conntrack over to the new firewall.
		fw.Conntrack = conntrack
	}
	f.firewall = fw
	oldFw.Destroy()
	f.l.WithField("firewallHashes", fw.GetRuleHashes()).
		WithField("oldFirewallHashes", oldFw.GetRuleHashes()).
		WithField("rulesVersion", fw.rulesVersion).
		Info("New firewall has been installed")
}
  930. func (f *Interface) reloadSendRecvError(c *config.C) {
  931. if c.InitialLoad() || c.HasChanged("listen.send_recv_error") {
  932. stringValue := c.GetString("listen.send_recv_error", "always")
  933. switch stringValue {
  934. case "always":
  935. f.sendRecvErrorConfig = sendRecvErrorAlways
  936. case "never":
  937. f.sendRecvErrorConfig = sendRecvErrorNever
  938. case "private":
  939. f.sendRecvErrorConfig = sendRecvErrorPrivate
  940. default:
  941. if c.GetBool("listen.send_recv_error", true) {
  942. f.sendRecvErrorConfig = sendRecvErrorAlways
  943. } else {
  944. f.sendRecvErrorConfig = sendRecvErrorNever
  945. }
  946. }
  947. f.l.WithField("sendRecvError", f.sendRecvErrorConfig.String()).
  948. Info("Loaded send_recv_error config")
  949. }
  950. }
  951. func (f *Interface) reloadMisc(c *config.C) {
  952. if c.HasChanged("counters.try_promote") {
  953. n := c.GetUint32("counters.try_promote", defaultPromoteEvery)
  954. f.tryPromoteEvery.Store(n)
  955. f.l.Info("counters.try_promote has changed")
  956. }
  957. if c.HasChanged("counters.requery_every_packets") {
  958. n := c.GetUint32("counters.requery_every_packets", defaultReQueryEvery)
  959. f.reQueryEvery.Store(n)
  960. f.l.Info("counters.requery_every_packets has changed")
  961. }
  962. if c.HasChanged("timers.requery_wait_duration") {
  963. n := c.GetDuration("timers.requery_wait_duration", defaultReQueryWait)
  964. f.reQueryWait.Store(int64(n))
  965. f.l.Info("timers.requery_wait_duration has changed")
  966. }
  967. if c.HasChanged("listen.gso_flush_timeout") {
  968. d := c.GetDuration("listen.gso_flush_timeout", defaultGSOFlushInterval)
  969. if d < 0 {
  970. d = 0
  971. }
  972. f.batchFlushInterval.Store(int64(d))
  973. f.l.WithField("duration", d).Info("listen.gso_flush_timeout has changed")
  974. } else if c.HasChanged("batch.flush_interval") {
  975. d := c.GetDuration("batch.flush_interval", defaultGSOFlushInterval)
  976. if d < 0 {
  977. d = 0
  978. }
  979. f.batchFlushInterval.Store(int64(d))
  980. f.l.WithField("duration", d).Warn("batch.flush_interval is deprecated; use listen.gso_flush_timeout")
  981. }
  982. if c.HasChanged("batch.queue_depth") {
  983. n := c.GetInt("batch.queue_depth", f.batchQueueDepth)
  984. if n != f.batchQueueDepth {
  985. f.batchQueueDepth = n
  986. f.l.Warn("batch.queue_depth changes require a restart to take effect")
  987. }
  988. }
  989. if c.HasChanged("listen.enable_gso") || c.HasChanged("listen.enable_gro") || c.HasChanged("listen.gso_max_segments") {
  990. enableGSO := c.GetBool("listen.enable_gso", f.enableGSO)
  991. enableGRO := c.GetBool("listen.enable_gro", f.enableGRO)
  992. maxSeg := c.GetInt("listen.gso_max_segments", f.gsoMaxSegments)
  993. f.applyOffloadConfig(enableGSO, enableGRO, maxSeg)
  994. f.l.WithFields(logrus.Fields{
  995. "enableGSO": enableGSO,
  996. "enableGRO": enableGRO,
  997. "gsoMaxSegments": maxSeg,
  998. }).Info("listen GSO/GRO configuration updated")
  999. }
  1000. }
  1001. func (f *Interface) emitStats(ctx context.Context, i time.Duration) {
  1002. ticker := time.NewTicker(i)
  1003. defer ticker.Stop()
  1004. udpStats := udp.NewUDPStatsEmitter(f.writers)
  1005. certExpirationGauge := metrics.GetOrRegisterGauge("certificate.ttl_seconds", nil)
  1006. certInitiatingVersion := metrics.GetOrRegisterGauge("certificate.initiating_version", nil)
  1007. certMaxVersion := metrics.GetOrRegisterGauge("certificate.max_version", nil)
  1008. for {
  1009. select {
  1010. case <-ctx.Done():
  1011. return
  1012. case <-ticker.C:
  1013. f.firewall.EmitStats()
  1014. f.handshakeManager.EmitStats()
  1015. udpStats()
  1016. certState := f.pki.getCertState()
  1017. defaultCrt := certState.GetDefaultCertificate()
  1018. certExpirationGauge.Update(int64(defaultCrt.NotAfter().Sub(time.Now()) / time.Second))
  1019. certInitiatingVersion.Update(int64(defaultCrt.Version()))
  1020. // Report the max certificate version we are capable of using
  1021. if certState.v2Cert != nil {
  1022. certMaxVersion.Update(int64(certState.v2Cert.Version()))
  1023. } else {
  1024. certMaxVersion.Update(int64(certState.v1Cert.Version()))
  1025. }
  1026. }
  1027. }
  1028. }
  1029. func (f *Interface) GetHostInfo(vpnIp netip.Addr) *HostInfo {
  1030. return f.hostMap.QueryVpnAddr(vpnIp)
  1031. }
  1032. func (f *Interface) GetCertState() *CertState {
  1033. return f.pki.getCertState()
  1034. }
  1035. func (f *Interface) Close() error {
  1036. f.closed.Store(true)
  1037. for _, u := range f.writers {
  1038. err := u.Close()
  1039. if err != nil {
  1040. f.l.WithError(err).Error("Error while closing udp socket")
  1041. }
  1042. }
  1043. // Release the tun device
  1044. return f.inside.Close()
  1045. }