connection.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. /*
  2. Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package node
  14. import (
  15. "context"
  16. "crypto/rand"
  17. "crypto/sha256"
  18. "io"
  19. mrand "math/rand"
  20. "net"
  21. internalCrypto "github.com/mudler/edgevpn/pkg/crypto"
  22. "github.com/libp2p/go-libp2p"
  23. "github.com/libp2p/go-libp2p/core/crypto"
  24. "github.com/libp2p/go-libp2p/core/host"
  25. "github.com/libp2p/go-libp2p/core/peer"
  26. conngater "github.com/libp2p/go-libp2p/p2p/net/conngater"
  27. "github.com/libp2p/go-libp2p/p2p/net/swarm"
  28. hub "github.com/mudler/edgevpn/pkg/hub"
  29. multiaddr "github.com/multiformats/go-multiaddr"
  30. )
  31. // Host returns the libp2p peer host
  32. func (e *Node) Host() host.Host {
  33. return e.host
  34. }
  35. // ConnectionGater returns the underlying libp2p conngater
  36. func (e *Node) ConnectionGater() *conngater.BasicConnectionGater {
  37. return e.cg
  38. }
  39. // BlockSubnet blocks the CIDR subnet from connections
  40. func (e *Node) BlockSubnet(cidr string) error {
  41. // Avoid to loopback traffic by trying to connect to nodes in via VPN
  42. _, n, err := net.ParseCIDR(cidr)
  43. if err != nil {
  44. return err
  45. }
  46. return e.ConnectionGater().BlockSubnet(n)
  47. }
  48. func GenPrivKey(seed int64) (crypto.PrivKey, error) {
  49. var r io.Reader
  50. if seed == 0 {
  51. r = rand.Reader
  52. } else {
  53. r = mrand.New(mrand.NewSource(seed))
  54. }
  55. prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 4096, r)
  56. return prvKey, err
  57. }
  58. func (e *Node) genHost(ctx context.Context) (host.Host, error) {
  59. var prvKey crypto.PrivKey
  60. opts := e.config.Options
  61. cg, err := conngater.NewBasicConnectionGater(nil)
  62. if err != nil {
  63. return nil, err
  64. }
  65. e.cg = cg
  66. if e.config.InterfaceAddress != "" {
  67. e.BlockSubnet(e.config.InterfaceAddress)
  68. }
  69. for _, b := range e.config.Blacklist {
  70. _, net, err := net.ParseCIDR(b)
  71. if err != nil {
  72. // Assume it's a peerID
  73. cg.BlockPeer(peer.ID(b))
  74. }
  75. if net != nil {
  76. cg.BlockSubnet(net)
  77. }
  78. }
  79. // generate privkey if not specified
  80. if len(e.config.PrivateKey) > 0 {
  81. prvKey, err = crypto.UnmarshalPrivateKey(e.config.PrivateKey)
  82. } else {
  83. prvKey, err = GenPrivKey(e.seed)
  84. }
  85. if err != nil {
  86. return nil, err
  87. }
  88. opts = append(opts, libp2p.ConnectionGater(cg), libp2p.Identity(prvKey))
  89. // Do not enable metrics for now
  90. opts = append(opts, libp2p.DisableMetrics())
  91. addrs := []multiaddr.Multiaddr{}
  92. for _, l := range e.config.ListenAddresses {
  93. addrs = append(addrs, []multiaddr.Multiaddr(l)...)
  94. }
  95. opts = append(opts, libp2p.ListenAddrs(addrs...))
  96. for _, d := range e.config.ServiceDiscovery {
  97. opts = append(opts, d.Option(ctx))
  98. }
  99. opts = append(opts, e.config.AdditionalOptions...)
  100. if e.config.Insecure {
  101. e.config.Logger.Info("Disabling Security transport layer")
  102. opts = append(opts, libp2p.NoSecurity)
  103. }
  104. opts = append(opts, FallbackDefaults)
  105. return libp2p.NewWithoutDefaults(opts...)
  106. }
  107. // FallbackDefaults applies default options to the libp2p node if and only if no
  108. // other relevant options have been applied. will be appended to the options
  109. // passed into New.
  110. var FallbackDefaults libp2p.Option = func(cfg *libp2p.Config) error {
  111. for _, def := range defaults {
  112. if !def.fallback(cfg) {
  113. continue
  114. }
  115. if err := cfg.Apply(def.opt); err != nil {
  116. return err
  117. }
  118. }
  119. return nil
  120. }
  121. var defaultUDPBlackHoleDetector = func(cfg *libp2p.Config) error {
  122. // A black hole is a binary property. On a network if UDP dials are blocked, all dials will
  123. // fail. So a low success rate of 5 out 100 dials is good enough.
  124. return cfg.Apply(libp2p.UDPBlackHoleSuccessCounter(&swarm.BlackHoleSuccessCounter{N: 100, MinSuccesses: 5, Name: "UDP"}))
  125. }
  126. var defaultIPv6BlackHoleDetector = func(cfg *libp2p.Config) error {
  127. // A black hole is a binary property. On a network if there is no IPv6 connectivity, all
  128. // dials will fail. So a low success rate of 5 out 100 dials is good enough.
  129. return cfg.Apply(libp2p.IPv6BlackHoleSuccessCounter(&swarm.BlackHoleSuccessCounter{N: 100, MinSuccesses: 5, Name: "IPv6"}))
  130. }
  131. // Complete list of default options and when to fallback on them.
  132. //
  133. // Please *DON'T* specify default options any other way. Putting this all here
  134. // makes tracking defaults *much* easier.
  135. // https://github.com/libp2p/go-libp2p/blob/2209ae05976df6a1cc2631c961f57549d109008c/defaults.go#L227
  136. var defaults = []struct {
  137. fallback func(cfg *libp2p.Config) bool
  138. opt libp2p.Option
  139. }{
  140. {
  141. fallback: func(cfg *libp2p.Config) bool { return cfg.Transports == nil && cfg.ListenAddrs == nil },
  142. opt: libp2p.DefaultListenAddrs,
  143. },
  144. {
  145. fallback: func(cfg *libp2p.Config) bool { return cfg.Transports == nil && cfg.PSK == nil },
  146. opt: libp2p.DefaultTransports,
  147. },
  148. {
  149. fallback: func(cfg *libp2p.Config) bool { return cfg.Transports == nil && cfg.PSK != nil },
  150. opt: libp2p.DefaultPrivateTransports,
  151. },
  152. {
  153. fallback: func(cfg *libp2p.Config) bool { return cfg.Muxers == nil },
  154. opt: libp2p.DefaultMuxers,
  155. },
  156. {
  157. fallback: func(cfg *libp2p.Config) bool { return !cfg.Insecure && cfg.SecurityTransports == nil },
  158. opt: libp2p.DefaultSecurity,
  159. },
  160. {
  161. fallback: func(cfg *libp2p.Config) bool { return cfg.PeerKey == nil },
  162. opt: libp2p.RandomIdentity,
  163. },
  164. {
  165. fallback: func(cfg *libp2p.Config) bool { return cfg.Peerstore == nil },
  166. opt: libp2p.DefaultPeerstore,
  167. },
  168. {
  169. fallback: func(cfg *libp2p.Config) bool { return !cfg.RelayCustom },
  170. opt: libp2p.DefaultEnableRelay,
  171. },
  172. {
  173. fallback: func(cfg *libp2p.Config) bool { return cfg.ResourceManager == nil },
  174. opt: libp2p.DefaultResourceManager,
  175. },
  176. //{
  177. // fallback: func(cfg *libp2p.Config) bool { return cfg.ResourceManager == nil },
  178. // opt: libp2p.DefaultResourceManager,
  179. //},
  180. {
  181. fallback: func(cfg *libp2p.Config) bool { return cfg.ConnManager == nil },
  182. // Filling the ConnManager is required, even if its a null one as libp2p will call functions of the
  183. // libp2p.Config.ConnManager so we need to have it not nil
  184. opt: libp2p.DefaultConnectionManager,
  185. //opt: libp2p.ConnectionManager(connmgr.NullConnMgr{}),
  186. },
  187. {
  188. fallback: func(cfg *libp2p.Config) bool {
  189. return !cfg.CustomUDPBlackHoleSuccessCounter && cfg.UDPBlackHoleSuccessCounter == nil
  190. },
  191. opt: defaultUDPBlackHoleDetector,
  192. },
  193. {
  194. fallback: func(cfg *libp2p.Config) bool {
  195. return !cfg.CustomIPv6BlackHoleSuccessCounter && cfg.IPv6BlackHoleSuccessCounter == nil
  196. },
  197. opt: defaultIPv6BlackHoleDetector,
  198. },
  199. //{
  200. // fallback: func(cfg *libp2p.Config) bool { return !cfg.DisableMetrics && cfg.PrometheusRegisterer == nil },
  201. // opt: libp2p.DefaultPrometheusRegisterer,
  202. //},
  203. }
  204. func (e *Node) sealkey() string {
  205. return internalCrypto.MD5(internalCrypto.TOTP(sha256.New, e.config.SealKeyLength, e.config.SealKeyInterval, e.config.ExchangeKey))
  206. }
  207. func (e *Node) handleEvents(ctx context.Context, inputChannel chan *hub.Message, roomMessages chan *hub.Message, pub func(*hub.Message) error, handlers []Handler, peerGater bool) {
  208. for {
  209. select {
  210. case m := <-inputChannel:
  211. if m == nil {
  212. continue
  213. }
  214. c := m.Copy()
  215. str, err := e.config.Sealer.Seal(c.Message, e.sealkey())
  216. if err != nil {
  217. e.config.Logger.Warnf("%w from %s", err.Error(), c.SenderID)
  218. }
  219. c.Message = str
  220. if err := pub(c); err != nil {
  221. e.config.Logger.Warnf("publish error: %s", err)
  222. }
  223. case m := <-roomMessages:
  224. if m == nil {
  225. continue
  226. }
  227. if peerGater {
  228. if e.config.PeerGater != nil && e.config.PeerGater.Gate(e, peer.ID(m.SenderID)) {
  229. e.config.Logger.Warnf("gated message from %s", m.SenderID)
  230. continue
  231. }
  232. }
  233. if len(e.config.PeerTable) > 0 {
  234. found := false
  235. for _, p := range e.config.PeerTable {
  236. if p.String() == peer.ID(m.SenderID).String() {
  237. found = true
  238. }
  239. }
  240. if !found {
  241. e.config.Logger.Warnf("gated message from %s - not present in peertable", m.SenderID)
  242. continue
  243. }
  244. }
  245. c := m.Copy()
  246. str, err := e.config.Sealer.Unseal(c.Message, e.sealkey())
  247. if err != nil {
  248. e.config.Logger.Warnf("%w from %s", err.Error(), c.SenderID)
  249. }
  250. c.Message = str
  251. e.handleReceivedMessage(c, handlers, inputChannel)
  252. case <-ctx.Done():
  253. return
  254. }
  255. }
  256. }
  257. func (e *Node) handleReceivedMessage(m *hub.Message, handlers []Handler, c chan *hub.Message) {
  258. for _, h := range handlers {
  259. if err := h(e.ledger, m, c); err != nil {
  260. e.config.Logger.Warnf("handler error: %s", err)
  261. }
  262. }
  263. }