manager.go

// The MIT License (MIT)
// Copyright (c) 2017 Whyrusleeping (MIT)
// Copyright (c) 2022 Ettore Di Giacinto (Apache v2)
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

// This package is a port of go-libp2p-connmgr, but adapted for streams.
package stream

import (
	"context"
	"errors"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	logging "github.com/ipfs/go-log/v2"
	"github.com/libp2p/go-libp2p/core/connmgr"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
)

var log = logging.Logger("peermgr")

// Manager is a ConnManager that trims connections whenever the count exceeds the
// high watermark. New connections are given a grace period before they become
// subject to trimming. Trims are automatically run on demand, but only if more
// than 10 seconds have passed since the previous trim. Furthermore, trims can be
// explicitly requested through the public interface of this struct (see
// TrimOpenConns).
//
// See configuration parameters in NewConnManager.
type Manager struct {
	*decayer

	cfg      *config
	segments segments

	plk       sync.RWMutex
	protected map[peer.ID]map[string]struct{}

	// trimMutex ensures that only a single trim is in progress at a time.
	trimMutex sync.Mutex
	connCount int32
	// trimCount is accessed atomically, mimicking the implementation of a sync.Once.
	// Take care of correct alignment when modifying this struct.
	trimCount uint64

	lastTrimMu sync.RWMutex
	lastTrim   time.Time

	refCount                sync.WaitGroup
	ctx                     context.Context
	cancel                  func()
	unregisterMemoryWatcher func()
}

type segment struct {
	sync.Mutex
	peers map[peer.ID]*peerInfo
}

type segments [256]*segment

// get returns the segment a peer maps to, sharding on the last byte of the peer ID.
func (ss *segments) get(p peer.ID) *segment {
	return ss[byte(p[len(p)-1])]
}

func (ss *segments) countPeers() (count int) {
	for _, seg := range ss {
		seg.Lock()
		count += len(seg.peers)
		seg.Unlock()
	}
	return count
}

func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
	pi, ok := s.peers[p]
	if ok {
		return pi
	}
	// create a temporary entry to buffer early tags before the Connected notification arrives.
	pi = &peerInfo{
		id:        p,
		firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives.
		temp:      true,
		tags:      make(map[string]int),
		decaying:  make(map[*decayingTag]*connmgr.DecayingValue),
		conns:     make(map[network.Stream]time.Time),
	}
	s.peers[p] = pi
	return pi
}

// NewConnManager creates a new Manager with the provided params:
// low and hi are watermarks governing the number of connections that'll be maintained.
// When the peer count exceeds the 'high watermark', as many peers will be pruned
// (and their connections terminated) until 'low watermark' peers remain.
func NewConnManager(low, hi int, opts ...Option) (*Manager, error) {
	cfg := &config{
		highWater:     hi,
		lowWater:      low,
		gracePeriod:   10 * time.Second,
		silencePeriod: 10 * time.Second,
	}
	for _, o := range opts {
		if err := o(cfg); err != nil {
			return nil, err
		}
	}

	if cfg.decayer == nil {
		// Set the default decayer config.
		cfg.decayer = (&DecayerCfg{}).WithDefaults()
	}

	cm := &Manager{
		cfg:       cfg,
		protected: make(map[peer.ID]map[string]struct{}, 16),
		segments: func() (ret segments) {
			for i := range ret {
				ret[i] = &segment{
					peers: make(map[peer.ID]*peerInfo),
				}
			}
			return ret
		}(),
	}

	cm.ctx, cm.cancel = context.WithCancel(context.Background())

	decay, err := NewDecayer(cfg.decayer, cm)
	if err != nil {
		// release the context before bailing out.
		cm.cancel()
		return nil, err
	}
	cm.decayer = decay

	cm.refCount.Add(1)
	go cm.background()

	return cm, nil
}

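// Usage sketch (illustrative only): construct a manager with low/high
// watermarks and shut it down cleanly. This relies only on what this file
// defines; option constructors live elsewhere in the package.
//
//	mgr, err := NewConnManager(100, 400)
//	if err != nil {
//		return err
//	}
//	defer mgr.Close()
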
func (cm *Manager) Close() error {
	cm.cancel()
	if cm.unregisterMemoryWatcher != nil {
		cm.unregisterMemoryWatcher()
	}
	if err := cm.decayer.Close(); err != nil {
		return err
	}
	cm.refCount.Wait()
	return nil
}

// Protect marks a peer as protected under the given tag: it will not be
// pruned during trims, regardless of its value.
func (cm *Manager) Protect(id peer.ID, tag string) {
	cm.plk.Lock()
	defer cm.plk.Unlock()

	tags, ok := cm.protected[id]
	if !ok {
		tags = make(map[string]struct{}, 2)
		cm.protected[id] = tags
	}
	tags[tag] = struct{}{}
}

// Unprotect removes the given protection tag from a peer. It returns true if
// the peer remains protected under other tags.
func (cm *Manager) Unprotect(id peer.ID, tag string) (protected bool) {
	cm.plk.Lock()
	defer cm.plk.Unlock()

	tags, ok := cm.protected[id]
	if !ok {
		return false
	}
	if delete(tags, tag); len(tags) == 0 {
		delete(cm.protected, id)
		return false
	}
	return true
}

// IsProtected reports whether a peer is protected under the given tag; an
// empty tag matches any protection.
func (cm *Manager) IsProtected(id peer.ID, tag string) (protected bool) {
	cm.plk.Lock()
	defer cm.plk.Unlock()

	tags, ok := cm.protected[id]
	if !ok {
		return false
	}

	if tag == "" {
		return true
	}

	_, protected = tags[tag]
	return protected
}

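// Usage sketch (illustrative only): pin a peer so trims never prune it.
// `bootstrapPeer` is a placeholder for a peer.ID the caller already holds.
//
//	mgr.Protect(bootstrapPeer, "bootstrap")
//	// ...later, when the relationship ends:
//	stillProtected := mgr.Unprotect(bootstrapPeer, "bootstrap")
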
// peerInfo stores metadata for a given peer.
type peerInfo struct {
	id       peer.ID
	tags     map[string]int                          // value for each tag
	decaying map[*decayingTag]*connmgr.DecayingValue // decaying tags
	value    int                                     // cached sum of all tag values
	temp     bool                                    // this is a temporary entry holding early tags, and awaiting connections

	conns map[network.Stream]time.Time // start time of each connection

	firstSeen time.Time // timestamp when we began tracking this peer.
}

type peerInfos []peerInfo

func (p peerInfos) SortByValue() {
	sort.Slice(p, func(i, j int) bool {
		left, right := p[i], p[j]
		// temporary peers are preferred for pruning.
		if left.temp != right.temp {
			return left.temp
		}
		// otherwise, compare by value.
		return left.value < right.value
	})
}

// TrimOpenConns closes the connections of as many peers as needed to make the peer
// count equal the low watermark. Peers are sorted in ascending order based on their
// total value, pruning those peers with the lowest scores first, as long as they are
// not within their grace period.
//
// This function blocks until a trim is completed. If a trim is underway, a new one
// won't be started; instead, this call waits until the ongoing trim completes before
// returning.
func (cm *Manager) TrimOpenConns(_ context.Context) {
	// TODO: error return value so we can cleanly signal we are aborting because:
	// (a) there's another trim in progress, or (b) the silence period is in effect.
	cm.doTrim()
}

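// Usage sketch (illustrative only): force an immediate trim outside the
// background schedule, e.g. in response to memory pressure. The context is
// currently ignored by TrimOpenConns.
//
//	mgr.TrimOpenConns(context.Background())
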
func (cm *Manager) background() {
	defer cm.refCount.Done()

	interval := cm.cfg.gracePeriod / 2
	if cm.cfg.silencePeriod != 0 {
		interval = cm.cfg.silencePeriod
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if atomic.LoadInt32(&cm.connCount) < int32(cm.cfg.highWater) {
				// Below high water, skip.
				continue
			}
		case <-cm.ctx.Done():
			return
		}
		cm.trim()
	}
}

func (cm *Manager) doTrim() {
	// This logic is mimicking the implementation of sync.Once in the standard library.
	count := atomic.LoadUint64(&cm.trimCount)
	cm.trimMutex.Lock()
	defer cm.trimMutex.Unlock()
	if count == atomic.LoadUint64(&cm.trimCount) {
		cm.trim()
		cm.lastTrimMu.Lock()
		cm.lastTrim = time.Now()
		cm.lastTrimMu.Unlock()
		atomic.AddUint64(&cm.trimCount, 1)
	}
}

// trim performs the actual trim: it closes every connection selected by
// getConnsToClose. Rate limiting is handled by the callers: the background
// loop ticks at the silence period, and doTrim coalesces concurrent requests.
func (cm *Manager) trim() {
	for _, c := range cm.getConnsToClose() {
		c.Close()
	}
}

// getConnsToClose runs the heuristics described in TrimOpenConns and returns the
// connections to close.
func (cm *Manager) getConnsToClose() []network.Stream {
	if cm.cfg.lowWater == 0 || cm.cfg.highWater == 0 {
		// disabled
		return nil
	}

	if int(atomic.LoadInt32(&cm.connCount)) <= cm.cfg.lowWater {
		log.Info("open connection count below limit")
		return nil
	}

	candidates := make(peerInfos, 0, cm.segments.countPeers())
	var ncandidates int
	gracePeriodStart := time.Now().Add(-cm.cfg.gracePeriod)

	cm.plk.RLock()
	for _, s := range cm.segments {
		s.Lock()
		for id, inf := range s.peers {
			if _, ok := cm.protected[id]; ok {
				// skip over protected peer.
				continue
			}
			if inf.firstSeen.After(gracePeriodStart) {
				// skip peers in the grace period.
				continue
			}
			// note that we're copying the entry here, but since inf.conns is a map,
			// it will still point to the original object.
			candidates = append(candidates, *inf)
			ncandidates += len(inf.conns)
		}
		s.Unlock()
	}
	cm.plk.RUnlock()

	if ncandidates < cm.cfg.lowWater {
		log.Info("open connection count above limit but too many are in the grace period")
		// We have too many connections but fewer than lowWater
		// connections out of the grace period.
		//
		// If we trimmed now, we'd kill potentially useful connections.
		return nil
	}

	// Sort peers according to their value.
	candidates.SortByValue()

	target := ncandidates - cm.cfg.lowWater

	// slightly overallocate because we may have more than one conn per peer
	selected := make([]network.Stream, 0, target+10)

	for _, inf := range candidates {
		if target <= 0 {
			break
		}

		// lock this to protect from concurrent modifications from connect/disconnect events
		s := cm.segments.get(inf.id)
		s.Lock()

		if len(inf.conns) == 0 && inf.temp {
			// handle temporary entries for early tags -- this entry has gone past the
			// grace period and still holds no connections, so prune it.
			delete(s.peers, inf.id)
		} else {
			for c := range inf.conns {
				selected = append(selected, c)
			}
			target -= len(inf.conns)
		}
		s.Unlock()
	}

	return selected
}

// GetTagInfo is called to fetch the tag information associated with a given
// peer; nil is returned if p refers to an unknown peer.
func (cm *Manager) GetTagInfo(p peer.ID) *connmgr.TagInfo {
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pi, ok := s.peers[p]
	if !ok {
		return nil
	}

	out := &connmgr.TagInfo{
		FirstSeen: pi.firstSeen,
		Value:     pi.value,
		Tags:      make(map[string]int),
		Conns:     make(map[string]time.Time),
	}

	for t, v := range pi.tags {
		out.Tags[t] = v
	}
	for t, v := range pi.decaying {
		out.Tags[t.name] = v.Value
	}
	for c, t := range pi.conns {
		out.Conns[c.ID()] = t
	}

	return out
}

// TagPeer is called to associate a string and integer with a given peer.
func (cm *Manager) TagPeer(p peer.ID, tag string, val int) {
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pi := s.tagInfoFor(p)

	// Update the total value of the peer.
	pi.value += val - pi.tags[tag]
	pi.tags[tag] = val
}

// UntagPeer is called to disassociate a string and integer from a given peer.
func (cm *Manager) UntagPeer(p peer.ID, tag string) {
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pi, ok := s.peers[p]
	if !ok {
		log.Info("tried to remove tag from untracked peer: ", p)
		return
	}

	// Update the total value of the peer.
	pi.value -= pi.tags[tag]
	delete(pi.tags, tag)
}

// UpsertTag is called to insert/update a peer tag.
func (cm *Manager) UpsertTag(p peer.ID, tag string, upsert func(int) int) {
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pi := s.tagInfoFor(p)

	oldval := pi.tags[tag]
	newval := upsert(oldval)
	pi.value += newval - oldval
	pi.tags[tag] = newval
}

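// Usage sketch (illustrative only): score a peer after useful work, and bump
// a counter-style tag with UpsertTag. `p` is a placeholder peer.ID.
//
//	mgr.TagPeer(p, "served-blocks", 50)
//	mgr.UpsertTag(p, "requests", func(old int) int { return old + 1 })
//	info := mgr.GetTagInfo(p) // nil if the peer is unknown
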
// CMInfo holds the configuration for Manager, as well as status data.
type CMInfo struct {
	// The low watermark, as described in NewConnManager.
	LowWater int

	// The high watermark, as described in NewConnManager.
	HighWater int

	// The timestamp when the last trim was triggered.
	LastTrim time.Time

	// The configured grace period, as described in NewConnManager.
	GracePeriod time.Duration

	// The current connection count.
	ConnCount int
}

// GetInfo returns the configuration and status data for this connection manager.
func (cm *Manager) GetInfo() CMInfo {
	cm.lastTrimMu.RLock()
	lastTrim := cm.lastTrim
	cm.lastTrimMu.RUnlock()

	return CMInfo{
		HighWater:   cm.cfg.highWater,
		LowWater:    cm.cfg.lowWater,
		LastTrim:    lastTrim,
		GracePeriod: cm.cfg.gracePeriod,
		ConnCount:   int(atomic.LoadInt32(&cm.connCount)),
	}
}

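// Usage sketch (illustrative only): surface manager status, e.g. for metrics
// or debug logs.
//
//	info := mgr.GetInfo()
//	log.Infof("conns=%d/%d, last trim: %s", info.ConnCount, info.HighWater, info.LastTrim)
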
// HasStream retrieves a cached stream for the given peer, if one exists. The
// stream's tracked timestamp is refreshed before it is returned.
func (cm *Manager) HasStream(n network.Network, pid peer.ID) (network.Stream, error) {
	s := cm.segments.get(pid)
	s.Lock()
	defer s.Unlock()

	pinfo, ok := s.peers[pid]
	if !ok {
		return nil, errors.New("no stream available for pid")
	}

	for c := range pinfo.conns {
		pinfo.conns[c] = time.Now()
		return c, nil
	}

	return nil, errors.New("no stream available")
}

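// Usage sketch (illustrative only): try a cached stream before opening a new
// one. `h` stands in for a libp2p host available to the caller.
//
//	st, err := mgr.HasStream(h.Network(), pid)
//	if err != nil {
//		// no cached stream: open a fresh one and register it via Connected.
//	}
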
// Connected is called by notifiers to inform that a new connection has been established.
// The notifee updates the Manager to start tracking the connection. If the new connection
// count exceeds the high watermark, a trim may be triggered.
func (cm *Manager) Connected(n network.Network, c network.Stream) {
	p := c.Conn().RemotePeer()
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pinfo, ok := s.peers[p]
	if !ok {
		pinfo = &peerInfo{
			id:        p,
			firstSeen: time.Now(),
			tags:      make(map[string]int),
			decaying:  make(map[*decayingTag]*connmgr.DecayingValue),
			conns:     make(map[network.Stream]time.Time),
		}
		s.peers[p] = pinfo
	} else if pinfo.temp {
		// we had created a temporary entry for this peer to buffer early tags before the
		// Connected notification arrived: flip the temporary flag, and update the firstSeen
		// timestamp to the real one.
		pinfo.temp = false
		pinfo.firstSeen = time.Now()
	}

	// cache a single stream per peer, replacing any previously tracked one. Only bump
	// the counter when the peer had no cached stream; otherwise replacing would inflate
	// the count, since the discarded stream never produces a matching Disconnected
	// decrement.
	if len(pinfo.conns) == 0 {
		atomic.AddInt32(&cm.connCount, 1)
	}
	pinfo.conns = map[network.Stream]time.Time{c: time.Now()}
}

// Disconnected is called by notifiers to inform that an existing connection has been closed or terminated.
// The notifee updates the Manager accordingly to stop tracking the connection, and performs housekeeping.
func (cm *Manager) Disconnected(n network.Network, c network.Stream) {
	p := c.Conn().RemotePeer()
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	cinf, ok := s.peers[p]
	if !ok {
		log.Error("received disconnected notification for peer we are not tracking: ", p)
		return
	}

	_, ok = cinf.conns[c]
	if !ok {
		log.Error("received disconnected notification for conn we are not tracking: ", p)
		return
	}

	delete(cinf.conns, c)
	if len(cinf.conns) == 0 {
		delete(s.peers, p)
	}
	atomic.AddInt32(&cm.connCount, -1)
}
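
// Usage sketch (illustrative only): Connected/Disconnected take a
// network.Stream rather than a network.Conn, so this manager is not a drop-in
// network.Notifiee; this file assumes callers (or a wrapper elsewhere in the
// package) drive these hooks around stream lifecycle events. `h` is a
// placeholder for a libp2p host.
//
//	st, err := h.NewStream(ctx, pid, protocolID)
//	if err == nil {
//		mgr.Connected(h.Network(), st)
//	}
//	// ...and when the stream is torn down:
//	mgr.Disconnected(h.Network(), st)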