relay_manager.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. package nebula
  2. import (
  3. "context"
  4. "encoding/binary"
  5. "errors"
  6. "fmt"
  7. "net/netip"
  8. "sync/atomic"
  9. "github.com/sirupsen/logrus"
  10. "github.com/slackhq/nebula/cert"
  11. "github.com/slackhq/nebula/config"
  12. "github.com/slackhq/nebula/header"
  13. )
  14. type relayManager struct {
  15. l *logrus.Logger
  16. hostmap *HostMap
  17. amRelay atomic.Bool
  18. }
  19. func NewRelayManager(ctx context.Context, l *logrus.Logger, hostmap *HostMap, c *config.C) *relayManager {
  20. rm := &relayManager{
  21. l: l,
  22. hostmap: hostmap,
  23. }
  24. rm.reload(c, true)
  25. c.RegisterReloadCallback(func(c *config.C) {
  26. err := rm.reload(c, false)
  27. if err != nil {
  28. l.WithError(err).Error("Failed to reload relay_manager")
  29. }
  30. })
  31. return rm
  32. }
  33. func (rm *relayManager) reload(c *config.C, initial bool) error {
  34. if initial || c.HasChanged("relay.am_relay") {
  35. rm.setAmRelay(c.GetBool("relay.am_relay", false))
  36. }
  37. return nil
  38. }
  39. func (rm *relayManager) GetAmRelay() bool {
  40. return rm.amRelay.Load()
  41. }
  42. func (rm *relayManager) setAmRelay(v bool) {
  43. rm.amRelay.Store(v)
  44. }
  45. // AddRelay finds an available relay index on the hostmap, and associates the relay info with it.
  46. // relayHostInfo is the Nebula peer which can be used as a relay to access the target vpnIp.
  47. func AddRelay(l *logrus.Logger, relayHostInfo *HostInfo, hm *HostMap, vpnIp netip.Addr, remoteIdx *uint32, relayType int, state int) (uint32, error) {
  48. hm.Lock()
  49. defer hm.Unlock()
  50. for i := 0; i < 32; i++ {
  51. index, err := generateIndex(l)
  52. if err != nil {
  53. return 0, err
  54. }
  55. _, inRelays := hm.Relays[index]
  56. if !inRelays {
  57. // Avoid standing up a relay that can't be used since only the primary hostinfo
  58. // will be pointed to by the relay logic
  59. //TODO: if there was an existing primary and it had relay state, should we merge?
  60. hm.unlockedMakePrimary(relayHostInfo)
  61. hm.Relays[index] = relayHostInfo
  62. newRelay := Relay{
  63. Type: relayType,
  64. State: state,
  65. LocalIndex: index,
  66. PeerAddr: vpnIp,
  67. }
  68. if remoteIdx != nil {
  69. newRelay.RemoteIndex = *remoteIdx
  70. }
  71. relayHostInfo.relayState.InsertRelay(vpnIp, index, &newRelay)
  72. return index, nil
  73. }
  74. }
  75. return 0, errors.New("failed to generate unique localIndexId")
  76. }
  77. // EstablishRelay updates a Requested Relay to become an Established Relay, which can pass traffic.
  78. func (rm *relayManager) EstablishRelay(relayHostInfo *HostInfo, m *NebulaControl) (*Relay, error) {
  79. relay, ok := relayHostInfo.relayState.CompleteRelayByIdx(m.InitiatorRelayIndex, m.ResponderRelayIndex)
  80. if !ok {
  81. //TODO: we need to handle possibly logging deprecated fields as well
  82. rm.l.WithFields(logrus.Fields{"relay": relayHostInfo.vpnAddrs[0],
  83. "initiatorRelayIndex": m.InitiatorRelayIndex,
  84. "relayFrom": m.RelayFromAddr,
  85. "relayTo": m.RelayToAddr}).Info("relayManager failed to update relay")
  86. return nil, fmt.Errorf("unknown relay")
  87. }
  88. return relay, nil
  89. }
  90. func (rm *relayManager) HandleControlMsg(h *HostInfo, d []byte, f *Interface) {
  91. msg := &NebulaControl{}
  92. err := msg.Unmarshal(d)
  93. if err != nil {
  94. h.logger(f.l).WithError(err).Error("Failed to unmarshal control message")
  95. return
  96. }
  97. var v cert.Version
  98. if msg.OldRelayFromAddr > 0 || msg.OldRelayToAddr > 0 {
  99. v = cert.Version1
  100. //TODO: yeah this is junk but maybe its less junky than the other options
  101. b := [4]byte{}
  102. binary.BigEndian.PutUint32(b[:], msg.OldRelayFromAddr)
  103. msg.RelayFromAddr = netAddrToProtoAddr(netip.AddrFrom4(b))
  104. binary.BigEndian.PutUint32(b[:], msg.OldRelayToAddr)
  105. msg.RelayToAddr = netAddrToProtoAddr(netip.AddrFrom4(b))
  106. } else {
  107. v = cert.Version2
  108. }
  109. switch msg.Type {
  110. case NebulaControl_CreateRelayRequest:
  111. rm.handleCreateRelayRequest(v, h, f, msg)
  112. case NebulaControl_CreateRelayResponse:
  113. rm.handleCreateRelayResponse(v, h, f, msg)
  114. }
  115. }
  116. func (rm *relayManager) handleCreateRelayResponse(v cert.Version, h *HostInfo, f *Interface, m *NebulaControl) {
  117. rm.l.WithFields(logrus.Fields{
  118. "relayFrom": m.RelayFromAddr,
  119. "relayTo": m.RelayToAddr,
  120. "initiatorRelayIndex": m.InitiatorRelayIndex,
  121. "responderRelayIndex": m.ResponderRelayIndex,
  122. "vpnAddrs": h.vpnAddrs}).
  123. Info("handleCreateRelayResponse")
  124. target := m.RelayToAddr
  125. targetAddr := protoAddrToNetAddr(target)
  126. relay, err := rm.EstablishRelay(h, m)
  127. if err != nil {
  128. rm.l.WithError(err).Error("Failed to update relay for relayTo")
  129. return
  130. }
  131. // Do I need to complete the relays now?
  132. if relay.Type == TerminalType {
  133. return
  134. }
  135. // I'm the middle man. Let the initiator know that the I've established the relay they requested.
  136. peerHostInfo := rm.hostmap.QueryVpnAddr(relay.PeerAddr)
  137. if peerHostInfo == nil {
  138. rm.l.WithField("relayTo", relay.PeerAddr).Error("Can't find a HostInfo for peer")
  139. return
  140. }
  141. peerRelay, ok := peerHostInfo.relayState.QueryRelayForByIp(targetAddr)
  142. if !ok {
  143. rm.l.WithField("relayTo", peerHostInfo.vpnAddrs[0]).Error("peerRelay does not have Relay state for relayTo")
  144. return
  145. }
  146. if peerRelay.State == PeerRequested {
  147. peerRelay.State = Established
  148. resp := NebulaControl{
  149. Type: NebulaControl_CreateRelayResponse,
  150. ResponderRelayIndex: peerRelay.LocalIndex,
  151. InitiatorRelayIndex: peerRelay.RemoteIndex,
  152. }
  153. if v == cert.Version1 {
  154. peer := peerHostInfo.vpnAddrs[0]
  155. if !peer.Is4() {
  156. //TODO: log cant do it
  157. return
  158. }
  159. b := peer.As4()
  160. resp.OldRelayFromAddr = binary.BigEndian.Uint32(b[:])
  161. b = targetAddr.As4()
  162. resp.OldRelayToAddr = binary.BigEndian.Uint32(b[:])
  163. } else {
  164. resp.RelayFromAddr = netAddrToProtoAddr(peerHostInfo.vpnAddrs[0])
  165. resp.RelayToAddr = target
  166. }
  167. msg, err := resp.Marshal()
  168. if err != nil {
  169. rm.l.WithError(err).
  170. Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  171. } else {
  172. f.SendMessageToHostInfo(header.Control, 0, peerHostInfo, msg, make([]byte, 12), make([]byte, mtu))
  173. rm.l.WithFields(logrus.Fields{
  174. "relayFrom": resp.RelayFromAddr,
  175. "relayTo": resp.RelayToAddr,
  176. "initiatorRelayIndex": resp.InitiatorRelayIndex,
  177. "responderRelayIndex": resp.ResponderRelayIndex,
  178. "vpnAddrs": peerHostInfo.vpnAddrs}).
  179. Info("send CreateRelayResponse")
  180. }
  181. }
  182. }
  183. func (rm *relayManager) handleCreateRelayRequest(v cert.Version, h *HostInfo, f *Interface, m *NebulaControl) {
  184. from := protoAddrToNetAddr(m.RelayFromAddr)
  185. target := protoAddrToNetAddr(m.RelayToAddr)
  186. logMsg := rm.l.WithFields(logrus.Fields{
  187. "relayFrom": from,
  188. "relayTo": target,
  189. "initiatorRelayIndex": m.InitiatorRelayIndex,
  190. "vpnAddrs": h.vpnAddrs})
  191. logMsg.Info("handleCreateRelayRequest")
  192. // Is the source of the relay me? This should never happen, but did happen due to
  193. // an issue migrating relays over to newly re-handshaked host info objects.
  194. _, found := f.myVpnAddrsTable.Lookup(from)
  195. if found {
  196. logMsg.WithField("myIP", from).Error("Discarding relay request from myself")
  197. return
  198. }
  199. // Is the target of the relay me?
  200. _, found = f.myVpnAddrsTable.Lookup(target)
  201. if found {
  202. existingRelay, ok := h.relayState.QueryRelayForByIp(from)
  203. if ok {
  204. switch existingRelay.State {
  205. case Requested:
  206. ok = h.relayState.CompleteRelayByIP(from, m.InitiatorRelayIndex)
  207. if !ok {
  208. logMsg.Error("Relay State not found")
  209. return
  210. }
  211. case Established:
  212. if existingRelay.RemoteIndex != m.InitiatorRelayIndex {
  213. // We got a brand new Relay request, because its index is different than what we saw before.
  214. // This should never happen. The peer should never change an index, once created.
  215. logMsg.WithFields(logrus.Fields{
  216. "existingRemoteIndex": existingRelay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest")
  217. return
  218. }
  219. }
  220. } else {
  221. _, err := AddRelay(rm.l, h, f.hostMap, from, &m.InitiatorRelayIndex, TerminalType, Established)
  222. if err != nil {
  223. logMsg.WithError(err).Error("Failed to add relay")
  224. return
  225. }
  226. }
  227. relay, ok := h.relayState.QueryRelayForByIp(from)
  228. if !ok {
  229. logMsg.Error("Relay State not found")
  230. return
  231. }
  232. resp := NebulaControl{
  233. Type: NebulaControl_CreateRelayResponse,
  234. ResponderRelayIndex: relay.LocalIndex,
  235. InitiatorRelayIndex: relay.RemoteIndex,
  236. }
  237. if v == cert.Version1 {
  238. b := from.As4()
  239. resp.OldRelayFromAddr = binary.BigEndian.Uint32(b[:])
  240. b = target.As4()
  241. resp.OldRelayToAddr = binary.BigEndian.Uint32(b[:])
  242. } else {
  243. resp.RelayFromAddr = netAddrToProtoAddr(from)
  244. resp.RelayToAddr = netAddrToProtoAddr(target)
  245. }
  246. msg, err := resp.Marshal()
  247. if err != nil {
  248. logMsg.
  249. WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  250. } else {
  251. f.SendMessageToHostInfo(header.Control, 0, h, msg, make([]byte, 12), make([]byte, mtu))
  252. rm.l.WithFields(logrus.Fields{
  253. //TODO: IPV6-WORK, this used to use the resp object but I am getting lazy now
  254. "relayFrom": from,
  255. "relayTo": target,
  256. "initiatorRelayIndex": resp.InitiatorRelayIndex,
  257. "responderRelayIndex": resp.ResponderRelayIndex,
  258. "vpnAddrs": h.vpnAddrs}).
  259. Info("send CreateRelayResponse")
  260. }
  261. return
  262. } else {
  263. // the target is not me. Create a relay to the target, from me.
  264. if !rm.GetAmRelay() {
  265. return
  266. }
  267. peer := rm.hostmap.QueryVpnAddr(target)
  268. if peer == nil {
  269. // Try to establish a connection to this host. If we get a future relay request,
  270. // we'll be ready!
  271. f.Handshake(target)
  272. return
  273. }
  274. if !peer.remote.IsValid() {
  275. // Only create relays to peers for whom I have a direct connection
  276. return
  277. }
  278. sendCreateRequest := false
  279. var index uint32
  280. var err error
  281. targetRelay, ok := peer.relayState.QueryRelayForByIp(from)
  282. if ok {
  283. index = targetRelay.LocalIndex
  284. if targetRelay.State == Requested {
  285. sendCreateRequest = true
  286. }
  287. } else {
  288. // Allocate an index in the hostMap for this relay peer
  289. index, err = AddRelay(rm.l, peer, f.hostMap, from, nil, ForwardingType, Requested)
  290. if err != nil {
  291. return
  292. }
  293. sendCreateRequest = true
  294. }
  295. if sendCreateRequest {
  296. // Send a CreateRelayRequest to the peer.
  297. req := NebulaControl{
  298. Type: NebulaControl_CreateRelayRequest,
  299. InitiatorRelayIndex: index,
  300. }
  301. if v == cert.Version1 {
  302. if !h.vpnAddrs[0].Is4() {
  303. //TODO: log it
  304. return
  305. }
  306. b := h.vpnAddrs[0].As4()
  307. req.OldRelayFromAddr = binary.BigEndian.Uint32(b[:])
  308. b = target.As4()
  309. req.OldRelayToAddr = binary.BigEndian.Uint32(b[:])
  310. } else {
  311. req.RelayFromAddr = netAddrToProtoAddr(h.vpnAddrs[0])
  312. req.RelayToAddr = netAddrToProtoAddr(target)
  313. }
  314. msg, err := req.Marshal()
  315. if err != nil {
  316. logMsg.
  317. WithError(err).Error("relayManager Failed to marshal Control message to create relay")
  318. } else {
  319. f.SendMessageToHostInfo(header.Control, 0, peer, msg, make([]byte, 12), make([]byte, mtu))
  320. rm.l.WithFields(logrus.Fields{
  321. //TODO: IPV6-WORK another lazy used to use the req object
  322. "relayFrom": h.vpnAddrs[0],
  323. "relayTo": target,
  324. "initiatorRelayIndex": req.InitiatorRelayIndex,
  325. "responderRelayIndex": req.ResponderRelayIndex,
  326. "vpnAddr": target}).
  327. Info("send CreateRelayRequest")
  328. }
  329. }
  330. // Also track the half-created Relay state just received
  331. relay, ok := h.relayState.QueryRelayForByIp(target)
  332. if !ok {
  333. // Add the relay
  334. state := PeerRequested
  335. if targetRelay != nil && targetRelay.State == Established {
  336. state = Established
  337. }
  338. _, err := AddRelay(rm.l, h, f.hostMap, target, &m.InitiatorRelayIndex, ForwardingType, state)
  339. if err != nil {
  340. logMsg.
  341. WithError(err).Error("relayManager Failed to allocate a local index for relay")
  342. return
  343. }
  344. } else {
  345. switch relay.State {
  346. case Established:
  347. if relay.RemoteIndex != m.InitiatorRelayIndex {
  348. // We got a brand new Relay request, because its index is different than what we saw before.
  349. // This should never happen. The peer should never change an index, once created.
  350. logMsg.WithFields(logrus.Fields{
  351. "existingRemoteIndex": relay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest")
  352. return
  353. }
  354. resp := NebulaControl{
  355. Type: NebulaControl_CreateRelayResponse,
  356. ResponderRelayIndex: relay.LocalIndex,
  357. InitiatorRelayIndex: relay.RemoteIndex,
  358. }
  359. if v == cert.Version1 {
  360. if !h.vpnAddrs[0].Is4() {
  361. //TODO: log it
  362. return
  363. }
  364. b := h.vpnAddrs[0].As4()
  365. resp.OldRelayFromAddr = binary.BigEndian.Uint32(b[:])
  366. b = target.As4()
  367. resp.OldRelayToAddr = binary.BigEndian.Uint32(b[:])
  368. } else {
  369. resp.RelayFromAddr = netAddrToProtoAddr(h.vpnAddrs[0])
  370. resp.RelayToAddr = netAddrToProtoAddr(target)
  371. }
  372. msg, err := resp.Marshal()
  373. if err != nil {
  374. rm.l.
  375. WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  376. } else {
  377. f.SendMessageToHostInfo(header.Control, 0, h, msg, make([]byte, 12), make([]byte, mtu))
  378. rm.l.WithFields(logrus.Fields{
  379. //TODO: IPV6-WORK more lazy, used to use resp object
  380. "relayFrom": h.vpnAddrs[0],
  381. "relayTo": target,
  382. "initiatorRelayIndex": resp.InitiatorRelayIndex,
  383. "responderRelayIndex": resp.ResponderRelayIndex,
  384. "vpnAddrs": h.vpnAddrs}).
  385. Info("send CreateRelayResponse")
  386. }
  387. case Requested:
  388. // Keep waiting for the other relay to complete
  389. }
  390. }
  391. }
  392. }