Prechádzať zdrojové kódy

:gear: Rotate pubsub subscription

Ettore Di Giacinto 3 rokov pred
rodič
commit
2607dc0f1f

+ 20 - 5
api/api.go

@@ -98,7 +98,11 @@ func API(ctx context.Context, l string, defaultInterval, timeout time.Duration,
 		machines := len(ledger.CurrentData()[protocol.MachinesLedgerKey])
 		users := len(ledger.CurrentData()[protocol.UsersLedgerKey])
 		services := len(ledger.CurrentData()[protocol.ServicesLedgerKey])
-		onChainNodes := len(e.HubRoom.Topic.ListPeers())
+		peers, err := e.MessageHub.ListPeers()
+		if err != nil {
+			return err
+		}
+		onChainNodes := len(peers)
 		p2pPeers := len(e.Host().Network().Peerstore().Peers())
 		nodeID := e.Host().ID().String()
 
@@ -125,7 +129,11 @@ func API(ctx context.Context, l string, defaultInterval, timeout time.Duration,
 			if e.Host().Network().Connectedness(peer.ID(machine.PeerID)) == network.Connected {
 				m.Connected = true
 			}
-			for _, p := range e.HubRoom.Topic.ListPeers() {
+			peers, err := e.MessageHub.ListPeers()
+			if err != nil {
+				return err
+			}
+			for _, p := range peers {
 				if p.String() == machine.PeerID {
 					m.OnChain = true
 				}
@@ -139,7 +147,11 @@ func API(ctx context.Context, l string, defaultInterval, timeout time.Duration,
 
 	ec.GET(NodesURL, func(c echo.Context) error {
 		list := []apiTypes.Peer{}
-		for _, v := range e.HubRoom.Topic.ListPeers() {
+		peers, err := e.MessageHub.ListPeers()
+		if err != nil {
+			return err
+		}
+		for _, v := range peers {
 			list = append(list, apiTypes.Peer{ID: v.String()})
 		}
 
@@ -151,9 +163,12 @@ func API(ctx context.Context, l string, defaultInterval, timeout time.Duration,
 		for _, v := range e.Host().Network().Peerstore().Peers() {
 			list = append(list, apiTypes.Peer{ID: v.String()})
 		}
-		e.HubRoom.Topic.ListPeers()
+		peers, err := e.MessageHub.ListPeers()
+		if err != nil {
+			return err
+		}
 
-		return c.JSON(http.StatusOK, list)
+		return c.JSON(http.StatusOK, peers)
 	})
 
 	ec.GET(UsersURL, func(c echo.Context) error {

+ 6 - 0
cmd/util.go

@@ -206,6 +206,11 @@ var CommonFlags []cli.Flag = []cli.Flag{
 		Usage:  "Enable inline resource limit configuration",
 		EnvVar: "LIMITCONFIG",
 	},
+	&cli.BoolFlag{
+		Name:   "limit-disable",
+		Usage:  "Disable resource limit",
+		EnvVar: "LIMITDISABLE",
+	},
 	&cli.BoolFlag{
 		Name:   "limit-config-dynamic",
 		Usage:  "Enable dynamic resource limit configuration",
@@ -348,6 +353,7 @@ func cliToOpts(c *cli.Context) ([]node.Option, []vpn.Option, *logger.Logger) {
 			HolePunch:      c.Bool("holepunch"),
 		},
 		Limit: config.ResourceLimit{
+			Disable:     c.Bool("limit-disable"),
 			FileLimit:   c.String("limit-file"),
 			Scope:       c.String("limit-scope"),
 			LimitConfig: limitConfig,

+ 30 - 20
pkg/config/config.go

@@ -23,6 +23,7 @@ import (
 	"github.com/ipfs/go-log"
 	"github.com/libp2p/go-libp2p"
 	connmanager "github.com/libp2p/go-libp2p-connmgr"
+	"github.com/libp2p/go-libp2p-core/network"
 	dht "github.com/libp2p/go-libp2p-kad-dht"
 	rcmgr "github.com/libp2p/go-libp2p-resource-manager"
 	"github.com/mudler/edgevpn/pkg/blockchain"
@@ -60,6 +61,7 @@ type ResourceLimit struct {
 	FileLimit   string
 	LimitConfig *node.NetLimitConfig
 	Scope       string
+	Disable     bool
 }
 
 // Ledger is the ledger configuration structure
@@ -210,33 +212,41 @@ func (c Config) ToOpts(l *logger.Logger) ([]node.Option, []vpn.Option, error) {
 
 	libp2pOpts = append(libp2pOpts, libp2p.ConnectionManager(cm))
 
-	var limiter *rcmgr.BasicLimiter
-
-	if c.Limit.FileLimit != "" {
-		limitFile, err := os.Open(c.Limit.FileLimit)
-		if err != nil {
-			return opts, vpnOpts, err
+	if c.Limit.Disable {
+		libp2pOpts = append(libp2pOpts, libp2p.ResourceManager(network.NullResourceManager))
+	} else {
+		var limiter *rcmgr.BasicLimiter
+
+		if c.Limit.FileLimit != "" {
+			limitFile, err := os.Open(c.Limit.FileLimit)
+			if err != nil {
+				return opts, vpnOpts, err
+			}
+			defer limitFile.Close()
+
+			limiter, err = rcmgr.NewDefaultLimiterFromJSON(limitFile)
+			if err != nil {
+				return opts, vpnOpts, err
+			}
+		} else {
+			limiter = rcmgr.NewDefaultLimiter()
 		}
-		defer limitFile.Close()
 
-		limiter, err = rcmgr.NewDefaultLimiterFromJSON(limitFile)
+		libp2p.SetDefaultServiceLimits(limiter)
+
+		rc, err := rcmgr.NewResourceManager(limiter)
 		if err != nil {
-			return opts, vpnOpts, err
+			llger.Fatal("could not create resource manager")
 		}
-	} else {
-		limiter = rcmgr.NewDefaultLimiter()
-	}
-
-	libp2p.SetDefaultServiceLimits(limiter)
 
-	rc, err := rcmgr.NewResourceManager(limiter)
-	if c.Limit.LimitConfig != nil {
-		if err := node.NetSetLimit(rc, c.Limit.Scope, *c.Limit.LimitConfig); err != nil {
-			return opts, vpnOpts, err
+		if c.Limit.LimitConfig != nil {
+			if err := node.NetSetLimit(rc, c.Limit.Scope, *c.Limit.LimitConfig); err != nil {
+				return opts, vpnOpts, err
+			}
 		}
-	}
 
-	libp2pOpts = append(libp2pOpts, libp2p.ResourceManager(rc))
+		libp2pOpts = append(libp2pOpts, libp2p.ResourceManager(rc))
+	}
 
 	if c.Connection.HolePunch {
 		libp2pOpts = append(libp2pOpts, libp2p.EnableHolePunching())

+ 3 - 1
pkg/discovery/dht.go

@@ -20,6 +20,8 @@ import (
 	"sync"
 	"time"
 
+	internalCrypto "github.com/mudler/edgevpn/pkg/crypto"
+
 	"github.com/ipfs/go-log"
 	"github.com/libp2p/go-libp2p"
 	"github.com/libp2p/go-libp2p-core/host"
@@ -59,7 +61,7 @@ func (d *DHT) Rendezvous() string {
 		totp := gotp.NewTOTP(d.OTPKey, d.KeyLength, d.OTPInterval, nil)
 
 		//totp := gotp.NewDefaultTOTP(d.OTPKey)
-		rv := totp.Now()
+		rv := internalCrypto.MD5(totp.Now())
 		d.latestRendezvous = rv
 		return rv
 	}

+ 130 - 0
pkg/hub/hub.go

@@ -0,0 +1,130 @@
+// Copyright © 2022 Ettore Di Giacinto <[email protected]>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, see <http://www.gnu.org/licenses/>.
+
+package hub
+
+import (
+	"context"
+	"errors"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/mudler/edgevpn/pkg/crypto"
+	"github.com/xlzd/gotp"
+
+	"github.com/libp2p/go-libp2p-core/host"
+	"github.com/libp2p/go-libp2p-core/peer"
+	pubsub "github.com/libp2p/go-libp2p-pubsub"
+)
+
+type MessageHub struct {
+	sync.Mutex
+
+	r         *Room
+	otpKey    string
+	maxsize   int
+	keyLength int
+	interval  int
+
+	ctxCancel context.CancelFunc
+	Messages  chan *Message
+}
+
+func NewHub(otp string, maxsize, keyLength, interval int) *MessageHub {
+	return &MessageHub{otpKey: otp, maxsize: maxsize, keyLength: keyLength, interval: interval,
+		Messages: make(chan *Message, RoomBufSize)}
+}
+
+func (m *MessageHub) topicKey() string {
+	totp := gotp.NewTOTP(strings.ToUpper(m.otpKey), m.keyLength, m.interval, nil)
+	return crypto.MD5(totp.Now())
+}
+
+func (m *MessageHub) joinRoom(host host.Host) error {
+	m.Lock()
+	defer m.Unlock()
+
+	if m.ctxCancel != nil {
+		m.ctxCancel()
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	m.ctxCancel = cancel
+
+	// create a new PubSub service using the GossipSub router
+	ps, err := pubsub.NewGossipSub(ctx, host, pubsub.WithMaxMessageSize(m.maxsize))
+	if err != nil {
+		return err
+	}
+
+	// join the "chat" room
+	cr, err := JoinRoom(ctx, ps, host.ID(), m.topicKey(), m.Messages)
+	if err != nil {
+		return err
+	}
+	m.r = cr
+
+	return nil
+}
+
+func (m *MessageHub) Start(ctx context.Context, host host.Host) error {
+	c := make(chan interface{})
+	go func(c context.Context, cc chan interface{}) {
+		k := ""
+		for {
+			select {
+			default:
+				currentKey := m.topicKey()
+				if currentKey != k {
+					k = currentKey
+					cc <- nil
+				}
+				time.Sleep(1 * time.Second)
+			case <-ctx.Done():
+				close(cc)
+				return
+			}
+		}
+	}(ctx, c)
+
+	for range c {
+		m.joinRoom(host)
+	}
+
+	// Close eventual open contexts
+	if m.ctxCancel != nil {
+		m.ctxCancel()
+	}
+	return nil
+}
+
+func (m *MessageHub) PublishMessage(mess *Message) error {
+	m.Lock()
+	defer m.Unlock()
+	if m.r != nil {
+		return m.r.PublishMessage(mess)
+	}
+	return errors.New("no message room available")
+}
+
+func (m *MessageHub) ListPeers() ([]peer.ID, error) {
+	m.Lock()
+	defer m.Unlock()
+	if m.r != nil {
+		return m.r.Topic.ListPeers(), nil
+	}
+	return nil, errors.New("no message room available")
+}

+ 4 - 9
pkg/hub/room.go

@@ -31,9 +31,6 @@ const RoomBufSize = 128
 // can be published to the topic with Room.Publish, and received
 // messages are pushed to the Messages channel.
 type Room struct {
-	// Messages is a channel of messages received from other peers in the chat room
-	Messages chan *Message
-
 	ctx   context.Context
 	ps    *pubsub.PubSub
 	Topic *pubsub.Topic
@@ -45,7 +42,7 @@ type Room struct {
 
 // JoinRoom tries to subscribe to the PubSub topic for the room name, returning
 // a Room on success.
-func JoinRoom(ctx context.Context, ps *pubsub.PubSub, selfID peer.ID, roomName string) (*Room, error) {
+func JoinRoom(ctx context.Context, ps *pubsub.PubSub, selfID peer.ID, roomName string, messageChan chan *Message) (*Room, error) {
 	// join the pubsub topic
 	topic, err := ps.Join(roomName)
 	if err != nil {
@@ -65,11 +62,10 @@ func JoinRoom(ctx context.Context, ps *pubsub.PubSub, selfID peer.ID, roomName s
 		sub:      sub,
 		self:     selfID,
 		roomName: roomName,
-		Messages: make(chan *Message, RoomBufSize),
 	}
 
 	// start reading messages from the subscription in a loop
-	go cr.readLoop()
+	go cr.readLoop(messageChan)
 	return cr, nil
 }
 
@@ -98,11 +94,10 @@ func (cr *Room) PublishMessage(m *Message) error {
 }
 
 // readLoop pulls messages from the pubsub topic and pushes them onto the Messages channel.
-func (cr *Room) readLoop() {
+func (cr *Room) readLoop(messageChan chan *Message) {
 	for {
 		msg, err := cr.sub.Next(cr.ctx)
 		if err != nil {
-			close(cr.Messages)
 			return
 		}
 		// only forward messages delivered by others
@@ -115,6 +110,6 @@ func (cr *Room) readLoop() {
 			continue
 		}
 		// send valid messages onto the Messages channel
-		cr.Messages <- cm
+		messageChan <- cm
 	}
 }

+ 1 - 1
pkg/node/config.go

@@ -35,7 +35,7 @@ type Config struct {
 	// ExchangeKey is a Symmetric key used to seal the messages
 	ExchangeKey string
 
-	// RoomName is the gossip room where all peers are subscribed to
+	// RoomName is the OTP token gossip room where all peers are subscribed to
 	RoomName string
 
 	// ListenAddresses is the discovery peer initial bootstrap addresses

+ 7 - 5
pkg/node/connection.go

@@ -22,6 +22,8 @@ import (
 	mrand "math/rand"
 	"net"
 
+	internalCrypto "github.com/mudler/edgevpn/pkg/crypto"
+
 	"github.com/libp2p/go-libp2p"
 	"github.com/libp2p/go-libp2p-core/crypto"
 	"github.com/libp2p/go-libp2p-core/host"
@@ -115,7 +117,7 @@ func (e *Node) genHost(ctx context.Context) (host.Host, error) {
 }
 
 func (e *Node) sealkey() string {
-	return gotp.NewTOTP(e.config.ExchangeKey, e.config.SealKeyLength, e.config.SealKeyInterval, nil).Now()
+	return internalCrypto.MD5(gotp.NewTOTP(e.config.ExchangeKey, e.config.SealKeyLength, e.config.SealKeyInterval, nil).Now())
 }
 
 func (e *Node) handleEvents(ctx context.Context) {
@@ -128,18 +130,18 @@ func (e *Node) handleEvents(ctx context.Context) {
 			c := m.Copy()
 			str, err := e.config.Sealer.Seal(c.Message, e.sealkey())
 			if err != nil {
-				e.config.Logger.Warn(err.Error())
+				e.config.Logger.Warnf("%w from %s", err.Error(), c.SenderID)
 			}
 			c.Message = str
 			e.handleOutgoingMessage(c)
-		case m := <-e.HubRoom.Messages:
+		case m := <-e.MessageHub.Messages:
 			if m == nil {
 				continue
 			}
 			c := m.Copy()
 			str, err := e.config.Sealer.Unseal(c.Message, e.sealkey())
 			if err != nil {
-				e.config.Logger.Warn(err.Error())
+				e.config.Logger.Warnf("%w from %s", err.Error(), c.SenderID)
 			}
 			c.Message = str
 			e.handleReceivedMessage(c)
@@ -158,7 +160,7 @@ func (e *Node) handleReceivedMessage(m *hub.Message) {
 }
 
 func (e *Node) handleOutgoingMessage(m *hub.Message) {
-	err := e.HubRoom.PublishMessage(m)
+	err := e.MessageHub.PublishMessage(m)
 	if err != nil {
 		e.config.Logger.Warnf("publish error: %s", err)
 	}

+ 5 - 16
pkg/node/node.go

@@ -28,15 +28,15 @@ import (
 	"github.com/mudler/edgevpn/pkg/crypto"
 	protocol "github.com/mudler/edgevpn/pkg/protocol"
 
-	pubsub "github.com/libp2p/go-libp2p-pubsub"
 	"github.com/mudler/edgevpn/pkg/blockchain"
 	hub "github.com/mudler/edgevpn/pkg/hub"
 	"github.com/mudler/edgevpn/pkg/logger"
 )
 
 type Node struct {
-	config  Config
-	HubRoom *hub.Room
+	config     Config
+	MessageHub *hub.MessageHub
+	//HubRoom *hub.Room
 	inputCh chan *hub.Message
 	seed    int64
 	host    host.Host
@@ -159,19 +159,7 @@ func (e *Node) startNetwork(ctx context.Context) error {
 	e.config.Logger.Info("Node ID:", host.ID())
 	e.config.Logger.Info("Node Addresses:", host.Addrs())
 
-	// create a new PubSub service using the GossipSub router
-	ps, err := pubsub.NewGossipSub(ctx, host, pubsub.WithMaxMessageSize(e.config.MaxMessageSize))
-	if err != nil {
-		return err
-	}
-
-	// join the "chat" room
-	cr, err := hub.JoinRoom(ctx, ps, host.ID(), e.config.RoomName)
-	if err != nil {
-		return err
-	}
-
-	e.HubRoom = cr
+	e.MessageHub = hub.NewHub(e.config.RoomName, e.config.MaxMessageSize, e.config.SealKeyLength, e.config.SealKeyInterval)
 
 	for _, sd := range e.config.ServiceDiscovery {
 		if err := sd.Run(e.config.Logger, ctx, host); err != nil {
@@ -180,6 +168,7 @@ func (e *Node) startNetwork(ctx context.Context) error {
 	}
 
 	go e.handleEvents(ctx)
+	go e.MessageHub.Start(ctx, host)
 
 	e.config.Logger.Debug("Network started")
 	return nil

+ 1 - 1
pkg/node/options.go

@@ -273,7 +273,7 @@ func GenerateNewConnectionData(i ...int) *YAMLConnectionConfig {
 
 	return &YAMLConnectionConfig{
 		MaxMessageSize: maxMessSize,
-		RoomName:       utils.RandStringRunes(defaultKeyLength),
+		RoomName:       gotp.RandomSecret(defaultKeyLength),
 		Rendezvous:     utils.RandStringRunes(defaultKeyLength),
 		MDNS:           utils.RandStringRunes(defaultKeyLength),
 		OTP: OTP{