connection_manager.go

package nebula

import (
	"bytes"
	"context"
	"encoding/binary"
	"fmt"
	"net/netip"
	"sync"
	"sync/atomic"
	"time"

	"github.com/rcrowley/go-metrics"
	"github.com/sirupsen/logrus"
	"github.com/slackhq/nebula/cert"
	"github.com/slackhq/nebula/config"
	"github.com/slackhq/nebula/header"
)

type trafficDecision int

const (
	doNothing      trafficDecision = 0
	deleteTunnel   trafficDecision = 1 // delete the hostinfo on our side, do not notify the remote
	closeTunnel    trafficDecision = 2 // delete the hostinfo and notify the remote
	swapPrimary    trafficDecision = 3
	migrateRelays  trafficDecision = 4
	tryRehandshake trafficDecision = 5
	sendTestPacket trafficDecision = 6
)
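
// connectionManager periodically checks traffic on every tunnel and decides
// whether to test it, swap which hostinfo is primary, migrate relays, or tear
// the tunnel down because it is dead, inactive, or carrying an invalid
// certificate.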
type connectionManager struct {
	// relayUsed holds which relay localIndexes are in use
	relayUsed     map[uint32]struct{}
	relayUsedLock *sync.RWMutex

	hostMap      *HostMap
	trafficTimer *LockingTimerWheel[uint32]
	intf         *Interface
	punchy       *Punchy

	// Configuration settings. inactivityTimeout and dropInactive are stored
	// atomically so a config reload can update them while the manager is running.
	checkInterval           time.Duration
	pendingDeletionInterval time.Duration
	inactivityTimeout       atomic.Int64
	dropInactive            atomic.Bool

	metricsTxPunchy metrics.Counter

	l *logrus.Logger
}

func newConnectionManagerFromConfig(l *logrus.Logger, c *config.C, hm *HostMap, p *Punchy) *connectionManager {
	cm := &connectionManager{
		hostMap:         hm,
		l:               l,
		punchy:          p,
		relayUsed:       make(map[uint32]struct{}),
		relayUsedLock:   &sync.RWMutex{},
		metricsTxPunchy: metrics.GetOrRegisterCounter("messages.tx.punchy", nil),
	}

	cm.reload(c, true)
	c.RegisterReloadCallback(func(c *config.C) {
		cm.reload(c, false)
	})

	return cm
}
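
// A minimal usage sketch (hypothetical caller; hostMap, punchy, intf, and ctx
// are assumed to come from the surrounding setup code, and the assumption here
// is that cm.intf is wired to the *Interface before the loop starts):
//
//	cm := newConnectionManagerFromConfig(l, c, hostMap, punchy)
//	cm.intf = intf
//	go cm.Start(ctx)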

func (cm *connectionManager) reload(c *config.C, initial bool) {
	if initial {
		cm.checkInterval = time.Duration(c.GetInt("timers.connection_alive_interval", 5)) * time.Second
		cm.pendingDeletionInterval = time.Duration(c.GetInt("timers.pending_deletion_interval", 10)) * time.Second

		// We want a tick resolution of at most 500ms so that we can hit these intervals
		// pretty close to their configured duration.
		// The inactivity duration is checked each time a hostinfo ticks through so we don't need the wheel to contain it.
		minDuration := min(time.Millisecond*500, cm.checkInterval, cm.pendingDeletionInterval)
		maxDuration := max(cm.checkInterval, cm.pendingDeletionInterval)
		cm.trafficTimer = NewLockingTimerWheel[uint32](minDuration, maxDuration)
	}

	if initial || c.HasChanged("tunnels.inactivity_timeout") {
		old := cm.getInactivityTimeout()
		cm.inactivityTimeout.Store((int64)(c.GetDuration("tunnels.inactivity_timeout", 10*time.Minute)))
		if !initial {
			cm.l.WithField("oldDuration", old).
				WithField("newDuration", cm.getInactivityTimeout()).
				Info("Inactivity timeout has changed")
		}
	}

	if initial || c.HasChanged("tunnels.drop_inactive") {
		old := cm.dropInactive.Load()
		cm.dropInactive.Store(c.GetBool("tunnels.drop_inactive", false))
		if !initial {
			cm.l.WithField("oldBool", old).
				WithField("newBool", cm.dropInactive.Load()).
				Info("Drop inactive setting has changed")
		}
	}
}
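
// The keys read above correspond to config YAML like the following, assuming
// the usual nested layout for dotted keys (values shown are the defaults used
// by this file; a sketch, not full documentation):
//
//	timers:
//	  connection_alive_interval: 5
//	  pending_deletion_interval: 10
//	tunnels:
//	  inactivity_timeout: 10m
//	  drop_inactive: false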

func (cm *connectionManager) getInactivityTimeout() time.Duration {
	return (time.Duration)(cm.inactivityTimeout.Load())
}

// In records that inbound traffic was seen on this hostinfo.
func (cm *connectionManager) In(h *HostInfo) {
	h.in.Store(true)
}

// Out records that outbound traffic was seen on this hostinfo.
func (cm *connectionManager) Out(h *HostInfo) {
	h.out.Store(true)
}
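
// RelayUsed records that the relay with the given local index is in use. It
// checks under the read lock first and upgrades to the write lock only on a
// miss; two goroutines racing through that upgrade is benign because the
// insert is idempotent.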
func (cm *connectionManager) RelayUsed(localIndex uint32) {
	cm.relayUsedLock.RLock()
	// If this already exists, return
	if _, ok := cm.relayUsed[localIndex]; ok {
		cm.relayUsedLock.RUnlock()
		return
	}
	cm.relayUsedLock.RUnlock()

	cm.relayUsedLock.Lock()
	cm.relayUsed[localIndex] = struct{}{}
	cm.relayUsedLock.Unlock()
}

// getAndResetTrafficCheck reports whether there was any inbound or outbound
// traffic within the last tick and resets the state for this local index.
func (cm *connectionManager) getAndResetTrafficCheck(h *HostInfo, now time.Time) (bool, bool) {
	in := h.in.Swap(false)
	out := h.out.Swap(false)
	if in || out {
		h.lastUsed = now
	}
	return in, out
}

// AddTrafficWatch must be called for every new HostInfo.
// We will continue to monitor the HostInfo until the tunnel is dropped.
func (cm *connectionManager) AddTrafficWatch(h *HostInfo) {
	if !h.out.Swap(true) {
		cm.trafficTimer.Add(h.localIndexId, cm.checkInterval)
	}
}
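
// Start runs the traffic check loop until ctx is cancelled. Each ticker fire
// advances the timer wheel and drains every expired local index through
// doTrafficCheck.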
func (cm *connectionManager) Start(ctx context.Context) {
	clockSource := time.NewTicker(cm.trafficTimer.t.tickDuration)
	defer clockSource.Stop()

	p := []byte("")
	nb := make([]byte, 12)
	out := make([]byte, mtu)

	for {
		select {
		case <-ctx.Done():
			return
		case now := <-clockSource.C:
			cm.trafficTimer.Advance(now)
			for {
				localIndex, has := cm.trafficTimer.Purge()
				if !has {
					break
				}
				cm.doTrafficCheck(localIndex, p, nb, out, now)
			}
		}
	}
}

func (cm *connectionManager) doTrafficCheck(localIndex uint32, p, nb, out []byte, now time.Time) {
	decision, hostinfo, primary := cm.makeTrafficDecision(localIndex, now)

	switch decision {
	case deleteTunnel:
		if cm.hostMap.DeleteHostInfo(hostinfo) {
			// Only clear the lighthouse cache if this is the last hostinfo for this vpn ip in the hostmap
			cm.intf.lightHouse.DeleteVpnAddrs(hostinfo.vpnAddrs)
		}

	case closeTunnel:
		cm.intf.sendCloseTunnel(hostinfo)
		cm.intf.closeTunnel(hostinfo)

	case swapPrimary:
		cm.swapPrimary(hostinfo, primary)

	case migrateRelays:
		cm.migrateRelayUsed(hostinfo, primary)

	case tryRehandshake:
		cm.tryRehandshake(hostinfo)

	case sendTestPacket:
		cm.intf.SendMessageToHostInfo(header.Test, header.TestRequest, hostinfo, p, nb, out)
	}

	cm.resetRelayTrafficCheck(hostinfo)
}

func (cm *connectionManager) resetRelayTrafficCheck(hostinfo *HostInfo) {
	if hostinfo != nil {
		cm.relayUsedLock.Lock()
		defer cm.relayUsedLock.Unlock()
		// No need to migrate any relays, delete usage info now.
		for _, idx := range hostinfo.relayState.CopyRelayForIdxs() {
			delete(cm.relayUsed, idx)
		}
	}
}
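
// migrateRelayUsed re-creates the relays oldhostinfo was serving on
// newhostinfo, sending a CreateRelayRequest for each relay that is still in
// use and not already established on the new tunnel.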
func (cm *connectionManager) migrateRelayUsed(oldhostinfo, newhostinfo *HostInfo) {
	relayFor := oldhostinfo.relayState.CopyAllRelayFor()

	for _, r := range relayFor {
		existing, ok := newhostinfo.relayState.QueryRelayForByIp(r.PeerAddr)

		var index uint32
		var relayFrom netip.Addr
		var relayTo netip.Addr
		switch {
		case ok:
			switch existing.State {
			case Established, PeerRequested, Disestablished:
				// This relay already exists in newhostinfo; do nothing.
				continue
			case Requested:
				// The relay exists in a Requested state; re-send the request
				index = existing.LocalIndex
				switch r.Type {
				case TerminalType:
					relayFrom = cm.intf.myVpnAddrs[0]
					relayTo = existing.PeerAddr
				case ForwardingType:
					relayFrom = existing.PeerAddr
					relayTo = newhostinfo.vpnAddrs[0]
				default:
					// should never happen
					panic(fmt.Sprintf("Migrating unknown relay type: %v", r.Type))
				}
			}
		case !ok:
			cm.relayUsedLock.RLock()
			if _, relayUsed := cm.relayUsed[r.LocalIndex]; !relayUsed {
				// The relay hasn't been used; don't migrate it.
				cm.relayUsedLock.RUnlock()
				continue
			}
			cm.relayUsedLock.RUnlock()
			// The relay doesn't exist at all; create some relay state and send the request.
			var err error
			index, err = AddRelay(cm.l, newhostinfo, cm.hostMap, r.PeerAddr, nil, r.Type, Requested)
			if err != nil {
				cm.l.WithError(err).Error("failed to migrate relay to new hostinfo")
				continue
			}
			switch r.Type {
			case TerminalType:
				relayFrom = cm.intf.myVpnAddrs[0]
				relayTo = r.PeerAddr
			case ForwardingType:
				relayFrom = r.PeerAddr
				relayTo = newhostinfo.vpnAddrs[0]
			default:
				// should never happen
				panic(fmt.Sprintf("Migrating unknown relay type: %v", r.Type))
			}
		}

		// Send a CreateRelayRequest to the peer.
		req := NebulaControl{
			Type:                NebulaControl_CreateRelayRequest,
			InitiatorRelayIndex: index,
		}

		switch newhostinfo.GetCert().Certificate.Version() {
		case cert.Version1:
			if !relayFrom.Is4() {
				cm.l.Error("can not migrate v1 relay with a v6 network because the relay is not running a current nebula version")
				continue
			}
			if !relayTo.Is4() {
				cm.l.Error("can not migrate v1 relay with a v6 remote network because the relay is not running a current nebula version")
				continue
			}

			b := relayFrom.As4()
			req.OldRelayFromAddr = binary.BigEndian.Uint32(b[:])
			b = relayTo.As4()
			req.OldRelayToAddr = binary.BigEndian.Uint32(b[:])
		case cert.Version2:
			req.RelayFromAddr = netAddrToProtoAddr(relayFrom)
			req.RelayToAddr = netAddrToProtoAddr(relayTo)
		default:
			newhostinfo.logger(cm.l).Error("Unknown certificate version found while attempting to migrate relay")
			continue
		}

		msg, err := req.Marshal()
		if err != nil {
			cm.l.WithError(err).Error("failed to marshal Control message to migrate relay")
		} else {
			cm.intf.SendMessageToHostInfo(header.Control, 0, newhostinfo, msg, make([]byte, 12), make([]byte, mtu))
			cm.l.WithFields(logrus.Fields{
				"relayFrom":           req.RelayFromAddr,
				"relayTo":             req.RelayToAddr,
				"initiatorRelayIndex": req.InitiatorRelayIndex,
				"responderRelayIndex": req.ResponderRelayIndex,
				"vpnAddrs":            newhostinfo.vpnAddrs}).
				Info("send CreateRelayRequest")
		}
	}
}
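
// makeTrafficDecision inspects the tunnel behind localIndex and returns the
// action to take along with the hostinfo involved and, when relevant, the
// primary hostinfo for the same vpn addrs. It is called from doTrafficCheck
// each time the local index expires out of the timer wheel.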
func (cm *connectionManager) makeTrafficDecision(localIndex uint32, now time.Time) (trafficDecision, *HostInfo, *HostInfo) {
	// Read lock the main hostmap to order decisions based on tunnels being the primary tunnel
	cm.hostMap.RLock()
	defer cm.hostMap.RUnlock()

	hostinfo := cm.hostMap.Indexes[localIndex]
	if hostinfo == nil {
		cm.l.WithField("localIndex", localIndex).Debugln("Not found in hostmap")
		return doNothing, nil, nil
	}

	if cm.isInvalidCertificate(now, hostinfo) {
		return closeTunnel, hostinfo, nil
	}

	primary := cm.hostMap.Hosts[hostinfo.vpnAddrs[0]]
	mainHostInfo := true
	if primary != nil && primary != hostinfo {
		mainHostInfo = false
	}

	// Check for traffic on this hostinfo
	inTraffic, outTraffic := cm.getAndResetTrafficCheck(hostinfo, now)

	// A hostinfo is determined alive if there is incoming traffic
	if inTraffic {
		decision := doNothing
		if cm.l.Level >= logrus.DebugLevel {
			hostinfo.logger(cm.l).
				WithField("tunnelCheck", m{"state": "alive", "method": "passive"}).
				Debug("Tunnel status")
		}
		hostinfo.pendingDeletion.Store(false)

		if mainHostInfo {
			decision = tryRehandshake
		} else {
			if cm.shouldSwapPrimary(hostinfo, primary) {
				decision = swapPrimary
			} else {
				// migrate the relays to the primary, if in use.
				decision = migrateRelays
			}
		}

		cm.trafficTimer.Add(hostinfo.localIndexId, cm.checkInterval)

		if !outTraffic {
			// Send a punch packet to keep the NAT state alive
			cm.sendPunch(hostinfo)
		}

		return decision, hostinfo, primary
	}

	if hostinfo.pendingDeletion.Load() {
		// We have already sent a test packet and nothing was returned, this hostinfo is dead
		hostinfo.logger(cm.l).
			WithField("tunnelCheck", m{"state": "dead", "method": "active"}).
			Info("Tunnel status")

		return deleteTunnel, hostinfo, nil
	}

	decision := doNothing
	if hostinfo != nil && hostinfo.ConnectionState != nil && mainHostInfo {
		if !outTraffic {
			inactiveFor, isInactive := cm.isInactive(hostinfo, now)
			if isInactive {
				// Tunnel is inactive, tear it down
				hostinfo.logger(cm.l).
					WithField("inactiveDuration", inactiveFor).
					WithField("primary", mainHostInfo).
					Info("Dropping tunnel due to inactivity")

				return closeTunnel, hostinfo, primary
			}

			// If we aren't sending or receiving traffic then it's an unused tunnel and we don't need to test it.
			// Just maintain NAT state if configured to do so.
			cm.sendPunch(hostinfo)
			cm.trafficTimer.Add(hostinfo.localIndexId, cm.checkInterval)
			return doNothing, nil, nil
		}

		if cm.punchy.GetTargetEverything() {
			// This is similar to the old punchy behavior with a slight optimization.
			// We aren't receiving traffic but we are sending it, punch on all known
			// ips in case we need to re-prime NAT state
			cm.sendPunch(hostinfo)
		}

		if cm.l.Level >= logrus.DebugLevel {
			hostinfo.logger(cm.l).
				WithField("tunnelCheck", m{"state": "testing", "method": "active"}).
				Debug("Tunnel status")
		}

		// Send a test packet to trigger an authenticated tunnel test, this should suss out any lingering tunnel issues
		decision = sendTestPacket
	} else {
		if cm.l.Level >= logrus.DebugLevel {
			hostinfo.logger(cm.l).Debugf("Hostinfo sadness")
		}
	}

	hostinfo.pendingDeletion.Store(true)
	cm.trafficTimer.Add(hostinfo.localIndexId, cm.pendingDeletionInterval)
	return decision, hostinfo, nil
}
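
// isInactive returns how long the tunnel has gone without traffic and whether
// that exceeds the configured inactivity timeout. It always reports false
// when tunnels.drop_inactive is disabled.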
func (cm *connectionManager) isInactive(hostinfo *HostInfo, now time.Time) (time.Duration, bool) {
	if !cm.dropInactive.Load() {
		// We aren't configured to drop inactive tunnels
		return 0, false
	}

	inactiveDuration := now.Sub(hostinfo.lastUsed)
	if inactiveDuration < cm.getInactivityTimeout() {
		// It's not considered inactive
		return inactiveDuration, false
	}

	// The tunnel is inactive
	return inactiveDuration, true
}

func (cm *connectionManager) shouldSwapPrimary(current, primary *HostInfo) bool {
	// The primary tunnel is the most recent handshake to complete locally and should work entirely fine.
	// If we are here then we have multiple tunnels for a host pair and neither side believes the same tunnel is primary.
	// Let's sort this out.

	// Only one side should swap because if both swap then we may never resolve to a single tunnel.
	// vpn addr is static across all tunnels for this host pair so let's
	// use that to determine if we should consider swapping.
	if current.vpnAddrs[0].Compare(cm.intf.myVpnAddrs[0]) < 0 {
		// Their primary vpn addr is less than mine. Do not swap.
		return false
	}

	crt := cm.intf.pki.getCertState().getCertificate(current.ConnectionState.myCert.Version())
	// If this tunnel is using the latest certificate then we should swap it to primary for a bit and see if things
	// settle down.
	return bytes.Equal(current.ConnectionState.myCert.Signature(), crt.Signature())
}
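
// swapPrimary promotes current to be the primary hostinfo for its vpn addrs,
// re-checking under the write lock that the primary has not already changed.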
func (cm *connectionManager) swapPrimary(current, primary *HostInfo) {
	cm.hostMap.Lock()
	// Make sure the primary is still the same after the write lock. This avoids a race with a rehandshake.
	if cm.hostMap.Hosts[current.vpnAddrs[0]] == primary {
		cm.hostMap.unlockedMakePrimary(current)
	}
	cm.hostMap.Unlock()
}

// isInvalidCertificate will check if we should destroy a tunnel if pki.disconnect_invalid is true and
// the certificate is no longer valid. Block listed certificates will skip the pki.disconnect_invalid
// check and return true.
func (cm *connectionManager) isInvalidCertificate(now time.Time, hostinfo *HostInfo) bool {
	remoteCert := hostinfo.GetCert()
	if remoteCert == nil {
		return false
	}

	caPool := cm.intf.pki.GetCAPool()
	err := caPool.VerifyCachedCertificate(now, remoteCert)
	if err == nil {
		return false
	}

	if !cm.intf.disconnectInvalid.Load() && err != cert.ErrBlockListed {
		// Block listed certificates should always be disconnected
		return false
	}

	hostinfo.logger(cm.l).WithError(err).
		WithField("fingerprint", remoteCert.Fingerprint).
		Info("Remote certificate is no longer valid, tearing down the tunnel")

	return true
}
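
// sendPunch sends a single-byte punch packet to keep NAT mappings warm: to
// every known remote when punchy is configured to target everything,
// otherwise only to the current remote. Lighthouses are never punched.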
func (cm *connectionManager) sendPunch(hostinfo *HostInfo) {
	if !cm.punchy.GetPunch() {
		// Punching is disabled
		return
	}

	if cm.intf.lightHouse.IsAnyLighthouseAddr(hostinfo.vpnAddrs) {
		// Do not punch to lighthouses, we assume our lighthouse update interval is good enough.
		// In the event the update interval is not sufficient to maintain NAT state then a publicly available lighthouse
		// would lose the ability to notify us and punchy.respond would become unreliable.
		return
	}

	if cm.punchy.GetTargetEverything() {
		hostinfo.remotes.ForEach(cm.hostMap.GetPreferredRanges(), func(addr netip.AddrPort, preferred bool) {
			cm.metricsTxPunchy.Inc(1)
			cm.intf.outside.WriteTo([]byte{1}, addr)
		})
	} else if hostinfo.remote.IsValid() {
		cm.metricsTxPunchy.Inc(1)
		cm.intf.outside.WriteTo([]byte{1}, hostinfo.remote)
	}
}
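
// tryRehandshake starts a new handshake with the remote when the certificate
// or initiating version this tunnel was built with is no longer our current one.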
func (cm *connectionManager) tryRehandshake(hostinfo *HostInfo) {
	cs := cm.intf.pki.getCertState()
	curCrt := hostinfo.ConnectionState.myCert
	myCrt := cs.getCertificate(curCrt.Version())
	if curCrt.Version() >= cs.initiatingVersion && bytes.Equal(curCrt.Signature(), myCrt.Signature()) {
		// The current tunnel is using the latest certificate and version, no need to rehandshake.
		return
	}

	cm.l.WithField("vpnAddrs", hostinfo.vpnAddrs).
		WithField("reason", "local certificate is not current").
		Info("Re-handshaking with remote")

	cm.intf.handshakeManager.StartHandshake(hostinfo.vpnAddrs[0], nil)
}