123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- /*
- Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package discovery
- import (
- "context"
- "crypto/sha256"
- "sync"
- "time"
- internalCrypto "github.com/mudler/edgevpn/pkg/crypto"
- "github.com/mudler/edgevpn/pkg/utils"
- "github.com/ipfs/go-log"
- "github.com/libp2p/go-libp2p"
- dht "github.com/libp2p/go-libp2p-kad-dht"
- "github.com/libp2p/go-libp2p/core/host"
- "github.com/libp2p/go-libp2p/core/network"
- "github.com/libp2p/go-libp2p/core/peer"
- "github.com/libp2p/go-libp2p/core/routing"
- discovery "github.com/libp2p/go-libp2p/p2p/discovery/routing"
- )
- type DHT struct {
- OTPKey string
- OTPInterval int
- KeyLength int
- RendezvousString string
- BootstrapPeers AddrList
- rendezvousHistory Ring
- RefreshDiscoveryTime time.Duration
- *dht.IpfsDHT
- dhtOptions []dht.Option
- }
- func NewDHT(d ...dht.Option) *DHT {
- return &DHT{dhtOptions: d, rendezvousHistory: Ring{Length: 2}}
- }
- func (d *DHT) Option(ctx context.Context) func(c *libp2p.Config) error {
- return libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
- // make the DHT with the given Host
- return d.startDHT(ctx, h)
- })
- }
- func (d *DHT) Rendezvous() string {
- if d.OTPKey != "" {
- totp := internalCrypto.TOTP(sha256.New, d.KeyLength, d.OTPInterval, d.OTPKey)
- rv := internalCrypto.MD5(totp)
- return rv
- }
- return d.RendezvousString
- }
- func (d *DHT) startDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
- if d.IpfsDHT == nil {
- // Start a DHT, for use in peer discovery. We can't just make a new DHT
- // client because we want each peer to maintain its own local copy of the
- // DHT, so that the bootstrapping node of the DHT can go down without
- // inhibiting future peer discovery.
- kad, err := dht.New(ctx, h, d.dhtOptions...)
- if err != nil {
- return d.IpfsDHT, err
- }
- d.IpfsDHT = kad
- }
- return d.IpfsDHT, nil
- }
- func (d *DHT) announceRendezvous(c log.StandardLogger, ctx context.Context, host host.Host, kademliaDHT *dht.IpfsDHT) {
- d.bootstrapPeers(c, ctx, host)
- rv := d.Rendezvous()
- d.rendezvousHistory.Add(rv)
- c.Debugf("The following rendezvous points are being used: %+v", d.rendezvousHistory.Data)
- for _, r := range d.rendezvousHistory.Data {
- c.Debugf("Announcing with rendezvous: %s", r)
- d.announceAndConnect(c, ctx, kademliaDHT, host, r)
- }
- c.Debug("Announcing to rendezvous done")
- }
- func (d *DHT) Run(c log.StandardLogger, ctx context.Context, host host.Host) error {
- if d.KeyLength == 0 {
- d.KeyLength = 12
- }
- if len(d.BootstrapPeers) == 0 {
- d.BootstrapPeers = dht.DefaultBootstrapPeers
- }
- // Start a DHT, for use in peer discovery. We can't just make a new DHT
- // client because we want each peer to maintain its own local copy of the
- // DHT, so that the bootstrapping node of the DHT can go down without
- // inhibiting future peer discovery.
- kademliaDHT, err := d.startDHT(ctx, host)
- if err != nil {
- return err
- }
- // Bootstrap the DHT. In the default configuration, this spawns a Background
- // thread that will refresh the peer table every five minutes.
- c.Info("Bootstrapping DHT")
- if err = kademliaDHT.Bootstrap(ctx); err != nil {
- return err
- }
- go d.runBackground(c, ctx, host, kademliaDHT)
- return nil
- }
- func (d *DHT) runBackground(c log.StandardLogger, ctx context.Context, host host.Host, kademliaDHT *dht.IpfsDHT) {
- d.announceRendezvous(c, ctx, host, kademliaDHT)
- t := utils.NewBackoffTicker(utils.BackoffMaxInterval(d.RefreshDiscoveryTime))
- defer t.Stop()
- for {
- select {
- case <-t.C:
- // We announce ourselves to the rendezvous point for all the peers.
- // We have a safeguard of 1 hour to avoid blocking the main loop
- // in case of network issues.
- // The TTL of DHT is by default no longer than 3 hours, so we should
- // be safe by having an entry less than that.
- safeTimeout, cancel := context.WithTimeout(ctx, time.Hour)
- endChan := make(chan struct{})
- go func() {
- d.announceRendezvous(c, safeTimeout, host, kademliaDHT)
- endChan <- struct{}{}
- }()
- select {
- case <-endChan:
- cancel()
- case <-safeTimeout.Done():
- c.Error("Timeout while announcing rendezvous")
- cancel()
- }
- case <-ctx.Done():
- return
- }
- }
- }
- func (d *DHT) bootstrapPeers(c log.StandardLogger, ctx context.Context, host host.Host) {
- // Let's connect to the bootstrap nodes first. They will tell us about the
- // other nodes in the network.
- var wg sync.WaitGroup
- for _, peerAddr := range d.BootstrapPeers {
- peerinfo, _ := peer.AddrInfoFromP2pAddr(peerAddr)
- wg.Add(1)
- go func() {
- defer wg.Done()
- if host.Network().Connectedness(peerinfo.ID) != network.Connected {
- if err := host.Connect(ctx, *peerinfo); err != nil {
- c.Debug(err.Error())
- } else {
- c.Debug("Connection established with bootstrap node:", *peerinfo)
- }
- }
- }()
- }
- wg.Wait()
- }
- func (d *DHT) FindClosePeers(ll log.StandardLogger, onlyStaticRelays bool, static ...string) func(ctx context.Context, numPeers int) <-chan peer.AddrInfo {
- return func(ctx context.Context, numPeers int) <-chan peer.AddrInfo {
- peerChan := make(chan peer.AddrInfo, numPeers)
- go func() {
- toStream := []peer.AddrInfo{}
- if !onlyStaticRelays {
- closestPeers, err := d.GetClosestPeers(ctx, d.PeerID().String())
- if err != nil {
- ll.Debug("Error getting closest peers: ", err)
- }
- for _, p := range closestPeers {
- addrs := d.Host().Peerstore().Addrs(p)
- if len(addrs) == 0 {
- continue
- }
- ll.Debugf("[relay discovery] Found close peer '%s'", p.String())
- toStream = append(toStream, peer.AddrInfo{ID: p, Addrs: addrs})
- }
- }
- for _, r := range static {
- pi, err := peer.AddrInfoFromString(r)
- if err == nil {
- ll.Debug("[static relay discovery] scanning ", pi.ID)
- toStream = append(toStream, peer.AddrInfo{ID: pi.ID, Addrs: pi.Addrs})
- }
- }
- if len(toStream) > numPeers {
- toStream = toStream[0 : numPeers-1]
- }
- for _, t := range toStream {
- peerChan <- t
- }
- close(peerChan)
- }()
- return peerChan
- }
- }
- func (d *DHT) announceAndConnect(l log.StandardLogger, ctx context.Context, kademliaDHT *dht.IpfsDHT, host host.Host, rv string) error {
- l.Debug("Announcing ourselves...")
- tCtx, c := context.WithTimeout(ctx, time.Second*120)
- defer c()
- routingDiscovery := discovery.NewRoutingDiscovery(kademliaDHT)
- routingDiscovery.Advertise(tCtx, rv)
- l.Debug("Successfully announced!")
- // Now, look for others who have announced
- // This is like your friend telling you the location to meet you.
- l.Debug("Searching for other peers...")
- fCtx, cf := context.WithTimeout(ctx, time.Second*120)
- defer cf()
- peerChan, err := routingDiscovery.FindPeers(fCtx, rv)
- if err != nil {
- return err
- }
- for p := range peerChan {
- // Don't dial ourselves or peers without address
- if p.ID == host.ID() || len(p.Addrs) == 0 {
- continue
- }
- if host.Network().Connectedness(p.ID) != network.Connected {
- l.Debug("Found peer:", p)
- timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*120)
- defer cancel()
- if err := host.Connect(timeoutCtx, p); err != nil {
- l.Debugf("Failed connecting to '%s', error: '%s'", p, err.Error())
- } else {
- l.Debug("Connected to:", p)
- }
- } else {
- l.Debug("Known peer (already connected):", p)
- }
- }
- l.Debug("Finished searching for peers.")
- return nil
- }
|