3
0

relay_manager.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. package nebula
  2. import (
  3. "context"
  4. "encoding/binary"
  5. "errors"
  6. "fmt"
  7. "net/netip"
  8. "sync/atomic"
  9. "github.com/sirupsen/logrus"
  10. "github.com/slackhq/nebula/config"
  11. "github.com/slackhq/nebula/header"
  12. )
// relayManager handles relay control messages (CreateRelayRequest and
// CreateRelayResponse, see HandleControlMsg) and tracks whether this node is
// configured to act as a relay for other hosts.
type relayManager struct {
	// l is the logger used for all relay-manager diagnostics.
	l *logrus.Logger
	// hostmap is used to look up HostInfo objects for relay peers and targets.
	hostmap *HostMap
	// amRelay mirrors the relay.am_relay config value. It is held in an
	// atomic.Bool, presumably so a config reload can update it while message
	// handlers read it concurrently (TODO confirm).
	amRelay atomic.Bool
}
  18. func NewRelayManager(ctx context.Context, l *logrus.Logger, hostmap *HostMap, c *config.C) *relayManager {
  19. rm := &relayManager{
  20. l: l,
  21. hostmap: hostmap,
  22. }
  23. rm.reload(c, true)
  24. c.RegisterReloadCallback(func(c *config.C) {
  25. err := rm.reload(c, false)
  26. if err != nil {
  27. l.WithError(err).Error("Failed to reload relay_manager")
  28. }
  29. })
  30. return rm
  31. }
  32. func (rm *relayManager) reload(c *config.C, initial bool) error {
  33. if initial || c.HasChanged("relay.am_relay") {
  34. rm.setAmRelay(c.GetBool("relay.am_relay", false))
  35. }
  36. return nil
  37. }
  38. func (rm *relayManager) GetAmRelay() bool {
  39. return rm.amRelay.Load()
  40. }
  41. func (rm *relayManager) setAmRelay(v bool) {
  42. rm.amRelay.Store(v)
  43. }
  44. // AddRelay finds an available relay index on the hostmap, and associates the relay info with it.
  45. // relayHostInfo is the Nebula peer which can be used as a relay to access the target vpnIp.
  46. func AddRelay(l *logrus.Logger, relayHostInfo *HostInfo, hm *HostMap, vpnIp netip.Addr, remoteIdx *uint32, relayType int, state int) (uint32, error) {
  47. hm.Lock()
  48. defer hm.Unlock()
  49. for i := 0; i < 32; i++ {
  50. index, err := generateIndex(l)
  51. if err != nil {
  52. return 0, err
  53. }
  54. _, inRelays := hm.Relays[index]
  55. if !inRelays {
  56. // Avoid standing up a relay that can't be used since only the primary hostinfo
  57. // will be pointed to by the relay logic
  58. //TODO: if there was an existing primary and it had relay state, should we merge?
  59. hm.unlockedMakePrimary(relayHostInfo)
  60. hm.Relays[index] = relayHostInfo
  61. newRelay := Relay{
  62. Type: relayType,
  63. State: state,
  64. LocalIndex: index,
  65. PeerIp: vpnIp,
  66. }
  67. if remoteIdx != nil {
  68. newRelay.RemoteIndex = *remoteIdx
  69. }
  70. relayHostInfo.relayState.InsertRelay(vpnIp, index, &newRelay)
  71. return index, nil
  72. }
  73. }
  74. return 0, errors.New("failed to generate unique localIndexId")
  75. }
  76. // EstablishRelay updates a Requested Relay to become an Established Relay, which can pass traffic.
  77. func (rm *relayManager) EstablishRelay(relayHostInfo *HostInfo, m *NebulaControl) (*Relay, error) {
  78. relay, ok := relayHostInfo.relayState.CompleteRelayByIdx(m.InitiatorRelayIndex, m.ResponderRelayIndex)
  79. if !ok {
  80. rm.l.WithFields(logrus.Fields{"relay": relayHostInfo.vpnIp,
  81. "initiatorRelayIndex": m.InitiatorRelayIndex,
  82. "relayFrom": m.RelayFromIp,
  83. "relayTo": m.RelayToIp}).Info("relayManager failed to update relay")
  84. return nil, fmt.Errorf("unknown relay")
  85. }
  86. return relay, nil
  87. }
  88. func (rm *relayManager) HandleControlMsg(h *HostInfo, m *NebulaControl, f *Interface) {
  89. switch m.Type {
  90. case NebulaControl_CreateRelayRequest:
  91. rm.handleCreateRelayRequest(h, f, m)
  92. case NebulaControl_CreateRelayResponse:
  93. rm.handleCreateRelayResponse(h, f, m)
  94. }
  95. }
  96. func (rm *relayManager) handleCreateRelayResponse(h *HostInfo, f *Interface, m *NebulaControl) {
  97. rm.l.WithFields(logrus.Fields{
  98. "relayFrom": m.RelayFromIp,
  99. "relayTo": m.RelayToIp,
  100. "initiatorRelayIndex": m.InitiatorRelayIndex,
  101. "responderRelayIndex": m.ResponderRelayIndex,
  102. "vpnIp": h.vpnIp}).
  103. Info("handleCreateRelayResponse")
  104. target := m.RelayToIp
  105. //TODO: IPV6-WORK
  106. b := [4]byte{}
  107. binary.BigEndian.PutUint32(b[:], m.RelayToIp)
  108. targetAddr := netip.AddrFrom4(b)
  109. relay, err := rm.EstablishRelay(h, m)
  110. if err != nil {
  111. rm.l.WithError(err).Error("Failed to update relay for relayTo")
  112. return
  113. }
  114. // Do I need to complete the relays now?
  115. if relay.Type == TerminalType {
  116. return
  117. }
  118. // I'm the middle man. Let the initiator know that the I've established the relay they requested.
  119. peerHostInfo := rm.hostmap.QueryVpnIp(relay.PeerIp)
  120. if peerHostInfo == nil {
  121. rm.l.WithField("relayTo", relay.PeerIp).Error("Can't find a HostInfo for peer")
  122. return
  123. }
  124. peerRelay, ok := peerHostInfo.relayState.QueryRelayForByIp(targetAddr)
  125. if !ok {
  126. rm.l.WithField("relayTo", peerHostInfo.vpnIp).Error("peerRelay does not have Relay state for relayTo")
  127. return
  128. }
  129. switch peerRelay.State {
  130. case Requested:
  131. // I initiated the request to this peer, but haven't heard back from the peer yet. I must wait for this peer
  132. // to respond to complete the connection.
  133. case PeerRequested, Disestablished, Established:
  134. peerHostInfo.relayState.UpdateRelayForByIpState(targetAddr, Established)
  135. //TODO: IPV6-WORK
  136. b = peerHostInfo.vpnIp.As4()
  137. resp := NebulaControl{
  138. Type: NebulaControl_CreateRelayResponse,
  139. ResponderRelayIndex: peerRelay.LocalIndex,
  140. InitiatorRelayIndex: peerRelay.RemoteIndex,
  141. RelayFromIp: binary.BigEndian.Uint32(b[:]),
  142. RelayToIp: uint32(target),
  143. }
  144. msg, err := resp.Marshal()
  145. if err != nil {
  146. rm.l.
  147. WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  148. } else {
  149. f.SendMessageToHostInfo(header.Control, 0, peerHostInfo, msg, make([]byte, 12), make([]byte, mtu))
  150. rm.l.WithFields(logrus.Fields{
  151. "relayFrom": resp.RelayFromIp,
  152. "relayTo": resp.RelayToIp,
  153. "initiatorRelayIndex": resp.InitiatorRelayIndex,
  154. "responderRelayIndex": resp.ResponderRelayIndex,
  155. "vpnIp": peerHostInfo.vpnIp}).
  156. Info("send CreateRelayResponse")
  157. }
  158. }
  159. }
  160. func (rm *relayManager) handleCreateRelayRequest(h *HostInfo, f *Interface, m *NebulaControl) {
  161. //TODO: IPV6-WORK
  162. b := [4]byte{}
  163. binary.BigEndian.PutUint32(b[:], m.RelayFromIp)
  164. from := netip.AddrFrom4(b)
  165. binary.BigEndian.PutUint32(b[:], m.RelayToIp)
  166. target := netip.AddrFrom4(b)
  167. logMsg := rm.l.WithFields(logrus.Fields{
  168. "relayFrom": from,
  169. "relayTo": target,
  170. "initiatorRelayIndex": m.InitiatorRelayIndex,
  171. "vpnIp": h.vpnIp})
  172. logMsg.Info("handleCreateRelayRequest")
  173. // Is the source of the relay me? This should never happen, but did happen due to
  174. // an issue migrating relays over to newly re-handshaked host info objects.
  175. if from == f.myVpnNet.Addr() {
  176. logMsg.WithField("myIP", from).Error("Discarding relay request from myself")
  177. return
  178. }
  179. // Is the target of the relay me?
  180. if target == f.myVpnNet.Addr() {
  181. existingRelay, ok := h.relayState.QueryRelayForByIp(from)
  182. if ok {
  183. switch existingRelay.State {
  184. case Requested:
  185. ok = h.relayState.CompleteRelayByIP(from, m.InitiatorRelayIndex)
  186. if !ok {
  187. logMsg.Error("Relay State not found")
  188. return
  189. }
  190. case Established:
  191. if existingRelay.RemoteIndex != m.InitiatorRelayIndex {
  192. // We got a brand new Relay request, because its index is different than what we saw before.
  193. // This should never happen. The peer should never change an index, once created.
  194. logMsg.WithFields(logrus.Fields{
  195. "existingRemoteIndex": existingRelay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest")
  196. return
  197. }
  198. case Disestablished:
  199. if existingRelay.RemoteIndex != m.InitiatorRelayIndex {
  200. // We got a brand new Relay request, because its index is different than what we saw before.
  201. // This should never happen. The peer should never change an index, once created.
  202. logMsg.WithFields(logrus.Fields{
  203. "existingRemoteIndex": existingRelay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest")
  204. return
  205. }
  206. // Mark the relay as 'Established' because it's safe to use again
  207. h.relayState.UpdateRelayForByIpState(from, Established)
  208. case PeerRequested:
  209. // I should never be in this state, because I am terminal, not forwarding.
  210. logMsg.WithFields(logrus.Fields{
  211. "existingRemoteIndex": existingRelay.RemoteIndex,
  212. "state": existingRelay.State}).Error("Unexpected Relay State found")
  213. }
  214. } else {
  215. _, err := AddRelay(rm.l, h, f.hostMap, from, &m.InitiatorRelayIndex, TerminalType, Established)
  216. if err != nil {
  217. logMsg.WithError(err).Error("Failed to add relay")
  218. return
  219. }
  220. }
  221. relay, ok := h.relayState.QueryRelayForByIp(from)
  222. if !ok {
  223. logMsg.WithField("from", from).Error("Relay State not found")
  224. return
  225. }
  226. //TODO: IPV6-WORK
  227. fromB := from.As4()
  228. targetB := target.As4()
  229. resp := NebulaControl{
  230. Type: NebulaControl_CreateRelayResponse,
  231. ResponderRelayIndex: relay.LocalIndex,
  232. InitiatorRelayIndex: relay.RemoteIndex,
  233. RelayFromIp: binary.BigEndian.Uint32(fromB[:]),
  234. RelayToIp: binary.BigEndian.Uint32(targetB[:]),
  235. }
  236. msg, err := resp.Marshal()
  237. if err != nil {
  238. logMsg.
  239. WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  240. } else {
  241. f.SendMessageToHostInfo(header.Control, 0, h, msg, make([]byte, 12), make([]byte, mtu))
  242. rm.l.WithFields(logrus.Fields{
  243. //TODO: IPV6-WORK, this used to use the resp object but I am getting lazy now
  244. "relayFrom": from,
  245. "relayTo": target,
  246. "initiatorRelayIndex": resp.InitiatorRelayIndex,
  247. "responderRelayIndex": resp.ResponderRelayIndex,
  248. "vpnIp": h.vpnIp}).
  249. Info("send CreateRelayResponse")
  250. }
  251. return
  252. } else {
  253. // the target is not me. Create a relay to the target, from me.
  254. if !rm.GetAmRelay() {
  255. return
  256. }
  257. peer := rm.hostmap.QueryVpnIp(target)
  258. if peer == nil {
  259. // Try to establish a connection to this host. If we get a future relay request,
  260. // we'll be ready!
  261. f.Handshake(target)
  262. return
  263. }
  264. if !peer.remote.IsValid() {
  265. // Only create relays to peers for whom I have a direct connection
  266. return
  267. }
  268. var index uint32
  269. var err error
  270. targetRelay, ok := peer.relayState.QueryRelayForByIp(from)
  271. if ok {
  272. index = targetRelay.LocalIndex
  273. } else {
  274. // Allocate an index in the hostMap for this relay peer
  275. index, err = AddRelay(rm.l, peer, f.hostMap, from, nil, ForwardingType, Requested)
  276. if err != nil {
  277. return
  278. }
  279. }
  280. peer.relayState.UpdateRelayForByIpState(from, Requested)
  281. // Send a CreateRelayRequest to the peer.
  282. fromB := from.As4()
  283. targetB := target.As4()
  284. req := NebulaControl{
  285. Type: NebulaControl_CreateRelayRequest,
  286. InitiatorRelayIndex: index,
  287. RelayFromIp: binary.BigEndian.Uint32(fromB[:]),
  288. RelayToIp: binary.BigEndian.Uint32(targetB[:]),
  289. }
  290. msg, err := req.Marshal()
  291. if err != nil {
  292. logMsg.
  293. WithError(err).Error("relayManager Failed to marshal Control message to create relay")
  294. } else {
  295. f.SendMessageToHostInfo(header.Control, 0, peer, msg, make([]byte, 12), make([]byte, mtu))
  296. rm.l.WithFields(logrus.Fields{
  297. //TODO: IPV6-WORK another lazy used to use the req object
  298. "relayFrom": h.vpnIp,
  299. "relayTo": target,
  300. "initiatorRelayIndex": req.InitiatorRelayIndex,
  301. "responderRelayIndex": req.ResponderRelayIndex,
  302. "vpnAddr": target}).
  303. Info("send CreateRelayRequest")
  304. // Also track the half-created Relay state just received
  305. _, ok := h.relayState.QueryRelayForByIp(target)
  306. if !ok {
  307. // Add the relay
  308. _, err := AddRelay(rm.l, h, f.hostMap, target, &m.InitiatorRelayIndex, ForwardingType, PeerRequested)
  309. if err != nil {
  310. logMsg.
  311. WithError(err).Error("relayManager Failed to allocate a local index for relay")
  312. return
  313. }
  314. }
  315. }
  316. }
  317. }