timeout.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package nebula
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. // How many timer objects should be cached
  7. const timerCacheMax = 50000
  8. type TimerWheel[T any] struct {
  9. // Current tick
  10. current int
  11. // Cheat on finding the length of the wheel
  12. wheelLen int
  13. // Last time we ticked, since we are lazy ticking
  14. lastTick *time.Time
  15. // Durations of a tick and the entire wheel
  16. tickDuration time.Duration
  17. wheelDuration time.Duration
  18. // The actual wheel which is just a set of singly linked lists, head/tail pointers
  19. wheel []*TimeoutList[T]
  20. // Singly linked list of items that have timed out of the wheel
  21. expired *TimeoutList[T]
  22. // Item cache to avoid garbage collect
  23. itemCache *TimeoutItem[T]
  24. itemsCached int
  25. }
  26. type LockingTimerWheel[T any] struct {
  27. m sync.Mutex
  28. t *TimerWheel[T]
  29. }
  30. // TimeoutList Represents a tick in the wheel
  31. type TimeoutList[T any] struct {
  32. Head *TimeoutItem[T]
  33. Tail *TimeoutItem[T]
  34. }
  35. // TimeoutItem Represents an item within a tick
  36. type TimeoutItem[T any] struct {
  37. Item T
  38. Next *TimeoutItem[T]
  39. }
  40. // NewTimerWheel Builds a timer wheel and identifies the tick duration and wheel duration from the provided values
  41. // Purge must be called once per entry to actually remove anything
  42. // The TimerWheel does not handle concurrency on its own.
  43. // Locks around access to it must be used if multiple routines are manipulating it.
  44. func NewTimerWheel[T any](min, max time.Duration) *TimerWheel[T] {
  45. //TODO provide an error
  46. //if min >= max {
  47. // return nil
  48. //}
  49. // Round down and add 2 so we can have the smallest # of ticks in the wheel and still account for a full
  50. // max duration, even if our current tick is at the maximum position and the next item to be added is at maximum
  51. // timeout
  52. wLen := int((max / min) + 2)
  53. tw := TimerWheel[T]{
  54. wheelLen: wLen,
  55. wheel: make([]*TimeoutList[T], wLen),
  56. tickDuration: min,
  57. wheelDuration: max,
  58. expired: &TimeoutList[T]{},
  59. }
  60. for i := range tw.wheel {
  61. tw.wheel[i] = &TimeoutList[T]{}
  62. }
  63. return &tw
  64. }
  65. // NewLockingTimerWheel is version of TimerWheel that is safe for concurrent use with a small performance penalty
  66. func NewLockingTimerWheel[T any](min, max time.Duration) *LockingTimerWheel[T] {
  67. return &LockingTimerWheel[T]{
  68. t: NewTimerWheel[T](min, max),
  69. }
  70. }
  71. // Add will add an item to the wheel in its proper timeout.
  72. // Caller should Advance the wheel prior to ensure the proper slot is used.
  73. func (tw *TimerWheel[T]) Add(v T, timeout time.Duration) *TimeoutItem[T] {
  74. i := tw.findWheel(timeout)
  75. // Try to fetch off the cache
  76. ti := tw.itemCache
  77. if ti != nil {
  78. tw.itemCache = ti.Next
  79. tw.itemsCached--
  80. ti.Next = nil
  81. } else {
  82. ti = &TimeoutItem[T]{}
  83. }
  84. // Relink and return
  85. ti.Item = v
  86. if tw.wheel[i].Tail == nil {
  87. tw.wheel[i].Head = ti
  88. tw.wheel[i].Tail = ti
  89. } else {
  90. tw.wheel[i].Tail.Next = ti
  91. tw.wheel[i].Tail = ti
  92. }
  93. return ti
  94. }
  95. // Purge removes and returns the first available expired item from the wheel and the 2nd argument is true.
  96. // If no item is available then an empty T is returned and the 2nd argument is false.
  97. func (tw *TimerWheel[T]) Purge() (T, bool) {
  98. if tw.expired.Head == nil {
  99. var na T
  100. return na, false
  101. }
  102. ti := tw.expired.Head
  103. tw.expired.Head = ti.Next
  104. if tw.expired.Head == nil {
  105. tw.expired.Tail = nil
  106. }
  107. // Clear out the items references
  108. ti.Next = nil
  109. // Maybe cache it for later
  110. if tw.itemsCached < timerCacheMax {
  111. ti.Next = tw.itemCache
  112. tw.itemCache = ti
  113. tw.itemsCached++
  114. }
  115. return ti.Item, true
  116. }
  117. // findWheel find the next position in the wheel for the provided timeout given the current tick
  118. func (tw *TimerWheel[T]) findWheel(timeout time.Duration) (i int) {
  119. if timeout < tw.tickDuration {
  120. // Can't track anything below the set resolution
  121. timeout = tw.tickDuration
  122. } else if timeout > tw.wheelDuration {
  123. // We aren't handling timeouts greater than the wheels duration
  124. timeout = tw.wheelDuration
  125. }
  126. // Find the next highest, rounding up
  127. tick := int(((timeout - 1) / tw.tickDuration) + 1)
  128. // Add another tick since the current tick may almost be over then map it to the wheel from our
  129. // current position
  130. tick += tw.current + 1
  131. if tick >= tw.wheelLen {
  132. tick -= tw.wheelLen
  133. }
  134. return tick
  135. }
  136. // Advance will move the wheel forward by the appropriate number of ticks for the provided time and all items
  137. // passed over will be moved to the expired list. Calling Purge is necessary to remove them entirely.
  138. func (tw *TimerWheel[T]) Advance(now time.Time) {
  139. if tw.lastTick == nil {
  140. tw.lastTick = &now
  141. }
  142. // We want to round down
  143. ticks := int(now.Sub(*tw.lastTick) / tw.tickDuration)
  144. adv := ticks
  145. if ticks > tw.wheelLen {
  146. ticks = tw.wheelLen
  147. }
  148. for i := 0; i < ticks; i++ {
  149. tw.current++
  150. if tw.current >= tw.wheelLen {
  151. tw.current = 0
  152. }
  153. if tw.wheel[tw.current].Head != nil {
  154. // We need to append the expired items as to not starve evicting the oldest ones
  155. if tw.expired.Tail == nil {
  156. tw.expired.Head = tw.wheel[tw.current].Head
  157. tw.expired.Tail = tw.wheel[tw.current].Tail
  158. } else {
  159. tw.expired.Tail.Next = tw.wheel[tw.current].Head
  160. tw.expired.Tail = tw.wheel[tw.current].Tail
  161. }
  162. tw.wheel[tw.current].Head = nil
  163. tw.wheel[tw.current].Tail = nil
  164. }
  165. }
  166. // Advance the tick based on duration to avoid losing some accuracy
  167. newTick := tw.lastTick.Add(tw.tickDuration * time.Duration(adv))
  168. tw.lastTick = &newTick
  169. }
  170. func (lw *LockingTimerWheel[T]) Add(v T, timeout time.Duration) *TimeoutItem[T] {
  171. lw.m.Lock()
  172. defer lw.m.Unlock()
  173. return lw.t.Add(v, timeout)
  174. }
  175. func (lw *LockingTimerWheel[T]) Purge() (T, bool) {
  176. lw.m.Lock()
  177. defer lw.m.Unlock()
  178. return lw.t.Purge()
  179. }
  180. func (lw *LockingTimerWheel[T]) Advance(now time.Time) {
  181. lw.m.Lock()
  182. defer lw.m.Unlock()
  183. lw.t.Advance(now)
  184. }