Jelajahi Sumber

:gear: lazy load services

Ettore Di Giacinto 3 tahun lalu
induk
melakukan
b48b9e5fa7

+ 5 - 5
cmd/file.go

@@ -71,16 +71,16 @@ This is also the ID used to refer when receiving it.`,
 				return err
 			}
 			o, _, ll := cliToOpts(c)
-			e := node.New(o...)
-
-			displayStart(ll)
 
-			ledger, err := e.Ledger()
+			opts, err := services.ShareFile(ll, time.Duration(c.Int("ledger-announce-interval"))*time.Second, name, path)
 			if err != nil {
 				return err
 			}
+			o = append(o, opts...)
 
-			services.ShareFile(context.Background(), ledger, e, ll, time.Duration(c.Int("ledger-announce-interval"))*time.Second, name, path)
+			e := node.New(o...)
+
+			displayStart(ll)
 
 			// Start the node to the network, using our ledger
 			if err := e.Start(context.Background()); err != nil {

+ 6 - 5
cmd/main.go

@@ -86,6 +86,12 @@ func Main() func(c *cli.Context) error {
 		}
 		o, vpnOpts, ll := cliToOpts(c)
 
+		opts, err := vpn.Register(vpnOpts...)
+		if err != nil {
+			return err
+		}
+		o = append(o, opts...)
+
 		e := edgevpn.New(o...)
 
 		displayStart(ll)
@@ -99,11 +105,6 @@ func Main() func(c *cli.Context) error {
 			go api.API(c.String("api-listen"), 5*time.Second, 20*time.Second, ledger)
 		}
 
-		err = vpn.Register(ledger, e, vpnOpts...)
-		if err != nil {
-			return err
-		}
-
 		return e.Start(context.Background())
 	}
 }

+ 3 - 7
cmd/service.go

@@ -70,16 +70,12 @@ For example, '192.168.1.1:80', or '127.0.0.1:22'.`,
 				return err
 			}
 			o, _, ll := cliToOpts(c)
-			e := node.New(o...)
 
-			displayStart(ll)
+			o = append(o, services.RegisterService(ll, time.Duration(c.Int("ledger-announce-interval"))*time.Second, name, address)...)
 
-			ledger, err := e.Ledger()
-			if err != nil {
-				return err
-			}
+			e := node.New(o...)
 
-			services.ExposeService(context.Background(), ledger, e, ll, time.Duration(c.Int("ledger-announce-interval"))*time.Second, name, address)
+			displayStart(ll)
 
 			// Join the node to the network, using our ledger
 			if err := e.Start(context.Background()); err != nil {

+ 5 - 3
pkg/node/config.go

@@ -22,12 +22,12 @@ import (
 	"github.com/ipfs/go-log"
 	"github.com/libp2p/go-libp2p"
 	"github.com/libp2p/go-libp2p-core/host"
+	"github.com/libp2p/go-libp2p-core/network"
 
-	p2pprotocol "github.com/libp2p/go-libp2p-core/protocol"
 	"github.com/mudler/edgevpn/pkg/blockchain"
 	discovery "github.com/mudler/edgevpn/pkg/discovery"
 	hub "github.com/mudler/edgevpn/pkg/hub"
-	"github.com/mudler/edgevpn/pkg/types"
+	protocol "github.com/mudler/edgevpn/pkg/protocol"
 )
 
 // Config is the node configuration
@@ -61,7 +61,7 @@ type Config struct {
 
 	// Handle is a handle consumed by HumanInterfaces to handle received messages
 	Handle                     func(bool, *hub.Message)
-	StreamHandlers             map[p2pprotocol.ID]types.StreamHandler
+	StreamHandlers             map[protocol.Protocol]StreamHandler
 	AdditionalOptions, Options []libp2p.Option
 
 	DiscoveryInterval, LedgerSyncronizationTime, LedgerAnnounceTime time.Duration
@@ -71,6 +71,8 @@ type Config struct {
 // NetworkService is a service running over the network. It takes a context, a node and a ledger
 type NetworkService func(context.Context, Config, *Node, *blockchain.Ledger) error
 
+type StreamHandler func(*Node, *blockchain.Ledger) func(stream network.Stream)
+
 type Handler func(*hub.Message) error
 
 type ServiceDiscovery interface {

+ 7 - 16
pkg/node/node.go

@@ -23,7 +23,6 @@ import (
 	"github.com/libp2p/go-libp2p"
 	"github.com/libp2p/go-libp2p-core/host"
 	"github.com/libp2p/go-libp2p-core/network"
-	p2pprotocol "github.com/libp2p/go-libp2p-core/protocol"
 
 	protocol "github.com/mudler/edgevpn/pkg/protocol"
 
@@ -31,7 +30,6 @@ import (
 	"github.com/mudler/edgevpn/pkg/blockchain"
 	hub "github.com/mudler/edgevpn/pkg/hub"
 	"github.com/mudler/edgevpn/pkg/logger"
-	"github.com/mudler/edgevpn/pkg/types"
 )
 
 type Node struct {
@@ -53,7 +51,7 @@ var defaultLibp2pOptions = []libp2p.Option{
 func New(p ...Option) *Node {
 	c := Config{
 		DiscoveryInterval:        120 * time.Second,
-		StreamHandlers:           make(map[p2pprotocol.ID]types.StreamHandler),
+		StreamHandlers:           make(map[protocol.Protocol]StreamHandler),
 		LedgerAnnounceTime:       5 * time.Second,
 		LedgerSyncronizationTime: 5 * time.Second,
 		SealKeyLength:            12,
@@ -69,18 +67,6 @@ func New(p ...Option) *Node {
 	}
 }
 
-// AddStreamHandler adds a stream handler for the given protocol.
-// Note: must be called before Start().
-func (e *Node) AddStreamHandler(id protocol.Protocol, s types.StreamHandler) {
-	e.config.StreamHandlers[id.ID()] = s
-}
-
-// AddNetworkService register a network service to the node.
-// Note: must be called before Start().
-func (e *Node) AddNetworkService(n NetworkService) {
-	e.config.NetworkServices = append(e.config.NetworkServices, n)
-}
-
 // Ledger return the ledger which uses the node
 // connection to broadcast messages
 func (e *Node) Ledger() (*blockchain.Ledger, error) {
@@ -151,8 +137,13 @@ func (e *Node) startNetwork(ctx context.Context) error {
 	}
 	e.host = host
 
+	ledger, err := e.Ledger()
+	if err != nil {
+		return err
+	}
+
 	for pid, strh := range e.config.StreamHandlers {
-		host.SetStreamHandler(pid, network.StreamHandler(strh))
+		host.SetStreamHandler(pid.ID(), network.StreamHandler(strh(e, ledger)))
 	}
 
 	e.config.Logger.Info("Node ID:", host.ID())

+ 2 - 3
pkg/node/options.go

@@ -22,10 +22,9 @@ import (
 
 	"github.com/ipfs/go-log"
 	"github.com/libp2p/go-libp2p"
-	"github.com/libp2p/go-libp2p-core/protocol"
 	"github.com/mudler/edgevpn/pkg/blockchain"
 	discovery "github.com/mudler/edgevpn/pkg/discovery"
-	"github.com/mudler/edgevpn/pkg/types"
+	"github.com/mudler/edgevpn/pkg/protocol"
 	"github.com/mudler/edgevpn/pkg/utils"
 	"github.com/pkg/errors"
 	"github.com/xlzd/gotp"
@@ -83,7 +82,7 @@ func Handlers(h ...Handler) func(cfg *Config) error {
 }
 
 // WithStreamHandler adds a handler to the list that is called on each received message
-func WithStreamHandler(id protocol.ID, h types.StreamHandler) func(cfg *Config) error {
+func WithStreamHandler(id protocol.Protocol, h StreamHandler) func(cfg *Config) error {
 	return func(cfg *Config) error {
 		cfg.StreamHandlers[id] = h
 		return nil

+ 52 - 52
pkg/services/files.go

@@ -34,64 +34,64 @@ import (
 
 // ShareFile shares a file to the p2p network.
 // meant to be called before a node is started with Start()
-func ShareFile(ctx context.Context, ledger *blockchain.Ledger, n *node.Node, l log.StandardLogger, announcetime time.Duration, fileID, filepath string) error {
+func ShareFile(ll log.StandardLogger, announcetime time.Duration, fileID, filepath string) ([]node.Option, error) {
 	_, err := os.Stat(filepath)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
-	l.Infof("Serving '%s' as '%s'", filepath, fileID)
-
-	n.AddNetworkService(func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
-		// By announcing periodically our service to the blockchain
-		ledger.Announce(
-			ctx,
-			announcetime,
-			func() {
-				// Retrieve current ID for ip in the blockchain
-				existingValue, found := ledger.GetKey(protocol.FilesLedgerKey, fileID)
-				service := &types.Service{}
-				existingValue.Unmarshal(service)
-				// If mismatch, update the blockchain
-				if !found || service.PeerID != n.Host().ID().String() {
-					updatedMap := map[string]interface{}{}
-					updatedMap[fileID] = types.File{PeerID: n.Host().ID().String(), Name: fileID}
-					ledger.Add(protocol.FilesLedgerKey, updatedMap)
-				}
+	ll.Infof("Serving '%s' as '%s'", filepath, fileID)
+	return []node.Option{
+		node.WithNetworkService(
+			func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
+				// By announcing periodically our service to the blockchain
+				b.Announce(
+					ctx,
+					announcetime,
+					func() {
+						// Retrieve current ID for ip in the blockchain
+						existingValue, found := b.GetKey(protocol.FilesLedgerKey, fileID)
+						service := &types.Service{}
+						existingValue.Unmarshal(service)
+						// If mismatch, update the blockchain
+						if !found || service.PeerID != n.Host().ID().String() {
+							updatedMap := map[string]interface{}{}
+							updatedMap[fileID] = types.File{PeerID: n.Host().ID().String(), Name: fileID}
+							b.Add(protocol.FilesLedgerKey, updatedMap)
+						}
+					},
+				)
+				return nil
 			},
-		)
-		return nil
-	})
-
-	// 2) Set a stream handler
-	//    which connect to the given address/Port and Send what we receive from the Stream.
-	n.AddStreamHandler(protocol.FileProtocol, func(stream network.Stream) {
-		go func() {
-			l.Infof("(file %s) Received connection from %s", fileID, stream.Conn().RemotePeer().String())
-
-			// Retrieve current ID for ip in the blockchain
-			_, found := ledger.GetKey(protocol.UsersLedgerKey, stream.Conn().RemotePeer().String())
-			// If mismatch, update the blockchain
-			if !found {
-				l.Info("Reset", stream.Conn().RemotePeer().String(), "Not found in the ledger")
-				stream.Reset()
-				return
-			}
-			f, err := os.Open(filepath)
-			if err != nil {
-				return
-			}
-			io.Copy(stream, f)
-			f.Close()
-
-			stream.Close()
-
-			l.Infof("(file %s) Done handling %s", fileID, stream.Conn().RemotePeer().String())
-
-		}()
-	})
+		),
+		node.WithStreamHandler(protocol.FileProtocol,
+			func(n *node.Node, l *blockchain.Ledger) func(stream network.Stream) {
+				return func(stream network.Stream) {
+					go func() {
+						ll.Infof("(file %s) Received connection from %s", fileID, stream.Conn().RemotePeer().String())
+
+						// Retrieve current ID for ip in the blockchain
+						_, found := l.GetKey(protocol.UsersLedgerKey, stream.Conn().RemotePeer().String())
+						// If mismatch, update the blockchain
+						if !found {
+							ll.Info("Reset", stream.Conn().RemotePeer().String(), "Not found in the ledger")
+							stream.Reset()
+							return
+						}
+						f, err := os.Open(filepath)
+						if err != nil {
+							return
+						}
+						io.Copy(stream, f)
+						f.Close()
+
+						stream.Close()
+
+						ll.Infof("(file %s) Done handling %s", fileID, stream.Conn().RemotePeer().String())
+					}()
+				}
+			})}, nil
 
-	return nil
 }
 
 func ReceiveFile(ctx context.Context, ledger *blockchain.Ledger, n *node.Node, l log.StandardLogger, announcetime time.Duration, fileID string, path string) error {

+ 4 - 4
pkg/services/files_test.go

@@ -37,7 +37,6 @@ var _ = Describe("File services", func() {
 	logg := logger.New(log.LevelError)
 	l := node.Logger(logg)
 
-	e := node.New(node.FromBase64(true, true, token), node.WithStore(&blockchain.MemoryStore{}), l)
 	e2 := node.New(node.FromBase64(true, true, token), node.WithStore(&blockchain.MemoryStore{}), l)
 
 	Context("File sharing", func() {
@@ -47,8 +46,6 @@ var _ = Describe("File services", func() {
 
 			fileUUID := "test"
 
-			le, _ := e.Ledger()
-
 			f, err := ioutil.TempFile("", "test")
 			Expect(err).ToNot(HaveOccurred())
 
@@ -57,9 +54,12 @@ var _ = Describe("File services", func() {
 			ioutil.WriteFile(f.Name(), []byte("testfile"), os.ModePerm)
 
 			// First node expose a file
-			err = ShareFile(ctx, le, e, logg, 1*time.Second, fileUUID, f.Name())
+			opts, err := ShareFile(logg, 1*time.Second, fileUUID, f.Name())
 			Expect(err).ToNot(HaveOccurred())
 
+			opts = append(opts, node.FromBase64(true, true, token), node.WithStore(&blockchain.MemoryStore{}), l)
+			e := node.New(opts...)
+
 			e.Start(ctx)
 			e2.Start(ctx)
 

+ 57 - 59
pkg/services/services.go

@@ -34,69 +34,67 @@ import (
 
 // ExposeService exposes a service to the p2p network.
 // meant to be called before a node is started with Start()
-func ExposeService(ctx context.Context, ledger *blockchain.Ledger, n *node.Node, l log.StandardLogger, announcetime time.Duration, serviceID, dstaddress string) {
-
-	l.Infof("Exposing service '%s' (%s)", serviceID, dstaddress)
-
-	// 1) Register the ServiceID <-> PeerID Association
-	// By announcing periodically our service to the blockchain
-	n.AddNetworkService(func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
-		ledger.Announce(
-			ctx,
-			announcetime,
-			func() {
-				// Retrieve current ID for ip in the blockchain
-				existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, serviceID)
-				service := &types.Service{}
-				existingValue.Unmarshal(service)
-				// If mismatch, update the blockchain
-				if !found || service.PeerID != n.Host().ID().String() {
-					updatedMap := map[string]interface{}{}
-					updatedMap[serviceID] = types.Service{PeerID: n.Host().ID().String(), Name: serviceID}
-					ledger.Add(protocol.ServicesLedgerKey, updatedMap)
-				}
-			},
-		)
-		return nil
-	})
-
-	// 2) Set a stream handler
-	//    which connect to the given address/Port and Send what we receive from the Stream.
-	n.AddStreamHandler(protocol.ServiceProtocol, func(stream network.Stream) {
-		go func() {
-			l.Infof("(service %s) Received connection from %s", serviceID, stream.Conn().RemotePeer().String())
-
-			// Retrieve current ID for ip in the blockchain
-			_, found := ledger.GetKey(protocol.UsersLedgerKey, stream.Conn().RemotePeer().String())
-			// If mismatch, update the blockchain
-			if !found {
-				l.Debugf("Reset '%s': not found in the ledger", stream.Conn().RemotePeer().String())
-				stream.Reset()
-				return
-			}
-
-			l.Infof("Connecting to '%s'", dstaddress)
-			c, err := net.Dial("tcp", dstaddress)
-			if err != nil {
-				l.Debugf("Reset %s: %s", stream.Conn().RemotePeer().String(), err.Error())
-				stream.Reset()
-				return
+func RegisterService(ll log.StandardLogger, announcetime time.Duration, serviceID, dstaddress string) []node.Option {
+	ll.Infof("Exposing service '%s' (%s)", serviceID, dstaddress)
+	return []node.Option{
+		node.WithStreamHandler(protocol.ServiceProtocol, func(n *node.Node, l *blockchain.Ledger) func(stream network.Stream) {
+			return func(stream network.Stream) {
+				go func() {
+					ll.Infof("(service %s) Received connection from %s", serviceID, stream.Conn().RemotePeer().String())
+
+					// Retrieve current ID for ip in the blockchain
+					_, found := l.GetKey(protocol.UsersLedgerKey, stream.Conn().RemotePeer().String())
+					// If mismatch, update the blockchain
+					if !found {
+						ll.Debugf("Reset '%s': not found in the ledger", stream.Conn().RemotePeer().String())
+						stream.Reset()
+						return
+					}
+
+					ll.Infof("Connecting to '%s'", dstaddress)
+					c, err := net.Dial("tcp", dstaddress)
+					if err != nil {
+						ll.Debugf("Reset %s: %s", stream.Conn().RemotePeer().String(), err.Error())
+						stream.Reset()
+						return
+					}
+					closer := make(chan struct{}, 2)
+					go copyStream(closer, stream, c)
+					go copyStream(closer, c, stream)
+					<-closer
+
+					stream.Close()
+					c.Close()
+
+					ll.Infof("(service %s) Handled correctly '%s'", serviceID, stream.Conn().RemotePeer().String())
+				}()
 			}
-			closer := make(chan struct{}, 2)
-			go copyStream(closer, stream, c)
-			go copyStream(closer, c, stream)
-			<-closer
-
-			stream.Close()
-			c.Close()
-
-			l.Infof("(service %s) Handled correctly '%s'", serviceID, stream.Conn().RemotePeer().String())
-		}()
-	})
+		}),
+
+		node.WithNetworkService(
+			func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
+				b.Announce(
+					ctx,
+					announcetime,
+					func() {
+						// Retrieve current ID for ip in the blockchain
+						existingValue, found := b.GetKey(protocol.ServicesLedgerKey, serviceID)
+						service := &types.Service{}
+						existingValue.Unmarshal(service)
+						// If mismatch, update the blockchain
+						if !found || service.PeerID != n.Host().ID().String() {
+							updatedMap := map[string]interface{}{}
+							updatedMap[serviceID] = types.Service{PeerID: n.Host().ID().String(), Name: serviceID}
+							b.Add(protocol.ServicesLedgerKey, updatedMap)
+						}
+					},
+				)
+				return nil
+			},
+		)}
 }
 
 func ConnectToService(ctx context.Context, ledger *blockchain.Ledger, node *node.Node, ll log.StandardLogger, announcetime time.Duration, serviceID string, srcaddr string) error {
-
 	// Open local port for listening
 	l, err := net.Listen("tcp", srcaddr)
 	if err != nil {

+ 3 - 3
pkg/services/services_test.go

@@ -59,7 +59,6 @@ var _ = Describe("Expose services", func() {
 	logg := logger.New(log.LevelFatal)
 	l := node.Logger(logg)
 
-	e := node.New(node.FromBase64(true, true, token), node.WithStore(&blockchain.MemoryStore{}), l)
 	e2 := node.New(node.FromBase64(true, true, token), node.WithStore(&blockchain.MemoryStore{}), l)
 
 	Context("Service sharing", func() {
@@ -69,11 +68,12 @@ var _ = Describe("Expose services", func() {
 
 			serviceUUID := "test"
 
-			le, _ := e.Ledger()
+			opts := RegisterService(logg, 1*time.Second, serviceUUID, "142.250.184.35:80")
+			opts = append(opts, node.FromBase64(true, true, token), node.WithStore(&blockchain.MemoryStore{}), l)
+			e := node.New(opts...)
 
 			// First node expose a service
 			// redirects to google:80
-			ExposeService(ctx, le, e, logg, 1*time.Second, serviceUUID, "142.250.184.35:80")
 
 			e.Start(ctx)
 			e2.Start(ctx)

+ 0 - 20
pkg/types/stream.go

@@ -1,20 +0,0 @@
-// Copyright © 2022 Ettore Di Giacinto <[email protected]>
-//
-// This program is free software; you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation; either version 2 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License along
-// with this program; if not, see <http://www.gnu.org/licenses/>.
-
-package types
-
-import "github.com/libp2p/go-libp2p-core/network"
-
-type StreamHandler func(stream network.Stream)

+ 41 - 35
pkg/vpn/vpn.go

@@ -43,7 +43,7 @@ import (
 
 // Start the node and the vpn. Returns an error in case of failure
 // When starting the vpn, there is no need to start the node
-func Register(ledger *blockchain.Ledger, n *node.Node, p ...Option) error {
+func Register(p ...Option) ([]node.Option, error) {
 	c := &Config{
 		Concurrency:        1,
 		LedgerAnnounceTime: 5 * time.Second,
@@ -54,46 +54,52 @@ func Register(ledger *blockchain.Ledger, n *node.Node, p ...Option) error {
 
 	ifce, err := createInterface(c)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
-	n.AddStreamHandler(protocol.EdgeVPN, streamHandler(ledger, ifce))
-	n.AddNetworkService(func(ctx context.Context, nc node.Config, n *node.Node, b *blockchain.Ledger) error {
-		defer ifce.Close()
-		// Announce our IP
-		ip, _, err := net.ParseCIDR(c.InterfaceAddress)
-		if err != nil {
-			return err
-		}
-
-		ledger.Announce(
-			ctx,
-			c.LedgerAnnounceTime,
-			func() {
-				machine := &types.Machine{}
-				// Retrieve current ID for ip in the blockchain
-				existingValue, found := ledger.GetKey(protocol.MachinesLedgerKey, ip.String())
-				existingValue.Unmarshal(machine)
-
-				// If mismatch, update the blockchain
-				if !found || machine.PeerID != n.Host().ID().String() {
-					updatedMap := map[string]interface{}{}
-					updatedMap[ip.String()] = newBlockChainData(n, ip.String())
-					ledger.Add(protocol.MachinesLedgerKey, updatedMap)
-				}
+	return []node.Option{
+		node.WithStreamHandler(protocol.EdgeVPN,
+			func(n *node.Node, l *blockchain.Ledger) func(stream network.Stream) {
+				return streamHandler(l, ifce)
 			},
-		)
-
-		if c.NetLinkBootstrap {
-			if err := prepareInterface(c); err != nil {
+		),
+		node.WithNetworkService(func(ctx context.Context, nc node.Config, n *node.Node, b *blockchain.Ledger) error {
+			defer ifce.Close()
+			// Announce our IP
+			ip, _, err := net.ParseCIDR(c.InterfaceAddress)
+			if err != nil {
 				return err
 			}
-		}
 
-		// read packets from the interface
-		return readPackets(ctx, c, n, ledger, ifce)
-	})
-	return nil
+			b.Announce(
+				ctx,
+				c.LedgerAnnounceTime,
+				func() {
+					machine := &types.Machine{}
+					// Retrieve current ID for ip in the blockchain
+					existingValue, found := b.GetKey(protocol.MachinesLedgerKey, ip.String())
+					existingValue.Unmarshal(machine)
+
+					// If mismatch, update the blockchain
+					if !found || machine.PeerID != n.Host().ID().String() {
+						updatedMap := map[string]interface{}{}
+						updatedMap[ip.String()] = newBlockChainData(n, ip.String())
+						b.Add(protocol.MachinesLedgerKey, updatedMap)
+					}
+				},
+			)
+
+			if c.NetLinkBootstrap {
+				if err := prepareInterface(c); err != nil {
+					return err
+				}
+			}
+
+			// read packets from the interface
+			return readPackets(ctx, c, n, b, ifce)
+		}),
+	}, nil
+
 }
 
 func streamHandler(l *blockchain.Ledger, ifce *water.Interface) func(stream network.Stream) {