connection_manager.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. package nebula
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/sirupsen/logrus"
  7. "github.com/slackhq/nebula/header"
  8. "github.com/slackhq/nebula/iputil"
  9. )
  10. // TODO: incount and outcount are intended as a shortcut to locking the mutexes for every single packet
  11. // and something like every 10 packets we could lock, send 10, then unlock for a moment
  12. type connectionManager struct {
  13. hostMap *HostMap
  14. in map[iputil.VpnIp]struct{}
  15. inLock *sync.RWMutex
  16. out map[iputil.VpnIp]struct{}
  17. outLock *sync.RWMutex
  18. TrafficTimer *LockingTimerWheel[iputil.VpnIp]
  19. intf *Interface
  20. pendingDeletion map[iputil.VpnIp]int
  21. pendingDeletionLock *sync.RWMutex
  22. pendingDeletionTimer *LockingTimerWheel[iputil.VpnIp]
  23. checkInterval int
  24. pendingDeletionInterval int
  25. l *logrus.Logger
  26. // I wanted to call one matLock
  27. }
  28. func newConnectionManager(ctx context.Context, l *logrus.Logger, intf *Interface, checkInterval, pendingDeletionInterval int) *connectionManager {
  29. nc := &connectionManager{
  30. hostMap: intf.hostMap,
  31. in: make(map[iputil.VpnIp]struct{}),
  32. inLock: &sync.RWMutex{},
  33. out: make(map[iputil.VpnIp]struct{}),
  34. outLock: &sync.RWMutex{},
  35. TrafficTimer: NewLockingTimerWheel[iputil.VpnIp](time.Millisecond*500, time.Second*60),
  36. intf: intf,
  37. pendingDeletion: make(map[iputil.VpnIp]int),
  38. pendingDeletionLock: &sync.RWMutex{},
  39. pendingDeletionTimer: NewLockingTimerWheel[iputil.VpnIp](time.Millisecond*500, time.Second*60),
  40. checkInterval: checkInterval,
  41. pendingDeletionInterval: pendingDeletionInterval,
  42. l: l,
  43. }
  44. nc.Start(ctx)
  45. return nc
  46. }
  47. func (n *connectionManager) In(ip iputil.VpnIp) {
  48. n.inLock.RLock()
  49. // If this already exists, return
  50. if _, ok := n.in[ip]; ok {
  51. n.inLock.RUnlock()
  52. return
  53. }
  54. n.inLock.RUnlock()
  55. n.inLock.Lock()
  56. n.in[ip] = struct{}{}
  57. n.inLock.Unlock()
  58. }
  59. func (n *connectionManager) Out(ip iputil.VpnIp) {
  60. n.outLock.RLock()
  61. // If this already exists, return
  62. if _, ok := n.out[ip]; ok {
  63. n.outLock.RUnlock()
  64. return
  65. }
  66. n.outLock.RUnlock()
  67. n.outLock.Lock()
  68. // double check since we dropped the lock temporarily
  69. if _, ok := n.out[ip]; ok {
  70. n.outLock.Unlock()
  71. return
  72. }
  73. n.out[ip] = struct{}{}
  74. n.AddTrafficWatch(ip, n.checkInterval)
  75. n.outLock.Unlock()
  76. }
  77. func (n *connectionManager) CheckIn(vpnIp iputil.VpnIp) bool {
  78. n.inLock.RLock()
  79. if _, ok := n.in[vpnIp]; ok {
  80. n.inLock.RUnlock()
  81. return true
  82. }
  83. n.inLock.RUnlock()
  84. return false
  85. }
  86. func (n *connectionManager) ClearIP(ip iputil.VpnIp) {
  87. n.inLock.Lock()
  88. n.outLock.Lock()
  89. delete(n.in, ip)
  90. delete(n.out, ip)
  91. n.inLock.Unlock()
  92. n.outLock.Unlock()
  93. }
  94. func (n *connectionManager) ClearPendingDeletion(ip iputil.VpnIp) {
  95. n.pendingDeletionLock.Lock()
  96. delete(n.pendingDeletion, ip)
  97. n.pendingDeletionLock.Unlock()
  98. }
  99. func (n *connectionManager) AddPendingDeletion(ip iputil.VpnIp) {
  100. n.pendingDeletionLock.Lock()
  101. if _, ok := n.pendingDeletion[ip]; ok {
  102. n.pendingDeletion[ip] += 1
  103. } else {
  104. n.pendingDeletion[ip] = 0
  105. }
  106. n.pendingDeletionTimer.Add(ip, time.Second*time.Duration(n.pendingDeletionInterval))
  107. n.pendingDeletionLock.Unlock()
  108. }
  109. func (n *connectionManager) checkPendingDeletion(ip iputil.VpnIp) bool {
  110. n.pendingDeletionLock.RLock()
  111. if _, ok := n.pendingDeletion[ip]; ok {
  112. n.pendingDeletionLock.RUnlock()
  113. return true
  114. }
  115. n.pendingDeletionLock.RUnlock()
  116. return false
  117. }
  118. func (n *connectionManager) AddTrafficWatch(vpnIp iputil.VpnIp, seconds int) {
  119. n.TrafficTimer.Add(vpnIp, time.Second*time.Duration(seconds))
  120. }
  121. func (n *connectionManager) Start(ctx context.Context) {
  122. go n.Run(ctx)
  123. }
  124. func (n *connectionManager) Run(ctx context.Context) {
  125. clockSource := time.NewTicker(500 * time.Millisecond)
  126. defer clockSource.Stop()
  127. p := []byte("")
  128. nb := make([]byte, 12, 12)
  129. out := make([]byte, mtu)
  130. for {
  131. select {
  132. case <-ctx.Done():
  133. return
  134. case now := <-clockSource.C:
  135. n.HandleMonitorTick(now, p, nb, out)
  136. n.HandleDeletionTick(now)
  137. }
  138. }
  139. }
  140. func (n *connectionManager) HandleMonitorTick(now time.Time, p, nb, out []byte) {
  141. n.TrafficTimer.Advance(now)
  142. for {
  143. vpnIp, has := n.TrafficTimer.Purge()
  144. if !has {
  145. break
  146. }
  147. // Check for traffic coming back in from this host.
  148. traf := n.CheckIn(vpnIp)
  149. hostinfo, err := n.hostMap.QueryVpnIp(vpnIp)
  150. if err != nil {
  151. n.l.Debugf("Not found in hostmap: %s", vpnIp)
  152. n.ClearIP(vpnIp)
  153. n.ClearPendingDeletion(vpnIp)
  154. continue
  155. }
  156. if n.handleInvalidCertificate(now, vpnIp, hostinfo) {
  157. continue
  158. }
  159. // If we saw an incoming packets from this ip and peer's certificate is not
  160. // expired, just ignore.
  161. if traf {
  162. if n.l.Level >= logrus.DebugLevel {
  163. n.l.WithField("vpnIp", vpnIp).
  164. WithField("tunnelCheck", m{"state": "alive", "method": "passive"}).
  165. Debug("Tunnel status")
  166. }
  167. n.ClearIP(vpnIp)
  168. n.ClearPendingDeletion(vpnIp)
  169. continue
  170. }
  171. hostinfo.logger(n.l).
  172. WithField("tunnelCheck", m{"state": "testing", "method": "active"}).
  173. Debug("Tunnel status")
  174. if hostinfo != nil && hostinfo.ConnectionState != nil {
  175. // Send a test packet to trigger an authenticated tunnel test, this should suss out any lingering tunnel issues
  176. n.intf.SendMessageToVpnIp(header.Test, header.TestRequest, vpnIp, p, nb, out)
  177. } else {
  178. hostinfo.logger(n.l).Debugf("Hostinfo sadness: %s", vpnIp)
  179. }
  180. n.AddPendingDeletion(vpnIp)
  181. }
  182. }
  183. func (n *connectionManager) HandleDeletionTick(now time.Time) {
  184. n.pendingDeletionTimer.Advance(now)
  185. for {
  186. vpnIp, has := n.pendingDeletionTimer.Purge()
  187. if !has {
  188. break
  189. }
  190. hostinfo, err := n.hostMap.QueryVpnIp(vpnIp)
  191. if err != nil {
  192. n.l.Debugf("Not found in hostmap: %s", vpnIp)
  193. n.ClearIP(vpnIp)
  194. n.ClearPendingDeletion(vpnIp)
  195. continue
  196. }
  197. if n.handleInvalidCertificate(now, vpnIp, hostinfo) {
  198. continue
  199. }
  200. // If we saw an incoming packets from this ip and peer's certificate is not
  201. // expired, just ignore.
  202. traf := n.CheckIn(vpnIp)
  203. if traf {
  204. n.l.WithField("vpnIp", vpnIp).
  205. WithField("tunnelCheck", m{"state": "alive", "method": "active"}).
  206. Debug("Tunnel status")
  207. n.ClearIP(vpnIp)
  208. n.ClearPendingDeletion(vpnIp)
  209. continue
  210. }
  211. // If it comes around on deletion wheel and hasn't resolved itself, delete
  212. if n.checkPendingDeletion(vpnIp) {
  213. cn := ""
  214. if hostinfo.ConnectionState != nil && hostinfo.ConnectionState.peerCert != nil {
  215. cn = hostinfo.ConnectionState.peerCert.Details.Name
  216. }
  217. hostinfo.logger(n.l).
  218. WithField("tunnelCheck", m{"state": "dead", "method": "active"}).
  219. WithField("certName", cn).
  220. Info("Tunnel status")
  221. n.ClearIP(vpnIp)
  222. n.ClearPendingDeletion(vpnIp)
  223. // TODO: This is only here to let tests work. Should do proper mocking
  224. if n.intf.lightHouse != nil {
  225. n.intf.lightHouse.DeleteVpnIp(vpnIp)
  226. }
  227. n.hostMap.DeleteHostInfo(hostinfo)
  228. } else {
  229. n.ClearIP(vpnIp)
  230. n.ClearPendingDeletion(vpnIp)
  231. }
  232. }
  233. }
  234. // handleInvalidCertificates will destroy a tunnel if pki.disconnect_invalid is true and the certificate is no longer valid
  235. func (n *connectionManager) handleInvalidCertificate(now time.Time, vpnIp iputil.VpnIp, hostinfo *HostInfo) bool {
  236. if !n.intf.disconnectInvalid {
  237. return false
  238. }
  239. remoteCert := hostinfo.GetCert()
  240. if remoteCert == nil {
  241. return false
  242. }
  243. valid, err := remoteCert.Verify(now, n.intf.caPool)
  244. if valid {
  245. return false
  246. }
  247. fingerprint, _ := remoteCert.Sha256Sum()
  248. n.l.WithField("vpnIp", vpnIp).WithError(err).
  249. WithField("certName", remoteCert.Details.Name).
  250. WithField("fingerprint", fingerprint).
  251. Info("Remote certificate is no longer valid, tearing down the tunnel")
  252. // Inform the remote and close the tunnel locally
  253. n.intf.sendCloseTunnel(hostinfo)
  254. n.intf.closeTunnel(hostinfo)
  255. n.ClearIP(vpnIp)
  256. n.ClearPendingDeletion(vpnIp)
  257. return true
  258. }