Browse Source

:art: Split edgevpn package

Ettore Di Giacinto 3 years ago
parent
commit
f389d823de

+ 10 - 10
api/api.go

@@ -24,8 +24,8 @@ import (
 
 
 	"github.com/labstack/echo/v4"
 	"github.com/labstack/echo/v4"
 	"github.com/mudler/edgevpn/pkg/blockchain"
 	"github.com/mudler/edgevpn/pkg/blockchain"
-	"github.com/mudler/edgevpn/pkg/edgevpn"
-	"github.com/mudler/edgevpn/pkg/edgevpn/types"
+	"github.com/mudler/edgevpn/pkg/protocol"
+	"github.com/mudler/edgevpn/pkg/types"
 )
 )
 
 
 //go:embed public
 //go:embed public
@@ -47,7 +47,7 @@ func API(l string, defaultInterval, timeout time.Duration, ledger *blockchain.Le
 	// Get data from ledger
 	// Get data from ledger
 	ec.GET("/api/files", func(c echo.Context) error {
 	ec.GET("/api/files", func(c echo.Context) error {
 		list := []*types.File{}
 		list := []*types.File{}
-		for _, v := range ledger.CurrentData()[edgevpn.FilesLedgerKey] {
+		for _, v := range ledger.CurrentData()[protocol.FilesLedgerKey] {
 			machine := &types.File{}
 			machine := &types.File{}
 			v.Unmarshal(machine)
 			v.Unmarshal(machine)
 			list = append(list, machine)
 			list = append(list, machine)
@@ -56,10 +56,10 @@ func API(l string, defaultInterval, timeout time.Duration, ledger *blockchain.Le
 	})
 	})
 
 
 	ec.GET("/api/summary", func(c echo.Context) error {
 	ec.GET("/api/summary", func(c echo.Context) error {
-		files := len(ledger.CurrentData()[edgevpn.FilesLedgerKey])
-		machines := len(ledger.CurrentData()[edgevpn.MachinesLedgerKey])
-		users := len(ledger.CurrentData()[edgevpn.UsersLedgerKey])
-		services := len(ledger.CurrentData()[edgevpn.ServicesLedgerKey])
+		files := len(ledger.CurrentData()[protocol.FilesLedgerKey])
+		machines := len(ledger.CurrentData()[protocol.MachinesLedgerKey])
+		users := len(ledger.CurrentData()[protocol.UsersLedgerKey])
+		services := len(ledger.CurrentData()[protocol.ServicesLedgerKey])
 		blockchain := ledger.Index()
 		blockchain := ledger.Index()
 
 
 		return c.JSON(http.StatusOK, struct {
 		return c.JSON(http.StatusOK, struct {
@@ -69,7 +69,7 @@ func API(l string, defaultInterval, timeout time.Duration, ledger *blockchain.Le
 
 
 	ec.GET("/api/machines", func(c echo.Context) error {
 	ec.GET("/api/machines", func(c echo.Context) error {
 		list := []*types.Machine{}
 		list := []*types.Machine{}
-		for _, v := range ledger.CurrentData()[edgevpn.MachinesLedgerKey] {
+		for _, v := range ledger.CurrentData()[protocol.MachinesLedgerKey] {
 			machine := &types.Machine{}
 			machine := &types.Machine{}
 			v.Unmarshal(machine)
 			v.Unmarshal(machine)
 			list = append(list, machine)
 			list = append(list, machine)
@@ -79,7 +79,7 @@ func API(l string, defaultInterval, timeout time.Duration, ledger *blockchain.Le
 
 
 	ec.GET("/api/users", func(c echo.Context) error {
 	ec.GET("/api/users", func(c echo.Context) error {
 		user := []*types.User{}
 		user := []*types.User{}
-		for _, v := range ledger.CurrentData()[edgevpn.UsersLedgerKey] {
+		for _, v := range ledger.CurrentData()[protocol.UsersLedgerKey] {
 			u := &types.User{}
 			u := &types.User{}
 			v.Unmarshal(u)
 			v.Unmarshal(u)
 			user = append(user, u)
 			user = append(user, u)
@@ -89,7 +89,7 @@ func API(l string, defaultInterval, timeout time.Duration, ledger *blockchain.Le
 
 
 	ec.GET("/api/services", func(c echo.Context) error {
 	ec.GET("/api/services", func(c echo.Context) error {
 		list := []*types.Service{}
 		list := []*types.Service{}
-		for _, v := range ledger.CurrentData()[edgevpn.ServicesLedgerKey] {
+		for _, v := range ledger.CurrentData()[protocol.ServicesLedgerKey] {
 			srvc := &types.Service{}
 			srvc := &types.Service{}
 			v.Unmarshal(srvc)
 			v.Unmarshal(srvc)
 			list = append(list, srvc)
 			list = append(list, srvc)

+ 1 - 1
api/client/client.go

@@ -24,7 +24,7 @@ import (
 	"time"
 	"time"
 
 
 	"github.com/mudler/edgevpn/pkg/blockchain"
 	"github.com/mudler/edgevpn/pkg/blockchain"
-	"github.com/mudler/edgevpn/pkg/edgevpn/types"
+	"github.com/mudler/edgevpn/pkg/types"
 )
 )
 
 
 type (
 type (

+ 1 - 1
api/client/service/node.go

@@ -32,7 +32,7 @@ import (
 	"gopkg.in/yaml.v2"
 	"gopkg.in/yaml.v2"
 
 
 	edgeVPNClient "github.com/mudler/edgevpn/api/client"
 	edgeVPNClient "github.com/mudler/edgevpn/api/client"
-	"github.com/mudler/edgevpn/pkg/edgevpn"
+	edgevpn "github.com/mudler/edgevpn/pkg/node"
 )
 )
 
 
 // Node is the service Node.
 // Node is the service Node.

+ 6 - 4
cmd/api.go

@@ -20,7 +20,7 @@ import (
 	"time"
 	"time"
 
 
 	"github.com/mudler/edgevpn/api"
 	"github.com/mudler/edgevpn/api"
-	"github.com/mudler/edgevpn/pkg/edgevpn"
+	"github.com/mudler/edgevpn/pkg/node"
 	"github.com/urfave/cli"
 	"github.com/urfave/cli"
 )
 )
 
 
@@ -39,14 +39,16 @@ A simple UI interface is available to display network data.`,
 			},
 			},
 		),
 		),
 		Action: func(c *cli.Context) error {
 		Action: func(c *cli.Context) error {
-			e := edgevpn.New(cliToOpts(c)...)
+			o, _ := cliToOpts(c)
+			e := node.New(o...)
 
 
 			displayStart(e)
 			displayStart(e)
 
 
-			// Join the node to the network, using our ledger
-			if err := e.Join(context.Background()); err != nil {
+			// Start the node to the network, using our ledger
+			if err := e.Start(context.Background()); err != nil {
 				return err
 				return err
 			}
 			}
+
 			ledger, _ := e.Ledger()
 			ledger, _ := e.Ledger()
 			return api.API(c.String("listen"), 5*time.Second, 20*time.Second, ledger)
 			return api.API(c.String("listen"), 5*time.Second, 20*time.Second, ledger)
 		},
 		},

+ 15 - 11
cmd/file.go

@@ -18,8 +18,10 @@ package cmd
 import (
 import (
 	"context"
 	"context"
 	"errors"
 	"errors"
+	"time"
 
 
-	"github.com/mudler/edgevpn/pkg/edgevpn"
+	"github.com/mudler/edgevpn/pkg/node"
+	"github.com/mudler/edgevpn/pkg/services"
 	"github.com/urfave/cli"
 	"github.com/urfave/cli"
 )
 )
 
 
@@ -68,7 +70,8 @@ This is also the ID used to refer when receiving it.`,
 			if err != nil {
 			if err != nil {
 				return err
 				return err
 			}
 			}
-			e := edgevpn.New(cliToOpts(c)...)
+			o, _ := cliToOpts(c)
+			e := node.New(o...)
 
 
 			displayStart(e)
 			displayStart(e)
 
 
@@ -76,10 +79,11 @@ This is also the ID used to refer when receiving it.`,
 			if err != nil {
 			if err != nil {
 				return err
 				return err
 			}
 			}
-			// Join the node to the network, using our ledger
-			e.SendFile(ledger, name, path)
-			// Join the node to the network, using our ledger
-			if err := e.Join(context.Background()); err != nil {
+
+			services.SendFile(ledger, e, e.Logger(), time.Duration(c.Int("ledger-announce-interval"))*time.Second, name, path)
+
+			// Start the node to the network, using our ledger
+			if err := e.Start(context.Background()); err != nil {
 				return err
 				return err
 			}
 			}
 
 
@@ -111,19 +115,19 @@ func FileReceive() cli.Command {
 			if err != nil {
 			if err != nil {
 				return err
 				return err
 			}
 			}
-
-			e := edgevpn.New(cliToOpts(c)...)
+			o, _ := cliToOpts(c)
+			e := node.New(o...)
 
 
 			displayStart(e)
 			displayStart(e)
 
 
-			// Join the node to the network, using our ledger
-			if err := e.Join(context.Background()); err != nil {
+			// Start the node to the network, using our ledger
+			if err := e.Start(context.Background()); err != nil {
 				return err
 				return err
 			}
 			}
 
 
 			ledger, _ := e.Ledger()
 			ledger, _ := e.Ledger()
 
 
-			return e.ReceiveFile(ledger, name, path)
+			return services.ReceiveFile(ledger, e, e.Logger(), time.Duration(c.Int("ledger-announce-interval"))*time.Second, name, path)
 		},
 		},
 	}
 	}
 }
 }

+ 9 - 8
cmd/join.go

@@ -18,25 +18,26 @@ package cmd
 import (
 import (
 	"context"
 	"context"
 
 
-	"github.com/mudler/edgevpn/pkg/edgevpn"
+	"github.com/mudler/edgevpn/pkg/node"
 	"github.com/urfave/cli"
 	"github.com/urfave/cli"
 )
 )
 
 
-func Join() cli.Command {
+func Start() cli.Command {
 	return cli.Command{
 	return cli.Command{
-		Name:  "join",
-		Usage: "Join the network without activating any interface",
+		Name:  "Start",
+		Usage: "Start the network without activating any interface",
 		Description: `Connect over the p2p network without establishing a VPN.
 		Description: `Connect over the p2p network without establishing a VPN.
 Useful for setting up relays or hop nodes to improve the network connectivity.`,
 Useful for setting up relays or hop nodes to improve the network connectivity.`,
-		UsageText: "edgevpn join",
+		UsageText: "edgevpn Start",
 		Flags:     CommonFlags,
 		Flags:     CommonFlags,
 		Action: func(c *cli.Context) error {
 		Action: func(c *cli.Context) error {
-			e := edgevpn.New(cliToOpts(c)...)
+			o, _ := cliToOpts(c)
+			e := node.New(o...)
 
 
 			displayStart(e)
 			displayStart(e)
 
 
-			// Join the node to the network, using our ledger
-			if err := e.Join(context.Background()); err != nil {
+			// Start the node to the network, using our ledger
+			if err := e.Start(context.Background()); err != nil {
 				return err
 				return err
 			}
 			}
 
 

+ 6 - 4
cmd/main.go

@@ -22,11 +22,12 @@ import (
 	"time"
 	"time"
 
 
 	"github.com/mudler/edgevpn/api"
 	"github.com/mudler/edgevpn/api"
-	"github.com/mudler/edgevpn/pkg/edgevpn"
+	edgevpn "github.com/mudler/edgevpn/pkg/node"
+	"github.com/mudler/edgevpn/pkg/vpn"
 	"github.com/urfave/cli"
 	"github.com/urfave/cli"
 )
 )
 
 
-const Copyright string = `	edgevpn  Copyright (C) 2021 Ettore Di Giacinto
+const Copyright string = `	edgevpn  Copyright (C) 2021-2022 Ettore Di Giacinto
 This program comes with ABSOLUTELY NO WARRANTY.
 This program comes with ABSOLUTELY NO WARRANTY.
 This is free software, and you are welcome to redistribute it
 This is free software, and you are welcome to redistribute it
 under certain conditions.`
 under certain conditions.`
@@ -83,8 +84,9 @@ func Main() func(c *cli.Context) error {
 
 
 			os.Exit(0)
 			os.Exit(0)
 		}
 		}
+		o, vpnOpts := cliToOpts(c)
 
 
-		e := edgevpn.New(cliToOpts(c)...)
+		e := edgevpn.New(o...)
 
 
 		displayStart(e)
 		displayStart(e)
 
 
@@ -97,7 +99,7 @@ func Main() func(c *cli.Context) error {
 			go api.API(c.String("api-listen"), 5*time.Second, 20*time.Second, ledger)
 			go api.API(c.String("api-listen"), 5*time.Second, 20*time.Second, ledger)
 		}
 		}
 
 
-		if err := e.Start(context.Background()); err != nil {
+		if err := vpn.Start(context.Background(), ledger, e, vpnOpts...); err != nil {
 			e.Logger().Fatal(err.Error())
 			e.Logger().Fatal(err.Error())
 		}
 		}
 
 

+ 12 - 8
cmd/service.go

@@ -18,8 +18,10 @@ package cmd
 import (
 import (
 	"context"
 	"context"
 	"errors"
 	"errors"
+	"time"
 
 
-	"github.com/mudler/edgevpn/pkg/edgevpn"
+	"github.com/mudler/edgevpn/pkg/node"
+	"github.com/mudler/edgevpn/pkg/services"
 	"github.com/urfave/cli"
 	"github.com/urfave/cli"
 )
 )
 
 
@@ -67,7 +69,8 @@ For example, '192.168.1.1:80', or '127.0.0.1:22'.`,
 			if err != nil {
 			if err != nil {
 				return err
 				return err
 			}
 			}
-			e := edgevpn.New(cliToOpts(c)...)
+			o, _ := cliToOpts(c)
+			e := node.New(o...)
 
 
 			displayStart(e)
 			displayStart(e)
 
 
@@ -76,10 +79,10 @@ For example, '192.168.1.1:80', or '127.0.0.1:22'.`,
 				return err
 				return err
 			}
 			}
 
 
+			services.ExposeService(ledger, e, e.Logger(), time.Duration(c.Int("ledger-announce-interval"))*time.Second, name, address)
+
 			// Join the node to the network, using our ledger
 			// Join the node to the network, using our ledger
-			e.ExposeService(ledger, name, address)
-			// Join the node to the network, using our ledger
-			if err := e.Join(context.Background()); err != nil {
+			if err := e.Start(context.Background()); err != nil {
 				return err
 				return err
 			}
 			}
 
 
@@ -114,17 +117,18 @@ to the service over the network`,
 			if err != nil {
 			if err != nil {
 				return err
 				return err
 			}
 			}
-			e := edgevpn.New(cliToOpts(c)...)
+			o, _ := cliToOpts(c)
+			e := node.New(o...)
 
 
 			displayStart(e)
 			displayStart(e)
 
 
 			// Join the node to the network, using our ledger
 			// Join the node to the network, using our ledger
-			if err := e.Join(context.Background()); err != nil {
+			if err := e.Start(context.Background()); err != nil {
 				return err
 				return err
 			}
 			}
 
 
 			ledger, _ := e.Ledger()
 			ledger, _ := e.Ledger()
-			return e.ConnectToService(ledger, name, address)
+			return services.ConnectToService(ledger, e, e.Logger(), time.Duration(c.Int("ledger-announce-interval"))*time.Second, name, address)
 		},
 		},
 	}
 	}
 }
 }

+ 33 - 25
cmd/util.go

@@ -24,8 +24,9 @@ import (
 	"github.com/mudler/edgevpn/internal"
 	"github.com/mudler/edgevpn/internal"
 	"github.com/mudler/edgevpn/pkg/blockchain"
 	"github.com/mudler/edgevpn/pkg/blockchain"
 	"github.com/mudler/edgevpn/pkg/discovery"
 	"github.com/mudler/edgevpn/pkg/discovery"
-	"github.com/mudler/edgevpn/pkg/edgevpn"
 	"github.com/mudler/edgevpn/pkg/logger"
 	"github.com/mudler/edgevpn/pkg/logger"
+	node "github.com/mudler/edgevpn/pkg/node"
+	"github.com/mudler/edgevpn/pkg/vpn"
 	"github.com/peterbourgon/diskv"
 	"github.com/peterbourgon/diskv"
 	"github.com/songgao/water"
 	"github.com/songgao/water"
 	"github.com/urfave/cli"
 	"github.com/urfave/cli"
@@ -159,13 +160,13 @@ var CommonFlags []cli.Flag = []cli.Flag{
 		EnvVar: "EDGEVPNTOKEN",
 		EnvVar: "EDGEVPNTOKEN",
 	}}
 	}}
 
 
-func displayStart(e *edgevpn.EdgeVPN) {
+func displayStart(e *node.Node) {
 	e.Logger().Info(Copyright)
 	e.Logger().Info(Copyright)
 
 
 	e.Logger().Infof("Version: %s commit: %s", internal.Version, internal.Commit)
 	e.Logger().Infof("Version: %s commit: %s", internal.Version, internal.Commit)
 }
 }
 
 
-func cliToOpts(c *cli.Context) []edgevpn.Option {
+func cliToOpts(c *cli.Context) ([]node.Option, []vpn.Option) {
 	config := c.String("config")
 	config := c.String("config")
 	address := c.String("address")
 	address := c.String("address")
 	router := c.String("router")
 	router := c.String("router")
@@ -203,24 +204,31 @@ func cliToOpts(c *cli.Context) []edgevpn.Option {
 		}
 		}
 	}
 	}
 
 
-	opts := []edgevpn.Option{
-		edgevpn.WithDiscoveryInterval(time.Duration(c.Int("discovery-interval")) * time.Second),
-		edgevpn.WithLedgerAnnounceTime(time.Duration(c.Int("ledger-announce-interval")) * time.Second),
-		edgevpn.WithLedgerInterval(time.Duration(c.Int("ledger-syncronization-interval")) * time.Second),
-		edgevpn.Logger(llger),
-		edgevpn.WithDiscoveryBootstrapPeers(addrsList),
-		edgevpn.LibP2PLogLevel(libp2plvl),
-		edgevpn.WithInterfaceMTU(c.Int("mtu")),
-		edgevpn.WithPacketMTU(1420),
-		edgevpn.WithInterfaceAddress(address),
-		edgevpn.WithRouterAddress(router),
-		edgevpn.WithInterfaceName(iface),
-		edgevpn.WithTimeout(c.String("timeout")),
-		edgevpn.WithInterfaceType(water.TUN),
-		edgevpn.NetLinkBootstrap(true),
-		edgevpn.WithChannelBufferSize(c.Int("channel-buffer-size")),
-		edgevpn.FromBase64(mDNS, dht, token),
-		edgevpn.FromYaml(mDNS, dht, config),
+	opts := []node.Option{
+		node.WithDiscoveryInterval(time.Duration(c.Int("discovery-interval")) * time.Second),
+		node.WithLedgerAnnounceTime(time.Duration(c.Int("ledger-announce-interval")) * time.Second),
+		node.WithLedgerInterval(time.Duration(c.Int("ledger-syncronization-interval")) * time.Second),
+		node.Logger(llger),
+		node.WithDiscoveryBootstrapPeers(addrsList),
+		node.LibP2PLogLevel(libp2plvl),
+		node.WithInterfaceAddress(address),
+		node.FromBase64(mDNS, dht, token),
+		node.FromYaml(mDNS, dht, config),
+	}
+
+	vpnOpts := []vpn.Option{
+		vpn.WithConcurrency(c.Int("concurrency")),
+		vpn.WithInterfaceAddress(address),
+		vpn.WithLedgerAnnounceTime(time.Duration(c.Int("ledger-announce-interval")) * time.Second),
+		vpn.Logger(llger),
+		vpn.WithTimeout(c.String("timeout")),
+		vpn.WithInterfaceType(water.TUN),
+		vpn.NetLinkBootstrap(true),
+		vpn.WithChannelBufferSize(c.Int("channel-buffer-size")),
+		vpn.WithInterfaceMTU(c.Int("mtu")),
+		vpn.WithPacketMTU(1420),
+		vpn.WithRouterAddress(router),
+		vpn.WithInterfaceName(iface),
 	}
 	}
 
 
 	libp2pOpts := []libp2p.Option{libp2p.UserAgent("edgevpn")}
 	libp2pOpts := []libp2p.Option{libp2p.UserAgent("edgevpn")}
@@ -249,17 +257,17 @@ func cliToOpts(c *cli.Context) []edgevpn.Option {
 		libp2pOpts = append(libp2pOpts, libp2p.NATPortMap())
 		libp2pOpts = append(libp2pOpts, libp2p.NATPortMap())
 	}
 	}
 
 
-	opts = append(opts, edgevpn.WithLibp2pOptions(libp2pOpts...))
+	opts = append(opts, node.WithLibp2pOptions(libp2pOpts...))
 
 
 	if ledgerState != "" {
 	if ledgerState != "" {
-		opts = append(opts, edgevpn.WithStore(blockchain.NewDiskStore(diskv.New(diskv.Options{
+		opts = append(opts, node.WithStore(blockchain.NewDiskStore(diskv.New(diskv.Options{
 			BasePath:     ledgerState,
 			BasePath:     ledgerState,
 			CacheSizeMax: uint64(50), // 50MB
 			CacheSizeMax: uint64(50), // 50MB
 		}))))
 		}))))
 	} else {
 	} else {
-		opts = append(opts, edgevpn.WithStore(&blockchain.MemoryStore{}))
+		opts = append(opts, node.WithStore(&blockchain.MemoryStore{}))
 
 
 	}
 	}
 
 
-	return opts
+	return opts, vpnOpts
 }
 }

+ 1 - 1
main.go

@@ -36,7 +36,7 @@ func main() {
 		Copyright:   cmd.Copyright,
 		Copyright:   cmd.Copyright,
 		Flags:       cmd.MainFlags(),
 		Flags:       cmd.MainFlags(),
 		Commands: []cli.Command{
 		Commands: []cli.Command{
-			cmd.Join(),
+			cmd.Start(),
 			cmd.API(),
 			cmd.API(),
 			cmd.ServiceAdd(),
 			cmd.ServiceAdd(),
 			cmd.ServiceConnect(),
 			cmd.ServiceConnect(),

+ 0 - 386
pkg/edgevpn/edgevpn.go

@@ -1,386 +0,0 @@
-// Copyright © 2021 Ettore Di Giacinto <[email protected]>
-//
-// This program is free software; you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation; either version 2 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License along
-// with this program; if not, see <http://www.gnu.org/licenses/>.
-
-package edgevpn
-
-import (
-	"context"
-	"fmt"
-	"io"
-	"net"
-	"os"
-	"runtime"
-	"sync"
-	"time"
-
-	"github.com/ipfs/go-log"
-	"github.com/libp2p/go-libp2p"
-	"github.com/libp2p/go-libp2p-core/host"
-	"github.com/libp2p/go-libp2p-core/network"
-	"github.com/libp2p/go-libp2p-core/peer"
-	"github.com/libp2p/go-libp2p-core/protocol"
-	pubsub "github.com/libp2p/go-libp2p-pubsub"
-	"github.com/mudler/edgevpn/internal"
-	"github.com/mudler/edgevpn/pkg/blockchain"
-	"github.com/mudler/edgevpn/pkg/edgevpn/types"
-	hub "github.com/mudler/edgevpn/pkg/hub"
-	"github.com/mudler/edgevpn/pkg/logger"
-	"github.com/pkg/errors"
-	"github.com/songgao/packets/ethernet"
-	"github.com/songgao/water"
-	"golang.org/x/net/ipv4"
-)
-
-const MachinesLedgerKey = "machines"
-
-type EdgeVPN struct {
-	config  Config
-	HubRoom *hub.Room
-	doneCh  chan struct{}
-	inputCh chan *hub.Message
-	seed    int64
-	host    host.Host
-
-	ledger *blockchain.Ledger
-}
-
-var defaultLibp2pOptions = []libp2p.Option{
-	libp2p.EnableNATService(),
-	libp2p.NATPortMap(),
-	libp2p.EnableAutoRelay(),
-}
-
-func New(p ...Option) *EdgeVPN {
-	c := Config{
-		Concurrency:              1,
-		DiscoveryInterval:        120 * time.Second,
-		StreamHandlers:           make(map[protocol.ID]StreamHandler),
-		LedgerAnnounceTime:       5 * time.Second,
-		LedgerSyncronizationTime: 5 * time.Second,
-		SealKeyLength:            12,
-		Options:                  defaultLibp2pOptions,
-		Timeout:                  15 * time.Second,
-		Logger:                   logger.New(log.LevelDebug),
-	}
-	c.Apply(p...)
-
-	return &EdgeVPN{
-		config:  c,
-		doneCh:  make(chan struct{}, 1),
-		inputCh: make(chan *hub.Message, 3000),
-		seed:    0,
-	}
-}
-
-func (e *EdgeVPN) Ledger() (*blockchain.Ledger, error) {
-	if e.ledger != nil {
-		return e.ledger, nil
-	}
-	mw, err := e.MessageWriter()
-	if err != nil {
-		return nil, err
-	}
-
-	e.ledger = blockchain.New(mw, e.config.Store)
-	return e.ledger, nil
-}
-
-// Join the network with the ledger.
-// It does the minimal action required to be connected
-// without any active packet routing
-func (e *EdgeVPN) Join(ctx context.Context) error {
-
-	ledger, err := e.Ledger()
-	if err != nil {
-		return err
-	}
-
-	// Set the handler when we receive messages
-	// The ledger needs to read them and update the internal blockchain
-	e.config.Handlers = append(e.config.Handlers, ledger.Update)
-
-	e.config.Logger.Info("Starting EdgeVPN network")
-
-	// Startup libp2p network
-	err = e.startNetwork()
-	if err != nil {
-		return err
-	}
-
-	// Send periodically messages to the channel with our blockchain content
-	ledger.Syncronizer(ctx, e.config.LedgerSyncronizationTime)
-
-	// Start eventual declared NetworkServices
-	for _, s := range e.config.NetworkServices {
-		go s(ctx, e, ledger)
-	}
-
-	return nil
-}
-
-func newBlockChainData(e *EdgeVPN, address string) types.Machine {
-	hostname, _ := os.Hostname()
-
-	return types.Machine{
-		PeerID:   e.host.ID().String(),
-		Hostname: hostname,
-		OS:       runtime.GOOS,
-		Arch:     runtime.GOARCH,
-		Version:  internal.Version,
-		Address:  address,
-	}
-}
-
-// Start the vpn. Returns an error in case of failure
-func (e *EdgeVPN) Start(ctx context.Context) error {
-	ifce, err := e.createInterface()
-	if err != nil {
-		return err
-	}
-	defer ifce.Close()
-
-	ledger, err := e.Ledger()
-	if err != nil {
-		return err
-	}
-
-	// Set the stream handler to get back the packets from the stream to the interface
-	e.config.StreamHandlers[protocol.ID(Protocol)] = streamHandler(ledger, ifce)
-
-	// Join the node to the network, using our ledger
-	// it also starts up a goroutine that periodically sends
-	// messages to the network with our blockchain content
-	if err := e.Join(ctx); err != nil {
-		return err
-	}
-
-	// Announce our IP
-	ip, _, err := net.ParseCIDR(e.config.InterfaceAddress)
-	if err != nil {
-		return err
-	}
-
-	ledger.Announce(
-		context.Background(),
-		e.config.LedgerAnnounceTime,
-		func() {
-			machine := &types.Machine{}
-			// Retrieve current ID for ip in the blockchain
-			existingValue, found := ledger.GetKey(MachinesLedgerKey, ip.String())
-			existingValue.Unmarshal(machine)
-
-			// If mismatch, update the blockchain
-			if !found || machine.PeerID != e.host.ID().String() {
-				updatedMap := map[string]interface{}{}
-				updatedMap[ip.String()] = newBlockChainData(e, ip.String())
-				ledger.Add(MachinesLedgerKey, updatedMap)
-			}
-		},
-	)
-
-	if e.config.NetLinkBootstrap {
-		if err := e.prepareInterface(); err != nil {
-			return err
-		}
-	}
-
-	// read packets from the interface
-	return e.readPackets(ctx, ledger, ifce)
-}
-
-// end signals the event loop to exit gracefully
-func (e *EdgeVPN) Stop() {
-	close(e.doneCh)
-}
-
-// MessageWriter returns a new MessageWriter bound to the edgevpn instance
-// with the given options
-func (e *EdgeVPN) MessageWriter(opts ...hub.MessageOption) (*MessageWriter, error) {
-	mess := &hub.Message{}
-	mess.Apply(opts...)
-
-	return &MessageWriter{
-		c:     e.config,
-		input: e.inputCh,
-		mess:  mess,
-	}, nil
-}
-
-func streamHandler(ledger *blockchain.Ledger, ifce *water.Interface) func(stream network.Stream) {
-	return func(stream network.Stream) {
-		if !ledger.Exists(MachinesLedgerKey,
-			func(d blockchain.Data) bool {
-				machine := &types.Machine{}
-				d.Unmarshal(machine)
-				return machine.PeerID == stream.Conn().RemotePeer().String()
-			}) {
-			stream.Reset()
-			return
-		}
-		io.Copy(ifce.ReadWriteCloser, stream)
-		stream.Close()
-	}
-}
-
-func (e *EdgeVPN) getFrame(ifce *water.Interface) (ethernet.Frame, error) {
-	var frame ethernet.Frame
-	frame.Resize(e.config.MTU)
-
-	n, err := ifce.Read([]byte(frame))
-	if err != nil {
-		return frame, errors.Wrap(err, "could not read from interface")
-	}
-
-	frame = frame[:n]
-	return frame, nil
-}
-
-func (e *EdgeVPN) handleFrame(frame ethernet.Frame, ip net.IP, ledger *blockchain.Ledger, ifce *water.Interface) error {
-	ctx, cancel := context.WithTimeout(context.Background(), e.config.Timeout)
-	defer cancel()
-
-	header, err := ipv4.ParseHeader(frame)
-	if err != nil {
-		return errors.Wrap(err, "could not parse ipv4 header from frame")
-	}
-
-	dst := header.Dst.String()
-	if e.config.RouterAddress != "" && header.Src.Equal(ip) {
-		dst = e.config.RouterAddress
-	}
-
-	// Query the routing table
-	value, found := ledger.GetKey(MachinesLedgerKey, dst)
-	if !found {
-		return fmt.Errorf("'%s' not found in the routing table", dst)
-	}
-	machine := &types.Machine{}
-	value.Unmarshal(machine)
-
-	// Decode the Peer
-	d, err := peer.Decode(machine.PeerID)
-	if err != nil {
-		return errors.Wrap(err, "could not decode peer")
-	}
-
-	// Open a stream
-	stream, err := e.host.NewStream(ctx, d, Protocol)
-	if err != nil {
-		return errors.Wrap(err, "could not open stream")
-	}
-
-	stream.Write(frame)
-	stream.Close()
-	return nil
-}
-
-func (e *EdgeVPN) connectionWorker(
-	p chan ethernet.Frame,
-	ip net.IP,
-	wg *sync.WaitGroup,
-	ledger *blockchain.Ledger,
-	ifce *water.Interface) {
-	defer wg.Done()
-	for f := range p {
-		if err := e.handleFrame(f, ip, ledger, ifce); err != nil {
-			e.config.Logger.Debugf("could not handle frame: %s", err.Error())
-		}
-	}
-}
-
-// redirects packets from the interface to the node using the routing table in the blockchain
-func (e *EdgeVPN) readPackets(ctx context.Context, ledger *blockchain.Ledger, ifce *water.Interface) error {
-	ip, _, err := net.ParseCIDR(e.config.InterfaceAddress)
-	if err != nil {
-		return err
-	}
-
-	wg := new(sync.WaitGroup)
-
-	packets := make(chan ethernet.Frame, e.config.ChannelBufferSize)
-
-	defer func() {
-		close(packets)
-		wg.Wait()
-	}()
-
-	for i := 0; i < e.config.Concurrency; i++ {
-		wg.Add(1)
-		go e.connectionWorker(packets, ip, wg, ledger, ifce)
-	}
-
-	for {
-		select {
-		case <-ctx.Done():
-			return nil
-		default:
-			frame, err := e.getFrame(ifce)
-			if err != nil {
-				e.config.Logger.Errorf("could not get frame '%s'", err.Error())
-				continue
-			}
-
-			packets <- frame
-		}
-	}
-}
-
-func (e *EdgeVPN) Logger() log.StandardLogger {
-	return e.config.Logger
-}
-
-func (e *EdgeVPN) startNetwork() error {
-	ctx := context.Background()
-	e.config.Logger.Debug("Generating host data")
-
-	host, err := e.genHost(ctx)
-	if err != nil {
-		e.config.Logger.Error(err.Error())
-		return err
-	}
-	e.host = host
-
-	for pid, strh := range e.config.StreamHandlers {
-		host.SetStreamHandler(pid, network.StreamHandler(strh))
-	}
-
-	e.config.Logger.Info("Node ID:", host.ID())
-	e.config.Logger.Info("Node Addresses:", host.Addrs())
-
-	// create a new PubSub service using the GossipSub router
-	ps, err := pubsub.NewGossipSub(ctx, host, pubsub.WithMaxMessageSize(e.config.MaxMessageSize))
-	if err != nil {
-		return err
-	}
-
-	// join the "chat" room
-	cr, err := hub.JoinRoom(ctx, ps, host.ID(), e.config.RoomName)
-	if err != nil {
-		return err
-	}
-
-	e.HubRoom = cr
-
-	for _, sd := range e.config.ServiceDiscovery {
-		if err := sd.Run(e.config.Logger, ctx, host); err != nil {
-			e.config.Logger.Fatal(err)
-		}
-	}
-
-	go e.handleEvents(ctx)
-
-	e.Logger().Debug("Network started")
-	return nil
-}

+ 13 - 24
pkg/edgevpn/config.go → pkg/node/config.go

@@ -13,7 +13,7 @@
 // You should have received a copy of the GNU General Public License along
 // You should have received a copy of the GNU General Public License along
 // with this program; if not, see <http://www.gnu.org/licenses/>.
 // with this program; if not, see <http://www.gnu.org/licenses/>.
 
 
-package edgevpn
+package node
 
 
 import (
 import (
 	"context"
 	"context"
@@ -22,14 +22,15 @@ import (
 	"github.com/ipfs/go-log"
 	"github.com/ipfs/go-log"
 	"github.com/libp2p/go-libp2p"
 	"github.com/libp2p/go-libp2p"
 	"github.com/libp2p/go-libp2p-core/host"
 	"github.com/libp2p/go-libp2p-core/host"
-	"github.com/libp2p/go-libp2p-core/network"
-	"github.com/libp2p/go-libp2p-core/protocol"
+
+	p2pprotocol "github.com/libp2p/go-libp2p-core/protocol"
 	"github.com/mudler/edgevpn/pkg/blockchain"
 	"github.com/mudler/edgevpn/pkg/blockchain"
 	discovery "github.com/mudler/edgevpn/pkg/discovery"
 	discovery "github.com/mudler/edgevpn/pkg/discovery"
 	hub "github.com/mudler/edgevpn/pkg/hub"
 	hub "github.com/mudler/edgevpn/pkg/hub"
-	"github.com/songgao/water"
+	"github.com/mudler/edgevpn/pkg/types"
 )
 )
 
 
+// Config is the node configuration
 type Config struct {
 type Config struct {
 	// ExchangeKey is a Symmetric key used to seal the messages
 	// ExchangeKey is a Symmetric key used to seal the messages
 	ExchangeKey string
 	ExchangeKey string
@@ -46,41 +47,29 @@ type Config struct {
 	// Handlers are a list of handlers subscribed to messages received by the vpn interface
 	// Handlers are a list of handlers subscribed to messages received by the vpn interface
 	Handlers []Handler
 	Handlers []Handler
 
 
-	MaxMessageSize   int
-	SealKeyInterval  int
-	Interface        *water.Interface
-	InterfaceName    string
-	InterfaceAddress string
-	RouterAddress    string
-	InterfaceMTU     int
-	MTU              int
-	DeviceType       water.DeviceType
+	MaxMessageSize  int
+	SealKeyInterval int
+
 	ServiceDiscovery []ServiceDiscovery
 	ServiceDiscovery []ServiceDiscovery
 	NetworkServices  []NetworkService
 	NetworkServices  []NetworkService
 	Logger           log.StandardLogger
 	Logger           log.StandardLogger
 
 
-	SealKeyLength int
-
-	NetLinkBootstrap bool
+	SealKeyLength    int
+	InterfaceAddress string
 
 
 	Store blockchain.Store
 	Store blockchain.Store
 
 
 	// Handle is a handle consumed by HumanInterfaces to handle received messages
 	// Handle is a handle consumed by HumanInterfaces to handle received messages
 	Handle                     func(bool, *hub.Message)
 	Handle                     func(bool, *hub.Message)
-	StreamHandlers             map[protocol.ID]StreamHandler
+	StreamHandlers             map[p2pprotocol.ID]types.StreamHandler
 	AdditionalOptions, Options []libp2p.Option
 	AdditionalOptions, Options []libp2p.Option
 
 
 	DiscoveryInterval, LedgerSyncronizationTime, LedgerAnnounceTime time.Duration
 	DiscoveryInterval, LedgerSyncronizationTime, LedgerAnnounceTime time.Duration
 	DiscoveryBootstrapPeers                                         discovery.AddrList
 	DiscoveryBootstrapPeers                                         discovery.AddrList
-
-	Timeout           time.Duration
-	Concurrency       int
-	ChannelBufferSize int
 }
 }
 
 
-type NetworkService func(context.Context, *EdgeVPN, *blockchain.Ledger) error
-
-type StreamHandler func(stream network.Stream)
+// NetworkService is a service running over the network. It takes a context, a node and a ledger
+type NetworkService func(context.Context, Config, *Node, *blockchain.Ledger) error
 
 
 type Handler func(*hub.Message) error
 type Handler func(*hub.Message) error
 
 

+ 8 - 15
pkg/edgevpn/connection.go → pkg/node/connection.go

@@ -13,7 +13,7 @@
 // You should have received a copy of the GNU General Public License along
 // You should have received a copy of the GNU General Public License along
 // with this program; if not, see <http://www.gnu.org/licenses/>.
 // with this program; if not, see <http://www.gnu.org/licenses/>.
 
 
-package edgevpn
+package node
 
 
 import (
 import (
 	"context"
 	"context"
@@ -31,17 +31,12 @@ import (
 	"github.com/xlzd/gotp"
 	"github.com/xlzd/gotp"
 )
 )
 
 
-const (
-	Protocol        = "/edgevpn/0.1"
-	ServiceProtocol = "/edgevpn/service/0.1"
-	FileProtocol    = "/edgevpn/file/0.1"
-)
-
-func (e *EdgeVPN) Host() host.Host {
+// Host returns the libp2p peer host
+func (e *Node) Host() host.Host {
 	return e.host
 	return e.host
 }
 }
 
 
-func (e *EdgeVPN) genHost(ctx context.Context) (host.Host, error) {
+func (e *Node) genHost(ctx context.Context) (host.Host, error) {
 	var r io.Reader
 	var r io.Reader
 	if e.seed == 0 {
 	if e.seed == 0 {
 		r = rand.Reader
 		r = rand.Reader
@@ -96,11 +91,11 @@ func (e *EdgeVPN) genHost(ctx context.Context) (host.Host, error) {
 	return libp2p.New(opts...)
 	return libp2p.New(opts...)
 }
 }
 
 
-func (e *EdgeVPN) sealkey() string {
+func (e *Node) sealkey() string {
 	return gotp.NewTOTP(e.config.ExchangeKey, e.config.SealKeyLength, e.config.SealKeyInterval, nil).Now()
 	return gotp.NewTOTP(e.config.ExchangeKey, e.config.SealKeyLength, e.config.SealKeyInterval, nil).Now()
 }
 }
 
 
-func (e *EdgeVPN) handleEvents(ctx context.Context) {
+func (e *Node) handleEvents(ctx context.Context) {
 	for {
 	for {
 		select {
 		select {
 		case m := <-e.inputCh:
 		case m := <-e.inputCh:
@@ -115,13 +110,11 @@ func (e *EdgeVPN) handleEvents(ctx context.Context) {
 			e.handleReceivedMessage(m)
 			e.handleReceivedMessage(m)
 		case <-ctx.Done():
 		case <-ctx.Done():
 			return
 			return
-		case <-e.doneCh:
-			return
 		}
 		}
 	}
 	}
 }
 }
 
 
-func (e *EdgeVPN) handleReceivedMessage(m *hub.Message) {
+func (e *Node) handleReceivedMessage(m *hub.Message) {
 	for _, h := range e.config.Handlers {
 	for _, h := range e.config.Handlers {
 		if err := h(m); err != nil {
 		if err := h(m); err != nil {
 			e.config.Logger.Warnf("handler error: %s", err)
 			e.config.Logger.Warnf("handler error: %s", err)
@@ -129,7 +122,7 @@ func (e *EdgeVPN) handleReceivedMessage(m *hub.Message) {
 	}
 	}
 }
 }
 
 
-func (e *EdgeVPN) handleOutgoingMessage(m *hub.Message) {
+func (e *Node) handleOutgoingMessage(m *hub.Message) {
 	err := e.HubRoom.PublishMessage(m)
 	err := e.HubRoom.PublishMessage(m)
 	if err != nil {
 	if err != nil {
 		e.config.Logger.Warnf("publish error: %s", err)
 		e.config.Logger.Warnf("publish error: %s", err)

+ 8 - 7
pkg/edgevpn/message.go → pkg/node/message.go

@@ -13,32 +13,33 @@
 // You should have received a copy of the GNU General Public License along
 // You should have received a copy of the GNU General Public License along
 // with this program; if not, see <http://www.gnu.org/licenses/>.
 // with this program; if not, see <http://www.gnu.org/licenses/>.
 
 
-package edgevpn
+package node
 
 
 import (
 import (
-	"fmt"
-
 	hub "github.com/mudler/edgevpn/pkg/hub"
 	hub "github.com/mudler/edgevpn/pkg/hub"
 )
 )
 
 
+// MessageWriter is a struct returned by the node that satisfies the io.Writer interface
+// on the underlying hub.
+// Everything Write into the message writer is enqueued to a message channel
+// which is sealed and processed by the node
 type MessageWriter struct {
 type MessageWriter struct {
 	input chan<- *hub.Message
 	input chan<- *hub.Message
 	c     Config
 	c     Config
 	mess  *hub.Message
 	mess  *hub.Message
 }
 }
 
 
-func Message(template string, opts ...interface{}) []byte {
-	return []byte(fmt.Sprintf(template, opts...))
-}
-
+// Write writes a slice of bytes to the message channel
 func (mw *MessageWriter) Write(p []byte) (n int, err error) {
 func (mw *MessageWriter) Write(p []byte) (n int, err error) {
 	return mw.Send(mw.mess.WithMessage(string(p)))
 	return mw.Send(mw.mess.WithMessage(string(p)))
 }
 }
 
 
+// WriteString writes a string to the message channel
 func (mw *MessageWriter) WriteString(p string) (n int, err error) {
 func (mw *MessageWriter) WriteString(p string) (n int, err error) {
 	return mw.Send(mw.mess.WithMessage(p))
 	return mw.Send(mw.mess.WithMessage(p))
 }
 }
 
 
+// Send sends a message to the channel
 func (mw *MessageWriter) Send(copy *hub.Message) (n int, err error) {
 func (mw *MessageWriter) Send(copy *hub.Message) (n int, err error) {
 	mw.input <- copy
 	mw.input <- copy
 	return len(copy.Message), nil
 	return len(copy.Message), nil

+ 180 - 0
pkg/node/node.go

@@ -0,0 +1,180 @@
+// Copyright © 2021 Ettore Di Giacinto <[email protected]>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, see <http://www.gnu.org/licenses/>.
+
+package node
+
+import (
+	"context"
+	"time"
+
+	"github.com/ipfs/go-log"
+	"github.com/libp2p/go-libp2p"
+	"github.com/libp2p/go-libp2p-core/host"
+	"github.com/libp2p/go-libp2p-core/network"
+	p2pprotocol "github.com/libp2p/go-libp2p-core/protocol"
+
+	protocol "github.com/mudler/edgevpn/pkg/protocol"
+
+	pubsub "github.com/libp2p/go-libp2p-pubsub"
+	"github.com/mudler/edgevpn/pkg/blockchain"
+	hub "github.com/mudler/edgevpn/pkg/hub"
+	"github.com/mudler/edgevpn/pkg/logger"
+	"github.com/mudler/edgevpn/pkg/types"
+)
+
+type Node struct {
+	config  Config
+	HubRoom *hub.Room
+	inputCh chan *hub.Message
+	seed    int64
+	host    host.Host
+
+	ledger *blockchain.Ledger
+}
+
+var defaultLibp2pOptions = []libp2p.Option{
+	libp2p.EnableNATService(),
+	libp2p.NATPortMap(),
+	libp2p.EnableAutoRelay(),
+}
+
+func New(p ...Option) *Node {
+	c := Config{
+		DiscoveryInterval:        120 * time.Second,
+		StreamHandlers:           make(map[p2pprotocol.ID]types.StreamHandler),
+		LedgerAnnounceTime:       5 * time.Second,
+		LedgerSyncronizationTime: 5 * time.Second,
+		SealKeyLength:            12,
+		Options:                  defaultLibp2pOptions,
+		Logger:                   logger.New(log.LevelDebug),
+	}
+	c.Apply(p...)
+
+	return &Node{
+		config:  c,
+		inputCh: make(chan *hub.Message, 3000),
+		seed:    0,
+	}
+}
+
+func (e *Node) AddStreamHandler(id protocol.Protocol, s types.StreamHandler) {
+	e.config.StreamHandlers[id.ID()] = s
+}
+
+// Ledger return the ledger associated to the node
+func (e *Node) Ledger() (*blockchain.Ledger, error) {
+	if e.ledger != nil {
+		return e.ledger, nil
+	}
+	mw, err := e.MessageWriter()
+	if err != nil {
+		return nil, err
+	}
+
+	e.ledger = blockchain.New(mw, e.config.Store)
+	return e.ledger, nil
+}
+
+// Start joins the node over the p2p network
+func (e *Node) Start(ctx context.Context) error {
+
+	ledger, err := e.Ledger()
+	if err != nil {
+		return err
+	}
+
+	// Set the handler when we receive messages
+	// The ledger needs to read them and update the internal blockchain
+	e.config.Handlers = append(e.config.Handlers, ledger.Update)
+
+	e.config.Logger.Info("Starting EdgeVPN network")
+
+	// Startup libp2p network
+	err = e.startNetwork(ctx)
+	if err != nil {
+		return err
+	}
+
+	// Send periodically messages to the channel with our blockchain content
+	ledger.Syncronizer(ctx, e.config.LedgerSyncronizationTime)
+
+	// Start eventual declared NetworkServices
+	for _, s := range e.config.NetworkServices {
+		go s(ctx, e.config, e, ledger)
+	}
+
+	return nil
+}
+
+// MessageWriter returns a new MessageWriter bound to the edgevpn instance
+// with the given options
+func (e *Node) MessageWriter(opts ...hub.MessageOption) (*MessageWriter, error) {
+	mess := &hub.Message{}
+	mess.Apply(opts...)
+
+	return &MessageWriter{
+		c:     e.config,
+		input: e.inputCh,
+		mess:  mess,
+	}, nil
+}
+
+// Logger returns the node logger
+func (e *Node) Logger() log.StandardLogger {
+	return e.config.Logger
+}
+
+func (e *Node) startNetwork(ctx context.Context) error {
+	e.config.Logger.Debug("Generating host data")
+
+	host, err := e.genHost(ctx)
+	if err != nil {
+		e.config.Logger.Error(err.Error())
+		return err
+	}
+	e.host = host
+
+	for pid, strh := range e.config.StreamHandlers {
+		host.SetStreamHandler(pid, network.StreamHandler(strh))
+	}
+
+	e.config.Logger.Info("Node ID:", host.ID())
+	e.config.Logger.Info("Node Addresses:", host.Addrs())
+
+	// create a new PubSub service using the GossipSub router
+	ps, err := pubsub.NewGossipSub(ctx, host, pubsub.WithMaxMessageSize(e.config.MaxMessageSize))
+	if err != nil {
+		return err
+	}
+
+	// join the "chat" room
+	cr, err := hub.JoinRoom(ctx, ps, host.ID(), e.config.RoomName)
+	if err != nil {
+		return err
+	}
+
+	e.HubRoom = cr
+
+	for _, sd := range e.config.ServiceDiscovery {
+		if err := sd.Run(e.config.Logger, ctx, host); err != nil {
+			e.config.Logger.Fatal(err)
+		}
+	}
+
+	go e.handleEvents(ctx)
+
+	e.Logger().Debug("Network started")
+	return nil
+}

+ 1 - 1
pkg/edgevpn/edgevpn_suite_test.go → pkg/node/node_suite_test.go

@@ -13,7 +13,7 @@
 // You should have received a copy of the GNU General Public License along
 // You should have received a copy of the GNU General Public License along
 // with this program; if not, see <http://www.gnu.org/licenses/>.
 // with this program; if not, see <http://www.gnu.org/licenses/>.
 
 
-package edgevpn_test
+package node_test
 
 
 import (
 import (
 	"testing"
 	"testing"

+ 4 - 4
pkg/edgevpn/edgevpn_test.go → pkg/node/node_test.go

@@ -13,7 +13,7 @@
 // You should have received a copy of the GNU General Public License along
 // You should have received a copy of the GNU General Public License along
 // with this program; if not, see <http://www.gnu.org/licenses/>.
 // with this program; if not, see <http://www.gnu.org/licenses/>.
 
 
-package edgevpn_test
+package node_test
 
 
 import (
 import (
 	"context"
 	"context"
@@ -25,8 +25,8 @@ import (
 	. "github.com/onsi/gomega"
 	. "github.com/onsi/gomega"
 
 
 	"github.com/mudler/edgevpn/pkg/blockchain"
 	"github.com/mudler/edgevpn/pkg/blockchain"
-	. "github.com/mudler/edgevpn/pkg/edgevpn"
 	"github.com/mudler/edgevpn/pkg/logger"
 	"github.com/mudler/edgevpn/pkg/logger"
+	. "github.com/mudler/edgevpn/pkg/node"
 )
 )
 
 
 var _ = Describe("EdgeVPN", func() {
 var _ = Describe("EdgeVPN", func() {
@@ -41,8 +41,8 @@ var _ = Describe("EdgeVPN", func() {
 		It("see each other node ID", func() {
 		It("see each other node ID", func() {
 			ctx, cancel := context.WithCancel(context.Background())
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
 			defer cancel()
-			e.Join(ctx)
-			e2.Join(ctx)
+			e.Start(ctx)
+			e2.Start(ctx)
 
 
 			Eventually(func() []peer.ID {
 			Eventually(func() []peer.ID {
 				return e.Host().Network().Peers()
 				return e.Host().Network().Peers()

+ 5 - 77
pkg/edgevpn/options.go → pkg/node/options.go

@@ -13,7 +13,7 @@
 // You should have received a copy of the GNU General Public License along
 // You should have received a copy of the GNU General Public License along
 // with this program; if not, see <http://www.gnu.org/licenses/>.
 // with this program; if not, see <http://www.gnu.org/licenses/>.
 
 
-package edgevpn
+package node
 
 
 import (
 import (
 	"encoding/base64"
 	"encoding/base64"
@@ -25,9 +25,9 @@ import (
 	"github.com/libp2p/go-libp2p-core/protocol"
 	"github.com/libp2p/go-libp2p-core/protocol"
 	"github.com/mudler/edgevpn/pkg/blockchain"
 	"github.com/mudler/edgevpn/pkg/blockchain"
 	discovery "github.com/mudler/edgevpn/pkg/discovery"
 	discovery "github.com/mudler/edgevpn/pkg/discovery"
+	"github.com/mudler/edgevpn/pkg/types"
 	"github.com/mudler/edgevpn/pkg/utils"
 	"github.com/mudler/edgevpn/pkg/utils"
 	"github.com/pkg/errors"
 	"github.com/pkg/errors"
-	"github.com/songgao/water"
 	"github.com/xlzd/gotp"
 	"github.com/xlzd/gotp"
 	"gopkg.in/yaml.v2"
 	"gopkg.in/yaml.v2"
 )
 )
@@ -47,28 +47,6 @@ func WithLibp2pAdditionalOptions(i ...libp2p.Option) func(cfg *Config) error {
 	}
 	}
 }
 }
 
 
-func WithTimeout(s string) Option {
-	return func(cfg *Config) error {
-		d, err := time.ParseDuration(s)
-		cfg.Timeout = d
-		return err
-	}
-}
-
-func WithConcurrency(i int) Option {
-	return func(cfg *Config) error {
-		cfg.Concurrency = i
-		return nil
-	}
-}
-
-func WithChannelBufferSize(i int) Option {
-	return func(cfg *Config) error {
-		cfg.ChannelBufferSize = i
-		return nil
-	}
-}
-
 func WithNetworkService(ns ...NetworkService) func(cfg *Config) error {
 func WithNetworkService(ns ...NetworkService) func(cfg *Config) error {
 	return func(cfg *Config) error {
 	return func(cfg *Config) error {
 		cfg.NetworkServices = append(cfg.NetworkServices, ns...)
 		cfg.NetworkServices = append(cfg.NetworkServices, ns...)
@@ -76,13 +54,6 @@ func WithNetworkService(ns ...NetworkService) func(cfg *Config) error {
 	}
 	}
 }
 }
 
 
-func WithInterface(i *water.Interface) func(cfg *Config) error {
-	return func(cfg *Config) error {
-		cfg.Interface = i
-		return nil
-	}
-}
-
 func WithInterfaceAddress(i string) func(cfg *Config) error {
 func WithInterfaceAddress(i string) func(cfg *Config) error {
 	return func(cfg *Config) error {
 	return func(cfg *Config) error {
 		cfg.InterfaceAddress = i
 		cfg.InterfaceAddress = i
@@ -90,34 +61,12 @@ func WithInterfaceAddress(i string) func(cfg *Config) error {
 	}
 	}
 }
 }
 
 
-func WithRouterAddress(i string) func(cfg *Config) error {
-	return func(cfg *Config) error {
-		cfg.RouterAddress = i
-		return nil
-	}
-}
-
-func WithInterfaceMTU(i int) func(cfg *Config) error {
-	return func(cfg *Config) error {
-		cfg.InterfaceMTU = i
-		return nil
-	}
-}
-
-func WithPacketMTU(i int) func(cfg *Config) error {
-	return func(cfg *Config) error {
-		cfg.MTU = i
-		return nil
-	}
-}
-
-func WithInterfaceType(d water.DeviceType) func(cfg *Config) error {
+func Logger(l log.StandardLogger) func(cfg *Config) error {
 	return func(cfg *Config) error {
 	return func(cfg *Config) error {
-		cfg.DeviceType = d
+		cfg.Logger = l
 		return nil
 		return nil
 	}
 	}
 }
 }
-
 func WithStore(s blockchain.Store) func(cfg *Config) error {
 func WithStore(s blockchain.Store) func(cfg *Config) error {
 	return func(cfg *Config) error {
 	return func(cfg *Config) error {
 		cfg.Store = s
 		cfg.Store = s
@@ -125,20 +74,6 @@ func WithStore(s blockchain.Store) func(cfg *Config) error {
 	}
 	}
 }
 }
 
 
-func WithInterfaceName(i string) func(cfg *Config) error {
-	return func(cfg *Config) error {
-		cfg.InterfaceName = i
-		return nil
-	}
-}
-
-func Logger(l log.StandardLogger) func(cfg *Config) error {
-	return func(cfg *Config) error {
-		cfg.Logger = l
-		return nil
-	}
-}
-
 // Handlers adds a handler to the list that is called on each received message
 // Handlers adds a handler to the list that is called on each received message
 func Handlers(h ...Handler) func(cfg *Config) error {
 func Handlers(h ...Handler) func(cfg *Config) error {
 	return func(cfg *Config) error {
 	return func(cfg *Config) error {
@@ -148,7 +83,7 @@ func Handlers(h ...Handler) func(cfg *Config) error {
 }
 }
 
 
 // StreamHandlers adds a handler to the list that is called on each received message
 // StreamHandlers adds a handler to the list that is called on each received message
-func WithStreamHandler(id protocol.ID, h StreamHandler) func(cfg *Config) error {
+func WithStreamHandler(id protocol.ID, h types.StreamHandler) func(cfg *Config) error {
 	return func(cfg *Config) error {
 	return func(cfg *Config) error {
 		cfg.StreamHandlers[id] = h
 		cfg.StreamHandlers[id] = h
 		return nil
 		return nil
@@ -177,13 +112,6 @@ func ListenAddresses(ss ...string) func(cfg *Config) error {
 	}
 	}
 }
 }
 
 
-func NetLinkBootstrap(b bool) func(cfg *Config) error {
-	return func(cfg *Config) error {
-		cfg.NetLinkBootstrap = b
-		return nil
-	}
-}
-
 func Insecure(b bool) func(cfg *Config) error {
 func Insecure(b bool) func(cfg *Config) error {
 	return func(cfg *Config) error {
 	return func(cfg *Config) error {
 		cfg.Insecure = b
 		cfg.Insecure = b

+ 39 - 0
pkg/protocol/protocol.go

@@ -0,0 +1,39 @@
+// Copyright © 2022 Ettore Di Giacinto <[email protected]>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, see <http://www.gnu.org/licenses/>.
+
+package protocol
+
+import (
+	p2pprotocol "github.com/libp2p/go-libp2p-core/protocol"
+)
+
+const (
+	EdgeVPN         Protocol = "/edgevpn/0.1"
+	ServiceProtocol Protocol = "/edgevpn/service/0.1"
+	FileProtocol    Protocol = "/edgevpn/file/0.1"
+)
+
+const (
+	FilesLedgerKey    = "files"
+	MachinesLedgerKey = "machines"
+	ServicesLedgerKey = "services"
+	UsersLedgerKey    = "users"
+)
+
+type Protocol string
+
+func (p Protocol) ID() p2pprotocol.ID {
+	return p2pprotocol.ID(string(p))
+}

+ 34 - 35
pkg/edgevpn/files.go → pkg/services/files.go

@@ -1,4 +1,4 @@
-// Copyright © 2021 Ettore Di Giacinto <[email protected]>
+// Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
 //
 //
 // This program is free software; you can redistribute it and/or modify
 // This program is free software; you can redistribute it and/or modify
 // it under the terms of the GNU General Public License as published by
 // it under the terms of the GNU General Public License as published by
@@ -13,7 +13,7 @@
 // You should have received a copy of the GNU General Public License along
 // You should have received a copy of the GNU General Public License along
 // with this program; if not, see <http://www.gnu.org/licenses/>.
 // with this program; if not, see <http://www.gnu.org/licenses/>.
 
 
-package edgevpn
+package services
 
 
 import (
 import (
 	"context"
 	"context"
@@ -21,36 +21,34 @@ import (
 	"os"
 	"os"
 	"time"
 	"time"
 
 
+	"github.com/ipfs/go-log"
+	"github.com/mudler/edgevpn/pkg/protocol"
+
 	"github.com/libp2p/go-libp2p-core/network"
 	"github.com/libp2p/go-libp2p-core/network"
 	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/libp2p/go-libp2p-core/peer"
-	"github.com/libp2p/go-libp2p-core/protocol"
 	"github.com/mudler/edgevpn/pkg/blockchain"
 	"github.com/mudler/edgevpn/pkg/blockchain"
-	"github.com/mudler/edgevpn/pkg/edgevpn/types"
+	"github.com/mudler/edgevpn/pkg/types"
 	"github.com/pkg/errors"
 	"github.com/pkg/errors"
 )
 )
 
 
-const (
-	FilesLedgerKey = "files"
-)
-
-func (e *EdgeVPN) SendFile(ledger *blockchain.Ledger, fileID, filepath string) error {
+func SendFile(ledger *blockchain.Ledger, node types.Node, l log.StandardLogger, announcetime time.Duration, fileID, filepath string) error {
 
 
-	e.Logger().Infof("Serving '%s' as '%s'", filepath, fileID)
+	l.Infof("Serving '%s' as '%s'", filepath, fileID)
 
 
 	// By announcing periodically our service to the blockchain
 	// By announcing periodically our service to the blockchain
 	ledger.Announce(
 	ledger.Announce(
 		context.Background(),
 		context.Background(),
-		e.config.LedgerAnnounceTime,
+		announcetime,
 		func() {
 		func() {
 			// Retrieve current ID for ip in the blockchain
 			// Retrieve current ID for ip in the blockchain
-			existingValue, found := ledger.GetKey(FilesLedgerKey, fileID)
+			existingValue, found := ledger.GetKey(protocol.FilesLedgerKey, fileID)
 			service := &types.Service{}
 			service := &types.Service{}
 			existingValue.Unmarshal(service)
 			existingValue.Unmarshal(service)
 			// If mismatch, update the blockchain
 			// If mismatch, update the blockchain
-			if !found || service.PeerID != e.host.ID().String() {
+			if !found || service.PeerID != node.Host().ID().String() {
 				updatedMap := map[string]interface{}{}
 				updatedMap := map[string]interface{}{}
-				updatedMap[fileID] = types.File{PeerID: e.host.ID().String(), Name: fileID}
-				ledger.Add(FilesLedgerKey, updatedMap)
+				updatedMap[fileID] = types.File{PeerID: node.Host().ID().String(), Name: fileID}
+				ledger.Add(protocol.FilesLedgerKey, updatedMap)
 			}
 			}
 		},
 		},
 	)
 	)
@@ -58,17 +56,18 @@ func (e *EdgeVPN) SendFile(ledger *blockchain.Ledger, fileID, filepath string) e
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
+
 	// 2) Set a stream handler
 	// 2) Set a stream handler
 	//    which connect to the given address/Port and Send what we receive from the Stream.
 	//    which connect to the given address/Port and Send what we receive from the Stream.
-	e.config.StreamHandlers[protocol.ID(FileProtocol)] = func(stream network.Stream) {
+	node.AddStreamHandler(protocol.FileProtocol, func(stream network.Stream) {
 		go func() {
 		go func() {
-			e.config.Logger.Infof("(file %s) Received connection from %s", fileID, stream.Conn().RemotePeer().String())
+			l.Infof("(file %s) Received connection from %s", fileID, stream.Conn().RemotePeer().String())
 
 
 			// Retrieve current ID for ip in the blockchain
 			// Retrieve current ID for ip in the blockchain
-			_, found := ledger.GetKey(UsersLedgerKey, stream.Conn().RemotePeer().String())
+			_, found := ledger.GetKey(protocol.UsersLedgerKey, stream.Conn().RemotePeer().String())
 			// If mismatch, update the blockchain
 			// If mismatch, update the blockchain
 			if !found {
 			if !found {
-				e.config.Logger.Info("Reset", stream.Conn().RemotePeer().String(), "Not found in the ledger")
+				l.Info("Reset", stream.Conn().RemotePeer().String(), "Not found in the ledger")
 				stream.Reset()
 				stream.Reset()
 				return
 				return
 			}
 			}
@@ -81,49 +80,49 @@ func (e *EdgeVPN) SendFile(ledger *blockchain.Ledger, fileID, filepath string) e
 
 
 			stream.Close()
 			stream.Close()
 
 
-			e.config.Logger.Infof("(file %s) Done handling %s", fileID, stream.Conn().RemotePeer().String())
+			l.Infof("(file %s) Done handling %s", fileID, stream.Conn().RemotePeer().String())
 
 
 		}()
 		}()
-	}
+	})
 
 
 	return nil
 	return nil
 }
 }
 
 
-func (e *EdgeVPN) ReceiveFile(ledger *blockchain.Ledger, fileID string, path string) error {
+func ReceiveFile(ledger *blockchain.Ledger, node types.Node, l log.StandardLogger, announcetime time.Duration, fileID string, path string) error {
 
 
 	// Announce ourselves so nodes accepts our connection
 	// Announce ourselves so nodes accepts our connection
 	ledger.Announce(
 	ledger.Announce(
 		context.Background(),
 		context.Background(),
-		e.config.LedgerAnnounceTime,
+		announcetime,
 		func() {
 		func() {
 			// Retrieve current ID for ip in the blockchain
 			// Retrieve current ID for ip in the blockchain
-			_, found := ledger.GetKey(UsersLedgerKey, e.host.ID().String())
+			_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
 			// If mismatch, update the blockchain
 			// If mismatch, update the blockchain
 			if !found {
 			if !found {
 				updatedMap := map[string]interface{}{}
 				updatedMap := map[string]interface{}{}
-				updatedMap[e.host.ID().String()] = &types.User{
-					PeerID:    e.host.ID().String(),
+				updatedMap[node.Host().ID().String()] = &types.User{
+					PeerID:    node.Host().ID().String(),
 					Timestamp: time.Now().String(),
 					Timestamp: time.Now().String(),
 				}
 				}
-				ledger.Add(UsersLedgerKey, updatedMap)
+				ledger.Add(protocol.UsersLedgerKey, updatedMap)
 			}
 			}
 		},
 		},
 	)
 	)
 	for {
 	for {
 		time.Sleep(5 * time.Second)
 		time.Sleep(5 * time.Second)
 
 
-		e.config.Logger.Debug("Attempting to find file in the blockchain")
+		l.Debug("Attempting to find file in the blockchain")
 
 
-		_, found := ledger.GetKey(UsersLedgerKey, e.host.ID().String())
+		_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
 		if !found {
 		if !found {
 			continue
 			continue
 		}
 		}
-		existingValue, found := ledger.GetKey(FilesLedgerKey, fileID)
+		existingValue, found := ledger.GetKey(protocol.FilesLedgerKey, fileID)
 		fi := &types.File{}
 		fi := &types.File{}
 		existingValue.Unmarshal(fi)
 		existingValue.Unmarshal(fi)
 		// If mismatch, update the blockchain
 		// If mismatch, update the blockchain
 		if !found {
 		if !found {
-			e.config.Logger.Debug("file not found on blockchain, retrying in 5 seconds")
+			l.Debug("file not found on blockchain, retrying in 5 seconds")
 			continue
 			continue
 		} else {
 		} else {
 			break
 			break
@@ -132,7 +131,7 @@ func (e *EdgeVPN) ReceiveFile(ledger *blockchain.Ledger, fileID string, path str
 	// Listen for an incoming connection.
 	// Listen for an incoming connection.
 
 
 	// Retrieve current ID for ip in the blockchain
 	// Retrieve current ID for ip in the blockchain
-	existingValue, found := ledger.GetKey(FilesLedgerKey, fileID)
+	existingValue, found := ledger.GetKey(protocol.FilesLedgerKey, fileID)
 	fi := &types.File{}
 	fi := &types.File{}
 	existingValue.Unmarshal(fi)
 	existingValue.Unmarshal(fi)
 
 
@@ -148,11 +147,11 @@ func (e *EdgeVPN) ReceiveFile(ledger *blockchain.Ledger, fileID string, path str
 	}
 	}
 
 
 	// Open a stream
 	// Open a stream
-	stream, err := e.host.NewStream(context.Background(), d, FileProtocol)
+	stream, err := node.Host().NewStream(context.Background(), d, protocol.FileProtocol.ID())
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	e.Logger().Infof("Saving file %s to %s", fileID, path)
+	l.Infof("Saving file %s to %s", fileID, path)
 
 
 	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
 	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
 	if err != nil {
 	if err != nil {
@@ -163,6 +162,6 @@ func (e *EdgeVPN) ReceiveFile(ledger *blockchain.Ledger, fileID string, path str
 
 
 	f.Close()
 	f.Close()
 
 
-	e.Logger().Infof("Received file %s to %s", fileID, path)
+	l.Infof("Received file %s to %s", fileID, path)
 	return nil
 	return nil
 }
 }

+ 36 - 39
pkg/edgevpn/services.go → pkg/services/services.go

@@ -1,4 +1,4 @@
-// Copyright © 2021 Ettore Di Giacinto <[email protected]>
+// Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
 //
 //
 // This program is free software; you can redistribute it and/or modify
 // This program is free software; you can redistribute it and/or modify
 // it under the terms of the GNU General Public License as published by
 // it under the terms of the GNU General Public License as published by
@@ -13,7 +13,7 @@
 // You should have received a copy of the GNU General Public License along
 // You should have received a copy of the GNU General Public License along
 // with this program; if not, see <http://www.gnu.org/licenses/>.
 // with this program; if not, see <http://www.gnu.org/licenses/>.
 
 
-package edgevpn
+package services
 
 
 import (
 import (
 	"context"
 	"context"
@@ -21,60 +21,57 @@ import (
 	"net"
 	"net"
 	"time"
 	"time"
 
 
+	"github.com/ipfs/go-log"
 	"github.com/libp2p/go-libp2p-core/network"
 	"github.com/libp2p/go-libp2p-core/network"
 	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/libp2p/go-libp2p-core/peer"
-	"github.com/libp2p/go-libp2p-core/protocol"
 	"github.com/mudler/edgevpn/pkg/blockchain"
 	"github.com/mudler/edgevpn/pkg/blockchain"
-	"github.com/mudler/edgevpn/pkg/edgevpn/types"
-)
+	protocol "github.com/mudler/edgevpn/pkg/protocol"
 
 
-const (
-	ServicesLedgerKey = "services"
-	UsersLedgerKey    = "users"
+	"github.com/mudler/edgevpn/pkg/types"
 )
 )
 
 
-func (e *EdgeVPN) ExposeService(ledger *blockchain.Ledger, serviceID, dstaddress string) {
+func ExposeService(ledger *blockchain.Ledger, node types.Node, l log.StandardLogger, announcetime time.Duration, serviceID, dstaddress string) {
 
 
-	e.Logger().Infof("Exposing service '%s' (%s)", serviceID, dstaddress)
+	l.Infof("Exposing service '%s' (%s)", serviceID, dstaddress)
 
 
 	// 1) Register the ServiceID <-> PeerID Association
 	// 1) Register the ServiceID <-> PeerID Association
 	// By announcing periodically our service to the blockchain
 	// By announcing periodically our service to the blockchain
 	ledger.Announce(
 	ledger.Announce(
 		context.Background(),
 		context.Background(),
-		e.config.LedgerAnnounceTime,
+		announcetime,
 		func() {
 		func() {
 			// Retrieve current ID for ip in the blockchain
 			// Retrieve current ID for ip in the blockchain
-			existingValue, found := ledger.GetKey(ServicesLedgerKey, serviceID)
+			existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, serviceID)
 			service := &types.Service{}
 			service := &types.Service{}
 			existingValue.Unmarshal(service)
 			existingValue.Unmarshal(service)
 			// If mismatch, update the blockchain
 			// If mismatch, update the blockchain
-			if !found || service.PeerID != e.host.ID().String() {
+			if !found || service.PeerID != node.Host().ID().String() {
 				updatedMap := map[string]interface{}{}
 				updatedMap := map[string]interface{}{}
-				updatedMap[serviceID] = types.Service{PeerID: e.host.ID().String(), Name: serviceID}
-				ledger.Add(ServicesLedgerKey, updatedMap)
+				updatedMap[serviceID] = types.Service{PeerID: node.Host().ID().String(), Name: serviceID}
+				ledger.Add(protocol.ServicesLedgerKey, updatedMap)
 			}
 			}
 		},
 		},
 	)
 	)
 
 
 	// 2) Set a stream handler
 	// 2) Set a stream handler
 	//    which connect to the given address/Port and Send what we receive from the Stream.
 	//    which connect to the given address/Port and Send what we receive from the Stream.
-	e.config.StreamHandlers[protocol.ID(ServiceProtocol)] = func(stream network.Stream) {
+	node.AddStreamHandler(protocol.ServiceProtocol, func(stream network.Stream) {
 		go func() {
 		go func() {
-			e.config.Logger.Infof("(service %s) Received connection from %s", serviceID, stream.Conn().RemotePeer().String())
+			l.Infof("(service %s) Received connection from %s", serviceID, stream.Conn().RemotePeer().String())
 
 
 			// Retrieve current ID for ip in the blockchain
 			// Retrieve current ID for ip in the blockchain
-			_, found := ledger.GetKey(UsersLedgerKey, stream.Conn().RemotePeer().String())
+			_, found := ledger.GetKey(protocol.UsersLedgerKey, stream.Conn().RemotePeer().String())
 			// If mismatch, update the blockchain
 			// If mismatch, update the blockchain
 			if !found {
 			if !found {
-				e.config.Logger.Debugf("Reset '%s': not found in the ledger", stream.Conn().RemotePeer().String())
+				l.Debugf("Reset '%s': not found in the ledger", stream.Conn().RemotePeer().String())
 				stream.Reset()
 				stream.Reset()
 				return
 				return
 			}
 			}
 
 
-			e.config.Logger.Infof("Connecting to '%s'", dstaddress)
+			l.Infof("Connecting to '%s'", dstaddress)
 			c, err := net.Dial("tcp", dstaddress)
 			c, err := net.Dial("tcp", dstaddress)
 			if err != nil {
 			if err != nil {
-				e.config.Logger.Debugf("Reset %s: %s", stream.Conn().RemotePeer().String(), err.Error())
+				l.Debugf("Reset %s: %s", stream.Conn().RemotePeer().String(), err.Error())
 				stream.Reset()
 				stream.Reset()
 				return
 				return
 			}
 			}
@@ -86,35 +83,35 @@ func (e *EdgeVPN) ExposeService(ledger *blockchain.Ledger, serviceID, dstaddress
 			stream.Close()
 			stream.Close()
 			c.Close()
 			c.Close()
 
 
-			e.config.Logger.Infof("(service %s) Handled correctly '%s'", serviceID, stream.Conn().RemotePeer().String())
+			l.Infof("(service %s) Handled correctly '%s'", serviceID, stream.Conn().RemotePeer().String())
 		}()
 		}()
-	}
+	})
 }
 }
 
 
-func (e *EdgeVPN) ConnectToService(ledger *blockchain.Ledger, serviceID string, srcaddr string) error {
+func ConnectToService(ledger *blockchain.Ledger, node types.Node, ll log.StandardLogger, announcetime time.Duration, serviceID string, srcaddr string) error {
 
 
 	// Open local port for listening
 	// Open local port for listening
 	l, err := net.Listen("tcp", srcaddr)
 	l, err := net.Listen("tcp", srcaddr)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	e.Logger().Info("Binding local port on", srcaddr)
+	ll.Info("Binding local port on", srcaddr)
 
 
 	// Announce ourselves so nodes accepts our connection
 	// Announce ourselves so nodes accepts our connection
 	ledger.Announce(
 	ledger.Announce(
 		context.Background(),
 		context.Background(),
-		e.config.LedgerAnnounceTime,
+		announcetime,
 		func() {
 		func() {
 			// Retrieve current ID for ip in the blockchain
 			// Retrieve current ID for ip in the blockchain
-			_, found := ledger.GetKey(UsersLedgerKey, e.host.ID().String())
+			_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
 			// If mismatch, update the blockchain
 			// If mismatch, update the blockchain
 			if !found {
 			if !found {
 				updatedMap := map[string]interface{}{}
 				updatedMap := map[string]interface{}{}
-				updatedMap[e.host.ID().String()] = &types.User{
-					PeerID:    e.host.ID().String(),
+				updatedMap[node.Host().ID().String()] = &types.User{
+					PeerID:    node.Host().ID().String(),
 					Timestamp: time.Now().String(),
 					Timestamp: time.Now().String(),
 				}
 				}
-				ledger.Add(UsersLedgerKey, updatedMap)
+				ledger.Add(protocol.UsersLedgerKey, updatedMap)
 			}
 			}
 		},
 		},
 	)
 	)
@@ -123,21 +120,21 @@ func (e *EdgeVPN) ConnectToService(ledger *blockchain.Ledger, serviceID string,
 		// Listen for an incoming connection.
 		// Listen for an incoming connection.
 		conn, err := l.Accept()
 		conn, err := l.Accept()
 		if err != nil {
 		if err != nil {
-			e.config.Logger.Error("Error accepting: ", err.Error())
+			ll.Error("Error accepting: ", err.Error())
 			continue
 			continue
 		}
 		}
 
 
-		e.config.Logger.Info("New connection from", l.Addr().String())
+		ll.Info("New connection from", l.Addr().String())
 		// Handle connections in a new goroutine, forwarding to the p2p service
 		// Handle connections in a new goroutine, forwarding to the p2p service
 		go func() {
 		go func() {
 			// Retrieve current ID for ip in the blockchain
 			// Retrieve current ID for ip in the blockchain
-			existingValue, found := ledger.GetKey(ServicesLedgerKey, serviceID)
+			existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, serviceID)
 			service := &types.Service{}
 			service := &types.Service{}
 			existingValue.Unmarshal(service)
 			existingValue.Unmarshal(service)
 			// If mismatch, update the blockchain
 			// If mismatch, update the blockchain
 			if !found {
 			if !found {
 				conn.Close()
 				conn.Close()
-				e.config.Logger.Debugf("service '%s' not found on blockchain", serviceID)
+				ll.Debugf("service '%s' not found on blockchain", serviceID)
 				return
 				return
 			}
 			}
 
 
@@ -145,18 +142,18 @@ func (e *EdgeVPN) ConnectToService(ledger *blockchain.Ledger, serviceID string,
 			d, err := peer.Decode(service.PeerID)
 			d, err := peer.Decode(service.PeerID)
 			if err != nil {
 			if err != nil {
 				conn.Close()
 				conn.Close()
-				e.config.Logger.Debugf("could not decode peer '%s'", service.PeerID)
+				ll.Debugf("could not decode peer '%s'", service.PeerID)
 				return
 				return
 			}
 			}
 
 
 			// Open a stream
 			// Open a stream
-			stream, err := e.host.NewStream(context.Background(), d, ServiceProtocol)
+			stream, err := node.Host().NewStream(context.Background(), d, protocol.ServiceProtocol.ID())
 			if err != nil {
 			if err != nil {
 				conn.Close()
 				conn.Close()
-				e.config.Logger.Debugf("could not open stream '%s'", err.Error())
+				ll.Debugf("could not open stream '%s'", err.Error())
 				return
 				return
 			}
 			}
-			e.config.Logger.Debugf("(service %s) Redirecting", serviceID, l.Addr().String())
+			ll.Debugf("(service %s) Redirecting", serviceID, l.Addr().String())
 
 
 			closer := make(chan struct{}, 2)
 			closer := make(chan struct{}, 2)
 			go copyStream(closer, stream, conn)
 			go copyStream(closer, stream, conn)
@@ -165,7 +162,7 @@ func (e *EdgeVPN) ConnectToService(ledger *blockchain.Ledger, serviceID string,
 
 
 			stream.Close()
 			stream.Close()
 			conn.Close()
 			conn.Close()
-			e.config.Logger.Infof("(service %s) Done handling %s", serviceID, l.Addr().String())
+			ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String())
 		}()
 		}()
 	}
 	}
 }
 }

+ 0 - 0
pkg/edgevpn/types/file.go → pkg/types/file.go


+ 0 - 0
pkg/edgevpn/types/machine.go → pkg/types/machine.go


+ 29 - 0
pkg/types/node.go

@@ -0,0 +1,29 @@
+// Copyright © 2021 Ettore Di Giacinto <[email protected]>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, see <http://www.gnu.org/licenses/>.
+
+package types
+
+import (
+	"context"
+
+	"github.com/libp2p/go-libp2p-core/host"
+	"github.com/mudler/edgevpn/pkg/protocol"
+)
+
+type Node interface {
+	AddStreamHandler(protocol.Protocol, StreamHandler)
+	Start(context.Context) error
+	Host() host.Host
+}

+ 0 - 0
pkg/edgevpn/types/service.go → pkg/types/service.go


+ 20 - 0
pkg/types/stream.go

@@ -0,0 +1,20 @@
+// Copyright © 2022 Ettore Di Giacinto <[email protected]>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, see <http://www.gnu.org/licenses/>.
+
+package types
+
+import "github.com/libp2p/go-libp2p-core/network"
+
+type StreamHandler func(stream network.Stream)

+ 0 - 0
pkg/edgevpn/types/user.go → pkg/types/user.go


+ 149 - 0
pkg/vpn/config.go

@@ -0,0 +1,149 @@
+// Copyright © 2022 Ettore Di Giacinto <[email protected]>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, see <http://www.gnu.org/licenses/>.
+
+package vpn
+
+import (
+	"time"
+
+	"github.com/ipfs/go-log"
+	"github.com/songgao/water"
+)
+
+type Config struct {
+	Interface        *water.Interface
+	InterfaceName    string
+	InterfaceAddress string
+	RouterAddress    string
+	InterfaceMTU     int
+	MTU              int
+	DeviceType       water.DeviceType
+
+	LedgerAnnounceTime time.Duration
+	Logger             log.StandardLogger
+
+	NetLinkBootstrap bool
+
+	Timeout           time.Duration
+	Concurrency       int
+	ChannelBufferSize int
+}
+
+type Option func(cfg *Config) error
+
+// Apply applies the given options to the config, returning the first error
+// encountered (if any).
+func (cfg *Config) Apply(opts ...Option) error {
+	for _, opt := range opts {
+		if opt == nil {
+			continue
+		}
+		if err := opt(cfg); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func WithInterface(i *water.Interface) func(cfg *Config) error {
+	return func(cfg *Config) error {
+		cfg.Interface = i
+		return nil
+	}
+}
+
+func NetLinkBootstrap(b bool) func(cfg *Config) error {
+	return func(cfg *Config) error {
+		cfg.NetLinkBootstrap = b
+		return nil
+	}
+}
+
+func WithTimeout(s string) Option {
+	return func(cfg *Config) error {
+		d, err := time.ParseDuration(s)
+		cfg.Timeout = d
+		return err
+	}
+}
+
+func Logger(l log.StandardLogger) func(cfg *Config) error {
+	return func(cfg *Config) error {
+		cfg.Logger = l
+		return nil
+	}
+}
+func WithRouterAddress(i string) func(cfg *Config) error {
+	return func(cfg *Config) error {
+		cfg.RouterAddress = i
+		return nil
+	}
+}
+
+func WithLedgerAnnounceTime(t time.Duration) func(cfg *Config) error {
+	return func(cfg *Config) error {
+		cfg.LedgerAnnounceTime = t
+		return nil
+	}
+}
+
+func WithConcurrency(i int) Option {
+	return func(cfg *Config) error {
+		cfg.Concurrency = i
+		return nil
+	}
+}
+
+func WithChannelBufferSize(i int) Option {
+	return func(cfg *Config) error {
+		cfg.ChannelBufferSize = i
+		return nil
+	}
+}
+
+func WithInterfaceMTU(i int) func(cfg *Config) error {
+	return func(cfg *Config) error {
+		cfg.InterfaceMTU = i
+		return nil
+	}
+}
+
+func WithPacketMTU(i int) func(cfg *Config) error {
+	return func(cfg *Config) error {
+		cfg.MTU = i
+		return nil
+	}
+}
+
+func WithInterfaceType(d water.DeviceType) func(cfg *Config) error {
+	return func(cfg *Config) error {
+		cfg.DeviceType = d
+		return nil
+	}
+}
+
+func WithInterfaceName(i string) func(cfg *Config) error {
+	return func(cfg *Config) error {
+		cfg.InterfaceName = i
+		return nil
+	}
+}
+
+func WithInterfaceAddress(i string) func(cfg *Config) error {
+	return func(cfg *Config) error {
+		cfg.InterfaceAddress = i
+		return nil
+	}
+}

+ 8 - 8
pkg/edgevpn/interface.go → pkg/vpn/interface.go

@@ -16,33 +16,33 @@
 // You should have received a copy of the GNU General Public License along
 // You should have received a copy of the GNU General Public License along
 // with this program; if not, see <http://www.gnu.org/licenses/>.
 // with this program; if not, see <http://www.gnu.org/licenses/>.
 
 
-package edgevpn
+package vpn
 
 
 import (
 import (
 	"github.com/songgao/water"
 	"github.com/songgao/water"
 	"github.com/vishvananda/netlink"
 	"github.com/vishvananda/netlink"
 )
 )
 
 
-func (e *EdgeVPN) createInterface() (*water.Interface, error) {
+func createInterface(c *Config) (*water.Interface, error) {
 	config := water.Config{
 	config := water.Config{
-		DeviceType: e.config.DeviceType,
+		DeviceType: c.DeviceType,
 	}
 	}
-	config.Name = e.config.InterfaceName
+	config.Name = c.InterfaceName
 
 
 	return water.New(config)
 	return water.New(config)
 }
 }
 
 
-func (e *EdgeVPN) prepareInterface() error {
-	link, err := netlink.LinkByName(e.config.InterfaceName)
+func prepareInterface(c *Config) error {
+	link, err := netlink.LinkByName(c.InterfaceName)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	addr, err := netlink.ParseAddr(e.config.InterfaceAddress)
+	addr, err := netlink.ParseAddr(c.InterfaceAddress)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
 
 
-	err = netlink.LinkSetMTU(link, e.config.InterfaceMTU)
+	err = netlink.LinkSetMTU(link, c.InterfaceMTU)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}

+ 8 - 8
pkg/edgevpn/interface_windows.go → pkg/vpn/interface_windows.go

@@ -16,7 +16,7 @@
 // You should have received a copy of the GNU General Public License along
 // You should have received a copy of the GNU General Public License along
 // with this program; if not, see <http://www.gnu.org/licenses/>.
 // with this program; if not, see <http://www.gnu.org/licenses/>.
 
 
-package edgevpn
+package vpn
 
 
 import (
 import (
 	"fmt"
 	"fmt"
@@ -27,24 +27,24 @@ import (
 	"github.com/songgao/water"
 	"github.com/songgao/water"
 )
 )
 
 
-func (e *EdgeVPN) prepareInterface() error {
-	err := netsh("interface", "ip", "set", "address", "name=", e.config.InterfaceName, "static", e.config.InterfaceAddress)
+func prepareInterface(c *Config) error {
+	err := netsh("interface", "ip", "set", "address", "name=", c.InterfaceName, "static", c.InterfaceAddress)
 	if err != nil {
 	if err != nil {
 		log.Println(err)
 		log.Println(err)
 	}
 	}
-	err = netsh("interface", "ipv4", "set", "subinterface", e.config.InterfaceName, "mtu=", fmt.Sprintf("%d", e.config.InterfaceMTU))
+	err = netsh("interface", "ipv4", "set", "subinterface", c.InterfaceName, "mtu=", fmt.Sprintf("%d", c.InterfaceMTU))
 	if err != nil {
 	if err != nil {
 		log.Println(err)
 		log.Println(err)
 	}
 	}
 	return nil
 	return nil
 }
 }
 
 
-func (e *EdgeVPN) createInterface() (*water.Interface, error) {
+func createInterface(c *Config) (*water.Interface, error) {
 	// TUN on Windows requires address and network to be set on device creation stage
 	// TUN on Windows requires address and network to be set on device creation stage
 	// We also set network to 0.0.0.0/0 so we able to reach networks behind the node
 	// We also set network to 0.0.0.0/0 so we able to reach networks behind the node
 	// https://github.com/songgao/water/blob/master/params_windows.go
 	// https://github.com/songgao/water/blob/master/params_windows.go
 	// https://gitlab.com/openconnect/openconnect/-/blob/master/tun-win32.c
 	// https://gitlab.com/openconnect/openconnect/-/blob/master/tun-win32.c
-	ip, _, err := net.ParseCIDR(e.config.InterfaceAddress)
+	ip, _, err := net.ParseCIDR(c.InterfaceAddress)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -53,10 +53,10 @@ func (e *EdgeVPN) createInterface() (*water.Interface, error) {
 		Mask: net.IPv4Mask(0, 0, 0, 0),
 		Mask: net.IPv4Mask(0, 0, 0, 0),
 	}
 	}
 	config := water.Config{
 	config := water.Config{
-		DeviceType: e.config.DeviceType,
+		DeviceType: c.DeviceType,
 		PlatformSpecificParams: water.PlatformSpecificParams{
 		PlatformSpecificParams: water.PlatformSpecificParams{
 			ComponentID:   "tap0901",
 			ComponentID:   "tap0901",
-			InterfaceName: e.config.InterfaceName,
+			InterfaceName: c.InterfaceName,
 			Network:       network.String(),
 			Network:       network.String(),
 		},
 		},
 	}
 	}

+ 237 - 0
pkg/vpn/vpn.go

@@ -0,0 +1,237 @@
+// Copyright © 2022 Ettore Di Giacinto <[email protected]>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, see <http://www.gnu.org/licenses/>.
+
+package vpn
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"net"
+	"os"
+	"runtime"
+	"sync"
+	"time"
+
+	"github.com/ipfs/go-log"
+	"github.com/libp2p/go-libp2p-core/network"
+	"github.com/libp2p/go-libp2p-core/peer"
+
+	"github.com/mudler/edgevpn/internal"
+	"github.com/mudler/edgevpn/pkg/blockchain"
+	"github.com/mudler/edgevpn/pkg/logger"
+	"github.com/mudler/edgevpn/pkg/protocol"
+	"github.com/mudler/edgevpn/pkg/types"
+	"github.com/pkg/errors"
+	"github.com/songgao/packets/ethernet"
+	"github.com/songgao/water"
+	"golang.org/x/net/ipv4"
+)
+
+// Start the node and the vpn. Returns an error in case of failure
+// When starting the vpn, there is no need to start the node
+func Start(ctx context.Context, ledger *blockchain.Ledger, n types.Node, p ...Option) error {
+	c := &Config{
+		Concurrency:        1,
+		LedgerAnnounceTime: 5 * time.Second,
+		Timeout:            15 * time.Second,
+		Logger:             logger.New(log.LevelDebug),
+	}
+	c.Apply(p...)
+
+	ifce, err := createInterface(c)
+	if err != nil {
+		return err
+	}
+	defer ifce.Close()
+
+	n.AddStreamHandler(protocol.EdgeVPN, streamHandler(ledger, ifce))
+
+	// Set the stream handler to get back the packets from the stream to the interface
+
+	// Join the node to the network, using our ledger
+	// it also starts up a goroutine that periodically sends
+	// messages to the network with our blockchain content
+	if err := n.Start(ctx); err != nil {
+		return err
+	}
+
+	// Announce our IP
+	ip, _, err := net.ParseCIDR(c.InterfaceAddress)
+	if err != nil {
+		return err
+	}
+
+	ledger.Announce(
+		ctx,
+		c.LedgerAnnounceTime,
+		func() {
+			machine := &types.Machine{}
+			// Retrieve current ID for ip in the blockchain
+			existingValue, found := ledger.GetKey(protocol.MachinesLedgerKey, ip.String())
+			existingValue.Unmarshal(machine)
+
+			// If mismatch, update the blockchain
+			if !found || machine.PeerID != n.Host().ID().String() {
+				updatedMap := map[string]interface{}{}
+				updatedMap[ip.String()] = newBlockChainData(n, ip.String())
+				ledger.Add(protocol.MachinesLedgerKey, updatedMap)
+			}
+		},
+	)
+
+	if c.NetLinkBootstrap {
+		if err := prepareInterface(c); err != nil {
+			return err
+		}
+	}
+
+	// read packets from the interface
+	return readPackets(ctx, c, n, ledger, ifce)
+}
+
+func streamHandler(l *blockchain.Ledger, ifce *water.Interface) func(stream network.Stream) {
+	return func(stream network.Stream) {
+		if !l.Exists(protocol.MachinesLedgerKey,
+			func(d blockchain.Data) bool {
+				machine := &types.Machine{}
+				d.Unmarshal(machine)
+				return machine.PeerID == stream.Conn().RemotePeer().String()
+			}) {
+			stream.Reset()
+			return
+		}
+		io.Copy(ifce.ReadWriteCloser, stream)
+		stream.Close()
+	}
+}
+
+func newBlockChainData(n types.Node, address string) types.Machine {
+	hostname, _ := os.Hostname()
+
+	return types.Machine{
+		PeerID:   n.Host().ID().String(),
+		Hostname: hostname,
+		OS:       runtime.GOOS,
+		Arch:     runtime.GOARCH,
+		Version:  internal.Version,
+		Address:  address,
+	}
+}
+
+func getFrame(ifce *water.Interface, c *Config) (ethernet.Frame, error) {
+	var frame ethernet.Frame
+	frame.Resize(c.MTU)
+
+	n, err := ifce.Read([]byte(frame))
+	if err != nil {
+		return frame, errors.Wrap(err, "could not read from interface")
+	}
+
+	frame = frame[:n]
+	return frame, nil
+}
+
+func handleFrame(frame ethernet.Frame, c *Config, n types.Node, ip net.IP, ledger *blockchain.Ledger, ifce *water.Interface) error {
+	ctx, cancel := context.WithTimeout(context.Background(), c.Timeout)
+	defer cancel()
+
+	header, err := ipv4.ParseHeader(frame)
+	if err != nil {
+		return errors.Wrap(err, "could not parse ipv4 header from frame")
+	}
+
+	dst := header.Dst.String()
+	if c.RouterAddress != "" && header.Src.Equal(ip) {
+		dst = c.RouterAddress
+	}
+
+	// Query the routing table
+	value, found := ledger.GetKey(protocol.MachinesLedgerKey, dst)
+	if !found {
+		return fmt.Errorf("'%s' not found in the routing table", dst)
+	}
+	machine := &types.Machine{}
+	value.Unmarshal(machine)
+
+	// Decode the Peer
+	d, err := peer.Decode(machine.PeerID)
+	if err != nil {
+		return errors.Wrap(err, "could not decode peer")
+	}
+
+	// Open a stream
+	stream, err := n.Host().NewStream(ctx, d, protocol.EdgeVPN.ID())
+	if err != nil {
+		return errors.Wrap(err, "could not open stream")
+	}
+
+	stream.Write(frame)
+	stream.Close()
+	return nil
+}
+
+func connectionWorker(
+	p chan ethernet.Frame,
+	c *Config,
+	n types.Node,
+	ip net.IP,
+	wg *sync.WaitGroup,
+	ledger *blockchain.Ledger,
+	ifce *water.Interface) {
+	defer wg.Done()
+	for f := range p {
+		if err := handleFrame(f, c, n, ip, ledger, ifce); err != nil {
+			c.Logger.Debugf("could not handle frame: %s", err.Error())
+		}
+	}
+}
+
+// redirects packets from the interface to the node using the routing table in the blockchain
+func readPackets(ctx context.Context, c *Config, n types.Node, ledger *blockchain.Ledger, ifce *water.Interface) error {
+	ip, _, err := net.ParseCIDR(c.InterfaceAddress)
+	if err != nil {
+		return err
+	}
+
+	wg := new(sync.WaitGroup)
+
+	packets := make(chan ethernet.Frame, c.ChannelBufferSize)
+
+	defer func() {
+		close(packets)
+		wg.Wait()
+	}()
+
+	for i := 0; i < c.Concurrency; i++ {
+		wg.Add(1)
+		go connectionWorker(packets, c, n, ip, wg, ledger, ifce)
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		default:
+			frame, err := getFrame(ifce, c)
+			if err != nil {
+				c.Logger.Errorf("could not get frame '%s'", err.Error())
+				continue
+			}
+
+			packets <- frame
+		}
+	}
+}