timeout.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. package nebula
  2. import (
  3. "time"
  4. "github.com/slackhq/nebula/firewall"
  5. )
// timerCacheMax is how many timer objects (TimeoutItem) should be cached.
// Purge stops pushing spent items onto the wheel's free list once itemsCached
// reaches this value; Add pops from that list before allocating a new item.
const timerCacheMax = 50000
// emptyFWPacket is the zero-value packet Purge returns (with false) when the
// expired list is empty.
var emptyFWPacket = firewall.Packet{}
// TimerWheel files firewall.Packet values into fixed-duration slots ("ticks")
// and moves them to an expired list once the wheel has advanced past their
// slot. The wheel is ticked lazily: it only moves forward when advance runs,
// which Add does on every insert. Expired packets are retrieved one at a time
// with Purge.
//
// NOTE(review): no locking is visible in these methods; callers presumably
// serialize access to a wheel — confirm against the call sites.
type TimerWheel struct {
	// Current tick: index into wheel of the most recently flushed slot
	current int

	// Cheat on finding the length of the wheel (same value as len(wheel))
	wheelLen int

	// Last time we ticked, since we are lazy ticking. nil until the first call to advance
	lastTick *time.Time

	// Durations of a tick and the entire wheel
	tickDuration  time.Duration
	wheelDuration time.Duration

	// The actual wheel which is just a set of singly linked lists, head/tail pointers.
	// There is one list per tick.
	wheel []*TimeoutList

	// Singly linked list of items that have timed out of the wheel; drained by Purge
	expired *TimeoutList

	// Item cache to avoid garbage collect. itemCache is the head of a free list chained
	// through TimeoutItem.Next, and itemsCached is its length (bounded by timerCacheMax)
	itemCache   *TimeoutItem
	itemsCached int
}
// TimeoutList represents a tick in the wheel: a singly linked list of
// TimeoutItem tracked by its first and last element so appends need no
// traversal. Head and Tail are both nil when the list is empty.
type TimeoutList struct {
	// Head is the first (oldest) item in the list
	Head *TimeoutItem
	// Tail is the last (newest) item in the list
	Tail *TimeoutItem
}
// TimeoutItem represents an item within a tick. The same node type is also
// used for the wheel's expired list and for its free-list cache.
type TimeoutItem struct {
	// Packet is the value being timed
	Packet firewall.Packet
	// Next is the following item in whichever list currently holds this one, or nil
	Next *TimeoutItem
}
  37. // NewTimerWheel Builds a timer wheel and identifies the tick duration and wheel duration from the provided values
  38. // Purge must be called once per entry to actually remove anything
  39. func NewTimerWheel(min, max time.Duration) *TimerWheel {
  40. //TODO provide an error
  41. //if min >= max {
  42. // return nil
  43. //}
  44. // Round down and add 2 so we can have the smallest # of ticks in the wheel and still account for a full
  45. // max duration, even if our current tick is at the maximum position and the next item to be added is at maximum
  46. // timeout
  47. wLen := int((max / min) + 2)
  48. tw := TimerWheel{
  49. wheelLen: wLen,
  50. wheel: make([]*TimeoutList, wLen),
  51. tickDuration: min,
  52. wheelDuration: max,
  53. expired: &TimeoutList{},
  54. }
  55. for i := range tw.wheel {
  56. tw.wheel[i] = &TimeoutList{}
  57. }
  58. return &tw
  59. }
  60. // Add will add a firewall.Packet to the wheel in it's proper timeout
  61. func (tw *TimerWheel) Add(v firewall.Packet, timeout time.Duration) *TimeoutItem {
  62. // Check and see if we should progress the tick
  63. tw.advance(time.Now())
  64. i := tw.findWheel(timeout)
  65. // Try to fetch off the cache
  66. ti := tw.itemCache
  67. if ti != nil {
  68. tw.itemCache = ti.Next
  69. tw.itemsCached--
  70. ti.Next = nil
  71. } else {
  72. ti = &TimeoutItem{}
  73. }
  74. // Relink and return
  75. ti.Packet = v
  76. if tw.wheel[i].Tail == nil {
  77. tw.wheel[i].Head = ti
  78. tw.wheel[i].Tail = ti
  79. } else {
  80. tw.wheel[i].Tail.Next = ti
  81. tw.wheel[i].Tail = ti
  82. }
  83. return ti
  84. }
  85. func (tw *TimerWheel) Purge() (firewall.Packet, bool) {
  86. if tw.expired.Head == nil {
  87. return emptyFWPacket, false
  88. }
  89. ti := tw.expired.Head
  90. tw.expired.Head = ti.Next
  91. if tw.expired.Head == nil {
  92. tw.expired.Tail = nil
  93. }
  94. // Clear out the items references
  95. ti.Next = nil
  96. // Maybe cache it for later
  97. if tw.itemsCached < timerCacheMax {
  98. ti.Next = tw.itemCache
  99. tw.itemCache = ti
  100. tw.itemsCached++
  101. }
  102. return ti.Packet, true
  103. }
  104. // advance will move the wheel forward by proper number of ticks. The caller _should_ lock the wheel before calling this
  105. func (tw *TimerWheel) findWheel(timeout time.Duration) (i int) {
  106. if timeout < tw.tickDuration {
  107. // Can't track anything below the set resolution
  108. timeout = tw.tickDuration
  109. } else if timeout > tw.wheelDuration {
  110. // We aren't handling timeouts greater than the wheels duration
  111. timeout = tw.wheelDuration
  112. }
  113. // Find the next highest, rounding up
  114. tick := int(((timeout - 1) / tw.tickDuration) + 1)
  115. // Add another tick since the current tick may almost be over then map it to the wheel from our
  116. // current position
  117. tick += tw.current + 1
  118. if tick >= tw.wheelLen {
  119. tick -= tw.wheelLen
  120. }
  121. return tick
  122. }
  123. // advance will lock and move the wheel forward by proper number of ticks.
  124. func (tw *TimerWheel) advance(now time.Time) {
  125. if tw.lastTick == nil {
  126. tw.lastTick = &now
  127. }
  128. // We want to round down
  129. ticks := int(now.Sub(*tw.lastTick) / tw.tickDuration)
  130. adv := ticks
  131. if ticks > tw.wheelLen {
  132. ticks = tw.wheelLen
  133. }
  134. for i := 0; i < ticks; i++ {
  135. tw.current++
  136. if tw.current >= tw.wheelLen {
  137. tw.current = 0
  138. }
  139. if tw.wheel[tw.current].Head != nil {
  140. // We need to append the expired items as to not starve evicting the oldest ones
  141. if tw.expired.Tail == nil {
  142. tw.expired.Head = tw.wheel[tw.current].Head
  143. tw.expired.Tail = tw.wheel[tw.current].Tail
  144. } else {
  145. tw.expired.Tail.Next = tw.wheel[tw.current].Head
  146. tw.expired.Tail = tw.wheel[tw.current].Tail
  147. }
  148. tw.wheel[tw.current].Head = nil
  149. tw.wheel[tw.current].Tail = nil
  150. }
  151. }
  152. // Advance the tick based on duration to avoid losing some accuracy
  153. newTick := tw.lastTick.Add(tw.tickDuration * time.Duration(adv))
  154. tw.lastTick = &newTick
  155. }