node.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. // Copyright © 2021 Ettore Di Giacinto <[email protected]>
  2. //
  3. // This program is free software; you can redistribute it and/or modify
  4. // it under the terms of the GNU General Public License as published by
  5. // the Free Software Foundation; either version 2 of the License, or
  6. // (at your option) any later version.
  7. //
  8. // This program is distributed in the hope that it will be useful,
  9. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. // GNU General Public License for more details.
  12. //
  13. // You should have received a copy of the GNU General Public License along
  14. // with this program; if not, see <http://www.gnu.org/licenses/>.
  15. package node
  16. import (
  17. "context"
  18. "fmt"
  19. "sync"
  20. "time"
  21. "github.com/ipfs/go-log"
  22. "github.com/libp2p/go-libp2p"
  23. "github.com/libp2p/go-libp2p/core/host"
  24. "github.com/libp2p/go-libp2p/core/network"
  25. "github.com/libp2p/go-libp2p/p2p/net/conngater"
  26. "github.com/mudler/edgevpn/pkg/crypto"
  27. protocol "github.com/mudler/edgevpn/pkg/protocol"
  28. "github.com/mudler/edgevpn/pkg/blockchain"
  29. hub "github.com/mudler/edgevpn/pkg/hub"
  30. "github.com/mudler/edgevpn/pkg/logger"
  31. )
// Node is a participant of the p2p network. It bundles the configuration
// assembled in New with the runtime state created while the node runs:
// the libp2p host, the message hub and the ledger.
type Node struct {
	// config is a copy of the Config built by New after all options have
	// been applied.
	config Config
	// MessageHub is created by startNetwork; New leaves it nil.
	MessageHub *hub.MessageHub
	//HubRoom *hub.Room
	// inputCh is the buffered channel given to message writers as their
	// input and handed to handleEvents together with
	// MessageHub.PublishMessage.
	inputCh chan *hub.Message
	// genericHubCh receives the messages passed to PublishMessage; when the
	// generic hub is enabled it is handed to handleEvents together with
	// MessageHub.PublishPublicMessage.
	genericHubCh chan *hub.Message
	// seed is initialised to 0 by New.
	// NOTE(review): not referenced elsewhere in the visible part of the
	// file — presumably consumed when generating the host; confirm.
	seed int64
	// host is the libp2p host returned by genHost, set by startNetwork.
	host host.Host
	// cg is a libp2p connection gater.
	// NOTE(review): not referenced in the visible part of the file —
	// presumably populated while generating the host; confirm.
	cg *conngater.BasicConnectionGater
	// ledger is created lazily by Ledger and reused afterwards.
	ledger *blockchain.Ledger
	// Mutex is held by Ledger while it checks for and creates the ledger.
	sync.Mutex
}
// defaultChanSize is the buffer capacity New gives to the node's message
// channels (inputCh and genericHubCh).
const defaultChanSize = 3000

// defaultLibp2pOptions is the libp2p option set that New places in
// Config.Options before the caller-supplied options are applied: NAT
// service, NAT port mapping and auto relay.
var defaultLibp2pOptions = []libp2p.Option{
	libp2p.EnableNATService(),
	libp2p.NATPortMap(),
	libp2p.EnableAutoRelay(),
}
  50. func New(p ...Option) (*Node, error) {
  51. c := &Config{
  52. DiscoveryInterval: 5 * time.Minute,
  53. StreamHandlers: make(map[protocol.Protocol]StreamHandler),
  54. LedgerAnnounceTime: 5 * time.Second,
  55. LedgerSyncronizationTime: 5 * time.Second,
  56. SealKeyLength: 12,
  57. Options: defaultLibp2pOptions,
  58. Logger: logger.New(log.LevelDebug),
  59. Sealer: &crypto.AESSealer{},
  60. Store: &blockchain.MemoryStore{},
  61. }
  62. if err := c.Apply(p...); err != nil {
  63. return nil, err
  64. }
  65. return &Node{
  66. config: *c,
  67. inputCh: make(chan *hub.Message, defaultChanSize),
  68. genericHubCh: make(chan *hub.Message, defaultChanSize),
  69. seed: 0,
  70. }, nil
  71. }
  72. // Ledger return the ledger which uses the node
  73. // connection to broadcast messages
  74. func (e *Node) Ledger() (*blockchain.Ledger, error) {
  75. e.Lock()
  76. defer e.Unlock()
  77. if e.ledger != nil {
  78. return e.ledger, nil
  79. }
  80. mw, err := e.messageWriter()
  81. if err != nil {
  82. return nil, err
  83. }
  84. e.ledger = blockchain.New(mw, e.config.Store)
  85. return e.ledger, nil
  86. }
  87. // PeerGater returns the node peergater
  88. func (e *Node) PeerGater() Gater {
  89. return e.config.PeerGater
  90. }
  91. // Start joins the node over the p2p network
  92. func (e *Node) Start(ctx context.Context) error {
  93. ledger, err := e.Ledger()
  94. if err != nil {
  95. return err
  96. }
  97. // Set the handler when we receive messages
  98. // The ledger needs to read them and update the internal blockchain
  99. e.config.Handlers = append(e.config.Handlers, ledger.Update)
  100. e.config.Logger.Info("Starting EdgeVPN network")
  101. // Startup libp2p network
  102. err = e.startNetwork(ctx)
  103. if err != nil {
  104. return err
  105. }
  106. // Send periodically messages to the channel with our blockchain content
  107. ledger.Syncronizer(ctx, e.config.LedgerSyncronizationTime)
  108. // Start eventual declared NetworkServices
  109. for _, s := range e.config.NetworkServices {
  110. err := s(ctx, e.config, e, ledger)
  111. if err != nil {
  112. return err
  113. }
  114. }
  115. return nil
  116. }
  117. // messageWriter returns a new MessageWriter bound to the edgevpn instance
  118. // with the given options
  119. func (e *Node) messageWriter(opts ...hub.MessageOption) (*messageWriter, error) {
  120. mess := &hub.Message{}
  121. mess.Apply(opts...)
  122. return &messageWriter{
  123. c: e.config,
  124. input: e.inputCh,
  125. mess: mess,
  126. }, nil
  127. }
  128. func (e *Node) startNetwork(ctx context.Context) error {
  129. e.config.Logger.Debug("Generating host data")
  130. host, err := e.genHost(ctx)
  131. if err != nil {
  132. e.config.Logger.Error(err.Error())
  133. return err
  134. }
  135. e.host = host
  136. ledger, err := e.Ledger()
  137. if err != nil {
  138. return err
  139. }
  140. for pid, strh := range e.config.StreamHandlers {
  141. host.SetStreamHandler(pid.ID(), network.StreamHandler(strh(e, ledger)))
  142. }
  143. e.config.Logger.Info("Node ID:", host.ID())
  144. e.config.Logger.Info("Node Addresses:", host.Addrs())
  145. // Hub rotates within sealkey interval.
  146. // this time length should be enough to make room for few block exchanges. This is ideally on minutes (10, 20, etc. )
  147. // it makes sure that if a bruteforce is attempted over the encrypted messages, the real key is not exposed.
  148. e.MessageHub = hub.NewHub(e.config.RoomName, e.config.MaxMessageSize, e.config.SealKeyLength, e.config.SealKeyInterval, e.config.GenericHub)
  149. for _, sd := range e.config.ServiceDiscovery {
  150. if err := sd.Run(e.config.Logger, ctx, host); err != nil {
  151. e.config.Logger.Fatal(err)
  152. }
  153. }
  154. go e.handleEvents(ctx, e.inputCh, e.MessageHub.Messages, e.MessageHub.PublishMessage, e.config.Handlers, true)
  155. go e.MessageHub.Start(ctx, host)
  156. // If generic hub is enabled one is created separately with a set of generic channel handlers associated with.
  157. // note peergating is disabled in order to freely exchange messages that can be used for authentication or for other public means.
  158. if e.config.GenericHub {
  159. go e.handleEvents(ctx, e.genericHubCh, e.MessageHub.PublicMessages, e.MessageHub.PublishPublicMessage, e.config.GenericChannelHandler, false)
  160. }
  161. e.config.Logger.Debug("Network started")
  162. return nil
  163. }
  164. // PublishMessage publishes a message to the generic channel (if enabled)
  165. // See GenericChannelHandlers(..) to attach handlers to receive messages from this channel.
  166. func (e *Node) PublishMessage(m *hub.Message) error {
  167. if !e.config.GenericHub {
  168. return fmt.Errorf("generic hub disabled")
  169. }
  170. e.genericHubCh <- m
  171. return nil
  172. }