alive.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. // Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
  2. //
  3. // This program is free software; you can redistribute it and/or modify
  4. // it under the terms of the GNU General Public License as published by
  5. // the Free Software Foundation; either version 2 of the License, or
  6. // (at your option) any later version.
  7. //
  8. // This program is distributed in the hope that it will be useful,
  9. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. // GNU General Public License for more details.
  12. //
  13. // You should have received a copy of the GNU General Public License along
  14. // with this program; if not, see <http://www.gnu.org/licenses/>.
  15. package services
  16. import (
  17. "context"
  18. "time"
  19. "github.com/mudler/edgevpn/pkg/node"
  20. "github.com/mudler/edgevpn/pkg/protocol"
  21. "github.com/mudler/edgevpn/pkg/utils"
  22. "github.com/mudler/edgevpn/pkg/blockchain"
  23. )
  24. func AliveNetworkService(announcetime, scrubTime, maxtime time.Duration) node.NetworkService {
  25. return func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
  26. t := time.Now()
  27. // By announcing periodically our service to the blockchain
  28. b.Announce(
  29. ctx,
  30. announcetime,
  31. func() {
  32. // Keep-alive
  33. b.Add(protocol.HealthCheckKey, map[string]interface{}{
  34. n.Host().ID().String(): time.Now().UTC().Format(time.RFC3339),
  35. })
  36. // Keep-alive scrub
  37. nodes := AvailableNodes(b, maxtime)
  38. if len(nodes) == 0 {
  39. return
  40. }
  41. lead := utils.Leader(nodes)
  42. if !t.Add(scrubTime).After(time.Now()) {
  43. // Update timer so not-leader do not attempt to delete bucket afterwards
  44. // prevent cycles
  45. t = time.Now()
  46. if lead == n.Host().ID().String() {
  47. // Automatically scrub after some time passed
  48. b.DeleteBucket(protocol.HealthCheckKey)
  49. }
  50. }
  51. },
  52. )
  53. return nil
  54. }
  55. }
  56. // Alive announce the node every announce time, with a periodic scrub time for healthchecks
  57. // the maxtime is the time used to determine when a node is unreachable (after maxtime, its unreachable)
  58. func Alive(announcetime, scrubTime, maxtime time.Duration) []node.Option {
  59. return []node.Option{
  60. node.WithNetworkService(AliveNetworkService(announcetime, scrubTime, maxtime)),
  61. }
  62. }
  63. // AvailableNodes returns the available nodes which sent a healthcheck in the last maxTime
  64. func AvailableNodes(b *blockchain.Ledger, maxTime time.Duration) (active []string) {
  65. for u, t := range b.LastBlock().Storage[protocol.HealthCheckKey] {
  66. var s string
  67. t.Unmarshal(&s)
  68. parsed, _ := time.Parse(time.RFC3339, s)
  69. if parsed.Add(maxTime).After(time.Now().UTC()) {
  70. active = append(active, u)
  71. }
  72. }
  73. return active
  74. }