Browse Source

:gear: Add Alive service node probe

Ettore Di Giacinto 3 years ago
parent
commit
6ecf1f95bd
9 changed files with 232 additions and 0 deletions
  1. 1 0
      go.mod
  2. 2 0
      go.sum
  3. 6 0
      pkg/node/connection.go
  4. 1 0
      pkg/protocol/protocol.go
  5. 59 0
      pkg/services/alive.go
  6. 60 0
      pkg/services/alive_test.go
  7. 41 0
      pkg/utils/ip.go
  8. 34 0
      pkg/utils/ip_test.go
  9. 28 0
      pkg/utils/utils_suite_test.go

+ 1 - 0
go.mod

@@ -3,6 +3,7 @@ module github.com/mudler/edgevpn
 go 1.16
 
 require (
+	github.com/c-robinson/iplib v1.0.3 // indirect
 	github.com/cespare/xxhash/v2 v2.1.2 // indirect
 	github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect
 	github.com/fsnotify/fsnotify v1.5.1 // indirect

+ 2 - 0
go.sum

@@ -105,6 +105,8 @@ github.com/btcsuite/snappy-go v1.0.0/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg
 github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
 github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
 github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
+github.com/c-robinson/iplib v1.0.3 h1:NG0UF0GoEsrC1/vyfX1Lx2Ss7CySWl3KqqXh3q4DdPU=
+github.com/c-robinson/iplib v1.0.3/go.mod h1:i3LuuFL1hRT5gFpBRnEydzw8R6yhGkF4szNDIbF8pgo=
 github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
 github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=

+ 6 - 0
pkg/node/connection.go

@@ -99,12 +99,18 @@ func (e *Node) handleEvents(ctx context.Context) {
 	for {
 		select {
 		case m := <-e.inputCh:
+			if m == nil {
+				continue
+			}
 			c := m.Copy()
 			if err := c.Seal(e.sealkey()); err != nil {
 				e.config.Logger.Warn(err.Error())
 			}
 			e.handleOutgoingMessage(c)
 		case m := <-e.HubRoom.Messages:
+			if m == nil {
+				continue
+			}
 			c := m.Copy()
 			if err := c.Unseal(e.sealkey()); err != nil {
 				e.config.Logger.Warn(err.Error())

+ 1 - 0
pkg/protocol/protocol.go

@@ -30,6 +30,7 @@ const (
 	MachinesLedgerKey = "machines"
 	ServicesLedgerKey = "services"
 	UsersLedgerKey    = "users"
+	HealthCheckKey    = "healthcheck"
 )
 
 type Protocol string

+ 59 - 0
pkg/services/alive.go

@@ -0,0 +1,59 @@
+// Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, see <http://www.gnu.org/licenses/>.
+
+package services
+
+import (
+	"context"
+	"time"
+
+	"github.com/mudler/edgevpn/pkg/node"
+	"github.com/mudler/edgevpn/pkg/protocol"
+
+	"github.com/mudler/edgevpn/pkg/blockchain"
+)
+
+func Alive(announcetime time.Duration) []node.Option {
+	return []node.Option{
+		node.WithNetworkService(
+			func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
+				// By announcing periodically our service to the blockchain
+				b.Announce(
+					ctx,
+					announcetime,
+					func() {
+						b.Add(protocol.HealthCheckKey, map[string]interface{}{
+							n.Host().ID().String(): time.Now().Format(time.RFC3339),
+						})
+					},
+				)
+				return nil
+			},
+		),
+	}
+}
+
+func AvailableNodes(b *blockchain.Ledger) (active []string) {
+	for u, t := range b.LastBlock().Storage[protocol.HealthCheckKey] {
+		var s string
+		t.Unmarshal(&s)
+		parsed, _ := time.Parse(time.RFC3339, s)
+		if parsed.Add(2 * time.Minute).After(time.Now()) {
+			active = append(active, u)
+		}
+	}
+
+	return active
+}

+ 60 - 0
pkg/services/alive_test.go

@@ -0,0 +1,60 @@
+// Copyright © 2022 Ettore Di Giacinto <[email protected]>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, see <http://www.gnu.org/licenses/>.
+
+package services_test
+
+import (
+	"context"
+	"time"
+
+	"github.com/ipfs/go-log"
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+
+	"github.com/mudler/edgevpn/pkg/blockchain"
+	"github.com/mudler/edgevpn/pkg/logger"
+	node "github.com/mudler/edgevpn/pkg/node"
+	. "github.com/mudler/edgevpn/pkg/services"
+)
+
+var _ = Describe("Alive service", func() {
+	token := node.GenerateNewConnectionData().Base64()
+
+	logg := logger.New(log.LevelError)
+	l := node.Logger(logg)
+
+	opts := append(
+		Alive(1*time.Second),
+		node.FromBase64(true, true, token),
+		node.WithStore(&blockchain.MemoryStore{}),
+		l)
+	e2 := node.New(opts...)
+	e1 := node.New(opts...)
+
+	Context("Aliveness check", func() {
+		It("detect both nodes alive after a while", func() {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			e1.Start(ctx)
+			e2.Start(ctx)
+
+			Eventually(func() []string {
+				ll, _ := e1.Ledger()
+				return AvailableNodes(ll)
+			}, 100*time.Second, 1*time.Second).Should(ContainElement(e2.Host().ID().String()))
+		})
+	})
+})

+ 41 - 0
pkg/utils/ip.go

@@ -0,0 +1,41 @@
+// Copyright © 2022 Ettore Di Giacinto <[email protected]>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, see <http://www.gnu.org/licenses/>.
+
+package utils
+
+import (
+	"net"
+	"sort"
+
+	"github.com/c-robinson/iplib"
+)
+
+func NextIP(defaultIP string, ips []string) string {
+	if len(ips) == 0 {
+		return defaultIP
+	}
+
+	r := []net.IP{}
+	for _, i := range ips {
+		ip := net.ParseIP(i)
+		r = append(r, ip)
+	}
+
+	sort.Sort(iplib.ByIP(r))
+
+	last := r[len(r)-1]
+
+	return iplib.NextIP(last).String()
+}

+ 34 - 0
pkg/utils/ip_test.go

@@ -0,0 +1,34 @@
+// Copyright © 2022 Ettore Di Giacinto <[email protected]>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, see <http://www.gnu.org/licenses/>.
+
+package utils_test
+
+import (
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+
+	. "github.com/mudler/edgevpn/pkg/utils"
+)
+
+var _ = Describe("IP", func() {
+	Context("NextIP", func() {
+		It("gives a new IP", func() {
+			Expect(NextIP("10.1.1.0", []string{"1.1.0.1"})).To(Equal("1.1.0.2"))
+		})
+		It("return default", func() {
+			Expect(NextIP("10.1.1.0", []string{})).To(Equal("10.1.1.0"))
+		})
+	})
+})

+ 28 - 0
pkg/utils/utils_suite_test.go

@@ -0,0 +1,28 @@
+// Copyright © 2022 Ettore Di Giacinto <[email protected]>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, see <http://www.gnu.org/licenses/>.
+
+package utils_test
+
+import (
+	"testing"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+func TestUtils(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Utils Suite")
+}