connection_manager.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. package nebula
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/sirupsen/logrus"
  7. )
  8. // TODO: incount and outcount are intended as a shortcut to locking the mutexes for every single packet
  9. // and something like every 10 packets we could lock, send 10, then unlock for a moment
  10. type connectionManager struct {
  11. hostMap *HostMap
  12. in map[uint32]struct{}
  13. inLock *sync.RWMutex
  14. inCount int
  15. out map[uint32]struct{}
  16. outLock *sync.RWMutex
  17. outCount int
  18. TrafficTimer *SystemTimerWheel
  19. intf *Interface
  20. pendingDeletion map[uint32]int
  21. pendingDeletionLock *sync.RWMutex
  22. pendingDeletionTimer *SystemTimerWheel
  23. checkInterval int
  24. pendingDeletionInterval int
  25. l *logrus.Logger
  26. // I wanted to call one matLock
  27. }
  28. func newConnectionManager(ctx context.Context, l *logrus.Logger, intf *Interface, checkInterval, pendingDeletionInterval int) *connectionManager {
  29. nc := &connectionManager{
  30. hostMap: intf.hostMap,
  31. in: make(map[uint32]struct{}),
  32. inLock: &sync.RWMutex{},
  33. inCount: 0,
  34. out: make(map[uint32]struct{}),
  35. outLock: &sync.RWMutex{},
  36. outCount: 0,
  37. TrafficTimer: NewSystemTimerWheel(time.Millisecond*500, time.Second*60),
  38. intf: intf,
  39. pendingDeletion: make(map[uint32]int),
  40. pendingDeletionLock: &sync.RWMutex{},
  41. pendingDeletionTimer: NewSystemTimerWheel(time.Millisecond*500, time.Second*60),
  42. checkInterval: checkInterval,
  43. pendingDeletionInterval: pendingDeletionInterval,
  44. l: l,
  45. }
  46. nc.Start(ctx)
  47. return nc
  48. }
  49. func (n *connectionManager) In(ip uint32) {
  50. n.inLock.RLock()
  51. // If this already exists, return
  52. if _, ok := n.in[ip]; ok {
  53. n.inLock.RUnlock()
  54. return
  55. }
  56. n.inLock.RUnlock()
  57. n.inLock.Lock()
  58. n.in[ip] = struct{}{}
  59. n.inLock.Unlock()
  60. }
  61. func (n *connectionManager) Out(ip uint32) {
  62. n.outLock.RLock()
  63. // If this already exists, return
  64. if _, ok := n.out[ip]; ok {
  65. n.outLock.RUnlock()
  66. return
  67. }
  68. n.outLock.RUnlock()
  69. n.outLock.Lock()
  70. // double check since we dropped the lock temporarily
  71. if _, ok := n.out[ip]; ok {
  72. n.outLock.Unlock()
  73. return
  74. }
  75. n.out[ip] = struct{}{}
  76. n.AddTrafficWatch(ip, n.checkInterval)
  77. n.outLock.Unlock()
  78. }
  79. func (n *connectionManager) CheckIn(vpnIP uint32) bool {
  80. n.inLock.RLock()
  81. if _, ok := n.in[vpnIP]; ok {
  82. n.inLock.RUnlock()
  83. return true
  84. }
  85. n.inLock.RUnlock()
  86. return false
  87. }
  88. func (n *connectionManager) ClearIP(ip uint32) {
  89. n.inLock.Lock()
  90. n.outLock.Lock()
  91. delete(n.in, ip)
  92. delete(n.out, ip)
  93. n.inLock.Unlock()
  94. n.outLock.Unlock()
  95. }
  96. func (n *connectionManager) ClearPendingDeletion(ip uint32) {
  97. n.pendingDeletionLock.Lock()
  98. delete(n.pendingDeletion, ip)
  99. n.pendingDeletionLock.Unlock()
  100. }
  101. func (n *connectionManager) AddPendingDeletion(ip uint32) {
  102. n.pendingDeletionLock.Lock()
  103. if _, ok := n.pendingDeletion[ip]; ok {
  104. n.pendingDeletion[ip] += 1
  105. } else {
  106. n.pendingDeletion[ip] = 0
  107. }
  108. n.pendingDeletionTimer.Add(ip, time.Second*time.Duration(n.pendingDeletionInterval))
  109. n.pendingDeletionLock.Unlock()
  110. }
  111. func (n *connectionManager) checkPendingDeletion(ip uint32) bool {
  112. n.pendingDeletionLock.RLock()
  113. if _, ok := n.pendingDeletion[ip]; ok {
  114. n.pendingDeletionLock.RUnlock()
  115. return true
  116. }
  117. n.pendingDeletionLock.RUnlock()
  118. return false
  119. }
  120. func (n *connectionManager) AddTrafficWatch(vpnIP uint32, seconds int) {
  121. n.TrafficTimer.Add(vpnIP, time.Second*time.Duration(seconds))
  122. }
  123. func (n *connectionManager) Start(ctx context.Context) {
  124. go n.Run(ctx)
  125. }
  126. func (n *connectionManager) Run(ctx context.Context) {
  127. clockSource := time.NewTicker(500 * time.Millisecond)
  128. defer clockSource.Stop()
  129. p := []byte("")
  130. nb := make([]byte, 12, 12)
  131. out := make([]byte, mtu)
  132. for {
  133. select {
  134. case <-ctx.Done():
  135. return
  136. case now := <-clockSource.C:
  137. n.HandleMonitorTick(now, p, nb, out)
  138. n.HandleDeletionTick(now)
  139. }
  140. }
  141. }
  142. func (n *connectionManager) HandleMonitorTick(now time.Time, p, nb, out []byte) {
  143. n.TrafficTimer.advance(now)
  144. for {
  145. ep := n.TrafficTimer.Purge()
  146. if ep == nil {
  147. break
  148. }
  149. vpnIP := ep.(uint32)
  150. // Check for traffic coming back in from this host.
  151. traf := n.CheckIn(vpnIP)
  152. hostinfo, err := n.hostMap.QueryVpnIP(vpnIP)
  153. if err != nil {
  154. n.l.Debugf("Not found in hostmap: %s", IntIp(vpnIP))
  155. if !n.intf.disconnectInvalid {
  156. n.ClearIP(vpnIP)
  157. n.ClearPendingDeletion(vpnIP)
  158. continue
  159. }
  160. }
  161. if n.handleInvalidCertificate(now, vpnIP, hostinfo) {
  162. continue
  163. }
  164. // If we saw an incoming packets from this ip and peer's certificate is not
  165. // expired, just ignore.
  166. if traf {
  167. if n.l.Level >= logrus.DebugLevel {
  168. n.l.WithField("vpnIp", IntIp(vpnIP)).
  169. WithField("tunnelCheck", m{"state": "alive", "method": "passive"}).
  170. Debug("Tunnel status")
  171. }
  172. n.ClearIP(vpnIP)
  173. n.ClearPendingDeletion(vpnIP)
  174. continue
  175. }
  176. hostinfo.logger(n.l).
  177. WithField("tunnelCheck", m{"state": "testing", "method": "active"}).
  178. Debug("Tunnel status")
  179. if hostinfo != nil && hostinfo.ConnectionState != nil {
  180. // Send a test packet to trigger an authenticated tunnel test, this should suss out any lingering tunnel issues
  181. n.intf.SendMessageToVpnIp(test, testRequest, vpnIP, p, nb, out)
  182. } else {
  183. hostinfo.logger(n.l).Debugf("Hostinfo sadness: %s", IntIp(vpnIP))
  184. }
  185. n.AddPendingDeletion(vpnIP)
  186. }
  187. }
  188. func (n *connectionManager) HandleDeletionTick(now time.Time) {
  189. n.pendingDeletionTimer.advance(now)
  190. for {
  191. ep := n.pendingDeletionTimer.Purge()
  192. if ep == nil {
  193. break
  194. }
  195. vpnIP := ep.(uint32)
  196. hostinfo, err := n.hostMap.QueryVpnIP(vpnIP)
  197. if err != nil {
  198. n.l.Debugf("Not found in hostmap: %s", IntIp(vpnIP))
  199. if !n.intf.disconnectInvalid {
  200. n.ClearIP(vpnIP)
  201. n.ClearPendingDeletion(vpnIP)
  202. continue
  203. }
  204. }
  205. if n.handleInvalidCertificate(now, vpnIP, hostinfo) {
  206. continue
  207. }
  208. // If we saw an incoming packets from this ip and peer's certificate is not
  209. // expired, just ignore.
  210. traf := n.CheckIn(vpnIP)
  211. if traf {
  212. n.l.WithField("vpnIp", IntIp(vpnIP)).
  213. WithField("tunnelCheck", m{"state": "alive", "method": "active"}).
  214. Debug("Tunnel status")
  215. n.ClearIP(vpnIP)
  216. n.ClearPendingDeletion(vpnIP)
  217. continue
  218. }
  219. // If it comes around on deletion wheel and hasn't resolved itself, delete
  220. if n.checkPendingDeletion(vpnIP) {
  221. cn := ""
  222. if hostinfo.ConnectionState != nil && hostinfo.ConnectionState.peerCert != nil {
  223. cn = hostinfo.ConnectionState.peerCert.Details.Name
  224. }
  225. hostinfo.logger(n.l).
  226. WithField("tunnelCheck", m{"state": "dead", "method": "active"}).
  227. WithField("certName", cn).
  228. Info("Tunnel status")
  229. n.ClearIP(vpnIP)
  230. n.ClearPendingDeletion(vpnIP)
  231. // TODO: This is only here to let tests work. Should do proper mocking
  232. if n.intf.lightHouse != nil {
  233. n.intf.lightHouse.DeleteVpnIP(vpnIP)
  234. }
  235. n.hostMap.DeleteHostInfo(hostinfo)
  236. } else {
  237. n.ClearIP(vpnIP)
  238. n.ClearPendingDeletion(vpnIP)
  239. }
  240. }
  241. }
  242. // handleInvalidCertificates will destroy a tunnel if pki.disconnect_invalid is true and the certificate is no longer valid
  243. func (n *connectionManager) handleInvalidCertificate(now time.Time, vpnIP uint32, hostinfo *HostInfo) bool {
  244. if !n.intf.disconnectInvalid {
  245. return false
  246. }
  247. remoteCert := hostinfo.GetCert()
  248. if remoteCert == nil {
  249. return false
  250. }
  251. valid, err := remoteCert.Verify(now, n.intf.caPool)
  252. if valid {
  253. return false
  254. }
  255. fingerprint, _ := remoteCert.Sha256Sum()
  256. n.l.WithField("vpnIp", IntIp(vpnIP)).WithError(err).
  257. WithField("certName", remoteCert.Details.Name).
  258. WithField("fingerprint", fingerprint).
  259. Info("Remote certificate is no longer valid, tearing down the tunnel")
  260. // Inform the remote and close the tunnel locally
  261. n.intf.sendCloseTunnel(hostinfo)
  262. n.intf.closeTunnel(hostinfo, false)
  263. n.ClearIP(vpnIP)
  264. n.ClearPendingDeletion(vpnIP)
  265. return true
  266. }