node.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. // Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
  2. //
  3. // This program is free software; you can redistribute it and/or modify
  4. // it under the terms of the GNU General Public License as published by
  5. // the Free Software Foundation; either version 2 of the License, or
  6. // (at your option) any later version.
  7. //
  8. // This program is distributed in the hope that it will be useful,
  9. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. // GNU General Public License for more details.
  12. //
  13. // You should have received a copy of the GNU General Public License along
  14. // with this program; if not, see <http://www.gnu.org/licenses/>.
  15. package service
  16. import (
  17. "context"
  18. "embed"
  19. "encoding/base64"
  20. "fmt"
  21. "io"
  22. "io/fs"
  23. "io/ioutil"
  24. "os"
  25. "path"
  26. "path/filepath"
  27. "time"
  28. "github.com/ipfs/go-log"
  29. "gopkg.in/yaml.v2"
  30. edgeVPNClient "github.com/mudler/edgevpn/api/client"
  31. edgevpn "github.com/mudler/edgevpn/pkg/node"
  32. )
  33. // Node is the service Node.
  34. // It have a set of defined available roles which nodes
  35. // in a network can take. It takes a network token or either generates one
  36. type Node struct {
  37. stateDir string
  38. tokenFile string
  39. uuid string
  40. networkToken string
  41. apiAddress string
  42. defaultRoles, persistentRoles, stopRoles string
  43. assets []string
  44. fs embed.FS
  45. client *Client
  46. roles map[Role]func(c *RoleConfig) error
  47. logger log.StandardLogger
  48. }
  49. // WithRoles defines a set of role keys
  50. func WithRoles(k ...RoleKey) Option {
  51. return func(mm *Node) error {
  52. m := map[Role]func(c *RoleConfig) error{}
  53. for _, kk := range k {
  54. m[kk.Role] = kk.RoleHandler
  55. }
  56. mm.roles = m
  57. return nil
  58. }
  59. }
  60. // WithFS accepts an embed.FS file system where to copy binaries from
  61. func WithFS(fs embed.FS) Option {
  62. return func(k *Node) error {
  63. k.fs = fs
  64. return nil
  65. }
  66. }
  67. // WithAssets is a list of assets to copy to a temporary state dir from the embedded FS
  68. // It is used in conjunction with WithFS to ease out binary embedding
  69. func WithAssets(assets ...string) Option {
  70. return func(k *Node) error {
  71. k.assets = assets
  72. return nil
  73. }
  74. }
  75. // WithLogger defines a logger to be used across the whole execution
  76. func WithLogger(l log.StandardLogger) Option {
  77. return func(k *Node) error {
  78. k.logger = l
  79. return nil
  80. }
  81. }
  82. // WithStopRoles allows to set a list of comma separated roles that can be applied during cleanup
  83. func WithStopRoles(roles string) Option {
  84. return func(k *Node) error {
  85. k.stopRoles = roles
  86. return nil
  87. }
  88. }
  89. // WithPersistentRoles allows to set a list of comma separated roles that can is applied persistently
  90. func WithPersistentRoles(roles string) Option {
  91. return func(k *Node) error {
  92. k.persistentRoles = roles
  93. return nil
  94. }
  95. }
  96. // WithDefaultRoles allows to set a list of comma separated roles prefixed for the node.
  97. // Note, by setting this the node will refuse any assigned role
  98. func WithDefaultRoles(roles string) Option {
  99. return func(k *Node) error {
  100. k.defaultRoles = roles
  101. return nil
  102. }
  103. }
  104. // WithNetworkToken allows to set a network token.
  105. // If not set, it is automatically generated
  106. func WithNetworkToken(token string) Option {
  107. return func(k *Node) error {
  108. k.networkToken = token
  109. return nil
  110. }
  111. }
  112. // WithAPIAddress sets the EdgeVPN API address
  113. func WithAPIAddress(s string) Option {
  114. return func(k *Node) error {
  115. k.apiAddress = s
  116. return nil
  117. }
  118. }
  119. // WithStateDir sets the node state directory.
  120. // It will contain the unpacked assets (if any) and the
  121. // process states generated by the roles.
  122. func WithStateDir(s string) Option {
  123. return func(k *Node) error {
  124. k.stateDir = s
  125. return nil
  126. }
  127. }
  128. // WithUUID sets a node UUID
  129. func WithUUID(s string) Option {
  130. return func(k *Node) error {
  131. k.uuid = s
  132. return nil
  133. }
  134. }
  135. // WithTokenfile sets a token file.
  136. // If a token file and a network token is not found it is written
  137. // to such file
  138. func WithTokenfile(s string) Option {
  139. return func(k *Node) error {
  140. k.tokenFile = s
  141. return nil
  142. }
  143. }
  144. // WithClient sets a service client
  145. func WithClient(e *Client) Option {
  146. return func(o *Node) error {
  147. o.client = e
  148. return nil
  149. }
  150. }
  151. // Option is a Node option
  152. type Option func(k *Node) error
  153. // NewNode returns a new service Node
  154. // The service Node can have role applied which are
  155. // polled by the API.
  156. // This allows to bootstrap services using the API to coordinate nodes
  157. // and apply roles afterwards (e.g. start vpn with a dynamically received IP, etc. )
  158. func NewNode(o ...Option) (*Node, error) {
  159. k := &Node{
  160. stateDir: "/tmp/Node",
  161. apiAddress: "localhost:7070",
  162. }
  163. for _, oo := range o {
  164. err := oo(k)
  165. if err != nil {
  166. return nil, err
  167. }
  168. }
  169. return k, nil
  170. }
  171. func (k *Node) copyBinary() {
  172. for _, a := range k.assets {
  173. b := path.Base(a)
  174. aa := NewProcessController(k.stateDir)
  175. p := aa.BinaryPath(b)
  176. if _, err := os.Stat(p); err != nil {
  177. os.MkdirAll(filepath.Join(k.stateDir, "bin"), os.ModePerm)
  178. f, err := k.fs.Open(a)
  179. if err != nil {
  180. panic(err)
  181. }
  182. if err := copyFileContents(f, p); err != nil {
  183. panic(err)
  184. }
  185. }
  186. }
  187. }
  188. func copyFileContents(in fs.File, dst string) (err error) {
  189. defer in.Close()
  190. out, err := os.Create(dst)
  191. if err != nil {
  192. return
  193. }
  194. defer func() {
  195. cerr := out.Close()
  196. if err == nil {
  197. err = cerr
  198. }
  199. }()
  200. if _, err = io.Copy(out, in); err != nil {
  201. return
  202. }
  203. err = out.Sync()
  204. os.Chmod(dst, 0755)
  205. return
  206. }
  207. // Stop stops a node by calling the stop roles
  208. func (k *Node) Stop() {
  209. k.execRoles(k.stopRoles)
  210. }
  211. // Clean stops and cleanup a node
  212. func (k *Node) Clean() {
  213. k.client.Clean()
  214. k.Stop()
  215. if k.stateDir != "" {
  216. os.RemoveAll(k.stateDir)
  217. }
  218. }
  219. func (k *Node) prepare() error {
  220. k.copyBinary()
  221. if k.tokenFile != "" {
  222. f, err := ioutil.ReadFile(k.tokenFile)
  223. if err == nil {
  224. k.networkToken = string(f)
  225. }
  226. }
  227. if k.networkToken == "" {
  228. newData := edgevpn.GenerateNewConnectionData()
  229. bytesData, err := yaml.Marshal(newData)
  230. if err != nil {
  231. return err
  232. }
  233. token := base64.StdEncoding.EncodeToString(bytesData)
  234. k.logger.Infof("Token generated, writing to '%s'", k.tokenFile)
  235. ioutil.WriteFile(k.tokenFile, []byte(token), os.ModePerm)
  236. k.networkToken = token
  237. }
  238. k.execRoles(k.persistentRoles)
  239. if k.client == nil {
  240. k.client = NewClient("Node",
  241. edgeVPNClient.NewClient(edgeVPNClient.WithHost(fmt.Sprintf("http://%s", k.apiAddress))))
  242. }
  243. return nil
  244. }
  245. type roleMessage struct {
  246. Role Role
  247. }
  248. func (k *Node) options() (r []RoleOption) {
  249. r = []RoleOption{
  250. WithRoleLogger(k.logger),
  251. WithRole(k.roles),
  252. WithRoleClient(k.client),
  253. WithRoleUUID(k.uuid),
  254. WithRoleStateDir(k.stateDir),
  255. WithRoleAPIAddress(k.apiAddress),
  256. WithRoleToken(k.networkToken),
  257. }
  258. return
  259. }
  260. func (k *Node) execRoles(s string) {
  261. r := Role(s)
  262. k.logger.Infof("Applying role '%s'", r)
  263. r.Apply(k.options()...)
  264. }
  265. // Start starts the node with the context
  266. func (k *Node) Start(ctx context.Context) error {
  267. // prepare binaries and start the default roles
  268. if err := k.prepare(); err != nil {
  269. return err
  270. }
  271. k.client.Advertize(k.uuid)
  272. i := 0
  273. for {
  274. select {
  275. case <-ctx.Done():
  276. return nil
  277. default:
  278. i++
  279. time.Sleep(10 * time.Second)
  280. if i%2 == 0 {
  281. k.client.Advertize(k.uuid)
  282. }
  283. uuids, _ := k.client.ActiveNodes()
  284. for _, n := range uuids {
  285. k.logger.Infof("Active: '%s'", n)
  286. }
  287. if k.persistentRoles != "" {
  288. k.execRoles(k.persistentRoles)
  289. }
  290. // If we have default roles, executes them and continue
  291. if k.defaultRoles != "" {
  292. k.execRoles(k.defaultRoles)
  293. continue
  294. }
  295. // Not enough nodes
  296. if len(uuids) <= 1 {
  297. k.logger.Info("not enough nodes available, sleeping...")
  298. continue
  299. }
  300. // Enough active nodes.
  301. d, err := k.client.Get("role", k.uuid)
  302. if err == nil {
  303. k.logger.Info("Roles assigned")
  304. k.execRoles(d)
  305. } else {
  306. // we don't have a role yet, sleeping
  307. k.logger.Info("No role assigned, sleeping")
  308. }
  309. }
  310. }
  311. }