Browse Source

:gear: Add concurrency option

Also allow to set a channel buffer size
Ettore Di Giacinto 3 years ago
parent
commit
4af7f136bc
5 changed files with 78 additions and 12 deletions
  1. 3 1
      cmd/main.go
  2. 13 0
      cmd/util.go
  3. 3 1
      pkg/edgevpn/config.go
  4. 45 10
      pkg/edgevpn/edgevpn.go
  5. 14 0
      pkg/edgevpn/options.go

+ 3 - 1
cmd/main.go

@@ -16,6 +16,7 @@
 package cmd
 package cmd
 
 
 import (
 import (
+	"context"
 	"encoding/base64"
 	"encoding/base64"
 	"fmt"
 	"fmt"
 	"os"
 	"os"
@@ -34,6 +35,7 @@ under certain conditions.`
 
 
 func MainFlags() []cli.Flag {
 func MainFlags() []cli.Flag {
 	return append([]cli.Flag{
 	return append([]cli.Flag{
+
 		&cli.BoolFlag{
 		&cli.BoolFlag{
 			Name:  "g",
 			Name:  "g",
 			Usage: "Generates a new configuration and prints it on screen",
 			Usage: "Generates a new configuration and prints it on screen",
@@ -103,7 +105,7 @@ func Main() func(c *cli.Context) error {
 			go api.API(c.String("api-listen"), 5*time.Second, 20*time.Second, ledger)
 			go api.API(c.String("api-listen"), 5*time.Second, 20*time.Second, ledger)
 		}
 		}
 
 
-		if err := e.Start(); err != nil {
+		if err := e.Start(context.Background()); err != nil {
 			e.Logger().Fatal(err.Error())
 			e.Logger().Fatal(err.Error())
 		}
 		}
 
 

+ 13 - 0
cmd/util.go

@@ -16,6 +16,7 @@
 package cmd
 package cmd
 
 
 import (
 import (
+	"runtime"
 	"time"
 	"time"
 
 
 	"github.com/ipfs/go-log"
 	"github.com/ipfs/go-log"
@@ -48,6 +49,12 @@ var CommonFlags []cli.Flag = []cli.Flag{
 		EnvVar: "EDGEVPNMTU",
 		EnvVar: "EDGEVPNMTU",
 		Value:  1200,
 		Value:  1200,
 	},
 	},
+	&cli.IntFlag{
+		Name:   "channel-buffer-size",
+		Usage:  "Specify a channel buffer size",
+		EnvVar: "EDGEVPNCHANNELBUFFERSIZE",
+		Value:  0,
+	},
 	&cli.IntFlag{
 	&cli.IntFlag{
 		Name:   "discovery-interval",
 		Name:   "discovery-interval",
 		Usage:  "DHT discovery interval time",
 		Usage:  "DHT discovery interval time",
@@ -104,6 +111,11 @@ var CommonFlags []cli.Flag = []cli.Flag{
 		Usage:  "Automatically act as a relay if the node can accept inbound connections",
 		Usage:  "Automatically act as a relay if the node can accept inbound connections",
 		EnvVar: "EDGEVPNAUTORELAY",
 		EnvVar: "EDGEVPNAUTORELAY",
 	},
 	},
+	&cli.IntFlag{
+		Name:  "concurrency",
+		Usage: "Number of concurrent requests to serve",
+		Value: runtime.NumCPU(),
+	},
 	&cli.BoolTFlag{
 	&cli.BoolTFlag{
 		Name:   "holepunch",
 		Name:   "holepunch",
 		Usage:  "Automatically try holepunching when possible",
 		Usage:  "Automatically try holepunching when possible",
@@ -206,6 +218,7 @@ func cliToOpts(c *cli.Context) []edgevpn.Option {
 		edgevpn.WithTimeout(c.String("timeout")),
 		edgevpn.WithTimeout(c.String("timeout")),
 		edgevpn.WithInterfaceType(water.TUN),
 		edgevpn.WithInterfaceType(water.TUN),
 		edgevpn.NetLinkBootstrap(true),
 		edgevpn.NetLinkBootstrap(true),
+		edgevpn.WithChannelBufferSize(c.Int("channel-buffer-size")),
 		edgevpn.FromBase64(mDNS, dht, token),
 		edgevpn.FromBase64(mDNS, dht, token),
 		edgevpn.FromYaml(mDNS, dht, config),
 		edgevpn.FromYaml(mDNS, dht, config),
 	}
 	}

+ 3 - 1
pkg/edgevpn/config.go

@@ -73,7 +73,9 @@ type Config struct {
 	DiscoveryInterval, LedgerSyncronizationTime, LedgerAnnounceTime time.Duration
 	DiscoveryInterval, LedgerSyncronizationTime, LedgerAnnounceTime time.Duration
 	DiscoveryBootstrapPeers                                         discovery.AddrList
 	DiscoveryBootstrapPeers                                         discovery.AddrList
 
 
-	Timeout time.Duration
+	Timeout           time.Duration
+	Concurrency       int
+	ChannelBufferSize int
 }
 }
 
 
 type NetworkService func(context.Context, *EdgeVPN, *blockchain.Ledger) error
 type NetworkService func(context.Context, *EdgeVPN, *blockchain.Ledger) error

+ 45 - 10
pkg/edgevpn/edgevpn.go

@@ -22,6 +22,7 @@ import (
 	"net"
 	"net"
 	"os"
 	"os"
 	"runtime"
 	"runtime"
+	"sync"
 	"time"
 	"time"
 
 
 	"github.com/ipfs/go-log"
 	"github.com/ipfs/go-log"
@@ -62,12 +63,14 @@ var defaultLibp2pOptions = []libp2p.Option{
 
 
 func New(p ...Option) *EdgeVPN {
 func New(p ...Option) *EdgeVPN {
 	c := Config{
 	c := Config{
+		Concurrency:              1,
 		DiscoveryInterval:        120 * time.Second,
 		DiscoveryInterval:        120 * time.Second,
 		StreamHandlers:           make(map[protocol.ID]StreamHandler),
 		StreamHandlers:           make(map[protocol.ID]StreamHandler),
 		LedgerAnnounceTime:       5 * time.Second,
 		LedgerAnnounceTime:       5 * time.Second,
 		LedgerSyncronizationTime: 5 * time.Second,
 		LedgerSyncronizationTime: 5 * time.Second,
 		SealKeyLength:            12,
 		SealKeyLength:            12,
 		Options:                  defaultLibp2pOptions,
 		Options:                  defaultLibp2pOptions,
+		Timeout:                  15 * time.Second,
 	}
 	}
 	c.Apply(p...)
 	c.Apply(p...)
 
 
@@ -139,7 +142,7 @@ func newBlockChainData(e *EdgeVPN, address string) types.Machine {
 }
 }
 
 
 // Start the vpn. Returns an error in case of failure
 // Start the vpn. Returns an error in case of failure
-func (e *EdgeVPN) Start() error {
+func (e *EdgeVPN) Start(ctx context.Context) error {
 	ifce, err := e.createInterface()
 	ifce, err := e.createInterface()
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -192,7 +195,7 @@ func (e *EdgeVPN) Start() error {
 	}
 	}
 
 
 	// read packets from the interface
 	// read packets from the interface
-	return e.readPackets(ledger, ifce)
+	return e.readPackets(ctx, ledger, ifce)
 }
 }
 
 
 // end signals the event loop to exit gracefully
 // end signals the event loop to exit gracefully
@@ -281,21 +284,53 @@ func (e *EdgeVPN) handleFrame(frame ethernet.Frame, ip net.IP, ledger *blockchai
 	return nil
 	return nil
 }
 }
 
 
+func (e *EdgeVPN) connectionWorker(
+	p chan ethernet.Frame,
+	ip net.IP,
+	wg *sync.WaitGroup,
+	ledger *blockchain.Ledger,
+	ifce *water.Interface) {
+	defer wg.Done()
+	for f := range p {
+		if err := e.handleFrame(f, ip, ledger, ifce); err != nil {
+			e.config.Logger.Debugf("could not handle frame: %s", err.Error())
+		}
+	}
+}
+
 // redirects packets from the interface to the node using the routing table in the blockchain
 // redirects packets from the interface to the node using the routing table in the blockchain
-func (e *EdgeVPN) readPackets(ledger *blockchain.Ledger, ifce *water.Interface) error {
+func (e *EdgeVPN) readPackets(ctx context.Context, ledger *blockchain.Ledger, ifce *water.Interface) error {
 	ip, _, err := net.ParseCIDR(e.config.InterfaceAddress)
 	ip, _, err := net.ParseCIDR(e.config.InterfaceAddress)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
+
+	wg := new(sync.WaitGroup)
+
+	packets := make(chan ethernet.Frame, e.config.ChannelBufferSize)
+
+	defer func() {
+		close(packets)
+		go wg.Wait()
+	}()
+
+	for i := 0; i < e.config.Concurrency; i++ {
+		wg.Add(1)
+		go e.connectionWorker(packets, ip, wg, ledger, ifce)
+	}
+
 	for {
 	for {
-		frame, err := e.getFrame(ifce)
-		if err != nil {
-			e.config.Logger.Errorf("could not get frame '%s'", err.Error())
-			continue
-		}
+		select {
+		case <-ctx.Done():
+			return nil
+		default:
+			frame, err := e.getFrame(ifce)
+			if err != nil {
+				e.config.Logger.Errorf("could not get frame '%s'", err.Error())
+				continue
+			}
 
 
-		if err := e.handleFrame(frame, ip, ledger, ifce); err != nil {
-			e.config.Logger.Errorf("could not handle frame '%s'", err.Error())
+			packets <- frame
 		}
 		}
 	}
 	}
 }
 }

+ 14 - 0
pkg/edgevpn/options.go

@@ -55,6 +55,20 @@ func WithTimeout(s string) Option {
 	}
 	}
 }
 }
 
 
+func WithConcurrency(i int) Option {
+	return func(cfg *Config) error {
+		cfg.Concurrency = i
+		return nil
+	}
+}
+
+func WithChannelBufferSize(i int) Option {
+	return func(cfg *Config) error {
+		cfg.ChannelBufferSize = i
+		return nil
+	}
+}
+
 func WithNetworkService(ns ...NetworkService) func(cfg *Config) error {
 func WithNetworkService(ns ...NetworkService) func(cfg *Config) error {
 	return func(cfg *Config) error {
 	return func(cfg *Config) error {
 		cfg.NetworkServices = append(cfg.NetworkServices, ns...)
 		cfg.NetworkServices = append(cfg.NetworkServices, ns...)