relay_manager.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. package nebula
  2. import (
  3. "context"
  4. "encoding/binary"
  5. "errors"
  6. "fmt"
  7. "net/netip"
  8. "sync/atomic"
  9. "github.com/sirupsen/logrus"
  10. "github.com/slackhq/nebula/cert"
  11. "github.com/slackhq/nebula/config"
  12. "github.com/slackhq/nebula/header"
  13. )
  14. type relayManager struct {
  15. l *logrus.Logger
  16. hostmap *HostMap
  17. amRelay atomic.Bool
  18. }
  19. func NewRelayManager(ctx context.Context, l *logrus.Logger, hostmap *HostMap, c *config.C) *relayManager {
  20. rm := &relayManager{
  21. l: l,
  22. hostmap: hostmap,
  23. }
  24. rm.reload(c, true)
  25. c.RegisterReloadCallback(func(c *config.C) {
  26. err := rm.reload(c, false)
  27. if err != nil {
  28. l.WithError(err).Error("Failed to reload relay_manager")
  29. }
  30. })
  31. return rm
  32. }
  33. func (rm *relayManager) reload(c *config.C, initial bool) error {
  34. if initial || c.HasChanged("relay.am_relay") {
  35. rm.setAmRelay(c.GetBool("relay.am_relay", false))
  36. }
  37. return nil
  38. }
  39. func (rm *relayManager) GetAmRelay() bool {
  40. return rm.amRelay.Load()
  41. }
  42. func (rm *relayManager) setAmRelay(v bool) {
  43. rm.amRelay.Store(v)
  44. }
  45. // AddRelay finds an available relay index on the hostmap, and associates the relay info with it.
  46. // relayHostInfo is the Nebula peer which can be used as a relay to access the target vpnIp.
  47. func AddRelay(l *logrus.Logger, relayHostInfo *HostInfo, hm *HostMap, vpnIp netip.Addr, remoteIdx *uint32, relayType int, state int) (uint32, error) {
  48. hm.Lock()
  49. defer hm.Unlock()
  50. for i := 0; i < 32; i++ {
  51. index, err := generateIndex(l)
  52. if err != nil {
  53. return 0, err
  54. }
  55. _, inRelays := hm.Relays[index]
  56. if !inRelays {
  57. // Avoid standing up a relay that can't be used since only the primary hostinfo
  58. // will be pointed to by the relay logic
  59. //TODO: if there was an existing primary and it had relay state, should we merge?
  60. hm.unlockedMakePrimary(relayHostInfo)
  61. hm.Relays[index] = relayHostInfo
  62. newRelay := Relay{
  63. Type: relayType,
  64. State: state,
  65. LocalIndex: index,
  66. PeerAddr: vpnIp,
  67. }
  68. if remoteIdx != nil {
  69. newRelay.RemoteIndex = *remoteIdx
  70. }
  71. relayHostInfo.relayState.InsertRelay(vpnIp, index, &newRelay)
  72. return index, nil
  73. }
  74. }
  75. return 0, errors.New("failed to generate unique localIndexId")
  76. }
  77. // EstablishRelay updates a Requested Relay to become an Established Relay, which can pass traffic.
  78. func (rm *relayManager) EstablishRelay(relayHostInfo *HostInfo, m *NebulaControl) (*Relay, error) {
  79. relay, ok := relayHostInfo.relayState.CompleteRelayByIdx(m.InitiatorRelayIndex, m.ResponderRelayIndex)
  80. if !ok {
  81. fields := logrus.Fields{
  82. "relay": relayHostInfo.vpnAddrs[0],
  83. "initiatorRelayIndex": m.InitiatorRelayIndex,
  84. }
  85. if m.RelayFromAddr == nil {
  86. fields["relayFrom"] = m.OldRelayFromAddr
  87. } else {
  88. fields["relayFrom"] = m.RelayFromAddr
  89. }
  90. if m.RelayToAddr == nil {
  91. fields["relayTo"] = m.OldRelayToAddr
  92. } else {
  93. fields["relayTo"] = m.RelayToAddr
  94. }
  95. rm.l.WithFields(fields).Info("relayManager failed to update relay")
  96. return nil, fmt.Errorf("unknown relay")
  97. }
  98. return relay, nil
  99. }
  100. func (rm *relayManager) HandleControlMsg(h *HostInfo, d []byte, f *Interface) {
  101. msg := &NebulaControl{}
  102. err := msg.Unmarshal(d)
  103. if err != nil {
  104. h.logger(f.l).WithError(err).Error("Failed to unmarshal control message")
  105. return
  106. }
  107. var v cert.Version
  108. if msg.OldRelayFromAddr > 0 || msg.OldRelayToAddr > 0 {
  109. v = cert.Version1
  110. b := [4]byte{}
  111. binary.BigEndian.PutUint32(b[:], msg.OldRelayFromAddr)
  112. msg.RelayFromAddr = netAddrToProtoAddr(netip.AddrFrom4(b))
  113. binary.BigEndian.PutUint32(b[:], msg.OldRelayToAddr)
  114. msg.RelayToAddr = netAddrToProtoAddr(netip.AddrFrom4(b))
  115. } else {
  116. v = cert.Version2
  117. }
  118. switch msg.Type {
  119. case NebulaControl_CreateRelayRequest:
  120. rm.handleCreateRelayRequest(v, h, f, msg)
  121. case NebulaControl_CreateRelayResponse:
  122. rm.handleCreateRelayResponse(v, h, f, msg)
  123. }
  124. }
  125. func (rm *relayManager) handleCreateRelayResponse(v cert.Version, h *HostInfo, f *Interface, m *NebulaControl) {
  126. rm.l.WithFields(logrus.Fields{
  127. "relayFrom": protoAddrToNetAddr(m.RelayFromAddr),
  128. "relayTo": protoAddrToNetAddr(m.RelayToAddr),
  129. "initiatorRelayIndex": m.InitiatorRelayIndex,
  130. "responderRelayIndex": m.ResponderRelayIndex,
  131. "vpnAddrs": h.vpnAddrs}).
  132. Info("handleCreateRelayResponse")
  133. target := m.RelayToAddr
  134. targetAddr := protoAddrToNetAddr(target)
  135. relay, err := rm.EstablishRelay(h, m)
  136. if err != nil {
  137. rm.l.WithError(err).Error("Failed to update relay for relayTo")
  138. return
  139. }
  140. // Do I need to complete the relays now?
  141. if relay.Type == TerminalType {
  142. return
  143. }
  144. // I'm the middle man. Let the initiator know that the I've established the relay they requested.
  145. peerHostInfo := rm.hostmap.QueryVpnAddr(relay.PeerAddr)
  146. if peerHostInfo == nil {
  147. rm.l.WithField("relayTo", relay.PeerAddr).Error("Can't find a HostInfo for peer")
  148. return
  149. }
  150. peerRelay, ok := peerHostInfo.relayState.QueryRelayForByIp(targetAddr)
  151. if !ok {
  152. rm.l.WithField("relayTo", peerHostInfo.vpnAddrs[0]).Error("peerRelay does not have Relay state for relayTo")
  153. return
  154. }
  155. switch peerRelay.State {
  156. case Requested:
  157. // I initiated the request to this peer, but haven't heard back from the peer yet. I must wait for this peer
  158. // to respond to complete the connection.
  159. case PeerRequested, Disestablished, Established:
  160. peerHostInfo.relayState.UpdateRelayForByIpState(targetAddr, Established)
  161. resp := NebulaControl{
  162. Type: NebulaControl_CreateRelayResponse,
  163. ResponderRelayIndex: peerRelay.LocalIndex,
  164. InitiatorRelayIndex: peerRelay.RemoteIndex,
  165. }
  166. if v == cert.Version1 {
  167. peer := peerHostInfo.vpnAddrs[0]
  168. if !peer.Is4() {
  169. rm.l.WithField("relayFrom", peer).
  170. WithField("relayTo", target).
  171. WithField("initiatorRelayIndex", resp.InitiatorRelayIndex).
  172. WithField("responderRelayIndex", resp.ResponderRelayIndex).
  173. WithField("vpnAddrs", peerHostInfo.vpnAddrs).
  174. Error("Refusing to CreateRelayResponse for a v1 relay with an ipv6 address")
  175. return
  176. }
  177. b := peer.As4()
  178. resp.OldRelayFromAddr = binary.BigEndian.Uint32(b[:])
  179. b = targetAddr.As4()
  180. resp.OldRelayToAddr = binary.BigEndian.Uint32(b[:])
  181. } else {
  182. resp.RelayFromAddr = netAddrToProtoAddr(peerHostInfo.vpnAddrs[0])
  183. resp.RelayToAddr = target
  184. }
  185. msg, err := resp.Marshal()
  186. if err != nil {
  187. rm.l.WithError(err).
  188. Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  189. } else {
  190. f.SendMessageToHostInfo(header.Control, 0, peerHostInfo, msg, make([]byte, 12), make([]byte, mtu))
  191. rm.l.WithFields(logrus.Fields{
  192. "relayFrom": resp.RelayFromAddr,
  193. "relayTo": resp.RelayToAddr,
  194. "initiatorRelayIndex": resp.InitiatorRelayIndex,
  195. "responderRelayIndex": resp.ResponderRelayIndex,
  196. "vpnAddrs": peerHostInfo.vpnAddrs}).
  197. Info("send CreateRelayResponse")
  198. }
  199. }
  200. }
  201. func (rm *relayManager) handleCreateRelayRequest(v cert.Version, h *HostInfo, f *Interface, m *NebulaControl) {
  202. from := protoAddrToNetAddr(m.RelayFromAddr)
  203. target := protoAddrToNetAddr(m.RelayToAddr)
  204. logMsg := rm.l.WithFields(logrus.Fields{
  205. "relayFrom": from,
  206. "relayTo": target,
  207. "initiatorRelayIndex": m.InitiatorRelayIndex,
  208. "vpnAddrs": h.vpnAddrs})
  209. logMsg.Info("handleCreateRelayRequest")
  210. // Is the source of the relay me? This should never happen, but did happen due to
  211. // an issue migrating relays over to newly re-handshaked host info objects.
  212. if f.myVpnAddrsTable.Contains(from) {
  213. logMsg.WithField("myIP", from).Error("Discarding relay request from myself")
  214. return
  215. }
  216. // Is the target of the relay me?
  217. if f.myVpnAddrsTable.Contains(target) {
  218. existingRelay, ok := h.relayState.QueryRelayForByIp(from)
  219. if ok {
  220. switch existingRelay.State {
  221. case Requested:
  222. ok = h.relayState.CompleteRelayByIP(from, m.InitiatorRelayIndex)
  223. if !ok {
  224. logMsg.Error("Relay State not found")
  225. return
  226. }
  227. case Established:
  228. if existingRelay.RemoteIndex != m.InitiatorRelayIndex {
  229. // We got a brand new Relay request, because its index is different than what we saw before.
  230. // This should never happen. The peer should never change an index, once created.
  231. logMsg.WithFields(logrus.Fields{
  232. "existingRemoteIndex": existingRelay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest")
  233. return
  234. }
  235. case Disestablished:
  236. if existingRelay.RemoteIndex != m.InitiatorRelayIndex {
  237. // We got a brand new Relay request, because its index is different than what we saw before.
  238. // This should never happen. The peer should never change an index, once created.
  239. logMsg.WithFields(logrus.Fields{
  240. "existingRemoteIndex": existingRelay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest")
  241. return
  242. }
  243. // Mark the relay as 'Established' because it's safe to use again
  244. h.relayState.UpdateRelayForByIpState(from, Established)
  245. case PeerRequested:
  246. // I should never be in this state, because I am terminal, not forwarding.
  247. logMsg.WithFields(logrus.Fields{
  248. "existingRemoteIndex": existingRelay.RemoteIndex,
  249. "state": existingRelay.State}).Error("Unexpected Relay State found")
  250. }
  251. } else {
  252. _, err := AddRelay(rm.l, h, f.hostMap, from, &m.InitiatorRelayIndex, TerminalType, Established)
  253. if err != nil {
  254. logMsg.WithError(err).Error("Failed to add relay")
  255. return
  256. }
  257. }
  258. relay, ok := h.relayState.QueryRelayForByIp(from)
  259. if !ok {
  260. logMsg.WithField("from", from).Error("Relay State not found")
  261. return
  262. }
  263. resp := NebulaControl{
  264. Type: NebulaControl_CreateRelayResponse,
  265. ResponderRelayIndex: relay.LocalIndex,
  266. InitiatorRelayIndex: relay.RemoteIndex,
  267. }
  268. if v == cert.Version1 {
  269. b := from.As4()
  270. resp.OldRelayFromAddr = binary.BigEndian.Uint32(b[:])
  271. b = target.As4()
  272. resp.OldRelayToAddr = binary.BigEndian.Uint32(b[:])
  273. } else {
  274. resp.RelayFromAddr = netAddrToProtoAddr(from)
  275. resp.RelayToAddr = netAddrToProtoAddr(target)
  276. }
  277. msg, err := resp.Marshal()
  278. if err != nil {
  279. logMsg.
  280. WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  281. } else {
  282. f.SendMessageToHostInfo(header.Control, 0, h, msg, make([]byte, 12), make([]byte, mtu))
  283. rm.l.WithFields(logrus.Fields{
  284. "relayFrom": from,
  285. "relayTo": target,
  286. "initiatorRelayIndex": resp.InitiatorRelayIndex,
  287. "responderRelayIndex": resp.ResponderRelayIndex,
  288. "vpnAddrs": h.vpnAddrs}).
  289. Info("send CreateRelayResponse")
  290. }
  291. return
  292. } else {
  293. // the target is not me. Create a relay to the target, from me.
  294. if !rm.GetAmRelay() {
  295. return
  296. }
  297. peer := rm.hostmap.QueryVpnAddr(target)
  298. if peer == nil {
  299. // Try to establish a connection to this host. If we get a future relay request,
  300. // we'll be ready!
  301. f.Handshake(target)
  302. return
  303. }
  304. if !peer.remote.IsValid() {
  305. // Only create relays to peers for whom I have a direct connection
  306. return
  307. }
  308. var index uint32
  309. var err error
  310. targetRelay, ok := peer.relayState.QueryRelayForByIp(from)
  311. if ok {
  312. index = targetRelay.LocalIndex
  313. } else {
  314. // Allocate an index in the hostMap for this relay peer
  315. index, err = AddRelay(rm.l, peer, f.hostMap, from, nil, ForwardingType, Requested)
  316. if err != nil {
  317. return
  318. }
  319. }
  320. peer.relayState.UpdateRelayForByIpState(from, Requested)
  321. // Send a CreateRelayRequest to the peer.
  322. req := NebulaControl{
  323. Type: NebulaControl_CreateRelayRequest,
  324. InitiatorRelayIndex: index,
  325. }
  326. if v == cert.Version1 {
  327. if !h.vpnAddrs[0].Is4() {
  328. rm.l.WithField("relayFrom", h.vpnAddrs[0]).
  329. WithField("relayTo", target).
  330. WithField("initiatorRelayIndex", req.InitiatorRelayIndex).
  331. WithField("responderRelayIndex", req.ResponderRelayIndex).
  332. WithField("vpnAddr", target).
  333. Error("Refusing to CreateRelayRequest for a v1 relay with an ipv6 address")
  334. return
  335. }
  336. b := h.vpnAddrs[0].As4()
  337. req.OldRelayFromAddr = binary.BigEndian.Uint32(b[:])
  338. b = target.As4()
  339. req.OldRelayToAddr = binary.BigEndian.Uint32(b[:])
  340. } else {
  341. req.RelayFromAddr = netAddrToProtoAddr(h.vpnAddrs[0])
  342. req.RelayToAddr = netAddrToProtoAddr(target)
  343. }
  344. msg, err := req.Marshal()
  345. if err != nil {
  346. logMsg.
  347. WithError(err).Error("relayManager Failed to marshal Control message to create relay")
  348. } else {
  349. f.SendMessageToHostInfo(header.Control, 0, peer, msg, make([]byte, 12), make([]byte, mtu))
  350. rm.l.WithFields(logrus.Fields{
  351. "relayFrom": h.vpnAddrs[0],
  352. "relayTo": target,
  353. "initiatorRelayIndex": req.InitiatorRelayIndex,
  354. "responderRelayIndex": req.ResponderRelayIndex,
  355. "vpnAddr": target}).
  356. Info("send CreateRelayRequest")
  357. }
  358. // Also track the half-created Relay state just received
  359. _, ok = h.relayState.QueryRelayForByIp(target)
  360. if !ok {
  361. _, err := AddRelay(rm.l, h, f.hostMap, target, &m.InitiatorRelayIndex, ForwardingType, PeerRequested)
  362. if err != nil {
  363. logMsg.
  364. WithError(err).Error("relayManager Failed to allocate a local index for relay")
  365. return
  366. }
  367. }
  368. }
  369. }