connection.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. /*
  2. Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package node
  14. import (
  15. "context"
  16. "crypto/rand"
  17. "crypto/sha256"
  18. "io"
  19. mrand "math/rand"
  20. "net"
  21. internalCrypto "github.com/mudler/edgevpn/pkg/crypto"
  22. "github.com/libp2p/go-libp2p"
  23. "github.com/libp2p/go-libp2p/core/connmgr"
  24. "github.com/libp2p/go-libp2p/core/crypto"
  25. "github.com/libp2p/go-libp2p/core/host"
  26. "github.com/libp2p/go-libp2p/core/peer"
  27. conngater "github.com/libp2p/go-libp2p/p2p/net/conngater"
  28. hub "github.com/mudler/edgevpn/pkg/hub"
  29. multiaddr "github.com/multiformats/go-multiaddr"
  30. )
  31. // Host returns the libp2p peer host
  32. func (e *Node) Host() host.Host {
  33. return e.host
  34. }
  35. // ConnectionGater returns the underlying libp2p conngater
  36. func (e *Node) ConnectionGater() *conngater.BasicConnectionGater {
  37. return e.cg
  38. }
  39. // BlockSubnet blocks the CIDR subnet from connections
  40. func (e *Node) BlockSubnet(cidr string) error {
  41. // Avoid to loopback traffic by trying to connect to nodes in via VPN
  42. _, n, err := net.ParseCIDR(cidr)
  43. if err != nil {
  44. return err
  45. }
  46. return e.ConnectionGater().BlockSubnet(n)
  47. }
  48. func GenPrivKey(seed int64) (crypto.PrivKey, error) {
  49. var r io.Reader
  50. if seed == 0 {
  51. r = rand.Reader
  52. } else {
  53. r = mrand.New(mrand.NewSource(seed))
  54. }
  55. prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 4096, r)
  56. return prvKey, err
  57. }
  58. func (e *Node) genHost(ctx context.Context) (host.Host, error) {
  59. var prvKey crypto.PrivKey
  60. opts := e.config.Options
  61. cg, err := conngater.NewBasicConnectionGater(nil)
  62. if err != nil {
  63. return nil, err
  64. }
  65. e.cg = cg
  66. if e.config.InterfaceAddress != "" {
  67. e.BlockSubnet(e.config.InterfaceAddress)
  68. }
  69. for _, b := range e.config.Blacklist {
  70. _, net, err := net.ParseCIDR(b)
  71. if err != nil {
  72. // Assume it's a peerID
  73. cg.BlockPeer(peer.ID(b))
  74. }
  75. if net != nil {
  76. cg.BlockSubnet(net)
  77. }
  78. }
  79. // generate privkey if not specified
  80. if len(e.config.PrivateKey) > 0 {
  81. prvKey, err = crypto.UnmarshalPrivateKey(e.config.PrivateKey)
  82. } else {
  83. prvKey, err = GenPrivKey(e.seed)
  84. }
  85. if err != nil {
  86. return nil, err
  87. }
  88. opts = append(opts, libp2p.ConnectionGater(cg), libp2p.Identity(prvKey))
  89. // Do not enable metrics for now
  90. opts = append(opts, libp2p.DisableMetrics())
  91. addrs := []multiaddr.Multiaddr{}
  92. for _, l := range e.config.ListenAddresses {
  93. addrs = append(addrs, []multiaddr.Multiaddr(l)...)
  94. }
  95. opts = append(opts, libp2p.ListenAddrs(addrs...))
  96. for _, d := range e.config.ServiceDiscovery {
  97. opts = append(opts, d.Option(ctx))
  98. }
  99. opts = append(opts, e.config.AdditionalOptions...)
  100. if e.config.Insecure {
  101. e.config.Logger.Info("Disabling Security transport layer")
  102. opts = append(opts, libp2p.NoSecurity)
  103. }
  104. opts = append(opts, FallbackDefaults)
  105. return libp2p.NewWithoutDefaults(opts...)
  106. }
  107. // FallbackDefaults applies default options to the libp2p node if and only if no
  108. // other relevant options have been applied. will be appended to the options
  109. // passed into New.
  110. var FallbackDefaults libp2p.Option = func(cfg *libp2p.Config) error {
  111. for _, def := range defaults {
  112. if !def.fallback(cfg) {
  113. continue
  114. }
  115. if err := cfg.Apply(def.opt); err != nil {
  116. return err
  117. }
  118. }
  119. return nil
  120. }
  121. // Complete list of default options and when to fallback on them.
  122. //
  123. // Please *DON'T* specify default options any other way. Putting this all here
  124. // makes tracking defaults *much* easier.
  125. var defaults = []struct {
  126. fallback func(cfg *libp2p.Config) bool
  127. opt libp2p.Option
  128. }{
  129. {
  130. fallback: func(cfg *libp2p.Config) bool { return cfg.Transports == nil && cfg.ListenAddrs == nil },
  131. opt: libp2p.DefaultListenAddrs,
  132. },
  133. {
  134. fallback: func(cfg *libp2p.Config) bool { return cfg.Transports == nil && cfg.PSK == nil },
  135. opt: libp2p.DefaultTransports,
  136. },
  137. {
  138. fallback: func(cfg *libp2p.Config) bool { return cfg.Transports == nil && cfg.PSK != nil },
  139. opt: libp2p.DefaultPrivateTransports,
  140. },
  141. {
  142. fallback: func(cfg *libp2p.Config) bool { return cfg.Muxers == nil },
  143. opt: libp2p.DefaultMuxers,
  144. },
  145. {
  146. fallback: func(cfg *libp2p.Config) bool { return !cfg.Insecure && cfg.SecurityTransports == nil },
  147. opt: libp2p.DefaultSecurity,
  148. },
  149. {
  150. fallback: func(cfg *libp2p.Config) bool { return cfg.PeerKey == nil },
  151. opt: libp2p.RandomIdentity,
  152. },
  153. {
  154. fallback: func(cfg *libp2p.Config) bool { return cfg.Peerstore == nil },
  155. opt: libp2p.DefaultPeerstore,
  156. },
  157. {
  158. fallback: func(cfg *libp2p.Config) bool { return !cfg.RelayCustom },
  159. opt: libp2p.DefaultEnableRelay,
  160. },
  161. //{
  162. // fallback: func(cfg *libp2p.Config) bool { return cfg.ResourceManager == nil },
  163. // opt: libp2p.DefaultResourceManager,
  164. //},
  165. {
  166. fallback: func(cfg *libp2p.Config) bool { return cfg.ConnManager == nil },
  167. // Filling the ConnManager is required, even if its a null one as libp2p will call functions of the
  168. // libp2p.Config.ConnManager so we need to have it not nil
  169. opt: libp2p.ConnectionManager(connmgr.NullConnMgr{}),
  170. },
  171. {
  172. fallback: func(cfg *libp2p.Config) bool { return cfg.MultiaddrResolver == nil },
  173. opt: libp2p.DefaultMultiaddrResolver,
  174. },
  175. //{
  176. // fallback: func(cfg *libp2p.Config) bool { return !cfg.DisableMetrics && cfg.PrometheusRegisterer == nil },
  177. // opt: libp2p.DefaultPrometheusRegisterer,
  178. //},
  179. }
  180. func (e *Node) sealkey() string {
  181. return internalCrypto.MD5(internalCrypto.TOTP(sha256.New, e.config.SealKeyLength, e.config.SealKeyInterval, e.config.ExchangeKey))
  182. }
  183. func (e *Node) handleEvents(ctx context.Context, inputChannel chan *hub.Message, roomMessages chan *hub.Message, pub func(*hub.Message) error, handlers []Handler, peerGater bool) {
  184. for {
  185. select {
  186. case m := <-inputChannel:
  187. if m == nil {
  188. continue
  189. }
  190. c := m.Copy()
  191. str, err := e.config.Sealer.Seal(c.Message, e.sealkey())
  192. if err != nil {
  193. e.config.Logger.Warnf("%w from %s", err.Error(), c.SenderID)
  194. }
  195. c.Message = str
  196. if err := pub(c); err != nil {
  197. e.config.Logger.Warnf("publish error: %s", err)
  198. }
  199. case m := <-roomMessages:
  200. if m == nil {
  201. continue
  202. }
  203. if peerGater {
  204. if e.config.PeerGater != nil && e.config.PeerGater.Gate(e, peer.ID(m.SenderID)) {
  205. e.config.Logger.Warnf("gated message from %s", m.SenderID)
  206. continue
  207. }
  208. }
  209. if len(e.config.PeerTable) > 0 {
  210. found := false
  211. for _, p := range e.config.PeerTable {
  212. if p.String() == peer.ID(m.SenderID).String() {
  213. found = true
  214. }
  215. }
  216. if !found {
  217. e.config.Logger.Warnf("gated message from %s - not present in peertable", m.SenderID)
  218. continue
  219. }
  220. }
  221. c := m.Copy()
  222. str, err := e.config.Sealer.Unseal(c.Message, e.sealkey())
  223. if err != nil {
  224. e.config.Logger.Warnf("%w from %s", err.Error(), c.SenderID)
  225. }
  226. c.Message = str
  227. e.handleReceivedMessage(c, handlers, inputChannel)
  228. case <-ctx.Done():
  229. return
  230. }
  231. }
  232. }
  233. func (e *Node) handleReceivedMessage(m *hub.Message, handlers []Handler, c chan *hub.Message) {
  234. for _, h := range handlers {
  235. if err := h(e.ledger, m, c); err != nil {
  236. e.config.Logger.Warnf("handler error: %s", err)
  237. }
  238. }
  239. }