connection_manager.go

package nebula

import (
	"bytes"
	"context"
	"encoding/binary"
	"net/netip"
	"sync"
	"time"

	"github.com/rcrowley/go-metrics"
	"github.com/sirupsen/logrus"
	"github.com/slackhq/nebula/cert"
	"github.com/slackhq/nebula/config"
	"github.com/slackhq/nebula/header"
)

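// trafficDecision is the action the connection manager chooses for a tunnel after a traffic check.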
type trafficDecision int

const (
	doNothing      trafficDecision = 0
	deleteTunnel   trafficDecision = 1 // delete the hostinfo on our side, do not notify the remote
	closeTunnel    trafficDecision = 2 // delete the hostinfo and notify the remote
	swapPrimary    trafficDecision = 3
	migrateRelays  trafficDecision = 4
	tryRehandshake trafficDecision = 5
	sendTestPacket trafficDecision = 6
)

// LastCommunication tracks when we last communicated with a host
type LastCommunication struct {
	timestamp time.Time
	vpnIp     netip.Addr // To help with logging
}

type connectionManager struct {
	in      map[uint32]struct{}
	inLock  *sync.RWMutex
	out     map[uint32]struct{}
	outLock *sync.RWMutex

	// relayUsed holds which relay localIndexes are in use
	relayUsed     map[uint32]struct{}
	relayUsedLock *sync.RWMutex

	// Track last communication with hosts
	lastCommMap       map[uint32]time.Time
	lastCommLock      *sync.RWMutex
	inactivityTimer   *LockingTimerWheel[uint32]
	inactivityTimeout time.Duration

	hostMap                 *HostMap
	trafficTimer            *LockingTimerWheel[uint32]
	intf                    *Interface
	pendingDeletion         map[uint32]struct{}
	punchy                  *Punchy
	checkInterval           time.Duration
	pendingDeletionInterval time.Duration
	metricsTxPunchy         metrics.Counter
	l                       *logrus.Logger
}

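// newConnectionManager builds a connectionManager wired to the interface's
// hostmap, sets up the traffic and inactivity timer wheels, and starts the
// background Run loop on the provided context.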
func newConnectionManager(ctx context.Context, l *logrus.Logger, intf *Interface, checkInterval, pendingDeletionInterval time.Duration, punchy *Punchy) *connectionManager {
	var max time.Duration
	if checkInterval < pendingDeletionInterval {
		max = pendingDeletionInterval
	} else {
		max = checkInterval
	}

	nc := &connectionManager{
		hostMap:                 intf.hostMap,
		in:                      make(map[uint32]struct{}),
		inLock:                  &sync.RWMutex{},
		out:                     make(map[uint32]struct{}),
		outLock:                 &sync.RWMutex{},
		relayUsed:               make(map[uint32]struct{}),
		relayUsedLock:           &sync.RWMutex{},
		lastCommMap:             make(map[uint32]time.Time),
		lastCommLock:            &sync.RWMutex{},
		inactivityTimeout:       10 * time.Minute, // Default inactivity timeout: 10 minutes
		trafficTimer:            NewLockingTimerWheel[uint32](time.Millisecond*500, max),
		intf:                    intf,
		pendingDeletion:         make(map[uint32]struct{}),
		checkInterval:           checkInterval,
		pendingDeletionInterval: pendingDeletionInterval,
		punchy:                  punchy,
		metricsTxPunchy:         metrics.GetOrRegisterCounter("messages.tx.punchy", nil),
		l:                       l,
	}

	// Initialize the inactivity timer wheel - make the wheel duration slightly longer than the timeout
	nc.inactivityTimer = NewLockingTimerWheel[uint32](time.Minute, nc.inactivityTimeout+time.Minute)

	nc.Start(ctx)
	return nc
}

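// updateLastCommunication records the current time as the last traffic seen
// for a local index and re-arms its timer on the inactivity wheel.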
func (n *connectionManager) updateLastCommunication(localIndex uint32) {
	// Get host info to record VPN IP for better logging
	hostInfo := n.hostMap.QueryIndex(localIndex)
	if hostInfo == nil {
		return
	}

	now := time.Now()
	n.lastCommLock.Lock()
	n.lastCommMap[localIndex] = now
	n.lastCommLock.Unlock()

	// Reset the inactivity timer for this host
	n.inactivityTimer.m.Lock()
	n.inactivityTimer.t.Add(localIndex, n.inactivityTimeout)
	n.inactivityTimer.m.Unlock()
}

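// In marks that inbound traffic was seen for a local index.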
func (n *connectionManager) In(localIndex uint32) {
	n.inLock.RLock()
	// If this already exists, return
	if _, ok := n.in[localIndex]; ok {
		n.inLock.RUnlock()
		return
	}
	n.inLock.RUnlock()

	n.inLock.Lock()
	n.in[localIndex] = struct{}{}
	n.inLock.Unlock()

	// Update last communication time
	n.updateLastCommunication(localIndex)
}

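// Out marks that outbound traffic was sent for a local index.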
func (n *connectionManager) Out(localIndex uint32) {
	n.outLock.RLock()
	// If this already exists, return
	if _, ok := n.out[localIndex]; ok {
		n.outLock.RUnlock()
		return
	}
	n.outLock.RUnlock()

	n.outLock.Lock()
	n.out[localIndex] = struct{}{}
	n.outLock.Unlock()

	// Update last communication time
	n.updateLastCommunication(localIndex)
}

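// RelayUsed marks that a relay with this local index has carried traffic.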
func (n *connectionManager) RelayUsed(localIndex uint32) {
	n.relayUsedLock.RLock()
	// If this already exists, return
	if _, ok := n.relayUsed[localIndex]; ok {
		n.relayUsedLock.RUnlock()
		return
	}
	n.relayUsedLock.RUnlock()

	n.relayUsedLock.Lock()
	n.relayUsed[localIndex] = struct{}{}
	n.relayUsedLock.Unlock()
}

// getAndResetTrafficCheck reports whether there was any inbound or outbound
// traffic within the last tick and resets the state for this local index
func (n *connectionManager) getAndResetTrafficCheck(localIndex uint32) (bool, bool) {
	n.inLock.Lock()
	n.outLock.Lock()
	_, in := n.in[localIndex]
	_, out := n.out[localIndex]
	delete(n.in, localIndex)
	delete(n.out, localIndex)
	n.inLock.Unlock()
	n.outLock.Unlock()
	return in, out
}

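// AddTrafficWatch begins watching a local index on the traffic timer wheel,
// seeding it as having outbound traffic so the first check actively tests the tunnel.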
func (n *connectionManager) AddTrafficWatch(localIndex uint32) {
	// Use a write lock directly because it should be incredibly rare that we are ever already tracking this index
	n.outLock.Lock()
	if _, ok := n.out[localIndex]; ok {
		n.outLock.Unlock()
		return
	}
	n.out[localIndex] = struct{}{}
	n.trafficTimer.Add(localIndex, n.checkInterval)
	n.outLock.Unlock()
}

// checkInactiveTunnels checks for tunnels that have been inactive for too long and drops them
func (n *connectionManager) checkInactiveTunnels() {
	now := time.Now()

	// First, advance the timer wheel to the current time
	n.inactivityTimer.m.Lock()
	n.inactivityTimer.t.Advance(now)
	n.inactivityTimer.m.Unlock()

	// Check for expired timers (inactive connections)
	for {
		// Get the next expired tunnel
		n.inactivityTimer.m.Lock()
		localIndex, ok := n.inactivityTimer.t.Purge()
		n.inactivityTimer.m.Unlock()
		if !ok {
			// No more expired timers
			break
		}

		n.lastCommLock.RLock()
		lastComm, exists := n.lastCommMap[localIndex]
		n.lastCommLock.RUnlock()
		if !exists {
			// No last communication record, odd but skip
			continue
		}

		// Calculate inactivity duration
		inactiveDuration := now.Sub(lastComm)

		// Check if we've exceeded the inactivity timeout
		if inactiveDuration >= n.inactivityTimeout {
			// Get the host info (if it still exists)
			hostInfo := n.hostMap.QueryIndex(localIndex)
			if hostInfo == nil {
				// Host info is gone, remove from our tracking map
				n.lastCommLock.Lock()
				delete(n.lastCommMap, localIndex)
				n.lastCommLock.Unlock()
				continue
			}

			// Log the inactivity and drop the tunnel
			n.l.WithField("vpnIp", hostInfo.vpnAddrs[0]).
				WithField("localIndex", localIndex).
				WithField("inactiveDuration", inactiveDuration).
				WithField("timeout", n.inactivityTimeout).
				Info("Dropping tunnel due to inactivity")

			// Close the tunnel using the existing mechanism
			n.intf.closeTunnel(hostInfo)

			// Clean up our tracking map
			n.lastCommLock.Lock()
			delete(n.lastCommMap, localIndex)
			n.lastCommLock.Unlock()
		} else {
			// Re-add to the timer wheel with the remaining time
			remainingTime := n.inactivityTimeout - inactiveDuration
			n.inactivityTimer.m.Lock()
			n.inactivityTimer.t.Add(localIndex, remainingTime)
			n.inactivityTimer.m.Unlock()
		}
	}
}

// CleanupDeletedHostInfos removes entries from our lastCommMap for hosts that no longer exist
func (n *connectionManager) CleanupDeletedHostInfos() {
	n.lastCommLock.Lock()
	defer n.lastCommLock.Unlock()

	// Find indexes to delete
	var toDelete []uint32
	for localIndex := range n.lastCommMap {
		if n.hostMap.QueryIndex(localIndex) == nil {
			toDelete = append(toDelete, localIndex)
		}
	}

	// Delete them
	for _, localIndex := range toDelete {
		delete(n.lastCommMap, localIndex)
	}

	if len(toDelete) > 0 && n.l.Level >= logrus.DebugLevel {
		n.l.WithField("count", len(toDelete)).Debug("Cleaned up deleted host entries from lastCommMap")
	}
}

// ReloadConfig updates the connection manager configuration
func (n *connectionManager) ReloadConfig(c *config.C) {
	// Get the inactivity timeout from config
	inactivityTimeout := c.GetDuration("timers.inactivity_timeout", 10*time.Minute)

	// Only update if different
	if inactivityTimeout != n.inactivityTimeout {
		n.l.WithField("old", n.inactivityTimeout).
			WithField("new", inactivityTimeout).
			Info("Updating inactivity timeout")
		n.inactivityTimeout = inactivityTimeout

		// Recreate the inactivity timer wheel with the new timeout
		n.inactivityTimer = NewLockingTimerWheel[uint32](time.Minute, n.inactivityTimeout+time.Minute)

		// Re-add all existing hosts to the new timer wheel
		n.lastCommLock.RLock()
		for localIndex, lastComm := range n.lastCommMap {
			// Calculate remaining time based on last communication
			now := time.Now()
			elapsed := now.Sub(lastComm)

			// If the elapsed time exceeds the new timeout, this will be caught
			// in the next inactivity check. Otherwise, add with remaining time.
			if elapsed < n.inactivityTimeout {
				remainingTime := n.inactivityTimeout - elapsed
				n.inactivityTimer.m.Lock()
				n.inactivityTimer.t.Add(localIndex, remainingTime)
				n.inactivityTimer.m.Unlock()
			}
		}
		n.lastCommLock.RUnlock()
	}
}

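// Start launches the connection manager's Run loop on its own goroutine.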
func (n *connectionManager) Start(ctx context.Context) {
	go n.Run(ctx)
}

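// Run drives the traffic timer wheel and the periodic inactivity and cleanup
// checks until the context is canceled.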
func (n *connectionManager) Run(ctx context.Context) {
	//TODO: this tick should be based on the min wheel tick? Check firewall
	clockSource := time.NewTicker(500 * time.Millisecond)
	defer clockSource.Stop()

	// Create ticker for inactivity checks (every minute)
	inactivityTicker := time.NewTicker(time.Minute)
	defer inactivityTicker.Stop()

	// Create ticker for cleanup (every 5 minutes)
	cleanupTicker := time.NewTicker(5 * time.Minute)
	defer cleanupTicker.Stop()

	p := []byte("")
	nb := make([]byte, 12)
	out := make([]byte, mtu)

	for {
		select {
		case <-ctx.Done():
			return
		case now := <-clockSource.C:
			n.trafficTimer.Advance(now)
			for {
				localIndex, has := n.trafficTimer.Purge()
				if !has {
					break
				}
				n.doTrafficCheck(localIndex, p, nb, out, now)
			}
		case <-inactivityTicker.C:
			// Check for inactive tunnels
			n.checkInactiveTunnels()
		case <-cleanupTicker.C:
			// Periodically clean up deleted hosts
			n.CleanupDeletedHostInfos()
		}
	}
}

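// doTrafficCheck applies the decision from makeTrafficDecision for a single
// local index: deleting, closing, swapping, migrating relays or testing the tunnel.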
func (n *connectionManager) doTrafficCheck(localIndex uint32, p, nb, out []byte, now time.Time) {
	decision, hostinfo, primary := n.makeTrafficDecision(localIndex, now)

	switch decision {
	case deleteTunnel:
		if n.hostMap.DeleteHostInfo(hostinfo) {
			// Only clear the lighthouse cache if this is the last hostinfo for this vpn ip in the hostmap
			n.intf.lightHouse.DeleteVpnAddrs(hostinfo.vpnAddrs)
		}
	case closeTunnel:
		n.intf.sendCloseTunnel(hostinfo)
		n.intf.closeTunnel(hostinfo)
	case swapPrimary:
		n.swapPrimary(hostinfo, primary)
	case migrateRelays:
		n.migrateRelayUsed(hostinfo, primary)
	case tryRehandshake:
		n.tryRehandshake(hostinfo)
	case sendTestPacket:
		n.intf.SendMessageToHostInfo(header.Test, header.TestRequest, hostinfo, p, nb, out)
	}

	n.resetRelayTrafficCheck(hostinfo)
}

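// resetRelayTrafficCheck clears the usage markers for every relay carried by
// this hostinfo so stale entries do not linger in relayUsed.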
func (n *connectionManager) resetRelayTrafficCheck(hostinfo *HostInfo) {
	if hostinfo != nil {
		n.relayUsedLock.Lock()
		defer n.relayUsedLock.Unlock()
		// No need to migrate any relays, delete usage info now.
		for _, idx := range hostinfo.relayState.CopyRelayForIdxs() {
			delete(n.relayUsed, idx)
		}
	}
}

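// migrateRelayUsed re-establishes the in-use relays from an old hostinfo on a
// new one, recreating relay state where needed and re-sending
// CreateRelayRequest messages to the peer.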
func (n *connectionManager) migrateRelayUsed(oldhostinfo, newhostinfo *HostInfo) {
	relayFor := oldhostinfo.relayState.CopyAllRelayFor()

	for _, r := range relayFor {
		existing, ok := newhostinfo.relayState.QueryRelayForByIp(r.PeerAddr)

		var index uint32
		var relayFrom netip.Addr
		var relayTo netip.Addr
		switch {
		case ok && existing.State == Established:
			// This relay already exists in newhostinfo; do nothing.
			continue
		case ok && existing.State == Requested:
			// The relay exists in a Requested state; re-send the request
			index = existing.LocalIndex
			switch r.Type {
			case TerminalType:
				relayFrom = n.intf.myVpnAddrs[0]
				relayTo = existing.PeerAddr
			case ForwardingType:
				relayFrom = existing.PeerAddr
				relayTo = newhostinfo.vpnAddrs[0]
			default:
				// should never happen
			}
		case !ok:
			n.relayUsedLock.RLock()
			if _, relayUsed := n.relayUsed[r.LocalIndex]; !relayUsed {
				// The relay hasn't been used; don't migrate it.
				n.relayUsedLock.RUnlock()
				continue
			}
			n.relayUsedLock.RUnlock()
			// The relay doesn't exist at all; create some relay state and send the request.
			var err error
			index, err = AddRelay(n.l, newhostinfo, n.hostMap, r.PeerAddr, nil, r.Type, Requested)
			if err != nil {
				n.l.WithError(err).Error("failed to migrate relay to new hostinfo")
				continue
			}
			switch r.Type {
			case TerminalType:
				relayFrom = n.intf.myVpnAddrs[0]
				relayTo = r.PeerAddr
			case ForwardingType:
				relayFrom = r.PeerAddr
				relayTo = newhostinfo.vpnAddrs[0]
			default:
				// should never happen
			}
		}

		// Send a CreateRelayRequest to the peer.
		req := NebulaControl{
			Type:                NebulaControl_CreateRelayRequest,
			InitiatorRelayIndex: index,
		}

		switch newhostinfo.GetCert().Certificate.Version() {
		case cert.Version1:
			if !relayFrom.Is4() {
				n.l.Error("can not migrate v1 relay with a v6 network because the relay is not running a current nebula version")
				continue
			}
			if !relayTo.Is4() {
				n.l.Error("can not migrate v1 relay with a v6 remote network because the relay is not running a current nebula version")
				continue
			}
			b := relayFrom.As4()
			req.OldRelayFromAddr = binary.BigEndian.Uint32(b[:])
			b = relayTo.As4()
			req.OldRelayToAddr = binary.BigEndian.Uint32(b[:])
		case cert.Version2:
			req.RelayFromAddr = netAddrToProtoAddr(relayFrom)
			req.RelayToAddr = netAddrToProtoAddr(relayTo)
		default:
			newhostinfo.logger(n.l).Error("Unknown certificate version found while attempting to migrate relay")
			continue
		}

		msg, err := req.Marshal()
		if err != nil {
			n.l.WithError(err).Error("failed to marshal Control message to migrate relay")
		} else {
			n.intf.SendMessageToHostInfo(header.Control, 0, newhostinfo, msg, make([]byte, 12), make([]byte, mtu))
			n.l.WithFields(logrus.Fields{
				"relayFrom":           req.RelayFromAddr,
				"relayTo":             req.RelayToAddr,
				"initiatorRelayIndex": req.InitiatorRelayIndex,
				"responderRelayIndex": req.ResponderRelayIndex,
				"vpnAddrs":            newhostinfo.vpnAddrs}).
				Info("send CreateRelayRequest")
		}
	}
}

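// makeTrafficDecision inspects the traffic seen for a local index since the
// last tick and decides what to do with the tunnel, returning the hostinfo it
// examined and, when a swap or relay migration applies, the primary hostinfo
// for the same vpn addr.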
func (n *connectionManager) makeTrafficDecision(localIndex uint32, now time.Time) (trafficDecision, *HostInfo, *HostInfo) {
	n.hostMap.RLock()
	defer n.hostMap.RUnlock()

	hostinfo := n.hostMap.Indexes[localIndex]
	if hostinfo == nil {
		n.l.WithField("localIndex", localIndex).Debugf("Not found in hostmap")
		delete(n.pendingDeletion, localIndex)
		return doNothing, nil, nil
	}

	if n.isInvalidCertificate(now, hostinfo) {
		delete(n.pendingDeletion, hostinfo.localIndexId)
		return closeTunnel, hostinfo, nil
	}

	primary := n.hostMap.Hosts[hostinfo.vpnAddrs[0]]
	mainHostInfo := true
	if primary != nil && primary != hostinfo {
		mainHostInfo = false
	}

	// Check for traffic on this hostinfo
	inTraffic, outTraffic := n.getAndResetTrafficCheck(localIndex)

	// A hostinfo is determined alive if there is incoming traffic
	if inTraffic {
		decision := doNothing
		if n.l.Level >= logrus.DebugLevel {
			hostinfo.logger(n.l).
				WithField("tunnelCheck", m{"state": "alive", "method": "passive"}).
				Debug("Tunnel status")
		}
		delete(n.pendingDeletion, hostinfo.localIndexId)

		if mainHostInfo {
			decision = tryRehandshake
		} else {
			if n.shouldSwapPrimary(hostinfo, primary) {
				decision = swapPrimary
			} else {
				// migrate the relays to the primary, if in use.
				decision = migrateRelays
			}
		}

		n.trafficTimer.Add(hostinfo.localIndexId, n.checkInterval)

		if !outTraffic {
			// Send a punch packet to keep the NAT state alive
			n.sendPunch(hostinfo)
		}

		return decision, hostinfo, primary
	}

	if _, ok := n.pendingDeletion[hostinfo.localIndexId]; ok {
		// We have already sent a test packet and nothing was returned, this hostinfo is dead
		hostinfo.logger(n.l).
			WithField("tunnelCheck", m{"state": "dead", "method": "active"}).
			Info("Tunnel status")

		delete(n.pendingDeletion, hostinfo.localIndexId)
		return deleteTunnel, hostinfo, nil
	}

	decision := doNothing
	if hostinfo != nil && hostinfo.ConnectionState != nil && mainHostInfo {
		if !outTraffic {
			// If we aren't sending or receiving traffic then it's an unused tunnel and we don't need to test it.
			// Just maintain NAT state if configured to do so.
			n.sendPunch(hostinfo)
			n.trafficTimer.Add(hostinfo.localIndexId, n.checkInterval)
			return doNothing, nil, nil
		}

		if n.punchy.GetTargetEverything() {
			// This is similar to the old punchy behavior with a slight optimization.
			// We aren't receiving traffic but we are sending it, punch on all known
			// ips in case we need to re-prime NAT state
			n.sendPunch(hostinfo)
		}

		if n.l.Level >= logrus.DebugLevel {
			hostinfo.logger(n.l).
				WithField("tunnelCheck", m{"state": "testing", "method": "active"}).
				Debug("Tunnel status")
		}

		// Send a test packet to trigger an authenticated tunnel test, this should suss out any lingering tunnel issues
		decision = sendTestPacket
	} else {
		if n.l.Level >= logrus.DebugLevel {
			hostinfo.logger(n.l).Debugf("Hostinfo sadness")
		}
	}

	n.pendingDeletion[hostinfo.localIndexId] = struct{}{}
	n.trafficTimer.Add(hostinfo.localIndexId, n.pendingDeletionInterval)
	return decision, hostinfo, nil
}

func (n *connectionManager) shouldSwapPrimary(current, primary *HostInfo) bool {
	// The primary tunnel is the most recent handshake to complete locally and should work entirely fine.
	// If we are here then we have multiple tunnels for a host pair and neither side believes the same tunnel is primary.
	// Let's sort this out.

	// Only one side should swap because if both swap then we may never resolve to a single tunnel.
	// The vpn addr is static across all tunnels for this host pair, so use it
	// to determine if we should consider swapping.
	if current.vpnAddrs[0].Compare(n.intf.myVpnAddrs[0]) < 0 {
		// Their primary vpn addr is less than mine. Do not swap.
		return false
	}

	crt := n.intf.pki.getCertState().getCertificate(current.ConnectionState.myCert.Version())
	// If this tunnel is using the latest certificate then we should swap it to primary for a bit and see if things
	// settle down.
	return bytes.Equal(current.ConnectionState.myCert.Signature(), crt.Signature())
}

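// swapPrimary makes current the primary hostinfo for its vpn addr, re-checking
// under the write lock that the primary has not changed in the meantime.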
func (n *connectionManager) swapPrimary(current, primary *HostInfo) {
	n.hostMap.Lock()
	// Make sure the primary is still the same after the write lock. This avoids a race with a rehandshake.
	if n.hostMap.Hosts[current.vpnAddrs[0]] == primary {
		n.hostMap.unlockedMakePrimary(current)
	}
	n.hostMap.Unlock()
}

// isInvalidCertificate will check if we should destroy a tunnel if pki.disconnect_invalid is true and
// the certificate is no longer valid. Block listed certificates will skip the pki.disconnect_invalid
// check and return true.
func (n *connectionManager) isInvalidCertificate(now time.Time, hostinfo *HostInfo) bool {
	remoteCert := hostinfo.GetCert()
	if remoteCert == nil {
		return false
	}

	caPool := n.intf.pki.GetCAPool()
	err := caPool.VerifyCachedCertificate(now, remoteCert)
	if err == nil {
		return false
	}

	if !n.intf.disconnectInvalid.Load() && err != cert.ErrBlockListed {
		// Block listed certificates should always be disconnected
		return false
	}

	hostinfo.logger(n.l).WithError(err).
		WithField("fingerprint", remoteCert.Fingerprint).
		Info("Remote certificate is no longer valid, tearing down the tunnel")

	return true
}

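// sendPunch emits one-byte punch packets to keep NAT mappings alive, targeting
// every known remote or only the current one depending on punchy configuration.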
func (n *connectionManager) sendPunch(hostinfo *HostInfo) {
	if !n.punchy.GetPunch() {
		// Punching is disabled
		return
	}

	if n.punchy.GetTargetEverything() {
		hostinfo.remotes.ForEach(n.hostMap.GetPreferredRanges(), func(addr netip.AddrPort, preferred bool) {
			n.metricsTxPunchy.Inc(1)
			_ = n.intf.outside.WriteTo([]byte{1}, addr)
		})
	} else if hostinfo.remote.IsValid() {
		n.metricsTxPunchy.Inc(1)
		_ = n.intf.outside.WriteTo([]byte{1}, hostinfo.remote)
	}
}

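// tryRehandshake starts a new handshake when the tunnel's local certificate or
// initiating version is no longer the one we would use for a fresh handshake.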
func (n *connectionManager) tryRehandshake(hostinfo *HostInfo) {
	cs := n.intf.pki.getCertState()
	curCrt := hostinfo.ConnectionState.myCert
	myCrt := cs.getCertificate(curCrt.Version())
	if curCrt.Version() >= cs.initiatingVersion && bytes.Equal(curCrt.Signature(), myCrt.Signature()) {
		// The current tunnel is using the latest certificate and version, no need to rehandshake.
		return
	}

	n.l.WithField("vpnAddrs", hostinfo.vpnAddrs).
		WithField("reason", "local certificate is not current").
		Info("Re-handshaking with remote")

	n.intf.handshakeManager.StartHandshake(hostinfo.vpnAddrs[0], nil)
}