nodes.go 18 KB


  1. package logic
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "sort"
  9. "sync"
  10. "time"
  11. validator "github.com/go-playground/validator/v10"
  12. "github.com/google/uuid"
  13. "github.com/gravitl/netmaker/database"
  14. "github.com/gravitl/netmaker/logger"
  15. "github.com/gravitl/netmaker/logic/acls"
  16. "github.com/gravitl/netmaker/logic/acls/nodeacls"
  17. "github.com/gravitl/netmaker/models"
  18. "github.com/gravitl/netmaker/servercfg"
  19. "github.com/gravitl/netmaker/validation"
  20. "golang.org/x/exp/slog"
  21. )
var (
	// nodeCacheMutex guards every read and write of nodesCacheMap.
	nodeCacheMutex = &sync.RWMutex{}
	// nodesCacheMap is the in-memory node cache, keyed by the node ID string.
	nodesCacheMap = make(map[string]models.Node)
)
  26. func getNodeFromCache(nodeID string) (node models.Node, ok bool) {
  27. nodeCacheMutex.RLock()
  28. node, ok = nodesCacheMap[nodeID]
  29. nodeCacheMutex.RUnlock()
  30. return
  31. }
  32. func getNodesFromCache() (nodes []models.Node) {
  33. nodeCacheMutex.RLock()
  34. for _, node := range nodesCacheMap {
  35. nodes = append(nodes, node)
  36. }
  37. nodeCacheMutex.RUnlock()
  38. return
  39. }
  40. func deleteNodeFromCache(nodeID string) {
  41. nodeCacheMutex.Lock()
  42. delete(nodesCacheMap, nodeID)
  43. nodeCacheMutex.Unlock()
  44. }
  45. func storeNodeInCache(node models.Node) {
  46. nodeCacheMutex.Lock()
  47. nodesCacheMap[node.ID.String()] = node
  48. nodeCacheMutex.Unlock()
  49. }
  50. func loadNodesIntoCache(nMap map[string]models.Node) {
  51. nodeCacheMutex.Lock()
  52. nodesCacheMap = nMap
  53. nodeCacheMutex.Unlock()
  54. }
  55. func ClearNodeCache() {
  56. nodeCacheMutex.Lock()
  57. nodesCacheMap = make(map[string]models.Node)
  58. nodeCacheMutex.Unlock()
  59. }
const (
	// RELAY_NODE_ERR - error to return if the relay node is not found
	RELAY_NODE_ERR = "could not find relay for node"
	// NodePurgeTime - time to wait for a node to respond to a NODE_DELETE action
	NodePurgeTime = time.Second * 10
	// NodePurgeCheckTime - how often to check nodes for Pending Delete
	NodePurgeCheckTime = time.Second * 30
)
  68. // GetNetworkNodes - gets the nodes of a network
  69. func GetNetworkNodes(network string) ([]models.Node, error) {
  70. allnodes, err := GetAllNodes()
  71. if err != nil {
  72. return []models.Node{}, err
  73. }
  74. return GetNetworkNodesMemory(allnodes, network), nil
  75. }
  76. // GetHostNodes - fetches all nodes part of the host
  77. func GetHostNodes(host *models.Host) []models.Node {
  78. nodes := []models.Node{}
  79. for _, nodeID := range host.Nodes {
  80. node, err := GetNodeByID(nodeID)
  81. if err == nil {
  82. nodes = append(nodes, node)
  83. }
  84. }
  85. return nodes
  86. }
  87. // GetNetworkNodesMemory - gets all nodes belonging to a network from list in memory
  88. func GetNetworkNodesMemory(allNodes []models.Node, network string) []models.Node {
  89. var nodes = []models.Node{}
  90. for i := range allNodes {
  91. node := allNodes[i]
  92. if node.Network == network {
  93. nodes = append(nodes, node)
  94. }
  95. }
  96. return nodes
  97. }
  98. // UpdateNodeCheckin - updates the checkin time of a node
  99. func UpdateNodeCheckin(node *models.Node) error {
  100. node.SetLastCheckIn()
  101. data, err := json.Marshal(node)
  102. if err != nil {
  103. return err
  104. }
  105. err = database.Insert(node.ID.String(), string(data), database.NODES_TABLE_NAME)
  106. if err != nil {
  107. return err
  108. }
  109. if servercfg.CacheEnabled() {
  110. storeNodeInCache(*node)
  111. }
  112. return nil
  113. }
  114. // UpsertNode - updates node in the DB
  115. func UpsertNode(newNode *models.Node) error {
  116. newNode.SetLastModified()
  117. data, err := json.Marshal(newNode)
  118. if err != nil {
  119. return err
  120. }
  121. err = database.Insert(newNode.ID.String(), string(data), database.NODES_TABLE_NAME)
  122. if err != nil {
  123. return err
  124. }
  125. if servercfg.CacheEnabled() {
  126. storeNodeInCache(*newNode)
  127. }
  128. return nil
  129. }
  130. // UpdateNode - takes a node and updates another node with it's values
  131. func UpdateNode(currentNode *models.Node, newNode *models.Node) error {
  132. if newNode.Address.IP.String() != currentNode.Address.IP.String() {
  133. if network, err := GetParentNetwork(newNode.Network); err == nil {
  134. if !IsAddressInCIDR(newNode.Address.IP, network.AddressRange) {
  135. return fmt.Errorf("invalid address provided; out of network range for node %s", newNode.ID)
  136. }
  137. }
  138. }
  139. nodeACLDelta := currentNode.DefaultACL != newNode.DefaultACL
  140. newNode.Fill(currentNode, servercfg.IsPro)
  141. // check for un-settable server values
  142. if err := ValidateNode(newNode, true); err != nil {
  143. return err
  144. }
  145. if newNode.ID == currentNode.ID {
  146. if nodeACLDelta {
  147. if err := UpdateProNodeACLs(newNode); err != nil {
  148. logger.Log(1, "failed to apply node level ACLs during creation of node", newNode.ID.String(), "-", err.Error())
  149. return err
  150. }
  151. }
  152. newNode.SetLastModified()
  153. if data, err := json.Marshal(newNode); err != nil {
  154. return err
  155. } else {
  156. err = database.Insert(newNode.ID.String(), string(data), database.NODES_TABLE_NAME)
  157. if err != nil {
  158. return err
  159. }
  160. if servercfg.CacheEnabled() {
  161. storeNodeInCache(*newNode)
  162. }
  163. return nil
  164. }
  165. }
  166. return fmt.Errorf("failed to update node " + currentNode.ID.String() + ", cannot change ID.")
  167. }
  168. // DeleteNode - marks node for deletion (and adds to zombie list) if called by UI or deletes node if called by node
  169. func DeleteNode(node *models.Node, purge bool) error {
  170. alreadyDeleted := node.PendingDelete || node.Action == models.NODE_DELETE
  171. node.Action = models.NODE_DELETE
  172. //delete ext clients if node is ingress gw
  173. if node.IsIngressGateway {
  174. if err := DeleteGatewayExtClients(node.ID.String(), node.Network); err != nil {
  175. slog.Error("failed to delete ext clients", "nodeid", node.ID.String(), "error", err.Error())
  176. }
  177. }
  178. if node.IsRelayed {
  179. // cleanup node from relayednodes on relay node
  180. relayNode, err := GetNodeByID(node.RelayedBy)
  181. if err == nil {
  182. relayedNodes := []string{}
  183. for _, relayedNodeID := range relayNode.RelayedNodes {
  184. if relayedNodeID == node.ID.String() {
  185. continue
  186. }
  187. relayedNodes = append(relayedNodes, relayedNodeID)
  188. }
  189. relayNode.RelayedNodes = relayedNodes
  190. UpsertNode(&relayNode)
  191. }
  192. }
  193. if node.FailedOverBy != uuid.Nil {
  194. ResetFailedOverPeer(node)
  195. }
  196. if node.IsRelay {
  197. // unset all the relayed nodes
  198. SetRelayedNodes(false, node.ID.String(), node.RelayedNodes)
  199. }
  200. if node.InternetGwID != "" {
  201. inetNode, err := GetNodeByID(node.InternetGwID)
  202. if err == nil {
  203. clientNodeIDs := []string{}
  204. for _, inetNodeClientID := range inetNode.InetNodeReq.InetNodeClientIDs {
  205. if inetNodeClientID == node.ID.String() {
  206. continue
  207. }
  208. clientNodeIDs = append(clientNodeIDs, inetNodeClientID)
  209. }
  210. inetNode.InetNodeReq.InetNodeClientIDs = clientNodeIDs
  211. UpsertNode(&inetNode)
  212. }
  213. }
  214. if node.IsInternetGateway {
  215. UnsetInternetGw(node)
  216. }
  217. if !purge && !alreadyDeleted {
  218. newnode := *node
  219. newnode.PendingDelete = true
  220. if err := UpdateNode(node, &newnode); err != nil {
  221. return err
  222. }
  223. newZombie <- node.ID
  224. return nil
  225. }
  226. if alreadyDeleted {
  227. logger.Log(1, "forcibly deleting node", node.ID.String())
  228. }
  229. host, err := GetHost(node.HostID.String())
  230. if err != nil {
  231. logger.Log(1, "no host found for node", node.ID.String(), "deleting..")
  232. if delErr := DeleteNodeByID(node); delErr != nil {
  233. logger.Log(0, "failed to delete node", node.ID.String(), delErr.Error())
  234. }
  235. return err
  236. }
  237. if err := DissasociateNodeFromHost(node, host); err != nil {
  238. return err
  239. }
  240. return nil
  241. }
  242. // GetNodeByHostRef - gets the node by host id and network
  243. func GetNodeByHostRef(hostid, network string) (node models.Node, err error) {
  244. nodes, err := GetNetworkNodes(network)
  245. if err != nil {
  246. return models.Node{}, err
  247. }
  248. for _, node := range nodes {
  249. if node.HostID.String() == hostid && node.Network == network {
  250. return node, nil
  251. }
  252. }
  253. return models.Node{}, errors.New("node not found")
  254. }
  255. // DeleteNodeByID - deletes a node from database
  256. func DeleteNodeByID(node *models.Node) error {
  257. var err error
  258. var key = node.ID.String()
  259. if err = database.DeleteRecord(database.NODES_TABLE_NAME, key); err != nil {
  260. if !database.IsEmptyRecord(err) {
  261. return err
  262. }
  263. }
  264. if servercfg.CacheEnabled() {
  265. deleteNodeFromCache(node.ID.String())
  266. }
  267. if servercfg.IsDNSMode() {
  268. SetDNS()
  269. }
  270. _, err = nodeacls.RemoveNodeACL(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()))
  271. if err != nil {
  272. // ignoring for now, could hit a nil pointer if delete called twice
  273. logger.Log(2, "attempted to remove node ACL for node", node.ID.String())
  274. }
  275. // removeZombie <- node.ID
  276. if err = DeleteMetrics(node.ID.String()); err != nil {
  277. logger.Log(1, "unable to remove metrics from DB for node", node.ID.String(), err.Error())
  278. }
  279. return nil
  280. }
  281. // IsNodeIDUnique - checks if node id is unique
  282. func IsNodeIDUnique(node *models.Node) (bool, error) {
  283. _, err := database.FetchRecord(database.NODES_TABLE_NAME, node.ID.String())
  284. return database.IsEmptyRecord(err), err
  285. }
  286. // ValidateNode - validates node values
  287. func ValidateNode(node *models.Node, isUpdate bool) error {
  288. v := validator.New()
  289. _ = v.RegisterValidation("id_unique", func(fl validator.FieldLevel) bool {
  290. if isUpdate {
  291. return true
  292. }
  293. isFieldUnique, _ := IsNodeIDUnique(node)
  294. return isFieldUnique
  295. })
  296. _ = v.RegisterValidation("network_exists", func(fl validator.FieldLevel) bool {
  297. _, err := GetNetworkByNode(node)
  298. return err == nil
  299. })
  300. _ = v.RegisterValidation("checkyesornoorunset", func(f1 validator.FieldLevel) bool {
  301. return validation.CheckYesOrNoOrUnset(f1)
  302. })
  303. err := v.Struct(node)
  304. return err
  305. }
  306. // GetAllNodes - returns all nodes in the DB
  307. func GetAllNodes() ([]models.Node, error) {
  308. var nodes []models.Node
  309. if servercfg.CacheEnabled() {
  310. nodes = getNodesFromCache()
  311. if len(nodes) != 0 {
  312. return nodes, nil
  313. }
  314. }
  315. nodesMap := make(map[string]models.Node)
  316. if servercfg.CacheEnabled() {
  317. defer loadNodesIntoCache(nodesMap)
  318. }
  319. collection, err := database.FetchRecords(database.NODES_TABLE_NAME)
  320. if err != nil {
  321. if database.IsEmptyRecord(err) {
  322. return []models.Node{}, nil
  323. }
  324. return []models.Node{}, err
  325. }
  326. for _, value := range collection {
  327. var node models.Node
  328. // ignore legacy nodes in database
  329. if err := json.Unmarshal([]byte(value), &node); err != nil {
  330. logger.Log(3, "legacy node detected: ", err.Error())
  331. continue
  332. }
  333. // add node to our array
  334. nodes = append(nodes, node)
  335. nodesMap[node.ID.String()] = node
  336. }
  337. return nodes, nil
  338. }
  339. // GetNetworkByNode - gets the network model from a node
  340. func GetNetworkByNode(node *models.Node) (models.Network, error) {
  341. var network = models.Network{}
  342. networkData, err := database.FetchRecord(database.NETWORKS_TABLE_NAME, node.Network)
  343. if err != nil {
  344. return network, err
  345. }
  346. if err = json.Unmarshal([]byte(networkData), &network); err != nil {
  347. return models.Network{}, err
  348. }
  349. return network, nil
  350. }
  351. // SetNodeDefaults - sets the defaults of a node to avoid empty fields
  352. func SetNodeDefaults(node *models.Node) {
  353. parentNetwork, _ := GetNetworkByNode(node)
  354. _, cidr, err := net.ParseCIDR(parentNetwork.AddressRange)
  355. if err == nil {
  356. node.NetworkRange = *cidr
  357. }
  358. _, cidr, err = net.ParseCIDR(parentNetwork.AddressRange6)
  359. if err == nil {
  360. node.NetworkRange6 = *cidr
  361. }
  362. if node.DefaultACL == "" {
  363. node.DefaultACL = parentNetwork.DefaultACL
  364. }
  365. if node.FailOverPeers == nil {
  366. node.FailOverPeers = make(map[string]struct{})
  367. }
  368. node.SetLastModified()
  369. node.SetLastCheckIn()
  370. node.SetDefaultConnected()
  371. node.SetExpirationDateTime()
  372. }
  373. // GetRecordKey - get record key
  374. // depricated
  375. func GetRecordKey(id string, network string) (string, error) {
  376. if id == "" || network == "" {
  377. return "", errors.New("unable to get record key")
  378. }
  379. return id + "###" + network, nil
  380. }
  381. func GetNodeByID(uuid string) (models.Node, error) {
  382. if servercfg.CacheEnabled() {
  383. if node, ok := getNodeFromCache(uuid); ok {
  384. return node, nil
  385. }
  386. }
  387. var record, err = database.FetchRecord(database.NODES_TABLE_NAME, uuid)
  388. if err != nil {
  389. return models.Node{}, err
  390. }
  391. var node models.Node
  392. if err = json.Unmarshal([]byte(record), &node); err != nil {
  393. return models.Node{}, err
  394. }
  395. if servercfg.CacheEnabled() {
  396. storeNodeInCache(node)
  397. }
  398. return node, nil
  399. }
  400. // GetDeletedNodeByID - get a deleted node
  401. func GetDeletedNodeByID(uuid string) (models.Node, error) {
  402. var node models.Node
  403. record, err := database.FetchRecord(database.DELETED_NODES_TABLE_NAME, uuid)
  404. if err != nil {
  405. return models.Node{}, err
  406. }
  407. if err = json.Unmarshal([]byte(record), &node); err != nil {
  408. return models.Node{}, err
  409. }
  410. SetNodeDefaults(&node)
  411. return node, nil
  412. }
  413. // FindRelay - returns the node that is the relay for a relayed node
  414. func FindRelay(node *models.Node) *models.Node {
  415. relay, err := GetNodeByID(node.RelayedBy)
  416. if err != nil {
  417. logger.Log(0, "FindRelay: "+err.Error())
  418. return nil
  419. }
  420. return &relay
  421. }
  422. // GetAllNodesAPI - get all nodes for api usage
  423. func GetAllNodesAPI(nodes []models.Node) []models.ApiNode {
  424. apiNodes := []models.ApiNode{}
  425. for i := range nodes {
  426. newApiNode := nodes[i].ConvertToAPINode()
  427. apiNodes = append(apiNodes, *newApiNode)
  428. }
  429. return apiNodes[:]
  430. }
  431. // DeleteExpiredNodes - goroutine which deletes nodes which are expired
  432. func DeleteExpiredNodes(ctx context.Context, peerUpdate chan *models.Node) {
  433. // Delete Expired Nodes Every Hour
  434. ticker := time.NewTicker(time.Hour)
  435. for {
  436. select {
  437. case <-ctx.Done():
  438. ticker.Stop()
  439. return
  440. case <-ticker.C:
  441. allnodes, err := GetAllNodes()
  442. if err != nil {
  443. slog.Error("failed to retrieve all nodes", "error", err.Error())
  444. return
  445. }
  446. for _, node := range allnodes {
  447. node := node
  448. if time.Now().After(node.ExpirationDateTime) {
  449. peerUpdate <- &node
  450. slog.Info("deleting expired node", "nodeid", node.ID.String())
  451. }
  452. }
  453. }
  454. }
  455. }
  456. // createNode - creates a node in database
  457. func createNode(node *models.Node) error {
  458. // lock because we need unique IPs and having it concurrent makes parallel calls result in same "unique" IPs
  459. addressLock.Lock()
  460. defer addressLock.Unlock()
  461. host, err := GetHost(node.HostID.String())
  462. if err != nil {
  463. return err
  464. }
  465. if !node.DNSOn {
  466. if servercfg.IsDNSMode() {
  467. node.DNSOn = true
  468. } else {
  469. node.DNSOn = false
  470. }
  471. }
  472. SetNodeDefaults(node)
  473. defaultACLVal := acls.Allowed
  474. parentNetwork, err := GetNetwork(node.Network)
  475. if err == nil {
  476. if parentNetwork.DefaultACL != "yes" {
  477. defaultACLVal = acls.NotAllowed
  478. }
  479. }
  480. if node.DefaultACL == "" {
  481. node.DefaultACL = "unset"
  482. }
  483. if node.Address.IP == nil {
  484. if parentNetwork.IsIPv4 == "yes" {
  485. if node.Address.IP, err = UniqueAddress(node.Network, false); err != nil {
  486. return err
  487. }
  488. _, cidr, err := net.ParseCIDR(parentNetwork.AddressRange)
  489. if err != nil {
  490. return err
  491. }
  492. node.Address.Mask = net.CIDRMask(cidr.Mask.Size())
  493. }
  494. } else if !IsIPUnique(node.Network, node.Address.String(), database.NODES_TABLE_NAME, false) {
  495. return fmt.Errorf("invalid address: ipv4 " + node.Address.String() + " is not unique")
  496. }
  497. if node.Address6.IP == nil {
  498. if parentNetwork.IsIPv6 == "yes" {
  499. if node.Address6.IP, err = UniqueAddress6(node.Network, false); err != nil {
  500. return err
  501. }
  502. _, cidr, err := net.ParseCIDR(parentNetwork.AddressRange6)
  503. if err != nil {
  504. return err
  505. }
  506. node.Address6.Mask = net.CIDRMask(cidr.Mask.Size())
  507. }
  508. } else if !IsIPUnique(node.Network, node.Address6.String(), database.NODES_TABLE_NAME, true) {
  509. return fmt.Errorf("invalid address: ipv6 " + node.Address6.String() + " is not unique")
  510. }
  511. node.ID = uuid.New()
  512. //Create a JWT for the node
  513. tokenString, _ := CreateJWT(node.ID.String(), host.MacAddress.String(), node.Network)
  514. if tokenString == "" {
  515. //logic.ReturnErrorResponse(w, r, errorResponse)
  516. return err
  517. }
  518. err = ValidateNode(node, false)
  519. if err != nil {
  520. return err
  521. }
  522. CheckZombies(node)
  523. nodebytes, err := json.Marshal(&node)
  524. if err != nil {
  525. return err
  526. }
  527. err = database.Insert(node.ID.String(), string(nodebytes), database.NODES_TABLE_NAME)
  528. if err != nil {
  529. return err
  530. }
  531. if servercfg.CacheEnabled() {
  532. storeNodeInCache(*node)
  533. }
  534. _, err = nodeacls.CreateNodeACL(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), defaultACLVal)
  535. if err != nil {
  536. logger.Log(1, "failed to create node ACL for node,", node.ID.String(), "err:", err.Error())
  537. return err
  538. }
  539. if err = UpdateProNodeACLs(node); err != nil {
  540. logger.Log(1, "failed to apply node level ACLs during creation of node", node.ID.String(), "-", err.Error())
  541. return err
  542. }
  543. if err = UpdateMetrics(node.ID.String(), &models.Metrics{Connectivity: make(map[string]models.Metric)}); err != nil {
  544. logger.Log(1, "failed to initialize metrics for node", node.ID.String(), err.Error())
  545. }
  546. SetNetworkNodesLastModified(node.Network)
  547. if servercfg.IsDNSMode() {
  548. err = SetDNS()
  549. }
  550. return err
  551. }
  552. // SortApiNodes - Sorts slice of ApiNodes by their ID alphabetically with numbers first
  553. func SortApiNodes(unsortedNodes []models.ApiNode) {
  554. sort.Slice(unsortedNodes, func(i, j int) bool {
  555. return unsortedNodes[i].ID < unsortedNodes[j].ID
  556. })
  557. }
  558. func ValidateParams(nodeid, netid string) (models.Node, error) {
  559. node, err := GetNodeByID(nodeid)
  560. if err != nil {
  561. slog.Error("error fetching node", "node", nodeid, "error", err.Error())
  562. return node, fmt.Errorf("error fetching node during parameter validation: %v", err)
  563. }
  564. if node.Network != netid {
  565. slog.Error("network url param does not match node id", "url nodeid", netid, "node", node.Network)
  566. return node, fmt.Errorf("network url param does not match node network")
  567. }
  568. return node, nil
  569. }
  570. func ValidateEgressRange(gateway models.EgressGatewayRequest) error {
  571. network, err := GetNetworkSettings(gateway.NetID)
  572. if err != nil {
  573. slog.Error("error getting network with netid", "error", gateway.NetID, err.Error)
  574. return errors.New("error getting network with netid: " + gateway.NetID + " " + err.Error())
  575. }
  576. _, ipv4Net, _ := net.ParseCIDR(network.AddressRange)
  577. _, ipv6Net, _ := net.ParseCIDR(network.AddressRange6)
  578. for _, v := range gateway.Ranges {
  579. _, cidr, _ := net.ParseCIDR(v)
  580. if ipv4Net != nil {
  581. if ContainsCIDR(ipv4Net, cidr) || ContainsCIDR(cidr, ipv4Net) {
  582. slog.Error("egress range should not be the same as or contained in the netmaker network address", "error", cidr.String(), ipv4Net.String())
  583. return errors.New("egress range should not be the same as or contained in the netmaker network address" + cidr.String() + " " + ipv4Net.String())
  584. }
  585. }
  586. if ipv6Net != nil {
  587. if ContainsCIDR(ipv6Net, cidr) || ContainsCIDR(cidr, ipv6Net) {
  588. slog.Error("egress range should not be the same as or contained in the netmaker network address", "error", cidr.String(), ipv6Net.String())
  589. return errors.New("egress range should not be the same as or contained in the netmaker network address" + cidr.String() + " " + ipv6Net.String())
  590. }
  591. }
  592. }
  593. return nil
  594. }
  595. func ContainsCIDR(net1, net2 *net.IPNet) bool {
  596. net1Size, _ := net1.Mask.Size()
  597. net2Size, _ := net2.Mask.Size()
  598. return net1Size <= net2Size && net1.Contains(net2.IP)
  599. }
  600. // GetAllFailOvers - gets all the nodes that are failovers
  601. func GetAllFailOvers() ([]models.Node, error) {
  602. nodes, err := GetAllNodes()
  603. if err != nil {
  604. return nil, err
  605. }
  606. igs := make([]models.Node, 0)
  607. for _, node := range nodes {
  608. if node.IsFailOver {
  609. igs = append(igs, node)
  610. }
  611. }
  612. return igs, nil
  613. }