dht.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package discovery
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/ipfs/go-log"
  7. "github.com/libp2p/go-libp2p"
  8. "github.com/libp2p/go-libp2p-core/host"
  9. "github.com/libp2p/go-libp2p-core/network"
  10. "github.com/libp2p/go-libp2p-core/peer"
  11. "github.com/libp2p/go-libp2p-core/routing"
  12. discovery "github.com/libp2p/go-libp2p-discovery"
  13. dht "github.com/libp2p/go-libp2p-kad-dht"
  14. "github.com/xlzd/gotp"
  15. )
  16. type DHT struct {
  17. OTPKey string
  18. OTPInterval int
  19. KeyLength int
  20. RendezvousString string
  21. BootstrapPeers AddrList
  22. latestRendezvous string
  23. console log.StandardLogger
  24. RefreshDiscoveryTime time.Duration
  25. dht *dht.IpfsDHT
  26. }
  27. func (d *DHT) Option(ctx context.Context) func(c *libp2p.Config) error {
  28. return libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
  29. // make the DHT with the given Host
  30. return d.startDHT(ctx, h)
  31. })
  32. }
  33. func (d *DHT) Rendezvous() string {
  34. if d.OTPKey != "" {
  35. totp := gotp.NewTOTP(d.OTPKey, d.KeyLength, d.OTPInterval, nil)
  36. //totp := gotp.NewDefaultTOTP(d.OTPKey)
  37. rv := totp.Now()
  38. d.latestRendezvous = rv
  39. return rv
  40. }
  41. return d.RendezvousString
  42. }
  43. func (d *DHT) startDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
  44. if d.dht == nil {
  45. // Start a DHT, for use in peer discovery. We can't just make a new DHT
  46. // client because we want each peer to maintain its own local copy of the
  47. // DHT, so that the bootstrapping node of the DHT can go down without
  48. // inhibiting future peer discovery.
  49. kad, err := dht.New(ctx, h)
  50. if err != nil {
  51. return d.dht, err
  52. }
  53. d.dht = kad
  54. }
  55. return d.dht, nil
  56. }
  57. func (d *DHT) Run(c log.StandardLogger, ctx context.Context, host host.Host) error {
  58. if d.KeyLength == 0 {
  59. d.KeyLength = 12
  60. }
  61. d.console = c
  62. if len(d.BootstrapPeers) == 0 {
  63. d.BootstrapPeers = dht.DefaultBootstrapPeers
  64. }
  65. // Start a DHT, for use in peer discovery. We can't just make a new DHT
  66. // client because we want each peer to maintain its own local copy of the
  67. // DHT, so that the bootstrapping node of the DHT can go down without
  68. // inhibiting future peer discovery.
  69. kademliaDHT, err := d.startDHT(ctx, host)
  70. if err != nil {
  71. return err
  72. }
  73. // Bootstrap the DHT. In the default configuration, this spawns a Background
  74. // thread that will refresh the peer table every five minutes.
  75. c.Info("Bootstrapping DHT")
  76. if err = kademliaDHT.Bootstrap(ctx); err != nil {
  77. return err
  78. }
  79. connect := func() {
  80. d.bootstrapPeers(c, ctx, host)
  81. if d.latestRendezvous != "" {
  82. d.announceAndConnect(ctx, kademliaDHT, host, d.latestRendezvous)
  83. }
  84. rv := d.Rendezvous()
  85. d.announceAndConnect(ctx, kademliaDHT, host, rv)
  86. }
  87. go func() {
  88. connect()
  89. for {
  90. // We don't want a ticker here but a timer
  91. // this is less "talkative" as a DHT connect() can take up
  92. // long time and can exceed d.RefreshdiscoveryTime.
  93. // In this way we ensure we wait at least timeout to fire a connect()
  94. timer := time.NewTimer(d.RefreshDiscoveryTime)
  95. select {
  96. case <-ctx.Done():
  97. timer.Stop()
  98. return
  99. case <-timer.C:
  100. connect()
  101. }
  102. }
  103. }()
  104. return nil
  105. }
  106. func (d *DHT) bootstrapPeers(c log.StandardLogger, ctx context.Context, host host.Host) {
  107. // Let's connect to the bootstrap nodes first. They will tell us about the
  108. // other nodes in the network.
  109. var wg sync.WaitGroup
  110. for _, peerAddr := range d.BootstrapPeers {
  111. peerinfo, _ := peer.AddrInfoFromP2pAddr(peerAddr)
  112. wg.Add(1)
  113. go func() {
  114. defer wg.Done()
  115. if host.Network().Connectedness(peerinfo.ID) != network.Connected {
  116. if err := host.Connect(ctx, *peerinfo); err != nil {
  117. c.Debug(err.Error())
  118. } else {
  119. c.Debug("Connection established with bootstrap node:", *peerinfo)
  120. }
  121. }
  122. }()
  123. }
  124. wg.Wait()
  125. }
  126. func (d *DHT) announceAndConnect(ctx context.Context, kademliaDHT *dht.IpfsDHT, host host.Host, rv string) error {
  127. d.console.Debug("Announcing ourselves...")
  128. routingDiscovery := discovery.NewRoutingDiscovery(kademliaDHT)
  129. discovery.Advertise(ctx, routingDiscovery, rv)
  130. d.console.Debug("Successfully announced!")
  131. // Now, look for others who have announced
  132. // This is like your friend telling you the location to meet you.
  133. d.console.Debug("Searching for other peers...")
  134. peerChan, err := routingDiscovery.FindPeers(ctx, rv)
  135. if err != nil {
  136. return err
  137. }
  138. for p := range peerChan {
  139. // Don't dial ourselves or peers without address
  140. if p.ID == host.ID() || len(p.Addrs) == 0 {
  141. continue
  142. }
  143. if host.Network().Connectedness(p.ID) != network.Connected {
  144. d.console.Debug("Found peer:", p)
  145. if err := host.Connect(ctx, p); err != nil {
  146. d.console.Debug("Failed connecting to", p)
  147. } else {
  148. d.console.Debug("Connected to:", p)
  149. }
  150. } else {
  151. d.console.Debug("Known peer (already connected):", p)
  152. }
  153. }
  154. return nil
  155. }