node.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. // Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
  2. //
  3. // This program is free software; you can redistribute it and/or modify
  4. // it under the terms of the GNU General Public License as published by
  5. // the Free Software Foundation; either version 2 of the License, or
  6. // (at your option) any later version.
  7. //
  8. // This program is distributed in the hope that it will be useful,
  9. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. // GNU General Public License for more details.
  12. //
  13. // You should have received a copy of the GNU General Public License along
  14. // with this program; if not, see <http://www.gnu.org/licenses/>.
  15. package service
  16. import (
  17. "context"
  18. "embed"
  19. "encoding/base64"
  20. "fmt"
  21. "io"
  22. "io/fs"
  23. "io/ioutil"
  24. "os"
  25. "path"
  26. "path/filepath"
  27. "time"
  28. "github.com/ipfs/go-log"
  29. "gopkg.in/yaml.v2"
  30. edgeVPNClient "github.com/mudler/edgevpn/api/client"
  31. edgevpn "github.com/mudler/edgevpn/pkg/node"
  32. )
  33. // Node is the service Node.
  34. // It have a set of defined available roles which nodes
  35. // in a network can take. It takes a network token or either generates one
  36. type Node struct {
  37. stateDir string
  38. tokenFile string
  39. uuid string
  40. networkToken string
  41. apiAddress string
  42. defaultRoles, persistentRoles, stopRoles string
  43. minNode int
  44. assets []string
  45. fs embed.FS
  46. client *Client
  47. roles map[Role]func(c *RoleConfig) error
  48. logger log.StandardLogger
  49. }
  50. // WithRoles defines a set of role keys
  51. func WithRoles(k ...RoleKey) Option {
  52. return func(mm *Node) error {
  53. m := map[Role]func(c *RoleConfig) error{}
  54. for _, kk := range k {
  55. m[kk.Role] = kk.RoleHandler
  56. }
  57. mm.roles = m
  58. return nil
  59. }
  60. }
  61. func WithMinNodes(i int) Option {
  62. return func(k *Node) error {
  63. k.minNode = i
  64. return nil
  65. }
  66. }
  67. // WithFS accepts an embed.FS file system where to copy binaries from
  68. func WithFS(fs embed.FS) Option {
  69. return func(k *Node) error {
  70. k.fs = fs
  71. return nil
  72. }
  73. }
  74. // WithAssets is a list of assets to copy to a temporary state dir from the embedded FS
  75. // It is used in conjunction with WithFS to ease out binary embedding
  76. func WithAssets(assets ...string) Option {
  77. return func(k *Node) error {
  78. k.assets = assets
  79. return nil
  80. }
  81. }
  82. // WithLogger defines a logger to be used across the whole execution
  83. func WithLogger(l log.StandardLogger) Option {
  84. return func(k *Node) error {
  85. k.logger = l
  86. return nil
  87. }
  88. }
  89. // WithStopRoles allows to set a list of comma separated roles that can be applied during cleanup
  90. func WithStopRoles(roles string) Option {
  91. return func(k *Node) error {
  92. k.stopRoles = roles
  93. return nil
  94. }
  95. }
  96. // WithPersistentRoles allows to set a list of comma separated roles that can is applied persistently
  97. func WithPersistentRoles(roles string) Option {
  98. return func(k *Node) error {
  99. k.persistentRoles = roles
  100. return nil
  101. }
  102. }
  103. // WithDefaultRoles allows to set a list of comma separated roles prefixed for the node.
  104. // Note, by setting this the node will refuse any assigned role
  105. func WithDefaultRoles(roles string) Option {
  106. return func(k *Node) error {
  107. k.defaultRoles = roles
  108. return nil
  109. }
  110. }
  111. // WithNetworkToken allows to set a network token.
  112. // If not set, it is automatically generated
  113. func WithNetworkToken(token string) Option {
  114. return func(k *Node) error {
  115. k.networkToken = token
  116. return nil
  117. }
  118. }
  119. // WithAPIAddress sets the EdgeVPN API address
  120. func WithAPIAddress(s string) Option {
  121. return func(k *Node) error {
  122. k.apiAddress = s
  123. return nil
  124. }
  125. }
  126. // WithStateDir sets the node state directory.
  127. // It will contain the unpacked assets (if any) and the
  128. // process states generated by the roles.
  129. func WithStateDir(s string) Option {
  130. return func(k *Node) error {
  131. k.stateDir = s
  132. return nil
  133. }
  134. }
  135. // WithUUID sets a node UUID
  136. func WithUUID(s string) Option {
  137. return func(k *Node) error {
  138. k.uuid = s
  139. return nil
  140. }
  141. }
  142. // WithTokenfile sets a token file.
  143. // If a token file and a network token is not found it is written
  144. // to such file
  145. func WithTokenfile(s string) Option {
  146. return func(k *Node) error {
  147. k.tokenFile = s
  148. return nil
  149. }
  150. }
  151. // WithClient sets a service client
  152. func WithClient(e *Client) Option {
  153. return func(o *Node) error {
  154. o.client = e
  155. return nil
  156. }
  157. }
  158. // Option is a Node option
  159. type Option func(k *Node) error
  160. // NewNode returns a new service Node
  161. // The service Node can have role applied which are
  162. // polled by the API.
  163. // This allows to bootstrap services using the API to coordinate nodes
  164. // and apply roles afterwards (e.g. start vpn with a dynamically received IP, etc. )
  165. func NewNode(o ...Option) (*Node, error) {
  166. k := &Node{
  167. stateDir: "/tmp/Node",
  168. apiAddress: "localhost:7070",
  169. }
  170. for _, oo := range o {
  171. err := oo(k)
  172. if err != nil {
  173. return nil, err
  174. }
  175. }
  176. return k, nil
  177. }
  178. func (k *Node) copyBinary() {
  179. for _, a := range k.assets {
  180. b := path.Base(a)
  181. aa := NewProcessController(k.stateDir)
  182. p := aa.BinaryPath(b)
  183. if _, err := os.Stat(p); err != nil {
  184. os.MkdirAll(filepath.Join(k.stateDir, "bin"), os.ModePerm)
  185. f, err := k.fs.Open(a)
  186. if err != nil {
  187. panic(err)
  188. }
  189. if err := copyFileContents(f, p); err != nil {
  190. panic(err)
  191. }
  192. }
  193. }
  194. }
  195. func copyFileContents(in fs.File, dst string) (err error) {
  196. defer in.Close()
  197. out, err := os.Create(dst)
  198. if err != nil {
  199. return
  200. }
  201. defer func() {
  202. cerr := out.Close()
  203. if err == nil {
  204. err = cerr
  205. }
  206. }()
  207. if _, err = io.Copy(out, in); err != nil {
  208. return
  209. }
  210. err = out.Sync()
  211. os.Chmod(dst, 0755)
  212. return
  213. }
  214. // Stop stops a node by calling the stop roles
  215. func (k *Node) Stop() {
  216. k.execRoles(k.stopRoles)
  217. }
  218. // Clean stops and cleanup a node
  219. func (k *Node) Clean() {
  220. k.client.Clean()
  221. k.Stop()
  222. if k.stateDir != "" {
  223. os.RemoveAll(k.stateDir)
  224. }
  225. }
  226. func (k *Node) prepare() error {
  227. k.copyBinary()
  228. if k.tokenFile != "" {
  229. f, err := ioutil.ReadFile(k.tokenFile)
  230. if err == nil {
  231. k.networkToken = string(f)
  232. }
  233. }
  234. if k.networkToken == "" {
  235. newData := edgevpn.GenerateNewConnectionData()
  236. bytesData, err := yaml.Marshal(newData)
  237. if err != nil {
  238. return err
  239. }
  240. token := base64.StdEncoding.EncodeToString(bytesData)
  241. k.logger.Infof("Token generated, writing to '%s'", k.tokenFile)
  242. ioutil.WriteFile(k.tokenFile, []byte(token), os.ModePerm)
  243. k.networkToken = token
  244. }
  245. k.execRoles(k.persistentRoles)
  246. if k.client == nil {
  247. k.client = NewClient("Node",
  248. edgeVPNClient.NewClient(edgeVPNClient.WithHost(fmt.Sprintf("http://%s", k.apiAddress))))
  249. }
  250. return nil
  251. }
  252. type roleMessage struct {
  253. Role Role
  254. }
  255. func (k *Node) options() (r []RoleOption) {
  256. r = []RoleOption{
  257. WithRoleLogger(k.logger),
  258. WithRole(k.roles),
  259. WithRoleClient(k.client),
  260. WithRoleUUID(k.uuid),
  261. WithRoleStateDir(k.stateDir),
  262. WithRoleAPIAddress(k.apiAddress),
  263. WithRoleToken(k.networkToken),
  264. }
  265. return
  266. }
  267. func (k *Node) execRoles(s string) {
  268. r := Role(s)
  269. k.logger.Infof("Applying role '%s'", r)
  270. r.Apply(k.options()...)
  271. }
  272. // Start starts the node with the context
  273. func (k *Node) Start(ctx context.Context) error {
  274. // prepare binaries and start the default roles
  275. if err := k.prepare(); err != nil {
  276. return err
  277. }
  278. k.client.Advertize(k.uuid)
  279. minNode := 2
  280. if k.minNode != 0 {
  281. minNode = k.minNode
  282. }
  283. i := 0
  284. for {
  285. select {
  286. case <-ctx.Done():
  287. return nil
  288. default:
  289. i++
  290. time.Sleep(10 * time.Second)
  291. if i%2 == 0 {
  292. k.client.Advertize(k.uuid)
  293. }
  294. uuids, _ := k.client.ActiveNodes()
  295. for _, n := range uuids {
  296. k.logger.Infof("Active: '%s'", n)
  297. }
  298. if k.persistentRoles != "" {
  299. k.execRoles(k.persistentRoles)
  300. }
  301. // If we have default roles, executes them and continue
  302. if k.defaultRoles != "" {
  303. k.execRoles(k.defaultRoles)
  304. continue
  305. }
  306. // Not enough nodes
  307. if len(uuids) < minNode {
  308. k.logger.Infof("not enough nodes available, sleeping... needed: %d, available: %d", minNode, len(uuids))
  309. continue
  310. }
  311. // Enough active nodes.
  312. d, err := k.client.Get("role", k.uuid)
  313. if err == nil {
  314. k.logger.Info("Roles assigned")
  315. k.execRoles(d)
  316. } else {
  317. // we don't have a role yet, sleeping
  318. k.logger.Info("No role assigned, sleeping")
  319. }
  320. }
  321. }
  322. }