Browse Source

IOT-66: Single Peer update for IOT client (#2424)

* send peer update to IOT client only when it is relayed

* move node check

* send relay del update for iot client

* fix relay delete logic for iot

* set relay node to true for iot peer update

* add node addrs to peer update

* revert tag
Abhishek K 2 years ago
parent
commit
38454ece79
5 changed files with 87 additions and 11 deletions
  1. 10 0
      ee/ee_controllers/relay.go
  2. 55 0
      logic/peers.go
  3. 1 0
      models/mqtt.go
  4. 8 0
      models/node.go
  5. 13 11
      mq/publishers.go

+ 10 - 0
ee/ee_controllers/relay.go

@@ -1,6 +1,7 @@
 package ee_controllers
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"net/http"
@@ -87,6 +88,15 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
 				logger.Log(1, "relayed node update ", relayedNode.ID.String(), "on network", relayedNode.Network, ": ", err.Error())
 
 			}
+			h, err := logic.GetHost(relayedNode.HostID.String())
+			if err == nil {
+				if h.OS == models.OS_Types.IoT {
+					node.IsRelay = true // for iot update to recognise that it has to delete relay peer
+					if err = mq.PublishSingleHostPeerUpdate(context.Background(), h, &node, nil); err != nil {
+						logger.Log(1, "failed to publish peer update to host", h.ID.String(), ": ", err.Error())
+					}
+				}
+			}
 		}
 		mq.PublishPeerUpdate()
 	}()

+ 55 - 0
logic/peers.go

@@ -135,6 +135,61 @@ func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host
 		if !node.Connected || node.PendingDelete || node.Action == models.NODE_DELETE {
 			continue
 		}
+		if host.OS == models.OS_Types.IoT {
+			hostPeerUpdate.NodeAddrs = append(hostPeerUpdate.NodeAddrs, node.PrimaryAddressIPNet())
+			if node.IsRelayed {
+				relayNode, err := GetNodeByID(node.RelayedBy)
+				if err != nil {
+					continue
+				}
+				relayHost, err := GetHost(relayNode.HostID.String())
+				if err != nil {
+					continue
+				}
+				relayPeer := wgtypes.PeerConfig{
+					PublicKey:                   relayHost.PublicKey,
+					PersistentKeepaliveInterval: &relayNode.PersistentKeepalive,
+					ReplaceAllowedIPs:           true,
+					AllowedIPs:                  GetAllowedIPs(&node, &relayNode, nil),
+				}
+				uselocal := false
+				if host.EndpointIP.String() == relayHost.EndpointIP.String() {
+					// peer is on same network
+					// set to localaddress
+					uselocal = true
+					if node.LocalAddress.IP == nil {
+						// use public endpint
+						uselocal = false
+					}
+					if node.LocalAddress.String() == relayNode.LocalAddress.String() {
+						uselocal = false
+					}
+				}
+				relayPeer.Endpoint = &net.UDPAddr{
+					IP:   relayHost.EndpointIP,
+					Port: getPeerWgListenPort(relayHost),
+				}
+
+				if uselocal {
+					relayPeer.Endpoint.IP = relayNode.LocalAddress.IP
+					relayPeer.Endpoint.Port = relayHost.ListenPort
+				}
+
+				hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, relayPeer)
+			} else if deletedNode != nil && deletedNode.IsRelay {
+				relayHost, err := GetHost(deletedNode.HostID.String())
+				if err != nil {
+					continue
+				}
+				relayPeer := wgtypes.PeerConfig{
+					PublicKey: relayHost.PublicKey,
+					Remove:    true,
+				}
+				hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, relayPeer)
+			}
+			continue
+		}
+
 		currentPeers := GetNetworkNodesMemory(allNodes, node.Network)
 		var nodePeerMap map[string]models.PeerRouteInfo
 		if node.IsIngressGateway || node.IsEgressGateway {

+ 1 - 0
models/mqtt.go

@@ -9,6 +9,7 @@ import (
 // HostPeerUpdate - struct for host peer updates
 type HostPeerUpdate struct {
 	Host            Host                 `json:"host" bson:"host" yaml:"host"`
+	NodeAddrs       []net.IPNet          `json:"nodes_addrs" yaml:"nodes_addrs"`
 	Server          string               `json:"server" bson:"server" yaml:"server"`
 	ServerVersion   string               `json:"serverversion" bson:"serverversion" yaml:"serverversion"`
 	ServerAddrs     []ServerAddr         `json:"serveraddrs" bson:"serveraddrs" yaml:"serveraddrs"`

+ 8 - 0
models/node.go

@@ -181,6 +181,14 @@ func isLess(ipA string, ipB string) bool {
 	return bytes.Compare(ipNetA, ipNetB) < 0
 }
 
+// Node.PrimaryAddress - return ipv4 address if present, else return ipv6
+func (node *Node) PrimaryAddressIPNet() net.IPNet {
+	if node.Address.IP != nil {
+		return node.Address
+	}
+	return node.Address6
+}
+
 // Node.PrimaryAddress - return ipv4 address if present, else return ipv6
 func (node *Node) PrimaryAddress() string {
 	if node.Address.IP != nil {

+ 13 - 11
mq/publishers.go

@@ -88,18 +88,20 @@ func PublishSingleHostPeerUpdate(ctx context.Context, host *models.Host, deleted
 	if len(peerUpdate.Peers) == 0 { // no peers to send
 		return nil
 	}
-	proxyUpdate, err := logic.GetProxyUpdateForHost(ctx, host)
-	if err != nil {
-		return err
-	}
-	proxyUpdate.Server = servercfg.GetServer()
-	if host.ProxyEnabled {
-		proxyUpdate.Action = models.ProxyUpdate
-	} else {
-		proxyUpdate.Action = models.NoProxy
-	}
+	if host.OS != models.OS_Types.IoT {
+		proxyUpdate, err := logic.GetProxyUpdateForHost(ctx, host)
+		if err != nil {
+			return err
+		}
+		proxyUpdate.Server = servercfg.GetServer()
+		if host.ProxyEnabled {
+			proxyUpdate.Action = models.ProxyUpdate
+		} else {
+			proxyUpdate.Action = models.NoProxy
+		}
 
-	peerUpdate.ProxyUpdate = proxyUpdate
+		peerUpdate.ProxyUpdate = proxyUpdate
+	}
 
 	data, err := json.Marshal(&peerUpdate)
 	if err != nil {