dht.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. /*
  2. Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package discovery
  14. import (
  15. "context"
  16. "crypto/sha256"
  17. "sync"
  18. "time"
  19. internalCrypto "github.com/mudler/edgevpn/pkg/crypto"
  20. "github.com/mudler/edgevpn/pkg/utils"
  21. "github.com/ipfs/go-log"
  22. "github.com/libp2p/go-libp2p"
  23. dht "github.com/libp2p/go-libp2p-kad-dht"
  24. "github.com/libp2p/go-libp2p/core/host"
  25. "github.com/libp2p/go-libp2p/core/network"
  26. "github.com/libp2p/go-libp2p/core/peer"
  27. "github.com/libp2p/go-libp2p/core/routing"
  28. discovery "github.com/libp2p/go-libp2p/p2p/discovery/routing"
  29. )
  30. type DHT struct {
  31. OTPKey string
  32. OTPInterval int
  33. KeyLength int
  34. RendezvousString string
  35. BootstrapPeers AddrList
  36. rendezvousHistory Ring
  37. RefreshDiscoveryTime time.Duration
  38. *dht.IpfsDHT
  39. dhtOptions []dht.Option
  40. }
  41. func NewDHT(d ...dht.Option) *DHT {
  42. return &DHT{dhtOptions: d, rendezvousHistory: Ring{Length: 2}}
  43. }
  44. func (d *DHT) Option(ctx context.Context) func(c *libp2p.Config) error {
  45. return libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
  46. // make the DHT with the given Host
  47. return d.startDHT(ctx, h)
  48. })
  49. }
  50. func (d *DHT) Rendezvous() string {
  51. if d.OTPKey != "" {
  52. totp := internalCrypto.TOTP(sha256.New, d.KeyLength, d.OTPInterval, d.OTPKey)
  53. rv := internalCrypto.MD5(totp)
  54. return rv
  55. }
  56. return d.RendezvousString
  57. }
  58. func (d *DHT) startDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
  59. if d.IpfsDHT == nil {
  60. // Start a DHT, for use in peer discovery. We can't just make a new DHT
  61. // client because we want each peer to maintain its own local copy of the
  62. // DHT, so that the bootstrapping node of the DHT can go down without
  63. // inhibiting future peer discovery.
  64. kad, err := dht.New(ctx, h, d.dhtOptions...)
  65. if err != nil {
  66. return d.IpfsDHT, err
  67. }
  68. d.IpfsDHT = kad
  69. }
  70. return d.IpfsDHT, nil
  71. }
  72. func (d *DHT) announceRendezvous(c log.StandardLogger, ctx context.Context, host host.Host, kademliaDHT *dht.IpfsDHT) {
  73. d.bootstrapPeers(c, ctx, host)
  74. rv := d.Rendezvous()
  75. d.rendezvousHistory.Add(rv)
  76. c.Debugf("The following rendezvous points are being used: %+v", d.rendezvousHistory.Data)
  77. for _, r := range d.rendezvousHistory.Data {
  78. c.Debugf("Announcing with rendezvous: %s", r)
  79. d.announceAndConnect(c, ctx, kademliaDHT, host, r)
  80. }
  81. c.Debug("Announcing to rendezvous done")
  82. }
  83. func (d *DHT) Run(c log.StandardLogger, ctx context.Context, host host.Host) error {
  84. if d.KeyLength == 0 {
  85. d.KeyLength = 12
  86. }
  87. if len(d.BootstrapPeers) == 0 {
  88. d.BootstrapPeers = dht.DefaultBootstrapPeers
  89. }
  90. // Start a DHT, for use in peer discovery. We can't just make a new DHT
  91. // client because we want each peer to maintain its own local copy of the
  92. // DHT, so that the bootstrapping node of the DHT can go down without
  93. // inhibiting future peer discovery.
  94. kademliaDHT, err := d.startDHT(ctx, host)
  95. if err != nil {
  96. return err
  97. }
  98. // Bootstrap the DHT. In the default configuration, this spawns a Background
  99. // thread that will refresh the peer table every five minutes.
  100. c.Info("Bootstrapping DHT")
  101. if err = kademliaDHT.Bootstrap(ctx); err != nil {
  102. return err
  103. }
  104. go d.runBackground(c, ctx, host, kademliaDHT)
  105. return nil
  106. }
  107. func (d *DHT) runBackground(c log.StandardLogger, ctx context.Context, host host.Host, kademliaDHT *dht.IpfsDHT) {
  108. d.announceRendezvous(c, ctx, host, kademliaDHT)
  109. t := utils.NewBackoffTicker(utils.BackoffMaxInterval(d.RefreshDiscoveryTime))
  110. defer t.Stop()
  111. for {
  112. select {
  113. case <-t.C:
  114. // We announce ourselves to the rendezvous point for all the peers.
  115. // We have a safeguard of 1 hour to avoid blocking the main loop
  116. // in case of network issues.
  117. // The TTL of DHT is by default no longer than 3 hours, so we should
  118. // be safe by having an entry less than that.
  119. safeTimeout, cancel := context.WithTimeout(ctx, time.Hour)
  120. endChan := make(chan struct{})
  121. go func() {
  122. d.announceRendezvous(c, safeTimeout, host, kademliaDHT)
  123. endChan <- struct{}{}
  124. }()
  125. select {
  126. case <-endChan:
  127. cancel()
  128. case <-safeTimeout.Done():
  129. c.Error("Timeout while announcing rendezvous")
  130. cancel()
  131. }
  132. case <-ctx.Done():
  133. return
  134. }
  135. }
  136. }
  137. func (d *DHT) bootstrapPeers(c log.StandardLogger, ctx context.Context, host host.Host) {
  138. // Let's connect to the bootstrap nodes first. They will tell us about the
  139. // other nodes in the network.
  140. var wg sync.WaitGroup
  141. for _, peerAddr := range d.BootstrapPeers {
  142. peerinfo, _ := peer.AddrInfoFromP2pAddr(peerAddr)
  143. wg.Add(1)
  144. go func() {
  145. defer wg.Done()
  146. if host.Network().Connectedness(peerinfo.ID) != network.Connected {
  147. if err := host.Connect(ctx, *peerinfo); err != nil {
  148. c.Debug(err.Error())
  149. } else {
  150. c.Debug("Connection established with bootstrap node:", *peerinfo)
  151. }
  152. }
  153. }()
  154. }
  155. wg.Wait()
  156. }
  157. func (d *DHT) FindClosePeers(ll log.StandardLogger, onlyStaticRelays bool, static ...string) func(ctx context.Context, numPeers int) <-chan peer.AddrInfo {
  158. return func(ctx context.Context, numPeers int) <-chan peer.AddrInfo {
  159. peerChan := make(chan peer.AddrInfo, numPeers)
  160. go func() {
  161. toStream := []peer.AddrInfo{}
  162. if !onlyStaticRelays {
  163. closestPeers, err := d.GetClosestPeers(ctx, d.PeerID().String())
  164. if err != nil {
  165. ll.Debug("Error getting closest peers: ", err)
  166. }
  167. for _, p := range closestPeers {
  168. addrs := d.Host().Peerstore().Addrs(p)
  169. if len(addrs) == 0 {
  170. continue
  171. }
  172. ll.Debugf("[relay discovery] Found close peer '%s'", p.String())
  173. toStream = append(toStream, peer.AddrInfo{ID: p, Addrs: addrs})
  174. }
  175. }
  176. for _, r := range static {
  177. pi, err := peer.AddrInfoFromString(r)
  178. if err == nil {
  179. ll.Debug("[static relay discovery] scanning ", pi.ID)
  180. toStream = append(toStream, peer.AddrInfo{ID: pi.ID, Addrs: pi.Addrs})
  181. }
  182. }
  183. if len(toStream) > numPeers {
  184. toStream = toStream[0 : numPeers-1]
  185. }
  186. for _, t := range toStream {
  187. peerChan <- t
  188. }
  189. close(peerChan)
  190. }()
  191. return peerChan
  192. }
  193. }
  194. func (d *DHT) announceAndConnect(l log.StandardLogger, ctx context.Context, kademliaDHT *dht.IpfsDHT, host host.Host, rv string) error {
  195. l.Debug("Announcing ourselves...")
  196. tCtx, c := context.WithTimeout(ctx, time.Second*120)
  197. defer c()
  198. routingDiscovery := discovery.NewRoutingDiscovery(kademliaDHT)
  199. routingDiscovery.Advertise(tCtx, rv)
  200. l.Debug("Successfully announced!")
  201. // Now, look for others who have announced
  202. // This is like your friend telling you the location to meet you.
  203. l.Debug("Searching for other peers...")
  204. fCtx, cf := context.WithTimeout(ctx, time.Second*120)
  205. defer cf()
  206. peerChan, err := routingDiscovery.FindPeers(fCtx, rv)
  207. if err != nil {
  208. return err
  209. }
  210. for p := range peerChan {
  211. // Don't dial ourselves or peers without address
  212. if p.ID == host.ID() || len(p.Addrs) == 0 {
  213. continue
  214. }
  215. if host.Network().Connectedness(p.ID) != network.Connected {
  216. l.Debug("Found peer:", p)
  217. timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*120)
  218. defer cancel()
  219. if err := host.Connect(timeoutCtx, p); err != nil {
  220. l.Debugf("Failed connecting to '%s', error: '%s'", p, err.Error())
  221. } else {
  222. l.Debug("Connected to:", p)
  223. }
  224. } else {
  225. l.Debug("Known peer (already connected):", p)
  226. }
  227. }
  228. l.Debug("Finished searching for peers.")
  229. return nil
  230. }