dht.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. /*
  2. Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package discovery
  14. import (
  15. "context"
  16. "crypto/sha256"
  17. "sync"
  18. "time"
  19. internalCrypto "github.com/mudler/edgevpn/pkg/crypto"
  20. "github.com/mudler/edgevpn/pkg/utils"
  21. "github.com/ipfs/go-log"
  22. "github.com/libp2p/go-libp2p"
  23. dht "github.com/libp2p/go-libp2p-kad-dht"
  24. "github.com/libp2p/go-libp2p/core/host"
  25. "github.com/libp2p/go-libp2p/core/network"
  26. "github.com/libp2p/go-libp2p/core/peer"
  27. "github.com/libp2p/go-libp2p/core/routing"
  28. discovery "github.com/libp2p/go-libp2p/p2p/discovery/routing"
  29. )
  30. type DHT struct {
  31. OTPKey string
  32. OTPInterval int
  33. KeyLength int
  34. RendezvousString string
  35. BootstrapPeers AddrList
  36. latestRendezvous string
  37. RefreshDiscoveryTime time.Duration
  38. *dht.IpfsDHT
  39. dhtOptions []dht.Option
  40. }
  41. func NewDHT(d ...dht.Option) *DHT {
  42. return &DHT{dhtOptions: d}
  43. }
  44. func (d *DHT) Option(ctx context.Context) func(c *libp2p.Config) error {
  45. return libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
  46. // make the DHT with the given Host
  47. return d.startDHT(ctx, h)
  48. })
  49. }
  50. func (d *DHT) Rendezvous() string {
  51. if d.OTPKey != "" {
  52. totp := internalCrypto.TOTP(sha256.New, d.KeyLength, d.OTPInterval, d.OTPKey)
  53. rv := internalCrypto.MD5(totp)
  54. d.latestRendezvous = rv
  55. return rv
  56. }
  57. return d.RendezvousString
  58. }
  59. func (d *DHT) startDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
  60. if d.IpfsDHT == nil {
  61. // Start a DHT, for use in peer discovery. We can't just make a new DHT
  62. // client because we want each peer to maintain its own local copy of the
  63. // DHT, so that the bootstrapping node of the DHT can go down without
  64. // inhibiting future peer discovery.
  65. kad, err := dht.New(ctx, h, d.dhtOptions...)
  66. if err != nil {
  67. return d.IpfsDHT, err
  68. }
  69. d.IpfsDHT = kad
  70. }
  71. return d.IpfsDHT, nil
  72. }
  73. func (d *DHT) Run(c log.StandardLogger, ctx context.Context, host host.Host) error {
  74. if d.KeyLength == 0 {
  75. d.KeyLength = 12
  76. }
  77. if len(d.BootstrapPeers) == 0 {
  78. d.BootstrapPeers = dht.DefaultBootstrapPeers
  79. }
  80. // Start a DHT, for use in peer discovery. We can't just make a new DHT
  81. // client because we want each peer to maintain its own local copy of the
  82. // DHT, so that the bootstrapping node of the DHT can go down without
  83. // inhibiting future peer discovery.
  84. kademliaDHT, err := d.startDHT(ctx, host)
  85. if err != nil {
  86. return err
  87. }
  88. // Bootstrap the DHT. In the default configuration, this spawns a Background
  89. // thread that will refresh the peer table every five minutes.
  90. c.Info("Bootstrapping DHT")
  91. if err = kademliaDHT.Bootstrap(ctx); err != nil {
  92. return err
  93. }
  94. connect := func() {
  95. d.bootstrapPeers(c, ctx, host)
  96. if d.latestRendezvous != "" {
  97. d.announceAndConnect(c, ctx, kademliaDHT, host, d.latestRendezvous)
  98. }
  99. rv := d.Rendezvous()
  100. d.announceAndConnect(c, ctx, kademliaDHT, host, rv)
  101. }
  102. go func() {
  103. connect()
  104. t := utils.NewBackoffTicker(utils.BackoffMaxInterval(d.RefreshDiscoveryTime))
  105. defer t.Stop()
  106. for {
  107. select {
  108. case <-t.C:
  109. connect()
  110. case <-ctx.Done():
  111. return
  112. }
  113. }
  114. }()
  115. return nil
  116. }
  117. func (d *DHT) bootstrapPeers(c log.StandardLogger, ctx context.Context, host host.Host) {
  118. // Let's connect to the bootstrap nodes first. They will tell us about the
  119. // other nodes in the network.
  120. var wg sync.WaitGroup
  121. for _, peerAddr := range d.BootstrapPeers {
  122. peerinfo, _ := peer.AddrInfoFromP2pAddr(peerAddr)
  123. wg.Add(1)
  124. go func() {
  125. defer wg.Done()
  126. if host.Network().Connectedness(peerinfo.ID) != network.Connected {
  127. if err := host.Connect(ctx, *peerinfo); err != nil {
  128. c.Debug(err.Error())
  129. } else {
  130. c.Debug("Connection established with bootstrap node:", *peerinfo)
  131. }
  132. }
  133. }()
  134. }
  135. wg.Wait()
  136. }
  137. func (d *DHT) FindClosePeers(ll log.StandardLogger, onlyStaticRelays bool, static ...string) func(ctx context.Context, numPeers int) <-chan peer.AddrInfo {
  138. return func(ctx context.Context, numPeers int) <-chan peer.AddrInfo {
  139. peerChan := make(chan peer.AddrInfo, numPeers)
  140. go func() {
  141. toStream := []peer.AddrInfo{}
  142. if !onlyStaticRelays {
  143. closestPeers, err := d.GetClosestPeers(ctx, d.PeerID().String())
  144. if err != nil {
  145. close(peerChan)
  146. }
  147. for _, p := range closestPeers {
  148. addrs := d.Host().Peerstore().Addrs(p)
  149. if len(addrs) == 0 {
  150. continue
  151. }
  152. ll.Debugf("[relay discovery] Found close peer '%s'", p.Pretty())
  153. toStream = append(toStream, peer.AddrInfo{ID: p, Addrs: addrs})
  154. }
  155. }
  156. for _, r := range static {
  157. pi, err := peer.AddrInfoFromString(r)
  158. if err == nil {
  159. ll.Debug("[static relay discovery] scanning ", pi.ID)
  160. toStream = append(toStream, peer.AddrInfo{ID: pi.ID, Addrs: pi.Addrs})
  161. }
  162. }
  163. if len(toStream) > numPeers {
  164. toStream = toStream[0 : numPeers-1]
  165. }
  166. for _, t := range toStream {
  167. peerChan <- t
  168. }
  169. close(peerChan)
  170. }()
  171. return peerChan
  172. }
  173. }
  174. func (d *DHT) announceAndConnect(l log.StandardLogger, ctx context.Context, kademliaDHT *dht.IpfsDHT, host host.Host, rv string) error {
  175. l.Debug("Announcing ourselves...")
  176. routingDiscovery := discovery.NewRoutingDiscovery(kademliaDHT)
  177. routingDiscovery.Advertise(ctx, rv)
  178. l.Debug("Successfully announced!")
  179. // Now, look for others who have announced
  180. // This is like your friend telling you the location to meet you.
  181. l.Debug("Searching for other peers...")
  182. peerChan, err := routingDiscovery.FindPeers(ctx, rv)
  183. if err != nil {
  184. return err
  185. }
  186. for p := range peerChan {
  187. // Don't dial ourselves or peers without address
  188. if p.ID == host.ID() || len(p.Addrs) == 0 {
  189. continue
  190. }
  191. if host.Network().Connectedness(p.ID) != network.Connected {
  192. l.Debug("Found peer:", p)
  193. if err := host.Connect(ctx, p); err != nil {
  194. l.Debug("Failed connecting to", p)
  195. } else {
  196. l.Debug("Connected to:", p)
  197. }
  198. } else {
  199. l.Debug("Known peer (already connected):", p)
  200. }
  201. }
  202. return nil
  203. }