relay.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. package mq
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net"
  6. "github.com/gravitl/netmaker/logger"
  7. "github.com/gravitl/netmaker/logic"
  8. "github.com/gravitl/netmaker/logic/acls/nodeacls"
  9. "github.com/gravitl/netmaker/models"
  10. "github.com/gravitl/netmaker/servercfg"
  11. "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
  12. )
  13. // PubPeerUpdate publishes a peer update to the client
  14. // relay is set to a newly created relay node or nil for other peer updates
  15. func PubPeerUpdate(client, relay *models.Client, peers []models.Client) {
  16. p := models.PeerAction{
  17. Action: models.UpdatePeer,
  18. }
  19. if client.Node.IsRelay {
  20. pubRelayUpdate(client, peers)
  21. return
  22. }
  23. if relay != nil {
  24. if client.Node.RelayedBy == relay.Node.ID.String() {
  25. pubRelayedUpdate(client, relay, peers)
  26. return
  27. }
  28. }
  29. for _, peer := range peers {
  30. if client.Host.ID == peer.Host.ID {
  31. continue
  32. }
  33. update := wgtypes.PeerConfig{
  34. PublicKey: peer.Host.PublicKey,
  35. ReplaceAllowedIPs: true,
  36. Endpoint: &net.UDPAddr{
  37. IP: peer.Host.EndpointIP,
  38. Port: peer.Host.ListenPort,
  39. },
  40. PersistentKeepaliveInterval: &peer.Node.PersistentKeepalive,
  41. }
  42. if nodeacls.AreNodesAllowed(nodeacls.NetworkID(client.Node.Network), nodeacls.NodeID(client.Node.ID.String()), nodeacls.NodeID(peer.Node.ID.String())) {
  43. update.AllowedIPs = append(update.AllowedIPs, logic.AddAllowedIPs(&peer)...)
  44. } else {
  45. update.Remove = true
  46. }
  47. if relay != nil {
  48. if peer.Node.IsRelayed && peer.Node.RelayedBy == relay.Node.ID.String() {
  49. update.Remove = true
  50. }
  51. }
  52. if peer.Node.IsRelay {
  53. update.AllowedIPs = append(update.AllowedIPs, getRelayAllowedIPs(*client, peer)...)
  54. }
  55. p.Peers = append(p.Peers, update)
  56. }
  57. data, err := json.Marshal(p)
  58. if err != nil {
  59. logger.Log(0, "marshal peer update", err.Error())
  60. return
  61. }
  62. publish(&client.Host, fmt.Sprintf("peer/host/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)
  63. }
  64. // getRelayAllowedIPs returns the list of allowedips for a given peer that is a relay
  65. func getRelayAllowedIPs(client, peer models.Client) []net.IPNet {
  66. var relayIPs []net.IPNet
  67. for _, relayed := range peer.Node.RelayedNodes {
  68. node, err := logic.GetNodeByID(relayed)
  69. if err != nil {
  70. logger.Log(0, "retrieve relayed node", err.Error())
  71. continue
  72. }
  73. if !nodeacls.AreNodesAllowed(nodeacls.NetworkID(client.Node.Network), nodeacls.NodeID(client.Node.ID.String()), nodeacls.NodeID(node.ID.String())) {
  74. continue
  75. }
  76. if node.Address.IP != nil {
  77. node.Address.Mask = net.CIDRMask(32, 32)
  78. relayIPs = append(relayIPs, node.Address)
  79. }
  80. if node.Address6.IP != nil {
  81. node.Address.Mask = net.CIDRMask(128, 128)
  82. relayIPs = append(relayIPs, node.Address6)
  83. }
  84. if node.IsRelay {
  85. relayIPs = append(relayIPs, getRelayAllowedIPs(client, peer)...)
  86. }
  87. if node.IsEgressGateway {
  88. relayIPs = append(relayIPs, getEgressIPs(peer)...)
  89. }
  90. if node.IsIngressGateway {
  91. relayIPs = append(relayIPs, getIngressIPs(peer)...)
  92. }
  93. }
  94. return relayIPs
  95. }
  96. // getEgressIPs returns the additional allowedips (egress ranges) that need
  97. // to be included for an egress gateway peer
  98. func getEgressIPs(peer models.Client) []net.IPNet {
  99. var egressIPs []net.IPNet
  100. for _, egressRange := range peer.Node.EgressGatewayRanges {
  101. ip, cidr, err := net.ParseCIDR(egressRange)
  102. if err != nil {
  103. logger.Log(0, "parse egress range", err.Error())
  104. continue
  105. }
  106. cidr.IP = ip
  107. egressIPs = append(egressIPs, *cidr)
  108. }
  109. return egressIPs
  110. }
  111. // getIngressIPs returns the additional allowedips (ext client addresses) that need
  112. // to be included for an ingress gateway peer
  113. // TODO: add ExtraAllowedIPs
  114. func getIngressIPs(peer models.Client) []net.IPNet {
  115. var ingressIPs []net.IPNet
  116. extclients, err := logic.GetNetworkExtClients(peer.Node.Network)
  117. if err != nil {
  118. return ingressIPs
  119. }
  120. for _, ec := range extclients {
  121. if ec.IngressGatewayID == peer.Node.ID.String() {
  122. if ec.Address != "" {
  123. ip, cidr, err := net.ParseCIDR(ec.Address)
  124. if err != nil {
  125. continue
  126. }
  127. cidr.IP = ip
  128. ingressIPs = append(ingressIPs, *cidr)
  129. }
  130. if ec.Address6 != "" {
  131. ip, cidr, err := net.ParseCIDR(ec.Address6)
  132. if err != nil {
  133. continue
  134. }
  135. cidr.IP = ip
  136. ingressIPs = append(ingressIPs, *cidr)
  137. }
  138. }
  139. }
  140. return ingressIPs
  141. }
  142. // pubRelayedUpdate - publish peer update to a node (client) that is relayed by the relay
  143. func pubRelayedUpdate(client, relay *models.Client, peers []models.Client) {
  144. //verify
  145. if !logic.StringSliceContains(relay.Node.RelayedNodes, client.Node.ID.String()) {
  146. logger.Log(0, "invalid call to pubRelayed update", client.Host.Name, relay.Host.Name)
  147. return
  148. }
  149. //remove all nodes except relay
  150. p := models.PeerAction{
  151. Action: models.RemovePeer,
  152. }
  153. for _, peer := range peers {
  154. if peer.Host.ID == relay.Host.ID || peer.Host.ID == client.Host.ID {
  155. continue
  156. }
  157. update := wgtypes.PeerConfig{
  158. PublicKey: peer.Host.PublicKey,
  159. Remove: true,
  160. }
  161. p.Peers = append(p.Peers, update)
  162. }
  163. data, err := json.Marshal(p)
  164. if err != nil {
  165. logger.Log(0, "marshal peer update", err.Error())
  166. return
  167. }
  168. publish(&client.Host, fmt.Sprintf("peer/host/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)
  169. //update the relay peer
  170. p = models.PeerAction{
  171. Action: models.UpdatePeer,
  172. }
  173. update := wgtypes.PeerConfig{
  174. PublicKey: relay.Host.PublicKey,
  175. ReplaceAllowedIPs: true,
  176. Endpoint: &net.UDPAddr{
  177. IP: relay.Host.EndpointIP,
  178. Port: relay.Host.ListenPort,
  179. },
  180. PersistentKeepaliveInterval: &relay.Node.PersistentKeepalive,
  181. }
  182. if relay.Node.Address.IP != nil {
  183. relay.Node.Address.Mask = net.CIDRMask(32, 32)
  184. update.AllowedIPs = append(update.AllowedIPs, relay.Node.Address)
  185. }
  186. if relay.Node.Address6.IP != nil {
  187. relay.Node.Address6.Mask = net.CIDRMask(128, 128)
  188. update.AllowedIPs = append(update.AllowedIPs, relay.Node.Address6)
  189. }
  190. p.Peers = append(p.Peers, update)
  191. // add all other peers to allowed ips
  192. for _, peer := range peers {
  193. if peer.Host.ID == relay.Host.ID || peer.Host.ID == client.Host.ID {
  194. continue
  195. }
  196. if nodeacls.AreNodesAllowed(nodeacls.NetworkID(client.Node.Network), nodeacls.NodeID(client.Node.ID.String()), nodeacls.NodeID(peer.Node.ID.String())) {
  197. update.AllowedIPs = append(update.AllowedIPs, logic.AddAllowedIPs(&peer)...)
  198. }
  199. }
  200. p.Peers = append(p.Peers, update)
  201. data, err = json.Marshal(p)
  202. if err != nil {
  203. logger.Log(0, "marshal peer update", err.Error())
  204. return
  205. }
  206. publish(&client.Host, fmt.Sprintf("peer/host/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)
  207. }
  208. // pubRelayUpdate - publish peer update to a relay
  209. func pubRelayUpdate(client *models.Client, peers []models.Client) {
  210. if !client.Node.IsRelay {
  211. return
  212. }
  213. // add all peers to allowedips
  214. p := models.PeerAction{
  215. Action: models.UpdatePeer,
  216. }
  217. for _, peer := range peers {
  218. if peer.Host.ID == client.Host.ID {
  219. continue
  220. }
  221. update := wgtypes.PeerConfig{
  222. PublicKey: peer.Host.PublicKey,
  223. ReplaceAllowedIPs: true,
  224. Remove: false,
  225. Endpoint: &net.UDPAddr{
  226. IP: peer.Host.EndpointIP,
  227. Port: peer.Host.ListenPort,
  228. },
  229. PersistentKeepaliveInterval: &peer.Node.PersistentKeepalive,
  230. }
  231. update.AllowedIPs = append(update.AllowedIPs, logic.AddAllowedIPs(&peer)...)
  232. p.Peers = append(p.Peers, update)
  233. }
  234. data, err := json.Marshal(p)
  235. if err != nil {
  236. logger.Log(0, "marshal peer update", err.Error())
  237. return
  238. }
  239. publish(&client.Host, fmt.Sprintf("peer/host/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)
  240. }