| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222 |
- package queue
- import (
- "fmt"
- "github.com/gorilla/websocket"
- "github.com/gravitl/netmaker/database"
- "github.com/gravitl/netmaker/logger"
- "github.com/gravitl/netmaker/logic"
- "github.com/gravitl/netmaker/models"
- "github.com/gravitl/netmaker/netclient/ncutils"
- "github.com/gravitl/netmaker/servercfg"
- )
- // holds a map of funcs
- // based on topic to handle an event
- var handlerFuncs map[int]func(*models.Event)
- // initializes the map of functions
- // "Send" functions are sent to clients, others affect server
- func initializeHandlers() {
- handlerFuncs = make(map[int]func(*models.Event))
- handlerFuncs[models.EventTopics.Test] = test
- handlerFuncs[models.EventTopics.NodeUpdate] = nodeUpdate
- handlerFuncs[models.EventTopics.HostUpdate] = hostUpdate
- handlerFuncs[models.EventTopics.Ping] = ping
- handlerFuncs[models.EventTopics.Metrics] = updateMetrics
- handlerFuncs[models.EventTopics.ClientUpdate] = clientPeerUpdate
- handlerFuncs[models.EventTopics.SendAllHostPeerUpdate] = publishPeerUpdates
- handlerFuncs[models.EventTopics.SendHostUpdate] = sendHostUpdate
- handlerFuncs[models.EventTopics.SendNodeUpdate] = sendNodeUpdate
- }
- // == handler funcs ==
- func test(e *models.Event) {
- val, ok := ConnMap.Load(e.ID)
- if ok {
- conn := val.(*websocket.Conn)
- if conn != nil {
- conn.WriteMessage(websocket.TextMessage, []byte("success"))
- }
- }
- }
- func ping(e *models.Event) {
- node, err := logic.GetNodeByID(e.ID)
- if err != nil {
- logger.Log(3, "mq-ping error getting node: ", err.Error())
- record, err := database.FetchRecord(database.NODES_TABLE_NAME, e.ID)
- if err != nil {
- logger.Log(3, "error reading database ", err.Error())
- return
- }
- logger.Log(3, "record from database")
- logger.Log(3, record)
- return
- }
- checkin := e.Payload.NodeCheckin
- if checkin == nil {
- logger.Log(0, "failed to complete checkin for node", node.ID.String())
- }
- host, err := logic.GetHost(node.HostID.String())
- if err != nil {
- logger.Log(0, "error retrieving host for node ", node.ID.String(), err.Error())
- return
- }
- node.SetLastCheckIn()
- host.Version = checkin.Version
- node.Connected = checkin.Connected
- host.Interfaces = checkin.Ifaces
- for i := range host.Interfaces {
- host.Interfaces[i].AddressString = host.Interfaces[i].Address.String()
- }
- if err := logic.UpdateNode(&node, &node); err != nil {
- logger.Log(0, "error updating node", node.ID.String(), " on checkin", err.Error())
- return
- }
- logger.Log(3, "ping processed for node", node.ID.String())
- }
- func nodeUpdate(e *models.Event) {
- currentNode, err := logic.GetNodeByID(e.ID)
- if err != nil {
- logger.Log(1, "error getting node ", e.ID, err.Error())
- return
- }
- newNode := e.Payload.Node
- if newNode == nil {
- logger.Log(0, "failed to update node", currentNode.ID.String())
- }
- ifaceDelta := logic.IfaceDelta(¤tNode, newNode)
- if servercfg.Is_EE && ifaceDelta {
- if err = logic.EnterpriseResetAllPeersFailovers(currentNode.ID, currentNode.Network); err != nil {
- logger.Log(1, "failed to reset failover list during node update", currentNode.ID.String(), currentNode.Network)
- }
- }
- newNode.SetLastCheckIn()
- if err := logic.UpdateNode(¤tNode, newNode); err != nil {
- logger.Log(1, "error saving node", err.Error())
- return
- }
- if ifaceDelta { // reduce number of unneeded updates, by only sending on iface changes
- PublishAllPeerUpdate()
- }
- logger.Log(1, "updated node", newNode.ID.String())
- }
- func hostUpdate(e *models.Event) {
- currentHost, err := logic.GetHost(e.ID)
- if err != nil {
- logger.Log(1, "error getting host ", e.ID, err.Error())
- return
- }
- hostUpdate := e.Payload.HostUpdate
- if hostUpdate == nil {
- logger.Log(0, "failed to update host", currentHost.Name, currentHost.ID.String())
- }
- logger.Log(3, fmt.Sprintf("recieved host update: %s\n", hostUpdate.Host.ID.String()))
- var sendPeerUpdate bool
- switch hostUpdate.Action {
- case models.UpdateHost:
- sendPeerUpdate = logic.UpdateHostFromClient(&hostUpdate.Host, currentHost)
- err := logic.UpsertHost(currentHost)
- if err != nil {
- logger.Log(0, "failed to update host: ", currentHost.ID.String(), err.Error())
- return
- }
- case models.DeleteHost:
- if err := logic.DisassociateAllNodesFromHost(currentHost.ID.String()); err != nil {
- logger.Log(0, "failed to delete all nodes of host: ", currentHost.ID.String(), err.Error())
- return
- }
- if err := logic.RemoveHostByID(currentHost.ID.String()); err != nil {
- logger.Log(0, "failed to delete host: ", currentHost.ID.String(), err.Error())
- return
- }
- sendPeerUpdate = true
- }
- if sendPeerUpdate {
- PublishAllPeerUpdate()
- }
- }
- func updateMetrics(e *models.Event) {
- if servercfg.Is_EE {
- id := e.ID
- currentNode, err := logic.GetNodeByID(id)
- if err != nil {
- logger.Log(1, "error getting node ", id, err.Error())
- return
- }
- var newMetrics = e.Payload.Metrics
- if newMetrics == nil {
- logger.Log(1, "provided metrics were nil for node", id)
- return
- }
- shouldUpdate := updateNodeMetrics(¤tNode, newMetrics)
- if err = logic.UpdateMetrics(id, newMetrics); err != nil {
- logger.Log(1, "faield to update node metrics", id, err.Error())
- return
- }
- // TODO adapt metrics exporter..
- // if servercfg.IsMetricsExporter() {
- // if err := pushMetricsToExporter(newMetrics); err != nil {
- // logger.Log(2, fmt.Sprintf("failed to push node: [%s] metrics to exporter, err: %v",
- // currentNode.ID, err))
- // }
- // }
- if newMetrics.Connectivity != nil {
- err := logic.EnterpriseFailoverFunc(¤tNode)
- if err != nil {
- logger.Log(0, "failed to failover for node", currentNode.ID.String(), "on network", currentNode.Network, "-", err.Error())
- }
- }
- if shouldUpdate {
- logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues")
- host, err := logic.GetHost(currentNode.HostID.String())
- if err == nil {
- if err = publishHostPeerUpdate(host); err != nil {
- logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network)
- }
- }
- }
- logger.Log(1, "updated node metrics", id)
- }
- }
- func clientPeerUpdate(e *models.Event) {
- id := e.ID
- node, err := logic.GetNodeByID(id)
- if err != nil {
- logger.Log(1, "error getting node", id, err.Error())
- return
- }
- host, err := logic.GetHost(node.HostID.String())
- if err != nil {
- logger.Log(1, "error getting node's host for peer update", id, err.Error())
- return
- }
- action := e.Payload.Action
- switch action {
- case ncutils.ACK:
- //do we still need this
- case ncutils.DONE:
- publishHostPeerUpdate(host)
- }
- logger.Log(1, "sent peer updates after signal received from", id)
- }
|