relay_manager.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. package nebula
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "sync/atomic"
  7. "github.com/sirupsen/logrus"
  8. "github.com/slackhq/nebula/config"
  9. "github.com/slackhq/nebula/header"
  10. "github.com/slackhq/nebula/iputil"
  11. )
  12. type relayManager struct {
  13. l *logrus.Logger
  14. hostmap *HostMap
  15. atomicAmRelay int32
  16. }
  17. func NewRelayManager(ctx context.Context, l *logrus.Logger, hostmap *HostMap, c *config.C) *relayManager {
  18. rm := &relayManager{
  19. l: l,
  20. hostmap: hostmap,
  21. }
  22. rm.reload(c, true)
  23. c.RegisterReloadCallback(func(c *config.C) {
  24. err := rm.reload(c, false)
  25. if err != nil {
  26. l.WithError(err).Error("Failed to reload relay_manager")
  27. }
  28. })
  29. return rm
  30. }
  31. func (rm *relayManager) reload(c *config.C, initial bool) error {
  32. if initial || c.HasChanged("relay.am_relay") {
  33. rm.setAmRelay(c.GetBool("relay.am_relay", false))
  34. }
  35. return nil
  36. }
  37. func (rm *relayManager) GetAmRelay() bool {
  38. return atomic.LoadInt32(&rm.atomicAmRelay) == 1
  39. }
  40. func (rm *relayManager) setAmRelay(v bool) {
  41. var val int32
  42. switch v {
  43. case true:
  44. val = 1
  45. case false:
  46. val = 0
  47. }
  48. atomic.StoreInt32(&rm.atomicAmRelay, val)
  49. }
  50. // AddRelay finds an available relay index on the hostmap, and associates the relay info with it.
  51. // relayHostInfo is the Nebula peer which can be used as a relay to access the target vpnIp.
  52. func AddRelay(l *logrus.Logger, relayHostInfo *HostInfo, hm *HostMap, vpnIp iputil.VpnIp, remoteIdx *uint32, relayType int, state int) (uint32, error) {
  53. hm.Lock()
  54. defer hm.Unlock()
  55. for i := 0; i < 32; i++ {
  56. index, err := generateIndex(l)
  57. if err != nil {
  58. return 0, err
  59. }
  60. _, inRelays := hm.Relays[index]
  61. if !inRelays {
  62. hm.Relays[index] = relayHostInfo
  63. newRelay := Relay{
  64. Type: relayType,
  65. State: state,
  66. LocalIndex: index,
  67. PeerIp: vpnIp,
  68. }
  69. if remoteIdx != nil {
  70. newRelay.RemoteIndex = *remoteIdx
  71. }
  72. relayHostInfo.relayState.InsertRelay(vpnIp, index, &newRelay)
  73. return index, nil
  74. }
  75. }
  76. return 0, errors.New("failed to generate unique localIndexId")
  77. }
  78. // EstablishRelay updates a Requested Relay to become an Established Relay, which can pass traffic.
  79. func (rm *relayManager) EstablishRelay(relayHostInfo *HostInfo, m *NebulaControl) (*Relay, error) {
  80. relay, ok := relayHostInfo.relayState.QueryRelayForByIdx(m.InitiatorRelayIndex)
  81. if !ok {
  82. rm.l.WithFields(logrus.Fields{"relayHostInfo": relayHostInfo.vpnIp,
  83. "initiatorRelayIndex": m.InitiatorRelayIndex,
  84. "relayFrom": m.RelayFromIp,
  85. "relayTo": m.RelayToIp}).Info("relayManager EstablishRelay relayForByIdx not found")
  86. return nil, fmt.Errorf("unknown relay")
  87. }
  88. // relay deserves some synchronization
  89. relay.RemoteIndex = m.ResponderRelayIndex
  90. relay.State = Established
  91. return relay, nil
  92. }
  93. func (rm *relayManager) HandleControlMsg(h *HostInfo, m *NebulaControl, f *Interface) {
  94. switch m.Type {
  95. case NebulaControl_CreateRelayRequest:
  96. rm.handleCreateRelayRequest(h, f, m)
  97. case NebulaControl_CreateRelayResponse:
  98. rm.handleCreateRelayResponse(h, f, m)
  99. }
  100. }
  101. func (rm *relayManager) handleCreateRelayResponse(h *HostInfo, f *Interface, m *NebulaControl) {
  102. rm.l.WithFields(logrus.Fields{
  103. "relayFrom": iputil.VpnIp(m.RelayFromIp),
  104. "relayTarget": iputil.VpnIp(m.RelayToIp),
  105. "initiatorIdx": m.InitiatorRelayIndex,
  106. "responderIdx": m.ResponderRelayIndex,
  107. "hostInfo": h.vpnIp}).
  108. Info("handleCreateRelayResponse")
  109. target := iputil.VpnIp(m.RelayToIp)
  110. relay, err := rm.EstablishRelay(h, m)
  111. if err != nil {
  112. rm.l.WithError(err).WithField("target", target.String()).Error("Failed to update relay for target")
  113. return
  114. }
  115. // Do I need to complete the relays now?
  116. if relay.Type == TerminalType {
  117. return
  118. }
  119. // I'm the middle man. Let the initiator know that the I've established the relay they requested.
  120. peerHostInfo, err := rm.hostmap.QueryVpnIp(relay.PeerIp)
  121. if err != nil {
  122. rm.l.WithError(err).WithField("relayPeerIp", relay.PeerIp).Error("Can't find a HostInfo for peer IP")
  123. return
  124. }
  125. peerRelay, ok := peerHostInfo.relayState.QueryRelayForByIp(target)
  126. if !ok {
  127. rm.l.WithField("peerIp", peerHostInfo.vpnIp).WithField("target", target.String()).Error("peerRelay does not have Relay state for target IP", peerHostInfo.vpnIp.String(), target.String())
  128. return
  129. }
  130. peerRelay.State = Established
  131. resp := NebulaControl{
  132. Type: NebulaControl_CreateRelayResponse,
  133. ResponderRelayIndex: peerRelay.LocalIndex,
  134. InitiatorRelayIndex: peerRelay.RemoteIndex,
  135. RelayFromIp: uint32(peerHostInfo.vpnIp),
  136. RelayToIp: uint32(target),
  137. }
  138. msg, err := resp.Marshal()
  139. if err != nil {
  140. rm.l.
  141. WithError(err).Error("relayManager Failed to marhsal Control CreateRelayResponse message to create relay")
  142. } else {
  143. f.SendMessageToVpnIp(header.Control, 0, peerHostInfo.vpnIp, msg, make([]byte, 12), make([]byte, mtu))
  144. }
  145. }
  146. func (rm *relayManager) handleCreateRelayRequest(h *HostInfo, f *Interface, m *NebulaControl) {
  147. rm.l.WithFields(logrus.Fields{
  148. "relayFrom": iputil.VpnIp(m.RelayFromIp),
  149. "relayTarget": iputil.VpnIp(m.RelayToIp),
  150. "initiatorIdx": m.InitiatorRelayIndex,
  151. "hostInfo": h.vpnIp}).
  152. Info("handleCreateRelayRequest")
  153. from := iputil.VpnIp(m.RelayFromIp)
  154. target := iputil.VpnIp(m.RelayToIp)
  155. // Is the target of the relay me?
  156. if target == f.myVpnIp {
  157. existingRelay, ok := h.relayState.QueryRelayForByIp(from)
  158. addRelay := !ok
  159. if ok {
  160. // Clean up existing relay, if this is a new request.
  161. if existingRelay.RemoteIndex != m.InitiatorRelayIndex {
  162. // We got a brand new Relay request, because its index is different than what we saw before.
  163. // Clean up the existing Relay state, and get ready to record new Relay state.
  164. rm.hostmap.RemoveRelay(existingRelay.LocalIndex)
  165. addRelay = true
  166. }
  167. }
  168. if addRelay {
  169. _, err := AddRelay(rm.l, h, f.hostMap, from, &m.InitiatorRelayIndex, TerminalType, Established)
  170. if err != nil {
  171. return
  172. }
  173. }
  174. relay, ok := h.relayState.QueryRelayForByIp(from)
  175. if ok && m.InitiatorRelayIndex != relay.RemoteIndex {
  176. // Do something, Something happened.
  177. }
  178. resp := NebulaControl{
  179. Type: NebulaControl_CreateRelayResponse,
  180. ResponderRelayIndex: relay.LocalIndex,
  181. InitiatorRelayIndex: relay.RemoteIndex,
  182. RelayFromIp: uint32(from),
  183. RelayToIp: uint32(target),
  184. }
  185. msg, err := resp.Marshal()
  186. if err != nil {
  187. rm.l.
  188. WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  189. } else {
  190. f.SendMessageToVpnIp(header.Control, 0, h.vpnIp, msg, make([]byte, 12), make([]byte, mtu))
  191. }
  192. return
  193. } else {
  194. // the target is not me. Create a relay to the target, from me.
  195. if rm.GetAmRelay() == false {
  196. return
  197. }
  198. peer, err := rm.hostmap.QueryVpnIp(target)
  199. if err != nil {
  200. // Try to establish a connection to this host. If we get a future relay request,
  201. // we'll be ready!
  202. f.getOrHandshake(target)
  203. return
  204. }
  205. if peer.remote == nil {
  206. // Only create relays to peers for whom I have a direct connection
  207. return
  208. }
  209. sendCreateRequest := false
  210. var index uint32
  211. targetRelay, ok := peer.relayState.QueryRelayForByIp(from)
  212. if ok {
  213. index = targetRelay.LocalIndex
  214. if targetRelay.State == Requested {
  215. sendCreateRequest = true
  216. }
  217. } else {
  218. // Allocate an index in the hostMap for this relay peer
  219. index, err = AddRelay(rm.l, peer, f.hostMap, from, nil, ForwardingType, Requested)
  220. if err != nil {
  221. return
  222. }
  223. sendCreateRequest = true
  224. }
  225. if sendCreateRequest {
  226. // Send a CreateRelayRequest to the peer.
  227. req := NebulaControl{
  228. Type: NebulaControl_CreateRelayRequest,
  229. InitiatorRelayIndex: index,
  230. RelayFromIp: uint32(h.vpnIp),
  231. RelayToIp: uint32(target),
  232. }
  233. msg, err := req.Marshal()
  234. if err != nil {
  235. rm.l.
  236. WithError(err).Error("relayManager Failed to marshal Control message to create relay")
  237. } else {
  238. f.SendMessageToVpnIp(header.Control, 0, target, msg, make([]byte, 12), make([]byte, mtu))
  239. }
  240. }
  241. // Also track the half-created Relay state just received
  242. relay, ok := h.relayState.QueryRelayForByIp(target)
  243. if !ok {
  244. // Add the relay
  245. state := Requested
  246. if targetRelay != nil && targetRelay.State == Established {
  247. state = Established
  248. }
  249. _, err := AddRelay(rm.l, h, f.hostMap, target, &m.InitiatorRelayIndex, ForwardingType, state)
  250. if err != nil {
  251. rm.l.
  252. WithError(err).Error("relayManager Failed to allocate a local index for relay")
  253. return
  254. }
  255. } else {
  256. if relay.RemoteIndex != m.InitiatorRelayIndex {
  257. // This is a stale Relay entry for the same tunnel targets.
  258. // Clean up the existing stuff.
  259. rm.RemoveRelay(relay.LocalIndex)
  260. // Add the new relay
  261. _, err := AddRelay(rm.l, h, f.hostMap, target, &m.InitiatorRelayIndex, ForwardingType, Requested)
  262. if err != nil {
  263. return
  264. }
  265. relay, _ = h.relayState.QueryRelayForByIp(target)
  266. }
  267. switch relay.State {
  268. case Established:
  269. resp := NebulaControl{
  270. Type: NebulaControl_CreateRelayResponse,
  271. ResponderRelayIndex: relay.LocalIndex,
  272. InitiatorRelayIndex: relay.RemoteIndex,
  273. RelayFromIp: uint32(h.vpnIp),
  274. RelayToIp: uint32(target),
  275. }
  276. msg, err := resp.Marshal()
  277. if err != nil {
  278. rm.l.
  279. WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  280. } else {
  281. f.SendMessageToVpnIp(header.Control, 0, h.vpnIp, msg, make([]byte, 12), make([]byte, mtu))
  282. }
  283. case Requested:
  284. // Keep waiting for the other relay to complete
  285. }
  286. }
  287. }
  288. }
  289. func (rm *relayManager) RemoveRelay(localIdx uint32) {
  290. rm.hostmap.RemoveRelay(localIdx)
  291. }