Browse Source

NET-1906: Add endpoints for Sync All Hosts and Upgrade All Hosts. (#3302)

* feat(go): add endpoint to sync all hosts.

* feat(go): add endpoint to upgrade all hosts.

* feat(go): allow force upgrade of hosts.

* fix(go): config yaml tag.
Vishal Dalwadi 7 months ago
parent
commit
cec48be354
3 changed files with 100 additions and 3 deletions
  1. 1 1
      config/config.go
  2. 97 2
      controllers/hosts.go
  3. 2 0
      models/host.go

+ 1 - 1
config/config.go

@@ -37,7 +37,7 @@ type ServerConfig struct {
 	APIConnString              string        `yaml:"apiconn"`
 	APIHost                    string        `yaml:"apihost"`
 	APIPort                    string        `yaml:"apiport"`
-	Broker                     string        `yam:"broker"`
+	Broker                     string        `yaml:"broker"`
 	ServerBrokerEndpoint       string        `yaml:"serverbrokerendpoint"`
 	BrokerType                 string        `yaml:"brokertype"`
 	EmqxRestEndpoint           string        `yaml:"emqxrestendpoint"`

+ 97 - 2
controllers/hosts.go

@@ -24,6 +24,10 @@ func hostHandlers(r *mux.Router) {
 		Methods(http.MethodGet)
 	r.HandleFunc("/api/hosts/keys", logic.SecurityCheck(true, http.HandlerFunc(updateAllKeys))).
 		Methods(http.MethodPut)
+	r.HandleFunc("/api/hosts/sync", logic.SecurityCheck(true, http.HandlerFunc(syncHosts))).
+		Methods(http.MethodPost)
+	r.HandleFunc("/api/hosts/upgrade", logic.SecurityCheck(true, http.HandlerFunc(upgradeHosts))).
+		Methods(http.MethodPost)
 	r.HandleFunc("/api/hosts/{hostid}/keys", logic.SecurityCheck(true, http.HandlerFunc(updateKeys))).
 		Methods(http.MethodPut)
 	r.HandleFunc("/api/hosts/{hostid}/sync", logic.SecurityCheck(true, http.HandlerFunc(syncHost))).
@@ -50,11 +54,57 @@ func hostHandlers(r *mux.Router) {
 	r.HandleFunc("/api/v1/auth-register/host", socketHandler)
 }
 
+// @Summary     Requests all the hosts to upgrade their version
+// @Router      /api/hosts/upgrade [post]
+// @Tags        Hosts
+// @Security    oauth
+// @Param       force query bool false "Force upgrade"
+// @Success     200 {string} string "upgrade all hosts request received"
+func upgradeHosts(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Content-Type", "application/json")
+
+	action := models.Upgrade
+
+	if r.URL.Query().Get("force") == "true" {
+		action = models.ForceUpgrade
+	}
+
+	user := r.Header.Get("user")
+
+	go func() {
+		slog.Info("requesting all hosts to upgrade", "user", user)
+
+		hosts, err := logic.GetAllHosts()
+		if err != nil {
+			slog.Error("failed to retrieve all hosts", "user", user, "error", err)
+			return
+		}
+
+		for _, host := range hosts {
+			go func(host models.Host) {
+				hostUpdate := models.HostUpdate{
+					Action: action,
+					Host:   host,
+				}
+				if err = mq.HostUpdate(&hostUpdate); err != nil {
+					slog.Error("failed to request host to upgrade", "user", user, "host", host.ID.String(), "error", err)
+				} else {
+					slog.Info("host upgrade requested", "user", user, "host", host.ID.String())
+				}
+			}(host)
+		}
+	}()
+
+	slog.Info("upgrade all hosts request received", "user", user)
+	logic.ReturnSuccessResponse(w, r, "upgrade all hosts request received")
+}
+
 // @Summary     Upgrade a host
 // @Router      /api/hosts/{hostid}/upgrade [put]
 // @Tags        Hosts
 // @Security    oauth
 // @Param       hostid path string true "Host ID"
+// @Param       force query bool false "Force upgrade"
 // @Success     200 {string} string "passed message to upgrade host"
 // @Failure     500 {object} models.ErrorResponse
 // upgrade host is a handler to send upgrade message to a host
@@ -65,7 +115,14 @@ func upgradeHost(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "notfound"))
 		return
 	}
-	if err := mq.HostUpdate(&models.HostUpdate{Action: models.Upgrade, Host: *host}); err != nil {
+
+	action := models.Upgrade
+
+	if r.URL.Query().Get("force") == "true" {
+		action = models.ForceUpgrade
+	}
+
+	if err := mq.HostUpdate(&models.HostUpdate{Action: action, Host: *host}); err != nil {
 		slog.Error("failed to upgrade host", "error", err)
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
@@ -860,6 +917,44 @@ func updateKeys(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 }
 
+// @Summary     Requests all the hosts to pull
+// @Router      /api/hosts/sync [post]
+// @Tags        Hosts
+// @Security    oauth
+// @Success     200 {string} string "sync all hosts request received"
+func syncHosts(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Content-Type", "application/json")
+
+	user := r.Header.Get("user")
+
+	go func() {
+		slog.Info("requesting all hosts to sync", "user", user)
+
+		hosts, err := logic.GetAllHosts()
+		if err != nil {
+			slog.Error("failed to retrieve all hosts", "user", user, "error", err)
+			return
+		}
+
+		for _, host := range hosts {
+			go func(host models.Host) {
+				hostUpdate := models.HostUpdate{
+					Action: models.RequestPull,
+					Host:   host,
+				}
+				if err = mq.HostUpdate(&hostUpdate); err != nil {
+					slog.Error("failed to request host to sync", "user", user, "host", host.ID.String(), "error", err)
+				} else {
+					slog.Info("host sync requested", "user", user, "host", host.ID.String())
+				}
+			}(host)
+		}
+	}()
+
+	slog.Info("sync all hosts request received", "user", user)
+	logic.ReturnSuccessResponse(w, r, "sync all hosts request received")
+}
+
 // @Summary     Requests a host to pull
 // @Router      /api/hosts/{hostid}/sync [post]
 // @Tags        Hosts
@@ -892,7 +987,7 @@ func syncHost(w http.ResponseWriter, r *http.Request) {
 		}
 	}()
 
-	slog.Info("requested host pull", "user", r.Header.Get("user"), "host", host.ID)
+	slog.Info("requested host pull", "user", r.Header.Get("user"), "host", host.ID.String())
 	w.WriteHeader(http.StatusOK)
 }
 

+ 2 - 0
models/host.go

@@ -98,6 +98,8 @@ type HostMqAction string
 const (
 	// Upgrade - const to request host to update it's client
 	Upgrade HostMqAction = "UPGRADE"
+	// ForceUpgrade - const for forcing a host to upgrade its client binary
+	ForceUpgrade HostMqAction = "FORCE_UPGRADE"
 	// SignalHost - const for host signal action
 	SignalHost HostMqAction = "SIGNAL_HOST"
 	// UpdateHost - constant for host update action