|
@@ -5,6 +5,7 @@ import (
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
"net/http"
|
|
|
+ "time"
|
|
|
|
|
|
"github.com/google/uuid"
|
|
|
"github.com/gorilla/mux"
|
|
@@ -23,6 +24,10 @@ func hostHandlers(r *mux.Router) {
|
|
|
Methods(http.MethodGet)
|
|
|
r.HandleFunc("/api/hosts/keys", logic.SecurityCheck(true, http.HandlerFunc(updateAllKeys))).
|
|
|
Methods(http.MethodPut)
|
|
|
+ r.HandleFunc("/api/hosts/sync", logic.SecurityCheck(true, http.HandlerFunc(syncHosts))).
|
|
|
+ Methods(http.MethodPost)
|
|
|
+ r.HandleFunc("/api/hosts/upgrade", logic.SecurityCheck(true, http.HandlerFunc(upgradeHosts))).
|
|
|
+ Methods(http.MethodPost)
|
|
|
r.HandleFunc("/api/hosts/{hostid}/keys", logic.SecurityCheck(true, http.HandlerFunc(updateKeys))).
|
|
|
Methods(http.MethodPut)
|
|
|
r.HandleFunc("/api/hosts/{hostid}/sync", logic.SecurityCheck(true, http.HandlerFunc(syncHost))).
|
|
@@ -44,16 +49,64 @@ func hostHandlers(r *mux.Router) {
|
|
|
Methods(http.MethodPost)
|
|
|
r.HandleFunc("/api/v1/fallback/host/{hostid}", Authorize(true, false, "host", http.HandlerFunc(hostUpdateFallback))).
|
|
|
Methods(http.MethodPut)
|
|
|
+ r.HandleFunc("/api/v1/host/{hostid}/peer_info", Authorize(true, false, "host", http.HandlerFunc(getHostPeerInfo))).
|
|
|
+ Methods(http.MethodGet)
|
|
|
r.HandleFunc("/api/emqx/hosts", logic.SecurityCheck(true, http.HandlerFunc(delEmqxHosts))).
|
|
|
Methods(http.MethodDelete)
|
|
|
r.HandleFunc("/api/v1/auth-register/host", socketHandler)
|
|
|
}
|
|
|
|
|
|
+// @Summary Requests all the hosts to upgrade their version
|
|
|
+// @Router /api/hosts/upgrade [post]
|
|
|
+// @Tags Hosts
|
|
|
+// @Security oauth
|
|
|
+// @Param force query bool false "Force upgrade"
|
|
|
+// @Success 200 {string} string "upgrade all hosts request received"
|
|
|
+func upgradeHosts(w http.ResponseWriter, r *http.Request) {
|
|
|
+ w.Header().Set("Content-Type", "application/json")
|
|
|
+
|
|
|
+ action := models.Upgrade
|
|
|
+
|
|
|
+ if r.URL.Query().Get("force") == "true" {
|
|
|
+ action = models.ForceUpgrade
|
|
|
+ }
|
|
|
+
|
|
|
+ user := r.Header.Get("user")
|
|
|
+
|
|
|
+ go func() {
|
|
|
+ slog.Info("requesting all hosts to upgrade", "user", user)
|
|
|
+
|
|
|
+ hosts, err := logic.GetAllHosts()
|
|
|
+ if err != nil {
|
|
|
+ slog.Error("failed to retrieve all hosts", "user", user, "error", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, host := range hosts {
|
|
|
+ go func(host models.Host) {
|
|
|
+ hostUpdate := models.HostUpdate{
|
|
|
+ Action: action,
|
|
|
+ Host: host,
|
|
|
+ }
|
|
|
+ if err = mq.HostUpdate(&hostUpdate); err != nil {
|
|
|
+ slog.Error("failed to request host to upgrade", "user", user, "host", host.ID.String(), "error", err)
|
|
|
+ } else {
|
|
|
+ slog.Info("host upgrade requested", "user", user, "host", host.ID.String())
|
|
|
+ }
|
|
|
+ }(host)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ slog.Info("upgrade all hosts request received", "user", user)
|
|
|
+ logic.ReturnSuccessResponse(w, r, "upgrade all hosts request received")
|
|
|
+}
|
|
|
+
|
|
|
// @Summary Upgrade a host
|
|
|
// @Router /api/hosts/{hostid}/upgrade [put]
|
|
|
// @Tags Hosts
|
|
|
// @Security oauth
|
|
|
// @Param hostid path string true "Host ID"
|
|
|
+// @Param force query bool false "Force upgrade"
|
|
|
// @Success 200 {string} string "passed message to upgrade host"
|
|
|
// @Failure 500 {object} models.ErrorResponse
|
|
|
// upgrade host is a handler to send upgrade message to a host
|
|
@@ -64,7 +117,14 @@ func upgradeHost(w http.ResponseWriter, r *http.Request) {
|
|
|
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "notfound"))
|
|
|
return
|
|
|
}
|
|
|
- if err := mq.HostUpdate(&models.HostUpdate{Action: models.Upgrade, Host: *host}); err != nil {
|
|
|
+
|
|
|
+ action := models.Upgrade
|
|
|
+
|
|
|
+ if r.URL.Query().Get("force") == "true" {
|
|
|
+ action = models.ForceUpgrade
|
|
|
+ }
|
|
|
+
|
|
|
+ if err := mq.HostUpdate(&models.HostUpdate{Action: action, Host: *host}); err != nil {
|
|
|
slog.Error("failed to upgrade host", "error", err)
|
|
|
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
|
|
|
return
|
|
@@ -175,13 +235,13 @@ func pull(w http.ResponseWriter, r *http.Request) {
|
|
|
slog.Error("failed to get node:", "id", node.ID, "error", err)
|
|
|
continue
|
|
|
}
|
|
|
- if node.FailedOverBy != uuid.Nil {
|
|
|
+ if node.FailedOverBy != uuid.Nil && r.URL.Query().Get("reset_failovered") == "true" {
|
|
|
logic.ResetFailedOverPeer(&node)
|
|
|
sendPeerUpdate = true
|
|
|
}
|
|
|
}
|
|
|
if sendPeerUpdate {
|
|
|
- if err := mq.PublishPeerUpdate(true); err != nil {
|
|
|
+ if err := mq.PublishPeerUpdate(false); err != nil {
|
|
|
logger.Log(0, "fail to publish peer update: ", err.Error())
|
|
|
}
|
|
|
}
|
|
@@ -311,11 +371,11 @@ func hostUpdateFallback(w http.ResponseWriter, r *http.Request) {
|
|
|
var hostUpdate models.HostUpdate
|
|
|
err = json.NewDecoder(r.Body).Decode(&hostUpdate)
|
|
|
if err != nil {
|
|
|
- logger.Log(0, r.Header.Get("user"), "failed to update a host:", err.Error())
|
|
|
+ slog.Error("failed to update a host:", "user", r.Header.Get("user"), "error", err.Error(), "host", currentHost.Name)
|
|
|
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
|
|
|
return
|
|
|
}
|
|
|
- slog.Info("recieved host update", "name", hostUpdate.Host.Name, "id", hostUpdate.Host.ID)
|
|
|
+ slog.Info("recieved host update", "name", hostUpdate.Host.Name, "id", hostUpdate.Host.ID, "action", hostUpdate.Action)
|
|
|
switch hostUpdate.Action {
|
|
|
case models.CheckIn:
|
|
|
sendPeerUpdate = mq.HandleHostCheckin(&hostUpdate.Host, currentHost)
|
|
@@ -532,7 +592,7 @@ func deleteHostFromNetwork(w http.ResponseWriter, r *http.Request) {
|
|
|
w,
|
|
|
r,
|
|
|
logic.FormatError(
|
|
|
- fmt.Errorf("failed to force delete daemon node: "+err.Error()),
|
|
|
+ fmt.Errorf("failed to force delete daemon node: %s", err.Error()),
|
|
|
"internal",
|
|
|
),
|
|
|
)
|
|
@@ -572,7 +632,7 @@ func deleteHostFromNetwork(w http.ResponseWriter, r *http.Request) {
|
|
|
w,
|
|
|
r,
|
|
|
logic.FormatError(
|
|
|
- fmt.Errorf("failed to force delete daemon node: "+err.Error()),
|
|
|
+ fmt.Errorf("failed to force delete daemon node: %s", err.Error()),
|
|
|
"internal",
|
|
|
),
|
|
|
)
|
|
@@ -855,6 +915,45 @@ func updateKeys(w http.ResponseWriter, r *http.Request) {
|
|
|
w.WriteHeader(http.StatusOK)
|
|
|
}
|
|
|
|
|
|
+// @Summary Requests all the hosts to pull
|
|
|
+// @Router /api/hosts/sync [post]
|
|
|
+// @Tags Hosts
|
|
|
+// @Security oauth
|
|
|
+// @Success 200 {string} string "sync all hosts request received"
|
|
|
+func syncHosts(w http.ResponseWriter, r *http.Request) {
|
|
|
+ w.Header().Set("Content-Type", "application/json")
|
|
|
+
|
|
|
+ user := r.Header.Get("user")
|
|
|
+
|
|
|
+ go func() {
|
|
|
+ slog.Info("requesting all hosts to sync", "user", user)
|
|
|
+
|
|
|
+ hosts, err := logic.GetAllHosts()
|
|
|
+ if err != nil {
|
|
|
+ slog.Error("failed to retrieve all hosts", "user", user, "error", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, host := range hosts {
|
|
|
+ go func(host models.Host) {
|
|
|
+ hostUpdate := models.HostUpdate{
|
|
|
+ Action: models.RequestPull,
|
|
|
+ Host: host,
|
|
|
+ }
|
|
|
+ if err = mq.HostUpdate(&hostUpdate); err != nil {
|
|
|
+ slog.Error("failed to request host to sync", "user", user, "host", host.ID.String(), "error", err)
|
|
|
+ } else {
|
|
|
+ slog.Info("host sync requested", "user", user, "host", host.ID.String())
|
|
|
+ }
|
|
|
+ }(host)
|
|
|
+ time.Sleep(time.Millisecond * 100)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ slog.Info("sync all hosts request received", "user", user)
|
|
|
+ logic.ReturnSuccessResponse(w, r, "sync all hosts request received")
|
|
|
+}
|
|
|
+
|
|
|
// @Summary Requests a host to pull
|
|
|
// @Router /api/hosts/{hostid}/sync [post]
|
|
|
// @Tags Hosts
|
|
@@ -887,7 +986,7 @@ func syncHost(w http.ResponseWriter, r *http.Request) {
|
|
|
}
|
|
|
}()
|
|
|
|
|
|
- slog.Info("requested host pull", "user", r.Header.Get("user"), "host", host.ID)
|
|
|
+ slog.Info("requested host pull", "user", r.Header.Get("user"), "host", host.ID.String())
|
|
|
w.WriteHeader(http.StatusOK)
|
|
|
}
|
|
|
|
|
@@ -922,3 +1021,33 @@ func delEmqxHosts(w http.ResponseWriter, r *http.Request) {
|
|
|
}
|
|
|
logic.ReturnSuccessResponse(w, r, "deleted hosts data on emqx")
|
|
|
}
|
|
|
+
|
|
|
+// @Summary Fetches host peerinfo
|
|
|
+// @Router /api/host/{hostid}/peer_info [get]
|
|
|
+// @Tags Hosts
|
|
|
+// @Security oauth
|
|
|
+// @Param hostid path string true "Host ID"
|
|
|
+// @Success 200 {object} models.SuccessResponse
|
|
|
+// @Failure 500 {object} models.ErrorResponse
|
|
|
+func getHostPeerInfo(w http.ResponseWriter, r *http.Request) {
|
|
|
+ hostId := mux.Vars(r)["hostid"]
|
|
|
+ var errorResponse = models.ErrorResponse{}
|
|
|
+
|
|
|
+ host, err := logic.GetHost(hostId)
|
|
|
+ if err != nil {
|
|
|
+ slog.Error("failed to retrieve host", "error", err)
|
|
|
+ errorResponse.Code = http.StatusBadRequest
|
|
|
+ errorResponse.Message = err.Error()
|
|
|
+ logic.ReturnErrorResponse(w, r, errorResponse)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ peerInfo, err := logic.GetHostPeerInfo(host)
|
|
|
+ if err != nil {
|
|
|
+ slog.Error("failed to retrieve host peerinfo", "error", err)
|
|
|
+ errorResponse.Code = http.StatusBadRequest
|
|
|
+ errorResponse.Message = err.Error()
|
|
|
+ logic.ReturnErrorResponse(w, r, errorResponse)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ logic.ReturnSuccessResponseWithJson(w, r, peerInfo, "fetched host peer info")
|
|
|
+}
|