2
0

relay.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. package mq
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net"
  6. "github.com/gravitl/netmaker/logger"
  7. "github.com/gravitl/netmaker/logic"
  8. "github.com/gravitl/netmaker/models"
  9. "github.com/gravitl/netmaker/servercfg"
  10. "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
  11. )
  12. // PubPeerUpdate publishes a peer update to the client
  13. // relay is set to a newly created relay node or nil for other peer updates
  14. func PubPeerUpdate(client, relay *models.Client, peers *[]models.Client) {
  15. p := models.PeerAction{
  16. Action: models.UpdatePeer,
  17. }
  18. if client.Node.IsRelay {
  19. pubRelayUpdate(client, peers)
  20. return
  21. }
  22. if relay != nil {
  23. if logic.StringSliceContains(relay.Node.RelayedNodes, client.Node.ID.String()) {
  24. pubRelayedUpdate(client, relay, peers)
  25. return
  26. }
  27. }
  28. for _, peer := range *peers {
  29. if client.Host.ID == peer.Host.ID {
  30. continue
  31. }
  32. update := wgtypes.PeerConfig{
  33. PublicKey: peer.Host.PublicKey,
  34. ReplaceAllowedIPs: true,
  35. Endpoint: &net.UDPAddr{
  36. IP: peer.Host.EndpointIP,
  37. Port: peer.Host.ListenPort,
  38. },
  39. PersistentKeepaliveInterval: &peer.Node.PersistentKeepalive,
  40. }
  41. if relay != nil {
  42. if peer.Node.IsRelayed && peer.Node.RelayedBy == relay.Node.ID.String() {
  43. update.Remove = true
  44. }
  45. }
  46. addAllowedIPs(peer, &update)
  47. p.Peers = append(p.Peers, update)
  48. }
  49. data, err := json.Marshal(p)
  50. if err != nil {
  51. logger.Log(0, "marshal peer update", err.Error())
  52. return
  53. }
  54. publish(&client.Host, fmt.Sprintf("peer/host/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)
  55. }
  56. // getRelayAllowedIPs returns the list of allowedips for a given peer that is a relay
  57. func getRelayAllowedIPs(peer models.Client) []net.IPNet {
  58. var relayIPs []net.IPNet
  59. for _, relayed := range peer.Node.RelayedNodes {
  60. node, err := logic.GetNodeByID(relayed)
  61. if err != nil {
  62. logger.Log(0, "retrieve relayed node", err.Error())
  63. continue
  64. }
  65. if node.Address.IP != nil {
  66. node.Address.Mask = net.CIDRMask(32, 32)
  67. relayIPs = append(relayIPs, node.Address)
  68. }
  69. if node.Address6.IP != nil {
  70. node.Address.Mask = net.CIDRMask(128, 128)
  71. relayIPs = append(relayIPs, node.Address6)
  72. }
  73. if node.IsRelay {
  74. relayIPs = append(relayIPs, getRelayAllowedIPs(peer)...)
  75. }
  76. if node.IsEgressGateway {
  77. relayIPs = append(relayIPs, getEgressIPs(peer)...)
  78. }
  79. if node.IsIngressGateway {
  80. relayIPs = append(relayIPs, getIngressIPs(peer)...)
  81. }
  82. }
  83. return relayIPs
  84. }
  85. // getEgressIPs returns the additional allowedips (egress ranges) that need
  86. // to be included for an egress gateway peer
  87. func getEgressIPs(peer models.Client) []net.IPNet {
  88. var egressIPs []net.IPNet
  89. for _, egressRange := range peer.Node.EgressGatewayRanges {
  90. ip, cidr, err := net.ParseCIDR(egressRange)
  91. if err != nil {
  92. logger.Log(0, "parse egress range", err.Error())
  93. continue
  94. }
  95. cidr.IP = ip
  96. egressIPs = append(egressIPs, *cidr)
  97. }
  98. return egressIPs
  99. }
  100. // getIngressIPs returns the additional allowedips (ext client addresses) that need
  101. // to be included for an ingress gateway peer
  102. // TODO: add ExtraAllowedIPs
  103. func getIngressIPs(peer models.Client) []net.IPNet {
  104. var ingressIPs []net.IPNet
  105. extclients, err := logic.GetNetworkExtClients(peer.Node.Network)
  106. if err != nil {
  107. return ingressIPs
  108. }
  109. for _, ec := range extclients {
  110. if ec.IngressGatewayID == peer.Node.ID.String() {
  111. if ec.Address != "" {
  112. ip, cidr, err := net.ParseCIDR(ec.Address)
  113. if err != nil {
  114. continue
  115. }
  116. cidr.IP = ip
  117. ingressIPs = append(ingressIPs, *cidr)
  118. }
  119. if ec.Address6 != "" {
  120. ip, cidr, err := net.ParseCIDR(ec.Address6)
  121. if err != nil {
  122. continue
  123. }
  124. cidr.IP = ip
  125. ingressIPs = append(ingressIPs, *cidr)
  126. }
  127. }
  128. }
  129. return ingressIPs
  130. }
  131. // pubRelayedUpdate - publish peer update to a node (client) that is relayed by the relay
  132. func pubRelayedUpdate(client, relay *models.Client, peers *[]models.Client) {
  133. //verify
  134. if !logic.StringSliceContains(relay.Node.RelayedNodes, client.Node.ID.String()) {
  135. logger.Log(0, "invalid call to pubRelayed update", client.Host.Name, relay.Host.Name)
  136. return
  137. }
  138. //remove all nodes except relay
  139. p := models.PeerAction{
  140. Action: models.RemovePeer,
  141. }
  142. for _, peer := range *peers {
  143. if peer.Host.ID == relay.Host.ID || peer.Host.ID == client.Host.ID {
  144. continue
  145. }
  146. update := wgtypes.PeerConfig{
  147. PublicKey: peer.Host.PublicKey,
  148. Remove: true,
  149. }
  150. p.Peers = append(p.Peers, update)
  151. }
  152. data, err := json.Marshal(p)
  153. if err != nil {
  154. logger.Log(0, "marshal peer update", err.Error())
  155. return
  156. }
  157. publish(&client.Host, fmt.Sprintf("peer/host/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)
  158. //update the relay peer
  159. p = models.PeerAction{
  160. Action: models.UpdatePeer,
  161. }
  162. update := wgtypes.PeerConfig{
  163. PublicKey: relay.Host.PublicKey,
  164. ReplaceAllowedIPs: true,
  165. Endpoint: &net.UDPAddr{
  166. IP: relay.Host.EndpointIP,
  167. Port: relay.Host.ListenPort,
  168. },
  169. PersistentKeepaliveInterval: &relay.Node.PersistentKeepalive,
  170. }
  171. if relay.Node.Address.IP != nil {
  172. relay.Node.Address.Mask = net.CIDRMask(32, 32)
  173. update.AllowedIPs = append(update.AllowedIPs, relay.Node.Address)
  174. }
  175. if relay.Node.Address6.IP != nil {
  176. relay.Node.Address6.Mask = net.CIDRMask(128, 128)
  177. update.AllowedIPs = append(update.AllowedIPs, relay.Node.Address6)
  178. }
  179. p.Peers = append(p.Peers, update)
  180. // add all other peers to allowed ips
  181. for _, peer := range *peers {
  182. if peer.Host.ID == relay.Host.ID || peer.Host.ID == client.Host.ID {
  183. continue
  184. }
  185. addAllowedIPs(peer, &update)
  186. }
  187. p.Peers = append(p.Peers, update)
  188. data, err = json.Marshal(p)
  189. if err != nil {
  190. logger.Log(0, "marshal peer update", err.Error())
  191. return
  192. }
  193. publish(&client.Host, fmt.Sprintf("peer/host/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)
  194. }
  195. // pubRelayUpdate - publish peer update to a relay
  196. func pubRelayUpdate(client *models.Client, peers *[]models.Client) {
  197. if !client.Node.IsRelay {
  198. return
  199. }
  200. // add all peers to allowedips
  201. p := models.PeerAction{
  202. Action: models.UpdatePeer,
  203. }
  204. for _, peer := range *peers {
  205. if peer.Host.ID == client.Host.ID {
  206. continue
  207. }
  208. update := wgtypes.PeerConfig{
  209. PublicKey: peer.Host.PublicKey,
  210. ReplaceAllowedIPs: true,
  211. Remove: false,
  212. Endpoint: &net.UDPAddr{
  213. IP: peer.Host.EndpointIP,
  214. Port: peer.Host.ListenPort,
  215. },
  216. PersistentKeepaliveInterval: &peer.Node.PersistentKeepalive,
  217. }
  218. addAllowedIPs(peer, &update)
  219. p.Peers = append(p.Peers, update)
  220. }
  221. data, err := json.Marshal(p)
  222. if err != nil {
  223. logger.Log(0, "marshal peer update", err.Error())
  224. return
  225. }
  226. publish(&client.Host, fmt.Sprintf("peer/host/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)
  227. }
  228. func addAllowedIPs(peer models.Client, update *wgtypes.PeerConfig) {
  229. if peer.Node.Address.IP != nil {
  230. peer.Node.Address.Mask = net.CIDRMask(32, 32)
  231. update.AllowedIPs = append(update.AllowedIPs, peer.Node.Address)
  232. }
  233. if peer.Node.Address6.IP != nil {
  234. peer.Node.Address6.Mask = net.CIDRMask(128, 128)
  235. update.AllowedIPs = append(update.AllowedIPs, peer.Node.Address6)
  236. }
  237. if peer.Node.IsRelay {
  238. update.AllowedIPs = append(update.AllowedIPs, getRelayAllowedIPs(peer)...)
  239. }
  240. if peer.Node.IsEgressGateway {
  241. update.AllowedIPs = append(update.AllowedIPs, getEgressIPs(peer)...)
  242. }
  243. if peer.Node.IsIngressGateway {
  244. update.AllowedIPs = append(update.AllowedIPs, getIngressIPs(peer)...)
  245. }
  246. }