浏览代码

:gear: Add additional channel for extra-chain messages

mudler 3 年之前
父节点
当前提交
d3ea147241
共有 4 个文件被更改,包括 74 次插入27 次删除
  1. 4 1
      pkg/node/config.go
  2. 16 17
      pkg/node/connection.go
  3. 37 9
      pkg/node/node.go
  4. 17 0
      pkg/node/options.go

+ 4 - 1
pkg/node/config.go

@@ -46,7 +46,7 @@ type Config struct {
 	Insecure bool
 
 	// Handlers are a list of handlers subscribed to messages received by the vpn interface
-	Handlers []Handler
+	Handlers, GenericChannelHandler []Handler
 
 	MaxMessageSize  int
 	SealKeyInterval int
@@ -70,6 +70,9 @@ type Config struct {
 
 	Whitelist, Blacklist []string
 
+	// GenericHub enables generic hub
+	GenericHub bool
+
 	Sealer    Sealer
 	PeerGater Gater
 }

+ 16 - 17
pkg/node/connection.go

@@ -120,10 +120,10 @@ func (e *Node) sealkey() string {
 	return internalCrypto.MD5(gotp.NewTOTP(e.config.ExchangeKey, e.config.SealKeyLength, e.config.SealKeyInterval, nil).Now())
 }
 
-func (e *Node) handleEvents(ctx context.Context) {
+func (e *Node) handleEvents(ctx context.Context, inputChannel chan *hub.Message, h *hub.MessageHub, handlers []Handler, peerGater bool) {
 	for {
 		select {
-		case m := <-e.inputCh:
+		case m := <-inputChannel:
 			if m == nil {
 				continue
 			}
@@ -133,15 +133,21 @@ func (e *Node) handleEvents(ctx context.Context) {
 				e.config.Logger.Warnf("%w from %s", err.Error(), c.SenderID)
 			}
 			c.Message = str
-			e.handleOutgoingMessage(c)
-		case m := <-e.MessageHub.Messages:
+
+			if err := h.PublishMessage(c); err != nil {
+				e.config.Logger.Warnf("publish error: %s", err)
+			}
+
+		case m := <-h.Messages:
 			if m == nil {
 				continue
 			}
 
-			if e.config.PeerGater != nil && e.config.PeerGater.Gate(e, peer.ID(m.SenderID)) {
-				e.config.Logger.Warnf("gated message from %s", m.SenderID)
-				continue
+			if peerGater {
+				if e.config.PeerGater != nil && e.config.PeerGater.Gate(e, peer.ID(m.SenderID)) {
+					e.config.Logger.Warnf("gated message from %s", m.SenderID)
+					continue
+				}
 			}
 
 			c := m.Copy()
@@ -150,24 +156,17 @@ func (e *Node) handleEvents(ctx context.Context) {
 				e.config.Logger.Warnf("%w from %s", err.Error(), c.SenderID)
 			}
 			c.Message = str
-			e.handleReceivedMessage(c)
+			e.handleReceivedMessage(c, handlers)
 		case <-ctx.Done():
 			return
 		}
 	}
 }
 
-func (e *Node) handleReceivedMessage(m *hub.Message) {
-	for _, h := range e.config.Handlers {
+func (e *Node) handleReceivedMessage(m *hub.Message, handlers []Handler) {
+	for _, h := range handlers {
 		if err := h(m); err != nil {
 			e.config.Logger.Warnf("handler error: %s", err)
 		}
 	}
 }
-
-func (e *Node) handleOutgoingMessage(m *hub.Message) {
-	err := e.MessageHub.PublishMessage(m)
-	if err != nil {
-		e.config.Logger.Warnf("publish error: %s", err)
-	}
-}

+ 37 - 9
pkg/node/node.go

@@ -17,6 +17,7 @@ package node
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"time"
 
@@ -37,15 +38,21 @@ import (
 type Node struct {
 	config     Config
 	MessageHub *hub.MessageHub
+	GenericHub *hub.MessageHub
+
 	//HubRoom *hub.Room
-	inputCh chan *hub.Message
-	seed    int64
-	host    host.Host
-	cg      *conngater.BasicConnectionGater
-	ledger  *blockchain.Ledger
+	inputCh      chan *hub.Message
+	genericHubCh chan *hub.Message
+
+	seed   int64
+	host   host.Host
+	cg     *conngater.BasicConnectionGater
+	ledger *blockchain.Ledger
 	sync.Mutex
 }
 
+const defaultChanSize = 3000
+
 var defaultLibp2pOptions = []libp2p.Option{
 	libp2p.EnableNATService(),
 	libp2p.NATPortMap(),
@@ -70,9 +77,10 @@ func New(p ...Option) (*Node, error) {
 	}
 
 	return &Node{
-		config:  *c,
-		inputCh: make(chan *hub.Message, 3000),
-		seed:    0,
+		config:       *c,
+		inputCh:      make(chan *hub.Message, defaultChanSize),
+		genericHubCh: make(chan *hub.Message, defaultChanSize),
+		seed:         0,
 	}, nil
 }
 
@@ -174,9 +182,29 @@ func (e *Node) startNetwork(ctx context.Context) error {
 		}
 	}
 
-	go e.handleEvents(ctx)
+	go e.handleEvents(ctx, e.inputCh, e.MessageHub, e.config.Handlers, true)
 	go e.MessageHub.Start(ctx, host)
 
+	// If generic hub is enabled one is created separately with a set of generic channel handlers associated with.
+	// note peergating is disabled in order to freely exchange messages that can be used for authentication or for other public means.
+	if e.config.GenericHub {
+		e.GenericHub = hub.NewHub(fmt.Sprintf("%s-generic", e.config.RoomName), e.config.MaxMessageSize, e.config.SealKeyLength, e.config.SealKeyInterval)
+		go e.handleEvents(ctx, e.genericHubCh, e.GenericHub, e.config.GenericChannelHandler, false)
+		go e.GenericHub.Start(ctx, host)
+	}
+
 	e.config.Logger.Debug("Network started")
 	return nil
 }
+
+// PublishMessage publishes a message to the generic channel (if enabled)
+// See GenericChannelHandlers(..) to attach handlers to receive messages from this channel.
+func (e *Node) PublishMessage(m *hub.Message) error {
+	if !e.config.GenericHub {
+		return fmt.Errorf("generic hub disabled")
+	}
+
+	e.genericHubCh <- m
+
+	return nil
+}

+ 17 - 0
pkg/node/options.go

@@ -96,6 +96,14 @@ func Handlers(h ...Handler) func(cfg *Config) error {
 	}
 }
 
+// GenericChannelHandlers adds a handler to the list that is called on each received message in the generic channel (not the one allocated for the blockchain)
+func GenericChannelHandlers(h ...Handler) func(cfg *Config) error {
+	return func(cfg *Config) error {
+		cfg.GenericChannelHandler = append(cfg.GenericChannelHandler, h...)
+		return nil
+	}
+}
+
 // WithStreamHandler adds a handler to the list that is called on each received message
 func WithStreamHandler(id protocol.Protocol, h StreamHandler) func(cfg *Config) error {
 	return func(cfg *Config) error {
@@ -112,6 +120,15 @@ func DiscoveryService(s ...ServiceDiscovery) func(cfg *Config) error {
 	}
 }
 
+// EnableGenericHub enables an additional generic hub between peers.
+// This can be used to exchange messages between peers that are not related to any
+// blockchain event. For instance, messages could be used for authentication, or for other sort
+// of application.
+var EnableGenericHub = func(cfg *Config) error {
+	cfg.GenericHub = true
+	return nil
+}
+
 func ListenAddresses(ss ...string) func(cfg *Config) error {
 	return func(cfg *Config) error {
 		for _, s := range ss {