interface.go 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856
  1. package nebula
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net/netip"
  8. "runtime"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "github.com/gaissmai/bart"
  13. "github.com/rcrowley/go-metrics"
  14. "github.com/sirupsen/logrus"
  15. "github.com/slackhq/nebula/config"
  16. "github.com/slackhq/nebula/firewall"
  17. "github.com/slackhq/nebula/header"
  18. "github.com/slackhq/nebula/overlay"
  19. "github.com/slackhq/nebula/packet"
  20. "github.com/slackhq/nebula/udp"
  21. )
// Buffer sizing and batching defaults. NewInterface substitutes the *Default
// values below whenever the matching BatchConfig field is zero or negative.
const (
	// mtu is the length, in bytes, of the packet buffers allocated in this file.
	mtu = 9001
	// inboundBatchSizeDefault is the fallback for BatchConfig.InboundBatchSize.
	inboundBatchSizeDefault = 128
	// outboundBatchSizeDefault is the fallback for BatchConfig.OutboundBatchSize.
	outboundBatchSizeDefault = 64
	// batchFlushIntervalDefault is the fallback for BatchConfig.FlushInterval.
	batchFlushIntervalDefault = 12 * time.Microsecond
	// maxOutstandingBatchesDefault is the fallback for BatchConfig.MaxOutstandingPerChan.
	maxOutstandingBatchesDefault = 8
	// NOTE(review): sendBatchSizeDefault is not referenced in the visible code;
	// NewInterface derives sendBatchSize from OutboundBatchSize instead — confirm intent.
	sendBatchSizeDefault = 64
	// maxPendingPacketsDefault is the fallback for BatchConfig.MaxPendingPackets.
	maxPendingPacketsDefault = 32
	// maxPendingBytesDefault is the fallback for BatchConfig.MaxPendingBytes.
	maxPendingBytesDefault = 64 * 1024
	// maxSendBufPerRoutineDefault is the fallback for BatchConfig.MaxSendBuffersPerChan.
	maxSendBufPerRoutineDefault = 16
)
// InterfaceConfig bundles the dependencies and settings that NewInterface
// copies into a new Interface.
type InterfaceConfig struct {
	HostMap *HostMap
	// Outside is the underlay UDP connection. Required: NewInterface fails when nil.
	Outside udp.Conn
	// Inside is the overlay (tun) device. Required: NewInterface fails when nil.
	Inside overlay.Device
	// pki supplies the certificate state. Required: NewInterface fails when nil.
	pki    *PKI
	Cipher string
	// Firewall is required: NewInterface fails when nil.
	Firewall         *Firewall
	ServeDns         bool
	HandshakeManager *HandshakeManager
	lightHouse       *LightHouse
	// connectionManager is required: NewInterface fails when nil, and on success
	// sets its intf field to the new Interface.
	connectionManager  *connectionManager
	DropLocalBroadcast bool
	DropMulticast      bool
	// routines sizes the per-routine slices (writers, readers, channels, send
	// buffer caches) and is the number of goroutine sets started by run.
	routines       int
	MessageMetrics *MessageMetrics
	version        string
	relayManager   *relayManager
	punchy         *Punchy

	tryPromoteEvery uint32
	reQueryEvery    uint32
	reQueryWait     time.Duration

	ConntrackCacheTimeout time.Duration
	// BatchConfig tunes packet batching; non-positive fields fall back to defaults.
	BatchConfig BatchConfig
	l           *logrus.Logger
}
// BatchConfig tunes how packets are grouped between the reader and worker
// goroutines. NewInterface replaces any field that is <= 0 with its default.
type BatchConfig struct {
	// InboundBatchSize is the capacity of an inbound batch and the packet count
	// at which listenOut hands the batch to its worker.
	InboundBatchSize int
	// OutboundBatchSize is the capacity of an outbound batch and the payload
	// count at which listenIn hands the batch to its worker. NewInterface also
	// uses it as the send batch size.
	OutboundBatchSize int
	// FlushInterval is the age, measured from the previous flush, after which
	// the next arriving packet triggers a flush.
	FlushInterval time.Duration
	// MaxOutstandingPerChan is the buffer size of each inbound/outbound channel.
	MaxOutstandingPerChan int
	// MaxPendingPackets caps queued sends in workerOut before a flush is forced.
	MaxPendingPackets int
	// MaxPendingBytes caps queued send bytes in workerOut before a flush is forced.
	MaxPendingBytes int
	// MaxSendBuffersPerChan caps the per-routine cache of reusable send buffers.
	MaxSendBuffersPerChan int
}
// Interface ties the overlay (tun) device to the underlay UDP connection and
// holds the state shared by the reader and worker goroutines started by run.
type Interface struct {
	hostMap               *HostMap
	outside               udp.Conn
	inside                overlay.Device
	pki                   *PKI
	firewall              *Firewall
	connectionManager     *connectionManager
	handshakeManager      *HandshakeManager
	serveDns              bool
	createTime            time.Time
	lightHouse            *LightHouse
	myBroadcastAddrsTable *bart.Lite
	myVpnAddrs            []netip.Addr // A list of addresses assigned to us via our certificate
	myVpnAddrsTable       *bart.Lite
	myVpnNetworks         []netip.Prefix // A list of networks assigned to us via our certificate
	myVpnNetworksTable    *bart.Lite
	dropLocalBroadcast    bool
	dropMulticast         bool
	routines              int
	// disconnectInvalid mirrors the pki.disconnect_invalid config value.
	disconnectInvalid atomic.Bool
	// closed is set by Close; the readers check it to skip error logs on shutdown.
	closed       atomic.Bool
	relayManager *relayManager

	tryPromoteEvery atomic.Uint32
	reQueryEvery    atomic.Uint32
	// reQueryWait stores a time.Duration as its int64 value.
	reQueryWait atomic.Int64

	sendRecvErrorConfig sendRecvErrorConfig

	// rebindCount is used to decide if an active tunnel should trigger a punch notification through a lighthouse
	rebindCount int8
	version     string

	conntrackCacheTimeout time.Duration

	// writers holds one UDP connection per routine; flushSendQueue writes through it.
	writers []udp.Conn
	// readers holds one tun reader per routine; populated by activate.
	readers []io.ReadWriteCloser
	// wg tracks the goroutines started by run.
	wg sync.WaitGroup

	metricHandshakes    metrics.Histogram
	messageMetrics      *MessageMetrics
	cachedPacketMetrics *cachedPacketMetrics

	l *logrus.Logger

	// inPool recycles *packet.Packet values used for packets read from UDP.
	inPool sync.Pool
	// inbound carries batches from listenOut(i) to workerIn(i).
	inbound []chan *packetBatch

	// outPool recycles mtu-sized buffers used for packets read from the tun device.
	outPool sync.Pool
	// outbound carries batches from listenIn(i) to workerOut(i).
	outbound []chan *outboundBatch

	packetBatchPool   sync.Pool
	outboundBatchPool sync.Pool

	// sendPool is the shared overflow pool for send buffers; sendBufCache is the
	// per-routine cache consulted first, indexed by routine number.
	sendPool     sync.Pool
	sendBufCache [][]*[]byte

	sendBatchSize int

	inboundBatchSize      int
	outboundBatchSize     int
	batchFlushInterval    time.Duration
	maxOutstandingPerChan int
	maxPendingPackets     int
	maxPendingBytes       int
	maxSendBufPerRoutine  int
}
// outboundSend is one queued UDP write awaiting a batched flush.
type outboundSend struct {
	// buf is the send buffer; it is handed back via releaseSendBuffer after the flush.
	buf *[]byte
	// length is how many bytes of buf make up the payload.
	length int
	// addr is the destination of the payload.
	addr netip.AddrPort
}
  126. type packetBatch struct {
  127. packets []*packet.Packet
  128. }
  129. func newPacketBatch(capacity int) *packetBatch {
  130. return &packetBatch{
  131. packets: make([]*packet.Packet, 0, capacity),
  132. }
  133. }
  134. func (b *packetBatch) add(p *packet.Packet) {
  135. b.packets = append(b.packets, p)
  136. }
  137. func (b *packetBatch) reset() {
  138. for i := range b.packets {
  139. b.packets[i] = nil
  140. }
  141. b.packets = b.packets[:0]
  142. }
  143. func (f *Interface) getPacketBatch() *packetBatch {
  144. if v := f.packetBatchPool.Get(); v != nil {
  145. b := v.(*packetBatch)
  146. b.reset()
  147. return b
  148. }
  149. return newPacketBatch(f.inboundBatchSize)
  150. }
  151. func (f *Interface) releasePacketBatch(b *packetBatch) {
  152. b.reset()
  153. f.packetBatchPool.Put(b)
  154. }
  155. type outboundBatch struct {
  156. payloads []*[]byte
  157. }
  158. func newOutboundBatch(capacity int) *outboundBatch {
  159. return &outboundBatch{payloads: make([]*[]byte, 0, capacity)}
  160. }
  161. func (b *outboundBatch) add(buf *[]byte) {
  162. b.payloads = append(b.payloads, buf)
  163. }
  164. func (b *outboundBatch) reset() {
  165. for i := range b.payloads {
  166. b.payloads[i] = nil
  167. }
  168. b.payloads = b.payloads[:0]
  169. }
  170. func (f *Interface) getOutboundBatch() *outboundBatch {
  171. if v := f.outboundBatchPool.Get(); v != nil {
  172. b := v.(*outboundBatch)
  173. b.reset()
  174. return b
  175. }
  176. return newOutboundBatch(f.outboundBatchSize)
  177. }
  178. func (f *Interface) releaseOutboundBatch(b *outboundBatch) {
  179. b.reset()
  180. f.outboundBatchPool.Put(b)
  181. }
  182. func (f *Interface) getSendBuffer(q int) *[]byte {
  183. cache := f.sendBufCache[q]
  184. if n := len(cache); n > 0 {
  185. buf := cache[n-1]
  186. f.sendBufCache[q] = cache[:n-1]
  187. *buf = (*buf)[:0]
  188. return buf
  189. }
  190. if v := f.sendPool.Get(); v != nil {
  191. buf := v.(*[]byte)
  192. *buf = (*buf)[:0]
  193. return buf
  194. }
  195. b := make([]byte, mtu)
  196. return &b
  197. }
  198. func (f *Interface) releaseSendBuffer(q int, buf *[]byte) {
  199. if buf == nil {
  200. return
  201. }
  202. *buf = (*buf)[:0]
  203. cache := f.sendBufCache[q]
  204. if len(cache) < f.maxSendBufPerRoutine {
  205. f.sendBufCache[q] = append(cache, buf)
  206. return
  207. }
  208. f.sendPool.Put(buf)
  209. }
  210. func (f *Interface) flushSendQueue(q int, pending *[]outboundSend, pendingBytes *int) {
  211. if len(*pending) == 0 {
  212. return
  213. }
  214. batch := make([]udp.BatchPacket, len(*pending))
  215. for i, entry := range *pending {
  216. batch[i] = udp.BatchPacket{
  217. Payload: (*entry.buf)[:entry.length],
  218. Addr: entry.addr,
  219. }
  220. }
  221. sent, err := f.writers[q].WriteBatch(batch)
  222. if err != nil {
  223. f.l.WithError(err).WithField("sent", sent).Error("Failed to batch send packets")
  224. }
  225. for _, entry := range *pending {
  226. f.releaseSendBuffer(q, entry.buf)
  227. }
  228. *pending = (*pending)[:0]
  229. if pendingBytes != nil {
  230. *pendingBytes = 0
  231. }
  232. }
// EncWriter is the set of send and lookup operations exposed to code that needs
// to emit messages to peers. In this file *Interface provides GetHostInfo and
// GetCertState; the remaining methods are implemented outside the visible code.
type EncWriter interface {
	// NOTE(review): parameter semantics for SendVia are not visible here;
	// presumably ad/nb/out are payload, nonce scratch and output scratch — confirm.
	SendVia(via *HostInfo,
		relay *Relay,
		ad,
		nb,
		out []byte,
		nocopy bool,
	)
	SendMessageToVpnAddr(t header.MessageType, st header.MessageSubType, vpnAddr netip.Addr, p, nb, out []byte)
	SendMessageToHostInfo(t header.MessageType, st header.MessageSubType, hostinfo *HostInfo, p, nb, out []byte)
	Handshake(vpnAddr netip.Addr)
	// GetHostInfo looks up the host info for vpnAddr.
	GetHostInfo(vpnAddr netip.Addr) *HostInfo
	// GetCertState returns the current certificate state.
	GetCertState() *CertState
}
  247. type sendRecvErrorConfig uint8
  248. const (
  249. sendRecvErrorAlways sendRecvErrorConfig = iota
  250. sendRecvErrorNever
  251. sendRecvErrorPrivate
  252. )
  253. func (s sendRecvErrorConfig) ShouldSendRecvError(endpoint netip.AddrPort) bool {
  254. switch s {
  255. case sendRecvErrorPrivate:
  256. return endpoint.Addr().IsPrivate()
  257. case sendRecvErrorAlways:
  258. return true
  259. case sendRecvErrorNever:
  260. return false
  261. default:
  262. panic(fmt.Errorf("invalid sendRecvErrorConfig value: %d", s))
  263. }
  264. }
  265. func (s sendRecvErrorConfig) String() string {
  266. switch s {
  267. case sendRecvErrorAlways:
  268. return "always"
  269. case sendRecvErrorNever:
  270. return "never"
  271. case sendRecvErrorPrivate:
  272. return "private"
  273. default:
  274. return fmt.Sprintf("invalid(%d)", s)
  275. }
  276. }
  277. func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
  278. if c.Outside == nil {
  279. return nil, errors.New("no outside connection")
  280. }
  281. if c.Inside == nil {
  282. return nil, errors.New("no inside interface (tun)")
  283. }
  284. if c.pki == nil {
  285. return nil, errors.New("no certificate state")
  286. }
  287. if c.Firewall == nil {
  288. return nil, errors.New("no firewall rules")
  289. }
  290. if c.connectionManager == nil {
  291. return nil, errors.New("no connection manager")
  292. }
  293. cs := c.pki.getCertState()
  294. bc := c.BatchConfig
  295. if bc.InboundBatchSize <= 0 {
  296. bc.InboundBatchSize = inboundBatchSizeDefault
  297. }
  298. if bc.OutboundBatchSize <= 0 {
  299. bc.OutboundBatchSize = outboundBatchSizeDefault
  300. }
  301. if bc.FlushInterval <= 0 {
  302. bc.FlushInterval = batchFlushIntervalDefault
  303. }
  304. if bc.MaxOutstandingPerChan <= 0 {
  305. bc.MaxOutstandingPerChan = maxOutstandingBatchesDefault
  306. }
  307. if bc.MaxPendingPackets <= 0 {
  308. bc.MaxPendingPackets = maxPendingPacketsDefault
  309. }
  310. if bc.MaxPendingBytes <= 0 {
  311. bc.MaxPendingBytes = maxPendingBytesDefault
  312. }
  313. if bc.MaxSendBuffersPerChan <= 0 {
  314. bc.MaxSendBuffersPerChan = maxSendBufPerRoutineDefault
  315. }
  316. ifce := &Interface{
  317. pki: c.pki,
  318. hostMap: c.HostMap,
  319. outside: c.Outside,
  320. inside: c.Inside,
  321. firewall: c.Firewall,
  322. serveDns: c.ServeDns,
  323. handshakeManager: c.HandshakeManager,
  324. createTime: time.Now(),
  325. lightHouse: c.lightHouse,
  326. dropLocalBroadcast: c.DropLocalBroadcast,
  327. dropMulticast: c.DropMulticast,
  328. routines: c.routines,
  329. version: c.version,
  330. writers: make([]udp.Conn, c.routines),
  331. readers: make([]io.ReadWriteCloser, c.routines),
  332. myVpnNetworks: cs.myVpnNetworks,
  333. myVpnNetworksTable: cs.myVpnNetworksTable,
  334. myVpnAddrs: cs.myVpnAddrs,
  335. myVpnAddrsTable: cs.myVpnAddrsTable,
  336. myBroadcastAddrsTable: cs.myVpnBroadcastAddrsTable,
  337. relayManager: c.relayManager,
  338. connectionManager: c.connectionManager,
  339. conntrackCacheTimeout: c.ConntrackCacheTimeout,
  340. metricHandshakes: metrics.GetOrRegisterHistogram("handshakes", nil, metrics.NewExpDecaySample(1028, 0.015)),
  341. messageMetrics: c.MessageMetrics,
  342. cachedPacketMetrics: &cachedPacketMetrics{
  343. sent: metrics.GetOrRegisterCounter("hostinfo.cached_packets.sent", nil),
  344. dropped: metrics.GetOrRegisterCounter("hostinfo.cached_packets.dropped", nil),
  345. },
  346. inbound: make([]chan *packetBatch, c.routines),
  347. outbound: make([]chan *outboundBatch, c.routines),
  348. l: c.l,
  349. inboundBatchSize: bc.InboundBatchSize,
  350. outboundBatchSize: bc.OutboundBatchSize,
  351. batchFlushInterval: bc.FlushInterval,
  352. maxOutstandingPerChan: bc.MaxOutstandingPerChan,
  353. maxPendingPackets: bc.MaxPendingPackets,
  354. maxPendingBytes: bc.MaxPendingBytes,
  355. maxSendBufPerRoutine: bc.MaxSendBuffersPerChan,
  356. sendBatchSize: bc.OutboundBatchSize,
  357. }
  358. for i := 0; i < c.routines; i++ {
  359. ifce.inbound[i] = make(chan *packetBatch, ifce.maxOutstandingPerChan)
  360. ifce.outbound[i] = make(chan *outboundBatch, ifce.maxOutstandingPerChan)
  361. }
  362. ifce.inPool = sync.Pool{New: func() any {
  363. return packet.New()
  364. }}
  365. ifce.outPool = sync.Pool{New: func() any {
  366. t := make([]byte, mtu)
  367. return &t
  368. }}
  369. ifce.packetBatchPool = sync.Pool{New: func() any {
  370. return newPacketBatch(ifce.inboundBatchSize)
  371. }}
  372. ifce.outboundBatchPool = sync.Pool{New: func() any {
  373. return newOutboundBatch(ifce.outboundBatchSize)
  374. }}
  375. ifce.sendPool = sync.Pool{New: func() any {
  376. buf := make([]byte, mtu)
  377. return &buf
  378. }}
  379. ifce.sendBufCache = make([][]*[]byte, c.routines)
  380. ifce.tryPromoteEvery.Store(c.tryPromoteEvery)
  381. ifce.reQueryEvery.Store(c.reQueryEvery)
  382. ifce.reQueryWait.Store(int64(c.reQueryWait))
  383. ifce.connectionManager.intf = ifce
  384. return ifce, nil
  385. }
  386. // activate creates the interface on the host. After the interface is created, any
  387. // other services that want to bind listeners to its IP may do so successfully. However,
  388. // the interface isn't going to process anything until run() is called.
  389. func (f *Interface) activate() error {
  390. // actually turn on tun dev
  391. addr, err := f.outside.LocalAddr()
  392. if err != nil {
  393. f.l.WithError(err).Error("Failed to get udp listen address")
  394. }
  395. f.l.WithField("interface", f.inside.Name()).WithField("networks", f.myVpnNetworks).
  396. WithField("build", f.version).WithField("udpAddr", addr).
  397. WithField("boringcrypto", boringEnabled()).
  398. Info("Nebula interface is active")
  399. metrics.GetOrRegisterGauge("routines", nil).Update(int64(f.routines))
  400. // Prepare n tun queues
  401. var reader io.ReadWriteCloser = f.inside
  402. for i := 0; i < f.routines; i++ {
  403. if i > 0 {
  404. reader, err = f.inside.NewMultiQueueReader()
  405. if err != nil {
  406. return err
  407. }
  408. }
  409. f.readers[i] = reader
  410. }
  411. if err = f.inside.Activate(); err != nil {
  412. f.inside.Close()
  413. return err
  414. }
  415. return nil
  416. }
  417. func (f *Interface) run(c context.Context) (func(), error) {
  418. for i := 0; i < f.routines; i++ {
  419. // Launch n queues to read packets from udp
  420. f.wg.Add(1)
  421. go f.listenOut(i)
  422. // Launch n queues to read packets from tun dev
  423. f.wg.Add(1)
  424. go f.listenIn(f.readers[i], i)
  425. // Launch n queues to read packets from tun dev
  426. f.wg.Add(1)
  427. go f.workerIn(i, c)
  428. // Launch n queues to read packets from tun dev
  429. f.wg.Add(1)
  430. go f.workerOut(i, c)
  431. }
  432. return f.wg.Wait, nil
  433. }
// listenOut reads packets from the underlay UDP connection for routine i,
// copies each one into a pooled packet, and forwards them in batches on
// f.inbound[i]. It returns when the connection's ListenOut call returns, and
// marks itself done on f.wg.
func (f *Interface) listenOut(i int) {
	// Pin this goroutine to its OS thread for the lifetime of the reader.
	runtime.LockOSThread()

	// Routine 0 reads from the primary connection, the others from their own writer.
	var li udp.Conn
	if i > 0 {
		li = f.writers[i]
	} else {
		li = f.outside
	}

	batch := f.getPacketBatch()
	lastFlush := time.Now()

	// flush hands a non-empty batch to the worker and starts a new one. The send
	// blocks while f.inbound[i] is full.
	// NOTE(review): force is never passed as true within this function, so the
	// release branch below is currently unreachable — confirm it is still wanted.
	flush := func(force bool) {
		if len(batch.packets) == 0 {
			if force {
				f.releasePacketBatch(batch)
			}
			return
		}
		f.inbound[i] <- batch
		batch = f.getPacketBatch()
		lastFlush = time.Now()
	}

	err := li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) {
		// The payload is copied so the pooled packet owns its bytes.
		// NOTE(review): assumes the pooled packet's Payload has capacity >= mtu
		// and that len(payload) <= that capacity — confirm against packet.New.
		p := f.inPool.Get().(*packet.Packet)
		p.Payload = p.Payload[:mtu]
		copy(p.Payload, payload)
		p.Payload = p.Payload[:len(payload)]
		p.Addr = fromUdpAddr
		batch.add(p)

		// NOTE(review): the flush condition is only evaluated when a packet
		// arrives. A packet added less than batchFlushInterval after the previous
		// flush stays in the batch until the next packet shows up — confirm this
		// latency is acceptable when traffic goes idle.
		if len(batch.packets) >= f.inboundBatchSize || time.Since(lastFlush) >= f.batchFlushInterval {
			flush(false)
		}
	})

	// Hand over (or recycle) whatever is left once the listener has stopped.
	// NOTE(review): this send blocks if the channel is full and workerIn has
	// already exited — verify shutdown ordering.
	if len(batch.packets) > 0 {
		f.inbound[i] <- batch
	} else {
		f.releasePacketBatch(batch)
	}

	// Errors caused by our own Close are expected and not logged.
	if err != nil && !f.closed.Load() {
		f.l.WithError(err).Error("Error while reading packet inbound packet, closing")
		//TODO: Trigger Control to close
	}

	f.l.Debugf("underlay reader %v is done", i)
	f.wg.Done()
}
// listenIn reads packets from the overlay reader for routine i into pooled
// buffers and forwards them in batches on f.outbound[i]. It returns after the
// first read error and marks itself done on f.wg.
func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
	// Pin this goroutine to its OS thread for the lifetime of the reader.
	runtime.LockOSThread()

	batch := f.getOutboundBatch()
	lastFlush := time.Now()

	// flush hands a non-empty batch to the worker and starts a new one. The send
	// blocks while f.outbound[i] is full.
	// NOTE(review): force is never passed as true within this function, so the
	// release branch below is currently unreachable — confirm it is still wanted.
	flush := func(force bool) {
		if len(batch.payloads) == 0 {
			if force {
				f.releaseOutboundBatch(batch)
			}
			return
		}
		f.outbound[i] <- batch
		batch = f.getOutboundBatch()
		lastFlush = time.Now()
	}

	for {
		// Restore the pooled buffer to its full length before reading into it.
		p := f.outPool.Get().(*[]byte)
		*p = (*p)[:mtu]
		n, err := reader.Read(*p)
		if err != nil {
			// Errors caused by our own Close are expected and not logged.
			// NOTE(review): p is not returned to outPool on this path.
			if !f.closed.Load() {
				f.l.WithError(err).Error("Error while reading outbound packet, closing")
				//TODO: Trigger Control to close
			}
			break
		}

		*p = (*p)[:n]
		batch.add(p)

		// NOTE(review): the flush condition is only evaluated after a successful
		// read. A packet added less than batchFlushInterval after the previous
		// flush stays in the batch until the next read completes — confirm this
		// latency is acceptable when traffic goes idle.
		if len(batch.payloads) >= f.outboundBatchSize || time.Since(lastFlush) >= f.batchFlushInterval {
			flush(false)
		}
	}

	// Hand over (or recycle) whatever is left once reading has stopped.
	// NOTE(review): this send blocks if the channel is full and workerOut has
	// already exited — verify shutdown ordering.
	if len(batch.payloads) > 0 {
		f.outbound[i] <- batch
	} else {
		f.releaseOutboundBatch(batch)
	}

	f.l.Debugf("overlay reader %v is done", i)
	f.wg.Done()
}
  518. func (f *Interface) workerIn(i int, ctx context.Context) {
  519. lhh := f.lightHouse.NewRequestHandler()
  520. conntrackCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
  521. fwPacket2 := &firewall.Packet{}
  522. nb2 := make([]byte, 12, 12)
  523. result2 := make([]byte, mtu)
  524. h := &header.H{}
  525. for {
  526. select {
  527. case batch := <-f.inbound[i]:
  528. for _, p := range batch.packets {
  529. f.readOutsidePackets(p.Addr, nil, result2[:0], p.Payload, h, fwPacket2, lhh, nb2, i, conntrackCache.Get(f.l))
  530. p.Payload = p.Payload[:mtu]
  531. f.inPool.Put(p)
  532. }
  533. f.releasePacketBatch(batch)
  534. case <-ctx.Done():
  535. f.wg.Done()
  536. return
  537. }
  538. }
  539. }
// workerOut consumes outbound batches from f.outbound[i], passes every payload
// to consumeInsidePacket, and sends the resulting packets through
// flushSendQueue in batches. It exits, after flushing anything still queued and
// marking itself done on f.wg, once ctx is cancelled.
func (f *Interface) workerOut(i int, ctx context.Context) {
	conntrackCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
	fwPacket1 := &firewall.Packet{}
	nb1 := make([]byte, 12, 12)
	// pending holds sends queued since the last flush; pendingBytes is their total length.
	pending := make([]outboundSend, 0, f.sendBatchSize)
	pendingBytes := 0
	// Fall back to limits derived from the send batch size when the configured
	// limits are not positive.
	maxPendingPackets := f.maxPendingPackets
	if maxPendingPackets <= 0 {
		maxPendingPackets = f.sendBatchSize
	}
	maxPendingBytes := f.maxPendingBytes
	if maxPendingBytes <= 0 {
		maxPendingBytes = f.sendBatchSize * mtu
	}

	for {
		select {
		case batch := <-f.outbound[i]:
			for _, data := range batch.payloads {
				sendBuf := f.getSendBuffer(i)
				buf := (*sendBuf)[:0]
				// queue records one packet of the given length for addr, backed by
				// this iteration's sendBuf. It flushes first when adding the packet
				// would exceed a limit, and again afterwards once a limit is reached.
				// NOTE(review): presumably invoked synchronously by
				// consumeInsidePacket, since pending is not synchronised — confirm.
				queue := func(addr netip.AddrPort, length int) {
					if len(pending) >= maxPendingPackets || pendingBytes+length > maxPendingBytes {
						f.flushSendQueue(i, &pending, &pendingBytes)
					}
					pending = append(pending, outboundSend{
						buf:    sendBuf,
						length: length,
						addr:   addr,
					})
					pendingBytes += length
					if len(pending) >= f.sendBatchSize || pendingBytes >= maxPendingBytes {
						f.flushSendQueue(i, &pending, &pendingBytes)
					}
				}
				sent := f.consumeInsidePacket(*data, fwPacket1, nb1, buf, queue, i, conntrackCache.Get(f.l))
				// When nothing was queued the send buffer is handed back right away;
				// otherwise flushSendQueue releases it.
				if !sent {
					f.releaseSendBuffer(i, sendBuf)
				}
				// Restore full length before the read buffer goes back to the pool.
				*data = (*data)[:mtu]
				f.outPool.Put(data)
			}
			f.releaseOutboundBatch(batch)
			// Never carry queued sends across batches.
			if len(pending) > 0 {
				f.flushSendQueue(i, &pending, &pendingBytes)
			}
		case <-ctx.Done():
			if len(pending) > 0 {
				f.flushSendQueue(i, &pending, &pendingBytes)
			}
			f.wg.Done()
			return
		}
	}
}
  594. func (f *Interface) RegisterConfigChangeCallbacks(c *config.C) {
  595. c.RegisterReloadCallback(f.reloadFirewall)
  596. c.RegisterReloadCallback(f.reloadSendRecvError)
  597. c.RegisterReloadCallback(f.reloadDisconnectInvalid)
  598. c.RegisterReloadCallback(f.reloadMisc)
  599. for _, udpConn := range f.writers {
  600. c.RegisterReloadCallback(udpConn.ReloadConfig)
  601. }
  602. }
  603. func (f *Interface) reloadDisconnectInvalid(c *config.C) {
  604. initial := c.InitialLoad()
  605. if initial || c.HasChanged("pki.disconnect_invalid") {
  606. f.disconnectInvalid.Store(c.GetBool("pki.disconnect_invalid", true))
  607. if !initial {
  608. f.l.Infof("pki.disconnect_invalid changed to %v", f.disconnectInvalid.Load())
  609. }
  610. }
  611. }
  612. func (f *Interface) reloadFirewall(c *config.C) {
  613. //TODO: need to trigger/detect if the certificate changed too
  614. if c.HasChanged("firewall") == false {
  615. f.l.Debug("No firewall config change detected")
  616. return
  617. }
  618. fw, err := NewFirewallFromConfig(f.l, f.pki.getCertState(), c)
  619. if err != nil {
  620. f.l.WithError(err).Error("Error while creating firewall during reload")
  621. return
  622. }
  623. oldFw := f.firewall
  624. conntrack := oldFw.Conntrack
  625. conntrack.Lock()
  626. defer conntrack.Unlock()
  627. fw.rulesVersion = oldFw.rulesVersion + 1
  628. // If rulesVersion is back to zero, we have wrapped all the way around. Be
  629. // safe and just reset conntrack in this case.
  630. if fw.rulesVersion == 0 {
  631. f.l.WithField("firewallHashes", fw.GetRuleHashes()).
  632. WithField("oldFirewallHashes", oldFw.GetRuleHashes()).
  633. WithField("rulesVersion", fw.rulesVersion).
  634. Warn("firewall rulesVersion has overflowed, resetting conntrack")
  635. } else {
  636. fw.Conntrack = conntrack
  637. }
  638. f.firewall = fw
  639. oldFw.Destroy()
  640. f.l.WithField("firewallHashes", fw.GetRuleHashes()).
  641. WithField("oldFirewallHashes", oldFw.GetRuleHashes()).
  642. WithField("rulesVersion", fw.rulesVersion).
  643. Info("New firewall has been installed")
  644. }
  645. func (f *Interface) reloadSendRecvError(c *config.C) {
  646. if c.InitialLoad() || c.HasChanged("listen.send_recv_error") {
  647. stringValue := c.GetString("listen.send_recv_error", "always")
  648. switch stringValue {
  649. case "always":
  650. f.sendRecvErrorConfig = sendRecvErrorAlways
  651. case "never":
  652. f.sendRecvErrorConfig = sendRecvErrorNever
  653. case "private":
  654. f.sendRecvErrorConfig = sendRecvErrorPrivate
  655. default:
  656. if c.GetBool("listen.send_recv_error", true) {
  657. f.sendRecvErrorConfig = sendRecvErrorAlways
  658. } else {
  659. f.sendRecvErrorConfig = sendRecvErrorNever
  660. }
  661. }
  662. f.l.WithField("sendRecvError", f.sendRecvErrorConfig.String()).
  663. Info("Loaded send_recv_error config")
  664. }
  665. }
  666. func (f *Interface) reloadMisc(c *config.C) {
  667. if c.HasChanged("counters.try_promote") {
  668. n := c.GetUint32("counters.try_promote", defaultPromoteEvery)
  669. f.tryPromoteEvery.Store(n)
  670. f.l.Info("counters.try_promote has changed")
  671. }
  672. if c.HasChanged("counters.requery_every_packets") {
  673. n := c.GetUint32("counters.requery_every_packets", defaultReQueryEvery)
  674. f.reQueryEvery.Store(n)
  675. f.l.Info("counters.requery_every_packets has changed")
  676. }
  677. if c.HasChanged("timers.requery_wait_duration") {
  678. n := c.GetDuration("timers.requery_wait_duration", defaultReQueryWait)
  679. f.reQueryWait.Store(int64(n))
  680. f.l.Info("timers.requery_wait_duration has changed")
  681. }
  682. }
  683. func (f *Interface) emitStats(ctx context.Context, i time.Duration) {
  684. ticker := time.NewTicker(i)
  685. defer ticker.Stop()
  686. udpStats := udp.NewUDPStatsEmitter(f.writers)
  687. certExpirationGauge := metrics.GetOrRegisterGauge("certificate.ttl_seconds", nil)
  688. certInitiatingVersion := metrics.GetOrRegisterGauge("certificate.initiating_version", nil)
  689. certMaxVersion := metrics.GetOrRegisterGauge("certificate.max_version", nil)
  690. for {
  691. select {
  692. case <-ctx.Done():
  693. return
  694. case <-ticker.C:
  695. f.firewall.EmitStats()
  696. f.handshakeManager.EmitStats()
  697. udpStats()
  698. certState := f.pki.getCertState()
  699. defaultCrt := certState.GetDefaultCertificate()
  700. certExpirationGauge.Update(int64(defaultCrt.NotAfter().Sub(time.Now()) / time.Second))
  701. certInitiatingVersion.Update(int64(defaultCrt.Version()))
  702. // Report the max certificate version we are capable of using
  703. if certState.v2Cert != nil {
  704. certMaxVersion.Update(int64(certState.v2Cert.Version()))
  705. } else {
  706. certMaxVersion.Update(int64(certState.v1Cert.Version()))
  707. }
  708. }
  709. }
  710. }
  711. func (f *Interface) GetHostInfo(vpnIp netip.Addr) *HostInfo {
  712. return f.hostMap.QueryVpnAddr(vpnIp)
  713. }
  714. func (f *Interface) GetCertState() *CertState {
  715. return f.pki.getCertState()
  716. }
  717. func (f *Interface) Close() error {
  718. f.closed.Store(true)
  719. // Release the udp readers
  720. for _, u := range f.writers {
  721. err := u.Close()
  722. if err != nil {
  723. f.l.WithError(err).Error("Error while closing udp socket")
  724. }
  725. }
  726. // Release the tun readers
  727. for _, u := range f.readers {
  728. err := u.Close()
  729. if err != nil {
  730. f.l.WithError(err).Error("Error while closing tun device")
  731. }
  732. }
  733. return nil
  734. }