timeout.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. package nebula
  2. import (
  3. "time"
  4. "github.com/wadey/synctrace"
  5. )
  6. // How many timer objects should be cached
  7. const timerCacheMax = 50000
  8. type TimerWheel[T any] struct {
  9. // Current tick
  10. current int
  11. // Cheat on finding the length of the wheel
  12. wheelLen int
  13. // Last time we ticked, since we are lazy ticking
  14. lastTick *time.Time
  15. // Durations of a tick and the entire wheel
  16. tickDuration time.Duration
  17. wheelDuration time.Duration
  18. // The actual wheel which is just a set of singly linked lists, head/tail pointers
  19. wheel []*TimeoutList[T]
  20. // Singly linked list of items that have timed out of the wheel
  21. expired *TimeoutList[T]
  22. // Item cache to avoid garbage collect
  23. itemCache *TimeoutItem[T]
  24. itemsCached int
  25. }
  26. type LockingTimerWheel[T any] struct {
  27. m synctrace.Mutex
  28. t *TimerWheel[T]
  29. }
  30. // TimeoutList Represents a tick in the wheel
  31. type TimeoutList[T any] struct {
  32. Head *TimeoutItem[T]
  33. Tail *TimeoutItem[T]
  34. }
  35. // TimeoutItem Represents an item within a tick
  36. type TimeoutItem[T any] struct {
  37. Item T
  38. Next *TimeoutItem[T]
  39. }
  40. // NewTimerWheel Builds a timer wheel and identifies the tick duration and wheel duration from the provided values
  41. // Purge must be called once per entry to actually remove anything
  42. // The TimerWheel does not handle concurrency on its own.
  43. // Locks around access to it must be used if multiple routines are manipulating it.
  44. func NewTimerWheel[T any](min, max time.Duration) *TimerWheel[T] {
  45. //TODO provide an error
  46. //if min >= max {
  47. // return nil
  48. //}
  49. // Round down and add 2 so we can have the smallest # of ticks in the wheel and still account for a full
  50. // max duration, even if our current tick is at the maximum position and the next item to be added is at maximum
  51. // timeout
  52. wLen := int((max / min) + 2)
  53. tw := TimerWheel[T]{
  54. wheelLen: wLen,
  55. wheel: make([]*TimeoutList[T], wLen),
  56. tickDuration: min,
  57. wheelDuration: max,
  58. expired: &TimeoutList[T]{},
  59. }
  60. for i := range tw.wheel {
  61. tw.wheel[i] = &TimeoutList[T]{}
  62. }
  63. return &tw
  64. }
  65. // NewLockingTimerWheel is version of TimerWheel that is safe for concurrent use with a small performance penalty
  66. func NewLockingTimerWheel[T any](name string, min, max time.Duration) *LockingTimerWheel[T] {
  67. return &LockingTimerWheel[T]{
  68. m: synctrace.NewMutex(name),
  69. t: NewTimerWheel[T](min, max),
  70. }
  71. }
  72. // Add will add an item to the wheel in its proper timeout.
  73. // Caller should Advance the wheel prior to ensure the proper slot is used.
  74. func (tw *TimerWheel[T]) Add(v T, timeout time.Duration) *TimeoutItem[T] {
  75. i := tw.findWheel(timeout)
  76. // Try to fetch off the cache
  77. ti := tw.itemCache
  78. if ti != nil {
  79. tw.itemCache = ti.Next
  80. tw.itemsCached--
  81. ti.Next = nil
  82. } else {
  83. ti = &TimeoutItem[T]{}
  84. }
  85. // Relink and return
  86. ti.Item = v
  87. if tw.wheel[i].Tail == nil {
  88. tw.wheel[i].Head = ti
  89. tw.wheel[i].Tail = ti
  90. } else {
  91. tw.wheel[i].Tail.Next = ti
  92. tw.wheel[i].Tail = ti
  93. }
  94. return ti
  95. }
  96. // Purge removes and returns the first available expired item from the wheel and the 2nd argument is true.
  97. // If no item is available then an empty T is returned and the 2nd argument is false.
  98. func (tw *TimerWheel[T]) Purge() (T, bool) {
  99. if tw.expired.Head == nil {
  100. var na T
  101. return na, false
  102. }
  103. ti := tw.expired.Head
  104. tw.expired.Head = ti.Next
  105. if tw.expired.Head == nil {
  106. tw.expired.Tail = nil
  107. }
  108. // Clear out the items references
  109. ti.Next = nil
  110. // Maybe cache it for later
  111. if tw.itemsCached < timerCacheMax {
  112. ti.Next = tw.itemCache
  113. tw.itemCache = ti
  114. tw.itemsCached++
  115. }
  116. return ti.Item, true
  117. }
  118. // findWheel find the next position in the wheel for the provided timeout given the current tick
  119. func (tw *TimerWheel[T]) findWheel(timeout time.Duration) (i int) {
  120. if timeout < tw.tickDuration {
  121. // Can't track anything below the set resolution
  122. timeout = tw.tickDuration
  123. } else if timeout > tw.wheelDuration {
  124. // We aren't handling timeouts greater than the wheels duration
  125. timeout = tw.wheelDuration
  126. }
  127. // Find the next highest, rounding up
  128. tick := int(((timeout - 1) / tw.tickDuration) + 1)
  129. // Add another tick since the current tick may almost be over then map it to the wheel from our
  130. // current position
  131. tick += tw.current + 1
  132. if tick >= tw.wheelLen {
  133. tick -= tw.wheelLen
  134. }
  135. return tick
  136. }
  137. // Advance will move the wheel forward by the appropriate number of ticks for the provided time and all items
  138. // passed over will be moved to the expired list. Calling Purge is necessary to remove them entirely.
  139. func (tw *TimerWheel[T]) Advance(now time.Time) {
  140. if tw.lastTick == nil {
  141. tw.lastTick = &now
  142. }
  143. // We want to round down
  144. ticks := int(now.Sub(*tw.lastTick) / tw.tickDuration)
  145. adv := ticks
  146. if ticks > tw.wheelLen {
  147. ticks = tw.wheelLen
  148. }
  149. for i := 0; i < ticks; i++ {
  150. tw.current++
  151. if tw.current >= tw.wheelLen {
  152. tw.current = 0
  153. }
  154. if tw.wheel[tw.current].Head != nil {
  155. // We need to append the expired items as to not starve evicting the oldest ones
  156. if tw.expired.Tail == nil {
  157. tw.expired.Head = tw.wheel[tw.current].Head
  158. tw.expired.Tail = tw.wheel[tw.current].Tail
  159. } else {
  160. tw.expired.Tail.Next = tw.wheel[tw.current].Head
  161. tw.expired.Tail = tw.wheel[tw.current].Tail
  162. }
  163. tw.wheel[tw.current].Head = nil
  164. tw.wheel[tw.current].Tail = nil
  165. }
  166. }
  167. // Advance the tick based on duration to avoid losing some accuracy
  168. newTick := tw.lastTick.Add(tw.tickDuration * time.Duration(adv))
  169. tw.lastTick = &newTick
  170. }
  171. func (lw *LockingTimerWheel[T]) Add(v T, timeout time.Duration) *TimeoutItem[T] {
  172. lw.m.Lock()
  173. defer lw.m.Unlock()
  174. return lw.t.Add(v, timeout)
  175. }
  176. func (lw *LockingTimerWheel[T]) Purge() (T, bool) {
  177. lw.m.Lock()
  178. defer lw.m.Unlock()
  179. return lw.t.Purge()
  180. }
  181. func (lw *LockingTimerWheel[T]) Advance(now time.Time) {
  182. lw.m.Lock()
  183. defer lw.m.Unlock()
  184. lw.t.Advance(now)
  185. }