|
@@ -5,7 +5,6 @@ import (
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
"net/http"
|
|
|
- "reflect"
|
|
|
"time"
|
|
|
|
|
|
"github.com/google/uuid"
|
|
@@ -55,6 +54,8 @@ func hostHandlers(r *mux.Router) {
|
|
|
r.HandleFunc("/api/emqx/hosts", logic.SecurityCheck(true, http.HandlerFunc(delEmqxHosts))).
|
|
|
Methods(http.MethodDelete)
|
|
|
r.HandleFunc("/api/v1/auth-register/host", socketHandler)
|
|
|
+ r.HandleFunc("/api/v1/hosts/{hostid}/external_service_connector", logic.SecurityCheck(true, http.HandlerFunc(addExternalConnector))).
|
|
|
+ Methods(http.MethodPost)
|
|
|
}
|
|
|
|
|
|
// @Summary Requests all the hosts to upgrade their version
|
|
@@ -265,7 +266,6 @@ func updateHost(w http.ResponseWriter, r *http.Request) {
|
|
|
}
|
|
|
|
|
|
newHost := newHostData.ConvertAPIHostToNMHost(currHost)
|
|
|
-
|
|
|
logic.UpdateHost(newHost, currHost) // update the in memory struct values
|
|
|
if err = logic.UpsertHost(newHost); err != nil {
|
|
|
logger.Log(0, r.Header.Get("user"), "failed to update a host:", err.Error())
|
|
@@ -286,23 +286,6 @@ func updateHost(w http.ResponseWriter, r *http.Request) {
|
|
|
err.Error(),
|
|
|
)
|
|
|
}
|
|
|
- if !reflect.DeepEqual(newHost.EgressServices, currHost.EgressServices) {
|
|
|
- // update egress range on nodes
|
|
|
- logic.MapExternalServicesToHostNodes(newHost)
|
|
|
- // publish host update through MQ
|
|
|
- if err := mq.HostUpdate(&models.HostUpdate{
|
|
|
- Action: models.DiscoverEgressIps,
|
|
|
- Host: *newHost,
|
|
|
- }); err != nil {
|
|
|
- logger.Log(
|
|
|
- 0,
|
|
|
- r.Header.Get("user"),
|
|
|
- "failed to send host update: ",
|
|
|
- currHost.ID.String(),
|
|
|
- err.Error(),
|
|
|
- )
|
|
|
- }
|
|
|
- }
|
|
|
go func() {
|
|
|
if err := mq.PublishPeerUpdate(false); err != nil {
|
|
|
logger.Log(0, "fail to publish peer update: ", err.Error())
|
|
@@ -337,7 +320,7 @@ func hostUpdateFallback(w http.ResponseWriter, r *http.Request) {
|
|
|
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
|
|
|
return
|
|
|
}
|
|
|
- var sendPeerUpdate bool
|
|
|
+ var sendPeerUpdate, sendHostUpdate bool
|
|
|
var replacePeers bool
|
|
|
var hostUpdate models.HostUpdate
|
|
|
err = json.NewDecoder(r.Body).Decode(&hostUpdate)
|
|
@@ -356,7 +339,8 @@ func hostUpdateFallback(w http.ResponseWriter, r *http.Request) {
|
|
|
//remove old peer entry
|
|
|
replacePeers = true
|
|
|
}
|
|
|
- sendPeerUpdate = logic.UpdateHostFromClient(&hostUpdate.Host, currentHost)
|
|
|
+
|
|
|
+ sendPeerUpdate, sendHostUpdate = logic.UpdateHostFromClient(&hostUpdate.Host, currentHost)
|
|
|
err := logic.UpsertHost(currentHost)
|
|
|
if err != nil {
|
|
|
slog.Error("failed to update host", "id", currentHost.ID, "error", err)
|
|
@@ -367,13 +351,20 @@ func hostUpdateFallback(w http.ResponseWriter, r *http.Request) {
|
|
|
case models.UpdateMetrics:
|
|
|
mq.UpdateMetricsFallBack(hostUpdate.Node.ID.String(), hostUpdate.NewMetrics)
|
|
|
}
|
|
|
-
|
|
|
+ if sendHostUpdate {
|
|
|
+ fmt.Printf("Sending Host UPDATE: %+v\n", currentHost)
|
|
|
+ mq.HostUpdate(&models.HostUpdate{
|
|
|
+ Action: models.UpdateHost,
|
|
|
+ Host: *currentHost,
|
|
|
+ })
|
|
|
+ }
|
|
|
if sendPeerUpdate {
|
|
|
err := mq.PublishPeerUpdate(replacePeers)
|
|
|
if err != nil {
|
|
|
slog.Error("failed to publish peer update", "error", err)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
logic.ReturnSuccessResponse(w, r, "updated host data")
|
|
|
}
|
|
|
|
|
@@ -1022,3 +1013,48 @@ func getHostPeerInfo(w http.ResponseWriter, r *http.Request) {
|
|
|
}
|
|
|
logic.ReturnSuccessResponseWithJson(w, r, peerInfo, "fetched host peer info")
|
|
|
}
|
|
|
+
|
|
|
+// @Summary create external connector
|
|
|
+// @Router /api/hosts/{hostid}/sync [post]
|
|
|
+// @Tags Hosts
|
|
|
+// @Security oauth
|
|
|
+// @Param hostid path string true "Host ID"
|
|
|
+// @Success 200 {string} string "OK"
|
|
|
+// @Failure 400 {object} models.ErrorResponse
|
|
|
+func addExternalConnector(w http.ResponseWriter, r *http.Request) {
|
|
|
+ hostId := mux.Vars(r)["hostid"]
|
|
|
+
|
|
|
+ var errorResponse = models.ErrorResponse{}
|
|
|
+ w.Header().Set("Content-Type", "application/json")
|
|
|
+
|
|
|
+ host, err := logic.GetHost(hostId)
|
|
|
+ if err != nil {
|
|
|
+ slog.Error("failed to retrieve host", "user", r.Header.Get("user"), "error", err)
|
|
|
+ errorResponse.Code = http.StatusBadRequest
|
|
|
+ errorResponse.Message = err.Error()
|
|
|
+ logic.ReturnErrorResponse(w, r, errorResponse)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ var egressServices = []string{}
|
|
|
+ err = json.NewDecoder(r.Body).Decode(&egressServices)
|
|
|
+ if err != nil {
|
|
|
+ logger.Log(0, r.Header.Get("user"), "failed to update a host:", err.Error())
|
|
|
+ logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
|
|
|
+ return
|
|
|
+ }
|
|
|
+ host.EgressServices = make(map[string][]models.EgressIPNat)
|
|
|
+ for _, egressServiceI := range egressServices {
|
|
|
+ host.EgressServices[egressServiceI] = []models.EgressIPNat{}
|
|
|
+ }
|
|
|
+ logic.UpsertHost(host)
|
|
|
+ go func() {
|
|
|
+ hostUpdate := models.HostUpdate{
|
|
|
+ Action: models.DiscoverEgressIps,
|
|
|
+ Host: *host,
|
|
|
+ }
|
|
|
+ if err = mq.HostUpdate(&hostUpdate); err != nil {
|
|
|
+ slog.Error("failed to send host pull request", "host", host.ID.String(), "error", err)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ logic.ReturnSuccessResponseWithJson(w, r, host, "updates host service connectors")
|
|
|
+}
|