connection_manager.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. package nebula
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/sirupsen/logrus"
  6. )
  7. // TODO: incount and outcount are intended as a shortcut to locking the mutexes for every single packet
  8. // and something like every 10 packets we could lock, send 10, then unlock for a moment
  9. type connectionManager struct {
  10. hostMap *HostMap
  11. in map[uint32]struct{}
  12. inLock *sync.RWMutex
  13. inCount int
  14. out map[uint32]struct{}
  15. outLock *sync.RWMutex
  16. outCount int
  17. TrafficTimer *SystemTimerWheel
  18. intf *Interface
  19. pendingDeletion map[uint32]int
  20. pendingDeletionLock *sync.RWMutex
  21. pendingDeletionTimer *SystemTimerWheel
  22. checkInterval int
  23. pendingDeletionInterval int
  24. l *logrus.Logger
  25. // I wanted to call one matLock
  26. }
  27. func newConnectionManager(l *logrus.Logger, intf *Interface, checkInterval, pendingDeletionInterval int) *connectionManager {
  28. nc := &connectionManager{
  29. hostMap: intf.hostMap,
  30. in: make(map[uint32]struct{}),
  31. inLock: &sync.RWMutex{},
  32. inCount: 0,
  33. out: make(map[uint32]struct{}),
  34. outLock: &sync.RWMutex{},
  35. outCount: 0,
  36. TrafficTimer: NewSystemTimerWheel(time.Millisecond*500, time.Second*60),
  37. intf: intf,
  38. pendingDeletion: make(map[uint32]int),
  39. pendingDeletionLock: &sync.RWMutex{},
  40. pendingDeletionTimer: NewSystemTimerWheel(time.Millisecond*500, time.Second*60),
  41. checkInterval: checkInterval,
  42. pendingDeletionInterval: pendingDeletionInterval,
  43. l: l,
  44. }
  45. nc.Start()
  46. return nc
  47. }
  48. func (n *connectionManager) In(ip uint32) {
  49. n.inLock.RLock()
  50. // If this already exists, return
  51. if _, ok := n.in[ip]; ok {
  52. n.inLock.RUnlock()
  53. return
  54. }
  55. n.inLock.RUnlock()
  56. n.inLock.Lock()
  57. n.in[ip] = struct{}{}
  58. n.inLock.Unlock()
  59. }
  60. func (n *connectionManager) Out(ip uint32) {
  61. n.outLock.RLock()
  62. // If this already exists, return
  63. if _, ok := n.out[ip]; ok {
  64. n.outLock.RUnlock()
  65. return
  66. }
  67. n.outLock.RUnlock()
  68. n.outLock.Lock()
  69. // double check since we dropped the lock temporarily
  70. if _, ok := n.out[ip]; ok {
  71. n.outLock.Unlock()
  72. return
  73. }
  74. n.out[ip] = struct{}{}
  75. n.AddTrafficWatch(ip, n.checkInterval)
  76. n.outLock.Unlock()
  77. }
  78. func (n *connectionManager) CheckIn(vpnIP uint32) bool {
  79. n.inLock.RLock()
  80. if _, ok := n.in[vpnIP]; ok {
  81. n.inLock.RUnlock()
  82. return true
  83. }
  84. n.inLock.RUnlock()
  85. return false
  86. }
  87. func (n *connectionManager) ClearIP(ip uint32) {
  88. n.inLock.Lock()
  89. n.outLock.Lock()
  90. delete(n.in, ip)
  91. delete(n.out, ip)
  92. n.inLock.Unlock()
  93. n.outLock.Unlock()
  94. }
  95. func (n *connectionManager) ClearPendingDeletion(ip uint32) {
  96. n.pendingDeletionLock.Lock()
  97. delete(n.pendingDeletion, ip)
  98. n.pendingDeletionLock.Unlock()
  99. }
  100. func (n *connectionManager) AddPendingDeletion(ip uint32) {
  101. n.pendingDeletionLock.Lock()
  102. if _, ok := n.pendingDeletion[ip]; ok {
  103. n.pendingDeletion[ip] += 1
  104. } else {
  105. n.pendingDeletion[ip] = 0
  106. }
  107. n.pendingDeletionTimer.Add(ip, time.Second*time.Duration(n.pendingDeletionInterval))
  108. n.pendingDeletionLock.Unlock()
  109. }
  110. func (n *connectionManager) checkPendingDeletion(ip uint32) bool {
  111. n.pendingDeletionLock.RLock()
  112. if _, ok := n.pendingDeletion[ip]; ok {
  113. n.pendingDeletionLock.RUnlock()
  114. return true
  115. }
  116. n.pendingDeletionLock.RUnlock()
  117. return false
  118. }
  119. func (n *connectionManager) AddTrafficWatch(vpnIP uint32, seconds int) {
  120. n.TrafficTimer.Add(vpnIP, time.Second*time.Duration(seconds))
  121. }
  122. func (n *connectionManager) Start() {
  123. go n.Run()
  124. }
  125. func (n *connectionManager) Run() {
  126. clockSource := time.Tick(500 * time.Millisecond)
  127. p := []byte("")
  128. nb := make([]byte, 12, 12)
  129. out := make([]byte, mtu)
  130. for now := range clockSource {
  131. n.HandleMonitorTick(now, p, nb, out)
  132. n.HandleDeletionTick(now)
  133. }
  134. }
  135. func (n *connectionManager) HandleMonitorTick(now time.Time, p, nb, out []byte) {
  136. n.TrafficTimer.advance(now)
  137. for {
  138. ep := n.TrafficTimer.Purge()
  139. if ep == nil {
  140. break
  141. }
  142. vpnIP := ep.(uint32)
  143. // Check for traffic coming back in from this host.
  144. traf := n.CheckIn(vpnIP)
  145. hostinfo, err := n.hostMap.QueryVpnIP(vpnIP)
  146. if err != nil {
  147. n.l.Debugf("Not found in hostmap: %s", IntIp(vpnIP))
  148. if !n.intf.disconnectInvalid {
  149. n.ClearIP(vpnIP)
  150. n.ClearPendingDeletion(vpnIP)
  151. continue
  152. }
  153. }
  154. if n.handleInvalidCertificate(now, vpnIP, hostinfo) {
  155. continue
  156. }
  157. // If we saw an incoming packets from this ip and peer's certificate is not
  158. // expired, just ignore.
  159. if traf {
  160. if n.l.Level >= logrus.DebugLevel {
  161. n.l.WithField("vpnIp", IntIp(vpnIP)).
  162. WithField("tunnelCheck", m{"state": "alive", "method": "passive"}).
  163. Debug("Tunnel status")
  164. }
  165. n.ClearIP(vpnIP)
  166. n.ClearPendingDeletion(vpnIP)
  167. continue
  168. }
  169. hostinfo.logger(n.l).
  170. WithField("tunnelCheck", m{"state": "testing", "method": "active"}).
  171. Debug("Tunnel status")
  172. if hostinfo != nil && hostinfo.ConnectionState != nil {
  173. // Send a test packet to trigger an authenticated tunnel test, this should suss out any lingering tunnel issues
  174. n.intf.SendMessageToVpnIp(test, testRequest, vpnIP, p, nb, out)
  175. } else {
  176. hostinfo.logger(n.l).Debugf("Hostinfo sadness: %s", IntIp(vpnIP))
  177. }
  178. n.AddPendingDeletion(vpnIP)
  179. }
  180. }
  181. func (n *connectionManager) HandleDeletionTick(now time.Time) {
  182. n.pendingDeletionTimer.advance(now)
  183. for {
  184. ep := n.pendingDeletionTimer.Purge()
  185. if ep == nil {
  186. break
  187. }
  188. vpnIP := ep.(uint32)
  189. hostinfo, err := n.hostMap.QueryVpnIP(vpnIP)
  190. if err != nil {
  191. n.l.Debugf("Not found in hostmap: %s", IntIp(vpnIP))
  192. if !n.intf.disconnectInvalid {
  193. n.ClearIP(vpnIP)
  194. n.ClearPendingDeletion(vpnIP)
  195. continue
  196. }
  197. }
  198. if n.handleInvalidCertificate(now, vpnIP, hostinfo) {
  199. continue
  200. }
  201. // If we saw an incoming packets from this ip and peer's certificate is not
  202. // expired, just ignore.
  203. traf := n.CheckIn(vpnIP)
  204. if traf {
  205. n.l.WithField("vpnIp", IntIp(vpnIP)).
  206. WithField("tunnelCheck", m{"state": "alive", "method": "active"}).
  207. Debug("Tunnel status")
  208. n.ClearIP(vpnIP)
  209. n.ClearPendingDeletion(vpnIP)
  210. continue
  211. }
  212. // If it comes around on deletion wheel and hasn't resolved itself, delete
  213. if n.checkPendingDeletion(vpnIP) {
  214. cn := ""
  215. if hostinfo.ConnectionState != nil && hostinfo.ConnectionState.peerCert != nil {
  216. cn = hostinfo.ConnectionState.peerCert.Details.Name
  217. }
  218. hostinfo.logger(n.l).
  219. WithField("tunnelCheck", m{"state": "dead", "method": "active"}).
  220. WithField("certName", cn).
  221. Info("Tunnel status")
  222. n.ClearIP(vpnIP)
  223. n.ClearPendingDeletion(vpnIP)
  224. // TODO: This is only here to let tests work. Should do proper mocking
  225. if n.intf.lightHouse != nil {
  226. n.intf.lightHouse.DeleteVpnIP(vpnIP)
  227. }
  228. n.hostMap.DeleteHostInfo(hostinfo)
  229. } else {
  230. n.ClearIP(vpnIP)
  231. n.ClearPendingDeletion(vpnIP)
  232. }
  233. }
  234. }
  235. // handleInvalidCertificates will destroy a tunnel if pki.disconnect_invalid is true and the certificate is no longer valid
  236. func (n *connectionManager) handleInvalidCertificate(now time.Time, vpnIP uint32, hostinfo *HostInfo) bool {
  237. if !n.intf.disconnectInvalid {
  238. return false
  239. }
  240. remoteCert := hostinfo.GetCert()
  241. if remoteCert == nil {
  242. return false
  243. }
  244. valid, err := remoteCert.Verify(now, n.intf.caPool)
  245. if valid {
  246. return false
  247. }
  248. fingerprint, _ := remoteCert.Sha256Sum()
  249. n.l.WithField("vpnIp", IntIp(vpnIP)).WithError(err).
  250. WithField("certName", remoteCert.Details.Name).
  251. WithField("fingerprint", fingerprint).
  252. Info("Remote certificate is no longer valid, tearing down the tunnel")
  253. // Inform the remote and close the tunnel locally
  254. n.intf.sendCloseTunnel(hostinfo)
  255. n.intf.closeTunnel(hostinfo, false)
  256. n.ClearIP(vpnIP)
  257. n.ClearPendingDeletion(vpnIP)
  258. return true
  259. }