3
0

timeout.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package nebula
  2. import (
  3. "time"
  4. )
  5. // How many timer objects should be cached
  6. const timerCacheMax = 50000
  7. type TimerWheel[T any] struct {
  8. // Current tick
  9. current int
  10. // Cheat on finding the length of the wheel
  11. wheelLen int
  12. // Last time we ticked, since we are lazy ticking
  13. lastTick *time.Time
  14. // Durations of a tick and the entire wheel
  15. tickDuration time.Duration
  16. wheelDuration time.Duration
  17. // The actual wheel which is just a set of singly linked lists, head/tail pointers
  18. wheel []*TimeoutList[T]
  19. // Singly linked list of items that have timed out of the wheel
  20. expired *TimeoutList[T]
  21. // Item cache to avoid garbage collect
  22. itemCache *TimeoutItem[T]
  23. itemsCached int
  24. }
  25. type LockingTimerWheel[T any] struct {
  26. m syncMutex
  27. t *TimerWheel[T]
  28. }
  29. // TimeoutList Represents a tick in the wheel
  30. type TimeoutList[T any] struct {
  31. Head *TimeoutItem[T]
  32. Tail *TimeoutItem[T]
  33. }
  34. // TimeoutItem Represents an item within a tick
  35. type TimeoutItem[T any] struct {
  36. Item T
  37. Next *TimeoutItem[T]
  38. }
  39. // NewTimerWheel Builds a timer wheel and identifies the tick duration and wheel duration from the provided values
  40. // Purge must be called once per entry to actually remove anything
  41. // The TimerWheel does not handle concurrency on its own.
  42. // Locks around access to it must be used if multiple routines are manipulating it.
  43. func NewTimerWheel[T any](min, max time.Duration) *TimerWheel[T] {
  44. //TODO provide an error
  45. //if min >= max {
  46. // return nil
  47. //}
  48. // Round down and add 2 so we can have the smallest # of ticks in the wheel and still account for a full
  49. // max duration, even if our current tick is at the maximum position and the next item to be added is at maximum
  50. // timeout
  51. wLen := int((max / min) + 2)
  52. tw := TimerWheel[T]{
  53. wheelLen: wLen,
  54. wheel: make([]*TimeoutList[T], wLen),
  55. tickDuration: min,
  56. wheelDuration: max,
  57. expired: &TimeoutList[T]{},
  58. }
  59. for i := range tw.wheel {
  60. tw.wheel[i] = &TimeoutList[T]{}
  61. }
  62. return &tw
  63. }
  64. // NewLockingTimerWheel is version of TimerWheel that is safe for concurrent use with a small performance penalty
  65. func NewLockingTimerWheel[T any](name string, min, max time.Duration) *LockingTimerWheel[T] {
  66. return &LockingTimerWheel[T]{
  67. m: newSyncMutex(name),
  68. t: NewTimerWheel[T](min, max),
  69. }
  70. }
  71. // Add will add an item to the wheel in its proper timeout.
  72. // Caller should Advance the wheel prior to ensure the proper slot is used.
  73. func (tw *TimerWheel[T]) Add(v T, timeout time.Duration) *TimeoutItem[T] {
  74. i := tw.findWheel(timeout)
  75. // Try to fetch off the cache
  76. ti := tw.itemCache
  77. if ti != nil {
  78. tw.itemCache = ti.Next
  79. tw.itemsCached--
  80. ti.Next = nil
  81. } else {
  82. ti = &TimeoutItem[T]{}
  83. }
  84. // Relink and return
  85. ti.Item = v
  86. if tw.wheel[i].Tail == nil {
  87. tw.wheel[i].Head = ti
  88. tw.wheel[i].Tail = ti
  89. } else {
  90. tw.wheel[i].Tail.Next = ti
  91. tw.wheel[i].Tail = ti
  92. }
  93. return ti
  94. }
  95. // Purge removes and returns the first available expired item from the wheel and the 2nd argument is true.
  96. // If no item is available then an empty T is returned and the 2nd argument is false.
  97. func (tw *TimerWheel[T]) Purge() (T, bool) {
  98. if tw.expired.Head == nil {
  99. var na T
  100. return na, false
  101. }
  102. ti := tw.expired.Head
  103. tw.expired.Head = ti.Next
  104. if tw.expired.Head == nil {
  105. tw.expired.Tail = nil
  106. }
  107. // Clear out the items references
  108. ti.Next = nil
  109. // Maybe cache it for later
  110. if tw.itemsCached < timerCacheMax {
  111. ti.Next = tw.itemCache
  112. tw.itemCache = ti
  113. tw.itemsCached++
  114. }
  115. return ti.Item, true
  116. }
  117. // findWheel find the next position in the wheel for the provided timeout given the current tick
  118. func (tw *TimerWheel[T]) findWheel(timeout time.Duration) (i int) {
  119. if timeout < tw.tickDuration {
  120. // Can't track anything below the set resolution
  121. timeout = tw.tickDuration
  122. } else if timeout > tw.wheelDuration {
  123. // We aren't handling timeouts greater than the wheels duration
  124. timeout = tw.wheelDuration
  125. }
  126. // Find the next highest, rounding up
  127. tick := int(((timeout - 1) / tw.tickDuration) + 1)
  128. // Add another tick since the current tick may almost be over then map it to the wheel from our
  129. // current position
  130. tick += tw.current + 1
  131. if tick >= tw.wheelLen {
  132. tick -= tw.wheelLen
  133. }
  134. return tick
  135. }
  136. // Advance will move the wheel forward by the appropriate number of ticks for the provided time and all items
  137. // passed over will be moved to the expired list. Calling Purge is necessary to remove them entirely.
  138. func (tw *TimerWheel[T]) Advance(now time.Time) {
  139. if tw.lastTick == nil {
  140. tw.lastTick = &now
  141. }
  142. // We want to round down
  143. ticks := int(now.Sub(*tw.lastTick) / tw.tickDuration)
  144. adv := ticks
  145. if ticks > tw.wheelLen {
  146. ticks = tw.wheelLen
  147. }
  148. for i := 0; i < ticks; i++ {
  149. tw.current++
  150. if tw.current >= tw.wheelLen {
  151. tw.current = 0
  152. }
  153. if tw.wheel[tw.current].Head != nil {
  154. // We need to append the expired items as to not starve evicting the oldest ones
  155. if tw.expired.Tail == nil {
  156. tw.expired.Head = tw.wheel[tw.current].Head
  157. tw.expired.Tail = tw.wheel[tw.current].Tail
  158. } else {
  159. tw.expired.Tail.Next = tw.wheel[tw.current].Head
  160. tw.expired.Tail = tw.wheel[tw.current].Tail
  161. }
  162. tw.wheel[tw.current].Head = nil
  163. tw.wheel[tw.current].Tail = nil
  164. }
  165. }
  166. // Advance the tick based on duration to avoid losing some accuracy
  167. newTick := tw.lastTick.Add(tw.tickDuration * time.Duration(adv))
  168. tw.lastTick = &newTick
  169. }
  170. func (lw *LockingTimerWheel[T]) Add(v T, timeout time.Duration) *TimeoutItem[T] {
  171. lw.m.Lock()
  172. defer lw.m.Unlock()
  173. return lw.t.Add(v, timeout)
  174. }
  175. func (lw *LockingTimerWheel[T]) Purge() (T, bool) {
  176. lw.m.Lock()
  177. defer lw.m.Unlock()
  178. return lw.t.Purge()
  179. }
  180. func (lw *LockingTimerWheel[T]) Advance(now time.Time) {
  181. lw.m.Lock()
  182. defer lw.m.Unlock()
  183. lw.t.Advance(now)
  184. }