// Copyright © 2021-2022 Ettore Di Giacinto // // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation; either version 2 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License along // with this program; if not, see . package service import ( "context" "embed" "encoding/base64" "fmt" "io" "io/fs" "io/ioutil" "os" "path" "path/filepath" "time" "github.com/ipfs/go-log" "gopkg.in/yaml.v2" edgeVPNClient "github.com/mudler/edgevpn/api/client" edgevpn "github.com/mudler/edgevpn/pkg/node" ) // Node is the service Node. // It have a set of defined available roles which nodes // in a network can take. It takes a network token or either generates one type Node struct { stateDir string tokenFile string uuid string networkToken string apiAddress string defaultRoles, persistentRoles, stopRoles string minNode int assets []string fs embed.FS client *Client roles map[Role]func(c *RoleConfig) error logger log.StandardLogger } // WithRoles defines a set of role keys func WithRoles(k ...RoleKey) Option { return func(mm *Node) error { m := map[Role]func(c *RoleConfig) error{} for _, kk := range k { m[kk.Role] = kk.RoleHandler } mm.roles = m return nil } } func WithMinNodes(i int) Option { return func(k *Node) error { k.minNode = i return nil } } // WithFS accepts an embed.FS file system where to copy binaries from func WithFS(fs embed.FS) Option { return func(k *Node) error { k.fs = fs return nil } } // WithAssets is a list of assets to copy to a temporary state dir from the embedded FS // It is used in conjunction with WithFS to ease out binary embedding func WithAssets(assets ...string) Option { return func(k *Node) error { k.assets = assets return nil } } // WithLogger defines a logger to be used across the whole execution func WithLogger(l log.StandardLogger) Option { return func(k *Node) error { k.logger = l return nil } } // WithStopRoles allows to set a list of comma separated roles that can be applied during cleanup func WithStopRoles(roles string) Option { return func(k *Node) error { k.stopRoles = roles return nil } } // WithPersistentRoles allows to set a list of comma separated roles that can is applied persistently func WithPersistentRoles(roles string) Option { return func(k *Node) error { k.persistentRoles = roles return nil } } // WithDefaultRoles allows to set a list of comma separated roles prefixed for the node. // Note, by setting this the node will refuse any assigned role func WithDefaultRoles(roles string) Option { return func(k *Node) error { k.defaultRoles = roles return nil } } // WithNetworkToken allows to set a network token. // If not set, it is automatically generated func WithNetworkToken(token string) Option { return func(k *Node) error { k.networkToken = token return nil } } // WithAPIAddress sets the EdgeVPN API address func WithAPIAddress(s string) Option { return func(k *Node) error { k.apiAddress = s return nil } } // WithStateDir sets the node state directory. // It will contain the unpacked assets (if any) and the // process states generated by the roles. func WithStateDir(s string) Option { return func(k *Node) error { k.stateDir = s return nil } } // WithUUID sets a node UUID func WithUUID(s string) Option { return func(k *Node) error { k.uuid = s return nil } } // WithTokenfile sets a token file. // If a token file and a network token is not found it is written // to such file func WithTokenfile(s string) Option { return func(k *Node) error { k.tokenFile = s return nil } } // WithClient sets a service client func WithClient(e *Client) Option { return func(o *Node) error { o.client = e return nil } } // Option is a Node option type Option func(k *Node) error // NewNode returns a new service Node // The service Node can have role applied which are // polled by the API. // This allows to bootstrap services using the API to coordinate nodes // and apply roles afterwards (e.g. start vpn with a dynamically received IP, etc. ) func NewNode(o ...Option) (*Node, error) { k := &Node{ stateDir: "/tmp/Node", apiAddress: "localhost:7070", } for _, oo := range o { err := oo(k) if err != nil { return nil, err } } return k, nil } func (k *Node) copyBinary() { for _, a := range k.assets { b := path.Base(a) aa := NewProcessController(k.stateDir) p := aa.BinaryPath(b) if _, err := os.Stat(p); err != nil { os.MkdirAll(filepath.Join(k.stateDir, "bin"), os.ModePerm) f, err := k.fs.Open(a) if err != nil { panic(err) } if err := copyFileContents(f, p); err != nil { panic(err) } } } } func copyFileContents(in fs.File, dst string) (err error) { defer in.Close() out, err := os.Create(dst) if err != nil { return } defer func() { cerr := out.Close() if err == nil { err = cerr } }() if _, err = io.Copy(out, in); err != nil { return } err = out.Sync() os.Chmod(dst, 0755) return } // Stop stops a node by calling the stop roles func (k *Node) Stop() { k.execRoles(k.stopRoles) } // Clean stops and cleanup a node func (k *Node) Clean() { k.client.Clean() k.Stop() if k.stateDir != "" { os.RemoveAll(k.stateDir) } } func (k *Node) prepare() error { k.copyBinary() if k.tokenFile != "" { f, err := ioutil.ReadFile(k.tokenFile) if err == nil { k.networkToken = string(f) } } if k.networkToken == "" { newData := edgevpn.GenerateNewConnectionData() bytesData, err := yaml.Marshal(newData) if err != nil { return err } token := base64.StdEncoding.EncodeToString(bytesData) k.logger.Infof("Token generated, writing to '%s'", k.tokenFile) ioutil.WriteFile(k.tokenFile, []byte(token), os.ModePerm) k.networkToken = token } k.execRoles(k.persistentRoles) if k.client == nil { k.client = NewClient("Node", edgeVPNClient.NewClient(edgeVPNClient.WithHost(fmt.Sprintf("http://%s", k.apiAddress)))) } return nil } type roleMessage struct { Role Role } func (k *Node) options() (r []RoleOption) { r = []RoleOption{ WithRoleLogger(k.logger), WithRole(k.roles), WithRoleClient(k.client), WithRoleUUID(k.uuid), WithRoleStateDir(k.stateDir), WithRoleAPIAddress(k.apiAddress), WithRoleToken(k.networkToken), } return } func (k *Node) execRoles(s string) { r := Role(s) k.logger.Infof("Applying role '%s'", r) r.Apply(k.options()...) } // Start starts the node with the context func (k *Node) Start(ctx context.Context) error { // prepare binaries and start the default roles if err := k.prepare(); err != nil { return err } k.client.Advertize(k.uuid) minNode := 2 if k.minNode != 0 { minNode = k.minNode } i := 0 for { select { case <-ctx.Done(): return nil default: i++ time.Sleep(10 * time.Second) if i%2 == 0 { k.client.Advertize(k.uuid) } uuids, _ := k.client.ActiveNodes() for _, n := range uuids { k.logger.Infof("Active: '%s'", n) } if k.persistentRoles != "" { k.execRoles(k.persistentRoles) } // If we have default roles, executes them and continue if k.defaultRoles != "" { k.execRoles(k.defaultRoles) continue } // Not enough nodes if len(uuids) < minNode { k.logger.Infof("not enough nodes available, sleeping... needed: %d, available: %d", minNode, len(uuids)) continue } // Enough active nodes. d, err := k.client.Get("role", k.uuid) if err == nil { k.logger.Info("Roles assigned") k.execRoles(d) } else { // we don't have a role yet, sleeping k.logger.Info("No role assigned, sleeping") } } } }