Ver Fonte

:ledger: Allow to store arbitrary data to the blockchain with sub buckets

Ettore Di Giacinto há 3 anos atrás
pai
commit
dadccf07cb

+ 18 - 4
api/api.go

@@ -7,6 +7,8 @@ import (
 
 	"github.com/labstack/echo/v4"
 	"github.com/mudler/edgevpn/pkg/blockchain"
+	"github.com/mudler/edgevpn/pkg/edgevpn"
+	"github.com/mudler/edgevpn/pkg/edgevpn/types"
 )
 
 //go:embed public
@@ -25,10 +27,22 @@ func API(l string, ledger *blockchain.Ledger) error {
 	ec := echo.New()
 	assetHandler := http.FileServer(getFileSystem())
 
-	ec.GET("/api/data", func(c echo.Context) error {
-		list := []blockchain.Data{}
-		for _, v := range ledger.CurrentData() {
-			list = append(list, v)
+	ec.GET("/api/machines", func(c echo.Context) error {
+		list := []*types.Machine{}
+		for _, v := range ledger.CurrentData()[edgevpn.MachinesLedgerKey] {
+			machine := &types.Machine{}
+			v.Unmarshal(machine)
+			list = append(list, machine)
+		}
+		return c.JSON(http.StatusOK, list)
+	})
+
+	ec.GET("/api/services", func(c echo.Context) error {
+		list := []*types.Service{}
+		for _, v := range ledger.CurrentData()[edgevpn.ServicesLedgerKey] {
+			srvc := &types.Service{}
+			v.Unmarshal(srvc)
+			list = append(list, srvc)
 		}
 		return c.JSON(http.StatusOK, list)
 	})

+ 4 - 2
pkg/blockchain/block.go

@@ -7,11 +7,13 @@ import (
 	"time"
 )
 
+type DataString string
+
 // Block represents each 'item' in the blockchain
 type Block struct {
 	Index     int
 	Timestamp string
-	Storage   map[string]Data
+	Storage   map[string]map[string]Data
 	Hash      string
 	PrevHash  string
 }
@@ -50,7 +52,7 @@ func (b Block) Checksum() string {
 }
 
 // create a new block using previous block's hash
-func (oldBlock Block) NewBlock(s map[string]Data) Block {
+func (oldBlock Block) NewBlock(s map[string]map[string]Data) Block {
 	var newBlock Block
 
 	t := time.Now()

+ 8 - 7
pkg/blockchain/data.go

@@ -1,10 +1,11 @@
 package blockchain
 
-type Data struct {
-	PeerID   string
-	Hostname string
-	OS       string
-	Arch     string
-	Address  string
-	Version  string
+import "encoding/json"
+
+type Data string
+
+// Unmarshal the result into the interface. Use it to retrieve data
+// set with SetValue
+func (d Data) Unmarshal(i interface{}) error {
+	return json.Unmarshal([]byte(d), i)
 }

+ 16 - 10
pkg/blockchain/ledger.go

@@ -30,7 +30,7 @@ func New(w io.Writer, maxChainSize int) *Ledger {
 func (l *Ledger) newGenesis() {
 	t := time.Now()
 	genesisBlock := Block{}
-	genesisBlock = Block{0, t.String(), map[string]Data{}, genesisBlock.Checksum(), ""}
+	genesisBlock = Block{0, t.String(), map[string]map[string]Data{}, genesisBlock.Checksum(), ""}
 	l.blockchain = append(l.blockchain, genesisBlock)
 }
 
@@ -112,27 +112,29 @@ func (l *Ledger) lastBlock() Block {
 }
 
 // GetKey retrieve the current key from the blockchain
-func (l *Ledger) GetKey(s string) (value Data, exists bool) {
+func (l *Ledger) GetKey(b, s string) (value Data, exists bool) {
 	l.Lock()
 	defer l.Unlock()
 
 	if len(l.blockchain) > 0 {
 		last := l.lastBlock()
-		value, exists = last.Storage[s]
+		if _, exists = last.Storage[b]; !exists {
+			return
+		}
+		value, exists = last.Storage[b][s]
 		if exists {
 			return
 		}
 	}
-
 	return
 }
 
 // ExistsValue returns true if there is one element with a matching value
-func (l *Ledger) Exists(f func(Data) bool) (exists bool) {
+func (l *Ledger) Exists(b string, f func(Data) bool) (exists bool) {
 	l.Lock()
 	defer l.Unlock()
 	if len(l.blockchain) > 0 {
-		for _, bv := range l.lastBlock().Storage {
+		for _, bv := range l.lastBlock().Storage[b] {
 			if f(bv) {
 				exists = true
 				return
@@ -143,7 +145,7 @@ func (l *Ledger) Exists(f func(Data) bool) (exists bool) {
 	return
 }
 
-func (l *Ledger) CurrentData() map[string]Data {
+func (l *Ledger) CurrentData() map[string]map[string]Data {
 	l.Lock()
 	defer l.Unlock()
 	return l.lastBlock().Storage
@@ -156,17 +158,21 @@ func (l *Ledger) BlockChain() Blockchain {
 }
 
 // Add data to the blockchain
-func (l *Ledger) Add(s map[string]Data) {
+func (l *Ledger) Add(b string, s map[string]interface{}) {
 	l.Lock()
 	current := l.lastBlock().Storage
 	for s, k := range s {
-		current[s] = k
+		if _, exists := current[b]; !exists {
+			current[b] = make(map[string]Data)
+		}
+		dat, _ := json.Marshal(k)
+		current[b][s] = Data(string(dat))
 	}
 	l.Unlock()
 	l.writeData(current)
 }
 
-func (l *Ledger) writeData(s map[string]Data) {
+func (l *Ledger) writeData(s map[string]map[string]Data) {
 	newBlock := l.lastBlock().NewBlock(s)
 
 	if newBlock.IsValid(l.lastBlock()) {

+ 20 - 119
pkg/edgevpn/edgevpn.go

@@ -2,7 +2,6 @@ package edgevpn
 
 import (
 	"context"
-	"fmt"
 	"io"
 	"net"
 	"os"
@@ -16,12 +15,15 @@ import (
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
 	"github.com/mudler/edgevpn/internal"
 	"github.com/mudler/edgevpn/pkg/blockchain"
+	"github.com/mudler/edgevpn/pkg/edgevpn/types"
 	hub "github.com/mudler/edgevpn/pkg/hub"
 	"github.com/songgao/packets/ethernet"
 	"github.com/songgao/water"
 	"golang.org/x/net/ipv4"
 )
 
+const MachinesLedgerKey = "machines"
+
 type EdgeVPN struct {
 	config  Config
 	HubRoom *hub.Room
@@ -48,114 +50,6 @@ func New(p ...Option) *EdgeVPN {
 	}
 }
 
-func (e *EdgeVPN) ExposeService(ledger *blockchain.Ledger, serviceID, dstaddress string) {
-	// 1) Register the ServiceID <-> PeerID Association
-	// By announcing periodically our service to the blockchain
-	ledger.Announce(
-		context.Background(),
-		e.config.LedgerAnnounceTime,
-		func() {
-			key := fmt.Sprintf("service-%s", serviceID)
-			// Retrieve current ID for ip in the blockchain
-			existingValue, found := ledger.GetKey(key)
-			// If mismatch, update the blockchain
-			if !found || existingValue.PeerID != e.host.ID().String() {
-				updatedMap := map[string]blockchain.Data{}
-				updatedMap[key] = blockchain.Data{PeerID: e.host.ID().String()}
-				ledger.Add(updatedMap)
-			}
-		},
-	)
-
-	// 2) Set a stream handler
-	//    which connect to the given address/Port and Send what we receive from the Stream.
-	e.config.StreamHandlers[protocol.ID(ServiceProtocol)] = func(stream network.Stream) {
-		go func() {
-			e.config.Logger.Sugar().Info("Received connection from", stream.Conn().RemotePeer().String())
-			// TODO: Gate connection by PeerID: stream.Conn().RemotePeer().String()
-			// we need a list of known peers
-			e.config.Logger.Sugar().Info("Dialing", dstaddress)
-
-			c, err := net.Dial("tcp", dstaddress)
-			if err != nil {
-				e.config.Logger.Sugar().Info("Reset", stream.Conn().RemotePeer().String(), err.Error())
-				stream.Reset()
-				return
-			}
-			closer := make(chan struct{}, 2)
-			go copyStream(closer, stream, c)
-			go copyStream(closer, c, stream)
-			<-closer
-
-			stream.Close()
-			c.Close()
-
-			e.config.Logger.Sugar().Info("Done", stream.Conn().RemotePeer().String())
-
-		}()
-	}
-
-}
-
-func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
-	_, _ = io.Copy(dst, src)
-	closer <- struct{}{} // connection is closed, send signal to stop proxy
-}
-
-func (e *EdgeVPN) ConnectToService(ledger *blockchain.Ledger, serviceID string, srcaddr string) error {
-	// Open local port for listening
-	l, err := net.Listen("tcp", srcaddr)
-	if err != nil {
-		return err
-	}
-	fmt.Println("Listening on ", srcaddr)
-
-	defer l.Close()
-	for {
-		// Listen for an incoming connection.
-		conn, err := l.Accept()
-		if err != nil {
-			fmt.Println("Error accepting: ", err.Error())
-			os.Exit(1)
-		}
-		e.config.Logger.Sugar().Info("New connection from", l.Addr().String())
-		// Handle connections in a new goroutine, forwarding to the p2p service
-		go func() {
-			key := fmt.Sprintf("service-%s", serviceID)
-			// Retrieve current ID for ip in the blockchain
-			existingValue, found := ledger.GetKey(key)
-			// If mismatch, update the blockchain
-			if !found {
-				e.config.Logger.Sugar().Info("service not found on blockchain")
-				return
-			}
-			// Decode the Peer
-			d, err := peer.Decode(existingValue.PeerID)
-			if err != nil {
-				e.config.Logger.Sugar().Infof("could not decode peer '%s'", existingValue.PeerID)
-				return
-			}
-
-			// Open a stream
-			stream, err := e.host.NewStream(context.Background(), d, ServiceProtocol)
-			if err != nil {
-				e.config.Logger.Sugar().Infof("could not open stream '%s'", err.Error())
-				return
-			}
-			e.config.Logger.Sugar().Info("Redirecting", l.Addr().String(), "to", serviceID)
-
-			closer := make(chan struct{}, 2)
-			go copyStream(closer, stream, conn)
-			go copyStream(closer, conn, stream)
-			<-closer
-
-			stream.Close()
-			conn.Close()
-			e.config.Logger.Sugar().Info("Done handling", l.Addr().String(), "to", serviceID)
-		}()
-	}
-}
-
 // Join the network with the ledger.
 // It does the minimal action required to be connected
 // without any active packet routing
@@ -178,10 +72,10 @@ func (e *EdgeVPN) Join(ledger *blockchain.Ledger) error {
 	return nil
 }
 
-func newBlockChainData(e *EdgeVPN, address string) blockchain.Data {
+func newBlockChainData(e *EdgeVPN, address string) types.Machine {
 	hostname, _ := os.Hostname()
 
-	return blockchain.Data{
+	return types.Machine{
 		PeerID:   e.host.ID().String(),
 		Hostname: hostname,
 		OS:       runtime.GOOS,
@@ -225,13 +119,16 @@ func (e *EdgeVPN) Start() error {
 		context.Background(),
 		e.config.LedgerAnnounceTime,
 		func() {
+			machine := &types.Machine{}
 			// Retrieve current ID for ip in the blockchain
-			existingValue, found := ledger.GetKey(ip.String())
+			existingValue, found := ledger.GetKey(MachinesLedgerKey, ip.String())
+			existingValue.Unmarshal(machine)
+
 			// If mismatch, update the blockchain
-			if !found || existingValue.PeerID != e.host.ID().String() {
-				updatedMap := map[string]blockchain.Data{}
+			if !found || machine.PeerID != e.host.ID().String() {
+				updatedMap := map[string]interface{}{}
 				updatedMap[ip.String()] = newBlockChainData(e, ip.String())
-				ledger.Add(updatedMap)
+				ledger.Add(MachinesLedgerKey, updatedMap)
 			}
 		},
 	)
@@ -266,9 +163,11 @@ func (e *EdgeVPN) MessageWriter(opts ...hub.MessageOption) (*MessageWriter, erro
 
 func streamHandler(ledger *blockchain.Ledger, ifce *water.Interface) func(stream network.Stream) {
 	return func(stream network.Stream) {
-		if !ledger.Exists(
+		if !ledger.Exists(MachinesLedgerKey,
 			func(d blockchain.Data) bool {
-				return d.PeerID == stream.Conn().RemotePeer().String()
+				machine := &types.Machine{}
+				d.Unmarshal(machine)
+				return machine.PeerID == stream.Conn().RemotePeer().String()
 			}) {
 			stream.Reset()
 			return
@@ -300,14 +199,16 @@ func (e *EdgeVPN) readPackets(ledger *blockchain.Ledger, ifce *water.Interface)
 		dst := header.Dst.String()
 
 		// Query the routing table
-		value, found := ledger.GetKey(dst)
+		value, found := ledger.GetKey(MachinesLedgerKey, dst)
 		if !found {
 			e.config.Logger.Sugar().Infof("'%s' not found in the routing table", dst)
 			continue
 		}
+		machine := &types.Machine{}
+		value.Unmarshal(machine)
 
 		// Decode the Peer
-		d, err := peer.Decode(value.PeerID)
+		d, err := peer.Decode(machine.PeerID)
 		if err != nil {
 			e.config.Logger.Sugar().Infof("could not decode peer '%s'", value)
 			continue

+ 128 - 0
pkg/edgevpn/services.go

@@ -0,0 +1,128 @@
+package edgevpn
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"net"
+	"os"
+
+	"github.com/libp2p/go-libp2p-core/network"
+	"github.com/libp2p/go-libp2p-core/peer"
+	"github.com/libp2p/go-libp2p-core/protocol"
+	"github.com/mudler/edgevpn/pkg/blockchain"
+	"github.com/mudler/edgevpn/pkg/edgevpn/types"
+)
+
+const ServicesLedgerKey = "services"
+
+func (e *EdgeVPN) ExposeService(ledger *blockchain.Ledger, serviceID, dstaddress string) {
+	// 1) Register the ServiceID <-> PeerID Association
+	// By announcing periodically our service to the blockchain
+	ledger.Announce(
+		context.Background(),
+		e.config.LedgerAnnounceTime,
+		func() {
+			// Retrieve current ID for ip in the blockchain
+			existingValue, found := ledger.GetKey(ServicesLedgerKey, serviceID)
+			service := &types.Service{}
+			existingValue.Unmarshal(service)
+			// If mismatch, update the blockchain
+			fmt.Println(service)
+			if !found || service.PeerID != e.host.ID().String() {
+				updatedMap := map[string]interface{}{}
+				updatedMap[serviceID] = types.Service{PeerID: e.host.ID().String(), Name: serviceID}
+				ledger.Add(ServicesLedgerKey, updatedMap)
+			}
+		},
+	)
+
+	// 2) Set a stream handler
+	//    which connect to the given address/Port and Send what we receive from the Stream.
+	e.config.StreamHandlers[protocol.ID(ServiceProtocol)] = func(stream network.Stream) {
+		go func() {
+			e.config.Logger.Sugar().Info("Received connection from", stream.Conn().RemotePeer().String())
+			// TODO: Gate connection by PeerID: stream.Conn().RemotePeer().String()
+			// we need a list of known peers
+			e.config.Logger.Sugar().Info("Dialing", dstaddress)
+
+			c, err := net.Dial("tcp", dstaddress)
+			if err != nil {
+				e.config.Logger.Sugar().Info("Reset", stream.Conn().RemotePeer().String(), err.Error())
+				stream.Reset()
+				return
+			}
+			closer := make(chan struct{}, 2)
+			go copyStream(closer, stream, c)
+			go copyStream(closer, c, stream)
+			<-closer
+
+			stream.Close()
+			c.Close()
+
+			e.config.Logger.Sugar().Info("Done", stream.Conn().RemotePeer().String())
+
+		}()
+	}
+
+}
+
+func (e *EdgeVPN) ConnectToService(ledger *blockchain.Ledger, serviceID string, srcaddr string) error {
+	// Open local port for listening
+	l, err := net.Listen("tcp", srcaddr)
+	if err != nil {
+		return err
+	}
+	fmt.Println("Listening on ", srcaddr)
+
+	defer l.Close()
+	for {
+		// Listen for an incoming connection.
+		conn, err := l.Accept()
+		if err != nil {
+			fmt.Println("Error accepting: ", err.Error())
+			os.Exit(1)
+		}
+		e.config.Logger.Sugar().Info("New connection from", l.Addr().String())
+		// Handle connections in a new goroutine, forwarding to the p2p service
+		go func() {
+			// Retrieve current ID for ip in the blockchain
+			existingValue, found := ledger.GetKey(ServicesLedgerKey, serviceID)
+			service := &types.Service{}
+			existingValue.Unmarshal(service)
+			// If mismatch, update the blockchain
+			if !found {
+				e.config.Logger.Sugar().Info("service not found on blockchain")
+				return
+			}
+			// Decode the Peer
+			d, err := peer.Decode(service.PeerID)
+			if err != nil {
+				e.config.Logger.Sugar().Infof("could not decode peer '%s'", service.PeerID)
+				return
+			}
+
+			// Open a stream
+			stream, err := e.host.NewStream(context.Background(), d, ServiceProtocol)
+			if err != nil {
+				e.config.Logger.Sugar().Infof("could not open stream '%s'", err.Error())
+				return
+			}
+			e.config.Logger.Sugar().Info("Redirecting", l.Addr().String(), "to", serviceID)
+
+			closer := make(chan struct{}, 2)
+			go copyStream(closer, stream, conn)
+			go copyStream(closer, conn, stream)
+			<-closer
+
+			stream.Close()
+			conn.Close()
+			e.config.Logger.Sugar().Info("Done handling", l.Addr().String(), "to", serviceID)
+		}()
+	}
+}
+
+func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
+	_, _ = io.Copy(dst, src)
+	closer <- struct{}{} // connection is closed, send signal to stop proxy
+}

+ 10 - 0
pkg/edgevpn/types/machine.go

@@ -0,0 +1,10 @@
+package types
+
+type Machine struct {
+	PeerID   string
+	Hostname string
+	OS       string
+	Arch     string
+	Address  string
+	Version  string
+}

+ 6 - 0
pkg/edgevpn/types/service.go

@@ -0,0 +1,6 @@
+package types
+
+type Service struct {
+	PeerID string
+	Name   string
+}