relay_manager.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. package nebula
  2. import (
  3. "context"
  4. "encoding/binary"
  5. "errors"
  6. "fmt"
  7. "net/netip"
  8. "sync/atomic"
  9. "github.com/sirupsen/logrus"
  10. "github.com/slackhq/nebula/cert"
  11. "github.com/slackhq/nebula/config"
  12. "github.com/slackhq/nebula/header"
  13. )
  14. type relayManager struct {
  15. l *logrus.Logger
  16. hostmap *HostMap
  17. amRelay atomic.Bool
  18. }
  19. func NewRelayManager(ctx context.Context, l *logrus.Logger, hostmap *HostMap, c *config.C) *relayManager {
  20. rm := &relayManager{
  21. l: l,
  22. hostmap: hostmap,
  23. }
  24. rm.reload(c, true)
  25. c.RegisterReloadCallback(func(c *config.C) {
  26. err := rm.reload(c, false)
  27. if err != nil {
  28. l.WithError(err).Error("Failed to reload relay_manager")
  29. }
  30. })
  31. return rm
  32. }
  33. func (rm *relayManager) reload(c *config.C, initial bool) error {
  34. if initial || c.HasChanged("relay.am_relay") {
  35. rm.setAmRelay(c.GetBool("relay.am_relay", false))
  36. }
  37. return nil
  38. }
  39. func (rm *relayManager) GetAmRelay() bool {
  40. return rm.amRelay.Load()
  41. }
  42. func (rm *relayManager) setAmRelay(v bool) {
  43. rm.amRelay.Store(v)
  44. }
  45. // AddRelay finds an available relay index on the hostmap, and associates the relay info with it.
  46. // relayHostInfo is the Nebula peer which can be used as a relay to access the target vpnIp.
  47. func AddRelay(l *logrus.Logger, relayHostInfo *HostInfo, hm *HostMap, vpnIp netip.Addr, remoteIdx *uint32, relayType int, state int) (uint32, error) {
  48. hm.Lock()
  49. defer hm.Unlock()
  50. for i := 0; i < 32; i++ {
  51. index, err := generateIndex(l)
  52. if err != nil {
  53. return 0, err
  54. }
  55. _, inRelays := hm.Relays[index]
  56. if !inRelays {
  57. // Avoid standing up a relay that can't be used since only the primary hostinfo
  58. // will be pointed to by the relay logic
  59. //TODO: if there was an existing primary and it had relay state, should we merge?
  60. hm.unlockedMakePrimary(relayHostInfo)
  61. hm.Relays[index] = relayHostInfo
  62. newRelay := Relay{
  63. Type: relayType,
  64. State: state,
  65. LocalIndex: index,
  66. PeerAddr: vpnIp,
  67. }
  68. if remoteIdx != nil {
  69. newRelay.RemoteIndex = *remoteIdx
  70. }
  71. relayHostInfo.relayState.InsertRelay(vpnIp, index, &newRelay)
  72. return index, nil
  73. }
  74. }
  75. return 0, errors.New("failed to generate unique localIndexId")
  76. }
  77. // EstablishRelay updates a Requested Relay to become an Established Relay, which can pass traffic.
  78. func (rm *relayManager) EstablishRelay(relayHostInfo *HostInfo, m *NebulaControl) (*Relay, error) {
  79. relay, ok := relayHostInfo.relayState.CompleteRelayByIdx(m.InitiatorRelayIndex, m.ResponderRelayIndex)
  80. if !ok {
  81. fields := logrus.Fields{
  82. "relay": relayHostInfo.vpnAddrs[0],
  83. "initiatorRelayIndex": m.InitiatorRelayIndex,
  84. }
  85. if m.RelayFromAddr == nil {
  86. fields["relayFrom"] = m.OldRelayFromAddr
  87. } else {
  88. fields["relayFrom"] = m.RelayFromAddr
  89. }
  90. if m.RelayToAddr == nil {
  91. fields["relayTo"] = m.OldRelayToAddr
  92. } else {
  93. fields["relayTo"] = m.RelayToAddr
  94. }
  95. rm.l.WithFields(fields).Info("relayManager failed to update relay")
  96. return nil, fmt.Errorf("unknown relay")
  97. }
  98. return relay, nil
  99. }
  100. func (rm *relayManager) HandleControlMsg(h *HostInfo, d []byte, f *Interface) {
  101. msg := &NebulaControl{}
  102. err := msg.Unmarshal(d)
  103. if err != nil {
  104. h.logger(f.l).WithError(err).Error("Failed to unmarshal control message")
  105. return
  106. }
  107. var v cert.Version
  108. if msg.OldRelayFromAddr > 0 || msg.OldRelayToAddr > 0 {
  109. v = cert.Version1
  110. b := [4]byte{}
  111. binary.BigEndian.PutUint32(b[:], msg.OldRelayFromAddr)
  112. msg.RelayFromAddr = netAddrToProtoAddr(netip.AddrFrom4(b))
  113. binary.BigEndian.PutUint32(b[:], msg.OldRelayToAddr)
  114. msg.RelayToAddr = netAddrToProtoAddr(netip.AddrFrom4(b))
  115. } else {
  116. v = cert.Version2
  117. }
  118. switch msg.Type {
  119. case NebulaControl_CreateRelayRequest:
  120. rm.handleCreateRelayRequest(v, h, f, msg)
  121. case NebulaControl_CreateRelayResponse:
  122. rm.handleCreateRelayResponse(v, h, f, msg)
  123. }
  124. }
  125. func (rm *relayManager) handleCreateRelayResponse(v cert.Version, h *HostInfo, f *Interface, m *NebulaControl) {
  126. rm.l.WithFields(logrus.Fields{
  127. "relayFrom": protoAddrToNetAddr(m.RelayFromAddr),
  128. "relayTo": protoAddrToNetAddr(m.RelayToAddr),
  129. "initiatorRelayIndex": m.InitiatorRelayIndex,
  130. "responderRelayIndex": m.ResponderRelayIndex,
  131. "vpnAddrs": h.vpnAddrs}).
  132. Info("handleCreateRelayResponse")
  133. //peer == relayFrom
  134. //target == relayTo
  135. target := m.RelayToAddr
  136. targetAddr := protoAddrToNetAddr(target)
  137. relay, err := rm.EstablishRelay(h, m)
  138. if err != nil {
  139. rm.l.WithError(err).Error("Failed to update relay for relayTo")
  140. return
  141. }
  142. // Do I need to complete the relays now?
  143. if relay.Type == TerminalType {
  144. return
  145. }
  146. // I'm the middle man. Let the initiator know that the I've established the relay they requested.
  147. peerHostInfo := rm.hostmap.QueryVpnAddr(relay.PeerAddr)
  148. if peerHostInfo == nil {
  149. rm.l.WithField("relayTo", relay.PeerAddr).Error("Can't find a HostInfo for peer")
  150. return
  151. }
  152. peerRelay, ok := peerHostInfo.relayState.QueryRelayForByIp(targetAddr)
  153. if !ok {
  154. rm.l.WithField("relayTo", peerHostInfo.vpnAddrs[0]).Error("peerRelay does not have Relay state for relayTo")
  155. return
  156. }
  157. switch peerRelay.State {
  158. case Requested:
  159. // I initiated the request to this peer, but haven't heard back from the peer yet. I must wait for this peer
  160. // to respond to complete the connection.
  161. case PeerRequested, Disestablished, Established:
  162. peerHostInfo.relayState.UpdateRelayForByIpState(targetAddr, Established)
  163. resp := NebulaControl{
  164. Type: NebulaControl_CreateRelayResponse,
  165. ResponderRelayIndex: peerRelay.LocalIndex,
  166. InitiatorRelayIndex: peerRelay.RemoteIndex,
  167. }
  168. relayFrom := h.vpnAddrs[0]
  169. if v == cert.Version1 {
  170. peer := peerHostInfo.vpnAddrs[0]
  171. if !peer.Is4() {
  172. rm.l.WithField("relayFrom", peer).
  173. WithField("relayTo", targetAddr).
  174. WithField("initiatorRelayIndex", resp.InitiatorRelayIndex).
  175. WithField("responderRelayIndex", resp.ResponderRelayIndex).
  176. WithField("vpnAddrs", peerHostInfo.vpnAddrs).
  177. Error("Refusing to CreateRelayResponse for a v1 relay with an ipv6 address")
  178. return
  179. }
  180. b := peer.As4()
  181. resp.OldRelayFromAddr = binary.BigEndian.Uint32(b[:])
  182. b = targetAddr.As4()
  183. resp.OldRelayToAddr = binary.BigEndian.Uint32(b[:])
  184. } else {
  185. ok = false
  186. peerNetworks := h.GetCert().Certificate.Networks()
  187. for i := range peerNetworks {
  188. if peerNetworks[i].Contains(targetAddr) {
  189. relayFrom = peerNetworks[i].Addr()
  190. ok = true
  191. break
  192. }
  193. }
  194. if !ok {
  195. rm.l.WithFields(logrus.Fields{"from": f.myVpnNetworks, "to": targetAddr}).
  196. Error("cannot establish relay, no networks in common")
  197. return
  198. }
  199. resp.RelayFromAddr = netAddrToProtoAddr(relayFrom)
  200. resp.RelayToAddr = target
  201. }
  202. msg, err := resp.Marshal()
  203. if err != nil {
  204. rm.l.WithError(err).
  205. Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  206. } else {
  207. f.SendMessageToHostInfo(header.Control, 0, peerHostInfo, msg, make([]byte, 12), make([]byte, mtu))
  208. rm.l.WithFields(logrus.Fields{
  209. "relayFrom": relayFrom,
  210. "relayTo": targetAddr,
  211. "initiatorRelayIndex": resp.InitiatorRelayIndex,
  212. "responderRelayIndex": resp.ResponderRelayIndex,
  213. "vpnAddrs": peerHostInfo.vpnAddrs}).
  214. Info("send CreateRelayResponse")
  215. }
  216. }
  217. }
  218. func (rm *relayManager) handleCreateRelayRequest(v cert.Version, h *HostInfo, f *Interface, m *NebulaControl) {
  219. from := protoAddrToNetAddr(m.RelayFromAddr)
  220. target := protoAddrToNetAddr(m.RelayToAddr)
  221. logMsg := rm.l.WithFields(logrus.Fields{
  222. "relayFrom": from,
  223. "relayTo": target,
  224. "initiatorRelayIndex": m.InitiatorRelayIndex,
  225. "vpnAddrs": h.vpnAddrs})
  226. logMsg.Info("handleCreateRelayRequest")
  227. // Is the source of the relay me? This should never happen, but did happen due to
  228. // an issue migrating relays over to newly re-handshaked host info objects.
  229. if f.myVpnAddrsTable.Contains(from) {
  230. logMsg.WithField("myIP", from).Error("Discarding relay request from myself")
  231. return
  232. }
  233. // Is the target of the relay me?
  234. if f.myVpnAddrsTable.Contains(target) {
  235. existingRelay, ok := h.relayState.QueryRelayForByIp(from)
  236. if ok {
  237. switch existingRelay.State {
  238. case Requested:
  239. ok = h.relayState.CompleteRelayByIP(from, m.InitiatorRelayIndex)
  240. if !ok {
  241. logMsg.Error("Relay State not found")
  242. return
  243. }
  244. case Established:
  245. if existingRelay.RemoteIndex != m.InitiatorRelayIndex {
  246. // We got a brand new Relay request, because its index is different than what we saw before.
  247. // This should never happen. The peer should never change an index, once created.
  248. logMsg.WithFields(logrus.Fields{
  249. "existingRemoteIndex": existingRelay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest")
  250. return
  251. }
  252. case Disestablished:
  253. if existingRelay.RemoteIndex != m.InitiatorRelayIndex {
  254. // We got a brand new Relay request, because its index is different than what we saw before.
  255. // This should never happen. The peer should never change an index, once created.
  256. logMsg.WithFields(logrus.Fields{
  257. "existingRemoteIndex": existingRelay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest")
  258. return
  259. }
  260. // Mark the relay as 'Established' because it's safe to use again
  261. h.relayState.UpdateRelayForByIpState(from, Established)
  262. case PeerRequested:
  263. // I should never be in this state, because I am terminal, not forwarding.
  264. logMsg.WithFields(logrus.Fields{
  265. "existingRemoteIndex": existingRelay.RemoteIndex,
  266. "state": existingRelay.State}).Error("Unexpected Relay State found")
  267. }
  268. } else {
  269. _, err := AddRelay(rm.l, h, f.hostMap, from, &m.InitiatorRelayIndex, TerminalType, Established)
  270. if err != nil {
  271. logMsg.WithError(err).Error("Failed to add relay")
  272. return
  273. }
  274. }
  275. relay, ok := h.relayState.QueryRelayForByIp(from)
  276. if !ok {
  277. logMsg.WithField("from", from).Error("Relay State not found")
  278. return
  279. }
  280. resp := NebulaControl{
  281. Type: NebulaControl_CreateRelayResponse,
  282. ResponderRelayIndex: relay.LocalIndex,
  283. InitiatorRelayIndex: relay.RemoteIndex,
  284. }
  285. if v == cert.Version1 {
  286. b := from.As4()
  287. resp.OldRelayFromAddr = binary.BigEndian.Uint32(b[:])
  288. b = target.As4()
  289. resp.OldRelayToAddr = binary.BigEndian.Uint32(b[:])
  290. } else {
  291. resp.RelayFromAddr = netAddrToProtoAddr(from)
  292. resp.RelayToAddr = netAddrToProtoAddr(target)
  293. }
  294. msg, err := resp.Marshal()
  295. if err != nil {
  296. logMsg.WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  297. } else {
  298. f.SendMessageToHostInfo(header.Control, 0, h, msg, make([]byte, 12), make([]byte, mtu))
  299. rm.l.WithFields(logrus.Fields{
  300. "relayFrom": from,
  301. "relayTo": target,
  302. "initiatorRelayIndex": resp.InitiatorRelayIndex,
  303. "responderRelayIndex": resp.ResponderRelayIndex,
  304. "vpnAddrs": h.vpnAddrs}).
  305. Info("send CreateRelayResponse")
  306. }
  307. return
  308. } else {
  309. // the target is not me. Create a relay to the target, from me.
  310. if !rm.GetAmRelay() {
  311. return
  312. }
  313. peer := rm.hostmap.QueryVpnAddr(target)
  314. if peer == nil {
  315. // Try to establish a connection to this host. If we get a future relay request,
  316. // we'll be ready!
  317. f.Handshake(target)
  318. return
  319. }
  320. if !peer.remote.IsValid() {
  321. // Only create relays to peers for whom I have a direct connection
  322. return
  323. }
  324. var index uint32
  325. var err error
  326. targetRelay, ok := peer.relayState.QueryRelayForByIp(from)
  327. if ok {
  328. index = targetRelay.LocalIndex
  329. } else {
  330. // Allocate an index in the hostMap for this relay peer
  331. index, err = AddRelay(rm.l, peer, f.hostMap, from, nil, ForwardingType, Requested)
  332. if err != nil {
  333. return
  334. }
  335. }
  336. peer.relayState.UpdateRelayForByIpState(from, Requested)
  337. // Send a CreateRelayRequest to the peer.
  338. req := NebulaControl{
  339. Type: NebulaControl_CreateRelayRequest,
  340. InitiatorRelayIndex: index,
  341. }
  342. relayFrom := h.vpnAddrs[0]
  343. if v == cert.Version1 {
  344. if !relayFrom.Is4() {
  345. rm.l.WithField("relayFrom", relayFrom).
  346. WithField("relayTo", target).
  347. WithField("initiatorRelayIndex", req.InitiatorRelayIndex).
  348. WithField("responderRelayIndex", req.ResponderRelayIndex).
  349. WithField("vpnAddr", target).
  350. Error("Refusing to CreateRelayRequest for a v1 relay with an ipv6 address")
  351. return
  352. }
  353. b := relayFrom.As4()
  354. req.OldRelayFromAddr = binary.BigEndian.Uint32(b[:])
  355. b = target.As4()
  356. req.OldRelayToAddr = binary.BigEndian.Uint32(b[:])
  357. } else {
  358. ok = false
  359. peerNetworks := h.GetCert().Certificate.Networks()
  360. for i := range peerNetworks {
  361. if peerNetworks[i].Contains(target) {
  362. relayFrom = peerNetworks[i].Addr()
  363. ok = true
  364. break
  365. }
  366. }
  367. if !ok {
  368. rm.l.WithFields(logrus.Fields{"from": f.myVpnNetworks, "to": target}).
  369. Error("cannot establish relay, no networks in common")
  370. return
  371. }
  372. req.RelayFromAddr = netAddrToProtoAddr(relayFrom)
  373. req.RelayToAddr = netAddrToProtoAddr(target)
  374. }
  375. msg, err := req.Marshal()
  376. if err != nil {
  377. logMsg.WithError(err).Error("relayManager Failed to marshal Control message to create relay")
  378. } else {
  379. f.SendMessageToHostInfo(header.Control, 0, peer, msg, make([]byte, 12), make([]byte, mtu))
  380. rm.l.WithFields(logrus.Fields{
  381. "relayFrom": relayFrom,
  382. "relayTo": target,
  383. "initiatorRelayIndex": req.InitiatorRelayIndex,
  384. "responderRelayIndex": req.ResponderRelayIndex,
  385. "vpnAddr": target}).
  386. Info("send CreateRelayRequest")
  387. }
  388. // Also track the half-created Relay state just received
  389. _, ok = h.relayState.QueryRelayForByIp(target)
  390. if !ok {
  391. _, err := AddRelay(rm.l, h, f.hostMap, target, &m.InitiatorRelayIndex, ForwardingType, PeerRequested)
  392. if err != nil {
  393. logMsg.WithError(err).Error("relayManager Failed to allocate a local index for relay")
  394. return
  395. }
  396. }
  397. }
  398. }