123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578 |
- // The MIT License (MIT)
- // Copyright (c) 2017 Whyrusleeping (MIT)
- // Copyright (c) 2022 Ettore Di Giacinto (Apache v2)
- // Permission is hereby granted, free of charge, to any person obtaining a copy
- // of this software and associated documentation files (the "Software"), to deal
- // in the Software without restriction, including without limitation the rights
- // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- // copies of the Software, and to permit persons to whom the Software is
- // furnished to do so, subject to the following conditions:
- // The above copyright notice and this permission notice shall be included in
- // all copies or substantial portions of the Software.
- // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- // THE SOFTWARE.
- // This package is a port of go-libp2p-connmgr, but adapted for streams
- package stream
- import (
- "context"
- "errors"
- "sort"
- "sync"
- "sync/atomic"
- "time"
- "github.com/libp2p/go-libp2p/core/connmgr"
- "github.com/libp2p/go-libp2p/core/network"
- "github.com/libp2p/go-libp2p/core/peer"
- logging "github.com/ipfs/go-log/v2"
- )
- var log = logging.Logger("peermgr")
- // Manager is a ConnManager that trims connections whenever the count exceeds the
- // high watermark. New connections are given a grace period before they're subject
- // to trimming. Trims are automatically run on demand, only if the time from the
- // previous trim is higher than 10 seconds. Furthermore, trims can be explicitly
- // requested through the public interface of this struct (see TrimOpenConns).
- //
- // See configuration parameters in NewConnManager.
- type Manager struct {
- *decayer
- cfg *config
- segments segments
- plk sync.RWMutex
- protected map[peer.ID]map[string]struct{}
- // channel-based semaphore that enforces only a single trim is in progress
- trimMutex sync.Mutex
- connCount int32
- // to be accessed atomically. This is mimicking the implementation of a sync.Once.
- // Take care of correct alignment when modifying this struct.
- trimCount uint64
- lastTrimMu sync.RWMutex
- lastTrim time.Time
- refCount sync.WaitGroup
- ctx context.Context
- cancel func()
- unregisterMemoryWatcher func()
- }
- type segment struct {
- sync.Mutex
- peers map[peer.ID]*peerInfo
- }
- type segments [256]*segment
- func (ss *segments) get(p peer.ID) *segment {
- return ss[byte(p[len(p)-1])]
- }
- func (ss *segments) countPeers() (count int) {
- for _, seg := range ss {
- seg.Lock()
- count += len(seg.peers)
- seg.Unlock()
- }
- return count
- }
- func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
- pi, ok := s.peers[p]
- if ok {
- return pi
- }
- // create a temporary peer to buffer early tags before the Connected notification arrives.
- pi = &peerInfo{
- id: p,
- firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives.
- temp: true,
- tags: make(map[string]int),
- decaying: make(map[*decayingTag]*connmgr.DecayingValue),
- conns: make(map[network.Stream]time.Time),
- }
- s.peers[p] = pi
- return pi
- }
- // NewConnManager creates a new Manager with the provided params:
- // lo and hi are watermarks governing the number of connections that'll be maintained.
- // When the peer count exceeds the 'high watermark', as many peers will be pruned (and
- // their connections terminated) until 'low watermark' peers remain.
- func NewConnManager(low, hi int, opts ...Option) (*Manager, error) {
- cfg := &config{
- highWater: hi,
- lowWater: low,
- gracePeriod: 10 * time.Second,
- silencePeriod: 10 * time.Second,
- }
- for _, o := range opts {
- if err := o(cfg); err != nil {
- return nil, err
- }
- }
- if cfg.decayer == nil {
- // Set the default decayer config.
- cfg.decayer = (&DecayerCfg{}).WithDefaults()
- }
- cm := &Manager{
- cfg: cfg,
- protected: make(map[peer.ID]map[string]struct{}, 16),
- segments: func() (ret segments) {
- for i := range ret {
- ret[i] = &segment{
- peers: make(map[peer.ID]*peerInfo),
- }
- }
- return ret
- }(),
- }
- cm.ctx, cm.cancel = context.WithCancel(context.Background())
- decay, _ := NewDecayer(cfg.decayer, cm)
- cm.decayer = decay
- cm.refCount.Add(1)
- go cm.background()
- return cm, nil
- }
- func (cm *Manager) Close() error {
- cm.cancel()
- if cm.unregisterMemoryWatcher != nil {
- cm.unregisterMemoryWatcher()
- }
- if err := cm.decayer.Close(); err != nil {
- return err
- }
- cm.refCount.Wait()
- return nil
- }
- func (cm *Manager) Protect(id peer.ID, tag string) {
- cm.plk.Lock()
- defer cm.plk.Unlock()
- tags, ok := cm.protected[id]
- if !ok {
- tags = make(map[string]struct{}, 2)
- cm.protected[id] = tags
- }
- tags[tag] = struct{}{}
- }
- func (cm *Manager) Unprotect(id peer.ID, tag string) (protected bool) {
- cm.plk.Lock()
- defer cm.plk.Unlock()
- tags, ok := cm.protected[id]
- if !ok {
- return false
- }
- if delete(tags, tag); len(tags) == 0 {
- delete(cm.protected, id)
- return false
- }
- return true
- }
- func (cm *Manager) IsProtected(id peer.ID, tag string) (protected bool) {
- cm.plk.Lock()
- defer cm.plk.Unlock()
- tags, ok := cm.protected[id]
- if !ok {
- return false
- }
- if tag == "" {
- return true
- }
- _, protected = tags[tag]
- return protected
- }
- // peerInfo stores metadata for a given peer.
- type peerInfo struct {
- id peer.ID
- tags map[string]int // value for each tag
- decaying map[*decayingTag]*connmgr.DecayingValue // decaying tags
- value int // cached sum of all tag values
- temp bool // this is a temporary entry holding early tags, and awaiting connections
- conns map[network.Stream]time.Time // start time of each connection
- firstSeen time.Time // timestamp when we began tracking this peer.
- }
- type peerInfos []peerInfo
- func (p peerInfos) SortByValue() {
- sort.Slice(p, func(i, j int) bool {
- left, right := p[i], p[j]
- // temporary peers are preferred for pruning.
- if left.temp != right.temp {
- return left.temp
- }
- // otherwise, compare by value.
- return left.value < right.value
- })
- }
- // TrimOpenConns closes the connections of as many peers as needed to make the peer count
- // equal the low watermark. Peers are sorted in ascending order based on their total value,
- // pruning those peers with the lowest scores first, as long as they are not within their
- // grace period.
- //
- // This function blocks until a trim is completed. If a trim is underway, a new
- // one won't be started, and instead it'll wait until that one is completed before
- // returning.
- func (cm *Manager) TrimOpenConns(_ context.Context) {
- // TODO: error return value so we can cleanly signal we are aborting because:
- // (a) there's another trim in progress, or (b) the silence period is in effect.
- cm.doTrim()
- }
- func (cm *Manager) background() {
- defer cm.refCount.Done()
- interval := cm.cfg.gracePeriod / 2
- if cm.cfg.silencePeriod != 0 {
- interval = cm.cfg.silencePeriod
- }
- ticker := time.NewTicker(interval)
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- if atomic.LoadInt32(&cm.connCount) < int32(cm.cfg.highWater) {
- // Below high water, skip.
- continue
- }
- case <-cm.ctx.Done():
- return
- }
- cm.trim()
- }
- }
- func (cm *Manager) doTrim() {
- // This logic is mimicking the implementation of sync.Once in the standard library.
- count := atomic.LoadUint64(&cm.trimCount)
- cm.trimMutex.Lock()
- defer cm.trimMutex.Unlock()
- if count == atomic.LoadUint64(&cm.trimCount) {
- cm.trim()
- cm.lastTrimMu.Lock()
- cm.lastTrim = time.Now()
- cm.lastTrimMu.Unlock()
- atomic.AddUint64(&cm.trimCount, 1)
- }
- }
- // trim starts the trim, if the last trim happened before the configured silence period.
- func (cm *Manager) trim() {
- // do the actual trim.
- for _, c := range cm.getConnsToClose() {
- c.Close()
- }
- }
- // getConnsToClose runs the heuristics described in TrimOpenConns and returns the
- // connections to close.
- func (cm *Manager) getConnsToClose() []network.Stream {
- if cm.cfg.lowWater == 0 || cm.cfg.highWater == 0 {
- // disabled
- return nil
- }
- if int(atomic.LoadInt32(&cm.connCount)) <= cm.cfg.lowWater {
- log.Info("open connection count below limit")
- return nil
- }
- candidates := make(peerInfos, 0, cm.segments.countPeers())
- var ncandidates int
- gracePeriodStart := time.Now().Add(-cm.cfg.gracePeriod)
- cm.plk.RLock()
- for _, s := range cm.segments {
- s.Lock()
- for id, inf := range s.peers {
- if _, ok := cm.protected[id]; ok {
- // skip over protected peer.
- continue
- }
- if inf.firstSeen.After(gracePeriodStart) {
- // skip peers in the grace period.
- continue
- }
- // note that we're copying the entry here,
- // but since inf.conns is a map, it will still point to the original object
- candidates = append(candidates, *inf)
- ncandidates += len(inf.conns)
- }
- s.Unlock()
- }
- cm.plk.RUnlock()
- if ncandidates < cm.cfg.lowWater {
- log.Info("open connection count above limit but too many are in the grace period")
- // We have too many connections but fewer than lowWater
- // connections out of the grace period.
- //
- // If we trimmed now, we'd kill potentially useful connections.
- return nil
- }
- // Sort peers according to their value.
- candidates.SortByValue()
- target := ncandidates - cm.cfg.lowWater
- // slightly overallocate because we may have more than one conns per peer
- selected := make([]network.Stream, 0, target+10)
- for _, inf := range candidates {
- if target <= 0 {
- break
- }
- // lock this to protect from concurrent modifications from connect/disconnect events
- s := cm.segments.get(inf.id)
- s.Lock()
- if len(inf.conns) == 0 && inf.temp {
- // handle temporary entries for early tags -- this entry has gone past the grace period
- // and still holds no connections, so prune it.
- delete(s.peers, inf.id)
- } else {
- for c := range inf.conns {
- selected = append(selected, c)
- }
- target -= len(inf.conns)
- }
- s.Unlock()
- }
- return selected
- }
- // GetTagInfo is called to fetch the tag information associated with a given
- // peer, nil is returned if p refers to an unknown peer.
- func (cm *Manager) GetTagInfo(p peer.ID) *connmgr.TagInfo {
- s := cm.segments.get(p)
- s.Lock()
- defer s.Unlock()
- pi, ok := s.peers[p]
- if !ok {
- return nil
- }
- out := &connmgr.TagInfo{
- FirstSeen: pi.firstSeen,
- Value: pi.value,
- Tags: make(map[string]int),
- Conns: make(map[string]time.Time),
- }
- for t, v := range pi.tags {
- out.Tags[t] = v
- }
- for t, v := range pi.decaying {
- out.Tags[t.name] = v.Value
- }
- for c, t := range pi.conns {
- out.Conns[c.ID()] = t
- }
- return out
- }
- // TagPeer is called to associate a string and integer with a given peer.
- func (cm *Manager) TagPeer(p peer.ID, tag string, val int) {
- s := cm.segments.get(p)
- s.Lock()
- defer s.Unlock()
- pi := s.tagInfoFor(p)
- // Update the total value of the peer.
- pi.value += val - pi.tags[tag]
- pi.tags[tag] = val
- }
- // UntagPeer is called to disassociate a string and integer from a given peer.
- func (cm *Manager) UntagPeer(p peer.ID, tag string) {
- s := cm.segments.get(p)
- s.Lock()
- defer s.Unlock()
- pi, ok := s.peers[p]
- if !ok {
- log.Info("tried to remove tag from untracked peer: ", p)
- return
- }
- // Update the total value of the peer.
- pi.value -= pi.tags[tag]
- delete(pi.tags, tag)
- }
- // UpsertTag is called to insert/update a peer tag
- func (cm *Manager) UpsertTag(p peer.ID, tag string, upsert func(int) int) {
- s := cm.segments.get(p)
- s.Lock()
- defer s.Unlock()
- pi := s.tagInfoFor(p)
- oldval := pi.tags[tag]
- newval := upsert(oldval)
- pi.value += newval - oldval
- pi.tags[tag] = newval
- }
- // CMInfo holds the configuration for Manager, as well as status data.
- type CMInfo struct {
- // The low watermark, as described in NewConnManager.
- LowWater int
- // The high watermark, as described in NewConnManager.
- HighWater int
- // The timestamp when the last trim was triggered.
- LastTrim time.Time
- // The configured grace period, as described in NewConnManager.
- GracePeriod time.Duration
- // The current connection count.
- ConnCount int
- }
- // GetInfo returns the configuration and status data for this connection manager.
- func (cm *Manager) GetInfo() CMInfo {
- cm.lastTrimMu.RLock()
- lastTrim := cm.lastTrim
- cm.lastTrimMu.RUnlock()
- return CMInfo{
- HighWater: cm.cfg.highWater,
- LowWater: cm.cfg.lowWater,
- LastTrim: lastTrim,
- GracePeriod: cm.cfg.gracePeriod,
- ConnCount: int(atomic.LoadInt32(&cm.connCount)),
- }
- }
- // HasStream is called to retrieve a stream if it does exist for the pid
- func (cm *Manager) HasStream(n network.Network, pid peer.ID) (network.Stream, error) {
- s := cm.segments.get(pid)
- s.Lock()
- defer s.Unlock()
- pinfo, ok := s.peers[pid]
- if !ok {
- return nil, errors.New("no stream available for pid")
- }
- for c := range pinfo.conns {
- pinfo.conns[c] = time.Now()
- return c, nil
- }
- return nil, errors.New("no stream available")
- }
- // Connected is called by notifiers to inform that a new connection has been established.
- // The notifee updates the Manager to start tracking the connection. If the new connection
- // count exceeds the high watermark, a trim may be triggered.
- func (cm *Manager) Connected(n network.Network, c network.Stream) {
- p := c.Conn().RemotePeer()
- s := cm.segments.get(p)
- s.Lock()
- defer s.Unlock()
- pinfo, ok := s.peers[p]
- if !ok {
- pinfo = &peerInfo{
- id: p,
- firstSeen: time.Now(),
- tags: make(map[string]int),
- decaying: make(map[*decayingTag]*connmgr.DecayingValue),
- conns: make(map[network.Stream]time.Time),
- }
- s.peers[p] = pinfo
- } else if pinfo.temp {
- // we had created a temporary entry for this peer to buffer early tags before the
- // Connected notification arrived: flip the temporary flag, and update the firstSeen
- // timestamp to the real one.
- pinfo.temp = false
- pinfo.firstSeen = time.Now()
- }
- // cache one stream for peer
- pinfo.conns = map[network.Stream]time.Time{c: time.Now()}
- // _, ok = pinfo.conns[c]
- // if ok {
- // log.Error("received connected notification for conn we are already tracking: ", p)
- // return
- // }
- // pinfo.conns[c] = time.Now()
- atomic.AddInt32(&cm.connCount, 1)
- }
- // Disconnected is called by notifiers to inform that an existing connection has been closed or terminated.
- // The notifee updates the Manager accordingly to stop tracking the connection, and performs housekeeping.
- func (cm *Manager) Disconnected(n network.Network, c network.Stream) {
- p := c.Conn().RemotePeer()
- s := cm.segments.get(p)
- s.Lock()
- defer s.Unlock()
- cinf, ok := s.peers[p]
- if !ok {
- log.Error("received disconnected notification for peer we are not tracking: ", p)
- return
- }
- _, ok = cinf.conns[c]
- if !ok {
- log.Error("received disconnected notification for conn we are not tracking: ", p)
- return
- }
- delete(cinf.conns, c)
- if len(cinf.conns) == 0 {
- delete(s.peers, p)
- }
- atomic.AddInt32(&cm.connCount, -1)
- }
|