connection.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. // Copyright © 2021 Ettore Di Giacinto <[email protected]>
  2. //
  3. // This program is free software; you can redistribute it and/or modify
  4. // it under the terms of the GNU General Public License as published by
  5. // the Free Software Foundation; either version 2 of the License, or
  6. // (at your option) any later version.
  7. //
  8. // This program is distributed in the hope that it will be useful,
  9. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. // GNU General Public License for more details.
  12. //
  13. // You should have received a copy of the GNU General Public License along
  14. // with this program; if not, see <http://www.gnu.org/licenses/>.
  15. package node
  16. import (
  17. "context"
  18. "crypto/rand"
  19. "io"
  20. mrand "math/rand"
  21. "net"
  22. internalCrypto "github.com/mudler/edgevpn/pkg/crypto"
  23. "github.com/libp2p/go-libp2p"
  24. "github.com/libp2p/go-libp2p-core/crypto"
  25. "github.com/libp2p/go-libp2p-core/host"
  26. "github.com/libp2p/go-libp2p-core/peer"
  27. conngater "github.com/libp2p/go-libp2p/p2p/net/conngater"
  28. hub "github.com/mudler/edgevpn/pkg/hub"
  29. multiaddr "github.com/multiformats/go-multiaddr"
  30. "github.com/xlzd/gotp"
  31. )
  32. // Host returns the libp2p peer host
  33. func (e *Node) Host() host.Host {
  34. return e.host
  35. }
  36. // ConnectionGater returns the underlying libp2p conngater
  37. func (e *Node) ConnectionGater() *conngater.BasicConnectionGater {
  38. return e.cg
  39. }
  40. // BlockSubnet blocks the CIDR subnet from connections
  41. func (e *Node) BlockSubnet(cidr string) error {
  42. // Avoid to loopback traffic by trying to connect to nodes in via VPN
  43. _, n, err := net.ParseCIDR(cidr)
  44. if err != nil {
  45. return err
  46. }
  47. return e.ConnectionGater().BlockSubnet(n)
  48. }
  49. func (e *Node) genHost(ctx context.Context) (host.Host, error) {
  50. var r io.Reader
  51. if e.seed == 0 {
  52. r = rand.Reader
  53. } else {
  54. r = mrand.New(mrand.NewSource(e.seed))
  55. }
  56. prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 4096, r)
  57. if err != nil {
  58. return nil, err
  59. }
  60. opts := e.config.Options
  61. cg, err := conngater.NewBasicConnectionGater(nil)
  62. if err != nil {
  63. return nil, err
  64. }
  65. e.cg = cg
  66. if e.config.InterfaceAddress != "" {
  67. e.BlockSubnet(e.config.InterfaceAddress)
  68. }
  69. for _, b := range e.config.Blacklist {
  70. _, net, err := net.ParseCIDR(b)
  71. if err != nil {
  72. // Assume it's a peerID
  73. cg.BlockPeer(peer.ID(b))
  74. }
  75. if net != nil {
  76. cg.BlockSubnet(net)
  77. }
  78. }
  79. opts = append(opts, libp2p.ConnectionGater(cg), libp2p.Identity(prvKey))
  80. addrs := []multiaddr.Multiaddr{}
  81. for _, l := range e.config.ListenAddresses {
  82. addrs = append(addrs, []multiaddr.Multiaddr(l)...)
  83. }
  84. opts = append(opts, libp2p.ListenAddrs(addrs...))
  85. for _, d := range e.config.ServiceDiscovery {
  86. opts = append(opts, d.Option(ctx))
  87. }
  88. opts = append(opts, e.config.AdditionalOptions...)
  89. if e.config.Insecure {
  90. e.config.Logger.Info("Disabling Security transport layer")
  91. opts = append(opts, libp2p.NoSecurity)
  92. }
  93. opts = append(opts, libp2p.FallbackDefaults)
  94. return libp2p.New(opts...)
  95. }
  96. func (e *Node) sealkey() string {
  97. return internalCrypto.MD5(gotp.NewTOTP(e.config.ExchangeKey, e.config.SealKeyLength, e.config.SealKeyInterval, nil).Now())
  98. }
  99. func (e *Node) handleEvents(ctx context.Context) {
  100. for {
  101. select {
  102. case m := <-e.inputCh:
  103. if m == nil {
  104. continue
  105. }
  106. c := m.Copy()
  107. str, err := e.config.Sealer.Seal(c.Message, e.sealkey())
  108. if err != nil {
  109. e.config.Logger.Warnf("%w from %s", err.Error(), c.SenderID)
  110. }
  111. c.Message = str
  112. e.handleOutgoingMessage(c)
  113. case m := <-e.MessageHub.Messages:
  114. if m == nil {
  115. continue
  116. }
  117. if e.config.PeerGater != nil && e.config.PeerGater.Gate(e, peer.ID(m.SenderID)) {
  118. e.config.Logger.Warnf("gated message from %s", m.SenderID)
  119. continue
  120. }
  121. c := m.Copy()
  122. str, err := e.config.Sealer.Unseal(c.Message, e.sealkey())
  123. if err != nil {
  124. e.config.Logger.Warnf("%w from %s", err.Error(), c.SenderID)
  125. }
  126. c.Message = str
  127. e.handleReceivedMessage(c)
  128. case <-ctx.Done():
  129. return
  130. }
  131. }
  132. }
  133. func (e *Node) handleReceivedMessage(m *hub.Message) {
  134. for _, h := range e.config.Handlers {
  135. if err := h(m); err != nil {
  136. e.config.Logger.Warnf("handler error: %s", err)
  137. }
  138. }
  139. }
  140. func (e *Node) handleOutgoingMessage(m *hub.Message) {
  141. err := e.MessageHub.PublishMessage(m)
  142. if err != nil {
  143. e.config.Logger.Warnf("publish error: %s", err)
  144. }
  145. }