Ettore Di Giacinto 3 vuotta sitten
vanhempi
commit
5efff8e81d
3 muutettua tiedostoa jossa 24 lisäystä ja 10 poistoa
  1. 2 2
      pkg/blockchain/ledger.go
  2. 6 0
      pkg/edgevpn/config.go
  3. 16 8
      pkg/edgevpn/edgevpn.go

+ 2 - 2
pkg/blockchain/ledger.go

@@ -85,11 +85,11 @@ func (l *Ledger) Update(h *hub.Message) (err error) {
 	return
 }
 
-// Persist an async data to the blockchain.
+// Announce keeps updating async data to the blockchain.
 // Sends a broadcast at the specified interval
 // by making sure the async retrieved value is written to the
 // blockchain
-func (l *Ledger) Persist(ctx context.Context, t time.Duration, key string, async func() string) {
+func (l *Ledger) Announce(ctx context.Context, t time.Duration, key string, async func() string) {
 	go func() {
 		t := time.NewTicker(t)
 		defer t.Stop()

+ 6 - 0
pkg/edgevpn/config.go

@@ -2,9 +2,12 @@ package edgevpn
 
 import (
 	"context"
+	"time"
 
 	"github.com/libp2p/go-libp2p"
 	"github.com/libp2p/go-libp2p-core/host"
+	"github.com/libp2p/go-libp2p-core/network"
+	"github.com/libp2p/go-libp2p-core/protocol"
 	discovery "github.com/mudler/edgevpn/pkg/discovery"
 	hub "github.com/mudler/edgevpn/pkg/hub"
 	"github.com/songgao/water"
@@ -46,7 +49,10 @@ type Config struct {
 
 	// Handle is a handle consumed by HumanInterfaces to handle received messages
 	Handle                     func(bool, *hub.Message)
+	StreamHandlers             map[protocol.ID]func(stream network.Stream)
 	AdditionalOptions, Options []libp2p.Option
+
+	LedgerSyncronizationTime, LedgerAnnounceTime time.Duration
 }
 
 type Handler func(*hub.Message) error

+ 16 - 8
pkg/edgevpn/edgevpn.go

@@ -28,7 +28,11 @@ type EdgeVPN struct {
 }
 
 func New(p ...Option) *EdgeVPN {
-	c := Config{}
+	c := Config{
+		StreamHandlers:           make(map[protocol.ID]func(stream network.Stream)),
+		LedgerAnnounceTime:       5 * time.Second,
+		LedgerSyncronizationTime: 5 * time.Second,
+	}
 	c.Apply(p...)
 
 	return &EdgeVPN{
@@ -74,11 +78,13 @@ func (e *EdgeVPN) Start() error {
 	// Set the handler when we receive messages
 	// The ledger needs to read them and update the internal blockchain
 	e.config.Handlers = append(e.config.Handlers, ledger.Update)
+	// Set the stream handler to get back the packets from the stream to the interface
+	e.config.StreamHandlers[protocol.ID(Protocol)] = streamHandler(ledger, ifce)
 
 	e.config.Logger.Sugar().Info("starting edgevpn background daemon")
 
 	// Startup libp2p network
-	err = e.network(ledger, ifce)
+	err = e.network()
 	if err != nil {
 		return err
 	}
@@ -90,10 +96,10 @@ func (e *EdgeVPN) Start() error {
 	}
 
 	// Updates the blockchain
-	ledger.Syncronizer(context.Background(), 5*time.Second)
-	ledger.Persist(
+	ledger.Syncronizer(context.Background(), e.config.LedgerSyncronizationTime)
+	ledger.Announce(
 		context.Background(),
-		5*time.Second,
+		e.config.LedgerAnnounceTime,
 		ip.String(),
 		func() string { return e.host.ID().String() },
 	)
@@ -126,7 +132,7 @@ func (e *EdgeVPN) MessageWriter(opts ...hub.MessageOption) (*MessageWriter, erro
 	}, nil
 }
 
-func (e *EdgeVPN) streamHandler(ledger *blockchain.Ledger, ifce *water.Interface) func(stream network.Stream) {
+func streamHandler(ledger *blockchain.Ledger, ifce *water.Interface) func(stream network.Stream) {
 	return func(stream network.Stream) {
 		if !ledger.ExistsValue(stream.Conn().RemotePeer().String()) {
 			stream.Reset()
@@ -183,7 +189,7 @@ func (e *EdgeVPN) readPackets(ledger *blockchain.Ledger, ifce *water.Interface)
 	}
 }
 
-func (e *EdgeVPN) network(ledger *blockchain.Ledger, ifce *water.Interface) error {
+func (e *EdgeVPN) network() error {
 	ctx := context.Background()
 	e.config.Logger.Sugar().Info("generating host data")
 
@@ -194,7 +200,9 @@ func (e *EdgeVPN) network(ledger *blockchain.Ledger, ifce *water.Interface) erro
 	}
 	e.host = host
 
-	host.SetStreamHandler(protocol.ID(Protocol), e.streamHandler(ledger, ifce))
+	for pid, strh := range e.config.StreamHandlers {
+		host.SetStreamHandler(pid, strh)
+	}
 
 	e.config.Logger.Sugar().Info("Host created. We are:", host.ID())
 	e.config.Logger.Sugar().Info(host.Addrs())