connection.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. /*
  2. Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package node
  14. import (
  15. "context"
  16. "crypto/rand"
  17. "crypto/sha256"
  18. "io"
  19. mrand "math/rand"
  20. "net"
  21. internalCrypto "github.com/mudler/edgevpn/pkg/crypto"
  22. "github.com/libp2p/go-libp2p"
  23. "github.com/libp2p/go-libp2p/core/connmgr"
  24. "github.com/libp2p/go-libp2p/core/crypto"
  25. "github.com/libp2p/go-libp2p/core/host"
  26. "github.com/libp2p/go-libp2p/core/peer"
  27. conngater "github.com/libp2p/go-libp2p/p2p/net/conngater"
  28. "github.com/libp2p/go-libp2p/p2p/net/swarm"
  29. hub "github.com/mudler/edgevpn/pkg/hub"
  30. multiaddr "github.com/multiformats/go-multiaddr"
  31. )
  32. // Host returns the libp2p peer host
  33. func (e *Node) Host() host.Host {
  34. return e.host
  35. }
  36. // ConnectionGater returns the underlying libp2p conngater
  37. func (e *Node) ConnectionGater() *conngater.BasicConnectionGater {
  38. return e.cg
  39. }
  40. // BlockSubnet blocks the CIDR subnet from connections
  41. func (e *Node) BlockSubnet(cidr string) error {
  42. // Avoid to loopback traffic by trying to connect to nodes in via VPN
  43. _, n, err := net.ParseCIDR(cidr)
  44. if err != nil {
  45. return err
  46. }
  47. return e.ConnectionGater().BlockSubnet(n)
  48. }
  49. func GenPrivKey(seed int64) (crypto.PrivKey, error) {
  50. var r io.Reader
  51. if seed == 0 {
  52. r = rand.Reader
  53. } else {
  54. r = mrand.New(mrand.NewSource(seed))
  55. }
  56. prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 4096, r)
  57. return prvKey, err
  58. }
  59. func (e *Node) genHost(ctx context.Context) (host.Host, error) {
  60. var prvKey crypto.PrivKey
  61. opts := e.config.Options
  62. cg, err := conngater.NewBasicConnectionGater(nil)
  63. if err != nil {
  64. return nil, err
  65. }
  66. e.cg = cg
  67. if e.config.InterfaceAddress != "" {
  68. e.BlockSubnet(e.config.InterfaceAddress)
  69. }
  70. for _, b := range e.config.Blacklist {
  71. _, net, err := net.ParseCIDR(b)
  72. if err != nil {
  73. // Assume it's a peerID
  74. cg.BlockPeer(peer.ID(b))
  75. }
  76. if net != nil {
  77. cg.BlockSubnet(net)
  78. }
  79. }
  80. // generate privkey if not specified
  81. if len(e.config.PrivateKey) > 0 {
  82. prvKey, err = crypto.UnmarshalPrivateKey(e.config.PrivateKey)
  83. } else {
  84. prvKey, err = GenPrivKey(e.seed)
  85. }
  86. if err != nil {
  87. return nil, err
  88. }
  89. opts = append(opts, libp2p.ConnectionGater(cg), libp2p.Identity(prvKey))
  90. // Do not enable metrics for now
  91. opts = append(opts, libp2p.DisableMetrics())
  92. addrs := []multiaddr.Multiaddr{}
  93. for _, l := range e.config.ListenAddresses {
  94. addrs = append(addrs, []multiaddr.Multiaddr(l)...)
  95. }
  96. opts = append(opts, libp2p.ListenAddrs(addrs...))
  97. for _, d := range e.config.ServiceDiscovery {
  98. opts = append(opts, d.Option(ctx))
  99. }
  100. opts = append(opts, e.config.AdditionalOptions...)
  101. if e.config.Insecure {
  102. e.config.Logger.Info("Disabling Security transport layer")
  103. opts = append(opts, libp2p.NoSecurity)
  104. }
  105. opts = append(opts, FallbackDefaults)
  106. return libp2p.NewWithoutDefaults(opts...)
  107. }
  108. // FallbackDefaults applies default options to the libp2p node if and only if no
  109. // other relevant options have been applied. will be appended to the options
  110. // passed into New.
  111. var FallbackDefaults libp2p.Option = func(cfg *libp2p.Config) error {
  112. for _, def := range defaults {
  113. if !def.fallback(cfg) {
  114. continue
  115. }
  116. if err := cfg.Apply(def.opt); err != nil {
  117. return err
  118. }
  119. }
  120. return nil
  121. }
  122. var defaultUDPBlackHoleDetector = func(cfg *libp2p.Config) error {
  123. // A black hole is a binary property. On a network if UDP dials are blocked, all dials will
  124. // fail. So a low success rate of 5 out 100 dials is good enough.
  125. return cfg.Apply(libp2p.UDPBlackHoleSuccessCounter(&swarm.BlackHoleSuccessCounter{N: 100, MinSuccesses: 5, Name: "UDP"}))
  126. }
  127. var defaultIPv6BlackHoleDetector = func(cfg *libp2p.Config) error {
  128. // A black hole is a binary property. On a network if there is no IPv6 connectivity, all
  129. // dials will fail. So a low success rate of 5 out 100 dials is good enough.
  130. return cfg.Apply(libp2p.IPv6BlackHoleSuccessCounter(&swarm.BlackHoleSuccessCounter{N: 100, MinSuccesses: 5, Name: "IPv6"}))
  131. }
// defaults is the complete list of default options and when to fall back on
// them. Each entry pairs a predicate (fallback) with the option (opt) to
// apply when that predicate returns true; FallbackDefaults evaluates the
// entries in the order they are listed here.
//
// Please *DON'T* specify default options any other way. Putting this all here
// makes tracking defaults *much* easier.
//
// Upstream reference:
// https://github.com/libp2p/go-libp2p/blob/2209ae05976df6a1cc2631c961f57549d109008c/defaults.go#L227
var defaults = []struct {
	// fallback reports whether opt should be applied to cfg.
	fallback func(cfg *libp2p.Config) bool
	// opt is the option applied when fallback returns true.
	opt libp2p.Option
}{
	{
		// Default listen addresses: only when neither transports nor listen
		// addresses were configured.
		fallback: func(cfg *libp2p.Config) bool { return cfg.Transports == nil && cfg.ListenAddrs == nil },
		opt:      libp2p.DefaultListenAddrs,
	},
	{
		// Default transports: no transports configured and no PSK set.
		fallback: func(cfg *libp2p.Config) bool { return cfg.Transports == nil && cfg.PSK == nil },
		opt:      libp2p.DefaultTransports,
	},
	{
		// Default private transports: no transports configured and a PSK is set.
		fallback: func(cfg *libp2p.Config) bool { return cfg.Transports == nil && cfg.PSK != nil },
		opt:      libp2p.DefaultPrivateTransports,
	},
	{
		// Default muxers: none configured.
		fallback: func(cfg *libp2p.Config) bool { return cfg.Muxers == nil },
		opt:      libp2p.DefaultMuxers,
	},
	{
		// Default security: skipped when the configuration is marked insecure
		// or security transports were already configured.
		fallback: func(cfg *libp2p.Config) bool { return !cfg.Insecure && cfg.SecurityTransports == nil },
		opt:      libp2p.DefaultSecurity,
	},
	{
		// Random identity: no peer key configured.
		fallback: func(cfg *libp2p.Config) bool { return cfg.PeerKey == nil },
		opt:      libp2p.RandomIdentity,
	},
	{
		// Default peerstore: none configured.
		fallback: func(cfg *libp2p.Config) bool { return cfg.Peerstore == nil },
		opt:      libp2p.DefaultPeerstore,
	},
	{
		// Default relay: no custom relay configuration.
		fallback: func(cfg *libp2p.Config) bool { return !cfg.RelayCustom },
		opt:      libp2p.DefaultEnableRelay,
	},
	// Disabled: the default resource manager is not applied as a fallback.
	//{
	// fallback: func(cfg *libp2p.Config) bool { return cfg.ResourceManager == nil },
	// opt: libp2p.DefaultResourceManager,
	//},
	{
		fallback: func(cfg *libp2p.Config) bool { return cfg.ConnManager == nil },
		// Filling the ConnManager is required, even if it's a null one, as libp2p will call functions of the
		// libp2p.Config.ConnManager, so it must not be nil.
		opt: libp2p.ConnectionManager(connmgr.NullConnMgr{}),
	},
	{
		// UDP black hole detector: no custom counter flagged and none set.
		fallback: func(cfg *libp2p.Config) bool {
			return !cfg.CustomUDPBlackHoleSuccessCounter && cfg.UDPBlackHoleSuccessCounter == nil
		},
		opt: defaultUDPBlackHoleDetector,
	},
	{
		// IPv6 black hole detector: no custom counter flagged and none set.
		fallback: func(cfg *libp2p.Config) bool {
			return !cfg.CustomIPv6BlackHoleSuccessCounter && cfg.IPv6BlackHoleSuccessCounter == nil
		},
		opt: defaultIPv6BlackHoleDetector,
	},
	// Disabled: the default Prometheus registerer is not applied as a fallback.
	//{
	// fallback: func(cfg *libp2p.Config) bool { return !cfg.DisableMetrics && cfg.PrometheusRegisterer == nil },
	// opt: libp2p.DefaultPrometheusRegisterer,
	//},
}
  200. func (e *Node) sealkey() string {
  201. return internalCrypto.MD5(internalCrypto.TOTP(sha256.New, e.config.SealKeyLength, e.config.SealKeyInterval, e.config.ExchangeKey))
  202. }
  203. func (e *Node) handleEvents(ctx context.Context, inputChannel chan *hub.Message, roomMessages chan *hub.Message, pub func(*hub.Message) error, handlers []Handler, peerGater bool) {
  204. for {
  205. select {
  206. case m := <-inputChannel:
  207. if m == nil {
  208. continue
  209. }
  210. c := m.Copy()
  211. str, err := e.config.Sealer.Seal(c.Message, e.sealkey())
  212. if err != nil {
  213. e.config.Logger.Warnf("%w from %s", err.Error(), c.SenderID)
  214. }
  215. c.Message = str
  216. if err := pub(c); err != nil {
  217. e.config.Logger.Warnf("publish error: %s", err)
  218. }
  219. case m := <-roomMessages:
  220. if m == nil {
  221. continue
  222. }
  223. if peerGater {
  224. if e.config.PeerGater != nil && e.config.PeerGater.Gate(e, peer.ID(m.SenderID)) {
  225. e.config.Logger.Warnf("gated message from %s", m.SenderID)
  226. continue
  227. }
  228. }
  229. if len(e.config.PeerTable) > 0 {
  230. found := false
  231. for _, p := range e.config.PeerTable {
  232. if p.String() == peer.ID(m.SenderID).String() {
  233. found = true
  234. }
  235. }
  236. if !found {
  237. e.config.Logger.Warnf("gated message from %s - not present in peertable", m.SenderID)
  238. continue
  239. }
  240. }
  241. c := m.Copy()
  242. str, err := e.config.Sealer.Unseal(c.Message, e.sealkey())
  243. if err != nil {
  244. e.config.Logger.Warnf("%w from %s", err.Error(), c.SenderID)
  245. }
  246. c.Message = str
  247. e.handleReceivedMessage(c, handlers, inputChannel)
  248. case <-ctx.Done():
  249. return
  250. }
  251. }
  252. }
  253. func (e *Node) handleReceivedMessage(m *hub.Message, handlers []Handler, c chan *hub.Message) {
  254. for _, h := range handlers {
  255. if err := h(e.ledger, m, c); err != nil {
  256. e.config.Logger.Warnf("handler error: %s", err)
  257. }
  258. }
  259. }