Browse Source

:gear: Add default connection timeout

Ettore Di Giacinto 3 years ago
parent
commit
dfccc807ed
4 changed files with 75 additions and 40 deletions
  1. 7 0
      cmd/util.go
  2. 2 0
      pkg/edgevpn/config.go
  3. 58 40
      pkg/edgevpn/edgevpn.go
  4. 8 0
      pkg/edgevpn/options.go

+ 7 - 0
cmd/util.go

@@ -36,6 +36,12 @@ var CommonFlags []cli.Flag = []cli.Flag{
 		Usage:  "Specify a path to a edgevpn config file",
 		EnvVar: "EDGEVPNCONFIG",
 	},
+	&cli.StringFlag{
+		Name:   "timeout",
+		Usage:  "Specify a default timeout for connection stream",
+		EnvVar: "EDGEVPNTIMEOUT",
+		Value:  "15s",
+	},
 	&cli.IntFlag{
 		Name:   "mtu",
 		Usage:  "Specify a mtu",
@@ -197,6 +203,7 @@ func cliToOpts(c *cli.Context) []edgevpn.Option {
 		edgevpn.WithInterfaceAddress(address),
 		edgevpn.WithRouterAddress(router),
 		edgevpn.WithInterfaceName(iface),
+		edgevpn.WithTimeout(c.String("timeout")),
 		edgevpn.WithInterfaceType(water.TUN),
 		edgevpn.NetLinkBootstrap(true),
 		edgevpn.FromBase64(mDNS, dht, token),

+ 2 - 0
pkg/edgevpn/config.go

@@ -72,6 +72,8 @@ type Config struct {
 
 	DiscoveryInterval, LedgerSyncronizationTime, LedgerAnnounceTime time.Duration
 	DiscoveryBootstrapPeers                                         discovery.AddrList
+
+	Timeout time.Duration
 }
 
 type NetworkService func(context.Context, *EdgeVPN, *blockchain.Ledger) error

+ 58 - 40
pkg/edgevpn/edgevpn.go

@@ -17,6 +17,7 @@ package edgevpn
 
 import (
 	"context"
+	"fmt"
 	"io"
 	"net"
 	"os"
@@ -34,6 +35,7 @@ import (
 	"github.com/mudler/edgevpn/pkg/blockchain"
 	"github.com/mudler/edgevpn/pkg/edgevpn/types"
 	hub "github.com/mudler/edgevpn/pkg/hub"
+	"github.com/pkg/errors"
 	"github.com/songgao/packets/ethernet"
 	"github.com/songgao/water"
 	"golang.org/x/net/ipv4"
@@ -227,58 +229,74 @@ func streamHandler(ledger *blockchain.Ledger, ifce *water.Interface) func(stream
 	}
 }
 
+func (e *EdgeVPN) getFrame(ifce *water.Interface) (ethernet.Frame, error) {
+	var frame ethernet.Frame
+	frame.Resize(e.config.MTU)
+
+	n, err := ifce.Read([]byte(frame))
+	if err != nil {
+		return frame, errors.Wrap(err, "could not read from interface")
+	}
+
+	frame = frame[:n]
+	return frame, nil
+}
+
+func (e *EdgeVPN) handleFrame(frame ethernet.Frame, ip net.IP, ledger *blockchain.Ledger, ifce *water.Interface) error {
+	ctx, cancel := context.WithTimeout(context.Background(), e.config.Timeout)
+	defer cancel()
+
+	header, err := ipv4.ParseHeader(frame)
+	if err != nil {
+		return errors.Wrap(err, "could not parse ipv4 header from frame")
+	}
+
+	dst := header.Dst.String()
+	if e.config.RouterAddress != "" && header.Src.Equal(ip) {
+		dst = e.config.RouterAddress
+	}
+
+	// Query the routing table
+	value, found := ledger.GetKey(MachinesLedgerKey, dst)
+	if !found {
+		return fmt.Errorf("'%s' not found in the routing table", dst)
+	}
+	machine := &types.Machine{}
+	value.Unmarshal(machine)
+
+	// Decode the Peer
+	d, err := peer.Decode(machine.PeerID)
+	if err != nil {
+		return errors.Wrap(err, "could not decode peer")
+	}
+
+	// Open a stream
+	stream, err := e.host.NewStream(ctx, d, Protocol)
+	if err != nil {
+		return errors.Wrap(err, "could not open stream")
+	}
+
+	stream.Write(frame)
+	stream.Close()
+	return nil
+}
+
 // redirects packets from the interface to the node using the routing table in the blockchain
 func (e *EdgeVPN) readPackets(ledger *blockchain.Ledger, ifce *water.Interface) error {
-	ctx := context.Background()
 	ip, _, err := net.ParseCIDR(e.config.InterfaceAddress)
 	if err != nil {
 		return err
 	}
 	for {
-		var frame ethernet.Frame
-		frame.Resize(e.config.MTU)
-		n, err := ifce.Read([]byte(frame))
-		if err != nil {
-			e.config.Logger.Debug("could not read from interface")
-			return err
-		}
-		frame = frame[:n]
-
-		header, err := ipv4.ParseHeader(frame)
-		if err != nil {
-			e.config.Logger.Debugf("could not parase ipv4 header from frame")
-			continue
-		}
-
-		dst := header.Dst.String()
-		if e.config.RouterAddress != "" && header.Src.Equal(ip) {
-			dst = e.config.RouterAddress
-		}
-
-		// Query the routing table
-		value, found := ledger.GetKey(MachinesLedgerKey, dst)
-		if !found {
-			e.config.Logger.Debugf("'%s' not found in the routing table", dst)
-			continue
-		}
-		machine := &types.Machine{}
-		value.Unmarshal(machine)
-
-		// Decode the Peer
-		d, err := peer.Decode(machine.PeerID)
+		frame, err := e.getFrame(ifce)
 		if err != nil {
-			e.config.Logger.Debugf("could not decode peer '%s'", value)
+			e.config.Logger.Errorf("could not get frame '%s'", err.Error())
 			continue
 		}
 
-		// Open a stream
-		stream, err := e.host.NewStream(ctx, d, Protocol)
-		if err != nil {
-			e.config.Logger.Debugf("could not open stream '%s'", err.Error())
-			continue
+		if err := e.handleFrame(frame, ip, ledger, ifce); err != nil {
+			e.config.Logger.Errorf("could not handle frame '%s'", err.Error())
 		}
-		stream.Write(frame)
-		stream.Close()
 	}
 }
 

+ 8 - 0
pkg/edgevpn/options.go

@@ -47,6 +47,14 @@ func WithLibp2pAdditionalOptions(i ...libp2p.Option) func(cfg *Config) error {
 	}
 }
 
+func WithTimeout(s string) Option {
+	return func(cfg *Config) error {
+		d, err := time.ParseDuration(s)
+		cfg.Timeout = d
+		return err
+	}
+}
+
 func WithNetworkService(ns ...NetworkService) func(cfg *Config) error {
 	return func(cfg *Config) error {
 		cfg.NetworkServices = append(cfg.NetworkServices, ns...)