2
0
Эх сурвалжийг харах

:seedling: Add peertable and static privatekey to the node

Signed-off-by: mudler <[email protected]>
mudler 2 жил өмнө
parent
commit
ac0e2c1a5f

+ 3 - 0
pkg/node/config.go

@@ -71,6 +71,9 @@ type Config struct {
 	// GenericHub enables generic hub
 	GenericHub bool
 
+	PrivateKey []byte
+	PeerTable  map[string]peer.ID
+
 	Sealer    Sealer
 	PeerGater Gater
 }

+ 25 - 5
pkg/node/connection.go

@@ -55,17 +55,14 @@ func (e *Node) BlockSubnet(cidr string) error {
 
 func (e *Node) genHost(ctx context.Context) (host.Host, error) {
 	var r io.Reader
+	var prvKey crypto.PrivKey
+
 	if e.seed == 0 {
 		r = rand.Reader
 	} else {
 		r = mrand.New(mrand.NewSource(e.seed))
 	}
 
-	prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 4096, r)
-	if err != nil {
-		return nil, err
-	}
-
 	opts := e.config.Options
 
 	cg, err := conngater.NewBasicConnectionGater(nil)
@@ -90,6 +87,17 @@ func (e *Node) genHost(ctx context.Context) (host.Host, error) {
 		}
 	}
 
+	// generate privkey if not specified
+	if len(e.config.PrivateKey) > 0 {
+		prvKey, err = crypto.UnmarshalPrivateKey(e.config.PrivateKey)
+	} else {
+		prvKey, _, err = crypto.GenerateKeyPairWithReader(crypto.Ed25519, 4096, r)
+	}
+
+	if err != nil {
+		return nil, err
+	}
+
 	opts = append(opts, libp2p.ConnectionGater(cg), libp2p.Identity(prvKey))
 
 	addrs := []multiaddr.Multiaddr{}
@@ -147,6 +155,18 @@ func (e *Node) handleEvents(ctx context.Context, inputChannel chan *hub.Message,
 					continue
 				}
 			}
+			if len(e.config.PeerTable) > 0 {
+				found := false
+				for _, p := range e.config.PeerTable {
+					if p.String() == peer.ID(m.SenderID).String() {
+						found = true
+					}
+				}
+				if !found {
+					e.config.Logger.Warnf("gated message from %s - not present in peertable", m.SenderID)
+					continue
+				}
+			}
 
 			c := m.Copy()
 			str, err := e.config.Sealer.Unseal(c.Message, e.sealkey())

+ 18 - 0
pkg/node/options.go

@@ -20,6 +20,7 @@ import (
 
 	"github.com/ipfs/go-log"
 	"github.com/libp2p/go-libp2p"
+	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/mudler/edgevpn/pkg/blockchain"
 	discovery "github.com/mudler/edgevpn/pkg/discovery"
 	"github.com/mudler/edgevpn/pkg/protocol"
@@ -223,6 +224,23 @@ func WithDiscoveryBootstrapPeers(a discovery.AddrList) func(cfg *Config) error {
 	}
 }
 
+func WithPrivKey(b []byte) func(cfg *Config) error {
+	return func(cfg *Config) error {
+		cfg.PrivateKey = b
+		return nil
+	}
+}
+
+func WithStaticPeer(ip string, p peer.ID) func(cfg *Config) error {
+	return func(cfg *Config) error {
+		if cfg.PeerTable == nil {
+			cfg.PeerTable = make(map[string]peer.ID)
+		}
+		cfg.PeerTable[ip] = p
+		return nil
+	}
+}
+
 type OTPConfig struct {
 	Interval int    `yaml:"interval"`
 	Key      string `yaml:"key"`

+ 47 - 17
pkg/vpn/vpn.go

@@ -84,7 +84,7 @@ func VPNNetworkService(p ...Option) node.NetworkService {
 		}
 
 		// Set stream handler during runtime
-		n.Host().SetStreamHandler(protocol.EdgeVPN.ID(), streamHandler(b, ifce, c))
+		n.Host().SetStreamHandler(protocol.EdgeVPN.ID(), streamHandler(b, ifce, c, nc))
 
 		// Announce our IP
 		ip, _, err := net.ParseCIDR(c.InterfaceAddress)
@@ -117,7 +117,7 @@ func VPNNetworkService(p ...Option) node.NetworkService {
 		}
 
 		// read packets from the interface
-		return readPackets(ctx, mgr, c, n, b, ifce)
+		return readPackets(ctx, mgr, c, n, b, ifce, nc)
 	}
 }
 
@@ -127,9 +127,9 @@ func Register(p ...Option) ([]node.Option, error) {
 	return []node.Option{node.WithNetworkService(VPNNetworkService(p...))}, nil
 }
 
-func streamHandler(l *blockchain.Ledger, ifce *water.Interface, c *Config) func(stream network.Stream) {
+func streamHandler(l *blockchain.Ledger, ifce *water.Interface, c *Config, nc node.Config) func(stream network.Stream) {
 	return func(stream network.Stream) {
-		if !l.Exists(protocol.MachinesLedgerKey,
+		if len(nc.PeerTable) == 0 && !l.Exists(protocol.MachinesLedgerKey,
 			func(d blockchain.Data) bool {
 				machine := &types.Machine{}
 				d.Unmarshal(machine)
@@ -138,6 +138,18 @@ func streamHandler(l *blockchain.Ledger, ifce *water.Interface, c *Config) func(
 			stream.Reset()
 			return
 		}
+		if len(nc.PeerTable) > 0 {
+			found := false
+			for _, p := range nc.PeerTable {
+				if p.String() == stream.Conn().RemotePeer().String() {
+					found = true
+				}
+			}
+			if !found {
+				stream.Reset()
+				return
+			}
+		}
 		_, err := io.Copy(ifce.ReadWriteCloser, stream)
 		if err != nil {
 			stream.Reset()
@@ -174,7 +186,7 @@ func getFrame(ifce *water.Interface, c *Config) (ethernet.Frame, error) {
 	return frame, nil
 }
 
-func handleFrame(mgr streamManager, frame ethernet.Frame, c *Config, n *node.Node, ip net.IP, ledger *blockchain.Ledger, ifce *water.Interface) error {
+func handleFrame(mgr streamManager, frame ethernet.Frame, c *Config, n *node.Node, ip net.IP, ledger *blockchain.Ledger, ifce *water.Interface, nc node.Config) error {
 	ctx, cancel := context.WithTimeout(context.Background(), c.Timeout)
 	defer cancel()
 
@@ -200,16 +212,33 @@ func handleFrame(mgr streamManager, frame ethernet.Frame, c *Config, n *node.Nod
 		}
 	}
 
-	// Query the routing table
-	value, found := ledger.GetKey(protocol.MachinesLedgerKey, dst)
-	if !found {
-		return fmt.Errorf("'%s' not found in the routing table", dst)
+	var d peer.ID
+	var err error
+	notFoundErr := fmt.Errorf("'%s' not found in the routing table", dst)
+	if len(nc.PeerTable) > 0 {
+		found := false
+		for ip, p := range nc.PeerTable {
+			if ip == dst {
+				found = true
+				d = peer.ID(p)
+			}
+		}
+		if !found {
+			return notFoundErr
+		}
+	} else {
+		// Query the routing table
+		value, found := ledger.GetKey(protocol.MachinesLedgerKey, dst)
+		if !found {
+			return notFoundErr
+		}
+		machine := &types.Machine{}
+		value.Unmarshal(machine)
+
+		// Decode the Peer
+		d, err = peer.Decode(machine.PeerID)
 	}
-	machine := &types.Machine{}
-	value.Unmarshal(machine)
 
-	// Decode the Peer
-	d, err := peer.Decode(machine.PeerID)
 	if err != nil {
 		return errors.Wrap(err, "could not decode peer")
 	}
@@ -251,17 +280,18 @@ func connectionWorker(
 	ip net.IP,
 	wg *sync.WaitGroup,
 	ledger *blockchain.Ledger,
-	ifce *water.Interface) {
+	ifce *water.Interface,
+	nc node.Config) {
 	defer wg.Done()
 	for f := range p {
-		if err := handleFrame(mgr, f, c, n, ip, ledger, ifce); err != nil {
+		if err := handleFrame(mgr, f, c, n, ip, ledger, ifce, nc); err != nil {
 			c.Logger.Debugf("could not handle frame: %s", err.Error())
 		}
 	}
 }
 
 // redirects packets from the interface to the node using the routing table in the blockchain
-func readPackets(ctx context.Context, mgr streamManager, c *Config, n *node.Node, ledger *blockchain.Ledger, ifce *water.Interface) error {
+func readPackets(ctx context.Context, mgr streamManager, c *Config, n *node.Node, ledger *blockchain.Ledger, ifce *water.Interface, nc node.Config) error {
 	ip, _, err := net.ParseCIDR(c.InterfaceAddress)
 	if err != nil {
 		return err
@@ -278,7 +308,7 @@ func readPackets(ctx context.Context, mgr streamManager, c *Config, n *node.Node
 
 	for i := 0; i < c.Concurrency; i++ {
 		wg.Add(1)
-		go connectionWorker(packets, mgr, c, n, ip, wg, ledger, ifce)
+		go connectionWorker(packets, mgr, c, n, ip, wg, ledger, ifce, nc)
 	}
 
 	for {