node.go 5.8 KB


  1. /*
  2. Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package node
  14. import (
  15. "context"
  16. "fmt"
  17. "sync"
  18. "time"
  19. "github.com/ipfs/go-log"
  20. "github.com/libp2p/go-libp2p"
  21. "github.com/libp2p/go-libp2p/core/host"
  22. "github.com/libp2p/go-libp2p/core/network"
  23. "github.com/libp2p/go-libp2p/p2p/net/conngater"
  24. "github.com/mudler/edgevpn/pkg/crypto"
  25. protocol "github.com/mudler/edgevpn/pkg/protocol"
  26. "github.com/mudler/edgevpn/pkg/blockchain"
  27. hub "github.com/mudler/edgevpn/pkg/hub"
  28. "github.com/mudler/edgevpn/pkg/logger"
  29. )
  30. type Node struct {
  31. config Config
  32. MessageHub *hub.MessageHub
  33. //HubRoom *hub.Room
  34. inputCh chan *hub.Message
  35. genericHubCh chan *hub.Message
  36. seed int64
  37. host host.Host
  38. cg *conngater.BasicConnectionGater
  39. ledger *blockchain.Ledger
  40. sync.Mutex
  41. }
  // Buffer capacity used by New for the node's message channels.
  const (
  	defaultChanSize = 3000
  )
  43. var defaultLibp2pOptions = []libp2p.Option{
  44. libp2p.EnableNATService(),
  45. libp2p.NATPortMap(),
  46. }
  47. func New(p ...Option) (*Node, error) {
  48. c := &Config{
  49. DiscoveryInterval: 5 * time.Minute,
  50. StreamHandlers: make(map[protocol.Protocol]StreamHandler),
  51. LedgerAnnounceTime: 5 * time.Second,
  52. LedgerSyncronizationTime: 5 * time.Second,
  53. SealKeyLength: defaultKeyLength,
  54. Options: defaultLibp2pOptions,
  55. Logger: logger.New(log.LevelDebug),
  56. Sealer: &crypto.AESSealer{},
  57. Store: &blockchain.MemoryStore{},
  58. }
  59. if err := c.Apply(p...); err != nil {
  60. return nil, err
  61. }
  62. return &Node{
  63. config: *c,
  64. inputCh: make(chan *hub.Message, defaultChanSize),
  65. genericHubCh: make(chan *hub.Message, defaultChanSize),
  66. seed: 0,
  67. }, nil
  68. }
  69. // Ledger return the ledger which uses the node
  70. // connection to broadcast messages
  71. func (e *Node) Ledger() (*blockchain.Ledger, error) {
  72. e.Lock()
  73. defer e.Unlock()
  74. if e.ledger != nil {
  75. return e.ledger, nil
  76. }
  77. mw, err := e.messageWriter()
  78. if err != nil {
  79. return nil, err
  80. }
  81. e.ledger = blockchain.New(mw, e.config.Store)
  82. return e.ledger, nil
  83. }
  84. // PeerGater returns the node peergater
  85. func (e *Node) PeerGater() Gater {
  86. return e.config.PeerGater
  87. }
  88. // Start joins the node over the p2p network
  89. func (e *Node) Start(ctx context.Context) error {
  90. ledger, err := e.Ledger()
  91. if err != nil {
  92. return err
  93. }
  94. // Set the handler when we receive messages
  95. // The ledger needs to read them and update the internal blockchain
  96. e.config.Handlers = append(e.config.Handlers, ledger.Update)
  97. e.config.Logger.Info("Starting EdgeVPN network")
  98. // Startup libp2p network
  99. err = e.startNetwork(ctx)
  100. if err != nil {
  101. return err
  102. }
  103. // Send periodically messages to the channel with our blockchain content
  104. ledger.Syncronizer(ctx, e.config.LedgerSyncronizationTime)
  105. // Start eventual declared NetworkServices
  106. for _, s := range e.config.NetworkServices {
  107. err := s(ctx, e.config, e, ledger)
  108. if err != nil {
  109. return fmt.Errorf("error while starting network service: '%w'", err)
  110. }
  111. }
  112. return nil
  113. }
  114. // messageWriter returns a new MessageWriter bound to the edgevpn instance
  115. // with the given options
  116. func (e *Node) messageWriter(opts ...hub.MessageOption) (*messageWriter, error) {
  117. mess := &hub.Message{}
  118. mess.Apply(opts...)
  119. return &messageWriter{
  120. c: e.config,
  121. input: e.inputCh,
  122. mess: mess,
  123. }, nil
  124. }
  125. func (e *Node) startNetwork(ctx context.Context) error {
  126. e.config.Logger.Debug("Generating host data")
  127. host, err := e.genHost(ctx)
  128. if err != nil {
  129. e.config.Logger.Error(err.Error())
  130. return err
  131. }
  132. e.host = host
  133. ledger, err := e.Ledger()
  134. if err != nil {
  135. return err
  136. }
  137. for pid, strh := range e.config.StreamHandlers {
  138. host.SetStreamHandler(pid.ID(), network.StreamHandler(strh(e, ledger)))
  139. }
  140. e.config.Logger.Info("Node ID:", host.ID())
  141. e.config.Logger.Info("Node Addresses:", host.Addrs())
  142. // Hub rotates within sealkey interval.
  143. // this time length should be enough to make room for few block exchanges. This is ideally on minutes (10, 20, etc. )
  144. // it makes sure that if a bruteforce is attempted over the encrypted messages, the real key is not exposed.
  145. e.MessageHub = hub.NewHub(e.config.RoomName, e.config.MaxMessageSize, e.config.SealKeyLength, e.config.SealKeyInterval, e.config.GenericHub)
  146. for _, sd := range e.config.ServiceDiscovery {
  147. if err := sd.Run(e.config.Logger, ctx, host); err != nil {
  148. e.config.Logger.Fatal(fmt.Errorf("while starting service discovery %+v: '%w", sd, err))
  149. }
  150. }
  151. go e.handleEvents(ctx, e.inputCh, e.MessageHub.Messages, e.MessageHub.PublishMessage, e.config.Handlers, true)
  152. go e.MessageHub.Start(ctx, host)
  153. // If generic hub is enabled one is created separately with a set of generic channel handlers associated with.
  154. // note peergating is disabled in order to freely exchange messages that can be used for authentication or for other public means.
  155. if e.config.GenericHub {
  156. go e.handleEvents(ctx, e.genericHubCh, e.MessageHub.PublicMessages, e.MessageHub.PublishPublicMessage, e.config.GenericChannelHandler, false)
  157. }
  158. e.config.Logger.Debug("Network started")
  159. return nil
  160. }
  161. // PublishMessage publishes a message to the generic channel (if enabled)
  162. // See GenericChannelHandlers(..) to attach handlers to receive messages from this channel.
  163. func (e *Node) PublishMessage(m *hub.Message) error {
  164. if !e.config.GenericHub {
  165. return fmt.Errorf("generic hub disabled")
  166. }
  167. e.genericHubCh <- m
  168. return nil
  169. }