connection_manager.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. package nebula
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/sirupsen/logrus"
  7. "github.com/slackhq/nebula/header"
  8. "github.com/slackhq/nebula/iputil"
  9. )
  // connectionManager watches per-vpn-ip traffic so that tunnels which stop
// answering can be tested and eventually torn down.
//
// As implemented by the methods in this file:
//   - Out records an ip in out and schedules it on TrafficTimer.
//   - In records an ip in in; CheckIn reports whether it is recorded.
//   - HandleMonitorTick drains TrafficTimer: ips with recorded inbound traffic
//     are forgotten, the rest get a test packet and are queued via
//     AddPendingDeletion on pendingDeletionTimer.
//   - HandleDeletionTick drains pendingDeletionTimer and deletes the hostinfo
//     of ips that are still pending without inbound traffic.
//
// TODO: inCount and outCount are intended as a shortcut to locking the mutexes for every single packet
// and something like every 10 packets we could lock, send 10, then unlock for a moment
type connectionManager struct {
	// hostMap is queried by vpn ip on every tick to find the tunnel's HostInfo.
	hostMap *HostMap

	// in is the set of vpn ips recorded by In and consulted by CheckIn.
	// Guarded by inLock.
	in     map[iputil.VpnIp]struct{}
	inLock *sync.RWMutex
	// inCount is only zero-initialized in this file; nothing here reads it
	// (see the TODO above).
	inCount int

	// out is the set of vpn ips recorded by Out; adding an ip also schedules a
	// traffic check for it on TrafficTimer. Guarded by outLock.
	out     map[iputil.VpnIp]struct{}
	outLock *sync.RWMutex
	// outCount is only zero-initialized in this file; nothing here reads it
	// (see the TODO above).
	outCount int

	// TrafficTimer holds vpn ips awaiting a traffic check; HandleMonitorTick
	// advances and drains it.
	TrafficTimer *SystemTimerWheel
	// intf is the owning Interface: it supplies disconnectInvalid, caPool and
	// lightHouse, and is used to send test packets and close tunnels.
	intf *Interface

	// pendingDeletion maps a vpn ip under active test to how many times
	// AddPendingDeletion re-added it (0 on the first add). Guarded by
	// pendingDeletionLock.
	pendingDeletion     map[iputil.VpnIp]int
	pendingDeletionLock *sync.RWMutex
	// pendingDeletionTimer holds vpn ips awaiting a deletion decision;
	// HandleDeletionTick advances and drains it.
	pendingDeletionTimer *SystemTimerWheel

	// checkInterval is the delay, in seconds, before an ip added by Out is
	// checked for traffic.
	checkInterval int
	// pendingDeletionInterval is the delay, in seconds, before a tested ip is
	// reconsidered by HandleDeletionTick.
	pendingDeletionInterval int

	l *logrus.Logger
}
  30. func newConnectionManager(ctx context.Context, l *logrus.Logger, intf *Interface, checkInterval, pendingDeletionInterval int) *connectionManager {
  31. nc := &connectionManager{
  32. hostMap: intf.hostMap,
  33. in: make(map[iputil.VpnIp]struct{}),
  34. inLock: &sync.RWMutex{},
  35. inCount: 0,
  36. out: make(map[iputil.VpnIp]struct{}),
  37. outLock: &sync.RWMutex{},
  38. outCount: 0,
  39. TrafficTimer: NewSystemTimerWheel(time.Millisecond*500, time.Second*60),
  40. intf: intf,
  41. pendingDeletion: make(map[iputil.VpnIp]int),
  42. pendingDeletionLock: &sync.RWMutex{},
  43. pendingDeletionTimer: NewSystemTimerWheel(time.Millisecond*500, time.Second*60),
  44. checkInterval: checkInterval,
  45. pendingDeletionInterval: pendingDeletionInterval,
  46. l: l,
  47. }
  48. nc.Start(ctx)
  49. return nc
  50. }
  51. func (n *connectionManager) In(ip iputil.VpnIp) {
  52. n.inLock.RLock()
  53. // If this already exists, return
  54. if _, ok := n.in[ip]; ok {
  55. n.inLock.RUnlock()
  56. return
  57. }
  58. n.inLock.RUnlock()
  59. n.inLock.Lock()
  60. n.in[ip] = struct{}{}
  61. n.inLock.Unlock()
  62. }
  63. func (n *connectionManager) Out(ip iputil.VpnIp) {
  64. n.outLock.RLock()
  65. // If this already exists, return
  66. if _, ok := n.out[ip]; ok {
  67. n.outLock.RUnlock()
  68. return
  69. }
  70. n.outLock.RUnlock()
  71. n.outLock.Lock()
  72. // double check since we dropped the lock temporarily
  73. if _, ok := n.out[ip]; ok {
  74. n.outLock.Unlock()
  75. return
  76. }
  77. n.out[ip] = struct{}{}
  78. n.AddTrafficWatch(ip, n.checkInterval)
  79. n.outLock.Unlock()
  80. }
  81. func (n *connectionManager) CheckIn(vpnIp iputil.VpnIp) bool {
  82. n.inLock.RLock()
  83. if _, ok := n.in[vpnIp]; ok {
  84. n.inLock.RUnlock()
  85. return true
  86. }
  87. n.inLock.RUnlock()
  88. return false
  89. }
  90. func (n *connectionManager) ClearIP(ip iputil.VpnIp) {
  91. n.inLock.Lock()
  92. n.outLock.Lock()
  93. delete(n.in, ip)
  94. delete(n.out, ip)
  95. n.inLock.Unlock()
  96. n.outLock.Unlock()
  97. }
  98. func (n *connectionManager) ClearPendingDeletion(ip iputil.VpnIp) {
  99. n.pendingDeletionLock.Lock()
  100. delete(n.pendingDeletion, ip)
  101. n.pendingDeletionLock.Unlock()
  102. }
  103. func (n *connectionManager) AddPendingDeletion(ip iputil.VpnIp) {
  104. n.pendingDeletionLock.Lock()
  105. if _, ok := n.pendingDeletion[ip]; ok {
  106. n.pendingDeletion[ip] += 1
  107. } else {
  108. n.pendingDeletion[ip] = 0
  109. }
  110. n.pendingDeletionTimer.Add(ip, time.Second*time.Duration(n.pendingDeletionInterval))
  111. n.pendingDeletionLock.Unlock()
  112. }
  113. func (n *connectionManager) checkPendingDeletion(ip iputil.VpnIp) bool {
  114. n.pendingDeletionLock.RLock()
  115. if _, ok := n.pendingDeletion[ip]; ok {
  116. n.pendingDeletionLock.RUnlock()
  117. return true
  118. }
  119. n.pendingDeletionLock.RUnlock()
  120. return false
  121. }
  122. func (n *connectionManager) AddTrafficWatch(vpnIp iputil.VpnIp, seconds int) {
  123. n.TrafficTimer.Add(vpnIp, time.Second*time.Duration(seconds))
  124. }
  125. func (n *connectionManager) Start(ctx context.Context) {
  126. go n.Run(ctx)
  127. }
  128. func (n *connectionManager) Run(ctx context.Context) {
  129. clockSource := time.NewTicker(500 * time.Millisecond)
  130. defer clockSource.Stop()
  131. p := []byte("")
  132. nb := make([]byte, 12, 12)
  133. out := make([]byte, mtu)
  134. for {
  135. select {
  136. case <-ctx.Done():
  137. return
  138. case now := <-clockSource.C:
  139. n.HandleMonitorTick(now, p, nb, out)
  140. n.HandleDeletionTick(now)
  141. }
  142. }
  143. }
  144. func (n *connectionManager) HandleMonitorTick(now time.Time, p, nb, out []byte) {
  145. n.TrafficTimer.advance(now)
  146. for {
  147. ep := n.TrafficTimer.Purge()
  148. if ep == nil {
  149. break
  150. }
  151. vpnIp := ep.(iputil.VpnIp)
  152. // Check for traffic coming back in from this host.
  153. traf := n.CheckIn(vpnIp)
  154. hostinfo, err := n.hostMap.QueryVpnIp(vpnIp)
  155. if err != nil {
  156. n.l.Debugf("Not found in hostmap: %s", vpnIp)
  157. if !n.intf.disconnectInvalid {
  158. n.ClearIP(vpnIp)
  159. n.ClearPendingDeletion(vpnIp)
  160. continue
  161. }
  162. }
  163. if n.handleInvalidCertificate(now, vpnIp, hostinfo) {
  164. continue
  165. }
  166. // If we saw an incoming packets from this ip and peer's certificate is not
  167. // expired, just ignore.
  168. if traf {
  169. if n.l.Level >= logrus.DebugLevel {
  170. n.l.WithField("vpnIp", vpnIp).
  171. WithField("tunnelCheck", m{"state": "alive", "method": "passive"}).
  172. Debug("Tunnel status")
  173. }
  174. n.ClearIP(vpnIp)
  175. n.ClearPendingDeletion(vpnIp)
  176. continue
  177. }
  178. hostinfo.logger(n.l).
  179. WithField("tunnelCheck", m{"state": "testing", "method": "active"}).
  180. Debug("Tunnel status")
  181. if hostinfo != nil && hostinfo.ConnectionState != nil {
  182. // Send a test packet to trigger an authenticated tunnel test, this should suss out any lingering tunnel issues
  183. n.intf.SendMessageToVpnIp(header.Test, header.TestRequest, vpnIp, p, nb, out)
  184. } else {
  185. hostinfo.logger(n.l).Debugf("Hostinfo sadness: %s", vpnIp)
  186. }
  187. n.AddPendingDeletion(vpnIp)
  188. }
  189. }
  190. func (n *connectionManager) HandleDeletionTick(now time.Time) {
  191. n.pendingDeletionTimer.advance(now)
  192. for {
  193. ep := n.pendingDeletionTimer.Purge()
  194. if ep == nil {
  195. break
  196. }
  197. vpnIp := ep.(iputil.VpnIp)
  198. hostinfo, err := n.hostMap.QueryVpnIp(vpnIp)
  199. if err != nil {
  200. n.l.Debugf("Not found in hostmap: %s", vpnIp)
  201. if !n.intf.disconnectInvalid {
  202. n.ClearIP(vpnIp)
  203. n.ClearPendingDeletion(vpnIp)
  204. continue
  205. }
  206. }
  207. if n.handleInvalidCertificate(now, vpnIp, hostinfo) {
  208. continue
  209. }
  210. // If we saw an incoming packets from this ip and peer's certificate is not
  211. // expired, just ignore.
  212. traf := n.CheckIn(vpnIp)
  213. if traf {
  214. n.l.WithField("vpnIp", vpnIp).
  215. WithField("tunnelCheck", m{"state": "alive", "method": "active"}).
  216. Debug("Tunnel status")
  217. n.ClearIP(vpnIp)
  218. n.ClearPendingDeletion(vpnIp)
  219. continue
  220. }
  221. // If it comes around on deletion wheel and hasn't resolved itself, delete
  222. if n.checkPendingDeletion(vpnIp) {
  223. cn := ""
  224. if hostinfo.ConnectionState != nil && hostinfo.ConnectionState.peerCert != nil {
  225. cn = hostinfo.ConnectionState.peerCert.Details.Name
  226. }
  227. hostinfo.logger(n.l).
  228. WithField("tunnelCheck", m{"state": "dead", "method": "active"}).
  229. WithField("certName", cn).
  230. Info("Tunnel status")
  231. n.ClearIP(vpnIp)
  232. n.ClearPendingDeletion(vpnIp)
  233. // TODO: This is only here to let tests work. Should do proper mocking
  234. if n.intf.lightHouse != nil {
  235. n.intf.lightHouse.DeleteVpnIp(vpnIp)
  236. }
  237. n.hostMap.DeleteHostInfo(hostinfo)
  238. } else {
  239. n.ClearIP(vpnIp)
  240. n.ClearPendingDeletion(vpnIp)
  241. }
  242. }
  243. }
  244. // handleInvalidCertificates will destroy a tunnel if pki.disconnect_invalid is true and the certificate is no longer valid
  245. func (n *connectionManager) handleInvalidCertificate(now time.Time, vpnIp iputil.VpnIp, hostinfo *HostInfo) bool {
  246. if !n.intf.disconnectInvalid {
  247. return false
  248. }
  249. remoteCert := hostinfo.GetCert()
  250. if remoteCert == nil {
  251. return false
  252. }
  253. valid, err := remoteCert.Verify(now, n.intf.caPool)
  254. if valid {
  255. return false
  256. }
  257. fingerprint, _ := remoteCert.Sha256Sum()
  258. n.l.WithField("vpnIp", vpnIp).WithError(err).
  259. WithField("certName", remoteCert.Details.Name).
  260. WithField("fingerprint", fingerprint).
  261. Info("Remote certificate is no longer valid, tearing down the tunnel")
  262. // Inform the remote and close the tunnel locally
  263. n.intf.sendCloseTunnel(hostinfo)
  264. n.intf.closeTunnel(hostinfo, false)
  265. n.ClearIP(vpnIp)
  266. n.ClearPendingDeletion(vpnIp)
  267. return true
  268. }