123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379 |
- // The MIT License (MIT)
- // Copyright (c) 2017 Whyrusleeping
- // Permission is hereby granted, free of charge, to any person obtaining a copy
- // of this software and associated documentation files (the "Software"), to deal
- // in the Software without restriction, including without limitation the rights
- // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- // copies of the Software, and to permit persons to whom the Software is
- // furnished to do so, subject to the following conditions:
- // The above copyright notice and this permission notice shall be included in
- // all copies or substantial portions of the Software.
- // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- // THE SOFTWARE.
- // This package is a port of go-libp2p-connmgr, but adapted for streams
- package stream
- import (
- "fmt"
- "sync"
- "sync/atomic"
- "time"
- "github.com/libp2p/go-libp2p/core/connmgr"
- "github.com/libp2p/go-libp2p/core/peer"
- "github.com/benbjohnson/clock"
- )
// DefaultResolution is the decay tracker resolution that
// DecayerCfg.WithDefaults writes into a configuration: one minute.
var DefaultResolution = time.Minute
- // bumpCmd represents a bump command.
- type bumpCmd struct {
- peer peer.ID
- tag *decayingTag
- delta int
- }
- // removeCmd represents a tag removal command.
- type removeCmd struct {
- peer peer.ID
- tag *decayingTag
- }
- // decayer tracks and manages all decaying tags and their values.
- type decayer struct {
- cfg *DecayerCfg
- mgr *Manager
- clock clock.Clock // for testing.
- tagsMu sync.Mutex
- knownTags map[string]*decayingTag
- // lastTick stores the last time the decayer ticked. Guarded by atomic.
- lastTick atomic.Value
- // bumpTagCh queues bump commands to be processed by the loop.
- bumpTagCh chan bumpCmd
- removeTagCh chan removeCmd
- closeTagCh chan *decayingTag
- // closure thingies.
- closeCh chan struct{}
- doneCh chan struct{}
- err error
- }
- var _ connmgr.Decayer = (*decayer)(nil)
- // DecayerCfg is the configuration object for the Decayer.
- type DecayerCfg struct {
- Resolution time.Duration
- Clock clock.Clock
- }
- // WithDefaults writes the default values on this DecayerConfig instance,
- // and returns itself for chainability.
- //
- // cfg := (&DecayerCfg{}).WithDefaults()
- // cfg.Resolution = 30 * time.Second
- // t := NewDecayer(cfg, cm)
- func (cfg *DecayerCfg) WithDefaults() *DecayerCfg {
- cfg.Resolution = DefaultResolution
- return cfg
- }
- // NewDecayer creates a new decaying tag registry.
- func NewDecayer(cfg *DecayerCfg, mgr *Manager) (*decayer, error) {
- // use real time if the Clock in the config is nil.
- if cfg.Clock == nil {
- cfg.Clock = clock.New()
- }
- d := &decayer{
- cfg: cfg,
- mgr: mgr,
- clock: cfg.Clock,
- knownTags: make(map[string]*decayingTag),
- bumpTagCh: make(chan bumpCmd, 128),
- removeTagCh: make(chan removeCmd, 128),
- closeTagCh: make(chan *decayingTag, 128),
- closeCh: make(chan struct{}),
- doneCh: make(chan struct{}),
- }
- d.lastTick.Store(d.clock.Now())
- // kick things off.
- go d.process()
- return d, nil
- }
- func (d *decayer) RegisterDecayingTag(name string, interval time.Duration, decayFn connmgr.DecayFn, bumpFn connmgr.BumpFn) (connmgr.DecayingTag, error) {
- d.tagsMu.Lock()
- defer d.tagsMu.Unlock()
- if _, ok := d.knownTags[name]; ok {
- return nil, fmt.Errorf("decaying tag with name %s already exists", name)
- }
- if interval < d.cfg.Resolution {
- log.Warnf("decay interval for %s (%s) was lower than tracker's resolution (%s); overridden to resolution",
- name, interval, d.cfg.Resolution)
- interval = d.cfg.Resolution
- }
- if interval%d.cfg.Resolution != 0 {
- log.Warnf("decay interval for tag %s (%s) is not a multiple of tracker's resolution (%s); "+
- "some precision may be lost", name, interval, d.cfg.Resolution)
- }
- lastTick := d.lastTick.Load().(time.Time)
- tag := &decayingTag{
- trkr: d,
- name: name,
- interval: interval,
- nextTick: lastTick.Add(interval),
- decayFn: decayFn,
- bumpFn: bumpFn,
- }
- d.knownTags[name] = tag
- return tag, nil
- }
- // Close closes the Decayer. It is idempotent.
- func (d *decayer) Close() error {
- select {
- case <-d.doneCh:
- return d.err
- default:
- }
- close(d.closeCh)
- <-d.doneCh
- return d.err
- }
- // process is the heart of the tracker. It performs the following duties:
- //
- // 1. Manages decay.
- // 2. Applies score bumps.
- // 3. Yields when closed.
- func (d *decayer) process() {
- defer close(d.doneCh)
- ticker := d.clock.Ticker(d.cfg.Resolution)
- defer ticker.Stop()
- var (
- bmp bumpCmd
- now time.Time
- visit = make(map[*decayingTag]struct{})
- )
- for {
- select {
- case now = <-ticker.C:
- d.lastTick.Store(now)
- d.tagsMu.Lock()
- for _, tag := range d.knownTags {
- if tag.nextTick.After(now) {
- // skip the tag.
- continue
- }
- // Mark the tag to be updated in this round.
- visit[tag] = struct{}{}
- }
- d.tagsMu.Unlock()
- // Visit each peer, and decay tags that need to be decayed.
- for _, s := range d.mgr.segments {
- s.Lock()
- // Entered a segment that contains peers. Process each peer.
- for _, p := range s.peers {
- for tag, v := range p.decaying {
- if _, ok := visit[tag]; !ok {
- // skip this tag.
- continue
- }
- // ~ this value needs to be visited. ~
- var delta int
- if after, rm := tag.decayFn(*v); rm {
- // delete the value and move on to the next tag.
- delta -= v.Value
- delete(p.decaying, tag)
- } else {
- // accumulate the delta, and apply the changes.
- delta += after - v.Value
- v.Value, v.LastVisit = after, now
- }
- p.value += delta
- }
- }
- s.Unlock()
- }
- // Reset each tag's next visit round, and clear the visited set.
- for tag := range visit {
- tag.nextTick = tag.nextTick.Add(tag.interval)
- delete(visit, tag)
- }
- case bmp = <-d.bumpTagCh:
- var (
- now = d.clock.Now()
- peer, tag = bmp.peer, bmp.tag
- )
- s := d.mgr.segments.get(peer)
- s.Lock()
- p := s.tagInfoFor(peer)
- v, ok := p.decaying[tag]
- if !ok {
- v = &connmgr.DecayingValue{
- Tag: tag,
- Peer: peer,
- LastVisit: now,
- Added: now,
- Value: 0,
- }
- p.decaying[tag] = v
- }
- prev := v.Value
- v.Value, v.LastVisit = v.Tag.(*decayingTag).bumpFn(*v, bmp.delta), now
- p.value += v.Value - prev
- s.Unlock()
- case rm := <-d.removeTagCh:
- s := d.mgr.segments.get(rm.peer)
- s.Lock()
- p := s.tagInfoFor(rm.peer)
- v, ok := p.decaying[rm.tag]
- if !ok {
- s.Unlock()
- continue
- }
- p.value -= v.Value
- delete(p.decaying, rm.tag)
- s.Unlock()
- case t := <-d.closeTagCh:
- // Stop tracking the tag.
- d.tagsMu.Lock()
- delete(d.knownTags, t.name)
- d.tagsMu.Unlock()
- // Remove the tag from all peers that had it in the connmgr.
- for _, s := range d.mgr.segments {
- // visit all segments, and attempt to remove the tag from all the peers it stores.
- s.Lock()
- for _, p := range s.peers {
- if dt, ok := p.decaying[t]; ok {
- // decrease the value of the tagInfo, and delete the tag.
- p.value -= dt.Value
- delete(p.decaying, t)
- }
- }
- s.Unlock()
- }
- case <-d.closeCh:
- return
- }
- }
- }
- // decayingTag represents a decaying tag, with an associated decay interval, a
- // decay function, and a bump function.
- type decayingTag struct {
- trkr *decayer
- name string
- interval time.Duration
- nextTick time.Time
- decayFn connmgr.DecayFn
- bumpFn connmgr.BumpFn
- // closed marks this tag as closed, so that if it's bumped after being
- // closed, we can return an error. 0 = false; 1 = true; guarded by atomic.
- closed int32
- }
- var _ connmgr.DecayingTag = (*decayingTag)(nil)
- func (t *decayingTag) Name() string {
- return t.name
- }
- func (t *decayingTag) Interval() time.Duration {
- return t.interval
- }
- // Bump bumps a tag for this peer.
- func (t *decayingTag) Bump(p peer.ID, delta int) error {
- if atomic.LoadInt32(&t.closed) == 1 {
- return fmt.Errorf("decaying tag %s had been closed; no further bumps are accepted", t.name)
- }
- bmp := bumpCmd{peer: p, tag: t, delta: delta}
- select {
- case t.trkr.bumpTagCh <- bmp:
- return nil
- default:
- return fmt.Errorf(
- "unable to bump decaying tag for peer %s, tag %s, delta %d; queue full (len=%d)",
- p.String(), t.name, delta, len(t.trkr.bumpTagCh))
- }
- }
- func (t *decayingTag) Remove(p peer.ID) error {
- if atomic.LoadInt32(&t.closed) == 1 {
- return fmt.Errorf("decaying tag %s had been closed; no further removals are accepted", t.name)
- }
- rm := removeCmd{peer: p, tag: t}
- select {
- case t.trkr.removeTagCh <- rm:
- return nil
- default:
- return fmt.Errorf(
- "unable to remove decaying tag for peer %s, tag %s; queue full (len=%d)",
- p.String(), t.name, len(t.trkr.removeTagCh))
- }
- }
- func (t *decayingTag) Close() error {
- if !atomic.CompareAndSwapInt32(&t.closed, 0, 1) {
- log.Warnf("duplicate decaying tag closure: %s; skipping", t.name)
- return nil
- }
- select {
- case t.trkr.closeTagCh <- t:
- return nil
- default:
- return fmt.Errorf("unable to close decaying tag %s; queue full (len=%d)", t.name, len(t.trkr.closeTagCh))
- }
- }
|