3
0

relay_manager.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. package nebula
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "sync/atomic"
  7. "github.com/sirupsen/logrus"
  8. "github.com/slackhq/nebula/config"
  9. "github.com/slackhq/nebula/header"
  10. "github.com/slackhq/nebula/iputil"
  11. )
  12. type relayManager struct {
  13. l *logrus.Logger
  14. hostmap *HostMap
  15. amRelay atomic.Bool
  16. }
  17. func NewRelayManager(ctx context.Context, l *logrus.Logger, hostmap *HostMap, c *config.C) *relayManager {
  18. rm := &relayManager{
  19. l: l,
  20. hostmap: hostmap,
  21. }
  22. rm.reload(c, true)
  23. c.RegisterReloadCallback(func(c *config.C) {
  24. err := rm.reload(c, false)
  25. if err != nil {
  26. l.WithError(err).Error("Failed to reload relay_manager")
  27. }
  28. })
  29. return rm
  30. }
  31. func (rm *relayManager) reload(c *config.C, initial bool) error {
  32. if initial || c.HasChanged("relay.am_relay") {
  33. rm.setAmRelay(c.GetBool("relay.am_relay", false))
  34. }
  35. return nil
  36. }
  37. func (rm *relayManager) GetAmRelay() bool {
  38. return rm.amRelay.Load()
  39. }
  40. func (rm *relayManager) setAmRelay(v bool) {
  41. rm.amRelay.Store(v)
  42. }
  43. // AddRelay finds an available relay index on the hostmap, and associates the relay info with it.
  44. // relayHostInfo is the Nebula peer which can be used as a relay to access the target vpnIp.
  45. func AddRelay(l *logrus.Logger, relayHostInfo *HostInfo, hm *HostMap, vpnIp iputil.VpnIp, remoteIdx *uint32, relayType int, state int) (uint32, error) {
  46. hm.Lock()
  47. defer hm.Unlock()
  48. for i := 0; i < 32; i++ {
  49. index, err := generateIndex(l)
  50. if err != nil {
  51. return 0, err
  52. }
  53. _, inRelays := hm.Relays[index]
  54. if !inRelays {
  55. // Avoid standing up a relay that can't be used since only the primary hostinfo
  56. // will be pointed to by the relay logic
  57. //TODO: if there was an existing primary and it had relay state, should we merge?
  58. hm.unlockedMakePrimary(relayHostInfo)
  59. hm.Relays[index] = relayHostInfo
  60. newRelay := Relay{
  61. Type: relayType,
  62. State: state,
  63. LocalIndex: index,
  64. PeerIp: vpnIp,
  65. }
  66. if remoteIdx != nil {
  67. newRelay.RemoteIndex = *remoteIdx
  68. }
  69. relayHostInfo.relayState.InsertRelay(vpnIp, index, &newRelay)
  70. return index, nil
  71. }
  72. }
  73. return 0, errors.New("failed to generate unique localIndexId")
  74. }
  75. // EstablishRelay updates a Requested Relay to become an Established Relay, which can pass traffic.
  76. func (rm *relayManager) EstablishRelay(relayHostInfo *HostInfo, m *NebulaControl) (*Relay, error) {
  77. relay, ok := relayHostInfo.relayState.CompleteRelayByIdx(m.InitiatorRelayIndex, m.ResponderRelayIndex)
  78. if !ok {
  79. rm.l.WithFields(logrus.Fields{"relay": relayHostInfo.vpnIp,
  80. "initiatorRelayIndex": m.InitiatorRelayIndex,
  81. "relayFrom": m.RelayFromIp,
  82. "relayTo": m.RelayToIp}).Info("relayManager failed to update relay")
  83. return nil, fmt.Errorf("unknown relay")
  84. }
  85. return relay, nil
  86. }
  87. func (rm *relayManager) HandleControlMsg(h *HostInfo, m *NebulaControl, f *Interface) {
  88. switch m.Type {
  89. case NebulaControl_CreateRelayRequest:
  90. rm.handleCreateRelayRequest(h, f, m)
  91. case NebulaControl_CreateRelayResponse:
  92. rm.handleCreateRelayResponse(h, f, m)
  93. }
  94. }
  95. func (rm *relayManager) handleCreateRelayResponse(h *HostInfo, f *Interface, m *NebulaControl) {
  96. rm.l.WithFields(logrus.Fields{
  97. "relayFrom": iputil.VpnIp(m.RelayFromIp),
  98. "relayTo": iputil.VpnIp(m.RelayToIp),
  99. "initiatorRelayIndex": m.InitiatorRelayIndex,
  100. "responderRelayIndex": m.ResponderRelayIndex,
  101. "vpnIp": h.vpnIp}).
  102. Info("handleCreateRelayResponse")
  103. target := iputil.VpnIp(m.RelayToIp)
  104. relay, err := rm.EstablishRelay(h, m)
  105. if err != nil {
  106. rm.l.WithError(err).Error("Failed to update relay for relayTo")
  107. return
  108. }
  109. // Do I need to complete the relays now?
  110. if relay.Type == TerminalType {
  111. return
  112. }
  113. // I'm the middle man. Let the initiator know that the I've established the relay they requested.
  114. peerHostInfo := rm.hostmap.QueryVpnIp(relay.PeerIp)
  115. if peerHostInfo == nil {
  116. rm.l.WithField("relayTo", relay.PeerIp).Error("Can't find a HostInfo for peer")
  117. return
  118. }
  119. peerRelay, ok := peerHostInfo.relayState.QueryRelayForByIp(target)
  120. if !ok {
  121. rm.l.WithField("relayTo", peerHostInfo.vpnIp).Error("peerRelay does not have Relay state for relayTo")
  122. return
  123. }
  124. if peerRelay.State == PeerRequested {
  125. peerRelay.State = Established
  126. resp := NebulaControl{
  127. Type: NebulaControl_CreateRelayResponse,
  128. ResponderRelayIndex: peerRelay.LocalIndex,
  129. InitiatorRelayIndex: peerRelay.RemoteIndex,
  130. RelayFromIp: uint32(peerHostInfo.vpnIp),
  131. RelayToIp: uint32(target),
  132. }
  133. msg, err := resp.Marshal()
  134. if err != nil {
  135. rm.l.
  136. WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  137. } else {
  138. f.SendMessageToHostInfo(header.Control, 0, peerHostInfo, msg, make([]byte, 12), make([]byte, mtu))
  139. rm.l.WithFields(logrus.Fields{
  140. "relayFrom": iputil.VpnIp(resp.RelayFromIp),
  141. "relayTo": iputil.VpnIp(resp.RelayToIp),
  142. "initiatorRelayIndex": resp.InitiatorRelayIndex,
  143. "responderRelayIndex": resp.ResponderRelayIndex,
  144. "vpnIp": peerHostInfo.vpnIp}).
  145. Info("send CreateRelayResponse")
  146. }
  147. }
  148. }
  149. func (rm *relayManager) handleCreateRelayRequest(h *HostInfo, f *Interface, m *NebulaControl) {
  150. from := iputil.VpnIp(m.RelayFromIp)
  151. target := iputil.VpnIp(m.RelayToIp)
  152. logMsg := rm.l.WithFields(logrus.Fields{
  153. "relayFrom": from,
  154. "relayTo": target,
  155. "initiatorRelayIndex": m.InitiatorRelayIndex,
  156. "vpnIp": h.vpnIp})
  157. logMsg.Info("handleCreateRelayRequest")
  158. // Is the source of the relay me? This should never happen, but did happen due to
  159. // an issue migrating relays over to newly re-handshaked host info objects.
  160. if from == f.myVpnIp {
  161. logMsg.WithField("myIP", f.myVpnIp).Error("Discarding relay request from myself")
  162. return
  163. }
  164. // Is the target of the relay me?
  165. if target == f.myVpnIp {
  166. existingRelay, ok := h.relayState.QueryRelayForByIp(from)
  167. if ok {
  168. switch existingRelay.State {
  169. case Requested:
  170. ok = h.relayState.CompleteRelayByIP(from, m.InitiatorRelayIndex)
  171. if !ok {
  172. logMsg.Error("Relay State not found")
  173. return
  174. }
  175. case Established:
  176. if existingRelay.RemoteIndex != m.InitiatorRelayIndex {
  177. // We got a brand new Relay request, because its index is different than what we saw before.
  178. // This should never happen. The peer should never change an index, once created.
  179. logMsg.WithFields(logrus.Fields{
  180. "existingRemoteIndex": existingRelay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest")
  181. return
  182. }
  183. }
  184. } else {
  185. _, err := AddRelay(rm.l, h, f.hostMap, from, &m.InitiatorRelayIndex, TerminalType, Established)
  186. if err != nil {
  187. logMsg.WithError(err).Error("Failed to add relay")
  188. return
  189. }
  190. }
  191. relay, ok := h.relayState.QueryRelayForByIp(from)
  192. if !ok {
  193. logMsg.Error("Relay State not found")
  194. return
  195. }
  196. resp := NebulaControl{
  197. Type: NebulaControl_CreateRelayResponse,
  198. ResponderRelayIndex: relay.LocalIndex,
  199. InitiatorRelayIndex: relay.RemoteIndex,
  200. RelayFromIp: uint32(from),
  201. RelayToIp: uint32(target),
  202. }
  203. msg, err := resp.Marshal()
  204. if err != nil {
  205. logMsg.
  206. WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  207. } else {
  208. f.SendMessageToHostInfo(header.Control, 0, h, msg, make([]byte, 12), make([]byte, mtu))
  209. rm.l.WithFields(logrus.Fields{
  210. "relayFrom": iputil.VpnIp(resp.RelayFromIp),
  211. "relayTo": iputil.VpnIp(resp.RelayToIp),
  212. "initiatorRelayIndex": resp.InitiatorRelayIndex,
  213. "responderRelayIndex": resp.ResponderRelayIndex,
  214. "vpnIp": h.vpnIp}).
  215. Info("send CreateRelayResponse")
  216. }
  217. return
  218. } else {
  219. // the target is not me. Create a relay to the target, from me.
  220. if !rm.GetAmRelay() {
  221. return
  222. }
  223. peer := rm.hostmap.QueryVpnIp(target)
  224. if peer == nil {
  225. // Try to establish a connection to this host. If we get a future relay request,
  226. // we'll be ready!
  227. f.Handshake(target)
  228. return
  229. }
  230. if peer.remote == nil {
  231. // Only create relays to peers for whom I have a direct connection
  232. return
  233. }
  234. sendCreateRequest := false
  235. var index uint32
  236. var err error
  237. targetRelay, ok := peer.relayState.QueryRelayForByIp(from)
  238. if ok {
  239. index = targetRelay.LocalIndex
  240. if targetRelay.State == Requested {
  241. sendCreateRequest = true
  242. }
  243. } else {
  244. // Allocate an index in the hostMap for this relay peer
  245. index, err = AddRelay(rm.l, peer, f.hostMap, from, nil, ForwardingType, Requested)
  246. if err != nil {
  247. return
  248. }
  249. sendCreateRequest = true
  250. }
  251. if sendCreateRequest {
  252. // Send a CreateRelayRequest to the peer.
  253. req := NebulaControl{
  254. Type: NebulaControl_CreateRelayRequest,
  255. InitiatorRelayIndex: index,
  256. RelayFromIp: uint32(h.vpnIp),
  257. RelayToIp: uint32(target),
  258. }
  259. msg, err := req.Marshal()
  260. if err != nil {
  261. logMsg.
  262. WithError(err).Error("relayManager Failed to marshal Control message to create relay")
  263. } else {
  264. f.SendMessageToHostInfo(header.Control, 0, peer, msg, make([]byte, 12), make([]byte, mtu))
  265. rm.l.WithFields(logrus.Fields{
  266. "relayFrom": iputil.VpnIp(req.RelayFromIp),
  267. "relayTo": iputil.VpnIp(req.RelayToIp),
  268. "initiatorRelayIndex": req.InitiatorRelayIndex,
  269. "responderRelayIndex": req.ResponderRelayIndex,
  270. "vpnIp": target}).
  271. Info("send CreateRelayRequest")
  272. }
  273. }
  274. // Also track the half-created Relay state just received
  275. relay, ok := h.relayState.QueryRelayForByIp(target)
  276. if !ok {
  277. // Add the relay
  278. state := PeerRequested
  279. if targetRelay != nil && targetRelay.State == Established {
  280. state = Established
  281. }
  282. _, err := AddRelay(rm.l, h, f.hostMap, target, &m.InitiatorRelayIndex, ForwardingType, state)
  283. if err != nil {
  284. logMsg.
  285. WithError(err).Error("relayManager Failed to allocate a local index for relay")
  286. return
  287. }
  288. } else {
  289. switch relay.State {
  290. case Established:
  291. if relay.RemoteIndex != m.InitiatorRelayIndex {
  292. // We got a brand new Relay request, because its index is different than what we saw before.
  293. // This should never happen. The peer should never change an index, once created.
  294. logMsg.WithFields(logrus.Fields{
  295. "existingRemoteIndex": relay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest")
  296. return
  297. }
  298. resp := NebulaControl{
  299. Type: NebulaControl_CreateRelayResponse,
  300. ResponderRelayIndex: relay.LocalIndex,
  301. InitiatorRelayIndex: relay.RemoteIndex,
  302. RelayFromIp: uint32(h.vpnIp),
  303. RelayToIp: uint32(target),
  304. }
  305. msg, err := resp.Marshal()
  306. if err != nil {
  307. rm.l.
  308. WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  309. } else {
  310. f.SendMessageToHostInfo(header.Control, 0, h, msg, make([]byte, 12), make([]byte, mtu))
  311. rm.l.WithFields(logrus.Fields{
  312. "relayFrom": iputil.VpnIp(resp.RelayFromIp),
  313. "relayTo": iputil.VpnIp(resp.RelayToIp),
  314. "initiatorRelayIndex": resp.InitiatorRelayIndex,
  315. "responderRelayIndex": resp.ResponderRelayIndex,
  316. "vpnIp": h.vpnIp}).
  317. Info("send CreateRelayResponse")
  318. }
  319. case Requested:
  320. // Keep waiting for the other relay to complete
  321. }
  322. }
  323. }
  324. }
  325. func (rm *relayManager) RemoveRelay(localIdx uint32) {
  326. rm.hostmap.RemoveRelay(localIdx)
  327. }