Browse Source

:gear: Add autorelay discovery

Ettore Di Giacinto 3 years ago
parent
commit
f800a084b8
4 changed files with 118 additions and 19 deletions
  1. 19 7
      cmd/util.go
  2. 25 6
      pkg/config/config.go
  3. 6 6
      pkg/discovery/dht.go
  4. 68 0
      pkg/services/dht_relay_discovery.go

+ 19 - 7
cmd/util.go

@@ -77,6 +77,12 @@ var CommonFlags []cli.Flag = []cli.Flag{
 		EnvVar: "EDGEVPNLEDGERINTERVAL",
 		EnvVar: "EDGEVPNLEDGERINTERVAL",
 		Value:  10,
 		Value:  10,
 	},
 	},
+	&cli.StringFlag{
+		Name:   "autorelay-discovery-interval",
+		Usage:  "Autorelay discovery interval (Experimental. 0 to disable)",
+		EnvVar: "EDGEVPNAUTORELAYDISCOVERYINTERVAL",
+		Value:  "0",
+	},
 	&cli.IntFlag{
 	&cli.IntFlag{
 		Name:   "ledger-syncronization-interval",
 		Name:   "ledger-syncronization-interval",
 		Usage:  "Ledger syncronization interval time",
 		Usage:  "Ledger syncronization interval time",
@@ -314,6 +320,11 @@ func cliToOpts(c *cli.Context) ([]node.Option, []vpn.Option, *logger.Logger) {
 
 
 	var limitConfig *node.NetLimitConfig
 	var limitConfig *node.NetLimitConfig
 
 
+	autorelayInterval, err := time.ParseDuration(c.String("autorelay-discovery-interval"))
+	if err != nil {
+		autorelayInterval = 0
+	}
+
 	if c.Bool("limit-config") {
 	if c.Bool("limit-config") {
 		limitConfig = &node.NetLimitConfig{
 		limitConfig = &node.NetLimitConfig{
 			Dynamic:         c.Bool("limit-config-dynamic"),
 			Dynamic:         c.Bool("limit-config-dynamic"),
@@ -368,13 +379,14 @@ func cliToOpts(c *cli.Context) ([]node.Option, []vpn.Option, *logger.Logger) {
 			Interval:       time.Duration(c.Int("discovery-interval")) * time.Second,
 			Interval:       time.Duration(c.Int("discovery-interval")) * time.Second,
 		},
 		},
 		Connection: config.Connection{
 		Connection: config.Connection{
-			AutoRelay:      c.Bool("autorelay"),
-			RelayV1:        c.Bool("autorelay-v1"),
-			MaxConnections: c.Int("max-connections"),
-			MaxStreams:     c.Int("max-streams"),
-			HolePunch:      c.Bool("holepunch"),
-			Mplex:          c.Bool("mplex-multiplexer"),
-			StaticRelays:   c.StringSlice("autorelay-static-peer"),
+			AutoRelay:                  c.Bool("autorelay"),
+			RelayV1:                    c.Bool("autorelay-v1"),
+			MaxConnections:             c.Int("max-connections"),
+			MaxStreams:                 c.Int("max-streams"),
+			HolePunch:                  c.Bool("holepunch"),
+			Mplex:                      c.Bool("mplex-multiplexer"),
+			StaticRelays:               c.StringSlice("autorelay-static-peer"),
+			AutoRelayDiscoveryInterval: autorelayInterval,
 		},
 		},
 		Limit: config.ResourceLimit{
 		Limit: config.ResourceLimit{
 			Enable:      c.Bool("limit-enable"),
 			Enable:      c.Bool("limit-enable"),

+ 25 - 6
pkg/config/config.go

@@ -36,6 +36,7 @@ import (
 	"github.com/mudler/edgevpn/pkg/discovery"
 	"github.com/mudler/edgevpn/pkg/discovery"
 	"github.com/mudler/edgevpn/pkg/logger"
 	"github.com/mudler/edgevpn/pkg/logger"
 	"github.com/mudler/edgevpn/pkg/node"
 	"github.com/mudler/edgevpn/pkg/node"
+	"github.com/mudler/edgevpn/pkg/services"
 	"github.com/mudler/edgevpn/pkg/vpn"
 	"github.com/mudler/edgevpn/pkg/vpn"
 	"github.com/peterbourgon/diskv"
 	"github.com/peterbourgon/diskv"
 	"github.com/songgao/water"
 	"github.com/songgao/water"
@@ -88,10 +89,12 @@ type Discovery struct {
 // Connection is the configuration section
 // Connection is the configuration section
 // relative to the connection services
 // relative to the connection services
 type Connection struct {
 type Connection struct {
-	HolePunch    bool
-	AutoRelay    bool
-	RelayV1      bool
-	StaticRelays []string
+	HolePunch bool
+	AutoRelay bool
+
+	AutoRelayDiscoveryInterval time.Duration
+	RelayV1                    bool
+	StaticRelays               []string
 
 
 	Mplex          bool
 	Mplex          bool
 	MaxConnections int
 	MaxConnections int
@@ -214,16 +217,32 @@ func (c Config) ToOpts(l *logger.Logger) ([]node.Option, []vpn.Option, error) {
 
 
 	libp2pOpts := []libp2p.Option{libp2p.UserAgent("edgevpn")}
 	libp2pOpts := []libp2p.Option{libp2p.UserAgent("edgevpn")}
 
 
+	// AutoRelay section configuration
 	if c.Connection.AutoRelay {
 	if c.Connection.AutoRelay {
 		relayOpts := []autorelay.Option{}
 		relayOpts := []autorelay.Option{}
 		if c.Connection.RelayV1 {
 		if c.Connection.RelayV1 {
 			relayOpts = append(relayOpts, autorelay.WithCircuitV1Support())
 			relayOpts = append(relayOpts, autorelay.WithCircuitV1Support())
 		}
 		}
 
 
-		if len(c.Connection.StaticRelays) == 0 {
+		// If no relays are specified and no discovery interval, then just use default static relays (to be deprecated)
+		if len(c.Connection.StaticRelays) == 0 && c.Connection.AutoRelayDiscoveryInterval == 0 {
 			relayOpts = append(relayOpts, autorelay.WithDefaultStaticRelays())
 			relayOpts = append(relayOpts, autorelay.WithDefaultStaticRelays())
-		} else {
+		} else if len(c.Connection.StaticRelays) > 0 {
 			relayOpts = append(relayOpts, autorelay.WithStaticRelays(peers2AddrInfo(c.Connection.StaticRelays)))
 			relayOpts = append(relayOpts, autorelay.WithStaticRelays(peers2AddrInfo(c.Connection.StaticRelays)))
+		} else {
+			peerChan := make(chan peer.AddrInfo)
+			// Add AutoRelayFeederService (needs a DHT Service discovery)
+			opts = append(opts, func(cfg *node.Config) error {
+				for _, sd := range cfg.ServiceDiscovery {
+					switch d := sd.(type) {
+					case *discovery.DHT:
+						llger.Debugf("DHT automatic relay discovery configured every '%s'\n", c.Connection.AutoRelayDiscoveryInterval.String())
+						cfg.NetworkServices = append(cfg.NetworkServices, services.AutoRelayFeederService(llger, peerChan, d, c.Connection.AutoRelayDiscoveryInterval))
+					}
+				}
+				return nil
+			})
+			relayOpts = append(relayOpts, autorelay.WithPeerSource(peerChan))
 		}
 		}
 
 
 		libp2pOpts = append(libp2pOpts,
 		libp2pOpts = append(libp2pOpts,

+ 6 - 6
pkg/discovery/dht.go

@@ -41,8 +41,8 @@ type DHT struct {
 	BootstrapPeers       AddrList
 	BootstrapPeers       AddrList
 	latestRendezvous     string
 	latestRendezvous     string
 	RefreshDiscoveryTime time.Duration
 	RefreshDiscoveryTime time.Duration
-	dht                  *dht.IpfsDHT
-	dhtOptions           []dht.Option
+	*dht.IpfsDHT
+	dhtOptions []dht.Option
 }
 }
 
 
 func NewDHT(d ...dht.Option) *DHT {
 func NewDHT(d ...dht.Option) *DHT {
@@ -68,19 +68,19 @@ func (d *DHT) Rendezvous() string {
 }
 }
 
 
 func (d *DHT) startDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
 func (d *DHT) startDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
-	if d.dht == nil {
+	if d.IpfsDHT == nil {
 		// Start a DHT, for use in peer discovery. We can't just make a new DHT
 		// Start a DHT, for use in peer discovery. We can't just make a new DHT
 		// client because we want each peer to maintain its own local copy of the
 		// client because we want each peer to maintain its own local copy of the
 		// DHT, so that the bootstrapping node of the DHT can go down without
 		// DHT, so that the bootstrapping node of the DHT can go down without
 		// inhibiting future peer discovery.
 		// inhibiting future peer discovery.
 		kad, err := dht.New(ctx, h, d.dhtOptions...)
 		kad, err := dht.New(ctx, h, d.dhtOptions...)
 		if err != nil {
 		if err != nil {
-			return d.dht, err
+			return d.IpfsDHT, err
 		}
 		}
-		d.dht = kad
+		d.IpfsDHT = kad
 	}
 	}
 
 
-	return d.dht, nil
+	return d.IpfsDHT, nil
 }
 }
 
 
 func (d *DHT) Run(c log.StandardLogger, ctx context.Context, host host.Host) error {
 func (d *DHT) Run(c log.StandardLogger, ctx context.Context, host host.Host) error {

+ 68 - 0
pkg/services/dht_relay_discovery.go

@@ -0,0 +1,68 @@
+// Copyright © 2022 Ettore Di Giacinto <[email protected]>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, see <http://www.gnu.org/licenses/>.
+
+package services
+
+import (
+	"context"
+	"time"
+
+	"github.com/ipfs/go-log"
+	"github.com/libp2p/go-libp2p-core/peer"
+	"github.com/mudler/edgevpn/pkg/blockchain"
+	"github.com/mudler/edgevpn/pkg/discovery"
+	"github.com/mudler/edgevpn/pkg/node"
+)
+
+// AutoRelayFeederService is a service responsible to returning periodically peers to
+// scan for relays from a DHT discovery service
+func AutoRelayFeederService(ll log.StandardLogger, peerChan chan peer.AddrInfo, dht *discovery.DHT, duration time.Duration) node.NetworkService {
+	return func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
+		ll.Debug("[relay discovery] Service starts")
+		ctx, cancel := context.WithCancel(ctx)
+		go func() {
+			t := time.NewTicker(duration)
+			defer t.Stop()
+			defer cancel()
+			for {
+				select {
+				case <-t.C:
+				case <-ctx.Done():
+					ll.Debug("[relay discovery] stopped")
+					return
+				}
+				ll.Debug("[relay discovery] Finding relays from closest peer")
+				closestPeers, err := dht.GetClosestPeers(ctx, n.Host().ID().String())
+				if err != nil {
+					ll.Error(err)
+					continue
+				}
+				for _, p := range closestPeers {
+					addrs := n.Host().Peerstore().Addrs(p)
+					if len(addrs) == 0 {
+						continue
+					}
+					ll.Debugf("[relay discovery] Found close peer '%s'", p.Pretty())
+					select {
+					case peerChan <- peer.AddrInfo{ID: p, Addrs: addrs}:
+					case <-ctx.Done():
+						return
+					}
+				}
+			}
+		}()
+		return nil
+	}
+}