connection.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. /*
  2. Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package node
  14. import (
  15. "context"
  16. "crypto/rand"
  17. "crypto/sha256"
  18. "io"
  19. mrand "math/rand"
  20. "net"
  21. internalCrypto "github.com/mudler/edgevpn/pkg/crypto"
  22. "github.com/libp2p/go-libp2p"
  23. "github.com/libp2p/go-libp2p/core/crypto"
  24. "github.com/libp2p/go-libp2p/core/host"
  25. "github.com/libp2p/go-libp2p/core/peer"
  26. conngater "github.com/libp2p/go-libp2p/p2p/net/conngater"
  27. hub "github.com/mudler/edgevpn/pkg/hub"
  28. multiaddr "github.com/multiformats/go-multiaddr"
  29. )
  30. // Host returns the libp2p peer host
  31. func (e *Node) Host() host.Host {
  32. return e.host
  33. }
  34. // ConnectionGater returns the underlying libp2p conngater
  35. func (e *Node) ConnectionGater() *conngater.BasicConnectionGater {
  36. return e.cg
  37. }
  38. // BlockSubnet blocks the CIDR subnet from connections
  39. func (e *Node) BlockSubnet(cidr string) error {
  40. // Avoid to loopback traffic by trying to connect to nodes in via VPN
  41. _, n, err := net.ParseCIDR(cidr)
  42. if err != nil {
  43. return err
  44. }
  45. return e.ConnectionGater().BlockSubnet(n)
  46. }
  47. func GenPrivKey(seed int64) (crypto.PrivKey, error) {
  48. var r io.Reader
  49. if seed == 0 {
  50. r = rand.Reader
  51. } else {
  52. r = mrand.New(mrand.NewSource(seed))
  53. }
  54. prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 4096, r)
  55. return prvKey, err
  56. }
  57. func (e *Node) genHost(ctx context.Context) (host.Host, error) {
  58. var prvKey crypto.PrivKey
  59. opts := e.config.Options
  60. cg, err := conngater.NewBasicConnectionGater(nil)
  61. if err != nil {
  62. return nil, err
  63. }
  64. e.cg = cg
  65. if e.config.InterfaceAddress != "" {
  66. e.BlockSubnet(e.config.InterfaceAddress)
  67. }
  68. for _, b := range e.config.Blacklist {
  69. _, net, err := net.ParseCIDR(b)
  70. if err != nil {
  71. // Assume it's a peerID
  72. cg.BlockPeer(peer.ID(b))
  73. }
  74. if net != nil {
  75. cg.BlockSubnet(net)
  76. }
  77. }
  78. // generate privkey if not specified
  79. if len(e.config.PrivateKey) > 0 {
  80. prvKey, err = crypto.UnmarshalPrivateKey(e.config.PrivateKey)
  81. } else {
  82. prvKey, err = GenPrivKey(e.seed)
  83. }
  84. if err != nil {
  85. return nil, err
  86. }
  87. opts = append(opts, libp2p.ConnectionGater(cg), libp2p.Identity(prvKey))
  88. addrs := []multiaddr.Multiaddr{}
  89. for _, l := range e.config.ListenAddresses {
  90. addrs = append(addrs, []multiaddr.Multiaddr(l)...)
  91. }
  92. opts = append(opts, libp2p.ListenAddrs(addrs...))
  93. for _, d := range e.config.ServiceDiscovery {
  94. opts = append(opts, d.Option(ctx))
  95. }
  96. opts = append(opts, e.config.AdditionalOptions...)
  97. if e.config.Insecure {
  98. e.config.Logger.Info("Disabling Security transport layer")
  99. opts = append(opts, libp2p.NoSecurity)
  100. }
  101. opts = append(opts, FallbackDefaults)
  102. return libp2p.New(opts...)
  103. }
  104. // FallbackDefaults applies default options to the libp2p node if and only if no
  105. // other relevant options have been applied. will be appended to the options
  106. // passed into New.
  107. var FallbackDefaults libp2p.Option = func(cfg *libp2p.Config) error {
  108. for _, def := range defaults {
  109. if !def.fallback(cfg) {
  110. continue
  111. }
  112. if err := cfg.Apply(def.opt); err != nil {
  113. return err
  114. }
  115. }
  116. return nil
  117. }
  118. // Complete list of default options and when to fallback on them.
  119. //
  120. // Please *DON'T* specify default options any other way. Putting this all here
  121. // makes tracking defaults *much* easier.
  122. var defaults = []struct {
  123. fallback func(cfg *libp2p.Config) bool
  124. opt libp2p.Option
  125. }{
  126. {
  127. fallback: func(cfg *libp2p.Config) bool { return cfg.Transports == nil && cfg.ListenAddrs == nil },
  128. opt: libp2p.DefaultListenAddrs,
  129. },
  130. {
  131. fallback: func(cfg *libp2p.Config) bool { return cfg.Transports == nil && cfg.PSK == nil },
  132. opt: libp2p.DefaultTransports,
  133. },
  134. {
  135. fallback: func(cfg *libp2p.Config) bool { return cfg.Transports == nil && cfg.PSK != nil },
  136. opt: libp2p.DefaultPrivateTransports,
  137. },
  138. {
  139. fallback: func(cfg *libp2p.Config) bool { return cfg.Muxers == nil },
  140. opt: libp2p.DefaultMuxers,
  141. },
  142. {
  143. fallback: func(cfg *libp2p.Config) bool { return !cfg.Insecure && cfg.SecurityTransports == nil },
  144. opt: libp2p.DefaultSecurity,
  145. },
  146. {
  147. fallback: func(cfg *libp2p.Config) bool { return cfg.PeerKey == nil },
  148. opt: libp2p.RandomIdentity,
  149. },
  150. {
  151. fallback: func(cfg *libp2p.Config) bool { return cfg.Peerstore == nil },
  152. opt: libp2p.DefaultPeerstore,
  153. },
  154. {
  155. fallback: func(cfg *libp2p.Config) bool { return !cfg.RelayCustom },
  156. opt: libp2p.DefaultEnableRelay,
  157. },
  158. //{
  159. // fallback: func(cfg *libp2p.Config) bool { return cfg.ResourceManager == nil },
  160. // opt: libp2p.DefaultResourceManager,
  161. //},
  162. //{
  163. // fallback: func(cfg *libp2p.Config) bool { return cfg.ConnManager == nil },
  164. // opt: libp2p.DefaultConnectionManager,
  165. //},
  166. {
  167. fallback: func(cfg *libp2p.Config) bool { return cfg.MultiaddrResolver == nil },
  168. opt: libp2p.DefaultMultiaddrResolver,
  169. },
  170. //{
  171. // fallback: func(cfg *libp2p.Config) bool { return !cfg.DisableMetrics && cfg.PrometheusRegisterer == nil },
  172. // opt: libp2p.DefaultPrometheusRegisterer,
  173. //},
  174. }
  175. func (e *Node) sealkey() string {
  176. return internalCrypto.MD5(internalCrypto.TOTP(sha256.New, e.config.SealKeyLength, e.config.SealKeyInterval, e.config.ExchangeKey))
  177. }
  178. func (e *Node) handleEvents(ctx context.Context, inputChannel chan *hub.Message, roomMessages chan *hub.Message, pub func(*hub.Message) error, handlers []Handler, peerGater bool) {
  179. for {
  180. select {
  181. case m := <-inputChannel:
  182. if m == nil {
  183. continue
  184. }
  185. c := m.Copy()
  186. str, err := e.config.Sealer.Seal(c.Message, e.sealkey())
  187. if err != nil {
  188. e.config.Logger.Warnf("%w from %s", err.Error(), c.SenderID)
  189. }
  190. c.Message = str
  191. if err := pub(c); err != nil {
  192. e.config.Logger.Warnf("publish error: %s", err)
  193. }
  194. case m := <-roomMessages:
  195. if m == nil {
  196. continue
  197. }
  198. if peerGater {
  199. if e.config.PeerGater != nil && e.config.PeerGater.Gate(e, peer.ID(m.SenderID)) {
  200. e.config.Logger.Warnf("gated message from %s", m.SenderID)
  201. continue
  202. }
  203. }
  204. if len(e.config.PeerTable) > 0 {
  205. found := false
  206. for _, p := range e.config.PeerTable {
  207. if p.String() == peer.ID(m.SenderID).String() {
  208. found = true
  209. }
  210. }
  211. if !found {
  212. e.config.Logger.Warnf("gated message from %s - not present in peertable", m.SenderID)
  213. continue
  214. }
  215. }
  216. c := m.Copy()
  217. str, err := e.config.Sealer.Unseal(c.Message, e.sealkey())
  218. if err != nil {
  219. e.config.Logger.Warnf("%w from %s", err.Error(), c.SenderID)
  220. }
  221. c.Message = str
  222. e.handleReceivedMessage(c, handlers, inputChannel)
  223. case <-ctx.Done():
  224. return
  225. }
  226. }
  227. }
  228. func (e *Node) handleReceivedMessage(m *hub.Message, handlers []Handler, c chan *hub.Message) {
  229. for _, h := range handlers {
  230. if err := h(e.ledger, m, c); err != nil {
  231. e.config.Logger.Warnf("handler error: %s", err)
  232. }
  233. }
  234. }