Browse Source

:rocket: Add experimental p2p ip negotiation

Ettore Di Giacinto 3 years ago
parent
commit
74b0516794
7 changed files with 187 additions and 25 deletions
  1. 15 4
      cmd/main.go
  2. 2 1
      go.mod
  3. 0 2
      go.sum
  4. 14 1
      pkg/services/alive.go
  5. 40 0
      pkg/utils/leader.go
  6. 100 0
      pkg/vpn/dhcp.go
  7. 16 17
      pkg/vpn/vpn.go

+ 15 - 4
cmd/main.go

@@ -23,6 +23,7 @@ import (
 
 	"github.com/mudler/edgevpn/api"
 	edgevpn "github.com/mudler/edgevpn/pkg/node"
+	"github.com/mudler/edgevpn/pkg/services"
 	"github.com/mudler/edgevpn/pkg/vpn"
 	"github.com/urfave/cli"
 )
@@ -58,9 +59,8 @@ func MainFlags() []cli.Flag {
 		},
 		&cli.StringFlag{
 			Name:   "address",
-			Usage:  "VPN virtual address",
+			Usage:  "VPN virtual address, e.g. 10.1.0.1/24. No address specified enables p2p ip negotiation (experimental)",
 			EnvVar: "ADDRESS",
-			Value:  "10.1.0.1/24",
 		},
 		&cli.StringFlag{
 			Name:   "router",
@@ -90,13 +90,24 @@ func Main() func(c *cli.Context) error {
 		}
 		o, vpnOpts, ll := cliToOpts(c)
 
+		if c.String("address") == "" {
+			nodeOpts, vO := vpn.DHCP(ll, 10*time.Second)
+			o = append(
+				append(
+					o,
+					services.Alive(30*time.Second)...,
+				),
+				nodeOpts...,
+			)
+			vpnOpts = append(vpnOpts, vO...)
+		}
+
 		opts, err := vpn.Register(vpnOpts...)
 		if err != nil {
 			return err
 		}
-		o = append(o, opts...)
 
-		e := edgevpn.New(o...)
+		e := edgevpn.New(append(o, opts...)...)
 
 		displayStart(ll)
 

+ 2 - 1
go.mod

@@ -3,7 +3,7 @@ module github.com/mudler/edgevpn
 go 1.16
 
 require (
-	github.com/c-robinson/iplib v1.0.3 // indirect
+	github.com/c-robinson/iplib v1.0.3
 	github.com/cespare/xxhash/v2 v2.1.2 // indirect
 	github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect
 	github.com/fsnotify/fsnotify v1.5.1 // indirect
@@ -13,6 +13,7 @@ require (
 	github.com/ipfs/go-cid v0.1.0 // indirect
 	github.com/ipfs/go-datastore v0.5.1 // indirect
 	github.com/ipfs/go-log v1.0.5
+	github.com/ipfs/go-log/v2 v2.4.0
 	github.com/ipld/go-ipld-prime v0.14.3 // indirect
 	github.com/klauspost/compress v1.13.6 // indirect
 	github.com/labstack/echo/v4 v4.6.1

+ 0 - 2
go.sum

@@ -776,7 +776,6 @@ github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3N
 github.com/miekg/dns v1.1.12/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
 github.com/miekg/dns v1.1.28/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM=
 github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=
-github.com/miekg/dns v1.1.43 h1:JKfpVSCB84vrAmHzyrsxB5NAr5kLoMXZArPSw7Qlgyg=
 github.com/miekg/dns v1.1.43/go.mod h1:+evo5L0630/F6ca/Z9+GAqzhjGyn8/c+TBaOyfEl0V4=
 github.com/miekg/dns v1.1.45 h1:g5fRIhm9nx7g8osrAvgb16QJfmyMsyOCb+J7LSv+Qzk=
 github.com/miekg/dns v1.1.45/go.mod h1:e3IlAVfNqAllflbibAZEWOXOQ+Ynzk/dDozDxY7XnME=
@@ -856,7 +855,6 @@ github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/g
 github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk=
 github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc=
 github.com/multiformats/go-multicodec v0.2.0/go.mod h1:/y4YVwkfMyry5kFbMTbLJKErhycTIftytRV+llXdyS4=
-github.com/multiformats/go-multicodec v0.3.0 h1:tstDwfIjiHbnIjeM5Lp+pMrSeN+LCMsEwOrkPmWm03A=
 github.com/multiformats/go-multicodec v0.3.0/go.mod h1:qGGaQmioCDh+TeFOnxrbU0DaIPw8yFgAZgFG0V7p1qQ=
 github.com/multiformats/go-multicodec v0.4.0 h1:fbqb6ky7erjdD+/zaEBJgZWu1i8D6i/wmPywGK7sdow=
 github.com/multiformats/go-multicodec v0.4.0/go.mod h1:1Hj/eHRaVWSXiSNNfcEPcwZleTmdNP81xlxDLnWU9GQ=

+ 14 - 1
pkg/services/alive.go

@@ -21,6 +21,7 @@ import (
 
 	"github.com/mudler/edgevpn/pkg/node"
 	"github.com/mudler/edgevpn/pkg/protocol"
+	"github.com/mudler/edgevpn/pkg/utils"
 
 	"github.com/mudler/edgevpn/pkg/blockchain"
 )
@@ -29,14 +30,26 @@ func Alive(announcetime time.Duration) []node.Option {
 	return []node.Option{
 		node.WithNetworkService(
 			func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
+				t := time.Now()
 				// By announcing periodically our service to the blockchain
 				b.Announce(
 					ctx,
 					announcetime,
 					func() {
+						// Keep-alive
 						b.Add(protocol.HealthCheckKey, map[string]interface{}{
 							n.Host().ID().String(): time.Now().Format(time.RFC3339),
 						})
+
+						// Keep-alive scrub
+						nodes := AvailableNodes(b)
+						lead := utils.Leader(nodes)
+						if lead == n.Host().ID().String() && !t.Add(2*time.Minute).After(time.Now()) {
+							// We scrub after some time passed
+							t = time.Now()
+							// Automatically scrub
+							b.DeleteBucket(protocol.HealthCheckKey)
+						}
 					},
 				)
 				return nil
@@ -50,7 +63,7 @@ func AvailableNodes(b *blockchain.Ledger) (active []string) {
 		var s string
 		t.Unmarshal(&s)
 		parsed, _ := time.Parse(time.RFC3339, s)
-		if parsed.Add(2 * time.Minute).After(time.Now()) {
+		if parsed.Add(15 * time.Minute).After(time.Now()) {
 			active = append(active, u)
 		}
 	}

+ 40 - 0
pkg/utils/leader.go

@@ -0,0 +1,40 @@
+// Copyright © 2022 Ettore Di Giacinto <[email protected]>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, see <http://www.gnu.org/licenses/>.
+
+package utils
+
+import "hash/fnv"
+
+func hash(s string) uint32 {
+	h := fnv.New32a()
+	h.Write([]byte(s))
+	return h.Sum32()
+}
+
+func Leader(actives []string) string {
+	// first get available nodes
+	leaderboard := map[string]uint32{}
+
+	leader := actives[0]
+
+	// Compute who is leader at the moment
+	for _, a := range actives {
+		leaderboard[a] = hash(a)
+		if leaderboard[leader] < leaderboard[a] {
+			leader = a
+		}
+	}
+	return leader
+}

+ 100 - 0
pkg/vpn/dhcp.go

@@ -0,0 +1,100 @@
+// Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, see <http://www.gnu.org/licenses/>.
+
+package vpn
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/ipfs/go-log/v2"
+	"github.com/mudler/edgevpn/pkg/node"
+	"github.com/mudler/edgevpn/pkg/protocol"
+	"github.com/mudler/edgevpn/pkg/services"
+	"github.com/mudler/edgevpn/pkg/types"
+	"github.com/mudler/edgevpn/pkg/utils"
+
+	"github.com/mudler/edgevpn/pkg/blockchain"
+)
+
+func DHCP(l log.StandardLogger, announcetime time.Duration) ([]node.Option, []Option) {
+	ip := make(chan string, 1)
+	return []node.Option{
+			node.WithNetworkService(
+				func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
+					//  whoever wants an IP:
+					//  1. Get available nodes. Filter from Machine those that do not have an IP.
+					//  2. Get the leader among them. If we are not, we wait
+					//  3. If we are the leader, pick an IP and start the VPN with that IP
+					var wantedIP string
+					for wantedIP == "" {
+						time.Sleep(5 * time.Second)
+
+						// This network service is blocking and calls in before VPN, hence it needs to registered before VPN
+						nodes := services.AvailableNodes(b)
+
+						currentIPs := map[string]string{}
+						ips := []string{}
+
+						for _, t := range b.LastBlock().Storage[protocol.MachinesLedgerKey] {
+							var m types.Machine
+							t.Unmarshal(&m)
+							currentIPs[m.PeerID] = m.Address
+
+							l.Debugf("%s uses %s", m.PeerID, m.Address)
+							ips = append(ips, m.Address)
+						}
+
+						nodesWithNoIP := []string{}
+						for _, nn := range nodes {
+							if _, exists := currentIPs[nn]; !exists {
+								nodesWithNoIP = append(nodesWithNoIP, nn)
+							}
+						}
+
+						if len(nodes) <= 1 {
+							l.Debug("not enough nodes to determine an IP, sleeping")
+							continue
+						}
+
+						lead := utils.Leader(nodesWithNoIP)
+						l.Debug("Nodes with no ip", nodesWithNoIP)
+
+						if n.Host().ID().String() != lead {
+							l.Debug("Not leader, sleeping")
+							time.Sleep(announcetime)
+							continue
+						}
+
+						// We are lead
+						l.Debug("picking up between", ips)
+
+						wantedIP = utils.NextIP("10.1.0.1", ips)
+					}
+
+					ip <- wantedIP
+					return nil
+				},
+			),
+		}, []Option{
+			func(cfg *Config) error {
+				cfg.InterfaceAddress = fmt.Sprintf("%s/24", <-ip)
+				close(ip)
+				l.Debug("IP Received", cfg.InterfaceAddress)
+				return nil
+			},
+		}
+}

+ 16 - 17
pkg/vpn/vpn.go

@@ -44,27 +44,26 @@ import (
 // Start the node and the vpn. Returns an error in case of failure
 // When starting the vpn, there is no need to start the node
 func Register(p ...Option) ([]node.Option, error) {
-	c := &Config{
-		Concurrency:        1,
-		LedgerAnnounceTime: 5 * time.Second,
-		Timeout:            15 * time.Second,
-		Logger:             logger.New(log.LevelDebug),
-	}
-	c.Apply(p...)
-
-	ifce, err := createInterface(c)
-	if err != nil {
-		return nil, err
-	}
 
 	return []node.Option{
-		node.WithStreamHandler(protocol.EdgeVPN,
-			func(n *node.Node, l *blockchain.Ledger) func(stream network.Stream) {
-				return streamHandler(l, ifce)
-			},
-		),
 		node.WithNetworkService(func(ctx context.Context, nc node.Config, n *node.Node, b *blockchain.Ledger) error {
+			c := &Config{
+				Concurrency:        1,
+				LedgerAnnounceTime: 5 * time.Second,
+				Timeout:            15 * time.Second,
+				Logger:             logger.New(log.LevelDebug),
+			}
+			c.Apply(p...)
+
+			ifce, err := createInterface(c)
+			if err != nil {
+				return err
+			}
 			defer ifce.Close()
+
+			// Set stream handler during runtime
+			n.Host().SetStreamHandler(protocol.EdgeVPN.ID(), streamHandler(b, ifce))
+
 			// Announce our IP
 			ip, _, err := net.ParseCIDR(c.InterfaceAddress)
 			if err != nil {