Răsfoiți Sursa

:gear: Lazy service registration

Services needs to be registered to the node so they get lazily called
after the node is initialized. This ensures that the node ID is computed
by then and it is safe for concurrent access.

Also make the memory store thread safe
Ettore Di Giacinto 3 ani în urmă
părinte
comite
68be143556

+ 9 - 0
pkg/blockchain/store_memory.go

@@ -15,18 +15,27 @@
 
 package blockchain
 
+import "sync"
+
 type MemoryStore struct {
+	sync.Mutex
 	block *Block
 }
 
 func (m *MemoryStore) Add(b Block) {
+	m.Lock()
 	m.block = &b
+	m.Unlock()
 }
 
 func (m *MemoryStore) Len() int {
+	m.Lock()
+	defer m.Unlock()
 	return m.block.Index
 }
 
 func (m *MemoryStore) Last() Block {
+	m.Lock()
+	defer m.Unlock()
 	return *m.block
 }

+ 6 - 6
pkg/node/connection.go

@@ -99,17 +99,17 @@ func (e *Node) handleEvents(ctx context.Context) {
 	for {
 		select {
 		case m := <-e.inputCh:
-			m = m.Copy()
-			if err := m.Seal(e.sealkey()); err != nil {
+			c := m.Copy()
+			if err := c.Seal(e.sealkey()); err != nil {
 				e.config.Logger.Warn(err.Error())
 			}
-			e.handleOutgoingMessage(m)
+			e.handleOutgoingMessage(c)
 		case m := <-e.HubRoom.Messages:
-			m = m.Copy()
-			if err := m.Unseal(e.sealkey()); err != nil {
+			c := m.Copy()
+			if err := c.Unseal(e.sealkey()); err != nil {
 				e.config.Logger.Warn(err.Error())
 			}
-			e.handleReceivedMessage(m)
+			e.handleReceivedMessage(c)
 		case <-ctx.Done():
 			return
 		}

+ 6 - 0
pkg/node/node.go

@@ -75,6 +75,12 @@ func (e *Node) AddStreamHandler(id protocol.Protocol, s types.StreamHandler) {
 	e.config.StreamHandlers[id.ID()] = s
 }
 
+// AddNetworkService register a network service to the node.
+// Note: must be called before Start().
+func (e *Node) AddNetworkService(n NetworkService) {
+	e.config.NetworkServices = append(e.config.NetworkServices, n)
+}
+
 // Ledger return the ledger which uses the node
 // connection to broadcast messages
 func (e *Node) Ledger() (*blockchain.Ledger, error) {

+ 32 - 25
pkg/services/files.go

@@ -22,6 +22,7 @@ import (
 	"time"
 
 	"github.com/ipfs/go-log"
+	"github.com/mudler/edgevpn/pkg/node"
 	"github.com/mudler/edgevpn/pkg/protocol"
 
 	"github.com/libp2p/go-libp2p-core/network"
@@ -31,7 +32,9 @@ import (
 	"github.com/pkg/errors"
 )
 
-func ShareFile(ctx context.Context, ledger *blockchain.Ledger, node types.Node, l log.StandardLogger, announcetime time.Duration, fileID, filepath string) error {
+// ShareFile shares a file to the p2p network.
+// meant to be called before a node is started with Start()
+func ShareFile(ctx context.Context, ledger *blockchain.Ledger, n *node.Node, l log.StandardLogger, announcetime time.Duration, fileID, filepath string) error {
 	_, err := os.Stat(filepath)
 	if err != nil {
 		return err
@@ -39,27 +42,30 @@ func ShareFile(ctx context.Context, ledger *blockchain.Ledger, node types.Node,
 
 	l.Infof("Serving '%s' as '%s'", filepath, fileID)
 
-	// By announcing periodically our service to the blockchain
-	ledger.Announce(
-		ctx,
-		announcetime,
-		func() {
-			// Retrieve current ID for ip in the blockchain
-			existingValue, found := ledger.GetKey(protocol.FilesLedgerKey, fileID)
-			service := &types.Service{}
-			existingValue.Unmarshal(service)
-			// If mismatch, update the blockchain
-			if !found || service.PeerID != node.Host().ID().String() {
-				updatedMap := map[string]interface{}{}
-				updatedMap[fileID] = types.File{PeerID: node.Host().ID().String(), Name: fileID}
-				ledger.Add(protocol.FilesLedgerKey, updatedMap)
-			}
-		},
-	)
+	n.AddNetworkService(func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
+		// By announcing periodically our service to the blockchain
+		ledger.Announce(
+			ctx,
+			announcetime,
+			func() {
+				// Retrieve current ID for ip in the blockchain
+				existingValue, found := ledger.GetKey(protocol.FilesLedgerKey, fileID)
+				service := &types.Service{}
+				existingValue.Unmarshal(service)
+				// If mismatch, update the blockchain
+				if !found || service.PeerID != n.Host().ID().String() {
+					updatedMap := map[string]interface{}{}
+					updatedMap[fileID] = types.File{PeerID: n.Host().ID().String(), Name: fileID}
+					ledger.Add(protocol.FilesLedgerKey, updatedMap)
+				}
+			},
+		)
+		return nil
+	})
 
 	// 2) Set a stream handler
 	//    which connect to the given address/Port and Send what we receive from the Stream.
-	node.AddStreamHandler(protocol.FileProtocol, func(stream network.Stream) {
+	n.AddStreamHandler(protocol.FileProtocol, func(stream network.Stream) {
 		go func() {
 			l.Infof("(file %s) Received connection from %s", fileID, stream.Conn().RemotePeer().String())
 
@@ -88,19 +94,20 @@ func ShareFile(ctx context.Context, ledger *blockchain.Ledger, node types.Node,
 	return nil
 }
 
-func ReceiveFile(ctx context.Context, ledger *blockchain.Ledger, node types.Node, l log.StandardLogger, announcetime time.Duration, fileID string, path string) error {
+func ReceiveFile(ctx context.Context, ledger *blockchain.Ledger, n *node.Node, l log.StandardLogger, announcetime time.Duration, fileID string, path string) error {
 	// Announce ourselves so nodes accepts our connection
+
 	ledger.Announce(
 		ctx,
 		announcetime,
 		func() {
 			// Retrieve current ID for ip in the blockchain
-			_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
+			_, found := ledger.GetKey(protocol.UsersLedgerKey, n.Host().ID().String())
 			// If mismatch, update the blockchain
 			if !found {
 				updatedMap := map[string]interface{}{}
-				updatedMap[node.Host().ID().String()] = &types.User{
-					PeerID:    node.Host().ID().String(),
+				updatedMap[n.Host().ID().String()] = &types.User{
+					PeerID:    n.Host().ID().String(),
 					Timestamp: time.Now().String(),
 				}
 				ledger.Add(protocol.UsersLedgerKey, updatedMap)
@@ -117,7 +124,7 @@ func ReceiveFile(ctx context.Context, ledger *blockchain.Ledger, node types.Node
 
 			l.Debug("Attempting to find file in the blockchain")
 
-			_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
+			_, found := ledger.GetKey(protocol.UsersLedgerKey, n.Host().ID().String())
 			if !found {
 				continue
 			}
@@ -146,7 +153,7 @@ func ReceiveFile(ctx context.Context, ledger *blockchain.Ledger, node types.Node
 				}
 
 				// Open a stream
-				stream, err := node.Host().NewStream(context.Background(), d, protocol.FileProtocol.ID())
+				stream, err := n.Host().NewStream(context.Background(), d, protocol.FileProtocol.ID())
 				if err != nil {
 					l.Debugf("failed to dial %s, retrying in 5 seconds", d)
 					continue

+ 26 - 19
pkg/services/services.go

@@ -25,38 +25,44 @@ import (
 	"github.com/libp2p/go-libp2p-core/network"
 	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/mudler/edgevpn/pkg/blockchain"
+	"github.com/mudler/edgevpn/pkg/node"
 	protocol "github.com/mudler/edgevpn/pkg/protocol"
 	"github.com/pkg/errors"
 
 	"github.com/mudler/edgevpn/pkg/types"
 )
 
-func ExposeService(ctx context.Context, ledger *blockchain.Ledger, node types.Node, l log.StandardLogger, announcetime time.Duration, serviceID, dstaddress string) {
+// ExposeService exposes a service to the p2p network.
+// meant to be called before a node is started with Start()
+func ExposeService(ctx context.Context, ledger *blockchain.Ledger, n *node.Node, l log.StandardLogger, announcetime time.Duration, serviceID, dstaddress string) {
 
 	l.Infof("Exposing service '%s' (%s)", serviceID, dstaddress)
 
 	// 1) Register the ServiceID <-> PeerID Association
 	// By announcing periodically our service to the blockchain
-	ledger.Announce(
-		ctx,
-		announcetime,
-		func() {
-			// Retrieve current ID for ip in the blockchain
-			existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, serviceID)
-			service := &types.Service{}
-			existingValue.Unmarshal(service)
-			// If mismatch, update the blockchain
-			if !found || service.PeerID != node.Host().ID().String() {
-				updatedMap := map[string]interface{}{}
-				updatedMap[serviceID] = types.Service{PeerID: node.Host().ID().String(), Name: serviceID}
-				ledger.Add(protocol.ServicesLedgerKey, updatedMap)
-			}
-		},
-	)
+	n.AddNetworkService(func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
+		ledger.Announce(
+			ctx,
+			announcetime,
+			func() {
+				// Retrieve current ID for ip in the blockchain
+				existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, serviceID)
+				service := &types.Service{}
+				existingValue.Unmarshal(service)
+				// If mismatch, update the blockchain
+				if !found || service.PeerID != n.Host().ID().String() {
+					updatedMap := map[string]interface{}{}
+					updatedMap[serviceID] = types.Service{PeerID: n.Host().ID().String(), Name: serviceID}
+					ledger.Add(protocol.ServicesLedgerKey, updatedMap)
+				}
+			},
+		)
+		return nil
+	})
 
 	// 2) Set a stream handler
 	//    which connect to the given address/Port and Send what we receive from the Stream.
-	node.AddStreamHandler(protocol.ServiceProtocol, func(stream network.Stream) {
+	n.AddStreamHandler(protocol.ServiceProtocol, func(stream network.Stream) {
 		go func() {
 			l.Infof("(service %s) Received connection from %s", serviceID, stream.Conn().RemotePeer().String())
 
@@ -89,7 +95,7 @@ func ExposeService(ctx context.Context, ledger *blockchain.Ledger, node types.No
 	})
 }
 
-func ConnectToService(ctx context.Context, ledger *blockchain.Ledger, node types.Node, ll log.StandardLogger, announcetime time.Duration, serviceID string, srcaddr string) error {
+func ConnectToService(ctx context.Context, ledger *blockchain.Ledger, node *node.Node, ll log.StandardLogger, announcetime time.Duration, serviceID string, srcaddr string) error {
 
 	// Open local port for listening
 	l, err := net.Listen("tcp", srcaddr)
@@ -116,6 +122,7 @@ func ConnectToService(ctx context.Context, ledger *blockchain.Ledger, node types
 			}
 		},
 	)
+
 	defer l.Close()
 	for {
 		select {

+ 0 - 29
pkg/types/node.go

@@ -1,29 +0,0 @@
-// Copyright © 2021 Ettore Di Giacinto <[email protected]>
-//
-// This program is free software; you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation; either version 2 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License along
-// with this program; if not, see <http://www.gnu.org/licenses/>.
-
-package types
-
-import (
-	"context"
-
-	"github.com/libp2p/go-libp2p-core/host"
-	"github.com/mudler/edgevpn/pkg/protocol"
-)
-
-type Node interface {
-	AddStreamHandler(protocol.Protocol, StreamHandler)
-	Start(context.Context) error
-	Host() host.Host
-}

+ 6 - 5
pkg/vpn/vpn.go

@@ -32,6 +32,7 @@ import (
 	"github.com/mudler/edgevpn/internal"
 	"github.com/mudler/edgevpn/pkg/blockchain"
 	"github.com/mudler/edgevpn/pkg/logger"
+	"github.com/mudler/edgevpn/pkg/node"
 	"github.com/mudler/edgevpn/pkg/protocol"
 	"github.com/mudler/edgevpn/pkg/types"
 	"github.com/pkg/errors"
@@ -42,7 +43,7 @@ import (
 
 // Start the node and the vpn. Returns an error in case of failure
 // When starting the vpn, there is no need to start the node
-func Start(ctx context.Context, ledger *blockchain.Ledger, n types.Node, p ...Option) error {
+func Start(ctx context.Context, ledger *blockchain.Ledger, n *node.Node, p ...Option) error {
 	c := &Config{
 		Concurrency:        1,
 		LedgerAnnounceTime: 5 * time.Second,
@@ -118,7 +119,7 @@ func streamHandler(l *blockchain.Ledger, ifce *water.Interface) func(stream netw
 	}
 }
 
-func newBlockChainData(n types.Node, address string) types.Machine {
+func newBlockChainData(n *node.Node, address string) types.Machine {
 	hostname, _ := os.Hostname()
 
 	return types.Machine{
@@ -144,7 +145,7 @@ func getFrame(ifce *water.Interface, c *Config) (ethernet.Frame, error) {
 	return frame, nil
 }
 
-func handleFrame(frame ethernet.Frame, c *Config, n types.Node, ip net.IP, ledger *blockchain.Ledger, ifce *water.Interface) error {
+func handleFrame(frame ethernet.Frame, c *Config, n *node.Node, ip net.IP, ledger *blockchain.Ledger, ifce *water.Interface) error {
 	ctx, cancel := context.WithTimeout(context.Background(), c.Timeout)
 	defer cancel()
 
@@ -186,7 +187,7 @@ func handleFrame(frame ethernet.Frame, c *Config, n types.Node, ip net.IP, ledge
 func connectionWorker(
 	p chan ethernet.Frame,
 	c *Config,
-	n types.Node,
+	n *node.Node,
 	ip net.IP,
 	wg *sync.WaitGroup,
 	ledger *blockchain.Ledger,
@@ -200,7 +201,7 @@ func connectionWorker(
 }
 
 // redirects packets from the interface to the node using the routing table in the blockchain
-func readPackets(ctx context.Context, c *Config, n types.Node, ledger *blockchain.Ledger, ifce *water.Interface) error {
+func readPackets(ctx context.Context, c *Config, n *node.Node, ledger *blockchain.Ledger, ifce *water.Interface) error {
 	ip, _, err := net.ParseCIDR(c.InterfaceAddress)
 	if err != nil {
 		return err