relay_manager.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. package nebula
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "sync/atomic"
  7. "github.com/sirupsen/logrus"
  8. "github.com/slackhq/nebula/config"
  9. "github.com/slackhq/nebula/header"
  10. "github.com/slackhq/nebula/iputil"
  11. )
  12. type relayManager struct {
  13. l *logrus.Logger
  14. hostmap *HostMap
  15. amRelay atomic.Bool
  16. }
  17. func NewRelayManager(ctx context.Context, l *logrus.Logger, hostmap *HostMap, c *config.C) *relayManager {
  18. rm := &relayManager{
  19. l: l,
  20. hostmap: hostmap,
  21. }
  22. rm.reload(c, true)
  23. c.RegisterReloadCallback(func(c *config.C) {
  24. err := rm.reload(c, false)
  25. if err != nil {
  26. l.WithError(err).Error("Failed to reload relay_manager")
  27. }
  28. })
  29. return rm
  30. }
  31. func (rm *relayManager) reload(c *config.C, initial bool) error {
  32. if initial || c.HasChanged("relay.am_relay") {
  33. rm.setAmRelay(c.GetBool("relay.am_relay", false))
  34. }
  35. return nil
  36. }
  37. func (rm *relayManager) GetAmRelay() bool {
  38. return rm.amRelay.Load()
  39. }
  40. func (rm *relayManager) setAmRelay(v bool) {
  41. rm.amRelay.Store(v)
  42. }
  43. // AddRelay finds an available relay index on the hostmap, and associates the relay info with it.
  44. // relayHostInfo is the Nebula peer which can be used as a relay to access the target vpnIp.
  45. func AddRelay(l *logrus.Logger, relayHostInfo *HostInfo, hm *HostMap, vpnIp iputil.VpnIp, remoteIdx *uint32, relayType int, state int) (uint32, error) {
  46. hm.Lock()
  47. defer hm.Unlock()
  48. for i := 0; i < 32; i++ {
  49. index, err := generateIndex(l)
  50. if err != nil {
  51. return 0, err
  52. }
  53. _, inRelays := hm.Relays[index]
  54. if !inRelays {
  55. // Avoid standing up a relay that can't be used since only the primary hostinfo
  56. // will be pointed to by the relay logic
  57. //TODO: if there was an existing primary and it had relay state, should we merge?
  58. hm.unlockedMakePrimary(relayHostInfo)
  59. hm.Relays[index] = relayHostInfo
  60. newRelay := Relay{
  61. Type: relayType,
  62. State: state,
  63. LocalIndex: index,
  64. PeerIp: vpnIp,
  65. }
  66. if remoteIdx != nil {
  67. newRelay.RemoteIndex = *remoteIdx
  68. }
  69. relayHostInfo.relayState.InsertRelay(vpnIp, index, &newRelay)
  70. return index, nil
  71. }
  72. }
  73. return 0, errors.New("failed to generate unique localIndexId")
  74. }
  75. // EstablishRelay updates a Requested Relay to become an Established Relay, which can pass traffic.
  76. func (rm *relayManager) EstablishRelay(relayHostInfo *HostInfo, m *NebulaControl) (*Relay, error) {
  77. relay, ok := relayHostInfo.relayState.CompleteRelayByIdx(m.InitiatorRelayIndex, m.ResponderRelayIndex)
  78. if !ok {
  79. rm.l.WithFields(logrus.Fields{"relay": relayHostInfo.vpnIp,
  80. "initiatorRelayIndex": m.InitiatorRelayIndex,
  81. "relayFrom": m.RelayFromIp,
  82. "relayTo": m.RelayToIp}).Info("relayManager failed to update relay")
  83. return nil, fmt.Errorf("unknown relay")
  84. }
  85. return relay, nil
  86. }
  87. func (rm *relayManager) HandleControlMsg(h *HostInfo, m *NebulaControl, f *Interface) {
  88. switch m.Type {
  89. case NebulaControl_CreateRelayRequest:
  90. rm.handleCreateRelayRequest(h, f, m)
  91. case NebulaControl_CreateRelayResponse:
  92. rm.handleCreateRelayResponse(h, f, m)
  93. }
  94. }
  95. func (rm *relayManager) handleCreateRelayResponse(h *HostInfo, f *Interface, m *NebulaControl) {
  96. rm.l.WithFields(logrus.Fields{
  97. "relayFrom": iputil.VpnIp(m.RelayFromIp),
  98. "relayTo": iputil.VpnIp(m.RelayToIp),
  99. "initiatorRelayIndex": m.InitiatorRelayIndex,
  100. "responderRelayIndex": m.ResponderRelayIndex,
  101. "vpnIp": h.vpnIp}).
  102. Info("handleCreateRelayResponse")
  103. target := iputil.VpnIp(m.RelayToIp)
  104. relay, err := rm.EstablishRelay(h, m)
  105. if err != nil {
  106. rm.l.WithError(err).Error("Failed to update relay for relayTo")
  107. return
  108. }
  109. // Do I need to complete the relays now?
  110. if relay.Type == TerminalType {
  111. return
  112. }
  113. // I'm the middle man. Let the initiator know that the I've established the relay they requested.
  114. peerHostInfo, err := rm.hostmap.QueryVpnIp(relay.PeerIp)
  115. if err != nil {
  116. rm.l.WithError(err).WithField("relayTo", relay.PeerIp).Error("Can't find a HostInfo for peer")
  117. return
  118. }
  119. peerRelay, ok := peerHostInfo.relayState.QueryRelayForByIp(target)
  120. if !ok {
  121. rm.l.WithField("relayTo", peerHostInfo.vpnIp).Error("peerRelay does not have Relay state for relayTo")
  122. return
  123. }
  124. peerRelay.State = Established
  125. resp := NebulaControl{
  126. Type: NebulaControl_CreateRelayResponse,
  127. ResponderRelayIndex: peerRelay.LocalIndex,
  128. InitiatorRelayIndex: peerRelay.RemoteIndex,
  129. RelayFromIp: uint32(peerHostInfo.vpnIp),
  130. RelayToIp: uint32(target),
  131. }
  132. msg, err := resp.Marshal()
  133. if err != nil {
  134. rm.l.
  135. WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  136. } else {
  137. f.SendMessageToVpnIp(header.Control, 0, peerHostInfo.vpnIp, msg, make([]byte, 12), make([]byte, mtu))
  138. rm.l.WithFields(logrus.Fields{
  139. "relayFrom": iputil.VpnIp(resp.RelayFromIp),
  140. "relayTo": iputil.VpnIp(resp.RelayToIp),
  141. "initiatorRelayIndex": resp.InitiatorRelayIndex,
  142. "responderRelayIndex": resp.ResponderRelayIndex,
  143. "vpnIp": peerHostInfo.vpnIp}).
  144. Info("send CreateRelayResponse")
  145. }
  146. }
  147. func (rm *relayManager) handleCreateRelayRequest(h *HostInfo, f *Interface, m *NebulaControl) {
  148. from := iputil.VpnIp(m.RelayFromIp)
  149. target := iputil.VpnIp(m.RelayToIp)
  150. logMsg := rm.l.WithFields(logrus.Fields{
  151. "relayFrom": from,
  152. "relayTo": target,
  153. "initiatorRelayIndex": m.InitiatorRelayIndex,
  154. "vpnIp": h.vpnIp})
  155. logMsg.Info("handleCreateRelayRequest")
  156. // Is the target of the relay me?
  157. if target == f.myVpnIp {
  158. existingRelay, ok := h.relayState.QueryRelayForByIp(from)
  159. if ok {
  160. switch existingRelay.State {
  161. case Requested:
  162. ok = h.relayState.CompleteRelayByIP(from, m.InitiatorRelayIndex)
  163. if !ok {
  164. logMsg.Error("Relay State not found")
  165. return
  166. }
  167. case Established:
  168. if existingRelay.RemoteIndex != m.InitiatorRelayIndex {
  169. // We got a brand new Relay request, because its index is different than what we saw before.
  170. // This should never happen. The peer should never change an index, once created.
  171. logMsg.WithFields(logrus.Fields{
  172. "existingRemoteIndex": existingRelay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest")
  173. return
  174. }
  175. }
  176. } else {
  177. _, err := AddRelay(rm.l, h, f.hostMap, from, &m.InitiatorRelayIndex, TerminalType, Established)
  178. if err != nil {
  179. logMsg.WithError(err).Error("Failed to add relay")
  180. return
  181. }
  182. }
  183. relay, ok := h.relayState.QueryRelayForByIp(from)
  184. if !ok {
  185. logMsg.Error("Relay State not found")
  186. return
  187. }
  188. resp := NebulaControl{
  189. Type: NebulaControl_CreateRelayResponse,
  190. ResponderRelayIndex: relay.LocalIndex,
  191. InitiatorRelayIndex: relay.RemoteIndex,
  192. RelayFromIp: uint32(from),
  193. RelayToIp: uint32(target),
  194. }
  195. msg, err := resp.Marshal()
  196. if err != nil {
  197. logMsg.
  198. WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  199. } else {
  200. f.SendMessageToVpnIp(header.Control, 0, h.vpnIp, msg, make([]byte, 12), make([]byte, mtu))
  201. rm.l.WithFields(logrus.Fields{
  202. "relayFrom": iputil.VpnIp(resp.RelayFromIp),
  203. "relayTo": iputil.VpnIp(resp.RelayToIp),
  204. "initiatorRelayIndex": resp.InitiatorRelayIndex,
  205. "responderRelayIndex": resp.ResponderRelayIndex,
  206. "vpnIp": h.vpnIp}).
  207. Info("send CreateRelayResponse")
  208. }
  209. return
  210. } else {
  211. // the target is not me. Create a relay to the target, from me.
  212. if !rm.GetAmRelay() {
  213. return
  214. }
  215. peer, err := rm.hostmap.QueryVpnIp(target)
  216. if err != nil {
  217. // Try to establish a connection to this host. If we get a future relay request,
  218. // we'll be ready!
  219. f.getOrHandshake(target)
  220. return
  221. }
  222. if peer.remote == nil {
  223. // Only create relays to peers for whom I have a direct connection
  224. return
  225. }
  226. sendCreateRequest := false
  227. var index uint32
  228. targetRelay, ok := peer.relayState.QueryRelayForByIp(from)
  229. if ok {
  230. index = targetRelay.LocalIndex
  231. if targetRelay.State == Requested {
  232. sendCreateRequest = true
  233. }
  234. } else {
  235. // Allocate an index in the hostMap for this relay peer
  236. index, err = AddRelay(rm.l, peer, f.hostMap, from, nil, ForwardingType, Requested)
  237. if err != nil {
  238. return
  239. }
  240. sendCreateRequest = true
  241. }
  242. if sendCreateRequest {
  243. // Send a CreateRelayRequest to the peer.
  244. req := NebulaControl{
  245. Type: NebulaControl_CreateRelayRequest,
  246. InitiatorRelayIndex: index,
  247. RelayFromIp: uint32(h.vpnIp),
  248. RelayToIp: uint32(target),
  249. }
  250. msg, err := req.Marshal()
  251. if err != nil {
  252. logMsg.
  253. WithError(err).Error("relayManager Failed to marshal Control message to create relay")
  254. } else {
  255. f.SendMessageToVpnIp(header.Control, 0, target, msg, make([]byte, 12), make([]byte, mtu))
  256. rm.l.WithFields(logrus.Fields{
  257. "relayFrom": iputil.VpnIp(req.RelayFromIp),
  258. "relayTo": iputil.VpnIp(req.RelayToIp),
  259. "initiatorRelayIndex": req.InitiatorRelayIndex,
  260. "responderRelayIndex": req.ResponderRelayIndex,
  261. "vpnIp": target}).
  262. Info("send CreateRelayRequest")
  263. }
  264. }
  265. // Also track the half-created Relay state just received
  266. relay, ok := h.relayState.QueryRelayForByIp(target)
  267. if !ok {
  268. // Add the relay
  269. state := Requested
  270. if targetRelay != nil && targetRelay.State == Established {
  271. state = Established
  272. }
  273. _, err := AddRelay(rm.l, h, f.hostMap, target, &m.InitiatorRelayIndex, ForwardingType, state)
  274. if err != nil {
  275. logMsg.
  276. WithError(err).Error("relayManager Failed to allocate a local index for relay")
  277. return
  278. }
  279. } else {
  280. switch relay.State {
  281. case Established:
  282. if relay.RemoteIndex != m.InitiatorRelayIndex {
  283. // We got a brand new Relay request, because its index is different than what we saw before.
  284. // This should never happen. The peer should never change an index, once created.
  285. logMsg.WithFields(logrus.Fields{
  286. "existingRemoteIndex": relay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest")
  287. return
  288. }
  289. resp := NebulaControl{
  290. Type: NebulaControl_CreateRelayResponse,
  291. ResponderRelayIndex: relay.LocalIndex,
  292. InitiatorRelayIndex: relay.RemoteIndex,
  293. RelayFromIp: uint32(h.vpnIp),
  294. RelayToIp: uint32(target),
  295. }
  296. msg, err := resp.Marshal()
  297. if err != nil {
  298. rm.l.
  299. WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  300. } else {
  301. f.SendMessageToVpnIp(header.Control, 0, h.vpnIp, msg, make([]byte, 12), make([]byte, mtu))
  302. rm.l.WithFields(logrus.Fields{
  303. "relayFrom": iputil.VpnIp(resp.RelayFromIp),
  304. "relayTo": iputil.VpnIp(resp.RelayToIp),
  305. "initiatorRelayIndex": resp.InitiatorRelayIndex,
  306. "responderRelayIndex": resp.ResponderRelayIndex,
  307. "vpnIp": h.vpnIp}).
  308. Info("send CreateRelayResponse")
  309. }
  310. case Requested:
  311. // Keep waiting for the other relay to complete
  312. }
  313. }
  314. }
  315. }
  316. func (rm *relayManager) RemoveRelay(localIdx uint32) {
  317. rm.hostmap.RemoveRelay(localIdx)
  318. }