123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373 |
- // Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
- //
- // This program is free software; you can redistribute it and/or modify
- // it under the terms of the GNU General Public License as published by
- // the Free Software Foundation; either version 2 of the License, or
- // (at your option) any later version.
- //
- // This program is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU General Public License for more details.
- //
- // You should have received a copy of the GNU General Public License along
- // with this program; if not, see <http://www.gnu.org/licenses/>.
- package service
- import (
- "context"
- "embed"
- "encoding/base64"
- "fmt"
- "io"
- "io/fs"
- "io/ioutil"
- "os"
- "path"
- "path/filepath"
- "time"
- "github.com/ipfs/go-log"
- "gopkg.in/yaml.v2"
- edgeVPNClient "github.com/mudler/edgevpn/api/client"
- edgevpn "github.com/mudler/edgevpn/pkg/node"
- )
- // Node is the service Node.
- // It have a set of defined available roles which nodes
- // in a network can take. It takes a network token or either generates one
- type Node struct {
- stateDir string
- tokenFile string
- uuid string
- networkToken string
- apiAddress string
- defaultRoles, persistentRoles, stopRoles string
- minNode int
- assets []string
- fs embed.FS
- client *Client
- roles map[Role]func(c *RoleConfig) error
- logger log.StandardLogger
- }
- // WithRoles defines a set of role keys
- func WithRoles(k ...RoleKey) Option {
- return func(mm *Node) error {
- m := map[Role]func(c *RoleConfig) error{}
- for _, kk := range k {
- m[kk.Role] = kk.RoleHandler
- }
- mm.roles = m
- return nil
- }
- }
- func WithMinNodes(i int) Option {
- return func(k *Node) error {
- k.minNode = i
- return nil
- }
- }
- // WithFS accepts an embed.FS file system where to copy binaries from
- func WithFS(fs embed.FS) Option {
- return func(k *Node) error {
- k.fs = fs
- return nil
- }
- }
- // WithAssets is a list of assets to copy to a temporary state dir from the embedded FS
- // It is used in conjunction with WithFS to ease out binary embedding
- func WithAssets(assets ...string) Option {
- return func(k *Node) error {
- k.assets = assets
- return nil
- }
- }
- // WithLogger defines a logger to be used across the whole execution
- func WithLogger(l log.StandardLogger) Option {
- return func(k *Node) error {
- k.logger = l
- return nil
- }
- }
- // WithStopRoles allows to set a list of comma separated roles that can be applied during cleanup
- func WithStopRoles(roles string) Option {
- return func(k *Node) error {
- k.stopRoles = roles
- return nil
- }
- }
- // WithPersistentRoles allows to set a list of comma separated roles that can is applied persistently
- func WithPersistentRoles(roles string) Option {
- return func(k *Node) error {
- k.persistentRoles = roles
- return nil
- }
- }
- // WithDefaultRoles allows to set a list of comma separated roles prefixed for the node.
- // Note, by setting this the node will refuse any assigned role
- func WithDefaultRoles(roles string) Option {
- return func(k *Node) error {
- k.defaultRoles = roles
- return nil
- }
- }
- // WithNetworkToken allows to set a network token.
- // If not set, it is automatically generated
- func WithNetworkToken(token string) Option {
- return func(k *Node) error {
- k.networkToken = token
- return nil
- }
- }
- // WithAPIAddress sets the EdgeVPN API address
- func WithAPIAddress(s string) Option {
- return func(k *Node) error {
- k.apiAddress = s
- return nil
- }
- }
- // WithStateDir sets the node state directory.
- // It will contain the unpacked assets (if any) and the
- // process states generated by the roles.
- func WithStateDir(s string) Option {
- return func(k *Node) error {
- k.stateDir = s
- return nil
- }
- }
- // WithUUID sets a node UUID
- func WithUUID(s string) Option {
- return func(k *Node) error {
- k.uuid = s
- return nil
- }
- }
- // WithTokenfile sets a token file.
- // If a token file and a network token is not found it is written
- // to such file
- func WithTokenfile(s string) Option {
- return func(k *Node) error {
- k.tokenFile = s
- return nil
- }
- }
- // WithClient sets a service client
- func WithClient(e *Client) Option {
- return func(o *Node) error {
- o.client = e
- return nil
- }
- }
- // Option is a Node option
- type Option func(k *Node) error
- // NewNode returns a new service Node
- // The service Node can have role applied which are
- // polled by the API.
- // This allows to bootstrap services using the API to coordinate nodes
- // and apply roles afterwards (e.g. start vpn with a dynamically received IP, etc. )
- func NewNode(o ...Option) (*Node, error) {
- k := &Node{
- stateDir: "/tmp/Node",
- apiAddress: "localhost:7070",
- }
- for _, oo := range o {
- err := oo(k)
- if err != nil {
- return nil, err
- }
- }
- return k, nil
- }
- func (k *Node) copyBinary() {
- for _, a := range k.assets {
- b := path.Base(a)
- aa := NewProcessController(k.stateDir)
- p := aa.BinaryPath(b)
- if _, err := os.Stat(p); err != nil {
- os.MkdirAll(filepath.Join(k.stateDir, "bin"), os.ModePerm)
- f, err := k.fs.Open(a)
- if err != nil {
- panic(err)
- }
- if err := copyFileContents(f, p); err != nil {
- panic(err)
- }
- }
- }
- }
// copyFileContents streams in to a newly created (or truncated) file at dst,
// flushes it to disk and marks it executable (mode 0755).
//
// in is always closed before returning. The returned error is the first
// failure among creating dst, copying, syncing and changing the mode; when
// all of those succeed, an error from closing dst is returned instead.
func copyFileContents(in fs.File, dst string) (err error) {
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return err
	}
	// The named result lets the deferred Close report its error without
	// masking an earlier failure.
	defer func() {
		if cerr := out.Close(); err == nil {
			err = cerr
		}
	}()

	if _, err = io.Copy(out, in); err != nil {
		return err
	}
	if err = out.Sync(); err != nil {
		return err
	}

	// A binary that cannot be made executable is useless, so surface the
	// Chmod failure rather than dropping it.
	return os.Chmod(dst, 0755)
}
- // Stop stops a node by calling the stop roles
- func (k *Node) Stop() {
- k.execRoles(k.stopRoles)
- }
- // Clean stops and cleanup a node
- func (k *Node) Clean() {
- k.client.Clean()
- k.Stop()
- if k.stateDir != "" {
- os.RemoveAll(k.stateDir)
- }
- }
- func (k *Node) prepare() error {
- k.copyBinary()
- if k.tokenFile != "" {
- f, err := ioutil.ReadFile(k.tokenFile)
- if err == nil {
- k.networkToken = string(f)
- }
- }
- if k.networkToken == "" {
- newData := edgevpn.GenerateNewConnectionData()
- bytesData, err := yaml.Marshal(newData)
- if err != nil {
- return err
- }
- token := base64.StdEncoding.EncodeToString(bytesData)
- k.logger.Infof("Token generated, writing to '%s'", k.tokenFile)
- ioutil.WriteFile(k.tokenFile, []byte(token), os.ModePerm)
- k.networkToken = token
- }
- k.execRoles(k.persistentRoles)
- if k.client == nil {
- k.client = NewClient("Node",
- edgeVPNClient.NewClient(edgeVPNClient.WithHost(fmt.Sprintf("http://%s", k.apiAddress))))
- }
- return nil
- }
- type roleMessage struct {
- Role Role
- }
- func (k *Node) options() (r []RoleOption) {
- r = []RoleOption{
- WithRoleLogger(k.logger),
- WithRole(k.roles),
- WithRoleClient(k.client),
- WithRoleUUID(k.uuid),
- WithRoleStateDir(k.stateDir),
- WithRoleAPIAddress(k.apiAddress),
- WithRoleToken(k.networkToken),
- }
- return
- }
- func (k *Node) execRoles(s string) {
- r := Role(s)
- k.logger.Infof("Applying role '%s'", r)
- r.Apply(k.options()...)
- }
- // Start starts the node with the context
- func (k *Node) Start(ctx context.Context) error {
- // prepare binaries and start the default roles
- if err := k.prepare(); err != nil {
- return err
- }
- k.client.Advertize(k.uuid)
- minNode := 2
- if k.minNode != 0 {
- minNode = k.minNode
- }
- i := 0
- for {
- select {
- case <-ctx.Done():
- return nil
- default:
- i++
- time.Sleep(10 * time.Second)
- if i%2 == 0 {
- k.client.Advertize(k.uuid)
- }
- uuids, _ := k.client.ActiveNodes()
- for _, n := range uuids {
- k.logger.Infof("Active: '%s'", n)
- }
- if k.persistentRoles != "" {
- k.execRoles(k.persistentRoles)
- }
- // If we have default roles, executes them and continue
- if k.defaultRoles != "" {
- k.execRoles(k.defaultRoles)
- continue
- }
- // Not enough nodes
- if len(uuids) < minNode {
- k.logger.Infof("not enough nodes available, sleeping... needed: %d, available: %d", minNode, len(uuids))
- continue
- }
- // Enough active nodes.
- d, err := k.client.Get("role", k.uuid)
- if err == nil {
- k.logger.Info("Roles assigned")
- k.execRoles(d)
- } else {
- // we don't have a role yet, sleeping
- k.logger.Info("No role assigned, sleeping")
- }
- }
- }
- }
|