relay_manager.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. package nebula
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "sync/atomic"
  7. "github.com/sirupsen/logrus"
  8. "github.com/slackhq/nebula/config"
  9. "github.com/slackhq/nebula/header"
  10. "github.com/slackhq/nebula/iputil"
  11. )
  12. type relayManager struct {
  13. l *logrus.Logger
  14. hostmap *HostMap
  15. amRelay atomic.Bool
  16. }
  17. func NewRelayManager(ctx context.Context, l *logrus.Logger, hostmap *HostMap, c *config.C) *relayManager {
  18. rm := &relayManager{
  19. l: l,
  20. hostmap: hostmap,
  21. }
  22. rm.reload(c, true)
  23. c.RegisterReloadCallback(func(c *config.C) {
  24. err := rm.reload(c, false)
  25. if err != nil {
  26. l.WithError(err).Error("Failed to reload relay_manager")
  27. }
  28. })
  29. return rm
  30. }
  31. func (rm *relayManager) reload(c *config.C, initial bool) error {
  32. if initial || c.HasChanged("relay.am_relay") {
  33. rm.setAmRelay(c.GetBool("relay.am_relay", false))
  34. }
  35. return nil
  36. }
  37. func (rm *relayManager) GetAmRelay() bool {
  38. return rm.amRelay.Load()
  39. }
  40. func (rm *relayManager) setAmRelay(v bool) {
  41. rm.amRelay.Store(v)
  42. }
  43. // AddRelay finds an available relay index on the hostmap, and associates the relay info with it.
  44. // relayHostInfo is the Nebula peer which can be used as a relay to access the target vpnIp.
  45. func AddRelay(l *logrus.Logger, relayHostInfo *HostInfo, hm *HostMap, vpnIp iputil.VpnIp, remoteIdx *uint32, relayType int, state int) (uint32, error) {
  46. hm.Lock()
  47. defer hm.Unlock()
  48. for i := 0; i < 32; i++ {
  49. index, err := generateIndex(l)
  50. if err != nil {
  51. return 0, err
  52. }
  53. _, inRelays := hm.Relays[index]
  54. if !inRelays {
  55. // Avoid standing up a relay that can't be used since only the primary hostinfo
  56. // will be pointed to by the relay logic
  57. //TODO: if there was an existing primary and it had relay state, should we merge?
  58. hm.unlockedMakePrimary(relayHostInfo)
  59. hm.Relays[index] = relayHostInfo
  60. newRelay := Relay{
  61. Type: relayType,
  62. State: state,
  63. LocalIndex: index,
  64. PeerIp: vpnIp,
  65. }
  66. if remoteIdx != nil {
  67. newRelay.RemoteIndex = *remoteIdx
  68. }
  69. relayHostInfo.relayState.InsertRelay(vpnIp, index, &newRelay)
  70. return index, nil
  71. }
  72. }
  73. return 0, errors.New("failed to generate unique localIndexId")
  74. }
  75. // EstablishRelay updates a Requested Relay to become an Established Relay, which can pass traffic.
  76. func (rm *relayManager) EstablishRelay(relayHostInfo *HostInfo, m *NebulaControl) (*Relay, error) {
  77. relay, ok := relayHostInfo.relayState.CompleteRelayByIdx(m.InitiatorRelayIndex, m.ResponderRelayIndex)
  78. if !ok {
  79. rm.l.WithFields(logrus.Fields{"relay": relayHostInfo.vpnIp,
  80. "initiatorRelayIndex": m.InitiatorRelayIndex,
  81. "relayFrom": m.RelayFromIp,
  82. "relayTo": m.RelayToIp}).Info("relayManager failed to update relay")
  83. return nil, fmt.Errorf("unknown relay")
  84. }
  85. return relay, nil
  86. }
  87. func (rm *relayManager) HandleControlMsg(h *HostInfo, m *NebulaControl, f *Interface) {
  88. switch m.Type {
  89. case NebulaControl_CreateRelayRequest:
  90. rm.handleCreateRelayRequest(h, f, m)
  91. case NebulaControl_CreateRelayResponse:
  92. rm.handleCreateRelayResponse(h, f, m)
  93. }
  94. }
  95. func (rm *relayManager) handleCreateRelayResponse(h *HostInfo, f *Interface, m *NebulaControl) {
  96. rm.l.WithFields(logrus.Fields{
  97. "relayFrom": iputil.VpnIp(m.RelayFromIp),
  98. "relayTo": iputil.VpnIp(m.RelayToIp),
  99. "initiatorRelayIndex": m.InitiatorRelayIndex,
  100. "responderRelayIndex": m.ResponderRelayIndex,
  101. "vpnIp": h.vpnIp}).
  102. Info("handleCreateRelayResponse")
  103. target := iputil.VpnIp(m.RelayToIp)
  104. relay, err := rm.EstablishRelay(h, m)
  105. if err != nil {
  106. rm.l.WithError(err).Error("Failed to update relay for relayTo")
  107. return
  108. }
  109. // Do I need to complete the relays now?
  110. if relay.Type == TerminalType {
  111. return
  112. }
  113. // I'm the middle man. Let the initiator know that the I've established the relay they requested.
  114. peerHostInfo, err := rm.hostmap.QueryVpnIp(relay.PeerIp)
  115. if err != nil {
  116. rm.l.WithError(err).WithField("relayTo", relay.PeerIp).Error("Can't find a HostInfo for peer")
  117. return
  118. }
  119. peerRelay, ok := peerHostInfo.relayState.QueryRelayForByIp(target)
  120. if !ok {
  121. rm.l.WithField("relayTo", peerHostInfo.vpnIp).Error("peerRelay does not have Relay state for relayTo")
  122. return
  123. }
  124. if peerRelay.State == PeerRequested {
  125. peerRelay.State = Established
  126. resp := NebulaControl{
  127. Type: NebulaControl_CreateRelayResponse,
  128. ResponderRelayIndex: peerRelay.LocalIndex,
  129. InitiatorRelayIndex: peerRelay.RemoteIndex,
  130. RelayFromIp: uint32(peerHostInfo.vpnIp),
  131. RelayToIp: uint32(target),
  132. }
  133. msg, err := resp.Marshal()
  134. if err != nil {
  135. rm.l.
  136. WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  137. } else {
  138. f.SendMessageToHostInfo(header.Control, 0, peerHostInfo, msg, make([]byte, 12), make([]byte, mtu))
  139. rm.l.WithFields(logrus.Fields{
  140. "relayFrom": iputil.VpnIp(resp.RelayFromIp),
  141. "relayTo": iputil.VpnIp(resp.RelayToIp),
  142. "initiatorRelayIndex": resp.InitiatorRelayIndex,
  143. "responderRelayIndex": resp.ResponderRelayIndex,
  144. "vpnIp": peerHostInfo.vpnIp}).
  145. Info("send CreateRelayResponse")
  146. }
  147. }
  148. }
  149. func (rm *relayManager) handleCreateRelayRequest(h *HostInfo, f *Interface, m *NebulaControl) {
  150. from := iputil.VpnIp(m.RelayFromIp)
  151. target := iputil.VpnIp(m.RelayToIp)
  152. logMsg := rm.l.WithFields(logrus.Fields{
  153. "relayFrom": from,
  154. "relayTo": target,
  155. "initiatorRelayIndex": m.InitiatorRelayIndex,
  156. "vpnIp": h.vpnIp})
  157. logMsg.Info("handleCreateRelayRequest")
  158. // Is the target of the relay me?
  159. if target == f.myVpnIp {
  160. existingRelay, ok := h.relayState.QueryRelayForByIp(from)
  161. if ok {
  162. switch existingRelay.State {
  163. case Requested:
  164. ok = h.relayState.CompleteRelayByIP(from, m.InitiatorRelayIndex)
  165. if !ok {
  166. logMsg.Error("Relay State not found")
  167. return
  168. }
  169. case Established:
  170. if existingRelay.RemoteIndex != m.InitiatorRelayIndex {
  171. // We got a brand new Relay request, because its index is different than what we saw before.
  172. // This should never happen. The peer should never change an index, once created.
  173. logMsg.WithFields(logrus.Fields{
  174. "existingRemoteIndex": existingRelay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest")
  175. return
  176. }
  177. }
  178. } else {
  179. _, err := AddRelay(rm.l, h, f.hostMap, from, &m.InitiatorRelayIndex, TerminalType, Established)
  180. if err != nil {
  181. logMsg.WithError(err).Error("Failed to add relay")
  182. return
  183. }
  184. }
  185. relay, ok := h.relayState.QueryRelayForByIp(from)
  186. if !ok {
  187. logMsg.Error("Relay State not found")
  188. return
  189. }
  190. resp := NebulaControl{
  191. Type: NebulaControl_CreateRelayResponse,
  192. ResponderRelayIndex: relay.LocalIndex,
  193. InitiatorRelayIndex: relay.RemoteIndex,
  194. RelayFromIp: uint32(from),
  195. RelayToIp: uint32(target),
  196. }
  197. msg, err := resp.Marshal()
  198. if err != nil {
  199. logMsg.
  200. WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  201. } else {
  202. f.SendMessageToHostInfo(header.Control, 0, h, msg, make([]byte, 12), make([]byte, mtu))
  203. rm.l.WithFields(logrus.Fields{
  204. "relayFrom": iputil.VpnIp(resp.RelayFromIp),
  205. "relayTo": iputil.VpnIp(resp.RelayToIp),
  206. "initiatorRelayIndex": resp.InitiatorRelayIndex,
  207. "responderRelayIndex": resp.ResponderRelayIndex,
  208. "vpnIp": h.vpnIp}).
  209. Info("send CreateRelayResponse")
  210. }
  211. return
  212. } else {
  213. // the target is not me. Create a relay to the target, from me.
  214. if !rm.GetAmRelay() {
  215. return
  216. }
  217. peer, err := rm.hostmap.QueryVpnIp(target)
  218. if err != nil {
  219. // Try to establish a connection to this host. If we get a future relay request,
  220. // we'll be ready!
  221. f.getOrHandshake(target)
  222. return
  223. }
  224. if peer.remote == nil {
  225. // Only create relays to peers for whom I have a direct connection
  226. return
  227. }
  228. sendCreateRequest := false
  229. var index uint32
  230. targetRelay, ok := peer.relayState.QueryRelayForByIp(from)
  231. if ok {
  232. index = targetRelay.LocalIndex
  233. if targetRelay.State == Requested {
  234. sendCreateRequest = true
  235. }
  236. } else {
  237. // Allocate an index in the hostMap for this relay peer
  238. index, err = AddRelay(rm.l, peer, f.hostMap, from, nil, ForwardingType, Requested)
  239. if err != nil {
  240. return
  241. }
  242. sendCreateRequest = true
  243. }
  244. if sendCreateRequest {
  245. // Send a CreateRelayRequest to the peer.
  246. req := NebulaControl{
  247. Type: NebulaControl_CreateRelayRequest,
  248. InitiatorRelayIndex: index,
  249. RelayFromIp: uint32(h.vpnIp),
  250. RelayToIp: uint32(target),
  251. }
  252. msg, err := req.Marshal()
  253. if err != nil {
  254. logMsg.
  255. WithError(err).Error("relayManager Failed to marshal Control message to create relay")
  256. } else {
  257. f.SendMessageToHostInfo(header.Control, 0, peer, msg, make([]byte, 12), make([]byte, mtu))
  258. rm.l.WithFields(logrus.Fields{
  259. "relayFrom": iputil.VpnIp(req.RelayFromIp),
  260. "relayTo": iputil.VpnIp(req.RelayToIp),
  261. "initiatorRelayIndex": req.InitiatorRelayIndex,
  262. "responderRelayIndex": req.ResponderRelayIndex,
  263. "vpnIp": target}).
  264. Info("send CreateRelayRequest")
  265. }
  266. }
  267. // Also track the half-created Relay state just received
  268. relay, ok := h.relayState.QueryRelayForByIp(target)
  269. if !ok {
  270. // Add the relay
  271. state := PeerRequested
  272. if targetRelay != nil && targetRelay.State == Established {
  273. state = Established
  274. }
  275. _, err := AddRelay(rm.l, h, f.hostMap, target, &m.InitiatorRelayIndex, ForwardingType, state)
  276. if err != nil {
  277. logMsg.
  278. WithError(err).Error("relayManager Failed to allocate a local index for relay")
  279. return
  280. }
  281. } else {
  282. switch relay.State {
  283. case Established:
  284. if relay.RemoteIndex != m.InitiatorRelayIndex {
  285. // We got a brand new Relay request, because its index is different than what we saw before.
  286. // This should never happen. The peer should never change an index, once created.
  287. logMsg.WithFields(logrus.Fields{
  288. "existingRemoteIndex": relay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest")
  289. return
  290. }
  291. resp := NebulaControl{
  292. Type: NebulaControl_CreateRelayResponse,
  293. ResponderRelayIndex: relay.LocalIndex,
  294. InitiatorRelayIndex: relay.RemoteIndex,
  295. RelayFromIp: uint32(h.vpnIp),
  296. RelayToIp: uint32(target),
  297. }
  298. msg, err := resp.Marshal()
  299. if err != nil {
  300. rm.l.
  301. WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  302. } else {
  303. f.SendMessageToHostInfo(header.Control, 0, h, msg, make([]byte, 12), make([]byte, mtu))
  304. rm.l.WithFields(logrus.Fields{
  305. "relayFrom": iputil.VpnIp(resp.RelayFromIp),
  306. "relayTo": iputil.VpnIp(resp.RelayToIp),
  307. "initiatorRelayIndex": resp.InitiatorRelayIndex,
  308. "responderRelayIndex": resp.ResponderRelayIndex,
  309. "vpnIp": h.vpnIp}).
  310. Info("send CreateRelayResponse")
  311. }
  312. case Requested:
  313. // Keep waiting for the other relay to complete
  314. }
  315. }
  316. }
  317. }
  318. func (rm *relayManager) RemoveRelay(localIdx uint32) {
  319. rm.hostmap.RemoveRelay(localIdx)
  320. }