node.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. // Copyright © 2021 Ettore Di Giacinto <[email protected]>
  2. //
  3. // This program is free software; you can redistribute it and/or modify
  4. // it under the terms of the GNU General Public License as published by
  5. // the Free Software Foundation; either version 2 of the License, or
  6. // (at your option) any later version.
  7. //
  8. // This program is distributed in the hope that it will be useful,
  9. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. // GNU General Public License for more details.
  12. //
  13. // You should have received a copy of the GNU General Public License along
  14. // with this program; if not, see <http://www.gnu.org/licenses/>.
  15. package node
  16. import (
  17. "context"
  18. "time"
  19. "github.com/ipfs/go-log"
  20. "github.com/libp2p/go-libp2p"
  21. "github.com/libp2p/go-libp2p-core/host"
  22. "github.com/libp2p/go-libp2p-core/network"
  23. p2pprotocol "github.com/libp2p/go-libp2p-core/protocol"
  24. protocol "github.com/mudler/edgevpn/pkg/protocol"
  25. pubsub "github.com/libp2p/go-libp2p-pubsub"
  26. "github.com/mudler/edgevpn/pkg/blockchain"
  27. hub "github.com/mudler/edgevpn/pkg/hub"
  28. "github.com/mudler/edgevpn/pkg/logger"
  29. "github.com/mudler/edgevpn/pkg/types"
  30. )
  31. type Node struct {
  32. config Config
  33. HubRoom *hub.Room
  34. inputCh chan *hub.Message
  35. seed int64
  36. host host.Host
  37. ledger *blockchain.Ledger
  38. }
  39. var defaultLibp2pOptions = []libp2p.Option{
  40. libp2p.EnableNATService(),
  41. libp2p.NATPortMap(),
  42. libp2p.EnableAutoRelay(),
  43. }
  44. func New(p ...Option) *Node {
  45. c := Config{
  46. DiscoveryInterval: 120 * time.Second,
  47. StreamHandlers: make(map[p2pprotocol.ID]types.StreamHandler),
  48. LedgerAnnounceTime: 5 * time.Second,
  49. LedgerSyncronizationTime: 5 * time.Second,
  50. SealKeyLength: 12,
  51. Options: defaultLibp2pOptions,
  52. Logger: logger.New(log.LevelDebug),
  53. }
  54. c.Apply(p...)
  55. return &Node{
  56. config: c,
  57. inputCh: make(chan *hub.Message, 3000),
  58. seed: 0,
  59. }
  60. }
  61. // AddStreamHandler adds a stream handler for the given protocol.
  62. // Note: must be called before Start().
  63. func (e *Node) AddStreamHandler(id protocol.Protocol, s types.StreamHandler) {
  64. e.config.StreamHandlers[id.ID()] = s
  65. }
  66. // AddNetworkService register a network service to the node.
  67. // Note: must be called before Start().
  68. func (e *Node) AddNetworkService(n NetworkService) {
  69. e.config.NetworkServices = append(e.config.NetworkServices, n)
  70. }
  71. // Ledger return the ledger which uses the node
  72. // connection to broadcast messages
  73. func (e *Node) Ledger() (*blockchain.Ledger, error) {
  74. if e.ledger != nil {
  75. return e.ledger, nil
  76. }
  77. mw, err := e.messageWriter()
  78. if err != nil {
  79. return nil, err
  80. }
  81. e.ledger = blockchain.New(mw, e.config.Store)
  82. return e.ledger, nil
  83. }
  84. // Start joins the node over the p2p network
  85. func (e *Node) Start(ctx context.Context) error {
  86. ledger, err := e.Ledger()
  87. if err != nil {
  88. return err
  89. }
  90. // Set the handler when we receive messages
  91. // The ledger needs to read them and update the internal blockchain
  92. e.config.Handlers = append(e.config.Handlers, ledger.Update)
  93. e.config.Logger.Info("Starting EdgeVPN network")
  94. // Startup libp2p network
  95. err = e.startNetwork(ctx)
  96. if err != nil {
  97. return err
  98. }
  99. // Send periodically messages to the channel with our blockchain content
  100. ledger.Syncronizer(ctx, e.config.LedgerSyncronizationTime)
  101. // Start eventual declared NetworkServices
  102. for _, s := range e.config.NetworkServices {
  103. s(ctx, e.config, e, ledger)
  104. }
  105. return nil
  106. }
  107. // messageWriter returns a new MessageWriter bound to the edgevpn instance
  108. // with the given options
  109. func (e *Node) messageWriter(opts ...hub.MessageOption) (*messageWriter, error) {
  110. mess := &hub.Message{}
  111. mess.Apply(opts...)
  112. return &messageWriter{
  113. c: e.config,
  114. input: e.inputCh,
  115. mess: mess,
  116. }, nil
  117. }
  118. func (e *Node) startNetwork(ctx context.Context) error {
  119. e.config.Logger.Debug("Generating host data")
  120. host, err := e.genHost(ctx)
  121. if err != nil {
  122. e.config.Logger.Error(err.Error())
  123. return err
  124. }
  125. e.host = host
  126. for pid, strh := range e.config.StreamHandlers {
  127. host.SetStreamHandler(pid, network.StreamHandler(strh))
  128. }
  129. e.config.Logger.Info("Node ID:", host.ID())
  130. e.config.Logger.Info("Node Addresses:", host.Addrs())
  131. // create a new PubSub service using the GossipSub router
  132. ps, err := pubsub.NewGossipSub(ctx, host, pubsub.WithMaxMessageSize(e.config.MaxMessageSize))
  133. if err != nil {
  134. return err
  135. }
  136. // join the "chat" room
  137. cr, err := hub.JoinRoom(ctx, ps, host.ID(), e.config.RoomName)
  138. if err != nil {
  139. return err
  140. }
  141. e.HubRoom = cr
  142. for _, sd := range e.config.ServiceDiscovery {
  143. if err := sd.Run(e.config.Logger, ctx, host); err != nil {
  144. e.config.Logger.Fatal(err)
  145. }
  146. }
  147. go e.handleEvents(ctx)
  148. e.config.Logger.Debug("Network started")
  149. return nil
  150. }