connection_manager.go

package nebula

import (
	"bytes"
	"context"
	"encoding/binary"
	"fmt"
	"net/netip"
	"sync"
	"sync/atomic"
	"time"

	"github.com/rcrowley/go-metrics"
	"github.com/sirupsen/logrus"
	"github.com/slackhq/nebula/cert"
	"github.com/slackhq/nebula/config"
	"github.com/slackhq/nebula/header"
)
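// trafficDecision is the action chosen by makeTrafficDecision for a tunnel on
// each traffic-timer tick; doTrafficCheck carries the decision out.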
type trafficDecision int

const (
	doNothing      trafficDecision = 0
	deleteTunnel   trafficDecision = 1 // delete the hostinfo on our side, do not notify the remote
	closeTunnel    trafficDecision = 2 // delete the hostinfo and notify the remote
	swapPrimary    trafficDecision = 3
	migrateRelays  trafficDecision = 4
	tryRehandshake trafficDecision = 5
	sendTestPacket trafficDecision = 6
)
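// connectionManager watches every established tunnel for traffic, sends test
// packets and NAT punches when needed, and tears down tunnels that are dead,
// inactive, or carrying an invalid certificate.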
type connectionManager struct {
	// relayUsed holds which relay local indexes are in use
	relayUsed     map[uint32]struct{}
	relayUsedLock *sync.RWMutex

	hostMap      *HostMap
	trafficTimer *LockingTimerWheel[uint32]
	intf         *Interface
	punchy       *Punchy

	// Configuration settings
	checkInterval           time.Duration
	pendingDeletionInterval time.Duration
	inactivityTimeout       atomic.Int64
	dropInactive            atomic.Bool

	metricsTxPunchy metrics.Counter
	l               *logrus.Logger
}
func newConnectionManagerFromConfig(l *logrus.Logger, c *config.C, hm *HostMap, p *Punchy) *connectionManager {
	cm := &connectionManager{
		hostMap:         hm,
		l:               l,
		punchy:          p,
		relayUsed:       make(map[uint32]struct{}),
		relayUsedLock:   &sync.RWMutex{},
		metricsTxPunchy: metrics.GetOrRegisterCounter("messages.tx.punchy", nil),
	}

	cm.reload(c, true)
	c.RegisterReloadCallback(func(c *config.C) {
		cm.reload(c, false)
	})

	return cm
}
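// reload applies timer settings from config. The tick intervals and the timer wheel are only
// set up on the initial load; tunnels.inactivity_timeout and tunnels.drop_inactive are also
// applied on live reloads. For reference, the keys read below map to configuration roughly
// like this sketch (values shown are the defaults used when a key is absent):
//
//	timers:
//	  connection_alive_interval: 5   # seconds, applied only at startup
//	  pending_deletion_interval: 10  # seconds, applied only at startup
//	tunnels:
//	  inactivity_timeout: 10m        # reloadable
//	  drop_inactive: false           # reloadable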
func (cm *connectionManager) reload(c *config.C, initial bool) {
	if initial {
		cm.checkInterval = time.Duration(c.GetInt("timers.connection_alive_interval", 5)) * time.Second
		cm.pendingDeletionInterval = time.Duration(c.GetInt("timers.pending_deletion_interval", 10)) * time.Second

		// We want at least a minimum resolution of 500ms per tick so that we can hit these intervals
		// pretty close to their configured duration.
		// The inactivity duration is checked each time a hostinfo ticks through so we don't need the wheel to contain it.
		minDuration := min(time.Millisecond*500, cm.checkInterval, cm.pendingDeletionInterval)
		maxDuration := max(cm.checkInterval, cm.pendingDeletionInterval)
		cm.trafficTimer = NewLockingTimerWheel[uint32](minDuration, maxDuration)
	}

	if initial || c.HasChanged("tunnels.inactivity_timeout") {
		old := cm.getInactivityTimeout()
		cm.inactivityTimeout.Store((int64)(c.GetDuration("tunnels.inactivity_timeout", 10*time.Minute)))
		if !initial {
			cm.l.WithField("oldDuration", old).
				WithField("newDuration", cm.getInactivityTimeout()).
				Info("Inactivity timeout has changed")
		}
	}

	if initial || c.HasChanged("tunnels.drop_inactive") {
		old := cm.dropInactive.Load()
		cm.dropInactive.Store(c.GetBool("tunnels.drop_inactive", false))
		if !initial {
			cm.l.WithField("oldBool", old).
				WithField("newBool", cm.dropInactive.Load()).
				Info("Drop inactive setting has changed")
		}
	}
}
func (cm *connectionManager) getInactivityTimeout() time.Duration {
	return (time.Duration)(cm.inactivityTimeout.Load())
}
// In marks that inbound traffic was seen on this hostinfo since the last traffic check.
func (cm *connectionManager) In(h *HostInfo) {
	h.in.Store(true)
}

// Out marks that outbound traffic was seen on this hostinfo since the last traffic check.
func (cm *connectionManager) Out(h *HostInfo) {
	h.out.Store(true)
}
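// RelayUsed records that the relay identified by this local index has carried traffic,
// so it will be migrated if the underlying tunnel is ever replaced.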
func (cm *connectionManager) RelayUsed(localIndex uint32) {
	cm.relayUsedLock.RLock()
	// If this already exists, return
	if _, ok := cm.relayUsed[localIndex]; ok {
		cm.relayUsedLock.RUnlock()
		return
	}
	cm.relayUsedLock.RUnlock()

	cm.relayUsedLock.Lock()
	cm.relayUsed[localIndex] = struct{}{}
	cm.relayUsedLock.Unlock()
}
// getAndResetTrafficCheck reports whether there was any inbound or outbound traffic
// since the last tick and resets that state for this hostinfo.
func (cm *connectionManager) getAndResetTrafficCheck(h *HostInfo, now time.Time) (bool, bool) {
	in := h.in.Swap(false)
	out := h.out.Swap(false)
	if in || out {
		h.lastUsed = now
	}
	return in, out
}
// AddTrafficWatch must be called for every new HostInfo.
// We will continue to monitor the HostInfo until the tunnel is dropped.
func (cm *connectionManager) AddTrafficWatch(h *HostInfo) {
	if h.out.Swap(true) == false {
		cm.trafficTimer.Add(h.localIndexId, cm.checkInterval)
	}
}
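// Start runs the traffic-check loop until ctx is cancelled, advancing the timer
// wheel once per tick and running doTrafficCheck for every expired local index.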
func (cm *connectionManager) Start(ctx context.Context) {
	clockSource := time.NewTicker(cm.trafficTimer.t.tickDuration)
	defer clockSource.Stop()

	p := []byte("")
	nb := make([]byte, 12, 12)
	out := make([]byte, mtu)

	for {
		select {
		case <-ctx.Done():
			return
		case now := <-clockSource.C:
			cm.trafficTimer.Advance(now)
			for {
				localIndex, has := cm.trafficTimer.Purge()
				if !has {
					break
				}
				cm.doTrafficCheck(localIndex, p, nb, out, now)
			}
		}
	}
}
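// doTrafficCheck asks makeTrafficDecision what to do with this tunnel and then
// performs that action: deleting or closing the tunnel, swapping the primary,
// migrating relays, rehandshaking, or sending a test packet.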
func (cm *connectionManager) doTrafficCheck(localIndex uint32, p, nb, out []byte, now time.Time) {
	decision, hostinfo, primary := cm.makeTrafficDecision(localIndex, now)

	switch decision {
	case deleteTunnel:
		if cm.hostMap.DeleteHostInfo(hostinfo) {
			// Only clear the lighthouse cache if this was the last hostinfo for this vpn ip in the hostmap
			cm.intf.lightHouse.DeleteVpnIp(hostinfo.vpnIp)
		}

	case closeTunnel:
		cm.intf.sendCloseTunnel(hostinfo)
		cm.intf.closeTunnel(hostinfo)

	case swapPrimary:
		cm.swapPrimary(hostinfo, primary)

	case migrateRelays:
		cm.migrateRelayUsed(hostinfo, primary)

	case tryRehandshake:
		cm.tryRehandshake(hostinfo)

	case sendTestPacket:
		cm.intf.SendMessageToHostInfo(header.Test, header.TestRequest, hostinfo, p, nb, out)
	}

	cm.resetRelayTrafficCheck(hostinfo)
}
func (cm *connectionManager) resetRelayTrafficCheck(hostinfo *HostInfo) {
	if hostinfo != nil {
		cm.relayUsedLock.Lock()
		defer cm.relayUsedLock.Unlock()
		// No need to migrate any relays, delete usage info now.
		for _, idx := range hostinfo.relayState.CopyRelayForIdxs() {
			delete(cm.relayUsed, idx)
		}
	}
}
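// migrateRelayUsed re-establishes, on the new primary hostinfo, any relays that were in
// use on the old hostinfo by sending a CreateRelayRequest for each of them.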
func (cm *connectionManager) migrateRelayUsed(oldhostinfo, newhostinfo *HostInfo) {
	relayFor := oldhostinfo.relayState.CopyAllRelayFor()

	for _, r := range relayFor {
		existing, ok := newhostinfo.relayState.QueryRelayForByIp(r.PeerIp)

		var index uint32
		var relayFrom netip.Addr
		var relayTo netip.Addr
		switch {
		case ok:
			switch existing.State {
			case Established, PeerRequested, Disestablished:
				// This relay already exists in newhostinfo; do nothing.
				continue
			case Requested:
				// The relayed connection exists in a Requested state; re-send the request
				index = existing.LocalIndex
				switch r.Type {
				case TerminalType:
					relayFrom = cm.intf.myVpnNet.Addr()
					relayTo = existing.PeerIp
				case ForwardingType:
					relayFrom = existing.PeerIp
					relayTo = newhostinfo.vpnIp
				default:
					// should never happen
					panic(fmt.Sprintf("Migrating unknown relay type: %v", r.Type))
				}
			}
		case !ok:
			cm.relayUsedLock.RLock()
			if _, relayUsed := cm.relayUsed[r.LocalIndex]; !relayUsed {
				// The relay hasn't been used; don't migrate it.
				cm.relayUsedLock.RUnlock()
				continue
			}
			cm.relayUsedLock.RUnlock()
			// The relay doesn't exist at all; create some relay state and send the request.
			var err error
			index, err = AddRelay(cm.l, newhostinfo, cm.hostMap, r.PeerIp, nil, r.Type, Requested)
			if err != nil {
				cm.l.WithError(err).Error("failed to migrate relay to new hostinfo")
				continue
			}
			switch r.Type {
			case TerminalType:
				relayFrom = cm.intf.myVpnNet.Addr()
				relayTo = r.PeerIp
			case ForwardingType:
				relayFrom = r.PeerIp
				relayTo = newhostinfo.vpnIp
			default:
				// should never happen
				panic(fmt.Sprintf("Migrating unknown relay type: %v", r.Type))
			}
		}

		// TODO: IPV6-WORK
		relayFromB := relayFrom.As4()
		relayToB := relayTo.As4()

		// Send a CreateRelayRequest to the peer.
		req := NebulaControl{
			Type:                NebulaControl_CreateRelayRequest,
			InitiatorRelayIndex: index,
			RelayFromIp:         binary.BigEndian.Uint32(relayFromB[:]),
			RelayToIp:           binary.BigEndian.Uint32(relayToB[:]),
		}
		msg, err := req.Marshal()
		if err != nil {
			cm.l.WithError(err).Error("failed to marshal Control message to migrate relay")
		} else {
			cm.intf.SendMessageToHostInfo(header.Control, 0, newhostinfo, msg, make([]byte, 12), make([]byte, mtu))
			cm.l.WithFields(logrus.Fields{
				"relayFrom":           req.RelayFromIp,
				"relayTo":             req.RelayToIp,
				"initiatorRelayIndex": req.InitiatorRelayIndex,
				"responderRelayIndex": req.ResponderRelayIndex,
				"vpnIp":               newhostinfo.vpnIp}).
				Info("send CreateRelayRequest")
		}
	}
}
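// makeTrafficDecision inspects the traffic counters for a single local index and decides
// what doTrafficCheck should do next. Inbound traffic keeps a tunnel alive; a tunnel with
// no inbound traffic is probed with a test packet and deleted if it is still silent when
// the pending deletion interval expires.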
func (cm *connectionManager) makeTrafficDecision(localIndex uint32, now time.Time) (trafficDecision, *HostInfo, *HostInfo) {
	// Read lock the main hostmap to order decisions based on tunnels being the primary tunnel
	cm.hostMap.RLock()
	defer cm.hostMap.RUnlock()

	hostinfo := cm.hostMap.Indexes[localIndex]
	if hostinfo == nil {
		cm.l.WithField("localIndex", localIndex).Debugln("Not found in hostmap")
		return doNothing, nil, nil
	}

	if cm.isInvalidCertificate(now, hostinfo) {
		return closeTunnel, hostinfo, nil
	}

	primary := cm.hostMap.Hosts[hostinfo.vpnIp]
	mainHostInfo := true
	if primary != nil && primary != hostinfo {
		mainHostInfo = false
	}

	// Check for traffic on this hostinfo
	inTraffic, outTraffic := cm.getAndResetTrafficCheck(hostinfo, now)

	// A hostinfo is considered alive if there is incoming traffic
	if inTraffic {
		decision := doNothing
		if cm.l.Level >= logrus.DebugLevel {
			hostinfo.logger(cm.l).
				WithField("tunnelCheck", m{"state": "alive", "method": "passive"}).
				Debug("Tunnel status")
		}
		hostinfo.pendingDeletion.Store(false)

		if mainHostInfo {
			decision = tryRehandshake
		} else {
			if cm.shouldSwapPrimary(hostinfo, primary) {
				decision = swapPrimary
			} else {
				// migrate the relays to the primary, if in use.
				decision = migrateRelays
			}
		}

		cm.trafficTimer.Add(hostinfo.localIndexId, cm.checkInterval)

		if !outTraffic {
			// Send a punch packet to keep the NAT state alive
			cm.sendPunch(hostinfo)
		}

		return decision, hostinfo, primary
	}

	if hostinfo.pendingDeletion.Load() {
		// We have already sent a test packet and nothing was returned; this hostinfo is dead
		hostinfo.logger(cm.l).
			WithField("tunnelCheck", m{"state": "dead", "method": "active"}).
			Info("Tunnel status")

		return deleteTunnel, hostinfo, nil
	}

	decision := doNothing
	if hostinfo != nil && hostinfo.ConnectionState != nil && mainHostInfo {
		if !outTraffic {
			inactiveFor, isInactive := cm.isInactive(hostinfo, now)
			if isInactive {
				// Tunnel is inactive, tear it down
				hostinfo.logger(cm.l).
					WithField("inactiveDuration", inactiveFor).
					WithField("primary", mainHostInfo).
					Info("Dropping tunnel due to inactivity")

				return closeTunnel, hostinfo, primary
			}

			// If we aren't sending or receiving traffic then it's an unused tunnel and we don't need to test it.
			// Just maintain NAT state if configured to do so.
			cm.sendPunch(hostinfo)
			cm.trafficTimer.Add(hostinfo.localIndexId, cm.checkInterval)
			return doNothing, nil, nil
		}

		if cm.punchy.GetTargetEverything() {
			// This is similar to the old punchy behavior with a slight optimization.
			// We aren't receiving traffic but we are sending it; punch on all known
			// ips in case we need to re-prime NAT state
			cm.sendPunch(hostinfo)
		}

		if cm.l.Level >= logrus.DebugLevel {
			hostinfo.logger(cm.l).
				WithField("tunnelCheck", m{"state": "testing", "method": "active"}).
				Debug("Tunnel status")
		}

		// Send a test packet to trigger an authenticated tunnel test; this should suss out any lingering tunnel issues
		decision = sendTestPacket
	} else {
		if cm.l.Level >= logrus.DebugLevel {
			hostinfo.logger(cm.l).Debugf("Hostinfo sadness")
		}
	}

	hostinfo.pendingDeletion.Store(true)
	cm.trafficTimer.Add(hostinfo.localIndexId, cm.pendingDeletionInterval)
	return decision, hostinfo, nil
}
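// isInactive returns how long the tunnel has gone unused and whether that exceeds the
// configured inactivity timeout; it always reports false when tunnels.drop_inactive is disabled.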
func (cm *connectionManager) isInactive(hostinfo *HostInfo, now time.Time) (time.Duration, bool) {
	if cm.dropInactive.Load() == false {
		// We aren't configured to drop inactive tunnels
		return 0, false
	}

	inactiveDuration := now.Sub(hostinfo.lastUsed)
	if inactiveDuration < cm.getInactivityTimeout() {
		// It's not considered inactive
		return inactiveDuration, false
	}

	// The tunnel is inactive
	return inactiveDuration, true
}
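// shouldSwapPrimary decides whether this non-primary tunnel should become the primary for
// its vpn ip; only the side whose own vpn ip is the lower of the pair flips, and only onto
// a tunnel that was built with our current certificate.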
func (cm *connectionManager) shouldSwapPrimary(current, primary *HostInfo) bool {
	// The primary tunnel is the most recent handshake to complete locally and should work entirely fine.
	// If we are here then we have multiple tunnels for a host pair and neither side believes the same tunnel is primary.
	// Let's sort this out.
	if current.vpnIp.Compare(cm.intf.myVpnNet.Addr()) < 0 {
		// Only one side should flip primary because if both flip then we may never resolve to a single tunnel.
		// vpn ip is static across all tunnels for this host pair so let's use that to determine who is flipping.
		// The remote's vpn ip is lower than mine. I will not flip.
		return false
	}

	certState := cm.intf.pki.GetCertState()
	return bytes.Equal(current.ConnectionState.myCert.Signature, certState.Certificate.Signature)
}
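// swapPrimary promotes the given hostinfo to primary, re-checking under the write lock
// that the primary has not changed in the meantime.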
func (cm *connectionManager) swapPrimary(current, primary *HostInfo) {
	cm.hostMap.Lock()
	// Make sure the primary is still the same after the write lock. This avoids a race with a rehandshake.
	if cm.hostMap.Hosts[current.vpnIp] == primary {
		cm.hostMap.unlockedMakePrimary(current)
	}
	cm.hostMap.Unlock()
}
// isInvalidCertificate reports whether the tunnel should be torn down because the remote
// certificate is no longer valid and pki.disconnect_invalid is true. Blocklisted
// certificates skip the pki.disconnect_invalid check and are always torn down.
func (cm *connectionManager) isInvalidCertificate(now time.Time, hostinfo *HostInfo) bool {
	remoteCert := hostinfo.GetCert()
	if remoteCert == nil {
		return false
	}

	valid, err := remoteCert.VerifyWithCache(now, cm.intf.pki.GetCAPool())
	if valid {
		return false
	}

	if !cm.intf.disconnectInvalid.Load() && err != cert.ErrBlockListed {
		// Block listed certificates should always be disconnected
		return false
	}

	fingerprint, _ := remoteCert.Sha256Sum()
	hostinfo.logger(cm.l).WithError(err).
		WithField("fingerprint", fingerprint).
		Info("Remote certificate is no longer valid, tearing down the tunnel")

	return true
}
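// sendPunch emits single-byte punch packets to keep NAT mappings warm, either to every
// known remote or only to the current one, depending on punchy settings. Punches are
// never sent to lighthouses.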
func (cm *connectionManager) sendPunch(hostinfo *HostInfo) {
	if !cm.punchy.GetPunch() {
		// Punching is disabled
		return
	}

	if cm.intf.lightHouse.IsLighthouseIP(hostinfo.vpnIp) {
		// Do not punch to lighthouses, we assume our lighthouse update interval is good enough.
		// In the event the update interval is not sufficient to maintain NAT state then a publicly available lighthouse
		// would lose the ability to notify us and punchy.respond would become unreliable.
		return
	}

	if cm.punchy.GetTargetEverything() {
		hostinfo.remotes.ForEach(cm.hostMap.GetPreferredRanges(), func(addr netip.AddrPort, preferred bool) {
			cm.metricsTxPunchy.Inc(1)
			cm.intf.outside.WriteTo([]byte{1}, addr)
		})
	} else if hostinfo.remote.IsValid() {
		cm.metricsTxPunchy.Inc(1)
		cm.intf.outside.WriteTo([]byte{1}, hostinfo.remote)
	}
}
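// tryRehandshake starts a new handshake with the remote when the tunnel was built with a
// certificate that is no longer our current one.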
func (cm *connectionManager) tryRehandshake(hostinfo *HostInfo) {
	certState := cm.intf.pki.GetCertState()
	if bytes.Equal(hostinfo.ConnectionState.myCert.Signature, certState.Certificate.Signature) {
		return
	}

	cm.l.WithField("vpnIp", hostinfo.vpnIp).
		WithField("reason", "local certificate is not current").
		Info("Re-handshaking with remote")

	cm.intf.handshakeManager.StartHandshake(hostinfo.vpnIp, nil)
}