dht.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. // Copyright © 2021 Ettore Di Giacinto <[email protected]>
  2. //
  3. // This program is free software; you can redistribute it and/or modify
  4. // it under the terms of the GNU General Public License as published by
  5. // the Free Software Foundation; either version 2 of the License, or
  6. // (at your option) any later version.
  7. //
  8. // This program is distributed in the hope that it will be useful,
  9. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. // GNU General Public License for more details.
  12. //
  13. // You should have received a copy of the GNU General Public License along
  14. // with this program; if not, see <http://www.gnu.org/licenses/>.
  15. package discovery
  16. import (
  17. "context"
  18. "sync"
  19. "time"
  20. internalCrypto "github.com/mudler/edgevpn/pkg/crypto"
  21. "github.com/ipfs/go-log"
  22. "github.com/libp2p/go-libp2p"
  23. "github.com/libp2p/go-libp2p-core/host"
  24. "github.com/libp2p/go-libp2p-core/network"
  25. "github.com/libp2p/go-libp2p-core/peer"
  26. "github.com/libp2p/go-libp2p-core/routing"
  27. discovery "github.com/libp2p/go-libp2p-discovery"
  28. dht "github.com/libp2p/go-libp2p-kad-dht"
  29. "github.com/xlzd/gotp"
  30. )
  31. type DHT struct {
  32. OTPKey string
  33. OTPInterval int
  34. KeyLength int
  35. RendezvousString string
  36. BootstrapPeers AddrList
  37. latestRendezvous string
  38. RefreshDiscoveryTime time.Duration
  39. *dht.IpfsDHT
  40. dhtOptions []dht.Option
  41. }
  42. func NewDHT(d ...dht.Option) *DHT {
  43. return &DHT{dhtOptions: d}
  44. }
  45. func (d *DHT) Option(ctx context.Context) func(c *libp2p.Config) error {
  46. return libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
  47. // make the DHT with the given Host
  48. return d.startDHT(ctx, h)
  49. })
  50. }
  51. func (d *DHT) Rendezvous() string {
  52. if d.OTPKey != "" {
  53. totp := gotp.NewTOTP(d.OTPKey, d.KeyLength, d.OTPInterval, nil)
  54. //totp := gotp.NewDefaultTOTP(d.OTPKey)
  55. rv := internalCrypto.MD5(totp.Now())
  56. d.latestRendezvous = rv
  57. return rv
  58. }
  59. return d.RendezvousString
  60. }
  61. func (d *DHT) startDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
  62. if d.IpfsDHT == nil {
  63. // Start a DHT, for use in peer discovery. We can't just make a new DHT
  64. // client because we want each peer to maintain its own local copy of the
  65. // DHT, so that the bootstrapping node of the DHT can go down without
  66. // inhibiting future peer discovery.
  67. kad, err := dht.New(ctx, h, d.dhtOptions...)
  68. if err != nil {
  69. return d.IpfsDHT, err
  70. }
  71. d.IpfsDHT = kad
  72. }
  73. return d.IpfsDHT, nil
  74. }
  75. func (d *DHT) Run(c log.StandardLogger, ctx context.Context, host host.Host) error {
  76. if d.KeyLength == 0 {
  77. d.KeyLength = 12
  78. }
  79. if len(d.BootstrapPeers) == 0 {
  80. d.BootstrapPeers = dht.DefaultBootstrapPeers
  81. }
  82. // Start a DHT, for use in peer discovery. We can't just make a new DHT
  83. // client because we want each peer to maintain its own local copy of the
  84. // DHT, so that the bootstrapping node of the DHT can go down without
  85. // inhibiting future peer discovery.
  86. kademliaDHT, err := d.startDHT(ctx, host)
  87. if err != nil {
  88. return err
  89. }
  90. // Bootstrap the DHT. In the default configuration, this spawns a Background
  91. // thread that will refresh the peer table every five minutes.
  92. c.Info("Bootstrapping DHT")
  93. if err = kademliaDHT.Bootstrap(ctx); err != nil {
  94. return err
  95. }
  96. connect := func() {
  97. d.bootstrapPeers(c, ctx, host)
  98. if d.latestRendezvous != "" {
  99. d.announceAndConnect(c, ctx, kademliaDHT, host, d.latestRendezvous)
  100. }
  101. rv := d.Rendezvous()
  102. d.announceAndConnect(c, ctx, kademliaDHT, host, rv)
  103. }
  104. go func() {
  105. connect()
  106. for {
  107. // We don't want a ticker here but a timer
  108. // this is less "talkative" as a DHT connect() can take up
  109. // long time and can exceed d.RefreshdiscoveryTime.
  110. // In this way we ensure we wait at least timeout to fire a connect()
  111. timer := time.NewTimer(d.RefreshDiscoveryTime)
  112. select {
  113. case <-ctx.Done():
  114. timer.Stop()
  115. return
  116. case <-timer.C:
  117. connect()
  118. }
  119. }
  120. }()
  121. return nil
  122. }
  123. func (d *DHT) bootstrapPeers(c log.StandardLogger, ctx context.Context, host host.Host) {
  124. // Let's connect to the bootstrap nodes first. They will tell us about the
  125. // other nodes in the network.
  126. var wg sync.WaitGroup
  127. for _, peerAddr := range d.BootstrapPeers {
  128. peerinfo, _ := peer.AddrInfoFromP2pAddr(peerAddr)
  129. wg.Add(1)
  130. go func() {
  131. defer wg.Done()
  132. if host.Network().Connectedness(peerinfo.ID) != network.Connected {
  133. if err := host.Connect(ctx, *peerinfo); err != nil {
  134. c.Debug(err.Error())
  135. } else {
  136. c.Debug("Connection established with bootstrap node:", *peerinfo)
  137. }
  138. }
  139. }()
  140. }
  141. wg.Wait()
  142. }
  143. func (d *DHT) announceAndConnect(l log.StandardLogger, ctx context.Context, kademliaDHT *dht.IpfsDHT, host host.Host, rv string) error {
  144. l.Debug("Announcing ourselves...")
  145. routingDiscovery := discovery.NewRoutingDiscovery(kademliaDHT)
  146. discovery.Advertise(ctx, routingDiscovery, rv)
  147. l.Debug("Successfully announced!")
  148. // Now, look for others who have announced
  149. // This is like your friend telling you the location to meet you.
  150. l.Debug("Searching for other peers...")
  151. peerChan, err := routingDiscovery.FindPeers(ctx, rv)
  152. if err != nil {
  153. return err
  154. }
  155. for p := range peerChan {
  156. // Don't dial ourselves or peers without address
  157. if p.ID == host.ID() || len(p.Addrs) == 0 {
  158. continue
  159. }
  160. if host.Network().Connectedness(p.ID) != network.Connected {
  161. l.Debug("Found peer:", p)
  162. if err := host.Connect(ctx, p); err != nil {
  163. l.Debug("Failed connecting to", p)
  164. } else {
  165. l.Debug("Connected to:", p)
  166. }
  167. } else {
  168. l.Debug("Known peer (already connected):", p)
  169. }
  170. }
  171. return nil
  172. }