interface.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806
  1. package nebula
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math/bits"
  8. "net/netip"
  9. "os"
  10. "runtime"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. "github.com/gaissmai/bart"
  15. "github.com/rcrowley/go-metrics"
  16. "github.com/sirupsen/logrus"
  17. "github.com/slackhq/nebula/config"
  18. "github.com/slackhq/nebula/firewall"
  19. "github.com/slackhq/nebula/header"
  20. "github.com/slackhq/nebula/overlay"
  21. "github.com/slackhq/nebula/udp"
  22. )
  23. const (
  24. mtu = 9001
  25. tunReadBufferSize = mtu * 8
  26. defaultDecryptWorkerFactor = 2
  27. defaultInboundQueueDepth = 1024
  28. )
  29. type InterfaceConfig struct {
  30. HostMap *HostMap
  31. Outside udp.Conn
  32. Inside overlay.Device
  33. pki *PKI
  34. Cipher string
  35. Firewall *Firewall
  36. ServeDns bool
  37. HandshakeManager *HandshakeManager
  38. lightHouse *LightHouse
  39. connectionManager *connectionManager
  40. DropLocalBroadcast bool
  41. DropMulticast bool
  42. routines int
  43. MessageMetrics *MessageMetrics
  44. version string
  45. relayManager *relayManager
  46. punchy *Punchy
  47. tryPromoteEvery uint32
  48. reQueryEvery uint32
  49. reQueryWait time.Duration
  50. ConntrackCacheTimeout time.Duration
  51. l *logrus.Logger
  52. DecryptWorkers int
  53. DecryptQueueDepth int
  54. }
  55. type Interface struct {
  56. hostMap *HostMap
  57. outside udp.Conn
  58. inside overlay.Device
  59. pki *PKI
  60. firewall *Firewall
  61. connectionManager *connectionManager
  62. handshakeManager *HandshakeManager
  63. serveDns bool
  64. createTime time.Time
  65. lightHouse *LightHouse
  66. myBroadcastAddrsTable *bart.Lite
  67. myVpnAddrs []netip.Addr // A list of addresses assigned to us via our certificate
  68. myVpnAddrsTable *bart.Lite
  69. myVpnNetworks []netip.Prefix // A list of networks assigned to us via our certificate
  70. myVpnNetworksTable *bart.Lite
  71. dropLocalBroadcast bool
  72. dropMulticast bool
  73. routines int
  74. disconnectInvalid atomic.Bool
  75. closed atomic.Bool
  76. relayManager *relayManager
  77. tryPromoteEvery atomic.Uint32
  78. reQueryEvery atomic.Uint32
  79. reQueryWait atomic.Int64
  80. sendRecvErrorConfig sendRecvErrorConfig
  81. // rebindCount is used to decide if an active tunnel should trigger a punch notification through a lighthouse
  82. rebindCount int8
  83. version string
  84. conntrackCacheTimeout time.Duration
  85. writers []udp.Conn
  86. readers []io.ReadWriteCloser
  87. metricHandshakes metrics.Histogram
  88. messageMetrics *MessageMetrics
  89. cachedPacketMetrics *cachedPacketMetrics
  90. l *logrus.Logger
  91. ctx context.Context
  92. udpListenWG sync.WaitGroup
  93. inboundPool sync.Pool
  94. decryptWG sync.WaitGroup
  95. decryptQueues []*inboundRing
  96. decryptWorkers int
  97. decryptStates []decryptWorkerState
  98. decryptCounter atomic.Uint32
  99. }
  100. type inboundPacket struct {
  101. addr netip.AddrPort
  102. payload []byte
  103. release func()
  104. queue int
  105. }
  106. type decryptWorkerState struct {
  107. queue *inboundRing
  108. notify chan struct{}
  109. }
  110. type decryptContext struct {
  111. ctTicker *firewall.ConntrackCacheTicker
  112. plain []byte
  113. head header.H
  114. fwPacket firewall.Packet
  115. light *LightHouseHandler
  116. nebula []byte
  117. }
  118. type inboundCell struct {
  119. seq atomic.Uint64
  120. pkt *inboundPacket
  121. }
  122. type inboundRing struct {
  123. mask uint64
  124. cells []inboundCell
  125. enqueuePos atomic.Uint64
  126. dequeuePos atomic.Uint64
  127. }
  128. func newInboundRing(capacity int) *inboundRing {
  129. if capacity < 2 {
  130. capacity = 2
  131. }
  132. size := nextPowerOfTwo(uint32(capacity))
  133. if size < 2 {
  134. size = 2
  135. }
  136. ring := &inboundRing{
  137. mask: uint64(size - 1),
  138. cells: make([]inboundCell, size),
  139. }
  140. for i := range ring.cells {
  141. ring.cells[i].seq.Store(uint64(i))
  142. }
  143. return ring
  144. }
  145. func nextPowerOfTwo(v uint32) uint32 {
  146. if v == 0 {
  147. return 1
  148. }
  149. return 1 << (32 - bits.LeadingZeros32(v-1))
  150. }
  151. func (r *inboundRing) Enqueue(pkt *inboundPacket) bool {
  152. var cell *inboundCell
  153. pos := r.enqueuePos.Load()
  154. for {
  155. cell = &r.cells[pos&r.mask]
  156. seq := cell.seq.Load()
  157. diff := int64(seq) - int64(pos)
  158. if diff == 0 {
  159. if r.enqueuePos.CompareAndSwap(pos, pos+1) {
  160. break
  161. }
  162. } else if diff < 0 {
  163. return false
  164. } else {
  165. pos = r.enqueuePos.Load()
  166. }
  167. }
  168. cell.pkt = pkt
  169. cell.seq.Store(pos + 1)
  170. return true
  171. }
  172. func (r *inboundRing) Dequeue() (*inboundPacket, bool) {
  173. var cell *inboundCell
  174. pos := r.dequeuePos.Load()
  175. for {
  176. cell = &r.cells[pos&r.mask]
  177. seq := cell.seq.Load()
  178. diff := int64(seq) - int64(pos+1)
  179. if diff == 0 {
  180. if r.dequeuePos.CompareAndSwap(pos, pos+1) {
  181. break
  182. }
  183. } else if diff < 0 {
  184. return nil, false
  185. } else {
  186. pos = r.dequeuePos.Load()
  187. }
  188. }
  189. pkt := cell.pkt
  190. cell.pkt = nil
  191. cell.seq.Store(pos + r.mask + 1)
  192. return pkt, true
  193. }
  194. func (f *Interface) getInboundPacket() *inboundPacket {
  195. if pkt, ok := f.inboundPool.Get().(*inboundPacket); ok && pkt != nil {
  196. return pkt
  197. }
  198. return &inboundPacket{}
  199. }
  200. func (f *Interface) putInboundPacket(pkt *inboundPacket) {
  201. if pkt == nil {
  202. return
  203. }
  204. pkt.addr = netip.AddrPort{}
  205. pkt.payload = nil
  206. pkt.release = nil
  207. pkt.queue = 0
  208. f.inboundPool.Put(pkt)
  209. }
  210. func newDecryptContext(f *Interface) *decryptContext {
  211. return &decryptContext{
  212. ctTicker: firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout),
  213. plain: make([]byte, udp.MTU),
  214. head: header.H{},
  215. fwPacket: firewall.Packet{},
  216. light: f.lightHouse.NewRequestHandler(),
  217. nebula: make([]byte, 12, 12),
  218. }
  219. }
  220. func (f *Interface) processInboundPacket(pkt *inboundPacket, ctx *decryptContext) {
  221. if pkt == nil {
  222. return
  223. }
  224. defer func() {
  225. if pkt.release != nil {
  226. pkt.release()
  227. }
  228. f.putInboundPacket(pkt)
  229. }()
  230. ctx.head = header.H{}
  231. ctx.fwPacket = firewall.Packet{}
  232. var cache firewall.ConntrackCache
  233. if ctx.ctTicker != nil {
  234. cache = ctx.ctTicker.Get(f.l)
  235. }
  236. f.readOutsidePackets(pkt.addr, nil, ctx.plain[:0], pkt.payload, &ctx.head, &ctx.fwPacket, ctx.light, ctx.nebula, pkt.queue, cache)
  237. }
  238. type EncWriter interface {
  239. SendVia(via *HostInfo,
  240. relay *Relay,
  241. ad,
  242. nb,
  243. out []byte,
  244. nocopy bool,
  245. )
  246. SendMessageToVpnAddr(t header.MessageType, st header.MessageSubType, vpnAddr netip.Addr, p, nb, out []byte)
  247. SendMessageToHostInfo(t header.MessageType, st header.MessageSubType, hostinfo *HostInfo, p, nb, out []byte)
  248. Handshake(vpnAddr netip.Addr)
  249. GetHostInfo(vpnAddr netip.Addr) *HostInfo
  250. GetCertState() *CertState
  251. }
  252. type sendRecvErrorConfig uint8
  253. const (
  254. sendRecvErrorAlways sendRecvErrorConfig = iota
  255. sendRecvErrorNever
  256. sendRecvErrorPrivate
  257. )
  258. func (s sendRecvErrorConfig) ShouldSendRecvError(endpoint netip.AddrPort) bool {
  259. switch s {
  260. case sendRecvErrorPrivate:
  261. return endpoint.Addr().IsPrivate()
  262. case sendRecvErrorAlways:
  263. return true
  264. case sendRecvErrorNever:
  265. return false
  266. default:
  267. panic(fmt.Errorf("invalid sendRecvErrorConfig value: %d", s))
  268. }
  269. }
  270. func (s sendRecvErrorConfig) String() string {
  271. switch s {
  272. case sendRecvErrorAlways:
  273. return "always"
  274. case sendRecvErrorNever:
  275. return "never"
  276. case sendRecvErrorPrivate:
  277. return "private"
  278. default:
  279. return fmt.Sprintf("invalid(%d)", s)
  280. }
  281. }
  282. func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
  283. if c.Outside == nil {
  284. return nil, errors.New("no outside connection")
  285. }
  286. if c.Inside == nil {
  287. return nil, errors.New("no inside interface (tun)")
  288. }
  289. if c.pki == nil {
  290. return nil, errors.New("no certificate state")
  291. }
  292. if c.Firewall == nil {
  293. return nil, errors.New("no firewall rules")
  294. }
  295. if c.connectionManager == nil {
  296. return nil, errors.New("no connection manager")
  297. }
  298. cs := c.pki.getCertState()
  299. decryptWorkers := c.DecryptWorkers
  300. if decryptWorkers < 0 {
  301. decryptWorkers = 0
  302. }
  303. if decryptWorkers == 0 {
  304. decryptWorkers = c.routines * defaultDecryptWorkerFactor
  305. if decryptWorkers < c.routines {
  306. decryptWorkers = c.routines
  307. }
  308. }
  309. if decryptWorkers < 0 {
  310. decryptWorkers = 0
  311. }
  312. if runtime.GOOS != "linux" {
  313. decryptWorkers = 0
  314. }
  315. queueDepth := c.DecryptQueueDepth
  316. if queueDepth <= 0 {
  317. queueDepth = defaultInboundQueueDepth
  318. }
  319. minDepth := c.routines * 64
  320. if minDepth <= 0 {
  321. minDepth = 64
  322. }
  323. if queueDepth < minDepth {
  324. queueDepth = minDepth
  325. }
  326. ifce := &Interface{
  327. pki: c.pki,
  328. hostMap: c.HostMap,
  329. outside: c.Outside,
  330. inside: c.Inside,
  331. firewall: c.Firewall,
  332. serveDns: c.ServeDns,
  333. handshakeManager: c.HandshakeManager,
  334. createTime: time.Now(),
  335. lightHouse: c.lightHouse,
  336. dropLocalBroadcast: c.DropLocalBroadcast,
  337. dropMulticast: c.DropMulticast,
  338. routines: c.routines,
  339. version: c.version,
  340. writers: make([]udp.Conn, c.routines),
  341. readers: make([]io.ReadWriteCloser, c.routines),
  342. myVpnNetworks: cs.myVpnNetworks,
  343. myVpnNetworksTable: cs.myVpnNetworksTable,
  344. myVpnAddrs: cs.myVpnAddrs,
  345. myVpnAddrsTable: cs.myVpnAddrsTable,
  346. myBroadcastAddrsTable: cs.myVpnBroadcastAddrsTable,
  347. relayManager: c.relayManager,
  348. connectionManager: c.connectionManager,
  349. conntrackCacheTimeout: c.ConntrackCacheTimeout,
  350. metricHandshakes: metrics.GetOrRegisterHistogram("handshakes", nil, metrics.NewExpDecaySample(1028, 0.015)),
  351. messageMetrics: c.MessageMetrics,
  352. cachedPacketMetrics: &cachedPacketMetrics{
  353. sent: metrics.GetOrRegisterCounter("hostinfo.cached_packets.sent", nil),
  354. dropped: metrics.GetOrRegisterCounter("hostinfo.cached_packets.dropped", nil),
  355. },
  356. l: c.l,
  357. ctx: ctx,
  358. inboundPool: sync.Pool{New: func() any { return &inboundPacket{} }},
  359. decryptWorkers: decryptWorkers,
  360. }
  361. ifce.tryPromoteEvery.Store(c.tryPromoteEvery)
  362. ifce.reQueryEvery.Store(c.reQueryEvery)
  363. ifce.reQueryWait.Store(int64(c.reQueryWait))
  364. ifce.connectionManager.intf = ifce
  365. if decryptWorkers > 0 {
  366. ifce.decryptQueues = make([]*inboundRing, decryptWorkers)
  367. ifce.decryptStates = make([]decryptWorkerState, decryptWorkers)
  368. for i := 0; i < decryptWorkers; i++ {
  369. queue := newInboundRing(queueDepth)
  370. ifce.decryptQueues[i] = queue
  371. ifce.decryptStates[i] = decryptWorkerState{
  372. queue: queue,
  373. notify: make(chan struct{}, 1),
  374. }
  375. }
  376. }
  377. return ifce, nil
  378. }
  379. // activate creates the interface on the host. After the interface is created, any
  380. // other services that want to bind listeners to its IP may do so successfully. However,
  381. // the interface isn't going to process anything until run() is called.
  382. func (f *Interface) activate() {
  383. // actually turn on tun dev
  384. addr, err := f.outside.LocalAddr()
  385. if err != nil {
  386. f.l.WithError(err).Error("Failed to get udp listen address")
  387. }
  388. f.l.WithField("interface", f.inside.Name()).WithField("networks", f.myVpnNetworks).
  389. WithField("build", f.version).WithField("udpAddr", addr).
  390. WithField("boringcrypto", boringEnabled()).
  391. Info("Nebula interface is active")
  392. metrics.GetOrRegisterGauge("routines", nil).Update(int64(f.routines))
  393. // Prepare n tun queues
  394. var reader io.ReadWriteCloser = f.inside
  395. for i := 0; i < f.routines; i++ {
  396. if i > 0 {
  397. reader, err = f.inside.NewMultiQueueReader()
  398. if err != nil {
  399. f.l.Fatal(err)
  400. }
  401. }
  402. f.readers[i] = reader
  403. }
  404. if err := f.inside.Activate(); err != nil {
  405. f.inside.Close()
  406. f.l.Fatal(err)
  407. }
  408. }
  409. func (f *Interface) startDecryptWorkers() {
  410. if f.decryptWorkers <= 0 || len(f.decryptQueues) == 0 {
  411. return
  412. }
  413. f.decryptWG.Add(f.decryptWorkers)
  414. for i := 0; i < f.decryptWorkers; i++ {
  415. go f.decryptWorker(i)
  416. }
  417. }
  418. func (f *Interface) decryptWorker(id int) {
  419. defer f.decryptWG.Done()
  420. if id < 0 || id >= len(f.decryptStates) {
  421. return
  422. }
  423. state := f.decryptStates[id]
  424. if state.queue == nil {
  425. return
  426. }
  427. ctx := newDecryptContext(f)
  428. for {
  429. for {
  430. pkt, ok := state.queue.Dequeue()
  431. if !ok {
  432. break
  433. }
  434. f.processInboundPacket(pkt, ctx)
  435. }
  436. if f.closed.Load() || f.ctx.Err() != nil {
  437. for {
  438. pkt, ok := state.queue.Dequeue()
  439. if !ok {
  440. return
  441. }
  442. f.processInboundPacket(pkt, ctx)
  443. }
  444. }
  445. select {
  446. case <-f.ctx.Done():
  447. case <-state.notify:
  448. }
  449. }
  450. }
  451. func (f *Interface) notifyDecryptWorker(idx int) {
  452. if idx < 0 || idx >= len(f.decryptStates) {
  453. return
  454. }
  455. state := f.decryptStates[idx]
  456. if state.notify == nil {
  457. return
  458. }
  459. select {
  460. case state.notify <- struct{}{}:
  461. default:
  462. }
  463. }
  464. func (f *Interface) run() {
  465. f.startDecryptWorkers()
  466. // Launch n queues to read packets from udp
  467. f.udpListenWG.Add(f.routines)
  468. for i := 0; i < f.routines; i++ {
  469. go f.listenOut(i)
  470. }
  471. // Launch n queues to read packets from tun dev
  472. for i := 0; i < f.routines; i++ {
  473. go f.listenIn(f.readers[i], i)
  474. }
  475. }
  476. func (f *Interface) listenOut(i int) {
  477. runtime.LockOSThread()
  478. defer f.udpListenWG.Done()
  479. var li udp.Conn
  480. if i > 0 {
  481. li = f.writers[i]
  482. } else {
  483. li = f.outside
  484. }
  485. useWorkers := f.decryptWorkers > 0 && len(f.decryptQueues) > 0
  486. var (
  487. inlineTicker *firewall.ConntrackCacheTicker
  488. inlineHandler *LightHouseHandler
  489. inlinePlain []byte
  490. inlineHeader header.H
  491. inlinePacket firewall.Packet
  492. inlineNB []byte
  493. inlineCtx *decryptContext
  494. )
  495. if useWorkers {
  496. inlineCtx = newDecryptContext(f)
  497. } else {
  498. inlineTicker = firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
  499. inlineHandler = f.lightHouse.NewRequestHandler()
  500. inlinePlain = make([]byte, udp.MTU)
  501. inlineNB = make([]byte, 12, 12)
  502. }
  503. li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte, release func()) {
  504. if !useWorkers {
  505. if release != nil {
  506. defer release()
  507. }
  508. select {
  509. case <-f.ctx.Done():
  510. return
  511. default:
  512. }
  513. inlineHeader = header.H{}
  514. inlinePacket = firewall.Packet{}
  515. var cache firewall.ConntrackCache
  516. if inlineTicker != nil {
  517. cache = inlineTicker.Get(f.l)
  518. }
  519. f.readOutsidePackets(fromUdpAddr, nil, inlinePlain[:0], payload, &inlineHeader, &inlinePacket, inlineHandler, inlineNB, i, cache)
  520. return
  521. }
  522. if f.ctx.Err() != nil {
  523. if release != nil {
  524. release()
  525. }
  526. return
  527. }
  528. pkt := f.getInboundPacket()
  529. pkt.addr = fromUdpAddr
  530. pkt.payload = payload
  531. pkt.release = release
  532. pkt.queue = i
  533. queueCount := len(f.decryptQueues)
  534. if queueCount == 0 {
  535. f.processInboundPacket(pkt, inlineCtx)
  536. return
  537. }
  538. w := int(f.decryptCounter.Add(1)-1) % queueCount
  539. if w < 0 || w >= queueCount || !f.decryptQueues[w].Enqueue(pkt) {
  540. f.processInboundPacket(pkt, inlineCtx)
  541. return
  542. }
  543. f.notifyDecryptWorker(w)
  544. })
  545. }
  546. func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
  547. runtime.LockOSThread()
  548. packet := make([]byte, tunReadBufferSize)
  549. out := make([]byte, tunReadBufferSize)
  550. fwPacket := &firewall.Packet{}
  551. nb := make([]byte, 12, 12)
  552. conntrackCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
  553. for {
  554. n, err := reader.Read(packet)
  555. if err != nil {
  556. if errors.Is(err, os.ErrClosed) && f.closed.Load() {
  557. return
  558. }
  559. f.l.WithError(err).Error("Error while reading outbound packet")
  560. // This only seems to happen when something fatal happens to the fd, so exit.
  561. os.Exit(2)
  562. }
  563. f.consumeInsidePacket(packet[:n], fwPacket, nb, out, i, conntrackCache.Get(f.l))
  564. }
  565. }
  566. func (f *Interface) RegisterConfigChangeCallbacks(c *config.C) {
  567. c.RegisterReloadCallback(f.reloadFirewall)
  568. c.RegisterReloadCallback(f.reloadSendRecvError)
  569. c.RegisterReloadCallback(f.reloadDisconnectInvalid)
  570. c.RegisterReloadCallback(f.reloadMisc)
  571. for _, udpConn := range f.writers {
  572. c.RegisterReloadCallback(udpConn.ReloadConfig)
  573. }
  574. }
  575. func (f *Interface) reloadDisconnectInvalid(c *config.C) {
  576. initial := c.InitialLoad()
  577. if initial || c.HasChanged("pki.disconnect_invalid") {
  578. f.disconnectInvalid.Store(c.GetBool("pki.disconnect_invalid", true))
  579. if !initial {
  580. f.l.Infof("pki.disconnect_invalid changed to %v", f.disconnectInvalid.Load())
  581. }
  582. }
  583. }
  584. func (f *Interface) reloadFirewall(c *config.C) {
  585. //TODO: need to trigger/detect if the certificate changed too
  586. if c.HasChanged("firewall") == false {
  587. f.l.Debug("No firewall config change detected")
  588. return
  589. }
  590. fw, err := NewFirewallFromConfig(f.l, f.pki.getCertState(), c)
  591. if err != nil {
  592. f.l.WithError(err).Error("Error while creating firewall during reload")
  593. return
  594. }
  595. oldFw := f.firewall
  596. conntrack := oldFw.Conntrack
  597. conntrack.Lock()
  598. defer conntrack.Unlock()
  599. fw.rulesVersion = oldFw.rulesVersion + 1
  600. // If rulesVersion is back to zero, we have wrapped all the way around. Be
  601. // safe and just reset conntrack in this case.
  602. if fw.rulesVersion == 0 {
  603. f.l.WithField("firewallHashes", fw.GetRuleHashes()).
  604. WithField("oldFirewallHashes", oldFw.GetRuleHashes()).
  605. WithField("rulesVersion", fw.rulesVersion).
  606. Warn("firewall rulesVersion has overflowed, resetting conntrack")
  607. } else {
  608. fw.Conntrack = conntrack
  609. }
  610. f.firewall = fw
  611. oldFw.Destroy()
  612. f.l.WithField("firewallHashes", fw.GetRuleHashes()).
  613. WithField("oldFirewallHashes", oldFw.GetRuleHashes()).
  614. WithField("rulesVersion", fw.rulesVersion).
  615. Info("New firewall has been installed")
  616. }
  617. func (f *Interface) reloadSendRecvError(c *config.C) {
  618. if c.InitialLoad() || c.HasChanged("listen.send_recv_error") {
  619. stringValue := c.GetString("listen.send_recv_error", "always")
  620. switch stringValue {
  621. case "always":
  622. f.sendRecvErrorConfig = sendRecvErrorAlways
  623. case "never":
  624. f.sendRecvErrorConfig = sendRecvErrorNever
  625. case "private":
  626. f.sendRecvErrorConfig = sendRecvErrorPrivate
  627. default:
  628. if c.GetBool("listen.send_recv_error", true) {
  629. f.sendRecvErrorConfig = sendRecvErrorAlways
  630. } else {
  631. f.sendRecvErrorConfig = sendRecvErrorNever
  632. }
  633. }
  634. f.l.WithField("sendRecvError", f.sendRecvErrorConfig.String()).
  635. Info("Loaded send_recv_error config")
  636. }
  637. }
  638. func (f *Interface) reloadMisc(c *config.C) {
  639. if c.HasChanged("counters.try_promote") {
  640. n := c.GetUint32("counters.try_promote", defaultPromoteEvery)
  641. f.tryPromoteEvery.Store(n)
  642. f.l.Info("counters.try_promote has changed")
  643. }
  644. if c.HasChanged("counters.requery_every_packets") {
  645. n := c.GetUint32("counters.requery_every_packets", defaultReQueryEvery)
  646. f.reQueryEvery.Store(n)
  647. f.l.Info("counters.requery_every_packets has changed")
  648. }
  649. if c.HasChanged("timers.requery_wait_duration") {
  650. n := c.GetDuration("timers.requery_wait_duration", defaultReQueryWait)
  651. f.reQueryWait.Store(int64(n))
  652. f.l.Info("timers.requery_wait_duration has changed")
  653. }
  654. }
  655. func (f *Interface) emitStats(ctx context.Context, i time.Duration) {
  656. ticker := time.NewTicker(i)
  657. defer ticker.Stop()
  658. udpStats := udp.NewUDPStatsEmitter(f.writers)
  659. certExpirationGauge := metrics.GetOrRegisterGauge("certificate.ttl_seconds", nil)
  660. certInitiatingVersion := metrics.GetOrRegisterGauge("certificate.initiating_version", nil)
  661. certMaxVersion := metrics.GetOrRegisterGauge("certificate.max_version", nil)
  662. for {
  663. select {
  664. case <-ctx.Done():
  665. return
  666. case <-ticker.C:
  667. f.firewall.EmitStats()
  668. f.handshakeManager.EmitStats()
  669. udpStats()
  670. certState := f.pki.getCertState()
  671. defaultCrt := certState.GetDefaultCertificate()
  672. certExpirationGauge.Update(int64(defaultCrt.NotAfter().Sub(time.Now()) / time.Second))
  673. certInitiatingVersion.Update(int64(defaultCrt.Version()))
  674. // Report the max certificate version we are capable of using
  675. if certState.v2Cert != nil {
  676. certMaxVersion.Update(int64(certState.v2Cert.Version()))
  677. } else {
  678. certMaxVersion.Update(int64(certState.v1Cert.Version()))
  679. }
  680. }
  681. }
  682. }
  683. func (f *Interface) GetHostInfo(vpnIp netip.Addr) *HostInfo {
  684. return f.hostMap.QueryVpnAddr(vpnIp)
  685. }
  686. func (f *Interface) GetCertState() *CertState {
  687. return f.pki.getCertState()
  688. }
  689. func (f *Interface) Close() error {
  690. f.closed.Store(true)
  691. for _, u := range f.writers {
  692. err := u.Close()
  693. if err != nil {
  694. f.l.WithError(err).Error("Error while closing udp socket")
  695. }
  696. }
  697. f.udpListenWG.Wait()
  698. if f.decryptWorkers > 0 {
  699. for _, state := range f.decryptStates {
  700. if state.notify != nil {
  701. select {
  702. case state.notify <- struct{}{}:
  703. default:
  704. }
  705. }
  706. }
  707. f.decryptWG.Wait()
  708. }
  709. // Release the tun device
  710. return f.inside.Close()
  711. }