2
0
Эх сурвалжийг харах

:wrench: Add ability to expose Services

Add service-connect and service-add to connect and expose services
without connecting to the VPN.

Service connect binds a local port to a service over the network, while
service-add binds a tcp connection to a service over the vpn.
Ettore Di Giacinto 3 жил өмнө
parent
commit
61b69df5b5

+ 68 - 0
cmd/service.go

@@ -0,0 +1,68 @@
+package cmd
+
+import (
+	"github.com/mudler/edgevpn/pkg/blockchain"
+	"github.com/mudler/edgevpn/pkg/edgevpn"
+	"github.com/urfave/cli"
+	"go.uber.org/zap"
+)
+
+func ServiceAdd(l *zap.Logger) cli.Command {
+	return cli.Command{
+		Name:        "service-add",
+		Description: "expose a service to the network",
+		Flags: append(CommonFlags,
+			cli.StringFlag{Name: "name"},
+			cli.StringFlag{Name: "remoteaddress"},
+		),
+		Action: func(c *cli.Context) error {
+			e := edgevpn.New(cliToOpts(l, c)...)
+
+			mw, err := e.MessageWriter()
+			if err != nil {
+				return err
+			}
+
+			ledger := blockchain.New(mw, 1000)
+
+			// Join the node to the network, using our ledger
+			e.ExposeService(ledger, c.String("name"), c.String("remoteaddress"))
+			// Join the node to the network, using our ledger
+			if err := e.Join(ledger); err != nil {
+				return err
+			}
+
+			for {
+			}
+		},
+	}
+}
+
+func ServiceConnect(l *zap.Logger) cli.Command {
+	return cli.Command{
+		Name:        "service-connect",
+		Description: "bind a local port to connect to a remote service",
+		Flags: append(CommonFlags,
+			cli.StringFlag{Name: "name"},
+
+			cli.StringFlag{Name: "srcaddress"},
+		),
+		Action: func(c *cli.Context) error {
+			e := edgevpn.New(cliToOpts(l, c)...)
+
+			mw, err := e.MessageWriter()
+			if err != nil {
+				return err
+			}
+
+			ledger := blockchain.New(mw, 1000)
+
+			// Join the node to the network, using our ledger
+			if err := e.Join(ledger); err != nil {
+				return err
+			}
+
+			return e.ConnectToService(ledger, c.String("name"), c.String("srcaddress"))
+		},
+	}
+}

+ 2 - 0
main.go

@@ -40,6 +40,8 @@ func main() {
 		Commands: []cli.Command{
 			cmd.Join(l),
 			cmd.API(l),
+			cmd.ServiceAdd(l),
+			cmd.ServiceConnect(l),
 		},
 
 		Action: cmd.Main(l),

+ 2 - 0
pkg/blockchain/ledger.go

@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/mudler/edgevpn/pkg/hub"
+	"github.com/pkg/errors"
 )
 
 type Ledger struct {
@@ -73,6 +74,7 @@ func (l *Ledger) Update(h *hub.Message) (err error) {
 
 	err = json.Unmarshal([]byte(h.Message), &chain)
 	if err != nil {
+		err = errors.Wrap(err, "failed unmarshalling blockchain data")
 		return
 	}
 

+ 4 - 1
pkg/edgevpn/connection.go

@@ -16,7 +16,10 @@ import (
 	"github.com/xlzd/gotp"
 )
 
-const Protocol = "/edgevpn/0.1"
+const (
+	Protocol        = "/edgevpn/0.1"
+	ServiceProtocol = "/edgevpn/service/0.1"
+)
 
 var defaultLibp2pOptions = []libp2p.Option{
 	libp2p.EnableAutoRelay(),

+ 112 - 0
pkg/edgevpn/edgevpn.go

@@ -2,6 +2,7 @@ package edgevpn
 
 import (
 	"context"
+	"fmt"
 	"io"
 	"net"
 	"os"
@@ -47,6 +48,117 @@ func New(p ...Option) *EdgeVPN {
 	}
 }
 
+func (e *EdgeVPN) ExposeService(ledger *blockchain.Ledger, serviceID, dstaddress string) {
+	// 1) Register the ServiceID <-> PeerID Association
+	// By announcing periodically our service to the blockchain
+	ledger.Announce(
+		context.Background(),
+		e.config.LedgerAnnounceTime,
+		func() {
+			key := fmt.Sprintf("service-%s", serviceID)
+			// Retrieve current ID for ip in the blockchain
+			existingValue, found := ledger.GetKey(key)
+			// If mismatch, update the blockchain
+			if !found || existingValue.PeerID != e.host.ID().String() {
+				updatedMap := map[string]blockchain.Data{}
+				updatedMap[key] = blockchain.Data{PeerID: e.host.ID().String()}
+				ledger.Add(updatedMap)
+			}
+		},
+	)
+
+	// 2) Set a stream handler
+	//    which connect to the given address/Port and Send what we receive from the Stream.
+	e.config.StreamHandlers[protocol.ID(ServiceProtocol)] = func(stream network.Stream) {
+		go func() {
+			e.config.Logger.Sugar().Info("Received connection from", stream.Conn().RemotePeer().String())
+			// TODO: Gate connection by PeerID: stream.Conn().RemotePeer().String()
+			// we need a list of known peers
+			e.config.Logger.Sugar().Info("Dialing", dstaddress)
+
+			c, err := net.Dial("tcp", dstaddress)
+			if err != nil {
+				e.config.Logger.Sugar().Info("Reset", stream.Conn().RemotePeer().String(), err.Error())
+				stream.Reset()
+				return
+			}
+			closer := make(chan struct{}, 2)
+			go copyStream(closer, stream, c)
+			go copyStream(closer, c, stream)
+			<-closer
+
+			stream.Close()
+			c.Close()
+
+			e.config.Logger.Sugar().Info("Done", stream.Conn().RemotePeer().String())
+
+		}()
+	}
+
+}
+
+func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
+	_, _ = io.Copy(dst, src)
+	closer <- struct{}{} // connection is closed, send signal to stop proxy
+}
+
+func (e *EdgeVPN) ConnectToService(ledger *blockchain.Ledger, serviceID string, srcaddr string) error {
+	// Open local port for listening
+	l, err := net.Listen("tcp", srcaddr)
+	if err != nil {
+		return err
+	}
+	fmt.Println("Listening on ", srcaddr)
+
+	defer l.Close()
+	for {
+		// Listen for an incoming connection.
+		conn, err := l.Accept()
+		if err != nil {
+			fmt.Println("Error accepting: ", err.Error())
+			os.Exit(1)
+		}
+		e.config.Logger.Sugar().Info("New connection from", l.Addr().String())
+		// Handle connections in a new goroutine, forwarding to the p2p service
+		go func() {
+			key := fmt.Sprintf("service-%s", serviceID)
+			// Retrieve current ID for ip in the blockchain
+			existingValue, found := ledger.GetKey(key)
+			// If mismatch, update the blockchain
+			if !found {
+				e.config.Logger.Sugar().Info("service not found on blockchain")
+				return
+			}
+			// Decode the Peer
+			d, err := peer.Decode(existingValue.PeerID)
+			if err != nil {
+				e.config.Logger.Sugar().Infof("could not decode peer '%s'", existingValue.PeerID)
+				return
+			}
+
+			// Open a stream
+			stream, err := e.host.NewStream(context.Background(), d, ServiceProtocol)
+			if err != nil {
+				e.config.Logger.Sugar().Infof("could not open stream '%s'", err.Error())
+				return
+			}
+			e.config.Logger.Sugar().Info("Redirecting", l.Addr().String(), "to", serviceID)
+
+			closer := make(chan struct{}, 2)
+			go copyStream(closer, stream, conn)
+			go copyStream(closer, conn, stream)
+			<-closer
+
+			stream.Close()
+			conn.Close()
+			e.config.Logger.Sugar().Info("Done handling", l.Addr().String(), "to", serviceID)
+		}()
+	}
+}
+
+// Join the network with the ledger.
+// It does the minimal action required to be connected
+// without any active packet routing
 func (e *EdgeVPN) Join(ledger *blockchain.Ledger) error {
 	// Set the handler when we receive messages
 	// The ledger needs to read them and update the internal blockchain