2
0

connection_manager.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. package nebula
  2. import (
  3. "github.com/sirupsen/logrus"
  4. "sync"
  5. "time"
  6. )
  7. // TODO: incount and outcount are intended as a shortcut to locking the mutexes for every single packet
  8. // and something like every 10 packets we could lock, send 10, then unlock for a moment
  9. type connectionManager struct {
  10. hostMap *HostMap
  11. in map[uint32]struct{}
  12. inLock *sync.RWMutex
  13. inCount int
  14. out map[uint32]struct{}
  15. outLock *sync.RWMutex
  16. outCount int
  17. TrafficTimer *SystemTimerWheel
  18. intf *Interface
  19. pendingDeletion map[uint32]int
  20. pendingDeletionLock *sync.RWMutex
  21. pendingDeletionTimer *SystemTimerWheel
  22. checkInterval int
  23. pendingDeletionInterval int
  24. // I wanted to call one matLock
  25. }
  26. func newConnectionManager(intf *Interface, checkInterval, pendingDeletionInterval int) *connectionManager {
  27. nc := &connectionManager{
  28. hostMap: intf.hostMap,
  29. in: make(map[uint32]struct{}),
  30. inLock: &sync.RWMutex{},
  31. inCount: 0,
  32. out: make(map[uint32]struct{}),
  33. outLock: &sync.RWMutex{},
  34. outCount: 0,
  35. TrafficTimer: NewSystemTimerWheel(time.Millisecond*500, time.Second*60),
  36. intf: intf,
  37. pendingDeletion: make(map[uint32]int),
  38. pendingDeletionLock: &sync.RWMutex{},
  39. pendingDeletionTimer: NewSystemTimerWheel(time.Millisecond*500, time.Second*60),
  40. checkInterval: checkInterval,
  41. pendingDeletionInterval: pendingDeletionInterval,
  42. }
  43. nc.Start()
  44. return nc
  45. }
  46. func (n *connectionManager) In(ip uint32) {
  47. n.inLock.RLock()
  48. // If this already exists, return
  49. if _, ok := n.in[ip]; ok {
  50. n.inLock.RUnlock()
  51. return
  52. }
  53. n.inLock.RUnlock()
  54. n.inLock.Lock()
  55. n.in[ip] = struct{}{}
  56. n.inLock.Unlock()
  57. }
  58. func (n *connectionManager) Out(ip uint32) {
  59. n.outLock.RLock()
  60. // If this already exists, return
  61. if _, ok := n.out[ip]; ok {
  62. n.outLock.RUnlock()
  63. return
  64. }
  65. n.outLock.RUnlock()
  66. n.outLock.Lock()
  67. // double check since we dropped the lock temporarily
  68. if _, ok := n.out[ip]; ok {
  69. n.outLock.Unlock()
  70. return
  71. }
  72. n.out[ip] = struct{}{}
  73. n.AddTrafficWatch(ip, n.checkInterval)
  74. n.outLock.Unlock()
  75. }
  76. func (n *connectionManager) CheckIn(vpnIP uint32) bool {
  77. n.inLock.RLock()
  78. if _, ok := n.in[vpnIP]; ok {
  79. n.inLock.RUnlock()
  80. return true
  81. }
  82. n.inLock.RUnlock()
  83. return false
  84. }
  85. func (n *connectionManager) ClearIP(ip uint32) {
  86. n.inLock.Lock()
  87. n.outLock.Lock()
  88. delete(n.in, ip)
  89. delete(n.out, ip)
  90. n.inLock.Unlock()
  91. n.outLock.Unlock()
  92. }
  93. func (n *connectionManager) ClearPendingDeletion(ip uint32) {
  94. n.pendingDeletionLock.Lock()
  95. delete(n.pendingDeletion, ip)
  96. n.pendingDeletionLock.Unlock()
  97. }
  98. func (n *connectionManager) AddPendingDeletion(ip uint32) {
  99. n.pendingDeletionLock.Lock()
  100. if _, ok := n.pendingDeletion[ip]; ok {
  101. n.pendingDeletion[ip] += 1
  102. } else {
  103. n.pendingDeletion[ip] = 0
  104. }
  105. n.pendingDeletionTimer.Add(ip, time.Second*time.Duration(n.pendingDeletionInterval))
  106. n.pendingDeletionLock.Unlock()
  107. }
  108. func (n *connectionManager) checkPendingDeletion(ip uint32) bool {
  109. n.pendingDeletionLock.RLock()
  110. if _, ok := n.pendingDeletion[ip]; ok {
  111. n.pendingDeletionLock.RUnlock()
  112. return true
  113. }
  114. n.pendingDeletionLock.RUnlock()
  115. return false
  116. }
  117. func (n *connectionManager) AddTrafficWatch(vpnIP uint32, seconds int) {
  118. n.TrafficTimer.Add(vpnIP, time.Second*time.Duration(seconds))
  119. }
  120. func (n *connectionManager) Start() {
  121. go n.Run()
  122. }
  123. func (n *connectionManager) Run() {
  124. clockSource := time.Tick(500 * time.Millisecond)
  125. for now := range clockSource {
  126. n.HandleMonitorTick(now)
  127. n.HandleDeletionTick(now)
  128. }
  129. }
  130. func (n *connectionManager) HandleMonitorTick(now time.Time) {
  131. n.TrafficTimer.advance(now)
  132. for {
  133. ep := n.TrafficTimer.Purge()
  134. if ep == nil {
  135. break
  136. }
  137. vpnIP := ep.(uint32)
  138. // Check for traffic coming back in from this host.
  139. traf := n.CheckIn(vpnIP)
  140. // If we saw incoming packets from this ip, just return
  141. if traf {
  142. if l.Level >= logrus.DebugLevel {
  143. l.WithField("vpnIp", IntIp(vpnIP)).
  144. WithField("tunnelCheck", m{"state": "alive", "method": "passive"}).
  145. Debug("Tunnel status")
  146. }
  147. n.ClearIP(vpnIP)
  148. n.ClearPendingDeletion(vpnIP)
  149. continue
  150. }
  151. // If we didn't we may need to probe or destroy the conn
  152. hostinfo, err := n.hostMap.QueryVpnIP(vpnIP)
  153. if err != nil {
  154. l.Debugf("Not found in hostmap: %s", IntIp(vpnIP))
  155. n.ClearIP(vpnIP)
  156. n.ClearPendingDeletion(vpnIP)
  157. continue
  158. }
  159. l.WithField("vpnIp", IntIp(vpnIP)).
  160. WithField("tunnelCheck", m{"state": "testing", "method": "active"}).
  161. Debug("Tunnel status")
  162. if hostinfo != nil && hostinfo.ConnectionState != nil {
  163. // Send a test packet to trigger an authenticated tunnel test, this should suss out any lingering tunnel issues
  164. n.intf.SendMessageToVpnIp(test, testRequest, vpnIP, []byte(""), make([]byte, 12, 12), make([]byte, mtu))
  165. } else {
  166. l.Debugf("Hostinfo sadness: %s", IntIp(vpnIP))
  167. }
  168. n.AddPendingDeletion(vpnIP)
  169. }
  170. }
  171. func (n *connectionManager) HandleDeletionTick(now time.Time) {
  172. n.pendingDeletionTimer.advance(now)
  173. for {
  174. ep := n.pendingDeletionTimer.Purge()
  175. if ep == nil {
  176. break
  177. }
  178. vpnIP := ep.(uint32)
  179. // If we saw incoming packets from this ip, just return
  180. traf := n.CheckIn(vpnIP)
  181. if traf {
  182. l.WithField("vpnIp", IntIp(vpnIP)).
  183. WithField("tunnelCheck", m{"state": "alive", "method": "active"}).
  184. Debug("Tunnel status")
  185. n.ClearIP(vpnIP)
  186. n.ClearPendingDeletion(vpnIP)
  187. continue
  188. }
  189. hostinfo, err := n.hostMap.QueryVpnIP(vpnIP)
  190. if err != nil {
  191. n.ClearIP(vpnIP)
  192. n.ClearPendingDeletion(vpnIP)
  193. l.Debugf("Not found in hostmap: %s", IntIp(vpnIP))
  194. continue
  195. }
  196. // If it comes around on deletion wheel and hasn't resolved itself, delete
  197. if n.checkPendingDeletion(vpnIP) {
  198. cn := ""
  199. if hostinfo.ConnectionState != nil && hostinfo.ConnectionState.peerCert != nil {
  200. cn = hostinfo.ConnectionState.peerCert.Details.Name
  201. }
  202. l.WithField("vpnIp", IntIp(vpnIP)).
  203. WithField("tunnelCheck", m{"state": "dead", "method": "active"}).
  204. WithField("certName", cn).
  205. Info("Tunnel status")
  206. n.ClearIP(vpnIP)
  207. n.ClearPendingDeletion(vpnIP)
  208. // TODO: This is only here to let tests work. Should do proper mocking
  209. if n.intf.lightHouse != nil {
  210. n.intf.lightHouse.DeleteVpnIP(vpnIP)
  211. }
  212. n.hostMap.DeleteVpnIP(vpnIP)
  213. n.hostMap.DeleteIndex(hostinfo.localIndexId)
  214. } else {
  215. n.ClearIP(vpnIP)
  216. n.ClearPendingDeletion(vpnIP)
  217. }
  218. }
  219. }