3
0

timeout_system.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. package nebula
  2. import (
  3. "sync"
  4. "time"
  5. )
  // systemTimerCacheMax is the upper bound on how many spent SystemTimeoutItem
  // objects a SystemTimerWheel keeps on its free list for reuse rather than
  // leaving them to the garbage collector.
  const systemTimerCacheMax = 50000
  8. type SystemTimerWheel struct {
  9. // Current tick
  10. current int
  11. // Cheat on finding the length of the wheel
  12. wheelLen int
  13. // Last time we ticked, since we are lazy ticking
  14. lastTick *time.Time
  15. // Durations of a tick and the entire wheel
  16. tickDuration time.Duration
  17. wheelDuration time.Duration
  18. // The actual wheel which is just a set of singly linked lists, head/tail pointers
  19. wheel []*SystemTimeoutList
  20. // Singly linked list of items that have timed out of the wheel
  21. expired *SystemTimeoutList
  22. // Item cache to avoid garbage collect
  23. itemCache *SystemTimeoutItem
  24. itemsCached int
  25. lock sync.Mutex
  26. }
  27. // Represents a tick in the wheel
  28. type SystemTimeoutList struct {
  29. Head *SystemTimeoutItem
  30. Tail *SystemTimeoutItem
  31. }
  // SystemTimeoutItem is a single entry within a tick.
  type SystemTimeoutItem struct {
  	// Item is the value being timed.
  	Item uint32
  	// Next links to the following entry in the same list, or is nil at the end.
  	Next *SystemTimeoutItem
  }
  37. // Builds a timer wheel and identifies the tick duration and wheel duration from the provided values
  38. // Purge must be called once per entry to actually remove anything
  39. func NewSystemTimerWheel(min, max time.Duration) *SystemTimerWheel {
  40. //TODO provide an error
  41. //if min >= max {
  42. // return nil
  43. //}
  44. // Round down and add 1 so we can have the smallest # of ticks in the wheel and still account for a full
  45. // max duration
  46. wLen := int((max / min) + 1)
  47. tw := SystemTimerWheel{
  48. wheelLen: wLen,
  49. wheel: make([]*SystemTimeoutList, wLen),
  50. tickDuration: min,
  51. wheelDuration: max,
  52. expired: &SystemTimeoutList{},
  53. }
  54. for i := range tw.wheel {
  55. tw.wheel[i] = &SystemTimeoutList{}
  56. }
  57. return &tw
  58. }
  59. func (tw *SystemTimerWheel) Add(v uint32, timeout time.Duration) *SystemTimeoutItem {
  60. tw.lock.Lock()
  61. defer tw.lock.Unlock()
  62. // Check and see if we should progress the tick
  63. //tw.advance(time.Now())
  64. i := tw.findWheel(timeout)
  65. // Try to fetch off the cache
  66. ti := tw.itemCache
  67. if ti != nil {
  68. tw.itemCache = ti.Next
  69. ti.Next = nil
  70. tw.itemsCached--
  71. } else {
  72. ti = &SystemTimeoutItem{}
  73. }
  74. // Relink and return
  75. ti.Item = v
  76. ti.Next = tw.wheel[i].Head
  77. tw.wheel[i].Head = ti
  78. if tw.wheel[i].Tail == nil {
  79. tw.wheel[i].Tail = ti
  80. }
  81. return ti
  82. }
  83. func (tw *SystemTimerWheel) Purge() interface{} {
  84. tw.lock.Lock()
  85. defer tw.lock.Unlock()
  86. if tw.expired.Head == nil {
  87. return nil
  88. }
  89. ti := tw.expired.Head
  90. tw.expired.Head = ti.Next
  91. if tw.expired.Head == nil {
  92. tw.expired.Tail = nil
  93. }
  94. p := ti.Item
  95. // Clear out the items references
  96. ti.Item = 0
  97. ti.Next = nil
  98. // Maybe cache it for later
  99. if tw.itemsCached < systemTimerCacheMax {
  100. ti.Next = tw.itemCache
  101. tw.itemCache = ti
  102. tw.itemsCached++
  103. }
  104. return p
  105. }
  106. func (tw *SystemTimerWheel) findWheel(timeout time.Duration) (i int) {
  107. if timeout < tw.tickDuration {
  108. // Can't track anything below the set resolution
  109. timeout = tw.tickDuration
  110. } else if timeout > tw.wheelDuration {
  111. // We aren't handling timeouts greater than the wheels duration
  112. timeout = tw.wheelDuration
  113. }
  114. // Find the next highest, rounding up
  115. tick := int(((timeout - 1) / tw.tickDuration) + 1)
  116. // Add another tick since the current tick may almost be over then map it to the wheel from our
  117. // current position
  118. tick += tw.current + 1
  119. if tick >= tw.wheelLen {
  120. tick -= tw.wheelLen
  121. }
  122. return tick
  123. }
  124. func (tw *SystemTimerWheel) advance(now time.Time) {
  125. tw.lock.Lock()
  126. defer tw.lock.Unlock()
  127. if tw.lastTick == nil {
  128. tw.lastTick = &now
  129. }
  130. // We want to round down
  131. ticks := int(now.Sub(*tw.lastTick) / tw.tickDuration)
  132. //l.Infoln("Ticks: ", ticks)
  133. for i := 0; i < ticks; i++ {
  134. tw.current++
  135. //l.Infoln("Tick: ", tw.current)
  136. if tw.current >= tw.wheelLen {
  137. tw.current = 0
  138. }
  139. // We need to append the expired items as to not starve evicting the oldest ones
  140. if tw.expired.Tail == nil {
  141. tw.expired.Head = tw.wheel[tw.current].Head
  142. tw.expired.Tail = tw.wheel[tw.current].Tail
  143. } else {
  144. tw.expired.Tail.Next = tw.wheel[tw.current].Head
  145. if tw.wheel[tw.current].Tail != nil {
  146. tw.expired.Tail = tw.wheel[tw.current].Tail
  147. }
  148. }
  149. //l.Infoln("Head: ", tw.expired.Head, "Tail: ", tw.expired.Tail)
  150. tw.wheel[tw.current].Head = nil
  151. tw.wheel[tw.current].Tail = nil
  152. tw.lastTick = &now
  153. }
  154. }