Browse Source

:gear: Split networkservices

Ettore Di Giacinto 3 years ago
parent
commit
91ef3d2d9a
6 changed files with 229 additions and 219 deletions
  1. 36 34
      pkg/services/alive.go
  2. 22 21
      pkg/services/dns.go
  3. 24 20
      pkg/services/files.go
  4. 23 22
      pkg/services/services.go
  5. 74 72
      pkg/vpn/dhcp.go
  6. 50 50
      pkg/vpn/vpn.go

+ 36 - 34
pkg/services/alive.go

@@ -26,44 +26,46 @@ import (
 	"github.com/mudler/edgevpn/pkg/blockchain"
 )
 
+func AliveNetworkService(announcetime, scrubTime, maxtime time.Duration) node.NetworkService {
+	return func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
+		t := time.Now()
+		// By announcing periodically our service to the blockchain
+		b.Announce(
+			ctx,
+			announcetime,
+			func() {
+				// Keep-alive
+				b.Add(protocol.HealthCheckKey, map[string]interface{}{
+					n.Host().ID().String(): time.Now().Format(time.RFC3339),
+				})
+
+				// Keep-alive scrub
+				nodes := AvailableNodes(b, maxtime)
+				if len(nodes) == 0 {
+					return
+				}
+				lead := utils.Leader(nodes)
+				if !t.Add(scrubTime).After(time.Now()) {
+					// Update timer so not-leader do not attempt to delete bucket afterwards
+					// prevent cycles
+					t = time.Now()
+
+					if lead == n.Host().ID().String() {
+						// Automatically scrub after some time passed
+						b.DeleteBucket(protocol.HealthCheckKey)
+					}
+				}
+			},
+		)
+		return nil
+	}
+}
+
 // Alive announce the node every announce time, with a periodic scrub time for healthchecks
 // the maxtime is the time used to determine when a node is unreachable (after maxtime, its unreachable)
 func Alive(announcetime, scrubTime, maxtime time.Duration) []node.Option {
 	return []node.Option{
-		node.WithNetworkService(
-			func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
-				t := time.Now()
-				// By announcing periodically our service to the blockchain
-				b.Announce(
-					ctx,
-					announcetime,
-					func() {
-						// Keep-alive
-						b.Add(protocol.HealthCheckKey, map[string]interface{}{
-							n.Host().ID().String(): time.Now().Format(time.RFC3339),
-						})
-
-						// Keep-alive scrub
-						nodes := AvailableNodes(b, maxtime)
-						if len(nodes) == 0 {
-							return
-						}
-						lead := utils.Leader(nodes)
-						if !t.Add(scrubTime).After(time.Now()) {
-							// Update timer so not-leader do not attempt to delete bucket afterwards
-							// prevent cycles
-							t = time.Now()
-
-							if lead == n.Host().ID().String() {
-								// Automatically scrub after some time passed
-								b.DeleteBucket(protocol.HealthCheckKey)
-							}
-						}
-					},
-				)
-				return nil
-			},
-		),
+		node.WithNetworkService(AliveNetworkService(announcetime, scrubTime, maxtime)),
 	}
 }
 

+ 22 - 21
pkg/services/dns.go

@@ -31,31 +31,32 @@ import (
 	"github.com/pkg/errors"
 )
 
+func DNSNetworkService(listenAddr string, forwarder bool, forward []string, cacheSize int) node.NetworkService {
+	return func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
+		server := &dns.Server{Addr: listenAddr, Net: "udp"}
+		cache, err := lru.New(cacheSize)
+		if err != nil {
+			return err
+		}
+		go func() {
+			dns.HandleFunc(".", dnsHandler{ctx, b, forwarder, forward, cache}.handleDNSRequest())
+			fmt.Println(server.ListenAndServe())
+		}()
+
+		go func() {
+			<-ctx.Done()
+			server.Shutdown()
+		}()
+
+		return nil
+	}
+}
+
 // DNS returns a network service binding a dns blockchain resolver on listenAddr.
 // Takes an associated name for the addresses in the blockchain
 func DNS(listenAddr string, forwarder bool, forward []string, cacheSize int) []node.Option {
 	return []node.Option{
-		node.WithNetworkService(
-			func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
-
-				server := &dns.Server{Addr: listenAddr, Net: "udp"}
-				cache, err := lru.New(cacheSize)
-				if err != nil {
-					return err
-				}
-				go func() {
-					dns.HandleFunc(".", dnsHandler{ctx, b, forwarder, forward, cache}.handleDNSRequest())
-					fmt.Println(server.ListenAndServe())
-				}()
-
-				go func() {
-					<-ctx.Done()
-					server.Shutdown()
-				}()
-
-				return nil
-			},
-		),
+		node.WithNetworkService(DNSNetworkService(listenAddr, forwarder, forward, cacheSize)),
 	}
 }
 

+ 24 - 20
pkg/services/files.go

@@ -32,6 +32,29 @@ import (
 	"github.com/pkg/errors"
 )
 
+func SharefileNetworkService(announcetime time.Duration, fileID string) node.NetworkService {
+	return func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
+		// By announcing periodically our service to the blockchain
+		b.Announce(
+			ctx,
+			announcetime,
+			func() {
+				// Retrieve current ID for ip in the blockchain
+				existingValue, found := b.GetKey(protocol.FilesLedgerKey, fileID)
+				service := &types.Service{}
+				existingValue.Unmarshal(service)
+				// If mismatch, update the blockchain
+				if !found || service.PeerID != n.Host().ID().String() {
+					updatedMap := map[string]interface{}{}
+					updatedMap[fileID] = types.File{PeerID: n.Host().ID().String(), Name: fileID}
+					b.Add(protocol.FilesLedgerKey, updatedMap)
+				}
+			},
+		)
+		return nil
+	}
+}
+
 // ShareFile shares a file to the p2p network.
 // meant to be called before a node is started with Start()
 func ShareFile(ll log.StandardLogger, announcetime time.Duration, fileID, filepath string) ([]node.Option, error) {
@@ -43,26 +66,7 @@ func ShareFile(ll log.StandardLogger, announcetime time.Duration, fileID, filepa
 	ll.Infof("Serving '%s' as '%s'", filepath, fileID)
 	return []node.Option{
 		node.WithNetworkService(
-			func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
-				// By announcing periodically our service to the blockchain
-				b.Announce(
-					ctx,
-					announcetime,
-					func() {
-						// Retrieve current ID for ip in the blockchain
-						existingValue, found := b.GetKey(protocol.FilesLedgerKey, fileID)
-						service := &types.Service{}
-						existingValue.Unmarshal(service)
-						// If mismatch, update the blockchain
-						if !found || service.PeerID != n.Host().ID().String() {
-							updatedMap := map[string]interface{}{}
-							updatedMap[fileID] = types.File{PeerID: n.Host().ID().String(), Name: fileID}
-							b.Add(protocol.FilesLedgerKey, updatedMap)
-						}
-					},
-				)
-				return nil
-			},
+			SharefileNetworkService(announcetime, fileID),
 		),
 		node.WithStreamHandler(protocol.FileProtocol,
 			func(n *node.Node, l *blockchain.Ledger) func(stream network.Stream) {

+ 23 - 22
pkg/services/services.go

@@ -32,6 +32,28 @@ import (
 	"github.com/mudler/edgevpn/pkg/types"
 )
 
+func ExposeNetworkService(announcetime time.Duration, serviceID string) node.NetworkService {
+	return func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
+		b.Announce(
+			ctx,
+			announcetime,
+			func() {
+				// Retrieve current ID for ip in the blockchain
+				existingValue, found := b.GetKey(protocol.ServicesLedgerKey, serviceID)
+				service := &types.Service{}
+				existingValue.Unmarshal(service)
+				// If mismatch, update the blockchain
+				if !found || service.PeerID != n.Host().ID().String() {
+					updatedMap := map[string]interface{}{}
+					updatedMap[serviceID] = types.Service{PeerID: n.Host().ID().String(), Name: serviceID}
+					b.Add(protocol.ServicesLedgerKey, updatedMap)
+				}
+			},
+		)
+		return nil
+	}
+}
+
 // ExposeService exposes a service to the p2p network.
 // meant to be called before a node is started with Start()
 func RegisterService(ll log.StandardLogger, announcetime time.Duration, serviceID, dstaddress string) []node.Option {
@@ -70,28 +92,7 @@ func RegisterService(ll log.StandardLogger, announcetime time.Duration, serviceI
 				}()
 			}
 		}),
-
-		node.WithNetworkService(
-			func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
-				b.Announce(
-					ctx,
-					announcetime,
-					func() {
-						// Retrieve current ID for ip in the blockchain
-						existingValue, found := b.GetKey(protocol.ServicesLedgerKey, serviceID)
-						service := &types.Service{}
-						existingValue.Unmarshal(service)
-						// If mismatch, update the blockchain
-						if !found || service.PeerID != n.Host().ID().String() {
-							updatedMap := map[string]interface{}{}
-							updatedMap[serviceID] = types.Service{PeerID: n.Host().ID().String(), Name: serviceID}
-							b.Add(protocol.ServicesLedgerKey, updatedMap)
-						}
-					},
-				)
-				return nil
-			},
-		)}
+		node.WithNetworkService(ExposeNetworkService(announcetime, serviceID))}
 }
 
 func ConnectToService(ctx context.Context, ledger *blockchain.Ledger, node *node.Node, ll log.StandardLogger, announcetime time.Duration, serviceID string, srcaddr string) error {

+ 74 - 72
pkg/vpn/dhcp.go

@@ -46,6 +46,79 @@ func checkDHCPLease(c node.Config, leasedir string) string {
 	return ""
 }
 
+// DHCPNetworkService returns a DHCP network service
+func DHCPNetworkService(ip chan string, l log.StandardLogger, maxTime time.Duration, leasedir string, address string) node.NetworkService {
+	return func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
+		os.MkdirAll(leasedir, 0600)
+
+		// retrieve lease if present
+		var wantedIP = checkDHCPLease(c, leasedir)
+
+		//  whoever wants a new IP:
+		//  1. Get available nodes. Filter from Machine those that do not have an IP.
+		//  2. Get the leader among them. If we are not, we wait
+		//  3. If we are the leader, pick an IP and start the VPN with that IP
+		for wantedIP == "" {
+			time.Sleep(5 * time.Second)
+
+			// This network service is blocking and calls in before VPN, hence it needs to registered before VPN
+			nodes := services.AvailableNodes(b, maxTime)
+
+			currentIPs := map[string]string{}
+			ips := []string{}
+
+			for _, t := range b.LastBlock().Storage[protocol.MachinesLedgerKey] {
+				var m types.Machine
+				t.Unmarshal(&m)
+				currentIPs[m.PeerID] = m.Address
+
+				l.Debugf("%s uses %s", m.PeerID, m.Address)
+				ips = append(ips, m.Address)
+			}
+
+			nodesWithNoIP := []string{}
+			for _, nn := range nodes {
+				if _, exists := currentIPs[nn]; !exists {
+					nodesWithNoIP = append(nodesWithNoIP, nn)
+				}
+			}
+
+			if len(nodes) <= 1 {
+				l.Debug("not enough nodes to determine an IP, sleeping")
+				continue
+			}
+
+			lead := utils.Leader(nodesWithNoIP)
+			l.Debug("Nodes with no ip", nodesWithNoIP)
+
+			if n.Host().ID().String() != lead {
+				l.Debug("Not leader, sleeping")
+				time.Sleep(5 * time.Second)
+				continue
+			}
+
+			// We are lead
+			l.Debug("picking up between", ips)
+
+			wantedIP = utils.NextIP(address, ips)
+		}
+
+		// Save lease to disk
+		leaseFileName := crypto.MD5(fmt.Sprintf("%s-ek", c.ExchangeKey))
+		leaseFile := filepath.Join(leasedir, leaseFileName)
+		l.Debugf("Writing lease to '%s'", leaseFile)
+		if err := ioutil.WriteFile(leaseFile, []byte(wantedIP), 0600); err != nil {
+			l.Warn(err)
+		}
+
+		// propagate ip to channel that is read while starting vpn
+		ip <- wantedIP
+
+		// Gate connections from VPN
+		return n.BlockSubnet(fmt.Sprintf("%s/24", wantedIP))
+	}
+}
+
 // DHCP returns a DHCP network service. It requires the Alive Service in order to determine available nodes.
 // Nodes available are used to determine which needs an IP and when maxTime expires nodes are marked as offline and
 // not considered.
@@ -60,78 +133,7 @@ func DHCP(l log.StandardLogger, maxTime time.Duration, leasedir string, address
 				}
 				return nil
 			},
-			node.WithNetworkService(
-				func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
-
-					os.MkdirAll(leasedir, 0600)
-
-					// retrieve lease if present
-					var wantedIP = checkDHCPLease(c, leasedir)
-
-					//  whoever wants a new IP:
-					//  1. Get available nodes. Filter from Machine those that do not have an IP.
-					//  2. Get the leader among them. If we are not, we wait
-					//  3. If we are the leader, pick an IP and start the VPN with that IP
-					for wantedIP == "" {
-						time.Sleep(5 * time.Second)
-
-						// This network service is blocking and calls in before VPN, hence it needs to registered before VPN
-						nodes := services.AvailableNodes(b, maxTime)
-
-						currentIPs := map[string]string{}
-						ips := []string{}
-
-						for _, t := range b.LastBlock().Storage[protocol.MachinesLedgerKey] {
-							var m types.Machine
-							t.Unmarshal(&m)
-							currentIPs[m.PeerID] = m.Address
-
-							l.Debugf("%s uses %s", m.PeerID, m.Address)
-							ips = append(ips, m.Address)
-						}
-
-						nodesWithNoIP := []string{}
-						for _, nn := range nodes {
-							if _, exists := currentIPs[nn]; !exists {
-								nodesWithNoIP = append(nodesWithNoIP, nn)
-							}
-						}
-
-						if len(nodes) <= 1 {
-							l.Debug("not enough nodes to determine an IP, sleeping")
-							continue
-						}
-
-						lead := utils.Leader(nodesWithNoIP)
-						l.Debug("Nodes with no ip", nodesWithNoIP)
-
-						if n.Host().ID().String() != lead {
-							l.Debug("Not leader, sleeping")
-							time.Sleep(5 * time.Second)
-							continue
-						}
-
-						// We are lead
-						l.Debug("picking up between", ips)
-
-						wantedIP = utils.NextIP(address, ips)
-					}
-
-					// Save lease to disk
-					leaseFileName := crypto.MD5(fmt.Sprintf("%s-ek", c.ExchangeKey))
-					leaseFile := filepath.Join(leasedir, leaseFileName)
-					l.Debugf("Writing lease to '%s'", leaseFile)
-					if err := ioutil.WriteFile(leaseFile, []byte(wantedIP), 0600); err != nil {
-						l.Warn(err)
-					}
-
-					// propagate ip to channel that is read while starting vpn
-					ip <- wantedIP
-
-					// Gate connections from VPN
-					return n.BlockSubnet(fmt.Sprintf("%s/24", wantedIP))
-				},
-			),
+			node.WithNetworkService(DHCPNetworkService(ip, l, maxTime, leasedir, address)),
 		}, []Option{
 			func(cfg *Config) error {
 				// read back IP when starting vpn

+ 50 - 50
pkg/vpn/vpn.go

@@ -41,64 +41,64 @@ import (
 	"golang.org/x/net/ipv4"
 )
 
-// Start the node and the vpn. Returns an error in case of failure
-// When starting the vpn, there is no need to start the node
-func Register(p ...Option) ([]node.Option, error) {
-
-	return []node.Option{
-		node.WithNetworkService(func(ctx context.Context, nc node.Config, n *node.Node, b *blockchain.Ledger) error {
-			c := &Config{
-				Concurrency:        1,
-				LedgerAnnounceTime: 5 * time.Second,
-				Timeout:            15 * time.Second,
-				Logger:             logger.New(log.LevelDebug),
-			}
-			c.Apply(p...)
+func VPNNetworkService(p ...Option) node.NetworkService {
+	return func(ctx context.Context, nc node.Config, n *node.Node, b *blockchain.Ledger) error {
+		c := &Config{
+			Concurrency:        1,
+			LedgerAnnounceTime: 5 * time.Second,
+			Timeout:            15 * time.Second,
+			Logger:             logger.New(log.LevelDebug),
+		}
+		c.Apply(p...)
 
-			ifce, err := createInterface(c)
-			if err != nil {
-				return err
-			}
-			defer ifce.Close()
+		ifce, err := createInterface(c)
+		if err != nil {
+			return err
+		}
+		defer ifce.Close()
 
-			// Set stream handler during runtime
-			n.Host().SetStreamHandler(protocol.EdgeVPN.ID(), streamHandler(b, ifce))
+		// Set stream handler during runtime
+		n.Host().SetStreamHandler(protocol.EdgeVPN.ID(), streamHandler(b, ifce))
 
-			// Announce our IP
-			ip, _, err := net.ParseCIDR(c.InterfaceAddress)
-			if err != nil {
-				return err
-			}
+		// Announce our IP
+		ip, _, err := net.ParseCIDR(c.InterfaceAddress)
+		if err != nil {
+			return err
+		}
 
-			b.Announce(
-				ctx,
-				c.LedgerAnnounceTime,
-				func() {
-					machine := &types.Machine{}
-					// Retrieve current ID for ip in the blockchain
-					existingValue, found := b.GetKey(protocol.MachinesLedgerKey, ip.String())
-					existingValue.Unmarshal(machine)
-
-					// If mismatch, update the blockchain
-					if !found || machine.PeerID != n.Host().ID().String() {
-						updatedMap := map[string]interface{}{}
-						updatedMap[ip.String()] = newBlockChainData(n, ip.String())
-						b.Add(protocol.MachinesLedgerKey, updatedMap)
-					}
-				},
-			)
-
-			if c.NetLinkBootstrap {
-				if err := prepareInterface(c); err != nil {
-					return err
+		b.Announce(
+			ctx,
+			c.LedgerAnnounceTime,
+			func() {
+				machine := &types.Machine{}
+				// Retrieve current ID for ip in the blockchain
+				existingValue, found := b.GetKey(protocol.MachinesLedgerKey, ip.String())
+				existingValue.Unmarshal(machine)
+
+				// If mismatch, update the blockchain
+				if !found || machine.PeerID != n.Host().ID().String() {
+					updatedMap := map[string]interface{}{}
+					updatedMap[ip.String()] = newBlockChainData(n, ip.String())
+					b.Add(protocol.MachinesLedgerKey, updatedMap)
 				}
+			},
+		)
+
+		if c.NetLinkBootstrap {
+			if err := prepareInterface(c); err != nil {
+				return err
 			}
+		}
 
-			// read packets from the interface
-			return readPackets(ctx, c, n, b, ifce)
-		}),
-	}, nil
+		// read packets from the interface
+		return readPackets(ctx, c, n, b, ifce)
+	}
+}
 
+// Start the node and the vpn. Returns an error in case of failure
+// When starting the vpn, there is no need to start the node
+func Register(p ...Option) ([]node.Option, error) {
+	return []node.Option{node.WithNetworkService(VPNNetworkService(p...))}, nil
 }
 
 func streamHandler(l *blockchain.Ledger, ifce *water.Interface) func(stream network.Stream) {