decay.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. // The MIT License (MIT)
  2. // Copyright (c) 2017 Whyrusleeping
  3. // Permission is hereby granted, free of charge, to any person obtaining a copy
  4. // of this software and associated documentation files (the "Software"), to deal
  5. // in the Software without restriction, including without limitation the rights
  6. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  7. // copies of the Software, and to permit persons to whom the Software is
  8. // furnished to do so, subject to the following conditions:
  9. // The above copyright notice and this permission notice shall be included in
  10. // all copies or substantial portions of the Software.
  11. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  12. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  13. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  14. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  15. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  16. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  17. // THE SOFTWARE.
  18. // This package is a port of go-libp2p-connmgr, but adapted for streams
  19. package stream
  20. import (
  21. "fmt"
  22. "sync"
  23. "sync/atomic"
  24. "time"
  25. "github.com/libp2p/go-libp2p/core/connmgr"
  26. "github.com/libp2p/go-libp2p/core/peer"
  27. "github.com/benbjohnson/clock"
  28. )
  29. // DefaultResolution is the default resolution of the decay tracker.
  30. var DefaultResolution = 1 * time.Minute
  31. // bumpCmd represents a bump command.
  32. type bumpCmd struct {
  33. peer peer.ID
  34. tag *decayingTag
  35. delta int
  36. }
  37. // removeCmd represents a tag removal command.
  38. type removeCmd struct {
  39. peer peer.ID
  40. tag *decayingTag
  41. }
  42. // decayer tracks and manages all decaying tags and their values.
  43. type decayer struct {
  44. cfg *DecayerCfg
  45. mgr *Manager
  46. clock clock.Clock // for testing.
  47. tagsMu sync.Mutex
  48. knownTags map[string]*decayingTag
  49. // lastTick stores the last time the decayer ticked. Guarded by atomic.
  50. lastTick atomic.Value
  51. // bumpTagCh queues bump commands to be processed by the loop.
  52. bumpTagCh chan bumpCmd
  53. removeTagCh chan removeCmd
  54. closeTagCh chan *decayingTag
  55. // closure thingies.
  56. closeCh chan struct{}
  57. doneCh chan struct{}
  58. err error
  59. }
  60. var _ connmgr.Decayer = (*decayer)(nil)
  61. // DecayerCfg is the configuration object for the Decayer.
  62. type DecayerCfg struct {
  63. Resolution time.Duration
  64. Clock clock.Clock
  65. }
  66. // WithDefaults writes the default values on this DecayerConfig instance,
  67. // and returns itself for chainability.
  68. //
  69. // cfg := (&DecayerCfg{}).WithDefaults()
  70. // cfg.Resolution = 30 * time.Second
  71. // t := NewDecayer(cfg, cm)
  72. func (cfg *DecayerCfg) WithDefaults() *DecayerCfg {
  73. cfg.Resolution = DefaultResolution
  74. return cfg
  75. }
  76. // NewDecayer creates a new decaying tag registry.
  77. func NewDecayer(cfg *DecayerCfg, mgr *Manager) (*decayer, error) {
  78. // use real time if the Clock in the config is nil.
  79. if cfg.Clock == nil {
  80. cfg.Clock = clock.New()
  81. }
  82. d := &decayer{
  83. cfg: cfg,
  84. mgr: mgr,
  85. clock: cfg.Clock,
  86. knownTags: make(map[string]*decayingTag),
  87. bumpTagCh: make(chan bumpCmd, 128),
  88. removeTagCh: make(chan removeCmd, 128),
  89. closeTagCh: make(chan *decayingTag, 128),
  90. closeCh: make(chan struct{}),
  91. doneCh: make(chan struct{}),
  92. }
  93. d.lastTick.Store(d.clock.Now())
  94. // kick things off.
  95. go d.process()
  96. return d, nil
  97. }
  98. func (d *decayer) RegisterDecayingTag(name string, interval time.Duration, decayFn connmgr.DecayFn, bumpFn connmgr.BumpFn) (connmgr.DecayingTag, error) {
  99. d.tagsMu.Lock()
  100. defer d.tagsMu.Unlock()
  101. if _, ok := d.knownTags[name]; ok {
  102. return nil, fmt.Errorf("decaying tag with name %s already exists", name)
  103. }
  104. if interval < d.cfg.Resolution {
  105. log.Warnf("decay interval for %s (%s) was lower than tracker's resolution (%s); overridden to resolution",
  106. name, interval, d.cfg.Resolution)
  107. interval = d.cfg.Resolution
  108. }
  109. if interval%d.cfg.Resolution != 0 {
  110. log.Warnf("decay interval for tag %s (%s) is not a multiple of tracker's resolution (%s); "+
  111. "some precision may be lost", name, interval, d.cfg.Resolution)
  112. }
  113. lastTick := d.lastTick.Load().(time.Time)
  114. tag := &decayingTag{
  115. trkr: d,
  116. name: name,
  117. interval: interval,
  118. nextTick: lastTick.Add(interval),
  119. decayFn: decayFn,
  120. bumpFn: bumpFn,
  121. }
  122. d.knownTags[name] = tag
  123. return tag, nil
  124. }
  125. // Close closes the Decayer. It is idempotent.
  126. func (d *decayer) Close() error {
  127. select {
  128. case <-d.doneCh:
  129. return d.err
  130. default:
  131. }
  132. close(d.closeCh)
  133. <-d.doneCh
  134. return d.err
  135. }
  136. // process is the heart of the tracker. It performs the following duties:
  137. //
  138. // 1. Manages decay.
  139. // 2. Applies score bumps.
  140. // 3. Yields when closed.
  141. func (d *decayer) process() {
  142. defer close(d.doneCh)
  143. ticker := d.clock.Ticker(d.cfg.Resolution)
  144. defer ticker.Stop()
  145. var (
  146. bmp bumpCmd
  147. now time.Time
  148. visit = make(map[*decayingTag]struct{})
  149. )
  150. for {
  151. select {
  152. case now = <-ticker.C:
  153. d.lastTick.Store(now)
  154. d.tagsMu.Lock()
  155. for _, tag := range d.knownTags {
  156. if tag.nextTick.After(now) {
  157. // skip the tag.
  158. continue
  159. }
  160. // Mark the tag to be updated in this round.
  161. visit[tag] = struct{}{}
  162. }
  163. d.tagsMu.Unlock()
  164. // Visit each peer, and decay tags that need to be decayed.
  165. for _, s := range d.mgr.segments {
  166. s.Lock()
  167. // Entered a segment that contains peers. Process each peer.
  168. for _, p := range s.peers {
  169. for tag, v := range p.decaying {
  170. if _, ok := visit[tag]; !ok {
  171. // skip this tag.
  172. continue
  173. }
  174. // ~ this value needs to be visited. ~
  175. var delta int
  176. if after, rm := tag.decayFn(*v); rm {
  177. // delete the value and move on to the next tag.
  178. delta -= v.Value
  179. delete(p.decaying, tag)
  180. } else {
  181. // accumulate the delta, and apply the changes.
  182. delta += after - v.Value
  183. v.Value, v.LastVisit = after, now
  184. }
  185. p.value += delta
  186. }
  187. }
  188. s.Unlock()
  189. }
  190. // Reset each tag's next visit round, and clear the visited set.
  191. for tag := range visit {
  192. tag.nextTick = tag.nextTick.Add(tag.interval)
  193. delete(visit, tag)
  194. }
  195. case bmp = <-d.bumpTagCh:
  196. var (
  197. now = d.clock.Now()
  198. peer, tag = bmp.peer, bmp.tag
  199. )
  200. s := d.mgr.segments.get(peer)
  201. s.Lock()
  202. p := s.tagInfoFor(peer)
  203. v, ok := p.decaying[tag]
  204. if !ok {
  205. v = &connmgr.DecayingValue{
  206. Tag: tag,
  207. Peer: peer,
  208. LastVisit: now,
  209. Added: now,
  210. Value: 0,
  211. }
  212. p.decaying[tag] = v
  213. }
  214. prev := v.Value
  215. v.Value, v.LastVisit = v.Tag.(*decayingTag).bumpFn(*v, bmp.delta), now
  216. p.value += v.Value - prev
  217. s.Unlock()
  218. case rm := <-d.removeTagCh:
  219. s := d.mgr.segments.get(rm.peer)
  220. s.Lock()
  221. p := s.tagInfoFor(rm.peer)
  222. v, ok := p.decaying[rm.tag]
  223. if !ok {
  224. s.Unlock()
  225. continue
  226. }
  227. p.value -= v.Value
  228. delete(p.decaying, rm.tag)
  229. s.Unlock()
  230. case t := <-d.closeTagCh:
  231. // Stop tracking the tag.
  232. d.tagsMu.Lock()
  233. delete(d.knownTags, t.name)
  234. d.tagsMu.Unlock()
  235. // Remove the tag from all peers that had it in the connmgr.
  236. for _, s := range d.mgr.segments {
  237. // visit all segments, and attempt to remove the tag from all the peers it stores.
  238. s.Lock()
  239. for _, p := range s.peers {
  240. if dt, ok := p.decaying[t]; ok {
  241. // decrease the value of the tagInfo, and delete the tag.
  242. p.value -= dt.Value
  243. delete(p.decaying, t)
  244. }
  245. }
  246. s.Unlock()
  247. }
  248. case <-d.closeCh:
  249. return
  250. }
  251. }
  252. }
  253. // decayingTag represents a decaying tag, with an associated decay interval, a
  254. // decay function, and a bump function.
  255. type decayingTag struct {
  256. trkr *decayer
  257. name string
  258. interval time.Duration
  259. nextTick time.Time
  260. decayFn connmgr.DecayFn
  261. bumpFn connmgr.BumpFn
  262. // closed marks this tag as closed, so that if it's bumped after being
  263. // closed, we can return an error. 0 = false; 1 = true; guarded by atomic.
  264. closed int32
  265. }
  266. var _ connmgr.DecayingTag = (*decayingTag)(nil)
  267. func (t *decayingTag) Name() string {
  268. return t.name
  269. }
  270. func (t *decayingTag) Interval() time.Duration {
  271. return t.interval
  272. }
  273. // Bump bumps a tag for this peer.
  274. func (t *decayingTag) Bump(p peer.ID, delta int) error {
  275. if atomic.LoadInt32(&t.closed) == 1 {
  276. return fmt.Errorf("decaying tag %s had been closed; no further bumps are accepted", t.name)
  277. }
  278. bmp := bumpCmd{peer: p, tag: t, delta: delta}
  279. select {
  280. case t.trkr.bumpTagCh <- bmp:
  281. return nil
  282. default:
  283. return fmt.Errorf(
  284. "unable to bump decaying tag for peer %s, tag %s, delta %d; queue full (len=%d)",
  285. p.String(), t.name, delta, len(t.trkr.bumpTagCh))
  286. }
  287. }
  288. func (t *decayingTag) Remove(p peer.ID) error {
  289. if atomic.LoadInt32(&t.closed) == 1 {
  290. return fmt.Errorf("decaying tag %s had been closed; no further removals are accepted", t.name)
  291. }
  292. rm := removeCmd{peer: p, tag: t}
  293. select {
  294. case t.trkr.removeTagCh <- rm:
  295. return nil
  296. default:
  297. return fmt.Errorf(
  298. "unable to remove decaying tag for peer %s, tag %s; queue full (len=%d)",
  299. p.String(), t.name, len(t.trkr.removeTagCh))
  300. }
  301. }
  302. func (t *decayingTag) Close() error {
  303. if !atomic.CompareAndSwapInt32(&t.closed, 0, 1) {
  304. log.Warnf("duplicate decaying tag closure: %s; skipping", t.name)
  305. return nil
  306. }
  307. select {
  308. case t.trkr.closeTagCh <- t:
  309. return nil
  310. default:
  311. return fmt.Errorf("unable to close decaying tag %s; queue full (len=%d)", t.name, len(t.trkr.closeTagCh))
  312. }
  313. }